1 #define _XOPEN_SOURCE 600
2 #ifdef HAVE_SYS_SELECT_H
3 # include <sys/select.h>
14 # define NUM2TIMET NUM2INT
27 #if defined(__linux__)
28 # define MQD_TO_FD(mqd) (int)(mqd)
29 #elif defined(HAVE___MQ_OSHANDLE) /* FreeBSD */
30 # define MQD_TO_FD(mqd) __mq_oshandle(mqd)
32 # define MQ_IO_MARK(mq) ((void)(0))
33 # define MQ_IO_SET(mq,val) ((void)(0))
34 # define MQ_IO_CLOSE(mq) ((int)(0))
35 # define MQ_IO_NIL_P(mq) ((int)(1))
49 # define MQ_IO_MARK(mq) rb_gc_mark((mq)->io)
50 # define MQ_IO_SET(mq,val) do { (mq)->io = (val); } while (0)
51 # define MQ_IO_NIL_P(mq) NIL_P((mq)->io)
52 static int MQ_IO_CLOSE(struct posix_mq
*mq
)
57 /* not safe during GC */
65 # define PMQ_WANTARRAY (1<<0)
66 # define PMQ_TRY (1<<1)
69 static ID id_new
, id_kill
, id_fileno
, id_mul
, id_divmod
;
70 static ID id_flags
, id_maxmsg
, id_msgsize
, id_curmsgs
;
71 static VALUE sym_r
, sym_w
, sym_rw
;
72 static const mqd_t MQD_INVALID
= (mqd_t
)-1;
74 /* Ruby 1.8.6+ macros (for compatibility with Ruby 1.9) */
76 # define RSTRING_PTR(s) (RSTRING(s)->ptr)
79 # define RSTRING_LEN(s) (RSTRING(s)->len)
82 # define RFLOAT_VALUE(f) (RFLOAT(f)->value)
85 #ifndef HAVE_RB_STR_SET_LEN
86 /* this is taken from Ruby 1.8.7, 1.8.6 may not have it */
88 # error upgrade Rubinius, rb_str_set_len should be available
90 static void rb_18_str_set_len(VALUE str
, long len
)
92 RSTRING(str
)->len
= len
;
93 RSTRING(str
)->ptr
[len
] = '\0';
95 #define rb_str_set_len rb_18_str_set_len
96 #endif /* !defined(HAVE_RB_STR_SET_LEN) */
98 /* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
99 #ifndef HAVE_RB_THREAD_BLOCKING_REGION
100 # include <rubysig.h>
101 # define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
102 typedef void rb_unblock_function_t(void *);
103 typedef VALUE
rb_blocking_function_t(void *);
105 rb_thread_blocking_region(
106 rb_blocking_function_t
*func
, void *data1
,
107 rb_unblock_function_t
*ubf
, void *data2
)
111 assert(RUBY_UBF_IO
== ubf
&& "RUBY_UBF_IO required for emulation");
119 #endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
121 /* used to pass arguments to mq_open inside blocking region */
130 /* used to pass arguments to mq_send/mq_receive inside blocking region */
136 struct timespec
*timeout
;
139 #ifndef HAVE_MQ_TIMEDSEND
141 not_timedsend(mqd_t mqdes
, const char *msg_ptr
,
142 size_t msg_len
, unsigned msg_prio
,
143 const struct timespec
*abs_timeout
)
145 rb_bug("mq_timedsend workaround failed");
148 # define mq_timedsend not_timedsend
150 #ifndef HAVE_MQ_TIMEDRECEIVE
152 not_timedreceive(mqd_t mqdes
, char *msg_ptr
,
153 size_t msg_len
, unsigned *msg_prio
,
154 const struct timespec
*abs_timeout
)
156 rb_bug("mq_timedreceive workaround failed");
159 # define mq_timedreceive not_timedreceive
162 #if defined(HAVE_MQ_TIMEDRECEIVE) && defined(HAVE_MQ_TIMEDSEND)
163 static void num2timespec(struct timespec
*ts
, VALUE t
)
168 ts
->tv_sec
= NUM2TIMET(t
);
173 double val
= RFLOAT_VALUE(t
);
177 ts
->tv_nsec
= (long)(d
* 1e9
+ 0.5);
179 ts
->tv_nsec
= (long)(-d
* 1e9
+ 0.5);
180 if (ts
->tv_nsec
> 0) {
181 ts
->tv_nsec
= 1000000000 - ts
->tv_nsec
;
185 ts
->tv_sec
= (time_t)f
;
187 rb_raise(rb_eRangeError
, "%f out of range", val
);
188 ts
->tv_sec
= (time_t)f
;
193 VALUE ary
= rb_funcall(t
, id_divmod
, 1, INT2FIX(1));
195 Check_Type(ary
, T_ARRAY
);
197 ts
->tv_sec
= NUM2TIMET(rb_ary_entry(ary
, 0));
198 f
= rb_ary_entry(ary
, 1);
199 f
= rb_funcall(f
, id_mul
, 1, INT2FIX(1000000000));
200 ts
->tv_nsec
= NUM2LONG(f
);
205 static void num2timespec(struct timespec
*ts
, VALUE t
)
207 rb_raise(rb_eNotImpError
,
208 "mq_timedsend and/or mq_timedreceive missing");
212 static struct timespec
*convert_timeout(struct timespec
*dest
, VALUE t
)
214 struct timespec ts
, now
;
219 num2timespec(&ts
, t
);
220 clock_gettime(CLOCK_REALTIME
, &now
);
221 dest
->tv_sec
= now
.tv_sec
+ ts
.tv_sec
;
222 dest
->tv_nsec
= now
.tv_nsec
+ ts
.tv_nsec
;
224 if (dest
->tv_nsec
> 1000000000) {
225 dest
->tv_nsec
-= 1000000000;
232 /* (may) run without GVL */
233 static VALUE
xopen(void *ptr
)
235 struct open_args
*x
= ptr
;
239 case 2: rv
= mq_open(x
->name
, x
->oflags
); break;
240 case 3: rv
= mq_open(x
->name
, x
->oflags
, x
->mode
, NULL
); break;
241 case 4: rv
= mq_open(x
->name
, x
->oflags
, x
->mode
, &x
->attr
); break;
242 default: rv
= MQD_INVALID
;
248 /* runs without GVL */
249 static VALUE
xsend(void *ptr
)
251 struct rw_args
*x
= ptr
;
254 return (VALUE
)mq_timedsend(x
->des
, x
->msg_ptr
, x
->msg_len
,
255 x
->msg_prio
, x
->timeout
);
257 return (VALUE
)mq_send(x
->des
, x
->msg_ptr
, x
->msg_len
, x
->msg_prio
);
260 /* runs without GVL */
261 static VALUE
xrecv(void *ptr
)
263 struct rw_args
*x
= ptr
;
266 return (VALUE
)mq_timedreceive(x
->des
, x
->msg_ptr
, x
->msg_len
,
267 &x
->msg_prio
, x
->timeout
);
269 return (VALUE
)mq_receive(x
->des
, x
->msg_ptr
, x
->msg_len
, &x
->msg_prio
);
273 static void mark(void *ptr
)
275 struct posix_mq
*mq
= ptr
;
277 rb_gc_mark(mq
->name
);
278 rb_gc_mark(mq
->thread
);
283 static void _free(void *ptr
)
285 struct posix_mq
*mq
= ptr
;
287 if (mq
->des
!= MQD_INVALID
&& MQ_IO_NIL_P(mq
)) {
288 /* we ignore errors when gc-ing */
295 /* automatically called at creation (before initialize) */
296 static VALUE
alloc(VALUE klass
)
299 VALUE rv
= Data_Make_Struct(klass
, struct posix_mq
, mark
, _free
, mq
);
301 mq
->des
= MQD_INVALID
;
302 mq
->attr
.mq_flags
= 0;
303 mq
->attr
.mq_maxmsg
= 0;
304 mq
->attr
.mq_msgsize
= -1;
305 mq
->attr
.mq_curmsgs
= 0;
313 /* unwraps the posix_mq struct from self */
314 static struct posix_mq
*get(VALUE self
, int need_valid
)
318 Data_Get_Struct(self
, struct posix_mq
, mq
);
320 if (need_valid
&& mq
->des
== MQD_INVALID
)
321 rb_raise(rb_eIOError
, "closed queue descriptor");
326 static void check_struct_type(VALUE astruct
)
328 if (CLASS_OF(astruct
) == cAttr
)
330 astruct
= rb_inspect(astruct
);
331 rb_raise(rb_eTypeError
, "not a POSIX_MQ::Attr: %s",
332 StringValuePtr(astruct
));
335 static void rstruct2mqattr(struct mq_attr
*attr
, VALUE astruct
, int all
)
339 check_struct_type(astruct
);
340 attr
->mq_flags
= NUM2LONG(rb_funcall(astruct
, id_flags
, 0));
342 tmp
= rb_funcall(astruct
, id_maxmsg
, 0);
343 if (all
|| !NIL_P(tmp
))
344 attr
->mq_maxmsg
= NUM2LONG(tmp
);
346 tmp
= rb_funcall(astruct
, id_msgsize
, 0);
347 if (all
|| !NIL_P(tmp
))
348 attr
->mq_msgsize
= NUM2LONG(tmp
);
350 tmp
= rb_funcall(astruct
, id_curmsgs
, 0);
352 attr
->mq_curmsgs
= NUM2LONG(tmp
);
357 * POSIX_MQ.new(name [, flags [, mode [, mq_attr]]) => mq
359 * Opens a POSIX message queue given by +name+. +name+ should start
360 * with a slash ("/") for portable applications.
362 * If a Symbol is given in place of integer +flags+, then:
364 * * +:r+ is equivalent to IO::RDONLY
365 * * +:w+ is equivalent to IO::CREAT|IO::WRONLY
366 * * +:rw+ is equivalent to IO::CREAT|IO::RDWR
368 * +mode+ is an integer and only used when IO::CREAT is used.
369 * +mq_attr+ is a POSIX_MQ::Attr and only used if IO::CREAT is used.
370 * If +mq_attr+ is not specified when creating a queue, then the
371 * system defaults will be used.
373 * See the manpage for mq_open(3) for more details on this function.
375 static VALUE
init(int argc
, VALUE
*argv
, VALUE self
)
377 struct posix_mq
*mq
= get(self
, 0);
379 VALUE name
, oflags
, mode
, attr
;
381 rb_scan_args(argc
, argv
, "13", &name
, &oflags
, &mode
, &attr
);
383 switch (TYPE(oflags
)) {
390 else if (oflags
== sym_w
)
391 x
.oflags
= O_CREAT
|O_WRONLY
;
392 else if (oflags
== sym_rw
)
393 x
.oflags
= O_CREAT
|O_RDWR
;
395 oflags
= rb_inspect(oflags
);
396 rb_raise(rb_eArgError
,
397 "symbol must be :r, :w, or :rw: %s",
398 StringValuePtr(oflags
));
403 x
.oflags
= NUM2INT(oflags
);
406 rb_raise(rb_eArgError
, "flags must be an int, :r, :w, or :wr");
409 x
.name
= StringValueCStr(name
);
412 switch (TYPE(mode
)) {
415 x
.mode
= NUM2UINT(mode
);
418 if (x
.oflags
& O_CREAT
) {
424 rb_raise(rb_eArgError
, "mode not an integer");
427 switch (TYPE(attr
)) {
430 rstruct2mqattr(&x
.attr
, attr
, 1);
432 /* principle of least surprise */
433 if (x
.attr
.mq_flags
& O_NONBLOCK
)
434 x
.oflags
|= O_NONBLOCK
;
439 check_struct_type(attr
);
442 mq
->des
= (mqd_t
)xopen(&x
);
443 if (mq
->des
== MQD_INVALID
) {
450 mq
->des
= (mqd_t
)xopen(&x
);
452 if (mq
->des
== MQD_INVALID
)
453 rb_sys_fail("mq_open");
456 mq
->name
= rb_str_dup(name
);
457 if (x
.oflags
& O_NONBLOCK
)
458 mq
->attr
.mq_flags
= O_NONBLOCK
;
465 * POSIX_MQ.unlink(name) => 1
467 * Unlinks the message queue given by +name+. The queue will be destroyed
468 * when the last process with the queue open closes its queue descriptors.
470 static VALUE
s_unlink(VALUE self
, VALUE name
)
472 mqd_t rv
= mq_unlink(StringValueCStr(name
));
474 if (rv
== MQD_INVALID
)
475 rb_sys_fail("mq_unlink");
484 * Unlinks the message queue to prevent other processes from accessing it.
485 * All existing queue descriptors to this queue including those opened by
486 * other processes are unaffected. The queue will only be destroyed
487 * when the last process with open descriptors to this queue closes
490 static VALUE
_unlink(VALUE self
)
492 struct posix_mq
*mq
= get(self
, 0);
495 assert(TYPE(mq
->name
) == T_STRING
&& "mq->name is not a string");
497 rv
= mq_unlink(RSTRING_PTR(mq
->name
));
498 if (rv
== MQD_INVALID
)
499 rb_sys_fail("mq_unlink");
504 static void setup_send_buffer(struct rw_args
*x
, VALUE buffer
)
506 buffer
= rb_obj_as_string(buffer
);
507 x
->msg_ptr
= RSTRING_PTR(buffer
);
508 x
->msg_len
= (size_t)RSTRING_LEN(buffer
);
511 static VALUE
_send(int sflags
, int argc
, VALUE
*argv
, VALUE self
);
514 * mq.send(string [,priority[, timeout]]) => nil
516 * Inserts the given +string+ into the message queue with an optional,
517 * unsigned integer +priority+. If the optional +timeout+ is specified,
518 * then Errno::ETIMEDOUT will be raised if the operation cannot complete
519 * before +timeout+ seconds has elapsed. Without +timeout+, this method
520 * may block until the queue is writable.
522 * On some older systems, the +timeout+ argument is not currently
523 * supported and may raise NotImplementedError if +timeout+ is used.
525 static VALUE
my_send(int argc
, VALUE
*argv
, VALUE self
)
527 _send(0, argc
, argv
, self
);
530 static VALUE
_send(int sflags
, int argc
, VALUE
*argv
, VALUE self
)
532 struct posix_mq
*mq
= get(self
, 1);
534 VALUE buffer
, prio
, timeout
;
536 struct timespec expire
;
538 rb_scan_args(argc
, argv
, "12", &buffer
, &prio
, &timeout
);
540 setup_send_buffer(&x
, buffer
);
542 x
.timeout
= convert_timeout(&expire
, timeout
);
543 x
.msg_prio
= NIL_P(prio
) ? 0 : NUM2UINT(prio
);
545 rv
= (mqd_t
)rb_thread_blocking_region(xsend
, &x
, RUBY_UBF_IO
, 0);
546 if (rv
== MQD_INVALID
) {
547 if (errno
== EAGAIN
&& (sflags
& PMQ_TRY
))
549 rb_sys_fail("mq_send");
552 return (sflags
& PMQ_TRY
) ? Qtrue
: Qnil
;
559 * Inserts the given +string+ into the message queue with a
560 * default priority of 0 and no timeout.
562 static VALUE
send0(VALUE self
, VALUE buffer
)
564 struct posix_mq
*mq
= get(self
, 1);
568 setup_send_buffer(&x
, buffer
);
573 rv
= (mqd_t
)rb_thread_blocking_region(xsend
, &x
, RUBY_UBF_IO
, 0);
574 if (rv
== MQD_INVALID
)
575 rb_sys_fail("mq_send");
585 * Returns an IO.select-able +IO+ object. This method is only available
586 * under Linux and FreeBSD and is not intended to be portable.
588 static VALUE
to_io(VALUE self
)
590 struct posix_mq
*mq
= get(self
, 1);
591 int fd
= MQD_TO_FD(mq
->des
);
594 mq
->io
= rb_funcall(rb_cIO
, id_new
, 1, INT2NUM(fd
));
600 static VALUE
_receive(int rflags
, int argc
, VALUE
*argv
, VALUE self
);
604 * mq.receive([buffer, [timeout]]) => [ message, priority ]
606 * Takes the highest priority message off the queue and returns
607 * an array containing the message as a String and the Integer
608 * priority of the message.
610 * If the optional +buffer+ is present, then it must be a String
611 * which will receive the data.
613 * If the optional +timeout+ is present, then it may be a Float
614 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
615 * will be raised if +timeout+ has elapsed and there are no messages
618 * On some older systems, the +timeout+ argument is not currently
619 * supported and may raise NotImplementedError if +timeout+ is used.
621 static VALUE
receive(int argc
, VALUE
*argv
, VALUE self
)
623 return _receive(PMQ_WANTARRAY
, argc
, argv
, self
);
628 * mq.shift([buffer, [timeout]]) => message
630 * Takes the highest priority message off the queue and returns
631 * the message as a String.
633 * If the optional +buffer+ is present, then it must be a String
634 * which will receive the data.
636 * If the optional +timeout+ is present, then it may be a Float
637 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
638 * will be raised if +timeout+ has elapsed and there are no messages
641 * On some older systems, the +timeout+ argument is not currently
642 * supported and may raise NotImplementedError if +timeout+ is used.
644 static VALUE
shift(int argc
, VALUE
*argv
, VALUE self
)
646 return _receive(0, argc
, argv
, self
);
649 static VALUE
_receive(int rflags
, int argc
, VALUE
*argv
, VALUE self
)
651 struct posix_mq
*mq
= get(self
, 1);
653 VALUE buffer
, timeout
;
655 struct timespec expire
;
657 if (mq
->attr
.mq_msgsize
< 0) {
658 if (mq_getattr(mq
->des
, &mq
->attr
) < 0)
659 rb_sys_fail("mq_getattr");
662 rb_scan_args(argc
, argv
, "02", &buffer
, &timeout
);
663 x
.timeout
= convert_timeout(&expire
, timeout
);
666 buffer
= rb_str_new(0, mq
->attr
.mq_msgsize
);
669 rb_str_modify(buffer
);
670 rb_str_resize(buffer
, mq
->attr
.mq_msgsize
);
673 x
.msg_ptr
= RSTRING_PTR(buffer
);
674 x
.msg_len
= (size_t)mq
->attr
.mq_msgsize
;
677 r
= (ssize_t
)rb_thread_blocking_region(xrecv
, &x
, RUBY_UBF_IO
, 0);
679 if (errno
== EAGAIN
&& (rflags
& PMQ_TRY
))
681 rb_sys_fail("mq_receive");
684 rb_str_set_len(buffer
, r
);
686 if (rflags
& PMQ_WANTARRAY
)
687 return rb_ary_new3(2, buffer
, UINT2NUM(x
.msg_prio
));
695 * Returns a POSIX_MQ::Attr struct containing the attributes
696 * of the message queue. See the mq_getattr(3) manpage for
699 static VALUE
getattr(VALUE self
)
701 struct posix_mq
*mq
= get(self
, 1);
704 if (mq_getattr(mq
->des
, &mq
->attr
) < 0)
705 rb_sys_fail("mq_getattr");
707 return rb_funcall(cAttr
, id_new
, 4,
708 LONG2NUM(mq
->attr
.mq_flags
),
709 LONG2NUM(mq
->attr
.mq_maxmsg
),
710 LONG2NUM(mq
->attr
.mq_msgsize
),
711 LONG2NUM(mq
->attr
.mq_curmsgs
));
716 * mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr
718 * Only the IO::NONBLOCK flag may be set or unset (zero) in this manner.
719 * See the mq_setattr(3) manpage for more details.
721 * Consider using the POSIX_MQ#nonblock= method as it is easier and
722 * more natural to use.
724 static VALUE
setattr(VALUE self
, VALUE astruct
)
726 struct posix_mq
*mq
= get(self
, 1);
727 struct mq_attr newattr
;
729 rstruct2mqattr(&newattr
, astruct
, 0);
731 if (mq_setattr(mq
->des
, &newattr
, NULL
) < 0)
732 rb_sys_fail("mq_setattr");
741 * Closes the underlying message queue descriptor.
742 * If this descriptor had a registered notification request, the request
743 * will be removed so another descriptor or process may register a
744 * notification request. Message queue descriptors are automatically
745 * closed by garbage collection.
747 static VALUE
_close(VALUE self
)
749 struct posix_mq
*mq
= get(self
, 1);
751 if (! MQ_IO_CLOSE(mq
)) {
752 if (mq_close(mq
->des
) == -1)
753 rb_sys_fail("mq_close");
755 mq
->des
= MQD_INVALID
;
762 * mq.closed? => true or false
764 * Returns +true+ if the message queue descriptor is closed and therefore
765 * unusable, otherwise +false+
767 static VALUE
closed(VALUE self
)
769 struct posix_mq
*mq
= get(self
, 0);
771 return mq
->des
== MQD_INVALID
? Qtrue
: Qfalse
;
778 * Returns the string name of message queue associated with +mq+
780 static VALUE
name(VALUE self
)
782 struct posix_mq
*mq
= get(self
, 0);
784 return rb_str_dup(mq
->name
);
787 static int lookup_sig(VALUE sig
)
793 sig
= rb_obj_as_string(sig
);
794 len
= RSTRING_LEN(sig
);
795 ptr
= RSTRING_PTR(sig
);
797 if (len
> 3 && !memcmp("SIG", ptr
, 3))
798 sig
= rb_str_new(ptr
+ 3, len
- 3);
801 VALUE mSignal
= rb_const_get(rb_cObject
, rb_intern("Signal"));
803 list
= rb_funcall(mSignal
, rb_intern("list"), 0, 0);
804 rb_global_variable(&list
);
807 sig
= rb_hash_aref(list
, sig
);
809 rb_raise(rb_eArgError
, "invalid signal: %s\n", ptr
);
815 * TODO: Under Linux, we could just use netlink directly
816 * the same way glibc does...
818 /* we spawn a thread just to write ONE byte into an fd (usually a pipe) */
819 static void thread_notify_fd(union sigval sv
)
821 int fd
= sv
.sival_int
;
823 while ((write(fd
, "", 1) < 0) && (errno
== EINTR
|| errno
== EAGAIN
));
826 static void my_mq_notify(mqd_t des
, struct sigevent
*not)
828 mqd_t rv
= mq_notify(des
, not);
830 if (rv
== MQD_INVALID
) {
831 if (errno
== ENOMEM
) {
833 rv
= mq_notify(des
, not);
835 if (rv
== MQD_INVALID
)
836 rb_sys_fail("mq_notify");
841 static VALUE
setnotify_exec(VALUE self
, VALUE io
, VALUE thr
)
843 int fd
= NUM2INT(rb_funcall(io
, id_fileno
, 0, 0));
844 struct posix_mq
*mq
= get(self
, 1);
848 errno
= pthread_attr_init(&attr
);
849 if (errno
) rb_sys_fail("pthread_attr_init");
851 errno
= pthread_attr_setdetachstate(&attr
, PTHREAD_CREATE_DETACHED
);
852 if (errno
) rb_sys_fail("pthread_attr_setdetachstate");
854 #ifdef PTHREAD_STACK_MIN
855 (void)pthread_attr_setstacksize(&attr
, PTHREAD_STACK_MIN
);
858 not.sigev_notify
= SIGEV_THREAD
;
859 not.sigev_notify_function
= thread_notify_fd
;
860 not.sigev_notify_attributes
= &attr
;
861 not.sigev_value
.sival_int
= fd
;
863 if (!NIL_P(mq
->thread
))
864 rb_funcall(mq
->thread
, id_kill
, 0, 0);
867 my_mq_notify(mq
->des
, ¬);
873 static VALUE
notify_cleanup(VALUE self
)
875 struct posix_mq
*mq
= get(self
, 1);
877 if (!NIL_P(mq
->thread
)) {
878 rb_funcall(mq
->thread
, id_kill
, 0, 0);
886 * mq.notify = signal => signal
888 * Registers the notification request to deliver a given +signal+
889 * to the current process when message is received.
890 * If +signal+ is +nil+, it will unregister and disable the notification
891 * request to allow other processes to register a request.
892 * If +signal+ is +false+, it will register a no-op notification request
893 * which will prevent other processes from registering a notification.
894 * If +signal+ is an +IO+ object, it will spawn a thread upon the
895 * arrival of the next message and write one "\\0" byte to the file
896 * descriptor belonging to that IO object.
897 * Only one process may have a notification request for a queue
898 * at a time, Errno::EBUSY will be raised if there is already
899 * a notification request registration for the queue.
901 * Notifications are only fired once and processes must reregister
902 * for subsequent notifications.
904 * For readers of the mq_notify(3) manpage, passing +false+
905 * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent
906 * of passing a NULL notification pointer to mq_notify(3).
908 static VALUE
setnotify(VALUE self
, VALUE arg
)
910 struct posix_mq
*mq
= get(self
, 1);
912 struct sigevent
* notification
= ¬
915 notify_cleanup(self
);
916 not.sigev_notify
= SIGEV_SIGNAL
;
920 not.sigev_notify
= SIGEV_NONE
;
926 not.sigev_signo
= NUM2INT(arg
);
930 not.sigev_signo
= lookup_sig(arg
);
931 rv
= INT2NUM(not.sigev_signo
);
934 rb_raise(rb_eArgError
, "must be a signal or nil");
937 my_mq_notify(mq
->des
, notification
);
944 * mq.nonblock? => true or false
946 * Returns the current non-blocking state of the message queue descriptor.
948 static VALUE
nonblock_p(VALUE self
)
950 struct posix_mq
*mq
= get(self
, 1);
952 if (mq_getattr(mq
->des
, &mq
->attr
) < 0)
953 rb_sys_fail("mq_getattr");
954 return mq
->attr
.mq_flags
& O_NONBLOCK
? Qtrue
: Qfalse
;
959 * mq.nonblock = boolean => boolean
961 * Enables or disables non-blocking operation for the message queue
962 * descriptor. Errno::EAGAIN will be raised in situations where
963 * the queue would block. This is not compatible with +timeout+
964 * arguments to POSIX_MQ#send and POSIX_MQ#receive.
966 static VALUE
setnonblock(VALUE self
, VALUE nb
)
968 struct mq_attr newattr
;
969 struct posix_mq
*mq
= get(self
, 1);
972 newattr
.mq_flags
= O_NONBLOCK
;
973 else if (nb
== Qfalse
)
974 newattr
.mq_flags
= 0;
976 rb_raise(rb_eArgError
, "must be true or false");
978 if (mq_setattr(mq
->des
, &newattr
, &mq
->attr
) < 0)
979 rb_sys_fail("mq_setattr");
981 mq
->attr
.mq_flags
= newattr
.mq_flags
;
986 static VALUE
tryinit(int argc
, VALUE
*argv
, VALUE self
)
988 init(argc
, argv
, self
);
989 setnonblock(self
, Qtrue
);
996 * mq.trysend(string [,priority[, timeout]]) => +true+ or +false+
998 * Exactly like POSIX_MQ#send, except it returns +false+ instead of raising
999 * Errno::EAGAIN when non-blocking operation is desired and returns +true+
1000 * on success instead of +nil+.
1002 * This does not guarantee non-blocking behavior, the message queue must
1003 * be made non-blocking before calling this method.
1005 static VALUE
trysend(int argc
, VALUE
*argv
, VALUE self
)
1007 _send(PMQ_TRY
, argc
, argv
, self
);
1012 * mq.tryshift([buffer [, timeout]]) => message or nil
1014 * Exactly like POSIX_MQ#shift, except it returns +nil+ instead of raising
1015 * Errno::EAGAIN when non-blocking operation is desired.
1017 * This does not guarantee non-blocking behavior, the message queue must
1018 * be made non-blocking before calling this method.
1020 static VALUE
tryshift(int argc
, VALUE
*argv
, VALUE self
)
1022 return _receive(PMQ_TRY
, argc
, argv
, self
);
1027 * mq.tryreceive([buffer [, timeout]]) => [ message, priority ] or nil
1029 * Exactly like POSIX_MQ#receive, except it returns +nil+ instead of raising
1030 * Errno::EAGAIN when non-blocking operation is desired.
1032 * This does not guarantee non-blocking behavior, the message queue must
1033 * be made non-blocking before calling this method.
1035 static VALUE
tryreceive(int argc
, VALUE
*argv
, VALUE self
)
1037 return _receive(PMQ_WANTARRAY
|PMQ_TRY
, argc
, argv
, self
);
1040 void Init_posix_mq_ext(void)
1042 VALUE cPOSIX_MQ
= rb_define_class("POSIX_MQ", rb_cObject
);
1043 rb_define_alloc_func(cPOSIX_MQ
, alloc
);
1044 cAttr
= rb_const_get(cPOSIX_MQ
, rb_intern("Attr"));
1047 * The maximum number of open message descriptors supported
1048 * by the system. This may be -1, in which case it is dynamically
1049 * set at runtime. Consult your operating system documentation
1050 * for system-specific information about this.
1052 rb_define_const(cPOSIX_MQ
, "OPEN_MAX",
1053 LONG2NUM(sysconf(_SC_MQ_OPEN_MAX
)));
1056 * The maximum priority that may be specified for POSIX_MQ#send
1057 * On POSIX-compliant systems, this is at least 31, but some
1058 * systems allow higher limits.
1059 * The minimum priority is always zero.
1061 rb_define_const(cPOSIX_MQ
, "PRIO_MAX",
1062 LONG2NUM(sysconf(_SC_MQ_PRIO_MAX
)));
1064 rb_define_singleton_method(cPOSIX_MQ
, "unlink", s_unlink
, 1);
1066 rb_define_method(cPOSIX_MQ
, "initialize", init
, -1);
1067 rb_define_method(cPOSIX_MQ
, "send", my_send
, -1);
1068 rb_define_method(cPOSIX_MQ
, "<<", send0
, 1);
1069 rb_define_method(cPOSIX_MQ
, "trysend", trysend
, -1);
1070 rb_define_method(cPOSIX_MQ
, "receive", receive
, -1);
1071 rb_define_method(cPOSIX_MQ
, "tryreceive", tryreceive
, -1);
1072 rb_define_method(cPOSIX_MQ
, "shift", shift
, -1);
1073 rb_define_method(cPOSIX_MQ
, "tryshift", tryshift
, -1);
1074 rb_define_method(cPOSIX_MQ
, "attr", getattr
, 0);
1075 rb_define_method(cPOSIX_MQ
, "attr=", setattr
, 1);
1076 rb_define_method(cPOSIX_MQ
, "close", _close
, 0);
1077 rb_define_method(cPOSIX_MQ
, "closed?", closed
, 0);
1078 rb_define_method(cPOSIX_MQ
, "unlink", _unlink
, 0);
1079 rb_define_method(cPOSIX_MQ
, "name", name
, 0);
1080 rb_define_method(cPOSIX_MQ
, "notify=", setnotify
, 1);
1081 rb_define_method(cPOSIX_MQ
, "nonblock=", setnonblock
, 1);
1082 rb_define_method(cPOSIX_MQ
, "notify_exec", setnotify_exec
, 2);
1083 rb_define_method(cPOSIX_MQ
, "notify_cleanup", notify_cleanup
, 0);
1084 rb_define_method(cPOSIX_MQ
, "nonblock?", nonblock_p
, 0);
1086 rb_define_method(cPOSIX_MQ
, "to_io", to_io
, 0);
1089 id_new
= rb_intern("new");
1090 id_kill
= rb_intern("kill");
1091 id_fileno
= rb_intern("fileno");
1092 id_mul
= rb_intern("*");
1093 id_divmod
= rb_intern("divmod");
1094 id_flags
= rb_intern("flags");
1095 id_maxmsg
= rb_intern("maxmsg");
1096 id_msgsize
= rb_intern("msgsize");
1097 id_curmsgs
= rb_intern("curmsgs");
1098 sym_r
= ID2SYM(rb_intern("r"));
1099 sym_w
= ID2SYM(rb_intern("w"));
1100 sym_rw
= ID2SYM(rb_intern("rw"));