Edit changelog a little for clarity and conciseness
[tor.git] / src / test / ntor_ref.py
blobdf065853f36be866e16a60fa988b976b0b42a7ce
1 #!/usr/bin/python
2 # Copyright 2012-2015, The Tor Project, Inc
3 # See LICENSE for licensing information
5 """
6 ntor_ref.py
9 This module is a reference implementation for the "ntor" protocol
10 as proposed by Goldberg, Stebila, and Ustaoglu and as instantiated in
11 Tor Proposal 216.
13 It's meant to be used to validate Tor's ntor implementation. It
14 requires the curve25519 python module from the curve25519-donna
15 package.
17 *** DO NOT USE THIS IN PRODUCTION. ***
19 commands:
21 gen_kdf_vectors: Print out some test vectors for the RFC5869 KDF.
22 timing: Print a little timing information about this implementation's
23 handshake.
24 self-test: Try handshaking with ourself; make sure we can.
25 test-tor: Handshake with tor's ntor implementation via the program
26 src/test/test-ntor-cl; make sure we can.
28 """
30 import binascii
31 try:
32 import curve25519
33 curve25519mod = curve25519.keys
34 except ImportError:
35 curve25519 = None
36 import slownacl_curve25519
37 curve25519mod = slownacl_curve25519
39 import hashlib
40 import hmac
41 import subprocess
42 import sys
44 # **********************************************************************
45 # Helpers and constants
def HMAC(key, msg):
    """Compute HMAC-SHA256 over 'msg' keyed with 'key'; return the raw
       digest bytes.
    """
    mac = hmac.new(key, msg, hashlib.sha256)
    return mac.digest()
def H(msg, tweak):
    """Tweaked hash of 'msg'.  In this version of ntor the tweaked hash is
       simply HMAC keyed with the tweak, so this forwards to HMAC() with
       'tweak' as the key and 'msg' as the message.
    """
    return HMAC(tweak, msg)
def keyid(k):
    """Return the key ID of the public key 'k'.

       Because we are using curve25519, the key serves as its own key ID:
       the result is just the serialized form of 'k'.
    """
    serialized = k.serialize()
    return serialized
# Field widths, in bytes, of the pieces that make up the handshake messages.
NODE_ID_LENGTH = 20
KEYID_LENGTH = G_LENGTH = H_LENGTH = 32

# Protocol identifier, and the tweak/info strings that are derived from it.
PROTOID = b"ntor-curve25519-sha256-1"
M_EXPAND = PROTOID + b":key_expand"
T_MAC = PROTOID + b":mac"
T_KEY = PROTOID + b":key_extract"
T_VERIFY = PROTOID + b":verify"
def H_mac(msg):
    """Return the tweaked hash of 'msg' using the T_MAC tweak."""
    return H(msg, tweak=T_MAC)
def H_verify(msg):
    """Return the tweaked hash of 'msg' using the T_VERIFY tweak."""
    return H(msg, tweak=T_VERIFY)
class PrivateKey(curve25519mod.Private):
    """A curve25519mod.Private that derives its public key at most once and
       hands back the remembered value on every later request.
    """

    def __init__(self):
        curve25519mod.Private.__init__(self)
        # Filled in by the first call to get_public().
        self._memo_public = None

    def get_public(self):
        """Return the public key for this private key, computing it only
           the first time it is asked for.
        """
        cached = self._memo_public
        if cached is None:
            cached = curve25519mod.Private.get_public(self)
            self._memo_public = cached
        return cached
93 # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# Compare the numeric major version rather than the version *string*:
# a lexicographic test such as sys.version < '3' misclassifies any major
# version with two or more digits.
if sys.version_info[0] < 3:
    def int2byte(i):
        """Return the one-byte string whose value is the integer 'i'."""
        return chr(i)
else:
    def int2byte(i):
        """Return the one-byte bytes object whose value is the integer 'i'."""
        return bytes([i])
def kdf_rfc5869(key, salt, info, n):
    """Derive 'n' bytes of output from 'key' with the RFC5869-style KDF
       built on HMAC().

       First a pseudorandom key is extracted as HMAC(key=salt, msg=key).
       Output blocks are then produced one at a time, each being the HMAC
       (under the pseudorandom key) of the previous block, 'info', and a
       one-byte counter that starts at 1.  The concatenated blocks are
       truncated to exactly 'n' bytes.
    """
    prk = HMAC(key=salt, msg=key)

    blocks = []
    produced = 0
    previous = b""
    counter = 1
    while produced < n:
        previous = HMAC(key=prk, msg=previous + info + int2byte(counter))
        blocks.append(previous)
        produced += len(previous)
        counter += 1
    return b"".join(blocks)[:n]
