avoid deprecated rb_thread_blocking_region in Ruby 2.0/2.1
[ruby_posix_mq.git] / ext / posix_mq / posix_mq.c
blobcbc32b96eef80dad749d04358bbe8bdd8f26b581
1 #define _XOPEN_SOURCE 600
2 #ifdef HAVE_SYS_SELECT_H
3 # include <sys/select.h>
4 #endif
5 #ifdef HAVE_SIGNAL_H
6 # include <signal.h>
7 #endif
8 #ifdef HAVE_PTHREAD_H
9 # include <pthread.h>
10 #endif
11 #include <ruby.h>
13 #ifndef NUM2TIMET
14 # define NUM2TIMET NUM2INT
15 #endif
17 #include <time.h>
18 #include <mqueue.h>
19 #include <fcntl.h>
20 #include <sys/stat.h>
21 #include <errno.h>
22 #include <assert.h>
23 #include <unistd.h>
24 #include <float.h>
25 #include <math.h>
27 #if defined(__linux__)
28 # define MQD_TO_FD(mqd) (int)(mqd)
29 #elif defined(HAVE___MQ_OSHANDLE) /* FreeBSD */
30 # define MQD_TO_FD(mqd) __mq_oshandle(mqd)
31 #else
32 # define MQ_IO_MARK(mq) ((void)(0))
33 # define MQ_IO_SET(mq,val) ((void)(0))
34 # define MQ_IO_CLOSE(mq) ((int)(0))
35 # define MQ_IO_NIL_P(mq) ((int)(1))
36 #endif
38 struct posix_mq {
39 mqd_t des;
40 struct mq_attr attr;
41 VALUE name;
42 VALUE thread;
43 #ifdef MQD_TO_FD
44 VALUE io;
45 #endif
48 #ifdef MQD_TO_FD
49 # define MQ_IO_MARK(mq) rb_gc_mark((mq)->io)
50 # define MQ_IO_SET(mq,val) do { (mq)->io = (val); } while (0)
51 # define MQ_IO_NIL_P(mq) NIL_P((mq)->io)
52 static int MQ_IO_CLOSE(struct posix_mq *mq)
54 if (NIL_P(mq->io))
55 return 0;
57 /* not safe during GC */
58 rb_io_close(mq->io);
59 mq->io = Qnil;
61 return 1;
63 #endif
65 # define PMQ_WANTARRAY (1<<0)
66 # define PMQ_TRY (1<<1)
68 static VALUE cAttr;
69 static ID id_new, id_kill, id_fileno, id_divmod;
70 static ID id_flags, id_maxmsg, id_msgsize, id_curmsgs;
71 static VALUE sym_r, sym_w, sym_rw;
72 static const mqd_t MQD_INVALID = (mqd_t)-1;
74 /* Ruby 1.8.6+ macros (for compatibility with Ruby 1.9) */
75 #ifndef RSTRING_PTR
76 # define RSTRING_PTR(s) (RSTRING(s)->ptr)
77 #endif
78 #ifndef RSTRING_LEN
79 # define RSTRING_LEN(s) (RSTRING(s)->len)
80 #endif
81 #ifndef RFLOAT_VALUE
82 # define RFLOAT_VALUE(f) (RFLOAT(f)->value)
83 #endif
85 #ifndef HAVE_RB_STR_SET_LEN
86 /* this is taken from Ruby 1.8.7, 1.8.6 may not have it */
87 # ifdef RUBINIUS
88 # error upgrade Rubinius, rb_str_set_len should be available
89 # endif
90 static void rb_18_str_set_len(VALUE str, long len)
92 RSTRING(str)->len = len;
93 RSTRING(str)->ptr[len] = '\0';
95 #define rb_str_set_len rb_18_str_set_len
96 #endif /* !defined(HAVE_RB_STR_SET_LEN) */
98 #if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && defined(HAVE_RUBY_THREAD_H)
99 /* Ruby 2.0+ */
100 # include <ruby/thread.h>
101 # define WITHOUT_GVL(fn,a,ubf,b) \
102 rb_thread_call_without_gvl((fn),(a),(ubf),(b))
103 #elif defined(HAVE_RB_THREAD_BLOCKING_REGION)
104 typedef VALUE (*my_blocking_fn_t)(void*);
105 # define WITHOUT_GVL(fn,a,ubf,b) \
106 rb_thread_blocking_region((my_blocking_fn_t)(fn),(a),(ubf),(b))
108 #else /* Ruby 1.8 */
109 /* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
110 # include <rubysig.h>
111 # define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
112 typedef void rb_unblock_function_t(void *);
113 typedef void * rb_blocking_function_t(void *);
114 static void * WITHOUT_GVL(rb_blocking_function_t *func, void *data1,
115 rb_unblock_function_t *ubf, void *data2)
117 void *rv;
119 assert(RUBY_UBF_IO == ubf && "RUBY_UBF_IO required for emulation");
121 TRAP_BEG;
122 rv = func(data1);
123 TRAP_END;
125 return rv;
127 #endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
129 /* used to pass arguments to mq_open inside blocking region */
130 struct open_args {
131 mqd_t des;
132 int argc;
133 const char *name;
134 int oflags;
135 mode_t mode;
136 struct mq_attr attr;
139 /* used to pass arguments to mq_send/mq_receive inside blocking region */
140 struct rw_args {
141 mqd_t des;
142 union {
143 ssize_t received;
144 int retval;
146 char *msg_ptr;
147 size_t msg_len;
148 unsigned msg_prio;
149 struct timespec *timeout;
152 #ifndef HAVE_MQ_TIMEDSEND
153 static mqd_t
154 not_timedsend(mqd_t mqdes, const char *msg_ptr,
155 size_t msg_len, unsigned msg_prio,
156 const struct timespec *abs_timeout)
158 rb_bug("mq_timedsend workaround failed");
159 return (mqd_t)-1;
161 # define mq_timedsend not_timedsend
162 #endif
163 #ifndef HAVE_MQ_TIMEDRECEIVE
164 static ssize_t
165 not_timedreceive(mqd_t mqdes, char *msg_ptr,
166 size_t msg_len, unsigned *msg_prio,
167 const struct timespec *abs_timeout)
169 rb_bug("mq_timedreceive workaround failed");
170 return (mqd_t)-1;
172 # define mq_timedreceive not_timedreceive
173 #endif
175 #if defined(HAVE_MQ_TIMEDRECEIVE) && defined(HAVE_MQ_TIMEDSEND)
176 static void num2timespec(struct timespec *ts, VALUE t)
178 switch (TYPE(t)) {
179 case T_FIXNUM:
180 case T_BIGNUM:
181 ts->tv_sec = NUM2TIMET(t);
182 ts->tv_nsec = 0;
183 break;
184 case T_FLOAT: {
185 double f, d;
186 double val = RFLOAT_VALUE(t);
188 d = modf(val, &f);
189 if (d >= 0) {
190 ts->tv_nsec = (long)(d * 1e9 + 0.5);
191 } else {
192 ts->tv_nsec = (long)(-d * 1e9 + 0.5);
193 if (ts->tv_nsec > 0) {
194 ts->tv_nsec = 1000000000 - ts->tv_nsec;
195 f -= 1;
198 ts->tv_sec = (time_t)f;
199 if (f != ts->tv_sec)
200 rb_raise(rb_eRangeError, "%f out of range", val);
202 break;
203 default: {
204 VALUE f;
205 VALUE ary = rb_funcall(t, id_divmod, 1, INT2FIX(1));
207 Check_Type(ary, T_ARRAY);
209 ts->tv_sec = NUM2TIMET(rb_ary_entry(ary, 0));
210 f = rb_ary_entry(ary, 1);
211 f = rb_funcall(f, '*', 1, INT2FIX(1000000000));
212 ts->tv_nsec = NUM2LONG(f);
216 #else
217 static void num2timespec(struct timespec *ts, VALUE t)
219 rb_raise(rb_eNotImpError,
220 "mq_timedsend and/or mq_timedreceive missing");
222 #endif
224 static struct timespec *convert_timeout(struct timespec *dest, VALUE t)
226 struct timespec ts, now;
228 if (NIL_P(t))
229 return NULL;
231 num2timespec(&ts, t);
232 clock_gettime(CLOCK_REALTIME, &now);
233 dest->tv_sec = now.tv_sec + ts.tv_sec;
234 dest->tv_nsec = now.tv_nsec + ts.tv_nsec;
236 if (dest->tv_nsec > 1000000000) {
237 dest->tv_nsec -= 1000000000;
238 ++dest->tv_sec;
241 return dest;
244 /* (may) run without GVL */
245 static void * xopen(void *ptr)
247 struct open_args *x = ptr;
249 switch (x->argc) {
250 case 2: x->des = mq_open(x->name, x->oflags); break;
251 case 3: x->des = mq_open(x->name, x->oflags, x->mode, NULL); break;
252 case 4: x->des = mq_open(x->name, x->oflags, x->mode, &x->attr); break;
253 default: x->des = MQD_INVALID;
256 return NULL;
259 /* runs without GVL */
260 static void *xsend(void *ptr)
262 struct rw_args *x = ptr;
264 x->retval = x->timeout ?
265 mq_timedsend(x->des, x->msg_ptr, x->msg_len,
266 x->msg_prio, x->timeout) :
267 mq_send(x->des, x->msg_ptr, x->msg_len, x->msg_prio);
269 return NULL;
272 /* runs without GVL */
273 static void * xrecv(void *ptr)
275 struct rw_args *x = ptr;
277 x->received = x->timeout ?
278 mq_timedreceive(x->des, x->msg_ptr, x->msg_len,
279 &x->msg_prio, x->timeout) :
280 mq_receive(x->des, x->msg_ptr, x->msg_len, &x->msg_prio);
282 return NULL;
285 /* called by GC */
286 static void mark(void *ptr)
288 struct posix_mq *mq = ptr;
290 rb_gc_mark(mq->name);
291 rb_gc_mark(mq->thread);
292 MQ_IO_MARK(mq);
295 /* called by GC */
296 static void _free(void *ptr)
298 struct posix_mq *mq = ptr;
300 if (mq->des != MQD_INVALID && MQ_IO_NIL_P(mq)) {
301 /* we ignore errors when gc-ing */
302 mq_close(mq->des);
303 errno = 0;
305 xfree(ptr);
308 /* automatically called at creation (before initialize) */
309 static VALUE alloc(VALUE klass)
311 struct posix_mq *mq;
312 VALUE rv = Data_Make_Struct(klass, struct posix_mq, mark, _free, mq);
314 mq->des = MQD_INVALID;
315 mq->attr.mq_flags = 0;
316 mq->attr.mq_maxmsg = 0;
317 mq->attr.mq_msgsize = -1;
318 mq->attr.mq_curmsgs = 0;
319 mq->name = Qnil;
320 mq->thread = Qnil;
321 MQ_IO_SET(mq, Qnil);
323 return rv;
326 /* unwraps the posix_mq struct from self */
327 static struct posix_mq *get(VALUE self, int need_valid)
329 struct posix_mq *mq;
331 Data_Get_Struct(self, struct posix_mq, mq);
333 if (need_valid && mq->des == MQD_INVALID)
334 rb_raise(rb_eIOError, "closed queue descriptor");
336 return mq;
339 static void check_struct_type(VALUE astruct)
341 if (CLASS_OF(astruct) == cAttr)
342 return;
343 astruct = rb_inspect(astruct);
344 rb_raise(rb_eTypeError, "not a POSIX_MQ::Attr: %s",
345 StringValuePtr(astruct));
348 static void rstruct2mqattr(struct mq_attr *attr, VALUE astruct, int all)
350 VALUE tmp;
352 check_struct_type(astruct);
353 attr->mq_flags = NUM2LONG(rb_funcall(astruct, id_flags, 0));
355 tmp = rb_funcall(astruct, id_maxmsg, 0);
356 if (all || !NIL_P(tmp))
357 attr->mq_maxmsg = NUM2LONG(tmp);
359 tmp = rb_funcall(astruct, id_msgsize, 0);
360 if (all || !NIL_P(tmp))
361 attr->mq_msgsize = NUM2LONG(tmp);
363 tmp = rb_funcall(astruct, id_curmsgs, 0);
364 if (!NIL_P(tmp))
365 attr->mq_curmsgs = NUM2LONG(tmp);
369 * call-seq:
370 * POSIX_MQ.new(name [, flags [, mode [, mq_attr]]) => mq
372 * Opens a POSIX message queue given by +name+. +name+ should start
373 * with a slash ("/") for portable applications.
375 * If a Symbol is given in place of integer +flags+, then:
377 * * +:r+ is equivalent to IO::RDONLY
378 * * +:w+ is equivalent to IO::CREAT|IO::WRONLY
379 * * +:rw+ is equivalent to IO::CREAT|IO::RDWR
381 * +mode+ is an integer and only used when IO::CREAT is used.
382 * +mq_attr+ is a POSIX_MQ::Attr and only used if IO::CREAT is used.
383 * If +mq_attr+ is not specified when creating a queue, then the
384 * system defaults will be used.
386 * See the manpage for mq_open(3) for more details on this function.
388 static VALUE init(int argc, VALUE *argv, VALUE self)
390 struct posix_mq *mq = get(self, 0);
391 struct open_args x;
392 VALUE name, oflags, mode, attr;
394 rb_scan_args(argc, argv, "13", &name, &oflags, &mode, &attr);
396 switch (TYPE(oflags)) {
397 case T_NIL:
398 x.oflags = O_RDONLY;
399 break;
400 case T_SYMBOL:
401 if (oflags == sym_r)
402 x.oflags = O_RDONLY;
403 else if (oflags == sym_w)
404 x.oflags = O_CREAT|O_WRONLY;
405 else if (oflags == sym_rw)
406 x.oflags = O_CREAT|O_RDWR;
407 else {
408 oflags = rb_inspect(oflags);
409 rb_raise(rb_eArgError,
410 "symbol must be :r, :w, or :rw: %s",
411 StringValuePtr(oflags));
413 break;
414 case T_BIGNUM:
415 case T_FIXNUM:
416 x.oflags = NUM2INT(oflags);
417 break;
418 default:
419 rb_raise(rb_eArgError, "flags must be an int, :r, :w, or :wr");
422 x.name = StringValueCStr(name);
423 x.argc = 2;
425 switch (TYPE(mode)) {
426 case T_FIXNUM:
427 x.argc = 3;
428 x.mode = NUM2UINT(mode);
429 break;
430 case T_NIL:
431 if (x.oflags & O_CREAT) {
432 x.argc = 3;
433 x.mode = 0666;
435 break;
436 default:
437 rb_raise(rb_eArgError, "mode not an integer");
440 switch (TYPE(attr)) {
441 case T_STRUCT:
442 x.argc = 4;
443 rstruct2mqattr(&x.attr, attr, 1);
445 /* principle of least surprise */
446 if (x.attr.mq_flags & O_NONBLOCK)
447 x.oflags |= O_NONBLOCK;
448 break;
449 case T_NIL:
450 break;
451 default:
452 check_struct_type(attr);
455 (void)xopen(&x);
456 mq->des = x.des;
457 if (mq->des == MQD_INVALID) {
458 switch (errno) {
459 case ENOMEM:
460 case EMFILE:
461 case ENFILE:
462 case ENOSPC:
463 rb_gc();
464 (void)xopen(&x);
465 mq->des = x.des;
467 if (mq->des == MQD_INVALID)
468 rb_sys_fail("mq_open");
471 mq->name = rb_str_dup(name);
472 if (x.oflags & O_NONBLOCK)
473 mq->attr.mq_flags = O_NONBLOCK;
475 return self;
479 * call-seq:
480 * POSIX_MQ.unlink(name) => 1
482 * Unlinks the message queue given by +name+. The queue will be destroyed
483 * when the last process with the queue open closes its queue descriptors.
485 static VALUE s_unlink(VALUE self, VALUE name)
487 int rv = mq_unlink(StringValueCStr(name));
489 if (rv == -1)
490 rb_sys_fail("mq_unlink");
492 return INT2NUM(1);
496 * call-seq:
497 * mq.unlink => mq
499 * Unlinks the message queue to prevent other processes from accessing it.
500 * All existing queue descriptors to this queue including those opened by
501 * other processes are unaffected. The queue will only be destroyed
502 * when the last process with open descriptors to this queue closes
503 * the descriptors.
505 static VALUE _unlink(VALUE self)
507 struct posix_mq *mq = get(self, 0);
508 int rv;
510 assert(TYPE(mq->name) == T_STRING && "mq->name is not a string");
512 rv = mq_unlink(RSTRING_PTR(mq->name));
513 if (rv == -1)
514 rb_sys_fail("mq_unlink");
516 return self;
519 static void setup_send_buffer(struct rw_args *x, VALUE buffer)
521 buffer = rb_obj_as_string(buffer);
522 x->msg_ptr = RSTRING_PTR(buffer);
523 x->msg_len = (size_t)RSTRING_LEN(buffer);
526 static VALUE _send(int sflags, int argc, VALUE *argv, VALUE self);
529 * call-seq:
530 * mq.send(string [,priority[, timeout]]) => true
532 * Inserts the given +string+ into the message queue with an optional,
533 * unsigned integer +priority+. If the optional +timeout+ is specified,
534 * then Errno::ETIMEDOUT will be raised if the operation cannot complete
535 * before +timeout+ seconds has elapsed. Without +timeout+, this method
536 * may block until the queue is writable.
538 * On some older systems, the +timeout+ argument is not currently
539 * supported and may raise NotImplementedError if +timeout+ is used.
541 static VALUE my_send(int argc, VALUE *argv, VALUE self)
543 return _send(0, argc, argv, self);
546 static VALUE _send(int sflags, int argc, VALUE *argv, VALUE self)
548 struct posix_mq *mq = get(self, 1);
549 struct rw_args x;
550 VALUE buffer, prio, timeout;
551 struct timespec expire;
553 rb_scan_args(argc, argv, "12", &buffer, &prio, &timeout);
555 setup_send_buffer(&x, buffer);
556 x.des = mq->des;
557 x.timeout = convert_timeout(&expire, timeout);
558 x.msg_prio = NIL_P(prio) ? 0 : NUM2UINT(prio);
560 retry:
561 WITHOUT_GVL(xsend, &x, RUBY_UBF_IO, 0);
562 if (x.retval == -1) {
563 if (errno == EINTR)
564 goto retry;
565 if (errno == EAGAIN && (sflags & PMQ_TRY))
566 return Qfalse;
567 rb_sys_fail("mq_send");
570 return Qtrue;
574 * call-seq:
575 * mq << string => mq
577 * Inserts the given +string+ into the message queue with a
578 * default priority of 0 and no timeout.
580 * Returns itself so its calls may be chained. This use is only
581 * recommended only for users who expect blocking behavior from
582 * the queue.
584 static VALUE send0(VALUE self, VALUE buffer)
586 struct posix_mq *mq = get(self, 1);
587 struct rw_args x;
589 setup_send_buffer(&x, buffer);
590 x.des = mq->des;
591 x.timeout = NULL;
592 x.msg_prio = 0;
594 retry:
595 WITHOUT_GVL(xsend, &x, RUBY_UBF_IO, 0);
596 if (x.retval == -1) {
597 if (errno == EINTR)
598 goto retry;
599 rb_sys_fail("mq_send");
602 return self;
605 #ifdef MQD_TO_FD
607 * call-seq:
608 * mq.to_io => IO
610 * Returns an IO.select-able +IO+ object. This method is only available
611 * under Linux and FreeBSD and is not intended to be portable.
613 static VALUE to_io(VALUE self)
615 struct posix_mq *mq = get(self, 1);
616 int fd = MQD_TO_FD(mq->des);
618 if (NIL_P(mq->io))
619 mq->io = rb_funcall(rb_cIO, id_new, 1, INT2NUM(fd));
621 return mq->io;
623 #endif
625 static VALUE _receive(int rflags, int argc, VALUE *argv, VALUE self);
628 * call-seq:
629 * mq.receive([buffer, [timeout]]) => [ message, priority ]
631 * Takes the highest priority message off the queue and returns
632 * an array containing the message as a String and the Integer
633 * priority of the message.
635 * If the optional +buffer+ is present, then it must be a String
636 * which will receive the data.
638 * If the optional +timeout+ is present, then it may be a Float
639 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
640 * will be raised if +timeout+ has elapsed and there are no messages
641 * in the queue.
643 * On some older systems, the +timeout+ argument is not currently
644 * supported and may raise NotImplementedError if +timeout+ is used.
646 static VALUE receive(int argc, VALUE *argv, VALUE self)
648 return _receive(PMQ_WANTARRAY, argc, argv, self);
652 * call-seq:
653 * mq.shift([buffer, [timeout]]) => message
655 * Takes the highest priority message off the queue and returns
656 * the message as a String.
658 * If the optional +buffer+ is present, then it must be a String
659 * which will receive the data.
661 * If the optional +timeout+ is present, then it may be a Float
662 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
663 * will be raised if +timeout+ has elapsed and there are no messages
664 * in the queue.
666 * On some older systems, the +timeout+ argument is not currently
667 * supported and may raise NotImplementedError if +timeout+ is used.
669 static VALUE shift(int argc, VALUE *argv, VALUE self)
671 return _receive(0, argc, argv, self);
674 static VALUE _receive(int rflags, int argc, VALUE *argv, VALUE self)
676 struct posix_mq *mq = get(self, 1);
677 struct rw_args x;
678 VALUE buffer, timeout;
679 struct timespec expire;
681 if (mq->attr.mq_msgsize < 0) {
682 if (mq_getattr(mq->des, &mq->attr) < 0)
683 rb_sys_fail("mq_getattr");
686 rb_scan_args(argc, argv, "02", &buffer, &timeout);
687 x.timeout = convert_timeout(&expire, timeout);
689 if (NIL_P(buffer)) {
690 buffer = rb_str_new(0, mq->attr.mq_msgsize);
691 } else {
692 StringValue(buffer);
693 rb_str_modify(buffer);
694 rb_str_resize(buffer, mq->attr.mq_msgsize);
696 OBJ_TAINT(buffer);
697 x.msg_ptr = RSTRING_PTR(buffer);
698 x.msg_len = (size_t)mq->attr.mq_msgsize;
699 x.des = mq->des;
701 retry:
702 WITHOUT_GVL(xrecv, &x, RUBY_UBF_IO, 0);
703 if (x.received < 0) {
704 if (errno == EINTR)
705 goto retry;
706 if (errno == EAGAIN && (rflags & PMQ_TRY))
707 return Qnil;
708 rb_sys_fail("mq_receive");
711 rb_str_set_len(buffer, x.received);
713 if (rflags & PMQ_WANTARRAY)
714 return rb_ary_new3(2, buffer, UINT2NUM(x.msg_prio));
715 return buffer;
719 * call-seq:
720 * mq.attr => mq_attr
722 * Returns a POSIX_MQ::Attr struct containing the attributes
723 * of the message queue. See the mq_getattr(3) manpage for
724 * more details.
726 static VALUE getattr(VALUE self)
728 struct posix_mq *mq = get(self, 1);
730 if (mq_getattr(mq->des, &mq->attr) < 0)
731 rb_sys_fail("mq_getattr");
733 return rb_funcall(cAttr, id_new, 4,
734 LONG2NUM(mq->attr.mq_flags),
735 LONG2NUM(mq->attr.mq_maxmsg),
736 LONG2NUM(mq->attr.mq_msgsize),
737 LONG2NUM(mq->attr.mq_curmsgs));
741 * call-seq:
742 * mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr
744 * Only the IO::NONBLOCK flag may be set or unset (zero) in this manner.
745 * See the mq_setattr(3) manpage for more details.
747 * Consider using the POSIX_MQ#nonblock= method as it is easier and
748 * more natural to use.
750 static VALUE setattr(VALUE self, VALUE astruct)
752 struct posix_mq *mq = get(self, 1);
753 struct mq_attr newattr;
755 rstruct2mqattr(&newattr, astruct, 0);
757 if (mq_setattr(mq->des, &newattr, NULL) < 0)
758 rb_sys_fail("mq_setattr");
760 return astruct;
764 * call-seq:
765 * mq.close => nil
767 * Closes the underlying message queue descriptor.
768 * If this descriptor had a registered notification request, the request
769 * will be removed so another descriptor or process may register a
770 * notification request. Message queue descriptors are automatically
771 * closed by garbage collection.
773 static VALUE _close(VALUE self)
775 struct posix_mq *mq = get(self, 1);
777 if (! MQ_IO_CLOSE(mq)) {
778 if (mq_close(mq->des) == -1)
779 rb_sys_fail("mq_close");
781 mq->des = MQD_INVALID;
783 return Qnil;
787 * call-seq:
788 * mq.closed? => true or false
790 * Returns +true+ if the message queue descriptor is closed and therefore
791 * unusable, otherwise +false+
793 static VALUE closed(VALUE self)
795 struct posix_mq *mq = get(self, 0);
797 return mq->des == MQD_INVALID ? Qtrue : Qfalse;
801 * call-seq:
802 * mq.name => string
804 * Returns the string name of message queue associated with +mq+
806 static VALUE name(VALUE self)
808 struct posix_mq *mq = get(self, 0);
810 return rb_str_dup(mq->name);
813 static int lookup_sig(VALUE sig)
815 static VALUE list;
816 const char *ptr;
817 long len;
819 sig = rb_obj_as_string(sig);
820 len = RSTRING_LEN(sig);
821 ptr = RSTRING_PTR(sig);
823 if (len > 3 && !memcmp("SIG", ptr, 3))
824 sig = rb_str_new(ptr + 3, len - 3);
826 if (!list) {
827 VALUE mSignal = rb_const_get(rb_cObject, rb_intern("Signal"));
829 list = rb_funcall(mSignal, rb_intern("list"), 0, 0);
830 rb_global_variable(&list);
833 sig = rb_hash_aref(list, sig);
834 if (NIL_P(sig))
835 rb_raise(rb_eArgError, "invalid signal: %s\n", ptr);
837 return NUM2INT(sig);
841 * TODO: Under Linux, we could just use netlink directly
842 * the same way glibc does...
844 /* we spawn a thread just to write ONE byte into an fd (usually a pipe) */
845 static void thread_notify_fd(union sigval sv)
847 int fd = sv.sival_int;
849 while ((write(fd, "", 1) < 0) && (errno == EINTR || errno == EAGAIN));
852 static void my_mq_notify(mqd_t des, struct sigevent *not)
854 int rv = mq_notify(des, not);
856 if (rv == -1) {
857 if (errno == ENOMEM) {
858 rb_gc();
859 rv = mq_notify(des, not);
861 if (rv == -1)
862 rb_sys_fail("mq_notify");
866 static void lower_stack_size(pthread_attr_t *attr)
868 /* some OSes have ridiculously small stack sizes */
869 #ifdef PTHREAD_STACK_MIN
870 size_t stack_size = PTHREAD_STACK_MIN;
871 size_t min_size = 4096;
873 if (stack_size < min_size)
874 stack_size = min_size;
875 pthread_attr_setstacksize(attr, stack_size);
876 #endif
879 /* :nodoc: */
880 static VALUE setnotify_exec(VALUE self, VALUE io, VALUE thr)
882 int fd = NUM2INT(rb_funcall(io, id_fileno, 0, 0));
883 struct posix_mq *mq = get(self, 1);
884 struct sigevent not;
885 pthread_attr_t attr;
887 errno = pthread_attr_init(&attr);
888 if (errno) rb_sys_fail("pthread_attr_init");
890 errno = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
891 if (errno) rb_sys_fail("pthread_attr_setdetachstate");
893 lower_stack_size(&attr);
894 not.sigev_notify = SIGEV_THREAD;
895 not.sigev_notify_function = thread_notify_fd;
896 not.sigev_notify_attributes = &attr;
897 not.sigev_value.sival_int = fd;
899 if (!NIL_P(mq->thread))
900 rb_funcall(mq->thread, id_kill, 0, 0);
901 mq->thread = thr;
903 my_mq_notify(mq->des, &not);
905 return thr;
908 /* :nodoc: */
909 static VALUE notify_cleanup(VALUE self)
911 struct posix_mq *mq = get(self, 1);
913 if (!NIL_P(mq->thread)) {
914 rb_funcall(mq->thread, id_kill, 0, 0);
915 mq->thread = Qnil;
917 return Qnil;
921 * call-seq:
922 * mq.notify = signal => signal
924 * Registers the notification request to deliver a given +signal+
925 * to the current process when message is received.
926 * If +signal+ is +nil+, it will unregister and disable the notification
927 * request to allow other processes to register a request.
928 * If +signal+ is +false+, it will register a no-op notification request
929 * which will prevent other processes from registering a notification.
930 * If +signal+ is an +IO+ object, it will spawn a thread upon the
931 * arrival of the next message and write one "\\0" byte to the file
932 * descriptor belonging to that IO object.
933 * Only one process may have a notification request for a queue
934 * at a time, Errno::EBUSY will be raised if there is already
935 * a notification request registration for the queue.
937 * Notifications are only fired once and processes must reregister
938 * for subsequent notifications.
940 * For readers of the mq_notify(3) manpage, passing +false+
941 * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent
942 * of passing a NULL notification pointer to mq_notify(3).
944 static VALUE setnotify(VALUE self, VALUE arg)
946 struct posix_mq *mq = get(self, 1);
947 struct sigevent not;
948 struct sigevent * notification = &not;
949 VALUE rv = arg;
951 notify_cleanup(self);
952 not.sigev_notify = SIGEV_SIGNAL;
954 switch (TYPE(arg)) {
955 case T_FALSE:
956 not.sigev_notify = SIGEV_NONE;
957 break;
958 case T_NIL:
959 notification = NULL;
960 break;
961 case T_FIXNUM:
962 not.sigev_signo = NUM2INT(arg);
963 break;
964 case T_SYMBOL:
965 case T_STRING:
966 not.sigev_signo = lookup_sig(arg);
967 rv = INT2NUM(not.sigev_signo);
968 break;
969 default:
970 rb_raise(rb_eArgError, "must be a signal or nil");
973 my_mq_notify(mq->des, notification);
975 return rv;
979 * call-seq:
980 * mq.nonblock? => true or false
982 * Returns the current non-blocking state of the message queue descriptor.
984 static VALUE nonblock_p(VALUE self)
986 struct posix_mq *mq = get(self, 1);
988 if (mq_getattr(mq->des, &mq->attr) < 0)
989 rb_sys_fail("mq_getattr");
990 return mq->attr.mq_flags & O_NONBLOCK ? Qtrue : Qfalse;
994 * call-seq:
995 * mq.nonblock = boolean => boolean
997 * Enables or disables non-blocking operation for the message queue
998 * descriptor. Errno::EAGAIN will be raised in situations where
999 * the queue would block. This is not compatible with +timeout+
1000 * arguments to POSIX_MQ#send and POSIX_MQ#receive.
1002 static VALUE setnonblock(VALUE self, VALUE nb)
1004 struct mq_attr newattr;
1005 struct posix_mq *mq = get(self, 1);
1007 if (nb == Qtrue)
1008 newattr.mq_flags = O_NONBLOCK;
1009 else if (nb == Qfalse)
1010 newattr.mq_flags = 0;
1011 else
1012 rb_raise(rb_eArgError, "must be true or false");
1014 if (mq_setattr(mq->des, &newattr, &mq->attr) < 0)
1015 rb_sys_fail("mq_setattr");
1017 mq->attr.mq_flags = newattr.mq_flags;
1019 return nb;
1023 * call-seq:
1024 * mq.trysend(string [,priority[, timeout]]) => +true+ or +false+
1026 * Exactly like POSIX_MQ#send, except it returns +false+ instead of raising
1027 * Errno::EAGAIN when non-blocking operation is desired and returns +true+
1028 * on success instead of +nil+.
1030 * This does not guarantee non-blocking behavior, the message queue must
1031 * be made non-blocking before calling this method.
1033 static VALUE trysend(int argc, VALUE *argv, VALUE self)
1035 return _send(PMQ_TRY, argc, argv, self);
1039 * call-seq:
1040 * mq.tryshift([buffer [, timeout]]) => message or nil
1042 * Exactly like POSIX_MQ#shift, except it returns +nil+ instead of raising
1043 * Errno::EAGAIN when non-blocking operation is desired.
1045 * This does not guarantee non-blocking behavior, the message queue must
1046 * be made non-blocking before calling this method.
1048 static VALUE tryshift(int argc, VALUE *argv, VALUE self)
1050 return _receive(PMQ_TRY, argc, argv, self);
1054 * call-seq:
1055 * mq.tryreceive([buffer [, timeout]]) => [ message, priority ] or nil
1057 * Exactly like POSIX_MQ#receive, except it returns +nil+ instead of raising
1058 * Errno::EAGAIN when non-blocking operation is desired.
1060 * This does not guarantee non-blocking behavior, the message queue must
1061 * be made non-blocking before calling this method.
1063 static VALUE tryreceive(int argc, VALUE *argv, VALUE self)
1065 return _receive(PMQ_WANTARRAY|PMQ_TRY, argc, argv, self);
1068 void Init_posix_mq_ext(void)
1070 VALUE cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject);
1071 rb_define_alloc_func(cPOSIX_MQ, alloc);
1072 cAttr = rb_const_get(cPOSIX_MQ, rb_intern("Attr"));
1075 * The maximum number of open message descriptors supported
1076 * by the system. This may be -1, in which case it is dynamically
1077 * set at runtime. Consult your operating system documentation
1078 * for system-specific information about this.
1080 rb_define_const(cPOSIX_MQ, "OPEN_MAX",
1081 LONG2NUM(sysconf(_SC_MQ_OPEN_MAX)));
1084 * The maximum priority that may be specified for POSIX_MQ#send
1085 * On POSIX-compliant systems, this is at least 31, but some
1086 * systems allow higher limits.
1087 * The minimum priority is always zero.
1089 rb_define_const(cPOSIX_MQ, "PRIO_MAX",
1090 LONG2NUM(sysconf(_SC_MQ_PRIO_MAX)));
1092 rb_define_singleton_method(cPOSIX_MQ, "unlink", s_unlink, 1);
1094 rb_define_private_method(cPOSIX_MQ, "initialize", init, -1);
1095 rb_define_method(cPOSIX_MQ, "send", my_send, -1);
1096 rb_define_method(cPOSIX_MQ, "<<", send0, 1);
1097 rb_define_method(cPOSIX_MQ, "trysend", trysend, -1);
1098 rb_define_method(cPOSIX_MQ, "receive", receive, -1);
1099 rb_define_method(cPOSIX_MQ, "tryreceive", tryreceive, -1);
1100 rb_define_method(cPOSIX_MQ, "shift", shift, -1);
1101 rb_define_method(cPOSIX_MQ, "tryshift", tryshift, -1);
1102 rb_define_method(cPOSIX_MQ, "attr", getattr, 0);
1103 rb_define_method(cPOSIX_MQ, "attr=", setattr, 1);
1104 rb_define_method(cPOSIX_MQ, "close", _close, 0);
1105 rb_define_method(cPOSIX_MQ, "closed?", closed, 0);
1106 rb_define_method(cPOSIX_MQ, "unlink", _unlink, 0);
1107 rb_define_method(cPOSIX_MQ, "name", name, 0);
1108 rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1);
1109 rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1);
1110 rb_define_private_method(cPOSIX_MQ, "notify_exec", setnotify_exec, 2);
1111 rb_define_private_method(cPOSIX_MQ, "notify_cleanup", notify_cleanup, 0);
1112 rb_define_method(cPOSIX_MQ, "nonblock?", nonblock_p, 0);
1113 #ifdef MQD_TO_FD
1114 rb_define_method(cPOSIX_MQ, "to_io", to_io, 0);
1115 #endif
1117 id_new = rb_intern("new");
1118 id_kill = rb_intern("kill");
1119 id_fileno = rb_intern("fileno");
1120 id_divmod = rb_intern("divmod");
1121 id_flags = rb_intern("flags");
1122 id_maxmsg = rb_intern("maxmsg");
1123 id_msgsize = rb_intern("msgsize");
1124 id_curmsgs = rb_intern("curmsgs");
1125 sym_r = ID2SYM(rb_intern("r"));
1126 sym_w = ID2SYM(rb_intern("w"));
1127 sym_rw = ID2SYM(rb_intern("rw"));