2 * Copyright (c) 2011-2015 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
18 * 3. Neither the name of The DragonFly Project nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific, prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
36 #include "dmsg_local.h"
38 #define DMSG_BLOCK_DEBUG
42 #ifdef DMSG_BLOCK_DEBUG
/*
 * Forward declarations of file-local (static) helpers, followed by the
 * red-black tree code generation for the dmsg_state_tree type (the tree
 * type used for iocom->statewr_tree), linked through the rbnode field of
 * struct dmsg_state and ordered by dmsg_state_cmp().
 */
46 static int dmsg_state_msgrx(dmsg_msg_t
*msg
, int mstate
);
47 static void dmsg_state_cleanuptx(dmsg_iocom_t
*iocom
, dmsg_msg_t
*msg
);
48 static void dmsg_msg_free_locked(dmsg_msg_t
*msg
);
49 static void dmsg_state_free(dmsg_state_t
*state
);
50 static void dmsg_subq_delete(dmsg_state_t
*state
);
51 static void dmsg_simulate_failure(dmsg_state_t
*state
, int meto
, int error
);
52 static void dmsg_state_abort(dmsg_state_t
*state
);
53 static void dmsg_state_dying(dmsg_state_t
*state
);
55 RB_GENERATE(dmsg_state_tree
, dmsg_state
, rbnode
, dmsg_state_cmp
);
/*
 * dmsg_state_cmp - comparator handed to RB_GENERATE for dmsg_state_tree;
 * orders two transaction states by their msgid.
 *
 * NOTE(review): the return statements are not visible in this view;
 * presumably it returns <0 when state1->msgid is smaller, >0 when larger
 * and 0 when equal, per the <sys/tree.h> comparator contract - confirm.
 */
58 * STATE TREE - Represents open transactions which are indexed by their
59 * { msgid } relative to the governing iocom.
62 dmsg_state_cmp(dmsg_state_t
*state1
, dmsg_state_t
*state2
)
64 if (state1
->msgid
< state2
->msgid
)
66 if (state1
->msgid
> state2
->msgid
)
/*
 * dmsg_ioq_init - reset an ioq to its initial state.
 *
 * Zeroes the whole structure, sets the I/O state machine to
 * DMSG_MSGQ_STATE_HEADER1 and initialises the (empty) message queue.
 * The iocom argument is unused (__unused).
 */
72 * Initialize a low-level ioq
75 dmsg_ioq_init(dmsg_iocom_t
*iocom __unused
, dmsg_ioq_t
*ioq
)
77 bzero(ioq
, sizeof(*ioq
));
78 ioq
->state
= DMSG_MSGQ_STATE_HEADER1
;
79 TAILQ_INIT(&ioq
->msgq
);
/*
 * dmsg_ioq_done - release what an ioq still references at teardown.
 *
 * The msgq is expected to be empty by now; any message still queued
 * trips assert(0).  A partially assembled message in ioq->msg is also
 * handled, but that handling is on lines not visible in this view.
 *
 * NOTE(review): the note below says the caller holds iocom->mtx, but
 * dmsg_iocom_done(), which calls this, is documented as running without
 * iocom->mtx held and does not visibly acquire it - confirm which
 * locking contract is correct.
 */
85 * caller holds iocom->mtx.
88 dmsg_ioq_done(dmsg_iocom_t
*iocom __unused
, dmsg_ioq_t
*ioq
)
92 while ((msg
= TAILQ_FIRST(&ioq
->msgq
)) != NULL
) {
93 assert(0); /* shouldn't happen */
94 TAILQ_REMOVE(&ioq
->msgq
, msg
, qentry
);
97 if ((msg
= ioq
->msg
) != NULL
) {
/*
 * dmsg_iocom_init - initialise an iocom around an already-open sock_fd
 * and an alternate descriptor alt_fd.
 *
 *	signal_func	- stored in iocom->signal_callback; DMSG_IOCOMF_SWORK
 *			  is set so the core loop invokes it at least once.
 *	rcvmsg_func	- stored in iocom->rcvmsg_callback.
 *	usrmsg_func	- stored in iocom->usrmsg_callback.
 *	altmsg_func	- stored in iocom->altmsg_callback.
 *
 * The structure is zeroed and given an "iocom-%p" label, a mutex, empty
 * staterd/statewr trees, an empty txmsgq and the flags RREQ | CLOSEALT
 * (plus SWORK).  Both ioqs are reset, and state0 becomes the root state
 * (refs 1, parent is itself, DMSG_STATE_ROOT).  A wakeup pipe is created
 * and both ends are set non-blocking.  Session crypto is negotiated when
 * fstat() reports sock_fd is a socket.  sock_fd and alt_fd are then set
 * non-blocking (the guarding conditions are not all visible here).
 *
 * NOTE(review): F_SETFL with a bare O_NONBLOCK replaces every other file
 * status flag on these descriptors instead of OR-ing O_NONBLOCK into the
 * F_GETFL value - confirm this is intended.  The asprintf() result is
 * not checked in the visible code.
 */
104 * Initialize a low-level communications channel.
106 * NOTE: The signal_func() is called at least once from the loop and can be
107 * re-armed via dmsg_iocom_restate().
110 dmsg_iocom_init(dmsg_iocom_t
*iocom
, int sock_fd
, int alt_fd
,
111 void (*signal_func
)(dmsg_iocom_t
*iocom
),
112 void (*rcvmsg_func
)(dmsg_msg_t
*msg
),
113 void (*usrmsg_func
)(dmsg_msg_t
*msg
, int unmanaged
),
114 void (*altmsg_func
)(dmsg_iocom_t
*iocom
))
118 bzero(iocom
, sizeof(*iocom
));
120 asprintf(&iocom
->label
, "iocom-%p", iocom
);
121 iocom
->signal_callback
= signal_func
;
122 iocom
->rcvmsg_callback
= rcvmsg_func
;
123 iocom
->altmsg_callback
= altmsg_func
;
124 iocom
->usrmsg_callback
= usrmsg_func
;
126 pthread_mutex_init(&iocom
->mtx
, NULL
);
127 RB_INIT(&iocom
->staterd_tree
);
128 RB_INIT(&iocom
->statewr_tree
);
129 TAILQ_INIT(&iocom
->txmsgq
);
130 iocom
->sock_fd
= sock_fd
;
131 iocom
->alt_fd
= alt_fd
;
132 iocom
->flags
= DMSG_IOCOMF_RREQ
| DMSG_IOCOMF_CLOSEALT
;
134 iocom
->flags
|= DMSG_IOCOMF_SWORK
;
135 dmsg_ioq_init(iocom
, &iocom
->ioq_rx
);
136 dmsg_ioq_init(iocom
, &iocom
->ioq_tx
);
137 iocom
->state0
.refs
= 1; /* should never trigger a free */
138 iocom
->state0
.iocom
= iocom
;
139 iocom
->state0
.parent
= &iocom
->state0
;
140 iocom
->state0
.flags
= DMSG_STATE_ROOT
;
141 TAILQ_INIT(&iocom
->state0
.subq
);
143 if (pipe(iocom
->wakeupfds
) < 0)
145 fcntl(iocom
->wakeupfds
[0], F_SETFL
, O_NONBLOCK
);
146 fcntl(iocom
->wakeupfds
[1], F_SETFL
, O_NONBLOCK
);
149 * Negotiate session crypto synchronously. This will mark the
150 * connection as error'd if it fails. If this is a pipe it's
151 * a linkage that we set up ourselves to the filesystem and there
154 if (fstat(sock_fd
, &st
) < 0)
156 if (S_ISSOCK(st
.st_mode
))
157 dmsg_crypto_negotiate(iocom
);
160 * Make sure our fds are set to non-blocking for the iocom core.
163 fcntl(sock_fd
, F_SETFL
, O_NONBLOCK
);
165 /* if line buffered our single fgets() should be fine */
167 fcntl(alt_fd
, F_SETFL
, O_NONBLOCK
);
/*
 * dmsg_iocom_label - set iocom->label from a printf-style format (ctl)
 * and its variadic arguments, using vasprintf().
 *
 * NOTE(review): the va_start()/va_end() calls and any release of the
 * previous label are on lines not visible in this view.  The vasprintf()
 * result is not checked in the visible code.
 */
172 dmsg_iocom_label(dmsg_iocom_t
*iocom
, const char *ctl
, ...)
179 vasprintf(&iocom
->label
, ctl
, va
);
/*
 * dmsg_iocom_restate - replace an iocom's signal and receive callbacks.
 *
 * While holding iocom->mtx, stores signal_func and rcvmsg_func into
 * iocom->signal_callback and iocom->rcvmsg_callback and updates
 * DMSG_IOCOMF_SWORK.  The visible code does not touch the usrmsg or
 * altmsg callbacks.
 *
 * NOTE(review): in this view SWORK is set and then immediately cleared.
 * Presumably the lines choosing between the two (e.g. depending on
 * whether signal_func is non-NULL) are missing here - confirm against
 * the full source.
 */
186 * May only be called from a callback from iocom_core.
188 * Adjust state machine functions, set flags to guarantee that both
189 * the recevmsg_func and the sendmsg_func is called at least once.
192 dmsg_iocom_restate(dmsg_iocom_t
*iocom
,
193 void (*signal_func
)(dmsg_iocom_t
*),
194 void (*rcvmsg_func
)(dmsg_msg_t
*msg
))
196 pthread_mutex_lock(&iocom
->mtx
);
197 iocom
->signal_callback
= signal_func
;
198 iocom
->rcvmsg_callback
= rcvmsg_func
;
200 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_SWORK
);
202 atomic_clear_int(&iocom
->flags
, DMSG_IOCOMF_SWORK
);
203 pthread_mutex_unlock(&iocom
->mtx
);
/*
 * dmsg_iocom_signal - request that the iocom's signal callback be run.
 *
 * While holding iocom->mtx, sets DMSG_IOCOMF_SWORK, but only if a signal
 * callback is installed.  dmsg_iocom_core() tests SWORK, clears it and
 * calls iocom->signal_callback.  The visible code only sets the flag; it
 * does not write to the wakeup pipe.
 */
207 dmsg_iocom_signal(dmsg_iocom_t
*iocom
)
209 pthread_mutex_lock(&iocom
->mtx
);
210 if (iocom
->signal_callback
)
211 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_SWORK
);
212 pthread_mutex_unlock(&iocom
->mtx
);
/*
 * dmsg_iocom_done - release the resources owned by a terminating iocom.
 *
 * Closes sock_fd when it is valid.  Closes alt_fd only when it is valid
 * and DMSG_IOCOMF_CLOSEALT is set.  Tears down ioq_rx and ioq_tx, closes
 * both ends of the wakeup pipe and marks them -1, and destroys
 * iocom->mtx.
 *
 * NOTE(review): dmsg_ioq_done() is documented as requiring iocom->mtx,
 * but this function says the caller must not hold it and does not
 * visibly take it before calling dmsg_ioq_done() - confirm the intended
 * locking contract.
 */
