1 /* Asynchronous thread multiplexer with a parallel use of the REXXSAA API.
2 * This module is under construction.
13 #define INCL_RXSYSEXIT
17 # include <sys/time.h>
23 /* This picky compiler complains about unused formal parameters.
24 * This is correct but hides (for human eyes) other errors since there
25 * are many of them and we can't eliminate them all.
26 * Error 4100 is "unused formal parameter".
27 * Error 4115 is generated in the system headers (TODO: confirm, the original wording of this line was garbled).
29 # pragma warning(disable:4100)
33 # define WIN32_LEAN_AND_MEAN
40 # pragma warning(default:4115)
/* Sizing of the test run. */
13 #define MAX_THREADS 6 /* parallel starting threads, 20 is a good maximum */
/* One bit of an unsigned per loop iteration: instore_exit() rejects loop
 * numbers above this value and turns each accepted one into a bit that is
 * tested against found[]. */
14 #define MAX_RUN (sizeof(unsigned) * CHAR_BIT)
15 #define TOTAL_THREADS 2000 /* total count of threads */
/* Thread abstraction, pthread flavour.
 * NOTE(review): the conditional compilation that selects between this set
 * and the Win32 set further down is not visible in this excerpt. */
16 #define ThreadIndexType pthread_t
17 #define my_threadidx() pthread_self()
18 #define my_threadid() pthread_self()
19 #define THREAD_RETURN void *
20 #define THREAD_CONVENTION
/* Handle of each slot as filled in by pthread_create(). */
21 static pthread_t thread
[MAX_THREADS
];
/* Thread abstraction, Win32 flavour: the index type is the numeric thread
 * id, the handle type is a HANDLE. */
24 #define ThreadIndexType DWORD
25 #define my_threadidx() GetCurrentThreadId()
26 #define my_threadid() GetCurrentThread()
27 #define THREAD_RETURN unsigned
28 #define THREAD_CONVENTION __stdcall
/* Handle of each slot as returned by _beginthreadex(). */
29 static HANDLE thread
[MAX_THREADS
];
/* Thread identifier of each slot; written by the thread itself in instore()
 * and searched by instore_exit(). */
32 static ThreadIndexType threadx
[MAX_THREADS
];
/* Per-slot life cycle. Ready is 0, so the memset() of State in main puts
 * every slot into Ready. */
35 static enum {Ready
= 0,Running
,Stopped
} State
[MAX_THREADS
];
41 static unsigned found
[MAX_THREADS
]; /* which lines we have seen (bitwise). */
/* NOTE(review): GlobalError is read in the waiting loops without visible
 * locking or atomics; the remark below is the original author's assumption,
 * not something the C standard guarantees -- confirm. */
44 static int GlobalError
= 0; /* int should be threadsafe by default */
46 static int stdout_is_tty
= 0;
/* Instore macro handling mode: main sets FirstRun for the -p option and
 * later DoInstore; instore() feeds InstoreBuf/InstoreLen to RexxStart() when
 * the mode is DoInstore. */
48 static enum {UnUsed
= 0, FirstRun
, DoInstore
} UseInstore
= UnUsed
;
/* Tokenised macro image and its length, as taken over from Instore[1] in
 * instore(). */
54 static void *InstoreBuf
= NULL
;
57 static unsigned InstoreLen
;
/* Forward declaration; both platform sections define their own version. */
59 static void ThreadHasStopped(unsigned position
);
78 /* We redirect the output. This is the redirection handler.
 *
 * Exit handler for SAY output. Anything other than RXSIO/RXSIOSAY is
 * declined with RXEXIT_NOT_HANDLED. For a SAY call the string is copied into
 * a local buffer, parsed as a loop number plus a thread identifier, the
 * identifier is checked against the calling thread and the threadx table,
 * and the loop number is turned into a bit that is tested against found[]
 * of that slot. Every visible exit after the first returns RXEXIT_HANDLED.
 *
 * ExNum  - exit number, expected to be RXSIO
 * Subfun - exit subfunction, expected to be RXSIOSAY
 * PBlock - parameter block, interpreted as RXSIOSAY_PARM
 *
 * NOTE(review): the declarations of rx, rc and idx, the error reporting
 * calls and the update of found[] are not visible in this excerpt.
 */
