ad8bcff4fe9ffb33bb9bdcfa9adf7f679d241577
[ruby_posix_mq.git] / ext / posix_mq / posix_mq.c
blobad8bcff4fe9ffb33bb9bdcfa9adf7f679d241577
1 #define _XOPEN_SOURCE 600
2 #ifdef HAVE_SYS_SELECT_H
3 # include <sys/select.h>
4 #endif
5 #ifdef HAVE_SIGNAL_H
6 # include <signal.h>
7 #endif
8 #ifdef HAVE_PTHREAD_H
9 # include <pthread.h>
10 #endif
11 #include <ruby.h>
13 #ifndef NUM2TIMET
14 # define NUM2TIMET NUM2INT
15 #endif
17 #include <time.h>
18 #include <mqueue.h>
19 #include <fcntl.h>
20 #include <sys/stat.h>
21 #include <errno.h>
22 #include <assert.h>
23 #include <unistd.h>
24 #include <float.h>
25 #include <math.h>
/*
 * MQD_TO_FD converts a queue descriptor into a file descriptor on the
 * platforms that offer such a mapping.  Where none is available the
 * MQ_IO_* helpers are defined here as no-ops instead.
 */
#if defined(__linux__)
#  define MQD_TO_FD(mqd) (int)(mqd)
#elif defined(HAVE___MQ_OSHANDLE) /* FreeBSD */
#  define MQD_TO_FD(mqd) __mq_oshandle(mqd)
#else
#  define MQ_IO_MARK(mq) ((void)(0))
#  define MQ_IO_SET(mq,val) ((void)(0))
#  define MQ_IO_CLOSE(mq) ((int)(0))
#  define MQ_IO_NIL_P(mq) ((int)(1))
#endif
38 struct posix_mq {
39 mqd_t des;
40 struct mq_attr attr;
41 VALUE name;
42 VALUE thread;
43 #ifdef MQD_TO_FD
44 VALUE io;
45 #endif
#ifdef MQD_TO_FD
#  define MQ_IO_MARK(mq) rb_gc_mark((mq)->io)
#  define MQ_IO_SET(mq,val) do { (mq)->io = (val); } while (0)
#  define MQ_IO_NIL_P(mq) NIL_P((mq)->io)

/*
 * Closes the IO object attached to +mq+, if there is one.
 * Returns 1 when an IO was closed and 0 when no IO was attached.
 */
static int MQ_IO_CLOSE(struct posix_mq *mq)
{
	VALUE io = mq->io;

	if (!NIL_P(io)) {
		/* not safe during GC */
		rb_io_close(io);
		mq->io = Qnil;

		return 1;
	}

	return 0;
}
#endif
65 # define PMQ_WANTARRAY (1<<0)
66 # define PMQ_TRY (1<<1)
68 static VALUE cAttr;
69 static ID id_new, id_kill, id_fileno, id_divmod;
70 static ID id_flags, id_maxmsg, id_msgsize, id_curmsgs;
71 static VALUE sym_r, sym_w, sym_rw;
72 static const mqd_t MQD_INVALID = (mqd_t)-1;
/*
 * Accessor macros that Ruby 1.8.6+ provides (for compatibility with
 * Ruby 1.9); each one is only defined here when ruby.h did not supply it.
 */
#ifndef RSTRING_PTR
#  define RSTRING_PTR(s) (RSTRING(s)->ptr)
#endif
#ifndef RSTRING_LEN
#  define RSTRING_LEN(s) (RSTRING(s)->len)
#endif
#ifndef RFLOAT_VALUE
#  define RFLOAT_VALUE(f) (RFLOAT(f)->value)
#endif
85 #ifndef HAVE_RB_STR_SET_LEN
86 /* this is taken from Ruby 1.8.7, 1.8.6 may not have it */
87 # ifdef RUBINIUS
88 # error upgrade Rubinius, rb_str_set_len should be available
89 # endif
90 static void rb_18_str_set_len(VALUE str, long len)
92 RSTRING(str)->len = len;
93 RSTRING(str)->ptr[len] = '\0';
95 #define rb_str_set_len rb_18_str_set_len
96 #endif /* !defined(HAVE_RB_STR_SET_LEN) */
98 /* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
99 #if defined(HAVE_RB_THREAD_BLOCKING_REGION) && \
100 defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL)
102 * Ruby 1.9 - 2.1 (we use deprecated rb_thread_blocking_region in 2.0+
103 * because we can detect (but not use) rb_thread_blocking_region in 1.9.3
105 typedef VALUE(*my_blocking_fn_t)(void*);
106 # define WITHOUT_GVL(fn,a,ubf,b) \
107 rb_thread_blocking_region((my_blocking_fn_t)(fn),(a),(ubf),(b))
108 #elif defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) /* Ruby 2.2+ */
109 #include <ruby/thread.h>
110 # define WITHOUT_GVL(fn,a,ubf,b) \
111 rb_thread_call_without_gvl((fn),(a),(ubf),(b))
112 #else /* Ruby 1.8 */
113 # include <rubysig.h>
114 # define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
115 typedef void rb_unblock_function_t(void *);
116 typedef void * rb_blocking_function_t(void *);
117 static void * WITHOUT_GVL(rb_blocking_function_t *func, void *data1,
118 rb_unblock_function_t *ubf, void *data2)
120 void *rv;
122 assert(RUBY_UBF_IO == ubf && "RUBY_UBF_IO required for emulation");
124 TRAP_BEG;
125 rv = func(data1);
126 TRAP_END;
128 return rv;
130 #endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
/* used to pass arguments to mq_open inside blocking region */
struct open_args {
	mqd_t des;		/* result of mq_open, written by xopen */
	int argc;		/* how many arguments xopen passes to mq_open (2-4) */
	const char *name;	/* queue name */
	int oflags;		/* open flags */
	mode_t mode;		/* creation mode, used when argc >= 3 */
	struct mq_attr attr;	/* creation attributes, used when argc == 4 */
};
/* used to pass arguments to mq_send/mq_receive inside blocking region */
struct rw_args {
	mqd_t des;			/* queue descriptor to operate on */
	union {
		ssize_t received;	/* result written by xrecv */
		int retval;		/* result written by xsend */
	};
	char *msg_ptr;			/* message buffer */
	size_t msg_len;			/* size of the message buffer in bytes */
	unsigned msg_prio;		/* priority to send with / priority received */
	struct timespec *timeout;	/* absolute deadline, NULL for none */
};
#ifndef HAVE_MQ_TIMEDSEND
/*
 * Stand-in for a missing mq_timedsend so that xsend still compiles.
 * It is not meant to be reached: without mq_timedsend, num2timespec
 * raises NotImplementedError before a timeout can be built, so
 * getting here is reported as an interpreter bug.
 *
 * The return type is int, matching mq_timedsend; mqd_t is not
 * required to be an integer type and must not be used for a status.
 */
static int
not_timedsend(mqd_t mqdes, const char *msg_ptr,
	      size_t msg_len, unsigned msg_prio,
	      const struct timespec *abs_timeout)
{
	rb_bug("mq_timedsend workaround failed");
	return -1;
}
#  define mq_timedsend not_timedsend
#endif
#ifndef HAVE_MQ_TIMEDRECEIVE
/*
 * Stand-in for a missing mq_timedreceive so that xrecv still compiles.
 * It is not meant to be reached: without mq_timedreceive, num2timespec
 * raises NotImplementedError before a timeout can be built, so
 * getting here is reported as an interpreter bug.
 *
 * The failure value is a plain -1 of the declared ssize_t type rather
 * than a converted mqd_t, which need not be an integer type.
 */
static ssize_t
not_timedreceive(mqd_t mqdes, char *msg_ptr,
		 size_t msg_len, unsigned *msg_prio,
		 const struct timespec *abs_timeout)
{
	rb_bug("mq_timedreceive workaround failed");
	return -1;
}
#  define mq_timedreceive not_timedreceive
#endif
178 #if defined(HAVE_MQ_TIMEDRECEIVE) && defined(HAVE_MQ_TIMEDSEND)
179 static void num2timespec(struct timespec *ts, VALUE t)
181 switch (TYPE(t)) {
182 case T_FIXNUM:
183 case T_BIGNUM:
184 ts->tv_sec = NUM2TIMET(t);
185 ts->tv_nsec = 0;
186 break;
187 case T_FLOAT: {
188 double f, d;
189 double val = RFLOAT_VALUE(t);
191 d = modf(val, &f);
192 if (d >= 0) {
193 ts->tv_nsec = (long)(d * 1e9 + 0.5);
194 } else {
195 ts->tv_nsec = (long)(-d * 1e9 + 0.5);
196 if (ts->tv_nsec > 0) {
197 ts->tv_nsec = 1000000000 - ts->tv_nsec;
198 f -= 1;
201 ts->tv_sec = (time_t)f;
202 if (f != ts->tv_sec)
203 rb_raise(rb_eRangeError, "%f out of range", val);
205 break;
206 default: {
207 VALUE f;
208 VALUE ary = rb_funcall(t, id_divmod, 1, INT2FIX(1));
210 Check_Type(ary, T_ARRAY);
212 ts->tv_sec = NUM2TIMET(rb_ary_entry(ary, 0));
213 f = rb_ary_entry(ary, 1);
214 f = rb_funcall(f, '*', 1, INT2FIX(1000000000));
215 ts->tv_nsec = NUM2LONG(f);
219 #else
220 static void num2timespec(struct timespec *ts, VALUE t)
222 rb_raise(rb_eNotImpError,
223 "mq_timedsend and/or mq_timedreceive missing");
225 #endif
227 static struct timespec *convert_timeout(struct timespec *dest, VALUE t)
229 struct timespec ts, now;
231 if (NIL_P(t))
232 return NULL;
234 num2timespec(&ts, t);
235 clock_gettime(CLOCK_REALTIME, &now);
236 dest->tv_sec = now.tv_sec + ts.tv_sec;
237 dest->tv_nsec = now.tv_nsec + ts.tv_nsec;
239 if (dest->tv_nsec > 1000000000) {
240 dest->tv_nsec -= 1000000000;
241 ++dest->tv_sec;
244 return dest;
247 /* (may) run without GVL */
248 static void * xopen(void *ptr)
250 struct open_args *x = ptr;
252 switch (x->argc) {
253 case 2: x->des = mq_open(x->name, x->oflags); break;
254 case 3: x->des = mq_open(x->name, x->oflags, x->mode, NULL); break;
255 case 4: x->des = mq_open(x->name, x->oflags, x->mode, &x->attr); break;
256 default: x->des = MQD_INVALID;
259 return NULL;
262 /* runs without GVL */
263 static void *xsend(void *ptr)
265 struct rw_args *x = ptr;
267 x->retval = x->timeout ?
268 mq_timedsend(x->des, x->msg_ptr, x->msg_len,
269 x->msg_prio, x->timeout) :
270 mq_send(x->des, x->msg_ptr, x->msg_len, x->msg_prio);
272 return NULL;
275 /* runs without GVL */
276 static void * xrecv(void *ptr)
278 struct rw_args *x = ptr;
280 x->received = x->timeout ?
281 mq_timedreceive(x->des, x->msg_ptr, x->msg_len,
282 &x->msg_prio, x->timeout) :
283 mq_receive(x->des, x->msg_ptr, x->msg_len, &x->msg_prio);
285 return NULL;
288 /* called by GC */
289 static void mark(void *ptr)
291 struct posix_mq *mq = ptr;
293 rb_gc_mark(mq->name);
294 rb_gc_mark(mq->thread);
295 MQ_IO_MARK(mq);
298 /* called by GC */
299 static void _free(void *ptr)
301 struct posix_mq *mq = ptr;
303 if (mq->des != MQD_INVALID && MQ_IO_NIL_P(mq)) {
304 /* we ignore errors when gc-ing */
305 mq_close(mq->des);
306 errno = 0;
308 xfree(ptr);
311 /* automatically called at creation (before initialize) */
312 static VALUE alloc(VALUE klass)
314 struct posix_mq *mq;
315 VALUE rv = Data_Make_Struct(klass, struct posix_mq, mark, _free, mq);
317 mq->des = MQD_INVALID;
318 mq->attr.mq_flags = 0;
319 mq->attr.mq_maxmsg = 0;
320 mq->attr.mq_msgsize = -1;
321 mq->attr.mq_curmsgs = 0;
322 mq->name = Qnil;
323 mq->thread = Qnil;
324 MQ_IO_SET(mq, Qnil);
326 return rv;
329 /* unwraps the posix_mq struct from self */
330 static struct posix_mq *get(VALUE self, int need_valid)
332 struct posix_mq *mq;
334 Data_Get_Struct(self, struct posix_mq, mq);
336 if (need_valid && mq->des == MQD_INVALID)
337 rb_raise(rb_eIOError, "closed queue descriptor");
339 return mq;
342 static void check_struct_type(VALUE astruct)
344 if (CLASS_OF(astruct) == cAttr)
345 return;
346 astruct = rb_inspect(astruct);
347 rb_raise(rb_eTypeError, "not a POSIX_MQ::Attr: %s",
348 StringValuePtr(astruct));
351 static void rstruct2mqattr(struct mq_attr *attr, VALUE astruct, int all)
353 VALUE tmp;
355 check_struct_type(astruct);
356 attr->mq_flags = NUM2LONG(rb_funcall(astruct, id_flags, 0));
358 tmp = rb_funcall(astruct, id_maxmsg, 0);
359 if (all || !NIL_P(tmp))
360 attr->mq_maxmsg = NUM2LONG(tmp);
362 tmp = rb_funcall(astruct, id_msgsize, 0);
363 if (all || !NIL_P(tmp))
364 attr->mq_msgsize = NUM2LONG(tmp);
366 tmp = rb_funcall(astruct, id_curmsgs, 0);
367 if (!NIL_P(tmp))
368 attr->mq_curmsgs = NUM2LONG(tmp);
372 * call-seq:
373 * POSIX_MQ.new(name [, flags [, mode [, mq_attr]]) => mq
375 * Opens a POSIX message queue given by +name+. +name+ should start
376 * with a slash ("/") for portable applications.
378 * If a Symbol is given in place of integer +flags+, then:
380 * * +:r+ is equivalent to IO::RDONLY
381 * * +:w+ is equivalent to IO::CREAT|IO::WRONLY
382 * * +:rw+ is equivalent to IO::CREAT|IO::RDWR
384 * +mode+ is an integer and only used when IO::CREAT is used.
385 * +mq_attr+ is a POSIX_MQ::Attr and only used if IO::CREAT is used.
386 * If +mq_attr+ is not specified when creating a queue, then the
387 * system defaults will be used.
389 * See the manpage for mq_open(3) for more details on this function.
391 static VALUE init(int argc, VALUE *argv, VALUE self)
393 struct posix_mq *mq = get(self, 0);
394 struct open_args x;
395 VALUE name, oflags, mode, attr;
397 rb_scan_args(argc, argv, "13", &name, &oflags, &mode, &attr);
399 switch (TYPE(oflags)) {
400 case T_NIL:
401 x.oflags = O_RDONLY;
402 break;
403 case T_SYMBOL:
404 if (oflags == sym_r)
405 x.oflags = O_RDONLY;
406 else if (oflags == sym_w)
407 x.oflags = O_CREAT|O_WRONLY;
408 else if (oflags == sym_rw)
409 x.oflags = O_CREAT|O_RDWR;
410 else {
411 oflags = rb_inspect(oflags);
412 rb_raise(rb_eArgError,
413 "symbol must be :r, :w, or :rw: %s",
414 StringValuePtr(oflags));
416 break;
417 case T_BIGNUM:
418 case T_FIXNUM:
419 x.oflags = NUM2INT(oflags);
420 break;
421 default:
422 rb_raise(rb_eArgError, "flags must be an int, :r, :w, or :wr");
425 x.name = StringValueCStr(name);
426 x.argc = 2;
428 switch (TYPE(mode)) {
429 case T_FIXNUM:
430 x.argc = 3;
431 x.mode = NUM2UINT(mode);
432 break;
433 case T_NIL:
434 if (x.oflags & O_CREAT) {
435 x.argc = 3;
436 x.mode = 0666;
438 break;
439 default:
440 rb_raise(rb_eArgError, "mode not an integer");
443 switch (TYPE(attr)) {
444 case T_STRUCT:
445 x.argc = 4;
446 rstruct2mqattr(&x.attr, attr, 1);
448 /* principle of least surprise */
449 if (x.attr.mq_flags & O_NONBLOCK)
450 x.oflags |= O_NONBLOCK;
451 break;
452 case T_NIL:
453 break;
454 default:
455 check_struct_type(attr);
458 (void)xopen(&x);
459 mq->des = x.des;
460 if (mq->des == MQD_INVALID) {
461 switch (errno) {
462 case ENOMEM:
463 case EMFILE:
464 case ENFILE:
465 case ENOSPC:
466 rb_gc();
467 (void)xopen(&x);
468 mq->des = x.des;
470 if (mq->des == MQD_INVALID)
471 rb_sys_fail("mq_open");
474 mq->name = rb_str_dup(name);
475 if (x.oflags & O_NONBLOCK)
476 mq->attr.mq_flags = O_NONBLOCK;
478 return self;
482 * call-seq:
483 * POSIX_MQ.unlink(name) => 1
485 * Unlinks the message queue given by +name+. The queue will be destroyed
486 * when the last process with the queue open closes its queue descriptors.
488 static VALUE s_unlink(VALUE self, VALUE name)
490 int rv = mq_unlink(StringValueCStr(name));
492 if (rv == -1)
493 rb_sys_fail("mq_unlink");
495 return INT2NUM(1);
499 * call-seq:
500 * mq.unlink => mq
502 * Unlinks the message queue to prevent other processes from accessing it.
503 * All existing queue descriptors to this queue including those opened by
504 * other processes are unaffected. The queue will only be destroyed
505 * when the last process with open descriptors to this queue closes
506 * the descriptors.
508 static VALUE _unlink(VALUE self)
510 struct posix_mq *mq = get(self, 0);
511 int rv;
513 assert(TYPE(mq->name) == T_STRING && "mq->name is not a string");
515 rv = mq_unlink(RSTRING_PTR(mq->name));
516 if (rv == -1)
517 rb_sys_fail("mq_unlink");
519 return self;
522 static void setup_send_buffer(struct rw_args *x, VALUE buffer)
524 buffer = rb_obj_as_string(buffer);
525 x->msg_ptr = RSTRING_PTR(buffer);
526 x->msg_len = (size_t)RSTRING_LEN(buffer);
529 static VALUE _send(int sflags, int argc, VALUE *argv, VALUE self);
532 * call-seq:
533 * mq.send(string [,priority[, timeout]]) => true
535 * Inserts the given +string+ into the message queue with an optional,
536 * unsigned integer +priority+. If the optional +timeout+ is specified,
537 * then Errno::ETIMEDOUT will be raised if the operation cannot complete
538 * before +timeout+ seconds has elapsed. Without +timeout+, this method
539 * may block until the queue is writable.
541 * On some older systems, the +timeout+ argument is not currently
542 * supported and may raise NotImplementedError if +timeout+ is used.
544 static VALUE my_send(int argc, VALUE *argv, VALUE self)
546 return _send(0, argc, argv, self);
549 static VALUE _send(int sflags, int argc, VALUE *argv, VALUE self)
551 struct posix_mq *mq = get(self, 1);
552 struct rw_args x;
553 VALUE buffer, prio, timeout;
554 struct timespec expire;
556 rb_scan_args(argc, argv, "12", &buffer, &prio, &timeout);
558 setup_send_buffer(&x, buffer);
559 x.des = mq->des;
560 x.timeout = convert_timeout(&expire, timeout);
561 x.msg_prio = NIL_P(prio) ? 0 : NUM2UINT(prio);
563 retry:
564 WITHOUT_GVL(xsend, &x, RUBY_UBF_IO, 0);
565 if (x.retval == -1) {
566 if (errno == EINTR)
567 goto retry;
568 if (errno == EAGAIN && (sflags & PMQ_TRY))
569 return Qfalse;
570 rb_sys_fail("mq_send");
573 return Qtrue;
577 * call-seq:
578 * mq << string => mq
580 * Inserts the given +string+ into the message queue with a
581 * default priority of 0 and no timeout.
583 * Returns itself so its calls may be chained. This use is only
584 * recommended only for users who expect blocking behavior from
585 * the queue.
587 static VALUE send0(VALUE self, VALUE buffer)
589 struct posix_mq *mq = get(self, 1);
590 struct rw_args x;
592 setup_send_buffer(&x, buffer);
593 x.des = mq->des;
594 x.timeout = NULL;
595 x.msg_prio = 0;
597 retry:
598 WITHOUT_GVL(xsend, &x, RUBY_UBF_IO, 0);
599 if (x.retval == -1) {
600 if (errno == EINTR)
601 goto retry;
602 rb_sys_fail("mq_send");
605 return self;
#ifdef MQD_TO_FD
/*
 * call-seq:
 *	mq.to_io	=> IO
 *
 * Returns an IO.select-able +IO+ object.  This method is only available
 * under Linux and FreeBSD and is not intended to be portable.
 */
static VALUE to_io(VALUE self)
{
	struct posix_mq *q = get(self, 1);
	int fd = MQD_TO_FD(q->des);

	/* the IO is created on first use and reused afterwards */
	if (!NIL_P(q->io))
		return q->io;

	q->io = rb_funcall(rb_cIO, id_new, 1, INT2NUM(fd));

	return q->io;
}
#endif
628 static VALUE _receive(int rflags, int argc, VALUE *argv, VALUE self);
631 * call-seq:
632 * mq.receive([buffer, [timeout]]) => [ message, priority ]
634 * Takes the highest priority message off the queue and returns
635 * an array containing the message as a String and the Integer
636 * priority of the message.
638 * If the optional +buffer+ is present, then it must be a String
639 * which will receive the data.
641 * If the optional +timeout+ is present, then it may be a Float
642 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
643 * will be raised if +timeout+ has elapsed and there are no messages
644 * in the queue.
646 * On some older systems, the +timeout+ argument is not currently
647 * supported and may raise NotImplementedError if +timeout+ is used.
649 static VALUE receive(int argc, VALUE *argv, VALUE self)
651 return _receive(PMQ_WANTARRAY, argc, argv, self);
655 * call-seq:
656 * mq.shift([buffer, [timeout]]) => message
658 * Takes the highest priority message off the queue and returns
659 * the message as a String.
661 * If the optional +buffer+ is present, then it must be a String
662 * which will receive the data.
664 * If the optional +timeout+ is present, then it may be a Float
665 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
666 * will be raised if +timeout+ has elapsed and there are no messages
667 * in the queue.
669 * On some older systems, the +timeout+ argument is not currently
670 * supported and may raise NotImplementedError if +timeout+ is used.
672 static VALUE shift(int argc, VALUE *argv, VALUE self)
674 return _receive(0, argc, argv, self);
677 static VALUE _receive(int rflags, int argc, VALUE *argv, VALUE self)
679 struct posix_mq *mq = get(self, 1);
680 struct rw_args x;
681 VALUE buffer, timeout;
682 struct timespec expire;
684 if (mq->attr.mq_msgsize < 0) {
685 if (mq_getattr(mq->des, &mq->attr) < 0)
686 rb_sys_fail("mq_getattr");
689 rb_scan_args(argc, argv, "02", &buffer, &timeout);
690 x.timeout = convert_timeout(&expire, timeout);
692 if (NIL_P(buffer)) {
693 buffer = rb_str_new(0, mq->attr.mq_msgsize);
694 } else {
695 StringValue(buffer);
696 rb_str_modify(buffer);
697 rb_str_resize(buffer, mq->attr.mq_msgsize);
699 OBJ_TAINT(buffer);
700 x.msg_ptr = RSTRING_PTR(buffer);
701 x.msg_len = (size_t)mq->attr.mq_msgsize;
702 x.des = mq->des;
704 retry:
705 WITHOUT_GVL(xrecv, &x, RUBY_UBF_IO, 0);
706 if (x.received < 0) {
707 if (errno == EINTR)
708 goto retry;
709 if (errno == EAGAIN && (rflags & PMQ_TRY))
710 return Qnil;
711 rb_sys_fail("mq_receive");
714 rb_str_set_len(buffer, x.received);
716 if (rflags & PMQ_WANTARRAY)
717 return rb_ary_new3(2, buffer, UINT2NUM(x.msg_prio));
718 return buffer;
722 * call-seq:
723 * mq.attr => mq_attr
725 * Returns a POSIX_MQ::Attr struct containing the attributes
726 * of the message queue. See the mq_getattr(3) manpage for
727 * more details.
729 static VALUE getattr(VALUE self)
731 struct posix_mq *mq = get(self, 1);
733 if (mq_getattr(mq->des, &mq->attr) < 0)
734 rb_sys_fail("mq_getattr");
736 return rb_funcall(cAttr, id_new, 4,
737 LONG2NUM(mq->attr.mq_flags),
738 LONG2NUM(mq->attr.mq_maxmsg),
739 LONG2NUM(mq->attr.mq_msgsize),
740 LONG2NUM(mq->attr.mq_curmsgs));
744 * call-seq:
745 * mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr
747 * Only the IO::NONBLOCK flag may be set or unset (zero) in this manner.
748 * See the mq_setattr(3) manpage for more details.
750 * Consider using the POSIX_MQ#nonblock= method as it is easier and
751 * more natural to use.
753 static VALUE setattr(VALUE self, VALUE astruct)
755 struct posix_mq *mq = get(self, 1);
756 struct mq_attr newattr;
758 rstruct2mqattr(&newattr, astruct, 0);
760 if (mq_setattr(mq->des, &newattr, NULL) < 0)
761 rb_sys_fail("mq_setattr");
763 return astruct;
767 * call-seq:
768 * mq.close => nil
770 * Closes the underlying message queue descriptor.
771 * If this descriptor had a registered notification request, the request
772 * will be removed so another descriptor or process may register a
773 * notification request. Message queue descriptors are automatically
774 * closed by garbage collection.
776 static VALUE _close(VALUE self)
778 struct posix_mq *mq = get(self, 1);
780 if (! MQ_IO_CLOSE(mq)) {
781 if (mq_close(mq->des) == -1)
782 rb_sys_fail("mq_close");
784 mq->des = MQD_INVALID;
786 return Qnil;
790 * call-seq:
791 * mq.closed? => true or false
793 * Returns +true+ if the message queue descriptor is closed and therefore
794 * unusable, otherwise +false+
796 static VALUE closed(VALUE self)
798 struct posix_mq *mq = get(self, 0);
800 return mq->des == MQD_INVALID ? Qtrue : Qfalse;
804 * call-seq:
805 * mq.name => string
807 * Returns the string name of message queue associated with +mq+
809 static VALUE name(VALUE self)
811 struct posix_mq *mq = get(self, 0);
813 return rb_str_dup(mq->name);
816 static int lookup_sig(VALUE sig)
818 static VALUE list;
819 const char *ptr;
820 long len;
822 sig = rb_obj_as_string(sig);
823 len = RSTRING_LEN(sig);
824 ptr = RSTRING_PTR(sig);
826 if (len > 3 && !memcmp("SIG", ptr, 3))
827 sig = rb_str_new(ptr + 3, len - 3);
829 if (!list) {
830 VALUE mSignal = rb_const_get(rb_cObject, rb_intern("Signal"));
832 list = rb_funcall(mSignal, rb_intern("list"), 0, 0);
833 rb_global_variable(&list);
836 sig = rb_hash_aref(list, sig);
837 if (NIL_P(sig))
838 rb_raise(rb_eArgError, "invalid signal: %s\n", ptr);
840 return NUM2INT(sig);
844 * TODO: Under Linux, we could just use netlink directly
845 * the same way glibc does...
847 /* we spawn a thread just to write ONE byte into an fd (usually a pipe) */
848 static void thread_notify_fd(union sigval sv)
850 int fd = sv.sival_int;
852 while ((write(fd, "", 1) < 0) && (errno == EINTR || errno == EAGAIN));
/*
 * mq_notify wrapper that raises a SystemCallError on failure.
 * An ENOMEM failure is retried once after running the GC.
 */
static void my_mq_notify(mqd_t des, struct sigevent *not)
{
	if (mq_notify(des, not) != -1)
		return;

	if (errno == ENOMEM) {
		rb_gc();
		if (mq_notify(des, not) != -1)
			return;
	}

	rb_sys_fail("mq_notify");
}
/*
 * Requests a small stack for threads created with +attr+: the larger
 * of PTHREAD_STACK_MIN and 4096 bytes.  Does nothing where
 * PTHREAD_STACK_MIN is not defined; a failure to set the size is
 * ignored.
 */
static void lower_stack_size(pthread_attr_t *attr)
{
/* some OSes have ridiculously small stack sizes */
#ifdef PTHREAD_STACK_MIN
	const size_t floor_size = 4096;
	size_t wanted = PTHREAD_STACK_MIN;

	pthread_attr_setstacksize(attr,
				  wanted < floor_size ? floor_size : wanted);
#endif
}
882 /* :nodoc: */
883 static VALUE setnotify_exec(VALUE self, VALUE io, VALUE thr)
885 int fd = NUM2INT(rb_funcall(io, id_fileno, 0, 0));
886 struct posix_mq *mq = get(self, 1);
887 struct sigevent not;
888 pthread_attr_t attr;
890 errno = pthread_attr_init(&attr);
891 if (errno) rb_sys_fail("pthread_attr_init");
893 errno = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
894 if (errno) rb_sys_fail("pthread_attr_setdetachstate");
896 lower_stack_size(&attr);
897 not.sigev_notify = SIGEV_THREAD;
898 not.sigev_notify_function = thread_notify_fd;
899 not.sigev_notify_attributes = &attr;
900 not.sigev_value.sival_int = fd;
902 if (!NIL_P(mq->thread))
903 rb_funcall(mq->thread, id_kill, 0, 0);
904 mq->thread = thr;
906 my_mq_notify(mq->des, &not);
908 return thr;
911 /* :nodoc: */
912 static VALUE notify_cleanup(VALUE self)
914 struct posix_mq *mq = get(self, 1);
916 if (!NIL_P(mq->thread)) {
917 rb_funcall(mq->thread, id_kill, 0, 0);
918 mq->thread = Qnil;
920 return Qnil;
924 * call-seq:
925 * mq.notify = signal => signal
927 * Registers the notification request to deliver a given +signal+
928 * to the current process when message is received.
929 * If +signal+ is +nil+, it will unregister and disable the notification
930 * request to allow other processes to register a request.
931 * If +signal+ is +false+, it will register a no-op notification request
932 * which will prevent other processes from registering a notification.
933 * If +signal+ is an +IO+ object, it will spawn a thread upon the
934 * arrival of the next message and write one "\\0" byte to the file
935 * descriptor belonging to that IO object.
936 * Only one process may have a notification request for a queue
937 * at a time, Errno::EBUSY will be raised if there is already
938 * a notification request registration for the queue.
940 * Notifications are only fired once and processes must reregister
941 * for subsequent notifications.
943 * For readers of the mq_notify(3) manpage, passing +false+
944 * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent
945 * of passing a NULL notification pointer to mq_notify(3).
947 static VALUE setnotify(VALUE self, VALUE arg)
949 struct posix_mq *mq = get(self, 1);
950 struct sigevent not;
951 struct sigevent * notification = &not;
952 VALUE rv = arg;
954 notify_cleanup(self);
955 not.sigev_notify = SIGEV_SIGNAL;
957 switch (TYPE(arg)) {
958 case T_FALSE:
959 not.sigev_notify = SIGEV_NONE;
960 break;
961 case T_NIL:
962 notification = NULL;
963 break;
964 case T_FIXNUM:
965 not.sigev_signo = NUM2INT(arg);
966 break;
967 case T_SYMBOL:
968 case T_STRING:
969 not.sigev_signo = lookup_sig(arg);
970 rv = INT2NUM(not.sigev_signo);
971 break;
972 default:
973 rb_raise(rb_eArgError, "must be a signal or nil");
976 my_mq_notify(mq->des, notification);
978 return rv;
982 * call-seq:
983 * mq.nonblock? => true or false
985 * Returns the current non-blocking state of the message queue descriptor.
987 static VALUE nonblock_p(VALUE self)
989 struct posix_mq *mq = get(self, 1);
991 if (mq_getattr(mq->des, &mq->attr) < 0)
992 rb_sys_fail("mq_getattr");
993 return mq->attr.mq_flags & O_NONBLOCK ? Qtrue : Qfalse;
997 * call-seq:
998 * mq.nonblock = boolean => boolean
1000 * Enables or disables non-blocking operation for the message queue
1001 * descriptor. Errno::EAGAIN will be raised in situations where
1002 * the queue would block. This is not compatible with +timeout+
1003 * arguments to POSIX_MQ#send and POSIX_MQ#receive.
1005 static VALUE setnonblock(VALUE self, VALUE nb)
1007 struct mq_attr newattr;
1008 struct posix_mq *mq = get(self, 1);
1010 if (nb == Qtrue)
1011 newattr.mq_flags = O_NONBLOCK;
1012 else if (nb == Qfalse)
1013 newattr.mq_flags = 0;
1014 else
1015 rb_raise(rb_eArgError, "must be true or false");
1017 if (mq_setattr(mq->des, &newattr, &mq->attr) < 0)
1018 rb_sys_fail("mq_setattr");
1020 mq->attr.mq_flags = newattr.mq_flags;
1022 return nb;
1026 * call-seq:
1027 * mq.trysend(string [,priority[, timeout]]) => +true+ or +false+
1029 * Exactly like POSIX_MQ#send, except it returns +false+ instead of raising
1030 * Errno::EAGAIN when non-blocking operation is desired and returns +true+
1031 * on success instead of +nil+.
1033 * This does not guarantee non-blocking behavior, the message queue must
1034 * be made non-blocking before calling this method.
1036 static VALUE trysend(int argc, VALUE *argv, VALUE self)
1038 return _send(PMQ_TRY, argc, argv, self);
1042 * call-seq:
1043 * mq.tryshift([buffer [, timeout]]) => message or nil
1045 * Exactly like POSIX_MQ#shift, except it returns +nil+ instead of raising
1046 * Errno::EAGAIN when non-blocking operation is desired.
1048 * This does not guarantee non-blocking behavior, the message queue must
1049 * be made non-blocking before calling this method.
1051 static VALUE tryshift(int argc, VALUE *argv, VALUE self)
1053 return _receive(PMQ_TRY, argc, argv, self);
1057 * call-seq:
1058 * mq.tryreceive([buffer [, timeout]]) => [ message, priority ] or nil
1060 * Exactly like POSIX_MQ#receive, except it returns +nil+ instead of raising
1061 * Errno::EAGAIN when non-blocking operation is desired.
1063 * This does not guarantee non-blocking behavior, the message queue must
1064 * be made non-blocking before calling this method.
1066 static VALUE tryreceive(int argc, VALUE *argv, VALUE self)
1068 return _receive(PMQ_WANTARRAY|PMQ_TRY, argc, argv, self);
1071 void Init_posix_mq_ext(void)
1073 VALUE cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject);
1074 rb_define_alloc_func(cPOSIX_MQ, alloc);
1075 cAttr = rb_const_get(cPOSIX_MQ, rb_intern("Attr"));
1078 * The maximum number of open message descriptors supported
1079 * by the system. This may be -1, in which case it is dynamically
1080 * set at runtime. Consult your operating system documentation
1081 * for system-specific information about this.
1083 rb_define_const(cPOSIX_MQ, "OPEN_MAX",
1084 LONG2NUM(sysconf(_SC_MQ_OPEN_MAX)));
1087 * The maximum priority that may be specified for POSIX_MQ#send
1088 * On POSIX-compliant systems, this is at least 31, but some
1089 * systems allow higher limits.
1090 * The minimum priority is always zero.
1092 rb_define_const(cPOSIX_MQ, "PRIO_MAX",
1093 LONG2NUM(sysconf(_SC_MQ_PRIO_MAX)));
1095 rb_define_singleton_method(cPOSIX_MQ, "unlink", s_unlink, 1);
1097 rb_define_private_method(cPOSIX_MQ, "initialize", init, -1);
1098 rb_define_method(cPOSIX_MQ, "send", my_send, -1);
1099 rb_define_method(cPOSIX_MQ, "<<", send0, 1);
1100 rb_define_method(cPOSIX_MQ, "trysend", trysend, -1);
1101 rb_define_method(cPOSIX_MQ, "receive", receive, -1);
1102 rb_define_method(cPOSIX_MQ, "tryreceive", tryreceive, -1);
1103 rb_define_method(cPOSIX_MQ, "shift", shift, -1);
1104 rb_define_method(cPOSIX_MQ, "tryshift", tryshift, -1);
1105 rb_define_method(cPOSIX_MQ, "attr", getattr, 0);
1106 rb_define_method(cPOSIX_MQ, "attr=", setattr, 1);
1107 rb_define_method(cPOSIX_MQ, "close", _close, 0);
1108 rb_define_method(cPOSIX_MQ, "closed?", closed, 0);
1109 rb_define_method(cPOSIX_MQ, "unlink", _unlink, 0);
1110 rb_define_method(cPOSIX_MQ, "name", name, 0);
1111 rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1);
1112 rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1);
1113 rb_define_private_method(cPOSIX_MQ, "notify_exec", setnotify_exec, 2);
1114 rb_define_private_method(cPOSIX_MQ, "notify_cleanup", notify_cleanup, 0);
1115 rb_define_method(cPOSIX_MQ, "nonblock?", nonblock_p, 0);
1116 #ifdef MQD_TO_FD
1117 rb_define_method(cPOSIX_MQ, "to_io", to_io, 0);
1118 #endif
1120 id_new = rb_intern("new");
1121 id_kill = rb_intern("kill");
1122 id_fileno = rb_intern("fileno");
1123 id_divmod = rb_intern("divmod");
1124 id_flags = rb_intern("flags");
1125 id_maxmsg = rb_intern("maxmsg");
1126 id_msgsize = rb_intern("msgsize");
1127 id_curmsgs = rb_intern("curmsgs");
1128 sym_r = ID2SYM(rb_intern("r"));
1129 sym_w = ID2SYM(rb_intern("w"));
1130 sym_rw = ID2SYM(rb_intern("rw"));