1 # -*- coding: utf-8 -*-
3 # pubsub.py - publish/subscribe support
5 # Part of WiFi Radar: A utility for managing WiFi profiles on GNU/Linux.
7 # Copyright (C) 2014 Sean Robinson <robinson@tuxfamily.org>
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; version 2 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License in LICENSE.GPL for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to:
20 # Free Software Foundation, Inc.
21 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
25 from __future__
import unicode_literals
27 from collections
import namedtuple
29 from multiprocessing
import Event
, Pipe
, RLock
# Module-level logger, named after this module through __name__.
logger = logging.getLogger(__name__)
class Message(namedtuple('Message', 'topic, details, ts')):
    """ A single publish/subscribe message exchanged between WiFi Radar
        components.

        The fields, in order, are :data:`topic`, :data:`details` and
        :data:`ts`.
    """
    def __new__(_cls, topic, details, ts=None):
        """ Create a Message from :data:`topic` and :data:`details`.
            :data:`ts` is optional and is stored as given (``None`` when
            it is omitted).
        """
        fields = (topic, details, ts)
        return tuple.__new__(_cls, fields)
def bridge(local, foreign):
    """ :func:`bridge` helps link two :class:`Dispatcher` objects.

        A new :func:`multiprocessing.Pipe` is created.  One end is handed
        to :data:`local` and the other end to :data:`foreign`, each through
        the object's :func:`add_connector` method.

        Returns a tuple ``(local_end, foreign_end)`` holding the two
        connections, so the caller can later remove them again.
    """
    # Both ends must exist before either Dispatcher is told about its side.
    local_end, foreign_end = Pipe()
    local.add_connector(local_end)
    foreign.add_connector(foreign_end)
    return (local_end, foreign_end)
60 class Dispatcher(object):
61 """ Dispatcher provides the base infrastructure for the WiFi Radar
62 publish/subscribe pattern. One Dispatcher should run in each
def __init__(self, auto_start=True):
    """ Create an empty Dispatcher.

        :param auto_start: when true (the default), the event loop is
            started as soon as the Dispatcher has been set up.
    """
    # Watched connections, each mapped to the list of topics it wants.
    self.pipes = dict()
    # Subscriber-side connection -> Dispatcher-side connection, as looked
    # up by unsubscribe.
    self._pairs = dict()
    # The event loop thread; None until one has been created.
    self._thread = None
    self._pipes_lock = RLock()
    self._watching = Event()
    # The event loop keeps running only while this flag is set.
    self._watching.set()
    if auto_start:
        # NOTE(review): assumes the thread-launching method is named
        # start -- confirm against the rest of the class.
        self.start()
78 """ Close all pipes when the Dispatcher object is garbage
84 """ End the Dispatcher. :func:`close` joins the message processing
85 thread, which may delay the return.
87 # Stop the run loop before shutting everything down.
88 self
._watching
.clear()
89 # Work on a pre-built list of the keys, because we are deleting keys.
90 for pipe
in list(self
.pipes
):
91 self
._close
_connection
(pipe
)
def subscribe(self, topics=None):
    """ Subscribe to messages with a topic in :data:`topics`.
        :data:`topics` is a **str** (for one topic) or an **iterable**
        (for more than one topic).  When :data:`topics` is omitted, only
        the 'EXIT' topic, which is always added, is subscribed.

        Returns the subscriber's end of a newly created pipe.  The same
        connection is the one to hand to :func:`unsubscribe`.
    """
    if topics is None:
        topics = []
    elif isinstance(topics, str):
        # A bare string is one topic, not an iterable of characters.
        topics = [topics]
    # Work on a copy so the caller's iterable is never modified.
    topics = list(topics)
    topics.append('EXIT')

    dispatcher_end, subscriber_end = Pipe()
    with self._pipes_lock:
        # unsubscribe looks up the Dispatcher's end by the subscriber's.
        self._pairs[subscriber_end] = dispatcher_end
        self.pipes[dispatcher_end] = topics
    return subscriber_end
def unsubscribe(self, connection):
    """ End the subscription identified by :data:`connection`.

        The connection paired with :data:`connection` is closed first,
        then :data:`connection` itself, and finally the pairing is
        forgotten.  A :data:`connection` missing from the pairing table
        lets the lookup error propagate before anything is closed.
    """
    with self._pipes_lock:
        partner = self._pairs[connection]
        for end in (partner, connection):
            self._close_connection(end)
        del self._pairs[connection]
def add_connector(self, connection):
    """ Register :data:`connection` as this side of a link to another
        :class:`Dispatcher`.  The connection is subscribed to the 'ALL'
        topic.

        The caller is expected to call :func:`add_connector` on the other
        :class:`Dispatcher` with the opposite end of the same link.
    """
    everything = ['ALL']
    with self._pipes_lock:
        self.pipes[connection] = everything
def remove_connector(self, connection):
    """ Remove the :data:`connection` to another :class:`Dispatcher`.

        A 'PIPE-CLOSE' message is first sent over :data:`connection`.  If
        that send fails, a warning is logged and removal carries on.  The
        connection is then closed on this side in every case.
    """
    try:
        connection.send(Message('PIPE-CLOSE', ''))
    except (IOError, OSError):
        # This pipe may have been closed already.
        logger.warning('attempted to send on closed Pipe '
                       '({PIPE}), continuing...'.format(PIPE=connection))
    self._close_connection(connection)
def _close_connection(self, connection):
    """ Close the :class:`Connection` object passed in and stop watching
        it.

        A :data:`connection` that is not being watched is simply closed,
        so calling this more than once for the same connection is
        harmless.
    """
    with self._pipes_lock:
        # Drop it from the watch list while holding the lock, then close.
        self.pipes.pop(connection, None)
        connection.close()
def _check_message(self, msg, connection):
    """ Look at :data:`msg`, which arrived on :data:`connection`, for a
        topic the Dispatcher itself acts on.

        A 'PIPE-CLOSE' topic closes :data:`connection`.  Any other topic
        is left alone.
    """
    if msg.topic != 'PIPE-CLOSE':
        return
    self._close_connection(connection)
156 """ Watch for incoming messages and dispatch to subscribers.
158 while self
._watching
.is_set():
159 with self
._pipes
_lock
:
160 pipes
= self
.pipes
.keys()
161 rlist
, _
, _
= select
.select(pipes
, [], [], 0.05)
165 except (EOFError, IOError):
166 logger
.warning('read on closed Pipe '
167 '({FD}), continuing...'.format(FD
=rfd
))
168 self
._close
_connection
(rfd
)
170 self
._check
_message
(msg
, rfd
)
171 for p
,t
in self
.pipes
.items():
172 if ((rfd
is not p
) and
173 ((msg
.topic
in t
) or ('ALL' in t
)) and
174 (not msg
.topic
.startswith('PIPE-'))):
178 """ Start running the Dispatcher's event loop in a thread.
180 # Only allow one event loop thread.
181 if self
._thread
is None:
182 self
._thread
= threading
.Thread(None, self
._run
,
183 'dispatcher_event_loop:{NAME}'.format(NAME
=self
), ())
187 """ Stop the Dispatcher's event loop thread.
189 if self
._thread
is not None: