4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
23 #include "lib/util/debug.h"
24 #include "lib/util/time.h"
25 #include "lib/util/tevent_unix.h"
27 #include "client/client.h"
28 #include "tests/src/test_options.h"
29 #include "tests/src/cluster_wait.h"
31 #define TESTDB "fetch_ring.tdb"
32 #define TESTKEY "testkey"
34 #define MSG_ID_FETCH 0
/*
 * Return the node number that follows this client's own in the ring:
 * one more than ctdb_client_pnn(client), reduced modulo num_nodes so
 * that the highest-numbered node wraps around to 0.
 *
 * NOTE(review): num_nodes == 0 would be a modulo by zero; presumably
 * callers always pass a positive node count -- confirm.
 */
36 static uint32_t next_node(struct ctdb_client_context
*client
, int num_nodes
)
38 return (ctdb_client_pnn(client
) + 1) % num_nodes
;
/*
 * State carried by the fetch ring tevent request and shared by all of
 * its callbacks.
 *
 * NOTE(review): the code below also uses state->num_nodes,
 * state->timelimit, state->interactive, state->key and
 * state->msg_count; their declarations are not visible in this
 * listing -- confirm against the full file.
 */
41 struct fetch_ring_state
{
/* Event context used to issue every sub-request */
42 struct tevent_context
*ev
;
/* Client handle passed to the ctdb_* calls */
43 struct ctdb_client_context
*client
;
/* Database handle passed to ctdb_fetch_lock_send() */
44 struct ctdb_db_context
*ctdb_db
;
/* Set in fetch_ring_start(), read in fetch_ring_finish() */
50 struct timeval start_time
;
/*
 * Forward declarations: the message handler registered by
 * fetch_ring_send() and the sub-request callbacks that chain the
 * stages of the test together.
 */
53 static void fetch_ring_msg_handler(uint64_t srvid
, TDB_DATA data
,
55 static void fetch_ring_wait(struct tevent_req
*subreq
);
56 static void fetch_ring_start(struct tevent_req
*subreq
);
57 static void fetch_ring_update(struct tevent_req
*subreq
);
58 static void fetch_ring_msg_sent(struct tevent_req
*subreq
);
59 static void fetch_ring_finish(struct tevent_req
*subreq
);
60 static void fetch_ring_final_read(struct tevent_req
*subreq
);
/*
 * Create the fetch ring request.
 *
 * Allocates a tevent request with a struct fetch_ring_state, copies the
 * caller's parameters into it, and registers fetch_ring_msg_handler for
 * MSG_ID_FETCH with the new request as the handler's private data.
 * Processing continues in fetch_ring_wait() once the registration
 * sub-request completes.
 *
 * NOTE(review): "interactive" is read below but its parameter
 * declaration is not visible in this listing -- confirm.
 */
62 static struct tevent_req
*fetch_ring_send(TALLOC_CTX
*mem_ctx
,
63 struct tevent_context
*ev
,
64 struct ctdb_client_context
*client
,
65 struct ctdb_db_context
*ctdb_db
,
66 int num_nodes
, int timelimit
,
69 struct tevent_req
*req
, *subreq
;
70 struct fetch_ring_state
*state
;
72 req
= tevent_req_create(mem_ctx
, &state
, struct fetch_ring_state
);
78 state
->client
= client
;
79 state
->ctdb_db
= ctdb_db
;
80 state
->num_nodes
= num_nodes
;
81 state
->timelimit
= timelimit
;
82 state
->interactive
= interactive
;
/*
 * The record key is the TESTKEY literal; dsize is strlen(TESTKEY), so
 * the terminating NUL is not part of the key.
 */
83 state
->key
.dptr
= discard_const(TESTKEY
);
84 state
->key
.dsize
= strlen(TESTKEY
);
/* Ask to be called back for every MSG_ID_FETCH message */
86 subreq
= ctdb_client_set_message_handler_send(
87 state
, ev
, client
, MSG_ID_FETCH
,
88 fetch_ring_msg_handler
, req
);
/* A NULL subreq fails the request; it is posted so it completes via ev */
89 if (tevent_req_nomem(subreq
, req
)) {
90 return tevent_req_post(req
, ev
);
92 tevent_req_set_callback(subreq
, fetch_ring_wait
, req
);
/*
 * Handler for an incoming MSG_ID_FETCH message.
 *
 * private_data is the fetch ring tevent request supplied at
 * registration time. Each message bumps state->msg_count and starts a
 * fetch-lock on the test key; the record is then rewritten in
 * fetch_ring_update().
 */
97 static void fetch_ring_msg_handler(uint64_t srvid
, TDB_DATA data
,
100 struct tevent_req
*req
= talloc_get_type_abort(
101 private_data
, struct tevent_req
);
102 struct fetch_ring_state
*state
= tevent_req_data(
103 req
, struct fetch_ring_state
);
104 struct tevent_req
*subreq
;
/* Counted here, reported as a rate in fetch_ring_finish() */
106 state
->msg_count
+= 1;
108 subreq
= ctdb_fetch_lock_send(state
, state
->ev
, state
->client
,
109 state
->ctdb_db
, state
->key
, false);
110 if (tevent_req_nomem(subreq
, req
)) {
113 tevent_req_set_callback(subreq
, fetch_ring_update
, req
);
/*
 * Callback for the message handler registration started in
 * fetch_ring_send().
 *
 * Collects the registration result and then starts a cluster_wait
 * sub-request whose completion is handled by fetch_ring_start().
 *
 * NOTE(review): the declarations of status/ret and the condition
 * guarding tevent_req_error() are not visible in this listing.
 */
116 static void fetch_ring_wait(struct tevent_req
*subreq
)
118 struct tevent_req
*req
= tevent_req_callback_data(
119 subreq
, struct tevent_req
);
120 struct fetch_ring_state
*state
= tevent_req_data(
121 req
, struct fetch_ring_state
);
125 status
= ctdb_client_set_message_handler_recv(subreq
, &ret
);
/* Error path: fail the request with the code reported in ret */
128 tevent_req_error(req
, ret
);
132 subreq
= cluster_wait_send(state
, state
->ev
, state
->client
,
134 if (tevent_req_nomem(subreq
, req
)) {
137 tevent_req_set_callback(subreq
, fetch_ring_start
, req
);
/*
 * Callback for the cluster_wait sub-request: begin the timed run.
 *
 * Records the start time. The client whose PNN equals num_nodes - 1
 * issues a fetch-lock on the test key (continued in
 * fetch_ring_update()). A wakeup is also scheduled state->timelimit
 * seconds from now; fetch_ring_finish() runs when it fires.
 *
 * NOTE(review): closing braces and the final argument of the
 * fetch-lock call are not visible in this listing, so the exact
 * nesting of the statements below cannot be confirmed from here.
 */
140 static void fetch_ring_start(struct tevent_req
*subreq
)
142 struct tevent_req
*req
= tevent_req_callback_data(
143 subreq
, struct tevent_req
);
144 struct fetch_ring_state
*state
= tevent_req_data(
145 req
, struct fetch_ring_state
);
149 status
= cluster_wait_recv(subreq
, &ret
);
/* Error path: fail the request with the code reported in ret */
152 tevent_req_error(req
, ret
);
/* Reference point for the elapsed time computed in fetch_ring_finish() */
156 state
->start_time
= tevent_timeval_current();
/* Only the highest-numbered node takes this branch */
158 if (ctdb_client_pnn(state
->client
) == state
->num_nodes
-1) {
159 subreq
= ctdb_fetch_lock_send(state
, state
->ev
, state
->client
,
160 state
->ctdb_db
, state
->key
,
162 if (tevent_req_nomem(subreq
, req
)) {
165 tevent_req_set_callback(subreq
, fetch_ring_update
, req
);
/* Timer: now + timelimit seconds, 0 microseconds */
168 subreq
= tevent_wakeup_send(state
, state
->ev
,
169 tevent_timeval_current_ofs(
170 state
->timelimit
, 0));
171 if (tevent_req_nomem(subreq
, req
)) {
174 tevent_req_set_callback(subreq
, fetch_ring_finish
, req
);
/*
 * Callback for a fetch-lock on the test key: rewrite the record and
 * pass a message on to the next node.
 *
 * The fetched data is freed when it is larger than 1000 bytes and
 * seeded with "Test data\n" when it is empty. A line naming this
 * node's PNN is appended, the result is stored including its
 * terminating NUL, and an empty MSG_ID_FETCH message is sent to
 * next_node(). Completion of the send is handled by
 * fetch_ring_msg_sent().
 *
 * NOTE(review): the declarations of data/ret/pnn, several call
 * arguments and the conditions guarding tevent_req_error() are not
 * visible in this listing.
 */
178 static void fetch_ring_update(struct tevent_req
*subreq
)
180 struct tevent_req
*req
= tevent_req_callback_data(
181 subreq
, struct tevent_req
);
182 struct fetch_ring_state
*state
= tevent_req_data(
183 req
, struct fetch_ring_state
);
184 struct ctdb_record_handle
*h
;
185 struct ctdb_req_message msg
;
/* h is the record handle; data is allocated on state */
190 h
= ctdb_fetch_lock_recv(subreq
, NULL
, state
, &data
, &ret
);
193 tevent_req_error(req
, ret
);
/* Drop an oversized record buffer */
197 if (data
.dsize
> 1000) {
198 TALLOC_FREE(data
.dptr
);
/* Empty record: start from a fixed header line */
202 if (data
.dsize
== 0) {
203 data
.dptr
= (uint8_t *)talloc_asprintf(state
, "Test data\n");
204 if (tevent_req_nomem(data
.dptr
, req
)) {
/* Append one line for this hop */
209 data
.dptr
= (uint8_t *)talloc_asprintf_append(
211 "msg_count=%d on node %d\n",
213 ctdb_client_pnn(state
->client
));
214 if (tevent_req_nomem(data
.dptr
, req
)) {
/* +1 so the stored record keeps its terminating NUL */
218 data
.dsize
= strlen((const char *)data
.dptr
) + 1;
220 ret
= ctdb_store_record(h
, data
);
222 tevent_req_error(req
, ret
);
226 talloc_free(data
.dptr
);
/* The message has no payload (tdb_null) */
229 msg
.srvid
= MSG_ID_FETCH
;
230 msg
.data
.data
= tdb_null
;
232 pnn
= next_node(state
->client
, state
->num_nodes
);
234 subreq
= ctdb_client_message_send(state
, state
->ev
, state
->client
,
236 if (tevent_req_nomem(subreq
, req
)) {
239 tevent_req_set_callback(subreq
, fetch_ring_msg_sent
, req
);
/*
 * Callback for the message send started in fetch_ring_update().
 *
 * Collects the send result; on the error path the fetch ring request
 * is failed with the code reported in ret.
 *
 * NOTE(review): the declarations of status/ret and the condition
 * guarding tevent_req_error() are not visible in this listing.
 */
242 static void fetch_ring_msg_sent(struct tevent_req
*subreq
)
244 struct tevent_req
*req
= tevent_req_callback_data(
245 subreq
, struct tevent_req
);
249 status
= ctdb_client_message_recv(subreq
, &ret
);
252 tevent_req_error(req
, ret
);
/*
 * Callback for the wakeup scheduled in fetch_ring_start(): the time
 * limit has expired.
 *
 * Prints this node's message rate (msg_count divided by the time
 * elapsed since start_time) and then issues one more fetch-lock on the
 * test key, completed in fetch_ring_final_read().
 *
 * NOTE(review): the declarations of status and t are not visible in
 * this listing; the "%.2f" format presumably relies on t being a
 * floating-point value -- confirm. A zero elapsed time would make the
 * division below degenerate.
 */
256 static void fetch_ring_finish(struct tevent_req
*subreq
)
258 struct tevent_req
*req
= tevent_req_callback_data(
259 subreq
, struct tevent_req
);
260 struct fetch_ring_state
*state
= tevent_req_data(
261 req
, struct fetch_ring_state
);
265 status
= tevent_wakeup_recv(subreq
);
/* Error path: the wakeup reports no error code, so EIO is used */
268 tevent_req_error(req
, EIO
);
272 t
= timeval_elapsed(&state
->start_time
);
274 printf("Fetch[%u]: %.2f msgs/sec\n", ctdb_client_pnn(state
->client
),
275 state
->msg_count
/ t
);
277 subreq
= ctdb_fetch_lock_send(state
, state
->ev
, state
->client
,
278 state
->ctdb_db
, state
->key
, false);
279 if (tevent_req_nomem(subreq
, req
)) {
282 tevent_req_set_callback(subreq
, fetch_ring_final_read
, req
);
/*
 * Callback for the final fetch-lock issued by fetch_ring_finish().
 *
 * When state->interactive is 1 the record contents are printed as a
 * string. The fetched buffer is freed and the fetch ring request is
 * marked done.
 *
 * NOTE(review): data.dptr is passed to "%s" without a visible check;
 * this presumably relies on the record always holding the
 * NUL-terminated text written by fetch_ring_update() -- confirm it can
 * never be empty (NULL dptr) here.
 */
285 static void fetch_ring_final_read(struct tevent_req
*subreq
)
287 struct tevent_req
*req
= tevent_req_callback_data(
288 subreq
, struct tevent_req
);
289 struct fetch_ring_state
*state
= tevent_req_data(
290 req
, struct fetch_ring_state
);
291 struct ctdb_record_handle
*h
;
295 h
= ctdb_fetch_lock_recv(subreq
, NULL
, state
, &data
, &err
);
/* Error path: fail the request with the code reported in err */
298 tevent_req_error(req
, err
);
302 if (state
->interactive
== 1) {
303 printf("DATA:\n%s\n", (char *)data
.dptr
);
305 talloc_free(data
.dptr
);
308 tevent_req_done(req
);
/*
 * Collect the outcome of a fetch ring request.
 *
 * Queries the request with tevent_req_is_unix_error(), which fills in
 * err when the request failed.
 *
 * NOTE(review): the handling of perr and the return statements are
 * not visible in this listing; main() passes NULL for perr, so it is
 * presumably optional -- confirm.
 */
311 static bool fetch_ring_recv(struct tevent_req
*req
, int *perr
)
315 if (tevent_req_is_unix_error(req
, &err
)) {
/*
 * Entry point of the fetch ring test.
 *
 * Parses the test options, creates a talloc context and tevent context,
 * connects to ctdb through opts->socket, waits via
 * ctdb_recovery_wait(), attaches to TESTDB, then runs the fetch ring
 * request to completion with tevent_req_poll() and reports failure on
 * stderr.
 *
 * NOTE(review): the declarations of mem_ctx/status/ret, the failure
 * conditions and the exit paths are not visible in this listing.
 */
324 int main(int argc
, const char *argv
[])
326 const struct test_options
*opts
;
328 struct tevent_context
*ev
;
329 struct ctdb_client_context
*client
;
330 struct ctdb_db_context
*ctdb_db
;
331 struct tevent_req
*req
;
335 setup_logging("fetch_ring", DEBUG_STDERR
);
337 status
= process_options_basic(argc
, argv
, &opts
);
/* Parent talloc context for the event context, client and request */
342 mem_ctx
= talloc_new(NULL
);
343 if (mem_ctx
== NULL
) {
344 fprintf(stderr
, "Memory allocation error\n");
348 ev
= tevent_context_init(mem_ctx
);
350 fprintf(stderr
, "Memory allocation error\n");
354 ret
= ctdb_client_init(mem_ctx
, ev
, opts
->socket
, &client
);
356 fprintf(stderr
, "Failed to initialize client, ret=%d\n", ret
);
/*
 * NOTE(review): a ctdb_recovery_wait() failure is reported with the
 * allocation-error message; confirm whether that wording is intended.
 */
360 if (! ctdb_recovery_wait(ev
, client
)) {
361 fprintf(stderr
, "Memory allocation error\n");
365 ret
= ctdb_attach(ev
, client
, tevent_timeval_zero(), TESTDB
, 0,
368 fprintf(stderr
, "Failed to attach to DB %s\n", TESTDB
);
372 req
= fetch_ring_send(mem_ctx
, ev
, client
, ctdb_db
,
373 opts
->num_nodes
, opts
->timelimit
,
376 fprintf(stderr
, "Memory allocation error\n");
/* Run the event loop until the fetch ring request completes */
380 tevent_req_poll(req
, ev
);
/* The error code itself is not needed, so NULL is passed for perr */
382 status
= fetch_ring_recv(req
, NULL
);
384 fprintf(stderr
, "fetch ring test failed\n");
388 talloc_free(mem_ctx
);