Add connectors to Dispatcher and tests
[wifi-radar.git] / wifiradar / pubsub.py
blob4ab74305225d865d8509e466316131aba0fd402c
1 # -*- coding: utf-8 -*-
3 # pubsub.py - publish/subscribe support
5 # Part of WiFi Radar: A utility for managing WiFi profiles on GNU/Linux.
7 # Copyright (C) 2014 Sean Robinson <seankrobinson@gmail.com>
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; version 2 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License in LICENSE.GPL for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24 from collections import namedtuple
25 import logging
26 from multiprocessing import Event, Pipe, RLock
27 import select
29 # create a logger
30 logger = logging.getLogger(__name__)
33 Message = namedtuple('Message', 'topic, details')
36 class Dispatcher(object):
37 """ Dispatcher provides the base infrastruture for the WiFi Radar
38 publish/subscribe pattern. One Dispatcher should run in each
39 process.
40 """
41 def __init__(self):
42 """ Create an empty Dispatcher.
43 """
44 self.pipes = dict()
45 self._pairs = dict()
46 self._pipes_lock = RLock()
47 self._watching = Event()
48 self._watching.set()
50 def __del__(self):
51 """ Close all pipes when the Dispatcher object is garbage
52 collected.
53 """
54 self.close()
56 def close(self):
57 """ End the Dispatcher.
58 """
59 # Stop the run loop before shutting everything down.
60 self._watching.clear()
61 # Work on a pre-built list of the keys, because we are deleting keys.
62 for pipe in list(self.pipes):
63 self._close_connection(pipe)
65 def subscribe(self, topics=None):
66 """ Subscribe to messages with a topic in :data:`topics`.
67 :data:`topics` is a **str** (for one topic) or an **iterable**
68 (for more than one topic).
69 """
70 if topics is None:
71 topics = list()
72 if isinstance(topics, str):
73 topics = [topics]
74 topics = list(topics)
75 topics.append('EXIT')
77 a, b = Pipe()
78 self._pairs[b] = a
79 with self._pipes_lock:
80 self.pipes[a] = topics
81 return b
83 def unsubscribe(self, connection):
84 """ Close the subscribed pipe, :param connection:.
85 """
86 with self._pipes_lock:
87 self._close_connection(self._pairs[connection])
88 self._close_connection(connection)
89 del self._pairs[connection]
91 def add_connector(self, connection):
92 """ Provide one side of a link between two :class:`Dispatcher`s.
93 It is assumed the :func:`add_connector` method will be called
94 on the other :class:`Dispatcher` with the other side of the
95 Pipe.
96 """
97 with self._pipes_lock:
98 self.pipes[connection] = ['ALL']
100 def remove_connector(self, connection):
101 """ Remove the :data:`connection` to another :class:`Dispatcher`.
103 try:
104 connection.send(Message('PIPE-CLOSE', ''))
105 except IOError:
106 # This pipe may have been closed already.
107 logger.warning('attempted to send on closed ' +
108 'Pipe ({}), continuing...'.format(connection))
109 self._close_connection(connection)
111 def _close_connection(self, connection):
112 """ Close the :class:`Connection` object passed in
113 :data:`connection`.
115 with self._pipes_lock:
116 connection.close()
117 if connection in self.pipes:
118 del self.pipes[connection]
120 def _check_message(self, msg, connection):
121 """ Process :data:`msg` that arrived on :data:`connection`.
123 if msg.topic == 'PIPE-CLOSE':
124 self._close_connection(connection)
126 def run(self):
127 """ Watch for incoming messages and dispatch to subscribers.
129 while self._watching.is_set():
130 with self._pipes_lock:
131 pipes = self.pipes.keys()
132 rlist, _, _ = select.select(pipes, [], [], 0.05)
133 for rfd in rlist:
134 try:
135 msg = rfd.recv()
136 except (EOFError, IOError):
137 logger.warning('read on closed ' +
138 'Pipe ({}), continuing...'.format(rfd))
139 self._close_connection(rfd)
140 else:
141 self._check_message(msg, rfd)
142 for p,t in self.pipes.items():
143 if ((rfd is not p) and
144 ((msg.topic in t) or (t == ['ALL']))):
145 p.send(msg)