Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'helix-term/src/job.rs')
| -rw-r--r-- | helix-term/src/job.rs | 62 |
1 files changed, 14 insertions, 48 deletions
diff --git a/helix-term/src/job.rs b/helix-term/src/job.rs index 72ed892d..2888b6eb 100644 --- a/helix-term/src/job.rs +++ b/helix-term/src/job.rs @@ -1,40 +1,13 @@ -use helix_event::status::StatusMessage; -use helix_event::{runtime_local, send_blocking}; use helix_view::Editor; -use once_cell::sync::OnceCell; use crate::compositor::Compositor; use futures_util::future::{BoxFuture, Future, FutureExt}; use futures_util::stream::{FuturesUnordered, StreamExt}; -use tokio::sync::mpsc::{channel, Receiver, Sender}; - -pub type EditorCompositorCallback = Box<dyn FnOnce(&mut Editor, &mut Compositor) + Send>; -pub type EditorCallback = Box<dyn FnOnce(&mut Editor) + Send>; - -runtime_local! { - static JOB_QUEUE: OnceCell<Sender<Callback>> = OnceCell::new(); -} - -pub async fn dispatch_callback(job: Callback) { - let _ = JOB_QUEUE.wait().send(job).await; -} - -pub async fn dispatch(job: impl FnOnce(&mut Editor, &mut Compositor) + Send + 'static) { - let _ = JOB_QUEUE - .wait() - .send(Callback::EditorCompositor(Box::new(job))) - .await; -} - -pub fn dispatch_blocking(job: impl FnOnce(&mut Editor, &mut Compositor) + Send + 'static) { - let jobs = JOB_QUEUE.wait(); - send_blocking(jobs, Callback::EditorCompositor(Box::new(job))) -} pub enum Callback { - EditorCompositor(EditorCompositorCallback), - Editor(EditorCallback), + EditorCompositor(Box<dyn FnOnce(&mut Editor, &mut Compositor) + Send>), + Editor(Box<dyn FnOnce(&mut Editor) + Send>), } pub type JobFuture = BoxFuture<'static, anyhow::Result<Option<Callback>>>; @@ -45,11 +18,11 @@ pub struct Job { pub wait: bool, } +#[derive(Default)] pub struct Jobs { - /// jobs that need to complete before we exit. + pub futures: FuturesUnordered<JobFuture>, + /// These are the ones that need to complete before we exit. pub wait_futures: FuturesUnordered<JobFuture>, - pub callbacks: Receiver<Callback>, - pub status_messages: Receiver<StatusMessage>, } impl Job { @@ -76,16 +49,8 @@ impl Job { } impl Jobs { - #[allow(clippy::new_without_default)] pub fn new() -> Self { - let (tx, rx) = channel(1024); - let _ = JOB_QUEUE.set(tx); - let status_messages = helix_event::status::setup(); - Self { - wait_futures: FuturesUnordered::new(), - callbacks: rx, - status_messages, - } + Self::default() } pub fn spawn<F: Future<Output = anyhow::Result<()>> + Send + 'static>(&mut self, f: F) { @@ -117,17 +82,18 @@ impl Jobs { } } + pub async fn next_job(&mut self) -> Option<anyhow::Result<Option<Callback>>> { + tokio::select! { + event = self.futures.next() => { event } + event = self.wait_futures.next() => { event } + } + } + pub fn add(&self, j: Job) { if j.wait { self.wait_futures.push(j.future); } else { - tokio::spawn(async move { - match j.future.await { - Ok(Some(cb)) => dispatch_callback(cb).await, - Ok(None) => (), - Err(err) => helix_event::status::report(err).await, - } - }); + self.futures.push(j.future); } } |