Skip to content

Commit

Permalink
refactor: replace channel macros with ext trait methods
Browse files Browse the repository at this point in the history
  • Loading branch information
JakeStanger committed Dec 28, 2024
1 parent 67426bd commit 0f5036a
Show file tree
Hide file tree
Showing 48 changed files with 677 additions and 625 deletions.
153 changes: 153 additions & 0 deletions src/channels.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use crate::modules::ModuleUpdateEvent;
use crate::spawn;
use smithay_client_toolkit::reexports::calloop;
use std::fmt::Debug;
use tokio::sync::{broadcast, mpsc};

pub trait SyncSenderExt<T> {
/// Asynchronously sends a message on the channel,
/// panicking if it cannot be sent.
///
/// This should be used in cases where sending should *never* fail,
/// or where failing indicates a serious bug.
fn send_expect(&self, message: T);
}

impl<T> SyncSenderExt<T> for std::sync::mpsc::Sender<T> {
#[inline]
fn send_expect(&self, message: T) {
self.send(message).expect(crate::error::ERR_CHANNEL_SEND);
}
}

impl<T> SyncSenderExt<T> for calloop::channel::Sender<T> {
#[inline]
fn send_expect(&self, message: T) {
self.send(message).expect(crate::error::ERR_CHANNEL_SEND);
}
}

impl<T: Debug> SyncSenderExt<T> for broadcast::Sender<T> {
#[inline]
fn send_expect(&self, message: T) {
self.send(message).expect(crate::error::ERR_CHANNEL_SEND);
}
}

pub trait AsyncSenderExt<T>: Sync + Send + Sized + Clone {
/// Asynchronously sends a message on the channel,
/// panicking if it cannot be sent.
///
/// This should be used in cases where sending should *never* fail,
/// or where failing indicates a serious bug.
fn send_expect(&self, message: T) -> impl std::future::Future<Output = ()> + Send;

/// Asynchronously sends a message on the channel,
/// spawning a task to allow it to be sent in the background,
/// and panicking if it cannot be sent.
///
/// Note that this function will return *before* the message is sent.
///
/// This should be used in cases where sending should *never* fail,
/// or where failing indicates a serious bug.
#[inline]
fn send_spawn(&self, message: T)
where
Self: 'static,
T: Send + 'static,
{
let tx = self.clone();
spawn(async move { tx.send_expect(message).await });
}

/// Shorthand for [`AsyncSenderExt::send_expect`]
/// when sending a [`ModuleUpdateEvent::Update`].
#[inline]
async fn send_update<U: Clone>(&self, update: U)
where
Self: AsyncSenderExt<ModuleUpdateEvent<U>>,
{
self.send_expect(ModuleUpdateEvent::Update(update)).await;
}

/// Shorthand for [`AsyncSenderExt::send_spawn`]
/// when sending a [`ModuleUpdateEvent::Update`].
#[inline]
fn send_update_spawn<U>(&self, update: U)
where
Self: AsyncSenderExt<ModuleUpdateEvent<U>> + 'static,
U: Clone + Send + 'static,
{
self.send_spawn(ModuleUpdateEvent::Update(update));
}
}

impl<T: Send> AsyncSenderExt<T> for mpsc::Sender<T> {
#[inline]
async fn send_expect(&self, message: T) {
self.send(message)
.await
.expect(crate::error::ERR_CHANNEL_SEND);
}
}

pub trait MpscReceiverExt<T> {
/// Spawns a `GLib` future on the local thread, and calls `rx.recv()`
/// in a loop, passing the message to `f`.
///
/// This allows use of `GObjects` and futures in the same context.
fn recv_glib<F>(self, f: F)
where
F: FnMut(T) + 'static;
}

impl<T: 'static> MpscReceiverExt<T> for mpsc::Receiver<T> {
fn recv_glib<F>(mut self, mut f: F)
where
F: FnMut(T) + 'static,
{
glib::spawn_future_local(async move {
while let Some(val) = self.recv().await {
f(val);
}
});
}
}

pub trait BroadcastReceiverExt<T>
where
T: Debug + Clone + 'static,
{
/// Spawns a `GLib` future on the local thread, and calls `rx.recv()`
/// in a loop, passing the message to `f`.
///
/// This allows use of `GObjects` and futures in the same context.
fn recv_glib<F>(self, f: F)
where
F: FnMut(T) + 'static;
}

