diff options
author | <> | 2003-01-02 05:25:50 +0000 |
---|---|---|
committer | <> | 2003-01-02 05:25:50 +0000 |
commit | b132a73f15e432eaf43310fce9196ca0c0651465 (patch) | |
tree | c15f816ba7c4de99fef510e3bd75af0890d47441 /Mailman/Queue | |
download | mailman2-b132a73f15e432eaf43310fce9196ca0c0651465.tar.gz mailman2-b132a73f15e432eaf43310fce9196ca0c0651465.tar.xz mailman2-b132a73f15e432eaf43310fce9196ca0c0651465.zip |
This commit was manufactured by cvs2svn to create branch
'Release_2_1-maint'.
Diffstat (limited to 'Mailman/Queue')
-rw-r--r-- | Mailman/Queue/.cvsignore | 1 | ||||
-rw-r--r-- | Mailman/Queue/ArchRunner.py | 76 | ||||
-rw-r--r-- | Mailman/Queue/BounceRunner.py | 195 | ||||
-rw-r--r-- | Mailman/Queue/CommandRunner.py | 220 | ||||
-rw-r--r-- | Mailman/Queue/IncomingRunner.py | 170 | ||||
-rw-r--r-- | Mailman/Queue/MaildirRunner.py | 184 | ||||
-rw-r--r-- | Mailman/Queue/Makefile.in | 69 | ||||
-rw-r--r-- | Mailman/Queue/NewsRunner.py | 158 | ||||
-rw-r--r-- | Mailman/Queue/OutgoingRunner.py | 139 | ||||
-rw-r--r-- | Mailman/Queue/Runner.py | 245 | ||||
-rw-r--r-- | Mailman/Queue/Switchboard.py | 340 | ||||
-rw-r--r-- | Mailman/Queue/VirginRunner.py | 43 | ||||
-rw-r--r-- | Mailman/Queue/__init__.py | 15 | ||||
-rw-r--r-- | Mailman/Queue/sbcache.py | 26 |
14 files changed, 1881 insertions, 0 deletions
diff --git a/Mailman/Queue/.cvsignore b/Mailman/Queue/.cvsignore new file mode 100644 index 00000000..f3c7a7c5 --- /dev/null +++ b/Mailman/Queue/.cvsignore @@ -0,0 +1 @@ +Makefile diff --git a/Mailman/Queue/ArchRunner.py b/Mailman/Queue/ArchRunner.py new file mode 100644 index 00000000..14097332 --- /dev/null +++ b/Mailman/Queue/ArchRunner.py @@ -0,0 +1,76 @@ +# Copyright (C) 2000,2001,2002 by the Free Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +"""Archive queue runner.""" + +import time +from email.Utils import parsedate_tz, mktime_tz, formatdate + +from Mailman import mm_cfg +from Mailman import LockFile +from Mailman.Queue.Runner import Runner + + + +class ArchRunner(Runner): + QDIR = mm_cfg.ARCHQUEUE_DIR + + def _dispose(self, mlist, msg, msgdata): + # Support clobber_date, i.e. setting the date in the archive to the + # received date, not the (potentially bogus) Date: header of the + # original message. + clobber = 0 + originaldate = msg.get('date') + receivedtime = formatdate(msgdata['received_time']) + if not originaldate: + clobber = 1 + elif mm_cfg.ARCHIVER_CLOBBER_DATE_POLICY == 1: + clobber = 1 + elif mm_cfg.ARCHIVER_CLOBBER_DATE_POLICY == 2: + # what's the timestamp on the original message? 
+ tup = parsedate_tz(originaldate) + now = time.time() + try: + if not tup: + clobber = 1 + elif abs(now - mktime_tz(tup)) > \ + mm_cfg.ARCHIVER_ALLOWABLE_SANE_DATE_SKEW: + clobber = 1 + except ValueError: + # The likely cause of this is that the year in the Date: field + # is horribly incorrect, e.g. (from SF bug # 571634): + # Date: Tue, 18 Jun 0102 05:12:09 +0500 + # Obviously clobber such dates. + clobber = 1 + if clobber: + del msg['date'] + del msg['x-original-date'] + msg['Date'] = receivedtime + if originaldate: + msg['X-Original-Date'] = originaldate + # Always put an indication of when we received the message. + msg['X-List-Received-Date'] = receivedtime + # Now try to get the list lock + try: + mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT) + except LockFile.TimeOutError: + # oh well, try again later + return 1 + try: + mlist.ArchiveMail(msg) + mlist.Save() + finally: + mlist.Unlock() diff --git a/Mailman/Queue/BounceRunner.py b/Mailman/Queue/BounceRunner.py new file mode 100644 index 00000000..e59ac47e --- /dev/null +++ b/Mailman/Queue/BounceRunner.py @@ -0,0 +1,195 @@ +# Copyright (C) 2001,2002 by the Free Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
+ +"""Bounce queue runner.""" + +import re +from email.MIMEText import MIMEText +from email.MIMEMessage import MIMEMessage +from email.Utils import parseaddr + +from Mailman import mm_cfg +from Mailman import Utils +from Mailman import LockFile +from Mailman.Message import UserNotification +from Mailman.Bouncers import BouncerAPI +from Mailman.Queue.Runner import Runner +from Mailman.Queue.sbcache import get_switchboard +from Mailman.Logging.Syslog import syslog +from Mailman.i18n import _ + +COMMASPACE = ', ' + + + +class BounceRunner(Runner): + QDIR = mm_cfg.BOUNCEQUEUE_DIR + # We only do bounce processing once per minute. + SLEEPTIME = mm_cfg.minutes(1) + + def _dispose(self, mlist, msg, msgdata): + # Make sure we have the most up-to-date state + mlist.Load() + outq = get_switchboard(mm_cfg.OUTQUEUE_DIR) + # There are a few possibilities here: + # + # - the message could have been VERP'd in which case, we know exactly + # who the message was destined for. That make our job easy. + # - the message could have been originally destined for a list owner, + # but a list owner address itself bounced. That's bad, and for now + # we'll simply log the problem and attempt to deliver the message to + # the site owner. + # + # All messages to list-owner@vdom.ain have their envelope sender set + # to site-owner@dom.ain (no virtual domain). Is this a bounce for a + # message to a list owner, coming to the site owner? + if msg.get('to', '') == Utils.get_site_email(extra='-owner'): + # Send it on to the site owners, but craft the envelope sender to + # be the -loop detection address, so if /they/ bounce, we won't + # get stuck in a bounce loop. + outq.enqueue(msg, msgdata, + recips=[Utils.get_site_email()], + envsender=Utils.get_site_email(extra='loop'), + ) + # List isn't doing bounce processing? 
+ if not mlist.bounce_processing: + return + # Try VERP detection first, since it's quick and easy + addrs = verp_bounce(mlist, msg) + if not addrs: + # That didn't give us anything useful, so try the old fashion + # bounce matching modules + addrs = BouncerAPI.ScanMessages(mlist, msg) + # If that still didn't return us any useful addresses, then send it on + # or discard it. + if not addrs: + syslog('bounce', 'bounce message w/no discernable addresses: %s', + msg.get('message-id')) + maybe_forward(mlist, msg) + return + # BAW: It's possible that there are None's in the list of addresses, + # although I'm unsure how that could happen. Possibly ScanMessages() + # can let None's sneak through. In any event, this will kill them. + addrs = filter(None, addrs) + # Okay, we have some recognized addresses. We now need to register + # the bounces for each of these. If the bounce came to the site list, + # then we'll register the address on every list in the system, but + # note: this could be VERY resource intensive! 
+ foundp = 0 + listname = mlist.internal_name() + if listname == mm_cfg.MAILMAN_SITE_LIST: + foundp = 1 + for listname in Utils.list_names(): + xlist = self._open_list(listname) + xlist.Load() + for addr in addrs: + if xlist.isMember(addr): + unlockp = 0 + if not xlist.Locked(): + try: + xlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT) + except LockFile.TimeOutError: + # Oh well, forget about this list + continue + unlockp = 1 + try: + xlist.registerBounce(addr, msg) + foundp = 1 + xlist.Save() + finally: + if unlockp: + xlist.Unlock() + else: + try: + mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT) + except LockFile.TimeOutError: + # Try again later + syslog('bounce', "%s: couldn't get list lock", listname) + return 1 + else: + try: + for addr in addrs: + if mlist.isMember(addr): + mlist.registerBounce(addr, msg) + foundp = 1 + mlist.Save() + finally: + mlist.Unlock() + if not foundp: + # It means an address was recognized but it wasn't an address + # that's on any mailing list at this site. BAW: don't forward + # these, but do log it. + syslog('bounce', 'bounce message with non-members of %s: %s', + listname, COMMASPACE.join(addrs)) + #maybe_forward(mlist, msg) + + + +def verp_bounce(mlist, msg): + bmailbox, bdomain = Utils.ParseEmail(mlist.GetBouncesEmail()) + # Sadly not every MTA bounces VERP messages correctly, or consistently. + # Fall back to Delivered-To: (Postfix), Envelope-To: (Exim) and + # Apparently-To:, and then short-circuit if we still don't have anything + # to work with. Note that there can be multiple Delivered-To: headers so + # we need to search them all (and we don't worry about false positives for + # forwarded email, because only one should match VERP_REGEXP). 
+ vals = [] + for header in ('to', 'delivered-to', 'envelope-to', 'apparently-to'): + vals.extend(msg.get_all(header, [])) + for field in vals: + to = parseaddr(field)[1] + if not to: + continue # empty header + mo = re.search(mm_cfg.VERP_REGEXP, to) + if not mo: + continue # no match of regexp + try: + if bmailbox <> mo.group('bounces'): + continue # not a bounce to our list + # All is good + addr = '%s@%s' % mo.group('mailbox', 'host') + except IndexError: + syslog('error', + "VERP_REGEXP doesn't yield the right match groups: %s", + mm_cfg.VERP_REGEXP) + return [] + return [addr] + + + +def maybe_forward(mlist, msg): + # Does the list owner want to get non-matching bounce messages? + # If not, simply discard it. + if mlist.bounce_unrecognized_goes_to_list_owner: + adminurl = mlist.GetScriptURL('admin', absolute=1) + '/bounce' + mlist.ForwardMessage(msg, + text=_("""\ +The attached message was received as a bounce, but either the bounce format +was not recognized, or no member addresses could be extracted from it. This +mailing list has been configured to send all unrecognized bounce messages to +the list administrator(s). + +For more information see: +%(adminurl)s + +"""), + subject=_('Uncaught bounce notification'), + tomoderators=0) + syslog('bounce', 'forwarding unrecognized, message-id: %s', + msg.get('message-id', 'n/a')) + else: + syslog('bounce', 'discarding unrecognized, message-id: %s', + msg.get('message-id', 'n/a')) diff --git a/Mailman/Queue/CommandRunner.py b/Mailman/Queue/CommandRunner.py new file mode 100644 index 00000000..303d4c52 --- /dev/null +++ b/Mailman/Queue/CommandRunner.py @@ -0,0 +1,220 @@ +# Copyright (C) 1998,1999,2000,2001,2002 by the Free Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +"""-request robot command queue runner.""" + +# See the delivery diagram in IncomingRunner.py. This module handles all +# email destined for mylist-request, -join, -leave, and -confirm. It no +# longer handles bounce messages (i.e. -admin or -bounces), nor does it +# handle mail to -owner. + + + +# BAW: get rid of this when Python 2.2 is a minimum requirement. +from __future__ import nested_scopes + +import sys +import re +from types import StringType, UnicodeType + +from Mailman import mm_cfg +from Mailman import Utils +from Mailman import Message +from Mailman.Handlers import Replybot +from Mailman.i18n import _ +from Mailman.Queue.Runner import Runner +from Mailman.Logging.Syslog import syslog +from Mailman import LockFile + +from email.MIMEText import MIMEText +from email.MIMEMessage import MIMEMessage +from email.Iterators import typed_subpart_iterator + +NL = '\n' + + + +class Results: + def __init__(self, mlist, msg, msgdata): + self.mlist = mlist + self.msg = msg + self.msgdata = msgdata + # Only set returnaddr if the response is to go to someone other than + # the address specified in the From: header (e.g. for the password + # command). 
+ self.returnaddr = None + self.commands = [] + self.results = [] + self.ignored = [] + self.lineno = 0 + self.subjcmdretried = 0 + self.respond = 1 + # Always process the Subject: header first + self.commands.append(msg.get('subject', '')) + # Find the first text/plain part + part = None + for part in typed_subpart_iterator(msg, 'text', 'plain'): + break + if part is None or part is not msg: + # Either there was no text/plain part or we ignored some + # non-text/plain parts. + self.results.append(_('Ignoring non-text/plain MIME parts')) + if part is None: + # E.g the outer Content-Type: was text/html + return + body = part.get_payload() + # text/plain parts better have string payloads + assert isinstance(body, StringType) or isinstance(body, UnicodeType) + lines = body.splitlines() + # Use no more lines than specified + self.commands.extend(lines[:mm_cfg.DEFAULT_MAIL_COMMANDS_MAX_LINES]) + self.ignored.extend(lines[mm_cfg.DEFAULT_MAIL_COMMANDS_MAX_LINES:]) + + def process(self): + # Now, process each line until we find an error. The first + # non-command line found stops processing. + stop = 0 + for line in self.commands: + if line and line.strip(): + args = line.split() + cmd = args.pop(0).lower() + stop = self.do_command(cmd, args) + self.lineno += 1 + if stop: + break + + def do_command(self, cmd, args=None): + if args is None: + args = () + # Try to import a command handler module for this command + modname = 'Mailman.Commands.cmd_' + cmd + try: + __import__(modname) + handler = sys.modules[modname] + except ImportError: + # If we're on line zero, it was the Subject: header that didn't + # contain a command. It's possible there's a Re: prefix (or + # localized version thereof) on the Subject: line that's messing + # things up. Pop the prefix off and try again... once. + # + # If that still didn't work it isn't enough to stop processing. + # BAW: should we include a message that the Subject: was ignored? 
+ if not self.subjcmdretried and args: + self.subjcmdretried += 1 + cmd = args.pop(0) + return self.do_command(cmd, args) + return self.lineno <> 0 + return handler.process(self, args) + + def send_response(self): + # Helper + def indent(lines): + return [' ' + line for line in lines] + # Quick exit for some commands which don't need a response + if not self.respond: + return + resp = [Utils.wrap(_("""\ +The results of your email command are provided below. +Attached is your original message. +"""))] + if self.results: + resp.append(_('- Results:')) + resp.extend(indent(self.results)) + # Ignore empty lines + unprocessed = [line for line in self.commands[self.lineno:] + if line and line.strip()] + if unprocessed: + resp.append(_('\n- Unprocessed:')) + resp.extend(indent(unprocessed)) + if self.ignored: + resp.append(_('\n- Ignored:')) + resp.extend(indent(self.ignored)) + resp.append(_('\n- Done.\n\n')) + results = MIMEText( + NL.join(resp), + _charset=Utils.GetCharSet(self.mlist.preferred_language)) + # Safety valve for mail loops with misconfigured email 'bots. We + # don't respond to commands sent with "Precedence: bulk|junk|list" + # unless they explicitly "X-Ack: yes", but not all mail 'bots are + # correctly configured, so we max out the number of responses we'll + # give to an address in a single day. + # + # BAW: We wait until now to make this decision since our sender may + # not be self.msg.get_sender(), but I'm not sure this is right. 
+ recip = self.returnaddr or self.msg.get_sender() + if not self.mlist.autorespondToSender(recip): + return + msg = Message.UserNotification( + recip, + self.mlist.GetBouncesEmail(), + _('The results of your email commands'), + lang=self.mlist.preferred_language) + msg.set_type('multipart/mixed') + msg.attach(results) + orig = MIMEMessage(self.msg) + msg.attach(orig) + msg.send(self.mlist) + + + +class CommandRunner(Runner): + QDIR = mm_cfg.CMDQUEUE_DIR + + def _dispose(self, mlist, msg, msgdata): + # The policy here is similar to the Replybot policy. If a message has + # "Precedence: bulk|junk|list" and no "X-Ack: yes" header, we discard + # it to prevent replybot response storms. + precedence = msg.get('precedence', '').lower() + ack = msg.get('x-ack', '').lower() + if ack <> 'yes' and precedence in ('bulk', 'junk', 'list'): + syslog('vette', 'Precedence: %s message discarded by: %s', + precedence, mlist.GetRequestEmail()) + return 0 + # Do replybot for commands + mlist.Load() + Replybot.process(mlist, msg, msgdata) + if mlist.autorespond_requests == 1: + syslog('vette', 'replied and discard') + # w/discard + return 0 + # Now craft the response + res = Results(mlist, msg, msgdata) + # BAW: Not all the functions of this qrunner require the list to be + # locked. Still, it's more convenient to lock it here and now and + # deal with lock failures in one place. + try: + mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT) + except LockFile.TimeOutError: + # Oh well, try again later + return 1 + # This message will have been delivered to one of mylist-request, + # mylist-join, or mylist-leave, and the message metadata will contain + # a key to which one was used. 
+ try: + if msgdata.get('torequest'): + res.process() + elif msgdata.get('tojoin'): + res.do_command('join') + elif msgdata.get('toleave'): + res.do_command('leave') + elif msgdata.get('toconfirm'): + mo = re.match(mm_cfg.VERP_CONFIRM_REGEXP, msg.get('to', '')) + if mo: + res.do_command('confirm', (mo.group('cookie'),)) + res.send_response() + mlist.Save() + finally: + mlist.Unlock() diff --git a/Mailman/Queue/IncomingRunner.py b/Mailman/Queue/IncomingRunner.py new file mode 100644 index 00000000..4a60ceb9 --- /dev/null +++ b/Mailman/Queue/IncomingRunner.py @@ -0,0 +1,170 @@ +# Copyright (C) 1998,1999,2000,2001,2002 by the Free Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +"""Incoming queue runner.""" + +# A typical Mailman list exposes nine aliases which point to seven different +# wrapped scripts. E.g. 
for a list named `mylist', you'd have: +# +# mylist-bounces -> bounces (-admin is a deprecated alias) +# mylist-confirm -> confirm +# mylist-join -> join (-subscribe is an alias) +# mylist-leave -> leave (-unsubscribe is an alias) +# mylist-owner -> owner +# mylist -> post +# mylist-request -> request +# +# -request, -join, and -leave are robot addresses; their sole purpose is to +# process emailed commands in a Majordomo-like fashion (although the latter +# two are hardcoded to subscription and unsubscription requests). -bounces is +# the automated bounce processor, and all messages to list members have their +# return address set to -bounces. If the bounce processor fails to extract a +# bouncing member address, it can optionally forward the message on to the +# list owners. +# +# -owner is for reaching a human operator with minimal list interaction +# (i.e. no bounce processing). -confirm is another robot address which +# processes replies to VERP-like confirmation notices. +# +# So the delivery flow of messages looks like this: +# +# joerandom ---> mylist ---> list members +# | | +# | |[bounces] +# | mylist-bounces <---+ <-------------------------------+ +# | | | +# | +--->[internal bounce processing] | +# | ^ | | +# | | | [bounce found] | +# | [bounces *] +--->[register and discard] | +# | | | | | +# | | | |[*] | +# | [list owners] |[no bounce found] | | +# | ^ | | | +# | | | | | +# +-------> mylist-owner <--------+ | | +# | | | +# | data/owner-bounces.mbox <--[site list] <---+ | +# | | +# +-------> mylist-join--+ | +# | | | +# +------> mylist-leave--+ | +# | | | +# | v | +# +-------> mylist-request | +# | | | +# | +---> [command processor] | +# | | | +# +-----> mylist-confirm ----> +---> joerandom | +# | | +# |[bounces] | +# +----------------------+ +# +# A person can send an email to the list address (for posting), the -owner +# address (to reach the human operator), or the -confirm, -join, -leave, and +# -request mailbots. 
Messages to the list address are then forwarded on to the +# list membership, with bounces directed to the -bounces address. +# +# [*] Messages sent to the -owner address are forwarded on to the list +# owner/moderators. All -owner destined messages have their bounces directed +# to the site list -bounces address, regardless of whether a human sent the +# message or the message was crafted internally. The intention here is that +# the site owners want to be notified when one of their list owners' addresses +# starts bouncing (yes, this will be automated in a future release). +# +# Any message to the site owners has its bounces directed to a special +# "loop-killer" address, which just dumps the message into +# data/owners-bounces.mbox. +# +# Finally, a message to any of the mailbots causes the requested action to be +# performed. Results notifications are sent to the author of the message, +# with all bounces pointing back to the -bounces address. + + +import sys +import os +from cStringIO import StringIO + +from Mailman import mm_cfg +from Mailman import Errors +from Mailman import LockFile +from Mailman.Queue.Runner import Runner +from Mailman.Logging.Syslog import syslog + + + +class IncomingRunner(Runner): + QDIR = mm_cfg.INQUEUE_DIR + + def _dispose(self, mlist, msg, msgdata): + # Try to get the list lock. + try: + mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT) + except LockFile.TimeOutError: + # Oh well, try again later + return 1 + # Process the message through a handler pipeline. The handler + # pipeline can actually come from one of three places: the message + # metadata, the mlist, or the global pipeline. + # + # If a message was requeued due to an uncaught exception, its metadata + # will contain the retry pipeline. Use this above all else. + # Otherwise, if the mlist has a `pipeline' attribute, it should be + # used. Final fallback is the global pipeline. 
+ try: + pipeline = self._get_pipeline(mlist, msg, msgdata) + status = self._dopipeline(mlist, msg, msgdata, pipeline) + if status: + msgdata['pipeline'] = pipeline + mlist.Save() + return status + finally: + mlist.Unlock() + + # Overridable + def _get_pipeline(self, mlist, msg, msgdata): + # We must return a copy of the list, otherwise, the first message that + # flows through the pipeline will empty it out! + return msgdata.get('pipeline', + getattr(mlist, 'pipeline', + mm_cfg.GLOBAL_PIPELINE))[:] + + def _dopipeline(self, mlist, msg, msgdata, pipeline): + while pipeline: + handler = pipeline.pop(0) + modname = 'Mailman.Handlers.' + handler + __import__(modname) + try: + pid = os.getpid() + sys.modules[modname].process(mlist, msg, msgdata) + # Failsafe -- a child may have leaked through. + if pid <> os.getpid(): + syslog('error', 'child process leaked thru: %s', modname) + os._exit(1) + except Errors.DiscardMessage: + # Throw the message away; we need do nothing else with it. + syslog('vette', 'Message discarded, msgid: %s', + msg.get('message-id', 'n/a')) + return 0 + except Errors.HoldMessage: + # Let the approval process take it from here. The message no + # longer needs to be queued. + return 0 + except Errors.RejectMessage, e: + mlist.BounceMessage(msg, msgdata, e) + return 0 + # We've successfully completed handling of this message + return 0 diff --git a/Mailman/Queue/MaildirRunner.py b/Mailman/Queue/MaildirRunner.py new file mode 100644 index 00000000..e14ab339 --- /dev/null +++ b/Mailman/Queue/MaildirRunner.py @@ -0,0 +1,184 @@ +# Copyright (C) 2002 by the Free Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +"""Maildir pre-queue runner. + +Most MTAs can be configured to deliver messages to a `Maildir'[1]. This +runner will read messages from a maildir's new/ directory and inject them into +Mailman's qfiles/in directory for processing in the normal pipeline. This +delivery mechanism contrasts with mail program delivery, where incoming +messages end up in qfiles/in via the MTA executing the scripts/post script +(and likewise for the other -aliases for each mailing list). + +The advantage to Maildir delivery is that it is more efficient; there's no +need to fork an intervening program just to take the message from the MTA's +standard output, to the qfiles/in directory. + +[1] http://cr.yp.to/proto/maildir.html + +We're going to use the :info flag == 1, experimental status flag for our own +purposes. The :1 can be followed by one of these letters: + +- P means that MaildirRunner's in the process of parsing and enqueuing the + message. If successful, it will delete the file. + +- X means something failed during the parse/enqueue phase. An error message + will be logged to log/error and the file will be renamed <filename>:1,X. + MaildirRunner will never automatically return to this file, but once the + problem is fixed, you can manually move the file back to the new/ directory + and MaildirRunner will attempt to re-process it. At some point we may do + this automatically. + +See the variable USE_MAILDIR in Defaults.py.in for enabling this delivery +mechanism. 
+""" + +# NOTE: Maildir delivery is experimental in Mailman 2.1. + +import os +import re +import errno + +from email.Parser import Parser +from email.Utils import parseaddr + +from Mailman import mm_cfg +from Mailman import Utils +from Mailman.Message import Message +from Mailman.Queue.Runner import Runner +from Mailman.Queue.sbcache import get_switchboard +from Mailman.Logging.Syslog import syslog + +# We only care about the listname and the subq as in listname@ or +# listname-request@ +lre = re.compile(r""" + ^ # start of string + (?P<listname>[^-@]+) # listname@ or listname-subq@ + (?: # non-grouping + - # dash separator + (?P<subq>[^-+@]+) # everything up to + or - or @ + )? # if it exists + """, re.VERBOSE | re.IGNORECASE) + + + +class MaildirRunner(Runner): + # This class is much different than most runners because it pulls files + # of a different format than what scripts/post and friends leaves. The + # files this runner reads are just single message files as dropped into + # the directory by the MTA. This runner will read the file, and enqueue + # it in the expected qfiles directory for normal processing. + def __init__(self, slice=None, numslices=1): + # Don't call the base class constructor, but build enough of the + # underlying attributes to use the base class's implementation. + self._stop = 0 + self._dir = os.path.join(mm_cfg.MAILDIR_DIR, 'new') + self._cur = os.path.join(mm_cfg.MAILDIR_DIR, 'cur') + self._parser = Parser(Message) + + def _oneloop(self): + # Refresh this each time through the list. BAW: could be too + # expensive. 
+ listnames = Utils.list_names() + # Cruise through all the files currently in the new/ directory + try: + files = os.listdir(self._dir) + except OSError, e: + if e.errno <> errno.ENOENT: raise + # Nothing's been delivered yet + return 0 + for file in files: + srcname = os.path.join(self._dir, file) + dstname = os.path.join(self._cur, file + ':1,P') + xdstname = os.path.join(self._cur, file + ':1,X') + try: + os.rename(srcname, dstname) + except OSError, e: + if e.errno == errno.ENOENT: + # Some other MaildirRunner beat us to it + continue + syslog('error', 'Could not rename maildir file: %s', srcname) + raise + # Now open, read, parse, and enqueue this message + try: + fp = open(dstname) + try: + msg = self._parser.parse(fp) + finally: + fp.close() + # Now we need to figure out which queue of which list this + # message was destined for. See verp_bounce() in + # BounceRunner.py for why we do things this way. + vals = [] + for header in ('delivered-to', 'envelope-to', 'apparently-to'): + vals.extend(msg.get_all(header, [])) + for field in vals: + to = parseaddr(field)[1] + if not to: + continue + mo = lre.match(to) + if not mo: + # This isn't an address we care about + continue + listname, subq = mo.group('listname', 'subq') + if listname in listnames: + break + else: + # As far as we can tell, this message isn't destined for + # any list on the system. What to do? 
+ syslog('error', 'Message apparently not for any list: %s', + xdstname) + os.rename(dstname, xdstname) + continue + # BAW: blech, hardcoded + msgdata = {'listname': listname} + # -admin is deprecated + if subq in ('bounces', 'admin'): + queue = get_switchboard(mm_cfg.BOUNCEQUEUE_DIR) + elif subq == 'confirm': + msgdata['toconfirm'] = 1 + queue = get_switchboard(mm_cfg.CMDQUEUE_DIR) + elif subq in ('join', 'subscribe'): + msgdata['tojoin'] = 1 + queue = get_switchboard(mm_cfg.CMDQUEUE_DIR) + elif subq in ('leave', 'unsubscribe'): + msgdata['toleave'] = 1 + queue = get_switchboard(mm_cfg.CMDQUEUE_DIR) + elif subq == 'owner': + msgdata.update({ + 'toowner': 1, + 'envsender': Utils.get_site_email(extra='bounces'), + 'pipeline': mm_cfg.OWNER_PIPELINE, + }) + queue = get_switchboard(mm_cfg.INQUEUE_DIR) + elif subq is None: + msgdata['tolist'] = 1 + queue = get_switchboard(mm_cfg.INQUEUE_DIR) + elif subq == 'request': + msgdata['torequest'] = 1 + queue = get_switchboard(mm_cfg.CMDQUEUE_DIR) + else: + syslog('error', 'Unknown sub-queue: %s', subq) + os.rename(dstname, xdstname) + continue + queue.enqueue(msg, msgdata) + os.unlink(dstname) + except Exception, e: + os.rename(dstname, xdstname) + syslog('error', str(e)) + + def _cleanup(self): + pass diff --git a/Mailman/Queue/Makefile.in b/Mailman/Queue/Makefile.in new file mode 100644 index 00000000..a92ae67d --- /dev/null +++ b/Mailman/Queue/Makefile.in @@ -0,0 +1,69 @@ +# Copyright (C) 1998,1999,2000,2001,2002 by the Free Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +# NOTE: Makefile.in is converted into Makefile by the configure script +# in the parent directory. Once configure has run, you can recreate +# the Makefile by running just config.status. + +# Variables set by configure + +VPATH= @srcdir@ +srcdir= @srcdir@ +bindir= @bindir@ +prefix= @prefix@ +exec_prefix= @exec_prefix@ + +CC= @CC@ +CHMOD= @CHMOD@ +INSTALL= @INSTALL@ + +DEFS= @DEFS@ + +# Customizable but not set by configure + +OPT= @OPT@ +CFLAGS= $(OPT) $(DEFS) +PACKAGEDIR= $(prefix)/Mailman/Queue +SHELL= /bin/sh + +MODULES= *.py + +# Modes for directories and executables created by the install +# process. Default to group-writable directories but +# user-only-writable for executables. +DIRMODE= 775 +EXEMODE= 755 +FILEMODE= 644 +INSTALL_PROGRAM=$(INSTALL) -m $(EXEMODE) + + +# Rules + +all: + +install: + for f in $(MODULES); \ + do \ + $(INSTALL) -m $(FILEMODE) $(srcdir)/$$f $(PACKAGEDIR); \ + done + +finish: + +clean: + +distclean: + -rm *.pyc + -rm Makefile diff --git a/Mailman/Queue/NewsRunner.py b/Mailman/Queue/NewsRunner.py new file mode 100644 index 00000000..0439f0e1 --- /dev/null +++ b/Mailman/Queue/NewsRunner.py @@ -0,0 +1,158 @@ +# Copyright (C) 2000,2001,2002 by the Free Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +"""NNTP queue runner.""" + +import re +import socket +import nntplib +from cStringIO import StringIO + +import email +from email.Utils import getaddresses + +COMMASPACE = ', ' + +from Mailman import mm_cfg +from Mailman import Utils +from Mailman.Queue.Runner import Runner +from Mailman.Logging.Syslog import syslog + + +# Matches our Mailman crafted Message-IDs. See Utils.unique_message_id() +mcre = re.compile(r""" + <mailman. # match the prefix + \d+. # serial number + \d+. # time in seconds since epoch + \d+. # pid + (?P<listname>[^@]+) # list's internal_name() + @ # localpart@dom.ain + (?P<hostname>[^>]+) # list's host_name + > # trailer + """, re.VERBOSE) + + + +class NewsRunner(Runner): + QDIR = mm_cfg.NEWSQUEUE_DIR + + def _dispose(self, mlist, msg, msgdata): + # Make sure we have the most up-to-date state + mlist.Load() + if not msgdata.get('prepped'): + prepare_message(mlist, msg, msgdata) + try: + # Flatten the message object, sticking it in a StringIO object + fp = StringIO(msg.as_string()) + conn = None + try: + try: + conn = nntplib.NNTP(mlist.nntp_host, readermode=1, + user=mm_cfg.NNTP_USERNAME, + password=mm_cfg.NNTP_PASSWORD) + conn.post(fp) + except nntplib.error_temp, e: + syslog('error', + '(NNTPDirect) NNTP error for list "%s": %s', + mlist.internal_name(), e) + except socket.error, e: + syslog('error', + '(NNTPDirect) socket error for list "%s": %s', + mlist.internal_name(), e) + finally: + if conn: + conn.quit() + except Exception, e: + # Some other exception occurred, which we definitely did not + # expect, so set this message up for requeuing. 
+ self._log(e) + return 1 + return 0 + + + +def prepare_message(mlist, msg, msgdata): + # If the newsgroup is moderated, we need to add this header for the Usenet + # software to accept the posting, and not forward it on to the n.g.'s + # moderation address. The posting would not have gotten here if it hadn't + # already been approved. 1 == open list, mod n.g., 2 == moderated + if mlist.news_moderation in (1, 2): + del msg['approved'] + msg['Approved'] = mlist.GetListEmail() + # Should we restore the original, non-prefixed subject for gatewayed + # messages? + origsubj = msgdata.get('origsubj') + if not mlist.news_prefix_subject_too and origsubj is not None: + del msg['subject'] + msg['subject'] = origsubj + # Add the appropriate Newsgroups: header + ngheader = msg['newsgroups'] + if ngheader is not None: + # See if the Newsgroups: header already contains our linked_newsgroup. + # If so, don't add it again. If not, append our linked_newsgroup to + # the end of the header list + ngroups = [s.strip() for s in ngheader.split(',')] + if mlist.linked_newsgroup not in ngroups: + ngroups.append(mlist.linked_newsgroup) + # Subtitute our new header for the old one. + del msg['newsgroups'] + msg['Newsgroups'] = COMMASPACE.join(ngroups) + else: + # Newsgroups: isn't in the message + msg['Newsgroups'] = mlist.linked_newsgroup + # Note: We need to be sure two messages aren't ever sent to the same list + # in the same process, since message ids need to be unique. Further, if + # messages are crossposted to two Usenet-gated mailing lists, they each + # need to have unique message ids or the nntpd will only accept one of + # them. The solution here is to substitute any existing message-id that + # isn't ours with one of ours, so we need to parse it to be sure we're not + # looping. 
+ # + # Our Message-ID format is <mailman.secs.pid.listname@hostname> + msgid = msg['message-id'] + hackmsgid = 1 + if msgid: + mo = mcre.search(msgid) + if mo: + lname, hname = mo.group('listname', 'hostname') + if lname == mlist.internal_name() and hname == mlist.host_name: + hackmsgid = 0 + if hackmsgid: + del msg['message-id'] + msg['Message-ID'] = Utils.unique_message_id(mlist) + # Lines: is useful + if msg['Lines'] is None: + # BAW: is there a better way? + count = len(list(email.Iterators.body_line_iterator(msg))) + msg['Lines'] = str(count) + # Massage the message headers by remove some and rewriting others. This + # woon't completely sanitize the message, but it will eliminate the bulk + # of the rejections based on message headers. The NNTP server may still + # reject the message because of other problems. + for header in mm_cfg.NNTP_REMOVE_HEADERS: + del msg[header] + for header, rewrite in mm_cfg.NNTP_REWRITE_DUPLICATE_HEADERS: + values = msg.get_all(header, []) + if len(values) < 2: + # We only care about duplicates + continue + del msg[header] + # But keep the first one... + msg[header] = values[0] + for v in values[1:]: + msg[rewrite] = v + # Mark this message as prepared in case it has to be requeued + msgdata['prepped'] = 1 diff --git a/Mailman/Queue/OutgoingRunner.py b/Mailman/Queue/OutgoingRunner.py new file mode 100644 index 00000000..aed8dcb9 --- /dev/null +++ b/Mailman/Queue/OutgoingRunner.py @@ -0,0 +1,139 @@ +# Copyright (C) 2000,2001,2002 by the Free Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +"""Outgoing queue runner.""" + +import sys +import os +import time +import socket + +import email + +from Mailman import mm_cfg +from Mailman import Message +from Mailman import Errors +from Mailman import LockFile +from Mailman.Queue.Runner import Runner +from Mailman.Logging.Syslog import syslog + +# This controls how often _doperiodic() will try to deal with deferred +# permanent failures. +DEAL_WITH_PERMFAILURES_EVERY = 1 + + + +class OutgoingRunner(Runner): + QDIR = mm_cfg.OUTQUEUE_DIR + + def __init__(self, slice=None, numslices=1): + Runner.__init__(self, slice, numslices) + # Maps mailing lists to (recip, msg) tuples + self._permfailures = {} + self._permfail_counter = 0 + # We look this function up only at startup time + modname = 'Mailman.Handlers.' + mm_cfg.DELIVERY_MODULE + mod = __import__(modname) + self._func = getattr(sys.modules[modname], 'process') + # This prevents smtp server connection problems from filling up the + # error log. It gets reset if the message was successfully sent, and + # set if there was a socket.error. + self.__logged = 0 + + def _dispose(self, mlist, msg, msgdata): + # Make sure we have the most up-to-date state + mlist.Load() + try: + pid = os.getpid() + self._func(mlist, msg, msgdata) + # Failsafe -- a child may have leaked through. + if pid <> os.getpid(): + syslog('error', 'child process leaked thru: %s', modname) + os._exit(1) + self.__logged = 0 + except socket.error: + # There was a problem connecting to the SMTP server. Log this + # once, but crank up our sleep time so we don't fill the error + # log. + port = mm_cfg.SMTPPORT + if port == 0: + port = 'smtp' + # Log this just once. 
+ if not self.__logged: + syslog('error', 'Cannot connect to SMTP server %s on port %s', + mm_cfg.SMTPHOST, port) + self.__logged = 1 + return 1 + except Errors.SomeRecipientsFailed, e: + # The delivery module being used (SMTPDirect or Sendmail) failed + # to deliver the message to one or all of the recipients. + # Permanent failures should be registered (but registration + # requires the list lock), and temporary failures should be + # retried later. + # + # For permanent failures, make a copy of the message for bounce + # handling. I'm not sure this is necessary, or the right thing to + # do. + pcnt = len(e.permfailures) + copy = email.message_from_string(str(msg)) + self._permfailures.setdefault(mlist, []).extend( + zip(e.permfailures, [copy] * pcnt)) + # Temporary failures + if not e.tempfailures: + # Don't need to keep the message queued if there were only + # permanent failures. + return 0 + now = time.time() + recips = e.tempfailures + last_recip_count = msgdata.get('last_recip_count', 0) + deliver_until = msgdata.get('deliver_until', now) + if len(recips) == last_recip_count: + # We didn't make any progress, so don't attempt delivery any + # longer. BAW: is this the best disposition? + if now > deliver_until: + return 0 + else: + # Keep trying to delivery this for 3 days + deliver_until = now + mm_cfg.DELIVERY_RETRY_PERIOD + msgdata['last_recip_count'] = len(recips) + msgdata['deliver_until'] = deliver_until + msgdata['recips'] = recips + # Requeue + return 1 + # We've successfully completed handling of this message + return 0 + + def _doperiodic(self): + # Periodically try to acquire the list lock and clear out the + # permanent failures. + self._permfail_counter += 1 + if self._permfail_counter < DEAL_WITH_PERMFAILURES_EVERY: + return + # Reset the counter + self._permfail_counter = 0 + # And deal with the deferred permanent failures. 
+ for mlist in self._permfailures.keys(): + try: + mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT) + except LockFile.TimeOutError: + return + try: + for recip, msg in self._permfailures[mlist]: + mlist.registerBounce(recip, msg) + del self._permfailures[mlist] + mlist.Save() + finally: + mlist.Unlock() diff --git a/Mailman/Queue/Runner.py b/Mailman/Queue/Runner.py new file mode 100644 index 00000000..134dac99 --- /dev/null +++ b/Mailman/Queue/Runner.py @@ -0,0 +1,245 @@ +# Copyright (C) 1998,1999,2000,2001,2002 by the Free Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +"""Generic queue runner class. +""" + +import time +import traceback +import weakref +from cStringIO import StringIO + +from Mailman import mm_cfg +from Mailman import Utils +from Mailman import Errors +from Mailman import MailList +from Mailman import i18n + +from Mailman.Queue.Switchboard import Switchboard +from Mailman.Logging.Syslog import syslog + + + +class Runner: + QDIR = None + SLEEPTIME = mm_cfg.QRUNNER_SLEEP_TIME + + def __init__(self, slice=None, numslices=1): + self._kids = {} + # Create our own switchboard. Don't use the switchboard cache because + # we want to provide slice and numslice arguments. 
"""Generic queue runner class.
"""

