Skip to content

Commit

Permalink
Fix for invalid (zero-sized) Parquet file issue
Browse files Browse the repository at this point in the history
Detect a zero-sized Parquet file after conversion.
Log the error, delete the invalid Parquet file, and retain the grouped Arrow files.
The retained Arrow files will be converted to Parquet in the next sync cycle (every 60 seconds).
  • Loading branch information
nikhilsinhaparseable committed Jun 26, 2024
1 parent 4c0d2a8 commit 19c5047
Showing 1 changed file with 16 additions and 14 deletions.
30 changes: 16 additions & 14 deletions server/src/storage/staging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,34 +255,36 @@ pub fn convert_disk_files_to_parquet(
custom_partition_fields.insert(custom_partition_field.to_string(), index);
}
}
let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
let props = parquet_writer_props(
time_partition.clone(),
index_time_partition,
custom_partition_fields,
)
.build();

schemas.push(merged_schema.clone());
let schema = Arc::new(merged_schema);
let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?;
let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
let mut writer = ArrowWriter::try_new(&parquet_file, schema.clone(), Some(props))?;
for ref record in record_reader.merged_iter(schema, time_partition.clone()) {
writer.write(record)?;
}

writer.close()?;

for file in files {
let file_size = file.metadata().unwrap().len();
let file_type = file.extension().unwrap().to_str().unwrap();

if fs::remove_file(file.clone()).is_err() {
log::error!("Failed to delete file. Unstable state");
process::abort()
if parquet_file.metadata().unwrap().len() == 0 {
log::error!("Invalid parquet file detected, removing it");
fs::remove_file(parquet_path).unwrap();
} else {
for file in files {
let file_size = file.metadata().unwrap().len();
let file_type = file.extension().unwrap().to_str().unwrap();
if fs::remove_file(file.clone()).is_err() {
log::error!("Failed to delete file. Unstable state");
process::abort()
}
metrics::STORAGE_SIZE
.with_label_values(&["staging", stream, file_type])
.sub(file_size as i64);
}
metrics::STORAGE_SIZE
.with_label_values(&["staging", stream, file_type])
.sub(file_size as i64);
}
}

Expand Down

0 comments on commit 19c5047

Please sign in to comment.