impl<T> BroadcastReceiverExt<T> for broadcast::Receiver<T>
where
T: Debug + Clone + 'static,
{
fn recv_glib<F>(mut self, mut f: F)
where
F: FnMut(T) + 'static,
{
glib::spawn_future_local(async move {
loop {
match self.recv().await {
Ok(val) => f(val),
Err(broadcast::error::RecvError::Lagged(count)) => {
tracing::warn!("Channel lagged behind by {count}, this may result in unexpected or broken behaviour");
}
Err(err) => {
tracing::error!("{err:?}");
break;
}
}
}
});
}
}
17 changes: 9 additions & 8 deletions src/clients/clipboard.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::wayland::{self, ClipboardItem};
use crate::{arc_mut, lock, register_client, spawn, try_send};
use crate::channels::AsyncSenderExt;
use crate::{arc_mut, lock, register_client, spawn};
use indexmap::map::Iter;
use indexmap::IndexMap;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -46,7 +47,7 @@ impl Client {
let senders = lock!(senders);
let iter = senders.iter();
for (tx, _) in iter {
try_send!(tx, ClipboardEvent::Add(item.clone()));
tx.send_spawn(ClipboardEvent::Add(item.clone()));
}

lock!(cache).insert(item, senders.len());
Expand Down Expand Up @@ -74,16 +75,16 @@ impl Client {
let removed_id = lock!(cache)
.remove_ref_first()
.expect("Clipboard cache unexpectedly empty");
try_send!(tx, ClipboardEvent::Remove(removed_id));
tx.send_spawn(ClipboardEvent::Remove(removed_id));
}
try_send!(tx, ClipboardEvent::Add(item.clone()));
tx.send_spawn(ClipboardEvent::Add(item.clone()));
}
},
|existing_id| {
let senders = lock!(senders);
let iter = senders.iter();
for (tx, _) in iter {
try_send!(tx, ClipboardEvent::Activate(existing_id));
tx.send_spawn(ClipboardEvent::Activate(existing_id));
}
},
);
Expand All @@ -106,7 +107,7 @@ impl Client {

let iter = cache.iter();
for (_, (item, _)) in iter {
try_send!(tx, ClipboardEvent::Add(item.clone()));
tx.send_spawn(ClipboardEvent::Add(item.clone()));
}
}

Expand All @@ -130,7 +131,7 @@ impl Client {
let senders = lock!(self.senders);
let iter = senders.iter();
for (tx, _) in iter {
try_send!(tx, ClipboardEvent::Activate(id));
tx.send_spawn(ClipboardEvent::Activate(id));
}
}

