From 3e63476580a842d93339524ccb3d8f3ce2e49b99 Mon Sep 17 00:00:00 2001 From: Ashok Menon Date: Wed, 18 Dec 2024 14:37:41 +0000 Subject: [PATCH] indexer-alt: remove {sum,wal}_{obj_types,coin_balances} (#20659) ## Description Clean up the previous consistent pipelines (obj_types and coin_balances) and their schemas. ## Test plan CI --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- .../2024-12-17-162230_drop_sum_wal/down.sql | 50 +++++ .../2024-12-17-162230_drop_sum_wal/up.sql | 7 + crates/sui-indexer-alt-schema/src/objects.rs | 72 +------ crates/sui-indexer-alt-schema/src/schema.rs | 52 ----- crates/sui-indexer-alt/src/config.rs | 161 +++++++-------- crates/sui-indexer-alt/src/handlers/mod.rs | 4 - .../src/handlers/sum_coin_balances.rs | 185 ------------------ .../src/handlers/sum_obj_types.rs | 183 ----------------- .../src/handlers/wal_coin_balances.rs | 65 ------ .../src/handlers/wal_obj_types.rs | 68 ------- crates/sui-indexer-alt/src/lib.rs | 86 ++------ 11 files changed, 136 insertions(+), 797 deletions(-) create mode 100644 crates/sui-indexer-alt-schema/migrations/2024-12-17-162230_drop_sum_wal/down.sql create mode 100644 crates/sui-indexer-alt-schema/migrations/2024-12-17-162230_drop_sum_wal/up.sql delete mode 100644 crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs delete mode 100644 crates/sui-indexer-alt/src/handlers/sum_obj_types.rs delete mode 100644 crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs delete mode 100644 crates/sui-indexer-alt/src/handlers/wal_obj_types.rs diff --git a/crates/sui-indexer-alt-schema/migrations/2024-12-17-162230_drop_sum_wal/down.sql b/crates/sui-indexer-alt-schema/migrations/2024-12-17-162230_drop_sum_wal/down.sql new file mode 100644 index 0000000000000..5306d86af5e89 --- /dev/null +++ b/crates/sui-indexer-alt-schema/migrations/2024-12-17-162230_drop_sum_wal/down.sql @@ -0,0 +1,50 @@ +CREATE TABLE IF NOT EXISTS sum_obj_types +( + object_id BYTEA PRIMARY KEY, + object_version BIGINT NOT NULL, + owner_kind SMALLINT NOT NULL, + owner_id BYTEA, + package BYTEA, + module TEXT, + name TEXT, + instantiation BYTEA +); + +CREATE INDEX IF NOT EXISTS sum_obj_types_owner +ON sum_obj_types (owner_kind, owner_id, object_id, object_version); + +CREATE INDEX IF NOT EXISTS sum_obj_types_pkg +ON sum_obj_types (package, object_id, object_version); + +CREATE INDEX IF NOT EXISTS sum_obj_types_mod +ON sum_obj_types (package, module, object_id, object_version); + +CREATE INDEX IF NOT EXISTS sum_obj_types_name +ON sum_obj_types (package, module, name, object_id, object_version); + +CREATE INDEX IF NOT EXISTS sum_obj_types_inst +ON sum_obj_types (package, module, name, instantiation, object_id, object_version); + +CREATE INDEX IF NOT EXISTS sum_obj_types_owner_pkg +ON sum_obj_types (owner_kind, owner_id, package, object_id, object_version); + +CREATE INDEX IF NOT EXISTS sum_obj_types_owner_mod +ON sum_obj_types (owner_kind, owner_id, package, module, object_id, object_version); + +CREATE INDEX IF NOT EXISTS sum_obj_types_owner_name +ON sum_obj_types (owner_kind, owner_id, package, module, name, object_id, object_version); + +CREATE INDEX 
IF NOT EXISTS sum_obj_types_owner_inst
+ON sum_obj_types (owner_kind, owner_id, package, module, name, instantiation, object_id, object_version);
+
+CREATE TABLE IF NOT EXISTS sum_coin_balances
+(
+    object_id BYTEA PRIMARY KEY,
+    object_version BIGINT NOT NULL,
+    owner_id BYTEA NOT NULL,
+    coin_type BYTEA NOT NULL,
+    coin_balance BIGINT NOT NULL
+);
+
+CREATE INDEX IF NOT EXISTS sum_coin_balances_owner_type
+ON sum_coin_balances (owner_id, coin_type, coin_balance, object_id, object_version);
diff --git a/crates/sui-indexer-alt-schema/migrations/2024-12-17-162230_drop_sum_wal/up.sql b/crates/sui-indexer-alt-schema/migrations/2024-12-17-162230_drop_sum_wal/up.sql
new file mode 100644
index 0000000000000..03a4545304302
--- /dev/null
+++ b/crates/sui-indexer-alt-schema/migrations/2024-12-17-162230_drop_sum_wal/up.sql
@@ -0,0 +1,7 @@
+DROP TABLE IF EXISTS sum_obj_types;
+
+DROP TABLE IF EXISTS wal_obj_types;
+
+DROP TABLE IF EXISTS sum_coin_balances;
+
+DROP TABLE IF EXISTS wal_coin_balances;
diff --git a/crates/sui-indexer-alt-schema/src/objects.rs b/crates/sui-indexer-alt-schema/src/objects.rs
index cc4211d0645d8..15c37eef0bdc8 100644
--- a/crates/sui-indexer-alt-schema/src/objects.rs
+++ b/crates/sui-indexer-alt-schema/src/objects.rs
@@ -6,12 +6,8 @@ use diesel::{
     sql_types::SmallInt,
     FromSqlRow,
 };
 use sui_field_count::FieldCount;
-use sui_types::base_types::ObjectID;
 
-use crate::schema::{
-    coin_balance_buckets, kv_objects, obj_info, obj_versions, sum_coin_balances, sum_obj_types,
-    wal_coin_balances, wal_obj_types,
-};
+use crate::schema::{coin_balance_buckets, kv_objects, obj_info, obj_versions};
 
 #[derive(Insertable, Debug, Clone, FieldCount)]
 #[diesel(table_name = kv_objects, primary_key(object_id, object_version))]
 pub struct StoredObjVersion {
     pub cp_sequence_number: i64,
 }
 
-/// An insert/update or deletion of an object record, keyed on a particular Object ID and version.
-#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
-pub struct StoredObjectUpdate<T> {
-    pub object_id: ObjectID,
-    pub object_version: u64,
-    pub cp_sequence_number: u64,
-    /// `None` means the object was deleted or wrapped at this version, `Some(x)` means it was
-    /// changed to `x`.
-    pub update: Option<T>,
-}
-
 #[derive(AsExpression, FromSqlRow, Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
 #[diesel(sql_type = SmallInt)]
 #[repr(i16)]
@@ -59,54 +44,6 @@ pub enum StoredCoinOwnerKind {
     Consensus = 1,
 }
 
-#[derive(Insertable, Debug, Clone, FieldCount)]
-#[diesel(table_name = sum_coin_balances, primary_key(object_id))]
-pub struct StoredSumCoinBalance {
-    pub object_id: Vec<u8>,
-    pub object_version: i64,
-    pub owner_id: Vec<u8>,
-    pub coin_type: Vec<u8>,
-    pub coin_balance: i64,
-}
-
-#[derive(Insertable, Debug, Clone, FieldCount)]
-#[diesel(table_name = sum_obj_types, primary_key(object_id))]
-pub struct StoredSumObjType {
-    pub object_id: Vec<u8>,
-    pub object_version: i64,
-    pub owner_kind: StoredOwnerKind,
-    pub owner_id: Option<Vec<u8>>,
-    pub package: Option<Vec<u8>>,
-    pub module: Option<String>,
-    pub name: Option<String>,
-    pub instantiation: Option<Vec<u8>>,
-}
-
-#[derive(Insertable, Debug, Clone)]
-#[diesel(table_name = wal_coin_balances, primary_key(object_id, object_version))]
-pub struct StoredWalCoinBalance {
-    pub object_id: Vec<u8>,
-    pub object_version: i64,
-    pub owner_id: Option<Vec<u8>>,
-    pub coin_type: Option<Vec<u8>>,
-    pub coin_balance: Option<i64>,
-    pub cp_sequence_number: i64,
-}
-
-#[derive(Insertable, Debug, Clone)]
-#[diesel(table_name = wal_obj_types, primary_key(object_id, object_version))]
-pub struct StoredWalObjType {
-    pub object_id: Vec<u8>,
-    pub object_version: i64,
-    pub owner_kind: Option<StoredOwnerKind>,
-    pub owner_id: Option<Vec<u8>>,
-    pub package: Option<Vec<u8>>,
-    pub module: Option<String>,
-    pub name: Option<String>,
-    pub instantiation: Option<Vec<u8>>,
-    pub cp_sequence_number: i64,
-}
-
 #[derive(Insertable, Debug, Clone, FieldCount)]
 #[diesel(table_name = obj_info, primary_key(object_id, cp_sequence_number))]
 pub struct StoredObjInfo {
@@ -131,13 +68,6 @@ pub struct StoredCoinBalanceBucket {
     pub coin_balance_bucket: Option<i16>,
 }
 
-/// StoredObjectUpdate is a wrapper type, we want to count the fields of the inner type.
-impl<T: FieldCount> FieldCount for StoredObjectUpdate<T> {
-    // Add one here for cp_sequence_number field, because StoredObjectUpdate is used for
-    // wal_* handlers, where the actual type to commit has an additional field besides fields of T.
-    const FIELD_COUNT: usize = T::FIELD_COUNT.saturating_add(1);
-}
-
 impl<DB: Backend> serialize::ToSql<SmallInt, DB> for StoredOwnerKind
 where
     i16: serialize::ToSql<SmallInt, DB>,
diff --git a/crates/sui-indexer-alt-schema/src/schema.rs b/crates/sui-indexer-alt-schema/src/schema.rs
index 9c59ec662d066..fe5c034144332 100644
--- a/crates/sui-indexer-alt-schema/src/schema.rs
+++ b/crates/sui-indexer-alt-schema/src/schema.rs
@@ -136,16 +136,6 @@ diesel::table! {
     }
 }
 
-diesel::table! {
-    sum_coin_balances (object_id) {
-        object_id -> Bytea,
-        object_version -> Int8,
-        owner_id -> Bytea,
-        coin_type -> Bytea,
-        coin_balance -> Int8,
-    }
-}
-
 diesel::table! {
     sum_displays (object_type) {
         object_type -> Bytea,
@@ -155,19 +145,6 @@ diesel::table! {
     }
 }
 
-diesel::table! {
-    sum_obj_types (object_id) {
-        object_id -> Bytea,
-        object_version -> Int8,
-        owner_kind -> Int2,
-        owner_id -> Nullable<Bytea>,
-        package -> Nullable<Bytea>,
-        module -> Nullable<Text>,
-        name -> Nullable<Text>,
-        instantiation -> Nullable<Bytea>,
-    }
-}
-
 diesel::table! {
     sum_packages (package_id) {
         package_id -> Bytea,
@@ -225,31 +202,6 @@ diesel::table! {
     }
 }
 
-diesel::table! {
-    wal_coin_balances (object_id, object_version) {
-        object_id -> Bytea,
-        object_version -> Int8,
-        owner_id -> Nullable<Bytea>,
-        coin_type -> Nullable<Bytea>,
-        coin_balance -> Nullable<Int8>,
-        cp_sequence_number -> Int8,
-    }
-}
-
-diesel::table! {
-    wal_obj_types (object_id, object_version) {
-        object_id -> Bytea,
-        object_version -> Int8,
-        owner_kind -> Nullable<Int2>,
-        owner_id -> Nullable<Bytea>,
-        package -> Nullable<Bytea>,
-        module -> Nullable<Text>,
-        name -> Nullable<Text>,
-        instantiation -> Nullable<Bytea>,
-        cp_sequence_number -> Int8,
-    }
-}
-
 diesel::allow_tables_to_appear_in_same_query!(
     coin_balance_buckets,
     ev_emit_mod,
@@ -264,9 +216,7 @@ diesel::allow_tables_to_appear_in_same_query!(
     kv_transactions,
     obj_info,
     obj_versions,
-    sum_coin_balances,
     sum_displays,
-    sum_obj_types,
     sum_packages,
     tx_affected_addresses,
     tx_affected_objects,
@@ -274,6 +224,4 @@ diesel::allow_tables_to_appear_in_same_query!(
     tx_calls,
     tx_digests,
     tx_kinds,
-    wal_coin_balances,
-    wal_obj_types,
 );
diff --git a/crates/sui-indexer-alt/src/config.rs b/crates/sui-indexer-alt/src/config.rs
index ab297db368069..241e765c08351 100644
--- a/crates/sui-indexer-alt/src/config.rs
+++ b/crates/sui-indexer-alt/src/config.rs
@@ -46,17 +46,10 @@ pub struct IndexerConfig {
     pub extra: toml::Table,
 }
 
-#[derive(Clone)]
+#[derive(Default, Clone)]
 pub struct ConsistencyConfig {
-    /// How often to check whether write-ahead logs related to the consistent range can be
-    /// pruned.
-    pub consistent_pruning_interval_ms: u64,
-
-    /// How long to wait before honouring reader low watermarks.
-    pub pruner_delay_ms: u64,
-
     /// Number of checkpoints to delay indexing summary tables for.
-    pub consistent_range: Option<u64>,
+    pub consistent_range: u64,
 }
 
 // Configuration layers apply overrides over a base configuration. When reading configs from a
@@ -81,8 +74,6 @@ pub struct IngestionLayer {
 #[DefaultConfig]
 #[derive(Clone, Default, Debug)]
 pub struct ConsistencyLayer {
-    consistent_pruning_interval_ms: Option<u64>,
-    pruner_delay_ms: Option<u64>,
     consistent_range: Option<u64>,
 
     #[serde(flatten)]
@@ -137,13 +128,7 @@ pub struct PrunerLayer {
 #[DefaultConfig]
 #[derive(Clone, Default, Debug)]
 #[serde(rename_all = "snake_case")]
 pub struct PipelineLayer {
-    // Consistent pipelines (a sequential pipeline with a write-ahead log)
-    pub sum_coin_balances: Option<CommitterLayer>,
-    pub wal_coin_balances: Option<CommitterLayer>,
-    pub sum_obj_types: Option<CommitterLayer>,
-    pub wal_obj_types: Option<CommitterLayer>,
-
-    // Sequential pipelines without a write-ahead log
+    // Sequential pipelines
    pub sum_displays: Option<SequentialLayer>,
    pub sum_packages: Option<SequentialLayer>,
@@ -214,11 +199,7 @@ impl ConsistencyLayer {
     pub fn finish(self, base: ConsistencyConfig) -> ConsistencyConfig {
         check_extra("consistency", self.extra);
         ConsistencyConfig {
-            consistent_pruning_interval_ms: self
-                .consistent_pruning_interval_ms
-                .unwrap_or(base.consistent_pruning_interval_ms),
-            pruner_delay_ms: self.pruner_delay_ms.unwrap_or(base.pruner_delay_ms),
-            consistent_range: self.consistent_range.or(base.consistent_range),
+            consistent_range: self.consistent_range.unwrap_or(base.consistent_range),
         }
     }
 }
@@ -286,10 +267,6 @@ impl PipelineLayer {
     /// configure.
pub fn example() -> Self { PipelineLayer { - sum_coin_balances: Some(Default::default()), - wal_coin_balances: Some(Default::default()), - sum_obj_types: Some(Default::default()), - wal_obj_types: Some(Default::default()), sum_displays: Some(Default::default()), sum_packages: Some(Default::default()), obj_info: Some(Default::default()), @@ -355,10 +332,6 @@ impl Merge for ConsistencyLayer { check_extra("consistency", self.extra); check_extra("consistency", other.extra); ConsistencyLayer { - consistent_pruning_interval_ms: other - .consistent_pruning_interval_ms - .or(self.consistent_pruning_interval_ms), - pruner_delay_ms: other.pruner_delay_ms.or(self.pruner_delay_ms), consistent_range: other.consistent_range.or(self.consistent_range), extra: Default::default(), } @@ -428,10 +401,6 @@ impl Merge for PipelineLayer { check_extra("pipeline", self.extra); check_extra("pipeline", other.extra); PipelineLayer { - sum_coin_balances: self.sum_coin_balances.merge(other.sum_coin_balances), - wal_coin_balances: self.wal_coin_balances.merge(other.wal_coin_balances), - sum_obj_types: self.sum_obj_types.merge(other.sum_obj_types), - wal_obj_types: self.wal_obj_types.merge(other.wal_obj_types), sum_displays: self.sum_displays.merge(other.sum_displays), sum_packages: self.sum_packages.merge(other.sum_packages), obj_info: self.obj_info.merge(other.obj_info), @@ -473,16 +442,6 @@ impl Merge for Option { } } -impl Default for ConsistencyConfig { - fn default() -> Self { - Self { - consistent_pruning_interval_ms: 300_000, - pruner_delay_ms: 120_000, - consistent_range: None, - } - } -} - impl From for IngestionLayer { fn from(config: IngestionConfig) -> Self { Self { @@ -497,9 +456,7 @@ impl From for IngestionLayer { impl From for ConsistencyLayer { fn from(config: ConsistencyConfig) -> Self { Self { - consistent_pruning_interval_ms: Some(config.consistent_pruning_interval_ms), - pruner_delay_ms: Some(config.pruner_delay_ms), - consistent_range: config.consistent_range, + consistent_range: Some(config.consistent_range), extra: Default::default(), } } @@ -579,15 +536,11 @@ mod tests { #[test] fn merge_simple() { let this = ConsistencyLayer { - consistent_pruning_interval_ms: None, - pruner_delay_ms: Some(2000), consistent_range: Some(3000), extra: Default::default(), }; let that = ConsistencyLayer { - consistent_pruning_interval_ms: Some(1000), - pruner_delay_ms: None, consistent_range: Some(4000), extra: Default::default(), }; @@ -598,8 +551,6 @@ mod tests { assert_matches!( this_then_that, ConsistencyLayer { - consistent_pruning_interval_ms: Some(1000), - pruner_delay_ms: Some(2000), consistent_range: Some(4000), extra: _, } @@ -608,8 +559,6 @@ mod tests { assert_matches!( that_then_this, ConsistencyLayer { - consistent_pruning_interval_ms: Some(1000), - pruner_delay_ms: Some(2000), consistent_range: Some(3000), extra: _, } @@ -619,13 +568,6 @@ mod tests { #[test] fn merge_recursive() { let this = PipelineLayer { - sum_coin_balances: None, - sum_obj_types: Some(CommitterLayer { - write_concurrency: Some(5), - collect_interval_ms: Some(500), - watermark_interval_ms: None, - extra: Default::default(), - }), sum_displays: Some(SequentialLayer { committer: Some(CommitterLayer { write_concurrency: Some(10), @@ -636,17 +578,20 @@ mod tests { checkpoint_lag: Some(100), extra: Default::default(), }), + sum_packages: None, + ev_emit_mod: Some(ConcurrentLayer { + committer: Some(CommitterLayer { + write_concurrency: Some(5), + collect_interval_ms: Some(500), + watermark_interval_ms: None, + extra: Default::default(), 
+ }), + ..Default::default() + }), ..Default::default() }; let that = PipelineLayer { - sum_coin_balances: Some(CommitterLayer { - write_concurrency: Some(10), - collect_interval_ms: None, - watermark_interval_ms: Some(1000), - extra: Default::default(), - }), - sum_obj_types: None, sum_displays: Some(SequentialLayer { committer: Some(CommitterLayer { write_concurrency: Some(5), @@ -657,6 +602,16 @@ mod tests { checkpoint_lag: Some(200), extra: Default::default(), }), + sum_packages: Some(SequentialLayer { + committer: Some(CommitterLayer { + write_concurrency: Some(10), + collect_interval_ms: None, + watermark_interval_ms: Some(1000), + extra: Default::default(), + }), + ..Default::default() + }), + ev_emit_mod: None, ..Default::default() }; @@ -666,18 +621,6 @@ mod tests { assert_matches!( this_then_that, PipelineLayer { - sum_coin_balances: Some(CommitterLayer { - write_concurrency: Some(10), - collect_interval_ms: None, - watermark_interval_ms: Some(1000), - extra: _, - }), - sum_obj_types: Some(CommitterLayer { - write_concurrency: Some(5), - collect_interval_ms: Some(500), - watermark_interval_ms: None, - extra: _, - }), sum_displays: Some(SequentialLayer { committer: Some(CommitterLayer { write_concurrency: Some(5), @@ -688,6 +631,27 @@ mod tests { checkpoint_lag: Some(200), extra: _, }), + sum_packages: Some(SequentialLayer { + committer: Some(CommitterLayer { + write_concurrency: Some(10), + collect_interval_ms: None, + watermark_interval_ms: Some(1000), + extra: _, + }), + checkpoint_lag: None, + extra: _, + }), + ev_emit_mod: Some(ConcurrentLayer { + committer: Some(CommitterLayer { + write_concurrency: Some(5), + collect_interval_ms: Some(500), + watermark_interval_ms: None, + extra: _, + }), + pruner: None, + checkpoint_lag: None, + extra: _, + }), .. }, ); @@ -695,18 +659,6 @@ mod tests { assert_matches!( that_then_this, PipelineLayer { - sum_coin_balances: Some(CommitterLayer { - write_concurrency: Some(10), - collect_interval_ms: None, - watermark_interval_ms: Some(1000), - extra: _, - }), - sum_obj_types: Some(CommitterLayer { - write_concurrency: Some(5), - collect_interval_ms: Some(500), - watermark_interval_ms: None, - extra: _, - }), sum_displays: Some(SequentialLayer { committer: Some(CommitterLayer { write_concurrency: Some(10), @@ -717,6 +669,27 @@ mod tests { checkpoint_lag: Some(100), extra: _, }), + sum_packages: Some(SequentialLayer { + committer: Some(CommitterLayer { + write_concurrency: Some(10), + collect_interval_ms: None, + watermark_interval_ms: Some(1000), + extra: _, + }), + checkpoint_lag: None, + extra: _, + }), + ev_emit_mod: Some(ConcurrentLayer { + committer: Some(CommitterLayer { + write_concurrency: Some(5), + collect_interval_ms: Some(500), + watermark_interval_ms: None, + extra: _, + }), + pruner: None, + checkpoint_lag: None, + extra: _, + }), .. 
}, ); diff --git a/crates/sui-indexer-alt/src/handlers/mod.rs b/crates/sui-indexer-alt/src/handlers/mod.rs index 3897d29838964..16fd82a9b56c4 100644 --- a/crates/sui-indexer-alt/src/handlers/mod.rs +++ b/crates/sui-indexer-alt/src/handlers/mod.rs @@ -15,9 +15,7 @@ pub(crate) mod kv_transactions; pub(crate) mod obj_info; pub(crate) mod obj_info_pruner; pub(crate) mod obj_versions; -pub(crate) mod sum_coin_balances; pub(crate) mod sum_displays; -pub(crate) mod sum_obj_types; pub(crate) mod sum_packages; pub(crate) mod tx_affected_addresses; pub(crate) mod tx_affected_objects; @@ -25,5 +23,3 @@ pub(crate) mod tx_balance_changes; pub(crate) mod tx_calls; pub(crate) mod tx_digests; pub(crate) mod tx_kinds; -pub(crate) mod wal_coin_balances; -pub(crate) mod wal_obj_types; diff --git a/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs b/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs deleted file mode 100644 index 0e2a067440317..0000000000000 --- a/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs +++ /dev/null @@ -1,185 +0,0 @@ -// Copyright (c) Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 - -use std::{ - collections::{btree_map::Entry, BTreeMap}, - sync::Arc, -}; - -use anyhow::{anyhow, bail, ensure}; -use diesel::{upsert::excluded, ExpressionMethods}; -use diesel_async::RunQueryDsl; -use futures::future::{try_join_all, Either}; -use sui_field_count::FieldCount; -use sui_indexer_alt_framework::pipeline::{sequential::Handler, Processor}; -use sui_indexer_alt_schema::{ - objects::{StoredObjectUpdate, StoredSumCoinBalance}, - schema::sum_coin_balances, -}; -use sui_pg_db as db; -use sui_types::{ - base_types::ObjectID, effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData, - object::Owner, -}; - -const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredSumCoinBalance::FIELD_COUNT; -const MAX_DELETE_CHUNK_ROWS: usize = i16::MAX as usize; - -pub(crate) struct SumCoinBalances; - -impl Processor for SumCoinBalances { - const NAME: &'static str = "sum_coin_balances"; - - type Value = StoredObjectUpdate; - - fn process(&self, checkpoint: &Arc) -> anyhow::Result> { - let CheckpointData { - transactions, - checkpoint_summary, - .. - } = checkpoint.as_ref(); - - let cp_sequence_number = checkpoint_summary.sequence_number; - let mut values: BTreeMap = BTreeMap::new(); - let mut coin_types: BTreeMap> = BTreeMap::new(); - - // Iterate over transactions in reverse so we see the latest version of each object first. - for tx in transactions.iter().rev() { - // Find all coins in the transaction's inputs and outputs. - for object in tx.input_objects.iter().chain(tx.output_objects.iter()) { - if let Some(coin_type) = object.type_().and_then(|t| t.coin_type_maybe()) { - let serialized = bcs::to_bytes(&coin_type) - .map_err(|_| anyhow!("Failed to serialize type for {}", object.id()))?; - - coin_types.insert(object.id(), serialized); - } - } - - // Deleted and wrapped coins - for change in tx.effects.object_changes() { - // The object is not deleted/wrapped, or if it is it was unwrapped in the same - // transaction. 
- if change.output_digest.is_some() || change.input_version.is_none() { - continue; - } - - // Object is not a coin - if !coin_types.contains_key(&change.id) { - continue; - } - - let object_id = change.id; - let object_version = tx.effects.lamport_version().value(); - match values.entry(object_id) { - Entry::Occupied(entry) => { - ensure!(entry.get().object_version > object_version); - } - - Entry::Vacant(entry) => { - entry.insert(StoredObjectUpdate { - object_id, - object_version, - cp_sequence_number, - update: None, - }); - } - } - } - - // Modified and created coins. - for object in &tx.output_objects { - let object_id = object.id(); - let object_version = object.version().value(); - - let Some(coin_type) = coin_types.get(&object_id) else { - continue; - }; - - // Coin balance only tracks address-owned objects - let Owner::AddressOwner(owner_id) = object.owner() else { - continue; - }; - - let Some(coin) = object.as_coin_maybe() else { - bail!("Failed to deserialize Coin for {object_id}"); - }; - - match values.entry(object_id) { - Entry::Occupied(entry) => { - ensure!(entry.get().object_version > object_version); - } - - Entry::Vacant(entry) => { - entry.insert(StoredObjectUpdate { - object_id, - object_version, - cp_sequence_number, - update: Some(StoredSumCoinBalance { - object_id: object_id.to_vec(), - object_version: object_version as i64, - owner_id: owner_id.to_vec(), - coin_type: coin_type.clone(), - coin_balance: coin.balance.value() as i64, - }), - }); - } - } - } - } - - Ok(values.into_values().collect()) - } -} - -#[async_trait::async_trait] -impl Handler for SumCoinBalances { - type Batch = BTreeMap; - - fn batch(batch: &mut Self::Batch, updates: Vec) { - // `updates` are guaranteed to be provided in checkpoint order, so blindly inserting them - // will result in the batch containing the most up-to-date update for each object. - for update in updates { - batch.insert(update.object_id, update); - } - } - - async fn commit(batch: &Self::Batch, conn: &mut db::Connection<'_>) -> anyhow::Result { - let mut updates = vec![]; - let mut deletes = vec![]; - - for update in batch.values() { - if let Some(update) = &update.update { - updates.push(update.clone()); - } else { - deletes.push(update.object_id.to_vec()); - } - } - let update_chunks = updates.chunks(MAX_INSERT_CHUNK_ROWS).map(Either::Left); - let delete_chunks = deletes.chunks(MAX_DELETE_CHUNK_ROWS).map(Either::Right); - - let futures = update_chunks.chain(delete_chunks).map(|chunk| match chunk { - Either::Left(update) => Either::Left( - diesel::insert_into(sum_coin_balances::table) - .values(update) - .on_conflict(sum_coin_balances::object_id) - .do_update() - .set(( - sum_coin_balances::object_version - .eq(excluded(sum_coin_balances::object_version)), - sum_coin_balances::owner_id.eq(excluded(sum_coin_balances::owner_id)), - sum_coin_balances::coin_balance - .eq(excluded(sum_coin_balances::coin_balance)), - )) - .execute(conn), - ), - - Either::Right(delete) => Either::Right( - diesel::delete(sum_coin_balances::table) - .filter(sum_coin_balances::object_id.eq_any(delete.iter().cloned())) - .execute(conn), - ), - }); - - Ok(try_join_all(futures).await?.into_iter().sum()) - } -} diff --git a/crates/sui-indexer-alt/src/handlers/sum_obj_types.rs b/crates/sui-indexer-alt/src/handlers/sum_obj_types.rs deleted file mode 100644 index 84e61e2e01523..0000000000000 --- a/crates/sui-indexer-alt/src/handlers/sum_obj_types.rs +++ /dev/null @@ -1,183 +0,0 @@ -// Copyright (c) Mysten Labs, Inc. 
-// SPDX-License-Identifier: Apache-2.0 - -use std::{ - collections::{btree_map::Entry, BTreeMap}, - sync::Arc, -}; - -use anyhow::{anyhow, ensure}; -use diesel::{upsert::excluded, ExpressionMethods}; -use diesel_async::RunQueryDsl; -use futures::future::{try_join_all, Either}; -use sui_field_count::FieldCount; -use sui_indexer_alt_framework::pipeline::{sequential::Handler, Processor}; -use sui_indexer_alt_schema::{ - objects::{StoredObjectUpdate, StoredOwnerKind, StoredSumObjType}, - schema::sum_obj_types, -}; -use sui_pg_db as db; -use sui_types::{ - base_types::ObjectID, effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData, - object::Owner, -}; - -const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredSumObjType::FIELD_COUNT; -const MAX_DELETE_CHUNK_ROWS: usize = i16::MAX as usize; - -pub(crate) struct SumObjTypes; - -impl Processor for SumObjTypes { - const NAME: &'static str = "sum_obj_types"; - - type Value = StoredObjectUpdate; - - fn process(&self, checkpoint: &Arc) -> anyhow::Result> { - let CheckpointData { - transactions, - checkpoint_summary, - .. - } = checkpoint.as_ref(); - - let cp_sequence_number = checkpoint_summary.sequence_number; - let mut values: BTreeMap = BTreeMap::new(); - - // Iterate over transactions in reverse so we see the latest version of each object first. - for tx in transactions.iter().rev() { - // Deleted and wrapped objects -- objects that show up without a digest in - // `object_changes` are either deleted or wrapped. Objects without an input version - // must have been unwrapped and deleted, meaning they do not need to be deleted from - // our records. - for change in tx.effects.object_changes() { - if change.output_digest.is_some() || change.input_version.is_none() { - continue; - } - - let object_id = change.id; - let object_version = tx.effects.lamport_version().value(); - match values.entry(object_id) { - Entry::Occupied(entry) => { - ensure!(entry.get().object_version > object_version); - } - - Entry::Vacant(entry) => { - entry.insert(StoredObjectUpdate { - object_id, - object_version, - cp_sequence_number, - update: None, - }); - } - } - } - - // Modified and created objects. - for object in &tx.output_objects { - let object_id = object.id(); - let object_version = object.version().value(); - match values.entry(object_id) { - Entry::Occupied(entry) => { - ensure!(entry.get().object_version > object_version); - } - - Entry::Vacant(entry) => { - let type_ = object.type_(); - entry.insert(StoredObjectUpdate { - object_id, - object_version, - cp_sequence_number, - update: Some(StoredSumObjType { - object_id: object_id.to_vec(), - object_version: object_version as i64, - - owner_kind: match object.owner() { - Owner::AddressOwner(_) => StoredOwnerKind::Address, - Owner::ObjectOwner(_) => StoredOwnerKind::Object, - Owner::Shared { .. } => StoredOwnerKind::Shared, - Owner::Immutable => StoredOwnerKind::Immutable, - // TODO: Implement support for ConsensusV2 objects. - Owner::ConsensusV2 { .. 
} => todo!(), - }, - - owner_id: match object.owner() { - Owner::AddressOwner(a) => Some(a.to_vec()), - Owner::ObjectOwner(o) => Some(o.to_vec()), - _ => None, - }, - - package: type_.map(|t| t.address().to_vec()), - module: type_.map(|t| t.module().to_string()), - name: type_.map(|t| t.name().to_string()), - instantiation: type_ - .map(|t| bcs::to_bytes(&t.type_params())) - .transpose() - .map_err(|e| { - anyhow!( - "Failed to serialize type parameters for {}: {e}", - object - .id() - .to_canonical_display(/* with_prefix */ true), - ) - })?, - }), - }); - } - } - } - } - - Ok(values.into_values().collect()) - } -} - -#[async_trait::async_trait] -impl Handler for SumObjTypes { - type Batch = BTreeMap; - - fn batch(batch: &mut Self::Batch, updates: Vec) { - // `updates` are guaranteed to be provided in checkpoint order, so blindly inserting them - // will result in the batch containing the most up-to-date update for each object. - for update in updates { - batch.insert(update.object_id, update); - } - } - - async fn commit(values: &Self::Batch, conn: &mut db::Connection<'_>) -> anyhow::Result { - let mut updates = vec![]; - let mut deletes = vec![]; - - for update in values.values() { - if let Some(update) = &update.update { - updates.push(update.clone()); - } else { - deletes.push(update.object_id.to_vec()); - } - } - - let update_chunks = updates.chunks(MAX_INSERT_CHUNK_ROWS).map(Either::Left); - let delete_chunks = deletes.chunks(MAX_DELETE_CHUNK_ROWS).map(Either::Right); - - let futures = update_chunks.chain(delete_chunks).map(|chunk| match chunk { - Either::Left(update) => Either::Left( - diesel::insert_into(sum_obj_types::table) - .values(update) - .on_conflict(sum_obj_types::object_id) - .do_update() - .set(( - sum_obj_types::object_version.eq(excluded(sum_obj_types::object_version)), - sum_obj_types::owner_kind.eq(excluded(sum_obj_types::owner_kind)), - sum_obj_types::owner_id.eq(excluded(sum_obj_types::owner_id)), - )) - .execute(conn), - ), - - Either::Right(delete) => Either::Right( - diesel::delete(sum_obj_types::table) - .filter(sum_obj_types::object_id.eq_any(delete.iter().cloned())) - .execute(conn), - ), - }); - - Ok(try_join_all(futures).await?.into_iter().sum()) - } -} diff --git a/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs b/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs deleted file mode 100644 index 849b2d1faed7f..0000000000000 --- a/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright (c) Mysten Labs, Inc. 
-// SPDX-License-Identifier: Apache-2.0 - -use std::sync::Arc; - -use anyhow::Result; -use diesel::{ExpressionMethods, QueryDsl}; -use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; -use sui_indexer_alt_schema::{ - objects::{StoredObjectUpdate, StoredSumCoinBalance, StoredWalCoinBalance}, - schema::wal_coin_balances, -}; -use sui_pg_db as db; -use sui_types::full_checkpoint_content::CheckpointData; - -use super::sum_coin_balances::SumCoinBalances; - -pub(crate) struct WalCoinBalances; - -impl Processor for WalCoinBalances { - const NAME: &'static str = "wal_coin_balances"; - - type Value = StoredObjectUpdate; - - fn process(&self, checkpoint: &Arc) -> Result> { - SumCoinBalances.process(checkpoint) - } -} - -#[async_trait::async_trait] -impl Handler for WalCoinBalances { - const MIN_EAGER_ROWS: usize = 100; - const MAX_PENDING_ROWS: usize = 10000; - - async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result { - let values: Vec<_> = values - .iter() - .map(|value| StoredWalCoinBalance { - object_id: value.object_id.to_vec(), - object_version: value.object_version as i64, - - owner_id: value.update.as_ref().map(|o| o.owner_id.clone()), - - coin_type: value.update.as_ref().map(|o| o.coin_type.clone()), - coin_balance: value.update.as_ref().map(|o| o.coin_balance), - - cp_sequence_number: value.cp_sequence_number as i64, - }) - .collect(); - - Ok(diesel::insert_into(wal_coin_balances::table) - .values(&values) - .on_conflict_do_nothing() - .execute(conn) - .await?) - } - - async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result { - let filter = wal_coin_balances::table - .filter(wal_coin_balances::cp_sequence_number.between(from as i64, to as i64 - 1)); - - Ok(diesel::delete(filter).execute(conn).await?) - } -} diff --git a/crates/sui-indexer-alt/src/handlers/wal_obj_types.rs b/crates/sui-indexer-alt/src/handlers/wal_obj_types.rs deleted file mode 100644 index c0a6feb51cd24..0000000000000 --- a/crates/sui-indexer-alt/src/handlers/wal_obj_types.rs +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright (c) Mysten Labs, Inc. 
-// SPDX-License-Identifier: Apache-2.0 - -use std::sync::Arc; - -use anyhow::Result; -use diesel::{ExpressionMethods, QueryDsl}; -use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; -use sui_indexer_alt_schema::{ - objects::{StoredObjectUpdate, StoredSumObjType, StoredWalObjType}, - schema::wal_obj_types, -}; -use sui_pg_db as db; -use sui_types::full_checkpoint_content::CheckpointData; - -use super::sum_obj_types::SumObjTypes; - -pub(crate) struct WalObjTypes; - -impl Processor for WalObjTypes { - const NAME: &'static str = "wal_obj_types"; - - type Value = StoredObjectUpdate; - - fn process(&self, checkpoint: &Arc) -> Result> { - SumObjTypes.process(checkpoint) - } -} - -#[async_trait::async_trait] -impl Handler for WalObjTypes { - const MIN_EAGER_ROWS: usize = 100; - const MAX_PENDING_ROWS: usize = 10000; - - async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result { - let values: Vec<_> = values - .iter() - .map(|value| StoredWalObjType { - object_id: value.object_id.to_vec(), - object_version: value.object_version as i64, - - owner_kind: value.update.as_ref().map(|o| o.owner_kind), - owner_id: value.update.as_ref().and_then(|o| o.owner_id.clone()), - - package: value.update.as_ref().and_then(|o| o.package.clone()), - module: value.update.as_ref().and_then(|o| o.module.clone()), - name: value.update.as_ref().and_then(|o| o.name.clone()), - instantiation: value.update.as_ref().and_then(|o| o.instantiation.clone()), - - cp_sequence_number: value.cp_sequence_number as i64, - }) - .collect(); - - Ok(diesel::insert_into(wal_obj_types::table) - .values(&values) - .on_conflict_do_nothing() - .execute(conn) - .await?) - } - - async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result { - let filter = wal_obj_types::table - .filter(wal_obj_types::cp_sequence_number.between(from as i64, to as i64 - 1)); - - Ok(diesel::delete(filter).execute(conn).await?) 
- } -} diff --git a/crates/sui-indexer-alt/src/lib.rs b/crates/sui-indexer-alt/src/lib.rs index 4889d99bc83e2..15f41c9ef3b06 100644 --- a/crates/sui-indexer-alt/src/lib.rs +++ b/crates/sui-indexer-alt/src/lib.rs @@ -11,11 +11,10 @@ use handlers::{ ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints, kv_epoch_ends::KvEpochEnds, kv_epoch_starts::KvEpochStarts, kv_feature_flags::KvFeatureFlags, kv_objects::KvObjects, kv_protocol_configs::KvProtocolConfigs, kv_transactions::KvTransactions, - obj_info::ObjInfo, obj_versions::ObjVersions, sum_coin_balances::SumCoinBalances, - sum_displays::SumDisplays, sum_obj_types::SumObjTypes, sum_packages::SumPackages, - tx_affected_addresses::TxAffectedAddresses, tx_affected_objects::TxAffectedObjects, - tx_balance_changes::TxBalanceChanges, tx_calls::TxCalls, tx_digests::TxDigests, - tx_kinds::TxKinds, wal_coin_balances::WalCoinBalances, wal_obj_types::WalObjTypes, + obj_info::ObjInfo, obj_versions::ObjVersions, sum_displays::SumDisplays, + sum_packages::SumPackages, tx_affected_addresses::TxAffectedAddresses, + tx_affected_objects::TxAffectedObjects, tx_balance_changes::TxBalanceChanges, + tx_calls::TxCalls, tx_digests::TxDigests, tx_kinds::TxKinds, }; use sui_indexer_alt_framework::ingestion::{ClientArgs, IngestionConfig}; use sui_indexer_alt_framework::pipeline::{ @@ -57,10 +56,6 @@ pub async fn start_indexer( } = indexer_config.finish(); let PipelineLayer { - sum_coin_balances, - wal_coin_balances, - sum_obj_types, - wal_obj_types, sum_displays, sum_packages, obj_info, @@ -87,29 +82,10 @@ pub async fn start_indexer( } = pipeline.finish(); let ingestion = ingestion.finish(IngestionConfig::default()); - - let ConsistencyConfig { - consistent_pruning_interval_ms, - pruner_delay_ms, - consistent_range, - } = consistency.finish(ConsistencyConfig::default()); - + let consistency = consistency.finish(ConsistencyConfig::default()); let committer = committer.finish(CommitterConfig::default()); let pruner = pruner.finish(PrunerConfig::default()); - // Pipelines that are split up into a summary table, and a write-ahead log prune their - // write-ahead log so it contains just enough information to overlap with the summary table. - let consistent_range = consistent_range.unwrap_or_default(); - let pruner_config = (consistent_range != 0).then(|| PrunerConfig { - interval_ms: consistent_pruning_interval_ms, - delay_ms: pruner_delay_ms, - // Retain at least twice as much data as the lag, to guarantee overlap between the - // summary table and the write-ahead log. - retention: consistent_range * 2, - // Prune roughly five minutes of data in one go. - max_chunk_size: 5 * 300, - }); - let cancel = CancellationToken::new(); let retry_interval = ingestion.retry_interval(); @@ -171,40 +147,10 @@ pub async fn start_indexer( }; } + // A consistent pipeline consists of two concurrent pipelines. The first (main) one writes + // new data, and the second one lags behind deleting data that has fallen out of the consistent + // range. macro_rules! 
add_consistent { - ($sum_handler:expr, $sum_config:expr; $wal_handler:expr, $wal_config:expr) => { - if let Some(sum_layer) = $sum_config { - indexer - .sequential_pipeline( - $sum_handler, - SequentialConfig { - committer: sum_layer.finish(committer.clone()), - checkpoint_lag: consistent_range, - }, - ) - .await?; - - if let Some(pruner_config) = pruner_config.clone() { - indexer - .concurrent_pipeline( - $wal_handler, - ConcurrentConfig { - committer: $wal_config - .unwrap_or_default() - .finish(committer.clone()), - pruner: Some(pruner_config), - checkpoint_lag: None, - }, - ) - .await?; - } - } - }; - } - - // Add two concurrent pipelines, one as the main pipeline, and one as a lagged pruner. - // The lagged pruner will prune the main pipeline's data based on the consistency range. - macro_rules! add_concurrent_with_lagged_pruner { ($main_handler:expr, $main_config:expr; $lagged_handler:expr, $lagged_config:expr) => { if let Some(main_layer) = $main_config { indexer @@ -224,7 +170,7 @@ pub async fn start_indexer( $lagged_config.unwrap_or_default().finish(ConcurrentConfig { committer: committer.clone(), pruner: None, - checkpoint_lag: Some(consistent_range), + checkpoint_lag: Some(consistency.consistent_range), }), ) .await?; @@ -240,26 +186,16 @@ pub async fn start_indexer( add_concurrent!(KvProtocolConfigs(genesis.clone()), kv_protocol_configs); } - add_consistent!( - SumCoinBalances, sum_coin_balances; - WalCoinBalances, wal_coin_balances - ); - - add_consistent!( - SumObjTypes, sum_obj_types; - WalObjTypes, wal_obj_types - ); - // Other summary tables (without write-ahead log) add_sequential!(SumDisplays, sum_displays); add_sequential!(SumPackages, sum_packages); - add_concurrent_with_lagged_pruner!( + add_consistent!( ObjInfo, obj_info; ObjInfoPruner, obj_info_pruner ); - add_concurrent_with_lagged_pruner!( + add_consistent!( CoinBalanceBuckets, coin_balance_buckets; CoinBalanceBucketsPruner, coin_balance_buckets_pruner );
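For reviewers, a rough sketch of what one of the `add_consistent!` invocations above expands to. Only the lagged pipeline's `ConcurrentConfig` is copied from the hunk shown in this patch; the main pipeline's settings sit in unchanged context that the diff elides, so the values marked as assumptions below are illustrative, not part of the change.

// Illustration only (not part of this patch): approximate expansion of
//
//     add_consistent!(ObjInfo, obj_info; ObjInfoPruner, obj_info_pruner);
//
// inside `start_indexer`. The main pipeline's ConcurrentConfig is assumed (the hunk above
// elides it); the lagged pipeline mirrors the lines that are visible in the diff.
if let Some(main_layer) = obj_info {
    // Main pipeline: writes new obj_info rows as checkpoints are ingested.
    indexer
        .concurrent_pipeline(
            ObjInfo,
            main_layer.finish(ConcurrentConfig {
                committer: committer.clone(),
                pruner: None, // assumption: deletes are handled by the lagged pipeline below
                checkpoint_lag: None,
            }),
        )
        .await?;

    // Lagged pipeline: trails the main one by `consistent_range` checkpoints and deletes
    // rows that have fallen out of the consistent window.
    indexer
        .concurrent_pipeline(
            ObjInfoPruner,
            obj_info_pruner.unwrap_or_default().finish(ConcurrentConfig {
                committer: committer.clone(),
                pruner: None,
                checkpoint_lag: Some(consistency.consistent_range),
            }),
        )
        .await?;
}

The net effect is that the old summary-table-plus-write-ahead-log pairs are replaced by this pattern: one concurrent pipeline writes new rows, and a second pipeline trailing by `consistent_range` checkpoints removes rows that fall outside the consistent range.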