disable the unrecognized nls flag
[AROS-Contrib.git] / regina / rxstack.c
blobfe88d1d4ed788493d2f8f3f45498a675cc37f8ca
1 /*
2 * The Regina Rexx Interpreter
3 * Copyright (C) 1992-1994 Anders Christensen <anders@pvv.unit.no>
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the Free
17 * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 * This process runs as a daemon (or NT service). It maintains multiple,
21 * named Rexx queues.
22 * All communication is done via TCP/IP sockets.
23 * This process waits on a known port; 5656 by default for connections
24 * from clients. A client is any process that respects the Interface
25 * defined below. The "normal" clients are regina and rxqueue.
26 * Details about each client are kept, like the current queue name.
28 * Structure
29 * ---------
30 * startup
31 * - set signal handler for SIGTERM
32 * initialise socket interface
33 * - socket()
34 * - bind()
35 * - listen()
36 * loop until killed
37 * - setup read FDs
38 * - select()
39 * - if listen socket, add new client
40 * - otherwise read command
41 * cleanup
42 * - disconnect all clients
43 * - free up resources
45 * Interface.
46 * ---------
47 * Once a client connects, it sends commands
48 * Commands are single character, followed by optional 6 hex character
49 * length and optional data.
50 * F - queue data onto client's current queue (FIFO)
51 * in-> FFFFFFFxxx--data--xxx
52 * out-> 0000000 (if successful)
53 * out-> 2xxxxxx (if error, eg queue deleted)
54 * out-> 3000000 (memory allocation error)
55 * regina QUEUE, rxqueue /fifo
56 * L - push data onto client's current queue (LIFO)
57 * in-> LFFFFFFxxx--data--xxx
58 * out-> 0000000 (if successful)
59 * out-> 2xxxxxx (if error, eg queue deleted)
60 * out-> 3000000 (memory allocation error)
61 * regina PUSH, rxqueue /lifo
62 * C - create queue
63 * in-> CFFFFFFxxx--queue name--xxx (if length 0, create name)
64 * out-> 0FFFFFFxxx--queue name--xxx (if queue name created)
65 * out-> 1FFFFFFxxx--queue name--xxx (if queue name existed)
66 * out-> 2xxxxxx (if error)
67 * regina RXQUEUE('C'), rxqueue N/A
68 * D - delete queue
69 * in-> DFFFFFFxxx--queue name--xxx
70 * out-> 0000000 (if queue name deleted)
71 * out-> 5xxxxxx (trying to delete 'SESSION' queue)
72 * out-> 6000000 (queue name not passed)
73 * out-> 9xxxxxx (if error, eg queue already deleted)
74 * regina RXQUEUE('D'), rxqueue N/A
75 * E - empty data from specified queue
76 * in-> EFFFFFFxxx--queue name--xxx
77 * out-> 0000000 (if queue emptied)
78 * out-> 2xxxxxx (if error, eg queue deleted)
79 * out-> 3000000 (memory allocation error)
80 * regina N/A, rxqueue /clear
81 * P - pop item off client's default queue
82 * in-> P000000
83 * out-> 0FFFFFFxxx--data--xxx (if queue name existed)
84 * out-> 1000000 (if queue empty)
85 * out-> 2xxxxxx (if queue name deleted - length ignored)
86 * out-> 4xxxxxx (if timeout on queue exceeded - length ignored)
87 * regina PULL, rxqueue N/A
88 * p - fetch item off client's default queue
89 * in-> p000000
90 * out-> 0FFFFFFxxx--data--xxx (if queue name existed)
91 * out-> 1000000 (if queue empty)
92 * out-> 2xxxxxx (if queue name deleted - length ignored)
93 * regina PULL without timeout, rxqueue N/A
94 *x S - set default queue name (allow false queues)
95 * in-> SFFFFFFxxx--queue name--xxx
96 * out-> 0000000 (if successful)
97 * out-> 3000000 (memory allocation error)
98 * out-> 6000000 (queue name not passed)
99 * regina RXQUEUE('S'), rxqueue N/A
100 * G - get default queue name
101 * in-> G000000
102 * out-> 0FFFFFFxxx--queue name--xxx
103 * regina RXQUEUE('G'), rxqueue N/A
104 * N - return number of lines on stack
105 * in-> N000000
106 * out-> 0FFFFFF (if queue exists)
107 * out-> 2xxxxxx (if error or queue doesn't exist - length ignored)
108 * regina QUEUED(), rxqueue N/A
109 * T - set timeout on queue pull
110 * in-> TFFFFFFTTTTTT
111 * out-> 0000000 (if queue timeout set)
112 * out-> 2xxxxxx (if error, eg invalid argument)
113 * out-> 6000000 (queue name not passed)
114 * regina RXQUEUE('T'), rxqueue N/A
115 * X - client disconnect
116 * in-> X000000
117 * out->
118 * Z - client requests shutdown - should only be called by ourselves!!
119 * in-> Z000000
120 * out->
123 #define NO_CTYPE_REPLACEMENT
124 #include "rexx.h"
126 #if defined(WIN32) || defined(__LCC__)
127 # if defined(_MSC_VER)
128 # if _MSC_VER >= 1100
129 /* Stupid MSC can't compile own headers without warning at least in VC 5.0 */
130 # pragma warning(disable: 4115 4201 4214 4514)
131 # endif
132 # include <windows.h>
133 # if _MSC_VER >= 1100
134 # pragma warning(default: 4115 4201 4214)
135 # endif
136 # elif defined(__LCC__)
137 # include <windows.h>
138 # include <winsvc.h>
139 # include <winsock.h>
140 # else
141 # include <windows.h>
142 # endif
143 # include <io.h>
144 #else
145 # ifdef HAVE_SYS_SOCKET_H
146 # include <sys/socket.h>
147 # endif
148 # ifdef HAVE_NETINET_IN_H
149 # include <netinet/in.h>
150 # endif
151 # if defined(HAVE_POLL_H) && defined(HAVE_POLL)
152 # include <poll.h>
153 # elif defined(HAVE_SYS_POLL_H) && defined(HAVE_POLL)
154 # include <sys/poll.h>
155 # elif defined(HAVE_SYS_SELECT_H)
156 # include <sys/select.h>
157 # endif
158 # ifdef HAVE_NETDB_H
159 # include <netdb.h>
160 # endif
161 # ifdef HAVE_ARPA_INET_H
162 # include <arpa/inet.h>
163 # endif
164 #endif
165 #include <string.h>
167 #ifdef HAVE_UNISTD_H
168 #include <unistd.h>
169 #endif
171 #ifdef HAVE_ERRNO_H
172 #include <errno.h>
173 #endif
175 #ifdef HAVE_SIGNAL_H
176 #include <signal.h>
177 #endif
179 #ifdef HAVE_CTYPE_H
180 #include <ctype.h>
181 #endif
183 #ifdef HAVE_PROCESS_H
184 # include <process.h>
185 #endif
187 #if defined(TIME_WITH_SYS_TIME)
188 # include <sys/time.h>
189 # include <time.h>
190 #else
191 # if defined(HAVE_SYS_TIME_H)
192 # include <sys/time.h>
193 # else
194 # include <time.h>
195 # endif
196 #endif
198 #include <assert.h>
200 #define HAVE_FORK
201 #if defined(__WATCOMC__) || defined(_MSC_VER) || (defined(__IBMC__) && defined(WIN32)) || defined(__SASC) || defined(__MINGW32__) || defined(__BORLANDC__) || defined(DOS) || defined(__LCC__)
202 # undef HAVE_FORK
203 #endif
204 #if defined(__WATCOMC__) && defined(__QNX__)
205 # define HAVE_FORK
206 #endif
208 #include "extstack.h"
#ifdef BUILD_NT_SERVICE
# include "service.h"
/*
 * This event is signalled when the service should end.
 */
HANDLE hServerStopEvent = NULL;
#endif
/*
 * os_errno yields the last socket error code and errno_str(code) its text.
 * On WIN32 the codes come from WSAGetLastError() and EINTR/ECONNRESET are
 * remapped to their Winsock counterparts; elsewhere plain errno/strerror()
 * are used.
 */
#ifdef WIN32
# define os_errno ((int)WSAGetLastError())
# define errno_str(code) Win32ErrorString(code)
# undef EINTR
# define EINTR WSAEINTR
# undef ECONNRESET
# define ECONNRESET WSAECONNRESET
#else
# define os_errno errno
# define errno_str(code) strerror(code)
#endif
/*
 * debugging is turned off. You can turn it on by the command line option "-D".
 */
static int debug = 0 ;
/*
 * DEBUGDUMP(x) executes the statement(s) x only if debugging is enabled.
 * The expansion is a brace-enclosed block, so it must not be used as the
 * unbraced body of an if that has an else branch.
 */
