ppp: minor cleanups to match other chatscripts order and documentation
[barry.git] / src / router.cc
blob2f6a7e0f5b846d4650693262be65854130f3860f
1 ///
2 /// \file router.cc
3 /// Support classes for the pluggable socket routing system.
4 ///
6 /*
7 Copyright (C) 2008-2010, Net Direct Inc. (http://www.netdirect.ca/)
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 2 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
18 See the GNU General Public License in the COPYING file at the
19 root directory of this project for more details.
22 #include "router.h"
23 #include "scoped_lock.h"
24 #include "data.h"
25 #include "protostructs.h"
26 #include "usbwrap.h"
27 #include "endian.h"
28 #include "debug.h"
29 #include <unistd.h>
31 namespace Barry {
33 ///////////////////////////////////////////////////////////////////////////////
34 // SocketDataHandler default methods
36 void SocketRoutingQueue::SocketDataHandler::Error(Barry::Error &error)
38 // Just log the error
39 eout("SocketDataHandler: Error: " << error.what());
40 (void) error;
43 SocketRoutingQueue::SocketDataHandler::~SocketDataHandler()
45 // Nothing to destroy
48 ///////////////////////////////////////////////////////////////////////////////
49 // SocketRoutingQueue constructors
51 SocketRoutingQueue::SocketRoutingQueue(int prealloc_buffer_count,
52 int default_read_timeout)
53 : m_dev(0)
54 , m_writeEp(0)
55 , m_readEp(0)
56 , m_interest(false)
57 , m_seen_usb_error(false)
58 , m_timeout(default_read_timeout)
59 , m_continue_reading(false)
61 pthread_mutex_init(&m_mutex, NULL);
63 pthread_mutex_init(&m_readwaitMutex, NULL);
64 pthread_cond_init(&m_readwaitCond, NULL);
66 AllocateBuffers(prealloc_buffer_count);
69 SocketRoutingQueue::~SocketRoutingQueue()
71 // thread running?
72 if( m_continue_reading ) {
73 m_continue_reading = false;
74 pthread_join(m_usb_read_thread, NULL);
78 ///////////////////////////////////////////////////////////////////////////////
79 // protected members
82 // ReturnBuffer
84 /// Provides a method of returning a buffer to the free queue
85 /// after processing. The DataHandle class calls this automatically
86 /// from its destructor.
87 void SocketRoutingQueue::ReturnBuffer(Data *buf)
89 // don't need to lock here, since m_free handles its own locking
90 m_free.push(buf);
94 // SimpleReadThread()
96 /// Convenience thread to handle USB read activity.
97 ///
98 void *SocketRoutingQueue::SimpleReadThread(void *userptr)
100 SocketRoutingQueue *q = (SocketRoutingQueue *)userptr;
102 // read from USB and write to stdout until finished
103 q->m_seen_usb_error = false;
104 while( q->m_continue_reading ) {
105 try {
106 q->DoRead(1000); // timeout in milliseconds
108 catch (std::runtime_error const &e) {
109 eout("SimpleReadThread received uncaught exception: " << typeid(e).name() << " what: " << e.what());
111 catch (...) {
112 eout("SimpleReadThread recevied uncaught exception of unknown type");
115 return 0;
119 ///////////////////////////////////////////////////////////////////////////////
120 // public API
122 // These functions connect the router to an external Usb::Device
123 // object. Normally this is handled automatically by the
124 // Controller class, but are public here in case they are needed.
125 void SocketRoutingQueue::SetUsbDevice(Usb::Device *dev, int writeEp, int readEp,
126 SocketDataHandlerPtr callback)
128 scoped_lock lock(m_mutex);
129 m_dev = dev;
130 m_usb_error_dev_callback = callback;
131 m_writeEp = writeEp;
132 m_readEp = readEp;
135 void SocketRoutingQueue::ClearUsbDevice()
137 scoped_lock lock(m_mutex);
138 m_dev = 0;
139 m_usb_error_dev_callback.reset();
140 lock.unlock();
142 // wait for the DoRead cycle to finish, so the external
143 // Usb::Device object doesn't close before we're done with it
144 scoped_lock wait(m_readwaitMutex);
145 pthread_cond_wait(&m_readwaitCond, &m_readwaitMutex);
148 bool SocketRoutingQueue::UsbDeviceReady()
150 scoped_lock lock(m_mutex);
151 return m_dev != 0 && !m_seen_usb_error;
155 // AllocateBuffers
157 /// This class starts out with no buffers, and will grow one buffer
158 /// at a time if needed. Call this to allocate count buffers
159 /// all at once and place them on the free queue. After calling
160 /// this function, at least count buffers will exist in the free
161 /// queue. If there are already count buffers, none will be added.
163 void SocketRoutingQueue::AllocateBuffers(int count)
165 int todo = count - m_free.size();
167 for( int i = 0; i < todo; i++ ) {
168 // m_free handles its own locking
169 m_free.push( new Data );
174 // DefaultRead (both variations)
176 /// Returns the data for the next unregistered socket.
177 /// Blocks until timeout or data is available.
178 /// Returns false (or null pointer) on timeout and no data.
179 /// With the return version of the function, there is no
180 /// copying performed.
182 /// This version performs a copy.
184 bool SocketRoutingQueue::DefaultRead(Data &receive, int timeout)
186 DataHandle buf = DefaultRead(timeout);
187 if( !buf.get() )
188 return false;
190 // copy to desired buffer
191 receive = *buf.get();
192 return true;
196 /// This version does not perform a copy.
198 DataHandle SocketRoutingQueue::DefaultRead(int timeout)
200 if( m_seen_usb_error && timeout == -1 ) {
201 // If an error has been seen and not cleared then no
202 // more data will be read into the queue by
203 // DoRead(). Forcing the timeout to zero allows any
204 // data already in the queue to be read, but prevents
205 // waiting for data which will never arrive.
206 timeout = 0;
209 // m_default handles its own locking
210 // Be careful with the queue timeout, since its -1 means "forever"
211 Data *buf = m_default.wait_pop(timeout == -1 ? m_timeout : timeout);
212 return DataHandle(*this, buf);
216 // RegisterInterest
218 /// Register an interest in data from a certain socket. To read
219 /// from that socket, use the SocketRead() function from then on.
221 /// Any non-registered socket goes in the default queue
222 /// and must be read by DefaultRead()
224 /// If not null, handler is called when new data is read. It will
225 /// be called in the same thread instance that DoRead() is called from.
226 /// Handler is passed the DataQueue Data pointer, and so no
227 /// copying is done. Once the handler returns, the data is
228 /// considered processed and not added to the interested queue,
229 /// but instead returned to m_free.
231 /// Throws std::logic_error if already registered.
233 void SocketRoutingQueue::RegisterInterest(SocketId socket,
234 SocketDataHandlerPtr handler)
236 // modifying our own std::map, need a lock
237 scoped_lock lock(m_mutex);
239 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
240 if( qi != m_socketQueues.end() )
241 throw std::logic_error("RegisterInterest requesting a previously registered socket.");
243 m_socketQueues[socket] = QueueEntryPtr( new QueueEntry(handler) );
244 m_interest = true;
248 // UnregisterInterest
250 /// Unregisters interest in data from the given socket, and discards
251 /// any existing data in its interest queue. Any new incoming data
252 /// for this socket will be placed in the default queue.
254 void SocketRoutingQueue::UnregisterInterest(SocketId socket)
256 // modifying our own std::map, need a lock
257 scoped_lock lock(m_mutex);
259 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
260 if( qi == m_socketQueues.end() )
261 return; // nothing registered, done
263 // salvage all our data buffers
264 m_free.append_from( qi->second->m_queue );
266 // remove the QueueEntryPtr from the map
267 m_socketQueues.erase( qi );
269 // check the interest flag
270 m_interest = m_socketQueues.size() > 0;
274 // SocketRead
276 /// Reads data from the interested socket cache. Can only read
277 /// from sockets that have been previously registered.
279 /// Blocks until timeout or data is available.
281 /// Returns false (or null pointer) on timeout and no data.
282 /// With the return version of the function, there is no
283 /// copying performed.
285 /// Throws std::logic_error if a socket was requested that was
286 /// not previously registered.
288 /// Copying is performed with this function.
290 bool SocketRoutingQueue::SocketRead(SocketId socket, Data &receive, int timeout)
292 DataHandle buf = SocketRead(socket, timeout);
293 if( !buf.get() )
294 return false;
296 // copy to desired buffer
297 receive = *buf.get();
298 return true;
302 /// Copying is not performed with this function.
304 /// Throws std::logic_error if a socket was requested that was
305 /// not previously registered.
307 DataHandle SocketRoutingQueue::SocketRead(SocketId socket, int timeout)
309 QueueEntryPtr qep;
310 DataQueue *dq = 0;
312 // accessing our own std::map, need a lock
314 scoped_lock lock(m_mutex);
315 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
316 if( qi == m_socketQueues.end() )
317 throw std::logic_error("SocketRead requested data from unregistered socket.");
319 // got our queue, save the whole QueueEntryPtr (shared_ptr),
320 // and unlock, since we will be waiting on the DataQueue,
321 // not the socketQueues map
323 // This is safe, since even if UnregisterInterest is called,
324 // our pointer won't be deleted until our shared_ptr
325 // (QueueEntryPtr) goes out of scope.
327 // The remaining problem is that wait_pop() might wait
328 // forever if there is no timeout... c'est la vie.
329 // Should'a used a timeout. :-)
330 qep = qi->second;
331 dq = &qep->m_queue;
334 // get data from DataQueue
335 // Be careful with the queue timeout, since its -1 means "forever"
336 Data *buf = dq->wait_pop(timeout == -1 ? m_timeout : timeout);
338 // specifically delete our copy of shared pointer, in a locked
339 // environment
341 scoped_lock lock(m_mutex);
342 qep.reset();
345 return DataHandle(*this, buf);
348 // Returns true if data is available for that socket.
349 bool SocketRoutingQueue::IsAvailable(SocketId socket) const
351 scoped_lock lock(m_mutex);
352 SocketQueueMap::const_iterator qi = m_socketQueues.find(socket);
353 if( qi == m_socketQueues.end() )
354 return false;
355 return qi->second->m_queue.size() > 0;
359 // DoRead
361 /// Called by the application's "read thread" to read the next usb
362 /// packet and route it to the correct queue. Returns after every
363 /// read, even if a handler is associated with a queue.
364 /// Note: this function is safe to call before SetUsbDevice() is
365 /// called... it just doesn't do anything if there is no usb
366 /// device to work with.
368 /// Timeout is in milliseconds.
369 // This timeout is for the USB subsystem, so no special handling
370 // for it is needed... just use usbwrap's default timeout.
371 void SocketRoutingQueue::DoRead(int timeout)
373 class ReadWaitSignal
375 pthread_mutex_t &m_Mutex;
376 pthread_cond_t &m_Cond;
377 public:
378 ReadWaitSignal(pthread_mutex_t &mut, pthread_cond_t &cond)
379 : m_Mutex(mut), m_Cond(cond)
381 ~ReadWaitSignal()
383 scoped_lock wait(m_Mutex);
384 pthread_cond_signal(&m_Cond);
386 } readwait(m_readwaitMutex, m_readwaitCond);
388 Usb::Device * volatile dev = 0;
389 int readEp;
390 DataHandle buf(*this, 0);
392 // if we are not connected to a USB device yet, just wait
394 scoped_lock lock(m_mutex);
396 if( !m_dev || m_seen_usb_error ) {
397 lock.unlock(); // unlock early, since we're sleeping
398 // sleep only a short time, since things could be
399 // in the process of setup or teardown
400 usleep(125000);
401 return;
404 dev = m_dev;
405 readEp = m_readEp;
407 // fetch a free buffer
408 Data *raw = m_free.pop();
409 if( !raw )
410 buf = DataHandle(*this, new Data);
411 else
412 buf = DataHandle(*this, raw);
415 // take a chance and do the read unlocked, as this has the potential
416 // for blocking for a while
417 try {
419 Data &data = *buf.get();
421 if( !dev->BulkRead(readEp, data, timeout) )
422 return; // no data, done!
424 MAKE_PACKET(pack, data);
426 // make sure the size is right
427 if( data.GetSize() < SB_PACKET_SOCKET_SIZE )
428 return; // bad size, just skip
430 // extract the socket from the packet
431 uint16_t socket = btohs(pack->socket);
433 // we have data, now lock up again to place it
434 // in the right queue
435 scoped_lock lock(m_mutex);
437 // search for registration of socket
438 if( m_interest ) {
439 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
440 if( qi != m_socketQueues.end() ) {
441 SocketDataHandlerPtr &sdh = qi->second->m_handler;
443 // is there a handler?
444 if( sdh ) {
445 // unlock & let the handler process it
446 lock.unlock();
447 sdh->DataReceived(*buf.get());
448 return;
450 else {
451 qi->second->m_queue.push(buf.release());
452 return;
456 // fall through
459 // safe to unlock now, we are done with the map
460 lock.unlock();
462 // if we get here, send to default queue
463 m_default.push(buf.release());
465 catch( Usb::Timeout & ) {
466 // this is expected... just ignore
468 catch( Usb::Error &ue ) {
469 // set the flag first, in case any of the handlers
470 // are able to recover from this error
471 m_seen_usb_error = true;
473 // this is unexpected, but we're in a thread here...
474 // Need to iterate through all the registered handlers
475 // calling their error callback.
476 // Can't be locked when calling the callback, so need
477 // to make a list of them first.
478 scoped_lock lock(m_mutex);
479 std::vector<SocketDataHandlerPtr> handlers;
480 SocketQueueMap::iterator qi = m_socketQueues.begin();
481 while( qi != m_socketQueues.end() ) {
482 SocketDataHandlerPtr &sdh = qi->second->m_handler;
483 // is there a handler?
484 if( sdh ) {
485 handlers.push_back(sdh);
487 ++qi;
490 SocketDataHandlerPtr usb_error_handler = m_usb_error_dev_callback;
492 lock.unlock();
493 std::vector<SocketDataHandlerPtr>::iterator hi = handlers.begin();
494 while( hi != handlers.end() ) {
495 (*hi)->Error(ue);
496 ++hi;
499 // and finally, call the specific error callback if available
500 if( usb_error_handler.get() ) {
501 usb_error_handler->Error(ue);
506 void SocketRoutingQueue::SpinoffSimpleReadThread()
508 // signal that it's ok to run inside the thread
509 if( m_continue_reading )
510 return; // already running
511 m_continue_reading = true;
513 // Start USB read thread, to handle all routing
514 int ret = pthread_create(&m_usb_read_thread, NULL, &SimpleReadThread, this);
515 if( ret ) {
516 m_continue_reading = false;
517 throw Barry::ErrnoError("SocketRoutingQueue: Error creating USB read thread.", ret);
521 } // namespace Barry