diff --git a/crates/sui-indexer-alt-schema/migrations/2024-12-17-162230_drop_sum_wal/down.sql b/crates/sui-indexer-alt-schema/migrations/2024-12-17-162230_drop_sum_wal/down.sql
new file mode 100644
index 0000000000000..5306d86af5e89
--- /dev/null
+++ b/crates/sui-indexer-alt-schema/migrations/2024-12-17-162230_drop_sum_wal/down.sql
@@ -0,0 +1,50 @@
+CREATE TABLE IF NOT EXISTS sum_obj_types
+(
+    object_id BYTEA PRIMARY KEY,
+    object_version BIGINT NOT NULL,
+    owner_kind SMALLINT NOT NULL,
+    owner_id BYTEA,
+    package BYTEA,
+    module TEXT,
+    name TEXT,
+    instantiation BYTEA
+);
+
+CREATE INDEX IF NOT EXISTS sum_obj_types_owner
+ON sum_obj_types (owner_kind, owner_id, object_id, object_version);
+
+CREATE INDEX IF NOT EXISTS sum_obj_types_pkg
+ON sum_obj_types (package, object_id, object_version);
+
+CREATE INDEX IF NOT EXISTS sum_obj_types_mod
+ON sum_obj_types (package, module, object_id, object_version);
+
+CREATE INDEX IF NOT EXISTS sum_obj_types_name
+ON sum_obj_types (package, module, name, object_id, object_version);
+
+CREATE INDEX IF NOT EXISTS sum_obj_types_inst
+ON sum_obj_types (package, module, name, instantiation, object_id, object_version);
+
+CREATE INDEX IF NOT EXISTS sum_obj_types_owner_pkg
+ON sum_obj_types (owner_kind, owner_id, package, object_id, object_version);
+
+CREATE INDEX IF NOT EXISTS sum_obj_types_owner_mod
+ON sum_obj_types (owner_kind, owner_id, package, module, object_id, object_version);
+
+CREATE INDEX IF NOT EXISTS sum_obj_types_owner_name
+ON sum_obj_types (owner_kind, owner_id, package, module, name, object_id, object_version);
+
+CREATE INDEX IF NOT EXISTS sum_obj_types_owner_inst
+ON sum_obj_types (owner_kind, owner_id, package, module, name, instantiation, object_id, object_version);
+
+CREATE TABLE IF NOT EXISTS sum_coin_balances
+(
+    object_id BYTEA PRIMARY KEY,
+    object_version BIGINT NOT NULL,
+    owner_id BYTEA NOT NULL,
+    coin_type BYTEA NOT NULL,
+    coin_balance BIGINT NOT NULL
+);
+
+CREATE INDEX IF NOT EXISTS sum_coin_balances_owner_type
+ON sum_coin_balances (owner_id, coin_type, coin_balance, object_id, object_version);
diff --git a/crates/sui-indexer-alt-schema/migrations/2024-12-17-162230_drop_sum_wal/up.sql b/crates/sui-indexer-alt-schema/migrations/2024-12-17-162230_drop_sum_wal/up.sql
new file mode 100644
index 0000000000000..03a4545304302
--- /dev/null
+++ b/crates/sui-indexer-alt-schema/migrations/2024-12-17-162230_drop_sum_wal/up.sql
@@ -0,0 +1,7 @@
+DROP TABLE IF EXISTS sum_obj_types;
+
+DROP TABLE IF EXISTS wal_obj_types;
+
+DROP TABLE IF EXISTS sum_coin_balances;
+
+DROP TABLE IF EXISTS wal_coin_balances;
diff --git a/crates/sui-indexer-alt-schema/src/objects.rs b/crates/sui-indexer-alt-schema/src/objects.rs
index cc4211d0645d8..15c37eef0bdc8 100644
--- a/crates/sui-indexer-alt-schema/src/objects.rs
+++ b/crates/sui-indexer-alt-schema/src/objects.rs
@@ -6,12 +6,8 @@ use diesel::{
     sql_types::SmallInt,
     FromSqlRow,
 };
 use sui_field_count::FieldCount;
-use sui_types::base_types::ObjectID;
 
-use crate::schema::{
-    coin_balance_buckets, kv_objects, obj_info, obj_versions, sum_coin_balances, sum_obj_types,
-    wal_coin_balances, wal_obj_types,
-};
+use crate::schema::{coin_balance_buckets, kv_objects, obj_info, obj_versions};
 
 #[derive(Insertable, Debug, Clone, FieldCount)]
 #[diesel(table_name = kv_objects, primary_key(object_id, object_version))]
@@ -30,17 +26,6 @@ pub struct StoredObjVersion {
     pub cp_sequence_number: i64,
 }
 
-/// An insert/update or deletion of an object record, keyed on a particular Object ID and version.
-#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
-pub struct StoredObjectUpdate<T> {
-    pub object_id: ObjectID,
-    pub object_version: u64,
-    pub cp_sequence_number: u64,
-    /// `None` means the object was deleted or wrapped at this version, `Some(x)` means it was
-    /// changed to `x`.
-    pub update: Option<T>,
-}
-
 #[derive(AsExpression, FromSqlRow, Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
 #[diesel(sql_type = SmallInt)]
 #[repr(i16)]
@@ -59,54 +44,6 @@ pub enum StoredCoinOwnerKind {
     Consensus = 1,
 }
 
