Added ppp/README
[barry/pauldeden.git] / src / dataqueue.cc
blob00c7bf2229e1d9b6ebe618bfaad5a6e40c029599
1 ///
2 /// \file dataqueue.cc
3 /// FIFO queue of Data objects
4 ///
6 #include "dataqueue.h"
7 #include "scoped_lock.h"
8 #include "data.h"
9 #include <sys/time.h>
10 #include <time.h>
12 namespace Barry {
14 //////////////////////////////////////////////////////////////////////////////
15 // DataQueue class
17 DataQueue::DataQueue()
19 pthread_mutex_init(&m_waitMutex, NULL);
20 pthread_cond_init(&m_waitCond, NULL);
22 pthread_mutex_init(&m_accessMutex, NULL);
25 DataQueue::~DataQueue()
27 scoped_lock lock(m_accessMutex); // FIXME - is this sane?
29 while( m_queue.size() ) {
30 delete m_queue.front();
31 m_queue.pop();
36 // push
38 /// Pushes data into the end of the queue.
39 ///
40 /// The queue owns this pointer as soon as the function is
41 /// called. In the case of an exception, it will be freed.
42 /// Performs a thread broadcast once new data has been added.
43 ///
44 void DataQueue::push(Data *data)
46 try {
49 scoped_lock lock(m_accessMutex);
50 m_queue.push(data);
53 scoped_lock wait(m_waitMutex);
54 pthread_cond_broadcast(&m_waitCond);
57 catch(...) {
58 delete data;
59 throw;
64 // pop
66 /// Pops the next element off the front of the queue.
67 ///
68 /// Returns 0 if empty.
69 /// The queue no longer owns this pointer upon return.
70 ///
71 Data* DataQueue::pop()
73 scoped_lock lock(m_accessMutex);
75 if( m_queue.size() == 0 )
76 return 0;
78 Data *ret = m_queue.front();
79 m_queue.pop();
80 return ret;
84 // wait_pop
86 /// Pops the next element off the front of the queue, and
87 /// waits until one exists if empty. If still no data
88 /// on timeout, returns null.
89 /// (unlock the access mutex while waiting!)
90 ///
91 /// Timeout specified in milliseconds. Default is wait forever.
92 ///
93 Data* DataQueue::wait_pop(int timeout)
95 Data *ret = 0;
97 // check if something's there already
99 scoped_lock access(m_accessMutex);
100 if( m_queue.size() ) {
101 ret = m_queue.front();
102 m_queue.pop();
103 return ret;
107 // nothing there, so wait...
109 if( timeout == -1 ) {
110 // no timeout
111 int size = 0;
112 do {
114 scoped_lock wait(m_waitMutex);
115 pthread_cond_wait(&m_waitCond, &m_waitMutex);
118 // anything there?
119 scoped_lock access(m_accessMutex);
120 size = m_queue.size();
121 if( size != 0 ) {
122 // already have the lock, return now
123 ret = m_queue.front();
124 m_queue.pop();
125 return ret;
128 } while( size == 0 );
130 else {
131 // timeout in conditional wait
132 struct timeval now;
133 struct timespec to;
135 gettimeofday(&now, NULL);
136 to.tv_sec = now.tv_sec + timeout / 1000;
137 to.tv_nsec = (now.tv_usec + timeout % 1000 * 1000) * 1000;
139 scoped_lock wait(m_waitMutex);
140 pthread_cond_timedwait(&m_waitCond, &m_waitMutex, &to);
143 scoped_lock access(m_accessMutex);
144 if( m_queue.size() == 0 )
145 return 0;
147 ret = m_queue.front();
148 m_queue.pop();
149 return ret;
153 // append_from
155 /// Pops all data from other and appends it to this.
157 /// After calling this function, other will be empty, and
158 /// this will contain all its data.
160 /// In the case of an exception, any uncopied data will
161 /// remain in other.
163 /// This is a locking optimization, so all copying can happen
164 /// inside one lock, instead of locking for each copy.
166 void DataQueue::append_from(DataQueue &other)
168 scoped_lock us(m_accessMutex);
169 scoped_lock them(other.m_accessMutex);
171 while( other.m_queue.size() ) {
172 m_queue.push( other.m_queue.front() );
174 // only pop after the copy, since in the
175 // case of an exception we want to leave other intact
176 other.m_queue.pop();
181 // empty
183 /// Returns true if the queue is empty.
185 bool DataQueue::empty() const
187 scoped_lock access(m_accessMutex);
188 return m_queue.size() == 0;
192 // size
194 /// Returns number of items in the queue.
196 size_t DataQueue::size() const
198 scoped_lock access(m_accessMutex);
199 return m_queue.size();
202 } // namespace Barry
205 #ifdef __DQ_TEST_MODE__
207 #include <iostream>
209 using namespace std;
210 using namespace Barry;
212 void *WriteThread(void *userdata)
214 DataQueue *dq = (DataQueue*) userdata;
216 dq->push( new Data );
217 dq->push( new Data );
218 sleep(5);
219 dq->push( new Data );
221 return 0;
224 void *ReadThread(void *userdata)
226 DataQueue *dq = (DataQueue*) userdata;
228 sleep(1);
229 if( Data *d = dq->pop() ) {
230 cout << "Received via pop: " << d << endl;
231 delete d;
233 else {
234 cout << "No data in the queue yet." << endl;
237 while( Data *d = dq->wait_pop(5010) ) {
238 cout << "Received: " << d << endl;
239 delete d;
241 return 0;
244 int main()
246 DataQueue from;
247 from.push( new Data );
249 DataQueue dq;
250 dq.append_from(from);
252 pthread_t t1, t2;
253 pthread_create(&t1, NULL, &ReadThread, &dq);
254 pthread_create(&t2, NULL, &WriteThread, &dq);
256 pthread_join(t2, NULL);
257 pthread_join(t1, NULL);
260 #endif