1 /*======================================================================*
2 * Copyright (C) 2008 Light Weight Event System *
3 * All rights reserved. *
5 * This program is free software; you can redistribute it and/or modify *
6 * it under the terms of the GNU General Public License as published by *
7 * the Free Software Foundation; either version 2 of the License, or *
8 * (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software *
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, *
18 * Boston, MA 02110-1301 USA. *
19 *======================================================================*/
24 #include "queue_mqueue.h"
30 #if defined(HAVE_MQUEUE_H)
32 #include <sys/types.h> /* Must be included before mqueue.h on FreeBSD 4.9. */
35 #include <sys/resource.h>
43 /* The following is for Solaris. */
44 #if !defined(MQ_PRIO_MAX) && defined(_POSIX_MQ_PRIO_MAX)
45 #define MQ_PRIO_MAX _POSIX_MQ_PRIO_MAX
55 static void destructor (struct queue
* this_queue
)
57 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
66 static int xopen (struct queue
* this_queue
, int flags
)
68 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
70 if ( (mqd_t
)-1 == ppriv
->mq
)
73 memset(&attr
, 0, sizeof(attr
));
74 attr
.mq_maxmsg
= ppriv
->max_cnt
;
75 attr
.mq_msgsize
= ppriv
->max_sz
;
78 /* see http://www.die.net/doc/linux/man/man7/mq_overview.7.html */
79 if ( (mqd_t
)-1 == (ppriv
->mq
= mq_open (ppriv
->path
,
80 flags
| O_CREAT
, 0666, &attr
)) )
82 /* Try resetting the mqueue resource parameters before formally
83 * failing see http://www.die.net/doc/linux/man/man2/getrlimit.2.html
89 rlim
.rlim_cur
= RLIM_INFINITY
; /* attr.mq_maxmsg * sizeof(struct msg_msg *) + attr.mq_maxmsg * attr.mq_msgsize ; */
90 rlim
.rlim_max
= RLIM_INFINITY
; /*rlim.rlim_cur ; */
91 if ( setrlimit (RLIMIT_MSGQUEUE
, &rlim
) )
97 if ( (mqd_t
)-1 == (ppriv
->mq
= mq_open (ppriv
->path
,
101 LOG_ER("mq_open(path=\"%s\", flags=0x%04x, 0666, "
102 "{attr.mq_maxmsg=%i, attr.mq_msgsize=%i})\n",
103 ppriv
->path
, flags
| O_CREAT
, attr
.mq_maxmsg
,
113 static int xclose (struct queue
* this_queue
)
115 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
117 if ( (mqd_t
)-1 != ppriv
->mq
)
119 if ( (mq_close (ppriv
->mq
)) < 0 )
126 ppriv
->mq
= (mqd_t
)-1;
131 static int xread (struct queue
* this_queue
, void* buf
,
132 size_t count
, int* pending
)
134 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
141 LOG_ER("queue read with NULL buf pointer\n");
144 if ( (mqd_t
)-1 == ppriv
->mq
)
146 LOG_ER("queue read with queue closed.\n");
150 LOG_PROG("about to call mq_receive(). gbl_done=%d\n", gbl_done
);
152 /* use blocking reads unless we're shutting down. */
155 struct timespec time_buf
= { time(NULL
)+1, 500000000 };
156 mq_rec_rtrn
= mq_timedreceive (ppriv
->mq
, buf
, count
, &pri
, &time_buf
);
160 mq_rec_rtrn
= mq_receive (ppriv
->mq
, buf
, count
, &pri
);
163 if (mq_rec_rtrn
< 0 )
165 LOG_PROG("errno: %d %s\n", errno
, strerror(errno
));
169 PERROR("mq_receive");
171 /* If we've been interrupted it's likely that we're shutting
172 down, so no need to print errors. */
179 if ( MQ_PRIO_MAX
- 1 != pri
)
181 LOG_ER("queue read returned message with unrecognized "
182 "priority (pri=%d).\n", pri
);
185 LOG_PROG("mq_receive() returned %d.\n", mq_rec_rtrn
);
187 if ( mq_getattr (ppriv
->mq
, &attr
) < 0 )
189 PERROR("mq_getattr");
193 *pending
= attr
.mq_curmsgs
;
198 static int xwrite (struct queue
* this_queue
, const void* buf
, size_t count
)
200 struct priv
* ppriv
= (struct priv
*)this_queue
->priv
;
204 LOG_ER("queue write with NULL buf pointer\n");
207 if ( (mqd_t
)-1 == ppriv
->mq
)
209 LOG_ER("queue write with queue closed.\n");
213 LOG_PROG("about to call mq_send().\n");
215 if ( 0 != mq_send (ppriv
->mq
, buf
, count
, MQ_PRIO_MAX
-1) )
224 static void* alloc (struct queue
* this_queue
, size_t* newcount
)
226 return malloc (*newcount
= ((struct priv
*)this_queue
->priv
)->max_sz
);
229 static void dealloc (struct queue
* this_queue
, void* buf
)
231 (void) this_queue
; /* appease -Wall -Werror */
235 int queue_mqueue_ctor (struct queue
* this_queue
,
240 static struct queue_vtbl vtbl
= {
249 this_queue
->vtbl
= 0;
250 this_queue
->priv
= 0;
252 ppriv
= (struct priv
*)malloc(sizeof(struct priv
));
255 LOG_ER("malloc failed attempting to allocate %d bytes\n",
259 memset(ppriv
, 0, sizeof(*ppriv
));
261 if ( 0 == (ppriv
->path
= strdup(path
)) )
263 LOG_ER("strdup failed attempting to dup \"%s\"\n",
269 ppriv
->mq
= (mqd_t
)-1;
270 ppriv
->max_sz
= max_sz
;
271 ppriv
->max_cnt
= max_cnt
;
273 this_queue
->vtbl
= &vtbl
;
274 this_queue
->priv
= ppriv
;
279 #else /* if defined(HAVE_MQUEUE_H) */
281 int queue_mqueue_ctor (struct queue
* this_queue
,
286 this_queue
->vtbl
= 0;
287 this_queue
->priv
= 0;
288 (void)path
; /* appease -Wall -Werror */
289 (void)max_sz
; /* appease -Wall -Werror */
290 (void)max_cnt
; /* appease -Wall -Werror */
295 #endif /* HAVE_MQUEUE_H */