2 * Server-side message queues
4 * Copyright (C) 2000 Alexandre Julliard
/* Fields of the per-message result record (used elsewhere in this file as
 * 'struct message_result'). One record ties a sent message to both the
 * sender and the receiver queue until the reply has been delivered.
 * NOTE(review): the line opening the structure is not visible here. */
22 struct message_result
*send_next
; /* next result on the sender's send_result stack */
23 struct message_result
*recv_next
; /* next result on the receiver's recv_result stack */
24 struct msg_queue
*sender
; /* sender queue (set to NULL once the sender no longer waits) */
25 struct msg_queue
*receiver
; /* receiver queue (set to NULL once the receiver is done with it) */
26 int replied
; /* has it been replied to? */
27 unsigned int result
; /* reply result */
28 unsigned int error
; /* error code to pass back to sender */
/* Fields of one queued message (used elsewhere in this file as
 * 'struct message'); messages are kept in doubly-linked lists.
 * NOTE(review): the line opening the structure is not visible here. */
33 struct message
*next
; /* next message in list */
34 struct message
*prev
; /* prev message in list */
35 int type
; /* message type (FIXME) */
36 handle_t win
; /* window handle */
37 unsigned int msg
; /* message code */
38 unsigned int wparam
; /* parameters */
39 unsigned int lparam
; /* parameters */
40 unsigned int info
; /* extra info */
41 struct message_result
*result
; /* result record attached by send_message() */
/* Head and tail pointers of a doubly-linked message list (used elsewhere
 * in this file as 'struct message_list'). */
46 struct message
*first
; /* head of list */
47 struct message
*last
; /* tail of list */
/* Fields of one window timer (used elsewhere in this file as 'struct timer').
 * link_timer() inserts timers into the queue's list by comparing 'when'. */
52 struct timer
*next
; /* next timer in list */
53 struct timer
*prev
; /* prev timer in list */
54 struct timeval when
; /* next expiration */
55 unsigned int rate
; /* timer rate in ms */
56 handle_t win
; /* window handle */
57 unsigned int msg
; /* message to post */
58 unsigned int id
; /* timer id */
59 unsigned int lparam
; /* lparam for message */
/* Fields of the message queue object (used elsewhere in this file as
 * 'struct msg_queue'). The queue is signaled when a wake bit matches the
 * wake mask or a changed bit matches the changed mask (see is_signaled). */
64 struct object obj
; /* object header */
65 unsigned int wake_bits
; /* wakeup bits */
66 unsigned int wake_mask
; /* wakeup mask */
67 unsigned int changed_bits
; /* changed wakeup bits */
68 unsigned int changed_mask
; /* changed wakeup mask */
69 struct message_list send_list
; /* list of sent messages */
70 struct message_list post_list
; /* list of posted messages */
71 struct message_result
*send_result
; /* stack of sent messages waiting for result */
72 struct message_result
*recv_result
; /* stack of received messages waiting for result */
73 struct timer
*first_timer
; /* head of timer list */
74 struct timer
*last_timer
; /* tail of timer list */
75 struct timer
*next_timer
; /* next timer to expire (timers before it in the list are treated as expired) */
76 struct timeout_user
*timeout
; /* timeout for next timer to expire */
/* forward declarations: the message queue object operations referenced by
 * msg_queue_ops, and the callback handed to add_timeout_user() */
79 static void msg_queue_dump( struct object
*obj
, int verbose
);
80 static int msg_queue_add_queue( struct object
*obj
, struct wait_queue_entry
*entry
);
81 static void msg_queue_remove_queue( struct object
*obj
, struct wait_queue_entry
*entry
);
82 static int msg_queue_signaled( struct object
*obj
, struct thread
*thread
);
83 static int msg_queue_satisfied( struct object
*obj
, struct thread
*thread
);
84 static void msg_queue_destroy( struct object
*obj
);
85 static void timer_callback( void *private );
/* operations table passed to alloc_object() when a message queue is created;
 * the trailing comment on each entry names the slot it fills.
 * NOTE(review): the numbering jumps from 97 to 99, so one slot between
 * get_fd and get_file_info is not visible here -- check the full file. */
