Made sleep_on usable from all requests.
[wine/multimedia.git] / server / thread.c
blob39cd5cfd76a91811a3c6c34e78e34ea0af4585e6
1 /*
2 * Server-side thread management
4 * Copyright (C) 1998 Alexandre Julliard
5 */
7 #include "config.h"
9 #include <assert.h>
10 #include <fcntl.h>
11 #include <signal.h>
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #ifdef HAVE_SYS_MMAN_H
16 #include <sys/mman.h>
17 #endif
18 #include <sys/types.h>
19 #include <sys/uio.h>
20 #include <unistd.h>
21 #include <stdarg.h>
24 #include "winbase.h"
25 #include "winerror.h"
27 #include "handle.h"
28 #include "process.h"
29 #include "thread.h"
30 #include "request.h"
/* thread queues */

/* one link in the doubly-linked wait queue of an object */
struct wait_queue_entry
{
    struct wait_queue_entry *next;    /* next entry in the object queue */
    struct wait_queue_entry *prev;    /* previous entry in the object queue */
    struct object           *obj;     /* object this entry is queued on */
    struct thread           *thread;  /* thread that is waiting */
};
43 struct thread_wait
45 int count; /* count of objects */
46 int flags;
47 struct timeval timeout;
48 struct timeout_user *user;
49 sleep_reply reply; /* function to build the reply */
50 struct wait_queue_entry queues[1];
/* asynchronous procedure calls */

/* one queued APC */
struct thread_apc
{
    void *func;   /* function to call in client */
    void *param;  /* function param */
};
#define MAX_THREAD_APC  16  /* Max outstanding APCs for a thread */
/* thread operations */

/* callbacks referenced by the thread_ops table; defined further down */
static void dump_thread( struct object *obj, int verbose );
static int thread_signaled( struct object *obj, struct thread *thread );
extern void thread_poll_event( struct object *obj, int event );
static void destroy_thread( struct object *obj );
70 static const struct object_ops thread_ops =
72 sizeof(struct thread), /* size */
73 dump_thread, /* dump */
74 add_queue, /* add_queue */
75 remove_queue, /* remove_queue */
76 thread_signaled, /* signaled */
77 no_satisfied, /* satisfied */
78 NULL, /* get_poll_events */
79 thread_poll_event, /* poll_event */
80 no_read_fd, /* get_read_fd */
81 no_write_fd, /* get_write_fd */
82 no_flush, /* flush */
83 no_get_file_info, /* get_file_info */
84 destroy_thread /* destroy */
/* head of the doubly-linked list of all threads (most recently created first) */
static struct thread *first_thread = NULL;
89 /* allocate the buffer for the communication with the client */
90 static int alloc_client_buffer( struct thread *thread )
92 int fd;
94 if ((fd = create_anonymous_file()) == -1) return -1;
95 if (ftruncate( fd, MAX_REQUEST_LENGTH ) == -1) goto error;
96 if ((thread->buffer = mmap( 0, MAX_REQUEST_LENGTH, PROT_READ | PROT_WRITE,
97 MAP_SHARED, fd, 0 )) == (void*)-1) goto error;
98 return fd;
100 error:
101 file_set_error();
102 if (fd != -1) close( fd );
103 return -1;
106 /* create a new thread */
107 static struct thread *create_thread( int fd, struct process *process, int suspend )
109 struct thread *thread;
110 int buf_fd;
112 int flags = fcntl( fd, F_GETFL, 0 );
113 fcntl( fd, F_SETFL, flags | O_NONBLOCK );
115 if (!(thread = alloc_object( &thread_ops, fd ))) return NULL;
117 thread->unix_pid = 0; /* not known yet */
118 thread->teb = NULL;
119 thread->mutex = NULL;
120 thread->debug_ctx = NULL;
121 thread->wait = NULL;
122 thread->apc = NULL;
123 thread->apc_count = 0;
124 thread->error = 0;
125 thread->pass_fd = -1;
126 thread->state = RUNNING;
127 thread->attached = 0;
128 thread->exit_code = 0x103; /* STILL_ACTIVE */
129 thread->next = NULL;
130 thread->prev = NULL;
131 thread->priority = THREAD_PRIORITY_NORMAL;
132 thread->affinity = 1;
133 thread->suspend = (suspend != 0);
134 thread->buffer = (void *)-1;
135 thread->last_req = REQ_GET_THREAD_BUFFER;
137 if (!first_thread) /* creating the first thread */
139 current = thread;
140 thread->process = process = create_initial_process();
141 assert( process );
143 else thread->process = (struct process *)grab_object( process );
145 if ((thread->next = first_thread) != NULL) thread->next->prev = thread;
146 first_thread = thread;
147 add_process_thread( process, thread );
149 if ((buf_fd = alloc_client_buffer( thread )) == -1) goto error;
151 set_select_events( &thread->obj, POLLIN ); /* start listening to events */
152 set_reply_fd( thread, buf_fd ); /* send the fd to the client */
153 send_reply( thread );
154 return thread;
156 error:
157 remove_process_thread( process, thread );
158 release_object( thread );
159 return NULL;
/* create the initial thread and start the main server loop
 *
 * The result of create_thread() is not checked here.
 */
