Skip to content

Commit

Permalink
feat: adds some tables to information_schema (#2935)
Browse files Browse the repository at this point in the history
* feat: adds engines table to information_schema

* feat: adds COLUMN_PRIVILEGES and COLUMN_STATISTICS

* feat: refactor memory tables

* chore: rename memory_tables

* test: adds unit tests

* chore: format

* chore: style

* fix: by cr comments

* refactor: tables
  • Loading branch information
killme2008 authored Dec 18, 2023
1 parent 5dc7ce1 commit 262a79a
Show file tree
Hide file tree
Showing 16 changed files with 623 additions and 124 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ meta-client.workspace = true
moka = { workspace = true, features = ["future"] }
parking_lot = "0.12"
partition.workspace = true
paste = "1.0"
prometheus.workspace = true
regex.workspace = true
serde.workspace = true
Expand Down
104 changes: 85 additions & 19 deletions src/catalog/src/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,20 @@
// limitations under the License.

mod columns;
mod memory_table;
mod table_names;
mod tables;

use std::collections::HashMap;
use std::sync::{Arc, Weak};

use common_catalog::consts::INFORMATION_SCHEMA_NAME;
use common_catalog::consts::{self, INFORMATION_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use datatypes::schema::SchemaRef;
use futures_util::StreamExt;
use lazy_static::lazy_static;
use paste::paste;
use snafu::ResultExt;
use store_api::data_source::DataSource;
use store_api::storage::{ScanRequest, TableId};
Expand All @@ -32,43 +36,101 @@ use table::metadata::{
};
use table::thin_table::{ThinTable, ThinTableAdapter};
use table::TableRef;
pub use table_names::*;

use self::columns::InformationSchemaColumns;
use crate::error::Result;
use crate::information_schema::memory_table::{get_schema_columns, MemoryTable};
use crate::information_schema::tables::InformationSchemaTables;
use crate::CatalogManager;

pub const TABLES: &str = "tables";
pub const COLUMNS: &str = "columns";
lazy_static! {
// Memory tables in `information_schema`.
static ref MEMORY_TABLES: &'static [&'static str] = &[
ENGINES,
COLUMN_PRIVILEGES,
COLUMN_STATISTICS
];
}

macro_rules! setup_memory_table {
($name: expr) => {
paste! {
{
let (schema, columns) = get_schema_columns($name);
Some(Arc::new(MemoryTable::new(
consts::[<INFORMATION_SCHEMA_ $name _TABLE_ID>],
$name,
schema,
columns
)) as _)
}
}
};
}

/// The `information_schema` tables info provider.
pub struct InformationSchemaProvider {
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
tables: HashMap<String, TableRef>,
}

impl InformationSchemaProvider {
pub fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
let mut provider = Self {
catalog_name,
catalog_manager,
}
tables: HashMap::new(),
};

provider.build_tables();

provider
}

