2 Unix SMB/CIFS implementation.
3 Samba3 message channels
4 Copyright (C) Volker Lendecke 2012
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "msg_channel.h"
22 #include "ctdb_conn.h"
23 #include "lib/util/tevent_unix.h"
/*
 * Members of the message channel object (struct msg_channel).
 * NOTE(review): the opening and closing lines of the struct are not
 * visible in this listing. Other code in this file reads and writes a
 * msg_type member on this struct, so that member is presumably declared
 * on a line that is missing here -- confirm against the full file.
 */
/*
 * Channel on the ctdb side. Filled in by ctdb_msg_channel_init_recv and
 * compared against NULL before a ctdb read is started.
 */
26 struct ctdb_msg_channel
*ctdb_channel
;
/* Messaging context handed to msg_channel_init_send. */
27 struct messaging_context
*msg
;
/*
 * The read request currently waiting for a message on this channel,
 * or NULL when no read is outstanding.
 */
30 struct tevent_req
*pending_req
;
/*
 * Event context passed to tevent_req_defer_callback when the message
 * handler completes a pending read.
 * NOTE(review): the place where this member is assigned is not visible
 * in this listing.
 */
31 struct tevent_context
*ev
;
/*
 * talloc array of records that arrived while no read was pending.
 * The message handler appends at the end, msg_read_send takes
 * element 0.
 */
33 struct messaging_rec
**msgs
;
/*
 * Per-request state of the msg_channel_init_send / msg_channel_init_recv
 * pair. It holds the channel under construction: allocated in
 * msg_channel_init_send and moved to the caller in
 * msg_channel_init_recv.
 * NOTE(review): the closing line of this struct is not visible in this
 * listing.
 */
