# Copyright (C) 2001-2011 by the Free Software Foundation, Inc. # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, # USA. """Unit tests for the various Mailman/Handlers/*.py modules. """ import os import time import email import errno import cPickle import unittest from types import ListType from email.Generator import Generator try: from Mailman import __init__ except ImportError: import paths from Mailman import mm_cfg from Mailman.MailList import MailList from Mailman import Message from Mailman import Errors from Mailman import Pending from Mailman.Queue.Switchboard import Switchboard from Mailman.Handlers import Acknowledge from Mailman.Handlers import AfterDelivery from Mailman.Handlers import Approve from Mailman.Handlers import CalcRecips from Mailman.Handlers import Cleanse from Mailman.Handlers import CookHeaders from Mailman.Handlers import Decorate from Mailman.Handlers import FileRecips from Mailman.Handlers import Hold from Mailman.Handlers import MimeDel from Mailman.Handlers import Moderate from Mailman.Handlers import Replybot # Don't test handlers such as SMTPDirect and Sendmail here from Mailman.Handlers import SpamDetect from Mailman.Handlers import Tagger from Mailman.Handlers import ToArchive from Mailman.Handlers import ToDigest from Mailman.Handlers import ToOutgoing from Mailman.Handlers import ToUsenet from Mailman.Utils import sha_new from TestBase import TestBase def password(plaintext): return sha_new(plaintext).hexdigest() class TestAcknowledge(TestBase): def setUp(self): TestBase.setUp(self) # We're going to want to inspect this queue directory self._sb = Switchboard(mm_cfg.VIRGINQUEUE_DIR) # Add a member self._mlist.addNewMember('aperson@dom.ain') self._mlist.personalize = False def tearDown(self): for f in os.listdir(mm_cfg.VIRGINQUEUE_DIR): os.unlink(os.path.join(mm_cfg.VIRGINQUEUE_DIR, f)) TestBase.tearDown(self) def test_no_ack_msgdata(self): eq = self.assertEqual # Make sure there are no files in the virgin queue already eq(len(self._sb.files()), 0) msg = email.message_from_string("""\ From: aperson@dom.ain """, Message.Message) Acknowledge.process(self._mlist, msg, {'original_sender': 'aperson@dom.ain'}) eq(len(self._sb.files()), 0) def test_no_ack_not_a_member(self): eq = self.assertEqual # Make sure there are no files in the virgin queue already eq(len(self._sb.files()), 0) msg = email.message_from_string("""\ From: bperson@dom.ain """, Message.Message) Acknowledge.process(self._mlist, msg, {'original_sender': 'bperson@dom.ain'}) eq(len(self._sb.files()), 0) def test_no_ack_sender(self): eq = self.assertEqual eq(len(self._sb.files()), 0) msg = email.message_from_string("""\ From: aperson@dom.ain """, Message.Message) Acknowledge.process(self._mlist, msg, {}) eq(len(self._sb.files()), 0) def test_ack_no_subject(self): eq = self.assertEqual self._mlist.setMemberOption( 'aperson@dom.ain', mm_cfg.AcknowledgePosts, 1) eq(len(self._sb.files()), 0) msg = email.message_from_string("""\ From: aperson@dom.ain """, Message.Message) Acknowledge.process(self._mlist, msg, {}) files = self._sb.files() eq(len(files), 1) qmsg, qdata = self._sb.dequeue(files[0]) # Check the .db file eq(qdata.get('listname'), '_xtest') eq(qdata.get('recips'), ['aperson@dom.ain']) eq(qdata.get('version'), 3) # Check the .pck eq(str(str(qmsg['subject'])), '_xtest post acknowledgement') eq(qmsg['to'], 'aperson@dom.ain') eq(qmsg['from'], '_xtest-bounces@dom.ain') eq(qmsg.get_content_type(), 'text/plain') eq(qmsg.get_param('charset'), 'us-ascii') msgid = qmsg['message-id'] self.failUnless(msgid.startswith('')) eq(qmsg.get_payload(), """\ Your message entitled (no subject) was successfully received by the _xtest mailing list. List info page: http://www.dom.ain/mailman/listinfo/_xtest Your preferences: http://www.dom.ain/mailman/options/_xtest/aperson%40dom.ain """) # Make sure we dequeued the only message eq(len(self._sb.files()), 0) def test_ack_with_subject(self): eq = self.assertEqual self._mlist.setMemberOption( 'aperson@dom.ain', mm_cfg.AcknowledgePosts, 1) eq(len(self._sb.files()), 0) msg = email.message_from_string("""\ From: aperson@dom.ain Subject: Wish you were here """, Message.Message) Acknowledge.process(self._mlist, msg, {}) files = self._sb.files() eq(len(files), 1) qmsg, qdata = self._sb.dequeue(files[0]) # Check the .db file eq(qdata.get('listname'), '_xtest') eq(qdata.get('recips'), ['aperson@dom.ain']) eq(qdata.get('version'), 3) # Check the .pck eq(str(qmsg['subject']), '_xtest post acknowledgement') eq(qmsg['to'], 'aperson@dom.ain') eq(qmsg['from'], '_xtest-bounces@dom.ain') eq(qmsg.get_content_type(), 'text/plain') eq(qmsg.get_param('charset'), 'us-ascii') msgid = qmsg['message-id'] self.failUnless(msgid.startswith('')) eq(qmsg.get_payload(), """\ Your message entitled Wish you were here was successfully received by the _xtest mailing list. List info page: http://www.dom.ain/mailman/listinfo/_xtest Your preferences: http://www.dom.ain/mailman/options/_xtest/aperson%40dom.ain """) # Make sure we dequeued the only message eq(len(self._sb.files()), 0) class TestAfterDelivery(TestBase): # Both msg and msgdata are ignored def test_process(self): mlist = self._mlist last_post_time = mlist.last_post_time post_id = mlist.post_id AfterDelivery.process(mlist, None, None) self.failUnless(mlist.last_post_time > last_post_time) self.assertEqual(mlist.post_id, post_id + 1) class TestApprove(TestBase): def test_short_circuit(self): msgdata = {'approved': 1} rtn = Approve.process(self._mlist, Message.Message(), msgdata) # Not really a great test, but there's little else to assert self.assertEqual(rtn, None) def test_approved_moderator(self): mlist = self._mlist mlist.mod_password = password('wazoo') msg = email.message_from_string("""\ Approved: wazoo """) msgdata = {} Approve.process(mlist, msg, msgdata) self.failUnless(msgdata.has_key('approved')) self.assertEqual(msgdata['approved'], 1) def test_approve_moderator(self): mlist = self._mlist mlist.mod_password = password('wazoo') msg = email.message_from_string("""\ Approve: wazoo """) msgdata = {} Approve.process(mlist, msg, msgdata) self.failUnless(msgdata.has_key('approved')) self.assertEqual(msgdata['approved'], 1) def test_approved_admin(self): mlist = self._mlist mlist.password = password('wazoo') msg = email.message_from_string("""\ Approved: wazoo """) msgdata = {} Approve.process(mlist, msg, msgdata) self.failUnless(msgdata.has_key('approved')) self.assertEqual(msgdata['approved'], 1) def test_approve_admin(self): mlist = self._mlist mlist.password = password('wazoo') msg = email.message_from_string("""\ Approve: wazoo """) msgdata = {} Approve.process(mlist, msg, msgdata) self.failUnless(msgdata.has_key('approved')) self.assertEqual(msgdata['approved'], 1) def test_unapproved(self): mlist = self._mlist mlist.password = password('zoowa') msg = email.message_from_string("""\ Approve: wazoo """) msgdata = {} Approve.process(mlist, msg, msgdata) self.assertEqual(msgdata.get('approved'), None) def test_trip_beentheres(self): mlist = self._mlist msg = email.message_from_string("""\ X-BeenThere: %s """ % mlist.GetListEmail()) self.assertRaises(Errors.LoopError, Approve.process, mlist, msg, {}) class TestCalcRecips(TestBase): def setUp(self): TestBase.setUp(self) # Add a bunch of regular members mlist = self._mlist mlist.addNewMember('aperson@dom.ain') mlist.addNewMember('bperson@dom.ain') mlist.addNewMember('cperson@dom.ain') # And a bunch of digest members mlist.addNewMember('dperson@dom.ain', digest=1) mlist.addNewMember('eperson@dom.ain', digest=1) mlist.addNewMember('fperson@dom.ain', digest=1) def test_short_circuit(self): msgdata = {'recips': 1} rtn = CalcRecips.process(self._mlist, None, msgdata) # Not really a great test, but there's little else to assert self.assertEqual(rtn, None) def test_simple_path(self): msgdata = {} msg = email.message_from_string("""\ From: dperson@dom.ain """, Message.Message) CalcRecips.process(self._mlist, msg, msgdata) self.failUnless(msgdata.has_key('recips')) recips = msgdata['recips'] recips.sort() self.assertEqual(recips, ['aperson@dom.ain', 'bperson@dom.ain', 'cperson@dom.ain']) def test_exclude_sender(self): msgdata = {} msg = email.message_from_string("""\ From: cperson@dom.ain """, Message.Message) self._mlist.setMemberOption('cperson@dom.ain', mm_cfg.DontReceiveOwnPosts, 1) CalcRecips.process(self._mlist, msg, msgdata) self.failUnless(msgdata.has_key('recips')) recips = msgdata['recips'] recips.sort() self.assertEqual(recips, ['aperson@dom.ain', 'bperson@dom.ain']) def test_urgent_moderator(self): self._mlist.mod_password = password('xxXXxx') msgdata = {} msg = email.message_from_string("""\ From: dperson@dom.ain Urgent: xxXXxx """, Message.Message) CalcRecips.process(self._mlist, msg, msgdata) self.failUnless(msgdata.has_key('recips')) recips = msgdata['recips'] recips.sort() self.assertEqual(recips, ['aperson@dom.ain', 'bperson@dom.ain', 'cperson@dom.ain', 'dperson@dom.ain', 'eperson@dom.ain', 'fperson@dom.ain']) def test_urgent_admin(self): self._mlist.mod_password = password('yyYYyy') self._mlist.password = password('xxXXxx') msgdata = {} msg = email.message_from_string("""\ From: dperson@dom.ain Urgent: xxXXxx """, Message.Message) CalcRecips.process(self._mlist, msg, msgdata) self.failUnless(msgdata.has_key('recips')) recips = msgdata['recips'] recips.sort() self.assertEqual(recips, ['aperson@dom.ain', 'bperson@dom.ain', 'cperson@dom.ain', 'dperson@dom.ain', 'eperson@dom.ain', 'fperson@dom.ain']) def test_urgent_reject(self): self._mlist.mod_password = password('yyYYyy') self._mlist.password = password('xxXXxx') msgdata = {} msg = email.message_from_string("""\ From: dperson@dom.ain Urgent: zzZZzz """, Message.Message) self.assertRaises(Errors.RejectMessage, CalcRecips.process, self._mlist, msg, msgdata) # BAW: must test the do_topic_filters() path... class TestCleanse(TestBase): def setUp(self): TestBase.setUp(self) def test_simple_cleanse(self): eq = self.assertEqual msg = email.message_from_string("""\ From: aperson@dom.ain Approved: yes Urgent: indeed Reply-To: bperson@dom.ain Sender: asystem@dom.ain Return-Receipt-To: another@dom.ain Disposition-Notification-To: athird@dom.ain X-Confirm-Reading-To: afourth@dom.ain X-PMRQC: afifth@dom.ain Subject: a message to you """, Message.Message) Cleanse.process(self._mlist, msg, {}) eq(msg['approved'], None) eq(msg['urgent'], None) eq(msg['return-receipt-to'], None) eq(msg['disposition-notification-to'], None) eq(msg['x-confirm-reading-to'], None) eq(msg['x-pmrqc'], None) eq(msg['from'], 'aperson@dom.ain') eq(msg['reply-to'], 'bperson@dom.ain') eq(msg['sender'], 'asystem@dom.ain') eq(msg['subject'], 'a message to you') def test_anon_cleanse(self): eq = self.assertEqual msg = email.message_from_string("""\ From: aperson@dom.ain Approved: yes Urgent: indeed Reply-To: bperson@dom.ain Sender: asystem@dom.ain Return-Receipt-To: another@dom.ain Disposition-Notification-To: athird@dom.ain X-Confirm-Reading-To: afourth@dom.ain X-PMRQC: afifth@dom.ain Subject: a message to you """, Message.Message) self._mlist.anonymous_list = 1 Cleanse.process(self._mlist, msg, {}) eq(msg['approved'], None) eq(msg['urgent'], None) eq(msg['return-receipt-to'], None) eq(msg['disposition-notification-to'], None) eq(msg['x-confirm-reading-to'], None) eq(msg['x-pmrqc'], None) eq(len(msg.get_all('from')), 1) eq(len(msg.get_all('reply-to')), 1) eq(msg['from'], '_xtest@dom.ain') eq(msg['reply-to'], '_xtest@dom.ain') eq(msg['sender'], None) eq(msg['subject'], 'a message to you') class TestCookHeaders(TestBase): def test_transform_noack_to_xack(self): eq = self.assertEqual msg = email.message_from_string("""\ X-Ack: yes """, Message.Message) CookHeaders.process(self._mlist, msg, {'noack': 1}) eq(len(msg.get_all('x-ack')), 1) eq(msg['x-ack'], 'no') def test_original_sender(self): msg = email.message_from_string("""\ From: aperson@dom.ain """, Message.Message) msgdata = {} CookHeaders.process(self._mlist, msg, msgdata) self.assertEqual(msgdata.get('original_sender'), 'aperson@dom.ain') def test_no_original_sender(self): msg = email.message_from_string("""\ Subject: about this message """, Message.Message) msgdata = {} CookHeaders.process(self._mlist, msg, msgdata) self.assertEqual(msgdata.get('original_sender'), '') def test_xbeenthere(self): msg = email.message_from_string("""\ From: aperson@dom.ain """, Message.Message) CookHeaders.process(self._mlist, msg, {}) self.assertEqual(msg['x-beenthere'], '_xtest@dom.ain') def test_multiple_xbeentheres(self): eq = self.assertEqual msg = email.message_from_string("""\ From: aperson@dom.ain X-BeenThere: alist@another.dom.ain """, Message.Message) CookHeaders.process(self._mlist, msg, {}) eq(len(msg.get_all('x-beenthere')), 2) beentheres = msg.get_all('x-beenthere') beentheres.sort() eq(beentheres, ['_xtest@dom.ain', 'alist@another.dom.ain']) def test_nonexisting_mmversion(self): eq = self.assertEqual msg = email.message_from_string("""\ From: aperson@dom.ain """, Message.Message) CookHeaders.process(self._mlist, msg, {}) eq(msg['x-mailman-version'], mm_cfg.VERSION) def test_existing_mmversion(self): eq = self.assertEqual msg = email.message_from_string("""\ From: aperson@dom.ain X-Mailman-Version: 3000 """, Message.Message) CookHeaders.process(self._mlist, msg, {}) eq(len(msg.get_all('x-mailman-version')), 1) eq(msg['x-mailman-version'], '3000') def test_nonexisting_precedence(self): eq = self.assertEqual msg = email.message_from_string("""\ From: aperson@dom.ain """, Message.Message) CookHeaders.process(self._mlist, msg, {}) eq(msg['precedence'], 'list') def test_existing_precedence(self): eq = self.assertEqual msg = email.message_from_string("""\ From: aperson@dom.ain Precedence: junk """, Message.Message) CookHeaders.process(self._mlist, msg, {}) eq(len(msg.get_all('precedence')), 1) eq(msg['precedence'], 'junk') def test_subject_munging_no_subject(self): self._mlist.subject_prefix = '[XTEST] ' msg = email.message_from_string("""\ From: aperson@dom.ain """, Message.Message) msgdata = {} CookHeaders.process(self._mlist, msg, msgdata) self.assertEqual(msgdata.get('origsubj'), '') self.assertEqual(str(msg['subject']), '[XTEST] (no subject)') def test_subject_munging(self): self._mlist.subject_prefix = '[XTEST] ' msg = email.message_from_string("""\ From: aperson@dom.ain Subject: About Mailman... """, Message.Message) CookHeaders.process(self._mlist, msg, {}) self.assertEqual(str(msg['subject']), '[XTEST] About Mailman...') def test_no_subject_munging_for_digests(self): self._mlist.subject_prefix = '[XTEST] ' msg = email.message_from_string("""\ From: aperson@dom.ain Subject: About Mailman... """, Message.Message) CookHeaders.process(self._mlist, msg, {'isdigest': 1}) self.assertEqual(msg['subject'], 'About Mailman...') def test_no_subject_munging_for_fasttrack(self): self._mlist.subject_prefix = '[XTEST] ' msg = email.message_from_string("""\ From: aperson@dom.ain Subject: About Mailman... """, Message.Message) CookHeaders.process(self._mlist, msg, {'_fasttrack': 1}) self.assertEqual(msg['subject'], 'About Mailman...') def test_no_subject_munging_has_prefix(self): self._mlist.subject_prefix = '[XTEST] ' msg = email.message_from_string("""\ From: aperson@dom.ain Subject: Re: [XTEST] About Mailman... """, Message.Message) CookHeaders.process(self._mlist, msg, {}) # prefixing depends on mm_cfg.py self.failUnless(str(msg['subject']) == 'Re: [XTEST] About Mailman...' or str(msg['subject']) == '[XTEST] Re: About Mailman...') def test_reply_to_list(self): eq = self.assertEqual mlist = self._mlist mlist.reply_goes_to_list = 1 msg = email.message_from_string("""\ From: aperson@dom.ain """, Message.Message) CookHeaders.process(mlist, msg, {}) eq(msg['reply-to'], '_xtest@dom.ain') eq(msg.get_all('reply-to'), ['_xtest@dom.ain']) def test_reply_to_list_with_strip(self): eq = self.assertEqual mlist = self._mlist mlist.reply_goes_to_list = 1 mlist.first_strip_reply_to = 1 msg = email.message_from_string("""\ From: aperson@dom.ain Reply-To: bperson@dom.ain """, Message.Message) CookHeaders.process(mlist, msg, {}) eq(msg['reply-to'], '_xtest@dom.ain') eq(msg.get_all('reply-to'), ['_xtest@dom.ain']) def test_reply_to_explicit(self): eq = self.assertEqual mlist = self._mlist mlist.reply_goes_to_list = 2 mlist.reply_to_address = 'mlist@dom.ain' msg = email.message_from_string("""\ From: aperson@dom.ain """, Message.Message) CookHeaders.process(mlist, msg, {}) eq(msg['reply-to'], 'mlist@dom.ain') eq(msg.get_all('reply-to'), ['mlist@dom.ain']) def test_reply_to_explicit_with_strip(self): eq = self.assertEqual mlist = self._mlist mlist.reply_goes_to_list = 2 mlist.first_strip_reply_to = 1 mlist.reply_to_address = 'mlist@dom.ain' msg = email.message_from_string("""\ From: aperson@dom.ain Reply-To: bperson@dom.ain """, Message.Message) CookHeaders.process(self._mlist, msg, {}) eq(msg['reply-to'], 'mlist@dom.ain') eq(msg.get_all('reply-to'), ['mlist@dom.ain']) def test_reply_to_extends_to_list(self): eq = self.assertEqual mlist = self._mlist mlist.reply_goes_to_list = 1 mlist.first_strip_reply_to = 0 msg = email.message_from_string("""\ From: aperson@dom.ain Reply-To: bperson@dom.ain """, Message.Message) CookHeaders.process(mlist, msg, {}) eq(msg['reply-to'], 'bperson@dom.ain, _xtest@dom.ain') def test_reply_to_extends_to_explicit(self): eq = self.assertEqual mlist = self._mlist mlist.reply_goes_to_list = 2 mlist.first_strip_reply_to = 0 mlist.reply_to_address = 'mlist@dom.ain' msg = email.message_from_string("""\ From: aperson@dom.ain Reply-To: bperson@dom.ain """, Message.Message) CookHeaders.process(mlist, msg, {}) eq(msg['reply-to'], 'mlist@dom.ain, bperson@dom.ain') def test_list_headers_nolist(self): eq = self.assertEqual msg = email.message_from_string("""\ From: aperson@dom.ain """, Message.Message) CookHeaders.process(self._mlist, msg, {'_nolist': 1}) eq(msg['list-id'], None) eq(msg['list-help'], None) eq(msg['list-unsubscribe'], None) eq(msg['list-subscribe'], None) eq(msg['list-post'], None) eq(msg['list-archive'], None) def test_list_headers(self): eq = self.assertEqual self._mlist.archive = 1 msg = email.message_from_string("""\ From: aperson@dom.ain """, Message.Message) oldval = mm_cfg.DEFAULT_URL_HOST mm_cfg.DEFAULT_URL_HOST = 'www.dom.ain' try: CookHeaders.process(self._mlist, msg, {}) finally: mm_cfg.DEFAULT_URL_HOST = oldval eq(msg['list-id'], '<_xtest.dom.ain>') eq(msg['list-help'], '') eq(msg['list-unsubscribe'], ',' '\n\t') eq(msg['list-subscribe'], ',' '\n\t') eq(msg['list-post'], '') eq(msg['list-archive'], '') def test_list_headers_with_description(self): eq = self.assertEqual self._mlist.archive = 1 self._mlist.description = 'A Test List' msg = email.message_from_string("""\ From: aperson@dom.ain """, Message.Message) CookHeaders.process(self._mlist, msg, {}) eq(unicode(msg['list-id']), 'A Test List <_xtest.dom.ain>') eq(msg['list-help'], '') eq(msg['list-unsubscribe'], ',' '\n\t') eq(msg['list-subscribe'], ',' '\n\t') eq(msg['list-post'], '') class TestDecorate(TestBase): def test_short_circuit(self): msgdata = {'isdigest': 1} rtn = Decorate.process(self._mlist, None, msgdata) # Not really a great test, but there's little else to assert self.assertEqual(rtn, None) def test_no_multipart(self): mlist = self._mlist mlist.msg_header = 'header\n' mlist.msg_footer = 'footer' msg = email.message_from_string("""\ From: aperson@dom.ain Here is a message. """) Decorate.process(self._mlist, msg, {}) self.assertEqual(msg.get_payload(), """\ header Here is a message. footer""") def test_no_multipart_template(self): mlist = self._mlist mlist.msg_header = '%(real_name)s header\n' mlist.msg_footer = '%(real_name)s footer' mlist.real_name = 'XTest' msg = email.message_from_string("""\ From: aperson@dom.ain Here is a message. """) Decorate.process(self._mlist, msg, {}) self.assertEqual(msg.get_payload(), """\ XTest header Here is a message. XTest footer""") def test_no_multipart_type_error(self): mlist = self._mlist mlist.msg_header = '%(real_name) header\n' mlist.msg_footer = '%(real_name) footer' mlist.real_name = 'XTest' msg = email.message_from_string("""\ From: aperson@dom.ain Here is a message. """) Decorate.process(self._mlist, msg, {}) self.assertEqual(msg.get_payload(), """\ %(real_name) header Here is a message. %(real_name) footer""") def test_no_multipart_value_error(self): mlist = self._mlist # These will generate warnings in logs/error mlist.msg_header = '%(real_name)p header\n' mlist.msg_footer = '%(real_name)p footer' mlist.real_name = 'XTest' msg = email.message_from_string("""\ From: aperson@dom.ain Here is a message. """) Decorate.process(self._mlist, msg, {}) self.assertEqual(msg.get_payload(), """\ %(real_name)p header Here is a message. %(real_name)p footer""") def test_no_multipart_missing_key(self): mlist = self._mlist mlist.msg_header = '%(spooge)s header\n' mlist.msg_footer = '%(spooge)s footer' msg = email.message_from_string("""\ From: aperson@dom.ain Here is a message. """) Decorate.process(self._mlist, msg, {}) self.assertEqual(msg.get_payload(), """\ %(spooge)s header Here is a message. %(spooge)s footer""") def test_multipart(self): eq = self.ndiffAssertEqual mlist = self._mlist mlist.msg_header = 'header' mlist.msg_footer = 'footer' msg1 = email.message_from_string("""\ From: aperson@dom.ain Here is the first message. """) msg2 = email.message_from_string("""\ From: bperson@dom.ain Here is the second message. """) msg = Message.Message() msg.set_type('multipart/mixed') msg.set_boundary('BOUNDARY') msg.attach(msg1) msg.attach(msg2) Decorate.process(self._mlist, msg, {}) eq(msg.as_string(unixfrom=0), """\ MIME-Version: 1.0 Content-Type: multipart/mixed; boundary="BOUNDARY" --BOUNDARY Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Content-Disposition: inline header --BOUNDARY From: aperson@dom.ain Here is the first message. --BOUNDARY From: bperson@dom.ain Here is the second message. --BOUNDARY Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Content-Disposition: inline footer --BOUNDARY--""") def test_image(self): eq = self.assertEqual mlist = self._mlist mlist.msg_header = 'header\n' mlist.msg_footer = 'footer' msg = email.message_from_string("""\ From: aperson@dom.ain Content-type: image/x-spooge IMAGEDATAIMAGEDATAIMAGEDATA """) Decorate.process(self._mlist, msg, {}) eq(len(msg.get_payload()), 3) self.assertEqual(msg.get_payload(1).get_payload(), """\ IMAGEDATAIMAGEDATAIMAGEDATA """) def test_personalize_assert(self): raises = self.assertRaises raises(AssertionError, Decorate.process, self._mlist, None, {'personalize': 1}) raises(AssertionError, Decorate.process, self._mlist, None, {'personalize': 1, 'recips': [1, 2, 3]}) class TestFileRecips(TestBase): def test_short_circuit(self): msgdata = {'recips': 1} rtn = FileRecips.process(self._mlist, None, msgdata) # Not really a great test, but there's little else to assert self.assertEqual(rtn, None) def test_file_nonexistant(self): msgdata = {} FileRecips.process(self._mlist, None, msgdata) self.assertEqual(msgdata.get('recips'), []) def test_file_exists_no_sender(self): msg = email.message_from_string("""\ To: yall@dom.ain """, Message.Message) msgdata = {} file = os.path.join(self._mlist.fullpath(), 'members.txt') addrs = ['aperson@dom.ain', 'bperson@dom.ain', 'cperson@dom.ain', 'dperson@dom.ain'] fp = open(file, 'w') try: for addr in addrs: print >> fp, addr fp.close() FileRecips.process(self._mlist, msg, msgdata) self.assertEqual(msgdata.get('recips'), addrs) finally: try: os.unlink(file) except OSError, e: if e.errno <> e.ENOENT: raise def test_file_exists_no_member(self): msg = email.message_from_string("""\ From: eperson@dom.ain To: yall@dom.ain """, Message.Message) msgdata = {} file = os.path.join(self._mlist.fullpath(), 'members.txt') addrs = ['aperson@dom.ain', 'bperson@dom.ain', 'cperson@dom.ain', 'dperson@dom.ain'] fp = open(file, 'w') try: for addr in addrs: print >> fp, addr fp.close() FileRecips.process(self._mlist, msg, msgdata) self.assertEqual(msgdata.get('recips'), addrs) finally: try: os.unlink(file) except OSError, e: if e.errno <> e.ENOENT: raise def test_file_exists_is_member(self): msg = email.message_from_string("""\ From: aperson@dom.ain To: yall@dom.ain """, Message.Message) msgdata = {} file = os.path.join(self._mlist.fullpath(), 'members.txt') addrs = ['aperson@dom.ain', 'bperson@dom.ain', 'cperson@dom.ain', 'dperson@dom.ain'] fp = open(file, 'w') try: for addr in addrs: print >> fp, addr self._mlist.addNewMember(addr) fp.close() FileRecips.process(self._mlist, msg, msgdata) self.assertEqual(msgdata.get('recips'), addrs[1:]) finally: try: os.unlink(file) except OSError, e: if e.errno <> e.ENOENT: raise class TestHold(TestBase): def setUp(self): TestBase.setUp(self) self._mlist.administrivia = 1 self._mlist.respond_to_post_requests = 0 self._mlist.admin_immed_notify = 0 # We're going to want to inspect this queue directory self._sb = Switchboard(mm_cfg.VIRGINQUEUE_DIR) def tearDown(self): for f in os.listdir(mm_cfg.VIRGINQUEUE_DIR): os.unlink(os.path.join(mm_cfg.VIRGINQUEUE_DIR, f)) TestBase.tearDown(self) try: os.unlink(os.path.join(mm_cfg.DATA_DIR, 'pending.db')) except OSError, e: if e.errno <> errno.ENOENT: raise for f in [holdfile for holdfile in os.listdir(mm_cfg.DATA_DIR) if holdfile.startswith('heldmsg-')]: os.unlink(os.path.join(mm_cfg.DATA_DIR, f)) def test_short_circuit(self): msgdata = {'approved': 1} rtn = Hold.process(self._mlist, None, msgdata) # Not really a great test, but there's little else to assert self.assertEqual(rtn, None) def test_administrivia(self): msg = email.message_from_string("""\ From: aperson@dom.ain Subject: unsubscribe """, Message.Message) self.assertRaises(Hold.Administrivia, Hold.process, self._mlist, msg, {}) def test_max_recips(self): self._mlist.max_num_recipients = 5 msg = email.message_from_string("""\ From: aperson@dom.ain To: _xtest@dom.ain, bperson@dom.ain Cc: cperson@dom.ain Cc: dperson@dom.ain (Jimmy D. Person) To: Billy E. Person Hey folks! """, Message.Message) self.assertRaises(Hold.TooManyRecipients, Hold.process, self._mlist, msg, {}) def test_implicit_destination(self): self._mlist.require_explicit_destination = 1 msg = email.message_from_string("""\ From: aperson@dom.ain Subject: An implicit message """, Message.Message) self.assertRaises(Hold.ImplicitDestination, Hold.process, self._mlist, msg, {}) def test_implicit_destination_fromusenet(self): self._mlist.require_explicit_destination = 1 msg = email.message_from_string("""\ From: aperson@dom.ain Subject: An implicit message """, Message.Message) rtn = Hold.process(self._mlist, msg, {'fromusenet': 1}) self.assertEqual(rtn, None) def test_suspicious_header(self): self._mlist.bounce_matching_headers = 'From: .*person@(blah.)?dom.ain' msg = email.message_from_string("""\ From: aperson@dom.ain To: _xtest@dom.ain Subject: An implicit message """, Message.Message) self.assertRaises(Hold.SuspiciousHeaders, Hold.process, self._mlist, msg, {}) def test_suspicious_header_ok(self): self._mlist.bounce_matching_headers = 'From: .*person@blah.dom.ain' msg = email.message_from_string("""\ From: aperson@dom.ain To: _xtest@dom.ain Subject: An implicit message """, Message.Message) rtn = Hold.process(self._mlist, msg, {}) self.assertEqual(rtn, None) def test_max_message_size(self): self._mlist.max_message_size = 1 msg = email.message_from_string("""\ From: aperson@dom.ain To: _xtest@dom.ain xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx """, Message.Message) self.assertRaises(Hold.MessageTooBig, Hold.process, self._mlist, msg, {}) def test_hold_notifications(self): eq = self.assertEqual self._mlist.respond_to_post_requests = 1 self._mlist.admin_immed_notify = 1 # Now cause an implicit destination hold msg = email.message_from_string("""\ From: aperson@dom.ain """, Message.Message) self.assertRaises(Hold.ImplicitDestination, Hold.process, self._mlist, msg, {}) # Now we have to make sure there are two messages in the virgin queue, # one to the sender and one to the list owners. qfiles = {} files = self._sb.files() eq(len(files), 2) for filebase in files: qmsg, qdata = self._sb.dequeue(filebase) to = qmsg['to'] qfiles[to] = qmsg, qdata # BAW: We could be testing many other attributes of either the # messages or the metadata files... keys = qfiles.keys() keys.sort() eq(keys, ['_xtest-owner@dom.ain', 'aperson@dom.ain']) # Get the pending cookie from the message to the sender pmsg, pdata = qfiles['aperson@dom.ain'] confirmlines = pmsg.get_payload().split('\n') cookie = confirmlines[-3].split('/')[-1] # We also need to make sure there's an entry in the Pending database # for the heold message. data = self._mlist.pend_confirm(cookie) eq(data, ('H', 1)) heldmsg = os.path.join(mm_cfg.DATA_DIR, 'heldmsg-_xtest-1.pck') self.failUnless(os.path.exists(heldmsg)) os.unlink(heldmsg) holdfiles = [f for f in os.listdir(mm_cfg.DATA_DIR) if f.startswith('heldmsg-')] eq(len(holdfiles), 0) class TestMimeDel(TestBase): def setUp(self): TestBase.setUp(self) self._mlist.filter_content = 1 self._mlist.filter_mime_types = ['image/jpeg'] self._mlist.pass_mime_types = [] self._mlist.convert_html_to_plaintext = 1 self._mlist.collapse_alternatives = 1 def test_outer_matches(self): msg = email.message_from_string("""\ From: aperson@dom.ain Content-Type: image/jpeg MIME-Version: 1.0 xxxxx """) self.assertRaises(Errors.DiscardMessage, MimeDel.process, self._mlist, msg, {}) def test_strain_multipart(self): eq = self.assertEqual msg = email.message_from_string("""\ From: aperson@dom.ain Content-Type: multipart/mixed; boundary=BOUNDARY MIME-Version: 1.0 --BOUNDARY Content-Type: image/jpeg MIME-Version: 1.0 xxx --BOUNDARY Content-Type: image/gif MIME-Version: 1.0 yyy --BOUNDARY-- """) MimeDel.process(self._mlist, msg, {}) self.assertTrue(not msg.is_multipart()) eq(msg.get_content_type(), 'image/gif') eq(msg.get_payload(), 'yyy') def test_collapse_multipart_alternative(self): eq = self.assertEqual msg = email.message_from_string("""\ From: aperson@dom.ain Content-Type: multipart/mixed; boundary=BOUNDARY MIME-Version: 1.0 --BOUNDARY Content-Type: multipart/alternative; boundary=BOUND2 MIME-Version: 1.0 --BOUND2 Content-Type: image/jpeg MIME-Version: 1.0 xxx --BOUND2 Content-Type: image/gif MIME-Version: 1.0 yyy --BOUND2-- --BOUNDARY-- """) MimeDel.process(self._mlist, msg, {}) self.assertTrue(not msg.is_multipart()) eq(msg.get_content_type(), 'image/gif') eq(msg.get_payload(), 'yyy') def test_convert_to_plaintext(self): # BAW: This test is dependent on your particular lynx version eq = self.assertEqual msg = email.message_from_string("""\ From: aperson@dom.ain Content-Type: text/html MIME-Version: 1.0 """) MimeDel.process(self._mlist, msg, {}) eq(msg.get_content_type(), 'text/plain') #eq(msg.get_payload(), '\n\n\n') eq(msg.get_payload().strip(), '') def test_deep_structure(self): eq = self.assertEqual self._mlist.filter_mime_types.append('text/html') msg = email.message_from_string("""\ From: aperson@dom.ain Content-Type: multipart/mixed; boundary=AAA --AAA Content-Type: multipart/mixed; boundary=BBB --BBB Content-Type: image/jpeg xxx --BBB Content-Type: image/jpeg yyy --BBB--- --AAA Content-Type: multipart/alternative; boundary=CCC --CCC Content-Type: text/html

