s4:libcli/wrepl: convert wrepl_pull_table_send to tevent_req
[Samba/cd1.git] / source4 / wrepl_server / wrepl_out_helpers.c
blob97457614363ced75e1580b6eabcba98c9d1e5b00
1 /*
2 Unix SMB/CIFS implementation.
4 WINS Replication server
6 Copyright (C) Stefan Metzmacher 2005
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 3 of the License, or
11 (at your option) any later version.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
22 #include "includes.h"
23 #include "lib/events/events.h"
24 #include "lib/socket/socket.h"
25 #include "smbd/service_task.h"
26 #include "smbd/service_stream.h"
27 #include "librpc/gen_ndr/winsrepl.h"
28 #include "wrepl_server/wrepl_server.h"
29 #include "nbt_server/wins/winsdb.h"
30 #include "libcli/composite/composite.h"
31 #include "libcli/wrepl/winsrepl.h"
32 #include "libcli/resolve/resolve.h"
33 #include "param/param.h"
/* Stages of an outgoing WINS replication connection. */
enum wreplsrv_out_connect_stage {
	WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET,		/* waiting for the TCP connect to complete */
	WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX,	/* waiting for the WREPL associate reply */
	WREPLSRV_OUT_CONNECT_STAGE_DONE
};

/* Per-call state for wreplsrv_out_connect_send()/_recv(). */
struct wreplsrv_out_connect_state {
	enum wreplsrv_out_connect_stage stage;
	struct composite_context *c;		/* composite handed back to the caller */
	struct wrepl_request *req;		/* in-flight associate request */
	struct composite_context *c_req;	/* in-flight connect request */
	struct wrepl_associate assoc_io;	/* in/out data of the associate call */
	enum winsrepl_partner_type type;	/* PUSH, PULL or NONE (uncached) */
	struct wreplsrv_out_connection *wreplconn; /* the resulting connection */
};

static void wreplsrv_out_connect_handler_creq(struct composite_context *c_req);
static void wreplsrv_out_connect_handler_req(struct wrepl_request *req);
/*
 * The TCP connect finished: collect its result and send the WREPL
 * associate request to obtain the peer's association context.
 */
static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state)
{
	NTSTATUS status;

	status = wrepl_connect_recv(state->c_req);
	NT_STATUS_NOT_OK_RETURN(status);

	state->req = wrepl_associate_send(state->wreplconn->sock, &state->assoc_io);
	NT_STATUS_HAVE_NO_MEMORY(state->req);

	/* re-enter the state machine when the associate reply arrives */
	state->req->async.fn		= wreplsrv_out_connect_handler_req;
	state->req->async.private_data	= state;

	state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX;

	return NT_STATUS_OK;
}
/*
 * The associate reply arrived: remember the peer's association context
 * and version, and cache the connection on the partner when allowed.
 */
static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state *state)
{
	NTSTATUS status;

	status = wrepl_associate_recv(state->req, &state->assoc_io);
	NT_STATUS_NOT_OK_RETURN(status);

	state->wreplconn->assoc_ctx.peer_ctx	= state->assoc_io.out.assoc_ctx;
	state->wreplconn->assoc_ctx.peer_major	= state->assoc_io.out.major_version;

	if (state->type == WINSREPL_PARTNER_PUSH) {
		if (state->wreplconn->assoc_ctx.peer_major >= 5) {
			/* peer speaks version >= 5: cache for push use;
			 * the partner takes talloc ownership */
			state->wreplconn->partner->push.wreplconn = state->wreplconn;
			talloc_steal(state->wreplconn->partner, state->wreplconn);
		} else {
			/* too old for push caching: treat as uncached */
			state->type = WINSREPL_PARTNER_NONE;
		}
	} else if (state->type == WINSREPL_PARTNER_PULL) {
		/* always cache pull connections on the partner */
		state->wreplconn->partner->pull.wreplconn = state->wreplconn;
		talloc_steal(state->wreplconn->partner, state->wreplconn);
	}

	state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;

	return NT_STATUS_OK;
}
/*
 * Central state machine step: dispatch on the current stage, record the
 * result on the composite and fire the caller's callback when finished.
 */
static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state)
{
	struct composite_context *c = state->c;

	switch (state->stage) {
	case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET:
		c->status = wreplsrv_out_connect_wait_socket(state);
		break;
	case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX:
		c->status = wreplsrv_out_connect_wait_assoc_ctx(state);
		c->state  = COMPOSITE_STATE_DONE;
		break;
	case WREPLSRV_OUT_CONNECT_STAGE_DONE:
		/* a DONE stage should never re-enter the handler */
		c->status = NT_STATUS_INTERNAL_ERROR;
	}

	if (!NT_STATUS_IS_OK(c->status)) {
		c->state = COMPOSITE_STATE_ERROR;
	}

	/* notify the caller once we reached a final state */
	if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
		c->async.fn(c);
	}
}
124 static void wreplsrv_out_connect_handler_creq(struct composite_context *creq)
126 struct wreplsrv_out_connect_state *state = talloc_get_type(creq->async.private_data,
127 struct wreplsrv_out_connect_state);
128 wreplsrv_out_connect_handler(state);
129 return;
132 static void wreplsrv_out_connect_handler_req(struct wrepl_request *req)
134 struct wreplsrv_out_connect_state *state = talloc_get_type(req->async.private_data,
135 struct wreplsrv_out_connect_state);
136 wreplsrv_out_connect_handler(state);
137 return;
/*
 * Start an outgoing connection to a replication partner.
 *
 * For PUSH/PULL partner types a cached connection on the partner is
 * reused when alive; a dead cached connection is dropped and replaced.
 * Returns a composite context, or NULL on immediate failure.
 */
