/*
 * Extracted from a repository web view.
 * Page title:  "librpc: Shorten dcerpc_binding_handle_call a bit"
 * Repository:  Samba/gebeck_regimport.git
 * Path:        source3/lib/ctdb_conn.c
 * Blob:        90930eb86b0193890f7c3a82599e05694270c442
 */
/*
   Unix SMB/CIFS implementation.
   Samba3 ctdb connection handling
   Copyright (C) Volker Lendecke 2012

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
20 #include "includes.h"
21 #include "lib/util/tevent_unix.h"
22 #include "ctdb_conn.h"
24 #ifdef CLUSTER_SUPPORT
26 #include "lib/async_req/async_sock.h"
/* State of one socket connection to ctdb. */
struct ctdb_conn {
	/* Socket descriptor; closed by ctdb_conn_destructor(). */
	int fd;
	/* Queue passed to writev_send() for all outgoing packets. */
	struct tevent_queue *outqueue;
};

/* Per-request state of ctdb_conn_init_send(). */
struct ctdb_conn_init_state {
	/* Address handed to async_connect_send(). */
	struct sockaddr_un addr;
	/* Connection under construction; moved to the caller on success. */
	struct ctdb_conn *conn;
};

/*
 * use the callbacks of async_connect_send to make sure
 * we are connecting to CTDB as root
 */

/* Passed to async_connect_send() as the "before connect" hook. */
static void before_connect_cb(void *private_data)
{
	become_root();
}

/* Passed to async_connect_send() as the "after connect" hook. */
static void after_connect_cb(void *private_data)
{
	unbecome_root();
}

/* Helpers used by ctdb_conn_init_send(), defined after it. */
static int ctdb_conn_destructor(struct ctdb_conn *conn);
static void ctdb_conn_init_done(struct tevent_req *subreq);

