Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'helix-dap/src/transport.rs')
-rw-r--r--helix-dap/src/transport.rs116
1 files changed, 36 insertions, 80 deletions
diff --git a/helix-dap/src/transport.rs b/helix-dap/src/transport.rs
index fdd60226..dd03e568 100644
--- a/helix-dap/src/transport.rs
+++ b/helix-dap/src/transport.rs
@@ -1,10 +1,10 @@
-use crate::{registry::DebugAdapterId, Error, Result};
+use crate::{Error, Event, Result};
use anyhow::Context;
use log::{error, info, warn};
use serde::{Deserialize, Serialize};
use serde_json::Value;
+use std::collections::HashMap;
use std::sync::Arc;
-use std::{collections::HashMap, fmt::Debug};
use tokio::{
io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWrite, AsyncWriteExt},
sync::{
@@ -32,17 +32,11 @@ pub struct Response {
pub body: Option<Value>,
}
-#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
-pub struct Event {
- pub event: String,
- pub body: Option<Value>,
-}
-
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum Payload {
// type = "event"
- Event(Event),
+ Event(Box<Event>),
// type = "response"
Response(Response),
// type = "request"
@@ -52,7 +46,7 @@ pub enum Payload {
#[derive(Debug)]
pub struct Transport {
#[allow(unused)]
- id: DebugAdapterId,
+ id: usize,
pending_requests: Mutex<HashMap<u64, Sender<Result<Response>>>>,
}
@@ -61,7 +55,7 @@ impl Transport {
server_stdout: Box<dyn AsyncBufRead + Unpin + Send>,
server_stdin: Box<dyn AsyncWrite + Unpin + Send>,
server_stderr: Option<Box<dyn AsyncBufRead + Unpin + Send>>,
- id: DebugAdapterId,
+ id: usize,
) -> (UnboundedReceiver<Payload>, UnboundedSender<Payload>) {
let (client_tx, rx) = unbounded_channel();
let (tx, client_rx) = unbounded_channel();
@@ -73,7 +67,7 @@ impl Transport {
let transport = Arc::new(transport);
- tokio::spawn(Self::recv(id, transport.clone(), server_stdout, client_tx));
+ tokio::spawn(Self::recv(transport.clone(), server_stdout, client_tx));
tokio::spawn(Self::send(transport, server_stdin, client_rx));
if let Some(stderr) = server_stderr {
tokio::spawn(Self::err(stderr));
@@ -83,14 +77,12 @@ impl Transport {
}
async fn recv_server_message(
- id: DebugAdapterId,
reader: &mut Box<dyn AsyncBufRead + Unpin + Send>,
buffer: &mut String,
- content: &mut Vec<u8>,
) -> Result<Payload> {
let mut content_length = None;
loop {
- buffer.clear();
+ buffer.truncate(0);
if reader.read_line(buffer).await? == 0 {
return Err(Error::StreamClosed);
};
@@ -119,20 +111,18 @@ impl Transport {
}
let content_length = content_length.context("missing content length")?;
- content.resize(content_length, 0);
- reader.read_exact(content).await?;
- let msg = std::str::from_utf8(content).context("invalid utf8 from server")?;
- info!("[{}] <- DAP {}", id, msg);
+ //TODO: reuse vector
+ let mut content = vec![0; content_length];
+ reader.read_exact(&mut content).await?;
+ let msg = std::str::from_utf8(&content).context("invalid utf8 from server")?;
- // NOTE: We avoid using `?` here, since it would return early on error
- // and skip clearing `content`. By returning the result directly instead,
- // we ensure `content.clear()` is always called.
- let output = sonic_rs::from_slice(content).map_err(Into::into);
+ info!("<- DAP {}", msg);
- content.clear();
+ // try parsing as output (server response) or call (server request)
+ let output: serde_json::Result<Payload> = serde_json::from_str(msg);
- output
+ Ok(output?)
}
async fn recv_server_error(
@@ -167,7 +157,7 @@ impl Transport {
server_stdin: &mut Box<dyn AsyncWrite + Unpin + Send>,
request: String,
) -> Result<()> {
- info!("[{}] -> DAP {}", self.id, request);
+ info!("-> DAP {}", request);
// send the headers
server_stdin
@@ -182,18 +172,15 @@ impl Transport {
Ok(())
}
- fn process_response(&self, res: Response) -> Result<Response> {
+ fn process_response(res: Response) -> Result<Response> {
if res.success {
- info!(
- "[{}] <- DAP success in response to {}",
- self.id, res.request_seq
- );
+ info!("<- DAP success in response to {}", res.request_seq);
Ok(res)
} else {
error!(
- "[{}] <- DAP error {:?} ({:?}) for command #{} {}",
- self.id, res.message, res.body, res.request_seq, res.command
+ "<- DAP error {:?} ({:?}) for command #{} {}",
+ res.message, res.body, res.request_seq, res.command
);
Err(Error::Other(anyhow::format_err!("{:?}", res.body)))
@@ -211,7 +198,7 @@ impl Transport {
let tx = self.pending_requests.lock().await.remove(&request_seq);
match tx {
- Some(tx) => match tx.send(self.process_response(res)).await {
+ Some(tx) => match tx.send(Self::process_response(res)).await {
Ok(_) => (),
Err(_) => error!(
"Tried sending response into a closed channel (id={:?}), original request likely timed out",
@@ -231,12 +218,12 @@ impl Transport {
ref seq,
..
}) => {
- info!("[{}] <- DAP request {} #{}", self.id, command, seq);
+ info!("<- DAP request {} #{}", command, seq);
client_tx.send(msg).expect("Failed to send");
Ok(())
}
Payload::Event(ref event) => {
- info!("[{}] <- DAP event {:?}", self.id, event);
+ info!("<- DAP event {:?}", event);
client_tx.send(msg).expect("Failed to send");
Ok(())
}
@@ -244,68 +231,37 @@ impl Transport {
}
async fn recv(
- id: DebugAdapterId,
transport: Arc<Self>,
mut server_stdout: Box<dyn AsyncBufRead + Unpin + Send>,
client_tx: UnboundedSender<Payload>,
) {
let mut recv_buffer = String::new();
- let mut content_buffer = Vec::new();
loop {
- match Self::recv_server_message(
- id,
- &mut server_stdout,
- &mut recv_buffer,
- &mut content_buffer,
- )
- .await
- {
- Ok(msg) => match transport.process_server_message(&client_tx, msg).await {
- Ok(_) => (),
- Err(err) => {
- error!(" [{id}] err: <- {err:?}");
- break;
- }
- },
+ match Self::recv_server_message(&mut server_stdout, &mut recv_buffer).await {
+ Ok(msg) => {
+ transport
+ .process_server_message(&client_tx, msg)
+ .await
+ .unwrap();
+ }
Err(err) => {
- if !matches!(err, Error::StreamClosed) {
- error!("Exiting after unexpected error: {err:?}");
- }
-
- // Close any outstanding requests.
- for (id, tx) in transport.pending_requests.lock().await.drain() {
- match tx.send(Err(Error::StreamClosed)).await {
- Ok(_) => (),
- Err(_) => {
- error!("Could not close request on a closed channel (id={id})");
- }
- }
- }
+ error!("err: <- {:?}", err);
+ break;
}
}
}
}
- async fn send_inner(
+ async fn send(
transport: Arc<Self>,
mut server_stdin: Box<dyn AsyncWrite + Unpin + Send>,
mut client_rx: UnboundedReceiver<Payload>,
- ) -> Result<()> {
+ ) {
while let Some(payload) = client_rx.recv().await {
transport
.send_payload_to_server(&mut server_stdin, payload)
- .await?;
- }
- Ok(())
- }
-
- async fn send(
- transport: Arc<Self>,
- server_stdin: Box<dyn AsyncWrite + Unpin + Send>,
- client_rx: UnboundedReceiver<Payload>,
- ) {
- if let Err(err) = Self::send_inner(transport, server_stdin, client_rx).await {
- error!("err: <- {:?}", err);
+ .await
+ .unwrap()
}
}