3 /// FIFO queue of Data objects
7 #include "scoped_lock.h"
14 //////////////////////////////////////////////////////////////////////////////
17 DataQueue::DataQueue()
19 pthread_mutex_init(&m_waitMutex
, NULL
);
20 pthread_cond_init(&m_waitCond
, NULL
);
22 pthread_mutex_init(&m_accessMutex
, NULL
);
25 DataQueue::~DataQueue()
27 scoped_lock
lock(m_accessMutex
); // FIXME - is this sane?
29 while( m_queue
.size() ) {
30 delete m_queue
.front();
38 /// Pushes data into the end of the queue.
40 /// The queue owns this pointer as soon as the function is
41 /// called. In the case of an exception, it will be freed.
42 /// Performs a thread broadcast once new data has been added.
44 void DataQueue::push(Data
*data
)
49 scoped_lock
lock(m_accessMutex
);
53 scoped_lock
wait(m_waitMutex
);
54 pthread_cond_broadcast(&m_waitCond
);
66 /// Pops the next element off the front of the queue.
68 /// Returns 0 if empty.
69 /// The queue no longer owns this pointer upon return.
71 Data
* DataQueue::pop()
73 scoped_lock
lock(m_accessMutex
);
75 if( m_queue
.size() == 0 )
78 Data
*ret
= m_queue
.front();
86 /// Pops the next element off the front of the queue, and
87 /// waits until one exists if empty. If still no data
88 /// on timeout, returns null.
89 /// (unlock the access mutex while waiting!)
91 /// Timeout specified in milliseconds. Default is wait forever.
93 Data
* DataQueue::wait_pop(int timeout
)
97 // check if something's there already
99 scoped_lock
access(m_accessMutex
);
100 if( m_queue
.size() ) {
101 ret
= m_queue
.front();
107 // nothing there, so wait...
109 if( timeout
== -1 ) {
114 scoped_lock
wait(m_waitMutex
);
115 pthread_cond_wait(&m_waitCond
, &m_waitMutex
);
119 scoped_lock
access(m_accessMutex
);
120 size
= m_queue
.size();
122 // already have the lock, return now
123 ret
= m_queue
.front();
128 } while( size
== 0 );
131 // timeout in conditional wait
135 gettimeofday(&now
, NULL
);
136 to
.tv_sec
= now
.tv_sec
+ timeout
/ 1000;
137 to
.tv_nsec
= (now
.tv_usec
+ timeout
% 1000 * 1000) * 1000;
139 scoped_lock
wait(m_waitMutex
);
140 pthread_cond_timedwait(&m_waitCond
, &m_waitMutex
, &to
);
143 scoped_lock
access(m_accessMutex
);
144 if( m_queue
.size() == 0 )
147 ret
= m_queue
.front();
155 /// Pops all data from other and appends it to this.
157 /// After calling this function, other will be empty, and
158 /// this will contain all its data.
160 /// In the case of an exception, any uncopied data will
163 /// This is a locking optimization, so all copying can happen
164 /// inside one lock, instead of locking for each copy.
166 void DataQueue::append_from(DataQueue
&other
)
168 scoped_lock
us(m_accessMutex
);
169 scoped_lock
them(other
.m_accessMutex
);
171 while( other
.m_queue
.size() ) {
172 m_queue
.push( other
.m_queue
.front() );
174 // only pop after the copy, since in the
175 // case of an exception we want to leave other intact
183 /// Returns true if the queue is empty.
185 bool DataQueue::empty() const
187 scoped_lock
access(m_accessMutex
);
188 return m_queue
.size() == 0;
194 /// Returns number of items in the queue.
196 size_t DataQueue::size() const
198 scoped_lock
access(m_accessMutex
);
199 return m_queue
.size();
205 #ifdef __DQ_TEST_MODE__
210 using namespace Barry
;
212 void *WriteThread(void *userdata
)
214 DataQueue
*dq
= (DataQueue
*) userdata
;
216 dq
->push( new Data
);
217 dq
->push( new Data
);
219 dq
->push( new Data
);
224 void *ReadThread(void *userdata
)
226 DataQueue
*dq
= (DataQueue
*) userdata
;
229 if( Data
*d
= dq
->pop() ) {
230 cout
<< "Received via pop: " << d
<< endl
;
234 cout
<< "No data in the queue yet." << endl
;
237 while( Data
*d
= dq
->wait_pop(5010) ) {
238 cout
<< "Received: " << d
<< endl
;
247 from
.push( new Data
);
250 dq
.append_from(from
);
253 pthread_create(&t1
, NULL
, &ReadThread
, &dq
);
254 pthread_create(&t2
, NULL
, &WriteThread
, &dq
);
256 pthread_join(t2
, NULL
);
257 pthread_join(t1
, NULL
);