Must use (void) instead of () for function declarations.
[lwes-journaller.git] / src / queue_msg.c
blob97c6d234049dd21ca41669b2c5c6e11ddee5f25e
1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
4 * *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
14 * *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
21 #include "config.h"
23 #include "queue.h"
24 #include "queue_msg.h"
26 #include "perror.h"
28 #if HAVE_SYS_MSG_H
30 #include <fcntl.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
35 #include <sys/types.h>
36 #include <sys/ipc.h>
37 #include <sys/msg.h>
/* Private per-queue state for the System V message-queue backend. */
struct priv
{
  char   *path;     /* Heap copy of the path given to the constructor. */
  key_t   key;      /* IPC key passed to msgget(). */
  int     mq;       /* Message queue id; -1 while the queue is not open. */
  size_t  max_sz;   /* Largest payload, in bytes, accepted per message. */
  size_t  max_cnt;  /* Multiplied by max_sz to size the queue in bytes. */
  int     flags;    /* Open flags; compared with O_RDONLY / O_WRONLY. */
};
/* The single message type used on the queue.  msgsnd() requires the
 * type to be a positive, non-zero value. */
#define MSG_TYPE 1

/* Layout handed to msgsnd()/msgrcv(): the message type immediately
 * followed by the payload.  Buffers given out by this backend point at
 * the payload, with room for mtype reserved just in front of it. */
struct local_msgbuf
{
  long mtype;     /* Type of the received/sent message. */
  char mtext[1];  /* First byte of the message payload. */
};
56 static void destructor (struct queue* this_queue)
58 struct priv* ppriv = (struct priv*)this_queue->priv;
60 if ( -1 != ppriv->mq )
62 msgctl (ppriv->mq, IPC_RMID, NULL); /* We ignore errors at this point. */
65 free (ppriv->path);
66 free (ppriv);
68 this_queue->vtbl = 0;
69 this_queue->priv = 0;
/* Raise the maximum message size limit on Linux systems by rewriting
 * /proc/sys/kernel/msgmax so that it is at least max_sz bytes.
 *
 * This is best effort: if the file cannot be opened for update the
 * function silently does nothing (as on other systems).  The limit is
 * never lowered.
 */
