
Implement Spark-compatible cast between decimals with different precision and scale #375

Open · Tracked by #286
andygrove opened this issue May 3, 2024 · 9 comments
Labels: enhancement, good first issue, help wanted

Comments

@andygrove (Member)

What is the problem the feature request solves?

Comet is not consistent with Spark when casting between decimals. Here is a test to demonstrate this.

  test("cast between decimals with different precision and scale") {
    val rowData = Seq(
      Row(BigDecimal("12345.6789")),
      Row(BigDecimal("9876.5432")),
      Row(BigDecimal("123.4567"))
    )
    val df = spark.createDataFrame(
      spark.sparkContext.parallelize(rowData),
      StructType(Seq(StructField("a", DataTypes.createDecimalType(10,4))))
    )
    castTest(df, DataTypes.createDecimalType(6,2))
  }

Spark Result

+----------+-------+
|         a|      a|
+----------+-------+
|  123.4567| 123.46|
| 9876.5432|9876.54|
|12345.6789|   null|
+----------+-------+

Comet Result

java.lang.ArithmeticException: Cannot convert 12345.68 (bytes: [B@4f834a43, integer: 1234568) to decimal with precision: 6 and scale: 2
	at org.apache.comet.vector.CometVector.getDecimal(CometVector.java:86)
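
For context, Spark rounds half-up to the target scale and produces null when the rounded value no longer fits in the target precision. A minimal sketch of that check using Spark's own Decimal type (assuming its changePrecision API, which rounds and returns false on overflow):

    import org.apache.spark.sql.types.Decimal

    // Decimal.changePrecision rounds half-up to the target scale, mutates
    // in place, and returns false when the rounded value exceeds the target
    // precision; Spark's non-ANSI cast emits null exactly when it is false.
    Decimal(BigDecimal("9876.5432")).changePrecision(6, 2)  // true:  9876.54 fits
    Decimal(BigDecimal("12345.6789")).changePrecision(6, 2) // false: 12345.68 needs precision 7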

Describe the potential solution

No response

Additional context

No response

@viirya (Member) commented May 3, 2024

This looks simple to fix. We currently throw an exception if we cannot convert the bytes to decimal, but it looks like Spark returns null instead.
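
A minimal sketch of that null-on-overflow behavior (hypothetical helper, not the actual CometVector.getDecimal code, which is Java):

    import java.math.{BigDecimal => JBigDecimal, RoundingMode}

    // Round to the target scale first, then check whether the rounded value
    // still fits within the target precision; return None (null) instead of
    // throwing, matching Spark's non-ANSI cast.
    def castDecimal(v: JBigDecimal, precision: Int, scale: Int): Option[JBigDecimal] = {
      val rounded = v.setScale(scale, RoundingMode.HALF_UP)
      if (rounded.precision() > precision) None else Some(rounded)
    }

    castDecimal(new JBigDecimal("12345.6789"), 6, 2) // None -> null, as Spark returns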

@caicancai (Member)

@andygrove Do you mind if I try it? In my spare time I have been fixing Calcite's compatibility issues with Spark SQL type conversion.

@viirya (Member) commented May 6, 2024

@caicancai Thanks. Please go ahead and create a PR for any open ticket that no one else has claimed.

@caicancai (Member)

I am working on it.

@himadripal (Contributor) commented Nov 14, 2024

I was taking a look at this one. I added this test to CometCastSuite:

test("cast between decimals with different precision and scale") {
    val df = generateDecimalsPrecision38Scale18()
    val df1 = df.withColumn("b", col("a").cast(DataTypes.createDecimalType(10, 2)))
    df1.show(false)
    castTest(generateDecimalsPrecision38Scale18(), DataTypes.createDecimalType(10, 2))
  }

It gives me a result like this:

+----------------------------------------+----------+
|a                                       |b         |
+----------------------------------------+----------+
|-99999999999999999999.999999999999000000|null      |
|-9223372036854775808.234567000000000000 |null      |
|-9223372036854775807.123123000000000000 |null      |
|-2147483648.123123123000000000          |null      |
|-2147483647.123123123000000000          |null      |
|-123456.789000000000000000              |-123456.79|
|0E-18                                   |0.00      |
|123456.789000000000000000               |123456.79 |
|2147483647.123123123000000000           |null      |
|2147483648.123123123000000000           |null      |
|9223372036854775807.123123000000000000  |null      |
|9223372036854775808.234567000000000000  |null      |
|99999999999999999999.999999999999000000 |null      |
|null                                    |null      |
+----------------------------------------+----------+

But castTest fails with the following assertion error:

Expected only Comet native operators, but found Sort.
plan: Sort [a#30 ASC NULLS FIRST], true, 0
+- Project [a#30, cast(a#30 as decimal(10,2)) AS a#32]
   +- CometCoalesce Coalesce 1, [a#30], 1
      +- CometScan parquet [a#30] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/jx/23vwhfzn2ts493_2twyz1dpc0000gn/T/spark-5f..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(38,18)>

Also, this code inside the test produces the following plan:

spark.sql(s"select a, cast(a as decimal(10,2)) from t2 order by a").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- CometSort [a#4, a#9], [a#4 ASC NULLS FIRST]
   +- CometColumnarExchange rangepartitioning(a#4 ASC NULLS FIRST, 10), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=13]
      +- LocalTableScan [a#4, a#9]

Update: Figured out that removing .coalesce(1) makes the plan use CometSort, but the assertion now fails on the Project instead.

val data = roundtripParquet(input, dir) // .coalesce(1) removed

@viirya @andygrove can you provide guidance here on how to proceed?

@andygrove (Member, Author)

Thanks for looking into this @himadripal. The issue is that the projection is falling back to Spark because org.apache.comet.expressions.CometCast#isSupported is either returning Unsupported or Incompatible for a cast between decimal types, so the first step is to update this method to say that this cast is now supported.
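
A hedged sketch of that first step (Comet's actual isSupported takes more parameters, and the SupportLevel types live in org.apache.comet.expressions; the stand-ins below only illustrate the dispatch change):

    import org.apache.spark.sql.types.{DataType, DecimalType}

    // Hypothetical stand-ins for Comet's SupportLevel hierarchy.
    sealed trait SupportLevel
    case object Compatible extends SupportLevel
    case object Unsupported extends SupportLevel

    // Report decimal-to-decimal casts as supported so the Project containing
    // the cast no longer falls back to Spark.
    def isSupported(fromType: DataType, toType: DataType): SupportLevel =
      (fromType, toType) match {
        case (_: DecimalType, _: DecimalType) => Compatible
        case _                                => Unsupported // existing dispatch goes here
      }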

@andygrove (Member, Author)

You may want to set spark.comet.explainFallback.enabled=true as well so that you can see the reason why queries are falling back to Spark.
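
For example (assuming a live SparkSession named spark):

    // Log an explanation whenever part of a query falls back to Spark
    // instead of running natively in Comet.
    spark.conf.set("spark.comet.explainFallback.enabled", "true")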

@himadripal (Contributor)

Thank you @andygrove for the guidance and the tip. I'll explore spark.comet.explainFallback.enabled=true as well.

@himadripal (Contributor)

This Arrow PR will fix this issue completely. Waiting for an Arrow release, and then a DataFusion release after that.
