Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'helix-dap/src/client.rs')
| -rw-r--r-- | helix-dap/src/client.rs | 158 |
1 files changed, 50 insertions, 108 deletions
diff --git a/helix-dap/src/client.rs b/helix-dap/src/client.rs index fe4af188..371cf303 100644 --- a/helix-dap/src/client.rs +++ b/helix-dap/src/client.rs @@ -1,15 +1,14 @@ use crate::{ - registry::DebugAdapterId, - requests::{DisconnectArguments, TerminateArguments}, transport::{Payload, Request, Response, Transport}, types::*, - Error, Result, + Error, Result, ThreadId, }; -use helix_core::syntax::config::{DebugAdapterConfig, DebuggerQuirks}; +use helix_core::syntax::DebuggerQuirks; use serde_json::Value; use anyhow::anyhow; +pub use log::{error, info}; use std::{ collections::HashMap, future::Future, @@ -28,14 +27,10 @@ use tokio::{ #[derive(Debug)] pub struct Client { - id: DebugAdapterId, + id: usize, _process: Option<Child>, server_tx: UnboundedSender<Payload>, request_counter: AtomicU64, - connection_type: Option<ConnectionType>, - starting_request_args: Option<Value>, - /// The socket address of the debugger, if using TCP transport. - pub socket: Option<SocketAddr>, pub caps: Option<DebuggerCapabilities>, // thread_id -> frames pub stack_frames: HashMap<ThreadId, Vec<StackFrame>>, @@ -44,27 +39,26 @@ pub struct Client { /// Currently active frame for the current thread. pub active_frame: Option<usize>, pub quirks: DebuggerQuirks, - /// The config which was used to start this debugger. - pub config: Option<DebugAdapterConfig>, } impl Client { // Spawn a process and communicate with it by either TCP or stdio - // The returned stream includes the Client ID so consumers can differentiate between multiple clients pub async fn process( transport: &str, command: &str, args: Vec<&str>, port_arg: Option<&str>, - id: DebugAdapterId, - ) -> Result<(Self, UnboundedReceiver<(DebugAdapterId, Payload)>)> { + id: usize, + ) -> Result<(Self, UnboundedReceiver<Payload>)> { if command.is_empty() { return Result::Err(Error::Other(anyhow!("Command not provided"))); } - match (transport, port_arg) { - ("tcp", Some(port_arg)) => Self::tcp_process(command, args, port_arg, id).await, - ("stdio", _) => Self::stdio(command, args, id), - _ => Result::Err(Error::Other(anyhow!("Incorrect transport {}", transport))), + if transport == "tcp" && port_arg.is_some() { + Self::tcp_process(command, args, port_arg.unwrap(), id).await + } else if transport == "stdio" { + Self::stdio(command, args, id) + } else { + Result::Err(Error::Other(anyhow!("Incorrect transport {}", transport))) } } @@ -72,11 +66,11 @@ impl Client { rx: Box<dyn AsyncBufRead + Unpin + Send>, tx: Box<dyn AsyncWrite + Unpin + Send>, err: Option<Box<dyn AsyncBufRead + Unpin + Send>>, - id: DebugAdapterId, + id: usize, process: Option<Child>, - ) -> Result<(Self, UnboundedReceiver<(DebugAdapterId, Payload)>)> { + ) -> Result<(Self, UnboundedReceiver<Payload>)> { let (server_rx, server_tx) = Transport::start(rx, tx, err, id); - let (client_tx, client_rx) = unbounded_channel(); + let (client_rx, client_tx) = unbounded_channel(); let client = Self { id, @@ -84,26 +78,23 @@ impl Client { server_tx, request_counter: AtomicU64::new(0), caps: None, - connection_type: None, - starting_request_args: None, - socket: None, + // stack_frames: HashMap::new(), thread_states: HashMap::new(), thread_id: None, active_frame: None, quirks: DebuggerQuirks::default(), - config: None, }; - tokio::spawn(Self::recv(id, server_rx, client_tx)); + tokio::spawn(Self::recv(server_rx, client_rx)); - Ok((client, client_rx)) + Ok((client, client_tx)) } pub async fn tcp( addr: std::net::SocketAddr, - id: DebugAdapterId, - ) -> Result<(Self, UnboundedReceiver<(DebugAdapterId, Payload)>)> { + id: usize, + ) -> Result<(Self, UnboundedReceiver<Payload>)> { let stream = TcpStream::connect(addr).await?; let (rx, tx) = stream.into_split(); Self::streams(Box::new(BufReader::new(rx)), Box::new(tx), None, id, None) @@ -112,16 +103,15 @@ impl Client { pub fn stdio( cmd: &str, args: Vec<&str>, - id: DebugAdapterId, - ) -> Result<(Self, UnboundedReceiver<(DebugAdapterId, Payload)>)> { + id: usize, + ) -> Result<(Self, UnboundedReceiver<Payload>)> { // Resolve path to the binary - let cmd = helix_stdx::env::which(cmd)?; + let cmd = which::which(cmd).map_err(|err| anyhow::anyhow!(err))?; let process = Command::new(cmd) .args(args) .stdin(Stdio::piped()) .stdout(Stdio::piped()) - .stderr(Stdio::piped()) // make sure the process is reaped on drop .kill_on_drop(true) .spawn(); @@ -131,12 +121,16 @@ impl Client { // TODO: do we need bufreader/writer here? or do we use async wrappers on unblock? let writer = BufWriter::new(process.stdin.take().expect("Failed to open stdin")); let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout")); - let stderr = BufReader::new(process.stderr.take().expect("Failed to open stderr")); + let errors = process.stderr.take().map(BufReader::new); Self::streams( - Box::new(reader), + Box::new(BufReader::new(reader)), Box::new(writer), - Some(Box::new(stderr)), + // errors.map(|errors| Box::new(BufReader::new(errors))), + match errors { + Some(errors) => Some(Box::new(BufReader::new(errors))), + None => None, + }, id, Some(process), ) @@ -156,16 +150,12 @@ impl Client { ) } - pub fn starting_request_args(&self) -> Option<&Value> { - self.starting_request_args.as_ref() - } - pub async fn tcp_process( cmd: &str, args: Vec<&str>, port_format: &str, - id: DebugAdapterId, - ) -> Result<(Self, UnboundedReceiver<(DebugAdapterId, Payload)>)> { + id: usize, + ) -> Result<(Self, UnboundedReceiver<Payload>)> { let port = Self::get_port().await.unwrap(); let process = Command::new(cmd) @@ -180,62 +170,45 @@ impl Client { // Wait for adapter to become ready for connection time::sleep(time::Duration::from_millis(500)).await; - let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); - let stream = TcpStream::connect(socket).await?; + + let stream = TcpStream::connect(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + port, + )) + .await?; let (rx, tx) = stream.into_split(); - let mut result = Self::streams( + Self::streams( Box::new(BufReader::new(rx)), Box::new(tx), None, id, Some(process), - ); - - // Set the socket address for the client - if let Ok((client, _)) = &mut result { - client.socket = Some(socket); - } - - result + ) } - async fn recv( - id: DebugAdapterId, - mut server_rx: UnboundedReceiver<Payload>, - client_tx: UnboundedSender<(DebugAdapterId, Payload)>, - ) { + async fn recv(mut server_rx: UnboundedReceiver<Payload>, client_tx: UnboundedSender<Payload>) { while let Some(msg) = server_rx.recv().await { match msg { Payload::Event(ev) => { - client_tx - .send((id, Payload::Event(ev))) - .expect("Failed to send"); + client_tx.send(Payload::Event(ev)).expect("Failed to send"); } Payload::Response(_) => unreachable!(), Payload::Request(req) => { client_tx - .send((id, Payload::Request(req))) + .send(Payload::Request(req)) .expect("Failed to send"); } } } } - pub fn id(&self) -> DebugAdapterId { + pub fn id(&self) -> usize { self.id } - pub fn connection_type(&self) -> Option<ConnectionType> { - self.connection_type - } - fn next_request_id(&self) -> u64 { - // > The `seq` for the first message sent by a client or debug adapter - // > is 1, and for each subsequent message is 1 greater than the - // > previous message sent by that actor - // <https://microsoft.github.io/debug-adapter-protocol/specification#Base_Protocol_ProtocolMessage> - self.request_counter.fetch_add(1, Ordering::Relaxed) + 1 + self.request_counter.fetch_add(1, Ordering::Relaxed) } // Internal, called by specific DAP commands when resuming @@ -281,7 +254,7 @@ impl Client { // TODO: specifiable timeout, delay other calls until initialize success timeout(Duration::from_secs(20), callback_rx.recv()) .await - .map_err(|_| Error::Timeout(id))? // return Timeout + .map_err(|_| Error::Timeout)? // return Timeout .ok_or(Error::StreamClosed)? .map(|response| response.body.unwrap_or_default()) // TODO: check response.success @@ -361,43 +334,18 @@ impl Client { Ok(()) } - pub fn disconnect( - &mut self, - args: Option<DisconnectArguments>, - ) -> impl Future<Output = Result<Value>> { - self.connection_type = None; - self.call::<requests::Disconnect>(args) - } - - pub fn terminate( - &mut self, - args: Option<TerminateArguments>, - ) -> impl Future<Output = Result<Value>> { - self.connection_type = None; - self.call::<requests::Terminate>(args) + pub fn disconnect(&self) -> impl Future<Output = Result<Value>> { + self.call::<requests::Disconnect>(()) } - pub fn launch(&mut self, args: serde_json::Value) -> impl Future<Output = Result<Value>> { - self.connection_type = Some(ConnectionType::Launch); - self.starting_request_args = Some(args.clone()); + pub fn launch(&self, args: serde_json::Value) -> impl Future<Output = Result<Value>> { self.call::<requests::Launch>(args) } - pub fn attach(&mut self, args: serde_json::Value) -> impl Future<Output = Result<Value>> { - self.connection_type = Some(ConnectionType::Attach); - self.starting_request_args = Some(args.clone()); + pub fn attach(&self, args: serde_json::Value) -> impl Future<Output = Result<Value>> { self.call::<requests::Attach>(args) } - pub fn restart(&self) -> impl Future<Output = Result<Value>> { - let args = if let Some(args) = &self.starting_request_args { - args.clone() - } else { - Value::Null - }; - self.call::<requests::Restart>(args) - } - pub async fn set_breakpoints( &self, file: PathBuf, @@ -529,10 +477,4 @@ impl Client { self.call::<requests::SetExceptionBreakpoints>(args) } - - pub fn current_stack_frame(&self) -> Option<&StackFrame> { - self.stack_frames - .get(&self.thread_id?)? - .get(self.active_frame?) - } } |