valgrind memcheck is as clean as I can make it
[lwes-journaller/github-mirror.git] / src / queue_mqueue.c
blobb8c4121127afa5c132b3f41196d1ec5df098109d
1 /*======================================================================*
2 * Copyright (c) 2008, Yahoo! Inc. All rights reserved. *
3 * Copyright (c) 2010-2016, OpenX Inc. All rights reserved. *
4 * *
5 * Licensed under the New BSD License (the "License"); you may not use *
6 * this file except in compliance with the License. Unless required *
7 * by applicable law or agreed to in writing, software distributed *
8 * under the License is distributed on an "AS IS" BASIS, WITHOUT *
9 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
10 * See the License for the specific language governing permissions and *
11 * limitations under the License. See accompanying LICENSE file. *
12 *======================================================================*/
14 #include "config.h"
16 #include "queue.h"
17 #include "queue_mqueue.h"
19 #include "perror.h"
20 #include "opt.h"
21 #include "sig.h"
23 #if defined(HAVE_MQUEUE_H)
25 #include <sys/types.h> /* Must be included before mqueue.h on FreeBSD 4.9. */
26 #include <mqueue.h>
27 #include <sys/time.h>
28 #include <sys/resource.h>
29 #include <fcntl.h>
30 #include <limits.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <time.h>
36 /* The following is for Solaris. */
37 #if !defined(MQ_PRIO_MAX) && defined(_POSIX_MQ_PRIO_MAX)
38 #define MQ_PRIO_MAX _POSIX_MQ_PRIO_MAX
39 #endif
41 struct priv {
42 char* path;
43 mqd_t mq;
44 size_t max_sz;
45 size_t max_cnt;
48 static void destructor (struct queue* this_queue)
50 struct priv* ppriv = (struct priv*)this_queue->priv;
52 free (ppriv->path);
53 free (ppriv);
55 this_queue->vtbl = 0;
56 this_queue->priv = 0;
59 static int xopen (struct queue* this_queue, int flags)
61 struct priv* ppriv = (struct priv*)this_queue->priv;
63 if ( (mqd_t)-1 == ppriv->mq )
65 struct mq_attr attr;
66 memset(&attr, 0, sizeof(attr));
67 attr.mq_maxmsg = ppriv->max_cnt;
68 attr.mq_msgsize = ppriv->max_sz;
70 errno = 0 ;
71 /* see http://www.die.net/doc/linux/man/man7/mq_overview.7.html */
72 if ( (mqd_t)-1 == (ppriv->mq = mq_open (ppriv->path,
73 flags | O_CREAT, 0666, &attr)) )
75 /* Try resetting the mqueue resource parameters before formally
76 * failing see http://www.die.net/doc/linux/man/man2/getrlimit.2.html
78 struct rlimit rlim;
79 PERROR("mq_open");
80 errno = 0;
81 attr.mq_maxmsg += 1;
82 rlim.rlim_cur = RLIM_INFINITY; /* attr.mq_maxmsg * sizeof(struct msg_msg *) + attr.mq_maxmsg * attr.mq_msgsize ; */
83 rlim.rlim_max = RLIM_INFINITY ; /*rlim.rlim_cur ; */
84 if ( setrlimit (RLIMIT_MSGQUEUE, &rlim) )
86 PERROR("setrlimit");
88 attr.mq_maxmsg -= 1 ;
90 if ( (mqd_t)-1 == (ppriv->mq = mq_open (ppriv->path,
91 flags | O_CREAT,
92 0666, &attr)) )
94 LOG_ER("mq_open(path=\"%s\", flags=0x%04x, 0666, "
95 "{attr.mq_maxmsg=%i, attr.mq_msgsize=%i})\n",
96 ppriv->path, flags | O_CREAT, attr.mq_maxmsg,
97 attr.mq_msgsize);
98 return -1;
102 return 0;
105 static int xclose (struct queue* this_queue)
107 struct priv* ppriv = (struct priv*)this_queue->priv;
109 if ( (mqd_t)-1 != ppriv->mq )
111 if ( (mq_close (ppriv->mq)) < 0 )
113 PERROR("mq_close");
114 return -1;
118 ppriv->mq = (mqd_t)-1;
120 return 0;
123 static int xread (struct queue* this_queue, void* buf,
124 size_t count, int* pending)
126 struct priv* ppriv = (struct priv*)this_queue->priv;
127 struct mq_attr attr;
128 int mq_rec_rtrn;
129 unsigned int pri;
131 if ( 0 == buf )
133 LOG_ER("queue read with NULL buf pointer\n");
134 return -1;
136 if ( (mqd_t)-1 == ppriv->mq )
138 LOG_ER("queue read with queue closed.\n");
139 return -1;
142 LOG_PROG("about to call mq_receive(). gbl_done=%d\n", gbl_done);
144 /* use blocking reads unless we're shutting down. */
145 struct timespec time_buf = { time(NULL), arg_wakup_interval_ms * 1000000 };
146 mq_rec_rtrn = mq_timedreceive (ppriv->mq, buf, count, &pri, &time_buf);
148 if (mq_rec_rtrn < 0 )
150 LOG_PROG("errno: %d %s\n", errno, strerror(errno));
151 switch ( errno )
153 default:
154 PERROR("mq_receive");
156 /* If we've been interrupted it's likely that we're shutting
157 down, so no need to print errors. */
158 case ETIMEDOUT:
159 case EINTR:
160 return mq_rec_rtrn;
164 if ( MQ_PRIO_MAX - 1 != pri )
166 LOG_ER("queue read returned message with unrecognized "
167 "priority (pri=%d).\n", pri);
170 LOG_PROG("mq_receive() returned %d.\n", mq_rec_rtrn);
172 if ( mq_getattr (ppriv->mq, &attr) < 0 )
174 PERROR("mq_getattr");
175 return -1;
178 *pending = attr.mq_curmsgs;
180 return mq_rec_rtrn;
183 static int xwrite (struct queue* this_queue, const void* buf, size_t count)
185 struct priv* ppriv = (struct priv*)this_queue->priv;
187 if ( 0 == buf )
189 LOG_ER("queue write with NULL buf pointer\n");
190 return -1;
192 if ( (mqd_t)-1 == ppriv->mq )
194 LOG_ER("queue write with queue closed.\n");
195 return -1;
198 LOG_PROG("about to call mq_send().\n");
200 if ( 0 != mq_send (ppriv->mq, buf, count, MQ_PRIO_MAX-1) )
202 PERROR("mq_send");
203 return -1;
206 return 0;
209 static void* alloc (struct queue* this_queue, size_t* newcount)
211 void *data = malloc (*newcount = ((struct priv*)this_queue->priv)->max_sz);
212 memset(data, 0, *newcount);
213 return data;
216 static void dealloc (struct queue* this_queue, void* buf)
218 (void) this_queue; /* appease -Wall -Werror */
219 free (buf);
222 int queue_mqueue_ctor (struct queue* this_queue,
223 const char* path,
224 size_t max_sz,
225 size_t max_cnt)
227 static struct queue_vtbl vtbl = {
228 destructor,
229 xopen, xclose,
230 xread, xwrite,
231 alloc, dealloc
234 struct priv* ppriv;
236 this_queue->vtbl = 0;
237 this_queue->priv = 0;
239 ppriv = (struct priv*)malloc(sizeof(struct priv));
240 if ( 0 == ppriv )
242 LOG_ER("malloc failed attempting to allocate %d bytes\n",
243 sizeof(*ppriv));
244 return -1;
246 memset(ppriv, 0, sizeof(*ppriv));
248 if ( 0 == (ppriv->path = strdup(path)) )
250 LOG_ER("strdup failed attempting to dup \"%s\"\n",
251 path);
252 free(ppriv);
253 return -1;
256 ppriv->mq = (mqd_t)-1;
257 ppriv->max_sz = max_sz;
258 ppriv->max_cnt = max_cnt;
260 this_queue->vtbl = &vtbl;
261 this_queue->priv = ppriv;
263 return 0;
266 #else /* if defined(HAVE_MQUEUE_H) */
268 int queue_mqueue_ctor (struct queue* this_queue,
269 const char* path,
270 size_t max_sz,
271 size_t max_cnt)
273 this_queue->vtbl = 0;
274 this_queue->priv = 0;
275 (void)path; /* appease -Wall -Werror */
276 (void)max_sz; /* appease -Wall -Werror */
277 (void)max_cnt; /* appease -Wall -Werror */
279 return -1;
282 #endif /* HAVE_MQUEUE_H */