1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
24 #include "queue_msg.h"
35 #include <sys/types.h>
48 #define MSG_TYPE 1 /* Must be positive non-zero. */
51 long mtype
; /* Type of received/sent message. */
52 char mtext
[1]; /* Text of the message. */
56 static void destructor (struct queue
* this_queue
)
58 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
60 if ( -1 != ppriv
->mq
)
62 msgctl (ppriv
->mq
, IPC_RMID
, NULL
); /* We ignore errors at this point. */
72 /* This function will increase the maximum message size limit on Linux
73 * systems. It will silently do nothing on other systems.
76 static void set_max_msg_size (size_t max_sz
)
80 FILE* fp
= fopen("/proc/sys/kernel/msgmax", "r+");
86 fscanf (fp
, "%ld", &old_max
);
88 if ( max_sz
> old_max
) {
90 fprintf (fp
, "%ld", max_sz
);
96 static int xopen (struct queue
* this_queue
, int flags
)
98 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
102 if ( -1 != ppriv
->mq
)
104 LOG_ER("Queue already open.\n");
108 /* Get a handle to our message queue. */
109 mq
= msgget (ppriv
->key
, 0666 | IPC_CREAT
);
116 /* Set the queue size in bytes. */
117 set_max_msg_size (ppriv
->max_sz
);
119 memset (&ds
, 0, sizeof(ds
));
120 ds
.msg_qbytes
= ppriv
->max_sz
* ppriv
->max_cnt
;
121 if ( msgctl (mq
, IPC_SET
, &ds
) )
123 if ( EPERM
== errno
)
125 LOG_ER("Permission denied attempting to set the queue "
126 "size using msgctl() -- you may need to be root.\n");
136 ppriv
->flags
= flags
;
141 static int xclose (struct queue
* this_queue
)
143 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
145 ppriv
->mq
= -1; /* No additional close operation required. */
149 static int xread (struct queue
* this_queue
, void* buf
,
150 size_t count
, int* pending
)
152 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
154 struct local_msgbuf
* mp
;
158 if ( O_WRONLY
== ppriv
->flags
)
160 LOG_ER("Queue read attempted without appropriate permission.\n");
165 LOG_ER("Queue read with NULL buf pointer\n");
168 if ( -1 == ppriv
->mq
)
170 LOG_ER("Queue read with queue closed.\n");
173 if ( count
> ppriv
->max_sz
)
175 LOG_ER("Queue read with count (%d) > max_sz (%d).\n",
176 count
, ppriv
->max_sz
);
180 mp
= (struct local_msgbuf
*)(((char*)buf
) - sizeof(mp
->mtype
));
183 if ( (msgrcv_ret
= msgrcv(ppriv
->mq
, mp
, count
, MSG_TYPE
, 0)) < 0 )
190 /* If we've been interrupted or if the message queue was
191 * removed, it's likely that we're shutting down, so no need to
199 if ( MSG_TYPE
!= mp
->mtype
)
201 LOG_WARN("Unrecognized message type %d.\n", mp
->mtype
);
204 if ( msgctl(ppriv
->mq
, IPC_STAT
, &ds
) )
206 PERROR("msgctl(ppriv->mq, IPC_GET, &ds)");
210 *pending
= ds
.msg_qnum
;
215 static int xwrite (struct queue
* this_queue
, const void* buf
, size_t count
)
218 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
220 struct local_msgbuf
* mp
;
223 if ( O_RDONLY
== ppriv
->flags
)
225 LOG_ER("Queue write attempted without appropriate permission.\n");
230 LOG_ER("Queue write with NULL buf pointer.\n");
233 if ( -1 == ppriv
->mq
)
235 LOG_ER("Queue write with queue closed.\n");
238 if ( count
> ppriv
->max_sz
)
240 LOG_ER("Queue write with count (%d) > max_sz (%d).\n",
241 count
, ppriv
->max_sz
);
245 mp
= (struct local_msgbuf
*)(((char*)buf
) - sizeof(mp
->mtype
));
247 mp
->mtype
= MSG_TYPE
; /* Must be positive non-zero. */
249 while ( ((msgsnd_ret
= msgsnd(ppriv
->mq
, mp
, count
, IPC_NOWAIT
)) < 0)
252 /* We'll try msgsnd a few times, since with IPC_NOWAIT we might
253 * get a transient error if the queue is full.
259 LOG_ER("Queue full, will now retry msgsnd.\n");
271 static void* alloc (struct queue
* this_queue
, size_t* newcount
)
273 struct local_msgbuf
* mp
;
274 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
275 char* ret
= (char*)malloc(ppriv
->max_sz
+ sizeof(mp
->mtype
));
279 *newcount
= ppriv
->max_sz
;
280 return ret
+ sizeof(mp
->mtype
);
286 static void dealloc (struct queue
* this_queue
, void* buf
)
288 struct local_msgbuf
* mp
=
289 (struct local_msgbuf
*)((char*)buf
- sizeof(mp
->mtype
));
290 (void)this_queue
; /* appease -Wall -Werror */
294 int queue_msg_ctor (struct queue
* this_queue
,
299 static struct queue_vtbl vtbl
= {
308 this_queue
->vtbl
= 0;
309 this_queue
->priv
= 0;
311 ppriv
= (struct priv
*)malloc (sizeof(struct priv
));
314 LOG_ER("Failed to allocate %d bytes for queue data.\n", sizeof(*ppriv
));
317 memset(ppriv
, 0, sizeof(*ppriv
));
319 if ( 0 == (ppriv
->path
= strdup(path
)) )
321 LOG_ER("Strdup failed attempting to dup \"%s\".\n", path
);
326 /* We try to convert the path into an appropriate numeric key to
327 identify this queue. */
328 if ( sscanf(path
, "%d", (int*)&ppriv
->key
) != 1 )
330 /* A default key is provided. */
335 ppriv
->max_sz
= max_sz
;
336 ppriv
->max_cnt
= max_cnt
;
338 this_queue
->vtbl
= &vtbl
;
339 this_queue
->priv
= ppriv
;
344 #else /* if HAVE_SYS_MSG_H */
346 int queue_msg_ctor (struct queue
* this_queue
,
351 this_queue
->vtbl
= 0;
352 this_queue
->priv
= 0;
356 #endif /* HAVE_SYS_MSG_H */