diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java index ea31f4a..1533a2a 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java @@ -27,7 +27,8 @@ public static Schema avro(String namespace, String name, RelDataType dataType) { .filter(x -> !x.getName().startsWith("__")) // don't write out hidden fields .map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x), null)) .collect(Collectors.toList()); - return Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields); + return createAvroSchemaWithNullability(Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields), + dataType.isNullable()); } else { switch (dataType.getSqlTypeName()) { case INTEGER: @@ -42,6 +43,14 @@ public static Schema avro(String namespace, String name, RelDataType dataType) { return createAvroTypeWithNullability(Schema.Type.DOUBLE, dataType.isNullable()); case CHAR: return createAvroTypeWithNullability(Schema.Type.STRING, dataType.isNullable()); + case BOOLEAN: + return createAvroTypeWithNullability(Schema.Type.BOOLEAN, dataType.isNullable()); + case ARRAY: + return createAvroSchemaWithNullability(Schema.createArray(avro(null, null, dataType.getComponentType())), + dataType.isNullable()); + // TODO support map types + // case MAP: + // return createAvroSchemaWithNullability(Schema.createMap(avroPrimitive(dataType.getValueType())), dataType.isNullable()); case UNKNOWN: case NULL: return Schema.createUnion(Schema.create(Schema.Type.NULL)); @@ -56,14 +65,18 @@ public static Schema avro(String namespace, String name, RelProtoDataType relPro return avro(namespace, name, relProtoDataType.apply(factory)); } - private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean nullable) { + private static Schema createAvroSchemaWithNullability(Schema schema, boolean nullable) { if (nullable) { - return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(rawType)); + return Schema.createUnion(Schema.create(Schema.Type.NULL), schema); } else { - return Schema.create(rawType); + return schema; } } + private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean nullable) { + return createAvroSchemaWithNullability(Schema.create(rawType), nullable); + } + public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) { RelDataType unknown = typeFactory.createUnknownType(); switch (schema.getType()) { @@ -74,17 +87,24 @@ public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) { .filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName()) .collect(Collectors.toList())); case INT: - // schema.isNullable() should be false for basic types iiuc - return createRelTypeWithNullability(typeFactory, SqlTypeName.INTEGER, schema.isNullable()); + return createRelType(typeFactory, SqlTypeName.INTEGER); case LONG: - return createRelTypeWithNullability(typeFactory, SqlTypeName.BIGINT, schema.isNullable()); + return createRelType(typeFactory, SqlTypeName.BIGINT); case ENUM: + case FIXED: case STRING: - return createRelTypeWithNullability(typeFactory, SqlTypeName.VARCHAR, schema.isNullable()); + return createRelType(typeFactory, SqlTypeName.VARCHAR); case FLOAT: - return createRelTypeWithNullability(typeFactory, SqlTypeName.FLOAT, schema.isNullable()); + return createRelType(typeFactory, SqlTypeName.FLOAT); case DOUBLE: - return createRelTypeWithNullability(typeFactory, SqlTypeName.DOUBLE, schema.isNullable()); + return createRelType(typeFactory, SqlTypeName.DOUBLE); + case BOOLEAN: + return createRelType(typeFactory, SqlTypeName.BOOLEAN); + case ARRAY: + return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory), -1); +// TODO support map types +// case MAP: +// return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory)); case UNION: if (schema.isNullable() && schema.getTypes().size() == 2) { Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get(); @@ -102,9 +122,9 @@ public static RelDataType rel(Schema schema) { return rel(schema, DataType.DEFAULT_TYPE_FACTORY); } - private static RelDataType createRelTypeWithNullability(RelDataTypeFactory typeFactory, SqlTypeName typeName, boolean nullable) { + private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName) { RelDataType rawType = typeFactory.createSqlType(typeName); - return typeFactory.createTypeWithNullability(rawType, nullable); + return typeFactory.createTypeWithNullability(rawType, false); } public static RelProtoDataType proto(Schema schema) { diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java index 6497795..a5f9d90 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java @@ -1,23 +1,32 @@ package com.linkedin.hoptimator.catalog; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.rel2sql.RelToSqlConverter; +import org.apache.calcite.rel.rel2sql.SqlImplementor; import org.apache.calcite.sql.SqlWriter; //import org.apache.calcite.sql.SqlWriterConfig; import org.apache.calcite.sql.SqlDataTypeSpec; -import org.apache.calcite.sql.SqlRowTypeNameSpec; import org.apache.calcite.sql.SqlBasicTypeNameSpec; +import org.apache.calcite.sql.SqlCollectionTypeNameSpec; +import org.apache.calcite.sql.SqlRowTypeNameSpec; +import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlDialect; import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlRowTypeNameSpec; +import org.apache.calcite.sql.SqlSelect; import org.apache.calcite.sql.dialect.AnsiSqlDialect; +import org.apache.calcite.sql.fun.SqlRowOperator; import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.sql.type.SqlTypeFactoryImpl; import org.apache.calcite.sql.pretty.SqlPrettyWriter; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.rel.rel2sql.RelToSqlConverter; -import org.apache.calcite.rel.rel2sql.SqlImplementor; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.sql.util.SqlShuttle; import java.util.Map; import java.util.List; @@ -129,9 +138,31 @@ public QueryImplementor(RelNode relNode) { public void implement(SqlWriter w) { RelToSqlConverter converter = new RelToSqlConverter(w.getDialect()); SqlImplementor.Result result = converter.visitRoot(relNode); - w.literal(result.asSelect().toSqlString(w.getDialect()).getSql()); + SqlSelect select = result.asSelect(); + if (select.getSelectList() != null) { + select.setSelectList((SqlNodeList) select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR)); + } + w.literal(select.toSqlString(w.getDialect()).getSql()); } - } + + // A `ROW(...)` operator which will unparse as just `(...)`. + private final SqlRowOperator IMPLIED_ROW_OPERATOR = new SqlRowOperator(""); // empty string name + + // a shuttle that replaces `Row(...)` with just `(...)` + private final SqlShuttle REMOVE_ROW_CONSTRUCTOR = new SqlShuttle() { + @Override + public SqlNode visit(SqlCall call) { + List operands = call.getOperandList().stream().map(x -> x.accept(this)).collect(Collectors.toList()); + if ((call.getKind() == SqlKind.ROW || call.getKind() == SqlKind.COLUMN_LIST + || call.getOperator() instanceof SqlRowOperator) + && operands.size() > 1) { + return IMPLIED_ROW_OPERATOR.createCall(call.getParserPosition(), operands); + } else { + return call.getOperator().createCall(call.getParserPosition(), operands); + } + } + }; + } /** * Implements a CREATE TABLE...WITH... DDL statement. @@ -291,6 +322,10 @@ private static SqlDataTypeSpec toSpec(RelDataType dataType) { .map(x -> toSpec(x)) .collect(Collectors.toList()); return maybeNullable(dataType, new SqlDataTypeSpec(new SqlRowTypeNameSpec(SqlParserPos.ZERO, fieldNames, fieldTypes), SqlParserPos.ZERO)); + } if (dataType.getComponentType() != null) { + return maybeNullable(dataType, new SqlDataTypeSpec(new SqlCollectionTypeNameSpec(new SqlBasicTypeNameSpec( + dataType.getComponentType().getSqlTypeName(), SqlParserPos.ZERO), dataType.getSqlTypeName(), SqlParserPos.ZERO), + SqlParserPos.ZERO)); } else { return maybeNullable(dataType, new SqlDataTypeSpec(new SqlBasicTypeNameSpec(dataType.getSqlTypeName(), SqlParserPos.ZERO), SqlParserPos.ZERO)); } @@ -298,7 +333,7 @@ private static SqlDataTypeSpec toSpec(RelDataType dataType) { private static SqlDataTypeSpec maybeNullable(RelDataType dataType, SqlDataTypeSpec spec) { if (!dataType.isNullable()) { - return spec.withNullable(true); + return spec.withNullable(false); } else { // we don't want "VARCHAR NULL", only "VARCHAR NOT NULL" return spec; diff --git a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java new file mode 100644 index 0000000..005f4b6 --- /dev/null +++ b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java @@ -0,0 +1,42 @@ +package com.linkedin.hoptimator.catalog; + +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.util.Litmus; +import org.apache.avro.Schema; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import org.junit.Test; + +public class AvroConverterTest { + + @Test + public void convertsNestedSchemas() { + String schemaString = "{\"type\":\"record\",\"name\":\"E\",\"namespace\":\"ns\",\"fields\":[{\"name\":\"h\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"H\",\"namespace\":\"ns\",\"fields\":[{\"name\":\"A\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"A\",\"fields\":[]}]}]}]}]}"; + + Schema avroSchema1 = (new Schema.Parser()).parse(schemaString); + RelDataType rel1 = AvroConverter.rel(avroSchema1); + assertEquals(rel1.toString(), rel1.getFieldCount(), avroSchema1.getFields().size()); + assertTrue(rel1.toString(), rel1.getField("h", false, false) != null); + RelDataType rel2 = rel1.getField("h", false, false).getType(); + assertTrue(rel2.toString(), rel2.isNullable()); + Schema avroSchema2 = avroSchema1.getField("h").schema().getTypes().get(1); + assertEquals(rel2.toString(), rel2.getFieldCount(), avroSchema2.getFields().size()); + assertTrue(rel2.toString(), rel2.getField("A", false, false) != null); + RelDataType rel3 = rel2.getField("A", false, false).getType(); + assertTrue(rel3.toString(), rel3.isNullable()); + Schema avroSchema3 = avroSchema2.getField("A").schema().getTypes().get(1); + assertEquals(rel3.toString(), rel3.getFieldCount(), avroSchema3.getFields().size()); + Schema avroSchema4 = AvroConverter.avro("NS", "R", rel1); + assertTrue("!avroSchema4.isNullable()", !avroSchema4.isNullable()); + assertEquals(avroSchema4.toString(), avroSchema4.getFields().size(), rel1.getFieldCount()); + Schema avroSchema5 = AvroConverter.avro("NS", "R", rel2); + assertTrue("avroSchema5.isNullable()", avroSchema5.isNullable()); + assertEquals(avroSchema5.toString(), avroSchema5.getTypes().get(1).getFields().size(), rel2.getFieldCount()); + Schema avroSchema6 = AvroConverter.avro("NS", "R", rel3); + assertEquals(avroSchema6.toString(), avroSchema6.getTypes().get(1).getFields().size(), rel3.getFieldCount()); + RelDataType rel4 = AvroConverter.rel(avroSchema4); + assertTrue("types match", RelOptUtil.eq("rel4", rel4, "rel1", rel1, Litmus.THROW)); + } +} diff --git a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java index 1ce2cb6..6cfcb84 100644 --- a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java +++ b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java @@ -1,12 +1,14 @@ package com.linkedin.hoptimator.planner; import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelProtoDataType; import org.apache.calcite.rel.type.RelDataTypeImpl; import org.apache.calcite.sql.SqlDialect; import org.apache.calcite.sql.dialect.AnsiSqlDialect; +import org.apache.calcite.util.Litmus; import com.linkedin.hoptimator.catalog.Resource; import com.linkedin.hoptimator.catalog.ResourceProvider; @@ -77,6 +79,7 @@ public ScriptImplementor query() { /** Script ending in INSERT INTO ... */ public ScriptImplementor insertInto(HopTable sink) { + RelOptUtil.eq(sink.name(), sink.rowType(), "subscription", rowType(), Litmus.THROW); return script.database(sink.database()).with(sink) .insert(sink.database(), sink.name(), relNode); }