1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
Auth logging tests that exercise SamLogon.
24 from samba
.samdb
import SamDB
25 import samba
.tests
.auth_log_base
26 from samba
.credentials
import (
31 from samba
.dcerpc
import ntlmssp
, netlogon
32 from samba
.dcerpc
.dcerpc
import AS_SYSTEM_MAGIC_PATH_TOKEN
33 from samba
.ndr
import ndr_pack
34 from samba
.auth
import system_session
35 from samba
.tests
import delete_force
36 from samba
.dsdb
import UF_WORKSTATION_TRUST_ACCOUNT
, UF_PASSWD_NOTREQD
37 from samba
.dcerpc
.misc
import SEC_CHAN_WKSTA
38 from samba
.compat
import text_type
39 from samba
.dcerpc
.windows_event_ids
import (
40 EVT_ID_SUCCESSFUL_LOGON
,
class AuthLogTestsSamLogon(samba.tests.auth_log_base.AuthLogTestBase):
    """Auth-log tests for a SamLogon network authentication over ncalrpc.

    Each test creates a workstation trust account, opens a netlogon
    connection with an "[schannel...]" ncalrpc binding using that account,
    issues netr_LogonSamLogonEx with an NTLMv2 response and then checks the
    auth log messages that were collected.
    """

    def setUp(self):
        """Open a SamDB connection and record the names used by the tests.

        Reads the DOMAIN environment variable; a KeyError is raised if it
        is not set.
        """
        super(AuthLogTestsSamLogon, self).setUp()
        self.lp = samba.tests.env_loadparm()
        self.creds = Credentials()

        self.session = system_session()
        self.ldb = SamDB(
            session_info=self.session,
            credentials=self.creds,
            lp=self.lp)

        self.domain = os.environ["DOMAIN"]
        self.netbios_name = "SamLogonTest"
        self.machinepass = "abcdefghij"
        # NOTE(review): presumably consumed by AuthLogTestBase when it
        # filters messages by remote address - confirm against the base
        # class.
        self.remoteAddress = AS_SYSTEM_MAGIC_PATH_TOKEN
        self.base_dn = self.ldb.domain_dn()
        self.samlogon_dn = ("cn=%s,cn=users,%s" %
                            (self.netbios_name, self.base_dn))

    def tearDown(self):
        """Run the base class tearDown, then delete the test account."""
        super(AuthLogTestsSamLogon, self).tearDown()
        delete_force(self.ldb, self.samlogon_dn)

    @staticmethod
    def _byte_values(data):
        """Return the items of *data* as a list of ints.

        Iterating a byte string yields ints on Python 3 but one-character
        strings on Python 2, so any non-int item is converted with ord().
        """
        return [x if isinstance(x, int) else ord(x) for x in data]

    def _test_samlogon(self, binding, creds, checkFunction):
        """Perform one SamLogon and hand the collected messages to a checker.

        :param binding: extra ncalrpc binding option(s) placed after
            "schannel" in the binding string (e.g. "SEAL"); a false value
            selects plain "[schannel]".
        :param creds: credentials of the user being authenticated; they
            supply the NTLMv2 response, user name, domain and workstation.
        :param checkFunction: callable invoked with the list of messages
            returned by waitForMessages().
        """

        def isLastExpectedMessage(msg):
            """True for the successful SamLogon network NTLMv2 logon."""
            return (
                msg["type"] == "Authentication" and
                msg["Authentication"]["serviceDescription"] == "SamLogon" and
                msg["Authentication"]["authDescription"] == "network" and
                msg["Authentication"]["passwordType"] == "NTLMv2" and
                (msg["Authentication"]["eventId"] ==
                    EVT_ID_SUCCESSFUL_LOGON) and
                (msg["Authentication"]["logonType"] == EVT_LOGON_NETWORK))

        if binding:
            binding = "[schannel,%s]" % binding
        else:
            binding = "[schannel]"

        # unicodePwd is supplied as the quoted password in UTF-16-LE.
        utf16pw = text_type('"' + self.machinepass + '"').encode('utf-16-le')
        self.ldb.add({
            "dn": self.samlogon_dn,
            "objectclass": "computer",
            "sAMAccountName": "%s$" % self.netbios_name,
            "userAccountControl":
                str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
            "unicodePwd": utf16pw})

        # Credentials of the machine account that was just created; these
        # are the ones used for the netlogon connection itself.
        machine_creds = Credentials()
        machine_creds.guess(self.get_loadparm())
        machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
        machine_creds.set_password(self.machinepass)
        machine_creds.set_username(self.netbios_name + "$")

        netlogon_conn = netlogon.netlogon("ncalrpc:%s" % binding,
                                          self.get_loadparm(),
                                          machine_creds)
        challenge = b"abcdefgh"

        # Target info handed to get_ntlm_response(): domain name, computer
        # name and the end-of-list marker.
        target_info = ntlmssp.AV_PAIR_LIST()
        target_info.count = 3

        domainname = ntlmssp.AV_PAIR()
        domainname.AvId = ntlmssp.MsvAvNbDomainName
        domainname.Value = self.domain

        computername = ntlmssp.AV_PAIR()
        computername.AvId = ntlmssp.MsvAvNbComputerName
        computername.Value = self.netbios_name

        eol = ntlmssp.AV_PAIR()
        eol.AvId = ntlmssp.MsvAvEOL
        target_info.pair = [domainname, computername, eol]

        target_info_blob = ndr_pack(target_info)

        response = creds.get_ntlm_response(flags=CLI_CRED_NTLMv2_AUTH,
                                           challenge=challenge,
                                           target_info=target_info_blob)

        netr_flags = 0

        logon_level = netlogon.NetlogonNetworkTransitiveInformation
        logon = netlogon.netr_NetworkInfo()

        logon.challenge = self._byte_values(challenge)
        logon.nt = netlogon.netr_ChallengeResponse()
        logon.nt.length = len(response["nt_response"])
        logon.nt.data = self._byte_values(response["nt_response"])

        logon.identity_info = netlogon.netr_IdentityInfo()
        (username, domain) = creds.get_ntlm_username_domain()

        logon.identity_info.domain_name.string = domain
        logon.identity_info.account_name.string = username
        logon.identity_info.workstation.string = creds.get_workstation()

        validation_level = netlogon.NetlogonValidationSamInfo4

        result = netlogon_conn.netr_LogonSamLogonEx(
            os.environ["SERVER"],
            machine_creds.get_workstation(),
            logon_level, logon,
            validation_level, netr_flags)

        # The values are not inspected, but the unpacking fails loudly if
        # the call does not return exactly three results.
        (validation, authoritative, netr_flags_out) = result

        messages = self.waitForMessages(isLastExpectedMessage, netlogon_conn)
        checkFunction(messages)

    def samlogon_check(self, messages):
        """Check the messages collected for a SamLogon over ncalrpc.

        After remove_netlogon_messages() has filtered the list, exactly
        five messages are expected, and the first must be a DCE/RPC
        Authorization over ncalrpc with no transport protection.
        """

        messages = self.remove_netlogon_messages(messages)
        expected_messages = 5
        self.assertEqual(expected_messages,
                         len(messages),
                         "Did not receive the expected number of messages")

        # Check the first message; it should be an Authorization.
        msg = messages[0]
        self.assertEqual("Authorization", msg["type"])
        self.assertEqual("DCE/RPC",
                         msg["Authorization"]["serviceDescription"])
        self.assertEqual("ncalrpc", msg["Authorization"]["authType"])
        self.assertEqual("NONE", msg["Authorization"]["transportProtection"])
        self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))

    def test_ncalrpc_samlogon(self):
        # SamLogon over ncalrpc with the "SEAL" binding option, using
        # credentials that have Kerberos disabled.  (A comment rather than
        # a docstring, so the description reported by test runners does
        # not change.)

        creds = self.insta_creds(template=self.get_credentials(),
                                 kerberos_state=DONT_USE_KERBEROS)
        try:
            self._test_samlogon("SEAL", creds, self.samlogon_check)
        except Exception as e:
            # Any exception is reported as a test failure, not an error.
            self.fail("Unexpected exception: " + str(e))