Skip to content

Commit

Permalink
refactor: Evolve BlockStore trait (#402)
Browse files Browse the repository at this point in the history
- Use new RPITIT instead of `async_trait` macro for all traits
- Expose blanket implementation `impl<B: BlockStore> BlockStore for &B` and `Box<B>`
- Add two functions to `trait BlockStore`:
  - `async fn has_block(&self, cid: &Cid) -> Result<bool>;` to find out whether a block is available locally
  - `async fn put_block_keyed(&self, cid: &Cid, bytes: impl Into<Bytes> + CondSend) -> Result<()>;` to add a block with given CID to the blockstore. This allows us to support adding blocks using different hashing functions to the same blockstore.
- Use explicit `BlockStoreError` type in `trait Blockstore` instead of `anyhow::Error`
- Update `rug` dependency to `1.24`

---

* feat: Implement `BlockStore` for derefs, too

* refactor: Use new RPITIT feature for `trait BlockStore`

* refactor: Move `trait PrivateForest` to RPITIT

* refactor: Use RPITIT in `trait PrivateKey` & `trait ExchangeKey`

* refactor: Completely remove `async_trait`

* chore: Fix warnings, remove unused `IpldEq` trait

* fix: Update rug & enable std feature

* feat: don't require `std` for `rug`, more efficient `to_bytes_be`

* chore: Fix nightly warning

* refactor: Blanket-impl for `&B` and `Box<B>` instead of `Deref`

This way is recommended by dtolnay (rust-lang/api-guidelines#158)
and the "Rust for Rustaceans" book (paragraph "Ergonomic Trait Implementations").
This leads to better compiler error messages when you pass something that doesn't `impl BlockStore` the right way.
`rand_core` explicitly decided against a `DerefMut` blanket implementation for `trait RngCore`.

* refactor: Remove serializable things from `BlockStore`

Use the `Storable` trait and its `store` and `load` functions instead.

* feat: Add `has_block` to `trait BlockStore`

* fix: Update accesskey snapshot

* fix: Implement `has_block` for `ForeignBlockStore`

* feat: Add `get_block_keyed` to `trait BlockStore`, fix wasm

* refactor: Move blockstore interface close to extern

* refactor: Use precise error type in `trait BlockStore`

* feat: Return correct error in `ForeignBlockStore::get_block`

* refactor: Use `libipld_core::serde::to_ipld` instead of dag-cbor

* docs: Add comments explaining use of `boxed_fut`
  • Loading branch information
matheus23 authored Feb 15, 2024
1 parent aff9cf1 commit 3eaee3c
Show file tree
Hide file tree
Showing 47 changed files with 410 additions and 334 deletions.
3 changes: 2 additions & 1 deletion wnfs-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,19 @@ categories = [
license = "Apache-2.0"
readme = "README.md"
edition = "2021"
rust-version = "1.75"
repository = "https://github.com/wnfs-wg/rs-wnfs/tree/main/wnfs-common"
homepage = "https://fission.codes"
authors = ["The Fission Authors"]

[dependencies]
anyhow = "1.0"
async-once-cell = "0.5"
async-trait = "0.1"
base64 = { version = "0.21", optional = true }
base64-serde = { version = "0.7", optional = true }
bytes = { version = "1.4", features = ["serde"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
cid = "0.10"
dashmap = "5.5.3"
futures = "0.3"
libipld = { version = "0.16", features = ["dag-cbor", "derive", "serde-codec"] }
Expand Down
214 changes: 150 additions & 64 deletions wnfs-common/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@ use crate::{
utils::{Arc, CondSend, CondSync},
BlockStoreError, MAX_BLOCK_SIZE,
};
use anyhow::{bail, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures::Future;
use libipld::{
cbor::DagCborCodec,
cid::Version,
multihash::{Code, MultihashDigest},
serde as ipld_serde, Cid,
Cid,
};
use parking_lot::Mutex;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

//--------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -45,38 +44,74 @@ pub const CODEC_DAG_PB: u64 = 0x70;
pub const CODEC_RAW: u64 = 0x55;

//--------------------------------------------------------------------------------------------------
// Type Definitions
// Traits
//--------------------------------------------------------------------------------------------------

/// For types that implement block store operations like adding, getting content from the store.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait BlockStore: Sized + CondSync {
async fn get_block(&self, cid: &Cid) -> Result<Bytes>;
async fn put_block(&self, bytes: impl Into<Bytes> + CondSend, codec: u64) -> Result<Cid>;

async fn get_deserializable<V>(&self, cid: &Cid) -> Result<V>
where
V: DeserializeOwned,
{
let bytes = self.get_block(cid).await?;
let ipld = decode(bytes.as_ref(), DagCborCodec)?;
Ok(ipld_serde::from_ipld::<V>(ipld)?)
pub trait BlockStore: CondSync {
/// Retrieve a block from this store via its hash (`Cid`).
///
/// If this store can't find the block, it may raise an error like `BlockNotFound`.
fn get_block(
&self,
cid: &Cid,
) -> impl Future<Output = Result<Bytes, BlockStoreError>> + CondSend;

/// Put some bytes into the blockstore. These bytes should be encoded with the given codec.
///
/// E.g. `CODEC_RAW` for raw bytes blocks, `CODEC_DAG_CBOR` for dag-cbor, etc.
///
/// This codec will determine the codec encoded in the final `Cid` that's returned.
///
/// If the codec is incorrect, this function won't fail, but any tools that depend on the
/// correctness of the codec may fail. (E.g. tools that follow the links of blocks).
///
/// This funciton allows the blockstore to choose the hashing function itself.
/// The hashing function that was chosen will be readable from the `Cid` metadata.
///
/// If you need control over the concrete hashing function that's used, see `put_block_keyed`.
fn put_block(
&self,
bytes: impl Into<Bytes> + CondSend,
codec: u64,
) -> impl Future<Output = Result<Cid, BlockStoreError>> + CondSend {
let bytes = bytes.into();
async move {
let cid = self.create_cid(&bytes, codec)?;
self.put_block_keyed(cid, bytes).await?;
Ok(cid)
}
}

async fn put_serializable<V>(&self, value: &V) -> Result<Cid>
where
V: Serialize + CondSync,
{
let bytes = encode(&ipld_serde::to_ipld(value)?, DagCborCodec)?;
self.put_block(bytes, CODEC_DAG_CBOR).await
}
/// Put a block of data into this blockstore. The block's CID needs to match the CID given.
///
/// It's up to the blockstore whether to check this fact or assume it when this function is called.
///
/// The default implementation of `put_block` will use this function under the hood and use
/// the correct CID provided by the `create_cid` function.
///
/// This is useful to be able to add blocks that were generated from other
/// clients with differently configured hashing functions to this blockstore.
fn put_block_keyed(
&self,
cid: Cid,
bytes: impl Into<Bytes> + CondSend,
) -> impl Future<Output = Result<(), BlockStoreError>> + CondSend;

/// Find out whether a call to `get_block` would return with a result or not.
///
/// This is useful for data exchange protocols to find out what needs to be fetched
/// externally and what doesn't.
fn has_block(
&self,
cid: &Cid,
) -> impl Future<Output = Result<bool, BlockStoreError>> + CondSend;

// This should be the same in all implementations of BlockStore
fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid> {
fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid, BlockStoreError> {
// If there are too many bytes, abandon this task
if bytes.len() > MAX_BLOCK_SIZE {
bail!(BlockStoreError::MaximumBlockSizeExceeded(bytes.len()))
return Err(BlockStoreError::MaximumBlockSizeExceeded(bytes.len()));
}

// Compute the Blake3 hash of the bytes
Expand All @@ -93,6 +128,66 @@ pub trait BlockStore: Sized + CondSync {
// Implementations
//--------------------------------------------------------------------------------------------------

impl<B: BlockStore> BlockStore for &B {
async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
(**self).get_block(cid).await
}

async fn put_block(
&self,
bytes: impl Into<Bytes> + CondSend,
codec: u64,
) -> Result<Cid, BlockStoreError> {
(**self).put_block(bytes, codec).await
}

async fn put_block_keyed(
&self,
cid: Cid,
bytes: impl Into<Bytes> + CondSend,
) -> Result<(), BlockStoreError> {
(**self).put_block_keyed(cid, bytes).await
}

async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
(**self).has_block(cid).await
}

fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid, BlockStoreError> {
(**self).create_cid(bytes, codec)
}
}

impl<B: BlockStore> BlockStore for Box<B> {
async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
(**self).get_block(cid).await
}

async fn put_block(
&self,
bytes: impl Into<Bytes> + CondSend,
codec: u64,
) -> Result<Cid, BlockStoreError> {
(**self).put_block(bytes, codec).await
}

async fn put_block_keyed(
&self,
cid: Cid,
bytes: impl Into<Bytes> + CondSend,
) -> Result<(), BlockStoreError> {
(**self).put_block_keyed(cid, bytes).await
}

async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
(**self).has_block(cid).await
}

fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid, BlockStoreError> {
(**self).create_cid(bytes, codec)
}
}

/// An in-memory block store to simulate IPFS.
///
/// IPFS is basically a glorified HashMap.
Expand All @@ -111,11 +206,8 @@ impl MemoryBlockStore {
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl BlockStore for MemoryBlockStore {
/// Retrieves an array of bytes from the block store with given CID.
async fn get_block(&self, cid: &Cid) -> Result<Bytes> {
async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
let bytes = self
.0
.lock()
Expand All @@ -126,18 +218,18 @@ impl BlockStore for MemoryBlockStore {
Ok(bytes)
}

/// Stores an array of bytes in the block store.
async fn put_block(&self, bytes: impl Into<Bytes> + CondSend, codec: u64) -> Result<Cid> {
// Convert the bytes into a Bytes object
let bytes: Bytes = bytes.into();
async fn put_block_keyed(
&self,
cid: Cid,
bytes: impl Into<Bytes> + CondSend,
) -> Result<(), BlockStoreError> {
self.0.lock().insert(cid, bytes.into());

// Try to build the CID from the bytes and codec
let cid = self.create_cid(&bytes, codec)?;

// Insert the bytes into the HashMap using the CID as the key
self.0.lock().insert(cid, bytes);
Ok(())
}

Ok(cid)
async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
Ok(self.0.lock().contains_key(cid))
}
}

Expand All @@ -146,21 +238,18 @@ impl BlockStore for MemoryBlockStore {
//--------------------------------------------------------------------------------------------------

/// Tests the retrieval property of a BlockStore-conforming type.
pub async fn bs_retrieval_test<T>(store: &T) -> Result<()>
where
T: BlockStore + 'static,
{
pub async fn bs_retrieval_test<T>(store: impl BlockStore) -> Result<(), BlockStoreError> {
// Example objects to insert and remove from the blockstore
let first_bytes = vec![1, 2, 3, 4, 5];
let second_bytes = b"hello world".to_vec();

// Insert the objects into the blockstore
let first_cid = store.put_serializable(&first_bytes).await?;
let second_cid = store.put_serializable(&second_bytes).await?;
let first_cid = store.put_block(first_bytes.clone(), CODEC_RAW).await?;
let second_cid = store.put_block(second_bytes.clone(), CODEC_RAW).await?;

// Retrieve the objects from the blockstore
let first_loaded: Vec<u8> = store.get_deserializable(&first_cid).await?;
let second_loaded: Vec<u8> = store.get_deserializable(&second_cid).await?;
let first_loaded = store.get_block(&first_cid).await?;
let second_loaded = store.get_block(&second_cid).await?;

// Assert that the objects are the same as the ones we inserted
assert_eq!(first_loaded, first_bytes);
Expand All @@ -170,24 +259,21 @@ where
}

/// Tests the duplication of a BlockStore-conforming type.
pub async fn bs_duplication_test<T>(store: &T) -> Result<()>
where
T: BlockStore + 'static,
{
pub async fn bs_duplication_test<T>(store: impl BlockStore) -> Result<(), BlockStoreError> {
// Example objects to insert and remove from the blockstore
let first_bytes = vec![1, 2, 3, 4, 5];
let second_bytes = first_bytes.clone();

// Insert the objects into the blockstore
let first_cid = store.put_serializable(&first_bytes).await?;
let second_cid = store.put_serializable(&second_bytes).await?;
let first_cid = store.put_block(first_bytes.clone(), CODEC_RAW).await?;
let second_cid = store.put_block(second_bytes.clone(), CODEC_RAW).await?;

// Assert that the two vecs produced the same CID
assert_eq!(first_cid, second_cid);

// Retrieve the objects from the blockstore
let first_loaded: Vec<u8> = store.get_deserializable(&first_cid).await?;
let second_loaded: Vec<u8> = store.get_deserializable(&second_cid).await?;
let first_loaded = store.get_block(&first_cid).await?;
let second_loaded = store.get_block(&second_cid).await?;

// Assert that the objects are the same as the ones we inserted
assert_eq!(first_loaded, first_bytes);
Expand All @@ -200,22 +286,22 @@ where
}

/// Tests the serialization of a BlockStore-conforming type.
pub async fn bs_serialization_test<T>(store: &T) -> Result<()>
pub async fn bs_serialization_test<T>(store: &T) -> Result<(), BlockStoreError>
where
T: BlockStore + Serialize + 'static + for<'de> Deserialize<'de>,
T: BlockStore + Serialize + for<'de> Deserialize<'de>,
{
// Example objects to insert and remove from the blockstore
let bytes = vec![1, 2, 3, 4, 5];

// Insert the object into the blockstore
let cid = store.put_serializable(&bytes).await?;
let cid = store.put_block(bytes.clone(), CODEC_RAW).await?;

// Serialize the BlockStore
let serial_store: Vec<u8> = encode(&store, DagCborCodec)?;
// Construct a new BlockStore from the Serialized object
let deserial_store: T = decode(&serial_store, DagCborCodec)?;
// Retrieve the object from the blockstore
let loaded: Vec<u8> = deserial_store.get_deserializable(&cid).await?;
let loaded = deserial_store.get_block(&cid).await?;

// Assert that the objects are the same as the ones we inserted
assert_eq!(loaded, bytes);
Expand All @@ -231,9 +317,9 @@ mod tests {
#[async_std::test]
async fn memory_blockstore() -> Result<()> {
let store = &MemoryBlockStore::new();
bs_retrieval_test(store).await?;
bs_duplication_test(store).await?;
bs_serialization_test(store).await?;
bs_retrieval_test::<MemoryBlockStore>(store).await?;
bs_duplication_test::<MemoryBlockStore>(store).await?;
bs_serialization_test::<MemoryBlockStore>(store).await?;
Ok(())
}
}
8 changes: 4 additions & 4 deletions wnfs-common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ pub enum BlockStoreError {
#[error("Cannot find specified CID in block store: {0}")]
CIDNotFound(Cid),

#[error("Cannot find handler for block with CID: {0}")]
BlockHandlerNotFound(Cid),
#[error("CID error during blockstore operation: {0}")]
CIDError(#[from] cid::Error),

#[error("Lock poisoned")]
LockPoisoned,
#[error(transparent)]
Custom(#[from] anyhow::Error),
}
1 change: 0 additions & 1 deletion wnfs-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ mod link;
mod metadata;
mod pathnodes;
mod storable;
mod traits;
pub mod utils;

pub use blockstore::*;
Expand Down
Loading

0 comments on commit 3eaee3c

Please sign in to comment.