Commit

group by 1 file okay

avantgardnerio committed Dec 15, 2024
1 parent 5634be1 commit 2796898
Showing 1 changed file with 42 additions and 19 deletions.
61 changes: 42 additions & 19 deletions datafusion/core/tests/sql/aggregates.rs
@@ -15,34 +15,57 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::util::pretty::pretty_format_batches;
 use super::*;
 use datafusion::scalar::ScalarValue;
 
 #[tokio::test]
 async fn parquet_agg_oom() -> Result<()> {
     let ctx = SessionContext::new();
-    let testdata = datafusion::test_util::arrow_test_data();
-    let schema = test_util::aggr_test_schema();
-    ctx.register_csv(
-        "aggregate_test_100",
-        &format!("{testdata}/csv/aggregate_test_100.csv"),
-        CsvReadOptions::new().schema(&schema),
+    ctx.register_parquet(
+        "agg_oom",
+        &format!("/Users/bgardner/Downloads/fffe1163-1dcb-4b83-9b29-ab1a1fbe9e0c.parquet"),
+        ParquetReadOptions::default()
     )
     .await?;
 
-    let sql = "SELECT c1, c2 FROM aggregate_test_100 order by c1 limit 1";
-    let actual = execute_to_batches(&ctx, sql).await;
-    let actual = format!("{}", pretty_format_batches(actual.as_slice()).unwrap());
-    let expected = r#"
-+----+----+
-| c1 | c2 |
-+----+----+
-| a  | 1  |
-+----+----+
-    "#.trim();
-
-    assert_eq!(actual, expected);
+    let sql = r#"
+        select truncated_time, count(*) AS cnt
+        from (
+            select
+                truncated_time, k8s_deployment_name, message
+            from (
+                SELECT
+                    fs__event_metadata__priorityclass__paxotxj5r733hqnv2mtnyec2twekis3z AS priorityclass,
+                    fs__event_metadata__timestamp__uwqbxd5fgh5k2vtdact65xx5chcnah66 AS timestamp,
+                    date_trunc('day', fs__event_metadata__timestamp__uwqbxd5fgh5k2vtdact65xx5chcnah66) AS truncated_time,
+                    fs__user_data__resource__sep__attributes__sep__k8s_deployment_name__qkthsul42nc677ot3oujq3ypey3tqnyc AS k8s_deployment_name,
+                    fs__user_data__message__n6nzv46nn2fyu46czxhng77j6wjcnyt5 as message
+                FROM agg_oom
+                where fs__event_metadata__priorityclass__paxotxj5r733hqnv2mtnyec2twekis3z != 'low'
+            )
+            group by truncated_time, k8s_deployment_name, message
+        ) group by truncated_time
+    "#;
+    let batches = execute_to_batches(&ctx, sql).await;
+    println!("batches={}", batches.len());
+    let batch = batches.get(0).unwrap();
+    for field in batch.schema().fields.iter() {
+        println!("field={:?}", field);
+    }
+
+    // let mut max_len = 0;
+    // for batch in batches.iter() {
+    //     println!("rows={}", batch.num_rows());
+    //     let col = batch.column_by_name("message").unwrap();
+    //     let strs = col.as_any().downcast_ref::<StringArray>();
+    //     for str in strs.iter() {
+    //         max_len = usize::max(max_len, str.len());
+    //     }
+    // }
+    // println!("max_len=5,794");
+
+    use arrow::util::pretty::print_batches;
+    print_batches(batches.as_slice()).unwrap();
 
     Ok(())
 }
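
Note on the commented-out measurement in the added code: it would not compile as written, because downcast_ref::<StringArray>() returns an Option<&StringArray>, so the inner loop iterates over the Option (yielding the whole array once) and .len() gives the array's row count rather than a string length. A minimal sketch of the intended per-row measurement, assuming the result set carries a Utf8 "message" column; the helper name and null handling below are illustrative and not part of the commit:

    use arrow::array::StringArray;
    use arrow::record_batch::RecordBatch;

    /// Hypothetical helper: longest "message" value, in bytes, across all batches.
    /// Skips nulls; panics if the column is missing or not a Utf8 StringArray.
    fn max_message_len(batches: &[RecordBatch]) -> usize {
        let mut max_len = 0;
        for batch in batches {
            let col = batch.column_by_name("message").expect("message column");
            let strs = col
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("message should be a Utf8 StringArray");
            // Iterate row values as Option<&str>, skipping nulls.
            for s in strs.iter().flatten() {
                max_len = usize::max(max_len, s.len());
            }
        }
        max_len
    }

The recorded debug output max_len=5,794 suggests a measurement like this was used to confirm that individual message strings reach roughly 5.8 KB, which is relevant to the memory pressure this group-by test is meant to reproduce.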
