3 /// Support classes for the pluggable socket routing system.
7 Copyright (C) 2008-2010, Net Direct Inc. (http://www.netdirect.ca/)
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 2 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
18 See the GNU General Public License in the COPYING file at the
19 root directory of this project for more details.
23 #include "scoped_lock.h"
25 #include "protostructs.h"
33 ///////////////////////////////////////////////////////////////////////////////
34 // SocketDataHandler default methods
36 void SocketRoutingQueue::SocketDataHandler::Error(Barry::Error
&error
)
39 eout("SocketDataHandler: Error: " << error
.what());
43 SocketRoutingQueue::SocketDataHandler::~SocketDataHandler()
48 ///////////////////////////////////////////////////////////////////////////////
49 // SocketRoutingQueue constructors
51 SocketRoutingQueue::SocketRoutingQueue(int prealloc_buffer_count
)
56 , m_seen_usb_error(false)
57 , m_continue_reading(false)
59 pthread_mutex_init(&m_mutex
, NULL
);
61 pthread_mutex_init(&m_readwaitMutex
, NULL
);
62 pthread_cond_init(&m_readwaitCond
, NULL
);
64 AllocateBuffers(prealloc_buffer_count
);
67 SocketRoutingQueue::~SocketRoutingQueue()
70 if( m_continue_reading
) {
71 m_continue_reading
= false;
72 pthread_join(m_usb_read_thread
, NULL
);
76 ///////////////////////////////////////////////////////////////////////////////
82 /// Provides a method of returning a buffer to the free queue
83 /// after processing. The DataHandle class calls this automatically
84 /// from its destructor.
85 void SocketRoutingQueue::ReturnBuffer(Data
*buf
)
87 // don't need to lock here, since m_free handles its own locking
94 /// Convenience thread to handle USB read activity.
96 void *SocketRoutingQueue::SimpleReadThread(void *userptr
)
98 SocketRoutingQueue
*q
= (SocketRoutingQueue
*)userptr
;
100 // read from USB and write to stdout until finished
101 q
->m_seen_usb_error
= false;
102 while( q
->m_continue_reading
) {
103 q
->DoRead(1000); // timeout in milliseconds
109 ///////////////////////////////////////////////////////////////////////////////
112 // These functions connect the router to an external Usb::Device
113 // object. Normally this is handled automatically by the
114 // Controller class, but are public here in case they are needed.
115 void SocketRoutingQueue::SetUsbDevice(Usb::Device
*dev
, int writeEp
, int readEp
,
116 SocketDataHandlerPtr callback
)
118 scoped_lock
lock(m_mutex
);
120 m_usb_error_dev_callback
= callback
;
125 void SocketRoutingQueue::ClearUsbDevice()
127 scoped_lock
lock(m_mutex
);
129 m_usb_error_dev_callback
.reset();
132 // wait for the DoRead cycle to finish, so the external
133 // Usb::Device object doesn't close before we're done with it
134 scoped_lock
wait(m_readwaitMutex
);
135 pthread_cond_wait(&m_readwaitCond
, &m_readwaitMutex
);
138 bool SocketRoutingQueue::UsbDeviceReady()
140 scoped_lock
lock(m_mutex
);
141 return m_dev
!= 0 && !m_seen_usb_error
;
147 /// This class starts out with no buffers, and will grow one buffer
148 /// at a time if needed. Call this to allocate count buffers
149 /// all at once and place them on the free queue. After calling
150 /// this function, at least count buffers will exist in the free
151 /// queue. If there are already count buffers, none will be added.
153 void SocketRoutingQueue::AllocateBuffers(int count
)
155 int todo
= count
- m_free
.size();
157 for( int i
= 0; i
< todo
; i
++ ) {
158 // m_free handles its own locking
159 m_free
.push( new Data
);
164 // DefaultRead (both variations)
166 /// Returns the data for the next unregistered socket.
167 /// Blocks until timeout or data is available.
168 /// Returns false (or null pointer) on timeout and no data.
169 /// With the return version of the function, there is no
170 /// copying performed.
172 /// This version performs a copy.
174 bool SocketRoutingQueue::DefaultRead(Data
&receive
, int timeout
)
176 DataHandle buf
= DefaultRead(timeout
);
180 // copy to desired buffer
181 receive
= *buf
.get();
186 /// This version does not perform a copy.
188 DataHandle
SocketRoutingQueue::DefaultRead(int timeout
)
190 if( m_seen_usb_error
&& timeout
== -1 )
191 // If an error has been seen and not cleared then no
192 // more data will be read into the queue by
193 // DoRead(). Forcing the timeout to zero allows any
194 // data already in the queue to be read, but prevents
195 // waiting for data which will never arrive.
197 // m_default handles its own locking
198 Data
*buf
= m_default
.wait_pop(timeout
);
199 return DataHandle(*this, buf
);
205 /// Register an interest in data from a certain socket. To read
206 /// from that socket, use the SocketRead() function from then on.
208 /// Any non-registered socket goes in the default queue
209 /// and must be read by DefaultRead()
211 /// If not null, handler is called when new data is read. It will
212 /// be called in the same thread instance that DoRead() is called from.
213 /// Handler is passed the DataQueue Data pointer, and so no
214 /// copying is done. Once the handler returns, the data is
215 /// considered processed and not added to the interested queue,
216 /// but instead returned to m_free.
218 /// Throws std::logic_error if already registered.
220 void SocketRoutingQueue::RegisterInterest(SocketId socket
,
221 SocketDataHandlerPtr handler
)
223 // modifying our own std::map, need a lock
224 scoped_lock
lock(m_mutex
);
226 SocketQueueMap::iterator qi
= m_socketQueues
.find(socket
);
227 if( qi
!= m_socketQueues
.end() )
228 throw std::logic_error("RegisterInterest requesting a previously registered socket.");
230 m_socketQueues
[socket
] = QueueEntryPtr( new QueueEntry(handler
) );
235 // UnregisterInterest
237 /// Unregisters interest in data from the given socket, and discards
238 /// any existing data in its interest queue. Any new incoming data
239 /// for this socket will be placed in the default queue.
241 void SocketRoutingQueue::UnregisterInterest(SocketId socket
)
243 // modifying our own std::map, need a lock
244 scoped_lock
lock(m_mutex
);
246 SocketQueueMap::iterator qi
= m_socketQueues
.find(socket
);
247 if( qi
== m_socketQueues
.end() )
248 return; // nothing registered, done
250 // salvage all our data buffers
251 m_free
.append_from( qi
->second
->m_queue
);
253 // remove the QueueEntryPtr from the map
254 m_socketQueues
.erase( qi
);
256 // check the interest flag
257 m_interest
= m_socketQueues
.size() > 0;
263 /// Reads data from the interested socket cache. Can only read
264 /// from sockets that have been previously registered.
266 /// Blocks until timeout or data is available.
268 /// Returns false (or null pointer) on timeout and no data.
269 /// With the return version of the function, there is no
270 /// copying performed.
272 /// Throws std::logic_error if a socket was requested that was
273 /// not previously registered.
275 /// Copying is performed with this function.
277 bool SocketRoutingQueue::SocketRead(SocketId socket
, Data
&receive
, int timeout
)
279 DataHandle buf
= SocketRead(socket
, timeout
);
283 // copy to desired buffer
284 receive
= *buf
.get();
289 /// Copying is not performed with this function.
291 /// Throws std::logic_error if a socket was requested that was
292 /// not previously registered.
294 DataHandle
SocketRoutingQueue::SocketRead(SocketId socket
, int timeout
)
299 // accessing our own std::map, need a lock
301 scoped_lock
lock(m_mutex
);
302 SocketQueueMap::iterator qi
= m_socketQueues
.find(socket
);
303 if( qi
== m_socketQueues
.end() )
304 throw std::logic_error("SocketRead requested data from unregistered socket.");
306 // got our queue, save the whole QueueEntryPtr (shared_ptr),
307 // and unlock, since we will be waiting on the DataQueue,
308 // not the socketQueues map
310 // This is safe, since even if UnregisterInterest is called,
311 // our pointer won't be deleted until our shared_ptr
312 // (QueueEntryPtr) goes out of scope.
314 // The remaining problem is that wait_pop() might wait
315 // forever if there is no timeout... c'est la vie.
316 // Should'a used a timeout. :-)
321 // get data from DataQueue
322 Data
*buf
= dq
->wait_pop(timeout
);
324 // specifically delete our copy of shared pointer, in a locked
327 scoped_lock
lock(m_mutex
);
331 return DataHandle(*this, buf
);
334 // Returns true if data is available for that socket.
335 bool SocketRoutingQueue::IsAvailable(SocketId socket
) const
337 scoped_lock
lock(m_mutex
);
338 SocketQueueMap::const_iterator qi
= m_socketQueues
.find(socket
);
339 if( qi
== m_socketQueues
.end() )
341 return qi
->second
->m_queue
.size() > 0;
347 /// Called by the application's "read thread" to read the next usb
348 /// packet and route it to the correct queue. Returns after every
349 /// read, even if a handler is associated with a queue.
350 /// Note: this function is safe to call before SetUsbDevice() is
351 /// called... it just doesn't do anything if there is no usb
352 /// device to work with.
354 /// Timeout is in milliseconds.
355 void SocketRoutingQueue::DoRead(int timeout
)
359 pthread_mutex_t
&m_Mutex
;
360 pthread_cond_t
&m_Cond
;
362 ReadWaitSignal(pthread_mutex_t
&mut
, pthread_cond_t
&cond
)
363 : m_Mutex(mut
), m_Cond(cond
)
367 scoped_lock
wait(m_Mutex
);
368 pthread_cond_signal(&m_Cond
);
370 } readwait(m_readwaitMutex
, m_readwaitCond
);
372 Usb::Device
* volatile dev
= 0;
374 DataHandle
buf(*this, 0);
376 // if we are not connected to a USB device yet, just wait
378 scoped_lock
lock(m_mutex
);
380 if( !m_dev
|| m_seen_usb_error
) {
381 lock
.unlock(); // unlock early, since we're sleeping
382 // sleep only a short time, since things could be
383 // in the process of setup or teardown
391 // fetch a free buffer
392 Data
*raw
= m_free
.pop();
394 buf
= DataHandle(*this, new Data
);
396 buf
= DataHandle(*this, raw
);
399 // take a chance and do the read unlocked, as this has the potential
400 // for blocking for a while
403 Data
&data
= *buf
.get();
405 if( !dev
->BulkRead(readEp
, data
, timeout
) )
406 return; // no data, done!
408 MAKE_PACKET(pack
, data
);
410 // make sure the size is right
411 if( data
.GetSize() < SB_PACKET_SOCKET_SIZE
)
412 throw UnroutableReadError(data
.GetSize(), sizeof(pack
->socket
));
414 // extract the socket from the packet
415 uint16_t socket
= btohs(pack
->socket
);
417 // we have data, now lock up again to place it
418 // in the right queue
419 scoped_lock
lock(m_mutex
);
421 // search for registration of socket
423 SocketQueueMap::iterator qi
= m_socketQueues
.find(socket
);
424 if( qi
!= m_socketQueues
.end() ) {
425 SocketDataHandlerPtr
&sdh
= qi
->second
->m_handler
;
427 // is there a handler?
429 // unlock & let the handler process it
431 sdh
->DataReceived(*buf
.get());
435 qi
->second
->m_queue
.push(buf
.release());
443 // safe to unlock now, we are done with the map
446 // if we get here, send to default queue
447 m_default
.push(buf
.release());
449 catch( Usb::Timeout
& ) {
450 // this is expected... just ignore
452 catch( Usb::Error
&ue
) {
453 // set the flag first, in case any of the handlers
454 // are able to recover from this error
455 m_seen_usb_error
= true;
457 NotifyHandlersOfError(ue
);
459 catch( UnroutableReadError
&e
) {
460 // Although this isn't a USB error the usb error flag
461 // is set. This is because devices seem to never send
462 // data which is this small and therefore the only
463 // time this error is seen (so far) is when the USB
464 // port has been reset
465 m_seen_usb_error
= true;
467 NotifyHandlersOfError(e
);
471 void SocketRoutingQueue::NotifyHandlersOfError(Barry::Error
&error
)
473 // Need to iterate through all the registered handlers
474 // calling their error callback.
475 // Can't be locked when calling the callback, so need
476 // to make a list of them first.
477 scoped_lock
lock(m_mutex
);
478 std::vector
<SocketDataHandlerPtr
> handlers
;
479 SocketQueueMap::iterator qi
= m_socketQueues
.begin();
480 while( qi
!= m_socketQueues
.end() ) {
481 SocketDataHandlerPtr
&sdh
= qi
->second
->m_handler
;
482 // is there a handler?
484 handlers
.push_back(sdh
);
489 SocketDataHandlerPtr usb_error_handler
= m_usb_error_dev_callback
;
492 std::vector
<SocketDataHandlerPtr
>::iterator hi
= handlers
.begin();
493 while( hi
!= handlers
.end() ) {
498 // and finally, call the specific error callback if available
499 if( usb_error_handler
.get() ) {
500 usb_error_handler
->Error(error
);
504 void SocketRoutingQueue::SpinoffSimpleReadThread()
506 // signal that it's ok to run inside the thread
507 if( m_continue_reading
)
508 return; // already running
509 m_continue_reading
= true;
511 // Start USB read thread, to handle all routing
512 int ret
= pthread_create(&m_usb_read_thread
, NULL
, &SimpleReadThread
, this);
514 m_continue_reading
= false;
515 throw Barry::ErrnoError("SocketRoutingQueue: Error creating USB read thread.", ret
);