87 static const struct object_ops msg_queue_ops
=
89 sizeof(struct msg_queue
), /* size */
90 msg_queue_dump
, /* dump */
91 msg_queue_add_queue
, /* add_queue */
92 msg_queue_remove_queue
, /* remove_queue */
93 msg_queue_signaled
, /* signaled */
94 msg_queue_satisfied
, /* satisfied */
95 NULL
, /* get_poll_events */
96 NULL
, /* poll_event */
97 no_get_fd
, /* get_fd */
99 no_get_file_info
, /* get_file_info */
100 msg_queue_destroy
/* destroy */
104 static struct msg_queue
*create_msg_queue( struct thread
*thread
)
106 struct msg_queue
*queue
;
108 if ((queue
= alloc_object( &msg_queue_ops
, -1 )))
110 queue
->wake_bits
= 0;
111 queue
->wake_mask
= 0;
112 queue
->changed_bits
= 0;
113 queue
->changed_mask
= 0;
114 queue
->send_list
.first
= NULL
;
115 queue
->send_list
.last
= NULL
;
116 queue
->post_list
.first
= NULL
;
117 queue
->post_list
.last
= NULL
;
118 queue
->send_result
= NULL
;
119 queue
->recv_result
= NULL
;
120 queue
->first_timer
= NULL
;
121 queue
->last_timer
= NULL
;
122 queue
->next_timer
= NULL
;
123 queue
->timeout
= NULL
;
124 thread
->queue
= queue
;
125 if (!thread
->process
->queue
)
126 thread
->process
->queue
= (struct msg_queue
*)grab_object( queue
);
131 /* check the queue status */
132 inline static int is_signaled( struct msg_queue
*queue
)
134 return ((queue
->wake_bits
& queue
->wake_mask
) || (queue
->changed_bits
& queue
->changed_mask
));
137 /* set/clear some queue bits */
138 inline static void change_queue_bits( struct msg_queue
*queue
, unsigned int set
, unsigned int clear
)
140 queue
->wake_bits
= (queue
->wake_bits
| set
) & ~clear
;
141 queue
->changed_bits
= (queue
->changed_bits
| set
) & ~clear
;
142 if (is_signaled( queue
)) wake_up( &queue
->obj
, 0 );
145 /* get the current thread queue, creating it if needed */
146 inline struct msg_queue
*get_current_queue(void)
148 struct msg_queue
*queue
= current
->queue
;
149 if (!queue
) queue
= create_msg_queue( current
);
153 /* append a message to the end of a list */
154 inline static void append_message( struct message_list
*list
, struct message
*msg
)
157 if ((msg
->prev
= list
->last
)) msg
->prev
->next
= msg
;
158 else list
->first
= msg
;
162 /* unlink a message from a list it */
163 inline static void unlink_message( struct message_list
*list
, struct message
*msg
)
165 if (msg
->next
) msg
->next
->prev
= msg
->prev
;
166 else list
->last
= msg
->prev
;
167 if (msg
->prev
) msg
->prev
->next
= msg
->next
;
168 else list
->first
= msg
->next
;
171 /* free a message when deleting a queue or window
 *
 * The visible statements mark the attached result as failed with
 * STATUS_ACCESS_DENIED, detach it from the receiver, and wake the sender
 * when this result is on top of the sender's send_result stack.
 *
 * NOTE(review): the numbering jumps (174 -> 180, 180 -> 182) and stops at
 * 185, so some statements of this function are not visible here. No NULL
 * check of 'result' or 'result->sender' and no release of 'msg' can be
 * seen -- confirm against the full file before relying on this. */
