author | bwarsaw <> | 2003-09-22 02:34:11 +0000
---|---|---
committer | bwarsaw <> | 2003-09-22 02:34:11 +0000
commit | d206e467d12f5e5a6e08f6ed11a559054f2e183f (patch) |
tree | 89ed27fe7ea928f76f23a6d52ffb4d50fcdbdb7a /Mailman/Queue |
parent | aa151822b8ed7d5b3751ac6c33e0f603ca67a27e (diff) |
Backporting from the HEAD -- updated queue runners
Diffstat (limited to 'Mailman/Queue')
-rw-r--r-- | Mailman/Queue/BounceRunner.py | 105
-rw-r--r-- | Mailman/Queue/NewsRunner.py | 23
-rw-r--r-- | Mailman/Queue/OutgoingRunner.py | 62
-rw-r--r-- | Mailman/Queue/Runner.py | 47
-rw-r--r-- | Mailman/Queue/Switchboard.py | 24
5 files changed, 153 insertions, 108 deletions
diff --git a/Mailman/Queue/BounceRunner.py b/Mailman/Queue/BounceRunner.py
index e59ac47e..b416d19a 100644
--- a/Mailman/Queue/BounceRunner.py
+++ b/Mailman/Queue/BounceRunner.py
@@ -17,6 +17,8 @@
 """Bounce queue runner."""
 
 import re
+import time
+
 from email.MIMEText import MIMEText
 from email.MIMEMessage import MIMEMessage
 from email.Utils import parseaddr
@@ -33,12 +35,22 @@
 from Mailman.i18n import _
 
 COMMASPACE = ', '
 
+REGISTER_BOUNCES_EVERY = mm_cfg.minutes(15)
+
 
 
 class BounceRunner(Runner):
     QDIR = mm_cfg.BOUNCEQUEUE_DIR
-    # We only do bounce processing once per minute.
-    SLEEPTIME = mm_cfg.minutes(1)
+
+    def __init__(self, slice=None, numslices=1):
+        Runner.__init__(self, slice, numslices)
+        # This is a simple sequence of bounce score events.  Each entry in the
+        # list is a tuple of (address, day, msg) where day is a tuple of
+        # (YYYY, MM, DD).  We'll sort and collate all this information in
+        # _register_bounces() below.
+        self._bounces = {}
+        self._bouncecnt = 0
+        self._next_registration = time.time() + REGISTER_BOUNCES_EVERY
 
     def _dispose(self, mlist, msg, msgdata):
         # Make sure we have the most up-to-date state
@@ -84,57 +96,48 @@ class BounceRunner(Runner):
         # although I'm unsure how that could happen.  Possibly ScanMessages()
         # can let None's sneak through.  In any event, this will kill them.
         addrs = filter(None, addrs)
-        # Okay, we have some recognized addresses.  We now need to register
-        # the bounces for each of these.  If the bounce came to the site list,
-        # then we'll register the address on every list in the system, but
-        # note: this could be VERY resource intensive!
-        foundp = 0
-        listname = mlist.internal_name()
-        if listname == mm_cfg.MAILMAN_SITE_LIST:
-            foundp = 1
-            for listname in Utils.list_names():
-                xlist = self._open_list(listname)
-                xlist.Load()
-                for addr in addrs:
-                    if xlist.isMember(addr):
-                        unlockp = 0
-                        if not xlist.Locked():
-                            try:
-                                xlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
-                            except LockFile.TimeOutError:
-                                # Oh well, forget aboutf this list
-                                continue
-                            unlockp = 1
-                        try:
-                            xlist.registerBounce(addr, msg)
-                            foundp = 1
-                            xlist.Save()
-                        finally:
-                            if unlockp:
-                                xlist.Unlock()
+        # Store the bounce score events so we can register them periodically
+        today = time.localtime()[:3]
+        events = [(addr, today, msg) for addr in addrs]
+        self._bounces.setdefault(mlist.internal_name(), []).extend(events)
+        self._bouncecnt += len(addrs)
+
+    def _doperiodic(self):
+        now = time.time()
+        if self._next_registration > now or not self._bounces:
+            return
+        # Let's go ahead and register the bounces we've got stored up
+        self._next_registration = now + REGISTER_BOUNCES_EVERY
+        self._register_bounces()
+
+    def _register_bounces(self):
+        syslog('bounce', 'Processing %s queued bounces', self._bouncecnt)
+        # First, get the list of bounces register against the site list.  For
+        # these addresses, we want to register a bounce on every list the
+        # address is a member of -- which we don't know yet.
+        sitebounces = self._bounces.get(mm_cfg.MAILMAN_SITE_LIST, [])
+        if sitebounces:
+            listnames = Utils.list_names()
         else:
+            listnames = self._bounces.keys()
+        for listname in listnames:
+            mlist = self._open_list(listname)
+            mlist.Lock()
             try:
-                mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
-            except LockFile.TimeOutError:
-                # Try again later
-                syslog('bounce', "%s: couldn't get list lock", listname)
-                return 1
-            else:
-                try:
-                    for addr in addrs:
-                        if mlist.isMember(addr):
-                            mlist.registerBounce(addr, msg)
-                            foundp = 1
-                    mlist.Save()
-                finally:
-                    mlist.Unlock()
-        if not foundp:
-            # It means an address was recognized but it wasn't an address
-            # that's on any mailing list at this site.  BAW: don't forward
-            # these, but do log it.
-            syslog('bounce', 'bounce message with non-members of %s: %s',
-                   listname, COMMASPACE.join(addrs))
-            #maybe_forward(mlist, msg)
+                events = self._bounces.get(listname, []) + sitebounces
+                for addr, day, msg in events:
+                    mlist.registerBounce(addr, msg, day=day)
+                mlist.Save()
+            finally:
+                mlist.Unlock()
+        # Reset and free all the cached memory
+        self._bounces = {}
+        self._bouncecnt = 0
+
+    def _cleanup(self):
+        if self._bounces:
+            self._register_bounces()
+        Runner._cleanup(self)
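What changed in BounceRunner: instead of locking a list and calling registerBounce() for every bounce message as it arrives, the runner now records (address, day, msg) events in memory and flushes them in _register_bounces(), at most once every REGISTER_BOUNCES_EVERY and again at _cleanup(). Below is a minimal sketch of that accumulate-then-flush pattern in isolation; the names here (BounceBatcher, the register callback) are illustrative and are not Mailman's actual API.

```python
import time
from collections import defaultdict

REGISTER_EVERY = 15 * 60    # seconds; stands in for REGISTER_BOUNCES_EVERY


class BounceBatcher:
    """Illustrative accumulate-then-flush batcher (not Mailman's class)."""

    def __init__(self):
        self.events = defaultdict(list)     # listname -> [(addr, day), ...]
        self.next_flush = time.time() + REGISTER_EVERY

    def record(self, listname, addrs):
        # Cheap per-message work: just remember the bounce events.
        today = time.localtime()[:3]        # (YYYY, MM, DD)
        self.events[listname].extend((addr, today) for addr in addrs)

    def maybe_flush(self, register):
        # Expensive work (locking lists, scoring bounces) runs at most
        # once every REGISTER_EVERY seconds, via the caller's callback.
        now = time.time()
        if now < self.next_flush or not self.events:
            return
        self.next_flush = now + REGISTER_EVERY
        for listname, events in self.events.items():
            for addr, day in events:
                register(listname, addr, day)
        self.events.clear()
```

The payoff is that the lock/registerBounce/Save cycle happens once per list per interval rather than once per bounce message.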
diff --git a/Mailman/Queue/NewsRunner.py b/Mailman/Queue/NewsRunner.py
index 0439f0e1..2b40a3ca 100644
--- a/Mailman/Queue/NewsRunner.py
+++ b/Mailman/Queue/NewsRunner.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2000,2001,2002 by the Free Software Foundation, Inc.
+# Copyright (C) 2000-2003 by the Free Software Foundation, Inc.
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU General Public License
@@ -45,6 +45,13 @@ mcre = re.compile(r"""
     """, re.VERBOSE)
 
 
+try:
+    True, False
+except NameError:
+    True = 1
+    False = 0
+
+
 class NewsRunner(Runner):
     QDIR = mm_cfg.NEWSQUEUE_DIR
 
@@ -60,7 +67,9 @@
         conn = None
         try:
             try:
-                conn = nntplib.NNTP(mlist.nntp_host, readermode=1,
+                nntp_host, nntp_port = Utils.nntpsplit(mlist.nntp_host)
+                conn = nntplib.NNTP(nntp_host, nntp_port,
+                                    readermode=True,
                                     user=mm_cfg.NNTP_USERNAME,
                                     password=mm_cfg.NNTP_PASSWORD)
                 conn.post(fp)
@@ -79,8 +88,8 @@
                 # Some other exception occurred, which we definitely did not
                 # expect, so set this message up for requeuing.
                 self._log(e)
-                return 1
-        return 0
+                return True
+        return False
@@ -123,13 +132,13 @@
     #
     # Our Message-ID format is <mailman.secs.pid.listname@hostname>
     msgid = msg['message-id']
-    hackmsgid = 1
+    hackmsgid = True
     if msgid:
         mo = mcre.search(msgid)
         if mo:
             lname, hname = mo.group('listname', 'hostname')
             if lname == mlist.internal_name() and hname == mlist.host_name:
-                hackmsgid = 0
+                hackmsgid = False
     if hackmsgid:
         del msg['message-id']
         msg['Message-ID'] = Utils.unique_message_id(mlist)
@@ -155,4 +164,4 @@
         for v in values[1:]:
             msg[rewrite] = v
     # Mark this message as prepared in case it has to be requeued
-    msgdata['prepped'] = 1
+    msgdata['prepped'] = True
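The NewsRunner change calls Utils.nntpsplit() so that mlist.nntp_host may optionally carry a port, and it passes readermode=True to nntplib.NNTP. The helper below is a hypothetical stand-in showing the assumed host:port splitting; the real Utils.nntpsplit() may differ in its defaults and edge-case handling.

```python
DEFAULT_NNTP_PORT = 119


def nntpsplit(hostspec, default_port=DEFAULT_NNTP_PORT):
    """Split 'host' or 'host:port' into (host, port).

    Hypothetical stand-in for Utils.nntpsplit(); behavior for malformed
    specs is an assumption, not Mailman's documented contract.
    """
    host, sep, port = hostspec.partition(':')
    if sep and port.isdigit():
        return host, int(port)
    return hostspec, default_port


# nntpsplit('news.example.com')       -> ('news.example.com', 119)
# nntpsplit('news.example.com:1119')  -> ('news.example.com', 1119)
```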
diff --git a/Mailman/Queue/OutgoingRunner.py b/Mailman/Queue/OutgoingRunner.py
index 11c94dfe..a1b6bdeb 100644
--- a/Mailman/Queue/OutgoingRunner.py
+++ b/Mailman/Queue/OutgoingRunner.py
@@ -29,11 +29,12 @@ from Mailman import Message
 from Mailman import Errors
 from Mailman import LockFile
 from Mailman.Queue.Runner import Runner
+from Mailman.Queue.Switchboard import Switchboard
 from Mailman.Logging.Syslog import syslog
 
 # This controls how often _doperiodic() will try to deal with deferred
 # permanent failures.  It is a count of calls to _doperiodic()
-DEAL_WITH_PERMFAILURES_EVERY = 1
+DEAL_WITH_PERMFAILURES_EVERY = 10
 
 try:
     True, False
@@ -59,6 +60,7 @@ class OutgoingRunner(Runner):
         # error log.  It gets reset if the message was successfully sent, and
         # set if there was a socket.error.
         self.__logged = False
+        self.__retryq = Switchboard(mm_cfg.RETRYQUEUE_DIR)
 
     def _dispose(self, mlist, msg, msgdata):
         # See if we should retry delivery of this message again.
@@ -98,33 +100,30 @@
             # For permanent failures, make a copy of the message for bounce
             # handling.  I'm not sure this is necessary, or the right thing to
             # do.
-            pcnt = len(e.permfailures)
-            msgcopy = copy.deepcopy(msg)
-            self._permfailures.setdefault(mlist, []).extend(
-                zip(e.permfailures, [msgcopy] * pcnt))
-            # Temporary failures
-            if not e.tempfailures:
-                # Don't need to keep the message queued if there were only
-                # permanent failures.
-                return False
-            now = time.time()
-            recips = e.tempfailures
-            last_recip_count = msgdata.get('last_recip_count', 0)
-            deliver_until = msgdata.get('deliver_until', now)
-            if len(recips) == last_recip_count:
-                # We didn't make any progress, so don't attempt delivery any
-                # longer.  BAW: is this the best disposition?
-                if now > deliver_until:
-                    return False
-            else:
-                # Keep trying to delivery this message for a while
-                deliver_until = now + mm_cfg.DELIVERY_RETRY_PERIOD
-            msgdata['last_recip_count'] = len(recips)
-            msgdata['deliver_until'] = deliver_until
-            msgdata['deliver_after'] = now + mm_cfg.DELIVERY_RETRY_WAIT
-            msgdata['recips'] = recips
-            # Requeue
-            return True
+            if e.permfailures:
+                pcnt = len(e.permfailures)
+                msgcopy = copy.deepcopy(msg)
+                self._permfailures.setdefault(mlist, []).extend(
+                    zip(e.permfailures, [msgcopy] * pcnt))
+            # Move temporary failures to the qfiles/retry queue which will
+            # occasionally move them back here for another shot at delivery.
+            if e.tempfailures:
+                now = time.time()
+                recips = e.tempfailures
+                last_recip_count = msgdata.get('last_recip_count', 0)
+                deliver_until = msgdata.get('deliver_until', now)
+                if len(recips) == last_recip_count:
+                    # We didn't make any progress, so don't attempt delivery
+                    # any longer.  BAW: is this the best disposition?
+                    if now > deliver_until:
+                        return False
+                else:
+                    # Keep trying to delivery this message for a while
+                    deliver_until = now + mm_cfg.DELIVERY_RETRY_PERIOD
+                msgdata['last_recip_count'] = len(recips)
+                msgdata['deliver_until'] = deliver_until
+                msgdata['recips'] = recips
+                self.__retryq.enqueue(msg, msgdata)
         # We've successfully completed handling of this message
         return False
 
@@ -134,6 +133,9 @@
         self._permfail_counter += 1
         if self._permfail_counter < DEAL_WITH_PERMFAILURES_EVERY:
             return
+        self._handle_permfailures()
+
+    def _handle_permfailures(self):
         # Reset the counter
         self._permfail_counter = 0
         # And deal with the deferred permanent failures.
@@ -149,3 +151,7 @@
                 mlist.Save()
             finally:
                 mlist.Unlock()
+
+    def _cleanup(self):
+        self._handle_permfailures()
+        Runner._cleanup(self)
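OutgoingRunner now separates the two failure classes: permanent failures are still batched for bounce processing, while temporary failures are enqueued into a new retry switchboard (mm_cfg.RETRYQUEUE_DIR) instead of being requeued into the outgoing queue itself. A sketch of that hand-off decision follows, assuming a retry-queue object with an enqueue(msg, msgdata) method and using RETRY_PERIOD as a stand-in for mm_cfg.DELIVERY_RETRY_PERIOD.

```python
import time

RETRY_PERIOD = 60 * 60      # stand-in for mm_cfg.DELIVERY_RETRY_PERIOD


def park_temp_failures(retryq, msg, msgdata, tempfailures):
    """Sketch of the retry hand-off.

    Returns False (give up) only when no delivery progress was made and
    the retry deadline has passed; otherwise the message is parked in the
    retry queue for another attempt later.
    """
    now = time.time()
    last_count = msgdata.get('last_recip_count', 0)
    deliver_until = msgdata.get('deliver_until', now)
    if len(tempfailures) == last_count:
        # No progress since the last attempt; stop once the deadline passes.
        if now > deliver_until:
            return False
    else:
        # Some recipients got the message, so extend the deadline.
        deliver_until = now + RETRY_PERIOD
    msgdata.update(last_recip_count=len(tempfailures),
                   deliver_until=deliver_until,
                   recips=tempfailures)
    retryq.enqueue(msg, msgdata)
    return True
```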
diff --git a/Mailman/Queue/Runner.py b/Mailman/Queue/Runner.py
index 134dac99..8b5bddcb 100644
--- a/Mailman/Queue/Runner.py
+++ b/Mailman/Queue/Runner.py
@@ -1,4 +1,4 @@
-# Copyright (C) 1998,1999,2000,2001,2002 by the Free Software Foundation, Inc.
+# Copyright (C) 1998-2003 by the Free Software Foundation, Inc.
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU General Public License
@@ -31,6 +31,12 @@ from Mailman import i18n
 
 from Mailman.Queue.Switchboard import Switchboard
 from Mailman.Logging.Syslog import syslog
 
+try:
+    True, False
+except NameError:
+    True = 1
+    False = 0
+
 
 class Runner:
@@ -44,16 +50,16 @@
         self._switchboard = Switchboard(self.QDIR, slice, numslices)
         # Create the shunt switchboard
         self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR)
-        self._stop = 0
+        self._stop = False
 
     def stop(self):
-        self._stop = 1
+        self._stop = True
 
     def run(self):
         # Start the main loop for this queue runner.
         try:
             try:
-                while 1:
+                while True:
                     # Once through the loop that processes all the files in
                     # the queue directory.
                     filecnt = self._oneloop()
@@ -64,10 +70,10 @@
                     # If the stop flag is set, we're done.
                     if self._stop:
                         break
-                    # If there were no files to process, then we'll simply
-                    # sleep for a little while and expect some to show up.
-                    if not filecnt:
-                        self._snooze()
+                    # Give the runner an opportunity to snooze for a while,
+                    # but pass it the file count so it can decide whether to
+                    # do more work now or not.
+                    self._snooze(filecnt)
         except KeyboardInterrupt:
             pass
         finally:
@@ -79,8 +85,7 @@
         # First, list all the files in our queue directory.
         # Switchboard.files() is guaranteed to hand us the files in FIFO
        # order.  Return an integer count of the number of files that were
-        # available for this qrunner to process.  A non-zero value tells run()
-        # not to snooze for a while.
+        # available for this qrunner to process.
         files = self._switchboard.files()
         for filebase in files:
             # Ask the switchboard for the message and metadata objects
@@ -105,12 +110,12 @@
                 self._onefile(msg, msgdata)
             except Exception, e:
                 self._log(e)
-                syslog('error', 'SHUNTING: %s', filebase)
                 # Put a marker in the metadata for unshunting
                 msgdata['whichq'] = self._switchboard.whichq()
-                self._shunt.enqueue(msg, msgdata)
+                filebase = self._shunt.enqueue(msg, msgdata)
+                syslog('error', 'SHUNTING: %s', filebase)
             # Other work we want to do each time through the loop
-            Utils.reap(self._kids, once=1)
+            Utils.reap(self._kids, once=True)
             self._doperiodic()
             if self._shortcircuit():
                 break
@@ -172,7 +177,7 @@
         mlist = self._listcache.get(listname)
         if not mlist:
             try:
-                mlist = MailList.MailList(listname, lock=0)
+                mlist = MailList.MailList(listname, lock=False)
             except Errors.MMListError, e:
                 syslog('error', 'error opening list: %s\n%s', listname, e)
                 return None
@@ -219,18 +224,18 @@
         from the Runner's hash slice processing loop.  You can do whatever
         special periodic processing you want here, and the return value is
         irrelevant.
-
         """
         pass
 
-    def _snooze(self):
-        """Sleep for a little while, because there was nothing to do.
+    def _snooze(self, filecnt):
+        """Sleep for a little while.
 
-        This is called from the Runner's main loop, but only when the last
-        processing loop had no work to do (i.e. there were no messages in it's
-        little slice of hash space).
+        filecnt is the number of messages in the queue the last time through.
+        Sub-runners can decide to continue to do work, or sleep for a while
+        based on this value.  By default, we only snooze if there was nothing
+        to do last time around.
         """
-        if self.SLEEPTIME <= 0:
+        if filecnt or self.SLEEPTIME <= 0:
             return
         time.sleep(self.SLEEPTIME)
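Runner.run() no longer decides by itself whether to sleep; it always calls _snooze(filecnt), and the default _snooze() sleeps only when the last pass processed nothing. This is what lets a subclass like the new BounceRunner keep doing periodic work even when its own queue is idle. A condensed, self-contained model of that contract (not the full Runner, which also shunts failures and reaps child processes):

```python
import time


class RunnerSketch:
    """Condensed model of the run()/_snooze(filecnt) contract."""

    SLEEPTIME = 1.0             # seconds between idle passes

    def __init__(self):
        self._stop = False

    def stop(self):
        self._stop = True

    def run(self):
        while not self._stop:
            filecnt = self._oneloop()   # number of queue files processed
            self._doperiodic()          # subclass hook (e.g. bounce flushing)
            self._snooze(filecnt)       # subclass decides whether to sleep

    def _oneloop(self):
        return 0                        # placeholder: no work found

    def _doperiodic(self):
        pass

    def _snooze(self, filecnt):
        # Default policy: sleep only when the last pass found nothing to do.
        if filecnt or self.SLEEPTIME <= 0:
            return
        time.sleep(self.SLEEPTIME)
```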
diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py
index 77179fcf..4a56223d 100644
--- a/Mailman/Queue/Switchboard.py
+++ b/Mailman/Queue/Switchboard.py
@@ -50,7 +50,20 @@
 from Mailman.Logging.Syslog import syslog
 
 # 20 bytes of all bits set, maximum sha.digest() value
 shamax = 0xffffffffffffffffffffffffffffffffffffffffL
 
-SAVE_MSGS_AS_PICKLES = 1
+try:
+    True, False
+except NameError:
+    True = 1
+    False = 0
+
+# This flag causes Mailman to fsync() the file after writing and flushing its
+# contents.  While this ensures the data is written to disk, avoiding data
+# loss, it is a huge performance killer.
+SYNC_AFTER_WRITE = False
+# This flag causes messages to be written as pickles (when True) or text files
+# (when False).  Pickles are more efficient because the message doesn't need
+# to be re-parsed every time it's unqueued, but pickles are not human readable.
+SAVE_MSGS_AS_PICKLES = True
@@ -127,6 +140,7 @@
         tmpfile = dbfile + '.tmp'
         self._ext_write(tmpfile, data)
         os.rename(tmpfile, dbfile)
+        return filebase
 
     def dequeue(self, filebase):
         # Calculate the .db and .msg filenames from the given filebase.
@@ -239,6 +253,10 @@ class MarshalSwitchboard(_Switchboard):
             else:
                 dict[attr] = repr(fval)
         marshal.dump(dict, fp)
+        # Make damn sure that the data we just wrote gets flushed to disk
+        fp.flush()
+        if SYNC_AFTER_WRITE:
+            os.fsync(fp.fileno())
         fp.close()
 
     def _ext_read(self, filename):
@@ -307,6 +325,10 @@ class ASCIISwitchboard(_Switchboard):
         os.umask(omask)
         for k, v in dict.items():
             print >> fp, '%s = %s' % (k, repr(v))
+        # Make damn sure that the data we just wrote gets flushed to disk
+        fp.flush()
+        if SYNC_AFTER_WRITE:
+            os.fsync(fp.fileno())
         fp.close()
 
     def _ext_read(self, filename):
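Switchboard gains a SYNC_AFTER_WRITE knob: both _ext_write() implementations now flush(), and optionally fsync(), before closing, and enqueue() returns the filebase so the shunt log can name the queued file. A small sketch of the same flush-then-fsync pattern, using an illustrative write_durably() helper rather than Mailman's own code:

```python
import os

SYNC_AFTER_WRITE = False    # mirrors the new Switchboard flag


def write_durably(path, data):
    """Write bytes, flush, and optionally fsync() before closing.

    flush() only empties Python's userspace buffer into the OS page cache;
    fsync() forces the page cache to disk, which survives a crash but is
    much slower -- which is why the flag defaults to False.
    """
    fp = open(path, 'wb')
    try:
        fp.write(data)
        fp.flush()
        if SYNC_AFTER_WRITE:
            os.fsync(fp.fileno())
    finally:
        fp.close()
```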