s3: smbd: Locking - re-add pending lock records if we fail to acquire a lock (and...
[Samba.git] / source3 / lib / msg_channel.c
blob55d102ab3294cc5b320563108564e0ae56efae92
1 /*
2 Unix SMB/CIFS implementation.
3 Samba3 message channels
4 Copyright (C) Volker Lendecke 2012
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #include "msg_channel.h"
22 #include "ctdb_conn.h"
23 #include "lib/util/tevent_unix.h"
25 struct msg_channel {
26 struct ctdb_msg_channel *ctdb_channel;
27 struct messaging_context *msg;
28 uint32_t msg_type;
30 struct tevent_req *pending_req;
31 struct tevent_context *ev;
33 struct messaging_rec **msgs;
36 struct msg_channel_init_state {
37 struct msg_channel *channel;
40 static void msg_channel_init_got_ctdb(struct tevent_req *subreq);
41 static void msg_channel_init_got_msg(struct messaging_context *msg,
42 void *priv, uint32_t msg_type,
43 struct server_id server_id, DATA_BLOB *data);
44 static int msg_channel_destructor(struct msg_channel *s);
46 struct tevent_req *msg_channel_init_send(TALLOC_CTX *mem_ctx,
47 struct tevent_context *ev,
48 struct messaging_context *msg,
49 uint32_t msg_type)
51 struct tevent_req *req, *subreq;
52 struct msg_channel_init_state *state;
53 struct server_id pid;
55 req = tevent_req_create(mem_ctx, &state,
56 struct msg_channel_init_state);
57 if (req == NULL) {
58 return NULL;
61 state->channel = talloc_zero(state, struct msg_channel);
62 if (tevent_req_nomem(state->channel, req)) {
63 return tevent_req_post(req, ev);
65 state->channel->msg = msg;
66 state->channel->msg_type = msg_type;
68 pid = messaging_server_id(msg);
69 subreq = ctdb_msg_channel_init_send(state, ev, lp_ctdbd_socket(),
70 pid.pid);
71 if (tevent_req_nomem(subreq, req)) {
72 return tevent_req_post(req, ev);
74 tevent_req_set_callback(subreq, msg_channel_init_got_ctdb, req);
75 return req;
78 static void msg_channel_init_got_ctdb(struct tevent_req *subreq)
80 struct tevent_req *req = tevent_req_callback_data(
81 subreq, struct tevent_req);
82 struct msg_channel_init_state *state = tevent_req_data(
83 req, struct msg_channel_init_state);
84 struct msg_channel *s = state->channel;
85 NTSTATUS status;
86 int ret;
88 ret = ctdb_msg_channel_init_recv(subreq, s, &s->ctdb_channel);
89 TALLOC_FREE(subreq);
91 if (ret == ENOSYS) {
92 s->ctdb_channel = NULL;
93 ret = 0;
96 if (tevent_req_error(req, ret)) {
97 return;
99 status = messaging_register(s->msg, s, s->msg_type,
100 msg_channel_init_got_msg);
101 if (!NT_STATUS_IS_OK(status)) {
102 tevent_req_error(req, map_errno_from_nt_status(status));
103 return;
105 talloc_set_destructor(s, msg_channel_destructor);
106 tevent_req_done(req);
109 static int msg_channel_destructor(struct msg_channel *s)
111 messaging_deregister(s->msg, s->msg_type, s);
112 return 0;
115 int msg_channel_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
116 struct msg_channel **pchannel)
118 struct msg_channel_init_state *state = tevent_req_data(
119 req, struct msg_channel_init_state);
120 int err;
122 if (tevent_req_is_unix_error(req, &err)) {
123 return err;
125 *pchannel = talloc_move(mem_ctx, &state->channel);
126 return 0;
129 int msg_channel_init(TALLOC_CTX *mem_ctx, struct messaging_context *msg,
130 uint32_t msgtype, struct msg_channel **pchannel)
132 TALLOC_CTX *frame = talloc_stackframe();
133 struct tevent_context *ev;
134 struct tevent_req *req;
135 int err = ENOMEM;
136 bool ok;
138 ev = samba_tevent_context_init(frame);
139 if (ev == NULL) {
140 goto fail;
142 req = msg_channel_init_send(frame, ev, msg, msgtype);
143 if (req == NULL) {
144 goto fail;
146 ok = tevent_req_poll(req, ev);
147 if (!ok) {
148 err = errno;
149 goto fail;
151 err = msg_channel_init_recv(req, mem_ctx, pchannel);
152 fail:
153 TALLOC_FREE(frame);
154 return err;
157 struct msg_read_state {
158 struct tevent_context *ev;
159 struct msg_channel *channel;
160 struct messaging_rec *rec;
163 static void msg_channel_init_got_msg(struct messaging_context *msg,
164 void *priv, uint32_t msg_type,
165 struct server_id server_id,
166 DATA_BLOB *data)
168 struct msg_channel *s = talloc_get_type_abort(
169 priv, struct msg_channel);
170 struct messaging_rec *rec;
171 struct messaging_rec **msgs;
172 size_t num_msgs;
174 rec = talloc(s, struct messaging_rec);
175 if (rec == NULL) {
176 goto fail;
178 rec->msg_version = 1;
179 rec->msg_type = msg_type;
180 rec->dest = server_id;
181 rec->src = messaging_server_id(msg);
182 rec->buf.data = (uint8_t *)talloc_memdup(rec, data->data,
183 data->length);
184 if (rec->buf.data == NULL) {
185 goto fail;
187 rec->buf.length = data->length;
189 if (s->pending_req != NULL) {
190 struct tevent_req *req = s->pending_req;
191 struct msg_read_state *state = tevent_req_data(
192 req, struct msg_read_state);
194 s->pending_req = NULL;
196 state->rec = talloc_move(state, &rec);
197 tevent_req_defer_callback(req, s->ev);
198 tevent_req_done(req);
199 return;
202 num_msgs = talloc_array_length(s->msgs);
203 msgs = talloc_realloc(s, s->msgs, struct messaging_rec *, num_msgs+1);
204 if (msgs == NULL) {
205 goto fail;
207 s->msgs = msgs;
208 s->msgs[num_msgs] = talloc_move(s->msgs, &rec);
210 return;
211 fail:
212 TALLOC_FREE(rec);
215 static void msg_read_got_ctdb(struct tevent_req *subreq);
216 static int msg_read_state_destructor(struct msg_read_state *s);
218 struct tevent_req *msg_read_send(TALLOC_CTX *mem_ctx,
219 struct tevent_context *ev,
220 struct msg_channel *channel)
222 struct tevent_req *req;
223 struct msg_read_state *state;
224 void *msg_tdb_event;
225 size_t num_msgs;
227 req = tevent_req_create(mem_ctx, &state, struct msg_read_state);
228 if (req == NULL) {
229 return NULL;
231 state->ev = ev;
232 state->channel = channel;
234 if (channel->pending_req != NULL) {
235 tevent_req_error(req, EBUSY);
236 return tevent_req_post(req, ev);
239 num_msgs = talloc_array_length(channel->msgs);
240 if (num_msgs != 0) {
241 state->rec = talloc_move(state, &channel->msgs[0]);
242 memmove(channel->msgs, channel->msgs+1,
243 sizeof(struct messaging_rec *) * (num_msgs-1));
244 channel->msgs = talloc_realloc(
245 channel, channel->msgs, struct messaging_rec *,
246 num_msgs - 1);
247 tevent_req_done(req);
248 return tevent_req_post(req, ev);
251 channel->pending_req = req;
252 talloc_set_destructor(state, msg_read_state_destructor);
254 channel->ev = ev;
256 msg_tdb_event = messaging_tdb_event(state, channel->msg, ev);
257 if (tevent_req_nomem(msg_tdb_event, req)) {
258 return tevent_req_post(req, ev);
261 if (channel->ctdb_channel != NULL) {
262 struct tevent_req *subreq;
264 subreq = ctdb_msg_read_send(state, ev,
265 channel->ctdb_channel);
266 if (tevent_req_nomem(subreq, req)) {
267 return tevent_req_post(req, ev);
269 tevent_req_set_callback(subreq, msg_read_got_ctdb, req);
271 return req;
274 static int msg_read_state_destructor(struct msg_read_state *s)
276 s->channel->pending_req = NULL;
277 return 0;
280 static void msg_read_got_ctdb(struct tevent_req *subreq)
282 struct tevent_req *req = tevent_req_callback_data(
283 subreq, struct tevent_req);
284 struct msg_read_state *state = tevent_req_data(
285 req, struct msg_read_state);
286 DATA_BLOB blob;
287 enum ndr_err_code ndr_err;
288 int ret;
290 state->channel->pending_req = NULL;
292 ret = ctdb_msg_read_recv(subreq, talloc_tos(),
293 &blob.data, &blob.length);
294 TALLOC_FREE(subreq);
295 if (tevent_req_error(req, ret)) {
296 return;
299 state->rec = talloc(state, struct messaging_rec);
300 if (tevent_req_nomem(state->rec, req)) {
301 return;
304 ndr_err = ndr_pull_struct_blob(
305 &blob, state->rec, state->rec,
306 (ndr_pull_flags_fn_t)ndr_pull_messaging_rec);
308 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
309 DEBUG(1, ("ndr_pull_struct_blob failed: %s\n",
310 ndr_errstr(ndr_err)));
311 tevent_req_error(req, ndr_map_error2errno(ndr_err));
312 return;
314 if (DEBUGLEVEL >= 10) {
315 NDR_PRINT_DEBUG(messaging_rec, state->rec);
317 if (state->rec->msg_type == state->channel->msg_type) {
318 tevent_req_done(req);
319 return;
322 * Got some unexpected msg type, wait for the next one
324 subreq = ctdb_msg_read_send(state, state->ev,
325 state->channel->ctdb_channel);
326 if (tevent_req_nomem(subreq, req)) {
327 return;
329 tevent_req_set_callback(subreq, msg_read_got_ctdb, req);
330 state->channel->pending_req = req;
333 int msg_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
334 struct messaging_rec **prec)
336 struct msg_read_state *state = tevent_req_data(
337 req, struct msg_read_state);
338 int err;
340 if (tevent_req_is_unix_error(req, &err)) {
341 return err;
343 *prec = talloc_move(mem_ctx, &state->rec);
344 tevent_req_received(req);
345 return 0;