diff options
Diffstat (limited to '')
-rw-r--r-- | Mailman/Queue/Switchboard.py | 340 |
1 files changed, 340 insertions, 0 deletions
diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py new file mode 100644 index 00000000..530055ad --- /dev/null +++ b/Mailman/Queue/Switchboard.py @@ -0,0 +1,340 @@ +# Copyright (C) 2001,2002 by the Free Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +"""Reading and writing message objects and message metadata. +""" + +# enqueue() and dequeue() are not symmetric. enqueue() takes a Message +# object. dequeue() returns a email.Message object tree. +# +# Message metadata is represented internally as a Python dictionary. Keys and +# values must be strings. When written to a queue directory, the metadata is +# written into an externally represented format, as defined here. Because +# components of the Mailman system may be written in something other than +# Python, the external interchange format should be chosen based on what those +# other components can read and write. +# +# Most efficient, and recommended if everything is Python, is Python marshal +# format. Also supported by default is Berkeley db format (using the default +# bsddb module compiled into your Python executable -- usually Berkeley db +# 2), and rfc822 style plain text. You can write your own if you have other +# needs. + +import os +import time +import sha +import marshal +import errno +import cPickle + +import email + +from Mailman import mm_cfg +from Mailman import Utils +from Mailman import Message +from Mailman.Logging.Syslog import syslog + +# 20 bytes of all bits set, maximum sha.digest() value +shamax = 0xffffffffffffffffffffffffffffffffffffffffL + +SAVE_MSGS_AS_PICKLES = 1 + + + +class _Switchboard: + def __init__(self, whichq, slice=None, numslices=1): + self.__whichq = whichq + # Create the directory if it doesn't yet exist. + # FIXME + omask = os.umask(0) # rwxrws--- + try: + try: + os.mkdir(self.__whichq, 0770) + except OSError, e: + if e.errno <> errno.EEXIST: raise + finally: + os.umask(omask) + # Fast track for no slices + self.__lower = None + self.__upper = None + # BAW: test performance and end-cases of this algorithm + if numslices <> 1: + self.__lower = (shamax * slice) / numslices + self.__upper = (shamax * (slice+1)) / numslices + + def whichq(self): + return self.__whichq + + def enqueue(self, _msg, _metadata={}, **_kws): + # Calculate the SHA hexdigest of the message to get a unique base + # filename. We're also going to use the digest as a hash into the set + # of parallel qrunner processes. + data = _metadata.copy() + data.update(_kws) + listname = data.get('listname', '--nolist--') + # Get some data for the input to the sha hash + now = time.time() + if SAVE_MSGS_AS_PICKLES and not data.get('_plaintext'): + msgsave = cPickle.dumps(_msg, 1) + ext = '.pck' + else: + msgsave = str(_msg) + ext = '.msg' + hashfood = msgsave + listname + `now` + # Encode the current time into the file name for FIFO sorting in + # files(). The file name consists of two parts separated by a `+': + # the received time for this message (i.e. when it first showed up on + # this system) and the sha hex digest. + #rcvtime = data.setdefault('received_time', now) + rcvtime = data.setdefault('received_time', now) + filebase = `rcvtime` + '+' + sha.new(hashfood).hexdigest() + # Figure out which queue files the message is to be written to. + msgfile = os.path.join(self.__whichq, filebase + ext) + dbfile = os.path.join(self.__whichq, filebase + '.db') + # Always add the metadata schema version number + data['version'] = mm_cfg.QFILE_SCHEMA_VERSION + # Filter out volatile entries + for k in data.keys(): + if k[0] == '_': + del data[k] + # Now write the message text to one file and the metadata to another + # file. The metadata is always written second to avoid race + # conditions with the various queue runners (which key off of the .db + # filename). + omask = os.umask(007) # -rw-rw---- + try: + msgfp = open(msgfile, 'w') + finally: + os.umask(omask) + msgfp.write(msgsave) + msgfp.close() + # Now write the metadata using the appropriate external metadata + # format. We play rename-switcheroo here to further plug the race + # condition holes. + tmpfile = dbfile + '.tmp' + self._ext_write(tmpfile, data) + os.rename(tmpfile, dbfile) + + def dequeue(self, filebase): + # Calculate the .db and .msg filenames from the given filebase. + msgfile = os.path.join(self.__whichq, filebase + '.msg') + pckfile = os.path.join(self.__whichq, filebase + '.pck') + dbfile = os.path.join(self.__whichq, filebase + '.db') + # Now we are going to read the message and metadata for the given + # filebase. We want to read things in this order: first, the metadata + # file to find out whether the message is stored as a pickle or as + # plain text. Second, the actual message file. However, we want to + # first unlink the message file and then the .db file, because the + # qrunner only cues off of the .db file + msg = data = None + try: + data = self._ext_read(dbfile) + os.unlink(dbfile) + except EnvironmentError, e: + if e.errno <> errno.ENOENT: raise + # Between 2.1b4 and 2.1b5, the `rejection-notice' key in the metadata + # was renamed to `rejection_notice', since dashes in the keys are not + # supported in METAFMT_ASCII. + if data.has_key('rejection-notice'): + data['rejection_notice'] = data['rejection-notice'] + del data['rejection-notice'] + msgfp = None + try: + try: + msgfp = open(pckfile) + msg = cPickle.load(msgfp) + os.unlink(pckfile) + except EnvironmentError, e: + if e.errno <> errno.ENOENT: raise + msgfp = None + try: + msgfp = open(msgfile) + msg = email.message_from_file(msgfp, Message.Message) + os.unlink(msgfile) + except EnvironmentError, e: + if e.errno <> errno.ENOENT: raise + except email.Errors.MessageParseError, e: + # This message was unparsable, most likely because its + # MIME encapsulation was broken. For now, there's not + # much we can do about it. + syslog('error', 'message is unparsable: %s', filebase) + msgfp.close() + msgfp = None + if mm_cfg.QRUNNER_SAVE_BAD_MESSAGES: + # Cheapo way to ensure the directory exists w/ the + # proper permissions. + sb = Switchboard(mm_cfg.BADQUEUE_DIR) + os.rename(msgfile, os.path.join( + mm_cfg.BADQUEUE_DIR, filebase + '.txt')) + else: + os.unlink(msgfile) + msg = data = None + finally: + if msgfp: + msgfp.close() + return msg, data + + def files(self): + times = {} + lower = self.__lower + upper = self.__upper + for f in os.listdir(self.__whichq): + # We only care about the file's base name (i.e. no extension). + # Thus we'll ignore anything that doesn't end in .db. + if not f.endswith('.db'): + continue + filebase = os.path.splitext(f)[0] + when, digest = filebase.split('+') + # Throw out any files which don't match our bitrange. BAW: test + # performance and end-cases of this algorithm. + if not lower or (lower <= long(digest, 16) < upper): + times[float(when)] = filebase + # FIFO sort + keys = times.keys() + keys.sort() + return [times[k] for k in keys] + + def _ext_write(self, tmpfile, data): + raise NotImplementedError + + def _ext_read(self, dbfile): + raise NotImplementedError + + + +class MarshalSwitchboard(_Switchboard): + """Python marshal format.""" + FLOAT_ATTRIBUTES = ['received_time'] + + def _ext_write(self, filename, dict): + omask = os.umask(007) # -rw-rw---- + try: + fp = open(filename, 'w') + finally: + os.umask(omask) + # Python's marshal, up to and including in Python 2.1, has a bug where + # the full precision of floats was not stored. We work around this + # bug by hardcoding a list of float values we know about, repr()-izing + # them ourselves, and doing the reverse conversion on _ext_read(). + for attr in self.FLOAT_ATTRIBUTES: + # We use try/except because we expect a hitrate of nearly 100% + try: + fval = dict[attr] + except KeyError: + pass + else: + dict[attr] = repr(fval) + marshal.dump(dict, fp) + fp.close() + + def _ext_read(self, filename): + fp = open(filename) + dict = marshal.load(fp) + # Update from version 2 files + if dict.get('version', 0) == 2: + del dict['filebase'] + # Do the reverse conversion (repr -> float) + for attr in self.FLOAT_ATTRIBUTES: + try: + sval = dict[attr] + except KeyError: + pass + else: + # Do a safe eval by setting up a restricted execution + # environment. This may not be strictly necessary since we + # know they are floats, but it can't hurt. + dict[attr] = eval(sval, {'__builtins__': {}}) + fp.close() + return dict + + + +class BSDDBSwitchboard(_Switchboard): + """Native (i.e. compiled-in) Berkeley db format.""" + def _ext_write(self, filename, dict): + import bsddb + omask = os.umask(0) + try: + hashfile = bsddb.hashopen(filename, 'n', 0660) + finally: + os.umask(omask) + # values must be strings + for k, v in dict.items(): + hashfile[k] = marshal.dumps(v) + hashfile.sync() + hashfile.close() + + def _ext_read(self, filename): + import bsddb + dict = {} + hashfile = bsddb.hashopen(filename, 'r') + for k in hashfile.keys(): + dict[k] = marshal.loads(hashfile[k]) + hashfile.close() + return dict + + + +class ASCIISwitchboard(_Switchboard): + """Human readable .db file format. + + key/value pairs are written as + + key = value + + as real Python code which can be execfile'd. + """ + + def _ext_write(self, filename, dict): + omask = os.umask(007) # -rw-rw---- + try: + fp = open(filename, 'w') + finally: + os.umask(omask) + for k, v in dict.items(): + print >> fp, '%s = %s' % (k, repr(v)) + fp.close() + + def _ext_read(self, filename): + dict = {'__builtins__': {}} + execfile(filename, dict) + del dict['__builtins__'] + return dict + + + +# Here are the various types of external file formats available. The format +# chosen is given defined in the mm_cfg.py configuration file. +if mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_MARSHAL: + Switchboard = MarshalSwitchboard +elif mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_BSDDB_NATIVE: + Switchboard = BSDDBSwitchboard +elif mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_ASCII: + Switchboard = ASCIISwitchboard +else: + syslog('error', 'Undefined metadata format: %d (using marshals)', + mm_cfg.METADATA_FORMAT) + Switchboard = MarshalSwitchboard + + + +# For bin/dumpdb +class DumperSwitchboard(Switchboard): + def __init__(self): + pass + + def read(self, filename): + return self._ext_read(filename) |