1 /* Copyright (c) 2001, Matej Pfajfar.
2 * Copyright (c) 2001-2004, Roger Dingledine.
3 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
4 * Copyright (c) 2007-2021, The Tor Project, Inc. */
5 /* See LICENSE for licensing information */
8 * \file dispatch_core.c
9 * \brief Core module for sending and receiving messages.
12 #define DISPATCH_PRIVATE
15 #include "lib/dispatch/dispatch.h"
16 #include "lib/dispatch/dispatch_st.h"
17 #include "lib/dispatch/dispatch_naming.h"
19 #include "lib/malloc/malloc.h"
20 #include "lib/log/util_bug.h"
25 * Use <b>d</b> to drop all storage held for <b>msg</b>.
27 * (We need the dispatcher so we know how to free the auxiliary data.)
30 dispatch_free_msg_(const dispatch_t
*d
, msg_t
*msg
)
35 d
->typefns
[msg
->type
].free_fn(msg
->aux_data__
);
40 * Format the auxiliary data held by msg.
43 dispatch_fmt_msg_data(const dispatch_t
*d
, const msg_t
*msg
)
48 return d
->typefns
[msg
->type
].fmt_fn(msg
->aux_data__
);
52 * Release all storage held by <b>d</b>.
55 dispatch_free_(dispatch_t
*d
)
60 size_t n_queues
= d
->n_queues
;
61 for (size_t i
= 0; i
< n_queues
; ++i
) {
63 TOR_SIMPLEQ_FOREACH_SAFE(m
, &d
->queues
[i
].queue
, next
, mtmp
) {
64 dispatch_free_msg(d
, m
);
68 size_t n_msgs
= d
->n_msgs
;
70 for (size_t i
= 0; i
< n_msgs
; ++i
) {
71 tor_free(d
->table
[i
]);
77 // This is the only time we will treat d->cfg as non-const.
78 //dispatch_cfg_free_((dispatch_items_t *) d->cfg);
84 * Tell the dispatcher to call <b>fn</b> with <b>userdata</b> whenever
85 * <b>chan</b> becomes nonempty. Return 0 on success, -1 on error.
88 dispatch_set_alert_fn(dispatch_t
*d
, channel_id_t chan
,
89 dispatch_alertfn_t fn
, void *userdata
)
91 if (BUG(chan
>= d
->n_queues
))
94 dqueue_t
*q
= &d
->queues
[chan
];
96 q
->alert_fn_arg
= userdata
;
101 * Send a message on the appropriate channel notifying that channel if
104 * This function takes ownership of the auxiliary data; it can't be static or
105 * stack-allocated, and the caller is not allowed to use it afterwards.
107 * This function does not check the various vields of the message object for
111 dispatch_send(dispatch_t
*d
,
113 channel_id_t channel
,
116 msg_aux_data_t auxdata
)
118 if (!d
->table
[msg
]) {
119 /* Fast path: nobody wants this data. */
121 d
->typefns
[type
].free_fn(auxdata
);
125 msg_t
*m
= tor_malloc(sizeof(msg_t
));
128 m
->channel
= channel
;
131 memcpy(&m
->aux_data__
, &auxdata
, sizeof(msg_aux_data_t
));
133 return dispatch_send_msg(d
, m
);
137 dispatch_send_msg(dispatch_t
*d
, msg_t
*m
)
143 if (BUG(m
->channel
>= d
->n_queues
))
145 if (BUG(m
->msg
>= d
->n_msgs
))
148 dtbl_entry_t
*ent
= d
->table
[m
->msg
];
150 if (BUG(m
->type
!= ent
->type
))
152 if (BUG(m
->channel
!= ent
->channel
))
156 return dispatch_send_msg_unchecked(d
, m
);
158 /* Probably it isn't safe to free m, since type could be wrong. */
163 * Send a message on the appropriate queue, notifying that queue if necessary.
165 * This function takes ownership of the message object and its auxiliary data;
166 * it can't be static or stack-allocated, and the caller isn't allowed to use
169 * This function does not check the various fields of the message object for
170 * consistency, and can crash if they are out of range. Only functions that
171 * have already constructed the message in a safe way, or checked it for
172 * correctness themselves, should call this function.
175 dispatch_send_msg_unchecked(dispatch_t
*d
, msg_t
*m
)
177 /* Find the right queue. */
178 dqueue_t
*q
= &d
->queues
[m
->channel
];
179 bool was_empty
= TOR_SIMPLEQ_EMPTY(&q
->queue
);
181 /* Append the message. */
182 TOR_SIMPLEQ_INSERT_TAIL(&q
->queue
, m
, next
);
184 if (debug_logging_enabled()) {
185 char *arg
= dispatch_fmt_msg_data(d
, m
);
187 "Queued: %s (%s) from %s, on %s.",
188 get_message_id_name(m
->msg
),
190 get_subsys_id_name(m
->sender
),
191 get_channel_id_name(m
->channel
));
195 /* If we just made the queue nonempty for the first time, call the alert
198 q
->alert_fn(d
, m
->channel
, q
->alert_fn_arg
);
205 * Run all of the callbacks on <b>d</b> associated with <b>m</b>.
208 dispatcher_run_msg_cbs(const dispatch_t
*d
, msg_t
*m
)
210 tor_assert(m
->msg
<= d
->n_msgs
);
211 dtbl_entry_t
*ent
= d
->table
[m
->msg
];
212 int n_fns
= ent
->n_fns
;
214 if (debug_logging_enabled()) {
215 char *arg
= dispatch_fmt_msg_data(d
, m
);
217 "Delivering: %s (%s) from %s, on %s:",
218 get_message_id_name(m
->msg
),
220 get_subsys_id_name(m
->sender
),
221 get_channel_id_name(m
->channel
));
226 for (i
=0; i
< n_fns
; ++i
) {
227 if (ent
->rcv
[i
].enabled
) {
228 log_debug(LD_MESG
, " Delivering to %s.",
229 get_subsys_id_name(ent
->rcv
[i
].sys
));
236 * Run up to <b>max_msgs</b> callbacks for messages on the channel <b>ch</b>
237 * on the given dispatcher. Return 0 on success or recoverable failure,
238 * -1 on unrecoverable error.
241 dispatch_flush(dispatch_t
*d
, channel_id_t ch
, int max_msgs
)
243 if (BUG(ch
>= d
->n_queues
))
247 dqueue_t
*q
= &d
->queues
[ch
];
249 while (n_flushed
< max_msgs
) {
250 msg_t
*m
= TOR_SIMPLEQ_FIRST(&q
->queue
);
253 TOR_SIMPLEQ_REMOVE_HEAD(&q
->queue
, next
);
254 dispatcher_run_msg_cbs(d
, m
);
255 dispatch_free_msg(d
, m
);