Skip to content

Commit

Permalink
feat: update physical table schema on alter logical tables (#3585)
Browse files Browse the repository at this point in the history
* feat: update physical table schema on alter

* feat: alter logical table in sql path

* feat: invalidate cache step1

* feat: invalidate cache step2

* feat: invalidate cache step3

* feat: invalidate cache step4

* fix: failed ut

* fix: standalone cache invalidator

* feat: log the count of already finished

* feat: re-invalidate cache

* chore: by comment

* chore: Update src/common/meta/src/ddl/create_logical_tables.rs

---------

Co-authored-by: Yingwen <[email protected]>
  • Loading branch information
fengjiachun and evenyag authored Mar 26, 2024
1 parent dd18d8c commit 58c7858
Show file tree
Hide file tree
Showing 20 changed files with 548 additions and 159 deletions.
12 changes: 7 additions & 5 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use catalog::kvbackend::CachedMetaKvBackendBuilder;
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager};
use clap::Parser;
use client::client_manager::DatanodeClients;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
Expand Down Expand Up @@ -248,11 +248,12 @@ impl StartCommand {
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);

let catalog_manager =
KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend.clone());

let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(
cached_meta_backend.clone(),
)),
Arc::new(InvalidateTableCacheHandler::new(catalog_manager.clone())),
]);

let heartbeat_task = HeartbeatTask::new(
Expand All @@ -263,10 +264,11 @@ impl StartCommand {

let mut instance = FrontendBuilder::new(
cached_meta_backend.clone(),
catalog_manager.clone(),
Arc::new(DatanodeClients::default()),
meta_client,
)
.with_cache_invalidator(cached_meta_backend)
.with_cache_invalidator(catalog_manager.clone())
.with_plugin(plugins.clone())
.with_heartbeat_task(heartbeat_task)
.try_build()
Expand Down
26 changes: 19 additions & 7 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ use std::sync::Arc;
use std::{fs, path};

use async_trait::async_trait;
use catalog::kvbackend::KvBackendCatalogManager;
use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::ProcedureExecutorRef;
Expand Down Expand Up @@ -399,6 +400,9 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;

let catalog_manager =
KvBackendCatalogManager::new(kv_backend.clone(), Arc::new(DummyCacheInvalidator));

let builder =
DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone());
let datanode = builder.build().await.context(StartDatanodeSnafu)?;
Expand Down Expand Up @@ -429,15 +433,22 @@ impl StartCommand {
table_metadata_manager,
procedure_manager.clone(),
datanode_manager.clone(),
catalog_manager.clone(),
table_meta_allocator,
)
.await?;

let mut frontend = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor)
.with_plugin(fe_plugins.clone())
.try_build()
.await
.context(StartFrontendSnafu)?;
let mut frontend = FrontendBuilder::new(
kv_backend,
catalog_manager.clone(),
datanode_manager,
ddl_task_executor,
)
.with_plugin(fe_plugins.clone())
.with_cache_invalidator(catalog_manager)
.try_build()
.await
.context(StartFrontendSnafu)?;

