cleaner lookup "Signal" of constant
[ruby_posix_mq.git] / ext / posix_mq / posix_mq.c
blob4c9d2ea8666fe5c3f470f0d580a9c2c689bf10c2
1 #define _XOPEN_SOURCE 600
2 #ifdef HAVE_SYS_SELECT_H
3 # include <sys/select.h>
4 #endif
5 #ifdef HAVE_SIGNAL_H
6 # include <signal.h>
7 #endif
8 #ifdef HAVE_PTHREAD_H
9 # include <pthread.h>
10 #endif
11 #include <ruby.h>
13 #include <time.h>
14 #include <mqueue.h>
15 #include <fcntl.h>
16 #include <sys/stat.h>
17 #include <errno.h>
18 #include <assert.h>
19 #include <unistd.h>
/*
 * MQD_TO_FD maps a queue descriptor to a file descriptor on the
 * platforms where that is possible (Linux, FreeBSD).
 */
#if defined(__linux__)
#  define MQD_TO_FD(mqd) (int)(mqd)
#elif defined(HAVE___MQ_OSHANDLE) /* FreeBSD */
#  define MQD_TO_FD(mqd) __mq_oshandle(mqd)
#endif

/*
 * The cached IO object only exists when MQD_TO_FD is available;
 * everywhere else marking/setting it is a no-op.
 */
