diff options
Diffstat (limited to '')
-rw-r--r-- | Mailman/Utils.py | 413 |
1 files changed, 408 insertions, 5 deletions
diff --git a/Mailman/Utils.py b/Mailman/Utils.py index 93e1fba1..75481563 100644 --- a/Mailman/Utils.py +++ b/Mailman/Utils.py @@ -1,4 +1,4 @@ -# Copyright (C) 1998-2011 by the Free Software Foundation, Inc. +# Copyright (C) 1998-2017 by the Free Software Foundation, Inc. # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License @@ -34,6 +34,7 @@ import time import errno import base64 import random +import urllib2 import urlparse import htmlentitydefs import email.Header @@ -71,8 +72,16 @@ except NameError: True = 1 False = 0 +try: + import dns.resolver + from dns.exception import DNSException + dns_resolver = True +except ImportError: + dns_resolver = False + EMPTYSTRING = '' UEMPTYSTRING = u'' +CR = '\r' NL = '\n' DOT = '.' IDENTCHARS = ascii_letters + digits + '_' @@ -92,6 +101,12 @@ def list_exists(listname): # # The former two are for 2.1alpha3 and beyond, while the latter two are # for all earlier versions. + # + # But first ensure the list name doesn't contain a path traversal + # attack. + if len(re.sub(mm_cfg.ACCEPTABLE_LISTNAME_CHARACTERS, '', listname)) > 0: + syslog('mischief', 'Hostile listname: %s', listname) + return False basepath = Site.get_listpath(listname) for ext in ('.pck', '.pck.last', '.db', '.db.last'): dbfile = os.path.join(basepath, 'config' + ext) @@ -220,10 +235,18 @@ _valid_domain = re.compile('[-a-z0-9]', re.IGNORECASE) def ValidateEmail(s): """Verify that an email address isn't grossly evil.""" + # If a user submits a form or URL with post data or query fragments + # with multiple occurrences of the same variable, we can get a list + # here. Be as careful as possible. + if isinstance(s, list) or isinstance(s, tuple): + if len(s) == 0: + s = '' + else: + s = s[-1] # Pretty minimal, cheesy check. We could do better... if not s or s.count(' ') > 0: raise Errors.MMBadEmailError - if _badchars.search(s) or s[0] == '-': + if _badchars.search(s): raise Errors.MMHostileAddress, s user, domain_parts = ParseEmail(s) # This means local, unqualified addresses, are not allowed @@ -232,8 +255,9 @@ def ValidateEmail(s): if len(domain_parts) < 2: raise Errors.MMBadEmailError, s # domain parts may only contain ascii letters, digits and hyphen + # and must not begin with hyphen. for p in domain_parts: - if len(_valid_domain.sub('', p)) > 0: + if len(p) == 0 or p[0] == '-' or len(_valid_domain.sub('', p)) > 0: raise Errors.MMHostileAddress, s @@ -247,12 +271,24 @@ def GetPathPieces(envar='PATH_INFO'): if path: if CRNLpat.search(path): path = CRNLpat.split(path)[0] - syslog('error', 'Warning: Possible malformed path attack.') + remote = os.environ.get('HTTP_FORWARDED_FOR', + os.environ.get('HTTP_X_FORWARDED_FOR', + os.environ.get('REMOTE_ADDR', + 'unidentified origin'))) + syslog('error', + 'Warning: Possible malformed path attack domain=%s remote=%s', + get_domain(), + remote) return [p for p in path.split('/') if p] return None +def GetRequestMethod(): + return os.environ.get('REQUEST_METHOD') + + + def ScriptURL(target, web_page_url=None, absolute=False): """target - scriptname only, nothing extra web_page_url - the list's configvar of the same name @@ -427,6 +463,14 @@ def check_global_password(response, siteadmin=True): _ampre = re.compile('&((?:#[0-9]+|[a-z]+);)', re.IGNORECASE) def websafe(s): + # If a user submits a form or URL with post data or query fragments + # with multiple occurrences of the same variable, we can get a list + # here. Be as careful as possible. + if isinstance(s, list) or isinstance(s, tuple): + if len(s) == 0: + s = '' + else: + s = s[-1] if mm_cfg.BROKEN_BROWSER_WORKAROUND: # Archiver can pass unicode here. Just skip them as the # archiver escapes non-ascii anyway. @@ -715,7 +759,7 @@ def get_domain(): if port and host.endswith(':' + port): host = host[:-len(port)-1] if mm_cfg.VIRTUAL_HOST_OVERVIEW and host: - return host.lower() + return websafe(host.lower()) else: # See the note in Defaults.py concerning DEFAULT_URL # vs. DEFAULT_URL_HOST. @@ -905,6 +949,61 @@ def oneline(s, cset): return EMPTYSTRING.join(s.splitlines()) +def strip_verbose_pattern(pattern): + # Remove white space and comments from a verbose pattern and return a + # non-verbose, equivalent pattern. Replace CR and NL in the result + # with '\\r' and '\\n' respectively to avoid multi-line results. + if not isinstance(pattern, str): + return pattern + newpattern = '' + i = 0 + inclass = False + skiptoeol = False + copynext = False + while i < len(pattern): + c = pattern[i] + if copynext: + if c == NL: + newpattern += '\\n' + elif c == CR: + newpattern += '\\r' + else: + newpattern += c + copynext = False + elif skiptoeol: + if c == NL: + skiptoeol = False + elif c == '#' and not inclass: + skiptoeol = True + elif c == '[' and not inclass: + inclass = True + newpattern += c + copynext = True + elif c == ']' and inclass: + inclass = False + newpattern += c + elif re.search('\s', c): + if inclass: + if c == NL: + newpattern += '\\n' + elif c == CR: + newpattern += '\\r' + else: + newpattern += c + elif c == '\\' and not inclass: + newpattern += c + copynext = True + else: + if c == NL: + newpattern += '\\n' + elif c == CR: + newpattern += '\\r' + else: + newpattern += c + i += 1 + return newpattern + + # Patterns and functions to flag possible XSS attacks in HTML. # This list is compiled from information at http://ha.ckers.org/xss.html, # http://www.quirksmode.org/js/events_compinfo.html, @@ -1057,3 +1156,307 @@ def suspiciousHTML(html): else: return False + +# The next functions read data from +# https://publicsuffix.org/list/public_suffix_list.dat and implement the +# algorithm at https://publicsuffix.org/list/ to find the "Organizational +# Domain corresponding to a From: domain. + +s_dict = {} + +def get_suffixes(url): + """This loads and parses the data from the url argument into s_dict for + use by get_org_dom.""" + global s_dict + if s_dict: + return + if not url: + return + try: + d = urllib2.urlopen(url) + except urllib2.URLError, e: + syslog('error', + 'Unable to retrieve data from %s: %s', + url, e) + return + for line in d.readlines(): + if not line.strip() or line.startswith(' ') or line.startswith('//'): + continue + line = re.sub(' .*', '', line.strip()) + if not line: + continue + parts = line.lower().split('.') + if parts[0].startswith('!'): + exc = True + parts = [parts[0][1:]] + parts[1:] + else: + exc = False + parts.reverse() + k = '.'.join(parts) + s_dict[k] = exc + +def _get_dom(d, l): + """A helper to get a domain name consisting of the first l+1 labels + in d.""" + dom = d[:min(l+1, len(d))] + dom.reverse() + return '.'.join(dom) + +def get_org_dom(domain): + """Given a domain name, this returns the corresponding Organizational + Domain which may be the same as the input.""" + global s_dict + if not s_dict: + get_suffixes(mm_cfg.DMARC_ORGANIZATIONAL_DOMAIN_DATA_URL) + hits = [] + d = domain.lower().split('.') + d.reverse() + for k in s_dict.keys(): + ks = k.split('.') + if len(d) >= len(ks): + for i in range(len(ks)-1): + if d[i] != ks[i] and ks[i] != '*': + break + else: + if d[len(ks)-1] == ks[-1] or ks[-1] == '*': + hits.append(k) + if not hits: + return _get_dom(d, 1) + l = 0 + for k in hits: + if s_dict[k]: + # It's an exception + return _get_dom(d, len(k.split('.'))-1) + if len(k.split('.')) > l: + l = len(k.split('.')) + return _get_dom(d, l) + + +# This takes an email address, and returns True if DMARC policy is p=reject +# or possibly quarantine. +def IsDMARCProhibited(mlist, email): + if not dns_resolver: + # This is a problem; log it. + syslog('error', + 'DNS lookup for dmarc_moderation_action for list %s not available', + mlist.real_name) + return False + + email = email.lower() + # Scan from the right in case quoted local part has an '@'. + at_sign = email.rfind('@') + if at_sign < 1: + return False + f_dom = email[at_sign+1:] + x = _DMARCProhibited(mlist, email, '_dmarc.' + f_dom) + if x != 'continue': + return x + o_dom = get_org_dom(f_dom) + if o_dom != f_dom: + x = _DMARCProhibited(mlist, email, '_dmarc.' + o_dom, org=True) + if x != 'continue': + return x + return False + +def _DMARCProhibited(mlist, email, dmarc_domain, org=False): + + try: + resolver = dns.resolver.Resolver() + resolver.timeout = float(mm_cfg.DMARC_RESOLVER_TIMEOUT) + resolver.lifetime = float(mm_cfg.DMARC_RESOLVER_LIFETIME) + txt_recs = resolver.query(dmarc_domain, dns.rdatatype.TXT) + except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer): + return 'continue' + except DNSException, e: + syslog('error', + 'DNSException: Unable to query DMARC policy for %s (%s). %s', + email, dmarc_domain, e.__doc__) + return 'continue' + else: + # Be as robust as possible in parsing the result. + results_by_name = {} + cnames = {} + want_names = set([dmarc_domain + '.']) + for txt_rec in txt_recs.response.answer: + if txt_rec.rdtype == dns.rdatatype.CNAME: + cnames[txt_rec.name.to_text()] = ( + txt_rec.items[0].target.to_text()) + if txt_rec.rdtype != dns.rdatatype.TXT: + continue + results_by_name.setdefault(txt_rec.name.to_text(), []).append( + "".join(txt_rec.items[0].strings)) + expands = list(want_names) + seen = set(expands) + while expands: + item = expands.pop(0) + if item in cnames: + if cnames[item] in seen: + continue # cname loop + expands.append(cnames[item]) + seen.add(cnames[item]) + want_names.add(cnames[item]) + want_names.discard(item) + + if len(want_names) != 1: + syslog('error', + """multiple DMARC entries in results for %s, + processing each to be strict""", + dmarc_domain) + for name in want_names: + if name not in results_by_name: + continue + dmarcs = filter(lambda n: n.startswith('v=DMARC1;'), + results_by_name[name]) + if len(dmarcs) == 0: + return 'continue' + if len(dmarcs) > 1: + syslog('error', + """RRset of TXT records for %s has %d v=DMARC1 entries; + testing them all""", + dmarc_domain, len(dmarcs)) + for entry in dmarcs: + mo = re.search(r'\bsp=(\w*)\b', entry, re.IGNORECASE) + if org and mo: + policy = mo.group(1).lower() + else: + mo = re.search(r'\bp=(\w*)\b', entry, re.IGNORECASE) + if mo: + policy = mo.group(1).lower() + else: + continue + if policy == 'reject': + syslog('vette', + '%s: DMARC lookup for %s (%s) found p=reject in %s = %s', + mlist.real_name, email, dmarc_domain, name, entry) + return True + + if (mlist.dmarc_quarantine_moderation_action and + policy == 'quarantine'): + syslog('vette', + '%s: DMARC lookup for %s (%s) found p=quarantine in %s = %s', + mlist.real_name, email, dmarc_domain, name, entry) + return True + + if (mlist.dmarc_none_moderation_action and + mlist.dmarc_quarantine_moderation_action and + mlist.dmarc_moderation_action in (1, 2) and + policy == 'none'): + syslog('vette', + '%s: DMARC lookup for %s (%s) found p=none in %s = %s', + mlist.real_name, email, dmarc_domain, name, entry) + return True + + return False + + +# Check a known list in order to auto-moderate verbose members +# dictionary to remember recent posts. +recentMemberPostings = {} +# counter of times through +clean_count = 0 +def IsVerboseMember(mlist, email): + """For lists that request it, we keep track of recent posts by address. +A message from an address to a list, if the list requests it, is remembered +for a specified time whether or not the address is a list member, and if the +address is a member and the member is over the threshold for the list, that +fact is returned.""" + + global clean_count + + if mlist.member_verbosity_threshold == 0: + return False + + email = email.lower() + + now = time.time() + recentMemberPostings.setdefault(email,[]).append(now + + float(mlist.member_verbosity_interval) + ) + x = range(len(recentMemberPostings[email])) + x.reverse() + for i in x: + if recentMemberPostings[email][i] < now: + del recentMemberPostings[email][i] + + clean_count += 1 + if clean_count >= mm_cfg.VERBOSE_CLEAN_LIMIT: + clean_count = 0 + for addr in recentMemberPostings.keys(): + x = range(len(recentMemberPostings[addr])) + x.reverse() + for i in x: + if recentMemberPostings[addr][i] < now: + del recentMemberPostings[addr][i] + if not recentMemberPostings[addr]: + del recentMemberPostings[addr] + if not mlist.isMember(email): + return False + return (len(recentMemberPostings.get(email, [])) > + mlist.member_verbosity_threshold + ) + + +def check_eq_domains(email, domains_list): + """The arguments are an email address and a string representing a + list of lists in a form like 'a,b,c;1,2' representing [['a', 'b', + 'c'],['1', '2']]. The inner lists are domains which are + equivalent in some sense. The return is an empty list or a list + of email addresses equivalent to the first argument. + For example, given + + email = 'user@me.com' + domains_list = '''domain1, domain2; mac.com, me.com, icloud.com; + domaina, domainb + ''' + + check_eq_domains(email, domains_list) will return + ['user@mac.com', 'user@icloud.com'] + """ + if not domains_list: + return [] + try: + local, domain = email.rsplit('@', 1) + except ValueError: + return [] + domain = domain.lower() + domains_list = re.sub('\s', '', domains_list).lower() + domains = domains_list.split(';') + domains_list = [] + for d in domains: + domains_list.append(d.split(',')) + for domains in domains_list: + if domain in domains: + return [local + '@' + x for x in domains if x != domain] + return [] + + +def _invert_xml(mo): + # This is used with re.sub below to convert XML char refs and textual \u + # escapes to unicodes. + try: + if mo.group(1)[:1] == '#': + return unichr(int(mo.group(1)[1:])) + elif mo.group(1)[:1].lower() == 'u': + return unichr(int(mo.group(1)[1:], 16)) + else: + return(u'\ufffd') + except ValueError: + # Value is out of range. Return the unicode replace character. + return(u'\ufffd') + + +def xml_to_unicode(s, cset): + """This converts a string s, encoded in cset to a unicode with translation + of XML character references and textual \uxxxx escapes. It is more or less + the inverse of unicode.decode(cset, errors='xmlcharrefreplace'). It is + similar to canonstr above except for replacing invalid refs with the + unicode replace character and recognizing \u escapes. + """ + if isinstance(s, str): + us = s.decode(cset, 'replace') + us = re.sub(u'&(#[0-9]+);', _invert_xml, us) + us = re.sub(u'(?i)\\\\(u[a-f0-9]{4})', _invert_xml, us) + return us + else: + return s + |