let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins)
.build()
Expand All @@ -459,13 +470,14 @@ impl StartCommand {
table_metadata_manager: TableMetadataManagerRef,
procedure_manager: ProcedureManagerRef,
datanode_manager: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_meta_allocator: TableMetadataAllocatorRef,
) -> Result<ProcedureExecutorRef> {
let procedure_executor: ProcedureExecutorRef = Arc::new(
DdlManager::try_new(
procedure_manager,
datanode_manager,
Arc::new(DummyCacheInvalidator),
cache_invalidator,
table_metadata_manager,
table_meta_allocator,
Arc::new(MemoryRegionKeeper::default()),
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub mod create_table;
mod create_table_template;
pub mod drop_database;
pub mod drop_table;
mod physical_table_metadata;
pub mod table_meta;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
Expand Down
116 changes: 86 additions & 30 deletions src/common/meta/src/ddl/alter_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,29 @@
mod check;
mod metadata;
mod region_request;
mod table_cache_keys;
mod update_metadata;

use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context, LockKey, Procedure, Status};
use common_telemetry::{info, warn};
use futures_util::future;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use snafu::{ensure, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use strum::AsRefStr;
use table::metadata::TableId;

use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::DdlContext;
use crate::error::{Error, Result};
use crate::instruction::CacheIdent;
use crate::ddl::{physical_table_metadata, DdlContext};
use crate::error::{DecodeJsonSnafu, Error, MetadataCorruptionSnafu, Result};
use crate::key::table_info::TableInfoValue;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders};
use crate::{cache_invalidator, metrics, ClusterId};
Expand All @@ -60,8 +64,9 @@ impl AlterLogicalTablesProcedure {
tasks,
table_info_values: vec![],
physical_table_id,
physical_table_info: None,
physical_table_route: None,
cache_invalidate_keys: vec![],
physical_columns: vec![],
},
}
}
Expand All @@ -79,11 +84,24 @@ impl AlterLogicalTablesProcedure {
// Checks the physical table, must after [fill_table_info_values]
self.check_physical_table().await?;
// Fills the physical table info
self.fill_physical_table_route().await?;
// Filter the tasks
self.fill_physical_table_info().await?;
// Filter the finished tasks
let finished_tasks = self.check_finished_tasks()?;
if finished_tasks.iter().all(|x| *x) {
return Ok(Status::done());
let already_finished_count = finished_tasks
.iter()
.map(|x| if *x { 1 } else { 0 })
.sum::<usize>();
let apply_tasks_count = self.data.tasks.len();
if already_finished_count == apply_tasks_count {
info!("All the alter tasks are finished, will skip the procedure.");
// Re-invalidate the table cache
self.data.state = AlterTablesState::InvalidateTableCache;
return Ok(Status::executing(true));
} else if already_finished_count > 0 {
info!(
"There are {} alter tasks, {} of them were already finished.",
apply_tasks_count, already_finished_count
);
}
self.filter_task(&finished_tasks)?;

Expand Down Expand Up @@ -116,17 +134,61 @@ impl AlterLogicalTablesProcedure {
}
}

future::join_all(alter_region_tasks)
// Collects responses from all the alter region tasks.
let phy_raw_schemas = future::join_all(alter_region_tasks)
.await
.into_iter()
.map(|res| res.map(|mut res| res.extension.remove(ALTER_PHYSICAL_EXTENSION_KEY)))
.collect::<Result<Vec<_>>>()?;

self.data.state = AlterTablesState::UpdateMetadata;
if phy_raw_schemas.is_empty() {
self.data.state = AlterTablesState::UpdateMetadata;
return Ok(Status::executing(true));
}

// Verify all the physical schemas are the same
// Safety: previous check ensures this vec is not empty
let first = phy_raw_schemas.first().unwrap();
ensure!(
phy_raw_schemas.iter().all(|x| x == first),
MetadataCorruptionSnafu {
err_msg: "The physical schemas from datanodes are not the same."
}
);

// Decodes the physical raw schemas
if let Some(phy_raw_schema) = first {
self.data.physical_columns =
ColumnMetadata::decode_list(phy_raw_schema).context(DecodeJsonSnafu)?;
} else {
warn!("altering logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
}

self.data.state = AlterTablesState::UpdateMetadata;
Ok(Status::executing(true))
}

pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
if !self.data.physical_columns.is_empty() {
let physical_table_info = self.data.physical_table_info.as_ref().unwrap();

// Generates new table info
let old_raw_table_info = physical_table_info.table_info.clone();
let new_raw_table_info = physical_table_metadata::build_new_physical_table_info(
old_raw_table_info,
&self.data.physical_columns,
);

// Updates physical table's metadata
self.context
.table_metadata_manager
.update_table_info(
DeserializedValueWithBytes::from_inner(physical_table_info.clone()),
new_raw_table_info,
)
.await?;
}

let table_info_values = self.build_update_metadata()?;
let manager = &self.context.table_metadata_manager;
let chunk_size = manager.batch_update_table_info_value_chunk_size();
Expand All @@ -151,15 +213,12 @@ impl AlterLogicalTablesProcedure {
}

pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> {
let to_invalidate = self
.data
.cache_invalidate_keys
.drain(..)
.map(CacheIdent::TableId)
.collect::<Vec<_>>();
let ctx = cache_invalidator::Context::default();
let to_invalidate = self.build_table_cache_keys_to_invalidate();

self.context
.cache_invalidator
.invalidate(&cache_invalidator::Context::default(), to_invalidate)
.invalidate(&ctx, to_invalidate)
.await?;
Ok(Status::done())
}
Expand Down Expand Up @@ -212,17 +271,13 @@ impl Procedure for AlterLogicalTablesProcedure {
lock_key.push(CatalogLock::Read(table_ref.catalog).into());
lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
lock_key.push(TableLock::Write(self.data.physical_table_id).into());
lock_key.extend(
self.data
.table_info_values
.iter()
.map(|table| TableLock::Write(table.table_info.ident.table_id).into()),
);

for task in &self.data.tasks {
lock_key.push(
TableNameLock::new(
&task.alter_table.catalog_name,
&task.alter_table.schema_name,
&task.alter_table.table_name,
)
.into(),
);
}
LockKey::new(lock_key)
}
}
Expand All @@ -237,8 +292,9 @@ pub struct AlterTablesData {
table_info_values: Vec<TableInfoValue>,
/// Physical table info
physical_table_id: TableId,
physical_table_info: Option<TableInfoValue>,
physical_table_route: Option<PhysicalTableRouteValue>,
cache_invalidate_keys: Vec<TableId>,
physical_columns: Vec<ColumnMetadata>,
}

