aboutsummaryrefslogtreecommitdiffstats
path: root/Mailman/Queue/Switchboard.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--Mailman/Queue/Switchboard.py340
1 files changed, 340 insertions, 0 deletions
diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py
new file mode 100644
index 00000000..530055ad
--- /dev/null
+++ b/Mailman/Queue/Switchboard.py
@@ -0,0 +1,340 @@
+# Copyright (C) 2001,2002 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""Reading and writing message objects and message metadata.
+"""
+
+# enqueue() and dequeue() are not symmetric. enqueue() takes a Message
+# object. dequeue() returns a email.Message object tree.
+#
+# Message metadata is represented internally as a Python dictionary. Keys and
+# values must be strings. When written to a queue directory, the metadata is
+# written into an externally represented format, as defined here. Because
+# components of the Mailman system may be written in something other than
+# Python, the external interchange format should be chosen based on what those
+# other components can read and write.
+#
+# Most efficient, and recommended if everything is Python, is Python marshal
+# format. Also supported by default is Berkeley db format (using the default
+# bsddb module compiled into your Python executable -- usually Berkeley db
+# 2), and rfc822 style plain text. You can write your own if you have other
+# needs.
+
+import os
+import time
+import sha
+import marshal
+import errno
+import cPickle
+
+import email
+
+from Mailman import mm_cfg
+from Mailman import Utils
+from Mailman import Message
+from Mailman.Logging.Syslog import syslog
+
+# 20 bytes of all bits set, maximum sha.digest() value
+shamax = 0xffffffffffffffffffffffffffffffffffffffffL
+
+SAVE_MSGS_AS_PICKLES = 1
+
+
+
+class _Switchboard:
+ def __init__(self, whichq, slice=None, numslices=1):
+ self.__whichq = whichq
+ # Create the directory if it doesn't yet exist.
+ # FIXME
+ omask = os.umask(0) # rwxrws---
+ try:
+ try:
+ os.mkdir(self.__whichq, 0770)
+ except OSError, e:
+ if e.errno <> errno.EEXIST: raise
+ finally:
+ os.umask(omask)
+ # Fast track for no slices
+ self.__lower = None
+ self.__upper = None
+ # BAW: test performance and end-cases of this algorithm
+ if numslices <> 1:
+ self.__lower = (shamax * slice) / numslices
+ self.__upper = (shamax * (slice+1)) / numslices
+
+ def whichq(self):
+ return self.__whichq
+
+ def enqueue(self, _msg, _metadata={}, **_kws):
+ # Calculate the SHA hexdigest of the message to get a unique base
+ # filename. We're also going to use the digest as a hash into the set
+ # of parallel qrunner processes.
+ data = _metadata.copy()
+ data.update(_kws)
+ listname = data.get('listname', '--nolist--')
+ # Get some data for the input to the sha hash
+ now = time.time()
+ if SAVE_MSGS_AS_PICKLES and not data.get('_plaintext'):
+ msgsave = cPickle.dumps(_msg, 1)
+ ext = '.pck'
+ else:
+ msgsave = str(_msg)
+ ext = '.msg'
+ hashfood = msgsave + listname + `now`
+ # Encode the current time into the file name for FIFO sorting in
+ # files(). The file name consists of two parts separated by a `+':
+ # the received time for this message (i.e. when it first showed up on
+ # this system) and the sha hex digest.
+ #rcvtime = data.setdefault('received_time', now)
+ rcvtime = data.setdefault('received_time', now)
+ filebase = `rcvtime` + '+' + sha.new(hashfood).hexdigest()
+ # Figure out which queue files the message is to be written to.
+ msgfile = os.path.join(self.__whichq, filebase + ext)
+ dbfile = os.path.join(self.__whichq, filebase + '.db')
+ # Always add the metadata schema version number
+ data['version'] = mm_cfg.QFILE_SCHEMA_VERSION
+ # Filter out volatile entries
+ for k in data.keys():
+ if k[0] == '_':
+ del data[k]
+ # Now write the message text to one file and the metadata to another
+ # file. The metadata is always written second to avoid race
+ # conditions with the various queue runners (which key off of the .db
+ # filename).
+ omask = os.umask(007) # -rw-rw----
+ try:
+ msgfp = open(msgfile, 'w')
+ finally:
+ os.umask(omask)
+ msgfp.write(msgsave)
+ msgfp.close()
+ # Now write the metadata using the appropriate external metadata
+ # format. We play rename-switcheroo here to further plug the race
+ # condition holes.
+ tmpfile = dbfile + '.tmp'
+ self._ext_write(tmpfile, data)
+ os.rename(tmpfile, dbfile)
+
+ def dequeue(self, filebase):
+ # Calculate the .db and .msg filenames from the given filebase.
+ msgfile = os.path.join(self.__whichq, filebase + '.msg')
+ pckfile = os.path.join(self.__whichq, filebase + '.pck')
+ dbfile = os.path.join(self.__whichq, filebase + '.db')
+ # Now we are going to read the message and metadata for the given
+ # filebase. We want to read things in this order: first, the metadata
+ # file to find out whether the message is stored as a pickle or as
+ # plain text. Second, the actual message file. However, we want to
+ # first unlink the message file and then the .db file, because the
+ # qrunner only cues off of the .db file
+ msg = data = None
+ try:
+ data = self._ext_read(dbfile)
+ os.unlink(dbfile)
+ except EnvironmentError, e:
+ if e.errno <> errno.ENOENT: raise
+ # Between 2.1b4 and 2.1b5, the `rejection-notice' key in the metadata
+ # was renamed to `rejection_notice', since dashes in the keys are not
+ # supported in METAFMT_ASCII.
+ if data.has_key('rejection-notice'):
+ data['rejection_notice'] = data['rejection-notice']
+ del data['rejection-notice']
+ msgfp = None
+ try:
+ try:
+ msgfp = open(pckfile)
+ msg = cPickle.load(msgfp)
+ os.unlink(pckfile)
+ except EnvironmentError, e:
+ if e.errno <> errno.ENOENT: raise
+ msgfp = None
+ try:
+ msgfp = open(msgfile)
+ msg = email.message_from_file(msgfp, Message.Message)
+ os.unlink(msgfile)
+ except EnvironmentError, e:
+ if e.errno <> errno.ENOENT: raise
+ except email.Errors.MessageParseError, e:
+ # This message was unparsable, most likely because its
+ # MIME encapsulation was broken. For now, there's not
+ # much we can do about it.
+ syslog('error', 'message is unparsable: %s', filebase)
+ msgfp.close()
+ msgfp = None
+ if mm_cfg.QRUNNER_SAVE_BAD_MESSAGES:
+ # Cheapo way to ensure the directory exists w/ the
+ # proper permissions.
+ sb = Switchboard(mm_cfg.BADQUEUE_DIR)
+ os.rename(msgfile, os.path.join(
+ mm_cfg.BADQUEUE_DIR, filebase + '.txt'))
+ else:
+ os.unlink(msgfile)
+ msg = data = None
+ finally:
+ if msgfp:
+ msgfp.close()
+ return msg, data
+
+ def files(self):
+ times = {}
+ lower = self.__lower
+ upper = self.__upper
+ for f in os.listdir(self.__whichq):
+ # We only care about the file's base name (i.e. no extension).
+ # Thus we'll ignore anything that doesn't end in .db.
+ if not f.endswith('.db'):
+ continue
+ filebase = os.path.splitext(f)[0]
+ when, digest = filebase.split('+')
+ # Throw out any files which don't match our bitrange. BAW: test
+ # performance and end-cases of this algorithm.
+ if not lower or (lower <= long(digest, 16) < upper):
+ times[float(when)] = filebase
+ # FIFO sort
+ keys = times.keys()
+ keys.sort()
+ return [times[k] for k in keys]
+
+ def _ext_write(self, tmpfile, data):
+ raise NotImplementedError
+
+ def _ext_read(self, dbfile):
+ raise NotImplementedError
+
+
+
+class MarshalSwitchboard(_Switchboard):
+ """Python marshal format."""
+ FLOAT_ATTRIBUTES = ['received_time']
+
+ def _ext_write(self, filename, dict):
+ omask = os.umask(007) # -rw-rw----
+ try:
+ fp = open(filename, 'w')
+ finally:
+ os.umask(omask)
+ # Python's marshal, up to and including in Python 2.1, has a bug where
+ # the full precision of floats was not stored. We work around this
+ # bug by hardcoding a list of float values we know about, repr()-izing
+ # them ourselves, and doing the reverse conversion on _ext_read().
+ for attr in self.FLOAT_ATTRIBUTES:
+ # We use try/except because we expect a hitrate of nearly 100%
+ try:
+ fval = dict[attr]
+ except KeyError:
+ pass
+ else:
+ dict[attr] = repr(fval)
+ marshal.dump(dict, fp)
+ fp.close()
+
+ def _ext_read(self, filename):
+ fp = open(filename)
+ dict = marshal.load(fp)
+ # Update from version 2 files
+ if dict.get('version', 0) == 2:
+ del dict['filebase']
+ # Do the reverse conversion (repr -> float)
+ for attr in self.FLOAT_ATTRIBUTES:
+ try:
+ sval = dict[attr]
+ except KeyError:
+ pass
+ else:
+ # Do a safe eval by setting up a restricted execution
+ # environment. This may not be strictly necessary since we
+ # know they are floats, but it can't hurt.
+ dict[attr] = eval(sval, {'__builtins__': {}})
+ fp.close()
+ return dict
+
+
+
+class BSDDBSwitchboard(_Switchboard):
+ """Native (i.e. compiled-in) Berkeley db format."""
+ def _ext_write(self, filename, dict):
+ import bsddb
+ omask = os.umask(0)
+ try:
+ hashfile = bsddb.hashopen(filename, 'n', 0660)
+ finally:
+ os.umask(omask)
+ # values must be strings
+ for k, v in dict.items():
+ hashfile[k] = marshal.dumps(v)
+ hashfile.sync()
+ hashfile.close()
+
+ def _ext_read(self, filename):
+ import bsddb
+ dict = {}
+ hashfile = bsddb.hashopen(filename, 'r')
+ for k in hashfile.keys():
+ dict[k] = marshal.loads(hashfile[k])
+ hashfile.close()
+ return dict
+
+
+
+class ASCIISwitchboard(_Switchboard):
+ """Human readable .db file format.
+
+ key/value pairs are written as
+
+ key = value
+
+ as real Python code which can be execfile'd.
+ """
+
+ def _ext_write(self, filename, dict):
+ omask = os.umask(007) # -rw-rw----
+ try:
+ fp = open(filename, 'w')
+ finally:
+ os.umask(omask)
+ for k, v in dict.items():
+ print >> fp, '%s = %s' % (k, repr(v))
+ fp.close()
+
+ def _ext_read(self, filename):
+ dict = {'__builtins__': {}}
+ execfile(filename, dict)
+ del dict['__builtins__']
+ return dict
+
+
+
+# Here are the various types of external file formats available. The format
+# chosen is given defined in the mm_cfg.py configuration file.
+if mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_MARSHAL:
+ Switchboard = MarshalSwitchboard
+elif mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_BSDDB_NATIVE:
+ Switchboard = BSDDBSwitchboard
+elif mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_ASCII:
+ Switchboard = ASCIISwitchboard
+else:
+ syslog('error', 'Undefined metadata format: %d (using marshals)',
+ mm_cfg.METADATA_FORMAT)
+ Switchboard = MarshalSwitchboard
+
+
+
+# For bin/dumpdb
+class DumperSwitchboard(Switchboard):
+ def __init__(self):
+ pass
+
+ def read(self, filename):
+ return self._ext_read(filename)