/*
   Unix SMB/CIFS implementation.
   Samba3 message channels
   Copyright (C) Volker Lendecke 2012

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
21 #include "msg_channel.h"
22 #include "ctdb_conn.h"
23 #include "lib/util/tevent_unix.h"
/*
 * A message channel: collects messages of one type that arrive through
 * the messaging context and, when present, through a ctdb channel.
 */
struct msg_channel {
	/* ctdb side of the channel; NULL when no ctdb channel is in use */
	struct ctdb_msg_channel *ctdb_channel;
	/* messaging context the channel registers its handler with */
	struct messaging_context *msg;
	/* the message type this channel is registered for */
	uint32_t msg_type;

	/* the single outstanding msg_read request, NULL if none */
	struct tevent_req *pending_req;
	/* event context on which wakeups for pending_req are scheduled */
	struct tevent_context *ev;

	/* talloc array of queued messages not yet handed to a reader */
	struct messaging_rec **msgs;
};
/* Per-request state of msg_channel_init_send(). */
struct msg_channel_init_state {
	/* channel under construction; moved to the caller by the _recv */
	struct msg_channel *channel;
};
40 static void msg_channel_init_got_ctdb(struct tevent_req
*subreq
);
41 static void msg_channel_init_got_msg(struct messaging_context
*msg
,
42 void *priv
, uint32_t msg_type
,
43 struct server_id server_id
, DATA_BLOB
*data
);
44 static void msg_channel_trigger(struct tevent_context
*ev
,
45 struct tevent_immediate
*im
,
47 static int msg_channel_destructor(struct msg_channel
*s
);
49 struct tevent_req
*msg_channel_init_send(TALLOC_CTX
*mem_ctx
,
50 struct tevent_context
*ev
,
51 struct messaging_context
*msg
,
54 struct tevent_req
*req
, *subreq
;
55 struct msg_channel_init_state
*state
;
58 req
= tevent_req_create(mem_ctx
, &state
,
59 struct msg_channel_init_state
);
64 state
->channel
= talloc_zero(state
, struct msg_channel
);
65 if (tevent_req_nomem(state
->channel
, req
)) {
66 return tevent_req_post(req
, ev
);
68 state
->channel
->msg
= msg
;
69 state
->channel
->msg_type
= msg_type
;
71 pid
= messaging_server_id(msg
);
72 subreq
= ctdb_msg_channel_init_send(state
, ev
, lp_ctdbd_socket(),
74 if (tevent_req_nomem(subreq
, req
)) {
75 return tevent_req_post(req
, ev
);
77 tevent_req_set_callback(subreq
, msg_channel_init_got_ctdb
, req
);
81 static void msg_channel_init_got_ctdb(struct tevent_req
*subreq
)
83 struct tevent_req
*req
= tevent_req_callback_data(
84 subreq
, struct tevent_req
);
85 struct msg_channel_init_state
*state
= tevent_req_data(
86 req
, struct msg_channel_init_state
);
87 struct msg_channel
*s
= state
->channel
;
91 ret
= ctdb_msg_channel_init_recv(subreq
, s
, &s
->ctdb_channel
);
95 s
->ctdb_channel
= NULL
;
99 if (tevent_req_error(req
, ret
)) {
102 status
= messaging_register(s
->msg
, s
, s
->msg_type
,
103 msg_channel_init_got_msg
);
104 if (!NT_STATUS_IS_OK(status
)) {
105 tevent_req_error(req
, map_errno_from_nt_status(status
));
108 talloc_set_destructor(s
, msg_channel_destructor
);
109 tevent_req_done(req
);
112 static int msg_channel_destructor(struct msg_channel
*s
)
114 messaging_deregister(s
->msg
, s
->msg_type
, s
);
118 int msg_channel_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
119 struct msg_channel
**pchannel
)
121 struct msg_channel_init_state
*state
= tevent_req_data(
122 req
, struct msg_channel_init_state
);
125 if (tevent_req_is_unix_error(req
, &err
)) {
128 *pchannel
= talloc_move(mem_ctx
, &state
->channel
);
132 int msg_channel_init(TALLOC_CTX
*mem_ctx
, struct messaging_context
*msg
,
133 uint32_t msgtype
, struct msg_channel
**pchannel
)
135 TALLOC_CTX
*frame
= talloc_stackframe();
136 struct tevent_context
*ev
;
137 struct tevent_req
*req
;
141 ev
= tevent_context_init(frame
);
145 req
= msg_channel_init_send(frame
, ev
, msg
, msgtype
);
149 ok
= tevent_req_poll(req
, ev
);
154 err
= msg_channel_init_recv(req
, mem_ctx
, pchannel
);
160 static void msg_channel_init_got_msg(struct messaging_context
*msg
,
161 void *priv
, uint32_t msg_type
,
162 struct server_id server_id
,
165 struct msg_channel
*s
= talloc_get_type_abort(
166 priv
, struct msg_channel
);
167 struct messaging_rec
*rec
;
168 struct messaging_rec
**msgs
;
170 struct tevent_immediate
*im
;
172 rec
= talloc(s
, struct messaging_rec
);
176 rec
->msg_version
= 1;
177 rec
->msg_type
= msg_type
;
178 rec
->dest
= server_id
;
179 rec
->src
= messaging_server_id(msg
);
180 rec
->buf
.data
= (uint8_t *)talloc_memdup(rec
, data
->data
,
182 if (rec
->buf
.data
== NULL
) {
185 rec
->buf
.length
= data
->length
;
187 num_msgs
= talloc_array_length(s
->msgs
);
188 msgs
= talloc_realloc(s
, s
->msgs
, struct messaging_rec
*, num_msgs
+1);
193 s
->msgs
[num_msgs
] = talloc_move(s
->msgs
, &rec
);
195 if (s
->pending_req
== NULL
) {
199 im
= tevent_create_immediate(s
);
203 tevent_schedule_immediate(im
, s
->ev
, msg_channel_trigger
, s
);
/* Per-request state of msg_read_send(). */
struct msg_read_state {
	/* event context the request runs on */
	struct tevent_context *ev;
	/* back pointer to the request owning this state */
	struct tevent_req *req;
	/* channel being read from */
	struct msg_channel *channel;
	/* the message to be returned by msg_read_recv() */
	struct messaging_rec *rec;
};
/* Forward declarations of the file-local helpers of msg_read_send(). */
static int msg_read_state_destructor(struct msg_read_state *s);
static void msg_read_got_ctdb(struct tevent_req *subreq);
219 struct tevent_req
*msg_read_send(TALLOC_CTX
*mem_ctx
,
220 struct tevent_context
*ev
,
221 struct msg_channel
*channel
)
223 struct tevent_req
*req
;
224 struct tevent_immediate
*im
;
225 struct msg_read_state
*state
;
229 req
= tevent_req_create(mem_ctx
, &state
, struct msg_read_state
);
235 state
->channel
= channel
;
237 if (channel
->pending_req
!= NULL
) {
238 tevent_req_error(req
, EBUSY
);
239 return tevent_req_post(req
, ev
);
241 channel
->pending_req
= req
;
243 talloc_set_destructor(state
, msg_read_state_destructor
);
245 num_msgs
= talloc_array_length(channel
->msgs
);
247 im
= tevent_create_immediate(channel
->ev
);
248 if (tevent_req_nomem(im
, req
)) {
249 return tevent_req_post(req
, ev
);
251 tevent_schedule_immediate(im
, channel
->ev
, msg_channel_trigger
,
256 msg_tdb_event
= messaging_tdb_event(state
, channel
->msg
, ev
);
257 if (tevent_req_nomem(msg_tdb_event
, req
)) {
258 return tevent_req_post(req
, ev
);
261 if (channel
->ctdb_channel
!= NULL
) {
262 struct tevent_req
*subreq
;
264 subreq
= ctdb_msg_read_send(state
, ev
,
265 channel
->ctdb_channel
);
266 if (tevent_req_nomem(subreq
, req
)) {
267 return tevent_req_post(req
, ev
);
269 tevent_req_set_callback(subreq
, msg_read_got_ctdb
, req
);
274 static int msg_read_state_destructor(struct msg_read_state
*s
)
276 assert(s
->channel
->pending_req
== s
->req
);
277 s
->channel
->pending_req
= NULL
;
281 static void msg_channel_trigger(struct tevent_context
*ev
,
282 struct tevent_immediate
*im
,
285 struct msg_channel
*channel
;
286 struct tevent_req
*req
;
287 struct msg_read_state
*state
;
290 channel
= talloc_get_type_abort(priv
, struct msg_channel
);
291 req
= channel
->pending_req
;
292 state
= tevent_req_data(req
, struct msg_read_state
);
294 talloc_set_destructor(state
, NULL
);
295 msg_read_state_destructor(state
);
297 num_msgs
= talloc_array_length(channel
->msgs
);
298 assert(num_msgs
> 0);
300 state
->rec
= talloc_move(state
, &channel
->msgs
[0]);
302 memmove(channel
->msgs
, channel
->msgs
+1,
303 sizeof(struct messaging_rec
*) * (num_msgs
-1));
304 channel
->msgs
= talloc_realloc(
305 channel
, channel
->msgs
, struct messaging_rec
*, num_msgs
- 1);
307 tevent_req_done(req
);
310 static void msg_read_got_ctdb(struct tevent_req
*subreq
)
312 struct tevent_req
*req
= tevent_req_callback_data(
313 subreq
, struct tevent_req
);
314 struct msg_read_state
*state
= tevent_req_data(
315 req
, struct msg_read_state
);
317 enum ndr_err_code ndr_err
;
320 ret
= ctdb_msg_read_recv(subreq
, talloc_tos(),
321 &blob
.data
, &blob
.length
);
323 if (tevent_req_error(req
, ret
)) {
327 state
->rec
= talloc(state
, struct messaging_rec
);
328 if (tevent_req_nomem(state
->rec
, req
)) {
332 ndr_err
= ndr_pull_struct_blob(
333 &blob
, state
->rec
, state
->rec
,
334 (ndr_pull_flags_fn_t
)ndr_pull_messaging_rec
);
336 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) {
337 DEBUG(1, ("ndr_pull_struct_blob failed: %s\n",
338 ndr_errstr(ndr_err
)));
339 tevent_req_error(req
, ndr_map_error2errno(ndr_err
));
342 if (DEBUGLEVEL
>= 10) {
343 NDR_PRINT_DEBUG(messaging_rec
, state
->rec
);
345 if (state
->rec
->msg_type
== state
->channel
->msg_type
) {
346 tevent_req_done(req
);
350 * Got some unexpected msg type, wait for the next one
352 subreq
= ctdb_msg_read_send(state
, state
->ev
,
353 state
->channel
->ctdb_channel
);
354 if (tevent_req_nomem(subreq
, req
)) {
357 tevent_req_set_callback(subreq
, msg_read_got_ctdb
, req
);
360 int msg_read_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
361 struct messaging_rec
**prec
)
363 struct msg_read_state
*state
= tevent_req_data(
364 req
, struct msg_read_state
);
367 if (tevent_req_is_unix_error(req
, &err
)) {
370 *prec
= talloc_move(mem_ctx
, &state
->rec
);