73cd815b861888fa457e838114010cade28f1534
[ruby_posix_mq.git] / ext / posix_mq / posix_mq.c
blob73cd815b861888fa457e838114010cade28f1534
1 #define _XOPEN_SOURCE 600
2 #ifdef HAVE_SYS_SELECT_H
3 # include <sys/select.h>
4 #endif
5 #ifdef HAVE_SIGNAL_H
6 # include <signal.h>
7 #endif
8 #ifdef HAVE_PTHREAD_H
9 # include <pthread.h>
10 #endif
11 #include <ruby.h>
13 #ifndef NUM2TIMET
14 # define NUM2TIMET NUM2INT
15 #endif
17 #include <time.h>
18 #include <mqueue.h>
19 #include <fcntl.h>
20 #include <sys/stat.h>
21 #include <errno.h>
22 #include <assert.h>
23 #include <unistd.h>
24 #include <float.h>
25 #include <math.h>
/*
 * Platform glue.  Where a message queue descriptor can be turned into a
 * file descriptor, MQD_TO_FD() does the conversion.  Everywhere else the
 * MQ_IO_* helpers are stubbed out so the rest of the file needs no #ifdefs.
 */
#ifdef __linux__
#  define MQD_TO_FD(mqd) (int)(mqd)
#elif defined(HAVE___MQ_OSHANDLE) /* FreeBSD */
#  define MQD_TO_FD(mqd) __mq_oshandle(mqd)
#else
#  define MQ_IO_MARK(mq) ((void)(0))
#  define MQ_IO_SET(mq,val) ((void)(0))
#  define MQ_IO_CLOSE(mq) ((int)(0))
#  define MQ_IO_NIL_P(mq) ((int)(1))
#endif
38 struct posix_mq {
39 mqd_t des;
40 struct mq_attr attr;
41 VALUE name;
42 VALUE thread;
43 #ifdef MQD_TO_FD
44 VALUE io;
45 #endif
#ifdef MQD_TO_FD
#  define MQ_IO_MARK(mq) rb_gc_mark((mq)->io)
#  define MQ_IO_SET(mq,val) do { (mq)->io = (val); } while (0)
#  define MQ_IO_NIL_P(mq) NIL_P((mq)->io)

/*
 * Closes the IO object attached to +mq+, if there is one, and forgets it.
 * Returns 1 when an IO object was closed and 0 when none was attached.
 */
static int MQ_IO_CLOSE(struct posix_mq *mq)
{
	if (!NIL_P(mq->io)) {
		/* not safe during GC */
		rb_io_close(mq->io);
		mq->io = Qnil;

		return 1;
	}

	return 0;
}
#endif
65 # define PMQ_WANTARRAY (1<<0)
66 # define PMQ_TRY (1<<1)
68 static VALUE cAttr;
69 static ID id_new, id_kill, id_fileno, id_mul, id_divmod;
70 static ID id_flags, id_maxmsg, id_msgsize, id_curmsgs;
71 static VALUE sym_r, sym_w, sym_rw;
72 static const mqd_t MQD_INVALID = (mqd_t)-1;
/* Ruby 1.8.6+ macros (for compatibility with Ruby 1.9) */
#if !defined(RSTRING_PTR)
#  define RSTRING_PTR(s) (RSTRING(s)->ptr)
#endif
#if !defined(RSTRING_LEN)
#  define RSTRING_LEN(s) (RSTRING(s)->len)
#endif
#if !defined(RFLOAT_VALUE)
#  define RFLOAT_VALUE(f) (RFLOAT(f)->value)
#endif
85 #ifndef HAVE_RB_STR_SET_LEN
86 /* this is taken from Ruby 1.8.7, 1.8.6 may not have it */
87 # ifdef RUBINIUS
88 # error upgrade Rubinius, rb_str_set_len should be available
89 # endif
90 static void rb_18_str_set_len(VALUE str, long len)
92 RSTRING(str)->len = len;
93 RSTRING(str)->ptr[len] = '\0';
95 #define rb_str_set_len rb_18_str_set_len
96 #endif /* !defined(HAVE_RB_STR_SET_LEN) */
98 /* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
99 #ifndef HAVE_RB_THREAD_BLOCKING_REGION
100 # include <rubysig.h>
101 # define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
102 typedef void rb_unblock_function_t(void *);
103 typedef VALUE rb_blocking_function_t(void *);
104 static VALUE
105 rb_thread_blocking_region(
106 rb_blocking_function_t *func, void *data1,
107 rb_unblock_function_t *ubf, void *data2)
109 VALUE rv;
111 assert(RUBY_UBF_IO == ubf && "RUBY_UBF_IO required for emulation");
113 TRAP_BEG;
114 rv = func(data1);
115 TRAP_END;
117 return rv;
119 #endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
/* used to pass arguments to mq_open inside blocking region */
struct open_args {
	int argc;            /* how many arguments mq_open() receives: 2, 3 or 4 */
	const char *name;    /* queue name */
	int oflags;          /* open flags */
	mode_t mode;         /* permission bits, passed when argc >= 3 */
	struct mq_attr attr; /* queue attributes, passed when argc == 4 */
};
/* used to pass arguments to mq_send/mq_receive inside blocking region */
struct rw_args {
	mqd_t des;                /* queue descriptor to operate on */
	char *msg_ptr;            /* message data (send) or destination buffer (receive) */
	size_t msg_len;           /* number of bytes at msg_ptr */
	unsigned msg_prio;        /* priority to send with, or priority received */
	struct timespec *timeout; /* absolute deadline, NULL for the untimed call */
};
#ifndef HAVE_MQ_TIMEDSEND
/*
 * Stand-in for mq_timedsend() on systems that lack it.  It should never
 * be reached: num2timespec() raises NotImplementedError before a timeout
 * can be built on such systems.  The prototype mirrors mq_timedsend(),
 * which returns int (not mqd_t).
 */
static int
not_timedsend(mqd_t mqdes, const char *msg_ptr,
              size_t msg_len, unsigned msg_prio,
              const struct timespec *abs_timeout)
{
	rb_bug("mq_timedsend workaround failed");
	return -1;
}
#  define mq_timedsend not_timedsend
#endif
#ifndef HAVE_MQ_TIMEDRECEIVE
/*
 * Stand-in for mq_timedreceive() on systems that lack it.  It should never
 * be reached: num2timespec() raises NotImplementedError before a timeout
 * can be built on such systems.  The failure value is a plain -1 so it
 * matches the ssize_t return type instead of being converted from mqd_t.
 */
static ssize_t
not_timedreceive(mqd_t mqdes, char *msg_ptr,
                 size_t msg_len, unsigned *msg_prio,
                 const struct timespec *abs_timeout)
{
	rb_bug("mq_timedreceive workaround failed");
	return -1;
}
#  define mq_timedreceive not_timedreceive
#endif
162 #if defined(HAVE_MQ_TIMEDRECEIVE) && defined(HAVE_MQ_TIMEDSEND)
163 static void num2timespec(struct timespec *ts, VALUE t)
165 switch (TYPE(t)) {
166 case T_FIXNUM:
167 case T_BIGNUM:
168 ts->tv_sec = NUM2TIMET(t);
169 ts->tv_nsec = 0;
170 break;
171 case T_FLOAT: {
172 double f, d;
173 double val = RFLOAT_VALUE(t);
175 d = modf(val, &f);
176 if (d >= 0) {
177 ts->tv_nsec = (long)(d * 1e9 + 0.5);
178 } else {
179 ts->tv_nsec = (long)(-d * 1e9 + 0.5);
180 if (ts->tv_nsec > 0) {
181 ts->tv_nsec = 1000000000 - ts->tv_nsec;
182 f -= 1;
185 ts->tv_sec = (time_t)f;
186 if (f != ts->tv_sec)
187 rb_raise(rb_eRangeError, "%f out of range", val);
188 ts->tv_sec = (time_t)f;
190 break;
191 default: {
192 VALUE f;
193 VALUE ary = rb_funcall(t, id_divmod, 1, INT2FIX(1));
195 Check_Type(ary, T_ARRAY);
197 ts->tv_sec = NUM2TIMET(rb_ary_entry(ary, 0));
198 f = rb_ary_entry(ary, 1);
199 f = rb_funcall(f, id_mul, 1, INT2FIX(1000000000));
200 ts->tv_nsec = NUM2LONG(f);
204 #else
205 static void num2timespec(struct timespec *ts, VALUE t)
207 rb_raise(rb_eNotImpError,
208 "mq_timedsend and/or mq_timedreceive missing");
210 #endif
212 static struct timespec *convert_timeout(struct timespec *dest, VALUE t)
214 struct timespec ts, now;
216 if (NIL_P(t))
217 return NULL;
219 num2timespec(&ts, t);
220 clock_gettime(CLOCK_REALTIME, &now);
221 dest->tv_sec = now.tv_sec + ts.tv_sec;
222 dest->tv_nsec = now.tv_nsec + ts.tv_nsec;
224 if (dest->tv_nsec > 1000000000) {
225 dest->tv_nsec -= 1000000000;
226 ++dest->tv_sec;
229 return dest;
232 /* (may) run without GVL */
233 static VALUE xopen(void *ptr)
235 struct open_args *x = ptr;
236 mqd_t rv;
238 switch (x->argc) {
239 case 2: rv = mq_open(x->name, x->oflags); break;
240 case 3: rv = mq_open(x->name, x->oflags, x->mode, NULL); break;
241 case 4: rv = mq_open(x->name, x->oflags, x->mode, &x->attr); break;
242 default: rv = MQD_INVALID;
245 return (VALUE)rv;
248 /* runs without GVL */
249 static VALUE xsend(void *ptr)
251 struct rw_args *x = ptr;
253 if (x->timeout)
254 return (VALUE)mq_timedsend(x->des, x->msg_ptr, x->msg_len,
255 x->msg_prio, x->timeout);
257 return (VALUE)mq_send(x->des, x->msg_ptr, x->msg_len, x->msg_prio);
260 /* runs without GVL */
261 static VALUE xrecv(void *ptr)
263 struct rw_args *x = ptr;
265 if (x->timeout)
266 return (VALUE)mq_timedreceive(x->des, x->msg_ptr, x->msg_len,
267 &x->msg_prio, x->timeout);
269 return (VALUE)mq_receive(x->des, x->msg_ptr, x->msg_len, &x->msg_prio);
272 /* called by GC */
273 static void mark(void *ptr)
275 struct posix_mq *mq = ptr;
277 rb_gc_mark(mq->name);
278 rb_gc_mark(mq->thread);
279 MQ_IO_MARK(mq);
282 /* called by GC */
283 static void _free(void *ptr)
285 struct posix_mq *mq = ptr;
287 if (mq->des != MQD_INVALID && MQ_IO_NIL_P(mq)) {
288 /* we ignore errors when gc-ing */
289 mq_close(mq->des);
290 errno = 0;
292 xfree(ptr);
295 /* automatically called at creation (before initialize) */
296 static VALUE alloc(VALUE klass)
298 struct posix_mq *mq;
299 VALUE rv = Data_Make_Struct(klass, struct posix_mq, mark, _free, mq);
301 mq->des = MQD_INVALID;
302 mq->attr.mq_flags = 0;
303 mq->attr.mq_maxmsg = 0;
304 mq->attr.mq_msgsize = -1;
305 mq->attr.mq_curmsgs = 0;
306 mq->name = Qnil;
307 mq->thread = Qnil;
308 MQ_IO_SET(mq, Qnil);
310 return rv;
313 /* unwraps the posix_mq struct from self */
314 static struct posix_mq *get(VALUE self, int need_valid)
316 struct posix_mq *mq;
318 Data_Get_Struct(self, struct posix_mq, mq);
320 if (need_valid && mq->des == MQD_INVALID)
321 rb_raise(rb_eIOError, "closed queue descriptor");
323 return mq;
326 static void check_struct_type(VALUE astruct)
328 if (CLASS_OF(astruct) == cAttr)
329 return;
330 astruct = rb_inspect(astruct);
331 rb_raise(rb_eTypeError, "not a POSIX_MQ::Attr: %s",
332 StringValuePtr(astruct));
335 static void rstruct2mqattr(struct mq_attr *attr, VALUE astruct, int all)
337 VALUE tmp;
339 check_struct_type(astruct);
340 attr->mq_flags = NUM2LONG(rb_funcall(astruct, id_flags, 0));
342 tmp = rb_funcall(astruct, id_maxmsg, 0);
343 if (all || !NIL_P(tmp))
344 attr->mq_maxmsg = NUM2LONG(tmp);
346 tmp = rb_funcall(astruct, id_msgsize, 0);
347 if (all || !NIL_P(tmp))
348 attr->mq_msgsize = NUM2LONG(tmp);
350 tmp = rb_funcall(astruct, id_curmsgs, 0);
351 if (!NIL_P(tmp))
352 attr->mq_curmsgs = NUM2LONG(tmp);
356 * call-seq:
357 * POSIX_MQ.new(name [, flags [, mode [, mq_attr]]) => mq
359 * Opens a POSIX message queue given by +name+. +name+ should start
360 * with a slash ("/") for portable applications.
362 * If a Symbol is given in place of integer +flags+, then:
364 * * +:r+ is equivalent to IO::RDONLY
365 * * +:w+ is equivalent to IO::CREAT|IO::WRONLY
366 * * +:rw+ is equivalent to IO::CREAT|IO::RDWR
368 * +mode+ is an integer and only used when IO::CREAT is used.
369 * +mq_attr+ is a POSIX_MQ::Attr and only used if IO::CREAT is used.
370 * If +mq_attr+ is not specified when creating a queue, then the
371 * system defaults will be used.
373 * See the manpage for mq_open(3) for more details on this function.
375 static VALUE init(int argc, VALUE *argv, VALUE self)
377 struct posix_mq *mq = get(self, 0);
378 struct open_args x;
379 VALUE name, oflags, mode, attr;
381 rb_scan_args(argc, argv, "13", &name, &oflags, &mode, &attr);
383 switch (TYPE(oflags)) {
384 case T_NIL:
385 x.oflags = O_RDONLY;
386 break;
387 case T_SYMBOL:
388 if (oflags == sym_r)
389 x.oflags = O_RDONLY;
390 else if (oflags == sym_w)
391 x.oflags = O_CREAT|O_WRONLY;
392 else if (oflags == sym_rw)
393 x.oflags = O_CREAT|O_RDWR;
394 else {
395 oflags = rb_inspect(oflags);
396 rb_raise(rb_eArgError,
397 "symbol must be :r, :w, or :rw: %s",
398 StringValuePtr(oflags));
400 break;
401 case T_BIGNUM:
402 case T_FIXNUM:
403 x.oflags = NUM2INT(oflags);
404 break;
405 default:
406 rb_raise(rb_eArgError, "flags must be an int, :r, :w, or :wr");
409 x.name = StringValueCStr(name);
410 x.argc = 2;
412 switch (TYPE(mode)) {
413 case T_FIXNUM:
414 x.argc = 3;
415 x.mode = NUM2UINT(mode);
416 break;
417 case T_NIL:
418 if (x.oflags & O_CREAT) {
419 x.argc = 3;
420 x.mode = 0666;
422 break;
423 default:
424 rb_raise(rb_eArgError, "mode not an integer");
427 switch (TYPE(attr)) {
428 case T_STRUCT:
429 x.argc = 4;
430 rstruct2mqattr(&x.attr, attr, 1);
432 /* principle of least surprise */
433 if (x.attr.mq_flags & O_NONBLOCK)
434 x.oflags |= O_NONBLOCK;
435 break;
436 case T_NIL:
437 break;
438 default:
439 check_struct_type(attr);
442 mq->des = (mqd_t)xopen(&x);
443 if (mq->des == MQD_INVALID) {
444 switch (errno) {
445 case ENOMEM:
446 case EMFILE:
447 case ENFILE:
448 case ENOSPC:
449 rb_gc();
450 mq->des = (mqd_t)xopen(&x);
452 if (mq->des == MQD_INVALID)
453 rb_sys_fail("mq_open");
456 mq->name = rb_str_dup(name);
457 if (x.oflags & O_NONBLOCK)
458 mq->attr.mq_flags = O_NONBLOCK;
460 return self;
464 * call-seq:
465 * POSIX_MQ.unlink(name) => 1
467 * Unlinks the message queue given by +name+. The queue will be destroyed
468 * when the last process with the queue open closes its queue descriptors.
470 static VALUE s_unlink(VALUE self, VALUE name)
472 mqd_t rv = mq_unlink(StringValueCStr(name));
474 if (rv == MQD_INVALID)
475 rb_sys_fail("mq_unlink");
477 return INT2NUM(1);
481 * call-seq:
482 * mq.unlink => mq
484 * Unlinks the message queue to prevent other processes from accessing it.
485 * All existing queue descriptors to this queue including those opened by
486 * other processes are unaffected. The queue will only be destroyed
487 * when the last process with open descriptors to this queue closes
488 * the descriptors.
490 static VALUE _unlink(VALUE self)
492 struct posix_mq *mq = get(self, 0);
493 mqd_t rv;
495 assert(TYPE(mq->name) == T_STRING && "mq->name is not a string");
497 rv = mq_unlink(RSTRING_PTR(mq->name));
498 if (rv == MQD_INVALID)
499 rb_sys_fail("mq_unlink");
501 return self;
504 static void setup_send_buffer(struct rw_args *x, VALUE buffer)
506 buffer = rb_obj_as_string(buffer);
507 x->msg_ptr = RSTRING_PTR(buffer);
508 x->msg_len = (size_t)RSTRING_LEN(buffer);
511 static VALUE _send(int sflags, int argc, VALUE *argv, VALUE self);
514 * call-seq:
515 * mq.send(string [,priority[, timeout]]) => true
517 * Inserts the given +string+ into the message queue with an optional,
518 * unsigned integer +priority+. If the optional +timeout+ is specified,
519 * then Errno::ETIMEDOUT will be raised if the operation cannot complete
520 * before +timeout+ seconds has elapsed. Without +timeout+, this method
521 * may block until the queue is writable.
523 * On some older systems, the +timeout+ argument is not currently
524 * supported and may raise NotImplementedError if +timeout+ is used.
526 static VALUE my_send(int argc, VALUE *argv, VALUE self)
528 return _send(0, argc, argv, self);
531 static VALUE _send(int sflags, int argc, VALUE *argv, VALUE self)
533 struct posix_mq *mq = get(self, 1);
534 struct rw_args x;
535 VALUE buffer, prio, timeout;
536 mqd_t rv;
537 struct timespec expire;
539 rb_scan_args(argc, argv, "12", &buffer, &prio, &timeout);
541 setup_send_buffer(&x, buffer);
542 x.des = mq->des;
543 x.timeout = convert_timeout(&expire, timeout);
544 x.msg_prio = NIL_P(prio) ? 0 : NUM2UINT(prio);
546 rv = (mqd_t)rb_thread_blocking_region(xsend, &x, RUBY_UBF_IO, 0);
547 if (rv == MQD_INVALID) {
548 if (errno == EAGAIN && (sflags & PMQ_TRY))
549 return Qfalse;
550 rb_sys_fail("mq_send");
553 return Qtrue;
557 * call-seq:
558 * mq << string => mq
560 * Inserts the given +string+ into the message queue with a
561 * default priority of 0 and no timeout.
563 * Returns itself so its calls may be chained. This use is only
564 * recommended only for users who expect blocking behavior from
565 * the queue.
567 static VALUE send0(VALUE self, VALUE buffer)
569 struct posix_mq *mq = get(self, 1);
570 struct rw_args x;
571 mqd_t rv;
573 setup_send_buffer(&x, buffer);
574 x.des = mq->des;
575 x.timeout = NULL;
576 x.msg_prio = 0;
578 rv = (mqd_t)rb_thread_blocking_region(xsend, &x, RUBY_UBF_IO, 0);
579 if (rv == MQD_INVALID)
580 rb_sys_fail("mq_send");
582 return self;
#ifdef MQD_TO_FD
/*
 * call-seq:
 *	mq.to_io	=> IO
 *
 * Returns an IO.select-able +IO+ object.  This method is only available
 * under Linux and FreeBSD and is not intended to be portable.
 */
static VALUE to_io(VALUE self)
{
	struct posix_mq *queue = get(self, 1);
	int fd = MQD_TO_FD(queue->des);

	/* the IO object is created on first use and cached afterwards */
	if (NIL_P(queue->io)) {
		VALUE fileno = INT2NUM(fd);

		queue->io = rb_funcall(rb_cIO, id_new, 1, fileno);
	}

	return queue->io;
}
#endif
605 static VALUE _receive(int rflags, int argc, VALUE *argv, VALUE self);
608 * call-seq:
609 * mq.receive([buffer, [timeout]]) => [ message, priority ]
611 * Takes the highest priority message off the queue and returns
612 * an array containing the message as a String and the Integer
613 * priority of the message.
615 * If the optional +buffer+ is present, then it must be a String
616 * which will receive the data.
618 * If the optional +timeout+ is present, then it may be a Float
619 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
620 * will be raised if +timeout+ has elapsed and there are no messages
621 * in the queue.
623 * On some older systems, the +timeout+ argument is not currently
624 * supported and may raise NotImplementedError if +timeout+ is used.
626 static VALUE receive(int argc, VALUE *argv, VALUE self)
628 return _receive(PMQ_WANTARRAY, argc, argv, self);
632 * call-seq:
633 * mq.shift([buffer, [timeout]]) => message
635 * Takes the highest priority message off the queue and returns
636 * the message as a String.
638 * If the optional +buffer+ is present, then it must be a String
639 * which will receive the data.
641 * If the optional +timeout+ is present, then it may be a Float
642 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
643 * will be raised if +timeout+ has elapsed and there are no messages
644 * in the queue.
646 * On some older systems, the +timeout+ argument is not currently
647 * supported and may raise NotImplementedError if +timeout+ is used.
649 static VALUE shift(int argc, VALUE *argv, VALUE self)
651 return _receive(0, argc, argv, self);
654 static VALUE _receive(int rflags, int argc, VALUE *argv, VALUE self)
656 struct posix_mq *mq = get(self, 1);
657 struct rw_args x;
658 VALUE buffer, timeout;
659 ssize_t r;
660 struct timespec expire;
662 if (mq->attr.mq_msgsize < 0) {
663 if (mq_getattr(mq->des, &mq->attr) < 0)
664 rb_sys_fail("mq_getattr");
667 rb_scan_args(argc, argv, "02", &buffer, &timeout);
668 x.timeout = convert_timeout(&expire, timeout);
670 if (NIL_P(buffer)) {
671 buffer = rb_str_new(0, mq->attr.mq_msgsize);
672 } else {
673 StringValue(buffer);
674 rb_str_modify(buffer);
675 rb_str_resize(buffer, mq->attr.mq_msgsize);
677 OBJ_TAINT(buffer);
678 x.msg_ptr = RSTRING_PTR(buffer);
679 x.msg_len = (size_t)mq->attr.mq_msgsize;
680 x.des = mq->des;
682 r = (ssize_t)rb_thread_blocking_region(xrecv, &x, RUBY_UBF_IO, 0);
683 if (r < 0) {
684 if (errno == EAGAIN && (rflags & PMQ_TRY))
685 return Qnil;
686 rb_sys_fail("mq_receive");
689 rb_str_set_len(buffer, r);
691 if (rflags & PMQ_WANTARRAY)
692 return rb_ary_new3(2, buffer, UINT2NUM(x.msg_prio));
693 return buffer;
697 * call-seq:
698 * mq.attr => mq_attr
700 * Returns a POSIX_MQ::Attr struct containing the attributes
701 * of the message queue. See the mq_getattr(3) manpage for
702 * more details.
704 static VALUE getattr(VALUE self)
706 struct posix_mq *mq = get(self, 1);
707 VALUE astruct;
709 if (mq_getattr(mq->des, &mq->attr) < 0)
710 rb_sys_fail("mq_getattr");
712 return rb_funcall(cAttr, id_new, 4,
713 LONG2NUM(mq->attr.mq_flags),
714 LONG2NUM(mq->attr.mq_maxmsg),
715 LONG2NUM(mq->attr.mq_msgsize),
716 LONG2NUM(mq->attr.mq_curmsgs));
720 * call-seq:
721 * mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr
723 * Only the IO::NONBLOCK flag may be set or unset (zero) in this manner.
724 * See the mq_setattr(3) manpage for more details.
726 * Consider using the POSIX_MQ#nonblock= method as it is easier and
727 * more natural to use.
729 static VALUE setattr(VALUE self, VALUE astruct)
731 struct posix_mq *mq = get(self, 1);
732 struct mq_attr newattr;
734 rstruct2mqattr(&newattr, astruct, 0);
736 if (mq_setattr(mq->des, &newattr, NULL) < 0)
737 rb_sys_fail("mq_setattr");
739 return astruct;
743 * call-seq:
744 * mq.close => nil
746 * Closes the underlying message queue descriptor.
747 * If this descriptor had a registered notification request, the request
748 * will be removed so another descriptor or process may register a
749 * notification request. Message queue descriptors are automatically
750 * closed by garbage collection.
752 static VALUE _close(VALUE self)
754 struct posix_mq *mq = get(self, 1);
756 if (! MQ_IO_CLOSE(mq)) {
757 if (mq_close(mq->des) == -1)
758 rb_sys_fail("mq_close");
760 mq->des = MQD_INVALID;
762 return Qnil;
766 * call-seq:
767 * mq.closed? => true or false
769 * Returns +true+ if the message queue descriptor is closed and therefore
770 * unusable, otherwise +false+
772 static VALUE closed(VALUE self)
774 struct posix_mq *mq = get(self, 0);
776 return mq->des == MQD_INVALID ? Qtrue : Qfalse;
780 * call-seq:
781 * mq.name => string
783 * Returns the string name of message queue associated with +mq+
785 static VALUE name(VALUE self)
787 struct posix_mq *mq = get(self, 0);
789 return rb_str_dup(mq->name);
792 static int lookup_sig(VALUE sig)
794 static VALUE list;
795 const char *ptr;
796 long len;
798 sig = rb_obj_as_string(sig);
799 len = RSTRING_LEN(sig);
800 ptr = RSTRING_PTR(sig);
802 if (len > 3 && !memcmp("SIG", ptr, 3))
803 sig = rb_str_new(ptr + 3, len - 3);
805 if (!list) {
806 VALUE mSignal = rb_const_get(rb_cObject, rb_intern("Signal"));
808 list = rb_funcall(mSignal, rb_intern("list"), 0, 0);
809 rb_global_variable(&list);
812 sig = rb_hash_aref(list, sig);
813 if (NIL_P(sig))
814 rb_raise(rb_eArgError, "invalid signal: %s\n", ptr);
816 return NUM2INT(sig);
820 * TODO: Under Linux, we could just use netlink directly
821 * the same way glibc does...
823 /* we spawn a thread just to write ONE byte into an fd (usually a pipe) */
824 static void thread_notify_fd(union sigval sv)
826 int fd = sv.sival_int;
828 while ((write(fd, "", 1) < 0) && (errno == EINTR || errno == EAGAIN));
/*
 * Wrapper around mq_notify() that runs the garbage collector and retries
 * once on ENOMEM.  Raises the matching Errno exception if the
 * registration still fails.
 */
static void my_mq_notify(mqd_t des, struct sigevent *not)
{
	/* mq_notify() returns an int status, not a queue descriptor */
	int rv = mq_notify(des, not);

	if (rv < 0 && errno == ENOMEM) {
		rb_gc();
		rv = mq_notify(des, not);
	}
	if (rv < 0)
		rb_sys_fail("mq_notify");
}
845 /* :nodoc: */
846 static VALUE setnotify_exec(VALUE self, VALUE io, VALUE thr)
848 int fd = NUM2INT(rb_funcall(io, id_fileno, 0, 0));
849 struct posix_mq *mq = get(self, 1);
850 struct sigevent not;
851 pthread_attr_t attr;
853 errno = pthread_attr_init(&attr);
854 if (errno) rb_sys_fail("pthread_attr_init");
856 errno = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
857 if (errno) rb_sys_fail("pthread_attr_setdetachstate");
859 #ifdef PTHREAD_STACK_MIN
860 (void)pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
861 #endif
863 not.sigev_notify = SIGEV_THREAD;
864 not.sigev_notify_function = thread_notify_fd;
865 not.sigev_notify_attributes = &attr;
866 not.sigev_value.sival_int = fd;
868 if (!NIL_P(mq->thread))
869 rb_funcall(mq->thread, id_kill, 0, 0);
870 mq->thread = thr;
872 my_mq_notify(mq->des, &not);
874 return thr;
877 /* :nodoc: */
878 static VALUE notify_cleanup(VALUE self)
880 struct posix_mq *mq = get(self, 1);
882 if (!NIL_P(mq->thread)) {
883 rb_funcall(mq->thread, id_kill, 0, 0);
884 mq->thread = Qnil;
886 return Qnil;
890 * call-seq:
891 * mq.notify = signal => signal
893 * Registers the notification request to deliver a given +signal+
894 * to the current process when message is received.
895 * If +signal+ is +nil+, it will unregister and disable the notification
896 * request to allow other processes to register a request.
897 * If +signal+ is +false+, it will register a no-op notification request
898 * which will prevent other processes from registering a notification.
899 * If +signal+ is an +IO+ object, it will spawn a thread upon the
900 * arrival of the next message and write one "\\0" byte to the file
901 * descriptor belonging to that IO object.
902 * Only one process may have a notification request for a queue
903 * at a time, Errno::EBUSY will be raised if there is already
904 * a notification request registration for the queue.
906 * Notifications are only fired once and processes must reregister
907 * for subsequent notifications.
909 * For readers of the mq_notify(3) manpage, passing +false+
910 * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent
911 * of passing a NULL notification pointer to mq_notify(3).
913 static VALUE setnotify(VALUE self, VALUE arg)
915 struct posix_mq *mq = get(self, 1);
916 struct sigevent not;
917 struct sigevent * notification = &not;
918 VALUE rv = arg;
920 notify_cleanup(self);
921 not.sigev_notify = SIGEV_SIGNAL;
923 switch (TYPE(arg)) {
924 case T_FALSE:
925 not.sigev_notify = SIGEV_NONE;
926 break;
927 case T_NIL:
928 notification = NULL;
929 break;
930 case T_FIXNUM:
931 not.sigev_signo = NUM2INT(arg);
932 break;
933 case T_SYMBOL:
934 case T_STRING:
935 not.sigev_signo = lookup_sig(arg);
936 rv = INT2NUM(not.sigev_signo);
937 break;
938 default:
939 rb_raise(rb_eArgError, "must be a signal or nil");
942 my_mq_notify(mq->des, notification);
944 return rv;
948 * call-seq:
949 * mq.nonblock? => true or false
951 * Returns the current non-blocking state of the message queue descriptor.
953 static VALUE nonblock_p(VALUE self)
955 struct posix_mq *mq = get(self, 1);
957 if (mq_getattr(mq->des, &mq->attr) < 0)
958 rb_sys_fail("mq_getattr");
959 return mq->attr.mq_flags & O_NONBLOCK ? Qtrue : Qfalse;
963 * call-seq:
964 * mq.nonblock = boolean => boolean
966 * Enables or disables non-blocking operation for the message queue
967 * descriptor. Errno::EAGAIN will be raised in situations where
968 * the queue would block. This is not compatible with +timeout+
969 * arguments to POSIX_MQ#send and POSIX_MQ#receive.
971 static VALUE setnonblock(VALUE self, VALUE nb)
973 struct mq_attr newattr;
974 struct posix_mq *mq = get(self, 1);
976 if (nb == Qtrue)
977 newattr.mq_flags = O_NONBLOCK;
978 else if (nb == Qfalse)
979 newattr.mq_flags = 0;
980 else
981 rb_raise(rb_eArgError, "must be true or false");
983 if (mq_setattr(mq->des, &newattr, &mq->attr) < 0)
984 rb_sys_fail("mq_setattr");
986 mq->attr.mq_flags = newattr.mq_flags;
988 return nb;
991 static VALUE tryinit(int argc, VALUE *argv, VALUE self)
993 init(argc, argv, self);
994 setnonblock(self, Qtrue);
996 return self;
1000 * call-seq:
1001 * mq.trysend(string [,priority[, timeout]]) => +true+ or +false+
1003 * Exactly like POSIX_MQ#send, except it returns +false+ instead of raising
1004 * Errno::EAGAIN when non-blocking operation is desired and returns +true+
1005 * on success instead of +nil+.
1007 * This does not guarantee non-blocking behavior, the message queue must
1008 * be made non-blocking before calling this method.
1010 static VALUE trysend(int argc, VALUE *argv, VALUE self)
1012 _send(PMQ_TRY, argc, argv, self);
1016 * call-seq:
1017 * mq.tryshift([buffer [, timeout]]) => message or nil
1019 * Exactly like POSIX_MQ#shift, except it returns +nil+ instead of raising
1020 * Errno::EAGAIN when non-blocking operation is desired.
1022 * This does not guarantee non-blocking behavior, the message queue must
1023 * be made non-blocking before calling this method.
1025 static VALUE tryshift(int argc, VALUE *argv, VALUE self)
1027 return _receive(PMQ_TRY, argc, argv, self);
1031 * call-seq:
1032 * mq.tryreceive([buffer [, timeout]]) => [ message, priority ] or nil
1034 * Exactly like POSIX_MQ#receive, except it returns +nil+ instead of raising
1035 * Errno::EAGAIN when non-blocking operation is desired.
1037 * This does not guarantee non-blocking behavior, the message queue must
1038 * be made non-blocking before calling this method.
1040 static VALUE tryreceive(int argc, VALUE *argv, VALUE self)
1042 return _receive(PMQ_WANTARRAY|PMQ_TRY, argc, argv, self);
1045 void Init_posix_mq_ext(void)
1047 VALUE cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject);
1048 rb_define_alloc_func(cPOSIX_MQ, alloc);
1049 cAttr = rb_const_get(cPOSIX_MQ, rb_intern("Attr"));
1052 * The maximum number of open message descriptors supported
1053 * by the system. This may be -1, in which case it is dynamically
1054 * set at runtime. Consult your operating system documentation
1055 * for system-specific information about this.
1057 rb_define_const(cPOSIX_MQ, "OPEN_MAX",
1058 LONG2NUM(sysconf(_SC_MQ_OPEN_MAX)));
1061 * The maximum priority that may be specified for POSIX_MQ#send
1062 * On POSIX-compliant systems, this is at least 31, but some
1063 * systems allow higher limits.
1064 * The minimum priority is always zero.
1066 rb_define_const(cPOSIX_MQ, "PRIO_MAX",
1067 LONG2NUM(sysconf(_SC_MQ_PRIO_MAX)));
1069 rb_define_singleton_method(cPOSIX_MQ, "unlink", s_unlink, 1);
1071 rb_define_method(cPOSIX_MQ, "initialize", init, -1);
1072 rb_define_method(cPOSIX_MQ, "send", my_send, -1);
1073 rb_define_method(cPOSIX_MQ, "<<", send0, 1);
1074 rb_define_method(cPOSIX_MQ, "trysend", trysend, -1);
1075 rb_define_method(cPOSIX_MQ, "receive", receive, -1);
1076 rb_define_method(cPOSIX_MQ, "tryreceive", tryreceive, -1);
1077 rb_define_method(cPOSIX_MQ, "shift", shift, -1);
1078 rb_define_method(cPOSIX_MQ, "tryshift", tryshift, -1);
1079 rb_define_method(cPOSIX_MQ, "attr", getattr, 0);
1080 rb_define_method(cPOSIX_MQ, "attr=", setattr, 1);
1081 rb_define_method(cPOSIX_MQ, "close", _close, 0);
1082 rb_define_method(cPOSIX_MQ, "closed?", closed, 0);
1083 rb_define_method(cPOSIX_MQ, "unlink", _unlink, 0);
1084 rb_define_method(cPOSIX_MQ, "name", name, 0);
1085 rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1);
1086 rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1);
1087 rb_define_method(cPOSIX_MQ, "notify_exec", setnotify_exec, 2);
1088 rb_define_method(cPOSIX_MQ, "notify_cleanup", notify_cleanup, 0);
1089 rb_define_method(cPOSIX_MQ, "nonblock?", nonblock_p, 0);
1090 #ifdef MQD_TO_FD
1091 rb_define_method(cPOSIX_MQ, "to_io", to_io, 0);
1092 #endif
1094 id_new = rb_intern("new");
1095 id_kill = rb_intern("kill");
1096 id_fileno = rb_intern("fileno");
1097 id_mul = rb_intern("*");
1098 id_divmod = rb_intern("divmod");
1099 id_flags = rb_intern("flags");
1100 id_maxmsg = rb_intern("maxmsg");
1101 id_msgsize = rb_intern("msgsize");
1102 id_curmsgs = rb_intern("curmsgs");
1103 sym_r = ID2SYM(rb_intern("r"));
1104 sym_w = ID2SYM(rb_intern("w"));
1105 sym_rw = ID2SYM(rb_intern("rw"));