Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'crates/vfs-notify/src/lib.rs')
| -rw-r--r-- | crates/vfs-notify/src/lib.rs | 193 |
1 files changed, 123 insertions, 70 deletions
diff --git a/crates/vfs-notify/src/lib.rs b/crates/vfs-notify/src/lib.rs index a87b75e68f..d0d3a84446 100644 --- a/crates/vfs-notify/src/lib.rs +++ b/crates/vfs-notify/src/lib.rs @@ -10,12 +10,15 @@ use std::{ fs, path::{Component, Path}, + sync::atomic::AtomicUsize, }; use crossbeam_channel::{never, select, unbounded, Receiver, Sender}; -use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher}; +use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use paths::{AbsPath, AbsPathBuf, Utf8PathBuf}; -use vfs::loader; +use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator}; +use rustc_hash::FxHashSet; +use vfs::loader::{self, LoadingProgress}; use walkdir::WalkDir; #[derive(Debug)] @@ -59,7 +62,8 @@ type NotifyEvent = notify::Result<notify::Event>; struct NotifyActor { sender: loader::Sender, - watched_entries: Vec<loader::Entry>, + watched_file_entries: FxHashSet<AbsPathBuf>, + watched_dir_entries: Vec<loader::Directories>, // Drop order is significant. watcher: Option<(RecommendedWatcher, Receiver<NotifyEvent>)>, } @@ -72,7 +76,12 @@ enum Event { impl NotifyActor { fn new(sender: loader::Sender) -> NotifyActor { - NotifyActor { sender, watched_entries: Vec::new(), watcher: None } + NotifyActor { + sender, + watched_dir_entries: Vec::new(), + watched_file_entries: FxHashSet::default(), + watcher: None, + } } fn next_event(&self, receiver: &Receiver<Message>) -> Option<Event> { @@ -104,35 +113,69 @@ impl NotifyActor { let config_version = config.version; let n_total = config.load.len(); - self.send(loader::Message::Progress { + self.watched_dir_entries.clear(); + self.watched_file_entries.clear(); + + let send = |msg| (self.sender)(msg); + send(loader::Message::Progress { n_total, - n_done: None, + n_done: LoadingProgress::Started, config_version, dir: None, }); - self.watched_entries.clear(); - - for (i, entry) in config.load.into_iter().enumerate() { - let watch = config.watch.contains(&i); - if watch { - self.watched_entries.push(entry.clone()); + let (entry_tx, entry_rx) = unbounded(); + let (watch_tx, watch_rx) = unbounded(); + let processed = AtomicUsize::new(0); + config.load.into_par_iter().enumerate().for_each(move |(i, entry)| { + let do_watch = config.watch.contains(&i); + if do_watch { + _ = entry_tx.send(entry.clone()); } - let files = - self.load_entry(entry, watch, |file| loader::Message::Progress { - n_total, - n_done: Some(i), - dir: Some(file), - config_version, - }); - self.send(loader::Message::Loaded { files }); - self.send(loader::Message::Progress { + let files = Self::load_entry( + |f| _ = watch_tx.send(f.to_owned()), + entry, + do_watch, + |file| { + send(loader::Message::Progress { + n_total, + n_done: LoadingProgress::Progress( + processed.load(std::sync::atomic::Ordering::Relaxed), + ), + dir: Some(file), + config_version, + }) + }, + ); + send(loader::Message::Loaded { files }); + send(loader::Message::Progress { n_total, - n_done: Some(i + 1), + n_done: LoadingProgress::Progress( + processed.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1, + ), config_version, dir: None, }); + }); + for path in watch_rx { + self.watch(&path); } + for entry in entry_rx { + match entry { + loader::Entry::Files(files) => { + self.watched_file_entries.extend(files) + } + loader::Entry::Directories(dir) => { + self.watched_dir_entries.push(dir) + } + } + } + self.send(loader::Message::Progress { + n_total, + n_done: LoadingProgress::Finished, + config_version, + dir: None, + }); } Message::Invalidate(path) => { let contents = read(path.as_path()); @@ -142,60 +185,69 @@ impl NotifyActor { }, Event::NotifyEvent(event) => { if let Some(event) = log_notify_error(event) { - let files = event - .paths - .into_iter() - .filter_map(|path| { - Some( - AbsPathBuf::try_from(Utf8PathBuf::from_path_buf(path).ok()?) + if let EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) = + event.kind + { + let files = event + .paths + .into_iter() + .filter_map(|path| { + Some( + AbsPathBuf::try_from( + Utf8PathBuf::from_path_buf(path).ok()?, + ) .expect("path is absolute"), - ) - }) - .filter_map(|path| { - let meta = fs::metadata(&path).ok()?; - if meta.file_type().is_dir() - && self - .watched_entries - .iter() - .any(|entry| entry.contains_dir(&path)) - { - self.watch(path); - return None; - } + ) + }) + .filter_map(|path| -> Option<(AbsPathBuf, Option<Vec<u8>>)> { + let meta = fs::metadata(&path).ok()?; + if meta.file_type().is_dir() + && self + .watched_dir_entries + .iter() + .any(|dir| dir.contains_dir(&path)) + { + self.watch(path.as_ref()); + return None; + } - if !meta.file_type().is_file() { - return None; - } - if !self - .watched_entries - .iter() - .any(|entry| entry.contains_file(&path)) - { - return None; - } + if !meta.file_type().is_file() { + return None; + } - let contents = read(&path); - Some((path, contents)) - }) - .collect(); - self.send(loader::Message::Changed { files }); + if !(self.watched_file_entries.contains(&path) + || self + .watched_dir_entries + .iter() + .any(|dir| dir.contains_file(&path))) + { + return None; + } + + let contents = read(&path); + Some((path, contents)) + }) + .collect(); + self.send(loader::Message::Changed { files }); + } } } } } } + fn load_entry( - &mut self, + mut watch: impl FnMut(&Path), entry: loader::Entry, - watch: bool, - make_message: impl Fn(AbsPathBuf) -> loader::Message, + do_watch: bool, + send_message: impl Fn(AbsPathBuf), ) -> Vec<(AbsPathBuf, Option<Vec<u8>>)> { match entry { loader::Entry::Files(files) => files .into_iter() .map(|file| { - if watch { - self.watch(file.clone()); + if do_watch { + watch(file.as_ref()); } let contents = read(file.as_path()); (file, contents) @@ -205,7 +257,7 @@ impl NotifyActor { let mut res = Vec::new(); for root in &dirs.include { - self.send(make_message(root.clone())); + send_message(root.clone()); let walkdir = WalkDir::new(root).follow_links(true).into_iter().filter_entry(|entry| { if !entry.file_type().is_dir() { @@ -213,7 +265,7 @@ impl NotifyActor { } let path = entry.path(); - if path_is_parent_symlink(path) { + if path_might_be_cyclic(path) { return false; } @@ -230,10 +282,10 @@ impl NotifyActor { ) .ok()?; if depth < 2 && is_dir { - self.send(make_message(abs_path.clone())); + send_message(abs_path.clone()); } - if is_dir && watch { - self.watch(abs_path.clone()); + if is_dir && do_watch { + watch(abs_path.as_ref()); } if !is_file { return None; @@ -255,12 +307,13 @@ impl NotifyActor { } } - fn watch(&mut self, path: AbsPathBuf) { + fn watch(&mut self, path: &Path) { if let Some((watcher, _)) = &mut self.watcher { - log_notify_error(watcher.watch(path.as_ref(), RecursiveMode::NonRecursive)); + log_notify_error(watcher.watch(path, RecursiveMode::NonRecursive)); } } - fn send(&mut self, msg: loader::Message) { + + fn send(&self, msg: loader::Message) { (self.sender)(msg); } } @@ -279,7 +332,7 @@ fn log_notify_error<T>(res: notify::Result<T>) -> Option<T> { /// heuristic is not sufficient to catch all symlink cycles (it's /// possible to construct cycle using two or more symlinks), but it /// catches common cases. -fn path_is_parent_symlink(path: &Path) -> bool { +fn path_might_be_cyclic(path: &Path) -> bool { let Ok(destination) = std::fs::read_link(path) else { return false; }; |