Skip to content

Commit

Permalink
add L2TxSink for denylist
Browse files Browse the repository at this point in the history
  • Loading branch information
JayT106 committed May 29, 2024
1 parent 824c77d commit 0fee2d3
Show file tree
Hide file tree
Showing 21 changed files with 290 additions and 33 deletions.
2 changes: 2 additions & 0 deletions core/bin/zksync_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use zksync_config::{
},
fri_prover_group::FriProverGroupConfig,
house_keeper::HouseKeeperConfig,
tx_sink::TxSinkConfig,
ContractsConfig, FriProofCompressorConfig, FriProverConfig, FriProverGatewayConfig,
FriWitnessGeneratorConfig, FriWitnessVectorGeneratorConfig, ObservabilityConfig,
PrometheusConfig, ProofDataHandlerConfig,
Expand Down Expand Up @@ -275,5 +276,6 @@ fn load_env_config() -> anyhow::Result<TempConfigStore> {
object_store_config: ObjectStoreConfig::from_env().ok(),
observability: ObservabilityConfig::from_env().ok(),
snapshot_creator: SnapshotsCreatorConfig::from_env().ok(),
tx_sink_config: TxSinkConfig::from_env().ok(),
})
}
2 changes: 2 additions & 0 deletions core/lib/config/src/configs/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{
chain::{CircuitBreakerConfig, MempoolConfig, OperationsManagerConfig, StateKeeperConfig},
fri_prover_group::FriProverGroupConfig,
house_keeper::HouseKeeperConfig,
tx_sink::TxSinkConfig,
FriProofCompressorConfig, FriProverConfig, FriProverGatewayConfig,
FriWitnessGeneratorConfig, FriWitnessVectorGeneratorConfig, ObservabilityConfig,
PrometheusConfig, ProofDataHandlerConfig,
Expand Down Expand Up @@ -32,4 +33,5 @@ pub struct GeneralConfig {
pub eth: Option<EthConfig>,
pub snapshot_creator: Option<SnapshotsCreatorConfig>,
pub observability: Option<ObservabilityConfig>,
pub tx_sink_config: Option<TxSinkConfig>,
}
2 changes: 2 additions & 0 deletions core/lib/config/src/configs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub use self::{
observability::{ObservabilityConfig, OpentelemetryConfig},
proof_data_handler::ProofDataHandlerConfig,
snapshots_creator::SnapshotsCreatorConfig,
tx_sink::TxSinkConfig,
utils::PrometheusConfig,
};

Expand All @@ -43,6 +44,7 @@ pub mod object_store;
pub mod observability;
pub mod proof_data_handler;
pub mod snapshots_creator;
pub mod tx_sink;
pub mod utils;
pub mod wallets;

Expand Down
19 changes: 19 additions & 0 deletions core/lib/config/src/configs/tx_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use std::{collections::HashSet, str::FromStr};

use serde::Deserialize;
use zksync_basic_types::Address;

#[derive(Debug, Deserialize, Clone, PartialEq)]
pub struct TxSinkConfig {
pub deny_list: Option<String>,
}

impl TxSinkConfig {
pub fn deny_list(&self) -> Option<HashSet<Address>> {
self.deny_list.as_ref().map(|list| {
list.split(',')
.map(|element| Address::from_str(element).unwrap())
.collect()
})
}
}
8 changes: 8 additions & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,3 +734,11 @@ impl Distribution<configs::consensus::ConsensusSecrets> for EncodeDist {
}
}
}

impl Distribution<configs::TxSinkConfig> for EncodeDist {
fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> configs::TxSinkConfig {
configs::TxSinkConfig {
deny_list: self.sample(rng),
}
}
}
1 change: 1 addition & 0 deletions core/lib/env_config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod object_store;
mod observability;
mod proof_data_handler;
mod snapshots_creator;
mod tx_sink;
mod utils;

mod genesis;
Expand Down
35 changes: 35 additions & 0 deletions core/lib/env_config/src/tx_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use zksync_config::configs::TxSinkConfig;

use crate::{envy_load, FromEnv};

