lib: added DBDataBuilder class
[barry.git] / src / dataqueue.cc
blobc22c61c9a18a52baff7ec58cc7eccd8126bcda2e
1 ///
2 /// \file dataqueue.cc
3 /// FIFO queue of Data objects
4 ///
6 /*
7 Copyright (C) 2008-2010, Net Direct Inc. (http://www.netdirect.ca/)
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 2 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
18 See the GNU General Public License in the COPYING file at the
19 root directory of this project for more details.
22 #include "dataqueue.h"
23 #include "scoped_lock.h"
24 #include "data.h"
25 #include "time.h"
27 namespace Barry {
29 //////////////////////////////////////////////////////////////////////////////
30 // DataQueue class
32 DataQueue::DataQueue()
34 pthread_mutex_init(&m_waitMutex, NULL);
35 pthread_cond_init(&m_waitCond, NULL);
37 pthread_mutex_init(&m_accessMutex, NULL);
40 DataQueue::~DataQueue()
42 scoped_lock lock(m_accessMutex); // FIXME - is this sane?
44 while( m_queue.size() ) {
45 delete m_queue.front();
46 m_queue.pop();
51 // push
53 /// Pushes data into the end of the queue.
54 ///
55 /// The queue owns this pointer as soon as the function is
56 /// called. In the case of an exception, it will be freed.
57 /// Performs a thread broadcast once new data has been added.
58 ///
59 void DataQueue::push(Data *data)
61 try {
64 scoped_lock lock(m_accessMutex);
65 m_queue.push(data);
68 scoped_lock wait(m_waitMutex);
69 pthread_cond_broadcast(&m_waitCond);
72 catch(...) {
73 delete data;
74 throw;
79 // pop
81 /// Pops the next element off the front of the queue.
82 ///
83 /// Returns 0 if empty.
84 /// The queue no longer owns this pointer upon return.
85 ///
86 Data* DataQueue::pop()
88 scoped_lock lock(m_accessMutex);
90 if( m_queue.size() == 0 )
91 return 0;
93 Data *ret = m_queue.front();
94 m_queue.pop();
95 return ret;
99 // wait_pop
101 /// Pops the next element off the front of the queue, and
102 /// waits until one exists if empty. If still no data
103 /// on timeout, returns null.
104 /// (unlock the access mutex while waiting!)
106 /// Timeout specified in milliseconds. Default is wait forever.
108 Data* DataQueue::wait_pop(int timeout)
110 Data *ret = 0;
112 // check if something's there already
114 scoped_lock access(m_accessMutex);
115 if( m_queue.size() ) {
116 ret = m_queue.front();
117 m_queue.pop();
118 return ret;
122 // nothing there, so wait...
124 if( timeout == -1 ) {
125 // no timeout
126 int size = 0;
127 do {
129 scoped_lock wait(m_waitMutex);
130 pthread_cond_wait(&m_waitCond, &m_waitMutex);
133 // anything there?
134 scoped_lock access(m_accessMutex);
135 size = m_queue.size();
136 if( size != 0 ) {
137 // already have the lock, return now
138 ret = m_queue.front();
139 m_queue.pop();
140 return ret;
143 } while( size == 0 );
145 else {
146 // timeout in conditional wait
147 struct timespec to;
148 scoped_lock wait(m_waitMutex);
149 pthread_cond_timedwait(&m_waitCond, &m_waitMutex,
150 ThreadTimeout(timeout, &to));
153 scoped_lock access(m_accessMutex);
154 if( m_queue.size() == 0 )
155 return 0;
157 ret = m_queue.front();
158 m_queue.pop();
159 return ret;
163 // append_from
165 /// Pops all data from other and appends it to this.
167 /// After calling this function, other will be empty, and
168 /// this will contain all its data.
170 /// In the case of an exception, any uncopied data will
171 /// remain in other.
173 /// This is a locking optimization, so all copying can happen
174 /// inside one lock, instead of locking for each copy.
176 void DataQueue::append_from(DataQueue &other)
178 scoped_lock us(m_accessMutex);
179 scoped_lock them(other.m_accessMutex);
181 while( other.m_queue.size() ) {
182 m_queue.push( other.m_queue.front() );
184 // only pop after the copy, since in the
185 // case of an exception we want to leave other intact
186 other.m_queue.pop();
191 // empty
193 /// Returns true if the queue is empty.
195 bool DataQueue::empty() const
197 scoped_lock access(m_accessMutex);
198 return m_queue.size() == 0;
202 // size
204 /// Returns number of items in the queue.
206 size_t DataQueue::size() const
208 scoped_lock access(m_accessMutex);
209 return m_queue.size();
212 } // namespace Barry
215 #ifdef __DQ_TEST_MODE__
217 #include <iostream>
219 using namespace std;
220 using namespace Barry;
222 void *WriteThread(void *userdata)
224 DataQueue *dq = (DataQueue*) userdata;
226 dq->push( new Data );
227 dq->push( new Data );
228 sleep(5);
229 dq->push( new Data );
231 return 0;
234 void *ReadThread(void *userdata)
236 DataQueue *dq = (DataQueue*) userdata;
238 sleep(1);
239 if( Data *d = dq->pop() ) {
240 cout << "Received via pop: " << d << endl;
241 delete d;
243 else {
244 cout << "No data in the queue yet." << endl;
247 while( Data *d = dq->wait_pop(5010) ) {
248 cout << "Received: " << d << endl;
249 delete d;
251 return 0;
254 int main()
256 DataQueue from;
257 from.push( new Data );
259 DataQueue dq;
260 dq.append_from(from);
262 pthread_t t1, t2;
263 pthread_create(&t1, NULL, &ReadThread, &dq);
264 pthread_create(&t2, NULL, &WriteThread, &dq);
266 pthread_join(t2, NULL);
267 pthread_join(t1, NULL);
270 #endif