#define DEBUGDUMP(x) { if ( debug ) \
                          {x;}      \
                     }
/*
 * DEFAULT_WAKEUP is the time in ms after which the process shall wakeup.
 * The time has a maximum of 49 days and shall be around one day. It may
 * be set much shorter for debugging purpose.
 */
#define DEFAULT_WAKEUP 86400000

/*
 * QUEUE_TIMEOUT is the time in ms after which an unused queue will be
 * removed. A queue is unused if no client is connected to it.
 * The time has a maximum of 49 days and may be set to one week. It may
 * be set much shorter for debugging purpose.
 */
#define QUEUE_TIMEOUT (86400000*7)
/*
 * RxTime defines a structure holding the time in milliseconds resolution.
 * Most systems have at least one sort of high precision time structure,
 * but the "#ifdef"s needed to use them all over the file would be
 * unreadable, so a structure of our own is used instead.
 * A single 32 bit value for milliseconds can't be used: it would break
 * the server after 49 days of uptime.
 */
typedef struct {
   /* seconds are typical time_t values. */
   time_t seconds ;

   /*
    * milli's values are between 0 and 999. The special value -1 indicates
    * a not-used condition.
    */
   int milli ;
} RxTime ;
279 * now holds the current time. It isn't updated after every operation and
280 * may be out of date by some milliseconds some times.
282 RxTime now ;
285 * This value is now plus 7 days.
287 RxTime queue_deadline ;
289 struct _Client ;
290 typedef struct _RxQueue {
292 * linked list maintainance elements
294 struct _RxQueue *prev, *next ;
295 /* name is the uppercased name of the queue.
297 streng *name ;
299 * Indicates if the queue is a "real" queue
300 * or a false queue as a result of a rxqueue('set', 'qname')
301 * on a queue that doesn't exist. This is crap behaviour!
302 * but that's how Object Rexx works :-(
304 int isReal ;
306 * Content: single buffered stack in opposite to the multi buffered
307 * internal stacks of Regina.
309 Buffer buf ;
311 * deadline is the time the queue was last used plus a timeout.
312 * The queue is removed if the queue isn't used for one week.
313 * Thus, the value is the time the queue was used last plus one week.
315 RxTime deadline ;
317 * Several clients may want to wait for incoming data. They are queued
318 * in the following structure and automatically reponsed by the
319 * data acceptor of the queue in a FIFO manner, which is a fair-queue
320 * algorithm.
321 * The clients will get a notice of am error if the queue is destroyed
322 * or emptied by an explicite call.
323 * structure (n = newer, o = older)
324 * oldest newest
325 * || ||
326 * NULL<-o--client<-o--client<-o--client--n->NULL
327 * | ^ | ^
328 * | | | |
329 * +--n--+ +--n--+
331 struct _Client *oldest, *newest;
332 } RxQueue;
335 * queues we work on.
336 * Format, p=prev, n=next:
337 * queues
338 * ||
339 * NULL<--p-queue-n---->queue-n---->queue-n-->NULL
340 * ^ v ^ v
341 * | | | |
342 * +---p---+ +---p---+
344 RxQueue *queues ;
346 /* SESSION is the special queue which can't be deleted and to which clients
347 * drop when their current queue is deleted.
349 RxQueue *SESSION ;
352 * Structure for multiple clients
354 typedef struct _Client
357 * linked list maintainance elements
359 struct _Client *prev, *next ;
362 * socket contains the socket's handle
364 int socket;
367 * each client has a default queue associated. It must be valid all
368 * the times after initialization.
370 RxQueue *default_queue;
373 * if queue_timeout is set, the client expects an error code after
374 * this time instead of waiting until world's end.
375 * The value is in milliseconds.
376 * A value of zero means no timeout; return immediately if no data
377 * A value of -1 means wait forever
379 long queue_timeout;
382 * We manage a deadline. A PULL operation is in an error state after
383 * this timestamp. The value is set at a PULL operation to
384 * now+queue_timeout.
386 RxTime deadline ;
389 * linked list maintainance elements for waiters.
391 struct _Client *older, *newer ;
392 } Client;
395 * clients we work on.
396 * Format, p=prev, n=next:
397 * clients
398 * ||
399 * NULL<--p-client-n--->client-n--->client-n-->NULL
400 * ^ v ^ v
401 * | | | |
402 * +---p---+ +---p---+
404 Client *clients;
406 int running = 1;
407 int allclean = 0;
408 time_t base_secs; /* the time the process started */
410 void empty_queue( RxQueue *q ) ;
#if !defined(HAVE_STRERROR)
/*
 * Sigh! This must probably be done this way, although it's incredibly
 * backwards. Some versions of gcc comes with a complete set of ANSI C
 * include files, which contains the definition of strerror(). However,
 * that function does not exist in the default libraries of SunOS.
 * To circumvent that problem, strerror() is #define'd to get_sys_errlist()
 * in config.h, and here follows the definition of that function.
 * Originally, strerror() was #defined to sys_errlist[x], but that does
 * not work if string.h contains a declaration of the (non-existing)
 * function strerror().
 *
 * So, this is a mismatch between the include files and the library, and
 * it should not create problems for Regina. However, the _user_ will not
 * encounter any problems until he compiles Regina, so we'll have to
 * clean up after a buggy installation of the C compiler!
 *
 * num is the error number to look up. Numbers outside the table
 * (negative or >= sys_nerr) yield a fixed "Unknown error" text instead
 * of reading beyond the table.
 */
const char *get_sys_errlist( int num )
{
   extern char *sys_errlist[] ;
   extern int sys_nerr ;

   if ( ( num < 0 ) || ( num >= sys_nerr ) )
      return "Unknown error" ;
   return sys_errlist[num] ;
}
#endif
#ifdef WIN32
/*
 * Win32ErrorString returns a text of the form "code <n>: <message>" for
 * the given (Winsock) error code. If no message text can be obtained,
 * only "code <n>" is returned.
 * The result lives in a static buffer which is overwritten by the next
 * call.
 */
const volatile char *Win32ErrorString(int code)
{
   static char buffer[512];
   size_t len;
   const CHAR *array[10];
   static HINSTANCE tcpip = NULL;
   DWORD rc;

   /* Placeholders for any insert sequences contained in the message. */
   for (rc = 0;rc < sizeof(array) / sizeof(array[0]);rc++)
      array[rc] = "?";

   sprintf(buffer,"code %d: ",code);
   len = strlen(buffer);

   if (tcpip == NULL)
      tcpip = GetModuleHandle("wsock32");
   rc = FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM |
                      FORMAT_MESSAGE_FROM_HMODULE |
                      FORMAT_MESSAGE_ARGUMENT_ARRAY |
                      FORMAT_MESSAGE_MAX_WIDTH_MASK,
                      tcpip, /* lpSource */
                      code,
                      GetUserDefaultLangID(),
                      buffer + len,
                      sizeof(buffer) - len - 1, /* w/o term. 0 */
                      (va_list *) &array);
   if ((rc == 0) && (len >= 2))
      buffer[len - 2] = '\0'; /* cut off ": " at the end*/
   return(buffer);
}
#endif
470 * get_now returns the current time.
472 RxTime get_now( void )
474 RxTime retval ;
475 #if defined(HAVE_GETTIMEOFDAY)
476 struct timeval times ;
478 gettimeofday(&times, NULL) ;
479 retval.seconds = times.tv_sec ;
480 retval.milli = times.tv_usec / 1000 ;
482 #elif defined(MAXLONGLONG)
483 static enum { T_first, T_OK, T_illegal } state = T_first ;
484 static LARGE_INTEGER freq;
485 LARGE_INTEGER curr;
487 retval.seconds = 0; /* Keep compiler happy */
488 if ( state == T_first )
490 if ( !QueryPerformanceFrequency( &freq ) )
491 state = T_illegal ;
492 else
493 state = T_OK ;
495 if ( state == T_OK )
497 if ( !QueryPerformanceCounter( &curr ) )
498 state = T_illegal ;
499 else
501 ULONGLONG h ;
503 * if we don't native support for the 64 bit arithmetic, use
504 * doubles. Everything else is a pain.
505 * I hope we never get a compiler for Windows which can't compile
506 * this directly.
508 retval.seconds = (time_t) ( curr.QuadPart / freq.QuadPart ) ;
509 h = curr.QuadPart % freq.QuadPart ;
510 h *= 1000 ;
511 retval.milli = (int) ( h / freq.QuadPart ) ;
514 if ( state == T_illegal )
516 /* Windows systems have ftime in the C library */
517 struct timeb timebuffer;
519 ftime(&timebuffer);
520 retval.seconds = timebuffer.time ;
521 retval.milli = timebuffer.millitm ;
524 #elif defined(HAVE_FTIME)
525 struct timeb timebuffer;
527 ftime(&timebuffer);
528 retval.seconds = timebuffer.time ;
529 retval.milli = timebuffer.millitm ;
531 #else
532 clock_t c ;
534 if ( ( c = clock() ) == (clock_t) -1 )
537 * clock() values are not adjusted to 1.1.1970 and CLOCKS_PER_SEC
538 * may be a float or double
540 retval.seconds = (time_t) (c / CLOCKS_PER_SEC) ;
541 retval.milli = (int) ( ( c * 1000.0 ) / CLOCKS_PER_SEC) % 1000 ;
543 else
545 retval.seconds = time( NULL ) ;
546 retval.milli = 0 ;
548 #endif
549 return retval ;
553 * time_add increments an amount of milliseconds to time. the amount is
554 * usually a Client->queue_timeout.
556 static void time_add( RxTime *t, long incr )
558 time_t s = (time_t) ( incr / 1000 ) ;
559 int m = ( incr % 1000 ) ;
561 assert( t->milli != -1 ) ;
562 /* wrapping may occur, be careful */
563 s += t->seconds ;
564 if ( ( t->milli += m ) >= 1000 )
566 s++ ;
567 t->milli -= 1000 ;
569 if ( t->seconds > s )
571 /* wrapping! */
572 s = (time_t) -1 ; /* maximum 1 */
573 if ( t->seconds > s )
575 /* time_t is a signed value, most representations have
576 * MinX = -MaxX - 1
578 s++;
579 s = -s;
581 assert( t->seconds <= s ) ;
583 t->seconds = s ;
587 * time_diff returns t1 - t2 in milliseconds. -1 is returned on overflow;
588 * -1 is never returned else. -2 is used is the returned time will be
589 * negative.
591 static int time_diff( RxTime t1, RxTime t2 )
593 int retval, h ;
595 assert( t1.milli != -1 ) ;
596 assert( t2.milli != -1 ) ;
597 if ( t1.seconds < t2.seconds )
598 retval = -2 ;
599 else
601 retval = (int) ( ( t1.seconds - t2.seconds ) * 1000 ) ;
603 if ( ( t1.milli < t2.milli ) && ( retval == 0 ) )
604 retval = -2 ;
605 else
607 retval += t1.milli - t2.milli ;
609 /* final check for overflow */
610 h = (int) ( t1.seconds - t2.seconds ) ;
611 if ( ( ( retval / 1000 ) < h - 1 )
612 || ( ( retval / 1000 ) > h + 1 )
613 || ( retval < 0 ) )
614 return -1 ;
617 return retval ;
620 streng *Str_upper( streng *str )
622 int i;
624 for ( i = 0; i < PSTRENGLEN( str ); i++ )
626 if ( islower( str->value[i] ) )
627 str->value[i] = (char)toupper( str->value[i] );
629 return str;
632 /* compares 2 strengs and returns 0 if they are equal, 1 if not.
633 * The second one is converted to uppercase while comparing, the first
634 * one must be uppercase.
636 int Str_ccmp( const streng *first, const streng *second )
638 int tmp ;
640 if ( PSTRENGLEN( first ) != PSTRENGLEN( second ) )
641 return 1 ;
643 for (tmp=0; tmp < PSTRENGLEN( first ); tmp++ )
644 if ( first->value[tmp] != toupper( second->value[tmp] ) )
645 return 1 ;
647 return 0 ;
650 /* Str_cre_or_exit create a streng or exits after a message about
651 * missing memory.
653 streng *Str_cre_or_exit( const char *str, unsigned length )
655 streng *retval ;
657 if ( ( retval = MAKESTRENG( length ) ) == NULL )
659 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
660 exit( ERR_STORAGE_EXHAUSTED );
663 memcpy( PSTRENGVAL( retval ), str, length ) ;
664 retval->len = length ;
665 return retval ;
669 * Str_buf create a streng from a buffer. It may return NULL on error.
671 streng *Str_buf( const char *str, unsigned length )
673 streng *retval ;
675 if ( ( retval = MAKESTRENG( length ) ) == NULL )
677 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
678 return NULL;
681 memcpy( PSTRENGVAL( retval ), str, length );
682 retval->len = length;
683 return retval;
687 * Str_dup duplicates a streng. It may return NULL on error.
689 streng *Str_dup( const streng *str )
691 return Str_buf( PSTRENGVAL( str ), PSTRENGLEN( str ) ) ;
695 * delete_a_queue deletes the queue's content and unlinks it from the list
696 * of existing queues if q isn't SESSION.
698 void delete_a_queue( RxQueue *q )
700 Client *c ;
702 if ( q->name )
704 DEBUGDUMP(printf("Deleting queue <%.*s>\n", PSTRENGLEN( q->name), PSTRENGVAL( q->name)););
706 else
708 DEBUGDUMP(printf("Deleting natal queue\n"););
710 empty_queue( q ) ;
712 if ( q == SESSION )
713 return ;
715 if ( q->name != NULL )
716 DROPSTRENG( q->name );
719 * dequeue from the linked list and free space
721 if (q->prev != NULL)
722 q->prev->next = q->next ;
723 else
724 queues = q->next ;
725 if (q->next != NULL)
726 q->next->prev = q->prev ;
727 free( q ) ;
730 * Let all clients connected to this queue fall back to "SESSION" as
731 * the default queue.
732 * FIXME: It may be better to leave a queue to a non-isReal state.
733 * This kind of work destroys the data integrity on SESSION.
734 * We get many more connections working on SESSION than expected.
736 for( c = clients; c != NULL; c = c->next )
738 if ( c->default_queue == q )
739 c->default_queue = SESSION ;
744 * delete_a_client deletes the clients' content and unlinks it from the list
745 * of existing clients.
747 void delete_a_client( Client *c )
749 close( c->socket ) ;
752 * dequeue from the linked list and free space
754 if (c->prev != NULL)
755 c->prev->next = c->next ;
756 else
757 clients = c->next ;
758 if (c->next != NULL)
759 c->next->prev = c->prev ;
760 free( c ) ;
763 void delete_all_queues( void )
765 RxQueue *q, *h ;
768 * SESSION won't be deleted. Be careful to initiate one delete per
769 * queue.
771 for ( q = queues; q != NULL; )
773 h = q ;
774 q = q->next ;
775 delete_a_queue( h );
/*
 * get_unspecified_queue returns the name of the queue to use if none is
 * given: the content of the environment variable RXQUEUE or "SESSION" if
 * the variable isn't set. A '@' is appended if the name doesn't contain
 * one.
 * The result is either the environment string, a string literal or a
 * freshly allocated buffer, so the caller can't tell whether it may be
 * freed; never modify or free it.
 * If the buffer can't be allocated the name is returned without the '@'.
 */
