From 0fe40e35162547955971422d5ab92864c369d22b Mon Sep 17 00:00:00 2001 From: Serhij S Date: Fri, 26 Jul 2024 01:48:06 +0200 Subject: [PATCH] rtsc 0.3 support --- .github/workflows/ci.yml | 8 ++++++-- Cargo.toml | 10 +++++++++- README.md | 14 ++++++++++++++ src/lib.rs | 9 +++++++++ src/server.rs | 8 ++++---- 5 files changed, 42 insertions(+), 7 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7363b0d..fcd771a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,8 +17,12 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - name: cargo test - run: cargo test --all-features --all-targets + - name: cargo test default + run: cargo test --all-targets + - name: cargo test locking-rt + run: cargo test --no-default-features --all-features -F locking-rt + - name: cargo test locking-rt-safe + run: cargo test --no-default-features --all-features -F locking-rt-safe fmt: runs-on: ubuntu-latest steps: diff --git a/Cargo.toml b/Cargo.toml index 6f3cd05..45bd24b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,12 +23,20 @@ tracing = "0.1.40" serde = { version = "1.0.203", features = ["derive"] } once_cell = "1.19.0" tokio = { version = "1.38.0", features = ["net", "io-util", "time"], optional = true } -rtsc = "0.2" +rtsc = { path = "/opt/rtsc" } +parking_lot = { version = "0.12.3", optional = true } +parking_lot_rt = { version = "0.12.1", optional = true } [features] async = ["tokio"] full = ["async"] +locking-default = ["dep:parking_lot", "rtsc/parking_lot"] +locking-rt = ["dep:parking_lot_rt"] +locking-rt-safe = [] + +default = ["locking-default"] + [dev-dependencies] imageproc = "0.22" image = "0.23" diff --git a/README.md b/README.md index 58b83bb..04cb5b3 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,20 @@ overhead for an embedded application it is included into: RVideo streams can be received with clients provided by crate. For ready-to-use UI, see the [`rvideo-view`](https://crates.io/crates/rvideo-view) crate. +## Locking safety + +By default, the server uses [parking_lot](https://crates.io/crates/parking_lot) +for locking. For real-time applications, the following features are available: + +* `locking-rt` - use [parking_lot_rt](https://crates.io/crates/parking_lot_rt) + crate which is a spin-free fork of parking_lot. + +* `locking-rt-safe` - use [rtsc](https://crates.io/crates/rtsc) + priority-inheritance locking, which is not affected by priority inversion + (Linux only). + +Note: to switch locking policy, disable the crate default features. + ## About RVideo is a part of [RoboPLC](https://www.roboplc.com/) project. diff --git a/src/lib.rs b/src/lib.rs index 9bb8819..65ed9e1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,6 +18,15 @@ pub use server::Server; use server::StreamServerInner; use std::net::ToSocketAddrs; +#[cfg(feature = "locking-default")] +use parking_lot::{Condvar, Mutex, RawMutex}; + +#[cfg(feature = "locking-rt")] +use parking_lot_rt::{Condvar, Mutex, RawMutex}; + +#[cfg(feature = "locking-rt-safe")] +use rtsc::pi::{Condvar, Mutex, RawMutex}; + const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); static DEFAULT_SERVER: Lazy = Lazy::new(|| Server::new(DEFAULT_TIMEOUT)); diff --git a/src/server.rs b/src/server.rs index 71be878..1680e20 100644 --- a/src/server.rs +++ b/src/server.rs @@ -8,7 +8,6 @@ use std::{ }; use binrw::{BinRead, BinWrite}; -use rtsc::locking::Mutex; use rtsc::{cell::DataCell, semaphore::Semaphore}; use tracing::{error, trace}; @@ -16,7 +15,7 @@ const DEFAULT_MAX_CLIENTS: usize = 16; use crate::{Error, Format, Frame, Greetings, Stream, StreamInfo, StreamSelect, API_VERSION}; -type FrameCell = DataCell; +type FrameCell = DataCell; struct StreamInternal { format: Format, @@ -65,7 +64,8 @@ impl Server { /// Run the server pub fn serve(&self, addr: impl ToSocketAddrs + std::fmt::Debug) -> Result<(), Error> { trace!(?addr, "starting server"); - let semaphore = Semaphore::new(self.inner.max_clients.load(atomic::Ordering::Relaxed)); + let semaphore: Semaphore = + Semaphore::new(self.inner.max_clients.load(atomic::Ordering::Relaxed)); let listener = TcpListener::bind(addr)?; while let Ok((mut socket, addr)) = listener.accept() { trace!(?addr, "new connection"); @@ -82,7 +82,7 @@ impl Server { } pub(crate) struct StreamServerInner { - streams: Mutex>, + streams: crate::Mutex>, client_id: atomic::AtomicUsize, timeout: Duration, max_clients: atomic::AtomicUsize,