drop extra args to rb_funcall
[ruby_posix_mq.git] / ext / posix_mq / posix_mq.c
blob6a5439ba75136647279bc8c6897a039f46995cd2
1 #define _XOPEN_SOURCE 600
2 #ifdef HAVE_SYS_SELECT_H
3 # include <sys/select.h>
4 #endif
5 #ifdef HAVE_SIGNAL_H
6 # include <signal.h>
7 #endif
8 #ifdef HAVE_PTHREAD_H
9 # include <pthread.h>
10 #endif
11 #include <ruby.h>
13 #ifndef NUM2TIMET
14 # define NUM2TIMET NUM2INT
15 #endif
17 #include <time.h>
18 #include <mqueue.h>
19 #include <fcntl.h>
20 #include <sys/stat.h>
21 #include <errno.h>
22 #include <assert.h>
23 #include <unistd.h>
24 #include <float.h>
25 #include <math.h>
/*
 * Platform glue between message queue descriptors and file descriptors.
 * Where no conversion macro is available, the MQ_IO_* helpers are
 * stubbed out so the rest of the file can use them unconditionally.
 */
#if defined(__linux__)
/* descriptors are converted with a plain cast in both directions */
#  define MQD_TO_FD(mqd) (int)(mqd)
#  define FD_TO_MQD(fd) (mqd_t)(fd)
#elif defined(HAVE___MQ_OSHANDLE) /* FreeBSD */
#  define MQD_TO_FD(mqd) __mq_oshandle(mqd)
#else
/* no IO object can exist: marking/setting do nothing, "io" is always nil */
#  define MQ_IO_MARK(mq) ((void)(0))
#  define MQ_IO_SET(mq,val) ((void)(0))
#  define MQ_IO_CLOSE(mq) ((int)(0))
#  define MQ_IO_NIL_P(mq) ((int)(1))
#  define MQ_IO_SET_AUTOCLOSE(mq, boolean) for(;0;)
#endif
40 struct posix_mq {
41 mqd_t des;
42 unsigned autoclose:1;
43 struct mq_attr attr;
44 VALUE name;
45 VALUE thread;
46 #ifdef MQD_TO_FD
47 VALUE io;
48 #endif
#ifdef MQD_TO_FD
static ID id_setautoclose;
#  define MQ_IO_MARK(mq) rb_gc_mark((mq)->io)
#  define MQ_IO_SET(mq,val) do { (mq)->io = (val); } while (0)
#  define MQ_IO_NIL_P(mq) NIL_P((mq)->io)

/* forwards +boolean+ to the IO object's autoclose= when an IO exists */
static void MQ_IO_SET_AUTOCLOSE(struct posix_mq *mq, VALUE boolean)
{
	if (NIL_P(mq->io))
		return;
	rb_funcall(mq->io, id_setautoclose, 1, boolean);
}

/*
 * Closes the IO object attached to +mq+, if any.
 * Returns 1 when an IO object was closed, 0 when there was none.
 */
static int MQ_IO_CLOSE(struct posix_mq *mq)
{
	if (!NIL_P(mq->io)) {
		/* not safe during GC */
		rb_io_close(mq->io);
		mq->io = Qnil;

		return 1;
	}
	return 0;
}
#endif
76 # define PMQ_WANTARRAY (1<<0)
77 # define PMQ_TRY (1<<1)
79 static VALUE cAttr;
80 static ID id_new, id_kill, id_fileno, id_divmod;
81 static ID id_flags, id_maxmsg, id_msgsize, id_curmsgs;
82 static VALUE sym_r, sym_w, sym_rw;
83 static const mqd_t MQD_INVALID = (mqd_t)-1;
85 /* Ruby 1.8.6+ macros (for compatibility with Ruby 1.9) */
86 #ifndef RSTRING_PTR
87 # define RSTRING_PTR(s) (RSTRING(s)->ptr)
88 #endif
89 #ifndef RSTRING_LEN
90 # define RSTRING_LEN(s) (RSTRING(s)->len)
91 #endif
92 #ifndef RFLOAT_VALUE
93 # define RFLOAT_VALUE(f) (RFLOAT(f)->value)
94 #endif
96 #ifndef HAVE_RB_STR_SET_LEN
97 /* this is taken from Ruby 1.8.7, 1.8.6 may not have it */
98 # ifdef RUBINIUS
99 # error upgrade Rubinius, rb_str_set_len should be available
100 # endif
101 static void rb_18_str_set_len(VALUE str, long len)
103 RSTRING(str)->len = len;
104 RSTRING(str)->ptr[len] = '\0';
106 #define rb_str_set_len rb_18_str_set_len
107 #endif /* !defined(HAVE_RB_STR_SET_LEN) */
109 #if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && defined(HAVE_RUBY_THREAD_H)
110 /* Ruby 2.0+ */
111 # include <ruby/thread.h>
112 # define WITHOUT_GVL(fn,a,ubf,b) \
113 rb_thread_call_without_gvl((fn),(a),(ubf),(b))
114 #elif defined(HAVE_RB_THREAD_BLOCKING_REGION)
115 typedef VALUE (*my_blocking_fn_t)(void*);
116 # define WITHOUT_GVL(fn,a,ubf,b) \
117 rb_thread_blocking_region((my_blocking_fn_t)(fn),(a),(ubf),(b))
119 #else /* Ruby 1.8 */
120 /* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
121 # include <rubysig.h>
122 # define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
123 typedef void rb_unblock_function_t(void *);
124 typedef void * rb_blocking_function_t(void *);
125 static void * WITHOUT_GVL(rb_blocking_function_t *func, void *data1,
126 rb_unblock_function_t *ubf, void *data2)
128 void *rv;
130 assert(RUBY_UBF_IO == ubf && "RUBY_UBF_IO required for emulation");
132 TRAP_BEG;
133 rv = func(data1);
134 TRAP_END;
136 return rv;
138 #endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
/* used to pass arguments to mq_open inside blocking region */
struct open_args {
	mqd_t des;		/* out: result of mq_open */
	int argc;		/* selects the mq_open call form in xopen (2..4) */
	const char *name;	/* queue name */
	int oflags;		/* open flags */
	mode_t mode;		/* passed to mq_open when argc >= 3 */
	struct mq_attr attr;	/* passed to mq_open when argc == 4 */
};
/* used to pass arguments to mq_send/mq_receive inside blocking region */
struct rw_args {
	mqd_t des;			/* queue to operate on */
	unsigned msg_prio;		/* priority to send / priority received */
	union {
		ssize_t received;	/* out: written by xrecv */
		int retval;		/* out: written by xsend */
	};
	char *msg_ptr;			/* message buffer */
	size_t msg_len;			/* size of msg_ptr in bytes */
	struct timespec *timeout;	/* absolute deadline, NULL for none */
};
#ifndef HAVE_MQ_TIMEDSEND
/*
 * Stand-in for mq_timedsend() on systems that lack it.  It must never be
 * reached (num2timespec raises before a timeout can be built), so it
 * aborts via rb_bug().  The return type is int to match mq_timedsend(),
 * so the result can be assigned to rw_args.retval without mixing mqd_t
 * (which need not be an integer type) with int.
 */
static int
not_timedsend(mqd_t mqdes, const char *msg_ptr,
	      size_t msg_len, unsigned msg_prio,
	      const struct timespec *abs_timeout)
{
	rb_bug("mq_timedsend workaround failed");
	return -1;
}
#  define mq_timedsend not_timedsend
#endif
#ifndef HAVE_MQ_TIMEDRECEIVE
/*
 * Stand-in for mq_timedreceive() on systems that lack it.  It must never
 * be reached, so it aborts via rb_bug().  The failure value is a plain
 * ssize_t -1 rather than a converted mqd_t, since mqd_t need not be an
 * integer type.
 */
static ssize_t
not_timedreceive(mqd_t mqdes, char *msg_ptr,
		 size_t msg_len, unsigned *msg_prio,
		 const struct timespec *abs_timeout)
{
	rb_bug("mq_timedreceive workaround failed");
	return (ssize_t)-1;
}
#  define mq_timedreceive not_timedreceive
#endif
186 #if defined(HAVE_MQ_TIMEDRECEIVE) && defined(HAVE_MQ_TIMEDSEND)
187 static void num2timespec(struct timespec *ts, VALUE t)
189 switch (TYPE(t)) {
190 case T_FIXNUM:
191 case T_BIGNUM:
192 ts->tv_sec = NUM2TIMET(t);
193 ts->tv_nsec = 0;
194 break;
195 case T_FLOAT: {
196 double f, d;
197 double val = RFLOAT_VALUE(t);
199 d = modf(val, &f);
200 if (d >= 0) {
201 ts->tv_nsec = (long)(d * 1e9 + 0.5);
202 } else {
203 ts->tv_nsec = (long)(-d * 1e9 + 0.5);
204 if (ts->tv_nsec > 0) {
205 ts->tv_nsec = 1000000000 - ts->tv_nsec;
206 f -= 1;
209 ts->tv_sec = (time_t)f;
210 if (f != ts->tv_sec)
211 rb_raise(rb_eRangeError, "%f out of range", val);
213 break;
214 default: {
215 VALUE f;
216 VALUE ary = rb_funcall(t, id_divmod, 1, INT2FIX(1));
218 Check_Type(ary, T_ARRAY);
220 ts->tv_sec = NUM2TIMET(rb_ary_entry(ary, 0));
221 f = rb_ary_entry(ary, 1);
222 f = rb_funcall(f, '*', 1, INT2FIX(1000000000));
223 ts->tv_nsec = NUM2LONG(f);
227 #else
228 static void num2timespec(struct timespec *ts, VALUE t)
230 rb_raise(rb_eNotImpError,
231 "mq_timedsend and/or mq_timedreceive missing");
233 #endif
235 static struct timespec *convert_timeout(struct timespec *dest, VALUE t)
237 struct timespec ts, now;
239 if (NIL_P(t))
240 return NULL;
242 num2timespec(&ts, t);
243 clock_gettime(CLOCK_REALTIME, &now);
244 dest->tv_sec = now.tv_sec + ts.tv_sec;
245 dest->tv_nsec = now.tv_nsec + ts.tv_nsec;
247 if (dest->tv_nsec > 1000000000) {
248 dest->tv_nsec -= 1000000000;
249 ++dest->tv_sec;
252 return dest;
255 /* (may) run without GVL */
256 static void * xopen(void *ptr)
258 struct open_args *x = ptr;
260 switch (x->argc) {
261 case 2: x->des = mq_open(x->name, x->oflags); break;
262 case 3: x->des = mq_open(x->name, x->oflags, x->mode, NULL); break;
263 case 4: x->des = mq_open(x->name, x->oflags, x->mode, &x->attr); break;
264 default: x->des = MQD_INVALID;
267 return NULL;
270 /* runs without GVL */
271 static void *xsend(void *ptr)
273 struct rw_args *x = ptr;
275 x->retval = x->timeout ?
276 mq_timedsend(x->des, x->msg_ptr, x->msg_len,
277 x->msg_prio, x->timeout) :
278 mq_send(x->des, x->msg_ptr, x->msg_len, x->msg_prio);
280 return NULL;
283 /* runs without GVL */
284 static void * xrecv(void *ptr)
286 struct rw_args *x = ptr;
288 x->received = x->timeout ?
289 mq_timedreceive(x->des, x->msg_ptr, x->msg_len,
290 &x->msg_prio, x->timeout) :
291 mq_receive(x->des, x->msg_ptr, x->msg_len, &x->msg_prio);
293 return NULL;
296 /* called by GC */
297 static void mark(void *ptr)
299 struct posix_mq *mq = ptr;
301 rb_gc_mark(mq->name);
302 rb_gc_mark(mq->thread);
303 MQ_IO_MARK(mq);
306 /* called by GC */
307 static void _free(void *ptr)
309 struct posix_mq *mq = ptr;
311 if (mq->des != MQD_INVALID && MQ_IO_NIL_P(mq)) {
312 /* we ignore errors when gc-ing */
313 if (mq->autoclose)
314 mq_close(mq->des);
315 errno = 0;
317 xfree(ptr);
320 static size_t memsize(const void *ptr)
322 return sizeof(struct posix_mq);
325 static const rb_data_type_t mqtype = {
326 "posix_mq",
327 { mark, _free, memsize, /* reserved */ },
328 /* parent, data, [ flags ] */
331 /* automatically called at creation (before initialize) */
332 static VALUE alloc(VALUE klass)
334 struct posix_mq *mq;
335 VALUE rv = TypedData_Make_Struct(klass, struct posix_mq, &mqtype, mq);
337 mq->des = MQD_INVALID;
338 mq->autoclose = 1;
339 mq->attr.mq_flags = 0;
340 mq->attr.mq_maxmsg = 0;
341 mq->attr.mq_msgsize = -1;
342 mq->attr.mq_curmsgs = 0;
343 mq->name = Qnil;
344 mq->thread = Qnil;
345 MQ_IO_SET(mq, Qnil);
347 return rv;
350 /* unwraps the posix_mq struct from self */
351 static struct posix_mq *get(VALUE self, int need_valid)
353 struct posix_mq *mq;
355 TypedData_Get_Struct(self, struct posix_mq, &mqtype, mq);
357 if (need_valid && mq->des == MQD_INVALID)
358 rb_raise(rb_eIOError, "closed queue descriptor");
360 return mq;
363 static void check_struct_type(VALUE astruct)
365 if (CLASS_OF(astruct) == cAttr)
366 return;
367 astruct = rb_inspect(astruct);
368 rb_raise(rb_eTypeError, "not a POSIX_MQ::Attr: %s",
369 StringValuePtr(astruct));
372 static void rstruct2mqattr(struct mq_attr *attr, VALUE astruct, int all)
374 VALUE tmp;
376 check_struct_type(astruct);
377 attr->mq_flags = NUM2LONG(rb_funcall(astruct, id_flags, 0));
379 tmp = rb_funcall(astruct, id_maxmsg, 0);
380 if (all || !NIL_P(tmp))
381 attr->mq_maxmsg = NUM2LONG(tmp);
383 tmp = rb_funcall(astruct, id_msgsize, 0);
384 if (all || !NIL_P(tmp))
385 attr->mq_msgsize = NUM2LONG(tmp);
387 tmp = rb_funcall(astruct, id_curmsgs, 0);
388 if (!NIL_P(tmp))
389 attr->mq_curmsgs = NUM2LONG(tmp);
#ifdef FD_TO_MQD
/*
 * call-seq:
 *	POSIX_MQ.for_fd(socket)	=> mq
 *
 * Adopts a socket as a POSIX message queue.  Argument will be
 * checked to ensure it is a POSIX message queue socket.
 *
 * This is useful for adopting systemd sockets passed via the
 * ListenMessageQueue directive.
 * Returns a +POSIX_MQ+ instance.  This method is only compiled where
 * a file descriptor can be converted into a queue descriptor and is
 * not intended to be portable.
 */
static VALUE for_fd(VALUE klass, VALUE socket)
{
	VALUE obj = alloc(klass);
	struct posix_mq *mq = get(obj, 0);
	mqd_t adopted;

	/* adopted descriptors have no known name */
	mq->name = Qnil;
	adopted = FD_TO_MQD(NUM2INT(socket));

	/* mq_getattr doubles as the "is this really a queue?" check */
	if (mq_getattr(adopted, &mq->attr) < 0)
		rb_sys_fail("provided file descriptor is not a POSIX MQ");

	mq->des = adopted;
	return obj;
}
#endif /* FD_TO_MQD */
424 * call-seq:
425 * POSIX_MQ.new(name [, flags [, mode [, mq_attr]]) => mq
427 * Opens a POSIX message queue given by +name+. +name+ should start
428 * with a slash ("/") for portable applications.
430 * If a Symbol is given in place of integer +flags+, then:
432 * * +:r+ is equivalent to IO::RDONLY
433 * * +:w+ is equivalent to IO::CREAT|IO::WRONLY
434 * * +:rw+ is equivalent to IO::CREAT|IO::RDWR
436 * +mode+ is an integer and only used when IO::CREAT is used.
437 * +mq_attr+ is a POSIX_MQ::Attr and only used if IO::CREAT is used.
438 * If +mq_attr+ is not specified when creating a queue, then the
439 * system defaults will be used.
441 * See the manpage for mq_open(3) for more details on this function.
443 static VALUE init(int argc, VALUE *argv, VALUE self)
445 struct posix_mq *mq = get(self, 0);
446 struct open_args x;
447 VALUE name, oflags, mode, attr;
449 rb_scan_args(argc, argv, "13", &name, &oflags, &mode, &attr);
451 switch (TYPE(oflags)) {
452 case T_NIL:
453 x.oflags = O_RDONLY;
454 break;
455 case T_SYMBOL:
456 if (oflags == sym_r)
457 x.oflags = O_RDONLY;
458 else if (oflags == sym_w)
459 x.oflags = O_CREAT|O_WRONLY;
460 else if (oflags == sym_rw)
461 x.oflags = O_CREAT|O_RDWR;
462 else {
463 oflags = rb_inspect(oflags);
464 rb_raise(rb_eArgError,
465 "symbol must be :r, :w, or :rw: %s",
466 StringValuePtr(oflags));
468 break;
469 case T_BIGNUM:
470 case T_FIXNUM:
471 x.oflags = NUM2INT(oflags);
472 break;
473 default:
474 rb_raise(rb_eArgError, "flags must be an int, :r, :w, or :wr");
477 x.name = StringValueCStr(name);
478 x.argc = 2;
480 switch (TYPE(mode)) {
481 case T_FIXNUM:
482 x.argc = 3;
483 x.mode = NUM2UINT(mode);
484 break;
485 case T_NIL:
486 if (x.oflags & O_CREAT) {
487 x.argc = 3;
488 x.mode = 0666;
490 break;
491 default:
492 rb_raise(rb_eArgError, "mode not an integer");
495 switch (TYPE(attr)) {
496 case T_STRUCT:
497 x.argc = 4;
498 rstruct2mqattr(&x.attr, attr, 1);
500 /* principle of least surprise */
501 if (x.attr.mq_flags & O_NONBLOCK)
502 x.oflags |= O_NONBLOCK;
503 break;
504 case T_NIL:
505 break;
506 default:
507 check_struct_type(attr);
510 (void)xopen(&x);
511 mq->des = x.des;
512 if (mq->des == MQD_INVALID) {
513 switch (errno) {
514 case ENOMEM:
515 case EMFILE:
516 case ENFILE:
517 case ENOSPC:
518 rb_gc();
519 (void)xopen(&x);
520 mq->des = x.des;
522 if (mq->des == MQD_INVALID)
523 rb_sys_fail("mq_open");
526 mq->name = rb_str_new_frozen(name);
527 if (x.oflags & O_NONBLOCK)
528 mq->attr.mq_flags = O_NONBLOCK;
530 return self;
534 * call-seq:
535 * POSIX_MQ.unlink(name) => 1
537 * Unlinks the message queue given by +name+. The queue will be destroyed
538 * when the last process with the queue open closes its queue descriptors.
540 static VALUE s_unlink(VALUE self, VALUE name)
542 int rv = mq_unlink(StringValueCStr(name));
544 if (rv < 0)
545 rb_sys_fail("mq_unlink");
547 return INT2NUM(1);
551 * call-seq:
552 * mq.unlink => mq
554 * Unlinks the message queue to prevent other processes from accessing it.
555 * All existing queue descriptors to this queue including those opened by
556 * other processes are unaffected. The queue will only be destroyed
557 * when the last process with open descriptors to this queue closes
558 * the descriptors.
560 static VALUE _unlink(VALUE self)
562 struct posix_mq *mq = get(self, 0);
563 int rv;
565 if (NIL_P(mq->name)) {
566 rb_raise(rb_eArgError, "can not unlink an adopted socket");
569 assert(TYPE(mq->name) == T_STRING && "mq->name is not a string");
571 rv = mq_unlink(RSTRING_PTR(mq->name));
572 if (rv < 0)
573 rb_sys_fail("mq_unlink");
575 return self;
578 static void setup_send_buffer(struct rw_args *x, VALUE buffer)
580 buffer = rb_obj_as_string(buffer);
581 x->msg_ptr = RSTRING_PTR(buffer);
582 x->msg_len = (size_t)RSTRING_LEN(buffer);
585 static VALUE _send(int sflags, int argc, VALUE *argv, VALUE self);
588 * call-seq:
589 * mq.send(string [,priority[, timeout]]) => true
591 * Inserts the given +string+ into the message queue with an optional,
592 * unsigned integer +priority+. If the optional +timeout+ is specified,
593 * then Errno::ETIMEDOUT will be raised if the operation cannot complete
594 * before +timeout+ seconds has elapsed. Without +timeout+, this method
595 * may block until the queue is writable.
597 * On some older systems, the +timeout+ argument is not currently
598 * supported and may raise NotImplementedError if +timeout+ is used.
600 static VALUE my_send(int argc, VALUE *argv, VALUE self)
602 return _send(0, argc, argv, self);
605 static VALUE _send(int sflags, int argc, VALUE *argv, VALUE self)
607 struct posix_mq *mq = get(self, 1);
608 struct rw_args x;
609 VALUE buffer, prio, timeout;
610 struct timespec expire;
612 rb_scan_args(argc, argv, "12", &buffer, &prio, &timeout);
614 setup_send_buffer(&x, buffer);
615 x.des = mq->des;
616 x.timeout = convert_timeout(&expire, timeout);
617 x.msg_prio = NIL_P(prio) ? 0 : NUM2UINT(prio);
619 retry:
620 WITHOUT_GVL(xsend, &x, RUBY_UBF_IO, 0);
621 if (x.retval < 0) {
622 if (errno == EINTR)
623 goto retry;
624 if (errno == EAGAIN && (sflags & PMQ_TRY))
625 return Qfalse;
626 rb_sys_fail("mq_send");
629 return Qtrue;
633 * call-seq:
634 * mq << string => mq
636 * Inserts the given +string+ into the message queue with a
637 * default priority of 0 and no timeout.
639 * Returns itself so its calls may be chained. This use is only
640 * recommended only for users who expect blocking behavior from
641 * the queue.
643 static VALUE send0(VALUE self, VALUE buffer)
645 struct posix_mq *mq = get(self, 1);
646 struct rw_args x;
648 setup_send_buffer(&x, buffer);
649 x.des = mq->des;
650 x.timeout = NULL;
651 x.msg_prio = 0;
653 retry:
654 WITHOUT_GVL(xsend, &x, RUBY_UBF_IO, 0);
655 if (x.retval < 0) {
656 if (errno == EINTR)
657 goto retry;
658 rb_sys_fail("mq_send");
661 return self;
#ifdef MQD_TO_FD
/*
 * call-seq:
 *	mq.to_io	=> IO
 *
 * Returns an IO.select-able +IO+ object.  This method is only available
 * under Linux and FreeBSD and is not intended to be portable.
 *
 * The IO object is created on first use and reused afterwards.
 */
static VALUE to_io(VALUE self)
{
	struct posix_mq *mq = get(self, 1);
	int fd = MQD_TO_FD(mq->des);

	if (!NIL_P(mq->io))
		return mq->io;

	mq->io = rb_funcall(rb_cIO, id_new, 1, INT2NUM(fd));

	/* carry an earlier "autoclose = false" over to the new IO */
	if (!mq->autoclose)
		rb_funcall(mq->io, id_setautoclose, 1, Qfalse);

	return mq->io;
}
#endif
688 static VALUE _receive(int rflags, int argc, VALUE *argv, VALUE self);
691 * call-seq:
692 * mq.receive([buffer, [timeout]]) => [ message, priority ]
694 * Takes the highest priority message off the queue and returns
695 * an array containing the message as a String and the Integer
696 * priority of the message.
698 * If the optional +buffer+ is present, then it must be a String
699 * which will receive the data.
701 * If the optional +timeout+ is present, then it may be a Float
702 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
703 * will be raised if +timeout+ has elapsed and there are no messages
704 * in the queue.
706 * On some older systems, the +timeout+ argument is not currently
707 * supported and may raise NotImplementedError if +timeout+ is used.
709 static VALUE receive(int argc, VALUE *argv, VALUE self)
711 return _receive(PMQ_WANTARRAY, argc, argv, self);
715 * call-seq:
716 * mq.shift([buffer, [timeout]]) => message
718 * Takes the highest priority message off the queue and returns
719 * the message as a String.
721 * If the optional +buffer+ is present, then it must be a String
722 * which will receive the data.
724 * If the optional +timeout+ is present, then it may be a Float
725 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
726 * will be raised if +timeout+ has elapsed and there are no messages
727 * in the queue.
729 * On some older systems, the +timeout+ argument is not currently
730 * supported and may raise NotImplementedError if +timeout+ is used.
732 static VALUE shift(int argc, VALUE *argv, VALUE self)
734 return _receive(0, argc, argv, self);
737 static VALUE _receive(int rflags, int argc, VALUE *argv, VALUE self)
739 struct posix_mq *mq = get(self, 1);
740 struct rw_args x;
741 VALUE buffer, timeout;
742 struct timespec expire;
744 if (mq->attr.mq_msgsize < 0) {
745 if (mq_getattr(mq->des, &mq->attr) < 0)
746 rb_sys_fail("mq_getattr");
749 rb_scan_args(argc, argv, "02", &buffer, &timeout);
750 x.timeout = convert_timeout(&expire, timeout);
752 if (NIL_P(buffer)) {
753 buffer = rb_str_new(0, mq->attr.mq_msgsize);
754 } else {
755 StringValue(buffer);
756 rb_str_modify(buffer);
757 rb_str_resize(buffer, mq->attr.mq_msgsize);
759 OBJ_TAINT(buffer);
760 x.msg_ptr = RSTRING_PTR(buffer);
761 x.msg_len = (size_t)mq->attr.mq_msgsize;
762 x.des = mq->des;
764 retry:
765 WITHOUT_GVL(xrecv, &x, RUBY_UBF_IO, 0);
766 if (x.received < 0) {
767 if (errno == EINTR)
768 goto retry;
769 if (errno == EAGAIN && (rflags & PMQ_TRY))
770 return Qnil;
771 rb_sys_fail("mq_receive");
774 rb_str_set_len(buffer, x.received);
776 if (rflags & PMQ_WANTARRAY)
777 return rb_ary_new3(2, buffer, UINT2NUM(x.msg_prio));
778 return buffer;
782 * call-seq:
783 * mq.attr => mq_attr
785 * Returns a POSIX_MQ::Attr struct containing the attributes
786 * of the message queue. See the mq_getattr(3) manpage for
787 * more details.
789 static VALUE getattr(VALUE self)
791 struct posix_mq *mq = get(self, 1);
793 if (mq_getattr(mq->des, &mq->attr) < 0)
794 rb_sys_fail("mq_getattr");
796 return rb_funcall(cAttr, id_new, 4,
797 LONG2NUM(mq->attr.mq_flags),
798 LONG2NUM(mq->attr.mq_maxmsg),
799 LONG2NUM(mq->attr.mq_msgsize),
800 LONG2NUM(mq->attr.mq_curmsgs));
804 * call-seq:
805 * mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr
807 * Only the IO::NONBLOCK flag may be set or unset (zero) in this manner.
808 * See the mq_setattr(3) manpage for more details.
810 * Consider using the POSIX_MQ#nonblock= method as it is easier and
811 * more natural to use.
813 static VALUE setattr(VALUE self, VALUE astruct)
815 struct posix_mq *mq = get(self, 1);
816 struct mq_attr newattr;
818 rstruct2mqattr(&newattr, astruct, 0);
820 if (mq_setattr(mq->des, &newattr, NULL) < 0)
821 rb_sys_fail("mq_setattr");
823 return astruct;
827 * call-seq:
828 * mq.close => nil
830 * Closes the underlying message queue descriptor.
831 * If this descriptor had a registered notification request, the request
832 * will be removed so another descriptor or process may register a
833 * notification request. Message queue descriptors are automatically
834 * closed by garbage collection.
836 static VALUE _close(VALUE self)
838 struct posix_mq *mq;
840 if (IDEMPOTENT_IO_CLOSE) { /* defined in extconf.rb */
841 mq = get(self, 0);
842 if (!mq || (mq->des == MQD_INVALID))
843 return Qnil;
844 } else {
845 mq = get(self, 1);
848 if (! MQ_IO_CLOSE(mq)) {
849 if (mq_close(mq->des) < 0)
850 rb_sys_fail("mq_close");
852 mq->des = MQD_INVALID;
854 return Qnil;
858 * call-seq:
859 * mq.closed? => true or false
861 * Returns +true+ if the message queue descriptor is closed and therefore
862 * unusable, otherwise +false+
864 static VALUE closed(VALUE self)
866 struct posix_mq *mq = get(self, 0);
868 return mq->des == MQD_INVALID ? Qtrue : Qfalse;
872 * call-seq:
873 * mq.name => string
875 * Returns the string name of message queue associated with +mq+
877 static VALUE name(VALUE self)
879 struct posix_mq *mq = get(self, 0);
881 if (NIL_P(mq->name)) {
883 * We could use readlink(2) on /proc/self/fd/N, but lots of
884 * care required.
885 * http://stackoverflow.com/questions/1188757/
887 rb_raise(rb_eArgError, "can not get name of an adopted socket");
890 /* XXX compatibility: in retrospect, we could return a frozen string */
891 return rb_str_dup(mq->name);
894 static int lookup_sig(VALUE sig)
896 static VALUE list;
897 const char *ptr;
898 long len;
900 sig = rb_obj_as_string(sig);
901 len = RSTRING_LEN(sig);
902 ptr = RSTRING_PTR(sig);
904 if (len > 3 && !memcmp("SIG", ptr, 3))
905 sig = rb_str_new(ptr + 3, len - 3);
907 if (!list) {
908 VALUE mSignal = rb_const_get(rb_cObject, rb_intern("Signal"));
910 list = rb_funcall(mSignal, rb_intern("list"), 0);
911 rb_obj_freeze(list);
912 rb_global_variable(&list);
915 sig = rb_hash_aref(list, sig);
916 if (NIL_P(sig))
917 rb_raise(rb_eArgError, "invalid signal: %s\n", ptr);
919 return NUM2INT(sig);
923 * TODO: Under Linux, we could just use netlink directly
924 * the same way glibc does...
926 /* we spawn a thread just to write ONE byte into an fd (usually a pipe) */
927 static void thread_notify_fd(union sigval sv)
929 int fd = sv.sival_int;
931 while ((write(fd, "", 1) < 0) && (errno == EINTR || errno == EAGAIN));
/*
 * mq_notify() wrapper: on ENOMEM it runs the GC and retries once,
 * and raises a SystemCallError if the call still fails.
 */
static void my_mq_notify(mqd_t des, struct sigevent *not)
{
	if (mq_notify(des, not) >= 0)
		return;

	if (errno == ENOMEM) {
		rb_gc();
		if (mq_notify(des, not) >= 0)
			return;
	}
	rb_sys_fail("mq_notify");
}
/*
 * Requests the smallest usable stack for threads created with +attr+:
 * PTHREAD_STACK_MIN, but never less than 4096 bytes.  Does nothing when
 * PTHREAD_STACK_MIN is not defined.
 */
static void lower_stack_size(pthread_attr_t *attr)
{
/* some OSes have ridiculously small stack sizes */
#ifdef PTHREAD_STACK_MIN
	const size_t floor_size = 4096;
	size_t wanted = PTHREAD_STACK_MIN;

	pthread_attr_setstacksize(attr,
				  wanted < floor_size ? floor_size : wanted);
#endif
}
961 /* :nodoc: */
962 static VALUE setnotify_exec(VALUE self, VALUE io, VALUE thr)
964 int fd = NUM2INT(rb_funcall(io, id_fileno, 0));
965 struct posix_mq *mq = get(self, 1);
966 struct sigevent not;
967 pthread_attr_t attr;
969 errno = pthread_attr_init(&attr);
970 if (errno) rb_sys_fail("pthread_attr_init");
972 errno = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
973 if (errno) rb_sys_fail("pthread_attr_setdetachstate");
975 lower_stack_size(&attr);
976 not.sigev_notify = SIGEV_THREAD;
977 not.sigev_notify_function = thread_notify_fd;
978 not.sigev_notify_attributes = &attr;
979 not.sigev_value.sival_int = fd;
981 if (!NIL_P(mq->thread))
982 rb_funcall(mq->thread, id_kill, 0);
983 mq->thread = thr;
985 my_mq_notify(mq->des, &not);
987 return thr;
990 /* :nodoc: */
991 static VALUE notify_cleanup(VALUE self)
993 struct posix_mq *mq = get(self, 1);
995 if (!NIL_P(mq->thread)) {
996 rb_funcall(mq->thread, id_kill, 0);
997 mq->thread = Qnil;
999 return Qnil;
1003 * call-seq:
1004 * mq.notify = signal => signal
1006 * Registers the notification request to deliver a given +signal+
1007 * to the current process when message is received.
1008 * If +signal+ is +nil+, it will unregister and disable the notification
1009 * request to allow other processes to register a request.
1010 * If +signal+ is +false+, it will register a no-op notification request
1011 * which will prevent other processes from registering a notification.
1012 * If +signal+ is an +IO+ object, it will spawn a thread upon the
1013 * arrival of the next message and write one "\\0" byte to the file
1014 * descriptor belonging to that IO object.
1015 * Only one process may have a notification request for a queue
1016 * at a time, Errno::EBUSY will be raised if there is already
1017 * a notification request registration for the queue.
1019 * Notifications are only fired once and processes must reregister
1020 * for subsequent notifications.
1022 * For readers of the mq_notify(3) manpage, passing +false+
1023 * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent
1024 * of passing a NULL notification pointer to mq_notify(3).
1026 static VALUE setnotify(VALUE self, VALUE arg)
1028 struct posix_mq *mq = get(self, 1);
1029 struct sigevent not;
1030 struct sigevent * notification = &not;
1031 VALUE rv = arg;
1033 notify_cleanup(self);
1034 not.sigev_notify = SIGEV_SIGNAL;
1036 switch (TYPE(arg)) {
1037 case T_FALSE:
1038 not.sigev_notify = SIGEV_NONE;
1039 break;
1040 case T_NIL:
1041 notification = NULL;
1042 break;
1043 case T_FIXNUM:
1044 not.sigev_signo = NUM2INT(arg);
1045 break;
1046 case T_SYMBOL:
1047 case T_STRING:
1048 not.sigev_signo = lookup_sig(arg);
1049 rv = INT2NUM(not.sigev_signo);
1050 break;
1051 default:
1052 rb_raise(rb_eArgError, "must be a signal or nil");
1055 my_mq_notify(mq->des, notification);
1057 return rv;
1061 * call-seq:
1062 * mq.nonblock? => true or false
1064 * Returns the current non-blocking state of the message queue descriptor.
1066 static VALUE nonblock_p(VALUE self)
1068 struct posix_mq *mq = get(self, 1);
1070 if (mq_getattr(mq->des, &mq->attr) < 0)
1071 rb_sys_fail("mq_getattr");
1072 return mq->attr.mq_flags & O_NONBLOCK ? Qtrue : Qfalse;
1076 * call-seq:
1077 * mq.nonblock = boolean => boolean
1079 * Enables or disables non-blocking operation for the message queue
1080 * descriptor. Errno::EAGAIN will be raised in situations where
1081 * the queue would block. This is not compatible with +timeout+
1082 * arguments to POSIX_MQ#send and POSIX_MQ#receive.
1084 static VALUE setnonblock(VALUE self, VALUE nb)
1086 struct mq_attr newattr;
1087 struct posix_mq *mq = get(self, 1);
1089 if (nb == Qtrue)
1090 newattr.mq_flags = O_NONBLOCK;
1091 else if (nb == Qfalse)
1092 newattr.mq_flags = 0;
1093 else
1094 rb_raise(rb_eArgError, "must be true or false");
1096 if (mq_setattr(mq->des, &newattr, &mq->attr) < 0)
1097 rb_sys_fail("mq_setattr");
1099 mq->attr.mq_flags = newattr.mq_flags;
1101 return nb;
1105 * call-seq:
1106 * mq.autoclose = boolean => boolean
1108 * Determines whether or not the _mq_ will be closed automatically
1109 * at finalization.
1111 static VALUE setautoclose(VALUE self, VALUE autoclose)
1113 struct posix_mq *mq = get(self, 1);
1115 MQ_IO_SET_AUTOCLOSE(mq, autoclose);
1116 mq->autoclose = RTEST(autoclose) ? 1 : 0;
1117 return autoclose;
1121 * call-seq:
1122 * mq.autoclose? => boolean
1124 * Returns whether or not the _mq_ will be closed automatically
1125 * at finalization.
1127 static VALUE autoclose_p(VALUE self)
1129 struct posix_mq *mq = get(self, 1);
1131 return mq->autoclose ? Qtrue : Qfalse;
1135 * call-seq:
1136 * mq.trysend(string [,priority[, timeout]]) => +true+ or +false+
1138 * Exactly like POSIX_MQ#send, except it returns +false+ instead of raising
1139 * Errno::EAGAIN when non-blocking operation is desired and returns +true+
1140 * on success instead of +nil+.
1142 * This does not guarantee non-blocking behavior, the message queue must
1143 * be made non-blocking before calling this method.
1145 static VALUE trysend(int argc, VALUE *argv, VALUE self)
1147 return _send(PMQ_TRY, argc, argv, self);
1151 * call-seq:
1152 * mq.tryshift([buffer [, timeout]]) => message or nil
1154 * Exactly like POSIX_MQ#shift, except it returns +nil+ instead of raising
1155 * Errno::EAGAIN when non-blocking operation is desired.
1157 * This does not guarantee non-blocking behavior, the message queue must
1158 * be made non-blocking before calling this method.
1160 static VALUE tryshift(int argc, VALUE *argv, VALUE self)
1162 return _receive(PMQ_TRY, argc, argv, self);
1166 * call-seq:
1167 * mq.tryreceive([buffer [, timeout]]) => [ message, priority ] or nil
1169 * Exactly like POSIX_MQ#receive, except it returns +nil+ instead of raising
1170 * Errno::EAGAIN when non-blocking operation is desired.
1172 * This does not guarantee non-blocking behavior, the message queue must
1173 * be made non-blocking before calling this method.
1175 static VALUE tryreceive(int argc, VALUE *argv, VALUE self)
1177 return _receive(PMQ_WANTARRAY|PMQ_TRY, argc, argv, self);
1180 void Init_posix_mq_ext(void)
1182 VALUE cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject);
1183 rb_define_alloc_func(cPOSIX_MQ, alloc);
1184 cAttr = rb_const_get(cPOSIX_MQ, rb_intern("Attr"));
1187 * The maximum number of open message descriptors supported
1188 * by the system. This may be -1, in which case it is dynamically
1189 * set at runtime. Consult your operating system documentation
1190 * for system-specific information about this.
1192 rb_define_const(cPOSIX_MQ, "OPEN_MAX",
1193 LONG2NUM(sysconf(_SC_MQ_OPEN_MAX)));
1196 * The maximum priority that may be specified for POSIX_MQ#send
1197 * On POSIX-compliant systems, this is at least 31, but some
1198 * systems allow higher limits.
1199 * The minimum priority is always zero.
1201 rb_define_const(cPOSIX_MQ, "PRIO_MAX",
1202 LONG2NUM(sysconf(_SC_MQ_PRIO_MAX)));
1204 rb_define_singleton_method(cPOSIX_MQ, "unlink", s_unlink, 1);
1206 rb_define_private_method(cPOSIX_MQ, "initialize", init, -1);
1207 rb_define_method(cPOSIX_MQ, "send", my_send, -1);
1208 rb_define_method(cPOSIX_MQ, "<<", send0, 1);
1209 rb_define_method(cPOSIX_MQ, "trysend", trysend, -1);
1210 rb_define_method(cPOSIX_MQ, "receive", receive, -1);
1211 rb_define_method(cPOSIX_MQ, "tryreceive", tryreceive, -1);
1212 rb_define_method(cPOSIX_MQ, "shift", shift, -1);
1213 rb_define_method(cPOSIX_MQ, "tryshift", tryshift, -1);
1214 rb_define_method(cPOSIX_MQ, "attr", getattr, 0);
1215 rb_define_method(cPOSIX_MQ, "attr=", setattr, 1);
1216 rb_define_method(cPOSIX_MQ, "close", _close, 0);
1217 rb_define_method(cPOSIX_MQ, "closed?", closed, 0);
1218 rb_define_method(cPOSIX_MQ, "unlink", _unlink, 0);
1219 rb_define_method(cPOSIX_MQ, "name", name, 0);
1220 rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1);
1221 rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1);
1222 rb_define_private_method(cPOSIX_MQ, "notify_exec", setnotify_exec, 2);
1223 rb_define_private_method(cPOSIX_MQ, "notify_cleanup", notify_cleanup, 0);
1224 rb_define_method(cPOSIX_MQ, "nonblock?", nonblock_p, 0);
1225 rb_define_method(cPOSIX_MQ, "autoclose?", autoclose_p, 0);
1226 rb_define_method(cPOSIX_MQ, "autoclose=", setautoclose, 1);
1227 #ifdef MQD_TO_FD
1228 rb_define_method(cPOSIX_MQ, "to_io", to_io, 0);
1229 id_setautoclose = rb_intern("autoclose=");
1230 #endif
1232 #ifdef FD_TO_MQD
1233 rb_define_module_function(cPOSIX_MQ, "for_fd", for_fd, 1);
1234 #endif
1236 id_new = rb_intern("new");
1237 id_kill = rb_intern("kill");
1238 id_fileno = rb_intern("fileno");
1239 id_divmod = rb_intern("divmod");
1240 id_flags = rb_intern("flags");
1241 id_maxmsg = rb_intern("maxmsg");
1242 id_msgsize = rb_intern("msgsize");
1243 id_curmsgs = rb_intern("curmsgs");
1244 sym_r = ID2SYM(rb_intern("r"));
1245 sym_w = ID2SYM(rb_intern("w"));
1246 sym_rw = ID2SYM(rb_intern("rw"));