1 /*======================================================================*
2 * Copyright (c) 2008, Yahoo! Inc. All rights reserved. *
3 * Copyright (c) 2010-2016, OpenX Inc. All rights reserved. *
5 * Licensed under the New BSD License (the "License"); you may not use *
6 * this file except in compliance with the License. Unless required *
7 * by applicable law or agreed to in writing, software distributed *
8 * under the License is distributed on an "AS IS" BASIS, WITHOUT *
9 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
10 * See the License for the specific language governing permissions and *
11 * limitations under the License. See accompanying LICENSE file. *
12 *======================================================================*/
17 #include "queue_mqueue.h"
23 #if defined(HAVE_MQUEUE_H)
25 #include <sys/types.h> /* Must be included before mqueue.h on FreeBSD 4.9. */
28 #include <sys/resource.h>
36 /* The following is for Solaris. */
37 #if !defined(MQ_PRIO_MAX) && defined(_POSIX_MQ_PRIO_MAX)
38 #define MQ_PRIO_MAX _POSIX_MQ_PRIO_MAX
/* Destructor hook for the mqueue-backed queue.
 *
 * this_queue: queue object whose priv member holds this backend's
 *             struct priv.
 *
 * Only the retrieval of the private state from this_queue->priv is present
 * in this excerpt; the statements that follow it are not shown.
 * NOTE(review): presumably the private state is released here -- confirm
 * against the complete source. */
48 static void destructor (struct queue
* this_queue
)
50 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
/* Open (creating if necessary) the POSIX message queue behind this_queue.
 *
 * this_queue: queue object; its priv member is a struct priv carrying the
 *             queue path, the descriptor (mq) and the size limits.
 * flags:      open flags; O_CREAT is OR-ed in before calling mq_open().
 *
 * Visible behaviour:
 *  - Work is attempted only while ppriv->mq still holds (mqd_t)-1.
 *  - attr is zeroed, then mq_maxmsg is taken from ppriv->max_cnt and
 *    mq_msgsize from ppriv->max_sz.
 *  - mq_open() is called with mode 0666.  If it fails, both the soft and
 *    hard RLIMIT_MSGQUEUE limits are set to RLIM_INFINITY via setrlimit()
 *    and mq_open() is called again.
 *  - A failure is reported through LOG_ER together with the path, flags
 *    and attributes.
 *
 * The return statements are not present in this excerpt.
 * NOTE(review): the LOG_ER format prints attr.mq_maxmsg with "%i"; POSIX
 * declares the struct mq_attr members as long, so "%li" (or a cast) looks
 * more appropriate -- confirm how LOG_ER forwards its arguments. */
