/*
   Unix SMB/CIFS implementation.

   WINS Replication server

   Copyright (C) Stefan Metzmacher	2005

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
23 #include "lib/events/events.h"
24 #include "lib/socket/socket.h"
25 #include "smbd/service_task.h"
26 #include "smbd/service_stream.h"
27 #include "librpc/gen_ndr/winsrepl.h"
28 #include "wrepl_server/wrepl_server.h"
29 #include "nbt_server/wins/winsdb.h"
30 #include "libcli/composite/composite.h"
31 #include "libcli/wrepl/winsrepl.h"
32 #include "libcli/resolve/resolve.h"
33 #include "param/param.h"
/*
 * Stages of the outgoing-connect state machine, listed in the order in
 * which they are passed through.
 */
enum wreplsrv_out_connect_stage {
	WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET,
	WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX,
	WREPLSRV_OUT_CONNECT_STAGE_DONE
};
41 struct wreplsrv_out_connect_state
{
42 enum wreplsrv_out_connect_stage stage
;
43 struct composite_context
*c
;
44 struct wrepl_request
*req
;
45 struct composite_context
*c_req
;
46 struct wrepl_associate assoc_io
;
47 enum winsrepl_partner_type type
;
48 struct wreplsrv_out_connection
*wreplconn
;
/* completion callbacks of the connect state machine, defined below */
static void wreplsrv_out_connect_handler_creq(struct composite_context *c_req);
static void wreplsrv_out_connect_handler_req(struct wrepl_request *req);
54 static NTSTATUS
wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state
*state
)
58 status
= wrepl_connect_recv(state
->c_req
);
59 NT_STATUS_NOT_OK_RETURN(status
);
61 state
->req
= wrepl_associate_send(state
->wreplconn
->sock
, &state
->assoc_io
);
62 NT_STATUS_HAVE_NO_MEMORY(state
->req
);
64 state
->req
->async
.fn
= wreplsrv_out_connect_handler_req
;
65 state
->req
->async
.private = state
;
67 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX
;
72 static NTSTATUS
wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state
*state
)
76 status
= wrepl_associate_recv(state
->req
, &state
->assoc_io
);
77 NT_STATUS_NOT_OK_RETURN(status
);
79 state
->wreplconn
->assoc_ctx
.peer_ctx
= state
->assoc_io
.out
.assoc_ctx
;
81 if (state
->type
== WINSREPL_PARTNER_PUSH
) {
82 state
->wreplconn
->partner
->push
.wreplconn
= state
->wreplconn
;
83 talloc_steal(state
->wreplconn
->partner
, state
->wreplconn
);
84 } else if (state
->type
== WINSREPL_PARTNER_PULL
) {
85 state
->wreplconn
->partner
->pull
.wreplconn
= state
->wreplconn
;
86 talloc_steal(state
->wreplconn
->partner
, state
->wreplconn
);
89 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_DONE
;
94 static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state
*state
)
96 struct composite_context
*c
= state
->c
;
98 switch (state
->stage
) {
99 case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET
:
100 c
->status
= wreplsrv_out_connect_wait_socket(state
);
102 case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX
:
103 c
->status
= wreplsrv_out_connect_wait_assoc_ctx(state
);
104 c
->state
= COMPOSITE_STATE_DONE
;
106 case WREPLSRV_OUT_CONNECT_STAGE_DONE
:
107 c
->status
= NT_STATUS_INTERNAL_ERROR
;
110 if (!NT_STATUS_IS_OK(c
->status
)) {
111 c
->state
= COMPOSITE_STATE_ERROR
;
114 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
119 static void wreplsrv_out_connect_handler_creq(struct composite_context
*creq
)
121 struct wreplsrv_out_connect_state
*state
= talloc_get_type(creq
->async
.private_data
,
122 struct wreplsrv_out_connect_state
);
123 wreplsrv_out_connect_handler(state
);
127 static void wreplsrv_out_connect_handler_req(struct wrepl_request
*req
)
129 struct wreplsrv_out_connect_state
*state
= talloc_get_type(req
->async
.private,
130 struct wreplsrv_out_connect_state
);
131 wreplsrv_out_connect_handler(state
);
135 static struct composite_context
*wreplsrv_out_connect_send(struct wreplsrv_partner
*partner
,
136 enum winsrepl_partner_type type
,
137 struct wreplsrv_out_connection
*wreplconn
)
139 struct composite_context
*c
= NULL
;
140 struct wreplsrv_service
*service
= partner
->service
;
141 struct wreplsrv_out_connect_state
*state
= NULL
;
142 struct wreplsrv_out_connection
**wreplconnp
= &wreplconn
;
143 bool cached_connection
= false;
145 c
= talloc_zero(partner
, struct composite_context
);
148 state
= talloc_zero(c
, struct wreplsrv_out_connect_state
);
149 if (!state
) goto failed
;
153 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
154 c
->event_ctx
= service
->task
->event_ctx
;
155 c
->private_data
= state
;
157 if (type
== WINSREPL_PARTNER_PUSH
) {
158 cached_connection
= true;
159 wreplconn
= partner
->push
.wreplconn
;
160 wreplconnp
= &partner
->push
.wreplconn
;
161 } else if (type
== WINSREPL_PARTNER_PULL
) {
162 cached_connection
= true;
163 wreplconn
= partner
->pull
.wreplconn
;
164 wreplconnp
= &partner
->pull
.wreplconn
;
167 /* we have a connection already, so use it */
169 if (!wreplconn
->sock
->dead
) {
170 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_DONE
;
171 state
->wreplconn
= wreplconn
;
174 } else if (!cached_connection
) {
175 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_DONE
;
176 state
->wreplconn
= NULL
;
180 talloc_free(wreplconn
);
185 wreplconn
= talloc_zero(state
, struct wreplsrv_out_connection
);
186 if (!wreplconn
) goto failed
;
188 wreplconn
->service
= service
;
189 wreplconn
->partner
= partner
;
190 wreplconn
->sock
= wrepl_socket_init(wreplconn
, service
->task
->event_ctx
, lp_iconv_convenience(service
->task
->lp_ctx
));
191 if (!wreplconn
->sock
) goto failed
;
193 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET
;
194 state
->wreplconn
= wreplconn
;
195 state
->c_req
= wrepl_connect_send(wreplconn
->sock
,
196 lp_resolve_context(service
->task
->lp_ctx
),
197 partner
->our_address
?partner
->our_address
:wrepl_best_ip(service
->task
->lp_ctx
, partner
->address
),
199 if (!state
->c_req
) goto failed
;
201 state
->c_req
->async
.fn
= wreplsrv_out_connect_handler_creq
;
202 state
->c_req
->async
.private_data
= state
;
210 static NTSTATUS
wreplsrv_out_connect_recv(struct composite_context
*c
, TALLOC_CTX
*mem_ctx
,
211 struct wreplsrv_out_connection
**wreplconn
)
215 status
= composite_wait(c
);
217 if (NT_STATUS_IS_OK(status
)) {
218 struct wreplsrv_out_connect_state
*state
= talloc_get_type(c
->private_data
,
219 struct wreplsrv_out_connect_state
);
220 if (state
->wreplconn
) {
221 *wreplconn
= talloc_reference(mem_ctx
, state
->wreplconn
);
222 if (!*wreplconn
) status
= NT_STATUS_NO_MEMORY
;
224 status
= NT_STATUS_INVALID_CONNECTION
;
/* Arguments and results of a "pull owner table" request. */
struct wreplsrv_pull_table_io {
	struct {
		/* partner to ask for its table */
		struct wreplsrv_partner *partner;
		/* if non-zero, owners[] is used as the result and nothing is fetched */
		uint32_t num_owners;
		struct wrepl_wins_owner *owners;
	} in;
	struct {
		uint32_t num_owners;
		struct wrepl_wins_owner *owners;
	} out;
};
/* Stages of the "pull owner table" state machine. */
enum wreplsrv_pull_table_stage {
	WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION,
	WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY,
	WREPLSRV_PULL_TABLE_STAGE_DONE
};
251 struct wreplsrv_pull_table_state
{
252 enum wreplsrv_pull_table_stage stage
;
253 struct composite_context
*c
;
254 struct wrepl_request
*req
;
255 struct wrepl_pull_table table_io
;
256 struct wreplsrv_pull_table_io
*io
;
257 struct composite_context
*creq
;
258 struct wreplsrv_out_connection
*wreplconn
;
/* wrepl request completion callback, defined below */
static void wreplsrv_pull_table_handler_req(struct wrepl_request *req);
263 static NTSTATUS
wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state
*state
)
267 status
= wreplsrv_out_connect_recv(state
->creq
, state
, &state
->wreplconn
);
268 NT_STATUS_NOT_OK_RETURN(status
);
270 state
->table_io
.in
.assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
271 state
->req
= wrepl_pull_table_send(state
->wreplconn
->sock
, &state
->table_io
);
272 NT_STATUS_HAVE_NO_MEMORY(state
->req
);
274 state
->req
->async
.fn
= wreplsrv_pull_table_handler_req
;
275 state
->req
->async
.private = state
;
277 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY
;
282 static NTSTATUS
wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state
*state
)
286 status
= wrepl_pull_table_recv(state
->req
, state
, &state
->table_io
);
287 NT_STATUS_NOT_OK_RETURN(status
);
289 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_DONE
;
294 static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state
*state
)
296 struct composite_context
*c
= state
->c
;
298 switch (state
->stage
) {
299 case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION
:
300 c
->status
= wreplsrv_pull_table_wait_connection(state
);
302 case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY
:
303 c
->status
= wreplsrv_pull_table_wait_table_reply(state
);
304 c
->state
= COMPOSITE_STATE_DONE
;
306 case WREPLSRV_PULL_TABLE_STAGE_DONE
:
307 c
->status
= NT_STATUS_INTERNAL_ERROR
;
310 if (!NT_STATUS_IS_OK(c
->status
)) {
311 c
->state
= COMPOSITE_STATE_ERROR
;
314 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
319 static void wreplsrv_pull_table_handler_creq(struct composite_context
*creq
)
321 struct wreplsrv_pull_table_state
*state
= talloc_get_type(creq
->async
.private_data
,
322 struct wreplsrv_pull_table_state
);
323 wreplsrv_pull_table_handler(state
);
327 static void wreplsrv_pull_table_handler_req(struct wrepl_request
*req
)
329 struct wreplsrv_pull_table_state
*state
= talloc_get_type(req
->async
.private,
330 struct wreplsrv_pull_table_state
);
331 wreplsrv_pull_table_handler(state
);
335 static struct composite_context
*wreplsrv_pull_table_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_pull_table_io
*io
)
337 struct composite_context
*c
= NULL
;
338 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
339 struct wreplsrv_pull_table_state
*state
= NULL
;
341 c
= talloc_zero(mem_ctx
, struct composite_context
);
344 state
= talloc_zero(c
, struct wreplsrv_pull_table_state
);
345 if (!state
) goto failed
;
349 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
350 c
->event_ctx
= service
->task
->event_ctx
;
351 c
->private_data
= state
;
353 if (io
->in
.num_owners
) {
354 state
->table_io
.out
.num_partners
= io
->in
.num_owners
;
355 state
->table_io
.out
.partners
= io
->in
.owners
;
356 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_DONE
;
361 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION
;
362 state
->creq
= wreplsrv_out_connect_send(io
->in
.partner
, WINSREPL_PARTNER_PULL
, NULL
);
363 if (!state
->creq
) goto failed
;
365 state
->creq
->async
.fn
= wreplsrv_pull_table_handler_creq
;
366 state
->creq
->async
.private_data
= state
;
374 static NTSTATUS
wreplsrv_pull_table_recv(struct composite_context
*c
, TALLOC_CTX
*mem_ctx
,
375 struct wreplsrv_pull_table_io
*io
)
379 status
= composite_wait(c
);
381 if (NT_STATUS_IS_OK(status
)) {
382 struct wreplsrv_pull_table_state
*state
= talloc_get_type(c
->private_data
,
383 struct wreplsrv_pull_table_state
);
384 io
->out
.num_owners
= state
->table_io
.out
.num_partners
;
385 io
->out
.owners
= talloc_reference(mem_ctx
, state
->table_io
.out
.partners
);
392 struct wreplsrv_pull_names_io
{
394 struct wreplsrv_partner
*partner
;
395 struct wreplsrv_out_connection
*wreplconn
;
396 struct wrepl_wins_owner owner
;
400 struct wrepl_name
*names
;
/* Stages of the "pull names" state machine. */
enum wreplsrv_pull_names_stage {
	WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION,
	WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY,
	WREPLSRV_PULL_NAMES_STAGE_DONE
};
410 struct wreplsrv_pull_names_state
{
411 enum wreplsrv_pull_names_stage stage
;
412 struct composite_context
*c
;
413 struct wrepl_request
*req
;
414 struct wrepl_pull_names pull_io
;
415 struct wreplsrv_pull_names_io
*io
;
416 struct composite_context
*creq
;
417 struct wreplsrv_out_connection
*wreplconn
;
/* wrepl request completion callback, defined below */
static void wreplsrv_pull_names_handler_req(struct wrepl_request *req);
422 static NTSTATUS
wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state
*state
)
426 status
= wreplsrv_out_connect_recv(state
->creq
, state
, &state
->wreplconn
);
427 NT_STATUS_NOT_OK_RETURN(status
);
429 state
->pull_io
.in
.assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
430 state
->pull_io
.in
.partner
= state
->io
->in
.owner
;
431 state
->req
= wrepl_pull_names_send(state
->wreplconn
->sock
, &state
->pull_io
);
432 NT_STATUS_HAVE_NO_MEMORY(state
->req
);
434 state
->req
->async
.fn
= wreplsrv_pull_names_handler_req
;
435 state
->req
->async
.private = state
;
437 state
->stage
= WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY
;
442 static NTSTATUS
wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state
*state
)
446 status
= wrepl_pull_names_recv(state
->req
, state
, &state
->pull_io
);
447 NT_STATUS_NOT_OK_RETURN(status
);
449 state
->stage
= WREPLSRV_PULL_NAMES_STAGE_DONE
;
454 static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state
*state
)
456 struct composite_context
*c
= state
->c
;
458 switch (state
->stage
) {
459 case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION
:
460 c
->status
= wreplsrv_pull_names_wait_connection(state
);
462 case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY
:
463 c
->status
= wreplsrv_pull_names_wait_send_reply(state
);
464 c
->state
= COMPOSITE_STATE_DONE
;
466 case WREPLSRV_PULL_NAMES_STAGE_DONE
:
467 c
->status
= NT_STATUS_INTERNAL_ERROR
;
470 if (!NT_STATUS_IS_OK(c
->status
)) {
471 c
->state
= COMPOSITE_STATE_ERROR
;
474 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
479 static void wreplsrv_pull_names_handler_creq(struct composite_context
*creq
)
481 struct wreplsrv_pull_names_state
*state
= talloc_get_type(creq
->async
.private_data
,
482 struct wreplsrv_pull_names_state
);
483 wreplsrv_pull_names_handler(state
);
487 static void wreplsrv_pull_names_handler_req(struct wrepl_request
*req
)
489 struct wreplsrv_pull_names_state
*state
= talloc_get_type(req
->async
.private,
490 struct wreplsrv_pull_names_state
);
491 wreplsrv_pull_names_handler(state
);
495 static struct composite_context
*wreplsrv_pull_names_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_pull_names_io
*io
)
497 struct composite_context
*c
= NULL
;
498 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
499 struct wreplsrv_pull_names_state
*state
= NULL
;
500 enum winsrepl_partner_type partner_type
= WINSREPL_PARTNER_PULL
;
502 if (io
->in
.wreplconn
) partner_type
= WINSREPL_PARTNER_NONE
;
504 c
= talloc_zero(mem_ctx
, struct composite_context
);
507 state
= talloc_zero(c
, struct wreplsrv_pull_names_state
);
508 if (!state
) goto failed
;
512 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
513 c
->event_ctx
= service
->task
->event_ctx
;
514 c
->private_data
= state
;
516 state
->stage
= WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION
;
517 state
->creq
= wreplsrv_out_connect_send(io
->in
.partner
, partner_type
, io
->in
.wreplconn
);
518 if (!state
->creq
) goto failed
;
520 state
->creq
->async
.fn
= wreplsrv_pull_names_handler_creq
;
521 state
->creq
->async
.private_data
= state
;
529 static NTSTATUS
wreplsrv_pull_names_recv(struct composite_context
*c
, TALLOC_CTX
*mem_ctx
,
530 struct wreplsrv_pull_names_io
*io
)
534 status
= composite_wait(c
);
536 if (NT_STATUS_IS_OK(status
)) {
537 struct wreplsrv_pull_names_state
*state
= talloc_get_type(c
->private_data
,
538 struct wreplsrv_pull_names_state
);
539 io
->out
.num_names
= state
->pull_io
.out
.num_names
;
540 io
->out
.names
= talloc_reference(mem_ctx
, state
->pull_io
.out
.names
);
/* Stages of a complete pull cycle. */
enum wreplsrv_pull_cycle_stage {
	WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY,
	WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES,
	WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC,
	WREPLSRV_PULL_CYCLE_STAGE_DONE
};
555 struct wreplsrv_pull_cycle_state
{
556 enum wreplsrv_pull_cycle_stage stage
;
557 struct composite_context
*c
;
558 struct wreplsrv_pull_cycle_io
*io
;
559 struct wreplsrv_pull_table_io table_io
;
561 struct wreplsrv_pull_names_io names_io
;
562 struct composite_context
*creq
;
563 struct wrepl_associate_stop assoc_stop_io
;
564 struct wrepl_request
*req
;
/* completion callbacks of the pull cycle state machine, defined below */
static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq);
static void wreplsrv_pull_cycle_handler_req(struct wrepl_request *req);
570 static NTSTATUS
wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state
*state
)
572 struct wreplsrv_owner
*current_owner
=NULL
;
573 struct wreplsrv_owner
*local_owner
;
575 uint64_t old_max_version
= 0;
576 bool do_pull
= false;
578 for (i
=state
->current
; i
< state
->table_io
.out
.num_owners
; i
++) {
579 current_owner
= wreplsrv_find_owner(state
->io
->in
.partner
->service
,
580 state
->io
->in
.partner
->pull
.table
,
581 state
->table_io
.out
.owners
[i
].address
);
583 local_owner
= wreplsrv_find_owner(state
->io
->in
.partner
->service
,
584 state
->io
->in
.partner
->service
->table
,
585 state
->table_io
.out
.owners
[i
].address
);
587 * this means we are ourself the current owner,
588 * and we don't want replicate ourself
590 if (!current_owner
) continue;
593 * this means we don't have any records of this owner
603 * this means the remote partner has some new records of this owner
606 if (current_owner
->owner
.max_version
> local_owner
->owner
.max_version
) {
608 old_max_version
= local_owner
->owner
.max_version
;
615 state
->names_io
.in
.partner
= state
->io
->in
.partner
;
616 state
->names_io
.in
.wreplconn
= state
->io
->in
.wreplconn
;
617 state
->names_io
.in
.owner
= current_owner
->owner
;
618 state
->names_io
.in
.owner
.min_version
= old_max_version
+ 1;
619 state
->creq
= wreplsrv_pull_names_send(state
, &state
->names_io
);
620 NT_STATUS_HAVE_NO_MEMORY(state
->creq
);
622 state
->creq
->async
.fn
= wreplsrv_pull_cycle_handler_creq
;
623 state
->creq
->async
.private_data
= state
;
625 return STATUS_MORE_ENTRIES
;
631 static NTSTATUS
wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state
*state
)
635 status
= wreplsrv_pull_cycle_next_owner_do_work(state
);
636 if (NT_STATUS_IS_OK(status
)) {
637 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_DONE
;
638 } else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES
, status
)) {
639 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES
;
640 status
= NT_STATUS_OK
;
643 if (state
->stage
== WREPLSRV_PULL_CYCLE_STAGE_DONE
&& state
->io
->in
.wreplconn
) {
644 state
->assoc_stop_io
.in
.assoc_ctx
= state
->io
->in
.wreplconn
->assoc_ctx
.peer_ctx
;
645 state
->assoc_stop_io
.in
.reason
= 0;
646 state
->req
= wrepl_associate_stop_send(state
->io
->in
.wreplconn
->sock
, &state
->assoc_stop_io
);
647 NT_STATUS_HAVE_NO_MEMORY(state
->req
);
649 state
->req
->async
.fn
= wreplsrv_pull_cycle_handler_req
;
650 state
->req
->async
.private = state
;
652 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC
;
658 static NTSTATUS
wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state
*state
)
663 status
= wreplsrv_pull_table_recv(state
->creq
, state
, &state
->table_io
);
664 NT_STATUS_NOT_OK_RETURN(status
);
666 /* update partner table */
667 for (i
=0; i
< state
->table_io
.out
.num_owners
; i
++) {
668 status
= wreplsrv_add_table(state
->io
->in
.partner
->service
,
669 state
->io
->in
.partner
,
670 &state
->io
->in
.partner
->pull
.table
,
671 state
->table_io
.out
.owners
[i
].address
,
672 state
->table_io
.out
.owners
[i
].max_version
);
673 NT_STATUS_NOT_OK_RETURN(status
);
676 status
= wreplsrv_pull_cycle_next_owner_wrapper(state
);
677 NT_STATUS_NOT_OK_RETURN(status
);
682 static NTSTATUS
wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state
*state
)
686 status
= wreplsrv_apply_records(state
->io
->in
.partner
,
687 &state
->names_io
.in
.owner
,
688 state
->names_io
.out
.num_names
,
689 state
->names_io
.out
.names
);
690 NT_STATUS_NOT_OK_RETURN(status
);
692 talloc_free(state
->names_io
.out
.names
);
693 ZERO_STRUCT(state
->names_io
);
698 static NTSTATUS
wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state
*state
)
702 status
= wreplsrv_pull_names_recv(state
->creq
, state
, &state
->names_io
);
703 NT_STATUS_NOT_OK_RETURN(status
);
706 * TODO: this should maybe an async call,
707 * because we may need some network access
708 * for conflict resolving
710 status
= wreplsrv_pull_cycle_apply_records(state
);
711 NT_STATUS_NOT_OK_RETURN(status
);
713 status
= wreplsrv_pull_cycle_next_owner_wrapper(state
);
714 NT_STATUS_NOT_OK_RETURN(status
);
719 static NTSTATUS
wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state
*state
)
723 status
= wrepl_associate_stop_recv(state
->req
, &state
->assoc_stop_io
);
724 NT_STATUS_NOT_OK_RETURN(status
);
726 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_DONE
;
731 static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state
*state
)
733 struct composite_context
*c
= state
->c
;
735 switch (state
->stage
) {
736 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY
:
737 c
->status
= wreplsrv_pull_cycle_wait_table_reply(state
);
739 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES
:
740 c
->status
= wreplsrv_pull_cycle_wait_send_replies(state
);
742 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC
:
743 c
->status
= wreplsrv_pull_cycle_wait_stop_assoc(state
);
745 case WREPLSRV_PULL_CYCLE_STAGE_DONE
:
746 c
->status
= NT_STATUS_INTERNAL_ERROR
;
749 if (state
->stage
== WREPLSRV_PULL_CYCLE_STAGE_DONE
) {
750 c
->state
= COMPOSITE_STATE_DONE
;
753 if (!NT_STATUS_IS_OK(c
->status
)) {
754 c
->state
= COMPOSITE_STATE_ERROR
;
757 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
762 static void wreplsrv_pull_cycle_handler_creq(struct composite_context
*creq
)
764 struct wreplsrv_pull_cycle_state
*state
= talloc_get_type(creq
->async
.private_data
,
765 struct wreplsrv_pull_cycle_state
);
766 wreplsrv_pull_cycle_handler(state
);
770 static void wreplsrv_pull_cycle_handler_req(struct wrepl_request
*req
)
772 struct wreplsrv_pull_cycle_state
*state
= talloc_get_type(req
->async
.private,
773 struct wreplsrv_pull_cycle_state
);
774 wreplsrv_pull_cycle_handler(state
);
778 struct composite_context
*wreplsrv_pull_cycle_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_pull_cycle_io
*io
)
780 struct composite_context
*c
= NULL
;
781 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
782 struct wreplsrv_pull_cycle_state
*state
= NULL
;
784 c
= talloc_zero(mem_ctx
, struct composite_context
);
787 state
= talloc_zero(c
, struct wreplsrv_pull_cycle_state
);
788 if (!state
) goto failed
;
792 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
793 c
->event_ctx
= service
->task
->event_ctx
;
794 c
->private_data
= state
;
796 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY
;
797 state
->table_io
.in
.partner
= io
->in
.partner
;
798 state
->table_io
.in
.num_owners
= io
->in
.num_owners
;
799 state
->table_io
.in
.owners
= io
->in
.owners
;
800 state
->creq
= wreplsrv_pull_table_send(state
, &state
->table_io
);
801 if (!state
->creq
) goto failed
;
803 state
->creq
->async
.fn
= wreplsrv_pull_cycle_handler_creq
;
804 state
->creq
->async
.private_data
= state
;
812 NTSTATUS
wreplsrv_pull_cycle_recv(struct composite_context
*c
)
816 status
= composite_wait(c
);
/* Stages of the push notification state machine. */
enum wreplsrv_push_notify_stage {
	WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT,
	WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM,
	WREPLSRV_PUSH_NOTIFY_STAGE_DONE
};
828 struct wreplsrv_push_notify_state
{
829 enum wreplsrv_push_notify_stage stage
;
830 struct composite_context
*c
;
831 struct wreplsrv_push_notify_io
*io
;
832 enum wrepl_replication_cmd command
;
834 struct wrepl_send_ctrl ctrl
;
835 struct wrepl_request
*req
;
836 struct wrepl_packet req_packet
;
837 struct wrepl_packet
*rep_packet
;
838 struct composite_context
*creq
;
839 struct wreplsrv_out_connection
*wreplconn
;
/* completion callbacks of the push notify state machine, defined below */
static void wreplsrv_push_notify_handler_creq(struct composite_context *creq);
static void wreplsrv_push_notify_handler_req(struct wrepl_request *req);
845 static NTSTATUS
wreplsrv_push_notify_update(struct wreplsrv_push_notify_state
*state
)
847 struct wreplsrv_service
*service
= state
->io
->in
.partner
->service
;
848 struct wrepl_packet
*req
= &state
->req_packet
;
849 struct wrepl_replication
*repl_out
= &state
->req_packet
.message
.replication
;
850 struct wrepl_table
*table_out
= &state
->req_packet
.message
.replication
.info
.table
;
851 struct wreplsrv_in_connection
*wrepl_in
;
853 struct socket_context
*sock
;
854 struct packet_context
*packet
;
857 /* prepare the outgoing request */
858 req
->opcode
= WREPL_OPCODE_BITS
;
859 req
->assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
860 req
->mess_type
= WREPL_REPLICATION
;
862 repl_out
->command
= state
->command
;
864 status
= wreplsrv_fill_wrepl_table(service
, state
, table_out
,
865 service
->wins_db
->local_owner
, state
->full_table
);
866 NT_STATUS_NOT_OK_RETURN(status
);
868 /* queue the request */
869 state
->req
= wrepl_request_send(state
->wreplconn
->sock
, req
, NULL
);
870 NT_STATUS_HAVE_NO_MEMORY(state
->req
);
873 * now we need to convert the wrepl_socket (client connection)
874 * into a wreplsrv_in_connection (server connection), because
875 * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
876 * message is received by the peer.
879 /* steal the socket_context */
880 sock
= state
->wreplconn
->sock
->sock
;
881 state
->wreplconn
->sock
->sock
= NULL
;
882 talloc_steal(state
, sock
);
885 * steal the packet_context
886 * note the request DATA_BLOB we just send on the
887 * wrepl_socket (client connection) is still unter the
888 * packet context and will be send to the wire
890 packet
= state
->wreplconn
->sock
->packet
;
891 state
->wreplconn
->sock
->packet
= NULL
;
892 talloc_steal(state
, packet
);
895 * get the fde_flags of the old fde event,
896 * so that we can later set the same flags to the new one
898 fde_flags
= event_get_fd_flags(state
->wreplconn
->sock
->event
.fde
);
901 * free the wrepl_socket (client connection)
903 talloc_free(state
->wreplconn
->sock
);
904 state
->wreplconn
->sock
= NULL
;
907 * now create a wreplsrv_in_connection,
908 * on which we act as server
910 * NOTE: sock and packet will be stolen by
911 * wreplsrv_in_connection_merge()
913 status
= wreplsrv_in_connection_merge(state
->io
->in
.partner
,
914 sock
, packet
, &wrepl_in
);
915 NT_STATUS_NOT_OK_RETURN(status
);
917 event_set_fd_flags(wrepl_in
->conn
->event
.fde
, fde_flags
);
919 wrepl_in
->assoc_ctx
.peer_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
920 wrepl_in
->assoc_ctx
.our_ctx
= 0;
922 /* now we can free the wreplsrv_out_connection */
923 talloc_free(state
->wreplconn
);
924 state
->wreplconn
= NULL
;
926 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_DONE
;
931 static NTSTATUS
wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state
*state
)
933 struct wreplsrv_service
*service
= state
->io
->in
.partner
->service
;
934 struct wrepl_packet
*req
= &state
->req_packet
;
935 struct wrepl_replication
*repl_out
= &state
->req_packet
.message
.replication
;
936 struct wrepl_table
*table_out
= &state
->req_packet
.message
.replication
.info
.table
;
939 req
->opcode
= WREPL_OPCODE_BITS
;
940 req
->assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
941 req
->mess_type
= WREPL_REPLICATION
;
943 repl_out
->command
= state
->command
;
945 status
= wreplsrv_fill_wrepl_table(service
, state
, table_out
,
946 service
->wins_db
->local_owner
, state
->full_table
);
947 NT_STATUS_NOT_OK_RETURN(status
);
949 /* we won't get a reply to a inform message */
950 state
->ctrl
.send_only
= true;
952 state
->req
= wrepl_request_send(state
->wreplconn
->sock
, req
, &state
->ctrl
);
953 NT_STATUS_HAVE_NO_MEMORY(state
->req
);
955 state
->req
->async
.fn
= wreplsrv_push_notify_handler_req
;
956 state
->req
->async
.private = state
;
958 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM
;
963 static NTSTATUS
wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state
*state
)
967 status
= wreplsrv_out_connect_recv(state
->creq
, state
, &state
->wreplconn
);
968 NT_STATUS_NOT_OK_RETURN(status
);
970 switch (state
->command
) {
971 case WREPL_REPL_UPDATE
:
972 state
->full_table
= true;
973 return wreplsrv_push_notify_update(state
);
974 case WREPL_REPL_UPDATE2
:
975 state
->full_table
= false;
976 return wreplsrv_push_notify_update(state
);
977 case WREPL_REPL_INFORM
:
978 state
->full_table
= true;
979 return wreplsrv_push_notify_inform(state
);
980 case WREPL_REPL_INFORM2
:
981 state
->full_table
= false;
982 return wreplsrv_push_notify_inform(state
);
984 return NT_STATUS_INTERNAL_ERROR
;
987 return NT_STATUS_INTERNAL_ERROR
;
990 static NTSTATUS
wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state
*state
)
994 status
= wrepl_request_recv(state
->req
, state
, NULL
);
995 NT_STATUS_NOT_OK_RETURN(status
);
997 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_DONE
;
1001 static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state
*state
)
1003 struct composite_context
*c
= state
->c
;
1005 switch (state
->stage
) {
1006 case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT
:
1007 c
->status
= wreplsrv_push_notify_wait_connect(state
);
1009 case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM
:
1010 c
->status
= wreplsrv_push_notify_wait_inform(state
);
1012 case WREPLSRV_PUSH_NOTIFY_STAGE_DONE
:
1013 c
->status
= NT_STATUS_INTERNAL_ERROR
;
1016 if (state
->stage
== WREPLSRV_PUSH_NOTIFY_STAGE_DONE
) {
1017 c
->state
= COMPOSITE_STATE_DONE
;
1020 if (!NT_STATUS_IS_OK(c
->status
)) {
1021 c
->state
= COMPOSITE_STATE_ERROR
;
1024 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
1029 static void wreplsrv_push_notify_handler_creq(struct composite_context
*creq
)
1031 struct wreplsrv_push_notify_state
*state
= talloc_get_type(creq
->async
.private_data
,
1032 struct wreplsrv_push_notify_state
);
1033 wreplsrv_push_notify_handler(state
);
1037 static void wreplsrv_push_notify_handler_req(struct wrepl_request
*req
)
1039 struct wreplsrv_push_notify_state
*state
= talloc_get_type(req
->async
.private,
1040 struct wreplsrv_push_notify_state
);
1041 wreplsrv_push_notify_handler(state
);
1045 struct composite_context
*wreplsrv_push_notify_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_push_notify_io
*io
)
1047 struct composite_context
*c
= NULL
;
1048 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
1049 struct wreplsrv_push_notify_state
*state
= NULL
;
1050 enum winsrepl_partner_type partner_type
;
1052 c
= talloc_zero(mem_ctx
, struct composite_context
);
1053 if (!c
) goto failed
;
1055 state
= talloc_zero(c
, struct wreplsrv_push_notify_state
);
1056 if (!state
) goto failed
;
1060 if (io
->in
.inform
) {
1061 /* we can cache the connection in partner->push->wreplconn */
1062 partner_type
= WINSREPL_PARTNER_PUSH
;
1063 if (io
->in
.propagate
) {
1064 state
->command
= WREPL_REPL_INFORM2
;
1066 state
->command
= WREPL_REPL_INFORM
;
1069 /* we can NOT cache the connection */
1070 partner_type
= WINSREPL_PARTNER_NONE
;
1071 if (io
->in
.propagate
) {
1072 state
->command
= WREPL_REPL_UPDATE2
;
1074 state
->command
= WREPL_REPL_UPDATE
;
1078 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
1079 c
->event_ctx
= service
->task
->event_ctx
;
1080 c
->private_data
= state
;
1082 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT
;
1083 state
->creq
= wreplsrv_out_connect_send(io
->in
.partner
, partner_type
, NULL
);
1084 if (!state
->creq
) goto failed
;
1086 state
->creq
->async
.fn
= wreplsrv_push_notify_handler_creq
;
1087 state
->creq
->async
.private_data
= state
;
1095 NTSTATUS
wreplsrv_push_notify_recv(struct composite_context
*c
)
1099 status
= composite_wait(c
);