Skip to content

Commit

Permalink
feat: Add experimental support for Apache Spark 3.5.1 (apache#587)
Browse files Browse the repository at this point in the history
* add profile

* fix for LegacyBehaviorPolicy

* fix 3.5 ShimCometScanExec

* builds with Spark 3.5

* fix builds

* use correct parquet version

* make docs more explicit

* bug fix

* remove use of reflection

* fix

* fix 4.0 build

* allow different stability plans for 3.5

* copy approved plans from 3.x to 3.5

* regenerate golden files for 3.5

* enable CI test

* fix merge conflict

* remove unused imports

* Refine shim

* remove some uses of reflection

* refine shim

* remove unused code

* remove unused imports

* add isTimestampNTZType to 3.5 shim

* address feedback

* remove unused imports

* address feedback
  • Loading branch information
andygrove authored Jun 20, 2024
1 parent a4b968e commit 28309a4
Show file tree
Hide file tree
Showing 287 changed files with 58,341 additions and 38 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/pr_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:
os: [ubuntu-latest]
java_version: [8, 11, 17]
test-target: [rust, java]
spark-version: ['3.4']
spark-version: ['3.5']
scala-version: ['2.12', '2.13']
is_push_event:
- ${{ github.event_name == 'push' }}
Expand Down Expand Up @@ -109,7 +109,7 @@ jobs:
os: [ubuntu-latest]
java_version: [8, 11, 17]
test-target: [java]
spark-version: ['3.3']
spark-version: ['3.3', '3.4']
scala-version: ['2.12', '2.13']
fail-fast: false
name: ${{ matrix.os }}/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}-scala-${{matrix.scala-version}}/${{ matrix.test-target }}
Expand All @@ -134,7 +134,7 @@ jobs:
os: [macos-13]
java_version: [8, 11, 17]
test-target: [rust, java]
spark-version: ['3.4']
spark-version: ['3.4', '3.5']
scala-version: ['2.12', '2.13']
fail-fast: false
if: github.event_name == 'push'
Expand All @@ -161,7 +161,7 @@ jobs:
matrix:
java_version: [8, 11, 17]
test-target: [rust, java]
spark-version: ['3.4']
spark-version: ['3.4', '3.5']
scala-version: ['2.12', '2.13']
is_push_event:
- ${{ github.event_name == 'push' }}
Expand Down Expand Up @@ -247,7 +247,7 @@ jobs:
matrix:
java_version: [8, 17]
test-target: [java]
spark-version: ['3.3']
spark-version: ['3.3', '3.4']
scala-version: ['2.12', '2.13']
exclude:
- java_version: 8
Expand Down
1 change: 1 addition & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ under the License.
<sources>
<source>src/main/${shims.majorVerSrc}</source>
<source>src/main/${shims.minorVerSrc}</source>
<source>src/main/${shims.pre35Src}</source>
</sources>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,13 @@ import org.apache.spark.sql.execution.datasources.PartitionedFile

object ShimBatchReader {

// TODO: remove after dropping Spark 3.3 support and directly call PartitionedFile
def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile =
classOf[PartitionedFile].getDeclaredConstructors
.map(c =>
c.getParameterCount match {
case 5 =>
c.newInstance(
partitionValues,
file,
Long.box(-1), // -1 means we read the entire file
Long.box(-1),
Array.empty[String])
case 7 =>
c.newInstance(
partitionValues,
c.getParameterTypes()(1)
.getConstructor(classOf[String])
.newInstance(file)
.asInstanceOf[AnyRef],
Long.box(-1), // -1 means we read the entire file
Long.box(-1),
Array.empty[String],
Long.box(0),
Long.box(0))
})
.head
.asInstanceOf[PartitionedFile]
PartitionedFile(
partitionValues,
file,
-1, // -1 means we read the entire file
-1,
Array.empty[String],
0,
0)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.shims

import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

object ShimBatchReader {

