# Copyright (C) 1998-2016 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.

"""Do more detailed spam detection.

This module hard codes site wide spam detection. By hacking the
KNOWN_SPAMMERS variable, you can set up more regular expression matches
against message headers. If spam is detected the message is discarded
immediately.

TBD: This needs to be made more configurable and robust.
"""

import re
from unicodedata import normalize
from email.Errors import HeaderParseError
from email.Header import decode_header
from email.Utils import parseaddr

from Mailman import mm_cfg
from Mailman import Errors
from Mailman import i18n
from Mailman import Utils
from Mailman.Handlers.Hold import hold_for_approval
from Mailman.Logging.Syslog import syslog

try:
    True, False
except NameError:
    True = 1
    False = 0

# First, play footsie with _ so that the following are marked as translated,
# but aren't actually translated until we need the text later on.
def _(s):
    """Return s unchanged.

    Temporary stand-in for the translator: strings wrapped in _() while the
    classes below are being defined are only marked, not translated.
    """
    return s


class SpamDetected(Errors.DiscardMessage):
    """The message contains known spam"""


class HeaderMatchHold(Errors.HoldMessage):
    """Hold reason recording which header_filter_rules pattern matched."""

    def __init__(self, pattern):
        self.__pattern = pattern

    def reason_notice(self):
        """Return the notice text naming the pattern that matched.

        When this runs, the module-level name _ has already been rebound to
        i18n._ (see the assignment right after this class).
        """
        # NOTE(review): `pattern` looks unused, but the %(pattern)s
        # placeholder is presumably filled in by i18n._ from this frame's
        # locals -- keep the local; confirm against Mailman.i18n.
        pattern = self.__pattern
        return _('Header matched regexp: %(pattern)s')


# And reset the translator
_ = i18n._


def getDecodedHeaders(msg, cset='utf-8'):
    """Returns a unicode containing all the headers of msg, unfolded and
    RFC 2047 decoded, normalized and separated by new lines.

    msg must provide items() yielding (name, value) header pairs.  cset is
    not used by this function; it is kept so existing callers that pass a
    character set keep working.
    """
    lines = []
    for h, v in msg.items():
        uvalue = u''
        try:
            # Unfold continuation lines before RFC 2047 decoding.
            v = decode_header(re.sub(r'\n\s', ' ', v))
        except HeaderParseError:
            v = [(v, 'us-ascii')]
        for frag, cs in v:
            if not cs:
                cs = 'us-ascii'
            try:
                uvalue += unicode(frag, cs, 'replace')
            except LookupError:
                # The encoding charset is unknown.  At this point, frag
                # has been QP or base64 decoded into a byte string whose
                # charset we don't know how to handle.  We will try to
                # unicode it as iso-8859-1 which may result in a garbled
                # mess, but we have to do something.
                uvalue += unicode(frag, 'iso-8859-1', 'replace')
        # Use the leniently decoded header name.  Formatting the raw name
        # into a unicode template would do a strict implicit ASCII decode
        # and blow up on a header name containing non-ASCII bytes.
        uhdr = h.decode('us-ascii', 'replace')
        lines.append(
            u'%s: %s\n' % (uhdr, normalize(mm_cfg.NORMALIZE_FORM, uvalue)))
    return u''.join(lines)