36 struct msg_channel_init_state
{
37 struct msg_channel
*channel
;
/*
 * Forward declarations for the channel setup helpers.
 */
/* Callback set on the ctdb_msg_channel_init_send subrequest. */
40 static void msg_channel_init_got_ctdb(struct tevent_req
*subreq
);
/*
 * Message handler passed to messaging_register for the msg_type of the
 * channel; priv is the struct msg_channel.
 */
41 static void msg_channel_init_got_msg(struct messaging_context
*msg
,
42 void *priv
, uint32_t msg_type
,
43 struct server_id server_id
, DATA_BLOB
*data
);
/* talloc destructor installed on the channel once it is registered. */
44 static int msg_channel_destructor(struct msg_channel
*s
);
/*
 * Start the asynchronous creation of a message channel.
 *
 * Creates the tevent request, allocates a zeroed struct msg_channel as a
 * child of the request state, records the messaging context and message
 * type in it, and starts ctdb_msg_channel_init_send with
 * msg_channel_init_got_ctdb as completion callback.
 *
 * mem_ctx: talloc parent for the request
 * ev:      event context used for the subrequest and for tevent_req_post
 * msg:     messaging context stored in the new channel
 *
 * NOTE(review): several original lines are missing from this listing:
 * the msg_type parameter, the declaration of pid, the remaining
 * arguments of ctdb_msg_channel_init_send, the closing braces and the
 * final return. Nothing is documented about those parts here.
 */
46 struct tevent_req
*msg_channel_init_send(TALLOC_CTX
*mem_ctx
,
47 struct tevent_context
*ev
,
48 struct messaging_context
*msg
,
51 struct tevent_req
*req
, *subreq
;
52 struct msg_channel_init_state
*state
;
55 req
= tevent_req_create(mem_ctx
, &state
,
56 struct msg_channel_init_state
);
/* The channel is owned by the request state until the recv function. */
61 state
->channel
= talloc_zero(state
, struct msg_channel
);
/* If the allocation check reports failure, hand back the request. */
62 if (tevent_req_nomem(state
->channel
, req
)) {
63 return tevent_req_post(req
, ev
);
65 state
->channel
->msg
= msg
;
66 state
->channel
->msg_type
= msg_type
;
/*
 * Own server id taken from the messaging context.
 * NOTE(review): presumably passed on to ctdb_msg_channel_init_send on
 * the continuation line that is not shown -- confirm.
 */
68 pid
= messaging_server_id(msg
);
69 subreq
= ctdb_msg_channel_init_send(state
, ev
, lp_ctdbd_socket(),
71 if (tevent_req_nomem(subreq
, req
)) {
72 return tevent_req_post(req
, ev
);
/* Continue in msg_channel_init_got_ctdb, with req as callback data. */
74 tevent_req_set_callback(subreq
, msg_channel_init_got_ctdb
, req
);
/*
 * Completion callback of the ctdb_msg_channel_init_send subrequest.
 *
 * Collects the ctdb channel into s->ctdb_channel, registers
 * msg_channel_init_got_msg as handler for s->msg_type on the messaging
 * context, installs msg_channel_destructor on the channel and marks the
 * request as done.
 *
 * NOTE(review): declarations of ret and status, several braces and the
 * return statements after the error paths are not visible in this
 * listing.
 */
78 static void msg_channel_init_got_ctdb(struct tevent_req
*subreq
)
80 struct tevent_req
*req
= tevent_req_callback_data(
81 subreq
, struct tevent_req
);
82 struct msg_channel_init_state
*state
= tevent_req_data(
83 req
, struct msg_channel_init_state
);
84 struct msg_channel
*s
= state
->channel
;
/* The ctdb channel is received with the msg_channel as talloc parent. */
88 ret
= ctdb_msg_channel_init_recv(subreq
, s
, &s
->ctdb_channel
);
/*
 * NOTE(review): the condition guarding this reset is on a line that is
 * not visible here. msg_read_send treats a NULL ctdb_channel as
 * no ctdb read to start -- confirm which return value leads here.
 */
92 s
->ctdb_channel
= NULL
;
96 if (tevent_req_error(req
, ret
)) {
/* Register the handler; s is the private pointer it gets back. */
99 status
= messaging_register(s
->msg
, s
, s
->msg_type
,
100 msg_channel_init_got_msg
);
101 if (!NT_STATUS_IS_OK(status
)) {
/* The request reports errno values, so the NTSTATUS is mapped. */
102 tevent_req_error(req
, map_errno_from_nt_status(status
));
/* From here on freeing the channel deregisters the handler. */
105 talloc_set_destructor(s
, msg_channel_destructor
);
106 tevent_req_done(req
);
/*
 * talloc destructor of struct msg_channel: removes the handler that was
 * registered for (s->msg_type, s) on the messaging context s->msg.
 * NOTE(review): the braces and the return statement are not visible in
 * this listing.
 */
109 static int msg_channel_destructor(struct msg_channel
*s
)
111 messaging_deregister(s
->msg
, s
->msg_type
, s
);
/*
 * Receive the result of msg_channel_init_send.
 *
 * req:      the request created by msg_channel_init_send
 * mem_ctx:  talloc context that becomes the new parent of the channel
 * pchannel: receives the channel pointer on success
 *
 * The unix error state of the request is checked first; if there is no
 * error the channel is moved out of the request state to mem_ctx.
 * NOTE(review): the declaration of err and the return statements are
 * not visible in this listing.
 */
115 int msg_channel_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
116 struct msg_channel
**pchannel
)
118 struct msg_channel_init_state
*state
= tevent_req_data(
119 req
, struct msg_channel_init_state
);
122 if (tevent_req_is_unix_error(req
, &err
)) {
/* talloc_move reparents the channel and clears state->channel. */
125 *pchannel
= talloc_move(mem_ctx
, &state
->channel
);
/*
 * Synchronous wrapper around msg_channel_init_send and
 * msg_channel_init_recv.
 *
 * mem_ctx:  talloc parent for the resulting channel
 * msg:      messaging context the channel is bound to
 * msgtype:  message type the channel listens for
 * pchannel: receives the channel
 *
 * A private tevent context is created on a talloc stackframe, the
 * request is sent on it and polled, and the result is fetched with
 * msg_channel_init_recv.
 * NOTE(review): declarations of ok and err, the NULL and failure checks
 * between the calls, the cleanup of frame and the return statement are
 * not visible in this listing.
 */
129 int msg_channel_init(TALLOC_CTX
*mem_ctx
, struct messaging_context
*msg
,
130 uint32_t msgtype
, struct msg_channel
**pchannel
)
132 TALLOC_CTX
*frame
= talloc_stackframe();
133 struct tevent_context
*ev
;
134 struct tevent_req
*req
;
/* Event context and request both hang off the temporary frame. */
138 ev
= samba_tevent_context_init(frame
);
142 req
= msg_channel_init_send(frame
, ev
, msg
, msgtype
);
/* Poll the private event context for this request. */
146 ok
= tevent_req_poll(req
, ev
);
/* The channel itself is moved to the mem_ctx of the caller. */
151 err
= msg_channel_init_recv(req
, mem_ctx
, pchannel
);
/*
 * Per-request state of the msg_read_send / msg_read_recv pair.
 * NOTE(review): the closing line of this struct is not visible in this
 * listing.
 */
157 struct msg_read_state
{
/* Event context used when the ctdb read has to be re-issued. */
158 struct tevent_context
*ev
;
/* Channel this read was issued on. */
159 struct msg_channel
*channel
;
/* The received record; moved to the caller by msg_read_recv. */
160 struct messaging_rec
*rec
;
/*
 * Message handler registered for the msg_type of the channel.
 *
 * Copies the incoming message into a freshly allocated messaging_rec.
 * If a read request is pending on the channel the record is handed to
 * it and the request is completed; otherwise the record is appended to
 * the s->msgs queue.
 *
 * msg:       messaging context the message arrived on
 * priv:      the struct msg_channel (checked with talloc_get_type_abort)
 * msg_type:  type of the message
 * server_id: server id supplied by the messaging layer
 *
 * NOTE(review): the data parameter line, the declaration of num_msgs,
 * the NULL checks bodies, several braces and the return statements are
 * not visible in this listing.
 */
163 static void msg_channel_init_got_msg(struct messaging_context
*msg
,
164 void *priv
, uint32_t msg_type
,
165 struct server_id server_id
,
168 struct msg_channel
*s
= talloc_get_type_abort(
169 priv
, struct msg_channel
);
170 struct messaging_rec
*rec
;
171 struct messaging_rec
**msgs
;
/* The record starts out as a child of the channel. */
174 rec
= talloc(s
, struct messaging_rec
);
178 rec
->msg_version
= 1;
179 rec
->msg_type
= msg_type
;
/*
 * NOTE(review): dest is filled from the server_id argument of the
 * handler and src from the id of the local messaging context. Confirm
 * that this is the intended direction and not swapped.
 */
180 rec
->dest
= server_id
;
181 rec
->src
= messaging_server_id(msg
);
/* Private copy of the payload, owned by the record. */
182 rec
->buf
.data
= (uint8_t *)talloc_memdup(rec
, data
->data
,
184 if (rec
->buf
.data
== NULL
) {
187 rec
->buf
.length
= data
->length
;
/* A reader is waiting: give it the record and complete its request. */
189 if (s
->pending_req
!= NULL
) {
190 struct tevent_req
*req
= s
->pending_req
;
191 struct msg_read_state
*state
= tevent_req_data(
192 req
, struct msg_read_state
);
194 s
->pending_req
= NULL
;
196 state
->rec
= talloc_move(state
, &rec
);
/* The callback of the reader is deferred to the event context s->ev. */
197 tevent_req_defer_callback(req
, s
->ev
);
198 tevent_req_done(req
);
/*
 * No reader: grow the queue by one slot and append the record.
 * NOTE(review): the realloc result goes into the local msgs while the
 * store below indexes s->msgs; the failure check and the write-back to
 * s->msgs are presumably on lines that are not shown -- confirm.
 */
202 num_msgs
= talloc_array_length(s
->msgs
);
203 msgs
= talloc_realloc(s
, s
->msgs
, struct messaging_rec
*, num_msgs
+1);
/* The queued record becomes a child of the array. */
208 s
->msgs
[num_msgs
] = talloc_move(s
->msgs
, &rec
);
/*
 * Forward declarations for the read path.
 */
/* Callback set on the ctdb_msg_read_send subrequest. */
215 static void msg_read_got_ctdb(struct tevent_req
*subreq
);
/* talloc destructor installed on the state of a pending read. */
216 static int msg_read_state_destructor(struct msg_read_state
*s
);
/*
 * Start reading one message from a channel.
 *
 * mem_ctx: talloc parent for the request
 * ev:      event context for subrequests and tevent_req_post
 * channel: channel to read from
 *
 * The request fails with EBUSY when another read is already pending on
 * the channel. A queued record is taken from the front of
 * channel->msgs and the request completes immediately. Otherwise the
 * request is recorded as channel->pending_req, messaging_tdb_event is
 * called, and a ctdb read is started if the channel has a ctdb channel.
 *
 * NOTE(review): declarations of num_msgs and msg_tdb_event, the guard
 * that decides whether the queue holds a record, the new length passed
 * to talloc_realloc, closing braces and the final return are not
 * visible in this listing. No assignment to state->ev is visible
 * either, although msg_read_got_ctdb reads it -- confirm it is set on a
 * line that is not shown.
 */
218 struct tevent_req
*msg_read_send(TALLOC_CTX
*mem_ctx
,
219 struct tevent_context
*ev
,
220 struct msg_channel
*channel
)
222 struct tevent_req
*req
;
223 struct msg_read_state
*state
;
227 req
= tevent_req_create(mem_ctx
, &state
, struct msg_read_state
);
232 state
->channel
= channel
;
/* Only one outstanding read per channel. */
234 if (channel
->pending_req
!= NULL
) {
235 tevent_req_error(req
, EBUSY
);
236 return tevent_req_post(req
, ev
);
239 num_msgs
= talloc_array_length(channel
->msgs
);
/*
 * Take the oldest queued record, shift the remaining pointers down by
 * one slot and resize the array, then complete right away.
 */
241 state
->rec
= talloc_move(state
, &channel
->msgs
[0]);
242 memmove(channel
->msgs
, channel
->msgs
+1,
243 sizeof(struct messaging_rec
*) * (num_msgs
-1));
/*
 * NOTE(review): the realloc result is assigned straight back to
 * channel->msgs; no check of the result is visible here.
 */
244 channel
->msgs
= talloc_realloc(
245 channel
, channel
->msgs
, struct messaging_rec
*,
247 tevent_req_done(req
);
248 return tevent_req_post(req
, ev
);
/*
 * Nothing handled above: register as the pending reader. The destructor
 * clears channel->pending_req when the state is freed.
 */
251 channel
->pending_req
= req
;
252 talloc_set_destructor(state
, msg_read_state_destructor
);
/* The returned object is allocated on state and only checked here. */
256 msg_tdb_event
= messaging_tdb_event(state
, channel
->msg
, ev
);
257 if (tevent_req_nomem(msg_tdb_event
, req
)) {
258 return tevent_req_post(req
, ev
);
/* A ctdb read is only started when the channel has a ctdb channel. */
261 if (channel
->ctdb_channel
!= NULL
) {
262 struct tevent_req
*subreq
;
264 subreq
= ctdb_msg_read_send(state
, ev
,
265 channel
->ctdb_channel
);
266 if (tevent_req_nomem(subreq
, req
)) {
267 return tevent_req_post(req
, ev
);
269 tevent_req_set_callback(subreq
, msg_read_got_ctdb
, req
);
/*
 * talloc destructor of struct msg_read_state: clears the pending_req
 * pointer of the channel this read belongs to.
 * NOTE(review): the braces and the return statement are not visible in
 * this listing.
 */
274 static int msg_read_state_destructor(struct msg_read_state
*s
)
276 s
->channel
->pending_req
= NULL
;
/*
 * Completion callback of the ctdb_msg_read_send subrequest.
 *
 * Receives the raw blob from ctdb, unmarshalls it into state->rec with
 * ndr_pull_messaging_rec and completes the read request when the
 * message type matches the one of the channel. A message of another
 * type causes a new ctdb read to be issued and the request to be
 * recorded as pending again.
 *
 * NOTE(review): declarations of blob and ret, braces and the return
 * statements after the error and success paths are not visible in this
 * listing.
 */
280 static void msg_read_got_ctdb(struct tevent_req
*subreq
)
282 struct tevent_req
*req
= tevent_req_callback_data(
283 subreq
, struct tevent_req
);
284 struct msg_read_state
*state
= tevent_req_data(
285 req
, struct msg_read_state
);
287 enum ndr_err_code ndr_err
;
/* This read is no longer waiting on the channel. */
290 state
->channel
->pending_req
= NULL
;
/* The raw message buffer is received on talloc_tos(). */
292 ret
= ctdb_msg_read_recv(subreq
, talloc_tos(),
293 &blob
.data
, &blob
.length
);
295 if (tevent_req_error(req
, ret
)) {
/* The record is owned by the request state. */
299 state
->rec
= talloc(state
, struct messaging_rec
);
300 if (tevent_req_nomem(state
->rec
, req
)) {
/* state->rec is both the talloc context and the pull target. */
304 ndr_err
= ndr_pull_struct_blob(
305 &blob
, state
->rec
, state
->rec
,
306 (ndr_pull_flags_fn_t
)ndr_pull_messaging_rec
);
/* Unmarshalling failed: log at level 1, fail with the mapped errno. */
308 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) {
309 DEBUG(1, ("ndr_pull_struct_blob failed: %s\n",
310 ndr_errstr(ndr_err
)));
311 tevent_req_error(req
, ndr_map_error2errno(ndr_err
));
/* Dump the decoded record at debug level 10 and above. */
314 if (DEBUGLEVEL
>= 10) {
315 NDR_PRINT_DEBUG(messaging_rec
, state
->rec
);
/* Only a message of the type of the channel completes the read. */
317 if (state
->rec
->msg_type
== state
->channel
->msg_type
) {
318 tevent_req_done(req
);
322 * Got some unexpected msg type, wait for the next one
/* Re-issue the ctdb read on the event context kept in the state. */
324 subreq
= ctdb_msg_read_send(state
, state
->ev
,
325 state
->channel
->ctdb_channel
);
326 if (tevent_req_nomem(subreq
, req
)) {
329 tevent_req_set_callback(subreq
, msg_read_got_ctdb
, req
);
/* Back to waiting: record this request as the pending reader again. */
330 state
->channel
->pending_req
= req
;
/*
 * Receive the result of msg_read_send.
 *
 * req:     the request created by msg_read_send
 * mem_ctx: talloc context that becomes the new parent of the record
 * prec:    receives the record on success
 *
 * The unix error state of the request is checked first; if there is no
 * error the record is moved out of the request state to mem_ctx and
 * tevent_req_received is called on the request.
 * NOTE(review): the declaration of err, the return statements and the
 * end of the function are not visible in this listing.
 */
333 int msg_read_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
334 struct messaging_rec
**prec
)
336 struct msg_read_state
*state
= tevent_req_data(
337 req
, struct msg_read_state
);
340 if (tevent_req_is_unix_error(req
, &err
)) {
/* talloc_move reparents the record and clears state->rec. */
343 *prec
= talloc_move(mem_ctx
, &state
->rec
);
344 tevent_req_received(req
);