1 //! Thread safe communication channel implementing `Evented`
2 use lazycell::{AtomicLazyCell, LazyCell};
3 use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
6 use std::sync::atomic::{AtomicUsize, Ordering};
7 use std::sync::{mpsc, Arc};
10 /// Creates a new asynchronous channel, where the `Receiver` can be registered
/// with `Poll`.
///
/// Both halves are assembled from a `std::sync::mpsc::channel` pair plus the
/// control pair returned by `ctl_pair`.
12 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
// Control handles: one per half.
13 let (tx_ctl, rx_ctl) = ctl_pair();
// The std channel that actually carries the values.
14 let (tx, rx) = mpsc::channel();
16 let tx = Sender { tx, ctl: tx_ctl };
18 let rx = Receiver { rx, ctl: rx_ctl };
23 /// Creates a new synchronous, bounded channel where the `Receiver` can be
24 /// registered with `Poll`.
///
/// `bound` is passed straight through to `std::sync::mpsc::sync_channel`; the
/// control pair comes from `ctl_pair`, exactly as in `channel`.
25 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
// Control handles: one per half.
26 let (tx_ctl, rx_ctl) = ctl_pair();
// The bounded std channel that actually carries the values.
27 let (tx, rx) = mpsc::sync_channel(bound);
29 let tx = SyncSender { tx, ctl: tx_ctl };
31 let rx = Receiver { rx, ctl: rx_ctl };
// Builds the shared `Inner` state behind an `Arc` and the two control handles
// that refer to it.
36 fn ctl_pair() -> (SenderCtl, ReceiverCtl) {
37 let inner = Arc::new(Inner {
// No messages are outstanding at creation time.
38 pending: AtomicUsize::new(0),
// The sender count starts at one.
39 senders: AtomicUsize::new(1),
// Created unfilled here.
40 set_readiness: AtomicLazyCell::new(),
// NOTE(review): the enclosing struct literal is not visible in this excerpt;
// presumably this is the `SenderCtl` handle taking its own `Arc` clone — confirm.
44 inner: Arc::clone(&inner),
47 let rx = ReceiverCtl {
// Created unfilled here.
48 registration: LazyCell::new(),
55 /// Tracks messages sent on a channel in order to update readiness.
60 /// Tracks messages received on a channel in order to track readiness.
// NOTE(review): the struct header is not visible in this excerpt; `ctl_pair`
// builds a `ReceiverCtl` with a `registration` field, so this field presumably
// belongs to that struct — confirm.
// Lazily-populated `Registration`, created as an empty cell in `ctl_pair`.
62 registration: LazyCell<Registration>,
66 /// The sending half of a channel.
///
/// Constructed by `channel` from an `mpsc` sender (`tx`) and a control
/// handle (`ctl`).
67 pub struct Sender<T> {
72 /// The sending half of a synchronous channel.
///
/// Constructed by `sync_channel` from an `mpsc::SyncSender` (`tx`) and a control
/// handle (`ctl`).
73 pub struct SyncSender<T> {
// The wrapped std sender that carries the values.
74 tx: mpsc::SyncSender<T>,
78 /// The receiving half of a channel.
///
/// Constructed by both `channel` and `sync_channel` from an `mpsc::Receiver`
/// (`rx`) and a control handle (`ctl`).
79 pub struct Receiver<T> {
// The wrapped std receiver that values are read from.
80 rx: mpsc::Receiver<T>,
84 /// An error returned from the `Sender::send` or `SyncSender::send` function.
///
/// The variants referenced elsewhere in this file are `Io` (holding an
/// `io::Error`) and `Disconnected` (holding the payload taken from
/// `mpsc::SendError`).
85 pub enum SendError<T> {
89 /// The receiving half of the channel has disconnected.
93 /// An error returned from the `SyncSender::try_send` function.
///
/// The variants referenced elsewhere in this file are `Io`, `Full` and
/// `Disconnected`; the latter two carry the payload taken from the
/// corresponding `mpsc` error.
94 pub enum TrySendError<T> {
98 /// Data could not be sent because it would require the callee to block.
101 /// The receiving half of the channel has disconnected.
// NOTE(review): the struct header is not visible in this excerpt; `ctl_pair`
// initialises exactly these three fields on `Inner`.
106 // The number of outstanding messages for the receiver to read
// Incremented by `inc` and decremented by `dec`.
107 pending: AtomicUsize,
108 // The number of sender handles
// Incremented in `SenderCtl::clone` and decremented in `SenderCtl::drop`.
109 senders: AtomicUsize,
110 // The set readiness handle
// Borrowed by `inc` and `dec` to flip readiness; may still be unset, in which
// case those functions skip the update.
111 set_readiness: AtomicLazyCell<SetReadiness>,
115 /// Attempts to send a value on this channel, returning it back if it could not be sent.
///
/// The value is first handed to the wrapped `mpsc` sender. Only when that
/// succeeds is the control handle's `inc` called to record the new message.
/// A failure of either step is converted into a `SendError`.
116 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
117 self.tx.send(t).map_err(SendError::from).and_then(|_| {
// `?` replaces the deprecated `try!` macro; the `io::Error` is still converted
// through `From<io::Error> for SendError<T>`.
118 self.ctl.inc()?;
// Cloning a `Sender` clones its control handle as well (which bumps the shared
// sender count, see `Clone for SenderCtl`).
124 impl<T> Clone for Sender<T> {
125 fn clone(&self) -> Sender<T> {
128 ctl: self.ctl.clone(),
133 impl<T> SyncSender<T> {
134 /// Sends a value on this synchronous channel.
136 /// This function will *block* until space in the internal buffer becomes
137 /// available or a receiver is available to hand off the message to.
138 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
139 self.tx.send(t).map_err(From::from).and_then(|_| {
140 try!(self.ctl.inc());
145 /// Attempts to send a value on this channel without blocking.
147 /// This method differs from `send` by returning immediately if the channel's
148 /// buffer is full or no receiver is waiting to acquire some data.
149 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
150 self.tx.try_send(t).map_err(From::from).and_then(|_| {
151 try!(self.ctl.inc());
// Cloning a `SyncSender` clones its control handle as well (which bumps the
// shared sender count, see `Clone for SenderCtl`).
157 impl<T> Clone for SyncSender<T> {
158 fn clone(&self) -> SyncSender<T> {
161 ctl: self.ctl.clone(),
166 impl<T> Receiver<T> {
167 /// Attempts to return a pending value on this receiver without blocking.
///
/// When the wrapped `mpsc::Receiver` yields a value, the control handle's
/// `dec` is called to account for it. Errors from the wrapped receiver are
/// returned unchanged as `mpsc::TryRecvError`.
168 pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
169 self.rx.try_recv().and_then(|res| {
// An error from `dec` is deliberately discarded here.
170 let _ = self.ctl.dec();
// `Receiver` implements `Evented` by forwarding every call to its control
// handle (`self.ctl`).
176 impl<T> Evented for Receiver<T> {
// Tail of the `register` signature; its opening lines are absent from this excerpt.
183 ) -> io::Result<()> {
184 self.ctl.register(poll, token, interest, opts)
// Tail of the `reregister` signature; its opening lines are absent from this excerpt.
193 ) -> io::Result<()> {
194 self.ctl.reregister(poll, token, interest, opts)
// Forwards deregistration to the control handle.
197 fn deregister(&self, poll: &Poll) -> io::Result<()> {
198 self.ctl.deregister(poll)
204 * ===== SenderCtl / ReceiverCtl =====
209 /// Call to track that a message has been sent
210 fn inc(&self) -> io::Result<()> {
211 let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire);
214 // Toggle readiness to readable
215 if let Some(set_readiness) = self.inner.set_readiness.borrow() {
216 try!(set_readiness.set_readiness(Ready::readable()));
// Cloning a sender control handle registers one more sender in the shared
// state before handing out another `Arc` reference to it.
224 impl Clone for SenderCtl {
225 fn clone(&self) -> SenderCtl {
226 self.inner.senders.fetch_add(1, Ordering::Relaxed);
228 inner: Arc::clone(&self.inner),
// Dropping a sender control handle decrements the shared sender count.
233 impl Drop for SenderCtl {
// `fetch_sub` returns the previous value, so `== 1` means this was the last
// remaining sender handle.
// NOTE(review): the body of this branch is absent from this excerpt.
235 if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 {
242 fn dec(&self) -> io::Result<()> {
243 let first = self.inner.pending.load(Ordering::Acquire);
247 if let Some(set_readiness) = self.inner.set_readiness.borrow() {
248 try!(set_readiness.set_readiness(Ready::empty()));
253 let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel);
255 if first == 1 && second > 1 {
256 // There are still pending messages. Since readiness was
257 // previously unset, it must be reset here
258 if let Some(set_readiness) = self.inner.set_readiness.borrow() {
259 try!(set_readiness.set_readiness(Ready::readable()));
// `Evented` implementation that backs `Receiver`: it creates the `Registration`
// on first `register` and reuses it for `reregister` / `deregister`.
267 impl Evented for ReceiverCtl {
// Tail of the `register` signature; its opening lines are absent from this excerpt.
274 ) -> io::Result<()> {
// A populated `registration` cell means `register` was already called; reject.
275 if self.registration.borrow().is_some() {
276 return Err(io::Error::new(
277 io::ErrorKind::Other,
278 "receiver already registered",
// Create the registration / set-readiness pair and attach it to `poll`.
282 let (registration, set_readiness) = Registration::new2();
// Fixed: the argument had been corrupted to `®istration` (the `&reg` prefix
// was decoded as an HTML entity); it is a borrow of `registration`.
283 poll.register(&registration, token, interest, opts)?;
// Messages may have been sent before registration; report readable right away.
285 if self.inner.pending.load(Ordering::Relaxed) > 0 {
286 // TODO: Don't drop readiness
287 let _ = set_readiness.set_readiness(Ready::readable());
// NOTE(review): the expressions these two `expect` calls belong to are absent
// from this excerpt; presumably they store `registration` and `set_readiness`
// into their cells — confirm.
292 .expect("unexpected state encountered");
296 .expect("unexpected state encountered");
// Tail of the `reregister` signature; its opening lines are absent from this excerpt.
307 ) -> io::Result<()> {
// Re-registration requires a prior successful `register`.
308 match self.registration.borrow() {
309 Some(registration) => poll.reregister(registration, token, interest, opts),
310 None => Err(io::Error::new(
311 io::ErrorKind::Other,
312 "receiver not registered",
// Deregistration likewise requires a prior successful `register`.
317 fn deregister(&self, poll: &Poll) -> io::Result<()> {
318 match self.registration.borrow() {
319 Some(registration) => poll.deregister(registration),
320 None => Err(io::Error::new(
321 io::ErrorKind::Other,
322 "receiver not registered",
330 * ===== Error conversions =====
// Maps a std `mpsc::SendError` to `SendError::Disconnected`, carrying over the
// payload stored in the std error (`src.0`).
334 impl<T> From<mpsc::SendError<T>> for SendError<T> {
335 fn from(src: mpsc::SendError<T>) -> SendError<T> {
336 SendError::Disconnected(src.0)
// Converts an `io::Error` into a `SendError`; this is the conversion used when
// `inc` fails inside `send`.
// NOTE(review): the function body is absent from this excerpt; presumably it
// wraps `src` in `SendError::Io` — confirm.
340 impl<T> From<io::Error> for SendError<T> {
341 fn from(src: io::Error) -> SendError<T> {
// Maps each std `mpsc::TrySendError` variant to the same-named variant of this
// module's `TrySendError`, moving the payload across unchanged.
346 impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
347 fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> {
349 mpsc::TrySendError::Full(v) => TrySendError::Full(v),
350 mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
// Maps a std `mpsc::SendError` to `TrySendError::Disconnected`, carrying over
// the payload stored in the std error (`src.0`).
355 impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
356 fn from(src: mpsc::SendError<T>) -> TrySendError<T> {
357 TrySendError::Disconnected(src.0)
// Wraps an `io::Error` in `TrySendError::Io`; this is the conversion used when
// `inc` fails inside `try_send`.
361 impl<T> From<io::Error> for TrySendError<T> {
362 fn from(src: io::Error) -> TrySendError<T> {
363 TrySendError::Io(src)
369 * ===== Implement Error, Debug and Display for Errors =====
// `std::error::Error` implementation for `SendError`.
// NOTE(review): `Error::description` is deprecated in the standard library in
// favour of `Display`; consider dropping this override.
373 impl<T: Any> error::Error for SendError<T> {
// Delegates to the wrapped `io::Error` for `Io`, otherwise returns a fixed label.
374 fn description(&self) -> &str {
376 SendError::Io(ref io_err) => io_err.description(),
377 SendError::Disconnected(..) => "Disconnected",
// `std::error::Error` implementation for `TrySendError`.
// NOTE(review): `Error::description` is deprecated in the standard library in
// favour of `Display`; consider dropping this override.
382 impl<T: Any> error::Error for TrySendError<T> {
// Delegates to the wrapped `io::Error` for `Io`, otherwise returns a fixed label.
383 fn description(&self) -> &str {
385 TrySendError::Io(ref io_err) => io_err.description(),
386 TrySendError::Full(..) => "Full",
387 TrySendError::Disconnected(..) => "Disconnected",
// `Debug` output is produced by the shared `format_send_error` helper, so it is
// identical to the `Display` output and never requires `T: Debug`.
392 impl<T> fmt::Debug for SendError<T> {
393 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
394 format_send_error(self, f)
// `Display` output is produced by the shared `format_send_error` helper.
398 impl<T> fmt::Display for SendError<T> {
399 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
400 format_send_error(self, f)
// `Debug` output is produced by the shared `format_try_send_error` helper, so it
// is identical to the `Display` output and never requires `T: Debug`.
404 impl<T> fmt::Debug for TrySendError<T> {
405 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
406 format_try_send_error(self, f)
// `Display` output is produced by the shared `format_try_send_error` helper.
410 impl<T> fmt::Display for TrySendError<T> {
411 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
412 format_try_send_error(self, f)
// Shared formatter behind both `Debug` and `Display` for `SendError`.
// `Io` prints the wrapped error's `Display` text; `Disconnected` prints a fixed
// label and never touches the payload.
417 fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
419 SendError::Io(ref io_err) => write!(f, "{}", io_err),
420 SendError::Disconnected(..) => write!(f, "Disconnected"),
// Shared formatter behind both `Debug` and `Display` for `TrySendError`.
// `Io` prints the wrapped error's `Display` text; `Full` and `Disconnected`
// print fixed labels and never touch the payload.
425 fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
427 TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
428 TrySendError::Full(..) => write!(f, "Full"),
429 TrySendError::Disconnected(..) => write!(f, "Disconnected"),