/// Build a map of [TableRef] in information schema.
/// Including `tables` and `columns`.
pub fn build(
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
) -> HashMap<String, TableRef> {
let provider = Self::new(catalog_name, catalog_manager);

let mut schema = HashMap::new();
schema.insert(TABLES.to_owned(), provider.table(TABLES).unwrap());
schema.insert(COLUMNS.to_owned(), provider.table(COLUMNS).unwrap());
schema
/// Returns table names in the order of table id.
pub fn table_names(&self) -> Vec<String> {
let mut tables = self.tables.values().clone().collect::<Vec<_>>();

tables.sort_by(|t1, t2| {
t1.table_info()
.table_id()
.partial_cmp(&t2.table_info().table_id())
.unwrap()
});
tables
.into_iter()
.map(|t| t.table_info().name.clone())
.collect()
}

/// Returns a map of [TableRef] in information schema.
pub fn tables(&self) -> &HashMap<String, TableRef> {
assert!(!self.tables.is_empty());

&self.tables
}

/// Returns the [TableRef] by table name.
pub fn table(&self, name: &str) -> Option<TableRef> {
self.tables.get(name).cloned()
}

fn build_tables(&mut self) {
let mut tables = HashMap::new();
tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
tables.insert(COLUMNS.to_string(), self.build_table(COLUMNS).unwrap());

// Add memory tables
for name in MEMORY_TABLES.iter() {
tables.insert((*name).to_string(), self.build_table(name).unwrap());
}

self.tables = tables;
}

fn build_table(&self, name: &str) -> Option<TableRef> {
self.information_table(name).map(|table| {
let table_info = Self::table_info(self.catalog_name.clone(), &table);
let filter_pushdown = FilterPushDownType::Unsupported;
Expand All @@ -89,6 +151,9 @@ impl InformationSchemaProvider {
self.catalog_name.clone(),
self.catalog_manager.clone(),
)) as _),
ENGINES => setup_memory_table!(ENGINES),
COLUMN_PRIVILEGES => setup_memory_table!(COLUMN_PRIVILEGES),
COLUMN_STATISTICS => setup_memory_table!(COLUMN_STATISTICS),
_ => None,
}
}
Expand All @@ -102,9 +167,9 @@ impl InformationSchemaProvider {
.unwrap();
let table_info = TableInfoBuilder::default()
.table_id(table.table_id())
.name(table.table_name().to_owned())
.name(table.table_name().to_string())
.catalog_name(catalog_name)
.schema_name(INFORMATION_SCHEMA_NAME.to_owned())
.schema_name(INFORMATION_SCHEMA_NAME.to_string())
.meta(table_meta)
.table_type(table.table_type())
.build()
Expand Down Expand Up @@ -176,6 +241,7 @@ impl DataSource for InformationTableDataSource {
stream: Box::pin(stream),
output_ordering: None,
};

Ok(Box::pin(stream))
}
}
68 changes: 29 additions & 39 deletions src/catalog/src/information_schema/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::{
INFORMATION_SCHEMA_COLUMNS_TABLE_ID, INFORMATION_SCHEMA_NAME, SEMANTIC_TYPE_FIELD,
SEMANTIC_TYPE_PRIMARY_KEY, SEMANTIC_TYPE_TIME_INDEX,
INFORMATION_SCHEMA_COLUMNS_TABLE_ID, SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY,
SEMANTIC_TYPE_TIME_INDEX,
};
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
Expand All @@ -33,8 +33,7 @@ use datatypes::vectors::{StringVectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;

use super::tables::InformationSchemaTables;
use super::{InformationTable, COLUMNS, TABLES};
use super::{InformationTable, COLUMNS};
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
Expand Down Expand Up @@ -102,7 +101,7 @@ impl InformationTable for InformationSchemaColumns {
schema,
futures::stream::once(async move {
builder
.make_tables()
.make_columns()
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
Expand Down Expand Up @@ -148,8 +147,8 @@ impl InformationSchemaColumnsBuilder {
}
}

/// Construct the `information_schema.tables` virtual table
async fn make_tables(&mut self) -> Result<RecordBatch> {
/// Construct the `information_schema.columns` virtual table
async fn make_columns(&mut self) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
Expand All @@ -163,48 +162,38 @@ impl InformationSchemaColumnsBuilder {
{
continue;
}

for table_name in catalog_manager
.table_names(&catalog_name, &schema_name)
.await?
{
let (keys, schema) = if let Some(table) = catalog_manager
if let Some(table) = catalog_manager
.table(&catalog_name, &schema_name, &table_name)
.await?
{
let keys = &table.table_info().meta.primary_key_indices;
let schema = table.schema();
(keys.clone(), schema)
} else {
// TODO: this specific branch is only a workaround for FrontendCatalogManager.
if schema_name == INFORMATION_SCHEMA_NAME {
if table_name == COLUMNS {
(vec![], InformationSchemaColumns::schema())
} else if table_name == TABLES {
(vec![], InformationSchemaTables::schema())

for (idx, column) in schema.column_schemas().iter().enumerate() {
let semantic_type = if column.is_time_index() {
SEMANTIC_TYPE_TIME_INDEX
} else if keys.contains(&idx) {
SEMANTIC_TYPE_PRIMARY_KEY
} else {
continue;
}
} else {
continue;
SEMANTIC_TYPE_FIELD
};

self.add_column(
&catalog_name,
&schema_name,
&table_name,
&column.name,
&column.data_type.name(),
semantic_type,
);
}
};

for (idx, column) in schema.column_schemas().iter().enumerate() {
let semantic_type = if column.is_time_index() {
SEMANTIC_TYPE_TIME_INDEX
} else if keys.contains(&idx) {
SEMANTIC_TYPE_PRIMARY_KEY
} else {
SEMANTIC_TYPE_FIELD
};
self.add_column(
&catalog_name,
&schema_name,
&table_name,
&column.name,
&column.data_type.name(),
semantic_type,
);
} else {
unreachable!();
}
}
}
Expand Down Expand Up @@ -238,6 +227,7 @@ impl InformationSchemaColumnsBuilder {
Arc::new(self.data_types.finish()),
Arc::new(self.semantic_types.finish()),
];

RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}
Expand All @@ -254,7 +244,7 @@ impl DfPartitionStream for InformationSchemaColumns {
schema,
futures::stream::once(async move {
builder
.make_tables()
.make_columns()
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
Expand Down
Loading

0 comments on commit 262a79a

Please sign in to comment.