run GC on ENOSPC when calling mq_open()
[ruby_posix_mq.git] / ext / posix_mq / posix_mq.c
blob05f407deb10fc172e5af014372c7ab3302c7cc50
1 #define _XOPEN_SOURCE 600
2 #ifdef HAVE_SYS_SELECT_H
3 # include <sys/select.h>
4 #endif
5 #ifdef HAVE_SIGNAL_H
6 # include <signal.h>
7 #endif
8 #ifdef HAVE_PTHREAD_H
9 # include <pthread.h>
10 #endif
11 #include <ruby.h>
13 #ifndef NUM2TIMET
14 # define NUM2TIMET NUM2INT
15 #endif
17 #include <time.h>
18 #include <mqueue.h>
19 #include <fcntl.h>
20 #include <sys/stat.h>
21 #include <errno.h>
22 #include <assert.h>
23 #include <unistd.h>
24 #include <float.h>
25 #include <math.h>
27 #if defined(__linux__)
28 # define MQD_TO_FD(mqd) (int)(mqd)
29 #elif defined(HAVE___MQ_OSHANDLE) /* FreeBSD */
30 # define MQD_TO_FD(mqd) __mq_oshandle(mqd)
31 #else
32 # define MQ_IO_MARK(mq) ((void)(0))
33 # define MQ_IO_SET(mq,val) ((void)(0))
34 # define MQ_IO_CLOSE(mq) ((int)(0))
35 # define MQ_IO_NIL_P(mq) ((int)(1))
36 #endif
38 struct posix_mq {
39 mqd_t des;
40 struct mq_attr attr;
41 VALUE name;
42 VALUE thread;
43 #ifdef MQD_TO_FD
44 VALUE io;
45 #endif
48 #ifdef MQD_TO_FD
49 # define MQ_IO_MARK(mq) rb_gc_mark((mq)->io)
50 # define MQ_IO_SET(mq,val) do { (mq)->io = (val); } while (0)
51 # define MQ_IO_NIL_P(mq) NIL_P((mq)->io)
52 static int MQ_IO_CLOSE(struct posix_mq *mq)
54 if (NIL_P(mq->io))
55 return 0;
57 /* not safe during GC */
58 rb_io_close(mq->io);
59 mq->io = Qnil;
61 return 1;
63 #endif
65 static VALUE cPOSIX_MQ, cAttr;
66 static ID id_new, id_kill, id_fileno, id_mul, id_divmod;
67 static ID id_flags, id_maxmsg, id_msgsize, id_curmsgs;
68 static VALUE sym_r, sym_w, sym_rw;
69 static const mqd_t MQD_INVALID = (mqd_t)-1;
71 /* Ruby 1.8.6+ macros (for compatibility with Ruby 1.9) */
72 #ifndef RSTRING_PTR
73 # define RSTRING_PTR(s) (RSTRING(s)->ptr)
74 #endif
75 #ifndef RSTRING_LEN
76 # define RSTRING_LEN(s) (RSTRING(s)->len)
77 #endif
78 #ifndef RFLOAT_VALUE
79 # define RFLOAT_VALUE(f) (RFLOAT(f)->value)
80 #endif
82 #ifndef HAVE_RB_STR_SET_LEN
83 /* this is taken from Ruby 1.8.7, 1.8.6 may not have it */
84 # ifdef RUBINIUS
85 # error upgrade Rubinius, rb_str_set_len should be available
86 # endif
87 static void rb_18_str_set_len(VALUE str, long len)
89 RSTRING(str)->len = len;
90 RSTRING(str)->ptr[len] = '\0';
92 #define rb_str_set_len rb_18_str_set_len
93 #endif /* !defined(HAVE_RB_STR_SET_LEN) */
95 /* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
96 #ifndef HAVE_RB_THREAD_BLOCKING_REGION
97 # include <rubysig.h>
98 # define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
99 typedef void rb_unblock_function_t(void *);
100 typedef VALUE rb_blocking_function_t(void *);
101 static VALUE
102 rb_thread_blocking_region(
103 rb_blocking_function_t *func, void *data1,
104 rb_unblock_function_t *ubf, void *data2)
106 VALUE rv;
108 assert(RUBY_UBF_IO == ubf && "RUBY_UBF_IO required for emulation");
110 TRAP_BEG;
111 rv = func(data1);
112 TRAP_END;
114 return rv;
116 #endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
118 /* used to pass arguments to mq_open inside blocking region */
119 struct open_args {
120 int argc;
121 const char *name;
122 int oflags;
123 mode_t mode;
124 struct mq_attr attr;
127 /* used to pass arguments to mq_send/mq_receive inside blocking region */
128 struct rw_args {
129 mqd_t des;
130 char *msg_ptr;
131 size_t msg_len;
132 unsigned msg_prio;
133 struct timespec *timeout;
136 #ifndef HAVE_MQ_TIMEDSEND
137 static mqd_t
138 not_timedsend(mqd_t mqdes, const char *msg_ptr,
139 size_t msg_len, unsigned msg_prio,
140 const struct timespec *abs_timeout)
142 rb_bug("mq_timedsend workaround failed");
143 return (mqd_t)-1;
145 # define mq_timedsend not_timedsend
146 #endif
147 #ifndef HAVE_MQ_TIMEDRECEIVE
148 static ssize_t
149 not_timedreceive(mqd_t mqdes, char *msg_ptr,
150 size_t msg_len, unsigned *msg_prio,
151 const struct timespec *abs_timeout)
153 rb_bug("mq_timedreceive workaround failed");
154 return (mqd_t)-1;
156 # define mq_timedreceive not_timedreceive
157 #endif
159 #if defined(HAVE_MQ_TIMEDRECEIVE) && defined(HAVE_MQ_TIMEDSEND)
160 static void num2timespec(struct timespec *ts, VALUE t)
162 switch (TYPE(t)) {
163 case T_FIXNUM:
164 case T_BIGNUM:
165 ts->tv_sec = NUM2TIMET(t);
166 ts->tv_nsec = 0;
167 break;
168 case T_FLOAT: {
169 double f, d;
170 double val = RFLOAT_VALUE(t);
172 d = modf(val, &f);
173 if (d >= 0) {
174 ts->tv_nsec = (long)(d * 1e9 + 0.5);
175 } else {
176 ts->tv_nsec = (long)(-d * 1e9 + 0.5);
177 if (ts->tv_nsec > 0) {
178 ts->tv_nsec = 1000000000 - ts->tv_nsec;
179 f -= 1;
182 ts->tv_sec = (time_t)f;
183 if (f != ts->tv_sec)
184 rb_raise(rb_eRangeError, "%f out of range", val);
185 ts->tv_sec = (time_t)f;
187 break;
188 default: {
189 VALUE f;
190 VALUE ary = rb_funcall(t, id_divmod, 1, INT2FIX(1));
192 Check_Type(ary, T_ARRAY);
194 ts->tv_sec = NUM2TIMET(rb_ary_entry(ary, 0));
195 f = rb_ary_entry(ary, 1);
196 f = rb_funcall(f, id_mul, 1, INT2FIX(1000000000));
197 ts->tv_nsec = NUM2LONG(f);
201 #else
202 static void num2timespec(struct timespec *ts, VALUE t)
204 rb_raise(rb_eNotImpError,
205 "mq_timedsend and/or mq_timedreceive missing");
207 #endif
209 static struct timespec *convert_timeout(struct timespec *dest, VALUE t)
211 struct timespec ts, now;
213 if (NIL_P(t))
214 return NULL;
216 num2timespec(&ts, t);
217 clock_gettime(CLOCK_REALTIME, &now);
218 dest->tv_sec = now.tv_sec + ts.tv_sec;
219 dest->tv_nsec = now.tv_nsec + ts.tv_nsec;
221 if (dest->tv_nsec > 1000000000) {
222 dest->tv_nsec -= 1000000000;
223 ++dest->tv_sec;
226 return dest;
229 /* (may) run without GVL */
230 static VALUE xopen(void *ptr)
232 struct open_args *x = ptr;
233 mqd_t rv;
235 switch (x->argc) {
236 case 2: rv = mq_open(x->name, x->oflags); break;
237 case 3: rv = mq_open(x->name, x->oflags, x->mode, NULL); break;
238 case 4: rv = mq_open(x->name, x->oflags, x->mode, &x->attr); break;
239 default: rv = MQD_INVALID;
242 return (VALUE)rv;
245 /* runs without GVL */
246 static VALUE xsend(void *ptr)
248 struct rw_args *x = ptr;
250 if (x->timeout)
251 return (VALUE)mq_timedsend(x->des, x->msg_ptr, x->msg_len,
252 x->msg_prio, x->timeout);
254 return (VALUE)mq_send(x->des, x->msg_ptr, x->msg_len, x->msg_prio);
257 /* runs without GVL */
258 static VALUE xrecv(void *ptr)
260 struct rw_args *x = ptr;
262 if (x->timeout)
263 return (VALUE)mq_timedreceive(x->des, x->msg_ptr, x->msg_len,
264 &x->msg_prio, x->timeout);
266 return (VALUE)mq_receive(x->des, x->msg_ptr, x->msg_len, &x->msg_prio);
269 /* called by GC */
270 static void mark(void *ptr)
272 struct posix_mq *mq = ptr;
274 rb_gc_mark(mq->name);
275 rb_gc_mark(mq->thread);
276 MQ_IO_MARK(mq);
279 /* called by GC */
280 static void _free(void *ptr)
282 struct posix_mq *mq = ptr;
284 if (mq->des != MQD_INVALID && MQ_IO_NIL_P(mq)) {
285 /* we ignore errors when gc-ing */
286 mq_close(mq->des);
287 errno = 0;
289 xfree(ptr);
292 /* automatically called at creation (before initialize) */
293 static VALUE alloc(VALUE klass)
295 struct posix_mq *mq;
296 VALUE rv = Data_Make_Struct(klass, struct posix_mq, mark, _free, mq);
298 mq->des = MQD_INVALID;
299 mq->attr.mq_flags = 0;
300 mq->attr.mq_maxmsg = 0;
301 mq->attr.mq_msgsize = -1;
302 mq->attr.mq_curmsgs = 0;
303 mq->name = Qnil;
304 mq->thread = Qnil;
305 MQ_IO_SET(mq, Qnil);
307 return rv;
310 /* unwraps the posix_mq struct from self */
311 static struct posix_mq *get(VALUE self, int need_valid)
313 struct posix_mq *mq;
315 Data_Get_Struct(self, struct posix_mq, mq);
317 if (need_valid && mq->des == MQD_INVALID)
318 rb_raise(rb_eIOError, "closed queue descriptor");
320 return mq;
323 static void check_struct_type(VALUE astruct)
325 if (CLASS_OF(astruct) == cAttr)
326 return;
327 astruct = rb_inspect(astruct);
328 rb_raise(rb_eTypeError, "not a POSIX_MQ::Attr: %s",
329 StringValuePtr(astruct));
332 static void rstruct2mqattr(struct mq_attr *attr, VALUE astruct, int all)
334 VALUE tmp;
336 check_struct_type(astruct);
337 attr->mq_flags = NUM2LONG(rb_funcall(astruct, id_flags, 0));
339 tmp = rb_funcall(astruct, id_maxmsg, 0);
340 if (all || !NIL_P(tmp))
341 attr->mq_maxmsg = NUM2LONG(tmp);
343 tmp = rb_funcall(astruct, id_msgsize, 0);
344 if (all || !NIL_P(tmp))
345 attr->mq_msgsize = NUM2LONG(tmp);
347 tmp = rb_funcall(astruct, id_curmsgs, 0);
348 if (!NIL_P(tmp))
349 attr->mq_curmsgs = NUM2LONG(tmp);
353 * call-seq:
354 * POSIX_MQ.new(name [, flags [, mode [, mq_attr]]) => mq
356 * Opens a POSIX message queue given by +name+. +name+ should start
357 * with a slash ("/") for portable applications.
359 * If a Symbol is given in place of integer +flags+, then:
361 * * +:r+ is equivalent to IO::RDONLY
362 * * +:w+ is equivalent to IO::CREAT|IO::WRONLY
363 * * +:rw+ is equivalent to IO::CREAT|IO::RDWR
365 * +mode+ is an integer and only used when IO::CREAT is used.
366 * +mq_attr+ is a POSIX_MQ::Attr and only used if IO::CREAT is used.
367 * If +mq_attr+ is not specified when creating a queue, then the
368 * system defaults will be used.
370 * See the manpage for mq_open(3) for more details on this function.
372 static VALUE init(int argc, VALUE *argv, VALUE self)
374 struct posix_mq *mq = get(self, 0);
375 struct open_args x;
376 VALUE name, oflags, mode, attr;
378 rb_scan_args(argc, argv, "13", &name, &oflags, &mode, &attr);
380 switch (TYPE(oflags)) {
381 case T_NIL:
382 x.oflags = O_RDONLY;
383 break;
384 case T_SYMBOL:
385 if (oflags == sym_r)
386 x.oflags = O_RDONLY;
387 else if (oflags == sym_w)
388 x.oflags = O_CREAT|O_WRONLY;
389 else if (oflags == sym_rw)
390 x.oflags = O_CREAT|O_RDWR;
391 else {
392 oflags = rb_inspect(oflags);
393 rb_raise(rb_eArgError,
394 "symbol must be :r, :w, or :rw: %s",
395 StringValuePtr(oflags));
397 break;
398 case T_BIGNUM:
399 case T_FIXNUM:
400 x.oflags = NUM2INT(oflags);
401 break;
402 default:
403 rb_raise(rb_eArgError, "flags must be an int, :r, :w, or :wr");
406 x.name = StringValueCStr(name);
407 x.argc = 2;
409 switch (TYPE(mode)) {
410 case T_FIXNUM:
411 x.argc = 3;
412 x.mode = NUM2UINT(mode);
413 break;
414 case T_NIL:
415 if (x.oflags & O_CREAT) {
416 x.argc = 3;
417 x.mode = 0666;
419 break;
420 default:
421 rb_raise(rb_eArgError, "mode not an integer");
424 switch (TYPE(attr)) {
425 case T_STRUCT:
426 x.argc = 4;
427 rstruct2mqattr(&x.attr, attr, 1);
429 /* principle of least surprise */
430 if (x.attr.mq_flags & O_NONBLOCK)
431 x.oflags |= O_NONBLOCK;
432 break;
433 case T_NIL:
434 break;
435 default:
436 check_struct_type(attr);
439 mq->des = (mqd_t)xopen(&x);
440 if (mq->des == MQD_INVALID) {
441 switch (errno) {
442 case ENOMEM:
443 case EMFILE:
444 case ENFILE:
445 case ENOSPC:
446 rb_gc();
447 mq->des = (mqd_t)xopen(&x);
449 if (mq->des == MQD_INVALID)
450 rb_sys_fail("mq_open");
453 mq->name = rb_str_dup(name);
454 if (x.oflags & O_NONBLOCK)
455 mq->attr.mq_flags = O_NONBLOCK;
457 return self;
461 * call-seq:
462 * POSIX_MQ.unlink(name) => 1
464 * Unlinks the message queue given by +name+. The queue will be destroyed
465 * when the last process with the queue open closes its queue descriptors.
467 static VALUE s_unlink(VALUE self, VALUE name)
469 mqd_t rv = mq_unlink(StringValueCStr(name));
471 if (rv == MQD_INVALID)
472 rb_sys_fail("mq_unlink");
474 return INT2NUM(1);
478 * call-seq:
479 * mq.unlink => mq
481 * Unlinks the message queue to prevent other processes from accessing it.
482 * All existing queue descriptors to this queue including those opened by
483 * other processes are unaffected. The queue will only be destroyed
484 * when the last process with open descriptors to this queue closes
485 * the descriptors.
487 static VALUE _unlink(VALUE self)
489 struct posix_mq *mq = get(self, 0);
490 mqd_t rv;
492 assert(TYPE(mq->name) == T_STRING && "mq->name is not a string");
494 rv = mq_unlink(RSTRING_PTR(mq->name));
495 if (rv == MQD_INVALID)
496 rb_sys_fail("mq_unlink");
498 return self;
501 static void setup_send_buffer(struct rw_args *x, VALUE buffer)
503 buffer = rb_obj_as_string(buffer);
504 x->msg_ptr = RSTRING_PTR(buffer);
505 x->msg_len = (size_t)RSTRING_LEN(buffer);
509 * call-seq:
510 * mq.send(string [,priority[, timeout]]) => nil
512 * Inserts the given +string+ into the message queue with an optional,
513 * unsigned integer +priority+. If the optional +timeout+ is specified,
514 * then Errno::ETIMEDOUT will be raised if the operation cannot complete
515 * before +timeout+ seconds has elapsed. Without +timeout+, this method
516 * may block until the queue is writable.
518 * On some older systems, the +timeout+ argument is not currently
519 * supported and may raise NotImplementedError if +timeout+ is used.
521 static VALUE _send(int argc, VALUE *argv, VALUE self)
523 struct posix_mq *mq = get(self, 1);
524 struct rw_args x;
525 VALUE buffer, prio, timeout;
526 mqd_t rv;
527 struct timespec expire;
529 rb_scan_args(argc, argv, "12", &buffer, &prio, &timeout);
531 setup_send_buffer(&x, buffer);
532 x.des = mq->des;
533 x.timeout = convert_timeout(&expire, timeout);
534 x.msg_prio = NIL_P(prio) ? 0 : NUM2UINT(prio);
536 rv = (mqd_t)rb_thread_blocking_region(xsend, &x, RUBY_UBF_IO, 0);
537 if (rv == MQD_INVALID)
538 rb_sys_fail("mq_send");
540 return Qnil;
544 * call-seq:
545 * mq << string => mq
547 * Inserts the given +string+ into the message queue with a
548 * default priority of 0 and no timeout.
550 static VALUE send0(VALUE self, VALUE buffer)
552 struct posix_mq *mq = get(self, 1);
553 struct rw_args x;
554 mqd_t rv;
556 setup_send_buffer(&x, buffer);
557 x.des = mq->des;
558 x.timeout = NULL;
559 x.msg_prio = 0;
561 rv = (mqd_t)rb_thread_blocking_region(xsend, &x, RUBY_UBF_IO, 0);
562 if (rv == MQD_INVALID)
563 rb_sys_fail("mq_send");
565 return self;
568 #ifdef MQD_TO_FD
570 * call-seq:
571 * mq.to_io => IO
573 * Returns an IO.select-able +IO+ object. This method is only available
574 * under Linux and FreeBSD and is not intended to be portable.
576 static VALUE to_io(VALUE self)
578 struct posix_mq *mq = get(self, 1);
579 int fd = MQD_TO_FD(mq->des);
581 if (NIL_P(mq->io))
582 mq->io = rb_funcall(rb_cIO, id_new, 1, INT2NUM(fd));
584 return mq->io;
586 #endif
588 static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self);
591 * call-seq:
592 * mq.receive([buffer, [timeout]]) => [ message, priority ]
594 * Takes the highest priority message off the queue and returns
595 * an array containing the message as a String and the Integer
596 * priority of the message.
598 * If the optional +buffer+ is present, then it must be a String
599 * which will receive the data.
601 * If the optional +timeout+ is present, then it may be a Float
602 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
603 * will be raised if +timeout+ has elapsed and there are no messages
604 * in the queue.
606 * On some older systems, the +timeout+ argument is not currently
607 * supported and may raise NotImplementedError if +timeout+ is used.
609 static VALUE receive(int argc, VALUE *argv, VALUE self)
611 return _receive(1, argc, argv, self);
615 * call-seq:
616 * mq.shift([buffer, [timeout]]) => message
618 * Takes the highest priority message off the queue and returns
619 * the message as a String.
621 * If the optional +buffer+ is present, then it must be a String
622 * which will receive the data.
624 * If the optional +timeout+ is present, then it may be a Float
625 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
626 * will be raised if +timeout+ has elapsed and there are no messages
627 * in the queue.
629 * On some older systems, the +timeout+ argument is not currently
630 * supported and may raise NotImplementedError if +timeout+ is used.
632 static VALUE shift(int argc, VALUE *argv, VALUE self)
634 return _receive(0, argc, argv, self);
637 static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self)
639 struct posix_mq *mq = get(self, 1);
640 struct rw_args x;
641 VALUE buffer, timeout;
642 ssize_t r;
643 struct timespec expire;
645 if (mq->attr.mq_msgsize < 0) {
646 if (mq_getattr(mq->des, &mq->attr) < 0)
647 rb_sys_fail("mq_getattr");
650 rb_scan_args(argc, argv, "02", &buffer, &timeout);
651 x.timeout = convert_timeout(&expire, timeout);
653 if (NIL_P(buffer)) {
654 buffer = rb_str_new(0, mq->attr.mq_msgsize);
655 } else {
656 StringValue(buffer);
657 rb_str_modify(buffer);
658 rb_str_resize(buffer, mq->attr.mq_msgsize);
660 OBJ_TAINT(buffer);
661 x.msg_ptr = RSTRING_PTR(buffer);
662 x.msg_len = (size_t)mq->attr.mq_msgsize;
663 x.des = mq->des;
665 r = (ssize_t)rb_thread_blocking_region(xrecv, &x, RUBY_UBF_IO, 0);
666 if (r < 0)
667 rb_sys_fail("mq_receive");
669 rb_str_set_len(buffer, r);
671 if (wantarray)
672 return rb_ary_new3(2, buffer, UINT2NUM(x.msg_prio));
673 return buffer;
677 * call-seq:
678 * mq.attr => mq_attr
680 * Returns a POSIX_MQ::Attr struct containing the attributes
681 * of the message queue. See the mq_getattr(3) manpage for
682 * more details.
684 static VALUE getattr(VALUE self)
686 struct posix_mq *mq = get(self, 1);
687 VALUE astruct;
689 if (mq_getattr(mq->des, &mq->attr) < 0)
690 rb_sys_fail("mq_getattr");
692 return rb_funcall(cAttr, id_new, 4,
693 LONG2NUM(mq->attr.mq_flags),
694 LONG2NUM(mq->attr.mq_maxmsg),
695 LONG2NUM(mq->attr.mq_msgsize),
696 LONG2NUM(mq->attr.mq_curmsgs));
700 * call-seq:
701 * mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr
703 * Only the IO::NONBLOCK flag may be set or unset (zero) in this manner.
704 * See the mq_setattr(3) manpage for more details.
706 * Consider using the POSIX_MQ#nonblock= method as it is easier and
707 * more natural to use.
709 static VALUE setattr(VALUE self, VALUE astruct)
711 struct posix_mq *mq = get(self, 1);
712 struct mq_attr newattr;
714 rstruct2mqattr(&newattr, astruct, 0);
716 if (mq_setattr(mq->des, &newattr, NULL) < 0)
717 rb_sys_fail("mq_setattr");
719 return astruct;
723 * call-seq:
724 * mq.close => nil
726 * Closes the underlying message queue descriptor.
727 * If this descriptor had a registered notification request, the request
728 * will be removed so another descriptor or process may register a
729 * notification request. Message queue descriptors are automatically
730 * closed by garbage collection.
732 static VALUE _close(VALUE self)
734 struct posix_mq *mq = get(self, 1);
736 if (! MQ_IO_CLOSE(mq)) {
737 if (mq_close(mq->des) == -1)
738 rb_sys_fail("mq_close");
740 mq->des = MQD_INVALID;
742 return Qnil;
746 * call-seq:
747 * mq.closed? => true or false
749 * Returns +true+ if the message queue descriptor is closed and therefore
750 * unusable, otherwise +false+
752 static VALUE closed(VALUE self)
754 struct posix_mq *mq = get(self, 0);
756 return mq->des == MQD_INVALID ? Qtrue : Qfalse;
760 * call-seq:
761 * mq.name => string
763 * Returns the string name of message queue associated with +mq+
765 static VALUE name(VALUE self)
767 struct posix_mq *mq = get(self, 0);
769 return rb_str_dup(mq->name);
772 static int lookup_sig(VALUE sig)
774 static VALUE list;
775 const char *ptr;
776 long len;
778 sig = rb_obj_as_string(sig);
779 len = RSTRING_LEN(sig);
780 ptr = RSTRING_PTR(sig);
782 if (len > 3 && !memcmp("SIG", ptr, 3))
783 sig = rb_str_new(ptr + 3, len - 3);
785 if (!list) {
786 VALUE mSignal = rb_const_get(rb_cObject, rb_intern("Signal"));
788 list = rb_funcall(mSignal, rb_intern("list"), 0, 0);
789 rb_global_variable(&list);
792 sig = rb_hash_aref(list, sig);
793 if (NIL_P(sig))
794 rb_raise(rb_eArgError, "invalid signal: %s\n", ptr);
796 return NUM2INT(sig);
800 * TODO: Under Linux, we could just use netlink directly
801 * the same way glibc does...
803 /* we spawn a thread just to write ONE byte into an fd (usually a pipe) */
804 static void thread_notify_fd(union sigval sv)
806 int fd = sv.sival_int;
808 while ((write(fd, "", 1) < 0) && (errno == EINTR || errno == EAGAIN));
811 static void my_mq_notify(mqd_t des, struct sigevent *not)
813 mqd_t rv = mq_notify(des, not);
815 if (rv == MQD_INVALID) {
816 if (errno == ENOMEM) {
817 rb_gc();
818 rv = mq_notify(des, not);
820 if (rv == MQD_INVALID)
821 rb_sys_fail("mq_notify");
825 /* :nodoc: */
826 static VALUE setnotify_exec(VALUE self, VALUE io, VALUE thr)
828 int fd = NUM2INT(rb_funcall(io, id_fileno, 0, 0));
829 struct posix_mq *mq = get(self, 1);
830 struct sigevent not;
831 pthread_attr_t attr;
833 errno = pthread_attr_init(&attr);
834 if (errno) rb_sys_fail("pthread_attr_init");
836 errno = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
837 if (errno) rb_sys_fail("pthread_attr_setdetachstate");
839 #ifdef PTHREAD_STACK_MIN
840 (void)pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
841 #endif
843 not.sigev_notify = SIGEV_THREAD;
844 not.sigev_notify_function = thread_notify_fd;
845 not.sigev_notify_attributes = &attr;
846 not.sigev_value.sival_int = fd;
848 if (!NIL_P(mq->thread))
849 rb_funcall(mq->thread, id_kill, 0, 0);
850 mq->thread = thr;
852 my_mq_notify(mq->des, &not);
854 return thr;
857 /* :nodoc: */
858 static VALUE notify_cleanup(VALUE self)
860 struct posix_mq *mq = get(self, 1);
862 if (!NIL_P(mq->thread)) {
863 rb_funcall(mq->thread, id_kill, 0, 0);
864 mq->thread = Qnil;
866 return Qnil;
870 * call-seq:
871 * mq.notify = signal => signal
873 * Registers the notification request to deliver a given +signal+
874 * to the current process when message is received.
875 * If +signal+ is +nil+, it will unregister and disable the notification
876 * request to allow other processes to register a request.
877 * If +signal+ is +false+, it will register a no-op notification request
878 * which will prevent other processes from registering a notification.
879 * If +signal+ is an +IO+ object, it will spawn a thread upon the
880 * arrival of the next message and write one "\\0" byte to the file
881 * descriptor belonging to that IO object.
882 * Only one process may have a notification request for a queue
883 * at a time, Errno::EBUSY will be raised if there is already
884 * a notification request registration for the queue.
886 * Notifications are only fired once and processes must reregister
887 * for subsequent notifications.
889 * For readers of the mq_notify(3) manpage, passing +false+
890 * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent
891 * of passing a NULL notification pointer to mq_notify(3).
893 static VALUE setnotify(VALUE self, VALUE arg)
895 struct posix_mq *mq = get(self, 1);
896 struct sigevent not;
897 struct sigevent * notification = &not;
898 VALUE rv = arg;
900 notify_cleanup(self);
901 not.sigev_notify = SIGEV_SIGNAL;
903 switch (TYPE(arg)) {
904 case T_FALSE:
905 not.sigev_notify = SIGEV_NONE;
906 break;
907 case T_NIL:
908 notification = NULL;
909 break;
910 case T_FIXNUM:
911 not.sigev_signo = NUM2INT(arg);
912 break;
913 case T_SYMBOL:
914 case T_STRING:
915 not.sigev_signo = lookup_sig(arg);
916 rv = INT2NUM(not.sigev_signo);
917 break;
918 default:
919 rb_raise(rb_eArgError, "must be a signal or nil");
922 my_mq_notify(mq->des, notification);
924 return rv;
928 * call-seq:
929 * mq.nonblock? => true or false
931 * Returns the current non-blocking state of the message queue descriptor.
933 static VALUE nonblock_p(VALUE self)
935 struct posix_mq *mq = get(self, 1);
937 if (mq_getattr(mq->des, &mq->attr) < 0)
938 rb_sys_fail("mq_getattr");
939 return mq->attr.mq_flags & O_NONBLOCK ? Qtrue : Qfalse;
943 * call-seq:
944 * mq.nonblock = boolean => boolean
946 * Enables or disables non-blocking operation for the message queue
947 * descriptor. Errno::EAGAIN will be raised in situations where
948 * the queue would block. This is not compatible with +timeout+
949 * arguments to POSIX_MQ#send and POSIX_MQ#receive.
951 static VALUE setnonblock(VALUE self, VALUE nb)
953 struct mq_attr newattr;
954 struct posix_mq *mq = get(self, 1);
956 if (nb == Qtrue)
957 newattr.mq_flags = O_NONBLOCK;
958 else if (nb == Qfalse)
959 newattr.mq_flags = 0;
960 else
961 rb_raise(rb_eArgError, "must be true or false");
963 if (mq_setattr(mq->des, &newattr, &mq->attr) < 0)
964 rb_sys_fail("mq_setattr");
966 mq->attr.mq_flags = newattr.mq_flags;
968 return nb;
971 void Init_posix_mq_ext(void)
973 cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject);
974 rb_define_alloc_func(cPOSIX_MQ, alloc);
975 cAttr = rb_const_get(cPOSIX_MQ, rb_intern("Attr"));
978 * The maximum number of open message descriptors supported
979 * by the system. This may be -1, in which case it is dynamically
980 * set at runtime. Consult your operating system documentation
981 * for system-specific information about this.
983 rb_define_const(cPOSIX_MQ, "OPEN_MAX",
984 LONG2NUM(sysconf(_SC_MQ_OPEN_MAX)));
987 * The maximum priority that may be specified for POSIX_MQ#send
988 * On POSIX-compliant systems, this is at least 31, but some
989 * systems allow higher limits.
990 * The minimum priority is always zero.
992 rb_define_const(cPOSIX_MQ, "PRIO_MAX",
993 LONG2NUM(sysconf(_SC_MQ_PRIO_MAX)));
995 rb_define_singleton_method(cPOSIX_MQ, "unlink", s_unlink, 1);
997 rb_define_method(cPOSIX_MQ, "initialize", init, -1);
998 rb_define_method(cPOSIX_MQ, "send", _send, -1);
999 rb_define_method(cPOSIX_MQ, "<<", send0, 1);
1000 rb_define_method(cPOSIX_MQ, "receive", receive, -1);
1001 rb_define_method(cPOSIX_MQ, "shift", shift, -1);
1002 rb_define_method(cPOSIX_MQ, "attr", getattr, 0);
1003 rb_define_method(cPOSIX_MQ, "attr=", setattr, 1);
1004 rb_define_method(cPOSIX_MQ, "close", _close, 0);
1005 rb_define_method(cPOSIX_MQ, "closed?", closed, 0);
1006 rb_define_method(cPOSIX_MQ, "unlink", _unlink, 0);
1007 rb_define_method(cPOSIX_MQ, "name", name, 0);
1008 rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1);
1009 rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1);
1010 rb_define_method(cPOSIX_MQ, "notify_exec", setnotify_exec, 2);
1011 rb_define_method(cPOSIX_MQ, "notify_cleanup", notify_cleanup, 0);
1012 rb_define_method(cPOSIX_MQ, "nonblock?", nonblock_p, 0);
1013 #ifdef MQD_TO_FD
1014 rb_define_method(cPOSIX_MQ, "to_io", to_io, 0);
1015 #endif
1017 id_new = rb_intern("new");
1018 id_kill = rb_intern("kill");
1019 id_fileno = rb_intern("fileno");
1020 id_mul = rb_intern("*");
1021 id_divmod = rb_intern("divmod");
1022 id_flags = rb_intern("flags");
1023 id_maxmsg = rb_intern("maxmsg");
1024 id_msgsize = rb_intern("msgsize");
1025 id_curmsgs = rb_intern("curmsgs");
1026 sym_r = ID2SYM(rb_intern("r"));
1027 sym_w = ID2SYM(rb_intern("w"));
1028 sym_rw = ID2SYM(rb_intern("rw"));