Skip to content

Commit

Permalink
Add the concept of raster-tile to the schema and types.
Browse files Browse the repository at this point in the history
Update the logic of expressions and file readers.
Update tests.
  • Loading branch information
milos.colic committed Oct 11, 2023
1 parent 43e483f commit 3d6b648
Show file tree
Hide file tree
Showing 107 changed files with 1,027 additions and 927 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
package com.databricks.labs.mosaic.core.index

import org.apache.spark.sql.SparkSession

object IndexSystemFactory {

def getIndexSystem(spark: SparkSession): IndexSystem = {
val indexSystem = spark.conf.get("spark.databricks.labs.mosaic.index.system", "H3")
getIndexSystem(indexSystem)
}

def getIndexSystem(name: String): IndexSystem = {
val customIndexRE = "CUSTOM\\((-?\\d+), ?(-?\\d+), ?(-?\\d+), ?(-?\\d+), ?(\\d+), ?(\\d+), ?(\\d+) ?\\)".r
val customIndexWithCRSRE = "CUSTOM\\((-?\\d+), ?(-?\\d+), ?(-?\\d+), ?(-?\\d+), ?(\\d+), ?(\\d+), ?(\\d+), ?(\\d+) ?\\)".r
Expand Down Expand Up @@ -35,4 +42,5 @@ object IndexSystemFactory {
case _ => throw new Error("Index not supported yet!")
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,21 @@ import org.gdal.osr.SpatialReference
* A flag to indicate if the raster is in memory or not.
*/
abstract class MosaicRaster(
isInMem: Boolean
isInMem: Boolean,
parentPath: String,
driverShortName: String
) extends Serializable
with RasterWriter
with RasterCleaner {

def getSubdataset(subsetName: String): MosaicRaster

def getDriversShortName: String = driverShortName

def getParentPath: String = parentPath

def getRasterFileExtension: String = getRaster.GetDriver().GetMetadataItem("DMD_EXTENSION")

def asTemp: MosaicRaster

def flushCache(): MosaicRaster = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.databricks.labs.mosaic.core.raster.api

import com.databricks.labs.mosaic.core.raster._
import com.databricks.labs.mosaic.core.raster.gdal_raster.{MosaicRasterGDAL, RasterCleaner, RasterReader, RasterTransform}
import com.databricks.labs.mosaic.utils.PathUtils
import org.apache.spark.sql.types.{BinaryType, DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String
import org.gdal.gdal.gdal
Expand All @@ -13,17 +14,28 @@ import org.gdal.gdal.gdal
*/
abstract class RasterAPI(reader: RasterReader) extends Serializable {

def readRaster(inputRaster: Any, inputDT: DataType): MosaicRaster = {
def getExtension(driverShortName: String): String

def readRaster(
inputRaster: Any,
parentPath: String,
shortDriverName: String,
inputDT: DataType,
readDirect: Boolean = false
): MosaicRaster = {
inputDT match {
case StringType =>
val path = inputRaster.asInstanceOf[UTF8String].toString
reader.readRaster(path)
val isSubdataset = PathUtils.isSubdataset(path)
reader.readRaster(path, parentPath, shortDriverName, readDirect || isSubdataset)
case BinaryType =>
val bytes = inputRaster.asInstanceOf[Array[Byte]]
val raster = reader.readRaster(bytes)
val raster = reader.readRaster(bytes, parentPath, shortDriverName)
// If the raster is coming as a byte array, we can't check for zip condition.
// We first try to read the raster directly, if it fails, we read it as a zip.
Option(raster).getOrElse(reader.readRaster(bytes))
Option(raster).getOrElse(
reader.readRaster(bytes, parentPath, shortDriverName)
)
}
}

Expand Down Expand Up @@ -67,9 +79,11 @@ abstract class RasterAPI(reader: RasterReader) extends Serializable {
* @return
* Returns a Raster object.
*/
def raster(path: String): MosaicRaster = reader.readRaster(path)
def raster(path: String, parentPath: String, driverShortName: String): MosaicRaster =
reader.readRaster(path, parentPath, driverShortName)

def raster(content: Array[Byte]): MosaicRaster = reader.readRaster(content)
def raster(content: Array[Byte], parentPath: String, driverShortName: String): MosaicRaster =
reader.readRaster(content, parentPath, driverShortName)

/**
* Reads a raster from the given path. It extracts the specified band from
Expand All @@ -83,7 +97,8 @@ abstract class RasterAPI(reader: RasterReader) extends Serializable {
* @return
* Returns a Raster band object.
*/
def band(path: String, bandIndex: Int): MosaicRasterBand = reader.readBand(path, bandIndex)
def band(path: String, bandIndex: Int, parentPath: String, driverShortName: String): MosaicRasterBand =
reader.readBand(path, bandIndex, parentPath, driverShortName)

/**
* Converts raster x, y coordinates to lat, lon coordinates.
Expand Down Expand Up @@ -165,6 +180,11 @@ object RasterAPI extends Serializable {
if (gdal.GetDriverCount() == 0) gdal.AllRegister()
}

override def getExtension(driverShortName: String): String = {
val driver = gdal.GetDriverByName(driverShortName)
driver.GetMetadataItem("DMD_EXTENSION")
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.databricks.labs.mosaic.core.raster.gdal_raster

import com.databricks.labs.mosaic.GDAL
import com.databricks.labs.mosaic.core.geometry.MosaicGeometry
import com.databricks.labs.mosaic.core.geometry.api.GeometryAPI
import com.databricks.labs.mosaic.core.index.IndexSystem
Expand All @@ -22,7 +23,15 @@ import scala.util.Try

/** GDAL implementation of the MosaicRaster trait. */
//noinspection DuplicatedCode
class MosaicRasterGDAL(_uuid: Long, var raster: Dataset, path: String, isTemp: Boolean) extends MosaicRaster(isInMem = false) {
class MosaicRasterGDAL(
_uuid: Long,
var raster: Dataset,
path: String,
isTemp: Boolean,
parentPath: String,
driverShortName: String,
memSize: Long
) extends MosaicRaster(isInMem = false, parentPath, driverShortName) {

protected val crsFactory: CRSFactory = new CRSFactory

Expand All @@ -31,6 +40,10 @@ class MosaicRasterGDAL(_uuid: Long, var raster: Dataset, path: String, isTemp: B
wsg84.ImportFromEPSG(4326)
wsg84.SetAxisMappingStrategy(osr.osrConstants.OAMS_TRADITIONAL_GIS_ORDER)

def openRaster(path: String): Dataset = {
MosaicRasterGDAL.openRaster(path, driverShortName)
}

override def metadata: Map[String, String] = {
Option(raster.GetMetadataDomainList())
.map(_.toArray)
Expand All @@ -56,9 +69,11 @@ class MosaicRasterGDAL(_uuid: Long, var raster: Dataset, path: String, isTemp: B
keys.flatMap(key =>
if (key.toUpperCase(Locale.ROOT).contains("NAME")) {
val path = subdatasetsMap(key)
val pieces = path.split(":")
Seq(
key -> path.split(":").last,
path.split(":").last -> path
key -> pieces.last,
s"${pieces.last}_vsimem" -> path,
pieces.last -> s"${pieces.head}:$parentPath:${pieces.last}"
)
} else Seq(key -> subdatasetsMap(key))
).toMap
Expand Down Expand Up @@ -157,7 +172,7 @@ class MosaicRasterGDAL(_uuid: Long, var raster: Dataset, path: String, isTemp: B
val cellGeom = indexSystem.indexToGeometry(cellID, geometryAPI)
// buffer by diagonal size of the raster pixel to avoid clipping issues
// add 1% to avoid rounding errors
val bufferR = pixelDiagSize*1.01
val bufferR = pixelDiagSize * 1.01
val bufferedCell = cellGeom.buffer(bufferR)
val geomCRS =
if (cellGeom.getSpatialReference == 0) wsg84
Expand Down Expand Up @@ -194,15 +209,21 @@ class MosaicRasterGDAL(_uuid: Long, var raster: Dataset, path: String, isTemp: B

/** @return Returns the amount of memory occupied by the file in bytes. */
override def getMemSize: Long = {
if (PathUtils.isInMemory(path)) {
val tempPath = PathUtils.createTmpFilePath(this.uuid.toString, getExtension)
writeToPath(tempPath)
val size = Files.size(Paths.get(tempPath))
Files.delete(Paths.get(tempPath))
size
if (memSize == -1) {
if (PathUtils.isInMemory(path)) {
val tempPath = PathUtils.createTmpFilePath(this.uuid.toString, getExtension)
writeToPath(tempPath)
this.refresh()
val size = Files.size(Paths.get(tempPath))
Files.delete(Paths.get(tempPath))
size
} else {
Files.size(Paths.get(path))
}
} else {
Files.size(Paths.get(path))
memSize
}

}

/**
Expand Down Expand Up @@ -266,13 +287,13 @@ class MosaicRasterGDAL(_uuid: Long, var raster: Dataset, path: String, isTemp: B
* usable again.
*/
override def refresh(): Unit = {
this.raster = gdal.Open(path, GA_ReadOnly)
this.raster = openRaster(path)
}

override def uuid: Long = _uuid

override def getExtension: String = {
val driver = raster.GetDriver()
val driver = gdal.GetDriverByName(driverShortName)
val extension = driver.GetMetadataItem("DMD_EXTENSION")
extension
}
Expand Down Expand Up @@ -303,24 +324,66 @@ class MosaicRasterGDAL(_uuid: Long, var raster: Dataset, path: String, isTemp: B
writeToPath(temp)
if (PathUtils.isInMemory(path)) RasterCleaner.dispose(this)
else this.destroy()
val ds = gdal.Open(temp, GA_ReadOnly)
MosaicRasterGDAL(ds, temp, isTemp = true)
val ds = openRaster(temp)
MosaicRasterGDAL(ds, temp, isTemp = true, parentPath, driverShortName, memSize)
}

override def getSubdataset(subsetName: String): MosaicRaster = {
val path = Option(raster.GetMetadata_Dict("SUBDATASETS"))
.map(_.asScala.toMap.asInstanceOf[Map[String, String]])
.getOrElse(Map.empty[String, String])
.values
.find(_.toUpperCase(Locale.ROOT).endsWith(subsetName.toUpperCase(Locale.ROOT)))
.getOrElse(throw new Exception(s"Subdataset $subsetName not found"))
val ds = openRaster(path)
// Avoid costly IO to compute MEM size here
// It will be available when the raster is serialized for next operation
// If value is needed then it will be computed when getMemSize is called
MosaicRasterGDAL(ds, path, isTemp = false, parentPath, driverShortName, -1)
}

}

//noinspection ZeroIndexToHead
object MosaicRasterGDAL extends RasterReader {

def apply(dataset: Dataset, path: String, isTemp: Boolean): MosaicRasterGDAL = {
def openRaster(path: String, driverShortName: String): Dataset = {
val drivers = new JVector[String]()
drivers.add(driverShortName)
gdal.OpenEx(path, GA_ReadOnly, drivers)
}

def indentifyDriver(parentPath: String): String = {
val isSubdataset = PathUtils.isSubdataset(parentPath)
val path = PathUtils.getCleanPath(parentPath, parentPath.endsWith(".zip"))
val readPath =
if (isSubdataset) PathUtils.getSubdatasetPath(path)
else PathUtils.getZipPath(path)
val dataset = gdal.Open(readPath, GA_ReadOnly)
val driver = dataset.GetDriver()
val driverShortName = driver.getShortName
dataset.delete()
driver.delete()
driverShortName
}

def apply(
dataset: Dataset,
path: String,
isTemp: Boolean,
parentPath: String,
driverShortName: String,
memSize: Long
): MosaicRasterGDAL = {
val uuid = Murmur3.hash64(path.getBytes())
val raster = new MosaicRasterGDAL(uuid, dataset, path, isTemp)
val raster = new MosaicRasterGDAL(uuid, dataset, path, isTemp, parentPath, driverShortName, memSize)
raster
}

def apply(path: String, isTemp: Boolean): MosaicRasterGDAL = {
def apply(path: String, isTemp: Boolean, parentPath: String, driverShortName: String, memSize: Long): MosaicRasterGDAL = {
val uuid = Murmur3.hash64(path.getBytes())
val raster = new MosaicRasterGDAL(uuid, gdal.Open(path, GA_ReadOnly), path, isTemp)
val dataset = openRaster(path, driverShortName)
val raster = new MosaicRasterGDAL(uuid, dataset, path, isTemp, parentPath, driverShortName, memSize)
raster
}

Expand All @@ -336,39 +399,46 @@ object MosaicRasterGDAL extends RasterReader {
* @return
* A MosaicRaster object.
*/
override def readRaster(inPath: String): MosaicRaster = {
override def readRaster(inPath: String, parentPath: String, driverShortName: String, readDirect: Boolean = false): MosaicRaster = {
val isSubdataset = PathUtils.isSubdataset(inPath)
val localCopy = PathUtils.copyToTmp(inPath)
val localCopy = if (readDirect) inPath else PathUtils.copyToTmp(inPath)
val path = PathUtils.getCleanPath(localCopy, localCopy.endsWith(".zip"))

val uuid = Murmur3.hash64(path.getBytes())
val readPath =
if (isSubdataset) PathUtils.getSubdatasetPath(path)
else PathUtils.getZipPath(path)
val dataset = gdal.Open(readPath, GA_ReadOnly)
val raster = new MosaicRasterGDAL(uuid, dataset, path, true)
val dataset = openRaster(readPath, driverShortName)

// Avoid costly IO to compute MEM size here
// It will be available when the raster is serialized for next operation
// If value is needed then it will be computed when getMemSize is called
// We cannot just use memSize value of the parent due to the fact that the raster could be a subdataset
val raster = new MosaicRasterGDAL(uuid, dataset, path, true, parentPath, driverShortName, -1)
raster
}

override def readRaster(contentBytes: Array[Byte]): MosaicRaster = {
override def readRaster(contentBytes: Array[Byte], parentPath: String, driverShortName: String): MosaicRaster = {
if (Option(contentBytes).isEmpty || contentBytes.isEmpty) {
// Handle empty rasters, easy check to -1L as UUID for empty rasters
new MosaicRasterGDAL(-1L, null, "", false)
new MosaicRasterGDAL(-1L, null, "", false, parentPath, "", -1)
} else {
// This is a temp UUID for purposes of reading the raster through GDAL from memory
// The stable UUID is kept in metadata of the raster
val uuid = Murmur3.hash64(UUID.randomUUID().toString.getBytes())
val extension = "tif"
val extension = GDAL.getExtension(driverShortName)
val virtualPath = s"/vsimem/$uuid.$extension"
gdal.FileFromMemBuffer(virtualPath, contentBytes)
// Try reading as a virtual file, if that fails, read as a zipped virtual file
val dataset = Option(gdal.Open(virtualPath, GA_ReadOnly)).getOrElse({
val dataset = Option(
openRaster(virtualPath, driverShortName)
).getOrElse({
val virtualPath = s"/vsimem/$uuid.zip"
val zippedPath = s"/vsizip/$virtualPath"
gdal.FileFromMemBuffer(virtualPath, contentBytes)
gdal.Open(zippedPath, GA_ReadOnly)
openRaster(zippedPath, driverShortName)
})
val raster = new MosaicRasterGDAL(uuid, dataset, virtualPath, false)
val raster = new MosaicRasterGDAL(uuid, dataset, virtualPath, false, parentPath, driverShortName, contentBytes.length)
raster
}
}
Expand All @@ -387,8 +457,8 @@ object MosaicRasterGDAL extends RasterReader {
* @return
* A MosaicRaster object.
*/
override def readBand(path: String, bandIndex: Int): MosaicRasterBand = {
val raster = readRaster(path)
override def readBand(path: String, bandIndex: Int, parentPath: String, driverShortName: String): MosaicRasterBand = {
val raster = readRaster(path, parentPath, driverShortName)
raster.getBand(bandIndex)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.databricks.labs.mosaic.core.raster.gdal_raster

import com.databricks.labs.mosaic.core.raster.MosaicRaster
import com.databricks.labs.mosaic.core.types.model.MosaicRasterTile
import org.gdal.gdal.Dataset

trait RasterCleaner {
Expand Down Expand Up @@ -30,11 +31,14 @@ object RasterCleaner {

def dispose(raster: Any): Unit = {
raster match {
case r: MosaicRaster =>
case r: MosaicRaster =>
r.destroy()
r.cleanUp()
case rt: MosaicRasterTile =>
rt.raster.destroy()
rt.raster.cleanUp()
// NOOP for simpler code handling in expressions, removes need for repeated if/else
case _ => ()
case _ => ()
}
}

Expand Down
Loading

0 comments on commit 3d6b648

Please sign in to comment.