172 static void free_message( struct message
*msg
)
174 struct message_result
*result
= msg
->result
;
180 result
->error
= STATUS_ACCESS_DENIED
; /* FIXME */
182 result
->receiver
= NULL
;
183 /* wake sender queue if waiting on this result */
184 if (result
->sender
->send_result
== result
)
185 change_queue_bits( result
->sender
, QS_SMRESULT
, 0 );
192 /* remove (and free) a message from the sent messages list */
193 static void remove_sent_message( struct msg_queue
*queue
, struct message
*msg
)
195 unlink_message( &queue
->send_list
, msg
);
197 if (!queue
->send_list
.first
) change_queue_bits( queue
, 0, QS_SENDMESSAGE
);
200 /* remove (and free) a message from the posted messages list */
201 static void remove_posted_message( struct msg_queue
*queue
, struct message
*msg
)
203 unlink_message( &queue
->post_list
, msg
);
205 if (!queue
->post_list
.first
) change_queue_bits( queue
, 0, QS_POSTMESSAGE
);
208 /* send a message from the sender queue to the receiver queue */
209 static int send_message( struct msg_queue
*send_queue
, struct msg_queue
*recv_queue
,
210 struct message
*msg
)
212 struct message_result
*result
= mem_alloc( sizeof(*result
) );
213 if (!result
) return 0;
215 /* put the result on the sender result stack */
216 result
->sender
= send_queue
;
217 result
->receiver
= recv_queue
;
219 result
->send_next
= send_queue
->send_result
;
220 send_queue
->send_result
= result
;
222 /* and put the message on the receiver queue */
223 msg
->result
= result
;
224 append_message( &recv_queue
->send_list
, msg
);
225 change_queue_bits( recv_queue
, QS_SENDMESSAGE
, 0 );
229 /* receive a message, removing it from the sent queue */
230 static void receive_message( struct msg_queue
*queue
, struct message
*msg
)
232 struct message_result
*result
= msg
->result
;
234 unlink_message( &queue
->send_list
, msg
);
235 /* put the result on the receiver result stack */
236 result
->recv_next
= queue
->recv_result
;
237 queue
->recv_result
= result
;
239 if (!queue
->send_list
.first
) change_queue_bits( queue
, 0, QS_SENDMESSAGE
);
242 /* set the result of the current received message
 *
 * Works on the result record on top of the queue's recv_result stack.
 * The visible statements pop that record from the stack, detach it from
 * the receiver, store the reply value, and wake the sender when the record
 * is on top of the sender's send_result stack.
 *
 * NOTE(review): the numbering has gaps (246 -> 250, 252 -> 260, 260 -> 263),
 * so the conditions under which each statement runs -- how 'remove' and
 * 'error' are used, and what happens when no sender is waiting -- are not
 * visible here; confirm against the full file. */
243 static void reply_message( struct msg_queue
*queue
, unsigned int result
,
244 unsigned int error
, int remove
)
246 struct message_result
*res
= queue
->recv_result
;
250 queue
->recv_result
= res
->recv_next
;
251 res
->receiver
= NULL
;
252 if (!res
->sender
) /* no one waiting for it */
260 res
->result
= result
;
263 /* wake sender queue if waiting on this result */
264 if (res
->sender
&& res
->sender
->send_result
== res
)
265 change_queue_bits( res
->sender
, QS_SMRESULT
, 0 );
269 /* retrieve the reply of the current message being sent
 *
 * Looks at the result on top of the queue's send_result stack. The error
 * is preset to STATUS_PENDING; when the result has been replied to, or
 * 'cancel' is set, the result is popped from the stack, freed if the
 * receiver no longer references it, and QS_SMRESULT is cleared unless the
 * new top of the stack already has a reply.
 *
 * NOTE(review): the numbering has gaps (277 -> 282, 284 -> 286, nothing
 * after 288), so the assignment of 'ret', the exact condition guarding
 * set_error( res->error ) and the return statement are not visible here. */
