Skip to content

Commit

Permalink
refactor: make grpc service able to be added dynamically (#3160)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelScofield authored Jan 15, 2024
1 parent ca4d690 commit 93f28c2
Show file tree
Hide file tree
Showing 16 changed files with 245 additions and 261 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ bitflags = "2.4.1"
bytemuck = "1.12"
bytes = { version = "1.5", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.4", features = ["derive"] }
dashmap = "5.4"
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license.workspace = true
[dependencies]
arrow.workspace = true
chrono.workspace = true
clap = { version = "4.0", features = ["derive"] }
clap.workspace = true
client.workspace = true
futures-util.workspace = true
indicatif = "0.17.1"
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async-trait.workspace = true
auth.workspace = true
catalog.workspace = true
chrono.workspace = true
clap = { version = "4.4", features = ["derive"] }
clap.workspace = true
client.workspace = true
common-base.workspace = true
common-catalog.workspace = true
Expand Down
16 changes: 13 additions & 3 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_config::WalConfig;
use common_telemetry::{info, logging};
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::service::DatanodeServiceBuilder;
use meta_client::MetaClientOptions;
use servers::Mode;
use snafu::{OptionExt, ResultExt};
Expand All @@ -38,6 +39,10 @@ impl Instance {
fn new(datanode: Datanode) -> Self {
Self { datanode }
}

pub fn datanode_mut(&mut self) -> &mut Datanode {
&mut self.datanode
}
}

#[async_trait]
Expand Down Expand Up @@ -219,15 +224,20 @@ impl StartCommand {
client: Arc::new(meta_client.clone()),
});

let datanode = DatanodeBuilder::new(opts, plugins)
let mut datanode = DatanodeBuilder::new(opts.clone(), plugins)
.with_meta_client(meta_client)
.with_kv_backend(meta_backend)
.enable_region_server_service()
.enable_http_service()
.build()
.await
.context(StartDatanodeSnafu)?;

let services = DatanodeServiceBuilder::new(&opts)
.with_default_grpc_server(&datanode.region_server())
.enable_http_service()
.build()
.context(StartDatanodeSnafu)?;
datanode.setup_services(services);

Ok(Instance::new(datanode))
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/cmd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ lazy_static::lazy_static! {
pub trait App {
fn name(&self) -> &str;

/// A hook for implementor to make something happened before actual startup. Defaults to no-op.
fn pre_start(&mut self) -> error::Result<()> {
Ok(())
}

async fn start(&mut self) -> error::Result<()>;

async fn stop(&self) -> error::Result<()>;
Expand Down
99 changes: 8 additions & 91 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
//! Datanode implementation.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;

Expand Down Expand Up @@ -45,11 +44,7 @@ use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::{join_dir, normalize_dir};
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::GrpcServerConfig;
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
use servers::server::{start_server, ServerHandler, ServerHandlers};
use servers::server::{start_server, ServerHandlers};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use store_api::path_utils::{region_dir, WAL_DIR};
Expand All @@ -62,8 +57,8 @@ use tokio::sync::Notify;
use crate::config::{DatanodeOptions, RegionEngineConfig};
use crate::error::{
BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu,
MissingNodeIdSnafu, OpenLogStoreSnafu, ParseAddrSnafu, Result, RuntimeResourceSnafu,
ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
MissingNodeIdSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu,
ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef,
Expand All @@ -75,8 +70,6 @@ use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store;

const OPEN_REGION_PARALLELISM: usize = 16;
const REGION_SERVER_SERVICE_NAME: &str = "REGION_SERVER_SERVICE";
const DATANODE_HTTP_SERVICE_NAME: &str = "DATANODE_HTTP_SERVICE";

/// Datanode service.
pub struct Datanode {
Expand Down Expand Up @@ -129,6 +122,10 @@ impl Datanode {
}
}

pub fn setup_services(&mut self, services: ServerHandlers) {
self.services = services;
}

/// Start services of datanode. This method call will block until services are shutdown.
pub async fn start_services(&mut self) -> Result<()> {
let _ = future::try_join_all(self.services.values().map(start_server))
Expand Down Expand Up @@ -173,8 +170,6 @@ pub struct DatanodeBuilder {
plugins: Plugins,
meta_client: Option<MetaClient>,
kv_backend: Option<KvBackendRef>,
enable_region_server_service: bool,
enable_http_service: bool,
}

impl DatanodeBuilder {
Expand All @@ -186,8 +181,6 @@ impl DatanodeBuilder {
plugins,
meta_client: None,
kv_backend: None,
enable_region_server_service: false,
enable_http_service: false,
}
}

Expand All @@ -205,20 +198,6 @@ impl DatanodeBuilder {
}
}

pub fn enable_region_server_service(self) -> Self {
Self {
enable_region_server_service: true,
..self
}
}

pub fn enable_http_service(self) -> Self {
Self {
enable_http_service: true,
..self
}
}

pub async fn build(mut self) -> Result<Datanode> {
let mode = &self.opts.mode;
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
Expand Down Expand Up @@ -269,8 +248,6 @@ impl DatanodeBuilder {
None
};

let services = self.create_datanode_services(&region_server)?;

let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
Some(self.opts.storage.data_home.clone()),
mode,
Expand All @@ -290,7 +267,7 @@ impl DatanodeBuilder {
.context(StartServerSnafu)?;

Ok(Datanode {
services,
services: HashMap::new(),
heartbeat_task,
region_server,
greptimedb_telemetry_task,
Expand All @@ -301,66 +278,6 @@ impl DatanodeBuilder {
})
}

fn create_datanode_services(&self, region_server: &RegionServer) -> Result<ServerHandlers> {
let mut services = HashMap::new();

if self.enable_region_server_service {
services.insert(
REGION_SERVER_SERVICE_NAME.to_string(),
self.create_region_server_service(region_server)?,
);
}

if self.enable_http_service {
services.insert(
DATANODE_HTTP_SERVICE_NAME.to_string(),
self.create_http_service()?,
);
}

Ok(services)
}

fn create_region_server_service(&self, region_server: &RegionServer) -> Result<ServerHandler> {
let opts = &self.opts;

let config = GrpcServerConfig {
max_recv_message_size: opts.rpc_max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize,
};

let server = Box::new(
GrpcServerBuilder::new(region_server.runtime())
.config(config)
.flight_handler(Arc::new(region_server.clone()))
.region_server_handler(Arc::new(region_server.clone()))
.build(),
);

let addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu {
addr: &opts.rpc_addr,
})?;

Ok((server, addr))
}

fn create_http_service(&self) -> Result<ServerHandler> {
let opts = &self.opts;

let server = Box::new(
HttpServerBuilder::new(opts.http.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(opts.to_toml_string())
.build(),
);

let addr = opts.http.addr.parse().context(ParseAddrSnafu {
addr: &opts.http.addr,
})?;

Ok((server, addr))
}

#[cfg(test)]
/// Open all regions belong to this datanode.
async fn initialize_region_server(
Expand Down
1 change: 1 addition & 0 deletions src/datanode/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod greptimedb_telemetry;
pub mod heartbeat;
pub mod metrics;
pub mod region_server;
pub mod service;
mod store;
#[cfg(any(test, feature = "testing"))]
pub mod tests;
107 changes: 107 additions & 0 deletions src/datanode/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;

use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
use servers::server::{ServerHandler, ServerHandlers};
use snafu::ResultExt;

use crate::config::DatanodeOptions;
use crate::error::{ParseAddrSnafu, Result};
use crate::region_server::RegionServer;

const DATANODE_GRPC_SERVICE_NAME: &str = "DATANODE_GRPC_SERVICE";
const DATANODE_HTTP_SERVICE_NAME: &str = "DATANODE_HTTP_SERVICE";

pub struct DatanodeServiceBuilder<'a> {
opts: &'a DatanodeOptions,
grpc_server: Option<GrpcServer>,
enable_http_service: bool,
}

impl<'a> DatanodeServiceBuilder<'a> {
pub fn new(opts: &'a DatanodeOptions) -> Self {
Self {
opts,
grpc_server: None,
enable_http_service: false,
}
}

pub fn with_grpc_server(self, grpc_server: GrpcServer) -> Self {
Self {
grpc_server: Some(grpc_server),
..self
}
}

pub fn with_default_grpc_server(mut self, region_server: &RegionServer) -> Self {
let grpc_server = Self::grpc_server_builder(self.opts, region_server).build();
self.grpc_server = Some(grpc_server);
self
}

pub fn enable_http_service(self) -> Self {
Self {
enable_http_service: true,
..self
}
}

pub fn build(mut self) -> Result<ServerHandlers> {
let mut services = HashMap::new();

if let Some(grpc_server) = self.grpc_server.take() {
let addr: SocketAddr = self.opts.rpc_addr.parse().context(ParseAddrSnafu {
addr: &self.opts.rpc_addr,
})?;
let handler: ServerHandler = (Box::new(grpc_server), addr);
services.insert(DATANODE_GRPC_SERVICE_NAME.to_string(), handler);
}

if self.enable_http_service {
let http_server = HttpServerBuilder::new(self.opts.http.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(self.opts.to_toml_string())
.build();
let addr: SocketAddr = self.opts.http.addr.parse().context(ParseAddrSnafu {
addr: &self.opts.http.addr,
})?;
let handler: ServerHandler = (Box::new(http_server), addr);
services.insert(DATANODE_HTTP_SERVICE_NAME.to_string(), handler);
}

Ok(services)
}

pub fn grpc_server_builder(
opts: &DatanodeOptions,
region_server: &RegionServer,
) -> GrpcServerBuilder {
let config = GrpcServerConfig {
max_recv_message_size: opts.rpc_max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize,
};

GrpcServerBuilder::new(config, region_server.runtime())
.flight_handler(Arc::new(region_server.clone()))
.region_server_handler(Arc::new(region_server.clone()))
}
}
Loading

0 comments on commit 93f28c2

Please sign in to comment.