Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'helix-dap/src/client.rs')
| -rw-r--r-- | helix-dap/src/client.rs | 108 |
1 files changed, 44 insertions, 64 deletions
diff --git a/helix-dap/src/client.rs b/helix-dap/src/client.rs index fe4af188..55ebab57 100644 --- a/helix-dap/src/client.rs +++ b/helix-dap/src/client.rs @@ -1,11 +1,10 @@ use crate::{ - registry::DebugAdapterId, - requests::{DisconnectArguments, TerminateArguments}, + requests::DisconnectArguments, transport::{Payload, Request, Response, Transport}, types::*, - Error, Result, + Error, Result, ThreadId, }; -use helix_core::syntax::config::{DebugAdapterConfig, DebuggerQuirks}; +use helix_core::syntax::DebuggerQuirks; use serde_json::Value; @@ -28,14 +27,12 @@ use tokio::{ #[derive(Debug)] pub struct Client { - id: DebugAdapterId, + id: usize, _process: Option<Child>, server_tx: UnboundedSender<Payload>, request_counter: AtomicU64, connection_type: Option<ConnectionType>, starting_request_args: Option<Value>, - /// The socket address of the debugger, if using TCP transport. - pub socket: Option<SocketAddr>, pub caps: Option<DebuggerCapabilities>, // thread_id -> frames pub stack_frames: HashMap<ThreadId, Vec<StackFrame>>, @@ -44,20 +41,23 @@ pub struct Client { /// Currently active frame for the current thread. pub active_frame: Option<usize>, pub quirks: DebuggerQuirks, - /// The config which was used to start this debugger. - pub config: Option<DebugAdapterConfig>, +} + +#[derive(Clone, Copy, Debug)] +pub enum ConnectionType { + Launch, + Attach, } impl Client { // Spawn a process and communicate with it by either TCP or stdio - // The returned stream includes the Client ID so consumers can differentiate between multiple clients pub async fn process( transport: &str, command: &str, args: Vec<&str>, port_arg: Option<&str>, - id: DebugAdapterId, - ) -> Result<(Self, UnboundedReceiver<(DebugAdapterId, Payload)>)> { + id: usize, + ) -> Result<(Self, UnboundedReceiver<Payload>)> { if command.is_empty() { return Result::Err(Error::Other(anyhow!("Command not provided"))); } @@ -72,9 +72,9 @@ impl Client { rx: Box<dyn AsyncBufRead + Unpin + Send>, tx: Box<dyn AsyncWrite + Unpin + Send>, err: Option<Box<dyn AsyncBufRead + Unpin + Send>>, - id: DebugAdapterId, + id: usize, process: Option<Child>, - ) -> Result<(Self, UnboundedReceiver<(DebugAdapterId, Payload)>)> { + ) -> Result<(Self, UnboundedReceiver<Payload>)> { let (server_rx, server_tx) = Transport::start(rx, tx, err, id); let (client_tx, client_rx) = unbounded_channel(); @@ -86,24 +86,22 @@ impl Client { caps: None, connection_type: None, starting_request_args: None, - socket: None, stack_frames: HashMap::new(), thread_states: HashMap::new(), thread_id: None, active_frame: None, quirks: DebuggerQuirks::default(), - config: None, }; - tokio::spawn(Self::recv(id, server_rx, client_tx)); + tokio::spawn(Self::recv(server_rx, client_tx)); Ok((client, client_rx)) } pub async fn tcp( addr: std::net::SocketAddr, - id: DebugAdapterId, - ) -> Result<(Self, UnboundedReceiver<(DebugAdapterId, Payload)>)> { + id: usize, + ) -> Result<(Self, UnboundedReceiver<Payload>)> { let stream = TcpStream::connect(addr).await?; let (rx, tx) = stream.into_split(); Self::streams(Box::new(BufReader::new(rx)), Box::new(tx), None, id, None) @@ -112,16 +110,15 @@ impl Client { pub fn stdio( cmd: &str, args: Vec<&str>, - id: DebugAdapterId, - ) -> Result<(Self, UnboundedReceiver<(DebugAdapterId, Payload)>)> { + id: usize, + ) -> Result<(Self, UnboundedReceiver<Payload>)> { // Resolve path to the binary - let cmd = helix_stdx::env::which(cmd)?; + let cmd = which::which(cmd).map_err(|err| anyhow::anyhow!(err))?; let process = Command::new(cmd) .args(args) .stdin(Stdio::piped()) .stdout(Stdio::piped()) - .stderr(Stdio::piped()) // make sure the process is reaped on drop .kill_on_drop(true) .spawn(); @@ -131,12 +128,16 @@ impl Client { // TODO: do we need bufreader/writer here? or do we use async wrappers on unblock? let writer = BufWriter::new(process.stdin.take().expect("Failed to open stdin")); let reader = BufReader::new(process.stdout.take().expect("Failed to open stdout")); - let stderr = BufReader::new(process.stderr.take().expect("Failed to open stderr")); + let errors = process.stderr.take().map(BufReader::new); Self::streams( - Box::new(reader), + Box::new(BufReader::new(reader)), Box::new(writer), - Some(Box::new(stderr)), + // errors.map(|errors| Box::new(BufReader::new(errors))), + match errors { + Some(errors) => Some(Box::new(BufReader::new(errors))), + None => None, + }, id, Some(process), ) @@ -156,16 +157,16 @@ impl Client { ) } - pub fn starting_request_args(&self) -> Option<&Value> { - self.starting_request_args.as_ref() + pub fn starting_request_args(&self) -> &Option<Value> { + &self.starting_request_args } pub async fn tcp_process( cmd: &str, args: Vec<&str>, port_format: &str, - id: DebugAdapterId, - ) -> Result<(Self, UnboundedReceiver<(DebugAdapterId, Payload)>)> { + id: usize, + ) -> Result<(Self, UnboundedReceiver<Payload>)> { let port = Self::get_port().await.unwrap(); let process = Command::new(cmd) @@ -180,49 +181,40 @@ impl Client { // Wait for adapter to become ready for connection time::sleep(time::Duration::from_millis(500)).await; - let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port); - let stream = TcpStream::connect(socket).await?; + + let stream = TcpStream::connect(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + port, + )) + .await?; let (rx, tx) = stream.into_split(); - let mut result = Self::streams( + Self::streams( Box::new(BufReader::new(rx)), Box::new(tx), None, id, Some(process), - ); - - // Set the socket address for the client - if let Ok((client, _)) = &mut result { - client.socket = Some(socket); - } - - result + ) } - async fn recv( - id: DebugAdapterId, - mut server_rx: UnboundedReceiver<Payload>, - client_tx: UnboundedSender<(DebugAdapterId, Payload)>, - ) { + async fn recv(mut server_rx: UnboundedReceiver<Payload>, client_tx: UnboundedSender<Payload>) { while let Some(msg) = server_rx.recv().await { match msg { Payload::Event(ev) => { - client_tx - .send((id, Payload::Event(ev))) - .expect("Failed to send"); + client_tx.send(Payload::Event(ev)).expect("Failed to send"); } Payload::Response(_) => unreachable!(), Payload::Request(req) => { client_tx - .send((id, Payload::Request(req))) + .send(Payload::Request(req)) .expect("Failed to send"); } } } } - pub fn id(&self) -> DebugAdapterId { + pub fn id(&self) -> usize { self.id } @@ -231,11 +223,7 @@ impl Client { } fn next_request_id(&self) -> u64 { - // > The `seq` for the first message sent by a client or debug adapter - // > is 1, and for each subsequent message is 1 greater than the - // > previous message sent by that actor - // <https://microsoft.github.io/debug-adapter-protocol/specification#Base_Protocol_ProtocolMessage> - self.request_counter.fetch_add(1, Ordering::Relaxed) + 1 + self.request_counter.fetch_add(1, Ordering::Relaxed) } // Internal, called by specific DAP commands when resuming @@ -369,14 +357,6 @@ impl Client { self.call::<requests::Disconnect>(args) } - pub fn terminate( - &mut self, - args: Option<TerminateArguments>, - ) -> impl Future<Output = Result<Value>> { - self.connection_type = None; - self.call::<requests::Terminate>(args) - } - pub fn launch(&mut self, args: serde_json::Value) -> impl Future<Output = Result<Value>> { self.connection_type = Some(ConnectionType::Launch); self.starting_request_args = Some(args.clone()); |