270 static unsigned int get_message_reply( struct msg_queue
*queue
, int cancel
)
272 struct message_result
*res
= queue
->send_result
;
273 unsigned int ret
= 0;
275 set_error( STATUS_PENDING
);
277 if (res
&& (res
->replied
|| cancel
))
282 set_error( res
->error
);
284 queue
->send_result
= res
->send_next
;
286 if (!res
->receiver
) free( res
);
287 if (!queue
->send_result
|| !queue
->send_result
->replied
)
288 change_queue_bits( queue
, 0, QS_SMRESULT
);
293 /* empty a message list and free all the messages */
294 static void empty_msg_list( struct message_list
*list
)
296 struct message
*msg
= list
->first
;
299 struct message
*next
= msg
->next
;
305 /* cleanup all pending results when deleting a queue */
306 static void cleanup_results( struct msg_queue
*queue
)
308 struct message_result
*result
, *next
;
310 result
= queue
->send_result
;
313 next
= result
->send_next
;
314 result
->sender
= NULL
;
315 if (!result
->receiver
) free( result
);
319 while (queue
->recv_result
) reply_message( queue
, 0, STATUS_ACCESS_DENIED
/*FIXME*/, 1 );
/* add_queue operation of the message queue object
 *
 * Sets STATUS_ACCESS_DENIED when the waiting thread does not own the queue;
 * otherwise sets the process idle event when the queue is the process
 * queue, and registers the wait entry with add_queue().
 *
 * NOTE(review): the numbering has gaps (328 -> 330 -> 333, nothing after
 * 338), so the early exit after the error and the values returned are not
 * visible here; confirm against the full file. */
322 static int msg_queue_add_queue( struct object
*obj
, struct wait_queue_entry
*entry
)
324 struct msg_queue
*queue
= (struct msg_queue
*)obj
;
325 struct process
*process
= entry
->thread
->process
;
327 /* a thread can only wait on its own queue */
328 if (entry
->thread
->queue
!= queue
)
330 set_error( STATUS_ACCESS_DENIED
);
333 /* if waiting on the main process queue, set the idle event */
334 if (process
->queue
== queue
)
336 if (process
->idle_event
) set_event( process
->idle_event
);
338 add_queue( obj
, entry
);
342 static void msg_queue_remove_queue(struct object
*obj
, struct wait_queue_entry
*entry
)
344 struct msg_queue
*queue
= (struct msg_queue
*)obj
;
345 struct process
*process
= entry
->thread
->process
;
347 remove_queue( obj
, entry
);
349 assert( entry
->thread
->queue
== queue
);
351 /* if waiting on the main process queue, reset the idle event */
352 if (process
->queue
== queue
)
354 if (process
->idle_event
) reset_event( process
->idle_event
);
358 static void msg_queue_dump( struct object
*obj
, int verbose
)
360 struct msg_queue
*queue
= (struct msg_queue
*)obj
;
361 fprintf( stderr
, "Msg queue bits=%x mask=%x\n",
362 queue
->wake_bits
, queue
->wake_mask
);
/* signaled operation of the message queue object: reports the result of
 * is_signaled() for the queue ('thread' is not used) */