import time
import traceback
import weakref
from cStringIO import StringIO

from Mailman import mm_cfg
from Mailman import Utils
from Mailman import Errors
from Mailman import MailList
from Mailman import i18n

from Mailman.Queue.Switchboard import Switchboard
from Mailman.Logging.Syslog import syslog



class Runner:
    # Base class for queue runners: repeatedly drains the QDIR switchboard,
    # handing each message to the subclass's _dispose().
    # Subclasses set QDIR to the queue directory they service.
    QDIR = None
    SLEEPTIME = mm_cfg.QRUNNER_SLEEP_TIME

    def __init__(self, slice=None, numslices=1):
        # Child processes tracked for later reaping (see _onefile()).
        self._kids = {}
        # Create our own switchboard.  Don't use the switchboard cache because
        # we want to provide slice and numslice arguments.
        self._switchboard = Switchboard(self.QDIR, slice, numslices)
        # Create the shunt switchboard
        self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR)
        self._stop = 0

    def stop(self):
        # Ask the main loop (and the per-file loop, via _shortcircuit()) to
        # exit at the next opportunity.
        self._stop = 1

    def run(self):
        # Start the main loop for this queue runner.
        try:
            try:
                while 1:
                    # Once through the loop that processes all the files in
                    # the queue directory.
                    filecnt = self._oneloop()
                    # Do the periodic work for the subclass.  BAW: this
                    # shouldn't be called here.  There should be one more
                    # _doperiodic() call at the end of the _oneloop() loop.
                    self._doperiodic()
                    # If the stop flag is set, we're done.
                    if self._stop:
                        break
                    # If there were no files to process, then we'll simply
                    # sleep for a little while and expect some to show up.
                    if not filecnt:
                        self._snooze()
            except KeyboardInterrupt:
                pass
        finally:
            # We've broken out of our main loop, so we want to reap all the
            # subprocesses we've created and do any other necessary cleanups.
            self._cleanup()

    def _oneloop(self):
        # First, list all the files in our queue directory.
        # Switchboard.files() is guaranteed to hand us the files in FIFO
        # order.  Return an integer count of the number of files that were
        # available for this qrunner to process.  A non-zero value tells run()
        # not to snooze for a while.
        files = self._switchboard.files()
        for filebase in files:
            # Ask the switchboard for the message and metadata objects
            # associated with this filebase.
            msg, msgdata = self._switchboard.dequeue(filebase)
            # It's possible one or both files got lost.  If so, just ignore
            # this filebase entry.  dequeue() will automatically unlink the
            # other file, but we should log an error message for diagnostics.
            if msg is None or msgdata is None:
                syslog('error', 'lost data files for filebase: %s', filebase)
            else:
                # Now that we've dequeued the message, we want to be
                # incredibly anal about making sure that no uncaught exception
                # could cause us to lose the message.  All runners that
                # implement _dispose() must guarantee that exceptions are
                # caught and dealt with properly.  Still, there may be a bug
                # in the infrastructure, and we do not want those to cause
                # messages to be lost.  Any uncaught exceptions will cause the
                # message to be stored in the shunt queue for human
                # intervention.
                try:
                    self._onefile(msg, msgdata)
                except Exception, e:
                    self._log(e)
                    syslog('error', 'SHUNTING: %s', filebase)
                    # Put a marker in the metadata for unshunting
                    msgdata['whichq'] = self._switchboard.whichq()
                    self._shunt.enqueue(msg, msgdata)
            # Other work we want to do each time through the loop
            Utils.reap(self._kids, once=1)
            self._doperiodic()
            if self._shortcircuit():
                break
        return len(files)

    def _onefile(self, msg, msgdata):
        # Do some common sanity checking on the message metadata.  It's got to
        # be destined for a particular mailing list.  This switchboard is used
        # to shunt off badly formatted messages.  We don't want to just trash
        # them because they may be fixable with human intervention.  Just get
        # them out of our sight though.
        #
        # Find out which mailing list this message is destined for.
        listname = msgdata.get('listname')
        if not listname:
            listname = mm_cfg.MAILMAN_SITE_LIST
        mlist = self._open_list(listname)
        if not mlist:
            syslog('error',
                   'Dequeuing message destined for missing list: %s',
                   listname)
            self._shunt.enqueue(msg, msgdata)
            return
        # Now process this message, keeping track of any subprocesses that may
        # have been spawned.  We'll reap those later.
        #
        # We also want to set up the language context for this message.  The
        # context will be the preferred language for the user if a member of
        # the list, or the list's preferred language.  However, we must take
        # special care to reset the defaults, otherwise subsequent messages
        # may be translated incorrectly.  BAW: I'm not sure I like this
        # approach, but I can't think of anything better right now.
        otranslation = i18n.get_translation()
        sender = msg.get_sender()
        if mlist:
            lang = mlist.getMemberLanguage(sender)
        else:
            lang = mm_cfg.DEFAULT_SERVER_LANGUAGE
        i18n.set_language(lang)
        msgdata['lang'] = lang
        try:
            keepqueued = self._dispose(mlist, msg, msgdata)
        finally:
            # Restore the translation that was active before this message.
            i18n.set_translation(otranslation)
        # Keep tabs on any child processes that got spawned.
        kids = msgdata.get('_kids')
        if kids:
            self._kids.update(kids)
        # A true return value from _dispose() means "try this message again".
        if keepqueued:
            self._switchboard.enqueue(msg, msgdata)

    # Mapping of listnames to MailList instances as a weak value dictionary.
    _listcache = weakref.WeakValueDictionary()

    def _open_list(self, listname):
        # Cache the open list so that any use of the list within this process
        # uses the same object.  We use a WeakValueDictionary so that when the
        # list is no longer necessary, its memory is freed.
        # Returns None (after logging) if the list cannot be opened.
        mlist = self._listcache.get(listname)
        if not mlist:
            try:
                mlist = MailList.MailList(listname, lock=0)
            except Errors.MMListError, e:
                syslog('error', 'error opening list: %s\n%s', listname, e)
                return None
            else:
                self._listcache[listname] = mlist
        return mlist

    def _log(self, exc):
        # Log the exception followed by the traceback of the exception
        # currently being handled.
        syslog('error', 'Uncaught runner exception: %s', exc)
        s = StringIO()
        traceback.print_exc(file=s)
        syslog('error', s.getvalue())

    #
    # Subclasses can override these methods.
    #
    def _cleanup(self):
        """Clean up upon exit from the main processing loop.

        Called when the Runner's main loop is stopped, this should perform
        any necessary resource deallocation.  Its return value is irrelevant.
        """
        Utils.reap(self._kids)

    def _dispose(self, mlist, msg, msgdata):
        """Dispose of a single message destined for a mailing list.

        Called for each message that the Runner is responsible for, this is
        the primary overridable method for processing each message.
        Subclasses must provide an implementation for this method.

        mlist is the MailList instance this message is destined for.

        msg is the Message object representing the message.

        msgdata is a dictionary of message metadata.

        A true return value causes the message to be re-enqueued in this
        runner's queue.
        """
        raise NotImplementedError

    def _doperiodic(self):
        """Do some processing `every once in a while'.

        Called every once in a while both from the Runner's main loop, and
        from the Runner's hash slice processing loop.  You can do whatever
        special periodic processing you want here, and the return value is
        irrelevant.

        """
        pass

    def _snooze(self):
        """Sleep for a little while, because there was nothing to do.

        This is called from the Runner's main loop, but only when the last
        processing loop had no work to do (i.e. there were no messages in its
        little slice of hash space).
        """
        if self.SLEEPTIME <= 0:
            return
        time.sleep(self.SLEEPTIME)

    def _shortcircuit(self):
        """Return a true value if the individual file processing loop should
        exit before it's finished processing each message in the current slice
        of hash space.  A false value tells _oneloop() to continue processing
        until the current snapshot of hash space is exhausted.

        You could, for example, implement a throttling algorithm here.
        """
        return self._stop
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +"""Reading and writing message objects and message metadata. +""" + +# enqueue() and dequeue() are not symmetric. enqueue() takes a Message +# object. dequeue() returns a email.Message object tree. +# +# Message metadata is represented internally as a Python dictionary. Keys and +# values must be strings. When written to a queue directory, the metadata is +# written into an externally represented format, as defined here. Because +# components of the Mailman system may be written in something other than +# Python, the external interchange format should be chosen based on what those +# other components can read and write. +# +# Most efficient, and recommended if everything is Python, is Python marshal +# format. Also supported by default is Berkeley db format (using the default +# bsddb module compiled into your Python executable -- usually Berkeley db +# 2), and rfc822 style plain text. You can write your own if you have other +# needs. + +import os +import time +import sha +import marshal +import errno +import cPickle + +import email + +from Mailman import mm_cfg +from Mailman import Utils +from Mailman import Message +from Mailman.Logging.Syslog import syslog + +# 20 bytes of all bits set, maximum sha.digest() value +shamax = 0xffffffffffffffffffffffffffffffffffffffffL + +SAVE_MSGS_AS_PICKLES = 1 + + + +class _Switchboard: + def __init__(self, whichq, slice=None, numslices=1): + self.__whichq = whichq + # Create the directory if it doesn't yet exist. 
+ # FIXME + omask = os.umask(0) # rwxrws--- + try: + try: + os.mkdir(self.__whichq, 0770) + except OSError, e: + if e.errno <> errno.EEXIST: raise + finally: + os.umask(omask) + # Fast track for no slices + self.__lower = None + self.__upper = None + # BAW: test performance and end-cases of this algorithm + if numslices <> 1: + self.__lower = (shamax * slice) / numslices + self.__upper = (shamax * (slice+1)) / numslices + + def whichq(self): + return self.__whichq + + def enqueue(self, _msg, _metadata={}, **_kws): + # Calculate the SHA hexdigest of the message to get a unique base + # filename. We're also going to use the digest as a hash into the set + # of parallel qrunner processes. + data = _metadata.copy() + data.update(_kws) + listname = data.get('listname', '--nolist--') + # Get some data for the input to the sha hash + now = time.time() + if SAVE_MSGS_AS_PICKLES and not data.get('_plaintext'): + msgsave = cPickle.dumps(_msg, 1) + ext = '.pck' + else: + msgsave = str(_msg) + ext = '.msg' + hashfood = msgsave + listname + `now` + # Encode the current time into the file name for FIFO sorting in + # files(). The file name consists of two parts separated by a `+': + # the received time for this message (i.e. when it first showed up on + # this system) and the sha hex digest. + #rcvtime = data.setdefault('received_time', now) + rcvtime = data.setdefault('received_time', now) + filebase = `rcvtime` + '+' + sha.new(hashfood).hexdigest() + # Figure out which queue files the message is to be written to. + msgfile = os.path.join(self.__whichq, filebase + ext) + dbfile = os.path.join(self.__whichq, filebase + '.db') + # Always add the metadata schema version number + data['version'] = mm_cfg.QFILE_SCHEMA_VERSION + # Filter out volatile entries + for k in data.keys(): + if k[0] == '_': + del data[k] + # Now write the message text to one file and the metadata to another + # file. 
The metadata is always written second to avoid race + # conditions with the various queue runners (which key off of the .db + # filename). + omask = os.umask(007) # -rw-rw---- + try: + msgfp = open(msgfile, 'w') + finally: + os.umask(omask) + msgfp.write(msgsave) + msgfp.close() + # Now write the metadata using the appropriate external metadata + # format. We play rename-switcheroo here to further plug the race + # condition holes. + tmpfile = dbfile + '.tmp' + self._ext_write(tmpfile, data) + os.rename(tmpfile, dbfile) + + def dequeue(self, filebase): + # Calculate the .db and .msg filenames from the given filebase. + msgfile = os.path.join(self.__whichq, filebase + '.msg') + pckfile = os.path.join(self.__whichq, filebase + '.pck') + dbfile = os.path.join(self.__whichq, filebase + '.db') + # Now we are going to read the message and metadata for the given + # filebase. We want to read things in this order: first, the metadata + # file to find out whether the message is stored as a pickle or as + # plain text. Second, the actual message file. However, we want to + # first unlink the message file and then the .db file, because the + # qrunner only cues off of the .db file + msg = data = None + try: + data = self._ext_read(dbfile) + os.unlink(dbfile) + except EnvironmentError, e: + if e.errno <> errno.ENOENT: raise + # Between 2.1b4 and 2.1b5, the `rejection-notice' key in the metadata + # was renamed to `rejection_notice', since dashes in the keys are not + # supported in METAFMT_ASCII. 
+ if data.has_key('rejection-notice'): + data['rejection_notice'] = data['rejection-notice'] + del data['rejection-notice'] + msgfp = None + try: + try: + msgfp = open(pckfile) + msg = cPickle.load(msgfp) + os.unlink(pckfile) + except EnvironmentError, e: + if e.errno <> errno.ENOENT: raise + msgfp = None + try: + msgfp = open(msgfile) + msg = email.message_from_file(msgfp, Message.Message) + os.unlink(msgfile) + except EnvironmentError, e: + if e.errno <> errno.ENOENT: raise + except email.Errors.MessageParseError, e: + # This message was unparsable, most likely because its + # MIME encapsulation was broken. For now, there's not + # much we can do about it. + syslog('error', 'message is unparsable: %s', filebase) + msgfp.close() + msgfp = None + if mm_cfg.QRUNNER_SAVE_BAD_MESSAGES: + # Cheapo way to ensure the directory exists w/ the + # proper permissions. + sb = Switchboard(mm_cfg.BADQUEUE_DIR) + os.rename(msgfile, os.path.join( + mm_cfg.BADQUEUE_DIR, filebase + '.txt')) + else: + os.unlink(msgfile) + msg = data = None + finally: + if msgfp: + msgfp.close() + return msg, data + + def files(self): + times = {} + lower = self.__lower + upper = self.__upper + for f in os.listdir(self.__whichq): + # We only care about the file's base name (i.e. no extension). + # Thus we'll ignore anything that doesn't end in .db. + if not f.endswith('.db'): + continue + filebase = os.path.splitext(f)[0] + when, digest = filebase.split('+') + # Throw out any files which don't match our bitrange. BAW: test + # performance and end-cases of this algorithm. 
+ if not lower or (lower <= long(digest, 16) < upper): + times[float(when)] = filebase + # FIFO sort + keys = times.keys() + keys.sort() + return [times[k] for k in keys] + + def _ext_write(self, tmpfile, data): + raise NotImplementedError + + def _ext_read(self, dbfile): + raise NotImplementedError + + + +class MarshalSwitchboard(_Switchboard): + """Python marshal format.""" + FLOAT_ATTRIBUTES = ['received_time'] + + def _ext_write(self, filename, dict): + omask = os.umask(007) # -rw-rw---- + try: + fp = open(filename, 'w') + finally: + os.umask(omask) + # Python's marshal, up to and including in Python 2.1, has a bug where + # the full precision of floats was not stored. We work around this + # bug by hardcoding a list of float values we know about, repr()-izing + # them ourselves, and doing the reverse conversion on _ext_read(). + for attr in self.FLOAT_ATTRIBUTES: + # We use try/except because we expect a hitrate of nearly 100% + try: + fval = dict[attr] + except KeyError: + pass + else: + dict[attr] = repr(fval) + marshal.dump(dict, fp) + fp.close() + + def _ext_read(self, filename): + fp = open(filename) + dict = marshal.load(fp) + # Update from version 2 files + if dict.get('version', 0) == 2: + del dict['filebase'] + # Do the reverse conversion (repr -> float) + for attr in self.FLOAT_ATTRIBUTES: + try: + sval = dict[attr] + except KeyError: + pass + else: + # Do a safe eval by setting up a restricted execution + # environment. This may not be strictly necessary since we + # know they are floats, but it can't hurt. + dict[attr] = eval(sval, {'__builtins__': {}}) + fp.close() + return dict + + + +class BSDDBSwitchboard(_Switchboard): + """Native (i.e. 
compiled-in) Berkeley db format.""" + def _ext_write(self, filename, dict): + import bsddb + omask = os.umask(0) + try: + hashfile = bsddb.hashopen(filename, 'n', 0660) + finally: + os.umask(omask) + # values must be strings + for k, v in dict.items(): + hashfile[k] = marshal.dumps(v) + hashfile.sync() + hashfile.close() + + def _ext_read(self, filename): + import bsddb + dict = {} + hashfile = bsddb.hashopen(filename, 'r') + for k in hashfile.keys(): + dict[k] = marshal.loads(hashfile[k]) + hashfile.close() + return dict + + + +class ASCIISwitchboard(_Switchboard): + """Human readable .db file format. + + key/value pairs are written as + + key = value + + as real Python code which can be execfile'd. + """ + + def _ext_write(self, filename, dict): + omask = os.umask(007) # -rw-rw---- + try: + fp = open(filename, 'w') + finally: + os.umask(omask) + for k, v in dict.items(): + print >> fp, '%s = %s' % (k, repr(v)) + fp.close() + + def _ext_read(self, filename): + dict = {'__builtins__': {}} + execfile(filename, dict) + del dict['__builtins__'] + return dict + + + +# Here are the various types of external file formats available. The format +# chosen is given defined in the mm_cfg.py configuration file. 
# Bind the public name `Switchboard' to the class that implements the
# metadata format configured in mm_cfg.METADATA_FORMAT.
if mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_MARSHAL:
    Switchboard = MarshalSwitchboard
