tzwrapper.cc: fixed use of iterator after erase
[barry.git] / src / router.cc
blobf5533355ea9004f3575be569064e7f720d5d387a
1 ///
2 /// \file router.cc
3 /// Support classes for the pluggable socket routing system.
4 ///
6 /*
7 Copyright (C) 2008-2013, Net Direct Inc. (http://www.netdirect.ca/)
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 2 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
18 See the GNU General Public License in the COPYING file at the
19 root directory of this project for more details.
22 #include "i18n.h"
23 #include "router.h"
24 #include "scoped_lock.h"
25 #include "data.h"
26 #include "protostructs.h"
27 #include "protocol.h"
28 #include "usbwrap.h"
29 #include "endian.h"
30 #include "debug.h"
31 #include <unistd.h>
32 #include <iostream>
33 #include <iomanip>
35 using namespace std;
37 namespace Barry {
39 ///////////////////////////////////////////////////////////////////////////////
40 // SocketDataHandler default methods
42 void SocketRoutingQueue::SocketDataHandler::Error(Barry::Error &error)
44 // Just log the error
45 eout("SocketDataHandler: Error: " << error.what());
46 (void) error;
49 SocketRoutingQueue::SocketDataHandler::~SocketDataHandler()
51 // Nothing to destroy
54 ///////////////////////////////////////////////////////////////////////////////
55 // SocketRoutingQueue constructors
57 SocketRoutingQueue::SocketRoutingQueue(int prealloc_buffer_count,
58 int default_read_timeout)
59 : m_dev(0)
60 , m_writeEp(0)
61 , m_readEp(0)
62 , m_interest(false)
63 , m_seen_usb_error(false)
64 , m_timeout(default_read_timeout)
65 , m_continue_reading(false)
67 pthread_mutex_init(&m_mutex, NULL);
69 pthread_mutex_init(&m_readwaitMutex, NULL);
70 pthread_cond_init(&m_readwaitCond, NULL);
72 AllocateBuffers(prealloc_buffer_count);
75 SocketRoutingQueue::~SocketRoutingQueue()
77 // thread running?
78 if( m_continue_reading ) {
79 m_continue_reading = false;
80 pthread_join(m_usb_read_thread, NULL);
83 // dump all unused packets to debug output
84 SocketQueueMap::const_iterator b = m_socketQueues.begin();
85 for( ; b != m_socketQueues.end(); ++b ) {
86 DumpSocketQueue(b->first, b->second->m_queue);
88 if( m_default.size() ) {
89 ddout("(Default queue is socket 0)");
90 DumpSocketQueue(0, m_default);
94 ///////////////////////////////////////////////////////////////////////////////
95 // protected members
98 // ReturnBuffer
100 /// Provides a method of returning a buffer to the free queue
101 /// after processing. The DataHandle class calls this automatically
102 /// from its destructor.
103 void SocketRoutingQueue::ReturnBuffer(Data *buf)
105 // don't need to lock here, since m_free handles its own locking
106 m_free.push(buf);
110 // QueuePacket
112 /// Helper function to add a buffer to a socket queue.
113 /// Returns false if no queue is available for that socket.
114 //// Also empties the DataHandle on success.
116 bool SocketRoutingQueue::QueuePacket(SocketId socket, DataHandle &buf)
118 if( m_interest ) {
119 bool seq = Protocol::IsSequencePacket(*buf.get());
121 // lock so we can access the m_socketQueues map safely
122 scoped_lock lock(m_mutex);
124 // search for registration of socket
125 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
126 if( qi != m_socketQueues.end() ) {
127 if( seq ) {
128 if( (qi->second->m_type & SequencePackets) == 0 )
129 return false;
131 else {
132 if( (qi->second->m_type & DataPackets) == 0 )
133 return false;
135 qi->second->m_queue.push(buf.release());
136 return true;
140 return false;
143 bool SocketRoutingQueue::QueuePacket(DataQueue &queue, DataHandle &buf)
145 // don't need to lock here, since queue handles its own locking
146 queue.push(buf.release());
147 return true;
151 // RouteOrQueuePacket
153 /// Same as QueuePacket, except sends the data to the callback if
154 /// a callback is available.
156 /// This function duplicates code from QueuePacket(), in order to
157 /// optimize the mutex locking.
159 bool SocketRoutingQueue::RouteOrQueuePacket(SocketId socket, DataHandle &buf)
161 // search for registration of socket
162 if( m_interest ) {
163 // lock so we can access the m_socketQueues map safely
164 scoped_lock lock(m_mutex);
166 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
167 if( qi != m_socketQueues.end() ) {
168 SocketDataHandlerPtr &sdh = qi->second->m_handler;
170 // is there a handler?
171 if( sdh ) {
172 // unlock & let the handler process it
173 lock.unlock();
174 sdh->DataReceived(*buf.get());
176 // no exceptions thrown, clear the
177 // DataHandle, sending packet back to its
178 // free list
179 buf.reset();
180 return true;
182 else {
183 qi->second->m_queue.push(buf.release());
184 return true;
189 return false;
193 // SimpleReadThread()
195 /// Convenience thread to handle USB read activity.
197 void *SocketRoutingQueue::SimpleReadThread(void *userptr)
199 SocketRoutingQueue *q = (SocketRoutingQueue *)userptr;
201 // read from USB and write to stdout until finished
202 q->m_seen_usb_error = false;
203 while( q->m_continue_reading ) {
204 try {
205 q->DoRead(1000); // timeout in milliseconds
207 catch (std::runtime_error const &e) {
208 eout(_("SimpleReadThread received uncaught exception: ") << typeid(e).name() << _(" what: ") << e.what());
210 catch (...) {
211 eout(_("SimpleReadThread recevied uncaught exception of unknown type"));
214 return 0;
217 void SocketRoutingQueue::DumpSocketQueue(SocketId socket, const DataQueue &dq)
219 // dump a record of any unused packets in the queue, for debugging
220 if( dq.size() ) {
221 ddout(_("SocketRoutingQueue Leftovers: ")
222 << dec << dq.size()
223 << _(" packet(s) for socket: ") << "0x"
224 << hex << (unsigned int) socket
225 << "\n"
226 << dq);
231 ///////////////////////////////////////////////////////////////////////////////
232 // public API
234 // These functions connect the router to an external Usb::Device
235 // object. Normally this is handled automatically by the
236 // Controller class, but are public here in case they are needed.
237 void SocketRoutingQueue::SetUsbDevice(Usb::Device *dev, int writeEp, int readEp,
238 SocketDataHandlerPtr callback)
240 scoped_lock lock(m_mutex);
241 m_dev = dev;
242 m_usb_error_dev_callback = callback;
243 m_writeEp = writeEp;
244 m_readEp = readEp;
247 void SocketRoutingQueue::ClearUsbDevice()
249 scoped_lock lock(m_mutex);
250 m_dev = 0;
251 m_usb_error_dev_callback.reset();
252 lock.unlock();
254 // wait for the DoRead cycle to finish, so the external
255 // Usb::Device object doesn't close before we're done with it
256 scoped_lock wait(m_readwaitMutex);
257 pthread_cond_wait(&m_readwaitCond, &m_readwaitMutex);
260 bool SocketRoutingQueue::UsbDeviceReady()
262 scoped_lock lock(m_mutex);
263 return m_dev != 0 && !m_seen_usb_error;
267 // AllocateBuffers
269 /// This class starts out with no buffers, and will grow one buffer
270 /// at a time if needed. Call this to allocate count buffers
271 /// all at once and place them on the free queue. After calling
272 /// this function, at least count buffers will exist in the free
273 /// queue. If there are already count buffers, none will be added.
275 void SocketRoutingQueue::AllocateBuffers(int count)
277 int todo = count - m_free.size();
279 for( int i = 0; i < todo; i++ ) {
280 // m_free handles its own locking
281 m_free.push( new Data );
286 // DefaultRead (both variations)
288 /// Returns the data for the next unregistered socket.
289 /// Blocks until timeout or data is available.
290 /// Returns false (or null pointer) on timeout and no data.
291 /// With the return version of the function, there is no
292 /// copying performed.
294 /// This version performs a copy.
296 bool SocketRoutingQueue::DefaultRead(Data &receive, int timeout)
298 DataHandle buf = DefaultRead(timeout);
299 if( !buf.get() )
300 return false;
302 // copy to desired buffer
303 receive = *buf.get();
304 return true;
308 /// This version does not perform a copy.
310 DataHandle SocketRoutingQueue::DefaultRead(int timeout)
312 if( m_seen_usb_error && timeout == -1 ) {
313 // If an error has been seen and not cleared then no
314 // more data will be read into the queue by
315 // DoRead(). Forcing the timeout to zero allows any
316 // data already in the queue to be read, but prevents
317 // waiting for data which will never arrive.
318 timeout = 0;
321 // m_default handles its own locking
322 // Be careful with the queue timeout, since its -1 means "forever"
323 Data *buf = m_default.wait_pop(timeout == -1 ? m_timeout : timeout);
324 return DataHandle(*this, buf);
328 // RegisterInterestAndType
330 /// Register an interest in data from a certain socket. To read
331 /// from that socket, use the SocketRead() function from then on.
333 /// Any non-registered socket goes in the default queue
334 /// and must be read by DefaultRead()
336 /// If not null, handler is called when new data is read. It will
337 /// be called in the same thread instance that DoRead() is called from.
338 /// Handler is passed the DataQueue Data pointer, and so no
339 /// copying is done. Once the handler returns, the data is
340 /// considered processed and not added to the interested queue,
341 /// but instead returned to m_free.
343 /// Throws std::logic_error if already registered.
345 void SocketRoutingQueue::RegisterInterestAndType(SocketId socket,
346 SocketDataHandlerPtr handler,
347 InterestType type)
349 // modifying our own std::map, need a lock
350 scoped_lock lock(m_mutex);
352 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
353 if( qi != m_socketQueues.end() )
354 throw std::logic_error(_("RegisterInterest requesting a previously registered socket."));
356 m_socketQueues[socket] = QueueEntryPtr( new QueueEntry(handler, type) );
357 m_interest = true;
361 // RegisterInterest
363 /// This behaves like RegisterInterest(SocketId, SocketDataHandlerPtr, InterestType)
364 /// but defaults to an InterestType of SequenceAndDataPackets
366 void SocketRoutingQueue::RegisterInterest(SocketId socket,
367 SocketDataHandlerPtr handler)
369 RegisterInterestAndType(socket, handler, SequenceAndDataPackets);
373 // UnregisterInterest
375 /// Unregisters interest in data from the given socket, and discards
376 /// any existing data in its interest queue. Any new incoming data
377 /// for this socket will be placed in the default queue.
379 void SocketRoutingQueue::UnregisterInterest(SocketId socket)
381 // modifying our own std::map, need a lock
382 scoped_lock lock(m_mutex);
384 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
385 if( qi == m_socketQueues.end() )
386 return; // nothing registered, done
388 // dump a record of any unused packets in the queue, for debugging
389 DumpSocketQueue(qi->first, qi->second->m_queue);
391 // salvage all our data buffers
392 m_free.append_from( qi->second->m_queue );
394 // remove the QueueEntryPtr from the map
395 m_socketQueues.erase( qi );
397 // check the interest flag
398 m_interest = m_socketQueues.size() > 0;
403 // ChangeInterest
405 /// Changes the type of data that a client is interested in for a certain socket.
406 /// Interest in the socket must have previously been registered by a call
407 /// to RegisterInterest().
408 void SocketRoutingQueue::ChangeInterest(SocketId socket, InterestType type)
410 // modifying our own std::map, need a lock
411 scoped_lock lock(m_mutex);
413 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
414 if( qi == m_socketQueues.end() )
415 throw std::logic_error("ChangeInterest requires a previously registered socket.");
417 qi->second->m_type = type;
421 // SocketRead
423 /// Reads data from the interested socket cache. Can only read
424 /// from sockets that have been previously registered.
426 /// Blocks until timeout or data is available.
428 /// Returns false (or null pointer) on timeout and no data.
429 /// With the return version of the function, there is no
430 /// copying performed.
432 /// Throws std::logic_error if a socket was requested that was
433 /// not previously registered.
435 /// Copying is performed with this function.
437 bool SocketRoutingQueue::SocketRead(SocketId socket, Data &receive, int timeout)
439 DataHandle buf = SocketRead(socket, timeout);
440 if( !buf.get() )
441 return false;
443 // copy to desired buffer
444 receive = *buf.get();
445 return true;
449 /// Copying is not performed with this function.
451 /// Throws std::logic_error if a socket was requested that was
452 /// not previously registered.
454 DataHandle SocketRoutingQueue::SocketRead(SocketId socket, int timeout)
456 QueueEntryPtr qep;
457 DataQueue *dq = 0;
459 // accessing our own std::map, need a lock
461 scoped_lock lock(m_mutex);
462 SocketQueueMap::iterator qi = m_socketQueues.find(socket);
463 if( qi == m_socketQueues.end() )
464 throw std::logic_error(_("SocketRead requested data from unregistered socket."));
466 // got our queue, save the whole QueueEntryPtr (shared_ptr),
467 // and unlock, since we will be waiting on the DataQueue,
468 // not the socketQueues map
470 // This is safe, since even if UnregisterInterest is called,
471 // our pointer won't be deleted until our shared_ptr
472 // (QueueEntryPtr) goes out of scope.
474 // The remaining problem is that wait_pop() might wait
475 // forever if there is no timeout... c'est la vie.
476 // Should'a used a timeout. :-)
477 qep = qi->second;
478 dq = &qep->m_queue;
481 // get data from DataQueue
482 // Be careful with the queue timeout, since its -1 means "forever"
483 Data *buf = dq->wait_pop(timeout == -1 ? m_timeout : timeout);
485 // specifically delete our copy of shared pointer, in a locked
486 // environment
488 scoped_lock lock(m_mutex);
489 qep.reset();
492 return DataHandle(*this, buf);
495 // Returns true if data is available for that socket.
496 bool SocketRoutingQueue::IsAvailable(SocketId socket) const
498 scoped_lock lock(m_mutex);
499 SocketQueueMap::const_iterator qi = m_socketQueues.find(socket);
500 if( qi == m_socketQueues.end() )
501 return false;
502 return qi->second->m_queue.size() > 0;
506 // DoRead
508 /// Called by the application's "read thread" to read the next usb
509 /// packet and route it to the correct queue. Returns after every
510 /// read, even if a handler is associated with a queue.
511 /// Note: this function is safe to call before SetUsbDevice() is
512 /// called... it just doesn't do anything if there is no usb
513 /// device to work with.
515 /// Timeout is in milliseconds.
516 // This timeout is for the USB subsystem, so no special handling
517 // for it is needed... just use usbwrap's default timeout.
518 void SocketRoutingQueue::DoRead(int timeout)
520 class ReadWaitSignal
522 pthread_mutex_t &m_Mutex;
523 pthread_cond_t &m_Cond;
524 public:
525 ReadWaitSignal(pthread_mutex_t &mut, pthread_cond_t &cond)
526 : m_Mutex(mut), m_Cond(cond)
528 ~ReadWaitSignal()
530 scoped_lock wait(m_Mutex);
531 pthread_cond_signal(&m_Cond);
533 } readwait(m_readwaitMutex, m_readwaitCond);
535 Usb::Device * volatile dev = 0;
536 int readEp;
537 DataHandle buf(*this, 0);
539 // if we are not connected to a USB device yet, just wait
541 scoped_lock lock(m_mutex);
543 if( !m_dev || m_seen_usb_error ) {
544 lock.unlock(); // unlock early, since we're sleeping
545 // sleep only a short time, since things could be
546 // in the process of setup or teardown
547 usleep(125000);
548 return;
551 dev = m_dev;
552 readEp = m_readEp;
554 // fetch a free buffer
555 Data *raw = m_free.pop();
556 if( !raw )
557 buf = DataHandle(*this, new Data);
558 else
559 buf = DataHandle(*this, raw);
562 // take a chance and do the read unlocked, as this has the potential
563 // for blocking for a while
564 try {
566 Data &data = *buf.get();
568 if( !dev->BulkRead(readEp, data, timeout) )
569 return; // no data, done!
571 MAKE_PACKET(pack, data);
573 // make sure the size is right
574 if( data.GetSize() < SB_PACKET_SOCKET_SIZE )
575 return; // bad size, just skip
577 // extract the socket from the packet
578 uint16_t socket = btohs(pack->socket);
580 // if this is a sequence packet, handle it specially
581 if( Protocol::IsSequencePacket(data) ) {
582 // sequence.socket is a single byte
583 socket = pack->u.sequence.socket;
585 //////////////////////////////////////////////
586 // ALWAYS queue sequence packets, so that
587 // the socket code can handle SyncSend()
588 if( !QueuePacket(socket, buf) ) {
589 // if no queue available for this
590 // socket, send it to the default
591 // queue
592 QueuePacket(m_default, buf);
595 // done with sequence packet
596 return;
599 // we have data, now route or queue it
600 if( RouteOrQueuePacket(socket, buf) )
601 return; // done
603 // if we get here, send to default queue
604 QueuePacket(m_default, buf);
606 catch( Usb::Timeout & ) {
607 // this is expected... just ignore
609 catch( Usb::Error &ue ) {
610 // set the flag first, in case any of the handlers
611 // are able to recover from this error
612 m_seen_usb_error = true;
614 // this is unexpected, but we're in a thread here...
615 // Need to iterate through all the registered handlers
616 // calling their error callback.
617 // Can't be locked when calling the callback, so need
618 // to make a list of them first.
619 scoped_lock lock(m_mutex);
620 std::vector<SocketDataHandlerPtr> handlers;
621 SocketQueueMap::iterator qi = m_socketQueues.begin();
622 while( qi != m_socketQueues.end() ) {
623 SocketDataHandlerPtr &sdh = qi->second->m_handler;
624 // is there a handler?
625 if( sdh ) {
626 handlers.push_back(sdh);
628 ++qi;
631 SocketDataHandlerPtr usb_error_handler = m_usb_error_dev_callback;
633 lock.unlock();
634 std::vector<SocketDataHandlerPtr>::iterator hi = handlers.begin();
635 while( hi != handlers.end() ) {
636 (*hi)->Error(ue);
637 ++hi;
640 // and finally, call the specific error callback if available
641 if( usb_error_handler.get() ) {
642 usb_error_handler->Error(ue);
647 void SocketRoutingQueue::SpinoffSimpleReadThread()
649 // signal that it's ok to run inside the thread
650 if( m_continue_reading )
651 return; // already running
652 m_continue_reading = true;
654 // Start USB read thread, to handle all routing
655 int ret = pthread_create(&m_usb_read_thread, NULL, &SimpleReadThread, this);
656 if( ret ) {
657 m_continue_reading = false;
658 throw Barry::ErrnoError(_("SocketRoutingQueue: Error creating USB read thread."), ret);
662 } // namespace Barry