Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'helix-term/src/job.rs')
-rw-r--r--helix-term/src/job.rs62
1 files changed, 14 insertions, 48 deletions
diff --git a/helix-term/src/job.rs b/helix-term/src/job.rs
index 72ed892d..2888b6eb 100644
--- a/helix-term/src/job.rs
+++ b/helix-term/src/job.rs
@@ -1,40 +1,13 @@
-use helix_event::status::StatusMessage;
-use helix_event::{runtime_local, send_blocking};
use helix_view::Editor;
-use once_cell::sync::OnceCell;
use crate::compositor::Compositor;
use futures_util::future::{BoxFuture, Future, FutureExt};
use futures_util::stream::{FuturesUnordered, StreamExt};
-use tokio::sync::mpsc::{channel, Receiver, Sender};
-
-pub type EditorCompositorCallback = Box<dyn FnOnce(&mut Editor, &mut Compositor) + Send>;
-pub type EditorCallback = Box<dyn FnOnce(&mut Editor) + Send>;
-
-runtime_local! {
- static JOB_QUEUE: OnceCell<Sender<Callback>> = OnceCell::new();
-}
-
-pub async fn dispatch_callback(job: Callback) {
- let _ = JOB_QUEUE.wait().send(job).await;
-}
-
-pub async fn dispatch(job: impl FnOnce(&mut Editor, &mut Compositor) + Send + 'static) {
- let _ = JOB_QUEUE
- .wait()
- .send(Callback::EditorCompositor(Box::new(job)))
- .await;
-}
-
-pub fn dispatch_blocking(job: impl FnOnce(&mut Editor, &mut Compositor) + Send + 'static) {
- let jobs = JOB_QUEUE.wait();
- send_blocking(jobs, Callback::EditorCompositor(Box::new(job)))
-}
pub enum Callback {
- EditorCompositor(EditorCompositorCallback),
- Editor(EditorCallback),
+ EditorCompositor(Box<dyn FnOnce(&mut Editor, &mut Compositor) + Send>),
+ Editor(Box<dyn FnOnce(&mut Editor) + Send>),
}
pub type JobFuture = BoxFuture<'static, anyhow::Result<Option<Callback>>>;
@@ -45,11 +18,11 @@ pub struct Job {
pub wait: bool,
}
+#[derive(Default)]
pub struct Jobs {
- /// jobs that need to complete before we exit.
+ pub futures: FuturesUnordered<JobFuture>,
+ /// These are the ones that need to complete before we exit.
pub wait_futures: FuturesUnordered<JobFuture>,
- pub callbacks: Receiver<Callback>,
- pub status_messages: Receiver<StatusMessage>,
}
impl Job {
@@ -76,16 +49,8 @@ impl Job {
}
impl Jobs {
- #[allow(clippy::new_without_default)]
pub fn new() -> Self {
- let (tx, rx) = channel(1024);
- let _ = JOB_QUEUE.set(tx);
- let status_messages = helix_event::status::setup();
- Self {
- wait_futures: FuturesUnordered::new(),
- callbacks: rx,
- status_messages,
- }
+ Self::default()
}
pub fn spawn<F: Future<Output = anyhow::Result<()>> + Send + 'static>(&mut self, f: F) {
@@ -117,17 +82,18 @@ impl Jobs {
}
}
+ pub async fn next_job(&mut self) -> Option<anyhow::Result<Option<Callback>>> {
+ tokio::select! {
+ event = self.futures.next() => { event }
+ event = self.wait_futures.next() => { event }
+ }
+ }
+
pub fn add(&self, j: Job) {
if j.wait {
self.wait_futures.push(j.future);
} else {
- tokio::spawn(async move {
- match j.future.await {
- Ok(Some(cb)) => dispatch_callback(cb).await,
- Ok(None) => (),
- Err(err) => helix_event::status::report(err).await,
- }
- });
+ self.futures.push(j.future);
}
}