-
The DataFusion query engine is fantastic and I've been using it to implement a data streaming layer for the Nautilus trading engine. However, upon further testing I'm seeing very high total memory allocations from the DataFusion code.
The file I'm reading is about 116 MB and contains about 10M records. The test code streams record batches from the file and counts the total number of records retrieved. Something like this:
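(A minimal sketch of the test; the table name, file path, and the timestamp column, here called `ts`, are placeholders.)

```rust
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use futures::StreamExt;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Register the ~116 MB parquet file as a table.
    ctx.register_parquet("quotes", "data/quotes.parquet", ParquetReadOptions::default())
        .await?;

    // Stream record batches and count rows without collecting them;
    // `ts` stands in for the sort column.
    let df = ctx.sql("SELECT * FROM quotes ORDER BY ts").await?;
    let mut stream = df.execute_stream().await?;
    let mut count = 0;
    while let Some(batch) = stream.next().await {
        count += batch?.num_rows();
    }
    println!("total rows: {count}");
    Ok(())
}
```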
Ideally, my expectation is that only the memory needed to process one chunk of data will be allocated: retrieving it from disk, processing it, and collecting it into a `Vec`. But analysis with bytehound shows that the memory keeps growing, and based on the stack trace it is growing inside the DataFusion library logic. The total allocation for the program grows to about 1 GB (for a 116 MB file 🙃), and about 700 MB of it comes from a few lines in the DataFusion parquet reader. The full backtrace and graphs are attached below. Is there any way to keep the memory from growing?
-
It turns out the ORDER BY clause was the culprit: the sort has to buffer its entire input before it can emit anything. Removing the ORDER BY keeps the memory flat. However, this means that the user must ensure that the file is pre-sorted. Is there any query or clause that can check this assumption while querying the file and fail if it's not sorted?
-
Thank you @twitu -- this is a great analysis, and thank you for posting your results (and not leaving us hanging!). You can tell DataFusion how your file is sorted using its APIs: either by registering the listing table directly with a declared sort order, or via SQL DDL as described in https://arrow.apache.org/datafusion/user-guide/sql/ddl.html:
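Something like this (a sketch of the SQL DDL route; the table name, file path, and sort column `ts` are placeholders, and `WITH ORDER` requires a DataFusion version that supports it):

```rust
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Declare the file's existing sort order when creating the table,
    // so DataFusion can skip re-sorting when a query's ORDER BY matches.
    ctx.sql(
        "CREATE EXTERNAL TABLE quotes \
         STORED AS PARQUET \
         WITH ORDER (ts ASC) \
         LOCATION 'data/quotes.parquet'",
    )
    .await?;

    // This query should now plan without a Sort operator.
    let df = ctx.sql("SELECT * FROM quotes ORDER BY ts").await?;
    df.show_limit(5).await?;
    Ok(())
}
```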
If you tell DataFusion the file is sorted by the relevant column, it will not add a sort to the plan when your query's ORDER BY matches that order.
-
Hi @alamb, it appears the fix I mentioned above was spurious. For some reason a simple streaming query still grows the memory continuously. Here's my script: it takes a file name, loads the file into the context, and runs a simple `SELECT * FROM <table_name>` query.

```rust
use std::path::PathBuf;

use datafusion::prelude::{ParquetReadOptions, SessionContext};
use futures::StreamExt;

fn main() {
    let file_path: PathBuf = std::env::var("FILE_PATH").unwrap().into();
    let file_name = file_path.file_stem().unwrap().to_str().unwrap();

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    let session_ctx = SessionContext::default();
    let parquet_options = ParquetReadOptions::<'_> {
        skip_metadata: Some(false),
        ..Default::default()
    };
    runtime
        .block_on(session_ctx.register_parquet(
            file_name,
            file_path.to_str().unwrap(),
            parquet_options,
        ))
        .unwrap();

    // Stream the batches and count rows; each batch is dropped right after
    // it is counted, so memory use should stay roughly constant.
    let default_query = format!("SELECT * FROM {}", &file_name);
    let query = runtime.block_on(session_ctx.sql(&default_query)).unwrap();
    let mut batch_stream = runtime.block_on(query.execute_stream()).unwrap();
    let mut count = 0;
    while let Some(Ok(batch)) = runtime.block_on(batch_stream.next()) {
        count += batch.num_rows();
    }
    println!("{}", count);
}
```

But when I check the memory profile it keeps growing, to almost 600 MB for a parquet file of 680 MB. As you can see from the step increases in memory, some `Vec` is growing and reallocating at double its size. This logic is somewhere deep inside the parquet reader.
The unit of IO is the page if the offset index is enabled; otherwise it falls back to reading the entire column chunk. What did you use to write the file? This behaviour would make sense if the file has only a few very large row groups and no offset index.
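For example, bounding the row group size when writing keeps column chunks small, so a reader never has to buffer one huge chunk at a time. A sketch using the `parquet` crate's `ArrowWriter`; the schema, row-group cap, and output path are placeholders:

```rust
use std::{fs::File, sync::Arc};

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A stand-in schema with a single timestamp-like column.
    let schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from_iter_values(0..1_000_000))],
    )?;

    // Cap row groups at ~64k rows instead of writing a few very large ones.
    let props = WriterProperties::builder()
        .set_max_row_group_size(64 * 1024)
        .build();

    let file = File::create("data/quotes.parquet")?;
    let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
```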