char *get_unspecified_queue( void )
{
   char *name = getenv( "RXQUEUE" ) ;
   char *with_at ;
   size_t len ;

   if ( name == NULL )
      name = "SESSION" ;

   if ( strchr( name, '@' ) != NULL )
      return name ;

   len = strlen( name ) ;
   with_at = (char *) malloc( len + 2 ) ;
   if ( with_at == NULL )
      return name ;

   memcpy( with_at, name, len ) ;
   with_at[len] = '@' ;
   with_at[len + 1] = '\0' ;
   return with_at ;
}
800 int suicide( void )
802 int sock;
803 streng *queue;
804 char *in_queue=get_unspecified_queue();
805 Queue q;
807 if ( init_external_queue( NULL ) )
808 return 1;
810 queue = Str_cre_or_exit( in_queue, strlen( in_queue ) ) ;
812 if ( parse_queue( NULL, queue, &q ) == 1 )
814 sock = connect_to_rxstack( NULL, &q );
815 if ( sock < 0 )
817 /* error already shown by the function */
818 return(ERR_RXSTACK_CANT_CONNECT);
820 send_command_to_rxstack( NULL, sock, RXSTACK_KILL_STR, NULL, 0 );
821 read_result_from_rxstack( NULL, sock, RXSTACK_HEADER_SIZE );
822 close(sock);
824 term_external_queue( ) ;
825 return 0;
828 int rxstack_cleanup( void )
830 if ( !allclean )
832 DEBUGDUMP(printf("Cleaning up\n"););
834 * Disconnect all clients
835 * Delete all clients
837 delete_all_queues();
838 DEBUGDUMP(printf("Finished Cleaning up\n"););
839 term_external_queue( ) ;
840 allclean = 1;
842 return 0;
845 #ifdef BUILD_NT_SERVICE
846 BOOL report_service_start( void )
849 * report the status to the service control manager.
851 return (ReportStatusToSCMgr(
852 SERVICE_RUNNING, /* service state */
853 NO_ERROR, /* exit code */
854 0)); /* wait hint */
857 BOOL report_service_pending_start( void )
860 * report the status to the service control manager.
862 return (ReportStatusToSCMgr(
863 SERVICE_START_PENDING, /* service state */
864 NO_ERROR, /* exit code */
865 3000)); /* wait hint */
868 int nt_service_start( void )
871 * code copied from sample NT Service code. The goto's are
872 * not mine!!
873 * report the status to the service control manager.
875 if ( !report_service_pending_start() )
876 goto cleanupper;
879 * create the event object. The control handler function signals
880 * this event when it receives the "stop" control code.
882 hServerStopEvent = CreateEvent(
883 NULL, /* no security attributes */
884 TRUE, /* manual reset event */
885 FALSE, /* not-signalled */
886 NULL); /* no name */
888 if ( hServerStopEvent == NULL)
889 goto cleanupper;
892 * report the status to the service control manager.
894 if ( !report_service_pending_start() )
895 goto cleanupper;
897 return 0;
898 cleanupper:
899 return 1;
902 VOID ServiceStop()
904 DEBUGDUMP(printf("In ServiceStop()\n"););
905 suicide();
907 running = 0;
910 #endif
912 void rxstack_signal_handler( int sig )
914 running = 0;
917 /* Creates a new client and appends it in front of the current clients.
918 * Don't forget to set a default_queue and the socket at once.
920 Client *get_new_client( )
922 Client *retval = (Client *)malloc( sizeof( Client ) ) ;
924 if ( retval == NULL )
925 return NULL ;
926 memset( retval, 0, sizeof( Client ) ) ;
927 retval->socket = -1 ;
928 retval->deadline.milli = -1 ; /* deadline not used --> infinite timeout */
930 retval->next = clients ;
931 if ( clients != NULL )
932 clients->prev = retval ;
933 clients = retval ;
934 return retval ;
938 * Find the named queue - case insensitive
939 * returns the queue or NULL if no queue with this name exists.
941 RxQueue *find_queue( const streng *queue_name )
943 RxQueue *q ;
945 for ( q = queues; q != NULL; q = q->next )
947 /* This is inefficient, FIXME: Introduce a hash value */
948 if ( Str_ccmp( q->name, queue_name ) == 0 )
949 return q;
951 return NULL ;
954 /* Creates a new queue and appends it in front of the current queues.
955 * Don't forget to set a name at once.
957 RxQueue *get_new_queue( void )
959 RxQueue *retval = (RxQueue *)malloc( sizeof( RxQueue ) ) ;
961 if ( retval == NULL )
962 return NULL ;
963 memset( retval, 0, sizeof( RxQueue ) ) ;
964 retval->deadline = queue_deadline ;
966 retval->next = queues ;
967 if ( queues != NULL )
968 queues->prev = retval ;
969 queues = retval ;
970 return retval ;
973 int rxstack_delete_queue( Client *client, streng *queue_name )
975 RxQueue *q ;
976 int rc ;
978 if ( ( q = find_queue( queue_name ) ) == NULL )
980 rc = 9;
982 else
984 if ( q == SESSION )
985 rc = 5;
986 else
988 if ( !q->isReal )
991 * If we found a false queue, return 9
992 * but delete it.
994 delete_a_queue( q );
995 rc = 9;
997 else
1000 * Delete the contents of the queue
1001 * and mark it as gone.
1003 delete_a_queue( q );
1004 rc = 0;
1008 return rc ;
1011 int rxstack_create_client( int socket )
1013 Client *c ;
1015 if ( ( c = get_new_client( ) ) == NULL )
1017 close( socket ) ;
1018 /* This may have been the connectioon telling us to go down ;-) */
1019 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
1020 exit( ERR_STORAGE_EXHAUSTED );
1023 c->socket = socket;
1024 c->default_queue = SESSION ;
1025 return 0;
1028 /* rxstack_send_return writes back to the client the action return code
1029 * and optionally a string of length len.
1030 * The functions returns 0 on success, -1 on error
1032 int rxstack_send_return( int sock, char *action, char *str, int len )
1034 streng *qlen, *header;
1035 int rc, retval = 0 ;
1037 DEBUGDUMP(printf("Sending to %d Result: %c <%.*s>\n", sock, *action, len, (str) ? str : ""););
1038 qlen = REXX_D2X( len );
1039 if ( qlen )
1041 header = REXX_RIGHT( qlen, RXSTACK_HEADER_SIZE, '0');
1042 DROPSTRENG( qlen );
1043 if ( header )
1045 header->value[0] = action[0];
1046 rc = send( sock, PSTRENGVAL(header), PSTRENGLEN(header), 0 );
1047 if ( rc != PSTRENGLEN(header) )
1049 DEBUGDUMP(printf("Send failed: rc> %d != PSTRENGLEN(header)> %d errno = %d\n", rc, PSTRENGLEN(header),os_errno ););
1050 retval = -1 ;
1052 else if ( str )
1054 rc = send( sock, str, len, 0 );
1055 if ( rc != len )
1057 DEBUGDUMP(printf("Send failed: errno = %d\n", os_errno ););
1058 retval = -1 ;
1061 DROPSTRENG( header );
1064 return retval ;
1067 int rxstack_delete_client( Client *client )
1069 delete_a_client( client ) ;
1070 return 0;
1073 /* rxstack_set_default_queue sets the client's (new) queue name.
1074 * A false queue is created if the queue isn't found.
1075 * The new queue is returned or NULL if we are out of memory.
1077 RxQueue *rxstack_set_default_queue( Client *client, streng *data )
1079 RxQueue *q, *prev;
1080 streng *newq;
1082 prev = client->default_queue;
1083 if ( ( q = find_queue( data ) ) == NULL )
1086 * We didn't find a real or a false queue, so create
1087 * a false queue
1089 q = get_new_queue( );
1090 if ( q != NULL )
1092 newq = Str_dup( data );
1093 if ( newq == NULL )
1095 delete_a_queue( q );
1096 return NULL;
1098 q->name = Str_upper( newq ) ;
1099 DEBUGDUMP(printf("Creating the false queue <%.*s>", PSTRENGLEN( q->name ), PSTRENGVAL( q->name ) ););
1100 /* q->isReal set to 0 by get_new_queue --> false queue */
1101 client->default_queue = q;
1104 else
1106 client->default_queue = q;
1109 if ( q == NULL )
1111 DEBUGDUMP(printf("No FREE MEMORY when setting default queue for client: <%.*s>\n", PSTRENGLEN(data), PSTRENGVAL(data) ););
1113 else
1115 DEBUGDUMP(printf("Setting default queue for client: <%.*s> Prev: %p <%.*s>\n", PSTRENGLEN(q->name), PSTRENGVAL(q->name), prev, PSTRENGLEN(prev->name), PSTRENGVAL(prev->name) ););
1116 /* SET or CREATE resets a timeout to 0; effectively turns off any timeout */
1117 client->queue_timeout = 0 ;
1119 return q;
1122 int rxstack_timeout_queue( Client *client, const streng *data )
1124 int val,error;
1127 * Convert the timeout
1128 * If the supplied timeout is 0 (infinite wait), set the client->queue_timeout
1129 * to -1.
1131 val = REXX_X2D( data, &error );
1132 if ( error )
1133 return 2;
1134 if ( val == 0 )
1135 val = -1;
1136 client->queue_timeout = val;
1137 DEBUGDUMP(printf("Timeout on queue: %ld\n", client->queue_timeout ););
1139 return 0;
1142 /* unique_name creates a unique name for a queue.
1143 * The function may exit after a message about missing memory.
1145 static streng *unique_name( void )
1147 static int first = 1 ;
1148 static char buf[ 80 ] ;
1149 static char *ptr ;
1150 static unsigned runner = 0;
1152 if ( first )
1154 first = 0 ;
1155 sprintf( buf, "S%d%ld", (int) getpid(), (long) time( NULL ) ) ;
1156 ptr = buf + strlen( buf ) ;
1158 sprintf( ptr, "%u", runner++ ) ;
1159 return Str_buf( buf, strlen( buf ) ) ;
1162 int rxstack_create_queue( Client *client, streng *data, streng **result )
1164 RxQueue *q ;
1165 streng *new_queue = NULL;
1166 int rc = 0;
1168 if ( data )
1170 DEBUGDUMP(printf("Creating new user-specified queue: <%.*s>\n", PSTRENGLEN(data), PSTRENGVAL(data) ););
1171 if ( ( q = find_queue( data ) ) == NULL )
1174 * No queue of that name, so use a duplicate of it.
1176 DEBUGDUMP(printf("Couldn't find <%.*s>; so creating it\n", PSTRENGLEN(data), PSTRENGVAL(data) ););
1177 new_queue = data;
1179 else
1182 * If the queue we found is a false queue, we can still
1183 * use the supplied name and the slot
1185 DROPSTRENG( data );
1186 if ( !q->isReal )
1188 DEBUGDUMP(printf("Found false queue\n" ););
1189 q->isReal = 1;
1190 /* SET or CREATE resets a timeout to 0 */
1191 client->queue_timeout = 0;
1192 *result = q->name;
1193 return 0; /* Pass back the name. May be different due to
1194 * different locales or codepages, but it IS the selected
1195 * name.
1198 new_queue = unique_name( );
1199 if ( new_queue == NULL )
1200 return 3;
1201 DEBUGDUMP(printf("Having to create unique queue <%.*s>\n", PSTRENGLEN( new_queue ), PSTRENGVAL( new_queue ) ););
1202 rc = 1;
1205 else
1207 DEBUGDUMP(printf("Creating system generated queue.\n"););
1208 new_queue = unique_name( );
1209 if ( new_queue == NULL )
1210 return 3;
1213 if ( ( q = get_new_queue( ) ) == NULL )
1215 DROPSTRENG( new_queue );
1216 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
1217 return 3;
1221 * Uppercase the queue name
1223 q->name = Str_upper( new_queue );
1224 q->isReal = 1;
1225 /* SET or CREATE resets a timeout to 0 */
1226 client->queue_timeout = 0 ;
1227 *result = q->name;
1228 return rc; /* Both code 0 and code 1 return the name to the caller.
1229 * May be different due to codepages, etc.
1234 * Pushes 'line' onto the REXX stack in LIFO manner.
1235 * point to the new line. The line is put on top of the current
1236 * buffer.
1238 StackLine *rxstack_stack_lifo( RxQueue *current_queue, streng *line )
1240 StackLine *newbox ;
1242 if ( ( newbox = (StackLine *) malloc( sizeof(StackLine) ) ) != NULL )
1244 newbox->contents = line ;
1245 LIFO_LINE( &current_queue->buf, newbox ) ;
1248 return newbox;
1253 * Pushes 'line' onto the REXX stack in FIFO manner.
1254 * point to the new line. The line is put on top of the current
1255 * buffer.
1257 StackLine *rxstack_stack_fifo( RxQueue *current_queue, streng *line )
1259 StackLine *newbox ;
1261 if ( ( newbox = (StackLine *) malloc( sizeof(StackLine) ) ) != NULL )
1263 newbox->contents = line ;
1264 FIFO_LINE( &current_queue->buf, newbox ) ;
1267 return newbox;
1271 * dequeue_waiter dequeues the Client c from the waiter's list of the queue q.
1272 * The client must be linked in the waiter's list of the queue.
1273 * The client's variables for this purpose are cleaned, too.
1275 static void dequeue_waiter( RxQueue *q, Client *c )
1277 #if defined(DEBUG)
1278 Client *run ;
1279 for ( run = q->oldest; run != NULL; run = run->newer )
1280 if ( run == c )
1281 break ;
1282 assert( run != NULL ) ; /* This is the test if c is a waiter of q */
1283 #endif
1285 if ( c->older != NULL )
1287 c->older->newer = c->newer ;
1289 else
1291 q->oldest = c->newer ;
1292 if ( q->oldest != NULL )
1293 q->oldest->older = NULL ;
1296 if ( c->newer != NULL )
1298 c->newer->older = c->older ;
1300 else
1302 q->newest = c->older ;
1303 if ( q->newest != NULL )
1304 q->newest->newer = NULL ;
1306 assert( ( ( q->newest != NULL ) && ( q->oldest != NULL ) ) ||
1307 ( ( q->newest == NULL ) && ( q->oldest == NULL ) ) ) ;
1308 c->older = c->newer = NULL ;
1309 c->deadline.milli = -1 ;
1312 /* redir sends data as the answer of a pull operation back to
1313 * the oldest waiting client and dequeues this client.
1314 * data is dropped after the operation.
1316 void redir( RxQueue *q, streng *data )
1318 Client *c ;
1320 c = q->oldest ;
1321 DEBUGDUMP(printf("Redirecting <%.*s> to waiting client %d\n", PSTRENGLEN(data), (PSTRENGVAL(data)) ? PSTRENGVAL(data) : "", c->socket ););
1323 dequeue_waiter( q, c ) ;
1325 rxstack_send_return( c->socket, "0", PSTRENGVAL( data ), PSTRENGLEN( data ) ) ;
1326 DROPSTRENG( data ) ;
1329 /* bad_news_for_waiter informs a waiter about an error while waiting for
1330 * data for a pull request. The client is dequeued from its queue.
1332 void bad_news_for_waiter( RxQueue *q, Client *c )
1334 dequeue_waiter( q, c ) ;
1335 DEBUGDUMP(printf("Sending negative response to waiting client %d\n", c->socket ););
1337 rxstack_send_return( c->socket, "4", NULL, 0 ) ;
1340 int rxstack_queue_data( Client *client, streng *data, char order )
1342 int rc = 0;
1344 if ( client->default_queue->oldest != NULL )
1346 redir( client->default_queue, data ) ;
1347 return 0 ;
1349 DEBUGDUMP(printf("Queueing: <%.*s> Order: %c\n", PSTRENGLEN(data), (PSTRENGVAL(data)) ? PSTRENGVAL(data) : "", order ););
1350 if ( order == RXSTACK_QUEUE_FIFO )
1352 if ( rxstack_stack_fifo( client->default_queue, data ) == NULL )
1354 DROPSTRENG( data );
1355 rc = 3;
1358 else
1360 if ( rxstack_stack_lifo( client->default_queue, data ) == NULL )
1362 DROPSTRENG( data );
1363 rc = 3;
1366 return rc;
1369 /* Clears the content of the queue. All waiters are informed by code
1370 * 2 of a cleaned queue and removed the the waiter's list.
1372 void empty_queue( RxQueue *q )
1374 StackLine *tmp, *line;
1375 streng *contents;
1376 Buffer *b ;
1378 b = &q->buf ;
1379 for ( line = b->top; line != NULL; )
1381 contents = line->contents;
1382 DROPSTRENG( contents );
1383 tmp = line;
1384 line = line->lower;
1385 free( tmp );
1387 memset( &q->buf, 0, sizeof( Buffer ) ) ;
1389 /* acknowledge waiters for data not ready and dequeue them */
1391 while ( q->oldest != NULL ) {
1392 bad_news_for_waiter( q, q->oldest ) ;
1396 /* Clears the content of the queue named data. In opposite to the previous
1397 * version, the client's current queue isn't set to the named queue any
1398 * longer. The is the default behaviour in Regina.
1399 * returns 0 on success, 2 if the queue doesn't exist.
1401 int rxstack_empty_queue( Client *client, streng *data )
1403 RxQueue *q ;
1405 DEBUGDUMP(printf("Emptying queue: <%.*s>\n", PSTRENGLEN(data), (PSTRENGVAL(data)) ? PSTRENGVAL(data) : "" ););
1406 if ( ( q = find_queue( data ) ) == NULL )
1407 return 2;
1409 empty_queue( q ) ;
1411 return 0;
1414 int rxstack_number_in_queue( Client *client )
1416 int lines = (int) client->default_queue->buf.elements;
1418 DEBUGDUMP(printf("Querying number in queue: %d\n", lines ););
1419 return lines ;
1423 * Pulls a line off the queue and dequeues it.
1425 * If nowait isn't set and no data is available and the client's queue_timeout
1426 * is set, the client is set to the newest end of the client's default_queue.
1428 * It will be awaken by either ariving data on this pipe, or deleting/emptying
1429 * the pipe, or by a timeout.
1430 * Returns:
1431 * 0 if line OK
1432 * 1 if queue empty
1433 * 3 if waiting
1435 int rxstack_pull_line_off_queue( Client *client, streng **result, int nowait )
1437 int rc;
1438 Buffer *b;
1439 StackLine *line;
1440 RxQueue *q;
1442 b = &client->default_queue->buf;
1443 POP_LINE( b, line );
1444 if ( line != NULL )
1446 *result = line->contents;
1447 free( line );
1448 rc = 0;
1450 else
1452 *result = NULL;
1453 if ( nowait )
1455 rc = RXSTACK_EMPTY; /* queue empty */
1456 DEBUGDUMP(printf("nowait set to 1\n" ););
1458 else
1460 if ( client->queue_timeout == 0 )
1462 rc = RXSTACK_EMPTY; /* queue empty */
1463 DEBUGDUMP(printf("client timeout = 0\n" ););
1465 else
1467 assert( client->deadline.milli == -1 );
1468 assert ( client->newer == NULL );
1469 if ( client->queue_timeout != -1 )
1471 now = get_now( );
1472 client->deadline = now;
1473 time_add( &client->deadline, client->queue_timeout );
1475 q = client->default_queue;
1476 client->newer = NULL;
1477 client->older = q->newest;
1478 if ( client->older != NULL )
1479 client->older->newer = client;
1480 q->newest = client;
1481 if ( q->oldest == NULL )
1482 q->oldest = client;
1483 rc = RXSTACK_WAITING; /* waiting */
1484 DEBUGDUMP(printf("waiting until %ld.%d\n", client->deadline.seconds,client->deadline.milli ););
1488 DEBUGDUMP(printf("Pulling line off queue; rc %d\n", rc ););
1489 return rc;
1492 /* rxstack_process_command reads a new command from the client and processes
1493 * it.
1494 * returns 0 if the client has been terminated, 1 if the client persists.
1496 int rxstack_process_command( Client *client )
1498 RxQueue *q ;
1499 char cheader[RXSTACK_HEADER_SIZE];
1500 streng *header;
1501 streng *buffer = NULL ;
1502 streng *result=NULL;
1503 int rc,length;
1504 char rcode[2];
1506 if ( client->deadline.milli != -1 )
1509 * interrupted wait, assume the client don't want to wait for data any
1510 * longer
1512 bad_news_for_waiter( client->default_queue, client ) ;
1514 rcode[1] = '\0';
1515 memset( cheader, 0, sizeof(cheader) );
1516 DEBUGDUMP(printf("\nreading from socket %d\n", client->socket););
1517 rc = recv( client->socket, cheader, RXSTACK_HEADER_SIZE, 0 );
1518 if ( rc < 0 )
1520 if ( os_errno != ECONNRESET )
1522 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_READING_SOCKET, ERR_RXSTACK_READING_SOCKET_TMPL, errno_str( os_errno ) );
1525 * Assume client has been lost
1527 rxstack_delete_client( client );
1528 return 0 ;
1530 if ( rc == 0 )
1532 DEBUGDUMP(printf("read empty header\n"););
1534 * Assume client has been lost
1536 rxstack_delete_client( client );
1537 return 0 ;
1539 else if ( rc != RXSTACK_HEADER_SIZE )
1541 DEBUGDUMP(printf("read corrupted header\n"););
1543 * Assume client has been lost
1545 rxstack_delete_client( client );
1546 return 0 ;
1549 header = MakeStreng( RXSTACK_HEADER_SIZE - 1 );
1550 if ( header == NULL )
1552 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
1553 exit( ERR_STORAGE_EXHAUSTED );
1555 memcpy( PSTRENGVAL(header), cheader+1, RXSTACK_HEADER_SIZE-1 );
1556 header->len = RXSTACK_HEADER_SIZE-1 ;
1557 buffer = NULL;
1559 * Convert the data length
1561 length = REXX_X2D( header, &rc );
1562 if ( rc )
1565 * Errorneous number. Kill the client.
1567 DEBUGDUMP(printf("Invalid header: <%.*s>, client killed\n", header->len, header->value););
1568 rxstack_send_return( client->socket, "9", NULL, 0 );
1569 rxstack_delete_client( client );
1570 return 1;
1573 DEBUGDUMP(printf("Header: <%.*s> length: %d\n", header->len, header->value, length););
1574 DROPSTRENG( header );
1575 if ( length > 0 )
1578 * Allocate a streng big enough for the expected data
1579 * string, based on the length just read; even if the length
1580 * is zero
1582 buffer = MAKESTRENG ( length );
1583 if ( buffer == NULL )
1585 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
1586 DEBUGDUMP(printf("can't buffer input of client\n"););
1587 rxstack_delete_client( client );
1588 return 0;
1590 rc = recv( client->socket, PSTRENGVAL(buffer), length, 0 );
1591 if ( rc < 0 )
1593 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_READING_SOCKET, ERR_RXSTACK_READING_SOCKET_TMPL, errno_str( os_errno ) );
1595 else if ( rc == 0 )
1598 * All we can assume here is that the client has been lost
1600 DEBUGDUMP(printf("read empty header\n"););
1601 rxstack_delete_client( client );
1602 DROPSTRENG( buffer ) ;
1603 return 0 ;
1605 else
1606 buffer->len = length ;
1609 switch( cheader[0] )
1611 case RXSTACK_QUEUE_FIFO:
1612 case RXSTACK_QUEUE_LIFO:
1613 DEBUGDUMP(printf("--- Queue %s ---\n", cheader[0] == RXSTACK_QUEUE_FIFO ? "FIFO" : "LIFO"););
1615 * fixes bug 700539
1617 if ( buffer == NULL )
1618 buffer = Str_buf( "", 0 );
1619 if ( buffer == NULL )
1620 rc = 3;
1621 else
1622 rc = rxstack_queue_data( client, buffer, cheader[0] );
1623 rcode[0] = (char)(rc+'0');
1624 rxstack_send_return( client->socket, rcode, NULL, 0 );
1625 buffer = NULL ; /* consumed by rxstack_queue_data */
1626 break;
1627 case RXSTACK_EXIT:
1628 DEBUGDUMP(printf("--- Exit ---\n"););
1630 * Client has requested disconnect, so remove all
1631 * references to the client
1633 rxstack_send_return( client->socket, "0", NULL, 0 );
1634 rxstack_delete_client( client );
1635 if ( buffer != NULL )
1637 DROPSTRENG( buffer );
1638 buffer = NULL;
1640 break;
1641 case RXSTACK_KILL:
1642 DEBUGDUMP(printf("--- Kill ---\n"););
1644 * Client has requested server to stop
1646 rxstack_send_return( client->socket, "0", NULL, 0 );
1647 rxstack_delete_client( client );
1648 running = 0;
1649 return 0;
1650 case RXSTACK_SET_QUEUE:
1651 DEBUGDUMP(printf("--- Set Queue ---\n"););
1653 * Set the default queue for the client
1655 if ( buffer == NULL )
1656 rxstack_send_return( client->socket, "6", NULL, 0 );
1657 else
1659 q = rxstack_set_default_queue( client, buffer );
1660 if ( q == NULL )
1661 rxstack_send_return( client->socket, "3", NULL, 0 );
1662 else
1663 rxstack_send_return( client->socket, "0", q->name->value, q->name->len );
1664 DROPSTRENG( buffer );
1665 buffer = NULL;
1667 break;
1668 case RXSTACK_EMPTY_QUEUE:
1669 DEBUGDUMP(printf("--- Empty Queue ---\n"););
1671 * Use the current queue as the default queue.
1673 if ( buffer == NULL )
1674 buffer = client->default_queue->name;
1675 rc = rxstack_empty_queue( client, buffer );
1676 rcode[0] = (char)(rc+'0');
1677 rxstack_send_return( client->socket, rcode, NULL, 0 );
1678 if ( buffer != client->default_queue->name )
1679 DROPSTRENG( buffer );
1680 buffer = NULL ;
1681 break;
1682 case RXSTACK_NUMBER_IN_QUEUE:
1683 DEBUGDUMP(printf("--- Number in Queue ---\n"););
1684 length = rxstack_number_in_queue( client );
1685 rxstack_send_return( client->socket, "0", NULL, length );
1686 if ( buffer != NULL )
1688 DROPSTRENG( buffer );
1689 buffer = NULL;
1691 break;
1692 case RXSTACK_PULL:
1693 case RXSTACK_FETCH:
1694 DEBUGDUMP(printf("--- Pull ---\n"););
1695 rc = rxstack_pull_line_off_queue( client, &result, cheader[0] == RXSTACK_FETCH );
1696 switch( rc )
1698 case 0: /* all OK */
1699 rxstack_send_return( client->socket, "0", PSTRENGVAL( result ), PSTRENGLEN( result ) );
1700 DROPSTRENG( result );
1701 break;
1702 case RXSTACK_WAITING: /* still waiting; don't return */
1703 break;
1704 default: /* empty/error */
1705 rcode[0] = (char)(rc+'0');
1706 rxstack_send_return( client->socket, rcode, NULL, 0 );
1707 break;
1709 if ( buffer != NULL )
1711 DROPSTRENG( buffer );
1712 buffer = NULL;
1714 break;
1715 case RXSTACK_GET_QUEUE:
1716 DEBUGDUMP(printf("--- Get Queue ---\n"););
1717 rxstack_send_return( client->socket, "0", PSTRENGVAL(client->default_queue->name), PSTRENGLEN(client->default_queue->name) ) ;
1718 if ( buffer != NULL )
1720 DROPSTRENG( buffer );
1721 buffer = NULL;
1723 break;
1724 case RXSTACK_CREATE_QUEUE:
1725 DEBUGDUMP(printf("--- Create Queue ---\n"););
1727 * Create a new queue
1729 rc = rxstack_create_queue( client, buffer, &result );
1730 rcode[0] = (char)(rc+'0');
1731 if ( ( rc != 1 ) && ( rc != 0 ) )
1732 rxstack_send_return( client->socket, rcode, NULL, 0 );
1733 else
1734 rxstack_send_return( client->socket, rcode, PSTRENGVAL(result), PSTRENGLEN(result) );
1735 buffer = NULL; /* consumed by rxstack_create_queue */
1736 break;
1737 case RXSTACK_DELETE_QUEUE:
1738 DEBUGDUMP(printf("--- Delete Queue ---\n"););
1740 * Delete the queue
1742 if ( buffer == NULL )
1743 rc = 6;
1744 else
1745 rc = rxstack_delete_queue( client, buffer );
1746 rcode[0] = (char)(rc+'0');
1747 rxstack_send_return( client->socket, rcode, NULL, 0 );
1748 if ( buffer != NULL )
1750 DROPSTRENG( buffer );
1751 buffer = NULL;
1753 break;
1754 case RXSTACK_TIMEOUT_QUEUE:
1755 DEBUGDUMP(printf("--- Timeout Queue ---\n"););
1757 * Set timeout for pull from queue
1759 if ( buffer == NULL )
1760 rc = 6;
1761 else
1762 rc = rxstack_timeout_queue( client, buffer );
1763 rcode[0] = (char)(rc+'0');
1764 rxstack_send_return( client->socket, rcode, NULL, 0 );
1765 if ( buffer != NULL )
1766 DROPSTRENG( buffer );
1767 buffer = NULL;
1768 break;
1769 case RXSTACK_UNKNOWN:
1770 /* do nothing */
1771 break;
1772 default:
1773 rxstack_send_return( client->socket, "9", NULL, 0 );
1774 break;
1776 assert( buffer == NULL ) ;
1777 if ( buffer != NULL )
1778 DROPSTRENG( buffer ) ;
1779 return 1;
1783 * earlier returns the time stamp which is more early.
1785 static RxTime earlier( RxTime t1, RxTime t2 )
1787 if ( time_diff( t1, t2 ) == -2 )
1788 return t1 ;
1789 return t2 ;
1792 /* check_for_waiting checks a client for a pending IO request.
1793 * We currently only support PULL requests.
1794 * The function returns immediately if the client doesn't wait.
1795 * In the other case it checks whether the maximum wait time has been
1796 * expired.
1797 * If the client is still waiting and the deadline isn't reached, it
1798 * checks if the deadline is more early then next_timeout and sets this
1799 * value is necessary.
1801 * The client's queue's deadline is set to the default deadline of queues in
1802 * all cases.
1804 void check_for_waiting( Client *client, RxTime *next_timeout )
1806 int diff ;
1808 client->default_queue->deadline = queue_deadline ;
1809 /* Do we are a waiter? */
1810 if ( client->deadline.milli == -1 )
1811 return ;
1814 * Check if there is anything in the queue...
1816 diff = time_diff( client->deadline, now ) ;
1817 if ( ( diff != -2 ) && ( diff != 0 ) )
1819 DEBUGDUMP(
1820 if ( diff == -1 )
1821 printf("Still waiting infinitely for %d\n", client->socket );
1822 else
1823 printf("Still waiting %d ms at max for %d\n", diff, client->socket );
1825 *next_timeout = earlier( *next_timeout, client->deadline ) ;
1827 else
1829 bad_news_for_waiter( client->default_queue, client ) ;
1833 /* check_queue checks a queue has reached its deadline and if no clients
1834 * have this queue as the default queue. The queue is deleted in this
1835 * case.
1836 * If the queuet is still valid and the deadline isn't reached, it
1837 * checks if the deadline is more early then next_timeout and sets this
1838 * value is necessary.
1840 void check_queue( RxQueue *q, RxTime *next_timeout )
1842 int diff ;
1843 Client *c ;
1845 diff = time_diff( q->deadline, now ) ;
1846 if ( ( diff != -2 ) && ( diff != 0 ) )
1848 *next_timeout = earlier( *next_timeout, q->deadline ) ;
1849 return ;
1852 for ( c = clients; c != NULL; c = c->next )
1854 if ( c->default_queue == q )
1856 q->deadline = queue_deadline ;
1857 *next_timeout = earlier( *next_timeout, q->deadline ) ;
1858 return ;
1862 DEBUGDUMP( printf( "Purging unused queue <%.*s>\n", PSTRENGLEN( q->name ), PSTRENGVAL( q->name ) ) );
1863 q->deadline = queue_deadline ; /* needed at least for SESSION */
1864 *next_timeout = earlier( *next_timeout, q->deadline ) ;
1865 delete_a_queue( q ) ;
1868 int rxstack_doit( )
1870 RxTime timeout ;
1871 int listen_sock,msgsock;
1872 struct sockaddr_in server,client;
1873 #ifdef HAVE_SOCKLEN_T
1874 socklen_t client_size ;
1875 #else
1876 int client_size ;
1877 #endif
1878 int portno,rc;
1879 Client *c, *ch ;
1880 RxQueue *q, *qh ;
1881 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
1882 # define POLL_INCR 16
1883 struct pollfd *pd = NULL ;
1884 unsigned poll_max = 0 ;
1885 unsigned poll_cnt = 0 ;
1886 #else
1887 int max_sock ;
1888 fd_set ready ;
1889 struct timeval to ;
1890 #endif
1891 #ifdef BUILD_NT_SERVICE
1892 char buf[30];
1893 #endif
1894 #if defined(SO_REUSEADDR) && defined(SOL_SOCKET)
1895 int on = 1;
1896 #endif
1898 client_size = sizeof( struct sockaddr );
1899 #ifdef WIN32
1900 if ( init_external_queue( NULL ) )
1901 return 1;
1902 #endif
1904 #ifdef BUILD_NT_SERVICE
1905 if ( IsItNT()
1906 && !report_service_pending_start() )
1907 goto notrunning;
1908 #endif
1910 * Set up signal handler
1912 #ifdef SIGTERM
1913 signal( SIGTERM, rxstack_signal_handler );
1914 #endif
1915 #ifdef SIGINT
1916 signal( SIGINT, rxstack_signal_handler );
1917 #endif
1918 #ifdef SIGBREAK
1919 signal( SIGBREAK, rxstack_signal_handler );
1920 #endif
1921 #ifdef SIGPIPE
1922 signal( SIGPIPE, SIG_IGN );
1923 #endif
1924 clients = NULL ;
1925 queues = NULL ;
1928 * Initialise default "SESSION" queue
1930 if ( ( SESSION = get_new_queue( ) ) == NULL )
1932 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL ) ;
1933 return ERR_STORAGE_EXHAUSTED ;
1935 SESSION->name = Str_cre_or_exit( "SESSION", 7 ) ;
1936 SESSION->isReal = 1;
1938 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
1939 pd = (struct pollfd *)malloc( ( poll_max = POLL_INCR ) * sizeof( struct pollfd ) );
1940 if ( pd == NULL )
1942 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
1943 exit( ERR_STORAGE_EXHAUSTED );
1945 #endif
1947 #ifdef BUILD_NT_SERVICE
1948 if ( IsItNT()
1949 && !report_service_pending_start() )
1950 goto notrunning;
1951 #endif
1953 * Create listener socket
1955 listen_sock = socket(AF_INET, SOCK_STREAM, 0);
1956 if (listen_sock < 0)
1958 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_GENERAL, ERR_RXSTACK_GENERAL_TMPL, "Listening on socket", errno_str( os_errno ) );
1959 rxstack_cleanup();
1960 exit(ERR_RXSTACK_GENERAL);
1962 memset( &server, 0, sizeof(server) );
1963 server.sin_family = AF_INET;
1964 server.sin_addr.s_addr = htonl(INADDR_ANY);
1965 portno = default_port_number();
1966 server.sin_port = htons((unsigned short) portno);
1968 #ifdef BUILD_NT_SERVICE
1969 if ( IsItNT()
1970 && !report_service_pending_start() )
1971 goto notrunning;
1972 #endif
1974 #if defined(SO_REUSEADDR) && defined(SOL_SOCKET)
1975 setsockopt( listen_sock, SOL_SOCKET, SO_REUSEADDR, (void *) &on, sizeof( on ) );
1976 #endif
1977 if ( bind(listen_sock, (struct sockaddr *)&server, sizeof(server)) < 0)
1979 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_GENERAL, ERR_RXSTACK_GENERAL_TMPL, "Error binding socket", errno_str( os_errno ) );
1980 rxstack_cleanup();
1981 exit( ERR_RXSTACK_GENERAL );
1983 #ifdef BUILD_NT_SERVICE
1984 sprintf(buf, "Listening on port: %d", portno );
1985 if ( IsItNT() )
1987 if ( !report_service_start() )
1988 goto notrunning;
1989 AddToMessageLog(TEXT(buf));
1991 else
1993 printf( "%s\n", buf );
1994 fflush(stdout);
1996 #else
1997 printf( "rxstack listening on port: %d\n", portno );
1998 fflush(stdout);
1999 #endif
2001 * Start accepting connections
2003 listen(listen_sock, 5);
2004 timeout = get_now( ) ;
2005 time_add( &timeout, DEFAULT_WAKEUP ) ;
2006 queue_deadline = now ;
2007 time_add( &queue_deadline, QUEUE_TIMEOUT ) ;
2008 while ( running )
2010 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
2011 poll_cnt = 0 ;
2012 pd[ poll_cnt ].events = POLLIN ;
2013 pd[ poll_cnt++ ].fd = listen_sock ;
2014 DEBUGDUMP(printf("****** poll((%d", listen_sock););
2015 for ( c = clients; c != NULL; c = c->next )
2017 if ( c->socket == -1 )
2019 continue ;
2022 if ( poll_cnt == poll_max )
2024 pd = (struct pollfd *)realloc( pd, ( poll_max += POLL_INCR ) * sizeof( struct pollfd ) );
2025 if ( pd == NULL )
2027 showerror( ERR_STORAGE_EXHAUSTED, 0, ERR_STORAGE_EXHAUSTED_TMPL );
2028 exit( ERR_STORAGE_EXHAUSTED );
2032 pd[ poll_cnt ].events = POLLIN ;
2033 pd[ poll_cnt++ ].fd = c->socket ;
2034 DEBUGDUMP(printf(", %d", c->socket););
2036 #else
2037 FD_ZERO(&ready);
2038 FD_SET(listen_sock, &ready);
2039 DEBUGDUMP(printf("****** select((%d", listen_sock););
2040 max_sock = listen_sock;
2042 * For each connected client, allow its socket
2043 * to be triggered
2045 for ( c = clients; c != NULL; c = c->next )
2047 if ( c->socket != -1 )
2049 DEBUGDUMP(printf(", %d", c->socket););
2050 FD_SET( c->socket, &ready );
2051 if ( c->socket > max_sock )
2052 max_sock = c->socket;
2055 #endif
2056 now = get_now( ) ;
2057 rc = time_diff( timeout, now ) ;
2058 if ( rc == -2 )
2059 rc = 0 ; /* already timed out */
2060 if ( ( rc == -1 ) || ( rc > DEFAULT_WAKEUP ) )
2061 rc = DEFAULT_WAKEUP ;
2062 DEBUGDUMP(printf("), to=%d) ms at %ld,%03d\n", rc, now.seconds, now.milli ););
2063 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
2064 rc = poll( pd, poll_cnt, rc ) ;
2065 #else
2066 to.tv_usec = ( rc % 1000 ) * 1000 ; /* microseconds fraction */
2067 to.tv_sec = rc / 1000;
2068 rc = select( max_sock + 1, &ready, (fd_set *)0, (fd_set *)0, &to ) ;
2069 #endif
2070 now = get_now( ) ;
2071 DEBUGDUMP(printf("****** after waiting(), rc=%d at %ld,%03d\n", rc, now.seconds, now.milli ););
2072 if ( rc < 0 )
2074 if ( os_errno != EINTR ) /* Win32 doesn't know about it ? */
2076 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_GENERAL, ERR_RXSTACK_GENERAL_TMPL, "Calling select", errno_str( os_errno ) );
2077 exit( ERR_RXSTACK_GENERAL );
2079 continue ;
2081 if ( rc )
2083 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
2084 if ( pd[ 0 ].revents )
2085 #else
2086 if ( FD_ISSET(listen_sock, &ready ) )
2087 #endif
2089 msgsock = accept(listen_sock, (struct sockaddr *)&client, &client_size);
2090 if (msgsock == -1)
2092 showerror( ERR_EXTERNAL_QUEUE, ERR_RXSTACK_GENERAL, ERR_RXSTACK_GENERAL_TMPL, "Calling listen", errno_str( os_errno ) );
2093 rxstack_cleanup();
2094 exit( ERR_RXSTACK_GENERAL );
2096 else
2099 * A client has connected, create a client entry
2100 * and set their default queue to SESSION
2103 * Validate the client here...TBD
2104 * use details in client sockaddr struct
2106 DEBUGDUMP(printf("Client connecting from %s has socket %d\n", inet_ntoa( client.sin_addr ), msgsock ););
2107 rxstack_create_client( msgsock );
2110 else
2112 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
2113 for ( c = clients, poll_cnt = 1; c != NULL; poll_cnt++ )
2114 #else
2115 for ( c = clients; c != NULL; )
2116 #endif
2118 /* c might be deleted by the following calls.
2119 * Assure we have everything perfect to use the
2120 * next element. An access to c after rxstack_process_command
2121 * is forbidden.
2123 ch = c ;
2124 c = c->next ;
2125 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
2126 if ( pd[ poll_cnt ].revents )
2127 #else
2128 if ( FD_ISSET( ch->socket, &ready ) )
2129 #endif
2132 * Process the client's command...
2134 rxstack_process_command( ch ) ;
2140 * If select() timed out or received input, check all connected clients who
2141 * may be waiting for input on one of the queues.
2143 * now contains the time between the start of select() call and now
2144 * in milliseconds
2146 now = get_now();
2147 timeout = get_now( ) ;
2148 time_add( &timeout, DEFAULT_WAKEUP ) ;
2149 queue_deadline = now ;
2150 time_add( &queue_deadline, QUEUE_TIMEOUT ) ;
2151 for ( c = clients; c != NULL; c = c->next )
2153 check_for_waiting( c, &timeout );
2155 for ( q = queues; q != NULL; )
2157 qh = q ;
2158 q = q->next ;
2159 check_queue( qh, &timeout );
2162 #ifdef BUILD_NT_SERVICE
2163 notrunning:
2164 #endif
2165 return 0;
/*
 * Prints a one-line usage description to stderr and returns 1.
 * The "-d" option is only advertised when HAVE_FORK is defined.
 */
