diff --git a/.rustfmt.toml b/.rustfmt.toml new file mode 100644 index 0000000..4d7dd9e --- /dev/null +++ b/.rustfmt.toml @@ -0,0 +1,3 @@ +imports_granularity = "Crate" +newline_style = "Unix" +group_imports = "StdExternalCrate" \ No newline at end of file diff --git a/src/linked_list.rs b/src/linked_list.rs index ad10414..3533316 100644 --- a/src/linked_list.rs +++ b/src/linked_list.rs @@ -1,5 +1,4 @@ -use std::ptr; -use std::sync::atomic::AtomicPtr; +use std::{ptr, sync::atomic::AtomicPtr}; pub struct LinkedList { head: AtomicPtr>, diff --git a/src/mutex.rs b/src/mutex.rs index d004fa4..892b18b 100644 --- a/src/mutex.rs +++ b/src/mutex.rs @@ -1,6 +1,8 @@ -use std::cell::UnsafeCell; -use std::ops::{Deref, DerefMut}; -use std::sync::atomic::{AtomicU8, Ordering}; +use std::{ + cell::UnsafeCell, + ops::{Deref, DerefMut}, + sync::atomic::{AtomicU8, Ordering}, +}; const INIT: u8 = 0; const ACQUIRED: u8 = 1; diff --git a/src/spsc.rs b/src/spsc.rs index ab45c7b..797407c 100644 --- a/src/spsc.rs +++ b/src/spsc.rs @@ -1,35 +1,103 @@ -use std::cell::UnsafeCell; -use std::mem::MaybeUninit; -use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +use std::{ + cell::UnsafeCell, + error::Error, + fmt::{Display, Formatter, Write}, + mem::MaybeUninit, + sync::{ + atomic::{AtomicPtr, AtomicUsize, Ordering}, + Arc, + }, +}; -struct Spsc { +const BUF_SIZE: usize = 16; + +/// A fancy ring buffer +struct Spsc { + /// The first element in the buffer start: AtomicUsize, + /// A pointer *at* the last element in the buffer. usize::MAX when it's empty, + /// in which case the next element will be put directly at `start` end: AtomicUsize, - buffer: [UnsafeCell>; N], + buffer: [UnsafeCell>; BUF_SIZE], } -impl Spsc { - pub fn new() -> Self { +impl Spsc { + fn new() -> Self { Self { start: AtomicUsize::default(), - end: AtomicUsize::default(), - buffer: [UnsafeCell::new(MaybeUninit::uninit()); N], + end: AtomicUsize::new(usize::MAX), + buffer: [UnsafeCell::new(MaybeUninit::uninit()); BUF_SIZE], } } - pub fn push(&self, value: T) { + fn try_send(&self, value: T) -> Result<(), errors::QueueFullError> { let end = self.end.load(Ordering::Acquire); let start = self.start.load(Ordering::Acquire); - if end != 0 && end == start {}w + let idx = if end == usize::MAX { start } else { end }; - if end < self.buffer.len() { - let end = unsafe { &self.buffer[end] }; - unsafe { end.get().write(value) }; - self.end.fetch_and(1, Ordering::Release); + if end != usize::MAX && end + 1 != start { + Err(errors::QueueFullError) + } else { + unsafe { self.write(value, idx) }; + self.end.store(idx, Ordering::Release); + Ok(()) } } + + unsafe fn read(&self, index: usize) -> T { + self.buffer[index].get().read() + } + + unsafe fn write(&self, value: T, index: usize) { + self.buffer[index].get().write(value) + } } -unsafe impl Sync for Spsc {} -unsafe impl Send for Spsc {} +pub struct Producer { + queue: Arc>, +} + +impl Producer { + pub fn try_send(&self, value: T) -> Result<(), errors::QueueFullError> { + self.queue.try_send(value) + } +} + +pub struct Consumer { + queue: Arc>, +} + +impl Consumer {} + +unsafe impl Sync for Spsc {} +unsafe impl Send for Spsc {} + +mod errors { + use std::{ + error::Error, + fmt::{Display, Formatter}, + }; + + #[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash)] + pub struct QueueFullError; + + impl Display for QueueFullError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str("spsc queue is full") + } + } + + impl Error for QueueFullError {} + + #[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash)] + pub struct QueueEmptyError; + + impl Display for QueueEmptyError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str("spsc queue is empty") + } + } + + impl Error for QueueEmptyError {} +}