1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
9 * [1] lth. The call to Sleep() is a hack to get the test case to run
10 * on Windows 95. Without it, the test case fails with an error
11 * WSAECONNRESET following a recv() call. The error is caused by the
12 * server side thread termination without a shutdown() or closesocket()
13 * call. Windows documentation suggests that this is predicted
14 * behavior; that other platforms get away with it is ... serendipity.
15 * The test case should shutdown() or closesocket() before
16 * thread termination. I didn't have time to figure out where or how
17 * to do it. The Sleep() call inserts enough delay to allow the
18 * client side to recv() all his data before the server side thread
19 * terminates. Whew! ...
21 ** Modification History:
22 * 14-May-97 AGarcia- Converted the test to accommodate the debug_mode flag.
23 * The debug mode will print all of the printfs associated with this test.
24 * The regress mode will be the default mode. Since the regress tool limits
25 * the output to a one line status: PASS or FAIL, all of the printf statements
26 * have been handled with an if (debug_mode) statement.
58 ** This is the beginning of the test
/*
** Compile-time defaults for the client/server test.
*/
#define DEFAULT_HIGH 0
#define BUFFER_SIZE 1024            /* size of the transfer buffers */
#define DEFAULT_BACKLOG 5           /* listen() backlog */

/* Signed offsets that are textually appended to the base port number. */
#define PORT_INC_DO +100
#define PORT_INC_3264 +200

/*
** Base port plus both offsets.  The expansion is parenthesized so that it
** acts as a single operand no matter which operators surround its use
** (without the parentheses, "2 * DEFAULT_PORT" would only double the base).
*/
#define DEFAULT_PORT (12849 PORT_INC_DO PORT_INC_3264)