static int msg_queue_signaled( struct object *obj, struct thread *thread )
{
    return is_signaled( (struct msg_queue *)obj );
}
371 static int msg_queue_satisfied( struct object
*obj
, struct thread
*thread
)
373 struct msg_queue
*queue
= (struct msg_queue
*)obj
;
374 queue
->wake_mask
= 0;
375 queue
->changed_mask
= 0;
376 return 0; /* Not abandoned */
379 static void msg_queue_destroy( struct object
*obj
)
381 struct msg_queue
*queue
= (struct msg_queue
*)obj
;
382 struct timer
*timer
= queue
->first_timer
;
384 cleanup_results( queue
);
385 empty_msg_list( &queue
->send_list
);
386 empty_msg_list( &queue
->post_list
);
390 struct timer
*next
= timer
->next
;
394 if (queue
->timeout
) remove_timeout_user( queue
->timeout
);
397 /* set the next timer to expire */
398 static void set_next_timer( struct msg_queue
*queue
, struct timer
*timer
)
402 remove_timeout_user( queue
->timeout
);
403 queue
->timeout
= NULL
;
405 if ((queue
->next_timer
= timer
))
406 queue
->timeout
= add_timeout_user( &timer
->when
, timer_callback
, queue
);
408 /* set/clear QS_TIMER bit */
409 if (queue
->next_timer
== queue
->first_timer
)
410 change_queue_bits( queue
, 0, QS_TIMER
);
412 change_queue_bits( queue
, QS_TIMER
, 0 );
415 /* callback for the next timer expiration */
416 static void timer_callback( void *private )
418 struct msg_queue
*queue
= private;
420 queue
->timeout
= NULL
;
421 /* move on to the next timer */
422 set_next_timer( queue
, queue
->next_timer
->next
);
425 /* link a timer at its rightful place in the queue list */
426 static void link_timer( struct msg_queue
*queue
, struct timer
*timer
)
428 struct timer
*pos
= queue
->next_timer
;
430 while (pos
&& time_before( &pos
->when
, &timer
->when
)) pos
= pos
->next
;
432 if (pos
) /* insert before pos */
434 if ((timer
->prev
= pos
->prev
)) timer
->prev
->next
= timer
;
435 else queue
->first_timer
= timer
;
439 else /* insert at end */
442 timer
->prev
= queue
->last_timer
;
443 if (queue
->last_timer
) queue
->last_timer
->next
= timer
;
444 else queue
->first_timer
= timer
;
445 queue
->last_timer
= timer
;
447 /* check if we replaced the next timer */
448 if (pos
== queue
->next_timer
) set_next_timer( queue
, timer
);
451 /* remove a timer from the queue timer list */
452 static void unlink_timer( struct msg_queue
*queue
, struct timer
*timer
)
454 if (timer
->next
) timer
->next
->prev
= timer
->prev
;
455 else queue
->last_timer
= timer
->prev
;
456 if (timer
->prev
) timer
->prev
->next
= timer
->next
;
457 else queue
->first_timer
= timer
->next
;
458 /* check if we removed the next timer */
459 if (queue
->next_timer
== timer
) set_next_timer( queue
, timer
->next
);
462 /* restart an expired timer */
463 static void restart_timer( struct msg_queue
*queue
, struct timer
*timer
)
466 unlink_timer( queue
, timer
);
467 gettimeofday( &now
, 0 );
468 while (!time_before( &now
, &timer
->when
)) add_timeout( &timer
->when
, timer
->rate
);
469 link_timer( queue
, timer
);
472 /* find an expired timer matching the filtering parameters */
473 static struct timer
*find_expired_timer( struct msg_queue
*queue
, handle_t win
,
474 unsigned int get_first
, unsigned int get_last
,
478 for (timer
= queue
->first_timer
; (timer
&& timer
!= queue
->next_timer
); timer
= timer
->next
)
480 if (win
&& timer
->win
!= win
) continue;
481 if (timer
->msg
>= get_first
&& timer
->msg
<= get_last
)
483 if (remove
) restart_timer( queue
, timer
);
491 static int kill_timer( struct msg_queue
*queue
, handle_t win
, unsigned int msg
, unsigned int id
)
495 for (timer
= queue
->first_timer
; timer
; timer
= timer
->next
)
497 if (timer
->win
!= win
|| timer
->msg
!= msg
|| timer
->id
!= id
) continue;
498 unlink_timer( queue
, timer
);
506 static struct timer
*set_timer( struct msg_queue
*queue
, unsigned int rate
)
508 struct timer
*timer
= mem_alloc( sizeof(*timer
) );
512 gettimeofday( &timer
->when
, 0 );
513 add_timeout( &timer
->when
, rate
);
514 link_timer( queue
, timer
);
519 /* remove all messages and timers belonging to a certain window */
520 static void cleanup_window( struct msg_queue
*queue
, handle_t win
)
526 timer
= queue
->first_timer
;
529 struct timer
*next
= timer
->next
;
530 if (timer
->win
== win
)
532 unlink_timer( queue
, timer
);
538 /* remove sent messages */
539 msg
= queue
->send_list
.first
;
542 struct message
*next
= msg
->next
;
543 if (msg
->win
== win
) remove_sent_message( queue
, msg
);
547 /* remove posted messages */
548 msg
= queue
->post_list
.first
;
551 struct message
*next
= msg
->next
;
552 if (msg
->win
== win
) remove_posted_message( queue
, msg
);
557 /* get the message queue of the current thread */
558 DECL_HANDLER(get_msg_queue
)
560 struct msg_queue
*queue
= get_current_queue();
563 if (queue
) req
->handle
= alloc_handle( current
->process
, queue
, SYNCHRONIZE
, 0 );
567 /* set the message queue wake bits
 *
 * The queue is looked up from the request handle. The reply carries the
 * queue's current changed mask; the set/clear bits of the request are
 * applied when no mask condition is given, or when the changed mask has a
 * bit in common with it. The reference obtained from the lookup is released
 * afterwards.
 *
 * NOTE(review): the get_handle_obj() call is cut off after its second
 * argument and the numbering jumps from 570 to 574, so the remaining
 * arguments and any check of the returned pointer are not visible here. */
568 DECL_HANDLER(set_queue_bits
)
570 struct msg_queue
*queue
= (struct msg_queue
*)get_handle_obj( current
->process
, req
->handle
,
574 req
->changed_mask
= queue
->changed_mask
;
575 if (!req
->mask_cond
|| (queue
->changed_mask
& req
->mask_cond
))
576 change_queue_bits( queue
, req
->set
, req
->clear
);
577 release_object( queue
);
582 /* set the current message queue wakeup mask */
583 DECL_HANDLER(set_queue_mask
)
585 struct msg_queue
*queue
= get_current_queue();
589 queue
->wake_mask
= req
->wake_mask
;
590 queue
->changed_mask
= req
->changed_mask
;
591 req
->wake_bits
= queue
->wake_bits
;
592 req
->changed_bits
= queue
->changed_bits
;
593 if (is_signaled( queue
))
595 /* if skip wait is set, do what would have been done in the subsequent wait */
596 if (req
->skip_wait
) msg_queue_satisfied( &queue
->obj
, current
);
597 else wake_up( &queue
->obj
, 0 );
603 /* get the current message queue status */
604 DECL_HANDLER(get_queue_status
)
606 struct msg_queue
*queue
= current
->queue
;
609 req
->wake_bits
= queue
->wake_bits
;
610 req
->changed_bits
= queue
->changed_bits
;
611 if (req
->clear
) queue
->changed_bits
= 0;
613 else req
->wake_bits
= req
->changed_bits
= 0;
617 /* send a message to a thread queue
 *
 * The target thread is looked up from the request id; a target without a
 * queue yields STATUS_INVALID_PARAMETER. A message is allocated and filled
 * from the request, then either sent through send_message() (when
 * req->posted is not set) or appended to the target's post_list with
 * QS_POSTMESSAGE raised. The thread reference is released at the end.
 *
 * NOTE(review): the numbering has several gaps (618 -> 621, 623 -> 627,
 * 630 -> 634, 636 -> 639, 641 -> 643 -> 646), so the declaration of 'msg',
 * the check of 'thread', the early exit after the error, the remaining
 * field assignments and the branch structure around the posted case are
 * not visible here.
 * NOTE(review): the return value of send_message() is ignored on the
 * visible line; if it fails, 'msg' appears to be neither queued nor freed
 * -- confirm against the full file. */
618 DECL_HANDLER(send_message
)
621 struct msg_queue
*send_queue
= get_current_queue();
622 struct msg_queue
*recv_queue
;
623 struct thread
*thread
= get_thread_from_id( req
->id
);
627 if (!(recv_queue
= thread
->queue
))
629 set_error( STATUS_INVALID_PARAMETER
);
630 release_object( thread
);
634 if ((msg
= mem_alloc( sizeof(*msg
) )))
636 msg
->type
= req
->type
;
639 msg
->wparam
= req
->wparam
;
640 msg
->lparam
= req
->lparam
;
641 msg
->info
= req
->info
;
643 if (!req
->posted
) send_message( send_queue
, recv_queue
, msg
);
646 append_message( &recv_queue
->post_list
, msg
);
647 change_queue_bits( recv_queue
, QS_POSTMESSAGE
, 0 );
650 release_object( thread
);
653 /* get a message from the current queue
 *
 * Sources are tried in this order: sent messages, posted messages (filtered
 * by window and message range), WM_PAINT, then expired timers. When nothing
 * is found the error is set to STATUS_PENDING.
 *
 * NOTE(review): the numbering has many gaps (654 -> 658 -> 662, 663 -> 666,
 * 666 -> 669, 672 -> 675, 692 -> 697, 699 -> 710, 718 -> 724), so the
 * declarations of 'msg' and 'timer', any check of 'queue', the remaining
 * reply fields, the body of the WM_PAINT case, the 'done' label and the
 * exits after each successful case are not visible here. */
654 DECL_HANDLER(get_message
)
658 struct msg_queue
*queue
= get_current_queue();
662 /* first check for sent messages */
663 if ((msg
= queue
->send_list
.first
))
/* copy the message into the reply before handing it to receive_message() */
666 req
->type
= msg
->type
;
669 req
->wparam
= msg
->wparam
;
670 req
->lparam
= msg
->lparam
;
671 req
->info
= msg
->info
;
672 receive_message( queue
, msg
);
675 if (!req
->posted
) goto done
; /* nothing else to check */
677 /* then try a posted message */
679 for (msg
= queue
->post_list
.first
; msg
; msg
= msg
->next
)
681 /* check against the filters: a zero get_win matches any window */
682 if (req
->get_win
&& msg
->win
!= req
->get_win
) continue;
683 if (msg
->msg
>= req
->get_first
&& msg
->msg
<= req
->get_last
)
686 req
->type
= msg
->type
;
689 req
->wparam
= msg
->wparam
;
690 req
->lparam
= msg
->lparam
;
691 req
->info
= msg
->info
;
/* the message stays queued unless removal was requested */
692 if (req
->remove
) remove_posted_message( queue
, msg
);
697 /* now check for WM_PAINT */
698 if ((queue
->wake_bits
& QS_PAINT
) &&
699 (WM_PAINT
>= req
->get_first
) && (WM_PAINT
<= req
->get_last
))
710 /* now check for timer */
711 if ((timer
= find_expired_timer( queue
, req
->get_win
, req
->get_first
,
712 req
->get_last
, req
->remove
)))
/* a timer is reported as a message carrying the timer id in wparam */
715 req
->win
= timer
->win
;
716 req
->msg
= timer
->msg
;
717 req
->wparam
= timer
->id
;
718 req
->lparam
= timer
->lparam
;
724 set_error( STATUS_PENDING
); /* FIXME */
728 /* reply to a sent message */
729 DECL_HANDLER(reply_message
)
731 if (current
->queue
&& current
->queue
->recv_result
)
732 reply_message( current
->queue
, req
->result
, 0, req
->remove
);
734 set_error( STATUS_ACCESS_DENIED
);
738 /* retrieve the reply for the last message sent */
739 DECL_HANDLER(get_message_reply
)
741 if (current
->queue
) req
->result
= get_message_reply( current
->queue
, req
->cancel
);
742 else set_error( STATUS_ACCESS_DENIED
);
746 /* check if we are processing a sent message
 *
 * Looks at the result on top of the current queue's recv_result stack:
 * ISMEX_SEND is added to the flags, plus ISMEX_REPLIED when that result has
 * already been replied to or no sender is waiting for it any more.
 *
 * NOTE(review): the numbering has gaps (747 -> 753 -> 756, nothing after
 * 757), so the declaration of 'flags', any checks of current->queue and
 * 'result', and the statement storing the flags in the reply are not
 * visible here. */
747 DECL_HANDLER(in_send_message
)
753 struct message_result
*result
= current
->queue
->recv_result
;
756 flags
|= ISMEX_SEND
; /* FIXME */
757 if (result
->replied
|| !result
->sender
) flags
|= ISMEX_REPLIED
;
764 /* cleanup a queue when a window is deleted */
765 DECL_HANDLER(cleanup_window_queue
)
767 if (current
->queue
) cleanup_window( current
->queue
, req
->win
);
771 /* set a window timer */
772 DECL_HANDLER(set_win_timer
)
775 struct msg_queue
*queue
= get_current_queue();
779 /* remove it if it existed already */
780 if (req
->win
) kill_timer( queue
, req
->win
, req
->msg
, req
->id
);
782 if ((timer
= set_timer( queue
, req
->rate
)))
784 timer
->win
= req
->win
;
785 timer
->msg
= req
->msg
;
787 timer
->lparam
= req
->lparam
;
791 /* kill a window timer */
792 DECL_HANDLER(kill_win_timer
)
794 struct msg_queue
*queue
= current
->queue
;
796 if (!queue
|| !kill_timer( queue
, req
->win
, req
->msg
, req
->id
))
797 set_error( STATUS_INVALID_PARAMETER
);