1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
24 #include "queue_mqueue.h"
29 #if defined(HAVE_MQUEUE_H)
31 #include <sys/types.h> /* Must be included before mqueue.h on FreeBSD 4.9. */
34 #include <sys/resource.h>
41 /* The following is for Solaris. */
42 #if !defined(MQ_PRIO_MAX) && defined(_POSIX_MQ_PRIO_MAX)
43 #define MQ_PRIO_MAX _POSIX_MQ_PRIO_MAX
/* Destructor entry for an mqueue-backed queue.
 * Begins by fetching the backend's private state from this_queue->priv
 * (the pointer installed by queue_mqueue_ctor). */
53 static void destructor (struct queue
* this_queue
)
55 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
/* Open the POSIX message queue behind this_queue.
 *
 * this_queue: queue whose priv member points at this backend's struct priv.
 * flags:      open flags handed to mq_open(); O_CREAT is always OR-ed in.
 *
 * Work is only attempted while the stored descriptor is (mqd_t)-1, i.e.
 * the queue is not currently open.  The queue attributes are taken from
 * the private state: mq_maxmsg from max_cnt and mq_msgsize from max_sz.
 * When the first mq_open() fails, the RLIMIT_MSGQUEUE resource limit is
 * set to RLIM_INFINITY via setrlimit() and mq_open() is called again.
 * The LOG_ER call reports the arguments that were used for mq_open(). */
64 static int xopen (struct queue
* this_queue
, int flags
)
66 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
/* (mqd_t)-1 marks "no descriptor"; the constructor and xclose store it. */
68 if ( (mqd_t
)-1 == ppriv
->mq
)
/* Start from an all-zero attribute block, then set capacity limits. */
71 memset(&attr
, 0, sizeof(attr
));
72 attr
.mq_maxmsg
= ppriv
->max_cnt
;
73 attr
.mq_msgsize
= ppriv
->max_sz
;
76 /* see http://www.die.net/doc/linux/man/man7/mq_overview.7.html */
/* First attempt: the new descriptor is stored in ppriv->mq and compared
 * against (mqd_t)-1 in the same expression. */
