Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'helix-dap/src/transport.rs')
-rw-r--r--helix-dap/src/transport.rs44
1 files changed, 18 insertions, 26 deletions
diff --git a/helix-dap/src/transport.rs b/helix-dap/src/transport.rs
index fdd60226..6911e4e7 100644
--- a/helix-dap/src/transport.rs
+++ b/helix-dap/src/transport.rs
@@ -1,10 +1,10 @@
-use crate::{registry::DebugAdapterId, Error, Result};
+use crate::{Error, Result};
use anyhow::Context;
use log::{error, info, warn};
use serde::{Deserialize, Serialize};
use serde_json::Value;
+use std::collections::HashMap;
use std::sync::Arc;
-use std::{collections::HashMap, fmt::Debug};
use tokio::{
io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWrite, AsyncWriteExt},
sync::{
@@ -52,7 +52,7 @@ pub enum Payload {
#[derive(Debug)]
pub struct Transport {
#[allow(unused)]
- id: DebugAdapterId,
+ id: usize,
pending_requests: Mutex<HashMap<u64, Sender<Result<Response>>>>,
}
@@ -61,7 +61,7 @@ impl Transport {
server_stdout: Box<dyn AsyncBufRead + Unpin + Send>,
server_stdin: Box<dyn AsyncWrite + Unpin + Send>,
server_stderr: Option<Box<dyn AsyncBufRead + Unpin + Send>>,
- id: DebugAdapterId,
+ id: usize,
) -> (UnboundedReceiver<Payload>, UnboundedSender<Payload>) {
let (client_tx, rx) = unbounded_channel();
let (tx, client_rx) = unbounded_channel();
@@ -73,7 +73,7 @@ impl Transport {
let transport = Arc::new(transport);
- tokio::spawn(Self::recv(id, transport.clone(), server_stdout, client_tx));
+ tokio::spawn(Self::recv(transport.clone(), server_stdout, client_tx));
tokio::spawn(Self::send(transport, server_stdin, client_rx));
if let Some(stderr) = server_stderr {
tokio::spawn(Self::err(stderr));
@@ -83,7 +83,6 @@ impl Transport {
}
async fn recv_server_message(
- id: DebugAdapterId,
reader: &mut Box<dyn AsyncBufRead + Unpin + Send>,
buffer: &mut String,
content: &mut Vec<u8>,
@@ -123,16 +122,14 @@ impl Transport {
reader.read_exact(content).await?;
let msg = std::str::from_utf8(content).context("invalid utf8 from server")?;
- info!("[{}] <- DAP {}", id, msg);
+ info!("<- DAP {}", msg);
- // NOTE: We avoid using `?` here, since it would return early on error
- // and skip clearing `content`. By returning the result directly instead,
- // we ensure `content.clear()` is always called.
- let output = sonic_rs::from_slice(content).map_err(Into::into);
+ // try parsing as output (server response) or call (server request)
+ let output: serde_json::Result<Payload> = serde_json::from_str(msg);
content.clear();
- output
+ Ok(output?)
}
async fn recv_server_error(
@@ -167,7 +164,7 @@ impl Transport {
server_stdin: &mut Box<dyn AsyncWrite + Unpin + Send>,
request: String,
) -> Result<()> {
- info!("[{}] -> DAP {}", self.id, request);
+ info!("-> DAP {}", request);
// send the headers
server_stdin
@@ -182,18 +179,15 @@ impl Transport {
Ok(())
}
- fn process_response(&self, res: Response) -> Result<Response> {
+ fn process_response(res: Response) -> Result<Response> {
if res.success {
- info!(
- "[{}] <- DAP success in response to {}",
- self.id, res.request_seq
- );
+ info!("<- DAP success in response to {}", res.request_seq);
Ok(res)
} else {
error!(
- "[{}] <- DAP error {:?} ({:?}) for command #{} {}",
- self.id, res.message, res.body, res.request_seq, res.command
+ "<- DAP error {:?} ({:?}) for command #{} {}",
+ res.message, res.body, res.request_seq, res.command
);
Err(Error::Other(anyhow::format_err!("{:?}", res.body)))
@@ -211,7 +205,7 @@ impl Transport {
let tx = self.pending_requests.lock().await.remove(&request_seq);
match tx {
- Some(tx) => match tx.send(self.process_response(res)).await {
+ Some(tx) => match tx.send(Self::process_response(res)).await {
Ok(_) => (),
Err(_) => error!(
"Tried sending response into a closed channel (id={:?}), original request likely timed out",
@@ -231,12 +225,12 @@ impl Transport {
ref seq,
..
}) => {
- info!("[{}] <- DAP request {} #{}", self.id, command, seq);
+ info!("<- DAP request {} #{}", command, seq);
client_tx.send(msg).expect("Failed to send");
Ok(())
}
Payload::Event(ref event) => {
- info!("[{}] <- DAP event {:?}", self.id, event);
+ info!("<- DAP event {:?}", event);
client_tx.send(msg).expect("Failed to send");
Ok(())
}
@@ -244,7 +238,6 @@ impl Transport {
}
async fn recv(
- id: DebugAdapterId,
transport: Arc<Self>,
mut server_stdout: Box<dyn AsyncBufRead + Unpin + Send>,
client_tx: UnboundedSender<Payload>,
@@ -253,7 +246,6 @@ impl Transport {
let mut content_buffer = Vec::new();
loop {
match Self::recv_server_message(
- id,
&mut server_stdout,
&mut recv_buffer,
&mut content_buffer,
@@ -263,7 +255,7 @@ impl Transport {
Ok(msg) => match transport.process_server_message(&client_tx, msg).await {
Ok(_) => (),
Err(err) => {
- error!(" [{id}] err: <- {err:?}");
+ error!("err: <- {err:?}");
break;
}
},