diff --git a/Cargo.lock b/Cargo.lock index 8f62994542d4..397459043901 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -876,14 +876,34 @@ dependencies = [ name = "benchmarks" version = "0.7.1" dependencies = [ + "api", "arrow", "chrono", "clap 4.4.11", "client", + "common-base", + "common-telemetry", + "common-wal", + "dotenv", + "futures", "futures-util", + "humantime", + "humantime-serde", "indicatif", + "itertools 0.10.5", + "lazy_static", + "log-store", + "mito2", + "num_cpus", "parquet", + "prometheus", + "rand", + "rskafka", + "serde", + "store-api", "tokio", + "toml 0.8.8", + "uuid", ] [[package]] @@ -1937,6 +1957,7 @@ dependencies = [ "async-stream", "async-trait", "backon", + "common-base", "common-error", "common-macro", "common-runtime", @@ -4844,6 +4865,8 @@ dependencies = [ "futures", "futures-util", "itertools 0.10.5", + "lazy_static", + "prometheus", "protobuf", "protobuf-build", "raft-engine", diff --git a/Cargo.toml b/Cargo.toml index 5140c5729d71..0bf04f17b82a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,11 +99,13 @@ datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.g datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } derive_builder = "0.12" +dotenv = "0.15" etcd-client = "0.12" fst = "0.4.7" futures = "0.3" futures-util = "0.3" greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "06f6297ff3cab578a1589741b504342fbad70453" } +humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 862579b26dfb..18b44e944858 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -8,11 +8,31 @@ license.workspace = true workspace = true [dependencies] +api.workspace = true arrow.workspace = true chrono.workspace = true clap.workspace = true client.workspace = true +common-base.workspace = true +common-telemetry.workspace = true +common-wal.workspace = true +dotenv.workspace = true +futures.workspace = true futures-util.workspace = true +humantime.workspace = true +humantime-serde.workspace = true indicatif = "0.17.1" +itertools.workspace = true +lazy_static.workspace = true +log-store.workspace = true +mito2.workspace = true +num_cpus.workspace = true parquet.workspace = true +prometheus.workspace = true +rand.workspace = true +rskafka.workspace = true +serde.workspace = true +store-api.workspace = true tokio.workspace = true +toml.workspace = true +uuid.workspace = true diff --git a/benchmarks/README.md b/benchmarks/README.md new file mode 100644 index 000000000000..c281af38293e --- /dev/null +++ b/benchmarks/README.md @@ -0,0 +1,11 @@ +Benchmarkers for GreptimeDB +-------------------------------- + +## Wal Benchmarker +The wal benchmarker serves to evaluate the performance of GreptimeDB's Write-Ahead Log (WAL) component. It meticulously assesses the read/write performance of the WAL under diverse workloads generated by the benchmarker. + + +### How to use +To compile the benchmarker, navigate to the `greptimedb/benchmarks` directory and execute `cargo build --release`. Subsequently, you'll find the compiled target located at `greptimedb/target/release/wal_bench`. + +The `./wal_bench -h` command reveals numerous arguments that the target accepts. Among these, a notable one is the `cfg-file` argument. 
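+For example, a config-driven run from the repository root might look like `./target/release/wal_bench --cfg-file benchmarks/config/wal_bench.example.toml` (the paths here assume the layout described above; adjust them to wherever you invoke the binary from).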
By utilizing a configuration file in the TOML format, you can bypass the need to repeatedly specify cumbersome arguments. diff --git a/benchmarks/config/wal_bench.example.toml b/benchmarks/config/wal_bench.example.toml new file mode 100644 index 000000000000..72faed0d7410 --- /dev/null +++ b/benchmarks/config/wal_bench.example.toml @@ -0,0 +1,21 @@ +# Refers to the documents of `Args` in benchmarks/src/wal.rs`. +wal_provider = "kafka" +bootstrap_brokers = ["localhost:9092"] +num_workers = 10 +num_topics = 32 +num_regions = 1000 +num_scrapes = 1000 +num_rows = 5 +col_types = "ifs" +max_batch_size = "512KB" +linger = "1ms" +backoff_init = "10ms" +backoff_max = "1ms" +backoff_base = 2 +backoff_deadline = "3s" +compression = "zstd" +rng_seed = 42 +skip_read = false +skip_write = false +random_topics = true +report_metrics = false diff --git a/benchmarks/src/bin/wal_bench.rs b/benchmarks/src/bin/wal_bench.rs new file mode 100644 index 000000000000..6caa7b699871 --- /dev/null +++ b/benchmarks/src/bin/wal_bench.rs @@ -0,0 +1,326 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![feature(int_roundings)] + +use std::fs; +use std::sync::Arc; +use std::time::Instant; + +use api::v1::{ColumnDataType, ColumnSchema, SemanticType}; +use benchmarks::metrics; +use benchmarks::wal_bench::{Args, Config, Region, WalProvider}; +use clap::Parser; +use common_telemetry::info; +use common_wal::config::kafka::common::BackoffConfig; +use common_wal::config::kafka::DatanodeKafkaConfig as KafkaConfig; +use common_wal::config::raft_engine::RaftEngineConfig; +use common_wal::options::{KafkaWalOptions, WalOptions}; +use itertools::Itertools; +use log_store::kafka::log_store::KafkaLogStore; +use log_store::raft_engine::log_store::RaftEngineLogStore; +use mito2::wal::Wal; +use prometheus::{Encoder, TextEncoder}; +use rand::distributions::{Alphanumeric, DistString}; +use rand::rngs::SmallRng; +use rand::SeedableRng; +use rskafka::client::partition::Compression; +use rskafka::client::ClientBuilder; +use store_api::logstore::LogStore; +use store_api::storage::RegionId; + +async fn run_benchmarker(cfg: &Config, topics: &[String], wal: Arc>) { + let chunk_size = cfg.num_regions.div_ceil(cfg.num_workers); + let region_chunks = (0..cfg.num_regions) + .map(|id| { + build_region( + id as u64, + topics, + &mut SmallRng::seed_from_u64(cfg.rng_seed), + cfg, + ) + }) + .chunks(chunk_size as usize) + .into_iter() + .map(|chunk| Arc::new(chunk.collect::>())) + .collect::>(); + + let mut write_elapsed = 0; + let mut read_elapsed = 0; + + if !cfg.skip_write { + info!("Benchmarking write ..."); + + let num_scrapes = cfg.num_scrapes; + let timer = Instant::now(); + futures::future::join_all((0..cfg.num_workers).map(|i| { + let wal = wal.clone(); + let regions = region_chunks[i as usize].clone(); + tokio::spawn(async move { + for _ in 0..num_scrapes { + let mut wal_writer = wal.writer(); + regions + .iter() + .for_each(|region| region.add_wal_entry(&mut wal_writer)); + 
wal_writer.write_to_wal().await.unwrap(); + } + }) + })) + .await; + write_elapsed += timer.elapsed().as_millis(); + } + + if !cfg.skip_read { + info!("Benchmarking read ..."); + + let timer = Instant::now(); + futures::future::join_all((0..cfg.num_workers).map(|i| { + let wal = wal.clone(); + let regions = region_chunks[i as usize].clone(); + tokio::spawn(async move { + for region in regions.iter() { + region.replay(&wal).await; + } + }) + })) + .await; + read_elapsed = timer.elapsed().as_millis(); + } + + dump_report(cfg, write_elapsed, read_elapsed); +} + +fn build_region(id: u64, topics: &[String], rng: &mut SmallRng, cfg: &Config) -> Region { + let wal_options = match cfg.wal_provider { + WalProvider::Kafka => { + assert!(!topics.is_empty()); + WalOptions::Kafka(KafkaWalOptions { + topic: topics.get(id as usize % topics.len()).cloned().unwrap(), + }) + } + WalProvider::RaftEngine => WalOptions::RaftEngine, + }; + Region::new( + RegionId::from_u64(id), + build_schema(&parse_col_types(&cfg.col_types), rng), + wal_options, + cfg.num_rows, + cfg.rng_seed, + ) +} + +fn build_schema(col_types: &[ColumnDataType], mut rng: &mut SmallRng) -> Vec { + col_types + .iter() + .map(|col_type| ColumnSchema { + column_name: Alphanumeric.sample_string(&mut rng, 5), + datatype: *col_type as i32, + semantic_type: SemanticType::Field as i32, + datatype_extension: None, + }) + .chain(vec![ColumnSchema { + column_name: "ts".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Tag as i32, + datatype_extension: None, + }]) + .collect() +} + +fn dump_report(cfg: &Config, write_elapsed: u128, read_elapsed: u128) { + let cost_report = format!( + "write costs: {} ms, read costs: {} ms", + write_elapsed, read_elapsed, + ); + + let total_written_bytes = metrics::METRIC_WAL_WRITE_BYTES_TOTAL.get() as u128; + let write_throughput = if write_elapsed > 0 { + (total_written_bytes * 1000).div_floor(write_elapsed) + } else { + 0 + }; + let total_read_bytes = metrics::METRIC_WAL_READ_BYTES_TOTAL.get() as u128; + let read_throughput = if read_elapsed > 0 { + (total_read_bytes * 1000).div_floor(read_elapsed) + } else { + 0 + }; + + let throughput_report = format!( + "total written bytes: {} bytes, total read bytes: {} bytes, write throuput: {} bytes/s ({} mb/s), read throughput: {} bytes/s ({} mb/s)", + total_written_bytes, + total_read_bytes, + write_throughput, + write_throughput.div_floor(1 << 20), + read_throughput, + read_throughput.div_floor(1 << 20), + ); + + let metrics_report = if cfg.report_metrics { + let mut buffer = Vec::new(); + let encoder = TextEncoder::new(); + let metrics = prometheus::gather(); + encoder.encode(&metrics, &mut buffer).unwrap(); + String::from_utf8(buffer).unwrap() + } else { + String::new() + }; + + info!( + r#" +Benchmark config: +{cfg:?} + +Benchmark report: +{cost_report} +{throughput_report} +{metrics_report}"# + ); +} + +async fn create_topics(cfg: &Config) -> Vec { + // Creates topics. 
+ let client = ClientBuilder::new(cfg.bootstrap_brokers.clone()) + .build() + .await + .unwrap(); + let ctrl_client = client.controller_client().unwrap(); + let (topics, tasks): (Vec<_>, Vec<_>) = (0..cfg.num_topics) + .map(|i| { + let topic = if cfg.random_topics { + format!( + "greptime_wal_bench_topic_{}_{}", + uuid::Uuid::new_v4().as_u128(), + i + ) + } else { + format!("greptime_wal_bench_topic_{}", i) + }; + let task = ctrl_client.create_topic( + topic.clone(), + 1, + cfg.bootstrap_brokers.len() as i16, + 2000, + ); + (topic, task) + }) + .unzip(); + // Must ignore errors since we allow topics being created more than once. + let _ = futures::future::try_join_all(tasks).await; + + topics +} + +fn parse_compression(comp: &str) -> Compression { + match comp { + "no" => Compression::NoCompression, + "gzip" => Compression::Gzip, + "lz4" => Compression::Lz4, + "snappy" => Compression::Snappy, + "zstd" => Compression::Zstd, + other => unreachable!("Unrecognized compression {other}"), + } +} + +fn parse_col_types(col_types: &str) -> Vec { + let parts = col_types.split('x').collect::>(); + assert!(parts.len() <= 2); + + let pattern = parts[0]; + let repeat = parts + .get(1) + .map(|r| r.parse::().unwrap()) + .unwrap_or(1); + + pattern + .chars() + .map(|c| match c { + 'i' | 'I' => ColumnDataType::Int64, + 'f' | 'F' => ColumnDataType::Float64, + 's' | 'S' => ColumnDataType::String, + other => unreachable!("Cannot parse {other} as a column data type"), + }) + .cycle() + .take(pattern.len() * repeat) + .collect() +} + +fn main() { + // Sets the global logging to INFO and suppress loggings from rskafka other than ERROR and upper ones. + std::env::set_var("UNITTEST_LOG_LEVEL", "info,rskafka=error"); + common_telemetry::init_default_ut_logging(); + + let args = Args::parse(); + let cfg = if !args.cfg_file.is_empty() { + toml::from_str(&fs::read_to_string(&args.cfg_file).unwrap()).unwrap() + } else { + Config::from(args) + }; + + // Validates arguments. + if cfg.num_regions < cfg.num_workers { + panic!("num_regions must be greater than or equal to num_workers"); + } + if cfg + .num_workers + .min(cfg.num_topics) + .min(cfg.num_regions) + .min(cfg.num_scrapes) + .min(cfg.max_batch_size.as_bytes() as u32) + .min(cfg.bootstrap_brokers.len() as u32) + == 0 + { + panic!("Invalid arguments"); + } + + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + match cfg.wal_provider { + WalProvider::Kafka => { + let topics = create_topics(&cfg).await; + let kafka_cfg = KafkaConfig { + broker_endpoints: cfg.bootstrap_brokers.clone(), + max_batch_size: cfg.max_batch_size, + linger: cfg.linger, + backoff: BackoffConfig { + init: cfg.backoff_init, + max: cfg.backoff_max, + base: cfg.backoff_base, + deadline: Some(cfg.backoff_deadline), + }, + compression: parse_compression(&cfg.compression), + ..Default::default() + }; + let store = Arc::new(KafkaLogStore::try_new(&kafka_cfg).await.unwrap()); + let wal = Arc::new(Wal::new(store)); + run_benchmarker(&cfg, &topics, wal).await; + } + WalProvider::RaftEngine => { + // The benchmarker assumes the raft engine directory exists. 
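+                // Note: the directory is hard-coded to `/tmp/greptimedb/raft-engine-wal` below;
+                // make sure it exists before running with the raft-engine provider.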
+ let store = RaftEngineLogStore::try_new( + "/tmp/greptimedb/raft-engine-wal".to_string(), + RaftEngineConfig::default(), + ) + .await + .map(Arc::new) + .unwrap(); + let wal = Arc::new(Wal::new(store)); + run_benchmarker(&cfg, &[], wal).await; + } + } + }); +} diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs new file mode 100644 index 000000000000..bab08887f765 --- /dev/null +++ b/benchmarks/src/lib.rs @@ -0,0 +1,16 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod metrics; +pub mod wal_bench; diff --git a/benchmarks/src/metrics.rs b/benchmarks/src/metrics.rs new file mode 100644 index 000000000000..d65fd1eb9aa0 --- /dev/null +++ b/benchmarks/src/metrics.rs @@ -0,0 +1,39 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use lazy_static::lazy_static; +use prometheus::*; + +/// Logstore label. +pub const LOGSTORE_LABEL: &str = "logstore"; +/// Operation type label. +pub const OPTYPE_LABEL: &str = "optype"; + +lazy_static! { + /// Counters of bytes of each operation on a logstore. + pub static ref METRIC_WAL_OP_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!( + "greptime_bench_wal_op_bytes_total", + "wal operation bytes total", + &[OPTYPE_LABEL], + ) + .unwrap(); + /// Counter of bytes of the append_batch operation. + pub static ref METRIC_WAL_WRITE_BYTES_TOTAL: IntCounter = METRIC_WAL_OP_BYTES_TOTAL.with_label_values( + &["write"], + ); + /// Counter of bytes of the read operation. + pub static ref METRIC_WAL_READ_BYTES_TOTAL: IntCounter = METRIC_WAL_OP_BYTES_TOTAL.with_label_values( + &["read"], + ); +} diff --git a/benchmarks/src/wal_bench.rs b/benchmarks/src/wal_bench.rs new file mode 100644 index 000000000000..10e88f99f37c --- /dev/null +++ b/benchmarks/src/wal_bench.rs @@ -0,0 +1,361 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
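+//! Shared building blocks of the WAL benchmarker: the CLI `Args`, the resolved
+//! benchmark `Config`, and the `Region` helper that generates synthetic WAL entries
+//! for the write phase and replays them during the read phase.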
+ +use std::mem::size_of; +use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use api::v1::value::ValueData; +use api::v1::{ColumnDataType, ColumnSchema, Mutation, OpType, Row, Rows, Value, WalEntry}; +use clap::{Parser, ValueEnum}; +use common_base::readable_size::ReadableSize; +use common_wal::options::WalOptions; +use futures::StreamExt; +use mito2::wal::{Wal, WalWriter}; +use rand::distributions::{Alphanumeric, DistString, Uniform}; +use rand::rngs::SmallRng; +use rand::{Rng, SeedableRng}; +use serde::{Deserialize, Serialize}; +use store_api::logstore::LogStore; +use store_api::storage::RegionId; + +use crate::metrics; + +/// The wal provider. +#[derive(Clone, ValueEnum, Default, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WalProvider { + #[default] + RaftEngine, + Kafka, +} + +#[derive(Parser)] +pub struct Args { + /// The provided configuration file. + /// The example configuration file can be found at `greptimedb/benchmarks/config/wal_bench.example.toml`. + #[clap(long, short = 'c')] + pub cfg_file: String, + + /// The wal provider. + #[clap(long, value_enum, default_value_t = WalProvider::default())] + pub wal_provider: WalProvider, + + /// The advertised addresses of the kafka brokers. + /// If there're multiple bootstrap brokers, their addresses should be separated by comma, for e.g. "localhost:9092,localhost:9093". + #[clap(long, short = 'b', default_value = "localhost:9092")] + pub bootstrap_brokers: String, + + /// The number of workers each running in a dedicated thread. + #[clap(long, default_value_t = num_cpus::get() as u32)] + pub num_workers: u32, + + /// The number of kafka topics to be created. + #[clap(long, default_value_t = 32)] + pub num_topics: u32, + + /// The number of regions. + #[clap(long, default_value_t = 1000)] + pub num_regions: u32, + + /// The number of times each region is scraped. + #[clap(long, default_value_t = 1000)] + pub num_scrapes: u32, + + /// The number of rows in each wal entry. + /// Each time a region is scraped, a wal entry containing will be produced. + #[clap(long, default_value_t = 5)] + pub num_rows: u32, + + /// The column types of the schema for each region. + /// Currently, three column types are supported: + /// - i = ColumnDataType::Int64 + /// - f = ColumnDataType::Float64 + /// - s = ColumnDataType::String + /// For e.g., "ifs" will be parsed as three columns: i64, f64, and string. + /// + /// Additionally, a "x" sign can be provided to repeat the column types for a given number of times. + /// For e.g., "iix2" will be parsed as 4 columns: i64, i64, i64, and i64. + /// This feature is useful if you want to specify many columns. + #[clap(long, default_value = "ifs")] + pub col_types: String, + + /// The maximum size of a batch of kafka records. + /// The default value is 1mb. + #[clap(long, default_value = "512KB")] + pub max_batch_size: ReadableSize, + + /// The minimum latency the kafka client issues a batch of kafka records. + /// However, a batch of kafka records would be immediately issued if a record cannot be fit into the batch. + #[clap(long, default_value = "1ms")] + pub linger: String, + + /// The initial backoff delay of the kafka consumer. + #[clap(long, default_value = "10ms")] + pub backoff_init: String, + + /// The maximum backoff delay of the kafka consumer. + #[clap(long, default_value = "1s")] + pub backoff_max: String, + + /// The exponential backoff rate of the kafka consumer. 
The next back off = base * the current backoff. + #[clap(long, default_value_t = 2)] + pub backoff_base: u32, + + /// The deadline of backoff. The backoff ends if the total backoff delay reaches the deadline. + #[clap(long, default_value = "3s")] + pub backoff_deadline: String, + + /// The client-side compression algorithm for kafka records. + #[clap(long, default_value = "zstd")] + pub compression: String, + + /// The seed of random number generators. + #[clap(long, default_value_t = 42)] + pub rng_seed: u64, + + /// Skips the read phase, aka. region replay, if set to true. + #[clap(long, default_value_t = false)] + pub skip_read: bool, + + /// Skips the write phase if set to true. + #[clap(long, default_value_t = false)] + pub skip_write: bool, + + /// Randomly generates topic names if set to true. + /// Useful when you want to run the benchmarker without worrying about the topics created before. + #[clap(long, default_value_t = false)] + pub random_topics: bool, + + /// Logs out the gathered prometheus metrics when the benchmarker ends. + #[clap(long, default_value_t = false)] + pub report_metrics: bool, +} + +/// Benchmarker config. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Config { + pub wal_provider: WalProvider, + pub bootstrap_brokers: Vec, + pub num_workers: u32, + pub num_topics: u32, + pub num_regions: u32, + pub num_scrapes: u32, + pub num_rows: u32, + pub col_types: String, + pub max_batch_size: ReadableSize, + #[serde(with = "humantime_serde")] + pub linger: Duration, + #[serde(with = "humantime_serde")] + pub backoff_init: Duration, + #[serde(with = "humantime_serde")] + pub backoff_max: Duration, + pub backoff_base: u32, + #[serde(with = "humantime_serde")] + pub backoff_deadline: Duration, + pub compression: String, + pub rng_seed: u64, + pub skip_read: bool, + pub skip_write: bool, + pub random_topics: bool, + pub report_metrics: bool, +} + +impl From for Config { + fn from(args: Args) -> Self { + let cfg = Self { + wal_provider: args.wal_provider, + bootstrap_brokers: args + .bootstrap_brokers + .split(',') + .map(ToString::to_string) + .collect::>(), + num_workers: args.num_workers.min(num_cpus::get() as u32), + num_topics: args.num_topics, + num_regions: args.num_regions, + num_scrapes: args.num_scrapes, + num_rows: args.num_rows, + col_types: args.col_types, + max_batch_size: args.max_batch_size, + linger: humantime::parse_duration(&args.linger).unwrap(), + backoff_init: humantime::parse_duration(&args.backoff_init).unwrap(), + backoff_max: humantime::parse_duration(&args.backoff_max).unwrap(), + backoff_base: args.backoff_base, + backoff_deadline: humantime::parse_duration(&args.backoff_deadline).unwrap(), + compression: args.compression, + rng_seed: args.rng_seed, + skip_read: args.skip_read, + skip_write: args.skip_write, + random_topics: args.random_topics, + report_metrics: args.report_metrics, + }; + + cfg + } +} + +/// The region used for wal benchmarker. +pub struct Region { + id: RegionId, + schema: Vec, + wal_options: WalOptions, + next_sequence: AtomicU64, + next_entry_id: AtomicU64, + next_timestamp: AtomicI64, + rng: Mutex>, + num_rows: u32, +} + +impl Region { + /// Creates a new region. 
+ pub fn new( + id: RegionId, + schema: Vec, + wal_options: WalOptions, + num_rows: u32, + rng_seed: u64, + ) -> Self { + Self { + id, + schema, + wal_options, + next_sequence: AtomicU64::new(1), + next_entry_id: AtomicU64::new(1), + next_timestamp: AtomicI64::new(1655276557000), + rng: Mutex::new(Some(SmallRng::seed_from_u64(rng_seed))), + num_rows, + } + } + + /// Scrapes the region and adds the generated entry to wal. + pub fn add_wal_entry(&self, wal_writer: &mut WalWriter) { + let mutation = Mutation { + op_type: OpType::Put as i32, + sequence: self + .next_sequence + .fetch_add(self.num_rows as u64, Ordering::Relaxed), + rows: Some(self.build_rows()), + }; + let entry = WalEntry { + mutations: vec![mutation], + }; + metrics::METRIC_WAL_WRITE_BYTES_TOTAL.inc_by(Self::entry_estimated_size(&entry) as u64); + + wal_writer + .add_entry( + self.id, + self.next_entry_id.fetch_add(1, Ordering::Relaxed), + &entry, + &self.wal_options, + ) + .unwrap(); + } + + /// Replays the region. + pub async fn replay(&self, wal: &Arc>) { + let mut wal_stream = wal.scan(self.id, 0, &self.wal_options).unwrap(); + while let Some(res) = wal_stream.next().await { + let (_, entry) = res.unwrap(); + metrics::METRIC_WAL_READ_BYTES_TOTAL.inc_by(Self::entry_estimated_size(&entry) as u64); + } + } + + /// Computes the estimated size in bytes of the entry. + pub fn entry_estimated_size(entry: &WalEntry) -> usize { + let wrapper_size = size_of::() + + entry.mutations.capacity() * size_of::() + + size_of::(); + + let rows = entry.mutations[0].rows.as_ref().unwrap(); + + let schema_size = rows.schema.capacity() * size_of::() + + rows + .schema + .iter() + .map(|s| s.column_name.capacity()) + .sum::(); + let values_size = (rows.rows.capacity() * size_of::()) + + rows + .rows + .iter() + .map(|r| r.values.capacity() * size_of::()) + .sum::(); + + wrapper_size + schema_size + values_size + } + + fn build_rows(&self) -> Rows { + let cols = self + .schema + .iter() + .map(|col_schema| { + let col_data_type = ColumnDataType::try_from(col_schema.datatype).unwrap(); + self.build_col(&col_data_type, self.num_rows) + }) + .collect::>(); + + let rows = (0..self.num_rows) + .map(|i| { + let values = cols.iter().map(|col| col[i as usize].clone()).collect(); + Row { values } + }) + .collect(); + + Rows { + schema: self.schema.clone(), + rows, + } + } + + fn build_col(&self, col_data_type: &ColumnDataType, num_rows: u32) -> Vec { + let mut rng_guard = self.rng.lock().unwrap(); + let rng = rng_guard.as_mut().unwrap(); + match col_data_type { + ColumnDataType::TimestampMillisecond => (0..num_rows) + .map(|_| { + let ts = self.next_timestamp.fetch_add(1000, Ordering::Relaxed); + Value { + value_data: Some(ValueData::TimestampMillisecondValue(ts)), + } + }) + .collect(), + ColumnDataType::Int64 => (0..num_rows) + .map(|_| { + let v = rng.sample(Uniform::new(0, 10_000)); + Value { + value_data: Some(ValueData::I64Value(v)), + } + }) + .collect(), + ColumnDataType::Float64 => (0..num_rows) + .map(|_| { + let v = rng.sample(Uniform::new(0.0, 5000.0)); + Value { + value_data: Some(ValueData::F64Value(v)), + } + }) + .collect(), + ColumnDataType::String => (0..num_rows) + .map(|_| { + let v = Alphanumeric.sample_string(rng, 10); + Value { + value_data: Some(ValueData::StringValue(v)), + } + }) + .collect(), + _ => unreachable!(), + } + } +} diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index b9adcf215263..472a57e40a0a 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -29,6 +29,12 @@ 
store_key_prefix = "" max_retry_times = 12 # Initial retry delay of procedures, increases exponentially retry_delay = "500ms" +# Auto split large value +# GreptimeDB procedure uses etcd as the default metadata storage backend. +# The etcd the maximum size of any request is 1.5 MiB +# 1500KiB = 1536KiB (1.5MiB) - 36KiB (reserved size of key) +# Comments out the `max_metadata_value_size`, for don't split large value (no limit). +max_metadata_value_size = "1500KiB" # Failure detectors options. [failure_detector] diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 0e45ffa461a8..c7affc93c6f1 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -218,6 +218,7 @@ impl StartCommand { mod tests { use std::io::Write; + use common_base::readable_size::ReadableSize; use common_test_util::temp_dir::create_named_temp_file; use meta_srv::selector::SelectorType; @@ -297,6 +298,10 @@ mod tests { .first_heartbeat_estimate .as_millis() ); + assert_eq!( + options.procedure.max_metadata_value_size, + Some(ReadableSize::kb(1500)) + ); } #[test] diff --git a/src/common/meta/src/ddl/alter_logical_tables.rs b/src/common/meta/src/ddl/alter_logical_tables.rs index 5c8e99211b96..a5a64b0efda5 100644 --- a/src/common/meta/src/ddl/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/alter_logical_tables.rs @@ -23,7 +23,6 @@ use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSn use common_procedure::{Context, LockKey, Procedure, Status}; use common_telemetry::{info, warn}; use futures_util::future; -use itertools::Itertools; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use store_api::metadata::ColumnMetadata; @@ -32,11 +31,10 @@ use strum::AsRefStr; use table::metadata::TableId; use crate::ddl::utils::add_peer_context_if_needed; -use crate::ddl::{physical_table_metadata, DdlContext}; +use crate::ddl::DdlContext; use crate::error::{DecodeJsonSnafu, Error, MetadataCorruptionSnafu, Result}; use crate::key::table_info::TableInfoValue; use crate::key::table_route::PhysicalTableRouteValue; -use crate::key::DeserializedValueWithBytes; use crate::lock_key::{CatalogLock, SchemaLock, TableLock}; use crate::rpc::ddl::AlterTableTask; use crate::rpc::router::find_leaders; @@ -128,7 +126,7 @@ impl AlterLogicalTablesProcedure { }); } - // Collects responses from all the alter region tasks. + // Collects responses from datanodes. 
let phy_raw_schemas = future::join_all(alter_region_tasks) .await .into_iter() @@ -163,44 +161,8 @@ impl AlterLogicalTablesProcedure { } pub(crate) async fn on_update_metadata(&mut self) -> Result { - if !self.data.physical_columns.is_empty() { - let physical_table_info = self.data.physical_table_info.as_ref().unwrap(); - - // Generates new table info - let old_raw_table_info = physical_table_info.table_info.clone(); - let new_raw_table_info = physical_table_metadata::build_new_physical_table_info( - old_raw_table_info, - &self.data.physical_columns, - ); - - // Updates physical table's metadata - self.context - .table_metadata_manager - .update_table_info( - DeserializedValueWithBytes::from_inner(physical_table_info.clone()), - new_raw_table_info, - ) - .await?; - } - - let table_info_values = self.build_update_metadata()?; - let manager = &self.context.table_metadata_manager; - let chunk_size = manager.batch_update_table_info_value_chunk_size(); - if table_info_values.len() > chunk_size { - let chunks = table_info_values - .into_iter() - .chunks(chunk_size) - .into_iter() - .map(|check| check.collect::>()) - .collect::>(); - for chunk in chunks { - manager.batch_update_table_info_values(chunk).await?; - } - } else { - manager - .batch_update_table_info_values(table_info_values) - .await?; - } + self.update_physical_table_metadata().await?; + self.update_logical_tables_metadata().await?; self.data.state = AlterTablesState::InvalidateTableCache; Ok(Status::executing(true)) diff --git a/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs b/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs index b31e0a879935..3c8623ea24db 100644 --- a/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs +++ b/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs @@ -13,16 +13,70 @@ // limitations under the License. 
use common_grpc_expr::alter_expr_to_request; +use common_telemetry::warn; +use itertools::Itertools; use snafu::ResultExt; use table::metadata::{RawTableInfo, TableInfo}; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; +use crate::ddl::physical_table_metadata; use crate::error; use crate::error::{ConvertAlterTableRequestSnafu, Result}; use crate::key::table_info::TableInfoValue; +use crate::key::DeserializedValueWithBytes; use crate::rpc::ddl::AlterTableTask; impl AlterLogicalTablesProcedure { + pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> { + if self.data.physical_columns.is_empty() { + warn!("No physical columns found, leaving the physical table's schema unchanged when altering logical tables"); + return Ok(()); + } + + let physical_table_info = self.data.physical_table_info.as_ref().unwrap(); + + // Generates new table info + let old_raw_table_info = physical_table_info.table_info.clone(); + let new_raw_table_info = physical_table_metadata::build_new_physical_table_info( + old_raw_table_info, + &self.data.physical_columns, + ); + + // Updates physical table's metadata + self.context + .table_metadata_manager + .update_table_info( + DeserializedValueWithBytes::from_inner(physical_table_info.clone()), + new_raw_table_info, + ) + .await?; + + Ok(()) + } + + pub(crate) async fn update_logical_tables_metadata(&mut self) -> Result<()> { + let table_info_values = self.build_update_metadata()?; + let manager = &self.context.table_metadata_manager; + let chunk_size = manager.batch_update_table_info_value_chunk_size(); + if table_info_values.len() > chunk_size { + let chunks = table_info_values + .into_iter() + .chunks(chunk_size) + .into_iter() + .map(|check| check.collect::>()) + .collect::>(); + for chunk in chunks { + manager.batch_update_table_info_values(chunk).await?; + } + } else { + manager + .batch_update_table_info_values(table_info_values) + .await?; + } + + Ok(()) + } + pub(crate) fn build_update_metadata(&self) -> Result> { let mut table_info_values_to_update = Vec::with_capacity(self.data.tasks.len()); for (task, table) in self diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index df64b8e286e3..d050e7e3e465 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -12,45 +12,32 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashMap; -use std::ops::Deref; +mod check; +mod metadata; +mod region_request; +mod update_metadata; -use api::v1::region::region_request::Body as PbRegionRequest; -use api::v1::region::{CreateRequests, RegionRequest, RegionRequestHeader}; use api::v1::CreateTableExpr; use async_trait::async_trait; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; -use common_telemetry::tracing_context::TracingContext; -use common_telemetry::{info, warn}; +use common_telemetry::warn; use futures_util::future::join_all; -use itertools::Itertools; use serde::{Deserialize, Serialize}; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{ensure, ResultExt}; use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY; use store_api::storage::{RegionId, RegionNumber}; use strum::AsRefStr; use table::metadata::{RawTableInfo, TableId}; -use crate::cache_invalidator::Context; -use crate::ddl::create_table_template::{build_template, CreateRequestBuilder}; -use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path}; -use crate::ddl::{physical_table_metadata, DdlContext}; -use crate::error::{ - DecodeJsonSnafu, MetadataCorruptionSnafu, Result, TableAlreadyExistsSnafu, - TableInfoNotFoundSnafu, -}; -use crate::instruction::CacheIdent; -use crate::key::table_info::TableInfoValue; -use crate::key::table_name::TableNameKey; +use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error}; +use crate::ddl::DdlContext; +use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result}; use crate::key::table_route::TableRouteValue; -use crate::key::DeserializedValueWithBytes; use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock}; -use crate::peer::Peer; use crate::rpc::ddl::CreateTableTask; -use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; -use crate::table_name::TableName; +use crate::rpc::router::{find_leaders, RegionRoute}; use crate::{metrics, ClusterId}; pub struct CreateLogicalTablesProcedure { @@ -67,17 +54,18 @@ impl CreateLogicalTablesProcedure { physical_table_id: TableId, context: DdlContext, ) -> Self { - let len = tasks.len(); - let data = CreateTablesData { - cluster_id, - state: CreateTablesState::Prepare, - tasks, - table_ids_already_exists: vec![None; len], - physical_table_id, - physical_region_numbers: vec![], - physical_columns: vec![], - }; - Self { context, data } + Self { + context, + data: CreateTablesData { + cluster_id, + state: CreateTablesState::Prepare, + tasks, + table_ids_already_exists: vec![], + physical_table_id, + physical_region_numbers: vec![], + physical_columns: vec![], + }, + } } pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { @@ -96,91 +84,45 @@ impl CreateLogicalTablesProcedure { /// - Failed to check whether tables exist. /// - One of logical tables has existing, and the table creation task without setting `create_if_not_exists`. 
pub(crate) async fn on_prepare(&mut self) -> Result { - let manager = &self.context.table_metadata_manager; - + self.check_input_tasks()?; // Sets physical region numbers - let physical_table_id = self.data.physical_table_id(); - let physical_region_numbers = manager - .table_route_manager() - .get_physical_table_route(physical_table_id) - .await - .map(|(_, route)| TableRouteValue::Physical(route).region_numbers())?; - self.data - .set_physical_region_numbers(physical_region_numbers); - + self.fill_physical_table_info().await?; // Checks if the tables exist - let table_name_keys = self - .data - .all_create_table_exprs() - .iter() - .map(|expr| TableNameKey::new(&expr.catalog_name, &expr.schema_name, &expr.table_name)) - .collect::>(); - let already_exists_tables_ids = manager - .table_name_manager() - .batch_get(table_name_keys) - .await? - .iter() - .map(|x| x.map(|x| x.table_id())) - .collect::>(); - - // Validates the tasks - let tasks = &mut self.data.tasks; - for (task, table_id) in tasks.iter().zip(already_exists_tables_ids.iter()) { - if table_id.is_some() { - // If a table already exists, we just ignore it. - ensure!( - task.create_table.create_if_not_exists, - TableAlreadyExistsSnafu { - table_name: task.create_table.table_name.to_string(), - } - ); - continue; - } - } + self.check_tables_already_exist().await?; // If all tables already exist, returns the table_ids. - if already_exists_tables_ids.iter().all(Option::is_some) { + if self + .data + .table_ids_already_exists + .iter() + .all(Option::is_some) + { return Ok(Status::done_with_output( - already_exists_tables_ids - .into_iter() + self.data + .table_ids_already_exists + .drain(..) .flatten() .collect::>(), )); } // Allocates table ids and sort columns on their names. - for (task, table_id) in tasks.iter_mut().zip(already_exists_tables_ids.iter()) { - let table_id = if let Some(table_id) = table_id { - *table_id - } else { - self.context - .table_metadata_allocator - .allocate_table_id(task) - .await? - }; - task.set_table_id(table_id); - - // sort columns in task - task.sort_columns(); - } + self.allocate_table_ids().await?; - self.data - .set_table_ids_already_exists(already_exists_tables_ids); self.data.state = CreateTablesState::DatanodeCreateRegions; Ok(Status::executing(true)) } pub async fn on_datanode_create_regions(&mut self) -> Result { - let physical_table_id = self.data.physical_table_id(); let (_, physical_table_route) = self .context .table_metadata_manager .table_route_manager() - .get_physical_table_route(physical_table_id) + .get_physical_table_route(self.data.physical_table_id) .await?; - let region_routes = &physical_table_route.region_routes; - self.create_regions(region_routes).await + self.create_regions(&physical_table_route.region_routes) + .await } /// Creates table metadata for logical tables and update corresponding physical @@ -189,179 +131,54 @@ impl CreateLogicalTablesProcedure { /// Abort(not-retry): /// - Failed to create table metadata. 
pub async fn on_create_metadata(&mut self) -> Result { - let manager = &self.context.table_metadata_manager; - let physical_table_id = self.data.physical_table_id(); - let remaining_tasks = self.data.remaining_tasks(); - let num_tables = remaining_tasks.len(); - - if num_tables > 0 { - let chunk_size = manager.create_logical_tables_metadata_chunk_size(); - if num_tables > chunk_size { - let chunks = remaining_tasks - .into_iter() - .chunks(chunk_size) - .into_iter() - .map(|chunk| chunk.collect::>()) - .collect::>(); - for chunk in chunks { - manager.create_logical_tables_metadata(chunk).await?; - } - } else { - manager - .create_logical_tables_metadata(remaining_tasks) - .await?; - } - } - - // The `table_id` MUST be collected after the [Prepare::Prepare], - // ensures the all `table_id`s have been allocated. - let table_ids = self - .data - .tasks - .iter() - .map(|task| task.table_info.ident.table_id) - .collect::>(); - - if !self.data.physical_columns.is_empty() { - // fetch old physical table's info - let physical_table_info = self - .context - .table_metadata_manager - .table_info_manager() - .get(self.data.physical_table_id) - .await? - .with_context(|| TableInfoNotFoundSnafu { - table: format!("table id - {}", self.data.physical_table_id), - })?; - - // generate new table info - let new_table_info = self - .data - .build_new_physical_table_info(&physical_table_info); - - let physical_table_name = TableName::new( - &new_table_info.catalog_name, - &new_table_info.schema_name, - &new_table_info.name, - ); - - // update physical table's metadata - self.context - .table_metadata_manager - .update_table_info(physical_table_info, new_table_info) - .await?; - - // invalid table cache - self.context - .cache_invalidator - .invalidate( - &Context::default(), - vec![ - CacheIdent::TableId(self.data.physical_table_id), - CacheIdent::TableName(physical_table_name), - ], - ) - .await?; - } else { - warn!("No physical columns found, leaving the physical table's schema unchanged"); - } - - info!("Created {num_tables} tables {table_ids:?} metadata for physical table {physical_table_id}"); + self.update_physical_table_metadata().await?; + let table_ids = self.create_logical_tables_metadata().await?; Ok(Status::done_with_output(table_ids)) } - fn create_region_request_builder( - &self, - physical_table_id: TableId, - task: &CreateTableTask, - ) -> Result { - let create_expr = &task.create_table; - let template = build_template(create_expr)?; - Ok(CreateRequestBuilder::new(template, Some(physical_table_id))) - } - - fn one_datanode_region_requests( - &self, - datanode: &Peer, - region_routes: &[RegionRoute], - ) -> Result { - let create_tables_data = &self.data; - let tasks = &create_tables_data.tasks; - let physical_table_id = create_tables_data.physical_table_id(); - let regions = find_leader_regions(region_routes, datanode); - let mut requests = Vec::with_capacity(tasks.len() * regions.len()); - - for task in tasks { - let create_table_expr = &task.create_table; - let catalog = &create_table_expr.catalog_name; - let schema = &create_table_expr.schema_name; - let logical_table_id = task.table_info.ident.table_id; - let storage_path = region_storage_path(catalog, schema); - let request_builder = self.create_region_request_builder(physical_table_id, task)?; - - for region_number in ®ions { - let region_id = RegionId::new(logical_table_id, *region_number); - let create_region_request = - request_builder.build_one(region_id, storage_path.clone(), &HashMap::new())?; - 
requests.push(create_region_request); - } - } - - Ok(CreateRequests { requests }) - } - async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result { let leaders = find_leaders(region_routes); let mut create_region_tasks = Vec::with_capacity(leaders.len()); - for datanode in leaders { - let requester = self.context.datanode_manager.datanode(&datanode).await; - let creates = self.one_datanode_region_requests(&datanode, region_routes)?; - let request = RegionRequest { - header: Some(RegionRequestHeader { - tracing_context: TracingContext::from_current_span().to_w3c(), - ..Default::default() - }), - body: Some(PbRegionRequest::Creates(creates)), - }; + for peer in leaders { + let requester = self.context.datanode_manager.datanode(&peer).await; + let request = self.make_request(&peer, region_routes)?; + create_region_tasks.push(async move { requester .handle(request) .await - .map_err(add_peer_context_if_needed(datanode)) + .map_err(add_peer_context_if_needed(peer)) }); } - // collect response from datanodes - let raw_schemas = join_all(create_region_tasks) + // Collects response from datanodes. + let phy_raw_schemas = join_all(create_region_tasks) .await .into_iter() - .map(|response| { - response.map(|mut response| response.extension.remove(ALTER_PHYSICAL_EXTENSION_KEY)) - }) + .map(|res| res.map(|mut res| res.extension.remove(ALTER_PHYSICAL_EXTENSION_KEY))) .collect::>>()?; - if raw_schemas.is_empty() { + if phy_raw_schemas.is_empty() { self.data.state = CreateTablesState::CreateMetadata; return Ok(Status::executing(false)); } - // verify all datanodes return the same raw schemas - // Safety: previous check ensures this vector is not empty. - let first = raw_schemas.first().unwrap(); + // Verify all the physical schemas are the same + // Safety: previous check ensures this vec is not empty + let first = phy_raw_schemas.first().unwrap(); ensure!( - raw_schemas.iter().all(|x| x == first), + phy_raw_schemas.iter().all(|x| x == first), MetadataCorruptionSnafu { - err_msg: "Raw schemas from datanodes are not the same" + err_msg: "The physical schemas from datanodes are not the same." 
} ); - // decode raw schemas and store it - if let Some(raw_schema) = first { - let physical_columns = - ColumnMetadata::decode_list(raw_schema).context(DecodeJsonSnafu)?; - self.data.physical_columns = physical_columns; + // Decodes the physical raw schemas + if let Some(phy_raw_schemas) = first { + self.data.physical_columns = + ColumnMetadata::decode_list(phy_raw_schemas).context(DecodeJsonSnafu)?; } else { warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged"); } @@ -405,7 +222,7 @@ impl Procedure for CreateLogicalTablesProcedure { let table_ref = self.data.tasks[0].table_ref(); lock_key.push(CatalogLock::Read(table_ref.catalog).into()); lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into()); - lock_key.push(TableLock::Write(self.data.physical_table_id()).into()); + lock_key.push(TableLock::Write(self.data.physical_table_id).into()); for task in &self.data.tasks { lock_key.push( @@ -437,18 +254,6 @@ impl CreateTablesData { &self.state } - fn physical_table_id(&self) -> TableId { - self.physical_table_id - } - - fn set_physical_region_numbers(&mut self, physical_region_numbers: Vec) { - self.physical_region_numbers = physical_region_numbers; - } - - fn set_table_ids_already_exists(&mut self, table_ids_already_exists: Vec>) { - self.table_ids_already_exists = table_ids_already_exists; - } - fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> { self.tasks .iter() @@ -480,21 +285,6 @@ impl CreateTablesData { }) .collect::>() } - - /// Generate the new physical table info. - /// - /// This method will consumes the physical columns. - fn build_new_physical_table_info( - &mut self, - old_table_info: &DeserializedValueWithBytes, - ) -> RawTableInfo { - let raw_table_info = old_table_info.deref().table_info.clone(); - - physical_table_metadata::build_new_physical_table_info( - raw_table_info, - &self.physical_columns, - ) - } } #[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)] diff --git a/src/common/meta/src/ddl/create_logical_tables/check.rs b/src/common/meta/src/ddl/create_logical_tables/check.rs new file mode 100644 index 000000000000..2f86b8ac448d --- /dev/null +++ b/src/common/meta/src/ddl/create_logical_tables/check.rs @@ -0,0 +1,81 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
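+//! Pre-flight checks for `CreateLogicalTablesProcedure`: ensures all tasks target the
+//! same catalog and schema, and records which of the logical tables already exist.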
+ +use snafu::ensure; + +use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; +use crate::error::{CreateLogicalTablesInvalidArgumentsSnafu, Result, TableAlreadyExistsSnafu}; +use crate::key::table_name::TableNameKey; + +impl CreateLogicalTablesProcedure { + pub(crate) fn check_input_tasks(&self) -> Result<()> { + self.check_schema()?; + + Ok(()) + } + + pub(crate) async fn check_tables_already_exist(&mut self) -> Result<()> { + let table_name_keys = self + .data + .all_create_table_exprs() + .iter() + .map(|expr| TableNameKey::new(&expr.catalog_name, &expr.schema_name, &expr.table_name)) + .collect::>(); + let table_ids_already_exists = self + .context + .table_metadata_manager + .table_name_manager() + .batch_get(table_name_keys) + .await? + .iter() + .map(|x| x.map(|x| x.table_id())) + .collect::>(); + + self.data.table_ids_already_exists = table_ids_already_exists; + + // Validates the tasks + let tasks = &mut self.data.tasks; + for (task, table_id) in tasks.iter().zip(self.data.table_ids_already_exists.iter()) { + if table_id.is_some() { + // If a table already exists, we just ignore it. + ensure!( + task.create_table.create_if_not_exists, + TableAlreadyExistsSnafu { + table_name: task.create_table.table_name.to_string(), + } + ); + continue; + } + } + + Ok(()) + } + + // Checks if the schemas of the tasks are the same + fn check_schema(&self) -> Result<()> { + let is_same_schema = self.data.tasks.windows(2).all(|pair| { + pair[0].create_table.catalog_name == pair[1].create_table.catalog_name + && pair[0].create_table.schema_name == pair[1].create_table.schema_name + }); + + ensure!( + is_same_schema, + CreateLogicalTablesInvalidArgumentsSnafu { + err_msg: "Schemas of the tasks are not the same" + } + ); + + Ok(()) + } +} diff --git a/src/common/meta/src/ddl/create_logical_tables/metadata.rs b/src/common/meta/src/ddl/create_logical_tables/metadata.rs new file mode 100644 index 000000000000..2d61719d3915 --- /dev/null +++ b/src/common/meta/src/ddl/create_logical_tables/metadata.rs @@ -0,0 +1,57 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
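+//! Metadata preparation for `CreateLogicalTablesProcedure`: fetches the physical table's
+//! region numbers and allocates table ids for the logical tables to be created.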
+ +use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; +use crate::error::Result; +use crate::key::table_route::TableRouteValue; + +impl CreateLogicalTablesProcedure { + pub(crate) async fn fill_physical_table_info(&mut self) -> Result<()> { + let physical_region_numbers = self + .context + .table_metadata_manager + .table_route_manager() + .get_physical_table_route(self.data.physical_table_id) + .await + .map(|(_, route)| TableRouteValue::Physical(route).region_numbers())?; + + self.data.physical_region_numbers = physical_region_numbers; + + Ok(()) + } + + pub(crate) async fn allocate_table_ids(&mut self) -> Result<()> { + for (task, table_id) in self + .data + .tasks + .iter_mut() + .zip(self.data.table_ids_already_exists.iter()) + { + let table_id = if let Some(table_id) = table_id { + *table_id + } else { + self.context + .table_metadata_allocator + .allocate_table_id(task) + .await? + }; + task.set_table_id(table_id); + + // sort columns in task + task.sort_columns(); + } + + Ok(()) + } +} diff --git a/src/common/meta/src/ddl/create_logical_tables/region_request.rs b/src/common/meta/src/ddl/create_logical_tables/region_request.rs new file mode 100644 index 000000000000..bc0d290c4ef2 --- /dev/null +++ b/src/common/meta/src/ddl/create_logical_tables/region_request.rs @@ -0,0 +1,74 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
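+//! Builds the per-datanode `RegionRequest` for `CreateLogicalTablesProcedure`, with one
+//! create request for every logical region hosted on that peer.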
+ +use std::collections::HashMap; + +use api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader}; +use common_telemetry::tracing_context::TracingContext; +use store_api::storage::RegionId; + +use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; +use crate::ddl::create_table_template::{build_template, CreateRequestBuilder}; +use crate::ddl::utils::region_storage_path; +use crate::error::Result; +use crate::peer::Peer; +use crate::rpc::ddl::CreateTableTask; +use crate::rpc::router::{find_leader_regions, RegionRoute}; + +impl CreateLogicalTablesProcedure { + pub(crate) fn make_request( + &self, + peer: &Peer, + region_routes: &[RegionRoute], + ) -> Result { + let tasks = &self.data.tasks; + let regions_on_this_peer = find_leader_regions(region_routes, peer); + let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len()); + for task in tasks { + let create_table_expr = &task.create_table; + let catalog = &create_table_expr.catalog_name; + let schema = &create_table_expr.schema_name; + let logical_table_id = task.table_info.ident.table_id; + let storage_path = region_storage_path(catalog, schema); + let request_builder = self.create_region_request_builder(task)?; + + for region_number in ®ions_on_this_peer { + let region_id = RegionId::new(logical_table_id, *region_number); + let one_region_request = + request_builder.build_one(region_id, storage_path.clone(), &HashMap::new())?; + requests.push(one_region_request); + } + } + + Ok(RegionRequest { + header: Some(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + ..Default::default() + }), + body: Some(region_request::Body::Creates(CreateRequests { requests })), + }) + } + + fn create_region_request_builder( + &self, + task: &CreateTableTask, + ) -> Result { + let create_expr = &task.create_table; + let template = build_template(create_expr)?; + Ok(CreateRequestBuilder::new( + template, + Some(self.data.physical_table_id), + )) + } +} diff --git a/src/common/meta/src/ddl/create_logical_tables/update_metadata.rs b/src/common/meta/src/ddl/create_logical_tables/update_metadata.rs new file mode 100644 index 000000000000..650035072719 --- /dev/null +++ b/src/common/meta/src/ddl/create_logical_tables/update_metadata.rs @@ -0,0 +1,128 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
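+//! Metadata persistence for `CreateLogicalTablesProcedure`: updates the physical table's
+//! schema and invalidates its cache when datanodes returned new physical columns, then
+//! creates the logical tables' metadata in chunks.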
+ +use std::ops::Deref; + +use common_telemetry::{info, warn}; +use itertools::Itertools; +use snafu::OptionExt; +use table::metadata::TableId; + +use crate::cache_invalidator::Context; +use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; +use crate::ddl::physical_table_metadata; +use crate::error::{Result, TableInfoNotFoundSnafu}; +use crate::instruction::CacheIdent; +use crate::table_name::TableName; + +impl CreateLogicalTablesProcedure { + pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> { + if self.data.physical_columns.is_empty() { + warn!("No physical columns found, leaving the physical table's schema unchanged when creating logical tables"); + return Ok(()); + } + + // Fetches old physical table's info + let physical_table_info = self + .context + .table_metadata_manager + .table_info_manager() + .get(self.data.physical_table_id) + .await? + .with_context(|| TableInfoNotFoundSnafu { + table: format!("table id - {}", self.data.physical_table_id), + })?; + + // Generates new table info + let raw_table_info = physical_table_info.deref().table_info.clone(); + + let new_table_info = physical_table_metadata::build_new_physical_table_info( + raw_table_info, + &self.data.physical_columns, + ); + + let physical_table_name = TableName::new( + &new_table_info.catalog_name, + &new_table_info.schema_name, + &new_table_info.name, + ); + + // Update physical table's metadata + self.context + .table_metadata_manager + .update_table_info(physical_table_info, new_table_info) + .await?; + + // Invalid physical table cache + self.context + .cache_invalidator + .invalidate( + &Context::default(), + vec![ + CacheIdent::TableId(self.data.physical_table_id), + CacheIdent::TableName(physical_table_name), + ], + ) + .await?; + + Ok(()) + } + + pub(crate) async fn create_logical_tables_metadata(&mut self) -> Result> { + let remaining_tasks = self.data.remaining_tasks(); + let num_tables = remaining_tasks.len(); + + if num_tables > 0 { + let chunk_size = self + .context + .table_metadata_manager + .create_logical_tables_metadata_chunk_size(); + if num_tables > chunk_size { + let chunks = remaining_tasks + .into_iter() + .chunks(chunk_size) + .into_iter() + .map(|chunk| chunk.collect::>()) + .collect::>(); + for chunk in chunks { + self.context + .table_metadata_manager + .create_logical_tables_metadata(chunk) + .await?; + } + } else { + self.context + .table_metadata_manager + .create_logical_tables_metadata(remaining_tasks) + .await?; + } + } + + // The `table_id` MUST be collected after the [Prepare::Prepare], + // ensures the all `table_id`s have been allocated. + let table_ids = self + .data + .tasks + .iter() + .map(|task| task.table_info.ident.table_id) + .collect::>(); + + info!( + "Created {num_tables} tables {table_ids:?} metadata for physical table {}", + self.data.physical_table_id + ); + + Ok(table_ids) + } +} diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index ff067aa609a9..a4728676bafd 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -403,6 +403,9 @@ pub enum Error { #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))] AlterLogicalTablesInvalidArguments { err_msg: String, location: Location }, + + #[snafu(display("Create logical tables invalid arguments: {}", err_msg))] + CreateLogicalTablesInvalidArguments { err_msg: String, location: Location }, } pub type Result = std::result::Result; @@ -463,7 +466,8 @@ impl ErrorExt for Error { | PrimaryKeyNotFound { .. } | EmptyKey { .. 
} | InvalidEngineType { .. } - | AlterLogicalTablesInvalidArguments { .. } => StatusCode::InvalidArguments, + | AlterLogicalTablesInvalidArguments { .. } + | CreateLogicalTablesInvalidArguments { .. } => StatusCode::InvalidArguments, TableNotFound { .. } => StatusCode::TableNotFound, TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, diff --git a/src/common/meta/src/state_store.rs b/src/common/meta/src/state_store.rs index 686e7477cec6..eb3a91de173b 100644 --- a/src/common/meta/src/state_store.rs +++ b/src/common/meta/src/state_store.rs @@ -45,21 +45,30 @@ fn strip_prefix(key: &str) -> String { pub struct KvStateStore { kv_backend: KvBackendRef, // The max num of keys to be returned in a range scan request - // `None` stands no limit. - max_num_per_range: Option, + // `None` stands for no limit. + max_num_per_range_request: Option, // The max bytes of value. - // `None` stands no limit. - max_size_per_value: Option, + // `None` stands for no limit. + max_value_size: Option, } impl KvStateStore { pub fn new(kv_backend: KvBackendRef) -> Self { Self { kv_backend, - max_num_per_range: None, - max_size_per_value: None, + max_num_per_range_request: None, + max_value_size: None, } } + + /// Sets the `max_value_size`. `None` stands for no limit. + /// + /// If a value is larger than the `max_value_size`, + /// the [`KvStateStore`] will automatically split the large value into multiple values. + pub fn with_max_value_size(mut self, max_value_size: Option) -> Self { + self.max_value_size = max_value_size; + self + } } fn decode_kv(kv: KeyValue) -> Result<(String, Vec)> { @@ -75,12 +84,12 @@ enum SplitValue<'a> { Multiple(Vec<&'a [u8]>), } -fn split_value(value: &[u8], max_size_per_value: Option) -> SplitValue<'_> { - if let Some(max_size_per_value) = max_size_per_value { - if value.len() <= max_size_per_value { +fn split_value(value: &[u8], max_value_size: Option) -> SplitValue<'_> { + if let Some(max_value_size) = max_value_size { + if value.len() <= max_value_size { SplitValue::Single(value) } else { - SplitValue::Multiple(value.chunks(max_size_per_value).collect::>()) + SplitValue::Multiple(value.chunks(max_value_size).collect::>()) } } else { SplitValue::Single(value) @@ -90,7 +99,7 @@ fn split_value(value: &[u8], max_size_per_value: Option) -> SplitValue<'_ #[async_trait] impl StateStore for KvStateStore { async fn put(&self, key: &str, value: Vec) -> ProcedureResult<()> { - let split = split_value(&value, self.max_size_per_value); + let split = split_value(&value, self.max_value_size); let key = with_prefix(key); match split { SplitValue::Single(_) => { @@ -156,7 +165,7 @@ impl StateStore for KvStateStore { let stream = PaginationStream::new( self.kv_backend.clone(), req, - self.max_num_per_range.unwrap_or_default(), + self.max_num_per_range_request.unwrap_or_default(), Arc::new(decode_kv), ); @@ -214,8 +223,8 @@ mod tests { async fn test_meta_state_store() { let store = &KvStateStore { kv_backend: Arc::new(MemoryKvBackend::new()), - max_num_per_range: Some(1), // for testing "more" in range - max_size_per_value: None, + max_num_per_range_request: Some(1), // for testing "more" in range + max_value_size: None, }; let walk_top_down = async move |path: &str| -> Vec { @@ -294,8 +303,8 @@ mod tests { } let store = &KvStateStore { kv_backend: kv_backend.clone(), - max_num_per_range: Some(num_per_range as usize), // for testing "more" in range - max_size_per_value: Some(size_limit as usize), + max_num_per_range_request: Some(num_per_range as usize), // for testing "more" in range + 
max_value_size: Some(size_limit as usize), }; let walk_top_down = async move |path: &str| -> Vec { let mut data = store @@ -366,11 +375,11 @@ mod tests { Arc::new(ChrootKvBackend::new(chroot.into(), backend)) }; - let key_preserve_size = 1024; + let key_size = 1024; // The etcd default size limit of any requests is 1.5MiB. // However, some KvBackends, the `ChrootKvBackend`, will add the prefix to `key`; // we don't know the exact size of the key. - let size_limit = 1536 * 1024 - key_preserve_size; + let size_limit = 1536 * 1024 - key_size; let page_size = rand::thread_rng().gen_range(1..10); test_meta_state_store_split_value_with_size_limit( kv_backend, diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml index af755b2b87f3..d5b47677749f 100644 --- a/src/common/procedure/Cargo.toml +++ b/src/common/procedure/Cargo.toml @@ -14,6 +14,7 @@ workspace = true async-stream.workspace = true async-trait.workspace = true backon = "0.4" +common-base.workspace = true common-error.workspace = true common-macro.workspace = true common-runtime.workspace = true diff --git a/src/common/procedure/src/options.rs b/src/common/procedure/src/options.rs index 9cebfa805c25..503812de49a7 100644 --- a/src/common/procedure/src/options.rs +++ b/src/common/procedure/src/options.rs @@ -16,6 +16,7 @@ use std::time::Duration; +use common_base::readable_size::ReadableSize; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -26,6 +27,8 @@ pub struct ProcedureConfig { /// Initial retry delay of procedures, increases exponentially. #[serde(with = "humantime_serde")] pub retry_delay: Duration, + /// `None` stands for no limit. + pub max_metadata_value_size: Option, } impl Default for ProcedureConfig { @@ -33,6 +36,7 @@ impl Default for ProcedureConfig { ProcedureConfig { max_retry_times: 3, retry_delay: Duration::from_millis(500), + max_metadata_value_size: None, } } } diff --git a/src/common/procedure/src/store/util.rs b/src/common/procedure/src/store/util.rs index 924a3b069bd0..c8ed3bf50f86 100644 --- a/src/common/procedure/src/store/util.rs +++ b/src/common/procedure/src/store/util.rs @@ -22,7 +22,7 @@ use super::state_store::KeySet; use crate::error; use crate::error::Result; -pub struct CollectingState { +struct CollectingState { pairs: Vec<(String, Vec)>, } @@ -40,11 +40,11 @@ fn parse_segments(segments: Vec<(String, Vec)>, prefix: &str) -> Result>>() } -/// Collects multiple values into a single key-value pair. +/// Merges multiple values into a single key-value pair. /// Returns an error if: /// - Part values are lost. /// - Failed to parse the key of segment. -fn multiple_values_collector( +fn merge_multiple_values( CollectingState { mut pairs }: CollectingState, ) -> Result<(KeySet, Vec)> { if pairs.len() == 1 { @@ -100,8 +100,10 @@ impl CollectingState { } } -pub type Upstream = dyn Stream)>> + Send; +type Upstream = dyn Stream)>> + Send; +/// Merges multiple values that have the same prefix of the key +/// from `upstream` into a single value. pub fn multiple_value_stream( mut upstream: Pin>, ) -> impl Stream)>> { @@ -117,14 +119,14 @@ pub fn multiple_value_stream( } else { // Starts to collect next key value pair. collecting = Some(CollectingState::new(key, value)); - yield multiple_values_collector(current)?; + yield merge_multiple_values(current)?; } } None => collecting = Some(CollectingState::new(key, value)), } } if let Some(current) = collecting.take() { - yield multiple_values_collector(current)? 
+ yield merge_multiple_values(current)? } } } @@ -149,7 +151,7 @@ mod tests { } #[tokio::test] - async fn test_multiple_values_collector() { + async fn test_merge_multiple_values() { let upstream = stream::iter(vec![ Ok(("foo".to_string(), vec![0, 1, 2, 3])), Ok(("foo/0002".to_string(), vec![6, 7])), @@ -192,7 +194,7 @@ mod tests { } #[tokio::test] - async fn test_multiple_values_collector_err() { + async fn test_multiple_values_stream_err() { let upstream = stream::iter(vec![ Err(error::UnexpectedSnafu { err_msg: "mock" }.build()), Ok(("foo".to_string(), vec![0, 1, 2, 3])), diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index fa98d65daebc..851c3f94f0b9 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -26,6 +26,8 @@ common-time.workspace = true common-wal.workspace = true futures.workspace = true futures-util.workspace = true +lazy_static.workspace = true +prometheus.workspace = true protobuf = { version = "2", features = ["bytes"] } raft-engine.workspace = true rskafka.workspace = true diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index ef84b9a68acf..d80a19d5c38f 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::mem::size_of; pub(crate) mod client_manager; pub mod log_store; pub(crate) mod util; @@ -69,6 +70,10 @@ impl Entry for EntryImpl { fn namespace(&self) -> Self::Namespace { self.ns.clone() } + + fn estimated_size(&self) -> usize { + size_of::() + self.data.capacity() * size_of::() + self.ns.topic.capacity() + } } impl Display for EntryImpl { @@ -82,3 +87,27 @@ impl Display for EntryImpl { ) } } + +#[cfg(test)] +mod tests { + use std::mem::size_of; + + use store_api::logstore::entry::Entry; + + use crate::kafka::{EntryImpl, NamespaceImpl}; + + #[test] + fn test_estimated_size() { + let entry = EntryImpl { + data: Vec::with_capacity(100), + id: 0, + ns: NamespaceImpl { + region_id: 0, + topic: String::with_capacity(10), + }, + }; + let expected = size_of::() + 100 * size_of::() + 10; + let got = entry.estimated_size(); + assert_eq!(expected, got); + } +} diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 49b8deb27906..14f7ba8df1bb 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -22,7 +22,7 @@ use futures_util::StreamExt; use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder}; use rskafka::client::partition::OffsetAt; use snafu::ResultExt; -use store_api::logstore::entry::Id as EntryId; +use store_api::logstore::entry::{Entry as EntryTrait, Id as EntryId}; use store_api::logstore::entry_stream::SendableEntryStream; use store_api::logstore::namespace::Id as NamespaceId; use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore}; @@ -32,6 +32,7 @@ use crate::kafka::client_manager::{ClientManager, ClientManagerRef}; use crate::kafka::util::offset::Offset; use crate::kafka::util::record::{maybe_emit_entry, Record, RecordProducer}; use crate::kafka::{EntryImpl, NamespaceImpl}; +use crate::metrics; /// A log store backed by Kafka. #[derive(Debug)] @@ -86,6 +87,15 @@ impl LogStore for KafkaLogStore { /// Appends a batch of entries and returns a response containing a map where the key is a region id /// while the value is the id of the last successfully written entry of the region. 
async fn append_batch(&self, entries: Vec) -> Result { + metrics::METRIC_KAFKA_APPEND_BATCH_CALLS_TOTAL.inc(); + metrics::METRIC_KAFKA_APPEND_BATCH_BYTES_TOTAL.inc_by( + entries + .iter() + .map(EntryTrait::estimated_size) + .sum::() as u64, + ); + let _timer = metrics::METRIC_KAFKA_APPEND_BATCH_ELAPSED.start_timer(); + if entries.is_empty() { return Ok(AppendBatchResponse::default()); } @@ -124,6 +134,9 @@ impl LogStore for KafkaLogStore { ns: &Self::Namespace, entry_id: EntryId, ) -> Result> { + metrics::METRIC_KAFKA_READ_CALLS_TOTAL.inc(); + let _timer = metrics::METRIC_KAFKA_READ_ELAPSED.start_timer(); + // Gets the client associated with the topic. let client = self .client_manager @@ -183,6 +196,9 @@ impl LogStore for KafkaLogStore { })?; let (kafka_record, offset) = (record_and_offset.record, record_and_offset.offset); + metrics::METRIC_KAFKA_READ_RECORD_BYTES_TOTAL + .inc_by(kafka_record.approximate_size() as u64); + debug!( "Read a record at offset {} for ns {}, high watermark: {}", offset, ns_clone, high_watermark diff --git a/src/log-store/src/kafka/util/record.rs b/src/log-store/src/kafka/util/record.rs index 9bc97557ad39..e2035318c4c7 100644 --- a/src/log-store/src/kafka/util/record.rs +++ b/src/log-store/src/kafka/util/record.rs @@ -25,6 +25,7 @@ use crate::error::{ use crate::kafka::client_manager::ClientManagerRef; use crate::kafka::util::offset::Offset; use crate::kafka::{EntryId, EntryImpl, NamespaceImpl}; +use crate::metrics; /// The current version of Record. pub(crate) const VERSION: u32 = 0; @@ -97,6 +98,7 @@ impl TryFrom for KafkaRecord { } } +// TODO(niebayes): improve the performance of decoding kafka record. impl TryFrom for Record { type Error = crate::error::Error; @@ -150,6 +152,7 @@ impl RecordProducer { /// Produces the buffered entries to Kafka sever. Those entries may span several Kafka records. /// Returns the offset of the last successfully produced record. + // TODO(niebayes): maybe requires more fine-grained metrics to measure stages of writing to kafka. pub(crate) async fn produce(self, client_manager: &ClientManagerRef) -> Result { ensure!(!self.entries.is_empty(), EmptyEntriesSnafu); @@ -173,6 +176,11 @@ impl RecordProducer { for entry in self.entries { for record in build_records(entry, max_record_size) { let kafka_record = KafkaRecord::try_from(record)?; + + metrics::METRIC_KAFKA_PRODUCE_RECORD_COUNTS.inc(); + metrics::METRIC_KAFKA_PRODUCE_RECORD_BYTES_TOTAL + .inc_by(kafka_record.approximate_size() as u64); + // Records of a certain region cannot be produced in parallel since their order must be static. let offset = producer .produce(kafka_record.clone()) diff --git a/src/log-store/src/lib.rs b/src/log-store/src/lib.rs index a57e76850e2d..c035e5fcff80 100644 --- a/src/log-store/src/lib.rs +++ b/src/log-store/src/lib.rs @@ -17,6 +17,7 @@ pub mod error; pub mod kafka; +pub mod metrics; mod noop; pub mod raft_engine; pub mod test_util; diff --git a/src/log-store/src/metrics.rs b/src/log-store/src/metrics.rs new file mode 100644 index 000000000000..bdd03b34c44f --- /dev/null +++ b/src/log-store/src/metrics.rs @@ -0,0 +1,107 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use lazy_static::lazy_static; +use prometheus::*; + +/// Logstore label. +pub const LOGSTORE_LABEL: &str = "logstore"; +/// Operation type label. +pub const OPTYPE_LABEL: &str = "optype"; + +lazy_static! { + /// Counters of bytes of each operation on a logstore. + pub static ref METRIC_LOGSTORE_OP_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!( + "greptime_logstore_op_bytes_total", + "logstore operation bytes total", + &[LOGSTORE_LABEL, OPTYPE_LABEL], + ) + .unwrap(); + /// Counter of bytes of the append_batch operation on the kafka logstore. + pub static ref METRIC_KAFKA_APPEND_BATCH_BYTES_TOTAL: IntCounter = METRIC_LOGSTORE_OP_BYTES_TOTAL.with_label_values( + &["kafka", "append_batch"], + ); + /// Counter of bytes of the read operation on the kafka logstore. + pub static ref METRIC_KAFKA_READ_BYTES_TOTAL: IntCounter = METRIC_LOGSTORE_OP_BYTES_TOTAL.with_label_values( + &["kafka", "read"], + ); + /// Counter of bytes of the append_batch operation on the raft-engine logstore. + pub static ref METRIC_RAFT_ENGINE_APPEND_BATCH_BYTES_TOTAL: IntCounter = METRIC_LOGSTORE_OP_BYTES_TOTAL.with_label_values( + &["raft-engine", "append_batch"], + ); + /// Counter of bytes of the read operation on the raft-engine logstore. + pub static ref METRIC_RAFT_ENGINE_READ_BYTES_TOTAL: IntCounter = METRIC_LOGSTORE_OP_BYTES_TOTAL.with_label_values( + &["raft-engine", "read"], + ); + + /// Counter of bytes of the records read by the kafka logstore. + pub static ref METRIC_KAFKA_READ_RECORD_BYTES_TOTAL: IntCounter = register_int_counter!( + "greptime_kafka_read_record_bytes_total", + "kafka read record bytes total" + ).unwrap(); + + /// Counter of the numbers of the records produced by the kafka logstore. + pub static ref METRIC_KAFKA_PRODUCE_RECORD_COUNTS: IntCounter = register_int_counter!( + "greptime_kafka_produce_record_counts", + "kafka produce record counts", + ).unwrap(); + + /// Counter of bytes of the records produced by the kafka logstore. + pub static ref METRIC_KAFKA_PRODUCE_RECORD_BYTES_TOTAL: IntCounter = register_int_counter!( + "greptime_kafka_produce_record_bytes_total", + "kafka produce record bytes total" + ).unwrap(); + + /// Counters of calls of each operation on a logstore. + pub static ref METRIC_LOGSTORE_OP_CALLS_TOTAL: IntCounterVec = register_int_counter_vec!( + "greptime_logstore_op_calls_total", + "logstore operation calls total", + &[LOGSTORE_LABEL, OPTYPE_LABEL], + ) + .unwrap(); + /// Counter of calls of the append_batch operation on the kafka logstore. + pub static ref METRIC_KAFKA_APPEND_BATCH_CALLS_TOTAL: IntCounter = METRIC_LOGSTORE_OP_CALLS_TOTAL.with_label_values( + &["kafka", "append_batch"], + ); + /// Counter of calls of the read operation on the kafka logstore. + pub static ref METRIC_KAFKA_READ_CALLS_TOTAL: IntCounter = METRIC_LOGSTORE_OP_CALLS_TOTAL.with_label_values( + &["kafka", "read"], + ); + /// Counter of calls of the append_batch operation on the raft-engine logstore. 
+    pub static ref METRIC_RAFT_ENGINE_APPEND_BATCH_CALLS_TOTAL: IntCounter = METRIC_LOGSTORE_OP_CALLS_TOTAL.with_label_values(
+        &["raft-engine", "append_batch"],
+    );
+    /// Counter of calls of the read operation on the raft-engine logstore.
+    pub static ref METRIC_RAFT_ENGINE_READ_CALLS_TOTAL: IntCounter = METRIC_LOGSTORE_OP_CALLS_TOTAL.with_label_values(
+        &["raft-engine", "read"],
+    );
+
+    /// Timer of operations on a logstore.
+    pub static ref METRIC_LOGSTORE_OP_ELAPSED: HistogramVec = register_histogram_vec!(
+        "greptime_logstore_op_elapsed",
+        "logstore operation elapsed",
+        &[LOGSTORE_LABEL, OPTYPE_LABEL],
+    )
+    .unwrap();
+    /// Timer of the append_batch operation on the kafka logstore.
+    pub static ref METRIC_KAFKA_APPEND_BATCH_ELAPSED: Histogram = METRIC_LOGSTORE_OP_ELAPSED.with_label_values(&["kafka", "append_batch"]);
+    /// Timer of the read operation on the kafka logstore.
+    /// This timer only measures the duration of the read operation itself, not the total duration of replay.
+    pub static ref METRIC_KAFKA_READ_ELAPSED: Histogram = METRIC_LOGSTORE_OP_ELAPSED.with_label_values(&["kafka", "read"]);
+    /// Timer of the append_batch operation on the raft-engine logstore.
+    pub static ref METRIC_RAFT_ENGINE_APPEND_BATCH_ELAPSED: Histogram = METRIC_LOGSTORE_OP_ELAPSED.with_label_values(&["raft-engine", "append_batch"]);
+    /// Timer of the read operation on the raft-engine logstore.
+    /// This timer only measures the duration of the read operation itself, not the total duration of replay.
+    pub static ref METRIC_RAFT_ENGINE_READ_ELAPSED: Histogram = METRIC_LOGSTORE_OP_ELAPSED.with_label_values(&["raft-engine", "read"]);
+}
diff --git a/src/log-store/src/noop.rs b/src/log-store/src/noop.rs
index 1ffada095e80..48668c39ec04 100644
--- a/src/log-store/src/noop.rs
+++ b/src/log-store/src/noop.rs
@@ -50,6 +50,10 @@ impl Entry for EntryImpl {
     fn namespace(&self) -> Self::Namespace {
         Default::default()
     }
+
+    fn estimated_size(&self) -> usize {
+        0
+    }
 }
 
 #[async_trait::async_trait]
diff --git a/src/log-store/src/raft_engine.rs b/src/log-store/src/raft_engine.rs
index fe761388f6dd..49082acab041 100644
--- a/src/log-store/src/raft_engine.rs
+++ b/src/log-store/src/raft_engine.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
use std::hash::{Hash, Hasher}; +use std::mem::size_of; use store_api::logstore::entry::{Entry, Id as EntryId}; use store_api::logstore::namespace::{Id as NamespaceId, Namespace}; @@ -83,4 +84,25 @@ impl Entry for EntryImpl { ..Default::default() } } + + fn estimated_size(&self) -> usize { + size_of::() + self.data.capacity() * size_of::() + } +} + +#[cfg(test)] +mod tests { + use std::mem::size_of; + + use store_api::logstore::entry::Entry; + + use crate::raft_engine::protos::logstore::EntryImpl; + + #[test] + fn test_estimated_size() { + let entry = EntryImpl::create(1, 1, Vec::with_capacity(100)); + let expected = size_of::() + 100; + let got = entry.estimated_size(); + assert_eq!(expected, got); + } } diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 13a32e6fe7af..f5fab5f97eed 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -25,7 +25,7 @@ use common_wal::config::raft_engine::RaftEngineConfig; use common_wal::options::WalOptions; use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode}; use snafu::{ensure, ResultExt}; -use store_api::logstore::entry::Id as EntryId; +use store_api::logstore::entry::{Entry as EntryTrait, Id as EntryId}; use store_api::logstore::entry_stream::SendableEntryStream; use store_api::logstore::namespace::{Id as NamespaceId, Namespace as NamespaceTrait}; use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore}; @@ -35,6 +35,7 @@ use crate::error::{ IllegalNamespaceSnafu, IllegalStateSnafu, OverrideCompactedEntrySnafu, RaftEngineSnafu, Result, StartGcTaskSnafu, StopGcTaskSnafu, }; +use crate::metrics; use crate::raft_engine::backend::SYSTEM_NAMESPACE; use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl as Namespace}; @@ -248,6 +249,15 @@ impl LogStore for RaftEngineLogStore { /// Appends a batch of entries to logstore. `RaftEngineLogStore` assures the atomicity of /// batch append. 
async fn append_batch(&self, entries: Vec) -> Result { + metrics::METRIC_RAFT_ENGINE_APPEND_BATCH_CALLS_TOTAL.inc(); + metrics::METRIC_RAFT_ENGINE_APPEND_BATCH_BYTES_TOTAL.inc_by( + entries + .iter() + .map(EntryTrait::estimated_size) + .sum::() as u64, + ); + let _timer = metrics::METRIC_RAFT_ENGINE_APPEND_BATCH_ELAPSED.start_timer(); + ensure!(self.started(), IllegalStateSnafu); if entries.is_empty() { return Ok(AppendBatchResponse::default()); @@ -280,6 +290,9 @@ impl LogStore for RaftEngineLogStore { ns: &Self::Namespace, entry_id: EntryId, ) -> Result> { + metrics::METRIC_RAFT_ENGINE_READ_CALLS_TOTAL.inc(); + let _timer = metrics::METRIC_RAFT_ENGINE_READ_ELAPSED.start_timer(); + ensure!(self.started(), IllegalStateSnafu); let engine = self.engine.clone(); diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 13e27b4efd75..137e45f57a6a 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -33,7 +33,7 @@ etcd-client.workspace = true futures.workspace = true h2 = "0.3" http-body = "0.4" -humantime = "2.1" +humantime.workspace = true humantime-serde.workspace = true itertools.workspace = true lazy_static.workspace = true diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 68b3579298db..fa7690369415 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -18,6 +18,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; +use common_base::readable_size::ReadableSize; use common_base::Plugins; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager; @@ -115,6 +116,9 @@ impl Default for MetaSrvOptions { procedure: ProcedureConfig { max_retry_times: 12, retry_delay: Duration::from_millis(500), + // The etcd the maximum size of any request is 1.5 MiB + // 1500KiB = 1536KiB (1.5MiB) - 36KiB (reserved size of key) + max_metadata_value_size: Some(ReadableSize::kb(1500)), }, failure_detector: PhiAccrualFailureDetectorOptions::default(), datanode: DatanodeOptions::default(), diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index e7a561f87bdd..fe327bd5898f 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -376,8 +376,13 @@ fn build_procedure_manager( retry_delay: options.procedure.retry_delay, ..Default::default() }; - let state_store = Arc::new(KvStateStore::new(kv_backend.clone())); - Arc::new(LocalManager::new(manager_config, state_store)) + let state_store = KvStateStore::new(kv_backend.clone()).with_max_value_size( + options + .procedure + .max_metadata_value_size + .map(|v| v.as_bytes() as usize), + ); + Arc::new(LocalManager::new(manager_config, Arc::new(state_store))) } fn build_ddl_manager( diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 93f2c3adce41..7f97d7a9c9ab 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -329,22 +329,6 @@ impl StatementExecutor { name: "alter table" } ); - ensure!( - alter_table_exprs - .windows(2) - .all(|expr| expr[0].catalog_name == expr[1].catalog_name), - DdlWithMultiCatalogsSnafu { - ddl_name: "alter tables", - } - ); - ensure!( - alter_table_exprs - .windows(2) - .all(|expr| expr[0].schema_name == expr[1].schema_name), - DdlWithMultiSchemasSnafu { - ddl_name: "alter tables", - } - ); self.alter_logical_tables_procedure(alter_table_exprs) .await?; diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index eb680b1a9bd8..78e435861470 100644 --- 
a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -40,7 +40,7 @@ datatypes.workspace = true futures = "0.3" futures-util.workspace = true greptime-proto.workspace = true -humantime = "2.1" +humantime.workspace = true lazy_static.workspace = true meter-core.workspace = true meter-macros.workspace = true diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs index 1748ff5621be..671f55ac35a2 100644 --- a/src/store-api/src/logstore/entry.rs +++ b/src/store-api/src/logstore/entry.rs @@ -35,4 +35,7 @@ pub trait Entry: Send + Sync { /// Returns the namespace of the entry. fn namespace(&self) -> Self::Namespace; + + /// Computes the estimated size in bytes of the entry. + fn estimated_size(&self) -> usize; } diff --git a/src/store-api/src/logstore/entry_stream.rs b/src/store-api/src/logstore/entry_stream.rs index 89a8bd192e42..23d131e451aa 100644 --- a/src/store-api/src/logstore/entry_stream.rs +++ b/src/store-api/src/logstore/entry_stream.rs @@ -31,6 +31,7 @@ pub type SendableEntryStream<'a, I, E> = Pin #[cfg(test)] mod tests { use std::any::Any; + use std::mem::size_of; use std::task::{Context, Poll}; use common_error::ext::StackError; @@ -87,6 +88,10 @@ mod tests { fn namespace(&self) -> Self::Namespace { Namespace {} } + + fn estimated_size(&self) -> usize { + self.data.capacity() * size_of::() + } } impl SimpleEntry { diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index f3b8530441c7..b285be08b907 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -29,7 +29,7 @@ datafusion-physical-expr.workspace = true datatypes.workspace = true derive_builder.workspace = true futures.workspace = true -humantime = "2.1" +humantime.workspace = true humantime-serde.workspace = true paste = "1.0" serde.workspace = true diff --git a/tests-fuzz/Cargo.toml b/tests-fuzz/Cargo.toml index ce216de41970..abe17f5e978d 100644 --- a/tests-fuzz/Cargo.toml +++ b/tests-fuzz/Cargo.toml @@ -40,6 +40,7 @@ sqlx = { version = "0.6", features = [ ] } [dev-dependencies] +dotenv.workspace = true tokio = { workspace = true } [[bin]] diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 1e549c6f9337..aa40cae92ac0 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -36,7 +36,7 @@ common-test-util.workspace = true common-wal.workspace = true datanode = { workspace = true } datatypes.workspace = true -dotenv = "0.15" +dotenv.workspace = true frontend = { workspace = true, features = ["testing"] } futures.workspace = true meta-client.workspace = true diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 4ca617b2ab8c..b4eb412856e1 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -174,6 +174,7 @@ impl GreptimeDbClusterBuilder { // We only make max_retry_times and retry_delay large than the default in tests. max_retry_times: 5, retry_delay: Duration::from_secs(1), + max_metadata_value_size: None, }, wal: self.metasrv_wal_config.clone(), ..Default::default()
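
A note on the new `max_metadata_value_size` option wired through `ProcedureConfig`, `KvStateStore::with_max_value_size`, and the metasrv builder above: when a procedure-state value exceeds the configured limit, the state store writes it as several segments and the paired stream merges them back on read, which keeps each etcd request under the default ~1.5 MiB cap. The following is a minimal, self-contained sketch of that split-and-merge idea only; it is not the actual `KvStateStore` code, and the segment-key naming in the comments is purely illustrative.

    // Splits a value into segments no larger than `max_value_size`; `None` means no limit.
    // This mirrors the shape of `split_value` in state_store.rs, simplified to plain slices.
    fn split_into_segments(value: &[u8], max_value_size: Option<usize>) -> Vec<&[u8]> {
        match max_value_size {
            Some(limit) if value.len() > limit => value.chunks(limit).collect(),
            _ => vec![value],
        }
    }

    fn main() {
        // etcd rejects requests larger than ~1.5 MiB by default, so leave headroom for the key.
        let limit = 1536 * 1024 - 1024;
        let value = vec![0u8; 4 * 1024 * 1024];

        // A 4 MiB value becomes three segments, e.g. stored under "key", "key/0001", "key/0002".
        let segments = split_into_segments(&value, Some(limit));
        assert_eq!(segments.len(), 3);

        // Reading back concatenates the segments in key order.
        let merged: Vec<u8> = segments.concat();
        assert_eq!(merged, value);
    }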
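
Similarly, the counters and histograms added in `src/log-store/src/metrics.rs` are registered in prometheus's default registry, so they can be dumped with the stock text encoder. A minimal sketch, assuming a small binary that depends on the `log-store` and `prometheus` crates (the binary setup itself is not part of this change):

    use log_store::metrics::METRIC_KAFKA_APPEND_BATCH_CALLS_TOTAL;
    use prometheus::{Encoder, TextEncoder};

    fn main() {
        // Touch one of the pre-bound counters so its label combination shows up in the dump.
        METRIC_KAFKA_APPEND_BATCH_CALLS_TOTAL.inc();

        // Render everything in the default registry in the Prometheus text exposition format.
        let mut buffer = Vec::new();
        TextEncoder::new()
            .encode(&prometheus::gather(), &mut buffer)
            .unwrap();
        println!("{}", String::from_utf8(buffer).unwrap());
    }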