static void set_max_msg_size (size_t max_sz)
{
  unsigned long old_max = 0;

  FILE* fp = fopen("/proc/sys/kernel/msgmax", "r+");
  if ( NULL == fp )
    {
      return;
    }

  /* Read into an unsigned long with the matching "%lu" conversion;
   * scanning "%ld" directly into a size_t is undefined behaviour. */
  if ( 1 != fscanf (fp, "%lu", &old_max) )
    {
      /* Nothing parsable in the file: treat the current limit as zero. */
      old_max = 0;
    }

  if ( max_sz > old_max )
    {
      /* Reposition to the start before switching from reading to writing. */
      rewind (fp);
      fprintf (fp, "%lu", (unsigned long)max_sz);
    }

  fclose (fp);
}
96 static int xopen (struct queue* this_queue, int flags)
98 struct priv* ppriv = (struct priv*)this_queue->priv;
99 struct msqid_ds ds;
100 int mq;
102 if ( -1 != ppriv->mq )
104 LOG_ER("Queue already open.\n");
105 return -1;
108 /* Get a handle to our message queue. */
109 mq = msgget (ppriv->key, 0666 | IPC_CREAT);
110 if ( -1 == mq )
112 PERROR("msgget()");
113 return -1;
116 /* Set the queue size in bytes. */
117 set_max_msg_size (ppriv->max_sz);
119 memset (&ds, 0, sizeof(ds));
120 ds.msg_qbytes = ppriv->max_sz * ppriv->max_cnt;
121 if ( msgctl (mq, IPC_SET, &ds) )
123 if ( EPERM == errno )
125 LOG_ER("Permission denied attempting to set the queue "
126 "size using msgctl() -- you may need to be root.\n");
128 else
130 PERROR("msgctl()");
131 return -1;
135 ppriv->mq = mq;
136 ppriv->flags = flags;
138 return 0;
141 static int xclose (struct queue* this_queue)
143 struct priv* ppriv = (struct priv*)this_queue->priv;
145 ppriv->mq = -1; /* No additional close operation required. */
146 return 0;
149 static int xread (struct queue* this_queue, void* buf,
150 size_t count, int* pending)
152 struct priv* ppriv = (struct priv*)this_queue->priv;
154 struct local_msgbuf* mp;
155 struct msqid_ds ds;
156 int msgrcv_ret;
158 if ( O_WRONLY == ppriv->flags )
160 LOG_ER("Queue read attempted without appropriate permission.\n");
161 return -1;
163 if ( 0 == buf )
165 LOG_ER("Queue read with NULL buf pointer\n");
166 return -1;
168 if ( -1 == ppriv->mq )
170 LOG_ER("Queue read with queue closed.\n");
171 return -1;
173 if ( count > ppriv->max_sz )
175 LOG_ER("Queue read with count (%d) > max_sz (%d).\n",
176 count, ppriv->max_sz);
177 return -1;
180 mp = (struct local_msgbuf*)(((char*)buf) - sizeof(mp->mtype));
182 /* Receive bytes. */
183 if ( (msgrcv_ret = msgrcv(ppriv->mq, mp, count, MSG_TYPE, 0)) < 0 )
185 switch ( errno )
187 default:
188 PERROR("msgrcv");
190 /* If we've been interrupted or if the message queue was
191 * removed, it's likely that we're shutting down, so no need to
192 * print errors. */
193 case EIDRM:
194 case EINTR:
195 return msgrcv_ret;
199 if ( MSG_TYPE != mp->mtype )
201 LOG_WARN("Unrecognized message type %d.\n", mp->mtype);
204 if ( msgctl(ppriv->mq, IPC_STAT, &ds) )
206 PERROR("msgctl(ppriv->mq, IPC_GET, &ds)");
207 return -1;
210 *pending = ds.msg_qnum;
212 return msgrcv_ret;
215 static int xwrite (struct queue* this_queue, const void* buf, size_t count)
217 int retry = 99;
218 struct priv* ppriv = (struct priv*)this_queue->priv;
220 struct local_msgbuf* mp;
221 int msgsnd_ret;
223 if ( O_RDONLY == ppriv->flags )
225 LOG_ER("Queue write attempted without appropriate permission.\n");
226 return -1;
228 if ( 0 == buf )
230 LOG_ER("Queue write with NULL buf pointer.\n");
231 return -1;
233 if ( -1 == ppriv->mq )
235 LOG_ER("Queue write with queue closed.\n");
236 return -1;
238 if ( count > ppriv->max_sz )
240 LOG_ER("Queue write with count (%d) > max_sz (%d).\n",
241 count, ppriv->max_sz);
242 return -1;
245 mp = (struct local_msgbuf*)(((char*)buf) - sizeof(mp->mtype));
247 mp->mtype = MSG_TYPE; /* Must be positive non-zero. */
249 while ( ((msgsnd_ret = msgsnd(ppriv->mq, mp, count, IPC_NOWAIT)) < 0)
250 && retry )
252 /* We'll try msgsnd a few times, since with IPC_NOWAIT we might
253 * get a transient error if the queue is full.
255 switch (errno )
257 case EAGAIN:
258 --retry;
259 LOG_ER("Queue full, will now retry msgsnd.\n");
260 break;
262 default:
263 PERROR("msgsnd");
264 return msgsnd_ret;
268 return msgsnd_ret;
271 static void* alloc (struct queue* this_queue, size_t* newcount)
273 struct local_msgbuf* mp;
274 struct priv* ppriv = (struct priv*)this_queue->priv;
275 char* ret = (char*)malloc(ppriv->max_sz + sizeof(mp->mtype));
277 if ( ret )
279 *newcount = ppriv->max_sz;
280 return ret + sizeof(mp->mtype);
283 return 0;
286 static void dealloc (struct queue* this_queue, void* buf)
288 struct local_msgbuf* mp =
289 (struct local_msgbuf*)((char*)buf - sizeof(mp->mtype));
290 (void)this_queue; /* appease -Wall -Werror */
291 free(mp);
294 int queue_msg_ctor (struct queue* this_queue,
295 const char* path,
296 size_t max_sz,
297 size_t max_cnt)
299 static struct queue_vtbl vtbl = {
300 destructor,
301 xopen, xclose,
302 xread, xwrite,
303 alloc, dealloc
306 struct priv* ppriv;
308 this_queue->vtbl = 0;
309 this_queue->priv = 0;
311 ppriv = (struct priv*)malloc (sizeof(struct priv));
312 if ( 0 == ppriv )
314 LOG_ER("Failed to allocate %d bytes for queue data.\n", sizeof(*ppriv));
315 return -1;
317 memset(ppriv, 0, sizeof(*ppriv));
319 if ( 0 == (ppriv->path = strdup(path)) )
321 LOG_ER("Strdup failed attempting to dup \"%s\".\n", path);
322 free(ppriv);
323 return -1;
326 /* We try to convert the path into an appropriate numeric key to
327 identify this queue. */
328 if ( sscanf(path, "%d", (int*)&ppriv->key) != 1 )
330 /* A default key is provided. */
331 ppriv->key = 1;
334 ppriv->mq = -1;
335 ppriv->max_sz = max_sz;
336 ppriv->max_cnt = max_cnt;
338 this_queue->vtbl = &vtbl;
339 this_queue->priv = ppriv;
341 return 0;
344 #else /* if HAVE_SYS_MSG_H */
346 int queue_msg_ctor (struct queue* this_queue,
347 const char* path,
348 size_t max_sz,
349 size_t max_cnt)
351 this_queue->vtbl = 0;
352 this_queue->priv = 0;
353 return -1;
356 #endif /* HAVE_SYS_MSG_H */