diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index a918e2ea3c56..2c5028b40908 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -19,7 +19,6 @@ use std::sync::{Arc, Weak}; use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID}; use common_error::ext::BoxedError; use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context}; -use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::error::Result as MetaResult; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; @@ -55,7 +54,6 @@ pub struct KvBackendCatalogManager { cache_invalidator: CacheInvalidatorRef, partition_manager: PartitionRuleManagerRef, table_metadata_manager: TableMetadataManagerRef, - datanode_manager: DatanodeManagerRef, /// A sub-CatalogManager that handles system tables system_catalog: SystemCatalog, } @@ -76,16 +74,11 @@ impl CacheInvalidator for KvBackendCatalogManager { } impl KvBackendCatalogManager { - pub fn new( - backend: KvBackendRef, - cache_invalidator: CacheInvalidatorRef, - datanode_manager: DatanodeManagerRef, - ) -> Arc { + pub fn new(backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef) -> Arc { Arc::new_cyclic(|me| Self { partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())), table_metadata_manager: Arc::new(TableMetadataManager::new(backend)), cache_invalidator, - datanode_manager, system_catalog: SystemCatalog { catalog_manager: me.clone(), }, @@ -99,10 +92,6 @@ impl KvBackendCatalogManager { pub fn table_metadata_manager_ref(&self) -> &TableMetadataManagerRef { &self.table_metadata_manager } - - pub fn datanode_manager(&self) -> DatanodeManagerRef { - self.datanode_manager.clone() - } } #[async_trait::async_trait] diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index e99a5403fb5e..ee15fd186051 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use std::time::Instant; use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager}; -use client::client_manager::DatanodeClients; use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_base::Plugins; use common_error::ext::ErrorExt; @@ -250,13 +249,8 @@ async fn create_query_engine(meta_addr: &str) -> Result { let cached_meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone())); - let datanode_clients = Arc::new(DatanodeClients::default()); - - let catalog_list = KvBackendCatalogManager::new( - cached_meta_backend.clone(), - cached_meta_backend.clone(), - datanode_clients, - ); + let catalog_list = + KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend); let plugins: Plugins = Default::default(); let state = Arc::new(QueryEngineState::new( catalog_list, diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 30eeff465c13..ae84282b1c01 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -37,6 +37,12 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Failed to init DDL manager"))] + InitDdlManager { + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to start procedure manager"))] StartProcedureManager { location: Location, @@ -225,13 +231,6 @@ pub enum Error { #[snafu(source)] error: std::io::Error, }, - - #[snafu(display("Failed to parse address {}", addr))] - ParseAddr { - addr: String, - #[snafu(source)] - error: std::net::AddrParseError, - }, } pub type Result = std::result::Result; @@ -247,9 +246,11 @@ impl ErrorExt for Error { Error::ShutdownMetaServer { source, .. } => source.status_code(), Error::BuildMetaServer { source, .. } => source.status_code(), Error::UnsupportedSelectorType { source, .. } => source.status_code(), - Error::IterStream { source, .. } | Error::InitMetadata { source, .. } => { - source.status_code() - } + + Error::IterStream { source, .. } + | Error::InitMetadata { source, .. } + | Error::InitDdlManager { source, .. } => source.status_code(), + Error::ConnectServer { source, .. } => source.status_code(), Error::MissingConfig { .. } | Error::LoadLayeredConfig { .. } @@ -259,8 +260,7 @@ impl ErrorExt for Error { | Error::NotDataFromOutput { .. } | Error::CreateDir { .. } | Error::EmptyResult { .. } - | Error::InvalidDatabaseName { .. } - | Error::ParseAddr { .. } => StatusCode::InvalidArguments, + | Error::InvalidDatabaseName { .. } => StatusCode::InvalidArguments, Error::StartProcedureManager { source, .. } | Error::StopProcedureManager { source, .. } => source.status_code(), diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 6adfd4451062..3362785da822 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -12,18 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; use std::time::Duration; +use catalog::kvbackend::CachedMetaKvBackend; use clap::Parser; +use client::client_manager::DatanodeClients; +use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; +use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_telemetry::logging; use frontend::frontend::FrontendOptions; +use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; +use frontend::heartbeat::HeartbeatTask; +use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance as FeInstance}; use meta_client::MetaClientOptions; use servers::tls::{TlsMode, TlsOption}; use servers::Mode; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; -use crate::error::{self, Result, StartFrontendSnafu}; +use crate::error::{self, MissingConfigSnafu, Result, StartFrontendSnafu}; use crate::options::{Options, TopLevelOptions}; pub struct Instance { @@ -196,10 +204,38 @@ impl StartCommand { logging::info!("Frontend start command: {:#?}", self); logging::info!("Frontend options: {:#?}", opts); - let mut instance = FeInstance::try_new_distributed(&opts, plugins.clone()) + let meta_client_options = opts.meta_client.as_ref().context(MissingConfigSnafu { + msg: "'meta_client'", + })?; + let meta_client = FeInstance::create_meta_client(meta_client_options) .await .context(StartFrontendSnafu)?; + let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone())); + + let executor = HandlerGroupExecutor::new(vec![ + Arc::new(ParseMailboxMessageHandler), + Arc::new(InvalidateTableCacheHandler::new(meta_backend.clone())), + ]); + + let heartbeat_task = HeartbeatTask::new( + meta_client.clone(), + opts.heartbeat.clone(), + Arc::new(executor), + ); + + let mut instance = FrontendBuilder::new( + meta_backend.clone(), + Arc::new(DatanodeClients::default()), + meta_client, + ) + .with_cache_invalidator(meta_backend) + .with_plugin(plugins) + .with_heartbeat_task(heartbeat_task) + .try_build() + .await + .context(StartFrontendSnafu)?; + instance .build_servers(opts) .await diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 1de1e60099d7..78b99bc0ac53 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -173,7 +173,12 @@ impl StartCommand { logging::info!("MetaSrv start command: {:#?}", self); logging::info!("MetaSrv options: {:#?}", opts); - let instance = MetaSrvInstance::new(opts, plugins) + let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone(), None) + .await + .context(error::BuildMetaServerSnafu)?; + let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?; + + let instance = MetaSrvInstance::new(opts, plugins, metasrv) .await .context(error::BuildMetaServerSnafu)?; diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 8e7a56855125..30cb33234866 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -15,21 +15,23 @@ use std::sync::Arc; use std::{fs, path}; -use catalog::kvbackend::KvBackendCatalogManager; -use catalog::CatalogManagerRef; use clap::Parser; -use common_base::Plugins; use common_config::{metadata_store_dir, KvBackendConfig, WalConfig}; -use common_meta::cache_invalidator::DummyKvCacheInvalidator; +use common_meta::cache_invalidator::DummyCacheInvalidator; +use common_meta::datanode_manager::DatanodeManagerRef; +use common_meta::ddl::DdlTaskExecutorRef; +use common_meta::ddl_manager::DdlManager; +use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_procedure::ProcedureManagerRef; use common_telemetry::info; use common_telemetry::logging::LoggingOptions; use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig}; use datanode::datanode::{Datanode, DatanodeBuilder}; -use datanode::region_server::RegionServer; use file_engine::config::EngineConfig as FileEngineConfig; use frontend::frontend::FrontendOptions; +use frontend::instance::builder::FrontendBuilder; +use frontend::instance::standalone::StandaloneTableMetadataCreator; use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager}; use frontend::service_config::{ GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions, @@ -42,9 +44,9 @@ use servers::Mode; use snafu::ResultExt; use crate::error::{ - CreateDirSnafu, IllegalConfigSnafu, InitMetadataSnafu, Result, ShutdownDatanodeSnafu, - ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, - StopProcedureManagerSnafu, + CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, Result, + ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, + StartProcedureManagerSnafu, StopProcedureManagerSnafu, }; use crate::options::{MixOptions, Options, TopLevelOptions}; @@ -156,6 +158,7 @@ impl StandaloneOptions { wal: self.wal, storage: self.storage, region_engine: self.region_engine, + rpc_addr: self.grpc.addr, ..Default::default() } } @@ -347,36 +350,25 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; - let datanode = DatanodeBuilder::new(dn_opts, fe_plugins.clone()) - .with_kv_backend(kv_backend.clone()) - .build() - .await - .context(StartDatanodeSnafu)?; + let builder = + DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone()); + let datanode = builder.build().await.context(StartDatanodeSnafu)?; - let region_server = datanode.region_server(); + let datanode_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server())); - let catalog_manager = KvBackendCatalogManager::new( + let ddl_task_executor = Self::create_ddl_task_executor( kv_backend.clone(), - Arc::new(DummyKvCacheInvalidator), - Arc::new(StandaloneDatanodeManager(region_server.clone())), - ); - - catalog_manager - .table_metadata_manager_ref() - .init() - .await - .context(InitMetadataSnafu)?; - - // TODO: build frontend instance like in distributed mode - let mut frontend = build_frontend( - fe_plugins, - kv_backend, procedure_manager.clone(), - catalog_manager, - region_server, + datanode_manager.clone(), ) .await?; + let mut frontend = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor) + .with_plugin(fe_plugins) + .try_build() + .await + .context(StartFrontendSnafu)?; + frontend .build_servers(opts) .await @@ -388,26 +380,41 @@ impl StartCommand { procedure_manager, }) } -} -/// Build frontend instance in standalone mode -async fn build_frontend( - plugins: Plugins, - kv_backend: KvBackendRef, - procedure_manager: ProcedureManagerRef, - catalog_manager: CatalogManagerRef, - region_server: RegionServer, -) -> Result { - let frontend_instance = FeInstance::try_new_standalone( - kv_backend, - procedure_manager, - catalog_manager, - plugins, - region_server, - ) - .await - .context(StartFrontendSnafu)?; - Ok(frontend_instance) + async fn create_ddl_task_executor( + kv_backend: KvBackendRef, + procedure_manager: ProcedureManagerRef, + datanode_manager: DatanodeManagerRef, + ) -> Result { + let table_metadata_manager = + Self::create_table_metadata_manager(kv_backend.clone()).await?; + + let ddl_task_executor: DdlTaskExecutorRef = Arc::new( + DdlManager::try_new( + procedure_manager, + datanode_manager, + Arc::new(DummyCacheInvalidator), + table_metadata_manager, + Arc::new(StandaloneTableMetadataCreator::new(kv_backend)), + ) + .context(InitDdlManagerSnafu)?, + ); + + Ok(ddl_task_executor) + } + + async fn create_table_metadata_manager( + kv_backend: KvBackendRef, + ) -> Result { + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend)); + + table_metadata_manager + .init() + .await + .context(InitMetadataSnafu)?; + + Ok(table_metadata_manager) + } } #[cfg(test)] diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 75a5e66ce9b7..8eacd3c1f90e 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -182,9 +182,6 @@ pub enum Error { source: servers::error::Error, }, - #[snafu(display("Missing meta_client_options section in config"))] - MissingMetasrvOpts { location: Location }, - #[snafu(display("Failed to find leaders when altering table, table: {}", table))] LeaderNotFound { table: String, location: Location }, @@ -299,7 +296,6 @@ impl ErrorExt for Error { | Error::IllegalPrimaryKeysDef { .. } | Error::SchemaExists { .. } | Error::ColumnNotFound { .. } - | Error::MissingMetasrvOpts { .. } | Error::UnsupportedFormat { .. } | Error::IllegalAuthConfig { .. } | Error::EmptyData { .. } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index ed470d900c34..8d17db6de81e 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod builder; mod grpc; mod influxdb; mod opentsdb; @@ -21,24 +22,16 @@ mod region_query; mod script; pub mod standalone; -use std::collections::HashMap; use std::sync::Arc; use api::v1::meta::Role; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; -use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager}; use catalog::CatalogManagerRef; -use client::client_manager::DatanodeClients; use common_base::Plugins; use common_config::KvBackendConfig; use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; -use common_meta::cache_invalidator::DummyCacheInvalidator; -use common_meta::ddl_manager::DdlManager; -use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; -use common_meta::heartbeat::handler::HandlerGroupExecutor; -use common_meta::key::TableMetadataManager; use common_meta::kv_backend::KvBackendRef; use common_meta::state_store::KvStateStore; use common_procedure::local::{LocalManager, ManagerConfig}; @@ -47,19 +40,18 @@ use common_procedure::ProcedureManagerRef; use common_query::Output; use common_telemetry::error; use common_telemetry::logging::info; -use datanode::region_server::RegionServer; use log_store::raft_engine::RaftEngineBackend; use meta_client::client::{MetaClient, MetaClientBuilder}; -use operator::delete::{Deleter, DeleterRef}; -use operator::insert::{Inserter, InserterRef}; +use meta_client::MetaClientOptions; +use operator::delete::DeleterRef; +use operator::insert::InserterRef; use operator::statement::StatementExecutor; -use operator::table::{table_idents_to_full_name, TableMutationOperator}; -use partition::manager::PartitionRuleManager; +use operator::table::table_idents_to_full_name; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use query::plan::LogicalPlan; use query::query_engine::options::{validate_catalog_and_schema, QueryOptions}; use query::query_engine::DescribeResult; -use query::{QueryEngineFactory, QueryEngineRef}; +use query::QueryEngineRef; use raft_engine::{Config, ReadableSize, RecoveryMode}; use servers::error as server_error; use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu}; @@ -83,15 +75,11 @@ use sql::statements::statement::Statement; use sqlparser::ast::ObjectName; pub use standalone::StandaloneDatanodeManager; -use self::region_query::FrontendRegionQueryHandler; -use self::standalone::StandaloneTableMetadataCreator; use crate::error::{ - self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu, - ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, - TableOperationSnafu, + self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, ParseSqlSnafu, + PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, TableOperationSnafu, }; use crate::frontend::{FrontendOptions, TomlSerializable}; -use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; use crate::heartbeat::HeartbeatTask; use crate::metrics; use crate::script::ScriptExecutor; @@ -131,99 +119,9 @@ pub struct Instance { } impl Instance { - pub async fn try_new_distributed(opts: &FrontendOptions, plugins: Plugins) -> Result { - let meta_client = Self::create_meta_client(opts).await?; - - let datanode_clients = Arc::new(DatanodeClients::default()); - - Self::try_new_distributed_with(meta_client, datanode_clients, plugins, opts).await - } - - pub async fn try_new_distributed_with( - meta_client: Arc, - datanode_clients: Arc, - plugins: Plugins, - opts: &FrontendOptions, - ) -> Result { - let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone())); - - let catalog_manager = KvBackendCatalogManager::new( - meta_backend.clone(), - meta_backend.clone(), - datanode_clients.clone(), - ); - let partition_manager = Arc::new(PartitionRuleManager::new(meta_backend.clone())); - - let region_query_handler = FrontendRegionQueryHandler::arc( - partition_manager.clone(), - catalog_manager.datanode_manager().clone(), - ); - - let inserter = Arc::new(Inserter::new( - catalog_manager.clone(), - partition_manager.clone(), - datanode_clients.clone(), - )); - let deleter = Arc::new(Deleter::new( - catalog_manager.clone(), - partition_manager, - datanode_clients, - )); - - let table_mutation_handler = Arc::new(TableMutationOperator::new( - inserter.clone(), - deleter.clone(), - )); - - let query_engine = QueryEngineFactory::new_with_plugins( - catalog_manager.clone(), - Some(region_query_handler.clone()), - Some(table_mutation_handler), - true, - plugins.clone(), - ) - .query_engine(); - - let statement_executor = Arc::new(StatementExecutor::new( - catalog_manager.clone(), - query_engine.clone(), - meta_client.clone(), - meta_backend.clone(), - catalog_manager.clone(), - inserter.clone(), - )); - - plugins.insert::(statement_executor.clone()); - - let script_executor = - Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?); - - let handlers_executor = HandlerGroupExecutor::new(vec![ - Arc::new(ParseMailboxMessageHandler), - Arc::new(InvalidateTableCacheHandler::new(meta_backend)), - ]); - - let heartbeat_task = Some(HeartbeatTask::new( - meta_client.clone(), - opts.heartbeat.clone(), - Arc::new(handlers_executor), - )); - - Ok(Instance { - catalog_manager, - script_executor, - statement_executor, - query_engine, - plugins: plugins.clone(), - servers: Arc::new(HashMap::new()), - heartbeat_task, - inserter, - deleter, - }) - } - - async fn create_meta_client(opts: &FrontendOptions) -> Result> { - let meta_client_options = opts.meta_client.as_ref().context(MissingMetasrvOptsSnafu)?; + pub async fn create_meta_client( + meta_client_options: &MetaClientOptions, + ) -> Result> { info!( "Creating Frontend instance in distributed mode with Meta server addr {:?}", meta_client_options.metasrv_addrs @@ -285,82 +183,6 @@ impl Instance { Ok((kv_backend, procedure_manager)) } - pub async fn try_new_standalone( - kv_backend: KvBackendRef, - procedure_manager: ProcedureManagerRef, - catalog_manager: CatalogManagerRef, - plugins: Plugins, - region_server: RegionServer, - ) -> Result { - let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone())); - let datanode_manager = Arc::new(StandaloneDatanodeManager(region_server)); - - let region_query_handler = - FrontendRegionQueryHandler::arc(partition_manager.clone(), datanode_manager.clone()); - - let inserter = Arc::new(Inserter::new( - catalog_manager.clone(), - partition_manager.clone(), - datanode_manager.clone(), - )); - let deleter = Arc::new(Deleter::new( - catalog_manager.clone(), - partition_manager, - datanode_manager.clone(), - )); - let table_mutation_handler = Arc::new(TableMutationOperator::new( - inserter.clone(), - deleter.clone(), - )); - - let query_engine = QueryEngineFactory::new_with_plugins( - catalog_manager.clone(), - Some(region_query_handler), - Some(table_mutation_handler), - true, - plugins.clone(), - ) - .query_engine(); - - let script_executor = - Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?); - - let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); - - let cache_invalidator = Arc::new(DummyCacheInvalidator); - let ddl_executor = Arc::new( - DdlManager::try_new( - procedure_manager, - datanode_manager, - cache_invalidator.clone(), - table_metadata_manager.clone(), - Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())), - ) - .context(error::InitDdlManagerSnafu)?, - ); - - let statement_executor = Arc::new(StatementExecutor::new( - catalog_manager.clone(), - query_engine.clone(), - ddl_executor, - kv_backend.clone(), - cache_invalidator, - inserter.clone(), - )); - - Ok(Instance { - catalog_manager: catalog_manager.clone(), - script_executor, - statement_executor, - query_engine, - plugins, - servers: Arc::new(HashMap::new()), - heartbeat_task: None, - inserter, - deleter, - }) - } - pub async fn build_servers( &mut self, opts: impl Into + TomlSerializable, @@ -400,10 +222,13 @@ impl FrontendInstance for Instance { self.script_executor.start(self)?; - futures::future::try_join_all(self.servers.values().map(start_server)) - .await - .context(error::StartServerSnafu) - .map(|_| ()) + futures::future::try_join_all(self.servers.iter().map(|(name, handler)| async move { + info!("Starting service: {name}"); + start_server(handler).await + })) + .await + .context(error::StartServerSnafu) + .map(|_| ()) } } diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs new file mode 100644 index 000000000000..1a3147be8f19 --- /dev/null +++ b/src/frontend/src/instance/builder.rs @@ -0,0 +1,149 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use catalog::kvbackend::KvBackendCatalogManager; +use common_base::Plugins; +use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator}; +use common_meta::datanode_manager::DatanodeManagerRef; +use common_meta::ddl::DdlTaskExecutorRef; +use common_meta::kv_backend::KvBackendRef; +use operator::delete::Deleter; +use operator::insert::Inserter; +use operator::statement::StatementExecutor; +use operator::table::TableMutationOperator; +use partition::manager::PartitionRuleManager; +use query::QueryEngineFactory; + +use crate::error::Result; +use crate::heartbeat::HeartbeatTask; +use crate::instance::region_query::FrontendRegionQueryHandler; +use crate::instance::{Instance, StatementExecutorRef}; +use crate::script::ScriptExecutor; + +pub struct FrontendBuilder { + kv_backend: KvBackendRef, + cache_invalidator: Option, + datanode_manager: DatanodeManagerRef, + plugins: Option, + ddl_task_executor: DdlTaskExecutorRef, + heartbeat_task: Option, +} + +impl FrontendBuilder { + pub fn new( + kv_backend: KvBackendRef, + datanode_manager: DatanodeManagerRef, + ddl_task_executor: DdlTaskExecutorRef, + ) -> Self { + Self { + kv_backend, + cache_invalidator: None, + datanode_manager, + plugins: None, + ddl_task_executor, + heartbeat_task: None, + } + } + + pub fn with_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self { + Self { + cache_invalidator: Some(cache_invalidator), + ..self + } + } + + pub fn with_plugin(self, plugins: Plugins) -> Self { + Self { + plugins: Some(plugins), + ..self + } + } + + pub fn with_heartbeat_task(self, heartbeat_task: HeartbeatTask) -> Self { + Self { + heartbeat_task: Some(heartbeat_task), + ..self + } + } + + pub async fn try_build(self) -> Result { + let kv_backend = self.kv_backend; + let datanode_manager = self.datanode_manager; + let plugins = self.plugins.unwrap_or_default(); + + let catalog_manager = KvBackendCatalogManager::new( + kv_backend.clone(), + self.cache_invalidator + .unwrap_or_else(|| Arc::new(DummyCacheInvalidator)), + ); + + let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone())); + + let region_query_handler = + FrontendRegionQueryHandler::arc(partition_manager.clone(), datanode_manager.clone()); + + let inserter = Arc::new(Inserter::new( + catalog_manager.clone(), + partition_manager.clone(), + datanode_manager.clone(), + )); + let deleter = Arc::new(Deleter::new( + catalog_manager.clone(), + partition_manager, + datanode_manager.clone(), + )); + let table_mutation_handler = Arc::new(TableMutationOperator::new( + inserter.clone(), + deleter.clone(), + )); + + let query_engine = QueryEngineFactory::new_with_plugins( + catalog_manager.clone(), + Some(region_query_handler.clone()), + Some(table_mutation_handler), + true, + plugins.clone(), + ) + .query_engine(); + + let script_executor = + Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?); + + let statement_executor = Arc::new(StatementExecutor::new( + catalog_manager.clone(), + query_engine.clone(), + self.ddl_task_executor, + kv_backend, + catalog_manager.clone(), + inserter.clone(), + )); + + plugins.insert::(statement_executor.clone()); + + Ok(Instance { + catalog_manager, + script_executor, + statement_executor, + query_engine, + plugins, + servers: Arc::new(HashMap::new()), + heartbeat_task: self.heartbeat_task, + inserter, + deleter, + }) + } +} diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 29ce275b2cb8..9376dce00920 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -107,7 +107,7 @@ impl Datanode for RegionInvoker { } } -pub(crate) struct StandaloneTableMetadataCreator { +pub struct StandaloneTableMetadataCreator { table_id_sequence: SequenceRef, } diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index dcf112e2e754..076d25c77598 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -22,7 +22,7 @@ use api::v1::meta::store_server::StoreServer; use common_base::Plugins; use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::memory::MemoryKvBackend; -use common_meta::kv_backend::ResettableKvBackendRef; +use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_telemetry::info; use etcd_client::Client; use servers::configurator::ConfiguratorRef; @@ -60,8 +60,11 @@ pub struct MetaSrvInstance { } impl MetaSrvInstance { - pub async fn new(opts: MetaSrvOptions, plugins: Plugins) -> Result { - let meta_srv = build_meta_srv(&opts, plugins.clone()).await?; + pub async fn new( + opts: MetaSrvOptions, + plugins: Plugins, + meta_srv: MetaSrv, + ) -> Result { let http_srv = Arc::new( HttpServerBuilder::new(opts.http.clone()) .with_metrics_handler(MetricsHandler) @@ -161,28 +164,26 @@ pub fn router(meta_srv: MetaSrv) -> Router { .add_service(admin::make_admin_service(meta_srv)) } -pub async fn build_meta_srv(opts: &MetaSrvOptions, plugins: Plugins) -> Result { - let (kv_backend, election, lock) = if opts.use_memory_store { - ( +pub async fn metasrv_builder( + opts: &MetaSrvOptions, + plugins: Plugins, + kv_backend: Option, +) -> Result { + let (kv_backend, election, lock) = match (kv_backend, opts.use_memory_store) { + (Some(kv_backend), _) => (kv_backend, None, Some(Arc::new(MemLock::default()) as _)), + (None, true) => ( Arc::new(MemoryKvBackend::new()) as _, None, Some(Arc::new(MemLock::default()) as _), - ) - } else { - let etcd_endpoints = opts - .store_addr - .split(',') - .map(|x| x.trim()) - .filter(|x| !x.is_empty()) - .collect::>(); - let etcd_client = Client::connect(&etcd_endpoints, None) - .await - .context(error::ConnectEtcdSnafu)?; - ( - EtcdStore::with_etcd_client(etcd_client.clone()), - Some(EtcdElection::with_etcd_client(&opts.server_addr, etcd_client.clone()).await?), - Some(EtcdLock::with_etcd_client(etcd_client)?), - ) + ), + (None, false) => { + let etcd_client = create_etcd_client(opts).await?; + ( + EtcdStore::with_etcd_client(etcd_client.clone()), + Some(EtcdElection::with_etcd_client(&opts.server_addr, etcd_client.clone()).await?), + Some(EtcdLock::with_etcd_client(etcd_client)?), + ) + } }; let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef; @@ -192,14 +193,24 @@ pub async fn build_meta_srv(opts: &MetaSrvOptions, plugins: Plugins) -> Result Arc::new(LeaseBasedSelector) as SelectorRef, }; - MetaSrvBuilder::new() + Ok(MetaSrvBuilder::new() .options(opts.clone()) .kv_backend(kv_backend) .in_memory(in_memory) .selector(selector) .election(election) .lock(lock) - .plugins(plugins) - .build() + .plugins(plugins)) +} + +async fn create_etcd_client(opts: &MetaSrvOptions) -> Result { + let etcd_endpoints = opts + .store_addr + .split(',') + .map(|x| x.trim()) + .filter(|x| !x.is_empty()) + .collect::>(); + Client::connect(&etcd_endpoints, None) .await + .context(error::ConnectEtcdSnafu) } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index e3d03da142fb..87f5936c32ae 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -19,12 +19,14 @@ use std::time::Duration; use client::client_manager::DatanodeClients; use common_base::Plugins; use common_grpc::channel_manager::ChannelConfig; +use common_meta::datanode_manager::DatanodeManagerRef; +use common_meta::ddl::TableMetadataAllocatorRef; use common_meta::ddl_manager::{DdlManager, DdlManagerRef}; use common_meta::distributed_time_constants; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; -use common_meta::sequence::{Sequence, SequenceRef}; +use common_meta::sequence::Sequence; use common_meta::state_store::KvStateStore; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; @@ -70,8 +72,9 @@ pub struct MetaSrvBuilder { election: Option, meta_peer_client: Option, lock: Option, - datanode_clients: Option>, + datanode_manager: Option, plugins: Option, + table_metadata_allocator: Option, } impl MetaSrvBuilder { @@ -85,8 +88,9 @@ impl MetaSrvBuilder { election: None, options: None, lock: None, - datanode_clients: None, + datanode_manager: None, plugins: None, + table_metadata_allocator: None, } } @@ -130,8 +134,8 @@ impl MetaSrvBuilder { self } - pub fn datanode_clients(mut self, clients: Arc) -> Self { - self.datanode_clients = Some(clients); + pub fn datanode_manager(mut self, datanode_manager: DatanodeManagerRef) -> Self { + self.datanode_manager = Some(datanode_manager); self } @@ -140,6 +144,14 @@ impl MetaSrvBuilder { self } + pub fn table_metadata_allocator( + mut self, + table_metadata_allocator: TableMetadataAllocatorRef, + ) -> Self { + self.table_metadata_allocator = Some(table_metadata_allocator); + self + } + pub async fn build(self) -> Result { let started = Arc::new(AtomicBool::new(false)); @@ -152,8 +164,9 @@ impl MetaSrvBuilder { selector, handler_group, lock, - datanode_clients, + datanode_manager, plugins, + table_metadata_allocator, } = self; let options = options.unwrap_or_default(); @@ -189,14 +202,22 @@ impl MetaSrvBuilder { meta_peer_client: meta_peer_client.clone(), table_id: None, }; + + let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| { + Arc::new(MetaSrvTableMetadataAllocator::new( + selector_ctx.clone(), + selector.clone(), + table_id_sequence.clone(), + )) + }); + let ddl_manager = build_ddl_manager( &options, - datanode_clients, + datanode_manager, &procedure_manager, &mailbox, &table_metadata_manager, - (&selector, &selector_ctx), - &table_id_sequence, + table_metadata_allocator, )?; let opening_region_keeper = Arc::new(OpeningRegionKeeper::default()); @@ -324,12 +345,11 @@ fn build_procedure_manager( fn build_ddl_manager( options: &MetaSrvOptions, - datanode_clients: Option>, + datanode_clients: Option, procedure_manager: &ProcedureManagerRef, mailbox: &MailboxRef, table_metadata_manager: &TableMetadataManagerRef, - (selector, selector_ctx): (&SelectorRef, &SelectorContext), - table_id_sequence: &SequenceRef, + table_metadata_allocator: TableMetadataAllocatorRef, ) -> Result { let datanode_clients = datanode_clients.unwrap_or_else(|| { let datanode_client_channel_config = ChannelConfig::new() @@ -349,19 +369,13 @@ fn build_ddl_manager( }, )); - let table_meta_allocator = Arc::new(MetaSrvTableMetadataAllocator::new( - selector_ctx.clone(), - selector.clone(), - table_id_sequence.clone(), - )); - Ok(Arc::new( DdlManager::try_new( procedure_manager.clone(), datanode_clients, cache_invalidator, table_metadata_manager.clone(), - table_meta_allocator, + table_metadata_allocator, ) .context(error::InitDdlManagerSnafu)?, )) diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index 4e633e672ddd..ebcca26301d3 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -70,7 +70,7 @@ pub async fn mock( }; let builder = match datanode_clients { - Some(clients) => builder.datanode_clients(clients), + Some(clients) => builder.datanode_manager(clients), None => builder, }; diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index c1862bcba6ed..3de98634762e 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -18,11 +18,13 @@ use std::sync::Arc; use std::time::Duration; use api::v1::meta::Role; -use catalog::kvbackend::MetaKvBackend; +use catalog::kvbackend::{CachedMetaKvBackend, MetaKvBackend}; use client::client_manager::DatanodeClients; use client::Client; use common_base::Plugins; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; +use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::kv_backend::chroot::ChrootKvBackend; use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::memory::MemoryKvBackend; @@ -33,13 +35,16 @@ use common_runtime::Builder as RuntimeBuilder; use common_test_util::temp_dir::create_temp_dir; use datanode::config::{DatanodeOptions, ObjectStoreConfig}; use datanode::datanode::{Datanode, DatanodeBuilder, ProcedureConfig}; -use frontend::frontend::FrontendOptions; +use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; +use frontend::heartbeat::HeartbeatTask; +use frontend::instance::builder::FrontendBuilder; use frontend::instance::{FrontendInstance, Instance as FeInstance}; use meta_client::client::MetaClientBuilder; use meta_srv::cluster::MetaPeerClientRef; use meta_srv::metasrv::{MetaSrv, MetaSrvOptions}; use meta_srv::mocks::MockInfo; use servers::grpc::GrpcServer; +use servers::heartbeat_options::HeartbeatOptions; use servers::Mode; use tonic::transport::Server; use tower::service_fn; @@ -252,18 +257,26 @@ impl GreptimeDbClusterBuilder { meta_client.start(&[&meta_srv.server_addr]).await.unwrap(); let meta_client = Arc::new(meta_client); - let frontend_opts = FrontendOptions::default(); + let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone())); - Arc::new( - FeInstance::try_new_distributed_with( - meta_client, - datanode_clients, - Plugins::default(), - &frontend_opts, - ) + let handlers_executor = HandlerGroupExecutor::new(vec![ + Arc::new(ParseMailboxMessageHandler), + Arc::new(InvalidateTableCacheHandler::new(meta_backend.clone())), + ]); + + let heartbeat_task = HeartbeatTask::new( + meta_client.clone(), + HeartbeatOptions::default(), + Arc::new(handlers_executor), + ); + + let instance = FrontendBuilder::new(meta_backend, datanode_clients, meta_client) + .with_heartbeat_task(heartbeat_task) + .try_build() .await - .unwrap(), - ) + .unwrap(); + + Arc::new(instance) } } diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index f11fe91bd621..19610333e499 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -14,16 +14,19 @@ use std::sync::Arc; -use catalog::kvbackend::KvBackendCatalogManager; use cmd::options::MixOptions; use common_base::Plugins; use common_config::KvBackendConfig; -use common_meta::cache_invalidator::DummyKvCacheInvalidator; +use common_meta::cache_invalidator::DummyCacheInvalidator; +use common_meta::ddl_manager::DdlManager; +use common_meta::key::TableMetadataManager; use common_procedure::options::ProcedureConfig; use common_telemetry::logging::LoggingOptions; use datanode::config::DatanodeOptions; use datanode::datanode::DatanodeBuilder; use frontend::frontend::FrontendOptions; +use frontend::instance::builder::FrontendBuilder; +use frontend::instance::standalone::StandaloneTableMetadataCreator; use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager}; use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard}; @@ -88,29 +91,28 @@ impl GreptimeDbStandaloneBuilder { .await .unwrap(); - let catalog_manager = KvBackendCatalogManager::new( - kv_backend.clone(), - Arc::new(DummyKvCacheInvalidator), - Arc::new(StandaloneDatanodeManager(datanode.region_server())), + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + table_metadata_manager.init().await.unwrap(); + + let datanode_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server())); + + let ddl_task_executor = Arc::new( + DdlManager::try_new( + procedure_manager.clone(), + datanode_manager.clone(), + Arc::new(DummyCacheInvalidator), + table_metadata_manager, + Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())), + ) + .unwrap(), ); - catalog_manager - .table_metadata_manager_ref() - .init() + let instance = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor) + .with_plugin(plugins) + .try_build() .await .unwrap(); - let instance = Instance::try_new_standalone( - kv_backend, - procedure_manager.clone(), - catalog_manager, - plugins, - datanode.region_server(), - ) - .await - .unwrap(); - - // Ensures all loaders are registered. procedure_manager.start().await.unwrap(); test_util::prepare_another_catalog_and_schema(&instance).await;