Bug 1687945 [wpt PR 27273] - [resources] Fix conftest.py for pytest>6, a=testonly
[gecko.git] / third_party / rust / mio-extras / src / channel.rs
blob9ebc73bb7f63293993e97aa4dcd1afee31e99c08
1 //! Thread safe communication channel implementing `Evented`
2 use lazycell::{AtomicLazyCell, LazyCell};
3 use mio::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
4 use std::any::Any;
5 use std::error;
6 use std::sync::atomic::{AtomicUsize, Ordering};
7 use std::sync::{mpsc, Arc};
8 use std::{fmt, io};
10 /// Creates a new asynchronous channel, where the `Receiver` can be registered
11 /// with `Poll`.
12 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
13     let (tx_ctl, rx_ctl) = ctl_pair();
14     let (tx, rx) = mpsc::channel();
16     let tx = Sender { tx, ctl: tx_ctl };
18     let rx = Receiver { rx, ctl: rx_ctl };
20     (tx, rx)
23 /// Creates a new synchronous, bounded channel where the `Receiver` can be
24 /// registered with `Poll`.
25 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
26     let (tx_ctl, rx_ctl) = ctl_pair();
27     let (tx, rx) = mpsc::sync_channel(bound);
29     let tx = SyncSender { tx, ctl: tx_ctl };
31     let rx = Receiver { rx, ctl: rx_ctl };
33     (tx, rx)
36 fn ctl_pair() -> (SenderCtl, ReceiverCtl) {
37     let inner = Arc::new(Inner {
38         pending: AtomicUsize::new(0),
39         senders: AtomicUsize::new(1),
40         set_readiness: AtomicLazyCell::new(),
41     });
43     let tx = SenderCtl {
44         inner: Arc::clone(&inner),
45     };
47     let rx = ReceiverCtl {
48         registration: LazyCell::new(),
49         inner,
50     };
52     (tx, rx)
55 /// Tracks messages sent on a channel in order to update readiness.
56 struct SenderCtl {
57     inner: Arc<Inner>,
60 /// Tracks messages received on a channel in order to track readiness.
61 struct ReceiverCtl {
62     registration: LazyCell<Registration>,
63     inner: Arc<Inner>,
66 /// The sending half of a channel.
67 pub struct Sender<T> {
68     tx: mpsc::Sender<T>,
69     ctl: SenderCtl,
72 /// The sending half of a synchronous channel.
73 pub struct SyncSender<T> {
74     tx: mpsc::SyncSender<T>,
75     ctl: SenderCtl,
78 /// The receiving half of a channel.
79 pub struct Receiver<T> {
80     rx: mpsc::Receiver<T>,
81     ctl: ReceiverCtl,
84 /// An error returned from the `Sender::send` or `SyncSender::send` function.
85 pub enum SendError<T> {
86     /// An IO error.
87     Io(io::Error),
89     /// The receiving half of the channel has disconnected.
90     Disconnected(T),
93 /// An error returned from the `SyncSender::try_send` function.
94 pub enum TrySendError<T> {
95     /// An IO error.
96     Io(io::Error),
98     /// Data could not be sent because it would require the callee to block.
99     Full(T),
101     /// The receiving half of the channel has disconnected.
102     Disconnected(T),
105 struct Inner {
106     // The number of outstanding messages for the receiver to read
107     pending: AtomicUsize,
108     // The number of sender handles
109     senders: AtomicUsize,
110     // The set readiness handle
111     set_readiness: AtomicLazyCell<SetReadiness>,
114 impl<T> Sender<T> {
115     /// Attempts to send a value on this channel, returning it back if it could not be sent.
116     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
117         self.tx.send(t).map_err(SendError::from).and_then(|_| {
118             try!(self.ctl.inc());
119             Ok(())
120         })
121     }
124 impl<T> Clone for Sender<T> {
125     fn clone(&self) -> Sender<T> {
126         Sender {
127             tx: self.tx.clone(),
128             ctl: self.ctl.clone(),
129         }
130     }
133 impl<T> SyncSender<T> {
134     /// Sends a value on this synchronous channel.
135     ///
136     /// This function will *block* until space in the internal buffer becomes
137     /// available or a receiver is available to hand off the message to.
138     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
139         self.tx.send(t).map_err(From::from).and_then(|_| {
140             try!(self.ctl.inc());
141             Ok(())
142         })
143     }
145     /// Attempts to send a value on this channel without blocking.
146     ///
147     /// This method differs from `send` by returning immediately if the channel's
148     /// buffer is full or no receiver is waiting to acquire some data.
149     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
150         self.tx.try_send(t).map_err(From::from).and_then(|_| {
151             try!(self.ctl.inc());
152             Ok(())
153         })
154     }
157 impl<T> Clone for SyncSender<T> {
158     fn clone(&self) -> SyncSender<T> {
159         SyncSender {
160             tx: self.tx.clone(),
161             ctl: self.ctl.clone(),
162         }
163     }
166 impl<T> Receiver<T> {
167     /// Attempts to return a pending value on this receiver without blocking.
168     pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
169         self.rx.try_recv().and_then(|res| {
170             let _ = self.ctl.dec();
171             Ok(res)
172         })
173     }
176 impl<T> Evented for Receiver<T> {
177     fn register(
178         &self,
179         poll: &Poll,
180         token: Token,
181         interest: Ready,
182         opts: PollOpt,
183     ) -> io::Result<()> {
184         self.ctl.register(poll, token, interest, opts)
185     }
187     fn reregister(
188         &self,
189         poll: &Poll,
190         token: Token,
191         interest: Ready,
192         opts: PollOpt,
193     ) -> io::Result<()> {
194         self.ctl.reregister(poll, token, interest, opts)
195     }
197     fn deregister(&self, poll: &Poll) -> io::Result<()> {
198         self.ctl.deregister(poll)
199     }
204  * ===== SenderCtl / ReceiverCtl =====
206  */
208 impl SenderCtl {
209     /// Call to track that a message has been sent
210     fn inc(&self) -> io::Result<()> {
211         let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire);
213         if 0 == cnt {
214             // Toggle readiness to readable
215             if let Some(set_readiness) = self.inner.set_readiness.borrow() {
216                 try!(set_readiness.set_readiness(Ready::readable()));
217             }
218         }
220         Ok(())
221     }
224 impl Clone for SenderCtl {
225     fn clone(&self) -> SenderCtl {
226         self.inner.senders.fetch_add(1, Ordering::Relaxed);
227         SenderCtl {
228             inner: Arc::clone(&self.inner),
229         }
230     }
233 impl Drop for SenderCtl {
234     fn drop(&mut self) {
235         if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 {
236             let _ = self.inc();
237         }
238     }
241 impl ReceiverCtl {
242     fn dec(&self) -> io::Result<()> {
243         let first = self.inner.pending.load(Ordering::Acquire);
245         if first == 1 {
246             // Unset readiness
247             if let Some(set_readiness) = self.inner.set_readiness.borrow() {
248                 try!(set_readiness.set_readiness(Ready::empty()));
249             }
250         }
252         // Decrement
253         let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel);
255         if first == 1 && second > 1 {
256             // There are still pending messages. Since readiness was
257             // previously unset, it must be reset here
258             if let Some(set_readiness) = self.inner.set_readiness.borrow() {
259                 try!(set_readiness.set_readiness(Ready::readable()));
260             }
261         }
263         Ok(())
264     }
267 impl Evented for ReceiverCtl {
268     fn register(
269         &self,
270         poll: &Poll,
271         token: Token,
272         interest: Ready,
273         opts: PollOpt,
274     ) -> io::Result<()> {
275         if self.registration.borrow().is_some() {
276             return Err(io::Error::new(
277                 io::ErrorKind::Other,
278                 "receiver already registered",
279             ));
280         }
282         let (registration, set_readiness) = Registration::new2();
283         poll.register(&registration, token, interest, opts)?;
285         if self.inner.pending.load(Ordering::Relaxed) > 0 {
286             // TODO: Don't drop readiness
287             let _ = set_readiness.set_readiness(Ready::readable());
288         }
290         self.registration
291             .fill(registration)
292             .expect("unexpected state encountered");
293         self.inner
294             .set_readiness
295             .fill(set_readiness)
296             .expect("unexpected state encountered");
298         Ok(())
299     }
301     fn reregister(
302         &self,
303         poll: &Poll,
304         token: Token,
305         interest: Ready,
306         opts: PollOpt,
307     ) -> io::Result<()> {
308         match self.registration.borrow() {
309             Some(registration) => poll.reregister(registration, token, interest, opts),
310             None => Err(io::Error::new(
311                 io::ErrorKind::Other,
312                 "receiver not registered",
313             )),
314         }
315     }
317     fn deregister(&self, poll: &Poll) -> io::Result<()> {
318         match self.registration.borrow() {
319             Some(registration) => poll.deregister(registration),
320             None => Err(io::Error::new(
321                 io::ErrorKind::Other,
322                 "receiver not registered",
323             )),
324         }
325     }
330  * ===== Error conversions =====
332  */
334 impl<T> From<mpsc::SendError<T>> for SendError<T> {
335     fn from(src: mpsc::SendError<T>) -> SendError<T> {
336         SendError::Disconnected(src.0)
337     }
340 impl<T> From<io::Error> for SendError<T> {
341     fn from(src: io::Error) -> SendError<T> {
342         SendError::Io(src)
343     }
346 impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
347     fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> {
348         match src {
349             mpsc::TrySendError::Full(v) => TrySendError::Full(v),
350             mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
351         }
352     }
355 impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
356     fn from(src: mpsc::SendError<T>) -> TrySendError<T> {
357         TrySendError::Disconnected(src.0)
358     }
361 impl<T> From<io::Error> for TrySendError<T> {
362     fn from(src: io::Error) -> TrySendError<T> {
363         TrySendError::Io(src)
364     }
369  * ===== Implement Error, Debug and Display for Errors =====
371  */
373 impl<T: Any> error::Error for SendError<T> {
374     fn description(&self) -> &str {
375         match *self {
376             SendError::Io(ref io_err) => io_err.description(),
377             SendError::Disconnected(..) => "Disconnected",
378         }
379     }
382 impl<T: Any> error::Error for TrySendError<T> {
383     fn description(&self) -> &str {
384         match *self {
385             TrySendError::Io(ref io_err) => io_err.description(),
386             TrySendError::Full(..) => "Full",
387             TrySendError::Disconnected(..) => "Disconnected",
388         }
389     }
392 impl<T> fmt::Debug for SendError<T> {
393     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
394         format_send_error(self, f)
395     }
398 impl<T> fmt::Display for SendError<T> {
399     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
400         format_send_error(self, f)
401     }
404 impl<T> fmt::Debug for TrySendError<T> {
405     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
406         format_try_send_error(self, f)
407     }
410 impl<T> fmt::Display for TrySendError<T> {
411     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
412         format_try_send_error(self, f)
413     }
416 #[inline]
417 fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
418     match *e {
419         SendError::Io(ref io_err) => write!(f, "{}", io_err),
420         SendError::Disconnected(..) => write!(f, "Disconnected"),
421     }
424 #[inline]
425 fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
426     match *e {
427         TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
428         TrySendError::Full(..) => write!(f, "Full"),
429         TrySendError::Disconnected(..) => write!(f, "Disconnected"),
430     }