Do not serialize destructor calls.
[apr-util.git] / misc / apr_queue.c
blob8636a824a622856b2ee905d253f889e05f7b0f3f
1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements. See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "apr.h"
19 #if APR_HAVE_STDIO_H
20 #include <stdio.h>
21 #endif
22 #if APR_HAVE_STDLIB_H
23 #include <stdlib.h>
24 #endif
25 #if APR_HAVE_UNISTD_H
26 #include <unistd.h>
27 #endif
29 #include "apu.h"
30 #include "apr_portable.h"
31 #include "apr_thread_mutex.h"
32 #include "apr_thread_cond.h"
33 #include "apr_errno.h"
34 #include "apr_queue.h"
36 #if APR_HAS_THREADS
37 /*
38 * define this to get debug messages
40 #define QUEUE_DEBUG
43 struct apr_queue_t {
44 void **data;
45 unsigned int nelts; /**< # elements */
46 unsigned int in; /**< next empty location */
47 unsigned int out; /**< next filled location */
48 unsigned int bounds;/**< max size of queue */
49 unsigned int full_waiters;
50 unsigned int empty_waiters;
51 apr_thread_mutex_t *one_big_mutex;
52 apr_thread_cond_t *not_empty;
53 apr_thread_cond_t *not_full;
54 int terminated;
#ifdef QUEUE_DEBUG
/**
 * Debug helper: writes the calling thread's id, the queue's element count
 * and its in/out indices, followed by 'msg', to stderr.
 *
 * The counters are unsigned int and are printed with %u.  The thread id is
 * cast to unsigned long so that the argument matches its conversion
 * specifier.
 * NOTE(review): the cast assumes apr_os_thread_t is an integer or pointer
 * type -- confirm for platforms where it is neither.
 *
 * @param msg text appended to the trace line
 * @param q   queue whose counters are printed (read without locking)
 */
static void Q_DBG(const char *msg, apr_queue_t *q)
{
    fprintf(stderr, "%lu\t#%u in %u out %u\t%s\n",
            (unsigned long)apr_os_thread_current(),
            q->nelts, q->in, q->out,
            msg);
}
#else
#define Q_DBG(x,y)
#endif
/**
 * Evaluates to non-zero when the queue already holds 'bounds' elements.
 * The fields are read without any locking, so this is meant to be used
 * inside a critical section and is not threadsafe on its own.
 */
#define apr_queue_full(queue) ((queue)->bounds == (queue)->nelts)
/**
 * Evaluates to non-zero when the queue holds no elements.
 * The field is read without any locking, so this is meant to be used
 * inside a critical section and is not threadsafe on its own.
 */
