1 #define _XOPEN_SOURCE 600
2 #ifdef HAVE_SYS_SELECT_H
3 # include <sys/select.h>
14 # define NUM2TIMET NUM2INT
18 # define RB_GC_GUARD(v) (*(volatile VALUE *)&(v))
31 #if defined(__linux__)
32 # define MQD_TO_FD(mqd) (int)(mqd)
33 #elif defined(HAVE___MQ_OSHANDLE) /* FreeBSD */
34 # define MQD_TO_FD(mqd) __mq_oshandle(mqd)
/*
 * Fallback (no-op) versions of the MQ_IO_* hooks.
 * NOTE(review): presumably the #else branch taken when MQD_TO_FD is
 * unavailable -- confirm against the surrounding #if/#elif.
 *
 * MQ_IO_MARK and MQ_IO_SET are plain statements, so a void expression
 * is fine for them.  MQ_IO_CLOSE and MQ_IO_NIL_P are used as operands
 * of `!` and `&&` by callers, so they must expand to int expressions:
 *   MQ_IO_CLOSE -> 0  ("no IO object was closed", caller closes the mqd)
 *   MQ_IO_NIL_P -> 1  ("there is no IO object")
 * The name MQ_IO_NIL_P matches the definition used by the other branch;
 * the old misspelling MQ_IO_NILP is kept only as an alias.
 */
