add POSIX_MQ#shift helper method
[ruby_posix_mq.git] / ext / posix_mq / posix_mq.c
blobca0cab8316ef5d454fad495788da12ce37247a58
1 #define _XOPEN_SOURCE 600
2 #ifdef HAVE_SYS_SELECT_H
3 # include <sys/select.h>
4 #endif
5 #ifdef HAVE_SIGNAL_H
6 # include <signal.h>
7 #endif
8 #ifdef HAVE_PTHREAD_H
9 # include <pthread.h>
10 #endif
11 #include <ruby.h>
13 #include <time.h>
14 #include <mqueue.h>
15 #include <fcntl.h>
16 #include <sys/stat.h>
17 #include <errno.h>
18 #include <assert.h>
19 #include <unistd.h>
21 #if defined(__linux__)
22 # define MQD_TO_FD(mqd) (int)(mqd)
23 #elif defined(HAVE___MQ_OSHANDLE) /* FreeBSD */
24 # define MQD_TO_FD(mqd) __mq_oshandle(mqd)
25 #else
26 # warning mqd_t is not select()-able on your OS
27 # define MQ_IO_MARK(mq) ((void)(0))
28 # define MQ_IO_SET(mq,val) ((void)(0))
29 #endif
31 #ifdef MQD_TO_FD
32 # define MQ_IO_MARK(mq) rb_gc_mark((mq)->io)
33 # define MQ_IO_SET(mq,val) do { (mq)->io = (val); } while (0)
34 #endif
36 struct posix_mq {
37 mqd_t des;
38 long msgsize;
39 VALUE name;
40 VALUE thread;
41 #ifdef MQD_TO_FD
42 VALUE io;
43 #endif
46 static VALUE cPOSIX_MQ, cAttr;
47 static ID id_new, id_kill, id_fileno;
48 static ID sym_r, sym_w, sym_rw;
49 static const mqd_t MQD_INVALID = (mqd_t)-1;
51 /* Ruby 1.8.6+ macros (for compatibility with Ruby 1.9) */
52 #ifndef RSTRING_PTR
53 # define RSTRING_PTR(s) (RSTRING(s)->ptr)
54 #endif
55 #ifndef RSTRING_LEN
56 # define RSTRING_LEN(s) (RSTRING(s)->len)
57 #endif
58 #ifndef RSTRUCT_PTR
59 # define RSTRUCT_PTR(s) (RSTRUCT(s)->ptr)
60 #endif
61 #ifndef RSTRUCT_LEN
62 # define RSTRUCT_LEN(s) (RSTRUCT(s)->len)
63 #endif
65 #ifndef HAVE_RB_STR_SET_LEN
66 # ifdef RUBINIUS
67 # define rb_str_set_len(str,len) rb_str_resize(str,len)
68 # else /* 1.8.6 optimized version */
69 /* this is taken from Ruby 1.8.7, 1.8.6 may not have it */
70 static void rb_18_str_set_len(VALUE str, long len)
72 RSTRING(str)->len = len;
73 RSTRING(str)->ptr[len] = '\0';
74 rb_str_flush(str);
76 # define rb_str_set_len(str,len) rb_18_str_set_len(str,len)
77 # endif /* ! RUBINIUS */
78 #endif /* !defined(HAVE_RB_STR_SET_LEN) */
80 #ifndef HAVE_RB_STRUCT_ALLOC_NOINIT
81 static VALUE rb_struct_alloc_noinit(VALUE class)
83 return rb_funcall(class, id_new, 0, 0);
85 #endif /* !defined(HAVE_RB_STRUCT_ALLOC_NOINIT) */
87 /* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
88 #ifndef HAVE_RB_THREAD_BLOCKING_REGION
89 # include <rubysig.h>
90 # define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
91 typedef void rb_unblock_function_t(void *);
92 typedef VALUE rb_blocking_function_t(void *);
93 static VALUE
94 rb_thread_blocking_region(
95 rb_blocking_function_t *func, void *data1,
96 rb_unblock_function_t *ubf, void *data2)
98 VALUE rv;
100 assert(RUBY_UBF_IO == ubf && "RUBY_UBF_IO required for emulation");
102 TRAP_BEG;
103 rv = func(data1);
104 TRAP_END;
106 return rv;
108 #endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
110 /* used to pass arguments to mq_open inside blocking region */
111 struct open_args {
112 int argc;
113 const char *name;
114 int oflags;
115 mode_t mode;
116 struct mq_attr attr;
119 /* used to pass arguments to mq_send/mq_receive inside blocking region */
120 struct rw_args {
121 mqd_t des;
122 char *msg_ptr;
123 size_t msg_len;
124 unsigned msg_prio;
125 struct timespec *timeout;
128 /* hope it's there..., TODO: a better version that works in rbx */
129 struct timeval rb_time_interval(VALUE);
131 static struct timespec *convert_timeout(struct timespec *dest, VALUE time)
133 struct timeval tv, now;
135 if (NIL_P(time))
136 return NULL;
138 tv = rb_time_interval(time); /* aggregate return :( */
139 gettimeofday(&now, NULL);
140 dest->tv_sec = now.tv_sec + tv.tv_sec;
141 dest->tv_nsec = (now.tv_usec + tv.tv_usec) * 1000;
143 if (dest->tv_nsec > 1000000000) {
144 dest->tv_nsec -= 1000000000;
145 dest->tv_sec++;
148 return dest;
151 /* runs without GVL */
152 static VALUE xopen(void *ptr)
154 struct open_args *x = ptr;
155 mqd_t rv;
157 switch (x->argc) {
158 case 2: rv = mq_open(x->name, x->oflags); break;
159 case 3: rv = mq_open(x->name, x->oflags, x->mode, NULL); break;
160 case 4: rv = mq_open(x->name, x->oflags, x->mode, &x->attr); break;
161 default: rv = MQD_INVALID;
164 return (VALUE)rv;
167 /* runs without GVL */
168 static VALUE xsend(void *ptr)
170 struct rw_args *x = ptr;
172 if (x->timeout)
173 return (VALUE)mq_timedsend(x->des, x->msg_ptr, x->msg_len,
174 x->msg_prio, x->timeout);
176 return (VALUE)mq_send(x->des, x->msg_ptr, x->msg_len, x->msg_prio);
179 /* runs without GVL */
180 static VALUE xrecv(void *ptr)
182 struct rw_args *x = ptr;
184 if (x->timeout)
185 return (VALUE)mq_timedreceive(x->des, x->msg_ptr, x->msg_len,
186 &x->msg_prio, x->timeout);
188 return (VALUE)mq_receive(x->des, x->msg_ptr, x->msg_len, &x->msg_prio);
191 /* runs without GVL, path resolution may be slow */
192 static VALUE xunlink(void *ptr)
194 VALUE name = (VALUE)ptr;
196 return (VALUE)mq_unlink(RSTRING_PTR(name));
199 /* called by GC */
200 static void mark(void *ptr)
202 struct posix_mq *mq = ptr;
204 rb_gc_mark(mq->name);
205 rb_gc_mark(mq->thread);
206 MQ_IO_MARK(mq);
209 /* called by GC */
210 static void _free(void *ptr)
212 struct posix_mq *mq = ptr;
214 if (mq->des != MQD_INVALID) {
215 /* we ignore errors when gc-ing */
216 int saved_errno = errno;
218 mq_close(mq->des);
219 errno = saved_errno;
220 mq->des = MQD_INVALID;
224 /* automatically called at creation (before initialize) */
225 static VALUE alloc(VALUE klass)
227 struct posix_mq *mq;
228 VALUE rv = Data_Make_Struct(klass, struct posix_mq, mark, _free, mq);
230 mq->des = MQD_INVALID;
231 mq->msgsize = -1;
232 mq->name = Qnil;
233 mq->thread = Qnil;
234 MQ_IO_SET(mq, Qnil);
236 return rv;
239 /* unwraps the posix_mq struct from self */
240 static struct posix_mq *get(VALUE self, int need_valid)
242 struct posix_mq *mq;
244 Data_Get_Struct(self, struct posix_mq, mq);
246 if (need_valid && mq->des == MQD_INVALID)
247 rb_raise(rb_eIOError, "closed queue descriptor");
249 return mq;
252 /* converts the POSIX_MQ::Attr astruct into a struct mq_attr attr */
253 static void attr_from_struct(struct mq_attr *attr, VALUE astruct, int all)
255 VALUE *ptr;
257 if (CLASS_OF(astruct) != cAttr)
258 rb_raise(rb_eArgError, "not a POSIX_MQ::Attr: %s",
259 RSTRING_PTR(rb_inspect(astruct)));
261 ptr = RSTRUCT_PTR(astruct);
263 attr->mq_flags = NUM2LONG(ptr[0]);
265 if (all || !NIL_P(ptr[1]))
266 attr->mq_maxmsg = NUM2LONG(ptr[1]);
267 if (all || !NIL_P(ptr[2]))
268 attr->mq_msgsize = NUM2LONG(ptr[2]);
269 if (!NIL_P(ptr[3]))
270 attr->mq_curmsgs = NUM2LONG(ptr[3]);
274 * call-seq:
275 * POSIX_MQ.new(name [, flags [, mode [, mq_attr]]) => mq
277 * Opens a POSIX message queue given by +name+. +name+ should start
278 * with a slash ("/") for portable applications.
280 * If a Symbol is given in place of integer +flags+, then:
282 * * +:r+ is equivalent to IO::RDONLY
283 * * +:w+ is equivalent to IO::CREAT|IO::WRONLY
284 * * +:rw+ is equivalent to IO::CREAT|IO::RDWR
286 * +mode+ is an integer and only used when IO::CREAT is used.
287 * +mq_attr+ is a POSIX_MQ::Attr and only used if IO::CREAT is used.
288 * If +mq_attr+ is not specified when creating a queue, then the
289 * system defaults will be used.
291 * See the manpage for mq_open(3) for more details on this function.
293 static VALUE init(int argc, VALUE *argv, VALUE self)
295 struct posix_mq *mq = get(self, 0);
296 struct open_args x;
297 VALUE name, oflags, mode, attr;
299 rb_scan_args(argc, argv, "13", &name, &oflags, &mode, &attr);
301 if (TYPE(name) != T_STRING)
302 rb_raise(rb_eArgError, "name must be a string");
304 switch (TYPE(oflags)) {
305 case T_NIL:
306 x.oflags = O_RDONLY;
307 break;
308 case T_SYMBOL:
309 if (oflags == sym_r)
310 x.oflags = O_RDONLY;
311 else if (oflags == sym_w)
312 x.oflags = O_CREAT|O_WRONLY;
313 else if (oflags == sym_rw)
314 x.oflags = O_CREAT|O_RDWR;
315 else
316 rb_raise(rb_eArgError,
317 "symbol must be :r, :w, or :rw: %s",
318 RSTRING_PTR(rb_inspect(oflags)));
319 break;
320 case T_BIGNUM:
321 case T_FIXNUM:
322 x.oflags = NUM2INT(oflags);
323 break;
324 default:
325 rb_raise(rb_eArgError, "flags must be an int, :r, :w, or :wr");
328 x.name = RSTRING_PTR(name);
329 x.argc = 2;
331 switch (TYPE(mode)) {
332 case T_FIXNUM:
333 x.argc = 3;
334 x.mode = NUM2INT(mode);
335 break;
336 case T_NIL:
337 if (x.oflags & O_CREAT) {
338 x.argc = 3;
339 x.mode = 0666;
341 break;
342 default:
343 rb_raise(rb_eArgError, "mode not an integer");
346 switch (TYPE(attr)) {
347 case T_STRUCT:
348 x.argc = 4;
349 attr_from_struct(&x.attr, attr, 1);
351 /* principle of least surprise */
352 if (x.attr.mq_flags & O_NONBLOCK)
353 x.oflags |= O_NONBLOCK;
354 break;
355 case T_NIL:
356 break;
357 default:
358 rb_raise(rb_eArgError, "attr must be a POSIX_MQ::Attr: %s",
359 RSTRING_PTR(rb_inspect(attr)));
362 mq->des = (mqd_t)rb_thread_blocking_region(xopen, &x, RUBY_UBF_IO, 0);
363 if (mq->des == MQD_INVALID)
364 rb_sys_fail("mq_open");
366 mq->name = rb_str_dup(name);
368 return self;
372 * call-seq:
373 * POSIX_MQ.unlink(name) => 1
375 * Unlinks the message queue given by +name+. The queue will be destroyed
376 * when the last process with the queue open closes its queue descriptors.
378 static VALUE s_unlink(VALUE self, VALUE name)
380 mqd_t rv;
381 void *ptr = (void *)name;
383 if (TYPE(name) != T_STRING)
384 rb_raise(rb_eArgError, "argument must be a string");
386 rv = (mqd_t)rb_thread_blocking_region(xunlink, ptr, RUBY_UBF_IO, 0);
387 if (rv == MQD_INVALID)
388 rb_sys_fail("mq_unlink");
390 return INT2NUM(1);
394 * call-seq:
395 * mq.unlink => mq
397 * Unlinks the message queue to prevent other processes from accessing it.
398 * All existing queue descriptors to this queue including those opened by
399 * other processes are unaffected. The queue will only be destroyed
400 * when the last process with open descriptors to this queue closes
401 * the descriptors.
403 static VALUE _unlink(VALUE self)
405 struct posix_mq *mq = get(self, 0);
406 mqd_t rv;
407 void *ptr = (void *)mq->name;
409 assert(TYPE(mq->name) == T_STRING && "mq->name is not a string");
411 rv = (mqd_t)rb_thread_blocking_region(xunlink, ptr, RUBY_UBF_IO, 0);
412 if (rv == MQD_INVALID)
413 rb_sys_fail("mq_unlink");
415 return self;
418 static void setup_send_buffer(struct rw_args *x, VALUE buffer)
420 buffer = rb_obj_as_string(buffer);
421 x->msg_ptr = RSTRING_PTR(buffer);
422 x->msg_len = (size_t)RSTRING_LEN(buffer);
426 * call-seq:
427 * mq.send(string [,priority[, timeout]]) => nil
429 * Inserts the given +string+ into the message queue with an optional,
430 * unsigned integer +priority+. If the optional +timeout+ is specified,
431 * then Errno::ETIMEDOUT will be raised if the operation cannot complete
432 * before +timeout+ seconds has elapsed. Without +timeout+, this method
433 * may block until the queue is writable.
435 static VALUE _send(int argc, VALUE *argv, VALUE self)
437 struct posix_mq *mq = get(self, 1);
438 struct rw_args x;
439 VALUE buffer, prio, timeout;
440 mqd_t rv;
441 struct timespec expire;
443 rb_scan_args(argc, argv, "12", &buffer, &prio, &timeout);
445 setup_send_buffer(&x, buffer);
446 x.des = mq->des;
447 x.timeout = convert_timeout(&expire, timeout);
448 x.msg_prio = NIL_P(prio) ? 0 : NUM2UINT(prio);
450 rv = (mqd_t)rb_thread_blocking_region(xsend, &x, RUBY_UBF_IO, 0);
451 if (rv == MQD_INVALID)
452 rb_sys_fail("mq_send");
454 return Qnil;
458 * call-seq:
459 * mq << string => mq
461 * Inserts the given +string+ into the message queue with a
462 * default priority of 0 and no timeout.
464 static VALUE send0(VALUE self, VALUE buffer)
466 struct posix_mq *mq = get(self, 1);
467 struct rw_args x;
468 mqd_t rv;
470 setup_send_buffer(&x, buffer);
471 x.des = mq->des;
472 x.timeout = NULL;
473 x.msg_prio = 0;
475 rv = (mqd_t)rb_thread_blocking_region(xsend, &x, RUBY_UBF_IO, 0);
476 if (rv == MQD_INVALID)
477 rb_sys_fail("mq_send");
479 return self;
482 #ifdef MQD_TO_FD
484 * call-seq:
485 * mq.to_io => IO
487 * Returns an IO.select-able +IO+ object. This method is only available
488 * under Linux and is not intended to be portable.
490 static VALUE to_io(VALUE self)
492 struct posix_mq *mq = get(self, 1);
493 int fd = MQD_TO_FD(mq->des);
495 if (NIL_P(mq->io))
496 mq->io = rb_funcall(rb_cIO, id_new, 1, INT2NUM(fd));
498 return mq->io;
500 #endif
502 static void get_msgsize(struct posix_mq *mq)
504 struct mq_attr attr;
506 if (mq_getattr(mq->des, &attr) < 0)
507 rb_sys_fail("mq_getattr");
509 mq->msgsize = attr.mq_msgsize;
512 static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self);
515 * call-seq:
516 * mq.receive([buffer, [timeout]]) => [ message, priority ]
518 * Takes the highest priority message off the queue and returns
519 * an array containing the message as a String and the Integer
520 * priority of the message.
522 * If the optional +buffer+ is present, then it must be a String
523 * which will receive the data.
525 * If the optional +timeout+ is present, then it may be a Float
526 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
527 * will be raised if +timeout+ has elapsed and there are no messages
528 * in the queue.
530 static VALUE receive(int argc, VALUE *argv, VALUE self)
532 return _receive(1, argc, argv, self);
536 * call-seq:
537 * mq.shift([buffer, [timeout]]) => message
539 * Takes the highest priority message off the queue and returns
540 * the message as a String.
542 * If the optional +buffer+ is present, then it must be a String
543 * which will receive the data.
545 * If the optional +timeout+ is present, then it may be a Float
546 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
547 * will be raised if +timeout+ has elapsed and there are no messages
548 * in the queue.
550 static VALUE shift(int argc, VALUE *argv, VALUE self)
552 return _receive(0, argc, argv, self);
555 static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self)
557 struct posix_mq *mq = get(self, 1);
558 struct rw_args x;
559 VALUE buffer, timeout;
560 ssize_t r;
561 struct timespec expire;
563 if (mq->msgsize < 0)
564 get_msgsize(mq);
566 rb_scan_args(argc, argv, "02", &buffer, &timeout);
567 x.timeout = convert_timeout(&expire, timeout);
569 if (NIL_P(buffer)) {
570 buffer = rb_str_new(0, mq->msgsize);
571 } else {
572 StringValue(buffer);
573 rb_str_modify(buffer);
574 rb_str_resize(buffer, mq->msgsize);
576 OBJ_TAINT(buffer);
577 x.msg_ptr = RSTRING_PTR(buffer);
578 x.msg_len = (size_t)mq->msgsize;
579 x.des = mq->des;
581 r = (ssize_t)rb_thread_blocking_region(xrecv, &x, RUBY_UBF_IO, 0);
582 if (r < 0)
583 rb_sys_fail("mq_receive");
585 rb_str_set_len(buffer, r);
587 if (wantarray)
588 return rb_ary_new3(2, buffer, UINT2NUM(x.msg_prio));
589 return buffer;
593 * call-seq:
594 * mq.attr => mq_attr
596 * Returns a POSIX_MQ::Attr struct containing the attributes
597 * of the message queue. See the mq_getattr(3) manpage for
598 * more details.
600 static VALUE getattr(VALUE self)
602 struct posix_mq *mq = get(self, 1);
603 struct mq_attr attr;
604 VALUE astruct;
605 VALUE *ptr;
607 if (mq_getattr(mq->des, &attr) < 0)
608 rb_sys_fail("mq_getattr");
610 astruct = rb_struct_alloc_noinit(cAttr);
611 ptr = RSTRUCT_PTR(astruct);
612 ptr[0] = LONG2NUM(attr.mq_flags);
613 ptr[1] = LONG2NUM(attr.mq_maxmsg);
614 ptr[2] = LONG2NUM(attr.mq_msgsize);
615 ptr[3] = LONG2NUM(attr.mq_curmsgs);
617 return astruct;
621 * call-seq:
622 * mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr
624 * Only the IO::NONBLOCK flag may be set or unset (zero) in this manner.
625 * See the mq_setattr(3) manpage for more details.
627 * Consider using the POSIX_MQ#nonblock= method as it is easier and
628 * more natural to use.
630 static VALUE setattr(VALUE self, VALUE astruct)
632 struct posix_mq *mq = get(self, 1);
633 struct mq_attr newattr;
635 attr_from_struct(&newattr, astruct, 0);
637 if (mq_setattr(mq->des, &newattr, NULL) < 0)
638 rb_sys_fail("mq_setattr");
640 return astruct;
644 * call-seq:
645 * mq.close => nil
647 * Closes the underlying message queue descriptor.
648 * If this descriptor had a registered notification request, the request
649 * will be removed so another descriptor or process may register a
650 * notification request. Message queue descriptors are automatically
651 * closed by garbage collection.
653 static VALUE _close(VALUE self)
655 struct posix_mq *mq = get(self, 1);
657 if (mq_close(mq->des) < 0)
658 rb_sys_fail("mq_close");
660 mq->des = MQD_INVALID;
661 MQ_IO_SET(mq, Qnil);
663 return Qnil;
667 * call-seq:
668 * mq.closed? => true or false
670 * Returns +true+ if the message queue descriptor is closed and therefore
671 * unusable, otherwise +false+
673 static VALUE closed(VALUE self)
675 struct posix_mq *mq = get(self, 0);
677 return mq->des == MQD_INVALID ? Qtrue : Qfalse;
681 * call-seq:
682 * mq.name => string
684 * Returns the string name of message queue associated with +mq+
686 static VALUE name(VALUE self)
688 struct posix_mq *mq = get(self, 0);
690 return mq->name;
693 static int lookup_sig(VALUE sig)
695 static VALUE list;
696 const char *ptr;
697 long len;
699 sig = rb_obj_as_string(sig);
700 len = RSTRING_LEN(sig);
701 ptr = RSTRING_PTR(sig);
703 if (len > 3 && !memcmp("SIG", ptr, 3))
704 sig = rb_str_new(ptr + 3, len - 3);
706 if (!list) {
707 VALUE mSignal = rb_define_module("Signal"""); /* avoid RDoc */
709 list = rb_funcall(mSignal, rb_intern("list"), 0, 0);
710 rb_global_variable(&list);
713 sig = rb_hash_aref(list, sig);
714 if (NIL_P(sig))
715 rb_raise(rb_eArgError, "invalid signal: %s\n",
716 RSTRING_PTR(rb_inspect(sig)));
718 return NUM2INT(sig);
721 /* we spawn a thread just to write ONE byte into an fd (usually a pipe) */
722 static void thread_notify_fd(union sigval sv)
724 int fd = sv.sival_int;
726 while ((write(fd, "", 1) < 0) && (errno == EINTR || errno == EAGAIN));
729 static void setup_notify_io(struct sigevent *not, VALUE io)
731 VALUE fileno = rb_funcall(io, id_fileno, 0, 0);
732 int fd = NUM2INT(fileno);
733 int flags;
734 pthread_attr_t attr;
735 int e;
738 * fd going to be written to inside a native thread,
739 * make it blocking for simplicity
741 flags = fcntl(fd, F_GETFL);
742 if (flags < 0) {
743 rb_sys_fail("fcntl F_GETFL");
744 } else if (flags & O_NONBLOCK) {
745 flags = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
746 if (flags < 0)
747 rb_sys_fail("fcntl F_SETFL");
750 if ((e = pthread_attr_init(&attr)))
751 goto err;
752 if ((e = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)))
753 goto err;
754 #ifdef PTHREAD_STACK_MIN
755 (void)pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
756 #else
757 # warning PTHREAD_STACK_MIN not available,
758 #endif
759 not->sigev_notify = SIGEV_THREAD;
760 not->sigev_notify_function = thread_notify_fd;
761 not->sigev_notify_attributes = &attr;
762 not->sigev_value.sival_int = fd;
763 return;
764 err:
765 rb_raise(rb_eRuntimeError, "pthread failure: %s\n", strerror(e));
769 * call-seq:
770 * mq.notify = signal => signal
772 * Registers the notification request to deliver a given +signal+
773 * to the current process when message is received.
774 * If +signal+ is +nil+, it will unregister and disable the notification
775 * request to allow other processes to register a request.
776 * If +signal+ is +false+, it will register a no-op notification request
777 * which will prevent other processes from registering a notification.
778 * If +signal+ is an +IO+ object, it will spawn a thread upon the
779 * arrival of the next message and write one "\\0" byte to the file
780 * descriptor belonging to that IO object.
781 * Only one process may have a notification request for a queue
782 * at a time, Errno::EBUSY will be raised if there is already
783 * a notification request registration for the queue.
785 * Notifications are only fired once and processes must reregister
786 * for subsequent notifications.
788 * For readers of the mq_notify(3) manpage, passing +false+
789 * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent
790 * of passing a NULL notification pointer to mq_notify(3).
792 static VALUE setnotify(VALUE self, VALUE arg)
794 struct posix_mq *mq = get(self, 1);
795 struct sigevent not;
796 struct sigevent * notification = &not;
797 VALUE rv = arg;
799 if (!NIL_P(mq->thread)) {
800 rb_funcall(mq->thread, id_kill, 0, 0);
801 mq->thread = Qnil;
803 not.sigev_notify = SIGEV_SIGNAL;
805 switch (TYPE(arg)) {
806 case T_FALSE:
807 not.sigev_notify = SIGEV_NONE;
808 break;
809 case T_NIL:
810 notification = NULL;
811 break;
812 case T_FIXNUM:
813 not.sigev_signo = NUM2INT(arg);
814 break;
815 case T_SYMBOL:
816 case T_STRING:
817 not.sigev_signo = lookup_sig(arg);
818 rv = INT2NUM(not.sigev_signo);
819 break;
820 case T_FILE:
821 setup_notify_io(&not, arg);
822 break;
823 default:
824 /* maybe support Proc+thread via sigev_notify_function.. */
825 rb_raise(rb_eArgError, "must be a signal or nil");
828 if (mq_notify(mq->des, notification) < 0)
829 rb_sys_fail("mq_notify");
831 return rv;
835 * call-seq:
836 * mq.nonblock? => true or false
838 * Returns the current non-blocking state of the message queue descriptor.
840 static VALUE getnonblock(VALUE self)
842 struct mq_attr attr;
843 struct posix_mq *mq = get(self, 1);
845 if (mq_getattr(mq->des, &attr) < 0)
846 rb_sys_fail("mq_getattr");
848 mq->msgsize = attr.mq_msgsize; /* optimization */
850 return attr.mq_flags & O_NONBLOCK ? Qtrue : Qfalse;
854 * call-seq:
855 * mq.nonblock = boolean => boolean
857 * Enables or disables non-blocking operation for the message queue
858 * descriptor. Errno::EAGAIN will be raised in situations where
859 * the queue would block. This is not compatible with +timeout+
860 * arguments to POSIX_MQ#send and POSIX_MQ#receive.
862 static VALUE setnonblock(VALUE self, VALUE nb)
864 struct mq_attr newattr, oldattr;
865 struct posix_mq *mq = get(self, 1);
867 if (nb == Qtrue)
868 newattr.mq_flags = O_NONBLOCK;
869 else if (nb == Qfalse)
870 newattr.mq_flags = 0;
871 else
872 rb_raise(rb_eArgError, "must be true or false");
874 if (mq_setattr(mq->des, &newattr, &oldattr) < 0)
875 rb_sys_fail("mq_setattr");
877 mq->msgsize = oldattr.mq_msgsize; /* optimization */
879 return nb;
882 /* :nodoc: */
883 static VALUE setnotifythread(VALUE self, VALUE thread)
885 struct posix_mq *mq = get(self, 1);
887 mq->thread = thread;
888 return thread;
891 void Init_posix_mq_ext(void)
893 cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject);
894 rb_define_alloc_func(cPOSIX_MQ, alloc);
895 cAttr = rb_const_get(cPOSIX_MQ, rb_intern("Attr"));
898 * The maximum number of open message descriptors supported
899 * by the system. This may be -1, in which case it is dynamically
900 * set at runtime. Consult your operating system documentation
901 * for system-specific information about this.
903 rb_define_const(cPOSIX_MQ, "OPEN_MAX",
904 LONG2NUM(sysconf(_SC_MQ_OPEN_MAX)));
907 * The maximum priority that may be specified for POSIX_MQ#send
908 * On POSIX-compliant systems, this is at least 31, but some
909 * systems allow higher limits.
910 * The minimum priority is always zero.
912 rb_define_const(cPOSIX_MQ, "PRIO_MAX",
913 LONG2NUM(sysconf(_SC_MQ_PRIO_MAX)));
915 rb_define_singleton_method(cPOSIX_MQ, "unlink", s_unlink, 1);
917 rb_define_method(cPOSIX_MQ, "initialize", init, -1);
918 rb_define_method(cPOSIX_MQ, "send", _send, -1);
919 rb_define_method(cPOSIX_MQ, "<<", send0, 1);
920 rb_define_method(cPOSIX_MQ, "receive", receive, -1);
921 rb_define_method(cPOSIX_MQ, "shift", shift, -1);
922 rb_define_method(cPOSIX_MQ, "attr", getattr, 0);
923 rb_define_method(cPOSIX_MQ, "attr=", setattr, 1);
924 rb_define_method(cPOSIX_MQ, "close", _close, 0);
925 rb_define_method(cPOSIX_MQ, "closed?", closed, 0);
926 rb_define_method(cPOSIX_MQ, "unlink", _unlink, 0);
927 rb_define_method(cPOSIX_MQ, "name", name, 0);
928 rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1);
929 rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1);
930 rb_define_method(cPOSIX_MQ, "notify_thread=", setnotifythread, 1);
931 rb_define_method(cPOSIX_MQ, "nonblock?", getnonblock, 0);
932 #ifdef MQD_TO_FD
933 rb_define_method(cPOSIX_MQ, "to_io", to_io, 0);
934 #endif
936 id_new = rb_intern("new");
937 id_kill = rb_intern("kill");
938 id_fileno = rb_intern("fileno");
939 sym_r = ID2SYM(rb_intern("r"));
940 sym_w = ID2SYM(rb_intern("w"));
941 sym_rw = ID2SYM(rb_intern("rw"));