#[derive(Debug, Serialize, Deserialize, AsRefStr)]
Expand Down
45 changes: 33 additions & 12 deletions src/common/meta/src/ddl/alter_logical_tables/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ use snafu::OptionExt;
use table::metadata::TableId;

use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::error::{Result, TableInfoNotFoundSnafu, TableNotFoundSnafu};
use crate::error::{
AlterLogicalTablesInvalidArgumentsSnafu, Result, TableInfoNotFoundSnafu, TableNotFoundSnafu,
TableRouteNotFoundSnafu,
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::rpc::ddl::AlterTableTask;

impl AlterLogicalTablesProcedure {
Expand All @@ -46,21 +50,38 @@ impl AlterLogicalTablesProcedure {
}
})
.collect();
self.data.cache_invalidate_keys = self
.data
.table_info_values
.iter()
.map(|table| table.table_info.ident.table_id)
.collect();

Ok(())
}

pub(crate) async fn fill_physical_table_route(&mut self) -> Result<()> {
let table_route_manager = self.context.table_metadata_manager.table_route_manager();
let (_, physical_table_route) = table_route_manager
.get_physical_table_route(self.data.physical_table_id)
pub(crate) async fn fill_physical_table_info(&mut self) -> Result<()> {
let (physical_table_info, physical_table_route) = self
.context
.table_metadata_manager
.get_full_table_info(self.data.physical_table_id)
.await?;

let physical_table_info = physical_table_info
.with_context(|| TableInfoNotFoundSnafu {
table: format!("table id - {}", self.data.physical_table_id),
})?
.into_inner();
let physical_table_route = physical_table_route
.context(TableRouteNotFoundSnafu {
table_id: self.data.physical_table_id,
})?
.into_inner();

self.data.physical_table_info = Some(physical_table_info);
let TableRouteValue::Physical(physical_table_route) = physical_table_route else {
return AlterLogicalTablesInvalidArgumentsSnafu {
err_msg: format!(
"expected a physical table but got a logical table: {:?}",
self.data.physical_table_id
),
}
.fail();
};
self.data.physical_table_route = Some(physical_table_route);

Ok(())
Expand All @@ -87,7 +108,7 @@ impl AlterLogicalTablesProcedure {
table_info_map
.remove(table_id)
.with_context(|| TableInfoNotFoundSnafu {
table_name: extract_table_name(task),
table: extract_table_name(task),
})?;
table_info_values.push(table_info_value);
}
Expand Down
Loading

0 comments on commit 58c7858

Please sign in to comment.