use std::collections::hash_map; use std::mem::take; use std::path::{is_separator, Path}; use std::{collections::HashMap, path::PathBuf, sync::Weak}; use globset::{Glob, GlobSet}; use helix_core::file_watcher::{EventType, Events, FileSystemDidChange}; use helix_event::register_hook; use helix_lsp_types::WatchKind; use tokio::sync::mpsc; use crate::{lsp, Client, LanguageServerId}; enum Event { /// file written by helix, special cased to not wait on FS FileWritten { path: PathBuf, }, FileWatcher(Events), Register { client: Weak, registration_id: String, options: lsp::DidChangeWatchedFilesRegistrationOptions, }, Unregister { client_id: LanguageServerId, registration_id: String, }, RemoveClient { client_id: LanguageServerId, }, } struct ClientState { client: Weak, registerations: HashMap, pending: Vec, } #[derive(Debug, Clone)] struct Interest { glob: Glob, server: LanguageServerId, id: u32, flags: WatchKind, } struct State { clients: HashMap, glob_matcher: GlobSet, interest: Vec, // used for matching to avoid reallocation candidates: Vec, } impl State { fn notify<'a>(&mut self, events: impl Iterator + Clone) { for (path, ty) in events { let (interest_kind, notification_type) = match ty { EventType::Create => (WatchKind::Create, lsp::FileChangeType::CREATED), EventType::Delete => (WatchKind::Delete, lsp::FileChangeType::DELETED), EventType::Modified => (WatchKind::Change, lsp::FileChangeType::CHANGED), EventType::Tempfile => continue, }; self.glob_matcher.matches_into(path, &mut self.candidates); for interest in self.candidates.drain(..) { let interest = &self.interest[interest]; if !interest.flags.contains(interest_kind) { continue; } let Ok(uri) = lsp::Url::from_file_path(path) else { continue; }; let event = lsp::FileEvent { uri, typ: notification_type, }; self.clients .get_mut(&interest.server) .unwrap() .pending .push(event); } } for client_state in self.clients.values_mut() { if client_state.pending.is_empty() { continue; } let Some(client) = client_state.client.upgrade() else { continue; }; log::debug!( "Sending didChangeWatchedFiles notification to client '{}'", client.name() ); client.did_change_watched_files(take(&mut client_state.pending)); } } fn purge_client(&mut self, id: LanguageServerId) { self.clients.remove(&id); let interest = self .interest .iter() .filter(|it| it.server != id) .cloned() .collect(); self.rebuild_globmatcher(interest); } fn rebuild_globmatcher(&mut self, interest: Vec) { let mut builder = GlobSet::builder(); for interest in &interest { builder.add(interest.glob.clone()); } match builder.build() { Ok(glob_matcher) => { self.glob_matcher = glob_matcher; self.interest = interest; } Err(err) => { log::error!("failde to build glob matcher for file watching: ({err})",); } } } } impl Default for State { fn default() -> State { State { clients: Default::default(), glob_matcher: Default::default(), interest: Default::default(), candidates: Vec::with_capacity(32), } } } /// The Handler uses a dedicated tokio task to respond to file change events by /// forwarding changes to LSPs that have registered for notifications with a /// matching glob. /// /// When an LSP registers for the DidChangeWatchedFiles notification, the /// Handler is notified by sending the registration details in addition to a /// weak reference to the LSP client. This is done so that the Handler can have /// access to the client without preventing the client from being dropped if it /// is closed and the Handler isn't properly notified. #[derive(Clone, Debug)] pub struct Handler { tx: mpsc::UnboundedSender, } impl Default for Handler { fn default() -> Self { Self::new() } } impl Handler { pub fn new() -> Self { let (tx, rx) = mpsc::unbounded_channel(); tokio::spawn(Self::run(rx)); let tx_ = tx.clone(); register_hook!(move |event: &mut FileSystemDidChange| { let _ = tx_.send(Event::FileWatcher(event.fs_events.clone())); Ok(()) }); Self { tx } } pub fn register( &self, client: Weak, registration_id: String, options: lsp::DidChangeWatchedFilesRegistrationOptions, ) { let _ = self.tx.send(Event::Register { client, registration_id, options, }); } pub fn unregister(&self, client_id: LanguageServerId, registration_id: String) { let _ = self.tx.send(Event::Unregister { client_id, registration_id, }); } pub fn file_changed(&self, path: PathBuf) { let _ = self.tx.send(Event::FileWritten { path }); } pub fn remove_client(&self, client_id: LanguageServerId) { let _ = self.tx.send(Event::RemoveClient { client_id }); } async fn run(mut rx: mpsc::UnboundedReceiver) { let mut state = State::default(); while let Some(event) = rx.recv().await { match event { Event::FileWatcher(events) => { let events = events .iter() .map(|event| (event.path.as_std_path(), event.ty)); state.notify(events); } Event::FileWritten { path } => { log::debug!("Received file event for {:?}", &path); state.notify([(&*path, EventType::Modified)].iter().cloned()); } Event::Register { client, registration_id, options: ops, } => { let Some(client_) = client.upgrade() else { continue; }; log::debug!( "Registering didChangeWatchedFiles for client '{}' with id '{}'", client_.name(), registration_id ); if !state .clients .get(&client_.id()) .is_some_and(|state| !state.client.ptr_eq(&client)) { state.purge_client(client_.id()); } let entry = state .clients .entry(client_.id()) .or_insert_with(|| ClientState { client: client.clone(), registerations: HashMap::with_capacity(8), pending: Vec::with_capacity(32), }); entry.client = client; let next_id = u32::try_from(entry.registerations.len()).unwrap(); let (mut interest, id) = match entry.registerations.entry(registration_id) { hash_map::Entry::Occupied(entry) => { let id = *entry.get(); let mut interest = Vec::with_capacity(state.interest.len()); interest.extend( state .interest .iter() .filter(|it| it.server != client_.id() || it.id != id) .cloned(), ); (interest, id) } hash_map::Entry::Vacant(entry) => { entry.insert(next_id); (state.interest.clone(), next_id) } }; for watcher in ops.watchers { if watcher.kind.is_some_and(|flags| flags.is_empty()) { continue; } let glob = match watcher.glob_pattern { helix_lsp_types::GlobPattern::String(pattern) => pattern, helix_lsp_types::GlobPattern::Relative(relative_pattern) => { let base_url = match relative_pattern.base_uri { helix_lsp_types::OneOf::Left(folder) => folder.uri, helix_lsp_types::OneOf::Right(url) => url, }; let Ok(mut base_dir) = base_url.to_file_path() else { log::error!( "{} provided invalid URL for watching '{base_url}'", client_.name(), ); continue; }; if let Ok(dir) = base_dir.canonicalize() { base_dir = dir } let Ok(mut base_dir) = base_dir.into_os_string().into_string() else { log::error!( "{} provided invalid URL for watching '{base_url}' (must be valid utf-8)", client_.name(), ); continue; }; if !base_dir.chars().next_back().is_some_and(is_separator) { base_dir.push('/'); } base_dir.push_str(&relative_pattern.pattern); base_dir } }; match Glob::new(&glob) { Ok(glob) => { interest.push(Interest { glob, server: client_.id(), id, flags: watcher.kind.unwrap_or(WatchKind::all()), }); } Err(err) => { log::error!( "{} provided invalid glob for watching '{glob}': ({err})", client_.name(), ); } } } state.rebuild_globmatcher(interest); } Event::Unregister { client_id, registration_id, } => { let Some(client_state) = state.clients.get_mut(&client_id) else { return; }; let Some(id) = client_state.registerations.remove(&*registration_id) else { return; }; let interest = state .interest .iter() .filter(|it| it.server != client_id || it.id != id) .cloned() .collect(); state.rebuild_globmatcher(interest); } Event::RemoveClient { client_id } => { log::debug!("Removing LSP client: {client_id}"); state.purge_client(client_id); } } } } }