2 * Copyright (c) 2011-2015 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
18 * 3. Neither the name of The DragonFly Project nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific, prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
36 #include "dmsg_local.h"
38 #define DMSG_BLOCK_DEBUG
42 #ifdef DMSG_BLOCK_DEBUG
46 static int dmsg_state_msgrx(dmsg_msg_t
*msg
, int mstate
);
47 static void dmsg_state_cleanuptx(dmsg_iocom_t
*iocom
, dmsg_msg_t
*msg
);
48 static void dmsg_msg_free_locked(dmsg_msg_t
*msg
);
49 static void dmsg_state_free(dmsg_state_t
*state
);
50 static void dmsg_subq_delete(dmsg_state_t
*state
);
51 static void dmsg_simulate_failure(dmsg_state_t
*state
, int meto
, int error
);
52 static void dmsg_state_abort(dmsg_state_t
*state
);
53 static void dmsg_state_dying(dmsg_state_t
*state
);
55 RB_GENERATE(dmsg_state_tree
, dmsg_state
, rbnode
, dmsg_state_cmp
);
58 * STATE TREE - Represents open transactions which are indexed by their
59 * { msgid } relative to the governing iocom.
62 dmsg_state_cmp(dmsg_state_t
*state1
, dmsg_state_t
*state2
)
64 if (state1
->msgid
< state2
->msgid
)
66 if (state1
->msgid
> state2
->msgid
)
72 * Initialize a low-level ioq
75 dmsg_ioq_init(dmsg_iocom_t
*iocom __unused
, dmsg_ioq_t
*ioq
)
77 bzero(ioq
, sizeof(*ioq
));
78 ioq
->state
= DMSG_MSGQ_STATE_HEADER1
;
79 TAILQ_INIT(&ioq
->msgq
);
85 * caller holds iocom->mtx.
88 dmsg_ioq_done(dmsg_iocom_t
*iocom __unused
, dmsg_ioq_t
*ioq
)
92 while ((msg
= TAILQ_FIRST(&ioq
->msgq
)) != NULL
) {
93 assert(0); /* shouldn't happen */
94 TAILQ_REMOVE(&ioq
->msgq
, msg
, qentry
);
97 if ((msg
= ioq
->msg
) != NULL
) {
104 * Initialize a low-level communications channel.
106 * NOTE: The signal_func() is called at least once from the loop and can be
107 * re-armed via dmsg_iocom_restate().
110 dmsg_iocom_init(dmsg_iocom_t
*iocom
, int sock_fd
, int alt_fd
,
111 void (*signal_func
)(dmsg_iocom_t
*iocom
),
112 void (*rcvmsg_func
)(dmsg_msg_t
*msg
),
113 void (*usrmsg_func
)(dmsg_msg_t
*msg
, int unmanaged
),
114 void (*altmsg_func
)(dmsg_iocom_t
*iocom
))
118 bzero(iocom
, sizeof(*iocom
));
120 asprintf(&iocom
->label
, "iocom-%p", iocom
);
121 iocom
->signal_callback
= signal_func
;
122 iocom
->rcvmsg_callback
= rcvmsg_func
;
123 iocom
->altmsg_callback
= altmsg_func
;
124 iocom
->usrmsg_callback
= usrmsg_func
;
126 pthread_mutex_init(&iocom
->mtx
, NULL
);
127 RB_INIT(&iocom
->staterd_tree
);
128 RB_INIT(&iocom
->statewr_tree
);
129 TAILQ_INIT(&iocom
->txmsgq
);
130 iocom
->sock_fd
= sock_fd
;
131 iocom
->alt_fd
= alt_fd
;
132 iocom
->flags
= DMSG_IOCOMF_RREQ
| DMSG_IOCOMF_CLOSEALT
;
134 iocom
->flags
|= DMSG_IOCOMF_SWORK
;
135 dmsg_ioq_init(iocom
, &iocom
->ioq_rx
);
136 dmsg_ioq_init(iocom
, &iocom
->ioq_tx
);
137 iocom
->state0
.refs
= 1; /* should never trigger a free */
138 iocom
->state0
.iocom
= iocom
;
139 iocom
->state0
.parent
= &iocom
->state0
;
140 iocom
->state0
.flags
= DMSG_STATE_ROOT
;
141 TAILQ_INIT(&iocom
->state0
.subq
);
143 if (pipe(iocom
->wakeupfds
) < 0)
145 fcntl(iocom
->wakeupfds
[0], F_SETFL
, O_NONBLOCK
);
146 fcntl(iocom
->wakeupfds
[1], F_SETFL
, O_NONBLOCK
);
149 * Negotiate session crypto synchronously. This will mark the
150 * connection as error'd if it fails. If this is a pipe it's
151 * a linkage that we set up ourselves to the filesystem and there
154 if (fstat(sock_fd
, &st
) < 0)
156 if (S_ISSOCK(st
.st_mode
))
157 dmsg_crypto_negotiate(iocom
);
160 * Make sure our fds are set to non-blocking for the iocom core.
163 fcntl(sock_fd
, F_SETFL
, O_NONBLOCK
);
165 /* if line buffered our single fgets() should be fine */
167 fcntl(alt_fd
, F_SETFL
, O_NONBLOCK
);
172 dmsg_iocom_label(dmsg_iocom_t
*iocom
, const char *ctl
, ...)
179 vasprintf(&iocom
->label
, ctl
, va
);
186 * May only be called from a callback from iocom_core.
188 * Adjust state machine functions, set flags to guarantee that both
189 * the recevmsg_func and the sendmsg_func is called at least once.
192 dmsg_iocom_restate(dmsg_iocom_t
*iocom
,
193 void (*signal_func
)(dmsg_iocom_t
*),
194 void (*rcvmsg_func
)(dmsg_msg_t
*msg
))
196 pthread_mutex_lock(&iocom
->mtx
);
197 iocom
->signal_callback
= signal_func
;
198 iocom
->rcvmsg_callback
= rcvmsg_func
;
200 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_SWORK
);
202 atomic_clear_int(&iocom
->flags
, DMSG_IOCOMF_SWORK
);
203 pthread_mutex_unlock(&iocom
->mtx
);
207 dmsg_iocom_signal(dmsg_iocom_t
*iocom
)
209 pthread_mutex_lock(&iocom
->mtx
);
210 if (iocom
->signal_callback
)
211 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_SWORK
);
212 pthread_mutex_unlock(&iocom
->mtx
);
216 * Cleanup a terminating iocom.
218 * Caller should not hold iocom->mtx. The iocom has already been disconnected
219 * from all possible references to it.
222 dmsg_iocom_done(dmsg_iocom_t
*iocom
)
224 if (iocom
->sock_fd
>= 0) {
225 close(iocom
->sock_fd
);
228 if (iocom
->alt_fd
>= 0 && (iocom
->flags
& DMSG_IOCOMF_CLOSEALT
)) {
229 close(iocom
->alt_fd
);
232 dmsg_ioq_done(iocom
, &iocom
->ioq_rx
);
233 dmsg_ioq_done(iocom
, &iocom
->ioq_tx
);
234 if (iocom
->wakeupfds
[0] >= 0) {
235 close(iocom
->wakeupfds
[0]);
236 iocom
->wakeupfds
[0] = -1;
238 if (iocom
->wakeupfds
[1] >= 0) {
239 close(iocom
->wakeupfds
[1]);
240 iocom
->wakeupfds
[1] = -1;
242 pthread_mutex_destroy(&iocom
->mtx
);
246 * Allocate a new message using the specified transaction state.
248 * If CREATE is set a new transaction is allocated relative to the passed-in
249 * transaction (the 'state' argument becomes pstate).
251 * If CREATE is not set the message is associated with the passed-in
255 dmsg_msg_alloc(dmsg_state_t
*state
,
256 size_t aux_size
, uint32_t cmd
,
257 void (*func
)(dmsg_msg_t
*), void *data
)
259 dmsg_iocom_t
*iocom
= state
->iocom
;
262 pthread_mutex_lock(&iocom
->mtx
);
263 msg
= dmsg_msg_alloc_locked(state
, aux_size
, cmd
, func
, data
);
264 pthread_mutex_unlock(&iocom
->mtx
);
270 dmsg_msg_alloc_locked(dmsg_state_t
*state
,
271 size_t aux_size
, uint32_t cmd
,
272 void (*func
)(dmsg_msg_t
*), void *data
)
274 dmsg_iocom_t
*iocom
= state
->iocom
;
275 dmsg_state_t
*pstate
;
280 aligned_size
= DMSG_DOALIGN(aux_size
);
281 if ((cmd
& (DMSGF_CREATE
| DMSGF_REPLY
)) == DMSGF_CREATE
) {
283 * When CREATE is set without REPLY the caller is
284 * initiating a new transaction stacked under the specified
287 * It is possible to race a circuit failure, inherit the
288 * parent's STATE_DYING flag to trigger an abort sequence
289 * in the transmit path. By not inheriting ABORTING the
290 * abort sequence can recurse.
292 * NOTE: CREATE in txcmd handled by dmsg_msg_write()
293 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
296 state
= malloc(sizeof(*state
));
297 bzero(state
, sizeof(*state
));
298 atomic_add_int(&dmsg_state_count
, 1);
300 TAILQ_INIT(&state
->subq
);
301 state
->parent
= pstate
;
302 state
->iocom
= iocom
;
303 state
->flags
= DMSG_STATE_DYNAMIC
;
304 state
->msgid
= (uint64_t)(uintptr_t)state
;
305 state
->txcmd
= cmd
& ~(DMSGF_CREATE
| DMSGF_DELETE
);
306 state
->rxcmd
= DMSGF_REPLY
;
307 state
->icmd
= state
->txcmd
& DMSGF_BASECMDMASK
;
309 state
->any
.any
= data
;
311 state
->flags
|= DMSG_STATE_SUBINSERTED
|
312 DMSG_STATE_RBINSERTED
;
313 state
->flags
|= pstate
->flags
& DMSG_STATE_DYING
;
314 if (TAILQ_EMPTY(&pstate
->subq
))
315 dmsg_state_hold(pstate
);
316 RB_INSERT(dmsg_state_tree
, &iocom
->statewr_tree
, state
);
317 TAILQ_INSERT_TAIL(&pstate
->subq
, state
, entry
);
318 dmsg_state_hold(state
); /* state on pstate->subq */
319 dmsg_state_hold(state
); /* state on rbtree */
320 dmsg_state_hold(state
); /* msg->state */
323 * Otherwise the message is transmitted over the existing
326 pstate
= state
->parent
;
327 dmsg_state_hold(state
); /* msg->state */
330 /* XXX SMP race for state */
331 hbytes
= (cmd
& DMSGF_SIZE
) * DMSG_ALIGN
;
332 assert((size_t)hbytes
>= sizeof(struct dmsg_hdr
));
333 msg
= malloc(offsetof(struct dmsg_msg
, any
.head
) + hbytes
);
334 bzero(msg
, offsetof(struct dmsg_msg
, any
.head
));
337 * [re]allocate the auxillary data buffer. The caller knows that
338 * a size-aligned buffer will be allocated but we do not want to
339 * force the caller to zero any tail piece, so we do that ourself.
341 if (msg
->aux_size
!= aux_size
) {
344 msg
->aux_data
= NULL
;
348 msg
->aux_data
= malloc(aligned_size
);
349 msg
->aux_size
= aux_size
;
350 if (aux_size
!= aligned_size
) {
351 bzero(msg
->aux_data
+ aux_size
,
352 aligned_size
- aux_size
);
358 * Set REVTRANS if the transaction was remotely initiated
359 * Set REVCIRC if the circuit was remotely initiated
361 if (state
->flags
& DMSG_STATE_OPPOSITE
)
362 cmd
|= DMSGF_REVTRANS
;
363 if (pstate
->flags
& DMSG_STATE_OPPOSITE
)
364 cmd
|= DMSGF_REVCIRC
;
367 * Finish filling out the header.
369 bzero(&msg
->any
.head
, hbytes
);
370 msg
->hdr_size
= hbytes
;
371 msg
->any
.head
.magic
= DMSG_HDR_MAGIC
;
372 msg
->any
.head
.cmd
= cmd
;
373 msg
->any
.head
.aux_descr
= 0;
374 msg
->any
.head
.aux_crc
= 0;
375 msg
->any
.head
.msgid
= state
->msgid
;
376 msg
->any
.head
.circuit
= pstate
->msgid
;
383 * Free a message so it can be reused afresh.
385 * NOTE: aux_size can be 0 with a non-NULL aux_data.
389 dmsg_msg_free_locked(dmsg_msg_t
*msg
)
393 if ((state
= msg
->state
) != NULL
) {
394 dmsg_state_drop(state
);
395 msg
->state
= NULL
; /* safety */
399 msg
->aux_data
= NULL
; /* safety */
406 dmsg_msg_free(dmsg_msg_t
*msg
)
408 dmsg_iocom_t
*iocom
= msg
->state
->iocom
;
410 pthread_mutex_lock(&iocom
->mtx
);
411 dmsg_msg_free_locked(msg
);
412 pthread_mutex_unlock(&iocom
->mtx
);
416 * I/O core loop for an iocom.
418 * Thread localized, iocom->mtx not held.
421 dmsg_iocom_core(dmsg_iocom_t
*iocom
)
423 struct pollfd fds
[3];
428 int wi
; /* wakeup pipe */
430 int ai
; /* alt bulk path socket */
432 while ((iocom
->flags
& DMSG_IOCOMF_EOF
) == 0) {
434 * These iocom->flags are only manipulated within the
435 * context of the current thread. However, modifications
436 * still require atomic ops.
439 fprintf(stderr
, "iocom %p %08x\n", iocom
, iocom
->flags
);
441 if ((iocom
->flags
& (DMSG_IOCOMF_RWORK
|
446 DMSG_IOCOMF_AWWORK
)) == 0) {
448 * Only poll if no immediate work is pending.
449 * Otherwise we are just wasting our time calling
460 * Always check the inter-thread pipe, e.g.
461 * for iocom->txmsgq work.
464 fds
[wi
].fd
= iocom
->wakeupfds
[0];
465 fds
[wi
].events
= POLLIN
;
469 * Check the socket input/output direction as
472 if (iocom
->flags
& (DMSG_IOCOMF_RREQ
|
475 fds
[si
].fd
= iocom
->sock_fd
;
479 if (iocom
->flags
& DMSG_IOCOMF_RREQ
)
480 fds
[si
].events
|= POLLIN
;
481 if (iocom
->flags
& DMSG_IOCOMF_WREQ
)
482 fds
[si
].events
|= POLLOUT
;
486 * Check the alternative fd for work.
488 if (iocom
->alt_fd
>= 0) {
490 fds
[ai
].fd
= iocom
->alt_fd
;
491 fds
[ai
].events
= POLLIN
;
494 poll(fds
, count
, timeout
);
496 if (wi
>= 0 && (fds
[wi
].revents
& POLLIN
))
497 atomic_set_int(&iocom
->flags
,
499 if (si
>= 0 && (fds
[si
].revents
& POLLIN
))
500 atomic_set_int(&iocom
->flags
,
502 if (si
>= 0 && (fds
[si
].revents
& POLLOUT
))
503 atomic_set_int(&iocom
->flags
,
505 if (wi
>= 0 && (fds
[wi
].revents
& POLLOUT
))
506 atomic_set_int(&iocom
->flags
,
508 if (ai
>= 0 && (fds
[ai
].revents
& POLLIN
))
509 atomic_set_int(&iocom
->flags
,
513 * Always check the pipe
515 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_PWORK
);
518 if (iocom
->flags
& DMSG_IOCOMF_SWORK
) {
519 atomic_clear_int(&iocom
->flags
, DMSG_IOCOMF_SWORK
);
520 iocom
->signal_callback(iocom
);
524 * Pending message queues from other threads wake us up
525 * with a write to the wakeupfds[] pipe. We have to clear
526 * the pipe with a dummy read.
528 if (iocom
->flags
& DMSG_IOCOMF_PWORK
) {
529 atomic_clear_int(&iocom
->flags
, DMSG_IOCOMF_PWORK
);
530 read(iocom
->wakeupfds
[0], dummybuf
, sizeof(dummybuf
));
531 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_RWORK
);
532 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_WWORK
);
536 * Message write sequencing
538 if (iocom
->flags
& DMSG_IOCOMF_WWORK
)
539 dmsg_iocom_flush1(iocom
);
542 * Message read sequencing. Run this after the write
543 * sequencing in case the write sequencing allowed another
544 * auto-DELETE to occur on the read side.
546 if (iocom
->flags
& DMSG_IOCOMF_RWORK
) {
547 while ((iocom
->flags
& DMSG_IOCOMF_EOF
) == 0 &&
548 (msg
= dmsg_ioq_read(iocom
)) != NULL
) {
550 fprintf(stderr
, "receive %s\n",
553 iocom
->rcvmsg_callback(msg
);
554 pthread_mutex_lock(&iocom
->mtx
);
555 dmsg_state_cleanuprx(iocom
, msg
);
556 pthread_mutex_unlock(&iocom
->mtx
);
560 if (iocom
->flags
& DMSG_IOCOMF_ARWORK
) {
561 atomic_clear_int(&iocom
->flags
, DMSG_IOCOMF_ARWORK
);
562 iocom
->altmsg_callback(iocom
);
568 * Make sure there's enough room in the FIFO to hold the
571 * Assume worst case encrypted form is 2x the size of the
572 * plaintext equivalent.
576 dmsg_ioq_makeroom(dmsg_ioq_t
*ioq
, size_t needed
)
581 bytes
= ioq
->fifo_cdx
- ioq
->fifo_beg
;
582 nmax
= sizeof(ioq
->buf
) - ioq
->fifo_end
;
583 if (bytes
+ nmax
/ 2 < needed
) {
585 bcopy(ioq
->buf
+ ioq
->fifo_beg
,
589 ioq
->fifo_cdx
-= ioq
->fifo_beg
;
591 if (ioq
->fifo_cdn
< ioq
->fifo_end
) {
592 bcopy(ioq
->buf
+ ioq
->fifo_cdn
,
593 ioq
->buf
+ ioq
->fifo_cdx
,
594 ioq
->fifo_end
- ioq
->fifo_cdn
);
596 ioq
->fifo_end
-= ioq
->fifo_cdn
- ioq
->fifo_cdx
;
597 ioq
->fifo_cdn
= ioq
->fifo_cdx
;
598 nmax
= sizeof(ioq
->buf
) - ioq
->fifo_end
;
604 * Read the next ready message from the ioq, issuing I/O if needed.
605 * Caller should retry on a read-event when NULL is returned.
607 * If an error occurs during reception a DMSG_LNK_ERROR msg will
608 * be returned for each open transaction, then the ioq and iocom
609 * will be errored out and a non-transactional DMSG_LNK_ERROR
610 * msg will be returned as the final message. The caller should not call
611 * us again after the final message is returned.
613 * Thread localized, iocom->mtx not held.
616 dmsg_ioq_read(dmsg_iocom_t
*iocom
)
618 dmsg_ioq_t
*ioq
= &iocom
->ioq_rx
;
630 * If a message is already pending we can just remove and
631 * return it. Message state has already been processed.
632 * (currently not implemented)
634 if ((msg
= TAILQ_FIRST(&ioq
->msgq
)) != NULL
) {
635 TAILQ_REMOVE(&ioq
->msgq
, msg
, qentry
);
637 if (msg
->state
== &iocom
->state0
) {
638 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_EOF
);
639 fprintf(stderr
, "EOF ON SOCKET %d\n", iocom
->sock_fd
);
643 atomic_clear_int(&iocom
->flags
, DMSG_IOCOMF_RREQ
| DMSG_IOCOMF_RWORK
);
646 * If the stream is errored out we stop processing it.
652 * Message read in-progress (msg is NULL at the moment). We don't
653 * allocate a msg until we have its core header.
655 nmax
= sizeof(ioq
->buf
) - ioq
->fifo_end
;
656 bytes
= ioq
->fifo_cdx
- ioq
->fifo_beg
; /* already decrypted */
660 case DMSG_MSGQ_STATE_HEADER1
:
662 * Load the primary header, fail on any non-trivial read
663 * error or on EOF. Since the primary header is the same
664 * size is the message alignment it will never straddle
665 * the end of the buffer.
667 nmax
= dmsg_ioq_makeroom(ioq
, sizeof(msg
->any
.head
));
668 if (bytes
< sizeof(msg
->any
.head
)) {
669 n
= read(iocom
->sock_fd
,
670 ioq
->buf
+ ioq
->fifo_end
,
674 ioq
->error
= DMSG_IOQ_ERROR_EOF
;
677 if (errno
!= EINTR
&&
678 errno
!= EINPROGRESS
&&
680 ioq
->error
= DMSG_IOQ_ERROR_SOCK
;
686 ioq
->fifo_end
+= (size_t)n
;
691 * Decrypt data received so far. Data will be decrypted
692 * in-place but might create gaps in the FIFO. Partial
693 * blocks are not immediately decrypted.
695 * WARNING! The header might be in the wrong endian, we
696 * do not fix it up until we get the entire
699 if (iocom
->flags
& DMSG_IOCOMF_CRYPTED
) {
700 dmsg_crypto_decrypt(iocom
, ioq
);
702 ioq
->fifo_cdx
= ioq
->fifo_end
;
703 ioq
->fifo_cdn
= ioq
->fifo_end
;
705 bytes
= ioq
->fifo_cdx
- ioq
->fifo_beg
;
708 * Insufficient data accumulated (msg is NULL, caller will
712 if (bytes
< sizeof(msg
->any
.head
))
716 * Check and fixup the core header. Note that the icrc
717 * has to be calculated before any fixups, but the crc
718 * fields in the msg may have to be swapped like everything
721 head
= (void *)(ioq
->buf
+ ioq
->fifo_beg
);
722 if (head
->magic
!= DMSG_HDR_MAGIC
&&
723 head
->magic
!= DMSG_HDR_MAGIC_REV
) {
724 fprintf(stderr
, "%s: head->magic is bad %02x\n",
725 iocom
->label
, head
->magic
);
726 if (iocom
->flags
& DMSG_IOCOMF_CRYPTED
)
727 fprintf(stderr
, "(on encrypted link)\n");
728 ioq
->error
= DMSG_IOQ_ERROR_SYNC
;
733 * Calculate the full header size and aux data size
735 if (head
->magic
== DMSG_HDR_MAGIC_REV
) {
736 ioq
->hbytes
= (bswap32(head
->cmd
) & DMSGF_SIZE
) *
738 aux_size
= bswap32(head
->aux_bytes
);
740 ioq
->hbytes
= (head
->cmd
& DMSGF_SIZE
) *
742 aux_size
= head
->aux_bytes
;
744 ioq
->abytes
= DMSG_DOALIGN(aux_size
);
745 ioq
->unaligned_aux_size
= aux_size
;
746 if (ioq
->hbytes
< sizeof(msg
->any
.head
) ||
747 ioq
->hbytes
> sizeof(msg
->any
) ||
748 ioq
->abytes
> DMSG_AUX_MAX
) {
749 ioq
->error
= DMSG_IOQ_ERROR_FIELD
;
754 * Allocate the message, the next state will fill it in.
756 * NOTE: The aux_data buffer will be sized to an aligned
757 * value and the aligned remainder zero'd for
760 * NOTE: Supply dummy state and a degenerate cmd without
761 * CREATE set. The message will temporarily be
762 * associated with state0 until later post-processing.
764 msg
= dmsg_msg_alloc(&iocom
->state0
, aux_size
,
765 ioq
->hbytes
/ DMSG_ALIGN
,
770 * Fall through to the next state. Make sure that the
771 * extended header does not straddle the end of the buffer.
772 * We still want to issue larger reads into our buffer,
773 * book-keeping is easier if we don't bcopy() yet.
775 * Make sure there is enough room for bloated encrypt data.
777 nmax
= dmsg_ioq_makeroom(ioq
, ioq
->hbytes
);
778 ioq
->state
= DMSG_MSGQ_STATE_HEADER2
;
780 case DMSG_MSGQ_STATE_HEADER2
:
782 * Fill out the extended header.
785 if (bytes
< ioq
->hbytes
) {
787 n
= read(iocom
->sock_fd
,
788 ioq
->buf
+ ioq
->fifo_end
,
792 ioq
->error
= DMSG_IOQ_ERROR_EOF
;
795 if (errno
!= EINTR
&&
796 errno
!= EINPROGRESS
&&
798 ioq
->error
= DMSG_IOQ_ERROR_SOCK
;
804 ioq
->fifo_end
+= (size_t)n
;
808 if (iocom
->flags
& DMSG_IOCOMF_CRYPTED
) {
809 dmsg_crypto_decrypt(iocom
, ioq
);
811 ioq
->fifo_cdx
= ioq
->fifo_end
;
812 ioq
->fifo_cdn
= ioq
->fifo_end
;
814 bytes
= ioq
->fifo_cdx
- ioq
->fifo_beg
;
817 * Insufficient data accumulated (set msg NULL so caller will
820 if (bytes
< ioq
->hbytes
) {
826 * Calculate the extended header, decrypt data received
827 * so far. Handle endian-conversion for the entire extended
830 head
= (void *)(ioq
->buf
+ ioq
->fifo_beg
);
835 if (head
->magic
== DMSG_HDR_MAGIC_REV
)
836 xcrc32
= bswap32(head
->hdr_crc
);
838 xcrc32
= head
->hdr_crc
;
840 if (dmsg_icrc32(head
, ioq
->hbytes
) != xcrc32
) {
841 ioq
->error
= DMSG_IOQ_ERROR_XCRC
;
842 fprintf(stderr
, "BAD-XCRC(%08x,%08x) %s\n",
843 xcrc32
, dmsg_icrc32(head
, ioq
->hbytes
),
848 head
->hdr_crc
= xcrc32
;
850 if (head
->magic
== DMSG_HDR_MAGIC_REV
) {
851 dmsg_bswap_head(head
);
855 * Copy the extended header into the msg and adjust the
858 bcopy(head
, &msg
->any
, ioq
->hbytes
);
861 * We are either done or we fall-through.
863 if (ioq
->abytes
== 0) {
864 ioq
->fifo_beg
+= ioq
->hbytes
;
869 * Must adjust bytes (and the state) when falling through.
870 * nmax doesn't change.
872 ioq
->fifo_beg
+= ioq
->hbytes
;
873 bytes
-= ioq
->hbytes
;
874 ioq
->state
= DMSG_MSGQ_STATE_AUXDATA1
;
876 case DMSG_MSGQ_STATE_AUXDATA1
:
878 * Copy the partial or complete [decrypted] payload from
879 * remaining bytes in the FIFO in order to optimize the
880 * makeroom call in the AUXDATA2 state. We have to
881 * fall-through either way so we can check the crc.
883 * msg->aux_size tracks our aux data.
885 * (Lets not complicate matters if the data is encrypted,
886 * since the data in-stream is not the same size as the
889 if (bytes
>= ioq
->abytes
) {
890 bcopy(ioq
->buf
+ ioq
->fifo_beg
, msg
->aux_data
,
892 msg
->aux_size
= ioq
->abytes
;
893 ioq
->fifo_beg
+= ioq
->abytes
;
894 assert(ioq
->fifo_beg
<= ioq
->fifo_cdx
);
895 assert(ioq
->fifo_cdx
<= ioq
->fifo_cdn
);
896 bytes
-= ioq
->abytes
;
898 bcopy(ioq
->buf
+ ioq
->fifo_beg
, msg
->aux_data
,
900 msg
->aux_size
= bytes
;
901 ioq
->fifo_beg
+= bytes
;
902 if (ioq
->fifo_cdx
< ioq
->fifo_beg
)
903 ioq
->fifo_cdx
= ioq
->fifo_beg
;
904 assert(ioq
->fifo_beg
<= ioq
->fifo_cdx
);
905 assert(ioq
->fifo_cdx
<= ioq
->fifo_cdn
);
910 ioq
->state
= DMSG_MSGQ_STATE_AUXDATA2
;
912 case DMSG_MSGQ_STATE_AUXDATA2
:
914 * Make sure there is enough room for more data.
917 nmax
= dmsg_ioq_makeroom(ioq
, ioq
->abytes
- msg
->aux_size
);
920 * Read and decrypt more of the payload.
922 if (msg
->aux_size
< ioq
->abytes
) {
925 n
= read(iocom
->sock_fd
,
926 ioq
->buf
+ ioq
->fifo_end
,
930 ioq
->error
= DMSG_IOQ_ERROR_EOF
;
933 if (errno
!= EINTR
&&
934 errno
!= EINPROGRESS
&&
936 ioq
->error
= DMSG_IOQ_ERROR_SOCK
;
942 ioq
->fifo_end
+= (size_t)n
;
946 if (iocom
->flags
& DMSG_IOCOMF_CRYPTED
) {
947 dmsg_crypto_decrypt(iocom
, ioq
);
949 ioq
->fifo_cdx
= ioq
->fifo_end
;
950 ioq
->fifo_cdn
= ioq
->fifo_end
;
952 bytes
= ioq
->fifo_cdx
- ioq
->fifo_beg
;
954 if (bytes
> ioq
->abytes
- msg
->aux_size
)
955 bytes
= ioq
->abytes
- msg
->aux_size
;
958 bcopy(ioq
->buf
+ ioq
->fifo_beg
,
959 msg
->aux_data
+ msg
->aux_size
,
961 msg
->aux_size
+= bytes
;
962 ioq
->fifo_beg
+= bytes
;
966 * Insufficient data accumulated (set msg NULL so caller will
969 * Assert the auxillary data size is correct, then record the
970 * original unaligned size from the message header.
972 if (msg
->aux_size
< ioq
->abytes
) {
976 assert(msg
->aux_size
== ioq
->abytes
);
977 msg
->aux_size
= ioq
->unaligned_aux_size
;
980 * Check aux_crc, then we are done. Note that the crc
981 * is calculated over the aligned size, not the actual
984 xcrc32
= dmsg_icrc32(msg
->aux_data
, ioq
->abytes
);
985 if (xcrc32
!= msg
->any
.head
.aux_crc
) {
986 ioq
->error
= DMSG_IOQ_ERROR_ACRC
;
988 "iocom: ACRC error %08x vs %08x "
989 "msgid %016jx msgcmd %08x auxsize %d\n",
991 msg
->any
.head
.aux_crc
,
992 (intmax_t)msg
->any
.head
.msgid
,
994 msg
->any
.head
.aux_bytes
);
998 case DMSG_MSGQ_STATE_ERROR
:
1000 * Continued calls to drain recorded transactions (returning
1001 * a LNK_ERROR for each one), before we return the final
1004 assert(msg
== NULL
);
1008 * We don't double-return errors, the caller should not
1009 * have called us again after getting an error msg.
1016 * Check the message sequence. The iv[] should prevent any
1017 * possibility of a replay but we add this check anyway.
1019 if (msg
&& ioq
->error
== 0) {
1020 if ((msg
->any
.head
.salt
& 255) != (ioq
->seq
& 255)) {
1021 ioq
->error
= DMSG_IOQ_ERROR_MSGSEQ
;
1028 * Handle error, RREQ, or completion
1030 * NOTE: nmax and bytes are invalid at this point, we don't bother
1031 * to update them when breaking out.
1036 * An unrecoverable error causes all active receive
1037 * transactions to be terminated with a LNK_ERROR message.
1039 * Once all active transactions are exhausted we set the
1040 * iocom ERROR flag and return a non-transactional LNK_ERROR
1041 * message, which should cause master processing loops to
1044 fprintf(stderr
, "IOQ ERROR %d\n", ioq
->error
);
1045 assert(ioq
->msg
== msg
);
1053 * No more I/O read processing
1055 ioq
->state
= DMSG_MSGQ_STATE_ERROR
;
1058 * Simulate a remote LNK_ERROR DELETE msg for any open
1059 * transactions, ending with a final non-transactional
1060 * LNK_ERROR (that the session can detect) when no
1061 * transactions remain.
1063 * NOTE: Temporarily supply state0 and a degenerate cmd
1064 * without CREATE set. The real state will be
1065 * assigned in the loop.
1067 * NOTE: We are simulating a received message using our
1068 * side of the state, so the DMSGF_REV* bits have
1071 pthread_mutex_lock(&iocom
->mtx
);
1072 dmsg_iocom_drain(iocom
);
1073 dmsg_simulate_failure(&iocom
->state0
, 0, ioq
->error
);
1074 pthread_mutex_unlock(&iocom
->mtx
);
1075 if (TAILQ_FIRST(&ioq
->msgq
))
1080 * For the iocom error case we want to set RWORK to indicate
1081 * that more messages might be pending.
1083 * It is possible to return NULL when there is more work to
1084 * do because each message has to be DELETEd in both
1085 * directions before we continue on with the next (though
1086 * this could be optimized). The transmit direction will
1090 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_RWORK
);
1092 } else if (msg
== NULL
) {
1094 * Insufficient data received to finish building the message,
1095 * set RREQ and return NULL.
1097 * Leave ioq->msg intact.
1098 * Leave the FIFO intact.
1100 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_RREQ
);
1103 * Continue processing msg.
1105 * The fifo has already been advanced past the message.
1106 * Trivially reset the FIFO indices if possible.
1108 * clear the FIFO if it is now empty and set RREQ to wait
1109 * for more from the socket. If the FIFO is not empty set
1110 * TWORK to bypass the poll so we loop immediately.
1112 if (ioq
->fifo_beg
== ioq
->fifo_cdx
&&
1113 ioq
->fifo_cdn
== ioq
->fifo_end
) {
1114 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_RREQ
);
1120 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_RWORK
);
1122 ioq
->state
= DMSG_MSGQ_STATE_HEADER1
;
1126 * Handle message routing. Validates non-zero sources
1127 * and routes message. Error will be 0 if the message is
1130 * State processing only occurs for messages destined for us.
1132 if (DMsgDebugOpt
>= 5) {
1134 "rxmsg cmd=%08x circ=%016jx\n",
1136 (intmax_t)msg
->any
.head
.circuit
);
1139 error
= dmsg_state_msgrx(msg
, 0);
1143 * Abort-after-closure, throw message away and
1144 * start reading another.
1146 if (error
== DMSG_IOQ_ERROR_EALREADY
) {
1152 * Process real error and throw away message.
1159 * No error and not routed
1161 /* no error, not routed. Fall through and return msg */
1167 * Calculate the header and data crc's and write a low-level message to
1168 * the connection. If aux_crc is non-zero the aux_data crc is already
1169 * assumed to have been set.
1171 * A non-NULL msg is added to the queue but not necessarily flushed.
1172 * Calling this function with msg == NULL will get a flush going.
1174 * (called from iocom_core only)
1177 dmsg_iocom_flush1(dmsg_iocom_t
*iocom
)
1179 dmsg_ioq_t
*ioq
= &iocom
->ioq_tx
;
1184 dmsg_msg_queue_t tmpq
;
1186 atomic_clear_int(&iocom
->flags
, DMSG_IOCOMF_WREQ
| DMSG_IOCOMF_WWORK
);
1188 pthread_mutex_lock(&iocom
->mtx
);
1189 while ((msg
= TAILQ_FIRST(&iocom
->txmsgq
)) != NULL
) {
1190 TAILQ_REMOVE(&iocom
->txmsgq
, msg
, qentry
);
1191 TAILQ_INSERT_TAIL(&tmpq
, msg
, qentry
);
1193 pthread_mutex_unlock(&iocom
->mtx
);
1196 * Flush queue, doing all required encryption and CRC generation,
1197 * with the mutex unlocked.
1199 while ((msg
= TAILQ_FIRST(&tmpq
)) != NULL
) {
1201 * Process terminal connection errors.
1203 TAILQ_REMOVE(&tmpq
, msg
, qentry
);
1205 TAILQ_INSERT_TAIL(&ioq
->msgq
, msg
, qentry
);
1211 * Finish populating the msg fields. The salt ensures that
1212 * the iv[] array is ridiculously randomized and we also
1213 * re-seed our PRNG every 32768 messages just to be sure.
1215 msg
->any
.head
.magic
= DMSG_HDR_MAGIC
;
1216 msg
->any
.head
.salt
= (random() << 8) | (ioq
->seq
& 255);
1218 if ((ioq
->seq
& 32767) == 0) {
1219 pthread_mutex_lock(&iocom
->mtx
);
1221 pthread_mutex_unlock(&iocom
->mtx
);
1225 * Calculate aux_crc if 0, then calculate hdr_crc.
1227 if (msg
->aux_size
&& msg
->any
.head
.aux_crc
== 0) {
1228 abytes
= DMSG_DOALIGN(msg
->aux_size
);
1229 xcrc32
= dmsg_icrc32(msg
->aux_data
, abytes
);
1230 msg
->any
.head
.aux_crc
= xcrc32
;
1232 msg
->any
.head
.aux_bytes
= msg
->aux_size
;
1234 hbytes
= (msg
->any
.head
.cmd
& DMSGF_SIZE
) *
1236 msg
->any
.head
.hdr_crc
= 0;
1237 msg
->any
.head
.hdr_crc
= dmsg_icrc32(&msg
->any
.head
, hbytes
);
1240 * Enqueue the message (the flush codes handles stream
1243 TAILQ_INSERT_TAIL(&ioq
->msgq
, msg
, qentry
);
1246 dmsg_iocom_flush2(iocom
);
1250 * Thread localized, iocom->mtx not held by caller.
1252 * (called from iocom_core via iocom_flush1 only)
1255 dmsg_iocom_flush2(dmsg_iocom_t
*iocom
)
1257 dmsg_ioq_t
*ioq
= &iocom
->ioq_tx
;
1260 struct iovec iov
[DMSG_IOQ_MAXIOVEC
];
1270 dmsg_iocom_drain(iocom
);
1275 * Pump messages out the connection by building an iovec.
1277 * ioq->hbytes/ioq->abytes tracks how much of the first message
1278 * in the queue has been successfully written out, so we can
1286 TAILQ_FOREACH(msg
, &ioq
->msgq
, qentry
) {
1287 hbytes
= (msg
->any
.head
.cmd
& DMSGF_SIZE
) *
1289 abytes
= DMSG_DOALIGN(msg
->aux_size
);
1290 assert(hoff
<= hbytes
&& aoff
<= abytes
);
1292 if (hoff
< hbytes
) {
1293 size_t maxlen
= hbytes
- hoff
;
1294 if (maxlen
> sizeof(ioq
->buf
) / 2)
1295 maxlen
= sizeof(ioq
->buf
) / 2;
1296 iov
[iovcnt
].iov_base
= (char *)&msg
->any
.head
+ hoff
;
1297 iov
[iovcnt
].iov_len
= maxlen
;
1300 if (iovcnt
== DMSG_IOQ_MAXIOVEC
||
1301 maxlen
!= hbytes
- hoff
) {
1305 if (aoff
< abytes
) {
1306 size_t maxlen
= abytes
- aoff
;
1307 if (maxlen
> sizeof(ioq
->buf
) / 2)
1308 maxlen
= sizeof(ioq
->buf
) / 2;
1310 assert(msg
->aux_data
!= NULL
);
1311 iov
[iovcnt
].iov_base
= (char *)msg
->aux_data
+ aoff
;
1312 iov
[iovcnt
].iov_len
= maxlen
;
1315 if (iovcnt
== DMSG_IOQ_MAXIOVEC
||
1316 maxlen
!= abytes
- aoff
) {
1325 * Shortcut if no work to do. Be sure to check for old work still
1326 * pending in the FIFO.
1328 if (iovcnt
== 0 && ioq
->fifo_beg
== ioq
->fifo_cdx
)
1332 * Encrypt and write the data. The crypto code will move the
1333 * data into the fifo and adjust the iov as necessary. If
1334 * encryption is disabled the iov is left alone.
1336 * May return a smaller iov (thus a smaller n), with aggregated
1337 * chunks. May reduce nmax to what fits in the FIFO.
1339 * This function sets nact to the number of original bytes now
1340 * encrypted, adding to the FIFO some number of bytes that might
1341 * be greater depending on the crypto mechanic. iov[] is adjusted
1342 * to point at the FIFO if necessary.
1344 * NOTE: nact is the number of bytes eaten from the message. For
1345 * encrypted data this is the number of bytes processed for
1346 * encryption and not necessarily the number of bytes writable.
1347 * The return value from the writev() is the post-encrypted
1348 * byte count which might be larger.
1350 * NOTE: For direct writes, nact is the return value from the writev().
1352 if (iocom
->flags
& DMSG_IOCOMF_CRYPTED
) {
1354 * Make sure the FIFO has a reasonable amount of space
1355 * left (if not completely full).
1357 * In this situation we are staging the encrypted message
1358 * data in the FIFO. (nact) represents how much plaintext
1359 * has been staged, (n) represents how much encrypted data
1360 * has been flushed. The two are independent of each other.
1362 if (ioq
->fifo_beg
> sizeof(ioq
->buf
) / 2 &&
1363 sizeof(ioq
->buf
) - ioq
->fifo_end
< DMSG_ALIGN
* 2) {
1364 bcopy(ioq
->buf
+ ioq
->fifo_beg
, ioq
->buf
,
1365 ioq
->fifo_end
- ioq
->fifo_beg
);
1366 ioq
->fifo_cdx
-= ioq
->fifo_beg
;
1367 ioq
->fifo_cdn
-= ioq
->fifo_beg
;
1368 ioq
->fifo_end
-= ioq
->fifo_beg
;
1373 * beg .... cdx ............ cdn ............. end
1374 * [WRITABLE] [PARTIALENCRYPT] [NOTYETENCRYPTED]
1376 * Advance fifo_beg on a successful write.
1378 iovcnt
= dmsg_crypto_encrypt(iocom
, ioq
, iov
, iovcnt
, &nact
);
1379 n
= writev(iocom
->sock_fd
, iov
, iovcnt
);
1383 if (ioq
->fifo_beg
== ioq
->fifo_end
) {
1392 * We don't mess with the nact returned by the crypto_encrypt
1393 * call, which represents the filling of the FIFO. (n) tells
1394 * us how much we were able to write from the FIFO. The two
1395 * are different beasts when encrypting.
1399 * In this situation we are not staging the messages to the
1400 * FIFO but instead writing them directly from the msg
1401 * structure(s) unencrypted, so (nact) is basically (n).
1403 n
= writev(iocom
->sock_fd
, iov
, iovcnt
);
1412 * Clean out the transmit queue based on what we successfully
1413 * encrypted (nact is the plaintext count) and is now in the FIFO.
1414 * ioq->hbytes/abytes represents the portion of the first message
1417 while ((msg
= TAILQ_FIRST(&ioq
->msgq
)) != NULL
) {
1418 hbytes
= (msg
->any
.head
.cmd
& DMSGF_SIZE
) *
1420 abytes
= DMSG_DOALIGN(msg
->aux_size
);
1422 if ((size_t)nact
< hbytes
- ioq
->hbytes
) {
1423 ioq
->hbytes
+= nact
;
1427 nact
-= hbytes
- ioq
->hbytes
;
1428 ioq
->hbytes
= hbytes
;
1429 if ((size_t)nact
< abytes
- ioq
->abytes
) {
1430 ioq
->abytes
+= nact
;
1434 nact
-= abytes
- ioq
->abytes
;
1435 /* ioq->abytes = abytes; optimized out */
1439 "txmsg cmd=%08x circ=%016jx\n",
1441 (intmax_t)msg
->any
.head
.circuit
);
1444 #ifdef DMSG_BLOCK_DEBUG
1447 if (msg
->any
.head
.cmd
& (DMSGF_CREATE
| DMSGF_DELETE
)) {
1448 if ((msg
->state
->flags
& DMSG_STATE_ROOT
) == 0) {
1449 tcmd
= (msg
->state
->icmd
& DMSGF_BASECMDMASK
) |
1450 (msg
->any
.head
.cmd
& (DMSGF_CREATE
|
1457 tcmd
= msg
->any
.head
.cmd
& DMSGF_CMDSWMASK
;
1461 case DMSG_BLK_READ
| DMSGF_CREATE
| DMSGF_DELETE
:
1462 case DMSG_BLK_WRITE
| DMSGF_CREATE
| DMSGF_DELETE
:
1463 fprintf(stderr
, "write BIO %-3d %016jx %d@%016jx\n",
1464 biocount
, msg
->any
.head
.msgid
,
1465 msg
->any
.blk_read
.bytes
,
1466 msg
->any
.blk_read
.offset
);
1468 case DMSG_BLK_READ
| DMSGF_CREATE
| DMSGF_DELETE
| DMSGF_REPLY
:
1469 case DMSG_BLK_WRITE
| DMSGF_CREATE
| DMSGF_DELETE
| DMSGF_REPLY
:
1470 fprintf(stderr
, "wretr BIO %-3d %016jx %d@%016jx\n",
1471 biocount
, msg
->any
.head
.msgid
,
1472 msg
->any
.blk_read
.bytes
,
1473 msg
->any
.blk_read
.offset
);
1480 TAILQ_REMOVE(&ioq
->msgq
, msg
, qentry
);
1489 * Process the return value from the write w/regards to blocking.
1492 if (save_errno
!= EINTR
&&
1493 save_errno
!= EINPROGRESS
&&
1494 save_errno
!= EAGAIN
) {
1498 ioq
->error
= DMSG_IOQ_ERROR_SOCK
;
1499 dmsg_iocom_drain(iocom
);
1502 * Wait for socket buffer space, do not try to
1503 * process more packets for transmit until space
1506 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_WREQ
);
1508 } else if (TAILQ_FIRST(&ioq
->msgq
) ||
1509 TAILQ_FIRST(&iocom
->txmsgq
) ||
1510 ioq
->fifo_beg
!= ioq
->fifo_cdx
) {
1512 * If the write succeeded and more messages are pending
1513 * in either msgq, or the FIFO WWORK must remain set.
1515 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_WWORK
);
1517 /* else no transmit-side work remains */
1520 dmsg_iocom_drain(iocom
);
1525 * Kill pending msgs on ioq_tx and adjust the flags such that no more
1526 * write events will occur. We don't kill read msgs because we want
1527 * the caller to pull off our contrived terminal error msg to detect
1528 * the connection failure.
1530 * Localized to iocom_core thread, iocom->mtx not held by caller.
1533 dmsg_iocom_drain(dmsg_iocom_t
*iocom
)
1535 dmsg_ioq_t
*ioq
= &iocom
->ioq_tx
;
1538 atomic_clear_int(&iocom
->flags
, DMSG_IOCOMF_WREQ
| DMSG_IOCOMF_WWORK
);
1542 while ((msg
= TAILQ_FIRST(&ioq
->msgq
)) != NULL
) {
1543 TAILQ_REMOVE(&ioq
->msgq
, msg
, qentry
);
1550 * Write a message to an iocom, with additional state processing.
1553 dmsg_msg_write(dmsg_msg_t
*msg
)
1555 dmsg_iocom_t
*iocom
= msg
->state
->iocom
;
1556 dmsg_state_t
*state
;
1559 pthread_mutex_lock(&iocom
->mtx
);
1564 "msgtx: cmd=%08x msgid=%016jx "
1565 "state %p(%08x) error=%d\n",
1566 msg
->any
.head
.cmd
, msg
->any
.head
.msgid
,
1567 state
, (state
? state
->icmd
: 0),
1568 msg
->any
.head
.error
);
1574 * Make sure the parent transaction is still open in the transmit
1575 * direction. If it isn't the message is dead and we have to
1576 * potentially simulate a rxmsg terminating the transaction.
1578 if ((state
->parent
->txcmd
& DMSGF_DELETE
) ||
1579 (state
->parent
->rxcmd
& DMSGF_DELETE
)) {
1580 fprintf(stderr
, "dmsg_msg_write: EARLY TERMINATION\n");
1581 dmsg_simulate_failure(state
, DMSG_ERR_LOSTLINK
);
1582 dmsg_state_cleanuptx(iocom
, msg
);
1584 pthread_mutex_unlock(&iocom
->mtx
);
1589 * Process state data into the message as needed, then update the
1590 * state based on the message.
1592 if ((state
->flags
& DMSG_STATE_ROOT
) == 0) {
1594 * Existing transaction (could be reply). It is also
1595 * possible for this to be the first reply (CREATE is set),
1596 * in which case we populate state->txcmd.
1598 * state->txcmd is adjusted to hold the final message cmd,
1599 * and we also be sure to set the CREATE bit here. We did
1600 * not set it in dmsg_msg_alloc() because that would have
1601 * not been serialized (state could have gotten ripped out
1602 * from under the message prior to it being transmitted).
1604 if ((msg
->any
.head
.cmd
& (DMSGF_CREATE
| DMSGF_REPLY
)) ==
1606 state
->txcmd
= msg
->any
.head
.cmd
& ~DMSGF_DELETE
;
1607 state
->icmd
= state
->txcmd
& DMSGF_BASECMDMASK
;
1608 state
->flags
&= ~DMSG_STATE_NEW
;
1610 msg
->any
.head
.msgid
= state
->msgid
;
1612 if (msg
->any
.head
.cmd
& DMSGF_CREATE
) {
1613 state
->txcmd
= msg
->any
.head
.cmd
& ~DMSGF_DELETE
;
1618 * Discard messages sent to transactions which are already dead.
1620 if (state
&& (state
->txcmd
& DMSGF_DELETE
)) {
1621 printf("dmsg_msg_write: drop msg %08x to dead "
1622 "circuit state=%p\n",
1623 msg
->any
.head
.cmd
, state
);
1629 * Normally we queue the msg for output. However, if the circuit is
1630 * dead or dying we must simulate a failure in the return direction
1631 * and throw the message away. The other end is not expecting any
1632 * further messages from us on this state.
1634 * Note that the I/O thread is responsible for generating the CRCs
1637 if (state
->flags
& DMSG_STATE_DYING
) {
1639 if ((state
->parent
->txcmd
& DMSGF_DELETE
) ||
1640 (state
->parent
->flags
& DMSG_STATE_DYING
) ||
1641 (state
->flags
& DMSG_STATE_DYING
)) {
1644 * Illegal message, kill state and related sub-state.
1645 * Cannot transmit if state is already dying.
1647 printf("dmsg_msg_write: Write to dying circuit "
1648 "ptxcmd=%08x prxcmd=%08x flags=%08x\n",
1649 state
->parent
->rxcmd
,
1650 state
->parent
->txcmd
,
1651 state
->parent
->flags
);
1652 dmsg_state_hold(state
);
1653 dmsg_state_cleanuptx(iocom
, msg
);
1654 if ((state
->flags
& DMSG_STATE_ABORTING
) == 0) {
1655 dmsg_simulate_failure(state
, 1, DMSG_ERR_LOSTLINK
);
1657 dmsg_state_drop(state
);
1661 * Queue the message, clean up transmit state prior to queueing
1662 * to avoid SMP races.
1665 printf("dmsg_msg_write: commit msg state=%p to txkmsgq\n", state
);
1666 dmsg_state_cleanuptx(iocom
, msg
);
1667 TAILQ_INSERT_TAIL(&iocom
->txmsgq
, msg
, qentry
);
1669 write(iocom
->wakeupfds
[1], &dummy
, 1); /* XXX optimize me */
1671 pthread_mutex_unlock(&iocom
->mtx
);
1675 * Remove state from its parent's subq. This can wind up recursively
1676 * dropping the parent upward.
1678 * NOTE: iocom must be locked.
1680 * NOTE: Once we drop the parent, our pstate pointer may become invalid.
1684 dmsg_subq_delete(dmsg_state_t
*state
)
1686 dmsg_state_t
*pstate
;
1688 if (state
->flags
& DMSG_STATE_SUBINSERTED
) {
1689 pstate
= state
->parent
;
1691 if (pstate
->scan
== state
)
1692 pstate
->scan
= NULL
;
1693 TAILQ_REMOVE(&pstate
->subq
, state
, entry
);
1694 state
->flags
&= ~DMSG_STATE_SUBINSERTED
;
1695 state
->parent
= NULL
;
1696 if (TAILQ_EMPTY(&pstate
->subq
))
1697 dmsg_state_drop(pstate
);/* pstate->subq */
1698 pstate
= NULL
; /* safety */
1699 dmsg_state_drop(state
); /* pstate->subq */
1701 assert(state
->parent
== NULL
);
1706 * Simulate reception of a transaction DELETE message when the link goes
1707 * bad. This routine must recurse through state->subq and generate messages
1708 * and callbacks bottom-up.
1710 * iocom->mtx must be held by caller.
1714 dmsg_simulate_failure(dmsg_state_t
*state
, int meto
, int error
)
1716 dmsg_state_t
*substate
;
1718 dmsg_state_hold(state
);
1720 dmsg_state_abort(state
);
1723 * Recurse through sub-states.
1726 TAILQ_FOREACH(substate
, &state
->subq
, entry
) {
1727 if (substate
->flags
& DMSG_STATE_ABORTING
)
1729 state
->scan
= substate
;
1730 dmsg_simulate_failure(substate
, 1, error
);
1731 if (state
->scan
!= substate
)
1735 dmsg_state_drop(state
);
1740 dmsg_state_abort(dmsg_state_t
*state
)
1742 dmsg_iocom_t
*iocom
;
1746 * Set ABORTING and DYING, return if already set. If the state was
1747 * just allocated we defer the abort operation until the related
1748 * message is processed.
1750 if (state
->flags
& DMSG_STATE_ABORTING
)
1752 state
->flags
|= DMSG_STATE_ABORTING
;
1753 dmsg_state_dying(state
);
1754 if (state
->flags
& DMSG_STATE_NEW
) {
1755 printf("dmsg_state_abort(0): state %p rxcmd %08x txcmd %08x "
1756 "flags %08x - in NEW state\n",
1757 state
, state
->rxcmd
, state
->txcmd
, state
->flags
);
1762 * Simulate parent state failure before child states. Device
1763 * drivers need to understand this and flag the situation but might
1764 * have asynchronous operations in progress that they cannot stop.
1765 * To make things easier, parent states will not actually disappear
1766 * until the children are all gone.
1768 if ((state
->rxcmd
& DMSGF_DELETE
) == 0) {
1769 fprintf(stderr
, "SIMULATE ERROR\n");
1770 msg
= dmsg_msg_alloc_locked(state
, 0, DMSG_LNK_ERROR
,
1772 if ((state
->rxcmd
& DMSGF_CREATE
) == 0)
1773 msg
->any
.head
.cmd
|= DMSGF_CREATE
;
1774 msg
->any
.head
.cmd
|= DMSGF_DELETE
|
1775 (state
->rxcmd
& DMSGF_REPLY
);
1776 msg
->any
.head
.cmd
^= (DMSGF_REVTRANS
| DMSGF_REVCIRC
);
1777 msg
->any
.head
.error
= DMSG_ERR_LOSTLINK
;
1778 msg
->any
.head
.cmd
|= DMSGF_ABORT
;
1781 * Issue callback synchronously even though this isn't
1782 * the receiver thread. We need to issue the callback
1783 * before removing state from the subq in order to allow
1784 * the callback to reply.
1786 iocom
= state
->iocom
;
1787 dmsg_state_msgrx(msg
, 1);
1788 pthread_mutex_unlock(&iocom
->mtx
);
1789 iocom
->rcvmsg_callback(msg
);
1790 pthread_mutex_lock(&iocom
->mtx
);
1791 dmsg_state_cleanuprx(iocom
, msg
);
1793 TAILQ_INSERT_TAIL(&iocom
->ioq_rx
.msgq
, msg
, qentry
);
1794 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_RWORK
);
1801 * Recursively sets DMSG_STATE_DYING on state and all sub-states, preventing
1802 * the transmission of any new messages on these states. This is done
1803 * atomically when parent state is terminating, whereas setting ABORTING is
1804 * not atomic and can leak races.
1808 dmsg_state_dying(dmsg_state_t
*state
)
1812 if ((state
->flags
& DMSG_STATE_DYING
) == 0) {
1813 state
->flags
|= DMSG_STATE_DYING
;
1814 TAILQ_FOREACH(scan
, &state
->subq
, entry
)
1815 dmsg_state_dying(scan
);
1820 * This is a shortcut to formulate a reply to msg with a simple error code,
1821 * It can reply to and terminate a transaction, or it can reply to a one-way
1822 * messages. A DMSG_LNK_ERROR command code is utilized to encode
1823 * the error code (which can be 0). Not all transactions are terminated
1824 * with DMSG_LNK_ERROR status (the low level only cares about the
1825 * MSGF_DELETE flag), but most are.
1827 * Replies to one-way messages are a bit of an oxymoron but the feature
1828 * is used by the debug (DBG) protocol.
1830 * The reply contains no extended data.
1833 dmsg_msg_reply(dmsg_msg_t
*msg
, uint32_t error
)
1835 dmsg_state_t
*state
= msg
->state
;
1840 * Reply with a simple error code and terminate the transaction.
1842 cmd
= DMSG_LNK_ERROR
;
1845 * Check if our direction has even been initiated yet, set CREATE.
1847 * Check what direction this is (command or reply direction). Note
1848 * that txcmd might not have been initiated yet.
1850 * If our direction has already been closed we just return without
1853 if ((state
->flags
& DMSG_STATE_ROOT
) == 0) {
1854 if (state
->txcmd
& DMSGF_DELETE
)
1856 if (state
->txcmd
& DMSGF_REPLY
)
1858 cmd
|= DMSGF_DELETE
;
1860 if ((msg
->any
.head
.cmd
& DMSGF_REPLY
) == 0)
1865 * Allocate the message and associate it with the existing state.
1866 * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1867 * allocate new state. We have our state already.
1869 nmsg
= dmsg_msg_alloc(state
, 0, cmd
, NULL
, NULL
);
1870 if ((state
->flags
& DMSG_STATE_ROOT
) == 0) {
1871 if ((state
->txcmd
& DMSGF_CREATE
) == 0)
1872 nmsg
->any
.head
.cmd
|= DMSGF_CREATE
;
1874 nmsg
->any
.head
.error
= error
;
1876 dmsg_msg_write(nmsg
);
1880 * Similar to dmsg_msg_reply() but leave the transaction open. That is,
1881 * we are generating a streaming reply or an intermediate acknowledgement
1882 * of some sort as part of the higher level protocol, with more to come
1886 dmsg_msg_result(dmsg_msg_t
*msg
, uint32_t error
)
1888 dmsg_state_t
*state
= msg
->state
;
1894 * Reply with a simple error code and terminate the transaction.
1896 cmd
= DMSG_LNK_ERROR
;
1899 * Check if our direction has even been initiated yet, set CREATE.
1901 * Check what direction this is (command or reply direction). Note
1902 * that txcmd might not have been initiated yet.
1904 * If our direction has already been closed we just return without
1907 if ((state
->flags
& DMSG_STATE_ROOT
) == 0) {
1908 if (state
->txcmd
& DMSGF_DELETE
)
1910 if (state
->txcmd
& DMSGF_REPLY
)
1912 /* continuing transaction, do not set MSGF_DELETE */
1914 if ((msg
->any
.head
.cmd
& DMSGF_REPLY
) == 0)
1917 nmsg
= dmsg_msg_alloc(state
, 0, cmd
, NULL
, NULL
);
1918 if ((state
->flags
& DMSG_STATE_ROOT
) == 0) {
1919 if ((state
->txcmd
& DMSGF_CREATE
) == 0)
1920 nmsg
->any
.head
.cmd
|= DMSGF_CREATE
;
1922 nmsg
->any
.head
.error
= error
;
1924 dmsg_msg_write(nmsg
);
1928 * Terminate a transaction given a state structure by issuing a DELETE.
1929 * (the state structure must not be &iocom->state0)
1932 dmsg_state_reply(dmsg_state_t
*state
, uint32_t error
)
1935 uint32_t cmd
= DMSG_LNK_ERROR
| DMSGF_DELETE
;
1938 * Nothing to do if we already transmitted a delete
1940 if (state
->txcmd
& DMSGF_DELETE
)
1944 * Set REPLY if the other end initiated the command. Otherwise
1945 * we are the command direction.
1947 if (state
->txcmd
& DMSGF_REPLY
)
1950 nmsg
= dmsg_msg_alloc(state
, 0, cmd
, NULL
, NULL
);
1951 if ((state
->flags
& DMSG_STATE_ROOT
) == 0) {
1952 if ((state
->txcmd
& DMSGF_CREATE
) == 0)
1953 nmsg
->any
.head
.cmd
|= DMSGF_CREATE
;
1955 nmsg
->any
.head
.error
= error
;
1956 dmsg_msg_write(nmsg
);
1960 * Terminate a transaction given a state structure by issuing a DELETE.
1961 * (the state structure must not be &iocom->state0)
1964 dmsg_state_result(dmsg_state_t
*state
, uint32_t error
)
1967 uint32_t cmd
= DMSG_LNK_ERROR
;
1970 * Nothing to do if we already transmitted a delete
1972 if (state
->txcmd
& DMSGF_DELETE
)
1976 * Set REPLY if the other end initiated the command. Otherwise
1977 * we are the command direction.
1979 if (state
->txcmd
& DMSGF_REPLY
)
1982 nmsg
= dmsg_msg_alloc(state
, 0, cmd
, NULL
, NULL
);
1983 if ((state
->flags
& DMSG_STATE_ROOT
) == 0) {
1984 if ((state
->txcmd
& DMSGF_CREATE
) == 0)
1985 nmsg
->any
.head
.cmd
|= DMSGF_CREATE
;
1987 nmsg
->any
.head
.error
= error
;
1988 dmsg_msg_write(nmsg
);
1991 /************************************************************************
1992 * TRANSACTION STATE HANDLING *
1993 ************************************************************************
1998 * Process state tracking for a message after reception, prior to execution.
1999 * Possibly route the message (consuming it).
2001 * Called with msglk held and the msg dequeued.
2003 * All messages are called with dummy state and return actual state.
2004 * (One-off messages often just return the same dummy state).
2006 * May request that caller discard the message by setting *discardp to 1.
2007 * The returned state is not used in this case and is allowed to be NULL.
2011 * These routines handle persistent and command/reply message state via the
2012 * CREATE and DELETE flags. The first message in a command or reply sequence
2013 * sets CREATE, the last message in a command or reply sequence sets DELETE.
2015 * There can be any number of intermediate messages belonging to the same
2016 * sequence sent inbetween the CREATE message and the DELETE message,
2017 * which set neither flag. This represents a streaming command or reply.
2019 * Any command message received with CREATE set expects a reply sequence to
2020 * be returned. Reply sequences work the same as command sequences except the
2021 * REPLY bit is also sent. Both the command side and reply side can
2022 * degenerate into a single message with both CREATE and DELETE set. Note
2023 * that one side can be streaming and the other side not, or neither, or both.
2025 * The msgid is unique for the initiator. That is, two sides sending a new
2026 * message can use the same msgid without colliding.
2030 * The message may be running over a circuit. If the circuit is half-deleted
2031 * The message is typically racing against a link failure and must be thrown
2032 * out. As the circuit deletion propagates the library will automatically
2033 * generate terminations for sub states.
2037 * ABORT sequences work by setting the ABORT flag along with normal message
2038 * state. However, ABORTs can also be sent on half-closed messages, that is
2039 * even if the command or reply side has already sent a DELETE, as long as
2040 * the message has not been fully closed it can still send an ABORT+DELETE
2041 * to terminate the half-closed message state.
2043 * Since ABORT+DELETEs can race we silently discard ABORT's for message
2044 * state which has already been fully closed. REPLY+ABORT+DELETEs can
2045 * also race, and in this situation the other side might have already
2046 * initiated a new unrelated command with the same message id. Since
2047 * the abort has not set the CREATE flag the situation can be detected
2048 * and the message will also be discarded.
2050 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
2051 * The ABORT request is essentially integrated into the command instead
2052 * of being sent later on. In this situation the command implementation
2053 * detects that CREATE and ABORT are both set (vs ABORT alone) and can
2054 * special-case non-blocking operation for the command.
2056 * NOTE! Messages with ABORT set without CREATE or DELETE are considered
2057 * to be mid-stream aborts for command/reply sequences. ABORTs on
2058 * one-way messages are not supported.
2060 * NOTE! If a command sequence does not support aborts the ABORT flag is
2065 * One-off messages (no reply expected) are sent without an established
2066 * transaction. CREATE and DELETE are left clear and the msgid is usually 0.
2067 * For one-off messages sent over circuits msgid generally MUST be 0.
2069 * One-off messages cannot be aborted and typically aren't processed
2070 * by these routines. Order is still guaranteed for messages sent over
2071 * the same circuit. The REPLY bit can be used to distinguish whether
2072 * a one-off message is a command or reply. For example, one-off replies
2073 * will typically just contain status updates.
2076 dmsg_state_msgrx(dmsg_msg_t
*msg
, int mstate
)
2078 dmsg_iocom_t
*iocom
= msg
->state
->iocom
;
2079 dmsg_state_t
*state
;
2080 dmsg_state_t
*pstate
;
2081 dmsg_state_t sdummy
;
2084 pthread_mutex_lock(&iocom
->mtx
);
2088 "msgrx: cmd=%08x msgid=%016jx "
2089 "circuit=%016jx error=%d\n",
2091 msg
->any
.head
.msgid
,
2092 msg
->any
.head
.circuit
,
2093 msg
->any
.head
.error
);
2097 * Lookup the circuit (pstate). The circuit will be an open
2098 * transaction. The REVCIRC bit in the message tells us which side
2101 * If mstate is non-zero the state has already been incorporated
2102 * into the message as part of a simulated abort. Note that in this
2103 * situation the parent state may have already been removed from
2107 pstate
= msg
->state
->parent
;
2108 } else if (msg
->any
.head
.circuit
) {
2109 sdummy
.msgid
= msg
->any
.head
.circuit
;
2111 if (msg
->any
.head
.cmd
& DMSGF_REVCIRC
) {
2112 pstate
= RB_FIND(dmsg_state_tree
,
2113 &iocom
->statewr_tree
,
2116 pstate
= RB_FIND(dmsg_state_tree
,
2117 &iocom
->staterd_tree
,
2122 * If we cannot find the circuit throw the message away.
2123 * The state will have already been taken care of by
2124 * the simulated failure code. This case can occur due
2125 * to a failure propagating in one direction crossing a
2126 * request on the failed circuit propagating in the other
2129 if (pstate
== NULL
) {
2131 "missing parent in stacked trans %s\n",
2133 pthread_mutex_unlock(&iocom
->mtx
);
2134 error
= DMSG_IOQ_ERROR_EALREADY
;
2139 pstate
= &iocom
->state0
;
2141 /* WARNING: pstate not (yet) refd */
2146 * If mstate is non-zero the state has already been incorporated
2147 * into the message as part of a simulated abort. Note that in this
2148 * situation the state may have already been removed from the RBTREE.
2150 * If received msg is a command state is on staterd_tree.
2151 * If received msg is a reply state is on statewr_tree.
2152 * Otherwise there is no state (retain &iocom->state0)
2157 sdummy
.msgid
= msg
->any
.head
.msgid
;
2158 if (msg
->any
.head
.cmd
& DMSGF_REVTRANS
) {
2159 state
= RB_FIND(dmsg_state_tree
,
2160 &iocom
->statewr_tree
, &sdummy
);
2162 state
= RB_FIND(dmsg_state_tree
,
2163 &iocom
->staterd_tree
, &sdummy
);
2169 "msgrx:\tstate %p(%08x)",
2170 state
, (state
? state
->icmd
: 0));
2171 if (pstate
!= &iocom
->state0
) {
2174 pstate
, pstate
->icmd
);
2176 fprintf(stderr
, "\n");
2180 /* state already assigned to msg */
2183 * Message over an existing transaction (CREATE should not
2186 dmsg_state_drop(msg
->state
);
2187 dmsg_state_hold(state
);
2189 assert(pstate
== state
->parent
);
2192 * Either a new transaction (if CREATE set) or a one-off.
2198 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
2199 * inside the case statements.
2201 * Construct new state as necessary.
2203 switch(msg
->any
.head
.cmd
& (DMSGF_CREATE
| DMSGF_DELETE
|
2206 case DMSGF_CREATE
| DMSGF_DELETE
:
2208 * Create new sub-transaction under pstate.
2209 * (any DELETE is handled in post-processing of msg).
2211 * (During routing the msgid was made unique for this
2212 * direction over the comlink, so our RB trees can be
2213 * iocom-based instead of state-based).
2215 if (state
!= pstate
) {
2217 "duplicate transaction %s\n",
2219 error
= DMSG_IOQ_ERROR_TRANS
;
2225 * Allocate the new state.
2227 state
= malloc(sizeof(*state
));
2228 bzero(state
, sizeof(*state
));
2229 atomic_add_int(&dmsg_state_count
, 1);
2231 TAILQ_INIT(&state
->subq
);
2232 dmsg_state_hold(pstate
);
2233 state
->parent
= pstate
;
2234 state
->iocom
= iocom
;
2235 state
->flags
= DMSG_STATE_DYNAMIC
|
2236 DMSG_STATE_OPPOSITE
;
2237 state
->msgid
= msg
->any
.head
.msgid
;
2238 state
->txcmd
= DMSGF_REPLY
;
2239 state
->rxcmd
= msg
->any
.head
.cmd
& ~DMSGF_DELETE
;
2240 state
->icmd
= state
->rxcmd
& DMSGF_BASECMDMASK
;
2241 state
->flags
&= ~DMSG_STATE_NEW
;
2244 RB_INSERT(dmsg_state_tree
, &iocom
->staterd_tree
, state
);
2245 if (TAILQ_EMPTY(&pstate
->subq
))
2246 dmsg_state_hold(pstate
);/* pstate->subq */
2247 TAILQ_INSERT_TAIL(&pstate
->subq
, state
, entry
);
2248 state
->flags
|= DMSG_STATE_SUBINSERTED
|
2249 DMSG_STATE_RBINSERTED
;
2250 dmsg_state_hold(state
); /* pstate->subq */
2251 dmsg_state_hold(state
); /* state on rbtree */
2252 dmsg_state_hold(state
); /* msg->state */
2255 * If the parent is a relay set up the state handler to
2256 * automatically route the message. Local processing will
2259 * (state relays are seeded by SPAN processing)
2262 state
->func
= dmsg_state_relay
;
2267 * Persistent state is expected but might not exist if an
2268 * ABORT+DELETE races the close.
2270 * (any DELETE is handled in post-processing of msg).
2272 if (state
== pstate
) {
2273 if (msg
->any
.head
.cmd
& DMSGF_ABORT
) {
2274 error
= DMSG_IOQ_ERROR_EALREADY
;
2276 fprintf(stderr
, "missing-state %s\n",
2278 error
= DMSG_IOQ_ERROR_TRANS
;
2285 * Handle another ABORT+DELETE case if the msgid has already
2288 if ((state
->rxcmd
& DMSGF_CREATE
) == 0) {
2289 if (msg
->any
.head
.cmd
& DMSGF_ABORT
) {
2290 error
= DMSG_IOQ_ERROR_EALREADY
;
2292 fprintf(stderr
, "reused-state %s\n",
2294 error
= DMSG_IOQ_ERROR_TRANS
;
2303 * Check for mid-stream ABORT command received, otherwise
2306 if (msg
->any
.head
.cmd
& DMSGF_ABORT
) {
2307 if ((state
== pstate
) ||
2308 (state
->rxcmd
& DMSGF_CREATE
) == 0) {
2309 error
= DMSG_IOQ_ERROR_EALREADY
;
2315 case DMSGF_REPLY
| DMSGF_CREATE
:
2316 case DMSGF_REPLY
| DMSGF_CREATE
| DMSGF_DELETE
:
2318 * When receiving a reply with CREATE set the original
2319 * persistent state message should already exist.
2321 if (state
== pstate
) {
2322 fprintf(stderr
, "no-state(r) %s\n",
2324 error
= DMSG_IOQ_ERROR_TRANS
;
2328 assert(((state
->rxcmd
^ msg
->any
.head
.cmd
) & DMSGF_REPLY
) == 0);
2329 state
->rxcmd
= msg
->any
.head
.cmd
& ~DMSGF_DELETE
;
2332 case DMSGF_REPLY
| DMSGF_DELETE
:
2334 * Received REPLY+ABORT+DELETE in case where msgid has
2335 * already been fully closed, ignore the message.
2337 if (state
== pstate
) {
2338 if (msg
->any
.head
.cmd
& DMSGF_ABORT
) {
2339 error
= DMSG_IOQ_ERROR_EALREADY
;
2341 fprintf(stderr
, "no-state(r,d) %s\n",
2343 error
= DMSG_IOQ_ERROR_TRANS
;
2350 * Received REPLY+ABORT+DELETE in case where msgid has
2351 * already been reused for an unrelated message,
2352 * ignore the message.
2354 if ((state
->rxcmd
& DMSGF_CREATE
) == 0) {
2355 if (msg
->any
.head
.cmd
& DMSGF_ABORT
) {
2356 error
= DMSG_IOQ_ERROR_EALREADY
;
2358 fprintf(stderr
, "reused-state(r,d) %s\n",
2360 error
= DMSG_IOQ_ERROR_TRANS
;
2369 * Check for mid-stream ABORT reply received to sent command.
2371 if (msg
->any
.head
.cmd
& DMSGF_ABORT
) {
2372 if (state
== pstate
||
2373 (state
->rxcmd
& DMSGF_CREATE
) == 0) {
2374 error
= DMSG_IOQ_ERROR_EALREADY
;
2383 * Calculate the easy-switch() transactional command. Represents
2384 * the outer-transaction command for any transaction-create or
2385 * transaction-delete, and the inner message command for any
2386 * non-transaction or inside-transaction command. tcmd will be
2387 * set to 0 for any messaging error condition.
2389 * The two can be told apart because outer-transaction commands
2390 * always have a DMSGF_CREATE and/or DMSGF_DELETE flag.
2392 if (msg
->any
.head
.cmd
& (DMSGF_CREATE
| DMSGF_DELETE
)) {
2393 if ((msg
->state
->flags
& DMSG_STATE_ROOT
) == 0) {
2394 msg
->tcmd
= (state
->icmd
& DMSGF_BASECMDMASK
) |
2395 (msg
->any
.head
.cmd
& (DMSGF_CREATE
|
2402 msg
->tcmd
= msg
->any
.head
.cmd
& DMSGF_CMDSWMASK
;
2405 #ifdef DMSG_BLOCK_DEBUG
2406 switch (msg
->tcmd
) {
2407 case DMSG_BLK_READ
| DMSGF_CREATE
| DMSGF_DELETE
:
2408 case DMSG_BLK_WRITE
| DMSGF_CREATE
| DMSGF_DELETE
:
2409 fprintf(stderr
, "read BIO %-3d %016jx %d@%016jx\n",
2410 biocount
, msg
->any
.head
.msgid
,
2411 msg
->any
.blk_read
.bytes
,
2412 msg
->any
.blk_read
.offset
);
2414 case DMSG_BLK_READ
| DMSGF_CREATE
| DMSGF_DELETE
| DMSGF_REPLY
:
2415 case DMSG_BLK_WRITE
| DMSGF_CREATE
| DMSGF_DELETE
| DMSGF_REPLY
:
2416 fprintf(stderr
, "rread BIO %-3d %016jx %d@%016jx\n",
2417 biocount
, msg
->any
.head
.msgid
,
2418 msg
->any
.blk_read
.bytes
,
2419 msg
->any
.blk_read
.offset
);
2427 * Adjust state, mark receive side as DELETED if appropriate and
2428 * adjust RB tree if both sides are DELETED. cleanuprx handles
2429 * the rest after the state callback returns.
2431 assert(msg
->state
->iocom
== iocom
);
2432 assert(msg
->state
== state
);
2434 if (state
->flags
& DMSG_STATE_ROOT
) {
2436 * Nothing to do for non-transactional messages.
2438 } else if (msg
->any
.head
.cmd
& DMSGF_DELETE
) {
2440 * Message terminating transaction, remove the state from
2441 * the RB tree if the full transaction is now complete.
2442 * The related state, subq, and parent link is retained
2443 * until after the state callback is complete.
2445 assert((state
->rxcmd
& DMSGF_DELETE
) == 0);
2446 state
->rxcmd
|= DMSGF_DELETE
;
2447 if (state
->txcmd
& DMSGF_DELETE
) {
2448 assert(state
->flags
& DMSG_STATE_RBINSERTED
);
2449 if (state
->rxcmd
& DMSGF_REPLY
) {
2450 assert(msg
->any
.head
.cmd
& DMSGF_REPLY
);
2451 RB_REMOVE(dmsg_state_tree
,
2452 &iocom
->statewr_tree
, state
);
2454 assert((msg
->any
.head
.cmd
& DMSGF_REPLY
) == 0);
2455 RB_REMOVE(dmsg_state_tree
,
2456 &iocom
->staterd_tree
, state
);
2458 state
->flags
&= ~DMSG_STATE_RBINSERTED
;
2459 dmsg_state_drop(state
);
2463 pthread_mutex_unlock(&iocom
->mtx
);
2465 if (DMsgDebugOpt
&& error
)
2466 fprintf(stderr
, "msgrx: error %d\n", error
);
2472 * Route the message and handle pair-state processing.
2475 dmsg_state_relay(dmsg_msg_t
*lmsg
)
2477 dmsg_state_t
*lpstate
;
2478 dmsg_state_t
*rpstate
;
2479 dmsg_state_t
*lstate
;
2480 dmsg_state_t
*rstate
;
2483 #ifdef DMSG_BLOCK_DEBUG
2484 switch (lmsg
->tcmd
) {
2485 case DMSG_BLK_OPEN
| DMSGF_CREATE
:
2486 fprintf(stderr
, "relay BIO_OPEN (CREATE)\n");
2488 case DMSG_BLK_OPEN
| DMSGF_DELETE
:
2489 fprintf(stderr
, "relay BIO_OPEN (DELETE)\n");
2491 case DMSG_BLK_READ
| DMSGF_CREATE
| DMSGF_DELETE
:
2492 case DMSG_BLK_WRITE
| DMSGF_CREATE
| DMSGF_DELETE
:
2493 atomic_add_int(&biocount
, 1);
2494 fprintf(stderr
, "relay BIO %-3d %016jx %d@%016jx\n",
2495 biocount
, lmsg
->any
.head
.msgid
,
2496 lmsg
->any
.blk_read
.bytes
,
2497 lmsg
->any
.blk_read
.offset
);
2499 case DMSG_BLK_READ
| DMSGF_CREATE
| DMSGF_DELETE
| DMSGF_REPLY
:
2500 case DMSG_BLK_WRITE
| DMSGF_CREATE
| DMSGF_DELETE
| DMSGF_REPLY
:
2501 fprintf(stderr
, "retrn BIO %-3d %016jx %d@%016jx\n",
2502 biocount
, lmsg
->any
.head
.msgid
,
2503 lmsg
->any
.blk_read
.bytes
,
2504 lmsg
->any
.blk_read
.offset
);
2505 atomic_add_int(&biocount
, -1);
2512 if ((lmsg
->any
.head
.cmd
& (DMSGF_CREATE
| DMSGF_REPLY
)) ==
2515 * New sub-transaction, establish new state and relay.
2517 lstate
= lmsg
->state
;
2518 lpstate
= lstate
->parent
;
2519 rpstate
= lpstate
->relay
;
2520 assert(lstate
->relay
== NULL
);
2521 assert(rpstate
!= NULL
);
2523 rmsg
= dmsg_msg_alloc(rpstate
, 0,
2525 dmsg_state_relay
, NULL
);
2526 rstate
= rmsg
->state
;
2527 rstate
->relay
= lstate
;
2528 lstate
->relay
= rstate
;
2529 dmsg_state_hold(lstate
);
2530 dmsg_state_hold(rstate
);
2533 * State & relay already established
2535 lstate
= lmsg
->state
;
2536 rstate
= lstate
->relay
;
2537 assert(rstate
!= NULL
);
2539 assert((rstate
->txcmd
& DMSGF_DELETE
) == 0);
2542 if (lstate
->flags
& DMSG_STATE_ABORTING
) {
2544 "relay: relay lost link l=%p r=%p\n",
2546 dmsg_simulate_failure(rstate
, 0, DMSG_ERR_LOSTLINK
);
2550 rmsg
= dmsg_msg_alloc(rstate
, 0,
2552 dmsg_state_relay
, NULL
);
2554 if (lmsg
->hdr_size
> sizeof(lmsg
->any
.head
)) {
2555 bcopy(&lmsg
->any
.head
+ 1, &rmsg
->any
.head
+ 1,
2556 lmsg
->hdr_size
- sizeof(lmsg
->any
.head
));
2558 rmsg
->any
.head
.error
= lmsg
->any
.head
.error
;
2559 rmsg
->any
.head
.reserved02
= lmsg
->any
.head
.reserved02
;
2560 rmsg
->any
.head
.reserved18
= lmsg
->any
.head
.reserved18
;
2561 rmsg
->aux_size
= lmsg
->aux_size
;
2562 rmsg
->aux_data
= lmsg
->aux_data
;
2563 lmsg
->aux_data
= NULL
;
2566 fprintf(stderr, "RELAY %08x\n", rmsg->any.head.cmd);
2568 dmsg_msg_write(rmsg
);
2572 * Cleanup and retire msg after issuing the state callback. The state
2573 * has already been removed from the RB tree. The subq and msg must be
2576 * Called with the iocom mutex held (to handle subq disconnection).
2579 dmsg_state_cleanuprx(dmsg_iocom_t
*iocom
, dmsg_msg_t
*msg
)
2581 dmsg_state_t
*state
;
2583 assert(msg
->state
->iocom
== iocom
);
2585 if (state
->flags
& DMSG_STATE_ROOT
) {
2587 * Free a non-transactional message, there is no state
2591 } else if ((state
->flags
& DMSG_STATE_SUBINSERTED
) &&
2592 (state
->rxcmd
& DMSGF_DELETE
) &&
2593 (state
->txcmd
& DMSGF_DELETE
)) {
2595 * Must disconnect from parent and drop relay.
2597 dmsg_subq_delete(state
);
2599 dmsg_state_drop(state
->relay
);
2600 state
->relay
= NULL
;
2605 * Message not terminating transaction, leave state intact
2606 * and free message if it isn't the CREATE message.
2613 * Clean up the state after pulling out needed fields and queueing the
2614 * message for transmission. This occurs in dmsg_msg_write().
2616 * Called with the mutex locked.
2619 dmsg_state_cleanuptx(dmsg_iocom_t
*iocom
, dmsg_msg_t
*msg
)
2621 dmsg_state_t
*state
;
2623 assert(iocom
== msg
->state
->iocom
);
2626 dmsg_state_hold(state
);
2628 if (state
->flags
& DMSG_STATE_ROOT
) {
2630 } else if (msg
->any
.head
.cmd
& DMSGF_DELETE
) {
2632 * Message terminating transaction, destroy the related
2633 * state, the original message, and this message (if it
2634 * isn't the original message due to a CREATE|DELETE).
2636 * It's possible for governing state to terminate while
2637 * sub-transactions still exist. This is allowed but
2638 * will cause sub-transactions to recursively fail.
2639 * Further reception of sub-transaction messages will be
2640 * impossible because the circuit will no longer exist.
2641 * (XXX need code to make sure that happens properly).
2643 * NOTE: It is possible for a fafilure to terminate the
2644 * state after we have written the message but before
2645 * we are able to call cleanuptx, so txcmd might already
2646 * have DMSGF_DELETE set.
2648 if ((state
->txcmd
& DMSGF_DELETE
) == 0 &&
2649 (state
->rxcmd
& DMSGF_DELETE
)) {
2650 state
->txcmd
|= DMSGF_DELETE
;
2651 assert(state
->flags
& DMSG_STATE_RBINSERTED
);
2652 if (state
->txcmd
& DMSGF_REPLY
) {
2653 assert(msg
->any
.head
.cmd
& DMSGF_REPLY
);
2654 RB_REMOVE(dmsg_state_tree
,
2655 &iocom
->staterd_tree
, state
);
2657 assert((msg
->any
.head
.cmd
& DMSGF_REPLY
) == 0);
2658 RB_REMOVE(dmsg_state_tree
,
2659 &iocom
->statewr_tree
, state
);
2661 state
->flags
&= ~DMSG_STATE_RBINSERTED
;
2662 dmsg_subq_delete(state
);
2665 dmsg_state_drop(state
->relay
);
2666 state
->relay
= NULL
;
2668 dmsg_state_drop(state
); /* state->rbtree */
2669 } else if ((state
->txcmd
& DMSGF_DELETE
) == 0) {
2670 state
->txcmd
|= DMSGF_DELETE
;
2675 * Deferred abort after transmission.
2677 if ((state
->flags
& (DMSG_STATE_ABORTING
| DMSG_STATE_DYING
)) &&
2678 (state
->rxcmd
& DMSGF_DELETE
) == 0) {
2679 printf("kdmsg_state_cleanuptx: state=%p "
2680 "executing deferred abort\n",
2682 state
->flags
&= ~DMSG_STATE_ABORTING
;
2683 dmsg_simulate_failure(state
, 1, DMSG_ERR_LOSTLINK
);
2686 dmsg_state_drop(state
);
2690 * Called with or without locks
2693 dmsg_state_hold(dmsg_state_t
*state
)
2695 atomic_add_int(&state
->refs
, 1);
2699 dmsg_state_drop(dmsg_state_t
*state
)
2701 assert(state
->refs
> 0);
2702 if (atomic_fetchadd_int(&state
->refs
, -1) == 1)
2703 dmsg_state_free(state
);
/*
 * dmsg_state_free: final teardown of a state whose reference count has
 * reached zero; dmsg_state_drop() calls it when the last reference is
 * released.  It decrements the global dmsg_state_count and asserts that
 * the state is fully unlinked: not ROOT, not on a parent's subq, not in
 * an RB tree, no remaining sub-transactions and no references.
 *
 * NOTE(review): in this copy of the file no statement releasing the
 * state's memory is visible, the "terminate state" message is printed
 * without any debug-flag guard, and the only statement following the
 * "any.any != NULL" test is an assert that any.any is NULL.  Lines appear
 * to be missing here -- confirm against the upstream source.
 */
2707 * Called with iocom locked
2710 dmsg_state_free(dmsg_state_t
*state
)
2712 atomic_add_int(&dmsg_state_count
, -1);
2714 fprintf(stderr
, "terminate state %p\n", state
);
2716 assert((state
->flags
& (DMSG_STATE_ROOT
|
2717 DMSG_STATE_SUBINSERTED
|
2718 DMSG_STATE_RBINSERTED
)) == 0);
2719 assert(TAILQ_EMPTY(&state
->subq
));
2720 assert(state
->refs
== 0);
2721 if (state
->any
.any
!= NULL
) /* XXX avoid deadlock w/exit & kernel */
2723 assert(state
->any
.any
== NULL
);
2728 * This swaps endian for a hammer2_msg_hdr. Note that the extended
2729 * header is not adjusted, just the core header.
2732 dmsg_bswap_head(dmsg_hdr_t
*head
)
2734 head
->magic
= bswap16(head
->magic
);
2735 head
->reserved02
= bswap16(head
->reserved02
);
2736 head
->salt
= bswap32(head
->salt
);
2738 head
->msgid
= bswap64(head
->msgid
);
2739 head
->circuit
= bswap64(head
->circuit
);
2740 head
->reserved18
= bswap64(head
->reserved18
);
2742 head
->cmd
= bswap32(head
->cmd
);
2743 head
->aux_crc
= bswap32(head
->aux_crc
);
2744 head
->aux_bytes
= bswap32(head
->aux_bytes
);
2745 head
->error
= bswap32(head
->error
);
2746 head
->aux_descr
= bswap64(head
->aux_descr
);
2747 head
->reserved38
= bswap32(head
->reserved38
);
2748 head
->hdr_crc
= bswap32(head
->hdr_crc
);