lib: added implicit ctor converter from DatabaseDatabase to DBListType
[barry/progweb.git] / src / router.cc
blobc27e0727174ff7458993c9bb33d38a719b642cc7
1 ///
2 /// \file router.cc
3 /// Support classes for the pluggable socket routing system.
4 ///
6 /*
7 Copyright (C) 2008-2012, Net Direct Inc. (http://www.netdirect.ca/)
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 2 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
18 See the GNU General Public License in the COPYING file at the
19 root directory of this project for more details.
22 #include "router.h"
23 #include "scoped_lock.h"
24 #include "data.h"
25 #include "protostructs.h"
26 #include "protocol.h"
27 #include "usbwrap.h"
28 #include "endian.h"
29 #include "debug.h"
30 #include <unistd.h>
31 #include <iostream>
32 #include <iomanip>
34 using namespace std;
36 namespace Barry {
38 ///////////////////////////////////////////////////////////////////////////////
39 // SocketDataHandler default methods
41 void SocketRoutingQueue::SocketDataHandler::Error(Barry::Error &error)
43 // Just log the error
44 eout("SocketDataHandler: Error: " << error.what());
45 (void) error;
48 SocketRoutingQueue::SocketDataHandler::~SocketDataHandler()
50 // Nothing to destroy
53 ///////////////////////////////////////////////////////////////////////////////
54 // SocketRoutingQueue constructors
56 SocketRoutingQueue::SocketRoutingQueue(int prealloc_buffer_count,
57 int default_read_timeout)
58 : m_dev(0)
59 , m_writeEp(0)
60 , m_readEp(0)
61 , m_interest(false)
62 , m_seen_usb_error(false)
63 , m_timeout(default_read_timeout)
64 , m_continue_reading(false)
66 pthread_mutex_init(&m_mutex, NULL);
68 pthread_mutex_init(&m_readwaitMutex, NULL);
69 pthread_cond_init(&m_readwaitCond, NULL);
71 AllocateBuffers(prealloc_buffer_count);
74 SocketRoutingQueue::~SocketRoutingQueue()
76 // thread running?
77 if( m_continue_reading ) {
78 m_continue_reading = false;
79 pthread_join(m_usb_read_thread, NULL);
82 // dump all unused packets to debug output
83 SocketQueueMap::const_iterator b = m_socketQueues.begin();
84 for( ; b != m_socketQueues.end(); ++b ) {
85 DumpSocketQueue(b->first, b->second->m_queue);
87 if( m_default.size() ) {
88 ddout("(Default queue is socket 0)");
89 DumpSocketQueue(0, m_default);
93 ///////////////////////////////////////////////////////////////////////////////
94 // protected members
97 // ReturnBuffer
99 /// Provides a method of returning a buffer to the free queue
100 /// after processing. The DataHandle class calls this automatically
101 /// from its destructor.
102 void SocketRoutingQueue::ReturnBuffer(Data *buf)
104 // don't need to lock here, since m_free handles its own locking
105 m_free.push(buf);
109 // QueuePacket
111 /// Helper function to add a buffer to a socket queue.
112 /// Returns false if no queue is available for that socket.
113 //// Also empties the DataHandle on success.
115 bool SocketRoutingQueue::QueuePacket(SocketId socket, DataHandle &buf)
117 if( m_interest ) {
118 // lock so we can access the m_socketQueues map safely
119 scoped_lock lock(m_mutex);
121 // search for registration of socket
122 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
123 if( qi != m_socketQueues.end() ) {
124 qi->second->m_queue.push(buf.release());
125 return true;
129 return false;
132 bool SocketRoutingQueue::QueuePacket(DataQueue &queue, DataHandle &buf)
134 // don't need to lock here, since queue handles its own locking
135 queue.push(buf.release());
136 return true;
140 // RouteOrQueuePacket
142 /// Same as QueuePacket, except sends the data to the callback if
143 /// a callback is available.
145 /// This function duplicates code from QueuePacket(), in order to
146 /// optimize the mutex locking.
148 bool SocketRoutingQueue::RouteOrQueuePacket(SocketId socket, DataHandle &buf)
150 // search for registration of socket
151 if( m_interest ) {
152 // lock so we can access the m_socketQueues map safely
153 scoped_lock lock(m_mutex);
155 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
156 if( qi != m_socketQueues.end() ) {
157 SocketDataHandlerPtr &sdh = qi->second->m_handler;
159 // is there a handler?
160 if( sdh ) {
161 // unlock & let the handler process it
162 lock.unlock();
163 sdh->DataReceived(*buf.get());
165 // no exceptions thrown, clear the
166 // DataHandle, sending packet back to its
167 // free list
168 buf.reset();
169 return true;
171 else {
172 qi->second->m_queue.push(buf.release());
173 return true;
178 return false;
182 // SimpleReadThread()
184 /// Convenience thread to handle USB read activity.
186 void *SocketRoutingQueue::SimpleReadThread(void *userptr)
188 SocketRoutingQueue *q = (SocketRoutingQueue *)userptr;
190 // read from USB and write to stdout until finished
191 q->m_seen_usb_error = false;
192 while( q->m_continue_reading ) {
193 try {
194 q->DoRead(1000); // timeout in milliseconds
196 catch (std::runtime_error const &e) {
197 eout("SimpleReadThread received uncaught exception: " << typeid(e).name() << " what: " << e.what());
199 catch (...) {
200 eout("SimpleReadThread recevied uncaught exception of unknown type");
203 return 0;
206 void SocketRoutingQueue::DumpSocketQueue(SocketId socket, const DataQueue &dq)
208 // dump a record of any unused packets in the queue, for debugging
209 if( dq.size() ) {
210 ddout("SocketRoutingQueue Leftovers: "
211 << dec << dq.size()
212 << " packet(s) for socket 0x"
213 << hex << (unsigned int) socket
214 << "\n"
215 << dq);
220 ///////////////////////////////////////////////////////////////////////////////
221 // public API
223 // These functions connect the router to an external Usb::Device
224 // object. Normally this is handled automatically by the
225 // Controller class, but are public here in case they are needed.
226 void SocketRoutingQueue::SetUsbDevice(Usb::Device *dev, int writeEp, int readEp,
227 SocketDataHandlerPtr callback)
229 scoped_lock lock(m_mutex);
230 m_dev = dev;
231 m_usb_error_dev_callback = callback;
232 m_writeEp = writeEp;
233 m_readEp = readEp;
236 void SocketRoutingQueue::ClearUsbDevice()
238 scoped_lock lock(m_mutex);
239 m_dev = 0;
240 m_usb_error_dev_callback.reset();
241 lock.unlock();
243 // wait for the DoRead cycle to finish, so the external
244 // Usb::Device object doesn't close before we're done with it
245 scoped_lock wait(m_readwaitMutex);
246 pthread_cond_wait(&m_readwaitCond, &m_readwaitMutex);
249 bool SocketRoutingQueue::UsbDeviceReady()
251 scoped_lock lock(m_mutex);
252 return m_dev != 0 && !m_seen_usb_error;
256 // AllocateBuffers
258 /// This class starts out with no buffers, and will grow one buffer
259 /// at a time if needed. Call this to allocate count buffers
260 /// all at once and place them on the free queue. After calling
261 /// this function, at least count buffers will exist in the free
262 /// queue. If there are already count buffers, none will be added.
264 void SocketRoutingQueue::AllocateBuffers(int count)
266 int todo = count - m_free.size();
268 for( int i = 0; i < todo; i++ ) {
269 // m_free handles its own locking
270 m_free.push( new Data );
275 // DefaultRead (both variations)
277 /// Returns the data for the next unregistered socket.
278 /// Blocks until timeout or data is available.
279 /// Returns false (or null pointer) on timeout and no data.
280 /// With the return version of the function, there is no
281 /// copying performed.
283 /// This version performs a copy.
285 bool SocketRoutingQueue::DefaultRead(Data &receive, int timeout)
287 DataHandle buf = DefaultRead(timeout);
288 if( !buf.get() )
289 return false;
291 // copy to desired buffer
292 receive = *buf.get();
293 return true;
297 /// This version does not perform a copy.
299 DataHandle SocketRoutingQueue::DefaultRead(int timeout)
301 if( m_seen_usb_error && timeout == -1 ) {
302 // If an error has been seen and not cleared then no
303 // more data will be read into the queue by
304 // DoRead(). Forcing the timeout to zero allows any
305 // data already in the queue to be read, but prevents
306 // waiting for data which will never arrive.
307 timeout = 0;
310 // m_default handles its own locking
311 // Be careful with the queue timeout, since its -1 means "forever"
312 Data *buf = m_default.wait_pop(timeout == -1 ? m_timeout : timeout);
313 return DataHandle(*this, buf);
317 // RegisterInterest
319 /// Register an interest in data from a certain socket. To read
320 /// from that socket, use the SocketRead() function from then on.
322 /// Any non-registered socket goes in the default queue
323 /// and must be read by DefaultRead()
325 /// If not null, handler is called when new data is read. It will
326 /// be called in the same thread instance that DoRead() is called from.
327 /// Handler is passed the DataQueue Data pointer, and so no
328 /// copying is done. Once the handler returns, the data is
329 /// considered processed and not added to the interested queue,
330 /// but instead returned to m_free.
332 /// Throws std::logic_error if already registered.
334 void SocketRoutingQueue::RegisterInterest(SocketId socket,
335 SocketDataHandlerPtr handler)
337 // modifying our own std::map, need a lock
338 scoped_lock lock(m_mutex);
340 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
341 if( qi != m_socketQueues.end() )
342 throw std::logic_error("RegisterInterest requesting a previously registered socket.");
344 m_socketQueues[socket] = QueueEntryPtr( new QueueEntry(handler) );
345 m_interest = true;
349 // UnregisterInterest
351 /// Unregisters interest in data from the given socket, and discards
352 /// any existing data in its interest queue. Any new incoming data
353 /// for this socket will be placed in the default queue.
355 void SocketRoutingQueue::UnregisterInterest(SocketId socket)
357 // modifying our own std::map, need a lock
358 scoped_lock lock(m_mutex);
360 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
361 if( qi == m_socketQueues.end() )
362 return; // nothing registered, done
364 // dump a record of any unused packets in the queue, for debugging
365 DumpSocketQueue(qi->first, qi->second->m_queue);
367 // salvage all our data buffers
368 m_free.append_from( qi->second->m_queue );
370 // remove the QueueEntryPtr from the map
371 m_socketQueues.erase( qi );
373 // check the interest flag
374 m_interest = m_socketQueues.size() > 0;
378 // SocketRead
380 /// Reads data from the interested socket cache. Can only read
381 /// from sockets that have been previously registered.
383 /// Blocks until timeout or data is available.
385 /// Returns false (or null pointer) on timeout and no data.
386 /// With the return version of the function, there is no
387 /// copying performed.
389 /// Throws std::logic_error if a socket was requested that was
390 /// not previously registered.
392 /// Copying is performed with this function.
394 bool SocketRoutingQueue::SocketRead(SocketId socket, Data &receive, int timeout)
396 DataHandle buf = SocketRead(socket, timeout);
397 if( !buf.get() )
398 return false;
400 // copy to desired buffer
401 receive = *buf.get();
402 return true;
406 /// Copying is not performed with this function.
408 /// Throws std::logic_error if a socket was requested that was
409 /// not previously registered.
411 DataHandle SocketRoutingQueue::SocketRead(SocketId socket, int timeout)
413 QueueEntryPtr qep;
414 DataQueue *dq = 0;
416 // accessing our own std::map, need a lock
418 scoped_lock lock(m_mutex);
419 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
420 if( qi == m_socketQueues.end() )
421 throw std::logic_error("SocketRead requested data from unregistered socket.");
423 // got our queue, save the whole QueueEntryPtr (shared_ptr),
424 // and unlock, since we will be waiting on the DataQueue,
425 // not the socketQueues map
427 // This is safe, since even if UnregisterInterest is called,
428 // our pointer won't be deleted until our shared_ptr
429 // (QueueEntryPtr) goes out of scope.
431 // The remaining problem is that wait_pop() might wait
432 // forever if there is no timeout... c'est la vie.
433 // Should'a used a timeout. :-)
434 qep = qi->second;
435 dq = &qep->m_queue;
438 // get data from DataQueue
439 // Be careful with the queue timeout, since its -1 means "forever"
440 Data *buf = dq->wait_pop(timeout == -1 ? m_timeout : timeout);
442 // specifically delete our copy of shared pointer, in a locked
443 // environment
445 scoped_lock lock(m_mutex);
446 qep.reset();
449 return DataHandle(*this, buf);
452 // Returns true if data is available for that socket.
453 bool SocketRoutingQueue::IsAvailable(SocketId socket) const
455 scoped_lock lock(m_mutex);
456 SocketQueueMap::const_iterator qi = m_socketQueues.find(socket);
457 if( qi == m_socketQueues.end() )
458 return false;
459 return qi->second->m_queue.size() > 0;
463 // DoRead
465 /// Called by the application's "read thread" to read the next usb
466 /// packet and route it to the correct queue. Returns after every
467 /// read, even if a handler is associated with a queue.
468 /// Note: this function is safe to call before SetUsbDevice() is
469 /// called... it just doesn't do anything if there is no usb
470 /// device to work with.
472 /// Timeout is in milliseconds.
473 // This timeout is for the USB subsystem, so no special handling
474 // for it is needed... just use usbwrap's default timeout.
475 void SocketRoutingQueue::DoRead(int timeout)
477 class ReadWaitSignal
479 pthread_mutex_t &m_Mutex;
480 pthread_cond_t &m_Cond;
481 public:
482 ReadWaitSignal(pthread_mutex_t &mut, pthread_cond_t &cond)
483 : m_Mutex(mut), m_Cond(cond)
485 ~ReadWaitSignal()
487 scoped_lock wait(m_Mutex);
488 pthread_cond_signal(&m_Cond);
490 } readwait(m_readwaitMutex, m_readwaitCond);
492 Usb::Device * volatile dev = 0;
493 int readEp;
494 DataHandle buf(*this, 0);
496 // if we are not connected to a USB device yet, just wait
498 scoped_lock lock(m_mutex);
500 if( !m_dev || m_seen_usb_error ) {
501 lock.unlock(); // unlock early, since we're sleeping
502 // sleep only a short time, since things could be
503 // in the process of setup or teardown
504 usleep(125000);
505 return;
508 dev = m_dev;
509 readEp = m_readEp;
511 // fetch a free buffer
512 Data *raw = m_free.pop();
513 if( !raw )
514 buf = DataHandle(*this, new Data);
515 else
516 buf = DataHandle(*this, raw);
519 // take a chance and do the read unlocked, as this has the potential
520 // for blocking for a while
521 try {
523 Data &data = *buf.get();
525 if( !dev->BulkRead(readEp, data, timeout) )
526 return; // no data, done!
528 MAKE_PACKET(pack, data);
530 // make sure the size is right
531 if( data.GetSize() < SB_PACKET_SOCKET_SIZE )
532 return; // bad size, just skip
534 // extract the socket from the packet
535 uint16_t socket = btohs(pack->socket);
537 // if this is a sequence packet, handle it specially
538 if( Protocol::IsSequencePacket(data) ) {
539 // sequence.socket is a single byte
540 socket = pack->u.sequence.socket;
542 //////////////////////////////////////////////
543 // ALWAYS queue sequence packets, so that
544 // the socket code can handle SyncSend()
545 if( !QueuePacket(socket, buf) ) {
546 // if no queue available for this
547 // socket, send it to the default
548 // queue
549 QueuePacket(m_default, buf);
552 // done with sequence packet
553 return;
556 // we have data, now route or queue it
557 if( RouteOrQueuePacket(socket, buf) )
558 return; // done
560 // if we get here, send to default queue
561 QueuePacket(m_default, buf);
563 catch( Usb::Timeout & ) {
564 // this is expected... just ignore
566 catch( Usb::Error &ue ) {
567 // set the flag first, in case any of the handlers
568 // are able to recover from this error
569 m_seen_usb_error = true;
571 // this is unexpected, but we're in a thread here...
572 // Need to iterate through all the registered handlers
573 // calling their error callback.
574 // Can't be locked when calling the callback, so need
575 // to make a list of them first.
576 scoped_lock lock(m_mutex);
577 std::vector<SocketDataHandlerPtr> handlers;
578 SocketQueueMap::iterator qi = m_socketQueues.begin();
579 while( qi != m_socketQueues.end() ) {
580 SocketDataHandlerPtr &sdh = qi->second->m_handler;
581 // is there a handler?
582 if( sdh ) {
583 handlers.push_back(sdh);
585 ++qi;
588 SocketDataHandlerPtr usb_error_handler = m_usb_error_dev_callback;
590 lock.unlock();
591 std::vector<SocketDataHandlerPtr>::iterator hi = handlers.begin();
592 while( hi != handlers.end() ) {
593 (*hi)->Error(ue);
594 ++hi;
597 // and finally, call the specific error callback if available
598 if( usb_error_handler.get() ) {
599 usb_error_handler->Error(ue);
604 void SocketRoutingQueue::SpinoffSimpleReadThread()
606 // signal that it's ok to run inside the thread
607 if( m_continue_reading )
608 return; // already running
609 m_continue_reading = true;
611 // Start USB read thread, to handle all routing
612 int ret = pthread_create(&m_usb_read_thread, NULL, &SimpleReadThread, this);
613 if( ret ) {
614 m_continue_reading = false;
615 throw Barry::ErrnoError("SocketRoutingQueue: Error creating USB read thread.", ret);
619 } // namespace Barry