From 0f5036a49865c4f635ffb5e72d1849d6d59c555c Mon Sep 17 00:00:00 2001 From: Jake Stanger Date: Sat, 28 Dec 2024 00:28:50 +0000 Subject: [PATCH] refactor: replace channel macros with ext trait methods --- src/channels.rs | 153 ++++++++++++++++++ src/clients/clipboard.rs | 17 +- src/clients/compositor/hyprland.rs | 55 +++---- src/clients/compositor/sway.rs | 10 +- src/clients/libinput.rs | 7 +- src/clients/music/mpd.rs | 7 +- src/clients/music/mpris.rs | 13 +- src/clients/swaync/mod.rs | 5 +- src/clients/volume/mod.rs | 5 +- src/clients/volume/sink.rs | 11 +- src/clients/volume/sink_input.rs | 11 +- src/clients/wayland/mod.rs | 26 +-- src/clients/wayland/wl_output.rs | 35 ++-- src/clients/wayland/wlr_data_control/mod.rs | 19 ++- .../wayland/wlr_foreign_toplevel/mod.rs | 11 +- src/dynamic_value/dynamic_bool.rs | 14 +- src/dynamic_value/dynamic_string.rs | 13 +- src/image/provider.rs | 10 +- src/ipc/server/mod.rs | 12 +- src/ironvar.rs | 6 +- src/macros.rs | 131 --------------- src/main.rs | 4 +- src/modules/cairo.rs | 25 ++- src/modules/clipboard.rs | 25 ++- src/modules/clock.rs | 13 +- src/modules/custom/button.rs | 16 +- src/modules/custom/mod.rs | 10 +- src/modules/custom/progress.rs | 10 +- src/modules/custom/slider.rs | 23 ++- src/modules/focused.rs | 27 ++-- src/modules/keys.rs | 24 ++- src/modules/label.rs | 11 +- src/modules/launcher/item.rs | 29 ++-- src/modules/launcher/mod.rs | 48 +++--- src/modules/mod.rs | 115 +++++++------ src/modules/music/mod.rs | 53 +++--- src/modules/networkmanager.rs | 12 +- src/modules/notifications.rs | 13 +- src/modules/script.rs | 11 +- src/modules/sway/mode.rs | 15 +- src/modules/sysinfo.rs | 11 +- src/modules/tray/mod.rs | 31 ++-- src/modules/upower.rs | 26 +-- src/modules/volume.rs | 92 +++++++---- src/modules/workspaces.rs | 58 ++++--- src/popup.rs | 15 +- src/script.rs | 7 +- src/style.rs | 7 +- 48 files changed, 677 insertions(+), 625 deletions(-) create mode 100644 src/channels.rs diff --git a/src/channels.rs b/src/channels.rs new file mode 100644 index 00000000..28b98b27 --- /dev/null +++ b/src/channels.rs @@ -0,0 +1,153 @@ +use crate::modules::ModuleUpdateEvent; +use crate::spawn; +use smithay_client_toolkit::reexports::calloop; +use std::fmt::Debug; +use tokio::sync::{broadcast, mpsc}; + +pub trait SyncSenderExt { + /// Asynchronously sends a message on the channel, + /// panicking if it cannot be sent. + /// + /// This should be used in cases where sending should *never* fail, + /// or where failing indicates a serious bug. + fn send_expect(&self, message: T); +} + +impl SyncSenderExt for std::sync::mpsc::Sender { + #[inline] + fn send_expect(&self, message: T) { + self.send(message).expect(crate::error::ERR_CHANNEL_SEND); + } +} + +impl SyncSenderExt for calloop::channel::Sender { + #[inline] + fn send_expect(&self, message: T) { + self.send(message).expect(crate::error::ERR_CHANNEL_SEND); + } +} + +impl SyncSenderExt for broadcast::Sender { + #[inline] + fn send_expect(&self, message: T) { + self.send(message).expect(crate::error::ERR_CHANNEL_SEND); + } +} + +pub trait AsyncSenderExt: Sync + Send + Sized + Clone { + /// Asynchronously sends a message on the channel, + /// panicking if it cannot be sent. + /// + /// This should be used in cases where sending should *never* fail, + /// or where failing indicates a serious bug. + fn send_expect(&self, message: T) -> impl std::future::Future + Send; + + /// Asynchronously sends a message on the channel, + /// spawning a task to allow it to be sent in the background, + /// and panicking if it cannot be sent. + /// + /// Note that this function will return *before* the message is sent. + /// + /// This should be used in cases where sending should *never* fail, + /// or where failing indicates a serious bug. + #[inline] + fn send_spawn(&self, message: T) + where + Self: 'static, + T: Send + 'static, + { + let tx = self.clone(); + spawn(async move { tx.send_expect(message).await }); + } + + /// Shorthand for [`AsyncSenderExt::send_expect`] + /// when sending a [`ModuleUpdateEvent::Update`]. + #[inline] + async fn send_update(&self, update: U) + where + Self: AsyncSenderExt>, + { + self.send_expect(ModuleUpdateEvent::Update(update)).await; + } + + /// Shorthand for [`AsyncSenderExt::send_spawn`] + /// when sending a [`ModuleUpdateEvent::Update`]. + #[inline] + fn send_update_spawn(&self, update: U) + where + Self: AsyncSenderExt> + 'static, + U: Clone + Send + 'static, + { + self.send_spawn(ModuleUpdateEvent::Update(update)); + } +} + +impl AsyncSenderExt for mpsc::Sender { + #[inline] + async fn send_expect(&self, message: T) { + self.send(message) + .await + .expect(crate::error::ERR_CHANNEL_SEND); + } +} + +pub trait MpscReceiverExt { + /// Spawns a `GLib` future on the local thread, and calls `rx.recv()` + /// in a loop, passing the message to `f`. + /// + /// This allows use of `GObjects` and futures in the same context. + fn recv_glib(self, f: F) + where + F: FnMut(T) + 'static; +} + +impl MpscReceiverExt for mpsc::Receiver { + fn recv_glib(mut self, mut f: F) + where + F: FnMut(T) + 'static, + { + glib::spawn_future_local(async move { + while let Some(val) = self.recv().await { + f(val); + } + }); + } +} + +pub trait BroadcastReceiverExt +where + T: Debug + Clone + 'static, +{ + /// Spawns a `GLib` future on the local thread, and calls `rx.recv()` + /// in a loop, passing the message to `f`. + /// + /// This allows use of `GObjects` and futures in the same context. + fn recv_glib(self, f: F) + where + F: FnMut(T) + 'static; +} + +impl BroadcastReceiverExt for broadcast::Receiver +where + T: Debug + Clone + 'static, +{ + fn recv_glib(mut self, mut f: F) + where + F: FnMut(T) + 'static, + { + glib::spawn_future_local(async move { + loop { + match self.recv().await { + Ok(val) => f(val), + Err(broadcast::error::RecvError::Lagged(count)) => { + tracing::warn!("Channel lagged behind by {count}, this may result in unexpected or broken behaviour"); + } + Err(err) => { + tracing::error!("{err:?}"); + break; + } + } + } + }); + } +} diff --git a/src/clients/clipboard.rs b/src/clients/clipboard.rs index 1752486f..6169e57c 100644 --- a/src/clients/clipboard.rs +++ b/src/clients/clipboard.rs @@ -1,5 +1,6 @@ use super::wayland::{self, ClipboardItem}; -use crate::{arc_mut, lock, register_client, spawn, try_send}; +use crate::channels::AsyncSenderExt; +use crate::{arc_mut, lock, register_client, spawn}; use indexmap::map::Iter; use indexmap::IndexMap; use std::sync::{Arc, Mutex}; @@ -46,7 +47,7 @@ impl Client { let senders = lock!(senders); let iter = senders.iter(); for (tx, _) in iter { - try_send!(tx, ClipboardEvent::Add(item.clone())); + tx.send_spawn(ClipboardEvent::Add(item.clone())); } lock!(cache).insert(item, senders.len()); @@ -74,16 +75,16 @@ impl Client { let removed_id = lock!(cache) .remove_ref_first() .expect("Clipboard cache unexpectedly empty"); - try_send!(tx, ClipboardEvent::Remove(removed_id)); + tx.send_spawn(ClipboardEvent::Remove(removed_id)); } - try_send!(tx, ClipboardEvent::Add(item.clone())); + tx.send_spawn(ClipboardEvent::Add(item.clone())); } }, |existing_id| { let senders = lock!(senders); let iter = senders.iter(); for (tx, _) in iter { - try_send!(tx, ClipboardEvent::Activate(existing_id)); + tx.send_spawn(ClipboardEvent::Activate(existing_id)); } }, ); @@ -106,7 +107,7 @@ impl Client { let iter = cache.iter(); for (_, (item, _)) in iter { - try_send!(tx, ClipboardEvent::Add(item.clone())); + tx.send_spawn(ClipboardEvent::Add(item.clone())); } } @@ -130,7 +131,7 @@ impl Client { let senders = lock!(self.senders); let iter = senders.iter(); for (tx, _) in iter { - try_send!(tx, ClipboardEvent::Activate(id)); + tx.send_spawn(ClipboardEvent::Activate(id)); } } @@ -140,7 +141,7 @@ impl Client { let senders = lock!(self.senders); let iter = senders.iter(); for (tx, _) in iter { - try_send!(tx, ClipboardEvent::Remove(id)); + tx.send_spawn(ClipboardEvent::Remove(id)); } } } diff --git a/src/clients/compositor/hyprland.rs b/src/clients/compositor/hyprland.rs index f7bb09f9..4ff8af51 100644 --- a/src/clients/compositor/hyprland.rs +++ b/src/clients/compositor/hyprland.rs @@ -1,5 +1,6 @@ use super::{Visibility, Workspace, WorkspaceClient, WorkspaceUpdate}; -use crate::{arc_mut, lock, send, spawn_blocking}; +use crate::channels::SyncSenderExt; +use crate::{arc_mut, lock, spawn_blocking}; use color_eyre::Result; use hyprland::data::{Workspace as HWorkspace, Workspaces}; use hyprland::dispatch::{Dispatch, DispatchType, WorkspaceIdentifierWithSpecial}; @@ -58,7 +59,7 @@ impl Client { let workspace = Self::get_workspace(&workspace_name, prev_workspace.as_ref()); if let Some(workspace) = workspace { - send!(tx, WorkspaceUpdate::Add(workspace)); + tx.send_expect(WorkspaceUpdate::Add(workspace)); } }); } @@ -139,7 +140,7 @@ impl Client { let workspace = Self::get_workspace(&workspace_name, prev_workspace.as_ref()); if let Some(workspace) = workspace { - send!(tx, WorkspaceUpdate::Move(workspace.clone())); + tx.send_expect(WorkspaceUpdate::Move(workspace.clone())); if !workspace.visibility.is_focused() { Self::send_focus_change(&mut prev_workspace, workspace, &tx); @@ -155,13 +156,10 @@ impl Client { event_listener.add_workspace_rename_handler(move |data| { let _lock = lock!(lock); - send!( - tx, - WorkspaceUpdate::Rename { - id: data.workspace_id as i64, - name: data.workspace_name - } - ); + tx.send_expect(WorkspaceUpdate::Rename { + id: data.workspace_id as i64, + name: data.workspace_name, + }); }); } @@ -172,7 +170,7 @@ impl Client { event_listener.add_workspace_destroy_handler(move |data| { let _lock = lock!(lock); debug!("Received workspace destroy: {data:?}"); - send!(tx, WorkspaceUpdate::Remove(data.workspace_id as i64)); + tx.send_expect(WorkspaceUpdate::Remove(data.workspace_id as i64)); }); } @@ -193,13 +191,10 @@ impl Client { error!("Unable to locate client"); }, |c| { - send!( - tx, - WorkspaceUpdate::Urgent { - id: c.workspace.id as i64, - urgent: true, - } - ); + tx.send_expect(WorkspaceUpdate::Urgent { + id: c.workspace.id as i64, + urgent: true, + }); }, ); }); @@ -218,21 +213,15 @@ impl Client { workspace: Workspace, tx: &Sender, ) { - send!( - tx, - WorkspaceUpdate::Focus { - old: prev_workspace.take(), - new: workspace.clone(), - } - ); + tx.send_expect(WorkspaceUpdate::Focus { + old: prev_workspace.take(), + new: workspace.clone(), + }); - send!( - tx, - WorkspaceUpdate::Urgent { - id: workspace.id, - urgent: false, - } - ); + tx.send_expect(WorkspaceUpdate::Urgent { + id: workspace.id, + urgent: false, + }); prev_workspace.replace(workspace); } @@ -292,7 +281,7 @@ impl WorkspaceClient for Client { }) .collect(); - send!(tx, WorkspaceUpdate::Init(workspaces)); + tx.send_expect(WorkspaceUpdate::Init(workspaces)); } rx diff --git a/src/clients/compositor/sway.rs b/src/clients/compositor/sway.rs index e7091a12..1e685ac7 100644 --- a/src/clients/compositor/sway.rs +++ b/src/clients/compositor/sway.rs @@ -1,11 +1,11 @@ use super::{Visibility, Workspace, WorkspaceClient, WorkspaceUpdate}; -use crate::{await_sync, send}; +use crate::await_sync; +use crate::channels::SyncSenderExt; +use crate::clients::sway::Client; use color_eyre::Result; use swayipc_async::{Node, WorkspaceChange, WorkspaceEvent}; use tokio::sync::broadcast::{channel, Receiver}; -use crate::clients::sway::Client; - impl WorkspaceClient for Client { fn focus(&self, id: String) -> Result<()> { await_sync(async move { @@ -27,13 +27,13 @@ impl WorkspaceClient for Client { let event = WorkspaceUpdate::Init(workspaces.into_iter().map(Workspace::from).collect()); - send!(tx, event); + tx.send_expect(event); drop(client); self.add_listener::(move |event| { let update = WorkspaceUpdate::from(event.clone()); - send!(tx, update); + tx.send_expect(update); }) .await .expect("to add listener"); diff --git a/src/clients/libinput.rs b/src/clients/libinput.rs index 9259e006..e924c686 100644 --- a/src/clients/libinput.rs +++ b/src/clients/libinput.rs @@ -1,4 +1,5 @@ -use crate::{arc_rw, read_lock, send, spawn, spawn_blocking, write_lock}; +use crate::channels::SyncSenderExt; +use crate::{arc_rw, read_lock, spawn, spawn_blocking, write_lock}; use color_eyre::{Report, Result}; use evdev_rs::enums::{int_to_ev_key, EventCode, EV_KEY, EV_LED}; use evdev_rs::DeviceWrapper; @@ -182,7 +183,7 @@ impl Client { let data = KeyData { device_path, key }; if let Ok(event) = data.try_into() { - send!(tx, event); + tx.send_expect(event); } }); } @@ -210,7 +211,7 @@ impl Client { if caps_event.is_ok() { debug!("new keyboard device: {name} | {}", device_path.display()); write_lock!(self.known_devices).push(device_path.to_path_buf()); - send!(self.tx, Event::Device); + self.tx.send_expect(Event::Device); } } } diff --git a/src/clients/music/mpd.rs b/src/clients/music/mpd.rs index f1239c9d..67e1e5d7 100644 --- a/src/clients/music/mpd.rs +++ b/src/clients/music/mpd.rs @@ -1,7 +1,8 @@ use super::{ MusicClient, PlayerState, PlayerUpdate, ProgressTick, Status, Track, TICK_INTERVAL_MS, }; -use crate::{await_sync, send, spawn, Ironbar}; +use crate::channels::SyncSenderExt; +use crate::{await_sync, spawn, Ironbar}; use color_eyre::Report; use color_eyre::Result; use mpd_client::client::{ConnectionEvent, Subsystem}; @@ -97,7 +98,7 @@ impl Client { let status = Status::from(status); let update = PlayerUpdate::Update(Box::new(track), status); - send!(tx, update); + tx.send_expect(update); } Ok(()) @@ -113,7 +114,7 @@ impl Client { elapsed: status.elapsed, }); - send!(tx, update); + tx.send_expect(update); } } } diff --git a/src/clients/music/mpris.rs b/src/clients/music/mpris.rs index 95e63f23..5d0e16e9 100644 --- a/src/clients/music/mpris.rs +++ b/src/clients/music/mpris.rs @@ -1,6 +1,7 @@ use super::{MusicClient, PlayerState, PlayerUpdate, Status, Track, TICK_INTERVAL_MS}; +use crate::channels::SyncSenderExt; use crate::clients::music::ProgressTick; -use crate::{arc_mut, lock, send, spawn_blocking}; +use crate::{arc_mut, lock, spawn_blocking}; use color_eyre::Result; use mpris::{DBusError, Event, Metadata, PlaybackStatus, Player, PlayerFinder}; use std::cmp; @@ -137,7 +138,7 @@ impl Client { let mut players_locked = lock!(players); players_locked.remove(identity); if players_locked.is_empty() { - send!(tx, PlayerUpdate::Update(Box::new(None), Status::default())); + tx.send_expect(PlayerUpdate::Update(Box::new(None), Status::default())); } }; @@ -212,7 +213,7 @@ impl Client { let track = Track::from(metadata); let player_update = PlayerUpdate::Update(Box::new(Some(track)), status); - send!(tx, player_update); + tx.send_expect(player_update); Ok(()) } @@ -242,7 +243,7 @@ impl Client { duration: metadata.length(), }); - send!(tx, update); + tx.send_expect(update); } } } @@ -319,7 +320,9 @@ impl MusicClient for Client { state: PlayerState::Stopped, volume_percent: None, }; - send!(self.tx, PlayerUpdate::Update(Box::new(None), status)); + + self.tx + .send_expect(PlayerUpdate::Update(Box::new(None), status)); } rx diff --git a/src/clients/swaync/mod.rs b/src/clients/swaync/mod.rs index 9b5c1161..58370b00 100644 --- a/src/clients/swaync/mod.rs +++ b/src/clients/swaync/mod.rs @@ -1,6 +1,7 @@ mod dbus; -use crate::{register_fallible_client, send, spawn}; +use crate::channels::SyncSenderExt; +use crate::{register_fallible_client, spawn}; use color_eyre::{Report, Result}; use dbus::SwayNcProxy; use serde::Deserialize; @@ -56,7 +57,7 @@ impl Client { while let Some(ev) = stream.next().await { let ev = ev.body::().expect("to deserialize"); debug!("Received event: {ev:?}"); - send!(tx, ev); + tx.send_expect(ev); } }); } diff --git a/src/clients/volume/mod.rs b/src/clients/volume/mod.rs index 3db503ff..21ba9914 100644 --- a/src/clients/volume/mod.rs +++ b/src/clients/volume/mod.rs @@ -1,7 +1,7 @@ mod sink; mod sink_input; -use crate::{arc_mut, lock, register_client, send, spawn_blocking, APP_ID}; +use crate::{arc_mut, lock, register_client, spawn_blocking, APP_ID}; use libpulse_binding::callbacks::ListResult; use libpulse_binding::context::introspect::{Introspector, ServerInfo}; use libpulse_binding::context::subscribe::{Facility, InterestMaskSet, Operation}; @@ -14,6 +14,7 @@ use std::sync::{Arc, Mutex}; use tokio::sync::broadcast; use tracing::{debug, error, info, warn}; +use crate::channels::SyncSenderExt; pub use sink::Sink; pub use sink_input::SinkInput; @@ -269,7 +270,7 @@ fn set_default_sink( { sink.active = true; debug!("Set sink active: {}", sink.name); - send!(tx, Event::UpdateSink(sink.clone())); + tx.send_expect(Event::UpdateSink(sink.clone())); } else { warn!("Couldn't find sink: {}", default_sink_name); } diff --git a/src/clients/volume/sink.rs b/src/clients/volume/sink.rs index d08b99e1..235ea90d 100644 --- a/src/clients/volume/sink.rs +++ b/src/clients/volume/sink.rs @@ -1,5 +1,6 @@ use super::{percent_to_volume, volume_to_percent, ArcMutVec, Client, ConnectionState, Event}; -use crate::{lock, send}; +use crate::channels::SyncSenderExt; +use crate::lock; use libpulse_binding::callbacks::ListResult; use libpulse_binding::context::introspect::SinkInfo; use libpulse_binding::context::subscribe::Operation; @@ -59,7 +60,7 @@ impl Client { let ListResult::Item(info) = info else { return; }; - send!(tx, info.volume); + tx.send_expect(info.volume); }); let new_volume = percent_to_volume(volume_percent); @@ -123,7 +124,7 @@ pub fn add(info: ListResult<&SinkInfo>, sinks: &ArcMutVec, tx: &broadcast: }; lock!(sinks).push(info.into()); - send!(tx, Event::AddSink(info.into())); + tx.send_expect(Event::AddSink(info.into())); } fn update( @@ -162,7 +163,7 @@ fn update( } } - send!(tx, Event::UpdateSink(sink)); + tx.send_expect(Event::UpdateSink(sink)); } fn remove(index: u32, sinks: &ArcMutVec, tx: &broadcast::Sender) { @@ -170,6 +171,6 @@ fn remove(index: u32, sinks: &ArcMutVec, tx: &broadcast::Sender) { if let Some(pos) = sinks.iter().position(|s| s.index == index) { let info = sinks.remove(pos); - send!(tx, Event::RemoveSink(info.name)); + tx.send_expect(Event::RemoveSink(info.name)); } } diff --git a/src/clients/volume/sink_input.rs b/src/clients/volume/sink_input.rs index 102aed2e..553c9de1 100644 --- a/src/clients/volume/sink_input.rs +++ b/src/clients/volume/sink_input.rs @@ -1,5 +1,6 @@ use super::{percent_to_volume, volume_to_percent, ArcMutVec, Client, ConnectionState, Event}; -use crate::{lock, send}; +use crate::channels::SyncSenderExt; +use crate::lock; use libpulse_binding::callbacks::ListResult; use libpulse_binding::context::introspect::SinkInputInfo; use libpulse_binding::context::subscribe::Operation; @@ -47,7 +48,7 @@ impl Client { let ListResult::Item(info) = info else { return; }; - send!(tx, info.volume); + tx.send_expect(info.volume); }); let new_volume = percent_to_volume(volume_percent); @@ -113,7 +114,7 @@ pub fn add( }; lock!(inputs).push(info.into()); - send!(tx, Event::AddInput(info.into())); + tx.send_expect(Event::AddInput(info.into())); } fn update( @@ -135,7 +136,7 @@ fn update( inputs[pos] = info.into(); } - send!(tx, Event::UpdateInput(info.into())); + tx.send_expect(Event::UpdateInput(info.into())); } fn remove(index: u32, inputs: &ArcMutVec, tx: &broadcast::Sender) { @@ -143,6 +144,6 @@ fn remove(index: u32, inputs: &ArcMutVec, tx: &broadcast::Sender send!(output_tx, event), + Event::Output(event) => output_tx.send_expect(event), #[cfg(any(feature = "focused", feature = "launcher"))] - Event::Toplevel(event) => send!(toplevel_tx, event), + Event::Toplevel(event) => toplevel_tx.send_expect(event), #[cfg(feature = "clipboard")] - Event::Clipboard(item) => send!(clipboard_tx, item), + Event::Clipboard(item) => clipboard_tx.send_expect(item), }; } }); @@ -177,7 +178,7 @@ impl Client { /// Sends a request to the environment event loop, /// and returns the response. fn send_request(&self, request: Request) -> Response { - send!(self.tx, request); + self.tx.send_expect(request); lock!(self.rx).recv().expect(ERR_CHANNEL_RECV) } @@ -322,12 +323,12 @@ impl Environment { match event { Msg(Request::Roundtrip) => { debug!("received roundtrip request"); - send!(env.response_tx, Response::Ok); + env.response_tx.send_expect(Response::Ok); } #[cfg(feature = "ipc")] Msg(Request::OutputInfoAll) => { let infos = env.output_info_all(); - send!(env.response_tx, Response::OutputInfoAll(infos)); + env.response_tx.send_expect(Response::OutputInfoAll(infos)); } #[cfg(any(feature = "focused", feature = "launcher"))] Msg(Request::ToplevelInfoAll) => { @@ -336,7 +337,8 @@ impl Environment { .iter() .filter_map(ToplevelHandle::info) .collect(); - send!(env.response_tx, Response::ToplevelInfoAll(infos)); + env.response_tx + .send_expect(Response::ToplevelInfoAll(infos)); } #[cfg(feature = "launcher")] Msg(Request::ToplevelFocus(id)) => { @@ -350,7 +352,7 @@ impl Environment { handle.focus(&seat); } - send!(env.response_tx, Response::Ok); + env.response_tx.send_expect(Response::Ok); } #[cfg(feature = "launcher")] Msg(Request::ToplevelMinimize(id)) => { @@ -363,17 +365,17 @@ impl Environment { handle.minimize(); } - send!(env.response_tx, Response::Ok); + env.response_tx.send_expect(Response::Ok); } #[cfg(feature = "clipboard")] Msg(Request::CopyToClipboard(item)) => { env.copy_to_clipboard(item); - send!(env.response_tx, Response::Ok); + env.response_tx.send_expect(Response::Ok); } #[cfg(feature = "clipboard")] Msg(Request::ClipboardItem) => { let item = lock!(env.clipboard).clone(); - send!(env.response_tx, Response::ClipboardItem(item)); + env.response_tx.send_expect(Response::ClipboardItem(item)); } calloop_channel::Event::Closed => error!("request channel unexpectedly closed"), } diff --git a/src/clients/wayland/wl_output.rs b/src/clients/wayland/wl_output.rs index da3b9b12..602978a3 100644 --- a/src/clients/wayland/wl_output.rs +++ b/src/clients/wayland/wl_output.rs @@ -1,5 +1,5 @@ use super::{Client, Environment, Event}; -use crate::try_send; +use crate::channels::AsyncSenderExt; use smithay_client_toolkit::output::{OutputHandler, OutputInfo, OutputState}; use tokio::sync::broadcast; use tracing::{debug, error}; @@ -63,13 +63,10 @@ impl OutputHandler for Environment { fn new_output(&mut self, _conn: &Connection, _qh: &QueueHandle, output: WlOutput) { debug!("Handler received new output"); if let Some(info) = self.output_state.info(&output) { - try_send!( - self.event_tx, - Event::Output(OutputEvent { - output: info, - event_type: OutputEventType::New - }) - ); + self.event_tx.send_spawn(Event::Output(OutputEvent { + output: info, + event_type: OutputEventType::New, + })); } else { error!("Output is missing information!"); } @@ -78,13 +75,10 @@ impl OutputHandler for Environment { fn update_output(&mut self, _conn: &Connection, _qh: &QueueHandle, output: WlOutput) { debug!("Handle received output update"); if let Some(info) = self.output_state.info(&output) { - try_send!( - self.event_tx, - Event::Output(OutputEvent { - output: info, - event_type: OutputEventType::Update - }) - ); + self.event_tx.send_spawn(Event::Output(OutputEvent { + output: info, + event_type: OutputEventType::Update, + })); } else { error!("Output is missing information!"); } @@ -93,13 +87,10 @@ impl OutputHandler for Environment { fn output_destroyed(&mut self, _conn: &Connection, _qh: &QueueHandle, output: WlOutput) { debug!("Handle received output destruction"); if let Some(info) = self.output_state.info(&output) { - try_send!( - self.event_tx, - Event::Output(OutputEvent { - output: info, - event_type: OutputEventType::Destroyed - }) - ); + self.event_tx.send_spawn(Event::Output(OutputEvent { + output: info, + event_type: OutputEventType::Destroyed, + })); } else { error!("Output is missing information!"); } diff --git a/src/clients/wayland/wlr_data_control/mod.rs b/src/clients/wayland/wlr_data_control/mod.rs index dbe5f8e9..faa749d5 100644 --- a/src/clients/wayland/wlr_data_control/mod.rs +++ b/src/clients/wayland/wlr_data_control/mod.rs @@ -7,7 +7,8 @@ use self::device::{DataControlDeviceDataExt, DataControlDeviceHandler}; use self::offer::{DataControlDeviceOffer, DataControlOfferHandler, SelectionOffer}; use self::source::DataControlSourceHandler; use super::{Client, Environment, Event, Request, Response}; -use crate::{lock, try_send, Ironbar}; +use crate::channels::AsyncSenderExt; +use crate::{lock, Ironbar}; use device::DataControlDevice; use glib::Bytes; use nix::fcntl::{fcntl, F_GETPIPE_SZ, F_SETPIPE_SZ}; @@ -226,14 +227,11 @@ impl DataControlDeviceHandler for Environment { let Some(mime_type) = MimeType::parse_multiple(&mime_types) else { lock!(self.clipboard).take(); // send an event so the clipboard module is aware it's changed - try_send!( - self.event_tx, - Event::Clipboard(ClipboardItem { - id: usize::MAX, - mime_type: String::new().into(), - value: Arc::new(ClipboardValue::Other) - }) - ); + self.event_tx.send_spawn(Event::Clipboard(ClipboardItem { + id: usize::MAX, + mime_type: String::new().into(), + value: Arc::new(ClipboardValue::Other), + })); return; }; @@ -258,7 +256,8 @@ impl DataControlDeviceHandler for Environment { match Self::read_file(&mime_type, file.get_mut()) { Ok(item) => { lock!(clipboard).replace(item.clone()); - try_send!(tx, Event::Clipboard(item)); + + tx.send_spawn(Event::Clipboard(item)); } Err(err) => error!("{err:?}"), } diff --git a/src/clients/wayland/wlr_foreign_toplevel/mod.rs b/src/clients/wayland/wlr_foreign_toplevel/mod.rs index c9baf0fe..cc0eb125 100644 --- a/src/clients/wayland/wlr_foreign_toplevel/mod.rs +++ b/src/clients/wayland/wlr_foreign_toplevel/mod.rs @@ -4,11 +4,11 @@ pub mod manager; use self::handle::ToplevelHandleHandler; use self::manager::ToplevelManagerHandler; use super::{Client, Environment, Event, Request, Response}; -use crate::try_send; use tokio::sync::broadcast; use tracing::{debug, error, trace}; use wayland_client::{Connection, QueueHandle}; +use crate::channels::AsyncSenderExt; pub use handle::{ToplevelHandle, ToplevelInfo}; #[derive(Debug, Clone)] @@ -71,7 +71,8 @@ impl ToplevelHandleHandler for Environment { trace!("Adding new handle: {info:?}"); self.handles.push(handle.clone()); if let Some(info) = handle.info() { - try_send!(self.event_tx, Event::Toplevel(ToplevelEvent::New(info))); + self.event_tx + .send_spawn(Event::Toplevel(ToplevelEvent::New(info))); } } None => { @@ -92,7 +93,8 @@ impl ToplevelHandleHandler for Environment { Some(info) => { trace!("Updating handle: {info:?}"); if let Some(info) = handle.info() { - try_send!(self.event_tx, Event::Toplevel(ToplevelEvent::Update(info))); + self.event_tx + .send_spawn(Event::Toplevel(ToplevelEvent::Update(info))); } } None => { @@ -111,7 +113,8 @@ impl ToplevelHandleHandler for Environment { self.handles.retain(|h| h != &handle); if let Some(info) = handle.info() { - try_send!(self.event_tx, Event::Toplevel(ToplevelEvent::Remove(info))); + self.event_tx + .send_spawn(Event::Toplevel(ToplevelEvent::Remove(info))); } } } diff --git a/src/dynamic_value/dynamic_bool.rs b/src/dynamic_value/dynamic_bool.rs index c2162fc2..306d5c21 100644 --- a/src/dynamic_value/dynamic_bool.rs +++ b/src/dynamic_value/dynamic_bool.rs @@ -1,7 +1,8 @@ +use crate::channels::{AsyncSenderExt, MpscReceiverExt}; use crate::script::Script; -use crate::{glib_recv_mpsc, spawn, try_send}; +use crate::spawn; #[cfg(feature = "ipc")] -use crate::{send_async, Ironbar}; +use crate::Ironbar; use cfg_if::cfg_if; use serde::Deserialize; use tokio::sync::mpsc; @@ -18,7 +19,7 @@ pub enum DynamicBool { } impl DynamicBool { - pub fn subscribe(self, mut f: F) + pub fn subscribe(self, f: F) where F: FnMut(bool) + 'static, { @@ -41,15 +42,14 @@ impl DynamicBool { }; let (tx, rx) = mpsc::channel(32); - - glib_recv_mpsc!(rx, val => f(val)); + rx.recv_glib(f); spawn(async move { match value { DynamicBool::Script(script) => { script .run(None, |_, success| { - try_send!(tx, success); + tx.send_spawn(success); }) .await; } @@ -62,7 +62,7 @@ impl DynamicBool { while let Ok(value) = rx.recv().await { let has_value = value.is_some_and(|s| is_truthy(&s)); - send_async!(tx, has_value); + tx.send_expect(has_value).await; } } DynamicBool::Unknown(_) => unreachable!(), diff --git a/src/dynamic_value/dynamic_string.rs b/src/dynamic_value/dynamic_string.rs index 7c0dbdb9..fbdb57e2 100644 --- a/src/dynamic_value/dynamic_string.rs +++ b/src/dynamic_value/dynamic_string.rs @@ -1,7 +1,8 @@ +use crate::channels::{AsyncSenderExt, MpscReceiverExt}; use crate::script::{OutputStream, Script}; #[cfg(feature = "ipc")] use crate::Ironbar; -use crate::{arc_mut, glib_recv_mpsc, lock, spawn, try_send}; +use crate::{arc_mut, lock, spawn}; use tokio::sync::mpsc; /// A segment of a dynamic string, @@ -25,7 +26,7 @@ enum DynamicStringSegment { /// label.set_label_escaped(&string); /// }); /// ``` -pub fn dynamic_string(input: &str, mut f: F) +pub fn dynamic_string(input: &str, f: F) where F: FnMut(String) + 'static, { @@ -55,7 +56,7 @@ where let _: String = std::mem::replace(&mut label_parts[i], out); let string = label_parts.join(""); - try_send!(tx, string); + tx.send_spawn(string); } }) .await; @@ -80,7 +81,7 @@ where let _: String = std::mem::replace(&mut label_parts[i], value); let string = label_parts.join(""); - try_send!(tx, string); + tx.send_spawn(string); } } }); @@ -88,12 +89,12 @@ where } } - glib_recv_mpsc!(rx , val => f(val)); + rx.recv_glib(f); // initialize if is_static { let label_parts = lock!(label_parts).join(""); - try_send!(tx, label_parts); + tx.send_spawn(label_parts); } } diff --git a/src/image/provider.rs b/src/image/provider.rs index 5004c37f..7b46e351 100644 --- a/src/image/provider.rs +++ b/src/image/provider.rs @@ -1,6 +1,7 @@ +use crate::channels::{AsyncSenderExt, MpscReceiverExt}; use crate::desktop_file::get_desktop_icon_name; #[cfg(feature = "http")] -use crate::{glib_recv_mpsc, send_async, spawn}; +use crate::spawn; use cfg_if::cfg_if; use color_eyre::{Help, Report, Result}; use gtk::cairo::Surface; @@ -151,14 +152,14 @@ impl<'a> ImageProvider<'a> { spawn(async move { let bytes = Self::get_bytes_from_http(url).await; if let Ok(bytes) = bytes { - send_async!(tx, bytes); + tx.send_expect(bytes).await; } }); { let size = self.size; let image = image.clone(); - glib_recv_mpsc!(rx, bytes => { + rx.recv_glib(move |bytes| { let stream = MemoryInputStream::from_bytes(&bytes); let scale = image.scale_factor(); @@ -173,8 +174,7 @@ impl<'a> ImageProvider<'a> { ); // Different error types makes this a bit awkward - match pixbuf.map(|pixbuf| Self::create_and_load_surface(&pixbuf, &image)) - { + match pixbuf.map(|pixbuf| Self::create_and_load_surface(&pixbuf, &image)) { Ok(Err(err)) => error!("{err:?}"), Err(err) => error!("{err:?}"), _ => {} diff --git a/src/ipc/server/mod.rs b/src/ipc/server/mod.rs index 463e8faa..546d25e9 100644 --- a/src/ipc/server/mod.rs +++ b/src/ipc/server/mod.rs @@ -13,11 +13,11 @@ use tokio::net::{UnixListener, UnixStream}; use tokio::sync::mpsc::{self, Receiver, Sender}; use tracing::{debug, error, info, warn}; +use super::Ipc; +use crate::channels::{AsyncSenderExt, MpscReceiverExt}; use crate::ipc::{Command, Response}; use crate::style::load_css; -use crate::{glib_recv_mpsc, send_async, spawn, try_send, Ironbar}; - -use super::Ipc; +use crate::{spawn, Ironbar}; impl Ipc { /// Starts the IPC server on its socket. @@ -66,9 +66,9 @@ impl Ipc { }); let application = application.clone(); - glib_recv_mpsc!(cmd_rx, command => { + cmd_rx.recv_glib(move |command| { let res = Self::handle_command(command, &application, &ironbar); - try_send!(res_tx, res); + res_tx.send_spawn(res); }); } @@ -91,7 +91,7 @@ impl Ipc { debug!("Received command: {command:?}"); - send_async!(cmd_tx, command); + cmd_tx.send_expect(command).await; let res = res_rx .recv() .await diff --git a/src/ironvar.rs b/src/ironvar.rs index 190fea20..f0867cbd 100644 --- a/src/ironvar.rs +++ b/src/ironvar.rs @@ -1,6 +1,6 @@ #![doc = include_str!("../docs/Ironvars.md")] -use crate::send; +use crate::channels::SyncSenderExt; use color_eyre::{Report, Result}; use std::collections::HashMap; use tokio::sync::broadcast; @@ -94,14 +94,14 @@ impl IronVar { /// The change is broadcast to all receivers. fn set(&mut self, value: Option) { self.value.clone_from(&value); - send!(self.tx, value); + self.tx.send_expect(value); } /// Subscribes to the variable. /// The latest value is immediately sent to all receivers. fn subscribe(&self) -> broadcast::Receiver> { let rx = self.tx.subscribe(); - send!(self.tx, self.value.clone()); + self.tx.send_expect(self.value.clone()); rx } } diff --git a/src/macros.rs b/src/macros.rs index 2fe924f0..c2a9007f 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -26,137 +26,6 @@ macro_rules! module_impl { }; } -/// Sends a message on an asynchronous `Sender` using `send()` -/// Panics if the message cannot be sent. -/// -/// # Usage: -/// -/// ```rs -/// send_async!(tx, "my message"); -/// ``` -#[macro_export] -macro_rules! send_async { - ($tx:expr, $msg:expr) => { - $tx.send($msg).await.expect($crate::error::ERR_CHANNEL_SEND) - }; -} - -/// Sends a message on a synchronous `Sender` using `send()` -/// Panics if the message cannot be sent. -/// -/// # Usage: -/// -/// ```rs -/// send!(tx, "my message"); -/// ``` -#[macro_export] -macro_rules! send { - ($tx:expr, $msg:expr) => { - $tx.send($msg).expect($crate::error::ERR_CHANNEL_SEND) - }; -} - -/// Sends a message on a synchronous `Sender` using `try_send()` -/// Panics if the message cannot be sent. -/// -/// # Usage: -/// -/// ```rs -/// try_send!(tx, "my message"); -/// ``` -#[macro_export] -macro_rules! try_send { - ($tx:expr, $msg:expr) => { - $tx.try_send($msg).expect($crate::error::ERR_CHANNEL_SEND) - }; -} - -/// Sends a message, wrapped inside a `ModuleUpdateEvent::Update` variant, -/// on an asynchronous `Sender` using `send()`. -/// -/// This is a convenience wrapper around `send_async` -/// to avoid needing to write the full enum every time. -/// -/// Panics if the message cannot be sent. -/// -/// # Usage: -/// -/// ```rs -/// module_update!(tx, "my event"); -/// ``` -#[macro_export] -macro_rules! module_update { - ($tx:expr, $msg:expr) => { - send_async!($tx, $crate::modules::ModuleUpdateEvent::Update($msg)) - }; -} - -/// Spawns a `GLib` future on the local thread, and calls `rx.recv()` -/// in a loop. -/// -/// This allows use of `GObjects` and futures in the same context. -/// -/// For use with receivers which return a `Result`. -/// -/// # Example -/// -/// ```rs -/// let (tx, mut rx) = broadcast::channel(32); -/// glib_recv(rx, msg => println!("{msg}")); -/// ``` -#[macro_export] -macro_rules! glib_recv { - ($rx:expr, $func:ident) => { glib_recv!($rx, ev => $func(ev)) }; - - ($rx:expr, $val:ident => $expr:expr) => {{ - glib::spawn_future_local(async move { - // re-delcare in case ie `context.subscribe()` is passed directly - let mut rx = $rx; - loop { - match rx.recv().await { - Ok($val) => $expr, - Err(tokio::sync::broadcast::error::RecvError::Lagged(count)) => { - tracing::warn!("Channel lagged behind by {count}, this may result in unexpected or broken behaviour"); - } - Err(err) => { - tracing::error!("{err:?}"); - break; - } - } - } - }); - }}; -} - -/// Spawns a `GLib` future on the local thread, and calls `rx.recv()` -/// in a loop. -/// -/// This allows use of `GObjects` and futures in the same context. -/// -/// For use with receivers which return an `Option`, -/// such as Tokio's `mpsc` channel. -/// -/// # Example -/// -/// ```rs -/// let (tx, mut rx) = broadcast::channel(32); -/// glib_recv_mpsc(rx, msg => println!("{msg}")); -/// ``` -#[macro_export] -macro_rules! glib_recv_mpsc { - ($rx:expr, $func:ident) => { glib_recv_mpsc!($rx, ev => $func(ev)) }; - - ($rx:expr, $val:ident => $expr:expr) => {{ - glib::spawn_future_local(async move { - // re-delcare in case ie `context.subscribe()` is passed directly - let mut rx = $rx; - while let Some($val) = rx.recv().await { - $expr - } - }); - }}; -} - /// Locks a `Mutex`. /// Panics if the `Mutex` cannot be locked. /// diff --git a/src/main.rs b/src/main.rs index 03e50910..b15c8d9e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,6 +27,7 @@ use tracing::{debug, error, info, warn}; use universal_config::ConfigLoader; use crate::bar::{create_bar, Bar}; +use crate::channels::SyncSenderExt; use crate::clients::wayland::OutputEventType; use crate::clients::Clients; use crate::config::{Config, MonitorConfig}; @@ -36,6 +37,7 @@ use crate::ironvar::VariableManager; use crate::style::load_css; mod bar; +mod channels; #[cfg(feature = "cli")] mod cli; mod clients; @@ -201,7 +203,7 @@ impl Ironbar { .expect("Error setting Ctrl-C handler"); let hold = app.hold(); - send!(activate_tx, hold); + activate_tx.send_expect(hold); }); { diff --git a/src/modules/cairo.rs b/src/modules/cairo.rs index b5c144ff..8095a5b9 100644 --- a/src/modules/cairo.rs +++ b/src/modules/cairo.rs @@ -1,6 +1,7 @@ +use crate::channels::{AsyncSenderExt, BroadcastReceiverExt}; use crate::config::CommonConfig; -use crate::modules::{Module, ModuleInfo, ModuleParts, ModuleUpdateEvent, WidgetContext}; -use crate::{glib_recv, module_impl, spawn, try_send}; +use crate::modules::{Module, ModuleInfo, ModuleParts, WidgetContext}; +use crate::{module_impl, spawn}; use cairo::{Format, ImageSurface}; use glib::translate::IntoGlibPtr; use glib::Propagation; @@ -87,7 +88,7 @@ impl Module for CairoModule { debug!("{event:?}"); if event.paths.first().is_some_and(|p| p == &path) { - try_send!(tx, ModuleUpdateEvent::Update(())); + tx.send_update_spawn(()); } } Err(e) => error!("Error occurred when watching stylesheet: {:?}", e), @@ -187,22 +188,20 @@ impl Module for CairoModule { } }); - glib_recv!(context.subscribe(), _ev => { + context.subscribe().recv_glib(move |_ev| { let res = fs::read_to_string(&self.path) .map(|s| s.replace("function draw", format!("function __draw_{id}").as_str())); match res { - Ok(script) => { - match lua.load(&script).exec() { - Ok(()) => {}, - Err(Error::SyntaxError { message, ..}) => { - let message = message.split_once("]:").expect("to exist").1; - error!("[lua syntax error] {}:{message}", self.path.display()); - }, - Err(err) => error!("lua error: {err:?}") + Ok(script) => match lua.load(&script).exec() { + Ok(()) => {} + Err(Error::SyntaxError { message, .. }) => { + let message = message.split_once("]:").expect("to exist").1; + error!("[lua syntax error] {}:{message}", self.path.display()); } + Err(err) => error!("lua error: {err:?}"), }, - Err(err) => error!("{err:?}") + Err(err) => error!("{err:?}"), } }); diff --git a/src/modules/clipboard.rs b/src/modules/clipboard.rs index 9dbd68c3..ce3b71f6 100644 --- a/src/modules/clipboard.rs +++ b/src/modules/clipboard.rs @@ -1,3 +1,4 @@ +use crate::channels::{AsyncSenderExt, BroadcastReceiverExt}; use crate::clients::clipboard::{self, ClipboardEvent}; use crate::clients::wayland::{ClipboardItem, ClipboardValue}; use crate::config::{CommonConfig, TruncateMode}; @@ -6,7 +7,7 @@ use crate::image::new_icon_button; use crate::modules::{ Module, ModuleInfo, ModuleParts, ModulePopup, ModuleUpdateEvent, PopupButton, WidgetContext, }; -use crate::{glib_recv, module_impl, spawn, try_send}; +use crate::{module_impl, spawn}; use glib::Propagation; use gtk::gdk_pixbuf::Pixbuf; use gtk::gio::{Cancellable, MemoryInputStream}; @@ -103,18 +104,16 @@ impl Module