Merge branch 'fix-changelogs' into 'main'
[tor.git] / src / lib / dispatch / dispatch_core.c
blob687ba5b73c072560935a46bd8e7189aa6ddf385a
1 /* Copyright (c) 2001, Matej Pfajfar.
2 * Copyright (c) 2001-2004, Roger Dingledine.
3 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
4 * Copyright (c) 2007-2021, The Tor Project, Inc. */
5 /* See LICENSE for licensing information */
7 /**
8 * \file dispatch_core.c
9 * \brief Core module for sending and receiving messages.
12 #define DISPATCH_PRIVATE
13 #include "orconfig.h"
15 #include "lib/dispatch/dispatch.h"
16 #include "lib/dispatch/dispatch_st.h"
17 #include "lib/dispatch/dispatch_naming.h"
19 #include "lib/malloc/malloc.h"
20 #include "lib/log/util_bug.h"
22 #include <string.h>
24 /**
25 * Use <b>d</b> to drop all storage held for <b>msg</b>.
27 * (We need the dispatcher so we know how to free the auxiliary data.)
28 **/
29 void
30 dispatch_free_msg_(const dispatch_t *d, msg_t *msg)
32 if (!msg)
33 return;
35 d->typefns[msg->type].free_fn(msg->aux_data__);
36 tor_free(msg);
39 /**
40 * Format the auxiliary data held by msg.
41 **/
42 char *
43 dispatch_fmt_msg_data(const dispatch_t *d, const msg_t *msg)
45 if (!msg)
46 return NULL;
48 return d->typefns[msg->type].fmt_fn(msg->aux_data__);
51 /**
52 * Release all storage held by <b>d</b>.
53 **/
54 void
55 dispatch_free_(dispatch_t *d)
57 if (d == NULL)
58 return;
60 size_t n_queues = d->n_queues;
61 for (size_t i = 0; i < n_queues; ++i) {
62 msg_t *m, *mtmp;
63 TOR_SIMPLEQ_FOREACH_SAFE(m, &d->queues[i].queue, next, mtmp) {
64 dispatch_free_msg(d, m);
68 size_t n_msgs = d->n_msgs;
70 for (size_t i = 0; i < n_msgs; ++i) {
71 tor_free(d->table[i]);
73 tor_free(d->table);
74 tor_free(d->typefns);
75 tor_free(d->queues);
77 // This is the only time we will treat d->cfg as non-const.
78 //dispatch_cfg_free_((dispatch_items_t *) d->cfg);
80 tor_free(d);
83 /**
84 * Tell the dispatcher to call <b>fn</b> with <b>userdata</b> whenever
85 * <b>chan</b> becomes nonempty. Return 0 on success, -1 on error.
86 **/
87 int
88 dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan,
89 dispatch_alertfn_t fn, void *userdata)
91 if (BUG(chan >= d->n_queues))
92 return -1;
94 dqueue_t *q = &d->queues[chan];
95 q->alert_fn = fn;
96 q->alert_fn_arg = userdata;
97 return 0;
101 * Send a message on the appropriate channel notifying that channel if
102 * necessary.
104 * This function takes ownership of the auxiliary data; it can't be static or
105 * stack-allocated, and the caller is not allowed to use it afterwards.
107 * This function does not check the various vields of the message object for
108 * consistency.
111 dispatch_send(dispatch_t *d,
112 subsys_id_t sender,
113 channel_id_t channel,
114 message_id_t msg,
115 msg_type_id_t type,
116 msg_aux_data_t auxdata)
118 if (!d->table[msg]) {
119 /* Fast path: nobody wants this data. */
121 d->typefns[type].free_fn(auxdata);
122 return 0;
125 msg_t *m = tor_malloc(sizeof(msg_t));
127 m->sender = sender;
128 m->channel = channel;
129 m->msg = msg;
130 m->type = type;
131 memcpy(&m->aux_data__, &auxdata, sizeof(msg_aux_data_t));
133 return dispatch_send_msg(d, m);
137 dispatch_send_msg(dispatch_t *d, msg_t *m)
139 if (BUG(!d))
140 goto err;
141 if (BUG(!m))
142 goto err;
143 if (BUG(m->channel >= d->n_queues))
144 goto err;
145 if (BUG(m->msg >= d->n_msgs))
146 goto err;
148 dtbl_entry_t *ent = d->table[m->msg];
149 if (ent) {
150 if (BUG(m->type != ent->type))
151 goto err;
152 if (BUG(m->channel != ent->channel))
153 goto err;
156 return dispatch_send_msg_unchecked(d, m);
157 err:
158 /* Probably it isn't safe to free m, since type could be wrong. */
159 return -1;
163 * Send a message on the appropriate queue, notifying that queue if necessary.
165 * This function takes ownership of the message object and its auxiliary data;
166 * it can't be static or stack-allocated, and the caller isn't allowed to use
167 * it afterwards.
169 * This function does not check the various fields of the message object for
170 * consistency, and can crash if they are out of range. Only functions that
171 * have already constructed the message in a safe way, or checked it for
172 * correctness themselves, should call this function.
175 dispatch_send_msg_unchecked(dispatch_t *d, msg_t *m)
177 /* Find the right queue. */
178 dqueue_t *q = &d->queues[m->channel];
179 bool was_empty = TOR_SIMPLEQ_EMPTY(&q->queue);
181 /* Append the message. */
182 TOR_SIMPLEQ_INSERT_TAIL(&q->queue, m, next);
184 if (debug_logging_enabled()) {
185 char *arg = dispatch_fmt_msg_data(d, m);
186 log_debug(LD_MESG,
187 "Queued: %s (%s) from %s, on %s.",
188 get_message_id_name(m->msg),
189 arg,
190 get_subsys_id_name(m->sender),
191 get_channel_id_name(m->channel));
192 tor_free(arg);
195 /* If we just made the queue nonempty for the first time, call the alert
196 * function. */
197 if (was_empty) {
198 q->alert_fn(d, m->channel, q->alert_fn_arg);
201 return 0;
205 * Run all of the callbacks on <b>d</b> associated with <b>m</b>.
207 static void
208 dispatcher_run_msg_cbs(const dispatch_t *d, msg_t *m)
210 tor_assert(m->msg <= d->n_msgs);
211 dtbl_entry_t *ent = d->table[m->msg];
212 int n_fns = ent->n_fns;
214 if (debug_logging_enabled()) {
215 char *arg = dispatch_fmt_msg_data(d, m);
216 log_debug(LD_MESG,
217 "Delivering: %s (%s) from %s, on %s:",
218 get_message_id_name(m->msg),
219 arg,
220 get_subsys_id_name(m->sender),
221 get_channel_id_name(m->channel));
222 tor_free(arg);
225 int i;
226 for (i=0; i < n_fns; ++i) {
227 if (ent->rcv[i].enabled) {
228 log_debug(LD_MESG, " Delivering to %s.",
229 get_subsys_id_name(ent->rcv[i].sys));
230 ent->rcv[i].fn(m);
236 * Run up to <b>max_msgs</b> callbacks for messages on the channel <b>ch</b>
237 * on the given dispatcher. Return 0 on success or recoverable failure,
238 * -1 on unrecoverable error.
241 dispatch_flush(dispatch_t *d, channel_id_t ch, int max_msgs)
243 if (BUG(ch >= d->n_queues))
244 return 0;
246 int n_flushed = 0;
247 dqueue_t *q = &d->queues[ch];
249 while (n_flushed < max_msgs) {
250 msg_t *m = TOR_SIMPLEQ_FIRST(&q->queue);
251 if (!m)
252 break;
253 TOR_SIMPLEQ_REMOVE_HEAD(&q->queue, next);
254 dispatcher_run_msg_cbs(d, m);
255 dispatch_free_msg(d, m);
256 ++n_flushed;
259 return 0;