Skip to content

Commit

Permalink
feat: add custom table catalogue format (#561)
Browse files Browse the repository at this point in the history
This PR introduces a new table format that keeps track of 
all the data files in the data storage. The format is inspired by 
Apache Iceberg so has similar naming scheme for things.

Snapshot which is stored in the stream metadata file is the 
main entry point to a table. A snapshot essentially is a list of URL 
to manifest file and primary time statistics for pruning said manifest 
during query. A manifest file contains list of all the actual files along 
with their file level statistics. Currently a manifest file is generated 
per top level partition ( i.e date ).

For old data (data ingested by older versions of Parseable) the 
query mechanism falls back to old style of query data.

Signed-off-by: Satyam Singh <[email protected]>
Co-authored-by: Nick <[email protected]>
  • Loading branch information
trueleo and theteachr authored Dec 7, 2023
1 parent 05de709 commit 3b98dd8
Show file tree
Hide file tree
Showing 18 changed files with 1,686 additions and 423 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ humantime = "2.1.0"
openid = { version = "0.12.0", default-features = false, features = ["rustls"] }
url = "2.4.0"
http-auth-basic = "0.3.3"
serde_repr = "0.1.17"

[build-dependencies]
cargo_toml = "0.15"
Expand Down
171 changes: 171 additions & 0 deletions server/src/catalog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Parseable Server (C) 2022 - 2023 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use std::sync::Arc;

use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
use relative_path::RelativePathBuf;

use crate::{
catalog::manifest::Manifest,
query::PartialTimeFilter,
storage::{ObjectStorage, ObjectStorageError},
};

use self::{column::Column, snapshot::ManifestItem};

pub mod column;
pub mod manifest;
pub mod snapshot;

pub use manifest::create_from_parquet_file;

pub trait Snapshot {
fn manifests(&self, time_predicates: Vec<PartialTimeFilter>) -> Vec<ManifestItem>;
}

pub trait ManifestFile {
fn file_name(&self) -> &str;
fn ingestion_size(&self) -> u64;
fn file_size(&self) -> u64;
fn num_rows(&self) -> u64;
fn columns(&self) -> &[Column];
}

impl ManifestFile for manifest::File {
fn file_name(&self) -> &str {
&self.file_path
}

fn ingestion_size(&self) -> u64 {
self.ingestion_size
}

fn file_size(&self) -> u64 {
self.file_size
}

fn num_rows(&self) -> u64 {
self.num_rows
}

fn columns(&self) -> &[Column] {
self.columns.as_slice()
}
}

pub async fn update_snapshot(
storage: Arc<dyn ObjectStorage + Send>,
stream_name: &str,
change: manifest::File,
) -> Result<(), ObjectStorageError> {
fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
match file
.columns()
.iter()
.find(|col| col.name == "p_timestamp")
.unwrap()
.stats
.clone()
.unwrap()
{
column::TypedStatistics::Int(stats) => (
NaiveDateTime::from_timestamp_millis(stats.min)
.unwrap()
.and_utc(),
NaiveDateTime::from_timestamp_millis(stats.min)
.unwrap()
.and_utc(),
),
_ => unreachable!(),
}
}

// get current snapshot
let mut meta = storage.get_snapshot(stream_name).await?;
let manifests = &mut meta.manifest_list;

let (lower_bound, _) = get_file_bounds(&change);
let pos = manifests.iter().position(|item| {
item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
});

// We update the manifest referenced by this position
// This updates an existing file so there is no need to create a snapshot entry.
if let Some(pos) = pos {
let info = &mut manifests[pos];
let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
let Some(mut manifest) = storage.get_manifest(&path).await? else {
return Err(ObjectStorageError::UnhandledError(
"Manifest found in snapshot but not in object-storage"
.to_string()
.into(),
));
};
manifest.apply_change(change);
storage.put_manifest(&path, manifest).await?;
} else {
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
let upper_bound = lower_bound
.date_naive()
.and_time(
NaiveTime::from_num_seconds_from_midnight_opt(
23 * 3600 + 59 * 60 + 59,
999_999_999,
)
.unwrap(),
)
.and_utc();

let manifest = Manifest {
files: vec![change],
..Manifest::default()
};

let path = partition_path(stream_name, lower_bound, upper_bound).join("manifest.json");
storage
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
.await?;
let path = storage.absolute_url(&path);
let new_snapshot_entriy = snapshot::ManifestItem {
manifest_path: path.to_string(),
time_lower_bound: lower_bound,
time_upper_bound: upper_bound,
};
manifests.push(new_snapshot_entriy);
storage.put_snapshot(stream_name, meta).await?;
}

Ok(())
}

