Skip to content

Commit

Permalink
Add option to enable metadata comparison
Browse files Browse the repository at this point in the history
  • Loading branch information
zeotuan committed Oct 12, 2024
1 parent af6299a commit 1791243
Show file tree
Hide file tree
Showing 5 changed files with 291 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ trait DataFrameComparer extends DatasetComparer {
ignoreColumnNames: Boolean = false,
orderedComparison: Boolean = true,
ignoreColumnOrder: Boolean = false,
ignoreMetadata: Boolean = true,
truncate: Int = 500
): Unit = {
assertSmallDatasetEquality(
Expand All @@ -22,6 +23,7 @@ trait DataFrameComparer extends DatasetComparer {
ignoreColumnNames,
orderedComparison,
ignoreColumnOrder,
ignoreMetadata,
truncate
)
}
Expand All @@ -35,15 +37,17 @@ trait DataFrameComparer extends DatasetComparer {
ignoreNullable: Boolean = false,
ignoreColumnNames: Boolean = false,
orderedComparison: Boolean = true,
ignoreColumnOrder: Boolean = false
ignoreColumnOrder: Boolean = false,
ignoreMetadata: Boolean = true
): Unit = {
assertLargeDatasetEquality(
actualDF,
expectedDF,
ignoreNullable = ignoreNullable,
ignoreColumnNames = ignoreColumnNames,
orderedComparison = orderedComparison,
ignoreColumnOrder = ignoreColumnOrder
ignoreColumnOrder = ignoreColumnOrder,
ignoreMetadata = ignoreMetadata
)
}

Expand All @@ -57,7 +61,8 @@ trait DataFrameComparer extends DatasetComparer {
ignoreNullable: Boolean = false,
ignoreColumnNames: Boolean = false,
orderedComparison: Boolean = true,
ignoreColumnOrder: Boolean = false
ignoreColumnOrder: Boolean = false,
ignoreMetadata: Boolean = true
): Unit = {
assertSmallDatasetEquality[Row](
actualDF,
Expand All @@ -66,6 +71,7 @@ trait DataFrameComparer extends DatasetComparer {
ignoreColumnNames,
orderedComparison,
ignoreColumnOrder,
ignoreMetadata,
equals = RowComparer.areRowsEqual(_, _, precision)
)
}
Expand All @@ -80,7 +86,8 @@ trait DataFrameComparer extends DatasetComparer {
ignoreNullable: Boolean = false,
ignoreColumnNames: Boolean = false,
orderedComparison: Boolean = true,
ignoreColumnOrder: Boolean = false
ignoreColumnOrder: Boolean = false,
ignoreMetadata: Boolean = true
): Unit = {
assertLargeDatasetEquality[Row](
actualDF,
Expand All @@ -89,7 +96,8 @@ trait DataFrameComparer extends DatasetComparer {
ignoreNullable,
ignoreColumnNames,
orderedComparison,
ignoreColumnOrder
ignoreColumnOrder,
ignoreMetadata
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ Expected DataFrame Row Count: '$expectedCount'
ignoreColumnNames: Boolean = false,
orderedComparison: Boolean = true,
ignoreColumnOrder: Boolean = false,
ignoreMetadata: Boolean = true,
truncate: Int = 500,
equals: (T, T) => Boolean = (o1: T, o2: T) => o1.equals(o2)
): Unit = {
SchemaComparer.assertSchemaEqual(actualDS, expectedDS, ignoreNullable, ignoreColumnNames, ignoreColumnOrder)
SchemaComparer.assertSchemaEqual(actualDS, expectedDS, ignoreNullable, ignoreColumnNames, ignoreColumnOrder, ignoreMetadata)
val actual = if (ignoreColumnOrder) orderColumns(actualDS, expectedDS) else actualDS
assertSmallDatasetContentEquality(actual, expectedDS, orderedComparison, truncate, equals)
}
Expand Down Expand Up @@ -98,10 +99,11 @@ Expected DataFrame Row Count: '$expectedCount'
ignoreNullable: Boolean = false,
ignoreColumnNames: Boolean = false,
orderedComparison: Boolean = true,
ignoreColumnOrder: Boolean = false
ignoreColumnOrder: Boolean = false,
ignoreMetadata: Boolean = true
): Unit = {
// first check if the schemas are equal
SchemaComparer.assertSchemaEqual(actualDS, expectedDS, ignoreNullable, ignoreColumnNames, ignoreColumnOrder)
SchemaComparer.assertSchemaEqual(actualDS, expectedDS, ignoreNullable, ignoreColumnNames, ignoreColumnOrder, ignoreMetadata)
val actual = if (ignoreColumnOrder) orderColumns(actualDS, expectedDS) else actualDS
assertLargeDatasetContentEquality(actual, expectedDS, equals, orderedComparison)
}
Expand Down Expand Up @@ -157,7 +159,8 @@ Expected DataFrame Row Count: '$expectedCount'
ignoreNullable: Boolean = false,
ignoreColumnNames: Boolean = false,
orderedComparison: Boolean = true,
ignoreColumnOrder: Boolean = false
ignoreColumnOrder: Boolean = false,
ignoreMetadata: Boolean = true
): Unit = {
val e = (r1: Row, r2: Row) => {
r1.equals(r2) || RowComparer.areRowsEqual(r1, r2, precision)
Expand All @@ -169,7 +172,8 @@ Expected DataFrame Row Count: '$expectedCount'
ignoreNullable,
ignoreColumnNames,
orderedComparison,
ignoreColumnOrder
ignoreColumnOrder,
ignoreMetadata
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ object SchemaComparer {
expectedDS: Dataset[T],
ignoreNullable: Boolean = false,
ignoreColumnNames: Boolean = false,
ignoreColumnOrder: Boolean = true
ignoreColumnOrder: Boolean = true,
ignoreMetadata: Boolean = true
): Unit = {
require((ignoreColumnNames, ignoreColumnOrder) != (true, true), "Cannot set both ignoreColumnNames and ignoreColumnOrder to true.")
if (!SchemaComparer.equals(actualDS.schema, expectedDS.schema, ignoreNullable, ignoreColumnNames, ignoreColumnOrder)) {
if (!SchemaComparer.equals(actualDS.schema, expectedDS.schema, ignoreNullable, ignoreColumnNames, ignoreColumnOrder, ignoreMetadata)) {
throw DatasetSchemaMismatch(
betterSchemaMismatchMessage(actualDS, expectedDS)
)
Expand All @@ -46,7 +47,8 @@ object SchemaComparer {
s2: StructType,
ignoreNullable: Boolean = false,
ignoreColumnNames: Boolean = false,
ignoreColumnOrder: Boolean = true
ignoreColumnOrder: Boolean = true,
ignoreMetadata: Boolean = true
): Boolean = {
if (s1.length != s2.length) {
false
Expand All @@ -55,24 +57,33 @@ object SchemaComparer {
false
} else {
val zipStruct = if (ignoreColumnOrder) s1.sortBy(_.name) zip s2.sortBy(_.name) else s1 zip s2
zipStruct.forall { t =>
(t._1.nullable == t._2.nullable || ignoreNullable) &&
(t._1.name == t._2.name || ignoreColumnNames) &&
equals(t._1.dataType, t._2.dataType, ignoreNullable, ignoreColumnNames, ignoreColumnOrder)
// Every paired field must agree on each attribute that the caller has not asked to ignore.
zipStruct.forall { case (f1, f2) =>
  // Compare f1 against f2: comparing a field's nullability with itself is always true
  // and would silently disable the nullable check.
  (ignoreNullable || f1.nullable == f2.nullable) &&
  (ignoreColumnNames || f1.name == f2.name) &&
  (ignoreMetadata || f1.metadata == f2.metadata) &&
  // Recurse into the field's data type so nested types are checked with the same flags.
  equals(f1.dataType, f2.dataType, ignoreNullable, ignoreColumnNames, ignoreColumnOrder, ignoreMetadata)
}
}
}
}

def equals(dt1: DataType, dt2: DataType, ignoreNullable: Boolean, ignoreColumnNames: Boolean, ignoreColumnOrder: Boolean): Boolean = {
(ignoreNullable, dt1, dt2) match {
case (ignoreNullable, st1: StructType, st2: StructType) if ignoreNullable || ignoreColumnOrder =>
def equals(
dt1: DataType,
dt2: DataType,
ignoreNullable: Boolean,
ignoreColumnNames: Boolean,
ignoreColumnOrder: Boolean,
ignoreMetadata: Boolean
): Boolean = {
(dt1, dt2) match {
case (st1: StructType, st2: StructType) =>
  // Forward ignoreMetadata explicitly: the StructType overload defaults it to true, so omitting it
  // would skip metadata comparison for nested structs even when the caller passed false.
  equals(st1, st2, ignoreNullable, ignoreColumnNames, ignoreColumnOrder, ignoreMetadata)
case (true, ArrayType(vdt1, _), ArrayType(vdt2, _)) =>
equals(vdt1, vdt2, ignoreNullable, ignoreColumnNames, ignoreColumnOrder)
case (true, MapType(kdt1, vdt1, _), MapType(kdt2, vdt2, _)) =>
equals(kdt1, kdt2, ignoreNullable, ignoreColumnNames, ignoreColumnOrder) &&
equals(vdt1, vdt2, ignoreNullable, ignoreColumnNames, ignoreColumnOrder)
case (ArrayType(vdt1, containsNull1), ArrayType(vdt2, containsNull2)) =>
  // Element nullability is only disregarded when the caller asked to ignore nullability.
  (ignoreNullable || containsNull1 == containsNull2) &&
  equals(vdt1, vdt2, ignoreNullable, ignoreColumnNames, ignoreColumnOrder, ignoreMetadata)
case (MapType(kdt1, vdt1, valueContainsNull1), MapType(kdt2, vdt2, valueContainsNull2)) =>
  // Same rule for map values; key and value types are both compared recursively.
  (ignoreNullable || valueContainsNull1 == valueContainsNull2) &&
  equals(kdt1, kdt2, ignoreNullable, ignoreColumnNames, ignoreColumnOrder, ignoreMetadata) &&
  equals(vdt1, vdt2, ignoreNullable, ignoreColumnNames, ignoreColumnOrder, ignoreMetadata)
case _ => dt1 == dt2
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.github.mrpowers.spark.fast.tests

import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType}
import org.apache.spark.sql.types.{DoubleType, IntegerType, MetadataBuilder, StringType}
import SparkSessionExt._
import com.github.mrpowers.spark.fast.tests.SchemaComparer.DatasetSchemaMismatch
import com.github.mrpowers.spark.fast.tests.StringExt.StringOps
Expand Down Expand Up @@ -310,6 +310,56 @@ class DataFrameComparerTest extends AnyFreeSpec with DataFrameComparer with Spar
)
assertLargeDataFrameEquality(sourceDF, expectedDF, ignoreColumnOrder = true)
}

"can performed Dataset comparisons and ignore metadata" in {
  // Builds a single nullable integer column "number" holding 1 and 5, tagged with the given description.
  def numberDF(description: String) =
    spark
      .createDF(List(1, 5), List(("number", IntegerType, true)))
      .withMetadata("number", new MetadataBuilder().putString("description", description).build())

  val sourceDF   = numberDF("small int")
  val expectedDF = numberDF("small number")

  // ignoreMetadata is not passed here, so the differing descriptions must not fail the assertion.
  assertLargeDataFrameEquality(sourceDF, expectedDF)
}

"can performed Dataset comparisons and compare metadata" in {
  // Builds a single nullable integer column "number" holding 1 and 5, tagged with the given description.
  def numberDF(description: String) =
    spark
      .createDF(List(1, 5), List(("number", IntegerType, true)))
      .withMetadata("number", new MetadataBuilder().putString("description", description).build())

  val sourceDF   = numberDF("small int")
  val expectedDF = numberDF("small number")

  // With ignoreMetadata = false the differing descriptions are expected to raise a schema mismatch.
  intercept[DatasetSchemaMismatch] {
    assertLargeDataFrameEquality(sourceDF, expectedDF, ignoreMetadata = false)
  }
}
}

"assertApproximateDataFrameEquality" - {
Expand Down Expand Up @@ -457,6 +507,56 @@ class DataFrameComparerTest extends AnyFreeSpec with DataFrameComparer with Spar

assertApproximateDataFrameEquality(ds1, ds2, precision = 0.0000001, orderedComparison = false)
}

"can performed Dataset comparisons and ignore metadata" in {
  // Same rows and column definition for both frames; only the "description" metadata text varies.
  def numberDF(description: String) =
    spark
      .createDF(List(1, 5), List(("number", IntegerType, true)))
      .withMetadata("number", new MetadataBuilder().putString("description", description).build())

  val sourceDF   = numberDF("small int")
  val expectedDF = numberDF("small number")

  // ignoreMetadata is not passed here, so the differing descriptions must not fail the assertion.
  assertApproximateDataFrameEquality(sourceDF, expectedDF, precision = 0.0000001)
}

"can performed Dataset comparisons and compare metadata" in {
  // Same rows and column definition for both frames; only the "description" metadata text varies.
  def numberDF(description: String) =
    spark
      .createDF(List(1, 5), List(("number", IntegerType, true)))
      .withMetadata("number", new MetadataBuilder().putString("description", description).build())

  val sourceDF   = numberDF("small int")
  val expectedDF = numberDF("small number")

  // With ignoreMetadata = false the differing descriptions are expected to raise a schema mismatch.
  intercept[DatasetSchemaMismatch] {
    assertApproximateDataFrameEquality(sourceDF, expectedDF, precision = 0.0000001, ignoreMetadata = false)
  }
}
}

"assertApproximateSmallDataFrameEquality" - {
Expand Down Expand Up @@ -604,5 +704,55 @@ class DataFrameComparerTest extends AnyFreeSpec with DataFrameComparer with Spar

assertApproximateSmallDataFrameEquality(ds1, ds2, precision = 0.0000001, orderedComparison = false)
}

"can performed Dataset comparisons and ignore metadata" in {
  // Factory for the fixture: one nullable integer column "number" with rows 1 and 5 plus a description tag.
  def numberDF(description: String) =
    spark
      .createDF(List(1, 5), List(("number", IntegerType, true)))
      .withMetadata("number", new MetadataBuilder().putString("description", description).build())

  val sourceDF   = numberDF("small int")
  val expectedDF = numberDF("small number")

  // ignoreMetadata is not passed here, so the differing descriptions must not fail the assertion.
  assertApproximateSmallDataFrameEquality(sourceDF, expectedDF, precision = 0.0000001)
}

"can performed Dataset comparisons and compare metadata" in {
  // Factory for the fixture: one nullable integer column "number" with rows 1 and 5 plus a description tag.
  def numberDF(description: String) =
    spark
      .createDF(List(1, 5), List(("number", IntegerType, true)))
      .withMetadata("number", new MetadataBuilder().putString("description", description).build())

  val sourceDF   = numberDF("small int")
  val expectedDF = numberDF("small number")

  // With ignoreMetadata = false the differing descriptions are expected to raise a schema mismatch.
  intercept[DatasetSchemaMismatch] {
    assertApproximateSmallDataFrameEquality(sourceDF, expectedDF, precision = 0.0000001, ignoreMetadata = false)
  }
}
}
}
Loading

0 comments on commit 1791243

Please sign in to comment.