1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements. See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
30 #include "apr_portable.h"
31 #include "apr_thread_mutex.h"
32 #include "apr_thread_cond.h"
33 #include "apr_errno.h"
34 #include "apr_queue.h"
38 * define this to get debug messages
45 unsigned int nelts
; /**< # elements */
46 unsigned int in
; /**< next empty location */
47 unsigned int out
; /**< next filled location */
48 unsigned int bounds
;/**< max size of queue */
49 unsigned int full_waiters
;
50 unsigned int empty_waiters
;
51 apr_thread_mutex_t
*one_big_mutex
;
52 apr_thread_cond_t
*not_empty
;
53 apr_thread_cond_t
*not_full
;
58 static void Q_DBG(char*msg
, apr_queue_t
*q
) {
59 fprintf(stderr
, "%ld\t#%d in %d out %d\t%s\n",
60 apr_os_thread_current(),
61 q
->nelts
, q
->in
, q
->out
,
70 * Detects when the apr_queue_t is full. This utility function is expected
71 * to be called from within critical sections, and is not threadsafe.
73 #define apr_queue_full(queue) ((queue)->nelts == (queue)->bounds)
76 * Detects when the apr_queue_t is empty. This utility function is expected
77 * to be called from within critical sections, and is not threadsafe.
79 #define apr_queue_empty(queue) ((queue)->nelts == 0)
82 * Callback routine that is called to destroy this
83 * apr_queue_t when its pool is destroyed.
85 static apr_status_t
queue_destroy(void *data
)
87 apr_queue_t
*queue
= data
;
89 /* Ignore errors here, we can't do anything about them anyway. */
91 apr_thread_cond_destroy(queue
->not_empty
);
92 apr_thread_cond_destroy(queue
->not_full
);
93 apr_thread_mutex_destroy(queue
->one_big_mutex
);
99 * Initialize the apr_queue_t.
101 APU_DECLARE(apr_status_t
) apr_queue_create(apr_queue_t
**q
,
102 unsigned int queue_capacity
,
107 queue
= apr_palloc(a
, sizeof(apr_queue_t
));
110 /* nested doesn't work ;( */
111 rv
= apr_thread_mutex_create(&queue
->one_big_mutex
,
112 APR_THREAD_MUTEX_UNNESTED
,
114 if (rv
!= APR_SUCCESS
) {
118 rv
= apr_thread_cond_create(&queue
->not_empty
, a
);
119 if (rv
!= APR_SUCCESS
) {
123 rv
= apr_thread_cond_create(&queue
->not_full
, a
);
124 if (rv
!= APR_SUCCESS
) {
128 /* Set all the data in the queue to NULL */
129 queue
->data
= apr_pcalloc(a
, queue_capacity
* sizeof(void*));
130 queue
->bounds
= queue_capacity
;
134 queue
->terminated
= 0;
135 queue
->full_waiters
= 0;
136 queue
->empty_waiters
= 0;
138 apr_pool_cleanup_register(a
, queue
, queue_destroy
, apr_pool_cleanup_null
);
144 * Push new data onto the queue. Blocks if the queue is full. Once
145 * the push operation has completed, it signals other threads waiting
146 * in apr_queue_pop() that they may continue consuming sockets.
148 APU_DECLARE(apr_status_t
) apr_queue_push(apr_queue_t
*queue
, void *data
)
152 if (queue
->terminated
) {
153 return APR_EOF
; /* no more elements ever again */
156 rv
= apr_thread_mutex_lock(queue
->one_big_mutex
);
157 if (rv
!= APR_SUCCESS
) {
161 if (apr_queue_full(queue
)) {
162 if (!queue
->terminated
) {
163 queue
->full_waiters
++;
164 rv
= apr_thread_cond_wait(queue
->not_full
, queue
->one_big_mutex
);
165 queue
->full_waiters
--;
166 if (rv
!= APR_SUCCESS
) {
167 apr_thread_mutex_unlock(queue
->one_big_mutex
);
171 /* If we wake up and it's still empty, then we were interrupted */
172 if (apr_queue_full(queue
)) {
173 Q_DBG("queue full (intr)", queue
);
174 rv
= apr_thread_mutex_unlock(queue
->one_big_mutex
);
175 if (rv
!= APR_SUCCESS
) {
178 if (queue
->terminated
) {
179 return APR_EOF
; /* no more elements ever again */
187 queue
->data
[queue
->in
] = data
;
188 queue
->in
= (queue
->in
+ 1) % queue
->bounds
;
191 if (queue
->empty_waiters
) {
192 Q_DBG("sig !empty", queue
);
193 rv
= apr_thread_cond_signal(queue
->not_empty
);
194 if (rv
!= APR_SUCCESS
) {
195 apr_thread_mutex_unlock(queue
->one_big_mutex
);
200 rv
= apr_thread_mutex_unlock(queue
->one_big_mutex
);
205 * Push new data onto the queue. Blocks if the queue is full. Once
206 * the push operation has completed, it signals other threads waiting
207 * in apr_queue_pop() that they may continue consuming sockets.
209 APU_DECLARE(apr_status_t
) apr_queue_trypush(apr_queue_t
*queue
, void *data
)
213 if (queue
->terminated
) {
214 return APR_EOF
; /* no more elements ever again */
217 rv
= apr_thread_mutex_lock(queue
->one_big_mutex
);
218 if (rv
!= APR_SUCCESS
) {
222 if (apr_queue_full(queue
)) {
223 rv
= apr_thread_mutex_unlock(queue
->one_big_mutex
);
227 queue
->data
[queue
->in
] = data
;
228 queue
->in
= (queue
->in
+ 1) % queue
->bounds
;
231 if (queue
->empty_waiters
) {
232 Q_DBG("sig !empty", queue
);
233 rv
= apr_thread_cond_signal(queue
->not_empty
);
234 if (rv
!= APR_SUCCESS
) {
235 apr_thread_mutex_unlock(queue
->one_big_mutex
);
240 rv
= apr_thread_mutex_unlock(queue
->one_big_mutex
);
247 APU_DECLARE(unsigned int) apr_queue_size(apr_queue_t
*queue
) {
252 * Retrieves the next item from the queue. If there are no
253 * items available, it will block until one becomes available.
254 * Once retrieved, the item is placed into the address specified by
257 APU_DECLARE(apr_status_t
) apr_queue_pop(apr_queue_t
*queue
, void **data
)
261 if (queue
->terminated
) {
262 return APR_EOF
; /* no more elements ever again */
265 rv
= apr_thread_mutex_lock(queue
->one_big_mutex
);
266 if (rv
!= APR_SUCCESS
) {
270 /* Keep waiting until we wake up and find that the queue is not empty. */
271 if (apr_queue_empty(queue
)) {
272 if (!queue
->terminated
) {
273 queue
->empty_waiters
++;
274 rv
= apr_thread_cond_wait(queue
->not_empty
, queue
->one_big_mutex
);
275 queue
->empty_waiters
--;
276 if (rv
!= APR_SUCCESS
) {
277 apr_thread_mutex_unlock(queue
->one_big_mutex
);
281 /* If we wake up and it's still empty, then we were interrupted */
282 if (apr_queue_empty(queue
)) {
283 Q_DBG("queue empty (intr)", queue
);
284 rv
= apr_thread_mutex_unlock(queue
->one_big_mutex
);
285 if (rv
!= APR_SUCCESS
) {
288 if (queue
->terminated
) {
289 return APR_EOF
; /* no more elements ever again */
297 *data
= queue
->data
[queue
->out
];
300 queue
->out
= (queue
->out
+ 1) % queue
->bounds
;
301 if (queue
->full_waiters
) {
302 Q_DBG("signal !full", queue
);
303 rv
= apr_thread_cond_signal(queue
->not_full
);
304 if (rv
!= APR_SUCCESS
) {
305 apr_thread_mutex_unlock(queue
->one_big_mutex
);
310 rv
= apr_thread_mutex_unlock(queue
->one_big_mutex
);
315 * Retrieves the next item from the queue. If there are no
316 * items available, return APR_EAGAIN. Once retrieved,
317 * the item is placed into the address specified by 'data'.
319 APU_DECLARE(apr_status_t
) apr_queue_trypop(apr_queue_t
*queue
, void **data
)
323 if (queue
->terminated
) {
324 return APR_EOF
; /* no more elements ever again */
327 rv
= apr_thread_mutex_lock(queue
->one_big_mutex
);
328 if (rv
!= APR_SUCCESS
) {
332 if (apr_queue_empty(queue
)) {
333 rv
= apr_thread_mutex_unlock(queue
->one_big_mutex
);
337 *data
= queue
->data
[queue
->out
];
340 queue
->out
= (queue
->out
+ 1) % queue
->bounds
;
341 if (queue
->full_waiters
) {
342 Q_DBG("signal !full", queue
);
343 rv
= apr_thread_cond_signal(queue
->not_full
);
344 if (rv
!= APR_SUCCESS
) {
345 apr_thread_mutex_unlock(queue
->one_big_mutex
);
350 rv
= apr_thread_mutex_unlock(queue
->one_big_mutex
);
354 APU_DECLARE(apr_status_t
) apr_queue_interrupt_all(apr_queue_t
*queue
)
357 Q_DBG("intr all", queue
);
358 if ((rv
= apr_thread_mutex_lock(queue
->one_big_mutex
)) != APR_SUCCESS
) {
361 apr_thread_cond_broadcast(queue
->not_empty
);
362 apr_thread_cond_broadcast(queue
->not_full
);
364 if ((rv
= apr_thread_mutex_unlock(queue
->one_big_mutex
)) != APR_SUCCESS
) {
371 APU_DECLARE(apr_status_t
) apr_queue_term(apr_queue_t
*queue
)
375 if ((rv
= apr_thread_mutex_lock(queue
->one_big_mutex
)) != APR_SUCCESS
) {
379 /* we must hold one_big_mutex when setting this... otherwise,
380 * we could end up setting it and waking everybody up just after a
381 * would-be popper checks it but right before they block
383 queue
->terminated
= 1;
384 if ((rv
= apr_thread_mutex_unlock(queue
->one_big_mutex
)) != APR_SUCCESS
) {
387 return apr_queue_interrupt_all(queue
);
390 #endif /* APR_HAS_THREADS */