53 struct tevent_req *ctdb_conn_init_send(TALLOC_CTX *mem_ctx,
54 struct tevent_context *ev,
55 const char *sock)
57 struct tevent_req *req, *subreq;
58 struct ctdb_conn_init_state *state;
60 req = tevent_req_create(mem_ctx, &state, struct ctdb_conn_init_state);
61 if (req == NULL) {
62 return NULL;
65 if (!lp_clustering()) {
66 tevent_req_error(req, ENOSYS);
67 return tevent_req_post(req, ev);
70 if (strlen(sock) >= sizeof(state->addr.sun_path)) {
71 tevent_req_error(req, ENAMETOOLONG);
72 return tevent_req_post(req, ev);
75 state->conn = talloc(state, struct ctdb_conn);
76 if (tevent_req_nomem(state->conn, req)) {
77 return tevent_req_post(req, ev);
80 state->conn->outqueue = tevent_queue_create(
81 state->conn, "ctdb outqueue");
82 if (tevent_req_nomem(state->conn->outqueue, req)) {
83 return tevent_req_post(req, ev);
86 state->conn->fd = socket(AF_UNIX, SOCK_STREAM, 0);
87 if (state->conn->fd == -1) {
88 tevent_req_error(req, errno);
89 return tevent_req_post(req, ev);
91 talloc_set_destructor(state->conn, ctdb_conn_destructor);
93 state->addr.sun_family = AF_UNIX;
94 strncpy(state->addr.sun_path, sock, sizeof(state->addr.sun_path));
96 subreq = async_connect_send(state, ev, state->conn->fd,
97 (struct sockaddr *)&state->addr,
98 sizeof(state->addr), before_connect_cb,
99 after_connect_cb, NULL);
100 if (tevent_req_nomem(subreq, req)) {
101 return tevent_req_post(req, ev);
103 tevent_req_set_callback(subreq, ctdb_conn_init_done, req);
104 return req;
107 static int ctdb_conn_destructor(struct ctdb_conn *c)
109 if (c->fd != -1) {
110 close(c->fd);
111 c->fd = -1;
113 return 0;
116 static void ctdb_conn_init_done(struct tevent_req *subreq)
118 struct tevent_req *req = tevent_req_callback_data(
119 subreq, struct tevent_req);
120 int ret, err;
122 ret = async_connect_recv(subreq, &err);
123 TALLOC_FREE(subreq);
124 if (ret == -1) {
125 tevent_req_error(req, err);
126 return;
128 tevent_req_done(req);
131 int ctdb_conn_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
132 struct ctdb_conn **pconn)
134 struct ctdb_conn_init_state *state = tevent_req_data(
135 req, struct ctdb_conn_init_state);
136 int err;
138 if (tevent_req_is_unix_error(req, &err)) {
139 return err;
141 *pconn = talloc_move(mem_ctx, &state->conn);
143 return 0;
146 struct ctdb_conn_control_state {
147 struct tevent_context *ev;
148 struct ctdb_conn *conn;
149 struct ctdb_req_control req;
150 struct iovec iov[2];
151 struct ctdb_reply_control *reply;
/* Helpers used by the control and message-read code, defined further down. */
static ssize_t ctdb_packet_more(uint8_t *buf, size_t buflen, void *p);
static void ctdb_conn_control_written(struct tevent_req *subreq);
static void ctdb_conn_control_done(struct tevent_req *subreq);

158 struct tevent_req *ctdb_conn_control_send(TALLOC_CTX *mem_ctx,
159 struct tevent_context *ev,
160 struct ctdb_conn *conn,
161 uint32_t vnn, uint32_t opcode,
162 uint64_t srvid, uint32_t flags,
163 uint8_t *data, size_t datalen)
165 struct tevent_req *req, *subreq;
166 struct ctdb_conn_control_state *state;
167 struct ctdb_req_header *hdr;
169 req = tevent_req_create(mem_ctx, &state,
170 struct ctdb_conn_control_state);
171 if (req == NULL) {
172 return NULL;
174 state->ev = ev;
175 state->conn = conn;
177 hdr = &state->req.hdr;
178 hdr->length = offsetof(struct ctdb_req_control, data) + datalen;
179 hdr->ctdb_magic = CTDB_MAGIC;
180 hdr->ctdb_version = CTDB_VERSION;
181 hdr->operation = CTDB_REQ_CONTROL;
182 hdr->reqid = 1; /* FIXME */
183 hdr->destnode = vnn;
184 state->req.opcode = opcode;
185 state->req.srvid = srvid;
186 state->req.datalen = datalen;
187 state->req.flags = flags;
189 state->iov[0].iov_base = &state->req;
190 state->iov[0].iov_len = offsetof(struct ctdb_req_control, data);
191 state->iov[1].iov_base = data;
192 state->iov[1].iov_len = datalen;
194 subreq = writev_send(state, ev, conn->outqueue, conn->fd, false,
195 state->iov, 2);
196 if (tevent_req_nomem(subreq, req)) {
197 return tevent_req_post(req, ev);
199 tevent_req_set_callback(subreq, ctdb_conn_control_written, req);
200 return req;
203 static void ctdb_conn_control_written(struct tevent_req *subreq)
205 struct tevent_req *req = tevent_req_callback_data(
206 subreq, struct tevent_req);
207 struct ctdb_conn_control_state *state = tevent_req_data(
208 req, struct ctdb_conn_control_state);
209 ssize_t written;
210 int err;
212 written = writev_recv(subreq, &err);
213 TALLOC_FREE(subreq);
214 if (written == -1) {
215 tevent_req_error(req, err);
216 return;
218 subreq = read_packet_send(
219 state, state->ev, state->conn->fd, sizeof(uint32_t),
220 ctdb_packet_more, NULL);
221 if (tevent_req_nomem(subreq, req)) {
222 return;
224 tevent_req_set_callback(subreq, ctdb_conn_control_done, req);
/*
 * "more" callback for read_packet_send(): tell the reader how many bytes
 * are still missing from the current packet.
 *
 * buf:    bytes received so far; starts with the 32-bit total packet length
 * buflen: number of bytes in buf
 * p:      unused
 *
 * Returns 0 once more than the length prefix has been read (the packet is
 * then complete), otherwise the number of bytes following the prefix.
 * A length prefix smaller than the prefix itself is malformed and yields
 * -1 instead of wrapping around to a huge value.
 * NOTE(review): read_packet is expected to treat -1 as "invalid packet" --
 * confirm against lib/async_req/async_sock.c.
 */
static ssize_t ctdb_packet_more(uint8_t *buf, size_t buflen, void *p)
{
	uint32_t len;

	if (buflen > sizeof(uint32_t)) {
		/* Been here, done */
		return 0;
	}

	/* buf is not necessarily aligned, so copy the prefix out. */
	memcpy(&len, buf, sizeof(len));

	if (len < sizeof(uint32_t)) {
		/* The total length cannot be shorter than its own prefix. */
		return -1;
	}

	return (len - sizeof(uint32_t));
}

239 static void ctdb_conn_control_done(struct tevent_req *subreq)
241 struct tevent_req *req = tevent_req_callback_data(
242 subreq, struct tevent_req);
243 struct ctdb_conn_control_state *state = tevent_req_data(
244 req, struct ctdb_conn_control_state);
245 ssize_t nread;
246 uint8_t *buf;
247 int err;
249 nread = read_packet_recv(subreq, state, &buf, &err);
250 TALLOC_FREE(subreq);
251 if (nread == -1) {
252 tevent_req_error(req, err);
253 return;
255 state->reply = (struct ctdb_reply_control *)buf;
256 tevent_req_done(req);
259 int ctdb_conn_control_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
260 struct ctdb_reply_control **preply)
262 struct ctdb_conn_control_state *state = tevent_req_data(
263 req, struct ctdb_conn_control_state);
264 int err;
266 if (tevent_req_is_unix_error(req, &err)) {
267 return err;
269 if (preply != NULL) {
270 *preply = talloc_move(mem_ctx, &state->reply);
272 return 0;
275 struct ctdb_conn_msg_write_state {
276 struct ctdb_req_message ctdb_msg;
277 struct iovec iov[2];
/* Completion callback of ctdb_conn_msg_write_send(), defined after it. */
static void ctdb_conn_msg_write_done(struct tevent_req *subreq);

282 struct tevent_req *ctdb_conn_msg_write_send(TALLOC_CTX *mem_ctx,
283 struct tevent_context *ev,
284 struct ctdb_conn *conn,
285 uint32_t vnn, uint64_t srvid,
286 uint8_t *msg, size_t msg_len)
288 struct tevent_req *req, *subreq;
289 struct ctdb_conn_msg_write_state *state;
290 struct ctdb_req_header *h;
292 req = tevent_req_create(mem_ctx, &state,
293 struct ctdb_conn_msg_write_state);
294 if (req == NULL) {
295 return NULL;
298 h = &state->ctdb_msg.hdr;
300 h->length = offsetof(struct ctdb_req_message, data) + msg_len;
301 h->ctdb_magic = CTDB_MAGIC;
302 h->ctdb_version = CTDB_VERSION;
303 h->generation = 1;
304 h->operation = CTDB_REQ_MESSAGE;
305 h->destnode = vnn;
306 h->srcnode = CTDB_CURRENT_NODE;
307 h->reqid = 0;
308 state->ctdb_msg.srvid = srvid;
309 state->ctdb_msg.datalen = msg_len;
311 state->iov[0].iov_base = &state->ctdb_msg;
312 state->iov[0].iov_len = offsetof(struct ctdb_req_message, data);
313 state->iov[1].iov_base = msg;
314 state->iov[1].iov_len = msg_len;
316 subreq = writev_send(state, ev, conn->outqueue, conn->fd, false,
317 state->iov, 2);
318 if (tevent_req_nomem(subreq, req)) {
319 return tevent_req_post(req, ev);
321 tevent_req_set_callback(subreq, ctdb_conn_msg_write_done, req);
322 return req;
325 static void ctdb_conn_msg_write_done(struct tevent_req *subreq)
327 struct tevent_req *req = tevent_req_callback_data(
328 subreq, struct tevent_req);
329 ssize_t written;
330 int err;
332 written = writev_recv(subreq, &err);
333 TALLOC_FREE(subreq);
334 if (written == -1) {
335 tevent_req_error(req, err);
336 return;
338 tevent_req_done(req);
/*
 * Collect the result of ctdb_conn_msg_write_send(): the unix error code
 * of a failed request, or 0 on success.
 */
int ctdb_conn_msg_write_recv(struct tevent_req *req)
{
	int unix_err;

	return tevent_req_is_unix_error(req, &unix_err) ? unix_err : 0;
}

/* A message channel: a ctdb connection that messages are read from. */
struct ctdb_msg_channel {
	/* Underlying connection, owned by the channel. */
	struct ctdb_conn *conn;
};

/* Per-request state of ctdb_msg_channel_init_send(). */
struct ctdb_msg_channel_init_state {
	/* Event context, reused for the follow-up control request. */
	struct tevent_context *ev;
	/* Connection obtained from ctdb_conn_init_recv(). */
	struct ctdb_conn *conn;
	/* srvid to register on the connection. */
	uint64_t srvid;
	/* Resulting channel; moved to the caller on success. */
	struct ctdb_msg_channel *channel;
};

/* Completion callbacks of ctdb_msg_channel_init_send(), in call order. */
static void ctdb_msg_channel_init_connected(struct tevent_req *subreq);
static void ctdb_msg_channel_init_registered_srvid(struct tevent_req *subreq);

364 struct tevent_req *ctdb_msg_channel_init_send(
365 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
366 const char *sock, uint64_t srvid)
368 struct tevent_req *req, *subreq;
369 struct ctdb_msg_channel_init_state *state;
371 req = tevent_req_create(mem_ctx, &state,
372 struct ctdb_msg_channel_init_state);
373 if (req == NULL) {
374 return NULL;
376 state->ev = ev;
377 state->srvid = srvid;
379 subreq = ctdb_conn_init_send(state, ev, sock);
380 if (tevent_req_nomem(subreq, req)) {
381 return tevent_req_post(req, ev);
383 tevent_req_set_callback(subreq, ctdb_msg_channel_init_connected, req);
384 return req;
387 static void ctdb_msg_channel_init_connected(struct tevent_req *subreq)
389 struct tevent_req *req = tevent_req_callback_data(
390 subreq, struct tevent_req);
391 struct ctdb_msg_channel_init_state *state = tevent_req_data(
392 req, struct ctdb_msg_channel_init_state);
393 int ret;
395 ret = ctdb_conn_init_recv(subreq, state, &state->conn);
396 TALLOC_FREE(subreq);
397 if (tevent_req_error(req, ret)) {
398 return;
400 subreq = ctdb_conn_control_send(state, state->ev, state->conn,
401 CTDB_CURRENT_NODE,
402 CTDB_CONTROL_REGISTER_SRVID,
403 state->srvid, 0, NULL, 0);
404 if (tevent_req_nomem(subreq, req)) {
405 return;
407 tevent_req_set_callback(
408 subreq, ctdb_msg_channel_init_registered_srvid, req);
411 static void ctdb_msg_channel_init_registered_srvid(struct tevent_req *subreq)
413 struct tevent_req *req = tevent_req_callback_data(
414 subreq, struct tevent_req);
415 struct ctdb_msg_channel_init_state *state = tevent_req_data(
416 req, struct ctdb_msg_channel_init_state);
417 struct ctdb_reply_control *reply;
418 int ret;
420 ret = ctdb_conn_control_recv(subreq, talloc_tos(), &reply);
421 TALLOC_FREE(subreq);
422 if (tevent_req_error(req, ret)) {
423 return;
425 if (reply->status != 0) {
426 tevent_req_error(req, EIO);
427 return;
429 state->channel = talloc(state, struct ctdb_msg_channel);
430 if (tevent_req_nomem(state->channel, req)) {
431 return;
433 state->channel->conn = talloc_move(state->channel, &state->conn);
434 tevent_req_done(req);
437 int ctdb_msg_channel_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
438 struct ctdb_msg_channel **pchannel)
440 struct ctdb_msg_channel_init_state *state = tevent_req_data(
441 req, struct ctdb_msg_channel_init_state);
442 int err;
444 if (tevent_req_is_unix_error(req, &err)) {
445 return err;
447 *pchannel = talloc_move(mem_ctx, &state->channel);
448 return 0;
/* Per-request state of ctdb_msg_read_send(). */
struct ctdb_msg_read_state {
	/* Number of bytes in buf, as returned by read_packet_recv(). */
	size_t buflen;
	/* Raw packet as read from the socket. */
	uint8_t *buf;
};

/* Completion callback of ctdb_msg_read_send(), defined after it. */
static void ctdb_msg_channel_got_msg(struct tevent_req *subreq);

458 struct tevent_req *ctdb_msg_read_send(TALLOC_CTX *mem_ctx,
459 struct tevent_context *ev,
460 struct ctdb_msg_channel *channel)
462 struct tevent_req *req, *subreq;
463 struct ctdb_msg_read_state *state;
465 req = tevent_req_create(mem_ctx, &state,
466 struct ctdb_msg_read_state);
467 if (req == NULL) {
468 return NULL;
470 subreq = read_packet_send(state, ev, channel->conn->fd,
471 sizeof(uint32_t), ctdb_packet_more, NULL);
472 if (tevent_req_nomem(subreq, req)) {
473 return tevent_req_post(req, ev);
475 tevent_req_set_callback(subreq, ctdb_msg_channel_got_msg, req);
476 return req;
479 static void ctdb_msg_channel_got_msg(struct tevent_req *subreq)
481 struct tevent_req *req = tevent_req_callback_data(
482 subreq, struct tevent_req);
483 struct ctdb_msg_read_state *state = tevent_req_data(
484 req, struct ctdb_msg_read_state);
485 ssize_t nread;
486 uint8_t *buf;
487 int err;
489 nread = read_packet_recv(subreq, state, &buf, &err);
490 if (nread == -1) {
491 tevent_req_error(req, err);
492 return;
494 state->buflen = nread;
495 state->buf = buf;
496 tevent_req_done(req);
499 int ctdb_msg_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
500 uint8_t **pmsg, size_t *pmsg_len)
502 struct ctdb_msg_read_state *state = tevent_req_data(
503 req, struct ctdb_msg_read_state);
504 struct ctdb_req_header *hdr;
505 struct ctdb_req_message *msg;
506 uint8_t *buf;
507 int err;
509 if (tevent_req_is_unix_error(req, &err)) {
510 return err;
513 hdr = (struct ctdb_req_header *)state->buf;
514 if (hdr->length != state->buflen) {
515 DEBUG(10, ("Got invalid header length\n"));
516 return EIO;
518 if (hdr->operation != CTDB_REQ_MESSAGE) {
519 DEBUG(10, ("Expected %d (CTDB_REQ_MESSAGE), got %d\n",
520 CTDB_REQ_MESSAGE, (int)hdr->operation));
521 return EIO;
523 if (hdr->length < offsetof(struct ctdb_req_message, data)) {
524 DEBUG(10, ("Got short msg, len=%d\n", (int)hdr->length));
525 return EIO;
528 msg = (struct ctdb_req_message *)hdr;
529 if (msg->datalen >
530 hdr->length - offsetof(struct ctdb_req_message, data)) {
531 DEBUG(10, ("Got invalid datalen %d\n", (int)msg->datalen));
532 return EIO;
535 buf = (uint8_t *)talloc_memdup(mem_ctx, msg->data, msg->datalen);
536 if (buf == NULL) {
537 return ENOMEM;
539 *pmsg = buf;
540 *pmsg_len = msg->datalen;
541 return 0;
544 #else
/* Placeholder request state for the stubbed *_send() functions. */
struct dummy_state {
	uint8_t dummy;
};

550 static struct tevent_req *dummy_send(TALLOC_CTX *mem_ctx,
551 struct tevent_context *ev)
553 struct tevent_req *req;
554 struct dummy_state *state;
555 req = tevent_req_create(mem_ctx, &state, struct dummy_state);
556 if (req == NULL) {
557 return NULL;
559 tevent_req_done(req);
560 return tevent_req_post(req, ev);
563 struct tevent_req *ctdb_conn_init_send(TALLOC_CTX *mem_ctx,
564 struct tevent_context *ev,
565 const char *sock)
567 return dummy_send(mem_ctx, ev);
570 int ctdb_conn_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
571 struct ctdb_conn **pconn)
573 return ENOSYS;
576 struct tevent_req *ctdb_conn_msg_write_send(TALLOC_CTX *mem_ctx,
577 struct tevent_context *ev,
578 struct ctdb_conn *conn,
579 uint32_t vnn, uint64_t srvid,
580 uint8_t *msg, size_t msg_len)
582 return dummy_send(mem_ctx, ev);
/* Stub for builds without CLUSTER_SUPPORT: always returns ENOSYS. */
int ctdb_conn_msg_write_recv(struct tevent_req *req)
{
	return ENOSYS;
}

590 struct tevent_req *ctdb_msg_channel_init_send(
591 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
592 const char *sock, uint64_t srvid)
594 return dummy_send(mem_ctx, ev);
597 int ctdb_msg_channel_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
598 struct ctdb_msg_channel **pchannel)
600 return ENOSYS;
603 struct tevent_req *ctdb_msg_read_send(TALLOC_CTX *mem_ctx,
604 struct tevent_context *ev,
605 struct ctdb_msg_channel *channel)
607 return dummy_send(mem_ctx, ev);
610 int ctdb_msg_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
611 uint8_t **pmsg, size_t *pmsg_len)
613 return ENOSYS;
616 #endif