void create_initial_thread( int fd )
{
    (void)create_thread( fd, NULL, 0 );
    select_loop();
}
169 /* handle a client event */
170 void thread_poll_event( struct object *obj, int event )
172 struct thread *thread = (struct thread *)obj;
173 assert( obj->ops == &thread_ops );
175 if (event & (POLLERR | POLLHUP)) kill_thread( thread, BROKEN_PIPE );
176 else
178 if (event & POLLOUT) write_request( thread );
179 if (event & POLLIN) read_request( thread );
183 /* destroy a thread when its refcount is 0 */
184 static void destroy_thread( struct object *obj )
186 struct thread *thread = (struct thread *)obj;
187 assert( obj->ops == &thread_ops );
189 assert( !thread->debug_ctx ); /* cannot still be debugging something */
190 release_object( thread->process );
191 if (thread->next) thread->next->prev = thread->prev;
192 if (thread->prev) thread->prev->next = thread->next;
193 else first_thread = thread->next;
194 if (thread->apc) free( thread->apc );
195 if (thread->buffer != (void *)-1) munmap( thread->buffer, MAX_REQUEST_LENGTH );
196 if (thread->pass_fd != -1) close( thread->pass_fd );
199 /* dump a thread on stdout for debugging purposes */
200 static void dump_thread( struct object *obj, int verbose )
202 struct thread *thread = (struct thread *)obj;
203 assert( obj->ops == &thread_ops );
205 fprintf( stderr, "Thread pid=%d teb=%p state=%d\n",
206 thread->unix_pid, thread->teb, thread->state );
209 static int thread_signaled( struct object *obj, struct thread *thread )
211 struct thread *mythread = (struct thread *)obj;
212 return (mythread->state == TERMINATED);
215 /* get a thread pointer from a thread id (and increment the refcount) */
216 struct thread *get_thread_from_id( void *id )
218 struct thread *t = first_thread;
219 while (t && (t != id)) t = t->next;
220 if (t) grab_object( t );
221 return t;
224 /* get a thread from a handle (and increment the refcount) */
225 struct thread *get_thread_from_handle( int handle, unsigned int access )
227 return (struct thread *)get_handle_obj( current->process, handle,
228 access, &thread_ops );
231 /* find a thread from a Unix pid */
232 struct thread *get_thread_from_pid( int pid )
234 struct thread *t = first_thread;
235 while (t && (t->unix_pid != pid)) t = t->next;
236 return t;
239 /* set all information about a thread */
240 static void set_thread_info( struct thread *thread,
241 struct set_thread_info_request *req )
243 if (req->mask & SET_THREAD_INFO_PRIORITY)
244 thread->priority = req->priority;
245 if (req->mask & SET_THREAD_INFO_AFFINITY)
247 if (req->affinity != 1) set_error( ERROR_INVALID_PARAMETER );
248 else thread->affinity = req->affinity;
252 /* suspend a thread */
253 int suspend_thread( struct thread *thread, int check_limit )
255 int old_count = thread->suspend;
256 if (thread->suspend < MAXIMUM_SUSPEND_COUNT || !check_limit)
258 if (!(thread->process->suspend + thread->suspend++)) stop_thread( thread );
260 else set_error( ERROR_SIGNAL_REFUSED );
261 return old_count;
264 /* resume a thread */
265 int resume_thread( struct thread *thread )
267 int old_count = thread->suspend;
268 if (thread->suspend > 0)
270 if (!(--thread->suspend + thread->process->suspend)) continue_thread( thread );
272 return old_count;
275 /* suspend all threads but the current */
276 void suspend_all_threads( void )
278 struct thread *thread;
279 for ( thread = first_thread; thread; thread = thread->next )
280 if ( thread != current )
281 suspend_thread( thread, 0 );
284 /* resume all threads but the current */
285 void resume_all_threads( void )
287 struct thread *thread;
288 for ( thread = first_thread; thread; thread = thread->next )
289 if ( thread != current )
290 resume_thread( thread );
293 /* add a thread to an object wait queue; return 1 if OK, 0 on error */
294 int add_queue( struct object *obj, struct wait_queue_entry *entry )
296 grab_object( obj );
297 entry->obj = obj;
298 entry->prev = obj->tail;
299 entry->next = NULL;
300 if (obj->tail) obj->tail->next = entry;
301 else obj->head = entry;
302 obj->tail = entry;
303 return 1;
306 /* remove a thread from an object wait queue */
307 void remove_queue( struct object *obj, struct wait_queue_entry *entry )
309 if (entry->next) entry->next->prev = entry->prev;
310 else obj->tail = entry->prev;
311 if (entry->prev) entry->prev->next = entry->next;
312 else obj->head = entry->next;
313 release_object( obj );
316 /* finish waiting */
317 static void end_wait( struct thread *thread )
319 struct thread_wait *wait = thread->wait;
320 struct wait_queue_entry *entry;
321 int i;
323 assert( wait );
324 for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
325 entry->obj->ops->remove_queue( entry->obj, entry );
326 if (wait->user) remove_timeout_user( wait->user );
327 free( wait );
328 thread->wait = NULL;
331 /* build the thread wait structure */
332 static int wait_on( int count, struct object *objects[], int flags,
333 int timeout, sleep_reply func )
335 struct thread_wait *wait;
336 struct wait_queue_entry *entry;
337 int i;
339 if (!(wait = mem_alloc( sizeof(*wait) + (count-1) * sizeof(*entry) ))) return 0;
340 current->wait = wait;
341 wait->count = count;
342 wait->flags = flags;
343 wait->user = NULL;
344 wait->reply = func;
345 if (flags & SELECT_TIMEOUT)
347 gettimeofday( &wait->timeout, 0 );
348 add_timeout( &wait->timeout, timeout );
351 for (i = 0, entry = wait->queues; i < count; i++, entry++)
353 struct object *obj = objects[i];
354 entry->thread = current;
355 if (!obj->ops->add_queue( obj, entry ))
357 wait->count = i;
358 end_wait( current );
359 return 0;
362 return 1;
365 /* check if the thread waiting condition is satisfied */
366 static int check_wait( struct thread *thread, struct object **object )
368 int i, signaled;
369 struct thread_wait *wait = thread->wait;
370 struct wait_queue_entry *entry = wait->queues;
372 assert( wait );
373 *object = NULL;
374 if (wait->flags & SELECT_ALL)
376 int not_ok = 0;
377 /* Note: we must check them all anyway, as some objects may
378 * want to do something when signaled, even if others are not */
379 for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
380 not_ok |= !entry->obj->ops->signaled( entry->obj, thread );
381 if (not_ok) goto other_checks;
382 /* Wait satisfied: tell it to all objects */
383 signaled = 0;
384 for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
385 if (entry->obj->ops->satisfied( entry->obj, thread ))
386 signaled = STATUS_ABANDONED_WAIT_0;
387 return signaled;
389 else
391 for (i = 0, entry = wait->queues; i < wait->count; i++, entry++)
393 if (!entry->obj->ops->signaled( entry->obj, thread )) continue;
394 /* Wait satisfied: tell it to the object */
395 signaled = i;
396 *object = entry->obj;
397 if (entry->obj->ops->satisfied( entry->obj, thread ))
398 signaled = i + STATUS_ABANDONED_WAIT_0;
399 return signaled;
403 other_checks:
404 if ((wait->flags & SELECT_ALERTABLE) && thread->apc) return STATUS_USER_APC;
405 if (wait->flags & SELECT_TIMEOUT)
407 struct timeval now;
408 gettimeofday( &now, NULL );
409 if (!time_before( &now, &wait->timeout )) return STATUS_TIMEOUT;
411 return -1;
414 /* build a reply to the select request */
415 static void build_select_reply( struct thread *thread, struct object *obj, int signaled )
417 struct select_request *req = get_req_ptr( thread );
418 req->signaled = signaled;
421 /* attempt to wake up a thread */
422 /* return 1 if OK, 0 if the wait condition is still not satisfied */
423 static int wake_thread( struct thread *thread )
425 int signaled;
426 struct object *object;
427 if ((signaled = check_wait( thread, &object )) == -1) return 0;
428 thread->error = 0;
429 thread->wait->reply( thread, object, signaled );
430 end_wait( thread );
431 return 1;
434 /* thread wait timeout */
435 static void thread_timeout( void *ptr )
437 struct thread *thread = ptr;
438 if (debug_level) fprintf( stderr, "%08x: *timeout*\n", (unsigned int)thread );
439 assert( thread->wait );
440 thread->error = 0;
441 thread->wait->user = NULL;
442 thread->wait->reply( thread, NULL, STATUS_TIMEOUT );
443 end_wait( thread );
444 send_reply( thread );
447 /* sleep on a list of objects */
448 int sleep_on( int count, struct object *objects[], int flags, int timeout, sleep_reply func )
450 assert( !current->wait );
451 if (!wait_on( count, objects, flags, timeout, func )) return 0;
452 if (wake_thread( current )) return 1;
453 /* now we need to wait */
454 if (flags & SELECT_TIMEOUT)
456 if (!(current->wait->user = add_timeout_user( &current->wait->timeout,
457 thread_timeout, current )))
459 end_wait( current );
460 return 0;
463 return 1;
466 /* select on a list of handles */
467 static int select_on( int count, int *handles, int flags, int timeout )
469 int ret = 0;
470 int i;
471 struct object *objects[MAXIMUM_WAIT_OBJECTS];
473 if ((count < 0) || (count > MAXIMUM_WAIT_OBJECTS))
475 set_error( ERROR_INVALID_PARAMETER );
476 return 0;
478 for (i = 0; i < count; i++)
480 if (!(objects[i] = get_handle_obj( current->process, handles[i], SYNCHRONIZE, NULL )))
481 break;
483 if (i == count) ret = sleep_on( count, objects, flags, timeout, build_select_reply );
484 while (--i >= 0) release_object( objects[i] );
485 return ret;
488 /* attempt to wake threads sleeping on the object wait queue */
489 void wake_up( struct object *obj, int max )
491 struct wait_queue_entry *entry = obj->head;
493 while (entry)
495 struct thread *thread = entry->thread;
496 entry = entry->next;
497 if (wake_thread( thread ))
499 send_reply( thread );
500 if (max && !--max) break;
505 /* queue an async procedure call */
506 static int thread_queue_apc( struct thread *thread, void *func, void *param )
508 struct thread_apc *apc;
509 if (!thread->apc)
511 if (!(thread->apc = mem_alloc( MAX_THREAD_APC * sizeof(*apc) )))
512 return 0;
513 thread->apc_count = 0;
515 else if (thread->apc_count >= MAX_THREAD_APC) return 0;
516 thread->apc[thread->apc_count].func = func;
517 thread->apc[thread->apc_count].param = param;
518 thread->apc_count++;
519 if (thread->wait)
521 if (wake_thread( thread )) send_reply( thread );
523 return 1;
526 /* kill a thread on the spot */
527 void kill_thread( struct thread *thread, int exit_code )
529 if (thread->state == TERMINATED) return; /* already killed */
530 thread->state = TERMINATED;
531 thread->exit_code = exit_code;
532 if (current == thread) current = NULL;
533 if (debug_level) trace_kill( thread );
534 if (thread->wait) end_wait( thread );
535 debug_exit_thread( thread, exit_code );
536 abandon_mutexes( thread );
537 remove_process_thread( thread->process, thread );
538 wake_up( &thread->obj, 0 );
539 detach_thread( thread );
540 remove_select_user( &thread->obj );
541 release_object( thread );
544 /* create a new thread */
545 DECL_HANDLER(new_thread)
547 struct thread *thread;
548 struct process *process;
550 if ((process = get_process_from_id( req->pid )))
552 if ((fd = dup(fd)) != -1)
554 if ((thread = create_thread( fd, process, req->suspend )))
556 req->tid = thread;
557 if ((req->handle = alloc_handle( current->process, thread,
558 THREAD_ALL_ACCESS, req->inherit )) == -1)
559 release_object( thread );
560 /* else will be released when the thread gets killed */
562 else close( fd );
564 else file_set_error();
565 release_object( process );
569 /* retrieve the thread buffer file descriptor */
570 DECL_HANDLER(get_thread_buffer)
572 fatal_protocol_error( current, "get_thread_buffer: should never get called directly\n" );
575 /* initialize a new thread */
576 DECL_HANDLER(init_thread)
578 if (current->unix_pid)
580 fatal_protocol_error( current, "init_thread: already running\n" );
581 return;
583 current->unix_pid = req->unix_pid;
584 current->teb = req->teb;
585 if (current->suspend + current->process->suspend > 0) stop_thread( current );
586 req->pid = current->process;
587 req->tid = current;
590 /* terminate a thread */
591 DECL_HANDLER(terminate_thread)
593 struct thread *thread;
595 if ((thread = get_thread_from_handle( req->handle, THREAD_TERMINATE )))
597 kill_thread( thread, req->exit_code );
598 release_object( thread );
602 /* fetch information about a thread */
603 DECL_HANDLER(get_thread_info)
605 struct thread *thread;
607 if ((thread = get_thread_from_handle( req->handle, THREAD_QUERY_INFORMATION )))
609 req->tid = thread;
610 req->exit_code = thread->exit_code;
611 req->priority = thread->priority;
612 release_object( thread );
616 /* set information about a thread */
617 DECL_HANDLER(set_thread_info)
619 struct thread *thread;
621 if ((thread = get_thread_from_handle( req->handle, THREAD_SET_INFORMATION )))
623 set_thread_info( thread, req );
624 release_object( thread );
628 /* suspend a thread */
629 DECL_HANDLER(suspend_thread)
631 struct thread *thread;
633 if ((thread = get_thread_from_handle( req->handle, THREAD_SUSPEND_RESUME )))
635 req->count = suspend_thread( thread, 1 );
636 release_object( thread );
640 /* resume a thread */
641 DECL_HANDLER(resume_thread)
643 struct thread *thread;
645 if ((thread = get_thread_from_handle( req->handle, THREAD_SUSPEND_RESUME )))
647 req->count = resume_thread( thread );
648 release_object( thread );
652 /* select on a handle list */
653 DECL_HANDLER(select)
655 if (!select_on( req->count, req->handles, req->flags, req->timeout ))
656 req->signaled = -1;
659 /* queue an APC for a thread */
660 DECL_HANDLER(queue_apc)
662 struct thread *thread;
663 if ((thread = get_thread_from_handle( req->handle, THREAD_SET_CONTEXT )))
665 thread_queue_apc( thread, req->func, req->param );
666 release_object( thread );
670 /* get list of APC to call */
671 DECL_HANDLER(get_apcs)
673 if ((req->count = current->apc_count))
675 memcpy( req->apcs, current->apc, current->apc_count * sizeof(*current->apc) );
676 free( current->apc );
677 current->apc = NULL;
678 current->apc_count = 0;