Skip to content

Commit

Permalink
After looking at charts in cachegrind... Switch from SparseChunk to h…
Browse files Browse the repository at this point in the history
…and-rolled BitArray borrowed from my adaptive radix tree project. Faster.
  • Loading branch information
rdaum committed Jan 6, 2024
1 parent d808b11 commit bde75ec
Show file tree
Hide file tree
Showing 13 changed files with 631 additions and 59 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ io-uring.workspace = true
hi_sparse_bitset.workspace = true
tokio-eventfd.workspace = true
fast-counter.workspace = true
num-traits.workspace = true

# For testing & benching common bits
serde_json.workspace = true
Expand Down
16 changes: 8 additions & 8 deletions crates/db/benches/tb_single_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion};
use moor_db::testing::jepsen::{History, Type, Value};
use moor_db::tuplebox::{RelationInfo, TupleBox};
use moor_values::util::slice_ref::SliceRef;
use sized_chunks::SparseChunk;
use std::sync::Arc;
use std::time::{Duration, Instant};

// This is a struct that tells Criterion.rs to use the "futures" crate's current-thread executor
use moor_db::tuplebox::{RelationId, Transaction};
use moor_values::util::{BitArray, Bitset64};
use tokio::runtime::Runtime;

/// Build a test database with a bunch of relations
Expand Down Expand Up @@ -66,19 +66,19 @@ fn load_history() -> Vec<History> {
async fn list_append_workload(
db: Arc<TupleBox>,
events: &Vec<History>,
processes: &mut SparseChunk<Arc<Transaction>, 64>,
processes: &mut BitArray<Arc<Transaction>, 64, Bitset64<1>>,
) {
for e in events {
match e.r#type {
Type::invoke => {
// Start a transaction.
let tx = Arc::new(db.clone().start_tx());
let existing = processes.insert(e.process as usize, tx.clone());
assert!(
existing.is_none(),
"T{} already exists uncommitted",
!processes.check(e.process as usize),
"T{} already exists committed",
e.process
);
processes.set(e.process as usize, tx.clone());
// Execute the actions
for ev in &e.value {
match ev {
Expand Down Expand Up @@ -106,11 +106,11 @@ async fn list_append_workload(
}
}
Type::ok => {
let tx = processes.remove(e.process as usize).unwrap();
let tx = processes.erase(e.process as usize).unwrap();
tx.commit().await.unwrap();
}
Type::fail => {
let tx = processes.remove(e.process as usize).unwrap();
let tx = processes.erase(e.process as usize).unwrap();
tx.rollback().await.unwrap();
}
}
Expand All @@ -123,7 +123,7 @@ async fn do_insert_workload(iters: u64, events: &Vec<History>) -> Duration {
let db = test_db().await;

// Where to track the transactions running.
let mut processes = SparseChunk::new();
let mut processes = BitArray::new();

let start = Instant::now();
list_append_workload(db, events, &mut processes).await;
Expand Down
4 changes: 2 additions & 2 deletions crates/db/src/tuplebox/coldstorage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,14 @@ impl ColdStorage {
// The pages that are modified will be need be read-locked while they are copied.
let mut dirty_pages = HashSet::new();
for r in ws.relations.iter() {
for t in r.tuples() {
for t in r.1.tuples() {
match t {
TxTuple::Insert(_) | TxTuple::Update(_) | TxTuple::Tombstone { .. } => {
let TupleId {
page: page_id,
slot: _slot_id,
} = t.tuple_id();
dirty_pages.insert((page_id, r.id));
dirty_pages.insert((page_id, r.1.id));
}
TxTuple::Value(_) => {
// Untouched value (view), noop, should already exist in backing store.
Expand Down
8 changes: 4 additions & 4 deletions crates/db/src/tuplebox/tb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ impl TupleBox {
) -> Result<CommitSet, CommitError> {
let mut commitset = CommitSet::new(commit_ts);

for (relation_id, local_relation) in tx_working_set.relations.iter().enumerate() {
let relation_id = RelationId(relation_id);
for (_, local_relation) in tx_working_set.relations.iter() {
let relation_id = local_relation.id;
// scan through the local working set, and for each tuple, check to see if it's safe to
// commit. If it is, then we'll add it to the commit set.
// note we're not actually committing yet, just producing a candidate commit set
Expand Down Expand Up @@ -267,7 +267,7 @@ impl TupleBox {
// We have to hold a lock during the duration of this. If we fail, we will loop back
// and retry.
let mut canonical = self.canonical.write().await;
for relation in commit_set.iter() {
for (_, relation) in commit_set.iter() {
// Did the relation get committed to by someone else in the interim? If so, return
// back to the transaction letting it know that, and it can decide if it wants to
// retry.
Expand All @@ -279,7 +279,7 @@ impl TupleBox {
// Everything passed, so we can commit the changes by swapping in the new canonical
// before releasing the lock.
let commit_ts = commit_set.ts;
for relation in commit_set.into_iter() {
for (_, relation) in commit_set.into_iter() {
let idx = relation.id.0;
canonical[idx] = relation;
// And update the timestamp on the canonical relation.
Expand Down
20 changes: 10 additions & 10 deletions crates/db/src/tuplebox/tuples/slotbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use std::pin::Pin;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Mutex};

use sized_chunks::SparseChunk;
use moor_values::util::{BitArray, Bitset64};
use thiserror::Error;
use tracing::error;

Expand Down Expand Up @@ -188,16 +188,16 @@ impl SlotBox {
}

pub fn num_pages(&self) -> usize {
let inner = self.inner.lock().unwrap();
inner.available_page_space.len()
let mut inner = self.inner.lock().unwrap();
inner.available_page_space.size()
}

pub fn used_pages(&self) -> Vec<PageId> {
let allocator = self.inner.lock().unwrap();
allocator
.available_page_space
.iter()
.map(|ps| ps.pages())
.map(|(_, ps)| ps.pages())
.flatten()
.collect()
}
Expand All @@ -209,7 +209,7 @@ struct Inner {
// so we can maybe get rid of the locks in the buffer pool...
pool: BufferPool,
/// The set of used pages, indexed by relation, in sorted order of the free space available in them.
available_page_space: SparseChunk<PageSpace, 64>,
available_page_space: BitArray<PageSpace, 64, Bitset64<1>>,
/// The "swizzelable" references to tuples, indexed by tuple id.
/// There has to be a stable-memory address for each of these, as they are referenced by
/// pointers in the TupleRefs themselves.
Expand All @@ -221,7 +221,7 @@ struct Inner {
impl Inner {
fn new(pool: BufferPool) -> Self {
Self {
available_page_space: SparseChunk::new(),
available_page_space: BitArray::new(),
pool,
swizrefs: HashMap::new(),
}
Expand Down Expand Up @@ -297,7 +297,7 @@ impl Inner {
let bid = Bid(pid as u64);
let Some(available_page_space) = self.available_page_space.get_mut(relation_id.0) else {
self.available_page_space
.insert(relation_id.0, PageSpace::new(free_space, bid));
.set(relation_id.0, PageSpace::new(free_space, bid));
return;
};

Expand Down Expand Up @@ -352,7 +352,7 @@ impl Inner {
Ok((bid.0 as PageId, available_page_space.len() - 1))
}
None => {
self.available_page_space.insert(
self.available_page_space.set(
relation_id.0,
PageSpace::new(slot_page_empty_size(actual_size), bid),
);
Expand Down Expand Up @@ -394,14 +394,14 @@ impl Inner {
offset: usize,
page_remaining_bytes: usize,
) {
let available_page_space = &mut self.available_page_space[relation_id.0];
let available_page_space = self.available_page_space.get_mut(relation_id.0).unwrap();
available_page_space.finish(offset, page_remaining_bytes);
}

fn report_free(&mut self, pid: PageId, new_size: usize, is_empty: bool) {
// Seek the page in the available_page_space vectors, and add the bytes back to its free space.
// We don't know the relation id here, so we have to linear scan all of them.
for available_page_space in self.available_page_space.iter_mut() {
for (_, available_page_space) in self.available_page_space.iter_mut() {
if available_page_space.update_page(pid, new_size, is_empty) {
if is_empty {
self.pool
Expand Down
2 changes: 2 additions & 0 deletions crates/db/src/tuplebox/tuples/slotted_page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ impl<'a> PageWriteGuard<'a> {
sp.get_slot_mut(slot_id)
}

#[inline]
pub fn allocate(
&mut self,
size: usize,
Expand All @@ -620,6 +621,7 @@ impl<'a> PageWriteGuard<'a> {
};
sp.allocate(size, initial_value)
}

pub fn remove_slot(&mut self, slot_id: SlotId) -> Result<(usize, usize, bool), SlotBoxError> {
let sp = SlottedPage {
base_address: self.base_address,
Expand Down
14 changes: 7 additions & 7 deletions crates/db/src/tuplebox/tx/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use sized_chunks::SparseChunk;
use moor_values::util::{BitArray, Bitset64};
use thiserror::Error;
use tokio::sync::RwLock;

Expand Down Expand Up @@ -307,26 +307,26 @@ impl Transaction {
/// working set.
pub struct CommitSet {
pub(crate) ts: u64,
relations: SparseChunk<BaseRelation, 64>,
relations: BitArray<BaseRelation, 64, Bitset64<1>>,
}

impl CommitSet {
pub(crate) fn new(ts: u64) -> Self {
Self {
ts,
relations: SparseChunk::new(),
relations: BitArray::new(),
}
}

/// Returns an iterator over the modified relations in the commit set.
pub(crate) fn iter(&self) -> impl Iterator<Item = &BaseRelation> {
pub(crate) fn iter(&self) -> impl Iterator<Item = (usize, &BaseRelation)> {
return self.relations.iter();
}

/// Returns an iterator over the modified relations in the commit set, moving and consuming the
/// commit set in the process.
pub(crate) fn into_iter(self) -> impl IntoIterator<Item = BaseRelation> {
self.relations.into_iter()
pub(crate) fn into_iter(self) -> impl IntoIterator<Item = (usize, BaseRelation)> {
self.relations.take_all().into_iter()
}

/// Fork the given base relation into the commit set, if it's not already there.
Expand All @@ -337,7 +337,7 @@ impl CommitSet {
) -> &mut BaseRelation {
if self.relations.get(relation_id.0).is_none() {
let r = canonical.clone();
self.relations.insert(relation_id.0, r);
self.relations.set(relation_id.0, r);
}
self.relations.get_mut(relation_id.0).unwrap()
}
Expand Down
Loading

0 comments on commit bde75ec

Please sign in to comment.