1 #define _XOPEN_SOURCE 600
/*
 * MQ_IO_MARK / MQ_IO_SET: helpers for the mq->io member.
 * The first pair marks mq->io for the GC and assigns to it; the second
 * pair expands to no-ops and is accompanied by a #warning that mqd_t is
 * not select()-able.
 * NOTE(review): no #else separates the two pairs of definitions here, so
 * both sit in the __linux__ branch -- confirm the intended structure.
 */
12 #if defined(__linux__)
14 # define MQ_IO_MARK(mq) rb_gc_mark((mq)->io)
15 # define MQ_IO_SET(mq,val) do { (mq)->io = (val); } while (0)
17 # warning mqd_t is not select()-able on your OS
19 # define MQ_IO_MARK(mq) ((void)(0))
20 # define MQ_IO_SET(mq,val) ((void)(0))
21 #endif /* non-Linux */
32 static VALUE cPOSIX_MQ
, cAttr
;
34 static ID sym_r
, sym_w
, sym_rw
;
/*
 * Sentinel descriptor value, (mqd_t)-1. It is stored in mq->des for queues
 * that are not open and is the value results are checked against before
 * rb_sys_fail() is called.
 */
35 static const mqd_t MQD_INVALID
= (mqd_t
)-1;
37 /* Ruby 1.8.6+ macros (for compatibility with Ruby 1.9) */
/* RSTRING_PTR(s): the ptr member of RSTRING(s). */
39 # define RSTRING_PTR(s) (RSTRING(s)->ptr)
/* RSTRING_LEN(s): the len member of RSTRING(s). */
42 # define RSTRING_LEN(s) (RSTRING(s)->len)
/* RSTRUCT_PTR(s): the ptr member of RSTRUCT(s). */
45 # define RSTRUCT_PTR(s) (RSTRUCT(s)->ptr)
/* RSTRUCT_LEN(s): the len member of RSTRUCT(s). */
48 # define RSTRUCT_LEN(s) (RSTRUCT(s)->len)
/*
 * Fallback rb_str_set_len() for builds where HAVE_RB_STR_SET_LEN is not
 * defined. One variant maps it onto rb_str_resize(); the other maps it
 * onto rb_18_str_set_len(), which stores len in the RSTRING and writes a
 * NUL terminator at ptr[len].
 * NOTE(review): the "else" / "endif ! RUBINIUS" directives imply an inner
 * conditional whose opening directive is absent here -- confirm.
 */
51 #ifndef HAVE_RB_STR_SET_LEN
53 # define rb_str_set_len(str,len) rb_str_resize(str,len)
54 # else /* 1.8.6 optimized version */
55 /* this is taken from Ruby 1.8.7, 1.8.6 may not have it */
56 static void rb_18_str_set_len(VALUE str
, long len
)
58 RSTRING(str
)->len
= len
;
59 RSTRING(str
)->ptr
[len
] = '\0';
62 # define rb_str_set_len(str,len) rb_18_str_set_len(str,len)
63 # endif /* ! RUBINIUS */
64 #endif /* !defined(HAVE_RB_STR_SET_LEN) */
/*
 * Fallback used when HAVE_RB_STRUCT_ALLOC_NOINIT is not defined: builds the
 * struct instance by calling the method named by id_new on the given class
 * with no arguments, and returns the result.
 */
66 #ifndef HAVE_RB_STRUCT_ALLOC_NOINIT
67 static VALUE
rb_struct_alloc_noinit(VALUE
class)
69 return rb_funcall(class, id_new
, 0, 0);
71 #endif /* !defined(HAVE_RB_STRUCT_ALLOC_NOINIT) */
73 /* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
/*
 * Compiled only when HAVE_RB_THREAD_BLOCKING_REGION is not defined.
 * Declares RUBY_UBF_IO and the two callback function types, then provides
 * rb_thread_blocking_region(func, data1, ubf, data2). The emulation asserts
 * that ubf is RUBY_UBF_IO, the only unblock function it accepts.
 */
