From d089a53caa2a09dfe46cdb8f5ebc869bd6566a7b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 28 Dec 2024 09:44:07 -0500 Subject: [PATCH] Consolidate Examples: memtable.rs and parquet_multiple_files.rs (#13913) --- datafusion-examples/README.md | 3 +- datafusion-examples/examples/memtable.rs | 74 --------------- ...uet_sql_multiple_files.rs => sql_query.rs} | 94 ++++++++++++++++--- 3 files changed, 83 insertions(+), 88 deletions(-) delete mode 100644 datafusion-examples/examples/memtable.rs rename datafusion-examples/examples/{parquet_sql_multiple_files.rs => sql_query.rs} (54%) diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index b06148ce267f..3ec008a6026d 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -64,10 +64,8 @@ cargo run --example dataframe - [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients - [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros - [`make_date.rs`](examples/make_date.rs): Examples of using the make_date function -- [`memtable.rs`](examples/memtable.rs): Create an query data in memory using SQL and `RecordBatch`es - [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates - [`parquet_index.rs`](examples/parquet_index.rs): Create an secondary index over several parquet files and use it to speed up queries -- [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files - [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution - [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`. 
- [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan` @@ -83,6 +81,7 @@ cargo run --example dataframe - [`sql_analysis.rs`](examples/sql_analysis.rs): Analyse SQL queries with DataFusion structures - [`sql_frontend.rs`](examples/sql_frontend.rs): Create LogicalPlans (only) from sql strings - [`sql_dialect.rs`](examples/sql_dialect.rs): Example of implementing a custom SQL dialect on top of `DFParser` +- [`sql_query.rs`](examples/sql_query.rs): Query data using SQL (in memory `RecordBatch`es, local Parquet files) - [`to_char.rs`](examples/to_char.rs): Examples of using the to_char function - [`to_timestamp.rs`](examples/to_timestamp.rs): Examples of using to_timestamp functions diff --git a/datafusion-examples/examples/memtable.rs b/datafusion-examples/examples/memtable.rs deleted file mode 100644 index bb0b720eff79..000000000000 --- a/datafusion-examples/examples/memtable.rs +++ /dev/null @@ -1,74 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use datafusion::arrow::array::{UInt64Array, UInt8Array}; -use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion::arrow::record_batch::RecordBatch; -use datafusion::datasource::MemTable; -use datafusion::error::Result; -use datafusion::prelude::SessionContext; -use std::sync::Arc; -use std::time::Duration; -use tokio::time::timeout; - -/// This example demonstrates executing a simple query against a [`MemTable`] -#[tokio::main] -async fn main() -> Result<()> { - let mem_table = create_memtable()?; - - // create local execution context - let ctx = SessionContext::new(); - - // Register the in-memory table containing the data - ctx.register_table("users", Arc::new(mem_table))?; - - let dataframe = ctx.sql("SELECT * FROM users;").await?; - - timeout(Duration::from_secs(10), async move { - let result = dataframe.collect().await.unwrap(); - let record_batch = result.first().unwrap(); - - assert_eq!(1, record_batch.column(0).len()); - dbg!(record_batch.columns()); - }) - .await - .unwrap(); - - Ok(()) -} - -fn create_memtable() -> Result<MemTable> { - MemTable::try_new(get_schema(), vec![vec![create_record_batch()?]]) -} - -fn create_record_batch() -> Result<RecordBatch> { - let id_array = UInt8Array::from(vec![1]); - let account_array = UInt64Array::from(vec![9000]); - - Ok(RecordBatch::try_new( - get_schema(), - vec![Arc::new(id_array), Arc::new(account_array)], - ) - .unwrap()) -} - -fn get_schema() -> SchemaRef { - SchemaRef::new(Schema::new(vec![ - Field::new("id", DataType::UInt8, false), - Field::new("bank_account", DataType::UInt64, true), - ])) -} diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs b/datafusion-examples/examples/sql_query.rs similarity index 54% rename from datafusion-examples/examples/parquet_sql_multiple_files.rs rename to datafusion-examples/examples/sql_query.rs index b0d3922a3278..f6d3936568cc 100644 --- a/datafusion-examples/examples/parquet_sql_multiple_files.rs +++ 
b/datafusion-examples/examples/sql_query.rs @@ -15,21 +15,90 @@ // specific language governing permissions and limitations // under the License. +use datafusion::arrow::array::{UInt64Array, UInt8Array}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::datasource::file_format::parquet::ParquetFormat; +use datafusion::datasource::listing::ListingOptions; +use datafusion::datasource::MemTable; +use datafusion::error::{DataFusionError, Result}; +use datafusion::prelude::SessionContext; +use datafusion_common::exec_datafusion_err; +use object_store::local::LocalFileSystem; use std::path::Path; use std::sync::Arc; +use std::time::Duration; +use tokio::time::timeout; + +/// Examples of various ways to execute queries using SQL +/// +/// [`query_memtable`]: a simple query against a [`MemTable`] +/// [`query_parquet`]: a simple query against a directory with multiple Parquet files +/// +#[tokio::main] +async fn main() -> Result<()> { + query_memtable().await?; + query_parquet().await?; + Ok(()) +} -use datafusion::datasource::file_format::parquet::ParquetFormat; -use datafusion::datasource::listing::ListingOptions; -use datafusion::prelude::*; +/// Run a simple query against a [`MemTable`] +pub async fn query_memtable() -> Result<()> { + let mem_table = create_memtable()?; -use object_store::local::LocalFileSystem; + // create local execution context + let ctx = SessionContext::new(); -/// This example demonstrates executing a simple query against an Arrow data source (a directory -/// with multiple Parquet files) and fetching results. The query is run twice, once showing -/// how to used `register_listing_table` with an absolute path, and once registering an -/// ObjectStore to use a relative path. 
-#[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error>> { + // Register the in-memory table containing the data + ctx.register_table("users", Arc::new(mem_table))?; + + let dataframe = ctx.sql("SELECT * FROM users;").await?; + + timeout(Duration::from_secs(10), async move { + let result = dataframe.collect().await.unwrap(); + let record_batch = result.first().unwrap(); + + assert_eq!(1, record_batch.column(0).len()); + dbg!(record_batch.columns()); + }) + .await + .unwrap(); + + Ok(()) +} + +fn create_memtable() -> Result<MemTable> { + MemTable::try_new(get_schema(), vec![vec![create_record_batch()?]]) +} + +fn create_record_batch() -> Result<RecordBatch> { + let id_array = UInt8Array::from(vec![1]); + let account_array = UInt64Array::from(vec![9000]); + + Ok(RecordBatch::try_new( + get_schema(), + vec![Arc::new(id_array), Arc::new(account_array)], + ) + .unwrap()) +} + +fn get_schema() -> SchemaRef { + SchemaRef::new(Schema::new(vec![ + Field::new("id", DataType::UInt8, false), + Field::new("bank_account", DataType::UInt64, true), + ])) +} + +/// The simplest way to query parquet files is to use the +/// [`SessionContext::read_parquet`] API +/// +/// For more control, you can use the lower level [`ListingOptions`] and +/// [`ListingTable`] APIs +/// +/// This example shows how to use relative and absolute paths. 
+/// +/// [`ListingTable`]: datafusion::datasource::listing::ListingTable +async fn query_parquet() -> Result<()> { // create local execution context let ctx = SessionContext::new(); @@ -73,13 +142,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let test_data_path = Path::new(&test_data); let test_data_path_parent = test_data_path .parent() - .ok_or("test_data path needs a parent")?; + .ok_or(exec_datafusion_err!("test_data path needs a parent"))?; std::env::set_current_dir(test_data_path_parent)?; let local_fs = Arc::new(LocalFileSystem::default()); - let u = url::Url::parse("file://./")?; + let u = url::Url::parse("file://./") + .map_err(|e| DataFusionError::External(Box::new(e)))?; ctx.register_object_store(&u, local_fs); // Register a listing table - this will use all files in the directory as data sources