59 static int xopen (struct queue
* this_queue
, int flags
)
61 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
/* (mqd_t)-1 marks a queue that has not been opened yet. */
63 if ( (mqd_t
)-1 == ppriv
->mq
)
66 memset(&attr
, 0, sizeof(attr
));
67 attr
.mq_maxmsg
= ppriv
->max_cnt
;
68 attr
.mq_msgsize
= ppriv
->max_sz
;
71 /* see http://www.die.net/doc/linux/man/man7/mq_overview.7.html */
72 if ( (mqd_t
)-1 == (ppriv
->mq
= mq_open (ppriv
->path
,
73 flags
| O_CREAT
, 0666, &attr
)) )
75 /* Try resetting the mqueue resource parameters before formally
76 * failing see http://www.die.net/doc/linux/man/man2/getrlimit.2.html
82 rlim
.rlim_cur
= RLIM_INFINITY
; /* attr.mq_maxmsg * sizeof(struct msg_msg *) + attr.mq_maxmsg * attr.mq_msgsize ; */
83 rlim
.rlim_max
= RLIM_INFINITY
; /*rlim.rlim_cur ; */
84 if ( setrlimit (RLIMIT_MSGQUEUE
, &rlim
) )
90 if ( (mqd_t
)-1 == (ppriv
->mq
= mq_open (ppriv
->path
,
94 LOG_ER("mq_open(path=\"%s\", flags=0x%04x, 0666, "
95 "{attr.mq_maxmsg=%i, attr.mq_msgsize=%i})\n",
96 ppriv
->path
, flags
| O_CREAT
, attr
.mq_maxmsg
,
/* Close the message queue descriptor held by this_queue, if one is open.
 *
 * this_queue: queue object; its priv member is a struct priv.
 *
 * Visible behaviour: when ppriv->mq is not (mqd_t)-1, mq_close() is called
 * on it and its result is tested for a negative value; ppriv->mq is then
 * reset to (mqd_t)-1, the value the other operations test for to detect a
 * closed queue.  The error-handling body and the return statements are not
 * present in this excerpt. */
105 static int xclose (struct queue
* this_queue
)
107 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
109 if ( (mqd_t
)-1 != ppriv
->mq
)
111 if ( (mq_close (ppriv
->mq
)) < 0 )
/* Mark the descriptor as closed. */
118 ppriv
->mq
= (mqd_t
)-1;
/* Receive one message from the queue with a timed receive.
 *
 * this_queue: queue object; its priv member is a struct priv.
 * buf:        destination buffer handed to mq_timedreceive().
 * count:      size of buf in bytes, handed to mq_timedreceive().
 * pending:    out-parameter; receives attr.mq_curmsgs as reported by
 *             mq_getattr() after the receive.
 *
 * Visible behaviour:
 *  - Errors are logged for a NULL buf and for a closed queue
 *    (ppriv->mq == (mqd_t)-1).
 *  - mq_timedreceive() is called with a timeout built from time(NULL) and
 *    arg_wakup_interval_ms.
 *  - A negative result is logged together with errno.
 *  - A message whose priority differs from MQ_PRIO_MAX - 1 is reported as
 *    having an unrecognized priority.
 *
 * The guard conditions' bodies and the return statements are not present
 * in this excerpt.
 *
 * NOTE(review): mq_timedreceive() takes an ABSOLUTE timeout.  The
 * initializer below uses the current second unchanged for tv_sec and puts
 * the whole interval, converted to nanoseconds, in tv_nsec.  POSIX requires
 * tv_nsec to be below 1000000000, so an interval of 1000 ms or more would
 * yield EINVAL, and the multiplication may overflow depending on the type
 * of arg_wakup_interval_ms (declared elsewhere) -- confirm, and consider
 * splitting the interval into seconds and nanoseconds.
 * NOTE(review): the log and PERROR texts name mq_receive() although the
 * call made is mq_timedreceive(). */
123 static int xread (struct queue
* this_queue
, void* buf
,
124 size_t count
, int* pending
)
126 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
133 LOG_ER("queue read with NULL buf pointer\n");
136 if ( (mqd_t
)-1 == ppriv
->mq
)
138 LOG_ER("queue read with queue closed.\n");
142 LOG_PROG("about to call mq_receive(). gbl_done=%d\n", gbl_done
);
144 /* use blocking reads unless we're shutting down. */
145 struct timespec time_buf
= { time(NULL
), arg_wakup_interval_ms
* 1000000 };
146 mq_rec_rtrn
= mq_timedreceive (ppriv
->mq
, buf
, count
, &pri
, &time_buf
);
148 if (mq_rec_rtrn
< 0 )
150 LOG_PROG("errno: %d %s\n", errno
, strerror(errno
));
154 PERROR("mq_receive");
156 /* If we've been interrupted it's likely that we're shutting
157 down, so no need to print errors. */
/* xwrite() sends every message at priority MQ_PRIO_MAX - 1, so any other
   priority is reported as unexpected. */
164 if ( MQ_PRIO_MAX
- 1 != pri
)
166 LOG_ER("queue read returned message with unrecognized "
167 "priority (pri=%d).\n", pri
);
170 LOG_PROG("mq_receive() returned %d.\n", mq_rec_rtrn
);
172 if ( mq_getattr (ppriv
->mq
, &attr
) < 0 )
174 PERROR("mq_getattr");
/* Report how many messages the queue currently holds. */
178 *pending
= attr
.mq_curmsgs
;
/* Send one message to the queue.
 *
 * this_queue: queue object; its priv member is a struct priv.
 * buf:        message bytes handed to mq_send().
 * count:      message length in bytes, handed to mq_send().
 *
 * Visible behaviour: errors are logged for a NULL buf and for a closed
 * queue (ppriv->mq == (mqd_t)-1); the message is sent with mq_send() at
 * priority MQ_PRIO_MAX - 1, and a non-zero result from mq_send() is tested
 * for.  The guard conditions' bodies, the failure handling and the return
 * statements are not present in this excerpt. */
183 static int xwrite (struct queue
* this_queue
, const void* buf
, size_t count
)
185 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
189 LOG_ER("queue write with NULL buf pointer\n");
192 if ( (mqd_t
)-1 == ppriv
->mq
)
194 LOG_ER("queue write with queue closed.\n");
198 LOG_PROG("about to call mq_send().\n");
200 if ( 0 != mq_send (ppriv
->mq
, buf
, count
, MQ_PRIO_MAX
-1) )
/* Allocate a zero-filled buffer sized to the queue's maximum message size.
 *
 * this_queue: queue object; its priv member is a struct priv whose max_sz
 *             gives the buffer size.
 * newcount:   out-parameter; set to the number of bytes requested (max_sz).
 *
 * The return statement is not present in this excerpt.
 * NOTE(review): no check of the malloc() result is visible between the
 * allocation and the memset(); if malloc() returns NULL the memset() would
 * write through a null pointer -- confirm against the complete source. */
209 static void* alloc (struct queue
* this_queue
, size_t* newcount
)
211 void *data
= malloc (*newcount
= ((struct priv
*)this_queue
->priv
)->max_sz
);
212 memset(data
, 0, *newcount
);
/* Counterpart of alloc() for buffers handed back by the caller.
 *
 * this_queue: unused; explicitly voided to silence unused-parameter
 *             warnings.
 * buf:        buffer being handed back.
 *
 * The statement that acts on buf is not present in this excerpt.
 * NOTE(review): presumably buf is released with free() -- confirm. */
216 static void dealloc (struct queue
* this_queue
, void* buf
)
218 (void) this_queue
; /* appease -Wall -Werror */
/* Construct an mqueue-backed queue in this_queue (HAVE_MQUEUE_H build).
 *
 * this_queue: queue object to initialise.
 * The remaining parameters are declared on lines not present in this
 * excerpt; the body uses path, max_sz and max_cnt.
 *
 * Visible behaviour:
 *  - this_queue->vtbl and this_queue->priv are cleared first, so they are
 *    already zero when one of the failure branches below is taken.
 *  - A struct priv is allocated with malloc() and zeroed; an allocation
 *    failure is logged through LOG_ER.
 *  - path is duplicated with strdup(); a failure is logged through LOG_ER.
 *  - The descriptor starts out as (mqd_t)-1 (not open) and the size limits
 *    are copied from max_sz and max_cnt.
 *  - Finally the static vtbl and the new private state are installed in
 *    this_queue.
 *
 * The vtbl initialiser contents, the failure-branch bodies and the return
 * statements are not present in this excerpt. */
222 int queue_mqueue_ctor (struct queue
* this_queue
,
227 static struct queue_vtbl vtbl
= {
236 this_queue
->vtbl
= 0;
237 this_queue
->priv
= 0;
239 ppriv
= (struct priv
*)malloc(sizeof(struct priv
));
242 LOG_ER("malloc failed attempting to allocate %d bytes\n",
246 memset(ppriv
, 0, sizeof(*ppriv
));
248 if ( 0 == (ppriv
->path
= strdup(path
)) )
250 LOG_ER("strdup failed attempting to dup \"%s\"\n",
/* (mqd_t)-1 marks the queue as not yet opened. */
256 ppriv
->mq
= (mqd_t
)-1;
257 ppriv
->max_sz
= max_sz
;
258 ppriv
->max_cnt
= max_cnt
;
260 this_queue
->vtbl
= &vtbl
;
261 this_queue
->priv
= ppriv
;
266 #else /* if defined(HAVE_MQUEUE_H) */
/* Fallback constructor used when HAVE_MQUEUE_H is not defined.
 *
 * this_queue: queue object; its vtbl and priv members are cleared.
 * path, max_sz, max_cnt: unused in this build; each is explicitly voided
 * to silence unused-parameter warnings.
 *
 * The rest of the parameter list and the return statement are not present
 * in this excerpt.
 * NOTE(review): presumably reports failure because POSIX message queues
 * are unavailable -- confirm against the complete source. */
268 int queue_mqueue_ctor (struct queue
* this_queue
,
273 this_queue
->vtbl
= 0;
274 this_queue
->priv
= 0;
275 (void)path
; /* appease -Wall -Werror */
276 (void)max_sz
; /* appease -Wall -Werror */
277 (void)max_cnt
; /* appease -Wall -Werror */
282 #endif /* HAVE_MQUEUE_H */