2 * This file is subject to the terms and conditions of the GNU General Public
3 * License. See the file "COPYING" in the main directory of this archive
6 * Copyright (c) 2004-2008 Silicon Graphics, Inc. All Rights Reserved.
10 * Cross Partition Communication (XPC) channel support.
12 * This is the part of XPC that manages the channels and
13 * sends/receives messages across them to/from other partitions.
17 #include <linux/kernel.h>
18 #include <linux/init.h>
19 #include <linux/sched.h>
20 #include <linux/cache.h>
21 #include <linux/interrupt.h>
22 #include <linux/mutex.h>
23 #include <linux/completion.h>
24 #include <asm/sn/sn_sal.h>
28 * Guarantee that the kzalloc'd memory is cacheline aligned.
31 xpc_kzalloc_cacheline_aligned(size_t size
, gfp_t flags
, void **base
)
33 /* see if kzalloc will give us cachline aligned memory by default */
34 *base
= kzalloc(size
, flags
);
38 if ((u64
)*base
== L1_CACHE_ALIGN((u64
)*base
))
43 /* nope, we'll have to do it ourselves */
44 *base
= kzalloc(size
+ L1_CACHE_BYTES
, flags
);
48 return (void *)L1_CACHE_ALIGN((u64
)*base
);
52 * Allocate the local message queue and the notify queue.
55 xpc_allocate_local_msgqueue(struct xpc_channel
*ch
)
57 unsigned long irq_flags
;
61 for (nentries
= ch
->local_nentries
; nentries
> 0; nentries
--) {
63 nbytes
= nentries
* ch
->msg_size
;
64 ch
->local_msgqueue
= xpc_kzalloc_cacheline_aligned(nbytes
,
66 &ch
->local_msgqueue_base
);
67 if (ch
->local_msgqueue
== NULL
)
70 nbytes
= nentries
* sizeof(struct xpc_notify
);
71 ch
->notify_queue
= kzalloc(nbytes
, GFP_KERNEL
);
72 if (ch
->notify_queue
== NULL
) {
73 kfree(ch
->local_msgqueue_base
);
74 ch
->local_msgqueue
= NULL
;
78 spin_lock_irqsave(&ch
->lock
, irq_flags
);
79 if (nentries
< ch
->local_nentries
) {
80 dev_dbg(xpc_chan
, "nentries=%d local_nentries=%d, "
81 "partid=%d, channel=%d\n", nentries
,
82 ch
->local_nentries
, ch
->partid
, ch
->number
);
84 ch
->local_nentries
= nentries
;
86 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
90 dev_dbg(xpc_chan
, "can't get memory for local message queue and notify "
91 "queue, partid=%d, channel=%d\n", ch
->partid
, ch
->number
);
96 * Allocate the cached remote message queue.
99 xpc_allocate_remote_msgqueue(struct xpc_channel
*ch
)
101 unsigned long irq_flags
;
105 DBUG_ON(ch
->remote_nentries
<= 0);
107 for (nentries
= ch
->remote_nentries
; nentries
> 0; nentries
--) {
109 nbytes
= nentries
* ch
->msg_size
;
110 ch
->remote_msgqueue
= xpc_kzalloc_cacheline_aligned(nbytes
,
112 &ch
->remote_msgqueue_base
);
113 if (ch
->remote_msgqueue
== NULL
)
116 spin_lock_irqsave(&ch
->lock
, irq_flags
);
117 if (nentries
< ch
->remote_nentries
) {
118 dev_dbg(xpc_chan
, "nentries=%d remote_nentries=%d, "
119 "partid=%d, channel=%d\n", nentries
,
120 ch
->remote_nentries
, ch
->partid
, ch
->number
);
122 ch
->remote_nentries
= nentries
;
124 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
128 dev_dbg(xpc_chan
, "can't get memory for cached remote message queue, "
129 "partid=%d, channel=%d\n", ch
->partid
, ch
->number
);
134 * Allocate message queues and other stuff associated with a channel.
136 * Note: Assumes all of the channel sizes are filled in.
138 static enum xp_retval
139 xpc_allocate_msgqueues(struct xpc_channel
*ch
)
141 unsigned long irq_flags
;
144 DBUG_ON(ch
->flags
& XPC_C_SETUP
);
146 ret
= xpc_allocate_local_msgqueue(ch
);
147 if (ret
!= xpSuccess
)
150 ret
= xpc_allocate_remote_msgqueue(ch
);
151 if (ret
!= xpSuccess
) {
152 kfree(ch
->local_msgqueue_base
);
153 ch
->local_msgqueue
= NULL
;
154 kfree(ch
->notify_queue
);
155 ch
->notify_queue
= NULL
;
159 spin_lock_irqsave(&ch
->lock
, irq_flags
);
160 ch
->flags
|= XPC_C_SETUP
;
161 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
167 * Process a connect message from a remote partition.
169 * Note: xpc_process_connect() is expecting to be called with the
170 * spin_lock_irqsave held and will leave it locked upon return.
173 xpc_process_connect(struct xpc_channel
*ch
, unsigned long *irq_flags
)
177 DBUG_ON(!spin_is_locked(&ch
->lock
));
179 if (!(ch
->flags
& XPC_C_OPENREQUEST
) ||
180 !(ch
->flags
& XPC_C_ROPENREQUEST
)) {
181 /* nothing more to do for now */
184 DBUG_ON(!(ch
->flags
& XPC_C_CONNECTING
));
186 if (!(ch
->flags
& XPC_C_SETUP
)) {
187 spin_unlock_irqrestore(&ch
->lock
, *irq_flags
);
188 ret
= xpc_allocate_msgqueues(ch
);
189 spin_lock_irqsave(&ch
->lock
, *irq_flags
);
191 if (ret
!= xpSuccess
)
192 XPC_DISCONNECT_CHANNEL(ch
, ret
, irq_flags
);
194 if (ch
->flags
& (XPC_C_CONNECTED
| XPC_C_DISCONNECTING
))
197 DBUG_ON(!(ch
->flags
& XPC_C_SETUP
));
198 DBUG_ON(ch
->local_msgqueue
== NULL
);
199 DBUG_ON(ch
->remote_msgqueue
== NULL
);
202 if (!(ch
->flags
& XPC_C_OPENREPLY
)) {
203 ch
->flags
|= XPC_C_OPENREPLY
;
204 xpc_send_chctl_openreply(ch
, irq_flags
);
207 if (!(ch
->flags
& XPC_C_ROPENREPLY
))
210 DBUG_ON(ch
->remote_msgqueue_pa
== 0);
212 ch
->flags
= (XPC_C_CONNECTED
| XPC_C_SETUP
); /* clear all else */
214 dev_info(xpc_chan
, "channel %d to partition %d connected\n",
215 ch
->number
, ch
->partid
);
217 spin_unlock_irqrestore(&ch
->lock
, *irq_flags
);
218 xpc_create_kthreads(ch
, 1, 0);
219 spin_lock_irqsave(&ch
->lock
, *irq_flags
);
223 * Free up message queues and other stuff that were allocated for the specified
226 * Note: ch->reason and ch->reason_line are left set for debugging purposes,
227 * they're cleared when XPC_C_DISCONNECTED is cleared.
230 xpc_free_msgqueues(struct xpc_channel
*ch
)
232 struct xpc_channel_sn2
*ch_sn2
= &ch
->sn
.sn2
;
234 DBUG_ON(!spin_is_locked(&ch
->lock
));
235 DBUG_ON(atomic_read(&ch
->n_to_notify
) != 0);
237 ch
->remote_msgqueue_pa
= 0;
241 ch
->local_nentries
= 0;
242 ch
->remote_nentries
= 0;
243 ch
->kthreads_assigned_limit
= 0;
244 ch
->kthreads_idle_limit
= 0;
246 ch_sn2
->local_GP
->get
= 0;
247 ch_sn2
->local_GP
->put
= 0;
248 ch_sn2
->remote_GP
.get
= 0;
249 ch_sn2
->remote_GP
.put
= 0;
250 ch_sn2
->w_local_GP
.get
= 0;
251 ch_sn2
->w_local_GP
.put
= 0;
252 ch_sn2
->w_remote_GP
.get
= 0;
253 ch_sn2
->w_remote_GP
.put
= 0;
254 ch_sn2
->next_msg_to_pull
= 0;
256 if (ch
->flags
& XPC_C_SETUP
) {
257 ch
->flags
&= ~XPC_C_SETUP
;
259 dev_dbg(xpc_chan
, "ch->flags=0x%x, partid=%d, channel=%d\n",
260 ch
->flags
, ch
->partid
, ch
->number
);
262 kfree(ch
->local_msgqueue_base
);
263 ch
->local_msgqueue
= NULL
;
264 kfree(ch
->remote_msgqueue_base
);
265 ch
->remote_msgqueue
= NULL
;
266 kfree(ch
->notify_queue
);
267 ch
->notify_queue
= NULL
;
272 * spin_lock_irqsave() is expected to be held on entry.
275 xpc_process_disconnect(struct xpc_channel
*ch
, unsigned long *irq_flags
)
277 struct xpc_partition
*part
= &xpc_partitions
[ch
->partid
];
278 u32 channel_was_connected
= (ch
->flags
& XPC_C_WASCONNECTED
);
280 DBUG_ON(!spin_is_locked(&ch
->lock
));
282 if (!(ch
->flags
& XPC_C_DISCONNECTING
))
285 DBUG_ON(!(ch
->flags
& XPC_C_CLOSEREQUEST
));
287 /* make sure all activity has settled down first */
289 if (atomic_read(&ch
->kthreads_assigned
) > 0 ||
290 atomic_read(&ch
->references
) > 0) {
293 DBUG_ON((ch
->flags
& XPC_C_CONNECTEDCALLOUT_MADE
) &&
294 !(ch
->flags
& XPC_C_DISCONNECTINGCALLOUT_MADE
));
296 if (part
->act_state
== XPC_P_DEACTIVATING
) {
297 /* can't proceed until the other side disengages from us */
298 if (xpc_partition_engaged(ch
->partid
))
303 /* as long as the other side is up do the full protocol */
305 if (!(ch
->flags
& XPC_C_RCLOSEREQUEST
))
308 if (!(ch
->flags
& XPC_C_CLOSEREPLY
)) {
309 ch
->flags
|= XPC_C_CLOSEREPLY
;
310 xpc_send_chctl_closereply(ch
, irq_flags
);
313 if (!(ch
->flags
& XPC_C_RCLOSEREPLY
))
317 /* wake those waiting for notify completion */
318 if (atomic_read(&ch
->n_to_notify
) > 0) {
319 /* >>> we do callout while holding ch->lock */
320 xpc_notify_senders_of_disconnect(ch
);
323 /* both sides are disconnected now */
325 if (ch
->flags
& XPC_C_DISCONNECTINGCALLOUT_MADE
) {
326 spin_unlock_irqrestore(&ch
->lock
, *irq_flags
);
327 xpc_disconnect_callout(ch
, xpDisconnected
);
328 spin_lock_irqsave(&ch
->lock
, *irq_flags
);
331 /* it's now safe to free the channel's message queues */
332 xpc_free_msgqueues(ch
);
334 /* mark disconnected, clear all other flags except XPC_C_WDISCONNECT */
335 ch
->flags
= (XPC_C_DISCONNECTED
| (ch
->flags
& XPC_C_WDISCONNECT
));
337 atomic_dec(&part
->nchannels_active
);
339 if (channel_was_connected
) {
340 dev_info(xpc_chan
, "channel %d to partition %d disconnected, "
341 "reason=%d\n", ch
->number
, ch
->partid
, ch
->reason
);
344 if (ch
->flags
& XPC_C_WDISCONNECT
) {
345 /* we won't lose the CPU since we're holding ch->lock */
346 complete(&ch
->wdisconnect_wait
);
347 } else if (ch
->delayed_chctl_flags
) {
348 if (part
->act_state
!= XPC_P_DEACTIVATING
) {
349 /* time to take action on any delayed chctl flags */
350 spin_lock(&part
->chctl_lock
);
351 part
->chctl
.flags
[ch
->number
] |=
352 ch
->delayed_chctl_flags
;
353 spin_unlock(&part
->chctl_lock
);
355 ch
->delayed_chctl_flags
= 0;
360 * Process a change in the channel's remote connection state.
363 xpc_process_openclose_chctl_flags(struct xpc_partition
*part
, int ch_number
,
366 unsigned long irq_flags
;
367 struct xpc_openclose_args
*args
=
368 &part
->remote_openclose_args
[ch_number
];
369 struct xpc_channel
*ch
= &part
->channels
[ch_number
];
370 enum xp_retval reason
;
372 spin_lock_irqsave(&ch
->lock
, irq_flags
);
376 if ((ch
->flags
& XPC_C_DISCONNECTED
) &&
377 (ch
->flags
& XPC_C_WDISCONNECT
)) {
379 * Delay processing chctl flags until thread waiting disconnect
380 * has had a chance to see that the channel is disconnected.
382 ch
->delayed_chctl_flags
|= chctl_flags
;
383 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
387 if (chctl_flags
& XPC_CHCTL_CLOSEREQUEST
) {
389 dev_dbg(xpc_chan
, "XPC_CHCTL_CLOSEREQUEST (reason=%d) received "
390 "from partid=%d, channel=%d\n", args
->reason
,
391 ch
->partid
, ch
->number
);
394 * If RCLOSEREQUEST is set, we're probably waiting for
395 * RCLOSEREPLY. We should find it and a ROPENREQUEST packed
396 * with this RCLOSEREQUEST in the chctl_flags.
399 if (ch
->flags
& XPC_C_RCLOSEREQUEST
) {
400 DBUG_ON(!(ch
->flags
& XPC_C_DISCONNECTING
));
401 DBUG_ON(!(ch
->flags
& XPC_C_CLOSEREQUEST
));
402 DBUG_ON(!(ch
->flags
& XPC_C_CLOSEREPLY
));
403 DBUG_ON(ch
->flags
& XPC_C_RCLOSEREPLY
);
405 DBUG_ON(!(chctl_flags
& XPC_CHCTL_CLOSEREPLY
));
406 chctl_flags
&= ~XPC_CHCTL_CLOSEREPLY
;
407 ch
->flags
|= XPC_C_RCLOSEREPLY
;
409 /* both sides have finished disconnecting */
410 xpc_process_disconnect(ch
, &irq_flags
);
411 DBUG_ON(!(ch
->flags
& XPC_C_DISCONNECTED
));
415 if (ch
->flags
& XPC_C_DISCONNECTED
) {
416 if (!(chctl_flags
& XPC_CHCTL_OPENREQUEST
)) {
417 if (part
->chctl
.flags
[ch_number
] &
418 XPC_CHCTL_OPENREQUEST
) {
420 DBUG_ON(ch
->delayed_chctl_flags
!= 0);
421 spin_lock(&part
->chctl_lock
);
422 part
->chctl
.flags
[ch_number
] |=
423 XPC_CHCTL_CLOSEREQUEST
;
424 spin_unlock(&part
->chctl_lock
);
426 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
430 XPC_SET_REASON(ch
, 0, 0);
431 ch
->flags
&= ~XPC_C_DISCONNECTED
;
433 atomic_inc(&part
->nchannels_active
);
434 ch
->flags
|= (XPC_C_CONNECTING
| XPC_C_ROPENREQUEST
);
437 chctl_flags
&= ~(XPC_CHCTL_OPENREQUEST
| XPC_CHCTL_OPENREPLY
);
440 * The meaningful CLOSEREQUEST connection state fields are:
441 * reason = reason connection is to be closed
444 ch
->flags
|= XPC_C_RCLOSEREQUEST
;
446 if (!(ch
->flags
& XPC_C_DISCONNECTING
)) {
447 reason
= args
->reason
;
448 if (reason
<= xpSuccess
|| reason
> xpUnknownReason
)
449 reason
= xpUnknownReason
;
450 else if (reason
== xpUnregistering
)
451 reason
= xpOtherUnregistering
;
453 XPC_DISCONNECT_CHANNEL(ch
, reason
, &irq_flags
);
455 DBUG_ON(chctl_flags
& XPC_CHCTL_CLOSEREPLY
);
456 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
460 xpc_process_disconnect(ch
, &irq_flags
);
463 if (chctl_flags
& XPC_CHCTL_CLOSEREPLY
) {
465 dev_dbg(xpc_chan
, "XPC_CHCTL_CLOSEREPLY received from partid="
466 "%d, channel=%d\n", ch
->partid
, ch
->number
);
468 if (ch
->flags
& XPC_C_DISCONNECTED
) {
469 DBUG_ON(part
->act_state
!= XPC_P_DEACTIVATING
);
470 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
474 DBUG_ON(!(ch
->flags
& XPC_C_CLOSEREQUEST
));
476 if (!(ch
->flags
& XPC_C_RCLOSEREQUEST
)) {
477 if (part
->chctl
.flags
[ch_number
] &
478 XPC_CHCTL_CLOSEREQUEST
) {
480 DBUG_ON(ch
->delayed_chctl_flags
!= 0);
481 spin_lock(&part
->chctl_lock
);
482 part
->chctl
.flags
[ch_number
] |=
483 XPC_CHCTL_CLOSEREPLY
;
484 spin_unlock(&part
->chctl_lock
);
486 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
490 ch
->flags
|= XPC_C_RCLOSEREPLY
;
492 if (ch
->flags
& XPC_C_CLOSEREPLY
) {
493 /* both sides have finished disconnecting */
494 xpc_process_disconnect(ch
, &irq_flags
);
498 if (chctl_flags
& XPC_CHCTL_OPENREQUEST
) {
500 dev_dbg(xpc_chan
, "XPC_CHCTL_OPENREQUEST (msg_size=%d, "
501 "local_nentries=%d) received from partid=%d, "
502 "channel=%d\n", args
->msg_size
, args
->local_nentries
,
503 ch
->partid
, ch
->number
);
505 if (part
->act_state
== XPC_P_DEACTIVATING
||
506 (ch
->flags
& XPC_C_ROPENREQUEST
)) {
507 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
511 if (ch
->flags
& (XPC_C_DISCONNECTING
| XPC_C_WDISCONNECT
)) {
512 ch
->delayed_chctl_flags
|= XPC_CHCTL_OPENREQUEST
;
513 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
516 DBUG_ON(!(ch
->flags
& (XPC_C_DISCONNECTED
|
517 XPC_C_OPENREQUEST
)));
518 DBUG_ON(ch
->flags
& (XPC_C_ROPENREQUEST
| XPC_C_ROPENREPLY
|
519 XPC_C_OPENREPLY
| XPC_C_CONNECTED
));
522 * The meaningful OPENREQUEST connection state fields are:
523 * msg_size = size of channel's messages in bytes
524 * local_nentries = remote partition's local_nentries
526 if (args
->msg_size
== 0 || args
->local_nentries
== 0) {
527 /* assume OPENREQUEST was delayed by mistake */
528 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
532 ch
->flags
|= (XPC_C_ROPENREQUEST
| XPC_C_CONNECTING
);
533 ch
->remote_nentries
= args
->local_nentries
;
535 if (ch
->flags
& XPC_C_OPENREQUEST
) {
536 if (args
->msg_size
!= ch
->msg_size
) {
537 XPC_DISCONNECT_CHANNEL(ch
, xpUnequalMsgSizes
,
539 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
543 ch
->msg_size
= args
->msg_size
;
545 XPC_SET_REASON(ch
, 0, 0);
546 ch
->flags
&= ~XPC_C_DISCONNECTED
;
548 atomic_inc(&part
->nchannels_active
);
551 xpc_process_connect(ch
, &irq_flags
);
554 if (chctl_flags
& XPC_CHCTL_OPENREPLY
) {
556 dev_dbg(xpc_chan
, "XPC_CHCTL_OPENREPLY (local_msgqueue_pa="
557 "0x%lx, local_nentries=%d, remote_nentries=%d) "
558 "received from partid=%d, channel=%d\n",
559 args
->local_msgqueue_pa
, args
->local_nentries
,
560 args
->remote_nentries
, ch
->partid
, ch
->number
);
562 if (ch
->flags
& (XPC_C_DISCONNECTING
| XPC_C_DISCONNECTED
)) {
563 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
566 if (!(ch
->flags
& XPC_C_OPENREQUEST
)) {
567 XPC_DISCONNECT_CHANNEL(ch
, xpOpenCloseError
,
569 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
573 DBUG_ON(!(ch
->flags
& XPC_C_ROPENREQUEST
));
574 DBUG_ON(ch
->flags
& XPC_C_CONNECTED
);
577 * The meaningful OPENREPLY connection state fields are:
578 * local_msgqueue_pa = physical address of remote
579 * partition's local_msgqueue
580 * local_nentries = remote partition's local_nentries
581 * remote_nentries = remote partition's remote_nentries
583 DBUG_ON(args
->local_msgqueue_pa
== 0);
584 DBUG_ON(args
->local_nentries
== 0);
585 DBUG_ON(args
->remote_nentries
== 0);
587 ch
->flags
|= XPC_C_ROPENREPLY
;
588 ch
->remote_msgqueue_pa
= args
->local_msgqueue_pa
;
590 if (args
->local_nentries
< ch
->remote_nentries
) {
591 dev_dbg(xpc_chan
, "XPC_CHCTL_OPENREPLY: new "
592 "remote_nentries=%d, old remote_nentries=%d, "
593 "partid=%d, channel=%d\n",
594 args
->local_nentries
, ch
->remote_nentries
,
595 ch
->partid
, ch
->number
);
597 ch
->remote_nentries
= args
->local_nentries
;
599 if (args
->remote_nentries
< ch
->local_nentries
) {
600 dev_dbg(xpc_chan
, "XPC_CHCTL_OPENREPLY: new "
601 "local_nentries=%d, old local_nentries=%d, "
602 "partid=%d, channel=%d\n",
603 args
->remote_nentries
, ch
->local_nentries
,
604 ch
->partid
, ch
->number
);
606 ch
->local_nentries
= args
->remote_nentries
;
609 xpc_process_connect(ch
, &irq_flags
);
612 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
616 * Attempt to establish a channel connection to a remote partition.
618 static enum xp_retval
619 xpc_connect_channel(struct xpc_channel
*ch
)
621 unsigned long irq_flags
;
622 struct xpc_registration
*registration
= &xpc_registrations
[ch
->number
];
624 if (mutex_trylock(®istration
->mutex
) == 0)
627 if (!XPC_CHANNEL_REGISTERED(ch
->number
)) {
628 mutex_unlock(®istration
->mutex
);
629 return xpUnregistered
;
632 spin_lock_irqsave(&ch
->lock
, irq_flags
);
634 DBUG_ON(ch
->flags
& XPC_C_CONNECTED
);
635 DBUG_ON(ch
->flags
& XPC_C_OPENREQUEST
);
637 if (ch
->flags
& XPC_C_DISCONNECTING
) {
638 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
639 mutex_unlock(®istration
->mutex
);
643 /* add info from the channel connect registration to the channel */
645 ch
->kthreads_assigned_limit
= registration
->assigned_limit
;
646 ch
->kthreads_idle_limit
= registration
->idle_limit
;
647 DBUG_ON(atomic_read(&ch
->kthreads_assigned
) != 0);
648 DBUG_ON(atomic_read(&ch
->kthreads_idle
) != 0);
649 DBUG_ON(atomic_read(&ch
->kthreads_active
) != 0);
651 ch
->func
= registration
->func
;
652 DBUG_ON(registration
->func
== NULL
);
653 ch
->key
= registration
->key
;
655 ch
->local_nentries
= registration
->nentries
;
657 if (ch
->flags
& XPC_C_ROPENREQUEST
) {
658 if (registration
->msg_size
!= ch
->msg_size
) {
659 /* the local and remote sides aren't the same */
662 * Because XPC_DISCONNECT_CHANNEL() can block we're
663 * forced to up the registration sema before we unlock
664 * the channel lock. But that's okay here because we're
665 * done with the part that required the registration
666 * sema. XPC_DISCONNECT_CHANNEL() requires that the
667 * channel lock be locked and will unlock and relock
668 * the channel lock as needed.
670 mutex_unlock(®istration
->mutex
);
671 XPC_DISCONNECT_CHANNEL(ch
, xpUnequalMsgSizes
,
673 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
674 return xpUnequalMsgSizes
;
677 ch
->msg_size
= registration
->msg_size
;
679 XPC_SET_REASON(ch
, 0, 0);
680 ch
->flags
&= ~XPC_C_DISCONNECTED
;
682 atomic_inc(&xpc_partitions
[ch
->partid
].nchannels_active
);
685 mutex_unlock(®istration
->mutex
);
687 /* initiate the connection */
689 ch
->flags
|= (XPC_C_OPENREQUEST
| XPC_C_CONNECTING
);
690 xpc_send_chctl_openrequest(ch
, &irq_flags
);
692 xpc_process_connect(ch
, &irq_flags
);
694 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
700 xpc_process_sent_chctl_flags(struct xpc_partition
*part
)
702 unsigned long irq_flags
;
703 union xpc_channel_ctl_flags chctl
;
704 struct xpc_channel
*ch
;
708 chctl
.all_flags
= xpc_get_chctl_all_flags(part
);
711 * Initiate channel connections for registered channels.
713 * For each connected channel that has pending messages activate idle
714 * kthreads and/or create new kthreads as needed.
717 for (ch_number
= 0; ch_number
< part
->nchannels
; ch_number
++) {
718 ch
= &part
->channels
[ch_number
];
721 * Process any open or close related chctl flags, and then deal
722 * with connecting or disconnecting the channel as required.
725 if (chctl
.flags
[ch_number
] & XPC_OPENCLOSE_CHCTL_FLAGS
) {
726 xpc_process_openclose_chctl_flags(part
, ch_number
,
727 chctl
.flags
[ch_number
]);
730 ch_flags
= ch
->flags
; /* need an atomic snapshot of flags */
732 if (ch_flags
& XPC_C_DISCONNECTING
) {
733 spin_lock_irqsave(&ch
->lock
, irq_flags
);
734 xpc_process_disconnect(ch
, &irq_flags
);
735 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
739 if (part
->act_state
== XPC_P_DEACTIVATING
)
742 if (!(ch_flags
& XPC_C_CONNECTED
)) {
743 if (!(ch_flags
& XPC_C_OPENREQUEST
)) {
744 DBUG_ON(ch_flags
& XPC_C_SETUP
);
745 (void)xpc_connect_channel(ch
);
747 spin_lock_irqsave(&ch
->lock
, irq_flags
);
748 xpc_process_connect(ch
, &irq_flags
);
749 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
755 * Process any message related chctl flags, this may involve
756 * the activation of kthreads to deliver any pending messages
757 * sent from the other partition.
760 if (chctl
.flags
[ch_number
] & XPC_MSG_CHCTL_FLAGS
)
761 xpc_process_msg_chctl_flags(part
, ch_number
);
766 * XPC's heartbeat code calls this function to inform XPC that a partition is
767 * going down. XPC responds by tearing down the XPartition Communication
768 * infrastructure used for the just downed partition.
770 * XPC's heartbeat code will never call this function and xpc_partition_up()
771 * at the same time. Nor will it ever make multiple calls to either function
775 xpc_partition_going_down(struct xpc_partition
*part
, enum xp_retval reason
)
777 unsigned long irq_flags
;
779 struct xpc_channel
*ch
;
781 dev_dbg(xpc_chan
, "deactivating partition %d, reason=%d\n",
782 XPC_PARTID(part
), reason
);
784 if (!xpc_part_ref(part
)) {
785 /* infrastructure for this partition isn't currently set up */
789 /* disconnect channels associated with the partition going down */
791 for (ch_number
= 0; ch_number
< part
->nchannels
; ch_number
++) {
792 ch
= &part
->channels
[ch_number
];
794 xpc_msgqueue_ref(ch
);
795 spin_lock_irqsave(&ch
->lock
, irq_flags
);
797 XPC_DISCONNECT_CHANNEL(ch
, reason
, &irq_flags
);
799 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
800 xpc_msgqueue_deref(ch
);
803 xpc_wakeup_channel_mgr(part
);
805 xpc_part_deref(part
);
809 * Called by XP at the time of channel connection registration to cause
810 * XPC to establish connections to all currently active partitions.
813 xpc_initiate_connect(int ch_number
)
816 struct xpc_partition
*part
;
817 struct xpc_channel
*ch
;
819 DBUG_ON(ch_number
< 0 || ch_number
>= XPC_MAX_NCHANNELS
);
821 for (partid
= 0; partid
< xp_max_npartitions
; partid
++) {
822 part
= &xpc_partitions
[partid
];
824 if (xpc_part_ref(part
)) {
825 ch
= &part
->channels
[ch_number
];
828 * Initiate the establishment of a connection on the
829 * newly registered channel to the remote partition.
831 xpc_wakeup_channel_mgr(part
);
832 xpc_part_deref(part
);
838 xpc_connected_callout(struct xpc_channel
*ch
)
840 /* let the registerer know that a connection has been established */
842 if (ch
->func
!= NULL
) {
843 dev_dbg(xpc_chan
, "ch->func() called, reason=xpConnected, "
844 "partid=%d, channel=%d\n", ch
->partid
, ch
->number
);
846 ch
->func(xpConnected
, ch
->partid
, ch
->number
,
847 (void *)(u64
)ch
->local_nentries
, ch
->key
);
849 dev_dbg(xpc_chan
, "ch->func() returned, reason=xpConnected, "
850 "partid=%d, channel=%d\n", ch
->partid
, ch
->number
);
855 * Called by XP at the time of channel connection unregistration to cause
856 * XPC to teardown all current connections for the specified channel.
858 * Before returning xpc_initiate_disconnect() will wait until all connections
859 * on the specified channel have been closed/torndown. So the caller can be
860 * assured that they will not be receiving any more callouts from XPC to the
861 * function they registered via xpc_connect().
865 * ch_number - channel # to unregister.
868 xpc_initiate_disconnect(int ch_number
)
870 unsigned long irq_flags
;
872 struct xpc_partition
*part
;
873 struct xpc_channel
*ch
;
875 DBUG_ON(ch_number
< 0 || ch_number
>= XPC_MAX_NCHANNELS
);
877 /* initiate the channel disconnect for every active partition */
878 for (partid
= 0; partid
< xp_max_npartitions
; partid
++) {
879 part
= &xpc_partitions
[partid
];
881 if (xpc_part_ref(part
)) {
882 ch
= &part
->channels
[ch_number
];
883 xpc_msgqueue_ref(ch
);
885 spin_lock_irqsave(&ch
->lock
, irq_flags
);
887 if (!(ch
->flags
& XPC_C_DISCONNECTED
)) {
888 ch
->flags
|= XPC_C_WDISCONNECT
;
890 XPC_DISCONNECT_CHANNEL(ch
, xpUnregistering
,
894 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
896 xpc_msgqueue_deref(ch
);
897 xpc_part_deref(part
);
901 xpc_disconnect_wait(ch_number
);
905 * To disconnect a channel, and reflect it back to all who may be waiting.
907 * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
908 * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by
909 * xpc_disconnect_wait().
911 * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN.
914 xpc_disconnect_channel(const int line
, struct xpc_channel
*ch
,
915 enum xp_retval reason
, unsigned long *irq_flags
)
917 u32 channel_was_connected
= (ch
->flags
& XPC_C_CONNECTED
);
919 DBUG_ON(!spin_is_locked(&ch
->lock
));
921 if (ch
->flags
& (XPC_C_DISCONNECTING
| XPC_C_DISCONNECTED
))
924 DBUG_ON(!(ch
->flags
& (XPC_C_CONNECTING
| XPC_C_CONNECTED
)));
926 dev_dbg(xpc_chan
, "reason=%d, line=%d, partid=%d, channel=%d\n",
927 reason
, line
, ch
->partid
, ch
->number
);
929 XPC_SET_REASON(ch
, reason
, line
);
931 ch
->flags
|= (XPC_C_CLOSEREQUEST
| XPC_C_DISCONNECTING
);
932 /* some of these may not have been set */
933 ch
->flags
&= ~(XPC_C_OPENREQUEST
| XPC_C_OPENREPLY
|
934 XPC_C_ROPENREQUEST
| XPC_C_ROPENREPLY
|
935 XPC_C_CONNECTING
| XPC_C_CONNECTED
);
937 xpc_send_chctl_closerequest(ch
, irq_flags
);
939 if (channel_was_connected
)
940 ch
->flags
|= XPC_C_WASCONNECTED
;
942 spin_unlock_irqrestore(&ch
->lock
, *irq_flags
);
944 /* wake all idle kthreads so they can exit */
945 if (atomic_read(&ch
->kthreads_idle
) > 0) {
946 wake_up_all(&ch
->idle_wq
);
948 } else if ((ch
->flags
& XPC_C_CONNECTEDCALLOUT_MADE
) &&
949 !(ch
->flags
& XPC_C_DISCONNECTINGCALLOUT
)) {
950 /* start a kthread that will do the xpDisconnecting callout */
951 xpc_create_kthreads(ch
, 1, 1);
954 /* wake those waiting to allocate an entry from the local msg queue */
955 if (atomic_read(&ch
->n_on_msg_allocate_wq
) > 0)
956 wake_up(&ch
->msg_allocate_wq
);
958 spin_lock_irqsave(&ch
->lock
, *irq_flags
);
962 xpc_disconnect_callout(struct xpc_channel
*ch
, enum xp_retval reason
)
965 * Let the channel's registerer know that the channel is being
966 * disconnected. We don't want to do this if the registerer was never
967 * informed of a connection being made.
970 if (ch
->func
!= NULL
) {
971 dev_dbg(xpc_chan
, "ch->func() called, reason=%d, partid=%d, "
972 "channel=%d\n", reason
, ch
->partid
, ch
->number
);
974 ch
->func(reason
, ch
->partid
, ch
->number
, NULL
, ch
->key
);
976 dev_dbg(xpc_chan
, "ch->func() returned, reason=%d, partid=%d, "
977 "channel=%d\n", reason
, ch
->partid
, ch
->number
);
982 * Wait for a message entry to become available for the specified channel,
983 * but don't wait any longer than 1 jiffy.
986 xpc_allocate_msg_wait(struct xpc_channel
*ch
)
990 if (ch
->flags
& XPC_C_DISCONNECTING
) {
991 DBUG_ON(ch
->reason
== xpInterrupted
);
995 atomic_inc(&ch
->n_on_msg_allocate_wq
);
996 ret
= interruptible_sleep_on_timeout(&ch
->msg_allocate_wq
, 1);
997 atomic_dec(&ch
->n_on_msg_allocate_wq
);
999 if (ch
->flags
& XPC_C_DISCONNECTING
) {
1001 DBUG_ON(ch
->reason
== xpInterrupted
);
1002 } else if (ret
== 0) {
1005 ret
= xpInterrupted
;
1012 * Send a message that contains the user's payload on the specified channel
1013 * connected to the specified partition.
1015 * NOTE that this routine can sleep waiting for a message entry to become
1016 * available. To not sleep, pass in the XPC_NOWAIT flag.
1018 * Once sent, this routine will not wait for the message to be received, nor
1019 * will notification be given when it does happen.
1023 * partid - ID of partition to which the channel is connected.
1024 * ch_number - channel # to send message on.
1025 * flags - see xp.h for valid flags.
1026 * payload - pointer to the payload which is to be sent.
1027 * payload_size - size of the payload in bytes.
1030 xpc_initiate_send(short partid
, int ch_number
, u32 flags
, void *payload
,
1033 struct xpc_partition
*part
= &xpc_partitions
[partid
];
1034 enum xp_retval ret
= xpUnknownReason
;
1036 dev_dbg(xpc_chan
, "payload=0x%p, partid=%d, channel=%d\n", payload
,
1039 DBUG_ON(partid
< 0 || partid
>= xp_max_npartitions
);
1040 DBUG_ON(ch_number
< 0 || ch_number
>= part
->nchannels
);
1041 DBUG_ON(payload
== NULL
);
1043 if (xpc_part_ref(part
)) {
1044 ret
= xpc_send_msg(&part
->channels
[ch_number
], flags
, payload
,
1045 payload_size
, 0, NULL
, NULL
);
1046 xpc_part_deref(part
);
1053 * Send a message that contains the user's payload on the specified channel
1054 * connected to the specified partition.
1056 * NOTE that this routine can sleep waiting for a message entry to become
1057 * available. To not sleep, pass in the XPC_NOWAIT flag.
1059 * This routine will not wait for the message to be sent or received.
1061 * Once the remote end of the channel has received the message, the function
1062 * passed as an argument to xpc_initiate_send_notify() will be called. This
1063 * allows the sender to free up or re-use any buffers referenced by the
1064 * message, but does NOT mean the message has been processed at the remote
1065 * end by a receiver.
1067 * If this routine returns an error, the caller's function will NOT be called.
1071 * partid - ID of partition to which the channel is connected.
1072 * ch_number - channel # to send message on.
1073 * flags - see xp.h for valid flags.
1074 * payload - pointer to the payload which is to be sent.
1075 * payload_size - size of the payload in bytes.
1076 * func - function to call with asynchronous notification of message
1077 * receipt. THIS FUNCTION MUST BE NON-BLOCKING.
1078 * key - user-defined key to be passed to the function when it's called.
1081 xpc_initiate_send_notify(short partid
, int ch_number
, u32 flags
, void *payload
,
1082 u16 payload_size
, xpc_notify_func func
, void *key
)
1084 struct xpc_partition
*part
= &xpc_partitions
[partid
];
1085 enum xp_retval ret
= xpUnknownReason
;
1087 dev_dbg(xpc_chan
, "payload=0x%p, partid=%d, channel=%d\n", payload
,
1090 DBUG_ON(partid
< 0 || partid
>= xp_max_npartitions
);
1091 DBUG_ON(ch_number
< 0 || ch_number
>= part
->nchannels
);
1092 DBUG_ON(payload
== NULL
);
1093 DBUG_ON(func
== NULL
);
1095 if (xpc_part_ref(part
)) {
1096 ret
= xpc_send_msg(&part
->channels
[ch_number
], flags
, payload
,
1097 payload_size
, XPC_N_CALL
, func
, key
);
1098 xpc_part_deref(part
);
1104 * Deliver a message to its intended recipient.
1107 xpc_deliver_msg(struct xpc_channel
*ch
)
1109 struct xpc_msg
*msg
;
1111 msg
= xpc_get_deliverable_msg(ch
);
1115 * This ref is taken to protect the payload itself from being
1116 * freed before the user is finished with it, which the user
1117 * indicates by calling xpc_initiate_received().
1119 xpc_msgqueue_ref(ch
);
1121 atomic_inc(&ch
->kthreads_active
);
1123 if (ch
->func
!= NULL
) {
1124 dev_dbg(xpc_chan
, "ch->func() called, msg=0x%p, "
1125 "msg_number=%ld, partid=%d, channel=%d\n",
1126 (void *)msg
, msg
->number
, ch
->partid
,
1129 /* deliver the message to its intended recipient */
1130 ch
->func(xpMsgReceived
, ch
->partid
, ch
->number
,
1131 &msg
->payload
, ch
->key
);
1133 dev_dbg(xpc_chan
, "ch->func() returned, msg=0x%p, "
1134 "msg_number=%ld, partid=%d, channel=%d\n",
1135 (void *)msg
, msg
->number
, ch
->partid
,
1139 atomic_dec(&ch
->kthreads_active
);
1144 * Acknowledge receipt of a delivered message.
1146 * If a message has XPC_M_INTERRUPT set, send an interrupt to the partition
1147 * that sent the message.
1149 * This function, although called by users, does not call xpc_part_ref() to
1150 * ensure that the partition infrastructure is in place. It relies on the
1151 * fact that we called xpc_msgqueue_ref() in xpc_deliver_msg().
1155 * partid - ID of partition to which the channel is connected.
1156 * ch_number - channel # message received on.
1157 * payload - pointer to the payload area allocated via
1158 * xpc_initiate_send() or xpc_initiate_send_notify().
1161 xpc_initiate_received(short partid
, int ch_number
, void *payload
)
1163 struct xpc_partition
*part
= &xpc_partitions
[partid
];
1164 struct xpc_channel
*ch
;
1165 struct xpc_msg
*msg
= XPC_MSG_ADDRESS(payload
);
1167 DBUG_ON(partid
< 0 || partid
>= xp_max_npartitions
);
1168 DBUG_ON(ch_number
< 0 || ch_number
>= part
->nchannels
);
1170 ch
= &part
->channels
[ch_number
];
1171 xpc_received_msg(ch
, msg
);
1173 /* the call to xpc_msgqueue_ref() was done by xpc_deliver_msg() */
1174 xpc_msgqueue_deref(ch
);