diff --git a/README.md b/README.md index 1ad03e94e..f2d74baf1 100644 --- a/README.md +++ b/README.md @@ -141,7 +141,7 @@ import com.databricks.labs.mosaic.JTS val mosaicContext = MosaicContext.build(H3, JTS) mosaicContext.register(spark) ``` -__Note: Mosaic 0.4.x SQL bindings for DBR 13 can register with Assigned clusters, but not Shared Access due to API changes, more [here](https://docs.databricks.com/en/udf/index.html).__ +__Note: Mosaic 0.4.x SQL bindings for DBR 13 can register with Assigned clusters (as Hive UDFs), but not Shared Access due to API changes, more [here](https://docs.databricks.com/en/udf/index.html).__ ## Examples diff --git a/docs/source/api/spatial-functions.rst b/docs/source/api/spatial-functions.rst index 73be0ce27..2158f397c 100644 --- a/docs/source/api/spatial-functions.rst +++ b/docs/source/api/spatial-functions.rst @@ -1721,7 +1721,7 @@ st_x .. function:: st_x(col) - Returns the x coordinate of the input geometry. + Returns the x coordinate of the centroid point of the input geometry. :param col: Geometry :type col: Column @@ -1880,7 +1880,7 @@ st_y **** .. function:: st_y(col) - Returns the y coordinate of the input geometry. + Returns the y coordinate of the centroid point of the input geometry. :param col: Geometry :type col: Column @@ -2036,6 +2036,59 @@ st_ymin +-----------------+ +st_z +**** +.. function:: st_z(col) + + Returns the z coordinate of an arbitrary point of the input geometry `geom`. + + :param col: Point Geometry + :type col: Column + :rtype: Column: DoubleType + + :example: + +.. tabs:: + .. code-tab:: py + + df = spark.createDataFrame([{'wkt': 'POINT (30 10 20)'}]) + df.select(st_z('wkt')).show() + +-----------------+ + |st_z(wkt) | + +-----------------+ + | 20.0| + +-----------------+ + + .. code-tab:: scala + + val df = List(("POINT (30 10 20)")).toDF("wkt") + df.select(st_z(col("wkt"))).show() + +-----------------+ + |st_z(wkt) | + +-----------------+ + | 20.0| + +-----------------+ + + .. code-tab:: sql + + SELECT st_z("POINT (30 10 20)") + +-----------------+ + |st_z(wkt) | + +-----------------+ + | 20.0| + +-----------------+ + + .. code-tab:: r R + + df <- createDataFrame(data.frame(wkt = "POINT (30 10 20)")) + showDF(select(df, st_z(column("wkt"))), truncate=F) + +-----------------+ + |st_z(wkt) | + +-----------------+ + | 20.0| + +-----------------+ + + st_zmax ******* diff --git a/docs/source/api/vector-format-readers.rst b/docs/source/api/vector-format-readers.rst index 43e8a6e08..f6821427f 100644 --- a/docs/source/api/vector-format-readers.rst +++ b/docs/source/api/vector-format-readers.rst @@ -11,15 +11,10 @@ Here are some common useful file formats: * `GeoJSON `_ (also `ESRIJSON `_, `TopoJSON `_) - * `FileGDB `_ (ESRI File Geodatabase) and - `OpenFileGDB `_ (ESRI File Geodatabase vector) - - Mosaic implements named reader :ref:`spark.read.format("geo_db")` (described in this doc). - * `ESRI Shapefile `_ (ESRI Shapefile / DBF) - - Mosaic implements named reader :ref:`spark.read.format("shapefile")` (described in this doc). - * `netCDF `_ (Network Common Data Form) - - Mosaic supports GDAL netCDF raster reader also. - * `XLSX `_, `XLS `_, - `ODS `_ spreadsheets + * `FileGDB `_ (ESRI File Geodatabase) and `OpenFileGDB `_ (ESRI File Geodatabase vector) - Mosaic implements named reader :ref:`spark.read.format("geo_db")` (described in this doc). + * `ESRI Shapefile `_ (ESRI Shapefile / DBF) - Mosaic implements named reader :ref:`spark.read.format("shapefile")` (described in this doc). 
+ * `netCDF `_ (Network Common Data Form) - Mosaic supports GDAL netCDF raster reader also. + * `XLSX `_, `XLS `_, `ODS `_ spreadsheets * `TIGER `_ (U.S. Census TIGER/Line) * `PGDump `_ (PostgreSQL Dump) * `KML `_ (Keyhole Markup Language) @@ -39,7 +34,6 @@ Additionally, for convenience, Mosaic provides specific readers for Shapefile an * :code:`spark.read.format("geo_db")` reader for GeoDB files natively in Spark. * :code:`spark.read.format("shapefile")` reader for Shapefiles natively in Spark. - spark.read.format("ogr") ************************* A base Spark SQL data source for reading GDAL vector data sources. @@ -296,3 +290,233 @@ The reader supports the following options: Keyword options not identified in function signature are converted to a :code:`Map`. These must be supplied as a :code:`String`. Also, you can supply function signature values as :code:`String`. + +Vector File UDFs +################ + +It can be of use to perform various exploratory operations on vector file formats to help with processing. +The following UDFs use `fiona `_ which is already provided +as part of the dependencies of Mosaic python bindings. + +We are showing the zipped variation for a larger (800MB) shapefile. +This is just one file for example purposes; you can have any number of files in real-world use. +Here is a snippet for downloading. + +.. code-block:: bash + + %sh + mkdir -p /dbfs/home//data/large_shapefiles + wget -nv -P /dbfs/home//data/large_shapefiles -nc https://osmdata.openstreetmap.de/download/land-polygons-split-4326.zip + ls -lh /dbfs/home//data/large_shapefiles + +Then we create a spark dataframe made up of metadata to drive the examples. + +.. code-block:: py + + df = spark.createDataFrame([ + { + 'rel_path': '/land-polygons-split-4326/land_polygons.shp', + 'driver': 'ESRI Shapefile', + 'zip_path': '/dbfs/home//data/large_shapefiles/land-polygons-split-4326.zip' + } + ]) + +Here is an example UDF to list layers, supporting both zipped and non-zipped. + +.. code-block:: py + + from pyspark.sql.functions import udf + from pyspark.sql.types import * + + @udf(returnType=ArrayType(StringType())) + def list_layers(in_path, driver, zip_path=None): + """ + List layer names (in index order). + - in_path: file location for read; when used with `zip_path`, + this will be the relative path within a zip to open + - driver: name of GDAL driver to use + - zip_path: follows format 'zip:///some/file.zip' (Optional, default is None); zip gets opened something like: + `with fiona.open('/test/a.shp', vfs='zip:///tmp/dir1/test.zip', driver='') as f:` + Note: you can prepend 'zip://' for the param or leave it off in this example + """ + import fiona + + z_path = zip_path + if zip_path and not zip_path.startswith("zip:"): + z_path = f"zip://{zip_path}" + return fiona.listlayers(in_path, vfs=z_path, driver=driver) + +We can call the UDF, e.g. + +.. code-block:: py + + import pyspark.sql.functions as F + + display( + df + .withColumn( + "layers", + list_layers("rel_path", "driver", "zip_path") + ) + .withColumn("num_layers", F.size("layers")) + ) + +--------------+--------------------+--------------------+---------------+----------+ + | driver| rel_path| zip_path| layers|num_layers| + +--------------+--------------------+--------------------+---------------+----------+ + |ESRI Shapefile|/land-polygons-sp...|/dbfs/home/... 
|[land_polygons]| 1| + +--------------+--------------------+--------------------+---------------+----------+ + +Here is an example UDF to count rows for a layer, supporting both zipped and non-zipped. + +.. code-block:: py + + from pyspark.sql.functions import udf + from pyspark.sql.types import IntegerType + + @udf(returnType=IntegerType()) + def count_vector_rows(in_path, driver, layer, zip_path=None): + """ + Count rows for the provided vector file. + - in_path: file location for read; when used with `zip_path`, + this will be the relative path within a zip to open + - driver: name of GDAL driver to use + - layer: integer (zero-indexed) or string (name) + - zip_path: follows format 'zip:///some/file.zip' (Optional, default is None); zip gets opened something like: + `with fiona.open('/test/a.shp', vfs='zip:///tmp/dir1/test.zip', driver='') as f:` + Note: you can prepend 'zip://' for the param or leave it off in this example + """ + import fiona + + cnt = 0 + z_path = zip_path + if zip_path and not zip_path.startswith("zip:"): + z_path = f"zip://{zip_path}" + with fiona.open(in_path, vfs=z_path, driver=driver, layer=layer) as in_file: + for item in in_file: + cnt += 1 + return cnt + +We can call the UDF, e.g. + +.. code-block:: py + + import pyspark.sql.functions as F + + display( + df + .withColumn( + "row_cnt", + count_vector_rows("rel_path", "driver", F.lit(0), "zip_path") + ) + ) + +--------------+--------------------+--------------------+-------+ + | driver| rel_path| zip_path|row_cnt| + +--------------+--------------------+--------------------+-------+ + |ESRI Shapefile|/land-polygons-sp...|/dbfs/home/... | 789972| + +--------------+--------------------+--------------------+-------+ + + +Here is an example UDF to get spark friendly schema for a layer, supporting both zipped and non-zipped. + +.. code-block:: py + + from pyspark.sql.functions import udf + from pyspark.sql.types import StringType + + @udf(returnType=StringType()) + def layer_schema(in_path, driver, layer, zip_path=None): + """ + Get the schema for the provided vector file layer. + - in_path: file location for read; when used with `zip_path`, + this will be the relative path within a zip to open + - driver: name of GDAL driver to use + - layer: integer (zero-indexed) or string (name) + - zip_path: follows format 'zip:///some/file.zip' (Optional, default is None); zip gets opened something like: + `with fiona.open('/test/a.shp', vfs='zip:///tmp/dir1/test.zip', driver='') as f:` + Note: you can prepend 'zip://' for the param or leave it off in this example + Returns layer schema json as string + """ + import fiona + import json + + cnt = 0 + z_path = zip_path + if zip_path and not zip_path.startswith("zip:"): + z_path = f"zip://{zip_path}" + with fiona.open(in_path, vfs=z_path, driver=driver, layer=layer) as in_file: + return json.dumps(in_file.schema.copy()) + +We can call the UDF, e.g. + +.. code-block:: py + + import pyspark.sql.functions as F + + display( + df + .withColumn( + "layer_schema", + layer_schema("rel_path", "driver", F.lit(0), "zip_path") + ) + ) + +--------------+--------------------+--------------------+--------------------+ + | driver| rel_path| zip_path| layer_schema| + +--------------+--------------------+--------------------+--------------------+ + |ESRI Shapefile|/land-polygons-sp...|/dbfs/home/... 
|{"properties": {"...| + +--------------+--------------------+--------------------+--------------------+ + +Also, it can be useful to standardize collections of zipped vector formats to ensure all are individually zipped to work +with the provided APIs. + +.. note:: + Option `vsizip` in the Mosaic GDAL APIs (a different API from the above fiona UDF examples) is for individually zipped + vector files (e.g. File Geodatabase or Shapefile), not collections. If you end up with mixed or unclear zipped files, + you can test them with a UDF such as shown below. + +Here is an example UDF to test for a zip of zips. +In this example, we can use :code:`zip_path` from :code:`df` because we left "zip://" out of the name. + +.. code-block:: py + + from pyspark.sql.functions import udf + from pyspark.sql.types import BooleanType + + @udf(returnType=BooleanType()) + def test_double_zip(path): + """ + Tests whether a zip contains zips, which is not supported by + Mosaic GDAL APIs. + - path: to check + Returns boolean + """ + import zipfile + + try: + with zipfile.ZipFile(path, mode="r") as zip: + for f in zip.namelist(): + if f.lower().endswith(".zip"): + return True + return False + except Exception: + return False + +We can call the UDF, e.g. + +.. code-block:: py + + display( + df + .withColumn( + "is_double_zip", + test_double_zip("zip_path") + ) + ) + +--------------------+-------------+ + | zip_path|is_double_zip| + +--------------------+-------------+ + |/dbfs/home/... | false| + +--------------------+-------------+ + +Though not shown here, you can then handle unzipping the "double" zips that return `True` by extending the +:code:`test_double_zip` UDF to perform unzips (with a provided out_dir) or through an additional UDF, e.g. using the ZipFile +`extractall `_ function. \ No newline at end of file diff --git a/docs/source/index.rst b/docs/source/index.rst index d63107940..94cebe819 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -54,12 +54,12 @@ this will leverage the Databricks H3 expressions when using H3 grid system. Mosaic provides: - * easy conversion between common spatial data encodings (WKT, WKB and GeoJSON); - * constructors to easily generate new geometries from Spark native data types; - * many of the OGC SQL standard :code:`ST_` functions implemented as Spark Expressions for transforming, aggregating and joining spatial datasets; - * high performance through implementation of Spark code generation within the core Mosaic functions; - * optimisations for performing point-in-polygon joins using an approach we co-developed with Ordnance Survey (`blog post `_); and - * the choice of a Scala, SQL and Python API. +* easy conversion between common spatial data encodings (WKT, WKB and GeoJSON); +* constructors to easily generate new geometries from Spark native data types; +* many of the OGC SQL standard :code:`ST_` functions implemented as Spark Expressions for transforming, aggregating and joining spatial datasets; +* high performance through implementation of Spark code generation within the core Mosaic functions; +* optimisations for performing point-in-polygon joins using an approach we co-developed with Ordnance Survey (`blog post `_); and +* the choice of a Scala, SQL and Python API. .. note:: For Mosaic versions < 0.4 please use the `0.3 docs `_. @@ -75,37 +75,36 @@ We recommend using Databricks Runtime versions 13.3 LTS with Photon enabled. Mosaic 0.4.x series only supports DBR 13.x DBRs. 
If running on a different DBR it will throw an exception: -**DEPRECATION ERROR: Mosaic v0.4.x series only supports Databricks Runtime 13. You can specify -`%pip install 'databricks-mosaic<0.4,>=0.3'` for DBR < 13.** + DEPRECATION ERROR: Mosaic v0.4.x series only supports Databricks Runtime 13. + You can specify `%pip install 'databricks-mosaic<0.4,>=0.3'` for DBR < 13. Mosaic 0.4.x series issues an ERROR on standard, non-Photon clusters `ADB `_ | `AWS `_ | -`GCP `_ : +`GCP `_: -**DEPRECATION ERROR: Please use a Databricks Photon-enabled Runtime for performance benefits or Runtime ML for spatial -AI benefits; Mosaic 0.4.x series restricts executing this cluster.** + DEPRECATION ERROR: Please use a Databricks Photon-enabled Runtime for performance benefits or Runtime ML for + spatial AI benefits; Mosaic 0.4.x series restricts executing this cluster. As of Mosaic 0.4.0 (subject to change in follow-on releases) - * `Assigned Clusters `_ : Mosaic Python, SQL, R, and Scala APIs. - * `Shared Access Clusters `_ : Mosaic Scala API (JVM) with - Admin `allowlisting `_ ; - Python bindings to Mosaic Scala APIs are blocked by Py4J Security on Shared Access Clusters. +* `Assigned Clusters `_: Mosaic Python, SQL, R, and Scala APIs. +* `Shared Access Clusters `_: Mosaic Scala API (JVM) with + Admin `allowlisting `_; + Python bindings to Mosaic Scala APIs are blocked by Py4J Security on Shared Access Clusters. .. warning:: - Mosaic SQL expressions cannot yet be registered with `Unity Catalog `_ - due to API changes affecting DBRs >= 13, more `here `_. + Mosaic 0.4.x SQL bindings for DBR 13 can register with Assigned clusters (as Hive UDFs), but not Shared Access due + to `Unity Catalog `_ API changes, more `here `_. .. note:: As of Mosaic 0.4.0 (subject to change in follow-on releases) - * `Unity Catalog `_ : Enforces process isolation which is difficult to - accomplish with custom JVM libraries; as such only built-in (aka platform provided) JVM APIs can be invoked from other - supported languages in Shared Access Clusters. - * `Volumes `_ : Along the same principle of isolation, - clusters (both assigned and shared access) can read Volumes via relevant built-in readers and writers or via custom - python calls which do not involve any custom JVM code. - + * `Unity Catalog `_ enforces process isolation which is difficult + to accomplish with custom JVM libraries; as such only built-in (aka platform provided) JVM APIs can be invoked from + other supported languages in Shared Access Clusters. + * Along the same principle of isolation, clusters (both Assigned and Shared Access) can read + `Volumes `_ via relevant built-in readers and + writers or via custom python calls which do not involve any custom JVM code. Version 0.3.x Series ==================== @@ -119,10 +118,10 @@ For Mosaic versions < 0.4.0 please use the `0.3.x docs `_ | `AWS `_ | -`GCP `_ : +`GCP `_: -**DEPRECATION WARNING: Please use a Databricks Photon-enabled Runtime for performance benefits or Runtime ML for spatial -AI benefits; Mosaic will stop working on this cluster after v0.3.x.** + DEPRECATION WARNING: Please use a Databricks Photon-enabled Runtime for performance benefits or Runtime ML for spatial + AI benefits; Mosaic will stop working on this cluster after v0.3.x. If you are receiving this warning in v0.3.11+, you will want to begin to plan for a supported runtime. 
The reason we are making this change is that we are streamlining Mosaic internals to be more aligned with future product APIs which are diff --git a/docs/source/usage/automatic-sql-registration.rst b/docs/source/usage/automatic-sql-registration.rst index a653f75b9..6175394d6 100644 --- a/docs/source/usage/automatic-sql-registration.rst +++ b/docs/source/usage/automatic-sql-registration.rst @@ -11,7 +11,8 @@ to your Spark / Databricks cluster to perform spatial queries or integrating Spa with a geospatial middleware component such as [Geoserver](https://geoserver.org/). .. warning:: - Mosaic 0.4.x SQL bindings for DBR 13 not yet available in Unity Catalog due to API changes. + Mosaic 0.4.x SQL bindings for DBR 13 can register with Assigned clusters (as Hive UDFs), but not Shared Access due + to `Unity Catalog `_ API changes, more `here `_. Pre-requisites ************** @@ -76,7 +77,7 @@ Testing To test the installation, create a new Python notebook and run the following commands (similar for :code:`grid_` and :code:`rst_`, not shown): -.. code-block:: python +.. code-block:: py sql("""SHOW FUNCTIONS""").where("startswith(function, 'st_')").display() @@ -88,9 +89,10 @@ You should see all the supported :code:`ST_` functions registered by Mosaic appe Fig 1. Show Functions Example .. note:: - You may see some :code:`ST_` functions from other libraries, so pay close attention to the provider. + You may see some :code:`ST_` functions from other libraries, so pay close attention to the provider; + also, function auto-complete in the UI may not list custom registered SQL expressions. -.. code-block:: python +.. code-block:: py sql("""DESCRIBE FUNCTION st_buffer""") @@ -99,10 +101,6 @@ You should see all the supported :code:`ST_` functions registered by Mosaic appe Fig 2. Describe Function Example -.. warning:: - Mosaic 0.4.x SQL bindings for DBR 13 can register with Assigned clusters, but not Shared Access due to API changes, - more `here `_. - .. warning:: Issue 317: https://github.com/databrickslabs/mosaic/issues/317 Mosaic jar needs to be installed via init script and not through the cluster UI. diff --git a/docs/source/usage/install-gdal.rst b/docs/source/usage/install-gdal.rst index 97fba5d5e..e919940aa 100644 --- a/docs/source/usage/install-gdal.rst +++ b/docs/source/usage/install-gdal.rst @@ -41,7 +41,7 @@ the :code:`setup_gdal` function. All of the listed parameters are optional. You can have even more control with :code:`setup_fuse_install` function. :param to_fuse_dir: Path to write out the init script for GDAL installation; - default is '/Workspace/Shared/geospatial/mosaic/gdal/jammy'. + default is "/Workspace/Shared/geospatial/mosaic/gdal/jammy". :type to_fuse_dir: str :param with_mosaic_pip: Whether to configure a script that pip installs databricks-mosaic, fixed to the current version; default is False. @@ -50,10 +50,10 @@ the :code:`setup_gdal` function. default is False. :type with_ubuntugis: bool :param script_out_name: name of the script to be written; - default is 'mosaic-gdal-init.sh'. + default is "mosaic-gdal-init.sh". :type script_out_name: str :param override_mosaic_version: String value to use to override the mosaic version to install, - e.g. '==0.4.0' or '<0.5,>=0.4'; default is None. + e.g. "==0.4.0" or "<0.5,>=0.4"; default is None. :type override_mosaic_version: str :rtype: bool @@ -106,6 +106,6 @@ code at the top of the notebook: GDAL enabled. GDAL 3.4.1, released 2021/12/27 - .. 
note:: - You can configure init script from default ubuntu GDAL (3.4.1) to ubuntugis ppa @ https://launchpad.net/~ubuntugis/+archive/ubuntu/ppa (3.4.3) - with `setup_gdal(with_ubuntugis=True)` \ No newline at end of file +.. note:: + You can configure init script from default ubuntu GDAL (3.4.1) to ubuntugis ppa @ https://launchpad.net/~ubuntugis/+archive/ubuntu/ppa (3.4.3) + with `setup_gdal(with_ubuntugis=True)` \ No newline at end of file diff --git a/docs/source/usage/installation.rst b/docs/source/usage/installation.rst index b74e53f4b..d1cf54d1e 100644 --- a/docs/source/usage/installation.rst +++ b/docs/source/usage/installation.rst @@ -10,36 +10,32 @@ Supported platforms Mosaic 0.4.x series only supports DBR 13.x DBRs. If running on a different DBR it will throw an exception: -**DEPRECATION ERROR: Mosaic v0.4.x series only supports Databricks Runtime 13. You can specify -`%pip install 'databricks-mosaic<0.4,>=0.3'` for DBR < 13.** + DEPRECATION ERROR: Mosaic v0.4.x series only supports Databricks Runtime 13. + You can specify `%pip install 'databricks-mosaic<0.4,>=0.3'` for DBR < 13. Mosaic 0.4.x series issues an ERROR on standard, non-Photon clusters `ADB `_ | `AWS `_ | -`GCP `_ : +`GCP `_: -**DEPRECATION ERROR: Please use a Databricks Photon-enabled Runtime for performance benefits or Runtime ML for spatial -AI benefits; Mosaic 0.4.x series restricts executing this cluster.** + DEPRECATION ERROR: Please use a Databricks Photon-enabled Runtime for performance benefits or Runtime ML for + spatial AI benefits; Mosaic 0.4.x series restricts executing this cluster. As of Mosaic 0.4.0 (subject to change in follow-on releases) - * `Assigned Clusters `_ : Mosaic Python, SQL, R, and Scala APIs. - * `Shared Access Clusters `_ : Mosaic Scala API (JVM) with - Admin `allowlisting `_ ; - Python bindings to Mosaic Scala APIs are blocked by Py4J Security on Shared Access Clusters. - -.. warning:: - Mosaic SQL expressions cannot yet be registered with `Unity Catalog `_ - due to API changes affecting DBRs >= 13, more `here `_. +* `Assigned Clusters `_: Mosaic Python, SQL, R, and Scala APIs. +* `Shared Access Clusters `_: Mosaic Scala API (JVM) with + Admin `allowlisting `_; + Python bindings to Mosaic Scala APIs are blocked by Py4J Security on Shared Access Clusters. .. note:: As of Mosaic 0.4.0 (subject to change in follow-on releases) - * `Unity Catalog `_ : Enforces process isolation which is difficult to - accomplish with custom JVM libraries; as such only built-in (aka platform provided) JVM APIs can be invoked from other - supported languages in Shared Access Clusters. - * `Volumes `_ : Along the same principle of isolation, - clusters (both assigned and shared access) can read Volumes via relevant built-in readers and writers or via custom - python calls which do not involve any custom JVM code. + * `Unity Catalog `_ enforces process isolation which is difficult + to accomplish with custom JVM libraries; as such only built-in (aka platform provided) JVM APIs can be invoked from + other supported languages in Shared Access Clusters. + * Along the same principle of isolation, clusters (both assigned and shared access) can read + `Volumes `_ via relevant built-in readers and + writers or via custom python calls which do not involve any custom JVM code. If you have cluster creation permissions in your Databricks workspace, you can create a cluster using the instructions @@ -64,7 +60,8 @@ or from within a Databricks notebook using the :code:`%pip` magic command, e.g. 
%pip install databricks-mosaic -If you need to install Mosaic 0.3 series for DBR 12.2 LTS, e.g. +When you install with :code:`%pip`, the JAR is available from the Python WHL (see the "Enabling" section for more); also, +if you need to install Mosaic 0.3 series for DBR 12.2 LTS, e.g. .. code-block:: bash @@ -116,11 +113,43 @@ The mechanism for enabling the Mosaic functions varies by language: enableMosaic() .. note:: - We recommend :code:`import mosaic as mos` to namespace the python api and avoid any conflicts with other similar functions. + We recommend use of :code:`import mosaic as mos` to namespace the Python API and avoid any conflicts with other similar + functions. By default, the Python import will handle installing the JAR and registering Hive UDFs, which is suitable + for Assigned (vs Shared Access) clusters. + +Unless you are specially adding the JAR to your cluster (outside :code:`%pip` or the WHL file), please always initialize +with Python first; you can then initialize Scala (after the JAR has been auto-attached by Python). You do not need to +initialize Scala at all unless you are using that language binding. You can further configure how Mosaic is enabled with Spark +confs as well as through extra params of the Mosaic 0.4.x series :code:`enable_mosaic` function. + +.. function:: enable_mosaic() + + Use this function at the start of your workflow to ensure all the required dependencies are installed and + Mosaic is configured according to your needs. + + :param spark: The active spark session. + :type spark: pyspark.sql.SparkSession + :param dbutils: Specify dbutils object used for :code:`display` and :code:`displayHTML` functions, needed for Kepler integration (Optional, default is None). + :type dbutils: dbruntime.dbutils.DBUtils + :param log_info: True will try to setLogLevel to "info"; False will not (Optional, default is False). + :type log_info: bool + :param jar_path: If provided, sets :code:`"spark.databricks.labs.mosaic.jar.path"` (Optional, default is None). + :type jar_path: str + :param jar_autoattach: False will not register the JAR and sets :code:`"spark.databricks.labs.mosaic.jar.autoattach"` to False; True will register the JAR (Optional, default is True). + :type jar_autoattach: bool + :rtype: None + +Users can control various aspects of Mosaic's operation with the following Spark confs: + + * :code:`"spark.databricks.labs.mosaic.jar.autoattach"` - Automatically attach the Mosaic JAR to the Databricks cluster (Optional, default is "true"). + * :code:`"spark.databricks.labs.mosaic.jar.path"` - Explicitly specify the path to the Mosaic JAR (Optional and not required at all in a standard Databricks environment). + * :code:`"spark.databricks.labs.mosaic.geometry.api"` - Explicitly specify the underlying geometry library to use for spatial operations (Optional, default is "JTS"). + * :code:`"spark.databricks.labs.mosaic.index.system"` - Explicitly specify the index system to use for optimized spatial joins (Optional, default is "H3"). + SQL usage ********* -If you have not employed :ref:`Automatic SQL registration`, you will need to +If you have not employed :ref:`Automatic SQL registration` (on by default and handled by the Python enable in a notebook), you will need to register the Mosaic SQL functions in your SparkSession from a Scala notebook cell: .. code-block:: scala @@ -133,5 +162,5 @@ register the Mosaic SQL functions in your SparkSession from a Scala notebook cel mosaicContext.register(spark) ..
warning:: - Mosaic 0.4.x SQL bindings for DBR 13 can register with Assigned clusters, but not Shared Access due to API changes, - more `here `_. + Mosaic 0.4.x SQL bindings for DBR 13 can register with Assigned clusters (as Hive UDFs), but not Shared Access due + to `Unity Catalog `_ API changes, more `here `_. diff --git a/python/mosaic/api/functions.py b/python/mosaic/api/functions.py index 9819caa5f..e2165195b 100644 --- a/python/mosaic/api/functions.py +++ b/python/mosaic/api/functions.py @@ -50,6 +50,7 @@ "st_zmax", "st_x", "st_y", + "st_z", "flatten_polygons", "grid_boundaryaswkb", "grid_boundary", @@ -753,7 +754,7 @@ def st_updatesrid( def st_x(geom: ColumnOrName) -> Column: """ - Returns the x coordinate of the input geometry `geom`. + Returns the x coordinate of the centroid point of the input geometry `geom`. Parameters ---------- @@ -769,7 +770,7 @@ def st_x(geom: ColumnOrName) -> Column: def st_y(geom: ColumnOrName) -> Column: """ - Returns the y coordinate of the input geometry `geom`. + Returns the y coordinate of the centroid point of the input geometry `geom`. Parameters ---------- @@ -783,6 +784,22 @@ def st_y(geom: ColumnOrName) -> Column: return config.mosaic_context.invoke_function("st_y", pyspark_to_java_column(geom)) +def st_z(geom: ColumnOrName) -> Column: + """ + Returns the z coordinate of an arbitrary point of the input geometry `geom`. + + Parameters + ---------- + geom : Column + + Returns + ------- + Column (DoubleType) + + """ + return config.mosaic_context.invoke_function("st_z", pyspark_to_java_column(geom)) + + def st_geometrytype(geom: ColumnOrName) -> Column: """ Returns the type of the input geometry `geom` (“POINT”, “LINESTRING”, “POLYGON” etc.). diff --git a/python/test/test_vector_functions.py b/python/test/test_vector_functions.py index 3a127327a..8dbb191a9 100644 --- a/python/test/test_vector_functions.py +++ b/python/test/test_vector_functions.py @@ -1,6 +1,6 @@ import random -from pyspark.sql.functions import abs, col, first, lit, sqrt +from pyspark.sql.functions import abs, col, concat, first, lit, sqrt from .context import api from .utils import MosaicTestCase @@ -27,6 +27,25 @@ def test_st_point(self): ) self.assertListEqual([rw.points for rw in result], expected) + def test_st_z(self): + expected = [ + 0, + 1, + ] + result = ( + self.spark.range(2) + .select(col("id").cast("double")) + .withColumn( + "points", + api.st_geomfromwkt( + concat(lit("POINT (9 9 "), "id", lit(")")) + ), + ) + .withColumn("z", api.st_z("points")) + .collect() + ) + self.assertListEqual([rw.z for rw in result], expected) + def test_st_bindings_happy_flow(self): # Checks that the python bindings do not throw exceptions # Not testing the logic, since that is tested in Scala diff --git a/src/main/scala/com/databricks/labs/mosaic/core/geometry/MosaicGeometry.scala b/src/main/scala/com/databricks/labs/mosaic/core/geometry/MosaicGeometry.scala index 0093aa7c5..3c639446e 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/geometry/MosaicGeometry.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/geometry/MosaicGeometry.scala @@ -41,6 +41,8 @@ trait MosaicGeometry extends GeometryWriter with Serializable { def getCentroid: MosaicPoint + def getAnyPoint: MosaicPoint + def getDimension: Int def isEmpty: Boolean diff --git a/src/main/scala/com/databricks/labs/mosaic/core/geometry/MosaicGeometryJTS.scala b/src/main/scala/com/databricks/labs/mosaic/core/geometry/MosaicGeometryJTS.scala index d509bc9ec..e860d0b67 100644 --- 
a/src/main/scala/com/databricks/labs/mosaic/core/geometry/MosaicGeometryJTS.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/geometry/MosaicGeometryJTS.scala @@ -55,6 +55,16 @@ abstract class MosaicGeometryJTS(geom: Geometry) extends MosaicGeometry { MosaicPointJTS(centroid) } + override def getAnyPoint: MosaicPointJTS = { + // Returns an arbitrary point (the first coordinate via JTS getCoordinate) rather than the centroid; + // unlike getCentroid, this preserves the Z value of the coordinate. + + val coord = geom.getCoordinate + val gf = new GeometryFactory() + val point = gf.createPoint(coord) + MosaicPointJTS(point) + } + override def isEmpty: Boolean = geom.isEmpty override def boundary: MosaicGeometryJTS = MosaicGeometryJTS(geom.getBoundary) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/geometry/ST_X.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/geometry/ST_X.scala index f95af0921..bfb87ad71 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/geometry/ST_X.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/geometry/ST_X.scala @@ -44,7 +44,7 @@ object ST_X extends WithExpressionInfo { override def name: String = "st_x" override def usage: String = - "_FUNC_(expr1) - Returns x coordinate of a point or x coordinate of the centroid if the geometry isnt a point." + "_FUNC_(expr1) - Returns x coordinate of a point or x coordinate of the centroid if the geometry isn't a point." override def example: String = """ diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/geometry/ST_Y.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/geometry/ST_Y.scala index ff626ec0b..6e62fa7bf 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/geometry/ST_Y.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/geometry/ST_Y.scala @@ -44,7 +44,7 @@ object ST_Y extends WithExpressionInfo { override def name: String = "st_y" override def usage: String = - "_FUNC_(expr1) - Returns y coordinate of a point or y coordinate of the centroid if the geometry isnt a point." + "_FUNC_(expr1) - Returns y coordinate of a point or y coordinate of the centroid if the geometry isn't a point." override def example: String = """ diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/geometry/ST_Z.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/geometry/ST_Z.scala new file mode 100644 index 000000000..ea4e0d67a --- /dev/null +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/geometry/ST_Z.scala @@ -0,0 +1,57 @@ +package com.databricks.labs.mosaic.expressions.geometry + +import com.databricks.labs.mosaic.core.geometry.MosaicGeometry +import com.databricks.labs.mosaic.expressions.base.{GenericExpressionFactory, WithExpressionInfo} +import com.databricks.labs.mosaic.expressions.geometry.base.UnaryVectorExpression +import com.databricks.labs.mosaic.functions.MosaicExpressionConfig +import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.types.{DataType, DoubleType} + +/** + * SQL expression that returns the Z coordinate of the input point, or of an arbitrary point of the + * geometry if the input is not a point. + * + * @param inputGeom + * Expression containing the geometry. + * @param expressionConfig + * Mosaic execution context, e.g. geometryAPI, indexSystem, etc. Additional + * arguments for the expression (expressionConfigs). 
+ */ +case class ST_Z( + inputGeom: Expression, + expressionConfig: MosaicExpressionConfig +) extends UnaryVectorExpression[ST_Z](inputGeom, returnsGeometry = false, expressionConfig) { + + override def dataType: DataType = DoubleType + + override def geometryTransform(geometry: MosaicGeometry): Any = geometry.getAnyPoint.getZ + + override def geometryCodeGen(geometryRef: String, ctx: CodegenContext): (String, String) = { + val resultRef = ctx.freshName("result") + val code = s"""double $resultRef = $geometryRef.getAnyPoint().getZ();""" + (code, resultRef) + } + +} + +/** Expression info required for the expression registration for spark SQL. */ +object ST_Z extends WithExpressionInfo { + + override def name: String = "st_z" + + override def usage: String = + "_FUNC_(expr1) - Returns z coordinate of a point or z coordinate of an arbitrary point in geometry if it isn't a point." + + override def example: String = + """ + | Examples: + | > SELECT _FUNC_(a); + | 12.3 + | """.stripMargin + + override def builder(expressionConfig: MosaicExpressionConfig): FunctionBuilder = { + GenericExpressionFactory.getBaseBuilder[ST_Z](1, expressionConfig) + } + +} diff --git a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala index 596d1e2a7..905d6962e 100644 --- a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala +++ b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicContext.scala @@ -181,6 +181,7 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI) extends mosaicRegistry.registerExpression[ST_Within](expressionConfig) mosaicRegistry.registerExpression[ST_X](expressionConfig) mosaicRegistry.registerExpression[ST_Y](expressionConfig) + mosaicRegistry.registerExpression[ST_Z](expressionConfig) mosaicRegistry.registerExpression[ST_Haversine](expressionConfig) // noinspection ScalaDeprecation @@ -600,6 +601,7 @@ class MosaicContext(indexSystem: IndexSystem, geometryAPI: GeometryAPI) extends ColumnAdapter(ST_Translate(geom1.expr, xd.expr, yd.expr, expressionConfig)) def st_x(geom: Column): Column = ColumnAdapter(ST_X(geom.expr, expressionConfig)) def st_y(geom: Column): Column = ColumnAdapter(ST_Y(geom.expr, expressionConfig)) + def st_z(geom: Column): Column = ColumnAdapter(ST_Z(geom.expr, expressionConfig)) def st_xmax(geom: Column): Column = ColumnAdapter(ST_MinMaxXYZ(geom.expr, expressionConfig, "X", "MAX")) def st_xmin(geom: Column): Column = ColumnAdapter(ST_MinMaxXYZ(geom.expr, expressionConfig, "X", "MIN")) def st_ymax(geom: Column): Column = ColumnAdapter(ST_MinMaxXYZ(geom.expr, expressionConfig, "Y", "MAX")) diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/geometry/ST_ZBehaviors.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/geometry/ST_ZBehaviors.scala new file mode 100644 index 000000000..3ce7ad57d --- /dev/null +++ b/src/test/scala/com/databricks/labs/mosaic/expressions/geometry/ST_ZBehaviors.scala @@ -0,0 +1,87 @@ +package com.databricks.labs.mosaic.expressions.geometry + +import com.databricks.labs.mosaic.functions.MosaicContext +import com.databricks.labs.mosaic.test.MosaicSpatialQueryTest +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator} +import org.apache.spark.sql.execution.WholeStageCodegenExec +import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.types._ +import org.scalatest.matchers.must.Matchers.noException +import 
org.scalatest.matchers.should.Matchers.{an, be, convertToAnyShouldWrapper} + +trait ST_ZBehaviors extends MosaicSpatialQueryTest { + + def stzBehavior(mosaicContext: MosaicContext): Unit = { + spark.sparkContext.setLogLevel("FATAL") + val mc = mosaicContext + import mc.functions._ + val sc = spark + import sc.implicits._ + mc.register(spark) + + val rows = List( + ("POINT (2 3 5)", 5), + ("POINT (7 11 13)", 13), + ("POINT (17 19 23)", 23), + ("POINT (29 31 37)", 37) + ) + + val result = rows + .toDF("wkt", "expected") + .withColumn("result", st_z($"wkt")) + .where($"expected" === $"result") + + result.count shouldBe 4 + } + + def stzCodegen(mosaicContext: MosaicContext): Unit = { + spark.sparkContext.setLogLevel("FATAL") + val mc = mosaicContext + val sc = spark + import mc.functions._ + import sc.implicits._ + mc.register(spark) + + val rows = List( + ("POINT (2 3 5)", 5), + ("POINT (7 11 13)", 13), + ("POINT (17 19 23)", 23), + ("POINT (29 31 37)", 37) + ) + + val points = rows.toDF("wkt", "expected") + + val result = points + .withColumn("result", st_z($"wkt")) + .where($"expected" === $"result") + + val queryExecution = result.queryExecution + val plan = queryExecution.executedPlan + + val wholeStageCodegenExec = plan.find(_.isInstanceOf[WholeStageCodegenExec]) + + wholeStageCodegenExec.isDefined shouldBe true + + val codeGenStage = wholeStageCodegenExec.get.asInstanceOf[WholeStageCodegenExec] + val (_, code) = codeGenStage.doCodeGen() + + noException should be thrownBy CodeGenerator.compile(code) + + val stZ = ST_Z(lit(1).expr, mc.expressionConfig) + val ctx = new CodegenContext + an[Error] should be thrownBy stZ.genCode(ctx) + } + + def auxiliaryMethods(mosaicContext: MosaicContext): Unit = { + spark.sparkContext.setLogLevel("FATAL") + val mc = mosaicContext + mc.register(spark) + + val stZ = ST_Z(lit("POINT (2 3 4)").expr, mc.expressionConfig) + + stZ.child shouldEqual lit("POINT (2 3 4)").expr + stZ.dataType shouldEqual DoubleType + noException should be thrownBy stZ.makeCopy(Array(stZ.child)) + } + +} diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/geometry/ST_ZTest.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/geometry/ST_ZTest.scala new file mode 100644 index 000000000..4e2e99b26 --- /dev/null +++ b/src/test/scala/com/databricks/labs/mosaic/expressions/geometry/ST_ZTest.scala @@ -0,0 +1,13 @@ +package com.databricks.labs.mosaic.expressions.geometry + +import com.databricks.labs.mosaic.test.MosaicSpatialQueryTest +import org.apache.spark.sql.test.SharedSparkSession + +class ST_ZTest extends MosaicSpatialQueryTest with SharedSparkSession with ST_ZBehaviors { + + testAllGeometriesNoCodegen("Testing stZ NO_CODEGEN") { stzBehavior } + testAllGeometriesCodegen("Testing stZ CODEGEN") { stzBehavior } + testAllGeometriesCodegen("Testing stZ CODEGEN compilation") { stzCodegen } + testAllGeometriesNoCodegen("Testing stZ auxiliary methods") { auxiliaryMethods } + +}