#ifdef MQD_TO_FD
#  define MQ_IO_MARK(mq) rb_gc_mark((mq)->io)
#  define MQ_IO_SET(mq,val) do { (mq)->io = (val); } while (0)
#else
#  define MQ_IO_MARK(mq) ((void)(0))
#  define MQ_IO_SET(mq,val) ((void)(0))
#endif
35 struct posix_mq {
36 mqd_t des;
37 struct mq_attr attr;
38 VALUE name;
39 VALUE thread;
40 #ifdef MQD_TO_FD
41 VALUE io;
42 #endif
45 static VALUE cPOSIX_MQ, cAttr;
46 static ID id_new, id_kill, id_fileno;
47 static ID sym_r, sym_w, sym_rw;
48 static const mqd_t MQD_INVALID = (mqd_t)-1;
/*
 * Accessor macros that older Ruby headers lack
 * (Ruby 1.8.6+, for compatibility with Ruby 1.9)
 */
#if !defined(RSTRING_PTR)
#  define RSTRING_PTR(s) (RSTRING(s)->ptr)
#endif
#if !defined(RSTRING_LEN)
#  define RSTRING_LEN(s) (RSTRING(s)->len)
#endif
#if !defined(RSTRUCT_PTR)
#  define RSTRUCT_PTR(s) (RSTRUCT(s)->ptr)
#endif
#if !defined(RSTRUCT_LEN)
#  define RSTRUCT_LEN(s) (RSTRUCT(s)->len)
#endif
64 #ifndef HAVE_RB_STR_SET_LEN
65 # ifdef RUBINIUS
66 # define rb_str_set_len(str,len) rb_str_resize(str,len)
67 # else /* 1.8.6 optimized version */
68 /* this is taken from Ruby 1.8.7, 1.8.6 may not have it */
69 static void rb_18_str_set_len(VALUE str, long len)
71 RSTRING(str)->len = len;
72 RSTRING(str)->ptr[len] = '\0';
74 # define rb_str_set_len(str,len) rb_18_str_set_len(str,len)
75 # endif /* ! RUBINIUS */
76 #endif /* !defined(HAVE_RB_STR_SET_LEN) */
78 #ifndef HAVE_RB_STRUCT_ALLOC_NOINIT
79 static VALUE rb_struct_alloc_noinit(VALUE class)
81 return rb_funcall(class, id_new, 0, 0);
83 #endif /* !defined(HAVE_RB_STRUCT_ALLOC_NOINIT) */
85 /* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
86 #ifndef HAVE_RB_THREAD_BLOCKING_REGION
87 # include <rubysig.h>
88 # define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
89 typedef void rb_unblock_function_t(void *);
90 typedef VALUE rb_blocking_function_t(void *);
91 static VALUE
92 rb_thread_blocking_region(
93 rb_blocking_function_t *func, void *data1,
94 rb_unblock_function_t *ubf, void *data2)
96 VALUE rv;
98 assert(RUBY_UBF_IO == ubf && "RUBY_UBF_IO required for emulation");
100 TRAP_BEG;
101 rv = func(data1);
102 TRAP_END;
104 return rv;
106 #endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
/* argument bundle handed to mq_open from inside the blocking region */
struct open_args {
	int argc;            /* 2, 3 or 4: selects which mq_open form xopen uses */
	const char *name;
	int oflags;
	mode_t mode;
	struct mq_attr attr;
};
/* argument bundle handed to mq_send/mq_receive from inside the blocking region */
struct rw_args {
	mqd_t des;
	char *msg_ptr;
	size_t msg_len;
	unsigned msg_prio;        /* input for send, output for receive */
	struct timespec *timeout; /* NULL selects the untimed call */
};
126 /* hope it's there..., TODO: a better version that works in rbx */
127 struct timeval rb_time_interval(VALUE);
129 static struct timespec *convert_timeout(struct timespec *dest, VALUE t)
131 struct timeval tv, now;
133 if (NIL_P(t))
134 return NULL;
136 tv = rb_time_interval(t); /* aggregate return :( */
137 gettimeofday(&now, NULL);
138 dest->tv_sec = now.tv_sec + tv.tv_sec;
139 dest->tv_nsec = (now.tv_usec + tv.tv_usec) * 1000;
141 if (dest->tv_nsec > 1000000000) {
142 dest->tv_nsec -= 1000000000;
143 dest->tv_sec++;
146 return dest;
149 /* (may) run without GVL */
150 static VALUE xopen(void *ptr)
152 struct open_args *x = ptr;
153 mqd_t rv;
155 switch (x->argc) {
156 case 2: rv = mq_open(x->name, x->oflags); break;
157 case 3: rv = mq_open(x->name, x->oflags, x->mode, NULL); break;
158 case 4: rv = mq_open(x->name, x->oflags, x->mode, &x->attr); break;
159 default: rv = MQD_INVALID;
162 return (VALUE)rv;
165 /* runs without GVL */
166 static VALUE xsend(void *ptr)
168 struct rw_args *x = ptr;
170 if (x->timeout)
171 return (VALUE)mq_timedsend(x->des, x->msg_ptr, x->msg_len,
172 x->msg_prio, x->timeout);
174 return (VALUE)mq_send(x->des, x->msg_ptr, x->msg_len, x->msg_prio);
177 /* runs without GVL */
178 static VALUE xrecv(void *ptr)
180 struct rw_args *x = ptr;
182 if (x->timeout)
183 return (VALUE)mq_timedreceive(x->des, x->msg_ptr, x->msg_len,
184 &x->msg_prio, x->timeout);
186 return (VALUE)mq_receive(x->des, x->msg_ptr, x->msg_len, &x->msg_prio);
189 /* called by GC */
190 static void mark(void *ptr)
192 struct posix_mq *mq = ptr;
194 rb_gc_mark(mq->name);
195 rb_gc_mark(mq->thread);
196 MQ_IO_MARK(mq);
199 /* called by GC */
200 static void _free(void *ptr)
202 struct posix_mq *mq = ptr;
204 if (mq->des != MQD_INVALID) {
205 /* we ignore errors when gc-ing */
206 int saved_errno = errno;
208 mq_close(mq->des);
209 errno = saved_errno;
211 xfree(ptr);
214 /* automatically called at creation (before initialize) */
215 static VALUE alloc(VALUE klass)
217 struct posix_mq *mq;
218 VALUE rv = Data_Make_Struct(klass, struct posix_mq, mark, _free, mq);
220 mq->des = MQD_INVALID;
221 mq->attr.mq_flags = 0;
222 mq->attr.mq_maxmsg = 0;
223 mq->attr.mq_msgsize = -1;
224 mq->attr.mq_curmsgs = 0;
225 mq->name = Qnil;
226 mq->thread = Qnil;
227 MQ_IO_SET(mq, Qnil);
229 return rv;
232 /* unwraps the posix_mq struct from self */
233 static struct posix_mq *get(VALUE self, int need_valid)
235 struct posix_mq *mq;
237 Data_Get_Struct(self, struct posix_mq, mq);
239 if (need_valid && mq->des == MQD_INVALID)
240 rb_raise(rb_eIOError, "closed queue descriptor");
242 return mq;
245 /* converts the POSIX_MQ::Attr astruct into a struct mq_attr attr */
246 static void attr_from_struct(struct mq_attr *attr, VALUE astruct, int all)
248 VALUE *ptr;
250 if (CLASS_OF(astruct) != cAttr)
251 rb_raise(rb_eArgError, "not a POSIX_MQ::Attr: %s",
252 RSTRING_PTR(rb_inspect(astruct)));
254 ptr = RSTRUCT_PTR(astruct);
256 attr->mq_flags = NUM2LONG(ptr[0]);
258 if (all || !NIL_P(ptr[1]))
259 attr->mq_maxmsg = NUM2LONG(ptr[1]);
260 if (all || !NIL_P(ptr[2]))
261 attr->mq_msgsize = NUM2LONG(ptr[2]);
262 if (!NIL_P(ptr[3]))
263 attr->mq_curmsgs = NUM2LONG(ptr[3]);
267 * call-seq:
268 * POSIX_MQ.new(name [, flags [, mode [, mq_attr]]) => mq
270 * Opens a POSIX message queue given by +name+. +name+ should start
271 * with a slash ("/") for portable applications.
273 * If a Symbol is given in place of integer +flags+, then:
275 * * +:r+ is equivalent to IO::RDONLY
276 * * +:w+ is equivalent to IO::CREAT|IO::WRONLY
277 * * +:rw+ is equivalent to IO::CREAT|IO::RDWR
279 * +mode+ is an integer and only used when IO::CREAT is used.
280 * +mq_attr+ is a POSIX_MQ::Attr and only used if IO::CREAT is used.
281 * If +mq_attr+ is not specified when creating a queue, then the
282 * system defaults will be used.
284 * See the manpage for mq_open(3) for more details on this function.
286 static VALUE init(int argc, VALUE *argv, VALUE self)
288 struct posix_mq *mq = get(self, 0);
289 struct open_args x;
290 VALUE name, oflags, mode, attr;
292 rb_scan_args(argc, argv, "13", &name, &oflags, &mode, &attr);
294 if (TYPE(name) != T_STRING)
295 rb_raise(rb_eArgError, "name must be a string");
297 switch (TYPE(oflags)) {
298 case T_NIL:
299 x.oflags = O_RDONLY;
300 break;
301 case T_SYMBOL:
302 if (oflags == sym_r)
303 x.oflags = O_RDONLY;
304 else if (oflags == sym_w)
305 x.oflags = O_CREAT|O_WRONLY;
306 else if (oflags == sym_rw)
307 x.oflags = O_CREAT|O_RDWR;
308 else
309 rb_raise(rb_eArgError,
310 "symbol must be :r, :w, or :rw: %s",
311 RSTRING_PTR(rb_inspect(oflags)));
312 break;
313 case T_BIGNUM:
314 case T_FIXNUM:
315 x.oflags = NUM2INT(oflags);
316 break;
317 default:
318 rb_raise(rb_eArgError, "flags must be an int, :r, :w, or :wr");
321 x.name = RSTRING_PTR(name);
322 x.argc = 2;
324 switch (TYPE(mode)) {
325 case T_FIXNUM:
326 x.argc = 3;
327 x.mode = NUM2UINT(mode);
328 break;
329 case T_NIL:
330 if (x.oflags & O_CREAT) {
331 x.argc = 3;
332 x.mode = 0666;
334 break;
335 default:
336 rb_raise(rb_eArgError, "mode not an integer");
339 switch (TYPE(attr)) {
340 case T_STRUCT:
341 x.argc = 4;
342 attr_from_struct(&x.attr, attr, 1);
344 /* principle of least surprise */
345 if (x.attr.mq_flags & O_NONBLOCK)
346 x.oflags |= O_NONBLOCK;
347 break;
348 case T_NIL:
349 break;
350 default:
351 rb_raise(rb_eArgError, "attr must be a POSIX_MQ::Attr: %s",
352 RSTRING_PTR(rb_inspect(attr)));
355 mq->des = (mqd_t)xopen(&x);
356 if (mq->des == MQD_INVALID)
357 rb_sys_fail("mq_open");
359 mq->name = rb_str_dup(name);
360 if (x.oflags & O_NONBLOCK)
361 mq->attr.mq_flags = O_NONBLOCK;
363 return self;
367 * call-seq:
368 * POSIX_MQ.unlink(name) => 1
370 * Unlinks the message queue given by +name+. The queue will be destroyed
371 * when the last process with the queue open closes its queue descriptors.
373 static VALUE s_unlink(VALUE self, VALUE name)
375 mqd_t rv;
377 if (TYPE(name) != T_STRING)
378 rb_raise(rb_eArgError, "argument must be a string");
380 rv = mq_unlink(RSTRING_PTR(name));
381 if (rv == MQD_INVALID)
382 rb_sys_fail("mq_unlink");
384 return INT2NUM(1);
388 * call-seq:
389 * mq.unlink => mq
391 * Unlinks the message queue to prevent other processes from accessing it.
392 * All existing queue descriptors to this queue including those opened by
393 * other processes are unaffected. The queue will only be destroyed
394 * when the last process with open descriptors to this queue closes
395 * the descriptors.
397 static VALUE _unlink(VALUE self)
399 struct posix_mq *mq = get(self, 0);
400 mqd_t rv;
402 assert(TYPE(mq->name) == T_STRING && "mq->name is not a string");
404 rv = mq_unlink(RSTRING_PTR(mq->name));
405 if (rv == MQD_INVALID)
406 rb_sys_fail("mq_unlink");
408 return self;
411 static void setup_send_buffer(struct rw_args *x, VALUE buffer)
413 buffer = rb_obj_as_string(buffer);
414 x->msg_ptr = RSTRING_PTR(buffer);
415 x->msg_len = (size_t)RSTRING_LEN(buffer);
419 * call-seq:
420 * mq.send(string [,priority[, timeout]]) => nil
422 * Inserts the given +string+ into the message queue with an optional,
423 * unsigned integer +priority+. If the optional +timeout+ is specified,
424 * then Errno::ETIMEDOUT will be raised if the operation cannot complete
425 * before +timeout+ seconds has elapsed. Without +timeout+, this method
426 * may block until the queue is writable.
428 static VALUE _send(int argc, VALUE *argv, VALUE self)
430 struct posix_mq *mq = get(self, 1);
431 struct rw_args x;
432 VALUE buffer, prio, timeout;
433 mqd_t rv;
434 struct timespec expire;
436 rb_scan_args(argc, argv, "12", &buffer, &prio, &timeout);
438 setup_send_buffer(&x, buffer);
439 x.des = mq->des;
440 x.timeout = convert_timeout(&expire, timeout);
441 x.msg_prio = NIL_P(prio) ? 0 : NUM2UINT(prio);
443 if (mq->attr.mq_flags & O_NONBLOCK)
444 rv = (mqd_t)xsend(&x);
445 else
446 rv = (mqd_t)rb_thread_blocking_region(xsend, &x,
447 RUBY_UBF_IO, 0);
448 if (rv == MQD_INVALID)
449 rb_sys_fail("mq_send");
451 return Qnil;
455 * call-seq:
456 * mq << string => mq
458 * Inserts the given +string+ into the message queue with a
459 * default priority of 0 and no timeout.
461 static VALUE send0(VALUE self, VALUE buffer)
463 struct posix_mq *mq = get(self, 1);
464 struct rw_args x;
465 mqd_t rv;
467 setup_send_buffer(&x, buffer);
468 x.des = mq->des;
469 x.timeout = NULL;
470 x.msg_prio = 0;
472 if (mq->attr.mq_flags & O_NONBLOCK)
473 rv = (mqd_t)xsend(&x);
474 else
475 rv = (mqd_t)rb_thread_blocking_region(xsend, &x,
476 RUBY_UBF_IO, 0);
478 if (rv == MQD_INVALID)
479 rb_sys_fail("mq_send");
481 return self;
#ifdef MQD_TO_FD
/*
 * call-seq:
 *	mq.to_io	=> IO
 *
 * Returns an IO.select-able +IO+ object.  This method is only available
 * under Linux and FreeBSD and is not intended to be portable.
 */
static VALUE to_io(VALUE self)
{
	struct posix_mq *mq = get(self, 1);
	int fd = MQD_TO_FD(mq->des);

	/* reuse the IO object created by an earlier call */
	if (!NIL_P(mq->io))
		return mq->io;

	mq->io = rb_funcall(rb_cIO, id_new, 1, INT2NUM(fd));

	return mq->io;
}
#endif
504 static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self);
507 * call-seq:
508 * mq.receive([buffer, [timeout]]) => [ message, priority ]
510 * Takes the highest priority message off the queue and returns
511 * an array containing the message as a String and the Integer
512 * priority of the message.
514 * If the optional +buffer+ is present, then it must be a String
515 * which will receive the data.
517 * If the optional +timeout+ is present, then it may be a Float
518 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
519 * will be raised if +timeout+ has elapsed and there are no messages
520 * in the queue.
522 static VALUE receive(int argc, VALUE *argv, VALUE self)
524 return _receive(1, argc, argv, self);
528 * call-seq:
529 * mq.shift([buffer, [timeout]]) => message
531 * Takes the highest priority message off the queue and returns
532 * the message as a String.
534 * If the optional +buffer+ is present, then it must be a String
535 * which will receive the data.
537 * If the optional +timeout+ is present, then it may be a Float
538 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
539 * will be raised if +timeout+ has elapsed and there are no messages
540 * in the queue.
542 static VALUE shift(int argc, VALUE *argv, VALUE self)
544 return _receive(0, argc, argv, self);
547 static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self)
549 struct posix_mq *mq = get(self, 1);
550 struct rw_args x;
551 VALUE buffer, timeout;
552 ssize_t r;
553 struct timespec expire;
555 if (mq->attr.mq_msgsize < 0) {
556 if (mq_getattr(mq->des, &mq->attr) < 0)
557 rb_sys_fail("mq_getattr");
560 rb_scan_args(argc, argv, "02", &buffer, &timeout);
561 x.timeout = convert_timeout(&expire, timeout);
563 if (NIL_P(buffer)) {
564 buffer = rb_str_new(0, mq->attr.mq_msgsize);
565 } else {
566 StringValue(buffer);
567 rb_str_modify(buffer);
568 rb_str_resize(buffer, mq->attr.mq_msgsize);
570 OBJ_TAINT(buffer);
571 x.msg_ptr = RSTRING_PTR(buffer);
572 x.msg_len = (size_t)mq->attr.mq_msgsize;
573 x.des = mq->des;
575 if (mq->attr.mq_flags & O_NONBLOCK) {
576 r = (ssize_t)xrecv(&x);
577 } else {
578 r = (ssize_t)rb_thread_blocking_region(xrecv, &x,
579 RUBY_UBF_IO, 0);
581 if (r < 0)
582 rb_sys_fail("mq_receive");
584 rb_str_set_len(buffer, r);
586 if (wantarray)
587 return rb_ary_new3(2, buffer, UINT2NUM(x.msg_prio));
588 return buffer;
592 * call-seq:
593 * mq.attr => mq_attr
595 * Returns a POSIX_MQ::Attr struct containing the attributes
596 * of the message queue. See the mq_getattr(3) manpage for
597 * more details.
599 static VALUE getattr(VALUE self)
601 struct posix_mq *mq = get(self, 1);
602 VALUE astruct;
603 VALUE *ptr;
605 if (mq_getattr(mq->des, &mq->attr) < 0)
606 rb_sys_fail("mq_getattr");
608 astruct = rb_struct_alloc_noinit(cAttr);
609 ptr = RSTRUCT_PTR(astruct);
610 ptr[0] = LONG2NUM(mq->attr.mq_flags);
611 ptr[1] = LONG2NUM(mq->attr.mq_maxmsg);
612 ptr[2] = LONG2NUM(mq->attr.mq_msgsize);
613 ptr[3] = LONG2NUM(mq->attr.mq_curmsgs);
615 return astruct;
619 * call-seq:
620 * mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr
622 * Only the IO::NONBLOCK flag may be set or unset (zero) in this manner.
623 * See the mq_setattr(3) manpage for more details.
625 * Consider using the POSIX_MQ#nonblock= method as it is easier and
626 * more natural to use.
628 static VALUE setattr(VALUE self, VALUE astruct)
630 struct posix_mq *mq = get(self, 1);
631 struct mq_attr newattr;
633 attr_from_struct(&newattr, astruct, 0);
635 if (mq_setattr(mq->des, &newattr, NULL) < 0)
636 rb_sys_fail("mq_setattr");
638 return astruct;
642 * call-seq:
643 * mq.close => nil
645 * Closes the underlying message queue descriptor.
646 * If this descriptor had a registered notification request, the request
647 * will be removed so another descriptor or process may register a
648 * notification request. Message queue descriptors are automatically
649 * closed by garbage collection.
651 static VALUE _close(VALUE self)
653 struct posix_mq *mq = get(self, 1);
655 if (mq_close(mq->des) < 0)
656 rb_sys_fail("mq_close");
658 mq->des = MQD_INVALID;
659 MQ_IO_SET(mq, Qnil);
661 return Qnil;
665 * call-seq:
666 * mq.closed? => true or false
668 * Returns +true+ if the message queue descriptor is closed and therefore
669 * unusable, otherwise +false+
671 static VALUE closed(VALUE self)
673 struct posix_mq *mq = get(self, 0);
675 return mq->des == MQD_INVALID ? Qtrue : Qfalse;
679 * call-seq:
680 * mq.name => string
682 * Returns the string name of message queue associated with +mq+
684 static VALUE name(VALUE self)
686 struct posix_mq *mq = get(self, 0);
688 return rb_str_dup(mq->name);
691 static int lookup_sig(VALUE sig)
693 static VALUE list;
694 const char *ptr;
695 long len;
697 sig = rb_obj_as_string(sig);
698 len = RSTRING_LEN(sig);
699 ptr = RSTRING_PTR(sig);
701 if (len > 3 && !memcmp("SIG", ptr, 3))
702 sig = rb_str_new(ptr + 3, len - 3);
704 if (!list) {
705 VALUE mSignal = rb_const_get(rb_cObject, rb_intern("Signal"));
707 list = rb_funcall(mSignal, rb_intern("list"), 0, 0);
708 rb_global_variable(&list);
711 sig = rb_hash_aref(list, sig);
712 if (NIL_P(sig))
713 rb_raise(rb_eArgError, "invalid signal: %s\n",
714 RSTRING_PTR(rb_inspect(sig)));
716 return NUM2INT(sig);
719 /* we spawn a thread just to write ONE byte into an fd (usually a pipe) */
720 static void thread_notify_fd(union sigval sv)
722 int fd = sv.sival_int;
724 while ((write(fd, "", 1) < 0) && (errno == EINTR || errno == EAGAIN));
728 * TODO: Under Linux, we could just use netlink directly
729 * the same way glibc does...
731 static void setup_notify_io(struct sigevent *not, VALUE io)
733 int fd = NUM2INT(rb_funcall(io, id_fileno, 0, 0));
734 pthread_attr_t attr;
735 int e;
737 if ((e = pthread_attr_init(&attr)))
738 goto err;
739 if ((e = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)))
740 goto err;
741 #ifdef PTHREAD_STACK_MIN
742 (void)pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
743 #endif
744 not->sigev_notify = SIGEV_THREAD;
745 not->sigev_notify_function = thread_notify_fd;
746 not->sigev_notify_attributes = &attr;
747 not->sigev_value.sival_int = fd;
748 return;
749 err:
750 rb_raise(rb_eRuntimeError, "pthread failure: %s\n", strerror(e));
754 * call-seq:
755 * mq.notify = signal => signal
757 * Registers the notification request to deliver a given +signal+
758 * to the current process when message is received.
759 * If +signal+ is +nil+, it will unregister and disable the notification
760 * request to allow other processes to register a request.
761 * If +signal+ is +false+, it will register a no-op notification request
762 * which will prevent other processes from registering a notification.
763 * If +signal+ is an +IO+ object, it will spawn a thread upon the
764 * arrival of the next message and write one "\\0" byte to the file
765 * descriptor belonging to that IO object.
766 * Only one process may have a notification request for a queue
767 * at a time, Errno::EBUSY will be raised if there is already
768 * a notification request registration for the queue.
770 * Notifications are only fired once and processes must reregister
771 * for subsequent notifications.
773 * For readers of the mq_notify(3) manpage, passing +false+
774 * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent
775 * of passing a NULL notification pointer to mq_notify(3).
777 static VALUE setnotify(VALUE self, VALUE arg)
779 struct posix_mq *mq = get(self, 1);
780 struct sigevent not;
781 struct sigevent * notification = &not;
782 VALUE rv = arg;
784 if (!NIL_P(mq->thread)) {
785 rb_funcall(mq->thread, id_kill, 0, 0);
786 mq->thread = Qnil;
788 not.sigev_notify = SIGEV_SIGNAL;
790 switch (TYPE(arg)) {
791 case T_FALSE:
792 not.sigev_notify = SIGEV_NONE;
793 break;
794 case T_NIL:
795 notification = NULL;
796 break;
797 case T_FIXNUM:
798 not.sigev_signo = NUM2INT(arg);
799 break;
800 case T_SYMBOL:
801 case T_STRING:
802 not.sigev_signo = lookup_sig(arg);
803 rv = INT2NUM(not.sigev_signo);
804 break;
805 case T_FILE:
806 setup_notify_io(&not, arg);
807 break;
808 default:
809 /* maybe support Proc+thread via sigev_notify_function.. */
810 rb_raise(rb_eArgError, "must be a signal or nil");
813 if (mq_notify(mq->des, notification) < 0)
814 rb_sys_fail("mq_notify");
816 return rv;
820 * call-seq:
821 * mq.nonblock? => true or false
823 * Returns the current non-blocking state of the message queue descriptor.
825 static VALUE getnonblock(VALUE self)
827 struct posix_mq *mq = get(self, 1);
829 return mq->attr.mq_flags & O_NONBLOCK ? Qtrue : Qfalse;
833 * call-seq:
834 * mq.nonblock = boolean => boolean
836 * Enables or disables non-blocking operation for the message queue
837 * descriptor. Errno::EAGAIN will be raised in situations where
838 * the queue would block. This is not compatible with +timeout+
839 * arguments to POSIX_MQ#send and POSIX_MQ#receive.
841 static VALUE setnonblock(VALUE self, VALUE nb)
843 struct mq_attr newattr;
844 struct posix_mq *mq = get(self, 1);
846 if (nb == Qtrue)
847 newattr.mq_flags = O_NONBLOCK;
848 else if (nb == Qfalse)
849 newattr.mq_flags = 0;
850 else
851 rb_raise(rb_eArgError, "must be true or false");
853 if (mq_setattr(mq->des, &newattr, &mq->attr) < 0)
854 rb_sys_fail("mq_setattr");
856 mq->attr.mq_flags = newattr.mq_flags;
858 return nb;
861 /* :nodoc: */
862 static VALUE setnotifythread(VALUE self, VALUE thread)
864 struct posix_mq *mq = get(self, 1);
866 mq->thread = thread;
867 return thread;
870 void Init_posix_mq_ext(void)
872 cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject);
873 rb_define_alloc_func(cPOSIX_MQ, alloc);
874 cAttr = rb_const_get(cPOSIX_MQ, rb_intern("Attr"));
877 * The maximum number of open message descriptors supported
878 * by the system. This may be -1, in which case it is dynamically
879 * set at runtime. Consult your operating system documentation
880 * for system-specific information about this.
882 rb_define_const(cPOSIX_MQ, "OPEN_MAX",
883 LONG2NUM(sysconf(_SC_MQ_OPEN_MAX)));
886 * The maximum priority that may be specified for POSIX_MQ#send
887 * On POSIX-compliant systems, this is at least 31, but some
888 * systems allow higher limits.
889 * The minimum priority is always zero.
891 rb_define_const(cPOSIX_MQ, "PRIO_MAX",
892 LONG2NUM(sysconf(_SC_MQ_PRIO_MAX)));
894 rb_define_singleton_method(cPOSIX_MQ, "unlink", s_unlink, 1);
896 rb_define_method(cPOSIX_MQ, "initialize", init, -1);
897 rb_define_method(cPOSIX_MQ, "send", _send, -1);
898 rb_define_method(cPOSIX_MQ, "<<", send0, 1);
899 rb_define_method(cPOSIX_MQ, "receive", receive, -1);
900 rb_define_method(cPOSIX_MQ, "shift", shift, -1);
901 rb_define_method(cPOSIX_MQ, "attr", getattr, 0);
902 rb_define_method(cPOSIX_MQ, "attr=", setattr, 1);
903 rb_define_method(cPOSIX_MQ, "close", _close, 0);
904 rb_define_method(cPOSIX_MQ, "closed?", closed, 0);
905 rb_define_method(cPOSIX_MQ, "unlink", _unlink, 0);
906 rb_define_method(cPOSIX_MQ, "name", name, 0);
907 rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1);
908 rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1);
909 rb_define_method(cPOSIX_MQ, "notify_thread=", setnotifythread, 1);
910 rb_define_method(cPOSIX_MQ, "nonblock?", getnonblock, 0);
911 #ifdef MQD_TO_FD
912 rb_define_method(cPOSIX_MQ, "to_io", to_io, 0);
913 #endif
915 id_new = rb_intern("new");
916 id_kill = rb_intern("kill");
917 id_fileno = rb_intern("fileno");
918 sym_r = ID2SYM(rb_intern("r"));
919 sym_w = ID2SYM(rb_intern("w"));
920 sym_rw = ID2SYM(rb_intern("rw"));