diff --git a/Cargo.lock b/Cargo.lock index 6f08d7b..4f5809b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -366,6 +366,16 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "fs4" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21dabded2e32cd57ded879041205c60a4a4c4bab47bd0fd2fa8b01f30849f02b" +dependencies = [ + "rustix", + "windows-sys 0.52.0", +] + [[package]] name = "futures" version = "0.3.30" @@ -958,6 +968,7 @@ dependencies = [ "common-base", "common-runtime", "common-telemetry", + "fs4", "futures", "redis-protocol", "roxy", diff --git a/Cargo.toml b/Cargo.toml index 2097a33..400cb4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ resolver = "2" binrw = "0.13" bytes = { version = "1.6", features = ["serde"] } derive_builder = "0.20" +fs4 = { version = "0.8", features = ["sync"] } futures = { version = "0.3", features = ["std"] } once_cell = "1.19" parking_lot = "0.12" diff --git a/src/roxy/src/storage.rs b/src/roxy/src/storage.rs index 6ce21f0..e9f466f 100644 --- a/src/roxy/src/storage.rs +++ b/src/roxy/src/storage.rs @@ -55,6 +55,12 @@ pub struct StorageConfig { rocksdb: RocksDBConfig, } +impl StorageConfig { + pub fn dbpath(&self) -> &str { + &self.dbpath + } +} + #[derive( Debug, Clone, Copy, Default, PartialEq, Eq, VariantArray, strum::IntoStaticStr, strum::Display, )] diff --git a/src/server/Cargo.toml b/src/server/Cargo.toml index fe0a96a..5ee726e 100644 --- a/src/server/Cargo.toml +++ b/src/server/Cargo.toml @@ -11,6 +11,7 @@ path = "src/rudeus.rs" [dependencies] bytes.workspace = true +fs4.workspace = true futures.workspace = true redis-protocol.workspace = true roxy.workspace = true diff --git a/src/server/src/error.rs b/src/server/src/error.rs index 7758d9b..aaa0abe 100644 --- a/src/server/src/error.rs +++ b/src/server/src/error.rs @@ -21,6 +21,18 @@ pub enum Error { UnknownCommand { cmd: String }, #[snafu(transparent)] ExecuteError { source: commands::error::Error }, + #[snafu(display("Failed to bind to address '{}': {}", bind, source))] + BindError { + bind: String, + source: std::io::Error, + }, + #[snafu(display("Another instance is already Instaunning"))] + InstanceAlreadyExists { source: std::io::Error }, + #[snafu(display("Failed to acquire process file lock: '{}'", path))] + AcquireFileLock { + source: std::io::Error, + path: String, + }, } pub type Result = std::result::Result; diff --git a/src/server/src/lib.rs b/src/server/src/lib.rs index 2bebe67..1903a4b 100644 --- a/src/server/src/lib.rs +++ b/src/server/src/lib.rs @@ -16,4 +16,5 @@ pub mod connection; pub mod error; +pub mod rudeus_lock; pub mod server; diff --git a/src/server/src/rudeus.rs b/src/server/src/rudeus.rs index ef46eb1..9f5c009 100644 --- a/src/server/src/rudeus.rs +++ b/src/server/src/rudeus.rs @@ -19,6 +19,7 @@ use common_runtime::global_runtime::{block_on_network, init_global_runtimes}; use common_telemetry::log::{self, LoggingOption}; use roxy::storage::{Storage, StorageConfig}; use serde::{Deserialize, Serialize}; +use server::rudeus_lock::try_lock_rudeus; use server::server::{Server, ServerConfig}; #[derive(Debug, Deserialize, Serialize)] @@ -35,11 +36,12 @@ fn main() -> Result<(), Box> { let config: RudeusConfig = toml::from_str(&config_str)?; let _log_workers = log::init(&config.logging); + let _rudeus_lock = try_lock_rudeus(config.storage.dbpath())?; let mut storage = Storage::try_new(config.storage)?; storage.open(roxy::storage::OpenMode::Default)?; let server = Server::new(Arc::new(storage), config.server); - block_on_network(server.start()); + block_on_network(server.start())?; Ok(()) } diff --git a/src/server/src/rudeus_lock.rs b/src/server/src/rudeus_lock.rs new file mode 100644 index 0000000..51da211 --- /dev/null +++ b/src/server/src/rudeus_lock.rs @@ -0,0 +1,54 @@ +// Copyright 2024 Rudeus Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Rudeus lock is a simple file lock implementation for ensuring only one instance of a program is running. + +use std::fs::{File, OpenOptions}; + +use fs4::FileExt as _; +use snafu::ResultExt as _; + +use crate::error::{AcquireFileLockSnafu, InstanceAlreadyExistsSnafu, Result}; + +pub struct RudeusLock { + file: File, +} + +impl RudeusLock { + fn new(file: File) -> Result { + file.try_lock_exclusive() + .context(InstanceAlreadyExistsSnafu)?; + Ok(Self { file }) + } +} + +impl Drop for RudeusLock { + fn drop(&mut self) { + self.file.unlock().unwrap(); + } +} + +pub fn try_lock_rudeus(storage_path: &str) -> Result { + let path = std::path::Path::new(storage_path).join("rudeus.lock"); + let file = OpenOptions::new() + .write(true) + .read(true) + .create(true) + .truncate(true) + .open(&path) + .context(AcquireFileLockSnafu { + path: path.display().to_string(), + })?; + RudeusLock::new(file) +} diff --git a/src/server/src/server.rs b/src/server/src/server.rs index 90dce14..3a2e7cb 100644 --- a/src/server/src/server.rs +++ b/src/server/src/server.rs @@ -1,9 +1,11 @@ use common_telemetry::log::info; use roxy::storage::{Storage, StorageRef}; use serde::{Deserialize, Serialize}; +use snafu::ResultExt; use tokio::net::TcpListener; use crate::connection::Connection; +use crate::error::{BindSnafu, Result}; #[derive(Debug, Serialize, Deserialize)] pub struct ServerConfig { @@ -27,10 +29,12 @@ impl Server { &self.storage } - pub async fn start(&self) { + pub async fn start(&self) -> Result<()> { // static self here is to make sure that the server is alive for the lifetime of the program info!("Rudeus listening on: {}", self.bind); - let listener = TcpListener::bind(&self.bind).await.unwrap(); + let listener = TcpListener::bind(&self.bind).await.context(BindSnafu { + bind: self.bind.clone(), + })?; loop { let stream = listener.accept().await.map(|(socket, _)| socket).unwrap(); let storage = self.storage.clone();