This commit is contained in:
nora 2024-08-25 18:00:43 +02:00
parent 1c346659f6
commit 8114b5a195
9 changed files with 433 additions and 21 deletions

View file

@ -7,6 +7,11 @@ edition = "2021"
cluelessh-protocol = { path = "../../lib/cluelessh-protocol" }
cluelessh-tokio = { path = "../../lib/cluelessh-tokio" }
cluelessh-transport = { path = "../../lib/cluelessh-transport" }
tokio = { version = "1.39.2", features = ["full"] }
tracing.workspace = true
eyre.workspace = true
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }
rustix = { version = "0.38.34", features = ["pty", "termios", "procfs", "process", "stdio"] }
[lints]
workspace = true

View file

@ -1,3 +1,249 @@
fn main() {
println!("Hello, world!");
mod pty;
use std::{io, net::SocketAddr, process::ExitStatus, sync::Arc};
use cluelessh_tokio::{server::ServerAuthVerify, Channel};
use eyre::{bail, Context, Result};
use pty::Pty;
use rustix::termios::Winsize;
use tokio::{
net::{TcpListener, TcpStream},
process::Command,
sync::mpsc,
};
use tracing::{debug, error, info, info_span, warn, Instrument};
use cluelessh_protocol::{
connection::{ChannelKind, ChannelOperationKind, ChannelRequest},
ChannelUpdateKind, SshStatus,
};
use tracing_subscriber::EnvFilter;
#[tokio::main(flavor = "current_thread")]
async fn main() -> eyre::Result<()> {
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
tracing_subscriber::fmt().with_env_filter(env_filter).init();
let addr = "0.0.0.0:2222".to_owned();
let addr = addr
.parse::<SocketAddr>()
.wrap_err_with(|| format!("failed to parse listen addr '{addr}'"))?;
info!(%addr, "Starting server");
let listener = TcpListener::bind(addr).await.wrap_err("binding listener")?;
let auth_verify = ServerAuthVerify {
verify_password: Some(Arc::new(|auth| {
Box::pin(async move {
debug!(user = %auth.user, "Attempting password login");
// Don't worry queen, your password is correct!
warn!("Letting in unauthenticated user");
Ok(())
})
})),
verify_pubkey: None,
auth_banner: Some("welcome to my server!!!\r\ni hope you enjoy our stay.\r\n".to_owned()),
};
let mut listener = cluelessh_tokio::server::ServerListener::new(listener, auth_verify);
loop {
let next = listener.accept().await?;
let span = info_span!("connection", addr = %next.peer_addr());
tokio::spawn(
async move {
if let Err(err) = handle_connection(next).await {
if let Some(err) = err.downcast_ref::<std::io::Error>() {
if err.kind() == std::io::ErrorKind::ConnectionReset {
return;
}
}
error!(?err, "error handling connection");
}
info!("Finished connection");
}
.instrument(span),
);
}
}
async fn handle_connection(
mut conn: cluelessh_tokio::server::ServerConnection<TcpStream>,
) -> Result<()> {
info!(addr = %conn.peer_addr(), "Received a new connection");
loop {
match conn.progress().await {
Ok(()) => {}
Err(cluelessh_tokio::server::Error::ServerError(err)) => {
return Err(err);
}
Err(cluelessh_tokio::server::Error::SshStatus(status)) => match status {
SshStatus::PeerError(err) => {
info!(?err, "disconnecting client after invalid operation");
return Ok(());
}
SshStatus::Disconnect => {
info!("Received disconnect from client");
return Ok(());
}
},
}
while let Some(channel) = conn.next_new_channel() {
if *channel.kind() == ChannelKind::Session {
tokio::spawn(async {
let _ = handle_session_channel(channel).await;
});
} else {
warn!("Trying to open non-session channel");
}
}
}
}
struct SessionState {
pty: Option<Pty>,
channel: Channel,
process_exit_send: mpsc::Sender<Result<ExitStatus, io::Error>>,
process_exit_recv: mpsc::Receiver<Result<ExitStatus, io::Error>>,
}
async fn handle_session_channel(channel: Channel) -> Result<()> {
let (process_exit_send, process_exit_recv) = tokio::sync::mpsc::channel(1);
let mut state = SessionState {
pty: None,
channel,
process_exit_send,
process_exit_recv,
};
loop {
let pty_read = async {
match &mut state.pty {
Some(pty) => pty.ctrl_read_recv.recv().await,
// Ensure that if this is None, the future never finishes so the state update and process exit can progress.
None => loop {
tokio::task::yield_now().await;
},
}
};
tokio::select! {
update = state.channel.next_update() => {
match update {
Ok(update) => state.handle_channel_update(update).await?,
Err(err) => return Err(err),
}
}
exit = state.process_exit_recv.recv() => {
match exit {
Some(exit) => {
let exit = exit?;
state.channel.send(ChannelOperationKind::Eof).await?;
// TODO: also handle exit-signal
state.channel
.send(ChannelOperationKind::Request(ChannelRequest::ExitStatus {
status: exit.code().unwrap_or(0) as u32,
}))
.await?;
state.channel.send(ChannelOperationKind::Close).await?;
return Ok(());
}
None => {}
}
}
read = pty_read => {
let Some(read) = read else {
bail!("failed to read");
};
let _ = state.channel.send(ChannelOperationKind::Data(read)).await;
}
}
}
}
impl SessionState {
async fn handle_channel_update(&mut self, update: ChannelUpdateKind) -> Result<()> {
match update {
ChannelUpdateKind::Request(req) => {
let success = ChannelOperationKind::Success;
match req {
ChannelRequest::PtyReq {
want_reply,
term,
width_chars,
height_rows,
width_px,
height_px,
term_modes,
} => {
self.pty = Some(
pty::Pty::new(
term,
Winsize {
ws_row: height_rows as u16,
ws_col: width_chars as u16,
ws_xpixel: width_px as u16,
ws_ypixel: height_px as u16,
},
term_modes,
)
.await?,
);
if want_reply {
self.channel.send(success).await?;
}
}
ChannelRequest::Shell { want_reply } => {
let shell = "bash";
let mut cmd = Command::new(shell);
cmd.env_clear();
if let Some(pty) = &self.pty {
pty.start_session_for_command(&mut cmd)?;
}
// TODO: **user** home directory
cmd.current_dir(std::env::var("HOME").unwrap_or_else(|_| "/".to_owned()));
cmd.env("USER", "nora");
let mut shell = cmd.spawn()?;
let process_exit_send = self.process_exit_send.clone();
tokio::spawn(async move {
let result = shell.wait().await;
let _ = process_exit_send.send(result).await;
});
if want_reply {
self.channel.send(success).await?;
}
}
ChannelRequest::Exec { .. } => {
todo!()
}
ChannelRequest::ExitStatus { .. } => {}
ChannelRequest::Env { .. } => {}
};
}
ChannelUpdateKind::OpenFailed { .. } => todo!(),
ChannelUpdateKind::Data { data } => match &mut self.pty {
Some(pty) => {
pty.ctrl_write_send.send(data).await?;
}
None => {}
},
ChannelUpdateKind::Open(_)
| ChannelUpdateKind::Closed
| ChannelUpdateKind::ExtendedData { .. }
| ChannelUpdateKind::Eof
| ChannelUpdateKind::Success
| ChannelUpdateKind::Failure => { /* ignore */ }
}
Ok(())
}
}

113
bin/cluelesshd/src/pty.rs Normal file
View file

@ -0,0 +1,113 @@
//! PTY-related operations for setting up the session.
use std::{
io::{Read, Write},
os::fd::{AsRawFd, BorrowedFd, OwnedFd},
path::PathBuf,
};
use eyre::{Context, Result};
use rustix::{
fs::{Mode, OFlags},
pty::OpenptFlags,
termios::Winsize,
};
use tokio::{process::Command, sync::mpsc, task::JoinHandle};
pub struct Pty {
term: String,
#[expect(dead_code)]
writer_handle: JoinHandle<()>,
#[expect(dead_code)]
reader_handle: JoinHandle<()>,
pub ctrl_write_send: mpsc::Sender<Vec<u8>>,
pub ctrl_read_recv: mpsc::Receiver<Vec<u8>>,
user_pty: OwnedFd,
}
impl Pty {
pub async fn new(term: String, winsize: Winsize, modes: Vec<u8>) -> Result<Self> {
tokio::task::spawn_blocking(move || Self::new_blocking(term, winsize, modes)).await?
}
pub fn new_blocking(term: String, winsize: Winsize, modes: Vec<u8>) -> Result<Self> {
// Create new PTY:
let controller = rustix::pty::openpt(OpenptFlags::RDWR | OpenptFlags::NOCTTY)
.wrap_err("opening controller pty")?;
rustix::pty::unlockpt(&controller).wrap_err("unlocking pty")?;
let user_pty_name = rustix::pty::ptsname(&controller, Vec::new())?;
let user_pty_name =
std::str::from_utf8(user_pty_name.as_bytes()).wrap_err("pty name is invalid UTF-8")?;
let user_pty_name = PathBuf::from(user_pty_name);
let user_pty =
rustix::fs::open(user_pty_name, OFlags::RDWR | OFlags::NOCTTY, Mode::empty())?;
// Configure terminal:
rustix::termios::tcsetwinsize(&user_pty, winsize)?;
let termios = rustix::termios::tcgetattr(&user_pty)?;
// TODO: set modes
// <https://datatracker.ietf.org/doc/html/rfc4254#section-8>
let _ = termios;
let _ = modes;
rustix::termios::tcsetattr(&user_pty, rustix::termios::OptionalActions::Flush, &termios)?;
// Set up communication threads:
let mut controller_read = std::fs::File::from(controller);
let mut controller_write = controller_read.try_clone()?;
let (ctrl_write_send, mut ctrl_write_recv) = tokio::sync::mpsc::channel::<Vec<u8>>(10);
let (ctrl_read_send, ctrl_read_recv) = tokio::sync::mpsc::channel::<Vec<u8>>(10);
let writer_handle = tokio::task::spawn_blocking(move || {
while let Some(write) = ctrl_write_recv.blocking_recv() {
let _ = controller_write.write_all(&write);
}
});
let reader_handle = tokio::task::spawn_blocking(move || {
let mut buf = [0; 1024];
loop {
let Ok(read) = controller_read.read(&mut buf) else {
return;
};
let Ok(_) = ctrl_read_send.blocking_send(buf[..read].to_vec()) else {
return;
};
}
});
Ok(Self {
term,
writer_handle,
reader_handle,
ctrl_write_send,
ctrl_read_recv,
user_pty,
})
}
pub fn start_session_for_command(&self, cmd: &mut Command) -> Result<()> {
let user_pty = self.user_pty.as_raw_fd();
unsafe {
cmd.pre_exec(move || {
let user_pty = BorrowedFd::borrow_raw(user_pty);
rustix::pty::grantpt(user_pty)?;
let pid = rustix::process::setsid()?;
rustix::process::ioctl_tiocsctty(user_pty)?; // Set as the current controlling tty
rustix::termios::tcsetpgrp(user_pty, pid)?; // Set current process as tty controller
// Setup stdio with PTY.
rustix::stdio::dup2_stdin(user_pty)?;
rustix::stdio::dup2_stdout(user_pty)?;
rustix::stdio::dup2_stderr(user_pty)?;
Ok(())
});
cmd.env("TERM", &self.term);
}
Ok(())
}
}