79 LONG APIENTRY
instore_exit( LONG ExNum
, LONG Subfun
, PEXIT PBlock
)
81 RXSIOSAY_PARM
*psiosay
;
82 char buf
[256]; /* enough to hold the data */
84 unsigned len
,tid
,loop
;
/* Only SAY output is of interest to this handler. */
87 if ((ExNum
!= RXSIO
) || (Subfun
!= RXSIOSAY
)) /* unexpected? */
88 return RXEXIT_NOT_HANDLED
;
/* NOTE(review): the condition guarding this early return is not visible. */
91 return RXEXIT_HANDLED
;
93 psiosay
= (RXSIOSAY_PARM
*)PBlock
;
94 rx
= psiosay
->rxsio_string
;
95 if (!RXVALIDSTRING(rx
))
98 "Thread %lu gives an invalid string for a SAY output.\n",
99 (unsigned long) my_threadidx());
101 return RXEXIT_HANDLED
;
/* Clamp to the buffer size so the terminator always fits. */
105 if (len
>= sizeof(buf
))
106 len
= sizeof(buf
) - 1;
107 memcpy(buf
,RXSTRPTR(rx
),len
);
108 buf
[len
] = '\0'; /* We have a sscanf-able string */
/* Expected layout matches the SAY statement generated in instore(). */
110 rc
= sscanf(buf
,"Loop %u in thread %u",&loop
,&tid
);
114 "Thread %lu gives an unexpected SAY output \"%s\".\n",
115 (unsigned long) my_threadidx(),buf
);
117 return RXEXIT_HANDLED
;
/* The script must report the identifier of the thread it runs in.
 * NOTE(review): tid is parsed with %u and cast to ThreadIndexType; where
 * ThreadIndexType is pthread_t this assumes the id fits into an unsigned
 * and can be compared with != -- confirm. */
120 if ((ThreadIndexType
) tid
!= my_threadidx())
123 "Thread %lu claims an incorrect thread identifier %lu.\n",
124 (unsigned long) my_threadidx(),
125 (unsigned long) tid
);
127 return RXEXIT_HANDLED
;
/* Look up the slot of this thread; idx ends at MAX_THREADS if the search
 * does not leave the loop early (the loop body is not visible). */
130 for (idx
= 0;idx
< MAX_THREADS
;idx
++)
131 if (threadx
[idx
] == (ThreadIndexType
) tid
)
134 if (idx
>= MAX_THREADS
)
137 "Thread %lu can't be found in the thread table.\n",
138 (unsigned long) my_threadidx());
140 return RXEXIT_HANDLED
;
143 /* Check the loop number: valid values are 1 up to MAX_RUN inclusive */
145 if ((loop
< 1) || (loop
> MAX_RUN
))
/* NOTE(review): the literal 1 is a signed int; for loop equal to MAX_RUN the
 * shift reaches the sign bit, which is undefined behaviour in C. An unsigned
 * literal (1u) would avoid that -- confirm and fix in the full source. */
149 loop
= 1 << (loop
- 1); /* Bitmask for the loop */
150 if (found
[idx
] & loop
)
157 "Thread %lu's loop doesn't run continuously.\n",
158 (unsigned long) my_threadidx());
162 return RXEXIT_HANDLED
;
/* Thread entry point. data carries the slot number of the thread; it is cast
 * to unsigned to index threadx[] and to call ThreadHasStopped(). The thread
 * records its own identifier, registers the exit handler under the name
 * ExitHandler, builds a small Rexx loop script in instore_buf, runs it with
 * RexxStart(), handles the returned instore macro image depending on
 * UseInstore, deregisters the exit and marks itself as stopped.
 * Returns 0 converted to THREAD_RETURN unless THREAD_RETURN_VOID is defined.
 *
 * NOTE(review): casting the pointer data to unsigned assumes the value fits
 * into an unsigned; on platforms with pointers wider than unsigned compilers
 * commonly warn about this -- confirm.
 * NOTE(review): the declarations of Exits and rc and parts of the switch are
 * not visible in this excerpt.
 */
