2 * This file is subject to the terms and conditions of the GNU General Public
3 * License. See the file "COPYING" in the main directory of this archive
6 * Copyright (c) 2004-2008 Silicon Graphics, Inc. All Rights Reserved.
10 * Cross Partition Communication (XPC) channel support.
12 * This is the part of XPC that manages the channels and
13 * sends/receives messages across them to/from other partitions.
17 #include <linux/kernel.h>
18 #include <linux/init.h>
19 #include <linux/sched.h>
20 #include <linux/cache.h>
21 #include <linux/interrupt.h>
22 #include <linux/mutex.h>
23 #include <linux/completion.h>
24 #include <asm/sn/sn_sal.h>
28 * Guarantee that the kzalloc'd memory is cacheline aligned.
/*
 * Return a pointer to L1-cacheline-aligned, zeroed memory of 'size' bytes.
 * *base receives the raw kzalloc() pointer so the caller can later kfree()
 * it (the aligned pointer may not be the allocation start).
 *
 * NOTE(review): this listing is a fragmentary extraction -- the leading
 * numbers on some lines are original source line numbers, and several
 * original lines (the NULL checks after each kzalloc, the kfree of the
 * unaligned first attempt, braces, and the first "return *base") are not
 * visible in this view.  Comments below describe only what is visible.
 */
31 xpc_kzalloc_cacheline_aligned(size_t size
, gfp_t flags
, void **base
)
33 /* see if kzalloc will give us cachline aligned memory by default */
34 *base
= kzalloc(size
, flags
);
/* already aligned?  (presumably returns *base here -- line not visible) */
38 if ((u64
)*base
== L1_CACHE_ALIGN((u64
)*base
))
43 /* nope, we'll have to do it ourselves */
/* over-allocate by one cacheline so an aligned pointer fits inside */
44 *base
= kzalloc(size
+ L1_CACHE_BYTES
, flags
);
/* hand back the first cacheline-aligned address within the allocation */
48 return (void *)L1_CACHE_ALIGN((u64
)*base
);
52 * Allocate the local message queue and the notify queue.
/*
 * Allocate the channel's local message queue (cacheline aligned) and its
 * notify queue.  On memory pressure the loop retries with progressively
 * fewer entries (nentries counts down from ch->local_nentries); if a
 * smaller size succeeds, ch->local_nentries is reduced to match under
 * ch->lock.  NOTE(review): fragmentary extraction -- the success/failure
 * return statements and some braces are not visible in this view.
 */
