From 43e5fac2b77716e590b7b9d3e9a35f15946dd4d6 Mon Sep 17 00:00:00 2001 From: Phil Pennock Date: Mon, 18 Mar 2013 18:07:57 -0400 Subject: Handle CNAMEs when chasing DMARC TXT records. Handle TXT records missing tags, check all such records, etc. Use \b boundary anchors in regexp check. (Should only be one, but if there are multiple, check them all, reject if any of them say p=reject). --- Mailman/Utils.py | 49 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 44 insertions(+), 5 deletions(-) (limited to 'Mailman/Utils.py') diff --git a/Mailman/Utils.py b/Mailman/Utils.py index a2cc0caa..6c839a3c 100644 --- a/Mailman/Utils.py +++ b/Mailman/Utils.py @@ -35,6 +35,7 @@ import errno import base64 import random import urlparse +import collections import htmlentitydefs import email.Header import email.Iterators @@ -1081,18 +1082,56 @@ def IsDmarcProhibited(email): resolver.timeout = 1 resolver.lifetime = 5 txt_recs = resolver.query(dmarc_domain, dns.rdatatype.TXT) - except dns.resolver.NXDOMAIN: + except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer): return False except DNSException, e: syslog('error', 'DNSException: Unable to query DMARC policy for %s (%s). %s', email, dmarc_domain, e.__class__) return False else: +# people are already being dumb, don't trust them to provide honest DNS +# where the answer section only contains what was asked for, nor to include +# CNAMEs before the values they point to. + full_record = "" + results_by_name = collections.defaultdict(list) + cnames = {} + want_names = set([dmarc_domain + '.']) for txt_rec in txt_recs.response.answer: - assert( txt_rec.rdtype == dns.rdatatype.TXT) - if re.search(r"[^s]p=reject", "".join(txt_rec.items[0].strings), re.IGNORECASE): - return True - + if txt_rec.rdtype == dns.rdatatype.CNAME: + cnames[txt_rec.name.to_text()] = txt_rec.items[0].target.to_text() + if txt_rec.rdtype != dns.rdatatype.TXT: + continue + results_by_name[txt_rec.name.to_text()].append("".join(txt_rec.items[0].strings)) + expands = list(want_names) + seen = set(expands) + while expands: + item = expands.pop(0) + if item in cnames: + if cnames[item] in seen: + continue # cname loop + expands.append(cnames[item]) + seen.add(cnames[item]) + want_names.add(cnames[item]) + want_names.discard(item) + + if len(want_names) != 1: + syslog('error', 'multiple DMARC entries in results for %s, processing each to be strict', + dmarc_domain) + for name in want_names: + if name not in results_by_name: + continue + dmarcs = filter(lambda n: n.startswith('v=DMARC1;'), results_by_name[name]) + if len(dmarcs) == 0: + return False + if len(dmarcs) > 1: + syslog('error', 'RRset of TXT records for %s has %d v=DMARC1 entries; testing them all', + dmarc_domain, len(dmarc)) + for entry in dmarcs: + if re.search(r'\bp=reject\b', entry, re.IGNORECASE): + syslog('info', 'DMARC lookup for %s (%s) found p=reject in %s = %s', + email, dmarc_domain, name, entry) + return True + return False -- cgit v1.2.3