Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(index): support SQL to specify inverted index columns #4929

Merged
merged 12 commits into from
Nov 11, 2024
40 changes: 30 additions & 10 deletions src/api/src/v1/column_def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;

use datatypes::schema::{
ColumnDefaultConstraint, ColumnSchema, FulltextOptions, COMMENT_KEY, FULLTEXT_KEY,
INVERTED_INDEX_KEY,
};
use snafu::ResultExt;

Expand All @@ -25,6 +26,8 @@ use crate::v1::{ColumnDef, ColumnOptions, SemanticType};

/// Key used to store fulltext options in gRPC column options.
const FULLTEXT_GRPC_KEY: &str = "fulltext";
/// Key used to store inverted index options in gRPC column options.
const INVERTED_INDEX_GRPC_KEY: &str = "inverted_index";
zhongzc marked this conversation as resolved.
Show resolved Hide resolved

/// Tries to construct a `ColumnSchema` from the given `ColumnDef`.
pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
Expand All @@ -49,10 +52,13 @@ pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
if !column_def.comment.is_empty() {
metadata.insert(COMMENT_KEY.to_string(), column_def.comment.clone());
}
if let Some(options) = column_def.options.as_ref()
&& let Some(fulltext) = options.options.get(FULLTEXT_GRPC_KEY)
{
metadata.insert(FULLTEXT_KEY.to_string(), fulltext.to_string());
if let Some(options) = column_def.options.as_ref() {
if let Some(fulltext) = options.options.get(FULLTEXT_GRPC_KEY) {
metadata.insert(FULLTEXT_KEY.to_string(), fulltext.clone());
}
if let Some(inverted_index) = options.options.get(INVERTED_INDEX_GRPC_KEY) {
metadata.insert(INVERTED_INDEX_KEY.to_string(), inverted_index.clone());
}
}

ColumnSchema::new(&column_def.name, data_type.into(), column_def.is_nullable)
Expand All @@ -70,7 +76,12 @@ pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<Column
if let Some(fulltext) = column_schema.metadata().get(FULLTEXT_KEY) {
options
.options
.insert(FULLTEXT_GRPC_KEY.to_string(), fulltext.to_string());
.insert(FULLTEXT_GRPC_KEY.to_string(), fulltext.clone());
}
if let Some(inverted_index) = column_schema.metadata().get(INVERTED_INDEX_KEY) {
options
.options
.insert(INVERTED_INDEX_GRPC_KEY.to_string(), inverted_index.clone());
}

(!options.options.is_empty()).then_some(options)
Expand Down Expand Up @@ -115,10 +126,13 @@ mod tests {
comment: "test_comment".to_string(),
datatype_extension: None,
options: Some(ColumnOptions {
options: HashMap::from([(
FULLTEXT_GRPC_KEY.to_string(),
"{\"enable\":true}".to_string(),
)]),
options: HashMap::from([
(
FULLTEXT_GRPC_KEY.to_string(),
"{\"enable\":true}".to_string(),
),
(INVERTED_INDEX_GRPC_KEY.to_string(), "true".to_string()),
]),
}),
};

Expand All @@ -139,6 +153,7 @@ mod tests {
..Default::default()
}
);
assert!(schema.is_inverted_indexed());
}

#[test]
Expand All @@ -153,12 +168,17 @@ mod tests {
analyzer: FulltextAnalyzer::English,
case_sensitive: false,
})
.unwrap();
.unwrap()
.set_inverted_index(true);
let options = options_from_column_schema(&schema).unwrap();
assert_eq!(
options.options.get(FULLTEXT_GRPC_KEY).unwrap(),
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false}"
);
assert_eq!(
options.options.get(INVERTED_INDEX_GRPC_KEY).unwrap(),
"true"
);
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion src/datatypes/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, R
use crate::prelude::DataType;
pub use crate::schema::column_schema::{
ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, COMMENT_KEY, FULLTEXT_KEY,
TIME_INDEX_KEY,
INVERTED_INDEX_KEY, TIME_INDEX_KEY,
};
pub use crate::schema::constraint::ColumnDefaultConstraint;
pub use crate::schema::raw::RawSchema;
Expand Down
21 changes: 20 additions & 1 deletion src/datatypes/src/schema/column_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ pub const COMMENT_KEY: &str = "greptime:storage:comment";
const DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint";
/// Key used to store fulltext options in arrow field's metadata.
pub const FULLTEXT_KEY: &str = "greptime:fulltext";

/// Key used to store whether the column has an inverted index in arrow field's metadata.
pub const INVERTED_INDEX_KEY: &str = "greptime:inverted_index";
/// Schema of a column, used as an immutable struct.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ColumnSchema {
Expand Down Expand Up @@ -134,6 +135,24 @@ impl ColumnSchema {
self
}

