error handling

This commit is contained in:
nora 2024-08-25 21:43:21 +02:00
parent b38b1d955b
commit ae425fdefa
4 changed files with 133 additions and 113 deletions

View file

@ -13,6 +13,7 @@ eyre.workspace = true
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
rustix = { version = "0.38.34", features = ["pty", "termios", "procfs", "process", "stdio"] }
users = "0.11.0"
futures = "0.3.30"
[lints]
workspace = true

View file

@ -7,6 +7,8 @@ use eyre::{bail, Context, OptionExt, Result};
use pty::Pty;
use rustix::termios::Winsize;
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
process::Command,
sync::mpsc,
@ -83,20 +85,31 @@ async fn handle_connection(
) -> Result<()> {
info!(addr = %conn.peer_addr(), "Received a new connection");
let mut channel_tasks = Vec::new();
loop {
match conn.progress().await {
Ok(()) => {}
Err(cluelessh_tokio::server::Error::ServerError(err)) => {
return Err(err);
}
Err(cluelessh_tokio::server::Error::SshStatus(status)) => match status {
SshStatus::PeerError(err) => {
info!(?err, "disconnecting client after invalid operation");
return Ok(());
tokio::select! {
step = conn.progress() => match step {
Ok(()) => {}
Err(cluelessh_tokio::server::Error::ServerError(err)) => {
return Err(err);
}
SshStatus::Disconnect => {
info!("Received disconnect from client");
return Ok(());
Err(cluelessh_tokio::server::Error::SshStatus(status)) => match status {
SshStatus::PeerError(err) => {
info!(?err, "disconnecting client after invalid operation");
return Ok(());
}
SshStatus::Disconnect => {
info!("Received disconnect from client");
return Ok(());
}
},
},
result = futures::future::try_join_all(&mut channel_tasks), if channel_tasks.len() > 0 => {
debug!(?result, "error!");
match result {
Ok(_) => channel_tasks.clear(),
Err(err) => return Err(err as eyre::Report),
}
},
}
@ -104,12 +117,11 @@ async fn handle_connection(
while let Some(channel) = conn.next_new_channel() {
let user = conn.inner().authenticated_user().unwrap().to_owned();
if *channel.kind() == ChannelKind::Session {
tokio::spawn(async move {
let result = handle_session_channel(user, channel).await;
if let Err(err) = result {
error!(?err);
}
});
let channel_task = tokio::spawn(handle_session_channel(user, channel));
channel_tasks.push(Box::pin(async {
let result = channel_task.await;
result.wrap_err("task panicked").and_then(|result| result)
}));
} else {
warn!("Trying to open non-session channel");
}
@ -123,6 +135,9 @@ struct SessionState {
channel: Channel,
process_exit_send: mpsc::Sender<Result<ExitStatus, io::Error>>,
process_exit_recv: mpsc::Receiver<Result<ExitStatus, io::Error>>,
writer: Option<File>,
reader: Option<File>,
}
async fn handle_session_channel(user: String, channel: Channel) -> Result<()> {
@ -134,12 +149,16 @@ async fn handle_session_channel(user: String, channel: Channel) -> Result<()> {
channel,
process_exit_send,
process_exit_recv,
writer: None,
reader: None,
};
let mut read_buf = [0; 1024];
loop {
let pty_read = async {
match &mut state.pty {
Some(pty) => pty.ctrl_read_recv.recv().await,
let read = async {
match &mut state.reader {
Some(file) => file.read(&mut read_buf).await,
// Ensure that if this is None, the future never finishes so the state update and process exit can progress.
None => loop {
tokio::task::yield_now().await;
@ -170,11 +189,11 @@ async fn handle_session_channel(user: String, channel: Channel) -> Result<()> {
None => {}
}
}
read = pty_read => {
let Some(read) = read else {
read = read => {
let Ok(read) = read else {
bail!("failed to read");
};
let _ = state.channel.send(ChannelOperationKind::Data(read)).await;
let _ = state.channel.send(ChannelOperationKind::Data(read_buf[..read].to_vec())).await;
}
}
}
@ -184,7 +203,6 @@ impl SessionState {
async fn handle_channel_update(&mut self, update: ChannelUpdateKind) -> Result<()> {
match update {
ChannelUpdateKind::Request(req) => {
let success = ChannelOperationKind::Success;
match req {
ChannelRequest::PtyReq {
want_reply,
@ -195,8 +213,8 @@ impl SessionState {
height_px,
term_modes,
} => {
self.pty = Some(
pty::Pty::new(
match self
.pty_req(
term,
Winsize {
ws_row: height_rows as u16,
@ -206,46 +224,34 @@ impl SessionState {
},
term_modes,
)
.await?,
);
if want_reply {
self.channel.send(success).await?;
.await
{
Ok(()) => {
if want_reply {
self.channel.send(ChannelOperationKind::Success).await?;
}
}
Err(err) => {
debug!(%err, "Failed to open PTY");
if want_reply {
self.channel.send(ChannelOperationKind::Failure).await?;
}
}
}
}
ChannelRequest::Shell { want_reply } => {
let user = self.user.clone();
let user =
tokio::task::spawn_blocking(move || users::get_user_by_name(&user))
.await?
.ok_or_eyre("failed to find user")?;
let shell = user.shell();
let mut cmd = Command::new(shell);
cmd.env_clear();
if let Some(pty) = &self.pty {
pty.start_session_for_command(&mut cmd)?;
ChannelRequest::Shell { want_reply } => match self.shell().await {
Ok(()) => {
if want_reply {
self.channel.send(ChannelOperationKind::Success).await?;
}
}
// TODO: **user** home directory
cmd.current_dir(user.home_dir());
cmd.env("USER", user.name());
cmd.uid(user.uid());
cmd.gid(user.primary_group_id());
debug!(cmd = %shell.display(), uid = %user.uid(), gid = %user.primary_group_id(), "Executing process");
let mut shell = cmd.spawn()?;
let process_exit_send = self.process_exit_send.clone();
tokio::spawn(async move {
let result = shell.wait().await;
let _ = process_exit_send.send(result).await;
});
if want_reply {
self.channel.send(success).await?;
Err(err) => {
debug!(%err, "Failed to spawn shell");
if want_reply {
self.channel.send(ChannelOperationKind::Failure).await?;
}
}
}
},
ChannelRequest::Exec { .. } => {
todo!()
}
@ -254,9 +260,9 @@ impl SessionState {
};
}
ChannelUpdateKind::OpenFailed { .. } => todo!(),
ChannelUpdateKind::Data { data } => match &mut self.pty {
ChannelUpdateKind::Data { data } => match &mut self.writer {
Some(pty) => {
pty.ctrl_write_send.send(data).await?;
pty.write_all(&data).await?;
}
None => {}
},
@ -269,4 +275,48 @@ impl SessionState {
}
Ok(())
}
async fn pty_req(&mut self, term: String, winsize: Winsize, term_modes: Vec<u8>) -> Result<()> {
let pty = pty::Pty::new(term, winsize, term_modes).await?;
let controller = pty.controller().try_clone_to_owned()?;
self.pty = Some(pty);
self.writer = Some(File::from_std(std::fs::File::from(controller.try_clone()?)));
self.reader = Some(File::from_std(std::fs::File::from(controller)));
Ok(())
}
async fn shell(&mut self) -> Result<()> {
let user = self.user.clone();
let user = tokio::task::spawn_blocking(move || users::get_user_by_name(&user))
.await?
.ok_or_eyre("failed to find user")?;
let shell = user.shell();
let mut cmd = Command::new(shell);
cmd.env_clear();
if let Some(pty) = &self.pty {
pty.start_session_for_command(&mut cmd)?;
}
// TODO: **user** home directory
cmd.current_dir(user.home_dir());
cmd.env("USER", user.name());
cmd.uid(user.uid());
cmd.gid(user.primary_group_id());
debug!(cmd = %shell.display(), uid = %user.uid(), gid = %user.primary_group_id(), "Executing process");
let mut shell = cmd.spawn()?;
let process_exit_send = self.process_exit_send.clone();
tokio::spawn(async move {
let result = shell.wait().await;
let _ = process_exit_send.send(result).await;
});
debug!("Successfully spawned shell");
Ok(())
}
}

View file

@ -1,9 +1,6 @@
//! PTY-related operations for setting up the session.
use std::{
io::{Read, Write},
os::fd::{AsRawFd, BorrowedFd, OwnedFd},
};
use std::os::fd::{AsFd, BorrowedFd, OwnedFd};
use eyre::{Context, Result};
use rustix::{
@ -11,17 +8,13 @@ use rustix::{
pty::OpenptFlags,
termios::Winsize,
};
use tokio::{process::Command, sync::mpsc, task::JoinHandle};
use tokio::process::Command;
pub struct Pty {
term: String,
#[expect(dead_code)]
writer_handle: JoinHandle<()>,
#[expect(dead_code)]
reader_handle: JoinHandle<()>,
pub ctrl_write_send: mpsc::Sender<Vec<u8>>,
pub ctrl_read_recv: mpsc::Receiver<Vec<u8>>,
controller: OwnedFd,
user_pty: OwnedFd,
user_pty_name: String,
}
@ -53,56 +46,31 @@ impl Pty {
let _ = modes;
rustix::termios::tcsetattr(&user_pty, rustix::termios::OptionalActions::Flush, &termios)?;
// Set up communication threads:
let mut controller_read = std::fs::File::from(controller);
let mut controller_write = controller_read.try_clone()?;
let (ctrl_write_send, mut ctrl_write_recv) = tokio::sync::mpsc::channel::<Vec<u8>>(10);
let (ctrl_read_send, ctrl_read_recv) = tokio::sync::mpsc::channel::<Vec<u8>>(10);
let writer_handle = tokio::task::spawn_blocking(move || {
while let Some(write) = ctrl_write_recv.blocking_recv() {
let _ = controller_write.write_all(&write);
}
});
let reader_handle = tokio::task::spawn_blocking(move || {
let mut buf = [0; 1024];
loop {
let Ok(read) = controller_read.read(&mut buf) else {
return;
};
let Ok(_) = ctrl_read_send.blocking_send(buf[..read].to_vec()) else {
return;
};
}
});
Ok(Self {
term,
writer_handle,
reader_handle,
ctrl_write_send,
ctrl_read_recv,
controller,
user_pty,
user_pty_name,
})
}
pub fn controller(&self) -> BorrowedFd<'_> {
self.controller.as_fd()
}
pub fn start_session_for_command(&self, cmd: &mut Command) -> Result<()> {
let user_pty = self.user_pty.as_raw_fd();
let user_pty = self.user_pty.try_clone()?;
unsafe {
cmd.pre_exec(move || {
let user_pty = BorrowedFd::borrow_raw(user_pty);
rustix::pty::grantpt(user_pty)?;
rustix::pty::grantpt(&user_pty)?;
let pid = rustix::process::setsid()?;
rustix::process::ioctl_tiocsctty(user_pty)?; // Set as the current controlling tty
rustix::termios::tcsetpgrp(user_pty, pid)?; // Set current process as tty controller
rustix::process::ioctl_tiocsctty(&user_pty)?; // Set as the current controlling tty
rustix::termios::tcsetpgrp(&user_pty, pid)?; // Set current process as tty controller
// Setup stdio with PTY.
rustix::stdio::dup2_stdin(user_pty)?;
rustix::stdio::dup2_stdout(user_pty)?;
rustix::stdio::dup2_stderr(user_pty)?;
rustix::stdio::dup2_stdin(&user_pty)?;
rustix::stdio::dup2_stdout(&user_pty)?;
rustix::stdio::dup2_stderr(&user_pty)?;
Ok(())
});