shadow_copy2: implement disk_free
[Samba.git] / source3 / lib / msg_channel.c
blob5f6ddeb4afedea70b94bcaf7b05760bdf95c870e
/*
   Unix SMB/CIFS implementation.
   Samba3 message channels
   Copyright (C) Volker Lendecke 2012

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20 #include "includes.h"
21 #include "msg_channel.h"
22 #include "ctdb_conn.h"
23 #include "lib/util/tevent_unix.h"
/*
 * A message channel: delivers messages of one message type, taken from
 * the local messaging context and, when a ctdb channel exists, from ctdb.
 */
struct msg_channel {
	/* ctdb side of the channel; NULL when ctdb init reported ENOSYS */
	struct ctdb_msg_channel *ctdb_channel;
	/* messaging context the channel registers its handler with */
	struct messaging_context *msg;
	/* the only message type this channel hands out */
	uint32_t msg_type;

	/* the msg_read request currently waiting for a message, or NULL */
	struct tevent_req *pending_req;
	/* event context used to defer the completion of pending_req */
	struct tevent_context *ev;

	/* talloc array of messages that arrived while nobody was reading */
	struct messaging_rec **msgs;
};

/* Per-request state of msg_channel_init_send(). */
struct msg_channel_init_state {
	/* channel being set up; moved to the caller by msg_channel_init_recv() */
	struct msg_channel *channel;
};

