/* Copyright (c) 2001, Matej Pfajfar.
 * Copyright (c) 2001-2004, Roger Dingledine.
 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
 * Copyright (c) 2007-2021, The Tor Project, Inc. */
/* See LICENSE for licensing information */
/**
 * @brief Enforce various requirements on a pubsub_builder.
 **/
13 #define PUBSUB_PRIVATE
16 #include "lib/dispatch/dispatch_naming.h"
17 #include "lib/dispatch/msgtypes.h"
18 #include "lib/pubsub/pubsub_flags.h"
19 #include "lib/pubsub/pubsub_builder_st.h"
20 #include "lib/pubsub/pubsub_build.h"
22 #include "lib/container/bitarray.h"
23 #include "lib/container/smartlist.h"
24 #include "lib/log/util_bug.h"
25 #include "lib/malloc/malloc.h"
26 #include "lib/string/compat_string.h"
30 static void pubsub_adjmap_add(pubsub_adjmap_t
*map
,
31 const pubsub_cfg_t
*item
);
34 * Helper: construct and return a new pubsub_adjacency_map from <b>cfg</b>.
35 * Return NULL on error.
37 static pubsub_adjmap_t
*
38 pubsub_build_adjacency_map(const pubsub_items_t
*cfg
)
40 pubsub_adjmap_t
*map
= tor_malloc_zero(sizeof(*map
));
41 const size_t n_subsystems
= get_num_subsys_ids();
42 const size_t n_msgs
= get_num_message_ids();
44 map
->n_subsystems
= n_subsystems
;
47 map
->pub_by_subsys
= tor_calloc(n_subsystems
, sizeof(smartlist_t
*));
48 map
->sub_by_subsys
= tor_calloc(n_subsystems
, sizeof(smartlist_t
*));
49 map
->pub_by_msg
= tor_calloc(n_msgs
, sizeof(smartlist_t
*));
50 map
->sub_by_msg
= tor_calloc(n_msgs
, sizeof(smartlist_t
*));
52 SMARTLIST_FOREACH_BEGIN(cfg
->items
, const pubsub_cfg_t
*, item
) {
53 pubsub_adjmap_add(map
, item
);
54 } SMARTLIST_FOREACH_END(item
);
60 * Helper: add a single pubsub_cfg_t to an adjacency map.
63 pubsub_adjmap_add(pubsub_adjmap_t
*map
,
64 const pubsub_cfg_t
*item
)
66 smartlist_t
**by_subsys
;
69 tor_assert(item
->subsys
< map
->n_subsystems
);
70 tor_assert(item
->msg
< map
->n_msgs
);
72 if (item
->is_publish
) {
73 by_subsys
= &map
->pub_by_subsys
[item
->subsys
];
74 by_msg
= &map
->pub_by_msg
[item
->msg
];
76 by_subsys
= &map
->sub_by_subsys
[item
->subsys
];
77 by_msg
= &map
->sub_by_msg
[item
->msg
];
81 *by_subsys
= smartlist_new();
83 *by_msg
= smartlist_new();
84 smartlist_add(*by_subsys
, (void*) item
);
85 smartlist_add(*by_msg
, (void *) item
);
/**
 * Release all storage held by <b>ptr</b> and set <b>ptr</b> to NULL.
 **/
#define pubsub_adjmap_free(ptr) \
  FREE_AND_NULL(pubsub_adjmap_t, pubsub_adjmap_free_, ptr)
95 * Free every element of an <b>n</b>-element array of smartlists, then
96 * free the array itself.
99 pubsub_adjmap_free_helper(smartlist_t
**lsts
, size_t n
)
104 for (unsigned i
= 0; i
< n
; ++i
) {
105 smartlist_free(lsts
[i
]);
111 * Release all storage held by <b>map</b>.
114 pubsub_adjmap_free_(pubsub_adjmap_t
*map
)
118 pubsub_adjmap_free_helper(map
->pub_by_subsys
, map
->n_subsystems
);
119 pubsub_adjmap_free_helper(map
->sub_by_subsys
, map
->n_subsystems
);
120 pubsub_adjmap_free_helper(map
->pub_by_msg
, map
->n_msgs
);
121 pubsub_adjmap_free_helper(map
->sub_by_msg
, map
->n_msgs
);
126 * Helper: return the length of <b>sl</b>, or 0 if sl is NULL.
129 smartlist_len_opt(const smartlist_t
*sl
)
132 return smartlist_len(sl
);
137 /** Return a pointer to a statically allocated string encoding the
138 * dispatcher flags in <b>flags</b>. */
140 format_flags(unsigned flags
)
144 if (flags
& DISP_FLAG_EXCL
) {
145 strlcat(buf
, " EXCL", sizeof(buf
));
147 if (flags
& DISP_FLAG_STUB
) {
148 strlcat(buf
, " STUB", sizeof(buf
));
150 return buf
[0] ? buf
+1 : buf
;
154 * Log a message containing a description of <b>cfg</b> at severity, prefixed
155 * by the string <b>prefix</b>.
158 pubsub_cfg_dump(const pubsub_cfg_t
*cfg
, int severity
, const char *prefix
)
162 tor_log(severity
, LD_MESG
,
163 "%s%s %s: %s{%s} on %s (%s) <%u %u %u %u %x> [%s:%d]",
165 get_subsys_id_name(cfg
->subsys
),
166 cfg
->is_publish
? "PUB" : "SUB",
167 get_message_id_name(cfg
->msg
),
168 get_msg_type_id_name(cfg
->type
),
169 get_channel_id_name(cfg
->channel
),
170 format_flags(cfg
->flags
),
171 cfg
->subsys
, cfg
->msg
, cfg
->type
, cfg
->channel
, cfg
->flags
,
172 cfg
->added_by_file
, cfg
->added_by_line
);
176 * Helper: fill a bitarray <b>out</b> with entries corresponding to the
177 * subsystems listed in <b>items</b>.
180 get_message_bitarray(const pubsub_adjmap_t
*map
,
181 const smartlist_t
*items
,
184 *out
= bitarray_init_zero((unsigned)map
->n_subsystems
);
188 SMARTLIST_FOREACH_BEGIN(items
, const pubsub_cfg_t
*, cfg
) {
189 bitarray_set(*out
, cfg
->subsys
);
190 } SMARTLIST_FOREACH_END(cfg
);
194 * Helper for lint_message: check that all the pubsub_cfg_t items in the two
195 * respective smartlists obey our local graph topology rules.
197 * (Right now this is just a matter of "each subsystem only
198 * publishes/subscribes once; no subsystem is a publisher and subscriber for
199 * the same message.")
201 * Return 0 on success, -1 on failure.
204 lint_message_graph(const pubsub_adjmap_t
*map
,
206 const smartlist_t
*pub
,
207 const smartlist_t
*sub
)
209 bitarray_t
*published_by
= NULL
;
210 bitarray_t
*subscribed_by
= NULL
;
213 get_message_bitarray(map
, pub
, &published_by
);
214 get_message_bitarray(map
, sub
, &subscribed_by
);
216 /* Check whether any subsystem is publishing and subscribing the same
219 for (unsigned i
= 0; i
< map
->n_subsystems
; ++i
) {
220 if (bitarray_is_set(published_by
, i
) &&
221 bitarray_is_set(subscribed_by
, i
)) {
222 log_warn(LD_MESG
|LD_BUG
,
223 "Message \"%s\" is published and subscribed by the same "
225 get_message_id_name(msg
),
226 get_subsys_id_name(i
));
231 bitarray_free(published_by
);
232 bitarray_free(subscribed_by
);
238 * Helper for lint_message: check that all the pubsub_cfg_t items in the two
239 * respective smartlists have compatible flags, channels, and types.
242 lint_message_consistency(message_id_t msg
,
243 const smartlist_t
*pub
,
244 const smartlist_t
*sub
)
246 if (!smartlist_len_opt(pub
) && !smartlist_len_opt(sub
))
247 return 0; // LCOV_EXCL_LINE -- this was already checked.
249 /* The 'all' list has the publishers and the subscribers. */
250 smartlist_t
*all
= smartlist_new();
252 smartlist_add_all(all
, pub
);
254 smartlist_add_all(all
, sub
);
256 const pubsub_cfg_t
*item0
= smartlist_get(all
, 0);
258 /* Indicates which subsystems we've found publishing/subscribing here. */
259 bool pub_excl
= false, sub_excl
= false, chan_same
= true, type_same
= true;
261 /* Simple message consistency properties across messages.
263 SMARTLIST_FOREACH_BEGIN(all
, const pubsub_cfg_t
*, cfg
) {
264 chan_same
&= (cfg
->channel
== item0
->channel
);
265 type_same
&= (cfg
->type
== item0
->type
);
267 pub_excl
|= (cfg
->flags
& DISP_FLAG_EXCL
) != 0;
269 sub_excl
|= (cfg
->flags
& DISP_FLAG_EXCL
) != 0;
270 } SMARTLIST_FOREACH_END(cfg
);
275 log_warn(LD_MESG
|LD_BUG
,
276 "Message \"%s\" is associated with multiple inconsistent "
278 get_message_id_name(msg
));
282 log_warn(LD_MESG
|LD_BUG
,
283 "Message \"%s\" is associated with multiple inconsistent "
285 get_message_id_name(msg
));
289 /* Enforce exclusive-ness for publishers and subscribers that have asked for
292 if (pub_excl
&& smartlist_len_opt(pub
) > 1) {
293 log_warn(LD_MESG
|LD_BUG
,
294 "Message \"%s\" has multiple publishers, but at least one is "
295 "marked as exclusive.",
296 get_message_id_name(msg
));
299 if (sub_excl
&& smartlist_len_opt(sub
) > 1) {
300 log_warn(LD_MESG
|LD_BUG
,
301 "Message \"%s\" has multiple subscribers, but at least one is "
302 "marked as exclusive.",
303 get_message_id_name(msg
));
313 * Check whether there are any errors or inconsistencies for the message
314 * described by <b>msg</b> in <b>map</b>. If there are problems, log about
315 * them, and return -1. Otherwise return 0.
318 lint_message(const pubsub_adjmap_t
*map
, message_id_t msg
)
320 /* NOTE: Some of the checks in this function are maybe over-zealous, and we
321 * might not want to have them forever. I've marked them with [?] below.
323 if (BUG(msg
>= map
->n_msgs
))
324 return 0; // LCOV_EXCL_LINE
326 const smartlist_t
*pub
= map
->pub_by_msg
[msg
];
327 const smartlist_t
*sub
= map
->sub_by_msg
[msg
];
329 const size_t n_pub
= smartlist_len_opt(pub
);
330 const size_t n_sub
= smartlist_len_opt(sub
);
332 if (n_pub
== 0 && n_sub
== 0) {
333 log_info(LD_MESG
, "Nobody is publishing or subscribing to message "
335 get_message_id_name(msg
));
336 return 0; // No publishers or subscribers: nothing to do.
338 /* We'll set this to false if there are any problems. */
341 /* First make sure that if there are publishers, there are subscribers. */
343 log_warn(LD_MESG
|LD_BUG
,
344 "Message \"%s\" has subscribers, but no publishers.",
345 get_message_id_name(msg
));
347 } else if (n_sub
== 0) {
348 log_warn(LD_MESG
|LD_BUG
,
349 "Message \"%s\" has publishers, but no subscribers.",
350 get_message_id_name(msg
));
354 /* Check the message graph topology. */
355 if (lint_message_graph(map
, msg
, pub
, sub
) < 0)
358 /* Check whether the messages have the same fields set on them. */
359 if (lint_message_consistency(msg
, pub
, sub
) < 0)
363 /* There was a problem -- let's log all the publishers and subscribers on
366 SMARTLIST_FOREACH(pub
, pubsub_cfg_t
*, cfg
,
367 pubsub_cfg_dump(cfg
, LOG_WARN
, " "));
370 SMARTLIST_FOREACH(sub
, pubsub_cfg_t
*, cfg
,
371 pubsub_cfg_dump(cfg
, LOG_WARN
, " "));
379 * Check all the messages in <b>map</b> for consistency. Return 0 on success,
383 pubsub_adjmap_check(const pubsub_adjmap_t
*map
)
386 for (unsigned i
= 0; i
< map
->n_msgs
; ++i
) {
387 if (lint_message(map
, i
) < 0) {
391 return all_ok
? 0 : -1;
395 * Check builder for consistency and various constraints. Return 0 on success,
399 pubsub_builder_check(pubsub_builder_t
*builder
)
401 pubsub_adjmap_t
*map
= pubsub_build_adjacency_map(builder
->items
);
405 goto err
; // should be impossible
407 if (pubsub_adjmap_check(map
) < 0)
412 pubsub_adjmap_free(map
);