-#[derive(Insertable, Debug, Clone, FieldCount)]
-#[diesel(table_name = sum_coin_balances, primary_key(object_id))]
-pub struct StoredSumCoinBalance {
-    pub object_id: Vec<u8>,
-    pub object_version: i64,
-    pub owner_id: Vec<u8>,
-    pub coin_type: Vec<u8>,
-    pub coin_balance: i64,
-}
-
-#[derive(Insertable, Debug, Clone, FieldCount)]
-#[diesel(table_name = sum_obj_types, primary_key(object_id))]
-pub struct StoredSumObjType {
-    pub object_id: Vec<u8>,
-    pub object_version: i64,
-    pub owner_kind: StoredOwnerKind,
-    pub owner_id: Option<Vec<u8>>,
-    pub package: Option<Vec<u8>>,
-    pub module: Option<String>,
-    pub name: Option<String>,
-    pub instantiation: Option<Vec<u8>>,
-}
-
-#[derive(Insertable, Debug, Clone)]
-#[diesel(table_name = wal_coin_balances, primary_key(object_id, object_version))]
-pub struct StoredWalCoinBalance {
-    pub object_id: Vec<u8>,
-    pub object_version: i64,
-    pub owner_id: Option<Vec<u8>>,
-    pub coin_type: Option<Vec<u8>>,
-    pub coin_balance: Option<i64>,
-    pub cp_sequence_number: i64,
-}
-
-#[derive(Insertable, Debug, Clone)]
-#[diesel(table_name = wal_obj_types, primary_key(object_id, object_version))]
-pub struct StoredWalObjType {
-    pub object_id: Vec<u8>,
-    pub object_version: i64,
-    pub owner_kind: Option<StoredOwnerKind>,
-    pub owner_id: Option<Vec<u8>>,
-    pub package: Option<Vec<u8>>,
-    pub module: Option<String>,
-    pub name: Option<String>,
-    pub instantiation: Option<Vec<u8>>,
-    pub cp_sequence_number: i64,
-}
-
 #[derive(Insertable, Debug, Clone, FieldCount)]
 #[diesel(table_name = obj_info, primary_key(object_id, cp_sequence_number))]
 pub struct StoredObjInfo {
@@ -131,13 +68,6 @@ pub struct StoredCoinBalanceBucket {
     pub coin_balance_bucket: Option<i16>,
 }
 
-/// StoredObjectUpdate is a wrapper type, we want to count the fields of the inner type.
-impl<T: FieldCount> FieldCount for StoredObjectUpdate<T> {
-    // Add one here for cp_sequence_number field, because StoredObjectUpdate is used for
-    // wal_* handlers, where the actual type to commit has an additional field besides fields of T.
-    const FIELD_COUNT: usize = T::FIELD_COUNT.saturating_add(1);
-}
-
 impl<DB: Backend> serialize::ToSql<SmallInt, DB> for StoredOwnerKind
 where
     i16: serialize::ToSql<SmallInt, DB>,
diff --git a/crates/sui-indexer-alt-schema/src/schema.rs b/crates/sui-indexer-alt-schema/src/schema.rs
index 9c59ec662d066..fe5c034144332 100644
--- a/crates/sui-indexer-alt-schema/src/schema.rs
+++ b/crates/sui-indexer-alt-schema/src/schema.rs
@@ -136,16 +136,6 @@ diesel::table! {
     }
 }
 
