include headers in distribution
[lwes-journaller.git] / src / queue_mqueue.c
blobce193181e40eeaeea06aba0794bc1aed3d6e4e7e
1 /*======================================================================*
2 * Copyright (c) 2008, Yahoo! Inc. All rights reserved. *
3 * *
4 * Licensed under the New BSD License (the "License"); you may not use *
5 * this file except in compliance with the License. Unless required *
6 * by applicable law or agreed to in writing, software distributed *
7 * under the License is distributed on an "AS IS" BASIS, WITHOUT *
8 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
9 * See the License for the specific language governing permissions and *
10 * limitations under the License. See accompanying LICENSE file. *
11 *======================================================================*/
13 #include "config.h"
15 #include "queue.h"
16 #include "queue_mqueue.h"
18 #include "perror.h"
19 #include "opt.h"
20 #include "sig.h"
22 #if defined(HAVE_MQUEUE_H)
24 #include <sys/types.h> /* Must be included before mqueue.h on FreeBSD 4.9. */
25 #include <mqueue.h>
26 #include <sys/time.h>
27 #include <sys/resource.h>
28 #include <fcntl.h>
29 #include <limits.h>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <time.h>
35 /* The following is for Solaris. */
36 #if !defined(MQ_PRIO_MAX) && defined(_POSIX_MQ_PRIO_MAX)
37 #define MQ_PRIO_MAX _POSIX_MQ_PRIO_MAX
38 #endif
40 struct priv {
41 char* path;
42 mqd_t mq;
43 size_t max_sz;
44 size_t max_cnt;
47 static void destructor (struct queue* this_queue)
49 struct priv* ppriv = (struct priv*)this_queue->priv;
51 free (ppriv->path);
52 free (ppriv);
54 this_queue->vtbl = 0;
55 this_queue->priv = 0;
58 static int xopen (struct queue* this_queue, int flags)
60 struct priv* ppriv = (struct priv*)this_queue->priv;
62 if ( (mqd_t)-1 == ppriv->mq )
64 struct mq_attr attr;
65 memset(&attr, 0, sizeof(attr));
66 attr.mq_maxmsg = ppriv->max_cnt;
67 attr.mq_msgsize = ppriv->max_sz;
69 errno = 0 ;
70 /* see http://www.die.net/doc/linux/man/man7/mq_overview.7.html */
71 if ( (mqd_t)-1 == (ppriv->mq = mq_open (ppriv->path,
72 flags | O_CREAT, 0666, &attr)) )
74 /* Try resetting the mqueue resource parameters before formally
75 * failing see http://www.die.net/doc/linux/man/man2/getrlimit.2.html
77 struct rlimit rlim;
78 PERROR("mq_open");
79 errno = 0;
80 attr.mq_maxmsg += 1;
81 rlim.rlim_cur = RLIM_INFINITY; /* attr.mq_maxmsg * sizeof(struct msg_msg *) + attr.mq_maxmsg * attr.mq_msgsize ; */
82 rlim.rlim_max = RLIM_INFINITY ; /*rlim.rlim_cur ; */
83 if ( setrlimit (RLIMIT_MSGQUEUE, &rlim) )
85 PERROR("setrlimit");
87 attr.mq_maxmsg -= 1 ;
89 if ( (mqd_t)-1 == (ppriv->mq = mq_open (ppriv->path,
90 flags | O_CREAT,
91 0666, &attr)) )
93 LOG_ER("mq_open(path=\"%s\", flags=0x%04x, 0666, "
94 "{attr.mq_maxmsg=%i, attr.mq_msgsize=%i})\n",
95 ppriv->path, flags | O_CREAT, attr.mq_maxmsg,
96 attr.mq_msgsize);
97 return -1;
102 return 0;
105 static int xclose (struct queue* this_queue)
107 struct priv* ppriv = (struct priv*)this_queue->priv;
109 if ( (mqd_t)-1 != ppriv->mq )
111 if ( (mq_close (ppriv->mq)) < 0 )
113 PERROR("mq_close");
114 return -1;
118 ppriv->mq = (mqd_t)-1;
120 return 0;
123 static int xread (struct queue* this_queue, void* buf,
124 size_t count, int* pending)
126 struct priv* ppriv = (struct priv*)this_queue->priv;
127 struct mq_attr attr;
128 int mq_rec_rtrn;
129 unsigned int pri;
131 if ( 0 == buf )
133 LOG_ER("queue read with NULL buf pointer\n");
134 return -1;
136 if ( (mqd_t)-1 == ppriv->mq )
138 LOG_ER("queue read with queue closed.\n");
139 return -1;
142 LOG_PROG("about to call mq_receive(). gbl_done=%d\n", gbl_done);
144 /* use blocking reads unless we're shutting down. */
145 if (gbl_done)
147 struct timespec time_buf = { time(NULL)+1, 500000000 };
148 mq_rec_rtrn = mq_timedreceive (ppriv->mq, buf, count, &pri, &time_buf);
150 else
152 mq_rec_rtrn = mq_receive (ppriv->mq, buf, count, &pri);
155 if (mq_rec_rtrn < 0 )
157 LOG_PROG("errno: %d %s\n", errno, strerror(errno));
158 switch ( errno )
160 default:
161 PERROR("mq_receive");
163 /* If we've been interrupted it's likely that we're shutting
164 down, so no need to print errors. */
165 case ETIMEDOUT:
166 case EINTR:
167 return mq_rec_rtrn;
171 if ( MQ_PRIO_MAX - 1 != pri )
173 LOG_ER("queue read returned message with unrecognized "
174 "priority (pri=%d).\n", pri);
177 LOG_PROG("mq_receive() returned %d.\n", mq_rec_rtrn);
179 if ( mq_getattr (ppriv->mq, &attr) < 0 )
181 PERROR("mq_getattr");
182 return -1;
185 *pending = attr.mq_curmsgs;
187 return mq_rec_rtrn;
190 static int xwrite (struct queue* this_queue, const void* buf, size_t count)
192 struct priv* ppriv = (struct priv*)this_queue->priv;
194 if ( 0 == buf )
196 LOG_ER("queue write with NULL buf pointer\n");
197 return -1;
199 if ( (mqd_t)-1 == ppriv->mq )
201 LOG_ER("queue write with queue closed.\n");
202 return -1;
205 LOG_PROG("about to call mq_send().\n");
207 if ( 0 != mq_send (ppriv->mq, buf, count, MQ_PRIO_MAX-1) )
209 PERROR("mq_send");
210 return -1;
213 return 0;
216 static void* alloc (struct queue* this_queue, size_t* newcount)
218 return malloc (*newcount = ((struct priv*)this_queue->priv)->max_sz);
221 static void dealloc (struct queue* this_queue, void* buf)
223 (void) this_queue; /* appease -Wall -Werror */
224 free (buf);
227 int queue_mqueue_ctor (struct queue* this_queue,
228 const char* path,
229 size_t max_sz,
230 size_t max_cnt)
232 static struct queue_vtbl vtbl = {
233 destructor,
234 xopen, xclose,
235 xread, xwrite,
236 alloc, dealloc
239 struct priv* ppriv;
241 this_queue->vtbl = 0;
242 this_queue->priv = 0;
244 ppriv = (struct priv*)malloc(sizeof(struct priv));
245 if ( 0 == ppriv )
247 LOG_ER("malloc failed attempting to allocate %d bytes\n",
248 sizeof(*ppriv));
249 return -1;
251 memset(ppriv, 0, sizeof(*ppriv));
253 if ( 0 == (ppriv->path = strdup(path)) )
255 LOG_ER("strdup failed attempting to dup \"%s\"\n",
256 path);
257 free(ppriv);
258 return -1;
261 ppriv->mq = (mqd_t)-1;
262 ppriv->max_sz = max_sz;
263 ppriv->max_cnt = max_cnt;
265 this_queue->vtbl = &vtbl;
266 this_queue->priv = ppriv;
268 return 0;
271 #else /* if defined(HAVE_MQUEUE_H) */
273 int queue_mqueue_ctor (struct queue* this_queue,
274 const char* path,
275 size_t max_sz,
276 size_t max_cnt)
278 this_queue->vtbl = 0;
279 this_queue->priv = 0;
280 (void)path; /* appease -Wall -Werror */
281 (void)max_sz; /* appease -Wall -Werror */
282 (void)max_cnt; /* appease -Wall -Werror */
284 return -1;
287 #endif /* HAVE_MQUEUE_H */