open/notify: invoke GC if needed
[ruby_posix_mq.git] / ext / posix_mq / posix_mq.c
blob04c1ea0524e2f5f411c3041766c2e2dc0911e82b
1 #define _XOPEN_SOURCE 600
2 #ifdef HAVE_SYS_SELECT_H
3 # include <sys/select.h>
4 #endif
5 #ifdef HAVE_SIGNAL_H
6 # include <signal.h>
7 #endif
8 #ifdef HAVE_PTHREAD_H
9 # include <pthread.h>
10 #endif
11 #include <ruby.h>
13 #ifndef RB_GC_GUARD
14 # define RB_GC_GUARD(v) (*(volatile VALUE *)&(v))
15 #endif
17 #include <time.h>
18 #include <mqueue.h>
19 #include <fcntl.h>
20 #include <sys/stat.h>
21 #include <errno.h>
22 #include <assert.h>
23 #include <unistd.h>
/*
 * MQD_TO_FD(mqd) yields an int file descriptor for a message queue
 * descriptor on the platforms that provide a way to obtain one.  It is
 * left undefined elsewhere, and the code that depends on it is compiled
 * out via #ifdef MQD_TO_FD.
 *
 * The expansions are fully parenthesized so the macro can be used safely
 * inside larger expressions (e.g. as a sizeof or unary operand).
 */
#if defined(__linux__)
#  define MQD_TO_FD(mqd) ((int)(mqd))
#elif defined(HAVE___MQ_OSHANDLE) /* FreeBSD */
#  define MQD_TO_FD(mqd) (__mq_oshandle(mqd))
#endif

/*
 * MQ_IO_MARK/MQ_IO_SET operate on the "io" member of struct posix_mq,
 * which only exists when MQD_TO_FD is available; otherwise they are no-ops.
 */
