Minor tweaks to the PO details
[barry.git] / src / router.cc
blobfc6aa278ae6f756861bdcb362d7839bda9dacbb4
1 ///
2 /// \file router.cc
3 /// Support classes for the pluggable socket routing system.
4 ///
6 /*
7 Copyright (C) 2008-2009, Net Direct Inc. (http://www.netdirect.ca/)
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 2 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
18 See the GNU General Public License in the COPYING file at the
19 root directory of this project for more details.
22 #include "router.h"
23 #include "scoped_lock.h"
24 #include "data.h"
25 #include "protostructs.h"
26 #include "usbwrap.h"
27 #include "endian.h"
28 #include "debug.h"
29 #include <unistd.h>
31 namespace Barry {
33 ///////////////////////////////////////////////////////////////////////////////
34 // SocketRoutingQueue constructors
36 SocketRoutingQueue::SocketRoutingQueue(int prealloc_buffer_count)
37 : m_dev(0)
38 , m_writeEp(0)
39 , m_readEp(0)
40 , m_interest(false)
41 , m_continue_reading(false)
43 pthread_mutex_init(&m_mutex, NULL);
45 pthread_mutex_init(&m_readwaitMutex, NULL);
46 pthread_cond_init(&m_readwaitCond, NULL);
48 AllocateBuffers(prealloc_buffer_count);
51 SocketRoutingQueue::~SocketRoutingQueue()
53 // thread running?
54 if( m_continue_reading ) {
55 m_continue_reading = false;
56 pthread_join(m_usb_read_thread, NULL);
60 ///////////////////////////////////////////////////////////////////////////////
61 // protected members
64 // ReturnBuffer
66 /// Provides a method of returning a buffer to the free queue
67 /// after processing. The DataHandle class calls this automatically
68 /// from its destructor.
69 void SocketRoutingQueue::ReturnBuffer(Data *buf)
71 // don't need to lock here, since m_free handles its own locking
72 m_free.push(buf);
76 // SimpleReadThread()
78 /// Convenience thread to handle USB read activity.
79 ///
80 void *SocketRoutingQueue::SimpleReadThread(void *userptr)
82 SocketRoutingQueue *q = (SocketRoutingQueue *)userptr;
84 // read from USB and write to stdout until finished
85 std::string msg;
86 while( q->m_continue_reading ) {
87 if( !q->DoRead(msg, 1000) ) { // timeout in milliseconds
88 eout("Error in SimpleReadThread: " << msg);
91 return 0;
95 ///////////////////////////////////////////////////////////////////////////////
96 // public API
98 // These functions connect the router to an external Usb::Device
99 // object. Normally this is handled automatically by the
100 // Controller class, but are public here in case they are needed.
101 void SocketRoutingQueue::SetUsbDevice(Usb::Device *dev, int writeEp, int readEp)
103 scoped_lock lock(m_mutex);
104 m_dev = dev;
105 m_writeEp = writeEp;
106 m_readEp = readEp;
109 void SocketRoutingQueue::ClearUsbDevice()
111 scoped_lock lock(m_mutex);
112 m_dev = 0;
113 lock.unlock();
115 // wait for the DoRead cycle to finish, so the external
116 // Usb::Device object doesn't close before we're done with it
117 scoped_lock wait(m_readwaitMutex);
118 pthread_cond_wait(&m_readwaitCond, &m_readwaitMutex);
121 bool SocketRoutingQueue::UsbDeviceReady()
123 scoped_lock lock(m_mutex);
124 return m_dev != 0;
128 // AllocateBuffers
130 /// This class starts out with no buffers, and will grow one buffer
131 /// at a time if needed. Call this to allocate count buffers
132 /// all at once and place them on the free queue. After calling
133 /// this function, at least count buffers will exist in the free
134 /// queue. If there are already count buffers, none will be added.
136 void SocketRoutingQueue::AllocateBuffers(int count)
138 int todo = count - m_free.size();
140 for( int i = 0; i < todo; i++ ) {
141 // m_free handles its own locking
142 m_free.push( new Data );
147 // DefaultRead (both variations)
149 /// Returns the data for the next unregistered socket.
150 /// Blocks until timeout or data is available.
151 /// Returns false (or null pointer) on timeout and no data.
152 /// With the return version of the function, there is no
153 /// copying performed.
155 /// This version performs a copy.
157 bool SocketRoutingQueue::DefaultRead(Data &receive, int timeout)
159 DataHandle buf = DefaultRead(timeout);
160 if( !buf.get() )
161 return false;
163 // copy to desired buffer
164 receive = *buf.get();
165 return true;
169 /// This version does not perform a copy.
171 DataHandle SocketRoutingQueue::DefaultRead(int timeout)
173 // m_default handles its own locking
174 Data *buf = m_default.wait_pop(timeout);
175 return DataHandle(*this, buf);
179 // RegisterInterest
181 /// Register an interest in data from a certain socket. To read
182 /// from that socket, use the SocketRead() function from then on.
184 /// Any non-registered socket goes in the default queue
185 /// and must be read by DefaultRead()
187 /// If not null, handler is called when new data is read. It will
188 /// be called in the same thread instance that DoRead() is called from.
189 /// Handler is passed the DataQueue Data pointer, and so no
190 /// copying is done. Once the handler returns, the data is
191 /// considered processed and not added to the interested queue,
192 /// but instead returned to m_free.
194 /// Throws std::logic_error if already registered.
196 void SocketRoutingQueue::RegisterInterest(SocketId socket,
197 SocketDataHandler handler,
198 void *context)
200 // modifying our own std::map, need a lock
201 scoped_lock lock(m_mutex);
203 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
204 if( qi != m_socketQueues.end() )
205 throw std::logic_error("RegisterInterest requesting a previously registered socket.");
207 m_socketQueues[socket] = QueueEntryPtr( new QueueEntry(handler, context) );
208 m_interest = true;
212 // UnregisterInterest
214 /// Unregisters interest in data from the given socket, and discards
215 /// any existing data in its interest queue. Any new incoming data
216 /// for this socket will be placed in the default queue.
218 void SocketRoutingQueue::UnregisterInterest(SocketId socket)
220 // modifying our own std::map, need a lock
221 scoped_lock lock(m_mutex);
223 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
224 if( qi == m_socketQueues.end() )
225 return; // nothing registered, done
227 // salvage all our data buffers
228 m_free.append_from( qi->second->m_queue );
230 // remove the QueueEntryPtr from the map
231 m_socketQueues.erase( qi );
233 // check the interest flag
234 m_interest = m_socketQueues.size() > 0;
238 // SocketRead
240 /// Reads data from the interested socket cache. Can only read
241 /// from sockets that have been previously registered.
243 /// Blocks until timeout or data is available.
245 /// Returns false (or null pointer) on timeout and no data.
246 /// With the return version of the function, there is no
247 /// copying performed.
249 /// Throws std::logic_error if a socket was requested that was
250 /// not previously registered.
252 /// Copying is performed with this function.
254 bool SocketRoutingQueue::SocketRead(SocketId socket, Data &receive, int timeout)
256 DataHandle buf = SocketRead(socket, timeout);
257 if( !buf.get() )
258 return false;
260 // copy to desired buffer
261 receive = *buf.get();
262 return true;
266 /// Copying is not performed with this function.
268 /// Throws std::logic_error if a socket was requested that was
269 /// not previously registered.
271 DataHandle SocketRoutingQueue::SocketRead(SocketId socket, int timeout)
273 QueueEntryPtr qep;
274 DataQueue *dq = 0;
276 // accessing our own std::map, need a lock
278 scoped_lock lock(m_mutex);
279 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
280 if( qi == m_socketQueues.end() )
281 throw std::logic_error("SocketRead requested data from unregistered socket.");
283 // got our queue, save the whole QueueEntryPtr (shared_ptr),
284 // and unlock, since we will be waiting on the DataQueue,
285 // not the socketQueues map
287 // This is safe, since even if UnregisterInterest is called,
288 // our pointer won't be deleted until our shared_ptr
289 // (QueueEntryPtr) goes out of scope.
291 // The remaining problem is that wait_pop() might wait
292 // forever if there is no timeout... c'est la vie.
293 // Should'a used a timeout. :-)
294 qep = qi->second;
295 dq = &qep->m_queue;
298 // get data from DataQueue
299 Data *buf = dq->wait_pop(timeout);
301 // specifically delete our copy of shared pointer, in a locked
302 // environment
304 scoped_lock lock(m_mutex);
305 qep.reset();
308 return DataHandle(*this, buf);
311 // Returns true if data is available for that socket.
312 bool SocketRoutingQueue::IsAvailable(SocketId socket) const
314 scoped_lock lock(m_mutex);
315 SocketQueueMap::const_iterator qi = m_socketQueues.find(socket);
316 if( qi == m_socketQueues.end() )
317 return false;
318 return qi->second->m_queue.size() > 0;
322 // DoRead
324 /// Called by the application's "read thread" to read the next usb
325 /// packet and route it to the correct queue. Returns after every
326 /// read, even if a handler is associated with a queue.
327 /// Note: this function is safe to call before SetUsbDevice() is
328 /// called... it just doesn't do anything if there is no usb
329 /// device to work with.
331 /// Timeout is in milliseconds.
333 /// Returns false in the case of USB errors and puts the error message
334 /// in msg.
336 bool SocketRoutingQueue::DoRead(std::string &msg, int timeout)
338 class ReadWaitSignal
340 pthread_mutex_t &m_Mutex;
341 pthread_cond_t &m_Cond;
342 public:
343 ReadWaitSignal(pthread_mutex_t &mut, pthread_cond_t &cond)
344 : m_Mutex(mut), m_Cond(cond)
346 ~ReadWaitSignal()
348 scoped_lock wait(m_Mutex);
349 pthread_cond_signal(&m_Cond);
351 } readwait(m_readwaitMutex, m_readwaitCond);
353 Usb::Device * volatile dev = 0;
354 int readEp;
355 DataHandle buf(*this, 0);
357 // if we are not connected to a USB device yet, just wait
359 scoped_lock lock(m_mutex);
361 if( !m_dev ) {
362 lock.unlock(); // unlock early, since we're sleeping
363 // sleep only a short time, since things could be
364 // in the process of setup or teardown
365 usleep(125000);
366 return true;
369 dev = m_dev;
370 readEp = m_readEp;
372 // fetch a free buffer
373 Data *raw = m_free.pop();
374 if( !raw )
375 buf = DataHandle(*this, new Data);
376 else
377 buf = DataHandle(*this, raw);
380 // take a chance and do the read unlocked, as this has the potential
381 // for blocking for a while
382 try {
384 Data &data = *buf.get();
386 if( !dev->BulkRead(readEp, data, timeout) )
387 return true; // no data, done!
389 MAKE_PACKET(pack, data);
391 // make sure the size is right
392 if( data.GetSize() < sizeof(pack->socket) )
393 return true; // bad size, just skip
395 // extract the socket from the packet
396 uint16_t socket = btohs(pack->socket);
398 // we have data, now lock up again to place it
399 // in the right queue
400 scoped_lock lock(m_mutex);
402 // search for registration of socket
403 if( m_interest ) {
404 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
405 if( qi != m_socketQueues.end() ) {
406 SocketDataHandler sdh = qi->second->m_handler;
407 void *ctx = qi->second->m_context;
409 // is there a handler?
410 if( sdh ) {
411 // unlock & let the handler process it
412 lock.unlock();
413 (*sdh)(ctx, buf.get());
414 return true;
416 else {
417 qi->second->m_queue.push(buf.release());
418 return true;
422 // fall through
425 // safe to unlock now, we are done with the map
426 lock.unlock();
428 // if we get here, send to default queue
429 m_default.push(buf.release());
430 return true;
433 catch( Usb::Timeout & ) {
434 // this is expected... just ignore
436 catch( Usb::Error &ue ) {
437 // this is unexpected, but we're in a thread here...
438 // return false and the caller decide how to handle it
439 msg = ue.what();
440 return false;
443 return true;
446 void SocketRoutingQueue::SpinoffSimpleReadThread()
448 // signal that it's ok to run inside the thread
449 if( m_continue_reading )
450 return; // already running
451 m_continue_reading = true;
453 // Start USB read thread, to handle all routing
454 int ret = pthread_create(&m_usb_read_thread, NULL, &SimpleReadThread, this);
455 if( ret ) {
456 m_continue_reading = false;
457 throw Barry::ErrnoError("SocketRoutingQueue: Error creating USB read thread.", ret);
461 } // namespace Barry