55 xpc_allocate_local_msgqueue(struct xpc_channel
*ch
)
57 unsigned long irq_flags
;
/* try largest queue first, shrinking on allocation failure */
61 for (nentries
= ch
->local_nentries
; nentries
> 0; nentries
--) {
63 nbytes
= nentries
* ch
->msg_size
;
64 ch
->local_msgqueue
= xpc_kzalloc_cacheline_aligned(nbytes
,
66 &ch
->local_msgqueue_base
);
67 if (ch
->local_msgqueue
== NULL
)
/* one xpc_notify entry per message slot */
70 nbytes
= nentries
* sizeof(struct xpc_notify
);
71 ch
->notify_queue
= kzalloc(nbytes
, GFP_KERNEL
);
72 if (ch
->notify_queue
== NULL
) {
/* notify queue failed: release the msgqueue and retry smaller */
73 kfree(ch
->local_msgqueue_base
);
74 ch
->local_msgqueue
= NULL
;
/* record the (possibly reduced) entry count under the channel lock */
78 spin_lock_irqsave(&ch
->lock
, irq_flags
);
79 if (nentries
< ch
->local_nentries
) {
80 dev_dbg(xpc_chan
, "nentries=%d local_nentries=%d, "
81 "partid=%d, channel=%d\n", nentries
,
82 ch
->local_nentries
, ch
->partid
, ch
->number
);
84 ch
->local_nentries
= nentries
;
86 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
/* all sizes failed -- presumably returns xpNoMemory (line not visible) */
90 dev_dbg(xpc_chan
, "can't get memory for local message queue and notify "
91 "queue, partid=%d, channel=%d\n", ch
->partid
, ch
->number
);
96 * Allocate the cached remote message queue.
/*
 * Allocate the cacheline-aligned cache of the remote partition's message
 * queue.  Mirrors xpc_allocate_local_msgqueue(): retries with fewer
 * entries on failure and shrinks ch->remote_nentries under ch->lock if a
 * smaller allocation succeeds.  NOTE(review): fragmentary extraction --
 * return statements and some braces are not visible in this view.
 */
99 xpc_allocate_remote_msgqueue(struct xpc_channel
*ch
)
101 unsigned long irq_flags
;
/* sanity: peer must have advertised a positive queue size */
105 DBUG_ON(ch
->remote_nentries
<= 0);
107 for (nentries
= ch
->remote_nentries
; nentries
> 0; nentries
--) {
109 nbytes
= nentries
* ch
->msg_size
;
110 ch
->remote_msgqueue
= xpc_kzalloc_cacheline_aligned(nbytes
,
112 &ch
->remote_msgqueue_base
);
113 if (ch
->remote_msgqueue
== NULL
)
/* success: publish the (possibly reduced) entry count under the lock */
116 spin_lock_irqsave(&ch
->lock
, irq_flags
);
117 if (nentries
< ch
->remote_nentries
) {
118 dev_dbg(xpc_chan
, "nentries=%d remote_nentries=%d, "
119 "partid=%d, channel=%d\n", nentries
,
120 ch
->remote_nentries
, ch
->partid
, ch
->number
);
122 ch
->remote_nentries
= nentries
;
124 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
/* all sizes failed -- presumably returns xpNoMemory (line not visible) */
128 dev_dbg(xpc_chan
, "can't get memory for cached remote message queue, "
129 "partid=%d, channel=%d\n", ch
->partid
, ch
->number
);
134 * Allocate message queues and other stuff associated with a channel.
136 * Note: Assumes all of the channel sizes are filled in.
/*
 * Allocate all message queues associated with a channel: the local
 * queue + notify queue, then the cached remote queue.  If the remote
 * allocation fails, the local-side allocations are rolled back.  On
 * success XPC_C_SETUP is set in ch->flags under ch->lock.
 * Returns an enum xp_retval.  NOTE(review): fragmentary extraction --
 * early-return paths and the final return are not visible in this view.
 */
138 static enum xp_retval
139 xpc_allocate_msgqueues(struct xpc_channel
*ch
)
141 unsigned long irq_flags
;
/* must not already be set up */
144 DBUG_ON(ch
->flags
& XPC_C_SETUP
);
146 ret
= xpc_allocate_local_msgqueue(ch
);
147 if (ret
!= xpSuccess
)
150 ret
= xpc_allocate_remote_msgqueue(ch
);
151 if (ret
!= xpSuccess
) {
/* roll back the local-side allocations made above */
152 kfree(ch
->local_msgqueue_base
);
153 ch
->local_msgqueue
= NULL
;
154 kfree(ch
->notify_queue
);
155 ch
->notify_queue
= NULL
;
/* mark the channel's queues as set up */
159 spin_lock_irqsave(&ch
->lock
, irq_flags
);
160 ch
->flags
|= XPC_C_SETUP
;
161 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
167 * Process a connect message from a remote partition.
169 * Note: xpc_process_connect() is expecting to be called with the
170 * spin_lock_irqsave held and will leave it locked upon return.
/*
 * Drive a channel through the connect handshake: once both sides have
 * requested an open (OPENREQUEST + ROPENREQUEST), set up the message
 * queues, send our OPENREPLY, and -- when the peer's ROPENREPLY has also
 * arrived -- mark the channel CONNECTED and start a kthread.
 *
 * Called and returns with ch->lock held (*irq_flags); the lock is
 * dropped temporarily around the queue allocation and kthread creation.
 * NOTE(review): fragmentary extraction -- early returns and some braces
 * are not visible in this view.
 */
173 xpc_process_connect(struct xpc_channel
*ch
, unsigned long *irq_flags
)
177 DBUG_ON(!spin_is_locked(&ch
->lock
));
/* need both our open request and the peer's before proceeding */
179 if (!(ch
->flags
& XPC_C_OPENREQUEST
) ||
180 !(ch
->flags
& XPC_C_ROPENREQUEST
)) {
181 /* nothing more to do for now */
184 DBUG_ON(!(ch
->flags
& XPC_C_CONNECTING
));
/* allocate msg queues if not yet done; lock must be dropped to sleep */
186 if (!(ch
->flags
& XPC_C_SETUP
)) {
187 spin_unlock_irqrestore(&ch
->lock
, *irq_flags
);
188 ret
= xpc_allocate_msgqueues(ch
);
189 spin_lock_irqsave(&ch
->lock
, *irq_flags
);
191 if (ret
!= xpSuccess
)
192 XPC_DISCONNECT_CHANNEL(ch
, ret
, irq_flags
);
/* state may have changed while the lock was dropped */
194 if (ch
->flags
& (XPC_C_CONNECTED
| XPC_C_DISCONNECTING
))
197 DBUG_ON(!(ch
->flags
& XPC_C_SETUP
));
198 DBUG_ON(ch
->local_msgqueue
== NULL
);
199 DBUG_ON(ch
->remote_msgqueue
== NULL
);
/* tell the peer we accept, exactly once */
202 if (!(ch
->flags
& XPC_C_OPENREPLY
)) {
203 ch
->flags
|= XPC_C_OPENREPLY
;
204 xpc_IPI_send_openreply(ch
, irq_flags
);
/* wait for the peer's reply before declaring the channel connected */
207 if (!(ch
->flags
& XPC_C_ROPENREPLY
))
210 DBUG_ON(ch
->remote_msgqueue_pa
== 0);
212 ch
->flags
= (XPC_C_CONNECTED
| XPC_C_SETUP
); /* clear all else */
214 dev_info(xpc_chan
, "channel %d to partition %d connected\n",
215 ch
->number
, ch
->partid
);
/* kthread creation may sleep -- drop the lock around it */
217 spin_unlock_irqrestore(&ch
->lock
, *irq_flags
);
218 xpc_create_kthreads(ch
, 1, 0);
219 spin_lock_irqsave(&ch
->lock
, *irq_flags
);
223 * Notify those who wanted to be notified upon delivery of their message.
/*
 * Notify those who wanted to be notified upon delivery of their message:
 * walk notify_queue entries from w_remote_GP.get up to (but not
 * including) 'put', and for each entry whose type is atomically claimed
 * via cmpxchg, invoke the registered notify->func() callback with
 * 'reason'.  The cmpxchg guards against racing with another path doing
 * the same notification.  NOTE(review): fragmentary extraction -- the
 * callback's trailing arguments and some braces are not visible; also
 * "¬ify" below is a mojibake of "&notify" introduced by extraction.
 */
226 xpc_notify_senders(struct xpc_channel
*ch
, enum xp_retval reason
, s64 put
)
228 struct xpc_notify
*notify
;
/* start one before the window so ++get lands on the first entry */
230 s64 get
= ch
->w_remote_GP
.get
- 1;
232 while (++get
< put
&& atomic_read(&ch
->n_to_notify
) > 0) {
/* queue index wraps modulo the number of local entries */
234 notify
= &ch
->notify_queue
[get
% ch
->local_nentries
];
237 * See if the notify entry indicates it was associated with
238 * a message who's sender wants to be notified. It is possible
239 * that it is, but someone else is doing or has done the
242 notify_type
= notify
->type
;
/* atomically claim the entry; skip if empty or already claimed */
243 if (notify_type
== 0 ||
244 cmpxchg(¬ify
->type
, notify_type
, 0) != notify_type
) {
248 DBUG_ON(notify_type
!= XPC_N_CALL
);
250 atomic_dec(&ch
->n_to_notify
);
252 if (notify
->func
!= NULL
) {
253 dev_dbg(xpc_chan
, "notify->func() called, notify=0x%p, "
254 "msg_number=%ld, partid=%d, channel=%d\n",
255 (void *)notify
, get
, ch
->partid
, ch
->number
);
/* invoke the sender's registered delivery callback */
257 notify
->func(reason
, ch
->partid
, ch
->number
,
260 dev_dbg(xpc_chan
, "notify->func() returned, "
261 "notify=0x%p, msg_number=%ld, partid=%d, "
262 "channel=%d\n", (void *)notify
, get
,
263 ch
->partid
, ch
->number
);
269 * Free up message queues and other stuff that were allocated for the specified
272 * Note: ch->reason and ch->reason_line are left set for debugging purposes,
273 * they're cleared when XPC_C_DISCONNECTED is cleared.
/*
 * Free the message queues and reset all per-channel queue bookkeeping
 * for a channel being torn down.  Called with ch->lock held and only
 * after all notifications have drained (n_to_notify == 0).  The actual
 * kfree()s happen only if the queues were set up (XPC_C_SETUP), which
 * is cleared here.  ch->reason/reason_line are deliberately left set
 * for debugging (per the file's comment above this function).
 */
276 xpc_free_msgqueues(struct xpc_channel
*ch
)
278 DBUG_ON(!spin_is_locked(&ch
->lock
));
279 DBUG_ON(atomic_read(&ch
->n_to_notify
) != 0);
/* forget the peer's queue physical address */
281 ch
->remote_msgqueue_pa
= 0;
/* reset negotiated sizes and kthread limits */
285 ch
->local_nentries
= 0;
286 ch
->remote_nentries
= 0;
287 ch
->kthreads_assigned_limit
= 0;
288 ch
->kthreads_idle_limit
= 0;
/* zero all get/put pointers (local, remote, and working copies) */
290 ch
->local_GP
->get
= 0;
291 ch
->local_GP
->put
= 0;
292 ch
->remote_GP
.get
= 0;
293 ch
->remote_GP
.put
= 0;
294 ch
->w_local_GP
.get
= 0;
295 ch
->w_local_GP
.put
= 0;
296 ch
->w_remote_GP
.get
= 0;
297 ch
->w_remote_GP
.put
= 0;
298 ch
->next_msg_to_pull
= 0;
/* release the memory only if it was actually allocated */
300 if (ch
->flags
& XPC_C_SETUP
) {
301 ch
->flags
&= ~XPC_C_SETUP
;
303 dev_dbg(xpc_chan
, "ch->flags=0x%x, partid=%d, channel=%d\n",
304 ch
->flags
, ch
->partid
, ch
->number
);
/* free via the *_base pointers (the aligned pointers may be offset) */
306 kfree(ch
->local_msgqueue_base
);
307 ch
->local_msgqueue
= NULL
;
308 kfree(ch
->remote_msgqueue_base
);
309 ch
->remote_msgqueue
= NULL
;
310 kfree(ch
->notify_queue
);
311 ch
->notify_queue
= NULL
;
316 * spin_lock_irqsave() is expected to be held on entry.
/*
 * Drive a disconnecting channel through the close handshake and final
 * teardown: wait for activity to settle, exchange CLOSEREPLYs with the
 * peer (skipped if the partition is deactivating and still engaged),
 * flush pending sender notifications, make the disconnect callout,
 * free the message queues, and mark the channel DISCONNECTED (keeping
 * only XPC_C_WDISCONNECT).  Finally either complete the waiter or
 * replay any IPI flags that were delayed during the disconnect.
 *
 * Called and returns with ch->lock held (*irq_flags).  NOTE(review):
 * fragmentary extraction -- early returns and some braces not visible.
 */
319 xpc_process_disconnect(struct xpc_channel
*ch
, unsigned long *irq_flags
)
321 struct xpc_partition
*part
= &xpc_partitions
[ch
->partid
];
322 u32 channel_was_connected
= (ch
->flags
& XPC_C_WASCONNECTED
);
324 DBUG_ON(!spin_is_locked(&ch
->lock
));
/* nothing to do unless a disconnect is in progress */
326 if (!(ch
->flags
& XPC_C_DISCONNECTING
))
329 DBUG_ON(!(ch
->flags
& XPC_C_CLOSEREQUEST
));
331 /* make sure all activity has settled down first */
333 if (atomic_read(&ch
->kthreads_assigned
) > 0 ||
334 atomic_read(&ch
->references
) > 0) {
337 DBUG_ON((ch
->flags
& XPC_C_CONNECTEDCALLOUT_MADE
) &&
338 !(ch
->flags
& XPC_C_DISCONNECTINGCALLOUT_MADE
));
340 if (part
->act_state
== XPC_P_DEACTIVATING
) {
341 /* can't proceed until the other side disengages from us */
342 if (xpc_partition_engaged(1UL << ch
->partid
))
347 /* as long as the other side is up do the full protocol */
/* need the peer's close request before replying */
349 if (!(ch
->flags
& XPC_C_RCLOSEREQUEST
))
/* send our CLOSEREPLY exactly once */
352 if (!(ch
->flags
& XPC_C_CLOSEREPLY
)) {
353 ch
->flags
|= XPC_C_CLOSEREPLY
;
354 xpc_IPI_send_closereply(ch
, irq_flags
);
/* and wait for the peer's CLOSEREPLY */
357 if (!(ch
->flags
& XPC_C_RCLOSEREPLY
))
361 /* wake those waiting for notify completion */
362 if (atomic_read(&ch
->n_to_notify
) > 0) {
363 /* >>> we do callout while holding ch->lock */
364 xpc_notify_senders(ch
, ch
->reason
, ch
->w_local_GP
.put
);
367 /* both sides are disconnected now */
/* the disconnect callout may sleep -- drop the lock around it */
369 if (ch
->flags
& XPC_C_DISCONNECTINGCALLOUT_MADE
) {
370 spin_unlock_irqrestore(&ch
->lock
, *irq_flags
);
371 xpc_disconnect_callout(ch
, xpDisconnected
);
372 spin_lock_irqsave(&ch
->lock
, *irq_flags
);
375 /* it's now safe to free the channel's message queues */
376 xpc_free_msgqueues(ch
);
378 /* mark disconnected, clear all other flags except XPC_C_WDISCONNECT */
379 ch
->flags
= (XPC_C_DISCONNECTED
| (ch
->flags
& XPC_C_WDISCONNECT
));
381 atomic_dec(&part
->nchannels_active
);
383 if (channel_was_connected
) {
384 dev_info(xpc_chan
, "channel %d to partition %d disconnected, "
385 "reason=%d\n", ch
->number
, ch
->partid
, ch
->reason
);
/* release a waiter in xpc_disconnect_wait(), if any */
388 if (ch
->flags
& XPC_C_WDISCONNECT
) {
389 /* we won't lose the CPU since we're holding ch->lock */
390 complete(&ch
->wdisconnect_wait
);
391 } else if (ch
->delayed_IPI_flags
) {
392 if (part
->act_state
!= XPC_P_DEACTIVATING
) {
393 /* time to take action on any delayed IPI flags */
394 spin_lock(&part
->IPI_lock
);
395 XPC_SET_IPI_FLAGS(part
->local_IPI_amo
, ch
->number
,
396 ch
->delayed_IPI_flags
);
397 spin_unlock(&part
->IPI_lock
);
399 ch
->delayed_IPI_flags
= 0;
404 * Process a change in the channel's remote connection state.
/*
 * Process a change in the channel's remote connection state, delivered
 * as IPI flags (CLOSEREQUEST / CLOSEREPLY / OPENREQUEST / OPENREPLY)
 * for channel 'ch_number' of partition 'part'.  Each flag section
 * validates the channel's current state, updates ch->flags, and drives
 * xpc_process_disconnect()/xpc_process_connect() as appropriate.
 * All state manipulation happens under ch->lock.
 *
 * NOTE(review): fragmentary extraction -- the IPI_flags parameter
 * declaration, several macro-argument continuation lines, early-return
 * paths, and many braces are not visible in this view; the comments
 * below describe only the visible logic.
 */
407 xpc_process_openclose_IPI(struct xpc_partition
*part
, int ch_number
,
410 unsigned long irq_flags
;
411 struct xpc_openclose_args
*args
=
412 &part
->remote_openclose_args
[ch_number
];
413 struct xpc_channel
*ch
= &part
->channels
[ch_number
];
414 enum xp_retval reason
;
416 spin_lock_irqsave(&ch
->lock
, irq_flags
);
/* a thread is waiting on this disconnect -- defer the IPI flags */
420 if ((ch
->flags
& XPC_C_DISCONNECTED
) &&
421 (ch
->flags
& XPC_C_WDISCONNECT
)) {
423 * Delay processing IPI flags until thread waiting disconnect
424 * has had a chance to see that the channel is disconnected.
426 ch
->delayed_IPI_flags
|= IPI_flags
;
427 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
/* ---- peer requested a close ---- */
431 if (IPI_flags
& XPC_IPI_CLOSEREQUEST
) {
433 dev_dbg(xpc_chan
, "XPC_IPI_CLOSEREQUEST (reason=%d) received "
434 "from partid=%d, channel=%d\n", args
->reason
,
435 ch
->partid
, ch
->number
);
438 * If RCLOSEREQUEST is set, we're probably waiting for
439 * RCLOSEREPLY. We should find it and a ROPENREQUEST packed
440 * with this RCLOSEREQUEST in the IPI_flags.
443 if (ch
->flags
& XPC_C_RCLOSEREQUEST
) {
444 DBUG_ON(!(ch
->flags
& XPC_C_DISCONNECTING
));
445 DBUG_ON(!(ch
->flags
& XPC_C_CLOSEREQUEST
));
446 DBUG_ON(!(ch
->flags
& XPC_C_CLOSEREPLY
));
447 DBUG_ON(ch
->flags
& XPC_C_RCLOSEREPLY
);
/* consume the CLOSEREPLY packed in with this CLOSEREQUEST */
449 DBUG_ON(!(IPI_flags
& XPC_IPI_CLOSEREPLY
));
450 IPI_flags
&= ~XPC_IPI_CLOSEREPLY
;
451 ch
->flags
|= XPC_C_RCLOSEREPLY
;
453 /* both sides have finished disconnecting */
454 xpc_process_disconnect(ch
, &irq_flags
);
455 DBUG_ON(!(ch
->flags
& XPC_C_DISCONNECTED
));
459 if (ch
->flags
& XPC_C_DISCONNECTED
) {
/* no new open pending: stash the close request for later */
460 if (!(IPI_flags
& XPC_IPI_OPENREQUEST
)) {
461 if ((XPC_GET_IPI_FLAGS(part
->local_IPI_amo
,
463 XPC_IPI_OPENREQUEST
)) {
465 DBUG_ON(ch
->delayed_IPI_flags
!= 0);
466 spin_lock(&part
->IPI_lock
);
467 XPC_SET_IPI_FLAGS(part
->local_IPI_amo
,
469 XPC_IPI_CLOSEREQUEST
);
470 spin_unlock(&part
->IPI_lock
);
472 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
/* implicitly re-opening: reset state for a fresh connection */
476 XPC_SET_REASON(ch
, 0, 0);
477 ch
->flags
&= ~XPC_C_DISCONNECTED
;
479 atomic_inc(&part
->nchannels_active
);
480 ch
->flags
|= (XPC_C_CONNECTING
| XPC_C_ROPENREQUEST
);
/* discard any open flags packed in with this close */
483 IPI_flags
&= ~(XPC_IPI_OPENREQUEST
| XPC_IPI_OPENREPLY
);
486 * The meaningful CLOSEREQUEST connection state fields are:
487 * reason = reason connection is to be closed
490 ch
->flags
|= XPC_C_RCLOSEREQUEST
;
/* begin disconnecting with a sanitized copy of the peer's reason */
492 if (!(ch
->flags
& XPC_C_DISCONNECTING
)) {
493 reason
= args
->reason
;
494 if (reason
<= xpSuccess
|| reason
> xpUnknownReason
)
495 reason
= xpUnknownReason
;
496 else if (reason
== xpUnregistering
)
497 reason
= xpOtherUnregistering
;
499 XPC_DISCONNECT_CHANNEL(ch
, reason
, &irq_flags
);
501 DBUG_ON(IPI_flags
& XPC_IPI_CLOSEREPLY
);
502 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
506 xpc_process_disconnect(ch
, &irq_flags
);
/* ---- peer acknowledged our close ---- */
509 if (IPI_flags
& XPC_IPI_CLOSEREPLY
) {
511 dev_dbg(xpc_chan
, "XPC_IPI_CLOSEREPLY received from partid=%d,"
512 " channel=%d\n", ch
->partid
, ch
->number
);
514 if (ch
->flags
& XPC_C_DISCONNECTED
) {
515 DBUG_ON(part
->act_state
!= XPC_P_DEACTIVATING
);
516 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
520 DBUG_ON(!(ch
->flags
& XPC_C_CLOSEREQUEST
));
/* reply without the peer's request yet seen: defer it */
522 if (!(ch
->flags
& XPC_C_RCLOSEREQUEST
)) {
523 if ((XPC_GET_IPI_FLAGS(part
->local_IPI_amo
, ch_number
)
524 & XPC_IPI_CLOSEREQUEST
)) {
526 DBUG_ON(ch
->delayed_IPI_flags
!= 0);
527 spin_lock(&part
->IPI_lock
);
528 XPC_SET_IPI_FLAGS(part
->local_IPI_amo
,
531 spin_unlock(&part
->IPI_lock
);
533 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
537 ch
->flags
|= XPC_C_RCLOSEREPLY
;
539 if (ch
->flags
& XPC_C_CLOSEREPLY
) {
540 /* both sides have finished disconnecting */
541 xpc_process_disconnect(ch
, &irq_flags
);
/* ---- peer requested an open ---- */
545 if (IPI_flags
& XPC_IPI_OPENREQUEST
) {
547 dev_dbg(xpc_chan
, "XPC_IPI_OPENREQUEST (msg_size=%d, "
548 "local_nentries=%d) received from partid=%d, "
549 "channel=%d\n", args
->msg_size
, args
->local_nentries
,
550 ch
->partid
, ch
->number
);
/* ignore opens while deactivating or if one is already seen */
552 if (part
->act_state
== XPC_P_DEACTIVATING
||
553 (ch
->flags
& XPC_C_ROPENREQUEST
)) {
554 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
/* disconnect still in progress: defer the open request */
558 if (ch
->flags
& (XPC_C_DISCONNECTING
| XPC_C_WDISCONNECT
)) {
559 ch
->delayed_IPI_flags
|= XPC_IPI_OPENREQUEST
;
560 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
563 DBUG_ON(!(ch
->flags
& (XPC_C_DISCONNECTED
|
564 XPC_C_OPENREQUEST
)));
565 DBUG_ON(ch
->flags
& (XPC_C_ROPENREQUEST
| XPC_C_ROPENREPLY
|
566 XPC_C_OPENREPLY
| XPC_C_CONNECTED
));
569 * The meaningful OPENREQUEST connection state fields are:
570 * msg_size = size of channel's messages in bytes
571 * local_nentries = remote partition's local_nentries
573 if (args
->msg_size
== 0 || args
->local_nentries
== 0) {
574 /* assume OPENREQUEST was delayed by mistake */
575 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
579 ch
->flags
|= (XPC_C_ROPENREQUEST
| XPC_C_CONNECTING
);
580 ch
->remote_nentries
= args
->local_nentries
;
/* both sides must agree on the message size */
582 if (ch
->flags
& XPC_C_OPENREQUEST
) {
583 if (args
->msg_size
!= ch
->msg_size
) {
584 XPC_DISCONNECT_CHANNEL(ch
, xpUnequalMsgSizes
,
586 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
590 ch
->msg_size
= args
->msg_size
;
592 XPC_SET_REASON(ch
, 0, 0);
593 ch
->flags
&= ~XPC_C_DISCONNECTED
;
595 atomic_inc(&part
->nchannels_active
);
598 xpc_process_connect(ch
, &irq_flags
);
/* ---- peer accepted our open ---- */
601 if (IPI_flags
& XPC_IPI_OPENREPLY
) {
603 dev_dbg(xpc_chan
, "XPC_IPI_OPENREPLY (local_msgqueue_pa=0x%lx, "
604 "local_nentries=%d, remote_nentries=%d) received from "
605 "partid=%d, channel=%d\n", args
->local_msgqueue_pa
,
606 args
->local_nentries
, args
->remote_nentries
,
607 ch
->partid
, ch
->number
);
609 if (ch
->flags
& (XPC_C_DISCONNECTING
| XPC_C_DISCONNECTED
)) {
610 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
/* a reply without our request outstanding is a protocol error */
613 if (!(ch
->flags
& XPC_C_OPENREQUEST
)) {
614 XPC_DISCONNECT_CHANNEL(ch
, xpOpenCloseError
,
616 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
620 DBUG_ON(!(ch
->flags
& XPC_C_ROPENREQUEST
));
621 DBUG_ON(ch
->flags
& XPC_C_CONNECTED
);
624 * The meaningful OPENREPLY connection state fields are:
625 * local_msgqueue_pa = physical address of remote
626 * partition's local_msgqueue
627 * local_nentries = remote partition's local_nentries
628 * remote_nentries = remote partition's remote_nentries
630 DBUG_ON(args
->local_msgqueue_pa
== 0);
631 DBUG_ON(args
->local_nentries
== 0);
632 DBUG_ON(args
->remote_nentries
== 0);
634 ch
->flags
|= XPC_C_ROPENREPLY
;
635 ch
->remote_msgqueue_pa
= args
->local_msgqueue_pa
;
/* shrink our view of either queue if the peer advertised less */
637 if (args
->local_nentries
< ch
->remote_nentries
) {
638 dev_dbg(xpc_chan
, "XPC_IPI_OPENREPLY: new "
639 "remote_nentries=%d, old remote_nentries=%d, "
640 "partid=%d, channel=%d\n",
641 args
->local_nentries
, ch
->remote_nentries
,
642 ch
->partid
, ch
->number
);
644 ch
->remote_nentries
= args
->local_nentries
;
646 if (args
->remote_nentries
< ch
->local_nentries
) {
647 dev_dbg(xpc_chan
, "XPC_IPI_OPENREPLY: new "
648 "local_nentries=%d, old local_nentries=%d, "
649 "partid=%d, channel=%d\n",
650 args
->remote_nentries
, ch
->local_nentries
,
651 ch
->partid
, ch
->number
);
653 ch
->local_nentries
= args
->remote_nentries
;
656 xpc_process_connect(ch
, &irq_flags
);
659 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
663 * Attempt to establish a channel connection to a remote partition.
665 static enum xp_retval
/*
 * Attempt to establish a channel connection to a remote partition using
 * the information recorded at registration time: copy the registerer's
 * limits/callback/key/queue size into the channel, validate message-size
 * agreement if the peer has already requested an open, then send our
 * OPENREQUEST and run xpc_process_connect().  Returns an enum xp_retval
 * (xpUnregistered / xpUnequalMsgSizes visible; other returns not
 * visible in this view).
 *
 * NOTE(review): fragmentary extraction -- "®istration" below is a
 * mojibake of "&registration"; early returns and braces not all visible.
 */
666 xpc_connect_channel(struct xpc_channel
*ch
)
668 unsigned long irq_flags
;
669 struct xpc_registration
*registration
= &xpc_registrations
[ch
->number
];
/* bail (presumably retry later) if the registration mutex is busy */
671 if (mutex_trylock(®istration
->mutex
) == 0)
674 if (!XPC_CHANNEL_REGISTERED(ch
->number
)) {
675 mutex_unlock(®istration
->mutex
);
676 return xpUnregistered
;
679 spin_lock_irqsave(&ch
->lock
, irq_flags
);
681 DBUG_ON(ch
->flags
& XPC_C_CONNECTED
);
682 DBUG_ON(ch
->flags
& XPC_C_OPENREQUEST
);
684 if (ch
->flags
& XPC_C_DISCONNECTING
) {
685 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
686 mutex_unlock(®istration
->mutex
);
690 /* add info from the channel connect registration to the channel */
692 ch
->kthreads_assigned_limit
= registration
->assigned_limit
;
693 ch
->kthreads_idle_limit
= registration
->idle_limit
;
694 DBUG_ON(atomic_read(&ch
->kthreads_assigned
) != 0);
695 DBUG_ON(atomic_read(&ch
->kthreads_idle
) != 0);
696 DBUG_ON(atomic_read(&ch
->kthreads_active
) != 0);
698 ch
->func
= registration
->func
;
699 DBUG_ON(registration
->func
== NULL
);
700 ch
->key
= registration
->key
;
702 ch
->local_nentries
= registration
->nentries
;
/* peer already asked to open: our registered size must match its */
704 if (ch
->flags
& XPC_C_ROPENREQUEST
) {
705 if (registration
->msg_size
!= ch
->msg_size
) {
706 /* the local and remote sides aren't the same */
709 * Because XPC_DISCONNECT_CHANNEL() can block we're
710 * forced to up the registration sema before we unlock
711 * the channel lock. But that's okay here because we're
712 * done with the part that required the registration
713 * sema. XPC_DISCONNECT_CHANNEL() requires that the
714 * channel lock be locked and will unlock and relock
715 * the channel lock as needed.
717 mutex_unlock(®istration
->mutex
);
718 XPC_DISCONNECT_CHANNEL(ch
, xpUnequalMsgSizes
,
720 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
721 return xpUnequalMsgSizes
;
/* no peer request yet: adopt the registered message size */
724 ch
->msg_size
= registration
->msg_size
;
726 XPC_SET_REASON(ch
, 0, 0);
727 ch
->flags
&= ~XPC_C_DISCONNECTED
;
729 atomic_inc(&xpc_partitions
[ch
->partid
].nchannels_active
);
732 mutex_unlock(®istration
->mutex
);
734 /* initiate the connection */
736 ch
->flags
|= (XPC_C_OPENREQUEST
| XPC_C_CONNECTING
);
737 xpc_IPI_send_openrequest(ch
, &irq_flags
);
739 xpc_process_connect(ch
, &irq_flags
);
741 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
747 * Clear some of the msg flags in the local message queue.
/*
 * Clear the flags of local-queue messages the peer has now consumed:
 * walk slots from w_remote_GP.get up to remote_GP.get (indices wrap
 * modulo local_nentries).  NOTE(review): fragmentary extraction -- the
 * msg declaration, the per-slot flag clear, and the do{ opener are not
 * visible in this view; only the address computation and loop bound are.
 */
750 xpc_clear_local_msgqueue_flags(struct xpc_channel
*ch
)
755 get
= ch
->w_remote_GP
.get
;
/* locate the message slot for index 'get' within the local queue */
757 msg
= (struct xpc_msg
*)((u64
)ch
->local_msgqueue
+
758 (get
% ch
->local_nentries
) *
761 } while (++get
< ch
->remote_GP
.get
);
765 * Clear some of the msg flags in the remote message queue.
/*
 * Clear the flags of cached-remote-queue messages already delivered:
 * walk slots from w_remote_GP.put up to remote_GP.put (indices wrap
 * modulo remote_nentries).  NOTE(review): fragmentary extraction -- the
 * msg declaration, the per-slot flag clear, and the do{ opener are not
 * visible in this view; only the address computation and loop bound are.
 */
768 xpc_clear_remote_msgqueue_flags(struct xpc_channel
*ch
)
773 put
= ch
->w_remote_GP
.put
;
/* locate the message slot for index 'put' within the cached queue */
775 msg
= (struct xpc_msg
*)((u64
)ch
->remote_msgqueue
+
776 (put
% ch
->remote_nentries
) *
779 } while (++put
< ch
->remote_GP
.put
);
/*
 * Handle a message-related IPI for one channel: snapshot the peer's
 * get/put pointers from part->remote_GPs, and if they moved (a) notify
 * senders whose messages the peer has received and recycle those local
 * slots, waking any allocator waiters, and (b) recycle delivered remote
 * slots and activate kthreads to deliver newly arrived messages.
 * Holds a msgqueue reference across the work.  NOTE(review):
 * fragmentary extraction -- early returns and some argument
 * continuation lines are not visible in this view.
 */
783 xpc_process_msg_IPI(struct xpc_partition
*part
, int ch_number
)
785 struct xpc_channel
*ch
= &part
->channels
[ch_number
];
/* pull the latest get/put values published by the remote partition */
788 ch
->remote_GP
= part
->remote_GPs
[ch_number
];
790 /* See what, if anything, has changed for each connected channel */
792 xpc_msgqueue_ref(ch
);
794 if (ch
->w_remote_GP
.get
== ch
->remote_GP
.get
&&
795 ch
->w_remote_GP
.put
== ch
->remote_GP
.put
) {
796 /* nothing changed since GPs were last pulled */
797 xpc_msgqueue_deref(ch
);
801 if (!(ch
->flags
& XPC_C_CONNECTED
)) {
802 xpc_msgqueue_deref(ch
);
807 * First check to see if messages recently sent by us have been
808 * received by the other side. (The remote GET value will have
809 * changed since we last looked at it.)
812 if (ch
->w_remote_GP
.get
!= ch
->remote_GP
.get
) {
815 * We need to notify any senders that want to be notified
816 * that their sent messages have been received by their
817 * intended recipients. We need to do this before updating
818 * w_remote_GP.get so that we don't allocate the same message
819 * queue entries prematurely (see xpc_allocate_msg()).
821 if (atomic_read(&ch
->n_to_notify
) > 0) {
823 * Notify senders that messages sent have been
824 * received and delivered by the other side.
826 xpc_notify_senders(ch
, xpMsgDelivered
,
831 * Clear msg->flags in previously sent messages, so that
832 * they're ready for xpc_allocate_msg().
834 xpc_clear_local_msgqueue_flags(ch
);
836 ch
->w_remote_GP
.get
= ch
->remote_GP
.get
;
838 dev_dbg(xpc_chan
, "w_remote_GP.get changed to %ld, partid=%d, "
839 "channel=%d\n", ch
->w_remote_GP
.get
, ch
->partid
,
843 * If anyone was waiting for message queue entries to become
844 * available, wake them up.
846 if (atomic_read(&ch
->n_on_msg_allocate_wq
) > 0)
847 wake_up(&ch
->msg_allocate_wq
);
851 * Now check for newly sent messages by the other side. (The remote
852 * PUT value will have changed since we last looked at it.)
855 if (ch
->w_remote_GP
.put
!= ch
->remote_GP
.put
) {
857 * Clear msg->flags in previously received messages, so that
858 * they're ready for xpc_get_deliverable_msg().
860 xpc_clear_remote_msgqueue_flags(ch
);
862 ch
->w_remote_GP
.put
= ch
->remote_GP
.put
;
864 dev_dbg(xpc_chan
, "w_remote_GP.put changed to %ld, partid=%d, "
865 "channel=%d\n", ch
->w_remote_GP
.put
, ch
->partid
,
/* messages the peer has sent that we have not yet pulled */
868 nmsgs_sent
= ch
->w_remote_GP
.put
- ch
->w_local_GP
.get
;
869 if (nmsgs_sent
> 0) {
870 dev_dbg(xpc_chan
, "msgs waiting to be copied and "
871 "delivered=%d, partid=%d, channel=%d\n",
872 nmsgs_sent
, ch
->partid
, ch
->number
);
/* only deliver once the connected callout has been made */
874 if (ch
->flags
& XPC_C_CONNECTEDCALLOUT_MADE
)
875 xpc_activate_kthreads(ch
, nmsgs_sent
);
879 xpc_msgqueue_deref(ch
);
/*
 * Channel manager main pass for one partition: fetch the pending IPI
 * AMO flags, then for every channel dispatch open/close IPI handling,
 * drive any in-progress disconnect, initiate connects for registered
 * but unconnected channels, and finally dispatch message IPIs.
 * NOTE(review): fragmentary extraction -- the ch_number/ch_flags
 * declarations, continue statements, and some braces are not visible
 * in this view.
 */
883 xpc_process_channel_activity(struct xpc_partition
*part
)
885 unsigned long irq_flags
;
886 u64 IPI_amo
, IPI_flags
;
887 struct xpc_channel
*ch
;
/* atomically fetch-and-clear the partition's pending IPI flags */
891 IPI_amo
= xpc_get_IPI_flags(part
);
894 * Initiate channel connections for registered channels.
896 * For each connected channel that has pending messages activate idle
897 * kthreads and/or create new kthreads as needed.
900 for (ch_number
= 0; ch_number
< part
->nchannels
; ch_number
++) {
901 ch
= &part
->channels
[ch_number
];
904 * Process any open or close related IPI flags, and then deal
905 * with connecting or disconnecting the channel as required.
908 IPI_flags
= XPC_GET_IPI_FLAGS(IPI_amo
, ch_number
);
910 if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_flags
))
911 xpc_process_openclose_IPI(part
, ch_number
, IPI_flags
);
913 ch_flags
= ch
->flags
; /* need an atomic snapshot of flags */
915 if (ch_flags
& XPC_C_DISCONNECTING
) {
916 spin_lock_irqsave(&ch
->lock
, irq_flags
);
917 xpc_process_disconnect(ch
, &irq_flags
);
918 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
/* don't start anything new while the partition is going down */
922 if (part
->act_state
== XPC_P_DEACTIVATING
)
925 if (!(ch_flags
& XPC_C_CONNECTED
)) {
926 if (!(ch_flags
& XPC_C_OPENREQUEST
)) {
927 DBUG_ON(ch_flags
& XPC_C_SETUP
);
/* start a connect; failure is retried on a later pass */
928 (void)xpc_connect_channel(ch
);
/* open already requested: push the handshake along */
930 spin_lock_irqsave(&ch
->lock
, irq_flags
);
931 xpc_process_connect(ch
, &irq_flags
);
932 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
938 * Process any message related IPI flags, this may involve the
939 * activation of kthreads to deliver any pending messages sent
940 * from the other partition.
943 if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_flags
))
944 xpc_process_msg_IPI(part
, ch_number
);
949 * XPC's heartbeat code calls this function to inform XPC that a partition is
950 * going down. XPC responds by tearing down the XPartition Communication
951 * infrastructure used for the just downed partition.
953 * XPC's heartbeat code will never call this function and xpc_partition_up()
954 * at the same time. Nor will it ever make multiple calls to either function
/*
 * Tear down channel infrastructure for a partition that is going down:
 * disconnect every channel with the given 'reason', then wake the
 * channel manager to finish the teardown.  Bails out early if the
 * partition's infrastructure isn't set up (xpc_part_ref fails).
 * NOTE(review): fragmentary extraction -- the ch_number declaration,
 * the early return, and closing braces are not visible in this view.
 */
958 xpc_partition_going_down(struct xpc_partition
*part
, enum xp_retval reason
)
960 unsigned long irq_flags
;
962 struct xpc_channel
*ch
;
964 dev_dbg(xpc_chan
, "deactivating partition %d, reason=%d\n",
965 XPC_PARTID(part
), reason
);
967 if (!xpc_part_ref(part
)) {
968 /* infrastructure for this partition isn't currently set up */
972 /* disconnect channels associated with the partition going down */
974 for (ch_number
= 0; ch_number
< part
->nchannels
; ch_number
++) {
975 ch
= &part
->channels
[ch_number
];
/* hold a msgqueue reference across the disconnect */
977 xpc_msgqueue_ref(ch
);
978 spin_lock_irqsave(&ch
->lock
, irq_flags
);
980 XPC_DISCONNECT_CHANNEL(ch
, reason
, &irq_flags
);
982 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
983 xpc_msgqueue_deref(ch
);
/* let the channel manager complete the disconnects */
986 xpc_wakeup_channel_mgr(part
);
988 xpc_part_deref(part
);
992 * Called by XP at the time of channel connection registration to cause
993 * XPC to establish connections to all currently active partitions.
/*
 * Called by XP when a channel is registered: for every active partition
 * wake its channel manager so it will attempt to connect the newly
 * registered channel (the manager itself calls xpc_connect_channel()).
 * NOTE(review): fragmentary extraction -- the partid declaration and
 * closing braces are not visible in this view; 'ch' is assigned but its
 * use here is not visible.
 */
996 xpc_initiate_connect(int ch_number
)
999 struct xpc_partition
*part
;
1000 struct xpc_channel
*ch
;
1002 DBUG_ON(ch_number
< 0 || ch_number
>= XPC_MAX_NCHANNELS
);
1004 for (partid
= 0; partid
< xp_max_npartitions
; partid
++) {
1005 part
= &xpc_partitions
[partid
];
/* only touch partitions whose infrastructure is up */
1007 if (xpc_part_ref(part
)) {
1008 ch
= &part
->channels
[ch_number
];
1011 * Initiate the establishment of a connection on the
1012 * newly registered channel to the remote partition.
1014 xpc_wakeup_channel_mgr(part
);
1015 xpc_part_deref(part
);
/*
 * Invoke the registerer's callback (ch->func) with reason xpConnected
 * to announce that the channel is now connected.  The callback is
 * passed the channel's local_nentries (cast through u64 to a pointer)
 * and the registerer's key.  No-op if no callback was registered.
 */
1021 xpc_connected_callout(struct xpc_channel
*ch
)
1023 /* let the registerer know that a connection has been established */
1025 if (ch
->func
!= NULL
) {
1026 dev_dbg(xpc_chan
, "ch->func() called, reason=xpConnected, "
1027 "partid=%d, channel=%d\n", ch
->partid
, ch
->number
);
1029 ch
->func(xpConnected
, ch
->partid
, ch
->number
,
1030 (void *)(u64
)ch
->local_nentries
, ch
->key
);
1032 dev_dbg(xpc_chan
, "ch->func() returned, reason=xpConnected, "
1033 "partid=%d, channel=%d\n", ch
->partid
, ch
->number
);
1038 * Called by XP at the time of channel connection unregistration to cause
1039 * XPC to teardown all current connections for the specified channel.
1041 * Before returning xpc_initiate_disconnect() will wait until all connections
1042 * on the specified channel have been closed/torndown. So the caller can be
1043 * assured that they will not be receiving any more callouts from XPC to the
1044 * function they registered via xpc_connect().
1048 * ch_number - channel # to unregister.
/*
 * Called by XP at channel unregistration: for every active partition,
 * mark the channel XPC_C_WDISCONNECT and initiate its disconnect with
 * reason xpUnregistering, then wait (xpc_disconnect_wait) until every
 * connection on the channel has been torn down before returning.
 * NOTE(review): fragmentary extraction -- the partid declaration, the
 * XPC_DISCONNECT_CHANNEL trailing argument, and closing braces are not
 * visible in this view.
 */
1051 xpc_initiate_disconnect(int ch_number
)
1053 unsigned long irq_flags
;
1055 struct xpc_partition
*part
;
1056 struct xpc_channel
*ch
;
1058 DBUG_ON(ch_number
< 0 || ch_number
>= XPC_MAX_NCHANNELS
);
1060 /* initiate the channel disconnect for every active partition */
1061 for (partid
= 0; partid
< xp_max_npartitions
; partid
++) {
1062 part
= &xpc_partitions
[partid
];
1064 if (xpc_part_ref(part
)) {
1065 ch
= &part
->channels
[ch_number
];
1066 xpc_msgqueue_ref(ch
);
1068 spin_lock_irqsave(&ch
->lock
, irq_flags
);
1070 if (!(ch
->flags
& XPC_C_DISCONNECTED
)) {
/* WDISCONNECT makes xpc_process_disconnect() signal us when done */
1071 ch
->flags
|= XPC_C_WDISCONNECT
;
1073 XPC_DISCONNECT_CHANNEL(ch
, xpUnregistering
,
1077 spin_unlock_irqrestore(&ch
->lock
, irq_flags
);
1079 xpc_msgqueue_deref(ch
);
1080 xpc_part_deref(part
);
/* block until all partitions have finished disconnecting */
1084 xpc_disconnect_wait(ch_number
);
1088 * To disconnect a channel, and reflect it back to all who may be waiting.
1090 * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
1091 * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by
1092 * xpc_disconnect_wait().
1094 * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN.
/*
 * Initiate a channel disconnect (the body behind the
 * XPC_DISCONNECT_CHANNEL() macro, which passes the caller's source
 * line for debugging): record the reason, flip the channel into
 * CLOSEREQUEST|DISCONNECTING state, clear the connect-phase flags,
 * send a CLOSEREQUEST IPI to the peer, then wake idle kthreads (or
 * spawn one to make the disconnecting callout) and any allocator
 * waiters so they can observe the disconnect and exit.
 *
 * Called and returns with ch->lock held (*irq_flags); the lock is
 * dropped around the wakeups.  NOTE(review): fragmentary extraction --
 * the early return for an already-disconnecting channel is not visible.
 */
1097 xpc_disconnect_channel(const int line
, struct xpc_channel
*ch
,
1098 enum xp_retval reason
, unsigned long *irq_flags
)
1100 u32 channel_was_connected
= (ch
->flags
& XPC_C_CONNECTED
);
1102 DBUG_ON(!spin_is_locked(&ch
->lock
));
/* already disconnecting/disconnected: nothing to initiate */
1104 if (ch
->flags
& (XPC_C_DISCONNECTING
| XPC_C_DISCONNECTED
))
1107 DBUG_ON(!(ch
->flags
& (XPC_C_CONNECTING
| XPC_C_CONNECTED
)));
1109 dev_dbg(xpc_chan
, "reason=%d, line=%d, partid=%d, channel=%d\n",
1110 reason
, line
, ch
->partid
, ch
->number
);
/* remember why and where the disconnect was initiated */
1112 XPC_SET_REASON(ch
, reason
, line
);
1114 ch
->flags
|= (XPC_C_CLOSEREQUEST
| XPC_C_DISCONNECTING
);
1115 /* some of these may not have been set */
1116 ch
->flags
&= ~(XPC_C_OPENREQUEST
| XPC_C_OPENREPLY
|
1117 XPC_C_ROPENREQUEST
| XPC_C_ROPENREPLY
|
1118 XPC_C_CONNECTING
| XPC_C_CONNECTED
);
1120 xpc_IPI_send_closerequest(ch
, irq_flags
);
1122 if (channel_was_connected
)
1123 ch
->flags
|= XPC_C_WASCONNECTED
;
/* drop the lock for the wakeups / kthread creation below */
1125 spin_unlock_irqrestore(&ch
->lock
, *irq_flags
);
1127 /* wake all idle kthreads so they can exit */
1128 if (atomic_read(&ch
->kthreads_idle
) > 0) {
1129 wake_up_all(&ch
->idle_wq
);
1131 } else if ((ch
->flags
& XPC_C_CONNECTEDCALLOUT_MADE
) &&
1132 !(ch
->flags
& XPC_C_DISCONNECTINGCALLOUT
)) {
1133 /* start a kthread that will do the xpDisconnecting callout */
1134 xpc_create_kthreads(ch
, 1, 1);
1137 /* wake those waiting to allocate an entry from the local msg queue */
1138 if (atomic_read(&ch
->n_on_msg_allocate_wq
) > 0)
1139 wake_up(&ch
->msg_allocate_wq
);
1141 spin_lock_irqsave(&ch
->lock
, *irq_flags
);
/*
 * Invoke the registerer's callback (ch->func) with the disconnect
 * 'reason' so it knows the channel is going away.  The payload pointer
 * argument is NULL for disconnect notifications.  No-op if no callback
 * was registered (i.e. the registerer was never told of a connection).
 */
1145 xpc_disconnect_callout(struct xpc_channel
*ch
, enum xp_retval reason
)
1148 * Let the channel's registerer know that the channel is being
1149 * disconnected. We don't want to do this if the registerer was never
1150 * informed of a connection being made.
1153 if (ch
->func
!= NULL
) {
1154 dev_dbg(xpc_chan
, "ch->func() called, reason=%d, partid=%d, "
1155 "channel=%d\n", reason
, ch
->partid
, ch
->number
);
1157 ch
->func(reason
, ch
->partid
, ch
->number
, NULL
, ch
->key
);
1159 dev_dbg(xpc_chan
, "ch->func() returned, reason=%d, partid=%d, "
1160 "channel=%d\n", reason
, ch
->partid
, ch
->number
);
1165 * Wait for a message entry to become available for the specified channel,
1166 * but don't wait any longer than 1 jiffy.
/*
 * Wait (at most 1 jiffy) for a message queue entry to become available
 * on the channel: bump n_on_msg_allocate_wq, sleep interruptibly on
 * msg_allocate_wq, then re-check for a disconnect.  A zero return from
 * the sleep (timeout) yields ret = xpInterrupted.  NOTE(review):
 * fragmentary extraction -- the ret declaration, the early-return
 * values on the disconnect paths, and the final return are not visible
 * in this view.
 */
1169 xpc_allocate_msg_wait(struct xpc_channel
*ch
)
/* already disconnecting: don't bother waiting */
1173 if (ch
->flags
& XPC_C_DISCONNECTING
) {
1174 DBUG_ON(ch
->reason
== xpInterrupted
);
1178 atomic_inc(&ch
->n_on_msg_allocate_wq
);
1179 ret
= interruptible_sleep_on_timeout(&ch
->msg_allocate_wq
, 1);
1180 atomic_dec(&ch
->n_on_msg_allocate_wq
);
/* the channel may have begun disconnecting while we slept */
1182 if (ch
->flags
& XPC_C_DISCONNECTING
) {
1184 DBUG_ON(ch
->reason
== xpInterrupted
);
1185 } else if (ret
== 0) {
/* timed out without being woken: report as interrupted */
1188 ret
= xpInterrupted
;
1195 * Allocate an entry for a message from the message queue associated with the
1196 * specified channel. NOTE that this routine can sleep waiting for a message
1197 * entry to become available. To not sleep, pass in the XPC_NOWAIT flag.
1201 * partid - ID of partition to which the channel is connected.
1202 * ch_number - channel #.
1203 * flags - see xpc.h for valid flags.
1204 * payload - address of the allocated payload area pointer (filled in on
1205 * return) in which the user-defined message is constructed.
/*
 * Public entry point: allocate a message entry on channel 'ch_number'
 * of partition 'partid' (flags per xpc.h, e.g. XPC_NOWAIT) and return
 * the address of its user payload area via *payload.  Takes a partition
 * reference around the underlying xpc_allocate_msg() call.
 * NOTE(review): fragmentary extraction -- the success check guarding
 * the *payload assignment and the final return of 'ret' are not
 * visible in this view.
 */
1208 xpc_initiate_allocate(short partid
, int ch_number
, u32 flags
, void **payload
)
1210 struct xpc_partition
*part
= &xpc_partitions
[partid
];
1211 enum xp_retval ret
= xpUnknownReason
;
1212 struct xpc_msg
*msg
= NULL
;
1214 DBUG_ON(partid
< 0 || partid
>= xp_max_npartitions
);
1215 DBUG_ON(ch_number
< 0 || ch_number
>= part
->nchannels
);
1219 if (xpc_part_ref(part
)) {
1220 ret
= xpc_allocate_msg(&part
->channels
[ch_number
], flags
, &msg
);
1221 xpc_part_deref(part
);
/* hand the caller the payload area inside the allocated message */
1224 *payload
= &msg
->payload
;
1231 * Send a message previously allocated using xpc_initiate_allocate() on the
1232 * specified channel connected to the specified partition.
1234 * This routine will not wait for the message to be received, nor will
1235 * notification be given when it does happen. Once this routine has returned
1236 * the message entry allocated via xpc_initiate_allocate() is no longer
1237 * accessable to the caller.
1239 * This routine, although called by users, does not call xpc_part_ref() to
1240 * ensure that the partition infrastructure is in place. It relies on the
1241 * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg().
1245 * partid - ID of partition to which the channel is connected.
1246 * ch_number - channel # to send message on.
1247 * payload - pointer to the payload area allocated via
1248 * xpc_initiate_allocate().
1251 xpc_initiate_send(short partid
, int ch_number
, void *payload
)
1253 struct xpc_partition
*part
= &xpc_partitions
[partid
];
1254 struct xpc_msg
*msg
= XPC_MSG_ADDRESS(payload
);
1257 dev_dbg(xpc_chan
, "msg=0x%p, partid=%d, channel=%d\n", (void *)msg
,
1260 DBUG_ON(partid
< 0 || partid
>= xp_max_npartitions
);
1261 DBUG_ON(ch_number
< 0 || ch_number
>= part
->nchannels
);
1262 DBUG_ON(msg
== NULL
);
1264 ret
= xpc_send_msg(&part
->channels
[ch_number
], msg
, 0, NULL
, NULL
);
1270 * Send a message previously allocated using xpc_initiate_allocate on the
1271 * specified channel connected to the specified partition.
1273 * This routine will not wait for the message to be sent. Once this routine
1274 * has returned the message entry allocated via xpc_initiate_allocate() is no
1275 * longer accessable to the caller.
1277 * Once the remote end of the channel has received the message, the function
1278 * passed as an argument to xpc_initiate_send_notify() will be called. This
1279 * allows the sender to free up or re-use any buffers referenced by the
1280 * message, but does NOT mean the message has been processed at the remote
1281 * end by a receiver.
1283 * If this routine returns an error, the caller's function will NOT be called.
1285 * This routine, although called by users, does not call xpc_part_ref() to
1286 * ensure that the partition infrastructure is in place. It relies on the
1287 * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg().
1291 * partid - ID of partition to which the channel is connected.
1292 * ch_number - channel # to send message on.
1293 * payload - pointer to the payload area allocated via
1294 * xpc_initiate_allocate().
1295 * func - function to call with asynchronous notification of message
1296 * receipt. THIS FUNCTION MUST BE NON-BLOCKING.
1297 * key - user-defined key to be passed to the function when it's called.
1300 xpc_initiate_send_notify(short partid
, int ch_number
, void *payload
,
1301 xpc_notify_func func
, void *key
)
1303 struct xpc_partition
*part
= &xpc_partitions
[partid
];
1304 struct xpc_msg
*msg
= XPC_MSG_ADDRESS(payload
);
1307 dev_dbg(xpc_chan
, "msg=0x%p, partid=%d, channel=%d\n", (void *)msg
,
1310 DBUG_ON(partid
< 0 || partid
>= xp_max_npartitions
);
1311 DBUG_ON(ch_number
< 0 || ch_number
>= part
->nchannels
);
1312 DBUG_ON(msg
== NULL
);
1313 DBUG_ON(func
== NULL
);
1315 ret
= xpc_send_msg(&part
->channels
[ch_number
], msg
, XPC_N_CALL
,
1321 * Deliver a message to its intended recipient.
1324 xpc_deliver_msg(struct xpc_channel
*ch
)
1326 struct xpc_msg
*msg
;
1328 msg
= xpc_get_deliverable_msg(ch
);
1332 * This ref is taken to protect the payload itself from being
1333 * freed before the user is finished with it, which the user
1334 * indicates by calling xpc_initiate_received().
1336 xpc_msgqueue_ref(ch
);
1338 atomic_inc(&ch
->kthreads_active
);
1340 if (ch
->func
!= NULL
) {
1341 dev_dbg(xpc_chan
, "ch->func() called, msg=0x%p, "
1342 "msg_number=%ld, partid=%d, channel=%d\n",
1343 (void *)msg
, msg
->number
, ch
->partid
,
1346 /* deliver the message to its intended recipient */
1347 ch
->func(xpMsgReceived
, ch
->partid
, ch
->number
,
1348 &msg
->payload
, ch
->key
);
1350 dev_dbg(xpc_chan
, "ch->func() returned, msg=0x%p, "
1351 "msg_number=%ld, partid=%d, channel=%d\n",
1352 (void *)msg
, msg
->number
, ch
->partid
,
1356 atomic_dec(&ch
->kthreads_active
);
1361 * Acknowledge receipt of a delivered message.
1363 * If a message has XPC_M_INTERRUPT set, send an interrupt to the partition
1364 * that sent the message.
1366 * This function, although called by users, does not call xpc_part_ref() to
1367 * ensure that the partition infrastructure is in place. It relies on the
1368 * fact that we called xpc_msgqueue_ref() in xpc_deliver_msg().
1372 * partid - ID of partition to which the channel is connected.
1373 * ch_number - channel # message received on.
1374 * payload - pointer to the payload area allocated via
1375 * xpc_initiate_allocate().
1378 xpc_initiate_received(short partid
, int ch_number
, void *payload
)
1380 struct xpc_partition
*part
= &xpc_partitions
[partid
];
1381 struct xpc_channel
*ch
;
1382 struct xpc_msg
*msg
= XPC_MSG_ADDRESS(payload
);
1384 DBUG_ON(partid
< 0 || partid
>= xp_max_npartitions
);
1385 DBUG_ON(ch_number
< 0 || ch_number
>= part
->nchannels
);
1387 ch
= &part
->channels
[ch_number
];
1388 xpc_received_msg(ch
, msg
);
1390 /* the call to xpc_msgqueue_ref() was done by xpc_deliver_msg() */
1391 xpc_msgqueue_deref(ch
);