static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner,
							   enum winsrepl_partner_type type,
							   struct wreplsrv_out_connection *wreplconn)
{
	struct composite_context *c = NULL;
	struct wreplsrv_service *service = partner->service;
	struct wreplsrv_out_connect_state *state = NULL;
	struct wreplsrv_out_connection **wreplconnp = &wreplconn;
	bool cached_connection = false;

	c = talloc_zero(partner, struct composite_context);
	if (!c) goto failed;

	state = talloc_zero(c, struct wreplsrv_out_connect_state);
	if (!state) goto failed;
	state->c	= c;
	state->type	= type;

	c->state	= COMPOSITE_STATE_IN_PROGRESS;
	c->event_ctx	= service->task->event_ctx;
	c->private_data	= state;

	/* for cacheable types, look at the partner's cached connection slot */
	if (type == WINSREPL_PARTNER_PUSH) {
		cached_connection	= true;
		wreplconn		= partner->push.wreplconn;
		wreplconnp		= &partner->push.wreplconn;
	} else if (type == WINSREPL_PARTNER_PULL) {
		cached_connection	= true;
		wreplconn		= partner->pull.wreplconn;
		wreplconnp		= &partner->pull.wreplconn;
	}

	/* we have a connection already, so use it */
	if (wreplconn) {
		if (!wreplconn->sock->dead) {
			/* alive: complete immediately with the cached connection */
			state->stage	= WREPLSRV_OUT_CONNECT_STAGE_DONE;
			state->wreplconn= wreplconn;
			composite_done(c);
			return c;
		} else if (!cached_connection) {
			/* caller-provided connection is dead and not ours to
			 * replace: report done with no connection */
			state->stage	= WREPLSRV_OUT_CONNECT_STAGE_DONE;
			state->wreplconn= NULL;
			composite_done(c);
			return c;
		} else {
			/* dead cached connection: drop it and reconnect */
			talloc_free(wreplconn);
			*wreplconnp = NULL;
		}
	}

	wreplconn = talloc_zero(state, struct wreplsrv_out_connection);
	if (!wreplconn) goto failed;

	wreplconn->service	= service;
	wreplconn->partner	= partner;
	wreplconn->sock		= wrepl_socket_init(wreplconn, service->task->event_ctx, lp_iconv_convenience(service->task->lp_ctx));
	if (!wreplconn->sock) goto failed;

	state->stage	= WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET;
	state->wreplconn= wreplconn;
	/* prefer the configured source address, else pick the best local IP */
	state->c_req	= wrepl_connect_send(wreplconn->sock,
					     partner->our_address?partner->our_address:wrepl_best_ip(service->task->lp_ctx, partner->address),
					     partner->address);
	if (!state->c_req) goto failed;

	state->c_req->async.fn			= wreplsrv_out_connect_handler_creq;
	state->c_req->async.private_data	= state;

	return c;
failed:
	talloc_free(c);
	return NULL;
}
214 static NTSTATUS wreplsrv_out_connect_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
215 struct wreplsrv_out_connection **wreplconn)
217 NTSTATUS status;
219 status = composite_wait(c);
221 if (NT_STATUS_IS_OK(status)) {
222 struct wreplsrv_out_connect_state *state = talloc_get_type(c->private_data,
223 struct wreplsrv_out_connect_state);
224 if (state->wreplconn) {
225 *wreplconn = talloc_reference(mem_ctx, state->wreplconn);
226 if (!*wreplconn) status = NT_STATUS_NO_MEMORY;
227 } else {
228 status = NT_STATUS_INVALID_CONNECTION;
232 talloc_free(c);
233 return status;
/* In/out parameters for a pull of the partner's owner table. */
struct wreplsrv_pull_table_io {
	struct {
		struct wreplsrv_partner *partner;
		uint32_t num_owners;			/* if != 0, use these owners instead of asking the peer */
		struct wrepl_wins_owner *owners;
	} in;
	struct {
		uint32_t num_owners;
		struct wrepl_wins_owner *owners;
	} out;
};

/* Stages of the pull-table operation. */
enum wreplsrv_pull_table_stage {
	WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION,
	WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY,
	WREPLSRV_PULL_TABLE_STAGE_DONE
};

/* Per-call state for wreplsrv_pull_table_send()/_recv(). */
struct wreplsrv_pull_table_state {
	enum wreplsrv_pull_table_stage stage;
	struct composite_context *c;		/* composite handed back to the caller */
	struct wrepl_pull_table table_io;	/* libcli pull-table in/out data */
	struct wreplsrv_pull_table_io *io;	/* caller's parameters */
	struct composite_context *creq;		/* in-flight connect composite */
	struct wreplsrv_out_connection *wreplconn;
	struct tevent_req *subreq;		/* in-flight pull-table tevent request */
};

static void wreplsrv_pull_table_handler_treq(struct tevent_req *subreq);
/*
 * The connection is up: ask the peer for its WINS owner table
 * via the tevent_req based wrepl_pull_table call.
 */
static NTSTATUS wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state *state)
{
	NTSTATUS status;

	status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
	NT_STATUS_NOT_OK_RETURN(status);

	state->table_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
	state->subreq = wrepl_pull_table_send(state,
					      state->wreplconn->service->task->event_ctx,
					      state->wreplconn->sock, &state->table_io);
	NT_STATUS_HAVE_NO_MEMORY(state->subreq);

	tevent_req_set_callback(state->subreq,
				wreplsrv_pull_table_handler_treq,
				state);

	state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY;

	return NT_STATUS_OK;
}
/* The owner table arrived: collect it and finish the operation. */
static NTSTATUS wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state *state)
{
	NTSTATUS status;

	status = wrepl_pull_table_recv(state->subreq, state, &state->table_io);
	TALLOC_FREE(state->subreq);
	NT_STATUS_NOT_OK_RETURN(status);

	state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;

	return NT_STATUS_OK;
}
/*
 * State machine step for the pull-table operation: dispatch on the stage,
 * record the result on the composite and notify the caller when final.
 */
