diff --git a/crates/daemon/src/connections.rs b/crates/daemon/src/connections.rs index d7a50923..285d4dc4 100644 --- a/crates/daemon/src/connections.rs +++ b/crates/daemon/src/connections.rs @@ -1,400 +1,64 @@ -use anyhow::bail; -use itertools::Itertools; +use std::time::{Duration, SystemTime}; + +use uuid::Uuid; + use moor_kernel::tasks::sessions::SessionError; use moor_values::var::objid::Objid; use rpc_common::RpcRequestError; -use serde_json::{json, Value}; -use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicI64, Ordering}; -use std::sync::RwLock; -use std::time::{Duration, Instant, SystemTime}; -use tokio::fs::File; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tracing::{error, info, warn}; -use uuid::Uuid; - -const CONNECTION_TIMEOUT_DURATION: Duration = Duration::from_secs(30); -#[derive(Debug, Clone, Eq, PartialEq)] -pub(crate) struct ConnectionRecord { - pub(crate) client_id: Uuid, - pub(crate) player: Objid, - pub(crate) name: String, - pub(crate) last_activity: SystemTime, - pub(crate) connect_time: SystemTime, - pub(crate) last_ping: Instant, -} - -/// A database for tracking the client connections, persistently between restarts - -pub struct Connections { - connections_file: PathBuf, - connections_list: RwLock, - next_connection_id: AtomicI64, -} - -#[derive(Clone, Debug)] -struct ConnectionList { - client_connections: HashMap, - connections_client: HashMap>, -} - -impl ConnectionList { - /// Sync out the list of connections to a (human readable) file which we can restore from - /// at restart. - async fn sync(&self, connections_file: &Path) -> Result<(), anyhow::Error> { - // Both hashtables can be reconstituded from the list of connection records, so we only need - // to write that out. - let mut file = File::create(connections_file).await?; - let mut connections = vec![]; - for (_, records) in self.connections_client.iter() { - for record in records { - let connect_time = record - .connect_time - .duration_since(SystemTime::UNIX_EPOCH)? - .as_secs_f64(); - let last_activity = record - .last_activity - .duration_since(SystemTime::UNIX_EPOCH)? - .as_secs_f64(); - let entry = json!({ - "client_id": record.client_id.to_string(), - "player": record.player.to_literal(), - "name": record.name, - "connect_time": connect_time, - "last_activity": last_activity, - }); - connections.push(entry); - } - } - let json_str = serde_json::to_string_pretty(&connections)?; - file.write_all(json_str.as_bytes()).await?; - Ok(()) - } - - async fn from_file(connections_file: &Path) -> Result { - // Reconstitute the list of connections from the file. - let mut file = File::open(connections_file).await?; - let mut contents = vec![]; - file.read_to_end(&mut contents).await?; - let connections: Vec = serde_json::from_slice(&contents)?; - let mut client_connections = HashMap::new(); - let mut connections_client = HashMap::new(); - for record in connections { - let client_id = record - .get("client_id") - .ok_or_else(|| anyhow::anyhow!("Missing client_id"))?; - let client_id = Uuid::parse_str(client_id.as_str().unwrap())?; - let player = record - .get("player") - .ok_or_else(|| anyhow::anyhow!("Missing player"))? - .as_str() - .ok_or_else(|| anyhow::anyhow!("Invalid player value"))?; - // Object id: #1234 is the format, and we need the 1234 part. 
- let Some(player_oid) = player.strip_prefix('#') else { - bail!("Invalid player value"); - }; - let player = Objid(player_oid.parse()?); +pub const CONNECTION_TIMEOUT_DURATION: Duration = Duration::from_secs(30); - let name = record - .get("name") - .ok_or_else(|| anyhow::anyhow!("Missing name"))? - .as_str() - .ok_or_else(|| anyhow::anyhow!("Invalid name value"))?; - - let connect_time_str = record - .get("connect_time") - .ok_or_else(|| anyhow::anyhow!("Missing connect_time"))?; - let connect_time_since_epoch = connect_time_str.as_f64().ok_or_else(|| { - anyhow::anyhow!("Invalid connect_time value ({})", connect_time_str) - })?; - let connect_time = SystemTime::UNIX_EPOCH - .checked_add(std::time::Duration::from_secs_f64(connect_time_since_epoch)) - .ok_or_else(|| { - anyhow::anyhow!("Invalid connect_time value ({})", connect_time_since_epoch) - })?; - - let last_activity_str = record - .get("last_activity") - .ok_or_else(|| anyhow::anyhow!("Missing last_activity"))?; - let last_activity_since_epoch = last_activity_str.as_f64().ok_or_else(|| { - anyhow::anyhow!("Invalid last_activity value: {:?}", last_activity_str) - })?; - let last_activity = SystemTime::UNIX_EPOCH - .checked_add(std::time::Duration::from_secs_f64( - last_activity_since_epoch, - )) - .ok_or_else(|| anyhow::anyhow!("Invalid last_activity value"))?; - - let cr = ConnectionRecord { - client_id, - player, - name: name.to_string(), - last_activity, - connect_time, - last_ping: Instant::now(), - }; - - client_connections.insert(cr.client_id, cr.player); - match connections_client.get_mut(&cr.player) { - None => { - connections_client.insert(cr.player, vec![cr]); - } - Some(ref mut crs) => { - crs.push(cr); - } - } - } - - Ok(Self { - client_connections, - connections_client, - }) - } -} - -impl Connections { - pub(crate) async fn new(connections_file: PathBuf) -> Self { - // Attempt reconstitute from file, and if that file doesn't exist, create a new one. - let connections_list = match ConnectionList::from_file(&connections_file).await { - Ok(cl) => cl, - Err(e) => { - warn!("No connections list file at: {}, creating a fresh list", e); - ConnectionList { - client_connections: HashMap::new(), - connections_client: HashMap::new(), - } - } - }; - Self { - connections_file, - connections_list: RwLock::new(connections_list), - next_connection_id: AtomicI64::new(-4), - } - } - - async fn sync(&self) { - let copy = self.connections_list.read().unwrap().clone(); - let connections_file = self.connections_file.clone(); - // TODO: this may not need to run so frequently. Maybe we can do it on a timer? - tokio::spawn(async move { - if let Err(e) = copy.sync(&connections_file).await { - error!("Error syncing connections: {}", e); - } - }); - } - - pub(crate) async fn activity_for_client( +#[async_trait::async_trait] +pub trait ConnectionsDB { + /// Update the connection record for the given connection object to point to the given player. + /// This is used when a player logs in. + async fn update_client_connection( &self, - client_id: Uuid, - connobj: Objid, - ) -> Result<(), anyhow::Error> { - let mut inner = self.connections_list.write().unwrap(); - let connection_record = inner - .connections_client - .get_mut(&connobj) - .ok_or_else(|| anyhow::anyhow!("No connection for player: {}", connobj))? 
- .iter_mut() - .find(|cr| cr.client_id == client_id) - .ok_or_else(|| anyhow::anyhow!("No connection record for client: {}", client_id))?; - connection_record.last_activity = SystemTime::now(); - Ok(()) - } + from_connection: Objid, + to_player: Objid, + ) -> Result<(), anyhow::Error>; - /// Update the last ping time for a client / connection. - pub(crate) async fn notify_is_alive( + /// Create a new connection object for the given client. + async fn new_connection( &self, client_id: Uuid, - connection: Objid, - ) -> Result<(), anyhow::Error> { - let mut inner = self.connections_list.write().unwrap(); - let connections = inner - .connections_client - .get_mut(&connection) - .ok_or_else(|| anyhow::anyhow!("No connection for player: {}", connection))?; - for cr in connections.iter_mut() { - if cr.client_id == client_id { - cr.last_ping = Instant::now(); - break; - } - } - Ok(()) - } - - pub(crate) async fn ping_check(&self) { - let to_remove = { - let inner = self.connections_list.read().unwrap(); - - // Check all connections to see if they have timed out (no ping response in N interval). - // If any have, remove them from the list. - let mut to_remove = vec![]; - for (_, clients) in inner.connections_client.iter() { - for c in clients { - if c.last_ping.elapsed() > CONNECTION_TIMEOUT_DURATION { - to_remove.push(c.client_id); - } - } - } - to_remove - }; - for client in to_remove { - info!("Client {} timed out, removing", client); - self.remove_client_connection(client) - .await - .expect("Unable to remove client connection"); - } - } - - /// Return all connection objects (player or not) - pub(crate) async fn connections(&self) -> Vec { - self.connections_list - .read() - .unwrap() - .connections_client - .keys() - .cloned() - .collect() - } - - pub(crate) async fn is_valid_client(&self, client_id: Uuid) -> bool { - self.connections_list - .read() - .unwrap() - .client_connections - .contains_key(&client_id) - } - - pub(crate) async fn connection_object_for_client(&self, client_id: Uuid) -> Option { - self.connections_list - .read() - .unwrap() - .client_connections - .get(&client_id) - .cloned() - } + hostname: String, + ) -> Result; - pub(crate) async fn remove_client_connection( + /// Record activity for the given client. + async fn record_client_activity( &self, client_id: Uuid, - ) -> Result<(), anyhow::Error> { - { - let mut inner = self.connections_list.write().unwrap(); - - let Some(connection) = inner.client_connections.remove(&client_id) else { - bail!("No (expected) connection for client: {}", client_id); - }; - - let Some(clients) = inner.connections_client.get_mut(&connection) else { - bail!("No (expected) connection record for player: {}", connection); - }; - - clients.retain(|c| c.client_id != client_id); - } - self.sync().await; - Ok(()) - } + connobj: Objid, + ) -> Result<(), anyhow::Error>; - pub(crate) async fn new_connection( + /// Update the last ping time for a client / connection. + async fn notify_is_alive( &self, client_id: Uuid, - hostname: String, - ) -> Result { - let connection_id = { - let mut inner = self.connections_list.write().unwrap(); - - // We should not already have an object connection id for this client. If we do, - // respond with an error. - - if inner.client_connections.contains_key(&client_id) { - return Err(RpcRequestError::AlreadyConnected); - } + connection: Objid, + ) -> Result<(), anyhow::Error>; - // Get a new connection id, and create an entry for it. 
- let connection_id = Objid(self.next_connection_id.fetch_sub(1, Ordering::SeqCst)); - inner.client_connections.insert(client_id, connection_id); - inner.connections_client.insert( - connection_id, - vec![ConnectionRecord { - client_id, - player: connection_id, - name: hostname, - last_activity: SystemTime::now(), - connect_time: SystemTime::now(), - last_ping: Instant::now(), - }], - ); - connection_id - }; - self.sync().await; - Ok(connection_id) - } + /// Prune any connections that have not been active for longer than the required duration. + async fn ping_check(&self); - pub(crate) async fn connection_records_for( - &self, - player: Objid, - ) -> Result, SessionError> { - let inner = self.connections_list.read().unwrap(); - let Some(connections) = inner.connections_client.get(&player) else { - return Ok(vec![]); - }; + async fn last_activity_for(&self, connection: Objid) -> Result; - if connections.is_empty() { - return Ok(vec![]); - } + async fn connection_name_for(&self, player: Objid) -> Result; - Ok(connections - .iter() - .sorted_by_key(|a| a.last_activity) - .cloned() - .collect()) - } + async fn connected_seconds_for(&self, player: Objid) -> Result; - pub(crate) async fn update_client_connection( - &self, - from_connection: Objid, - to_player: Objid, - ) -> Result<(), anyhow::Error> { - { - let mut inner = self.connections_list.write().unwrap(); + async fn client_ids_for(&self, player: Objid) -> Result, SessionError>; - let mut connection_records = inner - .connections_client - .remove(&from_connection) - .expect("connection record missing"); - assert_eq!( - connection_records.len(), - 1, - "connection record for unlogged in connection has multiple entries" - ); - let mut cr = connection_records.pop().unwrap(); - cr.player = to_player; - cr.last_activity = SystemTime::now(); + /// Return all connection objects (player or not) + async fn connections(&self) -> Vec; - inner.client_connections.insert(cr.client_id, to_player); - match inner.connections_client.get_mut(&to_player) { - None => { - inner.connections_client.insert(to_player, vec![cr]); - } - Some(ref mut crs) => { - crs.push(cr); - } - } - inner.connections_client.remove(&from_connection); - } - self.sync().await; + /// Return whether the given client is a valid client. + async fn is_valid_client(&self, client_id: Uuid) -> bool; - Ok(()) - } -} + /// Retrieve the connection object for the given client. + async fn connection_object_for_client(&self, client_id: Uuid) -> Option; -impl Drop for Connections { - fn drop(&mut self) { - let copy = self.connections_list.read().unwrap().clone(); - let connections_file = self.connections_file.clone(); - if let Err(e) = tokio::runtime::Handle::current().block_on(copy.sync(&connections_file)) { - error!("Error syncing connections: {}", e); - } - info!("Connections sync'd: {:?} ... {:?}", connections_file, copy); - } + /// Remove the given client from the connection database. + async fn remove_client_connection(&self, client_id: Uuid) -> Result<(), anyhow::Error>; } diff --git a/crates/daemon/src/connections_tb.rs b/crates/daemon/src/connections_tb.rs new file mode 100644 index 00000000..9ab7748b --- /dev/null +++ b/crates/daemon/src/connections_tb.rs @@ -0,0 +1,583 @@ +//! An implementation of the connections db that uses tuplebox. 
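// ---------------------------------------------------------------------------
// Editor's sketch (illustrative only, not part of this patch): the intended
// call pattern for the ConnectionsDB trait defined above, exercised through
// the ConnectionsTb implementation added in this file. It condenses the
// test_single_connection test further below; the function name and the
// `player` argument are hypothetical, and the imports of this file are
// assumed to be in scope.
// ---------------------------------------------------------------------------
async fn example_login_flow(player: Objid) {
    use std::sync::Arc;

    // An in-memory connections db (no backing path).
    let db = Arc::new(ConnectionsTb::new(None).await);

    // A freshly attached client is assigned a transient connection object
    // (#-4, #-5, ... counting downwards).
    let client_id = Uuid::new_v4();
    let conn_obj = db
        .new_connection(client_id, "localhost".to_string())
        .await
        .unwrap();
    db.record_client_activity(client_id, conn_obj).await.unwrap();
    db.notify_is_alive(client_id, conn_obj).await.unwrap();

    // At login, the client's connection record is re-pointed at the player
    // object, after which lookups go through the player.
    db.update_client_connection(conn_obj, player).await.unwrap();
    assert!(db.client_ids_for(player).await.unwrap().contains(&client_id));

    // On disconnect, the client is dropped from every relation.
    db.remove_client_connection(client_id).await.unwrap();
    assert!(!db.is_valid_client(client_id).await);
}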
+ +use std::collections::HashSet; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; + +use anyhow::Error; +use async_trait::async_trait; +use strum::{Display, EnumCount, EnumIter, IntoEnumIterator}; +use tracing::{debug, error, warn}; +use uuid::Uuid; + +use moor_db::tuplebox; +use moor_db::tuplebox::tb::{RelationInfo, TupleBox}; +use moor_db::tuplebox::RelationId; +use moor_kernel::tasks::sessions::SessionError; +use moor_values::util::slice_ref::SliceRef; +use moor_values::var::objid::Objid; +use moor_values::AsByteBuffer; +use rpc_common::RpcRequestError; + +use crate::connections::{ConnectionsDB, CONNECTION_TIMEOUT_DURATION}; + +const CONNECTIONS_DB_MEM_SIZE: usize = 1 << 26; +const CONNECTIONS_DB_PAGE_SIZE: usize = 32768; +pub struct ConnectionsTb { + tb: Arc, +} + +impl ConnectionsTb { + pub async fn new(path: Option) -> Self { + let mut relations: Vec = ConnectionRelation::iter() + .map(|r| { + RelationInfo { + name: r.to_string(), + domain_type_id: 0, /* tbd */ + codomain_type_id: 0, + secondary_indexed: false, + } + }) + .collect(); + relations[ConnectionRelation::ClientConnection as usize].secondary_indexed = true; + + let tb = TupleBox::new( + CONNECTIONS_DB_MEM_SIZE, + CONNECTIONS_DB_PAGE_SIZE, + path, + &relations, + 1, + ) + .await; + Self { tb } + } +} + +#[repr(usize)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, EnumIter, EnumCount, Display)] +enum ConnectionRelation { + // One to many, client id <-> connection/player object. Secondary index will seek on object id. + ClientConnection = 0, + // Client -> SystemTime of last activity + ClientActivity = 1, + // Client connect time. + ClientConnectTime = 2, + // Client last ping time. + ClientPingTime = 3, + // Client hostname / connection "name" + ClientName = 4, +} + +impl ConnectionsTb { + async fn most_recent_client_connection( + tx: &tuplebox::Transaction, + connection_obj: Objid, + ) -> Result, SessionError> { + let clients = tx + .relation(RelationId(ConnectionRelation::ClientConnection as usize)) + .await + .seek_by_codomain(connection_obj.as_sliceref()) + .await + .expect("Unable to seek client connection"); + + // Seek the most recent activity for the connection, so pull in the activity relation for + // each client. 
+        let mut times = Vec::new();
+        for (client, _) in clients {
+            if let Ok(last_activity) = tx
+                .relation(RelationId(ConnectionRelation::ClientActivity as usize))
+                .await
+                .seek_by_domain(client.clone())
+                .await
+            {
+                let epoch_time_millis: u128 =
+                    u128::from_le_bytes(last_activity.1.as_slice().try_into().unwrap());
+                let time = SystemTime::UNIX_EPOCH + Duration::from_millis(epoch_time_millis as u64);
+                times.push((client.clone(), time));
+            } else {
+                warn!(
+                    ?client,
+                    ?connection_obj,
+                    "Unable to find last activity for client"
+                );
+            }
+        }
+        // Sort ascending by last-activity time so the most recent entry is the last one;
+        // callers pop() the vector to get the most recently active client.
+        times.sort_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap());
+        Ok(times)
+    }
+}
+
+fn sliceref_as_time(slc: SliceRef) -> SystemTime {
+    let epoch_time_millis: u128 = u128::from_le_bytes(slc.as_slice().try_into().unwrap());
+    SystemTime::UNIX_EPOCH + Duration::from_millis(epoch_time_millis as u64)
+}
+
+fn now_as_sliceref() -> SliceRef {
+    SliceRef::from_bytes(
+        &SystemTime::now()
+            .duration_since(SystemTime::UNIX_EPOCH)
+            .unwrap()
+            .as_millis()
+            .to_le_bytes(),
+    )
+}
+
+#[async_trait]
+impl ConnectionsDB for ConnectionsTb {
+    async fn update_client_connection(
+        &self,
+        from_connection: Objid,
+        to_player: Objid,
+    ) -> Result<(), Error> {
+        let tx = self.tb.clone().start_tx();
+        let client_ids = tx
+            .relation(RelationId(ConnectionRelation::ClientConnection as usize))
+            .await
+            .seek_by_codomain(from_connection.as_sliceref())
+            .await
+            .expect("Unable to seek client connection");
+        if client_ids.is_empty() {
+            error!(?from_connection, ?to_player, "No client ids for connection");
+            return Err(Error::msg("No client ids for connection"));
+        }
+        for (client_id, _) in client_ids {
+            let _ = tx
+                .relation(RelationId(ConnectionRelation::ClientConnection as usize))
+                .await
+                .update_tuple(client_id.clone(), to_player.as_sliceref())
+                .await;
+        }
+        tx.commit().await?;
+        Ok(())
+    }
+
+    async fn new_connection(
+        &self,
+        client_id: Uuid,
+        hostname: String,
+    ) -> Result<Objid, RpcRequestError> {
+        // The connection object id is allocated from the tuplebox sequence, then negated
+        // and offset from -4, since connection objects always grow downwards from there.
+        let connection_id = self.tb.clone().sequence_next(0).await;
+        let connection_id: i64 = -4 - (connection_id as i64);
+        let connection_oid = Objid(connection_id);
+
+        // Insert the initial tuples for the connection.
+ let tx = self.tb.clone().start_tx(); + let client_id = SliceRef::from_bytes(client_id.as_bytes()); + tx.relation(RelationId(ConnectionRelation::ClientConnection as usize)) + .await + .insert_tuple(client_id.clone(), connection_oid.as_sliceref()) + .await + .expect("Unable to insert client connection"); + tx.relation(RelationId(ConnectionRelation::ClientActivity as usize)) + .await + .insert_tuple(client_id.clone(), now_as_sliceref()) + .await + .expect("Unable to insert client activity"); + tx.relation(RelationId(ConnectionRelation::ClientConnectTime as usize)) + .await + .insert_tuple(client_id.clone(), now_as_sliceref()) + .await + .expect("Unable to insert client connect time"); + tx.relation(RelationId(ConnectionRelation::ClientPingTime as usize)) + .await + .insert_tuple(client_id.clone(), now_as_sliceref()) + .await + .expect("Unable to insert client ping time"); + tx.relation(RelationId(ConnectionRelation::ClientName as usize)) + .await + .insert_tuple(client_id.clone(), SliceRef::from_bytes(hostname.as_bytes())) + .await + .expect("Unable to insert client name"); + + tx.commit().await.expect("Unable to commit transaction"); + + Ok(connection_oid) + } + + async fn record_client_activity(&self, client_id: Uuid, _connobj: Objid) -> Result<(), Error> { + let tx = self.tb.clone().start_tx(); + tx.relation(RelationId(ConnectionRelation::ClientActivity as usize)) + .await + .upsert_tuple( + SliceRef::from_bytes(client_id.as_bytes()), + now_as_sliceref(), + ) + .await + .expect("Unable to update client activity"); + tx.commit().await?; + Ok(()) + } + + async fn notify_is_alive(&self, client_id: Uuid, _connection: Objid) -> Result<(), Error> { + let tx = self.tb.clone().start_tx(); + tx.relation(RelationId(ConnectionRelation::ClientPingTime as usize)) + .await + .upsert_tuple( + SliceRef::from_bytes(client_id.as_bytes()), + now_as_sliceref(), + ) + .await + .expect("Unable to update client ping time"); + tx.commit().await?; + Ok(()) + } + + async fn ping_check(&self) { + let now = SystemTime::now(); + let timeout_threshold = now - CONNECTION_TIMEOUT_DURATION; + + // Full scan the last ping relation, and compare the last ping time to the current time. + // If the difference is greater than the timeout duration, then we need to remove the + // connection from all the relations. 
+ let tx = self.tb.clone().start_tx(); + + let last_ping_relation = tx + .relation(RelationId(ConnectionRelation::ClientPingTime as usize)) + .await; + let expired = last_ping_relation + .predicate_scan(&|(_, last_ping_time)| { + let last_ping_time = sliceref_as_time(last_ping_time.clone()); + last_ping_time < timeout_threshold + }) + .await + .expect("Unable to scan last ping relation"); + + for (client_id, last_ping_time) in expired { + debug!( + "Expiring connection for client {:?} because last_ping_time = {:?}", + client_id, + sliceref_as_time(last_ping_time) + ); + let _ = tx + .relation(RelationId(ConnectionRelation::ClientConnection as usize)) + .await + .remove_by_domain(client_id.clone()) + .await; + let _ = tx + .relation(RelationId(ConnectionRelation::ClientActivity as usize)) + .await + .remove_by_domain(client_id.clone()) + .await; + let _ = tx + .relation(RelationId(ConnectionRelation::ClientConnectTime as usize)) + .await + .remove_by_domain(client_id.clone()) + .await; + let _ = tx + .relation(RelationId(ConnectionRelation::ClientPingTime as usize)) + .await + .remove_by_domain(client_id.clone()) + .await; + let _ = tx + .relation(RelationId(ConnectionRelation::ClientName as usize)) + .await + .remove_by_domain(client_id.clone()) + .await; + } + tx.commit().await.expect("Unable to commit transaction"); + } + + async fn last_activity_for(&self, connection_obj: Objid) -> Result { + let tx = self.tb.clone().start_tx(); + let mut client_times = Self::most_recent_client_connection(&tx, connection_obj).await?; + + // Most recent time is the last one. + let Some(time) = client_times.pop() else { + return Err(SessionError::NoConnectionForPlayer(connection_obj)); + }; + tx.commit().await.expect("Unable to commit transaction"); + Ok(time.1) + } + + async fn connection_name_for(&self, connection_obj: Objid) -> Result { + let tx = self.tb.clone().start_tx(); + let mut client_times = Self::most_recent_client_connection(&tx, connection_obj).await?; + + let Some(most_recent) = client_times.pop() else { + return Err(SessionError::NoConnectionForPlayer(connection_obj)); + }; + + let client_id = most_recent.0; + let name = tx + .relation(RelationId(ConnectionRelation::ClientName as usize)) + .await + .seek_by_domain(client_id.clone()) + .await + .expect("Unable to seek client name"); + tx.commit().await.expect("Unable to commit transaction"); + Ok(String::from_utf8(name.1.as_slice().to_vec()).expect("Invalid UTF-8 in client name")) + } + + async fn connected_seconds_for(&self, player: Objid) -> Result { + let tx = self.tb.clone().start_tx(); + // In this case we need to find the earliest connection time for the player, and then + // subtract that from the current time. 
+ let Ok(clients) = tx + .relation(RelationId(ConnectionRelation::ClientConnection as usize)) + .await + .seek_by_codomain(player.as_sliceref()) + .await + else { + return Err(SessionError::NoConnectionForPlayer(player)); + }; + + let mut times = Vec::new(); + for (client, _) in clients { + if let Ok(connect_time) = tx + .relation(RelationId(ConnectionRelation::ClientConnectTime as usize)) + .await + .seek_by_domain(client.clone()) + .await + { + let time = sliceref_as_time(connect_time.1); + times.push((client.clone(), time)); + } + } + times.sort_by(|(_, a), (_, b)| b.partial_cmp(a).unwrap()); + let earliest = times.pop().expect("No connection for player"); + let earliest = earliest.1; + let now = SystemTime::now(); + let duration = now.duration_since(earliest).expect("Invalid duration"); + let seconds = duration.as_secs_f64(); + tx.commit().await.expect("Unable to commit transaction"); + Ok(seconds) + } + + async fn client_ids_for(&self, player: Objid) -> Result, SessionError> { + let tx = self.tb.clone().start_tx(); + let Ok(clients) = tx + .relation(RelationId(ConnectionRelation::ClientConnection as usize)) + .await + .seek_by_codomain(player.as_sliceref()) + .await + else { + return Ok(vec![]); + }; + + let mut client_ids = Vec::new(); + for (client, _) in clients { + let client_id = Uuid::from_slice(client.as_slice()).expect("Invalid UUID"); + client_ids.push(client_id); + } + tx.commit().await.expect("Unable to commit transaction"); + Ok(client_ids) + } + + async fn connections(&self) -> Vec { + // Full scan from ClientConnection relation to get all connections, and dump them into a + // hashset (to remove dupes) and return as a vector. + let tx = self.tb.clone().start_tx(); + let mut connections = HashSet::new(); + let clients = tx + .relation(RelationId(ConnectionRelation::ClientConnection as usize)) + .await + .predicate_scan(&|_| true) + .await + .expect("Unable to scan client connection relation"); + + for (_, connection) in clients { + let connection = Objid::from_sliceref(connection); + connections.insert(connection); + } + + tx.commit().await.expect("Unable to commit transaction"); + connections.into_iter().collect() + } + + async fn is_valid_client(&self, client_id: Uuid) -> bool { + let tx = self.tb.clone().start_tx(); + let is_valid = match tx + .relation(RelationId(ConnectionRelation::ClientConnection as usize)) + .await + .seek_by_domain(client_id.as_bytes().as_sliceref()) + .await + { + Ok(_) => true, + Err(_) => false, + }; + tx.commit().await.expect("Unable to commit transaction"); + is_valid + } + + async fn connection_object_for_client(&self, client_id: Uuid) -> Option { + let tx = self.tb.clone().start_tx(); + let connection = match tx + .relation(RelationId(ConnectionRelation::ClientConnection as usize)) + .await + .seek_by_domain(client_id.as_bytes().as_sliceref()) + .await + { + Ok((_, connection)) => Some(Objid::from_sliceref(connection)), + Err(_) => None, + }; + tx.commit().await.expect("Unable to commit transaction"); + connection + } + + async fn remove_client_connection(&self, client_id: Uuid) -> Result<(), Error> { + let tx = self.tb.clone().start_tx(); + let _ = tx + .relation(RelationId(ConnectionRelation::ClientConnection as usize)) + .await + .remove_by_domain(client_id.as_bytes().as_sliceref()) + .await; + let _ = tx + .relation(RelationId(ConnectionRelation::ClientActivity as usize)) + .await + .remove_by_domain(client_id.as_bytes().as_sliceref()) + .await; + let _ = tx + .relation(RelationId(ConnectionRelation::ClientConnectTime as usize)) + 
.await + .remove_by_domain(client_id.as_bytes().as_sliceref()) + .await; + let _ = tx + .relation(RelationId(ConnectionRelation::ClientPingTime as usize)) + .await + .remove_by_domain(client_id.as_bytes().as_sliceref()) + .await; + let _ = tx + .relation(RelationId(ConnectionRelation::ClientName as usize)) + .await + .remove_by_domain(client_id.as_bytes().as_sliceref()) + .await; + + tx.commit().await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use moor_values::var::objid::Objid; + + use crate::connections::ConnectionsDB; + use crate::connections_tb::ConnectionsTb; + + /// Simple test of: + /// * Attach a connection<->client + /// * Record activity & verify + /// * Update connection<->client to a new connection + /// * Verify the old connection has no clients + /// * Verify the new connection has the client + /// * Remove the connection<->client + /// * Verify the connection has no clients + #[tokio::test] + async fn test_single_connection() { + let db = Arc::new(ConnectionsTb::new(None).await); + let mut jh = vec![]; + + for x in 1..100 { + let db = db.clone(); + jh.push(tokio::spawn(async move { + let client_id = uuid::Uuid::new_v4(); + let oid = db + .new_connection(client_id, "localhost".to_string()) + .await + .unwrap(); + let client_ids = db.client_ids_for(oid).await.unwrap(); + assert_eq!(client_ids.len(), 1); + assert_eq!(client_ids[0], client_id); + db.record_client_activity(client_id, oid).await.unwrap(); + db.notify_is_alive(client_id, oid).await.unwrap(); + let last_activity = db + .last_activity_for(oid) + .await + .unwrap() + .elapsed() + .unwrap() + .as_secs_f64(); + assert!(last_activity < 1.0); + assert!(db.is_valid_client(client_id).await); + db.update_client_connection(oid, Objid(x)) + .await + .expect("Unable to update client connection"); + let client_ids = db.client_ids_for(Objid(x)).await.unwrap(); + assert_eq!(client_ids.len(), 1); + assert_eq!(client_ids[0], client_id); + db.remove_client_connection(client_id).await.unwrap(); + assert!(!db.is_valid_client(client_id).await); + let client_ids = db.client_ids_for(Objid(x)).await.unwrap(); + assert!(client_ids.is_empty()); + })); + } + for j in jh { + j.await.unwrap(); + } + } + + /// Test that a given player can have multiple clients connected to it. 
+ #[tokio::test] + async fn test_multiple_connections() { + let db = Arc::new(ConnectionsTb::new(None).await); + let mut jh = vec![]; + for x in 1..100 { + let db = db.clone(); + jh.push(tokio::spawn(async move { + let client_id1 = uuid::Uuid::new_v4(); + let client_id2 = uuid::Uuid::new_v4(); + let con_oid1 = db + .new_connection(client_id1, "localhost".to_string()) + .await + .unwrap(); + let con_oid2 = db + .new_connection(client_id2, "localhost".to_string()) + .await + .unwrap(); + db.update_client_connection(con_oid1, Objid(x)) + .await + .expect("Unable to update client connection"); + let client_ids = db.client_ids_for(Objid(x)).await.unwrap(); + assert_eq!(client_ids.len(), 1); + assert!(client_ids.contains(&client_id1)); + + db.update_client_connection(con_oid2, Objid(x)) + .await + .expect("Unable to update client connection"); + let client_ids = db.client_ids_for(Objid(x)).await.unwrap(); + assert_eq!(client_ids.len(), 2); + assert!(client_ids.contains(&client_id2)); + + db.record_client_activity(client_id1, Objid(x)) + .await + .unwrap(); + let last_activity = db + .last_activity_for(Objid(x)) + .await + .unwrap() + .elapsed() + .unwrap() + .as_secs_f64(); + assert!(last_activity < 1.0); + db.remove_client_connection(client_id1).await.unwrap(); + let client_ids = db.client_ids_for(Objid(x)).await.unwrap(); + assert_eq!(client_ids.len(), 1); + assert!(client_ids.contains(&client_id2)); + })); + } + for j in jh { + j.await.unwrap(); + } + } + + // Validate that ping check works. + #[tokio::test] + async fn ping_test() { + let db = Arc::new(ConnectionsTb::new(None).await); + let client_id1 = uuid::Uuid::new_v4(); + let ob = db + .new_connection(client_id1, "localhost".to_string()) + .await + .unwrap(); + db.ping_check().await; + let client_ids = db.connections().await; + assert_eq!(client_ids.len(), 1); + assert!(db.is_valid_client(client_id1).await); + assert_eq!(db.connection_object_for_client(client_id1).await, Some(ob)); + } +} diff --git a/crates/daemon/src/main.rs b/crates/daemon/src/main.rs index 71251fdf..0e339bd4 100644 --- a/crates/daemon/src/main.rs +++ b/crates/daemon/src/main.rs @@ -18,6 +18,7 @@ use rpc_common::{RpcRequestError, RpcResponse, RpcResult}; use crate::rpc_server::zmq_loop; mod connections; +mod connections_tb; mod rpc_server; mod rpc_session; @@ -45,10 +46,10 @@ struct Args { #[arg( short, long, - value_name = "connections-file", - help = "Path to connections list file to use or create", + value_name = "connections-db", + help = "Path to connections database to use or create", value_hint = ValueHint::FilePath, - default_value = "connections.json" + default_value = "connections.db" )] connections_file: PathBuf, diff --git a/crates/daemon/src/rpc_server.rs b/crates/daemon/src/rpc_server.rs index 38d3e183..516a0baf 100644 --- a/crates/daemon/src/rpc_server.rs +++ b/crates/daemon/src/rpc_server.rs @@ -5,7 +5,6 @@ use std::time::{Instant, SystemTime}; use anyhow::{Context, Error}; use futures_util::SinkExt; -use itertools::Itertools; use metrics_macros::increment_counter; use tmq::publish::Publish; use tmq::{publish, reply, Multipart}; @@ -31,7 +30,8 @@ use rpc_common::{ BROADCAST_TOPIC, }; -use crate::connections::Connections; +use crate::connections::ConnectionsDB; +use crate::connections_tb::ConnectionsTb; use crate::make_response; use crate::rpc_session::RpcSession; @@ -39,7 +39,7 @@ pub struct RpcServer { publish: Arc>, world_state_source: Arc, scheduler: Scheduler, - connections: Connections, + connections: Arc, } #[derive(Debug, Clone, Eq, PartialEq)] 
@@ -67,7 +67,7 @@ impl RpcServer { .set_sndtimeo(1) .bind(narrative_endpoint) .unwrap(); - let connections = Connections::new(connections_file).await; + let connections = Arc::new(ConnectionsTb::new(Some(connections_file)).await); info!( "Created connections list, with {} initial known connections", connections.connections().await.len() @@ -239,36 +239,21 @@ impl RpcServer { } pub(crate) async fn connection_name_for(&self, player: Objid) -> Result { - let connections = self.connections.connection_records_for(player).await?; - // Grab the most recent connection record (they are sorted by last_activity, so last item). - Ok(connections.last().unwrap().name.clone()) + self.connections.connection_name_for(player).await } + #[allow(dead_code)] async fn last_activity_for(&self, player: Objid) -> Result { - let connections = self.connections.connection_records_for(player).await?; - if connections.is_empty() { - return Err(SessionError::NoConnectionForPlayer(player)); - } - // Grab the most recent connection record (they are sorted by last_activity, so last item). - Ok(connections.last().unwrap().last_activity) + self.connections.last_activity_for(player).await } pub(crate) async fn idle_seconds_for(&self, player: Objid) -> Result { - let last_activity = self.last_activity_for(player).await?; + let last_activity = self.connections.last_activity_for(player).await?; Ok(last_activity.elapsed().unwrap().as_secs_f64()) } pub(crate) async fn connected_seconds_for(&self, player: Objid) -> Result { - // Grab the highest of all connection times. - let connections = self.connections.connection_records_for(player).await?; - Ok(connections - .iter() - .map(|c| c.connect_time) - .max() - .unwrap() - .elapsed() - .unwrap() - .as_secs_f64()) + self.connections.connected_seconds_for(player).await } // TODO this will issue physical disconnects to *all* connections for this player. @@ -278,8 +263,7 @@ impl RpcServer { // should be modified to reflect that. 
pub(crate) async fn disconnect(&self, player: Objid) -> Result<(), SessionError> { warn!("Disconnecting player: {}", player); - let connections = self.connections.connection_records_for(player).await?; - let all_client_ids = connections.iter().map(|c| c.client_id).collect_vec(); + let all_client_ids = self.connections.client_ids_for(player).await?; let mut publish = self.publish.lock().await; let event = ConnectionEvent::Disconnect(); @@ -500,7 +484,7 @@ impl RpcServer { if let Err(e) = self .connections - .activity_for_client(client_id, connection) + .record_client_activity(client_id, connection) .await { warn!("Unable to update client connection activity: {}", e); @@ -570,7 +554,7 @@ impl RpcServer { ) -> Result { if let Err(e) = self .connections - .activity_for_client(client_id, connection) + .record_client_activity(client_id, connection) .await { warn!("Unable to update client connection activity: {}", e); @@ -715,8 +699,7 @@ impl RpcServer { increment_counter!("rpc_server.publish_narrative_events"); let mut publish = self.publish.lock().await; for (player, event) in events { - let connections = self.connections.connection_records_for(*player).await?; - let client_ids = connections.iter().map(|c| c.client_id).collect_vec(); + let client_ids = self.connections.client_ids_for(*player).await?; let event = ConnectionEvent::Narrative(*player, event.clone()); let event_bytes = bincode::encode_to_vec(&event, bincode::config::standard())?; for client_id in &client_ids { diff --git a/crates/daemon/src/rpc_session.rs b/crates/daemon/src/rpc_session.rs index c7ead6ff..89b3f12d 100644 --- a/crates/daemon/src/rpc_session.rs +++ b/crates/daemon/src/rpc_session.rs @@ -1,13 +1,16 @@ -use crate::rpc_server::RpcServer; -use async_trait::async_trait; -use moor_kernel::tasks::sessions::{Session, SessionError}; -use moor_values::model::NarrativeEvent; -use moor_values::var::objid::Objid; use std::sync::Arc; + +use async_trait::async_trait; use tokio::sync::Mutex; use tracing::trace; use uuid::Uuid; +use moor_kernel::tasks::sessions::{Session, SessionError}; +use moor_values::model::NarrativeEvent; +use moor_values::var::objid::Objid; + +use crate::rpc_server::RpcServer; + /// A "session" that runs over the RPC system. 
pub struct RpcSession { client_id: Uuid, diff --git a/crates/db/src/lib.rs b/crates/db/src/lib.rs index d1fd3487..3ffda943 100644 --- a/crates/db/src/lib.rs +++ b/crates/db/src/lib.rs @@ -86,7 +86,7 @@ impl DatabaseBuilder { DatabaseType::Tuplebox => { let (db, fresh) = TupleBoxWorldStateSource::open( self.path.clone(), - self.memory_size.unwrap_or(1 << 48), + self.memory_size.unwrap_or(1 << 40), ) .await; Ok((Box::new(db), fresh)) diff --git a/crates/db/src/tuplebox/mod.rs b/crates/db/src/tuplebox/mod.rs index abed6001..6ef82181 100644 --- a/crates/db/src/tuplebox/mod.rs +++ b/crates/db/src/tuplebox/mod.rs @@ -19,8 +19,10 @@ pub mod tb_worldstate; mod tuples; mod tx; +pub use tx::transaction::Transaction; + #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)] -pub struct RelationId(usize); +pub struct RelationId(pub usize); impl RelationId { pub fn transient(id: usize) -> Self { diff --git a/crates/db/src/tuplebox/tuples/mod.rs b/crates/db/src/tuplebox/tuples/mod.rs index 2d09c833..b7178ea0 100644 --- a/crates/db/src/tuplebox/tuples/mod.rs +++ b/crates/db/src/tuplebox/tuples/mod.rs @@ -10,7 +10,7 @@ mod tx_tuple; pub struct TupleRef { sb: Arc, - id: TupleId, + pub id: TupleId, } impl PartialEq for TupleRef { diff --git a/crates/db/src/tuplebox/tx/relvar.rs b/crates/db/src/tuplebox/tx/relvar.rs index ca613fde..ad7a89a7 100644 --- a/crates/db/src/tuplebox/tx/relvar.rs +++ b/crates/db/src/tuplebox/tx/relvar.rs @@ -61,4 +61,11 @@ impl<'a> RelVar<'a> { pub async fn remove_by_domain(&self, domain: SliceRef) -> Result<(), TupleError> { self.tx.remove_by_domain(self.id, domain).await } + + pub async fn predicate_scan bool>( + &self, + f: &F, + ) -> Result, TupleError> { + self.tx.predicate_scan(self.id, f).await + } } diff --git a/crates/db/src/tuplebox/tx/transaction.rs b/crates/db/src/tuplebox/tx/transaction.rs index 45186507..1c6d6c29 100644 --- a/crates/db/src/tuplebox/tx/transaction.rs +++ b/crates/db/src/tuplebox/tx/transaction.rs @@ -474,7 +474,7 @@ mod tests { use crate::tuplebox::tb::{RelationInfo, TupleBox}; use crate::tuplebox::tuples::TupleError; use crate::tuplebox::tx::transaction::CommitError; - use crate::tuplebox::RelationId; + use crate::tuplebox::{RelationId, Transaction}; fn attr(slice: &[u8]) -> SliceRef { SliceRef::from_bytes(slice) @@ -780,6 +780,46 @@ mod tests { } } + /// Test some few secondary index scenarios: + /// a->b, b->b, c->b = b->{a,b,c} -- before and after commit + #[tokio::test] + async fn secondary_indices() { + let db = test_db().await; + let rid = RelationId(0); + let tx = db.clone().start_tx(); + tx.insert_tuple(rid, attr(b"a"), attr(b"b")).await.unwrap(); + tx.insert_tuple(rid, attr(b"b"), attr(b"b")).await.unwrap(); + tx.insert_tuple(rid, attr(b"c"), attr(b"b")).await.unwrap(); + + async fn verify(tx: &Transaction, expected: Vec<&[u8]>) { + let b_results = tx + .seek_by_codomain(RelationId(0), attr(b"b")) + .await + .unwrap(); + + let mut domains = b_results + .iter() + .map(|(d, _)| d.as_slice()) + .collect::>(); + domains.sort(); + assert_eq!(domains, expected); + } + verify(&tx, vec![b"a", b"b", b"c"]).await; + + tx.commit().await.unwrap(); + + let tx = db.clone().start_tx(); + verify(&tx, vec![b"a", b"b", b"c"]).await; + + // Add another one, in our new transaction + tx.insert_tuple(rid, attr(b"d"), attr(b"b")).await.unwrap(); + verify(&tx, vec![b"a", b"b", b"c", b"d"]).await; + + // And remove one + tx.remove_by_domain(rid, attr(b"c")).await.unwrap(); + verify(&tx, vec![b"a", b"b", b"d"]).await; + } + #[tokio::test] async fn 
predicate_scan_with_predicate() { let db = test_db().await; diff --git a/crates/db/src/tuplebox/tx/working_set.rs b/crates/db/src/tuplebox/tx/working_set.rs index 30300f4e..b1f69d88 100644 --- a/crates/db/src/tuplebox/tx/working_set.rs +++ b/crates/db/src/tuplebox/tx/working_set.rs @@ -117,8 +117,9 @@ impl WorkingSet { // By performing the seek, we'll materialize the tuples into our local working set, which // will in turn update the codomain index for those tuples. for tuples in tuples_for_codomain { - self.seek_by_domain(db.clone(), relation_id, tuples.get().domain()) - .await?; + let _ = self + .seek_by_domain(db.clone(), relation_id, tuples.get().domain()) + .await; } let relation = &mut self.relations[relation_id.0];
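The daemon-facing behaviour above rests on two tuplebox features this change touches: secondary (codomain) indexing, and the new RelVar::predicate_scan (the working_set.rs tweak in the final hunk also stops an individual failed materialization from aborting a whole codomain seek). The sketch below is illustrative only, not part of the patch: the function name, relation name, and byte values are made up for the example, while the memory and page sizes mirror the constants in connections_tb.rs. It shows the two access patterns ConnectionsTb relies on — find every client that shares a connection object via the codomain index, and full-scan a relation with a predicate the way ping_check expires stale clients.

async fn example_tuplebox_access() {
    use moor_db::tuplebox::tb::{RelationInfo, TupleBox};
    use moor_db::tuplebox::RelationId;
    use moor_values::util::slice_ref::SliceRef;

    // One relation, client id -> connection object, with the secondary index
    // enabled so that seek_by_codomain works.
    let relations = vec![RelationInfo {
        name: "client_connection".to_string(),
        domain_type_id: 0,
        codomain_type_id: 0,
        secondary_indexed: true,
    }];
    let db = TupleBox::new(1 << 26, 32768, None, &relations, 1).await;

    let tx = db.clone().start_tx();
    tx.relation(RelationId(0))
        .await
        .insert_tuple(SliceRef::from_bytes(b"client-a"), SliceRef::from_bytes(b"#-4"))
        .await
        .unwrap();
    tx.relation(RelationId(0))
        .await
        .insert_tuple(SliceRef::from_bytes(b"client-b"), SliceRef::from_bytes(b"#-4"))
        .await
        .unwrap();

    // Both clients share the codomain "#-4", so the secondary index returns both;
    // this is how client_ids_for() finds every client logged in as one player.
    let same_connection = tx
        .relation(RelationId(0))
        .await
        .seek_by_codomain(SliceRef::from_bytes(b"#-4"))
        .await
        .unwrap();
    assert_eq!(same_connection.len(), 2);

    // Full scan with a predicate, the way ping_check() finds expired connections.
    let everything = tx
        .relation(RelationId(0))
        .await
        .predicate_scan(&|_| true)
        .await
        .unwrap();
    assert_eq!(everything.len(), 2);

    tx.commit().await.unwrap();
}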