3 /// FIFO queue of Data objects
7 Copyright (C) 2008, Net Direct Inc. (http://www.netdirect.ca/)
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 2 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
18 See the GNU General Public License in the COPYING file at the
19 root directory of this project for more details.
22 #include "dataqueue.h"
23 #include "scoped_lock.h"
30 //////////////////////////////////////////////////////////////////////////////
33 DataQueue::DataQueue()
35 pthread_mutex_init(&m_waitMutex
, NULL
);
36 pthread_cond_init(&m_waitCond
, NULL
);
38 pthread_mutex_init(&m_accessMutex
, NULL
);
41 DataQueue::~DataQueue()
43 scoped_lock
lock(m_accessMutex
); // FIXME - is this sane?
45 while( m_queue
.size() ) {
46 delete m_queue
.front();
54 /// Pushes data into the end of the queue.
56 /// The queue owns this pointer as soon as the function is
57 /// called. In the case of an exception, it will be freed.
58 /// Performs a thread broadcast once new data has been added.
60 void DataQueue::push(Data
*data
)
65 scoped_lock
lock(m_accessMutex
);
69 scoped_lock
wait(m_waitMutex
);
70 pthread_cond_broadcast(&m_waitCond
);
82 /// Pops the next element off the front of the queue.
84 /// Returns 0 if empty.
85 /// The queue no longer owns this pointer upon return.
87 Data
* DataQueue::pop()
89 scoped_lock
lock(m_accessMutex
);
91 if( m_queue
.size() == 0 )
94 Data
*ret
= m_queue
.front();
102 /// Pops the next element off the front of the queue, and
103 /// waits until one exists if empty. If still no data
104 /// on timeout, returns null.
105 /// (unlock the access mutex while waiting!)
107 /// Timeout specified in milliseconds. Default is wait forever.
109 Data
* DataQueue::wait_pop(int timeout
)
113 // check if something's there already
115 scoped_lock
access(m_accessMutex
);
116 if( m_queue
.size() ) {
117 ret
= m_queue
.front();
123 // nothing there, so wait...
125 if( timeout
== -1 ) {
130 scoped_lock
wait(m_waitMutex
);
131 pthread_cond_wait(&m_waitCond
, &m_waitMutex
);
135 scoped_lock
access(m_accessMutex
);
136 size
= m_queue
.size();
138 // already have the lock, return now
139 ret
= m_queue
.front();
144 } while( size
== 0 );
147 // timeout in conditional wait
151 gettimeofday(&now
, NULL
);
152 to
.tv_sec
= now
.tv_sec
+ timeout
/ 1000;
153 to
.tv_nsec
= (now
.tv_usec
+ timeout
% 1000 * 1000) * 1000;
155 scoped_lock
wait(m_waitMutex
);
156 pthread_cond_timedwait(&m_waitCond
, &m_waitMutex
, &to
);
159 scoped_lock
access(m_accessMutex
);
160 if( m_queue
.size() == 0 )
163 ret
= m_queue
.front();
171 /// Pops all data from other and appends it to this.
173 /// After calling this function, other will be empty, and
174 /// this will contain all its data.
176 /// In the case of an exception, any uncopied data will
179 /// This is a locking optimization, so all copying can happen
180 /// inside one lock, instead of locking for each copy.
182 void DataQueue::append_from(DataQueue
&other
)
184 scoped_lock
us(m_accessMutex
);
185 scoped_lock
them(other
.m_accessMutex
);
187 while( other
.m_queue
.size() ) {
188 m_queue
.push( other
.m_queue
.front() );
190 // only pop after the copy, since in the
191 // case of an exception we want to leave other intact
199 /// Returns true if the queue is empty.
201 bool DataQueue::empty() const
203 scoped_lock
access(m_accessMutex
);
204 return m_queue
.size() == 0;
210 /// Returns number of items in the queue.
212 size_t DataQueue::size() const
214 scoped_lock
access(m_accessMutex
);
215 return m_queue
.size();
221 #ifdef __DQ_TEST_MODE__
226 using namespace Barry
;
228 void *WriteThread(void *userdata
)
230 DataQueue
*dq
= (DataQueue
*) userdata
;
232 dq
->push( new Data
);
233 dq
->push( new Data
);
235 dq
->push( new Data
);
240 void *ReadThread(void *userdata
)
242 DataQueue
*dq
= (DataQueue
*) userdata
;
245 if( Data
*d
= dq
->pop() ) {
246 cout
<< "Received via pop: " << d
<< endl
;
250 cout
<< "No data in the queue yet." << endl
;
253 while( Data
*d
= dq
->wait_pop(5010) ) {
254 cout
<< "Received: " << d
<< endl
;
263 from
.push( new Data
);
266 dq
.append_from(from
);
269 pthread_create(&t1
, NULL
, &ReadThread
, &dq
);
270 pthread_create(&t2
, NULL
, &WriteThread
, &dq
);
272 pthread_join(t2
, NULL
);
273 pthread_join(t1
, NULL
);