77 if ( (mqd_t
)-1 == (ppriv
->mq
= mq_open (ppriv
->path
,
78 flags
| O_CREAT
, 0666, &attr
)) )
80 /* Try resetting the mqueue resource parameters before formally
81 * failing see http://www.die.net/doc/linux/man/man2/getrlimit.2.html
87 rlim
.rlim_cur
= RLIM_INFINITY
; /* attr.mq_maxmsg * sizeof(struct msg_msg *) + attr.mq_maxmsg * attr.mq_msgsize ; */
88 rlim
.rlim_max
= RLIM_INFINITY
; /*rlim.rlim_cur ; */
/* setrlimit() returns non-zero on failure. */
89 if ( setrlimit (RLIMIT_MSGQUEUE
, &rlim
) )
/* Second mq_open() attempt, made after the limit change. */
95 if ( (mqd_t
)-1 == (ppriv
->mq
= mq_open (ppriv
->path
,
99 LOG_ER("mq_open(path=\"%s\", flags=0x%04x, 0666, "
100 "{attr.mq_maxmsg=%i, attr.mq_msgsize=%i})\n",
101 ppriv
->path
, flags
| O_CREAT
, attr
.mq_maxmsg
,
/* Close the POSIX message queue behind this_queue.
 *
 * mq_close() is only called when the stored descriptor differs from
 * (mqd_t)-1, so closing a queue that is not open makes no mq_close()
 * call.  A negative mq_close() result is treated as an error.  The
 * stored descriptor is set back to (mqd_t)-1, which is the value xopen,
 * xread and xwrite test for to recognise a closed queue. */
111 static int xclose (struct queue
* this_queue
)
113 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
/* Only act on a descriptor that is actually open. */
115 if ( (mqd_t
)-1 != ppriv
->mq
)
117 if ( (mq_close (ppriv
->mq
)) < 0 )
/* Mark the queue as closed. */
124 ppriv
->mq
= (mqd_t
)-1;
/* Receive one message from the POSIX message queue behind this_queue.
 *
 * this_queue: queue whose priv member points at this backend's struct priv.
 * buf:        destination buffer passed to mq_receive(); a NULL buf is
 *             reported through LOG_ER.
 * count:      size of buf in bytes, passed to mq_receive().
 * pending:    receives attr.mq_curmsgs as reported by mq_getattr() after
 *             the receive.
 *
 * Reading while the stored descriptor is (mqd_t)-1 is reported as
 * "queue closed".  A received message whose priority is not
 * MQ_PRIO_MAX - 1 (the priority xwrite sends with) is logged as
 * unrecognized. */
129 static int xread (struct queue
* this_queue
, void* buf
,
130 size_t count
, int* pending
)
132 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
139 LOG_ER("queue read with NULL buf pointer\n");
/* (mqd_t)-1 means the queue has not been opened or was closed. */
142 if ( (mqd_t
)-1 == ppriv
->mq
)
144 LOG_ER("queue read with queue closed.\n");
148 LOG_PROG("about to call mq_receive().\n");
/* mq_receive() result is kept in mq_rec_rtrn; the message priority is
 * written to pri.  A negative result signals failure. */
150 if ( (mq_rec_rtrn
= mq_receive (ppriv
->mq
, buf
, count
, &pri
)) < 0 )
155 PERROR("mq_receive");
157 /* If we've been interrupted it's likely that we're shutting
158 down, so no need to print errors. */
/* Every message written by xwrite carries priority MQ_PRIO_MAX - 1. */
164 if ( MQ_PRIO_MAX
- 1 != pri
)
166 LOG_ER("queue read returned message with unrecognized "
167 "priority (pri=%d).\n", pri
);
170 LOG_PROG("mq_receive() returned %d.\n", mq_rec_rtrn
);
/* Query the queue attributes to learn how many messages remain. */
172 if ( mq_getattr (ppriv
->mq
, &attr
) < 0 )
174 PERROR("mq_getattr");
178 *pending
= attr
.mq_curmsgs
;
/* Send one message to the POSIX message queue behind this_queue.
 *
 * this_queue: queue whose priv member points at this backend's struct priv.
 * buf:        message bytes passed to mq_send(); a NULL buf is reported
 *             through LOG_ER.
 * count:      message length in bytes, passed to mq_send().
 *
 * Writing while the stored descriptor is (mqd_t)-1 is reported as
 * "queue closed".  Messages are always sent with priority
 * MQ_PRIO_MAX - 1, the same value xread checks received messages
 * against.  A non-zero mq_send() result is treated as failure. */
183 static int xwrite (struct queue
* this_queue
, const void* buf
, size_t count
)
185 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
189 LOG_ER("queue write with NULL buf pointer\n");
/* (mqd_t)-1 means the queue has not been opened or was closed. */
192 if ( (mqd_t
)-1 == ppriv
->mq
)
194 LOG_ER("queue write with queue closed.\n");
198 LOG_PROG("about to call mq_send().\n");
200 if ( 0 != mq_send (ppriv
->mq
, buf
, count
, MQ_PRIO_MAX
-1) )
/* Allocate a message buffer for this queue.
 *
 * this_queue: queue whose priv member points at this backend's struct priv.
 * newcount:   out parameter; set to the configured maximum message size
 *             (max_sz), which is also the number of bytes requested.
 *
 * Returns the malloc() result unchanged, so it is NULL when the
 * allocation fails. */
209 static void* alloc (struct queue
* this_queue
, size_t* newcount
)
211 return malloc (*newcount
= ((struct priv
*)this_queue
->priv
)->max_sz
);
/* Counterpart to alloc() for a message buffer.
 *
 * this_queue: not used; it is cast to void only to silence the
 *             unused-parameter warning.
 * buf:        the buffer being handed back.
 *
 * NOTE(review): presumably buf is released with free() to match the
 * malloc() in alloc() -- confirm against the full function body. */
214 static void dealloc (struct queue
* this_queue
, void* buf
)
216 (void) this_queue
; /* appease -Wall -Werror */
/* Construct an mqueue-backed queue in this_queue (HAVE_MQUEUE_H build).
 *
 * The body uses three further inputs -- path, max_sz and max_cnt -- in
 * addition to this_queue:
 *   path     name handed to mq_open() by xopen; duplicated with strdup()
 *   max_sz   stored in priv; xopen uses it for mq_msgsize and alloc()
 *            uses it for the buffer size
 *   max_cnt  stored in priv; xopen uses it for mq_maxmsg
 *
 * this_queue->vtbl and this_queue->priv are cleared first and are only
 * assigned at the end, after the private state has been allocated and
 * filled in.  The message queue itself is not opened here: the
 * descriptor starts out as (mqd_t)-1. */
220 int queue_mqueue_ctor (struct queue
* this_queue
,
/* One function table shared by every queue built by this constructor. */
225 static struct queue_vtbl vtbl
= {
/* Leave the queue in a cleared state until construction has succeeded. */
234 this_queue
->vtbl
= 0;
235 this_queue
->priv
= 0;
237 ppriv
= (struct priv
*)malloc(sizeof(struct priv
));
240 LOG_ER("malloc failed attempting to allocate %d bytes\n",
244 memset(ppriv
, 0, sizeof(*ppriv
));
/* Keep a private copy of the queue name; strdup() yields 0 on failure. */
246 if ( 0 == (ppriv
->path
= strdup(path
)) )
248 LOG_ER("strdup failed attempting to dup \"%s\"\n",
/* (mqd_t)-1 is the "not open" marker tested by xopen/xread/xwrite. */
254 ppriv
->mq
= (mqd_t
)-1;
255 ppriv
->max_sz
= max_sz
;
256 ppriv
->max_cnt
= max_cnt
;
/* Publish the function table and private state on the queue object. */
258 this_queue
->vtbl
= &vtbl
;
259 this_queue
->priv
= ppriv
;
264 #else /* if defined(HAVE_MQUEUE_H) */
/* Fallback constructor compiled when HAVE_MQUEUE_H is not defined.
 *
 * No queue is built: this_queue->vtbl and this_queue->priv are set to 0,
 * and path, max_sz and max_cnt are deliberately left unused (each is
 * cast to void to keep -Wall -Werror builds clean). */
266 int queue_mqueue_ctor (struct queue
* this_queue
,
271 this_queue
->vtbl
= 0;
272 this_queue
->priv
= 0;
273 (void)path
; /* appease -Wall -Werror */
274 (void)max_sz
; /* appease -Wall -Werror */
275 (void)max_cnt
; /* appease -Wall -Werror */
280 #endif /* HAVE_MQUEUE_H */