# Copyright (C) 2001-2007 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.

"""Old style Mailman membership adaptor.

This adaptor gets and sets member information on the MailList object given to
the constructor.  It also equates member keys and lower-cased email addresses,
i.e. KEY is LCE.

This is the adaptor used by default in Mailman 2.1.
"""

import time
from types import StringType

from Mailman import mm_cfg
from Mailman import Utils
from Mailman import Errors
from Mailman import MemberAdaptor

ISREGULAR = 1
ISDIGEST = 2

# XXX check for bare access to mlist.members, mlist.digest_members,
# mlist.user_options, mlist.passwords, mlist.topics_userinterest

# XXX Fix Errors.MMAlreadyAMember and Errors.NotAMember
# Actually, fix /all/ errors



class OldStyleMemberships(MemberAdaptor.MemberAdaptor):
    """Membership adaptor backed by dict attributes of a MailList object.

    All per-member data is read from and written to dictionaries on the
    mlist passed to the constructor (members, digest_members, passwords,
    language, user_options, usernames, topics_userinterest, delivery_status,
    bounce_info).  Every dictionary is keyed by the lower-cased address.
    """

    def __init__(self, mlist):
        """Remember the MailList object whose attributes hold the data."""
        self.__mlist = mlist

    #
    # Read interface
    #
    def getMembers(self):
        """Return the keys of all regular members followed by all digest
        members."""
        return self.__mlist.members.keys() + self.__mlist.digest_members.keys()

    def getRegularMemberKeys(self):
        """Return the keys of the regular (non-digest) members."""
        return self.__mlist.members.keys()

    def getDigestMemberKeys(self):
        """Return the keys of the digest members."""
        return self.__mlist.digest_members.keys()

    def __get_cp_member(self, member):
        """Return (cpaddr, where) for member, or (None, None) if not found.

        cpaddr is the stored value when that value is a string (the case
        preserved address), otherwise the lower-cased key.  where is
        ISREGULAR or ISDIGEST, naming the dictionary the entry was found in.
        The regular members are consulted first.
        """
        lcmember = member.lower()
        # A unique sentinel, since any stored value (including 0) is valid.
        missing = []
        val = self.__mlist.members.get(lcmember, missing)
        where = ISREGULAR
        if val is missing:
            val = self.__mlist.digest_members.get(lcmember, missing)
            where = ISDIGEST
        if val is missing:
            return None, None
        if isinstance(val, StringType):
            return val, where
        return lcmember, where

    def isMember(self, member):
        """Return 1 if member is a regular or digest member, else 0."""
        cpaddr, where = self.__get_cp_member(member)
        if cpaddr is not None:
            return 1
        return 0

    def getMemberKey(self, member):
        """Return the member's key, i.e. the lower-cased address.

        Raises Errors.NotAMemberError if member is not subscribed.
        """
        cpaddr, where = self.__get_cp_member(member)
        if cpaddr is None:
            raise Errors.NotAMemberError(member)
        return member.lower()

    def getMemberCPAddress(self, member):
        """Return the member's case preserved address.

        Raises Errors.NotAMemberError if member is not subscribed.
        """
        cpaddr, where = self.__get_cp_member(member)
        if cpaddr is None:
            raise Errors.NotAMemberError(member)
        return cpaddr

    def getMemberCPAddresses(self, members):
        """Return the case preserved address for each of members.

        Entries for addresses that are not members are None; no exception
        is raised.
        """
        return [self.__get_cp_member(member)[0] for member in members]

    def getMemberPassword(self, member):
        """Return the member's stored password.

        Raises Errors.NotAMemberError if there is no password entry for the
        lower-cased address.
        """
        secret = self.__mlist.passwords.get(member.lower())
        if secret is None:
            raise Errors.NotAMemberError(member)
        return secret

    def authenticateMember(self, member, response):
        """Return the member's password if response equals it, else 0.

        Raises Errors.NotAMemberError (via getMemberPassword) if the member
        has no password entry.
        """
        secret = self.getMemberPassword(member)
        if secret == response:
            return secret
        return 0

    def __assertIsMember(self, member):
        """Raise Errors.NotAMemberError unless member is subscribed."""
        if not self.isMember(member):
            raise Errors.NotAMemberError(member)

    def getMemberLanguage(self, member):
        """Return the member's language, or the list's preferred language.

        The list's preferred language is returned when the address has no
        language entry or when the stored language is not among the list's
        available languages.  Membership is not checked here.
        """
        lang = self.__mlist.language.get(
            member.lower(), self.__mlist.preferred_language)
        if lang in self.__mlist.GetAvailableLanguages():
            return lang
        return self.__mlist.preferred_language

    def getMemberOption(self, member, flag):
        """Return whether the option flag is set for member.

        The mm_cfg.Digests flag is not stored in the options bitfield; it is
        answered by checking which membership dictionary holds the member.
        Raises Errors.NotAMemberError if member is not subscribed.
        """
        self.__assertIsMember(member)
        if flag == mm_cfg.Digests:
            cpaddr, where = self.__get_cp_member(member)
            return where == ISDIGEST
        option = self.__mlist.user_options.get(member.lower(), 0)
        return not not (option & flag)

    def getMemberName(self, member):
        """Return the member's real name, or None if none is stored.

        Raises Errors.NotAMemberError if member is not subscribed.
        """
        self.__assertIsMember(member)
        return self.__mlist.usernames.get(member.lower())

    def getMemberTopics(self, member):
        """Return the member's topics of interest (an empty list if unset).

        Raises Errors.NotAMemberError if member is not subscribed.
        """
        self.__assertIsMember(member)
        return self.__mlist.topics_userinterest.get(member.lower(), [])

    def getDeliveryStatus(self, member):
        """Return the member's delivery status.

        MemberAdaptor.ENABLED is returned when no status is stored.
        Raises Errors.NotAMemberError if member is not subscribed.
        """
        self.__assertIsMember(member)
        return self.__mlist.delivery_status.get(
            member.lower(),
            # Values are tuples, so the default should also be a tuple.  The
            # second item will be ignored.
            (MemberAdaptor.ENABLED, 0))[0]

    def getDeliveryStatusChangeTime(self, member):
        """Return the time stored with the member's delivery status.

        0 is returned when no status is stored.
        Raises Errors.NotAMemberError if member is not subscribed.
        """
        self.__assertIsMember(member)
        return self.__mlist.delivery_status.get(
            member.lower(),
            # Values are tuples, so the default should also be a tuple.  The
            # first item will be ignored.
            (MemberAdaptor.ENABLED, 0))[1]

    def getDeliveryStatusMembers(self, status=(MemberAdaptor.UNKNOWN,
                                               MemberAdaptor.BYUSER,
                                               MemberAdaptor.BYADMIN,
                                               MemberAdaptor.BYBOUNCE)):
        """Return the members whose delivery status is one of status.

        By default this selects every status other than ENABLED.
        """
        return [member for member in self.getMembers()
                if self.getDeliveryStatus(member) in status]

    def getBouncingMembers(self):
        """Return the lower-cased keys that have bounce info recorded."""
        return [member.lower() for member in self.__mlist.bounce_info.keys()]

    def getBounceInfo(self, member):
        """Return the member's bounce info, or None if none is recorded.

        Raises Errors.NotAMemberError if member is not subscribed.
        """
        self.__assertIsMember(member)
        return self.__mlist.bounce_info.get(member.lower())

    #
    # Write interface
    #
    def addNewMember(self, member, **kws):
        """Subscribe member to the list; the list must be locked.

        Recognized keywords are digest, password, language and realname.
        Raises Errors.MMAlreadyAMember if the address is already subscribed
        and ValueError (carrying the offending names) for any other keyword.
        """
        assert self.__mlist.Locked()
        # Make sure this address isn't already a member
        if self.isMember(member):
            raise Errors.MMAlreadyAMember(member)
        # Parse the keywords
        digest = 0
        password = Utils.MakeRandomPassword()
        language = self.__mlist.preferred_language
        realname = None
        if kws.has_key('digest'):
            digest = kws['digest']
            del kws['digest']
        if kws.has_key('password'):
            password = kws['password']
            del kws['password']
        if kws.has_key('language'):
            language = kws['language']
            del kws['language']
        if kws.has_key('realname'):
            realname = kws['realname']
            del kws['realname']
        # Assert that no other keywords are present
        if kws:
            raise ValueError(kws.keys())
        # If the localpart has uppercase letters in it, then the value in the
        # members (or digest_members) dict is the case preserved address.
        # Otherwise the value is 0.  Note that the case of the domain part is
        # of course ignored.
        if Utils.LCDomain(member) == member.lower():
            value = 0
        else:
            value = member
        member = member.lower()
        if digest:
            self.__mlist.digest_members[member] = value
        else:
            self.__mlist.members[member] = value
        self.setMemberPassword(member, password)
        self.setMemberLanguage(member, language)
        if realname:
            self.setMemberName(member, realname)
        # Set the member's default set of options
        if self.__mlist.new_member_options:
            self.__mlist.user_options[member] = self.__mlist.new_member_options

    def removeMember(self, member):
        """Unsubscribe member and drop all of its data; the list must be
        locked.

        Raises Errors.NotAMemberError if member is not subscribed.
        """
        assert self.__mlist.Locked()
        self.__assertIsMember(member)
        # Delete the appropriate entries from the various MailList attributes.
        # Remember that not all of them will have an entry (only those with
        # values different than the default).
        memberkey = member.lower()
        for attr in ('passwords', 'user_options', 'members', 'digest_members',
                     'language',  'topics_userinterest',     'usernames',
                     'bounce_info', 'delivery_status',
                     ):
            table = getattr(self.__mlist, attr)
            if table.has_key(memberkey):
                del table[memberkey]

    def changeMemberAddress(self, member, newaddress, nodelete=0):
        """Subscribe newaddress with the settings of member; the list must
        be locked.

        The real name, digest mode, password, language and options bitfield
        are copied to the new address.  Unless nodelete is true the old
        address is removed, and a BYUSER or BYADMIN delivery status (with
        its time) is carried over as well.

        Raises Errors.NotAMemberError if member is not subscribed and
        Errors.MMAlreadyAMember if newaddress is already subscribed.
        """
        assert self.__mlist.Locked()
        # Make sure the old address is a member.
        self.__assertIsMember(member)
        memberkey = member.lower()
        newkey = newaddress.lower()
        # addNewMember() below rejects an address that is already subscribed,
        # but by then the old member would already have been removed.  Refuse
        # up front so a failed change leaves the old subscription intact.  A
        # change that only alters the case of the same address is fine,
        # because removing the old entry frees the key.  With nodelete set
        # nothing is removed, so addNewMember()'s own check is sufficient.
        if not nodelete and newkey != memberkey and self.isMember(newaddress):
            raise Errors.MMAlreadyAMember(newaddress)
        # Get the old values
        fullname = self.getMemberName(memberkey)
        flags = self.__mlist.user_options.get(memberkey, 0)
        digestsp = self.getMemberOption(memberkey, mm_cfg.Digests)
        password = self.__mlist.passwords.get(memberkey,
                                              Utils.MakeRandomPassword())
        lang = self.getMemberLanguage(memberkey)
        delivery = self.__mlist.delivery_status.get(memberkey,
                                                    (MemberAdaptor.ENABLED, 0))
        # First, possibly delete the old member
        if not nodelete:
            self.removeMember(memberkey)
        # Now, add the new member
        self.addNewMember(newaddress, realname=fullname, digest=digestsp,
                          password=password, language=lang)
        # Set the entire options bitfield
        if flags:
            self.__mlist.user_options[newkey] = flags
        # If this is a straightforward address change, i.e. nodelete = 0,
        # preserve the delivery status and time if BYUSER or BYADMIN
        if (delivery[0] in (MemberAdaptor.BYUSER, MemberAdaptor.BYADMIN)
                and not nodelete):
            self.__mlist.delivery_status[newkey] = delivery

    def setMemberPassword(self, memberkey, password):
        """Store password for the member; the list must be locked.

        Raises Errors.NotAMemberError if the address is not subscribed.
        """
        assert self.__mlist.Locked()
        self.__assertIsMember(memberkey)
        self.__mlist.passwords[memberkey.lower()] = password

    def setMemberLanguage(self, memberkey, language):
        """Store language for the member; the list must be locked.

        Raises Errors.NotAMemberError if the address is not subscribed.
        """
        assert self.__mlist.Locked()
        self.__assertIsMember(memberkey)
        self.__mlist.language[memberkey.lower()] = language

    def setMemberOption(self, member, flag, value):
        """Turn the option flag on or off for member; the list must be
        locked.

        Toggling mm_cfg.Digests moves the member between the regular and
        digest dictionaries and may raise Errors.CantDigestError,
        Errors.MustDigestError, Errors.AlreadyReceivingDigests or
        Errors.AlreadyReceivingRegularDeliveries.  Raises
        Errors.NotAMemberError if member is not subscribed.
        """
        assert self.__mlist.Locked()
        self.__assertIsMember(member)
        memberkey = member.lower()
        # There's one extra gotcha we have to deal with.  If the user is
        # toggling the Digests flag, then we need to move their entry from
        # mlist.members to mlist.digest_members or vice versa.  Blarg.  Do
        # this before the flag setting below in case it fails.
        if flag == mm_cfg.Digests:
            if value:
                # Be sure the list supports digest delivery
                if not self.__mlist.digestable:
                    raise Errors.CantDigestError
                # The user is turning on digest mode
                if self.__mlist.digest_members.has_key(memberkey):
                    raise Errors.AlreadyReceivingDigests(member)
                cpuser = self.__mlist.members.get(memberkey)
                if cpuser is None:
                    raise Errors.NotAMemberError(member)
                del self.__mlist.members[memberkey]
                self.__mlist.digest_members[memberkey] = cpuser
            else:
                # Be sure the list supports regular delivery
                if not self.__mlist.nondigestable:
                    raise Errors.MustDigestError
                # The user is turning off digest mode
                if self.__mlist.members.has_key(memberkey):
                    raise Errors.AlreadyReceivingRegularDeliveries(member)
                cpuser = self.__mlist.digest_members.get(memberkey)
                if cpuser is None:
                    raise Errors.NotAMemberError(member)
                del self.__mlist.digest_members[memberkey]
                self.__mlist.members[memberkey] = cpuser
                # When toggling off digest delivery, we want to be sure to set
                # things up so that the user receives one last digest,
                # otherwise they may lose some email
                self.__mlist.one_last_digest[memberkey] = cpuser
            # We don't need to touch user_options because the digest state
            # isn't kept as a bitfield flag.
            return
        # This is a bit kludgey because the semantics are that if the user has
        # no options set (i.e. the value would be 0), then they have no entry
        # in the user_options dict.  We use setdefault() here, and then del
        # the entry below just to make things (questionably) cleaner.
        self.__mlist.user_options.setdefault(memberkey, 0)
        if value:
            self.__mlist.user_options[memberkey] |= flag
        else:
            self.__mlist.user_options[memberkey] &= ~flag
        if not self.__mlist.user_options[memberkey]:
            del self.__mlist.user_options[memberkey]

    def setMemberName(self, member, realname):
        """Store realname for member; the list must be locked.

        Raises Errors.NotAMemberError if member is not subscribed.
        """
        assert self.__mlist.Locked()
        self.__assertIsMember(member)
        self.__mlist.usernames[member.lower()] = realname

    def setMemberTopics(self, member, topics):
        """Store the member's topics of interest; the list must be locked.

        An empty topics value removes any stored entry.
        Raises Errors.NotAMemberError if member is not subscribed.
        """
        assert self.__mlist.Locked()
        self.__assertIsMember(member)
        memberkey = member.lower()
        if topics:
            self.__mlist.topics_userinterest[memberkey] = topics
        # if topics is empty, then delete the entry in this dictionary
        elif self.__mlist.topics_userinterest.has_key(memberkey):
            del self.__mlist.topics_userinterest[memberkey]

    def setDeliveryStatus(self, member, status):
        """Set the member's delivery status; the list must be locked.

        ENABLED clears the member's bounce info and stored status; any other
        status is stored together with the current time.
        Raises Errors.NotAMemberError if member is not subscribed.
        """
        assert status in (MemberAdaptor.ENABLED,  MemberAdaptor.UNKNOWN,
                          MemberAdaptor.BYUSER,   MemberAdaptor.BYADMIN,
                          MemberAdaptor.BYBOUNCE)
        assert self.__mlist.Locked()
        self.__assertIsMember(member)
        member = member.lower()
        if status == MemberAdaptor.ENABLED:
            # Enable by resetting their bounce info.
            self.setBounceInfo(member, None)
        else:
            self.__mlist.delivery_status[member] = (status, time.time())

    def setBounceInfo(self, member, info):
        """Store bounce info for member; the list must be locked.

        Passing None for info deletes both the member's bounce info and the
        stored delivery status.
        Raises Errors.NotAMemberError if member is not subscribed.
        """
        assert self.__mlist.Locked()
        self.__assertIsMember(member)
        member = member.lower()
        if info is None:
            if self.__mlist.bounce_info.has_key(member):
                del self.__mlist.bounce_info[member]
            if self.__mlist.delivery_status.has_key(member):
                del self.__mlist.delivery_status[member]
        else:
            self.__mlist.bounce_info[member] = info