aboutsummaryrefslogtreecommitdiffstats
path: root/Mailman/Queue/Runner.py
blob: 134dac9988c6da9dfdf9e6443feb4b856795eb0f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# Copyright (C) 1998,1999,2000,2001,2002 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.

"""Generic queue runner class.
"""

import time
import traceback
import weakref
from cStringIO import StringIO

from Mailman import mm_cfg
from Mailman import Utils
from Mailman import Errors
from Mailman import MailList
from Mailman import i18n

from Mailman.Queue.Switchboard import Switchboard
from Mailman.Logging.Syslog import syslog



class Runner:
    QDIR = None
    SLEEPTIME = mm_cfg.QRUNNER_SLEEP_TIME

    def __init__(self, slice=None, numslices=1):
        self._kids = {}
        # Create our own switchboard.  Don't use the switchboard cache because
        # we want to provide slice and numslice arguments.
        self._switchboard = Switchboard(self.QDIR, slice, numslices)
        # Create the shunt switchboard
        self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR)
        self._stop = 0

    def stop(self):
        self._stop = 1

    def run(self):
        # Start the main loop for this queue runner.
        try:
            try:
                while 1:
                    # Once through the loop that processes all the files in
                    # the queue directory.
                    filecnt = self._oneloop()
                    # Do the periodic work for the subclass.  BAW: this
                    # shouldn't be called here.  There should be one more
                    # _doperiodic() call at the end of the _oneloop() loop.
                    self._doperiodic()
                    # If the stop flag is set, we're done.
                    if self._stop:
                        break
                    # If there were no files to process, then we'll simply
                    # sleep for a little while and expect some to show up.
                    if not filecnt:
                        self._snooze()
            except KeyboardInterrupt:
                pass
        finally:
            # We've broken out of our main loop, so we want to reap all the
            # subprocesses we've created and do any other necessary cleanups.
            self._cleanup()

    def _oneloop(self):
        # First, list all the files in our queue directory.
        # Switchboard.files() is guaranteed to hand us the files in FIFO
        # order.  Return an integer count of the number of files that were
        # available for this qrunner to process.  A non-zero value tells run()
        # not to snooze for a while.
        files = self._switchboard.files()
        for filebase in files:
            # Ask the switchboard for the message and metadata objects
            # associated with this filebase.
            msg, msgdata = self._switchboard.dequeue(filebase)
            # It's possible one or both files got lost.  If so, just ignore
            # this filebase entry.  dequeue() will automatically unlink the
            # other file, but we should log an error message for diagnostics.
            if msg is None or msgdata is None:
                syslog('error', 'lost data files for filebase: %s', filebase)
            else:
                # Now that we've dequeued the message, we want to be
                # incredibly anal about making sure that no uncaught exception
                # could cause us to lose the message.  All runners that
                # implement _dispose() must guarantee that exceptions are
                # caught and dealt with properly.  Still, there may be a bug
                # in the infrastructure, and we do not want those to cause
                # messages to be lost.  Any uncaught exceptions will cause the
                # message to be stored in the shunt queue for human
                # intervention.
                try:
                    self._onefile(msg, msgdata)
                except Exception, e:
                    self._log(e)
                    syslog('error', 'SHUNTING: %s', filebase)
                    # Put a marker in the metadata for unshunting
                    msgdata['whichq'] = self._switchboard.whichq()
                    self._shunt.enqueue(msg, msgdata)
            # Other work we want to do each time through the loop
            Utils.reap(self._kids, once=1)
            self._doperiodic()
            if self._shortcircuit():
                break
        return len(files)

    def _onefile(self, msg, msgdata):
        # Do some common sanity checking on the message metadata.  It's got to
        # be destined for a particular mailing list.  This switchboard is used
        # to shunt off badly formatted messages.  We don't want to just trash
        # them because they may be fixable with human intervention.  Just get
        # them out of our site though.
        #
        # Find out which mailing list this message is destined for.
        listname = msgdata.get('listname')
        if not listname:
            listname = mm_cfg.MAILMAN_SITE_LIST
        mlist = self._open_list(listname)
        if not mlist:
            syslog('error',
                   'Dequeuing message destined for missing list: %s',
                   listname)
            self._shunt.enqueue(msg, msgdata)
            return
        # Now process this message, keeping track of any subprocesses that may
        # have been spawned.  We'll reap those later.
        #
        # We also want to set up the language context for this message.  The
        # context will be the preferred language for the user if a member of
        # the list, or the list's preferred language.  However, we must take
        # special care to reset the defaults, otherwise subsequent messages
        # may be translated incorrectly.  BAW: I'm not sure I like this
        # approach, but I can't think of anything better right now.
        otranslation = i18n.get_translation()
        sender = msg.get_sender()
        if mlist:
            lang = mlist.getMemberLanguage(sender)
        else:
            lang = mm_cfg.DEFAULT_SERVER_LANGUAGE
        i18n.set_language(lang)
        msgdata['lang'] = lang
        try:
            keepqueued = self._dispose(mlist, msg, msgdata)
        finally:
            i18n.set_translation(otranslation)
        # Keep tabs on any child processes that got spawned.
        kids = msgdata.get('_kids')
        if kids:
            self._kids.update(kids)
        if keepqueued:
            self._switchboard.enqueue(msg, msgdata)

    # Mapping of listnames to MailList instances as a weak value dictionary.
    _listcache = weakref.WeakValueDictionary()

    def _open_list(self, listname):
        # Cache the open list so that any use of the list within this process
        # uses the same object.  We use a WeakValueDictionary so that when the
        # list is no longer necessary, its memory is freed.
        mlist = self._listcache.get(listname)
        if not mlist:
            try:
                mlist = MailList.MailList(listname, lock=0)
            except Errors.MMListError, e:
                syslog('error', 'error opening list: %s\n%s', listname, e)
                return None
            else:
                self._listcache[listname] = mlist
        return mlist

    def _log(self, exc):
        syslog('error', 'Uncaught runner exception: %s', exc)
        s = StringIO()
        traceback.print_exc(file=s)
        syslog('error', s.getvalue())

    #
    # Subclasses can override these methods.
    #
    def _cleanup(self):
        """Clean up upon exit from the main processing loop.

        Called when the Runner's main loop is stopped, this should perform
        any necessary resource deallocation.  Its return value is irrelevant.
        """
        Utils.reap(self._kids)

    def _dispose(self, mlist, msg, msgdata):
        """Dispose of a single message destined for a mailing list.

        Called for each message that the Runner is responsible for, this is
        the primary overridable method for processing each message.
        Subclasses, must provide implementation for this method.

        mlist is the MailList instance this message is destined for.

        msg is the Message object representing the message.

        msgdata is a dictionary of message metadata.
        """
        raise NotImplementedError

    def _doperiodic(self):
        """Do some processing `every once in a while'.

        Called every once in a while both from the Runner's main loop, and
        from the Runner's hash slice processing loop.  You can do whatever
        special periodic processing you want here, and the return value is
        irrelevant.

        """
        pass

    def _snooze(self):
        """Sleep for a little while, because there was nothing to do.

        This is called from the Runner's main loop, but only when the last
        processing loop had no work to do (i.e. there were no messages in it's
        little slice of hash space).
        """
        if self.SLEEPTIME <= 0:
            return
        time.sleep(self.SLEEPTIME)

    def _shortcircuit(self):
        """Return a true value if the individual file processing loop should
        exit before it's finished processing each message in the current slice
        of hash space.  A false value tells _oneloop() to continue processing
        until the current snapshot of hash space is exhausted.

        You could, for example, implement a throttling algorithm here.
        """
        return self._stop