factor out auth

This commit is contained in:
nora 2024-08-25 16:14:55 +02:00
parent b0acf03502
commit 1c346659f6
7 changed files with 267 additions and 156 deletions

View file

@ -1,4 +1,4 @@
use cluelessh_connection::{ChannelKind, ChannelNumber, ChannelOperation, ChannelOperationKind};
use cluelessh_connection::{ChannelKind, ChannelNumber, ChannelOperation};
use std::{collections::HashMap, pin::Pin, sync::Arc};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@ -8,7 +8,7 @@ use futures::future::BoxFuture;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{debug, info, warn};
use crate::Channel;
use crate::{Channel, ChannelState, PendingChannel};
pub struct ClientConnection<S> {
stream: Pin<Box<S>>,
@ -27,14 +27,6 @@ pub struct ClientConnection<S> {
auth: ClientAuth,
}
enum ChannelState {
Pending {
ready_send: tokio::sync::oneshot::Sender<Result<(), String>>,
updates_send: tokio::sync::mpsc::Sender<ChannelUpdateKind>,
},
Ready(tokio::sync::mpsc::Sender<ChannelUpdateKind>),
}
pub struct ClientAuth {
pub username: String,
pub prompt_password: Arc<dyn Fn() -> BoxFuture<'static, Result<String>> + Send + Sync>,
@ -53,11 +45,6 @@ pub struct SignatureResult {
pub signature: Vec<u8>,
}
pub struct PendingChannel {
ready_recv: tokio::sync::oneshot::Receiver<Result<(), String>>,
channel: Channel,
}
impl<S: AsyncRead + AsyncWrite> ClientConnection<S> {
pub async fn connect(stream: S, auth: ClientAuth) -> Result<Self> {
let (operations_send, operations_recv) = tokio::sync::mpsc::channel(15);
@ -272,22 +259,3 @@ impl<S: AsyncRead + AsyncWrite> ClientConnection<S> {
}
}
}
impl PendingChannel {
pub async fn wait_ready(self) -> Result<Channel, Option<String>> {
match self.ready_recv.await {
Ok(Ok(())) => Ok(self.channel),
Ok(Err(err)) => Err(Some(err)),
Err(_) => Err(None),
}
}
}
impl Channel {
pub async fn send_operation(&mut self, op: ChannelOperationKind) -> Result<()> {
self.ops_send
.send(self.number.construct_op(op))
.await
.map_err(Into::into)
}
}

View file

@ -31,3 +31,25 @@ impl Channel {
&self.kind
}
}
enum ChannelState {
Pending {
ready_send: tokio::sync::oneshot::Sender<Result<(), String>>,
updates_send: tokio::sync::mpsc::Sender<ChannelUpdateKind>,
},
Ready(tokio::sync::mpsc::Sender<ChannelUpdateKind>),
}
pub struct PendingChannel {
ready_recv: tokio::sync::oneshot::Receiver<Result<(), String>>,
channel: Channel,
}
impl PendingChannel {
pub async fn wait_ready(self) -> Result<Channel, Option<String>> {
match self.ready_recv.await {
Ok(Ok(())) => Ok(self.channel),
Ok(Err(err)) => Err(Some(err)),
Err(_) => Err(None),
}
}
}

View file

@ -1,24 +1,30 @@
use cluelessh_connection::{ChannelKind, ChannelNumber, ChannelOperation};
use futures::future::BoxFuture;
use std::{
collections::{HashMap, VecDeque},
collections::{HashMap, HashSet, VecDeque},
net::SocketAddr,
pin::Pin,
sync::Arc,
};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
};
use cluelessh_protocol::{ChannelUpdateKind, SshStatus};
use eyre::{eyre, ContextCompat, Result, WrapErr};
use cluelessh_protocol::{
auth::{AuthOption, VerifyPassword, VerifyPubkey},
ChannelUpdateKind, SshStatus,
};
use eyre::{eyre, ContextCompat, OptionExt, Result, WrapErr};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::info;
use crate::Channel;
use crate::{Channel, ChannelState, PendingChannel};
pub struct ServerListener {
listener: TcpListener,
// todo ratelimits etc
auth_verify: ServerAuthVerify,
// TODO ratelimits etc
}
pub struct ServerConnection<S> {
@ -38,38 +44,27 @@ pub struct ServerConnection<S> {
/// New channels opened by the peer.
new_channels: VecDeque<Channel>,
}
enum ChannelState {
Pending {
ready_send: tokio::sync::oneshot::Sender<Result<(), String>>,
updates_send: tokio::sync::mpsc::Sender<ChannelUpdateKind>,
},
Ready(tokio::sync::mpsc::Sender<ChannelUpdateKind>),
auth_verify: ServerAuthVerify,
}
enum Operation {
VerifyPassword {
user: String,
password: String,
},
VerifyPubkey {
session_identifier: [u8; 32],
user: String,
pubkey: Vec<u8>,
},
VerifyPassword(Result<()>),
VerifyPubkey(Result<()>),
}
pub struct SignatureResult {
pub key_alg_name: &'static str,
pub public_key: Vec<u8>,
pub signature: Vec<u8>,
#[derive(Clone)]
pub struct ServerAuthVerify {
pub verify_password:
Option<Arc<dyn Fn(VerifyPassword) -> BoxFuture<'static, Result<()>> + Send + Sync>>,
pub verify_pubkey:
Option<Arc<dyn Fn(VerifyPubkey) -> BoxFuture<'static, Result<()>> + Send + Sync>>,
}
fn _assert_send_sync() {
fn send<T: Send + Sync>() {}
send::<ServerAuthVerify>();
}
pub struct PendingChannel {
ready_recv: tokio::sync::oneshot::Receiver<Result<(), String>>,
channel: Channel,
}
pub enum Error {
SshStatus(SshStatus),
ServerError(eyre::Report),
@ -81,22 +76,41 @@ impl From<eyre::Report> for Error {
}
impl ServerListener {
pub fn new(listener: TcpListener) -> Self {
Self { listener }
pub fn new(listener: TcpListener, auth_verify: ServerAuthVerify) -> Self {
Self {
listener,
auth_verify,
}
}
pub async fn accept(&mut self) -> Result<ServerConnection<TcpStream>> {
let (conn, peer_addr) = self.listener.accept().await?;
Ok(ServerConnection::new(conn, peer_addr))
Ok(ServerConnection::new(
conn,
peer_addr,
self.auth_verify.clone(),
))
}
}
impl<S: AsyncRead + AsyncWrite> ServerConnection<S> {
pub fn new(stream: S, peer_addr: SocketAddr) -> Self {
pub fn new(stream: S, peer_addr: SocketAddr, auth_verify: ServerAuthVerify) -> Self {
let (operations_send, operations_recv) = tokio::sync::mpsc::channel(15);
let (channel_ops_send, channel_ops_recv) = tokio::sync::mpsc::channel(15);
let mut options = HashSet::new();
if auth_verify.verify_password.is_some() {
options.insert(AuthOption::Password);
}
if auth_verify.verify_pubkey.is_some() {
options.insert(AuthOption::PublicKey);
}
if options.is_empty() {
panic!("no auth options provided");
}
Self {
stream: Box::pin(stream),
peer_addr,
@ -110,8 +124,10 @@ impl<S: AsyncRead + AsyncWrite> ServerConnection<S> {
cluelessh_transport::server::ServerConnection::new(
cluelessh_protocol::ThreadRngRand,
),
options,
),
new_channels: VecDeque::new(),
auth_verify,
}
}
@ -125,28 +141,28 @@ impl<S: AsyncRead + AsyncWrite> ServerConnection<S> {
if let Some(auth) = self.proto.auth() {
for req in auth.server_requests() {
match req {
cluelessh_protocol::auth::ServerRequest::VerifyPassword { user, password } => {
cluelessh_protocol::auth::ServerRequest::VerifyPassword(password_verify) => {
let send = self.operations_send.clone();
let verify = self
.auth_verify
.verify_password
.clone()
.ok_or_eyre("password auth not supported")?;
tokio::spawn(async move {
let _ = send
.send(Operation::VerifyPassword { user, password })
.await;
let result = verify(password_verify).await;
let _ = send.send(Operation::VerifyPassword(result)).await;
});
}
cluelessh_protocol::auth::ServerRequest::VerifyPubkey {
session_identifier,
pubkey,
user,
} => {
cluelessh_protocol::auth::ServerRequest::VerifyPubkey(pubkey_verify) => {
let send = self.operations_send.clone();
let verify = self
.auth_verify
.verify_pubkey
.clone()
.ok_or_eyre("pubkey auth not supported")?;
tokio::spawn(async move {
let _ = send
.send(Operation::VerifyPubkey {
session_identifier,
user,
pubkey,
})
.await;
let result = verify(pubkey_verify).await;
let _ = send.send(Operation::VerifyPubkey(result)).await;
});
}
}
@ -247,7 +263,7 @@ impl<S: AsyncRead + AsyncWrite> ServerConnection<S> {
let read = read.wrap_err("reading from connection")?;
if read == 0 {
info!("Did not read any bytes from TCP stream, EOF");
return Ok(());
return Err(Error::SshStatus(SshStatus::Disconnect));
}
if let Err(err) = self.proto.recv_bytes(&self.buf[..read]) {
return Err(Error::SshStatus(err));
@ -261,8 +277,12 @@ impl<S: AsyncRead + AsyncWrite> ServerConnection<S> {
}
op = self.operations_recv.recv() => {
match op {
Some(Operation::VerifyPubkey { .. }) => todo!(),
Some(Operation::VerifyPassword { .. }) => todo!(),
Some(Operation::VerifyPubkey(result)) => if let Some(auth) = self.proto.auth() {
auth.verification_result(result.is_ok());
},
Some(Operation::VerifyPassword(result)) => if let Some(auth) = self.proto.auth() {
auth.verification_result(result.is_ok());
},
None => {}
}
self.send_off_data().await?;
@ -315,13 +335,3 @@ impl<S: AsyncRead + AsyncWrite> ServerConnection<S> {
self.new_channels.pop_front()
}
}
impl PendingChannel {
pub async fn wait_ready(self) -> Result<Channel, Option<String>> {
match self.ready_recv.await {
Ok(Ok(())) => Ok(self.channel),
Ok(Err(err)) => Err(Some(err)),
Err(_) => Err(None),
}
}
}