This is a header

--CCC Content-Type: text/plain A different message --CCC-- --AAA Content-Type: image/gif zzz --AAA Content-Type: image/gif aaa --AAA-- """) MimeDel.process(self._mlist, msg, {}) payload = msg.get_payload() eq(len(payload), 3) part1 = msg.get_payload(0) eq(part1.get_content_type(), 'text/plain') eq(part1.get_payload(), 'A different message') part2 = msg.get_payload(1) eq(part2.get_content_type(), 'image/gif') eq(part2.get_payload(), 'zzz') part3 = msg.get_payload(2) eq(part3.get_content_type(), 'image/gif') eq(part3.get_payload(), 'aaa') def test_top_multipart_alternative(self): eq = self.assertEqual self._mlist.filter_mime_types.append('text/html') msg = email.message_from_string("""\ From: aperson@dom.ain Content-Type: multipart/alternative; boundary=AAA --AAA Content-Type: text/html This is some html --AAA Content-Type: text/plain This is plain text --AAA-- """) MimeDel.process(self._mlist, msg, {}) eq(msg.get_content_type(), 'text/plain') eq(msg.get_payload(), 'This is plain text') def test_recast_multipart(self): eq = self.assertEqual self._mlist.filter_mime_types.append('application/pdf') msg = email.message_from_string("""\ From: aperson@dom.ain MIME-Version: 1.0 Content-type: multipart/mixed; boundary="Boundary_0" --Boundary_0 Content-Type: multipart/mixed; boundary="Boundary_1" --Boundary_1 Content-type: multipart/mixed; boundary="Boundary_2" --Boundary_2 Content-type: multipart/alternative; boundary="Boundary_3" --Boundary_3 Content-type: text/plain; charset=us-ascii Content-transfer-encoding: 7BIT Plain text part --Boundary_3 Content-type: text/html; charset=us-ascii Content-transfer-encoding: 7BIT HTML part --Boundary_3-- --Boundary_2 Content-type: application/pdf Content-transfer-encoding: 7BIT PDF part inner 2 --Boundary_2-- --Boundary_1 Content-type: text/plain; charset=us-ascii Content-transfer-encoding: 7BIT second text --Boundary_1-- --Boundary_0 Content-Type: application/pdf Content-transfer-encoding: 7BIT PDF part outer --Boundary_0-- """) MimeDel.process(self._mlist, msg, {}) payload = msg.get_payload() eq(len(payload), 2) part1 = msg.get_payload(0) eq(part1.get_content_type(), 'text/plain') eq(part1.get_payload(), 'Plain text part') part2 = msg.get_payload(1) eq(part2.get_content_type(), 'text/plain') eq(part2.get_payload(), 'second text') def test_message_rfc822(self): eq = self.assertEqual msg = email.message_from_string("""\ Message-ID: <4D9E6AEA.1060802@example.net> Date: Thu, 07 Apr 2011 18:54:50 -0700 From: User MIME-Version: 1.0 To: Someone Subject: Message Subject Content-Type: multipart/mixed; boundary="------------050603050603060608020908" This is a multi-part message in MIME format. --------------050603050603060608020908 Content-Type: text/plain; charset=ISO-8859-1 Content-Transfer-Encoding: 7bit Plain body. --------------050603050603060608020908 Content-Type: message/rfc822 Content-Transfer-Encoding: 7bit Content-Disposition: attachment Message-ID: <4D9E647F.4050308@example.net> Date: Thu, 07 Apr 2011 18:27:27 -0700 From: User1 MIME-Version: 1.0 To: Someone1 Content-Type: multipart/mixed; boundary="------------060107040402070208020705" Subject: Attached Message 1 Subject This is a multi-part message in MIME format. --------------060107040402070208020705 Content-Type: text/plain; charset=ISO-8859-1 Content-Transfer-Encoding: 7bit Attached Message 1 body. --------------060107040402070208020705 Content-Type: message/rfc822 Content-Transfer-Encoding: 7bit Content-Disposition: attachment From: User2 To: Someone2 Subject: Attached Message 2 Subject Date: Thu, 7 Apr 2011 19:09:35 -0500 Message-ID: MIME-Version: 1.0 Content-Type: multipart/mixed; boundary="----=_NextPart_000_0066_01CBF557.56C6F370" This is a multi-part message in MIME format. ------=_NextPart_000_0066_01CBF557.56C6F370 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Attached Message 2 body. ------=_NextPart_000_0066_01CBF557.56C6F370 Content-Type: message/rfc822 Content-Transfer-Encoding: 7bit Content-Disposition: attachment From: User3 To: Someone3 Subject: Attached Message 3 Subject Date: Thu, 7 Apr 2011 17:22:04 -0500 Message-ID: MIME-Version: 1.0 Content-Type: multipart/alternative; boundary="----=_NextPart_000_0058_01CBF557.56C48270" This is a multi-part message in MIME format. ------=_NextPart_000_0058_01CBF557.56C48270 Content-Type: text/plain; charset="iso-8859-1" Content-Transfer-Encoding: 7bit Attached Message 3 plain body. ------=_NextPart_000_0058_01CBF557.56C48270 Content-Type: text/html; charset="iso-8859-1" Content-Transfer-Encoding: quoted-printable Attached Message 3 html body. ------=_NextPart_000_0058_01CBF557.56C48270-- ------=_NextPart_000_0066_01CBF557.56C6F370 Content-Type: message/rfc822 Content-Transfer-Encoding: 7bit Content-Disposition: attachment From: User4 To: Someone4 Subject: Attached Message 4 Subject Date: Thu, 7 Apr 2011 17:24:26 -0500 Message-ID: <19CC3BDF28CF49AD988FF43B2DBC5F1D@example> MIME-Version: 1.0 Content-Type: multipart/mixed; boundary="----=_NextPart_000_0060_01CBF557.56C6F370" This is a multi-part message in MIME format. ------=_NextPart_000_0060_01CBF557.56C6F370 Content-Type: multipart/alternative; boundary="----=_NextPart_001_0061_01CBF557.56C6F370" ------=_NextPart_001_0061_01CBF557.56C6F370 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Attached Message 4 plain body. ------=_NextPart_001_0061_01CBF557.56C6F370 Content-Type: text/html; charset="us-ascii" Content-Transfer-Encoding: quoted-printable Attached Message 4 html body. ------=_NextPart_001_0061_01CBF557.56C6F370-- ------=_NextPart_000_0060_01CBF557.56C6F370 Content-Type: message/rfc822 Content-Transfer-Encoding: 7bit Content-Disposition: attachment From: User5 To: Someone5 Subject: Attached Message 5 Subject Date: Thu, 7 Apr 2011 16:24:26 -0500 Message-ID: Content-Type: multipart/alternative; boundary="----=_NextPart_000_005C_01CBF557.56C6F370" This is a multi-part message in MIME format. ------=_NextPart_000_005C_01CBF557.56C6F370 Content-Type: text/plain; charset="iso-8859-1" Content-Transfer-Encoding: 7bit Attached Message 5 plain body. ------=_NextPart_000_005C_01CBF557.56C6F370 Content-Type: text/html; charset="iso-8859-1" Content-Transfer-Encoding: quoted-printable Attached Message 5 html body. ------=_NextPart_000_005C_01CBF557.56C6F370-- ------=_NextPart_000_0060_01CBF557.56C6F370 Content-Type: text/plain; name="ATT00055.txt" Content-Transfer-Encoding: quoted-printable Content-Disposition: attachment; filename="ATT00055.txt" Another plain part. ------=_NextPart_000_0060_01CBF557.56C6F370-- ------=_NextPart_000_0066_01CBF557.56C6F370-- --------------060107040402070208020705 Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Content-Disposition: inline Final plain part. --------------060107040402070208020705-- --------------050603050603060608020908-- """) MimeDel.process(self._mlist, msg, {}) payload = msg.get_payload() eq(len(payload), 2) part1 = msg.get_payload(0) eq(part1.get_content_type(), 'text/plain') eq(part1.get_payload(), 'Plain body.\n') part2 = msg.get_payload(1) eq(part2.get_content_type(), 'message/rfc822') payload = part2.get_payload() eq(len(payload), 1) part1 = part2.get_payload(0) eq(part1['subject'], 'Attached Message 1 Subject') eq(part1.get_content_type(), 'multipart/mixed') payload = part1.get_payload() eq(len(payload), 3) part3 = part1.get_payload(2) eq(part3.get_content_type(), 'text/plain') eq(part3.get_payload(), 'Final plain part.\n') part2 = part1.get_payload(1) eq(part2.get_content_type(), 'message/rfc822') part1 = part1.get_payload(0) eq(part1.get_content_type(), 'text/plain') eq(part1.get_payload(), 'Attached Message 1 body.\n') payload = part2.get_payload() eq(len(payload), 1) part1 = part2.get_payload(0) eq(part1['subject'], 'Attached Message 2 Subject') eq(part1.get_content_type(), 'multipart/mixed') payload = part1.get_payload() eq(len(payload), 3) part3 = part1.get_payload(2) eq(part3.get_content_type(), 'message/rfc822') part2 = part1.get_payload(1) eq(part2.get_content_type(), 'message/rfc822') part1 = part1.get_payload(0) eq(part1.get_content_type(), 'text/plain') eq(part1.get_payload(), 'Attached Message 2 body.\n') payload = part2.get_payload() eq(len(payload), 1) part1 = part2.get_payload(0) eq(part1['subject'], 'Attached Message 3 Subject') eq(part1.get_content_type(), 'text/plain') eq(part1.get_payload(), 'Attached Message 3 plain body.\n') payload = part3.get_payload() eq(len(payload), 1) part1 = part3.get_payload(0) eq(part1['subject'], 'Attached Message 4 Subject') eq(part1.get_content_type(), 'multipart/mixed') payload = part1.get_payload() eq(len(payload), 3) part3 = part1.get_payload(2) eq(part3.get_content_type(), 'text/plain') eq(part3.get_filename(), 'ATT00055.txt') eq(part3.get_payload(), 'Another plain part.\n') part2 = part1.get_payload(1) eq(part2.get_content_type(), 'message/rfc822') part1 = part1.get_payload(0) eq(part1.get_content_type(), 'text/plain') eq(part1.get_payload(), 'Attached Message 4 plain body.\n') payload = part2.get_payload() eq(len(payload), 1) part1 = part2.get_payload(0) eq(part1['subject'], 'Attached Message 5 Subject') eq(part1.get_content_type(), 'text/plain') eq(part1.get_payload(), 'Attached Message 5 plain body.\n') class TestModerate(TestBase): pass class TestReplybot(TestBase): pass class TestSpamDetect(TestBase): def test_short_circuit(self): msgdata = {'approved': 1} rtn = SpamDetect.process(self._mlist, None, msgdata) # Not really a great test, but there's little else to assert self.assertEqual(rtn, None) def test_spam_detect(self): msg1 = email.message_from_string("""\ From: aperson@dom.ain A message. """) msg2 = email.message_from_string("""\ To: xlist@dom.ain A message. """) spammers = mm_cfg.KNOWN_SPAMMERS[:] try: mm_cfg.KNOWN_SPAMMERS.append(('from', '.?person')) self.assertRaises(SpamDetect.SpamDetected, SpamDetect.process, self._mlist, msg1, {}) rtn = SpamDetect.process(self._mlist, msg2, {}) self.assertEqual(rtn, None) finally: mm_cfg.KNOWN_SPAMMERS = spammers class TestTagger(TestBase): def setUp(self): TestBase.setUp(self) self._mlist.topics = [('bar fight', '.*bar.*', 'catch any bars', 1)] self._mlist.topics_enabled = 1 def test_short_circuit(self): self._mlist.topics_enabled = 0 rtn = Tagger.process(self._mlist, None, {}) # Not really a great test, but there's little else to assert self.assertEqual(rtn, None) def test_simple(self): eq = self.assertEqual mlist = self._mlist mlist.topics_bodylines_limit = 0 msg = email.message_from_string("""\ Subject: foobar Keywords: barbaz """) msgdata = {} Tagger.process(mlist, msg, msgdata) eq(msg['x-topics'], 'bar fight') eq(msgdata.get('topichits'), ['bar fight']) def test_all_body_lines_plain_text(self): eq = self.assertEqual mlist = self._mlist mlist.topics_bodylines_limit = -1 msg = email.message_from_string("""\ Subject: Was Keywords: Raw Subject: farbaw Keywords: barbaz """) msgdata = {} Tagger.process(mlist, msg, msgdata) eq(msg['x-topics'], 'bar fight') eq(msgdata.get('topichits'), ['bar fight']) def test_no_body_lines(self): eq = self.assertEqual mlist = self._mlist mlist.topics_bodylines_limit = 0 msg = email.message_from_string("""\ Subject: Was Keywords: Raw Subject: farbaw Keywords: barbaz """) msgdata = {} Tagger.process(mlist, msg, msgdata) eq(msg['x-topics'], None) eq(msgdata.get('topichits'), None) def test_body_lines_in_multipart(self): eq = self.assertEqual mlist = self._mlist mlist.topics_bodylines_limit = -1 msg = email.message_from_string("""\ Subject: Was Keywords: Raw Content-Type: multipart/alternative; boundary="BOUNDARY" --BOUNDARY From: sabo To: obas Subject: farbaw Keywords: barbaz --BOUNDARY-- """) msgdata = {} Tagger.process(mlist, msg, msgdata) eq(msg['x-topics'], 'bar fight') eq(msgdata.get('topichits'), ['bar fight']) def test_body_lines_no_part(self): eq = self.assertEqual mlist = self._mlist mlist.topics_bodylines_limit = -1 msg = email.message_from_string("""\ Subject: Was Keywords: Raw Content-Type: multipart/alternative; boundary=BOUNDARY --BOUNDARY From: sabo To: obas Content-Type: message/rfc822 Subject: farbaw Keywords: barbaz --BOUNDARY From: sabo To: obas Content-Type: message/rfc822 Subject: farbaw Keywords: barbaz --BOUNDARY-- """) msgdata = {} Tagger.process(mlist, msg, msgdata) eq(msg['x-topics'], None) eq(msgdata.get('topichits'), None) class TestToArchive(TestBase): def setUp(self): TestBase.setUp(self) # We're going to want to inspect this queue directory self._sb = Switchboard(mm_cfg.ARCHQUEUE_DIR) def tearDown(self): for f in os.listdir(mm_cfg.ARCHQUEUE_DIR): os.unlink(os.path.join(mm_cfg.ARCHQUEUE_DIR, f)) TestBase.tearDown(self) def test_short_circuit(self): eq = self.assertEqual msgdata = {'isdigest': 1} ToArchive.process(self._mlist, None, msgdata) eq(len(self._sb.files()), 0) # Try the other half of the or... self._mlist.archive = 0 ToArchive.process(self._mlist, None, msgdata) eq(len(self._sb.files()), 0) # Now try the various message header shortcuts msg = email.message_from_string("""\ X-No-Archive: YES """) self._mlist.archive = 1 ToArchive.process(self._mlist, msg, {}) eq(len(self._sb.files()), 0) # And for backwards compatibility msg = email.message_from_string("""\ X-Archive: NO """) ToArchive.process(self._mlist, msg, {}) eq(len(self._sb.files()), 0) def test_normal_archiving(self): eq = self.assertEqual msg = email.message_from_string("""\ Subject: About Mailman It rocks! """) ToArchive.process(self._mlist, msg, {}) files = self._sb.files() eq(len(files), 1) msg2, data = self._sb.dequeue(files[0]) eq(len(data), 3) eq(data['_parsemsg'], False) eq(data['version'], 3) # Clock skew makes this unreliable #self.failUnless(data['received_time'] <= time.time()) eq(msg.as_string(unixfrom=0), msg2.as_string(unixfrom=0)) class TestToDigest(TestBase): def _makemsg(self, i=0): msg = email.message_from_string("""From: aperson@dom.ain To: _xtest@dom.ain Subject: message number %(i)d Here is message %(i)d """ % {'i' : i}) return msg def setUp(self): TestBase.setUp(self) self._path = os.path.join(self._mlist.fullpath(), 'digest.mbox') fp = open(self._path, 'w') g = Generator(fp) for i in range(5): g.flatten(self._makemsg(i), unixfrom=1) fp.close() self._sb = Switchboard(mm_cfg.VIRGINQUEUE_DIR) def tearDown(self): try: os.unlink(self._path) except OSError, e: if e.errno <> errno.ENOENT: raise for f in os.listdir(mm_cfg.VIRGINQUEUE_DIR): os.unlink(os.path.join(mm_cfg.VIRGINQUEUE_DIR, f)) TestBase.tearDown(self) def test_short_circuit(self): eq = self.assertEqual mlist = self._mlist mlist.digestable = 0 eq(ToDigest.process(mlist, None, {}), None) mlist.digestable = 1 eq(ToDigest.process(mlist, None, {'isdigest': 1}), None) eq(self._sb.files(), []) def test_undersized(self): msg = self._makemsg(99) size = os.path.getsize(self._path) + len(str(msg)) self._mlist.digest_size_threshhold = (size + 1) * 1024 ToDigest.process(self._mlist, msg, {}) self.assertEqual(self._sb.files(), []) def test_send_a_digest(self): eq = self.assertEqual mlist = self._mlist msg = self._makemsg(99) size = os.path.getsize(self._path) + len(str(msg)) # Set digest_size_threshhold to a very small value to force a digest. # Setting to zero no longer works. mlist.digest_size_threshhold = 0.001 ToDigest.process(mlist, msg, {}) files = self._sb.files() # There should be two files in the queue, one for the MIME digest and # one for the RFC 1153 digest. eq(len(files), 2) # Now figure out which of the two files is the MIME digest and which # is the RFC 1153 digest. for filebase in files: qmsg, qdata = self._sb.dequeue(filebase) if qmsg.get_content_maintype() == 'multipart': mimemsg = qmsg mimedata = qdata else: rfc1153msg = qmsg rfc1153data = qdata eq(rfc1153msg.get_content_type(), 'text/plain') eq(mimemsg.get_content_type(), 'multipart/mixed') eq(mimemsg['from'], mlist.GetRequestEmail()) eq(mimemsg['subject'], '%(realname)s Digest, Vol %(volume)d, Issue %(issue)d' % { 'realname': mlist.real_name, 'volume' : mlist.volume, 'issue' : mlist.next_digest_number - 1, }) eq(mimemsg['to'], mlist.GetListEmail()) # BAW: this test is incomplete... class TestToOutgoing(TestBase): def setUp(self): TestBase.setUp(self) # We're going to want to inspect this queue directory self._sb = Switchboard(mm_cfg.OUTQUEUE_DIR) def tearDown(self): for f in os.listdir(mm_cfg.OUTQUEUE_DIR): os.unlink(os.path.join(mm_cfg.OUTQUEUE_DIR, f)) TestBase.tearDown(self) def test_outgoing(self): eq = self.assertEqual msg = email.message_from_string("""\ Subject: About Mailman It rocks! """) msgdata = {'foo': 1, 'bar': 2} ToOutgoing.process(self._mlist, msg, msgdata) files = self._sb.files() eq(len(files), 1) msg2, data = self._sb.dequeue(files[0]) eq(msg.as_string(unixfrom=0), msg2.as_string(unixfrom=0)) self.failUnless(len(data) >= 6 and len(data) <= 7) eq(data['foo'], 1) eq(data['bar'], 2) eq(data['version'], 3) eq(data['listname'], '_xtest') eq(data['_parsemsg'], False) # Can't test verp. presence/value depend on mm_cfg.py #eq(data['verp'], 1) # Clock skew makes this unreliable #self.failUnless(data['received_time'] <= time.time()) class TestToUsenet(TestBase): def setUp(self): TestBase.setUp(self) # We're going to want to inspect this queue directory self._sb = Switchboard(mm_cfg.NEWSQUEUE_DIR) def tearDown(self): for f in os.listdir(mm_cfg.NEWSQUEUE_DIR): os.unlink(os.path.join(mm_cfg.NEWSQUEUE_DIR, f)) TestBase.tearDown(self) def test_short_circuit(self): eq = self.assertEqual mlist = self._mlist mlist.gateway_to_news = 0 ToUsenet.process(mlist, None, {}) eq(len(self._sb.files()), 0) mlist.gateway_to_news = 1 ToUsenet.process(mlist, None, {'isdigest': 1}) eq(len(self._sb.files()), 0) ToUsenet.process(mlist, None, {'fromusenet': 1}) eq(len(self._sb.files()), 0) def test_to_usenet(self): # BAW: Should we, can we, test the error conditions that only log to a # file instead of raising an exception? eq = self.assertEqual mlist = self._mlist mlist.gateway_to_news = 1 mlist.linked_newsgroup = 'foo' mlist.nntp_host = 'bar' msg = email.message_from_string("""\ Subject: About Mailman Mailman rocks! """) ToUsenet.process(mlist, msg, {}) files = self._sb.files() eq(len(files), 1) msg2, data = self._sb.dequeue(files[0]) eq(msg.as_string(unixfrom=0), msg2.as_string(unixfrom=0)) eq(data['version'], 3) eq(data['listname'], '_xtest') # Clock skew makes this unreliable #self.failUnless(data['received_time'] <= time.time()) def suite(): suite = unittest.TestSuite() suite.addTest(unittest.makeSuite(TestAcknowledge)) suite.addTest(unittest.makeSuite(TestAfterDelivery)) suite.addTest(unittest.makeSuite(TestApprove)) suite.addTest(unittest.makeSuite(TestCalcRecips)) suite.addTest(unittest.makeSuite(TestCleanse)) suite.addTest(unittest.makeSuite(TestCookHeaders)) suite.addTest(unittest.makeSuite(TestDecorate)) suite.addTest(unittest.makeSuite(TestFileRecips)) suite.addTest(unittest.makeSuite(TestHold)) suite.addTest(unittest.makeSuite(TestMimeDel)) suite.addTest(unittest.makeSuite(TestModerate)) suite.addTest(unittest.makeSuite(TestReplybot)) suite.addTest(unittest.makeSuite(TestSpamDetect)) suite.addTest(unittest.makeSuite(TestTagger)) suite.addTest(unittest.makeSuite(TestToArchive)) suite.addTest(unittest.makeSuite(TestToDigest)) suite.addTest(unittest.makeSuite(TestToOutgoing)) suite.addTest(unittest.makeSuite(TestToUsenet)) return suite if __name__ == '__main__': unittest.main(defaultTest='suite')