Skip to content

Commit

Permalink
Switch connections-list database to using TupleBox
Browse files Browse the repository at this point in the history
  * Use the internal database type, same as for the world state
  * Should be / could be a bit faster, but is also more consistent.
  * In the process found a bug in the tuple store with secondary indices
  • Loading branch information
rdaum committed Oct 31, 2023
1 parent 86d8c07 commit 34cd774
Show file tree
Hide file tree
Showing 11 changed files with 703 additions and 419 deletions.
414 changes: 39 additions & 375 deletions crates/daemon/src/connections.rs

Large diffs are not rendered by default.

583 changes: 583 additions & 0 deletions crates/daemon/src/connections_tb.rs

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions crates/daemon/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use rpc_common::{RpcRequestError, RpcResponse, RpcResult};
use crate::rpc_server::zmq_loop;

mod connections;
mod connections_tb;
mod rpc_server;
mod rpc_session;

Expand Down Expand Up @@ -45,10 +46,10 @@ struct Args {
#[arg(
short,
long,
value_name = "connections-file",
help = "Path to connections list file to use or create",
value_name = "connections-db",
help = "Path to connections database to use or create",
value_hint = ValueHint::FilePath,
default_value = "connections.json"
default_value = "connections.db"
)]
connections_file: PathBuf,

Expand Down
43 changes: 13 additions & 30 deletions crates/daemon/src/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::time::{Instant, SystemTime};

use anyhow::{Context, Error};
use futures_util::SinkExt;
use itertools::Itertools;
use metrics_macros::increment_counter;
use tmq::publish::Publish;
use tmq::{publish, reply, Multipart};
Expand All @@ -31,15 +30,16 @@ use rpc_common::{
BROADCAST_TOPIC,
};

use crate::connections::Connections;
use crate::connections::ConnectionsDB;
use crate::connections_tb::ConnectionsTb;
use crate::make_response;
use crate::rpc_session::RpcSession;

pub struct RpcServer {
publish: Arc<Mutex<Publish>>,
world_state_source: Arc<dyn WorldStateSource>,
scheduler: Scheduler,
connections: Connections,
connections: Arc<dyn ConnectionsDB + Send + Sync>,
}

