Merge pull request #23 from databrickslabs/main
Points algorithms next generation
a0x8o authored Feb 1, 2024
2 parents 2cc633f + acc0b2a commit 67aec7f
Showing 17 changed files with 593 additions and 83 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -141,7 +141,7 @@ import com.databricks.labs.mosaic.JTS
val mosaicContext = MosaicContext.build(H3, JTS)
mosaicContext.register(spark)
```
__Note: Mosaic 0.4.x SQL bindings for DBR 13 can register with Assigned clusters, but not Shared Access due to API changes, more [here](https://docs.databricks.com/en/udf/index.html).__
__Note: Mosaic 0.4.x SQL bindings for DBR 13 can register with Assigned clusters (as Hive UDFs), but not with Shared Access clusters due to API changes; more details [here](https://docs.databricks.com/en/udf/index.html).__

## Examples

57 changes: 55 additions & 2 deletions docs/source/api/spatial-functions.rst
@@ -1721,7 +1721,7 @@ st_x

.. function:: st_x(col)

Returns the x coordinate of the input geometry.
Returns the x coordinate of the centroid point of the input geometry.

:param col: Geometry
:type col: Column
@@ -1880,7 +1880,7 @@ st_y
****
.. function:: st_y(col)

Returns the y coordinate of the input geometry.
Returns the y coordinate of the centroid point of the input geometry.

:param col: Geometry
:type col: Column
@@ -2036,6 +2036,59 @@ st_ymin
+-----------------+


st_z
****
.. function:: st_z(col)

Returns the z coordinate of an arbitrary point of the input geometry.

:param col: Point Geometry
:type col: Column
:rtype: Column: DoubleType

:example:

.. tabs::
.. code-tab:: py

df = spark.createDataFrame([{'wkt': 'POINT (30 10 20)'}])
df.select(st_z('wkt')).show()
+-----------------+
|        st_z(wkt)|
+-----------------+
| 20.0|
+-----------------+

.. code-tab:: scala

val df = List(("POINT (30 10 20)")).toDF("wkt")
df.select(st_z(col("wkt"))).show()
+-----------------+
|        st_z(wkt)|
+-----------------+
| 20.0|
+-----------------+

.. code-tab:: sql

SELECT st_z("POINT (30 10 20)")
+-----------------+
|        st_z(wkt)|
+-----------------+
| 20.0|
+-----------------+

.. code-tab:: r R

df <- createDataFrame(data.frame(wkt = "POINT (30 10 20)"))
showDF(select(df, st_z(column("wkt"))), truncate=F)
+-----------------+
|        st_z(wkt)|
+-----------------+
| 20.0|
+-----------------+


st_zmax
*******

244 changes: 234 additions & 10 deletions docs/source/api/vector-format-readers.rst
@@ -11,15 +11,10 @@ Here are some common useful file formats:

* `GeoJSON <https://gdal.org/drivers/vector/geojson.html>`_ (also `ESRIJSON <https://gdal.org/drivers/vector/esrijson.html>`_,
`TopoJSON <https://gdal.org/drivers/vector/topojson.html>`_)
* `FileGDB <https://gdal.org/drivers/vector/filegdb.html>`_ (ESRI File Geodatabase) and
`OpenFileGDB <https://gdal.org/drivers/vector/openfilegdb.html>`_ (ESRI File Geodatabase vector) -
Mosaic implements named reader :ref:`spark.read.format("geo_db")` (described in this doc).
* `ESRI Shapefile <https://gdal.org/drivers/vector/shapefile.html>`_ (ESRI Shapefile / DBF) -
Mosaic implements named reader :ref:`spark.read.format("shapefile")` (described in this doc).
* `netCDF <https://gdal.org/drivers/raster/netcdf.html>`_ (Network Common Data Form) -
Mosaic supports GDAL netCDF raster reader also.
* `XLSX <https://gdal.org/drivers/vector/xlsx.html>`_, `XLS <https://gdal.org/drivers/vector/xls.html>`_,
`ODS <https://gdal.org/drivers/vector/ods.html>`_ spreadsheets
* `FileGDB <https://gdal.org/drivers/vector/filegdb.html>`_ (ESRI File Geodatabase) and `OpenFileGDB <https://gdal.org/drivers/vector/openfilegdb.html>`_ (ESRI File Geodatabase vector) - Mosaic implements named reader :ref:`spark.read.format("geo_db")` (described in this doc).
* `ESRI Shapefile <https://gdal.org/drivers/vector/shapefile.html>`_ (ESRI Shapefile / DBF) - Mosaic implements named reader :ref:`spark.read.format("shapefile")` (described in this doc).
* `netCDF <https://gdal.org/drivers/raster/netcdf.html>`_ (Network Common Data Form) - Mosaic supports GDAL netCDF raster reader also.
* `XLSX <https://gdal.org/drivers/vector/xlsx.html>`_, `XLS <https://gdal.org/drivers/vector/xls.html>`_, `ODS <https://gdal.org/drivers/vector/ods.html>`_ spreadsheets
* `TIGER <https://gdal.org/drivers/vector/tiger.html>`_ (U.S. Census TIGER/Line)
* `PGDump <https://gdal.org/drivers/vector/pgdump.html>`_ (PostgreSQL Dump)
* `KML <https://gdal.org/drivers/vector/kml.html>`_ (Keyhole Markup Language)
@@ -39,7 +34,6 @@ Additionally, for convenience, Mosaic provides specific readers for Shapefile an
* :code:`spark.read.format("geo_db")` reader for GeoDB files natively in Spark.
* :code:`spark.read.format("shapefile")` reader for Shapefiles natively in Spark.

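For orientation, here is a minimal usage sketch of these convenience readers; the paths are placeholders, and the
reader-specific options (layer selection, :code:`vsizip`, etc.) are described in the reader sections of this doc.

.. code-block:: py

    # sketch only: point the readers at directories containing your uploaded
    # File Geodatabase / Shapefile data; adjust paths to your environment
    gdb_df = spark.read.format("geo_db").load("dbfs:/path/to/geodatabases/")
    shp_df = spark.read.format("shapefile").load("dbfs:/path/to/shapefiles/")
    gdb_df.printSchema()
    shp_df.printSchema()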

spark.read.format("ogr")
*************************
A base Spark SQL data source for reading GDAL vector data sources.
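
A minimal sketch of this reader (the path is a placeholder; driver and layer behavior are controlled through the
reader options documented for it):

.. code-block:: py

    # sketch only: load a directory (or file) of vector data with the base "ogr" reader
    ogr_df = spark.read.format("ogr").load("dbfs:/path/to/vector_data/")
    ogr_df.printSchema()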
@@ -296,3 +290,233 @@ The reader supports the following options:
Keyword options not identified in the function signature are converted to a :code:`Map<String,String>`.
These must be supplied as :code:`String` values.
Function signature values may also be supplied as :code:`String`.

Vector File UDFs
################

It can be useful to perform exploratory operations on vector files to help plan their processing.
The following UDFs use `fiona <https://fiona.readthedocs.io/en/stable/index.html>`_, which is already provided
as part of the dependencies of the Mosaic python bindings.

The examples below use the zipped variation of a larger (800MB) shapefile.
This is just one file for illustration; in real-world use you can have any number of files.
Here is a snippet for downloading it.

.. code-block:: bash
%sh
mkdir -p /dbfs/home/<username>/data/large_shapefiles
wget -nv -P /dbfs/home/<username>/data/large_shapefiles -nc https://osmdata.openstreetmap.de/download/land-polygons-split-4326.zip
ls -lh /dbfs/home/<username>/data/large_shapefiles

Then we create a Spark DataFrame of file metadata to drive the examples.

.. code-block:: py
df = spark.createDataFrame([
{
'rel_path': '/land-polygons-split-4326/land_polygons.shp',
'driver': 'ESRI Shapefile',
'zip_path': '/dbfs/home/<username>/data/large_shapefiles/land-polygons-split-4326.zip'
}
])

Here is an example UDF to list layers, supporting both zipped and non-zipped files.

.. code-block:: py
from pyspark.sql.functions import udf
from pyspark.sql.types import *
@udf(returnType=ArrayType(StringType()))
def list_layers(in_path, driver, zip_path=None):
"""
List layer names (in index order).
- in_path: file location for read; when used with `zip_path`,
this will be the relative path within a zip to open
- driver: name of GDAL driver to use
- zip_path: follows format 'zip:///some/file.zip' (Optional, default is None); zip gets opened something like:
`with fiona.open('/test/a.shp', vfs='zip:///tmp/dir1/test.zip', driver='<driver>') as f:`
Note: you can prepend 'zip://' for the param or leave it off in this example
"""
import fiona
z_path = zip_path
if zip_path and not zip_path.startswith("zip:"):
z_path = f"zip://{zip_path}"
return fiona.listlayers(in_path, vfs=z_path, driver=driver)

We can call the UDF, e.g.

.. code-block:: py
import pyspark.sql.functions as F
display(
df
.withColumn(
"layers",
list_layers("rel_path", "driver", "zip_path")
)
.withColumn("num_layers", F.size("layers"))
)
+--------------+--------------------+--------------------+---------------+----------+
| driver| rel_path| zip_path| layers|num_layers|
+--------------+--------------------+--------------------+---------------+----------+
|ESRI Shapefile|/land-polygons-sp...|/dbfs/home/... |[land_polygons]| 1|
+--------------+--------------------+--------------------+---------------+----------+

Here is an example UDF to count rows for a layer, supporting both zipped and non-zipped files.

.. code-block:: py
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def count_vector_rows(in_path, driver, layer, zip_path=None):
"""
Count rows for the provided vector file.
- in_path: file location for read; when used with `zip_path`,
this will be the relative path within a zip to open
- driver: name of GDAL driver to use
- layer: integer (zero-indexed) or string (name)
- zip_path: follows format 'zip:///some/file.zip' (Optional, default is None); zip gets opened something like:
`with fiona.open('/test/a.shp', vfs='zip:///tmp/dir1/test.zip', driver='<driver>') as f:`
Note: you can prepend 'zip://' for the param or leave it off in this example
"""
import fiona
cnt = 0
z_path = zip_path
if zip_path and not zip_path.startswith("zip:"):
z_path = f"zip://{zip_path}"
with fiona.open(in_path, vfs=z_path, driver=driver, layer=layer) as in_file:
for item in in_file:
cnt += 1
return cnt

We can call the UDF, e.g.

.. code-block:: py
import pyspark.sql.functions as F
display(
df
.withColumn(
"row_cnt",
count_vector_rows("rel_path", "driver", F.lit(0), "zip_path")
)
)
+--------------+--------------------+--------------------+-------+
| driver| rel_path| zip_path|row_cnt|
+--------------+--------------------+--------------------+-------+
|ESRI Shapefile|/land-polygons-sp...|/dbfs/home/... | 789972|
+--------------+--------------------+--------------------+-------+

Here is an example UDF to get a Spark-friendly schema for a layer, supporting both zipped and non-zipped files.

.. code-block:: py
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
@udf(returnType=StringType())
def layer_schema(in_path, driver, layer, zip_path=None):
"""
Get the schema for the provided vector file layer.
- in_path: file location for read; when used with `zip_path`,
this will be the relative path within a zip to open
- driver: name of GDAL driver to use
- layer: integer (zero-indexed) or string (name)
- zip_path: follows format 'zip:///some/file.zip' (Optional, default is None); zip gets opened something like:
`with fiona.open('/test/a.shp', vfs='zip:///tmp/dir1/test.zip', driver='<driver>') as f:`
Note: you can prepend 'zip://' for the param or leave it off in this example
Returns layer schema json as string
"""
import fiona
import json
z_path = zip_path
if zip_path and not zip_path.startswith("zip:"):
z_path = f"zip://{zip_path}"
with fiona.open(in_path, vfs=z_path, driver=driver, layer=layer) as in_file:
return json.dumps(in_file.schema.copy())

We can call the UDF, e.g.

.. code-block:: py
import pyspark.sql.functions as F
display(
df
.withColumn(
"layer_schema",
layer_schema("rel_path", "driver", F.lit(0), "zip_path")
)
)
+--------------+--------------------+--------------------+--------------------+
| driver| rel_path| zip_path| layer_schema|
+--------------+--------------------+--------------------+--------------------+
|ESRI Shapefile|/land-polygons-sp...|/dbfs/home/... |{"properties": {"...|
+--------------+--------------------+--------------------+--------------------+
Also, it can be useful to standardize collections of zipped vector files to ensure each is individually zipped,
as expected by the provided APIs.

.. note::
    Option :code:`vsizip` in the Mosaic GDAL APIs (a different API than the above fiona UDF examples) is for individually
    zipped vector files (e.g. File Geodatabase or Shapefile), not collections. If you end up with mixed or unclear zipped
    files, you can test them with a UDF such as shown below.
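
For an individually zipped file, a hedged sketch of using the :code:`vsizip` option with the named shapefile reader
could look like this (the path is a placeholder; option values are passed as strings):

.. code-block:: py

    # sketch only: assumes some_shapefile.zip wraps exactly one shapefile dataset (no nested zips)
    df_zipped = (
        spark.read.format("shapefile")
            .option("vsizip", "true")
            .load("dbfs:/path/to/some_shapefile.zip")
    )
    df_zipped.printSchema()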

Here is an example UDF to test for a zip of zips.
In this example, we can use :code:`zip_path` from :code:`df` because we left "zip://" out of the name.

.. code-block:: py
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
@udf(returnType=BooleanType())
def test_double_zip(path):
    """
    Tests whether a zip contains zips, which is not supported by
    Mosaic GDAL APIs.
    - path: path of the zip file to check
    Returns boolean
    """
    import zipfile
    try:
        with zipfile.ZipFile(path, mode="r") as zf:
            for f in zf.namelist():
                if f.lower().endswith(".zip"):
                    return True
            return False
    except Exception:
        return False

We can call the UDF, e.g.

.. code-block:: py
display(
df
.withColumn(
"is_double_zip",
test_double_zip("zip_path")
)
)
+--------------------+-------------+
| zip_path|is_double_zip|
+--------------------+-------------+
|/dbfs/home/... | false|
+--------------------+-------------+

You can then handle unzipping the "double" zips that return :code:`True`, either by extending the :code:`test_double_zip`
UDF to perform the unzips (with a provided out_dir) or through an additional UDF, e.g. using the ZipFile
`extractall <https://docs.python.org/3/library/zipfile.html#zipfile.ZipFile.extractall>`_ function; a sketch of the latter
approach follows.
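
Here is one possible shape for such a UDF, offered as a sketch only: :code:`out_dir` and the per-member
:code:`extract` call (used here instead of :code:`extractall` so that only nested zips are written out) are
assumptions for illustration, not part of the Mosaic APIs.

.. code-block:: py

    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, StringType

    @udf(returnType=ArrayType(StringType()))
    def extract_inner_zips(path, out_dir):
        """
        Sketch only: extract any nested zips found in a "double" zip into out_dir.
        - path: zip file to inspect
        - out_dir: directory to extract nested zips into
        Returns list of extracted zip paths (empty if none found).
        """
        import os
        import zipfile
        extracted = []
        os.makedirs(out_dir, exist_ok=True)
        try:
            with zipfile.ZipFile(path, mode="r") as zf:
                for f in zf.namelist():
                    if f.lower().endswith(".zip"):
                        zf.extract(f, path=out_dir)
                        extracted.append(os.path.join(out_dir, f))
        except Exception:
            pass
        return extracted

When calling, supply the output directory as a literal column, e.g.
:code:`extract_inner_zips("zip_path", F.lit("/dbfs/home/<username>/data/unzipped"))`.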