static int usage( const char *argv0 )
{
#if defined(HAVE_FORK)
   static const char options[] = "[-d|-k]" ;
#else
   static const char options[] = "[-k]" ;
#endif

   fprintf( stderr, "Usage: %s [-D] %s\n", argv0, options ) ;
   return 1 ;
}
2183 static void checkDebug(void)
2185 if ( getenv( "RXDEBUG" ) != NULL )
2186 debug = 1 ;
2189 int runNormal( int argc, char **argv )
2191 int rc = 0 ;
2192 const char *argv0 = argv[ 0 ] ;
2194 argv++ ;
2195 argc-- ;
2197 checkDebug();
2199 if ( ( argc >= 1 ) && ( strcmp( *argv, "-D" ) == 0 ) )
2201 debug = 1 ;
2202 putenv( "RXDEBUG=1" ) ;
2203 argc-- ;
2204 argv++ ;
2206 if ( argc > 1 )
2208 return usage( argv0 );
2210 if ( argc == 1 )
2212 if ( strcmp(*argv, "-k") == 0 )
2214 return suicide();
2216 if ( strcmp(*argv, "-d") == 0 )
2218 #if defined(HAVE_FORK)
2219 if ( ( rc = fork() ) != 0 )
2220 exit(rc < 0);
2221 rc = rxstack_doit();
2222 #else
2223 fprintf( stderr, "Option \"-d\" option is invalid on this platform.\n" ) ;
2224 return usage( argv0 );
2225 #endif
2227 else
2229 return usage( argv0 );
2232 else
2234 rc = rxstack_doit();
2236 rxstack_cleanup();
2237 printf( "%s terminated.\n", argv0 );
2238 fflush(stdout);
2239 return rc;
/*
 * Program entry point.
 *
 * In a BUILD_NT_SERVICE build the entry is ServiceStart: when IsItNT()
 * is false the program behaves like the command line version; otherwise
 * the server loop is run if nt_service_start() returns 0, and
 * rxstack_cleanup() is called in either case.
 * In all other builds main simply returns the result of runNormal().
 */
#ifdef BUILD_NT_SERVICE
VOID ServiceStart(DWORD argc, LPTSTR *argv)
#else
int main(int argc, char *argv[])
#endif
{
#ifdef BUILD_NT_SERVICE
   if ( !IsItNT() )
   {
      runNormal(argc, argv);
      return;
   }

   if ( !nt_service_start() )
   {
      checkDebug();
      rxstack_doit();
   }
   rxstack_cleanup();
   return;
#else
   return runNormal( argc, argv );
#endif
}