1 /*======================================================================*
2 * Copyright (c) 2008, Yahoo! Inc. All rights reserved. *
4 * Licensed under the New BSD License (the "License"); you may not use *
5 * this file except in compliance with the License. Unless required *
6 * by applicable law or agreed to in writing, software distributed *
7 * under the License is distributed on an "AS IS" BASIS, WITHOUT *
8 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
9 * See the License for the specific language governing permissions and *
10 * limitations under the License. See accompanying LICENSE file. *
11 *======================================================================*/
16 #include "queue_mqueue.h"
22 #if defined(HAVE_MQUEUE_H)
24 #include <sys/types.h> /* Must be included before mqueue.h on FreeBSD 4.9. */
27 #include <sys/resource.h>
35 /* The following is for Solaris: when MQ_PRIO_MAX is not defined but
 * _POSIX_MQ_PRIO_MAX is, use the latter as MQ_PRIO_MAX so that the
 * MQ_PRIO_MAX - 1 priority used by xread()/xwrite() is available. */
36 #if !defined(MQ_PRIO_MAX) && defined(_POSIX_MQ_PRIO_MAX)
37 #define MQ_PRIO_MAX _POSIX_MQ_PRIO_MAX
/* Destructor hook for the mqueue-backed queue.
 *
 * this_queue: queue object whose priv member holds this backend's
 *             struct priv.
 *
 * NOTE(review): only the declaration that recovers the private state is
 * visible in this excerpt; what is released afterwards cannot be confirmed
 * from here. */
47 static void destructor (struct queue
* this_queue
)
49 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
/* Open the POSIX message queue that backs this_queue.
 *
 * this_queue: queue object whose priv member holds this backend's
 *             struct priv.
 * flags:      open flags; O_CREAT is OR-ed in before they reach mq_open().
 *
 * The queue is opened at ppriv->path with mode 0666 and with attributes
 * built from ppriv->max_cnt (mq_maxmsg) and ppriv->max_sz (mq_msgsize).
 * A setrlimit(RLIMIT_MSGQUEUE) call and a second mq_open() attempt follow
 * the first one, and the attempted parameters are reported through LOG_ER.
 *
 * NOTE(review): the return statements and part of the branch structure are
 * not visible in this excerpt. */
58 static int xopen (struct queue
* this_queue
, int flags
)
60 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
/* (mqd_t)-1 marks a queue that is not currently open. */
62 if ( (mqd_t
)-1 == ppriv
->mq
)
/* Start from zeroed attributes, then request the configured capacity. */
65 memset(&attr
, 0, sizeof(attr
));
66 attr
.mq_maxmsg
= ppriv
->max_cnt
;
67 attr
.mq_msgsize
= ppriv
->max_sz
;
70 /* see http://www.die.net/doc/linux/man/man7/mq_overview.7.html */
/* First attempt: create the queue if needed; mq_open() yields (mqd_t)-1
 * on failure. */