def kdf_ntor(key, n):
    """Derive 'n' bytes from 'key' using kdf_rfc5869() with the ntor
       parameters: T_KEY as the salt and M_EXPAND as the info string.
    """
    return kdf_rfc5869(key, salt=T_KEY, info=M_EXPAND, n=n)
119 # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
def client_part1(node_id, pubkey_B):
    """Initial handshake, client side.

       From the specification:

       <<To send a create cell, the client generates a keypair x,X =
         KEYGEN(), and sends a CREATE cell with contents:

           NODEID:     ID             -- ID_LENGTH bytes
           KEYID:      KEYID(B)       -- H_LENGTH bytes
           CLIENT_PK:  X              -- G_LENGTH bytes
       >>

       Takes node_id -- a digest of the server's identity key,
             pubkey_B -- a public key for the server.
       Returns a tuple of (client secret key x, client->server message)
    """
    assert len(node_id) == NODE_ID_LENGTH

    server_keyid = keyid(pubkey_B)

    # Fresh ephemeral keypair x,X for this handshake.
    seckey_x = PrivateKey()
    client_pk = seckey_x.get_public().serialize()

    # NODEID | KEYID | CLIENT_PK
    message = node_id + server_keyid + client_pk
    assert len(message) == NODE_ID_LENGTH + H_LENGTH + H_LENGTH

    return seckey_x, message
def hash_nil(x):
    """Do-nothing "hash": return 'x' unchanged.

       We hand this to the curve25519 python lib as its hash function,
       because if we don't pass one that does nothing, the lib will try
       to sha256 the shared key for us.
    """
    return x
def bad_result(r):
    """Helper: given 'r', the 32-byte result of multiplying a public key by
       a private key, return True iff one of the inputs was broken, which
       we detect as 'r' being all zero bytes.
    """
    assert len(r) == 32
    # Use a bytes literal: on Python 3 a bytes value never compares equal to
    # a str, so comparing against '\x00'*32 would always yield False and the
    # check would never fire.
    return r == b'\x00'*32
def server(seckey_b, my_node_id, message, keyBytes=72):
    """Handshake step 2, server side.

       From the spec:

       <<
         The server generates a keypair of y,Y = KEYGEN(), and computes

           secret_input = EXP(X,y) | EXP(X,b) | ID | B | X | Y | PROTOID
           KEY_SEED = H(secret_input, t_key)
           verify = H(secret_input, t_verify)
           auth_input = verify | ID | B | Y | X | PROTOID | "Server"

         The server sends a CREATED cell containing:

           SERVER_PK:  Y                     -- G_LENGTH bytes
           AUTH:       H(auth_input, t_mac)  -- H_LENGTH bytes
       >>

       Takes seckey_b -- the server's secret key
             my_node_id -- the server's public key digest,
             message -- a message from a client
             keyBytes -- amount of key material to generate

       Returns a tuple of (key material, server->client reply), or None on
       error.
    """
    assert len(message) == NODE_ID_LENGTH + H_LENGTH + H_LENGTH

    # Cut the client's message into its three fixed-width fields.
    keyid_end = NODE_ID_LENGTH + H_LENGTH
    claimed_node_id = message[:NODE_ID_LENGTH]
    claimed_keyid = message[NODE_ID_LENGTH:keyid_end]
    client_pk_bytes = message[keyid_end:]

    if claimed_node_id != my_node_id:
        return None

    pubkey_B = seckey_b.get_public()
    # A wrong key ID is only recorded here; the rest of the handshake is
    # still computed before we decide whether to fail.
    keyid_mismatch = keyid(pubkey_B) != claimed_keyid

    pubkey_X = curve25519mod.Public(client_pk_bytes)
    seckey_y = PrivateKey()
    pubkey_Y = seckey_y.get_public()
    xy = seckey_y.get_shared_key(pubkey_X, hash_nil)
    xb = seckey_b.get_shared_key(pubkey_X, hash_nil)

    B_bytes = pubkey_B.serialize()
    X_bytes = pubkey_X.serialize()
    Y_bytes = pubkey_Y.serialize()

    # secret_input = EXP(X,y) | EXP(X,b) | ID | B | X | Y | PROTOID
    secret_input = xy + xb + my_node_id + B_bytes + X_bytes + Y_bytes + PROTOID

    verify = H_verify(secret_input)

    # auth_input = verify | ID | B | Y | X | PROTOID | "Server"
    auth_input = (verify + my_node_id + B_bytes + Y_bytes + X_bytes +
                  PROTOID + b"Server")

    reply = Y_bytes + H_mac(auth_input)

    # Evaluate every check (no short-circuiting) before deciding.
    failures = (keyid_mismatch, bad_result(xb), bad_result(xy))
    if any(failures):
        return None

    return kdf_ntor(secret_input, keyBytes), reply
