lib: added belt-and-suspenders fix to usb Read functions
[barry.git] / src / router.cc
1 ///
2 /// \file       router.cc
3 ///             Support classes for the pluggable socket routing system.
4 ///
5
6 /*
7     Copyright (C) 2008-2010, Net Direct Inc. (http://www.netdirect.ca/)
8
9     This program is free software; you can redistribute it and/or modify
10     it under the terms of the GNU General Public License as published by
11     the Free Software Foundation; either version 2 of the License, or
12     (at your option) any later version.
13
14     This program is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
17
18     See the GNU General Public License in the COPYING file at the
19     root directory of this project for more details.
20 */
21
22 #include "router.h"
23 #include "scoped_lock.h"
24 #include "data.h"
25 #include "protostructs.h"
26 #include "usbwrap.h"
27 #include "endian.h"
28 #include "debug.h"
29 #include <unistd.h>
30
31 namespace Barry {
32
33 ///////////////////////////////////////////////////////////////////////////////
34 // SocketDataHandler default methods
35
36 void SocketRoutingQueue::SocketDataHandler::Error(Barry::Error &error)
37 {
38         // Just log the error
39         dout("SocketDataHandler: Error: " << error.what());
40         (void) error;
41 }
42
43 SocketRoutingQueue::SocketDataHandler::~SocketDataHandler()
44 {
45         // Nothing to destroy
46 }
47
48 ///////////////////////////////////////////////////////////////////////////////
49 // SocketRoutingQueue constructors
50
51 SocketRoutingQueue::SocketRoutingQueue(int prealloc_buffer_count)
52         : m_dev(0)
53         , m_writeEp(0)
54         , m_readEp(0)
55         , m_interest(false)
56         , m_seen_usb_error(false)
57         , m_continue_reading(false)
58 {
59         pthread_mutex_init(&m_mutex, NULL);
60
61         pthread_mutex_init(&m_readwaitMutex, NULL);
62         pthread_cond_init(&m_readwaitCond, NULL);
63
64         AllocateBuffers(prealloc_buffer_count);
65 }
66
67 SocketRoutingQueue::~SocketRoutingQueue()
68 {
69         // thread running?
70         if( m_continue_reading ) {
71                 m_continue_reading = false;
72                 pthread_join(m_usb_read_thread, NULL);
73         }
74 }
75
76 ///////////////////////////////////////////////////////////////////////////////
77 // protected members
78
79 //
80 // ReturnBuffer
81 //
82 /// Provides a method of returning a buffer to the free queue
83 /// after processing.  The DataHandle class calls this automatically
84 /// from its destructor.
85 void SocketRoutingQueue::ReturnBuffer(Data *buf)
86 {
87         // don't need to lock here, since m_free handles its own locking
88         m_free.push(buf);
89 }
90
91 //
92 // SimpleReadThread()
93 //
94 /// Convenience thread to handle USB read activity.
95 ///
96 void *SocketRoutingQueue::SimpleReadThread(void *userptr)
97 {
98         SocketRoutingQueue *q = (SocketRoutingQueue *)userptr;
99
100         // read from USB and write to stdout until finished
101         q->m_seen_usb_error = false;
102         while( q->m_continue_reading ) {
103                 q->DoRead(1000);        // timeout in milliseconds
104         }
105         return 0;
106 }
107
108
109 ///////////////////////////////////////////////////////////////////////////////
110 // public API
111
112 // These functions connect the router to an external Usb::Device
113 // object.  Normally this is handled automatically by the
114 // Controller class, but are public here in case they are needed.
115 void SocketRoutingQueue::SetUsbDevice(Usb::Device *dev, int writeEp, int readEp,
116                                         SocketDataHandlerPtr callback)
117 {
118         scoped_lock lock(m_mutex);
119         m_dev = dev;
120         m_usb_error_dev_callback = callback;
121         m_writeEp = writeEp;
122         m_readEp = readEp;
123 }
124
125 void SocketRoutingQueue::ClearUsbDevice()
126 {
127         scoped_lock lock(m_mutex);
128         m_dev = 0;
129         m_usb_error_dev_callback.reset();
130         lock.unlock();
131
132         // wait for the DoRead cycle to finish, so the external
133         // Usb::Device object doesn't close before we're done with it
134         scoped_lock wait(m_readwaitMutex);
135         pthread_cond_wait(&m_readwaitCond, &m_readwaitMutex);
136 }
137
138 bool SocketRoutingQueue::UsbDeviceReady()
139 {
140         scoped_lock lock(m_mutex);
141         return m_dev != 0 && !m_seen_usb_error;
142 }
143
144 //
145 // AllocateBuffers
146 //
147 /// This class starts out with no buffers, and will grow one buffer
148 /// at a time if needed.  Call this to allocate count buffers
149 /// all at once and place them on the free queue.  After calling
150 /// this function, at least count buffers will exist in the free
151 /// queue.  If there are already count buffers, none will be added.
152 ///
153 void SocketRoutingQueue::AllocateBuffers(int count)
154 {
155         int todo = count - m_free.size();
156
157         for( int i = 0; i < todo; i++ ) {
158                 // m_free handles its own locking
159                 m_free.push( new Data );
160         }
161 }
162
163 //
164 // DefaultRead (both variations)
165 //
166 /// Returns the data for the next unregistered socket.
167 /// Blocks until timeout or data is available.
168 /// Returns false (or null pointer) on timeout and no data.
169 /// With the return version of the function, there is no
170 /// copying performed.
171 ///
172 /// This version performs a copy.
173 ///
174 bool SocketRoutingQueue::DefaultRead(Data &receive, int timeout)
175 {
176         DataHandle buf = DefaultRead(timeout);
177         if( !buf.get() )
178                 return false;
179
180         // copy to desired buffer
181         receive = *buf.get();
182         return true;
183 }
184
185 ///
186 /// This version does not perform a copy.
187 ///
188 DataHandle SocketRoutingQueue::DefaultRead(int timeout)
189 {
190         // m_default handles its own locking
191         Data *buf = m_default.wait_pop(timeout);
192         return DataHandle(*this, buf);
193 }
194
195 //
196 // RegisterInterest
197 //
198 /// Register an interest in data from a certain socket.  To read
199 /// from that socket, use the SocketRead() function from then on.
200 ///
201 /// Any non-registered socket goes in the default queue
202 /// and must be read by DefaultRead()
203 ///
204 /// If not null, handler is called when new data is read.  It will
205 /// be called in the same thread instance that DoRead() is called from.
206 /// Handler is passed the DataQueue Data pointer, and so no
207 /// copying is done.  Once the handler returns, the data is
208 /// considered processed and not added to the interested queue,
209 /// but instead returned to m_free.
210 ///
211 /// Throws std::logic_error if already registered.
212 ///
213 void SocketRoutingQueue::RegisterInterest(SocketId socket,
214                                           SocketDataHandlerPtr handler)
215 {
216         // modifying our own std::map, need a lock
217         scoped_lock lock(m_mutex);
218
219         SocketQueueMap::iterator qi = m_socketQueues.find(socket);
220         if( qi != m_socketQueues.end() )
221                 throw std::logic_error("RegisterInterest requesting a previously registered socket.");
222
223         m_socketQueues[socket] = QueueEntryPtr( new QueueEntry(handler) );
224         m_interest = true;
225 }
226
227 //
228 // UnregisterInterest
229 //
230 /// Unregisters interest in data from the given socket, and discards
231 /// any existing data in its interest queue.  Any new incoming data
232 /// for this socket will be placed in the default queue.
233 ///
234 void SocketRoutingQueue::UnregisterInterest(SocketId socket)
235 {
236         // modifying our own std::map, need a lock
237         scoped_lock lock(m_mutex);
238
239         SocketQueueMap::iterator qi = m_socketQueues.find(socket);
240         if( qi == m_socketQueues.end() )
241                 return; // nothing registered, done
242
243         // salvage all our data buffers
244         m_free.append_from( qi->second->m_queue );
245
246         // remove the QueueEntryPtr from the map
247         m_socketQueues.erase( qi );
248
249         // check the interest flag
250         m_interest = m_socketQueues.size() > 0;
251 }
252
253 //
254 // SocketRead
255 //
256 /// Reads data from the interested socket cache.  Can only read
257 /// from sockets that have been previously registered.
258 ///
259 /// Blocks until timeout or data is available.
260 ///
261 /// Returns false (or null pointer) on timeout and no data.
262 /// With the return version of the function, there is no
263 /// copying performed.
264 ///
265 /// Throws std::logic_error if a socket was requested that was
266 /// not previously registered.
267 ///
268 /// Copying is performed with this function.
269 ///
270 bool SocketRoutingQueue::SocketRead(SocketId socket, Data &receive, int timeout)
271 {
272         DataHandle buf = SocketRead(socket, timeout);
273         if( !buf.get() )
274                 return false;
275
276         // copy to desired buffer
277         receive = *buf.get();
278         return true;
279 }
280
281 ///
282 /// Copying is not performed with this function.
283 ///
284 /// Throws std::logic_error if a socket was requested that was
285 /// not previously registered.
286 ///
287 DataHandle SocketRoutingQueue::SocketRead(SocketId socket, int timeout)
288 {
289         QueueEntryPtr qep;
290         DataQueue *dq = 0;
291
292         // accessing our own std::map, need a lock
293         {
294                 scoped_lock lock(m_mutex);
295                 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
296                 if( qi == m_socketQueues.end() )
297                         throw std::logic_error("SocketRead requested data from unregistered socket.");
298
299                 // got our queue, save the whole QueueEntryPtr (shared_ptr),
300                 // and unlock, since we will be waiting on the DataQueue,
301                 // not the socketQueues map
302                 //
303                 // This is safe, since even if UnregisterInterest is called,
304                 // our pointer won't be deleted until our shared_ptr
305                 // (QueueEntryPtr) goes out of scope.
306                 //
307                 // The remaining problem is that wait_pop() might wait
308                 // forever if there is no timeout... c'est la vie.
309                 // Should'a used a timeout. :-)
310                 qep = qi->second;
311                 dq = &qep->m_queue;
312         }
313
314         // get data from DataQueue
315         Data *buf = dq->wait_pop(timeout);
316
317         // specifically delete our copy of shared pointer, in a locked
318         // environment
319         {
320                 scoped_lock lock(m_mutex);
321                 qep.reset();
322         }
323
324         return DataHandle(*this, buf);
325 }
326
327 // Returns true if data is available for that socket.
328 bool SocketRoutingQueue::IsAvailable(SocketId socket) const
329 {
330         scoped_lock lock(m_mutex);
331         SocketQueueMap::const_iterator qi = m_socketQueues.find(socket);
332         if( qi == m_socketQueues.end() )
333                 return false;
334         return qi->second->m_queue.size() > 0;
335 }
336
337 //
338 // DoRead
339 //
340 /// Called by the application's "read thread" to read the next usb
341 /// packet and route it to the correct queue.  Returns after every
342 /// read, even if a handler is associated with a queue.
343 /// Note: this function is safe to call before SetUsbDevice() is
344 /// called... it just doesn't do anything if there is no usb
345 /// device to work with.
346 ///
347 /// Timeout is in milliseconds.
348 void SocketRoutingQueue::DoRead(int timeout)
349 {
350         class ReadWaitSignal
351         {
352                 pthread_mutex_t &m_Mutex;
353                 pthread_cond_t &m_Cond;
354         public:
355                 ReadWaitSignal(pthread_mutex_t &mut, pthread_cond_t &cond)
356                         : m_Mutex(mut), m_Cond(cond)
357                         {}
358                 ~ReadWaitSignal()
359                 {
360                         scoped_lock wait(m_Mutex);
361                         pthread_cond_signal(&m_Cond);
362                 }
363         } readwait(m_readwaitMutex, m_readwaitCond);
364
365         Usb::Device * volatile dev = 0;
366         int readEp;
367         DataHandle buf(*this, 0);
368
369         // if we are not connected to a USB device yet, just wait
370         {
371                 scoped_lock lock(m_mutex);
372
373                 if( !m_dev || m_seen_usb_error ) {
374                         lock.unlock();  // unlock early, since we're sleeping
375                         // sleep only a short time, since things could be
376                         // in the process of setup or teardown
377                         usleep(125000);
378                         return;
379                 }
380
381                 dev = m_dev;
382                 readEp = m_readEp;
383
384                 // fetch a free buffer
385                 Data *raw = m_free.pop();
386                 if( !raw )
387                         buf = DataHandle(*this, new Data);
388                 else
389                         buf = DataHandle(*this, raw);
390         }
391
392         // take a chance and do the read unlocked, as this has the potential
393         // for blocking for a while
394         try {
395
396                 Data &data = *buf.get();
397
398                 if( !dev->BulkRead(readEp, data, timeout) )
399                         return; // no data, done!
400
401                 MAKE_PACKET(pack, data);
402                 
403                 // make sure the size is right
404                 if( data.GetSize() < sizeof(pack->socket) )
405                         return; // bad size, just skip
406
407                 // extract the socket from the packet
408                 uint16_t socket = btohs(pack->socket);
409
410                 // we have data, now lock up again to place it
411                 // in the right queue
412                 scoped_lock lock(m_mutex);
413
414                 // search for registration of socket
415                 if( m_interest ) {
416                         SocketQueueMap::iterator qi = m_socketQueues.find(socket);
417                         if( qi != m_socketQueues.end() ) {
418                                 SocketDataHandlerPtr &sdh = qi->second->m_handler;
419
420                                 // is there a handler?
421                                 if( sdh ) {
422                                         // unlock & let the handler process it
423                                         lock.unlock();
424                                         sdh->DataReceived(*buf.get());
425                                         return;
426                                 }
427                                 else {
428                                         qi->second->m_queue.push(buf.release());
429                                         return;
430                                 }
431                         }
432
433                         // fall through
434                 }
435
436                 // safe to unlock now, we are done with the map
437                 lock.unlock();
438
439                 // if we get here, send to default queue
440                 m_default.push(buf.release());
441         }
442         catch( Usb::Timeout & ) {
443                 // this is expected... just ignore
444         }
445         catch( Usb::Error &ue ) {
446                 // set the flag first, in case any of the handlers
447                 // are able to recover from this error
448                 m_seen_usb_error = true;
449
450                 // this is unexpected, but we're in a thread here...
451                 // Need to iterate through all the registered handlers
452                 // calling their error callback.
453                 // Can't be locked when calling the callback, so need
454                 // to make a list of them first.
455                 scoped_lock lock(m_mutex);
456                 std::vector<SocketDataHandlerPtr> handlers;
457                 SocketQueueMap::iterator qi = m_socketQueues.begin();
458                 while( qi != m_socketQueues.end() ) {
459                         SocketDataHandlerPtr &sdh = qi->second->m_handler;
460                         // is there a handler?
461                         if( sdh ) {
462                                 handlers.push_back(sdh);
463                         }
464                         ++qi;
465                 }
466
467                 SocketDataHandlerPtr usb_error_handler = m_usb_error_dev_callback;
468
469                 lock.unlock();
470                 std::vector<SocketDataHandlerPtr>::iterator hi = handlers.begin();
471                 while( hi != handlers.end() ) {
472                         (*hi)->Error(ue);
473                         ++hi;
474                 }
475
476                 // and finally, call the specific error callback if available
477                 if( usb_error_handler.get() ) {
478                         usb_error_handler->Error(ue);
479                 }
480         }
481 }
482
483 void SocketRoutingQueue::SpinoffSimpleReadThread()
484 {
485         // signal that it's ok to run inside the thread
486         if( m_continue_reading )
487                 return; // already running
488         m_continue_reading = true;
489
490         // Start USB read thread, to handle all routing
491         int ret = pthread_create(&m_usb_read_thread, NULL, &SimpleReadThread, this);
492         if( ret ) {
493                 m_continue_reading = false;
494                 throw Barry::ErrnoError("SocketRoutingQueue: Error creating USB read thread.", ret);
495         }
496 }
497
498 } // namespace Barry
499