html terminal
Diffstat (limited to 'src/server.rs')
| -rw-r--r-- | src/server.rs | 135 |
1 files changed, 48 insertions, 87 deletions
diff --git a/src/server.rs b/src/server.rs index 0cf6bc1..756c984 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,8 +1,11 @@ -use crate::process::Process; +use crate::websocket::WebSocket; +use crate::{process::Process, webhook::Webhook}; +use axum::http::StatusCode; +use axum::response::{Redirect, Response}; use axum::{ extract::{ - ws::{Message, WebSocket, WebSocketUpgrade}, - Path, State as WsState, + ws::{Message, WebSocketUpgrade}, + Path, State as StateW, }, http::header::CONTENT_TYPE, response::{AppendHeaders, Html, IntoResponse}, @@ -12,25 +15,30 @@ use axum::{ use futures::sink::SinkExt; use minify_html::{minify, Cfg}; use paste::paste; +use std::sync::Mutex; use std::{ net::SocketAddr, sync::{Arc, OnceLock}, - time::{Duration, Instant}, }; -use tokio::sync::broadcast::{self, error::TryRecvError}; -use tokio_stream::StreamExt; +use tokio::sync::broadcast; -struct State { +pub struct State { // sent from the process to the websockets - stdout: broadcast::Sender<String>, - // sent by websockets to the process - stdin: broadcast::Sender<String>, + pub stdout_html: broadcast::Sender<String>, + pub stdout_plain: broadcast::Sender<String>, + // sent to the process + pub stdin: broadcast::Sender<String>, } impl State { fn new(stdin: broadcast::Sender<String>) -> Self { - let (stdout, _rx) = broadcast::channel(5); - Self { stdin, stdout } + let (stdout_html, _) = broadcast::channel(16); + let (stdout_plain, _) = broadcast::channel(2); + Self { + stdin, + stdout_html, + stdout_plain, + } } } @@ -77,7 +85,8 @@ impl Server { .route("/panel", html!(panel)) .route("/plaguess.png", png!(plaguess)) .route("/favicon.ico", png!(logo32)) - .route("/connect/:id", get(connect)) + .route("/connect/:id", get(connect_ws)) + .route("/hook/*k", get(connect_wh)) .with_state(state.clone()); let mut server_handle = tokio::spawn(async move { AxumServer::bind(&addr) @@ -85,7 +94,7 @@ impl Server { .await .unwrap() }); - let mut process_handle = proc.input(stdin).output(state.stdout.clone()).link(); + let mut process_handle = proc.input(stdin).with_state(&state).link(); tokio::select! { _ = (&mut server_handle) => process_handle.abort(), _ = (&mut process_handle) => server_handle.abort(), @@ -109,89 +118,41 @@ pub fn from_utf8(v: &[u8]) -> &str { unreachable!("invalid utf8") } -async fn connect( +fn matches(id: &str) -> bool { + std::env::var("ID").as_deref().unwrap_or("4") == id +} + +async fn connect_ws( ws: WebSocketUpgrade, - WsState(state): WsState<Arc<State>>, + StateW(state): StateW<Arc<State>>, Path(id): Path<String>, ) -> impl IntoResponse { ws.on_upgrade(|socket| async move { - if std::env::var("ID").unwrap_or_else(|_| "4".to_string()) != id { + if !matches(&id) { let mut s = futures::stream::StreamExt::split(socket).0; let _ = s.send(Message::Text("correct id".to_string())).await; return; } - setup(socket, state).await; + WebSocket::new(socket, state).await.wait().await; }) } -async fn setup(stream: WebSocket, state: Arc<State>) { - let (mut sender, mut reciever) = futures::stream::StreamExt::split(stream); - let mut stdout = state.stdout.subscribe(); - let ws_task = tokio::spawn(async move { - define_print!("websocket"); - let mut last: Option<Instant> = None; - let mut waiting: usize = 0; - loop { - nextiter!(); - let out = stdout.try_recv(); - let now = Instant::now(); - match out { - Err(e) => match e { - TryRecvError::Closed => fail!("closed"), - _ => { - if let Some(earlier) = last { - let since = now.duration_since(earlier).as_millis(); - if since > 600 || waiting > 10 { - last.take(); - sender.flush().await.unwrap(); - waiting = 0; - flush!(); - } - } - noinput!(); - // async_std::task::sleep(Duration::from_millis(500)).await; - // cont!(); - } - }, - Ok(m) => { - input!("{m}"); - if let Err(e) = sender.feed(Message::Text(m)).await { - fail!("{e}"); - }; - last = Some(now); - waiting += 1; - } - } - match tokio::select! { - next = reciever.next() => next, - _ = async_std::task::sleep(Duration::from_millis(100)) => cont!(), - } { - Some(r) => match r { - Ok(m) => { - if let Message::Text(m) = m { - output!("{m}"); - state.stdin.send(m).unwrap(); - } - } - Err(e) => { - fail!("{e}"); - } - }, - None => { - nooutput!() - } - } - async_std::task::sleep(Duration::from_millis(100)).await; - cont!(); +async fn connect_wh(StateW(state): StateW<Arc<State>>, Path(params): Path<String>) -> Response { + static WEBHOOK: Mutex<Option<Webhook>> = Mutex::new(None); //one slot + let (id, url) = { + match params.split_once('/') { + None => return StatusCode::BAD_REQUEST.into_response(), + Some((a, b)) => (a, b), } - }); - // let mut recv_task = tokio::spawn(async move { - // while let Some(Ok(Message::Text(m))) = reciever.try_next().await { - // println!("ws sent {m}"); - // state.stdin.send(m).unwrap(); - // } - // }); - - ws_task.await.unwrap(); - println!("websocket !! finish"); + }; + if !matches(id) { + return StatusCode::FORBIDDEN.into_response(); + } + if let Some(w) = WEBHOOK.lock().unwrap().as_ref() { + if w.running() { + return StatusCode::LOCKED.into_response(); + } + } + *WEBHOOK.lock().unwrap() = Some(Webhook::new(state.stdout_plain.subscribe(), url).await); + Redirect::to("/panel").into_response() } |