Exponential planning time (100s of seconds) with UNION and ORDER BY queries
#13748
Comments
I tested this with various released versions:
|
BTW here is the code I am using to generate those numbers (it is pretty grotty):
// DataFusion spilling sort benchmark / examples
// Idea is to replicate a report from https://discord.com/channels/885562378132000778/1166447479609376850/1276137008435298335
// where sort doesn't spill
// Related link: sorting strings
use std::fs::File;
use std::path::PathBuf;
use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::datatypes::{Field, Fields, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use std::sync::Arc;
use datafusion::arrow;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    // initialize logging to see DataFusion's internal logging
    std::env::set_var("RUST_LOG", "info");
    env_logger::init();
    let ctx = SessionContext::new();
    let n = 100;
    make_table(n);
    // declare the same number of columns that make_table wrote
    ctx.sql(&format!("CREATE EXTERNAL TABLE t({}) STORED AS CSV LOCATION 'data.csv' WITH ORDER (c1)", column_list(n))).await?
        .show().await?;
    println!("10 columns");
    println!("{}", make_query(10));
    println!("40 columns");
    println!("{}", make_query(40));
    for n in [10, 20, 30, 40, 50, 60, 70, 80, 90, 100] {
        let sql = make_query(n);
        print!("Running with {n} columns");
        // the timer covers planning plus execution of the query
        let start = std::time::Instant::now();
        ctx.sql(&sql).await?.collect().await?;
        let elapsed = start.elapsed();
        println!("...completed in {:?}", elapsed);
    }
    Ok(())
}

/// returns a column definition list like `c0 int, c1 int, ...`
fn column_list(n: usize) -> String {
    (0..n)
        .map(|i| format!("c{} int", i))
        .collect::<Vec<_>>()
        .join(", ")
}

/// Writes a table like this to a CSV file
/// c0: int, c1: int, c2: int ... c{n-1}: int
///
/// returns the filename
fn make_table(n: usize) -> PathBuf {
    let path = PathBuf::from("data.csv");
    // three rows per column: i*n, 2*i*n, 3*i*n
    let arrays = (0..n)
        .map(|i| {
            let i = i as i32;
            let n = n as i32;
            Arc::new(Int32Array::from(vec![i * n, 2 * i * n, 3 * i * n])) as ArrayRef
        })
        .collect::<Vec<_>>();
    let schema = Schema::new(Fields::from(
        arrays
            .iter()
            .enumerate()
            .map(|(i, arr)| Field::new(&format!("c{}", i), arr.data_type().clone(), false))
            .collect::<Vec<_>>(),
    ));
    let batch = RecordBatch::try_new(Arc::new(schema), arrays).unwrap();
    let file = File::create(&path).unwrap();
    let mut writer = arrow::csv::Writer::new(file);
    writer.write(&batch).unwrap();
    // flush on drop
    // writer.into_inner();
    path
}

/// return a query like
/// ```sql
/// (select c0, null as c1, ... null as c{n-1} from t ORDER BY c0)
/// UNION ALL
/// (select null as c0, c1, ... null as c{n-1} from t ORDER BY c1)
/// ...
/// (select null as c0, null as c1, ... c{n-1} from t ORDER BY c{n-1})
/// ORDER BY c0, c1 ... c{n-1}
/// ```
fn make_query(n: usize) -> String {
    let mut query = String::new();
    for i in 0..n {
        if i != 0 {
            query.push_str("\n UNION ALL \n");
        }
        // only column i is selected; every other column is a NULL placeholder
        let select_list = (0..n)
            .map(|j| {
                if i == j {
                    format!("c{j}")
                } else {
                    format!("null as c{j}")
                }
            })
            .collect::<Vec<_>>()
            .join(", ");
        query.push_str(&format!("(SELECT {} FROM t ORDER BY c{})", select_list, i));
    }
    // final ORDER BY over all n columns
    query.push_str(&format!(
        "\nORDER BY {}",
        (0..n)
            .map(|i| format!("c{}", i))
            .collect::<Vec<_>>()
            .join(", ")
    ));
    query
}
Do you know where the time is being spent?
Yes, the time is being spent normalizing equivalence orderings. I just need to spend some time staring at the code and figuring out how it could be done better.
I spent some time reviewing the flamegraph.
The high-level observation is that continually recomputing normalized equivalence groups takes a significant amount of time. What I am going to try is to make these implementations much more efficient (avoiding allocation and recomputation). Sketch:
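As a generic illustration of the "avoid recomputation" idea (a hypothetical toy, not DataFusion's actual equivalence code and not the sketch mentioned above): if normalizing an expression means scanning every equivalence class on each call, building a lookup table once and reusing it turns each later normalization into a single hash lookup. The names `EquivalenceGroups`, `normalize_naive`, `build_index`, and `normalize_cached` are invented for this example, and expressions are modelled as plain column names.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for equivalence groups: each inner Vec lists
/// expressions (here just column names) that are known to be equal.
struct EquivalenceGroups {
    classes: Vec<Vec<String>>,
}

impl EquivalenceGroups {
    /// Naive normalization: scans every class and allocates a new String
    /// on every call.
    fn normalize_naive(&self, expr: &str) -> String {
        for class in &self.classes {
            if class.iter().any(|e| e == expr) {
                // the first member of the class acts as its representative
                return class[0].clone();
            }
        }
        expr.to_string()
    }

    /// Builds a member -> representative table once, borrowing the
    /// existing strings instead of cloning them.
    fn build_index(&self) -> HashMap<&str, &str> {
        let mut index = HashMap::new();
        for class in &self.classes {
            if let Some(first) = class.first() {
                for e in class {
                    index.insert(e.as_str(), first.as_str());
                }
            }
        }
        index
    }
}

/// Normalization against the prebuilt table: one hash lookup, no allocation.
fn normalize_cached<'a>(index: &HashMap<&'a str, &'a str>, expr: &'a str) -> &'a str {
    index.get(expr).copied().unwrap_or(expr)
}

fn main() {
    let groups = EquivalenceGroups {
        classes: vec![
            vec!["c0".to_string(), "a".to_string()],
            vec!["c1".to_string(), "b".to_string()],
        ],
    };
    let index = groups.build_index();
    // both paths must agree; only the cost per call differs
    for expr in ["a", "b", "c1", "z"] {
        assert_eq!(groups.normalize_naive(expr), normalize_cached(&index, expr));
    }
    println!("b normalizes to {}", normalize_cached(&index, "b"));
}
```

The trade-off is that the table has to be rebuilt (or updated) whenever the groups change, so this only pays off when many normalizations happen between changes.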
FYI I am pretty sure I was seeing this with some of the sqlite test files, specifically https://github.com/Omega359/arrow-datafusion/blob/feature/sqllogictest_add_sqlite/datafusion/sqllogictest/test_files/sqlite/select4.slt
I will make this much better, but it will take me a while |
I have big plans on how to fix this holistically, but I also want to be able to resolve the issue quickly if possible. I am going to see if I can find some way to work around this for now while I work on a better long term plan |
For anyone following along, we are tracking completing the sqllogictest work in |
Describe the bug
Our internal system generates queries that look like the following
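As a rough illustration of that shape (a std-only sketch modelled on the `make_query` helper from the benchmark code in the comments; the table name `t` and the column names `c0`, `c1`, ... follow that code, and the whitespace around `UNION ALL` is simplified), a two-column version can be generated like this:

```rust
/// Builds the UNION ALL / ORDER BY query for `n` columns: branch `i`
/// selects only column `ci`, fills every other column with NULL, and
/// carries its own ORDER BY; a final ORDER BY covers all columns.
fn make_query(n: usize) -> String {
    let branches: Vec<String> = (0..n)
        .map(|i| {
            let select_list: Vec<String> = (0..n)
                .map(|j| {
                    if i == j {
                        format!("c{j}")
                    } else {
                        format!("null as c{j}")
                    }
                })
                .collect();
            format!("(SELECT {} FROM t ORDER BY c{i})", select_list.join(", "))
        })
        .collect();
    let order_by: Vec<String> = (0..n).map(|i| format!("c{i}")).collect();
    format!(
        "{}\nORDER BY {}",
        branches.join("\nUNION ALL\n"),
        order_by.join(", ")
    )
}

fn main() {
    // Prints:
    // (SELECT c0, null as c1 FROM t ORDER BY c0)
    // UNION ALL
    // (SELECT null as c0, c1 FROM t ORDER BY c1)
    // ORDER BY c0, c1
    println!("{}", make_query(2));
}
```

With `n` columns the query has `n` branches of `n` select items each, plus an `n`-column final `ORDER BY`, so the text of the query itself already grows quadratically.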
When there are 10 columns this takes 22 ms (in release mode) to plan (which is still quite a while)
When there are 100 columns, it takes over 2 minutes (!!) to plan, which basically caused two production incidents
Here are some timings with numbers of columns (you can see the exponential growth):
Running with 10 columns...completed in 22.65575ms
Running with 20 columns...completed in 107.885ms
Running with 30 columns...completed in 481.31775ms
Running with 40 columns...completed in 1.656844042s
Running with 50 columns...completed in 4.560470708s
Running with 60 columns...completed in 10.54814975s
Running with 70 columns...completed in 21.993968458s
Running with 80 columns...completed in 41.614843209s
Running with 90 columns...completed in 73.642939542s
Running with 100 columns...completed in 123.150163417s
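One way to put a number on the growth (a back-of-the-envelope sketch, not part of the original measurements): fit a straight line to ln(time) against ln(columns) for the timings above. The slope of that line is the exponent k in time ≈ a · columns^k. The helper name `log_log_slope` is made up for this example.

```rust
/// Least-squares slope of ln(seconds) against ln(columns).
/// Each point is (number of columns, elapsed seconds).
fn log_log_slope(points: &[(f64, f64)]) -> f64 {
    let n = points.len() as f64;
    let (mut sx, mut sy, mut sxx, mut sxy) = (0.0_f64, 0.0_f64, 0.0_f64, 0.0_f64);
    for &(cols, secs) in points {
        let (x, y) = (cols.ln(), secs.ln());
        sx += x;
        sy += y;
        sxx += x * x;
        sxy += x * y;
    }
    // standard closed form for the slope of a simple linear regression
    (n * sxy - sx * sy) / (n * sxx - sx * sx)
}

fn main() {
    // timings copied from the list above, converted to seconds
    let timings = [
        (10.0, 0.02265575),
        (20.0, 0.107885),
        (30.0, 0.48131775),
        (40.0, 1.656844042),
        (50.0, 4.560470708),
        (60.0, 10.54814975),
        (70.0, 21.993968458),
        (80.0, 41.614843209),
        (90.0, 73.642939542),
        (100.0, 123.150163417),
    ];
    let slope = log_log_slope(&timings);
    println!("estimated exponent k: {slope:.2}");
}
```

On these ten points the fitted exponent should land close to 4, which suggests very steep polynomial growth; either way, going from 10 to 100 columns costs more than three orders of magnitude in planning time.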
To Reproduce
With this data file (has 100 columns): data.csv
Create Table
Run query
40 column version (this takes over a second to plan in release mode)
Expected behavior
I expect that the query plans within a second with 100 columns
Additional context
The code spends an increasing amount of time in :
This particularly bad behavior was introduced in influxdata@577e4bb / #12562 (🤦 myself)
I think this is also what @berkaysynnada was warning us about in #12446 (comment)