From 491e5794fea450235f0bab35bcd4862e0d4195e9 Mon Sep 17 00:00:00 2001 From: Mark Sapiro Date: Wed, 2 Jan 2008 20:25:11 -0800 Subject: Switchboard.py - Moved the counting of the number of recoveries of a .bak file from the dequeue() method to the recover_backup_files() method in order to minimize added i/o. --- Mailman/Queue/Switchboard.py | 60 ++++++++++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py index 895d6df2..a774c580 100644 --- a/Mailman/Queue/Switchboard.py +++ b/Mailman/Queue/Switchboard.py @@ -1,4 +1,4 @@ -# Copyright (C) 2001-2007 by the Free Software Foundation, Inc. +# Copyright (C) 2001-2008 by the Free Software Foundation, Inc. # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License @@ -63,10 +63,9 @@ SAVE_MSGS_AS_PICKLES = True # Small increment to add to time in case two entries have the same time. This # prevents skipping one of two entries with the same time until the next pass. DELTA = .0001 -# We count the number of times a file has been dequeued. This can be more -# than one if the file has been moved to .bak and recovered. In order to -# prevent loops and a message flood, when the count reaches this value, we -# move the file to the shunt queue as a .psv. +# We count the number of times a file has been moved to .bak and recovered. +# In order to prevent loops and a message flood, when the count reaches this +# value, we move the file to the shunt queue as a .psv. MAX_BAK_COUNT = 3 @@ -152,34 +151,16 @@ class Switchboard: filename = os.path.join(self.__whichq, filebase + '.pck') backfile = os.path.join(self.__whichq, filebase + '.bak') # Read the message object and metadata. - fp = open(filename, 'rb+') + fp = open(filename) # Move the file to the backup file name for processing. If this # process crashes uncleanly the .bak file will be used to re-instate - # the .pck file in order to try again. We keep count in _bak_count - # in the metadata which we rewrite of the number of times we recover - # and dequeue this file. When the count reaches MAX_BAK_COUNT, we - # move the .bak file to a .psv file in the shunt queue. + # the .pck file in order to try again. os.rename(filename, backfile) try: msg = cPickle.load(fp) - data_pos = fp.tell() data = cPickle.load(fp) - data['_bak_count'] = data.setdefault('_bak_count', 0) + 1 - fp.seek(data_pos) - if data.get('_parsemsg'): - protocol = 0 - else: - protocol = 1 - cPickle.dump(data, fp, protocol) - fp.truncate() - fp.flush() - os.fsync(fp.fileno()) finally: fp.close() - if data['_bak_count'] >= MAX_BAK_COUNT: - syslog('error', '.bak file max count, preserving file: %s', - filebase) - self.finish(filebase, preserve=True) if data.get('_parsemsg'): msg = email.message_from_string(msg, Message.Message) return msg, data @@ -233,8 +214,33 @@ class Switchboard: def recover_backup_files(self): # Move all .bak files in our slice to .pck. It's impossible for both # to exist at the same time, so the move is enough to ensure that our - # normal dequeuing process will handle them. + # normal dequeuing process will handle them. We keep count in + # _bak_count in the metadata of the number of times we recover this + # file. When the count reaches MAX_BAK_COUNT, we move the .bak file + # to a .psv file in the shunt queue. for filebase in self.files('.bak'): src = os.path.join(self.__whichq, filebase + '.bak') dst = os.path.join(self.__whichq, filebase + '.pck') - os.rename(src, dst) + fp = open(src, 'rb+') + try: + msg = cPickle.load(fp) + data_pos = fp.tell() + data = cPickle.load(fp) + data['_bak_count'] = data.setdefault('_bak_count', 0) + 1 + fp.seek(data_pos) + if data.get('_parsemsg'): + protocol = 0 + else: + protocol = 1 + cPickle.dump(data, fp, protocol) + fp.truncate() + fp.flush() + os.fsync(fp.fileno()) + finally: + fp.close() + if data['_bak_count'] >= MAX_BAK_COUNT: + syslog('error', '.bak file max count, preserving file: %s', + filebase) + self.finish(filebase, preserve=True) + else: + os.rename(src, dst) -- cgit v1.2.3