Skip to content

Commit

Permalink
Next-gen buffer pool
Browse files Browse the repository at this point in the history
Based roughly on the "Umbra" presentation/paper:

  * Adds support for variable length pages
  * Multiple "size classes" for different page sizes
  * Page size is selected based on (expected) tuple size.

The intent is to be able to support large pages for big values while
maintaining small-sized pages for typical workloads.

NOTE: While the buffer pool itself is fairly well tested by me (elsewhere),
  there are, I think, consistency issues around PageStorage, and with
  refcounting on tuples, that are leading to bugs. I would not consider
  this "production worthy" yet.
  • Loading branch information
rdaum committed Jan 2, 2024
1 parent ca9cd47 commit 8a40789
Show file tree
Hide file tree
Showing 21 changed files with 985 additions and 271 deletions.
2 changes: 1 addition & 1 deletion .licensure.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ excludes:
- README.*
- LICENSE.*
- Dockerfile
- .*\.(md|rst|txt|yml|png|jpg|gif|db|pem|lock|json|toml)
- .*\.(md|rst|txt|yml|png|jpg|gif|db|pem|lock|json|toml|in|out)
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ criterion = { version = "0.5.1", features = ["async_tokio"] }
crossbeam-channel = "0.5.10"
decorum = "0.3.1" # For ordering & comparing our floats
enum-primitive-derive = "0.3.0"
human_bytes = "0.4.3"
inventory = "0.3.14"
itertools = "0.12.0"
lazy_static = "1.4.0"
Expand Down
10 changes: 1 addition & 9 deletions crates/daemon/src/connections_tb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use rpc_common::RpcRequestError;
use crate::connections::{ConnectionsDB, CONNECTION_TIMEOUT_DURATION};

const CONNECTIONS_DB_MEM_SIZE: usize = 1 << 26;
const CONNECTIONS_DB_PAGE_SIZE: usize = 32768;
pub struct ConnectionsTb {
tb: Arc<TupleBox>,
}
Expand All @@ -56,14 +55,7 @@ impl ConnectionsTb {
.collect();
relations[ConnectionRelation::ClientConnection as usize].secondary_indexed = true;

let tb = TupleBox::new(
CONNECTIONS_DB_MEM_SIZE,
CONNECTIONS_DB_PAGE_SIZE,
path,
&relations,
1,
)
.await;
let tb = TupleBox::new(CONNECTIONS_DB_MEM_SIZE, path, &relations, 1).await;
Self { tb }
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ moor-values = { path = "../values" }
async-trait.workspace = true
strum.workspace = true
uuid.workspace = true
lazy_static.workspace = true
human_bytes.workspace = true

## Error declaration/ handling
thiserror.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/db/benches/tb_single_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn test_db() -> Arc<TupleBox> {
})
.collect::<Vec<_>>();

TupleBox::new(1 << 24, 4096, None, &relations, 0).await
TupleBox::new(1 << 24, None, &relations, 0).await
}

