avoid double close() and EBADF
[ruby_posix_mq.git] / ext / posix_mq / posix_mq.c
blob73d139b1fbeab91e71d9c1dee1a2673d521cd6c2
1 #define _XOPEN_SOURCE 600
2 #ifdef HAVE_SYS_SELECT_H
3 # include <sys/select.h>
4 #endif
5 #ifdef HAVE_SIGNAL_H
6 # include <signal.h>
7 #endif
8 #ifdef HAVE_PTHREAD_H
9 # include <pthread.h>
10 #endif
11 #include <ruby.h>
13 #ifndef RB_GC_GUARD
14 # define RB_GC_GUARD(v) (*(volatile VALUE *)&(v))
15 #endif
17 #include <time.h>
18 #include <mqueue.h>
19 #include <fcntl.h>
20 #include <sys/stat.h>
21 #include <errno.h>
22 #include <assert.h>
23 #include <unistd.h>
25 #if defined(__linux__)
26 # define MQD_TO_FD(mqd) (int)(mqd)
27 #elif defined(HAVE___MQ_OSHANDLE) /* FreeBSD */
28 # define MQD_TO_FD(mqd) __mq_oshandle(mqd)
29 #else
30 # define MQ_IO_MARK(mq) ((void)(0))
31 # define MQ_IO_SET(mq,val) ((void)(0))
32 # define MQ_IO_CLOSE(mq) ((void)(0))
33 # define MQ_IO_NILP(mq) ((void)(1))
34 #endif
36 struct posix_mq {
37 mqd_t des;
38 struct mq_attr attr;
39 VALUE name;
40 VALUE thread;
41 #ifdef MQD_TO_FD
42 VALUE io;
43 #endif
46 #ifdef MQD_TO_FD
47 # define MQ_IO_MARK(mq) rb_gc_mark((mq)->io)
48 # define MQ_IO_SET(mq,val) do { (mq)->io = (val); } while (0)
49 # define MQ_IO_NIL_P(mq) NIL_P((mq)->io)
50 static int MQ_IO_CLOSE(struct posix_mq *mq)
52 if (NIL_P(mq->io))
53 return 0;
55 /* not safe during GC */
56 rb_io_close(mq->io);
57 mq->io = Qnil;
59 return 1;
61 #endif
63 static VALUE cPOSIX_MQ, cAttr;
64 static ID id_new, id_kill, id_fileno;
65 static ID sym_r, sym_w, sym_rw;
66 static const mqd_t MQD_INVALID = (mqd_t)-1;
68 /* Ruby 1.8.6+ macros (for compatibility with Ruby 1.9) */
69 #ifndef RSTRING_PTR
70 # define RSTRING_PTR(s) (RSTRING(s)->ptr)
71 #endif
72 #ifndef RSTRING_LEN
73 # define RSTRING_LEN(s) (RSTRING(s)->len)
74 #endif
75 #ifndef RSTRUCT_PTR
76 # define RSTRUCT_PTR(s) (RSTRUCT(s)->ptr)
77 #endif
78 #ifndef RSTRUCT_LEN
79 # define RSTRUCT_LEN(s) (RSTRUCT(s)->len)
80 #endif
82 #ifndef HAVE_RB_STR_SET_LEN
83 # ifdef RUBINIUS
84 # define rb_str_set_len(str,len) rb_str_resize(str,len)
85 # else /* 1.8.6 optimized version */
86 /* this is taken from Ruby 1.8.7, 1.8.6 may not have it */
87 static void rb_18_str_set_len(VALUE str, long len)
89 RSTRING(str)->len = len;
90 RSTRING(str)->ptr[len] = '\0';
92 # define rb_str_set_len(str,len) rb_18_str_set_len(str,len)
93 # endif /* ! RUBINIUS */
94 #endif /* !defined(HAVE_RB_STR_SET_LEN) */
96 #ifndef HAVE_RB_STRUCT_ALLOC_NOINIT
97 static VALUE rb_struct_alloc_noinit(VALUE class)
99 return rb_funcall(class, id_new, 0, 0);
101 #endif /* !defined(HAVE_RB_STRUCT_ALLOC_NOINIT) */
103 /* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
104 #ifndef HAVE_RB_THREAD_BLOCKING_REGION
105 # include <rubysig.h>
106 # define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
107 typedef void rb_unblock_function_t(void *);
108 typedef VALUE rb_blocking_function_t(void *);
109 static VALUE
110 rb_thread_blocking_region(
111 rb_blocking_function_t *func, void *data1,
112 rb_unblock_function_t *ubf, void *data2)
114 VALUE rv;
116 assert(RUBY_UBF_IO == ubf && "RUBY_UBF_IO required for emulation");
118 TRAP_BEG;
119 rv = func(data1);
120 TRAP_END;
122 return rv;
124 #endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
126 /* used to pass arguments to mq_open inside blocking region */
127 struct open_args {
128 int argc;
129 const char *name;
130 int oflags;
131 mode_t mode;
132 struct mq_attr attr;
135 /* used to pass arguments to mq_send/mq_receive inside blocking region */
136 struct rw_args {
137 mqd_t des;
138 char *msg_ptr;
139 size_t msg_len;
140 unsigned msg_prio;
141 struct timespec *timeout;
144 /* hope it's there..., TODO: a better version that works in rbx */
145 struct timeval rb_time_interval(VALUE);
147 static struct timespec *convert_timeout(struct timespec *dest, VALUE t)
149 struct timeval tv, now;
151 if (NIL_P(t))
152 return NULL;
154 tv = rb_time_interval(t); /* aggregate return :( */
155 gettimeofday(&now, NULL);
156 dest->tv_sec = now.tv_sec + tv.tv_sec;
157 dest->tv_nsec = (now.tv_usec + tv.tv_usec) * 1000;
159 if (dest->tv_nsec > 1000000000) {
160 dest->tv_nsec -= 1000000000;
161 dest->tv_sec++;
164 return dest;
167 /* (may) run without GVL */
168 static VALUE xopen(void *ptr)
170 struct open_args *x = ptr;
171 mqd_t rv;
173 switch (x->argc) {
174 case 2: rv = mq_open(x->name, x->oflags); break;
175 case 3: rv = mq_open(x->name, x->oflags, x->mode, NULL); break;
176 case 4: rv = mq_open(x->name, x->oflags, x->mode, &x->attr); break;
177 default: rv = MQD_INVALID;
180 return (VALUE)rv;
183 /* runs without GVL */
184 static VALUE xsend(void *ptr)
186 struct rw_args *x = ptr;
188 if (x->timeout)
189 return (VALUE)mq_timedsend(x->des, x->msg_ptr, x->msg_len,
190 x->msg_prio, x->timeout);
192 return (VALUE)mq_send(x->des, x->msg_ptr, x->msg_len, x->msg_prio);
195 /* runs without GVL */
196 static VALUE xrecv(void *ptr)
198 struct rw_args *x = ptr;
200 if (x->timeout)
201 return (VALUE)mq_timedreceive(x->des, x->msg_ptr, x->msg_len,
202 &x->msg_prio, x->timeout);
204 return (VALUE)mq_receive(x->des, x->msg_ptr, x->msg_len, &x->msg_prio);
207 /* called by GC */
208 static void mark(void *ptr)
210 struct posix_mq *mq = ptr;
212 rb_gc_mark(mq->name);
213 rb_gc_mark(mq->thread);
214 MQ_IO_MARK(mq);
217 /* called by GC */
218 static void _free(void *ptr)
220 struct posix_mq *mq = ptr;
222 if (mq->des != MQD_INVALID && MQ_IO_NIL_P(mq)) {
223 /* we ignore errors when gc-ing */
224 mq_close(mq->des);
225 errno = 0;
227 xfree(ptr);
230 /* automatically called at creation (before initialize) */
231 static VALUE alloc(VALUE klass)
233 struct posix_mq *mq;
234 VALUE rv = Data_Make_Struct(klass, struct posix_mq, mark, _free, mq);
236 mq->des = MQD_INVALID;
237 mq->attr.mq_flags = 0;
238 mq->attr.mq_maxmsg = 0;
239 mq->attr.mq_msgsize = -1;
240 mq->attr.mq_curmsgs = 0;
241 mq->name = Qnil;
242 mq->thread = Qnil;
243 MQ_IO_SET(mq, Qnil);
245 return rv;
248 /* unwraps the posix_mq struct from self */
249 static struct posix_mq *get(VALUE self, int need_valid)
251 struct posix_mq *mq;
253 Data_Get_Struct(self, struct posix_mq, mq);
255 if (need_valid && mq->des == MQD_INVALID)
256 rb_raise(rb_eIOError, "closed queue descriptor");
258 return mq;
261 /* converts the POSIX_MQ::Attr astruct into a struct mq_attr attr */
262 static void attr_from_struct(struct mq_attr *attr, VALUE astruct, int all)
264 VALUE *ptr;
266 if (CLASS_OF(astruct) != cAttr) {
267 RB_GC_GUARD(astruct) = rb_inspect(astruct);
268 rb_raise(rb_eArgError, "not a POSIX_MQ::Attr: %s",
269 RSTRING_PTR(astruct));
272 ptr = RSTRUCT_PTR(astruct);
274 attr->mq_flags = NUM2LONG(ptr[0]);
276 if (all || !NIL_P(ptr[1]))
277 attr->mq_maxmsg = NUM2LONG(ptr[1]);
278 if (all || !NIL_P(ptr[2]))
279 attr->mq_msgsize = NUM2LONG(ptr[2]);
280 if (!NIL_P(ptr[3]))
281 attr->mq_curmsgs = NUM2LONG(ptr[3]);
285 * call-seq:
286 * POSIX_MQ.new(name [, flags [, mode [, mq_attr]]) => mq
288 * Opens a POSIX message queue given by +name+. +name+ should start
289 * with a slash ("/") for portable applications.
291 * If a Symbol is given in place of integer +flags+, then:
293 * * +:r+ is equivalent to IO::RDONLY
294 * * +:w+ is equivalent to IO::CREAT|IO::WRONLY
295 * * +:rw+ is equivalent to IO::CREAT|IO::RDWR
297 * +mode+ is an integer and only used when IO::CREAT is used.
298 * +mq_attr+ is a POSIX_MQ::Attr and only used if IO::CREAT is used.
299 * If +mq_attr+ is not specified when creating a queue, then the
300 * system defaults will be used.
302 * See the manpage for mq_open(3) for more details on this function.
304 static VALUE init(int argc, VALUE *argv, VALUE self)
306 struct posix_mq *mq = get(self, 0);
307 struct open_args x;
308 VALUE name, oflags, mode, attr;
310 rb_scan_args(argc, argv, "13", &name, &oflags, &mode, &attr);
312 if (TYPE(name) != T_STRING)
313 rb_raise(rb_eArgError, "name must be a string");
315 switch (TYPE(oflags)) {
316 case T_NIL:
317 x.oflags = O_RDONLY;
318 break;
319 case T_SYMBOL:
320 if (oflags == sym_r)
321 x.oflags = O_RDONLY;
322 else if (oflags == sym_w)
323 x.oflags = O_CREAT|O_WRONLY;
324 else if (oflags == sym_rw)
325 x.oflags = O_CREAT|O_RDWR;
326 else {
327 RB_GC_GUARD(oflags) = oflags;
328 rb_raise(rb_eArgError,
329 "symbol must be :r, :w, or :rw: %s",
330 RSTRING_PTR(oflags));
332 break;
333 case T_BIGNUM:
334 case T_FIXNUM:
335 x.oflags = NUM2INT(oflags);
336 break;
337 default:
338 rb_raise(rb_eArgError, "flags must be an int, :r, :w, or :wr");
341 x.name = RSTRING_PTR(name);
342 x.argc = 2;
344 switch (TYPE(mode)) {
345 case T_FIXNUM:
346 x.argc = 3;
347 x.mode = NUM2UINT(mode);
348 break;
349 case T_NIL:
350 if (x.oflags & O_CREAT) {
351 x.argc = 3;
352 x.mode = 0666;
354 break;
355 default:
356 rb_raise(rb_eArgError, "mode not an integer");
359 switch (TYPE(attr)) {
360 case T_STRUCT:
361 x.argc = 4;
362 attr_from_struct(&x.attr, attr, 1);
364 /* principle of least surprise */
365 if (x.attr.mq_flags & O_NONBLOCK)
366 x.oflags |= O_NONBLOCK;
367 break;
368 case T_NIL:
369 break;
370 default:
371 RB_GC_GUARD(attr) = rb_inspect(attr);
372 rb_raise(rb_eArgError, "attr must be a POSIX_MQ::Attr: %s",
373 RSTRING_PTR(attr));
376 mq->des = (mqd_t)xopen(&x);
377 if (mq->des == MQD_INVALID) {
378 if (errno == ENOMEM || errno == EMFILE || errno == ENFILE) {
379 rb_gc();
380 mq->des = (mqd_t)xopen(&x);
382 if (mq->des == MQD_INVALID)
383 rb_sys_fail("mq_open");
386 mq->name = rb_str_dup(name);
387 if (x.oflags & O_NONBLOCK)
388 mq->attr.mq_flags = O_NONBLOCK;
390 return self;
394 * call-seq:
395 * POSIX_MQ.unlink(name) => 1
397 * Unlinks the message queue given by +name+. The queue will be destroyed
398 * when the last process with the queue open closes its queue descriptors.
400 static VALUE s_unlink(VALUE self, VALUE name)
402 mqd_t rv;
404 if (TYPE(name) != T_STRING)
405 rb_raise(rb_eArgError, "argument must be a string");
407 rv = mq_unlink(RSTRING_PTR(name));
408 if (rv == MQD_INVALID)
409 rb_sys_fail("mq_unlink");
411 return INT2NUM(1);
415 * call-seq:
416 * mq.unlink => mq
418 * Unlinks the message queue to prevent other processes from accessing it.
419 * All existing queue descriptors to this queue including those opened by
420 * other processes are unaffected. The queue will only be destroyed
421 * when the last process with open descriptors to this queue closes
422 * the descriptors.
424 static VALUE _unlink(VALUE self)
426 struct posix_mq *mq = get(self, 0);
427 mqd_t rv;
429 assert(TYPE(mq->name) == T_STRING && "mq->name is not a string");
431 rv = mq_unlink(RSTRING_PTR(mq->name));
432 if (rv == MQD_INVALID)
433 rb_sys_fail("mq_unlink");
435 return self;
438 static void setup_send_buffer(struct rw_args *x, VALUE buffer)
440 buffer = rb_obj_as_string(buffer);
441 x->msg_ptr = RSTRING_PTR(buffer);
442 x->msg_len = (size_t)RSTRING_LEN(buffer);
446 * call-seq:
447 * mq.send(string [,priority[, timeout]]) => nil
449 * Inserts the given +string+ into the message queue with an optional,
450 * unsigned integer +priority+. If the optional +timeout+ is specified,
451 * then Errno::ETIMEDOUT will be raised if the operation cannot complete
452 * before +timeout+ seconds has elapsed. Without +timeout+, this method
453 * may block until the queue is writable.
455 static VALUE _send(int argc, VALUE *argv, VALUE self)
457 struct posix_mq *mq = get(self, 1);
458 struct rw_args x;
459 VALUE buffer, prio, timeout;
460 mqd_t rv;
461 struct timespec expire;
463 rb_scan_args(argc, argv, "12", &buffer, &prio, &timeout);
465 setup_send_buffer(&x, buffer);
466 x.des = mq->des;
467 x.timeout = convert_timeout(&expire, timeout);
468 x.msg_prio = NIL_P(prio) ? 0 : NUM2UINT(prio);
470 if (mq->attr.mq_flags & O_NONBLOCK)
471 rv = (mqd_t)xsend(&x);
472 else
473 rv = (mqd_t)rb_thread_blocking_region(xsend, &x,
474 RUBY_UBF_IO, 0);
475 if (rv == MQD_INVALID)
476 rb_sys_fail("mq_send");
478 return Qnil;
482 * call-seq:
483 * mq << string => mq
485 * Inserts the given +string+ into the message queue with a
486 * default priority of 0 and no timeout.
488 static VALUE send0(VALUE self, VALUE buffer)
490 struct posix_mq *mq = get(self, 1);
491 struct rw_args x;
492 mqd_t rv;
494 setup_send_buffer(&x, buffer);
495 x.des = mq->des;
496 x.timeout = NULL;
497 x.msg_prio = 0;
499 if (mq->attr.mq_flags & O_NONBLOCK)
500 rv = (mqd_t)xsend(&x);
501 else
502 rv = (mqd_t)rb_thread_blocking_region(xsend, &x,
503 RUBY_UBF_IO, 0);
505 if (rv == MQD_INVALID)
506 rb_sys_fail("mq_send");
508 return self;
511 #ifdef MQD_TO_FD
513 * call-seq:
514 * mq.to_io => IO
516 * Returns an IO.select-able +IO+ object. This method is only available
517 * under Linux and FreeBSD and is not intended to be portable.
519 static VALUE to_io(VALUE self)
521 struct posix_mq *mq = get(self, 1);
522 int fd = MQD_TO_FD(mq->des);
524 if (NIL_P(mq->io))
525 mq->io = rb_funcall(rb_cIO, id_new, 1, INT2NUM(fd));
527 return mq->io;
529 #endif
531 static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self);
534 * call-seq:
535 * mq.receive([buffer, [timeout]]) => [ message, priority ]
537 * Takes the highest priority message off the queue and returns
538 * an array containing the message as a String and the Integer
539 * priority of the message.
541 * If the optional +buffer+ is present, then it must be a String
542 * which will receive the data.
544 * If the optional +timeout+ is present, then it may be a Float
545 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
546 * will be raised if +timeout+ has elapsed and there are no messages
547 * in the queue.
549 static VALUE receive(int argc, VALUE *argv, VALUE self)
551 return _receive(1, argc, argv, self);
555 * call-seq:
556 * mq.shift([buffer, [timeout]]) => message
558 * Takes the highest priority message off the queue and returns
559 * the message as a String.
561 * If the optional +buffer+ is present, then it must be a String
562 * which will receive the data.
564 * If the optional +timeout+ is present, then it may be a Float
565 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
566 * will be raised if +timeout+ has elapsed and there are no messages
567 * in the queue.
569 static VALUE shift(int argc, VALUE *argv, VALUE self)
571 return _receive(0, argc, argv, self);
574 static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self)
576 struct posix_mq *mq = get(self, 1);
577 struct rw_args x;
578 VALUE buffer, timeout;
579 ssize_t r;
580 struct timespec expire;
582 if (mq->attr.mq_msgsize < 0) {
583 if (mq_getattr(mq->des, &mq->attr) < 0)
584 rb_sys_fail("mq_getattr");
587 rb_scan_args(argc, argv, "02", &buffer, &timeout);
588 x.timeout = convert_timeout(&expire, timeout);
590 if (NIL_P(buffer)) {
591 buffer = rb_str_new(0, mq->attr.mq_msgsize);
592 } else {
593 StringValue(buffer);
594 rb_str_modify(buffer);
595 rb_str_resize(buffer, mq->attr.mq_msgsize);
597 OBJ_TAINT(buffer);
598 x.msg_ptr = RSTRING_PTR(buffer);
599 x.msg_len = (size_t)mq->attr.mq_msgsize;
600 x.des = mq->des;
602 if (mq->attr.mq_flags & O_NONBLOCK) {
603 r = (ssize_t)xrecv(&x);
604 } else {
605 r = (ssize_t)rb_thread_blocking_region(xrecv, &x,
606 RUBY_UBF_IO, 0);
608 if (r < 0)
609 rb_sys_fail("mq_receive");
611 rb_str_set_len(buffer, r);
613 if (wantarray)
614 return rb_ary_new3(2, buffer, UINT2NUM(x.msg_prio));
615 return buffer;
619 * call-seq:
620 * mq.attr => mq_attr
622 * Returns a POSIX_MQ::Attr struct containing the attributes
623 * of the message queue. See the mq_getattr(3) manpage for
624 * more details.
626 static VALUE getattr(VALUE self)
628 struct posix_mq *mq = get(self, 1);
629 VALUE astruct;
630 VALUE *ptr;
632 if (mq_getattr(mq->des, &mq->attr) < 0)
633 rb_sys_fail("mq_getattr");
635 astruct = rb_struct_alloc_noinit(cAttr);
636 ptr = RSTRUCT_PTR(astruct);
637 ptr[0] = LONG2NUM(mq->attr.mq_flags);
638 ptr[1] = LONG2NUM(mq->attr.mq_maxmsg);
639 ptr[2] = LONG2NUM(mq->attr.mq_msgsize);
640 ptr[3] = LONG2NUM(mq->attr.mq_curmsgs);
642 return astruct;
646 * call-seq:
647 * mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr
649 * Only the IO::NONBLOCK flag may be set or unset (zero) in this manner.
650 * See the mq_setattr(3) manpage for more details.
652 * Consider using the POSIX_MQ#nonblock= method as it is easier and
653 * more natural to use.
655 static VALUE setattr(VALUE self, VALUE astruct)
657 struct posix_mq *mq = get(self, 1);
658 struct mq_attr newattr;
660 attr_from_struct(&newattr, astruct, 0);
662 if (mq_setattr(mq->des, &newattr, NULL) < 0)
663 rb_sys_fail("mq_setattr");
665 return astruct;
669 * call-seq:
670 * mq.close => nil
672 * Closes the underlying message queue descriptor.
673 * If this descriptor had a registered notification request, the request
674 * will be removed so another descriptor or process may register a
675 * notification request. Message queue descriptors are automatically
676 * closed by garbage collection.
678 static VALUE _close(VALUE self)
680 struct posix_mq *mq = get(self, 1);
682 if (! MQ_IO_CLOSE(mq)) {
683 if (mq_close(mq->des) == -1)
684 rb_sys_fail("mq_close");
686 mq->des = MQD_INVALID;
688 return Qnil;
692 * call-seq:
693 * mq.closed? => true or false
695 * Returns +true+ if the message queue descriptor is closed and therefore
696 * unusable, otherwise +false+
698 static VALUE closed(VALUE self)
700 struct posix_mq *mq = get(self, 0);
702 return mq->des == MQD_INVALID ? Qtrue : Qfalse;
706 * call-seq:
707 * mq.name => string
709 * Returns the string name of message queue associated with +mq+
711 static VALUE name(VALUE self)
713 struct posix_mq *mq = get(self, 0);
715 return rb_str_dup(mq->name);
718 static int lookup_sig(VALUE sig)
720 static VALUE list;
721 const char *ptr;
722 long len;
724 sig = rb_obj_as_string(sig);
725 len = RSTRING_LEN(sig);
726 ptr = RSTRING_PTR(sig);
728 if (len > 3 && !memcmp("SIG", ptr, 3))
729 sig = rb_str_new(ptr + 3, len - 3);
731 if (!list) {
732 VALUE mSignal = rb_const_get(rb_cObject, rb_intern("Signal"));
734 list = rb_funcall(mSignal, rb_intern("list"), 0, 0);
735 rb_global_variable(&list);
738 sig = rb_hash_aref(list, sig);
739 if (NIL_P(sig))
740 rb_raise(rb_eArgError, "invalid signal: %s\n", ptr);
742 return NUM2INT(sig);
746 * TODO: Under Linux, we could just use netlink directly
747 * the same way glibc does...
749 /* we spawn a thread just to write ONE byte into an fd (usually a pipe) */
750 static void thread_notify_fd(union sigval sv)
752 int fd = sv.sival_int;
754 while ((write(fd, "", 1) < 0) && (errno == EINTR || errno == EAGAIN));
757 static void my_mq_notify(mqd_t des, struct sigevent *not)
759 mqd_t rv = mq_notify(des, not);
761 if (rv == MQD_INVALID) {
762 if (errno == ENOMEM) {
763 rb_gc();
764 rv = mq_notify(des, not);
766 if (rv == MQD_INVALID)
767 rb_sys_fail("mq_notify");
771 /* :nodoc: */
772 static VALUE setnotify_exec(VALUE self, VALUE io, VALUE thr)
774 int fd = NUM2INT(rb_funcall(io, id_fileno, 0, 0));
775 struct posix_mq *mq = get(self, 1);
776 struct sigevent not;
777 pthread_attr_t attr;
779 errno = pthread_attr_init(&attr);
780 if (errno) rb_sys_fail("pthread_attr_init");
782 errno = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
783 if (errno) rb_sys_fail("pthread_attr_setdetachstate");
785 #ifdef PTHREAD_STACK_MIN
786 (void)pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
787 #endif
789 not.sigev_notify = SIGEV_THREAD;
790 not.sigev_notify_function = thread_notify_fd;
791 not.sigev_notify_attributes = &attr;
792 not.sigev_value.sival_int = fd;
794 if (!NIL_P(mq->thread))
795 rb_funcall(mq->thread, id_kill, 0, 0);
796 mq->thread = thr;
798 my_mq_notify(mq->des, &not);
800 return thr;
803 /* :nodoc: */
804 static VALUE notify_cleanup(VALUE self)
806 struct posix_mq *mq = get(self, 1);
808 if (!NIL_P(mq->thread)) {
809 rb_funcall(mq->thread, id_kill, 0, 0);
810 mq->thread = Qnil;
812 return Qnil;
816 * call-seq:
817 * mq.notify = signal => signal
819 * Registers the notification request to deliver a given +signal+
820 * to the current process when message is received.
821 * If +signal+ is +nil+, it will unregister and disable the notification
822 * request to allow other processes to register a request.
823 * If +signal+ is +false+, it will register a no-op notification request
824 * which will prevent other processes from registering a notification.
825 * If +signal+ is an +IO+ object, it will spawn a thread upon the
826 * arrival of the next message and write one "\\0" byte to the file
827 * descriptor belonging to that IO object.
828 * Only one process may have a notification request for a queue
829 * at a time, Errno::EBUSY will be raised if there is already
830 * a notification request registration for the queue.
832 * Notifications are only fired once and processes must reregister
833 * for subsequent notifications.
835 * For readers of the mq_notify(3) manpage, passing +false+
836 * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent
837 * of passing a NULL notification pointer to mq_notify(3).
839 static VALUE setnotify(VALUE self, VALUE arg)
841 struct posix_mq *mq = get(self, 1);
842 struct sigevent not;
843 struct sigevent * notification = &not;
844 VALUE rv = arg;
846 notify_cleanup(self);
847 not.sigev_notify = SIGEV_SIGNAL;
849 switch (TYPE(arg)) {
850 case T_FALSE:
851 not.sigev_notify = SIGEV_NONE;
852 break;
853 case T_NIL:
854 notification = NULL;
855 break;
856 case T_FIXNUM:
857 not.sigev_signo = NUM2INT(arg);
858 break;
859 case T_SYMBOL:
860 case T_STRING:
861 not.sigev_signo = lookup_sig(arg);
862 rv = INT2NUM(not.sigev_signo);
863 break;
864 default:
865 rb_raise(rb_eArgError, "must be a signal or nil");
868 my_mq_notify(mq->des, notification);
870 return rv;
874 * call-seq:
875 * mq.nonblock? => true or false
877 * Returns the current non-blocking state of the message queue descriptor.
879 static VALUE getnonblock(VALUE self)
881 struct posix_mq *mq = get(self, 1);
883 return mq->attr.mq_flags & O_NONBLOCK ? Qtrue : Qfalse;
887 * call-seq:
888 * mq.nonblock = boolean => boolean
890 * Enables or disables non-blocking operation for the message queue
891 * descriptor. Errno::EAGAIN will be raised in situations where
892 * the queue would block. This is not compatible with +timeout+
893 * arguments to POSIX_MQ#send and POSIX_MQ#receive.
895 static VALUE setnonblock(VALUE self, VALUE nb)
897 struct mq_attr newattr;
898 struct posix_mq *mq = get(self, 1);
900 if (nb == Qtrue)
901 newattr.mq_flags = O_NONBLOCK;
902 else if (nb == Qfalse)
903 newattr.mq_flags = 0;
904 else
905 rb_raise(rb_eArgError, "must be true or false");
907 if (mq_setattr(mq->des, &newattr, &mq->attr) < 0)
908 rb_sys_fail("mq_setattr");
910 mq->attr.mq_flags = newattr.mq_flags;
912 return nb;
915 void Init_posix_mq_ext(void)
917 cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject);
918 rb_define_alloc_func(cPOSIX_MQ, alloc);
919 cAttr = rb_const_get(cPOSIX_MQ, rb_intern("Attr"));
922 * The maximum number of open message descriptors supported
923 * by the system. This may be -1, in which case it is dynamically
924 * set at runtime. Consult your operating system documentation
925 * for system-specific information about this.
927 rb_define_const(cPOSIX_MQ, "OPEN_MAX",
928 LONG2NUM(sysconf(_SC_MQ_OPEN_MAX)));
931 * The maximum priority that may be specified for POSIX_MQ#send
932 * On POSIX-compliant systems, this is at least 31, but some
933 * systems allow higher limits.
934 * The minimum priority is always zero.
936 rb_define_const(cPOSIX_MQ, "PRIO_MAX",
937 LONG2NUM(sysconf(_SC_MQ_PRIO_MAX)));
939 rb_define_singleton_method(cPOSIX_MQ, "unlink", s_unlink, 1);
941 rb_define_method(cPOSIX_MQ, "initialize", init, -1);
942 rb_define_method(cPOSIX_MQ, "send", _send, -1);
943 rb_define_method(cPOSIX_MQ, "<<", send0, 1);
944 rb_define_method(cPOSIX_MQ, "receive", receive, -1);
945 rb_define_method(cPOSIX_MQ, "shift", shift, -1);
946 rb_define_method(cPOSIX_MQ, "attr", getattr, 0);
947 rb_define_method(cPOSIX_MQ, "attr=", setattr, 1);
948 rb_define_method(cPOSIX_MQ, "close", _close, 0);
949 rb_define_method(cPOSIX_MQ, "closed?", closed, 0);
950 rb_define_method(cPOSIX_MQ, "unlink", _unlink, 0);
951 rb_define_method(cPOSIX_MQ, "name", name, 0);
952 rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1);
953 rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1);
954 rb_define_method(cPOSIX_MQ, "notify_exec", setnotify_exec, 2);
955 rb_define_method(cPOSIX_MQ, "notify_cleanup", notify_cleanup, 0);
956 rb_define_method(cPOSIX_MQ, "nonblock?", getnonblock, 0);
957 #ifdef MQD_TO_FD
958 rb_define_method(cPOSIX_MQ, "to_io", to_io, 0);
959 #endif
961 id_new = rb_intern("new");
962 id_kill = rb_intern("kill");
963 id_fileno = rb_intern("fileno");
964 sym_r = ID2SYM(rb_intern("r"));
965 sym_w = ID2SYM(rb_intern("w"));
966 sym_rw = ID2SYM(rb_intern("rw"));