s3:libsmb: Honor disable_netbios option in smbsock_connect_send
[Samba.git] / ctdb / tests / src / fetch_ring.c
blobf746e789511dc571f7adb1f15d0b78450211292a
/*
   simple ctdb benchmark

   Copyright (C) Amitay Isaacs  2015

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
20 #include "replace.h"
21 #include "system/network.h"
23 #include "lib/util/debug.h"
24 #include "lib/util/time.h"
25 #include "lib/util/tevent_unix.h"
27 #include "client/client.h"
28 #include "tests/src/test_options.h"
29 #include "tests/src/cluster_wait.h"
/* Name of the database every node attaches to for this test. */
#define TESTDB		"fetch_ring.tdb"
/* The single record key that is fetch-locked and rewritten around the ring. */
#define TESTKEY		"testkey"

/* Message id (srvid) used to tell the next node that it is its turn. */
#define MSG_ID_FETCH	0
/*
 * Return the node number that follows this client's own node number,
 * wrapping back to 0 once num_nodes is reached.
 */
static uint32_t next_node(struct ctdb_client_context *client, int num_nodes)
{
	uint32_t self = ctdb_client_pnn(client);

	return (self + 1) % num_nodes;
}
41 struct fetch_ring_state {
42 struct tevent_context *ev;
43 struct ctdb_client_context *client;
44 struct ctdb_db_context *ctdb_db;
45 int num_nodes;
46 int timelimit;
47 int interactive;
48 TDB_DATA key;
49 int msg_count;
50 struct timeval start_time;
53 static void fetch_ring_msg_handler(uint64_t srvid, TDB_DATA data,
54 void *private_data);
55 static void fetch_ring_wait(struct tevent_req *subreq);
56 static void fetch_ring_start(struct tevent_req *subreq);
57 static void fetch_ring_update(struct tevent_req *subreq);
58 static void fetch_ring_msg_sent(struct tevent_req *subreq);
59 static void fetch_ring_finish(struct tevent_req *subreq);
60 static void fetch_ring_final_read(struct tevent_req *subreq);
62 static struct tevent_req *fetch_ring_send(TALLOC_CTX *mem_ctx,
63 struct tevent_context *ev,
64 struct ctdb_client_context *client,
65 struct ctdb_db_context *ctdb_db,
66 int num_nodes, int timelimit,
67 int interactive)
69 struct tevent_req *req, *subreq;
70 struct fetch_ring_state *state;
72 req = tevent_req_create(mem_ctx, &state, struct fetch_ring_state);
73 if (req == NULL) {
74 return NULL;
77 state->ev = ev;
78 state->client = client;
79 state->ctdb_db = ctdb_db;
80 state->num_nodes = num_nodes;
81 state->timelimit = timelimit;
82 state->interactive = interactive;
83 state->key.dptr = discard_const(TESTKEY);
84 state->key.dsize = strlen(TESTKEY);
86 subreq = ctdb_client_set_message_handler_send(
87 state, ev, client, MSG_ID_FETCH,
88 fetch_ring_msg_handler, req);
89 if (tevent_req_nomem(subreq, req)) {
90 return tevent_req_post(req, ev);
92 tevent_req_set_callback(subreq, fetch_ring_wait, req);
94 return req;
97 static void fetch_ring_msg_handler(uint64_t srvid, TDB_DATA data,
98 void *private_data)
100 struct tevent_req *req = talloc_get_type_abort(
101 private_data, struct tevent_req);
102 struct fetch_ring_state *state = tevent_req_data(
103 req, struct fetch_ring_state);
104 struct tevent_req *subreq;
106 state->msg_count += 1;
108 subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
109 state->ctdb_db, state->key, false);
110 if (tevent_req_nomem(subreq, req)) {
111 return;
113 tevent_req_set_callback(subreq, fetch_ring_update, req);
116 static void fetch_ring_wait(struct tevent_req *subreq)
118 struct tevent_req *req = tevent_req_callback_data(
119 subreq, struct tevent_req);
120 struct fetch_ring_state *state = tevent_req_data(
121 req, struct fetch_ring_state);
122 bool status;
123 int ret;
125 status = ctdb_client_set_message_handler_recv(subreq, &ret);
126 TALLOC_FREE(subreq);
127 if (! status) {
128 tevent_req_error(req, ret);
129 return;
132 subreq = cluster_wait_send(state, state->ev, state->client,
133 state->num_nodes);
134 if (tevent_req_nomem(subreq, req)) {
135 return;
137 tevent_req_set_callback(subreq, fetch_ring_start, req);
140 static void fetch_ring_start(struct tevent_req *subreq)
142 struct tevent_req *req = tevent_req_callback_data(
143 subreq, struct tevent_req);
144 struct fetch_ring_state *state = tevent_req_data(
145 req, struct fetch_ring_state);
146 bool status;
147 int ret;
149 status = cluster_wait_recv(subreq, &ret);
150 TALLOC_FREE(subreq);
151 if (! status) {
152 tevent_req_error(req, ret);
153 return;
156 state->start_time = tevent_timeval_current();
158 if (ctdb_client_pnn(state->client) == state->num_nodes-1) {
159 subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
160 state->ctdb_db, state->key,
161 false);
162 if (tevent_req_nomem(subreq, req)) {
163 return;
165 tevent_req_set_callback(subreq, fetch_ring_update, req);
168 subreq = tevent_wakeup_send(state, state->ev,
169 tevent_timeval_current_ofs(
170 state->timelimit, 0));
171 if (tevent_req_nomem(subreq, req)) {
172 return;
174 tevent_req_set_callback(subreq, fetch_ring_finish, req);
178 static void fetch_ring_update(struct tevent_req *subreq)
180 struct tevent_req *req = tevent_req_callback_data(
181 subreq, struct tevent_req);
182 struct fetch_ring_state *state = tevent_req_data(
183 req, struct fetch_ring_state);
184 struct ctdb_record_handle *h;
185 struct ctdb_req_message msg;
186 TDB_DATA data;
187 uint32_t pnn;
188 int ret;
190 h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
191 TALLOC_FREE(subreq);
192 if (h == NULL) {
193 tevent_req_error(req, ret);
194 return;
197 if (data.dsize > 1000) {
198 TALLOC_FREE(data.dptr);
199 data.dsize = 0;
202 if (data.dsize == 0) {
203 data.dptr = (uint8_t *)talloc_asprintf(state, "Test data\n");
204 if (tevent_req_nomem(data.dptr, req)) {
205 return;
209 data.dptr = (uint8_t *)talloc_asprintf_append(
210 (char *)data.dptr,
211 "msg_count=%d on node %d\n",
212 state->msg_count,
213 ctdb_client_pnn(state->client));
214 if (tevent_req_nomem(data.dptr, req)) {
215 return;
218 data.dsize = strlen((const char *)data.dptr) + 1;
220 ret = ctdb_store_record(h, data);
221 if (ret != 0) {
222 tevent_req_error(req, ret);
223 return;
226 talloc_free(data.dptr);
227 talloc_free(h);
229 msg.srvid = MSG_ID_FETCH;
230 msg.data.data = tdb_null;
232 pnn = next_node(state->client, state->num_nodes);
234 subreq = ctdb_client_message_send(state, state->ev, state->client,
235 pnn, &msg);
236 if (tevent_req_nomem(subreq, req)) {
237 return;
239 tevent_req_set_callback(subreq, fetch_ring_msg_sent, req);
242 static void fetch_ring_msg_sent(struct tevent_req *subreq)
244 struct tevent_req *req = tevent_req_callback_data(
245 subreq, struct tevent_req);
246 bool status;
247 int ret;
249 status = ctdb_client_message_recv(subreq, &ret);
250 TALLOC_FREE(subreq);
251 if (! status) {
252 tevent_req_error(req, ret);
256 static void fetch_ring_finish(struct tevent_req *subreq)
258 struct tevent_req *req = tevent_req_callback_data(
259 subreq, struct tevent_req);
260 struct fetch_ring_state *state = tevent_req_data(
261 req, struct fetch_ring_state);
262 bool status;
263 double t;
265 status = tevent_wakeup_recv(subreq);
266 TALLOC_FREE(subreq);
267 if (! status) {
268 tevent_req_error(req, EIO);
269 return;
272 t = timeval_elapsed(&state->start_time);
274 printf("Fetch[%u]: %.2f msgs/sec\n", ctdb_client_pnn(state->client),
275 state->msg_count / t);
277 subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
278 state->ctdb_db, state->key, false);
279 if (tevent_req_nomem(subreq, req)) {
280 return;
282 tevent_req_set_callback(subreq, fetch_ring_final_read, req);
285 static void fetch_ring_final_read(struct tevent_req *subreq)
287 struct tevent_req *req = tevent_req_callback_data(
288 subreq, struct tevent_req);
289 struct fetch_ring_state *state = tevent_req_data(
290 req, struct fetch_ring_state);
291 struct ctdb_record_handle *h;
292 TDB_DATA data;
293 int err;
295 h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &err);
296 TALLOC_FREE(subreq);
297 if (h == NULL) {
298 tevent_req_error(req, err);
299 return;
302 if (state->interactive == 1) {
303 printf("DATA:\n%s\n", (char *)data.dptr);
305 talloc_free(data.dptr);
306 talloc_free(h);
308 tevent_req_done(req);
/*
 * Collect the result of a fetch_ring request.
 *
 * Returns true on success.  On failure returns false and, when perr is
 * not NULL, stores the error code in *perr.
 */
static bool fetch_ring_recv(struct tevent_req *req, int *perr)
{
	int err;

	if (!tevent_req_is_unix_error(req, &err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = err;
	}
	return false;
}
324 int main(int argc, const char *argv[])
326 const struct test_options *opts;
327 TALLOC_CTX *mem_ctx;
328 struct tevent_context *ev;
329 struct ctdb_client_context *client;
330 struct ctdb_db_context *ctdb_db;
331 struct tevent_req *req;
332 int ret;
333 bool status;
335 setup_logging("fetch_ring", DEBUG_STDERR);
337 status = process_options_basic(argc, argv, &opts);
338 if (! status) {
339 exit(1);
342 mem_ctx = talloc_new(NULL);
343 if (mem_ctx == NULL) {
344 fprintf(stderr, "Memory allocation error\n");
345 exit(1);
348 ev = tevent_context_init(mem_ctx);
349 if (ev == NULL) {
350 fprintf(stderr, "Memory allocation error\n");
351 exit(1);
354 ret = ctdb_client_init(mem_ctx, ev, opts->socket, &client);
355 if (ret != 0) {
356 fprintf(stderr, "Failed to initialize client, ret=%d\n", ret);
357 exit(1);
360 if (! ctdb_recovery_wait(ev, client)) {
361 fprintf(stderr, "Memory allocation error\n");
362 exit(1);
365 ret = ctdb_attach(ev, client, tevent_timeval_zero(), TESTDB, 0,
366 &ctdb_db);
367 if (ret != 0) {
368 fprintf(stderr, "Failed to attach to DB %s\n", TESTDB);
369 exit(1);
372 req = fetch_ring_send(mem_ctx, ev, client, ctdb_db,
373 opts->num_nodes, opts->timelimit,
374 opts->interactive);
375 if (req == NULL) {
376 fprintf(stderr, "Memory allocation error\n");
377 exit(1);
380 tevent_req_poll(req, ev);
382 status = fetch_ring_recv(req, NULL);
383 if (! status) {
384 fprintf(stderr, "fetch ring test failed\n");
385 exit(1);
388 talloc_free(mem_ctx);
389 return 0;