/// Records in the column metadata whether this column has an inverted index.
///
/// The boolean is stored in its string form under `INVERTED_INDEX_KEY`.
/// Takes `self` by value and returns it, so the call can be chained with
/// other builder-style setters.
pub fn set_inverted_index(mut self, value: bool) -> Self {
    let key = INVERTED_INDEX_KEY.to_string();
    let flag = value.to_string();
    // The value returned by `insert` is intentionally discarded.
    let _ = self.metadata.insert(key, flag);
    self
}

/// Returns `true` when the metadata entry under `INVERTED_INDEX_KEY` equals
/// `"true"` (ASCII case-insensitive).
///
/// A missing entry, or any other stored value, yields `false`.
pub fn is_inverted_indexed(&self) -> bool {
    self.metadata
        .get(INVERTED_INDEX_KEY)
        .is_some_and(|flag| flag.eq_ignore_ascii_case("true"))
}

/// Returns `true` if the column metadata contains an entry under
/// `INVERTED_INDEX_KEY`, regardless of the value stored there.
///
/// Unlike `is_inverted_indexed`, this only checks for the presence of the
/// key; it does not interpret the stored value.
pub fn has_inverted_index_key(&self) -> bool {
self.metadata.contains_key(INVERTED_INDEX_KEY)
}

/// Set default constraint.
///
/// If a default constraint exists for the column, this method will
Expand Down
25 changes: 8 additions & 17 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,29 +393,20 @@ impl ScanRegion {
.and_then(|c| c.index_cache())
.cloned();

// TODO(zhongzc): currently we only index tag columns, need to support field columns.
let ignore_column_ids = &self
.version
.options
.index_options
.inverted_index
.ignore_column_ids;
let indexed_column_ids = self
.version
.metadata
.primary_key
.iter()
.filter(|id| !ignore_column_ids.contains(id))
.copied()
.collect::<HashSet<_>>();

InvertedIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
self.access_layer.object_store().clone(),
file_cache,
index_cache,
self.version.metadata.as_ref(),
indexed_column_ids,
self.version.metadata.inverted_indexed_column_ids(
self.version
.options
.index_options
.inverted_index
.ignore_column_ids
.iter(),
),
self.access_layer.puffin_manager_factory().clone(),
)
.build(&self.request.filters)
Expand Down
20 changes: 3 additions & 17 deletions src/mito2/src/sst/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ pub(crate) mod puffin_manager;
mod statistics;
mod store;

use std::collections::HashSet;
use std::num::NonZeroUsize;

use common_telemetry::{debug, warn};
Expand Down Expand Up @@ -213,28 +212,15 @@ impl<'a> IndexerBuilder<'a> {
segment_row_count = row_group_size;
}

// TODO(zhongzc): currently we only index tag columns, need to support field columns.
let indexed_column_ids = self
.metadata
.primary_key
.iter()
.filter(|id| {
!self
.index_options
.inverted_index
.ignore_column_ids
.contains(id)
})
.copied()
.collect::<HashSet<_>>();

let indexer = InvertedIndexer::new(
self.file_id,
self.metadata,
self.intermediate_manager.clone(),
self.inverted_index_config.mem_threshold_on_create(),
segment_row_count,
indexed_column_ids,
self.metadata.inverted_indexed_column_ids(
self.index_options.inverted_index.ignore_column_ids.iter(),
),
);

