Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'helix-dap/src/transport.rs')
-rw-r--r--helix-dap/src/transport.rs324
1 files changed, 0 insertions, 324 deletions
diff --git a/helix-dap/src/transport.rs b/helix-dap/src/transport.rs
deleted file mode 100644
index fdd60226..00000000
--- a/helix-dap/src/transport.rs
+++ /dev/null
@@ -1,324 +0,0 @@
-use crate::{registry::DebugAdapterId, Error, Result};
-use anyhow::Context;
-use log::{error, info, warn};
-use serde::{Deserialize, Serialize};
-use serde_json::Value;
-use std::sync::Arc;
-use std::{collections::HashMap, fmt::Debug};
-use tokio::{
- io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWrite, AsyncWriteExt},
- sync::{
- mpsc::{unbounded_channel, Sender, UnboundedReceiver, UnboundedSender},
- Mutex,
- },
-};
-
-#[derive(Debug, Clone, Deserialize, Serialize)]
-pub struct Request {
- #[serde(skip)]
- pub back_ch: Option<Sender<Result<Response>>>,
- pub seq: u64,
- pub command: String,
- pub arguments: Option<Value>,
-}
-
-#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
-pub struct Response {
- // seq is omitted as unused and is not sent by some implementations
- pub request_seq: u64,
- pub success: bool,
- pub command: String,
- pub message: Option<String>,
- pub body: Option<Value>,
-}
-
-#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
-pub struct Event {
- pub event: String,
- pub body: Option<Value>,
-}
-
-#[derive(Debug, Clone, Deserialize, Serialize)]
-#[serde(tag = "type", rename_all = "camelCase")]
-pub enum Payload {
- // type = "event"
- Event(Event),
- // type = "response"
- Response(Response),
- // type = "request"
- Request(Request),
-}
-
-#[derive(Debug)]
-pub struct Transport {
- #[allow(unused)]
- id: DebugAdapterId,
- pending_requests: Mutex<HashMap<u64, Sender<Result<Response>>>>,
-}
-
-impl Transport {
- pub fn start(
- server_stdout: Box<dyn AsyncBufRead + Unpin + Send>,
- server_stdin: Box<dyn AsyncWrite + Unpin + Send>,
- server_stderr: Option<Box<dyn AsyncBufRead + Unpin + Send>>,
- id: DebugAdapterId,
- ) -> (UnboundedReceiver<Payload>, UnboundedSender<Payload>) {
- let (client_tx, rx) = unbounded_channel();
- let (tx, client_rx) = unbounded_channel();
-
- let transport = Self {
- id,
- pending_requests: Mutex::new(HashMap::default()),
- };
-
- let transport = Arc::new(transport);
-
- tokio::spawn(Self::recv(id, transport.clone(), server_stdout, client_tx));
- tokio::spawn(Self::send(transport, server_stdin, client_rx));
- if let Some(stderr) = server_stderr {
- tokio::spawn(Self::err(stderr));
- }
-
- (rx, tx)
- }
-
- async fn recv_server_message(
- id: DebugAdapterId,
- reader: &mut Box<dyn AsyncBufRead + Unpin + Send>,
- buffer: &mut String,
- content: &mut Vec<u8>,
- ) -> Result<Payload> {
- let mut content_length = None;
- loop {
- buffer.clear();
- if reader.read_line(buffer).await? == 0 {
- return Err(Error::StreamClosed);
- };
-
- if buffer == "\r\n" {
- // look for an empty CRLF line
- break;
- }
-
- let header = buffer.trim();
- let parts = header.split_once(": ");
-
- match parts {
- Some(("Content-Length", value)) => {
- content_length = Some(value.parse().context("invalid content length")?);
- }
- Some((_, _)) => {}
- None => {
- // Workaround: Some non-conformant language servers will output logging and other garbage
- // into the same stream as JSON-RPC messages. This can also happen from shell scripts that spawn
- // the server. Skip such lines and log a warning.
-
- // warn!("Failed to parse header: {:?}", header);
- }
- }
- }
-
- let content_length = content_length.context("missing content length")?;
- content.resize(content_length, 0);
- reader.read_exact(content).await?;
- let msg = std::str::from_utf8(content).context("invalid utf8 from server")?;
-
- info!("[{}] <- DAP {}", id, msg);
-
- // NOTE: We avoid using `?` here, since it would return early on error
- // and skip clearing `content`. By returning the result directly instead,
- // we ensure `content.clear()` is always called.
- let output = sonic_rs::from_slice(content).map_err(Into::into);
-
- content.clear();
-
- output
- }
-
- async fn recv_server_error(
- err: &mut (impl AsyncBufRead + Unpin + Send),
- buffer: &mut String,
- ) -> Result<()> {
- buffer.truncate(0);
- if err.read_line(buffer).await? == 0 {
- return Err(Error::StreamClosed);
- };
- error!("err <- {}", buffer);
-
- Ok(())
- }
-
- async fn send_payload_to_server(
- &self,
- server_stdin: &mut Box<dyn AsyncWrite + Unpin + Send>,
- mut payload: Payload,
- ) -> Result<()> {
- if let Payload::Request(request) = &mut payload {
- if let Some(back) = request.back_ch.take() {
- self.pending_requests.lock().await.insert(request.seq, back);
- }
- }
- let json = serde_json::to_string(&payload)?;
- self.send_string_to_server(server_stdin, json).await
- }
-
- async fn send_string_to_server(
- &self,
- server_stdin: &mut Box<dyn AsyncWrite + Unpin + Send>,
- request: String,
- ) -> Result<()> {
- info!("[{}] -> DAP {}", self.id, request);
-
- // send the headers
- server_stdin
- .write_all(format!("Content-Length: {}\r\n\r\n", request.len()).as_bytes())
- .await?;
-
- // send the body
- server_stdin.write_all(request.as_bytes()).await?;
-
- server_stdin.flush().await?;
-
- Ok(())
- }
-
- fn process_response(&self, res: Response) -> Result<Response> {
- if res.success {
- info!(
- "[{}] <- DAP success in response to {}",
- self.id, res.request_seq
- );
-
- Ok(res)
- } else {
- error!(
- "[{}] <- DAP error {:?} ({:?}) for command #{} {}",
- self.id, res.message, res.body, res.request_seq, res.command
- );
-
- Err(Error::Other(anyhow::format_err!("{:?}", res.body)))
- }
- }
-
- async fn process_server_message(
- &self,
- client_tx: &UnboundedSender<Payload>,
- msg: Payload,
- ) -> Result<()> {
- match msg {
- Payload::Response(res) => {
- let request_seq = res.request_seq;
- let tx = self.pending_requests.lock().await.remove(&request_seq);
-
- match tx {
- Some(tx) => match tx.send(self.process_response(res)).await {
- Ok(_) => (),
- Err(_) => error!(
- "Tried sending response into a closed channel (id={:?}), original request likely timed out",
- request_seq
- ),
- }
- None => {
- warn!("Response to nonexistent request #{}", res.request_seq);
- client_tx.send(Payload::Response(res)).expect("Failed to send");
- }
- }
-
- Ok(())
- }
- Payload::Request(Request {
- ref command,
- ref seq,
- ..
- }) => {
- info!("[{}] <- DAP request {} #{}", self.id, command, seq);
- client_tx.send(msg).expect("Failed to send");
- Ok(())
- }
- Payload::Event(ref event) => {
- info!("[{}] <- DAP event {:?}", self.id, event);
- client_tx.send(msg).expect("Failed to send");
- Ok(())
- }
- }
- }
-
- async fn recv(
- id: DebugAdapterId,
- transport: Arc<Self>,
- mut server_stdout: Box<dyn AsyncBufRead + Unpin + Send>,
- client_tx: UnboundedSender<Payload>,
- ) {
- let mut recv_buffer = String::new();
- let mut content_buffer = Vec::new();
- loop {
- match Self::recv_server_message(
- id,
- &mut server_stdout,
- &mut recv_buffer,
- &mut content_buffer,
- )
- .await
- {
- Ok(msg) => match transport.process_server_message(&client_tx, msg).await {
- Ok(_) => (),
- Err(err) => {
- error!(" [{id}] err: <- {err:?}");
- break;
- }
- },
- Err(err) => {
- if !matches!(err, Error::StreamClosed) {
- error!("Exiting after unexpected error: {err:?}");
- }
-
- // Close any outstanding requests.
- for (id, tx) in transport.pending_requests.lock().await.drain() {
- match tx.send(Err(Error::StreamClosed)).await {
- Ok(_) => (),
- Err(_) => {
- error!("Could not close request on a closed channel (id={id})");
- }
- }
- }
- }
- }
- }
- }
-
- async fn send_inner(
- transport: Arc<Self>,
- mut server_stdin: Box<dyn AsyncWrite + Unpin + Send>,
- mut client_rx: UnboundedReceiver<Payload>,
- ) -> Result<()> {
- while let Some(payload) = client_rx.recv().await {
- transport
- .send_payload_to_server(&mut server_stdin, payload)
- .await?;
- }
- Ok(())
- }
-
- async fn send(
- transport: Arc<Self>,
- server_stdin: Box<dyn AsyncWrite + Unpin + Send>,
- client_rx: UnboundedReceiver<Payload>,
- ) {
- if let Err(err) = Self::send_inner(transport, server_stdin, client_rx).await {
- error!("err: <- {:?}", err);
- }
- }
-
- async fn err(mut server_stderr: Box<dyn AsyncBufRead + Unpin + Send>) {
- let mut recv_buffer = String::new();
- loop {
- match Self::recv_server_error(&mut server_stderr, &mut recv_buffer).await {
- Ok(_) => {}
- Err(err) => {
- error!("err: <- {:?}", err);
- break;
- }
- }
- }
- }
-}