/// Partition the path to which this manifest belongs.
/// Useful when uploading the manifest file.
fn partition_path(
stream: &str,
lower_bound: DateTime<Utc>,
upper_bound: DateTime<Utc>,
) -> RelativePathBuf {
let lower = lower_bound.date_naive().format("%Y-%m-%d").to_string();
let upper = upper_bound.date_naive().format("%Y-%m-%d").to_string();
if lower == upper {
RelativePathBuf::from_iter([stream, &format!("date={}", lower)])
} else {
RelativePathBuf::from_iter([stream, &format!("date={}:{}", lower, upper)])
}
}
146 changes: 146 additions & 0 deletions server/src/catalog/column.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Parseable Server (C) 2022 - 2023 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use std::cmp::{max, min};

use parquet::file::statistics::Statistics;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BoolType {
pub min: bool,
pub max: bool,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Float64Type {
pub min: f64,
pub max: f64,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Int64Type {
pub min: i64,
pub max: i64,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Utf8Type {
pub min: String,
pub max: String,
}

// Typed statistics are typed variant of statistics
// Currently all parquet types are casted down to these 4 types
// Binary types are assumed to be of valid Utf8
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum TypedStatistics {
Bool(BoolType),
Int(Int64Type),
Float(Float64Type),
String(Utf8Type),
}

impl TypedStatistics {
pub fn update(self, other: Self) -> Self {
match (self, other) {
(TypedStatistics::Bool(this), TypedStatistics::Bool(other)) => {
TypedStatistics::Bool(BoolType {
min: min(this.min, other.min),
max: max(this.max, other.max),
})
}
(TypedStatistics::Float(this), TypedStatistics::Float(other)) => {
TypedStatistics::Float(Float64Type {
min: this.min.min(other.min),
max: this.max.max(other.max),
})
}
(TypedStatistics::Int(this), TypedStatistics::Int(other)) => {
TypedStatistics::Int(Int64Type {
min: min(this.min, other.min),
max: max(this.max, other.max),
})
}
(TypedStatistics::String(this), TypedStatistics::String(other)) => {
TypedStatistics::String(Utf8Type {
min: min(this.min, other.min),
max: max(this.max, other.max),
})
}
_ => panic!("Cannot update wrong types"),
}
}
}

/// Column statistics are used to track statistics for a column in a given file.
/// This is similar to and derived from parquet statistics.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Column {
pub name: String,
pub stats: Option<TypedStatistics>,
pub uncompressed_size: u64,
pub compressed_size: u64,
}

impl TryFrom<&Statistics> for TypedStatistics {
type Error = parquet::errors::ParquetError;
fn try_from(value: &Statistics) -> Result<Self, Self::Error> {
if !value.has_min_max_set() {
return Err(parquet::errors::ParquetError::General(
"min max is not set".to_string(),
));
}

let res = match value {
Statistics::Boolean(stats) => TypedStatistics::Bool(BoolType {
min: *stats.min(),
max: *stats.max(),
}),
Statistics::Int32(stats) => TypedStatistics::Int(Int64Type {
min: *stats.min() as i64,
max: *stats.max() as i64,
}),
Statistics::Int64(stats) => TypedStatistics::Int(Int64Type {
min: *stats.min(),
max: *stats.max(),
}),
Statistics::Int96(stats) => TypedStatistics::Int(Int64Type {
min: stats.min().to_i64(),
max: stats.max().to_i64(),
}),
Statistics::Float(stats) => TypedStatistics::Float(Float64Type {
min: *stats.min() as f64,
max: *stats.max() as f64,
}),
Statistics::Double(stats) => TypedStatistics::Float(Float64Type {
min: *stats.min(),
max: *stats.max(),
}),
Statistics::ByteArray(stats) => TypedStatistics::String(Utf8Type {
min: stats.min().as_utf8()?.to_owned(),
max: stats.max().as_utf8()?.to_owned(),
}),
Statistics::FixedLenByteArray(stats) => TypedStatistics::String(Utf8Type {
min: stats.min().as_utf8()?.to_owned(),
max: stats.max().as_utf8()?.to_owned(),
}),
};

Ok(res)
}
}
Loading

0 comments on commit 3b98dd8

Please sign in to comment.