aboutsummaryrefslogtreecommitdiffstats
path: root/Mailman
diff options
context:
space:
mode:
Diffstat (limited to 'Mailman')
-rw-r--r--Mailman/BDBMemberAdaptor.py637
-rw-r--r--Mailman/Queue/RetryRunner.py46
2 files changed, 683 insertions, 0 deletions
diff --git a/Mailman/BDBMemberAdaptor.py b/Mailman/BDBMemberAdaptor.py
new file mode 100644
index 00000000..589be626
--- /dev/null
+++ b/Mailman/BDBMemberAdaptor.py
@@ -0,0 +1,637 @@
+# Copyright (C) 2003 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""A MemberAdaptor based on the Berkeley database wrapper for Python.
+
+Requires Python 2.2.2 or newer, and PyBSDDB3 4.1.3 or newer.
+"""
+
+# To use, put the following in a file called extend.py in the mailing list's
+# directory:
+#
+# from Mailman.BDBMemberAdaptor import extend
+#
+# that's it!
+
+import os
+import new
+import time
+import errno
+import struct
+import cPickle as pickle
+
+try:
+ # Python 2.3
+ from bsddb import db
+except ImportError:
+ # earlier Pythons
+ from bsddb3 import db
+
+from Mailman import mm_cfg
+from Mailman import Utils
+from Mailman import Errors
+from Mailman import MemberAdaptor
+from Mailman.MailList import MailList
+from Mailman.Logging.Syslog import syslog
+
+STORAGE_VERSION = 'BA01'
+FMT = '>BHB'
+FMTSIZE = struct.calcsize(FMT)
+
+REGDELIV = 1
+DIGDELIV = 2
+REGFLAG = struct.pack('>B', REGDELIV)
+DIGFLAG = struct.pack('>B', DIGDELIV)
+
+# Positional arguments for _unpack()
+CPADDR = 0
+PASSWD = 1
+LANG = 2
+NAME = 3
+DIGEST = 4
+OPTIONS = 5
+STATUS = 6
+
+
+
+class BDBMemberAdaptor(MemberAdaptor.MemberAdaptor):
+ def __init__(self, mlist):
+ self._mlist = mlist
+ # metainfo -- {key -> value}
+ # This table contains storage metadata information. The keys and
+ # values are simple strings of variable length. Here are the
+ # valid keys:
+ #
+ # version - the version of the database
+ #
+ # members -- {address | rec}
+ # For all regular delivery members, this maps from the member's
+ # key to their data record, which is a string concatenated of the
+ # following:
+ #
+ # -- fixed data (as a packed struct)
+ # + 1-byte digest or regular delivery flag
+ # + 2-byte option flags
+ # + 1-byte delivery status
+ # -- variable data (as a pickle of a tuple)
+ # + their case preserved address or ''
+ # + their plaintext password
+ # + their chosen language
+ # + their realname or ''
+ #
+ # status -- {address | status+time}
+ # Maps the member's key to their delivery status and change time.
+ # These are passed as a tuple and are pickled for storage.
+ #
+ # topics -- {address | topicstrings}
+ # Maps the member's key to their topic strings, concatenated and
+ # separated by SEP
+ #
+ # bounceinfo -- {address | bounceinfo}
+ # Maps the member's key to their bounceinfo, as a pickle
+ #
+ # Make sure the database directory exists
+ path = os.path.join(mlist.fullpath(), 'member.db')
+ exists = False
+ try:
+ os.mkdir(path, 02775)
+ except OSError, e:
+ if e.errno <> errno.EEXIST: raise
+ exists = True
+ # Create the environment
+ self._env = env = db.DBEnv()
+ if exists:
+ # We must join an existing environment, otherwise we'll get
+ # DB_RUNRECOVERY errors when the second process to open the
+ # environment begins a transaction. I don't get it.
+ env.open(path, db.DB_JOINENV)
+ else:
+ env.open(path,
+ db.DB_CREATE |
+ db.DB_RECOVER |
+ db.DB_INIT_MPOOL |
+ db.DB_INIT_TXN
+ )
+ self._txn = None
+ self._tables = []
+ self._metainfo = self._setupDB('metainfo')
+ self._members = self._setupDB('members')
+ self._status = self._setupDB('status')
+ self._topics = self._setupDB('topics')
+ self._bounceinfo = self._setupDB('bounceinfo')
+ # Check the database version number
+ version = self._metainfo.get('version')
+ if version is None:
+ # Initialize
+ try:
+ self.txn_begin()
+ self._metainfo.put('version', STORAGE_VERSION, txn=self._txn)
+ except:
+ self.txn_abort()
+ raise
+ else:
+ self.txn_commit()
+ else:
+ # Currently there's nothing to upgrade
+ assert version == STORAGE_VERSION
+
+ def _setupDB(self, name):
+ d = db.DB(self._env)
+ openflags = db.DB_CREATE
+ # db 4.1 requires that databases be opened in a transaction. We'll
+ # use auto commit, but only if that flag exists (i.e. we're using at
+ # least db 4.1).
+ try:
+ openflags |= db.DB_AUTO_COMMIT
+ except AttributeError:
+ pass
+ d.open(name, db.DB_BTREE, openflags)
+ self._tables.append(d)
+ return d
+
+ def _close(self):
+ self.txn_abort()
+ for d in self._tables:
+ d.close()
+ # Checkpoint the database twice, as recommended by Sleepycat
+ self._checkpoint()
+ self._checkpoint()
+ self._env.close()
+
+ def _checkpoint(self):
+ self._env.txn_checkpoint(0, 0, db.DB_FORCE)
+
+ def txn_begin(self):
+ assert self._txn is None
+ self._txn = self._env.txn_begin()
+
+ def txn_commit(self):
+ assert self._txn is not None
+ self._txn.commit()
+ self._checkpoint()
+ self._txn = None
+
+ def txn_abort(self):
+ if self._txn is not None:
+ self._txn.abort()
+ self._checkpoint()
+ self._txn = None
+
+ def _unpack(self, member):
+ # Assume member is a LCE (i.e. lowercase key)
+ rec = self._members.get(member.lower())
+ assert rec is not None
+ fixed = struct.unpack(FMT, rec[:FMTSIZE])
+ vari = pickle.loads(rec[FMTSIZE:])
+ return vari + fixed
+
+ def _pack(self, member, cpaddr, passwd, lang, name, digest, flags, status):
+ # Assume member is a LCE (i.e. lowercase key)
+ fixed = struct.pack(FMT, digest, flags, status)
+ vari = pickle.dumps((cpaddr, passwd, lang, name))
+ self._members.put(member.lower(), fixed+vari, txn=self._txn)
+
+ # MemberAdaptor writeable interface
+
+ def addNewMember(self, member, **kws):
+ assert self._mlist.Locked()
+ # Make sure this address isn't already a member
+ if self.isMember(member):
+ raise Errors.MMAlreadyAMember, member
+ # Parse the keywords
+ digest = False
+ password = Utils.MakeRandomPassword()
+ language = self._mlist.preferred_language
+ realname = None
+ if kws.has_key('digest'):
+ digest = kws['digest']
+ del kws['digest']
+ if kws.has_key('password'):
+ password = kws['password']
+ del kws['password']
+ if kws.has_key('language'):
+ language = kws['language']
+ del kws['language']
+ if kws.has_key('realname'):
+ realname = kws['realname']
+ del kws['realname']
+ # Assert that no other keywords are present
+ if kws:
+ raise ValueError, kws.keys()
+ # Should we store the case-preserved address?
+ if Utils.LCDomain(member) == member.lower():
+ cpaddress = ''
+ else:
+ cpaddress = member
+ # Calculate the realname
+ if realname is None:
+ realname = ''
+ # Calculate the digest flag
+ if digest:
+ digest = DIGDELIV
+ else:
+ digest = REGDELIV
+ self._pack(member.lower(),
+ cpaddress, password, language, realname,
+ digest, self._mlist.new_member_options,
+ MemberAdaptor.ENABLED)
+
+ def removeMember(self, member):
+ txn = self._txn
+ assert txn is not None
+ assert self._mlist.Locked()
+ self.__assertIsMember(member)
+ key = member.lower()
+ # Remove the table entries
+ self._members.delete(key, txn=txn)
+ if self._status.has_key(key):
+ self._status.delete(key, txn=txn)
+ if self._topics.has_key(key):
+ self._topics.delete(key, txn=txn)
+ if self._bounceinfo.has_key(key):
+ self._bounceinfo.delete(key, txn=txn)
+
+ def changeMemberAddress(self, member, newaddress, nodelete=0):
+ assert self._mlist.Locked()
+ self.__assertIsMember(member)
+ okey = member.lower()
+ nkey = newaddress.lower()
+ txn = self._txn
+ assert txn is not None
+ # First, store a new member record, changing the case preserved addr.
+ # Then delete the old record.
+ cpaddr, passwd, lang, name, digest, flags, sts = self._unpack(okey)
+ self._pack(nkey, newaddress, passwd, lang, name, digest, flags, sts)
+ if not nodelete:
+ self._members.delete(okey, txn)
+ # Copy over the status times, topics, and bounce info, if present
+ timestr = self._status.get(okey)
+ if timestr is not None:
+ self._status.put(nkey, timestr, txn=txn)
+ if not nodelete:
+ self._status.delete(okey, txn)
+ topics = self._topics.get(okey)
+ if topics is not None:
+ self._topics.put(nkey, topics, txn=txn)
+ if not nodelete:
+ self._topics.delete(okey, txn)
+ binfo = self._bounceinfo.get(nkey)
+ if binfo is not None:
+ self._binfo.put(nkey, binfo, txn=txn)
+ if not nodelete:
+ self._binfo.delete(okey, txn)
+
+ def setMemberPassword(self, member, password):
+ assert self._mlist.Locked()
+ self.__assertIsMember(member)
+ member = member.lower()
+ cpaddr, oldpw, lang, name, digest, flags, status = self._unpack(member)
+ self._pack(member, cpaddr, password, lang, name, digest, flags, status)
+
+ def setMemberLanguage(self, member, language):
+ assert self._mlist.Locked()
+ self.__assertIsMember(member)
+ member = member.lower()
+ cpaddr, passwd, olang, name, digest, flags, sts = self._unpack(member)
+ self._pack(member, cpaddr, passwd, language, name, digest, flags, sts)
+
+ def setMemberOption(self, member, flag, value):
+ assert self._mlist.Locked()
+ self.__assertIsMember(member)
+ member = member.lower()
+ cpaddr, passwd, lang, name, digest, options, sts = self._unpack(member)
+ # Sanity check for the digest flag
+ if flag == mm_cfg.Digests:
+ if value:
+ # Be sure the list supports digest delivery
+ if not self._mlist.digestable:
+ raise Errors.CantDigestError
+ digest = DIGDELIV
+ else:
+ # Be sure the list supports regular delivery
+ if not self._mlist.nondigestable:
+ raise Errors.MustDigestError
+ # When toggling off digest delivery, we want to be sure to set
+ # things up so that the user receives one last digest,
+ # otherwise they may lose some email
+ self._mlist.one_last_digest[member] = cpaddr
+ digest = REGDELIV
+ else:
+ if value:
+ options |= flag
+ else:
+ options &= ~flag
+ self._pack(member, cpaddr, passwd, lang, name, digest, options, sts)
+
+ def setMemberName(self, member, realname):
+ assert self._mlist.Locked()
+ self.__assertIsMember(member)
+ member = member.lower()
+ cpaddr, passwd, lang, oldname, digest, flags, sts = self._unpack(
+ member)
+ self._pack(member, cpaddr, passwd, lang, realname, digest, flags, sts)
+
+ def setMemberTopics(self, member, topics):
+ assert self._mlist.Locked()
+ self.__assertIsMember(member)
+ member = member.lower()
+ if topics:
+ self._topics.put(member, SEP.join(topics), txn=self._txn)
+ elif self._topics.has_key(member):
+ # No record is the same as no topics
+ self._topics.delete(member, self._txn)
+
+ def setDeliveryStatus(self, member, status):
+ assert status in (MemberAdaptor.ENABLED, MemberAdaptor.UNKNOWN,
+ MemberAdaptor.BYUSER, MemberAdaptor.BYADMIN,
+ MemberAdaptor.BYBOUNCE)
+ assert self._mlist.Locked()
+ self.__assertIsMember(member)
+ if status == MemberAdaptor.ENABLED:
+ # Enable by resetting their bounce info
+ self.setBounceInfo(member, None)
+ else:
+ # Pickle up the status an the current time and store that in the
+ # database. Use binary mode.
+ data = pickle.dumps((status, time.time()), 1)
+ self._status.put(member.lower(), data, txn=self._txn)
+
+ def setBounceInfo(self, member, info):
+ assert self._mlist.Locked()
+ self.__assertIsMember(member)
+ member = member.lower()
+ if info is None:
+ # This means to reset the bounce and delivery status information
+ if self._bounceinfo.has_key(member):
+ self._bounceinfo.delete(member, self._txn)
+ if self._status.has_key(member):
+ self._status.delete(member, self._txn)
+ else:
+ # Use binary mode
+ data = pickle.dumps(info, 1)
+ self._status.put(member, data, txn=self._txn)
+
+ # The readable interface
+
+ # BAW: It would be more efficient to simply return the iterator, but
+ # modules like admin.py can't handle that yet. They requires lists.
+ def getMembers(self):
+ return list(_AllMembersIterator(self._members))
+
+ def getRegularMemberKeys(self):
+ return list(_DeliveryMemberIterator(self._members, REGFLAG))
+
+ def getDigestMemberKeys(self):
+ return list(_DeliveryMemberIterator(self._members, DIGFLAG))
+
+ def __assertIsMember(self, member):
+ if not self.isMember(member):
+ raise Errors.NotAMemberError, member
+
+ def isMember(self, member):
+ return self._members.has_key(member.lower())
+
+ def getMemberKey(self, member):
+ self.__assertIsMember(member)
+ return member.lower()
+
+ def getMemberCPAddress(self, member):
+ self.__assertIsMember(member)
+ cpaddr = self._unpack(member)[CPADDR]
+ if cpaddr:
+ return cpaddr
+ return member
+
+ def getMemberCPAddresses(self, members):
+ rtn = []
+ for member in members:
+ member = member.lower()
+ if self._members.has_key(member):
+ rtn.append(self._unpack(member)[CPADDR])
+ else:
+ rtn.append(None)
+ return rtn
+
+ def authenticateMember(self, member, response):
+ self.__assertIsMember(member)
+ passwd = self._unpack(member)[PASSWD]
+ if passwd == response:
+ return passwd
+ return False
+
+ def getMemberPassword(self, member):
+ self.__assertIsMember(member)
+ return self._unpack(member)[PASSWD]
+
+ def getMemberLanguage(self, member):
+ if not self.isMember(member):
+ return self._mlist.preferred_language
+ lang = self._unpack(member)[LANG]
+ if lang in self._mlist.GetAvailableLanguages():
+ return lang
+ return self._mlist.preferred_language
+
+ def getMemberOption(self, member, flag):
+ self.__assertIsMember(member)
+ if flag == mm_cfg.Digests:
+ return self._unpack(member)[DIGEST] == DIGDELIV
+ options = self._unpack(member)[OPTIONS]
+ return bool(options & flag)
+
+ def getMemberName(self, member):
+ self.__assertIsMember(member)
+ name = self._unpack(member)[NAME]
+ return name or None
+
+ def getMemberTopics(self, member):
+ self.__assertIsMember(member)
+ topics = self._topics.get(member.lower(), '')
+ if not topics:
+ return []
+ return topics.split(SEP)
+
+ def getDeliveryStatus(self, member):
+ self.__assertIsMember(member)
+ data = self._status.get(member.lower())
+ if data is None:
+ return MemberAdaptor.ENABLED
+ status, when = pickle.loads(data)
+ return status
+
+ def getDeliveryStatusChangeTime(self, member):
+ self.__assertIsMember(member)
+ data = self._status.get(member.lower())
+ if data is None:
+ return 0
+ status, when = pickle.loads(data)
+ return when
+
+ # BAW: see above, re iterators
+ def getDeliveryStatusMembers(self, status=(MemberAdaptor.UNKNOWN,
+ MemberAdaptor.BYUSER,
+ MemberAdaptor.BYADMIN,
+ MemberAdaptor.BYBOUNCE)):
+ return list(_StatusMemberIterator(self._members, self._status, status))
+
+ def getBouncingMembers(self):
+ return list(_BouncingMembersIterator(self._bounceinfo))
+
+ def getBounceInfo(self, member):
+ self.__assertIsMember(member)
+ return self._bounceinfo.get(member.lower())
+
+
+
+class _MemberIterator:
+ def __init__(self, table):
+ self._table = table
+ self._c = table.cursor()
+
+ def __iter__(self):
+ raise NotImplementedError
+
+ def next(self):
+ raise NotImplementedError
+
+ def close(self):
+ if self._c:
+ self._c.close()
+ self._c = None
+
+ def __del__(self):
+ self.close()
+
+
+class _AllMembersIterator(_MemberIterator):
+ def __iter__(self):
+ return _AllMembersIterator(self._table)
+
+ def next(self):
+ rec = self._c.next()
+ if rec:
+ return rec[0]
+ self.close()
+ raise StopIteration
+
+
+class _DeliveryMemberIterator(_MemberIterator):
+ def __init__(self, table, flag):
+ _MemberIterator.__init__(self, table)
+ self._flag = flag
+
+ def __iter__(self):
+ return _DeliveryMemberIterator(self._table, self._flag)
+
+ def next(self):
+ rec = self._c.next()
+ while rec:
+ addr, data = rec
+ if data[0] == self._flag:
+ return addr
+ rec = self._c.next()
+ self.close()
+ raise StopIteration
+
+
+class _StatusMemberIterator(_MemberIterator):
+ def __init__(self, table, statustab, status):
+ _MemberIterator.__init__(self, table)
+ self._statustab = statustab
+ self._status = status
+
+ def __iter__(self):
+ return _StatusMemberIterator(self._table,
+ self._statustab,
+ self._status)
+
+ def next(self):
+ rec = self._c.next()
+ while rec:
+ addr = rec[0]
+ data = self._statustab.get(addr)
+ if data is None:
+ status = MemberAdaptor.ENABLED
+ else:
+ status, when = pickle.loads(data)
+ if status in self._status:
+ return addr
+ rec = self._c.next()
+ self.close()
+ raise StopIteration
+
+
+class _BouncingMembersIterator(_MemberIterator):
+ def __iter__(self):
+ return _BouncingMembersIterator(self._table)
+
+ def next(self):
+ rec = self._c.next()
+ if rec:
+ return rec[0]
+ self.close()
+ raise StopIteration
+
+
+
+# For extend.py
+def fixlock(mlist):
+ def Lock(self, timeout=0):
+ MailList.Lock(self, timeout)
+ try:
+ self._memberadaptor.txn_begin()
+ except:
+ MailList.Unlock(self)
+ raise
+ mlist.Lock = new.instancemethod(Lock, mlist, MailList)
+
+
+def fixsave(mlist):
+ def Save(self):
+ self._memberadaptor.txn_commit()
+ MailList.Save(self)
+ mlist.Save = new.instancemethod(Save, mlist, MailList)
+
+
+def fixunlock(mlist):
+ def Unlock(self):
+ # It's fine to abort the transaction even if there isn't one in
+ # process, say because the Save() already committed it
+ self._memberadaptor.txn_abort()
+ MailList.Unlock(self)
+ mlist.Unlock = new.instancemethod(Unlock, mlist, MailList)
+
+
+def extend(mlist):
+ mlist._memberadaptor = BDBMemberAdaptor(mlist)
+ fixlock(mlist)
+ fixsave(mlist)
+ fixunlock(mlist)
+ # To make sure we got everything, let's actually delete the
+ # OldStyleMemberships dictionaries. Assume if it has one, it has all
+ # attributes.
+ try:
+ del mlist.members
+ del mlist.digest_members
+ del mlist.passwords
+ del mlist.language
+ del mlist.user_options
+ del mlist.usernames
+ del mlist.topics_userinterest
+ del mlist.delivery_status
+ del mlist.bounce_info
+ except AttributeError:
+ pass
+ # BAW: How can we ensure that the BDBMemberAdaptor is closed?
diff --git a/Mailman/Queue/RetryRunner.py b/Mailman/Queue/RetryRunner.py
new file mode 100644
index 00000000..d1983848
--- /dev/null
+++ b/Mailman/Queue/RetryRunner.py
@@ -0,0 +1,46 @@
+# Copyright (C) 2003 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+import time
+
+from Mailman import mm_cfg
+from Mailman.Queue.Runner import Runner
+from Mailman.Queue.Switchboard import Switchboard
+
+try:
+ True, False
+except NameError:
+ True = 1
+ False = 0
+
+
+
+class RetryRunner(Runner):
+ QDIR = mm_cfg.RETRYQUEUE_DIR
+ SLEEPTIME = mm_cfg.minutes(15)
+
+ def __init__(self, slice=None, numslices=1):
+ Runner.__init__(self, slice, numslices)
+ self.__outq = Switchboard(mm_cfg.OUTQUEUE_DIR)
+
+ def _dispose(self, mlist, msg, msgdata):
+ # Move it to the out queue for another retry
+ self.__outq.enqueue(msg, msgdata)
+ return False
+
+ def _snooze(self, filecnt):
+ # We always want to snooze
+ time.sleep(self.SLEEPTIME)