2 simple ctdb benchmark - send messages in a ring around cluster
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
23 #include "lib/util/time.h"
24 #include "lib/util/tevent_unix.h"
26 #include "client/client.h"
27 #include "tests/src/test_options.h"
28 #include "tests/src/cluster_wait.h"
30 #define MSG_ID_BENCH 0
32 struct message_ring_state
{
33 struct tevent_context
*ev
;
34 struct ctdb_client_context
*client
;
39 int msg_plus
, msg_minus
;
40 struct timeval start_time
;
43 static void message_ring_wait(struct tevent_req
*subreq
);
44 static void message_ring_start(struct tevent_req
*subreq
);
45 static void message_ring_each_second(struct tevent_req
*subreq
);
46 static void message_ring_msg_sent(struct tevent_req
*subreq
);
47 static void message_ring_msg_handler(uint64_t srvid
, TDB_DATA data
,
49 static void message_ring_finish(struct tevent_req
*subreq
);
51 static struct tevent_req
*message_ring_send(TALLOC_CTX
*mem_ctx
,
52 struct tevent_context
*ev
,
53 struct ctdb_client_context
*client
,
54 int num_nodes
, int timelimit
,
57 struct tevent_req
*req
, *subreq
;
58 struct message_ring_state
*state
;
60 req
= tevent_req_create(mem_ctx
, &state
, struct message_ring_state
);
66 state
->client
= client
;
67 state
->num_nodes
= num_nodes
;
68 state
->timelimit
= timelimit
;
69 state
->interactive
= interactive
;
71 subreq
= ctdb_client_set_message_handler_send(
72 state
, state
->ev
, state
->client
,
74 message_ring_msg_handler
, req
);
75 if (tevent_req_nomem(subreq
, req
)) {
76 return tevent_req_post(req
, ev
);
78 tevent_req_set_callback(subreq
, message_ring_wait
, req
);
83 static void message_ring_wait(struct tevent_req
*subreq
)
85 struct tevent_req
*req
= tevent_req_callback_data(
86 subreq
, struct tevent_req
);
87 struct message_ring_state
*state
= tevent_req_data(
88 req
, struct message_ring_state
);
92 status
= ctdb_client_set_message_handler_recv(subreq
, &ret
);
95 tevent_req_error(req
, ret
);
99 subreq
= cluster_wait_send(state
, state
->ev
, state
->client
,
101 if (tevent_req_nomem(subreq
, req
)) {
104 tevent_req_set_callback(subreq
, message_ring_start
, req
);
107 static void message_ring_start(struct tevent_req
*subreq
)
109 struct tevent_req
*req
= tevent_req_callback_data(
110 subreq
, struct tevent_req
);
111 struct message_ring_state
*state
= tevent_req_data(
112 req
, struct message_ring_state
);
116 status
= cluster_wait_recv(subreq
, &ret
);
119 tevent_req_error(req
, ret
);
123 state
->start_time
= tevent_timeval_current();
125 if (ctdb_client_pnn(state
->client
) == 0) {
126 subreq
= tevent_wakeup_send(state
, state
->ev
,
127 tevent_timeval_current_ofs(1, 0));
128 if (tevent_req_nomem(subreq
, req
)) {
131 tevent_req_set_callback(subreq
, message_ring_each_second
, req
);
134 subreq
= tevent_wakeup_send(state
, state
->ev
,
135 tevent_timeval_current_ofs(
136 state
->timelimit
, 0));
137 if (tevent_req_nomem(subreq
, req
)) {
140 tevent_req_set_callback(subreq
, message_ring_finish
, req
);
143 static uint32_t next_node(struct ctdb_client_context
*client
,
144 int num_nodes
, int incr
)
146 return (ctdb_client_pnn(client
) + num_nodes
+ incr
) % num_nodes
;
149 static void message_ring_each_second(struct tevent_req
*subreq
)
151 struct tevent_req
*req
= tevent_req_callback_data(
152 subreq
, struct tevent_req
);
153 struct message_ring_state
*state
= tevent_req_data(
154 req
, struct message_ring_state
);
155 struct ctdb_req_message msg
;
160 status
= tevent_wakeup_recv(subreq
);
163 tevent_req_error(req
, EIO
);
167 pnn
= ctdb_client_pnn(state
->client
);
168 if (pnn
== 0 && state
->interactive
== 1) {
171 t
= timeval_elapsed(&state
->start_time
);
172 printf("Ring[%u]: %.2f msgs/sec (+ve=%d -ve=%d)\n",
173 pnn
, state
->msg_count
/ t
,
174 state
->msg_plus
, state
->msg_minus
);
178 if (state
->msg_plus
== 0) {
182 msg
.data
.data
.dptr
= (uint8_t *)&incr
;
183 msg
.data
.data
.dsize
= sizeof(incr
);
185 pnn
= next_node(state
->client
, state
->num_nodes
, incr
);
187 subreq
= ctdb_client_message_send(state
, state
->ev
,
188 state
->client
, pnn
, &msg
);
189 if (tevent_req_nomem(subreq
, req
)) {
192 tevent_req_set_callback(subreq
, message_ring_msg_sent
, req
);
195 if (state
->msg_minus
== 0) {
199 msg
.data
.data
.dptr
= (uint8_t *)&incr
;
200 msg
.data
.data
.dsize
= sizeof(incr
);
202 pnn
= next_node(state
->client
, state
->num_nodes
, incr
);
204 subreq
= ctdb_client_message_send(state
, state
->ev
,
205 state
->client
, pnn
, &msg
);
206 if (tevent_req_nomem(subreq
, req
)) {
209 tevent_req_set_callback(subreq
, message_ring_msg_sent
, req
);
212 subreq
= tevent_wakeup_send(state
, state
->ev
,
213 tevent_timeval_current_ofs(1, 0));
214 if (tevent_req_nomem(subreq
, req
)) {
217 tevent_req_set_callback(subreq
, message_ring_each_second
, req
);
220 static void message_ring_msg_sent(struct tevent_req
*subreq
)
222 struct tevent_req
*req
= tevent_req_callback_data(
223 subreq
, struct tevent_req
);
227 status
= ctdb_client_message_recv(subreq
, &ret
);
230 tevent_req_error(req
, ret
);
234 static void message_ring_msg_handler(uint64_t srvid
, TDB_DATA data
,
237 struct tevent_req
*req
= talloc_get_type_abort(
238 private_data
, struct tevent_req
);
239 struct message_ring_state
*state
= tevent_req_data(
240 req
, struct message_ring_state
);
241 struct ctdb_req_message msg
;
242 struct tevent_req
*subreq
;
246 if (srvid
!= MSG_ID_BENCH
) {
250 if (data
.dsize
!= sizeof(int)) {
253 incr
= *(int *)data
.dptr
;
255 state
->msg_count
+= 1;
257 state
->msg_plus
+= 1;
259 state
->msg_minus
+= 1;
262 pnn
= next_node(state
->client
, state
->num_nodes
, incr
);
265 msg
.data
.data
= data
;
267 subreq
= ctdb_client_message_send(state
, state
->ev
, state
->client
,
269 if (tevent_req_nomem(subreq
, req
)) {
272 tevent_req_set_callback(subreq
, message_ring_msg_sent
, req
);
275 static void message_ring_finish(struct tevent_req
*subreq
)
277 struct tevent_req
*req
= tevent_req_callback_data(
278 subreq
, struct tevent_req
);
279 struct message_ring_state
*state
= tevent_req_data(
280 req
, struct message_ring_state
);
284 status
= tevent_wakeup_recv(subreq
);
287 tevent_req_error(req
, EIO
);
291 t
= timeval_elapsed(&state
->start_time
);
293 printf("Ring[%u]: %.2f msgs/sec (+ve=%d -ve=%d)\n",
294 ctdb_client_pnn(state
->client
), state
->msg_count
/ t
,
295 state
->msg_plus
, state
->msg_minus
);
297 tevent_req_done(req
);
300 static bool message_ring_recv(struct tevent_req
*req
)
304 if (tevent_req_is_unix_error(req
, &ret
)) {
310 int main(int argc
, const char *argv
[])
312 const struct test_options
*opts
;
314 struct tevent_context
*ev
;
315 struct ctdb_client_context
*client
;
316 struct tevent_req
*req
;
320 status
= process_options_basic(argc
, argv
, &opts
);
325 mem_ctx
= talloc_new(NULL
);
326 if (mem_ctx
== NULL
) {
327 fprintf(stderr
, "Memory allocation error\n");
331 ev
= tevent_context_init(mem_ctx
);
333 fprintf(stderr
, "Memory allocation error\n");
337 ret
= ctdb_client_init(mem_ctx
, ev
, opts
->socket
, &client
);
339 fprintf(stderr
, "Failed to initialize client, ret=%d\n", ret
);
343 if (! ctdb_recovery_wait(ev
, client
)) {
344 fprintf(stderr
, "Failed to wait for recovery\n");
348 req
= message_ring_send(mem_ctx
, ev
, client
,
349 opts
->num_nodes
, opts
->timelimit
,
352 fprintf(stderr
, "Memory allocation error\n");
356 tevent_req_poll(req
, ev
);
358 status
= message_ring_recv(req
);
360 fprintf(stderr
, "message ring test failed\n");
364 talloc_free(mem_ctx
);