use calloop for daemon

Should fix #3 because we no longer have the cooked loop
This commit is contained in:
nora 2026-02-07 17:23:18 +01:00
parent 14e5170f4f
commit 13bd759ded
3 changed files with 127 additions and 127 deletions

41
Cargo.lock generated
View file

@ -522,18 +522,43 @@ dependencies = [
"thiserror 1.0.69", "thiserror 1.0.69",
] ]
[[package]]
name = "calloop"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb9f6e1368bd4621d2c86baa7e37de77a938adf5221e5dd3d6133340101b309e"
dependencies = [
"bitflags 2.9.4",
"polling",
"rustix 1.1.2",
"slab",
"tracing",
]
[[package]] [[package]]
name = "calloop-wayland-source" name = "calloop-wayland-source"
version = "0.3.0" version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95a66a987056935f7efce4ab5668920b5d0dac4a7c99991a67395f13702ddd20" checksum = "95a66a987056935f7efce4ab5668920b5d0dac4a7c99991a67395f13702ddd20"
dependencies = [ dependencies = [
"calloop", "calloop 0.13.0",
"rustix 0.38.44", "rustix 0.38.44",
"wayland-backend", "wayland-backend",
"wayland-client", "wayland-client",
] ]
[[package]]
name = "calloop-wayland-source"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "138efcf0940a02ebf0cc8d1eff41a1682a46b431630f4c52450d6265876021fa"
dependencies = [
"calloop 0.14.3",
"rustix 1.1.2",
"wayland-backend",
"wayland-client",
]
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.2.36" version = "1.2.36"
@ -621,12 +646,13 @@ dependencies = [
name = "clippyboard-daemon" name = "clippyboard-daemon"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"calloop 0.14.3",
"calloop-wayland-source 0.4.1",
"ciborium", "ciborium",
"clippyboard-shared", "clippyboard-shared",
"ctrlc", "ctrlc",
"dirs", "dirs",
"eyre", "eyre",
"rustix 1.1.2",
"serde", "serde",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
@ -792,7 +818,7 @@ dependencies = [
"libc", "libc",
"option-ext", "option-ext",
"redox_users", "redox_users",
"windows-sys 0.60.2", "windows-sys 0.61.0",
] ]
[[package]] [[package]]
@ -2680,7 +2706,7 @@ dependencies = [
"errno", "errno",
"libc", "libc",
"linux-raw-sys 0.11.0", "linux-raw-sys 0.11.0",
"windows-sys 0.60.2", "windows-sys 0.61.0",
] ]
[[package]] [[package]]
@ -2818,8 +2844,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3457dea1f0eb631b4034d61d4d8c32074caa6cd1ab2d59f2327bd8461e2c0016" checksum = "3457dea1f0eb631b4034d61d4d8c32074caa6cd1ab2d59f2327bd8461e2c0016"
dependencies = [ dependencies = [
"bitflags 2.9.4", "bitflags 2.9.4",
"calloop", "calloop 0.13.0",
"calloop-wayland-source", "calloop-wayland-source 0.3.0",
"cursor-icon", "cursor-icon",
"libc", "libc",
"log", "log",
@ -3070,6 +3096,7 @@ version = "0.1.41"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
dependencies = [ dependencies = [
"log",
"pin-project-lite", "pin-project-lite",
"tracing-attributes", "tracing-attributes",
"tracing-core", "tracing-core",
@ -4055,7 +4082,7 @@ dependencies = [
"bitflags 2.9.4", "bitflags 2.9.4",
"block2", "block2",
"bytemuck", "bytemuck",
"calloop", "calloop 0.13.0",
"cfg_aliases", "cfg_aliases",
"concurrent-queue", "concurrent-queue",
"core-foundation 0.9.4", "core-foundation 0.9.4",

View file

@ -9,10 +9,11 @@ ciborium = "0.2.2"
ctrlc = "3.5.0" ctrlc = "3.5.0"
dirs = "6.0.0" dirs = "6.0.0"
eyre = "0.6.12" eyre = "0.6.12"
rustix = "1.1.2"
serde = "1.0.219" serde = "1.0.219"
tracing = { version = "0.1.41", features = ["attributes"] } tracing = { version = "0.1.41", features = ["attributes"] }
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
wayland-backend = { version = "0.3.11", features = ["client_system"] } wayland-backend = { version = "0.3.11", features = ["client_system"] }
wayland-client = "0.31.11" wayland-client = "0.31.11"
wayland-protocols = { version = "0.32.9", features = ["staging", "client"] } wayland-protocols = { version = "0.32.9", features = ["staging", "client"] }
calloop = "0.14.3"
calloop-wayland-source = "0.4.1"

View file

@ -1,17 +1,18 @@
use calloop::EventLoop;
use calloop::Interest;
use calloop::Mode;
use calloop::generic::Generic;
use calloop_wayland_source::WaylandSource;
use clippyboard_shared::HistoryItem; use clippyboard_shared::HistoryItem;
use eyre::Context; use eyre::Context;
use eyre::ContextCompat;
use eyre::bail; use eyre::bail;
use rustix::event::PollFd;
use rustix::event::PollFlags;
use rustix::fs::OFlags;
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::HashSet; use std::collections::HashSet;
use std::convert::Infallible; use std::convert::Infallible;
use std::io; use std::io;
use std::io::ErrorKind; use std::io::ErrorKind;
use std::io::PipeReader;
use std::io::{BufReader, BufWriter, PipeWriter, Read, Write}; use std::io::{BufReader, BufWriter, PipeWriter, Read, Write};
use std::ops::Deref;
use std::os::fd::AsFd; use std::os::fd::AsFd;
use std::os::unix::net::{UnixListener, UnixStream}; use std::os::unix::net::{UnixListener, UnixStream};
use std::path::PathBuf; use std::path::PathBuf;
@ -24,7 +25,6 @@ use tracing::error;
use tracing::info; use tracing::info;
use tracing::warn; use tracing::warn;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
use wayland_client::EventQueue;
use wayland_client::protocol::wl_registry::WlRegistry; use wayland_client::protocol::wl_registry::WlRegistry;
use wayland_client::protocol::wl_seat::WlSeat; use wayland_client::protocol::wl_seat::WlSeat;
use wayland_client::{Dispatch, Proxy, QueueHandle, event_created_child}; use wayland_client::{Dispatch, Proxy, QueueHandle, event_created_child};
@ -43,14 +43,26 @@ const MAX_HISTORY_BYTE_SIZE: usize = 100_000_000;
const MIME_TYPES: &[&str] = &["text/plain", "image/png", "image/jpg"]; const MIME_TYPES: &[&str] = &["text/plain", "image/png", "image/jpg"];
struct SharedState { struct SharedStateInner {
next_item_id: AtomicU64, next_item_id: AtomicU64,
items: Mutex<Vec<HistoryItem>>, items: Mutex<Vec<HistoryItem>>,
notify_write_send: PipeWriter,
data_control_manager: OnceLock<ExtDataControlManagerV1>, data_control_manager: OnceLock<ExtDataControlManagerV1>,
data_control_devices: Mutex<HashMap</*seat global name */ u32, ExtDataControlDeviceV1>>, data_control_devices: Mutex<HashMap</*seat global name */ u32, ExtDataControlDeviceV1>>,
qh: QueueHandle<WlState>, qh: QueueHandle<SharedState>,
}
struct SharedState {
inner: Arc<SharedStateInner>,
/// wl_seat that arrived before the data control manager so we weren't able to grab their device immediatly.
deferred_seats: Vec<WlSeat>,
}
impl Deref for SharedState {
type Target = SharedStateInner;
fn deref(&self) -> &Self::Target {
&self.inner
}
} }
struct InProgressOffer { struct InProgressOffer {
@ -58,14 +70,7 @@ struct InProgressOffer {
time: Duration, time: Duration,
} }
struct WlState { impl Dispatch<WlRegistry, ()> for SharedState {
shared_state: Arc<SharedState>,
/// wl_seat that arrived before the data control manager so we weren't able to grab their device immediatly.
deferred_seats: Vec<WlSeat>,
}
impl Dispatch<WlRegistry, ()> for WlState {
fn event( fn event(
state: &mut Self, state: &mut Self,
proxy: &WlRegistry, proxy: &WlRegistry,
@ -84,14 +89,13 @@ impl Dispatch<WlRegistry, ()> for WlState {
info!("A new seat was connected"); info!("A new seat was connected");
let seat: WlSeat = proxy.bind(name, 1, qhandle, ()); let seat: WlSeat = proxy.bind(name, 1, qhandle, ());
match state.shared_state.data_control_manager.get() { match state.data_control_manager.get() {
None => { None => {
state.deferred_seats.push(seat); state.deferred_seats.push(seat);
} }
Some(manager) => { Some(manager) => {
let device = manager.get_data_device(&seat, qhandle, ()); let device = manager.get_data_device(&seat, qhandle, ());
state state
.shared_state
.data_control_devices .data_control_devices
.lock() .lock()
.unwrap() .unwrap()
@ -104,7 +108,7 @@ impl Dispatch<WlRegistry, ()> for WlState {
for seat in state.deferred_seats.drain(..) { for seat in state.deferred_seats.drain(..) {
let device = manager.get_data_device(&seat, qhandle, ()); let device = manager.get_data_device(&seat, qhandle, ());
state state
.shared_state .inner
.data_control_devices .data_control_devices
.lock() .lock()
.unwrap() .unwrap()
@ -112,7 +116,6 @@ impl Dispatch<WlRegistry, ()> for WlState {
} }
state state
.shared_state
.data_control_manager .data_control_manager
.set(manager) .set(manager)
.expect("ext_data_control_manager_v1 already set, global appeared twice?"); .expect("ext_data_control_manager_v1 already set, global appeared twice?");
@ -120,18 +123,13 @@ impl Dispatch<WlRegistry, ()> for WlState {
} }
wayland_client::protocol::wl_registry::Event::GlobalRemove { name } => { wayland_client::protocol::wl_registry::Event::GlobalRemove { name } => {
// try to remove, if it's not a wl_seat it may not exist // try to remove, if it's not a wl_seat it may not exist
state state.data_control_devices.lock().unwrap().remove(&name);
.shared_state
.data_control_devices
.lock()
.unwrap()
.remove(&name);
} }
_ => {} _ => {}
} }
} }
} }
impl Dispatch<ExtDataControlManagerV1, ()> for WlState { impl Dispatch<ExtDataControlManagerV1, ()> for SharedState {
fn event( fn event(
_state: &mut Self, _state: &mut Self,
_proxy: &ExtDataControlManagerV1, _proxy: &ExtDataControlManagerV1,
@ -143,7 +141,7 @@ impl Dispatch<ExtDataControlManagerV1, ()> for WlState {
// no events at the time of writing // no events at the time of writing
} }
} }
impl Dispatch<WlSeat, ()> for WlState { impl Dispatch<WlSeat, ()> for SharedState {
fn event( fn event(
_state: &mut Self, _state: &mut Self,
_proxy: &WlSeat, _proxy: &WlSeat,
@ -155,7 +153,7 @@ impl Dispatch<WlSeat, ()> for WlState {
// we don't care about anything about the seat // we don't care about anything about the seat
} }
} }
impl Dispatch<ExtDataControlDeviceV1, ()> for WlState { impl Dispatch<ExtDataControlDeviceV1, ()> for SharedState {
fn event( fn event(
state: &mut Self, state: &mut Self,
_proxy: &ExtDataControlDeviceV1, _proxy: &ExtDataControlDeviceV1,
@ -191,7 +189,7 @@ impl Dispatch<ExtDataControlDeviceV1, ()> for WlState {
}; };
drop(mime_types); drop(mime_types);
let history_state = state.shared_state.clone(); let history_state = state.inner.clone();
let time = offer_data.time; let time = offer_data.time;
let (reader, writer) = std::io::pipe().unwrap(); let (reader, writer) = std::io::pipe().unwrap();
@ -240,7 +238,7 @@ impl Dispatch<ExtDataControlDeviceV1, ()> for WlState {
} }
} }
event_created_child!(WlState, ExtDataControlDeviceV1, [ event_created_child!(SharedState, ExtDataControlDeviceV1, [
EVT_DATA_OFFER_OPCODE => (ExtDataControlOfferV1, InProgressOffer { EVT_DATA_OFFER_OPCODE => (ExtDataControlOfferV1, InProgressOffer {
mime_types: Default::default(), mime_types: Default::default(),
time: SystemTime::now() time: SystemTime::now()
@ -250,7 +248,7 @@ impl Dispatch<ExtDataControlDeviceV1, ()> for WlState {
]); ]);
} }
impl Dispatch<ExtDataControlOfferV1, InProgressOffer> for WlState { impl Dispatch<ExtDataControlOfferV1, InProgressOffer> for SharedState {
fn event( fn event(
_state: &mut Self, _state: &mut Self,
_proxy: &ExtDataControlOfferV1, _proxy: &ExtDataControlOfferV1,
@ -268,7 +266,7 @@ impl Dispatch<ExtDataControlOfferV1, InProgressOffer> for WlState {
} }
} }
impl Dispatch<ExtDataControlSourceV1, OfferData> for WlState { impl Dispatch<ExtDataControlSourceV1, OfferData> for SharedState {
fn event( fn event(
_state: &mut Self, _state: &mut Self,
proxy: &ExtDataControlSourceV1, proxy: &ExtDataControlSourceV1,
@ -302,15 +300,9 @@ impl Dispatch<ExtDataControlSourceV1, OfferData> for WlState {
} }
} }
impl SharedState {
fn notify_wayland_request(&self) {
let _ = (&self.notify_write_send).write_all(&[0]);
}
}
fn do_copy_into_clipboard( fn do_copy_into_clipboard(
entry: HistoryItem, entry: HistoryItem,
shared_state: &SharedState, shared_state: &SharedStateInner,
) -> Result<(), eyre::Error> { ) -> Result<(), eyre::Error> {
for device in &*shared_state.data_control_devices.lock().unwrap() { for device in &*shared_state.data_control_devices.lock().unwrap() {
let data_source = shared_state let data_source = shared_state
@ -341,34 +333,8 @@ fn do_copy_into_clipboard(
Ok(()) Ok(())
} }
fn dispatch_wayland(
mut queue: EventQueue<WlState>,
mut wl_state: WlState,
notify_write_recv: PipeReader,
) -> eyre::Result<()> {
loop {
queue
.dispatch_pending(&mut wl_state)
.wrap_err("dispatching Wayland events")?;
let read_guard = queue
.prepare_read()
.wrap_err("preparing read from Wayland socket")?;
let _ = queue.flush();
let pollfd1_read = PollFd::from_borrowed_fd(read_guard.connection_fd(), PollFlags::IN);
let pollfd_signal = PollFd::from_borrowed_fd(notify_write_recv.as_fd(), PollFlags::IN);
let _ = rustix::event::poll(&mut [pollfd1_read, pollfd_signal], None);
read_guard
.read_without_dispatch()
.wrap_err("reading from wayland socket")?;
}
}
#[tracing::instrument(skip(peer, shared_state))] #[tracing::instrument(skip(peer, shared_state))]
fn handle_peer(mut peer: UnixStream, shared_state: &SharedState) -> eyre::Result<()> { fn handle_peer(mut peer: UnixStream, shared_state: &SharedStateInner) -> eyre::Result<()> {
let mut request = [0; 1]; let mut request = [0; 1];
let Ok(()) = peer.read_exact(&mut request) else { let Ok(()) = peer.read_exact(&mut request) else {
return Ok(()); return Ok(());
@ -396,7 +362,7 @@ struct OfferData(Arc<[u8]>);
fn handle_copy_message( fn handle_copy_message(
mut peer: UnixStream, mut peer: UnixStream,
shared_state: &SharedState, shared_state: &SharedStateInner,
) -> Result<(), eyre::Error> { ) -> Result<(), eyre::Error> {
let mut id = [0; 8]; let mut id = [0; 8];
peer.read_exact(&mut id).wrap_err("failed to read id")?; peer.read_exact(&mut id).wrap_err("failed to read id")?;
@ -412,25 +378,21 @@ fn handle_copy_message(
do_copy_into_clipboard(item, &shared_state).wrap_err("doing copy")?; do_copy_into_clipboard(item, &shared_state).wrap_err("doing copy")?;
shared_state.notify_wayland_request();
Ok(()) Ok(())
} }
fn handle_clear_message(shared_state: &SharedState) -> eyre::Result<()> { fn handle_clear_message(shared_state: &SharedStateInner) -> eyre::Result<()> {
shared_state.items.lock().unwrap().clear(); shared_state.items.lock().unwrap().clear();
for device in &*shared_state.data_control_devices.lock().unwrap() { for device in &*shared_state.data_control_devices.lock().unwrap() {
device.1.set_selection(None); device.1.set_selection(None);
} }
shared_state.notify_wayland_request();
Ok(()) Ok(())
} }
fn read_fd_into_history( fn read_fd_into_history(
history_state: &SharedState, history_state: &SharedStateInner,
time: std::time::Duration, time: std::time::Duration,
mime: String, mime: String,
data_reader: impl Read, data_reader: impl Read,
@ -514,35 +476,27 @@ pub fn main_inner(socket_path: &PathBuf) -> eyre::Result<Infallible> {
let conn = let conn =
wayland_client::Connection::connect_to_env().wrap_err("connecting to the compositor")?; wayland_client::Connection::connect_to_env().wrap_err("connecting to the compositor")?;
let mut queue = conn.new_event_queue::<WlState>(); let mut queue = conn.new_event_queue::<SharedState>();
let (notify_write_recv, notify_write_send) = std::io::pipe().expect("todo"); let mut shared_state = SharedState {
inner: Arc::new(SharedStateInner {
next_item_id: AtomicU64::new(0),
items: Mutex::new(Vec::<HistoryItem>::new()),
let shared_state = Arc::new(SharedState { data_control_manager: OnceLock::new(),
next_item_id: AtomicU64::new(0), data_control_devices: Mutex::new(HashMap::new()),
items: Mutex::new(Vec::<HistoryItem>::new()), qh: queue.handle(),
notify_write_send, }),
data_control_manager: OnceLock::new(),
data_control_devices: Mutex::new(HashMap::new()),
qh: queue.handle(),
});
let history_state2 = shared_state.clone();
let mut wl_state = WlState {
deferred_seats: Vec::new(), deferred_seats: Vec::new(),
shared_state: history_state2,
}; };
conn.display().get_registry(&queue.handle(), ()); conn.display().get_registry(&queue.handle(), ());
queue queue
.roundtrip(&mut wl_state) .roundtrip(&mut shared_state)
.wrap_err("failed to set up wayland state")?; .wrap_err("failed to set up wayland state")?;
if wl_state.shared_state.data_control_manager.get().is_none() { if shared_state.data_control_manager.get().is_none() {
bail!( bail!(
"{} not found, the ext-data-control-v1 Wayland extension is likely unsupported by your compositor.\n\ "{} not found, the ext-data-control-v1 Wayland extension is likely unsupported by your compositor.\n\
check https://wayland.app/protocols/ext-data-control-v1#compositor-support\ check https://wayland.app/protocols/ext-data-control-v1#compositor-support\
@ -551,35 +505,53 @@ pub fn main_inner(socket_path: &PathBuf) -> eyre::Result<Infallible> {
); );
} }
rustix::fs::fcntl_setfl(notify_write_recv.as_fd(), OFlags::NONBLOCK).expect("todo"); let mut event_loop =
rustix::fs::fcntl_setfl(conn.as_fd(), OFlags::NONBLOCK).expect("TODO"); EventLoop::<SharedState>::try_new().wrap_err("failed to initialize event_loop")?;
let socket_path_clone = socket_path.to_owned(); WaylandSource::new(conn.clone(), queue)
std::thread::spawn(move || { .insert(event_loop.handle())
if let Err(err) = dispatch_wayland(queue, wl_state, notify_write_recv) { .unwrap();
error!("error on Wayland thread: {err:?}");
cleanup(&socket_path_clone); event_loop
std::process::exit(1); .handle()
} .insert_source(
}); Generic::new(socket, Interest::READ, Mode::Level),
|_, socket, shared_state| {
let peer = socket.accept();
match peer {
Ok((peer, _)) => {
let history_state = shared_state.inner.clone();
std::thread::spawn(move || {
let result = handle_peer(peer, &history_state);
if let Err(err) = result {
warn!("Error handling peer: {err:?}");
}
});
}
Err(err) => {
warn!("Error accepting peer: {err}");
}
}
Ok(calloop::PostAction::Continue)
},
)
.wrap_err("failed to register socket event source")?;
info!("Listening on {}", socket_path.display()); info!("Listening on {}", socket_path.display());
for peer in socket.incoming() { let socket_path_clone = socket_path.clone();
match peer {
Ok(peer) => { let result = event_loop.run(
let history_state = shared_state.clone(); std::time::Duration::from_millis(100),
std::thread::spawn(move || { &mut shared_state,
let result = handle_peer(peer, &history_state); |_| {},
if let Err(err) = result { );
warn!("Error handling peer: {err:?}");
} if let Err(err) = result {
}); error!("error on Wayland thread: {err:?}");
} cleanup(&socket_path_clone);
Err(err) => { std::process::exit(1);
warn!("Error accepting peer: {err}");
}
}
} }
unreachable!("socket.incoming will never return None") unreachable!("socket.incoming will never return None")