216 * Cleanup a terminating iocom.
218 * Caller should not hold iocom->mtx. The iocom has already been disconnected
219 * from all possible references to it.
222 dmsg_iocom_done(dmsg_iocom_t
*iocom
)
224 if (iocom
->sock_fd
>= 0) {
225 close(iocom
->sock_fd
);
228 if (iocom
->alt_fd
>= 0 && (iocom
->flags
& DMSG_IOCOMF_CLOSEALT
)) {
229 close(iocom
->alt_fd
);
232 dmsg_ioq_done(iocom
, &iocom
->ioq_rx
);
233 dmsg_ioq_done(iocom
, &iocom
->ioq_tx
);
234 if (iocom
->wakeupfds
[0] >= 0) {
235 close(iocom
->wakeupfds
[0]);
236 iocom
->wakeupfds
[0] = -1;
238 if (iocom
->wakeupfds
[1] >= 0) {
239 close(iocom
->wakeupfds
[1]);
240 iocom
->wakeupfds
[1] = -1;
242 pthread_mutex_destroy(&iocom
->mtx
);
/*
 * dmsg_msg_alloc - locking wrapper around dmsg_msg_alloc_locked().
 *
 * Finds the iocom through state->iocom, takes iocom->mtx, passes all of
 * its arguments unchanged to dmsg_msg_alloc_locked() and releases the
 * mutex again.
 */
246 * Allocate a new message using the specified transaction state.
248 * If CREATE is set a new transaction is allocated relative to the passed-in
249 * transaction (the 'state' argument becomes pstate).
251 * If CREATE is not set the message is associated with the passed-in
255 dmsg_msg_alloc(dmsg_state_t
*state
,
256 size_t aux_size
, uint32_t cmd
,
257 void (*func
)(dmsg_msg_t
*), void *data
)
259 dmsg_iocom_t
*iocom
= state
->iocom
;
262 pthread_mutex_lock(&iocom
->mtx
);
263 msg
= dmsg_msg_alloc_locked(state
, aux_size
, cmd
, func
, data
);
264 pthread_mutex_unlock(&iocom
->mtx
);
/*
 * dmsg_msg_alloc_locked - allocate a message; dmsg_msg_alloc() calls
 * this with state->iocom->mtx held.
 *
 * When cmd has DMSGF_CREATE set without DMSGF_REPLY, a new transaction
 * state is created beneath the passed-in state, which becomes pstate.
 * The new state:
 *  - is DMSG_STATE_DYNAMIC and uses its own address as its msgid;
 *  - stores data in state->any.any;
 *  - inherits pstate's DMSG_STATE_DYING flag;
 *  - is inserted into iocom->statewr_tree and pstate->subq;
 *  - takes three references (subq, rbtree, msg->state).
 * pstate gains a reference when its subq was previously empty.
 * Otherwise pstate is state->parent and state takes one reference for
 * msg->state.
 *
 * The message is allocated with room for a header of
 * (cmd & DMSGF_SIZE) * DMSG_ALIGN bytes.  The aux buffer is allocated at
 * DMSG_DOALIGN(aux_size), with the alignment tail zeroed.  DMSGF_REVTRANS
 * is added to cmd when state is DMSG_STATE_OPPOSITE, and DMSGF_REVCIRC
 * when pstate is.  The header's magic, cmd, msgid (state) and circuit
 * (pstate) are then filled in.
 *
 * NOTE(review): none of the malloc() results in the visible code are
 * checked for NULL.  The assignment of pstate in the CREATE branch is not
 * visible in this view (presumably pstate = state before state is
 * reassigned) - confirm.
 */
270 dmsg_msg_alloc_locked(dmsg_state_t
*state
,
271 size_t aux_size
, uint32_t cmd
,
272 void (*func
)(dmsg_msg_t
*), void *data
)
274 dmsg_iocom_t
*iocom
= state
->iocom
;
275 dmsg_state_t
*pstate
;
280 aligned_size
= DMSG_DOALIGN(aux_size
);
281 if ((cmd
& (DMSGF_CREATE
| DMSGF_REPLY
)) == DMSGF_CREATE
) {
283 * When CREATE is set without REPLY the caller is
284 * initiating a new transaction stacked under the specified
287 * It is possible to race a circuit failure, inherit the
288 * parent's STATE_DYING flag to trigger an abort sequence
289 * in the transmit path. By not inheriting ABORTING the
290 * abort sequence can recurse.
292 * NOTE: CREATE in txcmd handled by dmsg_msg_write()
293 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
296 state
= malloc(sizeof(*state
));
297 bzero(state
, sizeof(*state
));
298 atomic_add_int(&dmsg_state_count
, 1);
300 TAILQ_INIT(&state
->subq
);
301 state
->parent
= pstate
;
302 state
->iocom
= iocom
;
303 state
->flags
= DMSG_STATE_DYNAMIC
;
304 state
->msgid
= (uint64_t)(uintptr_t)state
;
305 state
->txcmd
= cmd
& ~(DMSGF_CREATE
| DMSGF_DELETE
);
306 state
->rxcmd
= DMSGF_REPLY
;
307 state
->icmd
= state
->txcmd
& DMSGF_BASECMDMASK
;
309 state
->any
.any
= data
;
311 state
->flags
|= DMSG_STATE_SUBINSERTED
|
312 DMSG_STATE_RBINSERTED
;
313 state
->flags
|= pstate
->flags
& DMSG_STATE_DYING
;
314 if (TAILQ_EMPTY(&pstate
->subq
))
315 dmsg_state_hold(pstate
);
316 RB_INSERT(dmsg_state_tree
, &iocom
->statewr_tree
, state
);
317 TAILQ_INSERT_TAIL(&pstate
->subq
, state
, entry
);
318 dmsg_state_hold(state
); /* state on pstate->subq */
319 dmsg_state_hold(state
); /* state on rbtree */
320 dmsg_state_hold(state
); /* msg->state */
323 * Otherwise the message is transmitted over the existing
326 pstate
= state
->parent
;
327 dmsg_state_hold(state
); /* msg->state */
330 /* XXX SMP race for state */
331 hbytes
= (cmd
& DMSGF_SIZE
) * DMSG_ALIGN
;
332 assert((size_t)hbytes
>= sizeof(struct dmsg_hdr
));
333 msg
= malloc(offsetof(struct dmsg_msg
, any
.head
) + hbytes
);
334 bzero(msg
, offsetof(struct dmsg_msg
, any
.head
));
337 * [re]allocate the auxillary data buffer. The caller knows that
338 * a size-aligned buffer will be allocated but we do not want to
339 * force the caller to zero any tail piece, so we do that ourself.
341 if (msg
->aux_size
!= aux_size
) {
344 msg
->aux_data
= NULL
;
348 msg
->aux_data
= malloc(aligned_size
);
349 msg
->aux_size
= aux_size
;
350 if (aux_size
!= aligned_size
) {
351 bzero(msg
->aux_data
+ aux_size
,
352 aligned_size
- aux_size
);
358 * Set REVTRANS if the transaction was remotely initiated
359 * Set REVCIRC if the circuit was remotely initiated
361 if (state
->flags
& DMSG_STATE_OPPOSITE
)
362 cmd
|= DMSGF_REVTRANS
;
363 if (pstate
->flags
& DMSG_STATE_OPPOSITE
)
364 cmd
|= DMSGF_REVCIRC
;
367 * Finish filling out the header.
369 bzero(&msg
->any
.head
, hbytes
);
370 msg
->hdr_size
= hbytes
;
371 msg
->any
.head
.magic
= DMSG_HDR_MAGIC
;
372 msg
->any
.head
.cmd
= cmd
;
373 msg
->any
.head
.aux_descr
= 0;
374 msg
->any
.head
.aux_crc
= 0;
375 msg
->any
.head
.msgid
= state
->msgid
;
376 msg
->any
.head
.circuit
= pstate
->msgid
;
/*
 * dmsg_msg_free_locked - release a message; dmsg_msg_free() calls this
 * with iocom->mtx held.
 *
 * Drops the message's state reference (if it has one), then clears
 * msg->state and msg->aux_data so the stale pointers cannot be reused.
 * The code that frees the aux buffer and the message itself is on lines
 * not visible in this view.
 */
383 * Free a message so it can be reused afresh.
385 * NOTE: aux_size can be 0 with a non-NULL aux_data.
389 dmsg_msg_free_locked(dmsg_msg_t
*msg
)
393 if ((state
= msg
->state
) != NULL
) {
394 dmsg_state_drop(state
);
395 msg
->state
= NULL
; /* safety */
399 msg
->aux_data
= NULL
; /* safety */
/*
 * dmsg_msg_free - locking wrapper around dmsg_msg_free_locked().
 *
 * The iocom, and therefore the mutex, is found through
 * msg->state->iocom.  msg->state must therefore be non-NULL here, even
 * though dmsg_msg_free_locked() itself tolerates a NULL msg->state.
 */
406 dmsg_msg_free(dmsg_msg_t
*msg
)
408 dmsg_iocom_t
*iocom
= msg
->state
->iocom
;
410 pthread_mutex_lock(&iocom
->mtx
);
411 dmsg_msg_free_locked(msg
);
412 pthread_mutex_unlock(&iocom
->mtx
);
/*
 * dmsg_iocom_core - run the I/O loop for an iocom until DMSG_IOCOMF_EOF
 * is set.
 *
 * Polling only happens when none of the work flags tested at the top of
 * the loop are set.  The poll set is:
 *  - the wakeup pipe (POLLIN);
 *  - the socket (POLLIN when RREQ is set, POLLOUT when WREQ is set);
 *  - alt_fd (POLLIN) when it is valid.
 * The readiness results are turned into work flags; the flag arguments
 * are not visible in this view.  When polling is skipped, PWORK is set so
 * the pipe is still checked.
 *
 * The work flags are then handled in this order:
 *	SWORK  - clear it and call iocom->signal_callback
 *	PWORK  - clear it, drain the wakeup pipe, set RWORK and WWORK
 *	WWORK  - dmsg_iocom_flush1()
 *	RWORK  - read messages with dmsg_ioq_read() until NULL or EOF;
 *		 each is passed to iocom->rcvmsg_callback, then to
 *		 dmsg_state_cleanuprx() under iocom->mtx
 *	ARWORK - clear it and call iocom->altmsg_callback
 *
 * NOTE(review): in the visible code the wakeup pipe is registered only
 * for POLLIN, so the fds[wi] POLLOUT test looks like it can never fire -
 * confirm.  The poll() return value is not checked in the visible code.
 */
416 * I/O core loop for an iocom.
418 * Thread localized, iocom->mtx not held.
421 dmsg_iocom_core(dmsg_iocom_t
*iocom
)
423 struct pollfd fds
[3];
428 int wi
; /* wakeup pipe */
430 int ai
; /* alt bulk path socket */
432 while ((iocom
->flags
& DMSG_IOCOMF_EOF
) == 0) {
434 * These iocom->flags are only manipulated within the
435 * context of the current thread. However, modifications
436 * still require atomic ops.
438 dmio_printf(iocom
, 5, "iocom %p %08x\n",
439 iocom
, iocom
->flags
);
440 if ((iocom
->flags
& (DMSG_IOCOMF_RWORK
|
445 DMSG_IOCOMF_AWWORK
)) == 0) {
447 * Only poll if no immediate work is pending.
448 * Otherwise we are just wasting our time calling
459 * Always check the inter-thread pipe, e.g.
460 * for iocom->txmsgq work.
463 fds
[wi
].fd
= iocom
->wakeupfds
[0];
464 fds
[wi
].events
= POLLIN
;
468 * Check the socket input/output direction as
471 if (iocom
->flags
& (DMSG_IOCOMF_RREQ
|
474 fds
[si
].fd
= iocom
->sock_fd
;
478 if (iocom
->flags
& DMSG_IOCOMF_RREQ
)
479 fds
[si
].events
|= POLLIN
;
480 if (iocom
->flags
& DMSG_IOCOMF_WREQ
)
481 fds
[si
].events
|= POLLOUT
;
485 * Check the alternative fd for work.
487 if (iocom
->alt_fd
>= 0) {
489 fds
[ai
].fd
= iocom
->alt_fd
;
490 fds
[ai
].events
= POLLIN
;
493 poll(fds
, count
, timeout
);
495 if (wi
>= 0 && (fds
[wi
].revents
& POLLIN
))
496 atomic_set_int(&iocom
->flags
,
498 if (si
>= 0 && (fds
[si
].revents
& POLLIN
))
499 atomic_set_int(&iocom
->flags
,
501 if (si
>= 0 && (fds
[si
].revents
& POLLOUT
))
502 atomic_set_int(&iocom
->flags
,
504 if (wi
>= 0 && (fds
[wi
].revents
& POLLOUT
))
505 atomic_set_int(&iocom
->flags
,
507 if (ai
>= 0 && (fds
[ai
].revents
& POLLIN
))
508 atomic_set_int(&iocom
->flags
,
512 * Always check the pipe
514 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_PWORK
);
517 if (iocom
->flags
& DMSG_IOCOMF_SWORK
) {
518 atomic_clear_int(&iocom
->flags
, DMSG_IOCOMF_SWORK
);
519 iocom
->signal_callback(iocom
);
523 * Pending message queues from other threads wake us up
524 * with a write to the wakeupfds[] pipe. We have to clear
525 * the pipe with a dummy read.
527 if (iocom
->flags
& DMSG_IOCOMF_PWORK
) {
528 atomic_clear_int(&iocom
->flags
, DMSG_IOCOMF_PWORK
);
529 read(iocom
->wakeupfds
[0], dummybuf
, sizeof(dummybuf
));
530 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_RWORK
);
531 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_WWORK
);
535 * Message write sequencing
537 if (iocom
->flags
& DMSG_IOCOMF_WWORK
)
538 dmsg_iocom_flush1(iocom
);
541 * Message read sequencing. Run this after the write
542 * sequencing in case the write sequencing allowed another
543 * auto-DELETE to occur on the read side.
545 if (iocom
->flags
& DMSG_IOCOMF_RWORK
) {
546 while ((iocom
->flags
& DMSG_IOCOMF_EOF
) == 0 &&
547 (msg
= dmsg_ioq_read(iocom
)) != NULL
) {
548 dmio_printf(iocom
, 4, "receive %s\n",
550 iocom
->rcvmsg_callback(msg
);
551 pthread_mutex_lock(&iocom
->mtx
);
552 dmsg_state_cleanuprx(iocom
, msg
);
553 pthread_mutex_unlock(&iocom
->mtx
);
557 if (iocom
->flags
& DMSG_IOCOMF_ARWORK
) {
558 atomic_clear_int(&iocom
->flags
, DMSG_IOCOMF_ARWORK
);
559 iocom
->altmsg_callback(iocom
);
/*
 * dmsg_ioq_makeroom - compact an ioq FIFO when free space may be short.
 *
 * bytes is the decrypted but unconsumed span (fifo_cdx - fifo_beg), and
 * nmax is the free tail of ioq->buf (sizeof(buf) - fifo_end).  When
 * bytes + nmax / 2 < needed, the FIFO is compacted:
 *  - the data at fifo_beg is copied toward the start of the buffer and
 *    fifo_cdx is rebased by fifo_beg;
 *  - any not-yet-decrypted span [fifo_cdn, fifo_end) is slid down to
 *    fifo_cdx, closing the gap between the two regions;
 *  - fifo_end and fifo_cdn are adjusted to match and nmax is recomputed.
 * Callers assign the result to their local nmax.
 */
565 * Make sure there's enough room in the FIFO to hold the
568 * Assume worst case encrypted form is 2x the size of the
569 * plaintext equivalent.
573 dmsg_ioq_makeroom(dmsg_ioq_t
*ioq
, size_t needed
)
578 bytes
= ioq
->fifo_cdx
- ioq
->fifo_beg
;
579 nmax
= sizeof(ioq
->buf
) - ioq
->fifo_end
;
580 if (bytes
+ nmax
/ 2 < needed
) {
582 bcopy(ioq
->buf
+ ioq
->fifo_beg
,
586 ioq
->fifo_cdx
-= ioq
->fifo_beg
;
588 if (ioq
->fifo_cdn
< ioq
->fifo_end
) {
589 bcopy(ioq
->buf
+ ioq
->fifo_cdn
,
590 ioq
->buf
+ ioq
->fifo_cdx
,
591 ioq
->fifo_end
- ioq
->fifo_cdn
);
593 ioq
->fifo_end
-= ioq
->fifo_cdn
- ioq
->fifo_cdx
;
594 ioq
->fifo_cdn
= ioq
->fifo_cdx
;
595 nmax
= sizeof(ioq
->buf
) - ioq
->fifo_end
;
/*
 * dmsg_ioq_read - receive-side state machine for iocom->ioq_rx.
 *
 * States:
 *  HEADER1   - read and decrypt the fixed header; accept either
 *              DMSG_HDR_MAGIC or DMSG_HDR_MAGIC_REV; derive hbytes and
 *              the aligned aux size; reject out-of-range sizes; then
 *              allocate msg against iocom->state0.
 *  HEADER2   - complete the extended header, verify hdr_crc,
 *              byte-swap it if the magic was reversed, and copy it
 *              into msg.
 *  AUXDATA1/2 - collect the DMSG_DOALIGN()ed aux payload and verify
 *              aux_crc over the aligned size.
 *  ERROR     - drain the remaining simulated failures.
 * A completed message is also rejected when (salt & 255) differs from
 * (ioq->seq & 255).
 *
 * Returns NULL with DMSG_IOCOMF_RREQ set when more socket data is needed.
 * On an error the ioq enters DMSG_MSGQ_STATE_ERROR.  dmsg_iocom_drain()
 * and dmsg_simulate_failure() then run under iocom->mtx, presumably
 * queueing the LNK_ERROR messages described below on ioq->msgq.  A
 * message taken from ioq->msgq whose state is iocom->state0 sets
 * DMSG_IOCOMF_EOF.  Otherwise a completed message is passed through
 * dmsg_state_msgrx() before being returned.
 */
601 * Read the next ready message from the ioq, issuing I/O if needed.
602 * Caller should retry on a read-event when NULL is returned.
604 * If an error occurs during reception a DMSG_LNK_ERROR msg will
605 * be returned for each open transaction, then the ioq and iocom
606 * will be errored out and a non-transactional DMSG_LNK_ERROR
607 * msg will be returned as the final message. The caller should not call
608 * us again after the final message is returned.
610 * Thread localized, iocom->mtx not held.
613 dmsg_ioq_read(dmsg_iocom_t
*iocom
)
615 dmsg_ioq_t
*ioq
= &iocom
->ioq_rx
;
627 * If a message is already pending we can just remove and
628 * return it. Message state has already been processed.
629 * (currently not implemented)
631 if ((msg
= TAILQ_FIRST(&ioq
->msgq
)) != NULL
) {
632 TAILQ_REMOVE(&ioq
->msgq
, msg
, qentry
);
634 if (msg
->state
== &iocom
->state0
) {
635 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_EOF
);
636 dmio_printf(iocom
, 1,
637 "EOF ON SOCKET %d\n",
642 atomic_clear_int(&iocom
->flags
, DMSG_IOCOMF_RREQ
| DMSG_IOCOMF_RWORK
);
645 * If the stream is errored out we stop processing it.
651 * Message read in-progress (msg is NULL at the moment). We don't
652 * allocate a msg until we have its core header.
654 nmax
= sizeof(ioq
->buf
) - ioq
->fifo_end
;
655 bytes
= ioq
->fifo_cdx
- ioq
->fifo_beg
; /* already decrypted */
659 case DMSG_MSGQ_STATE_HEADER1
:
661 * Load the primary header, fail on any non-trivial read
662 * error or on EOF. Since the primary header is the same
663 * size is the message alignment it will never straddle
664 * the end of the buffer.
666 nmax
= dmsg_ioq_makeroom(ioq
, sizeof(msg
->any
.head
));
667 if (bytes
< sizeof(msg
->any
.head
)) {
668 n
= read(iocom
->sock_fd
,
669 ioq
->buf
+ ioq
->fifo_end
,
673 ioq
->error
= DMSG_IOQ_ERROR_EOF
;
676 if (errno
!= EINTR
&&
677 errno
!= EINPROGRESS
&&
679 ioq
->error
= DMSG_IOQ_ERROR_SOCK
;
685 ioq
->fifo_end
+= (size_t)n
;
690 * Decrypt data received so far. Data will be decrypted
691 * in-place but might create gaps in the FIFO. Partial
692 * blocks are not immediately decrypted.
694 * WARNING! The header might be in the wrong endian, we
695 * do not fix it up until we get the entire
698 if (iocom
->flags
& DMSG_IOCOMF_CRYPTED
) {
699 dmsg_crypto_decrypt(iocom
, ioq
);
701 ioq
->fifo_cdx
= ioq
->fifo_end
;
702 ioq
->fifo_cdn
= ioq
->fifo_end
;
704 bytes
= ioq
->fifo_cdx
- ioq
->fifo_beg
;
707 * Insufficient data accumulated (msg is NULL, caller will
711 if (bytes
< sizeof(msg
->any
.head
))
715 * Check and fixup the core header. Note that the icrc
716 * has to be calculated before any fixups, but the crc
717 * fields in the msg may have to be swapped like everything
720 head
= (void *)(ioq
->buf
+ ioq
->fifo_beg
);
721 if (head
->magic
!= DMSG_HDR_MAGIC
&&
722 head
->magic
!= DMSG_HDR_MAGIC_REV
) {
723 dmio_printf(iocom
, 1,
724 "%s: head->magic is bad %02x\n",
725 iocom
->label
, head
->magic
);
726 if (iocom
->flags
& DMSG_IOCOMF_CRYPTED
)
727 dmio_printf(iocom
, 1, "%s\n",
728 "(on encrypted link)");
729 ioq
->error
= DMSG_IOQ_ERROR_SYNC
;
734 * Calculate the full header size and aux data size
736 if (head
->magic
== DMSG_HDR_MAGIC_REV
) {
737 ioq
->hbytes
= (bswap32(head
->cmd
) & DMSGF_SIZE
) *
739 aux_size
= bswap32(head
->aux_bytes
);
741 ioq
->hbytes
= (head
->cmd
& DMSGF_SIZE
) *
743 aux_size
= head
->aux_bytes
;
745 ioq
->abytes
= DMSG_DOALIGN(aux_size
);
746 ioq
->unaligned_aux_size
= aux_size
;
747 if (ioq
->hbytes
< sizeof(msg
->any
.head
) ||
748 ioq
->hbytes
> sizeof(msg
->any
) ||
749 ioq
->abytes
> DMSG_AUX_MAX
) {
750 ioq
->error
= DMSG_IOQ_ERROR_FIELD
;
755 * Allocate the message, the next state will fill it in.
757 * NOTE: The aux_data buffer will be sized to an aligned
758 * value and the aligned remainder zero'd for
761 * NOTE: Supply dummy state and a degenerate cmd without
762 * CREATE set. The message will temporarily be
763 * associated with state0 until later post-processing.
765 msg
= dmsg_msg_alloc(&iocom
->state0
, aux_size
,
766 ioq
->hbytes
/ DMSG_ALIGN
,
771 * Fall through to the next state. Make sure that the
772 * extended header does not straddle the end of the buffer.
773 * We still want to issue larger reads into our buffer,
774 * book-keeping is easier if we don't bcopy() yet.
776 * Make sure there is enough room for bloated encrypt data.
778 nmax
= dmsg_ioq_makeroom(ioq
, ioq
->hbytes
);
779 ioq
->state
= DMSG_MSGQ_STATE_HEADER2
;
781 case DMSG_MSGQ_STATE_HEADER2
:
783 * Fill out the extended header.
786 if (bytes
< ioq
->hbytes
) {
788 n
= read(iocom
->sock_fd
,
789 ioq
->buf
+ ioq
->fifo_end
,
793 ioq
->error
= DMSG_IOQ_ERROR_EOF
;
796 if (errno
!= EINTR
&&
797 errno
!= EINPROGRESS
&&
799 ioq
->error
= DMSG_IOQ_ERROR_SOCK
;
805 ioq
->fifo_end
+= (size_t)n
;
809 if (iocom
->flags
& DMSG_IOCOMF_CRYPTED
) {
810 dmsg_crypto_decrypt(iocom
, ioq
);
812 ioq
->fifo_cdx
= ioq
->fifo_end
;
813 ioq
->fifo_cdn
= ioq
->fifo_end
;
815 bytes
= ioq
->fifo_cdx
- ioq
->fifo_beg
;
818 * Insufficient data accumulated (set msg NULL so caller will
821 if (bytes
< ioq
->hbytes
) {
827 * Calculate the extended header, decrypt data received
828 * so far. Handle endian-conversion for the entire extended
831 head
= (void *)(ioq
->buf
+ ioq
->fifo_beg
);
836 if (head
->magic
== DMSG_HDR_MAGIC_REV
)
837 xcrc32
= bswap32(head
->hdr_crc
);
839 xcrc32
= head
->hdr_crc
;
841 if (dmsg_icrc32(head
, ioq
->hbytes
) != xcrc32
) {
842 ioq
->error
= DMSG_IOQ_ERROR_XCRC
;
843 dmio_printf(iocom
, 1, "BAD-XCRC(%08x,%08x) %s\n",
844 xcrc32
, dmsg_icrc32(head
, ioq
->hbytes
),
849 head
->hdr_crc
= xcrc32
;
851 if (head
->magic
== DMSG_HDR_MAGIC_REV
) {
852 dmsg_bswap_head(head
);
856 * Copy the extended header into the msg and adjust the
859 bcopy(head
, &msg
->any
, ioq
->hbytes
);
862 * We are either done or we fall-through.
864 if (ioq
->abytes
== 0) {
865 ioq
->fifo_beg
+= ioq
->hbytes
;
870 * Must adjust bytes (and the state) when falling through.
871 * nmax doesn't change.
873 ioq
->fifo_beg
+= ioq
->hbytes
;
874 bytes
-= ioq
->hbytes
;
875 ioq
->state
= DMSG_MSGQ_STATE_AUXDATA1
;
877 case DMSG_MSGQ_STATE_AUXDATA1
:
879 * Copy the partial or complete [decrypted] payload from
880 * remaining bytes in the FIFO in order to optimize the
881 * makeroom call in the AUXDATA2 state. We have to
882 * fall-through either way so we can check the crc.
884 * msg->aux_size tracks our aux data.
886 * (Lets not complicate matters if the data is encrypted,
887 * since the data in-stream is not the same size as the
890 if (bytes
>= ioq
->abytes
) {
891 bcopy(ioq
->buf
+ ioq
->fifo_beg
, msg
->aux_data
,
893 msg
->aux_size
= ioq
->abytes
;
894 ioq
->fifo_beg
+= ioq
->abytes
;
895 assert(ioq
->fifo_beg
<= ioq
->fifo_cdx
);
896 assert(ioq
->fifo_cdx
<= ioq
->fifo_cdn
);
897 bytes
-= ioq
->abytes
;
899 bcopy(ioq
->buf
+ ioq
->fifo_beg
, msg
->aux_data
,
901 msg
->aux_size
= bytes
;
902 ioq
->fifo_beg
+= bytes
;
903 if (ioq
->fifo_cdx
< ioq
->fifo_beg
)
904 ioq
->fifo_cdx
= ioq
->fifo_beg
;
905 assert(ioq
->fifo_beg
<= ioq
->fifo_cdx
);
906 assert(ioq
->fifo_cdx
<= ioq
->fifo_cdn
);
911 ioq
->state
= DMSG_MSGQ_STATE_AUXDATA2
;
913 case DMSG_MSGQ_STATE_AUXDATA2
:
915 * Make sure there is enough room for more data.
918 nmax
= dmsg_ioq_makeroom(ioq
, ioq
->abytes
- msg
->aux_size
);
921 * Read and decrypt more of the payload.
923 if (msg
->aux_size
< ioq
->abytes
) {
926 n
= read(iocom
->sock_fd
,
927 ioq
->buf
+ ioq
->fifo_end
,
931 ioq
->error
= DMSG_IOQ_ERROR_EOF
;
934 if (errno
!= EINTR
&&
935 errno
!= EINPROGRESS
&&
937 ioq
->error
= DMSG_IOQ_ERROR_SOCK
;
943 ioq
->fifo_end
+= (size_t)n
;
947 if (iocom
->flags
& DMSG_IOCOMF_CRYPTED
) {
948 dmsg_crypto_decrypt(iocom
, ioq
);
950 ioq
->fifo_cdx
= ioq
->fifo_end
;
951 ioq
->fifo_cdn
= ioq
->fifo_end
;
953 bytes
= ioq
->fifo_cdx
- ioq
->fifo_beg
;
955 if (bytes
> ioq
->abytes
- msg
->aux_size
)
956 bytes
= ioq
->abytes
- msg
->aux_size
;
959 bcopy(ioq
->buf
+ ioq
->fifo_beg
,
960 msg
->aux_data
+ msg
->aux_size
,
962 msg
->aux_size
+= bytes
;
963 ioq
->fifo_beg
+= bytes
;
967 * Insufficient data accumulated (set msg NULL so caller will
970 * Assert the auxillary data size is correct, then record the
971 * original unaligned size from the message header.
973 if (msg
->aux_size
< ioq
->abytes
) {
977 assert(msg
->aux_size
== ioq
->abytes
);
978 msg
->aux_size
= ioq
->unaligned_aux_size
;
981 * Check aux_crc, then we are done. Note that the crc
982 * is calculated over the aligned size, not the actual
985 xcrc32
= dmsg_icrc32(msg
->aux_data
, ioq
->abytes
);
986 if (xcrc32
!= msg
->any
.head
.aux_crc
) {
987 ioq
->error
= DMSG_IOQ_ERROR_ACRC
;
/*
 * NOTE(review): aux_bytes is printed with %d.  If head.aux_bytes is an
 * unsigned 32-bit field, %u would be the matching conversion - confirm
 * the field type.
 */
988 dmio_printf(iocom
, 1,
989 "iocom: ACRC error %08x vs %08x "
990 "msgid %016jx msgcmd %08x auxsize %d\n",
992 msg
->any
.head
.aux_crc
,
993 (intmax_t)msg
->any
.head
.msgid
,
995 msg
->any
.head
.aux_bytes
);
999 case DMSG_MSGQ_STATE_ERROR
:
1001 * Continued calls to drain recorded transactions (returning
1002 * a LNK_ERROR for each one), before we return the final
1005 assert(msg
== NULL
);
1009 * We don't double-return errors, the caller should not
1010 * have called us again after getting an error msg.
1017 * Check the message sequence. The iv[] should prevent any
1018 * possibility of a replay but we add this check anyway.
1020 if (msg
&& ioq
->error
== 0) {
1021 if ((msg
->any
.head
.salt
& 255) != (ioq
->seq
& 255)) {
1022 ioq
->error
= DMSG_IOQ_ERROR_MSGSEQ
;
1029 * Handle error, RREQ, or completion
1031 * NOTE: nmax and bytes are invalid at this point, we don't bother
1032 * to update them when breaking out.
1037 * An unrecoverable error causes all active receive
1038 * transactions to be terminated with a LNK_ERROR message.
1040 * Once all active transactions are exhausted we set the
1041 * iocom ERROR flag and return a non-transactional LNK_ERROR
1042 * message, which should cause master processing loops to
1045 dmio_printf(iocom
, 1, "IOQ ERROR %d\n", ioq
->error
);
1046 assert(ioq
->msg
== msg
);
1054 * No more I/O read processing
1056 ioq
->state
= DMSG_MSGQ_STATE_ERROR
;
1059 * Simulate a remote LNK_ERROR DELETE msg for any open
1060 * transactions, ending with a final non-transactional
1061 * LNK_ERROR (that the session can detect) when no
1062 * transactions remain.
1064 * NOTE: Temporarily supply state0 and a degenerate cmd
1065 * without CREATE set. The real state will be
1066 * assigned in the loop.
1068 * NOTE: We are simulating a received message using our
1069 * side of the state, so the DMSGF_REV* bits have
1072 pthread_mutex_lock(&iocom
->mtx
);
1073 dmsg_iocom_drain(iocom
);
1074 dmsg_simulate_failure(&iocom
->state0
, 0, ioq
->error
);
1075 pthread_mutex_unlock(&iocom
->mtx
);
1076 if (TAILQ_FIRST(&ioq
->msgq
))
1081 * For the iocom error case we want to set RWORK to indicate
1082 * that more messages might be pending.
1084 * It is possible to return NULL when there is more work to
1085 * do because each message has to be DELETEd in both
1086 * directions before we continue on with the next (though
1087 * this could be optimized). The transmit direction will
1091 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_RWORK
);
1093 } else if (msg
== NULL
) {
1095 * Insufficient data received to finish building the message,
1096 * set RREQ and return NULL.
1098 * Leave ioq->msg intact.
1099 * Leave the FIFO intact.
1101 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_RREQ
);
1104 * Continue processing msg.
1106 * The fifo has already been advanced past the message.
1107 * Trivially reset the FIFO indices if possible.
1109 * clear the FIFO if it is now empty and set RREQ to wait
1110 * for more from the socket. If the FIFO is not empty set
1111 * TWORK to bypass the poll so we loop immediately.
1113 if (ioq
->fifo_beg
== ioq
->fifo_cdx
&&
1114 ioq
->fifo_cdn
== ioq
->fifo_end
) {
1115 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_RREQ
);
/*
 * NOTE(review): the comment above refers to "TWORK", but the flag set
 * here is DMSG_IOCOMF_RWORK.
 */
1121 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_RWORK
);
1123 ioq
->state
= DMSG_MSGQ_STATE_HEADER1
;
1127 * Handle message routing. Validates non-zero sources
1128 * and routes message. Error will be 0 if the message is
1131 * State processing only occurs for messages destined for us.
1133 dmio_printf(iocom
, 5,
1134 "rxmsg cmd=%08x circ=%016jx\n",
1136 (intmax_t)msg
->any
.head
.circuit
);
1138 error
= dmsg_state_msgrx(msg
, 0);
1142 * Abort-after-closure, throw message away and
1143 * start reading another.
1145 if (error
== DMSG_IOQ_ERROR_EALREADY
) {
1151 * Process real error and throw away message.
1158 * No error and not routed
1160 /* no error, not routed. Fall through and return msg */
/*
 * dmsg_iocom_flush1 - move pending transmit messages to the wire queue
 * and start writing them.
 *
 * Clears DMSG_IOCOMF_WREQ and DMSG_IOCOMF_WWORK.  Under iocom->mtx it
 * moves every message from iocom->txmsgq to a local queue (tmpq).  With
 * the mutex released, each message then gets:
 *  - DMSG_HDR_MAGIC;
 *  - a salt whose low 8 bits carry ioq->seq & 255 (dmsg_ioq_read() on
 *    the receiver compares these bits);
 *  - aux_crc over the DMSG_DOALIGN()ed aux data, when aux_size is
 *    non-zero and aux_crc is still 0;
 *  - aux_bytes set from aux_size;
 *  - hdr_crc, computed over the header while hdr_crc is zero.
 * Each message is appended to ioq_tx's msgq, and dmsg_iocom_flush2() is
 * called to write the queue out.
 *
 * NOTE(review): TAILQ_INIT(&tmpq) is not visible in this view - confirm
 * it precedes the first insertion.  random() returns a long; where long
 * is 32 bits, random() << 8 can overflow a signed type - consider
 * shifting an unsigned value instead.
 */
1166 * Calculate the header and data crc's and write a low-level message to
1167 * the connection. If aux_crc is non-zero the aux_data crc is already
1168 * assumed to have been set.
1170 * A non-NULL msg is added to the queue but not necessarily flushed.
1171 * Calling this function with msg == NULL will get a flush going.
1173 * (called from iocom_core only)
1176 dmsg_iocom_flush1(dmsg_iocom_t
*iocom
)
1178 dmsg_ioq_t
*ioq
= &iocom
->ioq_tx
;
1183 dmsg_msg_queue_t tmpq
;
1185 atomic_clear_int(&iocom
->flags
, DMSG_IOCOMF_WREQ
| DMSG_IOCOMF_WWORK
);
1187 pthread_mutex_lock(&iocom
->mtx
);
1188 while ((msg
= TAILQ_FIRST(&iocom
->txmsgq
)) != NULL
) {
1189 TAILQ_REMOVE(&iocom
->txmsgq
, msg
, qentry
);
1190 TAILQ_INSERT_TAIL(&tmpq
, msg
, qentry
);
1192 pthread_mutex_unlock(&iocom
->mtx
);
1195 * Flush queue, doing all required encryption and CRC generation,
1196 * with the mutex unlocked.
1198 while ((msg
= TAILQ_FIRST(&tmpq
)) != NULL
) {
1200 * Process terminal connection errors.
1202 TAILQ_REMOVE(&tmpq
, msg
, qentry
);
1204 TAILQ_INSERT_TAIL(&ioq
->msgq
, msg
, qentry
);
1210 * Finish populating the msg fields. The salt ensures that
1211 * the iv[] array is ridiculously randomized and we also
1212 * re-seed our PRNG every 32768 messages just to be sure.
1214 msg
->any
.head
.magic
= DMSG_HDR_MAGIC
;
1215 msg
->any
.head
.salt
= (random() << 8) | (ioq
->seq
& 255);
1217 if ((ioq
->seq
& 32767) == 0) {
1218 pthread_mutex_lock(&iocom
->mtx
);
1220 pthread_mutex_unlock(&iocom
->mtx
);
1224 * Calculate aux_crc if 0, then calculate hdr_crc.
1226 if (msg
->aux_size
&& msg
->any
.head
.aux_crc
== 0) {
1227 abytes
= DMSG_DOALIGN(msg
->aux_size
);
1228 xcrc32
= dmsg_icrc32(msg
->aux_data
, abytes
);
1229 msg
->any
.head
.aux_crc
= xcrc32
;
1231 msg
->any
.head
.aux_bytes
= msg
->aux_size
;
1233 hbytes
= (msg
->any
.head
.cmd
& DMSGF_SIZE
) *
1235 msg
->any
.head
.hdr_crc
= 0;
1236 msg
->any
.head
.hdr_crc
= dmsg_icrc32(&msg
->any
.head
, hbytes
);
1239 * Enqueue the message (the flush codes handles stream
1242 TAILQ_INSERT_TAIL(&ioq
->msgq
, msg
, qentry
);
1245 dmsg_iocom_flush2(iocom
);
1249 * Thread localized, iocom->mtx not held by caller.
1251 * (called from iocom_core via iocom_flush1 only)
1254 dmsg_iocom_flush2(dmsg_iocom_t
*iocom
)
1256 dmsg_ioq_t
*ioq
= &iocom
->ioq_tx
;
1259 struct iovec iov
[DMSG_IOQ_MAXIOVEC
];
1269 dmsg_iocom_drain(iocom
);
1274 * Pump messages out the connection by building an iovec.
1276 * ioq->hbytes/ioq->abytes tracks how much of the first message
1277 * in the queue has been successfully written out, so we can
1285 TAILQ_FOREACH(msg
, &ioq
->msgq
, qentry
) {
1286 hbytes
= (msg
->any
.head
.cmd
& DMSGF_SIZE
) *
1288 abytes
= DMSG_DOALIGN(msg
->aux_size
);
1289 assert(hoff
<= hbytes
&& aoff
<= abytes
);
1291 if (hoff
< hbytes
) {
1292 size_t maxlen
= hbytes
- hoff
;
1293 if (maxlen
> sizeof(ioq
->buf
) / 2)
1294 maxlen
= sizeof(ioq
->buf
) / 2;
1295 iov
[iovcnt
].iov_base
= (char *)&msg
->any
.head
+ hoff
;
1296 iov
[iovcnt
].iov_len
= maxlen
;
1299 if (iovcnt
== DMSG_IOQ_MAXIOVEC
||
1300 maxlen
!= hbytes
- hoff
) {
1304 if (aoff
< abytes
) {
1305 size_t maxlen
= abytes
- aoff
;
1306 if (maxlen
> sizeof(ioq
->buf
) / 2)
1307 maxlen
= sizeof(ioq
->buf
) / 2;
1309 assert(msg
->aux_data
!= NULL
);
1310 iov
[iovcnt
].iov_base
= (char *)msg
->aux_data
+ aoff
;
1311 iov
[iovcnt
].iov_len
= maxlen
;
1314 if (iovcnt
== DMSG_IOQ_MAXIOVEC
||
1315 maxlen
!= abytes
- aoff
) {
1324 * Shortcut if no work to do. Be sure to check for old work still
1325 * pending in the FIFO.
1327 if (iovcnt
== 0 && ioq
->fifo_beg
== ioq
->fifo_cdx
)
1331 * Encrypt and write the data. The crypto code will move the
1332 * data into the fifo and adjust the iov as necessary. If
1333 * encryption is disabled the iov is left alone.
1335 * May return a smaller iov (thus a smaller n), with aggregated
1336 * chunks. May reduce nmax to what fits in the FIFO.
1338 * This function sets nact to the number of original bytes now
1339 * encrypted, adding to the FIFO some number of bytes that might
1340 * be greater depending on the crypto mechanic. iov[] is adjusted
1341 * to point at the FIFO if necessary.
1343 * NOTE: nact is the number of bytes eaten from the message. For
1344 * encrypted data this is the number of bytes processed for
1345 * encryption and not necessarily the number of bytes writable.
1346 * The return value from the writev() is the post-encrypted
1347 * byte count which might be larger.
1349 * NOTE: For direct writes, nact is the return value from the writev().
1351 if (iocom
->flags
& DMSG_IOCOMF_CRYPTED
) {
1353 * Make sure the FIFO has a reasonable amount of space
1354 * left (if not completely full).
1356 * In this situation we are staging the encrypted message
1357 * data in the FIFO. (nact) represents how much plaintext
1358 * has been staged, (n) represents how much encrypted data
1359 * has been flushed. The two are independent of each other.
1361 if (ioq
->fifo_beg
> sizeof(ioq
->buf
) / 2 &&
1362 sizeof(ioq
->buf
) - ioq
->fifo_end
< DMSG_ALIGN
* 2) {
1363 bcopy(ioq
->buf
+ ioq
->fifo_beg
, ioq
->buf
,
1364 ioq
->fifo_end
- ioq
->fifo_beg
);
1365 ioq
->fifo_cdx
-= ioq
->fifo_beg
;
1366 ioq
->fifo_cdn
-= ioq
->fifo_beg
;
1367 ioq
->fifo_end
-= ioq
->fifo_beg
;
1372 * beg .... cdx ............ cdn ............. end
1373 * [WRITABLE] [PARTIALENCRYPT] [NOTYETENCRYPTED]
1375 * Advance fifo_beg on a successful write.
1377 iovcnt
= dmsg_crypto_encrypt(iocom
, ioq
, iov
, iovcnt
, &nact
);
1378 n
= writev(iocom
->sock_fd
, iov
, iovcnt
);
1382 if (ioq
->fifo_beg
== ioq
->fifo_end
) {
1391 * We don't mess with the nact returned by the crypto_encrypt
1392 * call, which represents the filling of the FIFO. (n) tells
1393 * us how much we were able to write from the FIFO. The two
1394 * are different beasts when encrypting.
1398 * In this situation we are not staging the messages to the
1399 * FIFO but instead writing them directly from the msg
1400 * structure(s) unencrypted, so (nact) is basically (n).
1402 n
= writev(iocom
->sock_fd
, iov
, iovcnt
);
1411 * Clean out the transmit queue based on what we successfully
1412 * encrypted (nact is the plaintext count) and is now in the FIFO.
1413 * ioq->hbytes/abytes represents the portion of the first message
1416 while ((msg
= TAILQ_FIRST(&ioq
->msgq
)) != NULL
) {
1417 hbytes
= (msg
->any
.head
.cmd
& DMSGF_SIZE
) *
1419 abytes
= DMSG_DOALIGN(msg
->aux_size
);
1421 if ((size_t)nact
< hbytes
- ioq
->hbytes
) {
1422 ioq
->hbytes
+= nact
;
1426 nact
-= hbytes
- ioq
->hbytes
;
1427 ioq
->hbytes
= hbytes
;
1428 if ((size_t)nact
< abytes
- ioq
->abytes
) {
1429 ioq
->abytes
+= nact
;
1433 nact
-= abytes
- ioq
->abytes
;
1434 /* ioq->abytes = abytes; optimized out */
1436 dmio_printf(iocom
, 5,
1437 "txmsg cmd=%08x circ=%016jx\n",
1439 (intmax_t)msg
->any
.head
.circuit
);
1441 #ifdef DMSG_BLOCK_DEBUG
1444 if (msg
->any
.head
.cmd
& (DMSGF_CREATE
| DMSGF_DELETE
)) {
1445 if ((msg
->state
->flags
& DMSG_STATE_ROOT
) == 0) {
1446 tcmd
= (msg
->state
->icmd
& DMSGF_BASECMDMASK
) |
1447 (msg
->any
.head
.cmd
& (DMSGF_CREATE
|
1454 tcmd
= msg
->any
.head
.cmd
& DMSGF_CMDSWMASK
;
1458 case DMSG_BLK_READ
| DMSGF_CREATE
| DMSGF_DELETE
:
1459 case DMSG_BLK_WRITE
| DMSGF_CREATE
| DMSGF_DELETE
:
1460 dmio_printf(iocom
, 4,
1461 "write BIO %-3d %016jx %d@%016jx\n",
1462 biocount
, msg
->any
.head
.msgid
,
1463 msg
->any
.blk_read
.bytes
,
1464 msg
->any
.blk_read
.offset
);
1466 case DMSG_BLK_READ
| DMSGF_CREATE
| DMSGF_DELETE
| DMSGF_REPLY
:
1467 case DMSG_BLK_WRITE
| DMSGF_CREATE
| DMSGF_DELETE
| DMSGF_REPLY
:
1468 dmio_printf(iocom
, 4,
1469 "wretr BIO %-3d %016jx %d@%016jx\n",
1470 biocount
, msg
->any
.head
.msgid
,
1471 msg
->any
.blk_read
.bytes
,
1472 msg
->any
.blk_read
.offset
);
1479 TAILQ_REMOVE(&ioq
->msgq
, msg
, qentry
);
1488 * Process the return value from the write w/regards to blocking.
1491 if (save_errno
!= EINTR
&&
1492 save_errno
!= EINPROGRESS
&&
1493 save_errno
!= EAGAIN
) {
1497 ioq
->error
= DMSG_IOQ_ERROR_SOCK
;
1498 dmsg_iocom_drain(iocom
);
1501 * Wait for socket buffer space, do not try to
1502 * process more packets for transmit until space
1505 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_WREQ
);
1507 } else if (TAILQ_FIRST(&ioq
->msgq
) ||
1508 TAILQ_FIRST(&iocom
->txmsgq
) ||
1509 ioq
->fifo_beg
!= ioq
->fifo_cdx
) {
1511 * If the write succeeded and more messages are pending
1512 * in either msgq, or the FIFO WWORK must remain set.
1514 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_WWORK
);
1516 /* else no transmit-side work remains */
1519 dmsg_iocom_drain(iocom
);
1524 * Kill pending msgs on ioq_tx and adjust the flags such that no more
1525 * write events will occur. We don't kill read msgs because we want
1526 * the caller to pull off our contrived terminal error msg to detect
1527 * the connection failure.
1529 * Localized to iocom_core thread, iocom->mtx not held by caller.
1532 dmsg_iocom_drain(dmsg_iocom_t
*iocom
)
1534 dmsg_ioq_t
*ioq
= &iocom
->ioq_tx
;
1537 atomic_clear_int(&iocom
->flags
, DMSG_IOCOMF_WREQ
| DMSG_IOCOMF_WWORK
);
1541 while ((msg
= TAILQ_FIRST(&ioq
->msgq
)) != NULL
) {
1542 TAILQ_REMOVE(&ioq
->msgq
, msg
, qentry
);
1549 * Write a message to an iocom, with additional state processing.
1552 dmsg_msg_write(dmsg_msg_t
*msg
)
1554 dmsg_iocom_t
*iocom
= msg
->state
->iocom
;
1555 dmsg_state_t
*state
;
1558 pthread_mutex_lock(&iocom
->mtx
);
1561 dmio_printf(iocom
, 5,
1562 "msgtx: cmd=%08x msgid=%016jx "
1563 "state %p(%08x) error=%d\n",
1564 msg
->any
.head
.cmd
, msg
->any
.head
.msgid
,
1565 state
, (state
? state
->icmd
: 0),
1566 msg
->any
.head
.error
);
1571 * Make sure the parent transaction is still open in the transmit
1572 * direction. If it isn't the message is dead and we have to
1573 * potentially simulate a rxmsg terminating the transaction.
1575 if ((state
->parent
->txcmd
& DMSGF_DELETE
) ||
1576 (state
->parent
->rxcmd
& DMSGF_DELETE
)) {
1577 dmio_printf(iocom
, 4, "dmsg_msg_write: EARLY TERMINATION\n");
1578 dmsg_simulate_failure(state
, DMSG_ERR_LOSTLINK
);
1579 dmsg_state_cleanuptx(iocom
, msg
);
1581 pthread_mutex_unlock(&iocom
->mtx
);
1586 * Process state data into the message as needed, then update the
1587 * state based on the message.
1589 if ((state
->flags
& DMSG_STATE_ROOT
) == 0) {
1591 * Existing transaction (could be reply). It is also
1592 * possible for this to be the first reply (CREATE is set),
1593 * in which case we populate state->txcmd.
1595 * state->txcmd is adjusted to hold the final message cmd,
1596 * and we also be sure to set the CREATE bit here. We did
1597 * not set it in dmsg_msg_alloc() because that would have
1598 * not been serialized (state could have gotten ripped out
1599 * from under the message prior to it being transmitted).
1601 if ((msg
->any
.head
.cmd
& (DMSGF_CREATE
| DMSGF_REPLY
)) ==
1603 state
->txcmd
= msg
->any
.head
.cmd
& ~DMSGF_DELETE
;
1604 state
->icmd
= state
->txcmd
& DMSGF_BASECMDMASK
;
1605 state
->flags
&= ~DMSG_STATE_NEW
;
1607 msg
->any
.head
.msgid
= state
->msgid
;
1609 if (msg
->any
.head
.cmd
& DMSGF_CREATE
) {
1610 state
->txcmd
= msg
->any
.head
.cmd
& ~DMSGF_DELETE
;
1615 * Discard messages sent to transactions which are already dead.
1617 if (state
&& (state
->txcmd
& DMSGF_DELETE
)) {
1618 dmio_printf(iocom
, 4,
1619 "dmsg_msg_write: drop msg %08x to dead "
1620 "circuit state=%p\n",
1621 msg
->any
.head
.cmd
, state
);
1627 * Normally we queue the msg for output. However, if the circuit is
1628 * dead or dying we must simulate a failure in the return direction
1629 * and throw the message away. The other end is not expecting any
1630 * further messages from us on this state.
1632 * Note that the I/O thread is responsible for generating the CRCs
1635 if (state
->flags
& DMSG_STATE_DYING
) {
1637 if ((state
->parent
->txcmd
& DMSGF_DELETE
) ||
1638 (state
->parent
->flags
& DMSG_STATE_DYING
) ||
1639 (state
->flags
& DMSG_STATE_DYING
)) {
1642 * Illegal message, kill state and related sub-state.
1643 * Cannot transmit if state is already dying.
1645 dmio_printf(iocom
, 4,
1646 "dmsg_msg_write: Write to dying circuit "
1647 "ptxcmd=%08x prxcmd=%08x flags=%08x\n",
1648 state
->parent
->rxcmd
,
1649 state
->parent
->txcmd
,
1650 state
->parent
->flags
);
1651 dmsg_state_hold(state
);
1652 dmsg_state_cleanuptx(iocom
, msg
);
1653 if ((state
->flags
& DMSG_STATE_ABORTING
) == 0) {
1654 dmsg_simulate_failure(state
, 1, DMSG_ERR_LOSTLINK
);
1656 dmsg_state_drop(state
);
1660 * Queue the message, clean up transmit state prior to queueing
1661 * to avoid SMP races.
1663 dmio_printf(iocom
, 5,
1664 "dmsg_msg_write: commit msg state=%p to txkmsgq\n",
1666 dmsg_state_cleanuptx(iocom
, msg
);
1667 TAILQ_INSERT_TAIL(&iocom
->txmsgq
, msg
, qentry
);
1669 write(iocom
->wakeupfds
[1], &dummy
, 1); /* XXX optimize me */
1671 pthread_mutex_unlock(&iocom
->mtx
);
1675 * Remove state from its parent's subq. This can wind up recursively
1676 * dropping the parent upward.
1678 * NOTE: iocom must be locked.
1680 * NOTE: Once we drop the parent, our pstate pointer may become invalid.
1684 dmsg_subq_delete(dmsg_state_t
*state
)
1686 dmsg_state_t
*pstate
;
1688 if (state
->flags
& DMSG_STATE_SUBINSERTED
) {
1689 pstate
= state
->parent
;
1691 if (pstate
->scan
== state
)
1692 pstate
->scan
= NULL
;
1693 TAILQ_REMOVE(&pstate
->subq
, state
, entry
);
1694 state
->flags
&= ~DMSG_STATE_SUBINSERTED
;
1695 state
->parent
= NULL
;
1696 if (TAILQ_EMPTY(&pstate
->subq
))
1697 dmsg_state_drop(pstate
);/* pstate->subq */
1698 pstate
= NULL
; /* safety */
1699 dmsg_state_drop(state
); /* pstate->subq */
1701 assert(state
->parent
== NULL
);
1706 * Simulate reception of a transaction DELETE message when the link goes
1707 * bad. This routine must recurse through state->subq and generate messages
1708 * and callbacks bottom-up.
1710 * iocom->mtx must be held by caller.
1714 dmsg_simulate_failure(dmsg_state_t
*state
, int meto
, int error
)
1716 dmsg_state_t
*substate
;
1718 dmsg_state_hold(state
);
1720 dmsg_state_abort(state
);
1723 * Recurse through sub-states.
1726 TAILQ_FOREACH(substate
, &state
->subq
, entry
) {
1727 if (substate
->flags
& DMSG_STATE_ABORTING
)
1729 state
->scan
= substate
;
1730 dmsg_simulate_failure(substate
, 1, error
);
1731 if (state
->scan
!= substate
)
1735 dmsg_state_drop(state
);
1740 dmsg_state_abort(dmsg_state_t
*state
)
1742 dmsg_iocom_t
*iocom
;
1746 * Set ABORTING and DYING, return if already set. If the state was
1747 * just allocated we defer the abort operation until the related
1748 * message is processed.
1750 if (state
->flags
& DMSG_STATE_ABORTING
)
1752 state
->flags
|= DMSG_STATE_ABORTING
;
1753 dmsg_state_dying(state
);
1754 if (state
->flags
& DMSG_STATE_NEW
) {
1755 dmio_printf(iocom
, 4,
1756 "dmsg_state_abort(0): state %p rxcmd %08x "
1757 "txcmd %08x flags %08x - in NEW state\n",
1758 state
, state
->rxcmd
,
1759 state
->txcmd
, state
->flags
);
1764 * Simulate parent state failure before child states. Device
1765 * drivers need to understand this and flag the situation but might
1766 * have asynchronous operations in progress that they cannot stop.
1767 * To make things easier, parent states will not actually disappear
1768 * until the children are all gone.
1770 if ((state
->rxcmd
& DMSGF_DELETE
) == 0) {
1771 dmio_printf(iocom
, 5,
1772 "dmsg_state_abort() on state %p\n",
1774 msg
= dmsg_msg_alloc_locked(state
, 0, DMSG_LNK_ERROR
,
1776 if ((state
->rxcmd
& DMSGF_CREATE
) == 0)
1777 msg
->any
.head
.cmd
|= DMSGF_CREATE
;
1778 msg
->any
.head
.cmd
|= DMSGF_DELETE
|
1779 (state
->rxcmd
& DMSGF_REPLY
);
1780 msg
->any
.head
.cmd
^= (DMSGF_REVTRANS
| DMSGF_REVCIRC
);
1781 msg
->any
.head
.error
= DMSG_ERR_LOSTLINK
;
1782 msg
->any
.head
.cmd
|= DMSGF_ABORT
;
1785 * Issue callback synchronously even though this isn't
1786 * the receiver thread. We need to issue the callback
1787 * before removing state from the subq in order to allow
1788 * the callback to reply.
1790 iocom
= state
->iocom
;
1791 dmsg_state_msgrx(msg
, 1);
1792 pthread_mutex_unlock(&iocom
->mtx
);
1793 iocom
->rcvmsg_callback(msg
);
1794 pthread_mutex_lock(&iocom
->mtx
);
1795 dmsg_state_cleanuprx(iocom
, msg
);
1797 TAILQ_INSERT_TAIL(&iocom
->ioq_rx
.msgq
, msg
, qentry
);
1798 atomic_set_int(&iocom
->flags
, DMSG_IOCOMF_RWORK
);
1805 * Recursively sets DMSG_STATE_DYING on state and all sub-states, preventing
1806 * the transmission of any new messages on these states. This is done
1807 * atomically when parent state is terminating, whereas setting ABORTING is
1808 * not atomic and can leak races.
1812 dmsg_state_dying(dmsg_state_t
*state
)
1816 if ((state
->flags
& DMSG_STATE_DYING
) == 0) {
1817 state
->flags
|= DMSG_STATE_DYING
;
1818 TAILQ_FOREACH(scan
, &state
->subq
, entry
)
1819 dmsg_state_dying(scan
);
1824 * This is a shortcut to formulate a reply to msg with a simple error code,
1825 * It can reply to and terminate a transaction, or it can reply to a one-way
1826 * messages. A DMSG_LNK_ERROR command code is utilized to encode
1827 * the error code (which can be 0). Not all transactions are terminated
1828 * with DMSG_LNK_ERROR status (the low level only cares about the
1829 * MSGF_DELETE flag), but most are.
1831 * Replies to one-way messages are a bit of an oxymoron but the feature
1832 * is used by the debug (DBG) protocol.
1834 * The reply contains no extended data.
1837 dmsg_msg_reply(dmsg_msg_t
*msg
, uint32_t error
)
1839 dmsg_state_t
*state
= msg
->state
;
1844 * Reply with a simple error code and terminate the transaction.
1846 cmd
= DMSG_LNK_ERROR
;
1849 * Check if our direction has even been initiated yet, set CREATE.
1851 * Check what direction this is (command or reply direction). Note
1852 * that txcmd might not have been initiated yet.
1854 * If our direction has already been closed we just return without
1857 if ((state
->flags
& DMSG_STATE_ROOT
) == 0) {
1858 if (state
->txcmd
& DMSGF_DELETE
)
1860 if (state
->txcmd
& DMSGF_REPLY
)
1862 cmd
|= DMSGF_DELETE
;
1864 if ((msg
->any
.head
.cmd
& DMSGF_REPLY
) == 0)
1869 * Allocate the message and associate it with the existing state.
1870 * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1871 * allocate new state. We have our state already.
1873 nmsg
= dmsg_msg_alloc(state
, 0, cmd
, NULL
, NULL
);
1874 if ((state
->flags
& DMSG_STATE_ROOT
) == 0) {
1875 if ((state
->txcmd
& DMSGF_CREATE
) == 0)
1876 nmsg
->any
.head
.cmd
|= DMSGF_CREATE
;
1878 nmsg
->any
.head
.error
= error
;
1880 dmsg_msg_write(nmsg
);
1884 * Similar to dmsg_msg_reply() but leave the transaction open. That is,
1885 * we are generating a streaming reply or an intermediate acknowledgement
1886 * of some sort as part of the higher level protocol, with more to come
1890 dmsg_msg_result(dmsg_msg_t
*msg
, uint32_t error
)
1892 dmsg_state_t
*state
= msg
->state
;
1898 * Reply with a simple error code and terminate the transaction.
1900 cmd
= DMSG_LNK_ERROR
;
1903 * Check if our direction has even been initiated yet, set CREATE.
1905 * Check what direction this is (command or reply direction). Note
1906 * that txcmd might not have been initiated yet.
1908 * If our direction has already been closed we just return without
1911 if ((state
->flags
& DMSG_STATE_ROOT
) == 0) {
1912 if (state
->txcmd
& DMSGF_DELETE
)
1914 if (state
->txcmd
& DMSGF_REPLY
)
1916 /* continuing transaction, do not set MSGF_DELETE */
1918 if ((msg
->any
.head
.cmd
& DMSGF_REPLY
) == 0)
1921 nmsg
= dmsg_msg_alloc(state
, 0, cmd
, NULL
, NULL
);
1922 if ((state
->flags
& DMSG_STATE_ROOT
) == 0) {
1923 if ((state
->txcmd
& DMSGF_CREATE
) == 0)
1924 nmsg
->any
.head
.cmd
|= DMSGF_CREATE
;
1926 nmsg
->any
.head
.error
= error
;
1928 dmsg_msg_write(nmsg
);
1932 * Terminate a transaction given a state structure by issuing a DELETE.
1933 * (the state structure must not be &iocom->state0)
1936 dmsg_state_reply(dmsg_state_t
*state
, uint32_t error
)
1939 uint32_t cmd
= DMSG_LNK_ERROR
| DMSGF_DELETE
;
1942 * Nothing to do if we already transmitted a delete
1944 if (state
->txcmd
& DMSGF_DELETE
)
1948 * Set REPLY if the other end initiated the command. Otherwise
1949 * we are the command direction.
1951 if (state
->txcmd
& DMSGF_REPLY
)
1954 nmsg
= dmsg_msg_alloc(state
, 0, cmd
, NULL
, NULL
);
1955 if ((state
->flags
& DMSG_STATE_ROOT
) == 0) {
1956 if ((state
->txcmd
& DMSGF_CREATE
) == 0)
1957 nmsg
->any
.head
.cmd
|= DMSGF_CREATE
;
1959 nmsg
->any
.head
.error
= error
;
1960 dmsg_msg_write(nmsg
);
1964 * Terminate a transaction given a state structure by issuing a DELETE.
1965 * (the state structure must not be &iocom->state0)
1968 dmsg_state_result(dmsg_state_t
*state
, uint32_t error
)
1971 uint32_t cmd
= DMSG_LNK_ERROR
;
1974 * Nothing to do if we already transmitted a delete
1976 if (state
->txcmd
& DMSGF_DELETE
)
1980 * Set REPLY if the other end initiated the command. Otherwise
1981 * we are the command direction.
1983 if (state
->txcmd
& DMSGF_REPLY
)
1986 nmsg
= dmsg_msg_alloc(state
, 0, cmd
, NULL
, NULL
);
1987 if ((state
->flags
& DMSG_STATE_ROOT
) == 0) {
1988 if ((state
->txcmd
& DMSGF_CREATE
) == 0)
1989 nmsg
->any
.head
.cmd
|= DMSGF_CREATE
;
1991 nmsg
->any
.head
.error
= error
;
1992 dmsg_msg_write(nmsg
);
1995 /************************************************************************
1996 * TRANSACTION STATE HANDLING *
1997 ************************************************************************
2002 * Process state tracking for a message after reception, prior to execution.
2003 * Possibly route the message (consuming it).
2005 * Called with msglk held and the msg dequeued.
2007 * All messages are called with dummy state and return actual state.
2008 * (One-off messages often just return the same dummy state).
2010 * May request that caller discard the message by setting *discardp to 1.
2011 * The returned state is not used in this case and is allowed to be NULL.
2015 * These routines handle persistent and command/reply message state via the
2016 * CREATE and DELETE flags. The first message in a command or reply sequence
2017 * sets CREATE, the last message in a command or reply sequence sets DELETE.
2019 * There can be any number of intermediate messages belonging to the same
2020 * sequence sent inbetween the CREATE message and the DELETE message,
2021 * which set neither flag. This represents a streaming command or reply.
2023 * Any command message received with CREATE set expects a reply sequence to
2024 * be returned. Reply sequences work the same as command sequences except the
2025 * REPLY bit is also sent. Both the command side and reply side can
2026 * degenerate into a single message with both CREATE and DELETE set. Note
2027 * that one side can be streaming and the other side not, or neither, or both.
2029 * The msgid is unique for the initiator. That is, two sides sending a new
2030 * message can use the same msgid without colliding.
2034 * The message may be running over a circuit. If the circuit is half-deleted
2035 * The message is typically racing against a link failure and must be thrown
2036 * out. As the circuit deletion propagates the library will automatically
2037 * generate terminations for sub states.
2041 * ABORT sequences work by setting the ABORT flag along with normal message
2042 * state. However, ABORTs can also be sent on half-closed messages, that is
2043 * even if the command or reply side has already sent a DELETE, as long as
2044 * the message has not been fully closed it can still send an ABORT+DELETE
2045 * to terminate the half-closed message state.
2047 * Since ABORT+DELETEs can race we silently discard ABORT's for message
2048 * state which has already been fully closed. REPLY+ABORT+DELETEs can
2049 * also race, and in this situation the other side might have already
2050 * initiated a new unrelated command with the same message id. Since
2051 * the abort has not set the CREATE flag the situation can be detected
2052 * and the message will also be discarded.
2054 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
2055 * The ABORT request is essentially integrated into the command instead
2056 * of being sent later on. In this situation the command implementation
2057 * detects that CREATE and ABORT are both set (vs ABORT alone) and can
2058 * special-case non-blocking operation for the command.
2060 * NOTE! Messages with ABORT set without CREATE or DELETE are considered
2061 * to be mid-stream aborts for command/reply sequences. ABORTs on
2062 * one-way messages are not supported.
2064 * NOTE! If a command sequence does not support aborts the ABORT flag is
2069 * One-off messages (no reply expected) are sent without an established
2070 * transaction. CREATE and DELETE are left clear and the msgid is usually 0.
2071 * For one-off messages sent over circuits msgid generally MUST be 0.
2073 * One-off messages cannot be aborted and typically aren't processed
2074 * by these routines. Order is still guaranteed for messages sent over
2075 * the same circuit. The REPLY bit can be used to distinguish whether
2076 * a one-off message is a command or reply. For example, one-off replies
2077 * will typically just contain status updates.
2080 dmsg_state_msgrx(dmsg_msg_t
*msg
, int mstate
)
2082 dmsg_iocom_t
*iocom
= msg
->state
->iocom
;
2083 dmsg_state_t
*state
;
2084 dmsg_state_t
*pstate
;
2085 dmsg_state_t sdummy
;
2088 pthread_mutex_lock(&iocom
->mtx
);
2091 dmio_printf(iocom
, 5,
2092 "msgrx: cmd=%08x msgid=%016jx "
2093 "circuit=%016jx error=%d\n",
2095 msg
->any
.head
.msgid
,
2096 msg
->any
.head
.circuit
,
2097 msg
->any
.head
.error
);
2101 * Lookup the circuit (pstate). The circuit will be an open
2102 * transaction. The REVCIRC bit in the message tells us which side
2105 * If mstate is non-zero the state has already been incorporated
2106 * into the message as part of a simulated abort. Note that in this
2107 * situation the parent state may have already been removed from
2111 pstate
= msg
->state
->parent
;
2112 } else if (msg
->any
.head
.circuit
) {
2113 sdummy
.msgid
= msg
->any
.head
.circuit
;
2115 if (msg
->any
.head
.cmd
& DMSGF_REVCIRC
) {
2116 pstate
= RB_FIND(dmsg_state_tree
,
2117 &iocom
->statewr_tree
,
2120 pstate
= RB_FIND(dmsg_state_tree
,
2121 &iocom
->staterd_tree
,
2126 * If we cannot find the circuit throw the message away.
2127 * The state will have already been taken care of by
2128 * the simulated failure code. This case can occur due
2129 * to a failure propagating in one direction crossing a
2130 * request on the failed circuit propagating in the other
2133 if (pstate
== NULL
) {
2134 dmio_printf(iocom
, 4,
2135 "missing parent in stacked trans %s\n",
2137 pthread_mutex_unlock(&iocom
->mtx
);
2138 error
= DMSG_IOQ_ERROR_EALREADY
;
2143 pstate
= &iocom
->state0
;
2145 /* WARNING: pstate not (yet) refd */
2150 * If mstate is non-zero the state has already been incorporated
2151 * into the message as part of a simulated abort. Note that in this
2152 * situation the state may have already been removed from the RBTREE.
2154 * If received msg is a command state is on staterd_tree.
2155 * If received msg is a reply state is on statewr_tree.
2156 * Otherwise there is no state (retain &iocom->state0)
2161 sdummy
.msgid
= msg
->any
.head
.msgid
;
2162 if (msg
->any
.head
.cmd
& DMSGF_REVTRANS
) {
2163 state
= RB_FIND(dmsg_state_tree
,
2164 &iocom
->statewr_tree
, &sdummy
);
2166 state
= RB_FIND(dmsg_state_tree
,
2167 &iocom
->staterd_tree
, &sdummy
);
2172 dmio_printf(iocom
, 5, "msgrx:\tstate %p(%08x)",
2173 state
, (state
? state
->icmd
: 0));
2174 if (pstate
!= &iocom
->state0
) {
2175 dmio_printf(iocom
, 5,
2177 pstate
, pstate
->icmd
);
2179 dmio_printf(iocom
, 5, "%s\n", "");
2183 /* state already assigned to msg */
2186 * Message over an existing transaction (CREATE should not
2189 dmsg_state_drop(msg
->state
);
2190 dmsg_state_hold(state
);
2192 assert(pstate
== state
->parent
);
2195 * Either a new transaction (if CREATE set) or a one-off.
2201 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
2202 * inside the case statements.
2204 * Construct new state as necessary.
2206 switch(msg
->any
.head
.cmd
& (DMSGF_CREATE
| DMSGF_DELETE
|
2209 case DMSGF_CREATE
| DMSGF_DELETE
:
2211 * Create new sub-transaction under pstate.
2212 * (any DELETE is handled in post-processing of msg).
2214 * (During routing the msgid was made unique for this
2215 * direction over the comlink, so our RB trees can be
2216 * iocom-based instead of state-based).
2218 if (state
!= pstate
) {
2219 dmio_printf(iocom
, 2,
2220 "duplicate transaction %s\n",
2222 error
= DMSG_IOQ_ERROR_TRANS
;
2228 * Allocate the new state.
2230 state
= malloc(sizeof(*state
));
2231 bzero(state
, sizeof(*state
));
2232 atomic_add_int(&dmsg_state_count
, 1);
2234 TAILQ_INIT(&state
->subq
);
2235 dmsg_state_hold(pstate
);
2236 state
->parent
= pstate
;
2237 state
->iocom
= iocom
;
2238 state
->flags
= DMSG_STATE_DYNAMIC
|
2239 DMSG_STATE_OPPOSITE
;
2240 state
->msgid
= msg
->any
.head
.msgid
;
2241 state
->txcmd
= DMSGF_REPLY
;
2242 state
->rxcmd
= msg
->any
.head
.cmd
& ~DMSGF_DELETE
;
2243 state
->icmd
= state
->rxcmd
& DMSGF_BASECMDMASK
;
2244 state
->flags
&= ~DMSG_STATE_NEW
;
2247 RB_INSERT(dmsg_state_tree
, &iocom
->staterd_tree
, state
);
2248 if (TAILQ_EMPTY(&pstate
->subq
))
2249 dmsg_state_hold(pstate
);/* pstate->subq */
2250 TAILQ_INSERT_TAIL(&pstate
->subq
, state
, entry
);
2251 state
->flags
|= DMSG_STATE_SUBINSERTED
|
2252 DMSG_STATE_RBINSERTED
;
2253 dmsg_state_hold(state
); /* pstate->subq */
2254 dmsg_state_hold(state
); /* state on rbtree */
2255 dmsg_state_hold(state
); /* msg->state */
2258 * If the parent is a relay set up the state handler to
2259 * automatically route the message. Local processing will
2262 * (state relays are seeded by SPAN processing)
2265 state
->func
= dmsg_state_relay
;
2270 * Persistent state is expected but might not exist if an
2271 * ABORT+DELETE races the close.
2273 * (any DELETE is handled in post-processing of msg).
2275 if (state
== pstate
) {
2276 if (msg
->any
.head
.cmd
& DMSGF_ABORT
) {
2277 error
= DMSG_IOQ_ERROR_EALREADY
;
2279 dmio_printf(iocom
, 2,
2280 "missing-state %s\n",
2282 error
= DMSG_IOQ_ERROR_TRANS
;
2289 * Handle another ABORT+DELETE case if the msgid has already
2292 if ((state
->rxcmd
& DMSGF_CREATE
) == 0) {
2293 if (msg
->any
.head
.cmd
& DMSGF_ABORT
) {
2294 error
= DMSG_IOQ_ERROR_EALREADY
;
2296 dmio_printf(iocom
, 2,
2297 "reused-state %s\n",
2299 error
= DMSG_IOQ_ERROR_TRANS
;
2308 * Check for mid-stream ABORT command received, otherwise
2311 if (msg
->any
.head
.cmd
& DMSGF_ABORT
) {
2312 if ((state
== pstate
) ||
2313 (state
->rxcmd
& DMSGF_CREATE
) == 0) {
2314 error
= DMSG_IOQ_ERROR_EALREADY
;
2320 case DMSGF_REPLY
| DMSGF_CREATE
:
2321 case DMSGF_REPLY
| DMSGF_CREATE
| DMSGF_DELETE
:
2323 * When receiving a reply with CREATE set the original
2324 * persistent state message should already exist.
2326 if (state
== pstate
) {
2327 dmio_printf(iocom
, 2, "no-state(r) %s\n",
2329 error
= DMSG_IOQ_ERROR_TRANS
;
2333 assert(((state
->rxcmd
^ msg
->any
.head
.cmd
) & DMSGF_REPLY
) == 0);
2334 state
->rxcmd
= msg
->any
.head
.cmd
& ~DMSGF_DELETE
;
2337 case DMSGF_REPLY
| DMSGF_DELETE
:
2339 * Received REPLY+ABORT+DELETE in case where msgid has
2340 * already been fully closed, ignore the message.
2342 if (state
== pstate
) {
2343 if (msg
->any
.head
.cmd
& DMSGF_ABORT
) {
2344 error
= DMSG_IOQ_ERROR_EALREADY
;
2346 dmio_printf(iocom
, 2,
2347 "no-state(r,d) %s\n",
2349 error
= DMSG_IOQ_ERROR_TRANS
;
2356 * Received REPLY+ABORT+DELETE in case where msgid has
2357 * already been reused for an unrelated message,
2358 * ignore the message.
2360 if ((state
->rxcmd
& DMSGF_CREATE
) == 0) {
2361 if (msg
->any
.head
.cmd
& DMSGF_ABORT
) {
2362 error
= DMSG_IOQ_ERROR_EALREADY
;
2364 dmio_printf(iocom
, 2,
2365 "reused-state(r,d) %s\n",
2367 error
= DMSG_IOQ_ERROR_TRANS
;
2376 * Check for mid-stream ABORT reply received to sent command.
2378 if (msg
->any
.head
.cmd
& DMSGF_ABORT
) {
2379 if (state
== pstate
||
2380 (state
->rxcmd
& DMSGF_CREATE
) == 0) {
2381 error
= DMSG_IOQ_ERROR_EALREADY
;
2390 * Calculate the easy-switch() transactional command. Represents
2391 * the outer-transaction command for any transaction-create or
2392 * transaction-delete, and the inner message command for any
2393 * non-transaction or inside-transaction command. tcmd will be
2394 * set to 0 for any messaging error condition.
2396 * The two can be told apart because outer-transaction commands
2397 * always have a DMSGF_CREATE and/or DMSGF_DELETE flag.
2399 if (msg
->any
.head
.cmd
& (DMSGF_CREATE
| DMSGF_DELETE
)) {
2400 if ((msg
->state
->flags
& DMSG_STATE_ROOT
) == 0) {
2401 msg
->tcmd
= (state
->icmd
& DMSGF_BASECMDMASK
) |
2402 (msg
->any
.head
.cmd
& (DMSGF_CREATE
|
2409 msg
->tcmd
= msg
->any
.head
.cmd
& DMSGF_CMDSWMASK
;
2412 #ifdef DMSG_BLOCK_DEBUG
2413 switch (msg
->tcmd
) {
2414 case DMSG_BLK_READ
| DMSGF_CREATE
| DMSGF_DELETE
:
2415 case DMSG_BLK_WRITE
| DMSGF_CREATE
| DMSGF_DELETE
:
2416 dmio_printf(iocom
, 4,
2417 "read BIO %-3d %016jx %d@%016jx\n",
2418 biocount
, msg
->any
.head
.msgid
,
2419 msg
->any
.blk_read
.bytes
,
2420 msg
->any
.blk_read
.offset
);
2422 case DMSG_BLK_READ
| DMSGF_CREATE
| DMSGF_DELETE
| DMSGF_REPLY
:
2423 case DMSG_BLK_WRITE
| DMSGF_CREATE
| DMSGF_DELETE
| DMSGF_REPLY
:
2424 dmio_printf(iocom
, 4,
2425 "rread BIO %-3d %016jx %d@%016jx\n",
2426 biocount
, msg
->any
.head
.msgid
,
2427 msg
->any
.blk_read
.bytes
,
2428 msg
->any
.blk_read
.offset
);
2436 * Adjust state, mark receive side as DELETED if appropriate and
2437 * adjust RB tree if both sides are DELETED. cleanuprx handles
2438 * the rest after the state callback returns.
2440 assert(msg
->state
->iocom
== iocom
);
2441 assert(msg
->state
== state
);
2443 if (state
->flags
& DMSG_STATE_ROOT
) {
2445 * Nothing to do for non-transactional messages.
2447 } else if (msg
->any
.head
.cmd
& DMSGF_DELETE
) {
2449 * Message terminating transaction, remove the state from
2450 * the RB tree if the full transaction is now complete.
2451 * The related state, subq, and parent link is retained
2452 * until after the state callback is complete.
2454 assert((state
->rxcmd
& DMSGF_DELETE
) == 0);
2455 state
->rxcmd
|= DMSGF_DELETE
;
2456 if (state
->txcmd
& DMSGF_DELETE
) {
2457 assert(state
->flags
& DMSG_STATE_RBINSERTED
);
2458 if (state
->rxcmd
& DMSGF_REPLY
) {
2459 assert(msg
->any
.head
.cmd
& DMSGF_REPLY
);
2460 RB_REMOVE(dmsg_state_tree
,
2461 &iocom
->statewr_tree
, state
);
2463 assert((msg
->any
.head
.cmd
& DMSGF_REPLY
) == 0);
2464 RB_REMOVE(dmsg_state_tree
,
2465 &iocom
->staterd_tree
, state
);
2467 state
->flags
&= ~DMSG_STATE_RBINSERTED
;
2468 dmsg_state_drop(state
);
2472 pthread_mutex_unlock(&iocom
->mtx
);
2474 if (DMsgDebugOpt
&& error
)
2475 dmio_printf(iocom
, 1, "msgrx: error %d\n", error
);
2481 * Route the message and handle pair-state processing.
2484 dmsg_state_relay(dmsg_msg_t
*lmsg
)
2486 dmsg_state_t
*lpstate
;
2487 dmsg_state_t
*rpstate
;
2488 dmsg_state_t
*lstate
;
2489 dmsg_state_t
*rstate
;
2492 #ifdef DMSG_BLOCK_DEBUG
2493 switch (lmsg
->tcmd
) {
2494 case DMSG_BLK_OPEN
| DMSGF_CREATE
:
2495 dmio_printf(iocom
, 4, "%s\n",
2496 "relay BIO_OPEN (CREATE)");
2498 case DMSG_BLK_OPEN
| DMSGF_DELETE
:
2499 dmio_printf(iocom
, 4, "%s\n",
2500 "relay BIO_OPEN (DELETE)");
2502 case DMSG_BLK_READ
| DMSGF_CREATE
| DMSGF_DELETE
:
2503 case DMSG_BLK_WRITE
| DMSGF_CREATE
| DMSGF_DELETE
:
2504 atomic_add_int(&biocount
, 1);
2505 dmio_printf(iocom
, 4,
2506 "relay BIO %-3d %016jx %d@%016jx\n",
2507 biocount
, lmsg
->any
.head
.msgid
,
2508 lmsg
->any
.blk_read
.bytes
,
2509 lmsg
->any
.blk_read
.offset
);
2511 case DMSG_BLK_READ
| DMSGF_CREATE
| DMSGF_DELETE
| DMSGF_REPLY
:
2512 case DMSG_BLK_WRITE
| DMSGF_CREATE
| DMSGF_DELETE
| DMSGF_REPLY
:
2513 dmio_printf(iocom
, 4,
2514 "retrn BIO %-3d %016jx %d@%016jx\n",
2515 biocount
, lmsg
->any
.head
.msgid
,
2516 lmsg
->any
.blk_read
.bytes
,
2517 lmsg
->any
.blk_read
.offset
);
2518 atomic_add_int(&biocount
, -1);
2525 if ((lmsg
->any
.head
.cmd
& (DMSGF_CREATE
| DMSGF_REPLY
)) ==
2528 * New sub-transaction, establish new state and relay.
2530 lstate
= lmsg
->state
;
2531 lpstate
= lstate
->parent
;
2532 rpstate
= lpstate
->relay
;
2533 assert(lstate
->relay
== NULL
);
2534 assert(rpstate
!= NULL
);
2536 rmsg
= dmsg_msg_alloc(rpstate
, 0,
2538 dmsg_state_relay
, NULL
);
2539 rstate
= rmsg
->state
;
2540 rstate
->relay
= lstate
;
2541 lstate
->relay
= rstate
;
2542 dmsg_state_hold(lstate
);
2543 dmsg_state_hold(rstate
);
2546 * State & relay already established
2548 lstate
= lmsg
->state
;
2549 rstate
= lstate
->relay
;
2550 assert(rstate
!= NULL
);
2552 assert((rstate
->txcmd
& DMSGF_DELETE
) == 0);
2555 if (lstate
->flags
& DMSG_STATE_ABORTING
) {
2556 dmio_printf(iocom
, 4,
2557 "relay: relay lost link l=%p r=%p\n",
2559 dmsg_simulate_failure(rstate
, 0, DMSG_ERR_LOSTLINK
);
2563 rmsg
= dmsg_msg_alloc(rstate
, 0,
2565 dmsg_state_relay
, NULL
);
2567 if (lmsg
->hdr_size
> sizeof(lmsg
->any
.head
)) {
2568 bcopy(&lmsg
->any
.head
+ 1, &rmsg
->any
.head
+ 1,
2569 lmsg
->hdr_size
- sizeof(lmsg
->any
.head
));
2571 rmsg
->any
.head
.error
= lmsg
->any
.head
.error
;
2572 rmsg
->any
.head
.reserved02
= lmsg
->any
.head
.reserved02
;
2573 rmsg
->any
.head
.reserved18
= lmsg
->any
.head
.reserved18
;
2574 rmsg
->aux_size
= lmsg
->aux_size
;
2575 rmsg
->aux_data
= lmsg
->aux_data
;
2576 lmsg
->aux_data
= NULL
;
2578 dmsg_msg_write(rmsg
);
2582 * Cleanup and retire msg after issuing the state callback. The state
2583 * has already been removed from the RB tree. The subq and msg must be
2586 * Called with the iocom mutex held (to handle subq disconnection).
2589 dmsg_state_cleanuprx(dmsg_iocom_t
*iocom
, dmsg_msg_t
*msg
)
2591 dmsg_state_t
*state
;
2593 assert(msg
->state
->iocom
== iocom
);
2595 if (state
->flags
& DMSG_STATE_ROOT
) {
2597 * Free a non-transactional message, there is no state
2601 } else if ((state
->flags
& DMSG_STATE_SUBINSERTED
) &&
2602 (state
->rxcmd
& DMSGF_DELETE
) &&
2603 (state
->txcmd
& DMSGF_DELETE
)) {
2605 * Must disconnect from parent and drop relay.
2607 dmsg_subq_delete(state
);
2609 dmsg_state_drop(state
->relay
);
2610 state
->relay
= NULL
;
2615 * Message not terminating transaction, leave state intact
2616 * and free message if it isn't the CREATE message.
2623 * Clean up the state after pulling out needed fields and queueing the
2624 * message for transmission. This occurs in dmsg_msg_write().
2626 * Called with the mutex locked.
2629 dmsg_state_cleanuptx(dmsg_iocom_t
*iocom
, dmsg_msg_t
*msg
)
2631 dmsg_state_t
*state
;
2633 assert(iocom
== msg
->state
->iocom
);
2636 dmsg_state_hold(state
);
2638 if (state
->flags
& DMSG_STATE_ROOT
) {
2640 } else if (msg
->any
.head
.cmd
& DMSGF_DELETE
) {
2642 * Message terminating transaction, destroy the related
2643 * state, the original message, and this message (if it
2644 * isn't the original message due to a CREATE|DELETE).
2646 * It's possible for governing state to terminate while
2647 * sub-transactions still exist. This is allowed but
2648 * will cause sub-transactions to recursively fail.
2649 * Further reception of sub-transaction messages will be
2650 * impossible because the circuit will no longer exist.
2651 * (XXX need code to make sure that happens properly).
2653 * NOTE: It is possible for a fafilure to terminate the
2654 * state after we have written the message but before
2655 * we are able to call cleanuptx, so txcmd might already
2656 * have DMSGF_DELETE set.
2658 if ((state
->txcmd
& DMSGF_DELETE
) == 0 &&
2659 (state
->rxcmd
& DMSGF_DELETE
)) {
2660 state
->txcmd
|= DMSGF_DELETE
;
2661 assert(state
->flags
& DMSG_STATE_RBINSERTED
);
2662 if (state
->txcmd
& DMSGF_REPLY
) {
2663 assert(msg
->any
.head
.cmd
& DMSGF_REPLY
);
2664 RB_REMOVE(dmsg_state_tree
,
2665 &iocom
->staterd_tree
, state
);
2667 assert((msg
->any
.head
.cmd
& DMSGF_REPLY
) == 0);
2668 RB_REMOVE(dmsg_state_tree
,
2669 &iocom
->statewr_tree
, state
);
2671 state
->flags
&= ~DMSG_STATE_RBINSERTED
;
2672 dmsg_subq_delete(state
);
2675 dmsg_state_drop(state
->relay
);
2676 state
->relay
= NULL
;
2678 dmsg_state_drop(state
); /* state->rbtree */
2679 } else if ((state
->txcmd
& DMSGF_DELETE
) == 0) {
2680 state
->txcmd
|= DMSGF_DELETE
;
2685 * Deferred abort after transmission.
2687 if ((state
->flags
& (DMSG_STATE_ABORTING
| DMSG_STATE_DYING
)) &&
2688 (state
->rxcmd
& DMSGF_DELETE
) == 0) {
2689 dmio_printf(iocom
, 4,
2690 "cleanuptx: state=%p "
2691 "executing deferred abort\n",
2693 state
->flags
&= ~DMSG_STATE_ABORTING
;
2694 dmsg_simulate_failure(state
, 1, DMSG_ERR_LOSTLINK
);
2697 dmsg_state_drop(state
);
2701 * Called with or without locks
2704 dmsg_state_hold(dmsg_state_t
*state
)
2706 atomic_add_int(&state
->refs
, 1);
2710 dmsg_state_drop(dmsg_state_t
*state
)
2712 assert(state
->refs
> 0);
2713 if (atomic_fetchadd_int(&state
->refs
, -1) == 1)
2714 dmsg_state_free(state
);
2718 * Called with iocom locked
2721 dmsg_state_free(dmsg_state_t
*state
)
2723 atomic_add_int(&dmsg_state_count
, -1);
2724 dmio_printf(state
->iocom
, 5, "terminate state %p\n", state
);
2725 assert((state
->flags
& (DMSG_STATE_ROOT
|
2726 DMSG_STATE_SUBINSERTED
|
2727 DMSG_STATE_RBINSERTED
)) == 0);
2728 assert(TAILQ_EMPTY(&state
->subq
));
2729 assert(state
->refs
== 0);
2730 if (state
->any
.any
!= NULL
) /* XXX avoid deadlock w/exit & kernel */
2732 assert(state
->any
.any
== NULL
);
2737 * This swaps endian for a hammer2_msg_hdr. Note that the extended
2738 * header is not adjusted, just the core header.
2741 dmsg_bswap_head(dmsg_hdr_t
*head
)
2743 head
->magic
= bswap16(head
->magic
);
2744 head
->reserved02
= bswap16(head
->reserved02
);
2745 head
->salt
= bswap32(head
->salt
);
2747 head
->msgid
= bswap64(head
->msgid
);
2748 head
->circuit
= bswap64(head
->circuit
);
2749 head
->reserved18
= bswap64(head
->reserved18
);
2751 head
->cmd
= bswap32(head
->cmd
);
2752 head
->aux_crc
= bswap32(head
->aux_crc
);
2753 head
->aux_bytes
= bswap32(head
->aux_bytes
);
2754 head
->error
= bswap32(head
->error
);
2755 head
->aux_descr
= bswap64(head
->aux_descr
);
2756 head
->reserved38
= bswap32(head
->reserved38
);
2757 head
->hdr_crc
= bswap32(head
->hdr_crc
);