aboutsummaryrefslogtreecommitdiffstats
path: root/Mailman
diff options
context:
space:
mode:
authorbwarsaw <>2004-02-22 22:34:50 +0000
committerbwarsaw <>2004-02-22 22:34:50 +0000
commit128af0573c554be30be6397a11d66e69e888c565 (patch)
tree0dfba159a85865df0b0519d5a6ef7b07a3242c81 /Mailman
parentd7eec848a0f8c6641275dd491ca558e35f2e8728 (diff)
downloadmailman2-128af0573c554be30be6397a11d66e69e888c565.tar.gz
mailman2-128af0573c554be30be6397a11d66e69e888c565.tar.xz
mailman2-128af0573c554be30be6397a11d66e69e888c565.zip
Much refactoring to improve bounce processing. The basic change moves the
queuing of bounce events out of the process's memory and into a 'bounce event' file, essentially consisting of a stream of pickles. Every once in a while the qrunner wakes up and processes the event file, which is named to be specific to each qrunner instance. The actual processing is factored out into a mixin class which is now subclassed by both the BounceRunner and OutgoingRunner classes (since the latter has to handle SMTP rejects as bounces). Specific changes include: REGISTER_BOUNCES_EVERY: Promote to a Defaults.py.in variable. BounceMixin: Mixin class for handling the basics of bounce event queuing, periodic processing, and registering, as well as cleanup. It also contains a _probe_bounce() method for handling bounces of the probe message (which cause an immediate disable). BounceRunner: Add BounceMixin as a base class.
Diffstat (limited to 'Mailman')
-rw-r--r--Mailman/Queue/BounceRunner.py176
1 files changed, 119 insertions, 57 deletions
diff --git a/Mailman/Queue/BounceRunner.py b/Mailman/Queue/BounceRunner.py
index 6ea1fc68..8d802a60 100644
--- a/Mailman/Queue/BounceRunner.py
+++ b/Mailman/Queue/BounceRunner.py
@@ -16,8 +16,10 @@
"""Bounce queue runner."""
+import os
import re
import time
+import cPickle
from email.MIMEText import MIMEText
from email.MIMEMessage import MIMEMessage
@@ -35,8 +37,6 @@ from Mailman.i18n import _
COMMASPACE = ', '
-REGISTER_BOUNCES_EVERY = mm_cfg.minutes(15)
-
try:
True, False
except NameError:
@@ -45,18 +45,123 @@ except NameError:
-class BounceRunner(Runner):
+class BounceMixin:
+ def __init__(self):
+ # Registering a bounce means acquiring the list lock, and it would be
+ # too expensive to do this for each message. Instead, each bounce
+ # runner maintains an event log which is essentially a file with
+ # multiple pickles. Each bounce we receive gets appended to this file
+ # as a 4-tuple record: (listname, addr, today, msg)
+ #
+ # today is itself a 3-tuple of (year, month, day)
+ #
+ # Every once in a while (see _doperiodic()), the bounce runner cracks
+ # open the file, reads all the records and registers all the bounces.
+ # Then it truncates the file and continues on. We don't need to lock
+ # the bounce event file because bounce qrunners are single threaded
+ # and each creates a uniquely named file to contain the events.
+ #
+ # XXX When Python 2.3 is the minimum requirement, we can use the new
+ # tempfile.TemporaryFile() function.
+ #
+ # XXX We used to classify bounces to the site list as bounce events
+ # for every list, but this caused severe problems. Here's the
+ # scenario: aperson@example.com is a member of 4 lists, and a list
+ # owner of the foo list. example.com has an aggressive spam filter
+ # which rejects any message that is spam or contains spam as an
+ # attachment. Now, a spambot sends a piece of spam to the foo list,
+ # but since that spambot is not a member, the list holds the message
+ # for approval, and sends a notification to aperson@example.com as
+ # list owner. That notification contains a copy of the spam. Now
+ # example.com rejects the message, causing a bounce to be sent to the
+ # site list's bounce address. The bounce runner would then dutifully
+ # register a bounce for all 4 lists that aperson@example.com was a
+ # member of, and eventually that person would get disabled on all
+ # their lists. So now we ignore site list bounces. C'est la vie for
+ # password reminder bounces.
+ self._bounce_events_file = os.path.join(
+ mm_cfg.DATA_DIR, 'bounce-events-%05d.pck' % os.getpid())
+ self._bounce_events_fp = None
+ self._bouncecnt = 0
+ self._nextaction = time.time() + mm_cfg.REGISTER_BOUNCES_EVERY
+
+ def _queue_bounces(self, listname, addrs, msg):
+ today = time.localtime()[:3]
+ if self._bounce_events_fp is None:
+ self._bounce_events_fp = open(self._bounce_events_file, 'a+b')
+ for addr in addrs:
+ cPickle.dump((listname, addr, today, msg),
+ self._bounce_events_fp, 1)
+ self._bounce_events_fp.flush()
+ os.fsync(self._bounce_events_fp.fileno())
+ self._bouncecnt += len(addrs)
+
+ def _register_bounces(self):
+ syslog('bounce', '%s processing %s queued bounces',
+ self, self._bouncecnt)
+ # Read all the records from the bounce file, then unlink it. Sort the
+ # records by listname for more efficient processing.
+ events = {}
+ self._bounce_events_fp.seek(0)
+ while True:
+ try:
+ listname, addr, day, msg = cPickle.load(self._bounce_events_fp)
+ except ValueError, e:
+ syslog('bounce', 'Error reading bounce events: %s', e)
+ except EOFError:
+ break
+ events.setdefault(listname, []).append((addr, day, msg))
+ # Now register all events sorted by list
+ for listname in events.keys():
+ mlist = self._open_list(listname)
+ mlist.Lock()
+ try:
+ for addr, day, msg in events[listname]:
+ mlist.registerBounce(addr, msg, day=day)
+ mlist.Save()
+ finally:
+ mlist.Unlock()
+ # Reset and free all the cached memory
+ self._bounce_events_fp.close()
+ self._bounce_events_fp = None
+ os.unlink(self._bounce_events_file)
+ self._bouncecnt = 0
+
+ def _cleanup(self):
+ if self._bouncecnt > 0:
+ self._register_bounces()
+
+ def _doperiodic(self):
+ now = time.time()
+ if self._nextaction > now or self._bouncecnt == 0:
+ return
+ # Let's go ahead and register the bounces we've got stored up
+ self._nextaction = now + mm_cfg.REGISTER_BOUNCES_EVERY
+ self._register_bounces()
+
+ def _probe_bounce(self, mlist, token):
+ locked = mlist.Locked()
+ if not locked:
+ mlist.Lock()
+ try:
+ op, addr, bmsg = mlist.pend_confirm(token)
+ info = mlist.getBounceInfo(addr)
+ mlist.disableBouncingMember(addr, info, bmsg)
+ # Only save the list if we're unlocking it
+ if not locked:
+ mlist.Save()
+ finally:
+ if not locked:
+ mlist.Unlock()
+
+
+
+class BounceRunner(Runner, BounceMixin):
QDIR = mm_cfg.BOUNCEQUEUE_DIR
def __init__(self, slice=None, numslices=1):
Runner.__init__(self, slice, numslices)
- # This is a simple sequence of bounce score events. Each entry in the
- # list is a tuple of (address, day, msg) where day is a tuple of
- # (YYYY, MM, DD). We'll sort and collate all this information in
- # _register_bounces() below.
- self._bounces = {}
- self._bouncecnt = 0
- self._next_registration = time.time() + REGISTER_BOUNCES_EVERY
+ BounceMixin.__init__(self)
def _dispose(self, mlist, msg, msgdata):
# Make sure we have the most up-to-date state
@@ -91,15 +196,7 @@ class BounceRunner(Runner):
# See if this was a probe message.
token = verp_probe(mlist, msg)
if token:
- # The list must be locked to perform these operations
- mlist.Lock()
- try:
- op, addr, bmsg = mlist.pend_confirm(token)
- info = mlist.getBounceInfo(addr)
- mlist.disableBouncingMember(addr, info, bmsg)
- mlist.Save()
- finally:
- mlist.Unlock()
+ self._probe_bounce(mlist, token)
return
# That didn't give us anything useful, so try the old-fashioned
# bounce matching modules.
@@ -115,47 +212,12 @@ class BounceRunner(Runner):
# although I'm unsure how that could happen. Possibly ScanMessages()
# can let None's sneak through. In any event, this will kill them.
addrs = filter(None, addrs)
- # Store the bounce score events so we can register them periodically
- today = time.localtime()[:3]
- events = [(addr, today, msg) for addr in addrs]
- self._bounces.setdefault(mlist.internal_name(), []).extend(events)
- self._bouncecnt += len(addrs)
+ self._queue_bounces(mlist.internal_name(), addrs, msg)
- def _doperiodic(self):
- now = time.time()
- if self._next_registration > now or not self._bounces:
- return
- # Let's go ahead and register the bounces we've got stored up
- self._next_registration = now + REGISTER_BOUNCES_EVERY
- self._register_bounces()
-
- def _register_bounces(self):
- syslog('bounce', 'Processing %s queued bounces', self._bouncecnt)
- # First, get the list of bounces register against the site list. For
- # these addresses, we want to register a bounce on every list the
- # address is a member of -- which we don't know yet.
- sitebounces = self._bounces.get(mm_cfg.MAILMAN_SITE_LIST, [])
- if sitebounces:
- listnames = Utils.list_names()
- else:
- listnames = self._bounces.keys()
- for listname in listnames:
- mlist = self._open_list(listname)
- mlist.Lock()
- try:
- events = self._bounces.get(listname, []) + sitebounces
- for addr, day, msg in events:
- mlist.registerBounce(addr, msg, day=day)
- mlist.Save()
- finally:
- mlist.Unlock()
- # Reset and free all the cached memory
- self._bounces = {}
- self._bouncecnt = 0
+ _doperiodic = BounceMixin._doperiodic
def _cleanup(self):
- if self._bounces:
- self._register_bounces()
+ BounceMixin._cleanup(self)
Runner._cleanup(self)