Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'helix-dap/src/transport.rs')
| -rw-r--r-- | helix-dap/src/transport.rs | 36 |
1 files changed, 21 insertions, 15 deletions
diff --git a/helix-dap/src/transport.rs b/helix-dap/src/transport.rs index 6911e4e7..8ca408df 100644 --- a/helix-dap/src/transport.rs +++ b/helix-dap/src/transport.rs @@ -1,10 +1,10 @@ -use crate::{Error, Result}; +use crate::{registry::DebugAdapterId, Error, Result}; use anyhow::Context; use log::{error, info, warn}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use std::collections::HashMap; use std::sync::Arc; +use std::{collections::HashMap, fmt::Debug}; use tokio::{ io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWrite, AsyncWriteExt}, sync::{ @@ -52,7 +52,7 @@ pub enum Payload { #[derive(Debug)] pub struct Transport { #[allow(unused)] - id: usize, + id: DebugAdapterId, pending_requests: Mutex<HashMap<u64, Sender<Result<Response>>>>, } @@ -61,7 +61,7 @@ impl Transport { server_stdout: Box<dyn AsyncBufRead + Unpin + Send>, server_stdin: Box<dyn AsyncWrite + Unpin + Send>, server_stderr: Option<Box<dyn AsyncBufRead + Unpin + Send>>, - id: usize, + id: DebugAdapterId, ) -> (UnboundedReceiver<Payload>, UnboundedSender<Payload>) { let (client_tx, rx) = unbounded_channel(); let (tx, client_rx) = unbounded_channel(); @@ -73,7 +73,7 @@ impl Transport { let transport = Arc::new(transport); - tokio::spawn(Self::recv(transport.clone(), server_stdout, client_tx)); + tokio::spawn(Self::recv(id, transport.clone(), server_stdout, client_tx)); tokio::spawn(Self::send(transport, server_stdin, client_rx)); if let Some(stderr) = server_stderr { tokio::spawn(Self::err(stderr)); @@ -83,6 +83,7 @@ impl Transport { } async fn recv_server_message( + id: DebugAdapterId, reader: &mut Box<dyn AsyncBufRead + Unpin + Send>, buffer: &mut String, content: &mut Vec<u8>, @@ -122,7 +123,7 @@ impl Transport { reader.read_exact(content).await?; let msg = std::str::from_utf8(content).context("invalid utf8 from server")?; - info!("<- DAP {}", msg); + info!("[{}] <- DAP {}", id, msg); // try parsing as output (server response) or call (server request) let output: serde_json::Result<Payload> = serde_json::from_str(msg); @@ -164,7 +165,7 @@ impl Transport { server_stdin: &mut Box<dyn AsyncWrite + Unpin + Send>, request: String, ) -> Result<()> { - info!("-> DAP {}", request); + info!("[{}] -> DAP {}", self.id, request); // send the headers server_stdin @@ -179,15 +180,18 @@ impl Transport { Ok(()) } - fn process_response(res: Response) -> Result<Response> { + fn process_response(&self, res: Response) -> Result<Response> { if res.success { - info!("<- DAP success in response to {}", res.request_seq); + info!( + "[{}] <- DAP success in response to {}", + self.id, res.request_seq + ); Ok(res) } else { error!( - "<- DAP error {:?} ({:?}) for command #{} {}", - res.message, res.body, res.request_seq, res.command + "[{}] <- DAP error {:?} ({:?}) for command #{} {}", + self.id, res.message, res.body, res.request_seq, res.command ); Err(Error::Other(anyhow::format_err!("{:?}", res.body))) @@ -205,7 +209,7 @@ impl Transport { let tx = self.pending_requests.lock().await.remove(&request_seq); match tx { - Some(tx) => match tx.send(Self::process_response(res)).await { + Some(tx) => match tx.send(self.process_response(res)).await { Ok(_) => (), Err(_) => error!( "Tried sending response into a closed channel (id={:?}), original request likely timed out", @@ -225,12 +229,12 @@ impl Transport { ref seq, .. }) => { - info!("<- DAP request {} #{}", command, seq); + info!("[{}] <- DAP request {} #{}", self.id, command, seq); client_tx.send(msg).expect("Failed to send"); Ok(()) } Payload::Event(ref event) => { - info!("<- DAP event {:?}", event); + info!("[{}] <- DAP event {:?}", self.id, event); client_tx.send(msg).expect("Failed to send"); Ok(()) } @@ -238,6 +242,7 @@ impl Transport { } async fn recv( + id: DebugAdapterId, transport: Arc<Self>, mut server_stdout: Box<dyn AsyncBufRead + Unpin + Send>, client_tx: UnboundedSender<Payload>, @@ -246,6 +251,7 @@ impl Transport { let mut content_buffer = Vec::new(); loop { match Self::recv_server_message( + id, &mut server_stdout, &mut recv_buffer, &mut content_buffer, @@ -255,7 +261,7 @@ impl Transport { Ok(msg) => match transport.process_server_message(&client_tx, msg).await { Ok(_) => (), Err(err) => { - error!("err: <- {err:?}"); + error!(" [{id}] err: <- {err:?}"); break; } }, |