Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'helix-lsp/src/transport.rs')
| -rw-r--r-- | helix-lsp/src/transport.rs | 142 |
1 files changed, 50 insertions, 92 deletions
diff --git a/helix-lsp/src/transport.rs b/helix-lsp/src/transport.rs index fa4966c4..3e3e06ee 100644 --- a/helix-lsp/src/transport.rs +++ b/helix-lsp/src/transport.rs @@ -1,8 +1,4 @@ -use crate::{ - jsonrpc, - lsp::{self, notification::Notification as _}, - Error, LanguageServerId, Result, -}; +use crate::{jsonrpc, Error, Result}; use anyhow::Context; use log::{error, info}; use serde::{Deserialize, Serialize}; @@ -41,8 +37,7 @@ enum ServerMessage { #[derive(Debug)] pub struct Transport { - id: LanguageServerId, - name: String, + id: usize, pending_requests: Mutex<HashMap<jsonrpc::Id, Sender<Result<Value>>>>, } @@ -51,10 +46,9 @@ impl Transport { server_stdout: BufReader<ChildStdout>, server_stdin: BufWriter<ChildStdin>, server_stderr: BufReader<ChildStderr>, - id: LanguageServerId, - name: String, + id: usize, ) -> ( - UnboundedReceiver<(LanguageServerId, jsonrpc::Call)>, + UnboundedReceiver<(usize, jsonrpc::Call)>, UnboundedSender<Payload>, Arc<Notify>, ) { @@ -64,7 +58,6 @@ impl Transport { let transport = Self { id, - name, pending_requests: Mutex::new(HashMap::default()), }; @@ -90,15 +83,13 @@ impl Transport { async fn recv_server_message( reader: &mut (impl AsyncBufRead + Unpin + Send), buffer: &mut String, - content: &mut Vec<u8>, - language_server_name: &str, ) -> Result<ServerMessage> { let mut content_length = None; loop { - buffer.clear(); + buffer.truncate(0); if reader.read_line(buffer).await? == 0 { return Err(Error::StreamClosed); - } + }; // debug!("<- header {:?}", buffer); @@ -127,32 +118,29 @@ impl Transport { } let content_length = content_length.context("missing content length")?; - content.resize(content_length, 0); - reader.read_exact(content).await?; - let msg = std::str::from_utf8(content).context("invalid utf8 from server")?; - info!("{language_server_name} <- {msg}"); + //TODO: reuse vector + let mut content = vec![0; content_length]; + reader.read_exact(&mut content).await?; + let msg = std::str::from_utf8(&content).context("invalid utf8 from server")?; - // NOTE: We avoid using `?` here, since it would return early on error - // and skip clearing `content`. By returning the result directly instead, - // we ensure `content.clear()` is always called. - let output = sonic_rs::from_slice(content).map_err(Into::into); + info!("<- {}", msg); - content.clear(); + // try parsing as output (server response) or call (server request) + let output: serde_json::Result<ServerMessage> = serde_json::from_str(msg); - output + Ok(output?) } async fn recv_server_error( err: &mut (impl AsyncBufRead + Unpin + Send), buffer: &mut String, - language_server_name: &str, ) -> Result<()> { buffer.truncate(0); if err.read_line(buffer).await? == 0 { return Err(Error::StreamClosed); }; - error!("{language_server_name} err <- {buffer:?}"); + error!("err <- {:?}", buffer); Ok(()) } @@ -174,17 +162,15 @@ impl Transport { Payload::Notification(value) => serde_json::to_string(&value)?, Payload::Response(error) => serde_json::to_string(&error)?, }; - self.send_string_to_server(server_stdin, json, &self.name) - .await + self.send_string_to_server(server_stdin, json).await } async fn send_string_to_server( &self, server_stdin: &mut BufWriter<ChildStdin>, request: String, - language_server_name: &str, ) -> Result<()> { - info!("{language_server_name} -> {request}"); + info!("-> {}", request); // send the headers server_stdin @@ -201,15 +187,11 @@ impl Transport { async fn process_server_message( &self, - client_tx: &UnboundedSender<(LanguageServerId, jsonrpc::Call)>, + client_tx: &UnboundedSender<(usize, jsonrpc::Call)>, msg: ServerMessage, - language_server_name: &str, ) -> Result<()> { match msg { - ServerMessage::Output(output) => { - self.process_request_response(output, language_server_name) - .await? - } + ServerMessage::Output(output) => self.process_request_response(output).await?, ServerMessage::Call(call) => { client_tx .send((self.id, call)) @@ -220,15 +202,14 @@ impl Transport { Ok(()) } - async fn process_request_response( - &self, - output: jsonrpc::Output, - language_server_name: &str, - ) -> Result<()> { + async fn process_request_response(&self, output: jsonrpc::Output) -> Result<()> { let (id, result) = match output { - jsonrpc::Output::Success(jsonrpc::Success { id, result, .. }) => (id, Ok(result)), + jsonrpc::Output::Success(jsonrpc::Success { id, result, .. }) => { + info!("<- {}", result); + (id, Ok(result)) + } jsonrpc::Output::Failure(jsonrpc::Failure { id, error, .. }) => { - error!("{language_server_name} <- {error}"); + error!("<- {}", error); (id, Err(error.into())) } }; @@ -255,39 +236,21 @@ impl Transport { async fn recv( transport: Arc<Self>, mut server_stdout: BufReader<ChildStdout>, - client_tx: UnboundedSender<(LanguageServerId, jsonrpc::Call)>, + client_tx: UnboundedSender<(usize, jsonrpc::Call)>, ) { let mut recv_buffer = String::new(); - let mut content_buffer = Vec::new(); loop { - match Self::recv_server_message( - &mut server_stdout, - &mut recv_buffer, - &mut content_buffer, - &transport.name, - ) - .await - { + match Self::recv_server_message(&mut server_stdout, &mut recv_buffer).await { Ok(msg) => { - match transport - .process_server_message(&client_tx, msg, &transport.name) - .await - { + match transport.process_server_message(&client_tx, msg).await { Ok(_) => {} Err(err) => { - error!("{} err: <- {err:?}", transport.name); + error!("err: <- {:?}", err); break; } }; } - Err(err) => { - if !matches!(err, Error::StreamClosed) { - error!( - "Exiting {} after unexpected error: {err:?}", - &transport.name - ); - } - + Err(Error::StreamClosed) => { // Close any outstanding requests. for (id, tx) in transport.pending_requests.lock().await.drain() { match tx.send(Err(Error::StreamClosed)).await { @@ -299,14 +262,15 @@ impl Transport { } // Hack: inject a terminated notification so we trigger code that needs to happen after exit + use lsp_types::notification::Notification as _; let notification = ServerMessage::Call(jsonrpc::Call::Notification(jsonrpc::Notification { jsonrpc: None, - method: lsp::notification::Exit::METHOD.to_string(), + method: lsp_types::notification::Exit::METHOD.to_string(), params: jsonrpc::Params::None, })); match transport - .process_server_message(&client_tx, notification, &transport.name) + .process_server_message(&client_tx, notification) .await { Ok(_) => {} @@ -316,19 +280,21 @@ impl Transport { } break; } + Err(err) => { + error!("err: <- {:?}", err); + break; + } } } } - async fn err(transport: Arc<Self>, mut server_stderr: BufReader<ChildStderr>) { + async fn err(_transport: Arc<Self>, mut server_stderr: BufReader<ChildStderr>) { let mut recv_buffer = String::new(); loop { - match Self::recv_server_error(&mut server_stderr, &mut recv_buffer, &transport.name) - .await - { + match Self::recv_server_error(&mut server_stderr, &mut recv_buffer).await { Ok(_) => {} Err(err) => { - error!("{} err: <- {err:?}", transport.name); + error!("err: <- {:?}", err); break; } } @@ -338,7 +304,7 @@ impl Transport { async fn send( transport: Arc<Self>, mut server_stdin: BufWriter<ChildStdin>, - client_tx: UnboundedSender<(LanguageServerId, jsonrpc::Call)>, + client_tx: UnboundedSender<(usize, jsonrpc::Call)>, mut client_rx: UnboundedReceiver<Payload>, initialize_notify: Arc<Notify>, ) { @@ -347,8 +313,8 @@ impl Transport { // Determine if a message is allowed to be sent early fn is_initialize(payload: &Payload) -> bool { - use lsp::{ - notification::Initialized, + use lsp_types::{ + notification::{Initialized, Notification}, request::{Initialize, Request}, }; match payload { @@ -365,11 +331,6 @@ impl Transport { } } - fn is_shutdown(payload: &Payload) -> bool { - use lsp::request::{Request, Shutdown}; - matches!(payload, Payload::Request { value: jsonrpc::MethodCall { method, .. }, .. } if method == Shutdown::METHOD) - } - // TODO: events that use capabilities need to do the right thing loop { @@ -379,18 +340,18 @@ impl Transport { // server successfully initialized is_pending = false; + use lsp_types::notification::Notification; // Hack: inject an initialized notification so we trigger code that needs to happen after init let notification = ServerMessage::Call(jsonrpc::Call::Notification(jsonrpc::Notification { jsonrpc: None, - method: lsp::notification::Initialized::METHOD.to_string(), + method: lsp_types::notification::Initialized::METHOD.to_string(), params: jsonrpc::Params::None, })); - let language_server_name = &transport.name; - match transport.process_server_message(&client_tx, notification, language_server_name).await { + match transport.process_server_message(&client_tx, notification).await { Ok(_) => {} Err(err) => { - error!("{language_server_name} err: <- {err:?}"); + error!("err: <- {:?}", err); } } @@ -400,17 +361,14 @@ impl Transport { match transport.send_payload_to_server(&mut server_stdin, msg).await { Ok(_) => {} Err(err) => { - error!("{language_server_name} err: <- {err:?}"); + error!("err: <- {:?}", err); } } } } msg = client_rx.recv() => { if let Some(msg) = msg { - if is_pending && is_shutdown(&msg) { - log::info!("Language server not initialized, shutting down"); - break; - } else if is_pending && !is_initialize(&msg) { + if is_pending && !is_initialize(&msg) { // ignore notifications if let Payload::Notification(_) = msg { continue; @@ -422,7 +380,7 @@ impl Transport { match transport.send_payload_to_server(&mut server_stdin, msg).await { Ok(_) => {} Err(err) => { - error!("{} err: <- {err:?}", transport.name); + error!("err: <- {:?}", err); } } } |