2 Unix SMB/CIFS implementation.
4 WINS Replication server
6 Copyright (C) Stefan Metzmacher 2005
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 3 of the License, or
11 (at your option) any later version.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 #include "lib/events/events.h"
24 #include "lib/socket/socket.h"
25 #include "smbd/service_task.h"
26 #include "smbd/service_stream.h"
27 #include "librpc/gen_ndr/winsrepl.h"
28 #include "wrepl_server/wrepl_server.h"
29 #include "nbt_server/wins/winsdb.h"
30 #include "libcli/composite/composite.h"
31 #include "libcli/wrepl/winsrepl.h"
32 #include "libcli/resolve/resolve.h"
33 #include "param/param.h"
35 enum wreplsrv_out_connect_stage
{
36 WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET
,
37 WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX
,
38 WREPLSRV_OUT_CONNECT_STAGE_DONE
41 struct wreplsrv_out_connect_state
{
42 enum wreplsrv_out_connect_stage stage
;
43 struct composite_context
*c
;
44 struct wrepl_request
*req
;
45 struct composite_context
*c_req
;
46 struct wrepl_associate assoc_io
;
47 enum winsrepl_partner_type type
;
48 struct wreplsrv_out_connection
*wreplconn
;
51 static void wreplsrv_out_connect_handler_creq(struct composite_context
*c_req
);
52 static void wreplsrv_out_connect_handler_req(struct wrepl_request
*req
);
54 static NTSTATUS
wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state
*state
)
58 status
= wrepl_connect_recv(state
->c_req
);
59 NT_STATUS_NOT_OK_RETURN(status
);
61 state
->req
= wrepl_associate_send(state
->wreplconn
->sock
, &state
->assoc_io
);
62 NT_STATUS_HAVE_NO_MEMORY(state
->req
);
64 state
->req
->async
.fn
= wreplsrv_out_connect_handler_req
;
65 state
->req
->async
.private_data
= state
;
67 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX
;
72 static NTSTATUS
wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state
*state
)
76 status
= wrepl_associate_recv(state
->req
, &state
->assoc_io
);
77 NT_STATUS_NOT_OK_RETURN(status
);
79 state
->wreplconn
->assoc_ctx
.peer_ctx
= state
->assoc_io
.out
.assoc_ctx
;
80 state
->wreplconn
->assoc_ctx
.peer_major
= state
->assoc_io
.out
.major_version
;
82 if (state
->type
== WINSREPL_PARTNER_PUSH
) {
83 if (state
->wreplconn
->assoc_ctx
.peer_major
>= 5) {
84 state
->wreplconn
->partner
->push
.wreplconn
= state
->wreplconn
;
85 talloc_steal(state
->wreplconn
->partner
, state
->wreplconn
);
87 state
->type
= WINSREPL_PARTNER_NONE
;
89 } else if (state
->type
== WINSREPL_PARTNER_PULL
) {
90 state
->wreplconn
->partner
->pull
.wreplconn
= state
->wreplconn
;
91 talloc_steal(state
->wreplconn
->partner
, state
->wreplconn
);
94 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_DONE
;
99 static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state
*state
)
101 struct composite_context
*c
= state
->c
;
103 switch (state
->stage
) {
104 case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET
:
105 c
->status
= wreplsrv_out_connect_wait_socket(state
);
107 case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX
:
108 c
->status
= wreplsrv_out_connect_wait_assoc_ctx(state
);
109 c
->state
= COMPOSITE_STATE_DONE
;
111 case WREPLSRV_OUT_CONNECT_STAGE_DONE
:
112 c
->status
= NT_STATUS_INTERNAL_ERROR
;
115 if (!NT_STATUS_IS_OK(c
->status
)) {
116 c
->state
= COMPOSITE_STATE_ERROR
;
119 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
124 static void wreplsrv_out_connect_handler_creq(struct composite_context
*creq
)
126 struct wreplsrv_out_connect_state
*state
= talloc_get_type(creq
->async
.private_data
,
127 struct wreplsrv_out_connect_state
);
128 wreplsrv_out_connect_handler(state
);
132 static void wreplsrv_out_connect_handler_req(struct wrepl_request
*req
)
134 struct wreplsrv_out_connect_state
*state
= talloc_get_type(req
->async
.private_data
,
135 struct wreplsrv_out_connect_state
);
136 wreplsrv_out_connect_handler(state
);
140 static struct composite_context
*wreplsrv_out_connect_send(struct wreplsrv_partner
*partner
,
141 enum winsrepl_partner_type type
,
142 struct wreplsrv_out_connection
*wreplconn
)
144 struct composite_context
*c
= NULL
;
145 struct wreplsrv_service
*service
= partner
->service
;
146 struct wreplsrv_out_connect_state
*state
= NULL
;
147 struct wreplsrv_out_connection
**wreplconnp
= &wreplconn
;
148 bool cached_connection
= false;
150 c
= talloc_zero(partner
, struct composite_context
);
153 state
= talloc_zero(c
, struct wreplsrv_out_connect_state
);
154 if (!state
) goto failed
;
158 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
159 c
->event_ctx
= service
->task
->event_ctx
;
160 c
->private_data
= state
;
162 if (type
== WINSREPL_PARTNER_PUSH
) {
163 cached_connection
= true;
164 wreplconn
= partner
->push
.wreplconn
;
165 wreplconnp
= &partner
->push
.wreplconn
;
166 } else if (type
== WINSREPL_PARTNER_PULL
) {
167 cached_connection
= true;
168 wreplconn
= partner
->pull
.wreplconn
;
169 wreplconnp
= &partner
->pull
.wreplconn
;
172 /* we have a connection already, so use it */
174 if (!wreplconn
->sock
->dead
) {
175 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_DONE
;
176 state
->wreplconn
= wreplconn
;
179 } else if (!cached_connection
) {
180 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_DONE
;
181 state
->wreplconn
= NULL
;
185 talloc_free(wreplconn
);
190 wreplconn
= talloc_zero(state
, struct wreplsrv_out_connection
);
191 if (!wreplconn
) goto failed
;
193 wreplconn
->service
= service
;
194 wreplconn
->partner
= partner
;
195 wreplconn
->sock
= wrepl_socket_init(wreplconn
, service
->task
->event_ctx
, lp_iconv_convenience(service
->task
->lp_ctx
));
196 if (!wreplconn
->sock
) goto failed
;
198 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET
;
199 state
->wreplconn
= wreplconn
;
200 state
->c_req
= wrepl_connect_send(wreplconn
->sock
,
201 partner
->our_address
?partner
->our_address
:wrepl_best_ip(service
->task
->lp_ctx
, partner
->address
),
203 if (!state
->c_req
) goto failed
;
205 state
->c_req
->async
.fn
= wreplsrv_out_connect_handler_creq
;
206 state
->c_req
->async
.private_data
= state
;
214 static NTSTATUS
wreplsrv_out_connect_recv(struct composite_context
*c
, TALLOC_CTX
*mem_ctx
,
215 struct wreplsrv_out_connection
**wreplconn
)
219 status
= composite_wait(c
);
221 if (NT_STATUS_IS_OK(status
)) {
222 struct wreplsrv_out_connect_state
*state
= talloc_get_type(c
->private_data
,
223 struct wreplsrv_out_connect_state
);
224 if (state
->wreplconn
) {
225 *wreplconn
= talloc_reference(mem_ctx
, state
->wreplconn
);
226 if (!*wreplconn
) status
= NT_STATUS_NO_MEMORY
;
228 status
= NT_STATUS_INVALID_CONNECTION
;
237 struct wreplsrv_pull_table_io
{
239 struct wreplsrv_partner
*partner
;
241 struct wrepl_wins_owner
*owners
;
245 struct wrepl_wins_owner
*owners
;
249 enum wreplsrv_pull_table_stage
{
250 WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION
,
251 WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY
,
252 WREPLSRV_PULL_TABLE_STAGE_DONE
255 struct wreplsrv_pull_table_state
{
256 enum wreplsrv_pull_table_stage stage
;
257 struct composite_context
*c
;
258 struct wrepl_request
*req
;
259 struct wrepl_pull_table table_io
;
260 struct wreplsrv_pull_table_io
*io
;
261 struct composite_context
*creq
;
262 struct wreplsrv_out_connection
*wreplconn
;
265 static void wreplsrv_pull_table_handler_req(struct wrepl_request
*req
);
267 static NTSTATUS
wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state
*state
)
271 status
= wreplsrv_out_connect_recv(state
->creq
, state
, &state
->wreplconn
);
272 NT_STATUS_NOT_OK_RETURN(status
);
274 state
->table_io
.in
.assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
275 state
->req
= wrepl_pull_table_send(state
->wreplconn
->sock
, &state
->table_io
);
276 NT_STATUS_HAVE_NO_MEMORY(state
->req
);
278 state
->req
->async
.fn
= wreplsrv_pull_table_handler_req
;
279 state
->req
->async
.private_data
= state
;
281 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY
;
286 static NTSTATUS
wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state
*state
)
290 status
= wrepl_pull_table_recv(state
->req
, state
, &state
->table_io
);
291 NT_STATUS_NOT_OK_RETURN(status
);
293 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_DONE
;
298 static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state
*state
)
300 struct composite_context
*c
= state
->c
;
302 switch (state
->stage
) {
303 case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION
:
304 c
->status
= wreplsrv_pull_table_wait_connection(state
);
306 case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY
:
307 c
->status
= wreplsrv_pull_table_wait_table_reply(state
);
308 c
->state
= COMPOSITE_STATE_DONE
;
310 case WREPLSRV_PULL_TABLE_STAGE_DONE
:
311 c
->status
= NT_STATUS_INTERNAL_ERROR
;
314 if (!NT_STATUS_IS_OK(c
->status
)) {
315 c
->state
= COMPOSITE_STATE_ERROR
;
318 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
323 static void wreplsrv_pull_table_handler_creq(struct composite_context
*creq
)
325 struct wreplsrv_pull_table_state
*state
= talloc_get_type(creq
->async
.private_data
,
326 struct wreplsrv_pull_table_state
);
327 wreplsrv_pull_table_handler(state
);
331 static void wreplsrv_pull_table_handler_req(struct wrepl_request
*req
)
333 struct wreplsrv_pull_table_state
*state
= talloc_get_type(req
->async
.private_data
,
334 struct wreplsrv_pull_table_state
);
335 wreplsrv_pull_table_handler(state
);
339 static struct composite_context
*wreplsrv_pull_table_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_pull_table_io
*io
)
341 struct composite_context
*c
= NULL
;
342 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
343 struct wreplsrv_pull_table_state
*state
= NULL
;
345 c
= talloc_zero(mem_ctx
, struct composite_context
);
348 state
= talloc_zero(c
, struct wreplsrv_pull_table_state
);
349 if (!state
) goto failed
;
353 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
354 c
->event_ctx
= service
->task
->event_ctx
;
355 c
->private_data
= state
;
357 if (io
->in
.num_owners
) {
358 state
->table_io
.out
.num_partners
= io
->in
.num_owners
;
359 state
->table_io
.out
.partners
= io
->in
.owners
;
360 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_DONE
;
365 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION
;
366 state
->creq
= wreplsrv_out_connect_send(io
->in
.partner
, WINSREPL_PARTNER_PULL
, NULL
);
367 if (!state
->creq
) goto failed
;
369 state
->creq
->async
.fn
= wreplsrv_pull_table_handler_creq
;
370 state
->creq
->async
.private_data
= state
;
378 static NTSTATUS
wreplsrv_pull_table_recv(struct composite_context
*c
, TALLOC_CTX
*mem_ctx
,
379 struct wreplsrv_pull_table_io
*io
)
383 status
= composite_wait(c
);
385 if (NT_STATUS_IS_OK(status
)) {
386 struct wreplsrv_pull_table_state
*state
= talloc_get_type(c
->private_data
,
387 struct wreplsrv_pull_table_state
);
388 io
->out
.num_owners
= state
->table_io
.out
.num_partners
;
389 io
->out
.owners
= talloc_reference(mem_ctx
, state
->table_io
.out
.partners
);
396 struct wreplsrv_pull_names_io
{
398 struct wreplsrv_partner
*partner
;
399 struct wreplsrv_out_connection
*wreplconn
;
400 struct wrepl_wins_owner owner
;
404 struct wrepl_name
*names
;
408 enum wreplsrv_pull_names_stage
{
409 WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION
,
410 WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY
,
411 WREPLSRV_PULL_NAMES_STAGE_DONE
414 struct wreplsrv_pull_names_state
{
415 enum wreplsrv_pull_names_stage stage
;
416 struct composite_context
*c
;
417 struct wrepl_request
*req
;
418 struct wrepl_pull_names pull_io
;
419 struct wreplsrv_pull_names_io
*io
;
420 struct composite_context
*creq
;
421 struct wreplsrv_out_connection
*wreplconn
;
424 static void wreplsrv_pull_names_handler_req(struct wrepl_request
*req
);
426 static NTSTATUS
wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state
*state
)
430 status
= wreplsrv_out_connect_recv(state
->creq
, state
, &state
->wreplconn
);
431 NT_STATUS_NOT_OK_RETURN(status
);
433 state
->pull_io
.in
.assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
434 state
->pull_io
.in
.partner
= state
->io
->in
.owner
;
435 state
->req
= wrepl_pull_names_send(state
->wreplconn
->sock
, &state
->pull_io
);
436 NT_STATUS_HAVE_NO_MEMORY(state
->req
);
438 state
->req
->async
.fn
= wreplsrv_pull_names_handler_req
;
439 state
->req
->async
.private_data
= state
;
441 state
->stage
= WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY
;
446 static NTSTATUS
wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state
*state
)
450 status
= wrepl_pull_names_recv(state
->req
, state
, &state
->pull_io
);
451 NT_STATUS_NOT_OK_RETURN(status
);
453 state
->stage
= WREPLSRV_PULL_NAMES_STAGE_DONE
;
458 static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state
*state
)
460 struct composite_context
*c
= state
->c
;
462 switch (state
->stage
) {
463 case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION
:
464 c
->status
= wreplsrv_pull_names_wait_connection(state
);
466 case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY
:
467 c
->status
= wreplsrv_pull_names_wait_send_reply(state
);
468 c
->state
= COMPOSITE_STATE_DONE
;
470 case WREPLSRV_PULL_NAMES_STAGE_DONE
:
471 c
->status
= NT_STATUS_INTERNAL_ERROR
;
474 if (!NT_STATUS_IS_OK(c
->status
)) {
475 c
->state
= COMPOSITE_STATE_ERROR
;
478 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
483 static void wreplsrv_pull_names_handler_creq(struct composite_context
*creq
)
485 struct wreplsrv_pull_names_state
*state
= talloc_get_type(creq
->async
.private_data
,
486 struct wreplsrv_pull_names_state
);
487 wreplsrv_pull_names_handler(state
);
491 static void wreplsrv_pull_names_handler_req(struct wrepl_request
*req
)
493 struct wreplsrv_pull_names_state
*state
= talloc_get_type(req
->async
.private_data
,
494 struct wreplsrv_pull_names_state
);
495 wreplsrv_pull_names_handler(state
);
499 static struct composite_context
*wreplsrv_pull_names_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_pull_names_io
*io
)
501 struct composite_context
*c
= NULL
;
502 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
503 struct wreplsrv_pull_names_state
*state
= NULL
;
504 enum winsrepl_partner_type partner_type
= WINSREPL_PARTNER_PULL
;
506 if (io
->in
.wreplconn
) partner_type
= WINSREPL_PARTNER_NONE
;
508 c
= talloc_zero(mem_ctx
, struct composite_context
);
511 state
= talloc_zero(c
, struct wreplsrv_pull_names_state
);
512 if (!state
) goto failed
;
516 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
517 c
->event_ctx
= service
->task
->event_ctx
;
518 c
->private_data
= state
;
520 state
->stage
= WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION
;
521 state
->creq
= wreplsrv_out_connect_send(io
->in
.partner
, partner_type
, io
->in
.wreplconn
);
522 if (!state
->creq
) goto failed
;
524 state
->creq
->async
.fn
= wreplsrv_pull_names_handler_creq
;
525 state
->creq
->async
.private_data
= state
;
533 static NTSTATUS
wreplsrv_pull_names_recv(struct composite_context
*c
, TALLOC_CTX
*mem_ctx
,
534 struct wreplsrv_pull_names_io
*io
)
538 status
= composite_wait(c
);
540 if (NT_STATUS_IS_OK(status
)) {
541 struct wreplsrv_pull_names_state
*state
= talloc_get_type(c
->private_data
,
542 struct wreplsrv_pull_names_state
);
543 io
->out
.num_names
= state
->pull_io
.out
.num_names
;
544 io
->out
.names
= talloc_reference(mem_ctx
, state
->pull_io
.out
.names
);
552 enum wreplsrv_pull_cycle_stage
{
553 WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY
,
554 WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES
,
555 WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC
,
556 WREPLSRV_PULL_CYCLE_STAGE_DONE
559 struct wreplsrv_pull_cycle_state
{
560 enum wreplsrv_pull_cycle_stage stage
;
561 struct composite_context
*c
;
562 struct wreplsrv_pull_cycle_io
*io
;
563 struct wreplsrv_pull_table_io table_io
;
565 struct wreplsrv_pull_names_io names_io
;
566 struct composite_context
*creq
;
567 struct wrepl_associate_stop assoc_stop_io
;
568 struct wrepl_request
*req
;
571 static void wreplsrv_pull_cycle_handler_creq(struct composite_context
*creq
);
572 static void wreplsrv_pull_cycle_handler_req(struct wrepl_request
*req
);
574 static NTSTATUS
wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state
*state
)
576 struct wreplsrv_owner
*current_owner
=NULL
;
577 struct wreplsrv_owner
*local_owner
;
579 uint64_t old_max_version
= 0;
580 bool do_pull
= false;
582 for (i
=state
->current
; i
< state
->table_io
.out
.num_owners
; i
++) {
583 current_owner
= wreplsrv_find_owner(state
->io
->in
.partner
->service
,
584 state
->io
->in
.partner
->pull
.table
,
585 state
->table_io
.out
.owners
[i
].address
);
587 local_owner
= wreplsrv_find_owner(state
->io
->in
.partner
->service
,
588 state
->io
->in
.partner
->service
->table
,
589 state
->table_io
.out
.owners
[i
].address
);
591 * this means we are ourself the current owner,
592 * and we don't want replicate ourself
594 if (!current_owner
) continue;
597 * this means we don't have any records of this owner
607 * this means the remote partner has some new records of this owner
610 if (current_owner
->owner
.max_version
> local_owner
->owner
.max_version
) {
612 old_max_version
= local_owner
->owner
.max_version
;
619 state
->names_io
.in
.partner
= state
->io
->in
.partner
;
620 state
->names_io
.in
.wreplconn
= state
->io
->in
.wreplconn
;
621 state
->names_io
.in
.owner
= current_owner
->owner
;
622 state
->names_io
.in
.owner
.min_version
= old_max_version
+ 1;
623 state
->creq
= wreplsrv_pull_names_send(state
, &state
->names_io
);
624 NT_STATUS_HAVE_NO_MEMORY(state
->creq
);
626 state
->creq
->async
.fn
= wreplsrv_pull_cycle_handler_creq
;
627 state
->creq
->async
.private_data
= state
;
629 return STATUS_MORE_ENTRIES
;
635 static NTSTATUS
wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state
*state
)
639 status
= wreplsrv_pull_cycle_next_owner_do_work(state
);
640 if (NT_STATUS_IS_OK(status
)) {
641 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_DONE
;
642 } else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES
, status
)) {
643 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES
;
644 status
= NT_STATUS_OK
;
647 if (state
->stage
== WREPLSRV_PULL_CYCLE_STAGE_DONE
&& state
->io
->in
.wreplconn
) {
648 state
->assoc_stop_io
.in
.assoc_ctx
= state
->io
->in
.wreplconn
->assoc_ctx
.peer_ctx
;
649 state
->assoc_stop_io
.in
.reason
= 0;
650 state
->req
= wrepl_associate_stop_send(state
->io
->in
.wreplconn
->sock
, &state
->assoc_stop_io
);
651 NT_STATUS_HAVE_NO_MEMORY(state
->req
);
653 state
->req
->async
.fn
= wreplsrv_pull_cycle_handler_req
;
654 state
->req
->async
.private_data
= state
;
656 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC
;
662 static NTSTATUS
wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state
*state
)
667 status
= wreplsrv_pull_table_recv(state
->creq
, state
, &state
->table_io
);
668 NT_STATUS_NOT_OK_RETURN(status
);
670 /* update partner table */
671 for (i
=0; i
< state
->table_io
.out
.num_owners
; i
++) {
672 status
= wreplsrv_add_table(state
->io
->in
.partner
->service
,
673 state
->io
->in
.partner
,
674 &state
->io
->in
.partner
->pull
.table
,
675 state
->table_io
.out
.owners
[i
].address
,
676 state
->table_io
.out
.owners
[i
].max_version
);
677 NT_STATUS_NOT_OK_RETURN(status
);
680 status
= wreplsrv_pull_cycle_next_owner_wrapper(state
);
681 NT_STATUS_NOT_OK_RETURN(status
);
686 static NTSTATUS
wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state
*state
)
690 status
= wreplsrv_apply_records(state
->io
->in
.partner
,
691 &state
->names_io
.in
.owner
,
692 state
->names_io
.out
.num_names
,
693 state
->names_io
.out
.names
);
694 NT_STATUS_NOT_OK_RETURN(status
);
696 talloc_free(state
->names_io
.out
.names
);
697 ZERO_STRUCT(state
->names_io
);
702 static NTSTATUS
wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state
*state
)
706 status
= wreplsrv_pull_names_recv(state
->creq
, state
, &state
->names_io
);
707 NT_STATUS_NOT_OK_RETURN(status
);
710 * TODO: this should maybe an async call,
711 * because we may need some network access
712 * for conflict resolving
714 status
= wreplsrv_pull_cycle_apply_records(state
);
715 NT_STATUS_NOT_OK_RETURN(status
);
717 status
= wreplsrv_pull_cycle_next_owner_wrapper(state
);
718 NT_STATUS_NOT_OK_RETURN(status
);
723 static NTSTATUS
wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state
*state
)
727 status
= wrepl_associate_stop_recv(state
->req
, &state
->assoc_stop_io
);
728 NT_STATUS_NOT_OK_RETURN(status
);
730 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_DONE
;
735 static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state
*state
)
737 struct composite_context
*c
= state
->c
;
739 switch (state
->stage
) {
740 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY
:
741 c
->status
= wreplsrv_pull_cycle_wait_table_reply(state
);
743 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES
:
744 c
->status
= wreplsrv_pull_cycle_wait_send_replies(state
);
746 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC
:
747 c
->status
= wreplsrv_pull_cycle_wait_stop_assoc(state
);
749 case WREPLSRV_PULL_CYCLE_STAGE_DONE
:
750 c
->status
= NT_STATUS_INTERNAL_ERROR
;
753 if (state
->stage
== WREPLSRV_PULL_CYCLE_STAGE_DONE
) {
754 c
->state
= COMPOSITE_STATE_DONE
;
757 if (!NT_STATUS_IS_OK(c
->status
)) {
758 c
->state
= COMPOSITE_STATE_ERROR
;
761 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
766 static void wreplsrv_pull_cycle_handler_creq(struct composite_context
*creq
)
768 struct wreplsrv_pull_cycle_state
*state
= talloc_get_type(creq
->async
.private_data
,
769 struct wreplsrv_pull_cycle_state
);
770 wreplsrv_pull_cycle_handler(state
);
774 static void wreplsrv_pull_cycle_handler_req(struct wrepl_request
*req
)
776 struct wreplsrv_pull_cycle_state
*state
= talloc_get_type(req
->async
.private_data
,
777 struct wreplsrv_pull_cycle_state
);
778 wreplsrv_pull_cycle_handler(state
);
782 struct composite_context
*wreplsrv_pull_cycle_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_pull_cycle_io
*io
)
784 struct composite_context
*c
= NULL
;
785 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
786 struct wreplsrv_pull_cycle_state
*state
= NULL
;
788 c
= talloc_zero(mem_ctx
, struct composite_context
);
791 state
= talloc_zero(c
, struct wreplsrv_pull_cycle_state
);
792 if (!state
) goto failed
;
796 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
797 c
->event_ctx
= service
->task
->event_ctx
;
798 c
->private_data
= state
;
800 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY
;
801 state
->table_io
.in
.partner
= io
->in
.partner
;
802 state
->table_io
.in
.num_owners
= io
->in
.num_owners
;
803 state
->table_io
.in
.owners
= io
->in
.owners
;
804 state
->creq
= wreplsrv_pull_table_send(state
, &state
->table_io
);
805 if (!state
->creq
) goto failed
;
807 state
->creq
->async
.fn
= wreplsrv_pull_cycle_handler_creq
;
808 state
->creq
->async
.private_data
= state
;
816 NTSTATUS
wreplsrv_pull_cycle_recv(struct composite_context
*c
)
820 status
= composite_wait(c
);
826 enum wreplsrv_push_notify_stage
{
827 WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT
,
828 WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM
,
829 WREPLSRV_PUSH_NOTIFY_STAGE_DONE
832 struct wreplsrv_push_notify_state
{
833 enum wreplsrv_push_notify_stage stage
;
834 struct composite_context
*c
;
835 struct wreplsrv_push_notify_io
*io
;
836 enum wrepl_replication_cmd command
;
838 struct wrepl_send_ctrl ctrl
;
839 struct wrepl_request
*req
;
840 struct wrepl_packet req_packet
;
841 struct wrepl_packet
*rep_packet
;
842 struct composite_context
*creq
;
843 struct wreplsrv_out_connection
*wreplconn
;
846 static void wreplsrv_push_notify_handler_creq(struct composite_context
*creq
);
847 static void wreplsrv_push_notify_handler_req(struct wrepl_request
*req
);
849 static NTSTATUS
wreplsrv_push_notify_update(struct wreplsrv_push_notify_state
*state
)
851 struct wreplsrv_service
*service
= state
->io
->in
.partner
->service
;
852 struct wrepl_packet
*req
= &state
->req_packet
;
853 struct wrepl_replication
*repl_out
= &state
->req_packet
.message
.replication
;
854 struct wrepl_table
*table_out
= &state
->req_packet
.message
.replication
.info
.table
;
855 struct wreplsrv_in_connection
*wrepl_in
;
857 struct socket_context
*sock
;
858 struct packet_context
*packet
;
861 /* prepare the outgoing request */
862 req
->opcode
= WREPL_OPCODE_BITS
;
863 req
->assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
864 req
->mess_type
= WREPL_REPLICATION
;
866 repl_out
->command
= state
->command
;
868 status
= wreplsrv_fill_wrepl_table(service
, state
, table_out
,
869 service
->wins_db
->local_owner
, state
->full_table
);
870 NT_STATUS_NOT_OK_RETURN(status
);
872 /* queue the request */
873 state
->req
= wrepl_request_send(state
->wreplconn
->sock
, req
, NULL
);
874 NT_STATUS_HAVE_NO_MEMORY(state
->req
);
877 * now we need to convert the wrepl_socket (client connection)
878 * into a wreplsrv_in_connection (server connection), because
879 * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
880 * message is received by the peer.
883 /* steal the socket_context */
884 sock
= state
->wreplconn
->sock
->sock
;
885 state
->wreplconn
->sock
->sock
= NULL
;
886 talloc_steal(state
, sock
);
889 * steal the packet_context
890 * note the request DATA_BLOB we just send on the
891 * wrepl_socket (client connection) is still unter the
892 * packet context and will be send to the wire
894 packet
= state
->wreplconn
->sock
->packet
;
895 state
->wreplconn
->sock
->packet
= NULL
;
896 talloc_steal(state
, packet
);
899 * get the fde_flags of the old fde event,
900 * so that we can later set the same flags to the new one
902 fde_flags
= event_get_fd_flags(state
->wreplconn
->sock
->event
.fde
);
905 * free the wrepl_socket (client connection)
907 talloc_free(state
->wreplconn
->sock
);
908 state
->wreplconn
->sock
= NULL
;
911 * now create a wreplsrv_in_connection,
912 * on which we act as server
914 * NOTE: sock and packet will be stolen by
915 * wreplsrv_in_connection_merge()
917 status
= wreplsrv_in_connection_merge(state
->io
->in
.partner
,
918 sock
, packet
, &wrepl_in
);
919 NT_STATUS_NOT_OK_RETURN(status
);
921 event_set_fd_flags(wrepl_in
->conn
->event
.fde
, fde_flags
);
923 wrepl_in
->assoc_ctx
.peer_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
924 wrepl_in
->assoc_ctx
.our_ctx
= 0;
926 /* now we can free the wreplsrv_out_connection */
927 talloc_free(state
->wreplconn
);
928 state
->wreplconn
= NULL
;
930 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_DONE
;
935 static NTSTATUS
wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state
*state
)
937 struct wreplsrv_service
*service
= state
->io
->in
.partner
->service
;
938 struct wrepl_packet
*req
= &state
->req_packet
;
939 struct wrepl_replication
*repl_out
= &state
->req_packet
.message
.replication
;
940 struct wrepl_table
*table_out
= &state
->req_packet
.message
.replication
.info
.table
;
943 req
->opcode
= WREPL_OPCODE_BITS
;
944 req
->assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
945 req
->mess_type
= WREPL_REPLICATION
;
947 repl_out
->command
= state
->command
;
949 status
= wreplsrv_fill_wrepl_table(service
, state
, table_out
,
950 service
->wins_db
->local_owner
, state
->full_table
);
951 NT_STATUS_NOT_OK_RETURN(status
);
953 /* we won't get a reply to a inform message */
954 state
->ctrl
.send_only
= true;
956 state
->req
= wrepl_request_send(state
->wreplconn
->sock
, req
, &state
->ctrl
);
957 NT_STATUS_HAVE_NO_MEMORY(state
->req
);
959 state
->req
->async
.fn
= wreplsrv_push_notify_handler_req
;
960 state
->req
->async
.private_data
= state
;
962 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM
;
967 static NTSTATUS
wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state
*state
)
971 status
= wreplsrv_out_connect_recv(state
->creq
, state
, &state
->wreplconn
);
972 NT_STATUS_NOT_OK_RETURN(status
);
974 /* is the peer doesn't support inform fallback to update */
975 switch (state
->command
) {
976 case WREPL_REPL_INFORM
:
977 if (state
->wreplconn
->assoc_ctx
.peer_major
< 5) {
978 state
->command
= WREPL_REPL_UPDATE
;
981 case WREPL_REPL_INFORM2
:
982 if (state
->wreplconn
->assoc_ctx
.peer_major
< 5) {
983 state
->command
= WREPL_REPL_UPDATE2
;
990 switch (state
->command
) {
991 case WREPL_REPL_UPDATE
:
992 state
->full_table
= true;
993 return wreplsrv_push_notify_update(state
);
994 case WREPL_REPL_UPDATE2
:
995 state
->full_table
= false;
996 return wreplsrv_push_notify_update(state
);
997 case WREPL_REPL_INFORM
:
998 state
->full_table
= true;
999 return wreplsrv_push_notify_inform(state
);
1000 case WREPL_REPL_INFORM2
:
1001 state
->full_table
= false;
1002 return wreplsrv_push_notify_inform(state
);
1004 return NT_STATUS_INTERNAL_ERROR
;
1007 return NT_STATUS_INTERNAL_ERROR
;
1010 static NTSTATUS
wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state
*state
)
1014 status
= wrepl_request_recv(state
->req
, state
, NULL
);
1015 NT_STATUS_NOT_OK_RETURN(status
);
1017 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_DONE
;
1021 static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state
*state
)
1023 struct composite_context
*c
= state
->c
;
1025 switch (state
->stage
) {
1026 case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT
:
1027 c
->status
= wreplsrv_push_notify_wait_connect(state
);
1029 case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM
:
1030 c
->status
= wreplsrv_push_notify_wait_inform(state
);
1032 case WREPLSRV_PUSH_NOTIFY_STAGE_DONE
:
1033 c
->status
= NT_STATUS_INTERNAL_ERROR
;
1036 if (state
->stage
== WREPLSRV_PUSH_NOTIFY_STAGE_DONE
) {
1037 c
->state
= COMPOSITE_STATE_DONE
;
1040 if (!NT_STATUS_IS_OK(c
->status
)) {
1041 c
->state
= COMPOSITE_STATE_ERROR
;
1044 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
1049 static void wreplsrv_push_notify_handler_creq(struct composite_context
*creq
)
1051 struct wreplsrv_push_notify_state
*state
= talloc_get_type(creq
->async
.private_data
,
1052 struct wreplsrv_push_notify_state
);
1053 wreplsrv_push_notify_handler(state
);
1057 static void wreplsrv_push_notify_handler_req(struct wrepl_request
*req
)
1059 struct wreplsrv_push_notify_state
*state
= talloc_get_type(req
->async
.private_data
,
1060 struct wreplsrv_push_notify_state
);
1061 wreplsrv_push_notify_handler(state
);
1065 struct composite_context
*wreplsrv_push_notify_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_push_notify_io
*io
)
1067 struct composite_context
*c
= NULL
;
1068 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
1069 struct wreplsrv_push_notify_state
*state
= NULL
;
1070 enum winsrepl_partner_type partner_type
;
1072 c
= talloc_zero(mem_ctx
, struct composite_context
);
1073 if (!c
) goto failed
;
1075 state
= talloc_zero(c
, struct wreplsrv_push_notify_state
);
1076 if (!state
) goto failed
;
1080 if (io
->in
.inform
) {
1081 /* we can cache the connection in partner->push->wreplconn */
1082 partner_type
= WINSREPL_PARTNER_PUSH
;
1083 if (io
->in
.propagate
) {
1084 state
->command
= WREPL_REPL_INFORM2
;
1086 state
->command
= WREPL_REPL_INFORM
;
1089 /* we can NOT cache the connection */
1090 partner_type
= WINSREPL_PARTNER_NONE
;
1091 if (io
->in
.propagate
) {
1092 state
->command
= WREPL_REPL_UPDATE2
;
1094 state
->command
= WREPL_REPL_UPDATE
;
1098 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
1099 c
->event_ctx
= service
->task
->event_ctx
;
1100 c
->private_data
= state
;
1102 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT
;
1103 state
->creq
= wreplsrv_out_connect_send(io
->in
.partner
, partner_type
, NULL
);
1104 if (!state
->creq
) goto failed
;
1106 state
->creq
->async
.fn
= wreplsrv_push_notify_handler_creq
;
1107 state
->creq
->async
.private_data
= state
;
1115 NTSTATUS
wreplsrv_push_notify_recv(struct composite_context
*c
)
1119 status
= composite_wait(c
);