3 /// Support classes for the pluggable socket routing system.
7 Copyright (C) 2008-2013, Net Direct Inc. (http://www.netdirect.ca/)
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 2 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
18 See the GNU General Public License in the COPYING file at the
19 root directory of this project for more details.
24 #include "scoped_lock.h"
26 #include "protostructs.h"
39 ///////////////////////////////////////////////////////////////////////////////
40 // SocketDataHandler default methods
42 void SocketRoutingQueue::SocketDataHandler::Error(Barry::Error
&error
)
45 eout("SocketDataHandler: Error: " << error
.what());
49 SocketRoutingQueue::SocketDataHandler::~SocketDataHandler()
54 ///////////////////////////////////////////////////////////////////////////////
55 // SocketRoutingQueue constructors
57 SocketRoutingQueue::SocketRoutingQueue(int prealloc_buffer_count
,
58 int default_read_timeout
)
63 , m_seen_usb_error(false)
64 , m_timeout(default_read_timeout
)
65 , m_continue_reading(false)
67 pthread_mutex_init(&m_mutex
, NULL
);
69 pthread_mutex_init(&m_readwaitMutex
, NULL
);
70 pthread_cond_init(&m_readwaitCond
, NULL
);
72 AllocateBuffers(prealloc_buffer_count
);
75 SocketRoutingQueue::~SocketRoutingQueue()
78 if( m_continue_reading
) {
79 m_continue_reading
= false;
80 pthread_join(m_usb_read_thread
, NULL
);
83 // dump all unused packets to debug output
84 SocketQueueMap::const_iterator b
= m_socketQueues
.begin();
85 for( ; b
!= m_socketQueues
.end(); ++b
) {
86 DumpSocketQueue(b
->first
, b
->second
->m_queue
);
88 if( m_default
.size() ) {
89 ddout("(Default queue is socket 0)");
90 DumpSocketQueue(0, m_default
);
94 ///////////////////////////////////////////////////////////////////////////////
100 /// Provides a method of returning a buffer to the free queue
101 /// after processing. The DataHandle class calls this automatically
102 /// from its destructor.
103 void SocketRoutingQueue::ReturnBuffer(Data
*buf
)
105 // don't need to lock here, since m_free handles its own locking
112 /// Helper function to add a buffer to a socket queue.
113 /// Returns false if no queue is available for that socket.
114 //// Also empties the DataHandle on success.
116 bool SocketRoutingQueue::QueuePacket(SocketId socket
, DataHandle
&buf
)
119 // lock so we can access the m_socketQueues map safely
120 scoped_lock
lock(m_mutex
);
122 // search for registration of socket
123 SocketQueueMap::iterator qi
= m_socketQueues
.find(socket
);
124 if( qi
!= m_socketQueues
.end() ) {
125 qi
->second
->m_queue
.push(buf
.release());
133 bool SocketRoutingQueue::QueuePacket(DataQueue
&queue
, DataHandle
&buf
)
135 // don't need to lock here, since queue handles its own locking
136 queue
.push(buf
.release());
141 // RouteOrQueuePacket
143 /// Same as QueuePacket, except sends the data to the callback if
144 /// a callback is available.
146 /// This function duplicates code from QueuePacket(), in order to
147 /// optimize the mutex locking.
149 bool SocketRoutingQueue::RouteOrQueuePacket(SocketId socket
, DataHandle
&buf
)
151 // search for registration of socket
153 // lock so we can access the m_socketQueues map safely
154 scoped_lock
lock(m_mutex
);
156 SocketQueueMap::iterator qi
= m_socketQueues
.find(socket
);
157 if( qi
!= m_socketQueues
.end() ) {
158 SocketDataHandlerPtr
&sdh
= qi
->second
->m_handler
;
160 // is there a handler?
162 // unlock & let the handler process it
164 sdh
->DataReceived(*buf
.get());
166 // no exceptions thrown, clear the
167 // DataHandle, sending packet back to its
173 qi
->second
->m_queue
.push(buf
.release());
183 // SimpleReadThread()
185 /// Convenience thread to handle USB read activity.
187 void *SocketRoutingQueue::SimpleReadThread(void *userptr
)
189 SocketRoutingQueue
*q
= (SocketRoutingQueue
*)userptr
;
191 // read from USB and write to stdout until finished
192 q
->m_seen_usb_error
= false;
193 while( q
->m_continue_reading
) {
195 q
->DoRead(1000); // timeout in milliseconds
197 catch (std::runtime_error
const &e
) {
198 eout(_("SimpleReadThread received uncaught exception: ") << typeid(e
).name() << _(" what: ") << e
.what());
201 eout(_("SimpleReadThread recevied uncaught exception of unknown type"));
207 void SocketRoutingQueue::DumpSocketQueue(SocketId socket
, const DataQueue
&dq
)
209 // dump a record of any unused packets in the queue, for debugging
211 ddout(_("SocketRoutingQueue Leftovers: ")
213 << _(" packet(s) for socket: ") << "0x"
214 << hex
<< (unsigned int) socket
221 ///////////////////////////////////////////////////////////////////////////////
224 // These functions connect the router to an external Usb::Device
225 // object. Normally this is handled automatically by the
226 // Controller class, but are public here in case they are needed.
227 void SocketRoutingQueue::SetUsbDevice(Usb::Device
*dev
, int writeEp
, int readEp
,
228 SocketDataHandlerPtr callback
)
230 scoped_lock
lock(m_mutex
);
232 m_usb_error_dev_callback
= callback
;
237 void SocketRoutingQueue::ClearUsbDevice()
239 scoped_lock
lock(m_mutex
);
241 m_usb_error_dev_callback
.reset();
244 // wait for the DoRead cycle to finish, so the external
245 // Usb::Device object doesn't close before we're done with it
246 scoped_lock
wait(m_readwaitMutex
);
247 pthread_cond_wait(&m_readwaitCond
, &m_readwaitMutex
);
250 bool SocketRoutingQueue::UsbDeviceReady()
252 scoped_lock
lock(m_mutex
);
253 return m_dev
!= 0 && !m_seen_usb_error
;
259 /// This class starts out with no buffers, and will grow one buffer
260 /// at a time if needed. Call this to allocate count buffers
261 /// all at once and place them on the free queue. After calling
262 /// this function, at least count buffers will exist in the free
263 /// queue. If there are already count buffers, none will be added.
265 void SocketRoutingQueue::AllocateBuffers(int count
)
267 int todo
= count
- m_free
.size();
269 for( int i
= 0; i
< todo
; i
++ ) {
270 // m_free handles its own locking
271 m_free
.push( new Data
);
276 // DefaultRead (both variations)
278 /// Returns the data for the next unregistered socket.
279 /// Blocks until timeout or data is available.
280 /// Returns false (or null pointer) on timeout and no data.
281 /// With the return version of the function, there is no
282 /// copying performed.
284 /// This version performs a copy.
286 bool SocketRoutingQueue::DefaultRead(Data
&receive
, int timeout
)
288 DataHandle buf
= DefaultRead(timeout
);
292 // copy to desired buffer
293 receive
= *buf
.get();
298 /// This version does not perform a copy.
300 DataHandle
SocketRoutingQueue::DefaultRead(int timeout
)
302 if( m_seen_usb_error
&& timeout
== -1 ) {
303 // If an error has been seen and not cleared then no
304 // more data will be read into the queue by
305 // DoRead(). Forcing the timeout to zero allows any
306 // data already in the queue to be read, but prevents
307 // waiting for data which will never arrive.
311 // m_default handles its own locking
312 // Be careful with the queue timeout, since its -1 means "forever"
313 Data
*buf
= m_default
.wait_pop(timeout
== -1 ? m_timeout
: timeout
);
314 return DataHandle(*this, buf
);
320 /// Register an interest in data from a certain socket. To read
321 /// from that socket, use the SocketRead() function from then on.
323 /// Any non-registered socket goes in the default queue
324 /// and must be read by DefaultRead()
326 /// If not null, handler is called when new data is read. It will
327 /// be called in the same thread instance that DoRead() is called from.
328 /// Handler is passed the DataQueue Data pointer, and so no
329 /// copying is done. Once the handler returns, the data is
330 /// considered processed and not added to the interested queue,
331 /// but instead returned to m_free.
333 /// Throws std::logic_error if already registered.
335 void SocketRoutingQueue::RegisterInterest(SocketId socket
,
336 SocketDataHandlerPtr handler
)
338 // modifying our own std::map, need a lock
339 scoped_lock
lock(m_mutex
);
341 SocketQueueMap::iterator qi
= m_socketQueues
.find(socket
);
342 if( qi
!= m_socketQueues
.end() )
343 throw std::logic_error(_("RegisterInterest requesting a previously registered socket."));
345 m_socketQueues
[socket
] = QueueEntryPtr( new QueueEntry(handler
) );
350 // UnregisterInterest
352 /// Unregisters interest in data from the given socket, and discards
353 /// any existing data in its interest queue. Any new incoming data
354 /// for this socket will be placed in the default queue.
356 void SocketRoutingQueue::UnregisterInterest(SocketId socket
)
358 // modifying our own std::map, need a lock
359 scoped_lock
lock(m_mutex
);
361 SocketQueueMap::iterator qi
= m_socketQueues
.find(socket
);
362 if( qi
== m_socketQueues
.end() )
363 return; // nothing registered, done
365 // dump a record of any unused packets in the queue, for debugging
366 DumpSocketQueue(qi
->first
, qi
->second
->m_queue
);
368 // salvage all our data buffers
369 m_free
.append_from( qi
->second
->m_queue
);
371 // remove the QueueEntryPtr from the map
372 m_socketQueues
.erase( qi
);
374 // check the interest flag
375 m_interest
= m_socketQueues
.size() > 0;
381 /// Reads data from the interested socket cache. Can only read
382 /// from sockets that have been previously registered.
384 /// Blocks until timeout or data is available.
386 /// Returns false (or null pointer) on timeout and no data.
387 /// With the return version of the function, there is no
388 /// copying performed.
390 /// Throws std::logic_error if a socket was requested that was
391 /// not previously registered.
393 /// Copying is performed with this function.
395 bool SocketRoutingQueue::SocketRead(SocketId socket
, Data
&receive
, int timeout
)
397 DataHandle buf
= SocketRead(socket
, timeout
);
401 // copy to desired buffer
402 receive
= *buf
.get();
407 /// Copying is not performed with this function.
409 /// Throws std::logic_error if a socket was requested that was
410 /// not previously registered.
412 DataHandle
SocketRoutingQueue::SocketRead(SocketId socket
, int timeout
)
417 // accessing our own std::map, need a lock
419 scoped_lock
lock(m_mutex
);
420 SocketQueueMap::iterator qi
= m_socketQueues
.find(socket
);
421 if( qi
== m_socketQueues
.end() )
422 throw std::logic_error(_("SocketRead requested data from unregistered socket."));
424 // got our queue, save the whole QueueEntryPtr (shared_ptr),
425 // and unlock, since we will be waiting on the DataQueue,
426 // not the socketQueues map
428 // This is safe, since even if UnregisterInterest is called,
429 // our pointer won't be deleted until our shared_ptr
430 // (QueueEntryPtr) goes out of scope.
432 // The remaining problem is that wait_pop() might wait
433 // forever if there is no timeout... c'est la vie.
434 // Should'a used a timeout. :-)
439 // get data from DataQueue
440 // Be careful with the queue timeout, since its -1 means "forever"
441 Data
*buf
= dq
->wait_pop(timeout
== -1 ? m_timeout
: timeout
);
443 // specifically delete our copy of shared pointer, in a locked
446 scoped_lock
lock(m_mutex
);
450 return DataHandle(*this, buf
);
453 // Returns true if data is available for that socket.
454 bool SocketRoutingQueue::IsAvailable(SocketId socket
) const
456 scoped_lock
lock(m_mutex
);
457 SocketQueueMap::const_iterator qi
= m_socketQueues
.find(socket
);
458 if( qi
== m_socketQueues
.end() )
460 return qi
->second
->m_queue
.size() > 0;
466 /// Called by the application's "read thread" to read the next usb
467 /// packet and route it to the correct queue. Returns after every
468 /// read, even if a handler is associated with a queue.
469 /// Note: this function is safe to call before SetUsbDevice() is
470 /// called... it just doesn't do anything if there is no usb
471 /// device to work with.
473 /// Timeout is in milliseconds.
474 // This timeout is for the USB subsystem, so no special handling
475 // for it is needed... just use usbwrap's default timeout.
476 void SocketRoutingQueue::DoRead(int timeout
)
480 pthread_mutex_t
&m_Mutex
;
481 pthread_cond_t
&m_Cond
;
483 ReadWaitSignal(pthread_mutex_t
&mut
, pthread_cond_t
&cond
)
484 : m_Mutex(mut
), m_Cond(cond
)
488 scoped_lock
wait(m_Mutex
);
489 pthread_cond_signal(&m_Cond
);
491 } readwait(m_readwaitMutex
, m_readwaitCond
);
493 Usb::Device
* volatile dev
= 0;
495 DataHandle
buf(*this, 0);
497 // if we are not connected to a USB device yet, just wait
499 scoped_lock
lock(m_mutex
);
501 if( !m_dev
|| m_seen_usb_error
) {
502 lock
.unlock(); // unlock early, since we're sleeping
503 // sleep only a short time, since things could be
504 // in the process of setup or teardown
512 // fetch a free buffer
513 Data
*raw
= m_free
.pop();
515 buf
= DataHandle(*this, new Data
);
517 buf
= DataHandle(*this, raw
);
520 // take a chance and do the read unlocked, as this has the potential
521 // for blocking for a while
524 Data
&data
= *buf
.get();
526 if( !dev
->BulkRead(readEp
, data
, timeout
) )
527 return; // no data, done!
529 MAKE_PACKET(pack
, data
);
531 // make sure the size is right
532 if( data
.GetSize() < SB_PACKET_SOCKET_SIZE
)
533 return; // bad size, just skip
535 // extract the socket from the packet
536 uint16_t socket
= btohs(pack
->socket
);
538 // if this is a sequence packet, handle it specially
539 if( Protocol::IsSequencePacket(data
) ) {
540 // sequence.socket is a single byte
541 socket
= pack
->u
.sequence
.socket
;
543 //////////////////////////////////////////////
544 // ALWAYS queue sequence packets, so that
545 // the socket code can handle SyncSend()
546 if( !QueuePacket(socket
, buf
) ) {
547 // if no queue available for this
548 // socket, send it to the default
550 QueuePacket(m_default
, buf
);
553 // done with sequence packet
557 // we have data, now route or queue it
558 if( RouteOrQueuePacket(socket
, buf
) )
561 // if we get here, send to default queue
562 QueuePacket(m_default
, buf
);
564 catch( Usb::Timeout
& ) {
565 // this is expected... just ignore
567 catch( Usb::Error
&ue
) {
568 // set the flag first, in case any of the handlers
569 // are able to recover from this error
570 m_seen_usb_error
= true;
572 // this is unexpected, but we're in a thread here...
573 // Need to iterate through all the registered handlers
574 // calling their error callback.
575 // Can't be locked when calling the callback, so need
576 // to make a list of them first.
577 scoped_lock
lock(m_mutex
);
578 std::vector
<SocketDataHandlerPtr
> handlers
;
579 SocketQueueMap::iterator qi
= m_socketQueues
.begin();
580 while( qi
!= m_socketQueues
.end() ) {
581 SocketDataHandlerPtr
&sdh
= qi
->second
->m_handler
;
582 // is there a handler?
584 handlers
.push_back(sdh
);
589 SocketDataHandlerPtr usb_error_handler
= m_usb_error_dev_callback
;
592 std::vector
<SocketDataHandlerPtr
>::iterator hi
= handlers
.begin();
593 while( hi
!= handlers
.end() ) {
598 // and finally, call the specific error callback if available
599 if( usb_error_handler
.get() ) {
600 usb_error_handler
->Error(ue
);
605 void SocketRoutingQueue::SpinoffSimpleReadThread()
607 // signal that it's ok to run inside the thread
608 if( m_continue_reading
)
609 return; // already running
610 m_continue_reading
= true;
612 // Start USB read thread, to handle all routing
613 int ret
= pthread_create(&m_usb_read_thread
, NULL
, &SimpleReadThread
, this);
615 m_continue_reading
= false;
616 throw Barry::ErrnoError(_("SocketRoutingQueue: Error creating USB read thread."), ret
);