Ability to adopt file descriptors
[ruby_posix_mq.git] / ext / posix_mq / posix_mq.c
blob5e8122e3c41f095c49fe2fbce38cc03f2982c25f
1 #define _XOPEN_SOURCE 600
2 #ifdef HAVE_SYS_SELECT_H
3 # include <sys/select.h>
4 #endif
5 #ifdef HAVE_SIGNAL_H
6 # include <signal.h>
7 #endif
8 #ifdef HAVE_PTHREAD_H
9 # include <pthread.h>
10 #endif
11 #include <ruby.h>
13 #ifndef NUM2TIMET
14 # define NUM2TIMET NUM2INT
15 #endif
17 #include <time.h>
18 #include <mqueue.h>
19 #include <fcntl.h>
20 #include <sys/stat.h>
21 #include <errno.h>
22 #include <assert.h>
23 #include <unistd.h>
24 #include <float.h>
25 #include <math.h>
27 #if defined(__linux__)
28 # define MQD_TO_FD(mqd) (int)(mqd)
29 # define FD_TO_MQD(fd) (mqd_t)(fd)
30 #elif defined(HAVE___MQ_OSHANDLE) /* FreeBSD */
31 # define MQD_TO_FD(mqd) __mq_oshandle(mqd)
32 #else
33 # define MQ_IO_MARK(mq) ((void)(0))
34 # define MQ_IO_SET(mq,val) ((void)(0))
35 # define MQ_IO_CLOSE(mq) ((int)(0))
36 # define MQ_IO_NIL_P(mq) ((int)(1))
37 #endif
39 struct posix_mq {
40 mqd_t des;
41 struct mq_attr attr;
42 VALUE name;
43 VALUE thread;
44 #ifdef MQD_TO_FD
45 VALUE io;
46 #endif
49 #ifdef MQD_TO_FD
50 # define MQ_IO_MARK(mq) rb_gc_mark((mq)->io)
51 # define MQ_IO_SET(mq,val) do { (mq)->io = (val); } while (0)
52 # define MQ_IO_NIL_P(mq) NIL_P((mq)->io)
53 static int MQ_IO_CLOSE(struct posix_mq *mq)
55 if (NIL_P(mq->io))
56 return 0;
58 /* not safe during GC */
59 rb_io_close(mq->io);
60 mq->io = Qnil;
62 return 1;
64 #endif
66 # define PMQ_WANTARRAY (1<<0)
67 # define PMQ_TRY (1<<1)
69 static VALUE cAttr;
70 static ID id_new, id_kill, id_fileno, id_divmod;
71 static ID id_flags, id_maxmsg, id_msgsize, id_curmsgs;
72 static VALUE sym_r, sym_w, sym_rw;
73 static const mqd_t MQD_INVALID = (mqd_t)-1;
75 /* Ruby 1.8.6+ macros (for compatibility with Ruby 1.9) */
76 #ifndef RSTRING_PTR
77 # define RSTRING_PTR(s) (RSTRING(s)->ptr)
78 #endif
79 #ifndef RSTRING_LEN
80 # define RSTRING_LEN(s) (RSTRING(s)->len)
81 #endif
82 #ifndef RFLOAT_VALUE
83 # define RFLOAT_VALUE(f) (RFLOAT(f)->value)
84 #endif
86 #ifndef HAVE_RB_STR_SET_LEN
87 /* this is taken from Ruby 1.8.7, 1.8.6 may not have it */
88 # ifdef RUBINIUS
89 # error upgrade Rubinius, rb_str_set_len should be available
90 # endif
91 static void rb_18_str_set_len(VALUE str, long len)
93 RSTRING(str)->len = len;
94 RSTRING(str)->ptr[len] = '\0';
96 #define rb_str_set_len rb_18_str_set_len
97 #endif /* !defined(HAVE_RB_STR_SET_LEN) */
99 #if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && defined(HAVE_RUBY_THREAD_H)
100 /* Ruby 2.0+ */
101 # include <ruby/thread.h>
102 # define WITHOUT_GVL(fn,a,ubf,b) \
103 rb_thread_call_without_gvl((fn),(a),(ubf),(b))
104 #elif defined(HAVE_RB_THREAD_BLOCKING_REGION)
105 typedef VALUE (*my_blocking_fn_t)(void*);
106 # define WITHOUT_GVL(fn,a,ubf,b) \
107 rb_thread_blocking_region((my_blocking_fn_t)(fn),(a),(ubf),(b))
109 #else /* Ruby 1.8 */
110 /* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
111 # include <rubysig.h>
112 # define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
113 typedef void rb_unblock_function_t(void *);
114 typedef void * rb_blocking_function_t(void *);
115 static void * WITHOUT_GVL(rb_blocking_function_t *func, void *data1,
116 rb_unblock_function_t *ubf, void *data2)
118 void *rv;
120 assert(RUBY_UBF_IO == ubf && "RUBY_UBF_IO required for emulation");
122 TRAP_BEG;
123 rv = func(data1);
124 TRAP_END;
126 return rv;
128 #endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
/* argument bundle handed to xopen (mq_open may run in a blocking region) */
struct open_args {
	mqd_t des;		/* out: result of mq_open */
	int argc;		/* number of mq_open arguments to pass: 2, 3 or 4 */
	const char *name;
	int oflags;
	mode_t mode;		/* passed when argc >= 3 */
	struct mq_attr attr;	/* passed when argc == 4 */
};
/* argument bundle shared by xsend and xrecv inside a blocking region */
struct rw_args {
	mqd_t des;
	union {
		ssize_t received;	/* out: set by xrecv */
		int retval;		/* out: set by xsend */
	};
	char *msg_ptr;
	size_t msg_len;
	unsigned msg_prio;		/* in for xsend, out for xrecv */
	struct timespec *timeout;	/* NULL selects the untimed call */
};
153 #ifndef HAVE_MQ_TIMEDSEND
/*
 * Placeholder substituted for mq_timedsend on systems that lack it.
 * It must never actually run (num2timespec raises first when timeouts
 * are unsupported), so reaching it is reported as an interpreter bug.
 *
 * Returns int like the real mq_timedsend so that the conditional
 * expression in xsend stays well-typed even where mqd_t is not an
 * integer type.
 */
static int
not_timedsend(mqd_t mqdes, const char *msg_ptr,
	      size_t msg_len, unsigned msg_prio,
	      const struct timespec *abs_timeout)
{
	rb_bug("mq_timedsend workaround failed");
	return -1;
}
162 # define mq_timedsend not_timedsend
163 #endif
164 #ifndef HAVE_MQ_TIMEDRECEIVE
/*
 * Placeholder substituted for mq_timedreceive on systems that lack it.
 * It must never actually run (num2timespec raises first when timeouts
 * are unsupported), so reaching it is reported as an interpreter bug.
 *
 * The error value is a plain -1 of the function's own ssize_t return
 * type rather than a value cast through mqd_t.
 */
static ssize_t
not_timedreceive(mqd_t mqdes, char *msg_ptr,
		 size_t msg_len, unsigned *msg_prio,
		 const struct timespec *abs_timeout)
{
	rb_bug("mq_timedreceive workaround failed");
	return -1;
}
173 # define mq_timedreceive not_timedreceive
174 #endif
176 #if defined(HAVE_MQ_TIMEDRECEIVE) && defined(HAVE_MQ_TIMEDSEND)
177 static void num2timespec(struct timespec *ts, VALUE t)
179 switch (TYPE(t)) {
180 case T_FIXNUM:
181 case T_BIGNUM:
182 ts->tv_sec = NUM2TIMET(t);
183 ts->tv_nsec = 0;
184 break;
185 case T_FLOAT: {
186 double f, d;
187 double val = RFLOAT_VALUE(t);
189 d = modf(val, &f);
190 if (d >= 0) {
191 ts->tv_nsec = (long)(d * 1e9 + 0.5);
192 } else {
193 ts->tv_nsec = (long)(-d * 1e9 + 0.5);
194 if (ts->tv_nsec > 0) {
195 ts->tv_nsec = 1000000000 - ts->tv_nsec;
196 f -= 1;
199 ts->tv_sec = (time_t)f;
200 if (f != ts->tv_sec)
201 rb_raise(rb_eRangeError, "%f out of range", val);
203 break;
204 default: {
205 VALUE f;
206 VALUE ary = rb_funcall(t, id_divmod, 1, INT2FIX(1));
208 Check_Type(ary, T_ARRAY);
210 ts->tv_sec = NUM2TIMET(rb_ary_entry(ary, 0));
211 f = rb_ary_entry(ary, 1);
212 f = rb_funcall(f, '*', 1, INT2FIX(1000000000));
213 ts->tv_nsec = NUM2LONG(f);
217 #else
218 static void num2timespec(struct timespec *ts, VALUE t)
220 rb_raise(rb_eNotImpError,
221 "mq_timedsend and/or mq_timedreceive missing");
223 #endif
225 static struct timespec *convert_timeout(struct timespec *dest, VALUE t)
227 struct timespec ts, now;
229 if (NIL_P(t))
230 return NULL;
232 num2timespec(&ts, t);
233 clock_gettime(CLOCK_REALTIME, &now);
234 dest->tv_sec = now.tv_sec + ts.tv_sec;
235 dest->tv_nsec = now.tv_nsec + ts.tv_nsec;
237 if (dest->tv_nsec > 1000000000) {
238 dest->tv_nsec -= 1000000000;
239 ++dest->tv_sec;
242 return dest;
245 /* (may) run without GVL */
246 static void * xopen(void *ptr)
248 struct open_args *x = ptr;
250 switch (x->argc) {
251 case 2: x->des = mq_open(x->name, x->oflags); break;
252 case 3: x->des = mq_open(x->name, x->oflags, x->mode, NULL); break;
253 case 4: x->des = mq_open(x->name, x->oflags, x->mode, &x->attr); break;
254 default: x->des = MQD_INVALID;
257 return NULL;
260 /* runs without GVL */
261 static void *xsend(void *ptr)
263 struct rw_args *x = ptr;
265 x->retval = x->timeout ?
266 mq_timedsend(x->des, x->msg_ptr, x->msg_len,
267 x->msg_prio, x->timeout) :
268 mq_send(x->des, x->msg_ptr, x->msg_len, x->msg_prio);
270 return NULL;
273 /* runs without GVL */
274 static void * xrecv(void *ptr)
276 struct rw_args *x = ptr;
278 x->received = x->timeout ?
279 mq_timedreceive(x->des, x->msg_ptr, x->msg_len,
280 &x->msg_prio, x->timeout) :
281 mq_receive(x->des, x->msg_ptr, x->msg_len, &x->msg_prio);
283 return NULL;
286 /* called by GC */
287 static void mark(void *ptr)
289 struct posix_mq *mq = ptr;
291 rb_gc_mark(mq->name);
292 rb_gc_mark(mq->thread);
293 MQ_IO_MARK(mq);
296 /* called by GC */
297 static void _free(void *ptr)
299 struct posix_mq *mq = ptr;
301 if (mq->des != MQD_INVALID && MQ_IO_NIL_P(mq)) {
302 /* we ignore errors when gc-ing */
303 mq_close(mq->des);
304 errno = 0;
306 xfree(ptr);
309 /* automatically called at creation (before initialize) */
310 static VALUE alloc(VALUE klass)
312 struct posix_mq *mq;
313 VALUE rv = Data_Make_Struct(klass, struct posix_mq, mark, _free, mq);
315 mq->des = MQD_INVALID;
316 mq->attr.mq_flags = 0;
317 mq->attr.mq_maxmsg = 0;
318 mq->attr.mq_msgsize = -1;
319 mq->attr.mq_curmsgs = 0;
320 mq->name = Qnil;
321 mq->thread = Qnil;
322 MQ_IO_SET(mq, Qnil);
324 return rv;
327 /* unwraps the posix_mq struct from self */
328 static struct posix_mq *get(VALUE self, int need_valid)
330 struct posix_mq *mq;
332 Data_Get_Struct(self, struct posix_mq, mq);
334 if (need_valid && mq->des == MQD_INVALID)
335 rb_raise(rb_eIOError, "closed queue descriptor");
337 return mq;
340 static void check_struct_type(VALUE astruct)
342 if (CLASS_OF(astruct) == cAttr)
343 return;
344 astruct = rb_inspect(astruct);
345 rb_raise(rb_eTypeError, "not a POSIX_MQ::Attr: %s",
346 StringValuePtr(astruct));
349 static void rstruct2mqattr(struct mq_attr *attr, VALUE astruct, int all)
351 VALUE tmp;
353 check_struct_type(astruct);
354 attr->mq_flags = NUM2LONG(rb_funcall(astruct, id_flags, 0));
356 tmp = rb_funcall(astruct, id_maxmsg, 0);
357 if (all || !NIL_P(tmp))
358 attr->mq_maxmsg = NUM2LONG(tmp);
360 tmp = rb_funcall(astruct, id_msgsize, 0);
361 if (all || !NIL_P(tmp))
362 attr->mq_msgsize = NUM2LONG(tmp);
364 tmp = rb_funcall(astruct, id_curmsgs, 0);
365 if (!NIL_P(tmp))
366 attr->mq_curmsgs = NUM2LONG(tmp);
369 #ifdef FD_TO_MQD
371 * call-seq:
372 * POSIX_MQ.for_fd(socket) => mq
374 * Adopts a socket as a POSIX message queue. Argument will be
375 * checked to ensure it is a POSIX message queue socket.
377 * This is useful for adopting systemd sockets passed via the
378 * ListenMessageQueue directive.
379 * Returns a +POSIX_MQ+ instance. This method is only available
380 * under Linux and FreeBSD and is not intended to be portable.
383 static VALUE for_fd(VALUE klass, VALUE socket)
385 VALUE mqv = alloc(klass);
386 struct posix_mq *mq = get(mqv, 0);
388 mq->name = Qnil;
389 mq->des = FD_TO_MQD(NUM2INT(socket));
391 if (mq_getattr(mq->des, &mq->attr) < 0)
392 rb_sys_fail("provided file descriptor is not a POSIX MQ");
394 return mqv;
396 #endif /* FD_TO_MQD */
399 * call-seq:
400 * POSIX_MQ.new(name [, flags [, mode [, mq_attr]]) => mq
402 * Opens a POSIX message queue given by +name+. +name+ should start
403 * with a slash ("/") for portable applications.
405 * If a Symbol is given in place of integer +flags+, then:
407 * * +:r+ is equivalent to IO::RDONLY
408 * * +:w+ is equivalent to IO::CREAT|IO::WRONLY
409 * * +:rw+ is equivalent to IO::CREAT|IO::RDWR
411 * +mode+ is an integer and only used when IO::CREAT is used.
412 * +mq_attr+ is a POSIX_MQ::Attr and only used if IO::CREAT is used.
413 * If +mq_attr+ is not specified when creating a queue, then the
414 * system defaults will be used.
416 * See the manpage for mq_open(3) for more details on this function.
418 static VALUE init(int argc, VALUE *argv, VALUE self)
420 struct posix_mq *mq = get(self, 0);
421 struct open_args x;
422 VALUE name, oflags, mode, attr;
424 rb_scan_args(argc, argv, "13", &name, &oflags, &mode, &attr);
426 switch (TYPE(oflags)) {
427 case T_NIL:
428 x.oflags = O_RDONLY;
429 break;
430 case T_SYMBOL:
431 if (oflags == sym_r)
432 x.oflags = O_RDONLY;
433 else if (oflags == sym_w)
434 x.oflags = O_CREAT|O_WRONLY;
435 else if (oflags == sym_rw)
436 x.oflags = O_CREAT|O_RDWR;
437 else {
438 oflags = rb_inspect(oflags);
439 rb_raise(rb_eArgError,
440 "symbol must be :r, :w, or :rw: %s",
441 StringValuePtr(oflags));
443 break;
444 case T_BIGNUM:
445 case T_FIXNUM:
446 x.oflags = NUM2INT(oflags);
447 break;
448 default:
449 rb_raise(rb_eArgError, "flags must be an int, :r, :w, or :wr");
452 x.name = StringValueCStr(name);
453 x.argc = 2;
455 switch (TYPE(mode)) {
456 case T_FIXNUM:
457 x.argc = 3;
458 x.mode = NUM2UINT(mode);
459 break;
460 case T_NIL:
461 if (x.oflags & O_CREAT) {
462 x.argc = 3;
463 x.mode = 0666;
465 break;
466 default:
467 rb_raise(rb_eArgError, "mode not an integer");
470 switch (TYPE(attr)) {
471 case T_STRUCT:
472 x.argc = 4;
473 rstruct2mqattr(&x.attr, attr, 1);
475 /* principle of least surprise */
476 if (x.attr.mq_flags & O_NONBLOCK)
477 x.oflags |= O_NONBLOCK;
478 break;
479 case T_NIL:
480 break;
481 default:
482 check_struct_type(attr);
485 (void)xopen(&x);
486 mq->des = x.des;
487 if (mq->des == MQD_INVALID) {
488 switch (errno) {
489 case ENOMEM:
490 case EMFILE:
491 case ENFILE:
492 case ENOSPC:
493 rb_gc();
494 (void)xopen(&x);
495 mq->des = x.des;
497 if (mq->des == MQD_INVALID)
498 rb_sys_fail("mq_open");
501 mq->name = rb_str_dup(name);
502 if (x.oflags & O_NONBLOCK)
503 mq->attr.mq_flags = O_NONBLOCK;
505 return self;
509 * call-seq:
510 * POSIX_MQ.unlink(name) => 1
512 * Unlinks the message queue given by +name+. The queue will be destroyed
513 * when the last process with the queue open closes its queue descriptors.
515 static VALUE s_unlink(VALUE self, VALUE name)
517 int rv = mq_unlink(StringValueCStr(name));
519 if (rv == -1)
520 rb_sys_fail("mq_unlink");
522 return INT2NUM(1);
526 * call-seq:
527 * mq.unlink => mq
529 * Unlinks the message queue to prevent other processes from accessing it.
530 * All existing queue descriptors to this queue including those opened by
531 * other processes are unaffected. The queue will only be destroyed
532 * when the last process with open descriptors to this queue closes
533 * the descriptors.
535 static VALUE _unlink(VALUE self)
537 struct posix_mq *mq = get(self, 0);
538 int rv;
540 if (NIL_P(mq->name)) {
541 rb_raise(rb_eArgError, "can not unlink an adopted socket");
544 assert(TYPE(mq->name) == T_STRING && "mq->name is not a string");
546 rv = mq_unlink(RSTRING_PTR(mq->name));
547 if (rv == -1)
548 rb_sys_fail("mq_unlink");
550 return self;
553 static void setup_send_buffer(struct rw_args *x, VALUE buffer)
555 buffer = rb_obj_as_string(buffer);
556 x->msg_ptr = RSTRING_PTR(buffer);
557 x->msg_len = (size_t)RSTRING_LEN(buffer);
560 static VALUE _send(int sflags, int argc, VALUE *argv, VALUE self);
563 * call-seq:
564 * mq.send(string [,priority[, timeout]]) => true
566 * Inserts the given +string+ into the message queue with an optional,
567 * unsigned integer +priority+. If the optional +timeout+ is specified,
568 * then Errno::ETIMEDOUT will be raised if the operation cannot complete
569 * before +timeout+ seconds has elapsed. Without +timeout+, this method
570 * may block until the queue is writable.
572 * On some older systems, the +timeout+ argument is not currently
573 * supported and may raise NotImplementedError if +timeout+ is used.
575 static VALUE my_send(int argc, VALUE *argv, VALUE self)
577 return _send(0, argc, argv, self);
580 static VALUE _send(int sflags, int argc, VALUE *argv, VALUE self)
582 struct posix_mq *mq = get(self, 1);
583 struct rw_args x;
584 VALUE buffer, prio, timeout;
585 struct timespec expire;
587 rb_scan_args(argc, argv, "12", &buffer, &prio, &timeout);
589 setup_send_buffer(&x, buffer);
590 x.des = mq->des;
591 x.timeout = convert_timeout(&expire, timeout);
592 x.msg_prio = NIL_P(prio) ? 0 : NUM2UINT(prio);
594 retry:
595 WITHOUT_GVL(xsend, &x, RUBY_UBF_IO, 0);
596 if (x.retval == -1) {
597 if (errno == EINTR)
598 goto retry;
599 if (errno == EAGAIN && (sflags & PMQ_TRY))
600 return Qfalse;
601 rb_sys_fail("mq_send");
604 return Qtrue;
608 * call-seq:
609 * mq << string => mq
611 * Inserts the given +string+ into the message queue with a
612 * default priority of 0 and no timeout.
614 * Returns itself so its calls may be chained. This use is only
615 * recommended only for users who expect blocking behavior from
616 * the queue.
618 static VALUE send0(VALUE self, VALUE buffer)
620 struct posix_mq *mq = get(self, 1);
621 struct rw_args x;
623 setup_send_buffer(&x, buffer);
624 x.des = mq->des;
625 x.timeout = NULL;
626 x.msg_prio = 0;
628 retry:
629 WITHOUT_GVL(xsend, &x, RUBY_UBF_IO, 0);
630 if (x.retval == -1) {
631 if (errno == EINTR)
632 goto retry;
633 rb_sys_fail("mq_send");
636 return self;
639 #ifdef MQD_TO_FD
641 * call-seq:
642 * mq.to_io => IO
644 * Returns an IO.select-able +IO+ object. This method is only available
645 * under Linux and FreeBSD and is not intended to be portable.
647 static VALUE to_io(VALUE self)
649 struct posix_mq *mq = get(self, 1);
650 int fd = MQD_TO_FD(mq->des);
652 if (NIL_P(mq->io))
653 mq->io = rb_funcall(rb_cIO, id_new, 1, INT2NUM(fd));
655 return mq->io;
657 #endif
659 static VALUE _receive(int rflags, int argc, VALUE *argv, VALUE self);
662 * call-seq:
663 * mq.receive([buffer, [timeout]]) => [ message, priority ]
665 * Takes the highest priority message off the queue and returns
666 * an array containing the message as a String and the Integer
667 * priority of the message.
669 * If the optional +buffer+ is present, then it must be a String
670 * which will receive the data.
672 * If the optional +timeout+ is present, then it may be a Float
673 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
674 * will be raised if +timeout+ has elapsed and there are no messages
675 * in the queue.
677 * On some older systems, the +timeout+ argument is not currently
678 * supported and may raise NotImplementedError if +timeout+ is used.
680 static VALUE receive(int argc, VALUE *argv, VALUE self)
682 return _receive(PMQ_WANTARRAY, argc, argv, self);
686 * call-seq:
687 * mq.shift([buffer, [timeout]]) => message
689 * Takes the highest priority message off the queue and returns
690 * the message as a String.
692 * If the optional +buffer+ is present, then it must be a String
693 * which will receive the data.
695 * If the optional +timeout+ is present, then it may be a Float
696 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
697 * will be raised if +timeout+ has elapsed and there are no messages
698 * in the queue.
700 * On some older systems, the +timeout+ argument is not currently
701 * supported and may raise NotImplementedError if +timeout+ is used.
703 static VALUE shift(int argc, VALUE *argv, VALUE self)
705 return _receive(0, argc, argv, self);
708 static VALUE _receive(int rflags, int argc, VALUE *argv, VALUE self)
710 struct posix_mq *mq = get(self, 1);
711 struct rw_args x;
712 VALUE buffer, timeout;
713 struct timespec expire;
715 if (mq->attr.mq_msgsize < 0) {
716 if (mq_getattr(mq->des, &mq->attr) < 0)
717 rb_sys_fail("mq_getattr");
720 rb_scan_args(argc, argv, "02", &buffer, &timeout);
721 x.timeout = convert_timeout(&expire, timeout);
723 if (NIL_P(buffer)) {
724 buffer = rb_str_new(0, mq->attr.mq_msgsize);
725 } else {
726 StringValue(buffer);
727 rb_str_modify(buffer);
728 rb_str_resize(buffer, mq->attr.mq_msgsize);
730 OBJ_TAINT(buffer);
731 x.msg_ptr = RSTRING_PTR(buffer);
732 x.msg_len = (size_t)mq->attr.mq_msgsize;
733 x.des = mq->des;
735 retry:
736 WITHOUT_GVL(xrecv, &x, RUBY_UBF_IO, 0);
737 if (x.received < 0) {
738 if (errno == EINTR)
739 goto retry;
740 if (errno == EAGAIN && (rflags & PMQ_TRY))
741 return Qnil;
742 rb_sys_fail("mq_receive");
745 rb_str_set_len(buffer, x.received);
747 if (rflags & PMQ_WANTARRAY)
748 return rb_ary_new3(2, buffer, UINT2NUM(x.msg_prio));
749 return buffer;
753 * call-seq:
754 * mq.attr => mq_attr
756 * Returns a POSIX_MQ::Attr struct containing the attributes
757 * of the message queue. See the mq_getattr(3) manpage for
758 * more details.
760 static VALUE getattr(VALUE self)
762 struct posix_mq *mq = get(self, 1);
764 if (mq_getattr(mq->des, &mq->attr) < 0)
765 rb_sys_fail("mq_getattr");
767 return rb_funcall(cAttr, id_new, 4,
768 LONG2NUM(mq->attr.mq_flags),
769 LONG2NUM(mq->attr.mq_maxmsg),
770 LONG2NUM(mq->attr.mq_msgsize),
771 LONG2NUM(mq->attr.mq_curmsgs));
775 * call-seq:
776 * mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr
778 * Only the IO::NONBLOCK flag may be set or unset (zero) in this manner.
779 * See the mq_setattr(3) manpage for more details.
781 * Consider using the POSIX_MQ#nonblock= method as it is easier and
782 * more natural to use.
784 static VALUE setattr(VALUE self, VALUE astruct)
786 struct posix_mq *mq = get(self, 1);
787 struct mq_attr newattr;
789 rstruct2mqattr(&newattr, astruct, 0);
791 if (mq_setattr(mq->des, &newattr, NULL) < 0)
792 rb_sys_fail("mq_setattr");
794 return astruct;
798 * call-seq:
799 * mq.close => nil
801 * Closes the underlying message queue descriptor.
802 * If this descriptor had a registered notification request, the request
803 * will be removed so another descriptor or process may register a
804 * notification request. Message queue descriptors are automatically
805 * closed by garbage collection.
807 static VALUE _close(VALUE self)
809 struct posix_mq *mq = get(self, 1);
811 if (! MQ_IO_CLOSE(mq)) {
812 if (mq_close(mq->des) == -1)
813 rb_sys_fail("mq_close");
815 mq->des = MQD_INVALID;
817 return Qnil;
821 * call-seq:
822 * mq.closed? => true or false
824 * Returns +true+ if the message queue descriptor is closed and therefore
825 * unusable, otherwise +false+
827 static VALUE closed(VALUE self)
829 struct posix_mq *mq = get(self, 0);
831 return mq->des == MQD_INVALID ? Qtrue : Qfalse;
835 * call-seq:
836 * mq.name => string
838 * Returns the string name of message queue associated with +mq+
840 static VALUE name(VALUE self)
842 struct posix_mq *mq = get(self, 0);
844 if (NIL_P(mq->name)) {
846 * We could use readlink(2) on /proc/self/fd/N, but lots of
847 * care required.
848 * http://stackoverflow.com/questions/1188757/
850 rb_raise(rb_eArgError, "can not get name of an adopted socket");
853 return rb_str_dup(mq->name);
856 static int lookup_sig(VALUE sig)
858 static VALUE list;
859 const char *ptr;
860 long len;
862 sig = rb_obj_as_string(sig);
863 len = RSTRING_LEN(sig);
864 ptr = RSTRING_PTR(sig);
866 if (len > 3 && !memcmp("SIG", ptr, 3))
867 sig = rb_str_new(ptr + 3, len - 3);
869 if (!list) {
870 VALUE mSignal = rb_const_get(rb_cObject, rb_intern("Signal"));
872 list = rb_funcall(mSignal, rb_intern("list"), 0, 0);
873 rb_global_variable(&list);
876 sig = rb_hash_aref(list, sig);
877 if (NIL_P(sig))
878 rb_raise(rb_eArgError, "invalid signal: %s\n", ptr);
880 return NUM2INT(sig);
884 * TODO: Under Linux, we could just use netlink directly
885 * the same way glibc does...
887 /* we spawn a thread just to write ONE byte into an fd (usually a pipe) */
888 static void thread_notify_fd(union sigval sv)
890 int fd = sv.sival_int;
892 while ((write(fd, "", 1) < 0) && (errno == EINTR || errno == EAGAIN));
/*
 * Calls mq_notify and raises a SystemCallError if it fails. A failure
 * with ENOMEM triggers a garbage collection and one retry first.
 */
static void my_mq_notify(mqd_t des, struct sigevent *not)
{
	if (mq_notify(des, not) != -1)
		return;

	if (errno == ENOMEM) {
		rb_gc();
		if (mq_notify(des, not) != -1)
			return;
	}

	rb_sys_fail("mq_notify");
}
/*
 * Requests the smallest permitted stack (PTHREAD_STACK_MIN, but never
 * below 4096 bytes) for threads created with +attr+. Does nothing on
 * platforms that do not define PTHREAD_STACK_MIN; the result of
 * pthread_attr_setstacksize is not checked.
 */
static void lower_stack_size(pthread_attr_t *attr)
{
	/* some OSes have ridiculously small stack sizes */
#ifdef PTHREAD_STACK_MIN
	const size_t floor_size = 4096;
	size_t wanted = PTHREAD_STACK_MIN;

	pthread_attr_setstacksize(attr,
				  wanted < floor_size ? floor_size : wanted);
#endif
}
922 /* :nodoc: */
923 static VALUE setnotify_exec(VALUE self, VALUE io, VALUE thr)
925 int fd = NUM2INT(rb_funcall(io, id_fileno, 0, 0));
926 struct posix_mq *mq = get(self, 1);
927 struct sigevent not;
928 pthread_attr_t attr;
930 errno = pthread_attr_init(&attr);
931 if (errno) rb_sys_fail("pthread_attr_init");
933 errno = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
934 if (errno) rb_sys_fail("pthread_attr_setdetachstate");
936 lower_stack_size(&attr);
937 not.sigev_notify = SIGEV_THREAD;
938 not.sigev_notify_function = thread_notify_fd;
939 not.sigev_notify_attributes = &attr;
940 not.sigev_value.sival_int = fd;
942 if (!NIL_P(mq->thread))
943 rb_funcall(mq->thread, id_kill, 0, 0);
944 mq->thread = thr;
946 my_mq_notify(mq->des, &not);
948 return thr;
951 /* :nodoc: */
952 static VALUE notify_cleanup(VALUE self)
954 struct posix_mq *mq = get(self, 1);
956 if (!NIL_P(mq->thread)) {
957 rb_funcall(mq->thread, id_kill, 0, 0);
958 mq->thread = Qnil;
960 return Qnil;
964 * call-seq:
965 * mq.notify = signal => signal
967 * Registers the notification request to deliver a given +signal+
968 * to the current process when message is received.
969 * If +signal+ is +nil+, it will unregister and disable the notification
970 * request to allow other processes to register a request.
971 * If +signal+ is +false+, it will register a no-op notification request
972 * which will prevent other processes from registering a notification.
973 * If +signal+ is an +IO+ object, it will spawn a thread upon the
974 * arrival of the next message and write one "\\0" byte to the file
975 * descriptor belonging to that IO object.
976 * Only one process may have a notification request for a queue
977 * at a time, Errno::EBUSY will be raised if there is already
978 * a notification request registration for the queue.
980 * Notifications are only fired once and processes must reregister
981 * for subsequent notifications.
983 * For readers of the mq_notify(3) manpage, passing +false+
984 * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent
985 * of passing a NULL notification pointer to mq_notify(3).
987 static VALUE setnotify(VALUE self, VALUE arg)
989 struct posix_mq *mq = get(self, 1);
990 struct sigevent not;
991 struct sigevent * notification = &not;
992 VALUE rv = arg;
994 notify_cleanup(self);
995 not.sigev_notify = SIGEV_SIGNAL;
997 switch (TYPE(arg)) {
998 case T_FALSE:
999 not.sigev_notify = SIGEV_NONE;
1000 break;
1001 case T_NIL:
1002 notification = NULL;
1003 break;
1004 case T_FIXNUM:
1005 not.sigev_signo = NUM2INT(arg);
1006 break;
1007 case T_SYMBOL:
1008 case T_STRING:
1009 not.sigev_signo = lookup_sig(arg);
1010 rv = INT2NUM(not.sigev_signo);
1011 break;
1012 default:
1013 rb_raise(rb_eArgError, "must be a signal or nil");
1016 my_mq_notify(mq->des, notification);
1018 return rv;
1022 * call-seq:
1023 * mq.nonblock? => true or false
1025 * Returns the current non-blocking state of the message queue descriptor.
1027 static VALUE nonblock_p(VALUE self)
1029 struct posix_mq *mq = get(self, 1);
1031 if (mq_getattr(mq->des, &mq->attr) < 0)
1032 rb_sys_fail("mq_getattr");
1033 return mq->attr.mq_flags & O_NONBLOCK ? Qtrue : Qfalse;
1037 * call-seq:
1038 * mq.nonblock = boolean => boolean
1040 * Enables or disables non-blocking operation for the message queue
1041 * descriptor. Errno::EAGAIN will be raised in situations where
1042 * the queue would block. This is not compatible with +timeout+
1043 * arguments to POSIX_MQ#send and POSIX_MQ#receive.
1045 static VALUE setnonblock(VALUE self, VALUE nb)
1047 struct mq_attr newattr;
1048 struct posix_mq *mq = get(self, 1);
1050 if (nb == Qtrue)
1051 newattr.mq_flags = O_NONBLOCK;
1052 else if (nb == Qfalse)
1053 newattr.mq_flags = 0;
1054 else
1055 rb_raise(rb_eArgError, "must be true or false");
1057 if (mq_setattr(mq->des, &newattr, &mq->attr) < 0)
1058 rb_sys_fail("mq_setattr");
1060 mq->attr.mq_flags = newattr.mq_flags;
1062 return nb;
1066 * call-seq:
1067 * mq.trysend(string [,priority[, timeout]]) => +true+ or +false+
1069 * Exactly like POSIX_MQ#send, except it returns +false+ instead of raising
1070 * Errno::EAGAIN when non-blocking operation is desired and returns +true+
1071 * on success instead of +nil+.
1073 * This does not guarantee non-blocking behavior, the message queue must
1074 * be made non-blocking before calling this method.
1076 static VALUE trysend(int argc, VALUE *argv, VALUE self)
1078 return _send(PMQ_TRY, argc, argv, self);
1082 * call-seq:
1083 * mq.tryshift([buffer [, timeout]]) => message or nil
1085 * Exactly like POSIX_MQ#shift, except it returns +nil+ instead of raising
1086 * Errno::EAGAIN when non-blocking operation is desired.
1088 * This does not guarantee non-blocking behavior, the message queue must
1089 * be made non-blocking before calling this method.
1091 static VALUE tryshift(int argc, VALUE *argv, VALUE self)
1093 return _receive(PMQ_TRY, argc, argv, self);
1097 * call-seq:
1098 * mq.tryreceive([buffer [, timeout]]) => [ message, priority ] or nil
1100 * Exactly like POSIX_MQ#receive, except it returns +nil+ instead of raising
1101 * Errno::EAGAIN when non-blocking operation is desired.
1103 * This does not guarantee non-blocking behavior, the message queue must
1104 * be made non-blocking before calling this method.
1106 static VALUE tryreceive(int argc, VALUE *argv, VALUE self)
1108 return _receive(PMQ_WANTARRAY|PMQ_TRY, argc, argv, self);
1111 void Init_posix_mq_ext(void)
1113 VALUE cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject);
1114 rb_define_alloc_func(cPOSIX_MQ, alloc);
1115 cAttr = rb_const_get(cPOSIX_MQ, rb_intern("Attr"));
1118 * The maximum number of open message descriptors supported
1119 * by the system. This may be -1, in which case it is dynamically
1120 * set at runtime. Consult your operating system documentation
1121 * for system-specific information about this.
1123 rb_define_const(cPOSIX_MQ, "OPEN_MAX",
1124 LONG2NUM(sysconf(_SC_MQ_OPEN_MAX)));
1127 * The maximum priority that may be specified for POSIX_MQ#send
1128 * On POSIX-compliant systems, this is at least 31, but some
1129 * systems allow higher limits.
1130 * The minimum priority is always zero.
1132 rb_define_const(cPOSIX_MQ, "PRIO_MAX",
1133 LONG2NUM(sysconf(_SC_MQ_PRIO_MAX)));
1135 rb_define_singleton_method(cPOSIX_MQ, "unlink", s_unlink, 1);
1137 rb_define_private_method(cPOSIX_MQ, "initialize", init, -1);
1138 rb_define_method(cPOSIX_MQ, "send", my_send, -1);
1139 rb_define_method(cPOSIX_MQ, "<<", send0, 1);
1140 rb_define_method(cPOSIX_MQ, "trysend", trysend, -1);
1141 rb_define_method(cPOSIX_MQ, "receive", receive, -1);
1142 rb_define_method(cPOSIX_MQ, "tryreceive", tryreceive, -1);
1143 rb_define_method(cPOSIX_MQ, "shift", shift, -1);
1144 rb_define_method(cPOSIX_MQ, "tryshift", tryshift, -1);
1145 rb_define_method(cPOSIX_MQ, "attr", getattr, 0);
1146 rb_define_method(cPOSIX_MQ, "attr=", setattr, 1);
1147 rb_define_method(cPOSIX_MQ, "close", _close, 0);
1148 rb_define_method(cPOSIX_MQ, "closed?", closed, 0);
1149 rb_define_method(cPOSIX_MQ, "unlink", _unlink, 0);
1150 rb_define_method(cPOSIX_MQ, "name", name, 0);
1151 rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1);
1152 rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1);
1153 rb_define_private_method(cPOSIX_MQ, "notify_exec", setnotify_exec, 2);
1154 rb_define_private_method(cPOSIX_MQ, "notify_cleanup", notify_cleanup, 0);
1155 rb_define_method(cPOSIX_MQ, "nonblock?", nonblock_p, 0);
1156 #ifdef MQD_TO_FD
1157 rb_define_method(cPOSIX_MQ, "to_io", to_io, 0);
1158 #endif
1160 #ifdef FD_TO_MQD
1161 rb_define_module_function(cPOSIX_MQ, "for_fd", for_fd, 1);
1162 #endif
1164 id_new = rb_intern("new");
1165 id_kill = rb_intern("kill");
1166 id_fileno = rb_intern("fileno");
1167 id_divmod = rb_intern("divmod");
1168 id_flags = rb_intern("flags");
1169 id_maxmsg = rb_intern("maxmsg");
1170 id_msgsize = rb_intern("msgsize");
1171 id_curmsgs = rb_intern("curmsgs");
1172 sym_r = ID2SYM(rb_intern("r"));
1173 sym_w = ID2SYM(rb_intern("w"));
1174 sym_rw = ID2SYM(rb_intern("rw"));