Skip to content

Commit

Permalink
[indexer-alt] Do not prune in the case of object creation (#20641)
Browse files Browse the repository at this point in the history
## Description 

We do not need to prune anything when an object is created or unwrapped in a
checkpoint, since we know the table held no entry for that object prior to
this checkpoint.
This saves a lot of unnecessary pruning requests.

## Test plan 

Deployed in prod.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
lxfind authored Dec 17, 2024
1 parent 5577878 commit 3c99eb4
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 37 deletions.
5 changes: 3 additions & 2 deletions crates/sui-indexer-alt/src/handlers/coin_balance_buckets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl TryInto<StoredCoinBalanceBucket> for &ProcessedCoinBalanceBucket {

/// Get the owner kind and address of a coin, if it is owned by a single address,
/// either through fast-path ownership or ConsensusV2 ownership.
fn get_coin_owner(object: &Object) -> Option<(StoredCoinOwnerKind, SuiAddress)> {
pub(crate) fn get_coin_owner(object: &Object) -> Option<(StoredCoinOwnerKind, SuiAddress)> {
match object.owner() {
Owner::AddressOwner(owner_id) => Some((StoredCoinOwnerKind::Fastpath, *owner_id)),
Owner::ConsensusV2 { authenticator, .. } => Some((
Expand All @@ -198,8 +198,9 @@ fn get_coin_owner(object: &Object) -> Option<(StoredCoinOwnerKind, SuiAddress)>
}
}

fn get_coin_balance_bucket(coin: &Object) -> anyhow::Result<i16> {
pub(crate) fn get_coin_balance_bucket(coin: &Object) -> anyhow::Result<i16> {
let Some(coin) = coin.as_coin_maybe() else {
// TODO: We should make this an invariant violation.
bail!("Failed to deserialize Coin for {}", coin.id());
};
let balance = coin.balance.value();
Expand Down
67 changes: 54 additions & 13 deletions crates/sui-indexer-alt/src/handlers/coin_balance_buckets_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,62 @@ use std::{collections::BTreeMap, sync::Arc};
use anyhow::Result;
use diesel::sql_query;
use diesel_async::RunQueryDsl;
use sui_field_count::FieldCount;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
use sui_types::{base_types::ObjectID, full_checkpoint_content::CheckpointData};

use super::coin_balance_buckets::{
CoinBalanceBucketChangeKind, CoinBalanceBuckets, ProcessedCoinBalanceBucket,
};
use super::coin_balance_buckets::{get_coin_balance_bucket, get_coin_owner};

pub(crate) struct CoinBalanceBucketsPruner;

/// A single pruning request produced by `CoinBalanceBucketsPruner::process`.
pub(crate) struct CoinBalanceBucketToBePruned {
/// ID of the coin object the request applies to.
pub object_id: ObjectID,
/// Exclusive checkpoint bound for pruning. Presumably rows for `object_id` with a
/// smaller `cp_sequence_number` are deleted -- confirm against the handler's
/// delete statement.
pub cp_sequence_number_exclusive: u64,
}

impl Processor for CoinBalanceBucketsPruner {
const NAME: &'static str = "coin_balance_buckets_pruner";
type Value = ProcessedCoinBalanceBucket;
type Value = CoinBalanceBucketToBePruned;

/// Collects the coins whose earlier rows need pruning after this checkpoint.
///
/// Only objects that were inputs to the checkpoint are considered, so objects that are
/// created or unwrapped in the checkpoint never produce a pruning request.
///
/// Returns an error if `get_coin_balance_bucket` fails for an input or output coin.
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
CoinBalanceBuckets.process(checkpoint)
let cp_sequence_number = checkpoint.checkpoint_summary.sequence_number;
let checkpoint_input_objects = checkpoint.checkpoint_input_objects();
// Index the live output objects by ID so each input object can be looked up directly.
let latest_live_output_objects: BTreeMap<_, _> = checkpoint
.latest_live_output_objects()
.into_iter()
.map(|o| (o.id(), o))
.collect();
let mut values = Vec::new();
for (object_id, input_object) in checkpoint_input_objects {
// This loop processes all coins that were owned by a single address prior to the
// checkpoint, but are now deleted/wrapped, or changed owner or coin balance bucket
// in the checkpoint.
if !input_object.is_coin() {
continue;
}
// Skip coins for which `get_coin_owner` yields no owner.
let Some(input_coin_owner) = get_coin_owner(input_object) else {
continue;
};
let input_coin_balance_bucket = get_coin_balance_bucket(input_object)?;
if let Some(output_object) = latest_live_output_objects.get(&object_id) {
let output_coin_owner = get_coin_owner(output_object);
let output_coin_balance_bucket = get_coin_balance_bucket(output_object)?;
// The coin is still live: a request is only needed if its owner or bucket changed.
// The exclusive bound is this checkpoint's own sequence number.
if (output_coin_owner, output_coin_balance_bucket)
!= (Some(input_coin_owner), input_coin_balance_bucket)
{
values.push(CoinBalanceBucketToBePruned {
object_id,
cp_sequence_number_exclusive: cp_sequence_number,
});
}
} else {
// The coin is not among the live outputs, so the exclusive bound is one past
// this checkpoint.
values.push(CoinBalanceBucketToBePruned {
object_id,
cp_sequence_number_exclusive: cp_sequence_number + 1,
});
}
}
Ok(values)
}
}

Expand All @@ -32,13 +72,8 @@ impl Handler for CoinBalanceBucketsPruner {

let mut to_prune = BTreeMap::new();
for v in values {
let object_id = v.object_id;
let cp_sequence_number_exclusive = match v.change {
CoinBalanceBucketChangeKind::Insert { .. } => v.cp_sequence_number,
CoinBalanceBucketChangeKind::Delete => v.cp_sequence_number + 1,
} as i64;
let cp = to_prune.entry(object_id).or_default();
*cp = std::cmp::max(*cp, cp_sequence_number_exclusive);
let cp = to_prune.entry(v.object_id).or_default();
*cp = std::cmp::max(*cp, v.cp_sequence_number_exclusive);
}
let values = to_prune
.iter()
Expand Down Expand Up @@ -66,3 +101,9 @@ impl Handler for CoinBalanceBucketsPruner {
Ok(rows_deleted)
}
}

impl FieldCount for CoinBalanceBucketToBePruned {
// The exact value does not really matter here: the deletion statement does not bind
// parameters, so we are not limited by Postgres' bound variable limit.
const FIELD_COUNT: usize = 1;
}
9 changes: 0 additions & 9 deletions crates/sui-indexer-alt/src/handlers/obj_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,6 @@ impl Handler for ObjInfo {
}
}

impl ProcessedObjInfo {
/// Returns the ID of the object this update refers to: the ID of the inserted object for
/// `Insert`, or the stored object ID for `Delete`.
pub fn object_id(&self) -> ObjectID {
match &self.update {
ProcessedObjInfoUpdate::Insert(object) => object.id(),
ProcessedObjInfoUpdate::Delete(object_id) => *object_id,
}
}
}

impl FieldCount for ProcessedObjInfo {
const FIELD_COUNT: usize = StoredObjInfo::FIELD_COUNT;
}
Expand Down
59 changes: 46 additions & 13 deletions crates/sui-indexer-alt/src/handlers/obj_info_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,51 @@ use std::{collections::BTreeMap, sync::Arc};
use anyhow::Result;
use diesel::sql_query;
use diesel_async::RunQueryDsl;
use sui_field_count::FieldCount;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;

use super::obj_info::{ObjInfo, ProcessedObjInfo, ProcessedObjInfoUpdate};
use sui_types::{base_types::ObjectID, full_checkpoint_content::CheckpointData};

pub(crate) struct ObjInfoPruner;

/// A single pruning request produced by `ObjInfoPruner::process`.
pub(crate) struct ObjInfoToBePruned {
/// ID of the object the request applies to.
pub object_id: ObjectID,
/// Exclusive checkpoint bound for pruning. Presumably `obj_info` rows for `object_id` with
/// a smaller `cp_sequence_number` are deleted -- confirm against the handler's delete
/// statement.
pub cp_sequence_number_exclusive: u64,
}

impl Processor for ObjInfoPruner {
const NAME: &'static str = "obj_info_pruner";
type Value = ProcessedObjInfo;
type Value = ObjInfoToBePruned;

/// Collects the objects whose earlier `obj_info` rows need pruning after this checkpoint.
///
/// Each input object yields at most one request: with an exclusive bound of this
/// checkpoint's sequence number if its owner changed, or one past it if the object is no
/// longer among the live outputs.
fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
ObjInfo.process(checkpoint)
let cp_sequence_number = checkpoint.checkpoint_summary.sequence_number;
let checkpoint_input_objects = checkpoint.checkpoint_input_objects();
// Index the live output objects by ID so each input object can be looked up directly.
let latest_live_output_objects = checkpoint
.latest_live_output_objects()
.into_iter()
.map(|o| (o.id(), o))
.collect::<BTreeMap<_, _>>();
// At most one request is pushed per input object, so this capacity is an upper bound.
let mut values = Vec::with_capacity(checkpoint_input_objects.len());
// We only need to prune if an object is removed, or its owner changed.
// We do not need to prune when an object is created or unwrapped, since there would not
// have been an entry for it in the table prior to this checkpoint.
// This makes the logic different from the one in obj_info.rs.
for (object_id, input_object) in checkpoint_input_objects {
if let Some(output_object) = latest_live_output_objects.get(&object_id) {
// Still live: a request is only needed when the owner differs from the input.
if output_object.owner() != input_object.owner() {
values.push(ObjInfoToBePruned {
object_id,
cp_sequence_number_exclusive: cp_sequence_number,
});
}
} else {
// Not among the live outputs: the exclusive bound is one past this checkpoint.
values.push(ObjInfoToBePruned {
object_id,
cp_sequence_number_exclusive: cp_sequence_number + 1,
});
}
}
Ok(values)
}
}

Expand All @@ -30,16 +61,12 @@ impl Handler for ObjInfoPruner {

// For each (object_id, cp_sequence_number_exclusive), delete all entries in obj_info with
// cp_sequence_number less than cp_sequence_number_exclusive that match the object_id.
// For each object_id, we first get the highest cp_sequence_number_exclusive.

// Minor optimization: for each object_id, we first get the highest cp_sequence_number_exclusive.
let mut to_prune = BTreeMap::new();
for v in values {
let object_id = v.object_id();
let cp_sequence_number_exclusive = match v.update {
ProcessedObjInfoUpdate::Insert(_) => v.cp_sequence_number,
ProcessedObjInfoUpdate::Delete(_) => v.cp_sequence_number + 1,
} as i64;
let cp = to_prune.entry(object_id).or_default();
*cp = std::cmp::max(*cp, cp_sequence_number_exclusive);
let cp = to_prune.entry(v.object_id).or_default();
*cp = std::cmp::max(*cp, v.cp_sequence_number_exclusive);
}
let values = to_prune
.iter()
Expand Down Expand Up @@ -67,3 +94,9 @@ impl Handler for ObjInfoPruner {
Ok(rows_deleted)
}
}

impl FieldCount for ObjInfoToBePruned {
// The exact value does not really matter here: the deletion statement does not bind
// parameters, so we are not limited by Postgres' bound variable limit.
const FIELD_COUNT: usize = 1;
}

0 comments on commit 3c99eb4

Please sign in to comment.