-diesel::table! {
-    sum_coin_balances (object_id) {
-        object_id -> Bytea,
-        object_version -> Int8,
-        owner_id -> Bytea,
-        coin_type -> Bytea,
-        coin_balance -> Int8,
-    }
-}
-
 diesel::table! {
     sum_displays (object_type) {
         object_type -> Bytea,
@@ -155,19 +145,6 @@ diesel::table! {
     }
 }
 
-diesel::table! {
-    sum_obj_types (object_id) {
-        object_id -> Bytea,
-        object_version -> Int8,
-        owner_kind -> Int2,
-        owner_id -> Nullable<Bytea>,
-        package -> Nullable<Bytea>,
-        module -> Nullable<Text>,
-        name -> Nullable<Text>,
-        instantiation -> Nullable<Bytea>,
-    }
-}
-
 diesel::table! {
     sum_packages (package_id) {
         package_id -> Bytea,
@@ -225,31 +202,6 @@ diesel::table! {
     }
 }
 
-diesel::table! {
-    wal_coin_balances (object_id, object_version) {
-        object_id -> Bytea,
-        object_version -> Int8,
-        owner_id -> Nullable<Bytea>,
-        coin_type -> Nullable<Bytea>,
-        coin_balance -> Nullable<Int8>,
-        cp_sequence_number -> Int8,
-    }
-}
-
-diesel::table! {
-    wal_obj_types (object_id, object_version) {
-        object_id -> Bytea,
-        object_version -> Int8,
-        owner_kind -> Nullable<Int2>,
-        owner_id -> Nullable<Bytea>,
-        package -> Nullable<Bytea>,
-        module -> Nullable<Text>,
-        name -> Nullable<Text>,
-        instantiation -> Nullable<Bytea>,
-        cp_sequence_number -> Int8,
-    }
-}
-
 diesel::allow_tables_to_appear_in_same_query!(
     coin_balance_buckets,
     ev_emit_mod,
@@ -264,9 +216,7 @@ diesel::allow_tables_to_appear_in_same_query!(
     kv_transactions,
     obj_info,
     obj_versions,
-    sum_coin_balances,
     sum_displays,
-    sum_obj_types,
     sum_packages,
     tx_affected_addresses,
     tx_affected_objects,
@@ -274,6 +224,4 @@ diesel::allow_tables_to_appear_in_same_query!(
     tx_calls,
     tx_digests,
     tx_kinds,
-    wal_coin_balances,
-    wal_obj_types,
 );
diff --git a/crates/sui-indexer-alt/src/config.rs b/crates/sui-indexer-alt/src/config.rs
index ab297db368069..241e765c08351 100644
--- a/crates/sui-indexer-alt/src/config.rs
+++ b/crates/sui-indexer-alt/src/config.rs
@@ -46,17 +46,10 @@ pub struct IndexerConfig {
     pub extra: toml::Table,
 }
 
-#[derive(Clone)]
+#[derive(Default, Clone)]
 pub struct ConsistencyConfig {
-    /// How often to check whether write-ahead logs related to the consistent range can be
-    /// pruned.
-    pub consistent_pruning_interval_ms: u64,
-
-    /// How long to wait before honouring reader low watermarks.
-    pub pruner_delay_ms: u64,
-
     /// Number of checkpoints to delay indexing summary tables for.
-    pub consistent_range: Option<u64>,
+    pub consistent_range: u64,
 }
 
 // Configuration layers apply overrides over a base configuration. When reading configs from a
@@ -81,8 +74,6 @@ pub struct IngestionLayer {
 #[DefaultConfig]
 #[derive(Clone, Default, Debug)]
 pub struct ConsistencyLayer {
-    consistent_pruning_interval_ms: Option<u64>,
-    pruner_delay_ms: Option<u64>,
     consistent_range: Option<u64>,
 
     #[serde(flatten)]
@@ -137,13 +128,7 @@ pub struct PrunerLayer {
 #[derive(Clone, Default, Debug)]
 #[serde(rename_all = "snake_case")]
 pub struct PipelineLayer {
-    // Consistent pipelines (a sequential pipeline with a write-ahead log)
-    pub sum_coin_balances: Option<CommitterLayer>,
-    pub wal_coin_balances: Option<CommitterLayer>,
-    pub sum_obj_types: Option<CommitterLayer>,
-    pub wal_obj_types: Option<CommitterLayer>,
-
-    // Sequential pipelines without a write-ahead log
+    // Sequential pipelines
     pub sum_displays: Option<SequentialLayer>,
     pub sum_packages: Option<SequentialLayer>,
 
@@ -214,11 +199,7 @@ impl ConsistencyLayer {
     pub fn finish(self, base: ConsistencyConfig) -> ConsistencyConfig {
         check_extra("consistency", self.extra);
         ConsistencyConfig {
-            consistent_pruning_interval_ms: self
-                .consistent_pruning_interval_ms
-                .unwrap_or(base.consistent_pruning_interval_ms),
-            pruner_delay_ms: self.pruner_delay_ms.unwrap_or(base.pruner_delay_ms),
-            consistent_range: self.consistent_range.or(base.consistent_range),
+            consistent_range: self.consistent_range.unwrap_or(base.consistent_range),
         }
     }
 }
@@ -286,10 +267,6 @@ impl PipelineLayer {
     /// configure.
     pub fn example() -> Self {
         PipelineLayer {
-            sum_coin_balances: Some(Default::default()),
-            wal_coin_balances: Some(Default::default()),
-            sum_obj_types: Some(Default::default()),
-            wal_obj_types: Some(Default::default()),
             sum_displays: Some(Default::default()),
             sum_packages: Some(Default::default()),
             obj_info: Some(Default::default()),
@@ -355,10 +332,6 @@ impl Merge for ConsistencyLayer {
         check_extra("consistency", self.extra);
         check_extra("consistency", other.extra);
         ConsistencyLayer {
-            consistent_pruning_interval_ms: other
-                .consistent_pruning_interval_ms
-                .or(self.consistent_pruning_interval_ms),
-            pruner_delay_ms: other.pruner_delay_ms.or(self.pruner_delay_ms),
             consistent_range: other.consistent_range.or(self.consistent_range),
             extra: Default::default(),
         }
@@ -428,10 +401,6 @@ impl Merge for PipelineLayer {
         check_extra("pipeline", self.extra);
         check_extra("pipeline", other.extra);
         PipelineLayer {
-            sum_coin_balances: self.sum_coin_balances.merge(other.sum_coin_balances),
-            wal_coin_balances: self.wal_coin_balances.merge(other.wal_coin_balances),
-            sum_obj_types: self.sum_obj_types.merge(other.sum_obj_types),
-            wal_obj_types: self.wal_obj_types.merge(other.wal_obj_types),
             sum_displays: self.sum_displays.merge(other.sum_displays),
             sum_packages: self.sum_packages.merge(other.sum_packages),
             obj_info: self.obj_info.merge(other.obj_info),
@@ -473,16 +442,6 @@ impl<T: Merge> Merge for Option<T> {
     }
 }
 
-impl Default for ConsistencyConfig {
-    fn default() -> Self {
-        Self {
-            consistent_pruning_interval_ms: 300_000,
-            pruner_delay_ms: 120_000,
-            consistent_range: None,
-        }
-    }
-}
-
 impl From<IngestionConfig> for IngestionLayer {
     fn from(config: IngestionConfig) -> Self {
         Self {
@@ -497,9 +456,7 @@ impl From<IngestionConfig> for IngestionLayer {
 impl From<ConsistencyConfig> for ConsistencyLayer {
     fn from(config: ConsistencyConfig) -> Self {
         Self {
-            consistent_pruning_interval_ms: Some(config.consistent_pruning_interval_ms),
-            pruner_delay_ms: Some(config.pruner_delay_ms),
-            consistent_range: config.consistent_range,
+            consistent_range: Some(config.consistent_range),
             extra: Default::default(),
         }
     }
@@ -579,15 +536,11 @@ mod tests {
     #[test]
     fn merge_simple() {
         let this = ConsistencyLayer {
-            consistent_pruning_interval_ms: None,
-            pruner_delay_ms: Some(2000),
             consistent_range: Some(3000),
             extra: Default::default(),
         };
 
         let that = ConsistencyLayer {
-            consistent_pruning_interval_ms: Some(1000),
-            pruner_delay_ms: None,
             consistent_range: Some(4000),
             extra: Default::default(),
         };
@@ -598,8 +551,6 @@ mod tests {
         assert_matches!(
             this_then_that,
             ConsistencyLayer {
-                consistent_pruning_interval_ms: Some(1000),
-                pruner_delay_ms: Some(2000),
                 consistent_range: Some(4000),
                 extra: _,
             }
@@ -608,8 +559,6 @@ mod tests {
         assert_matches!(
             that_then_this,
             ConsistencyLayer {
-                consistent_pruning_interval_ms: Some(1000),
-                pruner_delay_ms: Some(2000),
                 consistent_range: Some(3000),
                 extra: _,
             }
@@ -619,13 +568,6 @@ mod tests {
     #[test]
     fn merge_recursive() {
         let this = PipelineLayer {
-            sum_coin_balances: None,
-            sum_obj_types: Some(CommitterLayer {
-                write_concurrency: Some(5),
-                collect_interval_ms: Some(500),
-                watermark_interval_ms: None,
-                extra: Default::default(),
-            }),
             sum_displays: Some(SequentialLayer {
                 committer: Some(CommitterLayer {
                     write_concurrency: Some(10),
@@ -636,17 +578,20 @@ mod tests {
                 checkpoint_lag: Some(100),
                 extra: Default::default(),
             }),
+            sum_packages: None,
+            ev_emit_mod: Some(ConcurrentLayer {
+                committer: Some(CommitterLayer {
+                    write_concurrency: Some(5),
+                    collect_interval_ms: Some(500),
+                    watermark_interval_ms: None,
+                    extra: Default::default(),
+                }),
+                ..Default::default()
+            }),
             ..Default::default()
         };
 
         let that = PipelineLayer {
-            sum_coin_balances: Some(CommitterLayer {
-                write_concurrency: Some(10),
-                collect_interval_ms: None,
-                watermark_interval_ms: Some(1000),
-                extra: Default::default(),
-            }),
-            sum_obj_types: None,
             sum_displays: Some(SequentialLayer {
                 committer: Some(CommitterLayer {
                     write_concurrency: Some(5),
@@ -657,6 +602,16 @@ mod tests {
                 checkpoint_lag: Some(200),
                 extra: Default::default(),
             }),
+            sum_packages: Some(SequentialLayer {
+                committer: Some(CommitterLayer {
+                    write_concurrency: Some(10),
+                    collect_interval_ms: None,
+                    watermark_interval_ms: Some(1000),
+                    extra: Default::default(),
+                }),
+                ..Default::default()
+            }),
+            ev_emit_mod: None,
             ..Default::default()
         };
 
@@ -666,18 +621,6 @@ mod tests {
         assert_matches!(
             this_then_that,
             PipelineLayer {
-                sum_coin_balances: Some(CommitterLayer {
-                    write_concurrency: Some(10),
-                    collect_interval_ms: None,
-                    watermark_interval_ms: Some(1000),
-                    extra: _,
-                }),
-                sum_obj_types: Some(CommitterLayer {
-                    write_concurrency: Some(5),
-                    collect_interval_ms: Some(500),
-                    watermark_interval_ms: None,
-                    extra: _,
-                }),
                 sum_displays: Some(SequentialLayer {
                     committer: Some(CommitterLayer {
                         write_concurrency: Some(5),
@@ -688,6 +631,27 @@ mod tests {
                     checkpoint_lag: Some(200),
                     extra: _,
                 }),
+                sum_packages: Some(SequentialLayer {
+                    committer: Some(CommitterLayer {
+                        write_concurrency: Some(10),
+                        collect_interval_ms: None,
+                        watermark_interval_ms: Some(1000),
+                        extra: _,
+                    }),
+                    checkpoint_lag: None,
+                    extra: _,
+                }),
+                ev_emit_mod: Some(ConcurrentLayer {
+                    committer: Some(CommitterLayer {
+                        write_concurrency: Some(5),
+                        collect_interval_ms: Some(500),
+                        watermark_interval_ms: None,
+                        extra: _,
+                    }),
+                    pruner: None,
+                    checkpoint_lag: None,
+                    extra: _,
+                }),
                 ..
             },
         );
@@ -695,18 +659,6 @@ mod tests {
         assert_matches!(
             that_then_this,
             PipelineLayer {
-                sum_coin_balances: Some(CommitterLayer {
-                    write_concurrency: Some(10),
-                    collect_interval_ms: None,
-                    watermark_interval_ms: Some(1000),
-                    extra: _,
-                }),
-                sum_obj_types: Some(CommitterLayer {
-                    write_concurrency: Some(5),
-                    collect_interval_ms: Some(500),
-                    watermark_interval_ms: None,
-                    extra: _,
-                }),
                 sum_displays: Some(SequentialLayer {
                     committer: Some(CommitterLayer {
                         write_concurrency: Some(10),
@@ -717,6 +669,27 @@ mod tests {
                     checkpoint_lag: Some(100),
                     extra: _,
                 }),
+                sum_packages: Some(SequentialLayer {
+                    committer: Some(CommitterLayer {
+                        write_concurrency: Some(10),
+                        collect_interval_ms: None,
+                        watermark_interval_ms: Some(1000),
+                        extra: _,
+                    }),
+                    checkpoint_lag: None,
+                    extra: _,
+                }),
+                ev_emit_mod: Some(ConcurrentLayer {
+                    committer: Some(CommitterLayer {
+                        write_concurrency: Some(5),
+                        collect_interval_ms: Some(500),
+                        watermark_interval_ms: None,
+                        extra: _,
+                    }),
+                    pruner: None,
+                    checkpoint_lag: None,
+                    extra: _,
+                }),
                 ..
             },
         );
diff --git a/crates/sui-indexer-alt/src/handlers/mod.rs b/crates/sui-indexer-alt/src/handlers/mod.rs
index 3897d29838964..16fd82a9b56c4 100644
--- a/crates/sui-indexer-alt/src/handlers/mod.rs
+++ b/crates/sui-indexer-alt/src/handlers/mod.rs
@@ -15,9 +15,7 @@ pub(crate) mod kv_transactions;
 pub(crate) mod obj_info;
 pub(crate) mod obj_info_pruner;
 pub(crate) mod obj_versions;
-pub(crate) mod sum_coin_balances;
 pub(crate) mod sum_displays;
-pub(crate) mod sum_obj_types;
 pub(crate) mod sum_packages;
 pub(crate) mod tx_affected_addresses;
 pub(crate) mod tx_affected_objects;
@@ -25,5 +23,3 @@ pub(crate) mod tx_balance_changes;
 pub(crate) mod tx_calls;
 pub(crate) mod tx_digests;
 pub(crate) mod tx_kinds;
-pub(crate) mod wal_coin_balances;
-pub(crate) mod wal_obj_types;
diff --git a/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs b/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs
deleted file mode 100644
index 0e2a067440317..0000000000000
--- a/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs
+++ /dev/null
@@ -1,185 +0,0 @@
-// Copyright (c) Mysten Labs, Inc.
-// SPDX-License-Identifier: Apache-2.0
-
-use std::{
-    collections::{btree_map::Entry, BTreeMap},
-    sync::Arc,
-};
-
-use anyhow::{anyhow, bail, ensure};
-use diesel::{upsert::excluded, ExpressionMethods};
-use diesel_async::RunQueryDsl;
-use futures::future::{try_join_all, Either};
-use sui_field_count::FieldCount;
-use sui_indexer_alt_framework::pipeline::{sequential::Handler, Processor};
-use sui_indexer_alt_schema::{
-    objects::{StoredObjectUpdate, StoredSumCoinBalance},
-    schema::sum_coin_balances,
-};
-use sui_pg_db as db;
-use sui_types::{
-    base_types::ObjectID, effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData,
-    object::Owner,
-};
-
-const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredSumCoinBalance::FIELD_COUNT;
-const MAX_DELETE_CHUNK_ROWS: usize = i16::MAX as usize;
-
-pub(crate) struct SumCoinBalances;
-
-impl Processor for SumCoinBalances {
-    const NAME: &'static str = "sum_coin_balances";
-
-    type Value = StoredObjectUpdate<StoredSumCoinBalance>;
-
-    fn process(&self, checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>> {
-        let CheckpointData {
-            transactions,
-            checkpoint_summary,
-            ..
-        } = checkpoint.as_ref();
-
-        let cp_sequence_number = checkpoint_summary.sequence_number;
-        let mut values: BTreeMap<ObjectID, Self::Value> = BTreeMap::new();
-        let mut coin_types: BTreeMap<ObjectID, Vec<u8>> = BTreeMap::new();
-
-        // Iterate over transactions in reverse so we see the latest version of each object first.
-        for tx in transactions.iter().rev() {
-            // Find all coins in the transaction's inputs and outputs.
-            for object in tx.input_objects.iter().chain(tx.output_objects.iter()) {
-                if let Some(coin_type) = object.type_().and_then(|t| t.coin_type_maybe()) {
-                    let serialized = bcs::to_bytes(&coin_type)
-                        .map_err(|_| anyhow!("Failed to serialize type for {}", object.id()))?;
-
-                    coin_types.insert(object.id(), serialized);
-                }
-            }
-
-            // Deleted and wrapped coins
-            for change in tx.effects.object_changes() {
-                // The object is not deleted/wrapped, or if it is it was unwrapped in the same
-                // transaction.
-                if change.output_digest.is_some() || change.input_version.is_none() {
-                    continue;
-                }
-
-                // Object is not a coin
-                if !coin_types.contains_key(&change.id) {
-                    continue;
-                }
-
-                let object_id = change.id;
-                let object_version = tx.effects.lamport_version().value();
-                match values.entry(object_id) {
-                    Entry::Occupied(entry) => {
-                        ensure!(entry.get().object_version > object_version);
-                    }
-
-                    Entry::Vacant(entry) => {
-                        entry.insert(StoredObjectUpdate {
-                            object_id,
-                            object_version,
-                            cp_sequence_number,
-                            update: None,
-                        });
-                    }
-                }
-            }
-
-            // Modified and created coins.
-            for object in &tx.output_objects {
-                let object_id = object.id();
-                let object_version = object.version().value();
-
-                let Some(coin_type) = coin_types.get(&object_id) else {
-                    continue;
-                };
-
-                // Coin balance only tracks address-owned objects
-                let Owner::AddressOwner(owner_id) = object.owner() else {
-                    continue;
-                };
-
-                let Some(coin) = object.as_coin_maybe() else {
-                    bail!("Failed to deserialize Coin for {object_id}");
-                };
-
-                match values.entry(object_id) {
-                    Entry::Occupied(entry) => {
-                        ensure!(entry.get().object_version > object_version);
-                    }
-
-                    Entry::Vacant(entry) => {
-                        entry.insert(StoredObjectUpdate {
-                            object_id,
-                            object_version,
-                            cp_sequence_number,
-                            update: Some(StoredSumCoinBalance {
-                                object_id: object_id.to_vec(),
-                                object_version: object_version as i64,
-                                owner_id: owner_id.to_vec(),
-                                coin_type: coin_type.clone(),
-                                coin_balance: coin.balance.value() as i64,
-                            }),
-                        });
-                    }
-                }
-            }
-        }
-
-        Ok(values.into_values().collect())
-    }
-}
-
-#[async_trait::async_trait]
-impl Handler for SumCoinBalances {
-    type Batch = BTreeMap<ObjectID, Self::Value>;
-
-    fn batch(batch: &mut Self::Batch, updates: Vec<Self::Value>) {
-        // `updates` are guaranteed to be provided in checkpoint order, so blindly inserting them
-        // will result in the batch containing the most up-to-date update for each object.
-        for update in updates {
-            batch.insert(update.object_id, update);
-        }
-    }
-
-    async fn commit(batch: &Self::Batch, conn: &mut db::Connection<'_>) -> anyhow::Result<usize> {
-        let mut updates = vec![];
-        let mut deletes = vec![];
-
-        for update in batch.values() {
-            if let Some(update) = &update.update {
-                updates.push(update.clone());
-            } else {
-                deletes.push(update.object_id.to_vec());
-            }
-        }
-
-        let update_chunks = updates.chunks(MAX_INSERT_CHUNK_ROWS).map(Either::Left);
-        let delete_chunks = deletes.chunks(MAX_DELETE_CHUNK_ROWS).map(Either::Right);
-
-        let futures = update_chunks.chain(delete_chunks).map(|chunk| match chunk {
-            Either::Left(update) => Either::Left(
-                diesel::insert_into(sum_coin_balances::table)
-                    .values(update)
-                    .on_conflict(sum_coin_balances::object_id)
-                    .do_update()
-                    .set((
-                        sum_coin_balances::object_version
-                            .eq(excluded(sum_coin_balances::object_version)),
-                        sum_coin_balances::owner_id.eq(excluded(sum_coin_balances::owner_id)),
-                        sum_coin_balances::coin_balance
-                            .eq(excluded(sum_coin_balances::coin_balance)),
-                    ))
-                    .execute(conn),
-            ),
-
-            Either::Right(delete) => Either::Right(
-                diesel::delete(sum_coin_balances::table)
-                    .filter(sum_coin_balances::object_id.eq_any(delete.iter().cloned()))
-                    .execute(conn),
-            ),
-        });
-
-        Ok(try_join_all(futures).await?.into_iter().sum())
-    }
-}
diff --git a/crates/sui-indexer-alt/src/handlers/sum_obj_types.rs b/crates/sui-indexer-alt/src/handlers/sum_obj_types.rs
deleted file mode 100644
index 84e61e2e01523..0000000000000
--- a/crates/sui-indexer-alt/src/handlers/sum_obj_types.rs
+++ /dev/null
@@ -1,183 +0,0 @@
-// Copyright (c) Mysten Labs, Inc.
-// SPDX-License-Identifier: Apache-2.0
-
-use std::{
-    collections::{btree_map::Entry, BTreeMap},
-    sync::Arc,
-};
-
-use anyhow::{anyhow, ensure};
-use diesel::{upsert::excluded, ExpressionMethods};
-use diesel_async::RunQueryDsl;
-use futures::future::{try_join_all, Either};
-use sui_field_count::FieldCount;
-use sui_indexer_alt_framework::pipeline::{sequential::Handler, Processor};
-use sui_indexer_alt_schema::{
-    objects::{StoredObjectUpdate, StoredOwnerKind, StoredSumObjType},
-    schema::sum_obj_types,
-};
-use sui_pg_db as db;
-use sui_types::{
-    base_types::ObjectID, effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData,
-    object::Owner,
-};
-
-const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredSumObjType::FIELD_COUNT;
-const MAX_DELETE_CHUNK_ROWS: usize = i16::MAX as usize;
-
-pub(crate) struct SumObjTypes;
-
-impl Processor for SumObjTypes {
-    const NAME: &'static str = "sum_obj_types";
-
-    type Value = StoredObjectUpdate<StoredSumObjType>;
-
-    fn process(&self, checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>> {
-        let CheckpointData {
-            transactions,
-            checkpoint_summary,
-            ..
-        } = checkpoint.as_ref();
-
-        let cp_sequence_number = checkpoint_summary.sequence_number;
-        let mut values: BTreeMap<ObjectID, Self::Value> = BTreeMap::new();
-
-        // Iterate over transactions in reverse so we see the latest version of each object first.
-        for tx in transactions.iter().rev() {
-            // Deleted and wrapped objects -- objects that show up without a digest in
-            // `object_changes` are either deleted or wrapped. Objects without an input version
-            // must have been unwrapped and deleted, meaning they do not need to be deleted from
-            // our records.
-            for change in tx.effects.object_changes() {
-                if change.output_digest.is_some() || change.input_version.is_none() {
-                    continue;
-                }
-
-                let object_id = change.id;
-                let object_version = tx.effects.lamport_version().value();
-                match values.entry(object_id) {
-                    Entry::Occupied(entry) => {
-                        ensure!(entry.get().object_version > object_version);
-                    }
-
-                    Entry::Vacant(entry) => {
-                        entry.insert(StoredObjectUpdate {
-                            object_id,
-                            object_version,
-                            cp_sequence_number,
-                            update: None,
-                        });
-                    }
-                }
-            }
-
-            // Modified and created objects.
-            for object in &tx.output_objects {
-                let object_id = object.id();
-                let object_version = object.version().value();
-                match values.entry(object_id) {
-                    Entry::Occupied(entry) => {
-                        ensure!(entry.get().object_version > object_version);
-                    }
-
-                    Entry::Vacant(entry) => {
-                        let type_ = object.type_();
-                        entry.insert(StoredObjectUpdate {
-                            object_id,
-                            object_version,
-                            cp_sequence_number,
-                            update: Some(StoredSumObjType {
-                                object_id: object_id.to_vec(),
-                                object_version: object_version as i64,
-
-                                owner_kind: match object.owner() {
-                                    Owner::AddressOwner(_) => StoredOwnerKind::Address,
-                                    Owner::ObjectOwner(_) => StoredOwnerKind::Object,
-                                    Owner::Shared { .. } => StoredOwnerKind::Shared,
-                                    Owner::Immutable => StoredOwnerKind::Immutable,
-                                    // TODO: Implement support for ConsensusV2 objects.
-                                    Owner::ConsensusV2 { .. } => todo!(),
-                                },
-
-                                owner_id: match object.owner() {
-                                    Owner::AddressOwner(a) => Some(a.to_vec()),
-                                    Owner::ObjectOwner(o) => Some(o.to_vec()),
-                                    _ => None,
-                                },
-
-                                package: type_.map(|t| t.address().to_vec()),
-                                module: type_.map(|t| t.module().to_string()),
-                                name: type_.map(|t| t.name().to_string()),
-                                instantiation: type_
-                                    .map(|t| bcs::to_bytes(&t.type_params()))
-                                    .transpose()
-                                    .map_err(|e| {
-                                        anyhow!(
-                                            "Failed to serialize type parameters for {}: {e}",
-                                            object
-                                                .id()
-                                                .to_canonical_display(/* with_prefix */ true),
-                                        )
-                                    })?,
-                            }),
-                        });
-                    }
-                }
-            }
-        }
-
-        Ok(values.into_values().collect())
-    }
-}
-
-#[async_trait::async_trait]
-impl Handler for SumObjTypes {
-    type Batch = BTreeMap<ObjectID, Self::Value>;
-
-    fn batch(batch: &mut Self::Batch, updates: Vec<Self::Value>) {
-        // `updates` are guaranteed to be provided in checkpoint order, so blindly inserting them
-        // will result in the batch containing the most up-to-date update for each object.
-        for update in updates {
-            batch.insert(update.object_id, update);
-        }
-    }
-
-    async fn commit(values: &Self::Batch, conn: &mut db::Connection<'_>) -> anyhow::Result<usize> {
-        let mut updates = vec![];
-        let mut deletes = vec![];
-
-        for update in values.values() {
-            if let Some(update) = &update.update {
-                updates.push(update.clone());
-            } else {
-                deletes.push(update.object_id.to_vec());
-            }
-        }
-
-        let update_chunks = updates.chunks(MAX_INSERT_CHUNK_ROWS).map(Either::Left);
-        let delete_chunks = deletes.chunks(MAX_DELETE_CHUNK_ROWS).map(Either::Right);
-
-        let futures = update_chunks.chain(delete_chunks).map(|chunk| match chunk {
-            Either::Left(update) => Either::Left(
-                diesel::insert_into(sum_obj_types::table)
-                    .values(update)
-                    .on_conflict(sum_obj_types::object_id)
-                    .do_update()
-                    .set((
-                        sum_obj_types::object_version.eq(excluded(sum_obj_types::object_version)),
-                        sum_obj_types::owner_kind.eq(excluded(sum_obj_types::owner_kind)),
-                        sum_obj_types::owner_id.eq(excluded(sum_obj_types::owner_id)),
-                    ))
-                    .execute(conn),
-            ),
-
-            Either::Right(delete) => Either::Right(
-                diesel::delete(sum_obj_types::table)
-                    .filter(sum_obj_types::object_id.eq_any(delete.iter().cloned()))
-                    .execute(conn),
-            ),
-        });
-
-        Ok(try_join_all(futures).await?.into_iter().sum())
-    }
-}
diff --git a/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs b/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs
deleted file mode 100644
index 849b2d1faed7f..0000000000000
--- a/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs
+++ /dev/null
@@ -1,65 +0,0 @@
-// Copyright (c) Mysten Labs, Inc.
-// SPDX-License-Identifier: Apache-2.0
-
-use std::sync::Arc;
-
-use anyhow::Result;
-use diesel::{ExpressionMethods, QueryDsl};
-use diesel_async::RunQueryDsl;
-use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
-use sui_indexer_alt_schema::{
-    objects::{StoredObjectUpdate, StoredSumCoinBalance, StoredWalCoinBalance},
-    schema::wal_coin_balances,
-};
-use sui_pg_db as db;
-use sui_types::full_checkpoint_content::CheckpointData;
-
-use super::sum_coin_balances::SumCoinBalances;
-
-pub(crate) struct WalCoinBalances;
-
-impl Processor for WalCoinBalances {
-    const NAME: &'static str = "wal_coin_balances";
-
-    type Value = StoredObjectUpdate<StoredSumCoinBalance>;
-
-    fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
-        SumCoinBalances.process(checkpoint)
-    }
-}
-
-#[async_trait::async_trait]
-impl Handler for WalCoinBalances {
-    const MIN_EAGER_ROWS: usize = 100;
-    const MAX_PENDING_ROWS: usize = 10000;
-
-    async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
-        let values: Vec<_> = values
-            .iter()
-            .map(|value| StoredWalCoinBalance {
-                object_id: value.object_id.to_vec(),
-                object_version: value.object_version as i64,
-
-                owner_id: value.update.as_ref().map(|o| o.owner_id.clone()),
-
-                coin_type: value.update.as_ref().map(|o| o.coin_type.clone()),
-                coin_balance: value.update.as_ref().map(|o| o.coin_balance),
-
-                cp_sequence_number: value.cp_sequence_number as i64,
-            })
-            .collect();
-
-        Ok(diesel::insert_into(wal_coin_balances::table)
-            .values(&values)
-            .on_conflict_do_nothing()
-            .execute(conn)
-            .await?)
-    }
-
-    async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
-        let filter = wal_coin_balances::table
-            .filter(wal_coin_balances::cp_sequence_number.between(from as i64, to as i64 - 1));
-
-        Ok(diesel::delete(filter).execute(conn).await?)
-    }
-}
diff --git a/crates/sui-indexer-alt/src/handlers/wal_obj_types.rs b/crates/sui-indexer-alt/src/handlers/wal_obj_types.rs
deleted file mode 100644
index c0a6feb51cd24..0000000000000
--- a/crates/sui-indexer-alt/src/handlers/wal_obj_types.rs
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright (c) Mysten Labs, Inc.
-// SPDX-License-Identifier: Apache-2.0
-
-use std::sync::Arc;
-
-use anyhow::Result;
-use diesel::{ExpressionMethods, QueryDsl};
-use diesel_async::RunQueryDsl;
-use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
-use sui_indexer_alt_schema::{
-    objects::{StoredObjectUpdate, StoredSumObjType, StoredWalObjType},
-    schema::wal_obj_types,
-};
-use sui_pg_db as db;
-use sui_types::full_checkpoint_content::CheckpointData;
-
-use super::sum_obj_types::SumObjTypes;
-
-pub(crate) struct WalObjTypes;
-
-impl Processor for WalObjTypes {
-    const NAME: &'static str = "wal_obj_types";
-
-    type Value = StoredObjectUpdate<StoredSumObjType>;
-
-    fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
-        SumObjTypes.process(checkpoint)
-    }
-}
-
-#[async_trait::async_trait]
-impl Handler for WalObjTypes {
-    const MIN_EAGER_ROWS: usize = 100;
-    const MAX_PENDING_ROWS: usize = 10000;
-
-    async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
-        let values: Vec<_> = values
-            .iter()
-            .map(|value| StoredWalObjType {
-                object_id: value.object_id.to_vec(),
-                object_version: value.object_version as i64,
-
-                owner_kind: value.update.as_ref().map(|o| o.owner_kind),
-                owner_id: value.update.as_ref().and_then(|o| o.owner_id.clone()),
-
-                package: value.update.as_ref().and_then(|o| o.package.clone()),
-                module: value.update.as_ref().and_then(|o| o.module.clone()),
-                name: value.update.as_ref().and_then(|o| o.name.clone()),
-                instantiation: value.update.as_ref().and_then(|o| o.instantiation.clone()),
-
-                cp_sequence_number: value.cp_sequence_number as i64,
-            })
-            .collect();
-
-        Ok(diesel::insert_into(wal_obj_types::table)
-            .values(&values)
-            .on_conflict_do_nothing()
-            .execute(conn)
-            .await?)
-    }
-
-    async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
-        let filter = wal_obj_types::table
-            .filter(wal_obj_types::cp_sequence_number.between(from as i64, to as i64 - 1));
-
-        Ok(diesel::delete(filter).execute(conn).await?)
-    }
-}
diff --git a/crates/sui-indexer-alt/src/lib.rs b/crates/sui-indexer-alt/src/lib.rs
index 4889d99bc83e2..15f41c9ef3b06 100644
--- a/crates/sui-indexer-alt/src/lib.rs
+++ b/crates/sui-indexer-alt/src/lib.rs
@@ -11,11 +11,10 @@ use handlers::{
     ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints,
     kv_epoch_ends::KvEpochEnds, kv_epoch_starts::KvEpochStarts, kv_feature_flags::KvFeatureFlags,
     kv_objects::KvObjects, kv_protocol_configs::KvProtocolConfigs, kv_transactions::KvTransactions,
-    obj_info::ObjInfo, obj_versions::ObjVersions, sum_coin_balances::SumCoinBalances,
-    sum_displays::SumDisplays, sum_obj_types::SumObjTypes, sum_packages::SumPackages,
-    tx_affected_addresses::TxAffectedAddresses, tx_affected_objects::TxAffectedObjects,
-    tx_balance_changes::TxBalanceChanges, tx_calls::TxCalls, tx_digests::TxDigests,
-    tx_kinds::TxKinds, wal_coin_balances::WalCoinBalances, wal_obj_types::WalObjTypes,
+    obj_info::ObjInfo, obj_versions::ObjVersions, sum_displays::SumDisplays,
+    sum_packages::SumPackages, tx_affected_addresses::TxAffectedAddresses,
+    tx_affected_objects::TxAffectedObjects, tx_balance_changes::TxBalanceChanges,
+    tx_calls::TxCalls, tx_digests::TxDigests, tx_kinds::TxKinds,
 };
 use sui_indexer_alt_framework::ingestion::{ClientArgs, IngestionConfig};
 use sui_indexer_alt_framework::pipeline::{
@@ -57,10 +56,6 @@ pub async fn start_indexer(
     } = indexer_config.finish();
 
     let PipelineLayer {
-        sum_coin_balances,
-        wal_coin_balances,
-        sum_obj_types,
-        wal_obj_types,
         sum_displays,
         sum_packages,
         obj_info,
@@ -87,29 +82,10 @@ pub async fn start_indexer(
     } = pipeline.finish();
 
     let ingestion = ingestion.finish(IngestionConfig::default());
-
-    let ConsistencyConfig {
-        consistent_pruning_interval_ms,
-        pruner_delay_ms,
-        consistent_range,
-    } = consistency.finish(ConsistencyConfig::default());
-
+    let consistency = consistency.finish(ConsistencyConfig::default());
     let committer = committer.finish(CommitterConfig::default());
     let pruner = pruner.finish(PrunerConfig::default());
 
-    // Pipelines that are split up into a summary table, and a write-ahead log prune their
-    // write-ahead log so it contains just enough information to overlap with the summary table.
-    let consistent_range = consistent_range.unwrap_or_default();
-    let pruner_config = (consistent_range != 0).then(|| PrunerConfig {
-        interval_ms: consistent_pruning_interval_ms,
-        delay_ms: pruner_delay_ms,
-        // Retain at least twice as much data as the lag, to guarantee overlap between the
-        // summary table and the write-ahead log.
-        retention: consistent_range * 2,
-        // Prune roughly five minutes of data in one go.
-        max_chunk_size: 5 * 300,
-    });
-
     let cancel = CancellationToken::new();
     let retry_interval = ingestion.retry_interval();
@@ -171,40 +147,10 @@ pub async fn start_indexer(
         };
     }
 
+    // A consistent pipeline consists of two concurrent pipelines. The first (main) one writes
+    // new data, and the second one lags behind deleting data that has fallen out of the consistent
+    // range.
     macro_rules! add_consistent {
-        ($sum_handler:expr, $sum_config:expr; $wal_handler:expr, $wal_config:expr) => {
-            if let Some(sum_layer) = $sum_config {
-                indexer
-                    .sequential_pipeline(
-                        $sum_handler,
-                        SequentialConfig {
-                            committer: sum_layer.finish(committer.clone()),
-                            checkpoint_lag: consistent_range,
-                        },
-                    )
-                    .await?;
-
-                if let Some(pruner_config) = pruner_config.clone() {
-                    indexer
-                        .concurrent_pipeline(
-                            $wal_handler,
-                            ConcurrentConfig {
-                                committer: $wal_config
-                                    .unwrap_or_default()
-                                    .finish(committer.clone()),
-                                pruner: Some(pruner_config),
-                                checkpoint_lag: None,
-                            },
-                        )
-                        .await?;
-                }
-            }
-        };
-    }
-
-    // Add two concurrent pipelines, one as the main pipeline, and one as a lagged pruner.
-    // The lagged pruner will prune the main pipeline's data based on the consistency range.
-    macro_rules! add_concurrent_with_lagged_pruner {
         ($main_handler:expr, $main_config:expr; $lagged_handler:expr, $lagged_config:expr) => {
             if let Some(main_layer) = $main_config {
                 indexer
@@ -224,7 +170,7 @@ pub async fn start_indexer(
                         $lagged_config.unwrap_or_default().finish(ConcurrentConfig {
                             committer: committer.clone(),
                             pruner: None,
-                            checkpoint_lag: Some(consistent_range),
+                            checkpoint_lag: Some(consistency.consistent_range),
                         }),
                     )
                     .await?;
@@ -240,26 +186,16 @@ pub async fn start_indexer(
         add_concurrent!(KvProtocolConfigs(genesis.clone()), kv_protocol_configs);
     }
 
-    add_consistent!(
-        SumCoinBalances, sum_coin_balances;
-        WalCoinBalances, wal_coin_balances
-    );
-
-    add_consistent!(
-        SumObjTypes, sum_obj_types;
-        WalObjTypes, wal_obj_types
-    );
-
     // Other summary tables (without write-ahead log)
     add_sequential!(SumDisplays, sum_displays);
     add_sequential!(SumPackages, sum_packages);
 
-    add_concurrent_with_lagged_pruner!(
+    add_consistent!(
        ObjInfo, obj_info;
        ObjInfoPruner, obj_info_pruner
    );
 
-    add_concurrent_with_lagged_pruner!(
+    add_consistent!(
        CoinBalanceBuckets, coin_balance_buckets;
        CoinBalanceBucketsPruner, coin_balance_buckets_pruner
    );