From 2ad87d3a14b6c7817cc8e283348f47928350894c Mon Sep 17 00:00:00 2001 From: Noratrieb <48135649+Noratrieb@users.noreply.github.com> Date: Fri, 30 Aug 2024 22:25:09 +0200 Subject: [PATCH] Start implementing SFTP --- Cargo.lock | 16 +- bin/cluelesshd-sftp-server/Cargo.toml | 3 + bin/cluelesshd-sftp-server/src/main.rs | 103 ++++++++- bin/cluelesshd/src/connection.rs | 9 +- lib/cluelessh-format/src/lib.rs | 5 + lib/cluelessh-format/src/numbers.rs | 53 +++++ lib/cluelessh-sftp-proto/Cargo.toml | 10 - lib/cluelessh-sftp-proto/src/lib.rs | 1 - lib/cluelessh-sftp/Cargo.toml | 6 + lib/cluelessh-sftp/src/lib.rs | 290 ++++++++++++++++++++++++- lib/cluelessh-sftp/src/transport.rs | 85 ++++++++ 11 files changed, 549 insertions(+), 32 deletions(-) delete mode 100644 lib/cluelessh-sftp-proto/Cargo.toml delete mode 100644 lib/cluelessh-sftp-proto/src/lib.rs create mode 100644 lib/cluelessh-sftp/src/transport.rs diff --git a/Cargo.lock b/Cargo.lock index a21c207..2efbf79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -425,12 +425,13 @@ dependencies = [ [[package]] name = "cluelessh-sftp" version = "0.1.0" - -[[package]] -name = "cluelessh-sftp-proto" -version = "0.1.0" dependencies = [ + "cluelessh-format", "cluelessh-transport", + "eyre", + "rustix", + "tokio", + "tracing", ] [[package]] @@ -504,7 +505,10 @@ dependencies = [ name = "cluelesshd-sftp-server" version = "0.1.0" dependencies = [ + "cluelessh-sftp", "eyre", + "rustix", + "tokio", "tracing", "tracing-subscriber", ] @@ -1634,9 +1638,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.39.3" +version = "1.40.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" +checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" dependencies = [ "backtrace", "bytes", diff --git a/bin/cluelesshd-sftp-server/Cargo.toml b/bin/cluelesshd-sftp-server/Cargo.toml index 4012d5f..746788b 100644 --- a/bin/cluelesshd-sftp-server/Cargo.toml +++ b/bin/cluelesshd-sftp-server/Cargo.toml @@ -7,6 +7,9 @@ edition = "2021" eyre.workspace = true tracing.workspace = true tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } +cluelessh-sftp = { path = "../../lib/cluelessh-sftp" } +tokio = "1.40.0" +rustix = { version = "0.38.35", features = ["stdio"] } [lints] workspace = true diff --git a/bin/cluelesshd-sftp-server/src/main.rs b/bin/cluelesshd-sftp-server/src/main.rs index 0ec0e9b..a09daba 100644 --- a/bin/cluelesshd-sftp-server/src/main.rs +++ b/bin/cluelesshd-sftp-server/src/main.rs @@ -1,8 +1,18 @@ -use eyre::Result; -use tracing::info; +use std::{ + fs::File, + io, + os::fd::OwnedFd, + pin::Pin, + task::{ready, Poll}, +}; + +use eyre::{Context, Result}; +use tokio::io::{unix::AsyncFd, AsyncRead, AsyncWrite}; +use tracing::debug; use tracing_subscriber::EnvFilter; -fn main() -> Result<()> { +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<()> { let env_filter = EnvFilter::try_from_env("SFTP_LOG").unwrap_or_else(|_| EnvFilter::new("debug")); @@ -11,7 +21,90 @@ fn main() -> Result<()> { .with_env_filter(env_filter) .init(); - info!("mroooow!"); + let stdin = rustix::stdio::stdin().try_clone_to_owned()?; + let stdout = rustix::stdio::stdout().try_clone_to_owned()?; - Ok(()) + // Ensure that writing to stdout fails + if let Ok(full) = File::open("/dev/full") { + let _ = rustix::stdio::dup2_stdout(&full); + } + + let input = AsyncFdWrapper::from_fd(stdin)?; + let output = AsyncFdWrapper::from_fd(stdout)?; + + debug!("Starting SFTP server"); + + let mut server = cluelessh_sftp::SftpServer::new(input, output); + server.serve().await +} + +// TODO: Share with cluelesshd +struct AsyncFdWrapper { + fd: AsyncFd, +} + +impl AsyncFdWrapper { + fn from_fd(fd: OwnedFd) -> Result { + rustix::io::ioctl_fionbio(&fd, true).wrap_err("putting fd into nonblocking mode")?; + Ok(Self { + fd: AsyncFd::new(fd).wrap_err("failed to register async event")?, + }) + } +} + +impl AsyncRead for AsyncFdWrapper { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + loop { + let mut guard = ready!(self.fd.poll_read_ready(cx))?; + + let unfilled = buf.initialize_unfilled(); + match guard.try_io(|inner| { + rustix::io::read(inner.get_ref(), unfilled).map_err(io::Error::from) + }) { + Ok(Ok(len)) => { + buf.advance(len); + return Poll::Ready(Ok(())); + } + Ok(Err(err)) => return Poll::Ready(Err(err)), + Err(_would_block) => continue, + } + } + } +} + +impl AsyncWrite for AsyncFdWrapper { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll> { + loop { + let mut guard = ready!(self.fd.poll_write_ready(cx))?; + + match guard + .try_io(|inner| rustix::io::write(inner.get_ref(), buf).map_err(io::Error::from)) + { + Ok(result) => return Poll::Ready(result), + Err(_would_block) => continue, + } + } + } + + fn poll_flush( + self: Pin<&mut Self>, + _: &mut std::task::Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _: &mut std::task::Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } } diff --git a/bin/cluelesshd/src/connection.rs b/bin/cluelesshd/src/connection.rs index ba57dea..e025fc9 100644 --- a/bin/cluelesshd/src/connection.rs +++ b/bin/cluelesshd/src/connection.rs @@ -376,7 +376,10 @@ impl SessionState { if let Some(writer) = &mut self.writer { writer.shutdown().await?; } + // TODO: somehow this isn't enough to close an SFTP connection.... self.writer = None; + self.reader = None; + self.reader_ext = None; } ChannelUpdateKind::Open(_) | ChannelUpdateKind::Closed @@ -409,7 +412,11 @@ impl SessionState { Ok(()) } - async fn shell(&mut self, shell_command: Option, subsystem: Option) -> Result<()> { + async fn shell( + &mut self, + shell_command: Option, + subsystem: Option, + ) -> Result<()> { let mut fds = self .rpc_client .shell( diff --git a/lib/cluelessh-format/src/lib.rs b/lib/cluelessh-format/src/lib.rs index 70b545d..05644ef 100644 --- a/lib/cluelessh-format/src/lib.rs +++ b/lib/cluelessh-format/src/lib.rs @@ -14,6 +14,7 @@ impl std::error::Error for ParseError {} pub type Result = std::result::Result; +#[derive(Clone)] pub struct Reader<'a>(&'a [u8]); impl<'a> Reader<'a> { @@ -123,6 +124,10 @@ impl Writer { self.raw(&u32::to_be_bytes(v)); } + pub fn u64(&mut self, v: u64) { + self.raw(&u64::to_be_bytes(v)); + } + pub fn raw(&mut self, v: &[u8]) { self.0.extend_from_slice(v); } diff --git a/lib/cluelessh-format/src/numbers.rs b/lib/cluelessh-format/src/numbers.rs index 10299b4..129bfb2 100644 --- a/lib/cluelessh-format/src/numbers.rs +++ b/lib/cluelessh-format/src/numbers.rs @@ -111,3 +111,56 @@ consts! { } pub const SSH_EXTENDED_DATA_STDERR: u32 = 1; + +consts! { + u8, fn sftp_message_type_to_string, + const SSH_FXP_INIT = 1; + const SSH_FXP_VERSION = 2; + const SSH_FXP_OPEN = 3; + const SSH_FXP_CLOSE = 4; + const SSH_FXP_READ = 5; + const SSH_FXP_WRITE = 6; + const SSH_FXP_LSTAT = 7; + const SSH_FXP_FSTAT = 8; + const SSH_FXP_SETSTAT = 9; + const SSH_FXP_FSETSTAT = 10; + const SSH_FXP_OPENDIR = 11; + const SSH_FXP_READDIR = 12; + const SSH_FXP_REMOVE = 13; + const SSH_FXP_MKDIR = 14; + const SSH_FXP_RMDIR = 15; + const SSH_FXP_REALPATH = 16; + const SSH_FXP_STAT = 17; + const SSH_FXP_RENAME = 18; + const SSH_FXP_READLINK = 19; + const SSH_FXP_SYMLINK = 20; + const SSH_FXP_STATUS = 101; + const SSH_FXP_HANDLE = 102; + const SSH_FXP_DATA = 103; + const SSH_FXP_NAME = 104; + const SSH_FXP_ATTRS = 105; + const SSH_FXP_EXTENDED = 200; + const SSH_FXP_EXTENDED_REPLY = 201; +} + +consts! { + u32, fn sftp_error_code_to_string, + const SSH_FX_OK = 0; + const SSH_FX_EOF = 1; + const SSH_FX_NO_SUCH_FILE = 2; + const SSH_FX_PERMISSION_DENIED = 3; + const SSH_FX_FAILURE = 4; + const SSH_FX_BAD_MESSAGE = 5; + const SSH_FX_NO_CONNECTION = 6; + const SSH_FX_CONNECTION_LOST = 7; + const SSH_FX_OP_UNSUPPORTED = 8; +} + +consts! { + u32, fn sftp_file_attr_flag_to_string, + const SSH_FILEXFER_ATTR_SIZE = 0x00000001; + const SSH_FILEXFER_ATTR_UIDGID = 0x00000002; + const SSH_FILEXFER_ATTR_PERMISSIONS = 0x00000004; + const SSH_FILEXFER_ATTR_ACMODTIME = 0x00000008; + const SSH_FILEXFER_ATTR_EXTENDED = 0x80000000; +} diff --git a/lib/cluelessh-sftp-proto/Cargo.toml b/lib/cluelessh-sftp-proto/Cargo.toml deleted file mode 100644 index 672268e..0000000 --- a/lib/cluelessh-sftp-proto/Cargo.toml +++ /dev/null @@ -1,10 +0,0 @@ -[package] -name = "cluelessh-sftp-proto" -version = "0.1.0" -edition = "2021" - -[dependencies] -cluelessh-transport = { path = "../cluelessh-transport" } - -[lints] -workspace = true diff --git a/lib/cluelessh-sftp-proto/src/lib.rs b/lib/cluelessh-sftp-proto/src/lib.rs deleted file mode 100644 index 8b13789..0000000 --- a/lib/cluelessh-sftp-proto/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/lib/cluelessh-sftp/Cargo.toml b/lib/cluelessh-sftp/Cargo.toml index a0d9a1e..5779220 100644 --- a/lib/cluelessh-sftp/Cargo.toml +++ b/lib/cluelessh-sftp/Cargo.toml @@ -4,6 +4,12 @@ version = "0.1.0" edition = "2021" [dependencies] +eyre.workspace = true +tokio = { version = "1.40.0", features = ["full"] } +cluelessh-transport = { path = "../cluelessh-transport" } +cluelessh-format = { path = "../cluelessh-format" } +tracing.workspace = true +rustix = { version = "0.38.35", features = ["fs"] } [lints] workspace = true diff --git a/lib/cluelessh-sftp/src/lib.rs b/lib/cluelessh-sftp/src/lib.rs index b93cf3f..9347fb9 100644 --- a/lib/cluelessh-sftp/src/lib.rs +++ b/lib/cluelessh-sftp/src/lib.rs @@ -1,14 +1,286 @@ -pub fn add(left: u64, right: u64) -> u64 { - left + right +mod transport; + +use std::{ + collections::HashMap, + io, + os::fd::OwnedFd, + path::Path, + pin::Pin, + sync::atomic::{AtomicU32, Ordering}, +}; + +use cluelessh_format::{numbers, Writer}; +use eyre::{bail, ensure, OptionExt, Result}; +use rustix::fs::{Mode, OFlags}; +use tokio::{ + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, + sync::mpsc, +}; +use tracing::trace; +use transport::{Packet, PacketTransport}; + +pub struct SftpServer { + input: Pin>, + output: Pin>, + + state: SftpState, + + transport: PacketTransport, + + files: HashMap, + next_handle: AtomicU32, + + _events_send: mpsc::Sender, + events_recv: mpsc::Receiver, } -#[cfg(test)] -mod tests { - use super::*; +type Handle = u32; - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); +enum SftpState { + Init, + Open, +} + +const BUF_SIZE: usize = 1024; + +struct Event { + _data: Vec, +} + +impl SftpServer { + pub fn new( + input: impl AsyncRead + Send + Sync + 'static, + output: impl AsyncWrite + Send + Sync + 'static, + ) -> Self { + let (events_send, events_recv) = mpsc::channel(10); + Self { + input: Box::pin(input), + output: Box::pin(output), + + state: SftpState::Init, + + files: HashMap::new(), + next_handle: AtomicU32::new(0), + + transport: PacketTransport::new(), + _events_send: events_send, + events_recv, + } + } + + pub async fn serve(&mut self) -> Result<()> { + let mut buf = [0; BUF_SIZE]; + + loop { + tokio::select! { + read = self.input.read(&mut buf) => { + self.recv_byte(&buf[..read?]).await?; + } + _event = self.events_recv.recv() => { + todo!() + } + } + } + } + + async fn recv_byte(&mut self, bytes: &[u8]) -> Result<()> { + self.transport.recv_bytes(bytes)?; + + let packets = self.transport.packets(); + + for packet in packets { + let packet_type = packet.packet_type(); + let packet_type_string = numbers::sftp_message_type_to_string(packet_type); + trace!(%packet_type, %packet_type_string, packet_len = %packet.all_payload().len(), "Received packet"); + + if let SftpState::Init = self.state { + ensure!( + packet.packet_type() == numbers::SSH_FXP_INIT, + "Client did not send SSH_FXP_INIT" + ); + let mut p = packet.payload_reader(); + let version = p.u32()?; + ensure!( + version == 6 || version == 3, + "Unexpected version: {version}" + ); + // TODO: negotiate 6 nicely using the version-select extension + let mut w = Writer::new(); + w.u8(numbers::SSH_FXP_VERSION); + w.u32(3); // version + // newline extension + w.string("newline"); + w.string("\n"); + self.send_packet(w.finish()).await?; + self.state = SftpState::Open; + continue; + } + + let mut p = packet.payload_reader(); + + match packet_type { + numbers::SSH_FXP_CLOSE => { + let req_id = p.u32()?; + let _ = p.u32()?; + let handle = p.u32()?; + let Some(handle) = self.files.remove(&handle) else { + bail!("invalid handle"); + }; + drop(handle); + self.send_packet(status(req_id, numbers::SSH_FX_OK, "")) + .await?; + } + numbers::SSH_FXP_OPENDIR => { + let req_id = p.u32()?; + let path = p.utf8_string()?; + + // TODO: dont block lol + let result = + rustix::fs::open(path, OFlags::RDONLY | OFlags::DIRECTORY, Mode::empty()); + match result { + Ok(fd) => { + let handle = self.next_handle.fetch_add(1, Ordering::Relaxed); + self.files.insert(handle, fd); + let mut w = Writer::new(); + w.u8(numbers::SSH_FXP_HANDLE); + w.u32(req_id); + w.u32(4); // handle length + w.u32(handle); + self.send_packet(w.finish()).await?; + } + Err(err) => self.send_io_error(req_id, err.into()).await?, + } + } + numbers::SSH_FXP_READDIR => { + let req_id = p.u32()?; + let _ = p.u32()?; + let handle = p.u32()?; + let Some(handle) = self.files.get(&handle) else { + bail!("invalid handle"); + }; + let mut entries: Vec<(String, String, Attrs)> = Vec::new(); + let mut buf = Vec::with_capacity(8192); + let mut iter = rustix::fs::RawDir::new(handle, buf.spare_capacity_mut()); + while let Some(entry) = iter.next() { + let entry = entry?; // TODO: handle error + let name = entry.file_name().to_str()?.to_owned(); + entries.push((name.clone(), name, Attrs::default())); + } + + let mut w = Writer::new(); + w.u8(numbers::SSH_FXP_NAME); + w.u32(req_id); + w.u32(entries.len() as u32); + for entry in entries { + w.string(entry.0); + w.string(entry.1); + entry.2.encode(&mut w); + } + + self.send_packet(w.finish()).await?; + } + numbers::SSH_FXP_REALPATH => { + let req_id = p.u32()?; + let original_path = p.utf8_string()?; + + let p = Path::new(original_path).canonicalize(); + + match p { + Ok(p) => { + let mut w = Writer::new(); + w.u8(numbers::SSH_FXP_NAME); + w.u32(req_id); + w.u32(1); // count + + let filename = p + .as_os_str() + .to_str() + .ok_or_eyre("filename is invalid UTF-8")? + .as_bytes(); + w.string(filename); // filename + w.string(filename); // longname, TODO: this should be ls -l output lol + Attrs::default().encode(&mut w); // attrs, dummy + self.send_packet(w.finish()).await?; + } + Err(err) => self.send_io_error(req_id, err).await?, + } + } + _ => { + bail!("unknown packet: {packet_type_string} ({packet_type})") + } + } + } + + Ok(()) + } + + async fn send_io_error(&mut self, req_id: u32, err: io::Error) -> Result<()> { + self.send_packet(status(req_id, io_error_to_code(&err), &err.to_string())) + .await + } + + async fn send_packet(&mut self, body: impl AsRef<[u8]>) -> Result<()> { + let packet = Packet::from_body(body.as_ref()); + let packet_type = packet.packet_type(); + let packet_type_string = numbers::sftp_message_type_to_string(packet_type); + trace!(%packet_type, %packet_type_string, packet_len = %packet.all_payload().len(), "Sending packet"); + + // TODO: do this async... + self.output.write_all(packet.all_payload()).await?; + Ok(()) } } + +fn io_error_to_code(err: &io::Error) -> u32 { + match err.kind() { + io::ErrorKind::NotFound => numbers::SSH_FX_NO_SUCH_FILE, + io::ErrorKind::PermissionDenied => numbers::SSH_FX_PERMISSION_DENIED, + _ => numbers::SSH_FX_FAILURE, + } +} + +#[derive(Default)] +struct Attrs { + size: Option, + uid_gid: Option<(u32, u32)>, + permissions: Option, + atime_mtime: Option<(u32, u32)>, +} + +impl Attrs { + fn encode(&self, w: &mut Writer) { + use numbers::*; + + let flag = |bool, flag| if bool { flag } else { 0 }; + let flags = flag(self.size.is_some(), SSH_FILEXFER_ATTR_SIZE) + | flag(self.uid_gid.is_some(), SSH_FILEXFER_ATTR_UIDGID) + | flag(self.permissions.is_some(), SSH_FILEXFER_ATTR_PERMISSIONS) + | flag(self.atime_mtime.is_some(), SSH_FILEXFER_ATTR_ACMODTIME); + + w.u32(flags); + if let Some(size) = self.size { + w.u64(size); + }; + if let Some((uid, gid)) = self.uid_gid { + w.u32(uid); + w.u32(gid); + }; + if let Some(perm) = self.permissions { + w.u32(perm); + } + if let Some((atime, mtime)) = self.atime_mtime { + w.u32(atime); + w.u32(mtime); + } + } +} + +fn status(req_id: u32, code: u32, message: &str) -> Vec { + let mut w = Writer::new(); + w.u8(numbers::SSH_FXP_STATUS); + w.u32(req_id); + w.u32(code); + w.string(message); + w.string(""); + w.finish() +} diff --git a/lib/cluelessh-sftp/src/transport.rs b/lib/cluelessh-sftp/src/transport.rs new file mode 100644 index 0000000..d7959af --- /dev/null +++ b/lib/cluelessh-sftp/src/transport.rs @@ -0,0 +1,85 @@ +use std::collections::VecDeque; + +use cluelessh_format::{numbers, Reader}; +use cluelessh_transport::packet::PacketParser; +use eyre::{ensure, eyre, Result}; + +#[derive(Debug)] +pub struct Packet { + payload: Vec, +} + +impl Packet { + pub fn packet_type(&self) -> u8 { + self.payload[4] + } + + pub fn payload_reader(&self) -> Reader { + Reader::new(&&self.payload[5..]) + } + + pub fn all_payload(&self) -> &[u8] { + &self.payload + } + + pub fn from_body(body: &[u8]) -> Self { + let len = body.len() as u32; + let mut payload = Vec::new(); + payload.extend_from_slice(&u32::to_be_bytes(len)); + payload.extend_from_slice(&body); + Self { payload } + } +} + +pub struct PacketTransport { + parser: PacketParser, + packets: VecDeque, +} + +impl PacketTransport { + pub fn new() -> Self { + Self { + parser: PacketParser::new(), + packets: VecDeque::new(), + } + } + + pub fn packets(&mut self) -> impl IntoIterator { + std::mem::take(&mut self.packets) + } + + pub fn recv_bytes(&mut self, mut bytes: &[u8]) -> Result<()> { + while let Some(consumed) = self.recv_bytes_step(bytes)? { + bytes = &bytes[consumed..]; + if bytes.is_empty() { + break; + } + } + Ok(()) + } + + fn recv_bytes_step(&mut self, bytes: &[u8]) -> Result> { + let result = self + .parser + .recv_plaintext_bytes(bytes) + .map_err(|_| eyre!("invalid packet"))?; + + if let Some((consumed, result)) = result { + ensure!(result.len() > (4 + 1), "Empty packet"); + let packet = Packet { payload: result }; + if packet.packet_type() != numbers::SSH_FXP_INIT + && packet.packet_type() != numbers::SSH_FXP_VERSION + { + ensure!( + packet.all_payload().len() > (4 + 1 + 4), + "Missing request ID" + ); + } + self.packets.push_back(packet); + self.parser = PacketParser::new(); + return Ok(Some(consumed)); + } + + Ok(None) + } +}