3 /// Support classes for the pluggable socket routing system.
7 Copyright (C) 2008-2010, Net Direct Inc. (http://www.netdirect.ca/)
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 2 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
18 See the GNU General Public License in the COPYING file at the
19 root directory of this project for more details.
23 #include "scoped_lock.h"
25 #include "protostructs.h"
33 ///////////////////////////////////////////////////////////////////////////////
34 // SocketDataHandler default methods
36 void SocketRoutingQueue::SocketDataHandler::Error(Barry::Error &error)
39 dout("SocketDataHandler: Error: " << error.what());
43 SocketRoutingQueue::SocketDataHandler::~SocketDataHandler()
48 ///////////////////////////////////////////////////////////////////////////////
49 // SocketRoutingQueue constructors
51 SocketRoutingQueue::SocketRoutingQueue(int prealloc_buffer_count)
56 , m_seen_usb_error(false)
57 , m_continue_reading(false)
59 pthread_mutex_init(&m_mutex, NULL);
61 pthread_mutex_init(&m_readwaitMutex, NULL);
62 pthread_cond_init(&m_readwaitCond, NULL);
64 AllocateBuffers(prealloc_buffer_count);
67 SocketRoutingQueue::~SocketRoutingQueue()
70 if( m_continue_reading ) {
71 m_continue_reading = false;
72 pthread_join(m_usb_read_thread, NULL);
76 ///////////////////////////////////////////////////////////////////////////////
82 /// Provides a method of returning a buffer to the free queue
83 /// after processing. The DataHandle class calls this automatically
84 /// from its destructor.
85 void SocketRoutingQueue::ReturnBuffer(Data *buf)
87 // don't need to lock here, since m_free handles its own locking
94 /// Convenience thread to handle USB read activity.
96 void *SocketRoutingQueue::SimpleReadThread(void *userptr)
98 SocketRoutingQueue *q = (SocketRoutingQueue *)userptr;
100 // read from USB and write to stdout until finished
101 q->m_seen_usb_error = false;
102 while( q->m_continue_reading ) {
103 q->DoRead(1000); // timeout in milliseconds
109 ///////////////////////////////////////////////////////////////////////////////
112 // These functions connect the router to an external Usb::Device
113 // object. Normally this is handled automatically by the
114 // Controller class, but are public here in case they are needed.
115 void SocketRoutingQueue::SetUsbDevice(Usb::Device *dev, int writeEp, int readEp,
116 SocketDataHandlerPtr callback)
118 scoped_lock lock(m_mutex);
120 m_usb_error_dev_callback = callback;
125 void SocketRoutingQueue::ClearUsbDevice()
127 scoped_lock lock(m_mutex);
129 m_usb_error_dev_callback.reset();
132 // wait for the DoRead cycle to finish, so the external
133 // Usb::Device object doesn't close before we're done with it
134 scoped_lock wait(m_readwaitMutex);
135 pthread_cond_wait(&m_readwaitCond, &m_readwaitMutex);
138 bool SocketRoutingQueue::UsbDeviceReady()
140 scoped_lock lock(m_mutex);
141 return m_dev != 0 && !m_seen_usb_error;
147 /// This class starts out with no buffers, and will grow one buffer
148 /// at a time if needed. Call this to allocate count buffers
149 /// all at once and place them on the free queue. After calling
150 /// this function, at least count buffers will exist in the free
151 /// queue. If there are already count buffers, none will be added.
153 void SocketRoutingQueue::AllocateBuffers(int count)
155 int todo = count - m_free.size();
157 for( int i = 0; i < todo; i++ ) {
158 // m_free handles its own locking
159 m_free.push( new Data );
164 // DefaultRead (both variations)
166 /// Returns the data for the next unregistered socket.
167 /// Blocks until timeout or data is available.
168 /// Returns false (or null pointer) on timeout and no data.
169 /// With the return version of the function, there is no
170 /// copying performed.
172 /// This version performs a copy.
174 bool SocketRoutingQueue::DefaultRead(Data &receive, int timeout)
176 DataHandle buf = DefaultRead(timeout);
180 // copy to desired buffer
181 receive = *buf.get();
186 /// This version does not perform a copy.
188 DataHandle SocketRoutingQueue::DefaultRead(int timeout)
190 // m_default handles its own locking
191 Data *buf = m_default.wait_pop(timeout);
192 return DataHandle(*this, buf);
198 /// Register an interest in data from a certain socket. To read
199 /// from that socket, use the SocketRead() function from then on.
201 /// Any non-registered socket goes in the default queue
202 /// and must be read by DefaultRead()
204 /// If not null, handler is called when new data is read. It will
205 /// be called in the same thread instance that DoRead() is called from.
206 /// Handler is passed the DataQueue Data pointer, and so no
207 /// copying is done. Once the handler returns, the data is
208 /// considered processed and not added to the interested queue,
209 /// but instead returned to m_free.
211 /// Throws std::logic_error if already registered.
213 void SocketRoutingQueue::RegisterInterest(SocketId socket,
214 SocketDataHandlerPtr handler)
216 // modifying our own std::map, need a lock
217 scoped_lock lock(m_mutex);
219 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
220 if( qi != m_socketQueues.end() )
221 throw std::logic_error("RegisterInterest requesting a previously registered socket.");
223 m_socketQueues[socket] = QueueEntryPtr( new QueueEntry(handler) );
228 // UnregisterInterest
230 /// Unregisters interest in data from the given socket, and discards
231 /// any existing data in its interest queue. Any new incoming data
232 /// for this socket will be placed in the default queue.
234 void SocketRoutingQueue::UnregisterInterest(SocketId socket)
236 // modifying our own std::map, need a lock
237 scoped_lock lock(m_mutex);
239 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
240 if( qi == m_socketQueues.end() )
241 return; // nothing registered, done
243 // salvage all our data buffers
244 m_free.append_from( qi->second->m_queue );
246 // remove the QueueEntryPtr from the map
247 m_socketQueues.erase( qi );
249 // check the interest flag
250 m_interest = m_socketQueues.size() > 0;
256 /// Reads data from the interested socket cache. Can only read
257 /// from sockets that have been previously registered.
259 /// Blocks until timeout or data is available.
261 /// Returns false (or null pointer) on timeout and no data.
262 /// With the return version of the function, there is no
263 /// copying performed.
265 /// Throws std::logic_error if a socket was requested that was
266 /// not previously registered.
268 /// Copying is performed with this function.
270 bool SocketRoutingQueue::SocketRead(SocketId socket, Data &receive, int timeout)
272 DataHandle buf = SocketRead(socket, timeout);
276 // copy to desired buffer
277 receive = *buf.get();
282 /// Copying is not performed with this function.
284 /// Throws std::logic_error if a socket was requested that was
285 /// not previously registered.
287 DataHandle SocketRoutingQueue::SocketRead(SocketId socket, int timeout)
292 // accessing our own std::map, need a lock
294 scoped_lock lock(m_mutex);
295 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
296 if( qi == m_socketQueues.end() )
297 throw std::logic_error("SocketRead requested data from unregistered socket.");
299 // got our queue, save the whole QueueEntryPtr (shared_ptr),
300 // and unlock, since we will be waiting on the DataQueue,
301 // not the socketQueues map
303 // This is safe, since even if UnregisterInterest is called,
304 // our pointer won't be deleted until our shared_ptr
305 // (QueueEntryPtr) goes out of scope.
307 // The remaining problem is that wait_pop() might wait
308 // forever if there is no timeout... c'est la vie.
309 // Should'a used a timeout. :-)
314 // get data from DataQueue
315 Data *buf = dq->wait_pop(timeout);
317 // specifically delete our copy of shared pointer, in a locked
320 scoped_lock lock(m_mutex);
324 return DataHandle(*this, buf);
327 // Returns true if data is available for that socket.
328 bool SocketRoutingQueue::IsAvailable(SocketId socket) const
330 scoped_lock lock(m_mutex);
331 SocketQueueMap::const_iterator qi = m_socketQueues.find(socket);
332 if( qi == m_socketQueues.end() )
334 return qi->second->m_queue.size() > 0;
340 /// Called by the application's "read thread" to read the next usb
341 /// packet and route it to the correct queue. Returns after every
342 /// read, even if a handler is associated with a queue.
343 /// Note: this function is safe to call before SetUsbDevice() is
344 /// called... it just doesn't do anything if there is no usb
345 /// device to work with.
347 /// Timeout is in milliseconds.
348 void SocketRoutingQueue::DoRead(int timeout)
352 pthread_mutex_t &m_Mutex;
353 pthread_cond_t &m_Cond;
355 ReadWaitSignal(pthread_mutex_t &mut, pthread_cond_t &cond)
356 : m_Mutex(mut), m_Cond(cond)
360 scoped_lock wait(m_Mutex);
361 pthread_cond_signal(&m_Cond);
363 } readwait(m_readwaitMutex, m_readwaitCond);
365 Usb::Device * volatile dev = 0;
367 DataHandle buf(*this, 0);
369 // if we are not connected to a USB device yet, just wait
371 scoped_lock lock(m_mutex);
373 if( !m_dev || m_seen_usb_error ) {
374 lock.unlock(); // unlock early, since we're sleeping
375 // sleep only a short time, since things could be
376 // in the process of setup or teardown
384 // fetch a free buffer
385 Data *raw = m_free.pop();
387 buf = DataHandle(*this, new Data);
389 buf = DataHandle(*this, raw);
392 // take a chance and do the read unlocked, as this has the potential
393 // for blocking for a while
396 Data &data = *buf.get();
398 if( !dev->BulkRead(readEp, data, timeout) )
399 return; // no data, done!
401 MAKE_PACKET(pack, data);
403 // make sure the size is right
404 if( data.GetSize() < sizeof(pack->socket) )
405 return; // bad size, just skip
407 // extract the socket from the packet
408 uint16_t socket = btohs(pack->socket);
410 // we have data, now lock up again to place it
411 // in the right queue
412 scoped_lock lock(m_mutex);
414 // search for registration of socket
416 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
417 if( qi != m_socketQueues.end() ) {
418 SocketDataHandlerPtr &sdh = qi->second->m_handler;
420 // is there a handler?
422 // unlock & let the handler process it
424 sdh->DataReceived(*buf.get());
428 qi->second->m_queue.push(buf.release());
436 // safe to unlock now, we are done with the map
439 // if we get here, send to default queue
440 m_default.push(buf.release());
442 catch( Usb::Timeout & ) {
443 // this is expected... just ignore
445 catch( Usb::Error &ue ) {
446 // set the flag first, in case any of the handlers
447 // are able to recover from this error
448 m_seen_usb_error = true;
450 // this is unexpected, but we're in a thread here...
451 // Need to iterate through all the registered handlers
452 // calling their error callback.
453 // Can't be locked when calling the callback, so need
454 // to make a list of them first.
455 scoped_lock lock(m_mutex);
456 std::vector<SocketDataHandlerPtr> handlers;
457 SocketQueueMap::iterator qi = m_socketQueues.begin();
458 while( qi != m_socketQueues.end() ) {
459 SocketDataHandlerPtr &sdh = qi->second->m_handler;
460 // is there a handler?
462 handlers.push_back(sdh);
467 SocketDataHandlerPtr usb_error_handler = m_usb_error_dev_callback;
470 std::vector<SocketDataHandlerPtr>::iterator hi = handlers.begin();
471 while( hi != handlers.end() ) {
476 // and finally, call the specific error callback if available
477 if( usb_error_handler.get() ) {
478 usb_error_handler->Error(ue);
483 void SocketRoutingQueue::SpinoffSimpleReadThread()
485 // signal that it's ok to run inside the thread
486 if( m_continue_reading )
487 return; // already running
488 m_continue_reading = true;
490 // Start USB read thread, to handle all routing
491 int ret = pthread_create(&m_usb_read_thread, NULL, &SimpleReadThread, this);
493 m_continue_reading = false;
494 throw Barry::ErrnoError("SocketRoutingQueue: Error creating USB read thread.", ret);