html terminal
Diffstat (limited to 'src/websocket.rs')
-rw-r--r--src/websocket.rs81
1 files changed, 81 insertions, 0 deletions
diff --git a/src/websocket.rs b/src/websocket.rs
new file mode 100644
index 0000000..75591d4
--- /dev/null
+++ b/src/websocket.rs
@@ -0,0 +1,81 @@
+use crate::server::State;
+use axum::extract::ws::{Message, WebSocket as RealWebSocket};
+use futures_util::SinkExt;
+use std::{
+ sync::Arc,
+ time::{Duration, Instant},
+};
+use tokio::{sync::broadcast::error::TryRecvError, task::JoinHandle};
+use tokio_stream::StreamExt;
+
+pub struct WebSocket(JoinHandle<()>);
+impl WebSocket {
+ pub async fn new(stream: RealWebSocket, state: Arc<State>) -> Self {
+ let (mut sender, mut reciever) = futures::stream::StreamExt::split(stream);
+ let mut stdout = state.stdout_html.subscribe();
+ let ws_task = tokio::spawn(async move {
+ dummy_print!("websocket");
+ let mut last: Option<Instant> = None;
+ let mut waiting: usize = 0;
+ loop {
+ let out = stdout.try_recv();
+ let now = Instant::now();
+ match out {
+ Err(e) => match e {
+ TryRecvError::Closed => fail!("closed"),
+ TryRecvError::Lagged(_) => continue, // no delay
+ _ => {
+ if let Some(earlier) = last {
+ let since = now.duration_since(earlier).as_millis();
+ if since > 200 || waiting > 15 {
+ last.take();
+ sender.flush().await.unwrap();
+ waiting = 0;
+ flush!();
+ }
+ }
+ }
+ },
+ Ok(m) => {
+ #[allow(unused_variables)]
+ for line in m.lines() {
+ input!("{line}");
+ if let Err(e) = sender.feed(Message::Text(line.to_owned())).await {
+ fail!("{e}");
+ };
+ waiting += 1;
+ }
+ last = Some(now);
+ }
+ }
+ match tokio::select! {
+ next = reciever.next() => next,
+ _ = async_std::task::sleep(Duration::from_millis(20)) =>continue,
+ } {
+ Some(r) => match r {
+ Ok(m) => {
+ if let Message::Text(m) = m {
+ output!("{m}");
+ state.stdin.send(m).unwrap();
+ }
+ }
+ #[allow(unused_variables)]
+ Err(e) => {
+ fail!("{e}");
+ }
+ },
+ None => {
+ nooutput!();
+ }
+ }
+ async_std::task::sleep(Duration::from_millis(20)).await;
+ continue;
+ }
+ });
+ Self(ws_task)
+ }
+
+ pub async fn wait(self) {
+ self.0.await.unwrap()
+ }
+}