Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: alter database ttl #5035

Merged
merged 20 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ etcd-client = "0.13"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0b90ddc7eb2e99ce15d1d62c5d41f76a139c5c28" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "8a88448eb5b015fbf12e67c72c620c9d314f2375" }
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
hex = "0.4"
humantime = "2.1"
humantime-serde = "1.1"
Expand Down
3 changes: 2 additions & 1 deletion src/api/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,13 +527,14 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
match request.expr {
Some(Expr::CreateDatabase(_)) => "ddl.create_database",
Some(Expr::CreateTable(_)) => "ddl.create_table",
Some(Expr::Alter(_)) => "ddl.alter",
Some(Expr::AlterTable(_)) => "ddl.alter_table",
Some(Expr::DropTable(_)) => "ddl.drop_table",
Some(Expr::TruncateTable(_)) => "ddl.truncate_table",
Some(Expr::CreateFlow(_)) => "ddl.create_flow",
Some(Expr::DropFlow(_)) => "ddl.drop_flow",
Some(Expr::CreateView(_)) => "ddl.create_view",
Some(Expr::DropView(_)) => "ddl.drop_view",
Some(Expr::AlterDatabase(_)) => "ddl.alter_database",
None => "ddl.empty",
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl InformationSchemaSchemataBuilder {
.context(TableMetadataManagerSnafu)?
// information_schema is not available from this
// table_metadata_manager and we return None
.map(|schema_opts| format!("{schema_opts}"))
.map(|schema_opts| format!("{}", schema_opts.into_inner()))
} else {
None
};
Expand Down
6 changes: 3 additions & 3 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use api::v1::greptime_database_client::GreptimeDatabaseClient;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests,
AlterTableExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests,
QueryRequest, RequestHeader,
};
use arrow_flight::Ticket;
Expand Down Expand Up @@ -211,9 +211,9 @@ impl Database {
.await
}

pub async fn alter(&self, expr: AlterExpr) -> Result<Output> {
pub async fn alter(&self, expr: AlterTableExpr) -> Result<Output> {
self.do_get(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
expr: Some(DdlExpr::AlterTable(expr)),
}))
.await
}
Expand Down
18 changes: 9 additions & 9 deletions src/common/grpc-expr/src/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

