2 * Copyright (c) 2006 Ondrej Palkovsky
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
9 * - Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * - The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 * Asynchronous library
38 * The aim of this library is to provide a facility for writing programs which
39 * utilize the asynchronous nature of HelenOS IPC, yet using a normal way of
42 * You should be able to write very simple multithreaded programs. The async
43 * framework will automatically take care of most of the synchronization
46 * Example of use (pseudo C):
48 * 1) Multithreaded client application
50 * fibril_create(fibril1, ...);
51 * fibril_create(fibril2, ...);
54 * int fibril1(void *arg)
56 * conn = async_connect_me_to(...);
58 * exch = async_exchange_begin(conn);
59 * c1 = async_send(exch);
60 * async_exchange_end(exch);
62 * exch = async_exchange_begin(conn);
63 * c2 = async_send(exch);
64 * async_exchange_end(exch);
72 * 2) Multithreaded server application
79 * port_handler(ichandle, *icall)
82 * async_answer_0(ichandle, ELIMIT);
85 * async_answer_0(ichandle, EOK);
87 * chandle = async_get_call(&call);
88 * somehow_handle_the_call(chandle, call);
89 * async_answer_2(chandle, 1, 2, 3);
91 * chandle = async_get_call(&call);
100 #include "../private/async.h"
104 #include <ipc/event.h>
107 #include <adt/hash_table.h>
108 #include <adt/hash.h>
109 #include <adt/list.h>
112 #include <sys/time.h>
113 #include <libarch/barrier.h>
120 #include <abi/mm/as.h>
121 #include "../private/libc.h"
122 #include "../private/fibril.h"
124 /** Async framework global futex */
125 futex_t async_futex
= FUTEX_INITIALIZER
;
127 /** Number of threads waiting for IPC in the kernel. */
128 static atomic_t threads_in_ipc_wait
= { 0 };
134 cap_call_handle_t chandle
;
138 /* Client connection data */
142 task_id_t in_task_id
;
147 /* Server connection data */
151 /** Hash table link. */
154 /** Incoming client task ID. */
155 task_id_t in_task_id
;
157 /** Incoming phone hash. */
158 sysarg_t in_phone_hash
;
160 /** Link to the client tracking structure. */
163 /** Messages that should be delivered to this fibril. */
166 /** Identification of the opening call. */
167 cap_call_handle_t chandle
;
169 /** Call data of the opening call. */
172 /** Identification of the closing call. */
173 cap_call_handle_t close_chandle
;
175 /** Fibril function that will be used to handle the connection. */
176 async_port_handler_t handler
;
182 /* Notification data */
184 /** notification_hash_table link */
187 /** notification_queue link */
190 /** Notification method */
193 /** Notification handler */
194 async_notification_handler_t handler
;
196 /** Notification handler argument */
199 /** Data of the most recent notification. */
203 * How many notifications with this `imethod` arrived since it was last
204 * handled. If `count` > 1, `calldata` only holds the data for the most
205 * recent such notification, all the older data being lost.
207 * `async_spawn_notification_handler()` can be used to increase the
208 * number of notifications that can be processed simultaneously,
209 * reducing the likelihood of losing them when the handler blocks.
214 /** Identifier of the incoming connection handled by the current fibril. */
215 static fibril_local connection_t
*fibril_connection
;
217 static void *default_client_data_constructor(void)
222 static void default_client_data_destructor(void *data
)
226 static async_client_data_ctor_t async_client_data_create
=
227 default_client_data_constructor
;
228 static async_client_data_dtor_t async_client_data_destroy
=
229 default_client_data_destructor
;
231 void async_set_client_data_constructor(async_client_data_ctor_t ctor
)
233 assert(async_client_data_create
== default_client_data_constructor
);
234 async_client_data_create
= ctor
;
237 void async_set_client_data_destructor(async_client_data_dtor_t dtor
)
239 assert(async_client_data_destroy
== default_client_data_destructor
);
240 async_client_data_destroy
= dtor
;
243 static futex_t client_futex
= FUTEX_INITIALIZER
;
244 static hash_table_t client_hash_table
;
246 // TODO: lockfree notification_queue?
247 static futex_t notification_futex
= FUTEX_INITIALIZER
;
248 static hash_table_t notification_hash_table
;
249 static LIST_INITIALIZE(notification_queue
);
250 static FIBRIL_SEMAPHORE_INITIALIZE(notification_semaphore
, 0);
252 static sysarg_t notification_avail
= 0;
254 /* The remaining structures are guarded by async_futex. */
255 static hash_table_t conn_hash_table
;
256 static LIST_INITIALIZE(timeout_list
);
258 static size_t client_key_hash(void *key
)
260 task_id_t in_task_id
= *(task_id_t
*) key
;
264 static size_t client_hash(const ht_link_t
*item
)
266 client_t
*client
= hash_table_get_inst(item
, client_t
, link
);
267 return client_key_hash(&client
->in_task_id
);
270 static bool client_key_equal(void *key
, const ht_link_t
*item
)
272 task_id_t in_task_id
= *(task_id_t
*) key
;
273 client_t
*client
= hash_table_get_inst(item
, client_t
, link
);
274 return in_task_id
== client
->in_task_id
;
277 /** Operations for the client hash table. */
278 static hash_table_ops_t client_hash_table_ops
= {
280 .key_hash
= client_key_hash
,
281 .key_equal
= client_key_equal
,
283 .remove_callback
= NULL
291 /** Compute hash into the connection hash table
293 * The hash is based on the source task ID and the source phone hash. The task
294 * ID is included in the hash because a phone hash alone might not be unique
295 * while we still track connections for killed tasks due to kernel's recycling
296 * of phone structures.
298 * @param key Pointer to the connection key structure.
300 * @return Index into the connection hash table.
303 static size_t conn_key_hash(void *key
)
305 conn_key_t
*ck
= (conn_key_t
*) key
;
308 hash
= hash_combine(hash
, LOWER32(ck
->task_id
));
309 hash
= hash_combine(hash
, UPPER32(ck
->task_id
));
310 hash
= hash_combine(hash
, ck
->phone_hash
);
314 static size_t conn_hash(const ht_link_t
*item
)
316 connection_t
*conn
= hash_table_get_inst(item
, connection_t
, link
);
317 return conn_key_hash(&(conn_key_t
){
318 .task_id
= conn
->in_task_id
,
319 .phone_hash
= conn
->in_phone_hash
323 static bool conn_key_equal(void *key
, const ht_link_t
*item
)
325 conn_key_t
*ck
= (conn_key_t
*) key
;
326 connection_t
*conn
= hash_table_get_inst(item
, connection_t
, link
);
327 return ((ck
->task_id
== conn
->in_task_id
) &&
328 (ck
->phone_hash
== conn
->in_phone_hash
));
331 /** Operations for the connection hash table. */
332 static hash_table_ops_t conn_hash_table_ops
= {
334 .key_hash
= conn_key_hash
,
335 .key_equal
= conn_key_equal
,
337 .remove_callback
= NULL
340 static client_t
*async_client_get(task_id_t client_id
, bool create
)
342 client_t
*client
= NULL
;
344 futex_lock(&client_futex
);
345 ht_link_t
*link
= hash_table_find(&client_hash_table
, &client_id
);
347 client
= hash_table_get_inst(link
, client_t
, link
);
348 atomic_inc(&client
->refcnt
);
350 // TODO: move the malloc out of critical section
351 client
= malloc(sizeof(client_t
));
353 client
->in_task_id
= client_id
;
354 client
->data
= async_client_data_create();
356 atomic_set(&client
->refcnt
, 1);
357 hash_table_insert(&client_hash_table
, &client
->link
);
361 futex_unlock(&client_futex
);
365 static void async_client_put(client_t
*client
)
369 futex_lock(&client_futex
);
371 if (atomic_predec(&client
->refcnt
) == 0) {
372 hash_table_remove(&client_hash_table
, &client
->in_task_id
);
377 futex_unlock(&client_futex
);
381 async_client_data_destroy(client
->data
);
387 /** Wrapper for client connection fibril.
389 * When a new connection arrives, a fibril with this implementing
390 * function is created.
392 * @param arg Connection structure pointer.
394 * @return Always zero.
397 static errno_t
connection_fibril(void *arg
)
402 * Setup fibril-local connection pointer.
404 fibril_connection
= (connection_t
*) arg
;
407 * Add our reference for the current connection in the client task
408 * tracking structure. If this is the first reference, create and
409 * hash in a new tracking structure.
412 client_t
*client
= async_client_get(fibril_connection
->in_task_id
, true);
414 ipc_answer_0(fibril_connection
->chandle
, ENOMEM
);
418 fibril_connection
->client
= client
;
421 * Call the connection handler function.
423 fibril_connection
->handler(fibril_connection
->chandle
,
424 &fibril_connection
->call
, fibril_connection
->data
);
427 * Remove the reference for this client task connection.
429 async_client_put(client
);
432 * Remove myself from the connection hash table.
434 futex_lock(&async_futex
);
435 hash_table_remove(&conn_hash_table
, &(conn_key_t
){
436 .task_id
= fibril_connection
->in_task_id
,
437 .phone_hash
= fibril_connection
->in_phone_hash
439 futex_unlock(&async_futex
);
442 * Answer all remaining messages with EHANGUP.
444 while (!list_empty(&fibril_connection
->msg_queue
)) {
446 list_get_instance(list_first(&fibril_connection
->msg_queue
),
449 list_remove(&msg
->link
);
450 ipc_answer_0(msg
->chandle
, EHANGUP
);
455 * If the connection was hung-up, answer the last call,
456 * i.e. IPC_M_PHONE_HUNGUP.
458 if (fibril_connection
->close_chandle
)
459 ipc_answer_0(fibril_connection
->close_chandle
, EOK
);
461 free(fibril_connection
);
465 /** Create a new fibril for a new connection.
467 * Create new fibril for connection, fill in connection structures and insert it
468 * into the hash table, so that later we can easily do routing of messages to
469 * particular fibrils.
471 * @param in_task_id Identification of the incoming connection.
472 * @param in_phone_hash Identification of the incoming connection.
473 * @param chandle Handle of the opening IPC_M_CONNECT_ME_TO call.
474 * If chandle is CAP_NIL, the connection was opened by
475 * accepting the IPC_M_CONNECT_TO_ME call and this
476 * function is called directly by the server.
477 * @param call Call data of the opening call.
478 * @param handler Connection handler.
479 * @param data Client argument to pass to the connection handler.
481 * @return New fibril id or NULL on failure.
484 static fid_t
async_new_connection(task_id_t in_task_id
, sysarg_t in_phone_hash
,
485 cap_call_handle_t chandle
, ipc_call_t
*call
, async_port_handler_t handler
,
488 connection_t
*conn
= malloc(sizeof(*conn
));
490 if (chandle
!= CAP_NIL
)
491 ipc_answer_0(chandle
, ENOMEM
);
493 return (uintptr_t) NULL
;
496 conn
->in_task_id
= in_task_id
;
497 conn
->in_phone_hash
= in_phone_hash
;
498 list_initialize(&conn
->msg_queue
);
499 conn
->chandle
= chandle
;
500 conn
->close_chandle
= CAP_NIL
;
501 conn
->handler
= handler
;
507 /* We will activate the fibril ASAP */
508 conn
->wdata
.active
= true;
509 conn
->wdata
.fid
= fibril_create(connection_fibril
, conn
);
511 if (conn
->wdata
.fid
== 0) {
514 if (chandle
!= CAP_NIL
)
515 ipc_answer_0(chandle
, ENOMEM
);
517 return (uintptr_t) NULL
;
520 /* Add connection to the connection hash table */
522 futex_lock(&async_futex
);
523 hash_table_insert(&conn_hash_table
, &conn
->link
);
524 futex_unlock(&async_futex
);
526 fibril_add_ready(conn
->wdata
.fid
);
528 return conn
->wdata
.fid
;
531 /** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework.
533 * Ask through phone for a new connection to some service.
535 * @param exch Exchange for sending the message.
536 * @param iface Callback interface.
537 * @param arg1 User defined argument.
538 * @param arg2 User defined argument.
539 * @param handler Callback handler.
540 * @param data Handler data.
541 * @param port_id ID of the newly created port.
543 * @return Zero on success or an error code.
546 errno_t
async_create_callback_port(async_exch_t
*exch
, iface_t iface
, sysarg_t arg1
,
547 sysarg_t arg2
, async_port_handler_t handler
, void *data
, port_id_t
*port_id
)
549 if ((iface
& IFACE_MOD_CALLBACK
) != IFACE_MOD_CALLBACK
)
556 aid_t req
= async_send_3(exch
, IPC_M_CONNECT_TO_ME
, iface
, arg1
, arg2
,
560 async_wait_for(req
, &rc
);
564 rc
= async_create_port_internal(iface
, handler
, data
, port_id
);
568 sysarg_t phone_hash
= IPC_GET_ARG5(answer
);
569 fid_t fid
= async_new_connection(answer
.in_task_id
, phone_hash
,
570 CAP_NIL
, NULL
, handler
, data
);
571 if (fid
== (uintptr_t) NULL
)
577 static size_t notification_key_hash(void *key
)
579 sysarg_t id
= *(sysarg_t
*) key
;
583 static size_t notification_hash(const ht_link_t
*item
)
585 notification_t
*notification
=
586 hash_table_get_inst(item
, notification_t
, htlink
);
587 return notification_key_hash(¬ification
->imethod
);
590 static bool notification_key_equal(void *key
, const ht_link_t
*item
)
592 sysarg_t id
= *(sysarg_t
*) key
;
593 notification_t
*notification
=
594 hash_table_get_inst(item
, notification_t
, htlink
);
595 return id
== notification
->imethod
;
598 /** Operations for the notification hash table. */
599 static hash_table_ops_t notification_hash_table_ops
= {
600 .hash
= notification_hash
,
601 .key_hash
= notification_key_hash
,
602 .key_equal
= notification_key_equal
,
604 .remove_callback
= NULL
607 /** Sort in current fibril's timeout request.
609 * @param wd Wait data of the current fibril.
612 void async_insert_timeout(awaiter_t
*wd
)
616 wd
->to_event
.occurred
= false;
617 wd
->to_event
.inlist
= true;
619 link_t
*tmp
= timeout_list
.head
.next
;
620 while (tmp
!= &timeout_list
.head
) {
622 list_get_instance(tmp
, awaiter_t
, to_event
.link
);
624 if (tv_gteq(&cur
->to_event
.expires
, &wd
->to_event
.expires
))
630 list_insert_before(&wd
->to_event
.link
, tmp
);
633 /** Try to route a call to an appropriate connection fibril.
635 * If the proper connection fibril is found, a message with the call is added to
636 * its message queue. If the fibril was not active, it is activated and all
637 * timeouts are unregistered.
639 * @param chandle Handle of the incoming call.
640 * @param call Data of the incoming call.
642 * @return False if the call doesn't match any connection.
643 * @return True if the call was passed to the respective connection fibril.
646 static bool route_call(cap_call_handle_t chandle
, ipc_call_t
*call
)
650 futex_lock(&async_futex
);
652 ht_link_t
*link
= hash_table_find(&conn_hash_table
, &(conn_key_t
){
653 .task_id
= call
->in_task_id
,
654 .phone_hash
= call
->in_phone_hash
657 futex_unlock(&async_futex
);
661 connection_t
*conn
= hash_table_get_inst(link
, connection_t
, link
);
663 msg_t
*msg
= malloc(sizeof(*msg
));
665 futex_unlock(&async_futex
);
669 msg
->chandle
= chandle
;
671 list_append(&msg
->link
, &conn
->msg_queue
);
673 if (IPC_GET_IMETHOD(*call
) == IPC_M_PHONE_HUNGUP
)
674 conn
->close_chandle
= chandle
;
676 /* If the connection fibril is waiting for an event, activate it */
677 if (!conn
->wdata
.active
) {
679 /* If in timeout list, remove it */
680 if (conn
->wdata
.to_event
.inlist
) {
681 conn
->wdata
.to_event
.inlist
= false;
682 list_remove(&conn
->wdata
.to_event
.link
);
685 conn
->wdata
.active
= true;
686 fibril_add_ready(conn
->wdata
.fid
);
689 futex_unlock(&async_futex
);
693 /** Function implementing the notification handler fibril. Never returns. */
694 static errno_t
notification_fibril_func(void *arg
)
699 fibril_semaphore_down(¬ification_semaphore
);
701 futex_lock(¬ification_futex
);
704 * The semaphore ensures that if we get this far,
705 * the queue must be non-empty.
707 assert(!list_empty(¬ification_queue
));
709 notification_t
*notification
= list_get_instance(
710 list_first(¬ification_queue
), notification_t
, qlink
);
711 list_remove(¬ification
->qlink
);
713 async_notification_handler_t handler
= notification
->handler
;
714 void *arg
= notification
->arg
;
715 ipc_call_t calldata
= notification
->calldata
;
716 long count
= notification
->count
;
718 notification
->count
= 0;
720 futex_unlock(¬ification_futex
);
722 // FIXME: Pass count to the handler. It might be important.
726 handler(&calldata
, arg
);
734 * Creates a new dedicated fibril for handling notifications.
735 * By default, there is one such fibril. This function can be used to
736 * create more in order to increase the number of notification that can
737 * be processed concurrently.
739 * Currently, there is no way to destroy those fibrils after they are created.
741 errno_t
async_spawn_notification_handler(void)
743 fid_t f
= fibril_create(notification_fibril_func
, NULL
);
751 /** Queue notification.
753 * @param call Data of the incoming call.
756 static void queue_notification(ipc_call_t
*call
)
760 futex_lock(¬ification_futex
);
762 ht_link_t
*link
= hash_table_find(¬ification_hash_table
,
763 &IPC_GET_IMETHOD(*call
));
765 /* Invalid notification. */
766 // TODO: Make sure this can't happen and turn it into assert.
767 futex_unlock(¬ification_futex
);
771 notification_t
*notification
=
772 hash_table_get_inst(link
, notification_t
, htlink
);
774 notification
->count
++;
775 notification
->calldata
= *call
;
777 if (link_in_use(¬ification
->qlink
)) {
778 /* Notification already queued. */
779 futex_unlock(¬ification_futex
);
783 list_append(¬ification
->qlink
, ¬ification_queue
);
784 futex_unlock(¬ification_futex
);
786 fibril_semaphore_up(¬ification_semaphore
);
790 * Creates a new notification structure and inserts it into the hash table.
792 * @param handler Function to call when notification is received.
793 * @param arg Argument for the handler function.
794 * @return The newly created notification structure.
796 static notification_t
*notification_create(async_notification_handler_t handler
, void *arg
)
798 notification_t
*notification
= calloc(1, sizeof(notification_t
));
802 notification
->handler
= handler
;
803 notification
->arg
= arg
;
807 futex_lock(¬ification_futex
);
809 if (notification_avail
== 0) {
810 /* Attempt to create the first handler fibril. */
811 fib
= fibril_create(notification_fibril_func
, NULL
);
813 futex_unlock(¬ification_futex
);
819 sysarg_t imethod
= notification_avail
;
820 notification_avail
++;
822 notification
->imethod
= imethod
;
823 hash_table_insert(¬ification_hash_table
, ¬ification
->htlink
);
825 futex_unlock(¬ification_futex
);
829 fibril_add_ready(fib
);
835 /** Subscribe to IRQ notification.
837 * @param inr IRQ number.
838 * @param handler Notification handler.
839 * @param data Notification handler client data.
840 * @param ucode Top-half pseudocode handler.
842 * @param[out] handle IRQ capability handle on success.
844 * @return An error code.
847 errno_t
async_irq_subscribe(int inr
, async_notification_handler_t handler
,
848 void *data
, const irq_code_t
*ucode
, cap_irq_handle_t
*handle
)
850 notification_t
*notification
= notification_create(handler
, data
);
854 cap_irq_handle_t ihandle
;
855 errno_t rc
= ipc_irq_subscribe(inr
, notification
->imethod
, ucode
,
857 if (rc
== EOK
&& handle
!= NULL
) {
863 /** Unsubscribe from IRQ notification.
865 * @param handle IRQ capability handle.
867 * @return Zero on success or an error code.
870 errno_t
async_irq_unsubscribe(cap_irq_handle_t ihandle
)
872 // TODO: Remove entry from hash table
873 // to avoid memory leak
875 return ipc_irq_unsubscribe(ihandle
);
878 /** Subscribe to event notifications.
880 * @param evno Event type to subscribe.
881 * @param handler Notification handler.
882 * @param data Notification handler client data.
884 * @return Zero on success or an error code.
887 errno_t
async_event_subscribe(event_type_t evno
,
888 async_notification_handler_t handler
, void *data
)
890 notification_t
*notification
= notification_create(handler
, data
);
894 return ipc_event_subscribe(evno
, notification
->imethod
);
897 /** Subscribe to task event notifications.
899 * @param evno Event type to subscribe.
900 * @param handler Notification handler.
901 * @param data Notification handler client data.
903 * @return Zero on success or an error code.
906 errno_t
async_event_task_subscribe(event_task_type_t evno
,
907 async_notification_handler_t handler
, void *data
)
909 notification_t
*notification
= notification_create(handler
, data
);
913 return ipc_event_task_subscribe(evno
, notification
->imethod
);
916 /** Unmask event notifications.
918 * @param evno Event type to unmask.
920 * @return Value returned by the kernel.
923 errno_t
async_event_unmask(event_type_t evno
)
925 return ipc_event_unmask(evno
);
928 /** Unmask task event notifications.
930 * @param evno Event type to unmask.
932 * @return Value returned by the kernel.
935 errno_t
async_event_task_unmask(event_task_type_t evno
)
937 return ipc_event_task_unmask(evno
);
940 /** Return new incoming message for the current (fibril-local) connection.
942 * @param call Storage where the incoming call data will be stored.
943 * @param usecs Timeout in microseconds. Zero denotes no timeout.
945 * @return If no timeout was specified, then a handle of the incoming call is
946 * returned. If a timeout is specified, then a handle of the incoming
947 * call is returned unless the timeout expires prior to receiving a
948 * message. In that case zero CAP_NIL is returned.
950 cap_call_handle_t
async_get_call_timeout(ipc_call_t
*call
, suseconds_t usecs
)
953 assert(fibril_connection
);
957 * GCC 4.1.0 coughs on fibril_connection-> dereference.
958 * GCC 4.1.1 happilly puts the rdhwr instruction in delay slot.
959 * I would never expect to find so many errors in
962 connection_t
*conn
= fibril_connection
;
964 futex_lock(&async_futex
);
967 getuptime(&conn
->wdata
.to_event
.expires
);
968 tv_add_diff(&conn
->wdata
.to_event
.expires
, usecs
);
970 conn
->wdata
.to_event
.inlist
= false;
972 /* If nothing in queue, wait until something arrives */
973 while (list_empty(&conn
->msg_queue
)) {
974 if (conn
->close_chandle
) {
976 * Handle the case when the connection was already
977 * closed by the client but the server did not notice
978 * the first IPC_M_PHONE_HUNGUP call and continues to
979 * call async_get_call_timeout(). Repeat
980 * IPC_M_PHONE_HUNGUP until the caller notices.
982 memset(call
, 0, sizeof(ipc_call_t
));
983 IPC_SET_IMETHOD(*call
, IPC_M_PHONE_HUNGUP
);
984 futex_unlock(&async_futex
);
985 return conn
->close_chandle
;
989 async_insert_timeout(&conn
->wdata
);
991 conn
->wdata
.active
= false;
994 * Note: the current fibril will be rescheduled either due to a
995 * timeout or due to an arriving message destined to it. In the
996 * former case, handle_expired_timeouts() and, in the latter
997 * case, route_call() will perform the wakeup.
999 fibril_switch(FIBRIL_FROM_BLOCKED
);
1001 if ((usecs
) && (conn
->wdata
.to_event
.occurred
) &&
1002 (list_empty(&conn
->msg_queue
))) {
1003 /* If we timed out -> exit */
1004 futex_unlock(&async_futex
);
1009 msg_t
*msg
= list_get_instance(list_first(&conn
->msg_queue
),
1011 list_remove(&msg
->link
);
1013 cap_call_handle_t chandle
= msg
->chandle
;
1017 futex_unlock(&async_futex
);
1021 void *async_get_client_data(void)
1023 assert(fibril_connection
);
1024 return fibril_connection
->client
->data
;
1027 void *async_get_client_data_by_id(task_id_t client_id
)
1029 client_t
*client
= async_client_get(client_id
, false);
1033 if (!client
->data
) {
1034 async_client_put(client
);
1038 return client
->data
;
1041 void async_put_client_data_by_id(task_id_t client_id
)
1043 client_t
*client
= async_client_get(client_id
, false);
1046 assert(client
->data
);
1048 /* Drop the reference we got in async_get_client_data_by_hash(). */
1049 async_client_put(client
);
1051 /* Drop our own reference we got at the beginning of this function. */
1052 async_client_put(client
);
1055 /** Handle a call that was received.
1057 * If the call has the IPC_M_CONNECT_ME_TO method, a new connection is created.
1058 * Otherwise the call is routed to its connection fibril.
1060 * @param chandle Handle of the incoming call.
1061 * @param call Data of the incoming call.
1064 static void handle_call(cap_call_handle_t chandle
, ipc_call_t
*call
)
1068 if (call
->flags
& IPC_CALL_ANSWERED
)
1071 if (chandle
== CAP_NIL
) {
1072 if (call
->flags
& IPC_CALL_NOTIF
) {
1073 /* Kernel notification */
1074 queue_notification(call
);
1079 /* New connection */
1080 if (IPC_GET_IMETHOD(*call
) == IPC_M_CONNECT_ME_TO
) {
1081 iface_t iface
= (iface_t
) IPC_GET_ARG1(*call
);
1082 sysarg_t in_phone_hash
= IPC_GET_ARG5(*call
);
1084 // TODO: Currently ignores all ports but the first one.
1086 async_port_handler_t handler
=
1087 async_get_port_handler(iface
, 0, &data
);
1089 async_new_connection(call
->in_task_id
, in_phone_hash
, chandle
,
1090 call
, handler
, data
);
1094 /* Try to route the call through the connection hash table */
1095 if (route_call(chandle
, call
))
1098 /* Unknown call from unknown phone - hang it up */
1099 ipc_answer_0(chandle
, EHANGUP
);
1102 /** Fire all timeouts that expired. */
1103 static suseconds_t
handle_expired_timeouts(unsigned int *flags
)
1105 /* Make sure the async_futex is held. */
1106 futex_assert_is_locked(&async_futex
);
1113 link_t
*cur
= list_first(&timeout_list
);
1114 while (cur
!= NULL
) {
1116 list_get_instance(cur
, awaiter_t
, to_event
.link
);
1118 if (tv_gt(&waiter
->to_event
.expires
, &tv
)) {
1120 *flags
= SYNCH_FLAGS_NON_BLOCKING
;
1124 return tv_sub_diff(&waiter
->to_event
.expires
, &tv
);
1127 list_remove(&waiter
->to_event
.link
);
1128 waiter
->to_event
.inlist
= false;
1129 waiter
->to_event
.occurred
= true;
1132 * Redundant condition?
1133 * The fibril should not be active when it gets here.
1135 if (!waiter
->active
) {
1136 waiter
->active
= true;
1137 fibril_add_ready(waiter
->fid
);
1141 cur
= list_first(&timeout_list
);
1145 *flags
= SYNCH_FLAGS_NON_BLOCKING
;
1149 return SYNCH_NO_TIMEOUT
;
1152 /** Endless loop dispatching incoming calls and answers.
1154 * @return Never returns.
1157 static errno_t
async_manager_worker(void)
1160 futex_lock(&async_futex
);
1161 fibril_switch(FIBRIL_FROM_MANAGER
);
1164 * The switch only returns when there is no non-manager fibril
1168 unsigned int flags
= SYNCH_FLAGS_NONE
;
1169 suseconds_t next_timeout
= handle_expired_timeouts(&flags
);
1170 futex_unlock(&async_futex
);
1172 atomic_inc(&threads_in_ipc_wait
);
1175 errno_t rc
= ipc_wait_cycle(&call
, next_timeout
, flags
);
1177 atomic_dec(&threads_in_ipc_wait
);
1180 handle_call(call
.cap_handle
, &call
);
1186 /** Function to start async_manager as a standalone fibril.
1188 * When more kernel threads are used, one async manager should exist per thread.
1190 * @param arg Unused.
1191 * @return Never returns.
1194 static errno_t
async_manager_fibril(void *arg
)
1196 async_manager_worker();
1200 /** Add one manager to manager list. */
1201 void async_create_manager(void)
1203 fid_t fid
= fibril_create_generic(async_manager_fibril
, NULL
, PAGE_SIZE
);
1205 fibril_add_manager(fid
);
1208 /** Remove one manager from manager list */
1209 void async_destroy_manager(void)
1211 fibril_remove_manager();
1214 /** Initialize the async framework.
1217 void __async_server_init(void)
1219 if (!hash_table_create(&client_hash_table
, 0, 0, &client_hash_table_ops
))
1222 if (!hash_table_create(&conn_hash_table
, 0, 0, &conn_hash_table_ops
))
1225 if (!hash_table_create(¬ification_hash_table
, 0, 0,
1226 ¬ification_hash_table_ops
))
1230 errno_t
async_answer_0(cap_call_handle_t chandle
, errno_t retval
)
1232 return ipc_answer_0(chandle
, retval
);
1235 errno_t
async_answer_1(cap_call_handle_t chandle
, errno_t retval
, sysarg_t arg1
)
1237 return ipc_answer_1(chandle
, retval
, arg1
);
1240 errno_t
async_answer_2(cap_call_handle_t chandle
, errno_t retval
, sysarg_t arg1
,
1243 return ipc_answer_2(chandle
, retval
, arg1
, arg2
);
1246 errno_t
async_answer_3(cap_call_handle_t chandle
, errno_t retval
, sysarg_t arg1
,
1247 sysarg_t arg2
, sysarg_t arg3
)
1249 return ipc_answer_3(chandle
, retval
, arg1
, arg2
, arg3
);
1252 errno_t
async_answer_4(cap_call_handle_t chandle
, errno_t retval
, sysarg_t arg1
,
1253 sysarg_t arg2
, sysarg_t arg3
, sysarg_t arg4
)
1255 return ipc_answer_4(chandle
, retval
, arg1
, arg2
, arg3
, arg4
);
1258 errno_t
async_answer_5(cap_call_handle_t chandle
, errno_t retval
, sysarg_t arg1
,
1259 sysarg_t arg2
, sysarg_t arg3
, sysarg_t arg4
, sysarg_t arg5
)
1261 return ipc_answer_5(chandle
, retval
, arg1
, arg2
, arg3
, arg4
, arg5
);
1264 errno_t
async_forward_fast(cap_call_handle_t chandle
, async_exch_t
*exch
,
1265 sysarg_t imethod
, sysarg_t arg1
, sysarg_t arg2
, unsigned int mode
)
1270 return ipc_forward_fast(chandle
, exch
->phone
, imethod
, arg1
, arg2
, mode
);
1273 errno_t
async_forward_slow(cap_call_handle_t chandle
, async_exch_t
*exch
,
1274 sysarg_t imethod
, sysarg_t arg1
, sysarg_t arg2
, sysarg_t arg3
,
1275 sysarg_t arg4
, sysarg_t arg5
, unsigned int mode
)
1280 return ipc_forward_slow(chandle
, exch
->phone
, imethod
, arg1
, arg2
, arg3
,
1284 /** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework.
1286 * Ask through phone for a new connection to some service.
1288 * @param exch Exchange for sending the message.
1289 * @param arg1 User defined argument.
1290 * @param arg2 User defined argument.
1291 * @param arg3 User defined argument.
1293 * @return Zero on success or an error code.
1296 errno_t
async_connect_to_me(async_exch_t
*exch
, sysarg_t arg1
, sysarg_t arg2
,
1303 aid_t req
= async_send_3(exch
, IPC_M_CONNECT_TO_ME
, arg1
, arg2
, arg3
,
1307 async_wait_for(req
, &rc
);
1309 return (errno_t
) rc
;
1314 /** Interrupt one thread of this task from waiting for IPC. */
1315 void async_poke(void)
1317 if (atomic_get(&threads_in_ipc_wait
) > 0)
1321 /** Wrapper for receiving the IPC_M_SHARE_IN calls using the async framework.
1323 * This wrapper only makes it more comfortable to receive IPC_M_SHARE_IN
1324 * calls so that the user doesn't have to remember the meaning of each IPC
1327 * So far, this wrapper is to be used from within a connection fibril.
1329 * @param chandle Storage for the handle of the IPC_M_SHARE_IN call.
1330 * @param size Destination address space area size.
1332 * @return True on success, false on failure.
1335 bool async_share_in_receive(cap_call_handle_t
*chandle
, size_t *size
)
1341 *chandle
= async_get_call(&data
);
1343 if (IPC_GET_IMETHOD(data
) != IPC_M_SHARE_IN
)
1346 *size
= (size_t) IPC_GET_ARG1(data
);
1350 /** Wrapper for answering the IPC_M_SHARE_IN calls using the async framework.
1352 * This wrapper only makes it more comfortable to answer IPC_M_SHARE_IN
1353 * calls so that the user doesn't have to remember the meaning of each IPC
1356 * @param chandle Handle of the IPC_M_DATA_READ call to answer.
1357 * @param src Source address space base.
1358 * @param flags Flags to be used for sharing. Bits can be only cleared.
1360 * @return Zero on success or a value from @ref errno.h on failure.
1363 errno_t
async_share_in_finalize(cap_call_handle_t chandle
, void *src
,
1366 // FIXME: The source has no business deciding destination address.
1367 return ipc_answer_3(chandle
, EOK
, (sysarg_t
) src
, (sysarg_t
) flags
,
1371 /** Wrapper for receiving the IPC_M_SHARE_OUT calls using the async framework.
1373 * This wrapper only makes it more comfortable to receive IPC_M_SHARE_OUT
1374 * calls so that the user doesn't have to remember the meaning of each IPC
1377 * So far, this wrapper is to be used from within a connection fibril.
1379 * @param chandle Storage for the hash of the IPC_M_SHARE_OUT call.
1380 * @param size Storage for the source address space area size.
1381 * @param flags Storage for the sharing flags.
1383 * @return True on success, false on failure.
1386 bool async_share_out_receive(cap_call_handle_t
*chandle
, size_t *size
,
1387 unsigned int *flags
)
1394 *chandle
= async_get_call(&data
);
1396 if (IPC_GET_IMETHOD(data
) != IPC_M_SHARE_OUT
)
1399 *size
= (size_t) IPC_GET_ARG2(data
);
1400 *flags
= (unsigned int) IPC_GET_ARG3(data
);
1404 /** Wrapper for answering the IPC_M_SHARE_OUT calls using the async framework.
1406 * This wrapper only makes it more comfortable to answer IPC_M_SHARE_OUT
1407 * calls so that the user doesn't have to remember the meaning of each IPC
1410 * @param chandle Handle of the IPC_M_DATA_WRITE call to answer.
1411 * @param dst Address of the storage for the destination address space area
1414 * @return Zero on success or a value from @ref errno.h on failure.
1417 errno_t
async_share_out_finalize(cap_call_handle_t chandle
, void **dst
)
1419 return ipc_answer_2(chandle
, EOK
, (sysarg_t
) _end
, (sysarg_t
) dst
);
1422 /** Wrapper for receiving the IPC_M_DATA_READ calls using the async framework.
1424 * This wrapper only makes it more comfortable to receive IPC_M_DATA_READ
1425 * calls so that the user doesn't have to remember the meaning of each IPC
1428 * So far, this wrapper is to be used from within a connection fibril.
1430 * @param chandle Storage for the handle of the IPC_M_DATA_READ.
1431 * @param size Storage for the maximum size. Can be NULL.
1433 * @return True on success, false on failure.
1436 bool async_data_read_receive(cap_call_handle_t
*chandle
, size_t *size
)
1439 return async_data_read_receive_call(chandle
, &data
, size
);
1442 /** Wrapper for receiving the IPC_M_DATA_READ calls using the async framework.
1444 * This wrapper only makes it more comfortable to receive IPC_M_DATA_READ
1445 * calls so that the user doesn't have to remember the meaning of each IPC
1448 * So far, this wrapper is to be used from within a connection fibril.
1450 * @param chandle Storage for the handle of the IPC_M_DATA_READ.
1451 * @param size Storage for the maximum size. Can be NULL.
1453 * @return True on success, false on failure.
1456 bool async_data_read_receive_call(cap_call_handle_t
*chandle
, ipc_call_t
*data
,
1462 *chandle
= async_get_call(data
);
1464 if (IPC_GET_IMETHOD(*data
) != IPC_M_DATA_READ
)
1468 *size
= (size_t) IPC_GET_ARG2(*data
);
1473 /** Wrapper for answering the IPC_M_DATA_READ calls using the async framework.
1475 * This wrapper only makes it more comfortable to answer IPC_M_DATA_READ
1476 * calls so that the user doesn't have to remember the meaning of each IPC
1479 * @param chandle Handle of the IPC_M_DATA_READ call to answer.
1480 * @param src Source address for the IPC_M_DATA_READ call.
1481 * @param size Size for the IPC_M_DATA_READ call. Can be smaller than
1482 * the maximum size announced by the sender.
1484 * @return Zero on success or a value from @ref errno.h on failure.
1487 errno_t
async_data_read_finalize(cap_call_handle_t chandle
, const void *src
,
1490 return ipc_answer_2(chandle
, EOK
, (sysarg_t
) src
, (sysarg_t
) size
);
1493 /** Wrapper for forwarding any read request
1496 errno_t
async_data_read_forward_fast(async_exch_t
*exch
, sysarg_t imethod
,
1497 sysarg_t arg1
, sysarg_t arg2
, sysarg_t arg3
, sysarg_t arg4
,
1498 ipc_call_t
*dataptr
)
1503 cap_call_handle_t chandle
;
1504 if (!async_data_read_receive(&chandle
, NULL
)) {
1505 ipc_answer_0(chandle
, EINVAL
);
1509 aid_t msg
= async_send_fast(exch
, imethod
, arg1
, arg2
, arg3
, arg4
,
1512 ipc_answer_0(chandle
, EINVAL
);
1516 errno_t retval
= ipc_forward_fast(chandle
, exch
->phone
, 0, 0, 0,
1517 IPC_FF_ROUTE_FROM_ME
);
1518 if (retval
!= EOK
) {
1520 ipc_answer_0(chandle
, retval
);
1525 async_wait_for(msg
, &rc
);
1527 return (errno_t
) rc
;
1530 /** Wrapper for receiving the IPC_M_DATA_WRITE calls using the async framework.
1532 * This wrapper only makes it more comfortable to receive IPC_M_DATA_WRITE
1533 * calls so that the user doesn't have to remember the meaning of each IPC
1536 * So far, this wrapper is to be used from within a connection fibril.
1538 * @param chandle Storage for the handle of the IPC_M_DATA_WRITE.
1539 * @param size Storage for the suggested size. May be NULL.
1541 * @return True on success, false on failure.
1544 bool async_data_write_receive(cap_call_handle_t
*chandle
, size_t *size
)
1547 return async_data_write_receive_call(chandle
, &data
, size
);
1550 /** Wrapper for receiving the IPC_M_DATA_WRITE calls using the async framework.
1552 * This wrapper only makes it more comfortable to receive IPC_M_DATA_WRITE
1553 * calls so that the user doesn't have to remember the meaning of each IPC
1556 * So far, this wrapper is to be used from within a connection fibril.
1558 * @param chandle Storage for the handle of the IPC_M_DATA_WRITE.
1559 * @param data Storage for the ipc call data.
1560 * @param size Storage for the suggested size. May be NULL.
1562 * @return True on success, false on failure.
1565 bool async_data_write_receive_call(cap_call_handle_t
*chandle
, ipc_call_t
*data
,
1571 *chandle
= async_get_call(data
);
1573 if (IPC_GET_IMETHOD(*data
) != IPC_M_DATA_WRITE
)
1577 *size
= (size_t) IPC_GET_ARG2(*data
);
1582 /** Wrapper for answering the IPC_M_DATA_WRITE calls using the async framework.
1584 * This wrapper only makes it more comfortable to answer IPC_M_DATA_WRITE
1585 * calls so that the user doesn't have to remember the meaning of each IPC
1588 * @param chandle Handle of the IPC_M_DATA_WRITE call to answer.
1589 * @param dst Final destination address for the IPC_M_DATA_WRITE call.
1590 * @param size Final size for the IPC_M_DATA_WRITE call.
1592 * @return Zero on success or a value from @ref errno.h on failure.
1595 errno_t
async_data_write_finalize(cap_call_handle_t chandle
, void *dst
,
1598 return ipc_answer_2(chandle
, EOK
, (sysarg_t
) dst
, (sysarg_t
) size
);
1601 /** Wrapper for receiving binary data or strings
1603 * This wrapper only makes it more comfortable to use async_data_write_*
1604 * functions to receive binary data or strings.
1606 * @param data Pointer to data pointer (which should be later disposed
1607 * by free()). If the operation fails, the pointer is not
1609 * @param nullterm If true then the received data is always zero terminated.
1610 * This also causes to allocate one extra byte beyond the
1611 * raw transmitted data.
1612 * @param min_size Minimum size (in bytes) of the data to receive.
1613 * @param max_size Maximum size (in bytes) of the data to receive. 0 means
1615 * @param granulariy If non-zero then the size of the received data has to
1616 * be divisible by this value.
1617 * @param received If not NULL, the size of the received data is stored here.
1619 * @return Zero on success or a value from @ref errno.h on failure.
1622 errno_t
async_data_write_accept(void **data
, const bool nullterm
,
1623 const size_t min_size
, const size_t max_size
, const size_t granularity
,
1628 cap_call_handle_t chandle
;
1630 if (!async_data_write_receive(&chandle
, &size
)) {
1631 ipc_answer_0(chandle
, EINVAL
);
1635 if (size
< min_size
) {
1636 ipc_answer_0(chandle
, EINVAL
);
1640 if ((max_size
> 0) && (size
> max_size
)) {
1641 ipc_answer_0(chandle
, EINVAL
);
1645 if ((granularity
> 0) && ((size
% granularity
) != 0)) {
1646 ipc_answer_0(chandle
, EINVAL
);
1653 arg_data
= malloc(size
+ 1);
1655 arg_data
= malloc(size
);
1657 if (arg_data
== NULL
) {
1658 ipc_answer_0(chandle
, ENOMEM
);
1662 errno_t rc
= async_data_write_finalize(chandle
, arg_data
, size
);
1669 ((char *) arg_data
)[size
] = 0;
1672 if (received
!= NULL
)
1678 /** Wrapper for voiding any data that is about to be received
1680 * This wrapper can be used to void any pending data
1682 * @param retval Error value from @ref errno.h to be returned to the caller.
1685 void async_data_write_void(errno_t retval
)
1687 cap_call_handle_t chandle
;
1688 async_data_write_receive(&chandle
, NULL
);
1689 ipc_answer_0(chandle
, retval
);
1692 /** Wrapper for forwarding any data that is about to be received
1695 errno_t
async_data_write_forward_fast(async_exch_t
*exch
, sysarg_t imethod
,
1696 sysarg_t arg1
, sysarg_t arg2
, sysarg_t arg3
, sysarg_t arg4
,
1697 ipc_call_t
*dataptr
)
1702 cap_call_handle_t chandle
;
1703 if (!async_data_write_receive(&chandle
, NULL
)) {
1704 ipc_answer_0(chandle
, EINVAL
);
1708 aid_t msg
= async_send_fast(exch
, imethod
, arg1
, arg2
, arg3
, arg4
,
1711 ipc_answer_0(chandle
, EINVAL
);
1715 errno_t retval
= ipc_forward_fast(chandle
, exch
->phone
, 0, 0, 0,
1716 IPC_FF_ROUTE_FROM_ME
);
1717 if (retval
!= EOK
) {
1719 ipc_answer_0(chandle
, retval
);
1724 async_wait_for(msg
, &rc
);
1726 return (errno_t
) rc
;
1729 /** Wrapper for receiving the IPC_M_CONNECT_TO_ME calls.
1731 * If the current call is IPC_M_CONNECT_TO_ME then a new
1732 * async session is created for the accepted phone.
1734 * @param mgmt Exchange management style.
1736 * @return New async session.
1737 * @return NULL on failure.
1740 async_sess_t
*async_callback_receive(exch_mgmt_t mgmt
)
1742 /* Accept the phone */
1744 cap_call_handle_t chandle
= async_get_call(&call
);
1745 cap_phone_handle_t phandle
= (cap_handle_t
) IPC_GET_ARG5(call
);
1747 if ((IPC_GET_IMETHOD(call
) != IPC_M_CONNECT_TO_ME
) ||
1748 !CAP_HANDLE_VALID((phandle
))) {
1749 async_answer_0(chandle
, EINVAL
);
1753 async_sess_t
*sess
= (async_sess_t
*) malloc(sizeof(async_sess_t
));
1755 async_answer_0(chandle
, ENOMEM
);
1761 sess
->phone
= phandle
;
1766 fibril_mutex_initialize(&sess
->remote_state_mtx
);
1767 sess
->remote_state_data
= NULL
;
1769 list_initialize(&sess
->exch_list
);
1770 fibril_mutex_initialize(&sess
->mutex
);
1771 atomic_set(&sess
->refcnt
, 0);
1773 /* Acknowledge the connected phone */
1774 async_answer_0(chandle
, EOK
);
1779 /** Wrapper for receiving the IPC_M_CONNECT_TO_ME calls.
1781 * If the call is IPC_M_CONNECT_TO_ME then a new
1782 * async session is created. However, the phone is
1783 * not accepted automatically.
1785 * @param mgmt Exchange management style.
1786 * @param call Call data.
1788 * @return New async session.
1789 * @return NULL on failure.
1790 * @return NULL if the call is not IPC_M_CONNECT_TO_ME.
1793 async_sess_t
*async_callback_receive_start(exch_mgmt_t mgmt
, ipc_call_t
*call
)
1795 cap_phone_handle_t phandle
= (cap_handle_t
) IPC_GET_ARG5(*call
);
1797 if ((IPC_GET_IMETHOD(*call
) != IPC_M_CONNECT_TO_ME
) ||
1798 !CAP_HANDLE_VALID((phandle
)))
1801 async_sess_t
*sess
= (async_sess_t
*) malloc(sizeof(async_sess_t
));
1807 sess
->phone
= phandle
;
1812 fibril_mutex_initialize(&sess
->remote_state_mtx
);
1813 sess
->remote_state_data
= NULL
;
1815 list_initialize(&sess
->exch_list
);
1816 fibril_mutex_initialize(&sess
->mutex
);
1817 atomic_set(&sess
->refcnt
, 0);
1822 bool async_state_change_receive(cap_call_handle_t
*chandle
, sysarg_t
*arg1
,
1823 sysarg_t
*arg2
, sysarg_t
*arg3
)
1828 *chandle
= async_get_call(&call
);
1830 if (IPC_GET_IMETHOD(call
) != IPC_M_STATE_CHANGE_AUTHORIZE
)
1834 *arg1
= IPC_GET_ARG1(call
);
1836 *arg2
= IPC_GET_ARG2(call
);
1838 *arg3
= IPC_GET_ARG3(call
);
1843 errno_t
async_state_change_finalize(cap_call_handle_t chandle
,
1844 async_exch_t
*other_exch
)
1846 return ipc_answer_1(chandle
, EOK
, CAP_HANDLE_RAW(other_exch
->phone
));
1849 __noreturn
void async_manager(void)
1851 futex_lock(&async_futex
);
1852 fibril_switch(FIBRIL_FROM_DEAD
);
1853 __builtin_unreachable();