s4:dsdb: allocate DSDB_CONTROL_DBCHECK_FIX_DUPLICATE_LINKS oid
[Samba.git] / ctdb / tests / src / message_ring.c
blobdabae65ff86a7f85ed227140b4ff93e958623568
/*
   simple ctdb benchmark - send messages in a ring around cluster

   Copyright (C) Amitay Isaacs  2015

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "replace.h"
#include "system/network.h"

#include <string.h>

#include "lib/util/time.h"
#include "lib/util/tevent_unix.h"

#include "client/client.h"
#include "tests/src/test_options.h"
#include "tests/src/cluster_wait.h"
30 #define MSG_ID_BENCH 0
/* Per-request state of the message ring benchmark. */
struct message_ring_state {
	/* Event context on which every subrequest is scheduled. */
	struct tevent_context *ev;
	/* Client connection used to send and receive ring messages. */
	struct ctdb_client_context *client;
	/* Ring size: next_node() wraps node numbers modulo this value. */
	int num_nodes;
	/* Seconds to run before message_ring_finish() is triggered. */
	int timelimit;
	/* When 1, node 0 prints the message rate once per second. */
	int interactive;
	/* Total number of ring messages handled by this node. */
	int msg_count;
	/* Messages handled whose payload was 1. */
	int msg_plus;
	/* Messages handled whose payload was anything other than 1. */
	int msg_minus;
	/* Taken when the cluster wait completes; basis for rate output. */
	struct timeval start_time;
};
43 static void message_ring_wait(struct tevent_req *subreq);
44 static void message_ring_start(struct tevent_req *subreq);
45 static void message_ring_each_second(struct tevent_req *subreq);
46 static void message_ring_msg_sent(struct tevent_req *subreq);
47 static void message_ring_msg_handler(uint64_t srvid, TDB_DATA data,
48 void *private_data);
49 static void message_ring_finish(struct tevent_req *subreq);
51 static struct tevent_req *message_ring_send(TALLOC_CTX *mem_ctx,
52 struct tevent_context *ev,
53 struct ctdb_client_context *client,
54 int num_nodes, int timelimit,
55 int interactive)
57 struct tevent_req *req, *subreq;
58 struct message_ring_state *state;
60 req = tevent_req_create(mem_ctx, &state, struct message_ring_state);
61 if (req == NULL) {
62 return NULL;
65 state->ev = ev;
66 state->client = client;
67 state->num_nodes = num_nodes;
68 state->timelimit = timelimit;
69 state->interactive = interactive;
71 subreq = ctdb_client_set_message_handler_send(
72 state, state->ev, state->client,
73 MSG_ID_BENCH,
74 message_ring_msg_handler, req);
75 if (tevent_req_nomem(subreq, req)) {
76 return tevent_req_post(req, ev);
78 tevent_req_set_callback(subreq, message_ring_wait, req);
80 return req;
83 static void message_ring_wait(struct tevent_req *subreq)
85 struct tevent_req *req = tevent_req_callback_data(
86 subreq, struct tevent_req);
87 struct message_ring_state *state = tevent_req_data(
88 req, struct message_ring_state);
89 bool status;
90 int ret;
92 status = ctdb_client_set_message_handler_recv(subreq, &ret);
93 TALLOC_FREE(subreq);
94 if (! status) {
95 tevent_req_error(req, ret);
96 return;
99 subreq = cluster_wait_send(state, state->ev, state->client,
100 state->num_nodes);
101 if (tevent_req_nomem(subreq, req)) {
102 return;
104 tevent_req_set_callback(subreq, message_ring_start, req);
107 static void message_ring_start(struct tevent_req *subreq)
109 struct tevent_req *req = tevent_req_callback_data(
110 subreq, struct tevent_req);
111 struct message_ring_state *state = tevent_req_data(
112 req, struct message_ring_state);
113 bool status;
114 int ret;
116 status = cluster_wait_recv(subreq, &ret);
117 TALLOC_FREE(subreq);
118 if (! status) {
119 tevent_req_error(req, ret);
120 return;
123 state->start_time = tevent_timeval_current();
125 if (ctdb_client_pnn(state->client) == 0) {
126 subreq = tevent_wakeup_send(state, state->ev,
127 tevent_timeval_current_ofs(1, 0));
128 if (tevent_req_nomem(subreq, req)) {
129 return;
131 tevent_req_set_callback(subreq, message_ring_each_second, req);
134 subreq = tevent_wakeup_send(state, state->ev,
135 tevent_timeval_current_ofs(
136 state->timelimit, 0));
137 if (tevent_req_nomem(subreq, req)) {
138 return;
140 tevent_req_set_callback(subreq, message_ring_finish, req);
/*
 * Node number reached from this node by moving incr steps round a ring
 * of num_nodes nodes.  num_nodes is added before the modulo so that a
 * step of -1 from node 0 wraps to the last node.
 */
static uint32_t next_node(struct ctdb_client_context *client,
			  int num_nodes, int incr)
{
	uint32_t self = ctdb_client_pnn(client);

	return (self + num_nodes + incr) % num_nodes;
}
149 static void message_ring_each_second(struct tevent_req *subreq)
151 struct tevent_req *req = tevent_req_callback_data(
152 subreq, struct tevent_req);
153 struct message_ring_state *state = tevent_req_data(
154 req, struct message_ring_state);
155 struct ctdb_req_message msg;
156 uint32_t pnn;
157 int incr;
158 bool status;
160 status = tevent_wakeup_recv(subreq);
161 TALLOC_FREE(subreq);
162 if (! status) {
163 tevent_req_error(req, EIO);
164 return;
167 pnn = ctdb_client_pnn(state->client);
168 if (pnn == 0 && state->interactive == 1) {
169 double t;
171 t = timeval_elapsed(&state->start_time);
172 printf("Ring[%u]: %.2f msgs/sec (+ve=%d -ve=%d)\n",
173 pnn, state->msg_count / t,
174 state->msg_plus, state->msg_minus);
175 fflush(stdout);
178 if (state->msg_plus == 0) {
179 incr = 1;
181 msg.srvid = 0;
182 msg.data.data.dptr = (uint8_t *)&incr;
183 msg.data.data.dsize = sizeof(incr);
185 pnn = next_node(state->client, state->num_nodes, incr);
187 subreq = ctdb_client_message_send(state, state->ev,
188 state->client, pnn, &msg);
189 if (tevent_req_nomem(subreq, req)) {
190 return;
192 tevent_req_set_callback(subreq, message_ring_msg_sent, req);
195 if (state->msg_minus == 0) {
196 incr = -1;
198 msg.srvid = 0;
199 msg.data.data.dptr = (uint8_t *)&incr;
200 msg.data.data.dsize = sizeof(incr);
202 pnn = next_node(state->client, state->num_nodes, incr);
204 subreq = ctdb_client_message_send(state, state->ev,
205 state->client, pnn, &msg);
206 if (tevent_req_nomem(subreq, req)) {
207 return;
209 tevent_req_set_callback(subreq, message_ring_msg_sent, req);
212 subreq = tevent_wakeup_send(state, state->ev,
213 tevent_timeval_current_ofs(1, 0));
214 if (tevent_req_nomem(subreq, req)) {
215 return;
217 tevent_req_set_callback(subreq, message_ring_each_second, req);
220 static void message_ring_msg_sent(struct tevent_req *subreq)
222 struct tevent_req *req = tevent_req_callback_data(
223 subreq, struct tevent_req);
224 bool status;
225 int ret;
227 status = ctdb_client_message_recv(subreq, &ret);
228 TALLOC_FREE(subreq);
229 if (! status) {
230 tevent_req_error(req, ret);
234 static void message_ring_msg_handler(uint64_t srvid, TDB_DATA data,
235 void *private_data)
237 struct tevent_req *req = talloc_get_type_abort(
238 private_data, struct tevent_req);
239 struct message_ring_state *state = tevent_req_data(
240 req, struct message_ring_state);
241 struct ctdb_req_message msg;
242 struct tevent_req *subreq;
243 int incr;
244 uint32_t pnn;
246 if (srvid != MSG_ID_BENCH) {
247 return;
250 if (data.dsize != sizeof(int)) {
251 return;
253 incr = *(int *)data.dptr;
255 state->msg_count += 1;
256 if (incr == 1) {
257 state->msg_plus += 1;
258 } else {
259 state->msg_minus += 1;
262 pnn = next_node(state->client, state->num_nodes, incr);
264 msg.srvid = srvid;
265 msg.data.data = data;
267 subreq = ctdb_client_message_send(state, state->ev, state->client,
268 pnn, &msg);
269 if (tevent_req_nomem(subreq, req)) {
270 return;
272 tevent_req_set_callback(subreq, message_ring_msg_sent, req);
275 static void message_ring_finish(struct tevent_req *subreq)
277 struct tevent_req *req = tevent_req_callback_data(
278 subreq, struct tevent_req);
279 struct message_ring_state *state = tevent_req_data(
280 req, struct message_ring_state);
281 bool status;
282 double t;
284 status = tevent_wakeup_recv(subreq);
285 TALLOC_FREE(subreq);
286 if (! status) {
287 tevent_req_error(req, EIO);
288 return;
291 t = timeval_elapsed(&state->start_time);
293 printf("Ring[%u]: %.2f msgs/sec (+ve=%d -ve=%d)\n",
294 ctdb_client_pnn(state->client), state->msg_count / t,
295 state->msg_plus, state->msg_minus);
297 tevent_req_done(req);
/*
 * Collect the benchmark result: true if the request finished without
 * error, false otherwise.  The error code itself is not passed on.
 */
static bool message_ring_recv(struct tevent_req *req)
{
	int err;
	bool failed;

	failed = tevent_req_is_unix_error(req, &err);
	return !failed;
}
310 int main(int argc, const char *argv[])
312 const struct test_options *opts;
313 TALLOC_CTX *mem_ctx;
314 struct tevent_context *ev;
315 struct ctdb_client_context *client;
316 struct tevent_req *req;
317 int ret;
318 bool status;
320 status = process_options_basic(argc, argv, &opts);
321 if (! status) {
322 exit(1);
325 mem_ctx = talloc_new(NULL);
326 if (mem_ctx == NULL) {
327 fprintf(stderr, "Memory allocation error\n");
328 exit(1);
331 ev = tevent_context_init(mem_ctx);
332 if (ev == NULL) {
333 fprintf(stderr, "Memory allocation error\n");
334 exit(1);
337 ret = ctdb_client_init(mem_ctx, ev, opts->socket, &client);
338 if (ret != 0) {
339 fprintf(stderr, "Failed to initialize client, ret=%d\n", ret);
340 exit(1);
343 if (! ctdb_recovery_wait(ev, client)) {
344 fprintf(stderr, "Failed to wait for recovery\n");
345 exit(1);
348 req = message_ring_send(mem_ctx, ev, client,
349 opts->num_nodes, opts->timelimit,
350 opts->interactive);
351 if (req == NULL) {
352 fprintf(stderr, "Memory allocation error\n");
353 exit(1);
356 tevent_req_poll(req, ev);
358 status = message_ring_recv(req);
359 if (! status) {
360 fprintf(stderr, "message ring test failed\n");
361 exit(1);
364 talloc_free(mem_ctx);
365 return 0;