1 #define _XOPEN_SOURCE 600
2 #ifdef HAVE_SYS_SELECT_H
3 # include <sys/select.h>
14 # define NUM2TIMET NUM2INT
27 #if defined(__linux__)
28 # define MQD_TO_FD(mqd) (int)(mqd)
29 # define FD_TO_MQD(fd) (mqd_t)(fd)
30 #elif defined(HAVE___MQ_OSHANDLE) /* FreeBSD */
31 # define MQD_TO_FD(mqd) __mq_oshandle(mqd)
33 # define MQ_IO_MARK(mq) ((void)(0))
34 # define MQ_IO_SET(mq,val) ((void)(0))
35 # define MQ_IO_CLOSE(mq) ((int)(0))
36 # define MQ_IO_NIL_P(mq) ((int)(1))
37 # define MQ_IO_SET_AUTOCLOSE(mq, boolean) for(;0;)
52 static ID id_setautoclose
;
53 # define MQ_IO_MARK(mq) rb_gc_mark((mq)->io)
54 # define MQ_IO_SET(mq,val) do { (mq)->io = (val); } while (0)
55 # define MQ_IO_NIL_P(mq) NIL_P((mq)->io)
57 static void MQ_IO_SET_AUTOCLOSE(struct posix_mq
*mq
, VALUE boolean
)
60 rb_funcall(mq
->io
, id_setautoclose
, 1, boolean
);
63 static int MQ_IO_CLOSE(struct posix_mq
*mq
)
68 /* not safe during GC */
76 # define PMQ_WANTARRAY (1<<0)
77 # define PMQ_TRY (1<<1)
80 static ID id_new
, id_kill
, id_fileno
, id_divmod
;
81 static ID id_flags
, id_maxmsg
, id_msgsize
, id_curmsgs
;
82 static VALUE sym_r
, sym_w
, sym_rw
;
83 static const mqd_t MQD_INVALID
= (mqd_t
)-1;
85 /* Ruby 1.8.6+ macros (for compatibility with Ruby 1.9) */
87 # define RSTRING_PTR(s) (RSTRING(s)->ptr)
90 # define RSTRING_LEN(s) (RSTRING(s)->len)
93 # define RFLOAT_VALUE(f) (RFLOAT(f)->value)
96 #ifndef HAVE_RB_STR_SET_LEN
97 /* this is taken from Ruby 1.8.7, 1.8.6 may not have it */
99 # error upgrade Rubinius, rb_str_set_len should be available
101 static void rb_18_str_set_len(VALUE str
, long len
)
103 RSTRING(str
)->len
= len
;
104 RSTRING(str
)->ptr
[len
] = '\0';
106 #define rb_str_set_len rb_18_str_set_len
107 #endif /* !defined(HAVE_RB_STR_SET_LEN) */
109 #if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && defined(HAVE_RUBY_THREAD_H)
111 # include <ruby/thread.h>
112 # define WITHOUT_GVL(fn,a,ubf,b) \
113 rb_thread_call_without_gvl((fn),(a),(ubf),(b))
114 #elif defined(HAVE_RB_THREAD_BLOCKING_REGION)
115 typedef VALUE (*my_blocking_fn_t
)(void*);
116 # define WITHOUT_GVL(fn,a,ubf,b) \
117 rb_thread_blocking_region((my_blocking_fn_t)(fn),(a),(ubf),(b))
120 /* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
121 # include <rubysig.h>
122 # define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
123 typedef void rb_unblock_function_t(void *);
124 typedef void * rb_blocking_function_t(void *);
125 static void * WITHOUT_GVL(rb_blocking_function_t
*func
, void *data1
,
126 rb_unblock_function_t
*ubf
, void *data2
)
130 assert(RUBY_UBF_IO
== ubf
&& "RUBY_UBF_IO required for emulation");
138 #endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
140 /* used to pass arguments to mq_open inside blocking region */
150 /* used to pass arguments to mq_send/mq_receive inside blocking region */
160 struct timespec
*timeout
;
163 #ifndef HAVE_MQ_TIMEDSEND
165 not_timedsend(mqd_t mqdes
, const char *msg_ptr
,
166 size_t msg_len
, unsigned msg_prio
,
167 const struct timespec
*abs_timeout
)
169 rb_bug("mq_timedsend workaround failed");
172 # define mq_timedsend not_timedsend
174 #ifndef HAVE_MQ_TIMEDRECEIVE
176 not_timedreceive(mqd_t mqdes
, char *msg_ptr
,
177 size_t msg_len
, unsigned *msg_prio
,
178 const struct timespec
*abs_timeout
)
180 rb_bug("mq_timedreceive workaround failed");
183 # define mq_timedreceive not_timedreceive
186 #if defined(HAVE_MQ_TIMEDRECEIVE) && defined(HAVE_MQ_TIMEDSEND)
187 static void num2timespec(struct timespec
*ts
, VALUE t
)
192 ts
->tv_sec
= NUM2TIMET(t
);
197 double val
= RFLOAT_VALUE(t
);
201 ts
->tv_nsec
= (long)(d
* 1e9
+ 0.5);
203 ts
->tv_nsec
= (long)(-d
* 1e9
+ 0.5);
204 if (ts
->tv_nsec
> 0) {
205 ts
->tv_nsec
= 1000000000 - ts
->tv_nsec
;
209 ts
->tv_sec
= (time_t)f
;
211 rb_raise(rb_eRangeError
, "%f out of range", val
);
216 VALUE ary
= rb_funcall(t
, id_divmod
, 1, INT2FIX(1));
218 Check_Type(ary
, T_ARRAY
);
220 ts
->tv_sec
= NUM2TIMET(rb_ary_entry(ary
, 0));
221 f
= rb_ary_entry(ary
, 1);
222 f
= rb_funcall(f
, '*', 1, INT2FIX(1000000000));
223 ts
->tv_nsec
= NUM2LONG(f
);
228 static void num2timespec(struct timespec
*ts
, VALUE t
)
230 rb_raise(rb_eNotImpError
,
231 "mq_timedsend and/or mq_timedreceive missing");
235 static struct timespec
*convert_timeout(struct timespec
*dest
, VALUE t
)
237 struct timespec ts
, now
;
242 num2timespec(&ts
, t
);
243 clock_gettime(CLOCK_REALTIME
, &now
);
244 dest
->tv_sec
= now
.tv_sec
+ ts
.tv_sec
;
245 dest
->tv_nsec
= now
.tv_nsec
+ ts
.tv_nsec
;
247 if (dest
->tv_nsec
> 1000000000) {
248 dest
->tv_nsec
-= 1000000000;
255 /* (may) run without GVL */
256 static void * xopen(void *ptr
)
258 struct open_args
*x
= ptr
;
261 case 2: x
->des
= mq_open(x
->name
, x
->oflags
); break;
262 case 3: x
->des
= mq_open(x
->name
, x
->oflags
, x
->mode
, NULL
); break;
263 case 4: x
->des
= mq_open(x
->name
, x
->oflags
, x
->mode
, &x
->attr
); break;
264 default: x
->des
= MQD_INVALID
;
270 /* runs without GVL */
271 static void *xsend(void *ptr
)
273 struct rw_args
*x
= ptr
;
275 x
->retval
= x
->timeout
?
276 mq_timedsend(x
->des
, x
->msg_ptr
, x
->msg_len
,
277 x
->msg_prio
, x
->timeout
) :
278 mq_send(x
->des
, x
->msg_ptr
, x
->msg_len
, x
->msg_prio
);
283 /* runs without GVL */
284 static void * xrecv(void *ptr
)
286 struct rw_args
*x
= ptr
;
288 x
->received
= x
->timeout
?
289 mq_timedreceive(x
->des
, x
->msg_ptr
, x
->msg_len
,
290 &x
->msg_prio
, x
->timeout
) :
291 mq_receive(x
->des
, x
->msg_ptr
, x
->msg_len
, &x
->msg_prio
);
297 static void mark(void *ptr
)
299 struct posix_mq
*mq
= ptr
;
301 rb_gc_mark(mq
->name
);
302 rb_gc_mark(mq
->thread
);
307 static void _free(void *ptr
)
309 struct posix_mq
*mq
= ptr
;
311 if (mq
->des
!= MQD_INVALID
&& MQ_IO_NIL_P(mq
)) {
312 /* we ignore errors when gc-ing */
320 static size_t memsize(const void *ptr
)
322 return sizeof(struct posix_mq
);
325 static const rb_data_type_t mqtype
= {
327 { mark
, _free
, memsize
, /* reserved */ },
328 /* parent, data, [ flags ] */
331 /* automatically called at creation (before initialize) */
332 static VALUE
alloc(VALUE klass
)
335 VALUE rv
= TypedData_Make_Struct(klass
, struct posix_mq
, &mqtype
, mq
);
337 mq
->des
= MQD_INVALID
;
339 mq
->attr
.mq_flags
= 0;
340 mq
->attr
.mq_maxmsg
= 0;
341 mq
->attr
.mq_msgsize
= -1;
342 mq
->attr
.mq_curmsgs
= 0;
350 /* unwraps the posix_mq struct from self */
351 static struct posix_mq
*get(VALUE self
, int need_valid
)
355 TypedData_Get_Struct(self
, struct posix_mq
, &mqtype
, mq
);
357 if (need_valid
&& mq
->des
== MQD_INVALID
)
358 rb_raise(rb_eIOError
, "closed queue descriptor");
363 static void check_struct_type(VALUE astruct
)
365 if (CLASS_OF(astruct
) == cAttr
)
367 astruct
= rb_inspect(astruct
);
368 rb_raise(rb_eTypeError
, "not a POSIX_MQ::Attr: %s",
369 StringValuePtr(astruct
));
372 static void rstruct2mqattr(struct mq_attr
*attr
, VALUE astruct
, int all
)
376 check_struct_type(astruct
);
377 attr
->mq_flags
= NUM2LONG(rb_funcall(astruct
, id_flags
, 0));
379 tmp
= rb_funcall(astruct
, id_maxmsg
, 0);
380 if (all
|| !NIL_P(tmp
))
381 attr
->mq_maxmsg
= NUM2LONG(tmp
);
383 tmp
= rb_funcall(astruct
, id_msgsize
, 0);
384 if (all
|| !NIL_P(tmp
))
385 attr
->mq_msgsize
= NUM2LONG(tmp
);
387 tmp
= rb_funcall(astruct
, id_curmsgs
, 0);
389 attr
->mq_curmsgs
= NUM2LONG(tmp
);
395 * POSIX_MQ.for_fd(socket) => mq
397 * Adopts a socket as a POSIX message queue. Argument will be
398 * checked to ensure it is a POSIX message queue socket.
400 * This is useful for adopting systemd sockets passed via the
401 * ListenMessageQueue directive.
402 * Returns a +POSIX_MQ+ instance. This method is only available
403 * under Linux and FreeBSD and is not intended to be portable.
406 static VALUE
for_fd(VALUE klass
, VALUE socket
)
408 VALUE mqv
= alloc(klass
);
409 struct posix_mq
*mq
= get(mqv
, 0);
413 mqd
= FD_TO_MQD(NUM2INT(socket
));
415 if (mq_getattr(mqd
, &mq
->attr
) < 0)
416 rb_sys_fail("provided file descriptor is not a POSIX MQ");
421 #endif /* FD_TO_MQD */
425 * POSIX_MQ.new(name [, flags [, mode [, mq_attr]]) => mq
427 * Opens a POSIX message queue given by +name+. +name+ should start
428 * with a slash ("/") for portable applications.
430 * If a Symbol is given in place of integer +flags+, then:
432 * * +:r+ is equivalent to IO::RDONLY
433 * * +:w+ is equivalent to IO::CREAT|IO::WRONLY
434 * * +:rw+ is equivalent to IO::CREAT|IO::RDWR
436 * +mode+ is an integer and only used when IO::CREAT is used.
437 * +mq_attr+ is a POSIX_MQ::Attr and only used if IO::CREAT is used.
438 * If +mq_attr+ is not specified when creating a queue, then the
439 * system defaults will be used.
441 * See the manpage for mq_open(3) for more details on this function.
443 static VALUE
init(int argc
, VALUE
*argv
, VALUE self
)
445 struct posix_mq
*mq
= get(self
, 0);
447 VALUE name
, oflags
, mode
, attr
;
449 rb_scan_args(argc
, argv
, "13", &name
, &oflags
, &mode
, &attr
);
451 switch (TYPE(oflags
)) {
458 else if (oflags
== sym_w
)
459 x
.oflags
= O_CREAT
|O_WRONLY
;
460 else if (oflags
== sym_rw
)
461 x
.oflags
= O_CREAT
|O_RDWR
;
463 oflags
= rb_inspect(oflags
);
464 rb_raise(rb_eArgError
,
465 "symbol must be :r, :w, or :rw: %s",
466 StringValuePtr(oflags
));
471 x
.oflags
= NUM2INT(oflags
);
474 rb_raise(rb_eArgError
, "flags must be an int, :r, :w, or :wr");
477 x
.name
= StringValueCStr(name
);
480 switch (TYPE(mode
)) {
483 x
.mode
= NUM2UINT(mode
);
486 if (x
.oflags
& O_CREAT
) {
492 rb_raise(rb_eArgError
, "mode not an integer");
495 switch (TYPE(attr
)) {
498 rstruct2mqattr(&x
.attr
, attr
, 1);
500 /* principle of least surprise */
501 if (x
.attr
.mq_flags
& O_NONBLOCK
)
502 x
.oflags
|= O_NONBLOCK
;
507 check_struct_type(attr
);
512 if (mq
->des
== MQD_INVALID
) {
522 if (mq
->des
== MQD_INVALID
)
523 rb_sys_fail("mq_open");
526 mq
->name
= rb_str_new_frozen(name
);
527 if (x
.oflags
& O_NONBLOCK
)
528 mq
->attr
.mq_flags
= O_NONBLOCK
;
535 * POSIX_MQ.unlink(name) => 1
537 * Unlinks the message queue given by +name+. The queue will be destroyed
538 * when the last process with the queue open closes its queue descriptors.
540 static VALUE
s_unlink(VALUE self
, VALUE name
)
542 int rv
= mq_unlink(StringValueCStr(name
));
545 rb_sys_fail("mq_unlink");
554 * Unlinks the message queue to prevent other processes from accessing it.
555 * All existing queue descriptors to this queue including those opened by
556 * other processes are unaffected. The queue will only be destroyed
557 * when the last process with open descriptors to this queue closes
560 static VALUE
_unlink(VALUE self
)
562 struct posix_mq
*mq
= get(self
, 0);
565 if (NIL_P(mq
->name
)) {
566 rb_raise(rb_eArgError
, "can not unlink an adopted socket");
569 assert(TYPE(mq
->name
) == T_STRING
&& "mq->name is not a string");
571 rv
= mq_unlink(RSTRING_PTR(mq
->name
));
573 rb_sys_fail("mq_unlink");
578 static void setup_send_buffer(struct rw_args
*x
, VALUE buffer
)
580 buffer
= rb_obj_as_string(buffer
);
581 x
->msg_ptr
= RSTRING_PTR(buffer
);
582 x
->msg_len
= (size_t)RSTRING_LEN(buffer
);
585 static VALUE
_send(int sflags
, int argc
, VALUE
*argv
, VALUE self
);
589 * mq.send(string [,priority[, timeout]]) => true
591 * Inserts the given +string+ into the message queue with an optional,
592 * unsigned integer +priority+. If the optional +timeout+ is specified,
593 * then Errno::ETIMEDOUT will be raised if the operation cannot complete
594 * before +timeout+ seconds has elapsed. Without +timeout+, this method
595 * may block until the queue is writable.
597 * On some older systems, the +timeout+ argument is not currently
598 * supported and may raise NotImplementedError if +timeout+ is used.
600 static VALUE
my_send(int argc
, VALUE
*argv
, VALUE self
)
602 return _send(0, argc
, argv
, self
);
605 static VALUE
_send(int sflags
, int argc
, VALUE
*argv
, VALUE self
)
607 struct posix_mq
*mq
= get(self
, 1);
609 VALUE buffer
, prio
, timeout
;
610 struct timespec expire
;
612 rb_scan_args(argc
, argv
, "12", &buffer
, &prio
, &timeout
);
614 setup_send_buffer(&x
, buffer
);
616 x
.timeout
= convert_timeout(&expire
, timeout
);
617 x
.msg_prio
= NIL_P(prio
) ? 0 : NUM2UINT(prio
);
620 WITHOUT_GVL(xsend
, &x
, RUBY_UBF_IO
, 0);
624 if (errno
== EAGAIN
&& (sflags
& PMQ_TRY
))
626 rb_sys_fail("mq_send");
636 * Inserts the given +string+ into the message queue with a
637 * default priority of 0 and no timeout.
639 * Returns itself so its calls may be chained. This use is only
640 * recommended only for users who expect blocking behavior from
643 static VALUE
send0(VALUE self
, VALUE buffer
)
645 struct posix_mq
*mq
= get(self
, 1);
648 setup_send_buffer(&x
, buffer
);
654 WITHOUT_GVL(xsend
, &x
, RUBY_UBF_IO
, 0);
658 rb_sys_fail("mq_send");
669 * Returns an IO.select-able +IO+ object. This method is only available
670 * under Linux and FreeBSD and is not intended to be portable.
672 static VALUE
to_io(VALUE self
)
674 struct posix_mq
*mq
= get(self
, 1);
675 int fd
= MQD_TO_FD(mq
->des
);
678 mq
->io
= rb_funcall(rb_cIO
, id_new
, 1, INT2NUM(fd
));
681 rb_funcall(mq
->io
, id_setautoclose
, 1, Qfalse
);
688 static VALUE
_receive(int rflags
, int argc
, VALUE
*argv
, VALUE self
);
692 * mq.receive([buffer, [timeout]]) => [ message, priority ]
694 * Takes the highest priority message off the queue and returns
695 * an array containing the message as a String and the Integer
696 * priority of the message.
698 * If the optional +buffer+ is present, then it must be a String
699 * which will receive the data.
701 * If the optional +timeout+ is present, then it may be a Float
702 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
703 * will be raised if +timeout+ has elapsed and there are no messages
706 * On some older systems, the +timeout+ argument is not currently
707 * supported and may raise NotImplementedError if +timeout+ is used.
709 static VALUE
receive(int argc
, VALUE
*argv
, VALUE self
)
711 return _receive(PMQ_WANTARRAY
, argc
, argv
, self
);
716 * mq.shift([buffer, [timeout]]) => message
718 * Takes the highest priority message off the queue and returns
719 * the message as a String.
721 * If the optional +buffer+ is present, then it must be a String
722 * which will receive the data.
724 * If the optional +timeout+ is present, then it may be a Float
725 * or Integer specifying the timeout in seconds. Errno::ETIMEDOUT
726 * will be raised if +timeout+ has elapsed and there are no messages
729 * On some older systems, the +timeout+ argument is not currently
730 * supported and may raise NotImplementedError if +timeout+ is used.
732 static VALUE
shift(int argc
, VALUE
*argv
, VALUE self
)
734 return _receive(0, argc
, argv
, self
);
737 static VALUE
_receive(int rflags
, int argc
, VALUE
*argv
, VALUE self
)
739 struct posix_mq
*mq
= get(self
, 1);
741 VALUE buffer
, timeout
;
742 struct timespec expire
;
744 if (mq
->attr
.mq_msgsize
< 0) {
745 if (mq_getattr(mq
->des
, &mq
->attr
) < 0)
746 rb_sys_fail("mq_getattr");
749 rb_scan_args(argc
, argv
, "02", &buffer
, &timeout
);
750 x
.timeout
= convert_timeout(&expire
, timeout
);
753 buffer
= rb_str_new(0, mq
->attr
.mq_msgsize
);
756 rb_str_modify(buffer
);
757 rb_str_resize(buffer
, mq
->attr
.mq_msgsize
);
760 x
.msg_ptr
= RSTRING_PTR(buffer
);
761 x
.msg_len
= (size_t)mq
->attr
.mq_msgsize
;
765 WITHOUT_GVL(xrecv
, &x
, RUBY_UBF_IO
, 0);
766 if (x
.received
< 0) {
769 if (errno
== EAGAIN
&& (rflags
& PMQ_TRY
))
771 rb_sys_fail("mq_receive");
774 rb_str_set_len(buffer
, x
.received
);
776 if (rflags
& PMQ_WANTARRAY
)
777 return rb_ary_new3(2, buffer
, UINT2NUM(x
.msg_prio
));
785 * Returns a POSIX_MQ::Attr struct containing the attributes
786 * of the message queue. See the mq_getattr(3) manpage for
789 static VALUE
getattr(VALUE self
)
791 struct posix_mq
*mq
= get(self
, 1);
793 if (mq_getattr(mq
->des
, &mq
->attr
) < 0)
794 rb_sys_fail("mq_getattr");
796 return rb_funcall(cAttr
, id_new
, 4,
797 LONG2NUM(mq
->attr
.mq_flags
),
798 LONG2NUM(mq
->attr
.mq_maxmsg
),
799 LONG2NUM(mq
->attr
.mq_msgsize
),
800 LONG2NUM(mq
->attr
.mq_curmsgs
));
805 * mq.attr = POSIX_MQ::Attr(IO::NONBLOCK) => mq_attr
807 * Only the IO::NONBLOCK flag may be set or unset (zero) in this manner.
808 * See the mq_setattr(3) manpage for more details.
810 * Consider using the POSIX_MQ#nonblock= method as it is easier and
811 * more natural to use.
813 static VALUE
setattr(VALUE self
, VALUE astruct
)
815 struct posix_mq
*mq
= get(self
, 1);
816 struct mq_attr newattr
;
818 rstruct2mqattr(&newattr
, astruct
, 0);
820 if (mq_setattr(mq
->des
, &newattr
, NULL
) < 0)
821 rb_sys_fail("mq_setattr");
830 * Closes the underlying message queue descriptor.
831 * If this descriptor had a registered notification request, the request
832 * will be removed so another descriptor or process may register a
833 * notification request. Message queue descriptors are automatically
834 * closed by garbage collection.
836 static VALUE
_close(VALUE self
)
840 if (IDEMPOTENT_IO_CLOSE
) { /* defined in extconf.rb */
842 if (!mq
|| (mq
->des
== MQD_INVALID
))
848 if (! MQ_IO_CLOSE(mq
)) {
849 if (mq_close(mq
->des
) < 0)
850 rb_sys_fail("mq_close");
852 mq
->des
= MQD_INVALID
;
859 * mq.closed? => true or false
861 * Returns +true+ if the message queue descriptor is closed and therefore
862 * unusable, otherwise +false+
864 static VALUE
closed(VALUE self
)
866 struct posix_mq
*mq
= get(self
, 0);
868 return mq
->des
== MQD_INVALID
? Qtrue
: Qfalse
;
875 * Returns the string name of message queue associated with +mq+
877 static VALUE
name(VALUE self
)
879 struct posix_mq
*mq
= get(self
, 0);
881 if (NIL_P(mq
->name
)) {
883 * We could use readlink(2) on /proc/self/fd/N, but lots of
885 * http://stackoverflow.com/questions/1188757/
887 rb_raise(rb_eArgError
, "can not get name of an adopted socket");
890 /* XXX compatibility: in retrospect, we could return a frozen string */
891 return rb_str_dup(mq
->name
);
894 static int lookup_sig(VALUE sig
)
900 sig
= rb_obj_as_string(sig
);
901 len
= RSTRING_LEN(sig
);
902 ptr
= RSTRING_PTR(sig
);
904 if (len
> 3 && !memcmp("SIG", ptr
, 3))
905 sig
= rb_str_new(ptr
+ 3, len
- 3);
908 VALUE mSignal
= rb_const_get(rb_cObject
, rb_intern("Signal"));
910 list
= rb_funcall(mSignal
, rb_intern("list"), 0);
912 rb_global_variable(&list
);
915 sig
= rb_hash_aref(list
, sig
);
917 rb_raise(rb_eArgError
, "invalid signal: %s\n", ptr
);
923 * TODO: Under Linux, we could just use netlink directly
924 * the same way glibc does...
926 /* we spawn a thread just to write ONE byte into an fd (usually a pipe) */
927 static void thread_notify_fd(union sigval sv
)
929 int fd
= sv
.sival_int
;
931 while ((write(fd
, "", 1) < 0) && (errno
== EINTR
|| errno
== EAGAIN
));
934 static void my_mq_notify(mqd_t des
, struct sigevent
*not)
936 int rv
= mq_notify(des
, not);
939 if (errno
== ENOMEM
) {
941 rv
= mq_notify(des
, not);
944 rb_sys_fail("mq_notify");
948 static void lower_stack_size(pthread_attr_t
*attr
)
950 /* some OSes have ridiculously small stack sizes */
951 #ifdef PTHREAD_STACK_MIN
952 size_t stack_size
= PTHREAD_STACK_MIN
;
953 size_t min_size
= 4096;
955 if (stack_size
< min_size
)
956 stack_size
= min_size
;
957 pthread_attr_setstacksize(attr
, stack_size
);
962 static VALUE
setnotify_exec(VALUE self
, VALUE io
, VALUE thr
)
964 int fd
= NUM2INT(rb_funcall(io
, id_fileno
, 0));
965 struct posix_mq
*mq
= get(self
, 1);
969 errno
= pthread_attr_init(&attr
);
970 if (errno
) rb_sys_fail("pthread_attr_init");
972 errno
= pthread_attr_setdetachstate(&attr
, PTHREAD_CREATE_DETACHED
);
973 if (errno
) rb_sys_fail("pthread_attr_setdetachstate");
975 lower_stack_size(&attr
);
976 not.sigev_notify
= SIGEV_THREAD
;
977 not.sigev_notify_function
= thread_notify_fd
;
978 not.sigev_notify_attributes
= &attr
;
979 not.sigev_value
.sival_int
= fd
;
981 if (!NIL_P(mq
->thread
))
982 rb_funcall(mq
->thread
, id_kill
, 0);
985 my_mq_notify(mq
->des
, ¬);
991 static VALUE
notify_cleanup(VALUE self
)
993 struct posix_mq
*mq
= get(self
, 1);
995 if (!NIL_P(mq
->thread
)) {
996 rb_funcall(mq
->thread
, id_kill
, 0);
1004 * mq.notify = signal => signal
1006 * Registers the notification request to deliver a given +signal+
1007 * to the current process when message is received.
1008 * If +signal+ is +nil+, it will unregister and disable the notification
1009 * request to allow other processes to register a request.
1010 * If +signal+ is +false+, it will register a no-op notification request
1011 * which will prevent other processes from registering a notification.
1012 * If +signal+ is an +IO+ object, it will spawn a thread upon the
1013 * arrival of the next message and write one "\\0" byte to the file
1014 * descriptor belonging to that IO object.
1015 * Only one process may have a notification request for a queue
1016 * at a time, Errno::EBUSY will be raised if there is already
1017 * a notification request registration for the queue.
1019 * Notifications are only fired once and processes must reregister
1020 * for subsequent notifications.
1022 * For readers of the mq_notify(3) manpage, passing +false+
1023 * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent
1024 * of passing a NULL notification pointer to mq_notify(3).
1026 static VALUE
setnotify(VALUE self
, VALUE arg
)
1028 struct posix_mq
*mq
= get(self
, 1);
1029 struct sigevent
not;
1030 struct sigevent
* notification
= ¬
1033 notify_cleanup(self
);
1034 not.sigev_notify
= SIGEV_SIGNAL
;
1036 switch (TYPE(arg
)) {
1038 not.sigev_notify
= SIGEV_NONE
;
1041 notification
= NULL
;
1044 not.sigev_signo
= NUM2INT(arg
);
1048 not.sigev_signo
= lookup_sig(arg
);
1049 rv
= INT2NUM(not.sigev_signo
);
1052 rb_raise(rb_eArgError
, "must be a signal or nil");
1055 my_mq_notify(mq
->des
, notification
);
1062 * mq.nonblock? => true or false
1064 * Returns the current non-blocking state of the message queue descriptor.
1066 static VALUE
nonblock_p(VALUE self
)
1068 struct posix_mq
*mq
= get(self
, 1);
1070 if (mq_getattr(mq
->des
, &mq
->attr
) < 0)
1071 rb_sys_fail("mq_getattr");
1072 return mq
->attr
.mq_flags
& O_NONBLOCK
? Qtrue
: Qfalse
;
1077 * mq.nonblock = boolean => boolean
1079 * Enables or disables non-blocking operation for the message queue
1080 * descriptor. Errno::EAGAIN will be raised in situations where
1081 * the queue would block. This is not compatible with +timeout+
1082 * arguments to POSIX_MQ#send and POSIX_MQ#receive.
1084 static VALUE
setnonblock(VALUE self
, VALUE nb
)
1086 struct mq_attr newattr
;
1087 struct posix_mq
*mq
= get(self
, 1);
1090 newattr
.mq_flags
= O_NONBLOCK
;
1091 else if (nb
== Qfalse
)
1092 newattr
.mq_flags
= 0;
1094 rb_raise(rb_eArgError
, "must be true or false");
1096 if (mq_setattr(mq
->des
, &newattr
, &mq
->attr
) < 0)
1097 rb_sys_fail("mq_setattr");
1099 mq
->attr
.mq_flags
= newattr
.mq_flags
;
1106 * mq.autoclose = boolean => boolean
1108 * Determines whether or not the _mq_ will be closed automatically
1111 static VALUE
setautoclose(VALUE self
, VALUE autoclose
)
1113 struct posix_mq
*mq
= get(self
, 1);
1115 MQ_IO_SET_AUTOCLOSE(mq
, autoclose
);
1116 mq
->autoclose
= RTEST(autoclose
) ? 1 : 0;
1122 * mq.autoclose? => boolean
1124 * Returns whether or not the _mq_ will be closed automatically
1127 static VALUE
autoclose_p(VALUE self
)
1129 struct posix_mq
*mq
= get(self
, 1);
1131 return mq
->autoclose
? Qtrue
: Qfalse
;
1136 * mq.trysend(string [,priority[, timeout]]) => +true+ or +false+
1138 * Exactly like POSIX_MQ#send, except it returns +false+ instead of raising
1139 * Errno::EAGAIN when non-blocking operation is desired and returns +true+
1140 * on success instead of +nil+.
1142 * This does not guarantee non-blocking behavior, the message queue must
1143 * be made non-blocking before calling this method.
1145 static VALUE
trysend(int argc
, VALUE
*argv
, VALUE self
)
1147 return _send(PMQ_TRY
, argc
, argv
, self
);
1152 * mq.tryshift([buffer [, timeout]]) => message or nil
1154 * Exactly like POSIX_MQ#shift, except it returns +nil+ instead of raising
1155 * Errno::EAGAIN when non-blocking operation is desired.
1157 * This does not guarantee non-blocking behavior, the message queue must
1158 * be made non-blocking before calling this method.
1160 static VALUE
tryshift(int argc
, VALUE
*argv
, VALUE self
)
1162 return _receive(PMQ_TRY
, argc
, argv
, self
);
1167 * mq.tryreceive([buffer [, timeout]]) => [ message, priority ] or nil
1169 * Exactly like POSIX_MQ#receive, except it returns +nil+ instead of raising
1170 * Errno::EAGAIN when non-blocking operation is desired.
1172 * This does not guarantee non-blocking behavior, the message queue must
1173 * be made non-blocking before calling this method.
1175 static VALUE
tryreceive(int argc
, VALUE
*argv
, VALUE self
)
1177 return _receive(PMQ_WANTARRAY
|PMQ_TRY
, argc
, argv
, self
);
1180 void Init_posix_mq_ext(void)
1182 VALUE cPOSIX_MQ
= rb_define_class("POSIX_MQ", rb_cObject
);
1183 rb_define_alloc_func(cPOSIX_MQ
, alloc
);
1184 cAttr
= rb_const_get(cPOSIX_MQ
, rb_intern("Attr"));
1187 * The maximum number of open message descriptors supported
1188 * by the system. This may be -1, in which case it is dynamically
1189 * set at runtime. Consult your operating system documentation
1190 * for system-specific information about this.
1192 rb_define_const(cPOSIX_MQ
, "OPEN_MAX",
1193 LONG2NUM(sysconf(_SC_MQ_OPEN_MAX
)));
1196 * The maximum priority that may be specified for POSIX_MQ#send
1197 * On POSIX-compliant systems, this is at least 31, but some
1198 * systems allow higher limits.
1199 * The minimum priority is always zero.
1201 rb_define_const(cPOSIX_MQ
, "PRIO_MAX",
1202 LONG2NUM(sysconf(_SC_MQ_PRIO_MAX
)));
1204 rb_define_singleton_method(cPOSIX_MQ
, "unlink", s_unlink
, 1);
1206 rb_define_private_method(cPOSIX_MQ
, "initialize", init
, -1);
1207 rb_define_method(cPOSIX_MQ
, "send", my_send
, -1);
1208 rb_define_method(cPOSIX_MQ
, "<<", send0
, 1);
1209 rb_define_method(cPOSIX_MQ
, "trysend", trysend
, -1);
1210 rb_define_method(cPOSIX_MQ
, "receive", receive
, -1);
1211 rb_define_method(cPOSIX_MQ
, "tryreceive", tryreceive
, -1);
1212 rb_define_method(cPOSIX_MQ
, "shift", shift
, -1);
1213 rb_define_method(cPOSIX_MQ
, "tryshift", tryshift
, -1);
1214 rb_define_method(cPOSIX_MQ
, "attr", getattr
, 0);
1215 rb_define_method(cPOSIX_MQ
, "attr=", setattr
, 1);
1216 rb_define_method(cPOSIX_MQ
, "close", _close
, 0);
1217 rb_define_method(cPOSIX_MQ
, "closed?", closed
, 0);
1218 rb_define_method(cPOSIX_MQ
, "unlink", _unlink
, 0);
1219 rb_define_method(cPOSIX_MQ
, "name", name
, 0);
1220 rb_define_method(cPOSIX_MQ
, "notify=", setnotify
, 1);
1221 rb_define_method(cPOSIX_MQ
, "nonblock=", setnonblock
, 1);
1222 rb_define_private_method(cPOSIX_MQ
, "notify_exec", setnotify_exec
, 2);
1223 rb_define_private_method(cPOSIX_MQ
, "notify_cleanup", notify_cleanup
, 0);
1224 rb_define_method(cPOSIX_MQ
, "nonblock?", nonblock_p
, 0);
1225 rb_define_method(cPOSIX_MQ
, "autoclose?", autoclose_p
, 0);
1226 rb_define_method(cPOSIX_MQ
, "autoclose=", setautoclose
, 1);
1228 rb_define_method(cPOSIX_MQ
, "to_io", to_io
, 0);
1229 id_setautoclose
= rb_intern("autoclose=");
1233 rb_define_module_function(cPOSIX_MQ
, "for_fd", for_fd
, 1);
1236 id_new
= rb_intern("new");
1237 id_kill
= rb_intern("kill");
1238 id_fileno
= rb_intern("fileno");
1239 id_divmod
= rb_intern("divmod");
1240 id_flags
= rb_intern("flags");
1241 id_maxmsg
= rb_intern("maxmsg");
1242 id_msgsize
= rb_intern("msgsize");
1243 id_curmsgs
= rb_intern("curmsgs");
1244 sym_r
= ID2SYM(rb_intern("r"));
1245 sym_w
= ID2SYM(rb_intern("w"));
1246 sym_rw
= ID2SYM(rb_intern("rw"));