static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state *state)
{
	struct composite_context *c = state->c;

	switch (state->stage) {
	case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION:
		c->status = wreplsrv_pull_table_wait_connection(state);
		break;
	case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY:
		c->status = wreplsrv_pull_table_wait_table_reply(state);
		c->state  = COMPOSITE_STATE_DONE;
		break;
	case WREPLSRV_PULL_TABLE_STAGE_DONE:
		/* a DONE stage should never re-enter the handler */
		c->status = NT_STATUS_INTERNAL_ERROR;
	}

	if (!NT_STATUS_IS_OK(c->status)) {
		c->state = COMPOSITE_STATE_ERROR;
	}

	if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
		c->async.fn(c);
	}
}
327 static void wreplsrv_pull_table_handler_creq(struct composite_context *creq)
329 struct wreplsrv_pull_table_state *state = talloc_get_type(creq->async.private_data,
330 struct wreplsrv_pull_table_state);
331 wreplsrv_pull_table_handler(state);
332 return;
335 static void wreplsrv_pull_table_handler_treq(struct tevent_req *subreq)
337 struct wreplsrv_pull_table_state *state = tevent_req_callback_data(subreq,
338 struct wreplsrv_pull_table_state);
339 wreplsrv_pull_table_handler(state);
340 return;
/*
 * Start fetching the partner's WINS owner table. If the caller already
 * supplied owners (io->in.num_owners != 0) no network round trip is done
 * and the operation completes immediately with those owners.
 */
static struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io)
{
	struct composite_context *c = NULL;
	struct wreplsrv_service *service = io->in.partner->service;
	struct wreplsrv_pull_table_state *state = NULL;

	c = talloc_zero(mem_ctx, struct composite_context);
	if (!c) goto failed;

	state = talloc_zero(c, struct wreplsrv_pull_table_state);
	if (!state) goto failed;
	state->c	= c;
	state->io	= io;

	c->state	= COMPOSITE_STATE_IN_PROGRESS;
	c->event_ctx	= service->task->event_ctx;
	c->private_data	= state;

	if (io->in.num_owners) {
		/* short-circuit: the caller already knows the owner table */
		state->table_io.out.num_partners	= io->in.num_owners;
		state->table_io.out.partners		= io->in.owners;
		state->stage				= WREPLSRV_PULL_TABLE_STAGE_DONE;
		composite_done(c);
		return c;
	}

	state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION;
	state->creq = wreplsrv_out_connect_send(io->in.partner, WINSREPL_PARTNER_PULL, NULL);
	if (!state->creq) goto failed;

	state->creq->async.fn		= wreplsrv_pull_table_handler_creq;
	state->creq->async.private_data	= state;

	return c;
failed:
	talloc_free(c);
	return NULL;
}
/*
 * Collect the result of wreplsrv_pull_table_send(): on success hand the
 * caller the owner table (referenced onto mem_ctx); frees the composite.
 */
static NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
					 struct wreplsrv_pull_table_io *io)
{
	NTSTATUS status;

	status = composite_wait(c);

	if (NT_STATUS_IS_OK(status)) {
		struct wreplsrv_pull_table_state *state = talloc_get_type(c->private_data,
							  struct wreplsrv_pull_table_state);
		io->out.num_owners	= state->table_io.out.num_partners;
		/* NOTE(review): talloc_reference() result is not checked here;
		 * partners may legitimately be NULL when num_partners is 0 */
		io->out.owners		= talloc_reference(mem_ctx, state->table_io.out.partners);
	}

	talloc_free(c);
	return status;
}
/* In/out parameters for pulling the records of one WINS owner. */
struct wreplsrv_pull_names_io {
	struct {
		struct wreplsrv_partner *partner;
		struct wreplsrv_out_connection *wreplconn; /* optional already-open connection */
		struct wrepl_wins_owner owner;		   /* owner (and version range) to fetch */
	} in;
	struct {
		uint32_t num_names;
		struct wrepl_name *names;
	} out;
};