def process(mlist, msg, msgdata):
    """Apply DMARC mitigation, verbose-member moderation and header spam
    filters to msg.

    mlist is the mailing list object, msg the message being processed and
    msgdata the per-message metadata mapping (read with get() and updated
    in place with the 'from_is_list' key).

    Raises Errors.RejectMessage or Errors.DiscardMessage for DMARC and
    header_filter_rules reject/discard actions, and SpamDetected when a
    mm_cfg.KNOWN_SPAMMERS pattern matches.  A HOLD rule match is handed to
    hold_for_approval().  Returns None when the message is approved,
    explicitly accepted by a rule, or no rule applies.
    """
    # Before anything else, check DMARC if necessary.  We do this as early
    # as possible so reject/discard actions trump other holds/approvals and
    # wrap/munge actions get flagged even for approved messages.
    # But not for owner mail which should not be subject to DMARC reject or
    # discard actions.
    if not msgdata.get('toowner'):
        msgdata['from_is_list'] = 0
        addr = parseaddr(msg.get('from'))[1]
        if addr and mlist.dmarc_moderation_action > 0:
            if Utils.IsDMARCProhibited(mlist, addr):
                # Note that for dmarc_moderation_action, 0 = Accept,
                #    1 = Munge, 2 = Wrap, 3 = Reject, 4 = Discard
                if mlist.dmarc_moderation_action == 1:
                    msgdata['from_is_list'] = 1
                elif mlist.dmarc_moderation_action == 2:
                    msgdata['from_is_list'] = 2
                elif mlist.dmarc_moderation_action == 3:
                    # Reject
                    text = mlist.dmarc_moderation_notice
                    if text:
                        text = Utils.wrap(text)
                    else:
                        # `listowner` must stay a local: it is referenced
                        # by the %(listowner)s placeholder below.
                        # NOTE(review): the line breaks inside this msgid
                        # were lost in the flattened source; confirm the
                        # exact text against the message catalog.
                        listowner = mlist.GetOwnerEmail()
                        text = Utils.wrap(_(
                            'You are not allowed to post to this mailing '
                            'list From: a domain which publishes a DMARC '
                            'policy of reject or quarantine, and your '
                            'message has been automatically rejected. If '
                            'you think that your messages are being '
                            'rejected in error, contact the mailing list '
                            'owner at %(listowner)s.'))
                    raise Errors.RejectMessage(text)
                elif mlist.dmarc_moderation_action == 4:
                    raise Errors.DiscardMessage

    # Get member address if any.
    for sender in msg.get_senders():
        if mlist.isMember(sender):
            break
    else:
        # No sender is a list member; fall back to the primary sender.
        sender = msg.get_sender()
    if (mlist.member_verbosity_threshold > 0
            and Utils.IsVerboseMember(mlist, sender)):
        mlist.setMemberOption(sender, mm_cfg.Moderate, 1)
        syslog('vette',
               '%s: Automatically Moderated %s for verbose postings.',
               mlist.real_name, sender)

    if msgdata.get('approved'):
        return

    # First do site hard coded header spam checks
    for header, regex in mm_cfg.KNOWN_SPAMMERS:
        cre = re.compile(regex, re.IGNORECASE)
        for value in msg.get_all(header, []):
            if cre.search(value):
                # we've detected spam, so throw the message away
                raise SpamDetected

    # Now do header_filter_rules
    # TK: Collect headers in sub-parts because attachment filename
    # extension may be a clue to possible virus/spam.
    # Get the character set of the lists preferred language for headers
    lcset = Utils.GetCharSet(mlist.preferred_language)
    headers = u''.join([getDecodedHeaders(p, lcset) for p in msg.walk()])
    for patterns, action, empty in mlist.header_filter_rules:
        if action == mm_cfg.DEFER:
            continue
        for pattern in patterns.splitlines():
            if pattern.startswith('#'):
                continue
            # ignore 'empty' patterns
            if not pattern.strip():
                continue
            pattern = Utils.xml_to_unicode(pattern, lcset)
            pattern = normalize(mm_cfg.NORMALIZE_FORM, pattern)
            try:
                mo = re.search(pattern,
                               headers,
                               re.IGNORECASE | re.MULTILINE | re.UNICODE)
            except (re.error, TypeError):
                syslog('error',
                       'ignoring header_filter_rules invalid pattern: %s',
                       pattern)
                # Really ignore it: without this, the test below would
                # read a stale or still-unbound `mo`.
                continue
            if mo:
                if action == mm_cfg.DISCARD:
                    raise Errors.DiscardMessage
                if action == mm_cfg.REJECT:
                    if msgdata.get('toowner'):
                        # Don't send rejection notice if addressed to '-owner'
                        # because it may trigger a loop of notices if the
                        # sender address is forged.  We just discard it here.
                        raise Errors.DiscardMessage
                    raise Errors.RejectMessage(
                        _('Message rejected by filter rule match'))
                if action == mm_cfg.HOLD:
                    if msgdata.get('toowner'):
                        # Don't hold '-owner' addressed message.  We just
                        # pass it here but list-owner can set this to be
                        # discarded on the GUI if he wants.
                        return
                    hold_for_approval(
                        mlist, msg, msgdata, HeaderMatchHold(pattern))
                if action == mm_cfg.ACCEPT:
                    return