Backed out 5 changesets (bug 1890092, bug 1888683) for causing build bustages & crash...
[gecko.git] / third_party / rust / oneshot-uniffi / src / lib.rs
blob94bb35d12ac02b904e972d156d881e092b693295
1 //! Oneshot spsc (single producer, single consumer) channel. Meaning each channel instance
2 //! can only transport a single message. This has a few nice outcomes. One thing is that
3 //! the implementation can be very efficient, utilizing the knowledge that there will
4 //! only be one message. But more importantly, it allows the API to be expressed in such
5 //! a way that certain edge cases that you don't want to care about when only sending a
6 //! single message on a channel does not exist. For example: The sender can't be copied
7 //! or cloned, and the send method takes ownership and consumes the sender.
8 //! So you are guaranteed, at the type level, that there can only be one message sent.
9 //!
10 //! The sender's send method is non-blocking, and potentially lock- and wait-free.
11 //! See documentation on [Sender::send] for situations where it might not be fully wait-free.
12 //! The receiver supports both lock- and wait-free `try_recv` as well as indefinite and time
13 //! limited thread blocking receive operations. The receiver also implements `Future` and
14 //! supports asynchronously awaiting the message.
15 //!
16 //!
17 //! # Examples
18 //!
19 //! This example sets up a background worker that processes requests coming in on a standard
20 //! mpsc channel and replies on a oneshot channel provided with each request. The worker can
21 //! be interacted with both from sync and async contexts since the oneshot receiver
22 //! can receive both blocking and async.
23 //!
24 //! ```rust
25 //! use std::sync::mpsc;
26 //! use std::thread;
27 //! use std::time::Duration;
28 //!
29 //! type Request = String;
30 //!
31 //! // Starts a background thread performing some computation on requests sent to it.
32 //! // Delivers the response back over a oneshot channel.
33 //! fn spawn_processing_thread() -> mpsc::Sender<(Request, oneshot::Sender<usize>)> {
34 //!     let (request_sender, request_receiver) = mpsc::channel::<(Request, oneshot::Sender<usize>)>();
35 //!     thread::spawn(move || {
36 //!         for (request_data, response_sender) in request_receiver.iter() {
37 //!             let compute_operation = || request_data.len();
38 //!             let _ = response_sender.send(compute_operation()); // <- Send on the oneshot channel
39 //!         }
40 //!     });
41 //!     request_sender
42 //! }
43 //!
44 //! let processor = spawn_processing_thread();
45 //!
46 //! // If compiled with `std` the library can receive messages with timeout on regular threads
47 //! #[cfg(feature = "std")] {
48 //!     let (response_sender, response_receiver) = oneshot::channel();
49 //!     let request = Request::from("data from sync thread");
50 //!
51 //!     processor.send((request, response_sender)).expect("Processor down");
52 //!     match response_receiver.recv_timeout(Duration::from_secs(1)) { // <- Receive on the oneshot channel
53 //!         Ok(result) => println!("Processor returned {}", result),
54 //!         Err(oneshot::RecvTimeoutError::Timeout) => eprintln!("Processor was too slow"),
55 //!         Err(oneshot::RecvTimeoutError::Disconnected) => panic!("Processor exited"),
56 //!     }
57 //! }
58 //!
59 //! // If compiled with the `async` feature, the `Receiver` can be awaited in an async context
60 //! #[cfg(feature = "async")] {
61 //!     tokio::runtime::Runtime::new()
62 //!         .unwrap()
63 //!         .block_on(async move {
64 //!             let (response_sender, response_receiver) = oneshot::channel();
65 //!             let request = Request::from("data from sync thread");
66 //!
67 //!             processor.send((request, response_sender)).expect("Processor down");
68 //!             match response_receiver.await { // <- Receive on the oneshot channel asynchronously
69 //!                 Ok(result) => println!("Processor returned {}", result),
70 //!                 Err(_e) => panic!("Processor exited"),
71 //!             }
72 //!         });
73 //! }
74 //! ```
75 //!
76 //! # Sync vs async
77 //!
78 //! The main motivation for writing this library was that there were no (known to me) channel
79 //! implementations allowing you to seamlessly send messages between a normal thread and an async
80 //! task, or the other way around. If message passing is the way you are communicating, of course
81 //! that should work smoothly between the sync and async parts of the program!
82 //!
83 //! This library achieves that by having a fast and cheap send operation that can
84 //! be used in both sync threads and async tasks. The receiver has both thread blocking
85 //! receive methods for synchronous usage, and implements `Future` for asynchronous usage.
86 //!
87 //! The receiving endpoint of this channel implements Rust's `Future` trait and can be waited on
88 //! in an asynchronous task. This implementation is completely executor/runtime agnostic. It should
89 //! be possible to use this library with any executor.
90 //!
92 // # Implementation description
94 // When a channel is created via the channel function, it creates a single heap allocation
95 // containing:
96 // * A one byte atomic integer that represents the current channel state,
97 // * Uninitialized memory to fit the message,
98 // * Uninitialized memory to fit the waker that can wake the receiving task or thread up.
100 // The size of the waker depends on which features are activated, it ranges from 0 to 24 bytes[1].
101 // So with all features enabled (the default) each channel allocates 25 bytes plus the size of the
102 // message, plus any padding needed to get correct memory alignment.
104 // The Sender and Receiver only holds a raw pointer to the heap channel object. The last endpoint
105 // to be consumed or dropped is responsible for freeing the heap memory. The first endpoint to
106 // be consumed or dropped signal via the state that it is gone. And the second one see this and
107 // frees the memory.
109 // ## Footnotes
111 // [1]: Mind that the waker only takes zero bytes when all features are disabled, making it
112 //      impossible to *wait* for the message. `try_recv` the only available method in this scenario.
114 #![deny(rust_2018_idioms)]
115 #![cfg_attr(not(feature = "std"), no_std)]
117 #[cfg(not(loom))]
118 extern crate alloc;
120 use core::{
121     marker::PhantomData,
122     mem::{self, MaybeUninit},
123     ptr::{self, NonNull},
126 #[cfg(not(loom))]
127 use core::{
128     cell::UnsafeCell,
129     sync::atomic::{fence, AtomicU8, Ordering::*},
131 #[cfg(loom)]
132 use loom::{
133     cell::UnsafeCell,
134     sync::atomic::{fence, AtomicU8, Ordering::*},
137 #[cfg(all(feature = "async", not(loom)))]
138 use core::hint;
139 #[cfg(all(feature = "async", loom))]
140 use loom::hint;
142 #[cfg(feature = "async")]
143 use core::{
144     pin::Pin,
145     task::{self, Poll},
147 #[cfg(feature = "std")]
148 use std::time::{Duration, Instant};
150 #[cfg(feature = "std")]
151 mod thread {
152     #[cfg(not(loom))]
153     pub use std::thread::{current, park, park_timeout, yield_now, Thread};
155     #[cfg(loom)]
156     pub use loom::thread::{current, park, yield_now, Thread};
158     // loom does not support parking with a timeout. So we just
159     // yield. This means that the "park" will "spuriously" wake up
160     // way too early. But the code should properly handle this.
161     // One thing to note is that very short timeouts are needed
162     // when using loom, since otherwise the looping will cause
163     // an overflow in loom.
164     #[cfg(loom)]
165     pub fn park_timeout(_timeout: std::time::Duration) {
166         loom::thread::yield_now()
167     }
170 #[cfg(loom)]
171 mod loombox;
172 #[cfg(not(loom))]
173 use alloc::boxed::Box;
174 #[cfg(loom)]
175 use loombox::Box;
177 mod errors;
178 pub use errors::{RecvError, RecvTimeoutError, SendError, TryRecvError};
180 /// Creates a new oneshot channel and returns the two endpoints, [`Sender`] and [`Receiver`].
181 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
182     // Allocate the channel on the heap and get the pointer.
183     // The last endpoint of the channel to be alive is responsible for freeing the channel
184     // and dropping any object that might have been written to it.
186     let channel_ptr = Box::into_raw(Box::new(Channel::new()));
188     // SAFETY: `channel_ptr` came from a Box and thus is not null
189     let channel_ptr = unsafe { NonNull::new_unchecked(channel_ptr) };
191     (
192         Sender {
193             channel_ptr,
194             _invariant: PhantomData,
195         },
196         Receiver { channel_ptr },
197     )
200 #[derive(Debug)]
201 pub struct Sender<T> {
202     channel_ptr: NonNull<Channel<T>>,
203     // In reality we want contravariance, however we can't obtain that.
204     //
205     // Consider the following scenario:
206     // ```
207     // let (mut tx, rx) = channel::<&'short u8>();
208     // let (tx2, rx2) = channel::<&'long u8>();
209     //
210     // tx = tx2;
211     //
212     // // Pretend short_ref is some &'short u8
213     // tx.send(short_ref).unwrap();
214     // let long_ref = rx2.recv().unwrap();
215     // ```
216     //
217     // If this type were covariant then we could safely extend lifetimes, which is not okay.
218     // Hence, we enforce invariance.
219     _invariant: PhantomData<fn(T) -> T>,
222 #[derive(Debug)]
223 pub struct Receiver<T> {
224     // Covariance is the right choice here. Consider the example presented in Sender, and you'll
225     // see that if we replaced `rx` instead then we would get the expected behavior
226     channel_ptr: NonNull<Channel<T>>,
229 unsafe impl<T: Send> Send for Sender<T> {}
230 unsafe impl<T: Send> Send for Receiver<T> {}
231 impl<T> Unpin for Receiver<T> {}
233 impl<T> Sender<T> {
234     /// Sends `message` over the channel to the corresponding [`Receiver`].
235     ///
236     /// Returns an error if the receiver has already been dropped. The message can
237     /// be extracted from the error.
238     ///
239     /// This method is lock-free and wait-free when sending on a channel that the
240     /// receiver is currently not receiving on. If the receiver is receiving during the send
241     /// operation this method includes waking up the thread/task. Unparking a thread involves
242     /// a mutex in Rust's standard library at the time of writing this.
243     /// How lock-free waking up an async task is
244     /// depends on your executor. If this method returns a `SendError`, please mind that dropping
245     /// the error involves running any drop implementation on the message type, and freeing the
246     /// channel's heap allocation, which might or might not be lock-free.
247     pub fn send(self, message: T) -> Result<(), SendError<T>> {
248         let channel_ptr = self.channel_ptr;
250         // Don't run our Drop implementation if send was called, any cleanup now happens here
251         mem::forget(self);
253         // SAFETY: The channel exists on the heap for the entire duration of this method and we
254         // only ever acquire shared references to it. Note that if the receiver disconnects it
255         // does not free the channel.
256         let channel = unsafe { channel_ptr.as_ref() };
258         // Write the message into the channel on the heap.
259         // SAFETY: The receiver only ever accesses this memory location if we are in the MESSAGE
260         // state, and since we're responsible for setting that state, we can guarantee that we have
261         // exclusive access to this memory location to perform this write.
262         unsafe { channel.write_message(message) };
264         // Set the state to signal there is a message on the channel.
265         // ORDERING: we use release ordering to ensure the write of the message is visible to the
266         // receiving thread. The EMPTY and DISCONNECTED branches do not observe any shared state,
267         // and thus we do not need acquire orderng. The RECEIVING branch manages synchronization
268         // independent of this operation.
269         //
270         // EMPTY + 1 = MESSAGE
271         // RECEIVING + 1 = UNPARKING
272         // DISCONNECTED + 1 = invalid, however this state is never observed
273         match channel.state.fetch_add(1, Release) {
274             // The receiver is alive and has not started waiting. Send done.
275             EMPTY => Ok(()),
276             // The receiver is waiting. Wake it up so it can return the message.
277             RECEIVING => {
278                 // ORDERING: Synchronizes with the write of the waker to memory, and prevents the
279                 // taking of the waker from being ordered before this operation.
280                 fence(Acquire);
282                 // Take the waker, but critically do not unpark it. If we unparked now, then the
283                 // receiving thread could still observe the UNPARKING state and re-park, meaning
284                 // that after we change to the MESSAGE state, it would remain parked indefinitely
285                 // or until a spurious wakeup.
286                 // SAFETY: at this point we are in the UNPARKING state, and the receiving thread
287                 // does not access the waker while in this state, nor does it free the channel
288                 // allocation in this state.
289                 let waker = unsafe { channel.take_waker() };
291                 // ORDERING: this ordering serves two-fold: it synchronizes with the acquire load
292                 // in the receiving thread, ensuring that both our read of the waker and write of
293                 // the message happen-before the taking of the message and freeing of the channel.
294                 // Furthermore, we need acquire ordering to ensure the unparking of the receiver
295                 // happens after the channel state is updated.
296                 channel.state.swap(MESSAGE, AcqRel);
298                 // Note: it is possible that between the store above and this statement that
299                 // the receiving thread is spuriously unparked, takes the message, and frees
300                 // the channel allocation. However, we took ownership of the channel out of
301                 // that allocation, and freeing the channel does not drop the waker since the
302                 // waker is wrapped in MaybeUninit. Therefore this data is valid regardless of
303                 // whether or not the receive has completed by this point.
304                 waker.unpark();
306                 Ok(())
307             }
308             // The receiver was already dropped. The error is responsible for freeing the channel.
309             // SAFETY: since the receiver disconnected it will no longer access `channel_ptr`, so
310             // we can transfer exclusive ownership of the channel's resources to the error.
311             // Moreover, since we just placed the message in the channel, the channel contains a
312             // valid message.
313             DISCONNECTED => Err(unsafe { SendError::new(channel_ptr) }),
314             _ => unreachable!(),
315         }
316     }
319 impl<T> Drop for Sender<T> {
320     fn drop(&mut self) {
321         // SAFETY: The receiver only ever frees the channel if we are in the MESSAGE or
322         // DISCONNECTED states. If we are in the MESSAGE state, then we called
323         // mem::forget(self), so we should not be in this function call. If we are in the
324         // DISCONNECTED state, then the receiver either received a MESSAGE so this statement is
325         // unreachable, or was dropped and observed that our side was still alive, and thus didn't
326         // free the channel.
327         let channel = unsafe { self.channel_ptr.as_ref() };
329         // Set the channel state to disconnected and read what state the receiver was in
330         // ORDERING: we don't need release ordering here since there are no modifications we
331         // need to make visible to other thread, and the Err(RECEIVING) branch handles
332         // synchronization independent of this cmpxchg
333         //
334         // EMPTY ^ 001 = DISCONNECTED
335         // RECEIVING ^ 001 = UNPARKING
336         // DISCONNECTED ^ 001 = EMPTY (invalid), but this state is never observed
337         match channel.state.fetch_xor(0b001, Relaxed) {
338             // The receiver has not started waiting, nor is it dropped.
339             EMPTY => (),
340             // The receiver is waiting. Wake it up so it can detect that the channel disconnected.
341             RECEIVING => {
342                 // See comments in Sender::send
344                 fence(Acquire);
346                 let waker = unsafe { channel.take_waker() };
348                 // We still need release ordering here to make sure our read of the waker happens
349                 // before this, and acquire ordering to ensure the unparking of the receiver
350                 // happens after this.
351                 channel.state.swap(DISCONNECTED, AcqRel);
353                 // The Acquire ordering above ensures that the write of the DISCONNECTED state
354                 // happens-before unparking the receiver.
355                 waker.unpark();
356             }
357             // The receiver was already dropped. We are responsible for freeing the channel.
358             DISCONNECTED => {
359                 // SAFETY: when the receiver switches the state to DISCONNECTED they have received
360                 // the message or will no longer be trying to receive the message, and have
361                 // observed that the sender is still alive, meaning that we're responsible for
362                 // freeing the channel allocation.
363                 unsafe { dealloc(self.channel_ptr) };
364             }
365             _ => unreachable!(),
366         }
367     }
370 impl<T> Receiver<T> {
371     /// Checks if there is a message in the channel without blocking. Returns:
372     ///  * `Ok(message)` if there was a message in the channel.
373     ///  * `Err(Empty)` if the [`Sender`] is alive, but has not yet sent a message.
374     ///  * `Err(Disconnected)` if the [`Sender`] was dropped before sending anything or if the
375     ///    message has already been extracted by a previous receive call.
376     ///
377     /// If a message is returned, the channel is disconnected and any subsequent receive operation
378     /// using this receiver will return an error.
379     ///
380     /// This method is completely lock-free and wait-free. The only thing it does is an atomic
381     /// integer load of the channel state. And if there is a message in the channel it additionally
382     /// performs one atomic integer store and copies the message from the heap to the stack for
383     /// returning it.
384     pub fn try_recv(&self) -> Result<T, TryRecvError> {
385         // SAFETY: The channel will not be freed while this method is still running.
386         let channel = unsafe { self.channel_ptr.as_ref() };
388         // ORDERING: we use acquire ordering to synchronize with the store of the message.
389         match channel.state.load(Acquire) {
390             MESSAGE => {
391                 // It's okay to break up the load and store since once we're in the message state
392                 // the sender no longer modifies the state
393                 // ORDERING: at this point the sender has done its job and is no longer active, so
394                 // we don't need to make any side effects visible to it
395                 channel.state.store(DISCONNECTED, Relaxed);
397                 // SAFETY: we are in the MESSAGE state so the message is present
398                 Ok(unsafe { channel.take_message() })
399             }
400             EMPTY => Err(TryRecvError::Empty),
401             DISCONNECTED => Err(TryRecvError::Disconnected),
402             #[cfg(feature = "async")]
403             RECEIVING | UNPARKING => Err(TryRecvError::Empty),
404             _ => unreachable!(),
405         }
406     }
408     /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is
409     /// disconnected.
410     ///
411     /// This method will always block the current thread if there is no data available and it is
412     /// still possible for the message to be sent. Once the message is sent to the corresponding
413     /// [`Sender`], then this receiver will wake up and return that message.
414     ///
415     /// If the corresponding [`Sender`] has disconnected (been dropped), or it disconnects while
416     /// this call is blocking, this call will wake up and return `Err` to indicate that the message
417     /// can never be received on this channel.
418     ///
419     /// If a sent message has already been extracted from this channel this method will return an
420     /// error.
421     ///
422     /// # Panics
423     ///
424     /// Panics if called after this receiver has been polled asynchronously.
425     #[cfg(feature = "std")]
426     pub fn recv(self) -> Result<T, RecvError> {
427         // Note that we don't need to worry about changing the state to disconnected or setting the
428         // state to an invalid value at any point in this function because we take ownership of
429         // self, and this function does not exit until the message has been received or both side
430         // of the channel are inactive and cleaned up.
432         let channel_ptr = self.channel_ptr;
434         // Don't run our Drop implementation if we are receiving consuming ourselves.
435         mem::forget(self);
437         // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
438         // is still alive, meaning that even if the sender was dropped then it would have observed
439         // the fact that we're still alive and left the responsibility of deallocating the
440         // channel to us, so channel_ptr is valid
441         let channel = unsafe { channel_ptr.as_ref() };
443         // ORDERING: we use acquire ordering to synchronize with the write of the message in the
444         // case that it's available
445         match channel.state.load(Acquire) {
446             // The sender is alive but has not sent anything yet. We prepare to park.
447             EMPTY => {
448                 // Conditionally add a delay here to help the tests trigger the edge cases where
449                 // the sender manages to be dropped or send something before we are able to store
450                 // our waker object in the channel.
451                 #[cfg(oneshot_test_delay)]
452                 std::thread::sleep(std::time::Duration::from_millis(10));
454                 // Write our waker instance to the channel.
455                 // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
456                 // try to access the waker until it sees the state set to RECEIVING below
457                 unsafe { channel.write_waker(ReceiverWaker::current_thread()) };
459                 // Switch the state to RECEIVING. We need to do this in one atomic step in case the
460                 // sender disconnected or sent the message while we wrote the waker to memory. We
461                 // don't need to do a compare exchange here however because if the original state
462                 // was not EMPTY, then the sender has either finished sending the message or is
463                 // being dropped, so the RECEIVING state will never be observed after we return.
464                 // ORDERING: we use release ordering so the sender can synchronize with our writing
465                 // of the waker to memory. The individual branches handle any additional
466                 // synchronizaton
467                 match channel.state.swap(RECEIVING, Release) {
468                     // We stored our waker, now we park until the sender has changed the state
469                     EMPTY => loop {
470                         thread::park();
472                         // ORDERING: synchronize with the write of the message
473                         match channel.state.load(Acquire) {
474                             // The sender sent the message while we were parked.
475                             MESSAGE => {
476                                 // SAFETY: we are in the message state so the message is valid
477                                 let message = unsafe { channel.take_message() };
479                                 // SAFETY: the Sender delegates the responsibility of deallocating
480                                 // the channel to us upon sending the message
481                                 unsafe { dealloc(channel_ptr) };
483                                 break Ok(message);
484                             }
485                             // The sender was dropped while we were parked.
486                             DISCONNECTED => {
487                                 // SAFETY: the Sender doesn't deallocate the channel allocation in
488                                 // its drop implementation if we're receiving
489                                 unsafe { dealloc(channel_ptr) };
491                                 break Err(RecvError);
492                             }
493                             // State did not change, spurious wakeup, park again.
494                             RECEIVING | UNPARKING => (),
495                             _ => unreachable!(),
496                         }
497                     },
498                     // The sender sent the message while we prepared to park.
499                     MESSAGE => {
500                         // ORDERING: Synchronize with the write of the message. This branch is
501                         // unlikely to be taken, so it's likely more efficient to use a fence here
502                         // instead of AcqRel ordering on the RMW operation
503                         fence(Acquire);
505                         // SAFETY: we started in the empty state and the sender switched us to the
506                         // message state. This means that it did not take the waker, so we're
507                         // responsible for dropping it.
508                         unsafe { channel.drop_waker() };
510                         // SAFETY: we are in the message state so the message is valid
511                         let message = unsafe { channel.take_message() };
513                         // SAFETY: the Sender delegates the responsibility of deallocating the
514                         // channel to us upon sending the message
515                         unsafe { dealloc(channel_ptr) };
517                         Ok(message)
518                     }
519                     // The sender was dropped before sending anything while we prepared to park.
520                     DISCONNECTED => {
521                         // SAFETY: we started in the empty state and the sender switched us to the
522                         // disconnected state. It does not take the waker when it does this so we
523                         // need to drop it.
524                         unsafe { channel.drop_waker() };
526                         // SAFETY: the sender does not deallocate the channel if it switches from
527                         // empty to disconnected so we need to free the allocation
528                         unsafe { dealloc(channel_ptr) };
530                         Err(RecvError)
531                     }
532                     _ => unreachable!(),
533                 }
534             }
535             // The sender already sent the message.
536             MESSAGE => {
537                 // SAFETY: we are in the message state so the message is valid
538                 let message = unsafe { channel.take_message() };
540                 // SAFETY: we are already in the message state so the sender has been forgotten
541                 // and it's our job to clean up resources
542                 unsafe { dealloc(channel_ptr) };
544                 Ok(message)
545             }
546             // The sender was dropped before sending anything, or we already received the message.
547             DISCONNECTED => {
548                 // SAFETY: the sender does not deallocate the channel if it switches from empty to
549                 // disconnected so we need to free the allocation
550                 unsafe { dealloc(channel_ptr) };
552                 Err(RecvError)
553             }
554             // The receiver must have been `Future::poll`ed prior to this call.
555             #[cfg(feature = "async")]
556             RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR),
557             _ => unreachable!(),
558         }
559     }
561     /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is
562     /// disconnected. This is a non consuming version of [`Receiver::recv`], but with a bit
563     /// worse performance. Prefer `[`Receiver::recv`]` if your code allows consuming the receiver.
564     ///
565     /// If a message is returned, the channel is disconnected and any subsequent receive operation
566     /// using this receiver will return an error.
567     ///
568     /// # Panics
569     ///
570     /// Panics if called after this receiver has been polled asynchronously.
571     #[cfg(feature = "std")]
572     pub fn recv_ref(&self) -> Result<T, RecvError> {
573         self.start_recv_ref(RecvError, |channel| {
574             loop {
575                 thread::park();
577                 // ORDERING: we use acquire ordering to synchronize with the write of the message
578                 match channel.state.load(Acquire) {
579                     // The sender sent the message while we were parked.
580                     // We take the message and mark the channel disconnected.
581                     MESSAGE => {
582                         // ORDERING: the sender is inactive at this point so we don't need to make
583                         // any reads or writes visible to the sending thread
584                         channel.state.store(DISCONNECTED, Relaxed);
586                         // SAFETY: we were just in the message state so the message is valid
587                         break Ok(unsafe { channel.take_message() });
588                     }
589                     // The sender was dropped while we were parked.
590                     DISCONNECTED => break Err(RecvError),
591                     // State did not change, spurious wakeup, park again.
592                     RECEIVING | UNPARKING => (),
593                     _ => unreachable!(),
594                 }
595             }
596         })
597     }
599     /// Like [`Receiver::recv`], but will not block longer than `timeout`. Returns:
600     ///  * `Ok(message)` if there was a message in the channel before the timeout was reached.
601     ///  * `Err(Timeout)` if no message arrived on the channel before the timeout was reached.
602     ///  * `Err(Disconnected)` if the sender was dropped before sending anything or if the message
603     ///    has already been extracted by a previous receive call.
604     ///
605     /// If a message is returned, the channel is disconnected and any subsequent receive operation
606     /// using this receiver will return an error.
607     ///
608     /// If the supplied `timeout` is so large that Rust's `Instant` type can't represent this point
609     /// in the future this falls back to an indefinitely blocking receive operation.
610     ///
611     /// # Panics
612     ///
613     /// Panics if called after this receiver has been polled asynchronously.
614     #[cfg(feature = "std")]
615     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
616         match Instant::now().checked_add(timeout) {
617             Some(deadline) => self.recv_deadline(deadline),
618             None => self.recv_ref().map_err(|_| RecvTimeoutError::Disconnected),
619         }
620     }
622     /// Like [`Receiver::recv`], but will not block longer than until `deadline`. Returns:
623     ///  * `Ok(message)` if there was a message in the channel before the deadline was reached.
624     ///  * `Err(Timeout)` if no message arrived on the channel before the deadline was reached.
625     ///  * `Err(Disconnected)` if the sender was dropped before sending anything or if the message
626     ///    has already been extracted by a previous receive call.
627     ///
628     /// If a message is returned, the channel is disconnected and any subsequent receive operation
629     /// using this receiver will return an error.
630     ///
631     /// # Panics
632     ///
633     /// Panics if called after this receiver has been polled asynchronously.
634     #[cfg(feature = "std")]
635     pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
636         /// # Safety
637         ///
638         /// If the sender is unparking us after a message send, the message must already have been
639         /// written to the channel and an acquire memory barrier issued before calling this function
640         #[cold]
641         unsafe fn wait_for_unpark<T>(channel: &Channel<T>) -> Result<T, RecvTimeoutError> {
642             loop {
643                 thread::park();
645                 // ORDERING: The callee has already synchronized with any message write
646                 match channel.state.load(Relaxed) {
647                     MESSAGE => {
648                         // ORDERING: the sender has been dropped, so this update only
649                         // needs to be visible to us
650                         channel.state.store(DISCONNECTED, Relaxed);
651                         break Ok(channel.take_message());
652                     }
653                     DISCONNECTED => break Err(RecvTimeoutError::Disconnected),
654                     // The sender is still unparking us. We continue on the empty state here since
655                     // the current implementation eagerly sets the state to EMPTY upon timeout.
656                     EMPTY => (),
657                     _ => unreachable!(),
658                 }
659             }
660         }
662         self.start_recv_ref(RecvTimeoutError::Disconnected, |channel| {
663             loop {
664                 match deadline.checked_duration_since(Instant::now()) {
665                     Some(timeout) => {
666                         thread::park_timeout(timeout);
668                         // ORDERING: synchronize with the write of the message
669                         match channel.state.load(Acquire) {
670                             // The sender sent the message while we were parked.
671                             MESSAGE => {
672                                 // ORDERING: the sender has been `mem::forget`-ed so this update
673                                 // only needs to be visible to us.
674                                 channel.state.store(DISCONNECTED, Relaxed);
676                                 // SAFETY: we either are in the message state or were just in the
677                                 // message state
678                                 break Ok(unsafe { channel.take_message() });
679                             }
680                             // The sender was dropped while we were parked.
681                             DISCONNECTED => break Err(RecvTimeoutError::Disconnected),
682                             // State did not change, spurious wakeup, park again.
683                             RECEIVING | UNPARKING => (),
684                             _ => unreachable!(),
685                         }
686                     }
687                     None => {
688                         // ORDERING: synchronize with the write of the message
689                         match channel.state.swap(EMPTY, Acquire) {
690                             // We reached the end of the timeout without receiving a message
691                             RECEIVING => {
692                                 // SAFETY: we were in the receiving state and are now in the empty
693                                 // state, so the sender has not and will not try to read the waker,
694                                 // so we have exclusive access to drop it.
695                                 unsafe { channel.drop_waker() };
697                                 break Err(RecvTimeoutError::Timeout);
698                             }
699                             // The sender sent the message while we were parked.
700                             MESSAGE => {
701                                 // Same safety and ordering as the Some branch
703                                 channel.state.store(DISCONNECTED, Relaxed);
704                                 break Ok(unsafe { channel.take_message() });
705                             }
706                             // The sender was dropped while we were parked.
707                             DISCONNECTED => {
708                                 // ORDERING: we were originally in the disconnected state meaning
709                                 // that the sender is inactive and no longer observing the state,
710                                 // so we only need to change it back to DISCONNECTED for if the
711                                 // receiver is dropped or a recv* method is called again
712                                 channel.state.store(DISCONNECTED, Relaxed);
714                                 break Err(RecvTimeoutError::Disconnected);
715                             }
716                             // The sender sent the message and started unparking us
717                             UNPARKING => {
718                                 // We were in the UNPARKING state and are now in the EMPTY state.
719                                 // We wait to be properly unparked and to observe if the sender
720                                 // sets MESSAGE or DISCONNECTED state.
721                                 // SAFETY: The load above has synchronized with any message write.
722                                 break unsafe { wait_for_unpark(channel) };
723                             }
724                             _ => unreachable!(),
725                         }
726                     }
727                 }
728             }
729         })
730     }
732     /// Begins the process of receiving on the channel by reference. If the message is already
733     /// ready, or the sender has disconnected, then this function will return the appropriate
734     /// Result immediately. Otherwise, it will write the waker to memory, check to see if the
735     /// sender has finished or disconnected again, and then will call `finish`. `finish` is
736     /// thus responsible for cleaning up the channel's resources appropriately before it returns,
737     /// such as destroying the waker, for instance.
738     #[cfg(feature = "std")]
739     #[inline]
740     fn start_recv_ref<E>(
741         &self,
742         disconnected_error: E,
743         finish: impl FnOnce(&Channel<T>) -> Result<T, E>,
744     ) -> Result<T, E> {
745         // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
746         // is still alive, meaning that even if the sender was dropped then it would have observed
747         // the fact that we're still alive and left the responsibility of deallocating the
748         // channel to us, so `self.channel` is valid
749         let channel = unsafe { self.channel_ptr.as_ref() };
751         // ORDERING: synchronize with the write of the message
752         match channel.state.load(Acquire) {
753             // The sender is alive but has not sent anything yet. We prepare to park.
754             EMPTY => {
755                 // Conditionally add a delay here to help the tests trigger the edge cases where
756                 // the sender manages to be dropped or send something before we are able to store
757                 // our waker object in the channel.
758                 #[cfg(oneshot_test_delay)]
759                 std::thread::sleep(std::time::Duration::from_millis(10));
761                 // Write our waker instance to the channel.
762                 // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
763                 // try to access the waker until it sees the state set to RECEIVING below
764                 unsafe { channel.write_waker(ReceiverWaker::current_thread()) };
766                 // ORDERING: we use release ordering on success so the sender can synchronize with
767                 // our write of the waker. We use relaxed ordering on failure since the sender does
768                 // not need to synchronize with our write and the individual match arms handle any
769                 // additional synchronization
770                 match channel
771                     .state
772                     .compare_exchange(EMPTY, RECEIVING, Release, Relaxed)
773                 {
774                     // We stored our waker, now we delegate to the callback to finish the receive
775                     // operation
776                     Ok(_) => finish(channel),
777                     // The sender sent the message while we prepared to finish
778                     Err(MESSAGE) => {
779                         // See comments in `recv` for ordering and safety
781                         fence(Acquire);
783                         unsafe { channel.drop_waker() };
785                         // ORDERING: the sender has been `mem::forget`-ed so this update only
786                         // needs to be visible to us
787                         channel.state.store(DISCONNECTED, Relaxed);
789                         // SAFETY: The MESSAGE state tells us there is a correctly initialized
790                         // message
791                         Ok(unsafe { channel.take_message() })
792                     }
793                     // The sender was dropped before sending anything while we prepared to park.
794                     Err(DISCONNECTED) => {
795                         // See comments in `recv` for safety
796                         unsafe { channel.drop_waker() };
797                         Err(disconnected_error)
798                     }
799                     _ => unreachable!(),
800                 }
801             }
802             // The sender sent the message. We take the message and mark the channel disconnected.
803             MESSAGE => {
804                 // ORDERING: the sender has been `mem::forget`-ed so this update only needs to be
805                 // visible to us
806                 channel.state.store(DISCONNECTED, Relaxed);
808                 // SAFETY: we are in the message state so the message is valid
809                 Ok(unsafe { channel.take_message() })
810             }
811             // The sender was dropped before sending anything, or we already received the message.
812             DISCONNECTED => Err(disconnected_error),
813             // The receiver must have been `Future::poll`ed prior to this call.
814             #[cfg(feature = "async")]
815             RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR),
816             _ => unreachable!(),
817         }
818     }
821 #[cfg(feature = "async")]
822 impl<T> core::future::Future for Receiver<T> {
823     type Output = Result<T, RecvError>;
825     fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
826         // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver
827         // is still alive, meaning that even if the sender was dropped then it would have observed
828         // the fact that we're still alive and left the responsibility of deallocating the
829         // channel to us, so `self.channel` is valid
830         let channel = unsafe { self.channel_ptr.as_ref() };
832         // ORDERING: we use acquire ordering to synchronize with the store of the message.
833         match channel.state.load(Acquire) {
834             // The sender is alive but has not sent anything yet.
835             EMPTY => {
836                 // SAFETY: We can't be in the forbidden states, and no waker in the channel.
837                 unsafe { channel.write_async_waker(cx) }
838             }
839             // We were polled again while waiting for the sender. Replace the waker with the new one.
840             RECEIVING => {
841                 // ORDERING: We use relaxed ordering on both success and failure since we have not
842                 // written anything above that must be released, and the individual match arms
843                 // handle any additional synchronization.
844                 match channel
845                     .state
846                     .compare_exchange(RECEIVING, EMPTY, Relaxed, Relaxed)
847                 {
848                     // We successfully changed the state back to EMPTY. Replace the waker.
849                     // This is the most likely branch to be taken, which is why we don't use any
850                     // memory barriers in the compare_exchange above.
851                     Ok(_) => {
852                         // SAFETY: We wrote the waker in a previous call to poll. We do not need
853                         // a memory barrier since the previous write here was by ourselves.
854                         unsafe { channel.drop_waker() };
855                         // SAFETY: We can't be in the forbidden states, and no waker in the channel.
856                         unsafe { channel.write_async_waker(cx) }
857                     }
858                     // The sender sent the message while we prepared to replace the waker.
859                     // We take the message and mark the channel disconnected.
860                     // The sender has already taken the waker.
861                     Err(MESSAGE) => {
862                         // ORDERING: Synchronize with the write of the message. This branch is
863                         // unlikely to be taken.
864                         channel.state.swap(DISCONNECTED, Acquire);
865                         // SAFETY: The state tells us the sender has initialized the message.
866                         Poll::Ready(Ok(unsafe { channel.take_message() }))
867                     }
868                     // The sender was dropped before sending anything while we prepared to park.
869                     // The sender has taken the waker already.
870                     Err(DISCONNECTED) => Poll::Ready(Err(RecvError)),
871                     // The sender is currently waking us up.
872                     Err(UNPARKING) => {
873                         // We can't trust that the old waker that the sender has access to
874                         // is honored by the async runtime at this point. So we wake ourselves
875                         // up to get polled instantly again.
876                         cx.waker().wake_by_ref();
877                         Poll::Pending
878                     }
879                     _ => unreachable!(),
880                 }
881             }
882             // The sender sent the message.
883             MESSAGE => {
884                 // ORDERING: the sender has been dropped so this update only needs to be
885                 // visible to us
886                 channel.state.store(DISCONNECTED, Relaxed);
887                 Poll::Ready(Ok(unsafe { channel.take_message() }))
888             }
889             // The sender was dropped before sending anything, or we already received the message.
890             DISCONNECTED => Poll::Ready(Err(RecvError)),
891             // The sender has observed the RECEIVING state and is currently reading the waker from
892             // a previous poll. We need to loop here until we observe the MESSAGE or DISCONNECTED
893             // state. We busy loop here since we know the sender is done very soon.
894             UNPARKING => loop {
895                 hint::spin_loop();
896                 // ORDERING: The load above has already synchronized with the write of the message.
897                 match channel.state.load(Relaxed) {
898                     MESSAGE => {
899                         // ORDERING: the sender has been dropped, so this update only
900                         // needs to be visible to us
901                         channel.state.store(DISCONNECTED, Relaxed);
902                         // SAFETY: We observed the MESSAGE state
903                         break Poll::Ready(Ok(unsafe { channel.take_message() }));
904                     }
905                     DISCONNECTED => break Poll::Ready(Err(RecvError)),
906                     UNPARKING => (),
907                     _ => unreachable!(),
908                 }
909             },
910             _ => unreachable!(),
911         }
912     }
915 impl<T> Drop for Receiver<T> {
916     fn drop(&mut self) {
917         // SAFETY: since the receiving side is still alive the sender would have observed that and
918         // left deallocating the channel allocation to us.
919         let channel = unsafe { self.channel_ptr.as_ref() };
921         // Set the channel state to disconnected and read what state the receiver was in
922         match channel.state.swap(DISCONNECTED, Acquire) {
923             // The sender has not sent anything, nor is it dropped.
924             EMPTY => (),
925             // The sender already sent something. We must drop it, and free the channel.
926             MESSAGE => {
927                 // SAFETY: we are in the message state so the message is initialized
928                 unsafe { channel.drop_message() };
930                 // SAFETY: see safety comment at top of function
931                 unsafe { dealloc(self.channel_ptr) };
932             }
933             // The receiver has been polled.
934             #[cfg(feature = "async")]
935             RECEIVING => {
936                 // TODO: figure this out when async is fixed
937                 unsafe { channel.drop_waker() };
938             }
939             // The sender was already dropped. We are responsible for freeing the channel.
940             DISCONNECTED => {
941                 // SAFETY: see safety comment at top of function
942                 unsafe { dealloc(self.channel_ptr) };
943             }
944             _ => unreachable!(),
945         }
946     }
949 /// All the values that the `Channel::state` field can have during the lifetime of a channel.
950 mod states {
951     // These values are very explicitly chosen so that we can replace some cmpxchg calls with
952     // fetch_* calls.
954     /// The initial channel state. Active while both endpoints are still alive, no message has been
955     /// sent, and the receiver is not receiving.
956     pub const EMPTY: u8 = 0b011;
957     /// A message has been sent to the channel, but the receiver has not yet read it.
958     pub const MESSAGE: u8 = 0b100;
959     /// No message has yet been sent on the channel, but the receiver is currently receiving.
960     pub const RECEIVING: u8 = 0b000;
961     #[cfg(any(feature = "std", feature = "async"))]
962     pub const UNPARKING: u8 = 0b001;
963     /// The channel has been closed. This means that either the sender or receiver has been dropped,
964     /// or the message sent to the channel has already been received. Since this is a oneshot
965     /// channel, it is disconnected after the one message it is supposed to hold has been
966     /// transmitted.
967     pub const DISCONNECTED: u8 = 0b010;
969 use states::*;
971 /// Internal channel data structure structure. the `channel` method allocates and puts one instance
972 /// of this struct on the heap for each oneshot channel instance. The struct holds:
973 /// * The current state of the channel.
974 /// * The message in the channel. This memory is uninitialized until the message is sent.
975 /// * The waker instance for the thread or task that is currently receiving on this channel.
976 ///   This memory is uninitialized until the receiver starts receiving.
977 struct Channel<T> {
978     state: AtomicU8,
979     message: UnsafeCell<MaybeUninit<T>>,
980     waker: UnsafeCell<MaybeUninit<ReceiverWaker>>,
983 impl<T> Channel<T> {
984     pub fn new() -> Self {
985         Self {
986             state: AtomicU8::new(EMPTY),
987             message: UnsafeCell::new(MaybeUninit::uninit()),
988             waker: UnsafeCell::new(MaybeUninit::uninit()),
989         }
990     }
992     #[inline(always)]
993     unsafe fn message(&self) -> &MaybeUninit<T> {
994         #[cfg(loom)]
995         {
996             self.message.with(|ptr| &*ptr)
997         }
999         #[cfg(not(loom))]
1000         {
1001             &*self.message.get()
1002         }
1003     }
1005     #[inline(always)]
1006     unsafe fn with_message_mut<F>(&self, op: F)
1007     where
1008         F: FnOnce(&mut MaybeUninit<T>),
1009     {
1010         #[cfg(loom)]
1011         {
1012             self.message.with_mut(|ptr| op(&mut *ptr))
1013         }
1015         #[cfg(not(loom))]
1016         {
1017             op(&mut *self.message.get())
1018         }
1019     }
1021     #[inline(always)]
1022     #[cfg(any(feature = "std", feature = "async"))]
1023     unsafe fn with_waker_mut<F>(&self, op: F)
1024     where
1025         F: FnOnce(&mut MaybeUninit<ReceiverWaker>),
1026     {
1027         #[cfg(loom)]
1028         {
1029             self.waker.with_mut(|ptr| op(&mut *ptr))
1030         }
1032         #[cfg(not(loom))]
1033         {
1034             op(&mut *self.waker.get())
1035         }
1036     }
1038     #[inline(always)]
1039     unsafe fn write_message(&self, message: T) {
1040         self.with_message_mut(|slot| slot.as_mut_ptr().write(message));
1041     }
1043     #[inline(always)]
1044     unsafe fn take_message(&self) -> T {
1045         #[cfg(loom)]
1046         {
1047             self.message.with(|ptr| ptr::read(ptr)).assume_init()
1048         }
1050         #[cfg(not(loom))]
1051         {
1052             ptr::read(self.message.get()).assume_init()
1053         }
1054     }
1056     #[inline(always)]
1057     unsafe fn drop_message(&self) {
1058         self.with_message_mut(|slot| slot.assume_init_drop());
1059     }
1061     #[cfg(any(feature = "std", feature = "async"))]
1062     #[inline(always)]
1063     unsafe fn write_waker(&self, waker: ReceiverWaker) {
1064         self.with_waker_mut(|slot| slot.as_mut_ptr().write(waker));
1065     }
1067     #[inline(always)]
1068     unsafe fn take_waker(&self) -> ReceiverWaker {
1069         #[cfg(loom)]
1070         {
1071             self.waker.with(|ptr| ptr::read(ptr)).assume_init()
1072         }
1074         #[cfg(not(loom))]
1075         {
1076             ptr::read(self.waker.get()).assume_init()
1077         }
1078     }
1080     #[cfg(any(feature = "std", feature = "async"))]
1081     #[inline(always)]
1082     unsafe fn drop_waker(&self) {
1083         self.with_waker_mut(|slot| slot.assume_init_drop());
1084     }
1086     /// # Safety
1087     ///
1088     /// * `Channel::waker` must not have a waker stored in it when calling this method.
1089     /// * Channel state must not be RECEIVING or UNPARKING when calling this method.
1090     #[cfg(feature = "async")]
1091     unsafe fn write_async_waker(&self, cx: &mut task::Context<'_>) -> Poll<Result<T, RecvError>> {
1092         // Write our thread instance to the channel.
1093         // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not
1094         // try to access the waker until it sees the state set to RECEIVING below
1095         self.write_waker(ReceiverWaker::task_waker(cx));
1097         // ORDERING: we use release ordering on success so the sender can synchronize with
1098         // our write of the waker. We use relaxed ordering on failure since the sender does
1099         // not need to synchronize with our write and the individual match arms handle any
1100         // additional synchronization
1101         match self
1102             .state
1103             .compare_exchange(EMPTY, RECEIVING, Release, Relaxed)
1104         {
1105             // We stored our waker, now we return and let the sender wake us up
1106             Ok(_) => Poll::Pending,
1107             // The sender sent the message while we prepared to park.
1108             // We take the message and mark the channel disconnected.
1109             Err(MESSAGE) => {
1110                 // ORDERING: Synchronize with the write of the message. This branch is
1111                 // unlikely to be taken, so it's likely more efficient to use a fence here
1112                 // instead of AcqRel ordering on the compare_exchange operation
1113                 fence(Acquire);
1115                 // SAFETY: we started in the EMPTY state and the sender switched us to the
1116                 // MESSAGE state. This means that it did not take the waker, so we're
1117                 // responsible for dropping it.
1118                 self.drop_waker();
1120                 // ORDERING: sender does not exist, so this update only needs to be visible to us
1121                 self.state.store(DISCONNECTED, Relaxed);
1123                 // SAFETY: The MESSAGE state tells us there is a correctly initialized message
1124                 Poll::Ready(Ok(self.take_message()))
1125             }
1126             // The sender was dropped before sending anything while we prepared to park.
1127             Err(DISCONNECTED) => {
1128                 // SAFETY: we started in the EMPTY state and the sender switched us to the
1129                 // DISCONNECTED state. This means that it did not take the waker, so we're
1130                 // responsible for dropping it.
1131                 self.drop_waker();
1132                 Poll::Ready(Err(RecvError))
1133             }
1134             _ => unreachable!(),
1135         }
1136     }
1139 enum ReceiverWaker {
1140     /// The receiver is waiting synchronously. Its thread is parked.
1141     #[cfg(feature = "std")]
1142     Thread(thread::Thread),
1143     /// The receiver is waiting asynchronously. Its task can be woken up with this `Waker`.
1144     #[cfg(feature = "async")]
1145     Task(task::Waker),
1146     /// A little hack to not make this enum an uninhibitable type when no features are enabled.
1147     #[cfg(not(any(feature = "async", feature = "std")))]
1148     _Uninhabited,
1151 impl ReceiverWaker {
1152     #[cfg(feature = "std")]
1153     pub fn current_thread() -> Self {
1154         Self::Thread(thread::current())
1155     }
1157     #[cfg(feature = "async")]
1158     pub fn task_waker(cx: &task::Context<'_>) -> Self {
1159         Self::Task(cx.waker().clone())
1160     }
1162     pub fn unpark(self) {
1163         match self {
1164             #[cfg(feature = "std")]
1165             ReceiverWaker::Thread(thread) => thread.unpark(),
1166             #[cfg(feature = "async")]
1167             ReceiverWaker::Task(waker) => waker.wake(),
1168             #[cfg(not(any(feature = "async", feature = "std")))]
1169             ReceiverWaker::_Uninhabited => unreachable!(),
1170         }
1171     }
1174 #[cfg(not(loom))]
1175 #[test]
1176 fn receiver_waker_size() {
1177     let expected: usize = match (cfg!(feature = "std"), cfg!(feature = "async")) {
1178         (false, false) => 0,
1179         (false, true) => 16,
1180         (true, false) => 8,
1181         (true, true) => 24,
1182     };
1183     assert_eq!(mem::size_of::<ReceiverWaker>(), expected);
1186 #[cfg(all(feature = "std", feature = "async"))]
1187 const RECEIVER_USED_SYNC_AND_ASYNC_ERROR: &str =
1188     "Invalid to call a blocking receive method on oneshot::Receiver after it has been polled";
1190 #[inline]
1191 pub(crate) unsafe fn dealloc<T>(channel: NonNull<Channel<T>>) {
1192     drop(Box::from_raw(channel.as_ptr()))