diff --git a/config/config.md b/config/config.md
index ec00eb98b730..1f034d28731d 100644
--- a/config/config.md
+++ b/config/config.md
@@ -136,7 +136,6 @@
| `region_engine.mito.experimental_write_cache_size` | String | `1GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
-| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
@@ -464,7 +463,6 @@
| `region_engine.mito.experimental_write_cache_size` | String | `1GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
-| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
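With the `scan_parallelism` rows above removed, read parallelism is no longer a per-datanode configuration value; it follows the number of target partitions requested on the scanner. A rough sketch of the replacement flow, mirroring the test changes later in this patch (the `engine` and `region_id` setup comes from the test environment and is assumed here):

    // Sketch only: parallelism is now chosen per scan instead of via `scan_parallelism`.
    let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
    scanner.set_target_partitions(2); // replaces the old `scan_parallelism = 2` setting
    let stream = scanner.scan().await.unwrap();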
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index c5fdd24ebe14..11c2794e61df 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -492,12 +492,6 @@ experimental_write_cache_ttl = "8h"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"
-## Parallelism to scan a region (default: 1/4 of cpu cores).
-## - `0`: using the default value (1/4 of cpu cores).
-## - `1`: scan in current thread.
-## - `n`: scan in parallelism n.
-scan_parallelism = 0
-
## Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size = 32
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index deaf8900f213..a69295af1644 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -530,12 +530,6 @@ experimental_write_cache_ttl = "8h"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"
-## Parallelism to scan a region (default: 1/4 of cpu cores).
-## - `0`: using the default value (1/4 of cpu cores).
-## - `1`: scan in current thread.
-## - `n`: scan in parallelism n.
-scan_parallelism = 0
-
## Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size = 32
diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs
index 454188141d14..c5f1111d37b6 100644
--- a/src/cmd/tests/load_config_test.rs
+++ b/src/cmd/tests/load_config_test.rs
@@ -69,7 +69,6 @@ fn test_load_datanode_example_config() {
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
- scan_parallelism: 0,
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
..Default::default()
}),
@@ -205,7 +204,6 @@ fn test_load_standalone_example_config() {
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
- scan_parallelism: 0,
..Default::default()
}),
RegionEngineConfig::File(EngineConfig {}),
diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index a4094af74121..5f462f33a111 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -597,9 +597,8 @@ impl<'a> CompactionSstReaderBuilder<'a> {
scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
}
- SeqScan::new(scan_input)
- .with_compaction()
- .build_reader()
+ SeqScan::new(scan_input, true)
+ .build_reader_for_compaction()
.await
}
}
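The second argument to `SeqScan::new` is the new `compaction` flag: a compaction reader has to consume each source in full, so its ranges are never split per row group, while query scans may be split later. A minimal sketch of the two call sites under that contract (the `scan_input_*` bindings are placeholders for inputs built elsewhere):

    // Query path: ranges may later be split per row group.
    let seq_scan = SeqScan::new(scan_input_for_query, false);
    // Compaction path: keep each source whole and build the merge reader directly.
    let reader = SeqScan::new(scan_input_for_compaction, true)
        .build_reader_for_compaction()
        .await?;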
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index 797c42f8084c..cb4022f65e57 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -30,7 +30,7 @@ use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan task.
-const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
+pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
// Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as global write buffer size in default mode
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
@@ -107,11 +107,6 @@ pub struct MitoConfig {
// Other configs:
/// Buffer size for SST writing.
pub sst_write_buffer_size: ReadableSize,
- /// Parallelism to scan a region (default: 1/4 of cpu cores).
- /// - 0: using the default value (1/4 of cpu cores).
- /// - 1: scan in current thread.
- /// - n: scan in parallelism n.
- pub scan_parallelism: usize,
/// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
pub parallel_scan_channel_size: usize,
/// Whether to allow stale entries read during replay.
@@ -156,7 +151,6 @@ impl Default for MitoConfig {
experimental_write_cache_size: ReadableSize::gb(1),
experimental_write_cache_ttl: None,
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
- scan_parallelism: divide_num_cpus(4),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
index: IndexConfig::default(),
@@ -229,11 +223,6 @@ impl MitoConfig {
);
}
- // Use default value if `scan_parallelism` is 0.
- if self.scan_parallelism == 0 {
- self.scan_parallelism = divide_num_cpus(4);
- }
-
if self.parallel_scan_channel_size < 1 {
self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
warn!(
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index c60b7c4107ed..a518da32535d 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -90,7 +90,7 @@ use crate::error::{
};
use crate::manifest::action::RegionEdit;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
-use crate::read::scan_region::{ScanParallelism, ScanRegion, Scanner};
+use crate::read::scan_region::{ScanRegion, Scanner};
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::wal::entry_distributor::{
build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
@@ -171,19 +171,9 @@ impl MitoEngine {
self.scan_region(region_id, request)?.scanner()
}
- /// Returns a region scanner to scan the region for `request`.
- fn region_scanner(
- &self,
- region_id: RegionId,
- request: ScanRequest,
- ) -> Result<RegionScannerRef> {
- let scanner = self.scanner(region_id, request)?;
- scanner.region_scanner()
- }
-
/// Scans a region.
fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
- self.inner.handle_query(region_id, request)
+ self.inner.scan_region(region_id, request)
}
/// Edit region's metadata by [RegionEdit] directly. Use with care.
@@ -423,7 +413,7 @@ impl EngineInner {
}
/// Handles the scan `request` and returns a [ScanRegion].
- fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
+ fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
let query_start = Instant::now();
// Reading a region doesn't need to go through the region worker thread.
let region = self
@@ -433,14 +423,10 @@ impl EngineInner {
let version = region.version();
// Get cache.
let cache_manager = self.workers.cache_manager();
- let scan_parallelism = ScanParallelism {
- parallelism: self.config.scan_parallelism,
- channel_size: self.config.parallel_scan_channel_size,
- };
let scan_region =
ScanRegion::new(version, region.access_layer.clone(), request, cache_manager)
- .with_parallelism(scan_parallelism)
+ .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
.with_start_time(query_start);
@@ -538,7 +524,9 @@ impl RegionEngine for MitoEngine {
region_id: RegionId,
request: ScanRequest,
) -> Result<RegionScannerRef, BoxedError> {
- self.region_scanner(region_id, request)
+ self.scan_region(region_id, request)
+ .map_err(BoxedError::new)?
+ .region_scanner()
.map_err(BoxedError::new)
}
diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs
index ab8515aa133c..c9f61c5db3e0 100644
--- a/src/mito2/src/engine/append_mode_test.rs
+++ b/src/mito2/src/engine/append_mode_test.rs
@@ -92,7 +92,6 @@ async fn test_append_mode_compaction() {
let mut env = TestEnv::new();
let engine = env
.create_engine(MitoConfig {
- scan_parallelism: 2,
..Default::default()
})
.await;
@@ -176,19 +175,19 @@ async fn test_append_mode_compaction() {
| b | 1.0 | 1970-01-01T00:00:01 |
+-------+---------+---------------------+";
// Scans in parallel.
- let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
+ let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(2, scanner.num_files());
assert_eq!(1, scanner.num_memtables());
+ scanner.set_target_partitions(2);
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
- // Reopens engine with parallelism 1.
+ // Reopens engine.
let engine = env
.reopen_engine(
engine,
MitoConfig {
- scan_parallelism: 1,
..Default::default()
},
)
diff --git a/src/mito2/src/engine/merge_mode_test.rs b/src/mito2/src/engine/merge_mode_test.rs
index 08f4d0565007..e74aba5655a3 100644
--- a/src/mito2/src/engine/merge_mode_test.rs
+++ b/src/mito2/src/engine/merge_mode_test.rs
@@ -92,7 +92,6 @@ async fn test_merge_mode_compaction() {
let mut env = TestEnv::new();
let engine = env
.create_engine(MitoConfig {
- scan_parallelism: 2,
..Default::default()
})
.await;
@@ -190,19 +189,19 @@ async fn test_merge_mode_compaction() {
| a | | 13.0 | 1970-01-01T00:00:03 |
+-------+---------+---------+---------------------+";
// Scans in parallel.
- let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
+ let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(1, scanner.num_files());
assert_eq!(1, scanner.num_memtables());
+ scanner.set_target_partitions(2);
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
- // Reopens engine with parallelism 1.
+ // Reopens engine.
let engine = env
.reopen_engine(
engine,
MitoConfig {
- scan_parallelism: 1,
..Default::default()
},
)
diff --git a/src/mito2/src/engine/parallel_test.rs b/src/mito2/src/engine/parallel_test.rs
index 53cc0dca8fb0..3d5dab3540e1 100644
--- a/src/mito2/src/engine/parallel_test.rs
+++ b/src/mito2/src/engine/parallel_test.rs
@@ -37,7 +37,6 @@ async fn scan_in_parallel(
) {
let engine = env
.open_engine(MitoConfig {
- scan_parallelism: parallelism,
parallel_scan_channel_size: channel_size,
..Default::default()
})
@@ -57,7 +56,9 @@ async fn scan_in_parallel(
.unwrap();
let request = ScanRequest::default();
- let stream = engine.scan_to_stream(region_id, request).await.unwrap();
+ let mut scanner = engine.scanner(region_id, request).unwrap();
+ scanner.set_target_partitions(parallelism);
+ let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs
index 1944d171dd19..554751830ffc 100644
--- a/src/mito2/src/read/range.rs
+++ b/src/mito2/src/read/range.rs
@@ -34,6 +34,16 @@ use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
const ALL_ROW_GROUPS: i64 = -1;
+/// Index and metadata for a memtable or file.
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub(crate) struct SourceIndex {
+ /// Index of the memtable and file.
+ pub(crate) index: usize,
+ /// Total number of row groups in this source. 0 if the metadata
+ /// is unavailable. We use this to split files.
+ pub(crate) num_row_groups: u64,
+}
+
/// Index to access a row group.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct RowGroupIndex {
@@ -52,7 +62,7 @@ pub(crate) struct RangeMeta {
/// The time range of the range.
pub(crate) time_range: FileTimeRange,
/// Indices to memtables or files.
- indices: SmallVec<[usize; 2]>,
+ pub(crate) indices: SmallVec<[SourceIndex; 2]>,
/// Indices to memtable/file row groups that this range scans.
pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
/// Estimated number of rows in the range. This can be 0 if the statistics are not available.
@@ -81,12 +91,17 @@ impl RangeMeta {
}
/// Creates a list of ranges from the `input` for seq scan.
- pub(crate) fn seq_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
+ /// If `compaction` is true, it doesn't split the ranges.
+ pub(crate) fn seq_scan_ranges(input: &ScanInput, compaction: bool) -> Vec<RangeMeta> {
let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
let ranges = group_ranges_for_seq_scan(ranges);
+ if compaction {
+ // We don't split ranges in compaction.
+ return ranges;
+ }
maybe_split_ranges_for_seq_scan(ranges)
}
@@ -105,13 +120,13 @@ impl RangeMeta {
}
/// Returns true if the time range of given `meta` overlaps with the time range of this meta.
- pub(crate) fn overlaps(&self, meta: &RangeMeta) -> bool {
+ fn overlaps(&self, meta: &RangeMeta) -> bool {
overlaps(&self.time_range, &meta.time_range)
}
/// Merges given `meta` to this meta.
/// It assumes that the time ranges overlap and they don't have the same file or memtable index.
- pub(crate) fn merge(&mut self, mut other: RangeMeta) {
+ fn merge(&mut self, mut other: RangeMeta) {
debug_assert!(self.overlaps(&other));
debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
debug_assert!(self
@@ -130,22 +145,28 @@ impl RangeMeta {
/// Returns true if we can split the range into multiple smaller ranges and
/// still preserve the order for [SeqScan].
- pub(crate) fn can_split_preserve_order(&self) -> bool {
- // Only one source and multiple row groups.
- self.indices.len() == 1 && self.row_group_indices.len() > 1
+ fn can_split_preserve_order(&self) -> bool {
+ self.indices.len() == 1 && self.indices[0].num_row_groups > 1
}
/// Splits the range if it can preserve the order.
- pub(crate) fn maybe_split(self, output: &mut Vec<RangeMeta>) {
+ fn maybe_split(self, output: &mut Vec<RangeMeta>) {
if self.can_split_preserve_order() {
+ let num_row_groups = self.indices[0].num_row_groups;
+ debug_assert_eq!(1, self.row_group_indices.len());
+ debug_assert_eq!(ALL_ROW_GROUPS, self.row_group_indices[0].row_group_index);
+
output.reserve(self.row_group_indices.len());
- let num_rows = self.num_rows / self.row_group_indices.len();
+ let num_rows = self.num_rows / num_row_groups as usize;
// Splits by row group.
- for index in self.row_group_indices {
+ for row_group_index in 0..num_row_groups {
output.push(RangeMeta {
time_range: self.time_range,
indices: self.indices.clone(),
- row_group_indices: smallvec![index],
+ row_group_indices: smallvec![RowGroupIndex {
+ index: self.indices[0].index,
+ row_group_index: row_group_index as i64,
+ }],
num_rows,
});
}
@@ -165,7 +186,10 @@ impl RangeMeta {
let num_rows = stats.num_rows() / stats.num_ranges();
ranges.push(RangeMeta {
time_range,
- indices: smallvec![memtable_index],
+ indices: smallvec![SourceIndex {
+ index: memtable_index,
+ num_row_groups: stats.num_ranges() as u64,
+ }],
row_group_indices: smallvec![RowGroupIndex {
index: memtable_index,
row_group_index: row_group_index as i64,
@@ -199,7 +223,10 @@ impl RangeMeta {
let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows();
ranges.push(RangeMeta {
time_range: time_range.unwrap_or_else(|| file.time_range()),
- indices: smallvec![file_index],
+ indices: smallvec![SourceIndex {
+ index: file_index,
+ num_row_groups: file.meta_ref().num_row_groups,
+ }],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: row_group_index as i64,
@@ -212,7 +239,10 @@ impl RangeMeta {
for row_group_index in 0..file.meta_ref().num_row_groups {
ranges.push(RangeMeta {
time_range: file.time_range(),
- indices: smallvec![file_index],
+ indices: smallvec![SourceIndex {
+ index: file_index,
+ num_row_groups: file.meta_ref().num_row_groups,
+ }],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: row_group_index as i64,
@@ -224,7 +254,10 @@ impl RangeMeta {
// If we don't know the number of row groups in advance, scan all row groups.
ranges.push(RangeMeta {
time_range: file.time_range(),
- indices: smallvec![file_index],
+ indices: smallvec![SourceIndex {
+ index: file_index,
+ num_row_groups: 0,
+ }],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: ALL_ROW_GROUPS,
@@ -245,7 +278,10 @@ impl RangeMeta {
};
ranges.push(RangeMeta {
time_range,
- indices: smallvec![i],
+ indices: smallvec![SourceIndex {
+ index: i,
+ num_row_groups: stats.num_ranges() as u64,
+ }],
row_group_indices: smallvec![RowGroupIndex {
index: i,
row_group_index: ALL_ROW_GROUPS,
@@ -263,31 +299,18 @@ impl RangeMeta {
// For non append-only mode, each range only contains one file.
for (i, file) in files.iter().enumerate() {
let file_index = num_memtables + i;
- if file.meta_ref().num_row_groups > 0 {
- // All row groups share the same time range.
- let row_group_indices = (0..file.meta_ref().num_row_groups)
- .map(|row_group_index| RowGroupIndex {
- index: file_index,
- row_group_index: row_group_index as i64,
- })
- .collect();
- ranges.push(RangeMeta {
- time_range: file.time_range(),
- indices: smallvec![file_index],
- row_group_indices,
- num_rows: file.meta_ref().num_rows as usize,
- });
- } else {
- ranges.push(RangeMeta {
- time_range: file.time_range(),
- indices: smallvec![file_index],
- row_group_indices: smallvec![RowGroupIndex {
- index: file_index,
- row_group_index: ALL_ROW_GROUPS,
- }],
- num_rows: file.meta_ref().num_rows as usize,
- });
- }
+ ranges.push(RangeMeta {
+ time_range: file.time_range(),
+ indices: smallvec![SourceIndex {
+ index: file_index,
+ num_row_groups: file.meta_ref().num_row_groups,
+ }],
+ row_group_indices: smallvec![RowGroupIndex {
+ index: file_index,
+ row_group_index: ALL_ROW_GROUPS,
+ }],
+ num_rows: file.meta_ref().num_rows as usize,
+ });
}
}
}
@@ -514,7 +537,10 @@ mod tests {
);
RangeMeta {
time_range,
- indices: smallvec![*idx],
+ indices: smallvec![SourceIndex {
+ index: *idx,
+ num_row_groups: 0,
+ }],
row_group_indices: smallvec![RowGroupIndex {
index: *idx,
row_group_index: 0
@@ -527,7 +553,7 @@ mod tests {
let actual: Vec<_> = output
.iter()
.map(|range| {
- let indices = range.indices.to_vec();
+ let indices = range.indices.iter().map(|index| index.index).collect();
let group_indices: Vec<_> = range
.row_group_indices
.iter()
@@ -578,7 +604,10 @@ mod tests {
fn test_merge_range() {
let mut left = RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
- indices: smallvec![1],
+ indices: smallvec![SourceIndex {
+ index: 1,
+ num_row_groups: 2,
+ }],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
@@ -593,7 +622,10 @@ mod tests {
};
let right = RangeMeta {
time_range: (Timestamp::new_second(800), Timestamp::new_second(1200)),
- indices: smallvec![2],
+ indices: smallvec![SourceIndex {
+ index: 2,
+ num_row_groups: 2,
+ }],
row_group_indices: smallvec![
RowGroupIndex {
index: 2,
@@ -612,7 +644,16 @@ mod tests {
left,
RangeMeta {
time_range: (Timestamp::new_second(800), Timestamp::new_second(2000)),
- indices: smallvec![1, 2],
+ indices: smallvec![
+ SourceIndex {
+ index: 1,
+ num_row_groups: 2
+ },
+ SourceIndex {
+ index: 2,
+ num_row_groups: 2
+ }
+ ],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
@@ -640,17 +681,14 @@ mod tests {
fn test_split_range() {
let range = RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
- indices: smallvec![1],
- row_group_indices: smallvec![
- RowGroupIndex {
- index: 1,
- row_group_index: 1
- },
- RowGroupIndex {
- index: 1,
- row_group_index: 2
- }
- ],
+ indices: smallvec![SourceIndex {
+ index: 1,
+ num_row_groups: 2,
+ }],
+ row_group_indices: smallvec![RowGroupIndex {
+ index: 1,
+ row_group_index: ALL_ROW_GROUPS,
+ }],
num_rows: 5,
};
@@ -663,19 +701,25 @@ mod tests {
&[
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
- indices: smallvec![1],
+ indices: smallvec![SourceIndex {
+ index: 1,
+ num_row_groups: 2,
+ }],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
- row_group_index: 1
+ row_group_index: 0
},],
num_rows: 2,
},
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
- indices: smallvec![1],
+ indices: smallvec![SourceIndex {
+ index: 1,
+ num_row_groups: 2,
+ }],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
- row_group_index: 2
+ row_group_index: 1
}],
num_rows: 2,
}
@@ -687,7 +731,16 @@ mod tests {
fn test_not_split_range() {
let range = RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
- indices: smallvec![1, 2],
+ indices: smallvec![
+ SourceIndex {
+ index: 1,
+ num_row_groups: 1,
+ },
+ SourceIndex {
+ index: 2,
+ num_row_groups: 1,
+ }
+ ],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
@@ -710,32 +763,50 @@ mod tests {
#[test]
fn test_maybe_split_ranges() {
let ranges = vec![
+ RangeMeta {
+ time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
+ indices: smallvec![SourceIndex {
+ index: 0,
+ num_row_groups: 1,
+ }],
+ row_group_indices: smallvec![RowGroupIndex {
+ index: 0,
+ row_group_index: 0,
+ },],
+ num_rows: 4,
+ },
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
- indices: smallvec![1],
- row_group_indices: smallvec![
- RowGroupIndex {
- index: 1,
- row_group_index: 0
- },
- RowGroupIndex {
- index: 1,
- row_group_index: 1
- }
- ],
+ indices: smallvec![SourceIndex {
+ index: 1,
+ num_row_groups: 2,
+ }],
+ row_group_indices: smallvec![RowGroupIndex {
+ index: 1,
+ row_group_index: ALL_ROW_GROUPS,
+ },],
num_rows: 4,
},
RangeMeta {
time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
- indices: smallvec![2, 3],
+ indices: smallvec![
+ SourceIndex {
+ index: 2,
+ num_row_groups: 2,
+ },
+ SourceIndex {
+ index: 3,
+ num_row_groups: 0,
+ }
+ ],
row_group_indices: smallvec![
RowGroupIndex {
index: 2,
- row_group_index: 0
+ row_group_index: ALL_ROW_GROUPS,
},
RowGroupIndex {
index: 3,
- row_group_index: 0
+ row_group_index: ALL_ROW_GROUPS,
}
],
num_rows: 5,
@@ -745,9 +816,24 @@ mod tests {
assert_eq!(
output,
vec![
+ RangeMeta {
+ time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
+ indices: smallvec![SourceIndex {
+ index: 0,
+ num_row_groups: 1,
+ }],
+ row_group_indices: smallvec![RowGroupIndex {
+ index: 0,
+ row_group_index: 0
+ },],
+ num_rows: 4,
+ },
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
- indices: smallvec![1],
+ indices: smallvec![SourceIndex {
+ index: 1,
+ num_row_groups: 2,
+ }],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: 0
@@ -756,7 +842,10 @@ mod tests {
},
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
- indices: smallvec![1],
+ indices: smallvec![SourceIndex {
+ index: 1,
+ num_row_groups: 2,
+ }],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: 1
@@ -765,15 +854,24 @@ mod tests {
},
RangeMeta {
time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
- indices: smallvec![2, 3],
+ indices: smallvec![
+ SourceIndex {
+ index: 2,
+ num_row_groups: 2
+ },
+ SourceIndex {
+ index: 3,
+ num_row_groups: 0,
+ }
+ ],
row_group_indices: smallvec![
RowGroupIndex {
index: 2,
- row_group_index: 0
+ row_group_index: ALL_ROW_GROUPS,
},
RowGroupIndex {
index: 3,
- row_group_index: 0
+ row_group_index: ALL_ROW_GROUPS,
}
],
num_rows: 5,
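`SourceIndex::num_row_groups` is what lets `maybe_split` expand a range lazily: a range covering a single source scanned as `ALL_ROW_GROUPS` becomes one range per row group, each keeping the same source index. A worked example matching `test_split_range` above (values taken from that test):

    // Input: one SourceIndex { index: 1, num_row_groups: 2 }, num_rows: 5, scanned as ALL_ROW_GROUPS.
    // Output of maybe_split: two ranges over the same source,
    //   RowGroupIndex { index: 1, row_group_index: 0 } with num_rows = 5 / 2 = 2,
    //   RowGroupIndex { index: 1, row_group_index: 1 } with num_rows = 2.
    // Ranges covering more than one source (see test_not_split_range) are left untouched.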
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index 7da80806f22e..471cc1a8e5d4 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -33,6 +33,7 @@ use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::file_cache::FileCacheRef;
use crate::cache::CacheManagerRef;
+use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
use crate::error::Result;
use crate::memtable::MemtableRef;
use crate::metrics::READ_SST_COUNT;
@@ -68,15 +69,6 @@ impl Scanner {
Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
}
}
-
- /// Returns a [RegionScanner] to scan the region.
- #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
- pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
- match self {
- Scanner::Seq(seq_scan) => Ok(Box::new(seq_scan)),
- Scanner::Unordered(unordered_scan) => Ok(Box::new(unordered_scan)),
- }
- }
}
#[cfg(test)]
@@ -104,6 +96,17 @@ impl Scanner {
Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
}
}
+
+ /// Sets the target partitions for the scanner. It controls the parallelism of the scanner.
+ pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
+ use store_api::region_engine::{PrepareRequest, RegionScanner};
+
+ let request = PrepareRequest::default().with_target_partitions(target_partitions);
+ match self {
+ Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
+ Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
+ }
+ }
}
#[cfg_attr(doc, aquamarine::aquamarine)]
@@ -165,8 +168,8 @@ pub(crate) struct ScanRegion {
request: ScanRequest,
/// Cache.
cache_manager: CacheManagerRef,
- /// Parallelism to scan.
- parallelism: ScanParallelism,
+ /// Capacity of the channel to send data from parallel scan tasks to the main task.
+ parallel_scan_channel_size: usize,
/// Whether to ignore inverted index.
ignore_inverted_index: bool,
/// Whether to ignore fulltext index.
@@ -188,17 +191,20 @@ impl ScanRegion {
access_layer,
request,
cache_manager,
- parallelism: ScanParallelism::default(),
+ parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
ignore_inverted_index: false,
ignore_fulltext_index: false,
start_time: None,
}
}
- /// Sets parallelism.
+ /// Sets parallel scan task channel size.
#[must_use]
- pub(crate) fn with_parallelism(mut self, parallelism: ScanParallelism) -> Self {
- self.parallelism = parallelism;
+ pub(crate) fn with_parallel_scan_channel_size(
+ mut self,
+ parallel_scan_channel_size: usize,
+ ) -> Self {
+ self.parallel_scan_channel_size = parallel_scan_channel_size;
self
}
@@ -224,7 +230,7 @@ impl ScanRegion {
/// Returns a [Scanner] to scan the region.
pub(crate) fn scanner(self) -> Result<Scanner> {
- if self.version.options.append_mode && self.request.series_row_selector.is_none() {
+ if self.use_unordered_scan() {
// If table is append only and there is no series row selector, we use unordered scan in query.
// We still use seq scan in compaction.
self.unordered_scan().map(Scanner::Unordered)
@@ -233,10 +239,20 @@ impl ScanRegion {
}
}
+ /// Returns a [RegionScanner] to scan the region.
+ #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
+ pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
+ if self.use_unordered_scan() {
+ self.unordered_scan().map(|scanner| Box::new(scanner) as _)
+ } else {
+ self.seq_scan().map(|scanner| Box::new(scanner) as _)
+ }
+ }
+
/// Scan sequentially.
pub(crate) fn seq_scan(self) -> Result<SeqScan> {
let input = self.scan_input(true)?;
- Ok(SeqScan::new(input))
+ Ok(SeqScan::new(input, false))
}
/// Unordered scan.
@@ -248,7 +264,14 @@ impl ScanRegion {
#[cfg(test)]
pub(crate) fn scan_without_filter_deleted(self) -> Result<SeqScan> {
let input = self.scan_input(false)?;
- Ok(SeqScan::new(input))
+ Ok(SeqScan::new(input, false))
+ }
+
+ /// Returns true if the region can use unordered scan for current request.
+ fn use_unordered_scan(&self) -> bool {
+ // If table is append only and there is no series row selector, we use unordered scan in query.
+ // We still use seq scan in compaction.
+ self.version.options.append_mode && self.request.series_row_selector.is_none()
}
/// Creates a scan input.
@@ -314,7 +337,7 @@ impl ScanRegion {
.with_cache(self.cache_manager)
.with_inverted_index_applier(inverted_index_applier)
.with_fulltext_index_applier(fulltext_index_applier)
- .with_parallelism(self.parallelism)
+ .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
.with_start_time(self.start_time)
.with_append_mode(self.version.options.append_mode)
.with_filter_deleted(filter_deleted)
@@ -428,15 +451,6 @@ impl ScanRegion {
}
}
-/// Config for parallel scan.
-#[derive(Debug, Clone, Copy, Default)]
-pub(crate) struct ScanParallelism {
- /// Number of tasks expect to spawn to read data.
- pub(crate) parallelism: usize,
- /// Channel size to send batches. Only takes effect when the parallelism > 1.
- pub(crate) channel_size: usize,
-}
-
/// Returns true if the time range of a SST `file` matches the `predicate`.
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
if predicate == &TimestampRange::min_to_max() {
@@ -466,8 +480,8 @@ pub(crate) struct ScanInput {
pub(crate) cache_manager: CacheManagerRef,
/// Ignores file not found error.
ignore_file_not_found: bool,
- /// Parallelism to scan data.
- pub(crate) parallelism: ScanParallelism,
+ /// Capacity of the channel to send data from parallel scan tasks to the main task.
+ pub(crate) parallel_scan_channel_size: usize,
/// Index appliers.
inverted_index_applier: Option,
fulltext_index_applier: Option,
@@ -496,7 +510,7 @@ impl ScanInput {
files: Vec::new(),
cache_manager: CacheManagerRef::default(),
ignore_file_not_found: false,
- parallelism: ScanParallelism::default(),
+ parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
inverted_index_applier: None,
fulltext_index_applier: None,
query_start: None,
@@ -549,10 +563,13 @@ impl ScanInput {
self
}
- /// Sets scan parallelism.
+ /// Sets scan task channel size.
#[must_use]
- pub(crate) fn with_parallelism(mut self, parallelism: ScanParallelism) -> Self {
- self.parallelism = parallelism;
+ pub(crate) fn with_parallel_scan_channel_size(
+ mut self,
+ parallel_scan_channel_size: usize,
+ ) -> Self {
+ self.parallel_scan_channel_size = parallel_scan_channel_size;
self
}
@@ -621,12 +638,15 @@ impl ScanInput {
sources: Vec