Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'crates/vfs-notify/src/lib.rs')
-rw-r--r--crates/vfs-notify/src/lib.rs211
1 files changed, 142 insertions, 69 deletions
diff --git a/crates/vfs-notify/src/lib.rs b/crates/vfs-notify/src/lib.rs
index 7e0f9af7af..0ae8b7baf4 100644
--- a/crates/vfs-notify/src/lib.rs
+++ b/crates/vfs-notify/src/lib.rs
@@ -10,12 +10,15 @@
use std::{
fs,
path::{Component, Path},
+ sync::atomic::AtomicUsize,
};
-use crossbeam_channel::{never, select, unbounded, Receiver, Sender};
-use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
-use paths::{AbsPath, AbsPathBuf};
-use vfs::loader;
+use crossbeam_channel::{select, unbounded, Receiver, Sender};
+use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
+use paths::{AbsPath, AbsPathBuf, Utf8PathBuf};
+use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator};
+use rustc_hash::FxHashSet;
+use vfs::loader::{self, LoadingProgress};
use walkdir::WalkDir;
#[derive(Debug)]
@@ -59,7 +62,8 @@ type NotifyEvent = notify::Result<notify::Event>;
struct NotifyActor {
sender: loader::Sender,
- watched_entries: Vec<loader::Entry>,
+ watched_file_entries: FxHashSet<AbsPathBuf>,
+ watched_dir_entries: Vec<loader::Directories>,
// Drop order is significant.
watcher: Option<(RecommendedWatcher, Receiver<NotifyEvent>)>,
}
@@ -72,14 +76,22 @@ enum Event {
impl NotifyActor {
fn new(sender: loader::Sender) -> NotifyActor {
- NotifyActor { sender, watched_entries: Vec::new(), watcher: None }
+ NotifyActor {
+ sender,
+ watched_dir_entries: Vec::new(),
+ watched_file_entries: FxHashSet::default(),
+ watcher: None,
+ }
}
fn next_event(&self, receiver: &Receiver<Message>) -> Option<Event> {
- let watcher_receiver = self.watcher.as_ref().map(|(_, receiver)| receiver);
+ let Some((_, watcher_receiver)) = &self.watcher else {
+ return receiver.recv().ok().map(Event::Message);
+ };
+
select! {
recv(receiver) -> it => it.ok().map(Event::Message),
- recv(watcher_receiver.unwrap_or(&never())) -> it => Some(Event::NotifyEvent(it.unwrap())),
+ recv(watcher_receiver) -> it => Some(Event::NotifyEvent(it.unwrap())),
}
}
@@ -94,7 +106,10 @@ impl NotifyActor {
let (watcher_sender, watcher_receiver) = unbounded();
let watcher = log_notify_error(RecommendedWatcher::new(
move |event| {
- watcher_sender.send(event).unwrap();
+ // we don't care about the error. If sending fails that usually
+ // means we were dropped, so unwrapping will just add to the
+ // panic noise.
+ _ = watcher_sender.send(event);
},
Config::default(),
));
@@ -104,35 +119,74 @@ impl NotifyActor {
let config_version = config.version;
let n_total = config.load.len();
+ self.watched_dir_entries.clear();
+ self.watched_file_entries.clear();
+
self.send(loader::Message::Progress {
n_total,
- n_done: None,
+ n_done: LoadingProgress::Started,
config_version,
dir: None,
});
- self.watched_entries.clear();
+ let (entry_tx, entry_rx) = unbounded();
+ let (watch_tx, watch_rx) = unbounded();
+ let processed = AtomicUsize::new(0);
- for (i, entry) in config.load.into_iter().enumerate() {
- let watch = config.watch.contains(&i);
- if watch {
- self.watched_entries.push(entry.clone());
+ config.load.into_par_iter().enumerate().for_each(|(i, entry)| {
+ let do_watch = config.watch.contains(&i);
+ if do_watch {
+ _ = entry_tx.send(entry.clone());
}
- let files =
- self.load_entry(entry, watch, |file| loader::Message::Progress {
- n_total,
- n_done: Some(i),
- dir: Some(file),
- config_version,
- });
+ let files = Self::load_entry(
+ |f| _ = watch_tx.send(f.to_owned()),
+ entry,
+ do_watch,
+ |file| {
+ self.send(loader::Message::Progress {
+ n_total,
+ n_done: LoadingProgress::Progress(
+ processed.load(std::sync::atomic::Ordering::Relaxed),
+ ),
+ dir: Some(file),
+ config_version,
+ });
+ },
+ );
self.send(loader::Message::Loaded { files });
self.send(loader::Message::Progress {
n_total,
- n_done: Some(i + 1),
+ n_done: LoadingProgress::Progress(
+ processed.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1,
+ ),
config_version,
dir: None,
});
+ });
+
+ drop(watch_tx);
+ for path in watch_rx {
+ self.watch(&path);
}
+
+ drop(entry_tx);
+ for entry in entry_rx {
+ match entry {
+ loader::Entry::Files(files) => {
+ self.watched_file_entries.extend(files)
+ }
+ loader::Entry::Directories(dir) => {
+ self.watched_dir_entries.push(dir)
+ }
+ }
+ }
+
+ self.send(loader::Message::Progress {
+ n_total,
+ n_done: LoadingProgress::Finished,
+ config_version,
+ dir: None,
+ });
}
Message::Invalidate(path) => {
let contents = read(path.as_path());
@@ -142,55 +196,69 @@ impl NotifyActor {
},
Event::NotifyEvent(event) => {
if let Some(event) = log_notify_error(event) {
- let files = event
- .paths
- .into_iter()
- .map(|path| AbsPathBuf::try_from(path).unwrap())
- .filter_map(|path| {
- let meta = fs::metadata(&path).ok()?;
- if meta.file_type().is_dir()
- && self
- .watched_entries
- .iter()
- .any(|entry| entry.contains_dir(&path))
- {
- self.watch(path);
- return None;
- }
+ if let EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) =
+ event.kind
+ {
+ let files = event
+ .paths
+ .into_iter()
+ .filter_map(|path| {
+ Some(
+ AbsPathBuf::try_from(
+ Utf8PathBuf::from_path_buf(path).ok()?,
+ )
+ .expect("path is absolute"),
+ )
+ })
+ .filter_map(|path| -> Option<(AbsPathBuf, Option<Vec<u8>>)> {
+ let meta = fs::metadata(&path).ok()?;
+ if meta.file_type().is_dir()
+ && self
+ .watched_dir_entries
+ .iter()
+ .any(|dir| dir.contains_dir(&path))
+ {
+ self.watch(path.as_ref());
+ return None;
+ }
- if !meta.file_type().is_file() {
- return None;
- }
- if !self
- .watched_entries
- .iter()
- .any(|entry| entry.contains_file(&path))
- {
- return None;
- }
+ if !meta.file_type().is_file() {
+ return None;
+ }
- let contents = read(&path);
- Some((path, contents))
- })
- .collect();
- self.send(loader::Message::Changed { files });
+ if !(self.watched_file_entries.contains(&path)
+ || self
+ .watched_dir_entries
+ .iter()
+ .any(|dir| dir.contains_file(&path)))
+ {
+ return None;
+ }
+
+ let contents = read(&path);
+ Some((path, contents))
+ })
+ .collect();
+ self.send(loader::Message::Changed { files });
+ }
}
}
}
}
}
+
fn load_entry(
- &mut self,
+ mut watch: impl FnMut(&Path),
entry: loader::Entry,
- watch: bool,
- make_message: impl Fn(AbsPathBuf) -> loader::Message,
+ do_watch: bool,
+ send_message: impl Fn(AbsPathBuf),
) -> Vec<(AbsPathBuf, Option<Vec<u8>>)> {
match entry {
loader::Entry::Files(files) => files
.into_iter()
.map(|file| {
- if watch {
- self.watch(file.clone());
+ if do_watch {
+ watch(file.as_ref());
}
let contents = read(file.as_path());
(file, contents)
@@ -200,7 +268,7 @@ impl NotifyActor {
let mut res = Vec::new();
for root in &dirs.include {
- self.send(make_message(root.clone()));
+ send_message(root.clone());
let walkdir =
WalkDir::new(root).follow_links(true).into_iter().filter_entry(|entry| {
if !entry.file_type().is_dir() {
@@ -208,7 +276,7 @@ impl NotifyActor {
}
let path = entry.path();
- if path_is_parent_symlink(path) {
+ if path_might_be_cyclic(path) {
return false;
}
@@ -220,12 +288,15 @@ impl NotifyActor {
let depth = entry.depth();
let is_dir = entry.file_type().is_dir();
let is_file = entry.file_type().is_file();
- let abs_path = AbsPathBuf::try_from(entry.into_path()).ok()?;
+ let abs_path = AbsPathBuf::try_from(
+ Utf8PathBuf::from_path_buf(entry.into_path()).ok()?,
+ )
+ .ok()?;
if depth < 2 && is_dir {
- self.send(make_message(abs_path.clone()));
+ send_message(abs_path.clone());
}
- if is_dir && watch {
- self.watch(abs_path.clone());
+ if is_dir && do_watch {
+ watch(abs_path.as_ref());
}
if !is_file {
return None;
@@ -247,13 +318,15 @@ impl NotifyActor {
}
}
- fn watch(&mut self, path: AbsPathBuf) {
+ fn watch(&mut self, path: &Path) {
if let Some((watcher, _)) = &mut self.watcher {
- log_notify_error(watcher.watch(path.as_ref(), RecursiveMode::NonRecursive));
+ log_notify_error(watcher.watch(path, RecursiveMode::NonRecursive));
}
}
- fn send(&mut self, msg: loader::Message) {
- (self.sender)(msg);
+
+ #[track_caller]
+ fn send(&self, msg: loader::Message) {
+ self.sender.send(msg).unwrap();
}
}
@@ -271,7 +344,7 @@ fn log_notify_error<T>(res: notify::Result<T>) -> Option<T> {
/// heuristic is not sufficient to catch all symlink cycles (it's
/// possible to construct cycle using two or more symlinks), but it
/// catches common cases.
-fn path_is_parent_symlink(path: &Path) -> bool {
+fn path_might_be_cyclic(path: &Path) -> bool {
let Ok(destination) = std::fs::read_link(path) else {
return false;
};