def client_part2(seckey_x, msg, node_id, pubkey_B, keyBytes=72):
    """Handshake step 3: client side again.

       From the spec:

       <<
         The client then checks Y is in G^* [see NOTE below], and computes

           secret_input = EXP(Y,x) | EXP(B,x) | ID | B | X | Y | PROTOID
           KEY_SEED = H(secret_input, t_key)
           verify = H(secret_input, t_verify)
           auth_input = verify | ID | B | Y | X | PROTOID | "Server"

         The client verifies that AUTH == H(auth_input, t_mac).
       >>

       Takes seckey_x -- the secret key we generated in step 1.
             msg -- the message from the server.
             node_id -- the node_id we used in step 1.
             pubkey_B -- the same public key we used in step 1.
             keyBytes -- the number of bytes we want to generate
       Returns key material, or None on error
    """
    assert len(msg) == G_LENGTH + H_LENGTH

    # The server's reply is SERVER_PK (Y) followed by AUTH.
    server_pk_bytes = msg[:G_LENGTH]
    their_auth = msg[G_LENGTH:]
    pubkey_Y = curve25519mod.Public(server_pk_bytes)

    pubkey_X = seckey_x.get_public()

    yx = seckey_x.get_shared_key(pubkey_Y, hash_nil)
    bx = seckey_x.get_shared_key(pubkey_B, hash_nil)

    B_bytes = pubkey_B.serialize()
    X_bytes = pubkey_X.serialize()
    Y_bytes = pubkey_Y.serialize()

    # secret_input = EXP(Y,x) | EXP(B,x) | ID | B | X | Y | PROTOID
    secret_input = yx + bx + node_id + B_bytes + X_bytes + Y_bytes + PROTOID

    verify = H_verify(secret_input)

    # auth_input = verify | ID | B | Y | X | PROTOID | "Server"
    auth_input = (verify + node_id + B_bytes + Y_bytes + X_bytes +
                  PROTOID + b"Server")

    my_auth = H_mac(auth_input)

    # Evaluate every check (no short-circuiting) before deciding.
    failures = (my_auth != their_auth, bad_result(yx), bad_result(bx))
    if any(failures):
        return None

    return kdf_ntor(secret_input, keyBytes)
293 # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
def demo(node_id=b"iToldYouAboutStairs.", server_key=None):
    """
       Try to handshake with ourself.

       Takes node_id -- the node ID to use for the handshake,
             server_key -- the server's private key; when None (the
                           default) a fresh PrivateKey is generated for
                           this call.
       Prints "OK" if both sides derive the same 72 bytes of key material.
    """
    # Generate the default key here rather than in the signature: a default
    # of PrivateKey() would be evaluated once, when the function is defined,
    # creating a key at import time and sharing it between all calls.
    if server_key is None:
        server_key = PrivateKey()

    pubkey_B = server_key.get_public()
    x, create = client_part1(node_id, pubkey_B)
    skeys, created = server(server_key, node_id, create)
    ckeys = client_part2(x, created, node_id, pubkey_B)
    assert len(skeys) == 72
    assert len(ckeys) == 72
    assert skeys == ckeys
    print("OK")
307 # ======================================================================
def timing():
    """
       Use Python's timeit module to see how fast this nonsense is.

       Runs 1000 demo() handshakes and prints the total time taken.
    """
    import timeit
    # The node ID must be a bytes value: the handshake code concatenates it
    # with serialized keys, which fails for a str on Python 3.  The setup
    # imports only ntor_ref; the timed statement needs nothing from
    # curve25519 directly, and importing it unconditionally would fail when
    # that optional module is not installed.
    t = timeit.Timer(stmt="ntor_ref.demo(N,SK)",
                     setup="import ntor_ref;N=b'ABCD'*5;SK=ntor_ref.PrivateKey()")
    print(t.timeit(number=1000))
