child module

This commit is contained in:
nora 2021-12-19 15:51:45 +01:00
parent 267a67e441
commit d69c1be8c8
5 changed files with 151 additions and 108 deletions

4
.gitignore vendored
View file

@ -1,3 +1,5 @@
/target
.idea
*.iml
*.iml
service-manager.log

View file

@ -1,6 +0,0 @@
0.000298909s INFO service_manager: Starting service-manager...
at src/main.rs:94
0.002233976s INFO service_manager::controller: Entering main loop
at src/controller.rs:23

129
src/controller/child.rs Normal file
View file

@ -0,0 +1,129 @@
use crate::controller::{StdioSendBuf, STDIO_SEND_BUF_SIZE};
use crate::model::{ServiceStatus, SmError, SmResult};
use std::io::{Read, Write};
use std::process::{Child, ChildStderr};
use std::sync::mpsc::TryRecvError;
use std::sync::{mpsc, Arc, Mutex};
use std::{io, thread};
use tracing::error;
pub fn child_process_thread(
child: Child,
stdout_send: mpsc::Sender<StdioSendBuf>,
service_status: Arc<Mutex<ServiceStatus>>,
service_name: String,
terminate_channel: mpsc::Receiver<()>,
) -> SmResult {
let mut child = child;
let mut stdout = child
.stdout
.take()
.ok_or(SmError::Bug("Stdout of child could not be taken"))?;
let stderr = child
.stderr
.take()
.ok_or(SmError::Bug("Stderr of child could not be taken"))?;
let (stderr_terminate_send, stderr_terminate_recv) = mpsc::channel();
let stdout_send_2 = stdout_send.clone();
let stderr_thread_result = thread::Builder::new()
.name(format!("worker-stderr-({})", service_name))
.spawn(move || child_process_stderr_thread(stdout_send_2, stderr_terminate_recv, stderr));
if let Err(err) = stderr_thread_result {
error!(error = %err, "Failed to spawn stderr thread");
}
let result = loop {
match terminate_channel.try_recv() {
Ok(_) | Err(TryRecvError::Disconnected) => {
// terminating the thread is a best-effort, it doesn't matter if it died
let _ = stderr_terminate_send.send(());
break Ok(());
}
Err(TryRecvError::Empty) => {}
}
let mut stdout_buf = [0; STDIO_SEND_BUF_SIZE];
match stdout.read(&mut stdout_buf) {
Ok(0) => {}
Ok(n) => {
stdout_send
.send((stdout_buf, n))
.map_err(|_| SmError::Bug("Failed to send stdout to main thread"))?;
}
Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => break Err(e.into()),
};
match child.try_wait() {
Ok(None) => {}
Ok(Some(status)) => {
let mut status_lock = service_status.lock().map_err(|_| SmError::MutexPoisoned)?;
*status_lock = match status.code() {
Some(0) => ServiceStatus::Exited,
Some(code) => ServiceStatus::Failed(code),
None => ServiceStatus::Killed,
};
return Ok(());
}
Err(e) => break Err(e.into()),
}
};
match child.kill() {
Ok(()) => {
*service_status.lock().map_err(|_| SmError::MutexPoisoned)? = ServiceStatus::Killed
}
Err(e) if e.kind() == io::ErrorKind::InvalidInput => {}
Err(e) => return Err(e.into()),
}
let mut send_message_buf = [0; STDIO_SEND_BUF_SIZE];
let kill_msg = "\n\n<Process was killed>\n";
send_message_buf
.as_mut_slice()
.write_all(kill_msg.as_bytes())?;
stdout_send
.send((send_message_buf, kill_msg.len()))
.map_err(|_| SmError::Bug("Failed to send stdout to main thread"))?;
result
}
fn child_process_stderr_thread(
stdout_send: mpsc::Sender<StdioSendBuf>,
terminate_channel: mpsc::Receiver<()>,
mut stderr: ChildStderr,
) {
loop {
match terminate_channel.try_recv() {
Ok(_) | Err(TryRecvError::Disconnected) => return,
Err(TryRecvError::Empty) => {}
}
let mut stderr_buf = [0; STDIO_SEND_BUF_SIZE];
match stderr.read(&mut stderr_buf) {
Ok(0) => {}
Ok(n) => {
let result = stdout_send
.send((stderr_buf, n))
.map_err(|_| SmError::Bug("Failed to send stderr to main thread"));
if let Err(err) = result {
error!(error = %err);
}
}
Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
Err(err) => {
error!(error = %err, "Error reading from stderr");
return;
}
};
}
}

View file

