Skip to content

Commit

Permalink
feat(mito): add bloom filter read metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc committed Dec 26, 2024
1 parent 0cf44e1 commit f86c3ab
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 63 deletions.
4 changes: 2 additions & 2 deletions src/mito2/src/read/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ impl PruneReader {
let num_rows_before_filter = batch.num_rows();
let Some(batch_filtered) = self.context.precise_filter(batch)? else {
// the entire batch is filtered out
self.metrics.filter_metrics.num_rows_precise_filtered += num_rows_before_filter;
self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
return Ok(None);
};

// update metric
let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
self.metrics.filter_metrics.num_rows_precise_filtered += filtered_rows;
self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;

if !batch_filtered.is_empty() {
Ok(Some(batch_filtered))
Expand Down
138 changes: 77 additions & 61 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,8 @@ impl ParquetReaderBuilder {
return BTreeMap::default();
}

metrics.num_row_groups_before_filtering += num_row_groups;
metrics.num_rows_in_row_group_before_filtering += num_rows as usize;
metrics.rg_total += num_row_groups;
metrics.rows_total += num_rows as usize;

let mut output = (0..num_row_groups).map(|i| (i, None)).collect();

Expand Down Expand Up @@ -398,7 +398,7 @@ impl ParquetReaderBuilder {
Err(err) => {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to apply full-text index, region_id: {}, file_id: {}, err: {}",
"Failed to apply full-text index, region_id: {}, file_id: {}, err: {:?}",
self.file_handle.region_id(),
self.file_handle.file_id(),
err
Expand All @@ -420,8 +420,8 @@ impl ParquetReaderBuilder {
parquet_meta,
row_group_to_row_ids,
output,
&mut metrics.num_row_groups_fulltext_index_filtered,
&mut metrics.num_rows_in_row_group_fulltext_index_filtered,
&mut metrics.rg_fulltext_filtered,
&mut metrics.rows_fulltext_filtered,
);

true
Expand Down Expand Up @@ -482,7 +482,7 @@ impl ParquetReaderBuilder {
Err(err) => {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to apply inverted index, region_id: {}, file_id: {}, err: {}",
"Failed to apply inverted index, region_id: {}, file_id: {}, err: {:?}",
self.file_handle.region_id(),
self.file_handle.file_id(),
err
Expand Down Expand Up @@ -521,8 +521,8 @@ impl ParquetReaderBuilder {
parquet_meta,
ranges_in_row_groups,
output,
&mut metrics.num_row_groups_inverted_index_filtered,
&mut metrics.num_rows_in_row_group_inverted_index_filtered,
&mut metrics.rg_inverted_filtered,
&mut metrics.rows_inverted_filtered,
);

true
Expand Down Expand Up @@ -564,7 +564,7 @@ impl ParquetReaderBuilder {
.collect::<BTreeMap<_, _>>();

let row_groups_after = res.len();
metrics.num_row_groups_min_max_filtered += row_groups_before - row_groups_after;
metrics.rg_minmax_filtered += row_groups_before - row_groups_after;

*output = res;
true
Expand Down Expand Up @@ -627,7 +627,7 @@ impl ParquetReaderBuilder {
&self,
parquet_meta: &ParquetMetaData,
output: &mut BTreeMap<usize, Option<RowSelection>>,
_metrics: &mut ReaderFilterMetrics,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(index_applier) = &self.bloom_filter_index_applier else {
return false;
Expand All @@ -637,8 +637,10 @@ impl ParquetReaderBuilder {
return false;
}

let before_rg = output.len();

let file_size_hint = self.file_handle.meta_ref().bloom_filter_index_size();
match index_applier
if let Err(err) = index_applier
.apply(
self.file_handle.file_id(),
file_size_hint,
Expand All @@ -647,26 +649,27 @@ impl ParquetReaderBuilder {
)
.await
{
Ok(output) => output,
Err(err) => {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to apply bloom filter index, region_id: {}, file_id: {}, err: {}",
self.file_handle.region_id(),
self.file_handle.file_id(),
err
);
} else {
warn!(
err; "Failed to apply bloom filter index, region_id: {}, file_id: {}",
self.file_handle.region_id(), self.file_handle.file_id()
);
}

return false;
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to apply bloom filter index, region_id: {}, file_id: {}, err: {:?}",
self.file_handle.region_id(),
self.file_handle.file_id(),
err
);
} else {
warn!(
err; "Failed to apply bloom filter index, region_id: {}, file_id: {}",
self.file_handle.region_id(), self.file_handle.file_id()
);
}

return false;
};

let after_rg = output.len();
// Update metrics.
metrics.rg_bloom_filtered += before_rg - after_rg;

true
}

Expand Down Expand Up @@ -727,64 +730,77 @@ impl ParquetReaderBuilder {
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
/// Number of row groups before filtering.
pub(crate) num_row_groups_before_filtering: usize,
pub(crate) rg_total: usize,
/// Number of row groups filtered by fulltext index.
pub(crate) num_row_groups_fulltext_index_filtered: usize,
pub(crate) rg_fulltext_filtered: usize,
/// Number of row groups filtered by inverted index.
pub(crate) num_row_groups_inverted_index_filtered: usize,
pub(crate) rg_inverted_filtered: usize,
/// Number of row groups filtered by min-max index.
pub(crate) num_row_groups_min_max_filtered: usize,
/// Number of rows filtered by precise filter.
pub(crate) num_rows_precise_filtered: usize,
pub(crate) rg_minmax_filtered: usize,
/// Number of row groups filtered by bloom filter index.
pub(crate) rg_bloom_filtered: usize,

/// Number of rows in row group before filtering.
pub(crate) num_rows_in_row_group_before_filtering: usize,
pub(crate) rows_total: usize,
/// Number of rows in row group filtered by fulltext index.
pub(crate) num_rows_in_row_group_fulltext_index_filtered: usize,
pub(crate) rows_fulltext_filtered: usize,
/// Number of rows in row group filtered by inverted index.
pub(crate) num_rows_in_row_group_inverted_index_filtered: usize,
pub(crate) rows_inverted_filtered: usize,
/// Number of rows in row group filtered by bloom filter index.
pub(crate) rows_bloom_filtered: usize,
/// Number of rows filtered by precise filter.
pub(crate) rows_precise_filtered: usize,
}

impl ReaderFilterMetrics {
/// Adds `other` metrics to this metrics.
pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
self.num_row_groups_before_filtering += other.num_row_groups_before_filtering;
self.num_row_groups_fulltext_index_filtered += other.num_row_groups_fulltext_index_filtered;
self.num_row_groups_inverted_index_filtered += other.num_row_groups_inverted_index_filtered;
self.num_row_groups_min_max_filtered += other.num_row_groups_min_max_filtered;
self.num_rows_precise_filtered += other.num_rows_precise_filtered;
self.num_rows_in_row_group_before_filtering += other.num_rows_in_row_group_before_filtering;
self.num_rows_in_row_group_fulltext_index_filtered +=
other.num_rows_in_row_group_fulltext_index_filtered;
self.num_rows_in_row_group_inverted_index_filtered +=
other.num_rows_in_row_group_inverted_index_filtered;
self.rg_total += other.rg_total;
self.rg_fulltext_filtered += other.rg_fulltext_filtered;
self.rg_inverted_filtered += other.rg_inverted_filtered;
self.rg_minmax_filtered += other.rg_minmax_filtered;
self.rg_bloom_filtered += other.rg_bloom_filtered;

self.rows_total += other.rows_total;
self.rows_fulltext_filtered += other.rows_fulltext_filtered;
self.rows_inverted_filtered += other.rows_inverted_filtered;
self.rows_bloom_filtered += other.rows_bloom_filtered;
self.rows_precise_filtered += other.rows_precise_filtered;
}

/// Reports metrics.
pub(crate) fn observe(&self) {
READ_ROW_GROUPS_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(self.num_row_groups_before_filtering as u64);
.inc_by(self.rg_total as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["fulltext_index_filtered"])
.inc_by(self.num_row_groups_fulltext_index_filtered as u64);
.inc_by(self.rg_fulltext_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(self.num_row_groups_inverted_index_filtered as u64);
.inc_by(self.rg_inverted_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["minmax_index_filtered"])
.inc_by(self.num_row_groups_min_max_filtered as u64);
.inc_by(self.rg_minmax_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["bloom_filter_index_filtered"])
.inc_by(self.rg_bloom_filtered as u64);

PRECISE_FILTER_ROWS_TOTAL
.with_label_values(&["parquet"])
.inc_by(self.num_rows_precise_filtered as u64);
.inc_by(self.rows_precise_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(self.num_rows_in_row_group_before_filtering as u64);
.inc_by(self.rows_total as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["fulltext_index_filtered"])
.inc_by(self.num_rows_in_row_group_fulltext_index_filtered as u64);
.inc_by(self.rows_fulltext_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(self.num_rows_in_row_group_inverted_index_filtered as u64);
.inc_by(self.rows_inverted_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["bloom_filter_index_filtered"])
.inc_by(self.rows_bloom_filtered as u64);
}
}

Expand Down Expand Up @@ -1040,12 +1056,12 @@ impl Drop for ParquetReader {
self.context.reader_builder().file_handle.region_id(),
self.context.reader_builder().file_handle.file_id(),
self.context.reader_builder().file_handle.time_range(),
metrics.filter_metrics.num_row_groups_before_filtering
- metrics
.filter_metrics
.num_row_groups_inverted_index_filtered
- metrics.filter_metrics.num_row_groups_min_max_filtered,
metrics.filter_metrics.num_row_groups_before_filtering,
metrics.filter_metrics.rg_total
- metrics.filter_metrics.rg_inverted_filtered
- metrics.filter_metrics.rg_minmax_filtered
- metrics.filter_metrics.rg_fulltext_filtered
- metrics.filter_metrics.rg_bloom_filtered,
metrics.filter_metrics.rg_total,
metrics
);

Expand Down

0 comments on commit f86c3ab

Please sign in to comment.