aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Mailman/Queue/Switchboard.py263
1 files changed, 37 insertions, 226 deletions
diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py
index 9853866d..f86b83b7 100644
--- a/Mailman/Queue/Switchboard.py
+++ b/Mailman/Queue/Switchboard.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2001-2003 by the Free Software Foundation, Inc.
+# Copyright (C) 2001-2004 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -34,13 +34,12 @@
# needs.
import os
-import time
import sha
-import marshal
+import time
+import email
import errno
import cPickle
-
-import email
+import marshal
from Mailman import mm_cfg
from Mailman import Utils
@@ -63,7 +62,7 @@ SAVE_MSGS_AS_PICKLES = True
-class _Switchboard:
+class Switchboard:
def __init__(self, whichq, slice=None, numslices=1):
self.__whichq = whichq
# Create the directory if it doesn't yet exist.
@@ -97,11 +96,11 @@ class _Switchboard:
# Get some data for the input to the sha hash
now = time.time()
if SAVE_MSGS_AS_PICKLES and not data.get('_plaintext'):
- msgsave = cPickle.dumps(_msg, 1)
- ext = '.pck'
+ protocol = 1
+ msgsave = cPickle.dumps(_msg, protocol)
else:
- msgsave = str(_msg)
- ext = '.msg'
+ protocol = 0
+ msgsave = cPickle.dumps(str(_msg), protocol)
hashfood = msgsave + listname + `now`
# Encode the current time into the file name for FIFO sorting in
# files(). The file name consists of two parts separated by a `+':
@@ -110,94 +109,46 @@ class _Switchboard:
#rcvtime = data.setdefault('received_time', now)
rcvtime = data.setdefault('received_time', now)
filebase = `rcvtime` + '+' + sha.new(hashfood).hexdigest()
- # Figure out which queue files the message is to be written to.
- msgfile = os.path.join(self.__whichq, filebase + ext)
- dbfile = os.path.join(self.__whichq, filebase + '.db')
+ filename = os.path.join(self.__whichq, filebase + '.pck')
+ tmpfile = filename + '.tmp'
# Always add the metadata schema version number
data['version'] = mm_cfg.QFILE_SCHEMA_VERSION
# Filter out volatile entries
for k in data.keys():
if k.startswith('_'):
del data[k]
- # Now write the message text to one file and the metadata to another
- # file. The metadata is always written second to avoid race
- # conditions with the various queue runners (which key off of the .db
- # filename).
+ # We have to tell the dequeue() method whether to parse the message
+ # object or not.
+ data['_parsemsg'] = (protocol == 0)
+ # Write to the pickle file the message object and metadata.
omask = os.umask(007) # -rw-rw----
try:
- msgfp = open(msgfile, 'w')
+ fp = open(tmpfile, 'w')
+ try:
+ fp.write(msgsave)
+ cPickle.dump(data, fp, protocol)
+ fp.flush()
+ os.fsync(fp.fileno())
+ finally:
+ fp.close()
finally:
os.umask(omask)
- msgfp.write(msgsave)
- msgfp.flush()
- os.fsync(msgfp.fileno())
- msgfp.close()
- # Now write the metadata using the appropriate external metadata
- # format. We play rename-switcheroo here to further plug the race
- # condition holes.
- tmpfile = dbfile + '.tmp'
- self._ext_write(tmpfile, data)
- os.rename(tmpfile, dbfile)
+ os.rename(tmpfile, filename)
return filebase
def dequeue(self, filebase):
- # Calculate the .db and .msg filenames from the given filebase.
- msgfile = os.path.join(self.__whichq, filebase + '.msg')
- pckfile = os.path.join(self.__whichq, filebase + '.pck')
- dbfile = os.path.join(self.__whichq, filebase + '.db')
- # Now we are going to read the message and metadata for the given
- # filebase. We want to read things in this order: first, the metadata
- # file to find out whether the message is stored as a pickle or as
- # plain text. Second, the actual message file. However, we want to
- # first unlink the message file and then the .db file, because the
- # qrunner only cues off of the .db file
- msg = None
- try:
- data = self._ext_read(dbfile)
- os.unlink(dbfile)
- except EnvironmentError, e:
- if e.errno <> errno.ENOENT: raise
- data = {}
- # Between 2.1b4 and 2.1b5, the `rejection-notice' key in the metadata
- # was renamed to `rejection_notice', since dashes in the keys are not
- # supported in METAFMT_ASCII.
- if data.has_key('rejection-notice'):
- data['rejection_notice'] = data['rejection-notice']
- del data['rejection-notice']
- msgfp = None
+ # Calculate the filename from the given filebase.
+ filename = os.path.join(self.__whichq, filebase + '.pck')
+ # Read the message object and metadata.
+ fp = open(filename)
+ os.unlink(filename)
try:
- try:
- msgfp = open(pckfile)
- msg = cPickle.load(msgfp)
- os.unlink(pckfile)
- except EnvironmentError, e:
- if e.errno <> errno.ENOENT: raise
- msgfp = None
- try:
- msgfp = open(msgfile)
- msg = email.message_from_file(msgfp, Message.Message)
- os.unlink(msgfile)
- except EnvironmentError, e:
- if e.errno <> errno.ENOENT: raise
- except email.Errors.MessageParseError, e:
- # This message was unparsable, most likely because its
- # MIME encapsulation was broken. For now, there's not
- # much we can do about it.
- syslog('error', 'message is unparsable: %s', filebase)
- msgfp.close()
- msgfp = None
- if mm_cfg.QRUNNER_SAVE_BAD_MESSAGES:
- # Cheapo way to ensure the directory exists w/ the
- # proper permissions.
- sb = Switchboard(mm_cfg.BADQUEUE_DIR)
- os.rename(msgfile, os.path.join(
- mm_cfg.BADQUEUE_DIR, filebase + '.txt'))
- else:
- os.unlink(msgfile)
- msg = data = None
+ msg = cPickle.load(fp)
+ data = cPickle.load(fp)
finally:
- if msgfp:
- msgfp.close()
+ fp.close()
+ if data.get('_parsemsg'):
+ msg = email.message_from_string(msg, Message.Message)
return msg, data
def files(self):
@@ -205,9 +156,9 @@ class _Switchboard:
lower = self.__lower
upper = self.__upper
for f in os.listdir(self.__whichq):
- # We only care about the file's base name (i.e. no extension).
- # Thus we'll ignore anything that doesn't end in .db.
- if not f.endswith('.db'):
+ # By ignoring anything that doesn't end in .pck, we ignore
+ # tempfiles and avoid a race condition.
+ if not f.endswith('.pck'):
continue
filebase = os.path.splitext(f)[0]
when, digest = filebase.split('+')
@@ -219,143 +170,3 @@ class _Switchboard:
keys = times.keys()
keys.sort()
return [times[k] for k in keys]
-
- def _ext_write(self, tmpfile, data):
- raise NotImplementedError
-
- def _ext_read(self, dbfile):
- raise NotImplementedError
-
-
-
-class MarshalSwitchboard(_Switchboard):
- """Python marshal format."""
- FLOAT_ATTRIBUTES = ['received_time']
-
- def _ext_write(self, filename, dict):
- omask = os.umask(007) # -rw-rw----
- try:
- fp = open(filename, 'w')
- finally:
- os.umask(omask)
- # Python's marshal, up to and including in Python 2.1, has a bug where
- # the full precision of floats was not stored. We work around this
- # bug by hardcoding a list of float values we know about, repr()-izing
- # them ourselves, and doing the reverse conversion on _ext_read().
- for attr in self.FLOAT_ATTRIBUTES:
- # We use try/except because we expect a hitrate of nearly 100%
- try:
- fval = dict[attr]
- except KeyError:
- pass
- else:
- dict[attr] = repr(fval)
- marshal.dump(dict, fp)
- # Make damn sure that the data we just wrote gets flushed to disk
- fp.flush()
- if mm_cfg.SYNC_AFTER_WRITE:
- os.fsync(fp.fileno())
- fp.close()
-
- def _ext_read(self, filename):
- fp = open(filename)
- dict = marshal.load(fp)
- # Update from version 2 files
- if dict.get('version', 0) == 2:
- del dict['filebase']
- # Do the reverse conversion (repr -> float)
- for attr in self.FLOAT_ATTRIBUTES:
- try:
- sval = dict[attr]
- except KeyError:
- pass
- else:
- # Do a safe eval by setting up a restricted execution
- # environment. This may not be strictly necessary since we
- # know they are floats, but it can't hurt.
- dict[attr] = eval(sval, {'__builtins__': {}})
- fp.close()
- return dict
-
-
-
-class BSDDBSwitchboard(_Switchboard):
- """Native (i.e. compiled-in) Berkeley db format."""
- def _ext_write(self, filename, dict):
- import bsddb
- omask = os.umask(0)
- try:
- hashfile = bsddb.hashopen(filename, 'n', 0660)
- finally:
- os.umask(omask)
- # values must be strings
- for k, v in dict.items():
- hashfile[k] = marshal.dumps(v)
- hashfile.sync()
- hashfile.close()
-
- def _ext_read(self, filename):
- import bsddb
- dict = {}
- hashfile = bsddb.hashopen(filename, 'r')
- for k in hashfile.keys():
- dict[k] = marshal.loads(hashfile[k])
- hashfile.close()
- return dict
-
-
-
-class ASCIISwitchboard(_Switchboard):
- """Human readable .db file format.
-
- key/value pairs are written as
-
- key = value
-
- as real Python code which can be execfile'd.
- """
-
- def _ext_write(self, filename, dict):
- omask = os.umask(007) # -rw-rw----
- try:
- fp = open(filename, 'w')
- finally:
- os.umask(omask)
- for k, v in dict.items():
- print >> fp, '%s = %s' % (k, repr(v))
- # Make damn sure that the data we just wrote gets flushed to disk
- fp.flush()
- if mm_cfg.SYNC_AFTER_WRITE:
- os.fsync(fp.fileno())
- fp.close()
-
- def _ext_read(self, filename):
- dict = {'__builtins__': {}}
- execfile(filename, dict)
- del dict['__builtins__']
- return dict
-
-
-
-# Here are the various types of external file formats available. The format
-# chosen is given defined in the mm_cfg.py configuration file.
-if mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_MARSHAL:
- Switchboard = MarshalSwitchboard
-elif mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_BSDDB_NATIVE:
- Switchboard = BSDDBSwitchboard
-elif mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_ASCII:
- Switchboard = ASCIISwitchboard
-else:
- syslog('error', 'Undefined metadata format: %d (using marshals)',
- mm_cfg.METADATA_FORMAT)
- Switchboard = MarshalSwitchboard
-
-
-
-# For bin/dumpdb
-class DumperSwitchboard(Switchboard):
- def __init__(self):
- pass
-
- def read(self, filename):
- return self._ext_read(filename)