1 # -*- coding: utf-8 -*-
3 # pubsub.py - publish/subscribe support
5 # Part of WiFi Radar: A utility for managing WiFi profiles on GNU/Linux.
7 # Copyright (C) 2014 Sean Robinson <seankrobinson@gmail.com>
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; version 2 of the License.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License in LICENSE.GPL for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
24 from collections
import namedtuple
26 from multiprocessing
import Event
, Pipe
, RLock
30 logger
= logging
.getLogger(__name__
)
33 Message
= namedtuple('Message', 'topic, details')
36 class Dispatcher(object):
37 """ Dispatcher provides the base infrastruture for the WiFi Radar
38 publish/subscribe pattern. One Dispatcher should run in each
42 """ Create an empty Dispatcher.
46 self
._pipes
_lock
= RLock()
47 self
._watching
= Event()
51 """ Close all pipes when the Dispatcher object is garbage
57 """ End the Dispatcher.
59 # Stop the run loop before shutting everything down.
60 self
._watching
.clear()
61 # Work on a pre-built list of the keys, because we are deleting keys.
62 for pipe
in list(self
.pipes
):
63 self
._close
_connection
(pipe
)
65 def subscribe(self
, topics
=None):
66 """ Subscribe to messages with a topic in :data:`topics`.
67 :data:`topics` is a **str** (for one topic) or an **iterable**
68 (for more than one topic).
72 if isinstance(topics
, str):
79 with self
._pipes
_lock
:
80 self
.pipes
[a
] = topics
83 def unsubscribe(self
, connection
):
84 """ Close the subscribed pipe, :param connection:.
86 with self
._pipes
_lock
:
87 self
._close
_connection
(self
._pairs
[connection
])
88 self
._close
_connection
(connection
)
89 del self
._pairs
[connection
]
91 def add_connector(self
, connection
):
92 """ Provide one side of a link between two :class:`Dispatcher`s.
93 It is assumed the :func:`add_connector` method will be called
94 on the other :class:`Dispatcher` with the other side of the
97 with self
._pipes
_lock
:
98 self
.pipes
[connection
] = ['ALL']
100 def remove_connector(self
, connection
):
101 """ Remove the :data:`connection` to another :class:`Dispatcher`.
104 connection
.send(Message('PIPE-CLOSE', ''))
106 # This pipe may have been closed already.
107 logger
.warning('attempted to send on closed ' +
108 'Pipe ({}), continuing...'.format(connection
))
109 self
._close
_connection
(connection
)
111 def _close_connection(self
, connection
):
112 """ Close the :class:`Connection` object passed in
115 with self
._pipes
_lock
:
117 if connection
in self
.pipes
:
118 del self
.pipes
[connection
]
120 def _check_message(self
, msg
, connection
):
121 """ Process :data:`msg` that arrived on :data:`connection`.
123 if msg
.topic
== 'PIPE-CLOSE':
124 self
._close
_connection
(connection
)
127 """ Watch for incoming messages and dispatch to subscribers.
129 while self
._watching
.is_set():
130 with self
._pipes
_lock
:
131 pipes
= self
.pipes
.keys()
132 rlist
, _
, _
= select
.select(pipes
, [], [], 0.05)
136 except (EOFError, IOError):
137 logger
.warning('read on closed ' +
138 'Pipe ({}), continuing...'.format(rfd
))
139 self
._close
_connection
(rfd
)
141 self
._check
_message
(msg
, rfd
)
142 for p
,t
in self
.pipes
.items():
143 if ((rfd
is not p
) and
144 ((msg
.topic
in t
) or (t
== ['ALL']))):