Unnamed repository; edit this file 'description' to name the repository.
perf: Request cancellation while processing changed files
Lukas Wirth 12 months ago
parent d7e977a · commit 8d9b318
-rw-r--r--Cargo.lock1
-rw-r--r--crates/rust-analyzer/src/global_state.rs163
-rw-r--r--crates/stdx/Cargo.toml1
-rw-r--r--crates/stdx/src/thread/pool.rs51
4 files changed, 136 insertions, 80 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 8d6c8284e4..34469656dc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2228,6 +2228,7 @@ version = "0.0.0"
dependencies = [
"backtrace",
"crossbeam-channel",
+ "crossbeam-utils",
"itertools 0.14.0",
"jod-thread",
"libc",
diff --git a/crates/rust-analyzer/src/global_state.rs b/crates/rust-analyzer/src/global_state.rs
index 3b3b9c8797..1a31525b46 100644
--- a/crates/rust-analyzer/src/global_state.rs
+++ b/crates/rust-analyzer/src/global_state.rs
@@ -3,7 +3,7 @@
//!
//! Each tick provides an immutable snapshot of the state as `WorldSnapshot`.
-use std::{ops::Not as _, time::Instant};
+use std::{ops::Not as _, panic::AssertUnwindSafe, time::Instant};
use crossbeam_channel::{Receiver, Sender, unbounded};
use hir::ChangeWithProcMacros;
@@ -19,6 +19,7 @@ use parking_lot::{
use proc_macro_api::ProcMacroClient;
use project_model::{ManifestPath, ProjectWorkspace, ProjectWorkspaceKind, WorkspaceBuildScripts};
use rustc_hash::{FxHashMap, FxHashSet};
+use stdx::thread;
use tracing::{Level, span, trace};
use triomphe::Arc;
use vfs::{AbsPathBuf, AnchoredPathBuf, ChangeKind, Vfs, VfsPath};
@@ -78,6 +79,7 @@ pub(crate) struct GlobalState {
pub(crate) task_pool: Handle<TaskPool<Task>, Receiver<Task>>,
pub(crate) fmt_pool: Handle<TaskPool<Task>, Receiver<Task>>,
+ pub(crate) cancellation_pool: thread::Pool,
pub(crate) config: Arc<Config>,
pub(crate) config_errors: Option<ConfigErrors>,
@@ -210,6 +212,7 @@ impl GlobalState {
let handle = TaskPool::new_with_threads(sender, 1);
Handle { handle, receiver }
};
+ let cancellation_pool = thread::Pool::new(1);
let task_queue = {
let (sender, receiver) = unbounded();
@@ -230,6 +233,7 @@ impl GlobalState {
req_queue: ReqQueue::default(),
task_pool,
fmt_pool,
+ cancellation_pool,
loader,
config: Arc::new(config.clone()),
analysis_host,
@@ -290,7 +294,6 @@ impl GlobalState {
pub(crate) fn process_changes(&mut self) -> bool {
let _p = span!(Level::INFO, "GlobalState::process_changes").entered();
-
// We cannot directly resolve a change in a ratoml file to a format
// that can be used by the config module because config talks
// in `SourceRootId`s instead of `FileId`s and `FileId` -> `SourceRootId`
@@ -298,66 +301,75 @@ impl GlobalState {
let mut modified_ratoml_files: FxHashMap<FileId, (ChangeKind, vfs::VfsPath)> =
FxHashMap::default();
- let (change, modified_rust_files, workspace_structure_change) = {
- let mut change = ChangeWithProcMacros::default();
- let mut guard = self.vfs.write();
- let changed_files = guard.0.take_changes();
- if changed_files.is_empty() {
- return false;
- }
+ let mut change = ChangeWithProcMacros::default();
+ let mut guard = self.vfs.write();
+ let changed_files = guard.0.take_changes();
+ if changed_files.is_empty() {
+ return false;
+ }
- // downgrade to read lock to allow more readers while we are normalizing text
- let guard = RwLockWriteGuard::downgrade_to_upgradable(guard);
- let vfs: &Vfs = &guard.0;
-
- let mut workspace_structure_change = None;
- // A file was added or deleted
- let mut has_structure_changes = false;
- let mut bytes = vec![];
- let mut modified_rust_files = vec![];
- for file in changed_files.into_values() {
- let vfs_path = vfs.file_path(file.file_id);
- if let Some(("rust-analyzer", Some("toml"))) = vfs_path.name_and_extension() {
- // Remember ids to use them after `apply_changes`
- modified_ratoml_files.insert(file.file_id, (file.kind(), vfs_path.clone()));
- }
+ let (change, modified_rust_files, workspace_structure_change) =
+ self.cancellation_pool.scoped(|s| {
+ // start cancellation in parallel, this will kick off lru eviction
+ // allowing us to do meaningful work while waiting
+ let analysis_host = AssertUnwindSafe(&mut self.analysis_host);
+ s.spawn(thread::ThreadIntent::LatencySensitive, || {
+ { analysis_host }.0.request_cancellation()
+ });
+
+ // downgrade to read lock to allow more readers while we are normalizing text
+ let guard = RwLockWriteGuard::downgrade_to_upgradable(guard);
+ let vfs: &Vfs = &guard.0;
+
+ let mut workspace_structure_change = None;
+ // A file was added or deleted
+ let mut has_structure_changes = false;
+ let mut bytes = vec![];
+ let mut modified_rust_files = vec![];
+ for file in changed_files.into_values() {
+ let vfs_path = vfs.file_path(file.file_id);
+ if let Some(("rust-analyzer", Some("toml"))) = vfs_path.name_and_extension() {
+ // Remember ids to use them after `apply_changes`
+ modified_ratoml_files.insert(file.file_id, (file.kind(), vfs_path.clone()));
+ }
- if let Some(path) = vfs_path.as_path() {
- has_structure_changes |= file.is_created_or_deleted();
+ if let Some(path) = vfs_path.as_path() {
+ has_structure_changes |= file.is_created_or_deleted();
- if file.is_modified() && path.extension() == Some("rs") {
- modified_rust_files.push(file.file_id);
- }
+ if file.is_modified() && path.extension() == Some("rs") {
+ modified_rust_files.push(file.file_id);
+ }
- let additional_files = self
- .config
- .discover_workspace_config()
- .map(|cfg| {
- cfg.files_to_watch.iter().map(String::as_str).collect::<Vec<&str>>()
- })
- .unwrap_or_default();
-
- let path = path.to_path_buf();
- if file.is_created_or_deleted() {
- workspace_structure_change.get_or_insert((path, false)).1 |=
- self.crate_graph_file_dependencies.contains(vfs_path);
- } else if reload::should_refresh_for_change(
- &path,
- file.kind(),
- &additional_files,
- ) {
- trace!(?path, kind = ?file.kind(), "refreshing for a change");
- workspace_structure_change.get_or_insert((path.clone(), false));
+ let additional_files = self
+ .config
+ .discover_workspace_config()
+ .map(|cfg| {
+ cfg.files_to_watch.iter().map(String::as_str).collect::<Vec<&str>>()
+ })
+ .unwrap_or_default();
+
+ let path = path.to_path_buf();
+ if file.is_created_or_deleted() {
+ workspace_structure_change.get_or_insert((path, false)).1 |=
+ self.crate_graph_file_dependencies.contains(vfs_path);
+ } else if reload::should_refresh_for_change(
+ &path,
+ file.kind(),
+ &additional_files,
+ ) {
+ trace!(?path, kind = ?file.kind(), "refreshing for a change");
+ workspace_structure_change.get_or_insert((path.clone(), false));
+ }
}
- }
- // Clear native diagnostics when their file gets deleted
- if !file.exists() {
- self.diagnostics.clear_native_for(file.file_id);
- }
+ // Clear native diagnostics when their file gets deleted
+ if !file.exists() {
+ self.diagnostics.clear_native_for(file.file_id);
+ }
- let text =
- if let vfs::Change::Create(v, _) | vfs::Change::Modify(v, _) = file.change {
+ let text = if let vfs::Change::Create(v, _) | vfs::Change::Modify(v, _) =
+ file.change
+ {
String::from_utf8(v).ok().map(|text| {
// FIXME: Consider doing normalization in the `vfs` instead? That allows
// getting rid of some locking
@@ -367,29 +379,28 @@ impl GlobalState {
} else {
None
};
- // delay `line_endings_map` changes until we are done normalizing the text
- // this allows delaying the re-acquisition of the write lock
- bytes.push((file.file_id, text));
- }
- let (vfs, line_endings_map) = &mut *RwLockUpgradableReadGuard::upgrade(guard);
- bytes.into_iter().for_each(|(file_id, text)| {
- let text = match text {
- None => None,
- Some((text, line_endings)) => {
- line_endings_map.insert(file_id, line_endings);
- Some(text)
- }
- };
- change.change_file(file_id, text);
+ // delay `line_endings_map` changes until we are done normalizing the text
+ // this allows delaying the re-acquisition of the write lock
+ bytes.push((file.file_id, text));
+ }
+ let (vfs, line_endings_map) = &mut *RwLockUpgradableReadGuard::upgrade(guard);
+ bytes.into_iter().for_each(|(file_id, text)| {
+ let text = match text {
+ None => None,
+ Some((text, line_endings)) => {
+ line_endings_map.insert(file_id, line_endings);
+ Some(text)
+ }
+ };
+ change.change_file(file_id, text);
+ });
+ if has_structure_changes {
+ let roots = self.source_root_config.partition(vfs);
+ change.set_roots(roots);
+ }
+ (change, modified_rust_files, workspace_structure_change)
});
- if has_structure_changes {
- let roots = self.source_root_config.partition(vfs);
- change.set_roots(roots);
- }
- (change, modified_rust_files, workspace_structure_change)
- };
- let _p = span!(Level::INFO, "GlobalState::process_changes/apply_change").entered();
self.analysis_host.apply_change(change);
if !modified_ratoml_files.is_empty()
|| !self.config.same_source_root_parent_map(&self.local_roots_parent_map)
diff --git a/crates/stdx/Cargo.toml b/crates/stdx/Cargo.toml
index 7bda106764..b37aded6f6 100644
--- a/crates/stdx/Cargo.toml
+++ b/crates/stdx/Cargo.toml
@@ -17,6 +17,7 @@ jod-thread = "1.0.0"
crossbeam-channel.workspace = true
itertools.workspace = true
tracing.workspace = true
+crossbeam-utils = "0.8.21"
# Think twice before adding anything here
[target.'cfg(unix)'.dependencies]
diff --git a/crates/stdx/src/thread/pool.rs b/crates/stdx/src/thread/pool.rs
index a8de4db624..8d76c5fd1f 100644
--- a/crates/stdx/src/thread/pool.rs
+++ b/crates/stdx/src/thread/pool.rs
@@ -8,6 +8,7 @@
//! the threading utilities in [`crate::thread`].
use std::{
+ marker::PhantomData,
panic::{self, UnwindSafe},
sync::{
Arc,
@@ -16,8 +17,9 @@ use std::{
};
use crossbeam_channel::{Receiver, Sender};
+use crossbeam_utils::sync::WaitGroup;
-use super::{Builder, JoinHandle, ThreadIntent};
+use crate::thread::{Builder, JoinHandle, ThreadIntent};
pub struct Pool {
// `_handles` is never read: the field is present
@@ -79,9 +81,6 @@ impl Pool {
Self { _handles: handles.into_boxed_slice(), extant_tasks, job_sender }
}
- /// # Panics
- ///
- /// Panics if job panics
pub fn spawn<F>(&self, intent: ThreadIntent, f: F)
where
F: FnOnce() + Send + UnwindSafe + 'static,
@@ -97,6 +96,17 @@ impl Pool {
self.job_sender.send(job).unwrap();
}
+ pub fn scoped<'pool, 'scope, F, R>(&'pool self, f: F) -> R
+ where
+ F: FnOnce(&Scope<'pool, 'scope>) -> R,
+ {
+ let wg = WaitGroup::new();
+ let scope = Scope { pool: self, wg, _marker: PhantomData };
+ let r = f(&scope);
+ scope.wg.wait();
+ r
+ }
+
#[must_use]
pub fn len(&self) -> usize {
self.extant_tasks.load(Ordering::SeqCst)
@@ -107,3 +117,36 @@ impl Pool {
self.len() == 0
}
}
+
+pub struct Scope<'pool, 'scope> {
+ pool: &'pool Pool,
+ wg: WaitGroup,
+ _marker: PhantomData<fn(&'scope ()) -> &'scope ()>,
+}
+
+impl<'scope> Scope<'_, 'scope> {
+ pub fn spawn<F>(&self, intent: ThreadIntent, f: F)
+ where
+ F: 'scope + FnOnce() + Send + UnwindSafe,
+ {
+ let wg = self.wg.clone();
+ let f = Box::new(move || {
+ if cfg!(debug_assertions) {
+ intent.assert_is_used_on_current_thread();
+ }
+ f();
+ drop(wg);
+ });
+
+ let job = Job {
+ requested_intent: intent,
+ f: unsafe {
+ std::mem::transmute::<
+ Box<dyn 'scope + FnOnce() + Send + UnwindSafe>,
+ Box<dyn 'static + FnOnce() + Send + UnwindSafe>,
+ >(f)
+ },
+ };
+ self.pool.job_sender.send(job).unwrap();
+ }
+}