remove rb_time_interval() and gettimeofday() dependency
[ruby_posix_mq.git] / ext / posix_mq / posix_mq.c
blobb634a91d2e15af741a9a96688529b11885de211d
1 #define _XOPEN_SOURCE 600
2 #ifdef HAVE_SYS_SELECT_H
3 # include <sys/select.h>
4 #endif
5 #ifdef HAVE_SIGNAL_H
6 # include <signal.h>
7 #endif
8 #ifdef HAVE_PTHREAD_H
9 # include <pthread.h>
10 #endif
11 #include <ruby.h>
13 #ifndef NUM2TIMET
14 # define NUM2TIMET NUM2INT
15 #endif
17 #ifndef RB_GC_GUARD
18 # define RB_GC_GUARD(v) (*(volatile VALUE *)&(v))
19 #endif
21 #include <time.h>
22 #include <mqueue.h>
23 #include <fcntl.h>
24 #include <sys/stat.h>
25 #include <errno.h>
26 #include <assert.h>
27 #include <unistd.h>
28 #include <float.h>
29 #include <math.h>
31 #if defined(__linux__)
32 # define MQD_TO_FD(mqd) (int)(mqd)
33 #elif defined(HAVE___MQ_OSHANDLE) /* FreeBSD */
34 # define MQD_TO_FD(mqd) __mq_oshandle(mqd)
35 #else
36 # define MQ_IO_MARK(mq) ((void)(0))
37 # define MQ_IO_SET(mq,val) ((void)(0))
38 # define MQ_IO_CLOSE(mq) ((void)(0))
39 # define MQ_IO_NILP(mq) ((void)(1))
40 #endif
42 struct posix_mq {
43 mqd_t des;
44 struct mq_attr attr;
45 VALUE name;
46 VALUE thread;
47 #ifdef MQD_TO_FD
48 VALUE io;
49 #endif
52 #ifdef MQD_TO_FD
53 # define MQ_IO_MARK(mq) rb_gc_mark((mq)->io)
54 # define MQ_IO_SET(mq,val) do { (mq)->io = (val); } while (0)
55 # define MQ_IO_NIL_P(mq) NIL_P((mq)->io)
56 static int MQ_IO_CLOSE(struct posix_mq *mq)
58 if (NIL_P(mq->io))
59 return 0;
61 /* not safe during GC */
62 rb_io_close(mq->io);
63 mq->io = Qnil;
65 return 1;
67 #endif
69 static VALUE cPOSIX_MQ, cAttr;
70 static ID id_new, id_kill, id_fileno, id_mul, id_divmod;
71 static ID sym_r, sym_w, sym_rw;
72 static const mqd_t MQD_INVALID = (mqd_t)-1;
74 /* Ruby 1.8.6+ macros (for compatibility with Ruby 1.9) */
75 #ifndef RSTRING_PTR
76 # define RSTRING_PTR(s) (RSTRING(s)->ptr)
77 #endif
78 #ifndef RSTRING_LEN
79 # define RSTRING_LEN(s) (RSTRING(s)->len)
80 #endif
81 #ifndef RSTRUCT_PTR
82 # define RSTRUCT_PTR(s) (RSTRUCT(s)->ptr)
83 #endif
84 #ifndef RSTRUCT_LEN
85 # define RSTRUCT_LEN(s) (RSTRUCT(s)->len)
86 #endif
88 #ifndef HAVE_RB_STR_SET_LEN
89 # ifdef RUBINIUS
90 # define rb_str_set_len(str,len) rb_str_resize(str,len)
91 # else /* 1.8.6 optimized version */
92 /* this is taken from Ruby 1.8.7, 1.8.6 may not have it */
93 static void rb_18_str_set_len(VALUE str, long len)
95 RSTRING(str)->len = len;
96 RSTRING(str)->ptr[len] = '\0';
98 # define rb_str_set_len(str,len) rb_18_str_set_len(str,len)
99 # endif /* ! RUBINIUS */
100 #endif /* !defined(HAVE_RB_STR_SET_LEN) */
102 #ifndef HAVE_RB_STRUCT_ALLOC_NOINIT
103 static VALUE rb_struct_alloc_noinit(VALUE class)
105 return rb_funcall(class, id_new, 0, 0);
107 #endif /* !defined(HAVE_RB_STRUCT_ALLOC_NOINIT) */
109 /* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
110 #ifndef HAVE_RB_THREAD_BLOCKING_REGION
111 # include <rubysig.h>
112 # define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
113 typedef void rb_unblock_function_t(void *);
114 typedef VALUE rb_blocking_function_t(void *);
115 static VALUE
116 rb_thread_blocking_region(
117 rb_blocking_function_t *func, void *data1,
118 rb_unblock_function_t *ubf, void *data2)
120 VALUE rv;
122 assert(RUBY_UBF_IO == ubf && "RUBY_UBF_IO required for emulation");
124 TRAP_BEG;
125 rv = func(data1);
126 TRAP_END;
128 return rv;
130 #endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
132 /* used to pass arguments to mq_open inside blocking region */
133 struct open_args {
134 int argc;
135 const char *name;
136 int oflags;
137 mode_t mode;
138 struct mq_attr attr;
141 /* used to pass arguments to mq_send/mq_receive inside blocking region */
142 struct rw_args {
143 mqd_t des;
144 char *msg_ptr;
145 size_t msg_len;
146 unsigned msg_prio;
147 struct timespec *timeout;
150 static void num2timespec(struct timespec *ts, VALUE t)
152 switch (TYPE(t)) {
153 case T_FIXNUM:
154 case T_BIGNUM:
155 ts->tv_sec = NUM2TIMET(t);
156 ts->tv_nsec = 0;
157 break;
158 case T_FLOAT: {
159 double f, d;
160 double val = RFLOAT_VALUE(t);
162 d = modf(val, &f);
163 if (d >= 0) {
164 ts->tv_nsec = (long)(d * 1e9 + 0.5);
165 } else {
166 ts->tv_nsec = (long)(-d * 1e9 + 0.5);
167 if (ts->tv_nsec > 0) {
168 ts->tv_nsec = 1000000000 - ts->tv_nsec;
169 f -= 1;
172 ts->tv_sec = (time_t)f;
173 if (f != ts->tv_sec)
174 rb_raise(rb_eRangeError, "%f out of range", val);
175 ts->tv_sec = (time_t)f;
177 break;
178 default: {
179 VALUE f;
180 VALUE ary = rb_funcall(t, id_divmod, 1, INT2FIX(1));
182 Check_Type(ary, T_ARRAY);
184 ts->tv_sec = NUM2TIMET(rb_ary_entry(ary, 0));
185 f = rb_ary_entry(ary, 1);
186 f = rb_funcall(f, id_mul, 1, INT2FIX(1000000000));
187 ts->tv_nsec = NUM2LONG(f);
192 static struct timespec *convert_timeout(struct timespec *dest, VALUE t)
194 struct timespec ts, now;
196 if (NIL_P(t))
197 return NULL;
199 num2timespec(&ts, t);
200 clock_gettime(CLOCK_REALTIME, &now);
201 dest->tv_sec = now.tv_sec + ts.tv_sec;
202 dest->tv_nsec = now.tv_nsec + ts.tv_nsec;
204 if (dest->tv_nsec > 1000000000) {
205 dest->tv_nsec -= 1000000000;
206 ++dest->tv_sec;
209 return dest;
212 /* (may) run without GVL */
213 static VALUE xopen(void *ptr)
215 struct open_args *x = ptr;
216 mqd_t rv;
218 switch (x->argc) {
219 case 2: rv = mq_open(x->name, x->oflags); break;
220 case 3: rv = mq_open(x->name, x->oflags, x->mode, NULL); break;
221 case 4: rv = mq_open(x->name, x->oflags, x->mode, &x->attr); break;
222 default: rv = MQD_INVALID;
225 return (VALUE)rv;
228 /* runs without GVL */
229 static VALUE xsend(void *ptr)
231 struct rw_args *x = ptr;
233 if (x->timeout)
234 return (VALUE)mq_timedsend(x->des, x->msg_ptr, x->msg_len,
235 x->msg_prio, x->timeout);
237 return (VALUE)mq_send(x->des, x->msg_ptr, x->msg_len, x->msg_prio);
240 /* runs without GVL */
241 static VALUE xrecv(void *ptr)
243 struct rw_args *x = ptr;
245 if (x->timeout)
246 return (VALUE)mq_timedreceive(x->des, x->msg_ptr, x->msg_len,
247 &x->msg_prio, x->timeout);
249 return (VALUE)mq_receive(x->des, x->msg_ptr, x->msg_len, &x->msg_prio);
252 /* called by GC */
253 static void mark(void *ptr)
255 struct posix_mq *mq = ptr;
257 rb_gc_mark(mq->name);
258 rb_gc_mark(mq->thread);
259 MQ_IO_MARK(mq);
262 /* called by GC */
263 static void _free(void *ptr)
265 struct posix_mq *mq = ptr;
267 if (mq->des != MQD_INVALID && MQ_IO_NIL_P(mq)) {
268 /* we ignore errors when gc-ing */
269 mq_close(mq->des);
270 errno = 0;
272 xfree(ptr);
275 /* automatically called at creation (before initialize) */
276 static VALUE alloc(VALUE klass)
278 struct posix_mq *mq;
279 VALUE rv = Data_Make_Struct(klass, struct posix_mq, mark, _free, mq);
281 mq->des = MQD_INVALID;
282 mq->attr.mq_flags = 0;
283 mq->attr.mq_maxmsg = 0;
284 mq->attr.mq_msgsize = -1;
285 mq->attr.mq_curmsgs = 0;
286 mq->name = Qnil;
287 mq->thread = Qnil;
288 MQ_IO_SET(mq, Qnil);
290 return rv;
293 /* unwraps the posix_mq struct from self */
294 static struct posix_mq *get(VALUE self, int need_valid)
296 struct posix_mq *mq;
298 Data_Get_Struct(self, struct posix_mq, mq);
300 if (need_valid && mq->des == MQD_INVALID)
301 rb_raise(rb_eIOError, "closed queue descriptor");
303 return mq;
306 /* converts the POSIX_MQ::Attr astruct into a struct mq_attr attr */
307 static void attr_from_struct(struct mq_attr *attr, VALUE astruct, int all)
309 VALUE *ptr;
311 if (CLASS_OF(astruct) != cAttr) {
312 RB_GC_GUARD(astruct) = rb_inspect(astruct);
313 rb_raise(rb_eArgError, "not a POSIX_MQ::Attr: %s",
314 RSTRING_PTR(astruct));
317 ptr = RSTRUCT_PTR(astruct);
319 attr->mq_flags = NUM2LONG(ptr[0]);
321 if (all || !NIL_P(ptr[1]))
322 attr->mq_maxmsg = NUM2LONG(ptr[1]);
323 if (all || !NIL_P(ptr[2]))
324 attr->mq_msgsize = NUM2LONG(ptr[2]);
325 if (!NIL_P(ptr[3]))
326 attr->mq_curmsgs = NUM2LONG(ptr[3]);
330 * call-seq:
331 * POSIX_MQ.new(name [, flags [, mode [, mq_attr]]) => mq
333 * Opens a POSIX message queue given by +name+. +name+ should start
334 * with a slash ("/") for portable applications.
336 * If a Symbol is given in place of integer +flags+, then:
338 * * +:r+ is equivalent to IO::RDONLY
339 * * +:w+ is equivalent to IO::CREAT|IO::WRONLY
340 * * +:rw+ is equivalent to IO::CREAT|IO::RDWR
342 * +mode+ is an integer and only used when IO::CREAT is used.
343 * +mq_attr+ is a POSIX_MQ::Attr and only used if IO::CREAT is used.
344 * If +mq_attr+ is not specified when creating a queue, then the
345 * system defaults will be used.
347 * See the manpage for mq_open(3) for more details on this function.
349 static VALUE init(int argc, VALUE *argv, VALUE self)
351 struct posix_mq *mq = get(self, 0);
352 struct open_args x;
353 VALUE name, oflags, mode, attr;
355 rb_scan_args(argc, argv, "13", &name, &oflags, &mode, &attr);
357 if (TYPE(name) != T_STRING)
358 rb_raise(rb_eArgError, "name must be a string");
360 switch (TYPE(oflags)) {
361 case T_NIL:
362 x.oflags = O_RDONLY;
363 break;
364 case T_SYMBOL:
365 if (oflags == sym_r)
366 x.oflags = O_RDONLY;
367 else if (oflags == sym_w)
368 x.oflags = O_CREAT|O_WRONLY;
369 else if (oflags == sym_rw)
370 x.oflags = O_CREAT|O_RDWR;
371 else {
372 RB_GC_GUARD(oflags) = oflags;
373 rb_raise(rb_eArgError,
374 "symbol must be :r, :w, or :rw: %s",
375 RSTRING_PTR(oflags));
377 break;
378 case T_BIGNUM:
379 case T_FIXNUM:
380 x.oflags = NUM2INT(oflags);
381 break;
382 default:
383 rb_raise(rb_eArgError, "flags must be an int, :r, :w, or :wr");
386 x.name = RSTRING_PTR(name);
387 x.argc = 2;
389 switch (TYPE(mode)) {
390 case T_FIXNUM:
391 x.argc = 3;
392 x.mode = NUM2UINT(mode);
393 break;
394 case T_NIL:
395 if (x.oflags & O_CREAT) {
396 x.argc = 3;
397 x.mode = 0666;
399 break;
400 default:
401 rb_raise(rb_eArgError, "mode not an integer");
404 switch (TYPE(attr)) {
405 case T_STRUCT:
406 x.argc = 4;
407 attr_from_struct(&x.attr, attr, 1);
409 /* principle of least surprise */
410 if (x.attr.mq_flags & O_NONBLOCK)
411 x.oflags |= O_NONBLOCK;
412 break;
413 case T_NIL:
414 break;
415 default:
416 RB_GC_GUARD(attr) = rb_inspect(attr);
417 rb_raise(rb_eArgError, "attr must be a POSIX_MQ::Attr: %s",
418 RSTRING_PTR(attr));
421 mq->des = (mqd_t)xopen(&x);
422 if (mq->des == MQD_INVALID) {
423 if (errno == ENOMEM || errno == EMFILE || errno == ENFILE) {
424 rb_gc();
425 mq->des = (mqd_t)xopen(&x);
427 if (mq->des == MQD_INVALID)
428 rb_sys_fail("mq_open");
431 mq->name = rb_str_dup(name);
432 if (x.oflags & O_NONBLOCK)
433 mq->attr.mq_flags = O_NONBLOCK;
435 return self;
439 * call-seq:
440 * POSIX_MQ.unlink(name) => 1
442 * Unlinks the message queue given by +name+. The queue will be destroyed
443 * when the last process with the queue open closes its queue descriptors.
445 static VALUE s_unlink(VALUE self, VALUE name)
447 mqd_t rv;
449 if (TYPE(name) != T_STRING)
450 rb_raise(rb_eArgError, "argument must be a string");
452 rv = mq_unlink(RSTRING_PTR(name));
453 if (rv == MQD_INVALID)
454 rb_sys_fail("mq_unlink");
456 return INT2NUM(1);
460 * call-seq:
461 * mq.unlink => mq
463 * Unlinks the message queue to prevent other processes from accessing it.
464 * All existing queue descriptors to this queue including those opened by
465 * other processes are unaffected. The queue will only be destroyed
466 * when the last process with open descriptors to this queue closes
467 * the descriptors.
469 static VALUE _unlink(VALUE self)
471 struct posix_mq *mq = get(self, 0);
472 mqd_t rv;
474 assert(TYPE(mq->name) == T_STRING && "mq->name is not a string");
476 rv = mq_unlink(RSTRING_PTR(mq->name));
477 if (rv == MQD_INVALID)
478 rb_sys_fail("mq_unlink");
480 return self;
483 static void setup_send_buffer(struct rw_args *x, VALUE buffer)
485 buffer = rb_obj_as_string(buffer);
486 x->msg_ptr = RSTRING_PTR(buffer);
487 x->msg_len = (size_t)RSTRING_LEN(buffer);
491 * call-seq:
492 * mq.send(string [,priority[, timeout]]) => nil
494 * Inserts the given +string+ into the message queue with an optional,
495 * unsigned integer +priority+. If the optional +timeout+ is specified,
496 * then Errno::ETIMEDOUT will be raised if the operation cannot complete
497 * before +timeout+ seconds has elapsed. Without +timeout+, this method
498 * may block until the queue is writable.
500 static VALUE _send(int argc, VALUE *argv, VALUE self)
502 struct posix_mq *mq = get(self, 1);
503 struct rw_args x;
504 VALUE buffer, prio, timeout;
505 mqd_t rv;
506 struct timespec expire;
508 rb_scan_args(argc, argv, "12", &buffer, &prio, &timeout);
510 setup_send_buffer(&x, buffer);
511 x.des = mq->des;
512 x.timeout = convert_timeout(&expire, timeout);
513 x.msg_prio = NIL_P(prio) ? 0 : NUM2UINT(prio);
515 if (mq->attr.mq_flags & O_NONBLOCK)
516 rv = (mqd_t)xsend(&x);
517 else
518 rv = (mqd_t)rb_thread_blocking_region(xsend, &x,
519 RUBY_UBF_IO, 0);
520 if (rv == MQD_INVALID)
521 rb_sys_fail("mq_send");
523 return Qnil;
527 * call-seq:
528 * mq << string => mq
530 * Inserts the given +string+ into the message queue with a
531 * default priority of 0 and no timeout.
533 static VALUE send0(VALUE self, VALUE buffer)
535 struct posix_mq *mq = get(self, 1);
536 struct rw_args x;
537 mqd_t rv;
539 setup_send_buffer(&x, buffer);
540 x.des = mq->des;
541 x.timeout = NULL;
542 x.msg_prio = 0;
544 if (mq->attr.mq_flags & O_NONBLOCK)
545 rv = (mqd_t)xsend(&x);
546 else
547 rv = (mqd_t)rb_thread_blocking_region(xsend, &x,
548 RUBY_UBF_IO, 0);
550 if (rv == MQD_INVALID)
551 rb_sys_fail("mq_send");
553 return self;
556 #ifdef MQD_TO_FD
558 * call-seq:
559 * mq.to_io => IO
561 * Returns an IO.select-able +IO+ object. This method is only available
562 * under Linux and FreeBSD and is not intended to be portable.
564 static VALUE to_io(VALUE self)
566 struct posix_mq *mq = get(self, 1);
567 int fd = MQD_TO_FD(mq->des);
569 if (NIL_P(mq->io))
570 mq->io = rb_funcall(rb_cIO, id_new, 1, INT2NUM(fd));
572 return mq->io;
574 #endif
576 static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self);
579 * call-seq:
580 * mq.receive([buffer, [timeout]]) => [ message, priority ]
582 * Takes the highest priority message off the queue and returns
583 * an array containing the message as a String and the Integer
584 * priority of the message.
586 * If the optional +buffer+ is present, then it must be a String
587 * which will receive the data.
589 * If the optional +timeout+ is present, then it may be a Float
590 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
591 * will be raised if +timeout+ has elapsed and there are no messages
592 * in the queue.
594 static VALUE receive(int argc, VALUE *argv, VALUE self)
596 return _receive(1, argc, argv, self);
600 * call-seq:
601 * mq.shift([buffer, [timeout]]) => message
603 * Takes the highest priority message off the queue and returns
604 * the message as a String.
606 * If the optional +buffer+ is present, then it must be a String
607 * which will receive the data.
609 * If the optional +timeout+ is present, then it may be a Float
610 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
611 * will be raised if +timeout+ has elapsed and there are no messages
612 * in the queue.
614 static VALUE shift(int argc, VALUE *argv, VALUE self)
616 return _receive(0, argc, argv, self);
619 static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self)
621 struct posix_mq *mq = get(self, 1);
622 struct rw_args x;
623 VALUE buffer, timeout;
624 ssize_t r;
625 struct timespec expire;
627 if (mq->attr.mq_msgsize < 0) {
628 if (mq_getattr(mq->des, &mq->attr) < 0)
629 rb_sys_fail("mq_getattr");
632 rb_scan_args(argc, argv, "02", &buffer, &timeout);
633 x.timeout = convert_timeout(&expire, timeout);
635 if (NIL_P(buffer)) {
636 buffer = rb_str_new(0, mq->attr.mq_msgsize);
637 } else {
638 StringValue(buffer);
639 rb_str_modify(buffer);
640 rb_str_resize(buffer, mq->attr.mq_msgsize);
642 OBJ_TAINT(buffer);
643 x.msg_ptr = RSTRING_PTR(buffer);
644 x.msg_len = (size_t)mq->attr.mq_msgsize;
645 x.des = mq->des;
647 if (mq->attr.mq_flags & O_NONBLOCK) {
648 r = (ssize_t)xrecv(&x);
649 } else {
650 r = (ssize_t)rb_thread_blocking_region(xrecv, &x,
651 RUBY_UBF_IO, 0);
653 if (r < 0)
654 rb_sys_fail("mq_receive");
656 rb_str_set_len(buffer, r);
658 if (wantarray)
659 return rb_ary_new3(2, buffer, UINT2NUM(x.msg_prio));
660 return buffer;
664 * call-seq:
665 * mq.attr => mq_attr
667 * Returns a POSIX_MQ::Attr struct containing the attributes
668 * of the message queue. See the mq_getattr(3) manpage for
669 * more details.
671 static VALUE getattr(VALUE self)
673 struct posix_mq *mq = get(self, 1);
674 VALUE astruct;
675 VALUE *ptr;
677 if (mq_getattr(mq->des, &mq->attr) < 0)
678 rb_sys_fail("mq_getattr");
680 astruct = rb_struct_alloc_noinit(cAttr);
681 ptr = RSTRUCT_PTR(astruct);
682 ptr[0] = LONG2NUM(mq->attr.mq_flags);
683 ptr[1] = LONG2NUM(mq->attr.mq_maxmsg);
684 ptr[2] = LONG2NUM(mq->attr.mq_msgsize);
685 ptr[3] = LONG2NUM(mq->attr.mq_curmsgs);
687 return astruct;
691 * call-seq:
692 * mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr
694 * Only the IO::NONBLOCK flag may be set or unset (zero) in this manner.
695 * See the mq_setattr(3) manpage for more details.
697 * Consider using the POSIX_MQ#nonblock= method as it is easier and
698 * more natural to use.
700 static VALUE setattr(VALUE self, VALUE astruct)
702 struct posix_mq *mq = get(self, 1);
703 struct mq_attr newattr;
705 attr_from_struct(&newattr, astruct, 0);
707 if (mq_setattr(mq->des, &newattr, NULL) < 0)
708 rb_sys_fail("mq_setattr");
710 return astruct;
714 * call-seq:
715 * mq.close => nil
717 * Closes the underlying message queue descriptor.
718 * If this descriptor had a registered notification request, the request
719 * will be removed so another descriptor or process may register a
720 * notification request. Message queue descriptors are automatically
721 * closed by garbage collection.
723 static VALUE _close(VALUE self)
725 struct posix_mq *mq = get(self, 1);
727 if (! MQ_IO_CLOSE(mq)) {
728 if (mq_close(mq->des) == -1)
729 rb_sys_fail("mq_close");
731 mq->des = MQD_INVALID;
733 return Qnil;
737 * call-seq:
738 * mq.closed? => true or false
740 * Returns +true+ if the message queue descriptor is closed and therefore
741 * unusable, otherwise +false+
743 static VALUE closed(VALUE self)
745 struct posix_mq *mq = get(self, 0);
747 return mq->des == MQD_INVALID ? Qtrue : Qfalse;
751 * call-seq:
752 * mq.name => string
754 * Returns the string name of message queue associated with +mq+
756 static VALUE name(VALUE self)
758 struct posix_mq *mq = get(self, 0);
760 return rb_str_dup(mq->name);
763 static int lookup_sig(VALUE sig)
765 static VALUE list;
766 const char *ptr;
767 long len;
769 sig = rb_obj_as_string(sig);
770 len = RSTRING_LEN(sig);
771 ptr = RSTRING_PTR(sig);
773 if (len > 3 && !memcmp("SIG", ptr, 3))
774 sig = rb_str_new(ptr + 3, len - 3);
776 if (!list) {
777 VALUE mSignal = rb_const_get(rb_cObject, rb_intern("Signal"));
779 list = rb_funcall(mSignal, rb_intern("list"), 0, 0);
780 rb_global_variable(&list);
783 sig = rb_hash_aref(list, sig);
784 if (NIL_P(sig))
785 rb_raise(rb_eArgError, "invalid signal: %s\n", ptr);
787 return NUM2INT(sig);
791 * TODO: Under Linux, we could just use netlink directly
792 * the same way glibc does...
794 /* we spawn a thread just to write ONE byte into an fd (usually a pipe) */
795 static void thread_notify_fd(union sigval sv)
797 int fd = sv.sival_int;
799 while ((write(fd, "", 1) < 0) && (errno == EINTR || errno == EAGAIN));
802 static void my_mq_notify(mqd_t des, struct sigevent *not)
804 mqd_t rv = mq_notify(des, not);
806 if (rv == MQD_INVALID) {
807 if (errno == ENOMEM) {
808 rb_gc();
809 rv = mq_notify(des, not);
811 if (rv == MQD_INVALID)
812 rb_sys_fail("mq_notify");
816 /* :nodoc: */
817 static VALUE setnotify_exec(VALUE self, VALUE io, VALUE thr)
819 int fd = NUM2INT(rb_funcall(io, id_fileno, 0, 0));
820 struct posix_mq *mq = get(self, 1);
821 struct sigevent not;
822 pthread_attr_t attr;
824 errno = pthread_attr_init(&attr);
825 if (errno) rb_sys_fail("pthread_attr_init");
827 errno = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
828 if (errno) rb_sys_fail("pthread_attr_setdetachstate");
830 #ifdef PTHREAD_STACK_MIN
831 (void)pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
832 #endif
834 not.sigev_notify = SIGEV_THREAD;
835 not.sigev_notify_function = thread_notify_fd;
836 not.sigev_notify_attributes = &attr;
837 not.sigev_value.sival_int = fd;
839 if (!NIL_P(mq->thread))
840 rb_funcall(mq->thread, id_kill, 0, 0);
841 mq->thread = thr;
843 my_mq_notify(mq->des, &not);
845 return thr;
848 /* :nodoc: */
849 static VALUE notify_cleanup(VALUE self)
851 struct posix_mq *mq = get(self, 1);
853 if (!NIL_P(mq->thread)) {
854 rb_funcall(mq->thread, id_kill, 0, 0);
855 mq->thread = Qnil;
857 return Qnil;
861 * call-seq:
862 * mq.notify = signal => signal
864 * Registers the notification request to deliver a given +signal+
865 * to the current process when message is received.
866 * If +signal+ is +nil+, it will unregister and disable the notification
867 * request to allow other processes to register a request.
868 * If +signal+ is +false+, it will register a no-op notification request
869 * which will prevent other processes from registering a notification.
870 * If +signal+ is an +IO+ object, it will spawn a thread upon the
871 * arrival of the next message and write one "\\0" byte to the file
872 * descriptor belonging to that IO object.
873 * Only one process may have a notification request for a queue
874 * at a time, Errno::EBUSY will be raised if there is already
875 * a notification request registration for the queue.
877 * Notifications are only fired once and processes must reregister
878 * for subsequent notifications.
880 * For readers of the mq_notify(3) manpage, passing +false+
881 * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent
882 * of passing a NULL notification pointer to mq_notify(3).
884 static VALUE setnotify(VALUE self, VALUE arg)
886 struct posix_mq *mq = get(self, 1);
887 struct sigevent not;
888 struct sigevent * notification = &not;
889 VALUE rv = arg;
891 notify_cleanup(self);
892 not.sigev_notify = SIGEV_SIGNAL;
894 switch (TYPE(arg)) {
895 case T_FALSE:
896 not.sigev_notify = SIGEV_NONE;
897 break;
898 case T_NIL:
899 notification = NULL;
900 break;
901 case T_FIXNUM:
902 not.sigev_signo = NUM2INT(arg);
903 break;
904 case T_SYMBOL:
905 case T_STRING:
906 not.sigev_signo = lookup_sig(arg);
907 rv = INT2NUM(not.sigev_signo);
908 break;
909 default:
910 rb_raise(rb_eArgError, "must be a signal or nil");
913 my_mq_notify(mq->des, notification);
915 return rv;
919 * call-seq:
920 * mq.nonblock? => true or false
922 * Returns the current non-blocking state of the message queue descriptor.
924 static VALUE getnonblock(VALUE self)
926 struct posix_mq *mq = get(self, 1);
928 return mq->attr.mq_flags & O_NONBLOCK ? Qtrue : Qfalse;
932 * call-seq:
933 * mq.nonblock = boolean => boolean
935 * Enables or disables non-blocking operation for the message queue
936 * descriptor. Errno::EAGAIN will be raised in situations where
937 * the queue would block. This is not compatible with +timeout+
938 * arguments to POSIX_MQ#send and POSIX_MQ#receive.
940 static VALUE setnonblock(VALUE self, VALUE nb)
942 struct mq_attr newattr;
943 struct posix_mq *mq = get(self, 1);
945 if (nb == Qtrue)
946 newattr.mq_flags = O_NONBLOCK;
947 else if (nb == Qfalse)
948 newattr.mq_flags = 0;
949 else
950 rb_raise(rb_eArgError, "must be true or false");
952 if (mq_setattr(mq->des, &newattr, &mq->attr) < 0)
953 rb_sys_fail("mq_setattr");
955 mq->attr.mq_flags = newattr.mq_flags;
957 return nb;
960 void Init_posix_mq_ext(void)
962 cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject);
963 rb_define_alloc_func(cPOSIX_MQ, alloc);
964 cAttr = rb_const_get(cPOSIX_MQ, rb_intern("Attr"));
967 * The maximum number of open message descriptors supported
968 * by the system. This may be -1, in which case it is dynamically
969 * set at runtime. Consult your operating system documentation
970 * for system-specific information about this.
972 rb_define_const(cPOSIX_MQ, "OPEN_MAX",
973 LONG2NUM(sysconf(_SC_MQ_OPEN_MAX)));
976 * The maximum priority that may be specified for POSIX_MQ#send
977 * On POSIX-compliant systems, this is at least 31, but some
978 * systems allow higher limits.
979 * The minimum priority is always zero.
981 rb_define_const(cPOSIX_MQ, "PRIO_MAX",
982 LONG2NUM(sysconf(_SC_MQ_PRIO_MAX)));
984 rb_define_singleton_method(cPOSIX_MQ, "unlink", s_unlink, 1);
986 rb_define_method(cPOSIX_MQ, "initialize", init, -1);
987 rb_define_method(cPOSIX_MQ, "send", _send, -1);
988 rb_define_method(cPOSIX_MQ, "<<", send0, 1);
989 rb_define_method(cPOSIX_MQ, "receive", receive, -1);
990 rb_define_method(cPOSIX_MQ, "shift", shift, -1);
991 rb_define_method(cPOSIX_MQ, "attr", getattr, 0);
992 rb_define_method(cPOSIX_MQ, "attr=", setattr, 1);
993 rb_define_method(cPOSIX_MQ, "close", _close, 0);
994 rb_define_method(cPOSIX_MQ, "closed?", closed, 0);
995 rb_define_method(cPOSIX_MQ, "unlink", _unlink, 0);
996 rb_define_method(cPOSIX_MQ, "name", name, 0);
997 rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1);
998 rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1);
999 rb_define_method(cPOSIX_MQ, "notify_exec", setnotify_exec, 2);
1000 rb_define_method(cPOSIX_MQ, "notify_cleanup", notify_cleanup, 0);
1001 rb_define_method(cPOSIX_MQ, "nonblock?", getnonblock, 0);
1002 #ifdef MQD_TO_FD
1003 rb_define_method(cPOSIX_MQ, "to_io", to_io, 0);
1004 #endif
1006 id_new = rb_intern("new");
1007 id_kill = rb_intern("kill");
1008 id_fileno = rb_intern("fileno");
1009 id_mul = rb_intern("*");
1010 id_divmod = rb_intern("divmod");
1011 sym_r = ID2SYM(rb_intern("r"));
1012 sym_w = ID2SYM(rb_intern("w"));
1013 sym_rw = ID2SYM(rb_intern("rw"));