Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'helix-term/src/job.rs')
| -rw-r--r-- | helix-term/src/job.rs | 123 |
1 files changed, 26 insertions, 97 deletions
diff --git a/helix-term/src/job.rs b/helix-term/src/job.rs index 72ed892d..4fa38174 100644 --- a/helix-term/src/job.rs +++ b/helix-term/src/job.rs @@ -1,42 +1,11 @@ -use helix_event::status::StatusMessage; -use helix_event::{runtime_local, send_blocking}; use helix_view::Editor; -use once_cell::sync::OnceCell; use crate::compositor::Compositor; -use futures_util::future::{BoxFuture, Future, FutureExt}; +use futures_util::future::{self, BoxFuture, Future, FutureExt}; use futures_util::stream::{FuturesUnordered, StreamExt}; -use tokio::sync::mpsc::{channel, Receiver, Sender}; - -pub type EditorCompositorCallback = Box<dyn FnOnce(&mut Editor, &mut Compositor) + Send>; -pub type EditorCallback = Box<dyn FnOnce(&mut Editor) + Send>; - -runtime_local! { - static JOB_QUEUE: OnceCell<Sender<Callback>> = OnceCell::new(); -} - -pub async fn dispatch_callback(job: Callback) { - let _ = JOB_QUEUE.wait().send(job).await; -} - -pub async fn dispatch(job: impl FnOnce(&mut Editor, &mut Compositor) + Send + 'static) { - let _ = JOB_QUEUE - .wait() - .send(Callback::EditorCompositor(Box::new(job))) - .await; -} - -pub fn dispatch_blocking(job: impl FnOnce(&mut Editor, &mut Compositor) + Send + 'static) { - let jobs = JOB_QUEUE.wait(); - send_blocking(jobs, Callback::EditorCompositor(Box::new(job))) -} - -pub enum Callback { - EditorCompositor(EditorCompositorCallback), - Editor(EditorCallback), -} +pub type Callback = Box<dyn FnOnce(&mut Editor, &mut Compositor) + Send>; pub type JobFuture = BoxFuture<'static, anyhow::Result<Option<Callback>>>; pub struct Job { @@ -45,16 +14,16 @@ pub struct Job { pub wait: bool, } +#[derive(Default)] pub struct Jobs { - /// jobs that need to complete before we exit. + pub futures: FuturesUnordered<JobFuture>, + /// These are the ones that need to complete before we exit. pub wait_futures: FuturesUnordered<JobFuture>, - pub callbacks: Receiver<Callback>, - pub status_messages: Receiver<StatusMessage>, } impl Job { - pub fn new<F: Future<Output = anyhow::Result<()>> + Send + 'static>(f: F) -> Self { - Self { + pub fn new<F: Future<Output = anyhow::Result<()>> + Send + 'static>(f: F) -> Job { + Job { future: f.map(|r| r.map(|()| None)).boxed(), wait: false, } @@ -62,30 +31,22 @@ impl Job { pub fn with_callback<F: Future<Output = anyhow::Result<Callback>> + Send + 'static>( f: F, - ) -> Self { - Self { + ) -> Job { + Job { future: f.map(|r| r.map(Some)).boxed(), wait: false, } } - pub fn wait_before_exiting(mut self) -> Self { + pub fn wait_before_exiting(mut self) -> Job { self.wait = true; self } } impl Jobs { - #[allow(clippy::new_without_default)] - pub fn new() -> Self { - let (tx, rx) = channel(1024); - let _ = JOB_QUEUE.set(tx); - let status_messages = helix_event::status::setup(); - Self { - wait_futures: FuturesUnordered::new(), - callbacks: rx, - status_messages, - } + pub fn new() -> Jobs { + Jobs::default() } pub fn spawn<F: Future<Output = anyhow::Result<()>> + Send + 'static>(&mut self, f: F) { @@ -107,65 +68,33 @@ impl Jobs { ) { match call { Ok(None) => {} - Ok(Some(call)) => match call { - Callback::EditorCompositor(call) => call(editor, compositor), - Callback::Editor(call) => call(editor), - }, + Ok(Some(call)) => { + call(editor, compositor); + } Err(e) => { editor.set_error(format!("Async job failed: {}", e)); } } } + pub async fn next_job(&mut self) -> Option<anyhow::Result<Option<Callback>>> { + tokio::select! { + event = self.futures.next() => { event } + event = self.wait_futures.next() => { event } + } + } + pub fn add(&self, j: Job) { if j.wait { self.wait_futures.push(j.future); } else { - tokio::spawn(async move { - match j.future.await { - Ok(Some(cb)) => dispatch_callback(cb).await, - Ok(None) => (), - Err(err) => helix_event::status::report(err).await, - } - }); + self.futures.push(j.future); } } /// Blocks until all the jobs that need to be waited on are done. - pub async fn finish( - &mut self, - editor: &mut Editor, - mut compositor: Option<&mut Compositor>, - ) -> anyhow::Result<()> { - log::debug!("waiting on jobs..."); - let mut wait_futures = std::mem::take(&mut self.wait_futures); - - while let (Some(job), tail) = wait_futures.into_future().await { - match job { - Ok(callback) => { - wait_futures = tail; - - if let Some(callback) = callback { - // clippy doesn't realize this is an error without the derefs - #[allow(clippy::needless_option_as_deref)] - match callback { - Callback::EditorCompositor(call) if compositor.is_some() => { - call(editor, compositor.as_deref_mut().unwrap()) - } - Callback::Editor(call) => call(editor), - - // skip callbacks for which we don't have the necessary references - _ => (), - } - } - } - Err(e) => { - self.wait_futures = tail; - return Err(e); - } - } - } - - Ok(()) + pub fn finish(&mut self) { + let wait_futures = std::mem::take(&mut self.wait_futures); + helix_lsp::block_on(wait_futures.for_each(|_| future::ready(()))); } } |