#define DEFAULT_CLIENTS 1
#define ALLOWED_IN_ACCEPT 1
#define DEFAULT_CLIPPING 1000       /* modulus applied to rand() by the client */
#define DEFAULT_WORKERS_MIN 1
#define DEFAULT_WORKERS_MAX 1
#define DEFAULT_SERVER "localhost"
#define DEFAULT_EXECUTION_TIME 10   /* seconds */
#define DEFAULT_CLIENT_TIMEOUT 4000 /* milliseconds */
#define DEFAULT_SERVER_TIMEOUT 4000 /* milliseconds */
#define DEFAULT_SERVER_PRIORITY PR_PRIORITY_HIGH
/*
** Life-cycle states kept in the "state" field of the server and client
** control blocks.  main() stores cs_init when it creates a control block,
** the thread stores cs_run once it has started, main() stores cs_stop to
** request shutdown, and the thread stores cs_exit just before it finishes.
*/
typedef enum CSState_e
{
    cs_init,    /* control block created, thread not yet running */
    cs_run,     /* thread is running */
    cs_stop,    /* thread has been asked to stop */
    cs_exit     /* thread has finished */
} CSState_t;
94 static void PR_CALLBACK
Worker(void *arg
);
/*
** Forward type names for the worker pool, a single worker and the server,
** so the structures can point at one another (a worker carries a
** CSServer_t back pointer, the server embeds a CSPool_t).
*/
typedef struct CSPool_s CSPool_t;
typedef struct CSWorker_s CSWorker_t;
typedef struct CSServer_s CSServer_t;
98 typedef enum Verbosity
109 static PRInt32 domain
= AF_INET
;
110 static PRInt32 protocol
= 6; /* TCP */
111 static PRFileDesc
*debug_out
= NULL
;
112 static PRBool debug_mode
= PR_FALSE
;
113 static PRBool pthread_stats
= PR_FALSE
;
114 static Verbosity verbosity
= TEST_LOG_ALWAYS
;
115 static PRThreadScope thread_scope
= PR_LOCAL_THREAD
;
119 PRCList element
; /* list of the server's workers */
121 PRThread
*thread
; /* this worker objects thread */
122 CSServer_t
*server
; /* back pointer to server structure */
128 PRCondVar
*acceptComplete
;
129 PRUint32 accepting
, active
, workers
;
134 PRCList list
; /* head of worker list */
137 PRThread
*thread
; /* the main server thread */
138 PRCondVar
*stateChange
;
140 PRUint16 port
; /* port we're listening on */
141 PRUint32 backlog
; /* size of our listener backlog */
142 PRFileDesc
*listener
; /* the fd accepting connections */
144 CSPool_t pool
; /* statistics on worker threads */
145 CSState_t state
; /* the server's state */
146 struct /* controlling worker counts */
148 PRUint32 minimum
, maximum
, accepting
;
152 PRIntervalTime started
, stopped
;
153 PRUint32 operations
, bytesTransferred
;
156 typedef struct CSDescriptor_s
158 PRInt32 size
; /* size of transfer */
159 char filename
[60]; /* filename, null padded */
162 typedef struct CSClient_s
166 PRCondVar
*stateChange
;
167 PRNetAddr serverAddress
;
172 PRIntervalTime started
, stopped
;
173 PRUint32 operations
, bytesTransferred
;
176 #define TEST_LOG(l, p, a) \
178 if (debug_mode || (p <= verbosity)) printf a; \
181 PRLogModuleInfo
*cltsrv_log_file
= NULL
;
/*
** Assertion helpers.  Both are expressions of type void: the argument is
** evaluated once and, when it is false, _MY_Assert() is called with the
** stringized expression, the source file name and the line number.
*/
#define MY_ASSERT(_expr) \
    ((_expr) ? ((void)0) : _MY_Assert(#_expr, __FILE__, __LINE__))

#define TEST_ASSERT(_expr) \
    ((_expr) ? ((void)0) : _MY_Assert(#_expr, __FILE__, __LINE__))
189 static void _MY_Assert(const char *s
, const char *file
, PRIntn ln
)
192 PR_Assert(s
, file
, ln
);
195 static PRBool
Aborted(PRStatus rv
)
197 return ((PR_FAILURE
== rv
) && (PR_PENDING_INTERRUPT_ERROR
== PR_GetError())) ?
201 static void TimeOfDayMessage(const char *msg
, PRThread
* me
)
205 PR_ExplodeTime(PR_Now(), PR_LocalTimeParameters
, &tod
);
206 (void)PR_FormatTime(buffer
, sizeof(buffer
), "%H:%M:%S", &tod
);
209 cltsrv_log_file
, TEST_LOG_ALWAYS
,
210 ("%s(0x%p): %s\n", msg
, me
, buffer
));
211 } /* TimeOfDayMessage */
214 static void PR_CALLBACK
Client(void *arg
)
219 PRFileDesc
*fd
= NULL
;
220 PRUintn clipping
= DEFAULT_CLIPPING
;
221 PRThread
*me
= PR_GetCurrentThread();
222 CSClient_t
*client
= (CSClient_t
*)arg
;
223 CSDescriptor_t
*descriptor
= PR_NEW(CSDescriptor_t
);
224 PRIntervalTime timeout
= PR_MillisecondsToInterval(DEFAULT_CLIENT_TIMEOUT
);
227 for (index
= 0; index
< sizeof(buffer
); ++index
) {
228 buffer
[index
] = (char)index
;
231 client
->started
= PR_IntervalNow();
234 client
->state
= cs_run
;
235 PR_NotifyCondVar(client
->stateChange
);
236 PR_Unlock(client
->ml
);
238 TimeOfDayMessage("Client started at", me
);
240 while (cs_run
== client
->state
)
242 PRInt32 bytes
, descbytes
, filebytes
, netbytes
;
244 (void)PR_NetAddrToString(&client
->serverAddress
, buffer
, sizeof(buffer
));
245 TEST_LOG(cltsrv_log_file
, TEST_LOG_INFO
,
246 ("\tClient(0x%p): connecting to server at %s\n", me
, buffer
));
248 fd
= PR_Socket(domain
, SOCK_STREAM
, protocol
);
249 TEST_ASSERT(NULL
!= fd
);
250 rv
= PR_Connect(fd
, &client
->serverAddress
, timeout
);
251 if (PR_FAILURE
== rv
)
254 cltsrv_log_file
, TEST_LOG_ERROR
,
255 ("\tClient(0x%p): conection failed (%d, %d)\n",
256 me
, PR_GetError(), PR_GetOSError()));
260 memset(descriptor
, 0, sizeof(*descriptor
));
261 descriptor
->size
= PR_htonl(descbytes
= rand() % clipping
);
263 descriptor
->filename
, sizeof(descriptor
->filename
),
264 "CS%p%p-%p.dat", client
->started
, me
, client
->operations
);
266 cltsrv_log_file
, TEST_LOG_VERBOSE
,
267 ("\tClient(0x%p): sending descriptor for %u bytes\n", me
, descbytes
));
269 fd
, descriptor
, sizeof(*descriptor
), SEND_FLAGS
, timeout
);
270 if (sizeof(CSDescriptor_t
) != bytes
)
272 if (Aborted(PR_FAILURE
)) {
275 if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
278 cltsrv_log_file
, TEST_LOG_ERROR
,
279 ("\tClient(0x%p): send descriptor timeout\n", me
));
283 TEST_ASSERT(sizeof(*descriptor
) == bytes
);
286 while (netbytes
< descbytes
)
288 filebytes
= sizeof(buffer
);
289 if ((descbytes
- netbytes
) < filebytes
) {
290 filebytes
= descbytes
- netbytes
;
293 cltsrv_log_file
, TEST_LOG_VERBOSE
,
294 ("\tClient(0x%p): sending %d bytes\n", me
, filebytes
));
295 bytes
= PR_Send(fd
, buffer
, filebytes
, SEND_FLAGS
, timeout
);
296 if (filebytes
!= bytes
)
298 if (Aborted(PR_FAILURE
)) {
301 if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
304 cltsrv_log_file
, TEST_LOG_ERROR
,
305 ("\tClient(0x%p): send data timeout\n", me
));
309 TEST_ASSERT(bytes
== filebytes
);
313 while (filebytes
< descbytes
)
315 netbytes
= sizeof(buffer
);
316 if ((descbytes
- filebytes
) < netbytes
) {
317 netbytes
= descbytes
- filebytes
;
320 cltsrv_log_file
, TEST_LOG_VERBOSE
,
321 ("\tClient(0x%p): receiving %d bytes\n", me
, netbytes
));
322 bytes
= PR_Recv(fd
, buffer
, netbytes
, RECV_FLAGS
, timeout
);
325 if (Aborted(PR_FAILURE
))
328 cltsrv_log_file
, TEST_LOG_ERROR
,
329 ("\tClient(0x%p): receive data aborted\n", me
));
332 else if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
334 cltsrv_log_file
, TEST_LOG_ERROR
,
335 ("\tClient(0x%p): receive data timeout\n", me
));
338 cltsrv_log_file
, TEST_LOG_ERROR
,
339 ("\tClient(0x%p): receive error (%d, %d)\n",
340 me
, PR_GetError(), PR_GetOSError()));
346 cltsrv_log_file
, TEST_LOG_ERROR
,
347 ("\t\tClient(0x%p): unexpected end of stream\n",
348 PR_GetCurrentThread()));
354 rv
= PR_Shutdown(fd
, PR_SHUTDOWN_BOTH
);
358 TEST_ASSERT(PR_SUCCESS
== rv
);
360 (void)PR_Close(fd
); fd
= NULL
;
362 cltsrv_log_file
, TEST_LOG_INFO
,
363 ("\tClient(0x%p): disconnected from server\n", me
));
366 client
->operations
+= 1;
367 client
->bytesTransferred
+= 2 * descbytes
;
368 rv
= PR_WaitCondVar(client
->stateChange
, rand() % clipping
);
369 PR_Unlock(client
->ml
);
376 client
->stopped
= PR_IntervalNow();
384 client
->state
= cs_exit
;
385 PR_NotifyCondVar(client
->stateChange
);
386 PR_Unlock(client
->ml
);
387 PR_DELETE(descriptor
);
389 cltsrv_log_file
, TEST_LOG_ALWAYS
,
390 ("\tClient(0x%p): stopped after %u operations and %u bytes\n",
391 PR_GetCurrentThread(), client
->operations
, client
->bytesTransferred
));
395 static PRStatus
ProcessRequest(PRFileDesc
*fd
, CSServer_t
*server
)
399 PRFileDesc
*file
= NULL
;
400 PRThread
* me
= PR_GetCurrentThread();
401 PRInt32 bytes
, descbytes
, netbytes
, filebytes
= 0;
402 CSDescriptor_t
*descriptor
= PR_NEW(CSDescriptor_t
);
403 PRIntervalTime timeout
= PR_MillisecondsToInterval(DEFAULT_SERVER_TIMEOUT
);
406 cltsrv_log_file
, TEST_LOG_VERBOSE
,
407 ("\tProcessRequest(0x%p): receiving desciptor\n", me
));
409 fd
, descriptor
, sizeof(*descriptor
), RECV_FLAGS
, timeout
);
416 if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
419 cltsrv_log_file
, TEST_LOG_ERROR
,
420 ("\tProcessRequest(0x%p): receive timeout\n", me
));
428 cltsrv_log_file
, TEST_LOG_ERROR
,
429 ("\tProcessRequest(0x%p): unexpected end of file\n", me
));
432 descbytes
= PR_ntohl(descriptor
->size
);
433 TEST_ASSERT(sizeof(*descriptor
) == bytes
);
436 cltsrv_log_file
, TEST_LOG_VERBOSE
,
437 ("\t\tProcessRequest(0x%p): read descriptor {%d, %s}\n",
438 me
, descbytes
, descriptor
->filename
));
441 descriptor
->filename
, (PR_CREATE_FILE
| PR_WRONLY
), 0666);
448 if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
451 cltsrv_log_file
, TEST_LOG_ERROR
,
452 ("\tProcessRequest(0x%p): open file timeout\n", me
));
456 TEST_ASSERT(NULL
!= file
);
459 while (filebytes
< descbytes
)
461 netbytes
= sizeof(buffer
);
462 if ((descbytes
- filebytes
) < netbytes
) {
463 netbytes
= descbytes
- filebytes
;
466 cltsrv_log_file
, TEST_LOG_VERBOSE
,
467 ("\tProcessRequest(0x%p): receive %d bytes\n", me
, netbytes
));
468 bytes
= PR_Recv(fd
, buffer
, netbytes
, RECV_FLAGS
, timeout
);
475 if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
478 cltsrv_log_file
, TEST_LOG_ERROR
,
479 ("\t\tProcessRequest(0x%p): receive data timeout\n", me
));
483 * XXX: I got (PR_CONNECT_RESET_ERROR, ERROR_NETNAME_DELETED)
484 * on NT here. This is equivalent to ECONNRESET on Unix.
488 cltsrv_log_file
, TEST_LOG_WARNING
,
489 ("\t\tProcessRequest(0x%p): unexpected error (%d, %d)\n",
490 me
, PR_GetError(), PR_GetOSError()));
496 cltsrv_log_file
, TEST_LOG_WARNING
,
497 ("\t\tProcessRequest(0x%p): unexpected end of stream\n", me
));
503 /* The byte count for PR_Write should be positive */
504 MY_ASSERT(netbytes
> 0);
506 cltsrv_log_file
, TEST_LOG_VERBOSE
,
507 ("\tProcessRequest(0x%p): write %d bytes to file\n", me
, netbytes
));
508 bytes
= PR_Write(file
, buffer
, netbytes
);
509 if (netbytes
!= bytes
)
515 if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
518 cltsrv_log_file
, TEST_LOG_ERROR
,
519 ("\t\tProcessRequest(0x%p): write file timeout\n", me
));
523 TEST_ASSERT(bytes
> 0);
527 server
->operations
+= 1;
528 server
->bytesTransferred
+= filebytes
;
529 PR_Unlock(server
->ml
);
535 TEST_ASSERT(PR_SUCCESS
== rv
);
539 cltsrv_log_file
, TEST_LOG_VERBOSE
,
540 ("\t\tProcessRequest(0x%p): opening %s\n", me
, descriptor
->filename
));
541 file
= PR_Open(descriptor
->filename
, PR_RDONLY
, 0);
548 if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
551 cltsrv_log_file
, TEST_LOG_ERROR
,
552 ("\t\tProcessRequest(0x%p): open file timeout\n",
553 PR_GetCurrentThread()));
557 cltsrv_log_file
, TEST_LOG_ERROR
,
558 ("\t\tProcessRequest(0x%p): other file open error (%u, %u)\n",
559 me
, PR_GetError(), PR_GetOSError()));
562 TEST_ASSERT(NULL
!= file
);
565 while (netbytes
< descbytes
)
567 filebytes
= sizeof(buffer
);
568 if ((descbytes
- netbytes
) < filebytes
) {
569 filebytes
= descbytes
- netbytes
;
572 cltsrv_log_file
, TEST_LOG_VERBOSE
,
573 ("\tProcessRequest(0x%p): read %d bytes from file\n", me
, filebytes
));
574 bytes
= PR_Read(file
, buffer
, filebytes
);
575 if (filebytes
!= bytes
)
581 if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
583 cltsrv_log_file
, TEST_LOG_ERROR
,
584 ("\t\tProcessRequest(0x%p): read file timeout\n", me
));
587 cltsrv_log_file
, TEST_LOG_ERROR
,
588 ("\t\tProcessRequest(0x%p): other file error (%d, %d)\n",
589 me
, PR_GetError(), PR_GetOSError()));
592 TEST_ASSERT(bytes
> 0);
596 cltsrv_log_file
, TEST_LOG_VERBOSE
,
597 ("\t\tProcessRequest(0x%p): sending %d bytes\n", me
, filebytes
));
598 bytes
= PR_Send(fd
, buffer
, filebytes
, SEND_FLAGS
, timeout
);
599 if (filebytes
!= bytes
)
605 if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
608 cltsrv_log_file
, TEST_LOG_ERROR
,
609 ("\t\tProcessRequest(0x%p): send data timeout\n", me
));
614 TEST_ASSERT(bytes
> 0);
618 server
->bytesTransferred
+= filebytes
;
619 PR_Unlock(server
->ml
);
621 rv
= PR_Shutdown(fd
, PR_SHUTDOWN_BOTH
);
630 TEST_ASSERT(PR_SUCCESS
== rv
);
638 drv
= PR_Delete(descriptor
->filename
);
639 TEST_ASSERT(PR_SUCCESS
== drv
);
642 cltsrv_log_file
, TEST_LOG_VERBOSE
,
643 ("\t\tProcessRequest(0x%p): Finished\n", me
));
645 PR_DELETE(descriptor
);
648 PR_Sleep(PR_MillisecondsToInterval(200)); /* lth. see note [1] */
651 } /* ProcessRequest */
653 static PRStatus
CreateWorker(CSServer_t
*server
, CSPool_t
*pool
)
655 CSWorker_t
*worker
= PR_NEWZAP(CSWorker_t
);
656 worker
->server
= server
;
657 PR_INIT_CLIST(&worker
->element
);
658 worker
->thread
= PR_CreateThread(
659 PR_USER_THREAD
, Worker
, worker
,
660 DEFAULT_SERVER_PRIORITY
, thread_scope
,
661 PR_UNJOINABLE_THREAD
, 0);
662 if (NULL
== worker
->thread
)
668 TEST_LOG(cltsrv_log_file
, TEST_LOG_STATUS
,
669 ("\tCreateWorker(0x%p): create new worker (0x%p)\n",
670 PR_GetCurrentThread(), worker
->thread
));
675 static void PR_CALLBACK
Worker(void *arg
)
679 PRFileDesc
*fd
= NULL
;
680 PRThread
*me
= PR_GetCurrentThread();
681 CSWorker_t
*worker
= (CSWorker_t
*)arg
;
682 CSServer_t
*server
= worker
->server
;
683 CSPool_t
*pool
= &server
->pool
;
686 cltsrv_log_file
, TEST_LOG_NOTICE
,
687 ("\t\tWorker(0x%p): started [%u]\n", me
, pool
->workers
+ 1));
690 PR_APPEND_LINK(&worker
->element
, &server
->list
);
691 pool
->workers
+= 1; /* define our existance */
693 while (cs_run
== server
->state
)
695 while (pool
->accepting
>= server
->workers
.accepting
)
698 cltsrv_log_file
, TEST_LOG_VERBOSE
,
699 ("\t\tWorker(0x%p): waiting for accept slot[%d]\n",
700 me
, pool
->accepting
));
701 rv
= PR_WaitCondVar(pool
->acceptComplete
, PR_INTERVAL_NO_TIMEOUT
);
702 if (Aborted(rv
) || (cs_run
!= server
->state
))
705 cltsrv_log_file
, TEST_LOG_NOTICE
,
706 ("\tWorker(0x%p): has been %s\n",
707 me
, (Aborted(rv
) ? "interrupted" : "stopped")));
711 pool
->accepting
+= 1; /* how many are really in accept */
712 PR_Unlock(server
->ml
);
715 cltsrv_log_file
, TEST_LOG_VERBOSE
,
716 ("\t\tWorker(0x%p): calling accept\n", me
));
717 fd
= PR_Accept(server
->listener
, &from
, PR_INTERVAL_NO_TIMEOUT
);
720 pool
->accepting
-= 1;
721 PR_NotifyCondVar(pool
->acceptComplete
);
723 if ((NULL
== fd
) && Aborted(PR_FAILURE
))
725 if (NULL
!= server
->listener
)
727 PR_Close(server
->listener
);
728 server
->listener
= NULL
;
736 ** Create another worker if the total number of workers is
737 ** less than the minimum specified or we have none left in
738 ** accept() AND we're not over the maximum.
739 ** This sort of presumes that the number allowed in accept
740 ** is at least as many as the minimum. Otherwise we'll keep
741 ** creating new threads and deleting them soon after.
744 ((pool
->workers
< server
->workers
.minimum
) ||
745 ((0 == pool
->accepting
)
746 && (pool
->workers
< server
->workers
.maximum
))) ?
749 PR_Unlock(server
->ml
);
752 (void)CreateWorker(server
, pool
);
755 rv
= ProcessRequest(fd
, server
);
756 if (PR_SUCCESS
!= rv
)
758 cltsrv_log_file
, TEST_LOG_ERROR
,
759 ("\t\tWorker(0x%p): server process ended abnormally\n", me
));
760 (void)PR_Close(fd
); fd
= NULL
;
769 PR_Unlock(server
->ml
);
773 (void)PR_Shutdown(fd
, PR_SHUTDOWN_BOTH
);
778 cltsrv_log_file
, TEST_LOG_NOTICE
,
779 ("\t\tWorker(0x%p): exiting [%u]\n", PR_GetCurrentThread(), pool
->workers
));
782 pool
->workers
-= 1; /* undefine our existance */
783 PR_REMOVE_AND_INIT_LINK(&worker
->element
);
784 PR_NotifyCondVar(pool
->exiting
);
785 PR_Unlock(server
->ml
);
787 PR_DELETE(worker
); /* destruction of the "worker" object */
791 static void PR_CALLBACK
Server(void *arg
)
794 PRNetAddr serverAddress
;
795 PRThread
*me
= PR_GetCurrentThread();
796 CSServer_t
*server
= (CSServer_t
*)arg
;
797 PRSocketOptionData sockOpt
;
799 server
->listener
= PR_Socket(domain
, SOCK_STREAM
, protocol
);
801 sockOpt
.option
= PR_SockOpt_Reuseaddr
;
802 sockOpt
.value
.reuse_addr
= PR_TRUE
;
803 rv
= PR_SetSocketOption(server
->listener
, &sockOpt
);
804 TEST_ASSERT(PR_SUCCESS
== rv
);
806 memset(&serverAddress
, 0, sizeof(serverAddress
));
807 if (PR_AF_INET6
!= domain
) {
808 TEST_LOG(cltsrv_log_file
, TEST_LOG_ALWAYS
,
809 ("server binding to ip port %s\n", DEFAULT_PORT
));
810 rv
= PR_InitializeNetAddr(PR_IpAddrAny
, DEFAULT_PORT
, &serverAddress
);
813 TEST_LOG(cltsrv_log_file
, TEST_LOG_ALWAYS
,
814 ("server binding to ipv6 port %s\n", DEFAULT_PORT
));
815 rv
= PR_SetNetAddr(PR_IpAddrAny
, PR_AF_INET6
, DEFAULT_PORT
,
818 rv
= PR_Bind(server
->listener
, &serverAddress
);
819 TEST_ASSERT(PR_SUCCESS
== rv
);
821 rv
= PR_Listen(server
->listener
, server
->backlog
);
822 TEST_ASSERT(PR_SUCCESS
== rv
);
824 server
->started
= PR_IntervalNow();
825 TimeOfDayMessage("Server started at", me
);
828 server
->state
= cs_run
;
829 PR_NotifyCondVar(server
->stateChange
);
830 PR_Unlock(server
->ml
);
833 ** Create the first worker (actually, a thread that accepts
834 ** connections and then processes the work load as needed).
835 ** From this point on, additional worker threads are created
836 ** as they are needed by existing worker threads.
838 rv
= CreateWorker(server
, &server
->pool
);
839 TEST_ASSERT(PR_SUCCESS
== rv
);
842 ** From here on this thread is merely hanging around as the contact
843 ** point for the main test driver. It's just waiting for the driver
844 ** to declare the test complete.
847 cltsrv_log_file
, TEST_LOG_VERBOSE
,
848 ("\tServer(0x%p): waiting for state change\n", me
));
851 while ((cs_run
== server
->state
) && !Aborted(rv
))
853 rv
= PR_WaitCondVar(server
->stateChange
, PR_INTERVAL_NO_TIMEOUT
);
855 PR_Unlock(server
->ml
);
859 cltsrv_log_file
, TEST_LOG_INFO
,
860 ("\tServer(0x%p): shutting down workers\n", me
));
863 ** Get all the worker threads to exit. They know how to
864 ** clean up after themselves, so this is just a matter of
865 ** waiting for chlorine in the pool to take effect. During
866 ** this stage we're ignoring interrupts.
868 server
->workers
.minimum
= server
->workers
.maximum
= 0;
871 while (!PR_CLIST_IS_EMPTY(&server
->list
))
873 PRCList
*head
= PR_LIST_HEAD(&server
->list
);
874 CSWorker_t
*worker
= (CSWorker_t
*)head
;
876 cltsrv_log_file
, TEST_LOG_VERBOSE
,
877 ("\tServer(0x%p): interrupting worker(0x%p)\n", me
, worker
));
878 rv
= PR_Interrupt(worker
->thread
);
879 TEST_ASSERT(PR_SUCCESS
== rv
);
880 PR_REMOVE_AND_INIT_LINK(head
);
883 while (server
->pool
.workers
> 0)
886 cltsrv_log_file
, TEST_LOG_NOTICE
,
887 ("\tServer(0x%p): waiting for %u workers to exit\n",
888 me
, server
->pool
.workers
));
889 (void)PR_WaitCondVar(server
->pool
.exiting
, PR_INTERVAL_NO_TIMEOUT
);
892 server
->state
= cs_exit
;
893 PR_NotifyCondVar(server
->stateChange
);
894 PR_Unlock(server
->ml
);
897 cltsrv_log_file
, TEST_LOG_ALWAYS
,
898 ("\tServer(0x%p): stopped after %u operations and %u bytes\n",
899 me
, server
->operations
, server
->bytesTransferred
));
901 if (NULL
!= server
->listener
) {
902 PR_Close(server
->listener
);
904 server
->stopped
= PR_IntervalNow();
908 static void WaitForCompletion(PRIntn execution
)
910 while (execution
> 0)
912 PRIntn dally
= (execution
> 30) ? 30 : execution
;
913 PR_Sleep(PR_SecondsToInterval(dally
));
915 PT_FPrintStats(debug_out
, "\nPThread Statistics\n");
919 } /* WaitForCompletion */
921 static void Help(void)
923 PR_fprintf(debug_out
, "cltsrv test program usage:\n");
924 PR_fprintf(debug_out
, "\t-a <n> threads allowed in accept (5)\n");
925 PR_fprintf(debug_out
, "\t-b <n> backlock for listen (5)\n");
926 PR_fprintf(debug_out
, "\t-c <threads> number of clients to create (1)\n");
927 PR_fprintf(debug_out
, "\t-f <low> low water mark for fd caching (0)\n");
928 PR_fprintf(debug_out
, "\t-F <high> high water mark for fd caching (0)\n");
929 PR_fprintf(debug_out
, "\t-w <threads> minimal number of server threads (1)\n");
930 PR_fprintf(debug_out
, "\t-W <threads> maximum number of server threads (1)\n");
931 PR_fprintf(debug_out
, "\t-e <seconds> duration of the test in seconds (10)\n");
932 PR_fprintf(debug_out
, "\t-s <string> dsn name of server (localhost)\n");
933 PR_fprintf(debug_out
, "\t-G use GLOBAL threads (LOCAL)\n");
934 PR_fprintf(debug_out
, "\t-X use XTP as transport (TCP)\n");
935 PR_fprintf(debug_out
, "\t-6 Use IPv6 (IPv4)\n");
936 PR_fprintf(debug_out
, "\t-v verbosity (accumulative) (0)\n");
937 PR_fprintf(debug_out
, "\t-p pthread statistics (FALSE)\n");
938 PR_fprintf(debug_out
, "\t-d debug mode (FALSE)\n");
939 PR_fprintf(debug_out
, "\t-h this message\n");
942 static Verbosity
IncrementVerbosity(void)
944 PRIntn verboge
= (PRIntn
)verbosity
+ 1;
945 return (Verbosity
)verboge
;
946 } /* IncrementVerbosity */
948 int main(int argc
, char** argv
)
953 PRStatus rv
, joinStatus
;
954 CSServer_t
*server
= NULL
;
956 PRUintn backlog
= DEFAULT_BACKLOG
;
957 PRUintn clients
= DEFAULT_CLIENTS
;
958 const char *serverName
= DEFAULT_SERVER
;
959 PRBool serverIsLocal
= PR_TRUE
;
960 PRUintn accepting
= ALLOWED_IN_ACCEPT
;
961 PRUintn workersMin
= DEFAULT_WORKERS_MIN
;
962 PRUintn workersMax
= DEFAULT_WORKERS_MAX
;
963 PRIntn execution
= DEFAULT_EXECUTION_TIME
;
964 PRIntn low
= DEFAULT_LOW
, high
= DEFAULT_HIGH
;
967 * -G use global threads
968 * -a <n> threads allowed in accept
969 * -b <n> backlog for listen
970 * -c <threads> number of clients to create
971 * -f <low> low water mark for caching FDs
972 * -F <high> high water mark for caching FDs
973 * -w <threads> minimal number of server threads
974 * -W <threads> maximum number of server threads
975 * -e <seconds> duration of the test in seconds
976 * -s <string> dns name of server (implies no server here)
981 PLOptState
*opt
= PL_CreateOptState(argc
, argv
, "GX6b:a:c:f:F:w:W:e:s:vdhp");
983 debug_out
= PR_GetSpecialFD(PR_StandardError
);
985 while (PL_OPT_EOL
!= (os
= PL_GetNextOpt(opt
)))
987 if (PL_OPT_BAD
== os
) {
992 case 'G': /* use global threads */
993 thread_scope
= PR_GLOBAL_THREAD
;
995 case 'X': /* use XTP as transport */
998 case '6': /* Use IPv6 */
999 domain
= PR_AF_INET6
;
1001 case 'a': /* the value for accepting */
1002 accepting
= atoi(opt
->value
);
1004 case 'b': /* the value for backlock */
1005 backlog
= atoi(opt
->value
);
1007 case 'c': /* number of client threads */
1008 clients
= atoi(opt
->value
);
1010 case 'f': /* low water fd cache */
1011 low
= atoi(opt
->value
);
1013 case 'F': /* low water fd cache */
1014 high
= atoi(opt
->value
);
1016 case 'w': /* minimum server worker threads */
1017 workersMin
= atoi(opt
->value
);
1019 case 'W': /* maximum server worker threads */
1020 workersMax
= atoi(opt
->value
);
1022 case 'e': /* program execution time in seconds */
1023 execution
= atoi(opt
->value
);
1025 case 's': /* server's address */
1026 serverName
= opt
->value
;
1028 case 'v': /* verbosity */
1029 verbosity
= IncrementVerbosity();
1031 case 'd': /* debug mode */
1032 debug_mode
= PR_TRUE
;
1034 case 'p': /* pthread mode */
1035 pthread_stats
= PR_TRUE
;
1043 PL_DestroyOptState(opt
);
1045 if (0 != PL_strcmp(serverName
, DEFAULT_SERVER
)) {
1046 serverIsLocal
= PR_FALSE
;
1048 if (0 == execution
) {
1049 execution
= DEFAULT_EXECUTION_TIME
;
1051 if (0 == workersMax
) {
1052 workersMax
= DEFAULT_WORKERS_MAX
;
1054 if (0 == workersMin
) {
1055 workersMin
= DEFAULT_WORKERS_MIN
;
1057 if (0 == accepting
) {
1058 accepting
= ALLOWED_IN_ACCEPT
;
1061 backlog
= DEFAULT_BACKLOG
;
1064 if (workersMin
> accepting
) {
1065 accepting
= workersMin
;
1069 TimeOfDayMessage("Client/Server started at", PR_GetCurrentThread());
1071 cltsrv_log_file
= PR_NewLogModule("cltsrv_log");
1072 MY_ASSERT(NULL
!= cltsrv_log_file
);
1073 boolean
= PR_SetLogFile("cltsrv.log");
1076 rv
= PR_SetFDCacheSize(low
, high
);
1077 PR_ASSERT(PR_SUCCESS
== rv
);
1081 /* Establish the server */
1083 cltsrv_log_file
, TEST_LOG_INFO
,
1084 ("main(0x%p): starting server\n", PR_GetCurrentThread()));
1086 server
= PR_NEWZAP(CSServer_t
);
1087 PR_INIT_CLIST(&server
->list
);
1088 server
->state
= cs_init
;
1089 server
->ml
= PR_NewLock();
1090 server
->backlog
= backlog
;
1091 server
->port
= DEFAULT_PORT
;
1092 server
->workers
.minimum
= workersMin
;
1093 server
->workers
.maximum
= workersMax
;
1094 server
->workers
.accepting
= accepting
;
1095 server
->stateChange
= PR_NewCondVar(server
->ml
);
1096 server
->pool
.exiting
= PR_NewCondVar(server
->ml
);
1097 server
->pool
.acceptComplete
= PR_NewCondVar(server
->ml
);
1100 cltsrv_log_file
, TEST_LOG_NOTICE
,
1101 ("main(0x%p): creating server thread\n", PR_GetCurrentThread()));
1103 server
->thread
= PR_CreateThread(
1104 PR_USER_THREAD
, Server
, server
, PR_PRIORITY_HIGH
,
1105 thread_scope
, PR_JOINABLE_THREAD
, 0);
1106 TEST_ASSERT(NULL
!= server
->thread
);
1109 cltsrv_log_file
, TEST_LOG_VERBOSE
,
1110 ("main(0x%p): waiting for server init\n", PR_GetCurrentThread()));
1112 PR_Lock(server
->ml
);
1113 while (server
->state
== cs_init
) {
1114 PR_WaitCondVar(server
->stateChange
, PR_INTERVAL_NO_TIMEOUT
);
1116 PR_Unlock(server
->ml
);
1119 cltsrv_log_file
, TEST_LOG_VERBOSE
,
1120 ("main(0x%p): server init complete (port #%d)\n",
1121 PR_GetCurrentThread(), server
->port
));
1126 /* Create all of the clients */
1128 char buffer
[BUFFER_SIZE
];
1129 client
= (CSClient_t
*)PR_CALLOC(clients
* sizeof(CSClient_t
));
1132 cltsrv_log_file
, TEST_LOG_VERBOSE
,
1133 ("main(0x%p): creating %d client threads\n",
1134 PR_GetCurrentThread(), clients
));
1138 rv
= PR_GetHostByName(serverName
, buffer
, BUFFER_SIZE
, &host
);
1139 if (PR_SUCCESS
!= rv
)
1141 PL_FPrintError(PR_STDERR
, "PR_GetHostByName");
1146 for (index
= 0; index
< clients
; ++index
)
1148 client
[index
].state
= cs_init
;
1149 client
[index
].ml
= PR_NewLock();
1152 if (PR_AF_INET6
!= domain
) {
1153 TEST_LOG(cltsrv_log_file
, TEST_LOG_ALWAYS
,
1154 ("loopback client ip port %s\n", DEFAULT_PORT
));
1155 (void)PR_InitializeNetAddr(
1156 PR_IpAddrLoopback
, DEFAULT_PORT
,
1157 &client
[index
].serverAddress
);
1160 TEST_LOG(cltsrv_log_file
, TEST_LOG_ALWAYS
,
1161 ("loopback client ipv6 port %s\n", DEFAULT_PORT
));
1162 rv
= PR_SetNetAddr(PR_IpAddrLoopback
, PR_AF_INET6
,
1163 DEFAULT_PORT
, &client
[index
].serverAddress
);
1168 TEST_LOG(cltsrv_log_file
, TEST_LOG_ALWAYS
,
1169 ("client enumerate port %s\n", DEFAULT_PORT
));
1170 (void)PR_EnumerateHostEnt(
1171 0, &host
, DEFAULT_PORT
, &client
[index
].serverAddress
);
1173 client
[index
].stateChange
= PR_NewCondVar(client
[index
].ml
);
1175 cltsrv_log_file
, TEST_LOG_INFO
,
1176 ("main(0x%p): creating client threads\n", PR_GetCurrentThread()));
1177 client
[index
].thread
= PR_CreateThread(
1178 PR_USER_THREAD
, Client
, &client
[index
], PR_PRIORITY_NORMAL
,
1179 thread_scope
, PR_JOINABLE_THREAD
, 0);
1180 TEST_ASSERT(NULL
!= client
[index
].thread
);
1181 PR_Lock(client
[index
].ml
);
1182 while (cs_init
== client
[index
].state
) {
1183 PR_WaitCondVar(client
[index
].stateChange
, PR_INTERVAL_NO_TIMEOUT
);
1185 PR_Unlock(client
[index
].ml
);
1189 /* Then just let them go at it for a bit */
1191 cltsrv_log_file
, TEST_LOG_ALWAYS
,
1192 ("main(0x%p): waiting for execution interval (%d seconds)\n",
1193 PR_GetCurrentThread(), execution
));
1195 WaitForCompletion(execution
);
1197 TimeOfDayMessage("Shutting down", PR_GetCurrentThread());
1201 for (index
= 0; index
< clients
; ++index
)
1203 TEST_LOG(cltsrv_log_file
, TEST_LOG_STATUS
,
1204 ("main(0x%p): notifying client(0x%p) to stop\n",
1205 PR_GetCurrentThread(), client
[index
].thread
));
1207 PR_Lock(client
[index
].ml
);
1208 if (cs_run
== client
[index
].state
)
1210 client
[index
].state
= cs_stop
;
1211 PR_Interrupt(client
[index
].thread
);
1212 while (cs_stop
== client
[index
].state
)
1214 client
[index
].stateChange
, PR_INTERVAL_NO_TIMEOUT
);
1216 PR_Unlock(client
[index
].ml
);
1218 TEST_LOG(cltsrv_log_file
, TEST_LOG_VERBOSE
,
1219 ("main(0x%p): joining client(0x%p)\n",
1220 PR_GetCurrentThread(), client
[index
].thread
));
1222 joinStatus
= PR_JoinThread(client
[index
].thread
);
1223 TEST_ASSERT(PR_SUCCESS
== joinStatus
);
1224 PR_DestroyCondVar(client
[index
].stateChange
);
1225 PR_DestroyLock(client
[index
].ml
);
1232 /* All clients joined - retrieve the server */
1234 cltsrv_log_file
, TEST_LOG_NOTICE
,
1235 ("main(0x%p): notifying server(0x%p) to stop\n",
1236 PR_GetCurrentThread(), server
->thread
));
1238 PR_Lock(server
->ml
);
1239 server
->state
= cs_stop
;
1240 PR_Interrupt(server
->thread
);
1241 while (cs_exit
!= server
->state
) {
1242 PR_WaitCondVar(server
->stateChange
, PR_INTERVAL_NO_TIMEOUT
);
1244 PR_Unlock(server
->ml
);
1247 cltsrv_log_file
, TEST_LOG_NOTICE
,
1248 ("main(0x%p): joining server(0x%p)\n",
1249 PR_GetCurrentThread(), server
->thread
));
1250 joinStatus
= PR_JoinThread(server
->thread
);
1251 TEST_ASSERT(PR_SUCCESS
== joinStatus
);
1253 PR_DestroyCondVar(server
->stateChange
);
1254 PR_DestroyCondVar(server
->pool
.exiting
);
1255 PR_DestroyCondVar(server
->pool
.acceptComplete
);
1256 PR_DestroyLock(server
->ml
);
1261 cltsrv_log_file
, TEST_LOG_ALWAYS
,
1262 ("main(0x%p): test complete\n", PR_GetCurrentThread()));
1264 PT_FPrintStats(debug_out
, "\nPThread Statistics\n");
1266 TimeOfDayMessage("Test exiting at", PR_GetCurrentThread());