3 /// FIFO queue of Data objects
7 Copyright (C) 2008-2010, Net Direct Inc. (http://www.netdirect.ca/)
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 2 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
18 See the GNU General Public License in the COPYING file at the
19 root directory of this project for more details.
22 #include "dataqueue.h"
23 #include "scoped_lock.h"
29 //////////////////////////////////////////////////////////////////////////////
/// Constructs the queue's synchronization primitives: the wait mutex,
/// the wait condition variable and the access mutex, each initialized
/// with default attributes (NULL attribute argument).
// NOTE(review): the return codes of the pthread_*_init() calls are
// discarded, so an initialization failure would go unnoticed here.
32 DataQueue::DataQueue()
// m_waitMutex / m_waitCond are the pair used for condition waits
// (see wait_pop) and broadcasts (see push).
34 pthread_mutex_init(&m_waitMutex
, NULL
);
35 pthread_cond_init(&m_waitCond
, NULL
);
// m_accessMutex is the lock taken by every member that touches m_queue.
37 pthread_mutex_init(&m_accessMutex
, NULL
);
/// Destroys the queue, deleting the Data objects still stored in it.
///
/// Takes the access lock and, for as long as the queue reports a
/// non-zero size, deletes the element at the front.
// NOTE(review): the statement that removes the deleted element from
// m_queue is not visible in this view -- presumably m_queue.pop()
// follows the delete; confirm.
// NOTE(review): no pthread_mutex_destroy() / pthread_cond_destroy()
// calls are visible here for the primitives initialized in the
// constructor -- confirm whether they are released elsewhere.
40 DataQueue::~DataQueue()
42 scoped_lock
lock(m_accessMutex
); // FIXME - is this sane?
44 while( m_queue
.size() ) {
45 delete m_queue
.front();
53 /// Pushes data into the end of the queue.
55 /// The queue owns this pointer as soon as the function is
56 /// called. In the case of an exception, it will be freed.
57 /// Performs a thread broadcast once new data has been added.
///
/// \param data  Pointer whose ownership passes to the queue, as
///              described above.
59 void DataQueue::push(Data
*data
)
// Access lock for the queue modification.
// NOTE(review): the insertion statement itself and the exception
// handling described in the header are not visible in this view.
64 scoped_lock
lock(m_accessMutex
);
// Take the wait mutex and broadcast on the condition variable so that
// every thread blocked in wait_pop() is woken.
// NOTE(review): presumably the access lock above lives in an inner
// scope and is released before this point -- confirm.
68 scoped_lock
wait(m_waitMutex
);
69 pthread_cond_broadcast(&m_waitCond
);
81 /// Pops the next element off the front of the queue.
83 /// Returns 0 if empty.
84 /// The queue no longer owns this pointer upon return.
///
/// Does not block waiting for data; see wait_pop() for the waiting
/// variant.
///
/// \return The element that was at the front of the queue, or 0 when
///         the queue is empty.
86 Data
* DataQueue::pop()
// The access lock covers both the emptiness check and the read of the
// front element.
88 scoped_lock
lock(m_accessMutex
);
// Empty queue: per the header this case returns 0 (the return
// statement is not visible in this view).
90 if( m_queue
.size() == 0 )
// Remember the front pointer to hand back to the caller.
// NOTE(review): the removal from m_queue and the final return are not
// visible in this view -- presumably m_queue.pop() then return ret.
93 Data
*ret
= m_queue
.front();
101 /// Pops the next element off the front of the queue, and
102 /// waits until one exists if empty. If still no data
103 /// on timeout, returns null.
104 /// (unlock the access mutex while waiting!)
106 /// Timeout specified in milliseconds. Default is wait forever.
///
/// The function works in three phases: a first check of the queue
/// under the access lock, a wait on the condition variable (untimed
/// when timeout is -1, timed otherwise), and a final check of the
/// queue under the access lock.
///
/// \param timeout  Maximum wait in milliseconds; -1 selects the
///                 untimed wait loop.
/// \return The element taken from the front of the queue, or null if
///         the queue is still empty after a timed wait.
//
// NOTE(review): the emptiness check is made under m_accessMutex while
// the wait happens under the separate m_waitMutex. A push() that
// broadcasts after the check but before the wait begins does not
// appear to be observed by this waiter; with timeout == -1 the waiter
// would then stay blocked until a later push(). Confirm against the
// complete source.
108 Data
* DataQueue::wait_pop(int timeout
)
112 // check if something's there already
// Phase 1: take the front element right away when the queue already
// holds data (the removal and return statements are not visible in
// this view).
114 scoped_lock
access(m_accessMutex
);
115 if( m_queue
.size() ) {
116 ret
= m_queue
.front();
122 // nothing there, so wait...
// Phase 2a: timeout == -1 means wait without a time limit.
124 if( timeout
== -1 ) {
// Block on the condition variable; push() broadcasts on it.
129 scoped_lock
wait(m_waitMutex
);
130 pthread_cond_wait(&m_waitCond
, &m_waitMutex
);
// After waking, re-read the size under the access lock; the enclosing
// loop repeats the wait for as long as the queue is still empty.
134 scoped_lock
access(m_accessMutex
);
135 size
= m_queue
.size();
137 // already have the lock, return now
138 ret
= m_queue
.front();
143 } while( size
== 0 );
146 // timeout in conditional wait
// Phase 2b: one timed wait on the condition variable.
// NOTE(review): ThreadTimeout() is declared elsewhere; presumably it
// fills `to` with the absolute deadline for `timeout` milliseconds and
// returns a pointer to it -- confirm.
// NOTE(review): the result of pthread_cond_timedwait() is not used and
// no retry loop is visible in this branch, so any wakeup -- timeout or
// not -- falls through to the final check below.
148 scoped_lock
wait(m_waitMutex
);
149 pthread_cond_timedwait(&m_waitCond
, &m_waitMutex
,
150 ThreadTimeout(timeout
, &to
));
// Phase 3: final check under the access lock. An empty queue here is
// the "returns null" case from the header (return statement not
// visible in this view); otherwise the front element is taken.
153 scoped_lock
access(m_accessMutex
);
154 if( m_queue
.size() == 0 )
157 ret
= m_queue
.front();
165 /// Pops all data from other and appends it to this.
167 /// After calling this function, other will be empty, and
168 /// this will contain all its data.
170 /// In the case of an exception, any uncopied data will
/// remain in other.
173 /// This is a locking optimization, so all copying can happen
174 /// inside one lock, instead of locking for each copy.
///
/// \param other  Queue whose elements are moved, front first, to the
///               end of this queue.
//
// NOTE(review): the two access mutexes are always taken in the order
// this-then-other. Assuming scoped_lock locks on construction, two
// threads running a.append_from(b) and b.append_from(a) at the same
// time could block each other, and a.append_from(a) would lock the
// same default-attribute mutex twice. Confirm that callers never do
// either.
176 void DataQueue::append_from(DataQueue
&other
)
// Hold both queues' access locks for the whole transfer.
178 scoped_lock
us(m_accessMutex
);
179 scoped_lock
them(other
.m_accessMutex
);
// Copy the pointer at the front of other onto the end of this queue,
// repeating until other reports a size of zero.
181 while( other
.m_queue
.size() ) {
182 m_queue
.push( other
.m_queue
.front() );
184 // only pop after the copy, since in the
185 // case of an exception we want to leave other intact
// NOTE(review): the pop on other.m_queue that this comment refers to
// is not visible in this view.
193 /// Returns true if the queue is empty.
///
/// The size is read while holding the access lock.
///
/// \return true when m_queue.size() is zero, false otherwise.
195 bool DataQueue::empty() const
// Same lock that push(), pop() and wait_pop() take around m_queue.
197 scoped_lock
access(m_accessMutex
);
198 return m_queue
.size() == 0;
204 /// Returns number of items in the queue.
///
/// The size is read while holding the access lock.
///
/// \return The value of m_queue.size() at the time of the call.
206 size_t DataQueue::size() const
// Same lock that push(), pop() and wait_pop() take around m_queue.
208 scoped_lock
access(m_accessMutex
);
209 return m_queue
.size();
// Everything from here on is compiled only when __DQ_TEST_MODE__ is
// defined (the matching #endif is not visible in this view).
215 #ifdef __DQ_TEST_MODE__
// Lets the test code below name DataQueue and Data without the
// Barry:: qualifier.
220 using namespace Barry
;
/// Thread entry point for the producer side of the test.
///
/// Pushes three newly allocated Data objects into the queue passed in
/// through userdata.
///
/// \param userdata  Pointer to the DataQueue to fill; it is cast back
///                  to DataQueue* without any check.
// NOTE(review): the return statement, and whatever sits between the
// second and third push, are not visible in this view.
222 void *WriteThread(void *userdata
)
// Recover the queue pointer given to pthread_create().
224 DataQueue
*dq
= (DataQueue
*) userdata
;
// Per push()'s contract the queue owns each Data from the moment of
// the call.
226 dq
->push( new Data
);
227 dq
->push( new Data
);
229 dq
->push( new Data
);
/// Thread entry point for the consumer side of the test.
///
/// First tries a single pop() and reports whether anything was
/// available, then keeps calling wait_pop(5010) and printing each
/// received pointer until wait_pop() returns null.
///
/// \param userdata  Pointer to the DataQueue to drain; it is cast back
///                  to DataQueue* without any check.
// NOTE(review): per pop()/wait_pop() the caller owns the returned
// pointers; no delete of `d` is visible in this view -- confirm. The
// else keyword and the return statement are not visible either.
234 void *ReadThread(void *userdata
)
// Recover the queue pointer given to pthread_create().
236 DataQueue
*dq
= (DataQueue
*) userdata
;
// Non-waiting attempt: pop() yields 0 when the queue is empty.
239 if( Data
*d
= dq
->pop() ) {
240 cout
<< "Received via pop: " << d
<< endl
;
// Message for the case where the pop() above found nothing.
244 cout
<< "No data in the queue yet." << endl
;
// Waiting loop: the argument is a timeout in milliseconds (see
// wait_pop()); a null result ends the loop.
247 while( Data
*d
= dq
->wait_pop(5010) ) {
248 cout
<< "Received: " << d
<< endl
;
// NOTE(review): presumably the body of the test program's main(); the
// function header and the declarations of `from`, `dq`, `t1` and `t2`
// are not visible in this view.
//
// Seed a second queue with one element ...
257 from
.push( new Data
);
// ... and move its contents into dq before any thread is started.
260 dq
.append_from(from
);
// Start the reader (t1) first, then the writer (t2); both receive the
// address of dq as their userdata argument.
263 pthread_create(&t1
, NULL
, &ReadThread
, &dq
);
264 pthread_create(&t2
, NULL
, &WriteThread
, &dq
);
// Wait for the writer to finish, then for the reader.
266 pthread_join(t2
, NULL
);
267 pthread_join(t1
, NULL
);