html terminal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
use crate::server::State;
use axum::extract::ws::{Message, WebSocket as RealWebSocket};
use futures_util::SinkExt;
use std::{
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::{sync::broadcast::error::TryRecvError, task::JoinHandle};
use tokio_stream::StreamExt;

pub struct WebSocket(JoinHandle<()>);
impl WebSocket {
    pub async fn new(stream: RealWebSocket, state: Arc<State>) -> Self {
        let (mut sender, mut reciever) = futures::stream::StreamExt::split(stream);
        let mut stdout = state.stdout_html.subscribe();
        let ws_task = tokio::spawn(async move {
            dummy_print!("websocket");
            let mut last: Option<Instant> = None;
            let mut waiting: usize = 0;
            loop {
                let out = stdout.try_recv();
                let now = Instant::now();
                match out {
                    Err(e) => match e {
                        TryRecvError::Closed => fail!("closed"),
                        TryRecvError::Lagged(_) => continue, // no delay
                        _ => {
                            if let Some(earlier) = last {
                                let since = now.duration_since(earlier).as_millis();
                                if since > 200 || waiting > 15 {
                                    last.take();
                                    sender.flush().await.unwrap();
                                    waiting = 0;
                                    flush!();
                                }
                            }
                        }
                    },
                    Ok(m) => {
                        #[allow(unused_variables)]
                        for line in m.lines() {
                            input!("{line}");
                            if let Err(e) = sender.feed(Message::Text(line.to_owned())).await {
                                fail!("{e}");
                            };
                            waiting += 1;
                        }
                        last = Some(now);
                    }
                }
                match tokio::select! {
                    next = reciever.next() => next,
                    _ = async_std::task::sleep(Duration::from_millis(20)) =>continue,
                } {
                    Some(r) => match r {
                        Ok(m) => {
                            if let Message::Text(m) = m {
                                output!("{m}");
                                state.stdin.send(m).unwrap();
                            }
                        }
                        #[allow(unused_variables)]
                        Err(e) => {
                            fail!("{e}");
                        }
                    },
                    None => {
                        nooutput!();
                    }
                }
                async_std::task::sleep(Duration::from_millis(20)).await;
                continue;
            }
        });
        Self(ws_task)
    }

    pub async fn wait(self) {
        self.0.await.unwrap()
    }
}