python:tests: Fix typo
[Samba.git] / python / samba / tests / emulate / traffic.py
blob63fbd10339599f4d7780f05956da7711138d4c35
1 # Unit and integration tests for traffic.py
3 # Copyright (C) Catalyst IT Ltd. 2017
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # from pprint import pprint
19 from io import StringIO
21 import samba.tests
23 from samba.emulate import traffic
26 TEST_FILE = 'testdata/traffic-sample-very-short.txt'
29 class TrafficEmulatorTests(samba.tests.TestCase):
30 def setUp(self):
31 self.model = traffic.TrafficModel()
33 def tearDown(self):
34 del self.model
36 def test_parse_ngrams_dns_included(self):
37 model = traffic.TrafficModel()
38 f = open(TEST_FILE)
39 (conversations,
40 interval,
41 duration,
42 dns_counts) = traffic.ingest_summaries([f], dns_mode='include')
43 f.close()
44 model.learn(conversations)
45 expected_ngrams = {
46 ('-', '-'): ['dns:0', 'dns:0', 'dns:0', 'ldap:3'],
47 ('-', 'dns:0'): ['dns:0', 'dns:0', 'dns:0'],
48 ('-', 'ldap:3'): ['wait:0'],
49 ('cldap:3', 'cldap:3'): ['cldap:3', 'wait:0'],
50 ('cldap:3', 'wait:0'): ['rpc_netlogon:29'],
51 ('dns:0', 'dns:0'): ['dns:0', 'dns:0', 'dns:0', 'wait:0'],
52 ('dns:0', 'wait:0'): ['cldap:3'],
53 ('kerberos:', 'ldap:3'): ['-'],
54 ('ldap:3', 'wait:0'): ['ldap:2'],
55 ('rpc_netlogon:29', 'kerberos:'): ['ldap:3'],
56 ('wait:0', 'cldap:3'): ['cldap:3'],
57 ('wait:0', 'rpc_netlogon:29'): ['kerberos:']
59 expected_query_details = {
60 'cldap:3': [('', '', '', 'Netlogon', '', '', ''),
61 ('', '', '', 'Netlogon', '', '', ''),
62 ('', '', '', 'Netlogon', '', '', '')],
63 'dns:0': [(), (), (), (), (), (), (), (), ()],
64 'kerberos:': [('',)],
65 'ldap:2': [('', '', '', '', '', '', '')],
66 'ldap:3': [('',
67 '',
68 '',
69 'subschemaSubentry,dsServiceName,namingContexts,'
70 'defaultNamingContext,schemaNamingContext,'
71 'configurationNamingContext,rootDomainNamingContext,'
72 'supportedControl,supportedLDAPVersion,'
73 'supportedLDAPPolicies,supportedSASLMechanisms,'
74 'dnsHostName,ldapServiceName,serverName,'
75 'supportedCapabilities',
76 '',
77 '',
78 ''),
79 ('2', 'DC,DC', '', 'cn', '', '', '')],
80 'rpc_netlogon:29': [()]
82 self.maxDiff = 5000
83 ngrams = {k: sorted(v) for k, v in model.ngrams.items()}
84 details = {k: sorted(v) for k, v in model.query_details.items()}
86 self.assertEqual(expected_ngrams, ngrams)
87 self.assertEqual(expected_query_details, details)
88 # We use a stringIO instead of a temporary file
89 f = StringIO()
90 model.save(f)
92 model2 = traffic.TrafficModel()
93 f.seek(0)
94 model2.load(f)
96 ngrams = {k: sorted(v) for k, v in model2.ngrams.items()}
97 details = {k: sorted(v) for k, v in model2.query_details.items()}
98 self.assertEqual(expected_ngrams, ngrams)
99 self.assertEqual(expected_query_details, details)
101 def test_parse_ngrams(self):
102 f = open(TEST_FILE)
103 (conversations,
104 interval,
105 duration,
106 dns_counts) = traffic.ingest_summaries([f])
107 f.close()
108 self.model.learn(conversations, dns_counts)
109 # print 'ngrams'
110 # pprint(self.model.ngrams, width=50)
111 # print 'query_details'
112 # pprint(self.model.query_details, width=55)
113 expected_ngrams = {
114 ('-', '-'): ['cldap:3', 'ldap:3'],
115 ('-', 'cldap:3'): ['cldap:3'],
116 ('-', 'ldap:3'): ['wait:0'],
117 ('cldap:3', 'cldap:3'): ['cldap:3', 'wait:0'],
118 ('cldap:3', 'wait:0'): ['rpc_netlogon:29'],
119 ('kerberos:', 'ldap:3'): ['-'],
120 ('ldap:3', 'wait:0'): ['ldap:2'],
121 ('rpc_netlogon:29', 'kerberos:'): ['ldap:3'],
122 ('wait:0', 'rpc_netlogon:29'): ['kerberos:']
125 expected_query_details = {
126 'cldap:3': [('', '', '', 'Netlogon', '', '', ''),
127 ('', '', '', 'Netlogon', '', '', ''),
128 ('', '', '', 'Netlogon', '', '', '')],
129 'kerberos:': [('',)],
130 'ldap:2': [('', '', '', '', '', '', '')],
131 'ldap:3': [('',
134 'subschemaSubentry,dsServiceName,namingContexts,'
135 'defaultNamingContext,schemaNamingContext,'
136 'configurationNamingContext,rootDomainNamingContext,'
137 'supportedControl,supportedLDAPVersion,'
138 'supportedLDAPPolicies,supportedSASLMechanisms,'
139 'dnsHostName,ldapServiceName,serverName,'
140 'supportedCapabilities',
143 ''),
144 ('2', 'DC,DC', '', 'cn', '', '', '')],
145 'rpc_netlogon:29': [()]
147 self.maxDiff = 5000
148 ngrams = {k: sorted(v) for k, v in self.model.ngrams.items()}
149 details = {k: sorted(v) for k, v in self.model.query_details.items()}
151 self.assertEqual(expected_ngrams, ngrams)
152 self.assertEqual(expected_query_details, details)
153 # We use a stringIO instead of a temporary file
154 f = StringIO()
155 self.model.save(f)
157 model2 = traffic.TrafficModel()
158 f.seek(0)
159 model2.load(f)
161 ngrams = {k: sorted(v) for k, v in model2.ngrams.items()}
162 details = {k: sorted(v) for k, v in model2.query_details.items()}
163 self.assertEqual(expected_ngrams, ngrams)
164 self.assertEqual(expected_query_details, details)