From 86fb9d8ac7513eb03a4a27bf112e7f736a2ee58b Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com>
Date: Fri, 22 Mar 2024 10:09:00 +0800
Subject: [PATCH 01/21] refactor: remove redundant PromStoreProtocolHandler::write (#3553)

refactor: remove redundant PromStoreProtocolHandler::write API and rename
PromStoreProtocolHandler::write_fast to write
---
 src/frontend/src/instance/prom_store.rs   | 54 ++---------------------
 src/frontend/src/metrics.rs               |  7 ---
 src/servers/src/export_metrics.rs         | 15 ++++++-
 src/servers/src/http/prom_store.rs        | 32 +++++++-------
 src/servers/src/metrics.rs                |  7 +--
 src/servers/src/proto.rs                  |  5 ++-
 src/servers/src/query_handler.rs          | 10 +----
 src/servers/tests/http/prom_store_test.rs | 11 +----
 tests-integration/src/prom_store.rs       |  4 +-
 9 files changed, 46 insertions(+), 99 deletions(-)

diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs
index 1b5aa14c5ff6..104573bf8621 100644
--- a/src/frontend/src/instance/prom_store.rs
+++ b/src/frontend/src/instance/prom_store.rs
@@ -16,7 +16,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use api::prom_store::remote::read_request::ResponseType;
-use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
+use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse};
 use api::v1::RowInsertRequests;
 use async_trait::async_trait;
 use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
@@ -46,7 +46,6 @@ use crate::error::{
     TableNotFoundSnafu,
 };
 use crate::instance::Instance;
-use crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES;
 
 const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;
 
@@ -163,43 +162,6 @@ impl Instance {
 #[async_trait]
 impl PromStoreProtocolHandler for Instance {
     async fn write(
-        &self,
-        request: WriteRequest,
-        ctx: QueryContextRef,
-        with_metric_engine: bool,
-    ) -> ServerResult<Output> {
-        self.plugins
-            .get::<PermissionCheckerRef>()
-            .as_ref()
-            .check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
-            .context(AuthSnafu)?;
-        let interceptor_ref = self
-            .plugins
-            .get::<PromStoreProtocolInterceptorRef<server_error::Error>>();
-        interceptor_ref.pre_write(&request, ctx.clone())?;
-
-        let (requests, samples) = prom_store::to_grpc_row_insert_requests(&request)?;
-        let output = if with_metric_engine {
-            let physical_table = ctx
-                .extension(PHYSICAL_TABLE_PARAM)
-                .unwrap_or(GREPTIME_PHYSICAL_TABLE)
-                .to_string();
-            self.handle_metric_row_inserts(requests, ctx.clone(), physical_table.to_string())
-                .await
-                .map_err(BoxedError::new)
-                .context(error::ExecuteGrpcQuerySnafu)?
-        } else {
-            self.handle_row_inserts(requests, ctx.clone())
-                .await
-                .map_err(BoxedError::new)
-                .context(error::ExecuteGrpcQuerySnafu)?
-        };
-
-        PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
-        Ok(output)
-    }
-
-    async fn write_fast(
         &self,
         request: RowInsertRequests,
         ctx: QueryContextRef,
@@ -316,14 +278,13 @@ impl ExportMetricHandler {
 impl PromStoreProtocolHandler for ExportMetricHandler {
     async fn write(
         &self,
-        request: WriteRequest,
+        request: RowInsertRequests,
         ctx: QueryContextRef,
         _: bool,
     ) -> ServerResult<Output> {
-        let (requests, _) = prom_store::to_grpc_row_insert_requests(&request)?;
         self.inserter
             .handle_metric_row_inserts(
-                requests,
+                request,
                 ctx,
                 &self.statement_executor,
                 GREPTIME_PHYSICAL_TABLE.to_string(),
@@ -333,15 +294,6 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
             .context(error::ExecuteGrpcQuerySnafu)
     }
 
-    async fn write_fast(
-        &self,
-        _request: RowInsertRequests,
-        _ctx: QueryContextRef,
-        _with_metric_engine: bool,
-    ) -> ServerResult<Output> {
-        unimplemented!()
-    }
-
     async fn read(
         &self,
         _request: ReadRequest,
diff --git a/src/frontend/src/metrics.rs b/src/frontend/src/metrics.rs
index 5c3c6122e492..db9d53ac19aa 100644
--- a/src/frontend/src/metrics.rs
+++ b/src/frontend/src/metrics.rs
@@ -41,13 +41,6 @@ lazy_static! {
         .with_label_values(&["insert"]);
     pub static ref EXECUTE_SCRIPT_ELAPSED: Histogram = HANDLE_SCRIPT_ELAPSED
         .with_label_values(&["execute"]);
-
-    /// The samples count of Prometheus remote write.
-    pub static ref PROM_STORE_REMOTE_WRITE_SAMPLES: IntCounter = register_int_counter!(
-        "greptime_frontend_prometheus_remote_write_samples",
-        "frontend prometheus remote write samples"
-    )
-    .unwrap();
     pub static ref OTLP_METRICS_ROWS: IntCounter = register_int_counter!(
         "greptime_frontend_otlp_metrics_rows",
         "frontend otlp metrics rows"
diff --git a/src/servers/src/export_metrics.rs b/src/servers/src/export_metrics.rs
index 014f58e52678..3ec089db411b 100644
--- a/src/servers/src/export_metrics.rs
+++ b/src/servers/src/export_metrics.rs
@@ -29,7 +29,7 @@ use snafu::{ensure, ResultExt};
 use tokio::time::{self, Interval};
 
 use crate::error::{InvalidExportMetricsConfigSnafu, Result, SendPromRemoteRequestSnafu};
-use crate::prom_store::snappy_compress;
+use crate::prom_store::{snappy_compress, to_grpc_row_insert_requests};
 use crate::query_handler::PromStoreProtocolHandlerRef;
 
 /// Use to export the metrics generated by greptimedb, encoded to Prometheus [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/),
@@ -256,8 +256,19 @@ pub async fn write_system_metric_by_handler(
             filter.as_ref(),
             Timestamp::current_millis().value(),
         );
-        if let Err(e) = handler.write(request, ctx.clone(), false).await {
+
+        let (requests, samples) = match to_grpc_row_insert_requests(&request) {
+            Ok((requests, samples)) => (requests, samples),
+            Err(e) => {
+                error!(e; "Failed to convert gathered metrics to RowInsertRequests");
+                continue;
+            }
+        };
+
+        if let Err(e) = handler.write(requests, ctx.clone(), false).await {
             error!("report export metrics by handler failed, error {}", e);
+        } else {
+            crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
         }
     }
 }
diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs
index 514bf6d22c92..bf3f248eb005 100644
--- a/src/servers/src/http/prom_store.rs
+++ b/src/servers/src/http/prom_store.rs
@@ -14,7 +14,7 @@
 
 use std::sync::Arc;
 
-use api::prom_store::remote::{ReadRequest, WriteRequest};
+use api::prom_store::remote::ReadRequest;
 use api::v1::RowInsertRequests;
 use axum::extract::{Query, RawBody, State};
 use axum::http::{header, HeaderValue, StatusCode};
@@ -75,13 +75,14 @@ pub async fn route_write_without_metric_engine(
         .with_label_values(&[db.as_str()])
         .start_timer();
 
-    let request = decode_remote_write_request(body).await?;
+    let (request, samples) = decode_remote_write_request(body).await?;
     // reject if physical table is specified when metric engine is disabled
     if params.physical_table.is_some() {
         return UnexpectedPhysicalTableSnafu {}.fail();
    }
 
     let output = handler.write(request, query_ctx, false).await?;
+    crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
     Ok((
         StatusCode::NO_CONTENT,
         write_cost_header_map(output.meta.cost),
@@ -104,7 +105,7 @@ pub async fn remote_write(
         .with_label_values(&[db.as_str()])
         .start_timer();
 
-    let request = decode_remote_write_request_to_row_inserts(body).await?;
+    let (request, samples) = decode_remote_write_request_to_row_inserts(body).await?;
 
     if let Some(physical_table) = params.physical_table {
         let mut new_query_ctx = query_ctx.as_ref().clone();
@@ -112,7 +113,8 @@ pub async fn remote_write(
         query_ctx = Arc::new(new_query_ctx);
     }
 
-    let output = handler.write_fast(request, query_ctx, true).await?;
+    let output = handler.write(request, query_ctx, true).await?;
+    crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
     Ok((
         StatusCode::NO_CONTENT,
         write_cost_header_map(output.meta.cost),
@@ -159,7 +161,9 @@ pub async fn remote_read(
     handler.read(request, query_ctx).await
 }
 
-async fn decode_remote_write_request_to_row_inserts(body: Body) -> Result<RowInsertRequests> {
+async fn decode_remote_write_request_to_row_inserts(
+    body: Body,
+) -> Result<(RowInsertRequests, usize)> {
     let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
     let body = hyper::body::to_bytes(body)
         .await
@@ -171,24 +175,22 @@ async fn decode_remote_write_request_to_row_inserts(body: Body) -> Result
 }
 
-async fn decode_remote_write_request(body: Body) -> Result<WriteRequest> {
+async fn decode_remote_write_request(body: Body) -> Result<(RowInsertRequests, usize)> {
     let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
     let body = hyper::body::to_bytes(body)
         .await
         .context(error::HyperSnafu)?;
 
-    let buf = snappy_decompress(&body[..])?;
-
-    let request = WriteRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)?;
-    crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES
-        .observe(request.timeseries.len() as f64);
+    let buf = Bytes::from(snappy_decompress(&body[..])?);
 
-    Ok(request)
+    let mut request = PromWriteRequest::default();
+    request
+        .merge(buf)
+        .context(error::DecodePromRemoteRequestSnafu)?;
+    Ok(request.as_row_insert_requests())
 }
 
 async fn decode_remote_read_request(body: Body) -> Result<ReadRequest> {
diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs
index 41d56702bc96..5fc63ba7d838 100644
--- a/src/servers/src/metrics.rs
+++ b/src/servers/src/metrics.rs
@@ -103,9 +103,10 @@ lazy_static! {
     /// Duration to convert prometheus write request to gRPC request.
     pub static ref METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED: Histogram = METRIC_HTTP_PROM_STORE_CODEC_ELAPSED
         .with_label_values(&["convert"]);
-    pub static ref METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES: Histogram = register_histogram!(
-        "greptime_servers_http_prometheus_decode_num_series",
-        "servers http prometheus decode num series",
+    /// The samples count of Prometheus remote write.
+    pub static ref PROM_STORE_REMOTE_WRITE_SAMPLES: IntCounter = register_int_counter!(
+        "greptime_servers_prometheus_remote_write_samples",
+        "frontend prometheus remote write samples"
     )
     .unwrap();
     /// Http prometheus read duration per database.
diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs
index 9ea907306c71..51b8fb05b63c 100644
--- a/src/servers/src/proto.rs
+++ b/src/servers/src/proto.rs
@@ -40,7 +40,10 @@ pub struct PromLabel {
 }
 
 impl Clear for PromLabel {
-    fn clear(&mut self) {}
+    fn clear(&mut self) {
+        self.name.clear();
+        self.value.clear();
+    }
 }
 
 impl PromLabel {
diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs
index f68dc1d02e89..0430005aed7d 100644
--- a/src/servers/src/query_handler.rs
+++ b/src/servers/src/query_handler.rs
@@ -28,7 +28,7 @@ pub mod sql;
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use api::prom_store::remote::{ReadRequest, WriteRequest};
+use api::prom_store::remote::ReadRequest;
 use api::v1::RowInsertRequests;
 use async_trait::async_trait;
 use common_query::Output;
@@ -90,14 +90,6 @@ pub struct PromStoreResponse {
 pub trait PromStoreProtocolHandler {
     /// Handling prometheus remote write requests
     async fn write(
-        &self,
-        request: WriteRequest,
-        ctx: QueryContextRef,
-        with_metric_engine: bool,
-    ) -> Result<Output>;
-
-    /// Handling prometheus remote write requests
-    async fn write_fast(
         &self,
         request: RowInsertRequests,
         ctx: QueryContextRef,
diff --git a/src/servers/tests/http/prom_store_test.rs b/src/servers/tests/http/prom_store_test.rs
index 5d6b12bc9d1c..1a005fc0bd77 100644
--- a/src/servers/tests/http/prom_store_test.rs
+++ b/src/servers/tests/http/prom_store_test.rs
@@ -58,16 +58,7 @@ impl GrpcQueryHandler for DummyInstance {
 
 #[async_trait]
 impl PromStoreProtocolHandler for DummyInstance {
-    async fn write(&self, request: WriteRequest, ctx: QueryContextRef, _: bool) -> Result<Output> {
-        let _ = self
-            .tx
-            .send((ctx.current_schema().to_owned(), request.encode_to_vec()))
-            .await;
-
-        Ok(Output::new_with_affected_rows(0))
-    }
-
-    async fn write_fast(
+    async fn write(
         &self,
         _request: RowInsertRequests,
         _ctx: QueryContextRef,
diff --git a/tests-integration/src/prom_store.rs b/tests-integration/src/prom_store.rs
index 4c0c7a4a530f..73502c4f741e 100644
--- a/tests-integration/src/prom_store.rs
+++ b/tests-integration/src/prom_store.rs
@@ -25,6 +25,7 @@ mod tests {
     use prost::Message;
     use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
     use servers::prom_store;
+    use servers::prom_store::to_grpc_row_insert_requests;
     use servers::query_handler::sql::SqlQueryHandler;
     use servers::query_handler::PromStoreProtocolHandler;
     use session::context::QueryContext;
@@ -107,8 +108,9 @@ mod tests {
         .unwrap()
         .is_ok());
 
+        let (row_inserts, _) = to_grpc_row_insert_requests(&write_request).unwrap();
         instance
-            .write(write_request, ctx.clone(), true)
+            .write(row_inserts, ctx.clone(), true)
             .await
             .unwrap();

From c9ac72e7f86b007382df806bb060442ba883cd3e Mon Sep 17 00:00:00 2001
From: tison
Date: Fri, 22 Mar 2024 11:25:01 +0800
Subject: [PATCH 02/21] ci: use a PAT to list all writers (#3559)

Signed-off-by: tison
---
 .github/workflows/unassign.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/unassign.yml b/.github/workflows/unassign.yml
index 3772bc499d21..e2d793b8ab11 100644
--- a/.github/workflows/unassign.yml
+++ b/.github/workflows/unassign.yml
@@ -17,5 +17,5 @@ jobs:
       - name: Auto Unassign
         uses: tisonspieces/auto-unassign@main
         with:
-          token: ${{ secrets.GITHUB_TOKEN }}
+          token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }}
           repository: ${{ github.repository }}

From 9f020aa4144b7f8dfd3fdf624b842d4866f607db Mon Sep 17 00:00:00 2001
From: discord9 <55937128+discord9@users.noreply.github.com>
Date: Fri, 22 Mar 2024 12:08:37 +0800
Subject: [PATCH 03/21] fix(flow): Arrange get range with batch unaligned (#3552)

* fix: Arrange get range with batch unaligned

* chore: per review

* refactor: sort at apply_updates
---
 src/flow/src/utils.rs | 287 +++++++++++++++++++++++++++++++++++++++---
 1 file changed, 269 insertions(+), 18 deletions(-)

diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs
index 41114e1f0cb2..01c6539d2dd2 100644
--- a/src/flow/src/utils.rs
+++ b/src/flow/src/utils.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::collections::{BTreeMap, BTreeSet};
+use std::ops::Bound;
 use std::sync::Arc;
 
 use itertools::Itertools;
@@ -24,11 +25,12 @@ use crate::expr::error::InternalSnafu;
 use crate::expr::{EvalError, ScalarExpr};
 use crate::repr::{value_to_internal_ts, Diff, DiffRow, Duration, KeyValDiffRow, Row, Timestamp};
 
+pub type Batch = BTreeMap<Row, SmallVec<[DiffRow; 2]>>;
+pub type Spine = BTreeMap<Timestamp, Batch>;
+
 /// Determine when should a key expire according to its event timestamp in key,
 /// if a key is expired, any future updates to it should be ignored
 /// Note that key is expired by its event timestamp(contained in the key), not by the time it's inserted(system timestamp)
-///
-/// TODO(discord9): find a better way to handle key expiration, like write to disk or something instead of throw away
 #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
 pub struct KeyExpiryManager {
     /// a map from event timestamp to key, used for expire keys
@@ -121,13 +123,16 @@ pub struct Arrangement {
     /// And for consolidated batch(i.e. batch representing now), there should be only one update for each key with `diff==1`
     ///
     /// And since most time a key gets updated by first delete then insert, small vec with size of 2 makes sense
-    spine: BTreeMap<Timestamp, BTreeMap<Row, SmallVec<[DiffRow; 2]>>>,
+    /// TODO: batch size balancing?
+    spine: Spine,
     /// if set to false, will not update current value of the arrangement, useful for case like `map -> arrange -> reduce`
     full_arrangement: bool,
     /// flag to mark that this arrangement hasn't been written to, so that it can be cloned and shared
     is_written: bool,
     /// manage the expire state of the arrangement
     expire_state: Option<KeyExpiryManager>,
+    /// the time that the last compaction happened, also known as current time
+    last_compaction_time: Option<Timestamp>,
 }
 
 impl Arrangement {
@@ -137,6 +142,7 @@ impl Arrangement {
             full_arrangement: false,
             is_written: false,
             expire_state: None,
+            last_compaction_time: None,
         }
     }
 
@@ -160,6 +166,7 @@ impl Arrangement {
                     continue;
                 }
             }
+            // the first batch with key that's greater or equal to ts
             let batch = if let Some((_, batch)) = self.spine.range_mut(ts..).next() {
                 batch
@@ -171,20 +178,87 @@
             {
                 let key_updates = batch.entry(key).or_insert(smallvec![]);
                 key_updates.push((val, ts, diff));
+                // a stable sort makes updates sorted in order of insertion
+                // without changing the order of updates within same tick
+                key_updates.sort_by_key(|r| r.1);
             }
         }
         Ok(max_late_by)
     }
 
+    /// find out the time of the next update in the future,
+    /// that is the next update with `timestamp > now`
+    pub fn get_next_update_time(&self, now: &Timestamp) -> Option<Timestamp> {
+        // iter over batches that only have updates of `timestamp>now` and find the first non-empty batch, then get the minimum timestamp in that batch
+        let next_batches = self.spine.range((Bound::Excluded(now), Bound::Unbounded));
+        for (_ts, batch) in next_batches {
+            let min_ts = batch
+                .iter()
+                .flat_map(|(_k, v)| v.iter().map(|(_, ts, _)| *ts))
+                .min();
+            if let Some(min_ts) = min_ts {
+                return Some(min_ts);
+            } else {
+                continue;
+            }
+        }
+        // all batches are empty, return None
+        None
+    }
+
+    /// get the last compaction time
+    pub fn get_compaction(&self) -> Option<Timestamp> {
+        self.last_compaction_time
+    }
+
+    /// split spine off at `now`, and return the spine that's before `now`(including `now`)
+    fn split_lte(&mut self, now: &Timestamp) -> Spine {
+        let mut before = self.spine.split_off(&(now + 1));
+        std::mem::swap(&mut before, &mut self.spine);
+
+        // if before's last key == now, then all the keys we needed are found
+        if before
+            .last_key_value()
+            .map(|(k, _v)| *k == *now)
+            .unwrap_or(false)
+        {
+            return before;
+        }
+
+        // also need to move all keys from the first batch in spine with timestamp<=now to before
+        // we know that all remaining keys to be split off are last key < key <= now, we will make them into a new batch
+        if let Some(mut first_batch) = self.spine.first_entry() {
+            let mut new_batch: Batch = Default::default();
+            // remove all keys with val of empty vec
+            first_batch.get_mut().retain(|key, updates| {
+                // remove keys <= now from updates
+                updates.retain(|(val, ts, diff)| {
+                    if *ts <= *now {
+                        new_batch.entry(key.clone()).or_insert(smallvec![]).push((
+                            val.clone(),
+                            *ts,
+                            *diff,
+                        ));
+                    }
+                    *ts > *now
+                });
+                !updates.is_empty()
+            });
+
+            before.entry(*now).or_default().extend(new_batch);
+        }
+        before
+    }
+
     /// advance time to `now` and consolidate all older(`now` included) updates to the first key
     ///
     /// return the maximum expire time(already expired by how much time) of all updates if any key is already expired
     pub fn set_compaction(&mut self, now: Timestamp) -> Result<Option<Timestamp>, EvalError> {
         let mut max_late_by: Option<Timestamp> = None;
 
-        let mut should_compact = self.spine.split_off(&(now + 1));
-        std::mem::swap(&mut should_compact, &mut self.spine);
+        let should_compact = self.split_lte(&now);
+        self.last_compaction_time = Some(now);
         // if a full arrangement is not needed, we can just discard everything before and including now
         if !self.full_arrangement {
             return Ok(None);
@@ -221,18 +295,62 @@
     }
 
     /// get the updates of the arrangement from the given range of time
-    pub fn get_updates_in_range<R: RangeBounds<Timestamp>>(
+    pub fn get_updates_in_range<R: RangeBounds<Timestamp> + Clone>(
         &self,
         range: R,
     ) -> Vec<KeyValDiffRow> {
         let mut result = vec![];
-        for (_ts, batch) in self.spine.range(range) {
-            for (key, updates) in batch.clone() {
-                for (val, ts, diff) in updates {
-                    result.push(((key.clone(), val), ts, diff));
+        // three parts:
+        // 1. the starting batch with first key >= range.start, which may contain updates that are not in range
+        // 2. the batches with key in range
+        // 3. the last batch with first key > range.end, which may contain updates that are in range
+        let mut is_first = true;
+        for (_ts, batch) in self.spine.range(range.clone()) {
+            if is_first {
+                for (key, updates) in batch {
+                    let iter = updates
+                        .iter()
+                        .filter(|(_val, ts, _diff)| range.contains(ts))
+                        .map(|(val, ts, diff)| ((key.clone(), val.clone()), *ts, *diff));
+                    result.extend(iter);
+                }
+                is_first = false;
+            } else {
+                for (key, updates) in batch.clone() {
+                    result.extend(
+                        updates
+                            .iter()
+                            .map(|(val, ts, diff)| ((key.clone(), val.clone()), *ts, *diff)),
+                    );
                 }
             }
         }
+
+        // deal with boundaries, including start and end,
+        // and for the next batch with upper_bound >= range.end
+        // we need to search for updates within range
+        let neg_bound = match range.end_bound() {
+            Bound::Included(b) => {
+                // if the boundary is aligned, the last batch in range actually covers the full range,
+                // then there will be no further keys we need in the next batch
+                if self.spine.contains_key(b) {
+                    return result;
+                }
+                Bound::Excluded(*b)
+            }
+            Bound::Excluded(b) => Bound::Included(*b),
+            Bound::Unbounded => return result,
+        };
+        let search_range = (neg_bound, Bound::Unbounded);
+        if let Some(last_batch) = self.spine.range(search_range).next() {
+            for (key, updates) in last_batch.1 {
+                let iter = updates
+                    .iter()
+                    .filter(|(_val, ts, _diff)| range.contains(ts))
+                    .map(|(val, ts, diff)| ((key.clone(), val.clone()), *ts, *diff));
+                result.extend(iter);
+            }
+        };
         result
     }
@@ -260,11 +378,12 @@ impl Arrangement {
     /// get current state of things
     /// useful for querying existing keys (i.e. reduce and join operators need to query existing state)
     pub fn get(&self, now: Timestamp, key: &Row) -> Option<(Row, Timestamp, Diff)> {
-        if self
-            .spine
-            .first_key_value()
-            .map(|(ts, _)| *ts >= now)
-            .unwrap_or(false)
+        if self.full_arrangement
+            && self
+                .spine
+                .first_key_value()
+                .map(|(ts, _)| *ts >= now)
+                .unwrap_or(false)
         {
             self.spine
                 .first_key_value()
@@ -272,10 +391,38 @@ impl Arrangement {
         } else {
             // check keys <= now to know current value
             let mut final_val = None;
-            for (_ts, batch) in self.spine.range(..=now) {
+
+            let with_extra_batch = {
+                let unaligned = self.spine.range(..=now);
+                if unaligned
+                    .clone()
+                    .last()
+                    .map(|(ts, _)| *ts == now)
+                    .unwrap_or(false)
+                {
+                    // this extra chain is there just to make the types the same
+                    unaligned.chain(None)
+                } else {
+                    // if the last key is not equal to now, then we need to include the next batch
+                    // because we know last batch key < now < next batch key
+                    // therefore next batch may contain updates that we want
+                    unaligned.chain(
+                        self.spine
+                            .range((Bound::Excluded(now), Bound::Unbounded))
+                            .next(),
+                    )
+                }
+            };
+            for (ts, batch) in with_extra_batch {
                 if let Some(new_rows) = batch.get(key).map(|v| v.iter()) {
-                    for new_row in new_rows {
-                        final_val = compact_diff_row(final_val, new_row);
+                    if *ts <= now {
+                        for new_row in new_rows {
+                            final_val = compact_diff_row(final_val, new_row);
+                        }
+                    } else {
+                        for new_row in new_rows.filter(|new_row| new_row.1 <= now) {
+                            final_val = compact_diff_row(final_val, new_row);
+                        }
                     }
                 }
            }
@@ -530,4 +677,108 @@ mod test {
             assert_eq!(arr.get(12, &Row::new(vec![1i64.into()])), None);
         }
     }
+
+    /// test if split_lte gets ranges that are not aligned with batch boundaries,
+    /// i.e. whether split_lte can correctly retrieve all updates in the range, including updates that are in the batches
+    /// near the boundary of the input range
+    #[test]
+    fn test_split_off() {
+        let mut arr = Arrangement::new();
+        // manually create batch ..=1 and 2..=3
+        arr.spine.insert(1, Default::default());
+        arr.spine.insert(3, Default::default());
+        arr.apply_updates(
+            2,
+            vec![((Row::new(vec![1.into()]), Row::new(vec![2.into()])), 2, 1)],
+        )
+        .unwrap();
+        // the update falls into the range of 2..=3
+        let mut arr1 = arr.clone();
+        {
+            assert_eq!(arr.get_next_update_time(&1), Some(2));
+            // split is expected to take batch ..=1 and create a new batch 2..=2 (which contains the update)
+            let split = &arr.split_lte(&2);
+            assert_eq!(split.len(), 2);
+            assert_eq!(split[&2].len(), 1);
+            let _ = &arr.split_lte(&3);
+            assert_eq!(arr.get_next_update_time(&1), None);
+        }
+        {
+            // take all updates with timestamp <=1, will get no updates
+            let split = &arr1.split_lte(&1);
+            assert_eq!(split.len(), 1);
+        }
+    }
+
+    /// test if, when the given range is not aligned with batch boundaries,
+    /// get_updates_in_range can still get the correct result
+    #[test]
+    fn test_get_by_range() {
+        let mut arr = Arrangement::new();
+
+        // will form {2: [2, 1], 4: [4,3], 6: [6,5]} three batches
+        // TODO(discord9): manually set batch
+        let updates: Vec<KeyValDiffRow> = vec![
+            ((Row::new(vec![1i64.into()]), Row::empty()), 2, 1),
+            ((Row::new(vec![1i64.into()]), Row::empty()), 1, 1),
+            ((Row::new(vec![2i64.into()]), Row::empty()), 4, 1),
+            ((Row::new(vec![3i64.into()]), Row::empty()), 3, 1),
+            ((Row::new(vec![3i64.into()]), Row::empty()), 6, 1),
+            ((Row::new(vec![1i64.into()]), Row::empty()), 5, 1),
+        ];
+        arr.apply_updates(0, updates).unwrap();
+        assert_eq!(
+            arr.get_updates_in_range(2..=5),
+            vec![
+                ((Row::new(vec![1i64.into()]), Row::empty()), 2, 1),
+                ((Row::new(vec![2i64.into()]), Row::empty()), 4, 1),
+                ((Row::new(vec![3i64.into()]), Row::empty()), 3, 1),
+                ((Row::new(vec![1i64.into()]), Row::empty()), 5, 1),
+            ]
+        );
+    }
+
+    /// test if get with a range unaligned with batch boundaries
+    /// can get the correct result
+    #[test]
+    fn test_get_unaligned() {
+        let mut arr = Arrangement::new();
+
+        // will form {2: [2, 1], 4: [4,3], 6: [6,5]} three batches
+        // TODO(discord9): manually set batch
+        let key = Row::new(vec![1i64.into()]);
+        let updates: Vec<KeyValDiffRow> = vec![
+            ((key.clone(), Row::new(vec![1i64.into()])), 2, 1),
+            ((key.clone(), Row::new(vec![2i64.into()])), 1, 1),
+            ((key.clone(), Row::new(vec![3i64.into()])), 4, 1),
+            ((key.clone(), Row::new(vec![4i64.into()])), 3, 1),
+            ((key.clone(), Row::new(vec![5i64.into()])), 6, 1),
+            ((key.clone(), Row::new(vec![6i64.into()])), 5, 1),
+        ];
+        arr.apply_updates(0, updates).unwrap();
+        // aligned with batch boundary
+        assert_eq!(arr.get(2, &key), Some((Row::new(vec![1i64.into()]), 2, 1)));
+        // unaligned with batch boundary
+        assert_eq!(arr.get(3, &key), Some((Row::new(vec![4i64.into()]), 3, 1)));
+    }
+
+    /// test if out-of-order updates can be sorted correctly
+    #[test]
+    fn test_out_of_order_apply_updates() {
+        let mut arr = Arrangement::new();
+
+        let key = Row::new(vec![1i64.into()]);
+        let updates: Vec<KeyValDiffRow> = vec![
+            ((key.clone(), Row::new(vec![5i64.into()])), 6, 1),
+            ((key.clone(), Row::new(vec![2i64.into()])), 2, -1),
+            ((key.clone(), Row::new(vec![1i64.into()])), 2, 1),
+            ((key.clone(), Row::new(vec![2i64.into()])), 1, 1),
+            ((key.clone(), Row::new(vec![3i64.into()])), 4, 1),
+            ((key.clone(), Row::new(vec![4i64.into()])), 3, 1),
+            ((key.clone(), Row::new(vec![6i64.into()])), 5, 1),
+        ];
+        arr.apply_updates(0, updates.clone()).unwrap();
+        let sorted = updates.iter().sorted_by_key(|r| r.1).cloned().collect_vec();
+        assert_eq!(arr.get_updates_in_range(1..7), sorted);
+    }
 }

From 3420a010e6723d98ac37f9fee7c761f201c6651b Mon Sep 17 00:00:00 2001
From: tison
Date: Fri, 22 Mar 2024 12:46:17 +0800
Subject: [PATCH 04/21] refactor: reduce one clone by carefully passing ready
 boundary (#3543)

* refactor: reduce one clone by carefully passing ready boundary

Signed-off-by: tison

* defensive handle None

Signed-off-by: tison

* tidy code a bit

Signed-off-by: tison

* expect batch exists

Signed-off-by: tison

---------

Signed-off-by: tison
---
 .../src/extension_plan/series_divide.rs | 32 +++++++++----------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs
index be41a2eed872..4abf8d87885a 100644
--- a/src/promql/src/extension_plan/series_divide.rs
+++ b/src/promql/src/extension_plan/series_divide.rs
@@ -255,23 +255,23 @@ impl Stream for SeriesDivideStream {
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         loop {
-            // It has to be cloned here, otherwise the later ready! will mess things up
-            if let Some(batch) = self.buffer.clone() {
-                let same_length = self.find_first_diff_row(&batch) + 1;
+            if let Some(batch) = self.buffer.as_ref() {
+                let same_length = self.find_first_diff_row(batch) + 1;
                 if same_length >= batch.num_rows() {
-                    let next_batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
-                        Some(Ok(batch)) => batch,
-                        None => {
-                            self.buffer = None;
-                            self.num_series += 1;
-                            return Poll::Ready(Some(Ok(batch)));
-                        }
-                        error => return Poll::Ready(error),
-                    };
-                    let new_batch =
-                        compute::concat_batches(&batch.schema(), &[batch.clone(), next_batch])?;
-                    self.buffer = Some(new_batch);
-                    continue;
+                    let next_batch = ready!(self.as_mut().fetch_next_batch(cx)).transpose()?;
+                    // SAFETY: if-let guards the buffer is not None;
+                    // and we cannot change the buffer at this point.
+                    let batch = self.buffer.take().expect("this batch must exist");
+                    if let Some(next_batch) = next_batch {
+                        self.buffer = Some(compute::concat_batches(
+                            &batch.schema(),
+                            &[batch, next_batch],
+                        )?);
+                        continue;
+                    } else {
+                        self.num_series += 1;
+                        return Poll::Ready(Some(Ok(batch)));
+                    }
                 } else {
                     let result_batch = batch.slice(0, same_length);
                     let remaining_batch = batch.slice(same_length, batch.num_rows() - same_length);

From 8345f1753cfe59b5f51e62395fe60c49f77b14b2 Mon Sep 17 00:00:00 2001
From: tison
Date: Fri, 22 Mar 2024 19:16:36 +0800
Subject: [PATCH 05/21] chore: avoid confusing TryFrom (#3565)

Signed-off-by: tison
---
 src/auth/src/common.rs                        |  2 +-
 .../src/user_provider/static_user_provider.rs | 22 +++++++++----------
 2 files changed, 11 insertions(+), 13 deletions(-)

diff --git a/src/auth/src/common.rs b/src/auth/src/common.rs
index 16e1ecec2b4c..109a98175d4c 100644
--- a/src/auth/src/common.rs
+++ b/src/auth/src/common.rs
@@ -40,7 +40,7 @@ pub fn user_provider_from_option(opt: &String) -> Result {
     match name {
         STATIC_USER_PROVIDER => {
             let provider =
-                StaticUserProvider::try_from(content).map(|p| Arc::new(p) as UserProviderRef)?;
+                StaticUserProvider::new(content).map(|p| Arc::new(p) as UserProviderRef)?;
             Ok(provider)
         }
         _ => InvalidConfigSnafu {
diff --git a/src/auth/src/user_provider/static_user_provider.rs b/src/auth/src/user_provider/static_user_provider.rs
index 591a116b9281..e6d474389431 100644
--- a/src/auth/src/user_provider/static_user_provider.rs
+++ b/src/auth/src/user_provider/static_user_provider.rs
@@ -23,7 +23,7 @@ use secrecy::ExposeSecret;
 use snafu::{ensure, OptionExt, ResultExt};
 
 use crate::error::{
-    Error, IllegalParamSnafu, InvalidConfigSnafu, IoSnafu, Result, UnsupportedPasswordTypeSnafu,
+    IllegalParamSnafu, InvalidConfigSnafu, IoSnafu, Result, UnsupportedPasswordTypeSnafu,
     UserNotFoundSnafu, UserPasswordMismatchSnafu,
 };
 use crate::user_info::DefaultUserInfo;
@@ -31,10 +31,12 @@ use crate::{auth_mysql, Identity, Password, UserInfoRef, UserProvider};
 
 pub(crate) const STATIC_USER_PROVIDER: &str = "static_user_provider";
 
-impl TryFrom<&str> for StaticUserProvider {
-    type Error = Error;
+pub(crate) struct StaticUserProvider {
+    users: HashMap<String, Vec<u8>>,
+}
 
-    fn try_from(value: &str) -> Result<Self, Self::Error> {
+impl StaticUserProvider {
+    pub(crate) fn new(value: &str) -> Result<StaticUserProvider> {
         let (mode, content) = value.split_once(':').context(InvalidConfigSnafu {
             value: value.to_string(),
             msg: "StaticUserProviderOption must be in format `