71 if ( (mqd_t
)-1 == (ppriv
->mq
= mq_open (ppriv
->path
,
72 flags
| O_CREAT
, 0666, &attr
)) )
74 /* Try resetting the mqueue resource parameters before formally
75 * failing see http://www.die.net/doc/linux/man/man2/getrlimit.2.html
81 rlim
.rlim_cur
= RLIM_INFINITY
; /* attr.mq_maxmsg * sizeof(struct msg_msg *) + attr.mq_maxmsg * attr.mq_msgsize ; */
82 rlim
.rlim_max
= RLIM_INFINITY
; /*rlim.rlim_cur ; */
/* setrlimit() returns non-zero when the limit could not be changed. */
83 if ( setrlimit (RLIMIT_MSGQUEUE
, &rlim
) )
/* Second mq_open() attempt on the same path. */
89 if ( (mqd_t
)-1 == (ppriv
->mq
= mq_open (ppriv
->path
,
/* Report the parameters that were handed to mq_open(). */
93 LOG_ER("mq_open(path=\"%s\", flags=0x%04x, 0666, "
94 "{attr.mq_maxmsg=%i, attr.mq_msgsize=%i})\n",
95 ppriv
->path
, flags
| O_CREAT
, attr
.mq_maxmsg
,
/* Close the message queue behind this_queue.
 *
 * this_queue: queue object whose priv member holds this backend's
 *             struct priv.
 *
 * mq_close() is only called when ppriv->mq holds something other than the
 * (mqd_t)-1 "not open" sentinel; the field is set back to that sentinel.
 *
 * NOTE(review): the error handling for a failed mq_close() and the return
 * statements are not visible in this excerpt. */
105 static int xclose (struct queue
* this_queue
)
107 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
/* Nothing to close unless the descriptor is valid. */
109 if ( (mqd_t
)-1 != ppriv
->mq
)
/* mq_close() reports failure with a negative return value. */
111 if ( (mq_close (ppriv
->mq
)) < 0 )
/* Mark the queue as closed. */
118 ppriv
->mq
= (mqd_t
)-1;
/* Receive one message from the queue into buf.
 *
 * this_queue: queue object whose priv member holds this backend's
 *             struct priv.
 * buf:        destination buffer; a NULL pointer is reported via LOG_ER.
 * count:      size of buf in bytes, passed through to the receive call.
 * pending:    out-parameter that receives attr.mq_curmsgs as reported by
 *             mq_getattr() after the read.
 *
 * Two receive paths exist: mq_timedreceive() with a deadline and plain
 * mq_receive().  Received messages are expected to carry priority
 * MQ_PRIO_MAX - 1; any other priority is logged as unrecognized.
 *
 * NOTE(review): the condition selecting between the two receive calls, the
 * local declarations and the return statements are not visible in this
 * excerpt. */
123 static int xread (struct queue
* this_queue
, void* buf
,
124 size_t count
, int* pending
)
126 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
133 LOG_ER("queue read with NULL buf pointer\n");
/* (mqd_t)-1 marks a queue that is not currently open. */
136 if ( (mqd_t
)-1 == ppriv
->mq
)
138 LOG_ER("queue read with queue closed.\n");
142 LOG_PROG("about to call mq_receive(). gbl_done=%d\n", gbl_done
);
144 /* use blocking reads unless we're shutting down. */
/* Timed variant: mq_timedreceive() takes an absolute deadline, built here
 * from the current second plus 1 s and 500000000 ns. */
147 struct timespec time_buf
= { time(NULL
)+1, 500000000 };
148 mq_rec_rtrn
= mq_timedreceive (ppriv
->mq
, buf
, count
, &pri
, &time_buf
);
/* Variant without a timeout. */
152 mq_rec_rtrn
= mq_receive (ppriv
->mq
, buf
, count
, &pri
);
/* A negative result signals failure; errno carries the reason. */
155 if (mq_rec_rtrn
< 0 )
157 LOG_PROG("errno: %d %s\n", errno
, strerror(errno
));
161 PERROR("mq_receive");
163 /* If we've been interrupted it's likely that we're shutting
164 down, so no need to print errors. */
/* xwrite() sends every message at MQ_PRIO_MAX - 1, so any other priority
 * is unexpected. */
171 if ( MQ_PRIO_MAX
- 1 != pri
)
173 LOG_ER("queue read returned message with unrecognized "
174 "priority (pri=%d).\n", pri
);
/* NOTE(review): mq_receive() returns ssize_t; confirm that the declared
 * type of mq_rec_rtrn matches the %d conversion below. */
177 LOG_PROG("mq_receive() returned %d.\n", mq_rec_rtrn
);
/* Query the queue attributes to learn how many messages are still queued. */
179 if ( mq_getattr (ppriv
->mq
, &attr
) < 0 )
181 PERROR("mq_getattr");
185 *pending
= attr
.mq_curmsgs
;
/* Send one message of count bytes from buf to the queue.
 *
 * this_queue: queue object whose priv member holds this backend's
 *             struct priv.
 * buf:        message payload; a NULL pointer is reported via LOG_ER.
 * count:      payload length in bytes, passed through to mq_send().
 *
 * Messages are always sent at priority MQ_PRIO_MAX - 1, which is the
 * priority xread() expects to see.
 *
 * NOTE(review): the failure handling after mq_send() and the return
 * statements are not visible in this excerpt. */
190 static int xwrite (struct queue
* this_queue
, const void* buf
, size_t count
)
192 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
196 LOG_ER("queue write with NULL buf pointer\n");
/* (mqd_t)-1 marks a queue that is not currently open. */
199 if ( (mqd_t
)-1 == ppriv
->mq
)
201 LOG_ER("queue write with queue closed.\n");
205 LOG_PROG("about to call mq_send().\n");
/* mq_send() returns 0 on success, so a non-zero result is the error path. */
207 if ( 0 != mq_send (ppriv
->mq
, buf
, count
, MQ_PRIO_MAX
-1) )
/* Allocate a message buffer sized for this queue.
 *
 * this_queue: queue object whose priv member holds this backend's
 *             struct priv.
 * newcount:   out-parameter; set to the configured max_sz, which is also
 *             the number of bytes requested from malloc().
 *
 * Returns the malloc() result unchanged, so a failed allocation yields a
 * null pointer while *newcount has still been written. */
216 static void* alloc (struct queue
* this_queue
, size_t* newcount
)
218 return malloc (*newcount
= ((struct priv
*)this_queue
->priv
)->max_sz
);
/* Release a buffer associated with this queue.
 *
 * this_queue: unused; referenced only to silence unused-parameter warnings.
 * buf:        buffer to release.
 *
 * NOTE(review): the statement that actually disposes of buf is not visible
 * in this excerpt; presumably it pairs with the malloc() in alloc() --
 * confirm against the full file. */
221 static void dealloc (struct queue
* this_queue
, void* buf
)
223 (void) this_queue
; /* appease -Wall -Werror */
/* Construct an mqueue-backed queue on this_queue.
 *
 * this_queue: queue object to initialise; its vtbl and priv members are
 *             cleared first and filled in at the end of this function.
 * The body also uses path (duplicated with strdup()), max_sz and max_cnt
 * (both stored in the private state).
 *
 * The private state is heap-allocated, zeroed, and starts with
 * mq == (mqd_t)-1, i.e. the message queue itself is not opened here.
 *
 * NOTE(review): the remaining parameter declarations, the vtbl initialiser
 * contents, the failure clean-up and the return statements are not visible
 * in this excerpt. */
227 int queue_mqueue_ctor (struct queue
* this_queue
,
232 static struct queue_vtbl vtbl
= {
/* Clear both members first; they are assigned their real values below. */
241 this_queue
->vtbl
= 0;
242 this_queue
->priv
= 0;
/* Allocate the backend's private state. */
244 ppriv
= (struct priv
*)malloc(sizeof(struct priv
));
247 LOG_ER("malloc failed attempting to allocate %d bytes\n",
251 memset(ppriv
, 0, sizeof(*ppriv
));
/* Keep a private copy of the path; strdup() yields 0 on failure. */
253 if ( 0 == (ppriv
->path
= strdup(path
)) )
255 LOG_ER("strdup failed attempting to dup \"%s\"\n",
/* (mqd_t)-1: the message queue has not been opened yet. */
261 ppriv
->mq
= (mqd_t
)-1;
262 ppriv
->max_sz
= max_sz
;
263 ppriv
->max_cnt
= max_cnt
;
/* Publish the vtable and the private state on the queue object. */
265 this_queue
->vtbl
= &vtbl
;
266 this_queue
->priv
= ppriv
;
271 #else /* if defined(HAVE_MQUEUE_H) */
/* Stub constructor compiled when HAVE_MQUEUE_H is not defined.
 *
 * Clears this_queue->vtbl and this_queue->priv and ignores path, max_sz
 * and max_cnt.
 *
 * NOTE(review): the parameter declarations and the return statement are not
 * visible in this excerpt. */
273 int queue_mqueue_ctor (struct queue
* this_queue
,
278 this_queue
->vtbl
= 0;
279 this_queue
->priv
= 0;
280 (void)path
; /* appease -Wall -Werror */
281 (void)max_sz
; /* appease -Wall -Werror */
282 (void)max_cnt
; /* appease -Wall -Werror */
287 #endif /* HAVE_MQUEUE_H */