/* Stages of the pull-names operation. */
enum wreplsrv_pull_names_stage {
	WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION,
	WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY,
	WREPLSRV_PULL_NAMES_STAGE_DONE
};

/* Per-call state for wreplsrv_pull_names_send()/_recv(). */
struct wreplsrv_pull_names_state {
	enum wreplsrv_pull_names_stage stage;
	struct composite_context *c;		/* composite handed back to the caller */
	struct wrepl_pull_names pull_io;	/* libcli pull-names in/out data */
	struct wreplsrv_pull_names_io *io;	/* caller's parameters */
	struct composite_context *creq;		/* in-flight connect composite */
	struct wreplsrv_out_connection *wreplconn;
	struct tevent_req *subreq;		/* in-flight pull-names tevent request */
};

static void wreplsrv_pull_names_handler_treq(struct tevent_req *subreq);
/*
 * The connection is up: request the records of the wanted owner
 * via the tevent_req based wrepl_pull_names call.
 */
static NTSTATUS wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state *state)
{
	NTSTATUS status;

	status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
	NT_STATUS_NOT_OK_RETURN(status);

	state->pull_io.in.assoc_ctx	= state->wreplconn->assoc_ctx.peer_ctx;
	state->pull_io.in.partner	= state->io->in.owner;
	state->subreq = wrepl_pull_names_send(state,
					      state->wreplconn->service->task->event_ctx,
					      state->wreplconn->sock,
					      &state->pull_io);
	NT_STATUS_HAVE_NO_MEMORY(state->subreq);

	tevent_req_set_callback(state->subreq,
				wreplsrv_pull_names_handler_treq,
				state);

	state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY;

	return NT_STATUS_OK;
}
/* The name records arrived: collect them and finish the operation. */
static NTSTATUS wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state *state)
{
	NTSTATUS status;

	status = wrepl_pull_names_recv(state->subreq, state, &state->pull_io);
	TALLOC_FREE(state->subreq);
	NT_STATUS_NOT_OK_RETURN(status);

	state->stage = WREPLSRV_PULL_NAMES_STAGE_DONE;

	return NT_STATUS_OK;
}
/*
 * State machine step for the pull-names operation: dispatch on the stage,
 * record the result on the composite and notify the caller when final.
 */
static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state *state)
{
	struct composite_context *c = state->c;

	switch (state->stage) {
	case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION:
		c->status = wreplsrv_pull_names_wait_connection(state);
		break;
	case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY:
		c->status = wreplsrv_pull_names_wait_send_reply(state);
		c->state  = COMPOSITE_STATE_DONE;
		break;
	case WREPLSRV_PULL_NAMES_STAGE_DONE:
		/* a DONE stage should never re-enter the handler */
		c->status = NT_STATUS_INTERNAL_ERROR;
	}

	if (!NT_STATUS_IS_OK(c->status)) {
		c->state = COMPOSITE_STATE_ERROR;
	}

	if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
		c->async.fn(c);
	}
}
492 static void wreplsrv_pull_names_handler_creq(struct composite_context *creq)
494 struct wreplsrv_pull_names_state *state = talloc_get_type(creq->async.private_data,
495 struct wreplsrv_pull_names_state);
496 wreplsrv_pull_names_handler(state);
497 return;
500 static void wreplsrv_pull_names_handler_treq(struct tevent_req *subreq)
502 struct wreplsrv_pull_names_state *state = tevent_req_callback_data(subreq,
503 struct wreplsrv_pull_names_state);
504 wreplsrv_pull_names_handler(state);
505 return;
/*
 * Start pulling the records of one owner from a partner. If the caller
 * passes an existing connection it is used as-is (partner type NONE, so
 * it is not cached); otherwise a pull connection is set up.
 */
static struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io)
{
	struct composite_context *c = NULL;
	struct wreplsrv_service *service = io->in.partner->service;
	struct wreplsrv_pull_names_state *state = NULL;
	enum winsrepl_partner_type partner_type = WINSREPL_PARTNER_PULL;

	/* an explicit connection must not be cached on the partner */
	if (io->in.wreplconn) partner_type = WINSREPL_PARTNER_NONE;

	c = talloc_zero(mem_ctx, struct composite_context);
	if (!c) goto failed;

	state = talloc_zero(c, struct wreplsrv_pull_names_state);
	if (!state) goto failed;
	state->c	= c;
	state->io	= io;

	c->state	= COMPOSITE_STATE_IN_PROGRESS;
	c->event_ctx	= service->task->event_ctx;
	c->private_data	= state;

	state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION;
	state->creq = wreplsrv_out_connect_send(io->in.partner, partner_type, io->in.wreplconn);
	if (!state->creq) goto failed;

	state->creq->async.fn		= wreplsrv_pull_names_handler_creq;
	state->creq->async.private_data	= state;

	return c;
failed:
	talloc_free(c);
	return NULL;
}
/*
 * Collect the result of wreplsrv_pull_names_send(): on success hand the
 * caller the name records (referenced onto mem_ctx); frees the composite.
 */
static NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
					 struct wreplsrv_pull_names_io *io)
{
	NTSTATUS status;

	status = composite_wait(c);

	if (NT_STATUS_IS_OK(status)) {
		struct wreplsrv_pull_names_state *state = talloc_get_type(c->private_data,
							  struct wreplsrv_pull_names_state);
		io->out.num_names	= state->pull_io.out.num_names;
		/* NOTE(review): talloc_reference() result is not checked;
		 * names may legitimately be NULL when num_names is 0 */
		io->out.names		= talloc_reference(mem_ctx, state->pull_io.out.names);
	}

	talloc_free(c);
	return status;
}
/* Stages of a full pull replication cycle. */
enum wreplsrv_pull_cycle_stage {
	WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY,	/* fetching the owner table */
	WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES,	/* fetching records per owner */
	WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC,	/* closing the association */
	WREPLSRV_PULL_CYCLE_STAGE_DONE
};

/* Per-call state for wreplsrv_pull_cycle_send()/_recv(). */
struct wreplsrv_pull_cycle_state {
	enum wreplsrv_pull_cycle_stage stage;
	struct composite_context *c;		/* composite handed back to the caller */
	struct wreplsrv_pull_cycle_io *io;	/* caller's parameters */
	struct wreplsrv_pull_table_io table_io;	/* owner-table subcall data */
	uint32_t current;			/* index of the owner currently processed */
	struct wreplsrv_pull_names_io names_io;	/* per-owner names subcall data */
	struct composite_context *creq;		/* in-flight subcall composite */
	struct wrepl_associate_stop assoc_stop_io; /* associate-stop subcall data */
	struct wrepl_request *req;		/* in-flight associate-stop request */
};

static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq);
static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req);
/*
 * Walk the fetched owner table starting at state->current and kick off a
 * pull-names call for the next owner whose records we need.
 *
 * Returns STATUS_MORE_ENTRIES when a pull was started (re-enter later),
 * NT_STATUS_OK when no more owners need pulling.
 */
static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state *state)
{
	struct wreplsrv_owner *current_owner=NULL;
	struct wreplsrv_owner *local_owner;
	uint32_t i;
	uint64_t old_max_version = 0;
	bool do_pull = false;

	for (i=state->current; i < state->table_io.out.num_owners; i++) {
		current_owner = wreplsrv_find_owner(state->io->in.partner->service,
						    state->io->in.partner->pull.table,
						    state->table_io.out.owners[i].address);

		local_owner = wreplsrv_find_owner(state->io->in.partner->service,
						  state->io->in.partner->service->table,
						  state->table_io.out.owners[i].address);
		/*
		 * this means we are ourself the current owner,
		 * and we don't want to replicate ourself
		 */
		if (!current_owner) continue;

		/*
		 * this means we don't have any records of this owner
		 * so fetch them
		 */
		if (!local_owner) {
			do_pull		= true;

			break;
		}

		/*
		 * this means the remote partner has some new records of this owner
		 * fetch them
		 */
		if (current_owner->owner.max_version > local_owner->owner.max_version) {
			do_pull		= true;
			old_max_version	= local_owner->owner.max_version;
			break;
		}
	}
	/* remember where to resume on the next call */
	state->current = i;

	if (do_pull) {
		state->names_io.in.partner		= state->io->in.partner;
		state->names_io.in.wreplconn		= state->io->in.wreplconn;
		state->names_io.in.owner		= current_owner->owner;
		/* only fetch versions newer than what we already have */
		state->names_io.in.owner.min_version	= old_max_version + 1;
		state->creq = wreplsrv_pull_names_send(state, &state->names_io);
		NT_STATUS_HAVE_NO_MEMORY(state->creq);

		state->creq->async.fn		= wreplsrv_pull_cycle_handler_creq;
		state->creq->async.private_data	= state;

		return STATUS_MORE_ENTRIES;
	}

	return NT_STATUS_OK;
}
/*
 * Advance to the next owner and update the cycle stage. When everything
 * is pulled and we own the connection's association, send an
 * associate-stop to shut it down cleanly.
 */
static NTSTATUS wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state *state)
{
	NTSTATUS status;

	status = wreplsrv_pull_cycle_next_owner_do_work(state);
	if (NT_STATUS_IS_OK(status)) {
		state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
	} else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES, status)) {
		/* a pull-names call is in flight; not an error */
		state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES;
		status = NT_STATUS_OK;
	}

	if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE && state->io->in.wreplconn) {
		state->assoc_stop_io.in.assoc_ctx	= state->io->in.wreplconn->assoc_ctx.peer_ctx;
		state->assoc_stop_io.in.reason		= 0;
		state->req = wrepl_associate_stop_send(state->io->in.wreplconn->sock, &state->assoc_stop_io);
		NT_STATUS_HAVE_NO_MEMORY(state->req);

		state->req->async.fn		= wreplsrv_pull_cycle_handler_req;
		state->req->async.private_data	= state;

		state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC;
	}

	return status;
}
/*
 * The owner table arrived: merge it into the partner's pull table and
 * start pulling records for the first owner that needs it.
 */
