Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(index): support SQL to specify inverted index columns #4929

Merged
merged 12 commits into from
Nov 11, 2024
40 changes: 30 additions & 10 deletions src/api/src/v1/column_def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;

use datatypes::schema::{
ColumnDefaultConstraint, ColumnSchema, FulltextOptions, COMMENT_KEY, FULLTEXT_KEY,
INVERTED_INDEX_KEY,
};
use snafu::ResultExt;

Expand All @@ -25,6 +26,8 @@ use crate::v1::{ColumnDef, ColumnOptions, SemanticType};

/// Key used to store fulltext options in gRPC column options.
const FULLTEXT_GRPC_KEY: &str = "fulltext";
/// Key used to store inverted index options in gRPC column options.
const INVERTED_INDEX_GRPC_KEY: &str = "inverted_index";
zhongzc marked this conversation as resolved.
Show resolved Hide resolved

/// Tries to construct a `ColumnSchema` from the given `ColumnDef`.
pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
Expand All @@ -49,10 +52,13 @@ pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
if !column_def.comment.is_empty() {
metadata.insert(COMMENT_KEY.to_string(), column_def.comment.clone());
}
if let Some(options) = column_def.options.as_ref()
&& let Some(fulltext) = options.options.get(FULLTEXT_GRPC_KEY)
{
metadata.insert(FULLTEXT_KEY.to_string(), fulltext.to_string());
if let Some(options) = column_def.options.as_ref() {
if let Some(fulltext) = options.options.get(FULLTEXT_GRPC_KEY) {
metadata.insert(FULLTEXT_KEY.to_string(), fulltext.clone());
}
if let Some(inverted_index) = options.options.get(INVERTED_INDEX_GRPC_KEY) {
metadata.insert(INVERTED_INDEX_KEY.to_string(), inverted_index.clone());
}
}

ColumnSchema::new(&column_def.name, data_type.into(), column_def.is_nullable)
Expand All @@ -70,7 +76,12 @@ pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<Column
if let Some(fulltext) = column_schema.metadata().get(FULLTEXT_KEY) {
options
.options
.insert(FULLTEXT_GRPC_KEY.to_string(), fulltext.to_string());
.insert(FULLTEXT_GRPC_KEY.to_string(), fulltext.clone());
}
if let Some(inverted_index) = column_schema.metadata().get(INVERTED_INDEX_KEY) {
options
.options
.insert(INVERTED_INDEX_GRPC_KEY.to_string(), inverted_index.clone());
}

(!options.options.is_empty()).then_some(options)
Expand Down Expand Up @@ -115,10 +126,13 @@ mod tests {
comment: "test_comment".to_string(),
datatype_extension: None,
options: Some(ColumnOptions {
options: HashMap::from([(
FULLTEXT_GRPC_KEY.to_string(),
"{\"enable\":true}".to_string(),
)]),
options: HashMap::from([
(
FULLTEXT_GRPC_KEY.to_string(),
"{\"enable\":true}".to_string(),
),
(INVERTED_INDEX_GRPC_KEY.to_string(), "true".to_string()),
]),
}),
};

Expand All @@ -139,6 +153,7 @@ mod tests {
..Default::default()
}
);
assert!(schema.is_inverted_indexed());
}

#[test]
Expand All @@ -153,12 +168,17 @@ mod tests {
analyzer: FulltextAnalyzer::English,
case_sensitive: false,
})
.unwrap();
.unwrap()
.set_inverted_index(true);
let options = options_from_column_schema(&schema).unwrap();
assert_eq!(
options.options.get(FULLTEXT_GRPC_KEY).unwrap(),
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false}"
);
assert_eq!(
options.options.get(INVERTED_INDEX_GRPC_KEY).unwrap(),
"true"
);
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion src/datatypes/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, R
use crate::prelude::DataType;
pub use crate::schema::column_schema::{
ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, COMMENT_KEY, FULLTEXT_KEY,
TIME_INDEX_KEY,
INVERTED_INDEX_KEY, TIME_INDEX_KEY,
};
pub use crate::schema::constraint::ColumnDefaultConstraint;
pub use crate::schema::raw::RawSchema;
Expand Down
21 changes: 20 additions & 1 deletion src/datatypes/src/schema/column_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ pub const COMMENT_KEY: &str = "greptime:storage:comment";
const DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint";
/// Key used to store fulltext options in arrow field's metadata.
pub const FULLTEXT_KEY: &str = "greptime:fulltext";

/// Key used to store whether the column has an inverted index in the arrow field's metadata.
pub const INVERTED_INDEX_KEY: &str = "greptime:inverted_index";
/// Schema of a column, used as an immutable struct.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ColumnSchema {
Expand Down Expand Up @@ -134,6 +135,24 @@ impl ColumnSchema {
self
}

/// Records the inverted index flag for this column and returns the updated schema.
///
/// The flag is written into the column metadata under [`INVERTED_INDEX_KEY`]
/// as the string form of `value` (`"true"` or `"false"`), replacing any
/// previous entry for that key.
pub fn set_inverted_index(mut self, value: bool) -> Self {
    let key = INVERTED_INDEX_KEY.to_string();
    let flag = value.to_string();
    // Any previously stored flag is intentionally discarded.
    let _ = self.metadata.insert(key, flag);
    self
}