#ifdef MQD_TO_FD
#  define MQ_IO_MARK(mq) rb_gc_mark((mq)->io)
#  define MQ_IO_SET(mq,val) do { (mq)->io = (val); } while (0)
#else
#  define MQ_IO_MARK(mq) ((void)(0))
#  define MQ_IO_SET(mq,val) ((void)(0))
#endif
39 struct posix_mq {
40 mqd_t des;
41 struct mq_attr attr;
42 VALUE name;
43 VALUE thread;
44 #ifdef MQD_TO_FD
45 VALUE io;
46 #endif
49 static VALUE cPOSIX_MQ, cAttr;
50 static ID id_new, id_kill, id_fileno;
51 static ID sym_r, sym_w, sym_rw;
52 static const mqd_t MQD_INVALID = (mqd_t)-1;
/*
 * Accessor macros that exist in Ruby 1.8.6+ and 1.9; provide fallbacks
 * for any that the installed headers do not define.
 */
#if !defined(RSTRING_PTR)
#  define RSTRING_PTR(s) (RSTRING(s)->ptr)
#endif

#if !defined(RSTRING_LEN)
#  define RSTRING_LEN(s) (RSTRING(s)->len)
#endif

#if !defined(RSTRUCT_PTR)
#  define RSTRUCT_PTR(s) (RSTRUCT(s)->ptr)
#endif

#if !defined(RSTRUCT_LEN)
#  define RSTRUCT_LEN(s) (RSTRUCT(s)->len)
#endif
68 #ifndef HAVE_RB_STR_SET_LEN
69 # ifdef RUBINIUS
70 # define rb_str_set_len(str,len) rb_str_resize(str,len)
71 # else /* 1.8.6 optimized version */
72 /* this is taken from Ruby 1.8.7, 1.8.6 may not have it */
73 static void rb_18_str_set_len(VALUE str, long len)
75 RSTRING(str)->len = len;
76 RSTRING(str)->ptr[len] = '\0';
78 # define rb_str_set_len(str,len) rb_18_str_set_len(str,len)
79 # endif /* ! RUBINIUS */
80 #endif /* !defined(HAVE_RB_STR_SET_LEN) */
82 #ifndef HAVE_RB_STRUCT_ALLOC_NOINIT
83 static VALUE rb_struct_alloc_noinit(VALUE class)
85 return rb_funcall(class, id_new, 0, 0);
87 #endif /* !defined(HAVE_RB_STRUCT_ALLOC_NOINIT) */
89 /* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
90 #ifndef HAVE_RB_THREAD_BLOCKING_REGION
91 # include <rubysig.h>
92 # define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
93 typedef void rb_unblock_function_t(void *);
94 typedef VALUE rb_blocking_function_t(void *);
95 static VALUE
96 rb_thread_blocking_region(
97 rb_blocking_function_t *func, void *data1,
98 rb_unblock_function_t *ubf, void *data2)
100 VALUE rv;
102 assert(RUBY_UBF_IO == ubf && "RUBY_UBF_IO required for emulation");
104 TRAP_BEG;
105 rv = func(data1);
106 TRAP_END;
108 return rv;
110 #endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
/* used to pass arguments to mq_open inside blocking region */
struct open_args {
	int argc;		/* how many mq_open arguments to pass: 2, 3 or 4 */
	const char *name;	/* queue name */
	int oflags;		/* open flags */
	mode_t mode;		/* permission bits, passed when argc >= 3 */
	struct mq_attr attr;	/* queue attributes, passed when argc == 4 */
};
/* used to pass arguments to mq_send/mq_receive inside blocking region */
struct rw_args {
	mqd_t des;			/* queue descriptor to operate on */
	char *msg_ptr;			/* message buffer */
	size_t msg_len;			/* size of the message buffer in bytes */
	unsigned msg_prio;		/* priority sent, or filled in on receive */
	struct timespec *timeout;	/* NULL selects the untimed call */
};
130 /* hope it's there..., TODO: a better version that works in rbx */
131 struct timeval rb_time_interval(VALUE);
133 static struct timespec *convert_timeout(struct timespec *dest, VALUE t)
135 struct timeval tv, now;
137 if (NIL_P(t))
138 return NULL;
140 tv = rb_time_interval(t); /* aggregate return :( */
141 gettimeofday(&now, NULL);
142 dest->tv_sec = now.tv_sec + tv.tv_sec;
143 dest->tv_nsec = (now.tv_usec + tv.tv_usec) * 1000;
145 if (dest->tv_nsec > 1000000000) {
146 dest->tv_nsec -= 1000000000;
147 dest->tv_sec++;
150 return dest;
153 /* (may) run without GVL */
154 static VALUE xopen(void *ptr)
156 struct open_args *x = ptr;
157 mqd_t rv;
159 switch (x->argc) {
160 case 2: rv = mq_open(x->name, x->oflags); break;
161 case 3: rv = mq_open(x->name, x->oflags, x->mode, NULL); break;
162 case 4: rv = mq_open(x->name, x->oflags, x->mode, &x->attr); break;
163 default: rv = MQD_INVALID;
166 return (VALUE)rv;
169 /* runs without GVL */
170 static VALUE xsend(void *ptr)
172 struct rw_args *x = ptr;
174 if (x->timeout)
175 return (VALUE)mq_timedsend(x->des, x->msg_ptr, x->msg_len,
176 x->msg_prio, x->timeout);
178 return (VALUE)mq_send(x->des, x->msg_ptr, x->msg_len, x->msg_prio);
181 /* runs without GVL */
182 static VALUE xrecv(void *ptr)
184 struct rw_args *x = ptr;
186 if (x->timeout)
187 return (VALUE)mq_timedreceive(x->des, x->msg_ptr, x->msg_len,
188 &x->msg_prio, x->timeout);
190 return (VALUE)mq_receive(x->des, x->msg_ptr, x->msg_len, &x->msg_prio);
193 /* called by GC */
194 static void mark(void *ptr)
196 struct posix_mq *mq = ptr;
198 rb_gc_mark(mq->name);
199 rb_gc_mark(mq->thread);
200 MQ_IO_MARK(mq);
203 /* called by GC */
204 static void _free(void *ptr)
206 struct posix_mq *mq = ptr;
208 if (mq->des != MQD_INVALID) {
209 /* we ignore errors when gc-ing */
210 int saved_errno = errno;
212 mq_close(mq->des);
213 errno = saved_errno;
215 xfree(ptr);
218 /* automatically called at creation (before initialize) */
219 static VALUE alloc(VALUE klass)
221 struct posix_mq *mq;
222 VALUE rv = Data_Make_Struct(klass, struct posix_mq, mark, _free, mq);
224 mq->des = MQD_INVALID;
225 mq->attr.mq_flags = 0;
226 mq->attr.mq_maxmsg = 0;
227 mq->attr.mq_msgsize = -1;
228 mq->attr.mq_curmsgs = 0;
229 mq->name = Qnil;
230 mq->thread = Qnil;
231 MQ_IO_SET(mq, Qnil);
233 return rv;
236 /* unwraps the posix_mq struct from self */
237 static struct posix_mq *get(VALUE self, int need_valid)
239 struct posix_mq *mq;
241 Data_Get_Struct(self, struct posix_mq, mq);
243 if (need_valid && mq->des == MQD_INVALID)
244 rb_raise(rb_eIOError, "closed queue descriptor");
246 return mq;
249 /* converts the POSIX_MQ::Attr astruct into a struct mq_attr attr */
250 static void attr_from_struct(struct mq_attr *attr, VALUE astruct, int all)
252 VALUE *ptr;
254 if (CLASS_OF(astruct) != cAttr) {
255 RB_GC_GUARD(astruct) = rb_inspect(astruct);
256 rb_raise(rb_eArgError, "not a POSIX_MQ::Attr: %s",
257 RSTRING_PTR(astruct));
260 ptr = RSTRUCT_PTR(astruct);
262 attr->mq_flags = NUM2LONG(ptr[0]);
264 if (all || !NIL_P(ptr[1]))
265 attr->mq_maxmsg = NUM2LONG(ptr[1]);
266 if (all || !NIL_P(ptr[2]))
267 attr->mq_msgsize = NUM2LONG(ptr[2]);
268 if (!NIL_P(ptr[3]))
269 attr->mq_curmsgs = NUM2LONG(ptr[3]);
273 * call-seq:
274 * POSIX_MQ.new(name [, flags [, mode [, mq_attr]]) => mq
276 * Opens a POSIX message queue given by +name+. +name+ should start
277 * with a slash ("/") for portable applications.
279 * If a Symbol is given in place of integer +flags+, then:
281 * * +:r+ is equivalent to IO::RDONLY
282 * * +:w+ is equivalent to IO::CREAT|IO::WRONLY
283 * * +:rw+ is equivalent to IO::CREAT|IO::RDWR
285 * +mode+ is an integer and only used when IO::CREAT is used.
286 * +mq_attr+ is a POSIX_MQ::Attr and only used if IO::CREAT is used.
287 * If +mq_attr+ is not specified when creating a queue, then the
288 * system defaults will be used.
290 * See the manpage for mq_open(3) for more details on this function.
292 static VALUE init(int argc, VALUE *argv, VALUE self)
294 struct posix_mq *mq = get(self, 0);
295 struct open_args x;
296 VALUE name, oflags, mode, attr;
298 rb_scan_args(argc, argv, "13", &name, &oflags, &mode, &attr);
300 if (TYPE(name) != T_STRING)
301 rb_raise(rb_eArgError, "name must be a string");
303 switch (TYPE(oflags)) {
304 case T_NIL:
305 x.oflags = O_RDONLY;
306 break;
307 case T_SYMBOL:
308 if (oflags == sym_r)
309 x.oflags = O_RDONLY;
310 else if (oflags == sym_w)
311 x.oflags = O_CREAT|O_WRONLY;
312 else if (oflags == sym_rw)
313 x.oflags = O_CREAT|O_RDWR;
314 else {
315 RB_GC_GUARD(oflags) = oflags;
316 rb_raise(rb_eArgError,
317 "symbol must be :r, :w, or :rw: %s",
318 RSTRING_PTR(oflags));
320 break;
321 case T_BIGNUM:
322 case T_FIXNUM:
323 x.oflags = NUM2INT(oflags);
324 break;
325 default:
326 rb_raise(rb_eArgError, "flags must be an int, :r, :w, or :wr");
329 x.name = RSTRING_PTR(name);
330 x.argc = 2;
332 switch (TYPE(mode)) {
333 case T_FIXNUM:
334 x.argc = 3;
335 x.mode = NUM2UINT(mode);
336 break;
337 case T_NIL:
338 if (x.oflags & O_CREAT) {
339 x.argc = 3;
340 x.mode = 0666;
342 break;
343 default:
344 rb_raise(rb_eArgError, "mode not an integer");
347 switch (TYPE(attr)) {
348 case T_STRUCT:
349 x.argc = 4;
350 attr_from_struct(&x.attr, attr, 1);
352 /* principle of least surprise */
353 if (x.attr.mq_flags & O_NONBLOCK)
354 x.oflags |= O_NONBLOCK;
355 break;
356 case T_NIL:
357 break;
358 default:
359 RB_GC_GUARD(attr) = rb_inspect(attr);
360 rb_raise(rb_eArgError, "attr must be a POSIX_MQ::Attr: %s",
361 RSTRING_PTR(attr));
364 mq->des = (mqd_t)xopen(&x);
365 if (mq->des == MQD_INVALID) {
366 if (errno == ENOMEM || errno == EMFILE || errno == ENFILE) {
367 rb_gc();
368 mq->des = (mqd_t)xopen(&x);
370 if (mq->des == MQD_INVALID)
371 rb_sys_fail("mq_open");
374 mq->name = rb_str_dup(name);
375 if (x.oflags & O_NONBLOCK)
376 mq->attr.mq_flags = O_NONBLOCK;
378 return self;
382 * call-seq:
383 * POSIX_MQ.unlink(name) => 1
385 * Unlinks the message queue given by +name+. The queue will be destroyed
386 * when the last process with the queue open closes its queue descriptors.
388 static VALUE s_unlink(VALUE self, VALUE name)
390 mqd_t rv;
392 if (TYPE(name) != T_STRING)
393 rb_raise(rb_eArgError, "argument must be a string");
395 rv = mq_unlink(RSTRING_PTR(name));
396 if (rv == MQD_INVALID)
397 rb_sys_fail("mq_unlink");
399 return INT2NUM(1);
403 * call-seq:
404 * mq.unlink => mq
406 * Unlinks the message queue to prevent other processes from accessing it.
407 * All existing queue descriptors to this queue including those opened by
408 * other processes are unaffected. The queue will only be destroyed
409 * when the last process with open descriptors to this queue closes
410 * the descriptors.
412 static VALUE _unlink(VALUE self)
414 struct posix_mq *mq = get(self, 0);
415 mqd_t rv;
417 assert(TYPE(mq->name) == T_STRING && "mq->name is not a string");
419 rv = mq_unlink(RSTRING_PTR(mq->name));
420 if (rv == MQD_INVALID)
421 rb_sys_fail("mq_unlink");
423 return self;
426 static void setup_send_buffer(struct rw_args *x, VALUE buffer)
428 buffer = rb_obj_as_string(buffer);
429 x->msg_ptr = RSTRING_PTR(buffer);
430 x->msg_len = (size_t)RSTRING_LEN(buffer);
434 * call-seq:
435 * mq.send(string [,priority[, timeout]]) => nil
437 * Inserts the given +string+ into the message queue with an optional,
438 * unsigned integer +priority+. If the optional +timeout+ is specified,
439 * then Errno::ETIMEDOUT will be raised if the operation cannot complete
440 * before +timeout+ seconds has elapsed. Without +timeout+, this method
441 * may block until the queue is writable.
443 static VALUE _send(int argc, VALUE *argv, VALUE self)
445 struct posix_mq *mq = get(self, 1);
446 struct rw_args x;
447 VALUE buffer, prio, timeout;
448 mqd_t rv;
449 struct timespec expire;
451 rb_scan_args(argc, argv, "12", &buffer, &prio, &timeout);
453 setup_send_buffer(&x, buffer);
454 x.des = mq->des;
455 x.timeout = convert_timeout(&expire, timeout);
456 x.msg_prio = NIL_P(prio) ? 0 : NUM2UINT(prio);
458 if (mq->attr.mq_flags & O_NONBLOCK)
459 rv = (mqd_t)xsend(&x);
460 else
461 rv = (mqd_t)rb_thread_blocking_region(xsend, &x,
462 RUBY_UBF_IO, 0);
463 if (rv == MQD_INVALID)
464 rb_sys_fail("mq_send");
466 return Qnil;
470 * call-seq:
471 * mq << string => mq
473 * Inserts the given +string+ into the message queue with a
474 * default priority of 0 and no timeout.
476 static VALUE send0(VALUE self, VALUE buffer)
478 struct posix_mq *mq = get(self, 1);
479 struct rw_args x;
480 mqd_t rv;
482 setup_send_buffer(&x, buffer);
483 x.des = mq->des;
484 x.timeout = NULL;
485 x.msg_prio = 0;
487 if (mq->attr.mq_flags & O_NONBLOCK)
488 rv = (mqd_t)xsend(&x);
489 else
490 rv = (mqd_t)rb_thread_blocking_region(xsend, &x,
491 RUBY_UBF_IO, 0);
493 if (rv == MQD_INVALID)
494 rb_sys_fail("mq_send");
496 return self;
#ifdef MQD_TO_FD
/*
 * call-seq:
 *	mq.to_io	=> IO
 *
 * Returns an IO.select-able +IO+ object.  This method is only available
 * under Linux and FreeBSD and is not intended to be portable.
 */
static VALUE to_io(VALUE self)
{
	struct posix_mq *queue = get(self, 1);
	int fd = MQD_TO_FD(queue->des);

	/* the IO object is created on first use and then reused */
	if (NIL_P(queue->io)) {
		VALUE fileno = INT2NUM(fd);

		queue->io = rb_funcall(rb_cIO, id_new, 1, fileno);
	}

	return queue->io;
}
#endif
519 static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self);
522 * call-seq:
523 * mq.receive([buffer, [timeout]]) => [ message, priority ]
525 * Takes the highest priority message off the queue and returns
526 * an array containing the message as a String and the Integer
527 * priority of the message.
529 * If the optional +buffer+ is present, then it must be a String
530 * which will receive the data.
532 * If the optional +timeout+ is present, then it may be a Float
533 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
534 * will be raised if +timeout+ has elapsed and there are no messages
535 * in the queue.
537 static VALUE receive(int argc, VALUE *argv, VALUE self)
539 return _receive(1, argc, argv, self);
543 * call-seq:
544 * mq.shift([buffer, [timeout]]) => message
546 * Takes the highest priority message off the queue and returns
547 * the message as a String.
549 * If the optional +buffer+ is present, then it must be a String
550 * which will receive the data.
552 * If the optional +timeout+ is present, then it may be a Float
553 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
554 * will be raised if +timeout+ has elapsed and there are no messages
555 * in the queue.
557 static VALUE shift(int argc, VALUE *argv, VALUE self)
559 return _receive(0, argc, argv, self);
562 static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self)
564 struct posix_mq *mq = get(self, 1);
565 struct rw_args x;
566 VALUE buffer, timeout;
567 ssize_t r;
568 struct timespec expire;
570 if (mq->attr.mq_msgsize < 0) {
571 if (mq_getattr(mq->des, &mq->attr) < 0)
572 rb_sys_fail("mq_getattr");
575 rb_scan_args(argc, argv, "02", &buffer, &timeout);
576 x.timeout = convert_timeout(&expire, timeout);
578 if (NIL_P(buffer)) {
579 buffer = rb_str_new(0, mq->attr.mq_msgsize);
580 } else {
581 StringValue(buffer);
582 rb_str_modify(buffer);
583 rb_str_resize(buffer, mq->attr.mq_msgsize);
585 OBJ_TAINT(buffer);
586 x.msg_ptr = RSTRING_PTR(buffer);
587 x.msg_len = (size_t)mq->attr.mq_msgsize;
588 x.des = mq->des;
590 if (mq->attr.mq_flags & O_NONBLOCK) {
591 r = (ssize_t)xrecv(&x);
592 } else {
593 r = (ssize_t)rb_thread_blocking_region(xrecv, &x,
594 RUBY_UBF_IO, 0);
596 if (r < 0)
597 rb_sys_fail("mq_receive");
599 rb_str_set_len(buffer, r);
601 if (wantarray)
602 return rb_ary_new3(2, buffer, UINT2NUM(x.msg_prio));
603 return buffer;
607 * call-seq:
608 * mq.attr => mq_attr
610 * Returns a POSIX_MQ::Attr struct containing the attributes
611 * of the message queue. See the mq_getattr(3) manpage for
612 * more details.
614 static VALUE getattr(VALUE self)
616 struct posix_mq *mq = get(self, 1);
617 VALUE astruct;
618 VALUE *ptr;
620 if (mq_getattr(mq->des, &mq->attr) < 0)
621 rb_sys_fail("mq_getattr");
623 astruct = rb_struct_alloc_noinit(cAttr);
624 ptr = RSTRUCT_PTR(astruct);
625 ptr[0] = LONG2NUM(mq->attr.mq_flags);
626 ptr[1] = LONG2NUM(mq->attr.mq_maxmsg);
627 ptr[2] = LONG2NUM(mq->attr.mq_msgsize);
628 ptr[3] = LONG2NUM(mq->attr.mq_curmsgs);
630 return astruct;
634 * call-seq:
635 * mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr
637 * Only the IO::NONBLOCK flag may be set or unset (zero) in this manner.
638 * See the mq_setattr(3) manpage for more details.
640 * Consider using the POSIX_MQ#nonblock= method as it is easier and
641 * more natural to use.
643 static VALUE setattr(VALUE self, VALUE astruct)
645 struct posix_mq *mq = get(self, 1);
646 struct mq_attr newattr;
648 attr_from_struct(&newattr, astruct, 0);
650 if (mq_setattr(mq->des, &newattr, NULL) < 0)
651 rb_sys_fail("mq_setattr");
653 return astruct;
657 * call-seq:
658 * mq.close => nil
660 * Closes the underlying message queue descriptor.
661 * If this descriptor had a registered notification request, the request
662 * will be removed so another descriptor or process may register a
663 * notification request. Message queue descriptors are automatically
664 * closed by garbage collection.
666 static VALUE _close(VALUE self)
668 struct posix_mq *mq = get(self, 1);
670 if (mq_close(mq->des) < 0)
671 rb_sys_fail("mq_close");
673 mq->des = MQD_INVALID;
674 MQ_IO_SET(mq, Qnil);
676 return Qnil;
680 * call-seq:
681 * mq.closed? => true or false
683 * Returns +true+ if the message queue descriptor is closed and therefore
684 * unusable, otherwise +false+
686 static VALUE closed(VALUE self)
688 struct posix_mq *mq = get(self, 0);
690 return mq->des == MQD_INVALID ? Qtrue : Qfalse;
694 * call-seq:
695 * mq.name => string
697 * Returns the string name of message queue associated with +mq+
699 static VALUE name(VALUE self)
701 struct posix_mq *mq = get(self, 0);
703 return rb_str_dup(mq->name);
706 static int lookup_sig(VALUE sig)
708 static VALUE list;
709 const char *ptr;
710 long len;
712 sig = rb_obj_as_string(sig);
713 len = RSTRING_LEN(sig);
714 ptr = RSTRING_PTR(sig);
716 if (len > 3 && !memcmp("SIG", ptr, 3))
717 sig = rb_str_new(ptr + 3, len - 3);
719 if (!list) {
720 VALUE mSignal = rb_const_get(rb_cObject, rb_intern("Signal"));
722 list = rb_funcall(mSignal, rb_intern("list"), 0, 0);
723 rb_global_variable(&list);
726 sig = rb_hash_aref(list, sig);
727 if (NIL_P(sig))
728 rb_raise(rb_eArgError, "invalid signal: %s\n", ptr);
730 return NUM2INT(sig);
/*
 * TODO: Under Linux, we could just use netlink directly
 * the same way glibc does...
 */
/*
 * we spawn a thread just to write ONE byte into an fd (usually a pipe)
 *
 * The descriptor is taken from sv.sival_int.  The write is retried for
 * as long as it fails with EINTR or EAGAIN; any other outcome ends the
 * loop and is not reported.
 */
static void thread_notify_fd(union sigval sv)
{
	const int fd = sv.sival_int;

	for (;;) {
		if (write(fd, "", 1) >= 0)
			break;
		if (errno != EINTR && errno != EAGAIN)
			break;
	}
}
/*
 * Wrapper around mq_notify(3) that retries once after running the GC
 * when the first attempt fails with ENOMEM.  Raises a system call
 * error if the request still cannot be registered.
 */
static void my_mq_notify(mqd_t des, struct sigevent *not)
{
	/* mq_notify(3) returns an int status, not a queue descriptor */
	int rv = mq_notify(des, not);

	if (rv < 0) {
		if (errno == ENOMEM) {
			rb_gc();
			rv = mq_notify(des, not);
		}
		if (rv < 0)
			rb_sys_fail("mq_notify");
	}
}
759 /* :nodoc: */
760 static VALUE setnotify_exec(VALUE self, VALUE io, VALUE thr)
762 int fd = NUM2INT(rb_funcall(io, id_fileno, 0, 0));
763 struct posix_mq *mq = get(self, 1);
764 struct sigevent not;
765 pthread_attr_t attr;
767 errno = pthread_attr_init(&attr);
768 if (errno) rb_sys_fail("pthread_attr_init");
770 errno = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
771 if (errno) rb_sys_fail("pthread_attr_setdetachstate");
773 #ifdef PTHREAD_STACK_MIN
774 (void)pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
775 #endif
777 not.sigev_notify = SIGEV_THREAD;
778 not.sigev_notify_function = thread_notify_fd;
779 not.sigev_notify_attributes = &attr;
780 not.sigev_value.sival_int = fd;
782 if (!NIL_P(mq->thread))
783 rb_funcall(mq->thread, id_kill, 0, 0);
784 mq->thread = thr;
786 my_mq_notify(mq->des, &not);
788 return thr;
791 /* :nodoc: */
792 static VALUE notify_cleanup(VALUE self)
794 struct posix_mq *mq = get(self, 1);
796 if (!NIL_P(mq->thread)) {
797 rb_funcall(mq->thread, id_kill, 0, 0);
798 mq->thread = Qnil;
800 return Qnil;
804 * call-seq:
805 * mq.notify = signal => signal
807 * Registers the notification request to deliver a given +signal+
808 * to the current process when message is received.
809 * If +signal+ is +nil+, it will unregister and disable the notification
810 * request to allow other processes to register a request.
811 * If +signal+ is +false+, it will register a no-op notification request
812 * which will prevent other processes from registering a notification.
813 * If +signal+ is an +IO+ object, it will spawn a thread upon the
814 * arrival of the next message and write one "\\0" byte to the file
815 * descriptor belonging to that IO object.
816 * Only one process may have a notification request for a queue
817 * at a time, Errno::EBUSY will be raised if there is already
818 * a notification request registration for the queue.
820 * Notifications are only fired once and processes must reregister
821 * for subsequent notifications.
823 * For readers of the mq_notify(3) manpage, passing +false+
824 * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent
825 * of passing a NULL notification pointer to mq_notify(3).
827 static VALUE setnotify(VALUE self, VALUE arg)
829 struct posix_mq *mq = get(self, 1);
830 struct sigevent not;
831 struct sigevent * notification = &not;
832 VALUE rv = arg;
834 notify_cleanup(self);
835 not.sigev_notify = SIGEV_SIGNAL;
837 switch (TYPE(arg)) {
838 case T_FALSE:
839 not.sigev_notify = SIGEV_NONE;
840 break;
841 case T_NIL:
842 notification = NULL;
843 break;
844 case T_FIXNUM:
845 not.sigev_signo = NUM2INT(arg);
846 break;
847 case T_SYMBOL:
848 case T_STRING:
849 not.sigev_signo = lookup_sig(arg);
850 rv = INT2NUM(not.sigev_signo);
851 break;
852 default:
853 rb_raise(rb_eArgError, "must be a signal or nil");
856 my_mq_notify(mq->des, notification);
858 return rv;
862 * call-seq:
863 * mq.nonblock? => true or false
865 * Returns the current non-blocking state of the message queue descriptor.
867 static VALUE getnonblock(VALUE self)
869 struct posix_mq *mq = get(self, 1);
871 return mq->attr.mq_flags & O_NONBLOCK ? Qtrue : Qfalse;
875 * call-seq:
876 * mq.nonblock = boolean => boolean
878 * Enables or disables non-blocking operation for the message queue
879 * descriptor. Errno::EAGAIN will be raised in situations where
880 * the queue would block. This is not compatible with +timeout+
881 * arguments to POSIX_MQ#send and POSIX_MQ#receive.
883 static VALUE setnonblock(VALUE self, VALUE nb)
885 struct mq_attr newattr;
886 struct posix_mq *mq = get(self, 1);
888 if (nb == Qtrue)
889 newattr.mq_flags = O_NONBLOCK;
890 else if (nb == Qfalse)
891 newattr.mq_flags = 0;
892 else
893 rb_raise(rb_eArgError, "must be true or false");
895 if (mq_setattr(mq->des, &newattr, &mq->attr) < 0)
896 rb_sys_fail("mq_setattr");
898 mq->attr.mq_flags = newattr.mq_flags;
900 return nb;
903 void Init_posix_mq_ext(void)
905 cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject);
906 rb_define_alloc_func(cPOSIX_MQ, alloc);
907 cAttr = rb_const_get(cPOSIX_MQ, rb_intern("Attr"));
910 * The maximum number of open message descriptors supported
911 * by the system. This may be -1, in which case it is dynamically
912 * set at runtime. Consult your operating system documentation
913 * for system-specific information about this.
915 rb_define_const(cPOSIX_MQ, "OPEN_MAX",
916 LONG2NUM(sysconf(_SC_MQ_OPEN_MAX)));
919 * The maximum priority that may be specified for POSIX_MQ#send
920 * On POSIX-compliant systems, this is at least 31, but some
921 * systems allow higher limits.
922 * The minimum priority is always zero.
924 rb_define_const(cPOSIX_MQ, "PRIO_MAX",
925 LONG2NUM(sysconf(_SC_MQ_PRIO_MAX)));
927 rb_define_singleton_method(cPOSIX_MQ, "unlink", s_unlink, 1);
929 rb_define_method(cPOSIX_MQ, "initialize", init, -1);
930 rb_define_method(cPOSIX_MQ, "send", _send, -1);
931 rb_define_method(cPOSIX_MQ, "<<", send0, 1);
932 rb_define_method(cPOSIX_MQ, "receive", receive, -1);
933 rb_define_method(cPOSIX_MQ, "shift", shift, -1);
934 rb_define_method(cPOSIX_MQ, "attr", getattr, 0);
935 rb_define_method(cPOSIX_MQ, "attr=", setattr, 1);
936 rb_define_method(cPOSIX_MQ, "close", _close, 0);
937 rb_define_method(cPOSIX_MQ, "closed?", closed, 0);
938 rb_define_method(cPOSIX_MQ, "unlink", _unlink, 0);
939 rb_define_method(cPOSIX_MQ, "name", name, 0);
940 rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1);
941 rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1);
942 rb_define_method(cPOSIX_MQ, "notify_exec", setnotify_exec, 2);
943 rb_define_method(cPOSIX_MQ, "notify_cleanup", notify_cleanup, 0);
944 rb_define_method(cPOSIX_MQ, "nonblock?", getnonblock, 0);
945 #ifdef MQD_TO_FD
946 rb_define_method(cPOSIX_MQ, "to_io", to_io, 0);
947 #endif
949 id_new = rb_intern("new");
950 id_kill = rb_intern("kill");
951 id_fileno = rb_intern("fileno");
952 sym_r = ID2SYM(rb_intern("r"));
953 sym_w = ID2SYM(rb_intern("w"));
954 sym_rw = ID2SYM(rb_intern("rw"));