Unnamed repository; edit this file 'description' to name the repository.
LSP: Remove future wrapper from `Client::notify`, `Client::reply`
Previously LSP notifications were sent within a future and most callers used a `tokio::spawn` to send the notification, silently discarding any failures like problems serializing parameters or sending on the channel. It's possible that tokio could schedule futures out of intended order though which could cause notifications where order is important, like document synchronization, to become partially shuffled. This change removes the future wrapper and logs all internal failures. Also included in this commit is the same change for `Client::reply` which was also unnecessarily wrapped in a future. Co-authored-by: Pascal Kuthe <[email protected]>
Michael Davis 2025-02-02
parent 0ea401d · commit 5532ef3
-rw-r--r--helix-lsp/src/client.rs141
-rw-r--r--helix-lsp/src/file_event.rs18
-rw-r--r--helix-lsp/src/lib.rs12
-rw-r--r--helix-term/src/application.rs14
-rw-r--r--helix-view/src/document.rs15
-rw-r--r--helix-view/src/editor.rs15
6 files changed, 93 insertions, 122 deletions
diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs
index f0e42ee4..31742804 100644
--- a/helix-lsp/src/client.rs
+++ b/helix-lsp/src/client.rs
@@ -170,7 +170,7 @@ impl Client {
// and that we can therefore reuse the client (but are done now)
return;
}
- tokio::spawn(self.did_change_workspace(vec![workspace_for_uri(root_uri)], Vec::new()));
+ self.did_change_workspace(vec![workspace_for_uri(root_uri)], Vec::new())
}
#[allow(clippy::type_complexity, clippy::too_many_arguments)]
@@ -456,29 +456,36 @@ impl Client {
}
/// Send a RPC notification to the language server.
- pub fn notify<R: lsp::notification::Notification>(
- &self,
- params: R::Params,
- ) -> impl Future<Output = Result<()>>
+ pub fn notify<R: lsp::notification::Notification>(&self, params: R::Params)
where
R::Params: serde::Serialize,
{
let server_tx = self.server_tx.clone();
- async move {
- let params = serde_json::to_value(params)?;
-
- let notification = jsonrpc::Notification {
- jsonrpc: Some(jsonrpc::Version::V2),
- method: R::METHOD.to_string(),
- params: Self::value_into_params(params),
- };
+ let params = match serde_json::to_value(params) {
+ Ok(params) => params,
+ Err(err) => {
+ log::error!(
+ "Failed to serialize params for notification '{}' for server '{}': {err}",
+ R::METHOD,
+ self.name,
+ );
+ return;
+ }
+ };
- server_tx
- .send(Payload::Notification(notification))
- .map_err(|e| Error::Other(e.into()))?;
+ let notification = jsonrpc::Notification {
+ jsonrpc: Some(jsonrpc::Version::V2),
+ method: R::METHOD.to_string(),
+ params: Self::value_into_params(params),
+ };
- Ok(())
+ if let Err(err) = server_tx.send(Payload::Notification(notification)) {
+ log::error!(
+ "Failed to send notification '{}' to server '{}': {err}",
+ R::METHOD,
+ self.name
+ );
}
}
@@ -487,31 +494,29 @@ impl Client {
&self,
id: jsonrpc::Id,
result: core::result::Result<Value, jsonrpc::Error>,
- ) -> impl Future<Output = Result<()>> {
+ ) -> Result<()> {
use jsonrpc::{Failure, Output, Success, Version};
let server_tx = self.server_tx.clone();
- async move {
- let output = match result {
- Ok(result) => Output::Success(Success {
- jsonrpc: Some(Version::V2),
- id,
- result: serde_json::to_value(result)?,
- }),
- Err(error) => Output::Failure(Failure {
- jsonrpc: Some(Version::V2),
- id,
- error,
- }),
- };
+ let output = match result {
+ Ok(result) => Output::Success(Success {
+ jsonrpc: Some(Version::V2),
+ id,
+ result,
+ }),
+ Err(error) => Output::Failure(Failure {
+ jsonrpc: Some(Version::V2),
+ id,
+ error,
+ }),
+ };
- server_tx
- .send(Payload::Response(output))
- .map_err(|e| Error::Other(e.into()))?;
+ server_tx
+ .send(Payload::Response(output))
+ .map_err(|e| Error::Other(e.into()))?;
- Ok(())
- }
+ Ok(())
}
// -------------------------------------------------------------------------------------------
@@ -693,7 +698,7 @@ impl Client {
self.request::<lsp::request::Shutdown>(()).await
}
- pub fn exit(&self) -> impl Future<Output = Result<()>> {
+ pub fn exit(&self) {
self.notify::<lsp::notification::Exit>(())
}
@@ -701,7 +706,8 @@ impl Client {
/// early if server responds with an error.
pub async fn shutdown_and_exit(&self) -> Result<()> {
self.shutdown().await?;
- self.exit().await
+ self.exit();
+ Ok(())
}
/// Forcefully shuts down the language server ignoring any errors.
@@ -709,24 +715,21 @@ impl Client {
if let Err(e) = self.shutdown().await {
log::warn!("language server failed to terminate gracefully - {}", e);
}
- self.exit().await
+ self.exit();
+ Ok(())
}
// -------------------------------------------------------------------------------------------
// Workspace
// -------------------------------------------------------------------------------------------
- pub fn did_change_configuration(&self, settings: Value) -> impl Future<Output = Result<()>> {
+ pub fn did_change_configuration(&self, settings: Value) {
self.notify::<lsp::notification::DidChangeConfiguration>(
lsp::DidChangeConfigurationParams { settings },
)
}
- pub fn did_change_workspace(
- &self,
- added: Vec<WorkspaceFolder>,
- removed: Vec<WorkspaceFolder>,
- ) -> impl Future<Output = Result<()>> {
+ pub fn did_change_workspace(&self, added: Vec<WorkspaceFolder>, removed: Vec<WorkspaceFolder>) {
self.notify::<DidChangeWorkspaceFolders>(DidChangeWorkspaceFoldersParams {
event: WorkspaceFoldersChangeEvent { added, removed },
})
@@ -766,12 +769,7 @@ impl Client {
})
}
- pub fn did_rename(
- &self,
- old_path: &Path,
- new_path: &Path,
- is_dir: bool,
- ) -> Option<impl Future<Output = std::result::Result<(), Error>>> {
+ pub fn did_rename(&self, old_path: &Path, new_path: &Path, is_dir: bool) -> Option<()> {
let capabilities = self.file_operations_intests();
if !capabilities.did_rename.has_interest(new_path, is_dir) {
return None;
@@ -789,7 +787,8 @@ impl Client {
old_uri: url_from_path(old_path)?,
new_uri: url_from_path(new_path)?,
}];
- Some(self.notify::<lsp::notification::DidRenameFiles>(lsp::RenameFilesParams { files }))
+ self.notify::<lsp::notification::DidRenameFiles>(lsp::RenameFilesParams { files });
+ Some(())
}
// -------------------------------------------------------------------------------------------
@@ -802,7 +801,7 @@ impl Client {
version: i32,
doc: &Rope,
language_id: String,
- ) -> impl Future<Output = Result<()>> {
+ ) {
self.notify::<lsp::notification::DidOpenTextDocument>(lsp::DidOpenTextDocumentParams {
text_document: lsp::TextDocumentItem {
uri,
@@ -929,7 +928,7 @@ impl Client {
old_text: &Rope,
new_text: &Rope,
changes: &ChangeSet,
- ) -> Option<impl Future<Output = Result<()>>> {
+ ) -> Option<()> {
let capabilities = self.capabilities.get().unwrap();
// Return early if the server does not support document sync.
@@ -961,18 +960,14 @@ impl Client {
kind => unimplemented!("{:?}", kind),
};
- Some(self.notify::<lsp::notification::DidChangeTextDocument>(
- lsp::DidChangeTextDocumentParams {
- text_document,
- content_changes: changes,
- },
- ))
+ self.notify::<lsp::notification::DidChangeTextDocument>(lsp::DidChangeTextDocumentParams {
+ text_document,
+ content_changes: changes,
+ });
+ Some(())
}
- pub fn text_document_did_close(
- &self,
- text_document: lsp::TextDocumentIdentifier,
- ) -> impl Future<Output = Result<()>> {
+ pub fn text_document_did_close(&self, text_document: lsp::TextDocumentIdentifier) {
self.notify::<lsp::notification::DidCloseTextDocument>(lsp::DidCloseTextDocumentParams {
text_document,
})
@@ -984,7 +979,7 @@ impl Client {
&self,
text_document: lsp::TextDocumentIdentifier,
text: &Rope,
- ) -> Option<impl Future<Output = Result<()>>> {
+ ) -> Option<()> {
let capabilities = self.capabilities.get().unwrap();
let include_text = match &capabilities.text_document_sync.as_ref()? {
@@ -1002,12 +997,11 @@ impl Client {
lsp::TextDocumentSyncCapability::Kind(..) => false,
};
- Some(self.notify::<lsp::notification::DidSaveTextDocument>(
- lsp::DidSaveTextDocumentParams {
- text_document,
- text: include_text.then_some(text.into()),
- },
- ))
+ self.notify::<lsp::notification::DidSaveTextDocument>(lsp::DidSaveTextDocumentParams {
+ text_document,
+ text: include_text.then_some(text.into()),
+ });
+ Some(())
}
pub fn completion(
@@ -1540,10 +1534,7 @@ impl Client {
Some(self.call::<lsp::request::ExecuteCommand>(params))
}
- pub fn did_change_watched_files(
- &self,
- changes: Vec<lsp::FileEvent>,
- ) -> impl Future<Output = std::result::Result<(), Error>> {
+ pub fn did_change_watched_files(&self, changes: Vec<lsp::FileEvent>) {
self.notify::<lsp::notification::DidChangeWatchedFiles>(lsp::DidChangeWatchedFilesParams {
changes,
})
diff --git a/helix-lsp/src/file_event.rs b/helix-lsp/src/file_event.rs
index c7297d67..5e7f8ca6 100644
--- a/helix-lsp/src/file_event.rs
+++ b/helix-lsp/src/file_event.rs
@@ -113,17 +113,13 @@ impl Handler {
"Sending didChangeWatchedFiles notification to client '{}'",
client.name()
);
- if let Err(err) = crate::block_on(client
- .did_change_watched_files(vec![lsp::FileEvent {
- uri,
- // We currently always send the CHANGED state
- // since we don't actually have more context at
- // the moment.
- typ: lsp::FileChangeType::CHANGED,
- }]))
- {
- log::warn!("Failed to send didChangeWatchedFiles notification to client: {err}");
- }
+ client.did_change_watched_files(vec![lsp::FileEvent {
+ uri,
+ // We currently always send the CHANGED state
+ // since we don't actually have more context at
+ // the moment.
+ typ: lsp::FileChangeType::CHANGED,
+ }]);
true
});
}
diff --git a/helix-lsp/src/lib.rs b/helix-lsp/src/lib.rs
index 7ece350a..5eeea81c 100644
--- a/helix-lsp/src/lib.rs
+++ b/helix-lsp/src/lib.rs
@@ -900,17 +900,7 @@ fn start_client(
}
// next up, notify<initialized>
- let notification_result = _client
- .notify::<lsp::notification::Initialized>(lsp::InitializedParams {})
- .await;
-
- if let Err(e) = notification_result {
- log::error!(
- "failed to notify language server of its initialization: {}",
- e
- );
- return;
- }
+ _client.notify::<lsp::notification::Initialized>(lsp::InitializedParams {});
initialize_notify.notify_one();
});
diff --git a/helix-term/src/application.rs b/helix-term/src/application.rs
index 00aa7390..ba74a817 100644
--- a/helix-term/src/application.rs
+++ b/helix-term/src/application.rs
@@ -722,7 +722,7 @@ impl Application {
// This might not be required by the spec but Neovim does this as well, so it's
// probably a good idea for compatibility.
if let Some(config) = language_server.config() {
- tokio::spawn(language_server.did_change_configuration(config.clone()));
+ language_server.did_change_configuration(config.clone());
}
let docs = self
@@ -740,12 +740,12 @@ impl Application {
let language_id =
doc.language_id().map(ToOwned::to_owned).unwrap_or_default();
- tokio::spawn(language_server.text_document_did_open(
+ language_server.text_document_did_open(
url,
doc.version(),
doc.text(),
language_id,
- ));
+ );
}
}
Notification::PublishDiagnostics(mut params) => {
@@ -1131,7 +1131,13 @@ impl Application {
}
};
- tokio::spawn(language_server!().reply(id, reply));
+ let language_server = language_server!();
+ if let Err(err) = language_server.reply(id.clone(), reply) {
+ log::error!(
+ "Failed to send reply to server '{}' request {id}: {err}",
+ language_server.name()
+ );
+ }
}
Call::Invalid { id } => log::error!("LSP invalid method call id={:?}", id),
}
diff --git a/helix-view/src/document.rs b/helix-view/src/document.rs
index 277b582e..06a708f0 100644
--- a/helix-view/src/document.rs
+++ b/helix-view/src/document.rs
@@ -1056,13 +1056,8 @@ impl Document {
if !language_server.is_initialized() {
continue;
}
- if let Some(notification) = identifier
- .clone()
- .and_then(|id| language_server.text_document_did_save(id, &text))
- {
- if let Err(err) = notification.await {
- log::error!("Failed to send textDocument/didSave: {err}");
- }
+ if let Some(id) = identifier.clone() {
+ language_server.text_document_did_save(id, &text);
}
}
@@ -1462,16 +1457,12 @@ impl Document {
// TODO: move to hook
// emit lsp notification
for language_server in self.language_servers() {
- let notify = language_server.text_document_did_change(
+ let _ = language_server.text_document_did_change(
self.versioned_identifier(),
&old_doc,
self.text(),
changes,
);
-
- if let Some(notify) = notify {
- tokio::spawn(notify);
- }
}
}
diff --git a/helix-view/src/editor.rs b/helix-view/src/editor.rs
index 00fe719d..739dcfb4 100644
--- a/helix-view/src/editor.rs
+++ b/helix-view/src/editor.rs
@@ -1416,9 +1416,7 @@ impl Editor {
if !ls.is_initialized() {
continue;
}
- if let Some(notification) = ls.did_rename(old_path, &new_path, is_dir) {
- tokio::spawn(notification);
- };
+ ls.did_rename(old_path, &new_path, is_dir);
}
self.language_servers
.file_event_handler
@@ -1441,7 +1439,7 @@ impl Editor {
}
// if we are open in LSPs send did_close notification
for language_server in doc.language_servers() {
- tokio::spawn(language_server.text_document_did_close(doc.identifier()));
+ language_server.text_document_did_close(doc.identifier());
}
}
// we need to clear the list of language servers here so that
@@ -1522,7 +1520,7 @@ impl Editor {
});
for (_, language_server) in doc_language_servers_not_in_registry {
- tokio::spawn(language_server.text_document_did_close(doc.identifier()));
+ language_server.text_document_did_close(doc.identifier());
}
let language_servers_not_in_doc = language_servers.iter().filter(|(name, ls)| {
@@ -1533,12 +1531,12 @@ impl Editor {
for (_, language_server) in language_servers_not_in_doc {
// TODO: this now races with on_init code if the init happens too quickly
- tokio::spawn(language_server.text_document_did_open(
+ language_server.text_document_did_open(
doc_url.clone(),
doc.version(),
doc.text(),
language_id.clone(),
- ));
+ );
}
doc.language_servers = language_servers;
@@ -1797,8 +1795,7 @@ impl Editor {
self.saves.remove(&doc_id);
for language_server in doc.language_servers() {
- // TODO: track error
- tokio::spawn(language_server.text_document_did_close(doc.identifier()));
+ language_server.text_document_did_close(doc.identifier());
}
enum Action {