diff --git a/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetHistoryPolicyStatementTest.java b/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetHistoryPolicyStatementTest.java
new file mode 100644
index 00000000..9b8d7526
--- /dev/null
+++ b/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetHistoryPolicyStatementTest.java
@@ -0,0 +1,112 @@
+package com.linkedin.openhouse.spark.statementtest;
+
+import com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseParseException;
+import java.nio.file.Files;
+import java.util.Optional;
+import lombok.SneakyThrows;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.ExplainMode;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class SetHistoryPolicyStatementTest {
+  private static SparkSession spark = null;
+
+  @SneakyThrows
+  @BeforeAll
+  public void setupSpark() {
+    Path unittest = new Path(Files.createTempDirectory("unittest_settablepolicy").toString());
+    spark =
+        SparkSession.builder()
+            .master("local[2]")
+            .config(
+                "spark.sql.extensions",
+                ("org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
+                    + "com.linkedin.openhouse.spark.extensions.OpenhouseSparkSessionExtensions"))
+            .config("spark.sql.catalog.openhouse", "org.apache.iceberg.spark.SparkCatalog")
+            .config("spark.sql.catalog.openhouse.type", "hadoop")
+            .config("spark.sql.catalog.openhouse.warehouse", unittest.toString())
+            .getOrCreate();
+  }
+
+  @Test
+  public void testSetHistoryPolicyGood() {
+    // Validate setting only the time-based setting
+    Dataset<Row> ds = spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=24H)");
+    assert isPlanValid(ds, "db.table", Optional.of("24"), Optional.of("HOUR"), Optional.empty());
+
+    ds = spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY VERSIONS=10)");
+    assert isPlanValid(ds, "db.table", Optional.empty(), Optional.empty(), Optional.of("10"));
+
+    // Validate both time and count settings together
+    ds = spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=2D VERSIONS=20)");
+    assert isPlanValid(ds, "db.table", Optional.of("2"), Optional.of("DAY"), Optional.of("20"));
+  }
+
+  @Test
+  public void testSetHistoryPolicyIncorrectSyntax() {
+    // No time granularity
+    Assertions.assertThrows(
+        OpenhouseParseException.class,
+        () -> spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=24)").show());
+
+    // Count before time
+    Assertions.assertThrows(
+        OpenhouseParseException.class,
+        () ->
+            spark
+                .sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY VERSIONS=10 MAX_AGE=24H)")
+                .show());
+
+    // Neither time nor count
+    Assertions.assertThrows(
+        OpenhouseParseException.class,
+        () -> spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY )").show());
+  }
+
+  @BeforeEach
+  public void setup() {
+    spark.sql("CREATE TABLE openhouse.db.table (id bigint, data string) USING iceberg").show();
+    spark.sql("CREATE TABLE openhouse.0_.0_ (id bigint, data string) USING iceberg").show();
+    spark
+        .sql("ALTER TABLE openhouse.db.table SET TBLPROPERTIES ('openhouse.tableId' = 'tableid')")
+        .show();
+    spark
+        .sql("ALTER TABLE openhouse.0_.0_ SET TBLPROPERTIES ('openhouse.tableId' = 'tableid')")
+        .show();
+  }
+
+  @AfterEach
+  public void tearDown() {
+    spark.sql("DROP TABLE openhouse.db.table").show();
+    spark.sql("DROP TABLE openhouse.0_.0_").show();
+  }
+
+  @AfterAll
+  public void tearDownSpark() {
+    spark.close();
+  }
+
+  @SneakyThrows
+  private boolean isPlanValid(
+      Dataset<Row> dataframe,
+      String dbTable,
+      Optional<String> maxAge,
+      Optional<String> granularity,
+      Optional<String> versions) {
+    String queryStr = dataframe.queryExecution().explainString(ExplainMode.fromString("simple"));
+    return queryStr.contains(dbTable)
+        && (!maxAge.isPresent() || queryStr.contains(maxAge.get()))
+        && (!granularity.isPresent() || queryStr.contains(granularity.get()))
+        && (!versions.isPresent() || queryStr.contains(versions.get()));
+  }
+}
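Note: isPlanValid above asserts on the simple explain output of the executed command, not on resulting table state; the substrings it looks for are produced by the simpleString overrides added later in this patch. The same output can be inspected interactively; a minimal Scala sketch mirroring the Java call (table name illustrative):

    import org.apache.spark.sql.execution.ExplainMode

    val ds = spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=24H)")
    // Prints a plan string containing the table identifier and the parsed
    // MAX_AGE/VERSIONS values, e.g. "SetHistoryPolicy: ... MAX_AGE=24HOUR".
    println(ds.queryExecution.explainString(ExplainMode.fromString("simple")))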
diff --git a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4
index 435a77e6..27cb980f 100644
--- a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4
+++ b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4
@@ -26,10 +26,11 @@ statement
     : ALTER TABLE multipartIdentifier SET POLICY '(' retentionPolicy (columnRetentionPolicy)? ')'  #setRetentionPolicy
     | ALTER TABLE multipartIdentifier SET POLICY '(' replicationPolicy ')'                         #setReplicationPolicy
     | ALTER TABLE multipartIdentifier SET POLICY '(' sharingPolicy ')'                             #setSharingPolicy
+    | ALTER TABLE multipartIdentifier SET POLICY '(' historyPolicy ')'                             #setHistoryPolicy
     | ALTER TABLE multipartIdentifier MODIFY columnNameClause SET columnPolicy                     #setColumnPolicyTag
     | GRANT privilege ON grantableResource TO principal                                            #grantStatement
     | REVOKE privilege ON grantableResource FROM principal                                         #revokeStatement
-    | SHOW GRANTS ON grantableResource                                                             #showGrantsStatement
+    | SHOW GRANTS ON grantableResource                                                             #showGrantsStatement
     ;
 
 multipartIdentifier
@@ -65,7 +66,7 @@ quotedIdentifier
     ;
 
 nonReserved
-    : ALTER | TABLE | SET | POLICY | RETENTION | SHARING | REPLICATION
+    : ALTER | TABLE | SET | POLICY | RETENTION | SHARING | REPLICATION | HISTORY
     | GRANT | REVOKE | ON | TO | SHOW | GRANTS | PATTERN | WHERE | COLUMN
     ;
 
@@ -76,6 +77,7 @@ sharingPolicy
 BOOLEAN
     : 'TRUE' | 'FALSE'
     ;
+
 retentionPolicy
     : RETENTION '=' duration
     ;
@@ -153,12 +155,25 @@ policyTag
     : PII | HC
     ;
 
+historyPolicy
+    : HISTORY maxAge? versions?
+    ;
+
+maxAge
+    : MAX_AGE'='duration
+    ;
+
+versions
+    : VERSIONS'='POSITIVE_INTEGER
+    ;
+
 ALTER: 'ALTER';
 TABLE: 'TABLE';
 SET: 'SET';
 POLICY: 'POLICY';
 RETENTION: 'RETENTION';
 REPLICATION: 'REPLICATION';
+HISTORY: 'HISTORY';
 SHARING: 'SHARING';
 GRANT: 'GRANT';
 REVOKE: 'REVOKE';
@@ -182,6 +197,8 @@ HC: 'HC';
 MODIFY: 'MODIFY';
 TAG: 'TAG';
 NONE: 'NONE';
+VERSIONS: 'VERSIONS';
+MAX_AGE: 'MAX_AGE';
 
 POSITIVE_INTEGER
     : DIGIT+
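Note: the historyPolicy rule makes both clauses optional but fixes their order (MAX_AGE before VERSIONS), and a bare HISTORY still parses; the at-least-one requirement is enforced by the AST builder below rather than by the grammar. Mirroring the tests above, the parser accepts and rejects:

    ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=24H)             -- accepted
    ALTER TABLE openhouse.db.table SET POLICY (HISTORY VERSIONS=10)             -- accepted
    ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=2D VERSIONS=20)  -- accepted
    ALTER TABLE openhouse.db.table SET POLICY (HISTORY VERSIONS=10 MAX_AGE=24H) -- rejected: clauses out of order
    ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=24)              -- rejected: duration requires a granularity suffix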
diff --git a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala
index 0619f834..8120d721 100644
--- a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala
+++ b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala
@@ -2,7 +2,7 @@ package com.linkedin.openhouse.spark.sql.catalyst.parser.extensions
 
 import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes
 import com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseSqlExtensionsParser._
-import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement}
+import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetHistoryPolicy, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement}
 import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes.GrantableResourceType
 import com.linkedin.openhouse.gen.tables.client.model.TimePartitionSpec
 import org.antlr.v4.runtime.tree.ParseTree
@@ -22,7 +22,7 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
     val (granularity, count) = typedVisit[(String, Int)](ctx.retentionPolicy())
     val (colName, colPattern) =
       if (ctx.columnRetentionPolicy() != null)
-      typedVisit[(String, String)](ctx.columnRetentionPolicy())
+        typedVisit[(String, String)](ctx.columnRetentionPolicy())
       else (null, null)
     SetRetentionPolicy(tableName, granularity, count, Option(colName), Option(colPattern))
   }
@@ -128,8 +128,8 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
     }
   }
 
-  override def visitColumnRetentionPolicyPatternClause(ctx: ColumnRetentionPolicyPatternClauseContext): (String) = {
-    (ctx.retentionColumnPatternClause().STRING().getText)
+  override def visitColumnRetentionPolicyPatternClause(ctx: ColumnRetentionPolicyPatternClauseContext): String = {
+    ctx.retentionColumnPatternClause().STRING().getText
   }
 
   override def visitSharingPolicy(ctx: SharingPolicyContext): String = {
@@ -149,7 +149,7 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
   }
 
   override def visitDuration(ctx: DurationContext): (String, Int) = {
-    val granularity = if (ctx.RETENTION_DAY != null) {
+    val granularity: String = if (ctx.RETENTION_DAY != null) {
       TimePartitionSpec.GranularityEnum.DAY.getValue()
     } else if (ctx.RETENTION_YEAR() != null) {
       TimePartitionSpec.GranularityEnum.YEAR.getValue()
@@ -158,13 +158,36 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
     } else {
       TimePartitionSpec.GranularityEnum.HOUR.getValue()
     }
-
     val count = ctx.getText.substring(0, ctx.getText.length - 1).toInt
     (granularity, count)
   }
 
+  override def visitSetHistoryPolicy(ctx: SetHistoryPolicyContext): SetHistoryPolicy = {
+    val tableName = typedVisit[Seq[String]](ctx.multipartIdentifier)
+    val (granularity, maxAge, versions) = typedVisit[(Option[String], Int, Int)](ctx.historyPolicy())
+    SetHistoryPolicy(tableName, granularity, maxAge, versions)
+  }
+
+  override def visitHistoryPolicy(ctx: HistoryPolicyContext): (Option[String], Int, Int) = {
+    val maxAgePolicy = if (ctx.maxAge() != null)
+      typedVisit[(String, Int)](ctx.maxAge().duration())
+    else (null, -1)
+    val versionPolicy = if (ctx.versions() != null)
+      typedVisit[Int](ctx.versions())
+    else -1
+    if (maxAgePolicy._2 == -1 && versionPolicy == -1) {
+      throw new OpenhouseParseException("At least one of MAX_AGE or VERSIONS must be specified in HISTORY policy, e.g. " +
+        "ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=2D) or ALTER TABLE openhouse.db.table SET POLICY (HISTORY VERSIONS=3)",
+        ctx.start.getLine, ctx.start.getCharPositionInLine)
+    }
+    (Option(maxAgePolicy._1), maxAgePolicy._2, versionPolicy)
+  }
+
+  override def visitVersions(ctx: VersionsContext): Integer = {
+    ctx.POSITIVE_INTEGER().getText.toInt
+  }
+
   private def toBuffer[T](list: java.util.List[T]) = list.asScala
-  private def toSeq[T](list: java.util.List[T]): Seq[T] = toBuffer(list).toSeq
+  private def toSeq[T](list: java.util.List[T]) = toBuffer(list).toSeq
 
   private def typedVisit[T](ctx: ParseTree): T = {
     ctx.accept(this).asInstanceOf[T]
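Note: the builder encodes "not set" as -1 for both maxAge and versions, and granularity is only populated when MAX_AGE is given; the physical plan downstream tests against these sentinels. The (granularity, maxAge, versions) tuple visitHistoryPolicy yields for each accepted form:

    HISTORY MAX_AGE=24H             ->  (Some("HOUR"), 24, -1)
    HISTORY VERSIONS=10             ->  (None, -1, 10)
    HISTORY MAX_AGE=2D VERSIONS=20  ->  (Some("DAY"), 2, 20)
    HISTORY                         ->  throws OpenhouseParseException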
diff --git a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetHistoryPolicy.scala b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetHistoryPolicy.scala
new file mode 100644
index 00000000..99b42c9b
--- /dev/null
+++ b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetHistoryPolicy.scala
@@ -0,0 +1,9 @@
+package com.linkedin.openhouse.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.plans.logical.Command
+
+case class SetHistoryPolicy(tableName: Seq[String], granularity: Option[String], maxAge: Int, versions: Int) extends Command {
+  override def simpleString(maxFields: Int): String = {
+    s"SetHistoryPolicy: ${tableName} ${if (maxAge > 0) "MAX_AGE=" + maxAge else ""}${granularity.getOrElse("")} ${if (versions > 0) "VERSIONS=" + versions else ""}"
+  }
+}
diff --git a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala
index 8545a2bc..e93f8a5b 100644
--- a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala
+++ b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala
@@ -1,6 +1,6 @@
 package com.linkedin.openhouse.spark.sql.execution.datasources.v2
 
-import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement}
+import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, SetHistoryPolicy, ShowGrantsStatement}
 import org.apache.iceberg.spark.{Spark3Util, SparkCatalog, SparkSessionCatalog}
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
@@ -17,6 +17,8 @@ case class OpenhouseDataSourceV2Strategy(spark: SparkSession) extends Strategy w
       SetRetentionPolicyExec(catalog, ident, granularity, count, colName, colPattern) :: Nil
     case SetReplicationPolicy(CatalogAndIdentifierExtractor(catalog, ident), replicationPolicies) =>
       SetReplicationPolicyExec(catalog, ident, replicationPolicies) :: Nil
+    case SetHistoryPolicy(CatalogAndIdentifierExtractor(catalog, ident), granularity, maxAge, versions) =>
+      SetHistoryPolicyExec(catalog, ident, granularity, maxAge, versions) :: Nil
     case SetSharingPolicy(CatalogAndIdentifierExtractor(catalog, ident), sharing) =>
      SetSharingPolicyExec(catalog, ident, sharing) :: Nil
     case SetColumnPolicyTag(CatalogAndIdentifierExtractor(catalog, ident), policyTag, cols) =>
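Note: the strategy maps the resolved logical SetHistoryPolicy onto the physical SetHistoryPolicyExec. CatalogAndIdentifierExtractor is defined elsewhere in this file and is not part of this diff; a hypothetical sketch of the usual pattern, assuming it delegates to Iceberg's Spark3Util (name and structure are illustrative, not the actual implementation):

    import scala.collection.JavaConverters._
    import org.apache.iceberg.spark.Spark3Util
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}

    object CatalogAndIdentifierSketch {
      // Resolves a multipart name such as Seq("openhouse", "db", "table") into
      // (catalog, identifier) against the active session's catalog manager.
      def unapply(nameParts: Seq[String]): Option[(TableCatalog, Identifier)] = {
        val resolved = Spark3Util.catalogAndIdentifier(SparkSession.active, nameParts.asJava)
        resolved.catalog() match {
          case tableCatalog: TableCatalog => Some((tableCatalog, resolved.identifier()))
          case _ => None
        }
      }
    }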
diff --git a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetHistoryPolicyExec.scala b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetHistoryPolicyExec.scala
new file mode 100644
index 00000000..271f9e19
--- /dev/null
+++ b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetHistoryPolicyExec.scala
@@ -0,0 +1,45 @@
+package com.linkedin.openhouse.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
+
+case class SetHistoryPolicyExec(
+  catalog: TableCatalog,
+  ident: Identifier,
+  granularity: Option[String],
+  maxAge: Int,
+  versions: Int
+) extends V2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    catalog.loadTable(ident) match {
+      case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") =>
+        val key = "updated.openhouse.policy"
+        val value = {
+          (maxAge, versions) match {
+            case _ if versions == -1 => s"""{"history":{"maxAge":${maxAge},"granularity":"${granularity.get}"}}"""
+            case _ if maxAge == -1 => s"""{"history":{"versions":${versions}}}"""
+            case _ => s"""{"history":{"maxAge":${maxAge},"granularity":"${granularity.get}","versions":${versions}}}"""
+          }
+        }
+
+        iceberg.table().updateProperties()
+          .set(key, value)
+          .commit()
+
+      case table =>
+        throw new UnsupportedOperationException(s"Cannot set history policy for non-Openhouse table: $table")
+    }
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"SetHistoryPolicyExec: ${catalog} ${ident} MAX_AGE=${if (maxAge > 0) maxAge else ""}${granularity.getOrElse("")} VERSIONS=${if (versions > 0) versions else ""}"
+  }
+}
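Note: the exec stages the policy as JSON under the "updated.openhouse.policy" table property, presumably consumed by the OpenHouse catalog at commit time like the other SET POLICY execs in this package. The value produced for each statement shape, with the -1 sentinels dropping the corresponding field:

    MAX_AGE=24H             ->  {"history":{"maxAge":24,"granularity":"HOUR"}}
    VERSIONS=10             ->  {"history":{"versions":10}}
    MAX_AGE=2D VERSIONS=20  ->  {"history":{"maxAge":2,"granularity":"DAY","versions":20}}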
diff --git a/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetHistoryPolicy.scala b/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetHistoryPolicy.scala
new file mode 100644
index 00000000..dc9899b9
--- /dev/null
+++ b/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetHistoryPolicy.scala
@@ -0,0 +1,9 @@
+package com.linkedin.openhouse.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.plans.logical.LeafCommand
+
+case class SetHistoryPolicy(tableName: Seq[String], granularity: Option[String], maxAge: Int, versions: Int) extends LeafCommand {
+  override def simpleString(maxFields: Int): String = {
+    s"SetHistoryPolicy: ${tableName} ${if (maxAge > 0) "MAX_AGE=" + maxAge else ""}${granularity.getOrElse("")} ${if (versions > 0) "VERSIONS=" + versions else ""}"
+  }
+}
diff --git a/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetHistoryPolicyExec.scala b/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetHistoryPolicyExec.scala
new file mode 100644
index 00000000..c429d199
--- /dev/null
+++ b/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetHistoryPolicyExec.scala
@@ -0,0 +1,45 @@
+package com.linkedin.openhouse.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
+
+case class SetHistoryPolicyExec(
+  catalog: TableCatalog,
+  ident: Identifier,
+  granularity: Option[String],
+  maxAge: Int,
+  versions: Int
+) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    catalog.loadTable(ident) match {
+      case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") =>
+        val key = "updated.openhouse.policy"
+        val value = {
+          (maxAge, versions) match {
+            case _ if versions == -1 => s"""{"history":{"maxAge":${maxAge},"granularity":"${granularity.get}"}}"""
+            case _ if maxAge == -1 => s"""{"history":{"versions":${versions}}}"""
+            case _ => s"""{"history":{"maxAge":${maxAge},"granularity":"${granularity.get}","versions":${versions}}}"""
+          }
+        }
+
+        iceberg.table().updateProperties()
+          .set(key, value)
+          .commit()
+
+      case table =>
+        throw new UnsupportedOperationException(s"Cannot set history policy for non-Openhouse table: $table")
+    }
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"SetHistoryPolicyExec: ${catalog} ${ident} MAX_AGE=${if (maxAge > 0) maxAge else ""}${granularity.getOrElse("")} VERSIONS=${if (versions > 0) versions else ""}"
+  }
+}
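Note: end to end, the feature only requires the OpenHouse extensions on the session. A minimal Scala sketch mirroring the test configuration above (warehouse path and table name illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[2]")
      .config("spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions," +
          "com.linkedin.openhouse.spark.extensions.OpenhouseSparkSessionExtensions")
      .config("spark.sql.catalog.openhouse", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.openhouse.type", "hadoop")
      .config("spark.sql.catalog.openhouse.warehouse", "/tmp/openhouse-warehouse")
      .getOrCreate()

    // Keep at most 10 snapshots and drop history older than 3 days.
    spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=3D VERSIONS=10)")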