  /**
   * Creates a `PartitionedFile` for the given partition values and file path that
   * covers the whole file.
   *
   * A start/length of -1/-1 signals that the entire file should be read; no
   * locality hints are supplied and modification time / file size are left at 0.
   */
  def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile = {
    // -1 means we read the entire file
    val wholeFile = -1L
    PartitionedFile(
      partitionValues,
      SparkPath.fromPathString(file),
      wholeFile,
      wholeFile,
      Array.empty[String],
      0L,
      0L)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.shims

import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

object ShimBatchReader {

  /**
   * Creates a `PartitionedFile` for the given partition values and file path that
   * covers the whole file.
   *
   * A start/length of -1/-1 signals that the entire file should be read; no
   * locality hints or constant metadata column values are supplied, and
   * modification time / file size are left at 0.
   */
  def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile = {
    // -1 means we read the entire file
    val wholeFile = -1L
    PartitionedFile(
      partitionValues,
      SparkPath.fromPathString(file),
      wholeFile,
      wholeFile,
      Array.empty[String],
      0L,
      0L,
      Map.empty)
  }
}
14 changes: 13 additions & 1 deletion docs/source/user-guide/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,19 @@ The following diagram illustrates the architecture of Comet:

## Current Status

The project is currently integrated into Apache Spark 3.3, and 3.4.
Comet currently supports the following versions of Apache Spark:

- 3.3.x
- 3.4.x

Experimental support is provided for the following versions of Apache Spark. It is intended for development and
testing use only, and should not be used in production yet.

- 3.5.x
- 4.0.0-preview1

Note that Comet may not fully work with proprietary forks of Apache Spark such as the Spark versions offered by
Cloud Service Providers.

## Feature Parity with Apache Spark

Expand Down
20 changes: 20 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@ under the License.
<argLine>-ea -Xmx4g -Xss4m ${extraJavaTestArgs}</argLine>
<additional.3_3.test.source>spark-3.3-plus</additional.3_3.test.source>
<additional.3_4.test.source>spark-3.4-plus</additional.3_4.test.source>
<additional.3_5.test.source>not-needed</additional.3_5.test.source>
<additional.pre35.test.source>spark-pre-3.5</additional.pre35.test.source>
<shims.majorVerSrc>spark-3.x</shims.majorVerSrc>
<shims.minorVerSrc>spark-3.4</shims.minorVerSrc>
<shims.pre35Src>spark-pre-3.5</shims.pre35Src>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -547,6 +550,21 @@ under the License.
</properties>
</profile>

<profile>
<!-- FIXME: this is WIP. Tests may fail -->
<id>spark-3.5</id>
<properties>
<scala.version>2.12.18</scala.version>
<spark.version>3.5.1</spark.version>
<spark.version.short>3.5</spark.version.short>
<parquet.version>1.13.1</parquet.version>
<shims.minorVerSrc>spark-3.5</shims.minorVerSrc>
<shims.pre35Src>not-needed</shims.pre35Src>
<additional.pre35.test.source>not-needed</additional.pre35.test.source>
<additional.3_5.test.source>spark-3.5</additional.3_5.test.source>
</properties>
</profile>

<profile>
<!-- FIXME: this is WIP. Tests may fail https://github.com/apache/datafusion-comet/issues/551 -->
<id>spark-4.0</id>
Expand All @@ -561,6 +579,8 @@ under the License.
<slf4j.version>2.0.13</slf4j.version>
<shims.majorVerSrc>spark-4.0</shims.majorVerSrc>
<shims.minorVerSrc>not-needed-yet</shims.minorVerSrc>
<shims.pre35Src>not-needed</shims.pre35Src>
<additional.pre35.test.source>not-needed</additional.pre35.test.source>
<!-- Use jdk17 by default -->
<java.version>17</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
Expand Down
3 changes: 3 additions & 0 deletions spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ under the License.
<sources>
<source>src/test/${additional.3_3.test.source}</source>
<source>src/test/${additional.3_4.test.source}</source>
<source>src/test/${additional.3_5.test.source}</source>
<source>src/test/${additional.pre35.test.source}</source>
<source>src/test/${shims.majorVerSrc}</source>
<source>src/test/${shims.minorVerSrc}</source>
</sources>
Expand All @@ -267,6 +269,7 @@ under the License.
<sources>
<source>src/main/${shims.majorVerSrc}</source>
<source>src/main/${shims.minorVerSrc}</source>
<source>src/main/${shims.pre35Src}</source>
</sources>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,10 @@ object CometSparkSessionExtensions extends Logging {
org.apache.spark.SPARK_VERSION >= "3.4"
}

def isSpark35Plus: Boolean = {
  // True when running on Spark 3.5 or later. Lexicographic string comparison is
  // correct while minor versions stay single-digit (a hypothetical "3.10" would
  // compare as less than "3.5") — acceptable for the currently supported range.
  org.apache.spark.SPARK_VERSION >= "3.5"
}

def isSpark40Plus: Boolean = {
  // True when running on Spark 4.0 or later; lexicographic comparison is safe
  // here because the major version digit dominates ("4.x" > "3.x").
  org.apache.spark.SPARK_VERSION >= "4.0"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.comet.shims

import org.apache.comet.expressions.CometEvalMode
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{DataType, TimestampNTZType}

/**
 * `CometExprShim` acts as a shim for parsing expressions from different Spark versions.
 */
trait CometExprShim {

  /**
   * Returns a tuple of expressions for the `unhex` function: the child expression
   * to decode and a boolean literal carrying its `failOnError` flag.
   */
  protected def unhexSerde(unhex: Unhex): (Expression, Expression) = {
    (unhex.child, Literal(unhex.failOnError))
  }

  /** Returns true if `dt` is Spark's `TimestampNTZType` (timestamp without time zone). */
  protected def isTimestampNTZType(dt: DataType): Boolean = dt match {
    case _: TimestampNTZType => true
    case _ => false
  }

  /** Converts the cast's Spark `EvalMode` into Comet's equivalent eval mode. */
  protected def evalMode(c: Cast): CometEvalMode.Value =
    CometEvalModeUtil.fromSparkEvalMode(c.evalMode)
}

object CometEvalModeUtil {

  /**
   * Maps Spark's `EvalMode` to the corresponding `CometEvalMode`.
   *
   * The match is intentionally written without a wildcard so that a new eval
   * mode added by a future Spark version surfaces as a MatchError instead of
   * being silently mis-mapped.
   */
  def fromSparkEvalMode(evalMode: EvalMode.Value): CometEvalMode.Value = evalMode match {
    case EvalMode.LEGACY => CometEvalMode.LEGACY
    case EvalMode.TRY => CometEvalMode.TRY
    case EvalMode.ANSI => CometEvalMode.ANSI
  }
}

36 changes: 36 additions & 0 deletions spark/src/main/spark-3.5/org/apache/comet/shims/ShimSQLConf.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.shims

import org.apache.spark.sql.internal.LegacyBehaviorPolicy
import org.apache.spark.sql.internal.SQLConf

trait ShimSQLConf {

  /**
   * Spark 3.4 renamed parquetFilterPushDownStringStartWith to
   * parquetFilterPushDownStringPredicate
   */
  protected def getPushDownStringPredicate(sqlConf: SQLConf): Boolean =
    sqlConf.parquetFilterPushDownStringPredicate

  // Shorthand aliases for the LegacyBehaviorPolicy values used by mixing classes.
  // NOTE(review): in this shim LegacyBehaviorPolicy is imported from
  // org.apache.spark.sql.internal (see import above), which appears to be its
  // location as of Spark 3.5 — confirm against other version shims.
  protected val LEGACY = LegacyBehaviorPolicy.LEGACY
  protected val CORRECTED = LegacyBehaviorPolicy.CORRECTED
}
Loading

0 comments on commit 28309a4

Please sign in to comment.