Skip to content

Commit

Permalink
upgrade datafusion to 43.0.0
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi <[email protected]>
  • Loading branch information
skyzh committed Dec 7, 2024
1 parent 5671cba commit 9cf55b5
Show file tree
Hide file tree
Showing 44 changed files with 1,586 additions and 1,179 deletions.
1,639 changes: 969 additions & 670 deletions Cargo.lock

Large diffs are not rendered by default.

27 changes: 9 additions & 18 deletions datafusion-optd-cli/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ pub struct DynamicObjectStoreCatalog {
}

impl DynamicObjectStoreCatalog {
pub fn new(
inner: Arc<dyn CatalogProviderList>,
state: Weak<RwLock<SessionState>>,
) -> Self {
pub fn new(inner: Arc<dyn CatalogProviderList>, state: Weak<RwLock<SessionState>>) -> Self {
Self { inner, state }
}
}
Expand All @@ -68,9 +65,9 @@ impl CatalogProviderList for DynamicObjectStoreCatalog {

fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
let state = self.state.clone();
self.inner.catalog(name).map(|catalog| {
Arc::new(DynamicObjectStoreCatalogProvider::new(catalog, state)) as _
})
self.inner
.catalog(name)
.map(|catalog| Arc::new(DynamicObjectStoreCatalogProvider::new(catalog, state)) as _)
}
}

Expand All @@ -82,10 +79,7 @@ struct DynamicObjectStoreCatalogProvider {
}

impl DynamicObjectStoreCatalogProvider {
pub fn new(
inner: Arc<dyn CatalogProvider>,
state: Weak<RwLock<SessionState>>,
) -> Self {
pub fn new(inner: Arc<dyn CatalogProvider>, state: Weak<RwLock<SessionState>>) -> Self {
Self { inner, state }
}
}
Expand All @@ -101,9 +95,9 @@ impl CatalogProvider for DynamicObjectStoreCatalogProvider {

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
let state = self.state.clone();
self.inner.schema(name).map(|schema| {
Arc::new(DynamicObjectStoreSchemaProvider::new(schema, state)) as _
})
self.inner
.schema(name)
.map(|schema| Arc::new(DynamicObjectStoreSchemaProvider::new(schema, state)) as _)
}

fn register_schema(
Expand All @@ -124,10 +118,7 @@ struct DynamicObjectStoreSchemaProvider {
}

impl DynamicObjectStoreSchemaProvider {
pub fn new(
inner: Arc<dyn SchemaProvider>,
state: Weak<RwLock<SessionState>>,
) -> Self {
pub fn new(inner: Arc<dyn SchemaProvider>, state: Weak<RwLock<SessionState>>) -> Self {
Self { inner, state }
}
}
Expand Down
10 changes: 2 additions & 8 deletions datafusion-optd-cli/src/cli_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ pub trait CliSessionContext {
fn register_table_options_extension_from_scheme(&self, scheme: &str);

/// Execute a logical plan and return a DataFrame.
async fn execute_logical_plan(
&self,
plan: LogicalPlan,
) -> Result<DataFrame, DataFusionError>;
async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result<DataFrame, DataFusionError>;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -89,10 +86,7 @@ impl CliSessionContext for SessionContext {
}
}

async fn execute_logical_plan(
&self,
plan: LogicalPlan,
) -> Result<DataFrame, DataFusionError> {
async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result<DataFrame, DataFusionError> {
self.execute_logical_plan(plan).await
}
}
41 changes: 12 additions & 29 deletions datafusion-optd-cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,16 @@ impl Command {
let command_batch = all_commands_info();
print_options.print_batches(command_batch.schema(), &[command_batch], now)
}
Self::ListTables => {
exec_and_print(ctx, print_options, "SHOW TABLES".into()).await
}
Self::ListTables => exec_and_print(ctx, print_options, "SHOW TABLES".into()).await,
Self::DescribeTableStmt(name) => {
exec_and_print(ctx, print_options, format!("SHOW COLUMNS FROM {}", name))
.await
exec_and_print(ctx, print_options, format!("SHOW COLUMNS FROM {}", name)).await
}
Self::Include(filename) => {
if let Some(filename) = filename {
let file = File::open(filename).map_err(|e| {
DataFusionError::Execution(format!(
"Error opening {:?} {}",
filename, e
))
DataFusionError::Execution(format!("Error opening {:?} {}", filename, e))
})?;
exec_from_lines(ctx, &mut BufReader::new(file), print_options)
.await?;
exec_from_lines(ctx, &mut BufReader::new(file), print_options).await?;
Ok(())
} else {
exec_err!("Required filename argument is missing")
Expand Down Expand Up @@ -112,9 +105,9 @@ impl Command {
exec_err!("{function} is not a supported function")
}
}
Self::OutputFormat(_) => exec_err!(
"Unexpected change output format, this should be handled outside"
),
Self::OutputFormat(_) => {
exec_err!("Unexpected change output format, this should be handled outside")
}
}
}

Expand All @@ -124,15 +117,11 @@ impl Command {
Self::ListTables => ("\\d", "list tables"),
Self::DescribeTableStmt(_) => ("\\d name", "describe table"),
Self::Help => ("\\?", "help"),
Self::Include(_) => {
("\\i filename", "reads input from the specified filename")
}
Self::Include(_) => ("\\i filename", "reads input from the specified filename"),
Self::ListFunctions => ("\\h", "function list"),
Self::SearchFunctions(_) => ("\\h function", "search function"),
Self::QuietMode(_) => ("\\quiet (true|false)?", "print or set quiet mode"),
Self::OutputFormat(_) => {
("\\pset [NAME [VALUE]]", "set table output option\n(format)")
}
Self::OutputFormat(_) => ("\\pset [NAME [VALUE]]", "set table output option\n(format)"),
}
}
}
Expand Down Expand Up @@ -186,16 +175,10 @@ impl FromStr for Command {
("h", Some(function)) => Self::SearchFunctions(function.into()),
("i", None) => Self::Include(None),
("i", Some(filename)) => Self::Include(Some(filename.to_owned())),
("quiet", Some("true" | "t" | "yes" | "y" | "on")) => {
Self::QuietMode(Some(true))
}
("quiet", Some("false" | "f" | "no" | "n" | "off")) => {
Self::QuietMode(Some(false))
}
("quiet", Some("true" | "t" | "yes" | "y" | "on")) => Self::QuietMode(Some(true)),
("quiet", Some("false" | "f" | "no" | "n" | "off")) => Self::QuietMode(Some(false)),
("quiet", None) => Self::QuietMode(None),
("pset", Some(subcommand)) => {
Self::OutputFormat(Some(subcommand.to_string()))
}
("pset", Some(subcommand)) => Self::OutputFormat(Some(subcommand.to_string())),
("pset", None) => Self::OutputFormat(None),
_ => return Err(()),
})
Expand Down
121 changes: 87 additions & 34 deletions datafusion-optd-cli/src/exec.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
// Copyright (c) 2023-2024 CMU Database Group
//
// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
Expand Down Expand Up @@ -32,6 +37,8 @@ use crate::{
print_options::{MaxRows, PrintOptions},
};

