Skip to content

Commit

Permalink
Add validators for History / Snapshot Retention Policy (#259)
Browse files Browse the repository at this point in the history
## Summary

<!--- HINT: Replace #nnn with corresponding Issue number, if you are
fixing an existing issue -->

Supports defining a policy to control the versions of snapshots tables
to retain.
The policy will support 2 types of configurations:
1. Time-based (e.g. 3 days)
2. Count-based (e.g. 10 versions)

It can also support a combination of the 2 policies, in which case it
will retain versions that fall under both categories (e.g. only keep
versions newer than 3 days AND within 10 versions).

This PR also refactors the PolicySpecValidator to accurately depict that
it's referencing to partition-based retention of tables. In the future,
we should refactor these policy classes to follow some `Policy`
interface for better standardization for future policy support
(duplicated interfaces between Replication, Partition Retention, Retain
Snapshot).

This PR also defines the maximums for snapshot retention defaults (TODO:
make this configurable/static?)

**The current maximums defined is 3 days and 100 versions**


## Changes
- [x] Client-facing API Changes
- [x] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [x] Tests

For all the boxes checked, please include additional details of the
changes made in this pull request.

## Testing Done
<!--- Check any relevant boxes with "x" -->

- [x] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [x] Added new tests for the changes made.
- [x] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

Ran on local docker and tested using postman by creating a new table
with the following:
```
{
    "tableId": "t2",
    "databaseId": "d3",
    "baseTableVersion": "INITIAL_VERSION",
    "clusterId": "LocalFSCluster",
    "schema": "{\"type\": \"struct\", \"fields\": [{\"id\": 1,\"required\": true,\"name\": \"id\",\"type\": \"string\"},{\"id\": 2,\"required\": true,\"name\": \"name\",\"type\": \"string\"},{\"id\": 3,\"required\": true,\"name\": \"ts\",\"type\": \"timestamp\"}]}",
    "tableProperties": {
        "key": "value"
    },
    "policies": {
        "sharingEnabled": "true",
        "history": {"maxAge": 1, "granularity": "DAY", "versions": 2}
    }
}
```
Tested getTable after and confirmed the policy was showing correctly.

Also tested the negative cases (high version count, negative numbers)
which worked as well.

For all the boxes checked, include a detailed description of the testing
done for the changes made in this pull request.

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the
description.

For all the boxes checked, include additional details of the changes
made in this pull request.
  • Loading branch information
Will-Lo authored Dec 20, 2024
1 parent 059d810 commit 92bce12
Show file tree
Hide file tree
Showing 13 changed files with 507 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.openhouse.gen.tables.client.api.SnapshotApi;
import com.linkedin.openhouse.gen.tables.client.api.TableApi;
import com.linkedin.openhouse.gen.tables.client.model.CreateUpdateTableRequestBody;
import com.linkedin.openhouse.gen.tables.client.model.History;
import com.linkedin.openhouse.gen.tables.client.model.Policies;
import com.linkedin.openhouse.gen.tables.client.model.PolicyTag;
import com.linkedin.openhouse.gen.tables.client.model.Retention;
Expand Down Expand Up @@ -344,4 +345,62 @@ public void testPoliciesReplicationExistsUpdateExistsForMultiple() {
updatedPolicies.getReplication().getConfig().get(1).getInterval(), "2D");
Assertions.assertEquals(updatedPolicies.getReplication().getConfig().size(), 2);
}

@Test
public void testPoliciesHistoryInMetadataNoUpdate() {
Map<String, String> props = new HashMap<>();
props.put(
"policies",
"{\"history\": {\"maxAge\": \"1\", \"granularity\": \"DAY\", \"versions\": \"2\"}}");
TableMetadata metadata = mock(TableMetadata.class);
when(metadata.properties()).thenReturn(props);
OpenHouseTableOperations openHouseTableOperations = mock(OpenHouseTableOperations.class);
when(openHouseTableOperations.buildUpdatedPolicies(metadata)).thenCallRealMethod();
Policies updatedPolicies = openHouseTableOperations.buildUpdatedPolicies(metadata);
Assertions.assertNotNull(updatedPolicies);
Assertions.assertEquals(1, updatedPolicies.getHistory().getMaxAge());
Assertions.assertEquals(
History.GranularityEnum.DAY, updatedPolicies.getHistory().getGranularity());
Assertions.assertEquals(2, updatedPolicies.getHistory().getVersions());
}