@ -1,16 +1,18 @@
mod child;
use crate::controller::child::child_process_thread;
use crate::model::config::Config;
use crate::model::{AppState, Service, ServiceStatus, SmError, SmResult, StdIoStream};
use crate::{view, App};
use crossterm::event;
use crossterm::event::{Event, KeyCode};
use std::collections::HashMap;
use std::io;
use std::io::{ErrorKind, Read, Write};
use std::process::{Child, Command, Stdio};
use std::sync::mpsc::TryRecvError;
use std::io::{ErrorKind, Write};
use std::process::{ChildStderr, Command, Stdio};
use std::sync::{mpsc, Arc, Mutex};
use std::time::Duration;
use tracing::{error, info, trace_span};
use std::{io, thread};
use tracing::{error, info};
use tui::backend::Backend;
use tui::widgets::TableState;
use tui::Terminal;
@ -186,7 +188,7 @@ impl App {
fn start_service(&mut self, index: usize) -> SmResult {
let service = &mut self.table.services[index];
trace_span!("Starting service", name = %service.name);
info!(name = %service.name, "Starting service");
*service.status.lock()? = ServiceStatus::Running;
@ -223,11 +225,18 @@ impl App {
self.thread_terminates.insert(index, terminate_send);
let service_status = service.status.clone();
let service_name = service.name.clone();
let spawn_result = std::thread::Builder::new()
.name(format!("worker-{}", service.name))
let spawn_result = thread::Builder::new()
.name(format!("worker-({})", service.name))
.spawn(move || {
match child_process_thread(child, stdout_send, service_status, terminate_recv) {
match child_process_thread(
child,
stdout_send,
service_status,
service_name,
terminate_recv,
) {
Ok(_) => {}
Err(err) => {
error!(error = %err, "Error processing service");
@ -242,94 +251,3 @@ impl App {
Ok(())
}
}
fn child_process_thread(
child: Child,
stdout_send: mpsc::Sender<StdioSendBuf>,
service_status: Arc<Mutex<ServiceStatus>>,
terminate_channel: mpsc::Receiver<()>,
) -> SmResult {
let mut child = child;
let mut stdout = child
.stdout
.take()
.ok_or(SmError::Bug("Stdout of child could not be taken"))?;
let mut stderr = child
.stderr
.take()
.ok_or(SmError::Bug("Stderr of child could not be taken"))?;
let stdout_send_2 = stdout_send.clone();
std::thread::spawn(move || {
let mut stderr_buf = [0; STDIO_SEND_BUF_SIZE];
match stderr.read(&mut stderr_buf) {
Ok(0) => {}
Ok(n) => {
let result = stdout_send_2
.send((stderr_buf, n))
.map_err(|_| SmError::Bug("Failed to send stderr to main thread"));
if let Err(err) = result {
error!(error = %err);
}
}
Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
Err(err) => error!(error = %err, "Error reading from stderr"),
};
});
let result = loop {
match terminate_channel.try_recv() {
Ok(_) | Err(TryRecvError::Disconnected) => break Ok(()),
Err(TryRecvError::Empty) => {}
}
let mut stdout_buf = [0; STDIO_SEND_BUF_SIZE];
match stdout.read(&mut stdout_buf) {
Ok(0) => {}
Ok(n) => {
stdout_send
.send((stdout_buf, n))
.map_err(|_| SmError::Bug("Failed to send stdout to main thread"))?;
}
Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => break Err(e.into()),
};
match child.try_wait() {
Ok(None) => {}
Ok(Some(status)) => {
let mut status_lock = service_status.lock().map_err(|_| SmError::MutexPoisoned)?;
*status_lock = match status.code() {
Some(0) => ServiceStatus::Exited,
Some(code) => ServiceStatus::Failed(code),
None => ServiceStatus::Killed,
};
return Ok(());
}
Err(e) => break Err(e.into()),
}
};
match child.kill() {
Ok(()) => {
*service_status.lock().map_err(|_| SmError::MutexPoisoned)? = ServiceStatus::Killed
}
Err(e) if e.kind() == io::ErrorKind::InvalidInput => {}
Err(e) => return Err(e.into()),
}
let mut send_message_buf = [0; STDIO_SEND_BUF_SIZE];
let kill_msg = "\n\n<Process was killed>\n";
send_message_buf
.as_mut_slice()
.write_all(kill_msg.as_bytes())?;
stdout_send
.send((send_message_buf, kill_msg.len()))
.map_err(|_| SmError::Bug("Failed to send stdout to main thread"))?;
result
}

View file

@ -87,7 +87,7 @@ fn setup_logging() {
tracing_subscriber::fmt()
.with_timer(tracing_subscriber::fmt::time::uptime())
.with_ansi(false)
.pretty()
.with_thread_names(true)
.with_writer(log_file)
.init();