Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'helix-event/src/cancel.rs')
| -rw-r--r-- | helix-event/src/cancel.rs | 282 |
1 files changed, 7 insertions, 275 deletions
diff --git a/helix-event/src/cancel.rs b/helix-event/src/cancel.rs index 2029c945..f027be80 100644 --- a/helix-event/src/cancel.rs +++ b/helix-event/src/cancel.rs @@ -1,18 +1,15 @@ -use std::borrow::Borrow; use std::future::Future; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering::Relaxed; -use std::sync::Arc; -use tokio::sync::Notify; +pub use oneshot::channel as cancelation; +use tokio::sync::oneshot; -pub async fn cancelable_future<T>( - future: impl Future<Output = T>, - cancel: impl Borrow<TaskHandle>, -) -> Option<T> { +pub type CancelTx = oneshot::Sender<()>; +pub type CancelRx = oneshot::Receiver<()>; + +pub async fn cancelable_future<T>(future: impl Future<Output = T>, cancel: CancelRx) -> Option<T> { tokio::select! { biased; - _ = cancel.borrow().canceled() => { + _ = cancel => { None } res = future => { @@ -20,268 +17,3 @@ pub async fn cancelable_future<T>( } } } - -#[derive(Default, Debug)] -struct Shared { - state: AtomicU64, - // `Notify` has some features that we don't really need here because it - // supports waking single tasks (`notify_one`) and does its own (more - // complicated) state tracking, we could reimplement the waiter linked list - // with modest effort and reduce memory consumption by one word/8 bytes and - // reduce code complexity/number of atomic operations. - // - // I don't think that's worth the complexity (unsafe code). - // - // if we only cared about async code then we could also only use a notify - // (without the generation count), this would be equivalent (or maybe more - // correct if we want to allow cloning the TX) but it would be extremly slow - // to frequently check for cancelation from sync code - notify: Notify, -} - -impl Shared { - fn generation(&self) -> u32 { - self.state.load(Relaxed) as u32 - } - - fn num_running(&self) -> u32 { - (self.state.load(Relaxed) >> 32) as u32 - } - - /// Increments the generation count and sets `num_running` - /// to the provided value, this operation is not with - /// regard to the generation counter (doesn't use `fetch_add`) - /// so the calling code must ensure it cannot execute concurrently - /// to maintain correctness (but not safety) - fn inc_generation(&self, num_running: u32) -> (u32, u32) { - let state = self.state.load(Relaxed); - let generation = state as u32; - let prev_running = (state >> 32) as u32; - // no need to create a new generation if the refcount is zero (fastpath) - if prev_running == 0 && num_running == 0 { - return (generation, 0); - } - let new_generation = generation.saturating_add(1); - self.state.store( - new_generation as u64 | ((num_running as u64) << 32), - Relaxed, - ); - self.notify.notify_waiters(); - (new_generation, prev_running) - } - - fn inc_running(&self, generation: u32) { - let mut state = self.state.load(Relaxed); - loop { - let current_generation = state as u32; - if current_generation != generation { - break; - } - let off = 1 << 32; - let res = self.state.compare_exchange_weak( - state, - state.saturating_add(off), - Relaxed, - Relaxed, - ); - match res { - Ok(_) => break, - Err(new_state) => state = new_state, - } - } - } - - fn dec_running(&self, generation: u32) { - let mut state = self.state.load(Relaxed); - loop { - let current_generation = state as u32; - if current_generation != generation { - break; - } - let num_running = (state >> 32) as u32; - // running can't be zero here, that would mean we miscounted somewhere - assert_ne!(num_running, 0); - let off = 1 << 32; - let res = self - .state - .compare_exchange_weak(state, state - off, Relaxed, Relaxed); - match res { - Ok(_) => break, - Err(new_state) => state = new_state, - } - } - } -} - -// This intentionally doesn't implement `Clone` and requires a mutable reference -// for cancelation to avoid races (in inc_generation). - -/// A task controller allows managing a single subtask enabling the controller -/// to cancel the subtask and to check whether it is still running. -/// -/// For efficiency reasons the controller can be reused/restarted, -/// in that case the previous task is automatically canceled. -/// -/// If the controller is dropped, the subtasks are automatically canceled. -#[derive(Default, Debug)] -pub struct TaskController { - shared: Arc<Shared>, -} - -impl TaskController { - pub fn new() -> Self { - TaskController::default() - } - /// Cancels the active task (handle). - /// - /// Returns whether any tasks were still running before the cancelation. - pub fn cancel(&mut self) -> bool { - self.shared.inc_generation(0).1 != 0 - } - - /// Checks whether there are any task handles - /// that haven't been dropped (or canceled) yet. - pub fn is_running(&self) -> bool { - self.shared.num_running() != 0 - } - - /// Starts a new task and cancels the previous task (handles). - pub fn restart(&mut self) -> TaskHandle { - TaskHandle { - generation: self.shared.inc_generation(1).0, - shared: self.shared.clone(), - } - } -} - -impl Drop for TaskController { - fn drop(&mut self) { - self.cancel(); - } -} - -/// A handle that is used to link a task with a task controller. -/// -/// It can be used to cancel async futures very efficiently but can also be checked for -/// cancelation very quickly (single atomic read) in blocking code. -/// The handle can be cheaply cloned (reference counted). -/// -/// The TaskController can check whether a task is "running" by inspecting the -/// refcount of the (current) tasks handles. Therefore, if that information -/// is important, ensure that the handle is not dropped until the task fully -/// completes. -pub struct TaskHandle { - shared: Arc<Shared>, - generation: u32, -} - -impl Clone for TaskHandle { - fn clone(&self) -> Self { - self.shared.inc_running(self.generation); - TaskHandle { - shared: self.shared.clone(), - generation: self.generation, - } - } -} - -impl Drop for TaskHandle { - fn drop(&mut self) { - self.shared.dec_running(self.generation); - } -} - -impl TaskHandle { - /// Waits until [`TaskController::cancel`] is called for the corresponding - /// [`TaskController`]. Immediately returns if `cancel` was already called since - pub async fn canceled(&self) { - let notified = self.shared.notify.notified(); - if !self.is_canceled() { - notified.await - } - } - - pub fn is_canceled(&self) -> bool { - self.generation != self.shared.generation() - } -} - -#[cfg(test)] -mod tests { - use std::future::poll_fn; - - use futures_executor::block_on; - use tokio::task::yield_now; - - use crate::{cancelable_future, TaskController}; - - #[test] - fn immediate_cancel() { - let mut controller = TaskController::new(); - let handle = controller.restart(); - controller.cancel(); - assert!(handle.is_canceled()); - controller.restart(); - assert!(handle.is_canceled()); - - let res = block_on(cancelable_future( - poll_fn(|_cx| std::task::Poll::Ready(())), - handle, - )); - assert!(res.is_none()); - } - - #[test] - fn running_count() { - let mut controller = TaskController::new(); - let handle = controller.restart(); - assert!(controller.is_running()); - assert!(!handle.is_canceled()); - drop(handle); - assert!(!controller.is_running()); - assert!(!controller.cancel()); - let handle = controller.restart(); - assert!(!handle.is_canceled()); - assert!(controller.is_running()); - let handle2 = handle.clone(); - assert!(!handle.is_canceled()); - assert!(controller.is_running()); - drop(handle2); - assert!(!handle.is_canceled()); - assert!(controller.is_running()); - assert!(controller.cancel()); - assert!(handle.is_canceled()); - assert!(!controller.is_running()); - } - - #[test] - fn no_cancel() { - let mut controller = TaskController::new(); - let handle = controller.restart(); - assert!(!handle.is_canceled()); - - let res = block_on(cancelable_future( - poll_fn(|_cx| std::task::Poll::Ready(())), - handle, - )); - assert!(res.is_some()); - } - - #[test] - fn delayed_cancel() { - let mut controller = TaskController::new(); - let handle = controller.restart(); - - let mut hit = false; - let res = block_on(cancelable_future( - async { - controller.cancel(); - hit = true; - yield_now().await; - }, - handle, - )); - assert!(res.is_none()); - assert!(hit); - } -} |