add tests and fixes

This commit is contained in:
nora 2024-08-26 21:40:49 +02:00
parent 688394cac9
commit a59bcb069d
10 changed files with 202 additions and 37 deletions

View file

@ -58,7 +58,8 @@ struct Channel {
/// Queued data that we want to send, but have not been able to because of the window limits.
/// Whenever we get more window space, we will send this data.
queued_data: Vec<u8>,
queued_data_default: Vec<u8>,
queued_data_extended: HashMap<u32, Vec<u8>>,
}
/// An update from a channel.
@ -133,6 +134,7 @@ pub enum ChannelOperationKind {
Success,
Failure,
Data(Vec<u8>),
ExtendedData(u32, Vec<u8>),
Request(ChannelRequest),
Eof,
Close,
@ -212,7 +214,8 @@ impl ChannelsState {
our_window_size: initial_window_size,
our_window_size_increase_step: initial_window_size,
queued_data: Vec::new(),
queued_data_default: Vec::new(),
queued_data_extended: HashMap::new(),
}),
);
@ -255,7 +258,8 @@ impl ChannelsState {
our_window_size,
our_window_size_increase_step: our_window_size,
queued_data: Vec::new(),
queued_data_default: Vec::new(),
queued_data_extended: HashMap::new(),
}),
);
@ -297,11 +301,38 @@ impl ChannelsState {
.checked_add(bytes_to_add)
.ok_or_else(|| peer_error!("window size larger than 2^32"))?;
if !channel.queued_data.is_empty() {
let limit =
cmp::min(channel.queued_data.len(), channel.peer_window_size as usize);
let data_to_send = channel.queued_data.splice(..limit, []).collect::<Vec<_>>();
self.send_data(our_channel, &data_to_send);
if !channel.queued_data_default.is_empty() {
let limit = cmp::min(
channel.queued_data_default.len(),
channel.peer_window_size as usize,
);
let data_to_send = channel
.queued_data_default
.splice(..limit, [])
.collect::<Vec<_>>();
self.send_data(our_channel, &data_to_send, None);
}
// After potentially sending default data, see if we can send some extended data too.
let channel = self.channel(our_channel)?;
let data_keys = channel
.queued_data_extended
.keys()
.copied()
.collect::<Vec<_>>();
for number in data_keys {
let channel = self.channel(our_channel)?;
let peer_window_size = channel.peer_window_size;
let queued_data_extended =
channel.queued_data_extended.get_mut(&number).unwrap();
if !queued_data_extended.is_empty() {
let limit = cmp::min(queued_data_extended.len(), peer_window_size as usize);
let data_to_send =
queued_data_extended.splice(..limit, []).collect::<Vec<_>>();
self.send_data(our_channel, &data_to_send, Some(number));
}
}
}
numbers::SSH_MSG_CHANNEL_DATA => {
@ -569,7 +600,10 @@ impl ChannelsState {
ChannelOperationKind::Success => self.send_channel_success(peer),
ChannelOperationKind::Failure => self.send_channel_failure(peer),
ChannelOperationKind::Data(data) => {
self.send_data(op.number, &data);
self.send_data(op.number, &data, None);
}
ChannelOperationKind::ExtendedData(code, data) => {
self.send_data(op.number, &data, Some(code));
}
ChannelOperationKind::Request(req) => {
let packet = match req {
@ -623,7 +657,12 @@ impl ChannelsState {
}
}
fn send_data(&mut self, channel_number: ChannelNumber, data: &[u8]) {
fn send_data(
&mut self,
channel_number: ChannelNumber,
data: &[u8],
extended_code: Option<u32>,
) {
let channel = self.channel(channel_number).unwrap();
let mut chunks = data.chunks(channel.peer_max_packet_size as usize);
@ -647,11 +686,24 @@ impl ChannelsState {
// It's over, we have exhausted all window space.
// Queue the rest of the bytes.
let channel = self.channel(channel_number).unwrap();
channel.queued_data.extend_from_slice(to_keep);
for data in chunks {
channel.queued_data.extend_from_slice(data);
match extended_code {
Some(extended) => {
let queued_data_extended =
channel.queued_data_extended.entry(extended).or_default();
queued_data_extended.extend_from_slice(to_keep);
for data in chunks {
queued_data_extended.extend_from_slice(data);
}
debug!(channel = %channel_number, queue_len = %channel.queued_data_extended.len(), "Exhausted window space, queueing the rest of the data");
}
None => {
channel.queued_data_default.extend_from_slice(to_keep);
for data in chunks {
channel.queued_data_default.extend_from_slice(data);
}
debug!(channel = %channel_number, queue_len = %channel.queued_data_default.len(), "Exhausted window space, queueing the rest of the data");
}
}
debug!(channel = %channel_number, queue_len = %channel.queued_data.len(), "Exhausted window space, queueing the rest of the data");
return;
}
Some(space) => channel.peer_window_size = space,
@ -713,6 +765,7 @@ impl ChannelOperation {
ChannelOperationKind::Success => "success",
ChannelOperationKind::Failure => "failure",
ChannelOperationKind::Data(_) => "data",
ChannelOperationKind::ExtendedData(_, _) => "extended-data",
ChannelOperationKind::Request(req) => match req {
ChannelRequest::PtyReq { .. } => "pty-req",
ChannelRequest::Shell { .. } => "shell",
@ -824,6 +877,7 @@ mod tests {
assert_response_types(state, &[]);
}
// TODO: test with extended data
#[test]
fn respect_peer_windowing() {
let state = &mut ChannelsState::new(true);