test: if using -pedantic, also use -ansi, so gettext.h can detect it correctly
[barry.git] / src / router.cc
blob05737ca9b1134e2a7a772d2c1440306423aa77b8
1 ///
2 /// \file router.cc
3 /// Support classes for the pluggable socket routing system.
4 ///
6 /*
7 Copyright (C) 2008-2010, Net Direct Inc. (http://www.netdirect.ca/)
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 2 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
18 See the GNU General Public License in the COPYING file at the
19 root directory of this project for more details.
22 #include "router.h"
23 #include "scoped_lock.h"
24 #include "data.h"
25 #include "protostructs.h"
26 #include "usbwrap.h"
27 #include "endian.h"
28 #include "debug.h"
29 #include <unistd.h>
31 namespace Barry {
33 ///////////////////////////////////////////////////////////////////////////////
34 // SocketDataHandler default methods
36 void SocketRoutingQueue::SocketDataHandler::Error(Barry::Error &error)
38 // Just log the error
39 eout("SocketDataHandler: Error: " << error.what());
40 (void) error;
43 SocketRoutingQueue::SocketDataHandler::~SocketDataHandler()
45 // Nothing to destroy
48 ///////////////////////////////////////////////////////////////////////////////
49 // SocketRoutingQueue constructors
51 SocketRoutingQueue::SocketRoutingQueue(int prealloc_buffer_count)
52 : m_dev(0)
53 , m_writeEp(0)
54 , m_readEp(0)
55 , m_interest(false)
56 , m_seen_usb_error(false)
57 , m_continue_reading(false)
59 pthread_mutex_init(&m_mutex, NULL);
61 pthread_mutex_init(&m_readwaitMutex, NULL);
62 pthread_cond_init(&m_readwaitCond, NULL);
64 AllocateBuffers(prealloc_buffer_count);
67 SocketRoutingQueue::~SocketRoutingQueue()
69 // thread running?
70 if( m_continue_reading ) {
71 m_continue_reading = false;
72 pthread_join(m_usb_read_thread, NULL);
76 ///////////////////////////////////////////////////////////////////////////////
77 // protected members
80 // ReturnBuffer
82 /// Provides a method of returning a buffer to the free queue
83 /// after processing. The DataHandle class calls this automatically
84 /// from its destructor.
85 void SocketRoutingQueue::ReturnBuffer(Data *buf)
87 // don't need to lock here, since m_free handles its own locking
88 m_free.push(buf);
92 // SimpleReadThread()
94 /// Convenience thread to handle USB read activity.
95 ///
96 void *SocketRoutingQueue::SimpleReadThread(void *userptr)
98 SocketRoutingQueue *q = (SocketRoutingQueue *)userptr;
100 // read from USB and write to stdout until finished
101 q->m_seen_usb_error = false;
102 while( q->m_continue_reading ) {
103 try {
104 q->DoRead(1000); // timeout in milliseconds
106 catch (std::runtime_error const &e) {
107 eout("SimpleReadThread received uncaught exception: " << typeid(e).name() << " what: " << e.what());
109 catch (...) {
110 eout("SimpleReadThread recevied uncaught exception of unknown type");
113 return 0;
117 ///////////////////////////////////////////////////////////////////////////////
118 // public API
120 // These functions connect the router to an external Usb::Device
121 // object. Normally this is handled automatically by the
122 // Controller class, but are public here in case they are needed.
123 void SocketRoutingQueue::SetUsbDevice(Usb::Device *dev, int writeEp, int readEp,
124 SocketDataHandlerPtr callback)
126 scoped_lock lock(m_mutex);
127 m_dev = dev;
128 m_usb_error_dev_callback = callback;
129 m_writeEp = writeEp;
130 m_readEp = readEp;
133 void SocketRoutingQueue::ClearUsbDevice()
135 scoped_lock lock(m_mutex);
136 m_dev = 0;
137 m_usb_error_dev_callback.reset();
138 lock.unlock();
140 // wait for the DoRead cycle to finish, so the external
141 // Usb::Device object doesn't close before we're done with it
142 scoped_lock wait(m_readwaitMutex);
143 pthread_cond_wait(&m_readwaitCond, &m_readwaitMutex);
146 bool SocketRoutingQueue::UsbDeviceReady()
148 scoped_lock lock(m_mutex);
149 return m_dev != 0 && !m_seen_usb_error;
153 // AllocateBuffers
155 /// This class starts out with no buffers, and will grow one buffer
156 /// at a time if needed. Call this to allocate count buffers
157 /// all at once and place them on the free queue. After calling
158 /// this function, at least count buffers will exist in the free
159 /// queue. If there are already count buffers, none will be added.
161 void SocketRoutingQueue::AllocateBuffers(int count)
163 int todo = count - m_free.size();
165 for( int i = 0; i < todo; i++ ) {
166 // m_free handles its own locking
167 m_free.push( new Data );
172 // DefaultRead (both variations)
174 /// Returns the data for the next unregistered socket.
175 /// Blocks until timeout or data is available.
176 /// Returns false (or null pointer) on timeout and no data.
177 /// With the return version of the function, there is no
178 /// copying performed.
180 /// This version performs a copy.
182 bool SocketRoutingQueue::DefaultRead(Data &receive, int timeout)
184 DataHandle buf = DefaultRead(timeout);
185 if( !buf.get() )
186 return false;
188 // copy to desired buffer
189 receive = *buf.get();
190 return true;
194 /// This version does not perform a copy.
196 DataHandle SocketRoutingQueue::DefaultRead(int timeout)
198 // m_default handles its own locking
199 Data *buf = m_default.wait_pop(timeout);
200 return DataHandle(*this, buf);
204 // RegisterInterest
206 /// Register an interest in data from a certain socket. To read
207 /// from that socket, use the SocketRead() function from then on.
209 /// Any non-registered socket goes in the default queue
210 /// and must be read by DefaultRead()
212 /// If not null, handler is called when new data is read. It will
213 /// be called in the same thread instance that DoRead() is called from.
214 /// Handler is passed the DataQueue Data pointer, and so no
215 /// copying is done. Once the handler returns, the data is
216 /// considered processed and not added to the interested queue,
217 /// but instead returned to m_free.
219 /// Throws std::logic_error if already registered.
221 void SocketRoutingQueue::RegisterInterest(SocketId socket,
222 SocketDataHandlerPtr handler)
224 // modifying our own std::map, need a lock
225 scoped_lock lock(m_mutex);
227 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
228 if( qi != m_socketQueues.end() )
229 throw std::logic_error("RegisterInterest requesting a previously registered socket.");
231 m_socketQueues[socket] = QueueEntryPtr( new QueueEntry(handler) );
232 m_interest = true;
236 // UnregisterInterest
238 /// Unregisters interest in data from the given socket, and discards
239 /// any existing data in its interest queue. Any new incoming data
240 /// for this socket will be placed in the default queue.
242 void SocketRoutingQueue::UnregisterInterest(SocketId socket)
244 // modifying our own std::map, need a lock
245 scoped_lock lock(m_mutex);
247 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
248 if( qi == m_socketQueues.end() )
249 return; // nothing registered, done
251 // salvage all our data buffers
252 m_free.append_from( qi->second->m_queue );
254 // remove the QueueEntryPtr from the map
255 m_socketQueues.erase( qi );
257 // check the interest flag
258 m_interest = m_socketQueues.size() > 0;
262 // SocketRead
264 /// Reads data from the interested socket cache. Can only read
265 /// from sockets that have been previously registered.
267 /// Blocks until timeout or data is available.
269 /// Returns false (or null pointer) on timeout and no data.
270 /// With the return version of the function, there is no
271 /// copying performed.
273 /// Throws std::logic_error if a socket was requested that was
274 /// not previously registered.
276 /// Copying is performed with this function.
278 bool SocketRoutingQueue::SocketRead(SocketId socket, Data &receive, int timeout)
280 DataHandle buf = SocketRead(socket, timeout);
281 if( !buf.get() )
282 return false;
284 // copy to desired buffer
285 receive = *buf.get();
286 return true;
290 /// Copying is not performed with this function.
292 /// Throws std::logic_error if a socket was requested that was
293 /// not previously registered.
295 DataHandle SocketRoutingQueue::SocketRead(SocketId socket, int timeout)
297 QueueEntryPtr qep;
298 DataQueue *dq = 0;
300 // accessing our own std::map, need a lock
302 scoped_lock lock(m_mutex);
303 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
304 if( qi == m_socketQueues.end() )
305 throw std::logic_error("SocketRead requested data from unregistered socket.");
307 // got our queue, save the whole QueueEntryPtr (shared_ptr),
308 // and unlock, since we will be waiting on the DataQueue,
309 // not the socketQueues map
311 // This is safe, since even if UnregisterInterest is called,
312 // our pointer won't be deleted until our shared_ptr
313 // (QueueEntryPtr) goes out of scope.
315 // The remaining problem is that wait_pop() might wait
316 // forever if there is no timeout... c'est la vie.
317 // Should'a used a timeout. :-)
318 qep = qi->second;
319 dq = &qep->m_queue;
322 // get data from DataQueue
323 Data *buf = dq->wait_pop(timeout);
325 // specifically delete our copy of shared pointer, in a locked
326 // environment
328 scoped_lock lock(m_mutex);
329 qep.reset();
332 return DataHandle(*this, buf);
335 // Returns true if data is available for that socket.
336 bool SocketRoutingQueue::IsAvailable(SocketId socket) const
338 scoped_lock lock(m_mutex);
339 SocketQueueMap::const_iterator qi = m_socketQueues.find(socket);
340 if( qi == m_socketQueues.end() )
341 return false;
342 return qi->second->m_queue.size() > 0;
346 // DoRead
348 /// Called by the application's "read thread" to read the next usb
349 /// packet and route it to the correct queue. Returns after every
350 /// read, even if a handler is associated with a queue.
351 /// Note: this function is safe to call before SetUsbDevice() is
352 /// called... it just doesn't do anything if there is no usb
353 /// device to work with.
355 /// Timeout is in milliseconds.
356 void SocketRoutingQueue::DoRead(int timeout)
358 class ReadWaitSignal
360 pthread_mutex_t &m_Mutex;
361 pthread_cond_t &m_Cond;
362 public:
363 ReadWaitSignal(pthread_mutex_t &mut, pthread_cond_t &cond)
364 : m_Mutex(mut), m_Cond(cond)
366 ~ReadWaitSignal()
368 scoped_lock wait(m_Mutex);
369 pthread_cond_signal(&m_Cond);
371 } readwait(m_readwaitMutex, m_readwaitCond);
373 Usb::Device * volatile dev = 0;
374 int readEp;
375 DataHandle buf(*this, 0);
377 // if we are not connected to a USB device yet, just wait
379 scoped_lock lock(m_mutex);
381 if( !m_dev || m_seen_usb_error ) {
382 lock.unlock(); // unlock early, since we're sleeping
383 // sleep only a short time, since things could be
384 // in the process of setup or teardown
385 usleep(125000);
386 return;
389 dev = m_dev;
390 readEp = m_readEp;
392 // fetch a free buffer
393 Data *raw = m_free.pop();
394 if( !raw )
395 buf = DataHandle(*this, new Data);
396 else
397 buf = DataHandle(*this, raw);
400 // take a chance and do the read unlocked, as this has the potential
401 // for blocking for a while
402 try {
404 Data &data = *buf.get();
406 if( !dev->BulkRead(readEp, data, timeout) )
407 return; // no data, done!
409 MAKE_PACKET(pack, data);
411 // make sure the size is right
412 if( data.GetSize() < SB_PACKET_SOCKET_SIZE )
413 return; // bad size, just skip
415 // extract the socket from the packet
416 uint16_t socket = btohs(pack->socket);
418 // we have data, now lock up again to place it
419 // in the right queue
420 scoped_lock lock(m_mutex);
422 // search for registration of socket
423 if( m_interest ) {
424 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
425 if( qi != m_socketQueues.end() ) {
426 SocketDataHandlerPtr &sdh = qi->second->m_handler;
428 // is there a handler?
429 if( sdh ) {
430 // unlock & let the handler process it
431 lock.unlock();
432 sdh->DataReceived(*buf.get());
433 return;
435 else {
436 qi->second->m_queue.push(buf.release());
437 return;
441 // fall through
444 // safe to unlock now, we are done with the map
445 lock.unlock();
447 // if we get here, send to default queue
448 m_default.push(buf.release());
450 catch( Usb::Timeout & ) {
451 // this is expected... just ignore
453 catch( Usb::Error &ue ) {
454 // set the flag first, in case any of the handlers
455 // are able to recover from this error
456 m_seen_usb_error = true;
458 // this is unexpected, but we're in a thread here...
459 // Need to iterate through all the registered handlers
460 // calling their error callback.
461 // Can't be locked when calling the callback, so need
462 // to make a list of them first.
463 scoped_lock lock(m_mutex);
464 std::vector<SocketDataHandlerPtr> handlers;
465 SocketQueueMap::iterator qi = m_socketQueues.begin();
466 while( qi != m_socketQueues.end() ) {
467 SocketDataHandlerPtr &sdh = qi->second->m_handler;
468 // is there a handler?
469 if( sdh ) {
470 handlers.push_back(sdh);
472 ++qi;
475 SocketDataHandlerPtr usb_error_handler = m_usb_error_dev_callback;
477 lock.unlock();
478 std::vector<SocketDataHandlerPtr>::iterator hi = handlers.begin();
479 while( hi != handlers.end() ) {
480 (*hi)->Error(ue);
481 ++hi;
484 // and finally, call the specific error callback if available
485 if( usb_error_handler.get() ) {
486 usb_error_handler->Error(ue);
491 void SocketRoutingQueue::SpinoffSimpleReadThread()
493 // signal that it's ok to run inside the thread
494 if( m_continue_reading )
495 return; // already running
496 m_continue_reading = true;
498 // Start USB read thread, to handle all routing
499 int ret = pthread_create(&m_usb_read_thread, NULL, &SimpleReadThread, this);
500 if( ret ) {
501 m_continue_reading = false;
502 throw Barry::ErrnoError("SocketRoutingQueue: Error creating USB read thread.", ret);
506 } // namespace Barry