Use blocking reads unless we're shutting down, so we don't have to call time() once...
[lwes-journaller.git] / src / queue_mqueue.c
blobcdb99941b299f15a469bb8a17fc5c22c43d24d67
1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
21 #include "config.h"
23 #include "queue.h"
24 #include "queue_mqueue.h"
26 #include "perror.h"
27 #include "opt.h"
28 #include "sig.h"
30 #if defined(HAVE_MQUEUE_H)
32 #include <sys/types.h> /* Must be included before mqueue.h on FreeBSD 4.9. */
33 #include <mqueue.h>
34 #include <sys/time.h>
35 #include <sys/resource.h>
36 #include <fcntl.h>
37 #include <limits.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <string.h>
41 #include <time.h>
43 /* The following is for Solaris. */
44 #if !defined(MQ_PRIO_MAX) && defined(_POSIX_MQ_PRIO_MAX)
45 #define MQ_PRIO_MAX _POSIX_MQ_PRIO_MAX
46 #endif
/* Private state of one POSIX message-queue backed queue. */
struct priv {
  char  *path;     /* heap copy of the queue name handed to mq_open() */
  mqd_t  mq;       /* queue descriptor; (mqd_t)-1 while the queue is closed */
  size_t max_sz;   /* becomes attr.mq_msgsize and the size alloc() hands out */
  size_t max_cnt;  /* becomes attr.mq_maxmsg */
};
55 static void destructor (struct queue* this_queue)
57 struct priv* ppriv = (struct priv*)this_queue->priv;
59 free (ppriv->path);
60 free (ppriv);
62 this_queue->vtbl = 0;
63 this_queue->priv = 0;
66 static int xopen (struct queue* this_queue, int flags)
68 struct priv* ppriv = (struct priv*)this_queue->priv;
70 if ( (mqd_t)-1 == ppriv->mq )
72 struct mq_attr attr;
73 memset(&attr, 0, sizeof(attr));
74 attr.mq_maxmsg = ppriv->max_cnt;
75 attr.mq_msgsize = ppriv->max_sz;
77 errno = 0 ;
78 /* see http://www.die.net/doc/linux/man/man7/mq_overview.7.html */
79 if ( (mqd_t)-1 == (ppriv->mq = mq_open (ppriv->path,
80 flags | O_CREAT, 0666, &attr)) )
82 /* Try resetting the mqueue resource parameters before formally
83 * failing see http://www.die.net/doc/linux/man/man2/getrlimit.2.html
85 struct rlimit rlim;
86 PERROR("mq_open");
87 errno = 0;
88 attr.mq_maxmsg += 1;
89 rlim.rlim_cur = RLIM_INFINITY; /* attr.mq_maxmsg * sizeof(struct msg_msg *) + attr.mq_maxmsg * attr.mq_msgsize ; */
90 rlim.rlim_max = RLIM_INFINITY ; /*rlim.rlim_cur ; */
91 if ( setrlimit (RLIMIT_MSGQUEUE, &rlim) )
93 PERROR("setrlimit");
95 attr.mq_maxmsg -= 1 ;
97 if ( (mqd_t)-1 == (ppriv->mq = mq_open (ppriv->path,
98 flags | O_CREAT,
99 0666, &attr)) )
101 LOG_ER("mq_open(path=\"%s\", flags=0x%04x, 0666, "
102 "{attr.mq_maxmsg=%i, attr.mq_msgsize=%i})\n",
103 ppriv->path, flags | O_CREAT, attr.mq_maxmsg,
104 attr.mq_msgsize);
105 return -1;
110 return 0;
113 static int xclose (struct queue* this_queue)
115 struct priv* ppriv = (struct priv*)this_queue->priv;
117 if ( (mqd_t)-1 != ppriv->mq )
119 if ( (mq_close (ppriv->mq)) < 0 )
121 PERROR("mq_close");
122 return -1;
126 ppriv->mq = (mqd_t)-1;
128 return 0;
131 static int xread (struct queue* this_queue, void* buf,
132 size_t count, int* pending)
134 struct priv* ppriv = (struct priv*)this_queue->priv;
135 struct mq_attr attr;
136 int mq_rec_rtrn;
137 unsigned int pri;
139 if ( 0 == buf )
141 LOG_ER("queue read with NULL buf pointer\n");
142 return -1;
144 if ( (mqd_t)-1 == ppriv->mq )
146 LOG_ER("queue read with queue closed.\n");
147 return -1;
150 LOG_PROG("about to call mq_receive(). gbl_done=%d\n", gbl_done);
152 /* use blocking reads unless we're shutting down. */
153 if (gbl_done)
155 struct timespec time_buf = { time(NULL)+1, 500000000 };
156 mq_rec_rtrn = mq_timedreceive (ppriv->mq, buf, count, &pri, &time_buf);
158 else
160 mq_rec_rtrn = mq_receive (ppriv->mq, buf, count, &pri);
163 if (mq_rec_rtrn < 0 )
165 LOG_PROG("errno: %d %s\n", errno, strerror(errno));
166 switch ( errno )
168 default:
169 PERROR("mq_receive");
171 /* If we've been interrupted it's likely that we're shutting
172 down, so no need to print errors. */
173 case ETIMEDOUT:
174 case EINTR:
175 return mq_rec_rtrn;
179 if ( MQ_PRIO_MAX - 1 != pri )
181 LOG_ER("queue read returned message with unrecognized "
182 "priority (pri=%d).\n", pri);
185 LOG_PROG("mq_receive() returned %d.\n", mq_rec_rtrn);
187 if ( mq_getattr (ppriv->mq, &attr) < 0 )
189 PERROR("mq_getattr");
190 return -1;
193 *pending = attr.mq_curmsgs;
195 return mq_rec_rtrn;
198 static int xwrite (struct queue* this_queue, const void* buf, size_t count)
200 struct priv* ppriv = (struct priv*)this_queue->priv;
202 if ( 0 == buf )
204 LOG_ER("queue write with NULL buf pointer\n");
205 return -1;
207 if ( (mqd_t)-1 == ppriv->mq )
209 LOG_ER("queue write with queue closed.\n");
210 return -1;
213 LOG_PROG("about to call mq_send().\n");
215 if ( 0 != mq_send (ppriv->mq, buf, count, MQ_PRIO_MAX-1) )
217 PERROR("mq_send");
218 return -1;
221 return 0;
224 static void* alloc (struct queue* this_queue, size_t* newcount)
226 return malloc (*newcount = ((struct priv*)this_queue->priv)->max_sz);
/* Free a buffer previously handed out by alloc(). */
static void dealloc (struct queue* this_queue, void* buf)
{
  free (buf);

  /* The queue itself is not needed; reference it to appease
     -Wall -Werror. */
  (void) this_queue;
}
235 int queue_mqueue_ctor (struct queue* this_queue,
236 const char* path,
237 size_t max_sz,
238 size_t max_cnt)
240 static struct queue_vtbl vtbl = {
241 destructor,
242 xopen, xclose,
243 xread, xwrite,
244 alloc, dealloc
247 struct priv* ppriv;
249 this_queue->vtbl = 0;
250 this_queue->priv = 0;
252 ppriv = (struct priv*)malloc(sizeof(struct priv));
253 if ( 0 == ppriv )
255 LOG_ER("malloc failed attempting to allocate %d bytes\n",
256 sizeof(*ppriv));
257 return -1;
259 memset(ppriv, 0, sizeof(*ppriv));
261 if ( 0 == (ppriv->path = strdup(path)) )
263 LOG_ER("strdup failed attempting to dup \"%s\"\n",
264 path);
265 free(ppriv);
266 return -1;
269 ppriv->mq = (mqd_t)-1;
270 ppriv->max_sz = max_sz;
271 ppriv->max_cnt = max_cnt;
273 this_queue->vtbl = &vtbl;
274 this_queue->priv = ppriv;
276 return 0;
279 #else /* if defined(HAVE_MQUEUE_H) */
281 int queue_mqueue_ctor (struct queue* this_queue,
282 const char* path,
283 size_t max_sz,
284 size_t max_cnt)
286 this_queue->vtbl = 0;
287 this_queue->priv = 0;
288 (void)path; /* appease -Wall -Werror */
289 (void)max_sz; /* appease -Wall -Werror */
290 (void)max_cnt; /* appease -Wall -Werror */
292 return -1;
295 #endif /* HAVE_MQUEUE_H */