Some(indexer)
Expand Down
84 changes: 63 additions & 21 deletions src/operator/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ use query::sql::{
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::{ColumnOption, TableConstraint};
use sql::ast::ColumnOption;
use sql::statements::alter::{AlterTable, AlterTableOperation};
use sql::statements::create::{
Column as SqlColumn, CreateExternalTable, CreateFlow, CreateTable, CreateView, TIME_INDEX,
Column as SqlColumn, CreateExternalTable, CreateFlow, CreateTable, CreateView, TableConstraint,
};
use sql::statements::{
column_to_schema, sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type,
Expand Down Expand Up @@ -130,8 +130,14 @@ pub(crate) async fn create_external_expr(
// expanded form
let time_index = find_time_index(&create.constraints)?;
let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
let column_schemas =
columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
let inverted_index_cols = find_inverted_index_cols(&create.columns, &create.constraints)?;
let column_schemas = columns_to_column_schemas(
&create.columns,
&time_index,
&inverted_index_cols,
&primary_keys,
Some(&query_ctx.timezone()),
)?;
(time_index, primary_keys, column_schemas)
} else {
// inferred form
Expand Down Expand Up @@ -186,6 +192,7 @@ pub fn create_to_expr(
);

let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
let inverted_index_cols = find_inverted_index_cols(&create.columns, &create.constraints)?;

let expr = CreateTableExpr {
catalog_name,
Expand All @@ -196,6 +203,7 @@ pub fn create_to_expr(
&create.columns,
&time_index,
&primary_keys,
&inverted_index_cols,
Some(&query_ctx.timezone()),
)?,
time_index,
Expand Down Expand Up @@ -304,9 +312,9 @@ fn find_primary_keys(
let constraints_pk = constraints
.iter()
.filter_map(|constraint| match constraint {
TableConstraint::PrimaryKey {
name: _, columns, ..
} => Some(columns.iter().map(|ident| ident.value.clone())),
TableConstraint::PrimaryKey { columns, .. } => {
Some(columns.iter().map(|ident| ident.value.clone()))
}
_ => None,
})
.flatten()
Expand All @@ -329,20 +337,9 @@ pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
let time_index = constraints
.iter()
.filter_map(|constraint| match constraint {
TableConstraint::Unique {
name: Some(name),
columns,
..
} => {
if name.value == TIME_INDEX {
Some(columns.iter().map(|ident| &ident.value))
} else {
None
}
}
TableConstraint::TimeIndex { column, .. } => Some(&column.value),
_ => None,
})
.flatten()
.collect::<Vec<&String>>();
ensure!(
time_index.len() == 1,
Expand All @@ -353,25 +350,70 @@ pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
Ok(time_index.first().unwrap().to_string())
}

/// Extracts the column names listed by an inverted index table constraint.
///
/// Only the first `TableConstraint::InvertedIndex` entry found in
/// `constraints` is used. Returns `Ok(None)` when there is no such
/// constraint, otherwise `Ok(Some(names))` with the names in declaration
/// order.
///
/// # Errors
///
/// Returns an invalid-SQL error if a listed name does not match the name of
/// any column in `columns`.
fn find_inverted_index_cols(
    columns: &[SqlColumn],
    constraints: &[TableConstraint],
) -> Result<Option<Vec<String>>> {
    let declared = constraints.iter().find_map(|constraint| match constraint {
        TableConstraint::InvertedIndex { columns: idents } => Some(idents),
        _ => None,
    });
    let Some(declared) = declared else {
        return Ok(None);
    };

    let names = declared
        .iter()
        .map(|ident| ident.value.clone())
        .collect::<Vec<_>>();

    // Report the first listed name that has no matching column definition.
    let missing = names
        .iter()
        .find(|name| columns.iter().all(|c| c.name().value != **name));
    if let Some(name) = missing {
        return InvalidSqlSnafu {
            err_msg: format!("inverted index column `{}` not found in column list", name),
        }
        .fail();
    }

    Ok(Some(names))
}

/// Builds the gRPC column definitions for the given SQL columns.
///
/// The SQL columns are first turned into `ColumnSchema`s via
/// `columns_to_column_schemas`, and those schemas are then converted with
/// `column_schemas_to_defs`.
///
/// * `column_defs` - SQL column definitions to convert.
/// * `time_index` - name of the time index column.
/// * `primary_keys` - names of the primary key columns.
/// * `inverted_index_cols` - inverted index column names, or `None` when
///   no inverted index constraint was given.
/// * `timezone` - optional timezone forwarded to the schema conversion.
///
/// # Errors
///
/// Propagates any error from either conversion step.
fn columns_to_expr(
    column_defs: &[SqlColumn],
    time_index: &str,
    primary_keys: &[String],
    inverted_index_cols: &Option<Vec<String>>,
    timezone: Option<&Timezone>,
) -> Result<Vec<api::v1::ColumnDef>> {
    // Note the argument order: `columns_to_column_schemas` takes the inverted
    // index columns *before* the primary keys, the reverse of this function.
    let column_schemas = columns_to_column_schemas(
        column_defs,
        time_index,
        inverted_index_cols,
        primary_keys,
        timezone,
    )?;
    column_schemas_to_defs(column_schemas, primary_keys)
}

/// Converts each SQL column definition into a `ColumnSchema`.
///
/// Every column is passed to `column_to_schema` together with the time index
/// name, the optional inverted index column list, the primary key names and
/// the optional timezone.
///
/// # Errors
///
/// Stops at the first column whose conversion fails and returns that error
/// wrapped with the `ParseSqlSnafu` context.
fn columns_to_column_schemas(
    columns: &[SqlColumn],
    time_index: &str,
    inverted_index_cols: &Option<Vec<String>>,
    primary_keys: &[String],
    timezone: Option<&Timezone>,
) -> Result<Vec<ColumnSchema>> {
    columns
        .iter()
        .map(|c| {
            column_to_schema(c, time_index, inverted_index_cols, primary_keys, timezone)
                .context(ParseSqlSnafu)
        })
        .collect::<Result<Vec<ColumnSchema>>>()
}
Expand Down
Loading