74 #ifndef HAVE_RB_THREAD_BLOCKING_REGION
76 # define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
77 typedef void rb_unblock_function_t(void *);
78 typedef VALUE
rb_blocking_function_t(void *);
80 rb_thread_blocking_region(
81 rb_blocking_function_t
*func
, void *data1
,
82 rb_unblock_function_t
*ubf
, void *data2
)
86 assert(RUBY_UBF_IO
== ubf
&& "RUBY_UBF_IO required for emulation");
94 #endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
96 /* used to pass arguments to mq_open inside blocking region */
105 /* used to pass arguments to mq_send/mq_receive inside blocking region */
111 struct timespec
*timeout
;
114 /* hope it's there..., TODO: a better version that works in rbx */
115 struct timeval
rb_time_interval(VALUE
);
/*
 * Fills *dest with a deadline computed as the current gettimeofday() time
 * plus the interval rb_time_interval() produces for the Ruby value `time`.
 * Seconds are summed directly; the two microsecond fields are summed and
 * multiplied by 1000 to give tv_nsec.
 * NOTE(review): the overflow test is `> 1000000000`, so a sum of exactly
 * 1000000000 ns is left untouched, while POSIX requires tv_nsec to be below
 * 1000000000 -- confirm whether `>=` was intended.
 * NOTE(review): the return value of gettimeofday() is not checked.
 */