@Test
public void testNoPoliciesHistoryExistsButUpdateExists() {
Map<String, String> props = new HashMap<>();
props.put(
"updated.openhouse.policy",
"{\"history\": {\"maxAge\": \"1\", \"granularity\": \"DAY\", \"versions\": \"2\"}}");
TableMetadata metadata = mock(TableMetadata.class);
when(metadata.properties()).thenReturn(props);
OpenHouseTableOperations openHouseTableOperations = mock(OpenHouseTableOperations.class);
when(openHouseTableOperations.buildUpdatedPolicies(metadata)).thenCallRealMethod();
Policies updatedPolicies = openHouseTableOperations.buildUpdatedPolicies(metadata);
Assertions.assertNotNull(updatedPolicies);
Assertions.assertEquals(1, updatedPolicies.getHistory().getMaxAge());
Assertions.assertEquals(
History.GranularityEnum.DAY, updatedPolicies.getHistory().getGranularity());
Assertions.assertEquals(2, updatedPolicies.getHistory().getVersions());
}

@Test
public void testPoliciesHistoryExistsUpdate() {
Map<String, String> props = new HashMap<>();
props.put(
"openhouse.policy",
"{\"history\": {\"maxAge\": \"2\", \"granularity\": \"HOUR\", \"versions\": \"3\"}}");
props.put(
"updated.openhouse.policy",
"{\"history\": {\"maxAge\": \"1\", \"granularity\": \"DAY\", \"versions\": \"2\"}, \"sharingEnabled\": true}");
TableMetadata metadata = mock(TableMetadata.class);
when(metadata.properties()).thenReturn(props);
OpenHouseTableOperations openHouseTableOperations = mock(OpenHouseTableOperations.class);
when(openHouseTableOperations.buildUpdatedPolicies(metadata)).thenCallRealMethod();
Policies updatedPolicies = openHouseTableOperations.buildUpdatedPolicies(metadata);
Assertions.assertNotNull(updatedPolicies);
Assertions.assertEquals(1, updatedPolicies.getHistory().getMaxAge());
Assertions.assertEquals(
History.GranularityEnum.DAY, updatedPolicies.getHistory().getGranularity());
Assertions.assertEquals(2, updatedPolicies.getHistory().getVersions());
Assertions.assertEquals(true, updatedPolicies.getSharingEnabled());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ Policies buildUpdatedPolicies(TableMetadata metadata) {
if (patchUpdatedPolicy.getRetention() != null) {
policies.setRetention(patchUpdatedPolicy.getRetention());
}

// Update sharing config
if (patchUpdatedPolicy.getSharingEnabled() != null) {
policies.sharingEnabled(patchUpdatedPolicy.getSharingEnabled());
Expand All @@ -215,6 +216,11 @@ Policies buildUpdatedPolicies(TableMetadata metadata) {
if (patchUpdatedPolicy.getReplication() != null) {
policies.replication(patchUpdatedPolicy.getReplication());
}
// Update history config
if (patchUpdatedPolicy.getHistory() != null) {
policies.setHistory(patchUpdatedPolicy.getHistory());
}

return policies;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.linkedin.openhouse.tables.api.spec.v0.request.components;

import io.swagger.v3.oas.annotations.media.Schema;
import javax.validation.Valid;
import javax.validation.constraints.PositiveOrZero;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Builder(toBuilder = true)
@EqualsAndHashCode
@Getter
@AllArgsConstructor(access = AccessLevel.PROTECTED)
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class History {
@Schema(
description = "Time period in count <granularity> to keep the snapshot history on the table",
example = "3,4,5")
@PositiveOrZero(
message =
"Incorrect count specified. history.maxAge has to be a positive integer or zero if undefined")
@Valid
int maxAge;

@Schema(description = "time period granularity for the snapshot history", example = "hour, day")
@Valid
TimePartitionSpec.Granularity granularity;

@Schema(
description =
"Number of snapshots to keep within history for the table after snapshot expiration",
example = "3,4,5")
@PositiveOrZero(
message =
"Incorrect count specified. history.versions has to be a positive integer or zero if undefined")
int versions;
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,11 @@ public class Policies {
example = "{replication:{config:[{destination: clusterA, interval: 12H}]}}")
@Valid
Replication replication;

@Schema(
description =
"History as required in /tables API request. This field holds the snapshot retention specification.",
example = "{history:{maxAge:3, granularity: 'day', versions: 5}}")
@Valid
History history;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.linkedin.openhouse.tables.api.validator.impl;

import com.linkedin.openhouse.common.api.spec.TableUri;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.History;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class HistoryPolicySpecValidator {

private String failureMessage = "";
private String errorField = "";

protected boolean validate(History history, TableUri tableUri) {
if (history != null) {
if (history.getMaxAge() <= 0 && history.getVersions() <= 0) {
failureMessage =
String.format(
"Must define either a time based retention or count based retention for snapshots in table %s",
tableUri);
return false;
}

if (history.getGranularity() == null && history.getMaxAge() > 0
|| history.getGranularity() != null && history.getMaxAge() <= 0) {
failureMessage =
String.format(
"Incorrect maxAge specified. history.maxAge must be defined together with history.granularity for table %s",
tableUri);
return false;
}

if (!validateHistoryConfigMaxAgeWithinBounds(history)) {
failureMessage =
String.format(
"History for the table [%s] max age must be between 1 to 3 days", tableUri);
return false;
}

if (!validateHistoryConfigVersionsWithinBounds(history)) {
failureMessage =
String.format("History for the table [%s] must be between 2 to 100 versions", tableUri);
return false;
}
}
return true;
}

/**
* Validate that the amount of time to retain history of table snapshots is between 1 and 3 days
*
* @param history
* @return
*/
protected boolean validateHistoryConfigMaxAgeWithinBounds(History history) {
int maxAge = history.getMaxAge();
TimePartitionSpec.Granularity granularity = history.getGranularity();
// if maxAge is 0 then consider it undefined and refer to default for snapshot expiration
if (maxAge == 0) {
return true;
}

if (granularity.equals(TimePartitionSpec.Granularity.HOUR)
|| granularity.equals(TimePartitionSpec.Granularity.DAY)) {
return (maxAge <= 3 && granularity.equals(TimePartitionSpec.Granularity.DAY)
|| maxAge <= 72 && granularity.equals(TimePartitionSpec.Granularity.HOUR))
&& (maxAge >= 1 && granularity.equals(TimePartitionSpec.Granularity.DAY)
|| maxAge >= 24 && granularity.equals(TimePartitionSpec.Granularity.HOUR));
}

return false;
}

/*
* Validate that the number of versions to retain history of table snapshots is between 2 and 100
* We want at least 2 versions so that users can always rollback to at least 1 version before a commit
*/
protected boolean validateHistoryConfigVersionsWithinBounds(History history) {
if (history.getVersions()
== 0) { // versions is 0 then consider it undefined and refer to default for snapshot
// expiration
return true;
}
int versions = history.getVersions();
return versions >= 2 && versions <= 100;
}

public String getMessage() {
return failureMessage;
}

public String getField() {
return errorField;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ public class OpenHouseTablesApiValidator implements TablesApiValidator {

@Autowired private Validator validator;

@Autowired private PoliciesSpecValidator policiesSpecValidator;
@Autowired private RetentionPolicySpecValidator retentionPolicySpecValidator;

@Autowired private ClusteringSpecValidator clusteringSpecValidator;

@Autowired private ReplicationConfigValidator replicationConfigValidator;

@Autowired private HistoryPolicySpecValidator historyPolicySpecValidator;

@Override
public void validateGetTable(String databaseId, String tableId) {
List<String> validationFailures = new ArrayList<>();
Expand Down Expand Up @@ -132,7 +134,7 @@ private void validatePolicies(CreateUpdateTableRequestBody createUpdateTableRequ
.clusterId(createUpdateTableRequestBody.getClusterId())
.databaseId(createUpdateTableRequestBody.getDatabaseId())
.build();
if (!policiesSpecValidator.validate(
if (!retentionPolicySpecValidator.validate(
createUpdateTableRequestBody.getPolicies(),
createUpdateTableRequestBody.getTimePartitioning(),
tableUri,
Expand All @@ -141,7 +143,8 @@ private void validatePolicies(CreateUpdateTableRequestBody createUpdateTableRequ
Arrays.asList(
String.format(
"%s : %s",
policiesSpecValidator.getField(), policiesSpecValidator.getMessage())));
retentionPolicySpecValidator.getField(),
retentionPolicySpecValidator.getMessage())));
}
if (createUpdateTableRequestBody.getPolicies() != null
&& createUpdateTableRequestBody.getPolicies().getReplication() != null) {
Expand All @@ -155,6 +158,18 @@ private void validatePolicies(CreateUpdateTableRequestBody createUpdateTableRequ
replicationConfigValidator.getMessage())));
}
}
if (createUpdateTableRequestBody.getPolicies() != null
&& createUpdateTableRequestBody.getPolicies().getHistory() != null) {
if (!historyPolicySpecValidator.validate(
createUpdateTableRequestBody.getPolicies().getHistory(), tableUri)) {
throw new RequestValidationFailureException(
Arrays.asList(
String.format(
"%s : %s",
historyPolicySpecValidator.getField(),
historyPolicySpecValidator.getMessage())));
}
}
}

@SuppressWarnings("checkstyle:OperatorWrap")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
import org.springframework.stereotype.Component;

/**
* PoliciesSpecValidator is a custom validator to validate the input values for period in retention
* policy. This custom validator can be used to add validators for fields in policies
* RetentionPolicySpecValidator is a custom validator to validate the input values for period in
* retention policy.
*/
@Component
@Slf4j
public class PoliciesSpecValidator {
public class RetentionPolicySpecValidator {

private String failureMessage = "";

Expand Down
Loading

0 comments on commit 92bce12

Please sign in to comment.