diff options
author | bwarsaw <> | 2004-02-22 22:34:50 +0000 |
---|---|---|
committer | bwarsaw <> | 2004-02-22 22:34:50 +0000 |
commit | 128af0573c554be30be6397a11d66e69e888c565 (patch) | |
tree | 0dfba159a85865df0b0519d5a6ef7b07a3242c81 /Mailman | |
parent | d7eec848a0f8c6641275dd491ca558e35f2e8728 (diff) | |
download | mailman2-128af0573c554be30be6397a11d66e69e888c565.tar.gz mailman2-128af0573c554be30be6397a11d66e69e888c565.tar.xz mailman2-128af0573c554be30be6397a11d66e69e888c565.zip |
Much refactoring to improve bounce processing. The basic change moves the
queuing of bounce events out of the process's memory and into a 'bounce event'
file, essentially consisting of a stream of pickles. Every once in a while
the qrunner wakes up and processes the event file, which is named to be
specific to each qrunner instance. The actually processing is factored out
into a mixin class which is now subclassed by both the BounceRunner and
OutgoingRunner classes (since the latter has to handle SMTP rejects as
bounces).
Specific changes include:
REGISTER_BOUNCES_EVERY: Promote to a Defaults.py.in variable.
BounceMixin: Mixin class for handling the basics of bounce event queuing,
periodic processing, and registering, as well as cleanup. It also contains a
_probe_bounce() method for handling bounces of the probe message (which cause
an immediate disable).
BounceRunner: Add BounceMixin as a base class.
Diffstat (limited to 'Mailman')
-rw-r--r-- | Mailman/Queue/BounceRunner.py | 176 |
1 files changed, 119 insertions, 57 deletions
diff --git a/Mailman/Queue/BounceRunner.py b/Mailman/Queue/BounceRunner.py index 6ea1fc68..8d802a60 100644 --- a/Mailman/Queue/BounceRunner.py +++ b/Mailman/Queue/BounceRunner.py @@ -16,8 +16,10 @@ """Bounce queue runner.""" +import os import re import time +import cPickle from email.MIMEText import MIMEText from email.MIMEMessage import MIMEMessage @@ -35,8 +37,6 @@ from Mailman.i18n import _ COMMASPACE = ', ' -REGISTER_BOUNCES_EVERY = mm_cfg.minutes(15) - try: True, False except NameError: @@ -45,18 +45,123 @@ except NameError: -class BounceRunner(Runner): +class BounceMixin: + def __init__(self): + # Registering a bounce means acquiring the list lock, and it would be + # too expensive to do this for each message. Instead, each bounce + # runner maintains an event log which is essentially a file with + # multiple pickles. Each bounce we receive gets appended to this file + # as a 4-tuple record: (listname, addr, today, msg) + # + # today is itself a 3-tuple of (year, month, day) + # + # Every once in a while (see _doperiodic()), the bounce runner cracks + # open the file, reads all the records and registers all the bounces. + # Then it truncates the file and continues on. We don't need to lock + # the bounce event file because bounce qrunners are single threaded + # and each creates a uniquely named file to contain the events. + # + # XXX When Python 2.3 is minimal require, we can use the new + # tempfile.TemporaryFile() function. + # + # XXX We used to classify bounces to the site list as bounce events + # for every list, but this caused severe problems. Here's the + # scenario: aperson@example.com is a member of 4 lists, and a list + # owner of the foo list. example.com has an aggressive spam filter + # which rejects any message that is spam or contains spam as an + # attachment. Now, a spambot sends a piece of spam to the foo list, + # but since that spambot is not a member, the list holds the message + # for approval, and sends a notification to aperson@example.com as + # list owner. That notification contains a copy of the spam. Now + # example.com rejects the message, causing a bounce to be sent to the + # site list's bounce address. The bounce runner would then dutifully + # register a bounce for all 4 lists that aperson@example.com was a + # member of, and eventually that person would get disabled on all + # their lists. So now we ignore site list bounces. Ce La Vie for + # password reminder bounces. + self._bounce_events_file = os.path.join( + mm_cfg.DATA_DIR, 'bounce-events-%05d.pck' % os.getpid()) + self._bounce_events_fp = None + self._bouncecnt = 0 + self._nextaction = time.time() + mm_cfg.REGISTER_BOUNCES_EVERY + + def _queue_bounces(self, listname, addrs, msg): + today = time.localtime()[:3] + if self._bounce_events_fp is None: + self._bounce_events_fp = open(self._bounce_events_file, 'a+b') + for addr in addrs: + cPickle.dump((listname, addr, today, msg), + self._bounce_events_fp, 1) + self._bounce_events_fp.flush() + os.fsync(self._bounce_events_fp.fileno()) + self._bouncecnt += len(addrs) + + def _register_bounces(self): + syslog('bounce', '%s processing %s queued bounces', + self, self._bouncecnt) + # Read all the records from the bounce file, then unlink it. Sort the + # records by listname for more efficient processing. + events = {} + self._bounce_events_fp.seek(0) + while True: + try: + listname, addr, day, msg = cPickle.load(self._bounce_events_fp) + except ValueError, e: + syslog('bounce', 'Error reading bounce events: %s', e) + except EOFError: + break + events.setdefault(listname, []).append((addr, day, msg)) + # Now register all events sorted by list + for listname in events.keys(): + mlist = self._open_list(listname) + mlist.Lock() + try: + for addr, day, msg in events[listname]: + mlist.registerBounce(addr, msg, day=day) + mlist.Save() + finally: + mlist.Unlock() + # Reset and free all the cached memory + self._bounce_events_fp.close() + self._bounce_events_fp = None + os.unlink(self._bounce_events_file) + self._bouncecnt = 0 + + def _cleanup(self): + if self._bouncecnt > 0: + self._register_bounces() + + def _doperiodic(self): + now = time.time() + if self._nextaction > now or self._bouncecnt == 0: + return + # Let's go ahead and register the bounces we've got stored up + self._nextaction = now + mm_cfg.REGISTER_BOUNCES_EVERY + self._register_bounces() + + def _probe_bounce(self, mlist, token): + locked = mlist.Locked() + if not locked: + mlist.Lock() + try: + op, addr, bmsg = mlist.pend_confirm(token) + info = mlist.getBounceInfo(addr) + mlist.disableBouncingMember(addr, info, bmsg) + # Only save the list if we're unlocking it + if not locked: + mlist.Save() + finally: + if not locked: + mlist.Unlock() + + + +class BounceRunner(Runner, BounceMixin): QDIR = mm_cfg.BOUNCEQUEUE_DIR def __init__(self, slice=None, numslices=1): Runner.__init__(self, slice, numslices) - # This is a simple sequence of bounce score events. Each entry in the - # list is a tuple of (address, day, msg) where day is a tuple of - # (YYYY, MM, DD). We'll sort and collate all this information in - # _register_bounces() below. - self._bounces = {} - self._bouncecnt = 0 - self._next_registration = time.time() + REGISTER_BOUNCES_EVERY + BounceMixin.__init__(self) def _dispose(self, mlist, msg, msgdata): # Make sure we have the most up-to-date state @@ -91,15 +196,7 @@ class BounceRunner(Runner): # See if this was a probe message. token = verp_probe(mlist, msg) if token: - # The list must be locked to perform these operations - mlist.Lock() - try: - op, addr, bmsg = mlist.pend_confirm(token) - info = mlist.getBounceInfo(addr) - mlist.disableBouncingMember(addr, info, bmsg) - mlist.Save() - finally: - mlist.Unlock() + self._probe_bounce(mlist, token) return # That didn't give us anything useful, so try the old fashion # bounce matching modules. @@ -115,47 +212,12 @@ class BounceRunner(Runner): # although I'm unsure how that could happen. Possibly ScanMessages() # can let None's sneak through. In any event, this will kill them. addrs = filter(None, addrs) - # Store the bounce score events so we can register them periodically - today = time.localtime()[:3] - events = [(addr, today, msg) for addr in addrs] - self._bounces.setdefault(mlist.internal_name(), []).extend(events) - self._bouncecnt += len(addrs) + self._queue_bounces(mlist.internal_name(), addrs, msg) - def _doperiodic(self): - now = time.time() - if self._next_registration > now or not self._bounces: - return - # Let's go ahead and register the bounces we've got stored up - self._next_registration = now + REGISTER_BOUNCES_EVERY - self._register_bounces() - - def _register_bounces(self): - syslog('bounce', 'Processing %s queued bounces', self._bouncecnt) - # First, get the list of bounces register against the site list. For - # these addresses, we want to register a bounce on every list the - # address is a member of -- which we don't know yet. - sitebounces = self._bounces.get(mm_cfg.MAILMAN_SITE_LIST, []) - if sitebounces: - listnames = Utils.list_names() - else: - listnames = self._bounces.keys() - for listname in listnames: - mlist = self._open_list(listname) - mlist.Lock() - try: - events = self._bounces.get(listname, []) + sitebounces - for addr, day, msg in events: - mlist.registerBounce(addr, msg, day=day) - mlist.Save() - finally: - mlist.Unlock() - # Reset and free all the cached memory - self._bounces = {} - self._bouncecnt = 0 + _doperiodic = BounceMixin._doperiodic def _cleanup(self): - if self._bounces: - self._register_bounces() + BounceMixin._cleanup(self) Runner._cleanup(self) |