# Copyright (C) 2001-2008 by the Free Software Foundation, Inc. # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, # USA. """Reading and writing message objects and message metadata. """ # enqueue() and dequeue() are not symmetric. enqueue() takes a Message # object. dequeue() returns a email.Message object tree. # # Message metadata is represented internally as a Python dictionary. Keys and # values must be strings. When written to a queue directory, the metadata is # written into an externally represented format, as defined here. Because # components of the Mailman system may be written in something other than # Python, the external interchange format should be chosen based on what those # other components can read and write. # # Most efficient, and recommended if everything is Python, is Python marshal # format. Also supported by default is Berkeley db format (using the default # bsddb module compiled into your Python executable -- usually Berkeley db # 2), and rfc822 style plain text. You can write your own if you have other # needs. import os import time import email import errno import cPickle import marshal from Mailman import mm_cfg from Mailman import Utils from Mailman import Message from Mailman.Logging.Syslog import syslog from Mailman.Utils import sha_new # 20 bytes of all bits set, maximum sha.digest() value shamax = 0xffffffffffffffffffffffffffffffffffffffffL try: True, False except NameError: True = 1 False = 0 # This flag causes messages to be written as pickles (when True) or text files # (when False). Pickles are more efficient because the message doesn't need # to be re-parsed every time it's unqueued, but pickles are not human readable. SAVE_MSGS_AS_PICKLES = True # Small increment to add to time in case two entries have the same time. This # prevents skipping one of two entries with the same time until the next pass. DELTA = .0001 # We count the number of times a file has been moved to .bak and recovered. # In order to prevent loops and a message flood, when the count reaches this # value, we move the file to the shunt queue as a .psv. MAX_BAK_COUNT = 3 class Switchboard: def __init__(self, whichq, slice=None, numslices=1, recover=False): self.__whichq = whichq # Create the directory if it doesn't yet exist. # FIXME omask = os.umask(0) # rwxrws--- try: try: os.mkdir(self.__whichq, 0770) except OSError, e: if e.errno <> errno.EEXIST: raise finally: os.umask(omask) # Fast track for no slices self.__lower = None self.__upper = None # BAW: test performance and end-cases of this algorithm if numslices <> 1: self.__lower = ((shamax+1) * slice) / numslices self.__upper = (((shamax+1) * (slice+1)) / numslices) - 1 if recover: self.recover_backup_files() def whichq(self): return self.__whichq def enqueue(self, _msg, _metadata={}, **_kws): # Calculate the SHA hexdigest of the message to get a unique base # filename. We're also going to use the digest as a hash into the set # of parallel qrunner processes. data = _metadata.copy() data.update(_kws) listname = data.get('listname', '--nolist--') # Get some data for the input to the sha hash now = time.time() if SAVE_MSGS_AS_PICKLES and not data.get('_plaintext'): protocol = 1 msgsave = cPickle.dumps(_msg, protocol) else: protocol = 0 msgsave = cPickle.dumps(str(_msg), protocol) hashfood = msgsave + listname + `now` # Encode the current time into the file name for FIFO sorting in # files(). The file name consists of two parts separated by a `+': # the received time for this message (i.e. when it first showed up on # this system) and the sha hex digest. #rcvtime = data.setdefault('received_time', now) rcvtime = data.setdefault('received_time', now) filebase = `rcvtime` + '+' + sha_new(hashfood).hexdigest() filename = os.path.join(self.__whichq, filebase + '.pck') tmpfile = filename + '.tmp' # Always add the metadata schema version number data['version'] = mm_cfg.QFILE_SCHEMA_VERSION # Filter out volatile entries for k in data.keys(): if k.startswith('_'): del data[k] # We have to tell the dequeue() method whether to parse the message # object or not. data['_parsemsg'] = (protocol == 0) # Write to the pickle file the message object and metadata. omask = os.umask(007) # -rw-rw---- try: fp = open(tmpfile, 'w') try: fp.write(msgsave) cPickle.dump(data, fp, protocol) fp.flush() os.fsync(fp.fileno()) finally: fp.close() finally: os.umask(omask) os.rename(tmpfile, filename) return filebase def dequeue(self, filebase): # Calculate the filename from the given filebase. filename = os.path.join(self.__whichq, filebase + '.pck') backfile = os.path.join(self.__whichq, filebase + '.bak') # Read the message object and metadata. fp = open(filename) # Move the file to the backup file name for processing. If this # process crashes uncleanly the .bak file will be used to re-instate # the .pck file in order to try again. os.rename(filename, backfile) try: msg = cPickle.load(fp) data = cPickle.load(fp) finally: fp.close() if data.get('_parsemsg'): msg = email.message_from_string(msg, Message.Message) return msg, data def finish(self, filebase, preserve=False): bakfile = os.path.join(self.__whichq, filebase + '.bak') try: if preserve: psvfile = os.path.join(mm_cfg.BADQUEUE_DIR, filebase + '.psv') # Create the directory if it doesn't yet exist. # Copied from __init__. omask = os.umask(0) # rwxrws--- try: try: os.mkdir(mm_cfg.BADQUEUE_DIR, 0770) except OSError, e: if e.errno <> errno.EEXIST: raise finally: os.umask(omask) os.rename(bakfile, psvfile) else: os.unlink(bakfile) except EnvironmentError, e: syslog('error', 'Failed to unlink/preserve backup file: %s', bakfile) def files(self, extension='.pck'): times = {} lower = self.__lower upper = self.__upper for f in os.listdir(self.__whichq): # By ignoring anything that doesn't end in .pck, we ignore # tempfiles and avoid a race condition. filebase, ext = os.path.splitext(f) if ext <> extension: continue when, digest = filebase.split('+') # Throw out any files which don't match our bitrange. BAW: test # performance and end-cases of this algorithm. MAS: both # comparisons need to be <= to get complete range. if lower is None or (lower <= long(digest, 16) <= upper): key = float(when) while times.has_key(key): key += DELTA times[key] = filebase # FIFO sort keys = times.keys() keys.sort() return [times[k] for k in keys] def recover_backup_files(self): # Move all .bak files in our slice to .pck. It's impossible for both # to exist at the same time, so the move is enough to ensure that our # normal dequeuing process will handle them. We keep count in # _bak_count in the metadata of the number of times we recover this # file. When the count reaches MAX_BAK_COUNT, we move the .bak file # to a .psv file in the shunt queue. for filebase in self.files('.bak'): src = os.path.join(self.__whichq, filebase + '.bak') dst = os.path.join(self.__whichq, filebase + '.pck') fp = open(src, 'rb+') try: try: msg = cPickle.load(fp) data_pos = fp.tell() data = cPickle.load(fp) except Exception, s: # If unpickling throws any exception, just log and # preserve this entry syslog('error', 'Unpickling .bak exception: %s\n' + 'preserving file: %s', s, filebase) self.finish(filebase, preserve=True) else: data['_bak_count'] = data.setdefault('_bak_count', 0) + 1 fp.seek(data_pos) if data.get('_parsemsg'): protocol = 0 else: protocol = 1 cPickle.dump(data, fp, protocol) fp.truncate() fp.flush() os.fsync(fp.fileno()) if data['_bak_count'] >= MAX_BAK_COUNT: syslog('error', '.bak file max count, preserving file: %s', filebase) self.finish(filebase, preserve=True) else: os.rename(src, dst) finally: fp.close()