/*
   Unix SMB/CIFS implementation.

   WINS Replication server

   Copyright (C) Stefan Metzmacher	2005

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
23 #include "lib/events/events.h"
24 #include "lib/socket/socket.h"
25 #include "smbd/service_task.h"
26 #include "smbd/service_stream.h"
27 #include "librpc/gen_ndr/winsrepl.h"
28 #include "wrepl_server/wrepl_server.h"
29 #include "nbt_server/wins/winsdb.h"
30 #include "libcli/composite/composite.h"
31 #include "libcli/wrepl/winsrepl.h"
32 #include "libcli/resolve/resolve.h"
33 #include "param/param.h"
/* Steps of the outgoing-connect state machine. */
enum wreplsrv_out_connect_stage {
	WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET,
	WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX,
	WREPLSRV_OUT_CONNECT_STAGE_DONE
};
41 struct wreplsrv_out_connect_state
{
42 enum wreplsrv_out_connect_stage stage
;
43 struct composite_context
*c
;
44 struct wrepl_associate assoc_io
;
45 enum winsrepl_partner_type type
;
46 struct wreplsrv_out_connection
*wreplconn
;
47 struct tevent_req
*subreq
;
50 static void wreplsrv_out_connect_handler_treq(struct tevent_req
*subreq
);
52 static NTSTATUS
wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state
*state
)
56 status
= wrepl_connect_recv(state
->subreq
);
57 TALLOC_FREE(state
->subreq
);
58 NT_STATUS_NOT_OK_RETURN(status
);
60 state
->subreq
= wrepl_associate_send(state
,
61 state
->wreplconn
->service
->task
->event_ctx
,
62 state
->wreplconn
->sock
, &state
->assoc_io
);
63 NT_STATUS_HAVE_NO_MEMORY(state
->subreq
);
65 tevent_req_set_callback(state
->subreq
,
66 wreplsrv_out_connect_handler_treq
,
69 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX
;
74 static NTSTATUS
wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state
*state
)
78 status
= wrepl_associate_recv(state
->subreq
, &state
->assoc_io
);
79 TALLOC_FREE(state
->subreq
);
80 NT_STATUS_NOT_OK_RETURN(status
);
82 state
->wreplconn
->assoc_ctx
.peer_ctx
= state
->assoc_io
.out
.assoc_ctx
;
83 state
->wreplconn
->assoc_ctx
.peer_major
= state
->assoc_io
.out
.major_version
;
85 if (state
->type
== WINSREPL_PARTNER_PUSH
) {
86 if (state
->wreplconn
->assoc_ctx
.peer_major
>= 5) {
87 state
->wreplconn
->partner
->push
.wreplconn
= state
->wreplconn
;
88 talloc_steal(state
->wreplconn
->partner
, state
->wreplconn
);
90 state
->type
= WINSREPL_PARTNER_NONE
;
92 } else if (state
->type
== WINSREPL_PARTNER_PULL
) {
93 state
->wreplconn
->partner
->pull
.wreplconn
= state
->wreplconn
;
94 talloc_steal(state
->wreplconn
->partner
, state
->wreplconn
);
97 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_DONE
;
102 static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state
*state
)
104 struct composite_context
*c
= state
->c
;
106 switch (state
->stage
) {
107 case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET
:
108 c
->status
= wreplsrv_out_connect_wait_socket(state
);
110 case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX
:
111 c
->status
= wreplsrv_out_connect_wait_assoc_ctx(state
);
112 c
->state
= COMPOSITE_STATE_DONE
;
114 case WREPLSRV_OUT_CONNECT_STAGE_DONE
:
115 c
->status
= NT_STATUS_INTERNAL_ERROR
;
118 if (!NT_STATUS_IS_OK(c
->status
)) {
119 c
->state
= COMPOSITE_STATE_ERROR
;
122 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
127 static void wreplsrv_out_connect_handler_treq(struct tevent_req
*subreq
)
129 struct wreplsrv_out_connect_state
*state
= tevent_req_callback_data(subreq
,
130 struct wreplsrv_out_connect_state
);
131 wreplsrv_out_connect_handler(state
);
135 static struct composite_context
*wreplsrv_out_connect_send(struct wreplsrv_partner
*partner
,
136 enum winsrepl_partner_type type
,
137 struct wreplsrv_out_connection
*wreplconn
)
139 struct composite_context
*c
= NULL
;
140 struct wreplsrv_service
*service
= partner
->service
;
141 struct wreplsrv_out_connect_state
*state
= NULL
;
142 struct wreplsrv_out_connection
**wreplconnp
= &wreplconn
;
143 bool cached_connection
= false;
145 c
= talloc_zero(partner
, struct composite_context
);
148 state
= talloc_zero(c
, struct wreplsrv_out_connect_state
);
149 if (!state
) goto failed
;
153 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
154 c
->event_ctx
= service
->task
->event_ctx
;
155 c
->private_data
= state
;
157 if (type
== WINSREPL_PARTNER_PUSH
) {
158 cached_connection
= true;
159 wreplconn
= partner
->push
.wreplconn
;
160 wreplconnp
= &partner
->push
.wreplconn
;
161 } else if (type
== WINSREPL_PARTNER_PULL
) {
162 cached_connection
= true;
163 wreplconn
= partner
->pull
.wreplconn
;
164 wreplconnp
= &partner
->pull
.wreplconn
;
167 /* we have a connection already, so use it */
169 if (wrepl_socket_is_connected(wreplconn
->sock
)) {
170 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_DONE
;
171 state
->wreplconn
= wreplconn
;
174 } else if (!cached_connection
) {
175 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_DONE
;
176 state
->wreplconn
= NULL
;
180 talloc_free(wreplconn
);
185 wreplconn
= talloc_zero(state
, struct wreplsrv_out_connection
);
186 if (!wreplconn
) goto failed
;
188 wreplconn
->service
= service
;
189 wreplconn
->partner
= partner
;
190 wreplconn
->sock
= wrepl_socket_init(wreplconn
, service
->task
->event_ctx
);
191 if (!wreplconn
->sock
) goto failed
;
193 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET
;
194 state
->wreplconn
= wreplconn
;
195 state
->subreq
= wrepl_connect_send(state
,
196 service
->task
->event_ctx
,
198 partner
->our_address
?partner
->our_address
:wrepl_best_ip(service
->task
->lp_ctx
, partner
->address
),
200 if (!state
->subreq
) goto failed
;
202 tevent_req_set_callback(state
->subreq
,
203 wreplsrv_out_connect_handler_treq
,
212 static NTSTATUS
wreplsrv_out_connect_recv(struct composite_context
*c
, TALLOC_CTX
*mem_ctx
,
213 struct wreplsrv_out_connection
**wreplconn
)
217 status
= composite_wait(c
);
219 if (NT_STATUS_IS_OK(status
)) {
220 struct wreplsrv_out_connect_state
*state
= talloc_get_type(c
->private_data
,
221 struct wreplsrv_out_connect_state
);
222 if (state
->wreplconn
) {
223 *wreplconn
= talloc_reference(mem_ctx
, state
->wreplconn
);
224 if (!*wreplconn
) status
= NT_STATUS_NO_MEMORY
;
226 status
= NT_STATUS_CONNECTION_DISCONNECTED
;
/* Input and output of a pull-table request. */
struct wreplsrv_pull_table_io {
	struct {
		/* partner to pull the owner table from */
		struct wreplsrv_partner *partner;
		/* if non-zero, 'owners' is used directly and no request is sent */
		uint32_t num_owners;
		struct wrepl_wins_owner *owners;
	} in;
	struct {
		uint32_t num_owners;
		struct wrepl_wins_owner *owners;
	} out;
};
/* Steps of the pull-table state machine. */
enum wreplsrv_pull_table_stage {
	WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION,
	WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY,
	WREPLSRV_PULL_TABLE_STAGE_DONE
};
253 struct wreplsrv_pull_table_state
{
254 enum wreplsrv_pull_table_stage stage
;
255 struct composite_context
*c
;
256 struct wrepl_pull_table table_io
;
257 struct wreplsrv_pull_table_io
*io
;
258 struct composite_context
*creq
;
259 struct wreplsrv_out_connection
*wreplconn
;
260 struct tevent_req
*subreq
;
263 static void wreplsrv_pull_table_handler_treq(struct tevent_req
*subreq
);
265 static NTSTATUS
wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state
*state
)
269 status
= wreplsrv_out_connect_recv(state
->creq
, state
, &state
->wreplconn
);
270 NT_STATUS_NOT_OK_RETURN(status
);
272 state
->table_io
.in
.assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
273 state
->subreq
= wrepl_pull_table_send(state
,
274 state
->wreplconn
->service
->task
->event_ctx
,
275 state
->wreplconn
->sock
, &state
->table_io
);
276 NT_STATUS_HAVE_NO_MEMORY(state
->subreq
);
278 tevent_req_set_callback(state
->subreq
,
279 wreplsrv_pull_table_handler_treq
,
282 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY
;
287 static NTSTATUS
wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state
*state
)
291 status
= wrepl_pull_table_recv(state
->subreq
, state
, &state
->table_io
);
292 TALLOC_FREE(state
->subreq
);
293 NT_STATUS_NOT_OK_RETURN(status
);
295 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_DONE
;
300 static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state
*state
)
302 struct composite_context
*c
= state
->c
;
304 switch (state
->stage
) {
305 case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION
:
306 c
->status
= wreplsrv_pull_table_wait_connection(state
);
308 case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY
:
309 c
->status
= wreplsrv_pull_table_wait_table_reply(state
);
310 c
->state
= COMPOSITE_STATE_DONE
;
312 case WREPLSRV_PULL_TABLE_STAGE_DONE
:
313 c
->status
= NT_STATUS_INTERNAL_ERROR
;
316 if (!NT_STATUS_IS_OK(c
->status
)) {
317 c
->state
= COMPOSITE_STATE_ERROR
;
320 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
325 static void wreplsrv_pull_table_handler_creq(struct composite_context
*creq
)
327 struct wreplsrv_pull_table_state
*state
= talloc_get_type(creq
->async
.private_data
,
328 struct wreplsrv_pull_table_state
);
329 wreplsrv_pull_table_handler(state
);
333 static void wreplsrv_pull_table_handler_treq(struct tevent_req
*subreq
)
335 struct wreplsrv_pull_table_state
*state
= tevent_req_callback_data(subreq
,
336 struct wreplsrv_pull_table_state
);
337 wreplsrv_pull_table_handler(state
);
341 static struct composite_context
*wreplsrv_pull_table_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_pull_table_io
*io
)
343 struct composite_context
*c
= NULL
;
344 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
345 struct wreplsrv_pull_table_state
*state
= NULL
;
347 c
= talloc_zero(mem_ctx
, struct composite_context
);
350 state
= talloc_zero(c
, struct wreplsrv_pull_table_state
);
351 if (!state
) goto failed
;
355 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
356 c
->event_ctx
= service
->task
->event_ctx
;
357 c
->private_data
= state
;
359 if (io
->in
.num_owners
) {
360 struct wrepl_wins_owner
*partners
;
363 partners
= talloc_array(state
,
364 struct wrepl_wins_owner
,
366 if (composite_nomem(partners
, c
)) goto failed
;
368 for (i
=0; i
< io
->in
.num_owners
; i
++) {
369 partners
[i
] = io
->in
.owners
[i
];
370 partners
[i
].address
= talloc_strdup(partners
,
371 io
->in
.owners
[i
].address
);
372 if (composite_nomem(partners
[i
].address
, c
)) goto failed
;
375 state
->table_io
.out
.num_partners
= io
->in
.num_owners
;
376 state
->table_io
.out
.partners
= partners
;
377 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_DONE
;
382 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION
;
383 state
->creq
= wreplsrv_out_connect_send(io
->in
.partner
, WINSREPL_PARTNER_PULL
, NULL
);
384 if (!state
->creq
) goto failed
;
386 state
->creq
->async
.fn
= wreplsrv_pull_table_handler_creq
;
387 state
->creq
->async
.private_data
= state
;
395 static NTSTATUS
wreplsrv_pull_table_recv(struct composite_context
*c
, TALLOC_CTX
*mem_ctx
,
396 struct wreplsrv_pull_table_io
*io
)
400 status
= composite_wait(c
);
402 if (NT_STATUS_IS_OK(status
)) {
403 struct wreplsrv_pull_table_state
*state
= talloc_get_type(c
->private_data
,
404 struct wreplsrv_pull_table_state
);
405 io
->out
.num_owners
= state
->table_io
.out
.num_partners
;
406 io
->out
.owners
= talloc_move(mem_ctx
, &state
->table_io
.out
.partners
);
413 struct wreplsrv_pull_names_io
{
415 struct wreplsrv_partner
*partner
;
416 struct wreplsrv_out_connection
*wreplconn
;
417 struct wrepl_wins_owner owner
;
421 struct wrepl_name
*names
;
/* Steps of the pull-names state machine. */
enum wreplsrv_pull_names_stage {
	WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION,
	WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY,
	WREPLSRV_PULL_NAMES_STAGE_DONE
};
431 struct wreplsrv_pull_names_state
{
432 enum wreplsrv_pull_names_stage stage
;
433 struct composite_context
*c
;
434 struct wrepl_pull_names pull_io
;
435 struct wreplsrv_pull_names_io
*io
;
436 struct composite_context
*creq
;
437 struct wreplsrv_out_connection
*wreplconn
;
438 struct tevent_req
*subreq
;
441 static void wreplsrv_pull_names_handler_treq(struct tevent_req
*subreq
);
443 static NTSTATUS
wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state
*state
)
447 status
= wreplsrv_out_connect_recv(state
->creq
, state
, &state
->wreplconn
);
448 NT_STATUS_NOT_OK_RETURN(status
);
450 state
->pull_io
.in
.assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
451 state
->pull_io
.in
.partner
= state
->io
->in
.owner
;
452 state
->subreq
= wrepl_pull_names_send(state
,
453 state
->wreplconn
->service
->task
->event_ctx
,
454 state
->wreplconn
->sock
,
456 NT_STATUS_HAVE_NO_MEMORY(state
->subreq
);
458 tevent_req_set_callback(state
->subreq
,
459 wreplsrv_pull_names_handler_treq
,
462 state
->stage
= WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY
;
467 static NTSTATUS
wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state
*state
)
471 status
= wrepl_pull_names_recv(state
->subreq
, state
, &state
->pull_io
);
472 TALLOC_FREE(state
->subreq
);
473 NT_STATUS_NOT_OK_RETURN(status
);
475 state
->stage
= WREPLSRV_PULL_NAMES_STAGE_DONE
;
480 static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state
*state
)
482 struct composite_context
*c
= state
->c
;
484 switch (state
->stage
) {
485 case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION
:
486 c
->status
= wreplsrv_pull_names_wait_connection(state
);
488 case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY
:
489 c
->status
= wreplsrv_pull_names_wait_send_reply(state
);
490 c
->state
= COMPOSITE_STATE_DONE
;
492 case WREPLSRV_PULL_NAMES_STAGE_DONE
:
493 c
->status
= NT_STATUS_INTERNAL_ERROR
;
496 if (!NT_STATUS_IS_OK(c
->status
)) {
497 c
->state
= COMPOSITE_STATE_ERROR
;
500 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
505 static void wreplsrv_pull_names_handler_creq(struct composite_context
*creq
)
507 struct wreplsrv_pull_names_state
*state
= talloc_get_type(creq
->async
.private_data
,
508 struct wreplsrv_pull_names_state
);
509 wreplsrv_pull_names_handler(state
);
513 static void wreplsrv_pull_names_handler_treq(struct tevent_req
*subreq
)
515 struct wreplsrv_pull_names_state
*state
= tevent_req_callback_data(subreq
,
516 struct wreplsrv_pull_names_state
);
517 wreplsrv_pull_names_handler(state
);
521 static struct composite_context
*wreplsrv_pull_names_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_pull_names_io
*io
)
523 struct composite_context
*c
= NULL
;
524 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
525 struct wreplsrv_pull_names_state
*state
= NULL
;
526 enum winsrepl_partner_type partner_type
= WINSREPL_PARTNER_PULL
;
528 if (io
->in
.wreplconn
) partner_type
= WINSREPL_PARTNER_NONE
;
530 c
= talloc_zero(mem_ctx
, struct composite_context
);
533 state
= talloc_zero(c
, struct wreplsrv_pull_names_state
);
534 if (!state
) goto failed
;
538 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
539 c
->event_ctx
= service
->task
->event_ctx
;
540 c
->private_data
= state
;
542 state
->stage
= WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION
;
543 state
->creq
= wreplsrv_out_connect_send(io
->in
.partner
, partner_type
, io
->in
.wreplconn
);
544 if (!state
->creq
) goto failed
;
546 state
->creq
->async
.fn
= wreplsrv_pull_names_handler_creq
;
547 state
->creq
->async
.private_data
= state
;
555 static NTSTATUS
wreplsrv_pull_names_recv(struct composite_context
*c
, TALLOC_CTX
*mem_ctx
,
556 struct wreplsrv_pull_names_io
*io
)
560 status
= composite_wait(c
);
562 if (NT_STATUS_IS_OK(status
)) {
563 struct wreplsrv_pull_names_state
*state
= talloc_get_type(c
->private_data
,
564 struct wreplsrv_pull_names_state
);
565 io
->out
.num_names
= state
->pull_io
.out
.num_names
;
566 io
->out
.names
= talloc_move(mem_ctx
, &state
->pull_io
.out
.names
);
/* Steps of the pull-cycle state machine. */
enum wreplsrv_pull_cycle_stage {
	WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY,
	WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES,
	WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC,
	WREPLSRV_PULL_CYCLE_STAGE_DONE
};
581 struct wreplsrv_pull_cycle_state
{
582 enum wreplsrv_pull_cycle_stage stage
;
583 struct composite_context
*c
;
584 struct wreplsrv_pull_cycle_io
*io
;
585 struct wreplsrv_pull_table_io table_io
;
587 struct wreplsrv_pull_names_io names_io
;
588 struct composite_context
*creq
;
589 struct wrepl_associate_stop assoc_stop_io
;
590 struct tevent_req
*subreq
;
593 static void wreplsrv_pull_cycle_handler_creq(struct composite_context
*creq
);
594 static void wreplsrv_pull_cycle_handler_treq(struct tevent_req
*subreq
);
596 static NTSTATUS
wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state
*state
)
598 struct wreplsrv_owner
*current_owner
=NULL
;
599 struct wreplsrv_owner
*local_owner
;
601 uint64_t old_max_version
= 0;
602 bool do_pull
= false;
604 for (i
=state
->current
; i
< state
->table_io
.out
.num_owners
; i
++) {
605 current_owner
= wreplsrv_find_owner(state
->io
->in
.partner
->service
,
606 state
->io
->in
.partner
->pull
.table
,
607 state
->table_io
.out
.owners
[i
].address
);
609 local_owner
= wreplsrv_find_owner(state
->io
->in
.partner
->service
,
610 state
->io
->in
.partner
->service
->table
,
611 state
->table_io
.out
.owners
[i
].address
);
613 * this means we are ourself the current owner,
614 * and we don't want replicate ourself
616 if (!current_owner
) continue;
619 * this means we don't have any records of this owner
629 * this means the remote partner has some new records of this owner
632 if (current_owner
->owner
.max_version
> local_owner
->owner
.max_version
) {
634 old_max_version
= local_owner
->owner
.max_version
;
641 state
->names_io
.in
.partner
= state
->io
->in
.partner
;
642 state
->names_io
.in
.wreplconn
= state
->io
->in
.wreplconn
;
643 state
->names_io
.in
.owner
= current_owner
->owner
;
644 state
->names_io
.in
.owner
.min_version
= old_max_version
+ 1;
645 state
->creq
= wreplsrv_pull_names_send(state
, &state
->names_io
);
646 NT_STATUS_HAVE_NO_MEMORY(state
->creq
);
648 state
->creq
->async
.fn
= wreplsrv_pull_cycle_handler_creq
;
649 state
->creq
->async
.private_data
= state
;
651 return STATUS_MORE_ENTRIES
;
657 static NTSTATUS
wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state
*state
)
661 status
= wreplsrv_pull_cycle_next_owner_do_work(state
);
662 if (NT_STATUS_IS_OK(status
)) {
663 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_DONE
;
664 } else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES
, status
)) {
665 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES
;
666 status
= NT_STATUS_OK
;
669 if (state
->stage
== WREPLSRV_PULL_CYCLE_STAGE_DONE
&& state
->io
->in
.wreplconn
) {
670 state
->assoc_stop_io
.in
.assoc_ctx
= state
->io
->in
.wreplconn
->assoc_ctx
.peer_ctx
;
671 state
->assoc_stop_io
.in
.reason
= 0;
672 state
->subreq
= wrepl_associate_stop_send(state
,
673 state
->io
->in
.wreplconn
->service
->task
->event_ctx
,
674 state
->io
->in
.wreplconn
->sock
,
675 &state
->assoc_stop_io
);
676 NT_STATUS_HAVE_NO_MEMORY(state
->subreq
);
678 tevent_req_set_callback(state
->subreq
,
679 wreplsrv_pull_cycle_handler_treq
,
682 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC
;
688 static NTSTATUS
wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state
*state
)
693 status
= wreplsrv_pull_table_recv(state
->creq
, state
, &state
->table_io
);
694 NT_STATUS_NOT_OK_RETURN(status
);
696 /* update partner table */
697 for (i
=0; i
< state
->table_io
.out
.num_owners
; i
++) {
698 status
= wreplsrv_add_table(state
->io
->in
.partner
->service
,
699 state
->io
->in
.partner
,
700 &state
->io
->in
.partner
->pull
.table
,
701 state
->table_io
.out
.owners
[i
].address
,
702 state
->table_io
.out
.owners
[i
].max_version
);
703 NT_STATUS_NOT_OK_RETURN(status
);
706 status
= wreplsrv_pull_cycle_next_owner_wrapper(state
);
707 NT_STATUS_NOT_OK_RETURN(status
);
712 static NTSTATUS
wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state
*state
)
716 status
= wreplsrv_apply_records(state
->io
->in
.partner
,
717 &state
->names_io
.in
.owner
,
718 state
->names_io
.out
.num_names
,
719 state
->names_io
.out
.names
);
720 NT_STATUS_NOT_OK_RETURN(status
);
722 talloc_free(state
->names_io
.out
.names
);
723 ZERO_STRUCT(state
->names_io
);
728 static NTSTATUS
wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state
*state
)
732 status
= wreplsrv_pull_names_recv(state
->creq
, state
, &state
->names_io
);
733 NT_STATUS_NOT_OK_RETURN(status
);
736 * TODO: this should maybe an async call,
737 * because we may need some network access
738 * for conflict resolving
740 status
= wreplsrv_pull_cycle_apply_records(state
);
741 NT_STATUS_NOT_OK_RETURN(status
);
743 status
= wreplsrv_pull_cycle_next_owner_wrapper(state
);
744 NT_STATUS_NOT_OK_RETURN(status
);
749 static NTSTATUS
wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state
*state
)
753 status
= wrepl_associate_stop_recv(state
->subreq
, &state
->assoc_stop_io
);
754 TALLOC_FREE(state
->subreq
);
755 NT_STATUS_NOT_OK_RETURN(status
);
757 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_DONE
;
762 static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state
*state
)
764 struct composite_context
*c
= state
->c
;
766 switch (state
->stage
) {
767 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY
:
768 c
->status
= wreplsrv_pull_cycle_wait_table_reply(state
);
770 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES
:
771 c
->status
= wreplsrv_pull_cycle_wait_send_replies(state
);
773 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC
:
774 c
->status
= wreplsrv_pull_cycle_wait_stop_assoc(state
);
776 case WREPLSRV_PULL_CYCLE_STAGE_DONE
:
777 c
->status
= NT_STATUS_INTERNAL_ERROR
;
780 if (state
->stage
== WREPLSRV_PULL_CYCLE_STAGE_DONE
) {
781 c
->state
= COMPOSITE_STATE_DONE
;
784 if (!NT_STATUS_IS_OK(c
->status
)) {
785 c
->state
= COMPOSITE_STATE_ERROR
;
788 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
793 static void wreplsrv_pull_cycle_handler_creq(struct composite_context
*creq
)
795 struct wreplsrv_pull_cycle_state
*state
= talloc_get_type(creq
->async
.private_data
,
796 struct wreplsrv_pull_cycle_state
);
797 wreplsrv_pull_cycle_handler(state
);
801 static void wreplsrv_pull_cycle_handler_treq(struct tevent_req
*subreq
)
803 struct wreplsrv_pull_cycle_state
*state
= tevent_req_callback_data(subreq
,
804 struct wreplsrv_pull_cycle_state
);
805 wreplsrv_pull_cycle_handler(state
);
809 struct composite_context
*wreplsrv_pull_cycle_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_pull_cycle_io
*io
)
811 struct composite_context
*c
= NULL
;
812 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
813 struct wreplsrv_pull_cycle_state
*state
= NULL
;
815 c
= talloc_zero(mem_ctx
, struct composite_context
);
818 state
= talloc_zero(c
, struct wreplsrv_pull_cycle_state
);
819 if (!state
) goto failed
;
823 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
824 c
->event_ctx
= service
->task
->event_ctx
;
825 c
->private_data
= state
;
827 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY
;
828 state
->table_io
.in
.partner
= io
->in
.partner
;
829 state
->table_io
.in
.num_owners
= io
->in
.num_owners
;
830 state
->table_io
.in
.owners
= io
->in
.owners
;
831 state
->creq
= wreplsrv_pull_table_send(state
, &state
->table_io
);
832 if (!state
->creq
) goto failed
;
834 state
->creq
->async
.fn
= wreplsrv_pull_cycle_handler_creq
;
835 state
->creq
->async
.private_data
= state
;
843 NTSTATUS
wreplsrv_pull_cycle_recv(struct composite_context
*c
)
847 status
= composite_wait(c
);
/* Steps of the push-notify state machine. */
enum wreplsrv_push_notify_stage {
	WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT,
	WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE,
	WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM,
	WREPLSRV_PUSH_NOTIFY_STAGE_DONE
};
860 struct wreplsrv_push_notify_state
{
861 enum wreplsrv_push_notify_stage stage
;
862 struct composite_context
*c
;
863 struct wreplsrv_push_notify_io
*io
;
864 enum wrepl_replication_cmd command
;
866 struct wrepl_send_ctrl ctrl
;
867 struct wrepl_packet req_packet
;
868 struct wrepl_packet
*rep_packet
;
869 struct composite_context
*creq
;
870 struct wreplsrv_out_connection
*wreplconn
;
871 struct tevent_req
*subreq
;
874 static void wreplsrv_push_notify_handler_creq(struct composite_context
*creq
);
875 static void wreplsrv_push_notify_handler_treq(struct tevent_req
*subreq
);
877 static NTSTATUS
wreplsrv_push_notify_update(struct wreplsrv_push_notify_state
*state
)
879 struct wreplsrv_service
*service
= state
->io
->in
.partner
->service
;
880 struct wrepl_packet
*req
= &state
->req_packet
;
881 struct wrepl_replication
*repl_out
= &state
->req_packet
.message
.replication
;
882 struct wrepl_table
*table_out
= &state
->req_packet
.message
.replication
.info
.table
;
885 /* prepare the outgoing request */
886 req
->opcode
= WREPL_OPCODE_BITS
;
887 req
->assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
888 req
->mess_type
= WREPL_REPLICATION
;
890 repl_out
->command
= state
->command
;
892 status
= wreplsrv_fill_wrepl_table(service
, state
, table_out
,
893 service
->wins_db
->local_owner
, state
->full_table
);
894 NT_STATUS_NOT_OK_RETURN(status
);
896 /* queue the request */
897 state
->subreq
= wrepl_request_send(state
,
898 state
->wreplconn
->service
->task
->event_ctx
,
899 state
->wreplconn
->sock
, req
, NULL
);
900 NT_STATUS_HAVE_NO_MEMORY(state
->subreq
);
902 tevent_req_set_callback(state
->subreq
,
903 wreplsrv_push_notify_handler_treq
,
906 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE
;
911 static NTSTATUS
wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state
*state
)
913 struct wreplsrv_service
*service
= state
->io
->in
.partner
->service
;
914 struct wrepl_packet
*req
= &state
->req_packet
;
915 struct wrepl_replication
*repl_out
= &state
->req_packet
.message
.replication
;
916 struct wrepl_table
*table_out
= &state
->req_packet
.message
.replication
.info
.table
;
919 req
->opcode
= WREPL_OPCODE_BITS
;
920 req
->assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
921 req
->mess_type
= WREPL_REPLICATION
;
923 repl_out
->command
= state
->command
;
925 status
= wreplsrv_fill_wrepl_table(service
, state
, table_out
,
926 service
->wins_db
->local_owner
, state
->full_table
);
927 NT_STATUS_NOT_OK_RETURN(status
);
929 /* we won't get a reply to a inform message */
930 state
->ctrl
.send_only
= true;
932 state
->subreq
= wrepl_request_send(state
,
933 state
->wreplconn
->service
->task
->event_ctx
,
934 state
->wreplconn
->sock
, req
, &state
->ctrl
);
935 NT_STATUS_HAVE_NO_MEMORY(state
->subreq
);
937 tevent_req_set_callback(state
->subreq
,
938 wreplsrv_push_notify_handler_treq
,
941 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM
;
946 static NTSTATUS
wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state
*state
)
950 status
= wreplsrv_out_connect_recv(state
->creq
, state
, &state
->wreplconn
);
951 NT_STATUS_NOT_OK_RETURN(status
);
953 /* is the peer doesn't support inform fallback to update */
954 switch (state
->command
) {
955 case WREPL_REPL_INFORM
:
956 if (state
->wreplconn
->assoc_ctx
.peer_major
< 5) {
957 state
->command
= WREPL_REPL_UPDATE
;
960 case WREPL_REPL_INFORM2
:
961 if (state
->wreplconn
->assoc_ctx
.peer_major
< 5) {
962 state
->command
= WREPL_REPL_UPDATE2
;
969 switch (state
->command
) {
970 case WREPL_REPL_UPDATE
:
971 state
->full_table
= true;
972 return wreplsrv_push_notify_update(state
);
973 case WREPL_REPL_UPDATE2
:
974 state
->full_table
= false;
975 return wreplsrv_push_notify_update(state
);
976 case WREPL_REPL_INFORM
:
977 state
->full_table
= true;
978 return wreplsrv_push_notify_inform(state
);
979 case WREPL_REPL_INFORM2
:
980 state
->full_table
= false;
981 return wreplsrv_push_notify_inform(state
);
983 return NT_STATUS_INTERNAL_ERROR
;
987 static NTSTATUS
wreplsrv_push_notify_wait_update(struct wreplsrv_push_notify_state
*state
)
989 struct wreplsrv_in_connection
*wrepl_in
;
990 struct tstream_context
*stream
;
991 void *process_context
= NULL
;
994 status
= wrepl_request_recv(state
->subreq
, state
, NULL
);
995 TALLOC_FREE(state
->subreq
);
996 NT_STATUS_NOT_OK_RETURN(status
);
999 * now we need to convert the wrepl_socket (client connection)
1000 * into a wreplsrv_in_connection (server connection), because
1001 * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
1002 * message is received by the peer.
1005 status
= wrepl_socket_split_stream(state
->wreplconn
->sock
, state
, &stream
);
1006 NT_STATUS_NOT_OK_RETURN(status
);
1009 * now create a wreplsrv_in_connection,
1010 * on which we act as server
1012 * NOTE: stream will be stolen by
1013 * wreplsrv_in_connection_merge()
1015 process_context
= state
->io
->in
.partner
->service
->task
->process_context
;
1016 status
= wreplsrv_in_connection_merge(state
->io
->in
.partner
,
1017 state
->wreplconn
->assoc_ctx
.peer_ctx
,
1019 &wrepl_in
, process_context
);
1020 NT_STATUS_NOT_OK_RETURN(status
);
1022 /* now we can free the wreplsrv_out_connection */
1023 TALLOC_FREE(state
->wreplconn
);
1025 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_DONE
;
1026 return NT_STATUS_OK
;
1029 static NTSTATUS
wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state
*state
)
1033 status
= wrepl_request_recv(state
->subreq
, state
, NULL
);
1034 TALLOC_FREE(state
->subreq
);
1035 NT_STATUS_NOT_OK_RETURN(status
);
1037 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_DONE
;
1041 static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state
*state
)
1043 struct composite_context
*c
= state
->c
;
1045 switch (state
->stage
) {
1046 case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT
:
1047 c
->status
= wreplsrv_push_notify_wait_connect(state
);
1049 case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE
:
1050 c
->status
= wreplsrv_push_notify_wait_update(state
);
1052 case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM
:
1053 c
->status
= wreplsrv_push_notify_wait_inform(state
);
1055 case WREPLSRV_PUSH_NOTIFY_STAGE_DONE
:
1056 c
->status
= NT_STATUS_INTERNAL_ERROR
;
1059 if (state
->stage
== WREPLSRV_PUSH_NOTIFY_STAGE_DONE
) {
1060 c
->state
= COMPOSITE_STATE_DONE
;
1063 if (!NT_STATUS_IS_OK(c
->status
)) {
1064 c
->state
= COMPOSITE_STATE_ERROR
;
1067 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
1072 static void wreplsrv_push_notify_handler_creq(struct composite_context
*creq
)
1074 struct wreplsrv_push_notify_state
*state
= talloc_get_type(creq
->async
.private_data
,
1075 struct wreplsrv_push_notify_state
);
1076 wreplsrv_push_notify_handler(state
);
1080 static void wreplsrv_push_notify_handler_treq(struct tevent_req
*subreq
)
1082 struct wreplsrv_push_notify_state
*state
= tevent_req_callback_data(subreq
,
1083 struct wreplsrv_push_notify_state
);
1084 wreplsrv_push_notify_handler(state
);
1088 struct composite_context
*wreplsrv_push_notify_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_push_notify_io
*io
)
1090 struct composite_context
*c
= NULL
;
1091 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
1092 struct wreplsrv_push_notify_state
*state
= NULL
;
1093 enum winsrepl_partner_type partner_type
;
1095 c
= talloc_zero(mem_ctx
, struct composite_context
);
1096 if (!c
) goto failed
;
1098 state
= talloc_zero(c
, struct wreplsrv_push_notify_state
);
1099 if (!state
) goto failed
;
1103 if (io
->in
.inform
) {
1104 /* we can cache the connection in partner->push->wreplconn */
1105 partner_type
= WINSREPL_PARTNER_PUSH
;
1106 if (io
->in
.propagate
) {
1107 state
->command
= WREPL_REPL_INFORM2
;
1109 state
->command
= WREPL_REPL_INFORM
;
1112 /* we can NOT cache the connection */
1113 partner_type
= WINSREPL_PARTNER_NONE
;
1114 if (io
->in
.propagate
) {
1115 state
->command
= WREPL_REPL_UPDATE2
;
1117 state
->command
= WREPL_REPL_UPDATE
;
1121 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
1122 c
->event_ctx
= service
->task
->event_ctx
;
1123 c
->private_data
= state
;
1125 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT
;
1126 state
->creq
= wreplsrv_out_connect_send(io
->in
.partner
, partner_type
, NULL
);
1127 if (!state
->creq
) goto failed
;
1129 state
->creq
->async
.fn
= wreplsrv_push_notify_handler_creq
;
1130 state
->creq
->async
.private_data
= state
;
1138 NTSTATUS
wreplsrv_push_notify_recv(struct composite_context
*c
)
1142 status
= composite_wait(c
);