diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 0d9e96ab6a44..c7e6f8b55c01 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -178,6 +178,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Partition manager not found, it's not expected."))] + PartitionManagerNotFound { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to find table partitions"))] FindPartitions { source: partition::error::Error }, @@ -301,6 +307,7 @@ impl ErrorExt for Error { | Error::CastManager { .. } | Error::Json { .. } | Error::GetInformationExtension { .. } + | Error::PartitionManagerNotFound { .. } | Error::ProcedureIdNotFound { .. } => StatusCode::Unexpected, Error::ViewPlanColumnsChanged { .. } => StatusCode::InvalidArguments, diff --git a/src/catalog/src/system_schema/information_schema/partitions.rs b/src/catalog/src/system_schema/information_schema/partitions.rs index 93d60679901e..4cfeece62637 100644 --- a/src/catalog/src/system_schema/information_schema/partitions.rs +++ b/src/catalog/src/system_schema/information_schema/partitions.rs @@ -34,15 +34,14 @@ use datatypes::vectors::{ }; use futures::{StreamExt, TryStreamExt}; use partition::manager::PartitionInfo; -use partition::partition::PartitionDef; use snafu::{OptionExt, ResultExt}; -use store_api::storage::{RegionId, ScanRequest, TableId}; +use store_api::storage::{ScanRequest, TableId}; use table::metadata::{TableInfo, TableType}; use super::PARTITIONS; use crate::error::{ - CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, Result, - UpgradeWeakCatalogManagerRefSnafu, + CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, PartitionManagerNotFoundSnafu, + Result, UpgradeWeakCatalogManagerRefSnafu, }; use crate::kvbackend::KvBackendCatalogManager; use crate::system_schema::information_schema::{InformationTable, Predicates}; @@ -236,7 +235,8 @@ impl InformationSchemaPartitionsBuilder { let partition_manager = catalog_manager .as_any() .downcast_ref::() - .map(|catalog_manager| catalog_manager.partition_manager()); + .map(|catalog_manager| catalog_manager.partition_manager()) + .context(PartitionManagerNotFoundSnafu)?; let predicates = Predicates::from_scan_request(&request); @@ -262,27 +262,10 @@ impl InformationSchemaPartitionsBuilder { let table_ids: Vec = table_infos.iter().map(|info| info.ident.table_id).collect(); - let mut table_partitions = if let Some(partition_manager) = &partition_manager { - partition_manager - .batch_find_table_partitions(&table_ids) - .await - .context(FindPartitionsSnafu)? - } else { - // Current node must be a standalone instance, contains only one partition by default. - // TODO(dennis): change it when we support multi-regions for standalone. - table_ids - .into_iter() - .map(|table_id| { - ( - table_id, - vec![PartitionInfo { - id: RegionId::new(table_id, 0), - partition: PartitionDef::new(vec![], vec![]), - }], - ) - }) - .collect() - }; + let mut table_partitions = partition_manager + .batch_find_table_partitions(&table_ids) + .await + .context(FindPartitionsSnafu)?; for table_info in table_infos { let partitions = table_partitions diff --git a/src/catalog/src/system_schema/information_schema/tables.rs b/src/catalog/src/system_schema/information_schema/tables.rs index 976c920b9ab9..b258b857b2db 100644 --- a/src/catalog/src/system_schema/information_schema/tables.rs +++ b/src/catalog/src/system_schema/information_schema/tables.rs @@ -12,13 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::sync::{Arc, Weak}; use arrow_schema::SchemaRef as ArrowSchemaRef; -use common_catalog::consts::INFORMATION_SCHEMA_TABLES_TABLE_ID; +use common_catalog::consts::{INFORMATION_SCHEMA_TABLES_TABLE_ID, MITO_ENGINE}; use common_error::ext::BoxedError; +use common_meta::datanode::RegionStat; use common_recordbatch::adapter::RecordBatchStreamAdapter; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; +use common_telemetry::error; use datafusion::execution::TaskContext; use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter; use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream; @@ -31,7 +34,7 @@ use datatypes::vectors::{ }; use futures::TryStreamExt; use snafu::{OptionExt, ResultExt}; -use store_api::storage::{ScanRequest, TableId}; +use store_api::storage::{RegionId, ScanRequest, TableId}; use table::metadata::{TableInfo, TableType}; use super::TABLES; @@ -39,6 +42,7 @@ use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; use crate::system_schema::information_schema::{InformationTable, Predicates}; +use crate::system_schema::utils; use crate::CatalogManager; pub const TABLE_CATALOG: &str = "table_catalog"; @@ -234,17 +238,50 @@ impl InformationSchemaTablesBuilder { .context(UpgradeWeakCatalogManagerRefSnafu)?; let predicates = Predicates::from_scan_request(&request); + let information_extension = utils::information_extension(&self.catalog_manager)?; + + // TODO(dennis): `region_stats` API is not stable in distributed cluster because of network issue etc. + // But we don't want the statements such as `show tables` fail, + // so using `unwrap_or_else` here instead of `?` operator. + let region_stats = information_extension + .region_stats() + .await + .map_err(|e| { + error!(e; "Failed to call region_stats"); + e + }) + .unwrap_or_else(|_| vec![]); + for schema_name in catalog_manager.schema_names(&catalog_name, None).await? { let mut stream = catalog_manager.tables(&catalog_name, &schema_name, None); while let Some(table) = stream.try_next().await? { let table_info = table.table_info(); + + // TODO(dennis): make it working for metric engine + let table_region_stats = if table_info.meta.engine == MITO_ENGINE { + let region_ids = table_info + .meta + .region_numbers + .iter() + .map(|n| RegionId::new(table_info.ident.table_id, *n)) + .collect::>(); + + region_stats + .iter() + .filter(|stat| region_ids.contains(&stat.id)) + .collect::>() + } else { + vec![] + }; + self.add_table( &predicates, &catalog_name, &schema_name, table_info, table.table_type(), + &table_region_stats, ); } } @@ -260,6 +297,7 @@ impl InformationSchemaTablesBuilder { schema_name: &str, table_info: Arc, table_type: TableType, + region_stats: &[&RegionStat], ) { let table_name = table_info.name.as_ref(); let table_id = table_info.table_id(); @@ -273,7 +311,9 @@ impl InformationSchemaTablesBuilder { let row = [ (TABLE_CATALOG, &Value::from(catalog_name)), + (TABLE_ID, &Value::from(table_id)), (TABLE_SCHEMA, &Value::from(schema_name)), + (ENGINE, &Value::from(engine)), (TABLE_NAME, &Value::from(table_name)), (TABLE_TYPE, &Value::from(table_type_text)), ]; @@ -287,21 +327,39 @@ impl InformationSchemaTablesBuilder { self.table_names.push(Some(table_name)); self.table_types.push(Some(table_type_text)); self.table_ids.push(Some(table_id)); + + let data_length = region_stats.iter().map(|stat| stat.sst_size).sum(); + let table_rows = region_stats.iter().map(|stat| stat.num_rows).sum(); + let index_length = region_stats.iter().map(|stat| stat.index_size).sum(); + + // It's not precise, but it is acceptable for long-term data storage. + let avg_row_length = if table_rows > 0 { + let total_data_length = data_length + + region_stats + .iter() + .map(|stat| stat.memtable_size) + .sum::(); + + total_data_length / table_rows + } else { + 0 + }; + + self.data_length.push(Some(data_length)); + self.index_length.push(Some(index_length)); + self.table_rows.push(Some(table_rows)); + self.avg_row_length.push(Some(avg_row_length)); + // TODO(sunng87): use real data for these fields - self.data_length.push(Some(0)); self.max_data_length.push(Some(0)); - self.index_length.push(Some(0)); - self.avg_row_length.push(Some(0)); - self.max_index_length.push(Some(0)); self.checksum.push(Some(0)); - self.table_rows.push(Some(0)); + self.max_index_length.push(Some(0)); self.data_free.push(Some(0)); self.auto_increment.push(Some(0)); self.row_format.push(Some("Fixed")); self.table_collation.push(Some("utf8_bin")); self.update_time.push(None); self.check_time.push(None); - // use mariadb default table version number here self.version.push(Some(11)); self.table_comment.push(table_info.desc.as_deref()); diff --git a/tests/cases/standalone/common/information_schema/region_statistics.result b/tests/cases/standalone/common/information_schema/region_statistics.result index 6e49679cf172..b4f931300a12 100644 --- a/tests/cases/standalone/common/information_schema/region_statistics.result +++ b/tests/cases/standalone/common/information_schema/region_statistics.result @@ -32,6 +32,14 @@ SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size) | 3 | 2145 | 0 | 0 | +-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+ +SELECT data_length, index_length, avg_row_length, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test'; + ++-------------+--------------+----------------+------------+ +| data_length | index_length | avg_row_length | table_rows | ++-------------+--------------+----------------+------------+ +| 0 | 0 | 26 | 3 | ++-------------+--------------+----------------+------------+ + DROP TABLE test; Affected Rows: 0 diff --git a/tests/cases/standalone/common/information_schema/region_statistics.sql b/tests/cases/standalone/common/information_schema/region_statistics.sql index 9b6e64890405..ed7a7b0cfcf8 100644 --- a/tests/cases/standalone/common/information_schema/region_statistics.sql +++ b/tests/cases/standalone/common/information_schema/region_statistics.sql @@ -21,4 +21,6 @@ SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size) FROM INFORMATION_SCHEMA.REGION_STATISTICS WHERE table_id IN (SELECT TABLE_ID FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test' and table_schema = 'public'); +SELECT data_length, index_length, avg_row_length, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test'; + DROP TABLE test;