From 262a79a170384d3aa2ec1402bd01d0d390fe5ca5 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Mon, 18 Dec 2023 14:10:22 +0800 Subject: [PATCH] feat: adds some tables to information_schema (#2935) * feat: adds engines table to information_schema * feat: adds COLUMN_PRIVILEGES and COLUMN_STATISTICS * feat: refactor memory tables * chore: rename memory_tables * test: adds unit tests * chore: format * chore: style * fix: by cr comments * refactor: tables --- Cargo.lock | 1 + src/catalog/Cargo.toml | 1 + src/catalog/src/information_schema.rs | 104 +++++++-- src/catalog/src/information_schema/columns.rs | 68 +++--- .../src/information_schema/memory_table.rs | 214 ++++++++++++++++++ .../information_schema/memory_table/tables.rs | 108 +++++++++ .../src/information_schema/table_names.rs | 21 ++ src/catalog/src/information_schema/tables.rs | 32 +-- src/catalog/src/kvbackend/manager.rs | 15 +- src/catalog/src/memory/manager.rs | 3 +- src/common/catalog/src/consts.rs | 9 + src/common/recordbatch/src/recordbatch.rs | 10 + tests-integration/src/tests/instance_test.rs | 5 +- .../common/show/show_databases_tables.result | 15 +- .../common/system/information_schema.result | 125 ++++++++-- .../common/system/information_schema.sql | 16 ++ 16 files changed, 623 insertions(+), 124 deletions(-) create mode 100644 src/catalog/src/information_schema/memory_table.rs create mode 100644 src/catalog/src/information_schema/memory_table/tables.rs create mode 100644 src/catalog/src/information_schema/table_names.rs diff --git a/Cargo.lock b/Cargo.lock index 5aa53779d444..fdfabd0a6ec3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1198,6 +1198,7 @@ dependencies = [ "object-store", "parking_lot 0.12.1", "partition", + "paste", "prometheus", "regex", "serde", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index fb41bf15d94d..93f5b8b91c52 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -33,6 +33,7 @@ meta-client.workspace = true moka = { workspace = true, features = ["future"] } parking_lot = "0.12" partition.workspace = true +paste = "1.0" prometheus.workspace = true regex.workspace = true serde.workspace = true diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 484529cd57a4..839f83035010 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -13,16 +13,20 @@ // limitations under the License. mod columns; +mod memory_table; +mod table_names; mod tables; use std::collections::HashMap; use std::sync::{Arc, Weak}; -use common_catalog::consts::INFORMATION_SCHEMA_NAME; +use common_catalog::consts::{self, INFORMATION_SCHEMA_NAME}; use common_error::ext::BoxedError; use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; use datatypes::schema::SchemaRef; use futures_util::StreamExt; +use lazy_static::lazy_static; +use paste::paste; use snafu::ResultExt; use store_api::data_source::DataSource; use store_api::storage::{ScanRequest, TableId}; @@ -32,43 +36,101 @@ use table::metadata::{ }; use table::thin_table::{ThinTable, ThinTableAdapter}; use table::TableRef; +pub use table_names::*; use self::columns::InformationSchemaColumns; use crate::error::Result; +use crate::information_schema::memory_table::{get_schema_columns, MemoryTable}; use crate::information_schema::tables::InformationSchemaTables; use crate::CatalogManager; -pub const TABLES: &str = "tables"; -pub const COLUMNS: &str = "columns"; +lazy_static! { + // Memory tables in `information_schema`. + static ref MEMORY_TABLES: &'static [&'static str] = &[ + ENGINES, + COLUMN_PRIVILEGES, + COLUMN_STATISTICS + ]; +} +macro_rules! setup_memory_table { + ($name: expr) => { + paste! { + { + let (schema, columns) = get_schema_columns($name); + Some(Arc::new(MemoryTable::new( + consts::[], + $name, + schema, + columns + )) as _) + } + } + }; +} + +/// The `information_schema` tables info provider. pub struct InformationSchemaProvider { catalog_name: String, catalog_manager: Weak, + tables: HashMap, } impl InformationSchemaProvider { pub fn new(catalog_name: String, catalog_manager: Weak) -> Self { - Self { + let mut provider = Self { catalog_name, catalog_manager, - } + tables: HashMap::new(), + }; + + provider.build_tables(); + + provider } - /// Build a map of [TableRef] in information schema. - /// Including `tables` and `columns`. - pub fn build( - catalog_name: String, - catalog_manager: Weak, - ) -> HashMap { - let provider = Self::new(catalog_name, catalog_manager); - - let mut schema = HashMap::new(); - schema.insert(TABLES.to_owned(), provider.table(TABLES).unwrap()); - schema.insert(COLUMNS.to_owned(), provider.table(COLUMNS).unwrap()); - schema + /// Returns table names in the order of table id. + pub fn table_names(&self) -> Vec { + let mut tables = self.tables.values().clone().collect::>(); + + tables.sort_by(|t1, t2| { + t1.table_info() + .table_id() + .partial_cmp(&t2.table_info().table_id()) + .unwrap() + }); + tables + .into_iter() + .map(|t| t.table_info().name.clone()) + .collect() } + /// Returns a map of [TableRef] in information schema. + pub fn tables(&self) -> &HashMap { + assert!(!self.tables.is_empty()); + + &self.tables + } + + /// Returns the [TableRef] by table name. pub fn table(&self, name: &str) -> Option { + self.tables.get(name).cloned() + } + + fn build_tables(&mut self) { + let mut tables = HashMap::new(); + tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap()); + tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap()); + + // Add memory tables + for name in MEMORY_TABLES.iter() { + tables.insert((*name).to_string(), self.build_table(name).unwrap()); + } + + self.tables = tables; + } + + fn build_table(&self, name: &str) -> Option { self.information_table(name).map(|table| { let table_info = Self::table_info(self.catalog_name.clone(), &table); let filter_pushdown = FilterPushDownType::Unsupported; @@ -89,6 +151,9 @@ impl InformationSchemaProvider { self.catalog_name.clone(), self.catalog_manager.clone(), )) as _), + ENGINES => setup_memory_table!(ENGINES), + COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES), + COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS), _ => None, } } @@ -102,9 +167,9 @@ impl InformationSchemaProvider { .unwrap(); let table_info = TableInfoBuilder::default() .table_id(table.table_id()) - .name(table.table_name().to_owned()) + .name(table.table_name().to_string()) .catalog_name(catalog_name) - .schema_name(INFORMATION_SCHEMA_NAME.to_owned()) + .schema_name(INFORMATION_SCHEMA_NAME.to_string()) .meta(table_meta) .table_type(table.table_type()) .build() @@ -176,6 +241,7 @@ impl DataSource for InformationTableDataSource { stream: Box::pin(stream), output_ordering: None, }; + Ok(Box::pin(stream)) } } diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index 34e9c7ef66c5..53f338783ad3 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -16,8 +16,8 @@ use std::sync::{Arc, Weak}; use arrow_schema::SchemaRef as ArrowSchemaRef; use common_catalog::consts::{ - INFORMATION_SCHEMA_COLUMNS_TABLE_ID, INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD, - SEMANTIC_TYPE_PRIMARY_KEY, SEMANTIC_TYPE_TIME_INDEX, + INFORMATION_SCHEMA_COLUMNS_TABLE_ID, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY, + SEMANTIC_TYPE_TIME_INDEX, }; use common_error::ext::BoxedError; use common_query::physical_plan::TaskContext; @@ -33,8 +33,7 @@ use datatypes::vectors::{StringVectorBuilder, VectorRef}; use snafu::{OptionExt, ResultExt}; use store_api::storage::TableId; -use super::tables::InformationSchemaTables; -use super::{InformationTable, COLUMNS, TABLES}; +use super::{InformationTable, COLUMNS}; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; @@ -102,7 +101,7 @@ impl InformationTable for InformationSchemaColumns { schema, futures::stream::once(async move { builder - .make_tables() + .make_columns() .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) @@ -148,8 +147,8 @@ impl InformationSchemaColumnsBuilder { } } - /// Construct the `information_schema.tables` virtual table - async fn make_tables(&mut self) -> Result { + /// Construct the `information_schema.columns` virtual table + async fn make_columns(&mut self) -> Result { let catalog_name = self.catalog_name.clone(); let catalog_manager = self .catalog_manager @@ -163,48 +162,38 @@ impl InformationSchemaColumnsBuilder { { continue; } + for table_name in catalog_manager .table_names(&catalog_name, &schema_name) .await? { - let (keys, schema) = if let Some(table) = catalog_manager + if let Some(table) = catalog_manager .table(&catalog_name, &schema_name, &table_name) .await? { let keys = &table.table_info().meta.primary_key_indices; let schema = table.schema(); - (keys.clone(), schema) - } else { - // TODO: this specific branch is only a workaround for FrontendCatalogManager. - if schema_name == INFORMATION_SCHEMA_NAME { - if table_name == COLUMNS { - (vec![], InformationSchemaColumns::schema()) - } else if table_name == TABLES { - (vec![], InformationSchemaTables::schema()) + + for (idx, column) in schema.column_schemas().iter().enumerate() { + let semantic_type = if column.is_time_index() { + SEMANTIC_TYPE_TIME_INDEX + } else if keys.contains(&idx) { + SEMANTIC_TYPE_PRIMARY_KEY } else { - continue; - } - } else { - continue; + SEMANTIC_TYPE_FIELD + }; + + self.add_column( + &catalog_name, + &schema_name, + &table_name, + &column.name, + &column.data_type.name(), + semantic_type, + ); } - }; - - for (idx, column) in schema.column_schemas().iter().enumerate() { - let semantic_type = if column.is_time_index() { - SEMANTIC_TYPE_TIME_INDEX - } else if keys.contains(&idx) { - SEMANTIC_TYPE_PRIMARY_KEY - } else { - SEMANTIC_TYPE_FIELD - }; - self.add_column( - &catalog_name, - &schema_name, - &table_name, - &column.name, - &column.data_type.name(), - semantic_type, - ); + } else { + unreachable!(); } } } @@ -238,6 +227,7 @@ impl InformationSchemaColumnsBuilder { Arc::new(self.data_types.finish()), Arc::new(self.semantic_types.finish()), ]; + RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu) } } @@ -254,7 +244,7 @@ impl DfPartitionStream for InformationSchemaColumns { schema, futures::stream::once(async move { builder - .make_tables() + .make_columns() .await .map(|x| x.into_df_record_batch()) .map_err(Into::into) diff --git a/src/catalog/src/information_schema/memory_table.rs b/src/catalog/src/information_schema/memory_table.rs new file mode 100644 index 000000000000..cce53c88a73c --- /dev/null +++ b/src/catalog/src/information_schema/memory_table.rs @@ -0,0 +1,214 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod tables; +use std::sync::Arc; + +use arrow_schema::SchemaRef as ArrowSchemaRef; +use common_error::ext::BoxedError; +use common_query::physical_plan::TaskContext; +use common_recordbatch::adapter::RecordBatchStreamAdapter; +use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter; +use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream; +use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; +use datatypes::schema::SchemaRef; +use datatypes::vectors::VectorRef; +use snafu::ResultExt; +use store_api::storage::TableId; +pub use tables::get_schema_columns; + +use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result}; +use crate::information_schema::InformationTable; + +/// A memory table with specified schema and columns. +pub(super) struct MemoryTable { + table_id: TableId, + table_name: &'static str, + schema: SchemaRef, + columns: Vec, +} + +impl MemoryTable { + /// Creates a memory table with table id, name, schema and columns. + pub(super) fn new( + table_id: TableId, + table_name: &'static str, + schema: SchemaRef, + columns: Vec, + ) -> Self { + Self { + table_id, + table_name, + schema, + columns, + } + } + + fn builder(&self) -> MemoryTableBuilder { + MemoryTableBuilder::new(self.schema.clone(), self.columns.clone()) + } +} + +impl InformationTable for MemoryTable { + fn table_id(&self) -> TableId { + self.table_id + } + + fn table_name(&self) -> &'static str { + self.table_name + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn to_stream(&self) -> Result { + let schema = self.schema.arrow_schema().clone(); + let mut builder = self.builder(); + let stream = Box::pin(DfRecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { + builder + .memory_records() + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )); + Ok(Box::pin( + RecordBatchStreamAdapter::try_new(stream) + .map_err(BoxedError::new) + .context(InternalSnafu)?, + )) + } +} + +struct MemoryTableBuilder { + schema: SchemaRef, + columns: Vec, +} + +impl MemoryTableBuilder { + fn new(schema: SchemaRef, columns: Vec) -> Self { + Self { schema, columns } + } + + /// Construct the `information_schema.{table_name}` virtual table + async fn memory_records(&mut self) -> Result { + if self.columns.is_empty() { + RecordBatch::new_empty(self.schema.clone()).context(CreateRecordBatchSnafu) + } else { + RecordBatch::new(self.schema.clone(), std::mem::take(&mut self.columns)) + .context(CreateRecordBatchSnafu) + } + } +} + +impl DfPartitionStream for MemoryTable { + fn schema(&self) -> &ArrowSchemaRef { + self.schema.arrow_schema() + } + + fn execute(&self, _: Arc) -> DfSendableRecordBatchStream { + let schema = self.schema.arrow_schema().clone(); + let mut builder = self.builder(); + Box::pin(DfRecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { + builder + .memory_records() + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_recordbatch::RecordBatches; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnSchema, Schema}; + use datatypes::vectors::StringVector; + + use super::*; + + #[tokio::test] + async fn test_memory_table() { + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("a", ConcreteDataType::string_datatype(), false), + ColumnSchema::new("b", ConcreteDataType::string_datatype(), false), + ])); + + let table = MemoryTable::new( + 42, + "test", + schema.clone(), + vec![ + Arc::new(StringVector::from(vec!["a1", "a2"])), + Arc::new(StringVector::from(vec!["b1", "b2"])), + ], + ); + + assert_eq!(42, table.table_id()); + assert_eq!("test", table.table_name()); + assert_eq!(schema, InformationTable::schema(&table)); + + let stream = table.to_stream().unwrap(); + + let batches = RecordBatches::try_collect(stream).await.unwrap(); + + assert_eq!( + "\ ++----+----+ +| a | b | ++----+----+ +| a1 | b1 | +| a2 | b2 | ++----+----+", + batches.pretty_print().unwrap() + ); + } + + #[tokio::test] + async fn test_empty_memory_table() { + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("a", ConcreteDataType::string_datatype(), false), + ColumnSchema::new("b", ConcreteDataType::string_datatype(), false), + ])); + + let table = MemoryTable::new(42, "test", schema.clone(), vec![]); + + assert_eq!(42, table.table_id()); + assert_eq!("test", table.table_name()); + assert_eq!(schema, InformationTable::schema(&table)); + + let stream = table.to_stream().unwrap(); + + let batches = RecordBatches::try_collect(stream).await.unwrap(); + + assert_eq!( + "\ ++---+---+ +| a | b | ++---+---+ ++---+---+", + batches.pretty_print().unwrap() + ); + } +} diff --git a/src/catalog/src/information_schema/memory_table/tables.rs b/src/catalog/src/information_schema/memory_table/tables.rs new file mode 100644 index 000000000000..b17e1d997643 --- /dev/null +++ b/src/catalog/src/information_schema/memory_table/tables.rs @@ -0,0 +1,108 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_catalog::consts::MITO_ENGINE; +use datatypes::prelude::{ConcreteDataType, VectorRef}; +use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::vectors::StringVector; + +use crate::information_schema::table_names::*; + +/// Find the schema and columns by the table_name, only valid for memory tables. +/// Safety: the user MUST ensure the table schema exists, panic otherwise. +pub fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec) { + let (column_schemas, columns): (_, Vec) = match table_name { + COLUMN_PRIVILEGES => ( + string_columns(&[ + "GRANTEE", + "TABLE_CATALOG", + "TABLE_SCHEMA", + "TABLE_NAME", + "COLUMN_NAME", + "PRIVILEGE_TYPE", + "IS_GRANTABLE", + ]), + vec![], + ), + + COLUMN_STATISTICS => ( + string_columns(&[ + "SCHEMA_NAME", + "TABLE_NAME", + "COLUMN_NAME", + // TODO(dennis): It must be a JSON type, but we don't support it yet + "HISTOGRAM", + ]), + vec![], + ), + + ENGINES => ( + string_columns(&[ + "ENGINE", + "SUPPORT", + "COMMENT", + "TRANSACTIONS", + "XA", + "SAVEPOINTS", + ]), + vec![ + Arc::new(StringVector::from(vec![MITO_ENGINE])), + Arc::new(StringVector::from(vec!["DEFAULT"])), + Arc::new(StringVector::from(vec![ + "Storage engine for time-series data", + ])), + Arc::new(StringVector::from(vec!["NO"])), + Arc::new(StringVector::from(vec!["NO"])), + Arc::new(StringVector::from(vec!["NO"])), + ], + ), + + _ => unreachable!("Unknown table in information_schema: {}", table_name), + }; + + (Arc::new(Schema::new(column_schemas)), columns) +} + +fn string_columns(names: &[&'static str]) -> Vec { + names.iter().map(|name| string_column(name)).collect() +} + +fn string_column(name: &str) -> ColumnSchema { + ColumnSchema::new( + str::to_lowercase(name), + ConcreteDataType::string_datatype(), + false, + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_string_columns() { + let columns = ["a", "b", "c"]; + let column_schemas = string_columns(&columns); + + assert_eq!(3, column_schemas.len()); + for (i, name) in columns.iter().enumerate() { + let cs = column_schemas.get(i).unwrap(); + + assert_eq!(*name, cs.name); + assert_eq!(ConcreteDataType::string_datatype(), cs.data_type); + } + } +} diff --git a/src/catalog/src/information_schema/table_names.rs b/src/catalog/src/information_schema/table_names.rs new file mode 100644 index 000000000000..0b3404f472a2 --- /dev/null +++ b/src/catalog/src/information_schema/table_names.rs @@ -0,0 +1,21 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// All table names in `information_schema`. + +pub const TABLES: &str = "tables"; +pub const COLUMNS: &str = "columns"; +pub const ENGINES: &str = "engines"; +pub const COLUMN_PRIVILEGES: &str = "column_privileges"; +pub const COLUMN_STATISTICS: &str = "column_statistics"; diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs index a626dbfdd31a..d258dd490130 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/information_schema/tables.rs @@ -15,10 +15,7 @@ use std::sync::{Arc, Weak}; use arrow_schema::SchemaRef as ArrowSchemaRef; -use common_catalog::consts::{ - INFORMATION_SCHEMA_COLUMNS_TABLE_ID, INFORMATION_SCHEMA_NAME, - INFORMATION_SCHEMA_TABLES_TABLE_ID, -}; +use common_catalog::consts::INFORMATION_SCHEMA_TABLES_TABLE_ID; use common_error::ext::BoxedError; use common_query::physical_plan::TaskContext; use common_recordbatch::adapter::RecordBatchStreamAdapter; @@ -33,7 +30,7 @@ use snafu::{OptionExt, ResultExt}; use store_api::storage::TableId; use table::metadata::TableType; -use super::{COLUMNS, TABLES}; +use super::TABLES; use crate::error::{ CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, }; @@ -178,29 +175,8 @@ impl InformationSchemaTablesBuilder { Some(&table_info.meta.engine), ); } else { - // TODO: this specific branch is only a workaround for FrontendCatalogManager. - if schema_name == INFORMATION_SCHEMA_NAME { - if table_name == COLUMNS { - self.add_table( - &catalog_name, - &schema_name, - &table_name, - TableType::Temporary, - Some(INFORMATION_SCHEMA_COLUMNS_TABLE_ID), - None, - ); - } else if table_name == TABLES { - self.add_table( - &catalog_name, - &schema_name, - &table_name, - TableType::Temporary, - Some(INFORMATION_SCHEMA_TABLES_TABLE_ID), - None, - ); - } - } - }; + unreachable!(); + } } } diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 2c5028b40908..93536ab73f41 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -38,7 +38,7 @@ use crate::error::{ self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, Result as CatalogResult, TableMetadataManagerSnafu, }; -use crate::information_schema::{InformationSchemaProvider, COLUMNS, TABLES}; +use crate::information_schema::InformationSchemaProvider; use crate::CatalogManager; /// Access all existing catalog, schema and tables. @@ -81,6 +81,11 @@ impl KvBackendCatalogManager { cache_invalidator, system_catalog: SystemCatalog { catalog_manager: me.clone(), + information_schema_provider: Arc::new(InformationSchemaProvider::new( + // The catalog name is not used in system_catalog, so let it empty + "".to_string(), + me.clone(), + )), }, }) } @@ -231,11 +236,11 @@ impl CatalogManager for KvBackendCatalogManager { // a new catalog is created. /// Existing system tables: /// - public.numbers -/// - information_schema.tables -/// - information_schema.columns +/// - information_schema.{tables} #[derive(Clone)] struct SystemCatalog { catalog_manager: Weak, + information_schema_provider: Arc, } impl SystemCatalog { @@ -245,7 +250,7 @@ impl SystemCatalog { fn table_names(&self, schema: &str) -> Vec { if schema == INFORMATION_SCHEMA_NAME { - vec![TABLES.to_string(), COLUMNS.to_string()] + self.information_schema_provider.table_names() } else if schema == DEFAULT_SCHEMA_NAME { vec![NUMBERS_TABLE_NAME.to_string()] } else { @@ -259,7 +264,7 @@ impl SystemCatalog { fn table_exist(&self, schema: &str, table: &str) -> bool { if schema == INFORMATION_SCHEMA_NAME { - table == TABLES || table == COLUMNS + self.information_schema_provider.table(table).is_some() } else if schema == DEFAULT_SCHEMA_NAME { table == NUMBERS_TABLE_NAME } else { diff --git a/src/catalog/src/memory/manager.rs b/src/catalog/src/memory/manager.rs index 5d08c1162600..745256beeaa8 100644 --- a/src/catalog/src/memory/manager.rs +++ b/src/catalog/src/memory/manager.rs @@ -243,10 +243,11 @@ impl MemoryCatalogManager { } fn create_catalog_entry(self: &Arc, catalog: String) -> SchemaEntries { - let information_schema = InformationSchemaProvider::build( + let information_schema_provider = InformationSchemaProvider::new( catalog, Arc::downgrade(self) as Weak, ); + let information_schema = information_schema_provider.tables().clone(); let mut catalog = HashMap::new(); catalog.insert(INFORMATION_SCHEMA_NAME.to_string(), information_schema); catalog diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 111ffb8a4d45..1de1e263fae4 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -29,10 +29,19 @@ pub const SYSTEM_CATALOG_TABLE_ID: u32 = 0; pub const SCRIPTS_TABLE_ID: u32 = 1; /// numbers table id pub const NUMBERS_TABLE_ID: u32 = 2; + +/// ----- Begin of information_schema tables ----- /// id for information_schema.tables pub const INFORMATION_SCHEMA_TABLES_TABLE_ID: u32 = 3; /// id for information_schema.columns pub const INFORMATION_SCHEMA_COLUMNS_TABLE_ID: u32 = 4; +/// id for information_schema.engines +pub const INFORMATION_SCHEMA_ENGINES_TABLE_ID: u32 = 5; +/// id for information_schema.column_privileges +pub const INFORMATION_SCHEMA_COLUMN_PRIVILEGES_TABLE_ID: u32 = 6; +/// id for information_schema.column_statistics +pub const INFORMATION_SCHEMA_COLUMN_STATISTICS_TABLE_ID: u32 = 7; +/// ----- End of information_schema tables ----- pub const MITO_ENGINE: &str = "mito"; pub const MITO2_ENGINE: &str = "mito2"; diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 5de969dbbdb7..420901902b64 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -57,6 +57,16 @@ impl RecordBatch { }) } + /// Create an empty [`RecordBatch`] from `schema`. + pub fn new_empty(schema: SchemaRef) -> Result { + let df_record_batch = DfRecordBatch::new_empty(schema.arrow_schema().clone()); + Ok(RecordBatch { + schema, + columns: vec![], + df_record_batch, + }) + } + pub fn try_project(&self, indices: &[usize]) -> Result { let schema = Arc::new(self.schema.try_project(indices).context(DataTypesSnafu)?); let mut columns = Vec::with_capacity(indices.len()); diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index 6bf5fc0d4784..5054182b56d4 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -1732,7 +1732,7 @@ async fn test_information_schema_dot_tables(instance: Arc) { // User can only see information schema under current catalog. // A necessary requirement to GreptimeCloud. - let sql = "select table_catalog, table_schema, table_name, table_type, table_id, engine from information_schema.tables where table_type != 'SYSTEM VIEW' order by table_name"; + let sql = "select table_catalog, table_schema, table_name, table_type, table_id, engine from information_schema.tables where table_type != 'SYSTEM VIEW' and table_name in ('columns', 'numbers', 'tables', 'another_table') order by table_name"; let output = execute_sql(&instance, sql).await; let expected = "\ @@ -1760,6 +1760,7 @@ async fn test_information_schema_dot_tables(instance: Arc) { #[apply(both_instances_cases)] async fn test_information_schema_dot_columns(instance: Arc) { + logging::init_default_ut_logging(); let instance = instance.frontend(); let sql = "create table another_table(i timestamp time index)"; @@ -1769,7 +1770,7 @@ async fn test_information_schema_dot_columns(instance: Arc) { // User can only see information schema under current catalog. // A necessary requirement to GreptimeCloud. - let sql = "select table_catalog, table_schema, table_name, column_name, data_type, semantic_type from information_schema.columns order by table_name"; + let sql = "select table_catalog, table_schema, table_name, column_name, data_type, semantic_type from information_schema.columns where table_name in ('columns', 'numbers', 'tables', 'another_table') order by table_name"; let output = execute_sql(&instance, sql).await; let expected = "\ diff --git a/tests/cases/standalone/common/show/show_databases_tables.result b/tests/cases/standalone/common/show/show_databases_tables.result index c4e4f402603e..d68f1d991612 100644 --- a/tests/cases/standalone/common/show/show_databases_tables.result +++ b/tests/cases/standalone/common/show/show_databases_tables.result @@ -16,10 +16,13 @@ Affected Rows: 0 show tables; -+---------+ -| Tables | -+---------+ -| columns | -| tables | -+---------+ ++-------------------+ +| Tables | ++-------------------+ +| column_privileges | +| column_statistics | +| columns | +| engines | +| tables | ++-------------------+ diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index 6a33a00baff3..e9bb4535e201 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -4,33 +4,53 @@ from information_schema.tables where table_name != 'scripts' order by table_schema, table_name; -+---------------+--------------------+------------+-----------------+----------+-------------+ -| table_catalog | table_schema | table_name | table_type | table_id | engine | -+---------------+--------------------+------------+-----------------+----------+-------------+ -| greptime | information_schema | columns | LOCAL TEMPORARY | 4 | | -| greptime | information_schema | tables | LOCAL TEMPORARY | 3 | | -| greptime | public | numbers | LOCAL TEMPORARY | 2 | test_engine | -+---------------+--------------------+------------+-----------------+----------+-------------+ ++---------------+--------------------+-------------------+-----------------+----------+-------------+ +| table_catalog | table_schema | table_name | table_type | table_id | engine | ++---------------+--------------------+-------------------+-----------------+----------+-------------+ +| greptime | information_schema | column_privileges | LOCAL TEMPORARY | 6 | | +| greptime | information_schema | column_statistics | LOCAL TEMPORARY | 7 | | +| greptime | information_schema | columns | LOCAL TEMPORARY | 4 | | +| greptime | information_schema | engines | LOCAL TEMPORARY | 5 | | +| greptime | information_schema | tables | LOCAL TEMPORARY | 3 | | +| greptime | public | numbers | LOCAL TEMPORARY | 2 | test_engine | ++---------------+--------------------+-------------------+-----------------+----------+-------------+ select * from information_schema.columns order by table_schema, table_name; -+---------------+--------------------+------------+---------------+-----------+---------------+ -| table_catalog | table_schema | table_name | column_name | data_type | semantic_type | -+---------------+--------------------+------------+---------------+-----------+---------------+ -| greptime | information_schema | columns | table_catalog | String | FIELD | -| greptime | information_schema | columns | table_schema | String | FIELD | -| greptime | information_schema | columns | table_name | String | FIELD | -| greptime | information_schema | columns | column_name | String | FIELD | -| greptime | information_schema | columns | data_type | String | FIELD | -| greptime | information_schema | columns | semantic_type | String | FIELD | -| greptime | information_schema | tables | table_catalog | String | FIELD | -| greptime | information_schema | tables | table_schema | String | FIELD | -| greptime | information_schema | tables | table_name | String | FIELD | -| greptime | information_schema | tables | table_type | String | FIELD | -| greptime | information_schema | tables | table_id | UInt32 | FIELD | -| greptime | information_schema | tables | engine | String | FIELD | -| greptime | public | numbers | number | UInt32 | TAG | -+---------------+--------------------+------------+---------------+-----------+---------------+ ++---------------+--------------------+-------------------+----------------+-----------+---------------+ +| table_catalog | table_schema | table_name | column_name | data_type | semantic_type | ++---------------+--------------------+-------------------+----------------+-----------+---------------+ +| greptime | information_schema | column_privileges | grantee | String | FIELD | +| greptime | information_schema | column_privileges | is_grantable | String | FIELD | +| greptime | information_schema | column_privileges | privilege_type | String | FIELD | +| greptime | information_schema | column_privileges | column_name | String | FIELD | +| greptime | information_schema | column_privileges | table_name | String | FIELD | +| greptime | information_schema | column_privileges | table_schema | String | FIELD | +| greptime | information_schema | column_privileges | table_catalog | String | FIELD | +| greptime | information_schema | column_statistics | histogram | String | FIELD | +| greptime | information_schema | column_statistics | column_name | String | FIELD | +| greptime | information_schema | column_statistics | table_name | String | FIELD | +| greptime | information_schema | column_statistics | schema_name | String | FIELD | +| greptime | information_schema | columns | table_catalog | String | FIELD | +| greptime | information_schema | columns | semantic_type | String | FIELD | +| greptime | information_schema | columns | data_type | String | FIELD | +| greptime | information_schema | columns | column_name | String | FIELD | +| greptime | information_schema | columns | table_name | String | FIELD | +| greptime | information_schema | columns | table_schema | String | FIELD | +| greptime | information_schema | engines | engine | String | FIELD | +| greptime | information_schema | engines | support | String | FIELD | +| greptime | information_schema | engines | comment | String | FIELD | +| greptime | information_schema | engines | transactions | String | FIELD | +| greptime | information_schema | engines | xa | String | FIELD | +| greptime | information_schema | engines | savepoints | String | FIELD | +| greptime | information_schema | tables | table_catalog | String | FIELD | +| greptime | information_schema | tables | engine | String | FIELD | +| greptime | information_schema | tables | table_id | UInt32 | FIELD | +| greptime | information_schema | tables | table_type | String | FIELD | +| greptime | information_schema | tables | table_name | String | FIELD | +| greptime | information_schema | tables | table_schema | String | FIELD | +| greptime | public | numbers | number | UInt32 | TAG | ++---------------+--------------------+-------------------+----------------+-----------+---------------+ create database my_db; @@ -93,3 +113,60 @@ drop schema my_db; Error: 1001(Unsupported), SQL statement is not supported: drop schema my_db;, keyword: schema +use information_schema; + +Affected Rows: 0 + +-- test engines +select * from engines; + ++--------+---------+-------------------------------------+--------------+----+------------+ +| engine | support | comment | transactions | xa | savepoints | ++--------+---------+-------------------------------------+--------------+----+------------+ +| mito | DEFAULT | Storage engine for time-series data | NO | NO | NO | ++--------+---------+-------------------------------------+--------------+----+------------+ + +-- tables not implemented +desc table COLUMN_PRIVILEGES; + ++----------------+--------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++----------------+--------+-----+------+---------+---------------+ +| grantee | String | | NO | | FIELD | +| table_catalog | String | | NO | | FIELD | +| table_schema | String | | NO | | FIELD | +| table_name | String | | NO | | FIELD | +| column_name | String | | NO | | FIELD | +| privilege_type | String | | NO | | FIELD | +| is_grantable | String | | NO | | FIELD | ++----------------+--------+-----+------+---------+---------------+ + +select * from COLUMN_PRIVILEGES; + ++---------+---------------+--------------+------------+-------------+----------------+--------------+ +| grantee | table_catalog | table_schema | table_name | column_name | privilege_type | is_grantable | ++---------+---------------+--------------+------------+-------------+----------------+--------------+ ++---------+---------------+--------------+------------+-------------+----------------+--------------+ + +desc table COLUMN_STATISTICS; + ++-------------+--------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++-------------+--------+-----+------+---------+---------------+ +| schema_name | String | | NO | | FIELD | +| table_name | String | | NO | | FIELD | +| column_name | String | | NO | | FIELD | +| histogram | String | | NO | | FIELD | ++-------------+--------+-----+------+---------+---------------+ + +select * from COLUMN_STATISTICS; + ++-------------+------------+-------------+-----------+ +| schema_name | table_name | column_name | histogram | ++-------------+------------+-------------+-----------+ ++-------------+------------+-------------+-----------+ + +use public; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/system/information_schema.sql b/tests/cases/standalone/common/system/information_schema.sql index 8c44edb4c885..f4ebc63303e7 100644 --- a/tests/cases/standalone/common/system/information_schema.sql +++ b/tests/cases/standalone/common/system/information_schema.sql @@ -38,3 +38,19 @@ order by table_schema, table_name; use public; drop schema my_db; + +use information_schema; + +-- test engines +select * from engines; + +-- tables not implemented +desc table COLUMN_PRIVILEGES; + +select * from COLUMN_PRIVILEGES; + +desc table COLUMN_STATISTICS; + +select * from COLUMN_STATISTICS; + +use public;