aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Mailman/Queue/Runner.py15
-rw-r--r--Mailman/Queue/Switchboard.py37
-rw-r--r--bin/unshunt9
3 files changed, 47 insertions, 14 deletions
diff --git a/Mailman/Queue/Runner.py b/Mailman/Queue/Runner.py
index 1e7854d7..e8c72273 100644
--- a/Mailman/Queue/Runner.py
+++ b/Mailman/Queue/Runner.py
@@ -1,4 +1,4 @@
-# Copyright (C) 1998-2004 by the Free Software Foundation, Inc.
+# Copyright (C) 1998-2006 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -12,7 +12,8 @@
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
+# USA.
"""Generic queue runner class.
"""
@@ -28,8 +29,8 @@ from Mailman import Errors
from Mailman import MailList
from Mailman import i18n
-from Mailman.Queue.Switchboard import Switchboard
from Mailman.Logging.Syslog import syslog
+from Mailman.Queue.Switchboard import Switchboard
import email.Errors
@@ -49,7 +50,7 @@ class Runner:
self._kids = {}
# Create our own switchboard. Don't use the switchboard cache because
# we want to provide slice and numslice arguments.
- self._switchboard = Switchboard(self.QDIR, slice, numslices)
+ self._switchboard = Switchboard(self.QDIR, slice, numslices, True)
# Create the shunt switchboard
self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR)
self._stop = False
@@ -109,6 +110,7 @@ class Runner:
continue
try:
self._onefile(msg, msgdata)
+ self._switchboard.finish(filebase)
except Exception, e:
# All runners that implement _dispose() must guarantee that
# exceptions are caught and dealt with properly. Still, there
@@ -119,8 +121,9 @@ class Runner:
self._log(e)
# Put a marker in the metadata for unshunting
msgdata['whichq'] = self._switchboard.whichq()
- filebase = self._shunt.enqueue(msg, msgdata)
- syslog('error', 'SHUNTING: %s', filebase)
+ new_filebase = self._shunt.enqueue(msg, msgdata)
+ syslog('error', 'SHUNTING: %s', new_filebase)
+ self._switchboard.finish(filebase)
# Other work we want to do each time through the loop
Utils.reap(self._kids, once=True)
self._doperiodic()
diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py
index 17046a8c..10bb9393 100644
--- a/Mailman/Queue/Switchboard.py
+++ b/Mailman/Queue/Switchboard.py
@@ -67,7 +67,7 @@ DELTA = .0001
class Switchboard:
- def __init__(self, whichq, slice=None, numslices=1):
+ def __init__(self, whichq, slice=None, numslices=1, recover=False):
self.__whichq = whichq
# Create the directory if it doesn't yet exist.
# FIXME
@@ -86,6 +86,8 @@ class Switchboard:
if numslices <> 1:
self.__lower = ((shamax+1) * slice) / numslices
self.__upper = (((shamax+1) * (slice+1)) / numslices) - 1
+ if recover:
+ self.recover_backup_files()
def whichq(self):
return self.__whichq
@@ -143,9 +145,16 @@ class Switchboard:
def dequeue(self, filebase):
# Calculate the filename from the given filebase.
filename = os.path.join(self.__whichq, filebase + '.pck')
+ backfile = os.path.join(self.__whichq, filebase + '.bak')
# Read the message object and metadata.
fp = open(filename)
- os.unlink(filename)
+ # Move the file to the backup file name for processing. If this
+ # process crashes uncleanly the .bak file will be used to re-instate
+ # the .pck file in order to try again. XXX what if something caused
+ # Python to constantly crash? Is it possible that we'd end up mail
+ # bombing recipients or crushing the archiver? How would we defend
+ # against that?
+ os.rename(filename, backfile)
try:
msg = cPickle.load(fp)
data = cPickle.load(fp)
@@ -155,26 +164,42 @@ class Switchboard:
msg = email.message_from_string(msg, Message.Message)
return msg, data
- def files(self):
+ def finish(self, filebase):
+ bakfile = os.path.join(self.__whichq, filebase + '.bak')
+ try:
+ os.unlink(bakfile)
+ except EnvironmentError, e:
+ syslog('error', 'Failed to unlink backup file: %s', bakfile)
+
+ def files(self, extension='.pck'):
times = {}
lower = self.__lower
upper = self.__upper
for f in os.listdir(self.__whichq):
# By ignoring anything that doesn't end in .pck, we ignore
# tempfiles and avoid a race condition.
- if not f.endswith('.pck'):
+ filebase, ext = os.path.splitext(f)
+ if ext <> extension:
continue
- filebase = os.path.splitext(f)[0]
when, digest = filebase.split('+')
# Throw out any files which don't match our bitrange. BAW: test
# performance and end-cases of this algorithm. MAS: both
# comparisons need to be <= to get complete range.
if lower is None or (lower <= long(digest, 16) <= upper):
key = float(when)
- while times.has_key(key):
+ while key in times.keys():
key += DELTA
times[key] = filebase
# FIFO sort
keys = times.keys()
keys.sort()
return [times[k] for k in keys]
+
+ def recover_backup_files(self):
+ # Move all .bak files in our slice to .pck. It's impossible for both
+ # to exist at the same time, so the move is enough to ensure that our
+ # normal dequeuing process will handle them.
+ for filebase in self.files('.bak'):
+ src = os.path.join(self.__whichq, filebase + '.bak')
+ dst = os.path.join(self.__whichq, filebase + '.pck')
+ os.rename(src, dst) \ No newline at end of file
diff --git a/bin/unshunt b/bin/unshunt
index 8b675587..8c1c117f 100644
--- a/bin/unshunt
+++ b/bin/unshunt
@@ -1,6 +1,6 @@
#! @PYTHON@
-# Copyright (C) 2002 by the Free Software Foundation, Inc.
+# Copyright (C) 2002-2006 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -14,7 +14,8 @@
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
+# USA.
"""Move a message from the shunt queue to the original queue.
@@ -69,6 +70,7 @@ def main():
usage(1)
sb = get_switchboard(qdir)
+ sb.recover_backup_files()
for filebase in sb.files():
try:
msg, msgdata = sb.dequeue(filebase)
@@ -80,6 +82,9 @@ def main():
# other shunted messages.
print >> sys.stderr, _(
'Cannot unshunt message %(filebase)s, skipping:\n%(e)s')
+ else:
+ # Unlink the .bak file left by dequeue()
+ sb.finish(filebase)