Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Sep 25, 2023
1 parent b315ddd commit 6978ed1
Show file tree
Hide file tree
Showing 29 changed files with 106 additions and 3,016 deletions.
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 11 additions & 5 deletions chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ use graph::blockchain::{
use graph::cheap_clone::CheapClone;
use graph::components::store::DeploymentCursorTracker;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::env::EnvVars;
use graph::firehose::FirehoseEndpoint;
use graph::prelude::MetricsRegistry;
use graph::substreams::Clock;
use graph::{
blockchain::{
block_stream::{
Expand Down Expand Up @@ -50,7 +52,7 @@ impl std::fmt::Debug for Chain {
}

impl BlockchainBuilder<Chain> for BasicBlockchainBuilder {
fn build(self) -> Chain {
fn build(self, _config: &Arc<EnvVars>) -> Chain {
Chain {
logger_factory: self.logger_factory,
name: self.name,
Expand Down Expand Up @@ -257,7 +259,10 @@ pub struct FirehoseMapper {

#[async_trait]
impl SubstreamsMapper<Chain> for FirehoseMapper {
fn decode(&self, output: Option<&prost_types::Any>) -> Result<Option<codec::Block>, Error> {
fn decode_block(
&self,
output: Option<&prost_types::Any>,
) -> Result<Option<codec::Block>, Error> {
let block = match output {
Some(block) => codec::Block::decode(block.value.as_ref())?,
None => anyhow::bail!("Arweave mapper is expected to always have a block"),
Expand All @@ -277,8 +282,9 @@ impl SubstreamsMapper<Chain> for FirehoseMapper {
}
async fn decode_triggers(
&self,
logger: &Logger,
block: &prost_types::Any,
_logger: &Logger,
_clock: &Clock,
_block: &prost_types::Any,
) -> Result<BlockWithTriggers<Chain>, Error> {
unimplemented!()
}
Expand Down Expand Up @@ -315,7 +321,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
// Check about adding basic information about the block in the bstream::BlockResponseV2 or maybe
// define a slimmed down stuct that would decode only a few fields and ignore all the rest.
// unwrap: Input cannot be None so output will be error or block.
let block = self.decode(Some(&any_block))?.unwrap();
let block = self.decode_block(Some(&any_block))?.unwrap();

use ForkStep::*;
match step {
Expand Down
2 changes: 1 addition & 1 deletion chain/arweave/src/codec.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[rustfmt::skip]
#[path = "protobuf/sf.arweave.r#type.v1.rs"]
pub mod pbcodec;
mod pbcodec;

use graph::{blockchain::Block as BlockchainBlock, blockchain::BlockPtr, prelude::BlockNumber};

Expand Down
2 changes: 1 addition & 1 deletion chain/arweave/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod adapter;
mod chain;
pub mod codec;
mod codec;
mod data_source;
mod runtime;
mod trigger;
Expand Down
35 changes: 0 additions & 35 deletions chain/common/src/pb/mod.rs

This file was deleted.

1 change: 0 additions & 1 deletion chain/common/src/substreams.rs

This file was deleted.

16 changes: 11 additions & 5 deletions chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
use graph::blockchain::BlockIngestor;
use graph::env::EnvVars;
use graph::prelude::MetricsRegistry;
use graph::substreams::Clock;
use std::sync::Arc;

use graph::blockchain::block_stream::{FirehoseCursor, SubstreamsMapper};
Expand Down Expand Up @@ -46,7 +48,7 @@ impl std::fmt::Debug for Chain {
}

impl BlockchainBuilder<Chain> for BasicBlockchainBuilder {
fn build(self) -> Chain {
fn build(self, _config: &Arc<EnvVars>) -> Chain {
Chain {
logger_factory: self.logger_factory,
name: self.name,
Expand Down Expand Up @@ -330,7 +332,10 @@ pub struct FirehoseMapper {

#[async_trait]
impl SubstreamsMapper<Chain> for FirehoseMapper {
fn decode(&self, output: Option<&prost_types::Any>) -> Result<Option<crate::Block>, Error> {
fn decode_block(
&self,
output: Option<&prost_types::Any>,
) -> Result<Option<crate::Block>, Error> {
let block = match output {
Some(block) => crate::Block::decode(block.value.as_ref())?,
None => anyhow::bail!("cosmos mapper is expected to always have a block"),
Expand All @@ -351,8 +356,9 @@ impl SubstreamsMapper<Chain> for FirehoseMapper {

async fn decode_triggers(
&self,
logger: &Logger,
block: &prost_types::Any,
_logger: &Logger,
_clock: &Clock,
_block: &prost_types::Any,
) -> Result<BlockWithTriggers<Chain>, Error> {
unimplemented!()
}
Expand Down Expand Up @@ -389,7 +395,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
// Check about adding basic information about the block in the bstream::BlockResponseV2 or maybe
// define a slimmed down struct that would decode only a few fields and ignore all the rest.
// unwrap: Input cannot be None so output will be error or block.
let block = self.decode(Some(&any_block))?.unwrap();
let block = self.decode_block(Some(&any_block))?.unwrap();

match step {
ForkStep::StepNew => Ok(BlockStreamEvent::ProcessBlock(
Expand Down
2 changes: 1 addition & 1 deletion chain/cosmos/src/codec.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub use crate::protobuf::pbcodec::*;
pub(crate) use crate::protobuf::pbcodec::*;

use graph::blockchain::Block as BlockchainBlock;
use graph::{
Expand Down
13 changes: 9 additions & 4 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use graph::prelude::{
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry,
};
use graph::schema::InputSchema;
use graph::substreams::Clock;
use graph::{
blockchain::{
block_stream::{
Expand Down Expand Up @@ -734,7 +735,10 @@ pub struct FirehoseMapper {

#[async_trait]
impl SubstreamsMapper<Chain> for FirehoseMapper {
fn decode(&self, output: Option<&prost_types::Any>) -> Result<Option<BlockFinality>, Error> {
fn decode_block(
&self,
output: Option<&prost_types::Any>,
) -> Result<Option<BlockFinality>, Error> {
let block = match output {
Some(block) => codec::Block::decode(block.value.as_ref())?,
None => anyhow::bail!("ethereum mapper is expected to always have a block"),
Expand All @@ -759,8 +763,9 @@ impl SubstreamsMapper<Chain> for FirehoseMapper {

async fn decode_triggers(
&self,
logger: &Logger,
block: &prost_types::Any,
_logger: &Logger,
_clock: &Clock,
_block: &prost_types::Any,
) -> Result<BlockWithTriggers<Chain>, Error> {
unimplemented!()
}
Expand Down Expand Up @@ -801,7 +806,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
match step {
StepNew => {
// unwrap: Input cannot be None so output will be error or block.
let block = self.decode(Some(&any_block))?.unwrap();
let block = self.decode_block(Some(&any_block))?.unwrap();
let block_with_triggers = self.block_with_triggers(logger, block).await?;

Ok(BlockStreamEvent::ProcessBlock(
Expand Down
2 changes: 1 addition & 1 deletion chain/ethereum/src/codec.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[rustfmt::skip]
#[path = "protobuf/sf.ethereum.r#type.v2.rs"]
pub mod pbcodec;
mod pbcodec;

use anyhow::format_err;
use graph::{
Expand Down
29 changes: 25 additions & 4 deletions chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use graph::blockchain::{
use graph::cheap_clone::CheapClone;
use graph::components::store::DeploymentCursorTracker;
use graph::data::subgraph::UnifiedMappingApiVersion;
use graph::env::EnvVars;
use graph::firehose::FirehoseEndpoint;
use graph::prelude::{MetricsRegistry, TryFutureExt};
use graph::schema::InputSchema;
use graph::substreams::Package;
use graph::substreams::{Clock, Package};
use graph::{
anyhow::Result,
blockchain::{
Expand Down Expand Up @@ -161,6 +162,7 @@ pub struct Chain {
chain_store: Arc<dyn ChainStore>,
metrics_registry: Arc<MetricsRegistry>,
block_stream_builder: Arc<dyn BlockStreamBuilder<Self>>,
prefer_substreams: bool,
}

impl std::fmt::Debug for Chain {
Expand All @@ -170,14 +172,15 @@ impl std::fmt::Debug for Chain {
}

impl BlockchainBuilder<Chain> for BasicBlockchainBuilder {
fn build(self) -> Chain {
fn build(self, config: &Arc<EnvVars>) -> Chain {
Chain {
logger_factory: self.logger_factory,
name: self.name,
chain_store: self.chain_store,
client: Arc::new(ChainClient::new_firehose(self.firehose_endpoints)),
metrics_registry: self.metrics_registry,
block_stream_builder: Arc::new(NearStreamBuilder {}),
prefer_substreams: config.prefer_substreams_block_streams,
}
}
}
Expand Down Expand Up @@ -223,6 +226,20 @@ impl Blockchain for Chain {
filter: Arc<Self::TriggerFilter>,
unified_api_version: UnifiedMappingApiVersion,
) -> Result<Box<dyn BlockStream<Self>>, Error> {
if self.prefer_substreams {
return self
.block_stream_builder
.build_substreams(
self,
store.input_schema(),
deployment,
store.firehose_cursor(),
store.block_ptr(),
filter,
)
.await;
}

self.block_stream_builder
.build_firehose(
self,
Expand Down Expand Up @@ -390,7 +407,10 @@ pub struct FirehoseMapper {

#[async_trait]
impl SubstreamsMapper<Chain> for FirehoseMapper {
fn decode(&self, output: Option<&prost_types::Any>) -> Result<Option<codec::Block>, Error> {
fn decode_block(
&self,
output: Option<&prost_types::Any>,
) -> Result<Option<codec::Block>, Error> {
let block = match output {
Some(block) => codec::Block::decode(block.value.as_ref())?,
None => anyhow::bail!("near mapper is expected to always have a block"),
Expand All @@ -412,6 +432,7 @@ impl SubstreamsMapper<Chain> for FirehoseMapper {
async fn decode_triggers(
&self,
_logger: &Logger,
_clock: &Clock,
message: &prost_types::Any,
) -> Result<BlockWithTriggers<Chain>, Error> {
let BlockAndReceipts {
Expand Down Expand Up @@ -472,7 +493,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
// Check about adding basic information about the block in the bstream::BlockResponseV2 or maybe
// define a slimmed down stuct that would decode only a few fields and ignore all the rest.
// unwrap: Input cannot be None so output will be error or block.
let block = self.decode(Some(&any_block))?.unwrap();
let block = self.decode_block(Some(&any_block))?.unwrap();

use ForkStep::*;
match step {
Expand Down
2 changes: 0 additions & 2 deletions chain/substreams/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,5 @@ base64 = "0.20.0"

itertools = "0.11.0"

graph-chain-common = { path = "../common" }

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
18 changes: 1 addition & 17 deletions chain/substreams/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,7 @@ fn main() {
println!("cargo:rerun-if-changed=proto");
tonic_build::configure()
.protoc_arg("--experimental_allow_proto3_optional")
.extern_path(".sf.near.codec.v1", "graph_chain_near::codec::pbcodec")
.extern_path(
".sf.ethereum.type.v2",
"graph_chain_ethereum::codec::pbcodec",
)
.extern_path(".sf.arweave.type.v1", "graph_chain_arweave::codec::pbcodec")
.extern_path(".sf.cosmos.type.v1", "graph_chain_cosmos::codec")
.out_dir("src/protobuf")
.compile(
&["proto/codec.proto"],
&[
"proto",
"../near/proto",
"../ethereum/proto",
"../arweave/proto",
"../cosmos/proto",
],
)
.compile(&["proto/codec.proto"], &["proto"])
.expect("Failed to compile Substreams entity proto(s)");
}
1 change: 0 additions & 1 deletion chain/substreams/proto/codec.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,3 @@ message Field {
optional Value new_value = 3;
optional Value old_value = 5;
}

3 changes: 2 additions & 1 deletion chain/substreams/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use graph::blockchain::{
BasicBlockchainBuilder, BlockIngestor, EmptyNodeCapabilities, NoopRuntimeAdapter,
};
use graph::components::store::{DeploymentCursorTracker, EntityKey};
use graph::env::EnvVars;
use graph::firehose::FirehoseEndpoints;
use graph::prelude::{BlockHash, CheapClone, Entity, LoggerFactory, MetricsRegistry};
use graph::{
Expand Down Expand Up @@ -192,7 +193,7 @@ impl Blockchain for Chain {
}

impl blockchain::BlockchainBuilder<super::Chain> for BasicBlockchainBuilder {
fn build(self) -> super::Chain {
fn build(self, _config: &Arc<EnvVars>) -> Chain {
let BasicBlockchainBuilder {
logger_factory,
name: _,
Expand Down
4 changes: 0 additions & 4 deletions chain/substreams/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ use graph::{
components::link_resolver::LinkResolver,
prelude::{async_trait, BlockNumber, DataSourceTemplateInfo, Link},
slog::Logger,
substreams::{
module::input::{Input, Params},
Module,
},
};

use prost::Message;
Expand Down
Loading

0 comments on commit 6978ed1

Please sign in to comment.