Unnamed repository; edit this file 'description' to name the repository.
lt;()> { buffer.truncate(0); if err.read_line(buffer).await? == 0 { return Err(Error::StreamClosed); }; error!("err <- {}", buffer); Ok(()) } async fn send_payload_to_server( &self, server_stdin: &mut Box<dyn AsyncWrite + Unpin + Send>, mut payload: Payload, ) -> Result<()> { if let Payload::Request(request) = &mut payload { if let Some(back) = request.back_ch.take() { self.pending_requests.lock().await.insert(request.seq, back); } } let json = serde_json::to_string(&payload)?; self.send_string_to_server(server_stdin, json).await } async fn send_string_to_server( &self, server_stdin: &mut Box<dyn AsyncWrite + Unpin + Send>, request: String, ) -> Result<()> { info!("-> DAP {}", request); // send the headers server_stdin .write_all(format!("Content-Length: {}\r\n\r\n", request.len()).as_bytes()) .await?; // send the body server_stdin.write_all(request.as_bytes()).await?; server_stdin.flush().await?; Ok(()) } fn process_response(res: Response) -> Result<Response> { if res.success { info!("<- DAP success in response to {}", res.request_seq); Ok(res) } else { error!( "<- DAP error {:?} ({:?}) for command #{} {}", res.message, res.body, res.request_seq, res.command ); Err(Error::Other(anyhow::format_err!("{:?}", res.body))) } } async fn process_server_message( &self, client_tx: &UnboundedSender<Payload>, msg: Payload, ) -> Result<()> { match msg { Payload::Response(res) => { let request_seq = res.request_seq; let tx = self.pending_requests.lock().await.remove(&request_seq); match tx { Some(tx) => match tx.send(Self::process_response(res)).await { Ok(_) => (), Err(_) => error!( "Tried sending response into a closed channel (id={:?}), original request likely timed out", request_seq ), } None => { warn!("Response to nonexistent request #{}", res.request_seq); client_tx.send(Payload::Response(res)).expect("Failed to send"); } } Ok(()) } Payload::Request(Request { ref command, ref seq, .. }) => { info!("<- DAP request {} #{}", command, seq); client_tx.send(msg).expect("Failed to send"); Ok(()) } Payload::Event(ref event) => { info!("<- DAP event {:?}", event); client_tx.send(msg).expect("Failed to send"); Ok(()) } } } async fn recv( transport: Arc<Self>, mut server_stdout: Box<dyn AsyncBufRead + Unpin + Send>, client_tx: UnboundedSender<Payload>, ) { let mut recv_buffer = String::new(); loop { match Self::recv_server_message(&mut server_stdout, &mut recv_buffer).await { Ok(msg) => { transport .process_server_message(&client_tx, msg) .await .unwrap(); } Err(err) => { error!("err: <- {:?}", err); break; } } } } async fn send( transport: Arc<Self>, mut server_stdin: Box<dyn AsyncWrite + Unpin + Send>, mut client_rx: UnboundedReceiver<Payload>, ) { while let Some(payload) = client_rx.recv().await { transport .send_payload_to_server(&mut server_stdin, payload) .await .unwrap() } } async fn err(mut server_stderr: Box<dyn AsyncBufRead + Unpin + Send>) { let mut recv_buffer = String::new(); loop { match Self::recv_server_error(&mut server_stderr, &mut recv_buffer).await { Ok(_) => {} Err(err) => { error!("err: <- {:?}", err); break; } } } } }