fn from_val(value: i64) -> SliceRef {
Expand Down
3 changes: 1 addition & 2 deletions crates/db/src/object_relations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,7 @@ mod tests {
relations[ObjectParent as usize].secondary_indexed = true;
relations[WorldStateRelation::ObjectLocation as usize].secondary_indexed = true;


TupleBox::new(1 << 24, 32768, None, &relations, WorldStateSequences::COUNT).await
TupleBox::new(1 << 24, None, &relations, WorldStateSequences::COUNT).await
}

/// Test simple relations mapping oid->oid (with secondary index), independent of all other
Expand Down
15 changes: 2 additions & 13 deletions crates/db/src/tb_worldstate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ use crate::tuplebox::tb::{RelationInfo, TupleBox};
use crate::tuplebox::{CommitError, Transaction};
use crate::{object_relations, tuplebox, Database};

// TODO: Totally arbitrary and needs profiling. Needs to be big enough to hold entire props and
// verbs.
const PAGE_SIZE: usize = 65536;

/// An implementation of `WorldState` / `WorldStateSource` that uses the TupleBox as its backing
pub struct TupleBoxWorldStateSource {
db: Arc<TupleBox>,
Expand All @@ -73,14 +69,7 @@ impl TupleBoxWorldStateSource {
relations[WorldStateRelation::ObjectParent as usize].secondary_indexed = true;
// Same with "contents".
relations[WorldStateRelation::ObjectLocation as usize].secondary_indexed = true;
let db = TupleBox::new(
memory_size,
PAGE_SIZE,
path,
&relations,
WorldStateSequences::COUNT,
)
.await;
let db = TupleBox::new(memory_size, path, &relations, WorldStateSequences::COUNT).await;

// Check the db for sys (#0) object to see if this is a fresh DB or not.
let fresh_db = {
Expand Down Expand Up @@ -1140,7 +1129,7 @@ mod tests {
relations[WorldStateRelation::ObjectParent as usize].secondary_indexed = true;
relations[WorldStateRelation::ObjectLocation as usize].secondary_indexed = true;

TupleBox::new(1 << 24, 4096, None, &relations, WorldStateSequences::COUNT).await
TupleBox::new(1 << 24, None, &relations, WorldStateSequences::COUNT).await
}

#[tokio::test]
Expand Down
48 changes: 16 additions & 32 deletions crates/db/src/tuplebox/coldstorage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ impl ColdStorage {
};

// Grab page storage and wait for all the writes to complete.
let mut cs = page_storage.lock().unwrap();
cs.wait_complete();
let mut ps = page_storage.lock().unwrap();
ps.wait_complete();

// Get the sequence page, and load the sequences from it, if any.
if let Ok(Some(sequence_page)) = cs.read_sequence_page() {
if let Ok(Some(sequence_page)) = ps.read_sequence_page() {
let sequence_page = sequence_page::View::new(&sequence_page[..]);
let num_sequences = sequence_page.num_sequences().read();
assert_eq!(num_sequences, sequences.len() as u64,
Expand All @@ -98,13 +98,13 @@ impl ColdStorage {
}

// Recover all the pages from cold storage and re-index all the tuples in them.
let ids = cs.list_pages();
let ids = ps.list_pages();
let mut restored_slots = HashMap::new();
let mut restored_bytes = 0;
for (page_size, page_num, relation_id) in ids {
let sb_page = slot_box.page_for(page_num);
let sb_page = slot_box.restore(page_num).expect("Unable to get page");
let slot_ids = sb_page.load(|buf| {
cs.read_page_buf(page_num, relation_id, buf)
ps.read_page_buf(page_num, relation_id, buf)
.expect("Unable to read page")
});
// The allocator needs to know that this page is used.
Expand Down Expand Up @@ -217,13 +217,11 @@ impl ColdStorage {
// For syncing pages, we don't need to sync each individual tuple, we we just find the set of dirty pages
// and sync them.
// The pages that are modified will be need be read-locked while they are copied.
let mut dirty_tuple_count = 0;
let mut dirty_pages = HashSet::new();
for r in &ws.relations {
for t in r.tuples() {
match t {
TxTuple::Insert(_) | TxTuple::Update(_) | TxTuple::Tombstone { .. } => {
dirty_tuple_count += 1;
let (page_id, _slot_id) = t.tuple_id();
dirty_pages.insert((page_id, r.id));
}
Expand All @@ -234,12 +232,12 @@ impl ColdStorage {
}
}

let mut total_synced_tuples = 0;

for (page_id, r) in &dirty_pages {
// Get the page for this tuple.
let page = slot_box.page_for(*page_id);
total_synced_tuples += page.num_active_slots();
// Get the slotboxy page for this tuple.
let Ok(page) = slot_box.page_for(*page_id) else {
// If the slot or page is already gone, ce la vie, we don't need to sync it.
continue;
};

// Copy the page into the WAL entry directly.
let wal_entry_buffer = make_wal_entry(
Expand All @@ -254,21 +252,6 @@ impl ColdStorage {
write_batch.push((*page_id, Some(wal_entry_buffer)));
}

let mut total_tuples = 0;
for p in slot_box.used_pages() {
let page = slot_box.page_for(p);
total_tuples += page.num_active_slots();
}

debug!(
dirty_tuple_count,
dirt_pages = dirty_pages.len(),
num_relations = ws.relations.len(),
total_synced_tuples,
total_tuples,
"Syncing dirty pages to WAL"
);

let mut sync_wal = wal.begin_entry().expect("Failed to begin WAL entry");
for (_page_id, wal_entry_buf) in write_batch {
if let Some(wal_entry_buf) = wal_entry_buf {
Expand Down Expand Up @@ -305,8 +288,9 @@ impl LogManager for WalManager {
for chunk in chunks {
Self::chunk_to_mutations(&chunk, &mut write_batch, &mut evicted);
}
let mut cs = self.page_storage.lock().unwrap();
cs.write_batch(write_batch).expect("Unable to write batch");
let mut ps = self.page_storage.lock().unwrap();
ps.write_batch(write_batch).expect("Unable to write batch");

Ok(())
}

Expand Down Expand Up @@ -336,11 +320,11 @@ impl LogManager for WalManager {
}
}

let Ok(mut cs) = self.page_storage.lock() else {
let Ok(mut ps) = self.page_storage.lock() else {
error!("Unable to lock cold storage");
return Ok(());
};
if let Err(e) = cs.write_batch(write_batch) {
if let Err(e) = ps.write_batch(write_batch) {
error!("Unable to write batch: {:?}", e);
return Ok(());
};
Expand Down
1 change: 1 addition & 0 deletions crates/db/src/tuplebox/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ mod base_relation;

mod coldstorage;
mod page_storage;
mod pool;
mod slots;
pub mod tb;
mod tuples;
Expand Down
2 changes: 2 additions & 0 deletions crates/db/src/tuplebox/page_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// this program. If not, see <https://www.gnu.org/licenses/>.
//

// TODO: there's no way this is "robust" enough to be used in production

use crate::tuplebox::slots::PageId;
use crate::tuplebox::RelationId;
use im::{HashMap, HashSet};
Expand Down
Loading

0 comments on commit 8a40789

Please sign in to comment.