lib: Remove messages_local
[Samba.git] / source3 / lib / messages.c
blob5db1214e4481b8d4641ce566ebb75cbf84dec96e
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
54 struct messaging_callback {
55 struct messaging_callback *prev, *next;
56 uint32 msg_type;
57 void (*fn)(struct messaging_context *msg, void *private_data,
58 uint32_t msg_type,
59 struct server_id server_id, DATA_BLOB *data);
60 void *private_data;
63 /****************************************************************************
64 A useful function for testing the message system.
65 ****************************************************************************/
67 static void ping_message(struct messaging_context *msg_ctx,
68 void *private_data,
69 uint32_t msg_type,
70 struct server_id src,
71 DATA_BLOB *data)
73 const char *msg = "none";
74 char *free_me = NULL;
76 if (data->data != NULL) {
77 free_me = talloc_strndup(talloc_tos(), (char *)data->data,
78 data->length);
79 msg = free_me;
81 DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
82 procid_str_static(&src), msg));
83 TALLOC_FREE(free_me);
84 messaging_send(msg_ctx, src, MSG_PONG, data);
/****************************************************************************
 Register/replace a dispatch function for a particular message type.
 JRA changed Dec 13 2006. Only one message handler now permitted per type
 and private_data pointer; see messaging_register(), which accepts further
 registrations of the same type when private_data differs.
 *NOTE*: Dispatch functions must be able to cope with incoming
 messages on an *odd* byte boundary.
****************************************************************************/
94 struct msg_all {
95 struct messaging_context *msg_ctx;
96 int msg_type;
97 uint32 msg_flag;
98 const void *buf;
99 size_t len;
100 int n_sent;
103 /****************************************************************************
104 Send one of the messages for the broadcast.
105 ****************************************************************************/
107 static int traverse_fn(struct db_record *rec, const struct server_id *id,
108 uint32_t msg_flags, void *state)
110 struct msg_all *msg_all = (struct msg_all *)state;
111 NTSTATUS status;
113 /* Don't send if the receiver hasn't registered an interest. */
115 if((msg_flags & msg_all->msg_flag) == 0) {
116 return 0;
119 /* If the msg send fails because the pid was not found (i.e. smbd died),
120 * the msg has already been deleted from the messages.tdb.*/
122 status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
123 (const uint8_t *)msg_all->buf, msg_all->len);
125 if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
128 * If the pid was not found delete the entry from
129 * serverid.tdb
132 DEBUG(2, ("pid %s doesn't exist\n", procid_str_static(id)));
134 dbwrap_record_delete(rec);
136 msg_all->n_sent++;
137 return 0;
141 * Send a message to all smbd processes.
143 * It isn't very efficient, but should be OK for the sorts of
144 * applications that use it. When we need efficient broadcast we can add
145 * it.
147 * @param n_sent Set to the number of messages sent. This should be
148 * equal to the number of processes, but be careful for races.
150 * @retval True for success.
152 bool message_send_all(struct messaging_context *msg_ctx,
153 int msg_type,
154 const void *buf, size_t len,
155 int *n_sent)
157 struct msg_all msg_all;
159 msg_all.msg_type = msg_type;
160 if (msg_type < 0x100) {
161 msg_all.msg_flag = FLAG_MSG_GENERAL;
162 } else if (msg_type > 0x100 && msg_type < 0x200) {
163 msg_all.msg_flag = FLAG_MSG_NMBD;
164 } else if (msg_type > 0x200 && msg_type < 0x300) {
165 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
166 } else if (msg_type > 0x300 && msg_type < 0x400) {
167 msg_all.msg_flag = FLAG_MSG_SMBD;
168 } else if (msg_type > 0x400 && msg_type < 0x600) {
169 msg_all.msg_flag = FLAG_MSG_WINBIND;
170 } else if (msg_type > 4000 && msg_type < 5000) {
171 msg_all.msg_flag = FLAG_MSG_DBWRAP;
172 } else {
173 return false;
176 msg_all.buf = buf;
177 msg_all.len = len;
178 msg_all.n_sent = 0;
179 msg_all.msg_ctx = msg_ctx;
181 serverid_traverse(traverse_fn, &msg_all);
182 if (n_sent)
183 *n_sent = msg_all.n_sent;
184 return true;
187 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
188 struct tevent_context *ev)
190 struct messaging_context *ctx;
191 NTSTATUS status;
193 if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
194 return NULL;
197 ctx->id = procid_self();
198 ctx->event_ctx = ev;
200 status = messaging_dgm_init(ctx, ctx, &ctx->local);
202 if (!NT_STATUS_IS_OK(status)) {
203 DEBUG(2, ("messaging_dgm_init failed: %s\n",
204 nt_errstr(status)));
205 TALLOC_FREE(ctx);
206 return NULL;
209 if (lp_clustering()) {
210 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
212 if (!NT_STATUS_IS_OK(status)) {
213 DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
214 nt_errstr(status)));
215 TALLOC_FREE(ctx);
216 return NULL;
219 ctx->id.vnn = get_my_vnn();
221 messaging_register(ctx, NULL, MSG_PING, ping_message);
223 /* Register some debugging related messages */
225 register_msg_pool_usage(ctx);
226 register_dmalloc_msgs(ctx);
227 debug_register_msgs(ctx);
229 return ctx;
232 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
234 return msg_ctx->id;
238 * re-init after a fork
240 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
242 NTSTATUS status;
244 TALLOC_FREE(msg_ctx->local);
246 msg_ctx->id = procid_self();
248 status = messaging_dgm_init(msg_ctx, msg_ctx, &msg_ctx->local);
249 if (!NT_STATUS_IS_OK(status)) {
250 DEBUG(0, ("messaging_dgm_init failed: %s\n",
251 nt_errstr(status)));
252 return status;
255 TALLOC_FREE(msg_ctx->remote);
257 if (lp_clustering()) {
258 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
259 &msg_ctx->remote);
261 if (!NT_STATUS_IS_OK(status)) {
262 DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
263 nt_errstr(status)));
264 return status;
268 return NT_STATUS_OK;
273 * Register a dispatch function for a particular message type. Allow multiple
274 * registrants
276 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
277 void *private_data,
278 uint32_t msg_type,
279 void (*fn)(struct messaging_context *msg,
280 void *private_data,
281 uint32_t msg_type,
282 struct server_id server_id,
283 DATA_BLOB *data))
285 struct messaging_callback *cb;
287 DEBUG(5, ("Registering messaging pointer for type %u - "
288 "private_data=%p\n",
289 (unsigned)msg_type, private_data));
292 * Only one callback per type
295 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
296 /* we allow a second registration of the same message
297 type if it has a different private pointer. This is
298 needed in, for example, the internal notify code,
299 which creates a new notify context for each tree
300 connect, and expects to receive messages to each of
301 them. */
302 if (cb->msg_type == msg_type && private_data == cb->private_data) {
303 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
304 (unsigned)msg_type, private_data));
305 cb->fn = fn;
306 cb->private_data = private_data;
307 return NT_STATUS_OK;
311 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
312 return NT_STATUS_NO_MEMORY;
315 cb->msg_type = msg_type;
316 cb->fn = fn;
317 cb->private_data = private_data;
319 DLIST_ADD(msg_ctx->callbacks, cb);
320 return NT_STATUS_OK;
324 De-register the function for a particular message type.
326 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
327 void *private_data)
329 struct messaging_callback *cb, *next;
331 for (cb = ctx->callbacks; cb; cb = next) {
332 next = cb->next;
333 if ((cb->msg_type == msg_type)
334 && (cb->private_data == private_data)) {
335 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
336 (unsigned)msg_type, private_data));
337 DLIST_REMOVE(ctx->callbacks, cb);
338 TALLOC_FREE(cb);
343 struct messaging_selfsend_state {
344 struct messaging_context *msg;
345 struct messaging_rec rec;
348 static void messaging_trigger_self(struct tevent_context *ev,
349 struct tevent_immediate *im,
350 void *private_data);
353 Send a message to a particular server
355 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
356 struct server_id server, uint32_t msg_type,
357 const DATA_BLOB *data)
359 if (server_id_is_disconnected(&server)) {
360 return NT_STATUS_INVALID_PARAMETER_MIX;
363 if (!procid_is_local(&server)) {
364 return msg_ctx->remote->send_fn(msg_ctx, server,
365 msg_type, data,
366 msg_ctx->remote);
369 if (server_id_equal(&msg_ctx->id, &server)) {
370 struct messaging_selfsend_state *state;
371 struct tevent_immediate *im;
373 state = talloc_pooled_object(
374 msg_ctx, struct messaging_selfsend_state,
375 1, data->length);
376 if (state == NULL) {
377 return NT_STATUS_NO_MEMORY;
379 state->msg = msg_ctx;
380 state->rec.msg_version = MESSAGE_VERSION;
381 state->rec.msg_type = msg_type & MSG_TYPE_MASK;
382 state->rec.dest = server;
383 state->rec.src = msg_ctx->id;
385 /* Can't fail, it's a pooled_object */
386 state->rec.buf = data_blob_talloc(
387 state, data->data, data->length);
389 im = tevent_create_immediate(state);
390 if (im == NULL) {
391 TALLOC_FREE(state);
392 return NT_STATUS_NO_MEMORY;
395 tevent_schedule_immediate(im, msg_ctx->event_ctx,
396 messaging_trigger_self, state);
397 return NT_STATUS_OK;
400 return msg_ctx->local->send_fn(msg_ctx, server, msg_type, data,
401 msg_ctx->local);
404 static void messaging_trigger_self(struct tevent_context *ev,
405 struct tevent_immediate *im,
406 void *private_data)
408 struct messaging_selfsend_state *state = talloc_get_type_abort(
409 private_data, struct messaging_selfsend_state);
410 messaging_dispatch_rec(state->msg, &state->rec);
411 TALLOC_FREE(state);
414 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
415 struct server_id server, uint32_t msg_type,
416 const uint8_t *buf, size_t len)
418 DATA_BLOB blob = data_blob_const(buf, len);
419 return messaging_send(msg_ctx, server, msg_type, &blob);
422 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
423 struct messaging_rec *rec)
425 struct messaging_rec *result;
427 result = talloc_pooled_object(mem_ctx, struct messaging_rec,
428 1, rec->buf.length);
429 if (result == NULL) {
430 return NULL;
432 *result = *rec;
434 /* Doesn't fail, see talloc_pooled_object */
436 result->buf.data = talloc_memdup(result, rec->buf.data,
437 rec->buf.length);
438 return result;
/*
 * Per-request state of messaging_read_send().
 */
struct messaging_read_state {
	/* Event context the request was created with */
	struct tevent_context *ev;
	/* Messaging context whose waiters array holds the request */
	struct messaging_context *msg_ctx;
	/* Message type the request waits for */
	uint32_t msg_type;
	/* Copy of the received record, filled in by messaging_read_done() */
	struct messaging_rec *rec;
};

/* Removes a request from the messaging context's waiters array */
static void messaging_read_cleanup(struct tevent_req *req,
				   enum tevent_req_state req_state);
451 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
452 struct tevent_context *ev,
453 struct messaging_context *msg_ctx,
454 uint32_t msg_type)
456 struct tevent_req *req;
457 struct messaging_read_state *state;
458 size_t waiters_len;
460 req = tevent_req_create(mem_ctx, &state,
461 struct messaging_read_state);
462 if (req == NULL) {
463 return NULL;
465 state->ev = ev;
466 state->msg_ctx = msg_ctx;
467 state->msg_type = msg_type;
469 waiters_len = talloc_array_length(msg_ctx->waiters);
471 if (waiters_len == msg_ctx->num_waiters) {
472 struct tevent_req **tmp;
474 tmp = talloc_realloc(msg_ctx, msg_ctx->waiters,
475 struct tevent_req *, waiters_len+1);
476 if (tevent_req_nomem(tmp, req)) {
477 return tevent_req_post(req, ev);
479 msg_ctx->waiters = tmp;
482 msg_ctx->waiters[msg_ctx->num_waiters] = req;
483 msg_ctx->num_waiters += 1;
484 tevent_req_set_cleanup_fn(req, messaging_read_cleanup);
486 return req;
489 static void messaging_read_cleanup(struct tevent_req *req,
490 enum tevent_req_state req_state)
492 struct messaging_read_state *state = tevent_req_data(
493 req, struct messaging_read_state);
494 struct messaging_context *msg_ctx = state->msg_ctx;
495 struct tevent_req **waiters = msg_ctx->waiters;
496 unsigned i;
498 tevent_req_set_cleanup_fn(req, NULL);
500 for (i=0; i<msg_ctx->num_waiters; i++) {
501 if (waiters[i] == req) {
502 waiters[i] = waiters[msg_ctx->num_waiters-1];
503 msg_ctx->num_waiters -= 1;
504 return;
509 static void messaging_read_done(struct tevent_req *req, struct messaging_rec *rec)
511 struct messaging_read_state *state = tevent_req_data(
512 req, struct messaging_read_state);
514 state->rec = messaging_rec_dup(state, rec);
515 if (tevent_req_nomem(state->rec, req)) {
516 return;
518 tevent_req_done(req);
521 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
522 struct messaging_rec **presult)
524 struct messaging_read_state *state = tevent_req_data(
525 req, struct messaging_read_state);
526 int err;
528 if (tevent_req_is_unix_error(req, &err)) {
529 tevent_req_received(req);
530 return err;
532 *presult = talloc_move(mem_ctx, &state->rec);
533 return 0;
537 Dispatch one messaging_rec
539 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
540 struct messaging_rec *rec)
542 struct messaging_callback *cb, *next;
543 unsigned i;
545 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
546 next = cb->next;
547 if (cb->msg_type == rec->msg_type) {
548 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
549 rec->src, &rec->buf);
550 /* we continue looking for matching messages
551 after finding one. This matters for
552 subsystems like the internal notify code
553 which register more than one handler for
554 the same message type */
558 for (i=0; i<msg_ctx->num_waiters; i++) {
559 struct tevent_req *req = msg_ctx->waiters[i];
560 struct messaging_read_state *state = tevent_req_data(
561 req, struct messaging_read_state);
563 if (state->msg_type == rec->msg_type) {
564 messaging_read_done(req, rec);
567 return;
570 /** @} **/