# Copyright (C) 2003 by the Free Software Foundation, Inc. # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """A MemberAdaptor based on the Berkeley database wrapper for Python. Requires Python 2.2.2 or newer, and PyBSDDB3 4.1.3 or newer. """ # To use, put the following in a file called extend.py in the mailing list's # directory: # # from Mailman.BDBMemberAdaptor import extend # # that's it! import os import new import time import errno import struct import cPickle as pickle try: # Python 2.3 from bsddb import db except ImportError: # earlier Pythons from bsddb3 import db from Mailman import mm_cfg from Mailman import Utils from Mailman import Errors from Mailman import MemberAdaptor from Mailman.MailList import MailList from Mailman.Logging.Syslog import syslog STORAGE_VERSION = 'BA01' FMT = '>BHB' FMTSIZE = struct.calcsize(FMT) REGDELIV = 1 DIGDELIV = 2 REGFLAG = struct.pack('>B', REGDELIV) DIGFLAG = struct.pack('>B', DIGDELIV) # Positional arguments for _unpack() CPADDR = 0 PASSWD = 1 LANG = 2 NAME = 3 DIGEST = 4 OPTIONS = 5 STATUS = 6 class BDBMemberAdaptor(MemberAdaptor.MemberAdaptor): def __init__(self, mlist): self._mlist = mlist # metainfo -- {key -> value} # This table contains storage metadata information. The keys and # values are simple strings of variable length. Here are the # valid keys: # # version - the version of the database # # members -- {address | rec} # For all regular delivery members, this maps from the member's # key to their data record, which is a string concatenated of the # following: # # -- fixed data (as a packed struct) # + 1-byte digest or regular delivery flag # + 2-byte option flags # + 1-byte delivery status # -- variable data (as a pickle of a tuple) # + their case preserved address or '' # + their plaintext password # + their chosen language # + their realname or '' # # status -- {address | status+time} # Maps the member's key to their delivery status and change time. # These are passed as a tuple and are pickled for storage. # # topics -- {address | topicstrings} # Maps the member's key to their topic strings, concatenated and # separated by SEP # # bounceinfo -- {address | bounceinfo} # Maps the member's key to their bounceinfo, as a pickle # # Make sure the database directory exists path = os.path.join(mlist.fullpath(), 'member.db') exists = False try: os.mkdir(path, 02775) except OSError, e: if e.errno <> errno.EEXIST: raise exists = True # Create the environment self._env = env = db.DBEnv() if exists: # We must join an existing environment, otherwise we'll get # DB_RUNRECOVERY errors when the second process to open the # environment begins a transaction. I don't get it. env.open(path, db.DB_JOINENV) else: env.open(path, db.DB_CREATE | db.DB_RECOVER | db.DB_INIT_MPOOL | db.DB_INIT_TXN ) self._txn = None self._tables = [] self._metainfo = self._setupDB('metainfo') self._members = self._setupDB('members') self._status = self._setupDB('status') self._topics = self._setupDB('topics') self._bounceinfo = self._setupDB('bounceinfo') # Check the database version number version = self._metainfo.get('version') if version is None: # Initialize try: self.txn_begin() self._metainfo.put('version', STORAGE_VERSION, txn=self._txn) except: self.txn_abort() raise else: self.txn_commit() else: # Currently there's nothing to upgrade assert version == STORAGE_VERSION def _setupDB(self, name): d = db.DB(self._env) openflags = db.DB_CREATE # db 4.1 requires that databases be opened in a transaction. We'll # use auto commit, but only if that flag exists (i.e. we're using at # least db 4.1). try: openflags |= db.DB_AUTO_COMMIT except AttributeError: pass d.open(name, db.DB_BTREE, openflags) self._tables.append(d) return d def _close(self): self.txn_abort() for d in self._tables: d.close() # Checkpoint the database twice, as recommended by Sleepycat self._checkpoint() self._checkpoint() self._env.close() def _checkpoint(self): self._env.txn_checkpoint(0, 0, db.DB_FORCE) def txn_begin(self): assert self._txn is None self._txn = self._env.txn_begin() def txn_commit(self): assert self._txn is not None self._txn.commit() self._checkpoint() self._txn = None def txn_abort(self): if self._txn is not None: self._txn.abort() self._checkpoint() self._txn = None def _unpack(self, member): # Assume member is a LCE (i.e. lowercase key) rec = self._members.get(member.lower()) assert rec is not None fixed = struct.unpack(FMT, rec[:FMTSIZE]) vari = pickle.loads(rec[FMTSIZE:]) return vari + fixed def _pack(self, member, cpaddr, passwd, lang, name, digest, flags, status): # Assume member is a LCE (i.e. lowercase key) fixed = struct.pack(FMT, digest, flags, status) vari = pickle.dumps((cpaddr, passwd, lang, name)) self._members.put(member.lower(), fixed+vari, txn=self._txn) # MemberAdaptor writeable interface def addNewMember(self, member, **kws): assert self._mlist.Locked() # Make sure this address isn't already a member if self.isMember(member): raise Errors.MMAlreadyAMember, member # Parse the keywords digest = False password = Utils.MakeRandomPassword() language = self._mlist.preferred_language realname = None if kws.has_key('digest'): digest = kws['digest'] del kws['digest'] if kws.has_key('password'): password = kws['password'] del kws['password'] if kws.has_key('language'): language = kws['language'] del kws['language'] if kws.has_key('realname'): realname = kws['realname'] del kws['realname'] # Assert that no other keywords are present if kws: raise ValueError, kws.keys() # Should we store the case-preserved address? if Utils.LCDomain(member) == member.lower(): cpaddress = '' else: cpaddress = member # Calculate the realname if realname is None: realname = '' # Calculate the digest flag if digest: digest = DIGDELIV else: digest = REGDELIV self._pack(member.lower(), cpaddress, password, language, realname, digest, self._mlist.new_member_options, MemberAdaptor.ENABLED) def removeMember(self, member): txn = self._txn assert txn is not None assert self._mlist.Locked() self.__assertIsMember(member) key = member.lower() # Remove the table entries self._members.delete(key, txn=txn) if self._status.has_key(key): self._status.delete(key, txn=txn) if self._topics.has_key(key): self._topics.delete(key, txn=txn) if self._bounceinfo.has_key(key): self._bounceinfo.delete(key, txn=txn) def changeMemberAddress(self, member, newaddress, nodelete=0): assert self._mlist.Locked() self.__assertIsMember(member) okey = member.lower() nkey = newaddress.lower() txn = self._txn assert txn is not None # First, store a new member record, changing the case preserved addr. # Then delete the old record. cpaddr, passwd, lang, name, digest, flags, sts = self._unpack(okey) self._pack(nkey, newaddress, passwd, lang, name, digest, flags, sts) if not nodelete: self._members.delete(okey, txn) # Copy over the status times, topics, and bounce info, if present timestr = self._status.get(okey) if timestr is not None: self._status.put(nkey, timestr, txn=txn) if not nodelete: self._status.delete(okey, txn) topics = self._topics.get(okey) if topics is not None: self._topics.put(nkey, topics, txn=txn) if not nodelete: self._topics.delete(okey, txn) binfo = self._bounceinfo.get(nkey) if binfo is not None: self._binfo.put(nkey, binfo, txn=txn) if not nodelete: self._binfo.delete(okey, txn) def setMemberPassword(self, member, password): assert self._mlist.Locked() self.__assertIsMember(member) member = member.lower() cpaddr, oldpw, lang, name, digest, flags, status = self._unpack(member) self._pack(member, cpaddr, password, lang, name, digest, flags, status) def setMemberLanguage(self, member, language): assert self._mlist.Locked() self.__assertIsMember(member) member = member.lower() cpaddr, passwd, olang, name, digest, flags, sts = self._unpack(member) self._pack(member, cpaddr, passwd, language, name, digest, flags, sts) def setMemberOption(self, member, flag, value): assert self._mlist.Locked() self.__assertIsMember(member) member = member.lower() cpaddr, passwd, lang, name, digest, options, sts = self._unpack(member) # Sanity check for the digest flag if flag == mm_cfg.Digests: if value: # Be sure the list supports digest delivery if not self._mlist.digestable: raise Errors.CantDigestError digest = DIGDELIV else: # Be sure the list supports regular delivery if not self._mlist.nondigestable: raise Errors.MustDigestError # When toggling off digest delivery, we want to be sure to set # things up so that the user receives one last digest, # otherwise they may lose some email self._mlist.one_last_digest[member] = cpaddr digest = REGDELIV else: if value: options |= flag else: options &= ~flag self._pack(member, cpaddr, passwd, lang, name, digest, options, sts) def setMemberName(self, member, realname): assert self._mlist.Locked() self.__assertIsMember(member) member = member.lower() cpaddr, passwd, lang, oldname, digest, flags, sts = self._unpack( member) self._pack(member, cpaddr, passwd, lang, realname, digest, flags, sts) def setMemberTopics(self, member, topics): assert self._mlist.Locked() self.__assertIsMember(member) member = member.lower() if topics: self._topics.put(member, SEP.join(topics), txn=self._txn) elif self._topics.has_key(member): # No record is the same as no topics self._topics.delete(member, self._txn) def setDeliveryStatus(self, member, status): assert status in (MemberAdaptor.ENABLED, MemberAdaptor.UNKNOWN, MemberAdaptor.BYUSER, MemberAdaptor.BYADMIN, MemberAdaptor.BYBOUNCE) assert self._mlist.Locked() self.__assertIsMember(member) if status == MemberAdaptor.ENABLED: # Enable by resetting their bounce info self.setBounceInfo(member, None) else: # Pickle up the status an the current time and store that in the # database. Use binary mode. data = pickle.dumps((status, time.time()), 1) self._status.put(member.lower(), data, txn=self._txn) def setBounceInfo(self, member, info): assert self._mlist.Locked() self.__assertIsMember(member) member = member.lower() if info is None: # This means to reset the bounce and delivery status information if self._bounceinfo.has_key(member): self._bounceinfo.delete(member, self._txn) if self._status.has_key(member): self._status.delete(member, self._txn) else: # Use binary mode data = pickle.dumps(info, 1) self._status.put(member, data, txn=self._txn) # The readable interface # BAW: It would be more efficient to simply return the iterator, but # modules like admin.py can't handle that yet. They requires lists. def getMembers(self): return list(_AllMembersIterator(self._members)) def getRegularMemberKeys(self): return list(_DeliveryMemberIterator(self._members, REGFLAG)) def getDigestMemberKeys(self): return list(_DeliveryMemberIterator(self._members, DIGFLAG)) def __assertIsMember(self, member): if not self.isMember(member): raise Errors.NotAMemberError, member def isMember(self, member): return self._members.has_key(member.lower()) def getMemberKey(self, member): self.__assertIsMember(member) return member.lower() def getMemberCPAddress(self, member): self.__assertIsMember(member) cpaddr = self._unpack(member)[CPADDR] if cpaddr: return cpaddr return member def getMemberCPAddresses(self, members): rtn = [] for member in members: member = member.lower() if self._members.has_key(member): rtn.append(self._unpack(member)[CPADDR]) else: rtn.append(None) return rtn def authenticateMember(self, member, response): self.__assertIsMember(member) passwd = self._unpack(member)[PASSWD] if passwd == response: return passwd return False def getMemberPassword(self, member): self.__assertIsMember(member) return self._unpack(member)[PASSWD] def getMemberLanguage(self, member): if not self.isMember(member): return self._mlist.preferred_language lang = self._unpack(member)[LANG] if lang in self._mlist.GetAvailableLanguages(): return lang return self._mlist.preferred_language def getMemberOption(self, member, flag): self.__assertIsMember(member) if flag == mm_cfg.Digests: return self._unpack(member)[DIGEST] == DIGDELIV options = self._unpack(member)[OPTIONS] return bool(options & flag) def getMemberName(self, member): self.__assertIsMember(member) name = self._unpack(member)[NAME] return name or None def getMemberTopics(self, member): self.__assertIsMember(member) topics = self._topics.get(member.lower(), '') if not topics: return [] return topics.split(SEP) def getDeliveryStatus(self, member): self.__assertIsMember(member) data = self._status.get(member.lower()) if data is None: return MemberAdaptor.ENABLED status, when = pickle.loads(data) return status def getDeliveryStatusChangeTime(self, member): self.__assertIsMember(member) data = self._status.get(member.lower()) if data is None: return 0 status, when = pickle.loads(data) return when # BAW: see above, re iterators def getDeliveryStatusMembers(self, status=(MemberAdaptor.UNKNOWN, MemberAdaptor.BYUSER, MemberAdaptor.BYADMIN, MemberAdaptor.BYBOUNCE)): return list(_StatusMemberIterator(self._members, self._status, status)) def getBouncingMembers(self): return list(_BouncingMembersIterator(self._bounceinfo)) def getBounceInfo(self, member): self.__assertIsMember(member) return self._bounceinfo.get(member.lower()) class _MemberIterator: def __init__(self, table): self._table = table self._c = table.cursor() def __iter__(self): raise NotImplementedError def next(self): raise NotImplementedError def close(self): if self._c: self._c.close() self._c = None def __del__(self): self.close() class _AllMembersIterator(_MemberIterator): def __iter__(self): return _AllMembersIterator(self._table) def next(self): rec = self._c.next() if rec: return rec[0] self.close() raise StopIteration class _DeliveryMemberIterator(_MemberIterator): def __init__(self, table, flag): _MemberIterator.__init__(self, table) self._flag = flag def __iter__(self): return _DeliveryMemberIterator(self._table, self._flag) def next(self): rec = self._c.next() while rec: addr, data = rec if data[0] == self._flag: return addr rec = self._c.next() self.close() raise StopIteration class _StatusMemberIterator(_MemberIterator): def __init__(self, table, statustab, status): _MemberIterator.__init__(self, table) self._statustab = statustab self._status = status def __iter__(self): return _StatusMemberIterator(self._table, self._statustab, self._status) def next(self): rec = self._c.next() while rec: addr = rec[0] data = self._statustab.get(addr) if data is None: status = MemberAdaptor.ENABLED else: status, when = pickle.loads(data) if status in self._status: return addr rec = self._c.next() self.close() raise StopIteration class _BouncingMembersIterator(_MemberIterator): def __iter__(self): return _BouncingMembersIterator(self._table) def next(self): rec = self._c.next() if rec: return rec[0] self.close() raise StopIteration # For extend.py def fixlock(mlist): def Lock(self, timeout=0): MailList.Lock(self, timeout) try: self._memberadaptor.txn_begin() except: MailList.Unlock(self) raise mlist.Lock = new.instancemethod(Lock, mlist, MailList) def fixsave(mlist): def Save(self): self._memberadaptor.txn_commit() MailList.Save(self) mlist.Save = new.instancemethod(Save, mlist, MailList) def fixunlock(mlist): def Unlock(self): # It's fine to abort the transaction even if there isn't one in # process, say because the Save() already committed it self._memberadaptor.txn_abort() MailList.Unlock(self) mlist.Unlock = new.instancemethod(Unlock, mlist, MailList) def extend(mlist): mlist._memberadaptor = BDBMemberAdaptor(mlist) fixlock(mlist) fixsave(mlist) fixunlock(mlist) # To make sure we got everything, let's actually delete the # OldStyleMemberships dictionaries. Assume if it has one, it has all # attributes. try: del mlist.members del mlist.digest_members del mlist.passwords del mlist.language del mlist.user_options del mlist.usernames del mlist.topics_userinterest del mlist.delivery_status del mlist.bounce_info except AttributeError: pass # BAW: How can we ensure that the BDBMemberAdaptor is closed?