static NTSTATUS wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state *state)
{
	NTSTATUS status;
	uint32_t i;

	status = wreplsrv_pull_table_recv(state->creq, state, &state->table_io);
	NT_STATUS_NOT_OK_RETURN(status);

	/* update partner table */
	for (i=0; i < state->table_io.out.num_owners; i++) {
		status = wreplsrv_add_table(state->io->in.partner->service,
					    state->io->in.partner,
					    &state->io->in.partner->pull.table,
					    state->table_io.out.owners[i].address,
					    state->table_io.out.owners[i].max_version);
		NT_STATUS_NOT_OK_RETURN(status);
	}

	status = wreplsrv_pull_cycle_next_owner_wrapper(state);
	NT_STATUS_NOT_OK_RETURN(status);

	return status;
}
/*
 * Apply the pulled records of the current owner to the local database,
 * then release the records and reset the subcall data for the next owner.
 */
static NTSTATUS wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state *state)
{
	NTSTATUS status;

	status = wreplsrv_apply_records(state->io->in.partner,
					&state->names_io.in.owner,
					state->names_io.out.num_names,
					state->names_io.out.names);
	NT_STATUS_NOT_OK_RETURN(status);

	talloc_free(state->names_io.out.names);
	ZERO_STRUCT(state->names_io);

	return NT_STATUS_OK;
}
/*
 * Records for one owner arrived: apply them locally and move on to the
 * next owner (or finish the cycle).
 */
static NTSTATUS wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state *state)
{
	NTSTATUS status;

	status = wreplsrv_pull_names_recv(state->creq, state, &state->names_io);
	NT_STATUS_NOT_OK_RETURN(status);

	/*
	 * TODO: this should maybe be an async call,
	 * because we may need some network access
	 * for conflict resolving
	 */
	status = wreplsrv_pull_cycle_apply_records(state);
	NT_STATUS_NOT_OK_RETURN(status);

	status = wreplsrv_pull_cycle_next_owner_wrapper(state);
	NT_STATUS_NOT_OK_RETURN(status);

	return status;
}
/* The associate-stop reply arrived: the cycle is complete. */
static NTSTATUS wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state *state)
{
	NTSTATUS status;

	status = wrepl_associate_stop_recv(state->req, &state->assoc_stop_io);
	NT_STATUS_NOT_OK_RETURN(status);

	state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;

	return status;
}
/*
 * State machine step for the pull cycle: dispatch on the stage, record
 * the result on the composite and notify the caller when final.
 */
static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state *state)
{
	struct composite_context *c = state->c;

	switch (state->stage) {
	case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY:
		c->status = wreplsrv_pull_cycle_wait_table_reply(state);
		break;
	case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES:
		c->status = wreplsrv_pull_cycle_wait_send_replies(state);
		break;
	case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC:
		c->status = wreplsrv_pull_cycle_wait_stop_assoc(state);
		break;
	case WREPLSRV_PULL_CYCLE_STAGE_DONE:
		/* a DONE stage should never re-enter the handler */
		c->status = NT_STATUS_INTERNAL_ERROR;
	}

	/* unlike the other state machines, several stages may end the cycle */
	if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE) {
		c->state = COMPOSITE_STATE_DONE;
	}

	if (!NT_STATUS_IS_OK(c->status)) {
		c->state = COMPOSITE_STATE_ERROR;
	}

	if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
		c->async.fn(c);
	}
}
775 static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq)
777 struct wreplsrv_pull_cycle_state *state = talloc_get_type(creq->async.private_data,
778 struct wreplsrv_pull_cycle_state);
779 wreplsrv_pull_cycle_handler(state);
780 return;
783 static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req)
785 struct wreplsrv_pull_cycle_state *state = talloc_get_type(req->async.private_data,
786 struct wreplsrv_pull_cycle_state);
787 wreplsrv_pull_cycle_handler(state);
788 return;
/*
 * Start a full pull replication cycle against a partner: fetch the owner
 * table, then pull and apply records per owner, then stop the association.
 */
struct composite_context *wreplsrv_pull_cycle_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_cycle_io *io)
{
	struct composite_context *c = NULL;
	struct wreplsrv_service *service = io->in.partner->service;
	struct wreplsrv_pull_cycle_state *state = NULL;

	c = talloc_zero(mem_ctx, struct composite_context);
	if (!c) goto failed;

	state = talloc_zero(c, struct wreplsrv_pull_cycle_state);
	if (!state) goto failed;
	state->c	= c;
	state->io	= io;

	c->state	= COMPOSITE_STATE_IN_PROGRESS;
	c->event_ctx	= service->task->event_ctx;
	c->private_data	= state;

	state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY;
	/* pass through any pre-supplied owner table */
	state->table_io.in.partner	= io->in.partner;
	state->table_io.in.num_owners	= io->in.num_owners;
	state->table_io.in.owners	= io->in.owners;
	state->creq = wreplsrv_pull_table_send(state, &state->table_io);
	if (!state->creq) goto failed;

	state->creq->async.fn		= wreplsrv_pull_cycle_handler_creq;
	state->creq->async.private_data	= state;

