Change byte sequences to unicode sequences in ConfigManager
[wifi-radar.git] / wifiradar / pubsub.py
blob2afb6cce2b9f20552411c7ac5dc649c3f86f81eb
1 # -*- coding: utf-8 -*-
3 # pubsub.py - publish/subscribe support
5 # Part of WiFi Radar: A utility for managing WiFi profiles on GNU/Linux.
7 # Copyright (C) 2014 Sean Robinson <robinson@tuxfamily.org>
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; version 2 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License in LICENSE.GPL for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to:
20 # Free Software Foundation, Inc.
21 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
25 from __future__ import unicode_literals
27 from collections import namedtuple
28 import logging
29 from multiprocessing import Event, Pipe, RLock
30 import select
31 import threading
32 import time
34 # create a logger
35 logger = logging.getLogger(__name__)
38 class Message(namedtuple('Message', 'topic, details, ts')):
39 """ Message to be passed between WiFi Radar components.
40 """
41 __slots__ = ()
43 def __new__(_cls, topic, details, ts=None):
44 """ Build a Message to pass.
45 """
46 topic = topic.upper()
47 ts = time.time()
48 return tuple.__new__(_cls, (topic, details, ts))
51 def bridge(local, foreign):
52 """ :func:`bridge` helps link two :class:`Dispatcher` objects.
53 """
54 a, b = Pipe()
55 local.add_connector(a)
56 foreign.add_connector(b)
57 return (a, b)
60 class Dispatcher(object):
61 """ Dispatcher provides the base infrastruture for the WiFi Radar
62 publish/subscribe pattern. One Dispatcher should run in each
63 process.
64 """
65 def __init__(self, auto_start=True):
66 """ Create an empty Dispatcher.
67 """
68 self.pipes = dict()
69 self._pairs = dict()
70 self._thread = None
71 self._pipes_lock = RLock()
72 self._watching = Event()
73 self._watching.set()
74 if auto_start:
75 self.start()
77 def __del__(self):
78 """ Close all pipes when the Dispatcher object is garbage
79 collected.
80 """
81 self.close()
83 def close(self):
84 """ End the Dispatcher. :func:`close` joins the message processing
85 thread, which may delay the return.
86 """
87 # Stop the run loop before shutting everything down.
88 self._watching.clear()
89 # Work on a pre-built list of the keys, because we are deleting keys.
90 for pipe in list(self.pipes):
91 self._close_connection(pipe)
92 self.join()
94 def subscribe(self, topics=None):
95 """ Subscribe to messages with a topic in :data:`topics`.
96 :data:`topics` is a **str** (for one topic) or an **iterable**
97 (for more than one topic).
98 """
99 if topics is None:
100 topics = list()
101 if isinstance(topics, str):
102 topics = [topics]
103 topics = list(topics)
104 topics.append('EXIT')
106 a, b = Pipe()
107 self._pairs[b] = a
108 with self._pipes_lock:
109 self.pipes[a] = topics
110 return b
112 def unsubscribe(self, connection):
113 """ Close the subscribed pipe, :param connection:.
115 with self._pipes_lock:
116 self._close_connection(self._pairs[connection])
117 self._close_connection(connection)
118 del self._pairs[connection]
120 def add_connector(self, connection):
121 """ Provide one side of a link between two :class:`Dispatcher`s.
122 It is assumed the :func:`add_connector` method will be called
123 on the other :class:`Dispatcher` with the other side of the
124 Pipe.
126 with self._pipes_lock:
127 self.pipes[connection] = ['ALL']
129 def remove_connector(self, connection):
130 """ Remove the :data:`connection` to another :class:`Dispatcher`.
132 try:
133 connection.send(Message('PIPE-CLOSE', ''))
134 except IOError:
135 # This pipe may have been closed already.
136 logger.warning('attempted to send on closed Pipe '
137 '({PIPE}), continuing...'.format(PIPE=connection))
138 self._close_connection(connection)
140 def _close_connection(self, connection):
141 """ Close the :class:`Connection` object passed in
142 :data:`connection`.
144 with self._pipes_lock:
145 connection.close()
146 if connection in self.pipes:
147 del self.pipes[connection]
149 def _check_message(self, msg, connection):
150 """ Process :data:`msg` that arrived on :data:`connection`.
152 if msg.topic == 'PIPE-CLOSE':
153 self._close_connection(connection)
155 def _run(self):
156 """ Watch for incoming messages and dispatch to subscribers.
158 while self._watching.is_set():
159 with self._pipes_lock:
160 pipes = self.pipes.keys()
161 rlist, _, _ = select.select(pipes, [], [], 0.05)
162 for rfd in rlist:
163 try:
164 msg = rfd.recv()
165 except (EOFError, IOError):
166 logger.warning('read on closed Pipe '
167 '({FD}), continuing...'.format(FD=rfd))
168 self._close_connection(rfd)
169 else:
170 self._check_message(msg, rfd)
171 for p,t in self.pipes.items():
172 if ((rfd is not p) and
173 ((msg.topic in t) or ('ALL' in t)) and
174 (not msg.topic.startswith('PIPE-'))):
175 p.send(msg)
177 def start(self):
178 """ Start running the Dispatcher's event loop in a thread.
180 # Only allow one event loop thread.
181 if self._thread is None:
182 self._thread = threading.Thread(None, self._run,
183 'dispatcher_event_loop:{NAME}'.format(NAME=self), ())
184 self._thread.start()
186 def join(self):
187 """ Stop the Dispatcher's event loop thread.
189 if self._thread is not None:
190 self._thread.join()
191 self._thread = None