use arrow::util::display::ArrayFormatter;
use arrow::util::display::FormatOptions;
use datafusion::common::instant::Instant;
use datafusion::common::plan_datafusion_err;
use datafusion::config::ConfigFileType;
Expand All @@ -47,6 +54,69 @@ use rustyline::error::ReadlineError;
use rustyline::Editor;
use tokio::signal;

// BEGIN optd-cli patch

pub async fn exec_from_commands_collect(
ctx: &dyn CliSessionContext,
commands: Vec<String>,
) -> Result<Vec<Vec<String>>> {
let mut result = Vec::new();
for sql in commands {
result.extend(exec_and_collect(ctx, sql).await?);
}
Ok(result)
}

/// Utility function to execute a query and collect the result.
pub async fn exec_and_collect(
ctx: &dyn CliSessionContext,
sql: String,
) -> Result<Vec<Vec<String>>> {
let sql = unescape_input(&sql)?;
let task_ctx = ctx.task_ctx();
let dialect = &task_ctx.session_config().options().sql_parser.dialect;
let dialect = dialect_from_str(dialect).ok_or_else(|| {
plan_datafusion_err!(
"Unsupported SQL dialect: {dialect}. Available dialects: \
Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
MsSQL, ClickHouse, BigQuery, Ansi."
)
})?;

let mut result = Vec::new();

let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
let plan = create_plan(ctx, statement).await?;

let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

let results = collect(physical_plan, task_ctx.clone()).await?;

let options = FormatOptions::default();
for batch in results {
let converters = batch
.columns()
.iter()
.map(|a| ArrayFormatter::try_new(a.as_ref(), &options))
.collect::<Result<Vec<_>, _>>()?;
for row_idx in 0..batch.num_rows() {
let mut row = Vec::with_capacity(batch.num_columns());
for converter in converters.iter() {
let mut buffer = String::with_capacity(8);
converter.value(row_idx).write(&mut buffer)?;
row.push(buffer);
}
result.push(row);
}
}
}
Ok(result)
}

