diff --git a/python/Cargo.toml b/python/Cargo.toml index b03f1e997..f4966167a 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -39,7 +39,7 @@ datafusion-python = { version = "42" } pyo3 = { version = "0.22", features = ["extension-module", "abi3", "abi3-py38"] } pyo3-log = "0.11.0" -tokio = { version = "1.35", features = ["macros", "rt", "rt-multi-thread", "sync"] } +tokio = { version = "1.41", features = ["macros", "rt", "rt-multi-thread", "sync"] } [lib] crate-type = ["cdylib"] diff --git a/python/README.md b/python/README.md index 01b0a7f90..2da048e13 100644 --- a/python/README.md +++ b/python/README.md @@ -36,7 +36,7 @@ from ballista import BallistaBuilder ## Example SQL Usage ```python ->>> ctx.sql("create external table t stored as parquet location '/mnt/bigdata/tpch/sf10-parquet/lineitem.parquet'") +>>> ctx.sql("create external table t stored as parquet location './testdata/test.parquet'") >>> df = ctx.sql("select * from t limit 5") >>> pyarrow_batches = df.collect() ``` @@ -44,7 +44,7 @@ from ballista import BallistaBuilder ## Example DataFrame Usage ```python ->>> df = ctx.read_parquet('/mnt/bigdata/tpch/sf10-parquet/lineitem.parquet').limit(5) +>>> df = ctx.read_parquet('./testdata/test.parquet').limit(5) >>> pyarrow_batches = df.collect() ``` diff --git a/python/ballista/__init__.py b/python/ballista/__init__.py index a143f17e9..5e1a0e56a 100644 --- a/python/ballista/__init__.py +++ b/python/ballista/__init__.py @@ -26,11 +26,11 @@ import pyarrow as pa from .ballista_internal import ( - BallistaBuilder, + Ballista, ) __version__ = importlib_metadata.version(__name__) __all__ = [ - "BallistaBuilder", + "Ballista", ] \ No newline at end of file diff --git a/python/examples/example.py b/python/examples/example.py index 61a9abbd2..ba2af7a2e 100644 --- a/python/examples/example.py +++ b/python/examples/example.py @@ -15,18 +15,16 @@ # specific language governing permissions and limitations # under the License. -from ballista import BallistaBuilder +# %% + +from ballista import Ballista from datafusion.context import SessionContext -# Ballista will initiate with an empty config -# set config variables with `config` -ctx: SessionContext = BallistaBuilder()\ +ctx: SessionContext = Ballista.builder\ .config("ballista.job.name", "example ballista")\ - .config("ballista.shuffle.partitions", "16")\ + .config("datafusion.execution.target_partitions", "4")\ .standalone() - -#ctx_remote: SessionContext = ballista.remote("remote_ip", 50050) # Select 1 to verify its working ctx.sql("SELECT 1").show() -#ctx_remote.sql("SELECT 2").show() \ No newline at end of file +# %% diff --git a/python/examples/experiment.py b/python/examples/experiment.py new file mode 100644 index 000000000..0cf9bfd75 --- /dev/null +++ b/python/examples/experiment.py @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# %% + +from ballista import Ballista +from datafusion.context import SessionContext + +ctx: SessionContext = Ballista.builder\ + .config("ballista.job.name", "example ballista")\ + .config("datafusion.execution.target_partitions", "4")\ + .standalone() + +ctx.sql("SELECT 1").show() +# %% diff --git a/python/examples/readme.py b/python/examples/readme.py new file mode 100644 index 000000000..64515a9af --- /dev/null +++ b/python/examples/readme.py @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# %% + +from ballista import Ballista +from datafusion.context import SessionContext + +ctx: SessionContext = Ballista.builder\ + .config("ballista.job.name", "Readme Examples")\ + .config("datafusion.execution.target_partitions", "4")\ + .standalone() + +ctx.sql("create external table t stored as parquet location '../testdata/test.parquet'") + +# %% +df = ctx.sql("select * from t limit 5") +pyarrow_batches = df.collect() +pyarrow_batches[0].to_pandas() +# %% +df = ctx.read_parquet('../testdata/test.parquet').limit(5) +pyarrow_batches = df.collect() +pyarrow_batches[0].to_pandas() +# %% diff --git a/python/pyproject.toml b/python/pyproject.toml index 2d06b225d..fc404a3ea 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -16,7 +16,7 @@ # under the License. [build-system] -requires = ["maturin>=0.15,<0.16"] +requires = ["maturin>=1.5.1,<1.6.0"] build-backend = "maturin" [project] @@ -24,7 +24,7 @@ name = "ballista" description = "Python client for Apache Arrow Ballista Distributed SQL Query Engine" readme = "README.md" license = {file = "LICENSE.txt"} -requires-python = ">=3.6" +requires-python = ">=3.7" keywords = ["ballista", "sql", "rust", "distributed"] classifier = [ "Development Status :: 2 - Pre-Alpha", @@ -43,7 +43,8 @@ classifier = [ "Programming Language :: Rust", ] dependencies = [ - "pyarrow>=11.0.0", + "pyarrow>=11.0.0", + "datafusion==42", ] [project.urls] diff --git a/python/requirements.txt b/python/requirements.txt index a03a8f8d2..e9d03ffb1 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -1,3 +1,4 @@ -datafusion==35.0.0 +datafusion==42.0.0 +maturin==1.5.1 pyarrow pytest \ No newline at end of file diff --git a/python/src/lib.rs b/python/src/lib.rs index 41b4b6d31..159ea5abd 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -20,12 +20,9 @@ use datafusion::execution::SessionStateBuilder; use datafusion::prelude::*; use datafusion_python::context::PySessionContext; use datafusion_python::utils::wait_for_future; - -use std::collections::HashMap; - use pyo3::prelude::*; +use std::collections::HashMap; mod utils; -use utils::to_pyerr; #[pymodule] fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { @@ -33,12 +30,12 @@ fn ballista_internal(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { // BallistaBuilder struct m.add_class::()?; // DataFusion struct - m.add_class::()?; + // m.add_class::()?; Ok(()) } -// Ballista Builder will take a HasMap/Dict Cionfg -#[pyclass(name = "BallistaBuilder", module = "ballista", subclass)] +#[derive(Debug, Default)] +#[pyclass(name = "Ballista", module = "ballista", subclass)] pub struct PyBallistaBuilder { conf: HashMap, } @@ -47,9 +44,12 @@ pub struct PyBallistaBuilder { impl PyBallistaBuilder { #[new] pub fn new() -> Self { - Self { - conf: HashMap::new(), - } + Self::default() + } + //#[staticmethod] + #[classattr] + pub fn builder() -> Self { + Self::default() } pub fn config( @@ -66,7 +66,11 @@ impl PyBallistaBuilder { /// Construct the standalone instance from the SessionContext pub fn standalone(&self, py: Python) -> PyResult { // Build the config - let config: SessionConfig = SessionConfig::from_string_hash_map(&self.conf)?; + let mut config: SessionConfig = SessionConfig::new_with_ballista(); + for (key, value) in &self.conf { + let _ = config.options_mut().set(&key, value); + } + // Build the state let state = SessionStateBuilder::new() .with_config(config) @@ -85,7 +89,10 @@ impl PyBallistaBuilder { /// Construct the remote instance from the SessionContext pub fn remote(&self, url: &str, py: Python) -> PyResult { // Build the config - let config: SessionConfig = SessionConfig::from_string_hash_map(&self.conf)?; + let mut config: SessionConfig = SessionConfig::new_with_ballista(); + for (key, value) in &self.conf { + let _ = config.options_mut().set(&key, value); + } // Build the state let state = SessionStateBuilder::new() .with_config(config) diff --git a/python/src/utils.rs b/python/src/utils.rs index 10278537e..176565311 100644 --- a/python/src/utils.rs +++ b/python/src/utils.rs @@ -19,6 +19,7 @@ use ballista_core::error::BallistaError; use pyo3::exceptions::PyException; use pyo3::PyErr; +#[allow(dead_code)] pub(crate) fn to_pyerr(err: BallistaError) -> PyErr { PyException::new_err(err.to_string()) }