Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'helix-term/src/job.rs')
-rw-r--r--helix-term/src/job.rs171
1 files changed, 0 insertions, 171 deletions
diff --git a/helix-term/src/job.rs b/helix-term/src/job.rs
deleted file mode 100644
index 72ed892d..00000000
--- a/helix-term/src/job.rs
+++ /dev/null
@@ -1,171 +0,0 @@
-use helix_event::status::StatusMessage;
-use helix_event::{runtime_local, send_blocking};
-use helix_view::Editor;
-use once_cell::sync::OnceCell;
-
-use crate::compositor::Compositor;
-
-use futures_util::future::{BoxFuture, Future, FutureExt};
-use futures_util::stream::{FuturesUnordered, StreamExt};
-use tokio::sync::mpsc::{channel, Receiver, Sender};
-
-pub type EditorCompositorCallback = Box<dyn FnOnce(&mut Editor, &mut Compositor) + Send>;
-pub type EditorCallback = Box<dyn FnOnce(&mut Editor) + Send>;
-
-runtime_local! {
- static JOB_QUEUE: OnceCell<Sender<Callback>> = OnceCell::new();
-}
-
-pub async fn dispatch_callback(job: Callback) {
- let _ = JOB_QUEUE.wait().send(job).await;
-}
-
-pub async fn dispatch(job: impl FnOnce(&mut Editor, &mut Compositor) + Send + 'static) {
- let _ = JOB_QUEUE
- .wait()
- .send(Callback::EditorCompositor(Box::new(job)))
- .await;
-}
-
-pub fn dispatch_blocking(job: impl FnOnce(&mut Editor, &mut Compositor) + Send + 'static) {
- let jobs = JOB_QUEUE.wait();
- send_blocking(jobs, Callback::EditorCompositor(Box::new(job)))
-}
-
-pub enum Callback {
- EditorCompositor(EditorCompositorCallback),
- Editor(EditorCallback),
-}
-
-pub type JobFuture = BoxFuture<'static, anyhow::Result<Option<Callback>>>;
-
-pub struct Job {
- pub future: BoxFuture<'static, anyhow::Result<Option<Callback>>>,
- /// Do we need to wait for this job to finish before exiting?
- pub wait: bool,
-}
-
-pub struct Jobs {
- /// jobs that need to complete before we exit.
- pub wait_futures: FuturesUnordered<JobFuture>,
- pub callbacks: Receiver<Callback>,
- pub status_messages: Receiver<StatusMessage>,
-}
-
-impl Job {
- pub fn new<F: Future<Output = anyhow::Result<()>> + Send + 'static>(f: F) -> Self {
- Self {
- future: f.map(|r| r.map(|()| None)).boxed(),
- wait: false,
- }
- }
-
- pub fn with_callback<F: Future<Output = anyhow::Result<Callback>> + Send + 'static>(
- f: F,
- ) -> Self {
- Self {
- future: f.map(|r| r.map(Some)).boxed(),
- wait: false,
- }
- }
-
- pub fn wait_before_exiting(mut self) -> Self {
- self.wait = true;
- self
- }
-}
-
-impl Jobs {
- #[allow(clippy::new_without_default)]
- pub fn new() -> Self {
- let (tx, rx) = channel(1024);
- let _ = JOB_QUEUE.set(tx);
- let status_messages = helix_event::status::setup();
- Self {
- wait_futures: FuturesUnordered::new(),
- callbacks: rx,
- status_messages,
- }
- }
-
- pub fn spawn<F: Future<Output = anyhow::Result<()>> + Send + 'static>(&mut self, f: F) {
- self.add(Job::new(f));
- }
-
- pub fn callback<F: Future<Output = anyhow::Result<Callback>> + Send + 'static>(
- &mut self,
- f: F,
- ) {
- self.add(Job::with_callback(f));
- }
-
- pub fn handle_callback(
- &self,
- editor: &mut Editor,
- compositor: &mut Compositor,
- call: anyhow::Result<Option<Callback>>,
- ) {
- match call {
- Ok(None) => {}
- Ok(Some(call)) => match call {
- Callback::EditorCompositor(call) => call(editor, compositor),
- Callback::Editor(call) => call(editor),
- },
- Err(e) => {
- editor.set_error(format!("Async job failed: {}", e));
- }
- }
- }
-
- pub fn add(&self, j: Job) {
- if j.wait {
- self.wait_futures.push(j.future);
- } else {
- tokio::spawn(async move {
- match j.future.await {
- Ok(Some(cb)) => dispatch_callback(cb).await,
- Ok(None) => (),
- Err(err) => helix_event::status::report(err).await,
- }
- });
- }
- }
-
- /// Blocks until all the jobs that need to be waited on are done.
- pub async fn finish(
- &mut self,
- editor: &mut Editor,
- mut compositor: Option<&mut Compositor>,
- ) -> anyhow::Result<()> {
- log::debug!("waiting on jobs...");
- let mut wait_futures = std::mem::take(&mut self.wait_futures);
-
- while let (Some(job), tail) = wait_futures.into_future().await {
- match job {
- Ok(callback) => {
- wait_futures = tail;
-
- if let Some(callback) = callback {
- // clippy doesn't realize this is an error without the derefs
- #[allow(clippy::needless_option_as_deref)]
- match callback {
- Callback::EditorCompositor(call) if compositor.is_some() => {
- call(editor, compositor.as_deref_mut().unwrap())
- }
- Callback::Editor(call) => call(editor),
-
- // skip callbacks for which we don't have the necessary references
- _ => (),
- }
- }
- }
- Err(e) => {
- self.wait_futures = tail;
- return Err(e);
- }
- }
- }
-
- Ok(())
- }
-}