3 /// FIFO queue of Data objects
7 Copyright (C) 2008-2011, Net Direct Inc. (http://www.netdirect.ca/)
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 2 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
18 See the GNU General Public License in the COPYING file at the
19 root directory of this project for more details.
22 #include "dataqueue.h"
23 #include "scoped_lock.h"
32 //////////////////////////////////////////////////////////////////////////////
35 DataQueue::DataQueue()
37 pthread_mutex_init(&m_waitMutex
, NULL
);
38 pthread_cond_init(&m_waitCond
, NULL
);
40 pthread_mutex_init(&m_accessMutex
, NULL
);
43 DataQueue::~DataQueue()
45 scoped_lock
lock(m_accessMutex
); // FIXME - is this sane?
47 while( m_queue
.size() ) {
52 // a push without locking - adds to the back
53 void DataQueue::raw_push(Data
*data
)
56 m_queue
.push_back(data
);
64 // a pop without locking - removes from the front, and returns it
65 Data
* DataQueue::raw_pop()
67 if( m_queue
.size() == 0 )
70 Data
*ret
= m_queue
.front();
78 /// Pushes data into the end of the queue.
80 /// The queue owns this pointer as soon as the function is
81 /// called. In the case of an exception, it will be freed.
82 /// Performs a thread broadcast once new data has been added.
84 void DataQueue::push(Data
*data
)
87 scoped_lock
lock(m_accessMutex
);
92 scoped_lock
wait(m_waitMutex
);
93 pthread_cond_broadcast(&m_waitCond
);
99 /// Pops the next element off the front of the queue.
101 /// Returns 0 if empty.
102 /// The queue no longer owns this pointer upon return.
104 Data
* DataQueue::pop()
106 scoped_lock
lock(m_accessMutex
);
113 /// Pops the next element off the front of the queue, and
114 /// waits until one exists if empty. If still no data
115 /// on timeout, returns null.
116 /// (unlock the access mutex while waiting!)
118 /// Timeout specified in milliseconds. Default is wait forever.
120 Data
* DataQueue::wait_pop(int timeout
)
122 // check if something's there already
124 scoped_lock
access(m_accessMutex
);
125 if( m_queue
.size() ) {
130 // nothing there, so wait...
132 if( timeout
== -1 ) {
137 scoped_lock
wait(m_waitMutex
);
138 pthread_cond_wait(&m_waitCond
, &m_waitMutex
);
142 scoped_lock
access(m_accessMutex
);
143 size
= m_queue
.size();
145 // already have the lock, return now
149 } while( size
== 0 );
152 // timeout in conditional wait
154 scoped_lock
wait(m_waitMutex
);
155 pthread_cond_timedwait(&m_waitCond
, &m_waitMutex
,
156 ThreadTimeout(timeout
, &to
));
159 scoped_lock
access(m_accessMutex
);
166 /// Pops all data from other and appends it to this.
168 /// After calling this function, other will be empty, and
169 /// this will contain all its data.
171 /// In the case of an exception, any uncopied data will
174 /// This is a locking optimization, so all copying can happen
175 /// inside one lock, instead of locking for each copy.
177 void DataQueue::append_from(DataQueue
&other
)
179 scoped_lock
us(m_accessMutex
);
180 scoped_lock
them(other
.m_accessMutex
);
182 while( other
.m_queue
.size() ) {
183 raw_push( other
.m_queue
.front() );
185 // only pop after the copy, since in the
186 // case of an exception we want to leave other intact
194 /// Returns true if the queue is empty.
196 bool DataQueue::empty() const
198 scoped_lock
access(m_accessMutex
);
199 return m_queue
.size() == 0;
205 /// Returns number of items in the queue.
207 size_t DataQueue::size() const
209 scoped_lock
access(m_accessMutex
);
210 return m_queue
.size();
213 void DataQueue::DumpAll(std::ostream
&os
) const
215 // queue is pushed to the back, and popped from the front
216 // (see raw_() functions) so this iterator direction will
217 // print the packets in the order they arrived
218 scoped_lock
access(m_accessMutex
);
219 queue_type::const_iterator b
= m_queue
.begin(), e
= m_queue
.end();
220 for( ; b
!= e
; ++b
) {
228 #ifdef __DQ_TEST_MODE__
233 using namespace Barry
;
235 void *WriteThread(void *userdata
)
237 DataQueue
*dq
= (DataQueue
*) userdata
;
239 dq
->push( new Data
);
240 dq
->push( new Data
);
242 dq
->push( new Data
);
247 void *ReadThread(void *userdata
)
249 DataQueue
*dq
= (DataQueue
*) userdata
;
252 if( Data
*d
= dq
->pop() ) {
253 cout
<< "Received via pop: " << d
<< endl
;
257 cout
<< "No data in the queue yet." << endl
;
260 while( Data
*d
= dq
->wait_pop(5010) ) {
261 cout
<< "Received: " << d
<< endl
;
270 from
.push( new Data
);
273 dq
.append_from(from
);
276 pthread_create(&t1
, NULL
, &ReadThread
, &dq
);
277 pthread_create(&t2
, NULL
, &WriteThread
, &dq
);
279 pthread_join(t2
, NULL
);
280 pthread_join(t1
, NULL
);