Skip to content

Commit

Permalink
wire up substreams near
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Sep 22, 2023
1 parent ca15002 commit b315ddd
Show file tree
Hide file tree
Showing 23 changed files with 299 additions and 137 deletions.
15 changes: 11 additions & 4 deletions chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
codec,
data_source::{DataSource, UnresolvedDataSource},
};
use graph::blockchain::block_stream::{BlockStream, FirehoseCursor};
use graph::blockchain::block_stream::{BlockStream, FirehoseCursor, SubstreamsMapper};

pub struct Chain {
logger_factory: LoggerFactory,
Expand Down Expand Up @@ -257,9 +257,9 @@ pub struct FirehoseMapper {

#[async_trait]
impl SubstreamsMapper<Chain> for FirehoseMapper {
fn decode(&self, output: Option<&prost_types::Any>) -> Result<Option<arweave::Block>, Error> {
fn decode(&self, output: Option<&prost_types::Any>) -> Result<Option<codec::Block>, Error> {
let block = match output {
Some(block) => arweave::Block::decode(block.value.as_ref())?,
Some(block) => codec::Block::decode(block.value.as_ref())?,
None => anyhow::bail!("Arweave mapper is expected to always have a block"),
};

Expand All @@ -269,12 +269,19 @@ impl SubstreamsMapper<Chain> for FirehoseMapper {
async fn block_with_triggers(
&self,
logger: &Logger,
block: arweave::Block,
block: codec::Block,
) -> Result<BlockWithTriggers<Chain>, Error> {
self.adapter
.triggers_in_block(logger, block, self.filter.as_ref())
.await
}
async fn decode_triggers(
&self,
logger: &Logger,
block: &prost_types::Any,
) -> Result<BlockWithTriggers<Chain>, Error> {
unimplemented!()
}
}

#[async_trait]
Expand Down
15 changes: 15 additions & 0 deletions chain/common/proto/near-filter-substreams.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
syntax = "proto3";

import "near.proto";

package receipts.v1;

message BlockAndReceipts {
sf.near.codec.v1.Block block = 1;
repeated sf.near.codec.v1.ExecutionOutcomeWithId outcome = 2;
repeated sf.near.codec.v1.Receipt receipt = 3;
}




35 changes: 35 additions & 0 deletions chain/common/src/pb/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
pub mod receipts {
pub mod v1 {
include!("receipts.v1.rs");
}
}
pub mod sf {
pub mod arweave {
pub mod r#type {
pub mod v1 {
include!("sf.arweave.r#type.v1.rs");
}
}
}
pub mod ethereum {
pub mod r#type {
pub mod v2 {
include!("sf.ethereum.r#type.v2.rs");
}
}
}
pub mod near {
pub mod codec {
pub mod v1 {
include!("sf.near.codec.v1.rs");
}
}
}
}
pub mod substreams {
pub mod entity {
pub mod v1 {
include!("substreams.entity.v1.rs");
}
}
}
8 changes: 8 additions & 0 deletions chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,14 @@ impl SubstreamsMapper<Chain> for FirehoseMapper {
.triggers_in_block(logger, block, self.filter.as_ref())
.await
}

async fn decode_triggers(
&self,
logger: &Logger,
block: &prost_types::Any,
) -> Result<BlockWithTriggers<Chain>, Error> {
unimplemented!()
}
}

#[async_trait]
Expand Down
10 changes: 9 additions & 1 deletion chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ pub struct FirehoseMapper {
impl SubstreamsMapper<Chain> for FirehoseMapper {
fn decode(&self, output: Option<&prost_types::Any>) -> Result<Option<BlockFinality>, Error> {
let block = match output {
Some(block) => ethereum::Block::decode(block.value.as_ref())?,
Some(block) => codec::Block::decode(block.value.as_ref())?,
None => anyhow::bail!("ethereum mapper is expected to always have a block"),
};

Expand All @@ -756,6 +756,14 @@ impl SubstreamsMapper<Chain> for FirehoseMapper {
.triggers_in_block(logger, block, &self.filter)
.await
}

async fn decode_triggers(
&self,
logger: &Logger,
block: &prost_types::Any,
) -> Result<BlockWithTriggers<Chain>, Error> {
unimplemented!()
}
}

#[async_trait]
Expand Down
6 changes: 5 additions & 1 deletion chain/near/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ fn main() {
println!("cargo:rerun-if-changed=proto");
tonic_build::configure()
.out_dir("src/protobuf")
.compile(&["proto/near.proto"], &["proto"])
.extern_path(".sf.near.codec.v1", "crate::codec::pbcodec")
.compile(
&["proto/near.proto", "proto/substreams-triggers.proto"],
&["proto"],
)
.expect("Failed to compile Firehose NEAR proto(s)");
}
12 changes: 12 additions & 0 deletions chain/near/proto/substreams-triggers.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
syntax = "proto3";

import "near.proto";

package receipts.v1;

message BlockAndReceipts {
sf.near.codec.v1.Block block = 1;
repeated sf.near.codec.v1.ExecutionOutcomeWithId outcome = 2;
repeated sf.near.codec.v1.Receipt receipt = 3;
}

12 changes: 12 additions & 0 deletions chain/near/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::data_source::PartialAccounts;
use crate::{data_source::DataSource, Chain};
use graph::blockchain as bc;
use graph::firehose::{BasicReceiptFilter, PrefixSuffixPair};
use graph::itertools::Itertools;
use graph::prelude::*;
use prost::Message;
use prost_types::Any;
Expand All @@ -17,6 +18,17 @@ pub struct TriggerFilter {
pub(crate) receipt_filter: NearReceiptFilter,
}

impl TriggerFilter {
pub fn to_module_params(&self) -> String {
let matches = self.receipt_filter.accounts.iter().join(",");
format!(
"{}\n{}",
self.block_filter.trigger_every_block.to_string(),
matches
)
}
}

impl bc::TriggerFilter<Chain> for TriggerFilter {
fn extend<'a>(&mut self, data_sources: impl Iterator<Item = &'a DataSource> + Clone) {
let TriggerFilter {
Expand Down
93 changes: 87 additions & 6 deletions chain/near/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use anyhow::anyhow;
use graph::blockchain::client::ChainClient;
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
use graph::blockchain::substreams_block_stream::SubstreamsBlockStream;
use graph::blockchain::{
BasicBlockchainBuilder, BlockIngestor, BlockchainBuilder, BlockchainKind, NoopRuntimeAdapter,
};
Expand All @@ -9,6 +11,7 @@ use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::firehose::FirehoseEndpoint;
use graph::prelude::{MetricsRegistry, TryFutureExt};
use graph::schema::InputSchema;
use graph::substreams::Package;
use graph::{
anyhow::Result,
blockchain::{
Expand All @@ -28,6 +31,7 @@ use prost::Message;
use std::sync::Arc;

use crate::adapter::TriggerFilter;
use crate::codec::substreams_triggers::BlockAndReceipts;
use crate::data_source::{DataSourceTemplate, UnresolvedDataSourceTemplate};
use crate::trigger::{self, NearTrigger};
use crate::{
Expand All @@ -38,20 +42,66 @@ use graph::blockchain::block_stream::{
BlockStream, BlockStreamBuilder, FirehoseCursor, SubstreamsMapper,
};

const NEAR_FILTER_MODULE_NAME: &str = "near_filter";
const SUBSTREAMS_TRIGGER_FILTER_BYTES: &[u8; 443975] =
include_bytes!("../../../substreams-trigger-filter-v0.1.0.spkg");

pub struct NearStreamBuilder {}

#[async_trait]
impl BlockStreamBuilder<Chain> for NearStreamBuilder {
async fn build_substreams(
&self,
_chain: &Chain,
chain: &Chain,
_schema: Arc<InputSchema>,
_deployment: DeploymentLocator,
_block_cursor: FirehoseCursor,
_subgraph_current_block: Option<BlockPtr>,
_filter: Arc<<Chain as Blockchain>::TriggerFilter>,
deployment: DeploymentLocator,
block_cursor: FirehoseCursor,
subgraph_current_block: Option<BlockPtr>,
filter: Arc<<Chain as Blockchain>::TriggerFilter>,
) -> Result<Box<dyn BlockStream<Chain>>> {
unimplemented!()
let mapper = Arc::new(FirehoseMapper {
adapter: Arc::new(TriggersAdapter {}),
filter,
});
let mut package =
Package::decode(SUBSTREAMS_TRIGGER_FILTER_BYTES.to_vec().as_ref()).unwrap();
match package.modules.as_mut() {
Some(modules) => modules
.modules
.iter_mut()
.find(|module| module.name == NEAR_FILTER_MODULE_NAME)
.map(|module| {
graph::substreams::patch_module_params(
mapper.filter.to_module_params(),
module,
);
module
}),
None => None,
};

let logger = chain
.logger_factory
.subgraph_logger(&deployment)
.new(o!("component" => "SubstreamsBlockStream"));
let start_block = subgraph_current_block
.as_ref()
.map(|b| b.number)
.unwrap_or_default();

Ok(Box::new(SubstreamsBlockStream::new(
deployment.hash,
chain.chain_client(),
subgraph_current_block,
block_cursor.as_ref().clone(),
mapper,
package.modules.clone(),
NEAR_FILTER_MODULE_NAME.to_string(),
vec![start_block],
vec![],
logger,
chain.metrics_registry.clone(),
)))
}

async fn build_firehose(
Expand Down Expand Up @@ -358,6 +408,37 @@ impl SubstreamsMapper<Chain> for FirehoseMapper {
.triggers_in_block(logger, block, self.filter.as_ref())
.await
}

async fn decode_triggers(
&self,
_logger: &Logger,
message: &prost_types::Any,
) -> Result<BlockWithTriggers<Chain>, Error> {
let BlockAndReceipts {
block,
outcome,
receipt,
} = BlockAndReceipts::decode(message.value.as_ref())?;
let block = block.ok_or_else(|| anyhow!("near block is mandatory on substreams"))?;
let arc_block = Arc::new(block.clone());

let trigger_data = outcome
.into_iter()
.zip(receipt.into_iter())
.map(|(outcome, receipt)| {
NearTrigger::Receipt(Arc::new(trigger::ReceiptWithOutcome {
outcome,
receipt,
block: arc_block.clone(),
}))
})
.collect();

Ok(BlockWithTriggers {
block,
trigger_data,
})
}
}

#[async_trait]
Expand Down
4 changes: 4 additions & 0 deletions chain/near/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
#[path = "protobuf/sf.near.codec.v1.rs"]
pub mod pbcodec;

#[rustfmt::skip]
#[path = "protobuf/receipts.v1.rs"]
pub mod substreams_triggers;

use graph::{
blockchain::Block as BlockchainBlock,
blockchain::BlockPtr,
Expand Down
10 changes: 10 additions & 0 deletions chain/near/src/protobuf/receipts.v1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BlockAndReceipts {
#[prost(message, optional, tag = "1")]
pub block: ::core::option::Option<crate::codec::pbcodec::Block>,
#[prost(message, repeated, tag = "2")]
pub outcome: ::prost::alloc::vec::Vec<crate::codec::pbcodec::ExecutionOutcomeWithId>,
#[prost(message, repeated, tag = "3")]
pub receipt: ::prost::alloc::vec::Vec<crate::codec::pbcodec::Receipt>,
}
17 changes: 0 additions & 17 deletions chain/substreams/proto/codec.proto
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
syntax = "proto3";

import "near.proto";
import "ethereum.proto";
import "arweave.proto";
import "cosmos.proto";

package substreams.entity.v1;

message EntityChanges {
Expand Down Expand Up @@ -50,15 +45,3 @@ message Field {
optional Value old_value = 5;
}

message InnerBlock {
oneof block {
sf.near.codec.v1.Block near = 1;
sf.ethereum.type.v2.Block eth = 2;
sf.arweave.type.v1.Block arweave = 3;
sf.cosmos.type.v1.Block cosmos = 4;
}
}

message Block {
InnerBlock inner = 1;
}
Loading

0 comments on commit b315ddd

Please sign in to comment.