2 static char *RCSid
= "$Id$";
6 * The Regina Rexx Interpreter
7 * Copyright (C) 1992-1994 Anders Christensen <anders@pvv.unit.no>
9 * This library is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Library General Public
11 * License as published by the Free Software Foundation; either
12 * version 2 of the License, or (at your option) any later version.
14 * This library is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Library General Public License for more details.
19 * You should have received a copy of the GNU Library General Public
20 * License along with this library; if not, write to the Free
21 * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 * This process runs as a daemon (or NT service). It maintains multiple,
26 * All communication is done via TCP/IP sockets.
27 * This process waits on a known port (5656 by default) for connections
28 * from clients. A client is any process that respects the Interface
29 * defined below. The "normal" clients are regina and rxqueue.
30 * Details about each client are kept, such as its current queue name.
35 * - set signal handler for SIGTERM
36 * initialise socket interface
43 * - if listen socket, add new client
44 * - otherwise read command
46 * - disconnect all clients
51 * Once a client connects, it sends commands
52 * Each command is a single character, followed by an optional 6 hex character
53 * length and optional data.
54 * F - queue data onto client's current queue (FIFO)
55 * in-> FFFFFFFxxx--data--xxx
56 * out-> 0000000 (if successful)
57 * out-> 2xxxxxx (if error, eg queue deleted)
58 * regina QUEUE, rxqueue /fifo
59 * L - push data onto client's current queue (LIFO)
60 * in-> LFFFFFFxxx--data--xxx
61 * out-> 0000000 (if successful)
62 * out-> 2xxxxxx (if error, eg queue deleted)
63 * regina PUSH, rxqueue /lifo
65 * in-> CFFFFFFxxx--queue name--xxx (if length 0, create name)
66 * out-> 0xxxxxx (if queue name created - length ignored)
67 * out-> 1FFFFFFxxx--queue name--xxx (if queue name existed or was not specified)
68 * out-> 2xxxxxx (if error)
69 * regina RXQUEUE('C'), rxqueue N/A
71 * in-> DFFFFFFxxx--queue name--xxx
72 * out-> 0000000 (if queue name deleted)
73 * out-> 2xxxxxx (if error, eg queue already deleted)
74 * out-> 9xxxxxx (trying to delete 'SESSION' queue)
75 * regina RXQUEUE('D'), rxqueue N/A
76 * E - empty data from specified queue
77 * in-> EFFFFFFxxx--queue name--xxx
78 * out-> 0000000 (if queue emptied)
79 * out-> 2xxxxxx (if error, eg queue deleted)
80 * regina N/A, rxqueue /clear
81 * P - pop item off client's default queue
83 * out-> 0FFFFFFxxx--data--xxx (if queue name existed)
84 * out-> 1000000 (if queue empty)
85 * out-> 2xxxxxx (if queue name deleted - length ignored)
86 * out-> 4xxxxxx (if timeout on queue exceeded - length ignored)
87 * regina PULL, rxqueue N/A
88 * S - set default queue name (allow false queues)
89 * in-> SFFFFFFxxx--queue name--xxx
90 * out-> 0000000 (if successful)
91 * out-> 2xxxxxx (if error)
92 * regina RXQUEUE('S'), rxqueue N/A
93 * G - get default queue name
95 * out-> 0FFFFFFxxx--queue name--xxx
96 * regina RXQUEUE('G'), rxqueue N/A
97 * N - return number of lines on stack
99 * out-> 0FFFFFF (if queue exists)
100 * out-> 2xxxxxx (if error or queue doesn't exist - length ignored)
101 * regina QUEUED(), rxqueue N/A
102 * T - set timeout on queue pull
104 * out-> 0000000 (if queue timeout set)
105 * regina RXQUEUE('T'), rxqueue N/A
106 * W - write client's temporary stack to "real" stack
108 * out-> 0FFFFFF (if queue exists)
109 * out-> 2xxxxxx (if error or queue doesn't exist - length ignored)
110 * regina QUEUED(), rxqueue N/A
111 * X - client disconnect
114 * Z - client requests shutdown - should only be called by ourselves!!
118 #define MAX_CLIENTS 100
121 #define FD_SETSIZE MAX_CLIENTS+1
126 # if defined(_MSC_VER)
127 # if _MSC_VER >= 1100
128 /* Stupid MSC can't compile own headers without warning at least in VC 5.0 */
129 # pragma warning(disable: 4115 4201 4214)
131 # include <windows.h>
132 # if _MSC_VER >= 1100
133 # pragma warning(default: 4115 4201 4214)
136 # include <windows.h>
140 # ifdef HAVE_SYS_SOCKET_H
141 # include <sys/socket.h>
143 # ifdef HAVE_NETINET_IN_H
144 # include <netinet/in.h>
146 # ifdef HAVE_SYS_SELECT_H
147 # include <sys/select.h>
152 # ifdef HAVE_ARPA_INET_H
153 # include <arpa/inet.h>
174 #ifdef HAVE_PROCESS_H
175 # include <process.h>
178 #if defined(TIME_WITH_SYS_TIME)
179 # include <sys/time.h>
182 # if defined(HAVE_SYS_TIME_H)
183 # include <sys/time.h>
190 #if !defined(__WATCOMC__) && !defined(_MSC_VER) && !(defined(__IBMC__) && defined(WIN32)) && !defined(__SASC) && !defined(__MINGW32__) && !defined(__BORLANDC__)
193 #if defined(__WATCOMC__) && defined(__QNX__)
197 #include "extstack.h"
200 # define DEBUGDUMP(x) {x;}
202 # define DEBUGDUMP(x) {}
205 #ifdef BUILD_NT_SERVICE
206 # include "service.h"
208 * this event is signalled when the
211 HANDLE hServerStopEvent
= NULL
;
214 #define NUMBER_QUEUES 100
217 * Structure for multiple queues
218 * All clients share the queues. Do they also share the
219 * temporary stack ? yes I think so TBD
221 typedef struct stacklinestruct
*stacklineptr
;
223 typedef struct stacklinestruct
225 stacklineptr next
, prev
;
230 * Pointers to queue names
232 streng
*queuename
[NUMBER_QUEUES
] ;
234 * Indicates if the queue is a "real" queue
235 * or a false queue as a result of a rxqueue('set', 'qname')
236 * on a queue that doesn't exist. This is crap behaviour!
237 * but that's how Object Rexx works :-(
239 int real_queue
[NUMBER_QUEUES
];
241 * Pointers to first and last line in the stack.
243 stacklineptr lastline
[NUMBER_QUEUES
] ;
244 stacklineptr firstline
[NUMBER_QUEUES
] ;
247 * Structure for multiple clients
254 * Pointers to first and last entry in the temporary stack
255 * don't use this yet; if ever
257 stacklineptr firstbox
;
258 stacklineptr lastbox
;
262 * Linked list would be better for client - TBD
264 CLIENT clients
[MAX_CLIENTS
];
268 time_t base_secs
; /* the time the process started */
270 #if !defined(HAVE_STRERROR)
272 * Sigh! This must probably be done this way, although it's incredibly
273 * backwards. Some versions of gcc come with a complete set of ANSI C
274 * include files, which contain a declaration of strerror(). However,
275 * that function does not exist in the default libraries of SunOS.
276 * To circumvent that problem, strerror() is #define'd to get_sys_errlist()
277 * in config.h, and here follows the definition of that function.
278 * Originally, strerror() was #defined to sys_errlist[x], but that does
279 * not work if string.h contains a declaration of the (non-existing)
280 * function strerror().
282 * So, this is a mismatch between the include files and the library, and
283 * it should not create problems for Regina. However, the _user_ will not
284 * encounter any problems until he compiles Regina, so we'll have to
285 * clean up after a buggy installation of the C compiler!
287 const char *get_sys_errlist( int num
)
289 extern char *sys_errlist
[] ;
290 return sys_errlist
[num
] ;
294 time_t getmsecs( void )
297 #if defined(HAVE_GETTIMEOFDAY)
298 struct timeval times
;
300 gettimeofday(×
, NULL
) ;
301 secs
= times
.tv_sec
- base_secs
;
302 usecs
= times
.tv_usec
;
304 DEBUGDUMP(printf("getmsecs() secs: %ld usecs: %ld\n", secs
, usecs
););
306 if (times
.tv_usec
< 0)
308 usecs
= (times
.tv_usec
+ 1000000) ;
309 secs
= times
.tv_sec
- 1 ;
312 #elif defined(HAVE_FTIME)
313 struct timeb timebuffer
;
316 secs
= timebuffer
.time
- base_secs
;
317 usecs
= timebuffer
.millitm
* 1000;
319 secs
= time(NULL
) - base_secs
;
322 return ( (secs
*1000000)+usecs
) / 1000;
325 streng
*Streng_upper( streng
*str
)
329 for ( i
= 0; i
< PSTRENGLEN( str
); i
++ )
331 if ( islower( str
->value
[i
] ) )
332 str
->value
[i
] = (char)toupper( str
->value
[i
] );
337 int Str_ccmp( const streng
*first
, const streng
*second
)
341 if ( PSTRENGLEN( first
) != PSTRENGLEN( second
) )
344 for (tmp
=0; tmp
< PSTRENGLEN( first
); tmp
++ )
345 if ( tolower( first
->value
[tmp
] ) != tolower( second
->value
[tmp
] ) )
351 void delete_a_queue( int idx
)
353 stacklineptr ptr
=firstline
[idx
],nptr
;
359 DROPSTRENG(ptr
->contents
) ;
364 DROPSTRENG( queuename
[idx
] );
366 firstline
[idx
] = lastline
[idx
] = NULL
;
367 queuename
[idx
] = NULL
;
369 * OS/2 Rexx seems to reset the queue to SESSION for each client
370 * that has the queue we are just deleting as their current
373 for( i
= 0; i
< MAX_CLIENTS
; i
++ )
375 if ( clients
[i
].default_queue
== idx
)
376 clients
[i
].default_queue
= 0;
380 void delete_all_queues( void )
383 for ( i
= 0; i
< NUMBER_QUEUES
; i
++ )
390 char *get_unspecified_queue( void )
392 char *rxq
= getenv( "RXQUEUE" );
403 char *in_queue
=get_unspecified_queue();
405 int server_address
,portno
;
407 if ( init_external_queue( NULL
) )
411 queue
= MAKESTRENG( strlen( in_queue
) );
414 showerror( ERR_STORAGE_EXHAUSTED
, 0, ERR_STORAGE_EXHAUSTED_TMPL
);
415 return(ERR_STORAGE_EXHAUSTED
);
417 memcpy( PSTRENGVAL( queue
), in_queue
, PSTRENGLEN( queue
) );
419 if ( parse_queue( NULL
, queue
, &server_name
, &server_address
, &portno
) == 0 )
421 sock
= connect_to_rxstack( NULL
, portno
, server_name
, server_address
);
424 showerror( ERR_EXTERNAL_QUEUE
, ERR_RXSTACK_CANT_CONNECT
, ERR_RXSTACK_CANT_CONNECT_TMPL
, server_name
, portno
, strerror( errno
) );
425 return(ERR_RXSTACK_CANT_CONNECT
);
427 send_command_to_rxstack( NULL
, sock
, RXSTACK_KILL_STR
, NULL
, 0 );
428 read_result_from_rxstack( NULL
, sock
, RXSTACK_HEADER_SIZE
);
434 int rxstack_cleanup( void )
438 DEBUGDUMP(printf("Cleaning up\n"););
440 * Disconnect all clients
444 term_external_queue();
447 DEBUGDUMP(printf("Finished Cleaning up\n"););
453 #ifdef BUILD_NT_SERVICE
454 BOOL
report_service_start( void )
457 * report the status to the service control manager.
459 return (ReportStatusToSCMgr(
460 SERVICE_RUNNING
, /* service state */
461 NO_ERROR
, /* exit code */
465 BOOL
report_service_pending_start( void )
468 * report the status to the service control manager.
470 return (ReportStatusToSCMgr(
471 SERVICE_START_PENDING
, /* service state */
472 NO_ERROR
, /* exit code */
473 3000)); /* wait hint */
476 int nt_service_start( void )
479 * code copied from sample NT Service code. The goto's are
481 * report the status to the service control manager.
483 if ( !report_service_pending_start() )
487 * create the event object. The control handler function signals
488 * this event when it receives the "stop" control code.
490 hServerStopEvent
= CreateEvent(
491 NULL
, /* no security attributes */
492 TRUE
, /* manual reset event */
493 FALSE
, /* not-signalled */
496 if ( hServerStopEvent
== NULL
)
500 * report the status to the service control manager.
502 if ( !report_service_pending_start() )
512 DEBUGDUMP(printf("In ServiceStop()\n"););
520 void rxstack_signal_handler( int sig
)
525 int find_free_client( )
528 for ( i
= 0; i
< MAX_CLIENTS
; i
++ )
530 if ( clients
[i
].socket
== 0 )
537 * Find the named queue - case insensitive
539 int find_queue( streng
*queue_name
)
543 for ( i
= 0; i
< NUMBER_QUEUES
; i
++ ) /* inefficient !! */
546 && Str_ccmp( queuename
[i
], queue_name
) == 0 )
549 return UNKNOWN_QUEUE
;
552 int find_free_slot( void )
554 int i
,idx
= UNKNOWN_QUEUE
;
556 for ( i
= 0; i
< NUMBER_QUEUES
; i
++ )
558 if ( queuename
[i
] == NULL
)
567 int rxstack_delete_queue( CLIENT
*client
, streng
*queue_name
)
571 if ( ( idx
= find_queue( queue_name
) ) == UNKNOWN_QUEUE
)
582 * If we found a false queue, return 9
584 if ( real_queue
[idx
] == 0 )
589 * Delete the contents of the queue
590 * and mark it as gone.
592 delete_a_queue( idx
);
600 int rxstack_create_client( int socket
)
603 i
= find_free_client();
610 clients
[i
].default_queue
= 0;
611 clients
[i
].socket
= socket
;
612 clients
[i
].queue_timeout
= 0;
617 int rxstack_send_return( int sock
, char *action
, char *str
, int len
)
619 streng
*qlen
, *header
;
622 DEBUGDUMP(printf("Sending to %d Result: %s <%s>\n", sock
, action
, (str
) ? str
: ""););
623 qlen
= REXX_D2X( len
);
626 header
= REXX_RIGHT( qlen
, RXSTACK_HEADER_SIZE
, '0');
630 header
->value
[0] = action
[0];
631 rc
= send( sock
, PSTRENGVAL(header
), PSTRENGLEN(header
), 0 );
632 DEBUGDUMP(printf("Send length: %s(%d) rc %d\n", PSTRENGVAL(header
),PSTRENGLEN(header
),rc
););
635 rc
= send( sock
, str
, len
, 0 );
636 DEBUGDUMP(printf("Send str <%s> length %d rc: %d\n", str
, len
, rc
););
638 DROPSTRENG( header
);
644 int rxstack_delete_client( CLIENT
*client
)
646 close( client
->socket
);
648 client
->default_queue
= 0;
652 int rxstack_set_default_queue( CLIENT
*client
, streng
*data
)
657 rc
= client
->default_queue
;
658 if ( ( idx
= find_queue( data
) ) == UNKNOWN_QUEUE
)
661 * We didn't find a real or a false queue, so create
664 idx
= find_free_slot( );
665 if ( idx
== UNKNOWN_QUEUE
)
669 new_queue
= MAKESTRENG( PSTRENGLEN( data
) );
672 memcpy( PSTRENGVAL( new_queue
), PSTRENGVAL( data
), PSTRENGLEN( new_queue
) );
676 showerror( ERR_STORAGE_EXHAUSTED
, 0, ERR_STORAGE_EXHAUSTED_TMPL
);
677 exit( ERR_STORAGE_EXHAUSTED
);
679 queuename
[idx
] = Streng_upper( new_queue
) ;
680 client
->default_queue
= idx
;
681 real_queue
[idx
] = 0; /* false queue */
686 client
->default_queue
= idx
;
691 DEBUGDUMP(printf("No FREE SLOTS when setting default queue for client: <%s>\n", PSTRENGVAL(data
) ););
695 DEBUGDUMP(printf("Setting default queue for client: <%s> Prev: %d <%s>\n", PSTRENGVAL(data
), rc
, PSTRENGVAL(queuename
[rc
]) ););
700 int rxstack_timeout_queue( CLIENT
*client
, streng
*data
)
704 * Extract the timeout value from the incoming buffer
706 timeout
= MakeStreng( PSTRENGLEN( data
) );
707 if ( timeout
== NULL
)
709 showerror( ERR_STORAGE_EXHAUSTED
, 0, ERR_STORAGE_EXHAUSTED_TMPL
);
710 exit( ERR_STORAGE_EXHAUSTED
);
712 memcpy( PSTRENGVAL(timeout
), PSTRENGVAL( data
), PSTRENGLEN( data
) );
714 * Convert the timeout
715 * If the supplied timeout is 0 (infinite wait), set the client->queue_timeout
718 client
->queue_timeout
= REXX_X2D( timeout
);
719 if ( client
->queue_timeout
== 0 )
721 client
->queue_timeout
= -1;
723 DEBUGDUMP(printf("Timeout on queue: %ld\n", client
->queue_timeout
););
724 DROPSTRENG( timeout
);
729 int rxstack_create_queue( CLIENT
*client
, streng
*data
)
732 int idx
=UNKNOWN_QUEUE
,false_queue_idx
=UNKNOWN_QUEUE
;
738 DEBUGDUMP(printf("Creating new user-specified queue: <%s>\n", PSTRENGVAL(data
) ););
739 if ( ( idx
= find_queue( data
) ) == UNKNOWN_QUEUE
)
742 * No queue of that name, so use it.
744 DEBUGDUMP(printf("Couldn't find <%s>; so creating it\n", PSTRENGVAL(data
) ););
745 new_queue
= MAKESTRENG( PSTRENGLEN( data
));
748 memcpy( PSTRENGVAL( new_queue
), PSTRENGVAL( data
), PSTRENGLEN( new_queue
) );
752 showerror( ERR_STORAGE_EXHAUSTED
, 0, ERR_STORAGE_EXHAUSTED_TMPL
);
753 exit( ERR_STORAGE_EXHAUSTED
);
759 * If the queue we found is a false queue, we can still
760 * use the supplied name and the slot
762 if ( real_queue
[idx
] == 0 )
764 DEBUGDUMP(printf("Found false queue\n" ););
765 new_queue
= MAKESTRENG( PSTRENGLEN( data
));
768 memcpy( PSTRENGVAL( new_queue
), PSTRENGVAL( data
), PSTRENGLEN( new_queue
) );
772 showerror( ERR_STORAGE_EXHAUSTED
, 0, ERR_STORAGE_EXHAUSTED_TMPL
);
773 exit( ERR_STORAGE_EXHAUSTED
);
775 false_queue_idx
= idx
;
780 * Create a unique queue name
782 sprintf(buf
,"S%d%ld%d", getpid(), clock(), find_free_slot( ) );
783 DEBUGDUMP(printf("Having to create unique queue <%s>\n", buf
););
784 length
= strlen( buf
);
785 new_queue
= MAKESTRENG( length
);
788 memcpy( PSTRENGVAL( new_queue
), buf
, length
);
789 new_queue
->len
= length
;
793 showerror( ERR_STORAGE_EXHAUSTED
, 0, ERR_STORAGE_EXHAUSTED_TMPL
);
794 exit( ERR_STORAGE_EXHAUSTED
);
802 DEBUGDUMP(printf("Creating system generated queue.\n"););
804 * Create a unique queue name
806 sprintf(buf
,"S%d%ld%d", getpid(), clock(), find_free_slot( ) );
807 length
= strlen( buf
);
808 new_queue
= MAKESTRENG( length
);
811 memcpy( PSTRENGVAL( new_queue
), buf
, length
);
812 new_queue
->len
= length
;
817 showerror( ERR_STORAGE_EXHAUSTED
, 0, ERR_STORAGE_EXHAUSTED_TMPL
);
818 exit( ERR_STORAGE_EXHAUSTED
);
822 * Find the first slot that hasn't been used, or use
825 if ( false_queue_idx
== UNKNOWN_QUEUE
)
826 idx
= find_free_slot( );
828 idx
= false_queue_idx
;
830 if ( idx
== UNKNOWN_QUEUE
)
832 showerror( ERR_EXTERNAL_QUEUE
, ERR_RXSTACK_TOO_MANY_QUEUES
, ERR_RXSTACK_TOO_MANY_QUEUES_TMPL
, NUMBER_QUEUES
);
833 return ERR_RXSTACK_TOO_MANY_QUEUES
;
835 if ( queuename
[idx
] )
836 DROPSTRENG( queuename
[idx
] );
838 * Uppercase the queue name
840 queuename
[idx
] = Streng_upper( new_queue
);
841 client
->default_queue
= idx
;
848 * Pushes 'line' onto the REXX stack, lifo, and sets 'lastline' to
849 * point to the new line. The line is put on top of the current
852 stacklineptr
rxstack_stack_lifo( int current_queue
, streng
*line
)
854 stacklineptr newbox
=NULL
;
856 newbox
= (stacklineptr
)malloc( sizeof(stackline
) ) ;
859 if ( lastline
[current_queue
])
861 lastline
[current_queue
]->next
= newbox
;
862 newbox
->prev
= lastline
[current_queue
] ;
866 newbox
->prev
= NULL
;
867 firstline
[current_queue
] = newbox
;
870 newbox
->next
= NULL
;
871 newbox
->contents
= line
;
872 lastline
[current_queue
] = newbox
;
879 * Puts 'line' on the REXX stack, fifo. This routine is similar to
880 * stack_lifo but the differences are big enough to have a separate
881 * routine. The line is put in the bottom of the current buffer,
882 * that is just above the uppermost buffer mark, or at the bottom
883 * of the stack, if there are no buffer marks.
885 stacklineptr
rxstack_stack_fifo( int current_queue
, streng
*line
)
887 stacklineptr newbox
=NULL
, ptr
=NULL
;
889 newbox
= (stacklineptr
)malloc( sizeof(stackline
) ) ;
892 newbox
->prev
= newbox
->next
= NULL
;
893 newbox
->contents
= line
;
895 for (ptr
=lastline
[current_queue
];(ptr
)&&(ptr
->contents
);ptr
=ptr
->prev
) ;
900 newbox
->next
= ptr
->next
;
902 ptr
->next
->prev
= newbox
;
904 lastline
[current_queue
] = newbox
;
909 newbox
->next
= firstline
[current_queue
] ;
910 firstline
[current_queue
] = newbox
;
912 newbox
->next
->prev
= newbox
;
913 if (!lastline
[current_queue
])
914 lastline
[current_queue
] = newbox
;
920 int rxstack_queue_data( CLIENT
*client
, streng
*data
, char order
)
926 line
= MakeStreng ( 0 );
929 DEBUGDUMP(printf("Queueing: <%s> Order: %c\n", (PSTRENGVAL(line
)) ? PSTRENGVAL(line
) : "", order
););
930 if ( order
== RXSTACK_QUEUE_FIFO
)
932 if ( rxstack_stack_fifo( client
->default_queue
, line
) == NULL
)
937 if ( rxstack_stack_lifo( client
->default_queue
, line
) == NULL
)
943 int rxstack_empty_queue( CLIENT
*client
, streng
*data
)
946 stacklineptr tmp
,line
;
949 DEBUGDUMP(printf("Emptying queue: <%s>\n", (PSTRENGVAL(data
)) ? PSTRENGVAL(data
) : "" ););
950 rc
= rxstack_set_default_queue( client
, data
);
955 for ( line
= firstline
[client
->default_queue
]; line
!= NULL
; )
957 contents
= line
->contents
;
958 DROPSTRENG( contents
);
963 firstline
[client
->default_queue
] = lastline
[client
->default_queue
] = NULL
;
968 int rxstack_number_in_queue( CLIENT
*client
)
973 ptr
= firstline
[client
->default_queue
] ;
974 for ( lines
= 0; ptr
; ptr
= ptr
->next
)
979 DEBUGDUMP(printf("Querying number in queue: %d\n", lines
););
984 * Gets the line off the queue but doesn't delete it
986 int rxstack_get_line_off_queue( CLIENT
*client
, streng
**result
)
989 * Return 0 if line OK, 1 if queue empty, 2 if error, 3 if still waiting, 4 if timeout
992 stacklineptr line
=NULL
;
995 * test here to see if client's default queue is valid
997 if ( ( line
= lastline
[client
->default_queue
] ) != NULL
)
999 *result
= MAKESTRENG( line
->contents
->len
);
1002 memcpy( (*result
)->value
, line
->contents
->value
, line
->contents
->len
);
1010 if ( client
->queue_timeout
)
1011 rc
= 3; /* waiting */
1013 rc
= 1; /* queue empty */
1015 DEBUGDUMP(printf("Pulling line off queue; rc %d\n", rc
););
1019 int rxstack_delete_line_off_queue( CLIENT
*client
)
1022 * Return 0 if line OK, 1 if queue empty, 2 if error
1025 streng
*contents
=NULL
;
1026 stacklineptr line
=NULL
;
1029 * test here to see if client's default queue is valid
1031 DEBUGDUMP(printf("Deleting line off queue\n" ););
1032 if ( ( line
= lastline
[client
->default_queue
] ) != NULL
)
1034 contents
= line
->contents
;
1035 DROPSTRENG( contents
);
1036 lastline
[client
->default_queue
] = line
->prev
;
1038 firstline
[client
->default_queue
] = NULL
;
1040 line
->prev
->next
= NULL
;
1051 int rxstack_process_command( CLIENT
*client
)
1053 char cheader
[RXSTACK_HEADER_SIZE
];
1056 streng
*result
=NULL
;
1061 memset( cheader
, 0, sizeof(cheader
) );
1062 DEBUGDUMP(printf("reading from socket %d\n", client
->socket
););
1063 rc
= recv( client
->socket
, cheader
, RXSTACK_HEADER_SIZE
, 0 );
1066 showerror( ERR_EXTERNAL_QUEUE
, ERR_RXSTACK_READING_SOCKET
, ERR_RXSTACK_READING_SOCKET_TMPL
, strerror( errno
) );
1068 * Assume client has been lost
1070 rxstack_delete_client( client
);
1074 DEBUGDUMP(printf("read empty header\n"););
1076 * Assume client has been lost
1078 rxstack_delete_client( client
);
1082 header
= MakeStreng( RXSTACK_HEADER_SIZE
- 1 );
1083 if ( header
== NULL
)
1085 showerror( ERR_STORAGE_EXHAUSTED
, 0, ERR_STORAGE_EXHAUSTED_TMPL
);
1086 exit( ERR_STORAGE_EXHAUSTED
);
1088 memcpy( PSTRENGVAL(header
), cheader
+1, RXSTACK_HEADER_SIZE
-1 );
1091 * Convert the data length
1093 length
= REXX_X2D( header
);
1094 DROPSTRENG( header
);
1095 DEBUGDUMP(printf("Header: <%s> length: %d\n", header
->value
, length
););
1099 * Allocate a streng big enough for the expected data
1100 * string, based on the length just read; even if the length
1103 buffer
= MakeStreng ( length
);
1104 if ( buffer
== NULL
)
1106 showerror( ERR_STORAGE_EXHAUSTED
, 0, ERR_STORAGE_EXHAUSTED_TMPL
);
1107 exit( ERR_STORAGE_EXHAUSTED
);
1109 rc
= recv( client
->socket
, PSTRENGVAL(buffer
), PSTRENGLEN(buffer
), 0 );
1112 showerror( ERR_EXTERNAL_QUEUE
, ERR_RXSTACK_READING_SOCKET
, ERR_RXSTACK_READING_SOCKET_TMPL
, strerror( errno
) );
1117 * All we can assume here is that the client has been lost
1119 DEBUGDUMP(printf("read empty header\n"););
1120 rxstack_delete_client( client
);
1125 switch( cheader
[0] )
1127 case RXSTACK_QUEUE_FIFO
:
1128 case RXSTACK_QUEUE_LIFO
:
1129 DEBUGDUMP(printf("--- Queue %s ---\n", cheader
[0] == RXSTACK_QUEUE_FIFO
? "FIFO" : "LIFO"););
1130 rxstack_queue_data( client
, buffer
, cheader
[0] );
1131 rxstack_send_return( client
->socket
, "0", NULL
, 0 );
1134 DEBUGDUMP(printf("--- Exit ---\n"););
1136 * Client has requested disconnect, so remove all
1137 * references to the client
1139 rxstack_send_return( client
->socket
, "0", NULL
, 0 );
1140 rxstack_delete_client( client
);
1143 DEBUGDUMP(printf("--- Kill ---\n"););
1145 * Client has requested server to stop
1147 rxstack_send_return( client
->socket
, "0", NULL
, 0 );
1148 rxstack_delete_client( client
);
1151 case RXSTACK_SET_QUEUE
:
1152 DEBUGDUMP(printf("--- Set Queue ---\n"););
1154 * Set the default queue for the client
1156 rc
= rxstack_set_default_queue( client
, buffer
);
1158 rxstack_send_return( client
->socket
, "2", NULL
, 0 );
1160 rxstack_send_return( client
->socket
, "0", (queuename
[rc
])->value
, (queuename
[rc
])->len
);
1161 DROPSTRENG( buffer
);
1163 case RXSTACK_EMPTY_QUEUE
:
1164 DEBUGDUMP(printf("--- Empty Queue ---\n"););
1165 rxstack_empty_queue( client
, buffer
);
1166 rxstack_send_return( client
->socket
, "0", NULL
, 0 );
1168 case RXSTACK_NUMBER_IN_QUEUE
:
1169 DEBUGDUMP(printf("--- Number in Queue ---\n"););
1170 length
= rxstack_number_in_queue( client
);
1171 rxstack_send_return( client
->socket
, "0", NULL
, length
);
1174 DEBUGDUMP(printf("--- Pull ---\n"););
1175 rc
= rxstack_get_line_off_queue( client
, &result
);
1178 case 0: /* all OK */
1179 rxstack_send_return( client
->socket
, "0", PSTRENGVAL( result
), PSTRENGLEN( result
) );
1180 DROPSTRENG( result
);
1181 rc
= rxstack_delete_line_off_queue( client
);
1185 rcode
[0] = (char)(rc
+'0');
1186 rxstack_send_return( client
->socket
, rcode
, NULL
, 0 );
1187 DROPSTRENG( result
);
1189 default: /* 3 - still waiting; don't return */
1193 case RXSTACK_GET_QUEUE
:
1194 DEBUGDUMP(printf("--- Get Queue ---\n"););
1195 rxstack_send_return( client
->socket
, "0", (queuename
[client
->default_queue
])->value
, (queuename
[client
->default_queue
])->len
);
1197 case RXSTACK_CREATE_QUEUE
:
1198 DEBUGDUMP(printf("--- Create Queue ---\n"););
1200 * Create a new queue
1202 rc
= rxstack_create_queue( client
, buffer
);
1204 rxstack_send_return( client
->socket
, "0", NULL
, 0 );
1207 rcode
[0] = (char)(rc
+'0');
1208 rxstack_send_return( client
->socket
, rcode
, (queuename
[client
->default_queue
])->value
, (queuename
[client
->default_queue
])->len
);
1210 DROPSTRENG( buffer
);
1212 case RXSTACK_DELETE_QUEUE
:
1213 DEBUGDUMP(printf("--- Delete Queue ---\n"););
1217 rc
= rxstack_delete_queue( client
, buffer
);
1218 rcode
[0] = (char)(rc
+'0');
1219 rxstack_send_return( client
->socket
, rcode
, NULL
, 0 );
1220 DROPSTRENG( buffer
);
1222 case RXSTACK_TIMEOUT_QUEUE
:
1223 DEBUGDUMP(printf("--- Timeout Queue ---\n"););
1225 * Set timeout for pull from queue
1227 rc
= rxstack_timeout_queue( client
, buffer
);
1228 rcode
[0] = (char)(rc
+'0');
1229 rxstack_send_return( client
->socket
, rcode
, NULL
, 0 );
1230 DROPSTRENG( buffer
);
1232 case RXSTACK_UNKNOWN
:
1236 rxstack_send_return( client
->socket
, "9", NULL
, 0 );
1243 void check_for_waiting( CLIENT
*client
, time_t inc
)
1245 streng
*result
=NULL
;
1251 * If the client is waiting indefinitely (client->queue_timeout == -1), then
1252 * simply skip the decrement of the timeout counter
1254 if ( client
->queue_timeout
!= -1 )
1256 if ( inc
>= client
->queue_timeout
)
1257 client
->queue_timeout
= 0;
1259 client
->queue_timeout
-= inc
;
1261 DEBUGDUMP(printf("Checking for wait. Remaining time: %ld Value decremented: %ld\n", client
->queue_timeout
, inc
););
1263 * Check if there is anything in the queue...
1265 rc
= rxstack_get_line_off_queue( client
, &result
);
1266 if ( rc
== 1 /* empty */
1267 && client
->queue_timeout
== 0 )
1273 case 0: /* all OK */
1274 rxstack_send_return( client
->socket
, "0", PSTRENGVAL( result
), PSTRENGLEN( result
) );
1275 DROPSTRENG( result
);
1276 rc
= rxstack_delete_line_off_queue( client
);
1280 case 4: /* timeout */
1281 rcode
[0] = (char)(rc
+'0');
1282 rxstack_send_return( client
->socket
, rcode
, NULL
, 0 );
1283 DROPSTRENG( result
);
1284 client
->queue_timeout
= 0;
1286 default: /* 3 - still waiting; don't return */
1293 int listen_sock
,i
,msgsock
;
1294 struct sockaddr_in server
,client
;
1296 int max_sock
,client_size
;
1300 #ifdef BUILD_NT_SERVICE
1303 #if defined(SO_REUSEADDR) && defined(SOL_SOCKET)
1307 base_secs
= time(NULL
);
1308 client_size
= sizeof( struct sockaddr
);
1310 if ( init_external_queue( NULL
) )
1314 #ifdef BUILD_NT_SERVICE
1316 && !report_service_pending_start() )
1320 * Set up signal handler
1323 signal( SIGTERM
, rxstack_signal_handler
);
1326 signal( SIGINT
, rxstack_signal_handler
);
1329 signal( SIGPIPE
, SIG_IGN
);
1331 memset( clients
, 0, sizeof(clients
) );
1332 memset( queuename
, 0, sizeof(queuename
) );
1335 * Initialise default "SESSION" queue
1337 queuename
[0] = MakeStreng( 7 );
1338 memcpy( queuename
[0]->value
, "SESSION", 7 ) ;
1341 #ifdef BUILD_NT_SERVICE
1343 && !report_service_pending_start() )
1347 * Create listener socket
1349 listen_sock
= socket(AF_INET
, SOCK_STREAM
, 0);
1350 if (listen_sock
< 0)
1352 showerror( ERR_EXTERNAL_QUEUE
, ERR_RXSTACK_GENERAL
, ERR_RXSTACK_GENERAL_TMPL
, "Listening on socket", strerror( errno
) );
1354 exit(ERR_RXSTACK_GENERAL
);
1356 memset( &server
, 0, sizeof(server
) );
1357 server
.sin_family
= AF_INET
;
1358 server
.sin_addr
.s_addr
= htonl(INADDR_ANY
);
1359 portno
= get_default_port_number();
1360 server
.sin_port
= htons((unsigned short) portno
);
1362 #ifdef BUILD_NT_SERVICE
1364 && !report_service_pending_start() )
1368 #if defined(SO_REUSEADDR) && defined(SOL_SOCKET)
1369 setsockopt( listen_sock
, SOL_SOCKET
, SO_REUSEADDR
, (void *) &on
, sizeof( on
) );
1371 if ( bind(listen_sock
, (struct sockaddr
*)&server
, sizeof(server
)) < 0)
1373 showerror( ERR_EXTERNAL_QUEUE
, ERR_RXSTACK_GENERAL
, ERR_RXSTACK_GENERAL_TMPL
, "Error binding socket", strerror( errno
) );
1375 exit( ERR_RXSTACK_GENERAL
);
1377 #ifdef BUILD_NT_SERVICE
1378 sprintf(buf
, "Listening on port: %d", portno
);
1381 if ( !report_service_start() )
1383 AddToMessageLog(TEXT(buf
));
1387 printf( "%s\n", buf
);
1391 printf( "rxstack listening on port: %d\n", portno
);
1395 * Start accepting connections
1397 listen(listen_sock
, 5);
1401 FD_SET(listen_sock
, &ready
);
1402 DEBUGDUMP(printf("FD_SET for %d\n", listen_sock
););
1403 max_sock
= listen_sock
;
1405 * For each connected client, allow its socket
1408 for ( i
= 0; i
< MAX_CLIENTS
; i
++ )
1411 if ( clients
[i
].socket
)
1413 DEBUGDUMP(printf("FD_SET for %d\n", clients
[i
].socket
););
1414 FD_SET( clients
[i
].socket
, &ready
);
1415 if ( clients
[i
].socket
> max_sock
)
1416 max_sock
= clients
[i
].socket
;
1419 to
.tv_usec
= 100000; /* 100000 is 100 milliseconds */
1422 DEBUGDUMP(printf("*********** before select() max_sock %d time: %ld\n", max_sock
, now
););
1423 if ( ( rc
= select(max_sock
+1, &ready
, (fd_set
*)0, (fd_set
*)0, &to
) ) < 0)
1425 if ( errno
!= EINTR
)
1426 showerror( ERR_EXTERNAL_QUEUE
, ERR_RXSTACK_GENERAL
, ERR_RXSTACK_GENERAL_TMPL
, "Calling select", strerror( errno
) );
1429 DEBUGDUMP(printf("*********** after select() rc %d time: %ld\n", rc
, getmsecs()););
1432 if ( FD_ISSET(listen_sock
, &ready
) )
1434 msgsock
= accept(listen_sock
, (struct sockaddr
*)&client
, (int *)&client_size
);
1437 showerror( ERR_EXTERNAL_QUEUE
, ERR_RXSTACK_GENERAL
, ERR_RXSTACK_GENERAL_TMPL
, "Calling listen", strerror( errno
) );
1439 exit( ERR_RXSTACK_GENERAL
);
1444 * A client has connected, create a client entry
1445 * and set their default queue to SESSION
1448 * Validate the client here...TBD
1449 * use details in client sockaddr struct
1451 DEBUGDUMP(printf("Client connecting from %s\n", inet_ntoa( client
.sin_addr
) ););
1452 rxstack_create_client( msgsock
);
1457 for ( i
= 0; i
< MAX_CLIENTS
; i
++ )
1459 if ( clients
[i
].socket
)
1461 if ( FD_ISSET( clients
[i
].socket
, &ready
) )
1464 * Process the client's command...
1466 rxstack_process_command(&clients
[i
]);
1473 * If select() timed out or received input, check all connected clients who
1474 * may be waiting for input on one of the queues.
1479 * The variable 'now' is set to the time elapsed between the start of the select() call and this point
1482 now
= getmsecs() - now
;
1483 for ( i
= 0; i
< MAX_CLIENTS
; i
++ )
1485 if ( clients
[i
].socket
1486 && clients
[i
].queue_timeout
)
1488 check_for_waiting( &clients
[i
], now
);
1493 #ifdef BUILD_NT_SERVICE
1499 int runNormal( int argc
, char **argv
)
1505 if ( strcmp(argv
[1], "-k") == 0 )
1509 else if ( strcmp(argv
[1], "-d") == 0 )
1511 #if defined(HAVE_FORK)
1512 if ( ( rc
= fork() ) != 0 )
1514 rc
= rxstack_doit();
1516 fprintf(stderr
,"-d option invalid on this platform\nUsage: %s [-d|-k]\n", argv
[0] );
1522 fprintf(stderr
,"Usage: %s [-d|-k]\n", argv
[0] );
1528 rc
= rxstack_doit();
1531 printf( "%s terminated.\n", argv
[0] );
1536 #ifdef BUILD_NT_SERVICE
1537 VOID
ServiceStart(DWORD argc
, LPTSTR
*argv
)
1539 int main(int argc
, char *argv
[])
1542 #ifdef BUILD_NT_SERVICE
1545 if ( !nt_service_start() )
1554 runNormal(argc
, argv
);
1558 return runNormal( argc
, argv
);