rpm: removed opensuse special cases
[barry.git] / src / dataqueue.cc
blob72d896cf307fadf0da99d5b7303f70cae8c4f2fb
1 ///
2 /// \file dataqueue.cc
3 /// FIFO queue of Data objects
4 ///
6 /*
7 Copyright (C) 2008-2011, Net Direct Inc. (http://www.netdirect.ca/)
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 2 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
18 See the GNU General Public License in the COPYING file at the
19 root directory of this project for more details.
22 #include "dataqueue.h"
23 #include "scoped_lock.h"
24 #include "data.h"
25 #include "time.h"
26 #include <iostream>
28 using namespace std;
30 namespace Barry {
32 //////////////////////////////////////////////////////////////////////////////
33 // DataQueue class
35 DataQueue::DataQueue()
37 pthread_mutex_init(&m_waitMutex, NULL);
38 pthread_cond_init(&m_waitCond, NULL);
40 pthread_mutex_init(&m_accessMutex, NULL);
43 DataQueue::~DataQueue()
45 scoped_lock lock(m_accessMutex); // FIXME - is this sane?
47 while( m_queue.size() ) {
48 delete raw_pop();
52 // a push without locking - adds to the back
53 void DataQueue::raw_push(Data *data)
55 try {
56 m_queue.push_back(data);
58 catch(...) {
59 delete data;
60 throw;
64 // a pop without locking - removes from the front, and returns it
65 Data* DataQueue::raw_pop()
67 if( m_queue.size() == 0 )
68 return 0;
70 Data *ret = m_queue.front();
71 m_queue.pop_front();
72 return ret;
76 // push
78 /// Pushes data into the end of the queue.
79 ///
80 /// The queue owns this pointer as soon as the function is
81 /// called. In the case of an exception, it will be freed.
82 /// Performs a thread broadcast once new data has been added.
83 ///
84 void DataQueue::push(Data *data)
87 scoped_lock lock(m_accessMutex);
88 raw_push(data);
91 // on success, signal
92 scoped_lock wait(m_waitMutex);
93 pthread_cond_broadcast(&m_waitCond);
97 // pop
99 /// Pops the next element off the front of the queue.
101 /// Returns 0 if empty.
102 /// The queue no longer owns this pointer upon return.
104 Data* DataQueue::pop()
106 scoped_lock lock(m_accessMutex);
107 return raw_pop();
111 // wait_pop
113 /// Pops the next element off the front of the queue, and
114 /// waits until one exists if empty. If still no data
115 /// on timeout, returns null.
116 /// (unlock the access mutex while waiting!)
118 /// Timeout specified in milliseconds. Default is wait forever.
120 Data* DataQueue::wait_pop(int timeout)
122 // check if something's there already
124 scoped_lock access(m_accessMutex);
125 if( m_queue.size() ) {
126 return raw_pop();
130 // nothing there, so wait...
132 if( timeout == -1 ) {
133 // no timeout
134 int size = 0;
135 do {
137 scoped_lock wait(m_waitMutex);
138 pthread_cond_wait(&m_waitCond, &m_waitMutex);
141 // anything there?
142 scoped_lock access(m_accessMutex);
143 size = m_queue.size();
144 if( size != 0 ) {
145 // already have the lock, return now
146 return raw_pop();
149 } while( size == 0 );
151 else {
152 // timeout in conditional wait
153 struct timespec to;
154 scoped_lock wait(m_waitMutex);
155 pthread_cond_timedwait(&m_waitCond, &m_waitMutex,
156 ThreadTimeout(timeout, &to));
159 scoped_lock access(m_accessMutex);
160 return raw_pop();
164 // append_from
166 /// Pops all data from other and appends it to this.
168 /// After calling this function, other will be empty, and
169 /// this will contain all its data.
171 /// In the case of an exception, any uncopied data will
172 /// remain in other.
174 /// This is a locking optimization, so all copying can happen
175 /// inside one lock, instead of locking for each copy.
177 void DataQueue::append_from(DataQueue &other)
179 scoped_lock us(m_accessMutex);
180 scoped_lock them(other.m_accessMutex);
182 while( other.m_queue.size() ) {
183 raw_push( other.m_queue.front() );
185 // only pop after the copy, since in the
186 // case of an exception we want to leave other intact
187 other.raw_pop();
192 // empty
194 /// Returns true if the queue is empty.
196 bool DataQueue::empty() const
198 scoped_lock access(m_accessMutex);
199 return m_queue.size() == 0;
203 // size
205 /// Returns number of items in the queue.
207 size_t DataQueue::size() const
209 scoped_lock access(m_accessMutex);
210 return m_queue.size();
213 void DataQueue::DumpAll(std::ostream &os) const
215 // queue is pushed to the back, and popped from the front
216 // (see raw_() functions) so this iterator direction will
217 // print the packets in the order they arrived
218 scoped_lock access(m_accessMutex);
219 queue_type::const_iterator b = m_queue.begin(), e = m_queue.end();
220 for( ; b != e; ++b ) {
221 os << **b << endl;
225 } // namespace Barry
228 #ifdef __DQ_TEST_MODE__
230 #include <iostream>
232 using namespace std;
233 using namespace Barry;
235 void *WriteThread(void *userdata)
237 DataQueue *dq = (DataQueue*) userdata;
239 dq->push( new Data );
240 dq->push( new Data );
241 sleep(5);
242 dq->push( new Data );
244 return 0;
247 void *ReadThread(void *userdata)
249 DataQueue *dq = (DataQueue*) userdata;
251 sleep(1);
252 if( Data *d = dq->pop() ) {
253 cout << "Received via pop: " << d << endl;
254 delete d;
256 else {
257 cout << "No data in the queue yet." << endl;
260 while( Data *d = dq->wait_pop(5010) ) {
261 cout << "Received: " << d << endl;
262 delete d;
264 return 0;
267 int main()
269 DataQueue from;
270 from.push( new Data );
272 DataQueue dq;
273 dq.append_from(from);
275 pthread_t t1, t2;
276 pthread_create(&t1, NULL, &ReadThread, &dq);
277 pthread_create(&t2, NULL, &WriteThread, &dq);
279 pthread_join(t2, NULL);
280 pthread_join(t1, NULL);
283 #endif