Skip to content

Commit

Permalink
Support skipping nested Rows
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan committed Apr 1, 2024
1 parent 8edcd27 commit ad8449f
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
/** Common data types. Not authoratitive or exhaustive. */
public enum DataType {

VARCHAR_NULL(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), true)),
VARCHAR(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), true)),
VARCHAR_NOT_NULL(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), false));

public static final RelDataTypeFactory DEFAULT_TYPE_FACTORY = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
Expand Down Expand Up @@ -56,16 +56,24 @@ public static Struct struct(RelDataType relDataType) {
/** Convenience builder for non-scalar types */
public interface Struct extends RelProtoDataType {

default Struct with(String name, DataType dataType) {
default Struct with(String name, RelDataType dataType) {
return x -> {
RelDataType existing = apply(x);
RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(x);
builder.addAll(existing.getFieldList());
builder.add(name, dataType.rel(x));
builder.add(name, dataType);
return builder.build();
};
}

default Struct with(String name, DataType dataType) {
return with(name, dataType.rel());
}

default Struct with(String name, Struct struct) {
return with(name, struct.rel());
}

default RelDataType rel() {
return apply(DEFAULT_TYPE_FACTORY);
}
Expand All @@ -85,6 +93,17 @@ default Struct drop(String name) {
};
}

default Struct dropNestedRows() {
return x -> {
RelDataType dataType = apply(x);
RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(x);
builder.addAll(dataType.getFieldList().stream()
.filter(y -> y.getType().getSqlTypeName() != SqlTypeName.ROW)
.collect(Collectors.toList()));
return builder.build();
};
}

default Struct get(String name) {
return x -> {
RelDataTypeField field = apply(x).getField(name, true, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelProtoDataType;

import java.util.concurrent.ExecutionException;
import java.util.function.Function;

/** Resolves a table name into a concrete row type. Usually involves a network call. */
public interface TableResolver {
RelDataType resolve(String table) throws InterruptedException, ExecutionException;

static TableResolver from(Function<String, RelDataType> f) {
return x -> f.apply(x);
}

/** Appends an extra column to the resolved type */
default TableResolver with(String name, RelDataType dataType) {
return x -> {
Expand All @@ -19,4 +25,20 @@ default TableResolver with(String name, RelDataType dataType) {
return builder.build();
};
}

default TableResolver with(String name, DataType dataType) {
return with(name, dataType.rel());
}

default TableResolver with(String name, DataType.Struct struct) {
return with(name, struct.rel());
}

default TableResolver mapStruct(Function<DataType.Struct, DataType.Struct> f) {
return x -> f.apply(DataType.struct(resolve(x))).rel();
}

default TableResolver map(Function<RelDataType, RelDataType> f) {
return x -> f.apply(resolve(x));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.linkedin.hoptimator.catalog;

import org.apache.calcite.rel.type.RelDataType;

import static org.junit.Assert.assertTrue;
import org.junit.Test;

public class DataTypeTest {

@Test
public void skipsNestedRows() {
DataType.Struct struct = DataType.struct().with("one", DataType.VARCHAR)
.with("two", DataType.struct().with("three", DataType.VARCHAR));
RelDataType row1 = struct.rel();
assertTrue(row1.toString(), row1.getFieldCount() == 2);
assertTrue(row1.toString(), row1.getField("one", false, false) != null);
assertTrue(row1.toString(), row1.getField("two", false, false) != null);
RelDataType row2 = struct.dropNestedRows().rel();
assertTrue(row2.toString(), row2.getFieldCount() == 1);
assertTrue(row2.toString(), row2.getField("one", false, false) != null);
assertTrue(row2.toString(), row2.getField("two", false, false) == null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ public void implementsFlinkCreateTableDDL() {
// Output isn't necessarily deterministic, but should be something like:
// CREATE TABLE IF NOT EXISTS "DATABASE"."TABLE1" ("idValue1" VARCHAR) WITH
// ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='topic1')
assertTrue(out.contains("CREATE TABLE IF NOT EXISTS \"DATABASE\".\"TABLE1\" (\"idValue1\" VARCHAR) WITH "));
assertTrue(out.contains("'connector'='kafka'"));
assertTrue(out.contains("'properties.bootstrap.servers'='localhost:9092'"));
assertTrue(out.contains("'topic'='topic1'"));
assertFalse(out.contains("Row"));
assertTrue(out, out.contains("CREATE TABLE IF NOT EXISTS \"DATABASE\".\"TABLE1\" (\"idValue1\" VARCHAR) WITH "));
assertTrue(out, out.contains("'connector'='kafka'"));
assertTrue(out, out.contains("'properties.bootstrap.servers'='localhost:9092'"));
assertTrue(out, out.contains("'topic'='topic1'"));
assertFalse(out, out.contains("Row"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> o
String principal = (String) operand.getOrDefault("principal", "User:ANONYMOUS");
Map<String, Object> clientConfig = (Map<String, Object>) operand.get("clientConfig");
DataType.Struct rowType = DataType.struct()
.with("PAYLOAD", DataType.VARCHAR_NULL)
.with("KEY", DataType.VARCHAR_NULL);
.with("PAYLOAD", DataType.VARCHAR)
.with("KEY", DataType.VARCHAR);
ConfigProvider connectorConfigProvider = ConfigProvider.from(clientConfig)
.withPrefix("properties.")
.with("connector", "upsert-kafka")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public Result reconcile(Request request) {
// Mark the Subscription as failed.
status.setFailed(true);
status.setMessage("Error: " + e.getMessage());
result = new Result(true, operator.failureRetryDuration());
}
} else if (status.getReady() == null && status.getResources() != null) {
// Phase 2
Expand Down

0 comments on commit ad8449f

Please sign in to comment.