aboutsummaryrefslogtreecommitdiffstats
path: root/Mailman/Queue
diff options
context:
space:
mode:
Diffstat (limited to 'Mailman/Queue')
-rw-r--r--Mailman/Queue/BounceRunner.py105
-rw-r--r--Mailman/Queue/NewsRunner.py23
-rw-r--r--Mailman/Queue/OutgoingRunner.py62
-rw-r--r--Mailman/Queue/Runner.py47
-rw-r--r--Mailman/Queue/Switchboard.py24
5 files changed, 153 insertions, 108 deletions
diff --git a/Mailman/Queue/BounceRunner.py b/Mailman/Queue/BounceRunner.py
index e59ac47e..b416d19a 100644
--- a/Mailman/Queue/BounceRunner.py
+++ b/Mailman/Queue/BounceRunner.py
@@ -17,6 +17,8 @@
"""Bounce queue runner."""
import re
+import time
+
from email.MIMEText import MIMEText
from email.MIMEMessage import MIMEMessage
from email.Utils import parseaddr
@@ -33,12 +35,22 @@ from Mailman.i18n import _
COMMASPACE = ', '
+REGISTER_BOUNCES_EVERY = mm_cfg.minutes(15)
+
class BounceRunner(Runner):
QDIR = mm_cfg.BOUNCEQUEUE_DIR
- # We only do bounce processing once per minute.
- SLEEPTIME = mm_cfg.minutes(1)
+
+ def __init__(self, slice=None, numslices=1):
+ Runner.__init__(self, slice, numslices)
+ # This is a simple sequence of bounce score events. Each entry in the
+ # list is a tuple of (address, day, msg) where day is a tuple of
+ # (YYYY, MM, DD). We'll sort and collate all this information in
+ # _register_bounces() below.
+ self._bounces = {}
+ self._bouncecnt = 0
+ self._next_registration = time.time() + REGISTER_BOUNCES_EVERY
def _dispose(self, mlist, msg, msgdata):
# Make sure we have the most up-to-date state
@@ -84,57 +96,48 @@ class BounceRunner(Runner):
# although I'm unsure how that could happen. Possibly ScanMessages()
# can let None's sneak through. In any event, this will kill them.
addrs = filter(None, addrs)
- # Okay, we have some recognized addresses. We now need to register
- # the bounces for each of these. If the bounce came to the site list,
- # then we'll register the address on every list in the system, but
- # note: this could be VERY resource intensive!
- foundp = 0
- listname = mlist.internal_name()
- if listname == mm_cfg.MAILMAN_SITE_LIST:
- foundp = 1
- for listname in Utils.list_names():
- xlist = self._open_list(listname)
- xlist.Load()
- for addr in addrs:
- if xlist.isMember(addr):
- unlockp = 0
- if not xlist.Locked():
- try:
- xlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
- except LockFile.TimeOutError:
- # Oh well, forget aboutf this list
- continue
- unlockp = 1
- try:
- xlist.registerBounce(addr, msg)
- foundp = 1
- xlist.Save()
- finally:
- if unlockp:
- xlist.Unlock()
+ # Store the bounce score events so we can register them periodically
+ today = time.localtime()[:3]
+ events = [(addr, today, msg) for addr in addrs]
+ self._bounces.setdefault(mlist.internal_name(), []).extend(events)
+ self._bouncecnt += len(addrs)
+
+ def _doperiodic(self):
+ now = time.time()
+ if self._next_registration > now or not self._bounces:
+ return
+ # Let's go ahead and register the bounces we've got stored up
+ self._next_registration = now + REGISTER_BOUNCES_EVERY
+ self._register_bounces()
+
+ def _register_bounces(self):
+ syslog('bounce', 'Processing %s queued bounces', self._bouncecnt)
+ # First, get the list of bounces register against the site list. For
+ # these addresses, we want to register a bounce on every list the
+ # address is a member of -- which we don't know yet.
+ sitebounces = self._bounces.get(mm_cfg.MAILMAN_SITE_LIST, [])
+ if sitebounces:
+ listnames = Utils.list_names()
else:
+ listnames = self._bounces.keys()
+ for listname in listnames:
+ mlist = self._open_list(listname)
+ mlist.Lock()
try:
- mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
- except LockFile.TimeOutError:
- # Try again later
- syslog('bounce', "%s: couldn't get list lock", listname)
- return 1
- else:
- try:
- for addr in addrs:
- if mlist.isMember(addr):
- mlist.registerBounce(addr, msg)
- foundp = 1
- mlist.Save()
- finally:
- mlist.Unlock()
- if not foundp:
- # It means an address was recognized but it wasn't an address
- # that's on any mailing list at this site. BAW: don't forward
- # these, but do log it.
- syslog('bounce', 'bounce message with non-members of %s: %s',
- listname, COMMASPACE.join(addrs))
- #maybe_forward(mlist, msg)
+ events = self._bounces.get(listname, []) + sitebounces
+ for addr, day, msg in events:
+ mlist.registerBounce(addr, msg, day=day)
+ mlist.Save()
+ finally:
+ mlist.Unlock()
+ # Reset and free all the cached memory
+ self._bounces = {}
+ self._bouncecnt = 0
+
+ def _cleanup(self):
+ if self._bounces:
+ self._register_bounces()
+ Runner._cleanup(self)
diff --git a/Mailman/Queue/NewsRunner.py b/Mailman/Queue/NewsRunner.py
index 0439f0e1..2b40a3ca 100644
--- a/Mailman/Queue/NewsRunner.py
+++ b/Mailman/Queue/NewsRunner.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2000,2001,2002 by the Free Software Foundation, Inc.
+# Copyright (C) 2000-2003 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -45,6 +45,13 @@ mcre = re.compile(r"""
""", re.VERBOSE)
+try:
+ True, False
+except NameError:
+ True = 1
+ False = 0
+
+
class NewsRunner(Runner):
QDIR = mm_cfg.NEWSQUEUE_DIR
@@ -60,7 +67,9 @@ class NewsRunner(Runner):
conn = None
try:
try:
- conn = nntplib.NNTP(mlist.nntp_host, readermode=1,
+ nntp_host, nntp_port = Utils.nntpsplit(mlist.nntp_host)
+ conn = nntplib.NNTP(nntp_host, nntp_port,
+ readermode=True,
user=mm_cfg.NNTP_USERNAME,
password=mm_cfg.NNTP_PASSWORD)
conn.post(fp)
@@ -79,8 +88,8 @@ class NewsRunner(Runner):
# Some other exception occurred, which we definitely did not
# expect, so set this message up for requeuing.
self._log(e)
- return 1
- return 0
+ return True
+ return False
@@ -123,13 +132,13 @@ def prepare_message(mlist, msg, msgdata):
#
# Our Message-ID format is <mailman.secs.pid.listname@hostname>
msgid = msg['message-id']
- hackmsgid = 1
+ hackmsgid = True
if msgid:
mo = mcre.search(msgid)
if mo:
lname, hname = mo.group('listname', 'hostname')
if lname == mlist.internal_name() and hname == mlist.host_name:
- hackmsgid = 0
+ hackmsgid = False
if hackmsgid:
del msg['message-id']
msg['Message-ID'] = Utils.unique_message_id(mlist)
@@ -155,4 +164,4 @@ def prepare_message(mlist, msg, msgdata):
for v in values[1:]:
msg[rewrite] = v
# Mark this message as prepared in case it has to be requeued
- msgdata['prepped'] = 1
+ msgdata['prepped'] = True
diff --git a/Mailman/Queue/OutgoingRunner.py b/Mailman/Queue/OutgoingRunner.py
index 11c94dfe..a1b6bdeb 100644
--- a/Mailman/Queue/OutgoingRunner.py
+++ b/Mailman/Queue/OutgoingRunner.py
@@ -29,11 +29,12 @@ from Mailman import Message
from Mailman import Errors
from Mailman import LockFile
from Mailman.Queue.Runner import Runner
+from Mailman.Queue.Switchboard import Switchboard
from Mailman.Logging.Syslog import syslog
# This controls how often _doperiodic() will try to deal with deferred
# permanent failures. It is a count of calls to _doperiodic()
-DEAL_WITH_PERMFAILURES_EVERY = 1
+DEAL_WITH_PERMFAILURES_EVERY = 10
try:
True, False
@@ -59,6 +60,7 @@ class OutgoingRunner(Runner):
# error log. It gets reset if the message was successfully sent, and
# set if there was a socket.error.
self.__logged = False
+ self.__retryq = Switchboard(mm_cfg.RETRYQUEUE_DIR)
def _dispose(self, mlist, msg, msgdata):
# See if we should retry delivery of this message again.
@@ -98,33 +100,30 @@ class OutgoingRunner(Runner):
# For permanent failures, make a copy of the message for bounce
# handling. I'm not sure this is necessary, or the right thing to
# do.
- pcnt = len(e.permfailures)
- msgcopy = copy.deepcopy(msg)
- self._permfailures.setdefault(mlist, []).extend(
- zip(e.permfailures, [msgcopy] * pcnt))
- # Temporary failures
- if not e.tempfailures:
- # Don't need to keep the message queued if there were only
- # permanent failures.
- return False
- now = time.time()
- recips = e.tempfailures
- last_recip_count = msgdata.get('last_recip_count', 0)
- deliver_until = msgdata.get('deliver_until', now)
- if len(recips) == last_recip_count:
- # We didn't make any progress, so don't attempt delivery any
- # longer. BAW: is this the best disposition?
- if now > deliver_until:
- return False
- else:
- # Keep trying to delivery this message for a while
- deliver_until = now + mm_cfg.DELIVERY_RETRY_PERIOD
- msgdata['last_recip_count'] = len(recips)
- msgdata['deliver_until'] = deliver_until
- msgdata['deliver_after'] = now + mm_cfg.DELIVERY_RETRY_WAIT
- msgdata['recips'] = recips
- # Requeue
- return True
+ if e.permfailures:
+ pcnt = len(e.permfailures)
+ msgcopy = copy.deepcopy(msg)
+ self._permfailures.setdefault(mlist, []).extend(
+ zip(e.permfailures, [msgcopy] * pcnt))
+ # Move temporary failures to the qfiles/retry queue which will
+ # occasionally move them back here for another shot at delivery.
+ if e.tempfailures:
+ now = time.time()
+ recips = e.tempfailures
+ last_recip_count = msgdata.get('last_recip_count', 0)
+ deliver_until = msgdata.get('deliver_until', now)
+ if len(recips) == last_recip_count:
+ # We didn't make any progress, so don't attempt delivery
+ # any longer. BAW: is this the best disposition?
+ if now > deliver_until:
+ return False
+ else:
+ # Keep trying to delivery this message for a while
+ deliver_until = now + mm_cfg.DELIVERY_RETRY_PERIOD
+ msgdata['last_recip_count'] = len(recips)
+ msgdata['deliver_until'] = deliver_until
+ msgdata['recips'] = recips
+ self.__retryq.enqueue(msg, msgdata)
# We've successfully completed handling of this message
return False
@@ -134,6 +133,9 @@ class OutgoingRunner(Runner):
self._permfail_counter += 1
if self._permfail_counter < DEAL_WITH_PERMFAILURES_EVERY:
return
+ self._handle_permfailures()
+
+ def _handle_permfailures(self):
# Reset the counter
self._permfail_counter = 0
# And deal with the deferred permanent failures.
@@ -149,3 +151,7 @@ class OutgoingRunner(Runner):
mlist.Save()
finally:
mlist.Unlock()
+
+ def _cleanup(self):
+ self._handle_permfailures()
+ Runner._cleanup(self)
diff --git a/Mailman/Queue/Runner.py b/Mailman/Queue/Runner.py
index 134dac99..8b5bddcb 100644
--- a/Mailman/Queue/Runner.py
+++ b/Mailman/Queue/Runner.py
@@ -1,4 +1,4 @@
-# Copyright (C) 1998,1999,2000,2001,2002 by the Free Software Foundation, Inc.
+# Copyright (C) 1998-2003 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -31,6 +31,12 @@ from Mailman import i18n
from Mailman.Queue.Switchboard import Switchboard
from Mailman.Logging.Syslog import syslog
+try:
+ True, False
+except NameError:
+ True = 1
+ False = 0
+
class Runner:
@@ -44,16 +50,16 @@ class Runner:
self._switchboard = Switchboard(self.QDIR, slice, numslices)
# Create the shunt switchboard
self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR)
- self._stop = 0
+ self._stop = False
def stop(self):
- self._stop = 1
+ self._stop = True
def run(self):
# Start the main loop for this queue runner.
try:
try:
- while 1:
+ while True:
# Once through the loop that processes all the files in
# the queue directory.
filecnt = self._oneloop()
@@ -64,10 +70,10 @@ class Runner:
# If the stop flag is set, we're done.
if self._stop:
break
- # If there were no files to process, then we'll simply
- # sleep for a little while and expect some to show up.
- if not filecnt:
- self._snooze()
+ # Give the runner an opportunity to snooze for a while,
+ # but pass it the file count so it can decide whether to
+ # do more work now or not.
+ self._snooze(filecnt)
except KeyboardInterrupt:
pass
finally:
@@ -79,8 +85,7 @@ class Runner:
# First, list all the files in our queue directory.
# Switchboard.files() is guaranteed to hand us the files in FIFO
# order. Return an integer count of the number of files that were
- # available for this qrunner to process. A non-zero value tells run()
- # not to snooze for a while.
+ # available for this qrunner to process.
files = self._switchboard.files()
for filebase in files:
# Ask the switchboard for the message and metadata objects
@@ -105,12 +110,12 @@ class Runner:
self._onefile(msg, msgdata)
except Exception, e:
self._log(e)
- syslog('error', 'SHUNTING: %s', filebase)
# Put a marker in the metadata for unshunting
msgdata['whichq'] = self._switchboard.whichq()
- self._shunt.enqueue(msg, msgdata)
+ filebase = self._shunt.enqueue(msg, msgdata)
+ syslog('error', 'SHUNTING: %s', filebase)
# Other work we want to do each time through the loop
- Utils.reap(self._kids, once=1)
+ Utils.reap(self._kids, once=True)
self._doperiodic()
if self._shortcircuit():
break
@@ -172,7 +177,7 @@ class Runner:
mlist = self._listcache.get(listname)
if not mlist:
try:
- mlist = MailList.MailList(listname, lock=0)
+ mlist = MailList.MailList(listname, lock=False)
except Errors.MMListError, e:
syslog('error', 'error opening list: %s\n%s', listname, e)
return None
@@ -219,18 +224,18 @@ class Runner:
from the Runner's hash slice processing loop. You can do whatever
special periodic processing you want here, and the return value is
irrelevant.
-
"""
pass
- def _snooze(self):
- """Sleep for a little while, because there was nothing to do.
+ def _snooze(self, filecnt):
+ """Sleep for a little while.
- This is called from the Runner's main loop, but only when the last
- processing loop had no work to do (i.e. there were no messages in it's
- little slice of hash space).
+ filecnt is the number of messages in the queue the last time through.
+ Sub-runners can decide to continue to do work, or sleep for a while
+ based on this value. By default, we only snooze if there was nothing
+ to do last time around.
"""
- if self.SLEEPTIME <= 0:
+ if filecnt or self.SLEEPTIME <= 0:
return
time.sleep(self.SLEEPTIME)
diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py
index 77179fcf..4a56223d 100644
--- a/Mailman/Queue/Switchboard.py
+++ b/Mailman/Queue/Switchboard.py
@@ -50,7 +50,20 @@ from Mailman.Logging.Syslog import syslog
# 20 bytes of all bits set, maximum sha.digest() value
shamax = 0xffffffffffffffffffffffffffffffffffffffffL
-SAVE_MSGS_AS_PICKLES = 1
+try:
+ True, False
+except NameError:
+ True = 1
+ False = 0
+
+# This flag causes Mailman to fsync() the file after writing and flushing its
+# contents. While this ensures the data is written to disk, avoiding data
+# loss, it is a huge performance killer.
+SYNC_AFTER_WRITE = False
+# This flag causes messages to be written as pickles (when True) or text files
+# (when False). Pickles are more efficient because the message doesn't need
+# to be re-parsed every time it's unqueued, but pickles are not human readable.
+SAVE_MSGS_AS_PICKLES = True
@@ -127,6 +140,7 @@ class _Switchboard:
tmpfile = dbfile + '.tmp'
self._ext_write(tmpfile, data)
os.rename(tmpfile, dbfile)
+ return filebase
def dequeue(self, filebase):
# Calculate the .db and .msg filenames from the given filebase.
@@ -239,6 +253,10 @@ class MarshalSwitchboard(_Switchboard):
else:
dict[attr] = repr(fval)
marshal.dump(dict, fp)
+ # Make damn sure that the data we just wrote gets flushed to disk
+ fp.flush()
+ if SYNC_AFTER_WRITE:
+ os.fsync(fp.fileno())
fp.close()
def _ext_read(self, filename):
@@ -307,6 +325,10 @@ class ASCIISwitchboard(_Switchboard):
os.umask(omask)
for k, v in dict.items():
print >> fp, '%s = %s' % (k, repr(v))
+ # Make damn sure that the data we just wrote gets flushed to disk
+ fp.flush()
+ if SYNC_AFTER_WRITE:
+ os.fsync(fp.fileno())
fp.close()
def _ext_read(self, filename):