117 static struct timespec
*convert_timeout(struct timespec
*dest
, VALUE time
)
119 struct timeval tv
, now
;
124 tv
= rb_time_interval(time
); /* aggregate return :( */
125 gettimeofday(&now
, NULL
);
126 dest
->tv_sec
= now
.tv_sec
+ tv
.tv_sec
;
127 dest
->tv_nsec
= (now
.tv_usec
+ tv
.tv_usec
) * 1000;
129 if (dest
->tv_nsec
> 1000000000) {
130 dest
->tv_nsec
-= 1000000000;
/*
 * Blocking-region callback that performs mq_open(). ptr is a struct
 * open_args. Case 2 calls mq_open(name, oflags); case 3 adds mode and a
 * NULL attribute pointer; case 4 adds mode and &x->attr. Any other case
 * sets the result to MQD_INVALID without calling mq_open().
 */
137 /* runs without GVL */
138 static VALUE
xopen(void *ptr
)
140 struct open_args
*x
= ptr
;
144 case 2: rv
= mq_open(x
->name
, x
->oflags
); break;
145 case 3: rv
= mq_open(x
->name
, x
->oflags
, x
->mode
, NULL
); break;
146 case 4: rv
= mq_open(x
->name
, x
->oflags
, x
->mode
, &x
->attr
); break;
147 default: rv
= MQD_INVALID
;
/*
 * Blocking-region callback for sending. ptr is a struct rw_args. Sends
 * msg_len bytes from msg_ptr at priority msg_prio on x->des, either through
 * mq_timedsend() with x->timeout or through mq_send(), and returns the
 * call's result cast to VALUE.
 * NOTE(review): presumably the timed variant is chosen when x->timeout is
 * set -- confirm.
 */
153 /* runs without GVL */
154 static VALUE
xsend(void *ptr
)
156 struct rw_args
*x
= ptr
;
159 return (VALUE
)mq_timedsend(x
->des
, x
->msg_ptr
, x
->msg_len
,
160 x
->msg_prio
, x
->timeout
);
162 return (VALUE
)mq_send(x
->des
, x
->msg_ptr
, x
->msg_len
, x
->msg_prio
);
/*
 * Blocking-region callback for receiving. ptr is a struct rw_args. Receives
 * into msg_ptr (capacity msg_len) from x->des, storing the message priority
 * in x->msg_prio, either through mq_timedreceive() with x->timeout or
 * through mq_receive(). Returns the call's result cast to VALUE.
 * NOTE(review): presumably the timed variant is chosen when x->timeout is
 * set -- confirm.
 */
165 /* runs without GVL */
166 static VALUE
xrecv(void *ptr
)
168 struct rw_args
*x
= ptr
;
171 return (VALUE
)mq_timedreceive(x
->des
, x
->msg_ptr
, x
->msg_len
,
172 &x
->msg_prio
, x
->timeout
);
174 return (VALUE
)mq_receive(x
->des
, x
->msg_ptr
, x
->msg_len
, &x
->msg_prio
);
/*
 * Blocking-region callback for mq_unlink(). ptr is a Ruby VALUE passed as
 * a void pointer; its RSTRING_PTR is used as the queue name. Returns the
 * result of mq_unlink() cast to VALUE.
 */
177 /* runs without GVL, path resolution may be slow */
178 static VALUE
xunlink(void *ptr
)
180 VALUE name
= (VALUE
)ptr
;
182 return (VALUE
)mq_unlink(RSTRING_PTR(name
));
/*
 * GC mark callback registered through alloc(): ptr is the wrapped
 * struct posix_mq, whose name VALUE is marked.
 */
186 static void mark(void *ptr
)
188 struct posix_mq
*mq
= ptr
;
190 rb_gc_mark(mq
->name
);
/*
 * GC free callback registered through alloc(): ptr is the wrapped
 * struct posix_mq. When the descriptor is not MQD_INVALID, errno is saved
 * in a local and mq->des is reset to MQD_INVALID.
 * NOTE(review): presumably the descriptor is closed and errno restored
 * between those two steps -- confirm.
 */
195 static void _free(void *ptr
)
197 struct posix_mq
*mq
= ptr
;
199 if (mq
->des
!= MQD_INVALID
) {
200 /* we ignore errors when gc-ing */
201 int saved_errno
= errno
;
205 mq
->des
= MQD_INVALID
;
/*
 * Allocator for POSIX_MQ objects: wraps a new struct posix_mq in an object
 * of klass with mark() and _free() as GC callbacks, and starts the
 * descriptor out as MQD_INVALID.
 */
209 /* automatically called at creation (before initialize) */
210 static VALUE
alloc(VALUE klass
)
213 VALUE rv
= Data_Make_Struct(klass
, struct posix_mq
, mark
, _free
, mq
);
215 mq
->des
= MQD_INVALID
;
/*
 * self:       the Ruby object wrapping a struct posix_mq.
 * need_valid: when nonzero, a descriptor equal to MQD_INVALID raises
 *             IOError with the message "closed queue descriptor".
 */
223 /* unwraps the posix_mq struct from self */
224 static struct posix_mq
*get(VALUE self
, int need_valid
)
228 Data_Get_Struct(self
, struct posix_mq
, mq
);
230 if (need_valid
&& mq
->des
== MQD_INVALID
)
231 rb_raise(rb_eIOError
, "closed queue descriptor");
/*
 * Raises ArgumentError unless the class of astruct is cAttr. Struct members
 * 0..3 are converted with NUM2LONG into mq_flags, mq_maxmsg, mq_msgsize and
 * mq_curmsgs respectively. mq_maxmsg and mq_msgsize are converted when
 * `all` is nonzero or the member is not nil.
 * NOTE(review): any guards on the mq_flags and mq_curmsgs assignments are
 * not shown alongside them -- confirm how nil members are handled there.
 */
236 /* converts the POSIX_MQ::Attr astruct into a struct mq_attr attr */
237 static void attr_from_struct(struct mq_attr
*attr
, VALUE astruct
, int all
)
241 if (CLASS_OF(astruct
) != cAttr
)
242 rb_raise(rb_eArgError
, "not a POSIX_MQ::Attr: %s",
243 RSTRING_PTR(rb_inspect(astruct
)));
245 ptr
= RSTRUCT_PTR(astruct
);
247 attr
->mq_flags
= NUM2LONG(ptr
[0]);
249 if (all
|| !NIL_P(ptr
[1]))
250 attr
->mq_maxmsg
= NUM2LONG(ptr
[1]);
251 if (all
|| !NIL_P(ptr
[2]))
252 attr
->mq_msgsize
= NUM2LONG(ptr
[2]);
254 attr
->mq_curmsgs
= NUM2LONG(ptr
[3]);
259 * POSIX_MQ.new(name [, flags [, mode [, mq_attr]]]) => mq
261 * Opens a POSIX message queue given by +name+. +name+ should start
262 * with a slash ("/") for portable applications.
264 * If a Symbol is given in place of integer +flags+, then:
266 * * +:r+ is equivalent to IO::RDONLY
267 * * +:w+ is equivalent to IO::CREAT|IO::WRONLY
268 * * +:rw+ is equivalent to IO::CREAT|IO::RDWR
270 * +mode+ is an integer and only used when IO::CREAT is used.
271 * +mq_attr+ is a POSIX_MQ::Attr and only used if IO::CREAT is used.
272 * If +mq_attr+ is not specified when creating a queue, then the
273 * system defaults will be used.
275 * See the manpage for mq_open(3) for more details on this function.
/*
 * Implements POSIX_MQ#initialize (registered with arity -1).
 * Arguments are parsed with the scan spec "13": name is required; oflags,
 * mode and attr are optional. name must be a String, else ArgumentError.
 * The symbols :w and :rw select O_CREAT|O_WRONLY and O_CREAT|O_RDWR;
 * other flag values handled here are converted with NUM2INT. A given mode
 * is converted with NUM2INT. A given attr is converted through
 * attr_from_struct() with all=1, and when its mq_flags contains O_NONBLOCK
 * that bit is OR-ed into oflags as well.
 * The open itself runs in xopen() via rb_thread_blocking_region(); a result
 * of MQD_INVALID raises through rb_sys_fail("mq_open"). On success a
 * duplicate of name is stored in mq->name.
 * NOTE(review): the flags error message spells the symbol ":wr", whereas
 * the symbol accepted here is :rw -- confirm and correct the message.
 */
277 static VALUE
init(int argc
, VALUE
*argv
, VALUE self
)
279 struct posix_mq
*mq
= get(self
, 0);
281 VALUE name
, oflags
, mode
, attr
;
283 rb_scan_args(argc
, argv
, "13", &name
, &oflags
, &mode
, &attr
);
285 if (TYPE(name
) != T_STRING
)
286 rb_raise(rb_eArgError
, "name must be a string");
288 switch (TYPE(oflags
)) {
295 else if (oflags
== sym_w
)
296 x
.oflags
= O_CREAT
|O_WRONLY
;
297 else if (oflags
== sym_rw
)
298 x
.oflags
= O_CREAT
|O_RDWR
;
300 rb_raise(rb_eArgError
,
301 "symbol must be :r, :w, or :rw: %s",
302 RSTRING_PTR(rb_inspect(oflags
)));
306 x
.oflags
= NUM2INT(oflags
);
309 rb_raise(rb_eArgError
, "flags must be an int, :r, :w, or :wr");
312 x
.name
= RSTRING_PTR(name
);
315 switch (TYPE(mode
)) {
318 x
.mode
= NUM2INT(mode
);
321 if (x
.oflags
& O_CREAT
) {
327 rb_raise(rb_eArgError
, "mode not an integer");
330 switch (TYPE(attr
)) {
333 attr_from_struct(&x
.attr
, attr
, 1);
335 /* principle of least surprise */
336 if (x
.attr
.mq_flags
& O_NONBLOCK
)
337 x
.oflags
|= O_NONBLOCK
;
342 rb_raise(rb_eArgError
, "attr must be a POSIX_MQ::Attr: %s",
343 RSTRING_PTR(rb_inspect(attr
)));
346 mq
->des
= (mqd_t
)rb_thread_blocking_region(xopen
, &x
, RUBY_UBF_IO
, 0);
347 if (mq
->des
== MQD_INVALID
)
348 rb_sys_fail("mq_open");
350 mq
->name
= rb_str_dup(name
);
357 * POSIX_MQ.unlink(name) => 1
359 * Unlinks the message queue given by +name+. The queue will be destroyed
360 * when the last process with the queue open closes its queue descriptors.
/*
 * Implements the singleton method POSIX_MQ.unlink(name). name must be a
 * String, else ArgumentError. The unlink runs in xunlink() via
 * rb_thread_blocking_region(); a result equal to MQD_INVALID raises through
 * rb_sys_fail("mq_unlink").
 * NOTE(review): the String VALUE is passed to the callback as the void
 * pointer itself, before the type check on name is made.
 * NOTE(review): rv carries the result of mq_unlink() yet is compared with
 * the mqd_t sentinel MQD_INVALID -- confirm this is the intended check.
 */
362 static VALUE
s_unlink(VALUE self
, VALUE name
)
365 void *ptr
= (void *)name
;
367 if (TYPE(name
) != T_STRING
)
368 rb_raise(rb_eArgError
, "argument must be a string");
370 rv
= (mqd_t
)rb_thread_blocking_region(xunlink
, ptr
, RUBY_UBF_IO
, 0);
371 if (rv
== MQD_INVALID
)
372 rb_sys_fail("mq_unlink");
381 * Unlinks the message queue to prevent other processes from accessing it.
382 * All existing queue descriptors to this queue including those opened by
383 * other processes are unaffected. The queue will only be destroyed
384 * when the last process with open descriptors to this queue closes
/*
 * Implements POSIX_MQ#unlink. Fetches the struct without requiring a valid
 * descriptor, asserts that the stored mq->name is a String, and unlinks
 * that name in xunlink() via rb_thread_blocking_region(). A result equal to
 * MQD_INVALID raises through rb_sys_fail("mq_unlink").
 */
387 static VALUE
_unlink(VALUE self
)
389 struct posix_mq
*mq
= get(self
, 0);
391 void *ptr
= (void *)mq
->name
;
393 assert(TYPE(mq
->name
) == T_STRING
&& "mq->name is not a string");
395 rv
= (mqd_t
)rb_thread_blocking_region(xunlink
, ptr
, RUBY_UBF_IO
, 0);
396 if (rv
== MQD_INVALID
)
397 rb_sys_fail("mq_unlink");
/*
 * Converts buffer to a String with rb_obj_as_string() and points
 * x->msg_ptr / x->msg_len at that string's bytes and length.
 * NOTE(review): the converted string is held only in the local parameter
 * `buffer`; confirm it remains reachable by the GC for as long as
 * x->msg_ptr is in use.
 */
402 static void setup_send_buffer(struct rw_args
*x
, VALUE buffer
)
404 buffer
= rb_obj_as_string(buffer
);
405 x
->msg_ptr
= RSTRING_PTR(buffer
);
406 x
->msg_len
= (size_t)RSTRING_LEN(buffer
);
411 * mq.send(string [,priority[, timeout]]) => nil
413 * Inserts the given +string+ into the message queue with an optional,
414 * unsigned integer +priority+. If the optional +timeout+ is specified,
415 * then Errno::ETIMEDOUT will be raised if the operation cannot complete
416 * before +timeout+ seconds have elapsed. Without +timeout+, this method
417 * may block until the queue is writable.
/*
 * Implements POSIX_MQ#send (registered with arity -1); requires an open
 * descriptor. Arguments are parsed with the scan spec "12": buffer is
 * required, prio and timeout are optional. The buffer is prepared by
 * setup_send_buffer(), the timeout by convert_timeout(), and a nil prio
 * becomes priority 0 (otherwise NUM2UINT). The send runs in xsend() via
 * rb_thread_blocking_region(); a result equal to MQD_INVALID raises
 * through rb_sys_fail("mq_send").
 */
419 static VALUE
_send(int argc
, VALUE
*argv
, VALUE self
)
421 struct posix_mq
*mq
= get(self
, 1);
423 VALUE buffer
, prio
, timeout
;
425 struct timespec expire
;
427 rb_scan_args(argc
, argv
, "12", &buffer
, &prio
, &timeout
);
429 setup_send_buffer(&x
, buffer
);
431 x
.timeout
= convert_timeout(&expire
, timeout
);
432 x
.msg_prio
= NIL_P(prio
) ? 0 : NUM2UINT(prio
);
434 rv
= (mqd_t
)rb_thread_blocking_region(xsend
, &x
, RUBY_UBF_IO
, 0);
435 if (rv
== MQD_INVALID
)
436 rb_sys_fail("mq_send");
445 * Inserts the given +string+ into the message queue with a
446 * default priority of 0 and no timeout.
/*
 * Implements POSIX_MQ#<< (one argument); requires an open descriptor.
 * Prepares the buffer with setup_send_buffer() and sends it in xsend() via
 * rb_thread_blocking_region(). A result equal to MQD_INVALID raises through
 * rb_sys_fail("mq_send").
 */
448 static VALUE
send0(VALUE self
, VALUE buffer
)
450 struct posix_mq
*mq
= get(self
, 1);
454 setup_send_buffer(&x
, buffer
);
459 rv
= (mqd_t
)rb_thread_blocking_region(xsend
, &x
, RUBY_UBF_IO
, 0);
460 if (rv
== MQD_INVALID
)
461 rb_sys_fail("mq_send");
471 * Returns an IO.select-able +IO+ object. This method is only available
472 * under Linux and is not intended to be portable.
/*
 * Implements POSIX_MQ#to_io; requires an open descriptor. Calls IO.new
 * with the descriptor converted by INT2NUM and stores the resulting object
 * in mq->io.
 */
474 static VALUE
to_io(VALUE self
)
476 struct posix_mq
*mq
= get(self
, 1);
479 mq
->io
= rb_funcall(rb_cIO
, id_new
, 1, INT2NUM(mq
->des
));
/*
 * Queries the queue attributes with mq_getattr() and caches mq_msgsize in
 * mq->msgsize. Failure raises through rb_sys_fail("mq_getattr").
 * NOTE(review): the result of mq_getattr() is compared with the mqd_t
 * sentinel MQD_INVALID -- confirm this is the intended check.
 */
485 static void get_msgsize(struct posix_mq
*mq
)
489 if (mq_getattr(mq
->des
, &attr
) == MQD_INVALID
)
490 rb_sys_fail("mq_getattr");
492 mq
->msgsize
= attr
.mq_msgsize
;
497 * mq.receive([buffer, [timeout]]) => [ message, priority ]
499 * Takes the highest priority message off the queue and returns
500 * an array containing the message as a String and the Integer
501 * priority of the message.
503 * If the optional +buffer+ is present, then it must be a String
504 * which will receive the data.
506 * If the optional +timeout+ is present, then it may be a Float
507 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
508 * will be raised if +timeout+ has elapsed and there are no messages
/*
 * Implements POSIX_MQ#receive (registered with arity -1); requires an open
 * descriptor. Arguments are parsed with the scan spec "02": buffer and
 * timeout are both optional. The timeout goes through convert_timeout().
 * The receive buffer is either a new String of mq->msgsize bytes or the
 * given String made modifiable and resized to mq->msgsize; x.msg_ptr and
 * x.msg_len describe it. The receive runs in xrecv() via
 * rb_thread_blocking_region(); failure raises through
 * rb_sys_fail("mq_receive"). On success the buffer length is set to the
 * byte count r and [buffer, priority] is returned.
 */
511 static VALUE
receive(int argc
, VALUE
*argv
, VALUE self
)
513 struct posix_mq
*mq
= get(self
, 1);
515 VALUE buffer
, timeout
;
517 struct timespec expire
;
522 rb_scan_args(argc
, argv
, "02", &buffer
, &timeout
);
523 x
.timeout
= convert_timeout(&expire
, timeout
);
526 buffer
= rb_str_new(0, mq
->msgsize
);
529 rb_str_modify(buffer
);
530 rb_str_resize(buffer
, mq
->msgsize
);
533 x
.msg_ptr
= RSTRING_PTR(buffer
);
534 x
.msg_len
= (size_t)mq
->msgsize
;
537 r
= (ssize_t
)rb_thread_blocking_region(xrecv
, &x
, RUBY_UBF_IO
, 0);
539 rb_sys_fail("mq_receive");
541 rb_str_set_len(buffer
, r
);
543 return rb_ary_new3(2, buffer
, UINT2NUM(x
.msg_prio
));
550 * Returns a POSIX_MQ::Attr struct containing the attributes
551 * of the message queue. See the mq_getattr(3) manpage for
/*
 * Implements POSIX_MQ#attr; requires an open descriptor. Reads the queue
 * attributes with mq_getattr() (failure raises through
 * rb_sys_fail("mq_getattr")), allocates a cAttr struct with
 * rb_struct_alloc_noinit() and fills members 0..3 with mq_flags, mq_maxmsg,
 * mq_msgsize and mq_curmsgs converted by LONG2NUM.
 */
554 static VALUE
getattr(VALUE self
)
556 struct posix_mq
*mq
= get(self
, 1);
561 if (mq_getattr(mq
->des
, &attr
) == MQD_INVALID
)
562 rb_sys_fail("mq_getattr");
564 astruct
= rb_struct_alloc_noinit(cAttr
);
565 ptr
= RSTRUCT_PTR(astruct
);
566 ptr
[0] = LONG2NUM(attr
.mq_flags
);
567 ptr
[1] = LONG2NUM(attr
.mq_maxmsg
);
568 ptr
[2] = LONG2NUM(attr
.mq_msgsize
);
569 ptr
[3] = LONG2NUM(attr
.mq_curmsgs
);
576 * mq.attr = POSIX_MQ::Attr.new(IO::NONBLOCK) => mq_attr
578 * Only the IO::NONBLOCK flag may be set or unset (zero) in this manner.
579 * See the mq_setattr(3) manpage for more details.
581 * Consider using the POSIX_MQ#nonblock= method as it is easier and
582 * more natural to use.
/*
 * Implements POSIX_MQ#attr=; requires an open descriptor. Converts astruct
 * through attr_from_struct() with all=0 and applies it with mq_setattr(),
 * passing NULL for the old-attributes pointer. Failure raises through
 * rb_sys_fail("mq_setattr").
 */
584 static VALUE
setattr(VALUE self
, VALUE astruct
)
586 struct posix_mq
*mq
= get(self
, 1);
587 struct mq_attr newattr
;
589 attr_from_struct(&newattr
, astruct
, 0);
591 if (mq_setattr(mq
->des
, &newattr
, NULL
) == MQD_INVALID
)
592 rb_sys_fail("mq_setattr");
601 * Closes the underlying message queue descriptor.
602 * If this descriptor had a registered notification request, the request
603 * will be removed so another descriptor or process may register a
604 * notification request. Message queue descriptors are automatically
605 * closed by garbage collection.
/*
 * Implements POSIX_MQ#close; requires an open descriptor (a closed one
 * raises IOError in get()). Calls mq_close(), raising through
 * rb_sys_fail("mq_close") on failure, then marks the descriptor as
 * MQD_INVALID.
 */
607 static VALUE
_close(VALUE self
)
609 struct posix_mq
*mq
= get(self
, 1);
611 if (mq_close(mq
->des
) == MQD_INVALID
)
612 rb_sys_fail("mq_close");
614 mq
->des
= MQD_INVALID
;
622 * mq.closed? => true or false
624 * Returns +true+ if the message queue descriptor is closed and therefore
625 * unusable, otherwise +false+
/*
 * Implements POSIX_MQ#closed?. Fetches the struct without requiring a valid
 * descriptor and returns Qtrue when mq->des equals MQD_INVALID, Qfalse
 * otherwise.
 */
627 static VALUE
closed(VALUE self
)
629 struct posix_mq
*mq
= get(self
, 0);
631 return mq
->des
== MQD_INVALID
? Qtrue
: Qfalse
;
638 * Returns the string name of the message queue associated with +mq+
/*
 * Implements POSIX_MQ#name. Fetches the struct without requiring a valid
 * descriptor.
 * NOTE(review): presumably returns mq->name or a copy of it -- confirm.
 */
640 static VALUE
name(VALUE self
)
642 struct posix_mq
*mq
= get(self
, 0);
/*
 * Resolves a signal name to a value through Ruby's Signal.list.
 * sig is converted with rb_obj_as_string(); when it is longer than three
 * bytes and starts with "SIG", that prefix is dropped. The Signal.list
 * hash is fetched, registered with rb_global_variable(), and indexed with
 * the name. A failed lookup raises ArgumentError ("invalid signal: ...").
 * NOTE(review): the error message inspects `sig` after it has been
 * reassigned to the hash lookup result, so it does not show the name the
 * caller passed -- confirm and keep the original for the message.
 */
647 static int lookup_sig(VALUE sig
)
653 sig
= rb_obj_as_string(sig
);
654 len
= RSTRING_LEN(sig
);
655 ptr
= RSTRING_PTR(sig
);
657 if (len
> 3 && !memcmp("SIG", ptr
, 3))
658 sig
= rb_str_new(ptr
+ 3, len
- 3);
661 VALUE mSignal
= rb_define_module("Signal"""); /* avoid RDoc */
663 list
= rb_funcall(mSignal
, rb_intern("list"), 0, 0);
664 rb_global_variable(&list
);
667 sig
= rb_hash_aref(list
, sig
);
669 rb_raise(rb_eArgError
, "invalid signal: %s\n",
670 RSTRING_PTR(rb_inspect(sig
)));
677 * mq.notify = signal => signal
679 * Registers the notification request to deliver a given +signal+
680 * to the current process when a message is received.
681 * If +signal+ is +nil+, it will unregister and disable the notification
682 * request to allow other processes to register a request.
683 * Only one process may have a notification request for a queue
684 * at a time, Errno::EBUSY will be raised if there is already
685 * a notification request registration for the queue.
/*
 * Implements POSIX_MQ#notify=; requires an open descriptor. Builds a
 * notification request: SIGEV_SIGNAL with sigev_signo taken either from
 * NUM2INT(arg) or from lookup_sig(arg) (in which case the return value is
 * the resolved signal number), or SIGEV_NONE. Any other argument raises
 * ArgumentError ("must be a signal or nil"). The request is registered
 * with mq_notify(); failure raises through rb_sys_fail("mq_notify").
 * NOTE(review): the second argument of the mq_notify() call reads as a
 * "¬" character where the address of `not` (&not) is expected; this looks
 * like an encoding artifact -- confirm and restore.
 */
687 static VALUE
setnotify(VALUE self
, VALUE arg
)
689 struct posix_mq
*mq
= get(self
, 1);
693 not.sigev_notify
= SIGEV_SIGNAL
;
697 not.sigev_signo
= NUM2INT(arg
);
701 not.sigev_signo
= lookup_sig(arg
);
702 rv
= INT2NUM(not.sigev_signo
);
705 not.sigev_notify
= SIGEV_NONE
;
708 /* maybe support Proc+thread via sigev_notify_function.. */
709 rb_raise(rb_eArgError
, "must be a signal or nil");
712 if (mq_notify(mq
->des
, ¬) == MQD_INVALID
)
713 rb_sys_fail("mq_notify");
720 * mq.nonblock? => true or false
722 * Returns the current non-blocking state of the message queue descriptor.
/*
 * Implements POSIX_MQ#nonblock?; requires an open descriptor. Reads the
 * queue attributes with mq_getattr() (failure raises through
 * rb_sys_fail("mq_getattr")), refreshes the cached mq->msgsize from them,
 * and returns Qtrue when O_NONBLOCK is set in mq_flags, Qfalse otherwise.
 */
724 static VALUE
getnonblock(VALUE self
)
727 struct posix_mq
*mq
= get(self
, 1);
729 if (mq_getattr(mq
->des
, &attr
) == MQD_INVALID
)
730 rb_sys_fail("mq_getattr");
732 mq
->msgsize
= attr
.mq_msgsize
; /* optimization */
734 return attr
.mq_flags
& O_NONBLOCK
? Qtrue
: Qfalse
;
739 * mq.nonblock = boolean => boolean
741 * Enables or disables non-blocking operation for the message queue
742 * descriptor. Errno::EAGAIN will be raised in situations where
743 * the queue would block. This is not compatible with +timeout+
744 * arguments to POSIX_MQ#send and POSIX_MQ#receive.
/*
 * Implements POSIX_MQ#nonblock=; requires an open descriptor. Sets
 * newattr.mq_flags to O_NONBLOCK or to 0 (the latter for Qfalse); any
 * other argument raises ArgumentError ("must be true or false"). Applies
 * the flags with mq_setattr(), raising through rb_sys_fail("mq_setattr")
 * on failure, and refreshes the cached mq->msgsize from the previous
 * attributes that mq_setattr() returns in oldattr.
 * NOTE(review): only mq_flags of newattr is assigned here; the remaining
 * members are passed to mq_setattr() unset -- confirm they are ignored.
 */
746 static VALUE
setnonblock(VALUE self
, VALUE nb
)
748 struct mq_attr newattr
, oldattr
;
749 struct posix_mq
*mq
= get(self
, 1);
752 newattr
.mq_flags
= O_NONBLOCK
;
753 else if (nb
== Qfalse
)
754 newattr
.mq_flags
= 0;
756 rb_raise(rb_eArgError
, "must be true or false");
758 if (mq_setattr(mq
->des
, &newattr
, &oldattr
) == MQD_INVALID
)
759 rb_sys_fail("mq_setattr");
761 mq
->msgsize
= oldattr
.mq_msgsize
; /* optimization */
/*
 * Extension entry point. Defines the POSIX_MQ class under Object with
 * alloc() as its allocator, looks up the existing POSIX_MQ::Attr constant
 * into cAttr, defines the OPEN_MAX and PRIO_MAX constants from sysconf(),
 * registers the singleton and instance methods, and interns the "new" ID
 * and the :r, :w and :rw symbols used by the rest of the file.
 * NOTE(review): rb_const_get() is used for "Attr", so that constant is
 * presumably defined before this function runs -- confirm load order.
 */
766 void Init_posix_mq_ext(void)
768 cPOSIX_MQ
= rb_define_class("POSIX_MQ", rb_cObject
);
769 rb_define_alloc_func(cPOSIX_MQ
, alloc
);
770 cAttr
= rb_const_get(cPOSIX_MQ
, rb_intern("Attr"));
773 * The maximum number of open message descriptors supported
774 * by the system. This may be -1, in which case it is dynamically
775 * set at runtime. Consult your operating system documentation
776 * for system-specific information about this.
778 rb_define_const(cPOSIX_MQ
, "OPEN_MAX",
779 LONG2NUM(sysconf(_SC_MQ_OPEN_MAX
)));
782 * The maximum priority that may be specified for POSIX_MQ#send
783 * On POSIX-compliant systems, this is at least 31, but some
784 * systems allow higher limits.
785 * The minimum priority is always zero.
787 rb_define_const(cPOSIX_MQ
, "PRIO_MAX",
788 LONG2NUM(sysconf(_SC_MQ_PRIO_MAX
)));
790 rb_define_singleton_method(cPOSIX_MQ
, "unlink", s_unlink
, 1);
792 rb_define_method(cPOSIX_MQ
, "initialize", init
, -1);
793 rb_define_method(cPOSIX_MQ
, "send", _send
, -1);
794 rb_define_method(cPOSIX_MQ
, "<<", send0
, 1);
795 rb_define_method(cPOSIX_MQ
, "receive", receive
, -1);
796 rb_define_method(cPOSIX_MQ
, "attr", getattr
, 0);
797 rb_define_method(cPOSIX_MQ
, "attr=", setattr
, 1);
798 rb_define_method(cPOSIX_MQ
, "close", _close
, 0);
799 rb_define_method(cPOSIX_MQ
, "closed?", closed
, 0);
800 rb_define_method(cPOSIX_MQ
, "unlink", _unlink
, 0);
801 rb_define_method(cPOSIX_MQ
, "name", name
, 0);
802 rb_define_method(cPOSIX_MQ
, "notify=", setnotify
, 1);
803 rb_define_method(cPOSIX_MQ
, "nonblock=", setnonblock
, 1);
804 rb_define_method(cPOSIX_MQ
, "nonblock?", getnonblock
, 0);
806 rb_define_method(cPOSIX_MQ
, "to_io", to_io
, 0);
809 id_new
= rb_intern("new");
810 sym_r
= ID2SYM(rb_intern("r"));
811 sym_w
= ID2SYM(rb_intern("w"));
812 sym_rw
= ID2SYM(rb_intern("rw"));