From 8e71b8f83c3077bdcf576e69d033a7ab96097dfa Mon Sep 17 00:00:00 2001 From: Jeremy Allison Date: Thu, 16 Nov 2000 21:44:49 +0000 Subject: [PATCH] Fix for a problem with the new messaging system. If a sender is using the messaging system as a notification mechanism, and the speed of notification greatly exceeds the speed of message recovery, then you get a massively (>75Mb) growing tdb. If the message is a simple notification, then the message is static, and you only need one of them in transit to a target process at any one time. This patch adds a BOOL "allow_duplicates" to the message_send_XX primitives. If set to False, then before sending a message the sender checks the existing message queue for a target pid for a duplicate of this message, and doesn't add to it if one already exists. Jeremy. --- source/Makefile.in | 9 ++++++++ source/include/proto.h | 4 ++-- source/lib/debug.c | 2 +- source/lib/messages.c | 46 +++++++++++++++++++++++++++++++++----- source/printing/printing.c | 21 ++++++----------- source/profile/profile.c | 2 +- source/rpc_server/srv_spoolss_nt.c | 2 +- source/utils/msgtest.c | 29 ++++++++++++++++++++++-- source/utils/smbcontrol.c | 20 ++++++++--------- 9 files changed, 99 insertions(+), 36 deletions(-) diff --git a/source/Makefile.in b/source/Makefile.in index 28628658f17..f4096d06aec 100644 --- a/source/Makefile.in +++ b/source/Makefile.in @@ -286,6 +286,9 @@ SMBTORTURE_OBJ = utils/torture.o utils/nbio.o $(LIBSMB_OBJ) $(PARAM_OBJ) \ MASKTEST_OBJ = utils/masktest.o $(LIBSMB_OBJ) $(PARAM_OBJ) \ $(UBIQX_OBJ) $(LIB_OBJ) +MSGTEST_OBJ = utils/msgtest.o $(LIBSMB_OBJ) $(PARAM_OBJ) \ + $(UBIQX_OBJ) $(LIB_OBJ) + LOCKTEST_OBJ = utils/locktest.o $(LOCKING_OBJ) $(LIBSMB_OBJ) $(PARAM_OBJ) \ $(UBIQX_OBJ) $(LIB_OBJ) @@ -354,6 +357,8 @@ smbtorture : CHECK bin/smbtorture masktest : CHECK bin/masktest +msgtest : CHECK bin/msgtest + locktest : CHECK bin/locktest locktest2 : CHECK bin/locktest2 @@ -505,6 +510,10 @@ bin/masktest: $(MASKTEST_OBJ) bin/.dummy @echo Linking $@ @$(CC) $(FLAGS) -o $@ $(MASKTEST_OBJ) $(LDFLAGS) $(LIBS) +bin/msgtest: $(MSGTEST_OBJ) bin/.dummy + @echo Linking $@ + @$(CC) $(FLAGS) -o $@ $(MSGTEST_OBJ) $(LDFLAGS) $(LIBS) + bin/locktest: $(LOCKTEST_OBJ) bin/.dummy @echo Linking $@ @$(CC) $(FLAGS) -o $@ $(LOCKTEST_OBJ) $(LDFLAGS) $(LIBS) diff --git a/source/include/proto.h b/source/include/proto.h index 4f7a10631e3..2ff347ce198 100644 --- a/source/include/proto.h +++ b/source/include/proto.h @@ -153,12 +153,12 @@ void mdfour(unsigned char *out, unsigned char *in, int n); void ping_message(int msg_type, pid_t src, void *buf, size_t len); void debuglevel_message(int msg_type, pid_t src, void *buf, size_t len); BOOL message_init(void); -BOOL message_send_pid(pid_t pid, int msg_type, void *buf, size_t len); +BOOL message_send_pid(pid_t pid, int msg_type, void *buf, size_t len, BOOL duplicates_allowed); void message_dispatch(void); void message_register(int msg_type, void (*fn)(int msg_type, pid_t pid, void *buf, size_t len)); void message_deregister(int msg_type); -BOOL message_send_all(int msg_type, void *buf, size_t len); +BOOL message_send_all(int msg_type, void *buf, size_t len, BOOL duplicates_allowed); /*The following definitions come from lib/ms_fnmatch.c */ diff --git a/source/lib/debug.c b/source/lib/debug.c index cdcd44955b4..9d520b6c2fe 100644 --- a/source/lib/debug.c +++ b/source/lib/debug.c @@ -135,7 +135,7 @@ send a "set debug level" message ****************************************************************************/ void debug_message_send(pid_t pid, int level) { - message_send_pid(pid, MSG_DEBUG, &level, sizeof(int)); + message_send_pid(pid, MSG_DEBUG, &level, sizeof(int), False); } diff --git a/source/lib/messages.c b/source/lib/messages.c index 126ca768b7b..0e2dfeffd67 100644 --- a/source/lib/messages.c +++ b/source/lib/messages.c @@ -71,7 +71,7 @@ a useful function for testing the message system void ping_message(int msg_type, pid_t src, void *buf, size_t len) { DEBUG(1,("INFO: Received PING message from PID %d\n",src)); - message_send_pid(src, MSG_PONG, buf, len); + message_send_pid(src, MSG_PONG, buf, len, True); } /**************************************************************************** @@ -83,7 +83,7 @@ void debuglevel_message(int msg_type, pid_t src, void *buf, size_t len) DEBUG(1,("INFO: Received REQ_DEBUGLEVEL message from PID %d\n",src)); level = DEBUGLEVEL; - message_send_pid(src, MSG_DEBUGLEVEL, &level, sizeof(int)); + message_send_pid(src, MSG_DEBUGLEVEL, &level, sizeof(int), True); } /**************************************************************************** @@ -148,13 +148,23 @@ static BOOL message_notify(pid_t pid) /**************************************************************************** send a message to a particular pid ****************************************************************************/ -BOOL message_send_pid(pid_t pid, int msg_type, void *buf, size_t len) +BOOL message_send_pid(pid_t pid, int msg_type, void *buf, size_t len, BOOL duplicates_allowed) { TDB_DATA kbuf; TDB_DATA dbuf; struct message_rec rec; void *p; + /* + * Do an early check for process exists - saves adding into a tdb + * and deleting again if the target is not present. JRA. + */ + + if (kill(pid, 0) == -1) { + DEBUG(2,("message_send_pid: pid %d doesn't exist\n", (int)pid)); + return False; + } + rec.msg_version = MESSAGE_VERSION; rec.msg_type = msg_type; rec.dest = pid; @@ -183,6 +193,30 @@ BOOL message_send_pid(pid_t pid, int msg_type, void *buf, size_t len) goto ok; } + if (!duplicates_allowed) { + char *ptr; + struct message_rec *prec; + + for(ptr = (char *)dbuf.dptr, prec = (struct message_rec *)ptr; ptr < dbuf.dptr + dbuf.dsize; + ptr += (sizeof(rec) + prec->len), prec = (struct message_rec *)ptr) { + + /* + * First check if the message header matches, then, if it's a non-zero + * sized message, check if the data matches. If so it's a duplicate and + * we can discard it. JRA. + */ + + if (!memcmp(ptr, &rec, sizeof(rec))) { + if (!len || (len && !memcmp( ptr + sizeof(rec), (char *)buf, len))) { + DEBUG(10,("message_send_pid: discarding duplicate message.\n")); + free(dbuf.dptr); + tdb_unlockchain(tdb, kbuf); + return True; + } + } + } + } + /* we're adding to an existing entry */ p = (void *)malloc(dbuf.dsize + len + sizeof(rec)); if (!p) goto failed; @@ -323,6 +357,7 @@ static struct { int msg_type; void *buf; size_t len; + BOOL duplicates; } msg_all; /**************************************************************************** @@ -335,7 +370,7 @@ static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void memcpy(&crec, dbuf.dptr, sizeof(crec)); if (crec.cnum == -1) return 0; - message_send_pid(crec.pid, msg_all.msg_type, msg_all.buf, msg_all.len); + message_send_pid(crec.pid, msg_all.msg_type, msg_all.buf, msg_all.len, msg_all.duplicates); return 0; } @@ -344,7 +379,7 @@ this is a useful function for sending messages to all smbd processes. It isn't very efficient, but should be OK for the sorts of applications that use it. When we need efficient broadcast we can add it. ****************************************************************************/ -BOOL message_send_all(int msg_type, void *buf, size_t len) +BOOL message_send_all(int msg_type, void *buf, size_t len, BOOL duplicates_allowed) { TDB_CONTEXT *the_tdb; @@ -357,6 +392,7 @@ BOOL message_send_all(int msg_type, void *buf, size_t len) msg_all.msg_type = msg_type; msg_all.buf = buf; msg_all.len = len; + msg_all.duplicates = duplicates_allowed; tdb_traverse(the_tdb, traverse_fn, NULL); tdb_close(the_tdb); diff --git a/source/printing/printing.c b/source/printing/printing.c index 2d98aa99744..1eb6c275553 100644 --- a/source/printing/printing.c +++ b/source/printing/printing.c @@ -576,8 +576,7 @@ BOOL print_job_delete(struct current_user *user, int jobid, int *errcode) printer_name = PRINTERNAME(snum); - message_send_all(MSG_PRINTER_NOTIFY, printer_name, - strlen(printer_name) + 1); + message_send_all(MSG_PRINTER_NOTIFY, printer_name, strlen(printer_name) + 1, False); return !print_job_exists(jobid); } @@ -627,8 +626,7 @@ BOOL print_job_pause(struct current_user *user, int jobid, int *errcode) printer_name = PRINTERNAME(snum); - message_send_all(MSG_PRINTER_NOTIFY, printer_name, - strlen(printer_name) + 1); + message_send_all(MSG_PRINTER_NOTIFY, printer_name, strlen(printer_name) + 1, False); /* how do we tell if this succeeded? */ @@ -678,8 +676,7 @@ BOOL print_job_resume(struct current_user *user, int jobid, int *errcode) printer_name = PRINTERNAME(snum); - message_send_all(MSG_PRINTER_NOTIFY, printer_name, - strlen(printer_name) + 1); + message_send_all(MSG_PRINTER_NOTIFY, printer_name, strlen(printer_name) + 1, False); return True; } @@ -942,8 +939,7 @@ BOOL print_job_end(int jobid) printer_name = PRINTERNAME(snum); - message_send_all(MSG_PRINTER_NOTIFY, printer_name, - strlen(printer_name) + 1); + message_send_all(MSG_PRINTER_NOTIFY, printer_name, strlen(printer_name) + 1, False); return True; } @@ -1125,8 +1121,7 @@ BOOL print_queue_pause(struct current_user *user, int snum, int *errcode) printer_name = PRINTERNAME(snum); - message_send_all(MSG_PRINTER_NOTIFY, printer_name, - strlen(printer_name) + 1); + message_send_all(MSG_PRINTER_NOTIFY, printer_name, strlen(printer_name) + 1, False); return True; } @@ -1159,8 +1154,7 @@ BOOL print_queue_resume(struct current_user *user, int snum, int *errcode) printer_name = PRINTERNAME(snum); - message_send_all(MSG_PRINTER_NOTIFY, printer_name, - strlen(printer_name) + 1); + message_send_all(MSG_PRINTER_NOTIFY, printer_name, strlen(printer_name) + 1, False); return True; } @@ -1189,8 +1183,7 @@ BOOL print_queue_purge(struct current_user *user, int snum, int *errcode) printer_name = PRINTERNAME(snum); - message_send_all(MSG_PRINTER_NOTIFY, printer_name, - strlen(printer_name) + 1); + message_send_all(MSG_PRINTER_NOTIFY, printer_name, strlen(printer_name) + 1, False); return True; } diff --git a/source/profile/profile.c b/source/profile/profile.c index 80584adfa2a..dce1d78a9b6 100644 --- a/source/profile/profile.c +++ b/source/profile/profile.c @@ -86,7 +86,7 @@ void reqprofile_message(int msg_type, pid_t src, void *buf, size_t len) level = 0; #endif DEBUG(1,("INFO: Received REQ_PROFILELEVEL message from PID %d\n",src)); - message_send_pid(src, MSG_PROFILELEVEL, &level, sizeof(int)); + message_send_pid(src, MSG_PROFILELEVEL, &level, sizeof(int), True); } /******************************************************************* diff --git a/source/rpc_server/srv_spoolss_nt.c b/source/rpc_server/srv_spoolss_nt.c index 6a3b72ffa71..30acc149049 100644 --- a/source/rpc_server/srv_spoolss_nt.c +++ b/source/rpc_server/srv_spoolss_nt.c @@ -640,7 +640,7 @@ static BOOL srv_spoolss_sendnotify(POLICY_HND *handle) /*srv_spoolss_receive_message(printer);*/ DEBUG(10,("srv_spoolss_sendnotify: Sending message about printer %s\n", printer )); - message_send_all(MSG_PRINTER_NOTIFY, printer, strlen(printer) + 1); /* Null terminate... */ + message_send_all(MSG_PRINTER_NOTIFY, printer, strlen(printer) + 1, False); /* Null terminate... */ return True; } diff --git a/source/utils/msgtest.c b/source/utils/msgtest.c index 858166e697f..3fbf95af8fa 100644 --- a/source/utils/msgtest.c +++ b/source/utils/msgtest.c @@ -36,12 +36,12 @@ void pong_message(int msg_type, pid_t src, void *buf, size_t len) pong_count++; } - int main(int argc, char *argv[]) { pid_t pid; int i, n; static pstring servicesf = CONFIGFILE; + char buf[12]; TimeInit(); setup_logging(argv[0],True); @@ -52,13 +52,18 @@ void pong_message(int msg_type, pid_t src, void *buf, size_t len) message_init(); + if (argc != 3) { + fprintf(stderr, "%s: Usage - %s pid count\n", argv[0], argv[0]); + exit(1); + } + pid = atoi(argv[1]); n = atoi(argv[2]); message_register(MSG_PONG, pong_message); for (i=0;i