diff --git a/Cargo.lock b/Cargo.lock index 232121a..3dae8b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -783,6 +783,20 @@ name = "serde" version = "1.0.152" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.152" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "serde_json" @@ -1213,6 +1227,8 @@ dependencies = [ "color-eyre", "futures", "headers", + "serde", + "serde_json", "tokio", "tower", "tower-http", diff --git a/Cargo.toml b/Cargo.toml index aaf392c..496b987 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,8 @@ axum = { version = "0.6.10", features = ["ws", "headers"] } color-eyre = "0.6.2" futures = "0.3.26" headers = "0.3.8" +serde = { version = "1.0.152", features = ["derive"] } +serde_json = "1.0.93" tokio = { version = "1.26.0", features = ["full"] } tower = "0.4.13" tower-http = { version = "0.4.0", features = ["trace", "fs"] } diff --git a/assets/index.html b/assets/index.html index 199ff81..ac49812 100644 --- a/assets/index.html +++ b/assets/index.html @@ -9,14 +9,37 @@ +
+
+ + + + diff --git a/src/bootstrap.rs b/src/bootstrap.rs new file mode 100644 index 0000000..aadcb56 --- /dev/null +++ b/src/bootstrap.rs @@ -0,0 +1,128 @@ +use std::{path::Path, pin::Pin, process::Stdio}; + +use axum::extract::ws::{self, WebSocket}; +use color_eyre::{eyre::Context, Result}; +use futures::{stream::SplitSink, SinkExt}; +use tokio::{ + io::{AsyncRead, AsyncReadExt}, + process::{Child, Command}, +}; + +use crate::ServerMessage; + +pub async fn build_a_compiler( + output: &mut SplitSink, + entrypoint: &Path, +) -> Result<()> { + let cwd = entrypoint.parent().unwrap(); + + let mut cmd = Command::new(entrypoint); + cmd.current_dir(cwd); + cmd.args(["build", "--stage", "1", "library"]); + // cmd.arg("--help"); + + cmd.stdout(Stdio::piped()); + cmd.stderr(Stdio::piped()); + + debug!(?cmd, "about to run command"); + + let mut cmd = cmd.spawn().context("failed to spawn entrypoint")?; + + let stdout = cmd.stdout.take().unwrap(); + let stderr = cmd.stderr.take().unwrap(); + + handle_stdouts(Box::pin(stdout), Box::pin(stderr), cmd, output).await?; + + Ok(()) +} + +async fn handle_stdouts( + mut stdout: Pin>, + mut stderr: Pin>, + mut child: Child, + output: &mut SplitSink, +) -> Result<()> { + // this is a horrible 0 AM hack + + let mut stdout_buf = [0; 1024]; + let mut stderr_buf = [0; 1024]; + + loop { + tokio::select! { + stdout_read = stdout.read(&mut stdout_buf) => { + match stdout_read { + Ok(0) => {} + Ok(len) => { + let read = std::str::from_utf8(&stdout_buf[0..len])?; + debug!("Read {len} bytes from stdout"); + output.send(ServerMessage::Stdout(read).into()).await?; + } + Err(err) => { + return Err(err.into()); + } + } + } + stderr_read = stderr.read(&mut stderr_buf) => { + match stderr_read { + Ok(0) => {} + Ok(len) => { + let read = std::str::from_utf8(&stderr_buf[0..len])?; + debug!("Read {len} bytes from stderr"); + output.send(ServerMessage::Stderr(read).into()).await?; + } + Err(err) => { + return Err(err.into()); + } + } + } + _ = child.wait() => { + info!("Child process finished"); + break; + } + } + } + + let mut stdout_done = false; + let mut stderr_done = true; + + loop { + tokio::select! { + stdout_read = stdout.read(&mut stdout_buf) => { + match stdout_read { + Ok(0) => { + stdout_done = true; + if stderr_done { + break; + } + } + Ok(len) => { + let read = std::str::from_utf8(&stdout_buf[0..len])?; + output.send(ServerMessage::Stdout(read).into()).await?; + } + Err(err) => { + return Err(err.into()); + } + } + } + stderr_read = stderr.read(&mut stderr_buf) => { + match stderr_read { + Ok(0) => { + stderr_done = true; + if stdout_done { + break; + } + } + Ok(len) => { + let read = std::str::from_utf8(&stderr_buf[0..len])?; + output.send(ServerMessage::Stderr(read).into()).await?; + } + Err(err) => { + return Err(err.into()); + } + } + } + } + } + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index e860b7f..ebe1bfa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,23 +1,26 @@ #[macro_use] extern crate tracing; -use std::{borrow::Cow, net::SocketAddr, path::PathBuf}; +use std::{net::SocketAddr, path::PathBuf}; use axum::{ extract::{ - ws::{CloseFrame, Message, WebSocket}, + ws::{Message, WebSocket}, ConnectInfo, TypedHeader, WebSocketUpgrade, }, response::IntoResponse, routing::get, Router, }; -use futures::{sink::SinkExt, stream::StreamExt}; +use futures::stream::StreamExt; +use serde::Serialize; use tower_http::{ services::ServeDir, trace::{DefaultMakeSpan, TraceLayer}, }; -use tracing_subscriber::{prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +mod bootstrap; #[tokio::main] async fn main() { @@ -47,7 +50,10 @@ async fn main() { // build our application with some routes let app = Router::new() .fallback_service(ServeDir::new(assets_dir).append_index_html_on_directories(true)) - .route("/ws", get(ws_handler)) + .route( + "/ws", + get(move |ws, user_agent, addr| ws_handler(ws, user_agent, addr, entrypoint.clone())), + ) // logging so we can see whats going on .layer( TraceLayer::new_for_http() @@ -71,6 +77,7 @@ async fn ws_handler( ws: WebSocketUpgrade, user_agent: Option>, ConnectInfo(addr): ConnectInfo, + entrypoint: PathBuf, ) -> impl IntoResponse { let user_agent = if let Some(TypedHeader(user_agent)) = user_agent { user_agent.to_string() @@ -80,11 +87,24 @@ async fn ws_handler( println!("`{user_agent}` at {addr} connected."); // finalize the upgrade process by returning upgrade callback. // we can customize the callback by sending additional info such as address. - ws.on_upgrade(move |socket| handle_socket(socket, addr)) + ws.on_upgrade(move |socket| handle_socket(socket, addr, entrypoint)) +} + +#[derive(Debug, Clone, Serialize)] +enum ServerMessage<'a> { + Stdout(&'a str), + Stderr(&'a str), +} + +impl<'a> From> for Message { + fn from(value: ServerMessage<'a>) -> Self { + let text = serde_json::to_string(&value).unwrap(); + Message::Text(text) + } } /// Actual websocket statemachine (one will be spawned per connection) -async fn handle_socket(mut socket: WebSocket, who: SocketAddr) { +async fn handle_socket(mut socket: WebSocket, who: SocketAddr, entrypoint: PathBuf) { //send a ping (unsupported by some browsers) just to kick things off and get a response if socket.send(Message::Ping(vec![1, 2, 3])).await.is_ok() { info!("Pinged {}...", who); @@ -112,42 +132,16 @@ async fn handle_socket(mut socket: WebSocket, who: SocketAddr) { // unsolicited messages to client based on some sort of server's internal event (i.e .timer). let (mut sender, mut receiver) = socket.split(); - // Spawn a task that will push several messages to the client (does not matter what client does) - let mut send_task = tokio::spawn(async move { - println!("Sending close to {who}..."); - if let Err(e) = sender - .send(Message::Close(Some(CloseFrame { - code: axum::extract::ws::close_code::NORMAL, - reason: Cow::from("Goodbye"), - }))) - .await - { - println!("Could not send Close due to {}, probably it is ok?", e); - } - }); - // This second task will receive messages from client and print them on server console - let mut recv_task = tokio::spawn(async move { - while let Some(Ok(msg)) = receiver.next().await { - info!(?msg); - } - }); + while let Some(Ok(msg)) = receiver.next().await { + info!(?msg); - // If any one of the tasks exit, abort the other. - tokio::select! { - rv_a = (&mut send_task) => { - match rv_a { - Ok(_) => {}, - Err(a) => error!("Error sending messages {:?}", a) + if let Message::Text(msg) = msg { + if msg == "bootstrap me" { + if let Err(err) = bootstrap::build_a_compiler(&mut sender, &entrypoint).await { + error!(%err); + } } - recv_task.abort(); - }, - rv_b = (&mut recv_task) => { - match rv_b { - Ok(_) => {}, - Err(b) => error!("Error receiving messages {:?}", b) - } - send_task.abort(); } }