pub fn is_inverted_indexed(&self) -> bool {
self.metadata
.get(INVERTED_INDEX_KEY)
.map(|v| v == "true")
zhongzc marked this conversation as resolved.
Show resolved Hide resolved
.unwrap_or(false)
}

/// Returns whether the column metadata contains an entry for
/// [`INVERTED_INDEX_KEY`], regardless of the value stored under it.
pub fn has_inverted_index_key(&self) -> bool {
    let metadata = &self.metadata;
    metadata.contains_key(INVERTED_INDEX_KEY)
}

/// Set default constraint.
///
/// If a default constraint exists for the column, this method will
Expand Down
59 changes: 41 additions & 18 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion_expr::utils::expr_to_columns;
use parquet::arrow::arrow_reader::RowSelection;
use smallvec::SmallVec;
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{ScanRequest, TimeSeriesRowSelector};
use store_api::storage::{ColumnId, ScanRequest, TimeSeriesRowSelector};
use table::predicate::{build_time_range_predicate, Predicate};
use tokio::sync::{mpsc, Mutex, Semaphore};
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -393,29 +393,13 @@ impl ScanRegion {
.and_then(|c| c.index_cache())
.cloned();

// TODO(zhongzc): currently we only index tag columns, need to support field columns.
let ignore_column_ids = &self
.version
.options
.index_options
.inverted_index
.ignore_column_ids;
let indexed_column_ids = self
.version
.metadata
.primary_key
.iter()
.filter(|id| !ignore_column_ids.contains(id))
.copied()
.collect::<HashSet<_>>();

InvertedIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
self.access_layer.object_store().clone(),
file_cache,
index_cache,
self.version.metadata.as_ref(),
indexed_column_ids,
self.build_inverted_indexed_column_ids(),
self.access_layer.puffin_manager_factory().clone(),
)
.build(&self.request.filters)
Expand Down Expand Up @@ -443,6 +427,45 @@ impl ScanRegion {
.flatten()
.map(Arc::new)
}

/// Collects the ids of the columns that the inverted index applies to.
///
/// If at least one column carries the inverted index key in its schema, only
/// the columns flagged as inverted indexed are selected. Otherwise the
/// primary key columns are used as the default. In both cases, ids listed in
/// the region's `ignore_column_ids` option are removed from the result.
fn build_inverted_indexed_column_ids(&self) -> HashSet<ColumnId> {
    let metadata = &self.version.metadata;
    let ignored_ids = &self
        .version
        .options
        .index_options
        .inverted_index
        .ignore_column_ids;

    // Whether any column states its inverted index setting explicitly.
    let explicitly_configured = metadata
        .column_metadatas
        .iter()
        .any(|col| col.column_schema.has_inverted_index_key());

    let mut column_ids: HashSet<ColumnId> = if explicitly_configured {
        metadata
            .column_metadatas
            .iter()
            .filter(|col| col.column_schema.is_inverted_indexed())
            .map(|col| col.column_id)
            .collect()
    } else {
        // No explicit setting anywhere: fall back to the primary key columns.
        metadata
            .primary_key_columns()
            .map(|col| col.column_id)
            .collect()
    };

    for id in ignored_ids {
        column_ids.remove(id);
    }

    column_ids
}
}

/// Config for parallel scan.
Expand Down
46 changes: 30 additions & 16 deletions src/mito2/src/sst/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,28 +213,13 @@ impl<'a> IndexerBuilder<'a> {
segment_row_count = row_group_size;
}

// TODO(zhongzc): currently we only index tag columns, need to support field columns.
let indexed_column_ids = self
.metadata
.primary_key
.iter()
.filter(|id| {
!self
.index_options
.inverted_index
.ignore_column_ids
.contains(id)
})
.copied()
.collect::<HashSet<_>>();

let indexer = InvertedIndexer::new(
self.file_id,
self.metadata,
self.intermediate_manager.clone(),
self.inverted_index_config.mem_threshold_on_create(),
segment_row_count,
indexed_column_ids,
self.inverted_indexed_column_ids(),
);

Some(indexer)
Expand Down Expand Up @@ -292,6 +277,35 @@ impl<'a> IndexerBuilder<'a> {

None
}

/// Collects the ids of the columns that an inverted index should be built for.
///
/// If at least one column carries the inverted index key in its schema, only
/// the columns flagged as inverted indexed are selected. Otherwise the
/// primary key columns are used as the default. In both cases, ids listed in
/// `index_options.inverted_index.ignore_column_ids` are removed from the result.
fn inverted_indexed_column_ids(&self) -> HashSet<ColumnId> {
    let metadata = &self.metadata;

    // Whether any column states its inverted index setting explicitly.
    let explicitly_configured = metadata
        .column_metadatas
        .iter()
        .any(|col| col.column_schema.has_inverted_index_key());

    let mut column_ids: HashSet<ColumnId> = if explicitly_configured {
        metadata
            .column_metadatas
            .iter()
            .filter(|col| col.column_schema.is_inverted_indexed())
            .map(|col| col.column_id)
            .collect()
    } else {
        // No explicit setting anywhere: fall back to the primary key columns.
        metadata
            .primary_key_columns()
            .map(|col| col.column_id)
            .collect()
    };

    let ignored_ids = &self.index_options.inverted_index.ignore_column_ids;
    for id in ignored_ids {
        column_ids.remove(id);
    }

    column_ids
}
}

#[cfg(test)]
Expand Down
Loading