Expand All @@ -140,7 +141,7 @@ impl Client {
let senders = lock!(self.senders);
let iter = senders.iter();
for (tx, _) in iter {
try_send!(tx, ClipboardEvent::Remove(id));
tx.send_spawn(ClipboardEvent::Remove(id));
}
}
}
Expand Down
55 changes: 22 additions & 33 deletions src/clients/compositor/hyprland.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{Visibility, Workspace, WorkspaceClient, WorkspaceUpdate};
use crate::{arc_mut, lock, send, spawn_blocking};
use crate::channels::SyncSenderExt;
use crate::{arc_mut, lock, spawn_blocking};
use color_eyre::Result;
use hyprland::data::{Workspace as HWorkspace, Workspaces};
use hyprland::dispatch::{Dispatch, DispatchType, WorkspaceIdentifierWithSpecial};
Expand Down Expand Up @@ -58,7 +59,7 @@ impl Client {
let workspace = Self::get_workspace(&workspace_name, prev_workspace.as_ref());

if let Some(workspace) = workspace {
send!(tx, WorkspaceUpdate::Add(workspace));
tx.send_expect(WorkspaceUpdate::Add(workspace));
}
});
}
Expand Down Expand Up @@ -139,7 +140,7 @@ impl Client {
let workspace = Self::get_workspace(&workspace_name, prev_workspace.as_ref());

if let Some(workspace) = workspace {
send!(tx, WorkspaceUpdate::Move(workspace.clone()));
tx.send_expect(WorkspaceUpdate::Move(workspace.clone()));

if !workspace.visibility.is_focused() {
Self::send_focus_change(&mut prev_workspace, workspace, &tx);
Expand All @@ -155,13 +156,10 @@ impl Client {
event_listener.add_workspace_rename_handler(move |data| {
let _lock = lock!(lock);

send!(
tx,
WorkspaceUpdate::Rename {
id: data.workspace_id as i64,
name: data.workspace_name
}
);
tx.send_expect(WorkspaceUpdate::Rename {
id: data.workspace_id as i64,
name: data.workspace_name,
});
});
}

Expand All @@ -172,7 +170,7 @@ impl Client {
event_listener.add_workspace_destroy_handler(move |data| {
let _lock = lock!(lock);
debug!("Received workspace destroy: {data:?}");
send!(tx, WorkspaceUpdate::Remove(data.workspace_id as i64));
tx.send_expect(WorkspaceUpdate::Remove(data.workspace_id as i64));
});
}

Expand All @@ -193,13 +191,10 @@ impl Client {
error!("Unable to locate client");
},
|c| {
send!(
tx,
WorkspaceUpdate::Urgent {
id: c.workspace.id as i64,
urgent: true,
}
);
tx.send_expect(WorkspaceUpdate::Urgent {
id: c.workspace.id as i64,
urgent: true,
});
},
);
});
Expand All @@ -218,21 +213,15 @@ impl Client {
workspace: Workspace,
tx: &Sender<WorkspaceUpdate>,
) {
send!(
tx,
WorkspaceUpdate::Focus {
old: prev_workspace.take(),
new: workspace.clone(),
}
);
tx.send_expect(WorkspaceUpdate::Focus {
old: prev_workspace.take(),
new: workspace.clone(),
});

send!(
tx,
WorkspaceUpdate::Urgent {
id: workspace.id,
urgent: false,
}
);
tx.send_expect(WorkspaceUpdate::Urgent {
id: workspace.id,
urgent: false,
});

prev_workspace.replace(workspace);
}
Expand Down Expand Up @@ -292,7 +281,7 @@ impl WorkspaceClient for Client {
})
.collect();

send!(tx, WorkspaceUpdate::Init(workspaces));
tx.send_expect(WorkspaceUpdate::Init(workspaces));
}

rx
Expand Down
10 changes: 5 additions & 5 deletions src/clients/compositor/sway.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use super::{Visibility, Workspace, WorkspaceClient, WorkspaceUpdate};
use crate::{await_sync, send};
use crate::await_sync;
use crate::channels::SyncSenderExt;
use crate::clients::sway::Client;
use color_eyre::Result;
use swayipc_async::{Node, WorkspaceChange, WorkspaceEvent};
use tokio::sync::broadcast::{channel, Receiver};

use crate::clients::sway::Client;

impl WorkspaceClient for Client {
fn focus(&self, id: String) -> Result<()> {
await_sync(async move {
Expand All @@ -27,13 +27,13 @@ impl WorkspaceClient for Client {
let event =
WorkspaceUpdate::Init(workspaces.into_iter().map(Workspace::from).collect());

send!(tx, event);
tx.send_expect(event);

drop(client);

self.add_listener::<swayipc_async::WorkspaceEvent>(move |event| {
let update = WorkspaceUpdate::from(event.clone());
send!(tx, update);
tx.send_expect(update);
})
.await
.expect("to add listener");
Expand Down
7 changes: 4 additions & 3 deletions src/clients/libinput.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{arc_rw, read_lock, send, spawn, spawn_blocking, write_lock};
use crate::channels::SyncSenderExt;
use crate::{arc_rw, read_lock, spawn, spawn_blocking, write_lock};
use color_eyre::{Report, Result};
use evdev_rs::enums::{int_to_ev_key, EventCode, EV_KEY, EV_LED};
use evdev_rs::DeviceWrapper;
Expand Down Expand Up @@ -182,7 +183,7 @@ impl Client {
let data = KeyData { device_path, key };

if let Ok(event) = data.try_into() {
send!(tx, event);
tx.send_expect(event);
}
});
}
Expand Down Expand Up @@ -210,7 +211,7 @@ impl Client {
if caps_event.is_ok() {
debug!("new keyboard device: {name} | {}", device_path.display());
write_lock!(self.known_devices).push(device_path.to_path_buf());
send!(self.tx, Event::Device);
self.tx.send_expect(Event::Device);
}
}
}
Expand Down
Loading

0 comments on commit 0f5036a

Please sign in to comment.