# Copyright (C) 2001-2003 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""Reading and writing message objects and message metadata.
"""
# enqueue() and dequeue() are not symmetric. enqueue() takes a Message
# object. dequeue() returns an email.Message object tree.
#
# Message metadata is represented internally as a Python dictionary. Keys and
# values must be strings. When written to a queue directory, the metadata is
# written into an externally represented format, as defined here. Because
# components of the Mailman system may be written in something other than
# Python, the external interchange format should be chosen based on what those
# other components can read and write.
#
# Most efficient, and recommended if everything is Python, is Python marshal
# format. Also supported by default is Berkeley db format (using the default
# bsddb module compiled into your Python executable -- usually Berkeley db
# 2), and rfc822 style plain text. You can write your own if you have other
# needs.
import os
import time
import sha
import marshal
import errno
import cPickle
import email
from Mailman import mm_cfg
from Mailman import Utils
from Mailman import Message
from Mailman.Logging.Syslog import syslog
# 20 bytes of all bits set, maximum sha.digest() value
shamax = 0xffffffffffffffffffffffffffffffffffffffffL
try:
True, False
except NameError:
True = 1
False = 0
# This flag causes Mailman to fsync() the file after writing and flushing its
# contents. While this ensures the data is written to disk, avoiding data
# loss, it is a huge performance killer.
SYNC_AFTER_WRITE = False
# This flag causes messages to be written as pickles (when True) or text files
# (when False). Pickles are more efficient because the message doesn't need
# to be re-parsed every time it's unqueued, but pickles are not human readable.
SAVE_MSGS_AS_PICKLES = True
class _Switchboard:
def __init__(self, whichq, slice=None, numslices=1):
self.__whichq = whichq
# Create the directory if it doesn't yet exist.
# FIXME
omask = os.umask(0) # rwxrws---
try:
try:
os.mkdir(self.__whichq, 0770)
except OSError, e:
if e.errno <> errno.EEXIST: raise
finally:
os.umask(omask)
# Fast track for no slices
self.__lower = None
self.__upper = None
# BAW: test performance and end-cases of this algorithm
if numslices <> 1:
self.__lower = ((shamax+1) * slice) / numslices
self.__upper = (((shamax+1) * (slice+1)) / numslices) - 1
def whichq(self):
return self.__whichq
def enqueue(self, _msg, _metadata={}, **_kws):
# Calculate the SHA hexdigest of the message to get a unique base
# filename. We're also going to use the digest as a hash into the set
# of parallel qrunner processes.
data = _metadata.copy()
data.update(_kws)
listname = data.get('listname', '--nolist--')
# Get some data for the input to the sha hash
now = time.time()
if SAVE_MSGS_AS_PICKLES and not data.get('_plaintext'):
msgsave = cPickle.dumps(_msg, 1)
ext = '.pck'
else:
msgsave = str(_msg)
ext = '.msg'
hashfood = msgsave + listname + `now`
# Encode the current time into the file name for FIFO sorting in
# files(). The file name consists of two parts separated by a `+':
# the received time for this message (i.e. when it first showed up on
# this system) and the sha hex digest.
#rcvtime = data.setdefault('received_time', now)
rcvtime = data.setdefault('received_time', now)
filebase = `rcvtime` + '+' + sha.new(hashfood).hexdigest()
# Figure out which queue files the message is to be written to.
msgfile = os.path.join(self.__whichq, filebase + ext)
dbfile = os.path.join(self.__whichq, filebase + '.db')
# Always add the metadata schema version number
data['version'] = mm_cfg.QFILE_SCHEMA_VERSION
# Filter out volatile entries
for k in data.keys():
if k[0] == '_':
del data[k]
# Now write the message text to one file and the metadata to another
# file. The metadata is always written second to avoid race
# conditions with the various queue runners (which key off of the .db
# filename).
omask = os.umask(007) # -rw-rw----
try:
msgfp = open(msgfile, 'w')
finally:
os.umask(omask)
msgfp.write(msgsave)
msgfp.close()
# Now write the metadata using the appropriate external metadata
# format. We play rename-switcheroo here to further plug the race
# condition holes.
tmpfile = dbfile + '.tmp'
self._ext_write(tmpfile, data)
os.rename(tmpfile, dbfile)
return filebase
def dequeue(self, filebase):
# Calculate the .db and .msg filenames from the given filebase.
msgfile = os.path.join(self.__whichq, filebase + '.msg')
pckfile = os.path.join(self.__whichq, filebase + '.pck')
dbfile = os.path.join(self.__whichq, filebase + '.db')
# Now we are going to read the message and metadata for the given
# filebase. We want to read things in this order: first, the metadata
# file to find out whether the message is stored as a pickle or as
# plain text. Second, the actual message file. However, we want to
# first unlink the message file and then the .db file, because the
# qrunner only cues off of the .db file
msg = None
try:
data = self._ext_read(dbfile)
os.unlink(dbfile)
except EnvironmentError, e:
if e.errno <> errno.ENOENT: raise
data = {}
# Between 2.1b4 and 2.1b5, the `rejection-notice' key in the metadata
# was renamed to `rejection_notice', since dashes in the keys are not
# supported in METAFMT_ASCII.
if data.has_key('rejection-notice'):
data['rejection_notice'] = data['rejection-notice']
del data['rejection-notice']
msgfp = None
try:
try:
msgfp = open(pckfile)
msg = cPickle.load(msgfp)
os.unlink(pckfile)
except EnvironmentError, e:
if e.errno <> errno.ENOENT: raise
msgfp = None
try:
msgfp = open(msgfile)
msg = email.message_from_file(msgfp, Message.Message)
os.unlink(msgfile)
except EnvironmentError, e:
if e.errno <> errno.ENOENT: raise
except email.Errors.MessageParseError, e:
# This message was unparsable, most likely because its
# MIME encapsulation was broken. For now, there's not
# much we can do about it.
syslog('error', 'message is unparsable: %s', filebase)
msgfp.close()
msgfp = None
if mm_cfg.QRUNNER_SAVE_BAD_MESSAGES:
# Cheapo way to ensure the directory exists w/ the
# proper permissions.
sb = Switchboard(mm_cfg.BADQUEUE_DIR)
os.rename(msgfile, os.path.join(
mm_cfg.BADQUEUE_DIR, filebase + '.txt'))
else:
os.unlink(msgfile)
msg = data = None
finally:
if msgfp:
msgfp.close()
return msg, data
def files(self):
times = {}
lower = self.__lower
upper = self.__upper
for f in os.listdir(self.__whichq):
# We only care about the file's base name (i.e. no extension).
# Thus we'll ignore anything that doesn't end in .db.
if not f.endswith('.db'):
continue
filebase = os.path.splitext(f)[0]
when, digest = filebase.split('+')
# Throw out any files which don't match our bitrange. BAW: test
# performance and end-cases of this algorithm.
if not lower or (lower <= long(digest, 16) < upper):
times[float(when)] = filebase
# FIFO sort
keys = times.keys()
keys.sort()
return [times[k] for k in keys]
def _ext_write(self, tmpfile, data):
raise NotImplementedError
def _ext_read(self, dbfile):
raise NotImplementedError
class MarshalSwitchboard(_Switchboard):
"""Python marshal format."""
FLOAT_ATTRIBUTES = ['received_time']
def _ext_write(self, filename, dict):
omask = os.umask(007) # -rw-rw----
try:
fp = open(filename, 'w')
finally:
os.umask(omask)
# Python's marshal, up to and including in Python 2.1, has a bug where
# the full precision of floats was not stored. We work around this
# bug by hardcoding a list of float values we know about, repr()-izing
# them ourselves, and doing the reverse conversion on _ext_read().
for attr in self.FLOAT_ATTRIBUTES:
# We use try/except because we expect a hitrate of nearly 100%
try:
fval = dict[attr]
except KeyError:
pass
else:
dict[attr] = repr(fval)
marshal.dump(dict, fp)
# Make damn sure that the data we just wrote gets flushed to disk
fp.flush()
if SYNC_AFTER_WRITE:
os.fsync(fp.fileno())
fp.close()
def _ext_read(self, filename):
fp = open(filename)
dict = marshal.load(fp)
# Update from version 2 files
if dict.get('version', 0) == 2:
del dict['filebase']
# Do the reverse conversion (repr -> float)
for attr in self.FLOAT_ATTRIBUTES:
try:
sval = dict[attr]
except KeyError:
pass
else:
# Do a safe eval by setting up a restricted execution
# environment. This may not be strictly necessary since we
# know they are floats, but it can't hurt.
dict[attr] = eval(sval, {'__builtins__': {}})
fp.close()
return dict
class BSDDBSwitchboard(_Switchboard):
"""Native (i.e. compiled-in) Berkeley db format."""
def _ext_write(self, filename, dict):
import bsddb
omask = os.umask(0)
try:
hashfile = bsddb.hashopen(filename, 'n', 0660)
finally:
os.umask(omask)
# values must be strings
for k, v in dict.items():
hashfile[k] = marshal.dumps(v)
hashfile.sync()
hashfile.close()
def _ext_read(self, filename):
import bsddb
dict = {}
hashfile = bsddb.hashopen(filename, 'r')
for k in hashfile.keys():
dict[k] = marshal.loads(hashfile[k])
hashfile.close()
return dict
class ASCIISwitchboard(_Switchboard):
"""Human readable .db file format.
key/value pairs are written as
key = value
as real Python code which can be execfile'd.
"""
def _ext_write(self, filename, dict):
omask = os.umask(007) # -rw-rw----
try:
fp = open(filename, 'w')
finally:
os.umask(omask)
for k, v in dict.items():
print >> fp, '%s = %s' % (k, repr(v))
# Make damn sure that the data we just wrote gets flushed to disk
fp.flush()
if SYNC_AFTER_WRITE:
os.fsync(fp.fileno())
fp.close()
def _ext_read(self, filename):
dict = {'__builtins__': {}}
execfile(filename, dict)
del dict['__builtins__']
return dict
# Here are the various types of external file formats available.  The format
# chosen is defined in the mm_cfg.py configuration file.
def _choose_switchboard_class(fmt):
    """Return the switchboard class that handles metadata format `fmt`.

    An unrecognized format is logged and the marshal implementation is used.
    """
    if fmt == mm_cfg.METAFMT_MARSHAL:
        return MarshalSwitchboard
    if fmt == mm_cfg.METAFMT_BSDDB_NATIVE:
        return BSDDBSwitchboard
    if fmt == mm_cfg.METAFMT_ASCII:
        return ASCIISwitchboard
    syslog('error', 'Undefined metadata format: %d (using marshals)', fmt)
    return MarshalSwitchboard


Switchboard = _choose_switchboard_class(mm_cfg.METADATA_FORMAT)
# For bin/dumpdb
class DumperSwitchboard(Switchboard):
    """Switchboard used only to read metadata files in the configured format."""

    def __init__(self):
        """Do nothing; the base class initializer is deliberately not run."""

    def read(self, filename):
        """Return the metadata dictionary stored in `filename`."""
        metadata = self._ext_read(filename)
        return metadata