Skip to content

Commit

Permalink
A round of changes related to problems & style violations found by clippy
Browse files Browse the repository at this point in the history

(Note that in the process I found a bug in the slotbox reference counts established during the initial textdump load that will have to be resolved, though so far it seems a "harmless" warning is sufficient.)

Clippy should now produce clean output for an --all-targets --all-features run.
  • Loading branch information
rdaum committed Jan 7, 2024
1 parent 107e598 commit f090ce6
Show file tree
Hide file tree
Showing 23 changed files with 101 additions and 67 deletions.
20 changes: 9 additions & 11 deletions crates/compiler/src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,15 @@ fn parse_arglist(
names: Rc<RefCell<Names>>,
pairs: pest::iterators::Pairs<Rule>,
) -> Result<Vec<Arg>, CompileError> {
for pair in pairs {
match pair.as_rule() {
Rule::exprlist => {
return parse_exprlist(names, pair.into_inner());
}
_ => {
panic!("Unimplemented arglist: {:?}", pair);
}
}
}
Ok(vec![])
let Some(first) = pairs.peek() else {
return Ok(vec![]);
};

let Rule::exprlist = first.as_rule() else {
panic!("Unimplemented arglist: {:?}", first);
};

return parse_exprlist(names, first.into_inner());
}

fn parse_except_codes(
Expand Down
2 changes: 2 additions & 0 deletions crates/daemon/src/connections_tb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ impl ConnectionsTb {

#[repr(usize)]
#[derive(Copy, Clone, Debug, Eq, PartialEq, EnumIter, EnumCount, Display)]
// Don't warn about same-prefix, "I did that on purpose"
#[allow(clippy::enum_variant_names)]
enum ConnectionRelation {
// One to many, client id <-> connection/player object. Secondary index will seek on object id.
ClientConnection = 0,
Expand Down
6 changes: 2 additions & 4 deletions crates/daemon/src/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,8 +1003,7 @@ impl RpcServer {
client_id: Uuid,
) -> Result<(), SessionError> {
let key: Key<32> = Key::from(&self.keypair[32..]);
let pk: PasetoAsymmetricPublicKey<V4, Public> =
PasetoAsymmetricPublicKey::try_from(&key).unwrap();
let pk: PasetoAsymmetricPublicKey<V4, Public> = PasetoAsymmetricPublicKey::from(&key);
let verified_token = Paseto::<V4, Public>::try_verify(
token.0.as_str(),
&pk,
Expand Down Expand Up @@ -1059,8 +1058,7 @@ impl RpcServer {
objid: Option<Objid>,
) -> Result<Objid, SessionError> {
let key: Key<32> = Key::from(&self.keypair[32..]);
let pk: PasetoAsymmetricPublicKey<V4, Public> =
PasetoAsymmetricPublicKey::try_from(&key).unwrap();
let pk: PasetoAsymmetricPublicKey<V4, Public> = PasetoAsymmetricPublicKey::from(&key);
let verified_token = Paseto::<V4, Public>::try_verify(
token.0.as_str(),
&pk,
Expand Down
5 changes: 0 additions & 5 deletions crates/db/benches/tb_single_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ async fn test_db() -> Arc<TupleBox> {
fn from_val(value: i64) -> SliceRef {
SliceRef::from_bytes(&value.to_le_bytes()[..])
}
fn to_val(value: SliceRef) -> i64 {
let mut bytes = [0; 8];
bytes.copy_from_slice(value.as_slice());
i64::from_le_bytes(bytes)
}

fn load_history() -> Vec<History> {
let lines = include_str!("list-append-dataset.json")
Expand Down
2 changes: 2 additions & 0 deletions crates/db/src/db_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ pub trait DbTransaction {
) -> Result<(), WorldStateError>;

/// Define a new verb on the given object.
// Yes yes I know it's a lot of arguments, but wrapper object here is redundant.
#[allow(clippy::too_many_arguments)]
async fn add_object_verb(
&self,
location: Objid,
Expand Down
2 changes: 1 addition & 1 deletion crates/db/src/tuplebox/base_relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl BaseRelation {

/// Establish indexes for a tuple initial-loaded from secondary storage. Basically a, "trust us,
/// this exists" move.
pub fn index_tuple(&mut self, tuple: TupleRef) {
pub fn index_tuple(&mut self, mut tuple: TupleRef) {
self.tuples.insert(tuple.clone());

// Reset timestamp to 0, since this is a tuple initial-loaded from secondary storage.
Expand Down
8 changes: 4 additions & 4 deletions crates/db/src/tuplebox/coldstorage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ impl WalManager {
to_evict: &mut Vec<TupleId>,
) {
// The first N bytes have to be WAL_MAGIC or this is an invalid chunk.
if chunk.len() < wal_entry::header::OFFSET {
if chunk.len() < wal_entry::data::OFFSET {
warn!("Chunk is too small to be valid");
return;
}
Expand All @@ -470,7 +470,7 @@ impl WalManager {
// page number.
let relation_id = RelationId(wal_entry.header().relation_id().read() as usize);

write_mutations.push(PageStoreMutation::SyncRelationPage(
write_mutations.push(PageStoreMutation::SyncRelation(
relation_id,
pid as PageId,
data,
Expand All @@ -479,13 +479,13 @@ impl WalManager {
WalEntryType::SequenceSync => {
// Write current state of sequences to the sequence page. Ignores page id, slot id.
// Data is the contents of the sequence page.
write_mutations.push(PageStoreMutation::SyncSequencePage(data));
write_mutations.push(PageStoreMutation::SyncSequence(data));
}
WalEntryType::Delete => {
// Delete
let relation_id = RelationId(wal_entry.header().relation_id().read() as usize);
let slot_id = wal_entry.header().slot_id().read();
write_mutations.push(PageStoreMutation::DeleteRelationPage(
write_mutations.push(PageStoreMutation::DeleteRelation(
pid as PageId,
relation_id,
));
Expand Down
12 changes: 6 additions & 6 deletions crates/db/src/tuplebox/page_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ use std::thread::yield_now;
use tokio_eventfd::EventFd;

pub(crate) enum PageStoreMutation {
SyncRelationPage(RelationId, PageId, Box<[u8]>),
SyncSequencePage(Box<[u8]>),
DeleteRelationPage(PageId, RelationId),
SyncRelation(RelationId, PageId, Box<[u8]>),
SyncSequence(Box<[u8]>),
DeleteRelation(PageId, RelationId),
}

/// Manages the directory of pages, one file per page.
Expand Down Expand Up @@ -183,7 +183,7 @@ impl PageStore {
let request_id = self.next_request_id;
self.next_request_id += 1;
match mutation {
PageStoreMutation::SyncRelationPage(relation_id, page_id, data) => {
PageStoreMutation::SyncRelation(relation_id, page_id, data) => {
let path = self.dir.join(format!("{}_{}.page", page_id, relation_id.0));
let len = data.len();
let mut options = OpenOptions::new();
Expand Down Expand Up @@ -217,7 +217,7 @@ impl PageStore {
.expect("Unable to push fsync to submission queue");
}
}
PageStoreMutation::SyncSequencePage(data) => {
PageStoreMutation::SyncSequence(data) => {
let path = self.dir.join("sequences.page");

let len = data.len();
Expand Down Expand Up @@ -250,7 +250,7 @@ impl PageStore {
.expect("Unable to push fsync to submission queue");
}
}
PageStoreMutation::DeleteRelationPage(_, _) => {
PageStoreMutation::DeleteRelation(_, _) => {
// TODO
}
}
Expand Down
10 changes: 5 additions & 5 deletions crates/db/src/tuplebox/tb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,17 @@ impl TupleBox {
pub(crate) async fn prepare_commit_set<'a>(
&self,
commit_ts: u64,
tx_working_set: &WorkingSet,
tx_working_set: &mut WorkingSet,
) -> Result<CommitSet, CommitError> {
let mut commitset = CommitSet::new(commit_ts);

for (_, local_relation) in tx_working_set.relations.iter() {
for (_, local_relation) in tx_working_set.relations.iter_mut() {
let relation_id = local_relation.id;
// scan through the local working set, and for each tuple, check to see if it's safe to
// commit. If it is, then we'll add it to the commit set.
// note we're not actually committing yet, just producing a candidate commit set
let canonical = &self.canonical.read().await[relation_id.0];
for tuple in local_relation.tuples() {
for mut tuple in local_relation.tuples_mut() {
let canon_tuple = canonical.seek_by_domain(tuple.domain().clone());

// If there's no value there, and our local is not tombstoned and we're not doing
Expand All @@ -197,7 +197,7 @@ impl TupleBox {
// TODO: it should be possible to do this without having the fork logic exist twice
// here.
let Some(cv) = canon_tuple else {
match &tuple {
match &mut tuple {
TxTuple::Insert(t) => {
t.update_timestamp(commit_ts);
let forked_relation = commitset.fork(relation_id, canonical);
Expand Down Expand Up @@ -232,7 +232,7 @@ impl TupleBox {
// Otherwise apply the change into a new canonical relation, which is a CoW
// branching of the old one.
let forked_relation = commitset.fork(relation_id, canonical);
match &tuple {
match &mut tuple {
TxTuple::Insert(t) | TxTuple::Update(t) => {
t.update_timestamp(commit_ts);
let forked_relation = commitset.fork(relation_id, canonical);
Expand Down
32 changes: 19 additions & 13 deletions crates/db/src/tuplebox/tuples/slotbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use std::sync::{Arc, Mutex};

use moor_values::util::{BitArray, Bitset64};
use thiserror::Error;
use tracing::error;
use tracing::{error, warn};

use crate::tuplebox::pool::{Bid, BufferPool, PagerError};
pub use crate::tuplebox::tuples::slotted_page::SlotId;
Expand Down Expand Up @@ -122,6 +122,12 @@ impl SlotBox {
inner.page_for(id)
}

pub fn refcount(&self, id: TupleId) -> Result<u16, SlotBoxError> {
let inner = self.inner.lock().unwrap();
let page_handle = inner.page_for(id.page)?;
page_handle.refcount(id.slot)
}

pub fn upcount(&self, id: TupleId) -> Result<(), SlotBoxError> {
let inner = self.inner.lock().unwrap();
let page_handle = inner.page_for(id.page)?;
Expand Down Expand Up @@ -188,7 +194,7 @@ impl SlotBox {
}

pub fn num_pages(&self) -> usize {
let mut inner = self.inner.lock().unwrap();
let inner = self.inner.lock().unwrap();
inner.available_page_space.len()
}

Expand Down Expand Up @@ -394,8 +400,6 @@ impl Inner {
}

fn report_free(&mut self, pid: PageId, new_size: usize, is_empty: bool) {
// Seek the page in the available_page_space vectors, and add the bytes back to its free space.
// We don't know the relation id here, so we have to linear scan all of them.
for (_, available_page_space) in self.available_page_space.iter_mut() {
if available_page_space.update_page(pid, new_size, is_empty) {
if is_empty {
Expand All @@ -405,10 +409,11 @@ impl Inner {
}
return;
}
return;
}

error!(
// TODO: initial textdump load seems to have a problem with initial inserts having a too-low refcount?
// but once the DB is established, it's fine. So maybe this is a problem with insert tuple allocation?
warn!(
"Page not found in used pages in allocator on free; pid {}; could be double-free, dangling weak reference?",
pid
);
Expand Down Expand Up @@ -460,6 +465,7 @@ impl PageSpace {

/// Update the allocation record for the page.
fn update_page(&mut self, pid: PageId, available: usize, is_empty: bool) -> bool {
// Page does not exist in this relation, so we can't update it.
let Some(index) = self.seek(pid) else {
return false;
};
Expand Down Expand Up @@ -642,8 +648,8 @@ mod tests {
// and then scan back and verify their presence/equality.
#[test]
fn test_basic_add_fill_etc() {
let mut sb = Arc::new(SlotBox::new(32768 * 32));
let mut tuples = fill_until_full(&mut sb);
let sb = Arc::new(SlotBox::new(32768 * 32));
let mut tuples = fill_until_full(&sb);
for (i, (tuple, expected_value)) in tuples.iter().enumerate() {
let retrieved_domain = tuple.domain();
let retrieved_codomain = tuple.codomain();
Expand Down Expand Up @@ -671,8 +677,8 @@ mod tests {
// everything mmap DONTNEED'd, and we should be able to re-fill it again, too.
#[test]
fn test_full_fill_and_empty() {
let mut sb = Arc::new(SlotBox::new(32768 * 64));
let mut tuples = fill_until_full(&mut sb);
let sb = Arc::new(SlotBox::new(32768 * 64));
let mut tuples = fill_until_full(&sb);

// Collect the manual ids of the tuples we've allocated, so we can check them for refcount goodness.
let ids = tuples.iter().map(|(t, _)| t.id()).collect::<Vec<_>>();
Expand All @@ -688,8 +694,8 @@ mod tests {
// fill back up again and verify the new presence.
#[test]
fn test_fill_and_free_and_refill_etc() {
let mut sb = Arc::new(SlotBox::new(32768 * 64));
let mut tuples = fill_until_full(&mut sb);
let sb = Arc::new(SlotBox::new(32768 * 64));
let mut tuples = fill_until_full(&sb);
let mut rng = thread_rng();
let mut freed_tuples = Vec::new();

Expand All @@ -715,7 +721,7 @@ mod tests {
assert!(sb.get(id).is_err());
}
// Now fill back up again.
let new_tuples = fill_until_full(&mut sb);
let new_tuples = fill_until_full(&sb);
// Verify both the new tuples and the old tuples are there.
for (tuple, expected) in new_tuples {
let retrieved_domain = tuple.domain();
Expand Down
10 changes: 8 additions & 2 deletions crates/db/src/tuplebox/tuples/slotted_page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ impl<'a> SlottedPage<'a> {
index_entry.as_mut().mark_used(size);

// Update used bytes in the header
let header = header;
header.add_used(size);

let slc = unsafe {
Expand Down Expand Up @@ -315,7 +314,6 @@ impl<'a> SlottedPage<'a> {
.alloc(content_start_position, content_size, size);

// Update the header to subtract the used space.
let header = header;
let new_slot = header.add_entry(size);

// Return the slot id and the number of bytes remaining to append at the end.
Expand Down Expand Up @@ -394,6 +392,14 @@ impl<'a> SlottedPage<'a> {
Ok((self.available_content_bytes(), slot_size, is_empty))
}

pub(crate) fn refcount(&self, slot_id: SlotId) -> Result<u16, SlotBoxError> {
let index_entry = self.get_index_entry(slot_id);
if !index_entry.used {
return Err(SlotBoxError::TupleNotFound(slot_id as usize));
}
Ok(index_entry.refcount)
}

pub(crate) fn upcount(&self, slot_id: SlotId) -> Result<(), SlotBoxError> {
let mut index_entry = self.get_index_entry_mut(slot_id);
unsafe { index_entry.as_mut().get_unchecked_mut() }.refcount += 1;
Expand Down
7 changes: 6 additions & 1 deletion crates/db/src/tuplebox/tuples/tuple_ptr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl TuplePtr {
}

#[inline]
pub(crate) fn as_mut_ptr<T>(&self) -> *mut T {
pub(crate) fn as_mut_ptr<T>(&mut self) -> *mut T {
self.bufaddr as *mut T
}

Expand All @@ -95,6 +95,11 @@ impl TuplePtr {
}
}

#[inline]
pub fn refcount(&self) -> u16 {
self.sb.refcount(self.id).unwrap()
}

#[inline]
pub fn upcount(&self) {
self.sb.upcount(self.id).unwrap();
Expand Down
18 changes: 13 additions & 5 deletions crates/db/src/tuplebox/tuples/tuple_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ impl TupleRef {
buffer[start_pos..start_pos + domain_len].copy_from_slice(domain);
buffer[codomain_start..codomain_end].copy_from_slice(codomain);
})?;

// Initial refcount should be 1, because we have a reference to it.
assert_eq!(tuple_ref.resolve_slot_ptr().refcount(), 1);
Ok(tuple_ref)
}

Expand All @@ -80,7 +83,7 @@ impl TupleRef {

/// Update the timestamp of the tuple.
#[inline]
pub fn update_timestamp(&self, ts: u64) {
pub fn update_timestamp(&mut self, ts: u64) {
let header = self.header_mut();
header.ts = ts;
}
Expand Down Expand Up @@ -129,14 +132,19 @@ impl TupleRef {
}

#[inline]
fn header_mut(&self) -> &mut Header {
let slot_ptr = self.resolve_slot_ptr();
let header: *mut Header = slot_ptr.as_mut_ptr();
fn header_mut(&mut self) -> &mut Header {
let slot_ptr = self.resolve_slot_ptr_mut();
let header: *mut Header = unsafe { slot_ptr.get_unchecked_mut() }.as_mut_ptr();
unsafe { &mut *header }
}

#[inline]
fn resolve_slot_ptr(&self) -> Pin<&mut TuplePtr> {
fn resolve_slot_ptr(&self) -> Pin<&TuplePtr> {
unsafe { Pin::new_unchecked(&*self.sp) }
}

#[inline]
fn resolve_slot_ptr_mut(&mut self) -> Pin<&mut TuplePtr> {
unsafe { Pin::new_unchecked(&mut *self.sp) }
}

Expand Down
Loading

0 comments on commit f090ce6

Please sign in to comment.