2 * The Regina Rexx Interpreter
3 * Copyright (C) 1992-1994 Anders Christensen <anders@pvv.unit.no>
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the Free
17 * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 * This process runs as a daemon (or NT service). It maintains multiple,
22 * All communication is done via TCP/IP sockets.
23 * This process waits on a known port (5656 by default) for connections
24 * from clients. A client is any process that respects the interface
25 * defined below. The "normal" clients are regina and rxqueue.
26 * Details about each client are kept, such as its current queue name.
31 * - set signal handler for SIGTERM
32 * initialise socket interface
39 * - if listen socket, add new client
40 * - otherwise read command
42 * - disconnect all clients
47 * Once a client connects, it sends commands
48 * Commands are a single character, followed by an optional 6 hex character
49 * length and optional data.
50 * F - queue data onto client's current queue (FIFO)
51 * in -> FFFFFFFxxx--data--xxx
52 * out-> 0000000 (if successful)
53 * out-> 2xxxxxx (if error, eg queue deleted)
54 * out-> 3000000 (memory allocation error)
55 * regina QUEUE, rxqueue /fifo
56 * L - push data onto client's current queue (LIFO)
57 * in-> LFFFFFFxxx--data--xxx
58 * out-> 0000000 (if successful)
59 * out-> 2xxxxxx (if error, eg queue deleted)
60 * out-> 3000000 (memory allocation error)
61 * regina PUSH, rxqueue /lifo
63 * in-> CFFFFFFxxx--queue name--xxx (if length 0, create name)
64 * out-> 0FFFFFFxxx--queue name--xxx (if queue name created)
65 * out-> 1FFFFFFxxx--queue name--xxx (if queue name existed)
66 * out-> 2xxxxxx (if error)
67 * regina RXQUEUE('C'), rxqueue N/A
69 * in-> DFFFFFFxxx--queue name--xxx
70 * out-> 0000000 (if queue name deleted)
71 * out-> 5xxxxxx (trying to delete 'SESSION' queue)
72 * out-> 6000000 (queue name not passed)
73 * out-> 9xxxxxx (if error, eg queue already deleted)
74 * regina RXQUEUE('D'), rxqueue N/A
75 * E - empty data from specified queue
76 * in-> EFFFFFFxxx--queue name--xxx
77 * out-> 0000000 (if queue emptied)
78 * out-> 2xxxxxx (if error, eg queue deleted)
79 * out-> 3000000 (memory allocation error)
80 * regina N/A, rxqueue /clear
81 * P - pop item off client's default queue
83 * out-> 0FFFFFFxxx--data--xxx (if queue name existed)
84 * out-> 1000000 (if queue empty)
85 * out-> 2xxxxxx (if queue name deleted - length ignored)
86 * out-> 4xxxxxx (if timeout on queue exceeded - length ignored)
87 * regina PULL, rxqueue N/A
88 * p - fetch item off client's default queue
90 * out-> 0FFFFFFxxx--data--xxx (if queue name existed)
91 * out-> 1000000 (if queue empty)
92 * out-> 2xxxxxx (if queue name deleted - length ignored)
93 * regina PULL without timeout, rxqueue N/A
94 *x S - set default queue name (allow false queues)
95 * in-> SFFFFFFxxx--queue name--xxx
96 * out-> 0000000 (if successful)
97 * out-> 3000000 (memory allocation error)
98 * out-> 6000000 (queue name not passed)
99 * regina RXQUEUE('S'), rxqueue N/A
100 * G - get default queue name
102 * out-> 0FFFFFFxxx--queue name--xxx
103 * regina RXQUEUE('G'), rxqueue N/A
104 * N - return number of lines on stack
106 * out-> 0FFFFFF (if queue exists)
107 * out-> 2xxxxxx (if error or queue doesn't exist - length ignored)
108 * regina QUEUED(), rxqueue N/A
109 * T - set timeout on queue pull
111 * out-> 0000000 (if queue timeout set)
112 * out-> 2xxxxxx (if error, eg invalid argument)
113 * out-> 6000000 (queue name not passed)
114 * regina RXQUEUE('T'), rxqueue N/A
115 * X - client disconnect
118 * Z - client requests shutdown - should only be called by ourselves!!
123 #define NO_CTYPE_REPLACEMENT
126 #if defined(WIN32) || defined(__LCC__)
127 # if defined(_MSC_VER)
128 # if _MSC_VER >= 1100
129 /* Stupid MSC can't compile own headers without warning at least in VC 5.0 */
130 # pragma warning(disable: 4115 4201 4214 4514)
132 # include <windows.h>
133 # if _MSC_VER >= 1100
134 # pragma warning(default: 4115 4201 4214)
136 # elif defined(__LCC__)
137 # include <windows.h>
139 # include <winsock.h>
141 # include <windows.h>
145 # ifdef HAVE_SYS_SOCKET_H
146 # include <sys/socket.h>
148 # ifdef HAVE_NETINET_IN_H
149 # include <netinet/in.h>
151 # if defined(HAVE_POLL_H) && defined(HAVE_POLL)
153 # elif defined(HAVE_SYS_POLL_H) && defined(HAVE_POLL)
154 # include <sys/poll.h>
155 # elif defined(HAVE_SYS_SELECT_H)
156 # include <sys/select.h>
161 # ifdef HAVE_ARPA_INET_H
162 # include <arpa/inet.h>
183 #ifdef HAVE_PROCESS_H
184 # include <process.h>
187 #if defined(TIME_WITH_SYS_TIME)
188 # include <sys/time.h>
191 # if defined(HAVE_SYS_TIME_H)
192 # include <sys/time.h>
201 #if defined(__WATCOMC__) || defined(_MSC_VER) || (defined(__IBMC__) && defined(WIN32)) || defined(__SASC) || defined(__MINGW32__) || defined(__BORLANDC__) || defined(DOS) || defined(__LCC__)
204 #if defined(__WATCOMC__) && defined(__QNX__)
208 #include "extstack.h"
210 #ifdef BUILD_NT_SERVICE
211 # include "service.h"
213 * this event is signalled when the
216 HANDLE hServerStopEvent
= NULL
;
220 # define os_errno ((int)WSAGetLastError())
221 # define errno_str(code) Win32ErrorString(code)
223 # define EINTR WSAEINTR
225 # define ECONNRESET WSAECONNRESET
227 # define os_errno errno
228 # define errno_str(code) strerror(code)
233 * debugging is turned off. You can turn it on by the command line option "-D".
235 static int debug
= 0 ;
236 #define DEBUGDUMP(x) { if ( debug ) \
241 * DEFAULT_WAKEUP is the time in ms after which the process shall wakeup.
242 * The time has a maximum of 49 days and shall be around one day. It may
243 * be set much shorter for debugging purpose.
245 #define DEFAULT_WAKEUP 86400000
248 * QUEUE_TIMEOUT is the time in ms after which an unused queue will be
249 * removed. A queue is unused if no client is connected to it.
251 * The time has a maximum of 49 days and may be set to one week. It may
252 * be set much shorter for debugging purposes.
254 #define QUEUE_TIMEOUT (86400000*7)
257 * RxTime defines a structure holding the time in milliseconds resolution.
258 * I know, most systems have at least one sort of high precision
259 * time structure, but the ugly "#ifdef" are unreadable if we use
260 * them all over here.
261 * Defining a structure on our own is much more helpful.
262 * Of course, we can't use a single 32 bit value for milliseconds. This
263 * will break the server after 49 days. unix will live a little bit longer
264 * and there shall Windows machines exist which doesn't have reboot since
268 /* seconds are typical time_t values. */
272 * milli's values are between 0 and 999. The special value -1 indicates
273 * a not-used condition.
279 * now holds the current time. It isn't updated after every operation and
280 * may be out of date by some milliseconds some times.
285 * This value is now plus 7 days.
287 RxTime queue_deadline
;
290 typedef struct _RxQueue
{
292 * linked list maintenance elements
294 struct _RxQueue
*prev
, *next
;
295 /* name is the uppercased name of the queue.
299 * Indicates if the queue is a "real" queue
300 * or a false queue as a result of a rxqueue('set', 'qname')
301 * on a queue that doesn't exist. This is crap behaviour!
302 * but that's how Object Rexx works :-(
306 * Content: single buffered stack in opposite to the multi buffered
307 * internal stacks of Regina.
311 * deadline is the time the queue was last used plus a timeout.
312 * The queue is removed if the queue isn't used for one week.
313 * Thus, the value is the time the queue was used last plus one week.
317 * Several clients may want to wait for incoming data. They are queued
318 * in the following structure and automatically answered by the
319 * data acceptor of the queue in a FIFO manner, which is a fair-queue
321 * The clients will get a notice of an error if the queue is destroyed
322 * or emptied by an explicit call.
323 * structure (n = newer, o = older)
326 * NULL<-o--client<-o--client<-o--client--n->NULL
331 struct _Client
*oldest
, *newest
;
336 * Format, p=prev, n=next:
339 * NULL<--p-queue-n---->queue-n---->queue-n-->NULL
342 * +---p---+ +---p---+
346 /* SESSION is the special queue which can't be deleted and to which clients
347 * drop when their current queue is deleted.
352 * Structure for multiple clients
354 typedef struct _Client
357 * linked list maintenance elements
359 struct _Client
*prev
, *next
;
362 * socket contains the socket's handle
367 * Each client has an associated default queue. It must be valid at all
368 * times after initialization.
370 RxQueue
*default_queue
;
373 * if queue_timeout is set, the client expects an error code after
374 * this time instead of waiting until world's end.
375 * The value is in milliseconds.
376 * A value of zero means no timeout; return immediately if no data
377 * A value of -1 means wait forever
382 * We manage a deadline. A PULL operation is in an error state after
383 * this timestamp. The value is set at a PULL operation to
389 * linked list maintenance elements for waiters.
391 struct _Client
*older
, *newer
;
395 * clients we work on.
396 * Format, p=prev, n=next:
399 * NULL<--p-client-n--->client-n--->client-n-->NULL
402 * +---p---+ +---p---+
408 time_t base_secs
; /* the time the process started */
410 void empty_queue( RxQueue
*q
) ;
412 #if !defined(HAVE_STRERROR)
414 * Sigh! This must probably be done this way, although it's incredibly
415 * backwards. Some versions of gcc come with a complete set of ANSI C
416 * include files, which contain the definition of strerror(). However,
417 * that function does not exist in the default libraries of SunOS.
418 * To circumvent that problem, strerror() is #define'd to get_sys_errlist()
419 * in config.h, and here follows the definition of that function.
420 * Originally, strerror() was #defined to sys_errlist[x], but that does
421 * not work if string.h contains a declaration of the (non-existing)
422 * function strerror().
424 * So, this is a mismatch between the include files and the library, and
425 * it should not create problems for Regina. However, the _user_ will not
426 * encounter any problems until he compiles Regina, so we'll have to
427 * clean up after a buggy installation of the C compiler!
429 const char *get_sys_errlist( int num
)
431 extern char *sys_errlist
[] ;
432 return sys_errlist
[num
] ;
437 const volatile char *Win32ErrorString(int code
)
439 static char buffer
[512];
441 const CHAR
*array
[10];
442 static HINSTANCE tcpip
= NULL
;
445 for (rc
= 0;rc
< sizeof(array
) / sizeof(array
[0]);rc
++)
448 sprintf(buffer
,"code %d: ",code
);
449 len
= strlen(buffer
);
452 tcpip
= GetModuleHandle("wsock32");
453 rc
= FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM
|
454 FORMAT_MESSAGE_FROM_HMODULE
|
455 FORMAT_MESSAGE_ARGUMENT_ARRAY
|
456 FORMAT_MESSAGE_MAX_WIDTH_MASK
,
457 tcpip
, /* lpSource */
459 GetUserDefaultLangID(),
461 sizeof(buffer
) - len
- 1, /* w/o term. 0 */
463 if ((rc
== 0) && (len
>= 2))
464 buffer
[len
- 2] = '\0'; /* cut off ": " at the end*/
470 * get_now returns the current time.
472 RxTime
get_now( void )
475 #if defined(HAVE_GETTIMEOFDAY)
476 struct timeval times
;
478 gettimeofday(×
, NULL
) ;
479 retval
.seconds
= times
.tv_sec
;
480 retval
.milli
= times
.tv_usec
/ 1000 ;
482 #elif defined(MAXLONGLONG)
483 static enum { T_first
, T_OK
, T_illegal
} state
= T_first
;
484 static LARGE_INTEGER freq
;
487 retval
.seconds
= 0; /* Keep compiler happy */
488 if ( state
== T_first
)
490 if ( !QueryPerformanceFrequency( &freq
) )
497 if ( !QueryPerformanceCounter( &curr
) )
503 * if we don't native support for the 64 bit arithmetic, use
504 * doubles. Everything else is a pain.
505 * I hope we never get a compiler for Windows which can't compile
508 retval
.seconds
= (time_t) ( curr
.QuadPart
/ freq
.QuadPart
) ;
509 h
= curr
.QuadPart
% freq
.QuadPart
;
511 retval
.milli
= (int) ( h
/ freq
.QuadPart
) ;
514 if ( state
== T_illegal
)
516 /* Windows systems have ftime in the C library */
517 struct timeb timebuffer
;
520 retval
.seconds
= timebuffer
.time
;
521 retval
.milli
= timebuffer
.millitm
;
524 #elif defined(HAVE_FTIME)
525 struct timeb timebuffer
;
528 retval
.seconds
= timebuffer
.time
;
529 retval
.milli
= timebuffer
.millitm
;
534 if ( ( c
= clock() ) == (clock_t) -1 )
537 * clock() values are not adjusted to 1.1.1970 and CLOCKS_PER_SEC
538 * may be a float or double
540 retval
.seconds
= (time_t) (c
/ CLOCKS_PER_SEC
) ;
541 retval
.milli
= (int) ( ( c
* 1000.0 ) / CLOCKS_PER_SEC
) % 1000 ;
545 retval
.seconds
= time( NULL
) ;
553 * time_add adds an amount of milliseconds to a time. The amount is
554 * usually a Client->queue_timeout.
556 static void time_add( RxTime
*t
, long incr
)
558 time_t s
= (time_t) ( incr
/ 1000 ) ;
559 int m
= ( incr
% 1000 ) ;
561 assert( t
->milli
!= -1 ) ;
562 /* wrapping may occur, be careful */
564 if ( ( t
->milli
+= m
) >= 1000 )
569 if ( t
->seconds
> s
)
572 s
= (time_t) -1 ; /* maximum 1 */
573 if ( t
->seconds
> s
)
575 /* time_t is a signed value, most representations have
581 assert( t
->seconds
<= s
) ;
587 * time_diff returns t1 - t2 in milliseconds. -1 is returned on overflow;
588 * -1 is never returned otherwise. -2 is used if the returned time would be
591 static int time_diff( RxTime t1
, RxTime t2
)
595 assert( t1
.milli
!= -1 ) ;
596 assert( t2
.milli
!= -1 ) ;
597 if ( t1
.seconds
< t2
.seconds
)
601 retval
= (int) ( ( t1
.seconds
- t2
.seconds
) * 1000 ) ;
603 if ( ( t1
.milli
< t2
.milli
) && ( retval
== 0 ) )
607 retval
+= t1
.milli
- t2
.milli
;
609 /* final check for overflow */
610 h
= (int) ( t1
.seconds
- t2
.seconds
) ;
611 if ( ( ( retval
/ 1000 ) < h
- 1 )
612 || ( ( retval
/ 1000 ) > h
+ 1 )
620 streng
*Str_upper( streng
*str
)
624 for ( i
= 0; i
< PSTRENGLEN( str
); i
++ )
626 if ( islower( str
->value
[i
] ) )
627 str
->value
[i
] = (char)toupper( str
->value
[i
] );
632 /* compares 2 strengs and returns 0 if they are equal, 1 if not.
633 * The second one is converted to uppercase while comparing, the first
634 * one must be uppercase.
636 int Str_ccmp( const streng
*first
, const streng
*second
)
640 if ( PSTRENGLEN( first
) != PSTRENGLEN( second
) )
643 for (tmp
=0; tmp
< PSTRENGLEN( first
); tmp
++ )
644 if ( first
->value
[tmp
] != toupper( second
->value
[tmp
] ) )
650 /* Str_cre_or_exit create a streng or exits after a message about
653 streng
*Str_cre_or_exit( const char *str
, unsigned length
)
657 if ( ( retval
= MAKESTRENG( length
) ) == NULL
)
659 showerror( ERR_STORAGE_EXHAUSTED
, 0, ERR_STORAGE_EXHAUSTED_TMPL
);
660 exit( ERR_STORAGE_EXHAUSTED
);
663 memcpy( PSTRENGVAL( retval
), str
, length
) ;
664 retval
->len
= length
;
669 * Str_buf create a streng from a buffer. It may return NULL on error.
671 streng
*Str_buf( const char *str
, unsigned length
)
675 if ( ( retval
= MAKESTRENG( length
) ) == NULL
)
677 showerror( ERR_STORAGE_EXHAUSTED
, 0, ERR_STORAGE_EXHAUSTED_TMPL
);
681 memcpy( PSTRENGVAL( retval
), str
, length
);
682 retval
->len
= length
;
687 * Str_dup duplicates a streng. It may return NULL on error.
689 streng
*Str_dup( const streng
*str
)
691 return Str_buf( PSTRENGVAL( str
), PSTRENGLEN( str
) ) ;
695 * delete_a_queue deletes the queue's content and unlinks it from the list
696 * of existing queues if q isn't SESSION.
698 void delete_a_queue( RxQueue
*q
)
704 DEBUGDUMP(printf("Deleting queue <%.*s>\n", PSTRENGLEN( q
->name
), PSTRENGVAL( q
->name
)););
708 DEBUGDUMP(printf("Deleting natal queue\n"););
715 if ( q
->name
!= NULL
)
716 DROPSTRENG( q
->name
);
719 * dequeue from the linked list and free space
722 q
->prev
->next
= q
->next
;
726 q
->next
->prev
= q
->prev
;
730 * Let all clients connected to this queue fall back to "SESSION" as
732 * FIXME: It may be better to leave a queue to a non-isReal state.
733 * This kind of work destroys the data integrity on SESSION.
734 * We get many more connections working on SESSION than expected.
736 for( c
= clients
; c
!= NULL
; c
= c
->next
)
738 if ( c
->default_queue
== q
)
739 c
->default_queue
= SESSION
;
744 * delete_a_client deletes the clients' content and unlinks it from the list
745 * of existing clients.
747 void delete_a_client( Client
*c
)
752 * dequeue from the linked list and free space
755 c
->prev
->next
= c
->next
;
759 c
->next
->prev
= c
->prev
;
763 void delete_all_queues( void )
768 * SESSION won't be deleted. Be careful to initiate one delete per
771 for ( q
= queues
; q
!= NULL
; )
779 char *get_unspecified_queue( void )
781 char *rxq
= getenv( "RXQUEUE" );
786 if ( strchr(rxq
, '@' ) == NULL
)
790 if ( ( h
= (char *)malloc( strlen( rxq
) + 2 ) ) != NULL
)
804 char *in_queue
=get_unspecified_queue();
807 if ( init_external_queue( NULL
) )
810 queue
= Str_cre_or_exit( in_queue
, strlen( in_queue
) ) ;
812 if ( parse_queue( NULL
, queue
, &q
) == 1 )
814 sock
= connect_to_rxstack( NULL
, &q
);
817 /* error already shown by the function */
818 return(ERR_RXSTACK_CANT_CONNECT
);
820 send_command_to_rxstack( NULL
, sock
, RXSTACK_KILL_STR
, NULL
, 0 );
821 read_result_from_rxstack( NULL
, sock
, RXSTACK_HEADER_SIZE
);
824 term_external_queue( ) ;
828 int rxstack_cleanup( void )
832 DEBUGDUMP(printf("Cleaning up\n"););
834 * Disconnect all clients
838 DEBUGDUMP(printf("Finished Cleaning up\n"););
839 term_external_queue( ) ;
845 #ifdef BUILD_NT_SERVICE
846 BOOL
report_service_start( void )
849 * report the status to the service control manager.
851 return (ReportStatusToSCMgr(
852 SERVICE_RUNNING
, /* service state */
853 NO_ERROR
, /* exit code */
857 BOOL
report_service_pending_start( void )
860 * report the status to the service control manager.
862 return (ReportStatusToSCMgr(
863 SERVICE_START_PENDING
, /* service state */
864 NO_ERROR
, /* exit code */
865 3000)); /* wait hint */
868 int nt_service_start( void )
871 * code copied from sample NT Service code. The goto's are
873 * report the status to the service control manager.
875 if ( !report_service_pending_start() )
879 * create the event object. The control handler function signals
880 * this event when it receives the "stop" control code.
882 hServerStopEvent
= CreateEvent(
883 NULL
, /* no security attributes */
884 TRUE
, /* manual reset event */
885 FALSE
, /* not-signalled */
888 if ( hServerStopEvent
== NULL
)
892 * report the status to the service control manager.
894 if ( !report_service_pending_start() )
904 DEBUGDUMP(printf("In ServiceStop()\n"););
912 void rxstack_signal_handler( int sig
)
917 /* Creates a new client and appends it in front of the current clients.
918 * Don't forget to set a default_queue and the socket at once.
920 Client
*get_new_client( )
922 Client
*retval
= (Client
*)malloc( sizeof( Client
) ) ;
924 if ( retval
== NULL
)
926 memset( retval
, 0, sizeof( Client
) ) ;
927 retval
->socket
= -1 ;
928 retval
->deadline
.milli
= -1 ; /* deadline not used --> infinite timeout */
930 retval
->next
= clients
;
931 if ( clients
!= NULL
)
932 clients
->prev
= retval
;
938 * Find the named queue - case insensitive
939 * returns the queue or NULL if no queue with this name exists.
941 RxQueue
*find_queue( const streng
*queue_name
)
945 for ( q
= queues
; q
!= NULL
; q
= q
->next
)
947 /* This is inefficient, FIXME: Introduce a hash value */
948 if ( Str_ccmp( q
->name
, queue_name
) == 0 )
954 /* Creates a new queue and appends it in front of the current queues.
955 * Don't forget to set a name at once.
957 RxQueue
*get_new_queue( void )
959 RxQueue
*retval
= (RxQueue
*)malloc( sizeof( RxQueue
) ) ;
961 if ( retval
== NULL
)
963 memset( retval
, 0, sizeof( RxQueue
) ) ;
964 retval
->deadline
= queue_deadline
;
966 retval
->next
= queues
;
967 if ( queues
!= NULL
)
968 queues
->prev
= retval
;
973 int rxstack_delete_queue( Client
*client
, streng
*queue_name
)
978 if ( ( q
= find_queue( queue_name
) ) == NULL
)
991 * If we found a false queue, return 9
1000 * Delete the contents of the queue
1001 * and mark it as gone.
1003 delete_a_queue( q
);
1011 int rxstack_create_client( int socket
)
1015 if ( ( c
= get_new_client( ) ) == NULL
)
1018 /* This may have been the connection telling us to go down ;-) */
1019 showerror( ERR_STORAGE_EXHAUSTED
, 0, ERR_STORAGE_EXHAUSTED_TMPL
);
1020 exit( ERR_STORAGE_EXHAUSTED
);
1024 c
->default_queue
= SESSION
;
1028 /* rxstack_send_return writes back to the client the action return code
1029 * and optionally a string of length len.
1030 * The functions returns 0 on success, -1 on error
1032 int rxstack_send_return( int sock
, char *action
, char *str
, int len
)
1034 streng
*qlen
, *header
;
1035 int rc
, retval
= 0 ;
1037 DEBUGDUMP(printf("Sending to %d Result: %c <%.*s>\n", sock
, *action
, len
, (str
) ? str
: ""););
1038 qlen
= REXX_D2X( len
);
1041 header
= REXX_RIGHT( qlen
, RXSTACK_HEADER_SIZE
, '0');
1045 header
->value
[0] = action
[0];
1046 rc
= send( sock
, PSTRENGVAL(header
), PSTRENGLEN(header
), 0 );
1047 if ( rc
!= PSTRENGLEN(header
) )
1049 DEBUGDUMP(printf("Send failed: rc> %d != PSTRENGLEN(header)> %d errno = %d\n", rc
, PSTRENGLEN(header
),os_errno
););
1054 rc
= send( sock
, str
, len
, 0 );
1057 DEBUGDUMP(printf("Send failed: errno = %d\n", os_errno
););
1061 DROPSTRENG( header
);
1067 int rxstack_delete_client( Client
*client
)
1069 delete_a_client( client
) ;
1073 /* rxstack_set_default_queue sets the client's (new) queue name.
1074 * A false queue is created if the queue isn't found.
1075 * The new queue is returned or NULL if we are out of memory.
1077 RxQueue
*rxstack_set_default_queue( Client
*client
, streng
*data
)
1082 prev
= client
->default_queue
;
1083 if ( ( q
= find_queue( data
) ) == NULL
)
1086 * We didn't find a real or a false queue, so create
1089 q
= get_new_queue( );
1092 newq
= Str_dup( data
);
1095 delete_a_queue( q
);
1098 q
->name
= Str_upper( newq
) ;
1099 DEBUGDUMP(printf("Creating the false queue <%.*s>", PSTRENGLEN( q
->name
), PSTRENGVAL( q
->name
) ););
1100 /* q->isReal set to 0 by get_new_queue --> false queue */
1101 client
->default_queue
= q
;
1106 client
->default_queue
= q
;
1111 DEBUGDUMP(printf("No FREE MEMORY when setting default queue for client: <%.*s>\n", PSTRENGLEN(data
), PSTRENGVAL(data
) ););
1115 DEBUGDUMP(printf("Setting default queue for client: <%.*s> Prev: %p <%.*s>\n", PSTRENGLEN(q
->name
), PSTRENGVAL(q
->name
), prev
, PSTRENGLEN(prev
->name
), PSTRENGVAL(prev
->name
) ););
1116 /* SET or CREATE resets a timeout to 0; effectively turns off any timeout */
1117 client
->queue_timeout
= 0 ;
1122 int rxstack_timeout_queue( Client
*client
, const streng
*data
)
1127 * Convert the timeout
1128 * If the supplied timeout is 0 (infinite wait), set the client->queue_timeout
1131 val
= REXX_X2D( data
, &error
);
1136 client
->queue_timeout
= val
;
1137 DEBUGDUMP(printf("Timeout on queue: %ld\n", client
->queue_timeout
););
1142 /* unique_name creates a unique name for a queue.
1143 * The function may exit after a message about missing memory.
1145 static streng
*unique_name( void )
1147 static int first
= 1 ;
1148 static char buf
[ 80 ] ;
1150 static unsigned runner
= 0;
1155 sprintf( buf
, "S%d%ld", (int) getpid(), (long) time( NULL
) ) ;
1156 ptr
= buf
+ strlen( buf
) ;
1158 sprintf( ptr
, "%u", runner
++ ) ;
1159 return Str_buf( buf
, strlen( buf
) ) ;
1162 int rxstack_create_queue( Client
*client
, streng
*data
, streng
**result
)
1165 streng
*new_queue
= NULL
;
1170 DEBUGDUMP(printf("Creating new user-specified queue: <%.*s>\n", PSTRENGLEN(data
), PSTRENGVAL(data
) ););
1171 if ( ( q
= find_queue( data
) ) == NULL
)
1174 * No queue of that name, so use a duplicate of it.
1176 DEBUGDUMP(printf("Couldn't find <%.*s>; so creating it\n", PSTRENGLEN(data
), PSTRENGVAL(data
) ););
1182 * If the queue we found is a false queue, we can still
1183 * use the supplied name and the slot
1188 DEBUGDUMP(printf("Found false queue\n" ););
1190 /* SET or CREATE resets a timeout to 0 */
1191 client
->queue_timeout
= 0;
1193 return 0; /* Pass back the name. May be different due to
1194 * different locales or codepages, but it IS the selected
1198 new_queue
= unique_name( );
1199 if ( new_queue
== NULL
)
1201 DEBUGDUMP(printf("Having to create unique queue <%.*s>\n", PSTRENGLEN( new_queue
), PSTRENGVAL( new_queue
) ););
1207 DEBUGDUMP(printf("Creating system generated queue.\n"););
1208 new_queue
= unique_name( );
1209 if ( new_queue
== NULL
)
1213 if ( ( q
= get_new_queue( ) ) == NULL
)
1215 DROPSTRENG( new_queue
);
1216 showerror( ERR_STORAGE_EXHAUSTED
, 0, ERR_STORAGE_EXHAUSTED_TMPL
);
1221 * Uppercase the queue name
1223 q
->name
= Str_upper( new_queue
);
1225 /* SET or CREATE resets a timeout to 0 */
1226 client
->queue_timeout
= 0 ;
1228 return rc
; /* Both code 0 and code 1 return the name to the caller.
1229 * May be different due to codepages, etc.
1234 * Pushes 'line' onto the REXX stack in LIFO manner.
1235 * point to the new line. The line is put on top of the current
1238 StackLine
*rxstack_stack_lifo( RxQueue
*current_queue
, streng
*line
)
1242 if ( ( newbox
= (StackLine
*) malloc( sizeof(StackLine
) ) ) != NULL
)
1244 newbox
->contents
= line
;
1245 LIFO_LINE( ¤t_queue
->buf
, newbox
) ;
1253 * Pushes 'line' onto the REXX stack in FIFO manner.
1254 * point to the new line. The line is put on top of the current
1257 StackLine
*rxstack_stack_fifo( RxQueue
*current_queue
, streng
*line
)
1261 if ( ( newbox
= (StackLine
*) malloc( sizeof(StackLine
) ) ) != NULL
)
1263 newbox
->contents
= line
;
1264 FIFO_LINE( ¤t_queue
->buf
, newbox
) ;
1271 * dequeue_waiter dequeues the Client c from the waiter's list of the queue q.
1272 * The client must be linked in the waiter's list of the queue.
1273 * The client's variables for this purpose are cleaned, too.
1275 static void dequeue_waiter( RxQueue
*q
, Client
*c
)
1279 for ( run
= q
->oldest
; run
!= NULL
; run
= run
->newer
)
1282 assert( run
!= NULL
) ; /* This is the test if c is a waiter of q */
1285 if ( c
->older
!= NULL
)
1287 c
->older
->newer
= c
->newer
;
1291 q
->oldest
= c
->newer
;
1292 if ( q
->oldest
!= NULL
)
1293 q
->oldest
->older
= NULL
;
1296 if ( c
->newer
!= NULL
)
1298 c
->newer
->older
= c
->older
;
1302 q
->newest
= c
->older
;
1303 if ( q
->newest
!= NULL
)
1304 q
->newest
->newer
= NULL
;
1306 assert( ( ( q
->newest
!= NULL
) && ( q
->oldest
!= NULL
) ) ||
1307 ( ( q
->newest
== NULL
) && ( q
->oldest
== NULL
) ) ) ;
1308 c
->older
= c
->newer
= NULL
;
1309 c
->deadline
.milli
= -1 ;
1312 /* redir sends data as the answer of a pull operation back to
1313 * the oldest waiting client and dequeues this client.
1314 * data is dropped after the operation.
1316 void redir( RxQueue
*q
, streng
*data
)
1321 DEBUGDUMP(printf("Redirecting <%.*s> to waiting client %d\n", PSTRENGLEN(data
), (PSTRENGVAL(data
)) ? PSTRENGVAL(data
) : "", c
->socket
););
1323 dequeue_waiter( q
, c
) ;
1325 rxstack_send_return( c
->socket
, "0", PSTRENGVAL( data
), PSTRENGLEN( data
) ) ;
1326 DROPSTRENG( data
) ;
1329 /* bad_news_for_waiter informs a waiter about an error while waiting for
1330 * data for a pull request. The client is dequeued from its queue.
1332 void bad_news_for_waiter( RxQueue
*q
, Client
*c
)
1334 dequeue_waiter( q
, c
) ;
1335 DEBUGDUMP(printf("Sending negative response to waiting client %d\n", c
->socket
););
1337 rxstack_send_return( c
->socket
, "4", NULL
, 0 ) ;
1340 int rxstack_queue_data( Client
*client
, streng
*data
, char order
)
1344 if ( client
->default_queue
->oldest
!= NULL
)
1346 redir( client
->default_queue
, data
) ;
1349 DEBUGDUMP(printf("Queueing: <%.*s> Order: %c\n", PSTRENGLEN(data
), (PSTRENGVAL(data
)) ? PSTRENGVAL(data
) : "", order
););
1350 if ( order
== RXSTACK_QUEUE_FIFO
)
1352 if ( rxstack_stack_fifo( client
->default_queue
, data
) == NULL
)
1360 if ( rxstack_stack_lifo( client
->default_queue
, data
) == NULL
)
1369 /* Clears the content of the queue. All waiters are informed by code
1370 * 2 of a cleaned queue and removed the the waiter's list.
1372 void empty_queue( RxQueue
*q
)
1374 StackLine
*tmp
, *line
;
1379 for ( line
= b
->top
; line
!= NULL
; )
1381 contents
= line
->contents
;
1382 DROPSTRENG( contents
);
1387 memset( &q
->buf
, 0, sizeof( Buffer
) ) ;
1389 /* acknowledge waiters for data not ready and dequeue them */
1391 while ( q
->oldest
!= NULL
) {
1392 bad_news_for_waiter( q
, q
->oldest
) ;
1396 /* Clears the content of the queue named data. In opposite to the previous
1397 * version, the client's current queue isn't set to the named queue any
1398 * longer. The is the default behaviour in Regina.
1399 * returns 0 on success, 2 if the queue doesn't exist.
1401 int rxstack_empty_queue( Client
*client
, streng
*data
)
1405 DEBUGDUMP(printf("Emptying queue: <%.*s>\n", PSTRENGLEN(data
), (PSTRENGVAL(data
)) ? PSTRENGVAL(data
) : "" ););
1406 if ( ( q
= find_queue( data
) ) == NULL
)
1414 int rxstack_number_in_queue( Client
*client
)
1416 int lines
= (int) client
->default_queue
->buf
.elements
;
1418 DEBUGDUMP(printf("Querying number in queue: %d\n", lines
););
1423 * Pulls a line off the queue and dequeues it.
1425 * If nowait isn't set and no data is available and the client's queue_timeout
1426 * is set, the client is set to the newest end of the client's default_queue.
1428 * It will be awaken by either ariving data on this pipe, or deleting/emptying
1429 * the pipe, or by a timeout.
1435 int rxstack_pull_line_off_queue( Client
*client
, streng
**result
, int nowait
)
1442 b
= &client
->default_queue
->buf
;
1443 POP_LINE( b
, line
);
1446 *result
= line
->contents
;
1455 rc
= RXSTACK_EMPTY
; /* queue empty */
1456 DEBUGDUMP(printf("nowait set to 1\n" ););
1460 if ( client
->queue_timeout
== 0 )
1462 rc
= RXSTACK_EMPTY
; /* queue empty */
1463 DEBUGDUMP(printf("client timeout = 0\n" ););
1467 assert( client
->deadline
.milli
== -1 );
1468 assert ( client
->newer
== NULL
);
1469 if ( client
->queue_timeout
!= -1 )
1472 client
->deadline
= now
;
1473 time_add( &client
->deadline
, client
->queue_timeout
);
1475 q
= client
->default_queue
;
1476 client
->newer
= NULL
;
1477 client
->older
= q
->newest
;
1478 if ( client
->older
!= NULL
)
1479 client
->older
->newer
= client
;
1481 if ( q
->oldest
== NULL
)
1483 rc
= RXSTACK_WAITING
; /* waiting */
1484 DEBUGDUMP(printf("waiting until %ld.%d\n", client
->deadline
.seconds
,client
->deadline
.milli
););
1488 DEBUGDUMP(printf("Pulling line off queue; rc %d\n", rc
););
1492 /* rxstack_process_command reads a new command from the client and processes
1494 * returns 0 if the client has been terminated, 1 if the client persists.
1496 int rxstack_process_command( Client
*client
)
1499 char cheader
[RXSTACK_HEADER_SIZE
];
1501 streng
*buffer
= NULL
;
1502 streng
*result
=NULL
;
1506 if ( client
->deadline
.milli
!= -1 )
1509 * interrupted wait, assume the client doesn't want to wait for data any
1512 bad_news_for_waiter( client
->default_queue
, client
) ;
1515 memset( cheader
, 0, sizeof(cheader
) );
1516 DEBUGDUMP(printf("\nreading from socket %d\n", client
->socket
););
1517 rc
= recv( client
->socket
, cheader
, RXSTACK_HEADER_SIZE
, 0 );
1520 if ( os_errno
!= ECONNRESET
)
1522 showerror( ERR_EXTERNAL_QUEUE
, ERR_RXSTACK_READING_SOCKET
, ERR_RXSTACK_READING_SOCKET_TMPL
, errno_str( os_errno
) );
1525 * Assume client has been lost
1527 rxstack_delete_client( client
);
1532 DEBUGDUMP(printf("read empty header\n"););
1534 * Assume client has been lost
1536 rxstack_delete_client( client
);
1539 else if ( rc
!= RXSTACK_HEADER_SIZE
)
1541 DEBUGDUMP(printf("read corrupted header\n"););
1543 * Assume client has been lost
1545 rxstack_delete_client( client
);
1549 header
= MakeStreng( RXSTACK_HEADER_SIZE
- 1 );
1550 if ( header
== NULL
)
1552 showerror( ERR_STORAGE_EXHAUSTED
, 0, ERR_STORAGE_EXHAUSTED_TMPL
);
1553 exit( ERR_STORAGE_EXHAUSTED
);
1555 memcpy( PSTRENGVAL(header
), cheader
+1, RXSTACK_HEADER_SIZE
-1 );
1556 header
->len
= RXSTACK_HEADER_SIZE
-1 ;
1559 * Convert the data length
1561 length
= REXX_X2D( header
, &rc
);
1565 * Errorneous number. Kill the client.
1567 DEBUGDUMP(printf("Invalid header: <%.*s>, client killed\n", header
->len
, header
->value
););
1568 rxstack_send_return( client
->socket
, "9", NULL
, 0 );
1569 rxstack_delete_client( client
);
1573 DEBUGDUMP(printf("Header: <%.*s> length: %d\n", header
->len
, header
->value
, length
););
1574 DROPSTRENG( header
);
1578 * Allocate a streng big enough for the expected data
1579 * string, based on the length just read; even if the length
1582 buffer
= MAKESTRENG ( length
);
1583 if ( buffer
== NULL
)
1585 showerror( ERR_STORAGE_EXHAUSTED
, 0, ERR_STORAGE_EXHAUSTED_TMPL
);
1586 DEBUGDUMP(printf("can't buffer input of client\n"););
1587 rxstack_delete_client( client
);
1590 rc
= recv( client
->socket
, PSTRENGVAL(buffer
), length
, 0 );
1593 showerror( ERR_EXTERNAL_QUEUE
, ERR_RXSTACK_READING_SOCKET
, ERR_RXSTACK_READING_SOCKET_TMPL
, errno_str( os_errno
) );
1598 * All we can assume here is that the client has been lost
1600 DEBUGDUMP(printf("read empty header\n"););
1601 rxstack_delete_client( client
);
1602 DROPSTRENG( buffer
) ;
1606 buffer
->len
= length
;
1609 switch( cheader
[0] )
1611 case RXSTACK_QUEUE_FIFO
:
1612 case RXSTACK_QUEUE_LIFO
:
1613 DEBUGDUMP(printf("--- Queue %s ---\n", cheader
[0] == RXSTACK_QUEUE_FIFO
? "FIFO" : "LIFO"););
1617 if ( buffer
== NULL
)
1618 buffer
= Str_buf( "", 0 );
1619 if ( buffer
== NULL
)
1622 rc
= rxstack_queue_data( client
, buffer
, cheader
[0] );
1623 rcode
[0] = (char)(rc
+'0');
1624 rxstack_send_return( client
->socket
, rcode
, NULL
, 0 );
1625 buffer
= NULL
; /* consumed by rxstack_queue_data */
1628 DEBUGDUMP(printf("--- Exit ---\n"););
1630 * Client has requested disconnect, so remove all
1631 * references to the client
1633 rxstack_send_return( client
->socket
, "0", NULL
, 0 );
1634 rxstack_delete_client( client
);
1635 if ( buffer
!= NULL
)
1637 DROPSTRENG( buffer
);
1642 DEBUGDUMP(printf("--- Kill ---\n"););
1644 * Client has requested server to stop
1646 rxstack_send_return( client
->socket
, "0", NULL
, 0 );
1647 rxstack_delete_client( client
);
1650 case RXSTACK_SET_QUEUE
:
1651 DEBUGDUMP(printf("--- Set Queue ---\n"););
1653 * Set the default queue for the client
1655 if ( buffer
== NULL
)
1656 rxstack_send_return( client
->socket
, "6", NULL
, 0 );
1659 q
= rxstack_set_default_queue( client
, buffer
);
1661 rxstack_send_return( client
->socket
, "3", NULL
, 0 );
1663 rxstack_send_return( client
->socket
, "0", q
->name
->value
, q
->name
->len
);
1664 DROPSTRENG( buffer
);
1668 case RXSTACK_EMPTY_QUEUE
:
1669 DEBUGDUMP(printf("--- Empty Queue ---\n"););
1671 * Use the current queue as the default queue.
1673 if ( buffer
== NULL
)
1674 buffer
= client
->default_queue
->name
;
1675 rc
= rxstack_empty_queue( client
, buffer
);
1676 rcode
[0] = (char)(rc
+'0');
1677 rxstack_send_return( client
->socket
, rcode
, NULL
, 0 );
1678 if ( buffer
!= client
->default_queue
->name
)
1679 DROPSTRENG( buffer
);
1682 case RXSTACK_NUMBER_IN_QUEUE
:
1683 DEBUGDUMP(printf("--- Number in Queue ---\n"););
1684 length
= rxstack_number_in_queue( client
);
1685 rxstack_send_return( client
->socket
, "0", NULL
, length
);
1686 if ( buffer
!= NULL
)
1688 DROPSTRENG( buffer
);
1694 DEBUGDUMP(printf("--- Pull ---\n"););
1695 rc
= rxstack_pull_line_off_queue( client
, &result
, cheader
[0] == RXSTACK_FETCH
);
1698 case 0: /* all OK */
1699 rxstack_send_return( client
->socket
, "0", PSTRENGVAL( result
), PSTRENGLEN( result
) );
1700 DROPSTRENG( result
);
1702 case RXSTACK_WAITING
: /* still waiting; don't return */
1704 default: /* empty/error */
1705 rcode
[0] = (char)(rc
+'0');
1706 rxstack_send_return( client
->socket
, rcode
, NULL
, 0 );
1709 if ( buffer
!= NULL
)
1711 DROPSTRENG( buffer
);
1715 case RXSTACK_GET_QUEUE
:
1716 DEBUGDUMP(printf("--- Get Queue ---\n"););
1717 rxstack_send_return( client
->socket
, "0", PSTRENGVAL(client
->default_queue
->name
), PSTRENGLEN(client
->default_queue
->name
) ) ;
1718 if ( buffer
!= NULL
)
1720 DROPSTRENG( buffer
);
1724 case RXSTACK_CREATE_QUEUE
:
1725 DEBUGDUMP(printf("--- Create Queue ---\n"););
1727 * Create a new queue
1729 rc
= rxstack_create_queue( client
, buffer
, &result
);
1730 rcode
[0] = (char)(rc
+'0');
1731 if ( ( rc
!= 1 ) && ( rc
!= 0 ) )
1732 rxstack_send_return( client
->socket
, rcode
, NULL
, 0 );
1734 rxstack_send_return( client
->socket
, rcode
, PSTRENGVAL(result
), PSTRENGLEN(result
) );
1735 buffer
= NULL
; /* consumed by rxstack_create_queue */
1737 case RXSTACK_DELETE_QUEUE
:
1738 DEBUGDUMP(printf("--- Delete Queue ---\n"););
1742 if ( buffer
== NULL
)
1745 rc
= rxstack_delete_queue( client
, buffer
);
1746 rcode
[0] = (char)(rc
+'0');
1747 rxstack_send_return( client
->socket
, rcode
, NULL
, 0 );
1748 if ( buffer
!= NULL
)
1750 DROPSTRENG( buffer
);
1754 case RXSTACK_TIMEOUT_QUEUE
:
1755 DEBUGDUMP(printf("--- Timeout Queue ---\n"););
1757 * Set timeout for pull from queue
1759 if ( buffer
== NULL
)
1762 rc
= rxstack_timeout_queue( client
, buffer
);
1763 rcode
[0] = (char)(rc
+'0');
1764 rxstack_send_return( client
->socket
, rcode
, NULL
, 0 );
1765 if ( buffer
!= NULL
)
1766 DROPSTRENG( buffer
);
1769 case RXSTACK_UNKNOWN
:
1773 rxstack_send_return( client
->socket
, "9", NULL
, 0 );
1776 assert( buffer
== NULL
) ;
1777 if ( buffer
!= NULL
)
1778 DROPSTRENG( buffer
) ;
1783 * earlier returns the earlier of the two time stamps.
/*
 * earlier -- choose between two time stamps.
 *  t1, t2  the time stamps to compare (passed by value).
 * The only visible statement tests time_diff( t1, t2 ) == -2; the lines that
 * select the return value are not part of this listing.
 * NOTE(review): -2 presumably signals that t1 lies before t2 -- confirm
 * against time_diff().
 */
1785 static RxTime
earlier( RxTime t1
, RxTime t2
)
1787 if ( time_diff( t1
, t2
) == -2 )
/*
 * check_for_waiting -- per-client housekeeping pass.
 *  client        the client to inspect.
 *  next_timeout  in/out; on the still-waiting path it is replaced by
 *                earlier( *next_timeout, client->deadline ).
 * Visible steps: the deadline of the client's default queue is reset to
 * queue_deadline; client->deadline.milli == -1 is tested (per the original
 * comment below, a client that is not waiting); otherwise
 * time_diff( client->deadline, now ) selects between lowering *next_timeout
 * and calling bad_news_for_waiter().
 * NOTE(review): braces, else and return lines are missing from this listing,
 * so the exact branch structure should be confirmed.
 */
1792 /* check_for_waiting checks a client for a pending IO request.
1793 * We currently only support PULL requests.
1794 * The function returns immediately if the client doesn't wait.
1795 * In the other case it checks whether the maximum wait time has been
1797 * If the client is still waiting and the deadline isn't reached, it
1798 * checks if the deadline is earlier than next_timeout and sets this
1799 * value if necessary.
1801 * The client's queue's deadline is set to the default deadline of queues in
1804 void check_for_waiting( Client
*client
, RxTime
*next_timeout
)
1808 client
->default_queue
->deadline
= queue_deadline
;
1809 /* Are we a waiter? */
1810 if ( client
->deadline
.milli
== -1 )
1814 * Check if there is anything in the queue...
1816 diff
= time_diff( client
->deadline
, now
) ;
1817 if ( ( diff
!= -2 ) && ( diff
!= 0 ) )
1821 printf("Still waiting infinitely for %d\n", client
->socket
);
1823 printf("Still waiting %d ms at max for %d\n", diff
, client
->socket
);
1825 *next_timeout
= earlier( *next_timeout
, client
->deadline
) ;
1829 bad_news_for_waiter( client
->default_queue
, client
) ;
/*
 * check_queue -- expiry check for a single queue.
 *  q             the queue to examine.
 *  next_timeout  in/out; lowered through earlier() to q->deadline on each
 *                visible path.
 * Visible steps: time_diff( q->deadline, now ) is tested first; the global
 * client list is then scanned for a client whose default_queue is q, in
 * which case the deadline is renewed from queue_deadline; the last visible
 * path logs "Purging unused queue", renews the deadline and calls
 * delete_a_queue( q ).
 * NOTE(review): braces and return lines are missing from this listing, so
 * the exact branch structure should be confirmed.
 */
1833 /* check_queue checks whether a queue has reached its deadline and whether no clients
1834 * have this queue as the default queue. The queue is deleted in this
1836 * If the queue is still valid and the deadline isn't reached, it
1837 * checks if the deadline is earlier than next_timeout and sets this
1838 * value if necessary.
1840 void check_queue( RxQueue
*q
, RxTime
*next_timeout
)
1845 diff
= time_diff( q
->deadline
, now
) ;
1846 if ( ( diff
!= -2 ) && ( diff
!= 0 ) )
1848 *next_timeout
= earlier( *next_timeout
, q
->deadline
) ;
1852 for ( c
= clients
; c
!= NULL
; c
= c
->next
)
1854 if ( c
->default_queue
== q
)
1856 q
->deadline
= queue_deadline
;
1857 *next_timeout
= earlier( *next_timeout
, q
->deadline
) ;
1862 DEBUGDUMP( printf( "Purging unused queue <%.*s>\n", PSTRENGLEN( q
->name
), PSTRENGVAL( q
->name
) ) );
1863 q
->deadline
= queue_deadline
; /* needed at least for SESSION */
1864 *next_timeout
= earlier( *next_timeout
, q
->deadline
) ;
1865 delete_a_queue( q
) ;
/*
 * Server set-up and main service loop.
 *
 * NOTE(review): the enclosing function header is not part of this listing;
 * presumably this is the body of rxstack_doit(), which runNormal() calls --
 * confirm.  Braces, else/continue lines and several declarations are
 * missing as well, so only the visible statements are described.
 *
 * Visible sequence:
 *  1. init_external_queue( NULL ); signal handlers are installed for
 *     SIGTERM, SIGINT and SIGBREAK, and SIGPIPE is set to SIG_IGN.
 *  2. The "SESSION" queue is created with get_new_queue() and marked isReal.
 *  3. With poll() support, POLL_INCR pollfd slots are allocated up front and
 *     grown by POLL_INCR whenever poll_cnt reaches poll_max.
 *  4. A TCP socket is created, SO_REUSEADDR is set where available, the
 *     socket is bound to INADDR_ANY on default_port_number() and listen()
 *     is called with a backlog of 5.
 *  5. Each iteration builds the poll()/select() descriptor set from the
 *     listen socket plus the client sockets, waits no longer than
 *     DEFAULT_WAKEUP ms, accept()s a new client when the listen socket is
 *     ready, calls rxstack_process_command() for ready clients, and then
 *     runs check_for_waiting() over the clients and check_queue() over the
 *     queues with a freshly computed timeout.
 */
1871 int listen_sock
,msgsock
;
1872 struct sockaddr_in server
,client
;
1873 #ifdef HAVE_SOCKLEN_T
1874 socklen_t client_size
;
1881 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
1882 # define POLL_INCR 16
1883 struct pollfd
*pd
= NULL
;
1884 unsigned poll_max
= 0 ;
1885 unsigned poll_cnt
= 0 ;
1891 #ifdef BUILD_NT_SERVICE
1894 #if defined(SO_REUSEADDR) && defined(SOL_SOCKET)
/* NOTE(review): client is declared as struct sockaddr_in above, and this is
 * the only visible assignment to client_size although accept() rewrites it
 * on return -- confirm it does not need to be reset before each accept(). */
1898 client_size
= sizeof( struct sockaddr
);
1900 if ( init_external_queue( NULL
) )
1904 #ifdef BUILD_NT_SERVICE
1906 && !report_service_pending_start() )
1910 * Set up signal handler
1913 signal( SIGTERM
, rxstack_signal_handler
);
1916 signal( SIGINT
, rxstack_signal_handler
);
1919 signal( SIGBREAK
, rxstack_signal_handler
);
1922 signal( SIGPIPE
, SIG_IGN
);
1928 * Initialise default "SESSION" queue
1930 if ( ( SESSION
= get_new_queue( ) ) == NULL
)
1932 showerror( ERR_STORAGE_EXHAUSTED
, 0, ERR_STORAGE_EXHAUSTED_TMPL
) ;
1933 return ERR_STORAGE_EXHAUSTED
;
1935 SESSION
->name
= Str_cre_or_exit( "SESSION", 7 ) ;
1936 SESSION
->isReal
= 1;
1938 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
1939 pd
= (struct pollfd
*)malloc( ( poll_max
= POLL_INCR
) * sizeof( struct pollfd
) );
1942 showerror( ERR_STORAGE_EXHAUSTED
, 0, ERR_STORAGE_EXHAUSTED_TMPL
);
1943 exit( ERR_STORAGE_EXHAUSTED
);
1947 #ifdef BUILD_NT_SERVICE
1949 && !report_service_pending_start() )
1953 * Create listener socket
1955 listen_sock
= socket(AF_INET
, SOCK_STREAM
, 0);
1956 if (listen_sock
< 0)
1958 showerror( ERR_EXTERNAL_QUEUE
, ERR_RXSTACK_GENERAL
, ERR_RXSTACK_GENERAL_TMPL
, "Listening on socket", errno_str( os_errno
) );
1960 exit(ERR_RXSTACK_GENERAL
);
1962 memset( &server
, 0, sizeof(server
) );
1963 server
.sin_family
= AF_INET
;
1964 server
.sin_addr
.s_addr
= htonl(INADDR_ANY
);
1965 portno
= default_port_number();
1966 server
.sin_port
= htons((unsigned short) portno
);
1968 #ifdef BUILD_NT_SERVICE
1970 && !report_service_pending_start() )
1974 #if defined(SO_REUSEADDR) && defined(SOL_SOCKET)
1975 setsockopt( listen_sock
, SOL_SOCKET
, SO_REUSEADDR
, (void *) &on
, sizeof( on
) );
1977 if ( bind(listen_sock
, (struct sockaddr
*)&server
, sizeof(server
)) < 0)
1979 showerror( ERR_EXTERNAL_QUEUE
, ERR_RXSTACK_GENERAL
, ERR_RXSTACK_GENERAL_TMPL
, "Error binding socket", errno_str( os_errno
) );
1981 exit( ERR_RXSTACK_GENERAL
);
1983 #ifdef BUILD_NT_SERVICE
1984 sprintf(buf
, "Listening on port: %d", portno
);
1987 if ( !report_service_start() )
1989 AddToMessageLog(TEXT(buf
));
1993 printf( "%s\n", buf
);
1997 printf( "rxstack listening on port: %d\n", portno
);
2001 * Start accepting connections
2003 listen(listen_sock
, 5);
2004 timeout
= get_now( ) ;
2005 time_add( &timeout
, DEFAULT_WAKEUP
) ;
2006 queue_deadline
= now
;
2007 time_add( &queue_deadline
, QUEUE_TIMEOUT
) ;
2010 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
2012 pd
[ poll_cnt
].events
= POLLIN
;
2013 pd
[ poll_cnt
++ ].fd
= listen_sock
;
2014 DEBUGDUMP(printf("****** poll((%d", listen_sock
););
2015 for ( c
= clients
; c
!= NULL
; c
= c
->next
)
2017 if ( c
->socket
== -1 )
2022 if ( poll_cnt
== poll_max
)
2024 pd
= (struct pollfd
*)realloc( pd
, ( poll_max
+= POLL_INCR
) * sizeof( struct pollfd
) );
2027 showerror( ERR_STORAGE_EXHAUSTED
, 0, ERR_STORAGE_EXHAUSTED_TMPL
);
2028 exit( ERR_STORAGE_EXHAUSTED
);
2032 pd
[ poll_cnt
].events
= POLLIN
;
2033 pd
[ poll_cnt
++ ].fd
= c
->socket
;
2034 DEBUGDUMP(printf(", %d", c
->socket
););
2038 FD_SET(listen_sock
, &ready
);
2039 DEBUGDUMP(printf("****** select((%d", listen_sock
););
2040 max_sock
= listen_sock
;
2042 * For each connected client, allow its socket
2045 for ( c
= clients
; c
!= NULL
; c
= c
->next
)
2047 if ( c
->socket
!= -1 )
2049 DEBUGDUMP(printf(", %d", c
->socket
););
2050 FD_SET( c
->socket
, &ready
);
2051 if ( c
->socket
> max_sock
)
2052 max_sock
= c
->socket
;
2057 rc
= time_diff( timeout
, now
) ;
2059 rc
= 0 ; /* already timed out */
2060 if ( ( rc
== -1 ) || ( rc
> DEFAULT_WAKEUP
) )
2061 rc
= DEFAULT_WAKEUP
;
2062 DEBUGDUMP(printf("), to=%d) ms at %ld,%03d\n", rc
, now
.seconds
, now
.milli
););
2063 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
2064 rc
= poll( pd
, poll_cnt
, rc
) ;
2066 to
.tv_usec
= ( rc
% 1000 ) * 1000 ; /* microseconds fraction */
2067 to
.tv_sec
= rc
/ 1000;
2068 rc
= select( max_sock
+ 1, &ready
, (fd_set
*)0, (fd_set
*)0, &to
) ;
2071 DEBUGDUMP(printf("****** after waiting(), rc=%d at %ld,%03d\n", rc
, now
.seconds
, now
.milli
););
2074 if ( os_errno
!= EINTR
) /* Win32 doesn't know about it ? */
2076 showerror( ERR_EXTERNAL_QUEUE
, ERR_RXSTACK_GENERAL
, ERR_RXSTACK_GENERAL_TMPL
, "Calling select", errno_str( os_errno
) );
2077 exit( ERR_RXSTACK_GENERAL
);
2083 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
2084 if ( pd
[ 0 ].revents
)
2086 if ( FD_ISSET(listen_sock
, &ready
) )
2089 msgsock
= accept(listen_sock
, (struct sockaddr
*)&client
, &client_size
);
/* NOTE(review): the message below says "Calling listen" although the call
 * that precedes it is accept(). */
2092 showerror( ERR_EXTERNAL_QUEUE
, ERR_RXSTACK_GENERAL
, ERR_RXSTACK_GENERAL_TMPL
, "Calling listen", errno_str( os_errno
) );
2094 exit( ERR_RXSTACK_GENERAL
);
2099 * A client has connected, create a client entry
2100 * and set their default queue to SESSION
2103 * Validate the client here...TBD
2104 * use details in client sockaddr struct
2106 DEBUGDUMP(printf("Client connecting from %s has socket %d\n", inet_ntoa( client
.sin_addr
), msgsock
););
2107 rxstack_create_client( msgsock
);
2112 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
2113 for ( c
= clients
, poll_cnt
= 1; c
!= NULL
; poll_cnt
++ )
2115 for ( c
= clients
; c
!= NULL
; )
2118 /* c might be deleted by the following calls.
2119 * Assure we have everything perfect to use the
2120 * next element. An access to c after rxstack_process_command
2125 #if defined(HAVE_POLL) && (defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H))
2126 if ( pd
[ poll_cnt
].revents
)
2128 if ( FD_ISSET( ch
->socket
, &ready
) )
2132 * Process the client's command...
2134 rxstack_process_command( ch
) ;
2140 * If select() timed out or received input, check all connected clients who
2141 * may be waiting for input on one of the queues.
2143 * now contains the time between the start of select() call and now
2147 timeout
= get_now( ) ;
2148 time_add( &timeout
, DEFAULT_WAKEUP
) ;
2149 queue_deadline
= now
;
2150 time_add( &queue_deadline
, QUEUE_TIMEOUT
) ;
2151 for ( c
= clients
; c
!= NULL
; c
= c
->next
)
2153 check_for_waiting( c
, &timeout
);
2155 for ( q
= queues
; q
!= NULL
; )
2159 check_queue( qh
, &timeout
);
2162 #ifdef BUILD_NT_SERVICE
/*
 * usage -- print the command-line synopsis to stderr.
 *  argv0  program name; presumably substituted for the %s in the format
 *         string (the remainder of the fprintf() call is not part of this
 *         listing).
 * The synopsis text depends on HAVE_FORK.  The return value is 1 according
 * to the original comment below; the return statement itself is not shown.
 */
2169 * Gives a short usage description on stderr and returns 1
2171 static int usage( const char *argv0
)
2173 fprintf( stderr
, "Usage: %s [-D] "
2174 #if defined(HAVE_FORK)
/*
 * checkDebug -- test whether the RXDEBUG environment variable is set.
 * NOTE(review): the action taken when it is set is not part of this listing;
 * presumably it switches on the DEBUGDUMP output -- confirm.
 */
2183 static void checkDebug(void)
2185 if ( getenv( "RXDEBUG" ) != NULL
)
/*
 * runNormal -- command-line driver for running rxstack as a normal process.
 *  argc, argv  the program arguments; argv[ 0 ] is saved as argv0 and used
 *              in usage() and in the final message.
 * Options handled in the visible lines:
 *   -D  puts "RXDEBUG=1" into the environment;
 *   -k  recognised, but its action is not part of this listing;
 *   -d  with HAVE_FORK the process calls fork() and rxstack_doit() is run;
 *       otherwise an error is printed and usage() is returned.
 * The remaining visible path calls rxstack_doit() directly, and a
 * "<argv0> terminated." message is printed near the end.
 * NOTE(review): the statements that advance argc/argv and the final return
 * are missing from this listing.
 */
2189 int runNormal( int argc
, char **argv
)
2192 const char *argv0
= argv
[ 0 ] ;
2199 if ( ( argc
>= 1 ) && ( strcmp( *argv
, "-D" ) == 0 ) )
2202 putenv( "RXDEBUG=1" ) ;
2208 return usage( argv0
);
2212 if ( strcmp(*argv
, "-k") == 0 )
2216 if ( strcmp(*argv
, "-d") == 0 )
2218 #if defined(HAVE_FORK)
2219 if ( ( rc
= fork() ) != 0 )
2221 rc
= rxstack_doit();
2223 fprintf( stderr
, "Option \"-d\" option is invalid on this platform.\n" ) ;
2224 return usage( argv0
);
2229 return usage( argv0
);
2234 rc
= rxstack_doit();
2237 printf( "%s terminated.\n", argv0
);
/*
 * Program entry point.
 * Two signatures are visible: ServiceStart( argc, argv ) follows the
 * BUILD_NT_SERVICE test, and main( argc, argv ) is presumably the
 * alternative (the #else/#endif lines are not part of this listing).
 * Visible calls: nt_service_start() under BUILD_NT_SERVICE, one call to
 * runNormal( argc, argv ) whose result is discarded, and one whose result
 * is returned.
 */
2242 #ifdef BUILD_NT_SERVICE
2243 VOID
ServiceStart(DWORD argc
, LPTSTR
*argv
)
2245 int main(int argc
, char *argv
[])
2248 #ifdef BUILD_NT_SERVICE
2251 if ( !nt_service_start() )
2261 runNormal(argc
, argv
);
2264 return runNormal( argc
, argv
);