sgi-xp: separate chctl_flags from XPC's notify IRQ
drivers/misc/sgi-xp/xpc_channel.c
/*
 * This file is subject to the terms and conditions of the GNU General Public
 * License.  See the file "COPYING" in the main directory of this archive
 * for more details.
 *
 * Copyright (c) 2004-2008 Silicon Graphics, Inc.  All Rights Reserved.
 */

/*
 * Cross Partition Communication (XPC) channel support.
 *
 *	This is the part of XPC that manages the channels and
 *	sends/receives messages across them to/from other partitions.
 */

#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/sched.h>
#include <linux/cache.h>
#include <linux/interrupt.h>
#include <linux/mutex.h>
#include <linux/completion.h>
#include <asm/sn/sn_sal.h>
#include "xpc.h"
/*
 * Guarantee that the kzalloc'd memory is cacheline aligned.
 */
void *
xpc_kzalloc_cacheline_aligned(size_t size, gfp_t flags, void **base)
{
	/* see if kzalloc will give us cacheline aligned memory by default */
	*base = kzalloc(size, flags);
	if (*base == NULL)
		return NULL;

	if ((u64)*base == L1_CACHE_ALIGN((u64)*base))
		return *base;

	kfree(*base);

	/* nope, we'll have to do it ourselves */
	*base = kzalloc(size + L1_CACHE_BYTES, flags);
	if (*base == NULL)
		return NULL;

	return (void *)L1_CACHE_ALIGN((u64)*base);
}
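
/*
 * Example (a hypothetical sketch, not part of this file): callers must keep
 * the raw pointer returned via *base for the eventual kfree() and use the
 * aligned return value for all accesses, exactly as the msgqueue code below
 * does with ch->local_msgqueue_base.  The names buf and buf_base are made up
 * for illustration:
 *
 *	void *buf_base;
 *	struct foo *buf = xpc_kzalloc_cacheline_aligned(sizeof(*buf),
 *							GFP_KERNEL, &buf_base);
 *	if (buf == NULL)
 *		return xpNoMemory;
 *	...
 *	kfree(buf_base);	do NOT kfree(buf); it may be offset
 */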
/*
 * Allocate the local message queue and the notify queue.
 */
static enum xp_retval
xpc_allocate_local_msgqueue(struct xpc_channel *ch)
{
	unsigned long irq_flags;
	int nentries;
	size_t nbytes;

	for (nentries = ch->local_nentries; nentries > 0; nentries--) {

		nbytes = nentries * ch->msg_size;
		ch->local_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes,
								   GFP_KERNEL,
						   &ch->local_msgqueue_base);
		if (ch->local_msgqueue == NULL)
			continue;

		nbytes = nentries * sizeof(struct xpc_notify);
		ch->notify_queue = kzalloc(nbytes, GFP_KERNEL);
		if (ch->notify_queue == NULL) {
			kfree(ch->local_msgqueue_base);
			ch->local_msgqueue = NULL;
			continue;
		}

		spin_lock_irqsave(&ch->lock, irq_flags);
		if (nentries < ch->local_nentries) {
			dev_dbg(xpc_chan, "nentries=%d local_nentries=%d, "
				"partid=%d, channel=%d\n", nentries,
				ch->local_nentries, ch->partid, ch->number);

			ch->local_nentries = nentries;
		}
		spin_unlock_irqrestore(&ch->lock, irq_flags);
		return xpSuccess;
	}

	dev_dbg(xpc_chan, "can't get memory for local message queue and notify "
		"queue, partid=%d, channel=%d\n", ch->partid, ch->number);
	return xpNoMemory;
}
/*
 * Allocate the cached remote message queue.
 */
static enum xp_retval
xpc_allocate_remote_msgqueue(struct xpc_channel *ch)
{
	unsigned long irq_flags;
	int nentries;
	size_t nbytes;

	DBUG_ON(ch->remote_nentries <= 0);

	for (nentries = ch->remote_nentries; nentries > 0; nentries--) {

		nbytes = nentries * ch->msg_size;
		ch->remote_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes,
								    GFP_KERNEL,
						  &ch->remote_msgqueue_base);
		if (ch->remote_msgqueue == NULL)
			continue;

		spin_lock_irqsave(&ch->lock, irq_flags);
		if (nentries < ch->remote_nentries) {
			dev_dbg(xpc_chan, "nentries=%d remote_nentries=%d, "
				"partid=%d, channel=%d\n", nentries,
				ch->remote_nentries, ch->partid, ch->number);

			ch->remote_nentries = nentries;
		}
		spin_unlock_irqrestore(&ch->lock, irq_flags);
		return xpSuccess;
	}

	dev_dbg(xpc_chan, "can't get memory for cached remote message queue, "
		"partid=%d, channel=%d\n", ch->partid, ch->number);
	return xpNoMemory;
}
/*
 * Allocate message queues and other stuff associated with a channel.
 *
 * Note: Assumes all of the channel sizes are filled in.
 */
static enum xp_retval
xpc_allocate_msgqueues(struct xpc_channel *ch)
{
	unsigned long irq_flags;
	enum xp_retval ret;

	DBUG_ON(ch->flags & XPC_C_SETUP);

	ret = xpc_allocate_local_msgqueue(ch);
	if (ret != xpSuccess)
		return ret;

	ret = xpc_allocate_remote_msgqueue(ch);
	if (ret != xpSuccess) {
		kfree(ch->local_msgqueue_base);
		ch->local_msgqueue = NULL;
		kfree(ch->notify_queue);
		ch->notify_queue = NULL;
		return ret;
	}

	spin_lock_irqsave(&ch->lock, irq_flags);
	ch->flags |= XPC_C_SETUP;
	spin_unlock_irqrestore(&ch->lock, irq_flags);

	return xpSuccess;
}
/*
 * Process a connect message from a remote partition.
 *
 * Note: xpc_process_connect() is expecting to be called with the
 * spin_lock_irqsave held and will leave it locked upon return.
 */
static void
xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags)
{
	enum xp_retval ret;

	DBUG_ON(!spin_is_locked(&ch->lock));

	if (!(ch->flags & XPC_C_OPENREQUEST) ||
	    !(ch->flags & XPC_C_ROPENREQUEST)) {
		/* nothing more to do for now */
		return;
	}
	DBUG_ON(!(ch->flags & XPC_C_CONNECTING));

	if (!(ch->flags & XPC_C_SETUP)) {
		spin_unlock_irqrestore(&ch->lock, *irq_flags);
		ret = xpc_allocate_msgqueues(ch);
		spin_lock_irqsave(&ch->lock, *irq_flags);

		if (ret != xpSuccess)
			XPC_DISCONNECT_CHANNEL(ch, ret, irq_flags);

		if (ch->flags & (XPC_C_CONNECTED | XPC_C_DISCONNECTING))
			return;

		DBUG_ON(!(ch->flags & XPC_C_SETUP));
		DBUG_ON(ch->local_msgqueue == NULL);
		DBUG_ON(ch->remote_msgqueue == NULL);
	}

	if (!(ch->flags & XPC_C_OPENREPLY)) {
		ch->flags |= XPC_C_OPENREPLY;
		xpc_send_chctl_openreply(ch, irq_flags);
	}

	if (!(ch->flags & XPC_C_ROPENREPLY))
		return;

	DBUG_ON(ch->remote_msgqueue_pa == 0);

	ch->flags = (XPC_C_CONNECTED | XPC_C_SETUP);	/* clear all else */

	dev_info(xpc_chan, "channel %d to partition %d connected\n",
		 ch->number, ch->partid);

	spin_unlock_irqrestore(&ch->lock, *irq_flags);
	xpc_create_kthreads(ch, 1, 0);
	spin_lock_irqsave(&ch->lock, *irq_flags);
}
/*
 * Free up message queues and other stuff that were allocated for the specified
 * channel.
 *
 * Note: ch->reason and ch->reason_line are left set for debugging purposes,
 * they're cleared when XPC_C_DISCONNECTED is cleared.
 */
static void
xpc_free_msgqueues(struct xpc_channel *ch)
{
	struct xpc_channel_sn2 *ch_sn2 = &ch->sn.sn2;

	DBUG_ON(!spin_is_locked(&ch->lock));
	DBUG_ON(atomic_read(&ch->n_to_notify) != 0);

	ch->remote_msgqueue_pa = 0;
	ch->func = NULL;
	ch->key = NULL;
	ch->msg_size = 0;
	ch->local_nentries = 0;
	ch->remote_nentries = 0;
	ch->kthreads_assigned_limit = 0;
	ch->kthreads_idle_limit = 0;

	ch_sn2->local_GP->get = 0;
	ch_sn2->local_GP->put = 0;
	ch_sn2->remote_GP.get = 0;
	ch_sn2->remote_GP.put = 0;
	ch_sn2->w_local_GP.get = 0;
	ch_sn2->w_local_GP.put = 0;
	ch_sn2->w_remote_GP.get = 0;
	ch_sn2->w_remote_GP.put = 0;
	ch_sn2->next_msg_to_pull = 0;

	if (ch->flags & XPC_C_SETUP) {
		ch->flags &= ~XPC_C_SETUP;

		dev_dbg(xpc_chan, "ch->flags=0x%x, partid=%d, channel=%d\n",
			ch->flags, ch->partid, ch->number);

		kfree(ch->local_msgqueue_base);
		ch->local_msgqueue = NULL;
		kfree(ch->remote_msgqueue_base);
		ch->remote_msgqueue = NULL;
		kfree(ch->notify_queue);
		ch->notify_queue = NULL;
	}
}
/*
 * spin_lock_irqsave() is expected to be held on entry.
 */
static void
xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
{
	struct xpc_partition *part = &xpc_partitions[ch->partid];
	u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED);

	DBUG_ON(!spin_is_locked(&ch->lock));

	if (!(ch->flags & XPC_C_DISCONNECTING))
		return;

	DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));

	/* make sure all activity has settled down first */

	if (atomic_read(&ch->kthreads_assigned) > 0 ||
	    atomic_read(&ch->references) > 0) {
		return;
	}
	DBUG_ON((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
		!(ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE));

	if (part->act_state == XPC_P_DEACTIVATING) {
		/* can't proceed until the other side disengages from us */
		if (xpc_partition_engaged(ch->partid))
			return;

	} else {

		/* as long as the other side is up do the full protocol */

		if (!(ch->flags & XPC_C_RCLOSEREQUEST))
			return;

		if (!(ch->flags & XPC_C_CLOSEREPLY)) {
			ch->flags |= XPC_C_CLOSEREPLY;
			xpc_send_chctl_closereply(ch, irq_flags);
		}

		if (!(ch->flags & XPC_C_RCLOSEREPLY))
			return;
	}

	/* wake those waiting for notify completion */
	if (atomic_read(&ch->n_to_notify) > 0) {
		/* >>> we do callout while holding ch->lock */
		xpc_notify_senders_of_disconnect(ch);
	}

	/* both sides are disconnected now */

	if (ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE) {
		spin_unlock_irqrestore(&ch->lock, *irq_flags);
		xpc_disconnect_callout(ch, xpDisconnected);
		spin_lock_irqsave(&ch->lock, *irq_flags);
	}

	/* it's now safe to free the channel's message queues */
	xpc_free_msgqueues(ch);

	/* mark disconnected, clear all other flags except XPC_C_WDISCONNECT */
	ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT));

	atomic_dec(&part->nchannels_active);

	if (channel_was_connected) {
		dev_info(xpc_chan, "channel %d to partition %d disconnected, "
			 "reason=%d\n", ch->number, ch->partid, ch->reason);
	}

	if (ch->flags & XPC_C_WDISCONNECT) {
		/* we won't lose the CPU since we're holding ch->lock */
		complete(&ch->wdisconnect_wait);
	} else if (ch->delayed_chctl_flags) {
		if (part->act_state != XPC_P_DEACTIVATING) {
			/* time to take action on any delayed chctl flags */
			spin_lock(&part->chctl_lock);
			part->chctl.flags[ch->number] |=
			    ch->delayed_chctl_flags;
			spin_unlock(&part->chctl_lock);
		}
		ch->delayed_chctl_flags = 0;
	}
}
/*
 * Process a change in the channel's remote connection state.
 */
static void
xpc_process_openclose_chctl_flags(struct xpc_partition *part, int ch_number,
				  u8 chctl_flags)
{
	unsigned long irq_flags;
	struct xpc_openclose_args *args =
	    &part->remote_openclose_args[ch_number];
	struct xpc_channel *ch = &part->channels[ch_number];
	enum xp_retval reason;

	spin_lock_irqsave(&ch->lock, irq_flags);

again:

	if ((ch->flags & XPC_C_DISCONNECTED) &&
	    (ch->flags & XPC_C_WDISCONNECT)) {
		/*
		 * Delay processing chctl flags until thread waiting disconnect
		 * has had a chance to see that the channel is disconnected.
		 */
		ch->delayed_chctl_flags |= chctl_flags;
		spin_unlock_irqrestore(&ch->lock, irq_flags);
		return;
	}

	if (chctl_flags & XPC_CHCTL_CLOSEREQUEST) {

		dev_dbg(xpc_chan, "XPC_CHCTL_CLOSEREQUEST (reason=%d) received "
			"from partid=%d, channel=%d\n", args->reason,
			ch->partid, ch->number);

		/*
		 * If RCLOSEREQUEST is set, we're probably waiting for
		 * RCLOSEREPLY. We should find it and a ROPENREQUEST packed
		 * with this RCLOSEREQUEST in the chctl_flags.
		 */

		if (ch->flags & XPC_C_RCLOSEREQUEST) {
			DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING));
			DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
			DBUG_ON(!(ch->flags & XPC_C_CLOSEREPLY));
			DBUG_ON(ch->flags & XPC_C_RCLOSEREPLY);

			DBUG_ON(!(chctl_flags & XPC_CHCTL_CLOSEREPLY));
			chctl_flags &= ~XPC_CHCTL_CLOSEREPLY;
			ch->flags |= XPC_C_RCLOSEREPLY;

			/* both sides have finished disconnecting */
			xpc_process_disconnect(ch, &irq_flags);
			DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
			goto again;
		}

		if (ch->flags & XPC_C_DISCONNECTED) {
			if (!(chctl_flags & XPC_CHCTL_OPENREQUEST)) {
				if (part->chctl.flags[ch_number] &
				    XPC_CHCTL_OPENREQUEST) {

					DBUG_ON(ch->delayed_chctl_flags != 0);
					spin_lock(&part->chctl_lock);
					part->chctl.flags[ch_number] |=
					    XPC_CHCTL_CLOSEREQUEST;
					spin_unlock(&part->chctl_lock);
				}
				spin_unlock_irqrestore(&ch->lock, irq_flags);
				return;
			}

			XPC_SET_REASON(ch, 0, 0);
			ch->flags &= ~XPC_C_DISCONNECTED;

			atomic_inc(&part->nchannels_active);
			ch->flags |= (XPC_C_CONNECTING | XPC_C_ROPENREQUEST);
		}

		chctl_flags &= ~(XPC_CHCTL_OPENREQUEST | XPC_CHCTL_OPENREPLY);

		/*
		 * The meaningful CLOSEREQUEST connection state fields are:
		 *      reason = reason connection is to be closed
		 */

		ch->flags |= XPC_C_RCLOSEREQUEST;

		if (!(ch->flags & XPC_C_DISCONNECTING)) {
			reason = args->reason;
			if (reason <= xpSuccess || reason > xpUnknownReason)
				reason = xpUnknownReason;
			else if (reason == xpUnregistering)
				reason = xpOtherUnregistering;

			XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);

			DBUG_ON(chctl_flags & XPC_CHCTL_CLOSEREPLY);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		xpc_process_disconnect(ch, &irq_flags);
	}

	if (chctl_flags & XPC_CHCTL_CLOSEREPLY) {

		dev_dbg(xpc_chan, "XPC_CHCTL_CLOSEREPLY received from partid="
			"%d, channel=%d\n", ch->partid, ch->number);

		if (ch->flags & XPC_C_DISCONNECTED) {
			DBUG_ON(part->act_state != XPC_P_DEACTIVATING);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));

		if (!(ch->flags & XPC_C_RCLOSEREQUEST)) {
			if (part->chctl.flags[ch_number] &
			    XPC_CHCTL_CLOSEREQUEST) {

				DBUG_ON(ch->delayed_chctl_flags != 0);
				spin_lock(&part->chctl_lock);
				part->chctl.flags[ch_number] |=
				    XPC_CHCTL_CLOSEREPLY;
				spin_unlock(&part->chctl_lock);
			}
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		ch->flags |= XPC_C_RCLOSEREPLY;

		if (ch->flags & XPC_C_CLOSEREPLY) {
			/* both sides have finished disconnecting */
			xpc_process_disconnect(ch, &irq_flags);
		}
	}

	if (chctl_flags & XPC_CHCTL_OPENREQUEST) {

		dev_dbg(xpc_chan, "XPC_CHCTL_OPENREQUEST (msg_size=%d, "
			"local_nentries=%d) received from partid=%d, "
			"channel=%d\n", args->msg_size, args->local_nentries,
			ch->partid, ch->number);

		if (part->act_state == XPC_P_DEACTIVATING ||
		    (ch->flags & XPC_C_ROPENREQUEST)) {
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) {
			ch->delayed_chctl_flags |= XPC_CHCTL_OPENREQUEST;
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}
		DBUG_ON(!(ch->flags & (XPC_C_DISCONNECTED |
				       XPC_C_OPENREQUEST)));
		DBUG_ON(ch->flags & (XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
				     XPC_C_OPENREPLY | XPC_C_CONNECTED));

		/*
		 * The meaningful OPENREQUEST connection state fields are:
		 *      msg_size = size of channel's messages in bytes
		 *      local_nentries = remote partition's local_nentries
		 */
		if (args->msg_size == 0 || args->local_nentries == 0) {
			/* assume OPENREQUEST was delayed by mistake */
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING);
		ch->remote_nentries = args->local_nentries;

		if (ch->flags & XPC_C_OPENREQUEST) {
			if (args->msg_size != ch->msg_size) {
				XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
						       &irq_flags);
				spin_unlock_irqrestore(&ch->lock, irq_flags);
				return;
			}
		} else {
			ch->msg_size = args->msg_size;

			XPC_SET_REASON(ch, 0, 0);
			ch->flags &= ~XPC_C_DISCONNECTED;

			atomic_inc(&part->nchannels_active);
		}

		xpc_process_connect(ch, &irq_flags);
	}

	if (chctl_flags & XPC_CHCTL_OPENREPLY) {

		dev_dbg(xpc_chan, "XPC_CHCTL_OPENREPLY (local_msgqueue_pa="
			"0x%lx, local_nentries=%d, remote_nentries=%d) "
			"received from partid=%d, channel=%d\n",
			args->local_msgqueue_pa, args->local_nentries,
			args->remote_nentries, ch->partid, ch->number);

		if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) {
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}
		if (!(ch->flags & XPC_C_OPENREQUEST)) {
			XPC_DISCONNECT_CHANNEL(ch, xpOpenCloseError,
					       &irq_flags);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST));
		DBUG_ON(ch->flags & XPC_C_CONNECTED);

		/*
		 * The meaningful OPENREPLY connection state fields are:
		 *      local_msgqueue_pa = physical address of remote
		 *                          partition's local_msgqueue
		 *      local_nentries = remote partition's local_nentries
		 *      remote_nentries = remote partition's remote_nentries
		 */
		DBUG_ON(args->local_msgqueue_pa == 0);
		DBUG_ON(args->local_nentries == 0);
		DBUG_ON(args->remote_nentries == 0);

		ch->flags |= XPC_C_ROPENREPLY;
		ch->remote_msgqueue_pa = args->local_msgqueue_pa;

		if (args->local_nentries < ch->remote_nentries) {
			dev_dbg(xpc_chan, "XPC_CHCTL_OPENREPLY: new "
				"remote_nentries=%d, old remote_nentries=%d, "
				"partid=%d, channel=%d\n",
				args->local_nentries, ch->remote_nentries,
				ch->partid, ch->number);

			ch->remote_nentries = args->local_nentries;
		}
		if (args->remote_nentries < ch->local_nentries) {
			dev_dbg(xpc_chan, "XPC_CHCTL_OPENREPLY: new "
				"local_nentries=%d, old local_nentries=%d, "
				"partid=%d, channel=%d\n",
				args->remote_nentries, ch->local_nentries,
				ch->partid, ch->number);

			ch->local_nentries = args->remote_nentries;
		}

		xpc_process_connect(ch, &irq_flags);
	}

	spin_unlock_irqrestore(&ch->lock, irq_flags);
}
/*
 * Attempt to establish a channel connection to a remote partition.
 */
static enum xp_retval
xpc_connect_channel(struct xpc_channel *ch)
{
	unsigned long irq_flags;
	struct xpc_registration *registration = &xpc_registrations[ch->number];

	if (mutex_trylock(&registration->mutex) == 0)
		return xpRetry;

	if (!XPC_CHANNEL_REGISTERED(ch->number)) {
		mutex_unlock(&registration->mutex);
		return xpUnregistered;
	}

	spin_lock_irqsave(&ch->lock, irq_flags);

	DBUG_ON(ch->flags & XPC_C_CONNECTED);
	DBUG_ON(ch->flags & XPC_C_OPENREQUEST);

	if (ch->flags & XPC_C_DISCONNECTING) {
		spin_unlock_irqrestore(&ch->lock, irq_flags);
		mutex_unlock(&registration->mutex);
		return ch->reason;
	}

	/* add info from the channel connect registration to the channel */

	ch->kthreads_assigned_limit = registration->assigned_limit;
	ch->kthreads_idle_limit = registration->idle_limit;
	DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0);
	DBUG_ON(atomic_read(&ch->kthreads_idle) != 0);
	DBUG_ON(atomic_read(&ch->kthreads_active) != 0);

	ch->func = registration->func;
	DBUG_ON(registration->func == NULL);
	ch->key = registration->key;

	ch->local_nentries = registration->nentries;

	if (ch->flags & XPC_C_ROPENREQUEST) {
		if (registration->msg_size != ch->msg_size) {
			/* the local and remote sides aren't the same */

			/*
			 * Because XPC_DISCONNECT_CHANNEL() can block we're
			 * forced to up the registration sema before we unlock
			 * the channel lock. But that's okay here because we're
			 * done with the part that required the registration
			 * sema. XPC_DISCONNECT_CHANNEL() requires that the
			 * channel lock be locked and will unlock and relock
			 * the channel lock as needed.
			 */
			mutex_unlock(&registration->mutex);
			XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
					       &irq_flags);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return xpUnequalMsgSizes;
		}
	} else {
		ch->msg_size = registration->msg_size;

		XPC_SET_REASON(ch, 0, 0);
		ch->flags &= ~XPC_C_DISCONNECTED;

		atomic_inc(&xpc_partitions[ch->partid].nchannels_active);
	}

	mutex_unlock(&registration->mutex);

	/* initiate the connection */

	ch->flags |= (XPC_C_OPENREQUEST | XPC_C_CONNECTING);
	xpc_send_chctl_openrequest(ch, &irq_flags);

	xpc_process_connect(ch, &irq_flags);

	spin_unlock_irqrestore(&ch->lock, irq_flags);

	return xpSuccess;
}
void
xpc_process_sent_chctl_flags(struct xpc_partition *part)
{
	unsigned long irq_flags;
	union xpc_channel_ctl_flags chctl;
	struct xpc_channel *ch;
	int ch_number;
	u32 ch_flags;

	chctl.all_flags = xpc_get_chctl_all_flags(part);

	/*
	 * Initiate channel connections for registered channels.
	 *
	 * For each connected channel that has pending messages activate idle
	 * kthreads and/or create new kthreads as needed.
	 */

	for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
		ch = &part->channels[ch_number];

		/*
		 * Process any open or close related chctl flags, and then deal
		 * with connecting or disconnecting the channel as required.
		 */

		if (chctl.flags[ch_number] & XPC_OPENCLOSE_CHCTL_FLAGS) {
			xpc_process_openclose_chctl_flags(part, ch_number,
							chctl.flags[ch_number]);
		}

		ch_flags = ch->flags;	/* need an atomic snapshot of flags */

		if (ch_flags & XPC_C_DISCONNECTING) {
			spin_lock_irqsave(&ch->lock, irq_flags);
			xpc_process_disconnect(ch, &irq_flags);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			continue;
		}

		if (part->act_state == XPC_P_DEACTIVATING)
			continue;

		if (!(ch_flags & XPC_C_CONNECTED)) {
			if (!(ch_flags & XPC_C_OPENREQUEST)) {
				DBUG_ON(ch_flags & XPC_C_SETUP);
				(void)xpc_connect_channel(ch);
			} else {
				spin_lock_irqsave(&ch->lock, irq_flags);
				xpc_process_connect(ch, &irq_flags);
				spin_unlock_irqrestore(&ch->lock, irq_flags);
			}
			continue;
		}

		/*
		 * Process any message related chctl flags, this may involve
		 * the activation of kthreads to deliver any pending messages
		 * sent from the other partition.
		 */

		if (chctl.flags[ch_number] & XPC_MSG_CHCTL_FLAGS)
			xpc_process_msg_chctl_flags(part, ch_number);
	}
}
/*
 * XPC's heartbeat code calls this function to inform XPC that a partition is
 * going down. XPC responds by tearing down the XPartition Communication
 * infrastructure used for the just downed partition.
 *
 * XPC's heartbeat code will never call this function and xpc_partition_up()
 * at the same time. Nor will it ever make multiple calls to either function
 * at the same time.
 */
void
xpc_partition_going_down(struct xpc_partition *part, enum xp_retval reason)
{
	unsigned long irq_flags;
	int ch_number;
	struct xpc_channel *ch;

	dev_dbg(xpc_chan, "deactivating partition %d, reason=%d\n",
		XPC_PARTID(part), reason);

	if (!xpc_part_ref(part)) {
		/* infrastructure for this partition isn't currently set up */
		return;
	}

	/* disconnect channels associated with the partition going down */

	for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
		ch = &part->channels[ch_number];

		xpc_msgqueue_ref(ch);
		spin_lock_irqsave(&ch->lock, irq_flags);

		XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);

		spin_unlock_irqrestore(&ch->lock, irq_flags);
		xpc_msgqueue_deref(ch);
	}

	xpc_wakeup_channel_mgr(part);

	xpc_part_deref(part);
}
/*
 * Called by XP at the time of channel connection registration to cause
 * XPC to establish connections to all currently active partitions.
 */
void
xpc_initiate_connect(int ch_number)
{
	short partid;
	struct xpc_partition *part;
	struct xpc_channel *ch;

	DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS);

	for (partid = 0; partid < xp_max_npartitions; partid++) {
		part = &xpc_partitions[partid];

		if (xpc_part_ref(part)) {
			ch = &part->channels[ch_number];

			/*
			 * Initiate the establishment of a connection on the
			 * newly registered channel to the remote partition.
			 */
			xpc_wakeup_channel_mgr(part);
			xpc_part_deref(part);
		}
	}
}
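
/*
 * Example (a hypothetical sketch, assuming the xpc_connect() and
 * xpc_disconnect() wrappers declared in xp.h): a registerer's call to
 * xpc_connect() is what ultimately invokes xpc_initiate_connect() above,
 * and xpc_disconnect() invokes xpc_initiate_disconnect() below.  The names
 * MY_CHANNEL, my_channel_func, struct my_msg, NENTRIES and the limits are
 * made up for illustration:
 *
 *	ret = xpc_connect(MY_CHANNEL, my_channel_func, NULL,
 *			  sizeof(struct my_msg), NENTRIES,
 *			  assigned_limit, idle_limit);
 *	if (ret != xpSuccess)
 *		...
 *	...
 *	xpc_disconnect(MY_CHANNEL);
 */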
void
xpc_connected_callout(struct xpc_channel *ch)
{
	/* let the registerer know that a connection has been established */

	if (ch->func != NULL) {
		dev_dbg(xpc_chan, "ch->func() called, reason=xpConnected, "
			"partid=%d, channel=%d\n", ch->partid, ch->number);

		ch->func(xpConnected, ch->partid, ch->number,
			 (void *)(u64)ch->local_nentries, ch->key);

		dev_dbg(xpc_chan, "ch->func() returned, reason=xpConnected, "
			"partid=%d, channel=%d\n", ch->partid, ch->number);
	}
}
/*
 * Called by XP at the time of channel connection unregistration to cause
 * XPC to tear down all current connections for the specified channel.
 *
 * Before returning xpc_initiate_disconnect() will wait until all connections
 * on the specified channel have been closed/torn down. So the caller can be
 * assured that they will not be receiving any more callouts from XPC to the
 * function they registered via xpc_connect().
 *
 * Arguments:
 *
 *	ch_number - channel # to unregister.
 */
void
xpc_initiate_disconnect(int ch_number)
{
	unsigned long irq_flags;
	short partid;
	struct xpc_partition *part;
	struct xpc_channel *ch;

	DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS);

	/* initiate the channel disconnect for every active partition */
	for (partid = 0; partid < xp_max_npartitions; partid++) {
		part = &xpc_partitions[partid];

		if (xpc_part_ref(part)) {
			ch = &part->channels[ch_number];
			xpc_msgqueue_ref(ch);

			spin_lock_irqsave(&ch->lock, irq_flags);

			if (!(ch->flags & XPC_C_DISCONNECTED)) {
				ch->flags |= XPC_C_WDISCONNECT;

				XPC_DISCONNECT_CHANNEL(ch, xpUnregistering,
						       &irq_flags);
			}

			spin_unlock_irqrestore(&ch->lock, irq_flags);

			xpc_msgqueue_deref(ch);
			xpc_part_deref(part);
		}
	}

	xpc_disconnect_wait(ch_number);
}
/*
 * To disconnect a channel, and reflect it back to all who may be waiting.
 *
 * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
 * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by
 * xpc_disconnect_wait().
 *
 * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN.
 */
void
xpc_disconnect_channel(const int line, struct xpc_channel *ch,
		       enum xp_retval reason, unsigned long *irq_flags)
{
	u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED);

	DBUG_ON(!spin_is_locked(&ch->lock));

	if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED))
		return;

	DBUG_ON(!(ch->flags & (XPC_C_CONNECTING | XPC_C_CONNECTED)));

	dev_dbg(xpc_chan, "reason=%d, line=%d, partid=%d, channel=%d\n",
		reason, line, ch->partid, ch->number);

	XPC_SET_REASON(ch, reason, line);

	ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING);
	/* some of these may not have been set */
	ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY |
		       XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
		       XPC_C_CONNECTING | XPC_C_CONNECTED);

	xpc_send_chctl_closerequest(ch, irq_flags);

	if (channel_was_connected)
		ch->flags |= XPC_C_WASCONNECTED;

	spin_unlock_irqrestore(&ch->lock, *irq_flags);

	/* wake all idle kthreads so they can exit */
	if (atomic_read(&ch->kthreads_idle) > 0) {
		wake_up_all(&ch->idle_wq);

	} else if ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
		   !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) {
		/* start a kthread that will do the xpDisconnecting callout */
		xpc_create_kthreads(ch, 1, 1);
	}

	/* wake those waiting to allocate an entry from the local msg queue */
	if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
		wake_up(&ch->msg_allocate_wq);

	spin_lock_irqsave(&ch->lock, *irq_flags);
}
void
xpc_disconnect_callout(struct xpc_channel *ch, enum xp_retval reason)
{
	/*
	 * Let the channel's registerer know that the channel is being
	 * disconnected. We don't want to do this if the registerer was never
	 * informed of a connection being made.
	 */

	if (ch->func != NULL) {
		dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, "
			"channel=%d\n", reason, ch->partid, ch->number);

		ch->func(reason, ch->partid, ch->number, NULL, ch->key);

		dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, "
			"channel=%d\n", reason, ch->partid, ch->number);
	}
}
/*
 * Wait for a message entry to become available for the specified channel,
 * but don't wait any longer than 1 jiffy.
 */
enum xp_retval
xpc_allocate_msg_wait(struct xpc_channel *ch)
{
	enum xp_retval ret;

	if (ch->flags & XPC_C_DISCONNECTING) {
		DBUG_ON(ch->reason == xpInterrupted);
		return ch->reason;
	}

	atomic_inc(&ch->n_on_msg_allocate_wq);
	ret = interruptible_sleep_on_timeout(&ch->msg_allocate_wq, 1);
	atomic_dec(&ch->n_on_msg_allocate_wq);

	if (ch->flags & XPC_C_DISCONNECTING) {
		ret = ch->reason;
		DBUG_ON(ch->reason == xpInterrupted);
	} else if (ret == 0) {
		ret = xpTimeout;
	} else {
		ret = xpInterrupted;
	}

	return ret;
}
/*
 * Send a message that contains the user's payload on the specified channel
 * connected to the specified partition.
 *
 * NOTE that this routine can sleep waiting for a message entry to become
 * available. To not sleep, pass in the XPC_NOWAIT flag.
 *
 * Once sent, this routine will not wait for the message to be received, nor
 * will notification be given when it does happen.
 *
 * Arguments:
 *
 *	partid - ID of partition to which the channel is connected.
 *	ch_number - channel # to send message on.
 *	flags - see xp.h for valid flags.
 *	payload - pointer to the payload which is to be sent.
 *	payload_size - size of the payload in bytes.
 */
enum xp_retval
xpc_initiate_send(short partid, int ch_number, u32 flags, void *payload,
		  u16 payload_size)
{
	struct xpc_partition *part = &xpc_partitions[partid];
	enum xp_retval ret = xpUnknownReason;

	dev_dbg(xpc_chan, "payload=0x%p, partid=%d, channel=%d\n", payload,
		partid, ch_number);

	DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
	DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
	DBUG_ON(payload == NULL);

	if (xpc_part_ref(part)) {
		ret = xpc_send_msg(&part->channels[ch_number], flags, payload,
				   payload_size, 0, NULL, NULL);
		xpc_part_deref(part);
	}

	return ret;
}
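
/*
 * Example (a hypothetical sketch; struct my_msg, MY_CHANNEL and dst_partid
 * are made-up names): a non-blocking send of a fixed-size payload, with the
 * caller deciding what to do if no message entry is currently available:
 *
 *	struct my_msg msg = { ... };
 *	enum xp_retval ret;
 *
 *	ret = xpc_initiate_send(dst_partid, MY_CHANNEL, XPC_NOWAIT,
 *				&msg, sizeof(msg));
 *	if (ret != xpSuccess)
 *		...	e.g. retry later or drop the message
 */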
/*
 * Send a message that contains the user's payload on the specified channel
 * connected to the specified partition.
 *
 * NOTE that this routine can sleep waiting for a message entry to become
 * available. To not sleep, pass in the XPC_NOWAIT flag.
 *
 * This routine will not wait for the message to be sent or received.
 *
 * Once the remote end of the channel has received the message, the function
 * passed as an argument to xpc_initiate_send_notify() will be called. This
 * allows the sender to free up or re-use any buffers referenced by the
 * message, but does NOT mean the message has been processed at the remote
 * end by a receiver.
 *
 * If this routine returns an error, the caller's function will NOT be called.
 *
 * Arguments:
 *
 *	partid - ID of partition to which the channel is connected.
 *	ch_number - channel # to send message on.
 *	flags - see xp.h for valid flags.
 *	payload - pointer to the payload which is to be sent.
 *	payload_size - size of the payload in bytes.
 *	func - function to call with asynchronous notification of message
 *	       receipt. THIS FUNCTION MUST BE NON-BLOCKING.
 *	key - user-defined key to be passed to the function when it's called.
 */
enum xp_retval
xpc_initiate_send_notify(short partid, int ch_number, u32 flags, void *payload,
			 u16 payload_size, xpc_notify_func func, void *key)
{
	struct xpc_partition *part = &xpc_partitions[partid];
	enum xp_retval ret = xpUnknownReason;

	dev_dbg(xpc_chan, "payload=0x%p, partid=%d, channel=%d\n", payload,
		partid, ch_number);

	DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
	DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
	DBUG_ON(payload == NULL);
	DBUG_ON(func == NULL);

	if (xpc_part_ref(part)) {
		ret = xpc_send_msg(&part->channels[ch_number], flags, payload,
				   payload_size, XPC_N_CALL, func, key);
		xpc_part_deref(part);
	}
	return ret;
}
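
/*
 * Example (a hypothetical sketch, assuming the xpc_notify_func typedef from
 * xp.h; my_sent_notify, MY_CHANNEL, dst_partid and my_buf are made-up names):
 * the notify callback must be non-blocking, and here simply releases the
 * sender's buffer once the remote side has received the message:
 *
 *	static void
 *	my_sent_notify(enum xp_retval reason, short partid, int ch_number,
 *		       void *key)
 *	{
 *		kfree(key);	the buffer is safe to release now
 *	}
 *
 *	ret = xpc_initiate_send_notify(dst_partid, MY_CHANNEL, XPC_NOWAIT,
 *				       my_buf, sizeof(*my_buf),
 *				       my_sent_notify, my_buf);
 */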
/*
 * Deliver a message to its intended recipient.
 */
void
xpc_deliver_msg(struct xpc_channel *ch)
{
	struct xpc_msg *msg;

	msg = xpc_get_deliverable_msg(ch);
	if (msg != NULL) {

		/*
		 * This ref is taken to protect the payload itself from being
		 * freed before the user is finished with it, which the user
		 * indicates by calling xpc_initiate_received().
		 */
		xpc_msgqueue_ref(ch);

		atomic_inc(&ch->kthreads_active);

		if (ch->func != NULL) {
			dev_dbg(xpc_chan, "ch->func() called, msg=0x%p, "
				"msg_number=%ld, partid=%d, channel=%d\n",
				(void *)msg, msg->number, ch->partid,
				ch->number);

			/* deliver the message to its intended recipient */
			ch->func(xpMsgReceived, ch->partid, ch->number,
				 &msg->payload, ch->key);

			dev_dbg(xpc_chan, "ch->func() returned, msg=0x%p, "
				"msg_number=%ld, partid=%d, channel=%d\n",
				(void *)msg, msg->number, ch->partid,
				ch->number);
		}

		atomic_dec(&ch->kthreads_active);
	}
}
/*
 * Acknowledge receipt of a delivered message.
 *
 * If a message has XPC_M_INTERRUPT set, send an interrupt to the partition
 * that sent the message.
 *
 * This function, although called by users, does not call xpc_part_ref() to
 * ensure that the partition infrastructure is in place. It relies on the
 * fact that we called xpc_msgqueue_ref() in xpc_deliver_msg().
 *
 * Arguments:
 *
 *	partid - ID of partition to which the channel is connected.
 *	ch_number - channel # message received on.
 *	payload - pointer to the payload area allocated via
 *		  xpc_initiate_send() or xpc_initiate_send_notify().
 */
void
xpc_initiate_received(short partid, int ch_number, void *payload)
{
	struct xpc_partition *part = &xpc_partitions[partid];
	struct xpc_channel *ch;
	struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);

	DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
	DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);

	ch = &part->channels[ch_number];
	xpc_received_msg(ch, msg);

	/* the call to xpc_msgqueue_ref() was done by xpc_deliver_msg() */
	xpc_msgqueue_deref(ch);
}
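
/*
 * Example (a hypothetical sketch, assuming the xpc_channel_func typedef and
 * the xpc_received() wrapper from xp.h; my_channel_func and struct my_msg
 * are made-up names): a registerer's channel function consumes the payload
 * handed to it by xpc_deliver_msg() and then acknowledges it, which lands
 * in xpc_initiate_received() above:
 *
 *	static void
 *	my_channel_func(enum xp_retval reason, short partid, int ch_number,
 *			void *data, void *key)
 *	{
 *		struct my_msg *m;
 *
 *		if (reason == xpMsgReceived) {
 *			m = (struct my_msg *)data;
 *			...	consume the payload
 *			xpc_received(partid, ch_number, data);
 *		}
 *	}
 */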