165 THREAD_RETURN THREAD_CONVENTION
instore( void *data
)
167 RXSTRING Instore
[2] ;
169 char instore_buf
[256];
/* Publish our identifier so instore_exit() can find this slot. */
172 threadx
[(unsigned) data
] = my_threadidx();
173 RexxRegisterExitExe( "ExitHandler",
174 #ifdef RX_STRONGTYPING
/* Exit list: SAY handling via ExitHandler, terminated by RXENDLST. */
181 Exits
[0].sysexit_name
= "ExitHandler" ;
182 Exits
[0].sysexit_code
= RXSIO
;
183 Exits
[1].sysexit_code
= RXENDLST
;
/* NOTE(review): sprintf() is not bounded by sizeof(instore_buf); the full
 * format string and its arguments are not visible here. */
185 sprintf(instore_buf
,"Do i = 1 To %u;"
186 "say 'Loop' i 'in thread' gettid();"
190 Instore
[0].strptr
= instore_buf
;
191 Instore
[0].strlength
= strlen( Instore
[0].strptr
) ;
/* Hand over the saved macro image only in DoInstore mode. */
192 if (UseInstore
== DoInstore
)
194 Instore
[1].strptr
= InstoreBuf
;
195 Instore
[1].strlength
= InstoreLen
;
198 Instore
[1].strptr
= NULL
;
199 rc
= RexxStart( 0, NULL
, "Testing", Instore
, "Foo", RXCOMMAND
,
200 Exits
, NULL
, NULL
) ;
/* Depending on the mode the returned image in Instore[1] is either freed
 * with RexxFreeMemory() or kept in InstoreBuf/InstoreLen; the case labels
 * are not visible in this excerpt. */
201 switch (UseInstore
) {
203 if ( Instore
[1].strptr
)
204 RexxFreeMemory( Instore
[1].strptr
) ;
209 "Not got the instore macro.\n");
213 if ( Instore
[1].strptr
)
215 InstoreBuf
= Instore
[1].strptr
;
216 InstoreLen
= Instore
[1].strlength
;
222 "Not got the instore macro.\n");
226 /* I don't know if the standard allows a success and a return value
227 * of NULL in Instore[1]. Ignore it. It will be detected later.
228 * True application should check the return code of RexxStart().
232 RexxDeregisterExit("ExitHandler",NULL
);
233 ThreadHasStopped((unsigned) data
);
234 #ifndef THREAD_RETURN_VOID
235 return((THREAD_RETURN
) 0);
/* Checks the result of a finished thread: found[position] must have every
 * bit set, otherwise the thread is reported as having stopped without
 * completing its loop.
 * position - slot number of the thread in the thread tables
 * NOTE(review): the rest of the body is not visible in this excerpt.
 */
239 void reap(unsigned position
)
241 if (found
[position
] != ~((unsigned) 0))
244 "Thread %lu has stopped without completing its loop.\n",
/* Win32 variant: starts the thread for slot position. The slot is marked
 * Running, the thread is created with _beginthreadex() and the handle is
 * stored in thread[position]. A zero handle is treated as failure: an error
 * is reported and ThreadHasStopped() is called for the slot. Otherwise the
 * thread is released with ResumeThread().
 * position - slot number in the thread tables
 * Callers treat a zero return value as failure.
 * NOTE(review): the return statements and the arguments of _beginthreadex()
 * are not visible in this excerpt.
 */
252 int start_a_thread( unsigned position
)
257 State
[position
] = Running
;
258 /* Avoid some race conditions. I don't know if this is a problem of the
259 * runtime system or the kernel. If the systems runs into severe swapping
260 * the threads seems to run before the thread id is known which is used
261 * in instore_exit. We suspend the thread until all details of the new
262 * thread are known before we continue. This gives a little bit worser
265 thread
[position
] = (HANDLE
) _beginthreadex( NULL
,
271 rc
= (long) thread
[position
] != 0l;
275 "Error starting thread, error code is %ld\n",
278 ThreadHasStopped(position
);
280 ResumeThread(thread
[position
]);
/* Win32 variant: marks slot position as Stopped. No locking is visible here;
 * wait_for_threads() inspects State[] after its wait call.
 * position - slot number in the thread tables
 */
284 static void ThreadHasStopped(unsigned position
)
286 State
[position
] = Stopped
;
/* Win32 variant of the supervisor loop. The handles of all slots that are
 * not Ready are packed into compressed[], and BackSort[] is used to map a
 * wait result back to a slot number. WaitForMultipleObjects() waits for any
 * one handle with a timeout of 3000 ms. A finished slot has its handle
 * closed and, while done is below TOTAL_THREADS, is restarted with
 * start_a_thread(). Progress is printed as done(running).
 * NOTE(review): the loop construct, the filling of BackSort[], the
 * declaration of rc and the counter updates are not visible in this
 * excerpt.
 */
289 void wait_for_threads( void )
291 unsigned i
,j
,done
,running
;
293 HANDLE compressed
[MAX_THREADS
];
294 unsigned BackSort
[MAX_THREADS
];
/* main has already started MAX_THREADS threads before calling us. */
296 running
= done
= MAX_THREADS
;
298 printf("%u\r",MAX_THREADS
); /* already started */
/* Collect the handles of all slots in use. */
304 for (i
= 0,j
= 0;i
< MAX_THREADS
;i
++)
305 if (State
[i
] != Ready
)
307 compressed
[j
] = thread
[i
];
311 rc
= WaitForMultipleObjects(running
, compressed
, FALSE
, 3000);
/* 0xFFFFFFFF is the numeric value of WAIT_FAILED in the Win32 headers. */
312 if (rc
== 0xFFFFFFFF)
313 { /* Failure or dead thread, look for a stopped one */
314 for (i
= 0;i
< running
;i
++)
315 if (State
[BackSort
[i
]] == Stopped
)
316 rc
= WAIT_OBJECT_0
+ i
;
/* Anything outside the signalled-object range is treated as an error. */
318 if ((rc
< WAIT_OBJECT_0
) || (rc
>= running
+ WAIT_OBJECT_0
))
321 "At least one thread won't finish normally within 3 seconds (rc=%u,error=%lu).\n",
/* Translate the index in compressed[] back into a slot number. */
327 i
= BackSort
[rc
- WAIT_OBJECT_0
];
328 if (State
[i
] != Stopped
)
/* NOTE(review): the format below contains %u but the call is closed on the
 * same line with no argument visible; if so this is a format/argument
 * mismatch -- confirm against the full source. */
331 "Thread %u hasn't finished normally.\n");
335 CloseHandle(thread
[i
]);
/* Reuse the slot until the total number of threads has been reached. */
339 if (done
< TOTAL_THREADS
)
341 if (!start_a_thread(i
))
347 printf("%u(%u)\r",done
,running
);
348 if (GlobalError
|| !running
)
/* pthread variant: thread_lock is taken by ThreadHasStopped() around the
 * update of State[] and by wait_for_threads(); something_stopped is
 * signalled by ThreadHasStopped() and waited on in wait_for_threads(). */
355 static pthread_mutex_t thread_lock
= PTHREAD_MUTEX_INITIALIZER
;
356 static pthread_cond_t something_stopped
= PTHREAD_COND_INITIALIZER
;
/* pthread variant: starts the thread for slot position. The slot is marked
 * Running before pthread_create() is called with instore() as entry point
 * and the slot number passed through the void pointer argument. On the
 * visible failure path an error is reported and ThreadHasStopped() is
 * called for the slot.
 * position - slot number in the thread tables
 * Callers treat a zero return value as failure.
 * NOTE(review): casting the unsigned position to a pointer relies on the
 * matching cast back in instore() -- confirm it is safe on the target
 * platforms. The return statements are not visible in this excerpt.
 */
358 int start_a_thread( unsigned position
)
362 State
[position
] = Running
;
363 rc
= pthread_create( &thread
[position
], NULL
, instore
, (void *) position
);
367 "Error starting thread, error code is %d\n",
370 ThreadHasStopped(position
);
/* pthread variant: marks slot position as Stopped while holding thread_lock
 * and signals something_stopped so that wait_for_threads() wakes up.
 * position - slot number in the thread tables
 */
375 static void ThreadHasStopped(unsigned position
)
377 pthread_mutex_lock(&thread_lock
);
378 State
[position
] = Stopped
;
379 pthread_cond_signal(&something_stopped
);
380 pthread_mutex_unlock(&thread_lock
);
/* pthread variant of the supervisor loop. With thread_lock held it waits on
 * something_stopped with an absolute timeout of now plus 3 seconds, then
 * scans all slots for Stopped ones, joins them and, while done is below
 * TOTAL_THREADS, restarts the slot with start_a_thread(). Progress is
 * printed as done(running). The lock is released at the end.
 * NOTE(review): the loop construct, the declarations of rc and now, the
 * counter updates and the error reporting calls are not visible in this
 * excerpt.
 */
383 void wait_for_threads( void )
385 unsigned i
,done
,running
;
388 struct timespec timeout
;
/* main has already started MAX_THREADS threads before calling us. */
390 running
= done
= MAX_THREADS
;
392 printf("%u\r",MAX_THREADS
); /* already started */
397 pthread_mutex_lock(&thread_lock
);
/* Build the absolute deadline: current time plus 3 seconds, with the
 * microseconds converted to nanoseconds. */
400 gettimeofday(&now
,NULL
);
401 timeout
.tv_sec
= now
.tv_sec
+ 3;
402 timeout
.tv_nsec
= now
.tv_usec
* 1000;
403 rc
= pthread_cond_timedwait(&something_stopped
,&thread_lock
,&timeout
);
407 "At least one thread won't finish within 3 seconds.\n");
/* Handle every slot that has stopped in the meantime. */
412 for ( i
= 0; i
< MAX_THREADS
; i
++ )
414 if (State
[i
] != Stopped
)
418 rc
= pthread_join( thread
[i
], NULL
);
422 "A thread can't be found in the internal table.\n");
/* NOTE(review): start_a_thread() is called between the lock above and the
 * unlock below; its visible failure path calls ThreadHasStopped(), which
 * locks thread_lock itself. With a default mutex that would block forever --
 * confirm whether the lock is released in lines not shown. */
