Skip to content

Commit

Permalink
refactor: partitioned_files as a method
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh committed Dec 8, 2024
1 parent 8a7cbf8 commit bcc8669
Showing 1 changed file with 81 additions and 85 deletions.
166 changes: 81 additions & 85 deletions src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl StandardTableProvider {
})
.collect();

let (partitioned_files, statistics) = partitioned_files(cached, &self.schema);
let (partitioned_files, statistics) = self.partitioned_files(cached);
let plan = self
.create_parquet_physical_plan(
ObjectStoreUrl::parse("file:///").unwrap(),
Expand Down Expand Up @@ -249,7 +249,7 @@ impl StandardTableProvider {
})
.collect();

let (partitioned_files, statistics) = partitioned_files(hot_tier_files, &self.schema);
let (partitioned_files, statistics) = self.partitioned_files(hot_tier_files);
let plan = self
.create_parquet_physical_plan(
ObjectStoreUrl::parse("file:///").unwrap(),
Expand Down Expand Up @@ -317,6 +317,84 @@ impl StandardTableProvider {
};
Ok(exec)
}

fn partitioned_files(
&self,
manifest_files: Vec<catalog::manifest::File>,
) -> (Vec<Vec<PartitionedFile>>, datafusion::common::Statistics) {
let target_partition = num_cpus::get();
let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
let mut column_statistics =
HashMap::<String, Option<catalog::column::TypedStatistics>>::new();
let mut count = 0;
for (index, file) in manifest_files
.into_iter()
.enumerate()
.map(|(x, y)| (x % target_partition, y))
{
#[allow(unused_mut)]
let catalog::manifest::File {
mut file_path,
num_rows,
columns,
..
} = file;

// object_store::path::Path doesn't automatically deal with Windows path separators
// to do that, we are using from_absolute_path() which takes into consideration the underlying filesystem
// before sending the file path to PartitionedFile
// the github issue- https://github.com/parseablehq/parseable/issues/824
// For some reason, the `from_absolute_path()` doesn't work for macos, hence the ugly solution
// TODO: figure out an elegant solution to this
#[cfg(windows)]
{
if CONFIG.storage_name.eq("drive") {
file_path = object_store::path::Path::from_absolute_path(file_path).unwrap();
}
}
let pf = PartitionedFile::new(file_path, file.file_size);
partitioned_files[index].push(pf);

columns.into_iter().for_each(|col| {
column_statistics
.entry(col.name)
.and_modify(|x| {
if let Some((stats, col_stats)) = x.as_ref().cloned().zip(col.stats.clone())
{
*x = Some(stats.update(col_stats));
}
})
.or_insert_with(|| col.stats.as_ref().cloned());
});
count += num_rows;
}
let statistics = self
.schema
.fields()
.iter()
.map(|field| {
column_statistics
.get(field.name())
.and_then(|stats| stats.as_ref())
.and_then(|stats| stats.clone().min_max_as_scalar(field.data_type()))
.map(|(min, max)| datafusion::common::ColumnStatistics {
null_count: Precision::Absent,
max_value: Precision::Exact(max),
min_value: Precision::Exact(min),
distinct_count: Precision::Absent,
})
.unwrap_or_default()
})
.collect();

let statistics = datafusion::common::Statistics {
num_rows: Precision::Exact(count as usize),
total_byte_size: Precision::Absent,
column_statistics: statistics,
};

(partitioned_files, statistics)
}
}

async fn collect_from_snapshot(
Expand Down Expand Up @@ -366,88 +444,6 @@ async fn collect_from_snapshot(
Ok(manifest_files)
}

fn partitioned_files(
manifest_files: Vec<catalog::manifest::File>,
table_schema: &Schema,
) -> (Vec<Vec<PartitionedFile>>, datafusion::common::Statistics) {
let target_partition = num_cpus::get();
let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
let mut column_statistics = HashMap::<String, Option<catalog::column::TypedStatistics>>::new();
let mut count = 0;
for (index, file) in manifest_files
.into_iter()
.enumerate()
.map(|(x, y)| (x % target_partition, y))
{
let catalog::manifest::File {
file_path,
num_rows,
columns,
..
} = file;

// object_store::path::Path doesn't automatically deal with Windows path separators
// to do that, we are using from_absolute_path() which takes into consideration the underlying filesystem
// before sending the file path to PartitionedFile
// the github issue- https://github.com/parseablehq/parseable/issues/824
// For some reason, the `from_absolute_path()` doesn't work for macos, hence the ugly solution
// TODO: figure out an elegant solution to this
let pf;

#[cfg(unix)]
{
pf = PartitionedFile::new(file_path, file.file_size);
}
#[cfg(windows)]
{
pf = if CONFIG.storage_name.eq("drive") {
let file_path = object_store::path::Path::from_absolute_path(file_path).unwrap();
PartitionedFile::new(file_path, file.file_size)
} else {
PartitionedFile::new(file_path, file.file_size)
};
}

partitioned_files[index].push(pf);
columns.into_iter().for_each(|col| {
column_statistics
.entry(col.name)
.and_modify(|x| {
if let Some((stats, col_stats)) = x.as_ref().cloned().zip(col.stats.clone()) {
*x = Some(stats.update(col_stats));
}
})
.or_insert_with(|| col.stats.as_ref().cloned());
});
count += num_rows;
}
let statistics = table_schema
.fields()
.iter()
.map(|field| {
column_statistics
.get(field.name())
.and_then(|stats| stats.as_ref())
.and_then(|stats| stats.clone().min_max_as_scalar(field.data_type()))
.map(|(min, max)| datafusion::common::ColumnStatistics {
null_count: Precision::Absent,
max_value: Precision::Exact(max),
min_value: Precision::Exact(min),
distinct_count: Precision::Absent,
})
.unwrap_or_default()
})
.collect();

let statistics = datafusion::common::Statistics {
num_rows: Precision::Exact(count as usize),
total_byte_size: Precision::Absent,
column_statistics: statistics,
};

(partitioned_files, statistics)
}

#[async_trait::async_trait]
impl TableProvider for StandardTableProvider {
fn as_any(&self) -> &dyn std::any::Any {
Expand Down Expand Up @@ -602,7 +598,7 @@ impl TableProvider for StandardTableProvider {
);
}

let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema);
let (partitioned_files, statistics) = self.partitioned_files(manifest_files);
let remote_exec = self
.create_parquet_physical_plan(
ObjectStoreUrl::parse(glob_storage.store_url()).unwrap(),
Expand Down

0 comments on commit bcc8669

Please sign in to comment.