	return c;
failed:
	talloc_free(c);
	return NULL;
}
825 NTSTATUS wreplsrv_pull_cycle_recv(struct composite_context *c)
827 NTSTATUS status;
829 status = composite_wait(c);
831 talloc_free(c);
832 return status;
/* Stages of a push notification to a partner. */
enum wreplsrv_push_notify_stage {
	WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT,	/* waiting for the connection */
	WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM,		/* waiting for the inform send */
	WREPLSRV_PUSH_NOTIFY_STAGE_DONE
};

/* Per-call state for wreplsrv_push_notify_send()/_recv(). */
struct wreplsrv_push_notify_state {
	enum wreplsrv_push_notify_stage stage;
	struct composite_context *c;		/* composite handed back to the caller */
	struct wreplsrv_push_notify_io *io;	/* caller's parameters */
	enum wrepl_replication_cmd command;	/* INFORM/INFORM2 or UPDATE/UPDATE2 */
	bool full_table;			/* send the full owner table or only ourself */
	struct wrepl_send_ctrl ctrl;		/* send-only control for inform */
	struct wrepl_request *req;		/* in-flight wrepl request */
	struct wrepl_packet req_packet;		/* the outgoing packet */
	struct wrepl_packet *rep_packet;	/* reply packet (if any) */
	struct composite_context *creq;		/* in-flight connect composite */
	struct wreplsrv_out_connection *wreplconn;
};

static void wreplsrv_push_notify_handler_creq(struct composite_context *creq);
static void wreplsrv_push_notify_handler_req(struct wrepl_request *req);
/*
 * Send a WREPL_REPL_UPDATE* message and then convert our client
 * connection into a server connection: after an update message the peer
 * acts as the client and will pull records from us.
 */
static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *state)
{
	struct wreplsrv_service *service = state->io->in.partner->service;
	struct wrepl_packet *req = &state->req_packet;
	struct wrepl_replication *repl_out = &state->req_packet.message.replication;
	struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
	struct wreplsrv_in_connection *wrepl_in;
	NTSTATUS status;
	struct socket_context *sock;

	/* prepare the outgoing request */
	req->opcode	= WREPL_OPCODE_BITS;
	req->assoc_ctx	= state->wreplconn->assoc_ctx.peer_ctx;
	req->mess_type	= WREPL_REPLICATION;

	repl_out->command = state->command;

	status = wreplsrv_fill_wrepl_table(service, state, table_out,
					   service->wins_db->local_owner, state->full_table);
	NT_STATUS_NOT_OK_RETURN(status);

	/* queue the request */
	state->req = wrepl_request_send(state->wreplconn->sock, req, NULL);
	NT_STATUS_HAVE_NO_MEMORY(state->req);

	/*
	 * now we need to convert the wrepl_socket (client connection)
	 * into a wreplsrv_in_connection (server connection), because
	 * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
	 * message is received by the peer.
	 */

	/* steal the socket_context */
	sock = state->wreplconn->sock->sock;
	state->wreplconn->sock->sock = NULL;
	talloc_steal(state, sock);

	/*
	 * TODO: steal the tstream if we switch the client to tsocket.
	 * This is just to get a compiler error as soon as we remove
	 * packet_context.
	 */
	state->wreplconn->sock->packet = NULL;

	/*
	 * free the wrepl_socket (client connection)
	 */
	talloc_free(state->wreplconn->sock);
	state->wreplconn->sock = NULL;

	/*
	 * now create a wreplsrv_in_connection,
	 * on which we act as server
	 *
	 * NOTE: sock and packet will be stolen by
	 *       wreplsrv_in_connection_merge()
	 */
	status = wreplsrv_in_connection_merge(state->io->in.partner,
					      sock, &wrepl_in);
	NT_STATUS_NOT_OK_RETURN(status);

	wrepl_in->assoc_ctx.peer_ctx	= state->wreplconn->assoc_ctx.peer_ctx;
	wrepl_in->assoc_ctx.our_ctx	= 0;

	/* now we can free the wreplsrv_out_connection */
	TALLOC_FREE(state->wreplconn);

	state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;

	return NT_STATUS_OK;
}
/*
 * Send a WREPL_REPL_INFORM* message (send-only: the peer does not reply
 * to inform; it will open its own connection to pull from us).
 */
static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *state)
{
	struct wreplsrv_service *service = state->io->in.partner->service;
	struct wrepl_packet *req = &state->req_packet;
	struct wrepl_replication *repl_out = &state->req_packet.message.replication;
	struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
	NTSTATUS status;

	req->opcode	= WREPL_OPCODE_BITS;
	req->assoc_ctx	= state->wreplconn->assoc_ctx.peer_ctx;
	req->mess_type	= WREPL_REPLICATION;

	repl_out->command = state->command;

	status = wreplsrv_fill_wrepl_table(service, state, table_out,
					   service->wins_db->local_owner, state->full_table);
	NT_STATUS_NOT_OK_RETURN(status);

	/* we won't get a reply to an inform message */
	state->ctrl.send_only = true;

	state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl);
	NT_STATUS_HAVE_NO_MEMORY(state->req);

	state->req->async.fn		= wreplsrv_push_notify_handler_req;
	state->req->async.private_data	= state;

	state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM;

	return NT_STATUS_OK;
}
/*
 * The connection is up: pick the concrete notify message. Old peers
 * (major version < 5) don't understand inform, so fall back to update.
 */
