aboutsummaryrefslogtreecommitdiffstats
path: root/Mailman/Queue
diff options
context:
space:
mode:
author <>2003-01-02 05:25:50 +0000
committer <>2003-01-02 05:25:50 +0000
commitb132a73f15e432eaf43310fce9196ca0c0651465 (patch)
treec15f816ba7c4de99fef510e3bd75af0890d47441 /Mailman/Queue
downloadmailman2-b132a73f15e432eaf43310fce9196ca0c0651465.tar.gz
mailman2-b132a73f15e432eaf43310fce9196ca0c0651465.tar.xz
mailman2-b132a73f15e432eaf43310fce9196ca0c0651465.zip
This commit was manufactured by cvs2svn to create branch
'Release_2_1-maint'.
Diffstat (limited to '')
-rw-r--r--Mailman/Queue/.cvsignore1
-rw-r--r--Mailman/Queue/ArchRunner.py76
-rw-r--r--Mailman/Queue/BounceRunner.py195
-rw-r--r--Mailman/Queue/CommandRunner.py220
-rw-r--r--Mailman/Queue/IncomingRunner.py170
-rw-r--r--Mailman/Queue/MaildirRunner.py184
-rw-r--r--Mailman/Queue/Makefile.in69
-rw-r--r--Mailman/Queue/NewsRunner.py158
-rw-r--r--Mailman/Queue/OutgoingRunner.py139
-rw-r--r--Mailman/Queue/Runner.py245
-rw-r--r--Mailman/Queue/Switchboard.py340
-rw-r--r--Mailman/Queue/VirginRunner.py43
-rw-r--r--Mailman/Queue/__init__.py15
-rw-r--r--Mailman/Queue/sbcache.py26
14 files changed, 1881 insertions, 0 deletions
diff --git a/Mailman/Queue/.cvsignore b/Mailman/Queue/.cvsignore
new file mode 100644
index 00000000..f3c7a7c5
--- /dev/null
+++ b/Mailman/Queue/.cvsignore
@@ -0,0 +1 @@
+Makefile
diff --git a/Mailman/Queue/ArchRunner.py b/Mailman/Queue/ArchRunner.py
new file mode 100644
index 00000000..14097332
--- /dev/null
+++ b/Mailman/Queue/ArchRunner.py
@@ -0,0 +1,76 @@
+# Copyright (C) 2000,2001,2002 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""Outgoing queue runner."""
+
+import time
+from email.Utils import parsedate_tz, mktime_tz, formatdate
+
+from Mailman import mm_cfg
+from Mailman import LockFile
+from Mailman.Queue.Runner import Runner
+
+
+
+class ArchRunner(Runner):
+ QDIR = mm_cfg.ARCHQUEUE_DIR
+
+ def _dispose(self, mlist, msg, msgdata):
+ # Support clobber_date, i.e. setting the date in the archive to the
+ # received date, not the (potentially bogus) Date: header of the
+ # original message.
+ clobber = 0
+ originaldate = msg.get('date')
+ receivedtime = formatdate(msgdata['received_time'])
+ if not originaldate:
+ clobber = 1
+ elif mm_cfg.ARCHIVER_CLOBBER_DATE_POLICY == 1:
+ clobber = 1
+ elif mm_cfg.ARCHIVER_CLOBBER_DATE_POLICY == 2:
+ # what's the timestamp on the original message?
+ tup = parsedate_tz(originaldate)
+ now = time.time()
+ try:
+ if not tup:
+ clobber = 1
+ elif abs(now - mktime_tz(tup)) > \
+ mm_cfg.ARCHIVER_ALLOWABLE_SANE_DATE_SKEW:
+ clobber = 1
+ except ValueError:
+ # The likely cause of this is that the year in the Date: field
+ # is horribly incorrect, e.g. (from SF bug # 571634):
+ # Date: Tue, 18 Jun 0102 05:12:09 +0500
+ # Obviously clobber such dates.
+ clobber = 1
+ if clobber:
+ del msg['date']
+ del msg['x-original-date']
+ msg['Date'] = receivedtime
+ if originaldate:
+ msg['X-Original-Date'] = originaldate
+ # Always put an indication of when we received the message.
+ msg['X-List-Received-Date'] = receivedtime
+ # Now try to get the list lock
+ try:
+ mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
+ except LockFile.TimeOutError:
+ # oh well, try again later
+ return 1
+ try:
+ mlist.ArchiveMail(msg)
+ mlist.Save()
+ finally:
+ mlist.Unlock()
diff --git a/Mailman/Queue/BounceRunner.py b/Mailman/Queue/BounceRunner.py
new file mode 100644
index 00000000..e59ac47e
--- /dev/null
+++ b/Mailman/Queue/BounceRunner.py
@@ -0,0 +1,195 @@
+# Copyright (C) 2001,2002 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""Bounce queue runner."""
+
+import re
+from email.MIMEText import MIMEText
+from email.MIMEMessage import MIMEMessage
+from email.Utils import parseaddr
+
+from Mailman import mm_cfg
+from Mailman import Utils
+from Mailman import LockFile
+from Mailman.Message import UserNotification
+from Mailman.Bouncers import BouncerAPI
+from Mailman.Queue.Runner import Runner
+from Mailman.Queue.sbcache import get_switchboard
+from Mailman.Logging.Syslog import syslog
+from Mailman.i18n import _
+
+COMMASPACE = ', '
+
+
+
+class BounceRunner(Runner):
+ QDIR = mm_cfg.BOUNCEQUEUE_DIR
+ # We only do bounce processing once per minute.
+ SLEEPTIME = mm_cfg.minutes(1)
+
+ def _dispose(self, mlist, msg, msgdata):
+ # Make sure we have the most up-to-date state
+ mlist.Load()
+ outq = get_switchboard(mm_cfg.OUTQUEUE_DIR)
+ # There are a few possibilities here:
+ #
+ # - the message could have been VERP'd in which case, we know exactly
+ # who the message was destined for. That make our job easy.
+ # - the message could have been originally destined for a list owner,
+ # but a list owner address itself bounced. That's bad, and for now
+ # we'll simply log the problem and attempt to deliver the message to
+ # the site owner.
+ #
+ # All messages to list-owner@vdom.ain have their envelope sender set
+ # to site-owner@dom.ain (no virtual domain). Is this a bounce for a
+ # message to a list owner, coming to the site owner?
+ if msg.get('to', '') == Utils.get_site_email(extra='-owner'):
+ # Send it on to the site owners, but craft the envelope sender to
+ # be the -loop detection address, so if /they/ bounce, we won't
+ # get stuck in a bounce loop.
+ outq.enqueue(msg, msgdata,
+ recips=[Utils.get_site_email()],
+ envsender=Utils.get_site_email(extra='loop'),
+ )
+ # List isn't doing bounce processing?
+ if not mlist.bounce_processing:
+ return
+ # Try VERP detection first, since it's quick and easy
+ addrs = verp_bounce(mlist, msg)
+ if not addrs:
+ # That didn't give us anything useful, so try the old fashion
+ # bounce matching modules
+ addrs = BouncerAPI.ScanMessages(mlist, msg)
+ # If that still didn't return us any useful addresses, then send it on
+ # or discard it.
+ if not addrs:
+ syslog('bounce', 'bounce message w/no discernable addresses: %s',
+ msg.get('message-id'))
+ maybe_forward(mlist, msg)
+ return
+ # BAW: It's possible that there are None's in the list of addresses,
+ # although I'm unsure how that could happen. Possibly ScanMessages()
+ # can let None's sneak through. In any event, this will kill them.
+ addrs = filter(None, addrs)
+ # Okay, we have some recognized addresses. We now need to register
+ # the bounces for each of these. If the bounce came to the site list,
+ # then we'll register the address on every list in the system, but
+ # note: this could be VERY resource intensive!
+ foundp = 0
+ listname = mlist.internal_name()
+ if listname == mm_cfg.MAILMAN_SITE_LIST:
+ foundp = 1
+ for listname in Utils.list_names():
+ xlist = self._open_list(listname)
+ xlist.Load()
+ for addr in addrs:
+ if xlist.isMember(addr):
+ unlockp = 0
+ if not xlist.Locked():
+ try:
+ xlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
+ except LockFile.TimeOutError:
+ # Oh well, forget aboutf this list
+ continue
+ unlockp = 1
+ try:
+ xlist.registerBounce(addr, msg)
+ foundp = 1
+ xlist.Save()
+ finally:
+ if unlockp:
+ xlist.Unlock()
+ else:
+ try:
+ mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
+ except LockFile.TimeOutError:
+ # Try again later
+ syslog('bounce', "%s: couldn't get list lock", listname)
+ return 1
+ else:
+ try:
+ for addr in addrs:
+ if mlist.isMember(addr):
+ mlist.registerBounce(addr, msg)
+ foundp = 1
+ mlist.Save()
+ finally:
+ mlist.Unlock()
+ if not foundp:
+ # It means an address was recognized but it wasn't an address
+ # that's on any mailing list at this site. BAW: don't forward
+ # these, but do log it.
+ syslog('bounce', 'bounce message with non-members of %s: %s',
+ listname, COMMASPACE.join(addrs))
+ #maybe_forward(mlist, msg)
+
+
+
+def verp_bounce(mlist, msg):
+ bmailbox, bdomain = Utils.ParseEmail(mlist.GetBouncesEmail())
+ # Sadly not every MTA bounces VERP messages correctly, or consistently.
+ # Fall back to Delivered-To: (Postfix), Envelope-To: (Exim) and
+ # Apparently-To:, and then short-circuit if we still don't have anything
+ # to work with. Note that there can be multiple Delivered-To: headers so
+ # we need to search them all (and we don't worry about false positives for
+ # forwarded email, because only one should match VERP_REGEXP).
+ vals = []
+ for header in ('to', 'delivered-to', 'envelope-to', 'apparently-to'):
+ vals.extend(msg.get_all(header, []))
+ for field in vals:
+ to = parseaddr(field)[1]
+ if not to:
+ continue # empty header
+ mo = re.search(mm_cfg.VERP_REGEXP, to)
+ if not mo:
+ continue # no match of regexp
+ try:
+ if bmailbox <> mo.group('bounces'):
+ continue # not a bounce to our list
+ # All is good
+ addr = '%s@%s' % mo.group('mailbox', 'host')
+ except IndexError:
+ syslog('error',
+ "VERP_REGEXP doesn't yield the right match groups: %s",
+ mm_cfg.VERP_REGEXP)
+ return []
+ return [addr]
+
+
+
+def maybe_forward(mlist, msg):
+ # Does the list owner want to get non-matching bounce messages?
+ # If not, simply discard it.
+ if mlist.bounce_unrecognized_goes_to_list_owner:
+ adminurl = mlist.GetScriptURL('admin', absolute=1) + '/bounce'
+ mlist.ForwardMessage(msg,
+ text=_("""\
+The attached message was received as a bounce, but either the bounce format
+was not recognized, or no member addresses could be extracted from it. This
+mailing list has been configured to send all unrecognized bounce messages to
+the list administrator(s).
+
+For more information see:
+%(adminurl)s
+
+"""),
+ subject=_('Uncaught bounce notification'),
+ tomoderators=0)
+ syslog('bounce', 'forwarding unrecognized, message-id: %s',
+ msg.get('message-id', 'n/a'))
+ else:
+ syslog('bounce', 'discarding unrecognized, message-id: %s',
+ msg.get('message-id', 'n/a'))
diff --git a/Mailman/Queue/CommandRunner.py b/Mailman/Queue/CommandRunner.py
new file mode 100644
index 00000000..303d4c52
--- /dev/null
+++ b/Mailman/Queue/CommandRunner.py
@@ -0,0 +1,220 @@
+# Copyright (C) 1998,1999,2000,2001,2002 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""-request robot command queue runner."""
+
+# See the delivery diagram in IncomingRunner.py. This module handles all
+# email destined for mylist-request, -join, and -leave. It no longer handles
+# bounce messages (i.e. -admin or -bounces), nor does it handle mail to
+# -owner.
+
+
+
+# BAW: get rid of this when we Python 2.2 is a minimum requirement.
+from __future__ import nested_scopes
+
+import sys
+import re
+from types import StringType, UnicodeType
+
+from Mailman import mm_cfg
+from Mailman import Utils
+from Mailman import Message
+from Mailman.Handlers import Replybot
+from Mailman.i18n import _
+from Mailman.Queue.Runner import Runner
+from Mailman.Logging.Syslog import syslog
+from Mailman import LockFile
+
+from email.MIMEText import MIMEText
+from email.MIMEMessage import MIMEMessage
+from email.Iterators import typed_subpart_iterator
+
+NL = '\n'
+
+
+
+class Results:
+ def __init__(self, mlist, msg, msgdata):
+ self.mlist = mlist
+ self.msg = msg
+ self.msgdata = msgdata
+ # Only set returnaddr if the response is to go to someone other than
+ # the address specified in the From: header (e.g. for the password
+ # command).
+ self.returnaddr = None
+ self.commands = []
+ self.results = []
+ self.ignored = []
+ self.lineno = 0
+ self.subjcmdretried = 0
+ self.respond = 1
+ # Always process the Subject: header first
+ self.commands.append(msg.get('subject', ''))
+ # Find the first text/plain part
+ part = None
+ for part in typed_subpart_iterator(msg, 'text', 'plain'):
+ break
+ if part is None or part is not msg:
+ # Either there was no text/plain part or we ignored some
+ # non-text/plain parts.
+ self.results.append(_('Ignoring non-text/plain MIME parts'))
+ if part is None:
+ # E.g the outer Content-Type: was text/html
+ return
+ body = part.get_payload()
+ # text/plain parts better have string payloads
+ assert isinstance(body, StringType) or isinstance(body, UnicodeType)
+ lines = body.splitlines()
+ # Use no more lines than specified
+ self.commands.extend(lines[:mm_cfg.DEFAULT_MAIL_COMMANDS_MAX_LINES])
+ self.ignored.extend(lines[mm_cfg.DEFAULT_MAIL_COMMANDS_MAX_LINES:])
+
+ def process(self):
+ # Now, process each line until we find an error. The first
+ # non-command line found stops processing.
+ stop = 0
+ for line in self.commands:
+ if line and line.strip():
+ args = line.split()
+ cmd = args.pop(0).lower()
+ stop = self.do_command(cmd, args)
+ self.lineno += 1
+ if stop:
+ break
+
+ def do_command(self, cmd, args=None):
+ if args is None:
+ args = ()
+ # Try to import a command handler module for this command
+ modname = 'Mailman.Commands.cmd_' + cmd
+ try:
+ __import__(modname)
+ handler = sys.modules[modname]
+ except ImportError:
+ # If we're on line zero, it was the Subject: header that didn't
+ # contain a command. It's possible there's a Re: prefix (or
+ # localized version thereof) on the Subject: line that's messing
+ # things up. Pop the prefix off and try again... once.
+ #
+ # If that still didn't work it isn't enough to stop processing.
+ # BAW: should we include a message that the Subject: was ignored?
+ if not self.subjcmdretried and args:
+ self.subjcmdretried += 1
+ cmd = args.pop(0)
+ return self.do_command(cmd, args)
+ return self.lineno <> 0
+ return handler.process(self, args)
+
+ def send_response(self):
+ # Helper
+ def indent(lines):
+ return [' ' + line for line in lines]
+ # Quick exit for some commands which don't need a response
+ if not self.respond:
+ return
+ resp = [Utils.wrap(_("""\
+The results of your email command are provided below.
+Attached is your original message.
+"""))]
+ if self.results:
+ resp.append(_('- Results:'))
+ resp.extend(indent(self.results))
+ # Ignore empty lines
+ unprocessed = [line for line in self.commands[self.lineno:]
+ if line and line.strip()]
+ if unprocessed:
+ resp.append(_('\n- Unprocessed:'))
+ resp.extend(indent(unprocessed))
+ if self.ignored:
+ resp.append(_('\n- Ignored:'))
+ resp.extend(indent(self.ignored))
+ resp.append(_('\n- Done.\n\n'))
+ results = MIMEText(
+ NL.join(resp),
+ _charset=Utils.GetCharSet(self.mlist.preferred_language))
+ # Safety valve for mail loops with misconfigured email 'bots. We
+ # don't respond to commands sent with "Precedence: bulk|junk|list"
+ # unless they explicitly "X-Ack: yes", but not all mail 'bots are
+ # correctly configured, so we max out the number of responses we'll
+ # give to an address in a single day.
+ #
+ # BAW: We wait until now to make this decision since our sender may
+ # not be self.msg.get_sender(), but I'm not sure this is right.
+ recip = self.returnaddr or self.msg.get_sender()
+ if not self.mlist.autorespondToSender(recip):
+ return
+ msg = Message.UserNotification(
+ recip,
+ self.mlist.GetBouncesEmail(),
+ _('The results of your email commands'),
+ lang=self.mlist.preferred_language)
+ msg.set_type('multipart/mixed')
+ msg.attach(results)
+ orig = MIMEMessage(self.msg)
+ msg.attach(orig)
+ msg.send(self.mlist)
+
+
+
+class CommandRunner(Runner):
+ QDIR = mm_cfg.CMDQUEUE_DIR
+
+ def _dispose(self, mlist, msg, msgdata):
+ # The policy here is similar to the Replybot policy. If a message has
+ # "Precedence: bulk|junk|list" and no "X-Ack: yes" header, we discard
+ # it to prevent replybot response storms.
+ precedence = msg.get('precedence', '').lower()
+ ack = msg.get('x-ack', '').lower()
+ if ack <> 'yes' and precedence in ('bulk', 'junk', 'list'):
+ syslog('vette', 'Precedence: %s message discarded by: %s',
+ precedence, mlist.GetRequestEmail())
+ return 0
+ # Do replybot for commands
+ mlist.Load()
+ Replybot.process(mlist, msg, msgdata)
+ if mlist.autorespond_requests == 1:
+ syslog('vette', 'replied and discard')
+ # w/discard
+ return 0
+ # Now craft the response
+ res = Results(mlist, msg, msgdata)
+ # BAW: Not all the functions of this qrunner require the list to be
+ # locked. Still, it's more convenient to lock it here and now and
+ # deal with lock failures in one place.
+ try:
+ mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
+ except LockFile.TimeOutError:
+ # Oh well, try again later
+ return 1
+ # This message will have been delivered to one of mylist-request,
+ # mylist-join, or mylist-leave, and the message metadata will contain
+ # a key to which one was used.
+ try:
+ if msgdata.get('torequest'):
+ res.process()
+ elif msgdata.get('tojoin'):
+ res.do_command('join')
+ elif msgdata.get('toleave'):
+ res.do_command('leave')
+ elif msgdata.get('toconfirm'):
+ mo = re.match(mm_cfg.VERP_CONFIRM_REGEXP, msg.get('to', ''))
+ if mo:
+ res.do_command('confirm', (mo.group('cookie'),))
+ res.send_response()
+ mlist.Save()
+ finally:
+ mlist.Unlock()
diff --git a/Mailman/Queue/IncomingRunner.py b/Mailman/Queue/IncomingRunner.py
new file mode 100644
index 00000000..4a60ceb9
--- /dev/null
+++ b/Mailman/Queue/IncomingRunner.py
@@ -0,0 +1,170 @@
+# Copyright (C) 1998,1999,2000,2001,2002 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""Incoming queue runner."""
+
+# A typical Mailman list exposes nine aliases which point to seven different
+# wrapped scripts. E.g. for a list named `mylist', you'd have:
+#
+# mylist-bounces -> bounces (-admin is a deprecated alias)
+# mylist-confirm -> confirm
+# mylist-join -> join (-subscribe is an alias)
+# mylist-leave -> leave (-unsubscribe is an alias)
+# mylist-owner -> owner
+# mylist -> post
+# mylist-request -> request
+#
+# -request, -join, and -leave are a robot addresses; their sole purpose is to
+# process emailed commands in a Majordomo-like fashion (although the latter
+# two are hardcoded to subscription and unsubscription requests). -bounces is
+# the automated bounce processor, and all messages to list members have their
+# return address set to -bounces. If the bounce processor fails to extract a
+# bouncing member address, it can optionally forward the message on to the
+# list owners.
+#
+# -owner is for reaching a human operator with minimal list interaction
+# (i.e. no bounce processing). -confirm is another robot address which
+# processes replies to VERP-like confirmation notices.
+#
+# So delivery flow of messages look like this:
+#
+# joerandom ---> mylist ---> list members
+# | |
+# | |[bounces]
+# | mylist-bounces <---+ <-------------------------------+
+# | | |
+# | +--->[internal bounce processing] |
+# | ^ | |
+# | | | [bounce found] |
+# | [bounces *] +--->[register and discard] |
+# | | | | |
+# | | | |[*] |
+# | [list owners] |[no bounce found] | |
+# | ^ | | |
+# | | | | |
+# +-------> mylist-owner <--------+ | |
+# | | |
+# | data/owner-bounces.mbox <--[site list] <---+ |
+# | |
+# +-------> mylist-join--+ |
+# | | |
+# +------> mylist-leave--+ |
+# | | |
+# | v |
+# +-------> mylist-request |
+# | | |
+# | +---> [command processor] |
+# | | |
+# +-----> mylist-confirm ----> +---> joerandom |
+# | |
+# |[bounces] |
+# +----------------------+
+#
+# A person can send an email to the list address (for posting), the -owner
+# address (to reach the human operator), or the -confirm, -join, -leave, and
+# -request mailbots. Message to the list address are then forwarded on to the
+# list membership, with bounces directed to the -bounces address.
+#
+# [*] Messages sent to the -owner address are forwarded on to the list
+# owner/moderators. All -owner destined messages have their bounces directed
+# to the site list -bounces address, regardless of whether a human sent the
+# message or the message was crafted internally. The intention here is that
+# the site owners want to be notified when one of their list owners' addresses
+# starts bouncing (yes, the will be automated in a future release).
+#
+# Any messages to site owners has their bounces directed to a special
+# "loop-killer" address, which just dumps the message into
+# data/owners-bounces.mbox.
+#
+# Finally, message to any of the mailbots causes the requested action to be
+# performed. Results notifications are sent to the author of the message,
+# which all bounces pointing back to the -bounces address.
+
+
+import sys
+import os
+from cStringIO import StringIO
+
+from Mailman import mm_cfg
+from Mailman import Errors
+from Mailman import LockFile
+from Mailman.Queue.Runner import Runner
+from Mailman.Logging.Syslog import syslog
+
+
+
+class IncomingRunner(Runner):
+ QDIR = mm_cfg.INQUEUE_DIR
+
+ def _dispose(self, mlist, msg, msgdata):
+ # Try to get the list lock.
+ try:
+ mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
+ except LockFile.TimeOutError:
+ # Oh well, try again later
+ return 1
+ # Process the message through a handler pipeline. The handler
+ # pipeline can actually come from one of three places: the message
+ # metadata, the mlist, or the global pipeline.
+ #
+ # If a message was requeued due to an uncaught exception, its metadata
+ # will contain the retry pipeline. Use this above all else.
+ # Otherwise, if the mlist has a `pipeline' attribute, it should be
+ # used. Final fallback is the global pipeline.
+ try:
+ pipeline = self._get_pipeline(mlist, msg, msgdata)
+ status = self._dopipeline(mlist, msg, msgdata, pipeline)
+ if status:
+ msgdata['pipeline'] = pipeline
+ mlist.Save()
+ return status
+ finally:
+ mlist.Unlock()
+
+ # Overridable
+ def _get_pipeline(self, mlist, msg, msgdata):
+ # We must return a copy of the list, otherwise, the first message that
+ # flows through the pipeline will empty it out!
+ return msgdata.get('pipeline',
+ getattr(mlist, 'pipeline',
+ mm_cfg.GLOBAL_PIPELINE))[:]
+
+ def _dopipeline(self, mlist, msg, msgdata, pipeline):
+ while pipeline:
+ handler = pipeline.pop(0)
+ modname = 'Mailman.Handlers.' + handler
+ __import__(modname)
+ try:
+ pid = os.getpid()
+ sys.modules[modname].process(mlist, msg, msgdata)
+ # Failsafe -- a child may have leaked through.
+ if pid <> os.getpid():
+ syslog('error', 'child process leaked thru: %s', modname)
+ os._exit(1)
+ except Errors.DiscardMessage:
+ # Throw the message away; we need do nothing else with it.
+ syslog('vette', 'Message discarded, msgid: %s',
+ msg.get('message-id', 'n/a'))
+ return 0
+ except Errors.HoldMessage:
+ # Let the approval process take it from here. The message no
+ # longer needs to be queued.
+ return 0
+ except Errors.RejectMessage, e:
+ mlist.BounceMessage(msg, msgdata, e)
+ return 0
+ # We've successfully completed handling of this message
+ return 0
diff --git a/Mailman/Queue/MaildirRunner.py b/Mailman/Queue/MaildirRunner.py
new file mode 100644
index 00000000..e14ab339
--- /dev/null
+++ b/Mailman/Queue/MaildirRunner.py
@@ -0,0 +1,184 @@
+# Copyright (C) 2002 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""Maildir pre-queue runner.
+
+Most MTAs can be configured to deliver messages to a `Maildir'[1]. This
+runner will read messages from a maildir's new/ directory and inject them into
+Mailman's qfiles/in directory for processing in the normal pipeline. This
+delivery mechanism contrasts with mail program delivery, where incoming
+messages end up in qfiles/in via the MTA executing the scripts/post script
+(and likewise for the other -aliases for each mailing list).
+
+The advantage to Maildir delivery is that it is more efficient; there's no
+need to fork an intervening program just to take the message from the MTA's
+standard output, to the qfiles/in directory.
+
+[1] http://cr.yp.to/proto/maildir.html
+
+We're going to use the :info flag == 1, experimental status flag for our own
+purposes. The :1 can be followed by one of these letters:
+
+- P means that MaildirRunner's in the process of parsing and enqueuing the
+ message. If successful, it will delete the file.
+
+- X means something failed during the parse/enqueue phase. An error message
+ will be logged to log/error and the file will be renamed <filename>:1,X.
+ MaildirRunner will never automatically return to this file, but once the
+ problem is fixed, you can manually move the file back to the new/ directory
+ and MaildirRunner will attempt to re-process it. At some point we may do
+ this automatically.
+
+See the variable USE_MAILDIR in Defaults.py.in for enabling this delivery
+mechanism.
+"""
+
+# NOTE: Maildir delivery is experimental in Mailman 2.1.
+
+import os
+import re
+import errno
+
+from email.Parser import Parser
+from email.Utils import parseaddr
+
+from Mailman import mm_cfg
+from Mailman import Utils
+from Mailman.Message import Message
+from Mailman.Queue.Runner import Runner
+from Mailman.Queue.sbcache import get_switchboard
+from Mailman.Logging.Syslog import syslog
+
+# We only care about the listname and the subq as in listname@ or
+# listname-request@
+lre = re.compile(r"""
+ ^ # start of string
+ (?P<listname>[^-@]+) # listname@ or listname-subq@
+ (?: # non-grouping
+ - # dash separator
+ (?P<subq>[^-+@]+) # everything up to + or - or @
+ )? # if it exists
+ """, re.VERBOSE | re.IGNORECASE)
+
+
+
+class MaildirRunner(Runner):
+ # This class is much different than most runners because it pulls files
+ # of a different format than what scripts/post and friends leaves. The
+ # files this runner reads are just single message files as dropped into
+ # the directory by the MTA. This runner will read the file, and enqueue
+ # it in the expected qfiles directory for normal processing.
+ def __init__(self, slice=None, numslices=1):
+ # Don't call the base class constructor, but build enough of the
+ # underlying attributes to use the base class's implementation.
+ self._stop = 0
+ self._dir = os.path.join(mm_cfg.MAILDIR_DIR, 'new')
+ self._cur = os.path.join(mm_cfg.MAILDIR_DIR, 'cur')
+ self._parser = Parser(Message)
+
+ def _oneloop(self):
+ # Refresh this each time through the list. BAW: could be too
+ # expensive.
+ listnames = Utils.list_names()
+ # Cruise through all the files currently in the new/ directory
+ try:
+ files = os.listdir(self._dir)
+ except OSError, e:
+ if e.errno <> errno.ENOENT: raise
+ # Nothing's been delivered yet
+ return 0
+ for file in files:
+ srcname = os.path.join(self._dir, file)
+ dstname = os.path.join(self._cur, file + ':1,P')
+ xdstname = os.path.join(self._cur, file + ':1,X')
+ try:
+ os.rename(srcname, dstname)
+ except OSError, e:
+ if e.errno == errno.ENOENT:
+ # Some other MaildirRunner beat us to it
+ continue
+ syslog('error', 'Could not rename maildir file: %s', srcname)
+ raise
+ # Now open, read, parse, and enqueue this message
+ try:
+ fp = open(dstname)
+ try:
+ msg = self._parser.parse(fp)
+ finally:
+ fp.close()
+ # Now we need to figure out which queue of which list this
+ # message was destined for. See verp_bounce() in
+ # BounceRunner.py for why we do things this way.
+ vals = []
+ for header in ('delivered-to', 'envelope-to', 'apparently-to'):
+ vals.extend(msg.get_all(header, []))
+ for field in vals:
+ to = parseaddr(field)[1]
+ if not to:
+ continue
+ mo = lre.match(to)
+ if not mo:
+ # This isn't an address we care about
+ continue
+ listname, subq = mo.group('listname', 'subq')
+ if listname in listnames:
+ break
+ else:
+ # As far as we can tell, this message isn't destined for
+ # any list on the system. What to do?
+ syslog('error', 'Message apparently not for any list: %s',
+ xdstname)
+ os.rename(dstname, xdstname)
+ continue
+ # BAW: blech, hardcoded
+ msgdata = {'listname': listname}
+ # -admin is deprecated
+ if subq in ('bounces', 'admin'):
+ queue = get_switchboard(mm_cfg.BOUNCEQUEUE_DIR)
+ elif subq == 'confirm':
+ msgdata['toconfirm'] = 1
+ queue = get_switchboard(mm_cfg.CMDQUEUE_DIR)
+ elif subq in ('join', 'subscribe'):
+ msgdata['tojoin'] = 1
+ queue = get_switchboard(mm_cfg.CMDQUEUE_DIR)
+ elif subq in ('leave', 'unsubscribe'):
+ msgdata['toleave'] = 1
+ queue = get_switchboard(mm_cfg.CMDQUEUE_DIR)
+ elif subq == 'owner':
+ msgdata.update({
+ 'toowner': 1,
+ 'envsender': Utils.get_site_email(extra='bounces'),
+ 'pipeline': mm_cfg.OWNER_PIPELINE,
+ })
+ queue = get_switchboard(mm_cfg.INQUEUE_DIR)
+ elif subq is None:
+ msgdata['tolist'] = 1
+ queue = get_switchboard(mm_cfg.INQUEUE_DIR)
+ elif subq == 'request':
+ msgdata['torequest'] = 1
+ queue = get_switchboard(mm_cfg.CMDQUEUE_DIR)
+ else:
+ syslog('error', 'Unknown sub-queue: %s', subq)
+ os.rename(dstname, xdstname)
+ continue
+ queue.enqueue(msg, msgdata)
+ os.unlink(dstname)
+ except Exception, e:
+ os.rename(dstname, xdstname)
+ syslog('error', str(e))
+
+ def _cleanup(self):
+ pass
diff --git a/Mailman/Queue/Makefile.in b/Mailman/Queue/Makefile.in
new file mode 100644
index 00000000..a92ae67d
--- /dev/null
+++ b/Mailman/Queue/Makefile.in
@@ -0,0 +1,69 @@
+# Copyright (C) 1998,1999,2000,2001,2002 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+# NOTE: Makefile.in is converted into Makefile by the configure script
+# in the parent directory. Once configure has run, you can recreate
+# the Makefile by running just config.status.
+
+# Variables set by configure
+
+VPATH= @srcdir@
+srcdir= @srcdir@
+bindir= @bindir@
+prefix= @prefix@
+exec_prefix= @exec_prefix@
+
+CC= @CC@
+CHMOD= @CHMOD@
+INSTALL= @INSTALL@
+
+DEFS= @DEFS@
+
+# Customizable but not set by configure
+
+OPT= @OPT@
+CFLAGS= $(OPT) $(DEFS)
+PACKAGEDIR= $(prefix)/Mailman/Queue
+SHELL= /bin/sh
+
+MODULES= *.py
+
+# Modes for directories and executables created by the install
+# process. Default to group-writable directories but
+# user-only-writable for executables.
+DIRMODE= 775
+EXEMODE= 755
+FILEMODE= 644
+INSTALL_PROGRAM=$(INSTALL) -m $(EXEMODE)
+
+
+# Rules
+
+all:
+
+install:
+ for f in $(MODULES); \
+ do \
+ $(INSTALL) -m $(FILEMODE) $(srcdir)/$$f $(PACKAGEDIR); \
+ done
+
+finish:
+
+clean:
+
+distclean:
+ -rm *.pyc
+ -rm Makefile
diff --git a/Mailman/Queue/NewsRunner.py b/Mailman/Queue/NewsRunner.py
new file mode 100644
index 00000000..0439f0e1
--- /dev/null
+++ b/Mailman/Queue/NewsRunner.py
@@ -0,0 +1,158 @@
+# Copyright (C) 2000,2001,2002 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""NNTP queue runner."""
+
+import re
+import socket
+import nntplib
+from cStringIO import StringIO
+
+import email
+from email.Utils import getaddresses
+
+COMMASPACE = ', '
+
+from Mailman import mm_cfg
+from Mailman import Utils
+from Mailman.Queue.Runner import Runner
+from Mailman.Logging.Syslog import syslog
+
+
+# Matches our Mailman crafted Message-IDs. See Utils.unique_message_id()
+mcre = re.compile(r"""
+ <mailman. # match the prefix
+ \d+. # serial number
+ \d+. # time in seconds since epoch
+ \d+. # pid
+ (?P<listname>[^@]+) # list's internal_name()
+ @ # localpart@dom.ain
+ (?P<hostname>[^>]+) # list's host_name
+ > # trailer
+ """, re.VERBOSE)
+
+
+
+class NewsRunner(Runner):
+ QDIR = mm_cfg.NEWSQUEUE_DIR
+
+ def _dispose(self, mlist, msg, msgdata):
+ # Make sure we have the most up-to-date state
+ mlist.Load()
+ if not msgdata.get('prepped'):
+ prepare_message(mlist, msg, msgdata)
+ try:
+ # Flatten the message object, sticking it in a StringIO object
+ fp = StringIO(msg.as_string())
+ conn = None
+ try:
+ try:
+ conn = nntplib.NNTP(mlist.nntp_host, readermode=1,
+ user=mm_cfg.NNTP_USERNAME,
+ password=mm_cfg.NNTP_PASSWORD)
+ conn.post(fp)
+ except nntplib.error_temp, e:
+ syslog('error',
+ '(NNTPDirect) NNTP error for list "%s": %s',
+ mlist.internal_name(), e)
+ except socket.error, e:
+ syslog('error',
+ '(NNTPDirect) socket error for list "%s": %s',
+ mlist.internal_name(), e)
+ finally:
+ if conn:
+ conn.quit()
+ except Exception, e:
+ # Some other exception occurred, which we definitely did not
+ # expect, so set this message up for requeuing.
+ self._log(e)
+ return 1
+ return 0
+
+
+
+def prepare_message(mlist, msg, msgdata):
+ # If the newsgroup is moderated, we need to add this header for the Usenet
+ # software to accept the posting, and not forward it on to the n.g.'s
+ # moderation address. The posting would not have gotten here if it hadn't
+ # already been approved. 1 == open list, mod n.g., 2 == moderated
+ if mlist.news_moderation in (1, 2):
+ del msg['approved']
+ msg['Approved'] = mlist.GetListEmail()
+ # Should we restore the original, non-prefixed subject for gatewayed
+ # messages?
+ origsubj = msgdata.get('origsubj')
+ if not mlist.news_prefix_subject_too and origsubj is not None:
+ del msg['subject']
+ msg['subject'] = origsubj
+ # Add the appropriate Newsgroups: header
+ ngheader = msg['newsgroups']
+ if ngheader is not None:
+ # See if the Newsgroups: header already contains our linked_newsgroup.
+ # If so, don't add it again. If not, append our linked_newsgroup to
+ # the end of the header list
+ ngroups = [s.strip() for s in ngheader.split(',')]
+ if mlist.linked_newsgroup not in ngroups:
+ ngroups.append(mlist.linked_newsgroup)
+ # Subtitute our new header for the old one.
+ del msg['newsgroups']
+ msg['Newsgroups'] = COMMASPACE.join(ngroups)
+ else:
+ # Newsgroups: isn't in the message
+ msg['Newsgroups'] = mlist.linked_newsgroup
+ # Note: We need to be sure two messages aren't ever sent to the same list
+ # in the same process, since message ids need to be unique. Further, if
+ # messages are crossposted to two Usenet-gated mailing lists, they each
+ # need to have unique message ids or the nntpd will only accept one of
+ # them. The solution here is to substitute any existing message-id that
+ # isn't ours with one of ours, so we need to parse it to be sure we're not
+ # looping.
+ #
+ # Our Message-ID format is <mailman.secs.pid.listname@hostname>
+ msgid = msg['message-id']
+ hackmsgid = 1
+ if msgid:
+ mo = mcre.search(msgid)
+ if mo:
+ lname, hname = mo.group('listname', 'hostname')
+ if lname == mlist.internal_name() and hname == mlist.host_name:
+ hackmsgid = 0
+ if hackmsgid:
+ del msg['message-id']
+ msg['Message-ID'] = Utils.unique_message_id(mlist)
+ # Lines: is useful
+ if msg['Lines'] is None:
+ # BAW: is there a better way?
+ count = len(list(email.Iterators.body_line_iterator(msg)))
+ msg['Lines'] = str(count)
+ # Massage the message headers by remove some and rewriting others. This
+ # woon't completely sanitize the message, but it will eliminate the bulk
+ # of the rejections based on message headers. The NNTP server may still
+ # reject the message because of other problems.
+ for header in mm_cfg.NNTP_REMOVE_HEADERS:
+ del msg[header]
+ for header, rewrite in mm_cfg.NNTP_REWRITE_DUPLICATE_HEADERS:
+ values = msg.get_all(header, [])
+ if len(values) < 2:
+ # We only care about duplicates
+ continue
+ del msg[header]
+ # But keep the first one...
+ msg[header] = values[0]
+ for v in values[1:]:
+ msg[rewrite] = v
+ # Mark this message as prepared in case it has to be requeued
+ msgdata['prepped'] = 1
diff --git a/Mailman/Queue/OutgoingRunner.py b/Mailman/Queue/OutgoingRunner.py
new file mode 100644
index 00000000..aed8dcb9
--- /dev/null
+++ b/Mailman/Queue/OutgoingRunner.py
@@ -0,0 +1,139 @@
+# Copyright (C) 2000,2001,2002 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""Outgoing queue runner."""
+
+import sys
+import os
+import time
+import socket
+
+import email
+
+from Mailman import mm_cfg
+from Mailman import Message
+from Mailman import Errors
+from Mailman import LockFile
+from Mailman.Queue.Runner import Runner
+from Mailman.Logging.Syslog import syslog
+
+# This controls how often _doperiodic() will try to deal with deferred
+# permanent failures.
+DEAL_WITH_PERMFAILURES_EVERY = 1
+
+
+
+class OutgoingRunner(Runner):
+ QDIR = mm_cfg.OUTQUEUE_DIR
+
+ def __init__(self, slice=None, numslices=1):
+ Runner.__init__(self, slice, numslices)
+ # Maps mailing lists to (recip, msg) tuples
+ self._permfailures = {}
+ self._permfail_counter = 0
+ # We look this function up only at startup time
+ modname = 'Mailman.Handlers.' + mm_cfg.DELIVERY_MODULE
+ mod = __import__(modname)
+ self._func = getattr(sys.modules[modname], 'process')
+ # This prevents smtp server connection problems from filling up the
+ # error log. It gets reset if the message was successfully sent, and
+ # set if there was a socket.error.
+ self.__logged = 0
+
+ def _dispose(self, mlist, msg, msgdata):
+ # Make sure we have the most up-to-date state
+ mlist.Load()
+ try:
+ pid = os.getpid()
+ self._func(mlist, msg, msgdata)
+ # Failsafe -- a child may have leaked through.
+ if pid <> os.getpid():
+ syslog('error', 'child process leaked thru: %s', modname)
+ os._exit(1)
+ self.__logged = 0
+ except socket.error:
+ # There was a problem connecting to the SMTP server. Log this
+ # once, but crank up our sleep time so we don't fill the error
+ # log.
+ port = mm_cfg.SMTPPORT
+ if port == 0:
+ port = 'smtp'
+ # Log this just once.
+ if not self.__logged:
+ syslog('error', 'Cannot connect to SMTP server %s on port %s',
+ mm_cfg.SMTPHOST, port)
+ self.__logged = 1
+ return 1
+ except Errors.SomeRecipientsFailed, e:
+ # The delivery module being used (SMTPDirect or Sendmail) failed
+ # to deliver the message to one or all of the recipients.
+ # Permanent failures should be registered (but registration
+ # requires the list lock), and temporary failures should be
+ # retried later.
+ #
+ # For permanent failures, make a copy of the message for bounce
+ # handling. I'm not sure this is necessary, or the right thing to
+ # do.
+ pcnt = len(e.permfailures)
+ copy = email.message_from_string(str(msg))
+ self._permfailures.setdefault(mlist, []).extend(
+ zip(e.permfailures, [copy] * pcnt))
+ # Temporary failures
+ if not e.tempfailures:
+ # Don't need to keep the message queued if there were only
+ # permanent failures.
+ return 0
+ now = time.time()
+ recips = e.tempfailures
+ last_recip_count = msgdata.get('last_recip_count', 0)
+ deliver_until = msgdata.get('deliver_until', now)
+ if len(recips) == last_recip_count:
+ # We didn't make any progress, so don't attempt delivery any
+ # longer. BAW: is this the best disposition?
+ if now > deliver_until:
+ return 0
+ else:
+ # Keep trying to delivery this for 3 days
+ deliver_until = now + mm_cfg.DELIVERY_RETRY_PERIOD
+ msgdata['last_recip_count'] = len(recips)
+ msgdata['deliver_until'] = deliver_until
+ msgdata['recips'] = recips
+ # Requeue
+ return 1
+ # We've successfully completed handling of this message
+ return 0
+
+ def _doperiodic(self):
+ # Periodically try to acquire the list lock and clear out the
+ # permanent failures.
+ self._permfail_counter += 1
+ if self._permfail_counter < DEAL_WITH_PERMFAILURES_EVERY:
+ return
+ # Reset the counter
+ self._permfail_counter = 0
+ # And deal with the deferred permanent failures.
+ for mlist in self._permfailures.keys():
+ try:
+ mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
+ except LockFile.TimeOutError:
+ return
+ try:
+ for recip, msg in self._permfailures[mlist]:
+ mlist.registerBounce(recip, msg)
+ del self._permfailures[mlist]
+ mlist.Save()
+ finally:
+ mlist.Unlock()
diff --git a/Mailman/Queue/Runner.py b/Mailman/Queue/Runner.py
new file mode 100644
index 00000000..134dac99
--- /dev/null
+++ b/Mailman/Queue/Runner.py
@@ -0,0 +1,245 @@
+# Copyright (C) 1998,1999,2000,2001,2002 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""Generic queue runner class.
+"""
+
+import time
+import traceback
+import weakref
+from cStringIO import StringIO
+
+from Mailman import mm_cfg
+from Mailman import Utils
+from Mailman import Errors
+from Mailman import MailList
+from Mailman import i18n
+
+from Mailman.Queue.Switchboard import Switchboard
+from Mailman.Logging.Syslog import syslog
+
+
+
+class Runner:
+ QDIR = None
+ SLEEPTIME = mm_cfg.QRUNNER_SLEEP_TIME
+
+ def __init__(self, slice=None, numslices=1):
+ self._kids = {}
+ # Create our own switchboard. Don't use the switchboard cache because
+ # we want to provide slice and numslice arguments.
+ self._switchboard = Switchboard(self.QDIR, slice, numslices)
+ # Create the shunt switchboard
+ self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR)
+ self._stop = 0
+
+ def stop(self):
+ self._stop = 1
+
+ def run(self):
+ # Start the main loop for this queue runner.
+ try:
+ try:
+ while 1:
+ # Once through the loop that processes all the files in
+ # the queue directory.
+ filecnt = self._oneloop()
+ # Do the periodic work for the subclass. BAW: this
+ # shouldn't be called here. There should be one more
+ # _doperiodic() call at the end of the _oneloop() loop.
+ self._doperiodic()
+ # If the stop flag is set, we're done.
+ if self._stop:
+ break
+ # If there were no files to process, then we'll simply
+ # sleep for a little while and expect some to show up.
+ if not filecnt:
+ self._snooze()
+ except KeyboardInterrupt:
+ pass
+ finally:
+ # We've broken out of our main loop, so we want to reap all the
+ # subprocesses we've created and do any other necessary cleanups.
+ self._cleanup()
+
+ def _oneloop(self):
+ # First, list all the files in our queue directory.
+ # Switchboard.files() is guaranteed to hand us the files in FIFO
+ # order. Return an integer count of the number of files that were
+ # available for this qrunner to process. A non-zero value tells run()
+ # not to snooze for a while.
+ files = self._switchboard.files()
+ for filebase in files:
+ # Ask the switchboard for the message and metadata objects
+ # associated with this filebase.
+ msg, msgdata = self._switchboard.dequeue(filebase)
+ # It's possible one or both files got lost. If so, just ignore
+ # this filebase entry. dequeue() will automatically unlink the
+ # other file, but we should log an error message for diagnostics.
+ if msg is None or msgdata is None:
+ syslog('error', 'lost data files for filebase: %s', filebase)
+ else:
+ # Now that we've dequeued the message, we want to be
+ # incredibly anal about making sure that no uncaught exception
+ # could cause us to lose the message. All runners that
+ # implement _dispose() must guarantee that exceptions are
+ # caught and dealt with properly. Still, there may be a bug
+ # in the infrastructure, and we do not want those to cause
+ # messages to be lost. Any uncaught exceptions will cause the
+ # message to be stored in the shunt queue for human
+ # intervention.
+ try:
+ self._onefile(msg, msgdata)
+ except Exception, e:
+ self._log(e)
+ syslog('error', 'SHUNTING: %s', filebase)
+ # Put a marker in the metadata for unshunting
+ msgdata['whichq'] = self._switchboard.whichq()
+ self._shunt.enqueue(msg, msgdata)
+ # Other work we want to do each time through the loop
+ Utils.reap(self._kids, once=1)
+ self._doperiodic()
+ if self._shortcircuit():
+ break
+ return len(files)
+
+ def _onefile(self, msg, msgdata):
+ # Do some common sanity checking on the message metadata. It's got to
+ # be destined for a particular mailing list. This switchboard is used
+ # to shunt off badly formatted messages. We don't want to just trash
+ # them because they may be fixable with human intervention. Just get
+ # them out of our site though.
+ #
+ # Find out which mailing list this message is destined for.
+ listname = msgdata.get('listname')
+ if not listname:
+ listname = mm_cfg.MAILMAN_SITE_LIST
+ mlist = self._open_list(listname)
+ if not mlist:
+ syslog('error',
+ 'Dequeuing message destined for missing list: %s',
+ listname)
+ self._shunt.enqueue(msg, msgdata)
+ return
+ # Now process this message, keeping track of any subprocesses that may
+ # have been spawned. We'll reap those later.
+ #
+ # We also want to set up the language context for this message. The
+ # context will be the preferred language for the user if a member of
+ # the list, or the list's preferred language. However, we must take
+ # special care to reset the defaults, otherwise subsequent messages
+ # may be translated incorrectly. BAW: I'm not sure I like this
+ # approach, but I can't think of anything better right now.
+ otranslation = i18n.get_translation()
+ sender = msg.get_sender()
+ if mlist:
+ lang = mlist.getMemberLanguage(sender)
+ else:
+ lang = mm_cfg.DEFAULT_SERVER_LANGUAGE
+ i18n.set_language(lang)
+ msgdata['lang'] = lang
+ try:
+ keepqueued = self._dispose(mlist, msg, msgdata)
+ finally:
+ i18n.set_translation(otranslation)
+ # Keep tabs on any child processes that got spawned.
+ kids = msgdata.get('_kids')
+ if kids:
+ self._kids.update(kids)
+ if keepqueued:
+ self._switchboard.enqueue(msg, msgdata)
+
+ # Mapping of listnames to MailList instances as a weak value dictionary.
+ _listcache = weakref.WeakValueDictionary()
+
+ def _open_list(self, listname):
+ # Cache the open list so that any use of the list within this process
+ # uses the same object. We use a WeakValueDictionary so that when the
+ # list is no longer necessary, its memory is freed.
+ mlist = self._listcache.get(listname)
+ if not mlist:
+ try:
+ mlist = MailList.MailList(listname, lock=0)
+ except Errors.MMListError, e:
+ syslog('error', 'error opening list: %s\n%s', listname, e)
+ return None
+ else:
+ self._listcache[listname] = mlist
+ return mlist
+
+ def _log(self, exc):
+ syslog('error', 'Uncaught runner exception: %s', exc)
+ s = StringIO()
+ traceback.print_exc(file=s)
+ syslog('error', s.getvalue())
+
+ #
+ # Subclasses can override these methods.
+ #
+ def _cleanup(self):
+ """Clean up upon exit from the main processing loop.
+
+ Called when the Runner's main loop is stopped, this should perform
+ any necessary resource deallocation. Its return value is irrelevant.
+ """
+ Utils.reap(self._kids)
+
+ def _dispose(self, mlist, msg, msgdata):
+ """Dispose of a single message destined for a mailing list.
+
+ Called for each message that the Runner is responsible for, this is
+ the primary overridable method for processing each message.
+ Subclasses, must provide implementation for this method.
+
+ mlist is the MailList instance this message is destined for.
+
+ msg is the Message object representing the message.
+
+ msgdata is a dictionary of message metadata.
+ """
+ raise NotImplementedError
+
+ def _doperiodic(self):
+ """Do some processing `every once in a while'.
+
+ Called every once in a while both from the Runner's main loop, and
+ from the Runner's hash slice processing loop. You can do whatever
+ special periodic processing you want here, and the return value is
+ irrelevant.
+
+ """
+ pass
+
+ def _snooze(self):
+ """Sleep for a little while, because there was nothing to do.
+
+ This is called from the Runner's main loop, but only when the last
+ processing loop had no work to do (i.e. there were no messages in it's
+ little slice of hash space).
+ """
+ if self.SLEEPTIME <= 0:
+ return
+ time.sleep(self.SLEEPTIME)
+
+ def _shortcircuit(self):
+ """Return a true value if the individual file processing loop should
+ exit before it's finished processing each message in the current slice
+ of hash space. A false value tells _oneloop() to continue processing
+ until the current snapshot of hash space is exhausted.
+
+ You could, for example, implement a throttling algorithm here.
+ """
+ return self._stop
diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py
new file mode 100644
index 00000000..530055ad
--- /dev/null
+++ b/Mailman/Queue/Switchboard.py
@@ -0,0 +1,340 @@
+# Copyright (C) 2001,2002 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""Reading and writing message objects and message metadata.
+"""
+
+# enqueue() and dequeue() are not symmetric. enqueue() takes a Message
+# object. dequeue() returns a email.Message object tree.
+#
+# Message metadata is represented internally as a Python dictionary. Keys and
+# values must be strings. When written to a queue directory, the metadata is
+# written into an externally represented format, as defined here. Because
+# components of the Mailman system may be written in something other than
+# Python, the external interchange format should be chosen based on what those
+# other components can read and write.
+#
+# Most efficient, and recommended if everything is Python, is Python marshal
+# format. Also supported by default is Berkeley db format (using the default
+# bsddb module compiled into your Python executable -- usually Berkeley db
+# 2), and rfc822 style plain text. You can write your own if you have other
+# needs.
+
+import os
+import time
+import sha
+import marshal
+import errno
+import cPickle
+
+import email
+
+from Mailman import mm_cfg
+from Mailman import Utils
+from Mailman import Message
+from Mailman.Logging.Syslog import syslog
+
+# 20 bytes of all bits set, maximum sha.digest() value
+shamax = 0xffffffffffffffffffffffffffffffffffffffffL
+
+SAVE_MSGS_AS_PICKLES = 1
+
+
+
+class _Switchboard:
+ def __init__(self, whichq, slice=None, numslices=1):
+ self.__whichq = whichq
+ # Create the directory if it doesn't yet exist.
+ # FIXME
+ omask = os.umask(0) # rwxrws---
+ try:
+ try:
+ os.mkdir(self.__whichq, 0770)
+ except OSError, e:
+ if e.errno <> errno.EEXIST: raise
+ finally:
+ os.umask(omask)
+ # Fast track for no slices
+ self.__lower = None
+ self.__upper = None
+ # BAW: test performance and end-cases of this algorithm
+ if numslices <> 1:
+ self.__lower = (shamax * slice) / numslices
+ self.__upper = (shamax * (slice+1)) / numslices
+
+ def whichq(self):
+ return self.__whichq
+
+ def enqueue(self, _msg, _metadata={}, **_kws):
+ # Calculate the SHA hexdigest of the message to get a unique base
+ # filename. We're also going to use the digest as a hash into the set
+ # of parallel qrunner processes.
+ data = _metadata.copy()
+ data.update(_kws)
+ listname = data.get('listname', '--nolist--')
+ # Get some data for the input to the sha hash
+ now = time.time()
+ if SAVE_MSGS_AS_PICKLES and not data.get('_plaintext'):
+ msgsave = cPickle.dumps(_msg, 1)
+ ext = '.pck'
+ else:
+ msgsave = str(_msg)
+ ext = '.msg'
+ hashfood = msgsave + listname + `now`
+ # Encode the current time into the file name for FIFO sorting in
+ # files(). The file name consists of two parts separated by a `+':
+ # the received time for this message (i.e. when it first showed up on
+ # this system) and the sha hex digest.
+ #rcvtime = data.setdefault('received_time', now)
+ rcvtime = data.setdefault('received_time', now)
+ filebase = `rcvtime` + '+' + sha.new(hashfood).hexdigest()
+ # Figure out which queue files the message is to be written to.
+ msgfile = os.path.join(self.__whichq, filebase + ext)
+ dbfile = os.path.join(self.__whichq, filebase + '.db')
+ # Always add the metadata schema version number
+ data['version'] = mm_cfg.QFILE_SCHEMA_VERSION
+ # Filter out volatile entries
+ for k in data.keys():
+ if k[0] == '_':
+ del data[k]
+ # Now write the message text to one file and the metadata to another
+ # file. The metadata is always written second to avoid race
+ # conditions with the various queue runners (which key off of the .db
+ # filename).
+ omask = os.umask(007) # -rw-rw----
+ try:
+ msgfp = open(msgfile, 'w')
+ finally:
+ os.umask(omask)
+ msgfp.write(msgsave)
+ msgfp.close()
+ # Now write the metadata using the appropriate external metadata
+ # format. We play rename-switcheroo here to further plug the race
+ # condition holes.
+ tmpfile = dbfile + '.tmp'
+ self._ext_write(tmpfile, data)
+ os.rename(tmpfile, dbfile)
+
+ def dequeue(self, filebase):
+ # Calculate the .db and .msg filenames from the given filebase.
+ msgfile = os.path.join(self.__whichq, filebase + '.msg')
+ pckfile = os.path.join(self.__whichq, filebase + '.pck')
+ dbfile = os.path.join(self.__whichq, filebase + '.db')
+ # Now we are going to read the message and metadata for the given
+ # filebase. We want to read things in this order: first, the metadata
+ # file to find out whether the message is stored as a pickle or as
+ # plain text. Second, the actual message file. However, we want to
+ # first unlink the message file and then the .db file, because the
+ # qrunner only cues off of the .db file
+ msg = data = None
+ try:
+ data = self._ext_read(dbfile)
+ os.unlink(dbfile)
+ except EnvironmentError, e:
+ if e.errno <> errno.ENOENT: raise
+ # Between 2.1b4 and 2.1b5, the `rejection-notice' key in the metadata
+ # was renamed to `rejection_notice', since dashes in the keys are not
+ # supported in METAFMT_ASCII.
+ if data.has_key('rejection-notice'):
+ data['rejection_notice'] = data['rejection-notice']
+ del data['rejection-notice']
+ msgfp = None
+ try:
+ try:
+ msgfp = open(pckfile)
+ msg = cPickle.load(msgfp)
+ os.unlink(pckfile)
+ except EnvironmentError, e:
+ if e.errno <> errno.ENOENT: raise
+ msgfp = None
+ try:
+ msgfp = open(msgfile)
+ msg = email.message_from_file(msgfp, Message.Message)
+ os.unlink(msgfile)
+ except EnvironmentError, e:
+ if e.errno <> errno.ENOENT: raise
+ except email.Errors.MessageParseError, e:
+ # This message was unparsable, most likely because its
+ # MIME encapsulation was broken. For now, there's not
+ # much we can do about it.
+ syslog('error', 'message is unparsable: %s', filebase)
+ msgfp.close()
+ msgfp = None
+ if mm_cfg.QRUNNER_SAVE_BAD_MESSAGES:
+ # Cheapo way to ensure the directory exists w/ the
+ # proper permissions.
+ sb = Switchboard(mm_cfg.BADQUEUE_DIR)
+ os.rename(msgfile, os.path.join(
+ mm_cfg.BADQUEUE_DIR, filebase + '.txt'))
+ else:
+ os.unlink(msgfile)
+ msg = data = None
+ finally:
+ if msgfp:
+ msgfp.close()
+ return msg, data
+
+ def files(self):
+ times = {}
+ lower = self.__lower
+ upper = self.__upper
+ for f in os.listdir(self.__whichq):
+ # We only care about the file's base name (i.e. no extension).
+ # Thus we'll ignore anything that doesn't end in .db.
+ if not f.endswith('.db'):
+ continue
+ filebase = os.path.splitext(f)[0]
+ when, digest = filebase.split('+')
+ # Throw out any files which don't match our bitrange. BAW: test
+ # performance and end-cases of this algorithm.
+ if not lower or (lower <= long(digest, 16) < upper):
+ times[float(when)] = filebase
+ # FIFO sort
+ keys = times.keys()
+ keys.sort()
+ return [times[k] for k in keys]
+
+ def _ext_write(self, tmpfile, data):
+ raise NotImplementedError
+
+ def _ext_read(self, dbfile):
+ raise NotImplementedError
+
+
+
+class MarshalSwitchboard(_Switchboard):
+ """Python marshal format."""
+ FLOAT_ATTRIBUTES = ['received_time']
+
+ def _ext_write(self, filename, dict):
+ omask = os.umask(007) # -rw-rw----
+ try:
+ fp = open(filename, 'w')
+ finally:
+ os.umask(omask)
+ # Python's marshal, up to and including in Python 2.1, has a bug where
+ # the full precision of floats was not stored. We work around this
+ # bug by hardcoding a list of float values we know about, repr()-izing
+ # them ourselves, and doing the reverse conversion on _ext_read().
+ for attr in self.FLOAT_ATTRIBUTES:
+ # We use try/except because we expect a hitrate of nearly 100%
+ try:
+ fval = dict[attr]
+ except KeyError:
+ pass
+ else:
+ dict[attr] = repr(fval)
+ marshal.dump(dict, fp)
+ fp.close()
+
+ def _ext_read(self, filename):
+ fp = open(filename)
+ dict = marshal.load(fp)
+ # Update from version 2 files
+ if dict.get('version', 0) == 2:
+ del dict['filebase']
+ # Do the reverse conversion (repr -> float)
+ for attr in self.FLOAT_ATTRIBUTES:
+ try:
+ sval = dict[attr]
+ except KeyError:
+ pass
+ else:
+ # Do a safe eval by setting up a restricted execution
+ # environment. This may not be strictly necessary since we
+ # know they are floats, but it can't hurt.
+ dict[attr] = eval(sval, {'__builtins__': {}})
+ fp.close()
+ return dict
+
+
+
+class BSDDBSwitchboard(_Switchboard):
+ """Native (i.e. compiled-in) Berkeley db format."""
+ def _ext_write(self, filename, dict):
+ import bsddb
+ omask = os.umask(0)
+ try:
+ hashfile = bsddb.hashopen(filename, 'n', 0660)
+ finally:
+ os.umask(omask)
+ # values must be strings
+ for k, v in dict.items():
+ hashfile[k] = marshal.dumps(v)
+ hashfile.sync()
+ hashfile.close()
+
+ def _ext_read(self, filename):
+ import bsddb
+ dict = {}
+ hashfile = bsddb.hashopen(filename, 'r')
+ for k in hashfile.keys():
+ dict[k] = marshal.loads(hashfile[k])
+ hashfile.close()
+ return dict
+
+
+
+class ASCIISwitchboard(_Switchboard):
+ """Human readable .db file format.
+
+ key/value pairs are written as
+
+ key = value
+
+ as real Python code which can be execfile'd.
+ """
+
+ def _ext_write(self, filename, dict):
+ omask = os.umask(007) # -rw-rw----
+ try:
+ fp = open(filename, 'w')
+ finally:
+ os.umask(omask)
+ for k, v in dict.items():
+ print >> fp, '%s = %s' % (k, repr(v))
+ fp.close()
+
+ def _ext_read(self, filename):
+ dict = {'__builtins__': {}}
+ execfile(filename, dict)
+ del dict['__builtins__']
+ return dict
+
+
+
+# Here are the various types of external file formats available. The format
+# chosen is given defined in the mm_cfg.py configuration file.
+if mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_MARSHAL:
+ Switchboard = MarshalSwitchboard
+elif mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_BSDDB_NATIVE:
+ Switchboard = BSDDBSwitchboard
+elif mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_ASCII:
+ Switchboard = ASCIISwitchboard
+else:
+ syslog('error', 'Undefined metadata format: %d (using marshals)',
+ mm_cfg.METADATA_FORMAT)
+ Switchboard = MarshalSwitchboard
+
+
+
+# For bin/dumpdb
+class DumperSwitchboard(Switchboard):
+ def __init__(self):
+ pass
+
+ def read(self, filename):
+ return self._ext_read(filename)
diff --git a/Mailman/Queue/VirginRunner.py b/Mailman/Queue/VirginRunner.py
new file mode 100644
index 00000000..720ecd25
--- /dev/null
+++ b/Mailman/Queue/VirginRunner.py
@@ -0,0 +1,43 @@
+# Copyright (C) 1998,1999,2000,2001,2002 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""Virgin message queue runner.
+
+This qrunner handles messages that the Mailman system gives virgin birth to.
+E.g. acknowledgement responses to user posts or Replybot messages. They need
+to go through some minimal processing before they can be sent out to the
+recipient.
+"""
+
+from Mailman import mm_cfg
+from Mailman.Queue.Runner import Runner
+from Mailman.Queue.IncomingRunner import IncomingRunner
+
+
+
+class VirginRunner(IncomingRunner):
+ QDIR = mm_cfg.VIRGINQUEUE_DIR
+
+ def _dispose(self, mlist, msg, msgdata):
+ # We need to fasttrack this message through any handlers that touch
+ # it. E.g. especially CookHeaders.
+ msgdata['_fasttrack'] = 1
+ return IncomingRunner._dispose(self, mlist, msg, msgdata)
+
+ def _get_pipeline(self, mlist, msg, msgdata):
+ # It's okay to hardcode this, since it'll be the same for all
+ # internally crafted messages.
+ return ['CookHeaders', 'ToOutgoing']
diff --git a/Mailman/Queue/__init__.py b/Mailman/Queue/__init__.py
new file mode 100644
index 00000000..cdf93257
--- /dev/null
+++ b/Mailman/Queue/__init__.py
@@ -0,0 +1,15 @@
+# Copyright (C) 2000,2001,2002 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
diff --git a/Mailman/Queue/sbcache.py b/Mailman/Queue/sbcache.py
new file mode 100644
index 00000000..9b918fc5
--- /dev/null
+++ b/Mailman/Queue/sbcache.py
@@ -0,0 +1,26 @@
+# Copyright (C) 2001,2002 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""A factory of Switchboards with caching."""
+
+from Mailman.Queue.Switchboard import Switchboard
+
+# a mapping from queue directory to Switchboard instance
+_sbcache = {}
+
+def get_switchboard(qdir):
+ switchboard = _sbcache.setdefault(qdir, Switchboard(qdir))
+ return switchboard