impl FromEnv for TxSinkConfig {
fn from_env() -> anyhow::Result<Self> {
envy_load("tx_sink", "TX_SINK_")
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::EnvMutex;

static MUTEX: EnvMutex = EnvMutex::new();

fn expected_config() -> TxSinkConfig {
TxSinkConfig {
deny_list: Some("0x1234567890abcdef".to_string()),
}
}

#[test]
fn from_env() {
let mut lock = MUTEX.lock();
let config = r#"
TX_SINK_DENY_LIST="0x1234567890abcdef"
"#;
lock.set_env(config);

let actual = TxSinkConfig::from_env().unwrap();
assert_eq!(actual, expected_config());
}
}
2 changes: 2 additions & 0 deletions core/lib/protobuf_config/src/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ impl ProtoRepr for proto::GeneralConfig {
snapshot_creator: read_optional_repr(&self.snapshot_creator)
.context("snapshot_creator")?,
observability: read_optional_repr(&self.observability).context("observability")?,
tx_sink_config: read_optional_repr(&self.tx_sink).context("tx_sink")?,
})
}

Expand Down Expand Up @@ -68,6 +69,7 @@ impl ProtoRepr for proto::GeneralConfig {
eth: this.eth.as_ref().map(ProtoRepr::build),
snapshot_creator: this.snapshot_creator.as_ref().map(ProtoRepr::build),
observability: this.observability.as_ref().map(ProtoRepr::build),
tx_sink: this.tx_sink_config.as_ref().map(ProtoRepr::build),
}
}
}
1 change: 1 addition & 0 deletions core/lib/protobuf_config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod snapshots_creator;
pub mod testonly;
#[cfg(test)]
mod tests;
mod tx_sink;
mod utils;
mod wallets;

Expand Down
6 changes: 4 additions & 2 deletions core/lib/protobuf_config/src/proto/config/general.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ syntax = "proto3";

package zksync.config.general;

import "zksync/config/prover.proto";
import "zksync/config/api.proto";
import "zksync/config/chain.proto";
import "zksync/config/circuit_breaker.proto";
import "zksync/config/contract_verifier.proto";
import "zksync/config/database.proto";
import "zksync/config/circuit_breaker.proto";
import "zksync/config/eth_sender.proto";
import "zksync/config/house_keeper.proto";
import "zksync/config/observability.proto";
import "zksync/config/prover.proto";
import "zksync/config/snapshots_creator.proto";
import "zksync/config/tx_sink.proto";
import "zksync/config/utils.proto";

