From ae425fdefae5154a883c9107e86dc9ba29bd63ee Mon Sep 17 00:00:00 2001 From: Noratrieb <48135649+Noratrieb@users.noreply.github.com> Date: Sun, 25 Aug 2024 21:43:21 +0200 Subject: [PATCH] error handling --- Cargo.lock | 1 + bin/cluelesshd/Cargo.toml | 1 + bin/cluelesshd/src/main.rs | 180 +++++++++++++++++++++++-------------- bin/cluelesshd/src/pty.rs | 64 ++++--------- 4 files changed, 133 insertions(+), 113 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 43b560b..562201e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -434,6 +434,7 @@ dependencies = [ "cluelessh-tokio", "cluelessh-transport", "eyre", + "futures", "rustix", "tokio", "tracing", diff --git a/bin/cluelesshd/Cargo.toml b/bin/cluelesshd/Cargo.toml index caef800..2cf40d6 100644 --- a/bin/cluelesshd/Cargo.toml +++ b/bin/cluelesshd/Cargo.toml @@ -13,6 +13,7 @@ eyre.workspace = true tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] } rustix = { version = "0.38.34", features = ["pty", "termios", "procfs", "process", "stdio"] } users = "0.11.0" +futures = "0.3.30" [lints] workspace = true diff --git a/bin/cluelesshd/src/main.rs b/bin/cluelesshd/src/main.rs index 88c544b..1a54060 100644 --- a/bin/cluelesshd/src/main.rs +++ b/bin/cluelesshd/src/main.rs @@ -7,6 +7,8 @@ use eyre::{bail, Context, OptionExt, Result}; use pty::Pty; use rustix::termios::Winsize; use tokio::{ + fs::File, + io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpStream}, process::Command, sync::mpsc, @@ -83,20 +85,31 @@ async fn handle_connection( ) -> Result<()> { info!(addr = %conn.peer_addr(), "Received a new connection"); + let mut channel_tasks = Vec::new(); + loop { - match conn.progress().await { - Ok(()) => {} - Err(cluelessh_tokio::server::Error::ServerError(err)) => { - return Err(err); - } - Err(cluelessh_tokio::server::Error::SshStatus(status)) => match status { - SshStatus::PeerError(err) => { - info!(?err, "disconnecting client after invalid operation"); - return Ok(()); + tokio::select! { + step = conn.progress() => match step { + Ok(()) => {} + Err(cluelessh_tokio::server::Error::ServerError(err)) => { + return Err(err); } - SshStatus::Disconnect => { - info!("Received disconnect from client"); - return Ok(()); + Err(cluelessh_tokio::server::Error::SshStatus(status)) => match status { + SshStatus::PeerError(err) => { + info!(?err, "disconnecting client after invalid operation"); + return Ok(()); + } + SshStatus::Disconnect => { + info!("Received disconnect from client"); + return Ok(()); + } + }, + }, + result = futures::future::try_join_all(&mut channel_tasks), if channel_tasks.len() > 0 => { + debug!(?result, "error!"); + match result { + Ok(_) => channel_tasks.clear(), + Err(err) => return Err(err as eyre::Report), } }, } @@ -104,12 +117,11 @@ async fn handle_connection( while let Some(channel) = conn.next_new_channel() { let user = conn.inner().authenticated_user().unwrap().to_owned(); if *channel.kind() == ChannelKind::Session { - tokio::spawn(async move { - let result = handle_session_channel(user, channel).await; - if let Err(err) = result { - error!(?err); - } - }); + let channel_task = tokio::spawn(handle_session_channel(user, channel)); + channel_tasks.push(Box::pin(async { + let result = channel_task.await; + result.wrap_err("task panicked").and_then(|result| result) + })); } else { warn!("Trying to open non-session channel"); } @@ -123,6 +135,9 @@ struct SessionState { channel: Channel, process_exit_send: mpsc::Sender>, process_exit_recv: mpsc::Receiver>, + + writer: Option, + reader: Option, } async fn handle_session_channel(user: String, channel: Channel) -> Result<()> { @@ -134,12 +149,16 @@ async fn handle_session_channel(user: String, channel: Channel) -> Result<()> { channel, process_exit_send, process_exit_recv, + writer: None, + reader: None, }; + let mut read_buf = [0; 1024]; + loop { - let pty_read = async { - match &mut state.pty { - Some(pty) => pty.ctrl_read_recv.recv().await, + let read = async { + match &mut state.reader { + Some(file) => file.read(&mut read_buf).await, // Ensure that if this is None, the future never finishes so the state update and process exit can progress. None => loop { tokio::task::yield_now().await; @@ -170,11 +189,11 @@ async fn handle_session_channel(user: String, channel: Channel) -> Result<()> { None => {} } } - read = pty_read => { - let Some(read) = read else { + read = read => { + let Ok(read) = read else { bail!("failed to read"); }; - let _ = state.channel.send(ChannelOperationKind::Data(read)).await; + let _ = state.channel.send(ChannelOperationKind::Data(read_buf[..read].to_vec())).await; } } } @@ -184,7 +203,6 @@ impl SessionState { async fn handle_channel_update(&mut self, update: ChannelUpdateKind) -> Result<()> { match update { ChannelUpdateKind::Request(req) => { - let success = ChannelOperationKind::Success; match req { ChannelRequest::PtyReq { want_reply, @@ -195,8 +213,8 @@ impl SessionState { height_px, term_modes, } => { - self.pty = Some( - pty::Pty::new( + match self + .pty_req( term, Winsize { ws_row: height_rows as u16, @@ -206,46 +224,34 @@ impl SessionState { }, term_modes, ) - .await?, - ); - if want_reply { - self.channel.send(success).await?; + .await + { + Ok(()) => { + if want_reply { + self.channel.send(ChannelOperationKind::Success).await?; + } + } + Err(err) => { + debug!(%err, "Failed to open PTY"); + if want_reply { + self.channel.send(ChannelOperationKind::Failure).await?; + } + } } } - ChannelRequest::Shell { want_reply } => { - let user = self.user.clone(); - let user = - tokio::task::spawn_blocking(move || users::get_user_by_name(&user)) - .await? - .ok_or_eyre("failed to find user")?; - - let shell = user.shell(); - - let mut cmd = Command::new(shell); - cmd.env_clear(); - - if let Some(pty) = &self.pty { - pty.start_session_for_command(&mut cmd)?; + ChannelRequest::Shell { want_reply } => match self.shell().await { + Ok(()) => { + if want_reply { + self.channel.send(ChannelOperationKind::Success).await?; + } } - - // TODO: **user** home directory - cmd.current_dir(user.home_dir()); - cmd.env("USER", user.name()); - cmd.uid(user.uid()); - cmd.gid(user.primary_group_id()); - debug!(cmd = %shell.display(), uid = %user.uid(), gid = %user.primary_group_id(), "Executing process"); - - let mut shell = cmd.spawn()?; - - let process_exit_send = self.process_exit_send.clone(); - tokio::spawn(async move { - let result = shell.wait().await; - let _ = process_exit_send.send(result).await; - }); - if want_reply { - self.channel.send(success).await?; + Err(err) => { + debug!(%err, "Failed to spawn shell"); + if want_reply { + self.channel.send(ChannelOperationKind::Failure).await?; + } } - } + }, ChannelRequest::Exec { .. } => { todo!() } @@ -254,9 +260,9 @@ impl SessionState { }; } ChannelUpdateKind::OpenFailed { .. } => todo!(), - ChannelUpdateKind::Data { data } => match &mut self.pty { + ChannelUpdateKind::Data { data } => match &mut self.writer { Some(pty) => { - pty.ctrl_write_send.send(data).await?; + pty.write_all(&data).await?; } None => {} }, @@ -269,4 +275,48 @@ impl SessionState { } Ok(()) } + + async fn pty_req(&mut self, term: String, winsize: Winsize, term_modes: Vec) -> Result<()> { + let pty = pty::Pty::new(term, winsize, term_modes).await?; + let controller = pty.controller().try_clone_to_owned()?; + + self.pty = Some(pty); + self.writer = Some(File::from_std(std::fs::File::from(controller.try_clone()?))); + self.reader = Some(File::from_std(std::fs::File::from(controller))); + Ok(()) + } + + async fn shell(&mut self) -> Result<()> { + let user = self.user.clone(); + let user = tokio::task::spawn_blocking(move || users::get_user_by_name(&user)) + .await? + .ok_or_eyre("failed to find user")?; + + let shell = user.shell(); + + let mut cmd = Command::new(shell); + cmd.env_clear(); + + if let Some(pty) = &self.pty { + pty.start_session_for_command(&mut cmd)?; + } + + // TODO: **user** home directory + cmd.current_dir(user.home_dir()); + cmd.env("USER", user.name()); + cmd.uid(user.uid()); + cmd.gid(user.primary_group_id()); + + debug!(cmd = %shell.display(), uid = %user.uid(), gid = %user.primary_group_id(), "Executing process"); + + let mut shell = cmd.spawn()?; + + let process_exit_send = self.process_exit_send.clone(); + tokio::spawn(async move { + let result = shell.wait().await; + let _ = process_exit_send.send(result).await; + }); + debug!("Successfully spawned shell"); + Ok(()) + } } diff --git a/bin/cluelesshd/src/pty.rs b/bin/cluelesshd/src/pty.rs index f9c0a7f..1cf6e05 100644 --- a/bin/cluelesshd/src/pty.rs +++ b/bin/cluelesshd/src/pty.rs @@ -1,9 +1,6 @@ //! PTY-related operations for setting up the session. -use std::{ - io::{Read, Write}, - os::fd::{AsRawFd, BorrowedFd, OwnedFd}, -}; +use std::os::fd::{AsFd, BorrowedFd, OwnedFd}; use eyre::{Context, Result}; use rustix::{ @@ -11,17 +8,13 @@ use rustix::{ pty::OpenptFlags, termios::Winsize, }; -use tokio::{process::Command, sync::mpsc, task::JoinHandle}; +use tokio::process::Command; pub struct Pty { term: String, - #[expect(dead_code)] - writer_handle: JoinHandle<()>, - #[expect(dead_code)] - reader_handle: JoinHandle<()>, - pub ctrl_write_send: mpsc::Sender>, - pub ctrl_read_recv: mpsc::Receiver>, + controller: OwnedFd, + user_pty: OwnedFd, user_pty_name: String, } @@ -53,56 +46,31 @@ impl Pty { let _ = modes; rustix::termios::tcsetattr(&user_pty, rustix::termios::OptionalActions::Flush, &termios)?; - // Set up communication threads: - let mut controller_read = std::fs::File::from(controller); - let mut controller_write = controller_read.try_clone()?; - - let (ctrl_write_send, mut ctrl_write_recv) = tokio::sync::mpsc::channel::>(10); - let (ctrl_read_send, ctrl_read_recv) = tokio::sync::mpsc::channel::>(10); - - let writer_handle = tokio::task::spawn_blocking(move || { - while let Some(write) = ctrl_write_recv.blocking_recv() { - let _ = controller_write.write_all(&write); - } - }); - - let reader_handle = tokio::task::spawn_blocking(move || { - let mut buf = [0; 1024]; - loop { - let Ok(read) = controller_read.read(&mut buf) else { - return; - }; - let Ok(_) = ctrl_read_send.blocking_send(buf[..read].to_vec()) else { - return; - }; - } - }); - Ok(Self { term, - writer_handle, - reader_handle, - ctrl_write_send, - ctrl_read_recv, + controller, user_pty, user_pty_name, }) } + pub fn controller(&self) -> BorrowedFd<'_> { + self.controller.as_fd() + } + pub fn start_session_for_command(&self, cmd: &mut Command) -> Result<()> { - let user_pty = self.user_pty.as_raw_fd(); + let user_pty = self.user_pty.try_clone()?; unsafe { cmd.pre_exec(move || { - let user_pty = BorrowedFd::borrow_raw(user_pty); - rustix::pty::grantpt(user_pty)?; + rustix::pty::grantpt(&user_pty)?; let pid = rustix::process::setsid()?; - rustix::process::ioctl_tiocsctty(user_pty)?; // Set as the current controlling tty - rustix::termios::tcsetpgrp(user_pty, pid)?; // Set current process as tty controller + rustix::process::ioctl_tiocsctty(&user_pty)?; // Set as the current controlling tty + rustix::termios::tcsetpgrp(&user_pty, pid)?; // Set current process as tty controller // Setup stdio with PTY. - rustix::stdio::dup2_stdin(user_pty)?; - rustix::stdio::dup2_stdout(user_pty)?; - rustix::stdio::dup2_stderr(user_pty)?; + rustix::stdio::dup2_stdin(&user_pty)?; + rustix::stdio::dup2_stdout(&user_pty)?; + rustix::stdio::dup2_stderr(&user_pty)?; Ok(()) });