1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 /***********************************************************************
10 ** Description: Test threadpool functionality.
12 ** Modification History:
24 #if defined(_PR_PTHREADS)
29 #if defined(XP_UNIX) || defined (XP_OS2)
39 static int _debug_on
= 0;
40 static char *program_name
= NULL
;
41 static void serve_client_write(void *arg
);
43 #include "obsolete/prsem.h"
49 #define DPRINTF(arg) if (_debug_on) printf arg
52 #define BUF_DATA_SIZE (2 * 1024)
53 #define TCP_MESG_SIZE 1024
54 #define NUM_TCP_CLIENTS 10 /* for a listen queue depth of 5 */
57 #define NUM_TCP_CONNECTIONS_PER_CLIENT 10
58 #define NUM_TCP_MESGS_PER_CONNECTION 10
59 #define TCP_SERVER_PORT 10000
60 #define SERVER_MAX_BIND_COUNT 100
63 char *getcwd(char *buf
, size_t size
)
65 wchar_t wpath
[MAX_PATH
];
66 _wgetcwd(wpath
, MAX_PATH
);
67 WideCharToMultiByte(CP_ACP
, 0, wpath
, -1, buf
, size
, 0, 0);
73 static PRInt32 num_tcp_clients
= NUM_TCP_CLIENTS
;
74 static PRInt32 num_tcp_connections_per_client
= NUM_TCP_CONNECTIONS_PER_CLIENT
;
75 static PRInt32 tcp_mesg_size
= TCP_MESG_SIZE
;
76 static PRInt32 num_tcp_mesgs_per_connection
= NUM_TCP_MESGS_PER_CONNECTION
;
77 static void TCP_Server_Accept(void *arg
);
81 typedef struct buffer
{
82 char data
[BUF_DATA_SIZE
];
86 typedef struct Server_Param
{
87 PRJobIoDesc iod
; /* socket to read from/write to */
88 PRInt32 datalen
; /* bytes of data transfered in each read/write */
90 PRMonitor
*exit_mon
; /* monitor to signal on exit */
91 PRInt32
*job_counterp
; /* counter to decrement, before exit */
92 PRInt32 conn_counter
; /* counter to decrement, before exit */
96 typedef struct Serve_Client_Param
{
97 PRJobIoDesc iod
; /* socket to read from/write to */
98 PRInt32 datalen
; /* bytes of data transfered in each read/write */
99 PRMonitor
*exit_mon
; /* monitor to signal on exit */
100 PRInt32
*job_counterp
; /* counter to decrement, before exit */
102 } Serve_Client_Param
;
104 typedef struct Session
{
105 PRJobIoDesc iod
; /* socket to read from/write to */
110 PRMonitor
*exit_mon
; /* monitor to signal on exit */
111 PRInt32
*job_counterp
; /* counter to decrement, before exit */
116 serve_client_read(void *arg
)
118 Session
*sp
= (Session
*) arg
;
126 PRIntervalTime timeout
= PR_INTERVAL_NO_TIMEOUT
;
128 sockfd
= sp
->iod
.socket
;
129 buf
= sp
->in_buf
->data
;
131 PR_ASSERT(sp
->msg_num
< num_tcp_mesgs_per_connection
);
132 PR_ASSERT(sp
->bytes_read
< sp
->bytes
);
134 offset
= sp
->bytes_read
;
135 rem
= sp
->bytes
- offset
;
136 bytes
= PR_Recv(sockfd
, buf
+ offset
, rem
, 0, timeout
);
140 sp
->bytes_read
+= bytes
;
141 sp
->iod
.timeout
= PR_SecondsToInterval(60);
142 if (sp
->bytes_read
< sp
->bytes
) {
143 jobp
= PR_QueueJob_Read(sp
->tp
, &sp
->iod
, serve_client_read
, sp
,
145 PR_ASSERT(NULL
!= jobp
);
148 PR_ASSERT(sp
->bytes_read
== sp
->bytes
);
149 DPRINTF(("serve_client: read complete, msg(%d) \n", sp
->msg_num
));
151 sp
->iod
.timeout
= PR_SecondsToInterval(60);
152 jobp
= PR_QueueJob_Write(sp
->tp
, &sp
->iod
, serve_client_write
, sp
,
154 PR_ASSERT(NULL
!= jobp
);
160 serve_client_write(void *arg
)
162 Session
*sp
= (Session
*) arg
;
168 sockfd
= sp
->iod
.socket
;
169 buf
= sp
->in_buf
->data
;
171 PR_ASSERT(sp
->msg_num
< num_tcp_mesgs_per_connection
);
173 bytes
= PR_Send(sockfd
, buf
, sp
->bytes
, 0, PR_INTERVAL_NO_TIMEOUT
);
174 PR_ASSERT(bytes
== sp
->bytes
);
179 DPRINTF(("serve_client: write complete, msg(%d) \n", sp
->msg_num
));
181 if (sp
->msg_num
< num_tcp_mesgs_per_connection
) {
183 sp
->iod
.timeout
= PR_SecondsToInterval(60);
184 jobp
= PR_QueueJob_Read(sp
->tp
, &sp
->iod
, serve_client_read
, sp
,
186 PR_ASSERT(NULL
!= jobp
);
190 DPRINTF(("serve_client: read/write complete, msg(%d) \n", sp
->msg_num
));
191 if (PR_Shutdown(sockfd
, PR_SHUTDOWN_BOTH
) < 0) {
192 fprintf(stderr
,"%s: ERROR - PR_Shutdown\n", program_name
);
196 PR_EnterMonitor(sp
->exit_mon
);
197 --(*sp
->job_counterp
);
198 PR_Notify(sp
->exit_mon
);
199 PR_ExitMonitor(sp
->exit_mon
);
201 PR_DELETE(sp
->in_buf
);
209 * Thread, started by the server, for serving a client connection.
210 * Reads data from socket and writes it back, unmodified, and
213 static void PR_CALLBACK
214 Serve_Client(void *arg
)
216 Serve_Client_Param
*scp
= (Serve_Client_Param
*) arg
;
221 sp
= PR_NEW(Session
);
224 in_buf
= PR_NEW(buffer
);
225 if (in_buf
== NULL
) {
226 fprintf(stderr
,"%s: failed to alloc buffer struct\n",program_name
);
232 sp
->bytes
= scp
->datalen
;
236 sp
->exit_mon
= scp
->exit_mon
;
237 sp
->job_counterp
= scp
->job_counterp
;
239 sp
->iod
.timeout
= PR_SecondsToInterval(60);
240 jobp
= PR_QueueJob_Read(sp
->tp
, &sp
->iod
, serve_client_read
, sp
,
242 PR_ASSERT(NULL
!= jobp
);
247 print_stats(void *arg
)
249 Server_Param
*sp
= (Server_Param
*) arg
;
250 PRThreadPool
*tp
= sp
->tp
;
254 PR_EnterMonitor(sp
->exit_mon
);
255 counter
= (*sp
->job_counterp
);
256 PR_ExitMonitor(sp
->exit_mon
);
258 printf("PRINT_STATS: #client connections = %d\n",counter
);
261 jobp
= PR_QueueJob_Timer(tp
, PR_MillisecondsToInterval(500),
262 print_stats
, sp
, PR_FALSE
);
264 PR_ASSERT(NULL
!= jobp
);
267 static int job_counter
= 0;
270 * Server binds an address to a socket, starts a client process and
271 * listens for incoming connections.
272 * Each client connects to the server and sends a chunk of data
273 * Starts a Serve_Client job for each incoming connection, to read
274 * the data from the client and send it back to the client, unmodified.
275 * Each client checks that data received from server is same as the
276 * data it sent to the server.
277 * Finally, the threadpool is shutdown
279 static void PR_CALLBACK
280 TCP_Server(void *arg
)
282 PRThreadPool
*tp
= (PRThreadPool
*) arg
;
292 * Create a tcp socket
294 if ((sockfd
= PR_NewTCPSocket()) == NULL
) {
295 fprintf(stderr
,"%s: PR_NewTCPSocket failed\n", program_name
);
298 memset(&netaddr
, 0, sizeof(netaddr
));
299 netaddr
.inet
.family
= PR_AF_INET
;
300 netaddr
.inet
.port
= PR_htons(TCP_SERVER_PORT
);
301 netaddr
.inet
.ip
= PR_htonl(PR_INADDR_ANY
);
303 * try a few times to bind server's address, if addresses are in
307 while (PR_Bind(sockfd
, &netaddr
) < 0) {
308 if (PR_GetError() == PR_ADDRESS_IN_USE_ERROR
) {
309 netaddr
.inet
.port
+= 2;
310 if (i
++ < SERVER_MAX_BIND_COUNT
) {
314 fprintf(stderr
,"%s: ERROR - PR_Bind failed\n", program_name
);
320 if (PR_Listen(sockfd
, 32) < 0) {
321 fprintf(stderr
,"%s: ERROR - PR_Listen failed\n", program_name
);
326 if (PR_GetSockName(sockfd
, &netaddr
) < 0) {
327 fprintf(stderr
,"%s: ERROR - PR_GetSockName failed\n", program_name
);
333 "TCP_Server: PR_BIND netaddr.inet.ip = 0x%lx, netaddr.inet.port = %d\n",
334 netaddr
.inet
.ip
, netaddr
.inet
.port
));
336 sp
= PR_NEW(Server_Param
);
338 fprintf(stderr
,"%s: PR_NEW failed\n", program_name
);
342 sp
->iod
.socket
= sockfd
;
343 sp
->iod
.timeout
= PR_SecondsToInterval(60);
344 sp
->datalen
= tcp_mesg_size
;
345 sp
->exit_mon
= sc_mon
;
346 sp
->job_counterp
= &job_counter
;
347 sp
->conn_counter
= 0;
349 sp
->netaddr
= netaddr
;
351 /* create and cancel an io job */
352 jobp
= PR_QueueJob_Accept(tp
, &sp
->iod
, TCP_Server_Accept
, sp
,
354 PR_ASSERT(NULL
!= jobp
);
355 rval
= PR_CancelJob(jobp
);
356 PR_ASSERT(PR_SUCCESS
== rval
);
359 * create the client process
363 char *argv
[MAX_ARGS
+ 1];
366 char path
[1024 + sizeof("/thrpool_client")];
368 getcwd(path
, sizeof(path
));
370 (void)strcat(path
, "/thrpool_client");
372 (void)strcat(path
, ".exe");
374 argv
[index
++] = path
;
375 sprintf(port
,"%d",PR_ntohs(netaddr
.inet
.port
));
378 argv
[index
++] = "-d";
379 argv
[index
++] = "-p";
380 argv
[index
++] = port
;
381 argv
[index
++] = NULL
;
383 argv
[index
++] = "-p";
384 argv
[index
++] = port
;
385 argv
[index
++] = NULL
;
387 PR_ASSERT(MAX_ARGS
>= (index
- 1));
389 DPRINTF(("creating client process %s ...\n", path
));
390 if (PR_FAILURE
== PR_CreateProcessDetached(path
, argv
, NULL
, NULL
)) {
392 "thrpool_server: ERROR - PR_CreateProcessDetached failed\n");
398 sc_mon
= PR_NewMonitor();
399 if (sc_mon
== NULL
) {
400 fprintf(stderr
,"%s: PR_NewMonitor failed\n", program_name
);
405 sp
->iod
.socket
= sockfd
;
406 sp
->iod
.timeout
= PR_SecondsToInterval(60);
407 sp
->datalen
= tcp_mesg_size
;
408 sp
->exit_mon
= sc_mon
;
409 sp
->job_counterp
= &job_counter
;
410 sp
->conn_counter
= 0;
412 sp
->netaddr
= netaddr
;
414 /* create and cancel a timer job */
415 jobp
= PR_QueueJob_Timer(tp
, PR_MillisecondsToInterval(5000),
416 print_stats
, sp
, PR_FALSE
);
417 PR_ASSERT(NULL
!= jobp
);
418 rval
= PR_CancelJob(jobp
);
419 PR_ASSERT(PR_SUCCESS
== rval
);
421 DPRINTF(("TCP_Server: Accepting connections \n"));
423 jobp
= PR_QueueJob_Accept(tp
, &sp
->iod
, TCP_Server_Accept
, sp
,
425 PR_ASSERT(NULL
!= jobp
);
430 TCP_Server_Accept(void *arg
)
432 Server_Param
*sp
= (Server_Param
*) arg
;
433 PRThreadPool
*tp
= sp
->tp
;
434 Serve_Client_Param
*scp
;
435 PRFileDesc
*newsockfd
;
438 if ((newsockfd
= PR_Accept(sp
->iod
.socket
, &sp
->netaddr
,
439 PR_INTERVAL_NO_TIMEOUT
)) == NULL
) {
440 fprintf(stderr
,"%s: ERROR - PR_Accept failed\n", program_name
);
444 scp
= PR_NEW(Serve_Client_Param
);
446 fprintf(stderr
,"%s: PR_NEW failed\n", program_name
);
452 * Start a Serve_Client job for each incoming connection
454 scp
->iod
.socket
= newsockfd
;
455 scp
->iod
.timeout
= PR_SecondsToInterval(60);
456 scp
->datalen
= tcp_mesg_size
;
457 scp
->exit_mon
= sp
->exit_mon
;
458 scp
->job_counterp
= sp
->job_counterp
;
461 PR_EnterMonitor(sp
->exit_mon
);
462 (*sp
->job_counterp
)++;
463 PR_ExitMonitor(sp
->exit_mon
);
464 jobp
= PR_QueueJob(tp
, Serve_Client
, scp
,
467 PR_ASSERT(NULL
!= jobp
);
468 DPRINTF(("TCP_Server: Created Serve_Client = 0x%lx\n", jobp
));
471 * single-threaded update; no lock needed
474 if (sp
->conn_counter
<
475 (num_tcp_clients
* num_tcp_connections_per_client
)) {
476 jobp
= PR_QueueJob_Accept(tp
, &sp
->iod
, TCP_Server_Accept
, sp
,
478 PR_ASSERT(NULL
!= jobp
);
481 jobp
= PR_QueueJob_Timer(tp
, PR_MillisecondsToInterval(500),
482 print_stats
, sp
, PR_FALSE
);
484 PR_ASSERT(NULL
!= jobp
);
485 DPRINTF(("TCP_Server: Created print_stats timer job = 0x%lx\n", jobp
));
488 PR_EnterMonitor(sp
->exit_mon
);
489 /* Wait for server jobs to finish */
490 while (0 != *sp
->job_counterp
) {
491 PR_Wait(sp
->exit_mon
, PR_INTERVAL_NO_TIMEOUT
);
492 DPRINTF(("TCP_Server: conn_counter = %d\n",
496 PR_ExitMonitor(sp
->exit_mon
);
497 if (sp
->iod
.socket
) {
498 PR_Close(sp
->iod
.socket
);
500 PR_DestroyMonitor(sp
->exit_mon
);
501 printf("%30s","TCP_Socket_Client_Server_Test:");
502 printf("%2ld Server %2ld Clients %2ld connections_per_client\n",1l,
503 num_tcp_clients
, num_tcp_connections_per_client
);
504 printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n",":",
505 num_tcp_mesgs_per_connection
, tcp_mesg_size
);
507 DPRINTF(("%s: calling PR_ShutdownThreadPool\n", program_name
));
508 PR_ShutdownThreadPool(sp
->tp
);
512 /************************************************************************/
514 #define DEFAULT_INITIAL_THREADS 4
515 #define DEFAULT_MAX_THREADS 100
516 #define DEFAULT_STACKSIZE (512 * 1024)
518 int main(int argc
, char **argv
)
520 PRInt32 initial_threads
= DEFAULT_INITIAL_THREADS
;
521 PRInt32 max_threads
= DEFAULT_MAX_THREADS
;
522 PRInt32 stacksize
= DEFAULT_STACKSIZE
;
523 PRThreadPool
*tp
= NULL
;
533 program_name
= argv
[0];
534 opt
= PL_CreateOptState(argc
, argv
, "d");
535 while (PL_OPT_EOL
!= (os
= PL_GetNextOpt(opt
)))
537 if (PL_OPT_BAD
== os
) {
542 case 'd': /* debug mode */
549 PL_DestroyOptState(opt
);
551 PR_Init(PR_USER_THREAD
, PR_PRIORITY_NORMAL
, 0);
554 PR_SetConcurrency(4);
556 tp
= PR_CreateThreadPool(initial_threads
, max_threads
, stacksize
);
558 printf("PR_CreateThreadPool failed\n");
562 jobp
= PR_QueueJob(tp
, TCP_Server
, tp
, PR_TRUE
);
563 rv
= PR_JoinJob(jobp
);
564 PR_ASSERT(PR_SUCCESS
== rv
);
566 DPRINTF(("%s: calling PR_JoinThreadPool\n", program_name
));
567 rv
= PR_JoinThreadPool(tp
);
568 PR_ASSERT(PR_SUCCESS
== rv
);
569 DPRINTF(("%s: returning from PR_JoinThreadPool\n", program_name
));
573 if (failed_already
) {