r15102: u32 is a tdb-only thing that's not exported by samba4 tdb. Replace by uint32.
[Samba/gebeck_regimport.git] / source / lib / messages.c
blobcd2a3b36b6e2402d4bc48b244f80d82276e7036c
1 /*
2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 2 of the License, or
11 (at your option) any later version.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 /**
24 @defgroup messages Internal messaging framework
26 @file messages.c
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a message_register() of a
32 dispatch function, and use message_send_pid() to send messages to
33 that process.
35 The dispatch function is given the pid of the sender, and it can
36 use that to reply by message_send_pid(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
48 #include "includes.h"
50 /* the locking database handle */
51 static TDB_CONTEXT *tdb;
52 static int received_signal;
54 /* change the message version with any incompatible changes in the protocol */
55 #define MESSAGE_VERSION 1
57 struct message_rec {
58 int msg_version;
59 int msg_type;
60 struct process_id dest;
61 struct process_id src;
62 size_t len;
/*
 * Registered message handlers, kept as a doubly linked list whose head
 * is the file-scope pointer dispatch_fns.
 */
static struct dispatch_fns {
	struct dispatch_fns *next;
	struct dispatch_fns *prev;
	int msg_type;		/* message type this handler is called for */
	void (*fn)(int msg_type, struct process_id pid, void *buf, size_t len);
} *dispatch_fns;
72 /****************************************************************************
73 Free global objects.
74 ****************************************************************************/
76 void gfree_messsges(void)
78 struct dispatch_fns *dfn, *next;
80 /* delete the dispatch_fns list */
81 dfn = dispatch_fns;
82 while( dfn ) {
83 next = dfn->next;
84 DLIST_REMOVE(dispatch_fns, dfn);
85 SAFE_FREE(dfn);
86 dfn = next;
90 /****************************************************************************
91 Notifications come in as signals.
92 ****************************************************************************/
94 static void sig_usr1(void)
96 received_signal = 1;
97 sys_select_signal(SIGUSR1);
100 /****************************************************************************
101 A useful function for testing the message system.
102 ****************************************************************************/
104 static void ping_message(int msg_type, struct process_id src,
105 void *buf, size_t len)
107 const char *msg = buf ? buf : "none";
108 DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
109 procid_str_static(&src), msg));
110 message_send_pid(src, MSG_PONG, buf, len, True);
113 /****************************************************************************
114 Initialise the messaging functions.
115 ****************************************************************************/
117 BOOL message_init(void)
119 if (tdb) return True;
121 tdb = tdb_open_log(lock_path("messages.tdb"),
122 0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT,
123 O_RDWR|O_CREAT,0600);
125 if (!tdb) {
126 DEBUG(0,("ERROR: Failed to initialise messages database\n"));
127 return False;
130 CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
132 message_register(MSG_PING, ping_message);
134 /* Register some debugging related messages */
136 register_msg_pool_usage();
137 register_dmalloc_msgs();
139 return True;
142 /*******************************************************************
143 Form a static tdb key from a pid.
144 ******************************************************************/
146 static TDB_DATA message_key_pid(struct process_id pid)
148 static char key[20];
149 TDB_DATA kbuf;
151 slprintf(key, sizeof(key)-1, "PID/%s", procid_str_static(&pid));
153 kbuf.dptr = (char *)key;
154 kbuf.dsize = strlen(key)+1;
155 return kbuf;
158 /****************************************************************************
159 Notify a process that it has a message. If the process doesn't exist
160 then delete its record in the database.
161 ****************************************************************************/
163 static BOOL message_notify(struct process_id procid)
165 pid_t pid = procid.pid;
167 * Doing kill with a non-positive pid causes messages to be
168 * sent to places we don't want.
171 SMB_ASSERT(pid > 0);
173 if (kill(pid, SIGUSR1) == -1) {
174 if (errno == ESRCH) {
175 DEBUG(2,("pid %d doesn't exist - deleting messages record\n", (int)pid));
176 tdb_delete(tdb, message_key_pid(procid));
177 } else {
178 DEBUG(2,("message to process %d failed - %s\n", (int)pid, strerror(errno)));
180 return False;
182 return True;
185 /****************************************************************************
186 Send a message to a particular pid.
187 ****************************************************************************/
189 static BOOL message_send_pid_internal(struct process_id pid, int msg_type,
190 const void *buf, size_t len,
191 BOOL duplicates_allowed,
192 unsigned int timeout)
194 TDB_DATA kbuf;
195 TDB_DATA dbuf;
196 TDB_DATA old_dbuf;
197 struct message_rec rec;
198 char *ptr;
199 struct message_rec prec;
202 * Doing kill with a non-positive pid causes messages to be
203 * sent to places we don't want.
206 SMB_ASSERT(procid_to_pid(&pid) > 0);
208 rec.msg_version = MESSAGE_VERSION;
209 rec.msg_type = msg_type;
210 rec.dest = pid;
211 rec.src = procid_self();
212 rec.len = len;
214 kbuf = message_key_pid(pid);
216 dbuf.dptr = (void *)SMB_MALLOC(len + sizeof(rec));
217 if (!dbuf.dptr)
218 return False;
220 memcpy(dbuf.dptr, &rec, sizeof(rec));
221 if (len > 0)
222 memcpy((void *)((char*)dbuf.dptr+sizeof(rec)), buf, len);
224 dbuf.dsize = len + sizeof(rec);
226 if (duplicates_allowed) {
228 /* If duplicates are allowed we can just append the message and return. */
230 /* lock the record for the destination */
231 if (timeout) {
232 if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
233 DEBUG(0,("message_send_pid_internal: failed to get chainlock with timeout %ul.\n", timeout));
234 return False;
236 } else {
237 if (tdb_chainlock(tdb, kbuf) == -1) {
238 DEBUG(0,("message_send_pid_internal: failed to get chainlock.\n"));
239 return False;
242 tdb_append(tdb, kbuf, dbuf);
243 tdb_chainunlock(tdb, kbuf);
245 SAFE_FREE(dbuf.dptr);
246 errno = 0; /* paranoia */
247 return message_notify(pid);
250 /* lock the record for the destination */
251 if (timeout) {
252 if (tdb_chainlock_with_timeout(tdb, kbuf, timeout) == -1) {
253 DEBUG(0,("message_send_pid_internal: failed to get chainlock with timeout %ul.\n", timeout));
254 return False;
256 } else {
257 if (tdb_chainlock(tdb, kbuf) == -1) {
258 DEBUG(0,("message_send_pid_internal: failed to get chainlock.\n"));
259 return False;
263 old_dbuf = tdb_fetch(tdb, kbuf);
265 if (!old_dbuf.dptr) {
266 /* its a new record */
268 tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
269 tdb_chainunlock(tdb, kbuf);
271 SAFE_FREE(dbuf.dptr);
272 errno = 0; /* paranoia */
273 return message_notify(pid);
276 /* Not a new record. Check for duplicates. */
278 for(ptr = (char *)old_dbuf.dptr; ptr < old_dbuf.dptr + old_dbuf.dsize; ) {
280 * First check if the message header matches, then, if it's a non-zero
281 * sized message, check if the data matches. If so it's a duplicate and
282 * we can discard it. JRA.
285 if (!memcmp(ptr, &rec, sizeof(rec))) {
286 if (!len || (len && !memcmp( ptr + sizeof(rec), buf, len))) {
287 tdb_chainunlock(tdb, kbuf);
288 DEBUG(10,("message_send_pid_internal: discarding duplicate message.\n"));
289 SAFE_FREE(dbuf.dptr);
290 SAFE_FREE(old_dbuf.dptr);
291 return True;
294 memcpy(&prec, ptr, sizeof(prec));
295 ptr += sizeof(rec) + prec.len;
298 /* we're adding to an existing entry */
300 tdb_append(tdb, kbuf, dbuf);
301 tdb_chainunlock(tdb, kbuf);
303 SAFE_FREE(old_dbuf.dptr);
304 SAFE_FREE(dbuf.dptr);
306 errno = 0; /* paranoia */
307 return message_notify(pid);
310 /****************************************************************************
311 Send a message to a particular pid - no timeout.
312 ****************************************************************************/
314 BOOL message_send_pid(struct process_id pid, int msg_type, const void *buf, size_t len, BOOL duplicates_allowed)
316 return message_send_pid_internal(pid, msg_type, buf, len, duplicates_allowed, 0);
319 /****************************************************************************
320 Send a message to a particular pid, with timeout in seconds.
321 ****************************************************************************/
323 BOOL message_send_pid_with_timeout(struct process_id pid, int msg_type, const void *buf, size_t len,
324 BOOL duplicates_allowed, unsigned int timeout)
326 return message_send_pid_internal(pid, msg_type, buf, len, duplicates_allowed, timeout);
329 /****************************************************************************
330 Count the messages pending for a particular pid. Expensive....
331 ****************************************************************************/
333 unsigned int messages_pending_for_pid(struct process_id pid)
335 TDB_DATA kbuf;
336 TDB_DATA dbuf;
337 char *buf;
338 unsigned int message_count = 0;
340 kbuf = message_key_pid(pid);
342 dbuf = tdb_fetch(tdb, kbuf);
343 if (dbuf.dptr == NULL || dbuf.dsize == 0) {
344 SAFE_FREE(dbuf.dptr);
345 return 0;
348 for (buf = dbuf.dptr; dbuf.dsize > sizeof(struct message_rec);) {
349 struct message_rec rec;
350 memcpy(&rec, buf, sizeof(rec));
351 buf += (sizeof(rec) + rec.len);
352 dbuf.dsize -= (sizeof(rec) + rec.len);
353 message_count++;
356 SAFE_FREE(dbuf.dptr);
357 return message_count;
360 /****************************************************************************
361 Retrieve all messages for the current process.
362 ****************************************************************************/
364 static BOOL retrieve_all_messages(char **msgs_buf, size_t *total_len)
366 TDB_DATA kbuf;
367 TDB_DATA dbuf;
368 TDB_DATA null_dbuf;
370 ZERO_STRUCT(null_dbuf);
372 *msgs_buf = NULL;
373 *total_len = 0;
375 kbuf = message_key_pid(pid_to_procid(sys_getpid()));
377 if (tdb_chainlock(tdb, kbuf) == -1)
378 return False;
380 dbuf = tdb_fetch(tdb, kbuf);
382 * Replace with an empty record to keep the allocated
383 * space in the tdb.
385 tdb_store(tdb, kbuf, null_dbuf, TDB_REPLACE);
386 tdb_chainunlock(tdb, kbuf);
388 if (dbuf.dptr == NULL || dbuf.dsize == 0) {
389 SAFE_FREE(dbuf.dptr);
390 return False;
393 *msgs_buf = dbuf.dptr;
394 *total_len = dbuf.dsize;
396 return True;
399 /****************************************************************************
400 Parse out the next message for the current process.
401 ****************************************************************************/
403 static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type,
404 struct process_id *src, char **buf, size_t *len)
406 struct message_rec rec;
407 char *ret_buf = *buf;
409 *buf = NULL;
410 *len = 0;
412 if (total_len - (ret_buf - msgs_buf) < sizeof(rec))
413 return False;
415 memcpy(&rec, ret_buf, sizeof(rec));
416 ret_buf += sizeof(rec);
418 if (rec.msg_version != MESSAGE_VERSION) {
419 DEBUG(0,("message version %d received (expected %d)\n", rec.msg_version, MESSAGE_VERSION));
420 return False;
423 if (rec.len > 0) {
424 if (total_len - (ret_buf - msgs_buf) < rec.len)
425 return False;
428 *len = rec.len;
429 *msg_type = rec.msg_type;
430 *src = rec.src;
431 *buf = ret_buf;
433 return True;
436 /****************************************************************************
437 Receive and dispatch any messages pending for this process.
438 Notice that all dispatch handlers for a particular msg_type get called,
439 so you can register multiple handlers for a message.
440 *NOTE*: Dispatch functions must be able to cope with incoming
441 messages on an *odd* byte boundary.
442 ****************************************************************************/
444 void message_dispatch(void)
446 int msg_type;
447 struct process_id src;
448 char *buf;
449 char *msgs_buf;
450 size_t len, total_len;
451 struct dispatch_fns *dfn;
452 int n_handled;
454 if (!received_signal)
455 return;
457 DEBUG(10,("message_dispatch: received_signal = %d\n", received_signal));
459 received_signal = 0;
461 if (!retrieve_all_messages(&msgs_buf, &total_len))
462 return;
464 for (buf = msgs_buf; message_recv(msgs_buf, total_len, &msg_type, &src, &buf, &len); buf += len) {
465 DEBUG(10,("message_dispatch: received msg_type=%d "
466 "src_pid=%u\n", msg_type,
467 (unsigned int) procid_to_pid(&src)));
468 n_handled = 0;
469 for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
470 if (dfn->msg_type == msg_type) {
471 DEBUG(10,("message_dispatch: processing message of type %d.\n", msg_type));
472 dfn->fn(msg_type, src, len ? (void *)buf : NULL, len);
473 n_handled++;
476 if (!n_handled) {
477 DEBUG(5,("message_dispatch: warning: no handlers registed for "
478 "msg_type %d in pid %u\n",
479 msg_type, (unsigned int)sys_getpid()));
482 SAFE_FREE(msgs_buf);
485 /****************************************************************************
486 Register a dispatch function for a particular message type.
487 *NOTE*: Dispatch functions must be able to cope with incoming
488 messages on an *odd* byte boundary.
489 ****************************************************************************/
491 void message_register(int msg_type,
492 void (*fn)(int msg_type, struct process_id pid,
493 void *buf, size_t len))
495 struct dispatch_fns *dfn;
497 dfn = SMB_MALLOC_P(struct dispatch_fns);
499 if (dfn != NULL) {
501 ZERO_STRUCTPN(dfn);
503 dfn->msg_type = msg_type;
504 dfn->fn = fn;
506 DLIST_ADD(dispatch_fns, dfn);
508 else {
510 DEBUG(0,("message_register: Not enough memory. malloc failed!\n"));
514 /****************************************************************************
515 De-register the function for a particular message type.
516 ****************************************************************************/
518 void message_deregister(int msg_type)
520 struct dispatch_fns *dfn, *next;
522 for (dfn = dispatch_fns; dfn; dfn = next) {
523 next = dfn->next;
524 if (dfn->msg_type == msg_type) {
525 DLIST_REMOVE(dispatch_fns, dfn);
526 SAFE_FREE(dfn);
531 struct msg_all {
532 int msg_type;
533 uint32 msg_flag;
534 const void *buf;
535 size_t len;
536 BOOL duplicates;
537 int n_sent;
540 /****************************************************************************
541 Send one of the messages for the broadcast.
542 ****************************************************************************/
544 static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void *state)
546 struct connections_data crec;
547 struct msg_all *msg_all = (struct msg_all *)state;
549 if (dbuf.dsize != sizeof(crec))
550 return 0;
552 memcpy(&crec, dbuf.dptr, sizeof(crec));
554 if (crec.cnum != -1)
555 return 0;
557 /* Don't send if the receiver hasn't registered an interest. */
559 if(!(crec.bcast_msg_flags & msg_all->msg_flag))
560 return 0;
562 /* If the msg send fails because the pid was not found (i.e. smbd died),
563 * the msg has already been deleted from the messages.tdb.*/
565 if (!message_send_pid(crec.pid, msg_all->msg_type,
566 msg_all->buf, msg_all->len,
567 msg_all->duplicates)) {
569 /* If the pid was not found delete the entry from connections.tdb */
571 if (errno == ESRCH) {
572 DEBUG(2,("pid %s doesn't exist - deleting connections %d [%s]\n",
573 procid_str_static(&crec.pid),
574 crec.cnum, crec.name));
575 tdb_delete(the_tdb, kbuf);
578 msg_all->n_sent++;
579 return 0;
583 * Send a message to all smbd processes.
585 * It isn't very efficient, but should be OK for the sorts of
586 * applications that use it. When we need efficient broadcast we can add
587 * it.
589 * @param n_sent Set to the number of messages sent. This should be
590 * equal to the number of processes, but be careful for races.
592 * @retval True for success.
594 BOOL message_send_all(TDB_CONTEXT *conn_tdb, int msg_type,
595 const void *buf, size_t len,
596 BOOL duplicates_allowed,
597 int *n_sent)
599 struct msg_all msg_all;
601 msg_all.msg_type = msg_type;
602 if (msg_type < 1000)
603 msg_all.msg_flag = FLAG_MSG_GENERAL;
604 else if (msg_type > 1000 && msg_type < 2000)
605 msg_all.msg_flag = FLAG_MSG_NMBD;
606 else if (msg_type > 2000 && msg_type < 2100)
607 msg_all.msg_flag = FLAG_MSG_PRINT_NOTIFY;
608 else if (msg_type > 2100 && msg_type < 3000)
609 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
610 else if (msg_type > 3000 && msg_type < 4000)
611 msg_all.msg_flag = FLAG_MSG_SMBD;
612 else
613 return False;
615 msg_all.buf = buf;
616 msg_all.len = len;
617 msg_all.duplicates = duplicates_allowed;
618 msg_all.n_sent = 0;
620 tdb_traverse(conn_tdb, traverse_fn, &msg_all);
621 if (n_sent)
622 *n_sent = msg_all.n_sent;
623 return True;
627 * Block and unblock receiving of messages. Allows removal of race conditions
628 * when doing a fork and changing message disposition.
631 void message_block(void)
633 BlockSignals(True, SIGUSR1);
636 void message_unblock(void)
638 BlockSignals(False, SIGUSR1);
640 /** @} **/