sgi-xp: isolate xpc_vars structure to sn2 only
[linux-2.6/linux-acpi-2.6/ibm-acpi-2.6.git] / drivers / misc / sgi-xp / xpc_channel.c
blob26c5e12c122042712c438900175e380dd047bb9e
1 /*
2 * This file is subject to the terms and conditions of the GNU General Public
3 * License. See the file "COPYING" in the main directory of this archive
4 * for more details.
6 * Copyright (c) 2004-2008 Silicon Graphics, Inc. All Rights Reserved.
7 */
9 /*
10 * Cross Partition Communication (XPC) channel support.
12 * This is the part of XPC that manages the channels and
13 * sends/receives messages across them to/from other partitions.
17 #include <linux/kernel.h>
18 #include <linux/init.h>
19 #include <linux/sched.h>
20 #include <linux/cache.h>
21 #include <linux/interrupt.h>
22 #include <linux/mutex.h>
23 #include <linux/completion.h>
24 #include <asm/sn/sn_sal.h>
25 #include "xpc.h"
28 * Guarantee that the kzalloc'd memory is cacheline aligned.
30 void *
31 xpc_kzalloc_cacheline_aligned(size_t size, gfp_t flags, void **base)
33 /* see if kzalloc will give us cachline aligned memory by default */
34 *base = kzalloc(size, flags);
35 if (*base == NULL)
36 return NULL;
38 if ((u64)*base == L1_CACHE_ALIGN((u64)*base))
39 return *base;
41 kfree(*base);
43 /* nope, we'll have to do it ourselves */
44 *base = kzalloc(size + L1_CACHE_BYTES, flags);
45 if (*base == NULL)
46 return NULL;
48 return (void *)L1_CACHE_ALIGN((u64)*base);
52 * Allocate the local message queue and the notify queue.
54 static enum xp_retval
55 xpc_allocate_local_msgqueue(struct xpc_channel *ch)
57 unsigned long irq_flags;
58 int nentries;
59 size_t nbytes;
61 for (nentries = ch->local_nentries; nentries > 0; nentries--) {
63 nbytes = nentries * ch->msg_size;
64 ch->local_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes,
65 GFP_KERNEL,
66 &ch->local_msgqueue_base);
67 if (ch->local_msgqueue == NULL)
68 continue;
70 nbytes = nentries * sizeof(struct xpc_notify);
71 ch->notify_queue = kzalloc(nbytes, GFP_KERNEL);
72 if (ch->notify_queue == NULL) {
73 kfree(ch->local_msgqueue_base);
74 ch->local_msgqueue = NULL;
75 continue;
78 spin_lock_irqsave(&ch->lock, irq_flags);
79 if (nentries < ch->local_nentries) {
80 dev_dbg(xpc_chan, "nentries=%d local_nentries=%d, "
81 "partid=%d, channel=%d\n", nentries,
82 ch->local_nentries, ch->partid, ch->number);
84 ch->local_nentries = nentries;
86 spin_unlock_irqrestore(&ch->lock, irq_flags);
87 return xpSuccess;
90 dev_dbg(xpc_chan, "can't get memory for local message queue and notify "
91 "queue, partid=%d, channel=%d\n", ch->partid, ch->number);
92 return xpNoMemory;
96 * Allocate the cached remote message queue.
98 static enum xp_retval
99 xpc_allocate_remote_msgqueue(struct xpc_channel *ch)
101 unsigned long irq_flags;
102 int nentries;
103 size_t nbytes;
105 DBUG_ON(ch->remote_nentries <= 0);
107 for (nentries = ch->remote_nentries; nentries > 0; nentries--) {
109 nbytes = nentries * ch->msg_size;
110 ch->remote_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes,
111 GFP_KERNEL,
112 &ch->remote_msgqueue_base);
113 if (ch->remote_msgqueue == NULL)
114 continue;
116 spin_lock_irqsave(&ch->lock, irq_flags);
117 if (nentries < ch->remote_nentries) {
118 dev_dbg(xpc_chan, "nentries=%d remote_nentries=%d, "
119 "partid=%d, channel=%d\n", nentries,
120 ch->remote_nentries, ch->partid, ch->number);
122 ch->remote_nentries = nentries;
124 spin_unlock_irqrestore(&ch->lock, irq_flags);
125 return xpSuccess;
128 dev_dbg(xpc_chan, "can't get memory for cached remote message queue, "
129 "partid=%d, channel=%d\n", ch->partid, ch->number);
130 return xpNoMemory;
134 * Allocate message queues and other stuff associated with a channel.
136 * Note: Assumes all of the channel sizes are filled in.
138 static enum xp_retval
139 xpc_allocate_msgqueues(struct xpc_channel *ch)
141 unsigned long irq_flags;
142 enum xp_retval ret;
144 DBUG_ON(ch->flags & XPC_C_SETUP);
146 ret = xpc_allocate_local_msgqueue(ch);
147 if (ret != xpSuccess)
148 return ret;
150 ret = xpc_allocate_remote_msgqueue(ch);
151 if (ret != xpSuccess) {
152 kfree(ch->local_msgqueue_base);
153 ch->local_msgqueue = NULL;
154 kfree(ch->notify_queue);
155 ch->notify_queue = NULL;
156 return ret;
159 spin_lock_irqsave(&ch->lock, irq_flags);
160 ch->flags |= XPC_C_SETUP;
161 spin_unlock_irqrestore(&ch->lock, irq_flags);
163 return xpSuccess;
167 * Process a connect message from a remote partition.
169 * Note: xpc_process_connect() is expecting to be called with the
170 * spin_lock_irqsave held and will leave it locked upon return.
172 static void
173 xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags)
175 enum xp_retval ret;
177 DBUG_ON(!spin_is_locked(&ch->lock));
179 if (!(ch->flags & XPC_C_OPENREQUEST) ||
180 !(ch->flags & XPC_C_ROPENREQUEST)) {
181 /* nothing more to do for now */
182 return;
184 DBUG_ON(!(ch->flags & XPC_C_CONNECTING));
186 if (!(ch->flags & XPC_C_SETUP)) {
187 spin_unlock_irqrestore(&ch->lock, *irq_flags);
188 ret = xpc_allocate_msgqueues(ch);
189 spin_lock_irqsave(&ch->lock, *irq_flags);
191 if (ret != xpSuccess)
192 XPC_DISCONNECT_CHANNEL(ch, ret, irq_flags);
194 if (ch->flags & (XPC_C_CONNECTED | XPC_C_DISCONNECTING))
195 return;
197 DBUG_ON(!(ch->flags & XPC_C_SETUP));
198 DBUG_ON(ch->local_msgqueue == NULL);
199 DBUG_ON(ch->remote_msgqueue == NULL);
202 if (!(ch->flags & XPC_C_OPENREPLY)) {
203 ch->flags |= XPC_C_OPENREPLY;
204 xpc_IPI_send_openreply(ch, irq_flags);
207 if (!(ch->flags & XPC_C_ROPENREPLY))
208 return;
210 DBUG_ON(ch->remote_msgqueue_pa == 0);
212 ch->flags = (XPC_C_CONNECTED | XPC_C_SETUP); /* clear all else */
214 dev_info(xpc_chan, "channel %d to partition %d connected\n",
215 ch->number, ch->partid);
217 spin_unlock_irqrestore(&ch->lock, *irq_flags);
218 xpc_create_kthreads(ch, 1, 0);
219 spin_lock_irqsave(&ch->lock, *irq_flags);
223 * Notify those who wanted to be notified upon delivery of their message.
225 static void
226 xpc_notify_senders(struct xpc_channel *ch, enum xp_retval reason, s64 put)
228 struct xpc_notify *notify;
229 u8 notify_type;
230 s64 get = ch->w_remote_GP.get - 1;
232 while (++get < put && atomic_read(&ch->n_to_notify) > 0) {
234 notify = &ch->notify_queue[get % ch->local_nentries];
237 * See if the notify entry indicates it was associated with
238 * a message who's sender wants to be notified. It is possible
239 * that it is, but someone else is doing or has done the
240 * notification.
242 notify_type = notify->type;
243 if (notify_type == 0 ||
244 cmpxchg(&notify->type, notify_type, 0) != notify_type) {
245 continue;
248 DBUG_ON(notify_type != XPC_N_CALL);
250 atomic_dec(&ch->n_to_notify);
252 if (notify->func != NULL) {
253 dev_dbg(xpc_chan, "notify->func() called, notify=0x%p, "
254 "msg_number=%ld, partid=%d, channel=%d\n",
255 (void *)notify, get, ch->partid, ch->number);
257 notify->func(reason, ch->partid, ch->number,
258 notify->key);
260 dev_dbg(xpc_chan, "notify->func() returned, "
261 "notify=0x%p, msg_number=%ld, partid=%d, "
262 "channel=%d\n", (void *)notify, get,
263 ch->partid, ch->number);
269 * Free up message queues and other stuff that were allocated for the specified
270 * channel.
272 * Note: ch->reason and ch->reason_line are left set for debugging purposes,
273 * they're cleared when XPC_C_DISCONNECTED is cleared.
275 static void
276 xpc_free_msgqueues(struct xpc_channel *ch)
278 DBUG_ON(!spin_is_locked(&ch->lock));
279 DBUG_ON(atomic_read(&ch->n_to_notify) != 0);
281 ch->remote_msgqueue_pa = 0;
282 ch->func = NULL;
283 ch->key = NULL;
284 ch->msg_size = 0;
285 ch->local_nentries = 0;
286 ch->remote_nentries = 0;
287 ch->kthreads_assigned_limit = 0;
288 ch->kthreads_idle_limit = 0;
290 ch->local_GP->get = 0;
291 ch->local_GP->put = 0;
292 ch->remote_GP.get = 0;
293 ch->remote_GP.put = 0;
294 ch->w_local_GP.get = 0;
295 ch->w_local_GP.put = 0;
296 ch->w_remote_GP.get = 0;
297 ch->w_remote_GP.put = 0;
298 ch->next_msg_to_pull = 0;
300 if (ch->flags & XPC_C_SETUP) {
301 ch->flags &= ~XPC_C_SETUP;
303 dev_dbg(xpc_chan, "ch->flags=0x%x, partid=%d, channel=%d\n",
304 ch->flags, ch->partid, ch->number);
306 kfree(ch->local_msgqueue_base);
307 ch->local_msgqueue = NULL;
308 kfree(ch->remote_msgqueue_base);
309 ch->remote_msgqueue = NULL;
310 kfree(ch->notify_queue);
311 ch->notify_queue = NULL;
316 * spin_lock_irqsave() is expected to be held on entry.
318 static void
319 xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
321 struct xpc_partition *part = &xpc_partitions[ch->partid];
322 u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED);
324 DBUG_ON(!spin_is_locked(&ch->lock));
326 if (!(ch->flags & XPC_C_DISCONNECTING))
327 return;
329 DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
331 /* make sure all activity has settled down first */
333 if (atomic_read(&ch->kthreads_assigned) > 0 ||
334 atomic_read(&ch->references) > 0) {
335 return;
337 DBUG_ON((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
338 !(ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE));
340 if (part->act_state == XPC_P_DEACTIVATING) {
341 /* can't proceed until the other side disengages from us */
342 if (xpc_partition_engaged(1UL << ch->partid))
343 return;
345 } else {
347 /* as long as the other side is up do the full protocol */
349 if (!(ch->flags & XPC_C_RCLOSEREQUEST))
350 return;
352 if (!(ch->flags & XPC_C_CLOSEREPLY)) {
353 ch->flags |= XPC_C_CLOSEREPLY;
354 xpc_IPI_send_closereply(ch, irq_flags);
357 if (!(ch->flags & XPC_C_RCLOSEREPLY))
358 return;
361 /* wake those waiting for notify completion */
362 if (atomic_read(&ch->n_to_notify) > 0) {
363 /* >>> we do callout while holding ch->lock */
364 xpc_notify_senders(ch, ch->reason, ch->w_local_GP.put);
367 /* both sides are disconnected now */
369 if (ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE) {
370 spin_unlock_irqrestore(&ch->lock, *irq_flags);
371 xpc_disconnect_callout(ch, xpDisconnected);
372 spin_lock_irqsave(&ch->lock, *irq_flags);
375 /* it's now safe to free the channel's message queues */
376 xpc_free_msgqueues(ch);
378 /* mark disconnected, clear all other flags except XPC_C_WDISCONNECT */
379 ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT));
381 atomic_dec(&part->nchannels_active);
383 if (channel_was_connected) {
384 dev_info(xpc_chan, "channel %d to partition %d disconnected, "
385 "reason=%d\n", ch->number, ch->partid, ch->reason);
388 if (ch->flags & XPC_C_WDISCONNECT) {
389 /* we won't lose the CPU since we're holding ch->lock */
390 complete(&ch->wdisconnect_wait);
391 } else if (ch->delayed_IPI_flags) {
392 if (part->act_state != XPC_P_DEACTIVATING) {
393 /* time to take action on any delayed IPI flags */
394 spin_lock(&part->IPI_lock);
395 XPC_SET_IPI_FLAGS(part->local_IPI_amo, ch->number,
396 ch->delayed_IPI_flags);
397 spin_unlock(&part->IPI_lock);
399 ch->delayed_IPI_flags = 0;
404 * Process a change in the channel's remote connection state.
406 static void
407 xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number,
408 u8 IPI_flags)
410 unsigned long irq_flags;
411 struct xpc_openclose_args *args =
412 &part->remote_openclose_args[ch_number];
413 struct xpc_channel *ch = &part->channels[ch_number];
414 enum xp_retval reason;
416 spin_lock_irqsave(&ch->lock, irq_flags);
418 again:
420 if ((ch->flags & XPC_C_DISCONNECTED) &&
421 (ch->flags & XPC_C_WDISCONNECT)) {
423 * Delay processing IPI flags until thread waiting disconnect
424 * has had a chance to see that the channel is disconnected.
426 ch->delayed_IPI_flags |= IPI_flags;
427 spin_unlock_irqrestore(&ch->lock, irq_flags);
428 return;
431 if (IPI_flags & XPC_IPI_CLOSEREQUEST) {
433 dev_dbg(xpc_chan, "XPC_IPI_CLOSEREQUEST (reason=%d) received "
434 "from partid=%d, channel=%d\n", args->reason,
435 ch->partid, ch->number);
438 * If RCLOSEREQUEST is set, we're probably waiting for
439 * RCLOSEREPLY. We should find it and a ROPENREQUEST packed
440 * with this RCLOSEREQUEST in the IPI_flags.
443 if (ch->flags & XPC_C_RCLOSEREQUEST) {
444 DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING));
445 DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
446 DBUG_ON(!(ch->flags & XPC_C_CLOSEREPLY));
447 DBUG_ON(ch->flags & XPC_C_RCLOSEREPLY);
449 DBUG_ON(!(IPI_flags & XPC_IPI_CLOSEREPLY));
450 IPI_flags &= ~XPC_IPI_CLOSEREPLY;
451 ch->flags |= XPC_C_RCLOSEREPLY;
453 /* both sides have finished disconnecting */
454 xpc_process_disconnect(ch, &irq_flags);
455 DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
456 goto again;
459 if (ch->flags & XPC_C_DISCONNECTED) {
460 if (!(IPI_flags & XPC_IPI_OPENREQUEST)) {
461 if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo,
462 ch_number) &
463 XPC_IPI_OPENREQUEST)) {
465 DBUG_ON(ch->delayed_IPI_flags != 0);
466 spin_lock(&part->IPI_lock);
467 XPC_SET_IPI_FLAGS(part->local_IPI_amo,
468 ch_number,
469 XPC_IPI_CLOSEREQUEST);
470 spin_unlock(&part->IPI_lock);
472 spin_unlock_irqrestore(&ch->lock, irq_flags);
473 return;
476 XPC_SET_REASON(ch, 0, 0);
477 ch->flags &= ~XPC_C_DISCONNECTED;
479 atomic_inc(&part->nchannels_active);
480 ch->flags |= (XPC_C_CONNECTING | XPC_C_ROPENREQUEST);
483 IPI_flags &= ~(XPC_IPI_OPENREQUEST | XPC_IPI_OPENREPLY);
486 * The meaningful CLOSEREQUEST connection state fields are:
487 * reason = reason connection is to be closed
490 ch->flags |= XPC_C_RCLOSEREQUEST;
492 if (!(ch->flags & XPC_C_DISCONNECTING)) {
493 reason = args->reason;
494 if (reason <= xpSuccess || reason > xpUnknownReason)
495 reason = xpUnknownReason;
496 else if (reason == xpUnregistering)
497 reason = xpOtherUnregistering;
499 XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);
501 DBUG_ON(IPI_flags & XPC_IPI_CLOSEREPLY);
502 spin_unlock_irqrestore(&ch->lock, irq_flags);
503 return;
506 xpc_process_disconnect(ch, &irq_flags);
509 if (IPI_flags & XPC_IPI_CLOSEREPLY) {
511 dev_dbg(xpc_chan, "XPC_IPI_CLOSEREPLY received from partid=%d,"
512 " channel=%d\n", ch->partid, ch->number);
514 if (ch->flags & XPC_C_DISCONNECTED) {
515 DBUG_ON(part->act_state != XPC_P_DEACTIVATING);
516 spin_unlock_irqrestore(&ch->lock, irq_flags);
517 return;
520 DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
522 if (!(ch->flags & XPC_C_RCLOSEREQUEST)) {
523 if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, ch_number)
524 & XPC_IPI_CLOSEREQUEST)) {
526 DBUG_ON(ch->delayed_IPI_flags != 0);
527 spin_lock(&part->IPI_lock);
528 XPC_SET_IPI_FLAGS(part->local_IPI_amo,
529 ch_number,
530 XPC_IPI_CLOSEREPLY);
531 spin_unlock(&part->IPI_lock);
533 spin_unlock_irqrestore(&ch->lock, irq_flags);
534 return;
537 ch->flags |= XPC_C_RCLOSEREPLY;
539 if (ch->flags & XPC_C_CLOSEREPLY) {
540 /* both sides have finished disconnecting */
541 xpc_process_disconnect(ch, &irq_flags);
545 if (IPI_flags & XPC_IPI_OPENREQUEST) {
547 dev_dbg(xpc_chan, "XPC_IPI_OPENREQUEST (msg_size=%d, "
548 "local_nentries=%d) received from partid=%d, "
549 "channel=%d\n", args->msg_size, args->local_nentries,
550 ch->partid, ch->number);
552 if (part->act_state == XPC_P_DEACTIVATING ||
553 (ch->flags & XPC_C_ROPENREQUEST)) {
554 spin_unlock_irqrestore(&ch->lock, irq_flags);
555 return;
558 if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) {
559 ch->delayed_IPI_flags |= XPC_IPI_OPENREQUEST;
560 spin_unlock_irqrestore(&ch->lock, irq_flags);
561 return;
563 DBUG_ON(!(ch->flags & (XPC_C_DISCONNECTED |
564 XPC_C_OPENREQUEST)));
565 DBUG_ON(ch->flags & (XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
566 XPC_C_OPENREPLY | XPC_C_CONNECTED));
569 * The meaningful OPENREQUEST connection state fields are:
570 * msg_size = size of channel's messages in bytes
571 * local_nentries = remote partition's local_nentries
573 if (args->msg_size == 0 || args->local_nentries == 0) {
574 /* assume OPENREQUEST was delayed by mistake */
575 spin_unlock_irqrestore(&ch->lock, irq_flags);
576 return;
579 ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING);
580 ch->remote_nentries = args->local_nentries;
582 if (ch->flags & XPC_C_OPENREQUEST) {
583 if (args->msg_size != ch->msg_size) {
584 XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
585 &irq_flags);
586 spin_unlock_irqrestore(&ch->lock, irq_flags);
587 return;
589 } else {
590 ch->msg_size = args->msg_size;
592 XPC_SET_REASON(ch, 0, 0);
593 ch->flags &= ~XPC_C_DISCONNECTED;
595 atomic_inc(&part->nchannels_active);
598 xpc_process_connect(ch, &irq_flags);
601 if (IPI_flags & XPC_IPI_OPENREPLY) {
603 dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY (local_msgqueue_pa=0x%lx, "
604 "local_nentries=%d, remote_nentries=%d) received from "
605 "partid=%d, channel=%d\n", args->local_msgqueue_pa,
606 args->local_nentries, args->remote_nentries,
607 ch->partid, ch->number);
609 if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) {
610 spin_unlock_irqrestore(&ch->lock, irq_flags);
611 return;
613 if (!(ch->flags & XPC_C_OPENREQUEST)) {
614 XPC_DISCONNECT_CHANNEL(ch, xpOpenCloseError,
615 &irq_flags);
616 spin_unlock_irqrestore(&ch->lock, irq_flags);
617 return;
620 DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST));
621 DBUG_ON(ch->flags & XPC_C_CONNECTED);
624 * The meaningful OPENREPLY connection state fields are:
625 * local_msgqueue_pa = physical address of remote
626 * partition's local_msgqueue
627 * local_nentries = remote partition's local_nentries
628 * remote_nentries = remote partition's remote_nentries
630 DBUG_ON(args->local_msgqueue_pa == 0);
631 DBUG_ON(args->local_nentries == 0);
632 DBUG_ON(args->remote_nentries == 0);
634 ch->flags |= XPC_C_ROPENREPLY;
635 ch->remote_msgqueue_pa = args->local_msgqueue_pa;
637 if (args->local_nentries < ch->remote_nentries) {
638 dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new "
639 "remote_nentries=%d, old remote_nentries=%d, "
640 "partid=%d, channel=%d\n",
641 args->local_nentries, ch->remote_nentries,
642 ch->partid, ch->number);
644 ch->remote_nentries = args->local_nentries;
646 if (args->remote_nentries < ch->local_nentries) {
647 dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new "
648 "local_nentries=%d, old local_nentries=%d, "
649 "partid=%d, channel=%d\n",
650 args->remote_nentries, ch->local_nentries,
651 ch->partid, ch->number);
653 ch->local_nentries = args->remote_nentries;
656 xpc_process_connect(ch, &irq_flags);
659 spin_unlock_irqrestore(&ch->lock, irq_flags);
663 * Attempt to establish a channel connection to a remote partition.
665 static enum xp_retval
666 xpc_connect_channel(struct xpc_channel *ch)
668 unsigned long irq_flags;
669 struct xpc_registration *registration = &xpc_registrations[ch->number];
671 if (mutex_trylock(&registration->mutex) == 0)
672 return xpRetry;
674 if (!XPC_CHANNEL_REGISTERED(ch->number)) {
675 mutex_unlock(&registration->mutex);
676 return xpUnregistered;
679 spin_lock_irqsave(&ch->lock, irq_flags);
681 DBUG_ON(ch->flags & XPC_C_CONNECTED);
682 DBUG_ON(ch->flags & XPC_C_OPENREQUEST);
684 if (ch->flags & XPC_C_DISCONNECTING) {
685 spin_unlock_irqrestore(&ch->lock, irq_flags);
686 mutex_unlock(&registration->mutex);
687 return ch->reason;
690 /* add info from the channel connect registration to the channel */
692 ch->kthreads_assigned_limit = registration->assigned_limit;
693 ch->kthreads_idle_limit = registration->idle_limit;
694 DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0);
695 DBUG_ON(atomic_read(&ch->kthreads_idle) != 0);
696 DBUG_ON(atomic_read(&ch->kthreads_active) != 0);
698 ch->func = registration->func;
699 DBUG_ON(registration->func == NULL);
700 ch->key = registration->key;
702 ch->local_nentries = registration->nentries;
704 if (ch->flags & XPC_C_ROPENREQUEST) {
705 if (registration->msg_size != ch->msg_size) {
706 /* the local and remote sides aren't the same */
709 * Because XPC_DISCONNECT_CHANNEL() can block we're
710 * forced to up the registration sema before we unlock
711 * the channel lock. But that's okay here because we're
712 * done with the part that required the registration
713 * sema. XPC_DISCONNECT_CHANNEL() requires that the
714 * channel lock be locked and will unlock and relock
715 * the channel lock as needed.
717 mutex_unlock(&registration->mutex);
718 XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
719 &irq_flags);
720 spin_unlock_irqrestore(&ch->lock, irq_flags);
721 return xpUnequalMsgSizes;
723 } else {
724 ch->msg_size = registration->msg_size;
726 XPC_SET_REASON(ch, 0, 0);
727 ch->flags &= ~XPC_C_DISCONNECTED;
729 atomic_inc(&xpc_partitions[ch->partid].nchannels_active);
732 mutex_unlock(&registration->mutex);
734 /* initiate the connection */
736 ch->flags |= (XPC_C_OPENREQUEST | XPC_C_CONNECTING);
737 xpc_IPI_send_openrequest(ch, &irq_flags);
739 xpc_process_connect(ch, &irq_flags);
741 spin_unlock_irqrestore(&ch->lock, irq_flags);
743 return xpSuccess;
747 * Clear some of the msg flags in the local message queue.
749 static inline void
750 xpc_clear_local_msgqueue_flags(struct xpc_channel *ch)
752 struct xpc_msg *msg;
753 s64 get;
755 get = ch->w_remote_GP.get;
756 do {
757 msg = (struct xpc_msg *)((u64)ch->local_msgqueue +
758 (get % ch->local_nentries) *
759 ch->msg_size);
760 msg->flags = 0;
761 } while (++get < ch->remote_GP.get);
765 * Clear some of the msg flags in the remote message queue.
767 static inline void
768 xpc_clear_remote_msgqueue_flags(struct xpc_channel *ch)
770 struct xpc_msg *msg;
771 s64 put;
773 put = ch->w_remote_GP.put;
774 do {
775 msg = (struct xpc_msg *)((u64)ch->remote_msgqueue +
776 (put % ch->remote_nentries) *
777 ch->msg_size);
778 msg->flags = 0;
779 } while (++put < ch->remote_GP.put);
782 static void
783 xpc_process_msg_IPI(struct xpc_partition *part, int ch_number)
785 struct xpc_channel *ch = &part->channels[ch_number];
786 int nmsgs_sent;
788 ch->remote_GP = part->remote_GPs[ch_number];
790 /* See what, if anything, has changed for each connected channel */
792 xpc_msgqueue_ref(ch);
794 if (ch->w_remote_GP.get == ch->remote_GP.get &&
795 ch->w_remote_GP.put == ch->remote_GP.put) {
796 /* nothing changed since GPs were last pulled */
797 xpc_msgqueue_deref(ch);
798 return;
801 if (!(ch->flags & XPC_C_CONNECTED)) {
802 xpc_msgqueue_deref(ch);
803 return;
807 * First check to see if messages recently sent by us have been
808 * received by the other side. (The remote GET value will have
809 * changed since we last looked at it.)
812 if (ch->w_remote_GP.get != ch->remote_GP.get) {
815 * We need to notify any senders that want to be notified
816 * that their sent messages have been received by their
817 * intended recipients. We need to do this before updating
818 * w_remote_GP.get so that we don't allocate the same message
819 * queue entries prematurely (see xpc_allocate_msg()).
821 if (atomic_read(&ch->n_to_notify) > 0) {
823 * Notify senders that messages sent have been
824 * received and delivered by the other side.
826 xpc_notify_senders(ch, xpMsgDelivered,
827 ch->remote_GP.get);
831 * Clear msg->flags in previously sent messages, so that
832 * they're ready for xpc_allocate_msg().
834 xpc_clear_local_msgqueue_flags(ch);
836 ch->w_remote_GP.get = ch->remote_GP.get;
838 dev_dbg(xpc_chan, "w_remote_GP.get changed to %ld, partid=%d, "
839 "channel=%d\n", ch->w_remote_GP.get, ch->partid,
840 ch->number);
843 * If anyone was waiting for message queue entries to become
844 * available, wake them up.
846 if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
847 wake_up(&ch->msg_allocate_wq);
851 * Now check for newly sent messages by the other side. (The remote
852 * PUT value will have changed since we last looked at it.)
855 if (ch->w_remote_GP.put != ch->remote_GP.put) {
857 * Clear msg->flags in previously received messages, so that
858 * they're ready for xpc_get_deliverable_msg().
860 xpc_clear_remote_msgqueue_flags(ch);
862 ch->w_remote_GP.put = ch->remote_GP.put;
864 dev_dbg(xpc_chan, "w_remote_GP.put changed to %ld, partid=%d, "
865 "channel=%d\n", ch->w_remote_GP.put, ch->partid,
866 ch->number);
868 nmsgs_sent = ch->w_remote_GP.put - ch->w_local_GP.get;
869 if (nmsgs_sent > 0) {
870 dev_dbg(xpc_chan, "msgs waiting to be copied and "
871 "delivered=%d, partid=%d, channel=%d\n",
872 nmsgs_sent, ch->partid, ch->number);
874 if (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE)
875 xpc_activate_kthreads(ch, nmsgs_sent);
879 xpc_msgqueue_deref(ch);
882 void
883 xpc_process_channel_activity(struct xpc_partition *part)
885 unsigned long irq_flags;
886 u64 IPI_amo, IPI_flags;
887 struct xpc_channel *ch;
888 int ch_number;
889 u32 ch_flags;
891 IPI_amo = xpc_get_IPI_flags(part);
894 * Initiate channel connections for registered channels.
896 * For each connected channel that has pending messages activate idle
897 * kthreads and/or create new kthreads as needed.
900 for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
901 ch = &part->channels[ch_number];
904 * Process any open or close related IPI flags, and then deal
905 * with connecting or disconnecting the channel as required.
908 IPI_flags = XPC_GET_IPI_FLAGS(IPI_amo, ch_number);
910 if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_flags))
911 xpc_process_openclose_IPI(part, ch_number, IPI_flags);
913 ch_flags = ch->flags; /* need an atomic snapshot of flags */
915 if (ch_flags & XPC_C_DISCONNECTING) {
916 spin_lock_irqsave(&ch->lock, irq_flags);
917 xpc_process_disconnect(ch, &irq_flags);
918 spin_unlock_irqrestore(&ch->lock, irq_flags);
919 continue;
922 if (part->act_state == XPC_P_DEACTIVATING)
923 continue;
925 if (!(ch_flags & XPC_C_CONNECTED)) {
926 if (!(ch_flags & XPC_C_OPENREQUEST)) {
927 DBUG_ON(ch_flags & XPC_C_SETUP);
928 (void)xpc_connect_channel(ch);
929 } else {
930 spin_lock_irqsave(&ch->lock, irq_flags);
931 xpc_process_connect(ch, &irq_flags);
932 spin_unlock_irqrestore(&ch->lock, irq_flags);
934 continue;
938 * Process any message related IPI flags, this may involve the
939 * activation of kthreads to deliver any pending messages sent
940 * from the other partition.
943 if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_flags))
944 xpc_process_msg_IPI(part, ch_number);
949 * XPC's heartbeat code calls this function to inform XPC that a partition is
950 * going down. XPC responds by tearing down the XPartition Communication
951 * infrastructure used for the just downed partition.
953 * XPC's heartbeat code will never call this function and xpc_partition_up()
954 * at the same time. Nor will it ever make multiple calls to either function
955 * at the same time.
957 void
958 xpc_partition_going_down(struct xpc_partition *part, enum xp_retval reason)
960 unsigned long irq_flags;
961 int ch_number;
962 struct xpc_channel *ch;
964 dev_dbg(xpc_chan, "deactivating partition %d, reason=%d\n",
965 XPC_PARTID(part), reason);
967 if (!xpc_part_ref(part)) {
968 /* infrastructure for this partition isn't currently set up */
969 return;
972 /* disconnect channels associated with the partition going down */
974 for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
975 ch = &part->channels[ch_number];
977 xpc_msgqueue_ref(ch);
978 spin_lock_irqsave(&ch->lock, irq_flags);
980 XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);
982 spin_unlock_irqrestore(&ch->lock, irq_flags);
983 xpc_msgqueue_deref(ch);
986 xpc_wakeup_channel_mgr(part);
988 xpc_part_deref(part);
992 * Called by XP at the time of channel connection registration to cause
993 * XPC to establish connections to all currently active partitions.
995 void
996 xpc_initiate_connect(int ch_number)
998 short partid;
999 struct xpc_partition *part;
1000 struct xpc_channel *ch;
1002 DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS);
1004 for (partid = 0; partid < xp_max_npartitions; partid++) {
1005 part = &xpc_partitions[partid];
1007 if (xpc_part_ref(part)) {
1008 ch = &part->channels[ch_number];
1011 * Initiate the establishment of a connection on the
1012 * newly registered channel to the remote partition.
1014 xpc_wakeup_channel_mgr(part);
1015 xpc_part_deref(part);
1020 void
1021 xpc_connected_callout(struct xpc_channel *ch)
1023 /* let the registerer know that a connection has been established */
1025 if (ch->func != NULL) {
1026 dev_dbg(xpc_chan, "ch->func() called, reason=xpConnected, "
1027 "partid=%d, channel=%d\n", ch->partid, ch->number);
1029 ch->func(xpConnected, ch->partid, ch->number,
1030 (void *)(u64)ch->local_nentries, ch->key);
1032 dev_dbg(xpc_chan, "ch->func() returned, reason=xpConnected, "
1033 "partid=%d, channel=%d\n", ch->partid, ch->number);
1038 * Called by XP at the time of channel connection unregistration to cause
1039 * XPC to teardown all current connections for the specified channel.
1041 * Before returning xpc_initiate_disconnect() will wait until all connections
1042 * on the specified channel have been closed/torndown. So the caller can be
1043 * assured that they will not be receiving any more callouts from XPC to the
1044 * function they registered via xpc_connect().
1046 * Arguments:
1048 * ch_number - channel # to unregister.
1050 void
1051 xpc_initiate_disconnect(int ch_number)
1053 unsigned long irq_flags;
1054 short partid;
1055 struct xpc_partition *part;
1056 struct xpc_channel *ch;
1058 DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS);
1060 /* initiate the channel disconnect for every active partition */
1061 for (partid = 0; partid < xp_max_npartitions; partid++) {
1062 part = &xpc_partitions[partid];
1064 if (xpc_part_ref(part)) {
1065 ch = &part->channels[ch_number];
1066 xpc_msgqueue_ref(ch);
1068 spin_lock_irqsave(&ch->lock, irq_flags);
1070 if (!(ch->flags & XPC_C_DISCONNECTED)) {
1071 ch->flags |= XPC_C_WDISCONNECT;
1073 XPC_DISCONNECT_CHANNEL(ch, xpUnregistering,
1074 &irq_flags);
1077 spin_unlock_irqrestore(&ch->lock, irq_flags);
1079 xpc_msgqueue_deref(ch);
1080 xpc_part_deref(part);
1084 xpc_disconnect_wait(ch_number);
1088 * To disconnect a channel, and reflect it back to all who may be waiting.
1090 * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
1091 * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by
1092 * xpc_disconnect_wait().
1094 * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN.
1096 void
1097 xpc_disconnect_channel(const int line, struct xpc_channel *ch,
1098 enum xp_retval reason, unsigned long *irq_flags)
1100 u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED);
1102 DBUG_ON(!spin_is_locked(&ch->lock));
1104 if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED))
1105 return;
1107 DBUG_ON(!(ch->flags & (XPC_C_CONNECTING | XPC_C_CONNECTED)));
1109 dev_dbg(xpc_chan, "reason=%d, line=%d, partid=%d, channel=%d\n",
1110 reason, line, ch->partid, ch->number);
1112 XPC_SET_REASON(ch, reason, line);
1114 ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING);
1115 /* some of these may not have been set */
1116 ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY |
1117 XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
1118 XPC_C_CONNECTING | XPC_C_CONNECTED);
1120 xpc_IPI_send_closerequest(ch, irq_flags);
1122 if (channel_was_connected)
1123 ch->flags |= XPC_C_WASCONNECTED;
1125 spin_unlock_irqrestore(&ch->lock, *irq_flags);
1127 /* wake all idle kthreads so they can exit */
1128 if (atomic_read(&ch->kthreads_idle) > 0) {
1129 wake_up_all(&ch->idle_wq);
1131 } else if ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
1132 !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) {
1133 /* start a kthread that will do the xpDisconnecting callout */
1134 xpc_create_kthreads(ch, 1, 1);
1137 /* wake those waiting to allocate an entry from the local msg queue */
1138 if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
1139 wake_up(&ch->msg_allocate_wq);
1141 spin_lock_irqsave(&ch->lock, *irq_flags);
1144 void
1145 xpc_disconnect_callout(struct xpc_channel *ch, enum xp_retval reason)
1148 * Let the channel's registerer know that the channel is being
1149 * disconnected. We don't want to do this if the registerer was never
1150 * informed of a connection being made.
1153 if (ch->func != NULL) {
1154 dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, "
1155 "channel=%d\n", reason, ch->partid, ch->number);
1157 ch->func(reason, ch->partid, ch->number, NULL, ch->key);
1159 dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, "
1160 "channel=%d\n", reason, ch->partid, ch->number);
1165 * Wait for a message entry to become available for the specified channel,
1166 * but don't wait any longer than 1 jiffy.
1168 enum xp_retval
1169 xpc_allocate_msg_wait(struct xpc_channel *ch)
1171 enum xp_retval ret;
1173 if (ch->flags & XPC_C_DISCONNECTING) {
1174 DBUG_ON(ch->reason == xpInterrupted);
1175 return ch->reason;
1178 atomic_inc(&ch->n_on_msg_allocate_wq);
1179 ret = interruptible_sleep_on_timeout(&ch->msg_allocate_wq, 1);
1180 atomic_dec(&ch->n_on_msg_allocate_wq);
1182 if (ch->flags & XPC_C_DISCONNECTING) {
1183 ret = ch->reason;
1184 DBUG_ON(ch->reason == xpInterrupted);
1185 } else if (ret == 0) {
1186 ret = xpTimeout;
1187 } else {
1188 ret = xpInterrupted;
1191 return ret;
1195 * Allocate an entry for a message from the message queue associated with the
1196 * specified channel. NOTE that this routine can sleep waiting for a message
1197 * entry to become available. To not sleep, pass in the XPC_NOWAIT flag.
1199 * Arguments:
1201 * partid - ID of partition to which the channel is connected.
1202 * ch_number - channel #.
1203 * flags - see xpc.h for valid flags.
1204 * payload - address of the allocated payload area pointer (filled in on
1205 * return) in which the user-defined message is constructed.
1207 enum xp_retval
1208 xpc_initiate_allocate(short partid, int ch_number, u32 flags, void **payload)
1210 struct xpc_partition *part = &xpc_partitions[partid];
1211 enum xp_retval ret = xpUnknownReason;
1212 struct xpc_msg *msg = NULL;
1214 DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
1215 DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
1217 *payload = NULL;
1219 if (xpc_part_ref(part)) {
1220 ret = xpc_allocate_msg(&part->channels[ch_number], flags, &msg);
1221 xpc_part_deref(part);
1223 if (msg != NULL)
1224 *payload = &msg->payload;
1227 return ret;
1231 * Send a message previously allocated using xpc_initiate_allocate() on the
1232 * specified channel connected to the specified partition.
1234 * This routine will not wait for the message to be received, nor will
1235 * notification be given when it does happen. Once this routine has returned
1236 * the message entry allocated via xpc_initiate_allocate() is no longer
1237 * accessable to the caller.
1239 * This routine, although called by users, does not call xpc_part_ref() to
1240 * ensure that the partition infrastructure is in place. It relies on the
1241 * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg().
1243 * Arguments:
1245 * partid - ID of partition to which the channel is connected.
1246 * ch_number - channel # to send message on.
1247 * payload - pointer to the payload area allocated via
1248 * xpc_initiate_allocate().
1250 enum xp_retval
1251 xpc_initiate_send(short partid, int ch_number, void *payload)
1253 struct xpc_partition *part = &xpc_partitions[partid];
1254 struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
1255 enum xp_retval ret;
1257 dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *)msg,
1258 partid, ch_number);
1260 DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
1261 DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
1262 DBUG_ON(msg == NULL);
1264 ret = xpc_send_msg(&part->channels[ch_number], msg, 0, NULL, NULL);
1266 return ret;
1270 * Send a message previously allocated using xpc_initiate_allocate on the
1271 * specified channel connected to the specified partition.
1273 * This routine will not wait for the message to be sent. Once this routine
1274 * has returned the message entry allocated via xpc_initiate_allocate() is no
1275 * longer accessable to the caller.
1277 * Once the remote end of the channel has received the message, the function
1278 * passed as an argument to xpc_initiate_send_notify() will be called. This
1279 * allows the sender to free up or re-use any buffers referenced by the
1280 * message, but does NOT mean the message has been processed at the remote
1281 * end by a receiver.
1283 * If this routine returns an error, the caller's function will NOT be called.
1285 * This routine, although called by users, does not call xpc_part_ref() to
1286 * ensure that the partition infrastructure is in place. It relies on the
1287 * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg().
1289 * Arguments:
1291 * partid - ID of partition to which the channel is connected.
1292 * ch_number - channel # to send message on.
1293 * payload - pointer to the payload area allocated via
1294 * xpc_initiate_allocate().
1295 * func - function to call with asynchronous notification of message
1296 * receipt. THIS FUNCTION MUST BE NON-BLOCKING.
1297 * key - user-defined key to be passed to the function when it's called.
1299 enum xp_retval
1300 xpc_initiate_send_notify(short partid, int ch_number, void *payload,
1301 xpc_notify_func func, void *key)
1303 struct xpc_partition *part = &xpc_partitions[partid];
1304 struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
1305 enum xp_retval ret;
1307 dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *)msg,
1308 partid, ch_number);
1310 DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
1311 DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
1312 DBUG_ON(msg == NULL);
1313 DBUG_ON(func == NULL);
1315 ret = xpc_send_msg(&part->channels[ch_number], msg, XPC_N_CALL,
1316 func, key);
1317 return ret;
1321 * Deliver a message to its intended recipient.
1323 void
1324 xpc_deliver_msg(struct xpc_channel *ch)
1326 struct xpc_msg *msg;
1328 msg = xpc_get_deliverable_msg(ch);
1329 if (msg != NULL) {
1332 * This ref is taken to protect the payload itself from being
1333 * freed before the user is finished with it, which the user
1334 * indicates by calling xpc_initiate_received().
1336 xpc_msgqueue_ref(ch);
1338 atomic_inc(&ch->kthreads_active);
1340 if (ch->func != NULL) {
1341 dev_dbg(xpc_chan, "ch->func() called, msg=0x%p, "
1342 "msg_number=%ld, partid=%d, channel=%d\n",
1343 (void *)msg, msg->number, ch->partid,
1344 ch->number);
1346 /* deliver the message to its intended recipient */
1347 ch->func(xpMsgReceived, ch->partid, ch->number,
1348 &msg->payload, ch->key);
1350 dev_dbg(xpc_chan, "ch->func() returned, msg=0x%p, "
1351 "msg_number=%ld, partid=%d, channel=%d\n",
1352 (void *)msg, msg->number, ch->partid,
1353 ch->number);
1356 atomic_dec(&ch->kthreads_active);
1361 * Acknowledge receipt of a delivered message.
1363 * If a message has XPC_M_INTERRUPT set, send an interrupt to the partition
1364 * that sent the message.
1366 * This function, although called by users, does not call xpc_part_ref() to
1367 * ensure that the partition infrastructure is in place. It relies on the
1368 * fact that we called xpc_msgqueue_ref() in xpc_deliver_msg().
1370 * Arguments:
1372 * partid - ID of partition to which the channel is connected.
1373 * ch_number - channel # message received on.
1374 * payload - pointer to the payload area allocated via
1375 * xpc_initiate_allocate().
1377 void
1378 xpc_initiate_received(short partid, int ch_number, void *payload)
1380 struct xpc_partition *part = &xpc_partitions[partid];
1381 struct xpc_channel *ch;
1382 struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
1384 DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
1385 DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
1387 ch = &part->channels[ch_number];
1388 xpc_received_msg(ch, msg);
1390 /* the call to xpc_msgqueue_ref() was done by xpc_deliver_msg() */
1391 xpc_msgqueue_deref(ch);