Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'crates/vfs-notify/src/lib.rs')
-rw-r--r--crates/vfs-notify/src/lib.rs193
1 files changed, 123 insertions, 70 deletions
diff --git a/crates/vfs-notify/src/lib.rs b/crates/vfs-notify/src/lib.rs
index a87b75e68f..d0d3a84446 100644
--- a/crates/vfs-notify/src/lib.rs
+++ b/crates/vfs-notify/src/lib.rs
@@ -10,12 +10,15 @@
use std::{
fs,
path::{Component, Path},
+ sync::atomic::AtomicUsize,
};
use crossbeam_channel::{never, select, unbounded, Receiver, Sender};
-use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
+use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use paths::{AbsPath, AbsPathBuf, Utf8PathBuf};
-use vfs::loader;
+use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator};
+use rustc_hash::FxHashSet;
+use vfs::loader::{self, LoadingProgress};
use walkdir::WalkDir;
#[derive(Debug)]
@@ -59,7 +62,8 @@ type NotifyEvent = notify::Result<notify::Event>;
struct NotifyActor {
sender: loader::Sender,
- watched_entries: Vec<loader::Entry>,
+ watched_file_entries: FxHashSet<AbsPathBuf>,
+ watched_dir_entries: Vec<loader::Directories>,
// Drop order is significant.
watcher: Option<(RecommendedWatcher, Receiver<NotifyEvent>)>,
}
@@ -72,7 +76,12 @@ enum Event {
impl NotifyActor {
fn new(sender: loader::Sender) -> NotifyActor {
- NotifyActor { sender, watched_entries: Vec::new(), watcher: None }
+ NotifyActor {
+ sender,
+ watched_dir_entries: Vec::new(),
+ watched_file_entries: FxHashSet::default(),
+ watcher: None,
+ }
}
fn next_event(&self, receiver: &Receiver<Message>) -> Option<Event> {
@@ -104,35 +113,69 @@ impl NotifyActor {
let config_version = config.version;
let n_total = config.load.len();
- self.send(loader::Message::Progress {
+ self.watched_dir_entries.clear();
+ self.watched_file_entries.clear();
+
+ let send = |msg| (self.sender)(msg);
+ send(loader::Message::Progress {
n_total,
- n_done: None,
+ n_done: LoadingProgress::Started,
config_version,
dir: None,
});
- self.watched_entries.clear();
-
- for (i, entry) in config.load.into_iter().enumerate() {
- let watch = config.watch.contains(&i);
- if watch {
- self.watched_entries.push(entry.clone());
+ let (entry_tx, entry_rx) = unbounded();
+ let (watch_tx, watch_rx) = unbounded();
+ let processed = AtomicUsize::new(0);
+ config.load.into_par_iter().enumerate().for_each(move |(i, entry)| {
+ let do_watch = config.watch.contains(&i);
+ if do_watch {
+ _ = entry_tx.send(entry.clone());
}
- let files =
- self.load_entry(entry, watch, |file| loader::Message::Progress {
- n_total,
- n_done: Some(i),
- dir: Some(file),
- config_version,
- });
- self.send(loader::Message::Loaded { files });
- self.send(loader::Message::Progress {
+ let files = Self::load_entry(
+ |f| _ = watch_tx.send(f.to_owned()),
+ entry,
+ do_watch,
+ |file| {
+ send(loader::Message::Progress {
+ n_total,
+ n_done: LoadingProgress::Progress(
+ processed.load(std::sync::atomic::Ordering::Relaxed),
+ ),
+ dir: Some(file),
+ config_version,
+ })
+ },
+ );
+ send(loader::Message::Loaded { files });
+ send(loader::Message::Progress {
n_total,
- n_done: Some(i + 1),
+ n_done: LoadingProgress::Progress(
+ processed.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1,
+ ),
config_version,
dir: None,
});
+ });
+ for path in watch_rx {
+ self.watch(&path);
}
+ for entry in entry_rx {
+ match entry {
+ loader::Entry::Files(files) => {
+ self.watched_file_entries.extend(files)
+ }
+ loader::Entry::Directories(dir) => {
+ self.watched_dir_entries.push(dir)
+ }
+ }
+ }
+ self.send(loader::Message::Progress {
+ n_total,
+ n_done: LoadingProgress::Finished,
+ config_version,
+ dir: None,
+ });
}
Message::Invalidate(path) => {
let contents = read(path.as_path());
@@ -142,60 +185,69 @@ impl NotifyActor {
},
Event::NotifyEvent(event) => {
if let Some(event) = log_notify_error(event) {
- let files = event
- .paths
- .into_iter()
- .filter_map(|path| {
- Some(
- AbsPathBuf::try_from(Utf8PathBuf::from_path_buf(path).ok()?)
+ if let EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) =
+ event.kind
+ {
+ let files = event
+ .paths
+ .into_iter()
+ .filter_map(|path| {
+ Some(
+ AbsPathBuf::try_from(
+ Utf8PathBuf::from_path_buf(path).ok()?,
+ )
.expect("path is absolute"),
- )
- })
- .filter_map(|path| {
- let meta = fs::metadata(&path).ok()?;
- if meta.file_type().is_dir()
- && self
- .watched_entries
- .iter()
- .any(|entry| entry.contains_dir(&path))
- {
- self.watch(path);
- return None;
- }
+ )
+ })
+ .filter_map(|path| -> Option<(AbsPathBuf, Option<Vec<u8>>)> {
+ let meta = fs::metadata(&path).ok()?;
+ if meta.file_type().is_dir()
+ && self
+ .watched_dir_entries
+ .iter()
+ .any(|dir| dir.contains_dir(&path))
+ {
+ self.watch(path.as_ref());
+ return None;
+ }
- if !meta.file_type().is_file() {
- return None;
- }
- if !self
- .watched_entries
- .iter()
- .any(|entry| entry.contains_file(&path))
- {
- return None;
- }
+ if !meta.file_type().is_file() {
+ return None;
+ }
- let contents = read(&path);
- Some((path, contents))
- })
- .collect();
- self.send(loader::Message::Changed { files });
+ if !(self.watched_file_entries.contains(&path)
+ || self
+ .watched_dir_entries
+ .iter()
+ .any(|dir| dir.contains_file(&path)))
+ {
+ return None;
+ }
+
+ let contents = read(&path);
+ Some((path, contents))
+ })
+ .collect();
+ self.send(loader::Message::Changed { files });
+ }
}
}
}
}
}
+
fn load_entry(
- &mut self,
+ mut watch: impl FnMut(&Path),
entry: loader::Entry,
- watch: bool,
- make_message: impl Fn(AbsPathBuf) -> loader::Message,
+ do_watch: bool,
+ send_message: impl Fn(AbsPathBuf),
) -> Vec<(AbsPathBuf, Option<Vec<u8>>)> {
match entry {
loader::Entry::Files(files) => files
.into_iter()
.map(|file| {
- if watch {
- self.watch(file.clone());
+ if do_watch {
+ watch(file.as_ref());
}
let contents = read(file.as_path());
(file, contents)
@@ -205,7 +257,7 @@ impl NotifyActor {
let mut res = Vec::new();
for root in &dirs.include {
- self.send(make_message(root.clone()));
+ send_message(root.clone());
let walkdir =
WalkDir::new(root).follow_links(true).into_iter().filter_entry(|entry| {
if !entry.file_type().is_dir() {
@@ -213,7 +265,7 @@ impl NotifyActor {
}
let path = entry.path();
- if path_is_parent_symlink(path) {
+ if path_might_be_cyclic(path) {
return false;
}
@@ -230,10 +282,10 @@ impl NotifyActor {
)
.ok()?;
if depth < 2 && is_dir {
- self.send(make_message(abs_path.clone()));
+ send_message(abs_path.clone());
}
- if is_dir && watch {
- self.watch(abs_path.clone());
+ if is_dir && do_watch {
+ watch(abs_path.as_ref());
}
if !is_file {
return None;
@@ -255,12 +307,13 @@ impl NotifyActor {
}
}
- fn watch(&mut self, path: AbsPathBuf) {
+ fn watch(&mut self, path: &Path) {
if let Some((watcher, _)) = &mut self.watcher {
- log_notify_error(watcher.watch(path.as_ref(), RecursiveMode::NonRecursive));
+ log_notify_error(watcher.watch(path, RecursiveMode::NonRecursive));
}
}
- fn send(&mut self, msg: loader::Message) {
+
+ fn send(&self, msg: loader::Message) {
(self.sender)(msg);
}
}
@@ -279,7 +332,7 @@ fn log_notify_error<T>(res: notify::Result<T>) -> Option<T> {
/// heuristic is not sufficient to catch all symlink cycles (it's
/// possible to construct cycle using two or more symlinks), but it
/// catches common cases.
-fn path_is_parent_symlink(path: &Path) -> bool {
+fn path_might_be_cyclic(path: &Path) -> bool {
let Ok(destination) = std::fs::read_link(path) else {
return false;
};