Skip to content

Commit

Permalink
Filipe/substreams triggers (#4887)
Browse files Browse the repository at this point in the history
* substreams triggers

* unify firehose and substreams mapper

* wire up substreams near

* cleanup

* fix test

* add trigger-filter and tests
  • Loading branch information
mangas authored Sep 27, 2023
1 parent 022dc36 commit 53dbe93
Show file tree
Hide file tree
Showing 61 changed files with 3,496 additions and 1,123 deletions.
132 changes: 124 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ members = [
"runtime/*",
"server/*",
"store/*",
"substreams-head-tracker",
"substreams/*",
"graph",
"tests",
]
Expand All @@ -33,3 +33,9 @@ incremental = false

[profile.dev]
incremental = false

[profile.release]
lto = true
opt-level = 's'
strip = "debuginfo"

2 changes: 1 addition & 1 deletion chain/arweave/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ fn main() {
println!("cargo:rerun-if-changed=proto");
tonic_build::configure()
.out_dir("src/protobuf")
.compile(&["proto/type.proto"], &["proto"])
.compile(&["proto/arweave.proto"], &["proto"])
.expect("Failed to compile Firehose Arweave proto(s)");
}
File renamed without changes.
59 changes: 49 additions & 10 deletions chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ use graph::blockchain::{
use graph::cheap_clone::CheapClone;
use graph::components::store::DeploymentCursorTracker;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::env::EnvVars;
use graph::firehose::FirehoseEndpoint;
use graph::prelude::MetricsRegistry;
use graph::substreams::Clock;
use graph::{
blockchain::{
block_stream::{
Expand All @@ -33,7 +35,7 @@ use crate::{
codec,
data_source::{DataSource, UnresolvedDataSource},
};
use graph::blockchain::block_stream::{BlockStream, FirehoseCursor};
use graph::blockchain::block_stream::{BlockStream, FirehoseCursor, SubstreamsMapper};

pub struct Chain {
logger_factory: LoggerFactory,
Expand All @@ -50,7 +52,7 @@ impl std::fmt::Debug for Chain {
}

impl BlockchainBuilder<Chain> for BasicBlockchainBuilder {
fn build(self) -> Chain {
fn build(self, _config: &Arc<EnvVars>) -> Chain {
Chain {
logger_factory: self.logger_factory,
name: self.name,
Expand Down Expand Up @@ -127,16 +129,14 @@ impl Blockchain for Chain {
.subgraph_logger(&deployment)
.new(o!("component" => "FirehoseBlockStream"));

let firehose_mapper = Arc::new(FirehoseMapper {});
let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter });

Ok(Box::new(FirehoseBlockStream::new(
deployment.hash,
self.chain_client(),
store.block_ptr(),
store.firehose_cursor(),
firehose_mapper,
adapter,
filter,
start_blocks,
logger,
self.metrics_registry.clone(),
Expand Down Expand Up @@ -252,16 +252,54 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
}
}

pub struct FirehoseMapper {}
pub struct FirehoseMapper {
adapter: Arc<dyn TriggersAdapterTrait<Chain>>,
filter: Arc<TriggerFilter>,
}

#[async_trait]
impl SubstreamsMapper<Chain> for FirehoseMapper {
fn decode_block(
&self,
output: Option<&prost_types::Any>,
) -> Result<Option<codec::Block>, Error> {
let block = match output {
Some(block) => codec::Block::decode(block.value.as_ref())?,
None => anyhow::bail!("Arweave mapper is expected to always have a block"),
};

Ok(Some(block))
}

async fn block_with_triggers(
&self,
logger: &Logger,
block: codec::Block,
) -> Result<BlockWithTriggers<Chain>, Error> {
self.adapter
.triggers_in_block(logger, block, self.filter.as_ref())
.await
}
async fn decode_triggers(
&self,
_logger: &Logger,
_clock: &Clock,
_block: &prost_types::Any,
) -> Result<BlockWithTriggers<Chain>, Error> {
unimplemented!()
}
}

#[async_trait]
impl FirehoseMapperTrait<Chain> for FirehoseMapper {
fn trigger_filter(&self) -> &TriggerFilter {
self.filter.as_ref()
}

async fn to_block_stream_event(
&self,
logger: &Logger,
response: &firehose::Response,
adapter: &Arc<dyn TriggersAdapterTrait<Chain>>,
filter: &TriggerFilter,
) -> Result<BlockStreamEvent<Chain>, FirehoseError> {
let step = ForkStep::from_i32(response.step).unwrap_or_else(|| {
panic!(
Expand All @@ -282,12 +320,13 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
//
// Check about adding basic information about the block in the bstream::BlockResponseV2 or maybe
// define a slimmed down stuct that would decode only a few fields and ignore all the rest.
let block = codec::Block::decode(any_block.value.as_ref())?;
// unwrap: Input cannot be None so output will be error or block.
let block = self.decode_block(Some(&any_block))?.unwrap();

use ForkStep::*;
match step {
StepNew => Ok(BlockStreamEvent::ProcessBlock(
adapter.triggers_in_block(logger, block, filter).await?,
self.block_with_triggers(&logger, block).await?,
FirehoseCursor::from(response.cursor.clone()),
)),

Expand Down
Loading

0 comments on commit 53dbe93

Please sign in to comment.