diff options
-rw-r--r-- | Mailman/Queue/Runner.py | 15 | ||||
-rw-r--r-- | Mailman/Queue/Switchboard.py | 37 | ||||
-rw-r--r-- | bin/unshunt | 9 |
3 files changed, 47 insertions, 14 deletions
diff --git a/Mailman/Queue/Runner.py b/Mailman/Queue/Runner.py index 1e7854d7..e8c72273 100644 --- a/Mailman/Queue/Runner.py +++ b/Mailman/Queue/Runner.py @@ -1,4 +1,4 @@ -# Copyright (C) 1998-2004 by the Free Software Foundation, Inc. +# Copyright (C) 1998-2006 by the Free Software Foundation, Inc. # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License @@ -12,7 +12,8 @@ # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. """Generic queue runner class. """ @@ -28,8 +29,8 @@ from Mailman import Errors from Mailman import MailList from Mailman import i18n -from Mailman.Queue.Switchboard import Switchboard from Mailman.Logging.Syslog import syslog +from Mailman.Queue.Switchboard import Switchboard import email.Errors @@ -49,7 +50,7 @@ class Runner: self._kids = {} # Create our own switchboard. Don't use the switchboard cache because # we want to provide slice and numslice arguments. - self._switchboard = Switchboard(self.QDIR, slice, numslices) + self._switchboard = Switchboard(self.QDIR, slice, numslices, True) # Create the shunt switchboard self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR) self._stop = False @@ -109,6 +110,7 @@ class Runner: continue try: self._onefile(msg, msgdata) + self._switchboard.finish(filebase) except Exception, e: # All runners that implement _dispose() must guarantee that # exceptions are caught and dealt with properly. Still, there @@ -119,8 +121,9 @@ class Runner: self._log(e) # Put a marker in the metadata for unshunting msgdata['whichq'] = self._switchboard.whichq() - filebase = self._shunt.enqueue(msg, msgdata) - syslog('error', 'SHUNTING: %s', filebase) + new_filebase = self._shunt.enqueue(msg, msgdata) + syslog('error', 'SHUNTING: %s', new_filebase) + self._switchboard.finish(filebase) # Other work we want to do each time through the loop Utils.reap(self._kids, once=True) self._doperiodic() diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py index 17046a8c..10bb9393 100644 --- a/Mailman/Queue/Switchboard.py +++ b/Mailman/Queue/Switchboard.py @@ -67,7 +67,7 @@ DELTA = .0001 class Switchboard: - def __init__(self, whichq, slice=None, numslices=1): + def __init__(self, whichq, slice=None, numslices=1, recover=False): self.__whichq = whichq # Create the directory if it doesn't yet exist. # FIXME @@ -86,6 +86,8 @@ class Switchboard: if numslices <> 1: self.__lower = ((shamax+1) * slice) / numslices self.__upper = (((shamax+1) * (slice+1)) / numslices) - 1 + if recover: + self.recover_backup_files() def whichq(self): return self.__whichq @@ -143,9 +145,16 @@ class Switchboard: def dequeue(self, filebase): # Calculate the filename from the given filebase. filename = os.path.join(self.__whichq, filebase + '.pck') + backfile = os.path.join(self.__whichq, filebase + '.bak') # Read the message object and metadata. fp = open(filename) - os.unlink(filename) + # Move the file to the backup file name for processing. If this + # process crashes uncleanly the .bak file will be used to re-instate + # the .pck file in order to try again. XXX what if something caused + # Python to constantly crash? Is it possible that we'd end up mail + # bombing recipients or crushing the archiver? How would we defend + # against that? + os.rename(filename, backfile) try: msg = cPickle.load(fp) data = cPickle.load(fp) @@ -155,26 +164,42 @@ class Switchboard: msg = email.message_from_string(msg, Message.Message) return msg, data - def files(self): + def finish(self, filebase): + bakfile = os.path.join(self.__whichq, filebase + '.bak') + try: + os.unlink(bakfile) + except EnvironmentError, e: + syslog('error', 'Failed to unlink backup file: %s', bakfile) + + def files(self, extension='.pck'): times = {} lower = self.__lower upper = self.__upper for f in os.listdir(self.__whichq): # By ignoring anything that doesn't end in .pck, we ignore # tempfiles and avoid a race condition. - if not f.endswith('.pck'): + filebase, ext = os.path.splitext(f) + if ext <> extension: continue - filebase = os.path.splitext(f)[0] when, digest = filebase.split('+') # Throw out any files which don't match our bitrange. BAW: test # performance and end-cases of this algorithm. MAS: both # comparisons need to be <= to get complete range. if lower is None or (lower <= long(digest, 16) <= upper): key = float(when) - while times.has_key(key): + while key in times.keys(): key += DELTA times[key] = filebase # FIFO sort keys = times.keys() keys.sort() return [times[k] for k in keys] + + def recover_backup_files(self): + # Move all .bak files in our slice to .pck. It's impossible for both + # to exist at the same time, so the move is enough to ensure that our + # normal dequeuing process will handle them. + for filebase in self.files('.bak'): + src = os.path.join(self.__whichq, filebase + '.bak') + dst = os.path.join(self.__whichq, filebase + '.pck') + os.rename(src, dst)
\ No newline at end of file diff --git a/bin/unshunt b/bin/unshunt index 8b675587..8c1c117f 100644 --- a/bin/unshunt +++ b/bin/unshunt @@ -1,6 +1,6 @@ #! @PYTHON@ -# Copyright (C) 2002 by the Free Software Foundation, Inc. +# Copyright (C) 2002-2006 by the Free Software Foundation, Inc. # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License @@ -14,7 +14,8 @@ # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. """Move a message from the shunt queue to the original queue. @@ -69,6 +70,7 @@ def main(): usage(1) sb = get_switchboard(qdir) + sb.recover_backup_files() for filebase in sb.files(): try: msg, msgdata = sb.dequeue(filebase) @@ -80,6 +82,9 @@ def main(): # other shunted messages. print >> sys.stderr, _( 'Cannot unshunt message %(filebase)s, skipping:\n%(e)s') + else: + # Unlink the .bak file left by dequeue() + sb.finish(filebase) |