r21115: notify_internal.c needs to remove the table entry if a process has crashed. So
[Samba/nascimento.git] / source / lib / messages.c
blobe0bf86a46cee38fd564638bd5d519ce910e8a8c2
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 2 of the License, or
11 (at your option) any later version.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
50 /* the locking database handle */
51 static TDB_CONTEXT *tdb;
52 static int received_signal;
54 /* change the message version with any incompatible changes in the protocol */
55 #define MESSAGE_VERSION 1
57 struct message_rec {
58 int msg_version;
59 int msg_type;
60 struct process_id dest;
61 struct process_id src;
62 size_t len;
65 /* we have a linked list of dispatch handlers */
66 static struct dispatch_fns {
67 struct dispatch_fns *next, *prev;
68 int msg_type;
69 void (*fn)(int msg_type, struct process_id pid, void *buf, size_t len,
70 void *private_data);
71 void *private_data;
72 } *dispatch_fns;
74 /****************************************************************************
75 Free global objects.
76 ****************************************************************************/
78 void gfree_messages(void)
80 struct dispatch_fns *dfn, *next;
82 /* delete the dispatch_fns list */
83 dfn = dispatch_fns;
84 while( dfn ) {
85 next = dfn->next;
86 DLIST_REMOVE(dispatch_fns, dfn);
87 SAFE_FREE(dfn);
88 dfn = next;
92 /****************************************************************************
93 Notifications come in as signals.
94 ****************************************************************************/
96 static void sig_usr1(void)
98 received_signal = 1;
99 sys_select_signal(SIGUSR1);
102 /****************************************************************************
103 A useful function for testing the message system.
104 ****************************************************************************/
106 static void ping_message(int msg_type, struct process_id src,
107 void *buf, size_t len, void *private_data)
109 const char *msg = buf ? (const char *)buf : "none";
111 DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
112 procid_str_static(&src), msg));
113 message_send_pid(src, MSG_PONG, buf, len, True);
116 /****************************************************************************
117 Initialise the messaging functions.
118 ****************************************************************************/
120 BOOL message_init(void)
122 sec_init();
124 if (tdb)
125 return True;
127 tdb = tdb_open_log(lock_path("messages.tdb"),
128 0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT,
129 O_RDWR|O_CREAT,0600);
131 if (!tdb) {
132 DEBUG(0,("ERROR: Failed to initialise messages database\n"));
133 return False;
136 CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
138 message_register(MSG_PING, ping_message, NULL);
140 /* Register some debugging related messages */
142 register_msg_pool_usage();
143 register_dmalloc_msgs();
145 return True;
148 /*******************************************************************
149 Form a static tdb key from a pid.
150 ******************************************************************/
152 static TDB_DATA message_key_pid(struct process_id pid)
154 static char key[20];
155 TDB_DATA kbuf;
157 slprintf(key, sizeof(key)-1, "PID/%s", procid_str_static(&pid));
159 kbuf.dptr = (char *)key;
160 kbuf.dsize = strlen(key)+1;
161 return kbuf;
164 /****************************************************************************
165 Notify a process that it has a message. If the process doesn't exist
166 then delete its record in the database.
167 ****************************************************************************/
169 static NTSTATUS message_notify(struct process_id procid)
171 pid_t pid = procid.pid;
172 int ret;
173 uid_t euid = geteuid();
176 * Doing kill with a non-positive pid causes messages to be
177 * sent to places we don't want.
180 SMB_ASSERT(pid > 0);
182 if (euid != 0) {
183 become_root_uid_only();
186 ret = kill(pid, SIGUSR1);
188 if (euid != 0) {
189 unbecome_root_uid_only();
192 if (ret == -1) {
193 if (errno == ESRCH) {
194 DEBUG(2,("pid %d doesn't exist - deleting messages record\n",
195 (int)pid));
196 tdb_delete(tdb, message_key_pid(procid));
199 * INVALID_HANDLE is the closest I can think of -- vl
201 return NT_STATUS_INVALID_HANDLE;
204 DEBUG(2,("message to process %d failed - %s\n", (int)pid,
205 strerror(errno)));
208 * No call to map_nt_error_from_unix -- don't want to link in
209 * errormap.o into lots of utils.
212 if (errno == EINVAL) return NT_STATUS_INVALID_PARAMETER;
213 if (errno == EPERM) return NT_STATUS_ACCESS_DENIED;
214 return NT_STATUS_UNSUCCESSFUL;
217 return NT_STATUS_OK;
220 /****************************************************************************
221 Send a message to a particular pid.
222 ****************************************************************************/
224 static NTSTATUS message_send_pid_internal(struct process_id pid, int msg_type,
225 const void *buf, size_t len,
226 BOOL duplicates_allowed,
227 unsigned int timeout)
229 TDB_DATA kbuf;
230 TDB_DATA dbuf;
231 TDB_DATA old_dbuf;
232 struct message_rec rec;
233 char *ptr;
234 struct message_rec prec;
236 /* NULL pointer means implicit length zero. */
237 if (!buf) {
238 SMB_ASSERT(len == 0);
242 * Doing kill with a non-positive pid causes messages to be
243 * sent to places we don't want.
246 SMB_ASSERT(procid_to_pid(&pid) > 0);
248 rec.msg_version = MESSAGE_VERSION;
249 rec.msg_type = msg_type;
250 rec.dest = pid;
251 rec.src = procid_self();
252 rec.len = buf ? len : 0;
254 kbuf = message_key_pid(pid);
256 dbuf.dptr = (char *)SMB_MALLOC(len + sizeof(rec));
257 if (!dbuf.dptr) {
258 return NT_STATUS_NO_MEMORY;
261 memcpy(dbuf.dptr, &rec, sizeof(rec));
262 if (len > 0 && buf)
263 memcpy((void *)((char*)dbuf.dptr+sizeof(rec)), buf, len);
265 dbuf.dsize = len + sizeof(rec);
267 if (duplicates_allowed) {
269 /* If duplicates are allowed we can just append the message and return. */
271 /* lock the record for the destination */
272 if (timeout) {
273 if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
274 DEBUG(0,("message_send_pid_internal: failed to get "
275 "chainlock with timeout %ul.\n", timeout));
276 return NT_STATUS_IO_TIMEOUT;
278 } else {
279 if (tdb_chainlock(tdb, kbuf) == -1) {
280 DEBUG(0,("message_send_pid_internal: failed to get "
281 "chainlock.\n"));
282 return NT_STATUS_LOCK_NOT_GRANTED;
285 tdb_append(tdb, kbuf, dbuf);
286 tdb_chainunlock(tdb, kbuf);
288 SAFE_FREE(dbuf.dptr);
289 errno = 0; /* paranoia */
290 return message_notify(pid);
293 /* lock the record for the destination */
294 if (timeout) {
295 if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
296 DEBUG(0,("message_send_pid_internal: failed to get chainlock "
297 "with timeout %ul.\n", timeout));
298 return NT_STATUS_IO_TIMEOUT;
300 } else {
301 if (tdb_chainlock(tdb, kbuf) == -1) {
302 DEBUG(0,("message_send_pid_internal: failed to get "
303 "chainlock.\n"));
304 return NT_STATUS_LOCK_NOT_GRANTED;
308 old_dbuf = tdb_fetch(tdb, kbuf);
310 if (!old_dbuf.dptr) {
311 /* its a new record */
313 tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
314 tdb_chainunlock(tdb, kbuf);
316 SAFE_FREE(dbuf.dptr);
317 errno = 0; /* paranoia */
318 return message_notify(pid);
321 /* Not a new record. Check for duplicates. */
323 for(ptr = (char *)old_dbuf.dptr; ptr < old_dbuf.dptr + old_dbuf.dsize; ) {
325 * First check if the message header matches, then, if it's a non-zero
326 * sized message, check if the data matches. If so it's a duplicate and
327 * we can discard it. JRA.
330 if (!memcmp(ptr, &rec, sizeof(rec))) {
331 if (!len || (len && !memcmp( ptr + sizeof(rec), buf, len))) {
332 tdb_chainunlock(tdb, kbuf);
333 DEBUG(10,("message_send_pid_internal: discarding "
334 "duplicate message.\n"));
335 SAFE_FREE(dbuf.dptr);
336 SAFE_FREE(old_dbuf.dptr);
337 return NT_STATUS_OK;
340 memcpy(&prec, ptr, sizeof(prec));
341 ptr += sizeof(rec) + prec.len;
344 /* we're adding to an existing entry */
346 tdb_append(tdb, kbuf, dbuf);
347 tdb_chainunlock(tdb, kbuf);
349 SAFE_FREE(old_dbuf.dptr);
350 SAFE_FREE(dbuf.dptr);
352 errno = 0; /* paranoia */
353 return message_notify(pid);
356 /****************************************************************************
357 Send a message to a particular pid - no timeout.
358 ****************************************************************************/
360 NTSTATUS message_send_pid(struct process_id pid, int msg_type, const void *buf,
361 size_t len, BOOL duplicates_allowed)
363 return message_send_pid_internal(pid, msg_type, buf, len,
364 duplicates_allowed, 0);
367 /****************************************************************************
368 Send a message to a particular pid, with timeout in seconds.
369 ****************************************************************************/
371 NTSTATUS message_send_pid_with_timeout(struct process_id pid, int msg_type,
372 const void *buf, size_t len,
373 BOOL duplicates_allowed, unsigned int timeout)
375 return message_send_pid_internal(pid, msg_type, buf, len, duplicates_allowed,
376 timeout);
379 /****************************************************************************
380 Count the messages pending for a particular pid. Expensive....
381 ****************************************************************************/
383 unsigned int messages_pending_for_pid(struct process_id pid)
385 TDB_DATA kbuf;
386 TDB_DATA dbuf;
387 char *buf;
388 unsigned int message_count = 0;
390 kbuf = message_key_pid(pid);
392 dbuf = tdb_fetch(tdb, kbuf);
393 if (dbuf.dptr == NULL || dbuf.dsize == 0) {
394 SAFE_FREE(dbuf.dptr);
395 return 0;
398 for (buf = dbuf.dptr; dbuf.dsize > sizeof(struct message_rec);) {
399 struct message_rec rec;
400 memcpy(&rec, buf, sizeof(rec));
401 buf += (sizeof(rec) + rec.len);
402 dbuf.dsize -= (sizeof(rec) + rec.len);
403 message_count++;
406 SAFE_FREE(dbuf.dptr);
407 return message_count;
410 /****************************************************************************
411 Retrieve all messages for the current process.
412 ****************************************************************************/
414 static BOOL retrieve_all_messages(char **msgs_buf, size_t *total_len)
416 TDB_DATA kbuf;
417 TDB_DATA dbuf;
418 TDB_DATA null_dbuf;
420 ZERO_STRUCT(null_dbuf);
422 *msgs_buf = NULL;
423 *total_len = 0;
425 kbuf = message_key_pid(pid_to_procid(sys_getpid()));
427 if (tdb_chainlock(tdb, kbuf) == -1)
428 return False;
430 dbuf = tdb_fetch(tdb, kbuf);
432 * Replace with an empty record to keep the allocated
433 * space in the tdb.
435 tdb_store(tdb, kbuf, null_dbuf, TDB_REPLACE);
436 tdb_chainunlock(tdb, kbuf);
438 if (dbuf.dptr == NULL || dbuf.dsize == 0) {
439 SAFE_FREE(dbuf.dptr);
440 return False;
443 *msgs_buf = dbuf.dptr;
444 *total_len = dbuf.dsize;
446 return True;
449 /****************************************************************************
450 Parse out the next message for the current process.
451 ****************************************************************************/
453 static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type,
454 struct process_id *src, char **buf, size_t *len)
456 struct message_rec rec;
457 char *ret_buf = *buf;
459 *buf = NULL;
460 *len = 0;
462 if (total_len - (ret_buf - msgs_buf) < sizeof(rec))
463 return False;
465 memcpy(&rec, ret_buf, sizeof(rec));
466 ret_buf += sizeof(rec);
468 if (rec.msg_version != MESSAGE_VERSION) {
469 DEBUG(0,("message version %d received (expected %d)\n", rec.msg_version, MESSAGE_VERSION));
470 return False;
473 if (rec.len > 0) {
474 if (total_len - (ret_buf - msgs_buf) < rec.len)
475 return False;
478 *len = rec.len;
479 *msg_type = rec.msg_type;
480 *src = rec.src;
481 *buf = ret_buf;
483 return True;
486 /****************************************************************************
487 Receive and dispatch any messages pending for this process.
488 JRA changed Dec 13 2006. Only one message handler now permitted per type.
489 *NOTE*: Dispatch functions must be able to cope with incoming
490 messages on an *odd* byte boundary.
491 ****************************************************************************/
493 void message_dispatch(void)
495 int msg_type;
496 struct process_id src;
497 char *buf;
498 char *msgs_buf;
499 size_t len, total_len;
500 int n_handled;
502 if (!received_signal)
503 return;
505 DEBUG(10,("message_dispatch: received_signal = %d\n", received_signal));
507 received_signal = 0;
509 if (!retrieve_all_messages(&msgs_buf, &total_len))
510 return;
512 for (buf = msgs_buf; message_recv(msgs_buf, total_len, &msg_type, &src, &buf, &len); buf += len) {
513 struct dispatch_fns *dfn;
515 DEBUG(10,("message_dispatch: received msg_type=%d "
516 "src_pid=%u\n", msg_type,
517 (unsigned int) procid_to_pid(&src)));
519 n_handled = 0;
520 for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
521 if (dfn->msg_type == msg_type) {
522 DEBUG(10,("message_dispatch: processing message of type %d.\n", msg_type));
523 dfn->fn(msg_type, src,
524 len ? (void *)buf : NULL, len,
525 dfn->private_data);
526 n_handled++;
527 break;
530 if (!n_handled) {
531 DEBUG(5,("message_dispatch: warning: no handler registed for "
532 "msg_type %d in pid %u\n",
533 msg_type, (unsigned int)sys_getpid()));
536 SAFE_FREE(msgs_buf);
539 /****************************************************************************
540 Register/replace a dispatch function for a particular message type.
541 JRA changed Dec 13 2006. Only one message handler now permitted per type.
542 *NOTE*: Dispatch functions must be able to cope with incoming
543 messages on an *odd* byte boundary.
544 ****************************************************************************/
546 void message_register(int msg_type,
547 void (*fn)(int msg_type, struct process_id pid,
548 void *buf, size_t len,
549 void *private_data),
550 void *private_data)
552 struct dispatch_fns *dfn;
554 for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
555 if (dfn->msg_type == msg_type) {
556 dfn->fn = fn;
557 return;
561 dfn = SMB_MALLOC_P(struct dispatch_fns);
563 if (dfn != NULL) {
565 ZERO_STRUCTPN(dfn);
567 dfn->msg_type = msg_type;
568 dfn->fn = fn;
569 dfn->private_data = private_data;
571 DLIST_ADD(dispatch_fns, dfn);
573 else {
575 DEBUG(0,("message_register: Not enough memory. malloc failed!\n"));
579 /****************************************************************************
580 De-register the function for a particular message type.
581 ****************************************************************************/
583 void message_deregister(int msg_type)
585 struct dispatch_fns *dfn, *next;
587 for (dfn = dispatch_fns; dfn; dfn = next) {
588 next = dfn->next;
589 if (dfn->msg_type == msg_type) {
590 DLIST_REMOVE(dispatch_fns, dfn);
591 SAFE_FREE(dfn);
592 return;
597 struct msg_all {
598 int msg_type;
599 uint32 msg_flag;
600 const void *buf;
601 size_t len;
602 BOOL duplicates;
603 int n_sent;
606 /****************************************************************************
607 Send one of the messages for the broadcast.
608 ****************************************************************************/
610 static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void *state)
612 struct connections_data crec;
613 struct msg_all *msg_all = (struct msg_all *)state;
614 NTSTATUS status;
616 if (dbuf.dsize != sizeof(crec))
617 return 0;
619 memcpy(&crec, dbuf.dptr, sizeof(crec));
621 if (crec.cnum != -1)
622 return 0;
624 /* Don't send if the receiver hasn't registered an interest. */
626 if(!(crec.bcast_msg_flags & msg_all->msg_flag))
627 return 0;
629 /* If the msg send fails because the pid was not found (i.e. smbd died),
630 * the msg has already been deleted from the messages.tdb.*/
632 status = message_send_pid(crec.pid, msg_all->msg_type,
633 msg_all->buf, msg_all->len,
634 msg_all->duplicates);
636 if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
638 /* If the pid was not found delete the entry from connections.tdb */
640 DEBUG(2,("pid %s doesn't exist - deleting connections %d [%s]\n",
641 procid_str_static(&crec.pid), crec.cnum, crec.name));
642 tdb_delete(the_tdb, kbuf);
644 msg_all->n_sent++;
645 return 0;
649 * Send a message to all smbd processes.
651 * It isn't very efficient, but should be OK for the sorts of
652 * applications that use it. When we need efficient broadcast we can add
653 * it.
655 * @param n_sent Set to the number of messages sent. This should be
656 * equal to the number of processes, but be careful for races.
658 * @retval True for success.
660 BOOL message_send_all(TDB_CONTEXT *conn_tdb, int msg_type,
661 const void *buf, size_t len,
662 BOOL duplicates_allowed,
663 int *n_sent)
665 struct msg_all msg_all;
667 msg_all.msg_type = msg_type;
668 if (msg_type < 1000)
669 msg_all.msg_flag = FLAG_MSG_GENERAL;
670 else if (msg_type > 1000 && msg_type < 2000)
671 msg_all.msg_flag = FLAG_MSG_NMBD;
672 else if (msg_type > 2000 && msg_type < 2100)
673 msg_all.msg_flag = FLAG_MSG_PRINT_NOTIFY;
674 else if (msg_type > 2100 && msg_type < 3000)
675 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
676 else if (msg_type > 3000 && msg_type < 4000)
677 msg_all.msg_flag = FLAG_MSG_SMBD;
678 else
679 return False;
681 msg_all.buf = buf;
682 msg_all.len = len;
683 msg_all.duplicates = duplicates_allowed;
684 msg_all.n_sent = 0;
686 tdb_traverse(conn_tdb, traverse_fn, &msg_all);
687 if (n_sent)
688 *n_sent = msg_all.n_sent;
689 return True;
693 * Block and unblock receiving of messages. Allows removal of race conditions
694 * when doing a fork and changing message disposition.
697 void message_block(void)
699 BlockSignals(True, SIGUSR1);
702 void message_unblock(void)
704 BlockSignals(False, SIGUSR1);
708 * Samba4 API wrapper around the Samba3 implementation. Yes, I know, we could
709 * import the whole Samba4 thing, but I want notify.c from Samba4 in first.
712 struct messaging_callback {
713 struct messaging_callback *prev, *next;
714 uint32 msg_type;
715 void (*fn)(struct messaging_context *msg, void *private_data,
716 uint32_t msg_type,
717 struct server_id server_id, DATA_BLOB *data);
718 void *private_data;
721 struct messaging_context {
722 struct server_id id;
723 struct messaging_callback *callbacks;
726 static int messaging_context_destructor(struct messaging_context *ctx)
728 struct messaging_callback *cb;
730 for (cb = ctx->callbacks; cb; cb = cb->next) {
732 * We unconditionally remove all instances of our callback
733 * from the tdb basis.
735 message_deregister(cb->msg_type);
737 return 0;
740 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
741 struct server_id server_id,
742 struct event_context *ev)
744 struct messaging_context *ctx;
746 if (!(ctx = TALLOC_ZERO_P(mem_ctx, struct messaging_context))) {
747 return NULL;
750 ctx->id = server_id;
751 talloc_set_destructor(ctx, messaging_context_destructor);
752 return ctx;
755 static void messaging_callback(int msg_type, struct process_id pid,
756 void *buf, size_t len, void *private_data)
758 struct messaging_context *ctx = talloc_get_type_abort(
759 private_data, struct messaging_context);
760 struct messaging_callback *cb, *next;
762 for (cb = ctx->callbacks; cb; cb = next) {
764 * Allow a callback to remove itself
766 next = cb->next;
768 if (msg_type == cb->msg_type) {
769 DATA_BLOB blob;
770 struct server_id id;
772 blob.data = (uint8 *)buf;
773 blob.length = len;
774 id.id = pid;
776 cb->fn(ctx, cb->private_data, msg_type, id, &blob);
782 * Register a dispatch function for a particular message type. Allow multiple
783 * registrants
785 NTSTATUS messaging_register(struct messaging_context *ctx, void *private_data,
786 uint32_t msg_type,
787 void (*fn)(struct messaging_context *msg,
788 void *private_data,
789 uint32_t msg_type,
790 struct server_id server_id,
791 DATA_BLOB *data))
793 struct messaging_callback *cb;
795 if (!(cb = talloc(ctx, struct messaging_callback))) {
796 return NT_STATUS_NO_MEMORY;
799 cb->msg_type = msg_type;
800 cb->fn = fn;
801 cb->private_data = private_data;
803 DLIST_ADD(ctx->callbacks, cb);
804 message_register(msg_type, messaging_callback, ctx);
805 return NT_STATUS_OK;
809 De-register the function for a particular message type.
811 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
812 void *private_data)
814 struct messaging_callback *cb, *next;
816 for (cb = ctx->callbacks; cb; cb = next) {
817 next = cb->next;
818 if ((cb->msg_type == msg_type)
819 && (cb->private_data == private_data)) {
820 DLIST_REMOVE(ctx->callbacks, cb);
821 TALLOC_FREE(cb);
827 Send a message to a particular server
829 NTSTATUS messaging_send(struct messaging_context *msg,
830 struct server_id server,
831 uint32_t msg_type, DATA_BLOB *data)
833 return message_send_pid_internal(server.id, msg_type, data->data,
834 data->length, True, 0);
837 /** @} **/