2 Unix SMB/CIFS implementation.
3 Samba3 message channels
4 Copyright (C) Volker Lendecke 2012
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "msg_channel.h"
22 #include "ctdb_conn.h"
23 #include "lib/util/tevent_unix.h"
/*
 * Members that the rest of this file accesses through
 * "struct msg_channel" pointers: the ctdb channel, the messaging
 * context, the read request currently waiting for a message, the
 * tevent context used when deferring that request's callback, and a
 * talloc array of queued messaging_rec pointers.
 *
 * NOTE(review): the opening "struct msg_channel {" line, the closing
 * brace and the msg_type member (used elsewhere as s->msg_type) are not
 * visible in this listing -- confirm against the original file.
 */
26 struct ctdb_msg_channel
*ctdb_channel
;
27 struct messaging_context
*msg
;
/* Set by msg_read_send() while a read is waiting for a message. */
30 struct tevent_req
*pending_req
;
31 struct tevent_context
*ev
;
/* Messages received while no read request was pending. */
33 struct messaging_rec
**msgs
;
/*
 * Request state for msg_channel_init_send(). It holds the channel while
 * it is being set up; msg_channel_init_recv() moves it out to the
 * caller with talloc_move().
 *
 * NOTE(review): the closing brace of this struct is not visible in this
 * listing.
 */
