Skip to content

Commit

Permalink
fix: data_length, index_length, table_rows in tables (#4927)
Browse files Browse the repository at this point in the history
* fix: data_length, index_length, table_rows in tables

* feat: table stats only works for mito engine currently

* fix: tests

* fix: typo

* chore: log error when region_stats fails
  • Loading branch information
killme2008 authored Nov 4, 2024
1 parent 1676d02 commit 191755f
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 34 deletions.
7 changes: 7 additions & 0 deletions src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ pub enum Error {
location: Location,
},

#[snafu(display("Partition manager not found, it's not expected."))]
PartitionManagerNotFound {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to find table partitions"))]
FindPartitions { source: partition::error::Error },

Expand Down Expand Up @@ -301,6 +307,7 @@ impl ErrorExt for Error {
| Error::CastManager { .. }
| Error::Json { .. }
| Error::GetInformationExtension { .. }
| Error::PartitionManagerNotFound { .. }
| Error::ProcedureIdNotFound { .. } => StatusCode::Unexpected,

Error::ViewPlanColumnsChanged { .. } => StatusCode::InvalidArguments,
Expand Down
35 changes: 9 additions & 26 deletions src/catalog/src/system_schema/information_schema/partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,14 @@ use datatypes::vectors::{
};
use futures::{StreamExt, TryStreamExt};
use partition::manager::PartitionInfo;
use partition::partition::PartitionDef;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, ScanRequest, TableId};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};

use super::PARTITIONS;
use crate::error::{
CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, Result,
UpgradeWeakCatalogManagerRefSnafu,
CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, PartitionManagerNotFoundSnafu,
Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::system_schema::information_schema::{InformationTable, Predicates};
Expand Down Expand Up @@ -236,7 +235,8 @@ impl InformationSchemaPartitionsBuilder {
let partition_manager = catalog_manager
.as_any()
.downcast_ref::<KvBackendCatalogManager>()
.map(|catalog_manager| catalog_manager.partition_manager());
.map(|catalog_manager| catalog_manager.partition_manager())
.context(PartitionManagerNotFoundSnafu)?;

let predicates = Predicates::from_scan_request(&request);

Expand All @@ -262,27 +262,10 @@ impl InformationSchemaPartitionsBuilder {
let table_ids: Vec<TableId> =
table_infos.iter().map(|info| info.ident.table_id).collect();

let mut table_partitions = if let Some(partition_manager) = &partition_manager {
partition_manager
.batch_find_table_partitions(&table_ids)
.await
.context(FindPartitionsSnafu)?
} else {
// Current node must be a standalone instance, contains only one partition by default.
// TODO(dennis): change it when we support multi-regions for standalone.
table_ids
.into_iter()
.map(|table_id| {
(
table_id,
vec![PartitionInfo {
id: RegionId::new(table_id, 0),
partition: PartitionDef::new(vec![], vec![]),
}],
)
})
.collect()
};
let mut table_partitions = partition_manager
.batch_find_table_partitions(&table_ids)
.await
.context(FindPartitionsSnafu)?;

for table_info in table_infos {
let partitions = table_partitions
Expand Down
74 changes: 66 additions & 8 deletions src/catalog/src/system_schema/information_schema/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_TABLES_TABLE_ID;
use common_catalog::consts::{INFORMATION_SCHEMA_TABLES_TABLE_ID, MITO_ENGINE};
use common_error::ext::BoxedError;
use common_meta::datanode::RegionStat;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::error;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
Expand All @@ -31,14 +34,15 @@ use datatypes::vectors::{
};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use store_api::storage::{RegionId, ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};

use super::TABLES;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::system_schema::information_schema::{InformationTable, Predicates};
use crate::system_schema::utils;
use crate::CatalogManager;

pub const TABLE_CATALOG: &str = "table_catalog";
Expand Down Expand Up @@ -234,17 +238,50 @@ impl InformationSchemaTablesBuilder {
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let predicates = Predicates::from_scan_request(&request);

let information_extension = utils::information_extension(&self.catalog_manager)?;

// TODO(dennis): the `region_stats` API is not stable in a distributed cluster because of network issues, etc.
// But we don't want statements such as `show tables` to fail,
// so we use `unwrap_or_else` here instead of the `?` operator.
let region_stats = information_extension
.region_stats()
.await
.map_err(|e| {
error!(e; "Failed to call region_stats");
e
})
.unwrap_or_else(|_| vec![]);

for schema_name in catalog_manager.schema_names(&catalog_name, None).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None);

while let Some(table) = stream.try_next().await? {
let table_info = table.table_info();

// TODO(dennis): make it work for the metric engine
let table_region_stats = if table_info.meta.engine == MITO_ENGINE {
let region_ids = table_info
.meta
.region_numbers
.iter()
.map(|n| RegionId::new(table_info.ident.table_id, *n))
.collect::<HashSet<_>>();

region_stats
.iter()
.filter(|stat| region_ids.contains(&stat.id))
.collect::<Vec<_>>()
} else {
vec![]
};

self.add_table(
&predicates,
&catalog_name,
&schema_name,
table_info,
table.table_type(),
&table_region_stats,
);
}
}
Expand All @@ -260,6 +297,7 @@ impl InformationSchemaTablesBuilder {
schema_name: &str,
table_info: Arc<TableInfo>,
table_type: TableType,
region_stats: &[&RegionStat],
) {
let table_name = table_info.name.as_ref();
let table_id = table_info.table_id();
Expand All @@ -273,7 +311,9 @@ impl InformationSchemaTablesBuilder {

let row = [
(TABLE_CATALOG, &Value::from(catalog_name)),
(TABLE_ID, &Value::from(table_id)),
(TABLE_SCHEMA, &Value::from(schema_name)),
(ENGINE, &Value::from(engine)),
(TABLE_NAME, &Value::from(table_name)),
(TABLE_TYPE, &Value::from(table_type_text)),
];
Expand All @@ -287,21 +327,39 @@ impl InformationSchemaTablesBuilder {
self.table_names.push(Some(table_name));
self.table_types.push(Some(table_type_text));
self.table_ids.push(Some(table_id));

let data_length = region_stats.iter().map(|stat| stat.sst_size).sum();
let table_rows = region_stats.iter().map(|stat| stat.num_rows).sum();
let index_length = region_stats.iter().map(|stat| stat.index_size).sum();

// It's not precise, but it is acceptable for long-term data storage.
let avg_row_length = if table_rows > 0 {
let total_data_length = data_length
+ region_stats
.iter()
.map(|stat| stat.memtable_size)
.sum::<u64>();

total_data_length / table_rows
} else {
0
};

self.data_length.push(Some(data_length));
self.index_length.push(Some(index_length));
self.table_rows.push(Some(table_rows));
self.avg_row_length.push(Some(avg_row_length));

// TODO(sunng87): use real data for these fields
self.data_length.push(Some(0));
self.max_data_length.push(Some(0));
self.index_length.push(Some(0));
self.avg_row_length.push(Some(0));
self.max_index_length.push(Some(0));
self.checksum.push(Some(0));
self.table_rows.push(Some(0));
self.max_index_length.push(Some(0));
self.data_free.push(Some(0));
self.auto_increment.push(Some(0));
self.row_format.push(Some("Fixed"));
self.table_collation.push(Some("utf8_bin"));
self.update_time.push(None);
self.check_time.push(None);

// Use MariaDB's default table version number here.
self.version.push(Some(11));
self.table_comment.push(table_info.desc.as_deref());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size)
| 3 | 2145 | 0 | 0 |
+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+

SELECT data_length, index_length, avg_row_length, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test';

+-------------+--------------+----------------+------------+
| data_length | index_length | avg_row_length | table_rows |
+-------------+--------------+----------------+------------+
| 0 | 0 | 26 | 3 |
+-------------+--------------+----------------+------------+

DROP TABLE test;

Affected Rows: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size)
FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id
IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public');

SELECT data_length, index_length, avg_row_length, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test';

DROP TABLE test;

0 comments on commit 191755f

Please sign in to comment.