Fix header match rule suffix inflation.
[mailman.git] / src / mailman / chains / tests / test_headers.py
blob 6cd37c98868c4548cf61ab10852d50049b449d8f
1 # Copyright (C) 2012-2016 by the Free Software Foundation, Inc.
3 # This file is part of GNU Mailman.
5 # GNU Mailman is free software: you can redistribute it and/or modify it under
6 # the terms of the GNU General Public License as published by the Free
7 # Software Foundation, either version 3 of the License, or (at your option)
8 # any later version.
10 # GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 # more details.
15 # You should have received a copy of the GNU General Public License along with
16 # GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
18 """Test the header chain."""
20 import unittest
22 from mailman.app.lifecycle import create_list
23 from mailman.chains.headers import HeaderMatchRule, make_link
24 from mailman.config import config
25 from mailman.core.chains import process
26 from mailman.email.message import Message
27 from mailman.interfaces.chain import DiscardEvent, HoldEvent, LinkAction
28 from mailman.interfaces.mailinglist import IHeaderMatchList
29 from mailman.testing.helpers import (
30 LogFileMark, configuration, event_subscribers,
31 specialized_message_from_string as mfs)
32 from mailman.testing.layers import ConfigLayer
class TestHeaderChain(unittest.TestCase):
    """Test the header chain code.

    Every test gets a freshly created ``test@example.com`` mailing list and
    inspects, or runs a message through, the ``header-match`` chain taken
    from ``config.chains``.

    NOTE(review): the multi-line ``header_checks`` values below are written
    with indented continuation lines, on the assumption that the
    configuration parser needs the leading whitespace to treat them as one
    value -- confirm against the ``configuration`` helper.
    """

    layer = ConfigLayer

    def setUp(self):
        self._mlist = create_list('test@example.com')

    # Helpers.

    def _site_wide_checks(self, chain):
        """Walk the chain's links and return their (header, pattern) pairs.

        While walking, assert that every link in front of the 'any' link is
        a deferring rule whose name starts with 'header-match-', that the
        'any' link itself jumps, and that no link follows the 'any' link.
        The 'any' link is not included in the returned list.
        """
        prefix = 'header-match-'
        checks = []
        saw_any_rule = False
        for link in chain.get_links(self._mlist, Message(), {}):
            if link.rule.name == 'any':
                saw_any_rule = True
                self.assertEqual(link.action, LinkAction.jump)
            elif saw_any_rule:
                self.fail("'any' rule was not last")
            else:
                # The rule names share a prefix but have a variable suffix,
                # so only the prefix is compared.
                self.assertEqual(link.rule.name[:len(prefix)], prefix)
                self.assertEqual(link.action, LinkAction.defer)
                checks.append((link.rule.header, link.rule.pattern))
        return checks

    def _non_any_links(self, chain):
        """Return the chain's links for the test list, minus the 'any' link."""
        return [
            link for link in chain.get_links(self._mlist, Message(), {})
            if link.rule.name != 'any'
            ]

    def _process_recording(self, msg, jump_chain, event_class):
        """Run `msg` through the header-match chain and return its events.

        The site-wide [antispam] section is overridden for the duration of
        the call so that `jump_chain` is the site-wide action.  Only events
        that are instances of `event_class` are returned.
        """
        events = []

        def record(event):
            if isinstance(event, event_class):
                events.append(event)

        with event_subscribers(record):
            # The site-wide check looks at a Spam header, which the messages
            # used by the callers in this class do not carry.
            with configuration('antispam', header_checks="""
            Spam: [*]{3,}
            """, jump_chain=jump_chain):    # noqa
                process(self._mlist, msg, {}, start_chain='header-match')
        return events

    # Tests.

    def test_make_link(self):
        # Test that make_link() with no given chain creates a Link with a
        # deferred link action.
        link = make_link('Subject', '[tT]esting')
        self.assertEqual(link.rule.header, 'Subject')
        self.assertEqual(link.rule.pattern, '[tT]esting')
        self.assertEqual(link.action, LinkAction.defer)
        self.assertIsNone(link.chain)

    def test_make_link_with_chain(self):
        # Test that make_link() with a given chain creates a Link with a jump
        # action to the chain.
        link = make_link('Subject', '[tT]esting', 'accept')
        self.assertEqual(link.rule.header, 'Subject')
        self.assertEqual(link.rule.pattern, '[tT]esting')
        self.assertEqual(link.action, LinkAction.jump)
        self.assertEqual(link.chain, config.chains['accept'])

    @configuration('antispam', header_checks="""
    Foo: a+
    Bar: bb?
    """)
    def test_config_checks(self):
        # Test that the header-match chain has the header checks from the
        # configuration file, in the order they were configured.  The
        # per-link assertions (rule name prefix, defer action, 'any' last)
        # are made by the helper while it collects the checks.
        chain = config.chains['header-match']
        self.assertListEqual(self._site_wide_checks(chain), [
            ('Foo', 'a+'),
            ('Bar', 'bb?'),
            ])

    @configuration('antispam', header_checks="""
    Foo: foo
    A-bad-line
    Bar: bar
    """)
    def test_bad_configuration_line(self):
        # Take a mark on the error log file.
        mark = LogFileMark('mailman.error')
        # A bad value in [antispam]header_checks should just get ignored, but
        # with an error message logged.  The good lines around it must still
        # produce their links.
        chain = config.chains['header-match']
        self.assertListEqual(self._site_wide_checks(chain), [
            ('Foo', 'foo'),
            ('Bar', 'bar'),
            ])
        # Check the error log.  Only the tail of the logged line is compared;
        # whatever precedes the message itself is ignored.
        expected = ('Configuration error: [antispam]header_checks '
                    'contains bogus line: A-bad-line')
        line = mark.readline().rstrip('\n')
        self.assertEqual(line[-len(expected):], expected)

    def test_duplicate_header_match_rule(self):
        # 100% coverage: test an assertion in a corner case.  Creating a
        # second rule for the same header with the same suffix must raise an
        # AssertionError.
        #
        # Save the existing rules so they can be restored later.
        saved_rules = config.rules.copy()
        self.addCleanup(setattr, config, 'rules', saved_rules)
        HeaderMatchRule('x-spam-score', '*', suffix='100')
        with self.assertRaises(AssertionError):
            HeaderMatchRule('x-spam-score', '.*', suffix='100')

    def test_list_rule(self):
        # Test that the header-match chain has the header checks from the
        # mailing-list configuration.
        chain = config.chains['header-match']
        header_matches = IHeaderMatchList(self._mlist)
        header_matches.append('Foo', 'a+')
        links = self._non_any_links(chain)
        self.assertEqual(len(links), 1)
        link = links[0]
        # No action was given for the list's check, so the link is expected
        # to jump to the site-wide antispam chain.
        self.assertEqual(link.action, LinkAction.jump)
        self.assertEqual(link.chain.name, config.antispam.jump_chain)
        # The header was appended as 'Foo' but is expected back lower-cased.
        self.assertEqual(link.rule.header, 'foo')
        self.assertEqual(link.rule.pattern, 'a+')
        self.assertTrue(link.rule.name.startswith(
            'header-match-test.example.com-'))

    def test_list_complex_rule(self):
        # Test that the mailing-list header-match complex rules are read
        # properly: each check jumps to the chain named when it was appended.
        chain = config.chains['header-match']
        header_matches = IHeaderMatchList(self._mlist)
        header_matches.append('Foo', 'a+', 'reject')
        header_matches.append('Bar', 'b+', 'discard')
        header_matches.append('Baz', 'z+', 'accept')
        links = self._non_any_links(chain)
        self.assertEqual(len(links), 3)
        self.assertEqual([
            (link.rule.header, link.rule.pattern, link.action, link.chain.name)
            for link in links
            ],
            [('foo', 'a+', LinkAction.jump, 'reject'),
             ('bar', 'b+', LinkAction.jump, 'discard'),
             ('baz', 'z+', LinkAction.jump, 'accept'),
            ])                                                      # noqa

    @configuration('antispam', header_checks="""
    Foo: foo
    """, jump_chain='hold')
    def test_priority_site_over_list(self):
        # Test that the site-wide checks take precedence over the list-specific
        # checks.
        msg = mfs("""\
From: anne@example.com
To: test@example.com
Subject: A message
Message-ID: <ant>
Foo: foo
MIME-Version: 1.0

A message body.
""")
        msgdata = {}
        header_matches = IHeaderMatchList(self._mlist)
        header_matches.append('Foo', 'foo', 'accept')
        # This event subscriber records the event that occurs when the message
        # is processed by the header-match chain.
        events = []
        with event_subscribers(events.append):
            process(self._mlist, msg, msgdata, start_chain='header-match')
        self.assertEqual(len(events), 1)
        event = events[0]
        # Site-wide wants to hold the message, the list wants to accept it.
        self.assertIsInstance(event, HoldEvent)
        self.assertEqual(event.chain, config.chains['hold'])

    def test_no_action_defaults_to_site_wide_action(self):
        # If the list-specific header check matches, but there is no defined
        # action, the site-wide antispam action is used.
        msg = mfs("""\
From: anne@example.com
To: test@example.com
Subject: A message
Message-ID: <ant>
Foo: foo
MIME-Version: 1.0

A message body.
""")
        header_matches = IHeaderMatchList(self._mlist)
        header_matches.append('Foo', 'foo')
        # Set the site-wide antispam action to hold the message, and record
        # only the hold events raised while the chain runs.
        events = self._process_recording(msg, 'hold', HoldEvent)
        self.assertEqual(len(events), 1)
        event = events[0]
        self.assertIsInstance(event, HoldEvent)
        self.assertEqual(event.chain, config.chains['hold'])
        self.assertEqual(event.mlist, self._mlist)
        self.assertEqual(event.msg, msg)
        # Set the site-wide default to discard the message instead.  The
        # message gets a new Message-Id before it is processed a second time.
        msg.replace_header('Message-Id', '<bee>')
        events = self._process_recording(msg, 'discard', DiscardEvent)
        self.assertEqual(len(events), 1)
        event = events[0]
        self.assertIsInstance(event, DiscardEvent)
        self.assertEqual(event.chain, config.chains['discard'])
        self.assertEqual(event.mlist, self._mlist)
        self.assertEqual(event.msg, msg)

    @configuration('antispam', header_checks="""
    Header1: a+
    """, jump_chain='hold')
    def test_reuse_rules(self):
        # Test that existing header-match rules are used instead of creating
        # new ones.  One check comes from the site configuration and two from
        # the mailing list, hence three links.
        chain = config.chains['header-match']
        header_matches = IHeaderMatchList(self._mlist)
        header_matches.append('Header2', 'b+')
        header_matches.append('Header3', 'c+')
        links_1 = self._non_any_links(chain)
        self.assertEqual(len(links_1), 3)
        links_2 = self._non_any_links(chain)
        # The link rules both have the same name...
        self.assertEqual(
            [link.rule.name for link in links_1],
            [link.rule.name for link in links_2],
            )
        # ...and are actually the identical objects.
        for link1, link2 in zip(links_1, links_2):
            self.assertIs(link1.rule, link2.rule)