# define MQ_IO_MARK(mq) ((void)(0))
# define MQ_IO_SET(mq,val) ((void)(0))
# define MQ_IO_CLOSE(mq) ((int)(0))
# define MQ_IO_NIL_P(mq) ((int)(1))
# define MQ_IO_NILP(mq) MQ_IO_NIL_P(mq)
53 # define MQ_IO_MARK(mq) rb_gc_mark((mq)->io)
54 # define MQ_IO_SET(mq,val) do { (mq)->io = (val); } while (0)
55 # define MQ_IO_NIL_P(mq) NIL_P((mq)->io)
56 static int MQ_IO_CLOSE(struct posix_mq
*mq
)
61 /* not safe during GC */
69 static VALUE cPOSIX_MQ
, cAttr
;
70 static ID id_new
, id_kill
, id_fileno
, id_mul
, id_divmod
;
71 static ID sym_r
, sym_w
, sym_rw
;
72 static const mqd_t MQD_INVALID
= (mqd_t
)-1;
74 /* Ruby 1.8.6+ macros (for compatibility with Ruby 1.9) */
76 # define RSTRING_PTR(s) (RSTRING(s)->ptr)
79 # define RSTRING_LEN(s) (RSTRING(s)->len)
82 # define RSTRUCT_PTR(s) (RSTRUCT(s)->ptr)
85 # define RSTRUCT_LEN(s) (RSTRUCT(s)->len)
88 #ifndef HAVE_RB_STR_SET_LEN
90 # define rb_str_set_len(str,len) rb_str_resize(str,len)
91 # else /* 1.8.6 optimized version */
92 /* this is taken from Ruby 1.8.7, 1.8.6 may not have it */
93 static void rb_18_str_set_len(VALUE str
, long len
)
95 RSTRING(str
)->len
= len
;
96 RSTRING(str
)->ptr
[len
] = '\0';
98 # define rb_str_set_len(str,len) rb_18_str_set_len(str,len)
99 # endif /* ! RUBINIUS */
100 #endif /* !defined(HAVE_RB_STR_SET_LEN) */
102 #ifndef HAVE_RB_STRUCT_ALLOC_NOINIT
103 static VALUE
rb_struct_alloc_noinit(VALUE
class)
105 return rb_funcall(class, id_new
, 0, 0);
107 #endif /* !defined(HAVE_RB_STRUCT_ALLOC_NOINIT) */
109 /* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
110 #ifndef HAVE_RB_THREAD_BLOCKING_REGION
111 # include <rubysig.h>
112 # define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
113 typedef void rb_unblock_function_t(void *);
114 typedef VALUE
rb_blocking_function_t(void *);
116 rb_thread_blocking_region(
117 rb_blocking_function_t
*func
, void *data1
,
118 rb_unblock_function_t
*ubf
, void *data2
)
122 assert(RUBY_UBF_IO
== ubf
&& "RUBY_UBF_IO required for emulation");
130 #endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
132 /* used to pass arguments to mq_open inside blocking region */
141 /* used to pass arguments to mq_send/mq_receive inside blocking region */
147 struct timespec
*timeout
;
150 static void num2timespec(struct timespec
*ts
, VALUE t
)
155 ts
->tv_sec
= NUM2TIMET(t
);
160 double val
= RFLOAT_VALUE(t
);
164 ts
->tv_nsec
= (long)(d
* 1e9
+ 0.5);
166 ts
->tv_nsec
= (long)(-d
* 1e9
+ 0.5);
167 if (ts
->tv_nsec
> 0) {
168 ts
->tv_nsec
= 1000000000 - ts
->tv_nsec
;
172 ts
->tv_sec
= (time_t)f
;
174 rb_raise(rb_eRangeError
, "%f out of range", val
);
175 ts
->tv_sec
= (time_t)f
;
180 VALUE ary
= rb_funcall(t
, id_divmod
, 1, INT2FIX(1));
182 Check_Type(ary
, T_ARRAY
);
184 ts
->tv_sec
= NUM2TIMET(rb_ary_entry(ary
, 0));
185 f
= rb_ary_entry(ary
, 1);
186 f
= rb_funcall(f
, id_mul
, 1, INT2FIX(1000000000));
187 ts
->tv_nsec
= NUM2LONG(f
);
192 static struct timespec
*convert_timeout(struct timespec
*dest
, VALUE t
)
194 struct timespec ts
, now
;
199 num2timespec(&ts
, t
);
200 clock_gettime(CLOCK_REALTIME
, &now
);
201 dest
->tv_sec
= now
.tv_sec
+ ts
.tv_sec
;
202 dest
->tv_nsec
= now
.tv_nsec
+ ts
.tv_nsec
;
204 if (dest
->tv_nsec
> 1000000000) {
205 dest
->tv_nsec
-= 1000000000;
212 /* (may) run without GVL */
213 static VALUE
xopen(void *ptr
)
215 struct open_args
*x
= ptr
;
219 case 2: rv
= mq_open(x
->name
, x
->oflags
); break;
220 case 3: rv
= mq_open(x
->name
, x
->oflags
, x
->mode
, NULL
); break;
221 case 4: rv
= mq_open(x
->name
, x
->oflags
, x
->mode
, &x
->attr
); break;
222 default: rv
= MQD_INVALID
;
228 /* runs without GVL */
229 static VALUE
xsend(void *ptr
)
231 struct rw_args
*x
= ptr
;
234 return (VALUE
)mq_timedsend(x
->des
, x
->msg_ptr
, x
->msg_len
,
235 x
->msg_prio
, x
->timeout
);
237 return (VALUE
)mq_send(x
->des
, x
->msg_ptr
, x
->msg_len
, x
->msg_prio
);
240 /* runs without GVL */
241 static VALUE
xrecv(void *ptr
)
243 struct rw_args
*x
= ptr
;
246 return (VALUE
)mq_timedreceive(x
->des
, x
->msg_ptr
, x
->msg_len
,
247 &x
->msg_prio
, x
->timeout
);
249 return (VALUE
)mq_receive(x
->des
, x
->msg_ptr
, x
->msg_len
, &x
->msg_prio
);
253 static void mark(void *ptr
)
255 struct posix_mq
*mq
= ptr
;
257 rb_gc_mark(mq
->name
);
258 rb_gc_mark(mq
->thread
);
263 static void _free(void *ptr
)
265 struct posix_mq
*mq
= ptr
;
267 if (mq
->des
!= MQD_INVALID
&& MQ_IO_NIL_P(mq
)) {
268 /* we ignore errors when gc-ing */
275 /* automatically called at creation (before initialize) */
276 static VALUE
alloc(VALUE klass
)
279 VALUE rv
= Data_Make_Struct(klass
, struct posix_mq
, mark
, _free
, mq
);
281 mq
->des
= MQD_INVALID
;
282 mq
->attr
.mq_flags
= 0;
283 mq
->attr
.mq_maxmsg
= 0;
284 mq
->attr
.mq_msgsize
= -1;
285 mq
->attr
.mq_curmsgs
= 0;
293 /* unwraps the posix_mq struct from self */
294 static struct posix_mq
*get(VALUE self
, int need_valid
)
298 Data_Get_Struct(self
, struct posix_mq
, mq
);
300 if (need_valid
&& mq
->des
== MQD_INVALID
)
301 rb_raise(rb_eIOError
, "closed queue descriptor");
306 /* converts the POSIX_MQ::Attr astruct into a struct mq_attr attr */
307 static void attr_from_struct(struct mq_attr
*attr
, VALUE astruct
, int all
)
311 if (CLASS_OF(astruct
) != cAttr
) {
312 RB_GC_GUARD(astruct
) = rb_inspect(astruct
);
313 rb_raise(rb_eArgError
, "not a POSIX_MQ::Attr: %s",
314 RSTRING_PTR(astruct
));
317 ptr
= RSTRUCT_PTR(astruct
);
319 attr
->mq_flags
= NUM2LONG(ptr
[0]);
321 if (all
|| !NIL_P(ptr
[1]))
322 attr
->mq_maxmsg
= NUM2LONG(ptr
[1]);
323 if (all
|| !NIL_P(ptr
[2]))
324 attr
->mq_msgsize
= NUM2LONG(ptr
[2]);
326 attr
->mq_curmsgs
= NUM2LONG(ptr
[3]);
331 * POSIX_MQ.new(name [, flags [, mode [, mq_attr]]]) => mq
333 * Opens a POSIX message queue given by +name+. +name+ should start
334 * with a slash ("/") for portable applications.
336 * If a Symbol is given in place of integer +flags+, then:
338 * * +:r+ is equivalent to IO::RDONLY
339 * * +:w+ is equivalent to IO::CREAT|IO::WRONLY
340 * * +:rw+ is equivalent to IO::CREAT|IO::RDWR
342 * +mode+ is an integer and only used when IO::CREAT is used.
343 * +mq_attr+ is a POSIX_MQ::Attr and only used if IO::CREAT is used.
344 * If +mq_attr+ is not specified when creating a queue, then the
345 * system defaults will be used.
347 * See the manpage for mq_open(3) for more details on this function.
349 static VALUE
init(int argc
, VALUE
*argv
, VALUE self
)
351 struct posix_mq
*mq
= get(self
, 0);
353 VALUE name
, oflags
, mode
, attr
;
355 rb_scan_args(argc
, argv
, "13", &name
, &oflags
, &mode
, &attr
);
357 if (TYPE(name
) != T_STRING
)
358 rb_raise(rb_eArgError
, "name must be a string");
360 switch (TYPE(oflags
)) {
367 else if (oflags
== sym_w
)
368 x
.oflags
= O_CREAT
|O_WRONLY
;
369 else if (oflags
== sym_rw
)
370 x
.oflags
= O_CREAT
|O_RDWR
;
372 RB_GC_GUARD(oflags
) = oflags
;
373 rb_raise(rb_eArgError
,
374 "symbol must be :r, :w, or :rw: %s",
375 RSTRING_PTR(oflags
));
380 x
.oflags
= NUM2INT(oflags
);
383 rb_raise(rb_eArgError
, "flags must be an int, :r, :w, or :wr");
386 x
.name
= RSTRING_PTR(name
);
389 switch (TYPE(mode
)) {
392 x
.mode
= NUM2UINT(mode
);
395 if (x
.oflags
& O_CREAT
) {
401 rb_raise(rb_eArgError
, "mode not an integer");
404 switch (TYPE(attr
)) {
407 attr_from_struct(&x
.attr
, attr
, 1);
409 /* principle of least surprise */
410 if (x
.attr
.mq_flags
& O_NONBLOCK
)
411 x
.oflags
|= O_NONBLOCK
;
416 RB_GC_GUARD(attr
) = rb_inspect(attr
);
417 rb_raise(rb_eArgError
, "attr must be a POSIX_MQ::Attr: %s",
421 mq
->des
= (mqd_t
)xopen(&x
);
422 if (mq
->des
== MQD_INVALID
) {
423 if (errno
== ENOMEM
|| errno
== EMFILE
|| errno
== ENFILE
) {
425 mq
->des
= (mqd_t
)xopen(&x
);
427 if (mq
->des
== MQD_INVALID
)
428 rb_sys_fail("mq_open");
431 mq
->name
= rb_str_dup(name
);
432 if (x
.oflags
& O_NONBLOCK
)
433 mq
->attr
.mq_flags
= O_NONBLOCK
;
440 * POSIX_MQ.unlink(name) => 1
442 * Unlinks the message queue given by +name+. The queue will be destroyed
443 * when the last process with the queue open closes its queue descriptors.
445 static VALUE
s_unlink(VALUE self
, VALUE name
)
449 if (TYPE(name
) != T_STRING
)
450 rb_raise(rb_eArgError
, "argument must be a string");
452 rv
= mq_unlink(RSTRING_PTR(name
));
453 if (rv
== MQD_INVALID
)
454 rb_sys_fail("mq_unlink");
463 * Unlinks the message queue to prevent other processes from accessing it.
464 * All existing queue descriptors to this queue including those opened by
465 * other processes are unaffected. The queue will only be destroyed
466 * when the last process with open descriptors to this queue closes
469 static VALUE
_unlink(VALUE self
)
471 struct posix_mq
*mq
= get(self
, 0);
474 assert(TYPE(mq
->name
) == T_STRING
&& "mq->name is not a string");
476 rv
= mq_unlink(RSTRING_PTR(mq
->name
));
477 if (rv
== MQD_INVALID
)
478 rb_sys_fail("mq_unlink");
483 static void setup_send_buffer(struct rw_args
*x
, VALUE buffer
)
485 buffer
= rb_obj_as_string(buffer
);
486 x
->msg_ptr
= RSTRING_PTR(buffer
);
487 x
->msg_len
= (size_t)RSTRING_LEN(buffer
);
492 * mq.send(string [,priority[, timeout]]) => nil
494 * Inserts the given +string+ into the message queue with an optional,
495 * unsigned integer +priority+. If the optional +timeout+ is specified,
496 * then Errno::ETIMEDOUT will be raised if the operation cannot complete
497 * before +timeout+ seconds have elapsed. Without +timeout+, this method
498 * may block until the queue is writable.
500 static VALUE
_send(int argc
, VALUE
*argv
, VALUE self
)
502 struct posix_mq
*mq
= get(self
, 1);
504 VALUE buffer
, prio
, timeout
;
506 struct timespec expire
;
508 rb_scan_args(argc
, argv
, "12", &buffer
, &prio
, &timeout
);
510 setup_send_buffer(&x
, buffer
);
512 x
.timeout
= convert_timeout(&expire
, timeout
);
513 x
.msg_prio
= NIL_P(prio
) ? 0 : NUM2UINT(prio
);
515 if (mq
->attr
.mq_flags
& O_NONBLOCK
)
516 rv
= (mqd_t
)xsend(&x
);
518 rv
= (mqd_t
)rb_thread_blocking_region(xsend
, &x
,
520 if (rv
== MQD_INVALID
)
521 rb_sys_fail("mq_send");
530 * Inserts the given +string+ into the message queue with a
531 * default priority of 0 and no timeout.
533 static VALUE
send0(VALUE self
, VALUE buffer
)
535 struct posix_mq
*mq
= get(self
, 1);
539 setup_send_buffer(&x
, buffer
);
544 if (mq
->attr
.mq_flags
& O_NONBLOCK
)
545 rv
= (mqd_t
)xsend(&x
);
547 rv
= (mqd_t
)rb_thread_blocking_region(xsend
, &x
,
550 if (rv
== MQD_INVALID
)
551 rb_sys_fail("mq_send");
561 * Returns an IO.select-able +IO+ object. This method is only available
562 * under Linux and FreeBSD and is not intended to be portable.
564 static VALUE
to_io(VALUE self
)
566 struct posix_mq
*mq
= get(self
, 1);
567 int fd
= MQD_TO_FD(mq
->des
);
570 mq
->io
= rb_funcall(rb_cIO
, id_new
, 1, INT2NUM(fd
));
576 static VALUE
_receive(int wantarray
, int argc
, VALUE
*argv
, VALUE self
);
580 * mq.receive([buffer, [timeout]]) => [ message, priority ]
582 * Takes the highest priority message off the queue and returns
583 * an array containing the message as a String and the Integer
584 * priority of the message.
586 * If the optional +buffer+ is present, then it must be a String
587 * which will receive the data.
589 * If the optional +timeout+ is present, then it may be a Float
590 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
591 * will be raised if +timeout+ has elapsed and there are no messages
594 static VALUE
receive(int argc
, VALUE
*argv
, VALUE self
)
596 return _receive(1, argc
, argv
, self
);
601 * mq.shift([buffer, [timeout]]) => message
603 * Takes the highest priority message off the queue and returns
604 * the message as a String.
606 * If the optional +buffer+ is present, then it must be a String
607 * which will receive the data.
609 * If the optional +timeout+ is present, then it may be a Float
610 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
611 * will be raised if +timeout+ has elapsed and there are no messages
614 static VALUE
shift(int argc
, VALUE
*argv
, VALUE self
)
616 return _receive(0, argc
, argv
, self
);
619 static VALUE
_receive(int wantarray
, int argc
, VALUE
*argv
, VALUE self
)
621 struct posix_mq
*mq
= get(self
, 1);
623 VALUE buffer
, timeout
;
625 struct timespec expire
;
627 if (mq
->attr
.mq_msgsize
< 0) {
628 if (mq_getattr(mq
->des
, &mq
->attr
) < 0)
629 rb_sys_fail("mq_getattr");
632 rb_scan_args(argc
, argv
, "02", &buffer
, &timeout
);
633 x
.timeout
= convert_timeout(&expire
, timeout
);
636 buffer
= rb_str_new(0, mq
->attr
.mq_msgsize
);
639 rb_str_modify(buffer
);
640 rb_str_resize(buffer
, mq
->attr
.mq_msgsize
);
643 x
.msg_ptr
= RSTRING_PTR(buffer
);
644 x
.msg_len
= (size_t)mq
->attr
.mq_msgsize
;
647 if (mq
->attr
.mq_flags
& O_NONBLOCK
) {
648 r
= (ssize_t
)xrecv(&x
);
650 r
= (ssize_t
)rb_thread_blocking_region(xrecv
, &x
,
654 rb_sys_fail("mq_receive");
656 rb_str_set_len(buffer
, r
);
659 return rb_ary_new3(2, buffer
, UINT2NUM(x
.msg_prio
));
667 * Returns a POSIX_MQ::Attr struct containing the attributes
668 * of the message queue. See the mq_getattr(3) manpage for
671 static VALUE
getattr(VALUE self
)
673 struct posix_mq
*mq
= get(self
, 1);
677 if (mq_getattr(mq
->des
, &mq
->attr
) < 0)
678 rb_sys_fail("mq_getattr");
680 astruct
= rb_struct_alloc_noinit(cAttr
);
681 ptr
= RSTRUCT_PTR(astruct
);
682 ptr
[0] = LONG2NUM(mq
->attr
.mq_flags
);
683 ptr
[1] = LONG2NUM(mq
->attr
.mq_maxmsg
);
684 ptr
[2] = LONG2NUM(mq
->attr
.mq_msgsize
);
685 ptr
[3] = LONG2NUM(mq
->attr
.mq_curmsgs
);
692 * mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr
694 * Only the IO::NONBLOCK flag may be set or unset (zero) in this manner.
695 * See the mq_setattr(3) manpage for more details.
697 * Consider using the POSIX_MQ#nonblock= method as it is easier and
698 * more natural to use.
700 static VALUE
setattr(VALUE self
, VALUE astruct
)
702 struct posix_mq
*mq
= get(self
, 1);
703 struct mq_attr newattr
;
705 attr_from_struct(&newattr
, astruct
, 0);
707 if (mq_setattr(mq
->des
, &newattr
, NULL
) < 0)
708 rb_sys_fail("mq_setattr");
717 * Closes the underlying message queue descriptor.
718 * If this descriptor had a registered notification request, the request
719 * will be removed so another descriptor or process may register a
720 * notification request. Message queue descriptors are automatically
721 * closed by garbage collection.
723 static VALUE
_close(VALUE self
)
725 struct posix_mq
*mq
= get(self
, 1);
727 if (! MQ_IO_CLOSE(mq
)) {
728 if (mq_close(mq
->des
) == -1)
729 rb_sys_fail("mq_close");
731 mq
->des
= MQD_INVALID
;
738 * mq.closed? => true or false
740 * Returns +true+ if the message queue descriptor is closed and therefore
741 * unusable, otherwise +false+
743 static VALUE
closed(VALUE self
)
745 struct posix_mq
*mq
= get(self
, 0);
747 return mq
->des
== MQD_INVALID
? Qtrue
: Qfalse
;
754 * Returns the string name of the message queue associated with +mq+
756 static VALUE
name(VALUE self
)
758 struct posix_mq
*mq
= get(self
, 0);
760 return rb_str_dup(mq
->name
);
763 static int lookup_sig(VALUE sig
)
769 sig
= rb_obj_as_string(sig
);
770 len
= RSTRING_LEN(sig
);
771 ptr
= RSTRING_PTR(sig
);
773 if (len
> 3 && !memcmp("SIG", ptr
, 3))
774 sig
= rb_str_new(ptr
+ 3, len
- 3);
777 VALUE mSignal
= rb_const_get(rb_cObject
, rb_intern("Signal"));
779 list
= rb_funcall(mSignal
, rb_intern("list"), 0, 0);
780 rb_global_variable(&list
);
783 sig
= rb_hash_aref(list
, sig
);
785 rb_raise(rb_eArgError
, "invalid signal: %s\n", ptr
);
791 * TODO: Under Linux, we could just use netlink directly
792 * the same way glibc does...
794 /* we spawn a thread just to write ONE byte into an fd (usually a pipe) */
795 static void thread_notify_fd(union sigval sv
)
797 int fd
= sv
.sival_int
;
799 while ((write(fd
, "", 1) < 0) && (errno
== EINTR
|| errno
== EAGAIN
));
802 static void my_mq_notify(mqd_t des
, struct sigevent
*not)
804 mqd_t rv
= mq_notify(des
, not);
806 if (rv
== MQD_INVALID
) {
807 if (errno
== ENOMEM
) {
809 rv
= mq_notify(des
, not);
811 if (rv
== MQD_INVALID
)
812 rb_sys_fail("mq_notify");
817 static VALUE
setnotify_exec(VALUE self
, VALUE io
, VALUE thr
)
819 int fd
= NUM2INT(rb_funcall(io
, id_fileno
, 0, 0));
820 struct posix_mq
*mq
= get(self
, 1);
824 errno
= pthread_attr_init(&attr
);
825 if (errno
) rb_sys_fail("pthread_attr_init");
827 errno
= pthread_attr_setdetachstate(&attr
, PTHREAD_CREATE_DETACHED
);
828 if (errno
) rb_sys_fail("pthread_attr_setdetachstate");
830 #ifdef PTHREAD_STACK_MIN
831 (void)pthread_attr_setstacksize(&attr
, PTHREAD_STACK_MIN
);
834 not.sigev_notify
= SIGEV_THREAD
;
835 not.sigev_notify_function
= thread_notify_fd
;
836 not.sigev_notify_attributes
= &attr
;
837 not.sigev_value
.sival_int
= fd
;
839 if (!NIL_P(mq
->thread
))
840 rb_funcall(mq
->thread
, id_kill
, 0, 0);
843 my_mq_notify(mq
->des
, ¬);
849 static VALUE
notify_cleanup(VALUE self
)
851 struct posix_mq
*mq
= get(self
, 1);
853 if (!NIL_P(mq
->thread
)) {
854 rb_funcall(mq
->thread
, id_kill
, 0, 0);
862 * mq.notify = signal => signal
864 * Registers the notification request to deliver a given +signal+
865 * to the current process when a message is received.
866 * If +signal+ is +nil+, it will unregister and disable the notification
867 * request to allow other processes to register a request.
868 * If +signal+ is +false+, it will register a no-op notification request
869 * which will prevent other processes from registering a notification.
870 * If +signal+ is an +IO+ object, it will spawn a thread upon the
871 * arrival of the next message and write one "\\0" byte to the file
872 * descriptor belonging to that IO object.
873 * Only one process may have a notification request for a queue
874 * at a time, Errno::EBUSY will be raised if there is already
875 * a notification request registration for the queue.
877 * Notifications are only fired once and processes must reregister
878 * for subsequent notifications.
880 * For readers of the mq_notify(3) manpage, passing +false+
881 * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent
882 * to passing a NULL notification pointer to mq_notify(3).
884 static VALUE
setnotify(VALUE self
, VALUE arg
)
886 struct posix_mq
*mq
= get(self
, 1);
888 struct sigevent
* notification
= ¬
891 notify_cleanup(self
);
892 not.sigev_notify
= SIGEV_SIGNAL
;
896 not.sigev_notify
= SIGEV_NONE
;
902 not.sigev_signo
= NUM2INT(arg
);
906 not.sigev_signo
= lookup_sig(arg
);
907 rv
= INT2NUM(not.sigev_signo
);
910 rb_raise(rb_eArgError
, "must be a signal or nil");
913 my_mq_notify(mq
->des
, notification
);
920 * mq.nonblock? => true or false
922 * Returns the current non-blocking state of the message queue descriptor.
924 static VALUE
getnonblock(VALUE self
)
926 struct posix_mq
*mq
= get(self
, 1);
928 return mq
->attr
.mq_flags
& O_NONBLOCK
? Qtrue
: Qfalse
;
933 * mq.nonblock = boolean => boolean
935 * Enables or disables non-blocking operation for the message queue
936 * descriptor. Errno::EAGAIN will be raised in situations where
937 * the queue would block. This is not compatible with +timeout+
938 * arguments to POSIX_MQ#send and POSIX_MQ#receive.
940 static VALUE
setnonblock(VALUE self
, VALUE nb
)
942 struct mq_attr newattr
;
943 struct posix_mq
*mq
= get(self
, 1);
946 newattr
.mq_flags
= O_NONBLOCK
;
947 else if (nb
== Qfalse
)
948 newattr
.mq_flags
= 0;
950 rb_raise(rb_eArgError
, "must be true or false");
952 if (mq_setattr(mq
->des
, &newattr
, &mq
->attr
) < 0)
953 rb_sys_fail("mq_setattr");
955 mq
->attr
.mq_flags
= newattr
.mq_flags
;
960 void Init_posix_mq_ext(void)
962 cPOSIX_MQ
= rb_define_class("POSIX_MQ", rb_cObject
);
963 rb_define_alloc_func(cPOSIX_MQ
, alloc
);
964 cAttr
= rb_const_get(cPOSIX_MQ
, rb_intern("Attr"));
967 * The maximum number of open message descriptors supported
968 * by the system. This may be -1, in which case it is dynamically
969 * set at runtime. Consult your operating system documentation
970 * for system-specific information about this.
972 rb_define_const(cPOSIX_MQ
, "OPEN_MAX",
973 LONG2NUM(sysconf(_SC_MQ_OPEN_MAX
)));
976 * The maximum priority that may be specified for POSIX_MQ#send.
977 * On POSIX-compliant systems, this is at least 31, but some
978 * systems allow higher limits.
979 * The minimum priority is always zero.
981 rb_define_const(cPOSIX_MQ
, "PRIO_MAX",
982 LONG2NUM(sysconf(_SC_MQ_PRIO_MAX
)));
984 rb_define_singleton_method(cPOSIX_MQ
, "unlink", s_unlink
, 1);
986 rb_define_method(cPOSIX_MQ
, "initialize", init
, -1);
987 rb_define_method(cPOSIX_MQ
, "send", _send
, -1);
988 rb_define_method(cPOSIX_MQ
, "<<", send0
, 1);
989 rb_define_method(cPOSIX_MQ
, "receive", receive
, -1);
990 rb_define_method(cPOSIX_MQ
, "shift", shift
, -1);
991 rb_define_method(cPOSIX_MQ
, "attr", getattr
, 0);
992 rb_define_method(cPOSIX_MQ
, "attr=", setattr
, 1);
993 rb_define_method(cPOSIX_MQ
, "close", _close
, 0);
994 rb_define_method(cPOSIX_MQ
, "closed?", closed
, 0);
995 rb_define_method(cPOSIX_MQ
, "unlink", _unlink
, 0);
996 rb_define_method(cPOSIX_MQ
, "name", name
, 0);
997 rb_define_method(cPOSIX_MQ
, "notify=", setnotify
, 1);
998 rb_define_method(cPOSIX_MQ
, "nonblock=", setnonblock
, 1);
999 rb_define_method(cPOSIX_MQ
, "notify_exec", setnotify_exec
, 2);
1000 rb_define_method(cPOSIX_MQ
, "notify_cleanup", notify_cleanup
, 0);
1001 rb_define_method(cPOSIX_MQ
, "nonblock?", getnonblock
, 0);
1003 rb_define_method(cPOSIX_MQ
, "to_io", to_io
, 0);
1006 id_new
= rb_intern("new");
1007 id_kill
= rb_intern("kill");
1008 id_fileno
= rb_intern("fileno");
1009 id_mul
= rb_intern("*");
1010 id_divmod
= rb_intern("divmod");
1011 sym_r
= ID2SYM(rb_intern("r"));
1012 sym_w
= ID2SYM(rb_intern("w"));
1013 sym_rw
= ID2SYM(rb_intern("rw"));