Skip to content

Commit

Permalink
Consolidate Examples: memtable.rs and parquet_multiple_files.rs (#13913)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored and Omega359 committed Dec 28, 2024
1 parent b21b5a6 commit d089a53
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 88 deletions.
3 changes: 1 addition & 2 deletions datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,8 @@ cargo run --example dataframe
- [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
- [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
- [`make_date.rs`](examples/make_date.rs): Examples of using the make_date function
- [`memtable.rs`](examples/memtable.rs): Create an query data in memory using SQL and `RecordBatch`es
- [`optimizer_rule.rs`](examples/optimizer_rule.rs): Use a custom OptimizerRule to replace certain predicates
- [`parquet_index.rs`](examples/parquet_index.rs): Create an secondary index over several parquet files and use it to speed up queries
- [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files
- [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
- [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`.
- [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan`
Expand All @@ -83,6 +81,7 @@ cargo run --example dataframe
- [`sql_analysis.rs`](examples/sql_analysis.rs): Analyse SQL queries with DataFusion structures
- [`sql_frontend.rs`](examples/sql_frontend.rs): Create LogicalPlans (only) from sql strings
- [`sql_dialect.rs`](examples/sql_dialect.rs): Example of implementing a custom SQL dialect on top of `DFParser`
- [`sql_query.rs`](examples/memtable.rs): Query data using SQL (in memory `RecordBatch`es, local Parquet files)q
- [`to_char.rs`](examples/to_char.rs): Examples of using the to_char function
- [`to_timestamp.rs`](examples/to_timestamp.rs): Examples of using to_timestamp functions

Expand Down
74 changes: 0 additions & 74 deletions datafusion-examples/examples/memtable.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,90 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::arrow::array::{UInt64Array, UInt8Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::datasource::MemTable;
use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::SessionContext;
use datafusion_common::exec_datafusion_err;
use object_store::local::LocalFileSystem;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::timeout;

/// Examples of various ways to execute queries using SQL
///
/// [`query_memtable`]: a simple query against a [`MemTable`]
/// [`query_parquet`]: a simple query against a directory with multiple Parquet files
///
#[tokio::main]
async fn main() -> Result<()> {
query_memtable().await?;
query_parquet().await?;
Ok(())
}

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::prelude::*;
/// Run a simple query against a [`MemTable`]
pub async fn query_memtable() -> Result<()> {
let mem_table = create_memtable()?;

use object_store::local::LocalFileSystem;
// create local execution context
let ctx = SessionContext::new();

/// This example demonstrates executing a simple query against an Arrow data source (a directory
/// with multiple Parquet files) and fetching results. The query is run twice, once showing
/// how to used `register_listing_table` with an absolute path, and once registering an
/// ObjectStore to use a relative path.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Register the in-memory table containing the data
ctx.register_table("users", Arc::new(mem_table))?;

let dataframe = ctx.sql("SELECT * FROM users;").await?;

timeout(Duration::from_secs(10), async move {
let result = dataframe.collect().await.unwrap();
let record_batch = result.first().unwrap();

assert_eq!(1, record_batch.column(0).len());
dbg!(record_batch.columns());
})
.await
.unwrap();

Ok(())
}

fn create_memtable() -> Result<MemTable> {
MemTable::try_new(get_schema(), vec![vec![create_record_batch()?]])
}

fn create_record_batch() -> Result<RecordBatch> {
let id_array = UInt8Array::from(vec![1]);
let account_array = UInt64Array::from(vec![9000]);

Ok(RecordBatch::try_new(
get_schema(),
vec![Arc::new(id_array), Arc::new(account_array)],
)
.unwrap())
}

fn get_schema() -> SchemaRef {
SchemaRef::new(Schema::new(vec![
Field::new("id", DataType::UInt8, false),
Field::new("bank_account", DataType::UInt64, true),
]))
}

/// The simplest way to query parquet files is to use the
/// [`SessionContext::read_parquet`] API
///
/// For more control, you can use the lower level [`ListingOptions`] and
/// [`ListingTable`] APIS
///
/// This example shows how to use relative and absolute paths.
///
/// [`ListingTable`]: datafusion::datasource::listing::ListingTable
async fn query_parquet() -> Result<()> {
// create local execution context
let ctx = SessionContext::new();

Expand Down Expand Up @@ -73,13 +142,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let test_data_path = Path::new(&test_data);
let test_data_path_parent = test_data_path
.parent()
.ok_or("test_data path needs a parent")?;
.ok_or(exec_datafusion_err!("test_data path needs a parent"))?;

std::env::set_current_dir(test_data_path_parent)?;

let local_fs = Arc::new(LocalFileSystem::default());

let u = url::Url::parse("file://./")?;
let u = url::Url::parse("file://./")
.map_err(|e| DataFusionError::External(Box::new(e)))?;
ctx.register_object_store(&u, local_fs);

// Register a listing table - this will use all files in the directory as data sources
Expand Down

0 comments on commit d089a53

Please sign in to comment.