use api::helper::ColumnDataTypeWrapper;
use api::v1::add_column_location::LocationType;
use api::v1::alter_expr::Kind;
use api::v1::alter_table_expr::Kind;
use api::v1::column_def::as_fulltext_option;
use api::v1::{
column_def, AddColumnLocation as Location, AlterExpr, Analyzer, CreateTableExpr, DropColumns,
ModifyColumnTypes, RenameTable, SemanticType,
column_def, AddColumnLocation as Location, AlterTableExpr, Analyzer, CreateTableExpr,
DropColumns, ModifyColumnTypes, RenameTable, SemanticType,
};
use common_query::AddColumnLocation;
use datatypes::schema::{ColumnSchema, FulltextOptions, RawSchema};
Expand All @@ -36,8 +36,8 @@ use crate::error::{
const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32;

/// Convert an [`AlterExpr`] to an [`AlterTableRequest`]
pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<AlterTableRequest> {
/// Convert an [`AlterTableExpr`] to an [`AlterTableRequest`]
pub fn alter_expr_to_request(table_id: TableId, expr: AlterTableExpr) -> Result<AlterTableRequest> {
let catalog_name = expr.catalog_name;
let schema_name = expr.schema_name;
let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?;
Expand Down Expand Up @@ -203,7 +203,7 @@ mod tests {

#[test]
fn test_alter_expr_to_request() {
let expr = AlterExpr {
let expr = AlterTableExpr {
catalog_name: String::default(),
schema_name: String::default(),
table_name: "monitor".to_string(),
Expand Down Expand Up @@ -244,7 +244,7 @@ mod tests {

#[test]
fn test_alter_expr_with_location_to_request() {
let expr = AlterExpr {
let expr = AlterTableExpr {
catalog_name: String::default(),
schema_name: String::default(),
table_name: "monitor".to_string(),
Expand Down Expand Up @@ -321,7 +321,7 @@ mod tests {

#[test]
fn test_modify_column_type_expr() {
let expr = AlterExpr {
let expr = AlterTableExpr {
catalog_name: "test_catalog".to_string(),
schema_name: "test_schema".to_string(),
table_name: "monitor".to_string(),
Expand Down Expand Up @@ -355,7 +355,7 @@ mod tests {

#[test]
fn test_drop_column_expr() {
let expr = AlterExpr {
let expr = AlterTableExpr {
catalog_name: "test_catalog".to_string(),
schema_name: "test_schema".to_string(),
table_name: "monitor".to_string(),
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
use crate::{ClusterId, DatanodeId};

pub mod alter_database;
pub mod alter_logical_tables;
pub mod alter_table;
pub mod create_database;
Expand Down
248 changes: 248 additions & 0 deletions src/common/meta/src/ddl/alter_database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::tracing::info;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use strum::AsRefStr;

use super::utils::handle_retry_error;
use crate::cache_invalidator::Context;
use crate::ddl::DdlContext;
use crate::error::{Result, SchemaNotFoundSnafu};
use crate::instruction::CacheIdent;
use crate::key::schema_name::{SchemaName, SchemaNameKey, SchemaNameValue};
use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock};
use crate::rpc::ddl::UnsetDatabaseOption::{self};
use crate::rpc::ddl::{AlterDatabaseKind, AlterDatabaseTask, SetDatabaseOption};
use crate::ClusterId;

pub struct AlterDatabaseProcedure {
pub context: DdlContext,
pub data: AlterDatabaseData,
}

fn build_new_schema_value(
mut value: SchemaNameValue,
alter_kind: &AlterDatabaseKind,
) -> Result<SchemaNameValue> {
match alter_kind {
AlterDatabaseKind::SetDatabaseOptions(options) => {
for option in options.0.iter() {
match option {
SetDatabaseOption::Ttl(ttl) => {
if ttl.is_zero() {
value.ttl = None;
} else {
value.ttl = Some(*ttl);
}
}
}
}
}
AlterDatabaseKind::UnsetDatabaseOptions(keys) => {
for key in keys.0.iter() {
match key {
UnsetDatabaseOption::Ttl => value.ttl = None,
}
}
}
}
Ok(value)
}

impl AlterDatabaseProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterDatabase";

pub fn new(
cluster_id: ClusterId,
task: AlterDatabaseTask,
context: DdlContext,
) -> Result<Self> {
Ok(Self {
context,
data: AlterDatabaseData::new(task, cluster_id)?,
})
}

pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;

Ok(Self { context, data })
}

pub async fn on_prepare(&mut self) -> Result<Status> {
let value = self
.context
.table_metadata_manager
.schema_manager()
.get(SchemaNameKey::new(self.data.catalog(), self.data.schema()))
.await?;

ensure!(
value.is_some(),
SchemaNotFoundSnafu {
table_schema: self.data.schema(),
}
);

self.data.schema_value = value;
self.data.state = AlterDatabaseState::UpdateMetadata;

Ok(Status::executing(true))
}

pub async fn on_update_metadata(&mut self) -> Result<Status> {
let schema_name = SchemaNameKey::new(self.data.catalog(), self.data.schema());

// Safety: schema_value is not None.
let current_schema_value = self.data.schema_value.as_ref().unwrap();

let new_schema_value = build_new_schema_value(
current_schema_value.get_inner_ref().clone(),
&self.data.kind,
)?;

self.context
.table_metadata_manager
.schema_manager()
.update(schema_name, current_schema_value, &new_schema_value)
.await?;

info!("Updated database metadata for schema {schema_name}");
self.data.state = AlterDatabaseState::InvalidateSchemaCache;
Ok(Status::executing(true))
}

pub async fn on_invalidate_schema_cache(&mut self) -> Result<Status> {
let cache_invalidator = &self.context.cache_invalidator;
cache_invalidator
.invalidate(
&Context::default(),
&[CacheIdent::SchemaName(SchemaName {
catalog_name: self.data.catalog().to_string(),
schema_name: self.data.schema().to_string(),
})],
)
.await?;

Ok(Status::done())
}
}

#[async_trait]
impl Procedure for AlterDatabaseProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
match self.data.state {
AlterDatabaseState::Prepare => self.on_prepare().await,
AlterDatabaseState::UpdateMetadata => self.on_update_metadata().await,
AlterDatabaseState::InvalidateSchemaCache => self.on_invalidate_schema_cache().await,
}
.map_err(handle_retry_error)
}

fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}

fn lock_key(&self) -> LockKey {
let catalog = self.data.catalog();
let schema = self.data.schema();

let lock_key = vec![
CatalogLock::Read(catalog).into(),
SchemaLock::write(catalog, schema).into(),
];

LockKey::new(lock_key)
}
}

#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum AlterDatabaseState {
Prepare,
UpdateMetadata,
InvalidateSchemaCache,
}

/// The data of alter database procedure.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterDatabaseData {
cluster_id: ClusterId,
state: AlterDatabaseState,
kind: AlterDatabaseKind,
catalog_name: String,
schema_name: String,
schema_value: Option<DeserializedValueWithBytes<SchemaNameValue>>,
}

impl AlterDatabaseData {
pub fn new(task: AlterDatabaseTask, cluster_id: ClusterId) -> Result<Self> {
Ok(Self {
cluster_id,
state: AlterDatabaseState::Prepare,
kind: AlterDatabaseKind::try_from(task.alter_expr.kind.unwrap())?,
catalog_name: task.alter_expr.catalog_name,
schema_name: task.alter_expr.schema_name,
schema_value: None,
})
}

pub fn catalog(&self) -> &str {
&self.catalog_name
}

pub fn schema(&self) -> &str {
&self.schema_name
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use crate::ddl::alter_database::build_new_schema_value;
use crate::key::schema_name::SchemaNameValue;
use crate::rpc::ddl::{
AlterDatabaseKind, SetDatabaseOption, SetDatabaseOptions, UnsetDatabaseOption,
UnsetDatabaseOptions,
};

#[test]
fn test_build_new_schema_value() {
let set_ttl = AlterDatabaseKind::SetDatabaseOptions(SetDatabaseOptions(vec![
SetDatabaseOption::Ttl(Duration::from_secs(10)),
]));
let current_schema_value = SchemaNameValue::default();
let new_schema_value =
build_new_schema_value(current_schema_value.clone(), &set_ttl).unwrap();
assert_eq!(new_schema_value.ttl, Some(Duration::from_secs(10)));

let unset_ttl_alter_kind =
AlterDatabaseKind::UnsetDatabaseOptions(UnsetDatabaseOptions(vec![
UnsetDatabaseOption::Ttl,
]));
let new_schema_value =
build_new_schema_value(current_schema_value, &unset_ttl_alter_kind).unwrap();
assert_eq!(new_schema_value.ttl, None);
}
}
Loading
Loading