uts: make emu10k non-verbose
[unleashed.git] / kernel / os / msg.c
blob5b763e187bae6a00802dc01a144423c9a6e0bbe0
1 /*
2 * CDDL HEADER START
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
19 * CDDL HEADER END
22 * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
23 * Use is subject to license terms.
26 /* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
27 /* All Rights Reserved */
31 * Inter-Process Communication Message Facility.
33 * See os/ipc.c for a description of common IPC functionality.
35 * Resource controls
36 * -----------------
38 * Control: zone.max-msg-ids (rc_zone_msgmni)
39 * Description: Maximum number of message queue ids allowed a zone.
41 * When msgget() is used to allocate a message queue, one id is
42 * allocated. If the id allocation doesn't succeed, msgget() fails
43 * and errno is set to ENOSPC. Upon successful msgctl(, IPC_RMID)
44 * the id is deallocated.
46 * Control: project.max-msg-ids (rc_project_msgmni)
47 * Description: Maximum number of message queue ids allowed a project.
49 * When msgget() is used to allocate a message queue, one id is
50 * allocated. If the id allocation doesn't succeed, msgget() fails
51 * and errno is set to ENOSPC. Upon successful msgctl(, IPC_RMID)
52 * the id is deallocated.
54 * Control: process.max-msg-qbytes (rc_process_msgmnb)
55 * Description: Maximum number of bytes of messages on a message queue.
57 * When msgget() successfully allocates a message queue, the minimum
58 * enforced value of this limit is used to initialize msg_qbytes.
60 * Control: process.max-msg-messages (rc_process_msgtql)
61 * Description: Maximum number of messages on a message queue.
63 * When msgget() successfully allocates a message queue, the minimum
64 * enforced value of this limit is used to initialize a per-queue
65 * limit on the number of messages.
68 #include <sys/types.h>
69 #include <sys/t_lock.h>
70 #include <sys/param.h>
71 #include <sys/cred.h>
72 #include <sys/user.h>
73 #include <sys/proc.h>
74 #include <sys/time.h>
75 #include <sys/ipc.h>
76 #include <sys/ipc_impl.h>
77 #include <sys/msg.h>
78 #include <sys/msg_impl.h>
79 #include <sys/list.h>
80 #include <sys/systm.h>
81 #include <sys/sysmacros.h>
82 #include <sys/cpuvar.h>
83 #include <sys/kmem.h>
84 #include <sys/ddi.h>
85 #include <sys/errno.h>
86 #include <sys/cmn_err.h>
87 #include <sys/debug.h>
88 #include <sys/project.h>
89 #include <sys/modctl.h>
90 #include <sys/syscall.h>
91 #include <sys/policy.h>
92 #include <sys/zone.h>
94 #include <c2/audit.h>
97 * The following tunables are obsolete. Though for compatibility we
98 * still read and interpret msginfo_msgmnb, msginfo_msgmni, and
99 * msginfo_msgtql (see os/project.c and os/rctl_proc.c), the preferred
100 * mechanism for administrating the IPC Message facility is through the
101 * resource controls described at the top of this file.
103 size_t msginfo_msgmax = 2048; /* (obsolete) */
104 size_t msginfo_msgmnb = 4096; /* (obsolete) */
105 int msginfo_msgmni = 50; /* (obsolete) */
106 int msginfo_msgtql = 40; /* (obsolete) */
107 int msginfo_msgssz = 8; /* (obsolete) */
108 int msginfo_msgmap = 0; /* (obsolete) */
109 ushort_t msginfo_msgseg = 1024; /* (obsolete) */
111 extern rctl_hndl_t rc_zone_msgmni;
112 extern rctl_hndl_t rc_project_msgmni;
113 extern rctl_hndl_t rc_process_msgmnb;
114 extern rctl_hndl_t rc_process_msgtql;
115 static ipc_service_t *msq_svc;
116 static zone_key_t msg_zone_key;
118 static void msg_dtor(kipc_perm_t *);
119 static void msg_rmid(kipc_perm_t *);
120 static void msg_remove_zone(zoneid_t, void *);
123 * Module linkage information for the kernel.
125 static ssize_t msgsys(int opcode, uintptr_t a0, uintptr_t a1, uintptr_t a2,
126 uintptr_t a4, uintptr_t a5);
128 static struct sysent ipcmsg_sysent = {
130 #ifdef _LP64
131 SE_ARGC | SE_NOUNLOAD | SE_64RVAL,
132 #else
133 SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
134 #endif
135 (int (*)())msgsys
138 #ifdef _SYSCALL32_IMPL
139 static ssize32_t msgsys32(int opcode, uint32_t a0, uint32_t a1, uint32_t a2,
140 uint32_t a4, uint32_t a5);
142 static struct sysent ipcmsg_sysent32 = {
144 SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
145 (int (*)())msgsys32
147 #endif /* _SYSCALL32_IMPL */
149 static struct modlsys modlsys = {
150 &mod_syscallops, "System V message facility", &ipcmsg_sysent
153 #ifdef _SYSCALL32_IMPL
154 static struct modlsys modlsys32 = {
155 &mod_syscallops32, "32-bit System V message facility", &ipcmsg_sysent32
157 #endif
160 * Big Theory statement for message queue correctness
162 * The msgrcv and msgsnd functions no longer uses cv_broadcast to wake up
163 * receivers who are waiting for an event. Using the cv_broadcast method
164 * resulted in negative scaling when the number of waiting receivers are large
165 * (the thundering herd problem). Instead, the receivers waiting to receive a
166 * message are now linked in a queue-like fashion and awaken one at a time in
167 * a controlled manner.
169 * Receivers can block on two different classes of waiting list:
170 * 1) "sendwait" list, which is the more complex list of the two. The
171 * receiver will be awakened by a sender posting a new message. There
172 * are two types of "sendwait" list used:
173 * a) msg_wait_snd: handles all receivers who are looking for
174 * a message type >= 0, but was unable to locate a match.
176 * slot 0: reserved for receivers that have designated they
177 * will take any message type.
178 * rest: consist of receivers requesting a specific type
179 * but the type was not present. The entries are
180 * hashed into a bucket in an attempt to keep
181 * any list search relatively short.
182 * b) msg_wait_snd_ngt: handles all receivers that have designated
183 * a negative message type. Unlike msg_wait_snd, the hash bucket
184 * serves a range of negative message types (-1 to -5, -6 to -10
185 * and so forth), where the last bucket is reserved for all the
186 * negative message types that hash outside of MSG_MAX_QNUM - 1.
187 * This is done this way to simplify the operation of locating a
188 * negative message type.
190 * 2) "copyout" list, where the receiver is awakened by another
191 * receiver after a message is copied out. This is a linked list
192 * of waiters that are awakened one at a time. Although the solution is
193 * not optimal, the complexity that would be added in for waking
194 * up the right entry far exceeds any potential pay back (too many
195 * correctness and corner case issues).
197 * The lists are doubly linked. In the case of the "sendwait"
198 * list, this allows the thread to remove itself from the list without having
199 * to traverse the list. In the case of the "copyout" list it simply allows
200 * us to use common functions with the "sendwait" list.
202 * To make sure receivers are not hung out to dry, we must guarantee:
203 * 1. If any queued message matches any receiver, then at least one
204 * matching receiver must be processing the request.
205 * 2. Blocking on the copyout queue is only temporary while messages
206 * are being copied out. The process is guaranted to wakeup
207 * when it gets to front of the queue (copyout is a FIFO).
209 * Rules for blocking and waking up:
210 * 1. A receiver entering msgrcv must examine all messages for a match
211 * before blocking on a sendwait queue.
212 * 2. If the receiver blocks because the message it chose is already
213 * being copied out, then when it wakes up needs to start start
214 * checking the messages from the beginning.
215 * 3) When ever a process returns from msgrcv for any reason, if it
216 * had attempted to copy a message or blocked waiting for a copy
217 * to complete it needs to wakeup the next receiver blocked on
218 * a copy out.
219 * 4) When a message is sent, the sender selects a process waiting
220 * for that type of message. This selection process rotates between
221 * receivers types of 0, negative and positive to prevent starvation of
222 * any one particular receiver type.
223 * 5) The following are the scenarios for processes that are awakened
224 * by a msgsnd:
225 * a) The process finds the message and is able to copy
226 * it out. Once complete, the process returns.
227 * b) The message that was sent that triggered the wakeup is no
228 * longer available (another process found the message first).
229 * We issue a wakeup on copy queue and then go back to
230 * sleep waiting for another matching message to be sent.
231 * c) The message that was supposed to be processed was
232 * already serviced by another process. However a different
233 * message is present which we can service. The message
234 * is copied and the process returns.
235 * d) The message is found, but some sort of error occurs that
236 * prevents the message from being copied. The receiver
237 * wakes up the next sender that can service this message
238 * type and returns an error to the caller.
239 * e) The message is found, but it is marked as being copied
240 * out. The receiver then goes to sleep on the copyout
241 * queue where it will be awakened again sometime in the future.
244 * 6) Whenever a message is found that matches the message type designated,
245 * but is being copied out we have to block on the copyout queue.
246 * After process copying finishes the copy out, it must wakeup (either
247 * directly or indirectly) all receivers who blocked on its copyout,
248 * so they are guaranteed a chance to examine the remaining messages.
249 * This is implemented via a chain of wakeups: Y wakes X, who wakes Z,
250 * and so on. The chain cannot be broken. This leads to the following
251 * cases:
252 * a) A receiver is finished copying the message (or encountered)
253 * an error), the first entry on the copyout queue is woken
254 * up.
255 * b) When the receiver is woken up, it attempts to locate
256 * a message type match.
257 * c) If a message type is found and
258 * -- MSG_RCVCOPY flag is not set, the message is
259 * marked for copying out. Regardless of the copyout
260 * success the next entry on the copyout queue is
261 * awakened and the operation is completed.
262 * -- MSG_RCVCOPY is set, we simply go back to sleep again
263 * on the copyout queue.
264 * d) If the message type is not found then we wakeup the next
265 * process on the copyout queue.
266 * 7) If a msgsnd is unable to complete for of any of the following reasons
267 * a) the msgq has no space for the message
268 * b) the maximum number of messages allowed has been reached
269 * then one of two things happen:
270 * 1) If the passed in msg_flag has IPC_NOWAIT set, then
271 * an error is returned.
272 * 2) The IPC_NOWAIT bit is not set in msg_flag, then the
273 * the thread is placed to sleep until the request can be
274 * serviced.
275 * 8) When waking a thread waiting to send a message, a check is done to
276 * verify that the operation being asked for by the thread will complete.
277 * This decision making process is done in a loop where the oldest request
278 * is checked first. The search will continue until there is no more
279 * room on the msgq or we have checked all the waiters.
282 static uint_t msg_type_hash(long);
283 static int msgq_check_err(kmsqid_t *qp, int cvres);
284 static int msg_rcvq_sleep(list_t *, msgq_wakeup_t *, kmutex_t **,
285 kmsqid_t *);
286 static int msg_copyout(kmsqid_t *, long, kmutex_t **, size_t *, size_t,
287 struct msg *, struct ipcmsgbuf *, int);
288 static void msg_rcvq_wakeup_all(list_t *);
289 static void msg_wakeup_senders(kmsqid_t *);
290 static void msg_wakeup_rdr(kmsqid_t *, msg_select_t **, long);
291 static msgq_wakeup_t *msg_fnd_any_snd(kmsqid_t *, int, long);
292 static msgq_wakeup_t *msg_fnd_any_rdr(kmsqid_t *, int, long);
293 static msgq_wakeup_t *msg_fnd_neg_snd(kmsqid_t *, int, long);
294 static msgq_wakeup_t *msg_fnd_spc_snd(kmsqid_t *, int, long);
295 static struct msg *msgrcv_lookup(kmsqid_t *, long);
297 msg_select_t msg_fnd_sndr[] = {
298 { msg_fnd_any_snd, &msg_fnd_sndr[1] },
299 { msg_fnd_spc_snd, &msg_fnd_sndr[2] },
300 { msg_fnd_neg_snd, &msg_fnd_sndr[0] }
303 msg_select_t msg_fnd_rdr[1] = {
304 { msg_fnd_any_rdr, &msg_fnd_rdr[0] },
307 static struct modlinkage modlinkage = {
308 MODREV_1,
309 &modlsys,
310 #ifdef _SYSCALL32_IMPL
311 &modlsys32,
312 #endif
313 NULL
316 #define MSG_SMALL_INIT (size_t)-1
318 _init(void)
320 int result;
322 msq_svc = ipcs_create("msqids", rc_project_msgmni, rc_zone_msgmni,
323 sizeof (kmsqid_t), msg_dtor, msg_rmid, AT_IPC_MSG,
324 offsetof(ipc_rqty_t, ipcq_msgmni));
325 zone_key_create(&msg_zone_key, NULL, msg_remove_zone, NULL);
327 if ((result = mod_install(&modlinkage)) == 0)
328 return (0);
330 (void) zone_key_delete(msg_zone_key);
331 ipcs_destroy(msq_svc);
333 return (result);
337 _fini(void)
339 return (EBUSY);
343 _info(struct modinfo *modinfop)
345 return (mod_info(&modlinkage, modinfop));
348 static void
349 msg_dtor(kipc_perm_t *perm)
351 kmsqid_t *qp = (kmsqid_t *)perm;
352 int ii;
354 for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
355 ASSERT(list_is_empty(&qp->msg_wait_snd[ii]));
356 ASSERT(list_is_empty(&qp->msg_wait_snd_ngt[ii]));
357 list_destroy(&qp->msg_wait_snd[ii]);
358 list_destroy(&qp->msg_wait_snd_ngt[ii]);
360 ASSERT(list_is_empty(&qp->msg_cpy_block));
361 ASSERT(list_is_empty(&qp->msg_wait_rcv));
362 list_destroy(&qp->msg_cpy_block);
363 ASSERT(qp->msg_snd_cnt == 0);
364 ASSERT(qp->msg_cbytes == 0);
365 list_destroy(&qp->msg_list);
366 list_destroy(&qp->msg_wait_rcv);
370 #define msg_hold(mp) (mp)->msg_copycnt++
373 * msg_rele - decrement the reference count on the message. When count
374 * reaches zero, free message header and contents.
376 static void
377 msg_rele(struct msg *mp)
379 ASSERT(mp->msg_copycnt > 0);
380 if (mp->msg_copycnt-- == 1) {
381 if (mp->msg_addr)
382 kmem_free(mp->msg_addr, mp->msg_size);
383 kmem_free(mp, sizeof (struct msg));
388 * msgunlink - Unlink msg from queue, decrement byte count and wake up anyone
389 * waiting for free bytes on queue.
391 * Called with queue locked.
393 static void
394 msgunlink(kmsqid_t *qp, struct msg *mp)
396 list_remove(&qp->msg_list, mp);
397 qp->msg_qnum--;
398 qp->msg_cbytes -= mp->msg_size;
399 msg_rele(mp);
401 /* Wake up waiting writers */
402 msg_wakeup_senders(qp);
405 static void
406 msg_rmid(kipc_perm_t *perm)
408 kmsqid_t *qp = (kmsqid_t *)perm;
409 struct msg *mp;
410 int ii;
413 while ((mp = list_head(&qp->msg_list)) != NULL)
414 msgunlink(qp, mp);
415 ASSERT(qp->msg_cbytes == 0);
418 * Wake up everyone who is in a wait state of some sort
419 * for this message queue.
421 for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
422 msg_rcvq_wakeup_all(&qp->msg_wait_snd[ii]);
423 msg_rcvq_wakeup_all(&qp->msg_wait_snd_ngt[ii]);
425 msg_rcvq_wakeup_all(&qp->msg_cpy_block);
426 msg_rcvq_wakeup_all(&qp->msg_wait_rcv);
/*
 * msgctl system call.
 *
 * Acquires the queue lock via ipc_lookup() and releases it before
 * returning.  Copyins are performed before the lookup and copyouts
 * after the lock is dropped, so no user-memory fault can occur while
 * the queue lock is held.
 */
static int
msgctl(int msgid, int cmd, void *arg)
{
	STRUCT_DECL(msqid_ds, ds);	/* SVR4 queue work area */
	kmsqid_t	*qp;		/* ptr to associated q */
	int		error;
	struct cred	*cr;
	model_t		mdl = get_udatamodel();
	struct msqid_ds64 ds64;
	kmutex_t	*lock;
	proc_t		*pp = curproc;

	STRUCT_INIT(ds, mdl);
	cr = CRED();

	/*
	 * Perform pre- or non-lookup actions (e.g. copyins, RMID).
	 */
	switch (cmd) {
	case IPC_SET:
		if (copyin(arg, STRUCT_BUF(ds), STRUCT_SIZE(ds)))
			return (set_errno(EFAULT));
		break;

	case IPC_SET64:
		if (copyin(arg, &ds64, sizeof (struct msqid_ds64)))
			return (set_errno(EFAULT));
		break;

	case IPC_RMID:
		/* Removal is handled entirely by the common IPC code. */
		if (error = ipc_rmid(msq_svc, msgid, cr))
			return (set_errno(error));
		return (0);
	}

	/*
	 * get msqid_ds for this msgid
	 */
	if ((lock = ipc_lookup(msq_svc, msgid, (kipc_perm_t **)&qp)) == NULL)
		return (set_errno(EINVAL));

	switch (cmd) {
	case IPC_SET:
		/* Raising msg_qbytes requires the ipc_config privilege. */
		if (STRUCT_FGET(ds, msg_qbytes) > qp->msg_qbytes &&
		    secpolicy_ipc_config(cr) != 0) {
			mutex_exit(lock);
			return (set_errno(EPERM));
		}
		if (error = ipcperm_set(msq_svc, cr, &qp->msg_perm,
		    &STRUCT_BUF(ds)->msg_perm, mdl)) {
			mutex_exit(lock);
			return (set_errno(error));
		}
		qp->msg_qbytes = STRUCT_FGET(ds, msg_qbytes);
		qp->msg_ctime = gethrestime_sec();
		break;

	case IPC_STAT:
		if (error = ipcperm_access(&qp->msg_perm, MSG_R, cr)) {
			mutex_exit(lock);
			return (set_errno(error));
		}

		/*
		 * Transiently fold the waiter flags into ipc_mode so
		 * ipcperm_stat() reports them, then clear them again.
		 */
		if (qp->msg_rcv_cnt)
			qp->msg_perm.ipc_mode |= MSG_RWAIT;
		if (qp->msg_snd_cnt)
			qp->msg_perm.ipc_mode |= MSG_WWAIT;
		ipcperm_stat(&STRUCT_BUF(ds)->msg_perm, &qp->msg_perm, mdl);
		qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
		STRUCT_FSETP(ds, msg_first, NULL);	/* kernel addr */
		STRUCT_FSETP(ds, msg_last, NULL);
		STRUCT_FSET(ds, msg_cbytes, qp->msg_cbytes);
		STRUCT_FSET(ds, msg_qnum, qp->msg_qnum);
		STRUCT_FSET(ds, msg_qbytes, qp->msg_qbytes);
		STRUCT_FSET(ds, msg_lspid, qp->msg_lspid);
		STRUCT_FSET(ds, msg_lrpid, qp->msg_lrpid);
		STRUCT_FSET(ds, msg_stime, qp->msg_stime);
		STRUCT_FSET(ds, msg_rtime, qp->msg_rtime);
		STRUCT_FSET(ds, msg_ctime, qp->msg_ctime);
		break;

	case IPC_SET64:
		/*
		 * Raising msg_qbytes needs either the ipc_config
		 * privilege or headroom under the process rctl; p_lock
		 * is required for rctl_test().
		 */
		mutex_enter(&pp->p_lock);
		if ((ds64.msgx_qbytes > qp->msg_qbytes) &&
		    secpolicy_ipc_config(cr) != 0 &&
		    rctl_test(rc_process_msgmnb, pp->p_rctls, pp,
		    ds64.msgx_qbytes, RCA_SAFE) & RCT_DENY) {
			mutex_exit(&pp->p_lock);
			mutex_exit(lock);
			return (set_errno(EPERM));
		}
		mutex_exit(&pp->p_lock);
		if (error = ipcperm_set64(msq_svc, cr, &qp->msg_perm,
		    &ds64.msgx_perm)) {
			mutex_exit(lock);
			return (set_errno(error));
		}
		qp->msg_qbytes = ds64.msgx_qbytes;
		qp->msg_ctime = gethrestime_sec();
		break;

	case IPC_STAT64:
		if (qp->msg_rcv_cnt)
			qp->msg_perm.ipc_mode |= MSG_RWAIT;
		if (qp->msg_snd_cnt)
			qp->msg_perm.ipc_mode |= MSG_WWAIT;
		ipcperm_stat64(&ds64.msgx_perm, &qp->msg_perm);
		qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
		ds64.msgx_cbytes = qp->msg_cbytes;
		ds64.msgx_qnum = qp->msg_qnum;
		ds64.msgx_qbytes = qp->msg_qbytes;
		ds64.msgx_lspid = qp->msg_lspid;
		ds64.msgx_lrpid = qp->msg_lrpid;
		ds64.msgx_stime = qp->msg_stime;
		ds64.msgx_rtime = qp->msg_rtime;
		ds64.msgx_ctime = qp->msg_ctime;
		break;

	default:
		mutex_exit(lock);
		return (set_errno(EINVAL));
	}

	mutex_exit(lock);

	/*
	 * Do copyout last (after releasing mutex).
	 */
	switch (cmd) {
	case IPC_STAT:
		if (copyout(STRUCT_BUF(ds), arg, STRUCT_SIZE(ds)))
			return (set_errno(EFAULT));
		break;

	case IPC_STAT64:
		if (copyout(&ds64, arg, sizeof (struct msqid_ds64)))
			return (set_errno(EFAULT));
		break;
	}

	return (0);
}
579 * Remove all message queues associated with a given zone. Called by
580 * zone_shutdown when the zone is halted.
582 /*ARGSUSED1*/
583 static void
584 msg_remove_zone(zoneid_t zoneid, void *arg)
586 ipc_remove_zone(msq_svc, zoneid);
/*
 * msgget system call.
 *
 * Looks up (or allocates) the queue for 'key' via the common IPC code.
 * A freshly allocated queue is initialized here and then committed with
 * the ipc_commit_begin()/ipc_commit_end() protocol.  Returns the queue
 * id, or sets errno and returns -1 via set_errno().
 */
static int
msgget(key_t key, int msgflg)
{
	kmsqid_t	*qp;
	kmutex_t	*lock;
	int		id, error;
	int		ii;
	proc_t		*pp = curproc;

top:
	if (error = ipc_get(msq_svc, key, msgflg, (kipc_perm_t **)&qp, &lock))
		return (set_errno(error));

	if (IPC_FREE(&qp->msg_perm)) {
		/*
		 * New queue: ipc_get() returned with the queue lock and
		 * our p_lock held (NOTE(review): presumably for rctl
		 * accounting — confirm against ipc_get()).  Drop both
		 * while we do the (possibly blocking) initialization.
		 */
		mutex_exit(lock);
		mutex_exit(&pp->p_lock);

		list_create(&qp->msg_list, sizeof (struct msg),
		    offsetof(struct msg, msg_node));
		qp->msg_qnum = 0;
		qp->msg_lspid = qp->msg_lrpid = 0;
		qp->msg_stime = qp->msg_rtime = 0;
		qp->msg_ctime = gethrestime_sec();
		qp->msg_ngt_cnt = 0;
		qp->msg_neg_copy = 0;
		/* One sendwait list per hash bucket, for >=0 and <0 types. */
		for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
			list_create(&qp->msg_wait_snd[ii],
			    sizeof (msgq_wakeup_t),
			    offsetof(msgq_wakeup_t, msgw_list));
			list_create(&qp->msg_wait_snd_ngt[ii],
			    sizeof (msgq_wakeup_t),
			    offsetof(msgq_wakeup_t, msgw_list));
		}

		/*
		 * The proper initialization of msg_lowest_type is to the
		 * highest possible value.  By doing this we guarantee that
		 * when the first send happens, the lowest type will be set
		 * properly.
		 */
		qp->msg_lowest_type = MSG_SMALL_INIT;
		list_create(&qp->msg_cpy_block,
		    sizeof (msgq_wakeup_t),
		    offsetof(msgq_wakeup_t, msgw_list));
		list_create(&qp->msg_wait_rcv,
		    sizeof (msgq_wakeup_t),
		    offsetof(msgq_wakeup_t, msgw_list));
		qp->msg_fnd_sndr = &msg_fnd_sndr[0];
		qp->msg_fnd_rdr = &msg_fnd_rdr[0];
		qp->msg_rcv_cnt = 0;
		qp->msg_snd_cnt = 0;
		qp->msg_snd_smallest = MSG_SMALL_INIT;

		/* Commit; EAGAIN means we raced and must restart the get. */
		if (error = ipc_commit_begin(msq_svc, key, msgflg,
		    (kipc_perm_t *)qp)) {
			if (error == EAGAIN)
				goto top;
			return (set_errno(error));
		}
		/* Queue limits come from the process resource controls. */
		qp->msg_qbytes = rctl_enforced_value(rc_process_msgmnb,
		    pp->p_rctls, pp);
		qp->msg_qmax = rctl_enforced_value(rc_process_msgtql,
		    pp->p_rctls, pp);
		lock = ipc_commit_end(msq_svc, &qp->msg_perm);
	}

	if (AU_AUDITING())
		audit_ipcget(AT_IPC_MSG, (void *)qp);

	id = qp->msg_perm.ipc_id;
	mutex_exit(lock);
	return (id);
}
/*
 * msgrcv system call.
 *
 * Implements the wait/wakeup protocol described in the Big Theory
 * statement above: examine the queue for a matching message; if the
 * match is mid-copyout, sleep on the copyout list; if there is no
 * match, sleep on the appropriate sendwait list (hashed by type).
 * On every error exit the next copyout waiter — and, if we were
 * awakened by a sender, a substitute receiver — is woken so the
 * wakeup chain is never broken.
 */
static ssize_t
msgrcv(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, long msgtyp, int msgflg)
{
	struct msg	*smp;	/* ptr to best msg on q */
	kmsqid_t	*qp;	/* ptr to associated q */
	kmutex_t	*lock;
	size_t		xtsz;	/* transfer byte count */
	int		error = 0;
	int		cvres;
	uint_t		msg_hash;
	msgq_wakeup_t	msg_entry;

	CPU_STATS_ADDQ(CPU, sys, msg, 1);	/* bump msg send/rcv count */

	msg_hash = msg_type_hash(msgtyp);
	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
		return ((ssize_t)set_errno(EINVAL));
	}
	ipc_hold(msq_svc, (kipc_perm_t *)qp);

	if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
		goto msgrcv_out;
	}

	/*
	 * Various information (including the condvar_t) required for the
	 * process to sleep is provided by it's stack.
	 */
	msg_entry.msgw_thrd = curthread;
	msg_entry.msgw_snd_wake = 0;
	msg_entry.msgw_type = msgtyp;

findmsg:
	smp = msgrcv_lookup(qp, msgtyp);

	if (smp) {
		/*
		 * We found a possible message to copy out.
		 */
		if ((smp->msg_flags & MSG_RCVCOPY) == 0) {
			long t = msg_entry.msgw_snd_wake;
			long copy_type = smp->msg_type;

			/*
			 * It is available, attempt to copy it.
			 */
			error = msg_copyout(qp, msgtyp, &lock, &xtsz, msgsz,
			    smp, msgp, msgflg);

			/*
			 * It is possible to consume a different message
			 * type then what originally awakened for (negative
			 * types).  If this happens a check must be done to
			 * to determine if another receiver is available
			 * for the waking message type, Failure to do this
			 * can result in a message on the queue that can be
			 * serviced by a sleeping receiver.
			 */
			if (!error && t && (copy_type != t))
				msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, t);

			/*
			 * Don't forget to wakeup a sleeper that blocked
			 * because we were copying things out.
			 */
			msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);
			goto msgrcv_out;
		}
		/*
		 * The selected message is being copied out, so block.  We do
		 * not need to wake the next person up on the msg_cpy_block
		 * list due to the fact some one is copying out and they will
		 * get things moving again once the copy is completed.
		 */
		cvres = msg_rcvq_sleep(&qp->msg_cpy_block,
		    &msg_entry, &lock, qp);
		error = msgq_check_err(qp, cvres);
		if (error) {
			goto msgrcv_out;
		}
		goto findmsg;
	}

	/*
	 * There isn't a message to copy out that matches the designated
	 * criteria.
	 */
	if (msgflg & IPC_NOWAIT) {
		error = ENOMSG;
		goto msgrcv_out;
	}
	msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);

	/*
	 * Wait for new message.  We keep the negative and positive types
	 * separate for performance reasons.
	 */
	msg_entry.msgw_snd_wake = 0;
	if (msgtyp >= 0) {
		cvres = msg_rcvq_sleep(&qp->msg_wait_snd[msg_hash],
		    &msg_entry, &lock, qp);
	} else {
		qp->msg_ngt_cnt++;
		cvres = msg_rcvq_sleep(&qp->msg_wait_snd_ngt[msg_hash],
		    &msg_entry, &lock, qp);
		qp->msg_ngt_cnt--;
	}

	if (!(error = msgq_check_err(qp, cvres))) {
		goto findmsg;
	}

