Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'helix-dap/src/transport.rs')
| -rw-r--r-- | helix-dap/src/transport.rs | 106 |
1 files changed, 36 insertions, 70 deletions
diff --git a/helix-dap/src/transport.rs b/helix-dap/src/transport.rs index fdd60226..0f646b6a 100644 --- a/helix-dap/src/transport.rs +++ b/helix-dap/src/transport.rs @@ -1,10 +1,10 @@ -use crate::{registry::DebugAdapterId, Error, Result}; +use crate::{Error, Event, Result}; use anyhow::Context; use log::{error, info, warn}; use serde::{Deserialize, Serialize}; use serde_json::Value; +use std::collections::HashMap; use std::sync::Arc; -use std::{collections::HashMap, fmt::Debug}; use tokio::{ io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWrite, AsyncWriteExt}, sync::{ @@ -32,17 +32,11 @@ pub struct Response { pub body: Option<Value>, } -#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)] -pub struct Event { - pub event: String, - pub body: Option<Value>, -} - #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(tag = "type", rename_all = "camelCase")] pub enum Payload { // type = "event" - Event(Event), + Event(Box<Event>), // type = "response" Response(Response), // type = "request" @@ -52,7 +46,7 @@ pub enum Payload { #[derive(Debug)] pub struct Transport { #[allow(unused)] - id: DebugAdapterId, + id: usize, pending_requests: Mutex<HashMap<u64, Sender<Result<Response>>>>, } @@ -61,7 +55,7 @@ impl Transport { server_stdout: Box<dyn AsyncBufRead + Unpin + Send>, server_stdin: Box<dyn AsyncWrite + Unpin + Send>, server_stderr: Option<Box<dyn AsyncBufRead + Unpin + Send>>, - id: DebugAdapterId, + id: usize, ) -> (UnboundedReceiver<Payload>, UnboundedSender<Payload>) { let (client_tx, rx) = unbounded_channel(); let (tx, client_rx) = unbounded_channel(); @@ -73,7 +67,7 @@ impl Transport { let transport = Arc::new(transport); - tokio::spawn(Self::recv(id, transport.clone(), server_stdout, client_tx)); + tokio::spawn(Self::recv(transport.clone(), server_stdout, client_tx)); tokio::spawn(Self::send(transport, server_stdin, client_rx)); if let Some(stderr) = server_stderr { tokio::spawn(Self::err(stderr)); @@ -83,14 +77,12 @@ impl Transport { } async fn recv_server_message( - id: DebugAdapterId, reader: &mut Box<dyn AsyncBufRead + Unpin + Send>, buffer: &mut String, - content: &mut Vec<u8>, ) -> Result<Payload> { let mut content_length = None; loop { - buffer.clear(); + buffer.truncate(0); if reader.read_line(buffer).await? == 0 { return Err(Error::StreamClosed); }; @@ -119,20 +111,18 @@ impl Transport { } let content_length = content_length.context("missing content length")?; - content.resize(content_length, 0); - reader.read_exact(content).await?; - let msg = std::str::from_utf8(content).context("invalid utf8 from server")?; - info!("[{}] <- DAP {}", id, msg); + //TODO: reuse vector + let mut content = vec![0; content_length]; + reader.read_exact(&mut content).await?; + let msg = std::str::from_utf8(&content).context("invalid utf8 from server")?; - // NOTE: We avoid using `?` here, since it would return early on error - // and skip clearing `content`. By returning the result directly instead, - // we ensure `content.clear()` is always called. - let output = sonic_rs::from_slice(content).map_err(Into::into); + info!("<- DAP {}", msg); - content.clear(); + // try parsing as output (server response) or call (server request) + let output: serde_json::Result<Payload> = serde_json::from_str(msg); - output + Ok(output?) } async fn recv_server_error( @@ -167,7 +157,7 @@ impl Transport { server_stdin: &mut Box<dyn AsyncWrite + Unpin + Send>, request: String, ) -> Result<()> { - info!("[{}] -> DAP {}", self.id, request); + info!("-> DAP {}", request); // send the headers server_stdin @@ -182,18 +172,15 @@ impl Transport { Ok(()) } - fn process_response(&self, res: Response) -> Result<Response> { + fn process_response(res: Response) -> Result<Response> { if res.success { - info!( - "[{}] <- DAP success in response to {}", - self.id, res.request_seq - ); + info!("<- DAP success in response to {}", res.request_seq); Ok(res) } else { error!( - "[{}] <- DAP error {:?} ({:?}) for command #{} {}", - self.id, res.message, res.body, res.request_seq, res.command + "<- DAP error {:?} ({:?}) for command #{} {}", + res.message, res.body, res.request_seq, res.command ); Err(Error::Other(anyhow::format_err!("{:?}", res.body))) @@ -211,7 +198,7 @@ impl Transport { let tx = self.pending_requests.lock().await.remove(&request_seq); match tx { - Some(tx) => match tx.send(self.process_response(res)).await { + Some(tx) => match tx.send(Self::process_response(res)).await { Ok(_) => (), Err(_) => error!( "Tried sending response into a closed channel (id={:?}), original request likely timed out", @@ -231,58 +218,37 @@ impl Transport { ref seq, .. }) => { - info!("[{}] <- DAP request {} #{}", self.id, command, seq); + info!("<- DAP request {} #{}", command, seq); client_tx.send(msg).expect("Failed to send"); Ok(()) } Payload::Event(ref event) => { - info!("[{}] <- DAP event {:?}", self.id, event); + info!("<- DAP event {:?}", event); client_tx.send(msg).expect("Failed to send"); Ok(()) } } } - async fn recv( - id: DebugAdapterId, + async fn recv_inner( transport: Arc<Self>, mut server_stdout: Box<dyn AsyncBufRead + Unpin + Send>, client_tx: UnboundedSender<Payload>, - ) { + ) -> Result<()> { let mut recv_buffer = String::new(); - let mut content_buffer = Vec::new(); loop { - match Self::recv_server_message( - id, - &mut server_stdout, - &mut recv_buffer, - &mut content_buffer, - ) - .await - { - Ok(msg) => match transport.process_server_message(&client_tx, msg).await { - Ok(_) => (), - Err(err) => { - error!(" [{id}] err: <- {err:?}"); - break; - } - }, - Err(err) => { - if !matches!(err, Error::StreamClosed) { - error!("Exiting after unexpected error: {err:?}"); - } + let msg = Self::recv_server_message(&mut server_stdout, &mut recv_buffer).await?; + transport.process_server_message(&client_tx, msg).await?; + } + } - // Close any outstanding requests. - for (id, tx) in transport.pending_requests.lock().await.drain() { - match tx.send(Err(Error::StreamClosed)).await { - Ok(_) => (), - Err(_) => { - error!("Could not close request on a closed channel (id={id})"); - } - } - } - } - } + async fn recv( + transport: Arc<Self>, + server_stdout: Box<dyn AsyncBufRead + Unpin + Send>, + client_tx: UnboundedSender<Payload>, + ) { + if let Err(err) = Self::recv_inner(transport, server_stdout, client_tx).await { + error!("err: <- {:?}", err); } } |