Commit
Some refactoring in db/tuplebox/
  * Changes to the structure of how relations are managed in the transaction.
  * More work towards supporting transient local relations.
rdaum committed Oct 11, 2023
1 parent 4ed08d2 commit 90d086c
Showing 13 changed files with 847 additions and 460 deletions.
22 changes: 19 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -71,6 +71,7 @@ strum = { version = "0.25.0", features = ["derive"] }
text_io = "0.1.12" # Used for reading text dumps.
uuid = { version = "1.4.1", features = ["v4"] }
yoke = "0.7.1"
sized-chunks = "0.7.0"

## Required for MOO builtins.
pwhash = "1.0.0" # For MOO's hokey "crypt" function, which is unix's crypt(3) basically
1 change: 1 addition & 0 deletions crates/db/Cargo.toml
@@ -35,4 +35,5 @@ rocksdb.workspace = true
crossbeam-channel.workspace = true
bincode.workspace = true
im.workspace = true
sized-chunks.workspace = true

2 changes: 1 addition & 1 deletion crates/db/src/tuplebox/backing.rs
@@ -2,7 +2,7 @@
//! Used for write-ahead type storage at commit-time, and backed by whatever preferred physical
//! storage mechanism is desired.
use crate::tuplebox::transaction::WorkingSet;
use crate::tuplebox::working_set::WorkingSet;
use tokio::sync::mpsc::UnboundedSender;
pub struct BackingStoreClient {
sender: UnboundedSender<WriterMessage>,
6 changes: 5 additions & 1 deletion crates/db/src/tuplebox/base_relation.rs
@@ -1,3 +1,4 @@
use crate::tuplebox::RelationId;
use moor_values::util::slice_ref::SliceRef;
use std::collections::HashSet;

@@ -9,6 +10,8 @@ use std::collections::HashSet;
// type-checking on the values, though for our purposes this may be overkill at this time.
#[derive(Clone)]
pub struct BaseRelation {
pub(crate) id: RelationId,

/// The last successful committer's tx timestamp
pub(crate) ts: u64,

@@ -22,8 +25,9 @@ pub struct BaseRelation {
}

impl BaseRelation {
pub fn new(timestamp: u64) -> Self {
pub fn new(id: RelationId, timestamp: u64) -> Self {
Self {
id,
ts: timestamp,
domain_tuples: im::HashMap::new(),
codomain_domain: None,
21 changes: 20 additions & 1 deletion crates/db/src/tuplebox/mod.rs
@@ -11,8 +11,27 @@
mod backing;
mod base_relation;
mod object_relations;
mod transaction;
mod working_set;

pub mod relvar;
pub mod rocks_backing;
pub mod tb;
pub mod tb_worldstate;

mod transaction;
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct RelationId(usize);

impl RelationId {
pub fn transient(id: usize) -> Self {
RelationId(id | (1 << 63))
}

// If the top (63rd) bit is not set, then this is a base relation.
pub fn is_base_relation(&self) -> bool {
self.0 & (1 << 63) == 0
}
pub fn is_transient_relation(&self) -> bool {
!self.is_base_relation()
}
}
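The transient/base distinction lives entirely in the top bit of the wrapped `usize`, so a small crate-internal test can make the encoding explicit. The test below is a hedged sketch (not part of this commit) and, like the code above, assumes a 64-bit `usize`:

```rust
// Hypothetical crate-internal test exercising the id-tagging scheme above:
// base relations use the raw index; transient relations set bit 63 and leave
// the low bits untouched, so the underlying index stays recoverable.
#[cfg(test)]
mod relation_id_tests {
    use super::RelationId;

    #[test]
    fn transient_bit_tagging() {
        let base = RelationId(3);
        let scratch = RelationId::transient(3);

        assert!(base.is_base_relation());
        assert!(scratch.is_transient_relation());
        // Clearing the tag bit recovers the original index.
        assert_eq!(scratch.0 & !(1 << 63), 3);
    }
}
```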
40 changes: 26 additions & 14 deletions crates/db/src/tuplebox/object_relations.rs
@@ -1,4 +1,5 @@
use crate::tuplebox::transaction::{Transaction, TupleError};
use crate::tuplebox::RelationId;
use moor_values::model::objset::ObjSet;
use moor_values::model::WorldStateError;
use moor_values::util::slice_ref::SliceRef;
@@ -33,6 +34,12 @@ pub enum WorldStateRelation {
ObjectPropertyValue = 8,
}

impl Into<RelationId> for WorldStateRelation {
fn into(self) -> RelationId {
RelationId(self as usize)
}
}

#[repr(usize)]
#[derive(Copy, Clone, Debug, Eq, PartialEq, EnumIter, EnumCount)]
pub enum WorldStateSequences {
@@ -50,7 +57,8 @@ pub async fn upsert_object_value<Codomain: Clone + Eq + PartialEq + AsByteBuffer
// TODO: copy might not be needed here.
let value = SliceRef::from_vec(value.make_copy_as_vec());

if let Err(e) = tx.upsert_tuple(rel as usize, &key_bytes, value).await {
let relation = tx.relation(RelationId(rel as usize)).await;
if let Err(e) = relation.upsert_tuple(&key_bytes, value).await {
panic!("Unexpected error: {:?}", e)
}
Ok(())
@@ -65,8 +73,8 @@ pub async fn insert_object_value<Codomain: Clone + Eq + PartialEq + AsByteBuffer
) -> Result<(), WorldStateError> {
let key_bytes = oid.0.to_le_bytes();
let value = SliceRef::from_vec(value.make_copy_as_vec());

match tx.insert_tuple(rel as usize, &key_bytes, value).await {
let relation = tx.relation(RelationId(rel as usize)).await;
match relation.insert_tuple(&key_bytes, value).await {
Ok(_) => Ok(()),
Err(TupleError::Duplicate) => {
Err(WorldStateError::DatabaseError("Duplicate key".to_string()))
@@ -81,7 +89,8 @@ pub async fn get_object_value<Codomain: Clone + Eq + PartialEq + AsByteBuffer>(
oid: Objid,
) -> Option<Codomain> {
let key_bytes = oid.0.to_le_bytes();
match tx.seek_by_domain(rel as usize, &key_bytes).await {
let relation = tx.relation(RelationId(rel as usize)).await;
match relation.seek_by_domain(&key_bytes).await {
Ok(v) => Some(Codomain::from_sliceref(v)),
Err(TupleError::NotFound) => None,
Err(e) => panic!("Unexpected error: {:?}", e),
@@ -93,9 +102,9 @@ pub async fn get_object_codomain<Codomain: Clone + Eq + PartialEq + AsByteBuffer
rel: WorldStateRelation,
codomain: Codomain,
) -> ObjSet {
// Transaction-side support for the reverse index is not yet implemented.
let objs = tx
.seek_by_codomain(rel as usize, &codomain.make_copy_as_vec())
let relation = tx.relation(RelationId(rel as usize)).await;
let objs = relation
.seek_by_codomain(&codomain.make_copy_as_vec())
.await
.expect("Unable to seek by codomain")
.into_iter()
@@ -113,7 +122,8 @@ pub async fn get_composite_value<Codomain: Clone + Eq + PartialEq + AsByteBuffer
uuid: Uuid,
) -> Option<Codomain> {
let key_bytes = composite_key_for(oid, &uuid);
match tx.seek_by_domain(rel as usize, &key_bytes).await {
let relation = tx.relation(RelationId(rel as usize)).await;
match relation.seek_by_domain(&key_bytes).await {
Ok(v) => Some(Codomain::from_sliceref(v)),
Err(TupleError::NotFound) => None,
Err(e) => panic!("Unexpected error: {:?}", e),
@@ -130,8 +140,8 @@ async fn insert_composite_value<Codomain: Clone + Eq + PartialEq + AsByteBuffer>(
) -> Result<(), WorldStateError> {
let key_bytes = composite_key_for(oid, &uuid);
let value = SliceRef::from_vec(value.make_copy_as_vec());

match tx.insert_tuple(rel as usize, &key_bytes, value).await {
let relation = tx.relation(RelationId(rel as usize)).await;
match relation.insert_tuple(&key_bytes, value).await {
Ok(_) => Ok(()),
Err(TupleError::Duplicate) => Err(WorldStateError::ObjectNotFound(oid)),
Err(e) => panic!("Unexpected error: {:?}", e),
@@ -145,7 +155,8 @@ async fn delete_if_exists<Codomain: Clone + Eq + PartialEq + AsByteBuffer>(
oid: Objid,
) -> Result<(), WorldStateError> {
let key_bytes = oid.0.to_le_bytes();
match tx.remove_by_domain(rel as usize, &key_bytes).await {
let relation = tx.relation(RelationId(rel as usize)).await;
match relation.remove_by_domain(&key_bytes).await {
Ok(_) => Ok(()),
Err(TupleError::NotFound) => Ok(()),
Err(e) => panic!("Unexpected error: {:?}", e),
@@ -159,7 +170,8 @@ pub async fn delete_composite_if_exists<Codomain: Clone + Eq + PartialEq + AsByt
uuid: Uuid,
) -> Result<(), WorldStateError> {
let key_bytes = composite_key_for(oid, &uuid);
match tx.remove_by_domain(rel as usize, &key_bytes).await {
let relation = tx.relation(RelationId(rel as usize)).await;
match relation.remove_by_domain(&key_bytes).await {
Ok(_) => Ok(()),
Err(TupleError::NotFound) => Ok(()),
Err(e) => panic!("Unexpected error: {:?}", e),
@@ -175,8 +187,8 @@ pub async fn upsert_obj_uuid_value<Codomain: Clone + Eq + PartialEq + AsByteBuff
) -> Result<(), WorldStateError> {
let key_bytes = composite_key_for(oid, &uuid);
let value = SliceRef::from_vec(value.make_copy_as_vec());

if let Err(e) = tx.upsert_tuple(rel as usize, &key_bytes, value).await {
let relation = tx.relation(RelationId(rel as usize)).await;
if let Err(e) = relation.upsert_tuple(&key_bytes, value).await {
panic!("Unexpected error: {:?}", e)
}
Ok(())
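Every accessor in this file now follows the same shape: convert the `WorldStateRelation` into a `RelationId`, ask the transaction for a relation handle, then run the tuple operation through that handle. Below is a hedged sketch of that shape as a hypothetical helper (not part of this commit), assuming the same imports as the surrounding file; the error mapping is an illustrative choice rather than this file's convention of panicking:

```rust
// Hypothetical helper illustrating the handle-then-operate pattern used by the
// functions above.
async fn upsert_for_object<Codomain: Clone + Eq + PartialEq + AsByteBuffer>(
    tx: &Transaction,
    rel: WorldStateRelation,
    oid: Objid,
    value: Codomain,
) -> Result<(), WorldStateError> {
    let key_bytes = oid.0.to_le_bytes();
    let value = SliceRef::from_vec(value.make_copy_as_vec());

    // WorldStateRelation -> RelationId via the Into impl added above.
    let relation = tx.relation(rel.into()).await;
    relation
        .upsert_tuple(&key_bytes, value)
        .await
        .map_err(|e| WorldStateError::DatabaseError(format!("upsert failed: {:?}", e)))
}
```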
45 changes: 45 additions & 0 deletions crates/db/src/tuplebox/relvar.rs
@@ -0,0 +1,45 @@
use crate::tuplebox::transaction::{Transaction, TupleError};
use crate::tuplebox::RelationId;
use moor_values::util::slice_ref::SliceRef;
use std::collections::HashSet;

/// A reference / handle / pointer to a relation; the actual operations are managed through the
/// transaction.
/// A more convenient handle tied to the lifetime of the transaction.
pub struct RelVar<'a> {
pub(crate) tx: &'a Transaction,
pub(crate) id: RelationId,
}

impl<'a> RelVar<'a> {
/// Seek for a tuple by its indexed domain value.
pub async fn seek_by_domain(&self, domain: &[u8]) -> Result<SliceRef, TupleError> {
self.tx.seek_by_domain(self.id, domain).await
}

/// Seek for tuples by their indexed codomain value, if there's an index. Panics if there is no
/// secondary index.
pub async fn seek_by_codomain(&self, codomain: &[u8]) -> Result<HashSet<Vec<u8>>, TupleError> {
self.tx.seek_by_codomain(self.id, codomain).await
}

/// Insert a tuple into the relation.
pub async fn insert_tuple(&self, domain: &[u8], codomain: SliceRef) -> Result<(), TupleError> {
self.tx.insert_tuple(self.id, domain, codomain).await
}

/// Update a tuple in the relation.
pub async fn update_tuple(&self, domain: &[u8], codomain: SliceRef) -> Result<(), TupleError> {
self.tx.update_tuple(self.id, domain, codomain).await
}

/// Upsert a tuple into the relation.
pub async fn upsert_tuple(&self, domain: &[u8], codomain: SliceRef) -> Result<(), TupleError> {
self.tx.upsert_tuple(self.id, domain, codomain).await
}

/// Remove a tuple from the relation.
pub async fn remove_by_domain(&self, domain: &[u8]) -> Result<(), TupleError> {
self.tx.remove_by_domain(self.id, domain).await
}
}
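A brief usage sketch for the new handle (hypothetical, crate-internal, not part of this commit; assumes the same imports as this file): obtain the `RelVar` from a transaction, then read through it, treating `NotFound` as an absent value the way `object_relations` does.

```rust
// Hypothetical read helper; assumes Transaction::relation(RelationId) returns
// a RelVar, as used in object_relations.rs.
async fn lookup(tx: &Transaction, id: RelationId, key: &[u8]) -> Option<SliceRef> {
    let relation = tx.relation(id).await;
    match relation.seek_by_domain(key).await {
        Ok(v) => Some(v),
        Err(TupleError::NotFound) => None,
        Err(e) => panic!("unexpected tuple error: {:?}", e),
    }
}
```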
46 changes: 20 additions & 26 deletions crates/db/src/tuplebox/rocks_backing.rs
@@ -3,11 +3,12 @@
use crate::tuplebox::backing::{BackingStoreClient, WriterMessage};
use crate::tuplebox::base_relation::BaseRelation;
use crate::tuplebox::tb::RelationInfo;
use crate::tuplebox::transaction::{TupleOperation, WorkingSet};
use crate::tuplebox::transaction::TupleOperation;
use crate::tuplebox::working_set::WorkingSet;
use moor_values::util::slice_ref::SliceRef;
use rocksdb::{IteratorMode, DB};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedReceiver;
use tracing::{debug, error, info};

@@ -84,27 +85,20 @@ impl RocksBackingStore {
let bs = Arc::new(RocksBackingStore { db, schema });

let (writer_send, writer_receive) = tokio::sync::mpsc::unbounded_channel();
let abort_flag = Arc::new(Mutex::new(None));
tokio::spawn(bs.clone().listen_loop(writer_receive, abort_flag));
tokio::spawn(bs.clone().listen_loop(writer_receive));

BackingStoreClient::new(writer_send)
}

async fn listen_loop(
self: Arc<Self>,
mut writer_receive: UnboundedReceiver<WriterMessage>,
abort_flag: Arc<Mutex<Option<u64>>>,
) {
async fn listen_loop(self: Arc<Self>, mut writer_receive: UnboundedReceiver<WriterMessage>) {
loop {
let abort_flag = abort_flag.clone();
let (abort_send, abort_receive) = tokio::sync::watch::channel(0);
let bs = self.clone();
match writer_receive.recv().await {
Some(WriterMessage::Commit(ts, ws, sequences)) => {
debug!("Committing write-ahead for ts {}", ts);
{
*abort_flag.lock().unwrap() = Some(ts);
}
tokio::spawn(bs.perform_writes(ts, ws, sequences, abort_flag.clone()));
abort_send.send(ts).unwrap();
tokio::spawn(bs.perform_writes(ts, ws, sequences, abort_receive.clone()));
}
Some(WriterMessage::Shutdown) => {
info!("Shutting down RocksDB writer thread");
@@ -123,7 +117,7 @@ impl RocksBackingStore {
ts: u64,
committed_working_set: WorkingSet,
current_sequences: Vec<u64>,
abort: Arc<Mutex<Option<u64>>>,
abort: tokio::sync::watch::Receiver<u64>,
) {
// Write the current state of sequences first.
let seq_cf = self
@@ -141,22 +135,22 @@ impl RocksBackingStore {
}

// Go through the modified tuples and mutate the underlying column families
for (relation_id, local_relation) in
committed_working_set.local_relations.iter().enumerate()
{
for (relation_id, local_relation) in committed_working_set.0.iter().enumerate() {
let relation_info = &self.schema[relation_id];
let cf = self
.db
.cf_handle(relation_info.name.as_str())
.expect("Unable to open column family");
for (domain, tuple) in local_relation.domain_index.iter() {
let abort = abort.lock().unwrap();
if abort.is_none() || abort.unwrap() != ts {
debug!(
"Aborting write-ahead due to abort flag flip from {} to {:?}",
ts, abort
);
return;
for (domain, tuple) in local_relation.tuples() {
if let Ok(true) = abort.has_changed() {
let new_ts = abort.borrow();
if *new_ts != ts {
debug!(
"Aborting write-ahead due to abort flag flip from {} to {:?}",
ts, new_ts
);
return;
}
}
match &tuple.t {
TupleOperation::Insert(v)
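The shared `Arc<Mutex<Option<u64>>>` abort flag is replaced by a `tokio::sync::watch` channel: the listen loop publishes the timestamp of the newest commit, and an in-flight `perform_writes` task bails out as soon as it observes a value other than its own. A self-contained sketch of that pattern follows; the names and the chunked loop are illustrative, not this crate's API:

```rust
use tokio::sync::watch;

// Illustrative stand-in for perform_writes: do work in chunks, but stop early
// once a newer timestamp has been published on the watch channel.
async fn write_until_superseded(ts: u64, abort: watch::Receiver<u64>) {
    for chunk in 0..10u32 {
        if let Ok(true) = abort.has_changed() {
            if *abort.borrow() != ts {
                println!("write for ts {ts} superseded at chunk {chunk}, aborting");
                return;
            }
        }
        // ... write one chunk of tuples here ...
        tokio::task::yield_now().await;
    }
    println!("write for ts {ts} completed");
}

#[tokio::main]
async fn main() {
    let (abort_tx, abort_rx) = watch::channel(0u64);

    // A commit for ts = 1 starts writing in the background...
    let writer = tokio::spawn(write_until_superseded(1, abort_rx.clone()));

    // ...then a newer commit (ts = 2) is announced, which should abort it.
    abort_tx.send(2).unwrap();
    writer.await.unwrap();
}
```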