Rename PreferencesEditor save to apply
[wifi-radar.git] / wifiradar / pubsub.py
blob0e3deaea1321ee35819d73c7f28d9d0087359323
1 # -*- coding: utf-8 -*-
3 # pubsub.py - publish/subscribe support
5 # Part of WiFi Radar: A utility for managing WiFi profiles on GNU/Linux.
7 # Copyright (C) 2014 Sean Robinson <seankrobinson@gmail.com>
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; version 2 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License in LICENSE.GPL for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24 from collections import namedtuple
25 import logging
26 from multiprocessing import Event, Pipe, RLock
27 import select
28 import threading
29 import time
31 # create a logger
32 logger = logging.getLogger(__name__)
class Message(namedtuple('Message', 'topic, details, ts')):
    """ Message to be passed between WiFi Radar components.

        A Message is an immutable triple:

        * :data:`topic` - the subject of the message, always stored in
          upper case,
        * :data:`details` - the payload, stored exactly as given,
        * :data:`ts` - the time stamp of the message, as returned by
          :func:`time.time` unless the caller supplies one.
    """
    __slots__ = ()

    def __new__(_cls, topic, details, ts=None):
        """ Build a Message to pass.

            :data:`topic` is a **str** and is converted to upper case.
            :data:`details` is stored unchanged.  :data:`ts` defaults to
            the current time when it is None; any other value is kept.

            Keeping a supplied :data:`ts` matters for more than direct
            callers: pickle rebuilds a namedtuple by calling
            :func:`__new__` with the stored fields, so overwriting
            :data:`ts` here would replace the creation time with the
            unpickling time every time a Message crosses a Pipe.
        """
        if ts is None:
            ts = time.time()
        return tuple.__new__(_cls, (topic.upper(), details, ts))
def bridge(local, foreign):
    """ :func:`bridge` links two :class:`Dispatcher` objects with a
        newly created Pipe.

        One end of the Pipe is handed to :data:`local` and the other end
        to :data:`foreign`, each through its :func:`add_connector`
        method.  The return value is the tuple (local end, foreign end).
    """
    local_end, foreign_end = Pipe()
    # Register the local side first, then the foreign side.
    for dispatcher, end in ((local, local_end), (foreign, foreign_end)):
        dispatcher.add_connector(end)
    return local_end, foreign_end