317 # ======================================================================
def kdf_vectors():
    """
       Generate some vectors to check our KDF.

       For each of a few fixed inputs, prints the input followed by the
       hex encoding of 100 bytes of kdf_rfc5869() output (with T_KEY as the
       salt and M_EXPAND as the info string).
    """
    import binascii

    def kdf_vec(inp):
        k = kdf_rfc5869(inp, T_KEY, M_EXPAND, 100)
        hexed = binascii.b2a_hex(k)
        if not isinstance(hexed, str):
            # On Python 3, b2a_hex() returns bytes, which cannot be
            # concatenated with the str quotes below.
            hexed = hexed.decode("ascii")
        print(repr(inp), "\n\"" + hexed + "\"")

    # The inputs are bytes literals: HMAC rejects str input on Python 3.
    kdf_vec(b"")
    kdf_vec(b"Tor")
    kdf_vec(b"AN ALARMING ITEM TO FIND ON YOUR CREDIT-RATING STATEMENT")
331 # ======================================================================
def test_tor():
    """
       Call the test-ntor-cl command-line program to make sure we can
       interoperate with Tor's ntor program.

       Performs three handshakes -- Tor on both sides, Tor as the client
       with this module as the server, and this module as the client with
       Tor as the server -- and asserts that both sides end up with the same
       90 bytes of key material each time.  Prints "OK" on success.
    """
    enhex = binascii.b2a_hex

    def dehex(s):
        return binascii.a2b_hex(s.strip())

    PROG = b"./src/test/test-ntor-cl"

    def run_prog(*args):
        """Run PROG with 'args'; return its output lines, hex-decoded."""
        p = subprocess.Popen([PROG] + list(args), stdout=subprocess.PIPE)
        # communicate() reads stdout to EOF, closes the pipe, and waits for
        # the child, so no process or file descriptor is left behind.
        out, _ = p.communicate()
        return [dehex(line) for line in out.splitlines()]

    def tor_client1(node_id, pubkey_B):
        " returns (msg, state) "
        return run_prog(b"client1", enhex(node_id),
                        enhex(pubkey_B.serialize()))

    def tor_server1(seckey_b, node_id, msg, n):
        " returns (msg, keys) "
        return run_prog("server1", enhex(seckey_b.serialize()),
                        enhex(node_id), enhex(msg), str(n))

    def tor_client2(state, msg, n):
        " returns (keys,) "
        return run_prog("client2", enhex(state), enhex(msg), str(n))

    node_id = b"thisisatornodeid$#%^"
    seckey_b = PrivateKey()
    pubkey_B = seckey_b.get_public()

    # Do a pure-Tor handshake
    c2s_msg, c_state = tor_client1(node_id, pubkey_B)
    s2c_msg, s_keys = tor_server1(seckey_b, node_id, c2s_msg, 90)
    c_keys, = tor_client2(c_state, s2c_msg, 90)
    assert c_keys == s_keys
    assert len(c_keys) == 90

    # Try a mixed handshake with Tor as the client
    c2s_msg, c_state = tor_client1(node_id, pubkey_B)
    s_keys, s2c_msg = server(seckey_b, node_id, c2s_msg, 90)
    c_keys, = tor_client2(c_state, s2c_msg, 90)
    assert c_keys == s_keys
    assert len(c_keys) == 90

    # Now do a mixed handshake with Tor as the server
    c_x, c2s_msg = client_part1(node_id, pubkey_B)
    s2c_msg, s_keys = tor_server1(seckey_b, node_id, c2s_msg, 90)
    c_keys = client_part2(c_x, s2c_msg, node_id, pubkey_B, 90)
    assert c_keys == s_keys
    assert len(c_keys) == 90

    print("OK")
390 # ======================================================================
if __name__ == '__main__':
    # Map each command-line command name to the function that implements it.
    commands = {
        'gen_kdf_vectors': kdf_vectors,
        'timing': timing,
        'self-test': demo,
        'test-tor': test_tor,
    }
    action = commands.get(sys.argv[1]) if len(sys.argv) >= 2 else None
    if action is None:
        # No command, or one we don't recognize: show the usage text.
        print(__doc__)
    else:
        action()