36 struct msg_channel_init_state
{
37 struct msg_channel
*channel
;
/*
 * Forward declarations of file-local helpers defined further down:
 *  - msg_channel_init_got_ctdb: callback set on the ctdb channel init
 *    subrequest,
 *  - msg_channel_init_got_msg: handler passed to messaging_register(),
 *  - msg_channel_destructor: installed with talloc_set_destructor().
 */
40 static void msg_channel_init_got_ctdb(struct tevent_req
*subreq
);
41 static void msg_channel_init_got_msg(struct messaging_context
*msg
,
42 void *priv
, uint32_t msg_type
,
43 struct server_id server_id
, DATA_BLOB
*data
);
44 static int msg_channel_destructor(struct msg_channel
*s
);
/*
 * Start the asynchronous setup of a message channel.
 *
 * Creates a tevent request carrying a msg_channel_init_state, allocates
 * a zeroed struct msg_channel on that state, stores the messaging
 * context and the message type in it and then starts
 * ctdb_msg_channel_init_send() with the socket path from
 * lp_ctdbd_socket(). msg_channel_init_got_ctdb() is set as the
 * subrequest callback with req as its callback data.
 *
 * Allocation failures are detected with tevent_req_nomem(); in that
 * case the request is returned through tevent_req_post(req, ev).
 *
 * NOTE(review): this listing has gaps. The msg_type parameter, the
 * declaration of pid, the last argument of ctdb_msg_channel_init_send()
 * and the final return are used or expected but not visible here --
 * confirm against the original file.
 */
46 struct tevent_req
*msg_channel_init_send(TALLOC_CTX
*mem_ctx
,
47 struct tevent_context
*ev
,
48 struct messaging_context
*msg
,
51 struct tevent_req
*req
, *subreq
;
52 struct msg_channel_init_state
*state
;
55 req
= tevent_req_create(mem_ctx
, &state
,
56 struct msg_channel_init_state
);
/* The channel is allocated on the request state until _recv() moves it. */
61 state
->channel
= talloc_zero(state
, struct msg_channel
);
62 if (tevent_req_nomem(state
->channel
, req
)) {
63 return tevent_req_post(req
, ev
);
65 state
->channel
->msg
= msg
;
66 state
->channel
->msg_type
= msg_type
;
68 pid
= messaging_server_id(msg
);
69 subreq
= ctdb_msg_channel_init_send(state
, ev
, lp_ctdbd_socket(),
71 if (tevent_req_nomem(subreq
, req
)) {
72 return tevent_req_post(req
, ev
);
74 tevent_req_set_callback(subreq
, msg_channel_init_got_ctdb
, req
);
/*
 * Callback for the ctdb_msg_channel_init_send() subrequest started by
 * msg_channel_init_send().
 *
 * Fetches the result with ctdb_msg_channel_init_recv(), which fills in
 * s->ctdb_channel. A non-zero ret is passed to tevent_req_error().
 * Afterwards msg_channel_init_got_msg() is registered on the messaging
 * context for s->msg_type, with the channel itself as the private
 * pointer. An NTSTATUS failure from messaging_register() is converted
 * with map_errno_from_nt_status() and reported on req. Finally the
 * channel gets msg_channel_destructor() as talloc destructor and the
 * request is marked done.
 *
 * NOTE(review): the declarations of ret and status, the condition under
 * which s->ctdb_channel is reset to NULL, and the early returns after
 * the error calls are not visible in this listing -- confirm against
 * the original file.
 */
78 static void msg_channel_init_got_ctdb(struct tevent_req
*subreq
)
80 struct tevent_req
*req
= tevent_req_callback_data(
81 subreq
, struct tevent_req
);
82 struct msg_channel_init_state
*state
= tevent_req_data(
83 req
, struct msg_channel_init_state
);
84 struct msg_channel
*s
= state
->channel
;
88 ret
= ctdb_msg_channel_init_recv(subreq
, s
, &s
->ctdb_channel
);
92 s
->ctdb_channel
= NULL
;
96 if (tevent_req_error(req
, ret
)) {
99 status
= messaging_register(s
->msg
, s
, s
->msg_type
,
100 msg_channel_init_got_msg
);
101 if (!NT_STATUS_IS_OK(status
)) {
102 tevent_req_error(req
, map_errno_from_nt_status(status
));
/* From here on freeing the channel also runs msg_channel_destructor(). */
105 talloc_set_destructor(s
, msg_channel_destructor
);
106 tevent_req_done(req
);
/*
 * talloc destructor installed on the channel by
 * msg_channel_init_got_ctdb(): calls messaging_deregister() for the
 * channel's message type, passing the channel as the private pointer
 * that was given to messaging_register().
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
109 static int msg_channel_destructor(struct msg_channel
*s
)
111 messaging_deregister(s
->msg
, s
->msg_type
, s
);
/*
 * Collect the result of a request created by msg_channel_init_send().
 *
 * The request is checked with tevent_req_is_unix_error(), which fills
 * in err. On the non-error path the channel is moved from the request
 * state onto mem_ctx and stored in *pchannel.
 *
 * NOTE(review): the declaration of err and both return statements are
 * not visible in this listing -- confirm against the original file.
 */
115 int msg_channel_init_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
116 struct msg_channel
**pchannel
)
118 struct msg_channel_init_state
*state
= tevent_req_data(
119 req
, struct msg_channel_init_state
);
122 if (tevent_req_is_unix_error(req
, &err
)) {
125 *pchannel
= talloc_move(mem_ctx
, &state
->channel
);
/*
 * Synchronous wrapper around msg_channel_init_send() and
 * msg_channel_init_recv().
 *
 * Opens a talloc stackframe, creates a tevent context on it with
 * samba_tevent_context_init(), starts the init request on that frame,
 * drives it with tevent_req_poll() and stores the result of
 * msg_channel_init_recv() in err. The channel is delivered on mem_ctx
 * via *pchannel.
 *
 * NOTE(review): the declarations of err and ok, the checks on ev, req
 * and ok, the release of the stackframe and the return statement are
 * not visible in this listing -- confirm against the original file.
 */
129 int msg_channel_init(TALLOC_CTX
*mem_ctx
, struct messaging_context
*msg
,
130 uint32_t msgtype
, struct msg_channel
**pchannel
)
132 TALLOC_CTX
*frame
= talloc_stackframe();
133 struct tevent_context
*ev
;
134 struct tevent_req
*req
;
138 ev
= samba_tevent_context_init(frame
);
142 req
= msg_channel_init_send(frame
, ev
, msg
, msgtype
);
146 ok
= tevent_req_poll(req
, ev
);
151 err
= msg_channel_init_recv(req
, mem_ctx
, pchannel
);
/*
 * Request state for msg_read_send(): the tevent context used to issue
 * follow-up ctdb reads, the channel being read from, and the message
 * record that msg_read_recv() hands to the caller.
 *
 * NOTE(review): the closing brace of this struct is not visible in this
 * listing.
 */
157 struct msg_read_state
{
158 struct tevent_context
*ev
;
159 struct msg_channel
*channel
;
160 struct messaging_rec
*rec
;
/*
 * Message handler registered with messaging_register() in
 * msg_channel_init_got_ctdb(). priv is the struct msg_channel that was
 * passed at registration time.
 *
 * Allocates a messaging_rec on the channel, sets msg_version to 1 and
 * msg_type to the incoming type, and copies the payload with
 * talloc_memdup(). If a read request is pending on the channel, the
 * record is moved into that request's state, its callback is deferred
 * onto s->ev and the request is completed. Otherwise the record is
 * appended to s->msgs, which is grown by one slot with talloc_realloc().
 *
 * NOTE(review): rec->dest is taken from the handler's server_id
 * argument and rec->src from messaging_server_id(msg); confirm this is
 * the intended direction.
 * NOTE(review): the "DATA_BLOB *data" parameter line, the declaration
 * of num_msgs, the length argument of talloc_memdup(), the handling of
 * failed allocations, the return after completing a pending request and
 * the assignment of the reallocated array are not visible in this
 * listing -- confirm against the original file.
 */
163 static void msg_channel_init_got_msg(struct messaging_context
*msg
,
164 void *priv
, uint32_t msg_type
,
165 struct server_id server_id
,
168 struct msg_channel
*s
= talloc_get_type_abort(
169 priv
, struct msg_channel
);
170 struct messaging_rec
*rec
;
171 struct messaging_rec
**msgs
;
174 rec
= talloc(s
, struct messaging_rec
);
178 rec
->msg_version
= 1;
179 rec
->msg_type
= msg_type
;
180 rec
->dest
= server_id
;
181 rec
->src
= messaging_server_id(msg
);
182 rec
->buf
.data
= (uint8_t *)talloc_memdup(rec
, data
->data
,
184 if (rec
->buf
.data
== NULL
) {
187 rec
->buf
.length
= data
->length
;
/* A reader is waiting: give it the record directly instead of queueing. */
189 if (s
->pending_req
!= NULL
) {
190 struct tevent_req
*req
= s
->pending_req
;
191 struct msg_read_state
*state
= tevent_req_data(
192 req
, struct msg_read_state
);
194 s
->pending_req
= NULL
;
196 state
->rec
= talloc_move(state
, &rec
);
197 tevent_req_defer_callback(req
, s
->ev
);
198 tevent_req_done(req
);
/* Queue path: grow the array by one and store the record in the new slot. */
202 num_msgs
= talloc_array_length(s
->msgs
);
203 msgs
= talloc_realloc(s
, s
->msgs
, struct messaging_rec
*, num_msgs
+1);
208 s
->msgs
[num_msgs
] = talloc_move(s
->msgs
, &rec
);
/* Forward declaration: callback set on the ctdb_msg_read_send() subrequests. */
215 static void msg_read_got_ctdb(struct tevent_req
*subreq
);
/*
 * Start an asynchronous read of one message from a channel.
 *
 * If another read is already pending on the channel, the request fails
 * with EBUSY. In the queued-message path the first entry of
 * channel->msgs is moved into the request state, the remaining pointers
 * are shifted down with memmove(), the array is reallocated and the
 * request completes immediately. Otherwise the request is recorded as
 * channel->pending_req, messaging_tdb_event() is called with the
 * request state as talloc context, and, if the channel has a ctdb
 * channel, a ctdb_msg_read_send() subrequest is started with
 * msg_read_got_ctdb() as callback.
 *
 * NOTE(review): the declarations of num_msgs and msg_tdb_event, the
 * condition guarding the queued-message path, the new length passed to
 * talloc_realloc() and the final return are not visible in this listing
 * -- confirm against the original file.
 */
217 struct tevent_req
*msg_read_send(TALLOC_CTX
*mem_ctx
,
218 struct tevent_context
*ev
,
219 struct msg_channel
*channel
)
221 struct tevent_req
*req
;
222 struct msg_read_state
*state
;
226 req
= tevent_req_create(mem_ctx
, &state
, struct msg_read_state
);
231 state
->channel
= channel
;
/* Only one outstanding read per channel is accepted. */
233 if (channel
->pending_req
!= NULL
) {
234 tevent_req_error(req
, EBUSY
);
235 return tevent_req_post(req
, ev
);
238 num_msgs
= talloc_array_length(channel
->msgs
);
/* Take the oldest queued record and close the gap at index 0. */
240 state
->rec
= talloc_move(state
, &channel
->msgs
[0]);
241 memmove(channel
->msgs
, channel
->msgs
+1,
242 sizeof(struct messaging_rec
*) * (num_msgs
-1));
243 channel
->msgs
= talloc_realloc(
244 channel
, channel
->msgs
, struct messaging_rec
*,
246 tevent_req_done(req
);
247 return tevent_req_post(req
, ev
);
/* msg_channel_init_got_msg() completes this request when a message arrives. */
250 channel
->pending_req
= req
;
253 msg_tdb_event
= messaging_tdb_event(state
, channel
->msg
, ev
);
254 if (tevent_req_nomem(msg_tdb_event
, req
)) {
255 return tevent_req_post(req
, ev
);
258 if (channel
->ctdb_channel
!= NULL
) {
259 struct tevent_req
*subreq
;
261 subreq
= ctdb_msg_read_send(state
, ev
,
262 channel
->ctdb_channel
);
263 if (tevent_req_nomem(subreq
, req
)) {
264 return tevent_req_post(req
, ev
);
266 tevent_req_set_callback(subreq
, msg_read_got_ctdb
, req
);
/*
 * Callback for a ctdb_msg_read_send() subrequest.
 *
 * Receives the raw message into blob (allocated on talloc_tos()),
 * allocates state->rec and unmarshals the blob into it with
 * ndr_pull_struct_blob() / ndr_pull_messaging_rec. An NDR failure is
 * logged at debug level 1 and reported on req via ndr_map_error2errno().
 * At DEBUGLEVEL >= 10 the record is printed with NDR_PRINT_DEBUG.
 *
 * If the record's msg_type equals the channel's msg_type the request is
 * completed. Otherwise the record is freed and another
 * ctdb_msg_read_send() is issued with this function as its callback.
 *
 * NOTE(review): the declarations of blob and ret, the early returns
 * after the error/done calls and the comment delimiters around the
 * "unexpected msg type" remark are not visible in this listing --
 * confirm against the original file.
 */
271 static void msg_read_got_ctdb(struct tevent_req
*subreq
)
273 struct tevent_req
*req
= tevent_req_callback_data(
274 subreq
, struct tevent_req
);
275 struct msg_read_state
*state
= tevent_req_data(
276 req
, struct msg_read_state
);
278 enum ndr_err_code ndr_err
;
281 ret
= ctdb_msg_read_recv(subreq
, talloc_tos(),
282 &blob
.data
, &blob
.length
);
284 if (tevent_req_error(req
, ret
)) {
288 state
->rec
= talloc(state
, struct messaging_rec
);
289 if (tevent_req_nomem(state
->rec
, req
)) {
293 ndr_err
= ndr_pull_struct_blob(
294 &blob
, state
->rec
, state
->rec
,
295 (ndr_pull_flags_fn_t
)ndr_pull_messaging_rec
);
297 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) {
298 DEBUG(1, ("ndr_pull_struct_blob failed: %s\n",
299 ndr_errstr(ndr_err
)));
300 tevent_req_error(req
, ndr_map_error2errno(ndr_err
));
303 if (DEBUGLEVEL
>= 10) {
304 NDR_PRINT_DEBUG(messaging_rec
, state
->rec
);
/* Only a record of the channel's own message type completes the read. */
306 if (state
->rec
->msg_type
== state
->channel
->msg_type
) {
307 tevent_req_done(req
);
311 * Got some unexpected msg type, wait for the next one
314 TALLOC_FREE(state
->rec
);
316 subreq
= ctdb_msg_read_send(state
, state
->ev
,
317 state
->channel
->ctdb_channel
);
318 if (tevent_req_nomem(subreq
, req
)) {
321 tevent_req_set_callback(subreq
, msg_read_got_ctdb
, req
);
/*
 * Collect the result of a request created by msg_read_send().
 *
 * The request is checked with tevent_req_is_unix_error(), which fills
 * in err. On the non-error path the received record is moved from the
 * request state onto mem_ctx and stored in *prec, after which
 * tevent_req_received() is called on the request.
 *
 * NOTE(review): the declaration of err and the return statements are
 * not visible in this listing -- confirm against the original file.
 */
324 int msg_read_recv(struct tevent_req
*req
, TALLOC_CTX
*mem_ctx
,
325 struct messaging_rec
**prec
)
327 struct msg_read_state
*state
= tevent_req_data(
328 req
, struct msg_read_state
);
331 if (tevent_req_is_unix_error(req
, &err
)) {
334 *prec
= talloc_move(mem_ctx
, &state
->rec
);
335 tevent_req_received(req
);