From b312b09faef93382f26ad3ba15101c581405d545 Mon Sep 17 00:00:00 2001 From: bwarsaw <> Date: Tue, 10 Feb 2004 22:55:49 +0000 Subject: enqueue(), dequeue(), files(): Implementation of new, more efficient, one-file-per-queued-message architecture. The message object and metadata dictionary are now written to the same .pck file -- in that order -- instead of to separate files which were more complicated to manage. dequeue() especially simplifies considerably. Also, get rid of the special _Switchboard hack, as well as the MarshalSwitchboard, ASCIISwitchboard, and BSDDBSwitchboard implementations. Also get rid of the DumperSwitchboard class. --- Mailman/Queue/Switchboard.py | 263 ++++++------------------------------------- 1 file changed, 37 insertions(+), 226 deletions(-) diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py index 9853866d..f86b83b7 100644 --- a/Mailman/Queue/Switchboard.py +++ b/Mailman/Queue/Switchboard.py @@ -1,4 +1,4 @@ -# Copyright (C) 2001-2003 by the Free Software Foundation, Inc. +# Copyright (C) 2001-2004 by the Free Software Foundation, Inc. # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License @@ -34,13 +34,12 @@ # needs. import os -import time import sha -import marshal +import time +import email import errno import cPickle - -import email +import marshal from Mailman import mm_cfg from Mailman import Utils @@ -63,7 +62,7 @@ SAVE_MSGS_AS_PICKLES = True -class _Switchboard: +class Switchboard: def __init__(self, whichq, slice=None, numslices=1): self.__whichq = whichq # Create the directory if it doesn't yet exist. @@ -97,11 +96,11 @@ class _Switchboard: # Get some data for the input to the sha hash now = time.time() if SAVE_MSGS_AS_PICKLES and not data.get('_plaintext'): - msgsave = cPickle.dumps(_msg, 1) - ext = '.pck' + protocol = 1 + msgsave = cPickle.dumps(_msg, protocol) else: - msgsave = str(_msg) - ext = '.msg' + protocol = 0 + msgsave = cPickle.dumps(str(_msg), protocol) hashfood = msgsave + listname + `now` # Encode the current time into the file name for FIFO sorting in # files(). The file name consists of two parts separated by a `+': @@ -110,94 +109,46 @@ class _Switchboard: #rcvtime = data.setdefault('received_time', now) rcvtime = data.setdefault('received_time', now) filebase = `rcvtime` + '+' + sha.new(hashfood).hexdigest() - # Figure out which queue files the message is to be written to. - msgfile = os.path.join(self.__whichq, filebase + ext) - dbfile = os.path.join(self.__whichq, filebase + '.db') + filename = os.path.join(self.__whichq, filebase + '.pck') + tmpfile = filename + '.tmp' # Always add the metadata schema version number data['version'] = mm_cfg.QFILE_SCHEMA_VERSION # Filter out volatile entries for k in data.keys(): if k.startswith('_'): del data[k] - # Now write the message text to one file and the metadata to another - # file. The metadata is always written second to avoid race - # conditions with the various queue runners (which key off of the .db - # filename). + # We have to tell the dequeue() method whether to parse the message + # object or not. + data['_parsemsg'] = (protocol == 0) + # Write to the pickle file the message object and metadata. omask = os.umask(007) # -rw-rw---- try: - msgfp = open(msgfile, 'w') + fp = open(tmpfile, 'w') + try: + fp.write(msgsave) + cPickle.dump(data, fp, protocol) + fp.flush() + os.fsync(fp.fileno()) + finally: + fp.close() finally: os.umask(omask) - msgfp.write(msgsave) - msgfp.flush() - os.fsync(msgfp.fileno()) - msgfp.close() - # Now write the metadata using the appropriate external metadata - # format. We play rename-switcheroo here to further plug the race - # condition holes. - tmpfile = dbfile + '.tmp' - self._ext_write(tmpfile, data) - os.rename(tmpfile, dbfile) + os.rename(tmpfile, filename) return filebase def dequeue(self, filebase): - # Calculate the .db and .msg filenames from the given filebase. - msgfile = os.path.join(self.__whichq, filebase + '.msg') - pckfile = os.path.join(self.__whichq, filebase + '.pck') - dbfile = os.path.join(self.__whichq, filebase + '.db') - # Now we are going to read the message and metadata for the given - # filebase. We want to read things in this order: first, the metadata - # file to find out whether the message is stored as a pickle or as - # plain text. Second, the actual message file. However, we want to - # first unlink the message file and then the .db file, because the - # qrunner only cues off of the .db file - msg = None - try: - data = self._ext_read(dbfile) - os.unlink(dbfile) - except EnvironmentError, e: - if e.errno <> errno.ENOENT: raise - data = {} - # Between 2.1b4 and 2.1b5, the `rejection-notice' key in the metadata - # was renamed to `rejection_notice', since dashes in the keys are not - # supported in METAFMT_ASCII. - if data.has_key('rejection-notice'): - data['rejection_notice'] = data['rejection-notice'] - del data['rejection-notice'] - msgfp = None + # Calculate the filename from the given filebase. + filename = os.path.join(self.__whichq, filebase + '.pck') + # Read the message object and metadata. + fp = open(filename) + os.unlink(filename) try: - try: - msgfp = open(pckfile) - msg = cPickle.load(msgfp) - os.unlink(pckfile) - except EnvironmentError, e: - if e.errno <> errno.ENOENT: raise - msgfp = None - try: - msgfp = open(msgfile) - msg = email.message_from_file(msgfp, Message.Message) - os.unlink(msgfile) - except EnvironmentError, e: - if e.errno <> errno.ENOENT: raise - except email.Errors.MessageParseError, e: - # This message was unparsable, most likely because its - # MIME encapsulation was broken. For now, there's not - # much we can do about it. - syslog('error', 'message is unparsable: %s', filebase) - msgfp.close() - msgfp = None - if mm_cfg.QRUNNER_SAVE_BAD_MESSAGES: - # Cheapo way to ensure the directory exists w/ the - # proper permissions. - sb = Switchboard(mm_cfg.BADQUEUE_DIR) - os.rename(msgfile, os.path.join( - mm_cfg.BADQUEUE_DIR, filebase + '.txt')) - else: - os.unlink(msgfile) - msg = data = None + msg = cPickle.load(fp) + data = cPickle.load(fp) finally: - if msgfp: - msgfp.close() + fp.close() + if data.get('_parsemsg'): + msg = email.message_from_string(msg, Message.Message) return msg, data def files(self): @@ -205,9 +156,9 @@ class _Switchboard: lower = self.__lower upper = self.__upper for f in os.listdir(self.__whichq): - # We only care about the file's base name (i.e. no extension). - # Thus we'll ignore anything that doesn't end in .db. - if not f.endswith('.db'): + # By ignoring anything that doesn't end in .pck, we ignore + # tempfiles and avoid a race condition. + if not f.endswith('.pck'): continue filebase = os.path.splitext(f)[0] when, digest = filebase.split('+') @@ -219,143 +170,3 @@ class _Switchboard: keys = times.keys() keys.sort() return [times[k] for k in keys] - - def _ext_write(self, tmpfile, data): - raise NotImplementedError - - def _ext_read(self, dbfile): - raise NotImplementedError - - - -class MarshalSwitchboard(_Switchboard): - """Python marshal format.""" - FLOAT_ATTRIBUTES = ['received_time'] - - def _ext_write(self, filename, dict): - omask = os.umask(007) # -rw-rw---- - try: - fp = open(filename, 'w') - finally: - os.umask(omask) - # Python's marshal, up to and including in Python 2.1, has a bug where - # the full precision of floats was not stored. We work around this - # bug by hardcoding a list of float values we know about, repr()-izing - # them ourselves, and doing the reverse conversion on _ext_read(). - for attr in self.FLOAT_ATTRIBUTES: - # We use try/except because we expect a hitrate of nearly 100% - try: - fval = dict[attr] - except KeyError: - pass - else: - dict[attr] = repr(fval) - marshal.dump(dict, fp) - # Make damn sure that the data we just wrote gets flushed to disk - fp.flush() - if mm_cfg.SYNC_AFTER_WRITE: - os.fsync(fp.fileno()) - fp.close() - - def _ext_read(self, filename): - fp = open(filename) - dict = marshal.load(fp) - # Update from version 2 files - if dict.get('version', 0) == 2: - del dict['filebase'] - # Do the reverse conversion (repr -> float) - for attr in self.FLOAT_ATTRIBUTES: - try: - sval = dict[attr] - except KeyError: - pass - else: - # Do a safe eval by setting up a restricted execution - # environment. This may not be strictly necessary since we - # know they are floats, but it can't hurt. - dict[attr] = eval(sval, {'__builtins__': {}}) - fp.close() - return dict - - - -class BSDDBSwitchboard(_Switchboard): - """Native (i.e. compiled-in) Berkeley db format.""" - def _ext_write(self, filename, dict): - import bsddb - omask = os.umask(0) - try: - hashfile = bsddb.hashopen(filename, 'n', 0660) - finally: - os.umask(omask) - # values must be strings - for k, v in dict.items(): - hashfile[k] = marshal.dumps(v) - hashfile.sync() - hashfile.close() - - def _ext_read(self, filename): - import bsddb - dict = {} - hashfile = bsddb.hashopen(filename, 'r') - for k in hashfile.keys(): - dict[k] = marshal.loads(hashfile[k]) - hashfile.close() - return dict - - - -class ASCIISwitchboard(_Switchboard): - """Human readable .db file format. - - key/value pairs are written as - - key = value - - as real Python code which can be execfile'd. - """ - - def _ext_write(self, filename, dict): - omask = os.umask(007) # -rw-rw---- - try: - fp = open(filename, 'w') - finally: - os.umask(omask) - for k, v in dict.items(): - print >> fp, '%s = %s' % (k, repr(v)) - # Make damn sure that the data we just wrote gets flushed to disk - fp.flush() - if mm_cfg.SYNC_AFTER_WRITE: - os.fsync(fp.fileno()) - fp.close() - - def _ext_read(self, filename): - dict = {'__builtins__': {}} - execfile(filename, dict) - del dict['__builtins__'] - return dict - - - -# Here are the various types of external file formats available. The format -# chosen is given defined in the mm_cfg.py configuration file. -if mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_MARSHAL: - Switchboard = MarshalSwitchboard -elif mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_BSDDB_NATIVE: - Switchboard = BSDDBSwitchboard -elif mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_ASCII: - Switchboard = ASCIISwitchboard -else: - syslog('error', 'Undefined metadata format: %d (using marshals)', - mm_cfg.METADATA_FORMAT) - Switchboard = MarshalSwitchboard - - - -# For bin/dumpdb -class DumperSwitchboard(Switchboard): - def __init__(self): - pass - - def read(self, filename): - return self._ext_read(filename) -- cgit v1.2.3