class Dispatcher(object):
    """ Dispatcher provides the base infrastructure for the WiFi Radar
        publish/subscribe pattern.  One Dispatcher should run in each
        process.

        Creating a Dispatcher starts a thread running its event loop.
        The loop reads messages from every registered pipe and forwards
        each one to every other registered pipe whose topic list holds
        the message topic or 'ALL'.  Messages whose topic starts with
        'PIPE-' are control messages and are never forwarded.
    """
    def __init__(self):
        """ Create an empty Dispatcher and start its event loop thread.
        """
        # Maps each watched Connection to the list of topics sent to it.
        self.pipes = dict()
        # Maps the subscriber's end of a Pipe to the Dispatcher's end.
        self._pairs = dict()
        # Guards self.pipes and self._pairs.
        self._pipes_lock = RLock()
        # The event loop runs for as long as this Event is set.
        self._watching = Event()
        self._watching.set()
        self.start()

    def __del__(self):
        """ Close all pipes when the Dispatcher object is garbage
            collected.
        """
        self.close()

    def close(self):
        """ End the Dispatcher.  :func:`close` joins the message processing
            thread, which may delay the return.
        """
        # Stop the run loop and wait for it to finish before shutting
        # everything down, so the loop is never left calling select() on
        # pipes that are being closed underneath it.
        self._watching.clear()
        self.join()
        with self._pipes_lock:
            # Work on a pre-built list of the keys, because we are
            # deleting keys.
            for pipe in list(self.pipes):
                self._close_connection(pipe)

    def subscribe(self, topics=None):
        """ Subscribe to messages with a topic in :data:`topics`.
            :data:`topics` is a **str** (for one topic) or an **iterable**
            (for more than one topic).  The 'EXIT' topic is always added.

            Topics are compared to the message topic exactly as given;
            :class:`Message` stores its topic in upper case.

            Returns the subscriber's end of a new Pipe.  Pass the same
            object to :func:`unsubscribe` to end the subscription.
        """
        if topics is None:
            topics = list()
        if isinstance(topics, str):
            topics = [topics]
        # Copy, so the caller's iterable is never modified.
        topics = list(topics)
        topics.append('EXIT')

        a, b = Pipe()
        with self._pipes_lock:
            self._pairs[b] = a
            self.pipes[a] = topics
        return b

    def unsubscribe(self, connection):
        """ Close the subscribed pipe, :data:`connection`, which must be
            a value returned by :func:`subscribe`.  Both ends of the
            Pipe are closed.  Raises KeyError if :data:`connection` is
            not a known subscription.
        """
        with self._pipes_lock:
            self._close_connection(self._pairs[connection])
            self._close_connection(connection)
            del self._pairs[connection]

    def add_connector(self, connection):
        """ Provide one side of a link between two :class:`Dispatcher`s.
            It is assumed the :func:`add_connector` method will be called
            on the other :class:`Dispatcher` with the other side of the
            Pipe.  Every forwardable message is sent to a connector.
        """
        with self._pipes_lock:
            self.pipes[connection] = ['ALL']

    def remove_connector(self, connection):
        """ Remove the :data:`connection` to another :class:`Dispatcher`.
            A 'PIPE-CLOSE' message is sent over :data:`connection`
            before this end is closed.
        """
        try:
            connection.send(Message('PIPE-CLOSE', ''))
        except (IOError, OSError):
            # This pipe may have been closed already.
            logger.warning('attempted to send on closed ' +
                'Pipe ({}), continuing...'.format(connection))
        self._close_connection(connection)

    def _close_connection(self, connection):
        """ Close the :class:`Connection` object passed in
            :data:`connection` and stop watching it.
        """
        with self._pipes_lock:
            connection.close()
            if connection in self.pipes:
                del self.pipes[connection]

    def _check_message(self, msg, connection):
        """ Process :data:`msg` that arrived on :data:`connection`.
            A 'PIPE-CLOSE' message closes :data:`connection`.
        """
        if msg.topic == 'PIPE-CLOSE':
            self._close_connection(connection)

    def _prune_closed(self, connections):
        """ Stop watching every closed pipe in :data:`connections`.
            Returns True if at least one closed pipe was found.
        """
        pruned = False
        for connection in connections:
            try:
                # fileno() fails on a Connection that has been closed.
                connection.fileno()
            except (IOError, OSError, ValueError):
                self._close_connection(connection)
                pruned = True
        return pruned

    def _forward(self, msg, source):
        """ Send :data:`msg` to every watched pipe, other than
            :data:`source`, whose topics include the message topic or
            'ALL'.  'PIPE-' control messages are not forwarded.
        """
        if msg.topic.startswith('PIPE-'):
            return
        # Pick the targets under the lock, so subscriptions changing in
        # another thread cannot disturb the iteration.
        with self._pipes_lock:
            targets = [p for p, t in self.pipes.items()
                       if (p is not source) and
                           ((msg.topic in t) or ('ALL' in t))]
        # Send outside the lock, so a slow reader does not block callers
        # of subscribe() and friends.
        for p in targets:
            try:
                p.send(msg)
            except (IOError, OSError):
                # One dead pipe must not end the event loop.
                logger.warning('write on closed ' +
                    'Pipe ({}), continuing...'.format(p))
                self._close_connection(p)

    def _run(self):
        """ Watch for incoming messages and dispatch to subscribers.
            Runs until :func:`close` clears the watching flag.
        """
        while self._watching.is_set():
            with self._pipes_lock:
                # Take a real copy; on Python 3 keys() is only a view of
                # the dict and would be read after the lock is released.
                pipes = list(self.pipes)
            try:
                rlist, _, _ = select.select(pipes, [], [], 0.05)
            except (IOError, OSError, ValueError, select.error):
                # A pipe was probably closed after the copy was made.
                # Drop closed pipes and try again; anything else is
                # unexpected, so let it propagate.
                if not self._prune_closed(pipes):
                    raise
                continue
            for rfd in rlist:
                try:
                    msg = rfd.recv()
                except (EOFError, IOError):
                    logger.warning('read on closed ' +
                        'Pipe ({}), continuing...'.format(rfd))
                    self._close_connection(rfd)
                else:
                    self._check_message(msg, rfd)
                    self._forward(msg, rfd)

    def start(self):
        """ Start running the Dispatcher's event loop in a thread.
        """
        self._thread = threading.Thread(None, self._run,
            'dispatcher_event_loop:{}'.format(self), ())
        self._thread.start()

    def join(self):
        """ Wait for the Dispatcher's event loop thread to end.  The
            loop only ends after :func:`close` has cleared the watching
            flag.
        """
        self._thread.join()