html terminal
Diffstat (limited to 'src/websocket.rs')
| -rw-r--r-- | src/websocket.rs | 108 |
1 files changed, 50 insertions, 58 deletions
diff --git a/src/websocket.rs b/src/websocket.rs index 75591d4..df614f4 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -7,75 +7,67 @@ use std::{ }; use tokio::{sync::broadcast::error::TryRecvError, task::JoinHandle}; use tokio_stream::StreamExt; - pub struct WebSocket(JoinHandle<()>); impl WebSocket { - pub async fn new(stream: RealWebSocket, state: Arc<State>) -> Self { + pub async fn spawn(stream: RealWebSocket, state: Arc<State>) { let (mut sender, mut reciever) = futures::stream::StreamExt::split(stream); let mut stdout = state.stdout_html.subscribe(); - let ws_task = tokio::spawn(async move { - dummy_print!("websocket"); - let mut last: Option<Instant> = None; - let mut waiting: usize = 0; - loop { - let out = stdout.try_recv(); - let now = Instant::now(); - match out { - Err(e) => match e { - TryRecvError::Closed => fail!("closed"), - TryRecvError::Lagged(_) => continue, // no delay - _ => { - if let Some(earlier) = last { - let since = now.duration_since(earlier).as_millis(); - if since > 200 || waiting > 15 { - last.take(); - sender.flush().await.unwrap(); - waiting = 0; - flush!(); - } + dummy_print!("websocket"); + let mut last: Option<Instant> = None; + let mut waiting: usize = 0; + loop { + let out = stdout.try_recv(); + let now = Instant::now(); + match out { + Err(e) => match e { + TryRecvError::Closed => fail!("closed"), + TryRecvError::Lagged(_) => continue, // no delay + _ => { + if let Some(earlier) = last { + let since = now.duration_since(earlier).as_millis(); + if since > 200 || waiting > 15 { + last.take(); + sender.flush().await.unwrap(); + waiting = 0; + flush!(); } } - }, - Ok(m) => { - #[allow(unused_variables)] - for line in m.lines() { - input!("{line}"); - if let Err(e) = sender.feed(Message::Text(line.to_owned())).await { - fail!("{e}"); - }; - waiting += 1; - } - last = Some(now); } - } - match tokio::select! { - next = reciever.next() => next, - _ = async_std::task::sleep(Duration::from_millis(20)) =>continue, - } { - Some(r) => match r { - Ok(m) => { - if let Message::Text(m) = m { - output!("{m}"); - state.stdin.send(m).unwrap(); - } - } - #[allow(unused_variables)] - Err(e) => { + }, + Ok(m) => { + #[allow(unused_variables)] + for line in m.lines() { + input!("{line}"); + if let Err(e) = sender.feed(Message::Text(line.to_owned())).await { fail!("{e}"); + }; + waiting += 1; + } + last = Some(now); + } + } + match tokio::select! { + next = reciever.next() => next, + _ = async_std::task::sleep(Duration::from_millis(20)) => continue, + } { + Some(r) => match r { + Ok(m) => { + if let Message::Text(m) = m { + output!("{m}"); + state.stdin.send(m).unwrap(); } - }, - None => { - nooutput!(); } + #[allow(unused_variables)] + Err(e) => { + fail!("{e}"); + } + }, + None => { + nooutput!(); } - async_std::task::sleep(Duration::from_millis(20)).await; - continue; } - }); - Self(ws_task) - } - - pub async fn wait(self) { - self.0.await.unwrap() + async_std::task::sleep(Duration::from_millis(20)).await; + continue; + } } } |