427 if (done
< TOTAL_THREADS
)
429 if (!start_a_thread(i
))
436 printf("%u(%u)\r",done
,running
);
437 if (GlobalError
|| !running
)
441 pthread_mutex_unlock(&thread_lock
);
/* Prints the usage text, including the description of the -p option, to
 * stdout via printf().
 * NOTE(review): the format contains one %u (the loop counter limit); its
 * argument and the end of the call are not visible in this excerpt.
 */
445 static void usage(void)
447 printf("usage: threader [-p]\n"
450 "-p\tLoad the macro only once and then use the generated instore\n"
451 "\tmacro. Default: Always load the macro new.\n"
453 "threader uses a macro which generated lines with numbers which can\n"
454 "be parsed to detect problems in the multi-threading implementation.\n"
455 "A loop counter runs until %u. The test should run within a few\n"
456 "seconds until a few minutes. You should hit ^C to abort the program\n"
457 "if you think your harddisk is working heavy.\n"
459 "This program is for testing purpose only.\n"
464 int main( int argc
, char *argv
[] )
471 if (isatty(fileno(stdout
)))
478 if (strcmp(argv
[1],"-p") == 0)
479 UseInstore
= FirstRun
;
484 memset(found
,0,sizeof(found
));
485 memset(State
,0,sizeof(State
));
486 /* We want to see surprises at once, so disable buffering: */
487 setvbuf(stdout
,NULL
,_IONBF
,0);
488 setvbuf(stderr
,NULL
,_IONBF
,0);
489 printf( "Regina Rexx Thread Tester\n" );
490 printf( "-------------------------\n" );
492 version
.strlength
= 0;
493 version
.strptr
= NULL
;
494 /* This might not work if we check another Rexx: */
495 versioncode
= ReginaVersion(&version
);
496 printf("Regina's version is %lu.%02lu",
501 printf(" (in complete \"%s\")",version
.strptr
);
502 RexxFreeMemory(version
.strptr
);
509 thread
[0] = my_threadid();
510 threadx
[0] = my_threadidx();
514 UseInstore
= DoInstore
;
519 "You should see a loop counter which stops at %u.\n\n",
523 for ( i
= 0; i
< MAX_THREADS
; i
++ )
524 if (!start_a_thread(i
) || GlobalError
)
533 "An error encountered. Do you use the right shared "
539 "Thread tester passed without an error.\n"
540 "About %u seconds used for %u cyles.\n",
541 (unsigned) (time(NULL
) - start
),TOTAL_THREADS
);
543 printf("Press ENTER to continue...\n");
546 fgets(buf
,sizeof(buf
),stdin
);