ctdb-tests: Generalize transaction_loop test
[Samba.git] / ctdb / tests / src / transaction_loop.c
blob66237512c8718e9b6efe2df71b0c24339ca64814
1 /*
2 simple ctdb benchmark for persistent databases
4 Copyright (C) Amitay Isaacs 2016
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include "system/network.h"
23 #include "lib/util/tevent_unix.h"
25 #include "client/client.h"
26 #include "tests/src/test_options.h"
27 #include "tests/src/cluster_wait.h"
29 struct transaction_loop_state {
30 struct tevent_context *ev;
31 struct ctdb_client_context *client;
32 struct ctdb_db_context *ctdb_db;
33 int num_nodes;
34 int timelimit;
35 int interactive;
36 TDB_DATA key;
37 uint32_t pnn;
38 struct ctdb_transaction_handle *h;
39 uint32_t *old_counter, *counter;
40 struct tevent_req *subreq;
43 static void transaction_loop_start(struct tevent_req *subreq);
44 static void transaction_loop_started(struct tevent_req *subreq);
45 static void transaction_loop_committed(struct tevent_req *subreq);
46 static void transaction_loop_each_second(struct tevent_req *subreq);
47 static bool transaction_loop_check_counters(struct tevent_req *req);
48 static void transaction_loop_finish(struct tevent_req *subreq);
50 static struct tevent_req *transaction_loop_send(
51 TALLOC_CTX *mem_ctx,
52 struct tevent_context *ev,
53 struct ctdb_client_context *client,
54 struct ctdb_db_context *ctdb_db,
55 int num_nodes, int timelimit, int interactive,
56 const char *keystr)
58 struct tevent_req *req, *subreq;
59 struct transaction_loop_state *state;
61 req = tevent_req_create(mem_ctx, &state,
62 struct transaction_loop_state);
63 if (req == NULL) {
64 return NULL;
67 state->ev = ev;
68 state->client = client;
69 state->ctdb_db = ctdb_db;
70 state->num_nodes = num_nodes;
71 state->timelimit = timelimit;
72 state->interactive = interactive;
73 state->key.dptr = discard_const(keystr);
74 state->key.dsize = strlen(keystr);
75 state->pnn = ctdb_client_pnn(client);
76 state->old_counter = talloc_zero_array(state, uint32_t, num_nodes);
77 if (tevent_req_nomem(state->old_counter, req)) {
78 return tevent_req_post(req, ev);
80 state->counter = talloc_zero_array(state, uint32_t, num_nodes);
81 if (tevent_req_nomem(state->counter, req)) {
82 return tevent_req_post(req, ev);
85 subreq = cluster_wait_send(state, state->ev, state->client,
86 state->num_nodes);
87 if (tevent_req_nomem(subreq, req)) {
88 return tevent_req_post(req, ev);
90 tevent_req_set_callback(subreq, transaction_loop_start, req);
92 return req;
95 static void transaction_loop_start(struct tevent_req *subreq)
97 struct tevent_req *req = tevent_req_callback_data(
98 subreq, struct tevent_req);
99 struct transaction_loop_state *state = tevent_req_data(
100 req, struct transaction_loop_state);
101 bool status;
102 int ret;
104 status = cluster_wait_recv(subreq, &ret);
105 TALLOC_FREE(subreq);
106 if (! status) {
107 tevent_req_error(req, ret);
108 return;
111 subreq = ctdb_transaction_start_send(state, state->ev, state->client,
112 tevent_timeval_current_ofs(
113 state->timelimit, 0),
114 state->ctdb_db, false);
115 if (tevent_req_nomem(subreq, req)) {
116 return;
118 tevent_req_set_callback(subreq, transaction_loop_started, req);
119 state->subreq = subreq;
121 if (ctdb_client_pnn(state->client) == 0) {
122 subreq = tevent_wakeup_send(state, state->ev,
123 tevent_timeval_current_ofs(1, 0));
124 if (tevent_req_nomem(subreq, req)) {
125 return;
127 tevent_req_set_callback(subreq, transaction_loop_each_second,
128 req);
131 subreq = tevent_wakeup_send(state, state->ev,
132 tevent_timeval_current_ofs(
133 state->timelimit, 0));
134 if (tevent_req_nomem(subreq, req)) {
135 return;
137 tevent_req_set_callback(subreq, transaction_loop_finish, req);
140 static void transaction_loop_started(struct tevent_req *subreq)
142 struct tevent_req *req = tevent_req_callback_data(
143 subreq, struct tevent_req);
144 struct transaction_loop_state *state = tevent_req_data(
145 req, struct transaction_loop_state);
146 TDB_DATA data;
147 int ret;
148 uint32_t *counter;
150 state->h = ctdb_transaction_start_recv(subreq, &ret);
151 TALLOC_FREE(subreq);
152 state->subreq = NULL;
153 if (state->h == NULL) {
154 fprintf(stderr, "transaction start failed\n");
155 tevent_req_error(req, ret);
156 return;
159 ret = ctdb_transaction_fetch_record(state->h, state->key,
160 state, &data);
161 if (ret != 0) {
162 fprintf(stderr, "transaction fetch record failed\n");
163 tevent_req_error(req, ret);
164 return;
167 if (data.dsize < state->num_nodes * sizeof(uint32_t)) {
168 TALLOC_FREE(data.dptr);
170 data.dsize = state->num_nodes * sizeof(uint32_t);
171 data.dptr = (uint8_t *)talloc_zero_array(state, uint32_t,
172 state->num_nodes);
173 if (tevent_req_nomem(data.dptr, req)) {
174 return;
178 counter = (uint32_t *)data.dptr;
179 counter[state->pnn] += 1;
180 memcpy(state->counter, counter, state->num_nodes * sizeof(uint32_t));
182 ret = ctdb_transaction_store_record(state->h, state->key, data);
183 if (ret != 0) {
184 fprintf(stderr, "transaction store failed\n");
185 tevent_req_error(req, ret);
186 return;
189 subreq = ctdb_transaction_commit_send(state, state->ev,
190 tevent_timeval_current_ofs(
191 state->timelimit, 0),
192 state->h);
193 if (tevent_req_nomem(subreq, req)) {
194 return;
196 tevent_req_set_callback(subreq, transaction_loop_committed, req);
197 state->subreq = subreq;
200 static void transaction_loop_committed(struct tevent_req *subreq)
202 struct tevent_req *req = tevent_req_callback_data(
203 subreq, struct tevent_req);
204 struct transaction_loop_state *state = tevent_req_data(
205 req, struct transaction_loop_state);
206 int ret;
207 bool status;
209 status = ctdb_transaction_commit_recv(subreq, &ret);
210 TALLOC_FREE(subreq);
211 state->subreq = NULL;
212 if (! status) {
213 fprintf(stderr, "transaction commit failed - %s\n",
214 strerror(ret));
215 tevent_req_error(req, ret);
216 return;
219 if (state->pnn == 0) {
220 if (! transaction_loop_check_counters(req)) {
221 return;
225 subreq = ctdb_transaction_start_send(state, state->ev, state->client,
226 tevent_timeval_current_ofs(
227 state->timelimit, 0),
228 state->ctdb_db, false);
229 if (tevent_req_nomem(subreq, req)) {
230 return;
232 tevent_req_set_callback(subreq, transaction_loop_started, req);
235 static void transaction_loop_each_second(struct tevent_req *subreq)
237 struct tevent_req *req = tevent_req_callback_data(
238 subreq, struct tevent_req);
239 struct transaction_loop_state *state = tevent_req_data(
240 req, struct transaction_loop_state);
241 bool status;
242 int i;
244 status = tevent_wakeup_recv(subreq);
245 TALLOC_FREE(subreq);
246 if (! status) {
247 fprintf(stderr, "tevent wakeup failed\n");
248 tevent_req_error(req, EIO);
249 return;
252 if (state->interactive == 1) {
253 printf("Transaction[%u]: ", ctdb_client_pnn(state->client));
254 for (i=0; i<state->num_nodes; i++) {
255 printf("%6u ", state->counter[i]);
257 printf("\n");
258 fflush(stdout);
261 subreq = tevent_wakeup_send(state, state->ev,
262 tevent_timeval_current_ofs(1, 0));
263 if (tevent_req_nomem(subreq, req)) {
264 return;
266 tevent_req_set_callback(subreq, transaction_loop_each_second, req);
269 static bool transaction_loop_check_counters(struct tevent_req *req)
271 struct transaction_loop_state *state = tevent_req_data(
272 req, struct transaction_loop_state);
273 int i;
274 bool monotonous = true;
276 for (i=0; i<state->num_nodes; i++) {
277 if (state->counter[i] < state->old_counter[i]) {
278 fprintf(stderr,
279 "Counter reduced for node %d: %u -> %u\n",
280 i, state->old_counter[i], state->counter[i]);
281 monotonous = false;
282 break;
286 if (monotonous) {
287 memcpy(state->old_counter, state->counter,
288 state->num_nodes * sizeof(uint32_t));
291 return monotonous;
294 static void transaction_loop_finish(struct tevent_req *subreq)
296 struct tevent_req *req = tevent_req_callback_data(
297 subreq, struct tevent_req);
298 struct transaction_loop_state *state = tevent_req_data(
299 req, struct transaction_loop_state);
300 bool status;
301 int i;
303 status = tevent_wakeup_recv(subreq);
304 TALLOC_FREE(subreq);
305 TALLOC_FREE(state->subreq);
306 if (! status) {
307 tevent_req_error(req, EIO);
308 return;
311 printf("Transaction[%u]: ", ctdb_client_pnn(state->client));
312 for (i=0; i<state->num_nodes; i++) {
313 printf("%6u ", state->counter[i]);
315 printf("\n");
317 tevent_req_done(req);
320 static bool transaction_loop_recv(struct tevent_req *req, int *perr)
322 int err;
324 if (tevent_req_is_unix_error(req, &err)) {
325 if (perr != NULL) {
326 *perr = err;
328 return false;
330 return true;
333 int main(int argc, const char *argv[])
335 const struct test_options *opts;
336 TALLOC_CTX *mem_ctx;
337 struct tevent_context *ev;
338 struct ctdb_client_context *client;
339 struct ctdb_db_context *ctdb_db;
340 struct tevent_req *req;
341 uint8_t db_flags;
342 int ret;
343 bool status;
345 status = process_options_database(argc, argv, &opts);
346 if (! status) {
347 exit(1);
350 mem_ctx = talloc_new(NULL);
351 if (mem_ctx == NULL) {
352 fprintf(stderr, "Memory allocation error\n");
353 exit(1);
356 ev = tevent_context_init(mem_ctx);
357 if (ev == NULL) {
358 fprintf(stderr, "Memory allocation error\n");
359 exit(1);
362 ret = ctdb_client_init(mem_ctx, ev, opts->socket, &client);
363 if (ret != 0) {
364 fprintf(stderr, "Failed to initialize client, ret=%d\n", ret);
365 exit(1);
368 if (! ctdb_recovery_wait(ev, client)) {
369 fprintf(stderr, "Memory allocation error\n");
370 exit(1);
373 if (strcmp(opts->dbtype, "persistent") == 0) {
374 db_flags = CTDB_DB_FLAGS_PERSISTENT;
375 } else if (strcmp(opts->dbtype, "replicated") == 0) {
376 db_flags = CTDB_DB_FLAGS_REPLICATED;
377 } else {
378 fprintf(stderr, "Database must be persistent or replicated\n");
379 exit(1);
382 ret = ctdb_attach(ev, client, tevent_timeval_zero(), opts->dbname,
383 db_flags, &ctdb_db);
384 if (ret != 0) {
385 fprintf(stderr, "Failed to attach to persistent DB %s\n",
386 opts->dbname);
387 exit(1);
390 req = transaction_loop_send(mem_ctx, ev, client, ctdb_db,
391 opts->num_nodes, opts->timelimit,
392 opts->interactive, opts->keystr);
393 if (req == NULL) {
394 fprintf(stderr, "Memory allocation error\n");
395 exit(1);
398 tevent_req_poll(req, ev);
400 status = transaction_loop_recv(req, &ret);
401 if (! status) {
402 fprintf(stderr, "transaction loop test failed, ret=%d\n", ret);
403 exit(1);
406 talloc_free(mem_ctx);
407 return 0;