elif mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_BSDDB_NATIVE:
    Switchboard = BSDDBSwitchboard
elif mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_ASCII:
    Switchboard = ASCIISwitchboard
else:
    # Unrecognized setting: log it and fall back to the marshal format.
    syslog('error', 'Undefined metadata format: %d (using marshals)',
           mm_cfg.METADATA_FORMAT)
    Switchboard = MarshalSwitchboard



# For bin/dumpdb
class DumperSwitchboard(Switchboard):
    """Switchboard subclass used to read a single metadata file directly."""
    def __init__(self):
        # Deliberately does not call the base class constructor; read()
        # below only needs _ext_read().
        pass

    def read(self, filename):
        """Return the contents of `filename' as decoded by _ext_read()."""
        return self._ext_read(filename)
diff --git a/Mailman/Queue/VirginRunner.py b/Mailman/Queue/VirginRunner.py new file mode 100644 index 00000000..720ecd25 --- /dev/null +++ b/Mailman/Queue/VirginRunner.py @@ -0,0 +1,43 @@
# Copyright (C) 1998,1999,2000,2001,2002 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.

"""Virgin message queue runner.

This qrunner handles messages that the Mailman system gives virgin birth to.
E.g. acknowledgement responses to user posts or Replybot messages. They need
to go through some minimal processing before they can be sent out to the
recipient.
"""

