1 /* Copyright (c) 2001-2004, Roger Dingledine.
2 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
3 * Copyright (c) 2007-2020, The Tor Project, Inc. */
4 /* See LICENSE for licensing information */
6 #include "core/or/or.h"
7 #include "lib/thread/threads.h"
8 #include "core/or/onion.h"
9 #include "lib/evloop/workqueue.h"
10 #include "lib/crypt_ops/crypto_curve25519.h"
11 #include "lib/crypt_ops/crypto_rand.h"
12 #include "lib/net/alertsock.h"
13 #include "lib/evloop/compat_libevent.h"
14 #include "lib/intmath/weakrng.h"
15 #include "lib/crypt_ops/crypto_init.h"
19 #define MAX_INFLIGHT (1<<16)
21 static int opt_verbose
= 0;
22 static int opt_n_threads
= 8;
23 static int opt_n_items
= 10000;
24 static int opt_n_inflight
= 1000;
25 static int opt_n_lowwater
= 250;
26 static int opt_n_cancel
= 0;
27 static int opt_ratio_rsa
= 5;
29 #ifdef TRACK_RESPONSES
30 tor_mutex_t bitmap_mutex
;
35 typedef struct state_t
{
39 curve25519_secret_key_t ecdh
;
43 typedef struct rsa_work_t
{
49 typedef struct ecdh_work_t
{
52 curve25519_public_key_t pk
;
58 mark_handled(int serial
)
60 #ifdef TRACK_RESPONSES
61 tor_mutex_acquire(&bitmap_mutex
);
62 tor_assert(serial
< handled_len
);
63 tor_assert(! bitarray_is_set(handled
, serial
));
64 bitarray_set(handled
, serial
);
65 tor_mutex_release(&bitmap_mutex
);
66 #else /* !defined(TRACK_RESPONSES) */
68 #endif /* defined(TRACK_RESPONSES) */
71 static workqueue_reply_t
72 workqueue_do_rsa(void *state
, void *work
)
74 rsa_work_t
*rw
= work
;
76 crypto_pk_t
*rsa
= st
->rsa
;
80 tor_assert(st
->magic
== 13371337);
82 len
= crypto_pk_private_sign(rsa
, (char*)sig
, 256,
83 (char*)rw
->msg
, rw
->msglen
);
89 memset(rw
->msg
, 0, sizeof(rw
->msg
));
91 memcpy(rw
->msg
, sig
, len
);
94 mark_handled(rw
->serial
);
99 static workqueue_reply_t
100 workqueue_do_shutdown(void *state
, void *work
)
104 crypto_pk_free(((state_t
*)state
)->rsa
);
106 return WQ_RPL_SHUTDOWN
;
109 static workqueue_reply_t
110 workqueue_do_ecdh(void *state
, void *work
)
112 ecdh_work_t
*ew
= work
;
113 uint8_t output
[CURVE25519_OUTPUT_LEN
];
116 tor_assert(st
->magic
== 13371337);
118 curve25519_handshake(output
, &st
->ecdh
, &ew
->u
.pk
);
119 memcpy(ew
->u
.msg
, output
, CURVE25519_OUTPUT_LEN
);
121 mark_handled(ew
->serial
);
125 static workqueue_reply_t
126 workqueue_shutdown_error(void *state
, void *work
)
139 st
= tor_malloc(sizeof(*st
));
140 /* Every thread gets its own keys. not a problem for benchmarking */
141 st
->rsa
= crypto_pk_new();
142 if (crypto_pk_generate_key_with_bits(st
->rsa
, 1024) < 0) {
143 crypto_pk_free(st
->rsa
);
147 curve25519_secret_key_generate(&st
->ecdh
, 0);
148 st
->magic
= 13371337;
153 free_state(void *arg
)
156 crypto_pk_free(st
->rsa
);
160 static tor_weak_rng_t weak_rng
;
161 static int n_sent
= 0;
162 static int rsa_sent
= 0;
163 static int ecdh_sent
= 0;
164 static int n_received_previously
= 0;
165 static int n_received
= 0;
166 static int no_shutdown
= 0;
168 #ifdef TRACK_RESPONSES
169 bitarray_t
*received
;
173 handle_reply(void *arg
)
175 #ifdef TRACK_RESPONSES
176 rsa_work_t
*rw
= arg
; /* Naughty cast, but only looking at serial. */
177 tor_assert(! bitarray_is_set(received
, rw
->serial
));
178 bitarray_set(received
,rw
->serial
);
185 /* This should never get called. */
187 handle_reply_shutdown(void *arg
)
193 static workqueue_entry_t
*
194 add_work(threadpool_t
*tp
)
197 opt_ratio_rsa
== 0 ||
198 tor_weak_random_range(&weak_rng
, opt_ratio_rsa
) == 0;
201 rsa_work_t
*w
= tor_malloc_zero(sizeof(*w
));
202 w
->serial
= n_sent
++;
203 crypto_rand((char*)w
->msg
, 20);
206 return threadpool_queue_work_priority(tp
,
208 workqueue_do_rsa
, handle_reply
, w
);
210 ecdh_work_t
*w
= tor_malloc_zero(sizeof(*w
));
211 w
->serial
= n_sent
++;
212 /* Not strictly right, but this is just for benchmarks. */
213 crypto_rand((char*)w
->u
.pk
.public_key
, 32);
215 return threadpool_queue_work(tp
, workqueue_do_ecdh
, handle_reply
, w
);
219 static int n_failed_cancel
= 0;
220 static int n_successful_cancel
= 0;
223 add_n_work_items(threadpool_t
*tp
, int n
)
226 int n_try_cancel
= 0, i
;
227 workqueue_entry_t
**to_cancel
;
228 workqueue_entry_t
*ent
;
230 // We'll choose randomly which entries to cancel.
231 to_cancel
= tor_calloc(opt_n_cancel
, sizeof(workqueue_entry_t
*));
233 while (n_queued
++ < n
) {
237 tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), NULL
);
241 if (n_try_cancel
< opt_n_cancel
) {
242 to_cancel
[n_try_cancel
++] = ent
;
244 int p
= tor_weak_random_range(&weak_rng
, n_queued
);
245 if (p
< n_try_cancel
) {
251 for (i
= 0; i
< n_try_cancel
; ++i
) {
252 void *work
= workqueue_entry_cancel(to_cancel
[i
]);
256 n_successful_cancel
++;
265 static int shutting_down
= 0;
268 replysock_readable_cb(threadpool_t
*tp
)
270 if (n_received_previously
== n_received
)
273 n_received_previously
= n_received
;
276 printf("%d / %d", n_received
, n_sent
);
278 printf(" (%d cancelled, %d uncancellable)",
279 n_successful_cancel
, n_failed_cancel
);
282 #ifdef TRACK_RESPONSES
283 tor_mutex_acquire(&bitmap_mutex
);
284 for (i
= 0; i
< opt_n_items
; ++i
) {
285 if (bitarray_is_set(received
, i
))
287 else if (bitarray_is_set(handled
, i
))
293 tor_mutex_release(&bitmap_mutex
);
294 #endif /* defined(TRACK_RESPONSES) */
296 if (n_sent
- (n_received
+n_successful_cancel
) < opt_n_lowwater
) {
297 int n_to_send
= n_received
+ opt_n_inflight
- n_sent
;
298 if (n_to_send
> opt_n_items
- n_sent
)
299 n_to_send
= opt_n_items
- n_sent
;
300 add_n_work_items(tp
, n_to_send
);
303 if (shutting_down
== 0 &&
304 n_received
+n_successful_cancel
== n_sent
&&
305 n_sent
>= opt_n_items
) {
307 threadpool_queue_update(tp
, NULL
,
308 workqueue_do_shutdown
, NULL
, NULL
);
309 // Anything we add after starting the shutdown must not be executed.
310 threadpool_queue_work(tp
, workqueue_shutdown_error
,
311 handle_reply_shutdown
, NULL
);
313 struct timeval limit
= { 2, 0 };
314 tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), &limit
);
324 " -h Display this information\n"
326 " -N <items> Run this many items of work\n"
327 " -T <threads> Use this many threads\n"
328 " -I <inflight> Have no more than this many requests queued at once\n"
329 " -L <lowwater> Add items whenever fewer than this many are pending\n"
330 " -C <cancel> Try to cancel N items of every batch that we add\n"
331 " -R <ratio> Make one out of this many items be a slow (RSA) one\n"
332 " --no-{eventfd2,eventfd,pipe2,pipe,socketpair}\n"
333 " Disable one of the alert_socket backends.");
337 main(int argc
, char **argv
)
342 tor_libevent_cfg_t evcfg
;
343 uint32_t as_flags
= 0;
345 for (i
= 1; i
< argc
; ++i
) {
346 if (!strcmp(argv
[i
], "-v")) {
348 } else if (!strcmp(argv
[i
], "-T") && i
+1<argc
) {
349 opt_n_threads
= atoi(argv
[++i
]);
350 } else if (!strcmp(argv
[i
], "-N") && i
+1<argc
) {
351 opt_n_items
= atoi(argv
[++i
]);
352 } else if (!strcmp(argv
[i
], "-I") && i
+1<argc
) {
353 opt_n_inflight
= atoi(argv
[++i
]);
354 } else if (!strcmp(argv
[i
], "-L") && i
+1<argc
) {
355 opt_n_lowwater
= atoi(argv
[++i
]);
356 } else if (!strcmp(argv
[i
], "-R") && i
+1<argc
) {
357 opt_ratio_rsa
= atoi(argv
[++i
]);
358 } else if (!strcmp(argv
[i
], "-C") && i
+1<argc
) {
359 opt_n_cancel
= atoi(argv
[++i
]);
360 } else if (!strcmp(argv
[i
], "--no-eventfd2")) {
361 as_flags
|= ASOCKS_NOEVENTFD2
;
362 } else if (!strcmp(argv
[i
], "--no-eventfd")) {
363 as_flags
|= ASOCKS_NOEVENTFD
;
364 } else if (!strcmp(argv
[i
], "--no-pipe2")) {
365 as_flags
|= ASOCKS_NOPIPE2
;
366 } else if (!strcmp(argv
[i
], "--no-pipe")) {
367 as_flags
|= ASOCKS_NOPIPE
;
368 } else if (!strcmp(argv
[i
], "--no-socketpair")) {
369 as_flags
|= ASOCKS_NOSOCKETPAIR
;
370 } else if (!strcmp(argv
[i
], "-h")) {
379 if (opt_n_threads
< 1 ||
380 opt_n_items
< 1 || opt_n_inflight
< 1 || opt_n_lowwater
< 0 ||
381 opt_n_cancel
> opt_n_inflight
|| opt_n_inflight
> MAX_INFLIGHT
||
387 if (opt_n_inflight
> opt_n_items
) {
388 opt_n_inflight
= opt_n_items
;
393 if (crypto_global_init(1, NULL
, NULL
) < 0) {
394 printf("Couldn't initialize crypto subsystem; exiting.\n");
397 if (crypto_seed_rng() < 0) {
398 printf("Couldn't seed RNG; exiting.\n");
402 rq
= replyqueue_new(as_flags
);
403 if (as_flags
&& rq
== NULL
)
404 return 77; // 77 means "skipped".
407 tp
= threadpool_new(opt_n_threads
,
408 rq
, new_state
, free_state
, NULL
);
411 crypto_seed_weak_rng(&weak_rng
);
413 memset(&evcfg
, 0, sizeof(evcfg
));
414 tor_libevent_initialize(&evcfg
);
417 int r
= threadpool_register_reply_event(tp
,
418 replysock_readable_cb
);
422 #ifdef TRACK_RESPONSES
423 handled
= bitarray_init_zero(opt_n_items
);
424 received
= bitarray_init_zero(opt_n_items
);
425 tor_mutex_init(&bitmap_mutex
);
426 handled_len
= opt_n_items
;
427 #endif /* defined(TRACK_RESPONSES) */
429 for (i
= 0; i
< opt_n_inflight
; ++i
) {
430 if (! add_work(tp
)) {
431 puts("Couldn't add work.");
437 struct timeval limit
= { 180, 0 };
438 tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), &limit
);
441 tor_libevent_run_event_loop(tor_libevent_get_base(), 0);
443 if (n_sent
!= opt_n_items
|| n_received
+n_successful_cancel
!= n_sent
) {
444 printf("%d vs %d\n", n_sent
, opt_n_items
);
445 printf("%d+%d vs %d\n", n_received
, n_successful_cancel
, n_sent
);
448 } else if (no_shutdown
) {
449 puts("Accepted work after shutdown\n");