2 Unix SMB/CIFS implementation.
4 WINS Replication server
6 Copyright (C) Stefan Metzmacher 2005
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 3 of the License, or
11 (at your option) any later version.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 #include "lib/events/events.h"
24 #include "lib/socket/socket.h"
25 #include "smbd/service_task.h"
26 #include "smbd/service_stream.h"
27 #include "librpc/gen_ndr/winsrepl.h"
28 #include "wrepl_server/wrepl_server.h"
29 #include "nbt_server/wins/winsdb.h"
30 #include "libcli/composite/composite.h"
31 #include "libcli/wrepl/winsrepl.h"
32 #include "libcli/resolve/resolve.h"
33 #include "param/param.h"
35 enum wreplsrv_out_connect_stage
{
36 WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET
,
37 WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX
,
38 WREPLSRV_OUT_CONNECT_STAGE_DONE
41 struct wreplsrv_out_connect_state
{
42 enum wreplsrv_out_connect_stage stage
;
43 struct composite_context
*c
;
44 struct wrepl_request
*req
;
45 struct composite_context
*c_req
;
46 struct wrepl_associate assoc_io
;
47 enum winsrepl_partner_type type
;
48 struct wreplsrv_out_connection
*wreplconn
;
51 static void wreplsrv_out_connect_handler_creq(struct composite_context
*c_req
);
52 static void wreplsrv_out_connect_handler_req(struct wrepl_request
*req
);
54 static NTSTATUS
wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state
*state
)
58 status
= wrepl_connect_recv(state
->c_req
);
59 NT_STATUS_NOT_OK_RETURN(status
);
61 state
->req
= wrepl_associate_send(state
->wreplconn
->sock
, &state
->assoc_io
);
62 NT_STATUS_HAVE_NO_MEMORY(state
->req
);
64 state
->req
->async
.fn
= wreplsrv_out_connect_handler_req
;
65 state
->req
->async
.private_data
= state
;
67 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX
;
72 static NTSTATUS
wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state
*state
)
76 status
= wrepl_associate_recv(state
->req
, &state
->assoc_io
);
77 NT_STATUS_NOT_OK_RETURN(status
);
79 state
->wreplconn
->assoc_ctx
.peer_ctx
= state
->assoc_io
.out
.assoc_ctx
;
80 state
->wreplconn
->assoc_ctx
.peer_major
= state
->assoc_io
.out
.major_version
;
82 if (state
->type
== WINSREPL_PARTNER_PUSH
) {
83 if (state
->wreplconn
->assoc_ctx
.peer_major
>= 5) {
84 state
->wreplconn
->partner
->push
.wreplconn
= state
->wreplconn
;
85 talloc_steal(state
->wreplconn
->partner
, state
->wreplconn
);
87 state
->type
= WINSREPL_PARTNER_NONE
;
89 } else if (state
->type
== WINSREPL_PARTNER_PULL
) {
90 state
->wreplconn
->partner
->pull
.wreplconn
= state
->wreplconn
;
91 talloc_steal(state
->wreplconn
->partner
, state
->wreplconn
);
94 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_DONE
;
99 static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state
*state
)
101 struct composite_context
*c
= state
->c
;
103 switch (state
->stage
) {
104 case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET
:
105 c
->status
= wreplsrv_out_connect_wait_socket(state
);
107 case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX
:
108 c
->status
= wreplsrv_out_connect_wait_assoc_ctx(state
);
109 c
->state
= COMPOSITE_STATE_DONE
;
111 case WREPLSRV_OUT_CONNECT_STAGE_DONE
:
112 c
->status
= NT_STATUS_INTERNAL_ERROR
;
115 if (!NT_STATUS_IS_OK(c
->status
)) {
116 c
->state
= COMPOSITE_STATE_ERROR
;
119 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
124 static void wreplsrv_out_connect_handler_creq(struct composite_context
*creq
)
126 struct wreplsrv_out_connect_state
*state
= talloc_get_type(creq
->async
.private_data
,
127 struct wreplsrv_out_connect_state
);
128 wreplsrv_out_connect_handler(state
);
132 static void wreplsrv_out_connect_handler_req(struct wrepl_request
*req
)
134 struct wreplsrv_out_connect_state
*state
= talloc_get_type(req
->async
.private_data
,
135 struct wreplsrv_out_connect_state
);
136 wreplsrv_out_connect_handler(state
);
140 static struct composite_context
*wreplsrv_out_connect_send(struct wreplsrv_partner
*partner
,
141 enum winsrepl_partner_type type
,
142 struct wreplsrv_out_connection
*wreplconn
)
144 struct composite_context
*c
= NULL
;
145 struct wreplsrv_service
*service
= partner
->service
;
146 struct wreplsrv_out_connect_state
*state
= NULL
;
147 struct wreplsrv_out_connection
**wreplconnp
= &wreplconn
;
148 bool cached_connection
= false;
150 c
= talloc_zero(partner
, struct composite_context
);
153 state
= talloc_zero(c
, struct wreplsrv_out_connect_state
);
154 if (!state
) goto failed
;
158 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
159 c
->event_ctx
= service
->task
->event_ctx
;
160 c
->private_data
= state
;
162 if (type
== WINSREPL_PARTNER_PUSH
) {
163 cached_connection
= true;
164 wreplconn
= partner
->push
.wreplconn
;
165 wreplconnp
= &partner
->push
.wreplconn
;
166 } else if (type
== WINSREPL_PARTNER_PULL
) {
167 cached_connection
= true;
168 wreplconn
= partner
->pull
.wreplconn
;
169 wreplconnp
= &partner
->pull
.wreplconn
;
172 /* we have a connection already, so use it */
174 if (!wreplconn
->sock
->dead
) {
175 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_DONE
;
176 state
->wreplconn
= wreplconn
;
179 } else if (!cached_connection
) {
180 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_DONE
;
181 state
->wreplconn
= NULL
;
185 talloc_free(wreplconn
);
190 wreplconn
= talloc_zero(state
, struct wreplsrv_out_connection
);
191 if (!wreplconn
) goto failed
;
193 wreplconn
->service
= service
;
194 wreplconn
->partner
= partner
;
195 wreplconn
->sock
= wrepl_socket_init(wreplconn
, service
->task
->event_ctx
, lp_iconv_convenience(service
->task
->lp_ctx
));
196 if (!wreplconn
->sock
) goto failed
;
198 state
->stage
= WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET
;
199 state
->wreplconn
= wreplconn
;
200 state
->c_req
= wrepl_connect_send(wreplconn
->sock
,
201 partner
->our_address
?partner
->our_address
:wrepl_best_ip(service
->task
->lp_ctx
, partner
->address
),
203 if (!state
->c_req
) goto failed
;
205 state
->c_req
->async
.fn
= wreplsrv_out_connect_handler_creq
;
206 state
->c_req
->async
.private_data
= state
;
214 static NTSTATUS
wreplsrv_out_connect_recv(struct composite_context
*c
, TALLOC_CTX
*mem_ctx
,
215 struct wreplsrv_out_connection
**wreplconn
)
219 status
= composite_wait(c
);
221 if (NT_STATUS_IS_OK(status
)) {
222 struct wreplsrv_out_connect_state
*state
= talloc_get_type(c
->private_data
,
223 struct wreplsrv_out_connect_state
);
224 if (state
->wreplconn
) {
225 *wreplconn
= talloc_reference(mem_ctx
, state
->wreplconn
);
226 if (!*wreplconn
) status
= NT_STATUS_NO_MEMORY
;
228 status
= NT_STATUS_INVALID_CONNECTION
;
237 struct wreplsrv_pull_table_io
{
239 struct wreplsrv_partner
*partner
;
241 struct wrepl_wins_owner
*owners
;
245 struct wrepl_wins_owner
*owners
;
249 enum wreplsrv_pull_table_stage
{
250 WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION
,
251 WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY
,
252 WREPLSRV_PULL_TABLE_STAGE_DONE
255 struct wreplsrv_pull_table_state
{
256 enum wreplsrv_pull_table_stage stage
;
257 struct composite_context
*c
;
258 struct wrepl_pull_table table_io
;
259 struct wreplsrv_pull_table_io
*io
;
260 struct composite_context
*creq
;
261 struct wreplsrv_out_connection
*wreplconn
;
262 struct tevent_req
*subreq
;
265 static void wreplsrv_pull_table_handler_treq(struct tevent_req
*subreq
);
267 static NTSTATUS
wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state
*state
)
271 status
= wreplsrv_out_connect_recv(state
->creq
, state
, &state
->wreplconn
);
272 NT_STATUS_NOT_OK_RETURN(status
);
274 state
->table_io
.in
.assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
275 state
->subreq
= wrepl_pull_table_send(state
,
276 state
->wreplconn
->service
->task
->event_ctx
,
277 state
->wreplconn
->sock
, &state
->table_io
);
278 NT_STATUS_HAVE_NO_MEMORY(state
->subreq
);
280 tevent_req_set_callback(state
->subreq
,
281 wreplsrv_pull_table_handler_treq
,
284 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY
;
289 static NTSTATUS
wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state
*state
)
293 status
= wrepl_pull_table_recv(state
->subreq
, state
, &state
->table_io
);
294 TALLOC_FREE(state
->subreq
);
295 NT_STATUS_NOT_OK_RETURN(status
);
297 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_DONE
;
302 static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state
*state
)
304 struct composite_context
*c
= state
->c
;
306 switch (state
->stage
) {
307 case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION
:
308 c
->status
= wreplsrv_pull_table_wait_connection(state
);
310 case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY
:
311 c
->status
= wreplsrv_pull_table_wait_table_reply(state
);
312 c
->state
= COMPOSITE_STATE_DONE
;
314 case WREPLSRV_PULL_TABLE_STAGE_DONE
:
315 c
->status
= NT_STATUS_INTERNAL_ERROR
;
318 if (!NT_STATUS_IS_OK(c
->status
)) {
319 c
->state
= COMPOSITE_STATE_ERROR
;
322 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
327 static void wreplsrv_pull_table_handler_creq(struct composite_context
*creq
)
329 struct wreplsrv_pull_table_state
*state
= talloc_get_type(creq
->async
.private_data
,
330 struct wreplsrv_pull_table_state
);
331 wreplsrv_pull_table_handler(state
);
335 static void wreplsrv_pull_table_handler_treq(struct tevent_req
*subreq
)
337 struct wreplsrv_pull_table_state
*state
= tevent_req_callback_data(subreq
,
338 struct wreplsrv_pull_table_state
);
339 wreplsrv_pull_table_handler(state
);
343 static struct composite_context
*wreplsrv_pull_table_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_pull_table_io
*io
)
345 struct composite_context
*c
= NULL
;
346 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
347 struct wreplsrv_pull_table_state
*state
= NULL
;
349 c
= talloc_zero(mem_ctx
, struct composite_context
);
352 state
= talloc_zero(c
, struct wreplsrv_pull_table_state
);
353 if (!state
) goto failed
;
357 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
358 c
->event_ctx
= service
->task
->event_ctx
;
359 c
->private_data
= state
;
361 if (io
->in
.num_owners
) {
362 state
->table_io
.out
.num_partners
= io
->in
.num_owners
;
363 state
->table_io
.out
.partners
= io
->in
.owners
;
364 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_DONE
;
369 state
->stage
= WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION
;
370 state
->creq
= wreplsrv_out_connect_send(io
->in
.partner
, WINSREPL_PARTNER_PULL
, NULL
);
371 if (!state
->creq
) goto failed
;
373 state
->creq
->async
.fn
= wreplsrv_pull_table_handler_creq
;
374 state
->creq
->async
.private_data
= state
;
382 static NTSTATUS
wreplsrv_pull_table_recv(struct composite_context
*c
, TALLOC_CTX
*mem_ctx
,
383 struct wreplsrv_pull_table_io
*io
)
387 status
= composite_wait(c
);
389 if (NT_STATUS_IS_OK(status
)) {
390 struct wreplsrv_pull_table_state
*state
= talloc_get_type(c
->private_data
,
391 struct wreplsrv_pull_table_state
);
392 io
->out
.num_owners
= state
->table_io
.out
.num_partners
;
393 io
->out
.owners
= talloc_reference(mem_ctx
, state
->table_io
.out
.partners
);
400 struct wreplsrv_pull_names_io
{
402 struct wreplsrv_partner
*partner
;
403 struct wreplsrv_out_connection
*wreplconn
;
404 struct wrepl_wins_owner owner
;
408 struct wrepl_name
*names
;
412 enum wreplsrv_pull_names_stage
{
413 WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION
,
414 WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY
,
415 WREPLSRV_PULL_NAMES_STAGE_DONE
418 struct wreplsrv_pull_names_state
{
419 enum wreplsrv_pull_names_stage stage
;
420 struct composite_context
*c
;
421 struct wrepl_pull_names pull_io
;
422 struct wreplsrv_pull_names_io
*io
;
423 struct composite_context
*creq
;
424 struct wreplsrv_out_connection
*wreplconn
;
425 struct tevent_req
*subreq
;
428 static void wreplsrv_pull_names_handler_treq(struct tevent_req
*subreq
);
430 static NTSTATUS
wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state
*state
)
434 status
= wreplsrv_out_connect_recv(state
->creq
, state
, &state
->wreplconn
);
435 NT_STATUS_NOT_OK_RETURN(status
);
437 state
->pull_io
.in
.assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
438 state
->pull_io
.in
.partner
= state
->io
->in
.owner
;
439 state
->subreq
= wrepl_pull_names_send(state
,
440 state
->wreplconn
->service
->task
->event_ctx
,
441 state
->wreplconn
->sock
,
443 NT_STATUS_HAVE_NO_MEMORY(state
->subreq
);
445 tevent_req_set_callback(state
->subreq
,
446 wreplsrv_pull_names_handler_treq
,
449 state
->stage
= WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY
;
454 static NTSTATUS
wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state
*state
)
458 status
= wrepl_pull_names_recv(state
->subreq
, state
, &state
->pull_io
);
459 TALLOC_FREE(state
->subreq
);
460 NT_STATUS_NOT_OK_RETURN(status
);
462 state
->stage
= WREPLSRV_PULL_NAMES_STAGE_DONE
;
467 static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state
*state
)
469 struct composite_context
*c
= state
->c
;
471 switch (state
->stage
) {
472 case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION
:
473 c
->status
= wreplsrv_pull_names_wait_connection(state
);
475 case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY
:
476 c
->status
= wreplsrv_pull_names_wait_send_reply(state
);
477 c
->state
= COMPOSITE_STATE_DONE
;
479 case WREPLSRV_PULL_NAMES_STAGE_DONE
:
480 c
->status
= NT_STATUS_INTERNAL_ERROR
;
483 if (!NT_STATUS_IS_OK(c
->status
)) {
484 c
->state
= COMPOSITE_STATE_ERROR
;
487 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
492 static void wreplsrv_pull_names_handler_creq(struct composite_context
*creq
)
494 struct wreplsrv_pull_names_state
*state
= talloc_get_type(creq
->async
.private_data
,
495 struct wreplsrv_pull_names_state
);
496 wreplsrv_pull_names_handler(state
);
500 static void wreplsrv_pull_names_handler_treq(struct tevent_req
*subreq
)
502 struct wreplsrv_pull_names_state
*state
= tevent_req_callback_data(subreq
,
503 struct wreplsrv_pull_names_state
);
504 wreplsrv_pull_names_handler(state
);
508 static struct composite_context
*wreplsrv_pull_names_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_pull_names_io
*io
)
510 struct composite_context
*c
= NULL
;
511 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
512 struct wreplsrv_pull_names_state
*state
= NULL
;
513 enum winsrepl_partner_type partner_type
= WINSREPL_PARTNER_PULL
;
515 if (io
->in
.wreplconn
) partner_type
= WINSREPL_PARTNER_NONE
;
517 c
= talloc_zero(mem_ctx
, struct composite_context
);
520 state
= talloc_zero(c
, struct wreplsrv_pull_names_state
);
521 if (!state
) goto failed
;
525 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
526 c
->event_ctx
= service
->task
->event_ctx
;
527 c
->private_data
= state
;
529 state
->stage
= WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION
;
530 state
->creq
= wreplsrv_out_connect_send(io
->in
.partner
, partner_type
, io
->in
.wreplconn
);
531 if (!state
->creq
) goto failed
;
533 state
->creq
->async
.fn
= wreplsrv_pull_names_handler_creq
;
534 state
->creq
->async
.private_data
= state
;
542 static NTSTATUS
wreplsrv_pull_names_recv(struct composite_context
*c
, TALLOC_CTX
*mem_ctx
,
543 struct wreplsrv_pull_names_io
*io
)
547 status
= composite_wait(c
);
549 if (NT_STATUS_IS_OK(status
)) {
550 struct wreplsrv_pull_names_state
*state
= talloc_get_type(c
->private_data
,
551 struct wreplsrv_pull_names_state
);
552 io
->out
.num_names
= state
->pull_io
.out
.num_names
;
553 io
->out
.names
= talloc_reference(mem_ctx
, state
->pull_io
.out
.names
);
561 enum wreplsrv_pull_cycle_stage
{
562 WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY
,
563 WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES
,
564 WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC
,
565 WREPLSRV_PULL_CYCLE_STAGE_DONE
568 struct wreplsrv_pull_cycle_state
{
569 enum wreplsrv_pull_cycle_stage stage
;
570 struct composite_context
*c
;
571 struct wreplsrv_pull_cycle_io
*io
;
572 struct wreplsrv_pull_table_io table_io
;
574 struct wreplsrv_pull_names_io names_io
;
575 struct composite_context
*creq
;
576 struct wrepl_associate_stop assoc_stop_io
;
577 struct wrepl_request
*req
;
580 static void wreplsrv_pull_cycle_handler_creq(struct composite_context
*creq
);
581 static void wreplsrv_pull_cycle_handler_req(struct wrepl_request
*req
);
583 static NTSTATUS
wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state
*state
)
585 struct wreplsrv_owner
*current_owner
=NULL
;
586 struct wreplsrv_owner
*local_owner
;
588 uint64_t old_max_version
= 0;
589 bool do_pull
= false;
591 for (i
=state
->current
; i
< state
->table_io
.out
.num_owners
; i
++) {
592 current_owner
= wreplsrv_find_owner(state
->io
->in
.partner
->service
,
593 state
->io
->in
.partner
->pull
.table
,
594 state
->table_io
.out
.owners
[i
].address
);
596 local_owner
= wreplsrv_find_owner(state
->io
->in
.partner
->service
,
597 state
->io
->in
.partner
->service
->table
,
598 state
->table_io
.out
.owners
[i
].address
);
600 * this means we are ourself the current owner,
601 * and we don't want replicate ourself
603 if (!current_owner
) continue;
606 * this means we don't have any records of this owner
616 * this means the remote partner has some new records of this owner
619 if (current_owner
->owner
.max_version
> local_owner
->owner
.max_version
) {
621 old_max_version
= local_owner
->owner
.max_version
;
628 state
->names_io
.in
.partner
= state
->io
->in
.partner
;
629 state
->names_io
.in
.wreplconn
= state
->io
->in
.wreplconn
;
630 state
->names_io
.in
.owner
= current_owner
->owner
;
631 state
->names_io
.in
.owner
.min_version
= old_max_version
+ 1;
632 state
->creq
= wreplsrv_pull_names_send(state
, &state
->names_io
);
633 NT_STATUS_HAVE_NO_MEMORY(state
->creq
);
635 state
->creq
->async
.fn
= wreplsrv_pull_cycle_handler_creq
;
636 state
->creq
->async
.private_data
= state
;
638 return STATUS_MORE_ENTRIES
;
644 static NTSTATUS
wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state
*state
)
648 status
= wreplsrv_pull_cycle_next_owner_do_work(state
);
649 if (NT_STATUS_IS_OK(status
)) {
650 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_DONE
;
651 } else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES
, status
)) {
652 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES
;
653 status
= NT_STATUS_OK
;
656 if (state
->stage
== WREPLSRV_PULL_CYCLE_STAGE_DONE
&& state
->io
->in
.wreplconn
) {
657 state
->assoc_stop_io
.in
.assoc_ctx
= state
->io
->in
.wreplconn
->assoc_ctx
.peer_ctx
;
658 state
->assoc_stop_io
.in
.reason
= 0;
659 state
->req
= wrepl_associate_stop_send(state
->io
->in
.wreplconn
->sock
, &state
->assoc_stop_io
);
660 NT_STATUS_HAVE_NO_MEMORY(state
->req
);
662 state
->req
->async
.fn
= wreplsrv_pull_cycle_handler_req
;
663 state
->req
->async
.private_data
= state
;
665 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC
;
671 static NTSTATUS
wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state
*state
)
676 status
= wreplsrv_pull_table_recv(state
->creq
, state
, &state
->table_io
);
677 NT_STATUS_NOT_OK_RETURN(status
);
679 /* update partner table */
680 for (i
=0; i
< state
->table_io
.out
.num_owners
; i
++) {
681 status
= wreplsrv_add_table(state
->io
->in
.partner
->service
,
682 state
->io
->in
.partner
,
683 &state
->io
->in
.partner
->pull
.table
,
684 state
->table_io
.out
.owners
[i
].address
,
685 state
->table_io
.out
.owners
[i
].max_version
);
686 NT_STATUS_NOT_OK_RETURN(status
);
689 status
= wreplsrv_pull_cycle_next_owner_wrapper(state
);
690 NT_STATUS_NOT_OK_RETURN(status
);
695 static NTSTATUS
wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state
*state
)
699 status
= wreplsrv_apply_records(state
->io
->in
.partner
,
700 &state
->names_io
.in
.owner
,
701 state
->names_io
.out
.num_names
,
702 state
->names_io
.out
.names
);
703 NT_STATUS_NOT_OK_RETURN(status
);
705 talloc_free(state
->names_io
.out
.names
);
706 ZERO_STRUCT(state
->names_io
);
711 static NTSTATUS
wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state
*state
)
715 status
= wreplsrv_pull_names_recv(state
->creq
, state
, &state
->names_io
);
716 NT_STATUS_NOT_OK_RETURN(status
);
719 * TODO: this should maybe an async call,
720 * because we may need some network access
721 * for conflict resolving
723 status
= wreplsrv_pull_cycle_apply_records(state
);
724 NT_STATUS_NOT_OK_RETURN(status
);
726 status
= wreplsrv_pull_cycle_next_owner_wrapper(state
);
727 NT_STATUS_NOT_OK_RETURN(status
);
732 static NTSTATUS
wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state
*state
)
736 status
= wrepl_associate_stop_recv(state
->req
, &state
->assoc_stop_io
);
737 NT_STATUS_NOT_OK_RETURN(status
);
739 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_DONE
;
744 static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state
*state
)
746 struct composite_context
*c
= state
->c
;
748 switch (state
->stage
) {
749 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY
:
750 c
->status
= wreplsrv_pull_cycle_wait_table_reply(state
);
752 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES
:
753 c
->status
= wreplsrv_pull_cycle_wait_send_replies(state
);
755 case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC
:
756 c
->status
= wreplsrv_pull_cycle_wait_stop_assoc(state
);
758 case WREPLSRV_PULL_CYCLE_STAGE_DONE
:
759 c
->status
= NT_STATUS_INTERNAL_ERROR
;
762 if (state
->stage
== WREPLSRV_PULL_CYCLE_STAGE_DONE
) {
763 c
->state
= COMPOSITE_STATE_DONE
;
766 if (!NT_STATUS_IS_OK(c
->status
)) {
767 c
->state
= COMPOSITE_STATE_ERROR
;
770 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
775 static void wreplsrv_pull_cycle_handler_creq(struct composite_context
*creq
)
777 struct wreplsrv_pull_cycle_state
*state
= talloc_get_type(creq
->async
.private_data
,
778 struct wreplsrv_pull_cycle_state
);
779 wreplsrv_pull_cycle_handler(state
);
783 static void wreplsrv_pull_cycle_handler_req(struct wrepl_request
*req
)
785 struct wreplsrv_pull_cycle_state
*state
= talloc_get_type(req
->async
.private_data
,
786 struct wreplsrv_pull_cycle_state
);
787 wreplsrv_pull_cycle_handler(state
);
791 struct composite_context
*wreplsrv_pull_cycle_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_pull_cycle_io
*io
)
793 struct composite_context
*c
= NULL
;
794 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
795 struct wreplsrv_pull_cycle_state
*state
= NULL
;
797 c
= talloc_zero(mem_ctx
, struct composite_context
);
800 state
= talloc_zero(c
, struct wreplsrv_pull_cycle_state
);
801 if (!state
) goto failed
;
805 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
806 c
->event_ctx
= service
->task
->event_ctx
;
807 c
->private_data
= state
;
809 state
->stage
= WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY
;
810 state
->table_io
.in
.partner
= io
->in
.partner
;
811 state
->table_io
.in
.num_owners
= io
->in
.num_owners
;
812 state
->table_io
.in
.owners
= io
->in
.owners
;
813 state
->creq
= wreplsrv_pull_table_send(state
, &state
->table_io
);
814 if (!state
->creq
) goto failed
;
816 state
->creq
->async
.fn
= wreplsrv_pull_cycle_handler_creq
;
817 state
->creq
->async
.private_data
= state
;
825 NTSTATUS
wreplsrv_pull_cycle_recv(struct composite_context
*c
)
829 status
= composite_wait(c
);
835 enum wreplsrv_push_notify_stage
{
836 WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT
,
837 WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM
,
838 WREPLSRV_PUSH_NOTIFY_STAGE_DONE
841 struct wreplsrv_push_notify_state
{
842 enum wreplsrv_push_notify_stage stage
;
843 struct composite_context
*c
;
844 struct wreplsrv_push_notify_io
*io
;
845 enum wrepl_replication_cmd command
;
847 struct wrepl_send_ctrl ctrl
;
848 struct wrepl_request
*req
;
849 struct wrepl_packet req_packet
;
850 struct wrepl_packet
*rep_packet
;
851 struct composite_context
*creq
;
852 struct wreplsrv_out_connection
*wreplconn
;
855 static void wreplsrv_push_notify_handler_creq(struct composite_context
*creq
);
856 static void wreplsrv_push_notify_handler_req(struct wrepl_request
*req
);
858 static NTSTATUS
wreplsrv_push_notify_update(struct wreplsrv_push_notify_state
*state
)
860 struct wreplsrv_service
*service
= state
->io
->in
.partner
->service
;
861 struct wrepl_packet
*req
= &state
->req_packet
;
862 struct wrepl_replication
*repl_out
= &state
->req_packet
.message
.replication
;
863 struct wrepl_table
*table_out
= &state
->req_packet
.message
.replication
.info
.table
;
864 struct wreplsrv_in_connection
*wrepl_in
;
866 struct socket_context
*sock
;
868 /* prepare the outgoing request */
869 req
->opcode
= WREPL_OPCODE_BITS
;
870 req
->assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
871 req
->mess_type
= WREPL_REPLICATION
;
873 repl_out
->command
= state
->command
;
875 status
= wreplsrv_fill_wrepl_table(service
, state
, table_out
,
876 service
->wins_db
->local_owner
, state
->full_table
);
877 NT_STATUS_NOT_OK_RETURN(status
);
879 /* queue the request */
880 state
->req
= wrepl_request_send(state
->wreplconn
->sock
, req
, NULL
);
881 NT_STATUS_HAVE_NO_MEMORY(state
->req
);
884 * now we need to convert the wrepl_socket (client connection)
885 * into a wreplsrv_in_connection (server connection), because
886 * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
887 * message is received by the peer.
890 /* steal the socket_context */
891 sock
= state
->wreplconn
->sock
->sock
;
892 state
->wreplconn
->sock
->sock
= NULL
;
893 talloc_steal(state
, sock
);
896 * TODO: steal the tstream if we switch the client to tsocket.
897 * This is just to get a compiler error as soon as we remove
900 state
->wreplconn
->sock
->packet
= NULL
;
903 * free the wrepl_socket (client connection)
905 talloc_free(state
->wreplconn
->sock
);
906 state
->wreplconn
->sock
= NULL
;
909 * now create a wreplsrv_in_connection,
910 * on which we act as server
912 * NOTE: sock and packet will be stolen by
913 * wreplsrv_in_connection_merge()
915 status
= wreplsrv_in_connection_merge(state
->io
->in
.partner
,
917 NT_STATUS_NOT_OK_RETURN(status
);
919 wrepl_in
->assoc_ctx
.peer_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
920 wrepl_in
->assoc_ctx
.our_ctx
= 0;
922 /* now we can free the wreplsrv_out_connection */
923 TALLOC_FREE(state
->wreplconn
);
925 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_DONE
;
930 static NTSTATUS
wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state
*state
)
932 struct wreplsrv_service
*service
= state
->io
->in
.partner
->service
;
933 struct wrepl_packet
*req
= &state
->req_packet
;
934 struct wrepl_replication
*repl_out
= &state
->req_packet
.message
.replication
;
935 struct wrepl_table
*table_out
= &state
->req_packet
.message
.replication
.info
.table
;
938 req
->opcode
= WREPL_OPCODE_BITS
;
939 req
->assoc_ctx
= state
->wreplconn
->assoc_ctx
.peer_ctx
;
940 req
->mess_type
= WREPL_REPLICATION
;
942 repl_out
->command
= state
->command
;
944 status
= wreplsrv_fill_wrepl_table(service
, state
, table_out
,
945 service
->wins_db
->local_owner
, state
->full_table
);
946 NT_STATUS_NOT_OK_RETURN(status
);
948 /* we won't get a reply to a inform message */
949 state
->ctrl
.send_only
= true;
951 state
->req
= wrepl_request_send(state
->wreplconn
->sock
, req
, &state
->ctrl
);
952 NT_STATUS_HAVE_NO_MEMORY(state
->req
);
954 state
->req
->async
.fn
= wreplsrv_push_notify_handler_req
;
955 state
->req
->async
.private_data
= state
;
957 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM
;
962 static NTSTATUS
wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state
*state
)
966 status
= wreplsrv_out_connect_recv(state
->creq
, state
, &state
->wreplconn
);
967 NT_STATUS_NOT_OK_RETURN(status
);
969 /* is the peer doesn't support inform fallback to update */
970 switch (state
->command
) {
971 case WREPL_REPL_INFORM
:
972 if (state
->wreplconn
->assoc_ctx
.peer_major
< 5) {
973 state
->command
= WREPL_REPL_UPDATE
;
976 case WREPL_REPL_INFORM2
:
977 if (state
->wreplconn
->assoc_ctx
.peer_major
< 5) {
978 state
->command
= WREPL_REPL_UPDATE2
;
985 switch (state
->command
) {
986 case WREPL_REPL_UPDATE
:
987 state
->full_table
= true;
988 return wreplsrv_push_notify_update(state
);
989 case WREPL_REPL_UPDATE2
:
990 state
->full_table
= false;
991 return wreplsrv_push_notify_update(state
);
992 case WREPL_REPL_INFORM
:
993 state
->full_table
= true;
994 return wreplsrv_push_notify_inform(state
);
995 case WREPL_REPL_INFORM2
:
996 state
->full_table
= false;
997 return wreplsrv_push_notify_inform(state
);
999 return NT_STATUS_INTERNAL_ERROR
;
1002 return NT_STATUS_INTERNAL_ERROR
;
1005 static NTSTATUS
wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state
*state
)
1009 status
= wrepl_request_recv(state
->req
, state
, NULL
);
1010 NT_STATUS_NOT_OK_RETURN(status
);
1012 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_DONE
;
1016 static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state
*state
)
1018 struct composite_context
*c
= state
->c
;
1020 switch (state
->stage
) {
1021 case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT
:
1022 c
->status
= wreplsrv_push_notify_wait_connect(state
);
1024 case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM
:
1025 c
->status
= wreplsrv_push_notify_wait_inform(state
);
1027 case WREPLSRV_PUSH_NOTIFY_STAGE_DONE
:
1028 c
->status
= NT_STATUS_INTERNAL_ERROR
;
1031 if (state
->stage
== WREPLSRV_PUSH_NOTIFY_STAGE_DONE
) {
1032 c
->state
= COMPOSITE_STATE_DONE
;
1035 if (!NT_STATUS_IS_OK(c
->status
)) {
1036 c
->state
= COMPOSITE_STATE_ERROR
;
1039 if (c
->state
>= COMPOSITE_STATE_DONE
&& c
->async
.fn
) {
1044 static void wreplsrv_push_notify_handler_creq(struct composite_context
*creq
)
1046 struct wreplsrv_push_notify_state
*state
= talloc_get_type(creq
->async
.private_data
,
1047 struct wreplsrv_push_notify_state
);
1048 wreplsrv_push_notify_handler(state
);
1052 static void wreplsrv_push_notify_handler_req(struct wrepl_request
*req
)
1054 struct wreplsrv_push_notify_state
*state
= talloc_get_type(req
->async
.private_data
,
1055 struct wreplsrv_push_notify_state
);
1056 wreplsrv_push_notify_handler(state
);
1060 struct composite_context
*wreplsrv_push_notify_send(TALLOC_CTX
*mem_ctx
, struct wreplsrv_push_notify_io
*io
)
1062 struct composite_context
*c
= NULL
;
1063 struct wreplsrv_service
*service
= io
->in
.partner
->service
;
1064 struct wreplsrv_push_notify_state
*state
= NULL
;
1065 enum winsrepl_partner_type partner_type
;
1067 c
= talloc_zero(mem_ctx
, struct composite_context
);
1068 if (!c
) goto failed
;
1070 state
= talloc_zero(c
, struct wreplsrv_push_notify_state
);
1071 if (!state
) goto failed
;
1075 if (io
->in
.inform
) {
1076 /* we can cache the connection in partner->push->wreplconn */
1077 partner_type
= WINSREPL_PARTNER_PUSH
;
1078 if (io
->in
.propagate
) {
1079 state
->command
= WREPL_REPL_INFORM2
;
1081 state
->command
= WREPL_REPL_INFORM
;
1084 /* we can NOT cache the connection */
1085 partner_type
= WINSREPL_PARTNER_NONE
;
1086 if (io
->in
.propagate
) {
1087 state
->command
= WREPL_REPL_UPDATE2
;
1089 state
->command
= WREPL_REPL_UPDATE
;
1093 c
->state
= COMPOSITE_STATE_IN_PROGRESS
;
1094 c
->event_ctx
= service
->task
->event_ctx
;
1095 c
->private_data
= state
;
1097 state
->stage
= WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT
;
1098 state
->creq
= wreplsrv_out_connect_send(io
->in
.partner
, partner_type
, NULL
);
1099 if (!state
->creq
) goto failed
;
1101 state
->creq
->async
.fn
= wreplsrv_push_notify_handler_creq
;
1102 state
->creq
->async
.private_data
= state
;
1110 NTSTATUS
wreplsrv_push_notify_recv(struct composite_context
*c
)
1114 status
= composite_wait(c
);