Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support explain query when running dfbench with clickbench #13942

Merged
merged 2 commits into from
Dec 30, 2024

Conversation

zhuqi-lucas
Copy link
Contributor

@zhuqi-lucas zhuqi-lucas commented Dec 30, 2024

Which issue does this PR close?

Closes #13941

Rationale for this change

What changes are included in this PR?

Add the debug option for clickbench

Are these changes tested?

yes:

cargo run --release --bin dfbench clickbench  --query 35 --debug
   Compiling datafusion-benchmarks v44.0.0 (/Users/zhuqi/arrow-datafusion/benchmarks)
    Building [=======================> ] 352/353: dfbench(bin)

    Finished `release` profile [optimized] target(s) in 4m 51s
     Running `target/release/dfbench clickbench --query 35 --debug`
Running benchmarks with the following options: RunOpt { query: Some(35), common: CommonOpt { iterations: 3, partitions: None, batch_size: 8192, debug: true }, path: "benchmarks/data/hits.parquet", queries_path: "benchmarks/queries/clickbench/queries.sql", output_path: None }
Q35: SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10;
Query 35 iteration 0 took 1186.0 ms and returned 10 rows
Query 35 iteration 1 took 1018.3 ms and returned 10 rows
Query 35 iteration 2 took 970.2 ms and returned 10 rows
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: c DESC NULLS FIRST, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
|               |   Projection: hits.ClientIP, hits.ClientIP - Int64(1), hits.ClientIP - Int64(2), hits.ClientIP - Int64(3), count(*) AS c                                                                                                                                                                                                                                                                                                                                                                                      |
|               |     Aggregate: groupBy=[[hits.ClientIP, __common_expr_1 AS hits.ClientIP - Int64(1), __common_expr_1 AS hits.ClientIP - Int64(2), __common_expr_1 AS hits.ClientIP - Int64(3)]], aggr=[[count(Int64(1)) AS count(*)]]                                                                                                                                                                                                                                                                                         |
|               |       Projection: CAST(hits.ClientIP AS Int64) AS __common_expr_1, hits.ClientIP                                                                                                                                                                                                                                                                                                                                                                                                                              |
|               |         TableScan: hits projection=[ClientIP]                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
| physical_plan | SortPreservingMergeExec: [c@4 DESC], fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|               |   SortExec: TopK(fetch=10), expr=[c@4 DESC], preserve_partitioning=[true]                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |     ProjectionExec: expr=[ClientIP@0 as ClientIP, hits.ClientIP - Int64(1)@1 as hits.ClientIP - Int64(1), hits.ClientIP - Int64(2)@2 as hits.ClientIP - Int64(2), hits.ClientIP - Int64(3)@3 as hits.ClientIP - Int64(3), count(*)@4 as c]                                                                                                                                                                                                                                                                    |
|               |       AggregateExec: mode=FinalPartitioned, gby=[ClientIP@0 as ClientIP, hits.ClientIP - Int64(1)@1 as hits.ClientIP - Int64(1), hits.ClientIP - Int64(2)@2 as hits.ClientIP - Int64(2), hits.ClientIP - Int64(3)@3 as hits.ClientIP - Int64(3)], aggr=[count(*)]                                                                                                                                                                                                                                             |
|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |           RepartitionExec: partitioning=Hash([ClientIP@0, hits.ClientIP - Int64(1)@1, hits.ClientIP - Int64(2)@2, hits.ClientIP - Int64(3)@3], 14), input_partitions=14                                                                                                                                                                                                                                                                                                                                       |
|               |             AggregateExec: mode=Partial, gby=[ClientIP@1 as ClientIP, __common_expr_1@0 - 1 as hits.ClientIP - Int64(1), __common_expr_1@0 - 2 as hits.ClientIP - Int64(2), __common_expr_1@0 - 3 as hits.ClientIP - Int64(3)], aggr=[count(*)]                                                                                                                                                                                                                                                               |
|               |               ProjectionExec: expr=[CAST(ClientIP@0 AS Int64) as __common_expr_1, ClientIP@0 as ClientIP]                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |                 ParquetExec: file_groups={14 groups: [[Users/zhuqi/arrow-datafusion/benchmarks/data/hits.parquet:0..1055712604], [Users/zhuqi/arrow-datafusion/benchmarks/data/hits.parquet:1055712604..2111425208], [Users/zhuqi/arrow-datafusion/benchmarks/data/hits.parquet:2111425208..3167137812], [Users/zhuqi/arrow-datafusion/benchmarks/data/hits.parquet:3167137812..4222850416], [Users/zhuqi/arrow-datafusion/benchmarks/data/hits.parquet:4222850416..5278563020], ...]}, projection=[ClientIP] |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Are there any user-facing changes?

yes

@zhuqi-lucas
Copy link
Contributor Author

cc @alamb It's a small change for explain support for dfbench, thanks!

@2010YOUY01
Copy link
Contributor

Perhaps we can use an existing common option to print plans


tpch benchmark uses this flag, but it's not used in clickbench yet
async fn execute_query(
&self,
ctx: &SessionContext,
sql: &str,
) -> Result<Vec<RecordBatch>> {
let debug = self.common.debug;
let plan = ctx.sql(sql).await?;
let (state, plan) = plan.into_parts();
if debug {
println!("=== Logical plan ===\n{plan}\n");
}
let plan = state.optimize(&plan)?;
if debug {
println!("=== Optimized logical plan ===\n{plan}\n");
}
let physical_plan = state.create_physical_plan(&plan).await?;
if debug {
println!(
"=== Physical plan ===\n{}\n",
displayable(physical_plan.as_ref()).indent(true)
);
}
let result = collect(physical_plan.clone(), state.task_ctx()).await?;
if debug {
println!(
"=== Physical plan with metrics ===\n{}\n",
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
.indent(true)
);
if !result.is_empty() {
// do not call print_batches if there are no batches as the result is confusing
// and makes it look like there is a batch with no columns
pretty::print_batches(&result)?;
}
}
Ok(result)
}

@zhuqi-lucas
Copy link
Contributor Author

Thank you @2010YOUY01 for review and good suggestion! Addressed in latest PR.

@zhuqi-lucas zhuqi-lucas changed the title Support explain query when running dfbench Support explain query when running dfbench with clickbench Dec 30, 2024
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @zhuqi-lucas and @2010YOUY01 -- looks good to me

FYI @XiangpengHao as I think you have also been using this benchmark program

@alamb alamb merged commit 4d07579 into apache:main Dec 30, 2024
27 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support explain query when running dfbench with clickbench
3 participants