static NTSTATUS wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state *state)
{
	NTSTATUS status;

	status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
	NT_STATUS_NOT_OK_RETURN(status);

	/* if the peer doesn't support inform, fall back to update */
	switch (state->command) {
	case WREPL_REPL_INFORM:
		if (state->wreplconn->assoc_ctx.peer_major < 5) {
			state->command = WREPL_REPL_UPDATE;
		}
		break;
	case WREPL_REPL_INFORM2:
		if (state->wreplconn->assoc_ctx.peer_major < 5) {
			state->command = WREPL_REPL_UPDATE2;
		}
		break;
	default:
		break;
	}

	switch (state->command) {
	case WREPL_REPL_UPDATE:
		state->full_table = true;
		return wreplsrv_push_notify_update(state);
	case WREPL_REPL_UPDATE2:
		state->full_table = false;
		return wreplsrv_push_notify_update(state);
	case WREPL_REPL_INFORM:
		state->full_table = true;
		return wreplsrv_push_notify_inform(state);
	case WREPL_REPL_INFORM2:
		state->full_table = false;
		return wreplsrv_push_notify_inform(state);
	default:
		return NT_STATUS_INTERNAL_ERROR;
	}

	return NT_STATUS_INTERNAL_ERROR;
}
/* The inform message has been sent (send-only): finish the operation. */
static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state *state)
{
	NTSTATUS status;

	status = wrepl_request_recv(state->req, state, NULL);
	NT_STATUS_NOT_OK_RETURN(status);

	state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
	return status;
}
/*
 * State machine step for the push notify: dispatch on the stage, record
 * the result on the composite and notify the caller when final.
 */
static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state *state)
{
	struct composite_context *c = state->c;

	switch (state->stage) {
	case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT:
		c->status = wreplsrv_push_notify_wait_connect(state);
		break;
	case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM:
		c->status = wreplsrv_push_notify_wait_inform(state);
		break;
	case WREPLSRV_PUSH_NOTIFY_STAGE_DONE:
		/* a DONE stage should never re-enter the handler */
		c->status = NT_STATUS_INTERNAL_ERROR;
	}

	/* the update path finishes synchronously inside WAIT_CONNECT */
	if (state->stage == WREPLSRV_PUSH_NOTIFY_STAGE_DONE) {
		c->state = COMPOSITE_STATE_DONE;
	}

	if (!NT_STATUS_IS_OK(c->status)) {
		c->state = COMPOSITE_STATE_ERROR;
	}

	if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
		c->async.fn(c);
	}
}
1044 static void wreplsrv_push_notify_handler_creq(struct composite_context *creq)
1046 struct wreplsrv_push_notify_state *state = talloc_get_type(creq->async.private_data,
1047 struct wreplsrv_push_notify_state);
1048 wreplsrv_push_notify_handler(state);
1049 return;
1052 static void wreplsrv_push_notify_handler_req(struct wrepl_request *req)
1054 struct wreplsrv_push_notify_state *state = talloc_get_type(req->async.private_data,
1055 struct wreplsrv_push_notify_state);
1056 wreplsrv_push_notify_handler(state);
1057 return;
/*
 * Start a push notification to a partner. Inform notifications may reuse
 * a cached push connection; update notifications always use a fresh,
 * uncached connection (it is converted to a server connection afterwards).
 */
struct composite_context *wreplsrv_push_notify_send(TALLOC_CTX *mem_ctx, struct wreplsrv_push_notify_io *io)
{
	struct composite_context *c = NULL;
	struct wreplsrv_service *service = io->in.partner->service;
	struct wreplsrv_push_notify_state *state = NULL;
	enum winsrepl_partner_type partner_type;

	c = talloc_zero(mem_ctx, struct composite_context);
	if (!c) goto failed;

	state = talloc_zero(c, struct wreplsrv_push_notify_state);
	if (!state) goto failed;
	state->c	= c;
	state->io	= io;

	if (io->in.inform) {
		/* we can cache the connection in partner->push->wreplconn */
		partner_type = WINSREPL_PARTNER_PUSH;
		if (io->in.propagate) {
			state->command	= WREPL_REPL_INFORM2;
		} else {
			state->command	= WREPL_REPL_INFORM;
		}
	} else {
		/* we can NOT cache the connection */
		partner_type = WINSREPL_PARTNER_NONE;
		if (io->in.propagate) {
			state->command	= WREPL_REPL_UPDATE2;
		} else {
			state->command	= WREPL_REPL_UPDATE;
		}
	}

	c->state	= COMPOSITE_STATE_IN_PROGRESS;
	c->event_ctx	= service->task->event_ctx;
	c->private_data	= state;

	state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT;
	state->creq = wreplsrv_out_connect_send(io->in.partner, partner_type, NULL);
	if (!state->creq) goto failed;

	state->creq->async.fn		= wreplsrv_push_notify_handler_creq;
	state->creq->async.private_data	= state;

	return c;
failed:
	talloc_free(c);
	return NULL;
}
1110 NTSTATUS wreplsrv_push_notify_recv(struct composite_context *c)
1112 NTSTATUS status;
1114 status = composite_wait(c);
1116 talloc_free(c);
1117 return status;