1 # Unit and integration tests for traffic.py
3 # Copyright (C) Catalyst IT Ltd. 2017
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # from pprint import pprint
19 from io
import StringIO
23 from samba
.emulate
import traffic
26 TEST_FILE
= 'testdata/traffic-sample-very-short.txt'
29 class TrafficEmulatorTests(samba
.tests
.TestCase
):
31 self
.model
= traffic
.TrafficModel()
36 def test_parse_ngrams_dns_included(self
):
37 model
= traffic
.TrafficModel()
42 dns_counts
) = traffic
.ingest_summaries([f
], dns_mode
='include')
44 model
.learn(conversations
)
46 ('-', '-'): ['dns:0', 'dns:0', 'dns:0', 'ldap:3'],
47 ('-', 'dns:0'): ['dns:0', 'dns:0', 'dns:0'],
48 ('-', 'ldap:3'): ['wait:0'],
49 ('cldap:3', 'cldap:3'): ['cldap:3', 'wait:0'],
50 ('cldap:3', 'wait:0'): ['rpc_netlogon:29'],
51 ('dns:0', 'dns:0'): ['dns:0', 'dns:0', 'dns:0', 'wait:0'],
52 ('dns:0', 'wait:0'): ['cldap:3'],
53 ('kerberos:', 'ldap:3'): ['-'],
54 ('ldap:3', 'wait:0'): ['ldap:2'],
55 ('rpc_netlogon:29', 'kerberos:'): ['ldap:3'],
56 ('wait:0', 'cldap:3'): ['cldap:3'],
57 ('wait:0', 'rpc_netlogon:29'): ['kerberos:']
59 expected_query_details
= {
60 'cldap:3': [('', '', '', 'Netlogon', '', '', ''),
61 ('', '', '', 'Netlogon', '', '', ''),
62 ('', '', '', 'Netlogon', '', '', '')],
63 'dns:0': [(), (), (), (), (), (), (), (), ()],
65 'ldap:2': [('', '', '', '', '', '', '')],
69 'subschemaSubentry,dsServiceName,namingContexts,'
70 'defaultNamingContext,schemaNamingContext,'
71 'configurationNamingContext,rootDomainNamingContext,'
72 'supportedControl,supportedLDAPVersion,'
73 'supportedLDAPPolicies,supportedSASLMechanisms,'
74 'dnsHostName,ldapServiceName,serverName,'
75 'supportedCapabilities',
79 ('2', 'DC,DC', '', 'cn', '', '', '')],
80 'rpc_netlogon:29': [()]
83 ngrams
= {k
: sorted(v
) for k
, v
in model
.ngrams
.items()}
84 details
= {k
: sorted(v
) for k
, v
in model
.query_details
.items()}
86 self
.assertEqual(expected_ngrams
, ngrams
)
87 self
.assertEqual(expected_query_details
, details
)
88 # We use a stringIO instead of a temporary file
92 model2
= traffic
.TrafficModel()
96 ngrams
= {k
: sorted(v
) for k
, v
in model2
.ngrams
.items()}
97 details
= {k
: sorted(v
) for k
, v
in model2
.query_details
.items()}
98 self
.assertEqual(expected_ngrams
, ngrams
)
99 self
.assertEqual(expected_query_details
, details
)
101 def test_parse_ngrams(self
):
106 dns_counts
) = traffic
.ingest_summaries([f
])
108 self
.model
.learn(conversations
, dns_counts
)
110 # pprint(self.model.ngrams, width=50)
111 # print 'query_details'
112 # pprint(self.model.query_details, width=55)
114 ('-', '-'): ['cldap:3', 'ldap:3'],
115 ('-', 'cldap:3'): ['cldap:3'],
116 ('-', 'ldap:3'): ['wait:0'],
117 ('cldap:3', 'cldap:3'): ['cldap:3', 'wait:0'],
118 ('cldap:3', 'wait:0'): ['rpc_netlogon:29'],
119 ('kerberos:', 'ldap:3'): ['-'],
120 ('ldap:3', 'wait:0'): ['ldap:2'],
121 ('rpc_netlogon:29', 'kerberos:'): ['ldap:3'],
122 ('wait:0', 'rpc_netlogon:29'): ['kerberos:']
125 expected_query_details
= {
126 'cldap:3': [('', '', '', 'Netlogon', '', '', ''),
127 ('', '', '', 'Netlogon', '', '', ''),
128 ('', '', '', 'Netlogon', '', '', '')],
129 'kerberos:': [('',)],
130 'ldap:2': [('', '', '', '', '', '', '')],
134 'subschemaSubentry,dsServiceName,namingContexts,'
135 'defaultNamingContext,schemaNamingContext,'
136 'configurationNamingContext,rootDomainNamingContext,'
137 'supportedControl,supportedLDAPVersion,'
138 'supportedLDAPPolicies,supportedSASLMechanisms,'
139 'dnsHostName,ldapServiceName,serverName,'
140 'supportedCapabilities',
144 ('2', 'DC,DC', '', 'cn', '', '', '')],
145 'rpc_netlogon:29': [()]
148 ngrams
= {k
: sorted(v
) for k
, v
in self
.model
.ngrams
.items()}
149 details
= {k
: sorted(v
) for k
, v
in self
.model
.query_details
.items()}
151 self
.assertEqual(expected_ngrams
, ngrams
)
152 self
.assertEqual(expected_query_details
, details
)
153 # We use a stringIO instead of a temporary file
157 model2
= traffic
.TrafficModel()
161 ngrams
= {k
: sorted(v
) for k
, v
in model2
.ngrams
.items()}
162 details
= {k
: sorted(v
) for k
, v
in model2
.query_details
.items()}
163 self
.assertEqual(expected_ngrams
, ngrams
)
164 self
.assertEqual(expected_query_details
, details
)