This commit is contained in:
nora 2022-05-10 14:47:41 +02:00
parent 2a458890bc
commit 7d45668246
4 changed files with 94 additions and 22 deletions

3
.rustfmt.toml Normal file
View file

@ -0,0 +1,3 @@
imports_granularity = "Crate"
newline_style = "Unix"
group_imports = "StdExternalCrate"

View file

@ -1,5 +1,4 @@
use std::ptr; use std::{ptr, sync::atomic::AtomicPtr};
use std::sync::atomic::AtomicPtr;
pub struct LinkedList<T> { pub struct LinkedList<T> {
head: AtomicPtr<Node<T>>, head: AtomicPtr<Node<T>>,

View file

@ -1,6 +1,8 @@
use std::cell::UnsafeCell; use std::{
use std::ops::{Deref, DerefMut}; cell::UnsafeCell,
use std::sync::atomic::{AtomicU8, Ordering}; ops::{Deref, DerefMut},
sync::atomic::{AtomicU8, Ordering},
};
const INIT: u8 = 0; const INIT: u8 = 0;
const ACQUIRED: u8 = 1; const ACQUIRED: u8 = 1;

View file

@ -1,35 +1,103 @@
use std::cell::UnsafeCell; use std::{
use std::mem::MaybeUninit; cell::UnsafeCell,
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; error::Error,
fmt::{Display, Formatter, Write},
mem::MaybeUninit,
sync::{
atomic::{AtomicPtr, AtomicUsize, Ordering},
Arc,
},
};
struct Spsc<const N: usize, T> { const BUF_SIZE: usize = 16;
/// A fancy ring buffer
struct Spsc<T> {
/// The first element in the buffer
start: AtomicUsize, start: AtomicUsize,
/// A pointer *at* the last element in the buffer. usize::MAX when it's empty,
/// in which case the next element will be put directly at `start`
end: AtomicUsize, end: AtomicUsize,
buffer: [UnsafeCell<MaybeUninit<T>>; N], buffer: [UnsafeCell<MaybeUninit<T>>; BUF_SIZE],
} }
impl<const N: usize, T> Spsc<N, T> { impl<T> Spsc<T> {
pub fn new() -> Self { fn new() -> Self {
Self { Self {
start: AtomicUsize::default(), start: AtomicUsize::default(),
end: AtomicUsize::default(), end: AtomicUsize::new(usize::MAX),
buffer: [UnsafeCell::new(MaybeUninit::uninit()); N], buffer: [UnsafeCell::new(MaybeUninit::uninit()); BUF_SIZE],
} }
} }
pub fn push(&self, value: T) { fn try_send(&self, value: T) -> Result<(), errors::QueueFullError> {
let end = self.end.load(Ordering::Acquire); let end = self.end.load(Ordering::Acquire);
let start = self.start.load(Ordering::Acquire); let start = self.start.load(Ordering::Acquire);
if end != 0 && end == start {}w let idx = if end == usize::MAX { start } else { end };
if end < self.buffer.len() { if end != usize::MAX && end + 1 != start {
let end = unsafe { &self.buffer[end] }; Err(errors::QueueFullError)
unsafe { end.get().write(value) }; } else {
self.end.fetch_and(1, Ordering::Release); unsafe { self.write(value, idx) };
} self.end.store(idx, Ordering::Release);
Ok(())
} }
} }
unsafe impl<const N: usize, T> Sync for Spsc<N, T> {} unsafe fn read(&self, index: usize) -> T {
unsafe impl<const N: usize, T> Send for Spsc<N, T> {} self.buffer[index].get().read()
}
unsafe fn write(&self, value: T, index: usize) {
self.buffer[index].get().write(value)
}
}
pub struct Producer<T> {
queue: Arc<Spsc<T>>,
}
impl<T> Producer<T> {
pub fn try_send(&self, value: T) -> Result<(), errors::QueueFullError> {
self.queue.try_send(value)
}
}
pub struct Consumer<T> {
queue: Arc<Spsc<T>>,
}
impl<T> Consumer<T> {}
unsafe impl<T> Sync for Spsc<T> {}
unsafe impl<T> Send for Spsc<T> {}
mod errors {
use std::{
error::Error,
fmt::{Display, Formatter},
};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct QueueFullError;
impl Display for QueueFullError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("spsc queue is full")
}
}
impl Error for QueueFullError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct QueueEmptyError;
impl Display for QueueEmptyError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("spsc queue is empty")
}
}
impl Error for QueueEmptyError {}
}