aboutsummaryrefslogtreecommitdiffstats
path: root/Mailman/Utils.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--Mailman/Utils.py413
1 files changed, 408 insertions, 5 deletions
diff --git a/Mailman/Utils.py b/Mailman/Utils.py
index 93e1fba1..75481563 100644
--- a/Mailman/Utils.py
+++ b/Mailman/Utils.py
@@ -1,4 +1,4 @@
-# Copyright (C) 1998-2011 by the Free Software Foundation, Inc.
+# Copyright (C) 1998-2017 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -34,6 +34,7 @@ import time
import errno
import base64
import random
+import urllib2
import urlparse
import htmlentitydefs
import email.Header
@@ -71,8 +72,16 @@ except NameError:
True = 1
False = 0
+try:
+ import dns.resolver
+ from dns.exception import DNSException
+ dns_resolver = True
+except ImportError:
+ dns_resolver = False
+
EMPTYSTRING = ''
UEMPTYSTRING = u''
+CR = '\r'
NL = '\n'
DOT = '.'
IDENTCHARS = ascii_letters + digits + '_'
@@ -92,6 +101,12 @@ def list_exists(listname):
#
# The former two are for 2.1alpha3 and beyond, while the latter two are
# for all earlier versions.
+ #
+ # But first ensure the list name doesn't contain a path traversal
+ # attack.
+ if len(re.sub(mm_cfg.ACCEPTABLE_LISTNAME_CHARACTERS, '', listname)) > 0:
+ syslog('mischief', 'Hostile listname: %s', listname)
+ return False
basepath = Site.get_listpath(listname)
for ext in ('.pck', '.pck.last', '.db', '.db.last'):
dbfile = os.path.join(basepath, 'config' + ext)
@@ -220,10 +235,18 @@ _valid_domain = re.compile('[-a-z0-9]', re.IGNORECASE)
def ValidateEmail(s):
"""Verify that an email address isn't grossly evil."""
+ # If a user submits a form or URL with post data or query fragments
+ # with multiple occurrences of the same variable, we can get a list
+ # here. Be as careful as possible.
+ if isinstance(s, list) or isinstance(s, tuple):
+ if len(s) == 0:
+ s = ''
+ else:
+ s = s[-1]
# Pretty minimal, cheesy check. We could do better...
if not s or s.count(' ') > 0:
raise Errors.MMBadEmailError
- if _badchars.search(s) or s[0] == '-':
+ if _badchars.search(s):
raise Errors.MMHostileAddress, s
user, domain_parts = ParseEmail(s)
# This means local, unqualified addresses, are not allowed
@@ -232,8 +255,9 @@ def ValidateEmail(s):
if len(domain_parts) < 2:
raise Errors.MMBadEmailError, s
# domain parts may only contain ascii letters, digits and hyphen
+ # and must not begin with hyphen.
for p in domain_parts:
- if len(_valid_domain.sub('', p)) > 0:
+ if len(p) == 0 or p[0] == '-' or len(_valid_domain.sub('', p)) > 0:
raise Errors.MMHostileAddress, s
@@ -247,12 +271,24 @@ def GetPathPieces(envar='PATH_INFO'):
if path:
if CRNLpat.search(path):
path = CRNLpat.split(path)[0]
- syslog('error', 'Warning: Possible malformed path attack.')
+ remote = os.environ.get('HTTP_FORWARDED_FOR',
+ os.environ.get('HTTP_X_FORWARDED_FOR',
+ os.environ.get('REMOTE_ADDR',
+ 'unidentified origin')))
+ syslog('error',
+ 'Warning: Possible malformed path attack domain=%s remote=%s',
+ get_domain(),
+ remote)
return [p for p in path.split('/') if p]
return None
+def GetRequestMethod():
+ return os.environ.get('REQUEST_METHOD')
+
+
+
def ScriptURL(target, web_page_url=None, absolute=False):
"""target - scriptname only, nothing extra
web_page_url - the list's configvar of the same name
@@ -427,6 +463,14 @@ def check_global_password(response, siteadmin=True):
_ampre = re.compile('&amp;((?:#[0-9]+|[a-z]+);)', re.IGNORECASE)
def websafe(s):
+ # If a user submits a form or URL with post data or query fragments
+ # with multiple occurrences of the same variable, we can get a list
+ # here. Be as careful as possible.
+ if isinstance(s, list) or isinstance(s, tuple):
+ if len(s) == 0:
+ s = ''
+ else:
+ s = s[-1]
if mm_cfg.BROKEN_BROWSER_WORKAROUND:
# Archiver can pass unicode here. Just skip them as the
# archiver escapes non-ascii anyway.
@@ -715,7 +759,7 @@ def get_domain():
if port and host.endswith(':' + port):
host = host[:-len(port)-1]
if mm_cfg.VIRTUAL_HOST_OVERVIEW and host:
- return host.lower()
+ return websafe(host.lower())
else:
# See the note in Defaults.py concerning DEFAULT_URL
# vs. DEFAULT_URL_HOST.
@@ -905,6 +949,61 @@ def oneline(s, cset):
return EMPTYSTRING.join(s.splitlines())
+def strip_verbose_pattern(pattern):
+ # Remove white space and comments from a verbose pattern and return a
+ # non-verbose, equivalent pattern. Replace CR and NL in the result
+ # with '\\r' and '\\n' respectively to avoid multi-line results.
+ if not isinstance(pattern, str):
+ return pattern
+ newpattern = ''
+ i = 0
+ inclass = False
+ skiptoeol = False
+ copynext = False
+ while i < len(pattern):
+ c = pattern[i]
+ if copynext:
+ if c == NL:
+ newpattern += '\\n'
+ elif c == CR:
+ newpattern += '\\r'
+ else:
+ newpattern += c
+ copynext = False
+ elif skiptoeol:
+ if c == NL:
+ skiptoeol = False
+ elif c == '#' and not inclass:
+ skiptoeol = True
+ elif c == '[' and not inclass:
+ inclass = True
+ newpattern += c
+ copynext = True
+ elif c == ']' and inclass:
+ inclass = False
+ newpattern += c
+ elif re.search('\s', c):
+ if inclass:
+ if c == NL:
+ newpattern += '\\n'
+ elif c == CR:
+ newpattern += '\\r'
+ else:
+ newpattern += c
+ elif c == '\\' and not inclass:
+ newpattern += c
+ copynext = True
+ else:
+ if c == NL:
+ newpattern += '\\n'
+ elif c == CR:
+ newpattern += '\\r'
+ else:
+ newpattern += c
+ i += 1
+ return newpattern
+
+
# Patterns and functions to flag possible XSS attacks in HTML.
# This list is compiled from information at http://ha.ckers.org/xss.html,
# http://www.quirksmode.org/js/events_compinfo.html,
@@ -1057,3 +1156,307 @@ def suspiciousHTML(html):
else:
return False
+
+# The next functions read data from
+# https://publicsuffix.org/list/public_suffix_list.dat and implement the
+# algorithm at https://publicsuffix.org/list/ to find the "Organizational
+# Domain corresponding to a From: domain.
+
+s_dict = {}
+
+def get_suffixes(url):
+ """This loads and parses the data from the url argument into s_dict for
+ use by get_org_dom."""
+ global s_dict
+ if s_dict:
+ return
+ if not url:
+ return
+ try:
+ d = urllib2.urlopen(url)
+ except urllib2.URLError, e:
+ syslog('error',
+ 'Unable to retrieve data from %s: %s',
+ url, e)
+ return
+ for line in d.readlines():
+ if not line.strip() or line.startswith(' ') or line.startswith('//'):
+ continue
+ line = re.sub(' .*', '', line.strip())
+ if not line:
+ continue
+ parts = line.lower().split('.')
+ if parts[0].startswith('!'):
+ exc = True
+ parts = [parts[0][1:]] + parts[1:]
+ else:
+ exc = False
+ parts.reverse()
+ k = '.'.join(parts)
+ s_dict[k] = exc
+
+def _get_dom(d, l):
+ """A helper to get a domain name consisting of the first l+1 labels
+ in d."""
+ dom = d[:min(l+1, len(d))]
+ dom.reverse()
+ return '.'.join(dom)
+
+def get_org_dom(domain):
+ """Given a domain name, this returns the corresponding Organizational
+ Domain which may be the same as the input."""
+ global s_dict
+ if not s_dict:
+ get_suffixes(mm_cfg.DMARC_ORGANIZATIONAL_DOMAIN_DATA_URL)
+ hits = []
+ d = domain.lower().split('.')
+ d.reverse()
+ for k in s_dict.keys():
+ ks = k.split('.')
+ if len(d) >= len(ks):
+ for i in range(len(ks)-1):
+ if d[i] != ks[i] and ks[i] != '*':
+ break
+ else:
+ if d[len(ks)-1] == ks[-1] or ks[-1] == '*':
+ hits.append(k)
+ if not hits:
+ return _get_dom(d, 1)
+ l = 0
+ for k in hits:
+ if s_dict[k]:
+ # It's an exception
+ return _get_dom(d, len(k.split('.'))-1)
+ if len(k.split('.')) > l:
+ l = len(k.split('.'))
+ return _get_dom(d, l)
+
+
+# This takes an email address, and returns True if DMARC policy is p=reject
+# or possibly quarantine.
+def IsDMARCProhibited(mlist, email):
+ if not dns_resolver:
+ # This is a problem; log it.
+ syslog('error',
+ 'DNS lookup for dmarc_moderation_action for list %s not available',
+ mlist.real_name)
+ return False
+
+ email = email.lower()
+ # Scan from the right in case quoted local part has an '@'.
+ at_sign = email.rfind('@')
+ if at_sign < 1:
+ return False
+ f_dom = email[at_sign+1:]
+ x = _DMARCProhibited(mlist, email, '_dmarc.' + f_dom)
+ if x != 'continue':
+ return x
+ o_dom = get_org_dom(f_dom)
+ if o_dom != f_dom:
+ x = _DMARCProhibited(mlist, email, '_dmarc.' + o_dom, org=True)
+ if x != 'continue':
+ return x
+ return False
+
+def _DMARCProhibited(mlist, email, dmarc_domain, org=False):
+
+ try:
+ resolver = dns.resolver.Resolver()
+ resolver.timeout = float(mm_cfg.DMARC_RESOLVER_TIMEOUT)
+ resolver.lifetime = float(mm_cfg.DMARC_RESOLVER_LIFETIME)
+ txt_recs = resolver.query(dmarc_domain, dns.rdatatype.TXT)
+ except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
+ return 'continue'
+ except DNSException, e:
+ syslog('error',
+ 'DNSException: Unable to query DMARC policy for %s (%s). %s',
+ email, dmarc_domain, e.__doc__)
+ return 'continue'
+ else:
+ # Be as robust as possible in parsing the result.
+ results_by_name = {}
+ cnames = {}
+ want_names = set([dmarc_domain + '.'])
+ for txt_rec in txt_recs.response.answer:
+ if txt_rec.rdtype == dns.rdatatype.CNAME:
+ cnames[txt_rec.name.to_text()] = (
+ txt_rec.items[0].target.to_text())
+ if txt_rec.rdtype != dns.rdatatype.TXT:
+ continue
+ results_by_name.setdefault(txt_rec.name.to_text(), []).append(
+ "".join(txt_rec.items[0].strings))
+ expands = list(want_names)
+ seen = set(expands)
+ while expands:
+ item = expands.pop(0)
+ if item in cnames:
+ if cnames[item] in seen:
+ continue # cname loop
+ expands.append(cnames[item])
+ seen.add(cnames[item])
+ want_names.add(cnames[item])
+ want_names.discard(item)
+
+ if len(want_names) != 1:
+ syslog('error',
+ """multiple DMARC entries in results for %s,
+ processing each to be strict""",
+ dmarc_domain)
+ for name in want_names:
+ if name not in results_by_name:
+ continue
+ dmarcs = filter(lambda n: n.startswith('v=DMARC1;'),
+ results_by_name[name])
+ if len(dmarcs) == 0:
+ return 'continue'
+ if len(dmarcs) > 1:
+ syslog('error',
+ """RRset of TXT records for %s has %d v=DMARC1 entries;
+ testing them all""",
+ dmarc_domain, len(dmarcs))
+ for entry in dmarcs:
+ mo = re.search(r'\bsp=(\w*)\b', entry, re.IGNORECASE)
+ if org and mo:
+ policy = mo.group(1).lower()
+ else:
+ mo = re.search(r'\bp=(\w*)\b', entry, re.IGNORECASE)
+ if mo:
+ policy = mo.group(1).lower()
+ else:
+ continue
+ if policy == 'reject':
+ syslog('vette',
+ '%s: DMARC lookup for %s (%s) found p=reject in %s = %s',
+ mlist.real_name, email, dmarc_domain, name, entry)
+ return True
+
+ if (mlist.dmarc_quarantine_moderation_action and
+ policy == 'quarantine'):
+ syslog('vette',
+ '%s: DMARC lookup for %s (%s) found p=quarantine in %s = %s',
+ mlist.real_name, email, dmarc_domain, name, entry)
+ return True
+
+ if (mlist.dmarc_none_moderation_action and
+ mlist.dmarc_quarantine_moderation_action and
+ mlist.dmarc_moderation_action in (1, 2) and
+ policy == 'none'):
+ syslog('vette',
+ '%s: DMARC lookup for %s (%s) found p=none in %s = %s',
+ mlist.real_name, email, dmarc_domain, name, entry)
+ return True
+
+ return False
+
+
+# Check a known list in order to auto-moderate verbose members
+# dictionary to remember recent posts.
+recentMemberPostings = {}
+# counter of times through
+clean_count = 0
+def IsVerboseMember(mlist, email):
+ """For lists that request it, we keep track of recent posts by address.
+A message from an address to a list, if the list requests it, is remembered
+for a specified time whether or not the address is a list member, and if the
+address is a member and the member is over the threshold for the list, that
+fact is returned."""
+
+ global clean_count
+
+ if mlist.member_verbosity_threshold == 0:
+ return False
+
+ email = email.lower()
+
+ now = time.time()
+ recentMemberPostings.setdefault(email,[]).append(now +
+ float(mlist.member_verbosity_interval)
+ )
+ x = range(len(recentMemberPostings[email]))
+ x.reverse()
+ for i in x:
+ if recentMemberPostings[email][i] < now:
+ del recentMemberPostings[email][i]
+
+ clean_count += 1
+ if clean_count >= mm_cfg.VERBOSE_CLEAN_LIMIT:
+ clean_count = 0
+ for addr in recentMemberPostings.keys():
+ x = range(len(recentMemberPostings[addr]))
+ x.reverse()
+ for i in x:
+ if recentMemberPostings[addr][i] < now:
+ del recentMemberPostings[addr][i]
+ if not recentMemberPostings[addr]:
+ del recentMemberPostings[addr]
+ if not mlist.isMember(email):
+ return False
+ return (len(recentMemberPostings.get(email, [])) >
+ mlist.member_verbosity_threshold
+ )
+
+
+def check_eq_domains(email, domains_list):
+ """The arguments are an email address and a string representing a
+ list of lists in a form like 'a,b,c;1,2' representing [['a', 'b',
+ 'c'],['1', '2']]. The inner lists are domains which are
+ equivalent in some sense. The return is an empty list or a list
+ of email addresses equivalent to the first argument.
+ For example, given
+
+ email = 'user@me.com'
+ domains_list = '''domain1, domain2; mac.com, me.com, icloud.com;
+ domaina, domainb
+ '''
+
+ check_eq_domains(email, domains_list) will return
+ ['user@mac.com', 'user@icloud.com']
+ """
+ if not domains_list:
+ return []
+ try:
+ local, domain = email.rsplit('@', 1)
+ except ValueError:
+ return []
+ domain = domain.lower()
+ domains_list = re.sub('\s', '', domains_list).lower()
+ domains = domains_list.split(';')
+ domains_list = []
+ for d in domains:
+ domains_list.append(d.split(','))
+ for domains in domains_list:
+ if domain in domains:
+ return [local + '@' + x for x in domains if x != domain]
+ return []
+
+
+def _invert_xml(mo):
+ # This is used with re.sub below to convert XML char refs and textual \u
+ # escapes to unicodes.
+ try:
+ if mo.group(1)[:1] == '#':
+ return unichr(int(mo.group(1)[1:]))
+ elif mo.group(1)[:1].lower() == 'u':
+ return unichr(int(mo.group(1)[1:], 16))
+ else:
+ return(u'\ufffd')
+ except ValueError:
+ # Value is out of range. Return the unicode replace character.
+ return(u'\ufffd')
+
+
+def xml_to_unicode(s, cset):
+ """This converts a string s, encoded in cset to a unicode with translation
+ of XML character references and textual \uxxxx escapes. It is more or less
+ the inverse of unicode.decode(cset, errors='xmlcharrefreplace'). It is
+ similar to canonstr above except for replacing invalid refs with the
+ unicode replace character and recognizing \u escapes.
+ """
+ if isinstance(s, str):
+ us = s.decode(cset, 'replace')
+ us = re.sub(u'&(#[0-9]+);', _invert_xml, us)
+ us = re.sub(u'(?i)\\\\(u[a-f0-9]{4})', _invert_xml, us)
+ return us
+ else:
+ return s
+