message GeneralConfig {
Expand All @@ -35,4 +36,5 @@ message GeneralConfig {
optional config.prover.ProverGateway prover_gateway = 30;
optional config.snapshot_creator.SnapshotsCreator snapshot_creator = 31;
optional config.observability.Observability observability = 32;
optional config.tx_sink.TxSink tx_sink = 33;
}
7 changes: 7 additions & 0 deletions core/lib/protobuf_config/src/proto/config/tx_sink.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
syntax = "proto3";

package zksync.config.tx_sink;

message TxSink {
optional string deny_list = 1; // optional
}
1 change: 1 addition & 0 deletions core/lib/protobuf_config/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ fn test_encoding() {
test_encode_all_formats::<ReprConv<proto::prover::ProofDataHandler>>(rng);
test_encode_all_formats::<ReprConv<proto::snapshot_creator::SnapshotsCreator>>(rng);
test_encode_all_formats::<ReprConv<proto::observability::Observability>>(rng);
test_encode_all_formats::<ReprConv<proto::tx_sink::TxSink>>(rng);
}

pub fn decode_yaml_repr<T: ProtoRepr>(
Expand Down
19 changes: 19 additions & 0 deletions core/lib/protobuf_config/src/tx_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use zksync_config::configs;
use zksync_protobuf::repr::ProtoRepr;

use crate::proto::tx_sink as proto;

impl ProtoRepr for proto::TxSink {
type Type = configs::tx_sink::TxSinkConfig;
fn read(&self) -> anyhow::Result<Self::Type> {
Ok(Self::Type {
deny_list: self.deny_list.clone(),
})
}

fn build(this: &Self::Type) -> Self {
Self {
deny_list: this.deny_list.clone(),
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use std::collections::{
hash_map::{Entry, HashMap},
HashSet,
};

use tokio::sync::Mutex;
use zksync_dal::{transactions_dal::L2TxSubmissionResult, ConnectionPool, Core, CoreDal};
use zksync_shared_metrics::{TxStage, APP_METRICS};
use zksync_types::{fee::TransactionExecutionMetrics, l2::L2Tx, Address, Nonce, H256};

use super::{tx_sink::TxSink, SubmitTxError};
use crate::api_server::web3::metrics::API_METRICS;

/// Wrapper for the master DB pool that allows to submit transactions to the mempool.
#[derive(Debug)]
pub struct DenyListPoolSink {
deny_list_pool: ConnectionPool<Core>,
deny_list: HashSet<Address>,
inflight_requests: Mutex<HashMap<(Address, Nonce), H256>>,
}

impl DenyListPoolSink {
pub fn new(deny_list_pool: ConnectionPool<Core>, deny_list: HashSet<Address>) -> Self {
Self {
deny_list_pool,
deny_list,
inflight_requests: Mutex::new(HashMap::new()),
}
}
}

#[async_trait::async_trait]
impl TxSink for DenyListPoolSink {
async fn submit_tx(
&self,
tx: &L2Tx,
execution_metrics: TransactionExecutionMetrics,
) -> Result<L2TxSubmissionResult, SubmitTxError> {
let address_and_nonce = (tx.initiator_account(), tx.nonce());
if self.deny_list.contains(&address_and_nonce.0) {
return Err(SubmitTxError::SenderInDenyList);
}

let mut lock = self.inflight_requests.lock().await;
match lock.entry(address_and_nonce) {
Entry::Occupied(entry) => {
let submission_res_handle = if entry.get() == &tx.hash() {
L2TxSubmissionResult::Duplicate
} else {
L2TxSubmissionResult::InsertionInProgress
};
APP_METRICS.processed_txs[&TxStage::Mempool(submission_res_handle)].inc();
return Ok(submission_res_handle);
}
Entry::Vacant(entry) => {
entry.insert(tx.hash());
API_METRICS.inflight_tx_submissions.inc_by(1);
}
};
drop(lock);

let result = match self.deny_list_pool.connection_tagged("api").await {
Ok(mut connection) => connection
.transactions_dal()
.insert_transaction_l2(tx, execution_metrics)
.await
.map(|submission_res_handle| {
APP_METRICS.processed_txs[&TxStage::Mempool(submission_res_handle)].inc();
submission_res_handle
})
.map_err(|err| err.generalize().into()),
Err(err) => Err(err.generalize().into()),
};

self.inflight_requests
.lock()
.await
.remove(&address_and_nonce);
API_METRICS.inflight_tx_submissions.dec_by(1);

result
}
}
1 change: 1 addition & 0 deletions core/lib/zksync_core/src/api_server/tx_sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::{
utils::pending_protocol_version,
};

pub mod deny_list_pool_sink;
pub mod master_pool_sink;
pub mod proxy;
mod result;
Expand Down
3 changes: 3 additions & 0 deletions core/lib/zksync_core/src/api_server/tx_sender/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ pub enum SubmitTxError {
/// Catch-all internal error (e.g., database error) that should not be exposed to the caller.
#[error("internal error")]
Internal(#[from] anyhow::Error),
#[error("sender is in deny list")]
SenderInDenyList,
}

impl SubmitTxError {
Expand Down Expand Up @@ -108,6 +110,7 @@ impl SubmitTxError {
Self::ProxyError(_) => "proxy-error",
Self::FailedToPublishCompressedBytecodes => "failed-to-publish-compressed-bytecodes",
Self::Internal(_) => "internal",
Self::SenderInDenyList => "sender-in-deny-list",
}
}

Expand Down
12 changes: 9 additions & 3 deletions core/lib/zksync_core/src/api_server/tx_sender/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,16 @@ pub(crate) async fn create_test_tx_sender(

let storage_caches = PostgresStorageCaches::new(1, 1);
let batch_fee_model_input_provider = Arc::new(MockBatchFeeParamsProvider::default());

let config = crate::TxSenderBuilderConfigs {
tx_sender_config: tx_sender_config.clone(),
web3_json_config: web3_config.clone(),
state_keeper_config: state_keeper_config.clone(),
tx_sink_config: None,
};

let (mut tx_sender, vm_barrier) = crate::build_tx_sender(
&tx_sender_config,
&web3_config,
&state_keeper_config,
config,
pool.clone(),
pool,
batch_fee_model_input_provider,
Expand Down
Loading

0 comments on commit 0fee2d3

Please sign in to comment.