2 Unix SMB/CIFS implementation.
4 WINS Replication server
6 Copyright (C) Stefan Metzmacher 2005
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 3 of the License, or
11 (at your option) any later version.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 #include "lib/events/events.h"
24 #include "lib/socket/socket.h"
25 #include "smbd/service_task.h"
26 #include "smbd/service_stream.h"
27 #include "librpc/gen_ndr/winsrepl.h"
28 #include "wrepl_server/wrepl_server.h"
29 #include "nbt_server/wins/winsdb.h"
30 #include "libcli/composite/composite.h"
31 #include "libcli/wrepl/winsrepl.h"
32 #include "libcli/resolve/resolve.h"
33 #include "param/param.h"
35 enum wreplsrv_out_connect_stage
{
36 WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET
,
37 WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX
,
38 WREPLSRV_OUT_CONNECT_STAGE_DONE
41 struct wreplsrv_out_connect_state
{
42 enum wreplsrv_out_connect_stage stage
;
43 struct composite_context
*c
;
44 struct wrepl_associate assoc_io
;
45 enum winsrepl_partner_type type
;
46 struct wreplsrv_out_connection
*wreplconn
;
47 struct tevent_req
*subreq
;
50 static void wreplsrv_out_connect_handler_treq(struct tevent_req
*subreq
);
52 static NTSTATUS
wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state
*state
)
56 status
= wrepl_connect_recv(state
->subreq
);
57 TALLOC_FREE(state
->subreq
);
58 NT_STATUS_NOT_OK_RETURN(status
);
60 state
->subreq
= wrepl_associate_send(state
,
61 state
->wreplconn
->service
->task
->event_ctx
,
62 state
->wreplconn
->sock
, &state
->assoc_io
);
63 NT_STATUS_HAVE_NO_MEMORY(state
->subreq
);
65 tevent_req_set_callback(state
->subreq
,
66 wreplsrv_out_connect_handler_treq
,
69 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX
;
74 static NTSTATUS
wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state
*state
)
78 status
= wrepl_associate_recv(state
->subreq
, &state
->assoc_io
);
79 TALLOC_FREE(state
->subreq
);
80 NT_STATUS_NOT_OK_RETURN(status
);
82 state
->wreplconn
->assoc_ctx
.peer_ctx
= state
->assoc_io
.out
.assoc_ctx
;
83 state
->wreplconn
->assoc_ctx
.peer_major
= state
->assoc_io
.out
.major_version
;
85 if (state
->type
== WINSREPL_PARTNER_PUSH
) {
86 if (state
->wreplconn
->assoc_ctx
.peer_major
>= 5) {
87 state
->wreplconn
->partner
->push
.wreplconn
= state
->wreplconn
;
88 talloc_steal(state
->wreplconn
->partner
, state
->wreplconn
);
90 state
->type
= WINSREPL_PARTNER_NONE
;
92 } else if (state
->type
== WINSREPL_PARTNER_PULL
) {
93 state
->wreplconn
->partner
->pull
.wreplconn
= state
->wreplconn
;
94 talloc_steal(state
->wreplconn
->partner
, state
->wreplconn
);
97 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_DONE
;
102 static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state
*state
)
104 struct composite_context
*c
= state
->c
;
106 switch (state
->stage
) {
107 case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET
:
108 c
->status
= wreplsrv_out_connect_wait_socket(state
);
110 case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX
:
111 c
->status
= wreplsrv_out_connect_wait_assoc_ctx(state
);
112 c
->state
= COMPOSITE_STATE_DONE
;
114 case WREPLSRV_OUT_CONNECT_STAGE_DONE
:
115 c
->status
= NT_STATUS_INTERNAL_ERROR
;
118 if (!NT_STATUS_IS_OK(c
->status
)) {
119 c
->state
= COMPOSITE_STATE_ERROR
;
122 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
127 static void wreplsrv_out_connect_handler_treq(struct tevent_req
*subreq
)
129 struct wreplsrv_out_connect_state
*state
= tevent_req_callback_data(subreq
,
130 struct wreplsrv_out_connect_state
);
131 wreplsrv_out_connect_handler(state
);
135 static struct composite_context
*wreplsrv_out_connect_send(struct wreplsrv_partner
*partner
,
136 enum winsrepl_partner_type type
,
137 struct wreplsrv_out_connection
*wreplconn
)
139 struct composite_context
*c
= NULL
;
140 struct wreplsrv_service
*service
= partner
->service
;
141 struct wreplsrv_out_connect_state
*state
= NULL
;
142 struct wreplsrv_out_connection
**wreplconnp
= &wreplconn
;
143 bool cached_connection
= false;
145 c
= talloc_zero(partner
, struct composite_context
);
148 state
= talloc_zero(c
, struct wreplsrv_out_connect_state
);
149 if (!state
) goto failed
;
153 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
154 c
->event_ctx
= service
->task
->event_ctx
;
155 c
->private_data
= state
;
157 if (type
== WINSREPL_PARTNER_PUSH
) {
158 cached_connection
= true;
159 wreplconn
= partner
->push
.wreplconn
;
160 wreplconnp
= &partner
->push
.wreplconn
;
161 } else if (type
== WINSREPL_PARTNER_PULL
) {
162 cached_connection
= true;
163 wreplconn
= partner
->pull
.wreplconn
;
164 wreplconnp
= &partner
->pull
.wreplconn
;
167 /* we have a connection already, so use it */
169 if (wrepl_socket_is_connected(wreplconn
->sock
)) {
170 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_DONE
;
171 state
->wreplconn
= wreplconn
;
174 } else if (!cached_connection
) {
175 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_DONE
;
176 state
->wreplconn
= NULL
;
180 talloc_free(wreplconn
);
185 wreplconn
= talloc_zero(state
, struct wreplsrv_out_connection
);
186 if (!wreplconn
) goto failed
;
188 wreplconn
->service
= service
;
189 wreplconn
->partner
= partner
;
190 wreplconn
->sock
= wrepl_socket_init(wreplconn
, service
->task
->event_ctx
, lp_iconv_convenience(service
->task
->lp_ctx
));
191 if (!wreplconn
->sock
) goto failed
;
193 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET
;
194 state
->wreplconn
= wreplconn
;
195 state
->subreq
= wrepl_connect_send(state
,
196 service
->task
->event_ctx
,
198 partner
->our_address
?partner
->our_address
:wrepl_best_ip(service
->task
->lp_ctx
, partner
->address
),
200 if (!state
->subreq
) goto failed
;
202 tevent_req_set_callback(state
->subreq
,
203 wreplsrv_out_connect_handler_treq
,
212 static NTSTATUS
wreplsrv_out_connect_recv(struct composite_context
*c
, TALLOC_CTX
*mem_ctx
,
213 struct wreplsrv_out_connection
**wreplconn
)
217 status
= composite_wait(c
);
219 if (NT_STATUS_IS_OK(status
)) {
220 struct wreplsrv_out_connect_state
*state
= talloc_get_type(c
->private_data
,
221 struct wreplsrv_out_connect_state
);
222 if (state
->wreplconn
) {
223 *wreplconn
= talloc_reference(mem_ctx
, state
->wreplconn
);
224 if (!*wreplconn
) status
= NT_STATUS_NO_MEMORY
;
226 status
= NT_STATUS_INVALID_CONNECTION
;
235 struct wreplsrv_pull_table_io
{
237 struct wreplsrv_partner
*partner
;
239 struct wrepl_wins_owner
*owners
;
243 struct wrepl_wins_owner
*owners
;
247 enum wreplsrv_pull_table_stage
{
248 WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION
,
249 WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY
,
250 WREPLSRV_PULL_TABLE_STAGE_DONE
253 struct wreplsrv_pull_table_state
{
254 enum wreplsrv_pull_table_stage stage
;
255 struct composite_context
*c
;
256 struct wrepl_pull_table table_io
;
257 struct wreplsrv_pull_table_io
*io
;
258 struct composite_context
*creq
;
259 struct wreplsrv_out_connection
*wreplconn
;
260 struct tevent_req
*subreq
;
263 static void wreplsrv_pull_table_handler_treq(struct tevent_req
*subreq
);
265 static NTSTATUS
wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state
*state
)
269 status
= wreplsrv_out_connect_recv(state
->creq
, state
, &state
->wreplconn
);
270 NT_STATUS_NOT_OK_RETURN(status
);
272 state
->table_io
.in
.assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
273 state
->subreq
= wrepl_pull_table_send(state
,
274 state
->wreplconn
->service
->task
->event_ctx
,
275 state
->wreplconn
->sock
, &state
->table_io
);
276 NT_STATUS_HAVE_NO_MEMORY(state
->subreq
);
278 tevent_req_set_callback(state
->subreq
,
279 wreplsrv_pull_table_handler_treq
,
282 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY
;
287 static NTSTATUS
wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state
*state
)
291 status
= wrepl_pull_table_recv(state
->subreq
, state
, &state
->table_io
);
292 TALLOC_FREE(state
->subreq
);
293 NT_STATUS_NOT_OK_RETURN(status
);
295 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_DONE
;
300 static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state
*state
)
302 struct composite_context
*c
= state
->c
;
304 switch (state
->stage
) {
305 case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION
:
306 c
->status
= wreplsrv_pull_table_wait_connection(state
);
308 case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY
:
309 c
->status
= wreplsrv_pull_table_wait_table_reply(state
);
310 c
->state
= COMPOSITE_STATE_DONE
;
312 case WREPLSRV_PULL_TABLE_STAGE_DONE
:
313 c
->status
= NT_STATUS_INTERNAL_ERROR
;
316 if (!NT_STATUS_IS_OK(c
->status
)) {
317 c
->state
= COMPOSITE_STATE_ERROR
;
320 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
325 static void wreplsrv_pull_table_handler_creq(struct composite_context
*creq
)
327 struct wreplsrv_pull_table_state
*state
= talloc_get_type(creq
->async
.private_data
,
328 struct wreplsrv_pull_table_state
);
329 wreplsrv_pull_table_handler(state
);
333 static void wreplsrv_pull_table_handler_treq(struct tevent_req
*subreq
)
335 struct wreplsrv_pull_table_state
*state
= tevent_req_callback_data(subreq
,
336 struct wreplsrv_pull_table_state
);
337 wreplsrv_pull_table_handler(state
);
341 static struct composite_context
*wreplsrv_pull_table_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_pull_table_io
*io
)
343 struct composite_context
*c
= NULL
;
344 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
345 struct wreplsrv_pull_table_state
*state
= NULL
;
347 c
= talloc_zero(mem_ctx
, struct composite_context
);
350 state
= talloc_zero(c
, struct wreplsrv_pull_table_state
);
351 if (!state
) goto failed
;
355 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
356 c
->event_ctx
= service
->task
->event_ctx
;
357 c
->private_data
= state
;
359 if (io
->in
.num_owners
) {
360 state
->table_io
.out
.num_partners
= io
->in
.num_owners
;
361 state
->table_io
.out
.partners
= io
->in
.owners
;
362 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_DONE
;
367 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION
;
368 state
->creq
= wreplsrv_out_connect_send(io
->in
.partner
, WINSREPL_PARTNER_PULL
, NULL
);
369 if (!state
->creq
) goto failed
;
371 state
->creq
->async
.fn
= wreplsrv_pull_table_handler_creq
;
372 state
->creq
->async
.private_data
= state
;
380 static NTSTATUS
wreplsrv_pull_table_recv(struct composite_context
*c
, TALLOC_CTX
*mem_ctx
,
381 struct wreplsrv_pull_table_io
*io
)
385 status
= composite_wait(c
);
387 if (NT_STATUS_IS_OK(status
)) {
388 struct wreplsrv_pull_table_state
*state
= talloc_get_type(c
->private_data
,
389 struct wreplsrv_pull_table_state
);
390 io
->out
.num_owners
= state
->table_io
.out
.num_partners
;
391 io
->out
.owners
= talloc_reference(mem_ctx
, state
->table_io
.out
.partners
);
398 struct wreplsrv_pull_names_io
{
400 struct wreplsrv_partner
*partner
;
401 struct wreplsrv_out_connection
*wreplconn
;
402 struct wrepl_wins_owner owner
;
406 struct wrepl_name
*names
;
410 enum wreplsrv_pull_names_stage
{
411 WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION
,
412 WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY
,
413 WREPLSRV_PULL_NAMES_STAGE_DONE
416 struct wreplsrv_pull_names_state
{
417 enum wreplsrv_pull_names_stage stage
;
418 struct composite_context
*c
;
419 struct wrepl_pull_names pull_io
;
420 struct wreplsrv_pull_names_io
*io
;
421 struct composite_context
*creq
;
422 struct wreplsrv_out_connection
*wreplconn
;
423 struct tevent_req
*subreq
;
426 static void wreplsrv_pull_names_handler_treq(struct tevent_req
*subreq
);
428 static NTSTATUS
wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state
*state
)
432 status
= wreplsrv_out_connect_recv(state
->creq
, state
, &state
->wreplconn
);
433 NT_STATUS_NOT_OK_RETURN(status
);
435 state
->pull_io
.in
.assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
436 state
->pull_io
.in
.partner
= state
->io
->in
.owner
;
437 state
->subreq
= wrepl_pull_names_send(state
,
438 state
->wreplconn
->service
->task
->event_ctx
,
439 state
->wreplconn
->sock
,
441 NT_STATUS_HAVE_NO_MEMORY(state
->subreq
);
443 tevent_req_set_callback(state
->subreq
,
444 wreplsrv_pull_names_handler_treq
,
447 state
->stage
= WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY
;
452 static NTSTATUS
wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state
*state
)
456 status
= wrepl_pull_names_recv(state
->subreq
, state
, &state
->pull_io
);
457 TALLOC_FREE(state
->subreq
);
458 NT_STATUS_NOT_OK_RETURN(status
);
460 state
->stage
= WREPLSRV_PULL_NAMES_STAGE_DONE
;
465 static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state
*state
)
467 struct composite_context
*c
= state
->c
;
469 switch (state
->stage
) {
470 case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION
:
471 c
->status
= wreplsrv_pull_names_wait_connection(state
);
473 case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY
:
474 c
->status
= wreplsrv_pull_names_wait_send_reply(state
);
475 c
->state
= COMPOSITE_STATE_DONE
;
477 case WREPLSRV_PULL_NAMES_STAGE_DONE
:
478 c
->status
= NT_STATUS_INTERNAL_ERROR
;
481 if (!NT_STATUS_IS_OK(c
->status
)) {
482 c
->state
= COMPOSITE_STATE_ERROR
;
485 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
490 static void wreplsrv_pull_names_handler_creq(struct composite_context
*creq
)
492 struct wreplsrv_pull_names_state
*state
= talloc_get_type(creq
->async
.private_data
,
493 struct wreplsrv_pull_names_state
);
494 wreplsrv_pull_names_handler(state
);
498 static void wreplsrv_pull_names_handler_treq(struct tevent_req
*subreq
)
500 struct wreplsrv_pull_names_state
*state
= tevent_req_callback_data(subreq
,
501 struct wreplsrv_pull_names_state
);
502 wreplsrv_pull_names_handler(state
);
506 static struct composite_context
*wreplsrv_pull_names_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_pull_names_io
*io
)
508 struct composite_context
*c
= NULL
;
509 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
510 struct wreplsrv_pull_names_state
*state
= NULL
;
511 enum winsrepl_partner_type partner_type
= WINSREPL_PARTNER_PULL
;
513 if (io
->in
.wreplconn
) partner_type
= WINSREPL_PARTNER_NONE
;
515 c
= talloc_zero(mem_ctx
, struct composite_context
);
518 state
= talloc_zero(c
, struct wreplsrv_pull_names_state
);
519 if (!state
) goto failed
;
523 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
524 c
->event_ctx
= service
->task
->event_ctx
;
525 c
->private_data
= state
;
527 state
->stage
= WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION
;
528 state
->creq
= wreplsrv_out_connect_send(io
->in
.partner
, partner_type
, io
->in
.wreplconn
);
529 if (!state
->creq
) goto failed
;
531 state
->creq
->async
.fn
= wreplsrv_pull_names_handler_creq
;
532 state
->creq
->async
.private_data
= state
;
540 static NTSTATUS
wreplsrv_pull_names_recv(struct composite_context
*c
, TALLOC_CTX
*mem_ctx
,
541 struct wreplsrv_pull_names_io
*io
)
545 status
= composite_wait(c
);
547 if (NT_STATUS_IS_OK(status
)) {
548 struct wreplsrv_pull_names_state
*state
= talloc_get_type(c
->private_data
,
549 struct wreplsrv_pull_names_state
);
550 io
->out
.num_names
= state
->pull_io
.out
.num_names
;
551 io
->out
.names
= talloc_reference(mem_ctx
, state
->pull_io
.out
.names
);
559 enum wreplsrv_pull_cycle_stage
{
560 WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY
,
561 WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES
,
562 WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC
,
563 WREPLSRV_PULL_CYCLE_STAGE_DONE
566 struct wreplsrv_pull_cycle_state
{
567 enum wreplsrv_pull_cycle_stage stage
;
568 struct composite_context
*c
;
569 struct wreplsrv_pull_cycle_io
*io
;
570 struct wreplsrv_pull_table_io table_io
;
572 struct wreplsrv_pull_names_io names_io
;
573 struct composite_context
*creq
;
574 struct wrepl_associate_stop assoc_stop_io
;
575 struct tevent_req
*subreq
;
578 static void wreplsrv_pull_cycle_handler_creq(struct composite_context
*creq
);
579 static void wreplsrv_pull_cycle_handler_treq(struct tevent_req
*subreq
);
581 static NTSTATUS
wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state
*state
)
583 struct wreplsrv_owner
*current_owner
=NULL
;
584 struct wreplsrv_owner
*local_owner
;
586 uint64_t old_max_version
= 0;
587 bool do_pull
= false;
589 for (i
=state
->current
; i
< state
->table_io
.out
.num_owners
; i
++) {
590 current_owner
= wreplsrv_find_owner(state
->io
->in
.partner
->service
,
591 state
->io
->in
.partner
->pull
.table
,
592 state
->table_io
.out
.owners
[i
].address
);
594 local_owner
= wreplsrv_find_owner(state
->io
->in
.partner
->service
,
595 state
->io
->in
.partner
->service
->table
,
596 state
->table_io
.out
.owners
[i
].address
);
598 * this means we are ourself the current owner,
599 * and we don't want replicate ourself
601 if (!current_owner
) continue;
604 * this means we don't have any records of this owner
614 * this means the remote partner has some new records of this owner
617 if (current_owner
->owner
.max_version
> local_owner
->owner
.max_version
) {
619 old_max_version
= local_owner
->owner
.max_version
;
626 state
->names_io
.in
.partner
= state
->io
->in
.partner
;
627 state
->names_io
.in
.wreplconn
= state
->io
->in
.wreplconn
;
628 state
->names_io
.in
.owner
= current_owner
->owner
;
629 state
->names_io
.in
.owner
.min_version
= old_max_version
+ 1;
630 state
->creq
= wreplsrv_pull_names_send(state
, &state
->names_io
);
631 NT_STATUS_HAVE_NO_MEMORY(state
->creq
);
633 state
->creq
->async
.fn
= wreplsrv_pull_cycle_handler_creq
;
634 state
->creq
->async
.private_data
= state
;
636 return STATUS_MORE_ENTRIES
;
642 static NTSTATUS
wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state
*state
)
646 status
= wreplsrv_pull_cycle_next_owner_do_work(state
);
647 if (NT_STATUS_IS_OK(status
)) {
648 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_DONE
;
649 } else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES
, status
)) {
650 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES
;
651 status
= NT_STATUS_OK
;
654 if (state
->stage
== WREPLSRV_PULL_CYCLE_STAGE_DONE
&& state
->io
->in
.wreplconn
) {
655 state
->assoc_stop_io
.in
.assoc_ctx
= state
->io
->in
.wreplconn
->assoc_ctx
.peer_ctx
;
656 state
->assoc_stop_io
.in
.reason
= 0;
657 state
->subreq
= wrepl_associate_stop_send(state
,
658 state
->io
->in
.wreplconn
->service
->task
->event_ctx
,
659 state
->io
->in
.wreplconn
->sock
,
660 &state
->assoc_stop_io
);
661 NT_STATUS_HAVE_NO_MEMORY(state
->subreq
);
663 tevent_req_set_callback(state
->subreq
,
664 wreplsrv_pull_cycle_handler_treq
,
667 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC
;
673 static NTSTATUS
wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state
*state
)
678 status
= wreplsrv_pull_table_recv(state
->creq
, state
, &state
->table_io
);
679 NT_STATUS_NOT_OK_RETURN(status
);
681 /* update partner table */
682 for (i
=0; i
< state
->table_io
.out
.num_owners
; i
++) {
683 status
= wreplsrv_add_table(state
->io
->in
.partner
->service
,
684 state
->io
->in
.partner
,
685 &state
->io
->in
.partner
->pull
.table
,
686 state
->table_io
.out
.owners
[i
].address
,
687 state
->table_io
.out
.owners
[i
].max_version
);
688 NT_STATUS_NOT_OK_RETURN(status
);
691 status
= wreplsrv_pull_cycle_next_owner_wrapper(state
);
692 NT_STATUS_NOT_OK_RETURN(status
);
697 static NTSTATUS
wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state
*state
)
701 status
= wreplsrv_apply_records(state
->io
->in
.partner
,
702 &state
->names_io
.in
.owner
,
703 state
->names_io
.out
.num_names
,
704 state
->names_io
.out
.names
);
705 NT_STATUS_NOT_OK_RETURN(status
);
707 talloc_free(state
->names_io
.out
.names
);
708 ZERO_STRUCT(state
->names_io
);
713 static NTSTATUS
wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state
*state
)
717 status
= wreplsrv_pull_names_recv(state
->creq
, state
, &state
->names_io
);
718 NT_STATUS_NOT_OK_RETURN(status
);
721 * TODO: this should maybe an async call,
722 * because we may need some network access
723 * for conflict resolving
725 status
= wreplsrv_pull_cycle_apply_records(state
);
726 NT_STATUS_NOT_OK_RETURN(status
);
728 status
= wreplsrv_pull_cycle_next_owner_wrapper(state
);
729 NT_STATUS_NOT_OK_RETURN(status
);
734 static NTSTATUS
wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state
*state
)
738 status
= wrepl_associate_stop_recv(state
->subreq
, &state
->assoc_stop_io
);
739 TALLOC_FREE(state
->subreq
);
740 NT_STATUS_NOT_OK_RETURN(status
);
742 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_DONE
;
747 static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state
*state
)
749 struct composite_context
*c
= state
->c
;
751 switch (state
->stage
) {
752 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY
:
753 c
->status
= wreplsrv_pull_cycle_wait_table_reply(state
);
755 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES
:
756 c
->status
= wreplsrv_pull_cycle_wait_send_replies(state
);
758 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC
:
759 c
->status
= wreplsrv_pull_cycle_wait_stop_assoc(state
);
761 case WREPLSRV_PULL_CYCLE_STAGE_DONE
:
762 c
->status
= NT_STATUS_INTERNAL_ERROR
;
765 if (state
->stage
== WREPLSRV_PULL_CYCLE_STAGE_DONE
) {
766 c
->state
= COMPOSITE_STATE_DONE
;
769 if (!NT_STATUS_IS_OK(c
->status
)) {
770 c
->state
= COMPOSITE_STATE_ERROR
;
773 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
778 static void wreplsrv_pull_cycle_handler_creq(struct composite_context
*creq
)
780 struct wreplsrv_pull_cycle_state
*state
= talloc_get_type(creq
->async
.private_data
,
781 struct wreplsrv_pull_cycle_state
);
782 wreplsrv_pull_cycle_handler(state
);
786 static void wreplsrv_pull_cycle_handler_treq(struct tevent_req
*subreq
)
788 struct wreplsrv_pull_cycle_state
*state
= tevent_req_callback_data(subreq
,
789 struct wreplsrv_pull_cycle_state
);
790 wreplsrv_pull_cycle_handler(state
);
794 struct composite_context
*wreplsrv_pull_cycle_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_pull_cycle_io
*io
)
796 struct composite_context
*c
= NULL
;
797 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
798 struct wreplsrv_pull_cycle_state
*state
= NULL
;
800 c
= talloc_zero(mem_ctx
, struct composite_context
);
803 state
= talloc_zero(c
, struct wreplsrv_pull_cycle_state
);
804 if (!state
) goto failed
;
808 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
809 c
->event_ctx
= service
->task
->event_ctx
;
810 c
->private_data
= state
;
812 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY
;
813 state
->table_io
.in
.partner
= io
->in
.partner
;
814 state
->table_io
.in
.num_owners
= io
->in
.num_owners
;
815 state
->table_io
.in
.owners
= io
->in
.owners
;
816 state
->creq
= wreplsrv_pull_table_send(state
, &state
->table_io
);
817 if (!state
->creq
) goto failed
;
819 state
->creq
->async
.fn
= wreplsrv_pull_cycle_handler_creq
;
820 state
->creq
->async
.private_data
= state
;
828 NTSTATUS
wreplsrv_pull_cycle_recv(struct composite_context
*c
)
832 status
= composite_wait(c
);
838 enum wreplsrv_push_notify_stage
{
839 WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT
,
840 WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE
,
841 WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM
,
842 WREPLSRV_PUSH_NOTIFY_STAGE_DONE
845 struct wreplsrv_push_notify_state
{
846 enum wreplsrv_push_notify_stage stage
;
847 struct composite_context
*c
;
848 struct wreplsrv_push_notify_io
*io
;
849 enum wrepl_replication_cmd command
;
851 struct wrepl_send_ctrl ctrl
;
852 struct wrepl_packet req_packet
;
853 struct wrepl_packet
*rep_packet
;
854 struct composite_context
*creq
;
855 struct wreplsrv_out_connection
*wreplconn
;
856 struct tevent_req
*subreq
;
859 static void wreplsrv_push_notify_handler_creq(struct composite_context
*creq
);
860 static void wreplsrv_push_notify_handler_treq(struct tevent_req
*subreq
);
862 static NTSTATUS
wreplsrv_push_notify_update(struct wreplsrv_push_notify_state
*state
)
864 struct wreplsrv_service
*service
= state
->io
->in
.partner
->service
;
865 struct wrepl_packet
*req
= &state
->req_packet
;
866 struct wrepl_replication
*repl_out
= &state
->req_packet
.message
.replication
;
867 struct wrepl_table
*table_out
= &state
->req_packet
.message
.replication
.info
.table
;
870 /* prepare the outgoing request */
871 req
->opcode
= WREPL_OPCODE_BITS
;
872 req
->assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
873 req
->mess_type
= WREPL_REPLICATION
;
875 repl_out
->command
= state
->command
;
877 status
= wreplsrv_fill_wrepl_table(service
, state
, table_out
,
878 service
->wins_db
->local_owner
, state
->full_table
);
879 NT_STATUS_NOT_OK_RETURN(status
);
881 /* queue the request */
882 state
->subreq
= wrepl_request_send(state
,
883 state
->wreplconn
->service
->task
->event_ctx
,
884 state
->wreplconn
->sock
, req
, NULL
);
885 NT_STATUS_HAVE_NO_MEMORY(state
->subreq
);
887 tevent_req_set_callback(state
->subreq
,
888 wreplsrv_push_notify_handler_treq
,
891 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE
;
896 static NTSTATUS
wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state
*state
)
898 struct wreplsrv_service
*service
= state
->io
->in
.partner
->service
;
899 struct wrepl_packet
*req
= &state
->req_packet
;
900 struct wrepl_replication
*repl_out
= &state
->req_packet
.message
.replication
;
901 struct wrepl_table
*table_out
= &state
->req_packet
.message
.replication
.info
.table
;
904 req
->opcode
= WREPL_OPCODE_BITS
;
905 req
->assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
906 req
->mess_type
= WREPL_REPLICATION
;
908 repl_out
->command
= state
->command
;
910 status
= wreplsrv_fill_wrepl_table(service
, state
, table_out
,
911 service
->wins_db
->local_owner
, state
->full_table
);
912 NT_STATUS_NOT_OK_RETURN(status
);
914 /* we won't get a reply to a inform message */
915 state
->ctrl
.send_only
= true;
917 state
->subreq
= wrepl_request_send(state
,
918 state
->wreplconn
->service
->task
->event_ctx
,
919 state
->wreplconn
->sock
, req
, &state
->ctrl
);
920 NT_STATUS_HAVE_NO_MEMORY(state
->subreq
);
922 tevent_req_set_callback(state
->subreq
,
923 wreplsrv_push_notify_handler_treq
,
926 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM
;
931 static NTSTATUS
wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state
*state
)
935 status
= wreplsrv_out_connect_recv(state
->creq
, state
, &state
->wreplconn
);
936 NT_STATUS_NOT_OK_RETURN(status
);
938 /* is the peer doesn't support inform fallback to update */
939 switch (state
->command
) {
940 case WREPL_REPL_INFORM
:
941 if (state
->wreplconn
->assoc_ctx
.peer_major
< 5) {
942 state
->command
= WREPL_REPL_UPDATE
;
945 case WREPL_REPL_INFORM2
:
946 if (state
->wreplconn
->assoc_ctx
.peer_major
< 5) {
947 state
->command
= WREPL_REPL_UPDATE2
;
954 switch (state
->command
) {
955 case WREPL_REPL_UPDATE
:
956 state
->full_table
= true;
957 return wreplsrv_push_notify_update(state
);
958 case WREPL_REPL_UPDATE2
:
959 state
->full_table
= false;
960 return wreplsrv_push_notify_update(state
);
961 case WREPL_REPL_INFORM
:
962 state
->full_table
= true;
963 return wreplsrv_push_notify_inform(state
);
964 case WREPL_REPL_INFORM2
:
965 state
->full_table
= false;
966 return wreplsrv_push_notify_inform(state
);
968 return NT_STATUS_INTERNAL_ERROR
;
971 return NT_STATUS_INTERNAL_ERROR
;
974 static NTSTATUS
wreplsrv_push_notify_wait_update(struct wreplsrv_push_notify_state
*state
)
976 struct wreplsrv_in_connection
*wrepl_in
;
977 struct tstream_context
*stream
;
980 status
= wrepl_request_recv(state
->subreq
, state
, NULL
);
981 TALLOC_FREE(state
->subreq
);
982 NT_STATUS_NOT_OK_RETURN(status
);
985 * now we need to convert the wrepl_socket (client connection)
986 * into a wreplsrv_in_connection (server connection), because
987 * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
988 * message is received by the peer.
991 status
= wrepl_socket_split_stream(state
->wreplconn
->sock
, state
, &stream
);
992 NT_STATUS_NOT_OK_RETURN(status
);
995 * now create a wreplsrv_in_connection,
996 * on which we act as server
998 * NOTE: stream will be stolen by
999 * wreplsrv_in_connection_merge()
1001 status
= wreplsrv_in_connection_merge(state
->io
->in
.partner
,
1002 state
->wreplconn
->assoc_ctx
.peer_ctx
,
1005 NT_STATUS_NOT_OK_RETURN(status
);
1007 /* now we can free the wreplsrv_out_connection */
1008 TALLOC_FREE(state
->wreplconn
);
1010 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_DONE
;
1011 return NT_STATUS_OK
;
1014 static NTSTATUS
wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state
*state
)
1018 status
= wrepl_request_recv(state
->subreq
, state
, NULL
);
1019 TALLOC_FREE(state
->subreq
);
1020 NT_STATUS_NOT_OK_RETURN(status
);
1022 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_DONE
;
1026 static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state
*state
)
1028 struct composite_context
*c
= state
->c
;
1030 switch (state
->stage
) {
1031 case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT
:
1032 c
->status
= wreplsrv_push_notify_wait_connect(state
);
1034 case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE
:
1035 c
->status
= wreplsrv_push_notify_wait_update(state
);
1037 case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM
:
1038 c
->status
= wreplsrv_push_notify_wait_inform(state
);
1040 case WREPLSRV_PUSH_NOTIFY_STAGE_DONE
:
1041 c
->status
= NT_STATUS_INTERNAL_ERROR
;
1044 if (state
->stage
== WREPLSRV_PUSH_NOTIFY_STAGE_DONE
) {
1045 c
->state
= COMPOSITE_STATE_DONE
;
1048 if (!NT_STATUS_IS_OK(c
->status
)) {
1049 c
->state
= COMPOSITE_STATE_ERROR
;
1052 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
1057 static void wreplsrv_push_notify_handler_creq(struct composite_context
*creq
)
1059 struct wreplsrv_push_notify_state
*state
= talloc_get_type(creq
->async
.private_data
,
1060 struct wreplsrv_push_notify_state
);
1061 wreplsrv_push_notify_handler(state
);
1065 static void wreplsrv_push_notify_handler_treq(struct tevent_req
*subreq
)
1067 struct wreplsrv_push_notify_state
*state
= tevent_req_callback_data(subreq
,
1068 struct wreplsrv_push_notify_state
);
1069 wreplsrv_push_notify_handler(state
);
1073 struct composite_context
*wreplsrv_push_notify_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_push_notify_io
*io
)
1075 struct composite_context
*c
= NULL
;
1076 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
1077 struct wreplsrv_push_notify_state
*state
= NULL
;
1078 enum winsrepl_partner_type partner_type
;
1080 c
= talloc_zero(mem_ctx
, struct composite_context
);
1081 if (!c
) goto failed
;
1083 state
= talloc_zero(c
, struct wreplsrv_push_notify_state
);
1084 if (!state
) goto failed
;
1088 if (io
->in
.inform
) {
1089 /* we can cache the connection in partner->push->wreplconn */
1090 partner_type
= WINSREPL_PARTNER_PUSH
;
1091 if (io
->in
.propagate
) {
1092 state
->command
= WREPL_REPL_INFORM2
;
1094 state
->command
= WREPL_REPL_INFORM
;
1097 /* we can NOT cache the connection */
1098 partner_type
= WINSREPL_PARTNER_NONE
;
1099 if (io
->in
.propagate
) {
1100 state
->command
= WREPL_REPL_UPDATE2
;
1102 state
->command
= WREPL_REPL_UPDATE
;
1106 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
1107 c
->event_ctx
= service
->task
->event_ctx
;
1108 c
->private_data
= state
;
1110 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT
;
1111 state
->creq
= wreplsrv_out_connect_send(io
->in
.partner
, partner_type
, NULL
);
1112 if (!state
->creq
) goto failed
;
1114 state
->creq
->async
.fn
= wreplsrv_push_notify_handler_creq
;
1115 state
->creq
->async
.private_data
= state
;
1123 NTSTATUS
wreplsrv_push_notify_recv(struct composite_context
*c
)
1127 status
= composite_wait(c
);