2 * Copyright (c) 2012 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35 * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS - Please see sys/dmsg.h for an
36 * involved explanation of the protocol.
39 #include "dmsg_local.h"
42 * Maximum spanning tree distance. This has the practical effect of
43 * stopping tail-chasing closed loops when a feeder span is lost.
45 #define DMSG_SPAN_MAXDIST 16
48 * RED-BLACK TREE DEFINITIONS
52 * (1) shared fsid's (a cluster).
53 * (2) unique fsid's (a node in a cluster) <--- LNK_SPAN transactions.
55 * We need to aggregate all active LNK_SPANs and create our own
56 * outgoing LNK_SPAN transactions on each of our connections representing
57 * the aggregated state.
59 * h2span_conn - list of iocom connections who wish to receive SPAN
60 * propagation from other connections. Might contain
61 * a filter string. Only iocom's with an open
62 * LNK_CONN transactions are applicable for SPAN
65 * h2span_relay - List of links relayed (via SPAN). Essentially
66 * each relay structure represents a LNK_SPAN
67 * transaction that we initiated, versus h2span_link
68 * which is a LNK_SPAN transaction that we received.
72 * h2span_cluster - Organizes the shared fsid's. One structure for
75 * h2span_node - Organizes the nodes in a cluster. One structure
76 * for each unique {cluster,node}, aka {fsid, pfs_fsid}.
78 * h2span_link - Organizes all incoming and outgoing LNK_SPAN message
79 * transactions related to a node.
81 * One h2span_link structure for each incoming LNK_SPAN
82 * transaction. Links selected for propagation back
83 * out are also where the outgoing LNK_SPAN messages
84 * are indexed into (so we can propagate changes).
86 * The h2span_link's use a red-black tree to sort the
87 * distance hop metric for the incoming LNK_SPAN. We
88 * then select the top N for outgoing. When the
89 * topology changes the top N may also change and cause
90 * new outgoing LNK_SPAN transactions to be opened
91 * and less desirable ones to be closed, causing
92 * transactional aborts within the message flow in
95 * Also note - All outgoing LNK_SPAN message transactions are also
96 * entered into a red-black tree for use by the routing
97 * function. This is handled by msg.c in the state
103 TAILQ_HEAD(h2span_conn_queue
, h2span_conn
);
104 TAILQ_HEAD(h2span_relay_queue
, h2span_relay
);
106 RB_HEAD(h2span_cluster_tree
, h2span_cluster
);
107 RB_HEAD(h2span_node_tree
, h2span_node
);
108 RB_HEAD(h2span_link_tree
, h2span_link
);
109 RB_HEAD(h2span_relay_tree
, h2span_relay
);
113 * Received LNK_CONN transaction enables SPAN protocol over connection.
114 * (may contain filter). Typically one for each mount and several may
115 * share the same media.
118 TAILQ_ENTRY(h2span_conn
) entry
;
119 struct h2span_relay_tree tree
;
124 * All received LNK_SPANs are organized by cluster (pfs_clid),
125 * node (pfs_fsid), and link (received LNK_SPAN transaction).
127 struct h2span_cluster
{
128 RB_ENTRY(h2span_cluster
) rbnode
;
129 struct h2span_node_tree tree
;
130 uuid_t pfs_clid
; /* shared fsid */
132 char cl_label
[128]; /* cluster label (typ PEER_BLOCK) */
133 int refs
; /* prevents destruction */
137 RB_ENTRY(h2span_node
) rbnode
;
138 struct h2span_link_tree tree
;
139 struct h2span_cluster
*cls
;
141 uuid_t pfs_fsid
; /* unique fsid */
142 char fs_label
[128]; /* fs label (typ PEER_HAMMER2) */
147 RB_ENTRY(h2span_link
) rbnode
;
148 dmsg_state_t
*state
; /* state<->link */
149 struct h2span_node
*node
; /* related node */
152 struct h2span_relay_queue relayq
; /* relay out */
156 * Any LNK_SPAN transactions we receive which are relayed out other
157 * connections utilize this structure to track the LNK_SPAN transactions
158 * we initiate (relay out) on other connections. We only relay out
159 * LNK_SPANs on connections we have an open CONN transaction for.
161 * The relay structure points to the outgoing LNK_SPAN trans (out_state)
162 * and to the incoming LNK_SPAN transaction (in_state). The relay
163 * structure holds refs on the related states.
165 * In many respects this is the core of the protocol... actually figuring
166 * out what LNK_SPANs to relay. The spanid used for relaying is the
167 * address of the 'state' structure, which is why h2span_relay has to
168 * be entered into a RB-TREE based at h2span_conn (so we can look
169 * up the spanid to validate it).
171 struct h2span_relay
{
172 TAILQ_ENTRY(h2span_relay
) entry
; /* from link */
173 RB_ENTRY(h2span_relay
) rbnode
; /* from h2span_conn */
174 struct h2span_conn
*conn
; /* related CONN transaction */
175 dmsg_state_t
*source_rt
; /* h2span_link state */
176 dmsg_state_t
*target_rt
; /* h2span_relay state */
179 typedef struct h2span_conn h2span_conn_t
;
180 typedef struct h2span_cluster h2span_cluster_t
;
181 typedef struct h2span_node h2span_node_t
;
182 typedef struct h2span_link h2span_link_t
;
183 typedef struct h2span_relay h2span_relay_t
;
185 #define dmsg_termstr(array) _dmsg_termstr((array), sizeof(array))
187 static h2span_relay_t
*dmsg_generate_relay(h2span_conn_t
*conn
,
188 h2span_link_t
*slink
);
189 static uint32_t dmsg_rnss(void);
/*
 * Forcibly NUL-terminate a fixed-size character buffer.
 *
 * Used via the dmsg_termstr() macro on label arrays received off the
 * wire so subsequent str*() operations cannot run past the buffer even
 * if the remote end failed to terminate the string.  size must be >= 1.
 */
static void
_dmsg_termstr(char *base, size_t size)
{
	base[size-1] = 0;
}
199 * Cluster peer_type, uuid, AND label must match for a match
203 h2span_cluster_cmp(h2span_cluster_t
*cls1
, h2span_cluster_t
*cls2
)
207 if (cls1
->peer_type
< cls2
->peer_type
)
209 if (cls1
->peer_type
> cls2
->peer_type
)
211 r
= uuid_compare(&cls1
->pfs_clid
, &cls2
->pfs_clid
, NULL
);
213 r
= strcmp(cls1
->cl_label
, cls2
->cl_label
);
219 * Match against fs_label/pfs_fsid. Together these two items represent a
220 * unique node. In most cases the primary differentiator is pfs_fsid but
221 * we also string-match fs_label.
225 h2span_node_cmp(h2span_node_t
*node1
, h2span_node_t
*node2
)
229 r
= strcmp(node1
->fs_label
, node2
->fs_label
);
231 r
= uuid_compare(&node1
->pfs_fsid
, &node2
->pfs_fsid
, NULL
);
236 * Sort/subsort must match h2span_relay_cmp() under any given node
237 * to make the aggregation algorithm easier, so the best links are
238 * in the same sorted order as the best relays.
240 * NOTE: We cannot use link*->state->msgid because this msgid is created
241 * by each remote host and thus might wind up being the same.
245 h2span_link_cmp(h2span_link_t
*link1
, h2span_link_t
*link2
)
247 if (link1
->dist
< link2
->dist
)
249 if (link1
->dist
> link2
->dist
)
251 if (link1
->rnss
< link2
->rnss
)
253 if (link1
->rnss
> link2
->rnss
)
256 if ((uintptr_t)link1
->state
< (uintptr_t)link2
->state
)
258 if ((uintptr_t)link1
->state
> (uintptr_t)link2
->state
)
261 if (link1
->state
->msgid
< link2
->state
->msgid
)
263 if (link1
->state
->msgid
> link2
->state
->msgid
)
270 * Relay entries are sorted by node, subsorted by distance and link
271 * address (so we can match up the conn->tree relay topology with
272 * a node's link topology).
276 h2span_relay_cmp(h2span_relay_t
*relay1
, h2span_relay_t
*relay2
)
278 h2span_link_t
*link1
= relay1
->source_rt
->any
.link
;
279 h2span_link_t
*link2
= relay2
->source_rt
->any
.link
;
281 if ((intptr_t)link1
->node
< (intptr_t)link2
->node
)
283 if ((intptr_t)link1
->node
> (intptr_t)link2
->node
)
285 if (link1
->dist
< link2
->dist
)
287 if (link1
->dist
> link2
->dist
)
289 if (link1
->rnss
< link2
->rnss
)
291 if (link1
->rnss
> link2
->rnss
)
294 if ((uintptr_t)link1
->state
< (uintptr_t)link2
->state
)
296 if ((uintptr_t)link1
->state
> (uintptr_t)link2
->state
)
299 if (link1
->state
->msgid
< link2
->state
->msgid
)
301 if (link1
->state
->msgid
> link2
->state
->msgid
)
307 RB_PROTOTYPE_STATIC(h2span_cluster_tree
, h2span_cluster
,
308 rbnode
, h2span_cluster_cmp
);
309 RB_PROTOTYPE_STATIC(h2span_node_tree
, h2span_node
,
310 rbnode
, h2span_node_cmp
);
311 RB_PROTOTYPE_STATIC(h2span_link_tree
, h2span_link
,
312 rbnode
, h2span_link_cmp
);
313 RB_PROTOTYPE_STATIC(h2span_relay_tree
, h2span_relay
,
314 rbnode
, h2span_relay_cmp
);
316 RB_GENERATE_STATIC(h2span_cluster_tree
, h2span_cluster
,
317 rbnode
, h2span_cluster_cmp
);
318 RB_GENERATE_STATIC(h2span_node_tree
, h2span_node
,
319 rbnode
, h2span_node_cmp
);
320 RB_GENERATE_STATIC(h2span_link_tree
, h2span_link
,
321 rbnode
, h2span_link_cmp
);
322 RB_GENERATE_STATIC(h2span_relay_tree
, h2span_relay
,
323 rbnode
, h2span_relay_cmp
);
326 * Global mutex protects cluster_tree lookups, connq, mediaq.
328 static pthread_mutex_t cluster_mtx
;
329 static struct h2span_cluster_tree cluster_tree
= RB_INITIALIZER(cluster_tree
);
330 static struct h2span_conn_queue connq
= TAILQ_HEAD_INITIALIZER(connq
);
331 static struct dmsg_media_queue mediaq
= TAILQ_HEAD_INITIALIZER(mediaq
);
333 static void dmsg_lnk_span(dmsg_msg_t
*msg
);
334 static void dmsg_lnk_conn(dmsg_msg_t
*msg
);
335 static void dmsg_lnk_circ(dmsg_msg_t
*msg
);
336 static void dmsg_lnk_relay(dmsg_msg_t
*msg
);
337 static void dmsg_relay_scan(h2span_conn_t
*conn
, h2span_node_t
*node
);
338 static void dmsg_relay_delete(h2span_relay_t
*relay
);
341 dmsg_msg_lnk_signal(dmsg_iocom_t
*iocom __unused
)
343 pthread_mutex_lock(&cluster_mtx
);
344 dmsg_relay_scan(NULL
, NULL
);
345 pthread_mutex_unlock(&cluster_mtx
);
349 * DMSG_PROTO_LNK - Generic DMSG_PROTO_LNK.
350 * (incoming iocom lock not held)
352 * This function is typically called for one-way and opening-transactions
353 * since state->func is assigned after that, but it will also be called
354 * if no state->func is assigned on transaction-open.
357 dmsg_msg_lnk(dmsg_msg_t
*msg
)
359 switch(msg
->tcmd
& DMSGF_BASECMDMASK
) {
370 msg
->iocom
->usrmsg_callback(msg
, 1);
371 /* state invalid after reply */
377 * LNK_CONN - iocom identify message reception.
378 * (incoming iocom lock not held)
380 * Remote node identifies itself to us, sets up a SPAN filter, and gives us
381 * the ok to start transmitting SPANs.
384 dmsg_lnk_conn(dmsg_msg_t
*msg
)
386 dmsg_state_t
*state
= msg
->state
;
389 h2span_relay_t
*relay
;
392 pthread_mutex_lock(&cluster_mtx
);
395 "dmsg_lnk_conn: msg %p cmd %08x state %p "
396 "txcmd %08x rxcmd %08x\n",
397 msg
, msg
->any
.head
.cmd
, state
,
398 state
->txcmd
, state
->rxcmd
);
400 switch(msg
->any
.head
.cmd
& DMSGF_TRANSMASK
) {
401 case DMSG_LNK_CONN
| DMSGF_CREATE
:
402 case DMSG_LNK_CONN
| DMSGF_CREATE
| DMSGF_DELETE
:
404 * On transaction start we allocate a new h2span_conn and
405 * acknowledge the request, leaving the transaction open.
406 * We then relay priority-selected SPANs.
408 fprintf(stderr
, "LNK_CONN(%08x): %s/%s/%s\n",
409 (uint32_t)msg
->any
.head
.msgid
,
410 dmsg_uuid_to_str(&msg
->any
.lnk_conn
.pfs_clid
,
412 msg
->any
.lnk_conn
.cl_label
,
413 msg
->any
.lnk_conn
.fs_label
);
416 conn
= dmsg_alloc(sizeof(*conn
));
418 RB_INIT(&conn
->tree
);
419 state
->iocom
->conn
= conn
; /* XXX only one */
421 state
->func
= dmsg_lnk_conn
;
422 state
->any
.conn
= conn
;
423 TAILQ_INSERT_TAIL(&connq
, conn
, entry
);
428 TAILQ_FOREACH(media
, &mediaq
, entry
) {
429 if (uuid_compare(&msg
->any
.lnk_conn
.mediaid
,
430 &media
->mediaid
, NULL
) == 0) {
435 media
= dmsg_alloc(sizeof(*media
));
436 media
->mediaid
= msg
->any
.lnk_conn
.mediaid
;
437 TAILQ_INSERT_TAIL(&mediaq
, media
, entry
);
439 state
->media
= media
;
442 if ((msg
->any
.head
.cmd
& DMSGF_DELETE
) == 0) {
443 msg
->iocom
->usrmsg_callback(msg
, 0);
444 dmsg_msg_result(msg
, 0);
445 dmsg_iocom_signal(msg
->iocom
);
449 case DMSG_LNK_CONN
| DMSGF_DELETE
:
450 case DMSG_LNK_ERROR
| DMSGF_DELETE
:
452 * On transaction terminate we clean out our h2span_conn
453 * and acknowledge the request, closing the transaction.
455 fprintf(stderr
, "LNK_CONN: Terminated\n");
456 conn
= state
->any
.conn
;
462 * Callback will clean out media config / user-opaque state
464 media
= state
->media
;
466 if (media
->refs
== 0) {
467 fprintf(stderr
, "Media shutdown\n");
468 TAILQ_REMOVE(&mediaq
, media
, entry
);
469 pthread_mutex_unlock(&cluster_mtx
);
470 msg
->iocom
->usrmsg_callback(msg
, 0);
471 pthread_mutex_lock(&cluster_mtx
);
477 * Clean out all relays. This requires terminating each
480 while ((relay
= RB_ROOT(&conn
->tree
)) != NULL
) {
481 dmsg_relay_delete(relay
);
488 msg
->state
->any
.conn
= NULL
;
489 msg
->state
->iocom
->conn
= NULL
;
490 TAILQ_REMOVE(&connq
, conn
, entry
);
493 dmsg_msg_reply(msg
, 0);
494 /* state invalid after reply */
497 msg
->iocom
->usrmsg_callback(msg
, 1);
499 if (msg
->any
.head
.cmd
& DMSGF_DELETE
)
501 dmsg_msg_reply(msg
, DMSG_ERR_NOSUPP
);
505 pthread_mutex_unlock(&cluster_mtx
);
509 * LNK_SPAN - Spanning tree protocol message reception
510 * (incoming iocom lock not held)
512 * Receive a spanning tree transactional message, creating or destroying
513 * a SPAN and propagating it to other iocoms.
516 dmsg_lnk_span(dmsg_msg_t
*msg
)
518 dmsg_state_t
*state
= msg
->state
;
519 h2span_cluster_t dummy_cls
;
520 h2span_node_t dummy_node
;
521 h2span_cluster_t
*cls
;
523 h2span_link_t
*slink
;
524 h2span_relay_t
*relay
;
527 assert((msg
->any
.head
.cmd
& DMSGF_REPLY
) == 0);
529 pthread_mutex_lock(&cluster_mtx
);
532 * On transaction start we initialize the tracking infrastructure
534 if (msg
->any
.head
.cmd
& DMSGF_CREATE
) {
535 assert(state
->func
== NULL
);
536 state
->func
= dmsg_lnk_span
;
538 dmsg_termstr(msg
->any
.lnk_span
.cl_label
);
539 dmsg_termstr(msg
->any
.lnk_span
.fs_label
);
544 dummy_cls
.pfs_clid
= msg
->any
.lnk_span
.pfs_clid
;
545 dummy_cls
.peer_type
= msg
->any
.lnk_span
.peer_type
;
546 bcopy(msg
->any
.lnk_span
.cl_label
,
548 sizeof(dummy_cls
.cl_label
));
549 cls
= RB_FIND(h2span_cluster_tree
, &cluster_tree
, &dummy_cls
);
551 cls
= dmsg_alloc(sizeof(*cls
));
552 cls
->pfs_clid
= msg
->any
.lnk_span
.pfs_clid
;
553 cls
->peer_type
= msg
->any
.lnk_span
.peer_type
;
554 bcopy(msg
->any
.lnk_span
.cl_label
,
556 sizeof(cls
->cl_label
));
558 RB_INSERT(h2span_cluster_tree
, &cluster_tree
, cls
);
564 dummy_node
.pfs_fsid
= msg
->any
.lnk_span
.pfs_fsid
;
565 bcopy(msg
->any
.lnk_span
.fs_label
, dummy_node
.fs_label
,
566 sizeof(dummy_node
.fs_label
));
567 node
= RB_FIND(h2span_node_tree
, &cls
->tree
, &dummy_node
);
569 node
= dmsg_alloc(sizeof(*node
));
570 node
->pfs_fsid
= msg
->any
.lnk_span
.pfs_fsid
;
571 node
->pfs_type
= msg
->any
.lnk_span
.pfs_type
;
572 bcopy(msg
->any
.lnk_span
.fs_label
,
574 sizeof(node
->fs_label
));
576 RB_INIT(&node
->tree
);
577 RB_INSERT(h2span_node_tree
, &cls
->tree
, node
);
578 if (msg
->iocom
->node_handler
) {
579 msg
->iocom
->node_handler(&node
->opaque
, msg
,
587 assert(state
->any
.link
== NULL
);
588 slink
= dmsg_alloc(sizeof(*slink
));
589 TAILQ_INIT(&slink
->relayq
);
591 slink
->dist
= msg
->any
.lnk_span
.dist
;
592 slink
->rnss
= msg
->any
.lnk_span
.rnss
;
593 slink
->state
= state
;
594 state
->any
.link
= slink
;
596 RB_INSERT(h2span_link_tree
, &node
->tree
, slink
);
599 "LNK_SPAN(thr %p): %p %s cl=%s fs=%s dist=%d\n",
602 dmsg_uuid_to_str(&msg
->any
.lnk_span
.pfs_clid
, &alloc
),
603 msg
->any
.lnk_span
.cl_label
,
604 msg
->any
.lnk_span
.fs_label
,
605 msg
->any
.lnk_span
.dist
);
608 dmsg_relay_scan(NULL
, node
);
610 dmsg_iocom_signal(msg
->iocom
);
614 * On transaction terminate we remove the tracking infrastructure.
616 if (msg
->any
.head
.cmd
& DMSGF_DELETE
) {
617 slink
= state
->any
.link
;
618 assert(slink
!= NULL
);
622 fprintf(stderr
, "LNK_DELE(thr %p): %p %s cl=%s fs=%s dist=%d\n",
625 dmsg_uuid_to_str(&cls
->pfs_clid
, &alloc
),
626 state
->msg
->any
.lnk_span
.cl_label
,
627 state
->msg
->any
.lnk_span
.fs_label
,
628 state
->msg
->any
.lnk_span
.dist
);
632 * Clean out all relays. This requires terminating each
635 while ((relay
= TAILQ_FIRST(&slink
->relayq
)) != NULL
) {
636 dmsg_relay_delete(relay
);
640 * Clean out the topology
642 RB_REMOVE(h2span_link_tree
, &node
->tree
, slink
);
643 if (RB_EMPTY(&node
->tree
)) {
644 RB_REMOVE(h2span_node_tree
, &cls
->tree
, node
);
645 if (msg
->iocom
->node_handler
) {
646 msg
->iocom
->node_handler(&node
->opaque
, msg
,
649 if (RB_EMPTY(&cls
->tree
) && cls
->refs
== 0) {
650 RB_REMOVE(h2span_cluster_tree
,
658 state
->any
.link
= NULL
;
664 * We have to terminate the transaction
666 dmsg_state_reply(state
, 0);
667 /* state invalid after reply */
670 * If the node still exists issue any required updates. If
671 * it doesn't then all related relays have already been
672 * removed and there's nothing left to do.
676 dmsg_relay_scan(NULL
, node
);
679 dmsg_iocom_signal(msg
->iocom
);
682 pthread_mutex_unlock(&cluster_mtx
);
686 * LNK_CIRC - Virtual circuit protocol message reception
687 * (incoming iocom lock not held)
692 dmsg_lnk_circ(dmsg_msg_t
*msg
)
694 dmsg_circuit_t
*circA
;
695 dmsg_circuit_t
*circB
;
696 dmsg_state_t
*rx_state
;
697 dmsg_state_t
*tx_state
;
701 dmsg_iocom_t
*iocomA
;
702 dmsg_iocom_t
*iocomB
;
705 /*pthread_mutex_lock(&cluster_mtx);*/
707 if (DMsgDebugOpt
>= 4)
708 fprintf(stderr
, "CIRC receive cmd=%08x\n", msg
->any
.head
.cmd
);
710 switch (msg
->any
.head
.cmd
& (DMSGF_CREATE
|
714 case DMSGF_CREATE
| DMSGF_DELETE
:
716 * (A) wishes to establish a virtual circuit through us to (B).
717 * (B) is specified by lnk_circ.target (the message id for
718 * a LNK_SPAN that (A) received from us which represents (B)).
720 * Designate the originator of the circuit (the current
721 * remote end) as (A) and the other side as (B).
723 * Accept the VC but do not reply. We will wait for the end-
724 * to-end reply to propagate back.
729 * Locate the open transaction state that the other end
730 * specified in <target>. This will be an open SPAN
731 * transaction that we transmitted (h2span_relay) over
732 * the interface the LNK_CIRC is being received on.
734 * (all LNK_CIRC's that we transmit are on circuit0)
736 pthread_mutex_lock(&iocomA
->mtx
);
737 dummy
.msgid
= msg
->any
.lnk_circ
.target
;
738 tx_state
= RB_FIND(dmsg_state_tree
,
739 &iocomA
->circuit0
.statewr_tree
,
741 pthread_mutex_unlock(&iocomA
->mtx
);
742 if (tx_state
== NULL
) {
744 fprintf(stderr
, "dmsg_lnk_circ: no circuit\n");
745 dmsg_msg_reply(msg
, DMSG_ERR_CANTCIRC
);
748 if (tx_state
->icmd
!= DMSG_LNK_SPAN
) {
750 fprintf(stderr
, "dmsg_lnk_circ: not LNK_SPAN\n");
751 dmsg_msg_reply(msg
, DMSG_ERR_CANTCIRC
);
755 /* locate h2span_link */
756 rx_state
= tx_state
->any
.relay
->source_rt
;
759 * A wishes to establish a VC through us to the
762 * A sends us the msgid of an open SPAN transaction
763 * it received from us as <target>.
765 circA
= dmsg_alloc(sizeof(*circA
));
766 dmsg_circuit_init(iocomA
, circA
);
767 circA
->state
= msg
->state
; /* LNK_CIRC state */
768 circA
->msgid
= msg
->state
->msgid
;
769 circA
->span_state
= tx_state
; /* H2SPAN_RELAY state */
771 circA
->refs
= 2; /* state and peer */
774 * Upgrade received state so we act on both it and its
775 * peer (created below) symmetrically.
777 msg
->state
->any
.circ
= circA
;
778 msg
->state
->func
= dmsg_lnk_circ
;
780 iocomB
= rx_state
->iocom
;
782 circB
= dmsg_alloc(sizeof(*circB
));
783 dmsg_circuit_init(iocomB
, circB
);
786 * Create a LNK_CIRC transaction on B
788 fwd_msg
= dmsg_msg_alloc(&iocomB
->circuit0
,
789 0, DMSG_LNK_CIRC
| DMSGF_CREATE
,
790 dmsg_lnk_circ
, circB
);
791 fwd_msg
->state
->any
.circ
= circB
;
792 fwd_msg
->any
.lnk_circ
.target
= rx_state
->msgid
;
793 circB
->state
= fwd_msg
->state
; /* LNK_CIRC state */
794 circB
->msgid
= fwd_msg
->any
.head
.msgid
;
795 circB
->span_state
= rx_state
; /* H2SPAN_LINK state */
797 circB
->refs
= 2; /* state and peer */
799 if (DMsgDebugOpt
>= 4)
800 fprintf(stderr
, "CIRC forward %p->%p\n", circA
, circB
);
803 * Link the two circuits together.
808 if (iocomA
< iocomB
) {
809 pthread_mutex_lock(&iocomA
->mtx
);
810 pthread_mutex_lock(&iocomB
->mtx
);
812 pthread_mutex_lock(&iocomB
->mtx
);
813 pthread_mutex_lock(&iocomA
->mtx
);
815 if (RB_INSERT(dmsg_circuit_tree
, &iocomA
->circuit_tree
, circA
))
817 if (RB_INSERT(dmsg_circuit_tree
, &iocomB
->circuit_tree
, circB
))
819 if (iocomA
< iocomB
) {
820 pthread_mutex_unlock(&iocomB
->mtx
);
821 pthread_mutex_unlock(&iocomA
->mtx
);
823 pthread_mutex_unlock(&iocomA
->mtx
);
824 pthread_mutex_unlock(&iocomB
->mtx
);
827 dmsg_msg_write(fwd_msg
);
829 if ((msg
->any
.head
.cmd
& DMSGF_DELETE
) == 0)
831 /* FALL THROUGH TO DELETE */
834 * (A) is deleting the virtual circuit, propagate closure
838 if (msg
->state
->any
.circ
== NULL
) {
839 /* already returned an error/deleted */
842 circA
= msg
->state
->any
.circ
;
844 assert(msg
->state
== circA
->state
);
847 * We are closing B's send side. If B's receive side is
848 * already closed we disconnect the circuit from B's state.
851 if (circB
&& (state
= circB
->state
) != NULL
) {
852 if (state
->rxcmd
& DMSGF_DELETE
) {
855 state
->any
.circ
= NULL
;
856 dmsg_circuit_drop(circB
);
858 dmsg_state_reply(state
, msg
->any
.head
.error
);
862 * We received a close on A. If A's send side is already
863 * closed we disconnect the circuit from A's state.
865 if (circA
&& (state
= circA
->state
) != NULL
) {
866 if (state
->txcmd
& DMSGF_DELETE
) {
869 state
->any
.circ
= NULL
;
870 dmsg_circuit_drop(circA
);
875 * Disconnect the peer<->peer association
881 dmsg_circuit_drop(circA
);
882 dmsg_circuit_drop(circB
); /* XXX SMP */
886 case DMSGF_REPLY
| DMSGF_CREATE
:
887 case DMSGF_REPLY
| DMSGF_CREATE
| DMSGF_DELETE
:
889 * (B) is acknowledging the creation of the virtual
890 * circuit. This propagates all the way back to (A), though
891 * it should be noted that (A) can start issuing commands
892 * via the virtual circuit before seeing this reply.
894 circB
= msg
->state
->any
.circ
;
897 assert(msg
->state
== circB
->state
);
899 if ((msg
->any
.head
.cmd
& DMSGF_DELETE
) == 0) {
900 dmsg_state_result(circA
->state
, msg
->any
.head
.error
);
903 /* FALL THROUGH TO DELETE */
904 case DMSGF_REPLY
| DMSGF_DELETE
:
906 * (B) Is deleting the virtual circuit or acknowledging
907 * our deletion of the virtual circuit, propagate closure
911 circB
= msg
->state
->any
.circ
;
913 assert(msg
->state
== circB
->state
);
916 * We received a close on (B), propagate to (A). If we have
917 * already received the close from (A) we disconnect the state.
920 if (circA
&& (state
= circA
->state
) != NULL
) {
921 if (state
->rxcmd
& DMSGF_DELETE
) {
924 state
->any
.circ
= NULL
;
925 dmsg_circuit_drop(circA
);
927 dmsg_state_reply(state
, msg
->any
.head
.error
);
931 * We received a close on (B). If (B)'s send side is already
932 * closed we disconnect the state.
934 if (circB
&& (state
= circB
->state
) != NULL
) {
935 if (state
->txcmd
& DMSGF_DELETE
) {
938 state
->any
.circ
= NULL
;
939 dmsg_circuit_drop(circB
);
944 * Disconnect the peer<->peer association
950 dmsg_circuit_drop(circB
);
951 dmsg_circuit_drop(circA
); /* XXX SMP */
957 /*pthread_mutex_lock(&cluster_mtx);*/
961 * Update relay transactions for SPANs.
963 * Called with cluster_mtx held.
965 static void dmsg_relay_scan_specific(h2span_node_t
*node
,
966 h2span_conn_t
*conn
);
969 dmsg_relay_scan(h2span_conn_t
*conn
, h2span_node_t
*node
)
971 h2span_cluster_t
*cls
;
975 * Iterate specific node
977 TAILQ_FOREACH(conn
, &connq
, entry
)
978 dmsg_relay_scan_specific(node
, conn
);
983 * Iterate cluster ids, nodes, and either a specific connection
984 * or all connections.
986 RB_FOREACH(cls
, h2span_cluster_tree
, &cluster_tree
) {
990 RB_FOREACH(node
, h2span_node_tree
, &cls
->tree
) {
992 * Synchronize the node's link (received SPANs)
993 * with each connection's relays.
996 dmsg_relay_scan_specific(node
, conn
);
998 TAILQ_FOREACH(conn
, &connq
, entry
) {
999 dmsg_relay_scan_specific(node
,
1002 assert(conn
== NULL
);
1010 * Update the relay'd SPANs for this (node, conn).
1012 * Iterate links and adjust relays to match. We only propagate the top link
1013 * for now (XXX we want to propagate the top two).
1015 * The dmsg_relay_scan_cmp() function locates the first relay element
1016 * for any given node. The relay elements will be sub-sorted by dist.
1018 struct relay_scan_info
{
1019 h2span_node_t
*node
;
1020 h2span_relay_t
*relay
;
1024 dmsg_relay_scan_cmp(h2span_relay_t
*relay
, void *arg
)
1026 struct relay_scan_info
*info
= arg
;
1028 if ((intptr_t)relay
->source_rt
->any
.link
->node
< (intptr_t)info
->node
)
1030 if ((intptr_t)relay
->source_rt
->any
.link
->node
> (intptr_t)info
->node
)
1036 dmsg_relay_scan_callback(h2span_relay_t
*relay
, void *arg
)
1038 struct relay_scan_info
*info
= arg
;
1040 info
->relay
= relay
;
1045 dmsg_relay_scan_specific(h2span_node_t
*node
, h2span_conn_t
*conn
)
1047 struct relay_scan_info info
;
1048 h2span_relay_t
*relay
;
1049 h2span_relay_t
*next_relay
;
1050 h2span_link_t
*slink
;
1051 dmsg_lnk_conn_t
*lconn
;
1052 dmsg_lnk_span_t
*lspan
;
1055 #ifdef REQUIRE_SYMMETRICAL
1056 uint32_t lastdist
= DMSG_SPAN_MAXDIST
;
1057 uint32_t lastrnss
= 0;
1064 * Locate the first related relay for the node on this connection.
1065 * relay will be NULL if there were none.
1067 RB_SCAN(h2span_relay_tree
, &conn
->tree
,
1068 dmsg_relay_scan_cmp
, dmsg_relay_scan_callback
, &info
);
1072 assert(relay
->source_rt
->any
.link
->node
== node
);
1074 if (DMsgDebugOpt
> 8)
1075 fprintf(stderr
, "relay scan for connection %p\n", conn
);
1078 * Iterate the node's links (received SPANs) in distance order,
1079 * lowest (best) dist first.
1081 * PROPAGATE THE BEST LINKS OVER THE SPECIFIED CONNECTION.
1083 * Track relays while iterating the best links and construct
1084 * missing relays when necessary.
1086 * (If some prior better link was removed it would have also
1087 * removed the relay, so the relay can only match exactly or
1091 RB_FOREACH(slink
, h2span_link_tree
, &node
->tree
) {
1093 * Increment count of successful relays. This isn't
1094 * quite accurate if we break out but nothing after
1095 * the loop uses (count).
1097 * If count exceeds the maximum number of relays we desire
1098 * we normally want to break out. However, in order to
1099 * guarantee a symmetric path we have to continue if both
1100 * (dist) and (rnss) continue to match. Otherwise the SPAN
1101 * propagation in the reverse direction may choose different
1102 * routes and we will not have a symmetric path.
1104 * NOTE: Spanning tree does not have to be symmetrical so
1105 * this code is not currently enabled.
1107 if (++count
>= maxcount
) {
1108 #ifdef REQUIRE_SYMMETRICAL
1109 if (lastdist
!= slink
->dist
|| lastrnss
!= slink
->rnss
)
1114 /* go beyond the nominal maximum desired relays */
1118 * Match, relay already in-place, get the next
1119 * relay to match against the next slink.
1121 if (relay
&& relay
->source_rt
->any
.link
== slink
) {
1122 relay
= RB_NEXT(h2span_relay_tree
, &conn
->tree
, relay
);
1127 * We might want this SLINK, if it passes our filters.
1129 * The spanning tree can cause closed loops so we have
1130 * to limit slink->dist.
1132 if (slink
->dist
> DMSG_SPAN_MAXDIST
)
1136 * Don't bother transmitting a LNK_SPAN out the same
1137 * connection it came in on. Trivial optimization.
1139 if (slink
->state
->iocom
== conn
->state
->iocom
)
1143 * NOTE ON FILTERS: The protocol spec allows non-requested
1144 * SPANs to be transmitted, the other end is expected to
1145 * leave their transactions open but otherwise ignore them.
1147 * Don't bother transmitting if the remote connection
1148 * is not accepting this SPAN's peer_type.
1150 * pfs_mask is typically used so pure clients can filter
1151 * out receiving SPANs for other pure clients.
1153 lspan
= &slink
->state
->msg
->any
.lnk_span
;
1154 lconn
= &conn
->state
->msg
->any
.lnk_conn
;
1155 if (((1LLU << lspan
->peer_type
) & lconn
->peer_mask
) == 0)
1157 if (((1LLU << lspan
->pfs_type
) & lconn
->pfs_mask
) == 0)
1161 * Do not give pure clients visibility to other pure clients
1163 if (lconn
->pfs_type
== DMSG_PFSTYPE_CLIENT
&&
1164 lspan
->pfs_type
== DMSG_PFSTYPE_CLIENT
) {
1169 * Connection filter, if cluster uuid is not NULL it must
1170 * match the span cluster uuid. Only applies when the
1171 * peer_type matches.
1173 if (lspan
->peer_type
== lconn
->peer_type
&&
1174 !uuid_is_nil(&lconn
->pfs_clid
, NULL
) &&
1175 uuid_compare(&slink
->node
->cls
->pfs_clid
,
1176 &lconn
->pfs_clid
, NULL
)) {
1181 * Connection filter, if cluster label is not empty it must
1182 * match the span cluster label. Only applies when the
1183 * peer_type matches.
1185 if (lspan
->peer_type
== lconn
->peer_type
&&
1186 lconn
->cl_label
[0] &&
1187 strcmp(lconn
->cl_label
, slink
->node
->cls
->cl_label
)) {
1192 * NOTE! pfs_fsid differentiates nodes within the same cluster
1193 * so we obviously don't want to match those. Similarly
1198 * Ok, we've accepted this SPAN for relaying.
1200 assert(relay
== NULL
||
1201 relay
->source_rt
->any
.link
->node
!= slink
->node
||
1202 relay
->source_rt
->any
.link
->dist
>= slink
->dist
);
1203 relay
= dmsg_generate_relay(conn
, slink
);
1204 #ifdef REQUIRE_SYMMETRICAL
1205 lastdist
= slink
->dist
;
1206 lastrnss
= slink
->rnss
;
1210 * Match (created new relay), get the next relay to
1211 * match against the next slink.
1213 relay
= RB_NEXT(h2span_relay_tree
, &conn
->tree
, relay
);
1217 * Any remaining relay's belonging to this connection which match
1218 * the node are in excess of the current aggregate spanning state
1219 * and should be removed.
1221 while (relay
&& relay
->source_rt
->any
.link
->node
== node
) {
1222 next_relay
= RB_NEXT(h2span_relay_tree
, &conn
->tree
, relay
);
1223 fprintf(stderr
, "RELAY DELETE FROM EXTRAS\n");
1224 dmsg_relay_delete(relay
);
1230 * Helper function to generate missing relay.
1232 * cluster_mtx must be held
1236 dmsg_generate_relay(h2span_conn_t
*conn
, h2span_link_t
*slink
)
1238 h2span_relay_t
*relay
;
1241 relay
= dmsg_alloc(sizeof(*relay
));
1243 relay
->source_rt
= slink
->state
;
1244 /* relay->source_rt->any.link = slink; */
1247 * NOTE: relay->target_rt->any.relay set to relay by alloc.
1249 msg
= dmsg_msg_alloc(&conn
->state
->iocom
->circuit0
,
1250 0, DMSG_LNK_SPAN
| DMSGF_CREATE
,
1251 dmsg_lnk_relay
, relay
);
1252 relay
->target_rt
= msg
->state
;
1254 msg
->any
.lnk_span
= slink
->state
->msg
->any
.lnk_span
;
1255 msg
->any
.lnk_span
.dist
= slink
->dist
+ 1;
1256 msg
->any
.lnk_span
.rnss
= slink
->rnss
+ dmsg_rnss();
1258 RB_INSERT(h2span_relay_tree
, &conn
->tree
, relay
);
1259 TAILQ_INSERT_TAIL(&slink
->relayq
, relay
, entry
);
1261 dmsg_msg_write(msg
);
1267 * Messages received on relay SPANs. These are open transactions so it is
1268 * in fact possible for the other end to close the transaction.
1270 * XXX MPRACE on state structure
1273 dmsg_lnk_relay(dmsg_msg_t
*msg
)
1275 dmsg_state_t
*state
= msg
->state
;
1276 h2span_relay_t
*relay
;
1278 assert(msg
->any
.head
.cmd
& DMSGF_REPLY
);
1280 if (msg
->any
.head
.cmd
& DMSGF_DELETE
) {
1281 pthread_mutex_lock(&cluster_mtx
);
1282 fprintf(stderr
, "RELAY DELETE FROM LNK_RELAY MSG\n");
1283 if ((relay
= state
->any
.relay
) != NULL
) {
1284 dmsg_relay_delete(relay
);
1286 dmsg_state_reply(state
, 0);
1288 pthread_mutex_unlock(&cluster_mtx
);
1293 * cluster_mtx held by caller
1297 dmsg_relay_delete(h2span_relay_t
*relay
)
1300 "RELAY DELETE %p RELAY %p ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n",
1301 relay
->source_rt
->any
.link
,
1303 relay
->source_rt
->any
.link
->node
->cls
, relay
->source_rt
->any
.link
->node
,
1304 relay
->source_rt
->any
.link
->dist
,
1305 relay
->conn
->state
->iocom
->sock_fd
, relay
->target_rt
);
1307 RB_REMOVE(h2span_relay_tree
, &relay
->conn
->tree
, relay
);
1308 TAILQ_REMOVE(&relay
->source_rt
->any
.link
->relayq
, relay
, entry
);
1310 if (relay
->target_rt
) {
1311 relay
->target_rt
->any
.relay
= NULL
;
1312 dmsg_state_reply(relay
->target_rt
, 0);
1313 /* state invalid after reply */
1314 relay
->target_rt
= NULL
;
1317 relay
->source_rt
= NULL
;
1321 /************************************************************************
1322 * MESSAGE ROUTING AND SOURCE VALIDATION *
1323 ************************************************************************/
1326 dmsg_circuit_route(dmsg_msg_t
*msg
)
1328 dmsg_iocom_t
*iocom
= msg
->iocom
;
1329 dmsg_circuit_t
*circ
;
1330 dmsg_circuit_t
*peer
;
1331 dmsg_circuit_t dummy
;
1335 * Relay occurs before any state processing, msg state should always
1338 assert(msg
->state
== NULL
);
1341 * Lookup the circuit on the incoming iocom.
1343 pthread_mutex_lock(&iocom
->mtx
);
1345 dummy
.msgid
= msg
->any
.head
.circuit
;
1346 circ
= RB_FIND(dmsg_circuit_tree
, &iocom
->circuit_tree
, &dummy
);
1349 dmsg_circuit_hold(peer
);
1351 if (DMsgDebugOpt
>= 4) {
1353 "CIRC relay %08x %p->%p\n",
1354 msg
->any
.head
.cmd
, circ
, peer
);
1357 msg
->iocom
= peer
->iocom
;
1358 msg
->any
.head
.circuit
= peer
->msgid
;
1359 dmsg_circuit_drop_locked(msg
->circuit
);
1360 msg
->circuit
= peer
;
1362 pthread_mutex_unlock(&iocom
->mtx
);
1364 dmsg_msg_write(msg
);
1365 error
= DMSG_IOQ_ERROR_ROUTED
;
1370 /************************************************************************
1371 * ROUTER AND MESSAGING HANDLES *
1372 ************************************************************************
1374 * Basically the idea here is to provide a stable data structure which
1375 * can be localized to the caller for higher level protocols to work with.
1376 * Depends on the context, these dmsg_handle's can be pooled by use-case
1377 * and remain persistent through a client (or mount point's) life.
1382 * Obtain a stable handle on a cluster given its uuid. This ties directly
1383 * into the global cluster topology, creating the structure if necessary
1384 * (even if the uuid does not exist or does not exist yet), and preventing
1385 * the structure from getting ripped out from under us while we hold a
1389 dmsg_cluster_get(uuid_t
*pfs_clid
)
1391 h2span_cluster_t dummy_cls
;
1392 h2span_cluster_t
*cls
;
1394 dummy_cls
.pfs_clid
= *pfs_clid
;
1395 pthread_mutex_lock(&cluster_mtx
);
1396 cls
= RB_FIND(h2span_cluster_tree
, &cluster_tree
, &dummy_cls
);
1399 pthread_mutex_unlock(&cluster_mtx
);
1404 dmsg_cluster_put(h2span_cluster_t
*cls
)
1406 pthread_mutex_lock(&cluster_mtx
);
1407 assert(cls
->refs
> 0);
1409 if (RB_EMPTY(&cls
->tree
) && cls
->refs
== 0) {
1410 RB_REMOVE(h2span_cluster_tree
,
1411 &cluster_tree
, cls
);
1414 pthread_mutex_unlock(&cluster_mtx
);
1418 * Obtain a stable handle to a specific cluster node given its uuid.
1419 * This handle does NOT lock in the route to the node and is typically
1420 * used as part of the dmsg_handle_*() API to obtain a set of
1424 dmsg_node_get(h2span_cluster_t
*cls
, uuid_t
*pfs_fsid
)
1431 * Dumps the spanning tree
1436 dmsg_shell_tree(dmsg_circuit_t
*circuit
, char *cmdbuf __unused
)
1438 h2span_cluster_t
*cls
;
1439 h2span_node_t
*node
;
1440 h2span_link_t
*slink
;
1441 h2span_relay_t
*relay
;
1444 pthread_mutex_lock(&cluster_mtx
);
1445 RB_FOREACH(cls
, h2span_cluster_tree
, &cluster_tree
) {
1446 dmsg_circuit_printf(circuit
, "Cluster %s %s (%s)\n",
1447 dmsg_peer_type_to_str(cls
->peer_type
),
1448 dmsg_uuid_to_str(&cls
->pfs_clid
, &uustr
),
1450 RB_FOREACH(node
, h2span_node_tree
, &cls
->tree
) {
1451 dmsg_circuit_printf(circuit
, " Node %02x %s (%s)\n",
1453 dmsg_uuid_to_str(&node
->pfs_fsid
, &uustr
),
1455 RB_FOREACH(slink
, h2span_link_tree
, &node
->tree
) {
1456 dmsg_circuit_printf(circuit
,
1457 "\tSLink msgid %016jx "
1459 (intmax_t)slink
->state
->msgid
,
1461 slink
->state
->iocom
->sock_fd
);
1462 TAILQ_FOREACH(relay
, &slink
->relayq
, entry
) {
1463 dmsg_circuit_printf(circuit
,
1464 "\t Relay-out msgid %016jx "
1466 (intmax_t)relay
->target_rt
->msgid
,
1467 relay
->target_rt
->iocom
->sock_fd
);
1472 pthread_mutex_unlock(&cluster_mtx
);
1476 TAILQ_FOREACH(conn
, &connq
, entry
) {
1484 * Locate the state representing an incoming LNK_SPAN given its msgid.
1487 dmsg_debug_findspan(uint64_t msgid
, dmsg_state_t
**statep
)
1489 h2span_cluster_t
*cls
;
1490 h2span_node_t
*node
;
1491 h2span_link_t
*slink
;
1493 pthread_mutex_lock(&cluster_mtx
);
1494 RB_FOREACH(cls
, h2span_cluster_tree
, &cluster_tree
) {
1495 RB_FOREACH(node
, h2span_node_tree
, &cls
->tree
) {
1496 RB_FOREACH(slink
, h2span_link_tree
, &node
->tree
) {
1497 if (slink
->state
->msgid
== msgid
) {
1498 *statep
= slink
->state
;
1504 pthread_mutex_unlock(&cluster_mtx
);
1508 pthread_mutex_unlock(&cluster_mtx
);
1513 * Random number sub-sort value to add to SPAN rnss fields on relay.
1514 * This allows us to differentiate spans with the same <dist> field
1515 * for relaying purposes. We must normally limit the number of relays
1516 * for any given SPAN origination but we must also guarantee that a
1517 * symmetric reverse path exists, so we use the rnss field as a sub-sort
1518 * (since there can be thousands or millions if we only match on <dist>),
1519 * and if there STILL too many spans we go past the limit.
1525 if (DMsgRNSS
== 0) {
1526 pthread_mutex_lock(&cluster_mtx
);
1527 while (DMsgRNSS
== 0) {
1529 DMsgRNSS
= random();
1531 pthread_mutex_unlock(&cluster_mtx
);