from Mailman import mm_cfg
# NOTE(review): Runner is not referenced anywhere in this module as shown;
# confirm nothing relies on the import before removing it.
from Mailman.Queue.Runner import Runner
from Mailman.Queue.IncomingRunner import IncomingRunner



class VirginRunner(IncomingRunner):
    """Queue runner for the messages stored in mm_cfg.VIRGINQUEUE_DIR."""
    QDIR = mm_cfg.VIRGINQUEUE_DIR

    def _dispose(self, mlist, msg, msgdata):
        """Mark the message as fast-tracked, then defer to IncomingRunner.

        Returns whatever IncomingRunner._dispose() returns.
        """
        # We need to fasttrack this message through any handlers that touch
        # it. E.g. especially CookHeaders.
        msgdata['_fasttrack'] = 1
        return IncomingRunner._dispose(self, mlist, msg, msgdata)

    def _get_pipeline(self, mlist, msg, msgdata):
        """Return the fixed list of handler names; the arguments are unused."""
        # It's okay to hardcode this, since it'll be the same for all
        # internally crafted messages.
        return ['CookHeaders', 'ToOutgoing']
diff --git a/Mailman/Queue/__init__.py b/Mailman/Queue/__init__.py new file mode 100644 index 00000000..cdf93257 --- /dev/null +++ b/Mailman/Queue/__init__.py @@ -0,0 +1,15 @@
# Copyright (C) 2000,2001,2002 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
diff --git a/Mailman/Queue/sbcache.py b/Mailman/Queue/sbcache.py new file mode 100644 index 00000000..9b918fc5 --- /dev/null +++ b/Mailman/Queue/sbcache.py @@ -0,0 +1,26 @@
# Copyright (C) 2001,2002 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.

"""A factory of Switchboards with caching."""

from Mailman.Queue.Switchboard import Switchboard

# a mapping from queue directory to Switchboard instance
_sbcache = {}


def get_switchboard(qdir):
    """Return the Switchboard for `qdir', creating it only on first use.

    The instance is stored in the module-level cache, so later calls with
    the same `qdir' return the very same object.  `qdir' must be hashable
    because it is used as the cache key.
    """
    try:
        return _sbcache[qdir]
    except KeyError:
        # Construct the Switchboard only on a cache miss.  Passing
        # Switchboard(qdir) as the default argument of setdefault() would
        # build (and then throw away) a new instance on every call, because
        # the argument is evaluated before the lookup happens.
        switchboard = _sbcache[qdir] = Switchboard(qdir)
        return switchboard