Adds SQL support for Configurable Table Snapshot History #262

Merged: 10 commits, Dec 19, 2024
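
This change adds a HISTORY clause to the ALTER TABLE ... SET POLICY statement, letting users bound a table's snapshot history by age, by version count, or by both. A quick sketch of the new syntax, taken from the parser tests below (and assuming a SparkSession with the Openhouse SQL extensions enabled):

// Keep snapshots for at most 24 hours:
spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=24H)")
// Keep at most 10 snapshot versions:
spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY VERSIONS=10)")
// Bound both; MAX_AGE must precede VERSIONS:
spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=2D VERSIONS=20)")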
@@ -0,0 +1,112 @@
package com.linkedin.openhouse.spark.statementtest;

import com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseParseException;
import java.nio.file.Files;
import java.util.Optional;
import lombok.SneakyThrows;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.ExplainMode;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class SetHistoryPolicyStatementTest {
private static SparkSession spark = null;

@SneakyThrows
@BeforeAll
public void setupSpark() {
Path unittest = new Path(Files.createTempDirectory("unittest_settablepolicy").toString());
spark =
SparkSession.builder()
.master("local[2]")
.config(
"spark.sql.extensions",
("org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
+ "com.linkedin.openhouse.spark.extensions.OpenhouseSparkSessionExtensions"))
.config("spark.sql.catalog.openhouse", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.openhouse.type", "hadoop")
.config("spark.sql.catalog.openhouse.warehouse", unittest.toString())
.getOrCreate();
}

@Test
public void testSetHistoryPolicyGood() {
// Validate setting only max age
Dataset<Row> ds = spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=24H)");
assert isPlanValid(ds, "db.table", Optional.of("24"), Optional.of("HOUR"), Optional.empty());

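// Validate setting only versions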
ds = spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY VERSIONS=10)");
assert isPlanValid(ds, "db.table", Optional.empty(), Optional.empty(), Optional.of("10"));

// Validate both time and count setting
ds = spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=2D VERSIONS=20)");
assert isPlanValid(ds, "db.table", Optional.of("2"), Optional.of("DAY"), Optional.of("20"));
}

@Test
public void testSetHistoryPolicyIncorrectSyntax() {
// No time granularity
Assertions.assertThrows(
OpenhouseParseException.class,
() -> spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=24)").show());

// Count before time
Assertions.assertThrows(
OpenhouseParseException.class,
() ->
spark
.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY VERSIONS=10 MAX_AGE=24H)")
.show());

// No time or count
Assertions.assertThrows(
OpenhouseParseException.class,
() -> spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY )").show());
}

@BeforeEach
public void setup() {
spark.sql("CREATE TABLE openhouse.db.table (id bigint, data string) USING iceberg").show();
spark.sql("CREATE TABLE openhouse.0_.0_ (id bigint, data string) USING iceberg").show();
spark
.sql("ALTER TABLE openhouse.db.table SET TBLPROPERTIES ('openhouse.tableId' = 'tableid')")
.show();
spark
.sql("ALTER TABLE openhouse.0_.0_ SET TBLPROPERTIES ('openhouse.tableId' = 'tableid')")
.show();
}

@AfterEach
public void tearDown() {
spark.sql("DROP TABLE openhouse.db.table").show();
spark.sql("DROP TABLE openhouse.0_.0_").show();
}

@AfterAll
public void tearDownSpark() {
spark.close();
}

@SneakyThrows
private boolean isPlanValid(
Dataset<Row> dataframe,
String dbTable,
Optional<String> maxAge,
Optional<String> granularity,
Optional<String> versions) {
String queryStr = dataframe.queryExecution().explainString(ExplainMode.fromString("simple"));
return queryStr.contains(dbTable)
&& (!maxAge.isPresent() || queryStr.contains(maxAge.get()))
&& (!granularity.isPresent() || queryStr.contains(granularity.get()))
&& (!versions.isPresent() || queryStr.contains(versions.get()));
}
}
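
A note on the validation approach: isPlanValid never executes the command; it string-matches the physical plan's explain output. For the MAX_AGE=24H case the plan line should render roughly as "SetHistoryPolicyExec: openhouse db.table MAX_AGE=24HOUR VERSIONS=" (see SetHistoryPolicyExec.simpleString further down; the exact catalog rendering is implementation-dependent), which contains the expected substrings "db.table", "24", and "HOUR".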
@@ -26,10 +26,11 @@ statement
: ALTER TABLE multipartIdentifier SET POLICY '(' retentionPolicy (columnRetentionPolicy)? ')' #setRetentionPolicy
| ALTER TABLE multipartIdentifier SET POLICY '(' replicationPolicy ')' #setReplicationPolicy
| ALTER TABLE multipartIdentifier SET POLICY '(' sharingPolicy ')' #setSharingPolicy
| ALTER TABLE multipartIdentifier SET POLICY '(' historyPolicy ')' #setHistoryPolicy
| ALTER TABLE multipartIdentifier MODIFY columnNameClause SET columnPolicy #setColumnPolicyTag
| GRANT privilege ON grantableResource TO principal #grantStatement
| REVOKE privilege ON grantableResource FROM principal #revokeStatement
| SHOW GRANTS ON grantableResource #showGrantsStatement
;

multipartIdentifier
@@ -65,7 +66,7 @@ quotedIdentifier
;

nonReserved
: ALTER | TABLE | SET | POLICY | RETENTION | SHARING | REPLICATION
: ALTER | TABLE | SET | POLICY | RETENTION | SHARING | REPLICATION | HISTORY
| GRANT | REVOKE | ON | TO | SHOW | GRANTS | PATTERN | WHERE | COLUMN
;

@@ -76,6 +77,7 @@ sharingPolicy
BOOLEAN
: 'TRUE' | 'FALSE'
;

retentionPolicy
: RETENTION '=' duration
;
@@ -153,12 +155,25 @@ policyTag
: PII | HC
;

historyPolicy
: HISTORY maxAge? versions?
;

maxAge
: MAX_AGE '=' duration
;

versions
: VERSIONS '=' POSITIVE_INTEGER
;

ALTER: 'ALTER';
TABLE: 'TABLE';
SET: 'SET';
POLICY: 'POLICY';
RETENTION: 'RETENTION';
REPLICATION: 'REPLICATION';
HISTORY: 'HISTORY';
SHARING: 'SHARING';
GRANT: 'GRANT';
REVOKE: 'REVOKE';
@@ -182,6 +197,8 @@ HC: 'HC';
MODIFY: 'MODIFY';
TAG: 'TAG';
NONE: 'NONE';
VERSIONS: 'VERSIONS';
MAX_AGE: 'MAX_AGE';

POSITIVE_INTEGER
: DIGIT+
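
Two things worth noting in the grammar above: historyPolicy makes both clauses optional and fixes their order (maxAge before versions), which is why HISTORY VERSIONS=10 MAX_AGE=24H fails at parse time; and the "at least one clause" requirement is not expressible in the rule itself, so it is enforced semantically in the AST builder below.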
@@ -2,7 +2,7 @@ package com.linkedin.openhouse.spark.sql.catalyst.parser.extensions

import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes
import com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseSqlExtensionsParser._
import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement}
import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetHistoryPolicy, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement}
import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes.GrantableResourceType
import com.linkedin.openhouse.gen.tables.client.model.TimePartitionSpec
import org.antlr.v4.runtime.tree.ParseTree
@@ -22,7 +22,7 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
val (granularity, count) = typedVisit[(String, Int)](ctx.retentionPolicy())
val (colName, colPattern) =
if (ctx.columnRetentionPolicy() != null)
typedVisit[(String, String)](ctx.columnRetentionPolicy())
else (null, null)
SetRetentionPolicy(tableName, granularity, count, Option(colName), Option(colPattern))
}
@@ -128,8 +128,8 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
}
}

override def visitColumnRetentionPolicyPatternClause(ctx: ColumnRetentionPolicyPatternClauseContext): String = {
ctx.retentionColumnPatternClause().STRING().getText
}

override def visitSharingPolicy(ctx: SharingPolicyContext): String = {
@@ -149,7 +149,7 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
}

override def visitDuration(ctx: DurationContext): (String, Int) = {
val granularity: String = if (ctx.RETENTION_DAY != null) {
TimePartitionSpec.GranularityEnum.DAY.getValue()
} else if (ctx.RETENTION_YEAR() != null) {
TimePartitionSpec.GranularityEnum.YEAR.getValue()
@@ -158,13 +158,36 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
} else {
TimePartitionSpec.GranularityEnum.HOUR.getValue()
}

val count = ctx.getText.substring(0, ctx.getText.length - 1).toInt
(granularity, count)
}

override def visitSetHistoryPolicy(ctx: SetHistoryPolicyContext): SetHistoryPolicy = {
val tableName = typedVisit[Seq[String]](ctx.multipartIdentifier)
val (granularity, maxAge, versions) = typedVisit[(Option[String], Int, Int)](ctx.historyPolicy())
SetHistoryPolicy(tableName, granularity, maxAge, versions)
}

override def visitHistoryPolicy(ctx: HistoryPolicyContext): (Option[String], Int, Int) = {
val maxAgePolicy = if (ctx.maxAge() != null)
typedVisit[(String, Int)](ctx.maxAge().duration())
else (null, -1)
val versionPolicy = if (ctx.versions() != null)
typedVisit[Int](ctx.versions())
else -1
if (maxAgePolicy._2 == -1 && versionPolicy == -1) {
throw new OpenhouseParseException("At least one of MAX_AGE or VERSIONS must be specified in HISTORY policy, e.g. " +
"ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=2D) or ALTER TABLE openhouse.db.table SET POLICY (HISTORY VERSIONS=3)",
ctx.start.getLine, ctx.start.getCharPositionInLine)
}
(Option(maxAgePolicy._1), maxAgePolicy._2, versionPolicy)
}

override def visitVersions(ctx: VersionsContext): Integer = {
ctx.POSITIVE_INTEGER().getText.toInt
}
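
A note on the encoding, derived from visitHistoryPolicy above: an absent clause becomes -1 (and None for the granularity), so the tuple handed to SetHistoryPolicy looks like:

// HISTORY MAX_AGE=2D             -> (Some("DAY"), 2, -1)
// HISTORY VERSIONS=20            -> (None, -1, 20)
// HISTORY MAX_AGE=2D VERSIONS=20 -> (Some("DAY"), 2, 20)

Downstream consumers (the logical plan and the exec node) test against -1 rather than carrying Option[Int] through.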

private def toBuffer[T](list: java.util.List[T]) = list.asScala
private def toSeq[T](list: java.util.List[T]) = toBuffer(list).toSeq

private def typedVisit[T](ctx: ParseTree): T = {
ctx.accept(this).asInstanceOf[T]
@@ -0,0 +1,9 @@
package com.linkedin.openhouse.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.plans.logical.Command

case class SetHistoryPolicy (tableName: Seq[String], granularity: Option[String], maxAge: Int, versions: Int) extends Command {
override def simpleString(maxFields: Int): String = {
s"SetHistoryPolicy: ${tableName} ${if (maxAge > 0) "MAX_AGE=" + maxAge else ""}${granularity.getOrElse("")} ${if (versions > 0) "VERSIONS=" + versions else ""}"
}
}
@@ -1,6 +1,6 @@
package com.linkedin.openhouse.spark.sql.execution.datasources.v2

import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement}
import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetHistoryPolicy, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement}
import org.apache.iceberg.spark.{Spark3Util, SparkCatalog, SparkSessionCatalog}
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
@@ -17,6 +17,8 @@ case class OpenhouseDataSourceV2Strategy(spark: SparkSession) extends Strategy w
SetRetentionPolicyExec(catalog, ident, granularity, count, colName, colPattern) :: Nil
case SetReplicationPolicy(CatalogAndIdentifierExtractor(catalog, ident), replicationPolicies) =>
SetReplicationPolicyExec(catalog, ident, replicationPolicies) :: Nil
case SetHistoryPolicy(CatalogAndIdentifierExtractor(catalog, ident), granularity, maxAge, versions) =>
SetHistoryPolicyExec(catalog, ident, granularity, maxAge, versions) :: Nil
case SetSharingPolicy(CatalogAndIdentifierExtractor(catalog, ident), sharing) =>
SetSharingPolicyExec(catalog, ident, sharing) :: Nil
case SetColumnPolicyTag(CatalogAndIdentifierExtractor(catalog, ident), policyTag, cols) =>
@@ -0,0 +1,45 @@
package com.linkedin.openhouse.spark.sql.execution.datasources.v2

import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.execution.datasources.v2.V2CommandExec

case class SetHistoryPolicyExec(
catalog: TableCatalog,
ident: Identifier,
granularity: Option[String],
maxAge: Int,
versions: Int
) extends V2CommandExec {

override lazy val output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
catalog.loadTable(ident) match {
case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") =>
val key = "updated.openhouse.policy"
val value = {
(maxAge, versions) match {
case maxAgeOnly if versions == -1 => s"""{"history":{"maxAge":${maxAge},"granularity":"${granularity.get}"}}"""
case versionsOnly if maxAge == -1 => s"""{"history":{"versions":${versions}}}"""
case _ => s"""{"history":{"maxAge":${maxAge},"granularity":"${granularity.get}","versions":${versions}}}"""
}
}

iceberg.table().updateProperties()
.set(key, value)
.commit()

case table =>
throw new UnsupportedOperationException(s"Cannot set history policy for non-Openhouse table: $table")
}

Nil
}

override def simpleString(maxFields: Int): String = {
s"SetHistoryPolicyExec: ${catalog} ${ident} MAX_AGE=${if (maxAge > 0) maxAge else ""}${granularity.getOrElse("")} VERSIONS=${if (versions > 0) versions else ""}"
}
}
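
For reference, the staged property values this produces (read off the interpolations above):

// MAX_AGE=24H            -> updated.openhouse.policy = {"history":{"maxAge":24,"granularity":"HOUR"}}
// VERSIONS=10            -> updated.openhouse.policy = {"history":{"versions":10}}
// MAX_AGE=2D VERSIONS=20 -> updated.openhouse.policy = {"history":{"maxAge":2,"granularity":"DAY","versions":20}}

Presumably the Openhouse catalog picks the updated.openhouse.policy property up on commit and folds it into the table's policy record; only tables that carry openhouse.tableId are eligible, hence the guard in the match.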
@@ -0,0 +1,9 @@
package com.linkedin.openhouse.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.plans.logical.LeafCommand

case class SetHistoryPolicy(tableName: Seq[String], granularity: Option[String], maxAge: Int, versions: Int) extends LeafCommand {
override def simpleString(maxFields: Int): String = {
s"SetHistoryPolicy: ${tableName} ${if (maxAge > 0) "MAX_AGE=" + maxAge else ""}${granularity.getOrElse("")} ${if (versions > 0) "VERSIONS=" + versions else ""}"
}
}
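
Note that SetHistoryPolicy and SetHistoryPolicyExec each appear twice in this PR: once extending Command and V2CommandExec, and once extending LeafCommand and LeafV2CommandExec. This looks like one copy per supported Spark version line, since the Leaf* base classes only exist in newer Spark releases; the policy logic in the two copies is identical.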
@@ -0,0 +1,45 @@
package com.linkedin.openhouse.spark.sql.execution.datasources.v2

import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec

case class SetHistoryPolicyExec(
catalog: TableCatalog,
ident: Identifier,
granularity: Option[String],
maxAge: Int,
versions: Int
) extends LeafV2CommandExec {

override lazy val output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
catalog.loadTable(ident) match {
case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") =>
val key = "updated.openhouse.policy"
val value = {
(maxAge, versions) match {
case maxAgeOnly if versions == -1 => s"""{"history":{"maxAge":${maxAge},"granularity":"${granularity.get}"}}"""
case versionsOnly if maxAge == -1 => s"""{"history":{"versions":${versions}}}"""
case _ => s"""{"history":{"maxAge":${maxAge},"granularity":"${granularity.get}","versions":${versions}}}"""
}
}

iceberg.table().updateProperties()
.set(key, value)
.commit()

case table =>
throw new UnsupportedOperationException(s"Cannot set history policy for non-Openhouse table: $table")
}

Nil
}

override def simpleString(maxFields: Int): String = {
s"SetHistoryPolicyExec: ${catalog} ${ident} MAX_AGE=${if (maxAge > 0) maxAge else ""}${granularity.getOrElse("")} VERSIONS=${if (versions > 0) versions else ""}"
}
}