// END optd-cli patch

/// run and execute SQL statements and commands, against a context with the given print options
pub async fn exec_from_commands(
ctx: &dyn CliSessionContext,
Expand Down Expand Up @@ -149,10 +219,7 @@ pub async fn exec_from_repl(
eprintln!("{e}")
}
} else {
eprintln!(
"'\\{}' is not a valid command",
&line[1..]
);
eprintln!("'\\{}' is not a valid command", &line[1..]);
}
} else {
println!("Output format is {:?}.", print_options.format);
Expand Down Expand Up @@ -183,9 +250,9 @@ pub async fn exec_from_repl(
},
}
// dialect might have changed
rl.helper_mut().unwrap().set_dialect(
&ctx.task_ctx().session_config().options().sql_parser.dialect,
);
rl.helper_mut()
.unwrap()
.set_dialect(&ctx.task_ctx().session_config().options().sql_parser.dialect);
}
}
Err(ReadlineError::Interrupted) => {
Expand Down Expand Up @@ -225,8 +292,7 @@ pub(super) async fn exec_and_print(

let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
let adjusted =
AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement);
let adjusted = AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement);

let plan = create_plan(ctx, statement).await?;
let adjusted = adjusted.with_plan(&plan);
Expand Down Expand Up @@ -274,9 +340,7 @@ impl AdjustedPrintOptions {
// all rows
if matches!(
plan,
LogicalPlan::Explain(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Analyze(_)
LogicalPlan::Explain(_) | LogicalPlan::DescribeTable(_) | LogicalPlan::Analyze(_)
) {
self.inner.maxrows = MaxRows::Unlimited;
}
Expand Down Expand Up @@ -314,13 +378,8 @@ async fn create_plan(
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
// To support custom formats, treat error as None
let format = config_file_type_from_str(&cmd.file_type);
register_object_store_and_config_extensions(
ctx,
&cmd.location,
&cmd.options,
format,
)
.await?;
register_object_store_and_config_extensions(ctx, &cmd.location, &cmd.options, format)
.await?;
}

if let LogicalPlan::Copy(copy_to) = &mut plan {
Expand Down Expand Up @@ -390,8 +449,7 @@ pub(crate) async fn register_object_store_and_config_extensions(
table_options.alter_with_string_hash_map(options)?;

// Retrieve the appropriate object store based on the scheme, URL, and modified table options
let store =
get_object_store(&ctx.session_state(), scheme, url, &table_options).await?;
let store = get_object_store(&ctx.session_state(), scheme, url, &table_options).await?;

// Register the retrieved object store in the session context's runtime environment
ctx.register_object_store(url, store);
Expand All @@ -414,13 +472,8 @@ mod tests {

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
let format = config_file_type_from_str(&cmd.file_type);
register_object_store_and_config_extensions(
&ctx,
&cmd.location,
&cmd.options,
format,
)
.await?;
register_object_store_and_config_extensions(&ctx, &cmd.location, &cmd.options, format)
.await?;
} else {
return plan_err!("LogicalPlan is not a CreateExternalTable");
}
Expand Down Expand Up @@ -462,8 +515,7 @@ mod tests {
async fn create_object_store_table_http() -> Result<()> {
// Should be OK
let location = "http://example.com/file.parquet";
let sql =
format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");
create_external_table_test(location, &sql).await?;

Ok(())
Expand Down Expand Up @@ -580,8 +632,10 @@ mod tests {
let location = "gcs://bucket/path/file.parquet";

// for service_account_path
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET
OPTIONS('gcp.service_account_path' '{service_account_path}') LOCATION '{location}'");
let sql = format!(
"CREATE EXTERNAL TABLE test STORED AS PARQUET
OPTIONS('gcp.service_account_path' '{service_account_path}') LOCATION '{location}'"
);
let err = create_external_table_test(location, &sql)
.await
.unwrap_err();
Expand Down Expand Up @@ -611,8 +665,7 @@ mod tests {
let location = "path/to/file.parquet";

// Ensure that local files are also registered
let sql =
format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");
let sql = format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{location}'");
create_external_table_test(location, &sql).await.unwrap();

Ok(())
Expand Down
Loading

0 comments on commit 9cf55b5

Please sign in to comment.