msgrcv_out:
	if (error) {
		/*
		 * On error, keep the wakeup chains alive: wake the next
		 * copyout waiter, and if a sender had selected us, hand
		 * its message type to another receiver.
		 */
		msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);
		if (msg_entry.msgw_snd_wake) {
			msg_wakeup_rdr(qp, &qp->msg_fnd_sndr,
			    msg_entry.msgw_snd_wake);
		}
		ipc_rele(msq_svc, (kipc_perm_t *)qp);
		return ((ssize_t)set_errno(error));
	}
	ipc_rele(msq_svc, (kipc_perm_t *)qp);
	return ((ssize_t)xtsz);
}
789 static int
790 msgq_check_err(kmsqid_t *qp, int cvres)
792 if (IPC_FREE(&qp->msg_perm)) {
793 return (EIDRM);
796 if (cvres == 0) {
797 return (EINTR);
800 return (0);
803 static int
804 msg_copyout(kmsqid_t *qp, long msgtyp, kmutex_t **lock, size_t *xtsz_ret,
805 size_t msgsz, struct msg *smp, struct ipcmsgbuf *msgp, int msgflg)
807 size_t xtsz;
808 STRUCT_HANDLE(ipcmsgbuf, umsgp);
809 model_t mdl = get_udatamodel();
810 int copyerror = 0;
812 STRUCT_SET_HANDLE(umsgp, mdl, msgp);
813 if (msgsz < smp->msg_size) {
814 if ((msgflg & MSG_NOERROR) == 0) {
815 return (E2BIG);
816 } else {
817 xtsz = msgsz;
819 } else {
820 xtsz = smp->msg_size;
822 *xtsz_ret = xtsz;
825 * To prevent a DOS attack we mark the message as being
826 * copied out and release mutex. When the copy is completed
827 * we need to acquire the mutex and make the appropriate updates.
829 ASSERT((smp->msg_flags & MSG_RCVCOPY) == 0);
830 smp->msg_flags |= MSG_RCVCOPY;
831 msg_hold(smp);
832 if (msgtyp < 0) {
833 ASSERT(qp->msg_neg_copy == 0);
834 qp->msg_neg_copy = 1;
836 mutex_exit(*lock);
838 if (mdl == DATAMODEL_NATIVE) {
839 copyerror = copyout(&smp->msg_type, msgp,
840 sizeof (smp->msg_type));
841 } else {
843 * 32-bit callers need an imploded msg type.
845 int32_t msg_type32 = smp->msg_type;
847 copyerror = copyout(&msg_type32, msgp,
848 sizeof (msg_type32));
851 if (copyerror == 0 && xtsz) {
852 copyerror = copyout(smp->msg_addr,
853 STRUCT_FADDR(umsgp, mtext), xtsz);
857 * Reclaim the mutex and make sure the message queue still exists.
860 *lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
861 if (msgtyp < 0) {
862 qp->msg_neg_copy = 0;
864 ASSERT(smp->msg_flags & MSG_RCVCOPY);
865 smp->msg_flags &= ~MSG_RCVCOPY;
866 msg_rele(smp);
867 if (IPC_FREE(&qp->msg_perm)) {
868 return (EIDRM);
870 if (copyerror) {
871 return (EFAULT);
873 qp->msg_lrpid = ttoproc(curthread)->p_pid;
874 qp->msg_rtime = gethrestime_sec();
875 msgunlink(qp, smp);
876 return (0);
/*
 * Find the best message on the queue for the requested type, per the
 * msgrcv(2) matching rules:
 *	msgtyp == 0: first message on the queue;
 *	msgtyp > 0:  first message of exactly that type;
 *	msgtyp < 0:  message with the lowest type <= -msgtyp.
 * Returns NULL when no match can exist.  For negative requests while
 * another negative-type copyout is in flight, returns a static dummy
 * message flagged MSG_RCVCOPY so the caller blocks on the copyout list.
 */
static struct msg *
msgrcv_lookup(kmsqid_t *qp, long msgtyp)
{
	struct msg		*smp = NULL;
	long			qp_low;
	struct msg		*mp;	/* ptr to msg on q */
	long			low_msgtype;
	static struct msg	neg_copy_smp;

	mp = list_head(&qp->msg_list);
	if (msgtyp == 0) {
		smp = mp;
	} else {
		qp_low = qp->msg_lowest_type;
		if (msgtyp > 0) {
			/*
			 * If our lowest possible message type is larger than
			 * the message type desired, then we know there is
			 * no entry present.
			 */
			if (qp_low > msgtyp) {
				return (NULL);
			}

			for (; mp; mp = list_next(&qp->msg_list, mp)) {
				if (msgtyp == mp->msg_type) {
					smp = mp;
					break;
				}
			}
		} else {
			/*
			 * We have kept track of the lowest possible message
			 * type on the send queue.  This allows us to terminate
			 * the search early if we find a message type of that
			 * type.  Note, the lowest type may not be the actual
			 * lowest value in the system, it is only guaranteed
			 * that there isn't a value lower than that.
			 */
			low_msgtype = -msgtyp;
			if (low_msgtype < qp_low) {
				return (NULL);
			}
			if (qp->msg_neg_copy) {
				/* Another negative-type copyout is active. */
				neg_copy_smp.msg_flags = MSG_RCVCOPY;
				return (&neg_copy_smp);
			}
			for (; mp; mp = list_next(&qp->msg_list, mp)) {
				if (mp->msg_type <= low_msgtype &&
				    !(smp && smp->msg_type <= mp->msg_type)) {
					smp = mp;
					low_msgtype = mp->msg_type;
					if (low_msgtype == qp_low) {
						/* Can't do better; stop. */
						break;
					}
				}
			}
			if (smp) {
				/*
				 * Update the lowest message type.
				 */
				qp->msg_lowest_type = smp->msg_type;
			}
		}
	}

	return (smp);
}
948 * msgids system call.
950 static int
951 msgids(int *buf, uint_t nids, uint_t *pnids)
953 int error;
955 if (error = ipc_ids(msq_svc, buf, nids, pnids))
956 return (set_errno(error));
958 return (0);
961 #define RND(x) roundup((x), sizeof (size_t))
962 #define RND32(x) roundup((x), sizeof (size32_t))
/*
 * msgsnap system call.
 *
 * Snapshot the messages matching 'msgtyp' into the user buffer: a
 * msgsnap_head, then per-message msgsnap_mhead headers each followed by
 * the (size_t- or size32_t-rounded) message text.  Two passes under the
 * queue lock: first size the result (if it exceeds bufsz, nmsg is
 * reported as 0), then pin each matching message with msg_hold() so the
 * copyouts can run with the lock dropped.
 */
static int
msgsnap(int msqid, caddr_t buf, size_t bufsz, long msgtyp)
{
	struct msg	*mp;	/* ptr to msg on q */
	kmsqid_t	*qp;	/* ptr to associated q */
	kmutex_t	*lock;
	size_t		size;
	size_t		nmsg;
	struct msg	**snaplist;
	int		error, i;
	model_t		mdl = get_udatamodel();
	STRUCT_DECL(msgsnap_head, head);
	STRUCT_DECL(msgsnap_mhead, mhead);

	STRUCT_INIT(head, mdl);
	STRUCT_INIT(mhead, mdl);

	if (bufsz < STRUCT_SIZE(head))
		return (set_errno(EINVAL));

	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL)
		return (set_errno(EINVAL));

	if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
		mutex_exit(lock);
		return (set_errno(error));
	}
	ipc_hold(msq_svc, (kipc_perm_t *)qp);

	/*
	 * First compute the required buffer size and
	 * the number of messages on the queue.
	 */
	size = nmsg = 0;
	for (mp = list_head(&qp->msg_list); mp;
	    mp = list_next(&qp->msg_list, mp)) {
		if (msgtyp == 0 ||
		    (msgtyp > 0 && msgtyp == mp->msg_type) ||
		    (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
			nmsg++;
			if (mdl == DATAMODEL_NATIVE)
				size += RND(mp->msg_size);
			else
				size += RND32(mp->msg_size);
		}
	}

	size += STRUCT_SIZE(head) + nmsg * STRUCT_SIZE(mhead);
	if (size > bufsz)
		nmsg = 0;	/* won't fit: report the size, no messages */

	if (nmsg > 0) {
		/*
		 * Mark the messages as being copied.
		 */
		snaplist = (struct msg **)kmem_alloc(nmsg *
		    sizeof (struct msg *), KM_SLEEP);
		i = 0;
		for (mp = list_head(&qp->msg_list); mp;
		    mp = list_next(&qp->msg_list, mp)) {
			if (msgtyp == 0 ||
			    (msgtyp > 0 && msgtyp == mp->msg_type) ||
			    (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
				msg_hold(mp);
				snaplist[i] = mp;
				i++;
			}
		}
	}
	mutex_exit(lock);

	/*
	 * Copy out the buffer header.
	 */
	STRUCT_FSET(head, msgsnap_size, size);
	STRUCT_FSET(head, msgsnap_nmsg, nmsg);
	if (copyout(STRUCT_BUF(head), buf, STRUCT_SIZE(head)))
		error = EFAULT;

	buf += STRUCT_SIZE(head);

	/*
	 * Now copy out the messages one by one.
	 */
	for (i = 0; i < nmsg; i++) {
		mp = snaplist[i];
		if (error == 0) {
			STRUCT_FSET(mhead, msgsnap_mlen, mp->msg_size);
			STRUCT_FSET(mhead, msgsnap_mtype, mp->msg_type);
			if (copyout(STRUCT_BUF(mhead), buf, STRUCT_SIZE(mhead)))
				error = EFAULT;
			buf += STRUCT_SIZE(mhead);
			if (error == 0 &&
			    mp->msg_size != 0 &&
			    copyout(mp->msg_addr, buf, mp->msg_size))
				error = EFAULT;
			if (mdl == DATAMODEL_NATIVE)
				buf += RND(mp->msg_size);
			else
				buf += RND32(mp->msg_size);
		}
		/* Drop our hold on the message under the queue lock. */
		lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
		msg_rele(mp);
		/* Check for msg q deleted or reallocated */
		if (IPC_FREE(&qp->msg_perm))
			error = EIDRM;
		mutex_exit(lock);
	}

	(void) ipc_lock(msq_svc, qp->msg_perm.ipc_id);
	ipc_rele(msq_svc, (kipc_perm_t *)qp);

	if (nmsg > 0)
		kmem_free(snaplist, nmsg * sizeof (struct msg *));

	if (error)
		return (set_errno(error));
	return (0);
}
/*
 * Largest message body (in bytes) that msgsnd() will preallocate
 * before looking up and locking the queue; see the comment in
 * msgsnd() below.
 */
#define	MSG_PREALLOC_LIMIT 8192

/*
 * msgsnd system call.
 *
 * Copies the message type (and body) in from userland, waits -- unless
 * IPC_NOWAIT is set -- until the queue has room for the message, links
 * the message onto the queue, and wakes up an appropriate blocked
 * receiver.  Returns 0 on success; on failure returns via
 * set_errno() with EFAULT, EINVAL, EAGAIN, EIDRM or the error
 * produced by msgq_check_err().
 */
static int
msgsnd(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, int msgflg)
{
	kmsqid_t	*qp;
	kmutex_t	*lock = NULL;
	struct msg	*mp = NULL;
	long		type;
	int		error = 0, wait_wakeup = 0;
	msgq_wakeup_t	msg_entry;
	model_t		mdl = get_udatamodel();
	STRUCT_HANDLE(ipcmsgbuf, umsgp);

	CPU_STATS_ADDQ(CPU, sys, msg, 1);	/* bump msg send/rcv count */
	STRUCT_SET_HANDLE(umsgp, mdl, msgp);

	/*
	 * Pull in just the leading mtype field; its width depends on the
	 * caller's data model.
	 */
	if (mdl == DATAMODEL_NATIVE) {
		if (copyin(msgp, &type, sizeof (type)))
			return (set_errno(EFAULT));
	} else {
		int32_t	type32;
		if (copyin(msgp, &type32, sizeof (type32)))
			return (set_errno(EFAULT));
		type = type32;
	}

	if (type < 1)
		return (set_errno(EINVAL));

	/*
	 * We want the value here large enough that most of the
	 * the message operations will use the "lockless" path,
	 * but small enough that a user can not reserve large
	 * chunks of kernel memory unless they have a valid
	 * reason to.
	 */
	if (msgsz <= MSG_PREALLOC_LIMIT) {
		/*
		 * We are small enough that we can afford to do the
		 * allocation now.  This saves dropping the lock
		 * and then reacquiring the lock.
		 */
		mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP);
		mp->msg_copycnt = 1;
		mp->msg_size = msgsz;
		if (msgsz) {
			mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP);
			if (copyin(STRUCT_FADDR(umsgp, mtext),
			    mp->msg_addr, msgsz) == -1) {
				error = EFAULT;
				goto msgsnd_out;
			}
		}
	}

	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
		error = EINVAL;
		goto msgsnd_out;
	}

	/* Hold the queue so it cannot vanish while we sleep below. */
	ipc_hold(msq_svc, (kipc_perm_t *)qp);

	if (msgsz > qp->msg_qbytes) {
		error = EINVAL;
		goto msgsnd_out;
	}

	if (error = ipcperm_access(&qp->msg_perm, MSG_W, CRED()))
		goto msgsnd_out;

top:
	/*
	 * Allocate space on q, message header, & buffer space.
	 */
	ASSERT(qp->msg_qnum <= qp->msg_qmax);
	while ((msgsz > qp->msg_qbytes - qp->msg_cbytes) ||
	    (qp->msg_qnum == qp->msg_qmax)) {
		int cvres;

		if (msgflg & IPC_NOWAIT) {
			error = EAGAIN;
			goto msgsnd_out;
		}

		/*
		 * Queue ourselves on the send-side wait list and sleep
		 * until a receiver frees up space.  cv_wait_sig() drops
		 * the queue lock; ipc_relock() reacquires it (the queue
		 * may have been deleted meanwhile -- msgq_check_err()
		 * catches that below).
		 */
		wait_wakeup = 0;
		qp->msg_snd_cnt++;
		msg_entry.msgw_snd_size = msgsz;
		msg_entry.msgw_thrd = curthread;
		msg_entry.msgw_type = type;
		cv_init(&msg_entry.msgw_wake_cv, NULL, 0, NULL);
		list_insert_tail(&qp->msg_wait_rcv, &msg_entry);
		if (qp->msg_snd_smallest > msgsz)
			qp->msg_snd_smallest = msgsz;
		cvres = cv_wait_sig(&msg_entry.msgw_wake_cv, lock);
		lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, lock);
		qp->msg_snd_cnt--;
		if (list_link_active(&msg_entry.msgw_list))
			list_remove(&qp->msg_wait_rcv, &msg_entry);
		if (error = msgq_check_err(qp, cvres)) {
			goto msgsnd_out;
		}
		/* Remember we were explicitly woken; see msgsnd_out. */
		wait_wakeup = 1;
	}

	if (mp == NULL) {
		int failure;

		/*
		 * Large message: we deferred the allocation earlier, so
		 * drop the lock for the KM_SLEEP allocations and the
		 * copyin, then revalidate the queue and recheck for
		 * space from "top:".
		 */
		mutex_exit(lock);
		ASSERT(msgsz > 0);
		mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP);
		mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP);
		mp->msg_size = msgsz;
		mp->msg_copycnt = 1;

		failure = (copyin(STRUCT_FADDR(umsgp, mtext),
		    mp->msg_addr, msgsz) == -1);
		lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
		if (IPC_FREE(&qp->msg_perm)) {
			error = EIDRM;
			goto msgsnd_out;
		}
		if (failure) {
			error = EFAULT;
			goto msgsnd_out;
		}
		goto top;
	}

	/*
	 * Everything is available, put msg on q.
	 */
	qp->msg_qnum++;
	qp->msg_cbytes += msgsz;
	qp->msg_lspid = curproc->p_pid;
	qp->msg_stime = gethrestime_sec();
	mp->msg_type = type;
	if (qp->msg_lowest_type > type)
		qp->msg_lowest_type = type;
	list_insert_tail(&qp->msg_list, mp);
	/*
	 * Get the proper receiver going.
	 */
	msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, type);

msgsnd_out:
	/*
	 * We were woken up from the send wait list, but an
	 * error occurred on placing the message onto the
	 * msg queue.  Given that, we need to do the wakeup
	 * dance again.
	 */
	if (wait_wakeup && error) {
		msg_wakeup_senders(qp);
	}
	if (lock)
		ipc_rele(msq_svc, (kipc_perm_t *)qp);	/* drops lock */

	if (error) {
		if (mp)
			msg_rele(mp);
		return (set_errno(error));
	}

	return (0);
}
1259 static void
1260 msg_wakeup_rdr(kmsqid_t *qp, msg_select_t **flist, long type)
1262 msg_select_t *walker = *flist;
1263 msgq_wakeup_t *wakeup;
1264 uint_t msg_hash;
1266 msg_hash = msg_type_hash(type);
1268 do {
1269 wakeup = walker->selection(qp, msg_hash, type);
1270 walker = walker->next_selection;
1271 } while (!wakeup && walker != *flist);
1273 *flist = (*flist)->next_selection;
1274 if (wakeup) {
1275 if (type) {
1276 wakeup->msgw_snd_wake = type;
1278 cv_signal(&wakeup->msgw_wake_cv);
1282 static uint_t
1283 msg_type_hash(long msg_type)
1285 if (msg_type < 0) {
1286 long hash = -msg_type / MSG_NEG_INTERVAL;
1288 * Negative message types are hashed over an
1289 * interval. Any message type that hashes
1290 * beyond MSG_MAX_QNUM is automatically placed
1291 * in the last bucket.
1293 if (hash > MSG_MAX_QNUM)
1294 hash = MSG_MAX_QNUM;
1295 return (hash);
1299 * 0 or positive message type. The first bucket is reserved for
1300 * message receivers of type 0, the other buckets we hash into.
1302 if (msg_type)
1303 return (1 + (msg_type % MSG_MAX_QNUM));
1304 return (0);
1308 * Routines to see if we have a receiver of type 0 either blocked waiting
1309 * for a message. Simply return the first guy on the list.
1312 static msgq_wakeup_t *
1313 /* ARGSUSED */
1314 msg_fnd_any_snd(kmsqid_t *qp, int msg_hash, long type)
1316 msgq_wakeup_t *walker;
1318 walker = list_head(&qp->msg_wait_snd[0]);
1320 if (walker)
1321 list_remove(&qp->msg_wait_snd[0], walker);
1322 return (walker);
1325 static msgq_wakeup_t *
1326 /* ARGSUSED */
1327 msg_fnd_any_rdr(kmsqid_t *qp, int msg_hash, long type)
1329 msgq_wakeup_t *walker;
1331 walker = list_head(&qp->msg_cpy_block);
1332 if (walker)
1333 list_remove(&qp->msg_cpy_block, walker);
1334 return (walker);
1337 static msgq_wakeup_t *
1338 msg_fnd_spc_snd(kmsqid_t *qp, int msg_hash, long type)
1340 msgq_wakeup_t *walker;
1342 walker = list_head(&qp->msg_wait_snd[msg_hash]);
1344 while (walker && walker->msgw_type != type)
1345 walker = list_next(&qp->msg_wait_snd[msg_hash], walker);
1346 if (walker)
1347 list_remove(&qp->msg_wait_snd[msg_hash], walker);
1348 return (walker);
/*
 * Selection policy: find a receiver that issued a negative-type
 * msgrcv() (i.e. "any type <= |msgtyp|") which this message's type
 * satisfies.  Returns the unlinked waiter, or NULL if none matches.
 */
/* ARGSUSED */
static msgq_wakeup_t *
msg_fnd_neg_snd(kmsqid_t *qp, int msg_hash, long type)
{
	msgq_wakeup_t	*qptr;
	int		count;
	int		check_index;
	int		neg_index;
	int		nbuckets;

	/* Fast exit when no negative-type receivers are queued at all. */
	if (!qp->msg_ngt_cnt) {
		return (NULL);
	}
	neg_index = msg_type_hash(-type);

	/*
	 * Check for a match among the negative type queues.  Any buckets
	 * at neg_index or larger can match the type.  Use the last send
	 * time to randomize the starting bucket to prevent starvation.
	 * Search all buckets from neg_index to MSG_MAX_QNUM, starting
	 * from the random starting point, and wrapping around after
	 * MSG_MAX_QNUM.
	 */

	nbuckets = MSG_MAX_QNUM - neg_index + 1;
	check_index = neg_index + (qp->msg_stime % nbuckets);

	for (count = nbuckets; count > 0; count--) {
		qptr = list_head(&qp->msg_wait_snd_ngt[check_index]);
		while (qptr) {
			/*
			 * The lowest hash bucket may actually contain
			 * message types that are not valid for this
			 * request.  This can happen due to the fact that
			 * the message buckets actually contain a consecutive
			 * range of types.
			 */
			if (-qptr->msgw_type >= type) {
				list_remove(&qp->msg_wait_snd_ngt[check_index],
				    qptr);
				return (qptr);
			}
			qptr = list_next(&qp->msg_wait_snd_ngt[check_index],
			    qptr);
		}
		/* Wrap from the last bucket back to neg_index. */
		if (++check_index > MSG_MAX_QNUM) {
			check_index = neg_index;
		}
	}
	return (NULL);
}
/*
 * Queue a receiver on the given wait list and sleep until signalled
 * or interrupted.
 *
 * On entry *lock is the held queue lock; cv_wait_sig() drops it while
 * sleeping and ipc_relock() reacquires it (possibly a different mutex
 * if the id was recycled -- callers must revalidate the queue).
 * Returns the cv_wait_sig() result (0 if interrupted by a signal).
 */
static int
msg_rcvq_sleep(list_t *queue, msgq_wakeup_t *entry, kmutex_t **lock,
    kmsqid_t *qp)
{
	int		cvres;

	cv_init(&entry->msgw_wake_cv, NULL, 0, NULL);

	list_insert_tail(queue, entry);

	qp->msg_rcv_cnt++;
	cvres = cv_wait_sig(&entry->msgw_wake_cv, *lock);
	*lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, *lock);
	qp->msg_rcv_cnt--;

	/*
	 * A waker unlinks the entry before signalling; if we are still
	 * linked we woke up for some other reason (e.g. a signal).
	 */
	if (list_link_active(&entry->msgw_list)) {
		/*
		 * We woke up unexpectedly, remove ourself.
		 */
		list_remove(queue, entry);
	}

	return (cvres);
}
1428 static void
1429 msg_rcvq_wakeup_all(list_t *q_ptr)
1431 msgq_wakeup_t *q_walk;
1433 while (q_walk = list_head(q_ptr)) {
1434 list_remove(q_ptr, q_walk);
1435 cv_signal(&q_walk->msgw_wake_cv);
1440 * msgsys - System entry point for msgctl, msgget, msgrcv, and msgsnd
1441 * system calls.
1443 static ssize_t
1444 msgsys(int opcode, uintptr_t a1, uintptr_t a2, uintptr_t a3,
1445 uintptr_t a4, uintptr_t a5)
1447 ssize_t error;
1449 switch (opcode) {
1450 case MSGGET:
1451 error = msgget((key_t)a1, (int)a2);
1452 break;
1453 case MSGCTL:
1454 error = msgctl((int)a1, (int)a2, (void *)a3);
1455 break;
1456 case MSGRCV:
1457 error = msgrcv((int)a1, (struct ipcmsgbuf *)a2,
1458 (size_t)a3, (long)a4, (int)a5);
1459 break;
1460 case MSGSND:
1461 error = msgsnd((int)a1, (struct ipcmsgbuf *)a2,
1462 (size_t)a3, (int)a4);
1463 break;
1464 case MSGIDS:
1465 error = msgids((int *)a1, (uint_t)a2, (uint_t *)a3);
1466 break;
1467 case MSGSNAP:
1468 error = msgsnap((int)a1, (caddr_t)a2, (size_t)a3, (long)a4);
1469 break;
1470 default:
1471 error = set_errno(EINVAL);
1472 break;
1475 return (error);
/*
 * Determine if a writer who is waiting can process its message.  If so
 * wake it up.
 *
 * Walks the send wait list, signalling every waiter whose message
 * would fit in the space still available (assuming every waiter woken
 * so far succeeds), and tracks the smallest message among those left
 * behind so the next free-space check can be a cheap comparison.
 */
static void
msg_wakeup_senders(kmsqid_t *qp)
{
	struct msgq_wakeup *ptr, *optr;
	size_t avail, smallest;
	int msgs_out;

	/*
	 * Is there a writer waiting, and if so, can it be serviced? If
	 * not return back to the caller.
	 */
	if (IPC_FREE(&qp->msg_perm) || qp->msg_qnum >= qp->msg_qmax)
		return;

	avail = qp->msg_qbytes - qp->msg_cbytes;
	if (avail < qp->msg_snd_smallest)
		return;

	ptr = list_head(&qp->msg_wait_rcv);
	if (ptr == NULL) {
		/* Nobody waiting: reset the cached minimum. */
		qp->msg_snd_smallest = MSG_SMALL_INIT;
		return;
	}
	optr = ptr;

	/*
	 * smallest:	minimum message size of all queued writers
	 *
	 * avail:	amount of space left on the msgq
	 *		if all the writers we have woken up are successful.
	 *
	 * msgs_out:	is the number of messages on the message queue if
	 *		all the writers we have woken up are successful.
	 */

	smallest = MSG_SMALL_INIT;
	msgs_out = qp->msg_qnum;
	/*
	 * optr is the entry under consideration; ptr is fetched first
	 * so optr can be unlinked safely.
	 */
	while (ptr) {
		ptr = list_next(&qp->msg_wait_rcv, ptr);
		if (optr->msgw_snd_size <= avail) {
			list_remove(&qp->msg_wait_rcv, optr);
			avail -= optr->msgw_snd_size;
			cv_signal(&optr->msgw_wake_cv);
			msgs_out++;
			if (msgs_out == qp->msg_qmax ||
			    avail < qp->msg_snd_smallest)
				break;
		} else {
			if (smallest > optr->msgw_snd_size)
				smallest = optr->msgw_snd_size;
		}
		optr = ptr;
	}

	/*
	 * Reset the smallest message size if the entire list has been visited
	 */
	if (ptr == NULL && smallest != MSG_SMALL_INIT)
		qp->msg_snd_smallest = smallest;
}
1544 #ifdef _SYSCALL32_IMPL
1546 * msgsys32 - System entry point for msgctl, msgget, msgrcv, and msgsnd
1547 * system calls for 32-bit callers on LP64 kernel.
1549 static ssize32_t
1550 msgsys32(int opcode, uint32_t a1, uint32_t a2, uint32_t a3,
1551 uint32_t a4, uint32_t a5)
1553 ssize_t error;
1555 switch (opcode) {
1556 case MSGGET:
1557 error = msgget((key_t)a1, (int)a2);
1558 break;
1559 case MSGCTL:
1560 error = msgctl((int)a1, (int)a2, (void *)(uintptr_t)a3);
1561 break;
1562 case MSGRCV:
1563 error = msgrcv((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2,
1564 (size_t)a3, (long)(int32_t)a4, (int)a5);
1565 break;
1566 case MSGSND:
1567 error = msgsnd((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2,
1568 (size_t)(int32_t)a3, (int)a4);
1569 break;
1570 case MSGIDS:
1571 error = msgids((int *)(uintptr_t)a1, (uint_t)a2,
1572 (uint_t *)(uintptr_t)a3);
1573 break;
1574 case MSGSNAP:
1575 error = msgsnap((int)a1, (caddr_t)(uintptr_t)a2, (size_t)a3,
1576 (long)(int32_t)a4);
1577 break;
1578 default:
1579 error = set_errno(EINVAL);
1580 break;
1583 return (error);
#endif	/* _SYSCALL32_IMPL */