html terminal
Diffstat (limited to 'src/server.rs')
-rw-r--r--src/server.rs135
1 files changed, 48 insertions, 87 deletions
diff --git a/src/server.rs b/src/server.rs
index 0cf6bc1..756c984 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,8 +1,11 @@
-use crate::process::Process;
+use crate::websocket::WebSocket;
+use crate::{process::Process, webhook::Webhook};
+use axum::http::StatusCode;
+use axum::response::{Redirect, Response};
use axum::{
extract::{
- ws::{Message, WebSocket, WebSocketUpgrade},
- Path, State as WsState,
+ ws::{Message, WebSocketUpgrade},
+ Path, State as StateW,
},
http::header::CONTENT_TYPE,
response::{AppendHeaders, Html, IntoResponse},
@@ -12,25 +15,30 @@ use axum::{
use futures::sink::SinkExt;
use minify_html::{minify, Cfg};
use paste::paste;
+use std::sync::Mutex;
use std::{
net::SocketAddr,
sync::{Arc, OnceLock},
- time::{Duration, Instant},
};
-use tokio::sync::broadcast::{self, error::TryRecvError};
-use tokio_stream::StreamExt;
+use tokio::sync::broadcast;
-struct State {
+pub struct State {
// sent from the process to the websockets
- stdout: broadcast::Sender<String>,
- // sent by websockets to the process
- stdin: broadcast::Sender<String>,
+ pub stdout_html: broadcast::Sender<String>,
+ pub stdout_plain: broadcast::Sender<String>,
+ // sent to the process
+ pub stdin: broadcast::Sender<String>,
}
impl State {
fn new(stdin: broadcast::Sender<String>) -> Self {
- let (stdout, _rx) = broadcast::channel(5);
- Self { stdin, stdout }
+ let (stdout_html, _) = broadcast::channel(16);
+ let (stdout_plain, _) = broadcast::channel(2);
+ Self {
+ stdin,
+ stdout_html,
+ stdout_plain,
+ }
}
}
@@ -77,7 +85,8 @@ impl Server {
.route("/panel", html!(panel))
.route("/plaguess.png", png!(plaguess))
.route("/favicon.ico", png!(logo32))
- .route("/connect/:id", get(connect))
+ .route("/connect/:id", get(connect_ws))
+ .route("/hook/*k", get(connect_wh))
.with_state(state.clone());
let mut server_handle = tokio::spawn(async move {
AxumServer::bind(&addr)
@@ -85,7 +94,7 @@ impl Server {
.await
.unwrap()
});
- let mut process_handle = proc.input(stdin).output(state.stdout.clone()).link();
+ let mut process_handle = proc.input(stdin).with_state(&state).link();
tokio::select! {
_ = (&mut server_handle) => process_handle.abort(),
_ = (&mut process_handle) => server_handle.abort(),
@@ -109,89 +118,41 @@ pub fn from_utf8(v: &[u8]) -> &str {
unreachable!("invalid utf8")
}
-async fn connect(
+fn matches(id: &str) -> bool {
+ std::env::var("ID").as_deref().unwrap_or("4") == id
+}
+
+async fn connect_ws(
ws: WebSocketUpgrade,
- WsState(state): WsState<Arc<State>>,
+ StateW(state): StateW<Arc<State>>,
Path(id): Path<String>,
) -> impl IntoResponse {
ws.on_upgrade(|socket| async move {
- if std::env::var("ID").unwrap_or_else(|_| "4".to_string()) != id {
+ if !matches(&id) {
let mut s = futures::stream::StreamExt::split(socket).0;
let _ = s.send(Message::Text("correct id".to_string())).await;
return;
}
- setup(socket, state).await;
+ WebSocket::new(socket, state).await.wait().await;
})
}
-async fn setup(stream: WebSocket, state: Arc<State>) {
- let (mut sender, mut reciever) = futures::stream::StreamExt::split(stream);
- let mut stdout = state.stdout.subscribe();
- let ws_task = tokio::spawn(async move {
- define_print!("websocket");
- let mut last: Option<Instant> = None;
- let mut waiting: usize = 0;
- loop {
- nextiter!();
- let out = stdout.try_recv();
- let now = Instant::now();
- match out {
- Err(e) => match e {
- TryRecvError::Closed => fail!("closed"),
- _ => {
- if let Some(earlier) = last {
- let since = now.duration_since(earlier).as_millis();
- if since > 600 || waiting > 10 {
- last.take();
- sender.flush().await.unwrap();
- waiting = 0;
- flush!();
- }
- }
- noinput!();
- // async_std::task::sleep(Duration::from_millis(500)).await;
- // cont!();
- }
- },
- Ok(m) => {
- input!("{m}");
- if let Err(e) = sender.feed(Message::Text(m)).await {
- fail!("{e}");
- };
- last = Some(now);
- waiting += 1;
- }
- }
- match tokio::select! {
- next = reciever.next() => next,
- _ = async_std::task::sleep(Duration::from_millis(100)) => cont!(),
- } {
- Some(r) => match r {
- Ok(m) => {
- if let Message::Text(m) = m {
- output!("{m}");
- state.stdin.send(m).unwrap();
- }
- }
- Err(e) => {
- fail!("{e}");
- }
- },
- None => {
- nooutput!()
- }
- }
- async_std::task::sleep(Duration::from_millis(100)).await;
- cont!();
+async fn connect_wh(StateW(state): StateW<Arc<State>>, Path(params): Path<String>) -> Response {
+ static WEBHOOK: Mutex<Option<Webhook>> = Mutex::new(None); //one slot
+ let (id, url) = {
+ match params.split_once('/') {
+ None => return StatusCode::BAD_REQUEST.into_response(),
+ Some((a, b)) => (a, b),
}
- });
- // let mut recv_task = tokio::spawn(async move {
- // while let Some(Ok(Message::Text(m))) = reciever.try_next().await {
- // println!("ws sent {m}");
- // state.stdin.send(m).unwrap();
- // }
- // });
-
- ws_task.await.unwrap();
- println!("websocket !! finish");
+ };
+ if !matches(id) {
+ return StatusCode::FORBIDDEN.into_response();
+ }
+ if let Some(w) = WEBHOOK.lock().unwrap().as_ref() {
+ if w.running() {
+ return StatusCode::LOCKED.into_response();
+ }
+ }
+ *WEBHOOK.lock().unwrap() = Some(Webhook::new(state.stdout_plain.subscribe(), url).await);
+ Redirect::to("/panel").into_response()
}