#define apr_queue_empty(queue) (0 == (queue)->nelts)
81 /**
82 * Callback routine that is called to destroy this
83 * apr_queue_t when its pool is destroyed.
85 static apr_status_t queue_destroy(void *data)
87 apr_queue_t *queue = data;
89 /* Ignore errors here, we can't do anything about them anyway. */
91 apr_thread_cond_destroy(queue->not_empty);
92 apr_thread_cond_destroy(queue->not_full);
93 apr_thread_mutex_destroy(queue->one_big_mutex);
95 return APR_SUCCESS;
98 /**
99 * Initialize the apr_queue_t.
101 APU_DECLARE(apr_status_t) apr_queue_create(apr_queue_t **q,
102 unsigned int queue_capacity,
103 apr_pool_t *a)
105 apr_status_t rv;
106 apr_queue_t *queue;
107 queue = apr_palloc(a, sizeof(apr_queue_t));
108 *q = queue;
110 /* nested doesn't work ;( */
111 rv = apr_thread_mutex_create(&queue->one_big_mutex,
112 APR_THREAD_MUTEX_UNNESTED,
114 if (rv != APR_SUCCESS) {
115 return rv;
118 rv = apr_thread_cond_create(&queue->not_empty, a);
119 if (rv != APR_SUCCESS) {
120 return rv;
123 rv = apr_thread_cond_create(&queue->not_full, a);
124 if (rv != APR_SUCCESS) {
125 return rv;
128 /* Set all the data in the queue to NULL */
129 queue->data = apr_pcalloc(a, queue_capacity * sizeof(void*));
130 queue->bounds = queue_capacity;
131 queue->nelts = 0;
132 queue->in = 0;
133 queue->out = 0;
134 queue->terminated = 0;
135 queue->full_waiters = 0;
136 queue->empty_waiters = 0;
138 apr_pool_cleanup_register(a, queue, queue_destroy, apr_pool_cleanup_null);
140 return APR_SUCCESS;
144 * Push new data onto the queue. Blocks if the queue is full. Once
145 * the push operation has completed, it signals other threads waiting
146 * in apr_queue_pop() that they may continue consuming sockets.
148 APU_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
150 apr_status_t rv;
152 if (queue->terminated) {
153 return APR_EOF; /* no more elements ever again */
156 rv = apr_thread_mutex_lock(queue->one_big_mutex);
157 if (rv != APR_SUCCESS) {
158 return rv;
161 if (apr_queue_full(queue)) {
162 if (!queue->terminated) {
163 queue->full_waiters++;
164 rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex);
165 queue->full_waiters--;
166 if (rv != APR_SUCCESS) {
167 apr_thread_mutex_unlock(queue->one_big_mutex);
168 return rv;
171 /* If we wake up and it's still empty, then we were interrupted */
172 if (apr_queue_full(queue)) {
173 Q_DBG("queue full (intr)", queue);
174 rv = apr_thread_mutex_unlock(queue->one_big_mutex);
175 if (rv != APR_SUCCESS) {
176 return rv;
178 if (queue->terminated) {
179 return APR_EOF; /* no more elements ever again */
181 else {
182 return APR_EINTR;
187 queue->data[queue->in] = data;
188 queue->in = (queue->in + 1) % queue->bounds;
189 queue->nelts++;
191 if (queue->empty_waiters) {
192 Q_DBG("sig !empty", queue);
193 rv = apr_thread_cond_signal(queue->not_empty);
194 if (rv != APR_SUCCESS) {
195 apr_thread_mutex_unlock(queue->one_big_mutex);
196 return rv;
200 rv = apr_thread_mutex_unlock(queue->one_big_mutex);
201 return rv;
205 * Push new data onto the queue. Blocks if the queue is full. Once
206 * the push operation has completed, it signals other threads waiting
207 * in apr_queue_pop() that they may continue consuming sockets.
209 APU_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data)
211 apr_status_t rv;
213 if (queue->terminated) {
214 return APR_EOF; /* no more elements ever again */
217 rv = apr_thread_mutex_lock(queue->one_big_mutex);
218 if (rv != APR_SUCCESS) {
219 return rv;
222 if (apr_queue_full(queue)) {
223 rv = apr_thread_mutex_unlock(queue->one_big_mutex);
224 return APR_EAGAIN;
227 queue->data[queue->in] = data;
228 queue->in = (queue->in + 1) % queue->bounds;
229 queue->nelts++;
231 if (queue->empty_waiters) {
232 Q_DBG("sig !empty", queue);
233 rv = apr_thread_cond_signal(queue->not_empty);
234 if (rv != APR_SUCCESS) {
235 apr_thread_mutex_unlock(queue->one_big_mutex);
236 return rv;
240 rv = apr_thread_mutex_unlock(queue->one_big_mutex);
241 return rv;
245 * not thread safe
247 APU_DECLARE(unsigned int) apr_queue_size(apr_queue_t *queue) {
248 return queue->nelts;
252 * Retrieves the next item from the queue. If there are no
253 * items available, it will block until one becomes available.
254 * Once retrieved, the item is placed into the address specified by
255 * 'data'.
257 APU_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
259 apr_status_t rv;
261 if (queue->terminated) {
262 return APR_EOF; /* no more elements ever again */
265 rv = apr_thread_mutex_lock(queue->one_big_mutex);
266 if (rv != APR_SUCCESS) {
267 return rv;
270 /* Keep waiting until we wake up and find that the queue is not empty. */
271 if (apr_queue_empty(queue)) {
272 if (!queue->terminated) {
273 queue->empty_waiters++;
274 rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
275 queue->empty_waiters--;
276 if (rv != APR_SUCCESS) {
277 apr_thread_mutex_unlock(queue->one_big_mutex);
278 return rv;
281 /* If we wake up and it's still empty, then we were interrupted */
282 if (apr_queue_empty(queue)) {
283 Q_DBG("queue empty (intr)", queue);
284 rv = apr_thread_mutex_unlock(queue->one_big_mutex);
285 if (rv != APR_SUCCESS) {
286 return rv;
288 if (queue->terminated) {
289 return APR_EOF; /* no more elements ever again */
291 else {
292 return APR_EINTR;
297 *data = queue->data[queue->out];
298 queue->nelts--;
300 queue->out = (queue->out + 1) % queue->bounds;
301 if (queue->full_waiters) {
302 Q_DBG("signal !full", queue);
303 rv = apr_thread_cond_signal(queue->not_full);
304 if (rv != APR_SUCCESS) {
305 apr_thread_mutex_unlock(queue->one_big_mutex);
306 return rv;
310 rv = apr_thread_mutex_unlock(queue->one_big_mutex);
311 return rv;
315 * Retrieves the next item from the queue. If there are no
316 * items available, return APR_EAGAIN. Once retrieved,
317 * the item is placed into the address specified by 'data'.
319 APU_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data)
321 apr_status_t rv;
323 if (queue->terminated) {
324 return APR_EOF; /* no more elements ever again */
327 rv = apr_thread_mutex_lock(queue->one_big_mutex);
328 if (rv != APR_SUCCESS) {
329 return rv;
332 if (apr_queue_empty(queue)) {
333 rv = apr_thread_mutex_unlock(queue->one_big_mutex);
334 return APR_EAGAIN;
337 *data = queue->data[queue->out];
338 queue->nelts--;
340 queue->out = (queue->out + 1) % queue->bounds;
341 if (queue->full_waiters) {
342 Q_DBG("signal !full", queue);
343 rv = apr_thread_cond_signal(queue->not_full);
344 if (rv != APR_SUCCESS) {
345 apr_thread_mutex_unlock(queue->one_big_mutex);
346 return rv;
350 rv = apr_thread_mutex_unlock(queue->one_big_mutex);
351 return rv;
354 APU_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue)
356 apr_status_t rv;
357 Q_DBG("intr all", queue);
358 if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
359 return rv;
361 apr_thread_cond_broadcast(queue->not_empty);
362 apr_thread_cond_broadcast(queue->not_full);
364 if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
365 return rv;
368 return APR_SUCCESS;
371 APU_DECLARE(apr_status_t) apr_queue_term(apr_queue_t *queue)
373 apr_status_t rv;
375 if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
376 return rv;
379 /* we must hold one_big_mutex when setting this... otherwise,
380 * we could end up setting it and waking everybody up just after a
381 * would-be popper checks it but right before they block
383 queue->terminated = 1;
384 if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
385 return rv;
387 return apr_queue_interrupt_all(queue);
390 #endif /* APR_HAS_THREADS */