#[derive(Debug, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -67,7 +67,7 @@ impl RpcServer {
.set_sndtimeo(1)
.bind(narrative_endpoint)
.unwrap();
let connections = Connections::new(connections_file).await;
let connections = Arc::new(ConnectionsTb::new(Some(connections_file)).await);
info!(
"Created connections list, with {} initial known connections",
connections.connections().await.len()
Expand Down Expand Up @@ -239,36 +239,21 @@ impl RpcServer {
}

pub(crate) async fn connection_name_for(&self, player: Objid) -> Result<String, SessionError> {
let connections = self.connections.connection_records_for(player).await?;
// Grab the most recent connection record (they are sorted by last_activity, so last item).
Ok(connections.last().unwrap().name.clone())
self.connections.connection_name_for(player).await
}

#[allow(dead_code)]
async fn last_activity_for(&self, player: Objid) -> Result<SystemTime, SessionError> {
let connections = self.connections.connection_records_for(player).await?;
if connections.is_empty() {
return Err(SessionError::NoConnectionForPlayer(player));
}
// Grab the most recent connection record (they are sorted by last_activity, so last item).
Ok(connections.last().unwrap().last_activity)
self.connections.last_activity_for(player).await
}

pub(crate) async fn idle_seconds_for(&self, player: Objid) -> Result<f64, SessionError> {
let last_activity = self.last_activity_for(player).await?;
let last_activity = self.connections.last_activity_for(player).await?;
Ok(last_activity.elapsed().unwrap().as_secs_f64())
}

pub(crate) async fn connected_seconds_for(&self, player: Objid) -> Result<f64, SessionError> {
// Grab the highest of all connection times.
let connections = self.connections.connection_records_for(player).await?;
Ok(connections
.iter()
.map(|c| c.connect_time)
.max()
.unwrap()
.elapsed()
.unwrap()
.as_secs_f64())
self.connections.connected_seconds_for(player).await
}

// TODO this will issue physical disconnects to *all* connections for this player.
Expand All @@ -278,8 +263,7 @@ impl RpcServer {
// should be modified to reflect that.
pub(crate) async fn disconnect(&self, player: Objid) -> Result<(), SessionError> {
warn!("Disconnecting player: {}", player);
let connections = self.connections.connection_records_for(player).await?;
let all_client_ids = connections.iter().map(|c| c.client_id).collect_vec();
let all_client_ids = self.connections.client_ids_for(player).await?;

let mut publish = self.publish.lock().await;
let event = ConnectionEvent::Disconnect();
Expand Down Expand Up @@ -500,7 +484,7 @@ impl RpcServer {

if let Err(e) = self
.connections
.activity_for_client(client_id, connection)
.record_client_activity(client_id, connection)
.await
{
warn!("Unable to update client connection activity: {}", e);
Expand Down Expand Up @@ -570,7 +554,7 @@ impl RpcServer {
) -> Result<RpcResponse, RpcRequestError> {
if let Err(e) = self
.connections
.activity_for_client(client_id, connection)
.record_client_activity(client_id, connection)
.await
{
warn!("Unable to update client connection activity: {}", e);
Expand Down Expand Up @@ -715,8 +699,7 @@ impl RpcServer {
increment_counter!("rpc_server.publish_narrative_events");
let mut publish = self.publish.lock().await;
for (player, event) in events {
let connections = self.connections.connection_records_for(*player).await?;
let client_ids = connections.iter().map(|c| c.client_id).collect_vec();
let client_ids = self.connections.client_ids_for(*player).await?;
let event = ConnectionEvent::Narrative(*player, event.clone());
let event_bytes = bincode::encode_to_vec(&event, bincode::config::standard())?;
for client_id in &client_ids {
Expand Down
13 changes: 8 additions & 5 deletions crates/daemon/src/rpc_session.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use crate::rpc_server::RpcServer;
use async_trait::async_trait;
use moor_kernel::tasks::sessions::{Session, SessionError};
use moor_values::model::NarrativeEvent;
use moor_values::var::objid::Objid;
use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::Mutex;
use tracing::trace;
use uuid::Uuid;

use moor_kernel::tasks::sessions::{Session, SessionError};
use moor_values::model::NarrativeEvent;
use moor_values::var::objid::Objid;

use crate::rpc_server::RpcServer;

/// A "session" that runs over the RPC system.
pub struct RpcSession {
client_id: Uuid,
Expand Down
2 changes: 1 addition & 1 deletion crates/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl DatabaseBuilder {
DatabaseType::Tuplebox => {
let (db, fresh) = TupleBoxWorldStateSource::open(
self.path.clone(),
self.memory_size.unwrap_or(1 << 48),
self.memory_size.unwrap_or(1 << 40),
)
.await;
Ok((Box::new(db), fresh))
Expand Down
4 changes: 3 additions & 1 deletion crates/db/src/tuplebox/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ pub mod tb_worldstate;
mod tuples;
mod tx;

pub use tx::transaction::Transaction;

#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct RelationId(usize);
pub struct RelationId(pub usize);

impl RelationId {
pub fn transient(id: usize) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion crates/db/src/tuplebox/tuples/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod tx_tuple;

pub struct TupleRef {
sb: Arc<SlotBox>,
id: TupleId,
pub id: TupleId,
}

impl PartialEq for TupleRef {
Expand Down
7 changes: 7 additions & 0 deletions crates/db/src/tuplebox/tx/relvar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,11 @@ impl<'a> RelVar<'a> {
pub async fn remove_by_domain(&self, domain: SliceRef) -> Result<(), TupleError> {
self.tx.remove_by_domain(self.id, domain).await
}

pub async fn predicate_scan<F: Fn(&(SliceRef, SliceRef)) -> bool>(
&self,
f: &F,
) -> Result<Vec<(SliceRef, SliceRef)>, TupleError> {
self.tx.predicate_scan(self.id, f).await
}
}
42 changes: 41 additions & 1 deletion crates/db/src/tuplebox/tx/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ mod tests {
use crate::tuplebox::tb::{RelationInfo, TupleBox};
use crate::tuplebox::tuples::TupleError;
use crate::tuplebox::tx::transaction::CommitError;
use crate::tuplebox::RelationId;
use crate::tuplebox::{RelationId, Transaction};

fn attr(slice: &[u8]) -> SliceRef {
SliceRef::from_bytes(slice)
Expand Down Expand Up @@ -780,6 +780,46 @@ mod tests {
}
}

/// Test some few secondary index scenarios:
/// a->b, b->b, c->b = b->{a,b,c} -- before and after commit
#[tokio::test]
async fn secondary_indices() {
let db = test_db().await;
let rid = RelationId(0);
let tx = db.clone().start_tx();
tx.insert_tuple(rid, attr(b"a"), attr(b"b")).await.unwrap();
tx.insert_tuple(rid, attr(b"b"), attr(b"b")).await.unwrap();
tx.insert_tuple(rid, attr(b"c"), attr(b"b")).await.unwrap();

async fn verify(tx: &Transaction, expected: Vec<&[u8]>) {
let b_results = tx
.seek_by_codomain(RelationId(0), attr(b"b"))
.await
.unwrap();

let mut domains = b_results
.iter()
.map(|(d, _)| d.as_slice())
.collect::<Vec<&[u8]>>();
domains.sort();
assert_eq!(domains, expected);
}
verify(&tx, vec![b"a", b"b", b"c"]).await;

tx.commit().await.unwrap();

let tx = db.clone().start_tx();
verify(&tx, vec![b"a", b"b", b"c"]).await;

// Add another one, in our new transaction
tx.insert_tuple(rid, attr(b"d"), attr(b"b")).await.unwrap();
verify(&tx, vec![b"a", b"b", b"c", b"d"]).await;

// And remove one
tx.remove_by_domain(rid, attr(b"c")).await.unwrap();
verify(&tx, vec![b"a", b"b", b"d"]).await;
}

#[tokio::test]
async fn predicate_scan_with_predicate() {
let db = test_db().await;
Expand Down
5 changes: 3 additions & 2 deletions crates/db/src/tuplebox/tx/working_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ impl WorkingSet {
// By performing the seek, we'll materialize the tuples into our local working set, which
// will in turn update the codomain index for those tuples.
for tuples in tuples_for_codomain {
self.seek_by_domain(db.clone(), relation_id, tuples.get().domain())
.await?;
let _ = self
.seek_by_domain(db.clone(), relation_id, tuples.get().domain())
.await;
}

let relation = &mut self.relations[relation_id.0];
Expand Down

0 comments on commit 34cd774

Please sign in to comment.