1 # -*- coding: utf-8 -*-
3 # pubsub.py - publish/subscribe support
5 # Part of WiFi Radar: A utility for managing WiFi profiles on GNU/Linux.
7 # Copyright (C) 2014 Sean Robinson <seankrobinson@gmail.com>
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; version 2 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License in LICENSE.GPL for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24 from collections
import namedtuple
26 from multiprocessing
import Event
, Pipe
, RLock
# Module-level logger; it is named after this module through __name__.
logger = logging.getLogger(__name__)
35 class Message(namedtuple('Message', 'topic, details, ts')):
36 """ Message to be passed between WiFi Radar components.
40 def __new__(_cls
, topic
, details
, ts
=None):
41 """ Build a Message to pass.
45 return tuple.__new
__(_cls
, (topic
, details
, ts
))
48 def bridge(local
, foreign
):
49 """ :func:`bridge` helps link two :class:`Dispatcher` objects.
52 local
.add_connector(a
)
53 foreign
.add_connector(b
)
57 class Dispatcher(object):
58 """ Dispatcher provides the base infrastructure for the WiFi Radar
59 publish/subscribe pattern. One Dispatcher should run in each
63 """ Create an empty Dispatcher.
67 self
._pipes
_lock
= RLock()
68 self
._watching
= Event()
73 """ Close all pipes when the Dispatcher object is garbage
79 """ End the Dispatcher. :func:`close` joins the message processing
80 thread, which may delay the return.
82 # Stop the run loop before shutting everything down.
83 self
._watching
.clear()
84 # Work on a pre-built list of the keys, because we are deleting keys.
85 for pipe
in list(self
.pipes
):
86 self
._close
_connection
(pipe
)
89 def subscribe(self
, topics
=None):
90 """ Subscribe to messages with a topic in :data:`topics`.
91 :data:`topics` is a **str** (for one topic) or an **iterable**
92 (for more than one topic).
96 if isinstance(topics
, str):
103 with self
._pipes
_lock
:
104 self
.pipes
[a
] = topics
def unsubscribe(self, connection):
    """ Tear down the subscription identified by :data:`connection`.

        While holding the pipes lock, the end registered in
        ``self._pairs`` under :data:`connection` is handed to
        :func:`_close_connection`, followed by :data:`connection`
        itself.  The ``self._pairs`` entry is removed last.  A
        :data:`connection` with no entry in ``self._pairs`` lets the
        lookup error propagate before anything is closed.
    """
    with self._pipes_lock:
        # Look the partner up first so a failed lookup changes nothing.
        partner = self._pairs[connection]
        for pipe_end in (partner, connection):
            self._close_connection(pipe_end)
        del self._pairs[connection]
def add_connector(self, connection):
    """ Register :data:`connection` as this side of a link to another
        :class:`Dispatcher`.

        The connection is stored in ``self.pipes`` with the single
        topic ``'ALL'``.  The other :class:`Dispatcher` is expected to
        be given the opposite end through its own
        :func:`add_connector` call.
    """
    link_topics = ['ALL']
    with self._pipes_lock:
        self.pipes[connection] = link_topics
124 def remove_connector(self
, connection
):
125 """ Remove the :data:`connection` to another :class:`Dispatcher`.
128 connection
.send(Message('PIPE-CLOSE', ''))
130 # This pipe may have been closed already.
131 logger
.warning('attempted to send on closed ' +
132 'Pipe ({}), continuing...'.format(connection
))
133 self
._close
_connection
(connection
)
135 def _close_connection(self
, connection
):
136 """ Close the :class:`Connection` object passed in
139 with self
._pipes
_lock
:
141 if connection
in self
.pipes
:
142 del self
.pipes
[connection
]
144 def _check_message(self
, msg
, connection
):
145 """ Process :data:`msg` that arrived on :data:`connection`.
147 if msg
.topic
== 'PIPE-CLOSE':
148 self
._close
_connection
(connection
)
151 """ Watch for incoming messages and dispatch to subscribers.
153 while self
._watching
.is_set():
154 with self
._pipes
_lock
:
155 pipes
= self
.pipes
.keys()
156 rlist
, _
, _
= select
.select(pipes
, [], [], 0.05)
160 except (EOFError, IOError):
161 logger
.warning('read on closed ' +
162 'Pipe ({}), continuing...'.format(rfd
))
163 self
._close
_connection
(rfd
)
165 self
._check
_message
(msg
, rfd
)
166 for p
,t
in self
.pipes
.items():
167 if ((rfd
is not p
) and
168 ((msg
.topic
in t
) or ('ALL' in t
)) and
169 (not msg
.topic
.startswith('PIPE-'))):
173 """ Start running the Dispatcher's event loop in a thread.
175 self
._thread
= threading
.Thread(None, self
._run
,
176 'dispatcher_event_loop:{}'.format(self
), ())
180 """ Stop the Dispatcher's event loop thread.