Skip to content

Commit

Permalink
refactor: reuse aligned ts array in range manipulate exec (#4535)
Browse files Browse the repository at this point in the history
Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Aug 12, 2024
1 parent 90cfe27 commit 9bcaeaa
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions src/promql/src/extension_plan/range_manipulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,13 +345,16 @@ impl ExecutionPlan for RangeManipulateExec {
.0
})
.collect();
let aligned_ts_array =
RangeManipulateStream::build_aligned_ts_array(self.start, self.end, self.interval);
Ok(Box::pin(RangeManipulateStream {
start: self.start,
end: self.end,
interval: self.interval,
range: self.range,
time_index,
field_columns,
aligned_ts_array,
output_schema: self.output_schema.clone(),
input,
metric: baseline_metric,
Expand Down Expand Up @@ -405,6 +408,7 @@ pub struct RangeManipulateStream {
range: Millisecond,
time_index: usize,
field_columns: Vec<usize>,
aligned_ts_array: ArrayRef,

output_schema: SchemaRef,
input: SendableRecordBatchStream,
Expand Down Expand Up @@ -447,7 +451,7 @@ impl RangeManipulateStream {
pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
// calculate the range
let (aligned_ts, ranges) = self.calculate_range(&input)?;
let ranges = self.calculate_range(&input)?;
// ignore this if all ranges are empty
if ranges.iter().all(|(_, len)| *len == 0) {
return Ok(None);
Expand Down Expand Up @@ -479,17 +483,20 @@ impl RangeManipulateStream {
new_columns[index] = compute::take(&input.column(index), &take_indices, None)?;
}
// replace timestamp with the aligned one
new_columns[self.time_index] = aligned_ts;
new_columns[self.time_index] = self.aligned_ts_array.clone();

RecordBatch::try_new(self.output_schema.clone(), new_columns)
.map(Some)
.map_err(|e| DataFusionError::ArrowError(e, None))
}

fn calculate_range(
&self,
input: &RecordBatch,
) -> DataFusionResult<(ArrayRef, Vec<(u32, u32)>)> {
fn build_aligned_ts_array(start: i64, end: i64, interval: i64) -> ArrayRef {
Arc::new(TimestampMillisecondArray::from_iter_values(
(start..=end).step_by(interval as _),
))
}

fn calculate_range(&self, input: &RecordBatch) -> DataFusionResult<Vec<(u32, u32)>> {
let ts_column = input
.column(self.time_index)
.as_any()
Expand All @@ -500,12 +507,10 @@ impl RangeManipulateStream {
)
})?;

let mut aligned_ts = vec![];
let mut ranges = vec![];

// calculate for every aligned timestamp (`curr_ts`), assume the ts column is ordered.
for curr_ts in (self.start..=self.end).step_by(self.interval as _) {
aligned_ts.push(curr_ts);
let mut range_start = ts_column.len();
let mut range_end = 0;
for (index, ts) in ts_column.values().iter().enumerate() {
Expand All @@ -525,9 +530,7 @@ impl RangeManipulateStream {
}
}

let aligned_ts_array = Arc::new(TimestampMillisecondArray::from(aligned_ts)) as _;

Ok((aligned_ts_array, ranges))
Ok(ranges)
}
}

Expand Down

0 comments on commit 9bcaeaa

Please sign in to comment.