40 static void msg_channel_init_got_ctdb(struct tevent_req *subreq);
41 static void msg_channel_init_got_msg(struct messaging_context *msg,
42 void *priv, uint32_t msg_type,
43 struct server_id server_id, DATA_BLOB *data);
44 static int msg_channel_destructor(struct msg_channel *s);
46 struct tevent_req *msg_channel_init_send(TALLOC_CTX *mem_ctx,
47 struct tevent_context *ev,
48 struct messaging_context *msg,
49 uint32_t msg_type)
51 struct tevent_req *req, *subreq;
52 struct msg_channel_init_state *state;
53 struct server_id pid;
55 req = tevent_req_create(mem_ctx, &state,
56 struct msg_channel_init_state);
57 if (req == NULL) {
58 return NULL;
61 state->channel = talloc_zero(state, struct msg_channel);
62 if (tevent_req_nomem(state->channel, req)) {
63 return tevent_req_post(req, ev);
65 state->channel->msg = msg;
66 state->channel->msg_type = msg_type;
68 pid = messaging_server_id(msg);
69 subreq = ctdb_msg_channel_init_send(state, ev, lp_ctdbd_socket(),
70 pid.pid);
71 if (tevent_req_nomem(subreq, req)) {
72 return tevent_req_post(req, ev);
74 tevent_req_set_callback(subreq, msg_channel_init_got_ctdb, req);
75 return req;
78 static void msg_channel_init_got_ctdb(struct tevent_req *subreq)
80 struct tevent_req *req = tevent_req_callback_data(
81 subreq, struct tevent_req);
82 struct msg_channel_init_state *state = tevent_req_data(
83 req, struct msg_channel_init_state);
84 struct msg_channel *s = state->channel;
85 NTSTATUS status;
86 int ret;
88 ret = ctdb_msg_channel_init_recv(subreq, s, &s->ctdb_channel);
89 TALLOC_FREE(subreq);
91 if (ret == ENOSYS) {
92 s->ctdb_channel = NULL;
93 ret = 0;
96 if (tevent_req_error(req, ret)) {
97 return;
99 status = messaging_register(s->msg, s, s->msg_type,
100 msg_channel_init_got_msg);
101 if (!NT_STATUS_IS_OK(status)) {
102 tevent_req_error(req, map_errno_from_nt_status(status));
103 return;
105 talloc_set_destructor(s, msg_channel_destructor);
106 tevent_req_done(req);
109 static int msg_channel_destructor(struct msg_channel *s)
111 messaging_deregister(s->msg, s->msg_type, s);
112 return 0;
115 int msg_channel_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
116 struct msg_channel **pchannel)
118 struct msg_channel_init_state *state = tevent_req_data(
119 req, struct msg_channel_init_state);
120 int err;
122 if (tevent_req_is_unix_error(req, &err)) {
123 return err;
125 *pchannel = talloc_move(mem_ctx, &state->channel);
126 return 0;
129 int msg_channel_init(TALLOC_CTX *mem_ctx, struct messaging_context *msg,
130 uint32_t msgtype, struct msg_channel **pchannel)
132 TALLOC_CTX *frame = talloc_stackframe();
133 struct tevent_context *ev;
134 struct tevent_req *req;
135 int err = ENOMEM;
136 bool ok;
138 ev = tevent_context_init(frame);
139 if (ev == NULL) {
140 goto fail;
142 req = msg_channel_init_send(frame, ev, msg, msgtype);
143 if (req == NULL) {
144 goto fail;
146 ok = tevent_req_poll(req, ev);
147 if (!ok) {
148 err = errno;
149 goto fail;
151 err = msg_channel_init_recv(req, mem_ctx, pchannel);
152 fail:
153 TALLOC_FREE(frame);
154 return err;
/* Per-request state of msg_read_send(). */
struct msg_read_state {
	/* event context the read was started on; reused for ctdb re-reads */
	struct tevent_context *ev;
	/* channel the request reads from */
	struct msg_channel *channel;
	/* the received message; moved to the caller by msg_read_recv() */
	struct messaging_rec *rec;
};

163 static void msg_channel_init_got_msg(struct messaging_context *msg,
164 void *priv, uint32_t msg_type,
165 struct server_id server_id,
166 DATA_BLOB *data)
168 struct msg_channel *s = talloc_get_type_abort(
169 priv, struct msg_channel);
170 struct messaging_rec *rec;
171 struct messaging_rec **msgs;
172 size_t num_msgs;
174 rec = talloc(s, struct messaging_rec);
175 if (rec == NULL) {
176 goto fail;
178 rec->msg_version = 1;
179 rec->msg_type = msg_type;
180 rec->dest = server_id;
181 rec->src = messaging_server_id(msg);
182 rec->buf.data = (uint8_t *)talloc_memdup(rec, data->data,
183 data->length);
184 if (rec->buf.data == NULL) {
185 goto fail;
187 rec->buf.length = data->length;
189 if (s->pending_req != NULL) {
190 struct tevent_req *req = s->pending_req;
191 struct msg_read_state *state = tevent_req_data(
192 req, struct msg_read_state);
194 s->pending_req = NULL;
196 state->rec = talloc_move(state, &rec);
197 tevent_req_defer_callback(req, s->ev);
198 tevent_req_done(req);
199 return;
202 num_msgs = talloc_array_length(s->msgs);
203 msgs = talloc_realloc(s, s->msgs, struct messaging_rec *, num_msgs+1);
204 if (msgs == NULL) {
205 goto fail;
207 s->msgs = msgs;
208 s->msgs[num_msgs] = talloc_move(s->msgs, &rec);
210 return;
211 fail:
212 TALLOC_FREE(rec);
215 static void msg_read_got_ctdb(struct tevent_req *subreq);
217 struct tevent_req *msg_read_send(TALLOC_CTX *mem_ctx,
218 struct tevent_context *ev,
219 struct msg_channel *channel)
221 struct tevent_req *req;
222 struct msg_read_state *state;
223 void *msg_tdb_event;
224 size_t num_msgs;
226 req = tevent_req_create(mem_ctx, &state, struct msg_read_state);
227 if (req == NULL) {
228 return NULL;
230 state->ev = ev;
231 state->channel = channel;
233 if (channel->pending_req != NULL) {
234 tevent_req_error(req, EBUSY);
235 return tevent_req_post(req, ev);
238 num_msgs = talloc_array_length(channel->msgs);
239 if (num_msgs != 0) {
240 state->rec = talloc_move(state, &channel->msgs[0]);
241 memmove(channel->msgs, channel->msgs+1,
242 sizeof(struct messaging_rec *) * (num_msgs-1));
243 channel->msgs = talloc_realloc(
244 channel, channel->msgs, struct messaging_rec *,
245 num_msgs - 1);
246 tevent_req_done(req);
247 return tevent_req_post(req, ev);
250 channel->pending_req = req;
251 channel->ev = ev;
253 msg_tdb_event = messaging_tdb_event(state, channel->msg, ev);
254 if (tevent_req_nomem(msg_tdb_event, req)) {
255 return tevent_req_post(req, ev);
258 if (channel->ctdb_channel != NULL) {
259 struct tevent_req *subreq;
261 subreq = ctdb_msg_read_send(state, ev,
262 channel->ctdb_channel);
263 if (tevent_req_nomem(subreq, req)) {
264 return tevent_req_post(req, ev);
266 tevent_req_set_callback(subreq, msg_read_got_ctdb, req);
268 return req;
271 static void msg_read_got_ctdb(struct tevent_req *subreq)
273 struct tevent_req *req = tevent_req_callback_data(
274 subreq, struct tevent_req);
275 struct msg_read_state *state = tevent_req_data(
276 req, struct msg_read_state);
277 DATA_BLOB blob;
278 enum ndr_err_code ndr_err;
279 int ret;
281 ret = ctdb_msg_read_recv(subreq, talloc_tos(),
282 &blob.data, &blob.length);
283 TALLOC_FREE(subreq);
284 if (tevent_req_error(req, ret)) {
285 return;
288 state->rec = talloc(state, struct messaging_rec);
289 if (tevent_req_nomem(state->rec, req)) {
290 return;
293 ndr_err = ndr_pull_struct_blob(
294 &blob, state->rec, state->rec,
295 (ndr_pull_flags_fn_t)ndr_pull_messaging_rec);
297 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
298 DEBUG(1, ("ndr_pull_struct_blob failed: %s\n",
299 ndr_errstr(ndr_err)));
300 tevent_req_error(req, ndr_map_error2errno(ndr_err));
301 return;
303 if (DEBUGLEVEL >= 10) {
304 NDR_PRINT_DEBUG(messaging_rec, state->rec);
306 if (state->rec->msg_type == state->channel->msg_type) {
307 tevent_req_done(req);
308 return;
311 * Got some unexpected msg type, wait for the next one
313 subreq = ctdb_msg_read_send(state, state->ev,
314 state->channel->ctdb_channel);
315 if (tevent_req_nomem(subreq, req)) {
316 return;
318 tevent_req_set_callback(subreq, msg_read_got_ctdb, req);
321 int msg_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
322 struct messaging_rec **prec)
324 struct msg_read_state *state = tevent_req_data(
325 req, struct msg_read_state);
326 int err;
328 if (tevent_req_is_unix_error(req, &err)) {
329 return err;
331 *prec = talloc_move(mem_ctx, &state->rec);
332 tevent_req_received(req);
333 return 0;