html terminal
Diffstat (limited to 'src/server.rs')
| -rw-r--r-- | src/server.rs | 197 |
1 files changed, 197 insertions, 0 deletions
diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..0cf6bc1 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,197 @@ +use crate::process::Process; +use axum::{ + extract::{ + ws::{Message, WebSocket, WebSocketUpgrade}, + Path, State as WsState, + }, + http::header::CONTENT_TYPE, + response::{AppendHeaders, Html, IntoResponse}, + routing::get, + Router, Server as AxumServer, +}; +use futures::sink::SinkExt; +use minify_html::{minify, Cfg}; +use paste::paste; +use std::{ + net::SocketAddr, + sync::{Arc, OnceLock}, + time::{Duration, Instant}, +}; +use tokio::sync::broadcast::{self, error::TryRecvError}; +use tokio_stream::StreamExt; + +struct State { + // sent from the process to the websockets + stdout: broadcast::Sender<String>, + // sent by websockets to the process + stdin: broadcast::Sender<String>, +} + +impl State { + fn new(stdin: broadcast::Sender<String>) -> Self { + let (stdout, _rx) = broadcast::channel(5); + Self { stdin, stdout } + } +} + +macro_rules! html { + ($file:expr) => { + get(paste!( + || async { + static [<$file:upper>]: OnceLock<Vec<u8>> = OnceLock::new(); + Html(from_utf8([<$file:upper>].get_or_init(|| { + minify( + include_bytes!(concat!("../html/", stringify!($file), ".html")), + &Cfg { + minify_js: true, + minify_css: true, + ..Default::default() + }, + ) + })).replace("ws://localhost:4001/connect/", &format!("{}", std::env::var("URL").unwrap_or("ws://localhost:4001/connect/".to_string())))) + } + )) + }; +} + +macro_rules! png { + ($file:expr) => { + get(|| async { + { + ( + AppendHeaders([(CONTENT_TYPE, "image/png")]), + include_bytes!(concat!("../media/", stringify!($file), ".png")), + ) + } + }) + }; +} + +pub struct Server; +impl Server { + pub async fn spawn(addr: SocketAddr, proc: Process) { + let (stdin_tx, stdin) = broadcast::channel(2); + let state = Arc::new(State::new(stdin_tx)); + let router = Router::new() + .route("/", html!(index)) + .route("/panel", html!(panel)) + .route("/plaguess.png", png!(plaguess)) + .route("/favicon.ico", png!(logo32)) + .route("/connect/:id", get(connect)) + .with_state(state.clone()); + let mut server_handle = tokio::spawn(async move { + AxumServer::bind(&addr) + .serve(router.into_make_service()) + .await + .unwrap() + }); + let mut process_handle = proc.input(stdin).output(state.stdout.clone()).link(); + tokio::select! { + _ = (&mut server_handle) => process_handle.abort(), + _ = (&mut process_handle) => server_handle.abort(), + } + panic!("oh no"); + } +} + +/// like [String::from_utf8_lossy] but instead of being lossy it panics +pub fn from_utf8(v: &[u8]) -> &str { + let mut iter = std::str::Utf8Chunks::new(v); + if let Some(chunk) = iter.next() { + let valid = chunk.valid(); + if chunk.invalid().is_empty() { + debug_assert_eq!(valid.len(), v.len()); + return valid; + } + } else { + return ""; + }; + unreachable!("invalid utf8") +} + +async fn connect( + ws: WebSocketUpgrade, + WsState(state): WsState<Arc<State>>, + Path(id): Path<String>, +) -> impl IntoResponse { + ws.on_upgrade(|socket| async move { + if std::env::var("ID").unwrap_or_else(|_| "4".to_string()) != id { + let mut s = futures::stream::StreamExt::split(socket).0; + let _ = s.send(Message::Text("correct id".to_string())).await; + return; + } + setup(socket, state).await; + }) +} + +async fn setup(stream: WebSocket, state: Arc<State>) { + let (mut sender, mut reciever) = futures::stream::StreamExt::split(stream); + let mut stdout = state.stdout.subscribe(); + let ws_task = tokio::spawn(async move { + define_print!("websocket"); + let mut last: Option<Instant> = None; + let mut waiting: usize = 0; + loop { + nextiter!(); + let out = stdout.try_recv(); + let now = Instant::now(); + match out { + Err(e) => match e { + TryRecvError::Closed => fail!("closed"), + _ => { + if let Some(earlier) = last { + let since = now.duration_since(earlier).as_millis(); + if since > 600 || waiting > 10 { + last.take(); + sender.flush().await.unwrap(); + waiting = 0; + flush!(); + } + } + noinput!(); + // async_std::task::sleep(Duration::from_millis(500)).await; + // cont!(); + } + }, + Ok(m) => { + input!("{m}"); + if let Err(e) = sender.feed(Message::Text(m)).await { + fail!("{e}"); + }; + last = Some(now); + waiting += 1; + } + } + match tokio::select! { + next = reciever.next() => next, + _ = async_std::task::sleep(Duration::from_millis(100)) => cont!(), + } { + Some(r) => match r { + Ok(m) => { + if let Message::Text(m) = m { + output!("{m}"); + state.stdin.send(m).unwrap(); + } + } + Err(e) => { + fail!("{e}"); + } + }, + None => { + nooutput!() + } + } + async_std::task::sleep(Duration::from_millis(100)).await; + cont!(); + } + }); + // let mut recv_task = tokio::spawn(async move { + // while let Some(Ok(Message::Text(m))) = reciever.try_next().await { + // println!("ws sent {m}"); + // state.stdin.send(m).unwrap(); + // } + // }); + + ws_task.await.unwrap(); + println!("websocket !! finish"); +} |