diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 82c819082..2f0f9022f 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -255,34 +255,36 @@ pub fn convert_disk_files_to_parquet( custom_partition_fields.insert(custom_partition_field.to_string(), index); } } - let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; let props = parquet_writer_props( time_partition.clone(), index_time_partition, custom_partition_fields, ) .build(); - schemas.push(merged_schema.clone()); let schema = Arc::new(merged_schema); - let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?; + let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; + let mut writer = ArrowWriter::try_new(&parquet_file, schema.clone(), Some(props))?; for ref record in record_reader.merged_iter(schema, time_partition.clone()) { writer.write(record)?; } writer.close()?; - - for file in files { - let file_size = file.metadata().unwrap().len(); - let file_type = file.extension().unwrap().to_str().unwrap(); - - if fs::remove_file(file.clone()).is_err() { - log::error!("Failed to delete file. Unstable state"); - process::abort() + if parquet_file.metadata().unwrap().len() == 0 { + log::error!("Invalid parquet file detected, removing it"); + fs::remove_file(parquet_path).unwrap(); + } else { + for file in files { + let file_size = file.metadata().unwrap().len(); + let file_type = file.extension().unwrap().to_str().unwrap(); + if fs::remove_file(file.clone()).is_err() { + log::error!("Failed to delete file. Unstable state"); + process::abort() + } + metrics::STORAGE_SIZE + .with_label_values(&["staging", stream, file_type]) + .sub(file_size as i64); } - metrics::STORAGE_SIZE - .with_label_values(&["staging", stream, file_type]) - .sub(file_size as i64); } }