[ISSUE-344] Deserialize json in kafka source (#354)
* [ISSUE-344] Support deserializing json in kafka source
qingfei1994 authored May 6, 2024
1 parent f804128 commit 526a107
Showing 11 changed files with 269 additions and 13 deletions.
@@ -49,6 +49,24 @@ public class ConnectorConfigKeys implements Serializable {
.description("Specifies the starting unix timestamp for reading the data table. Format "
+ "must be 'yyyy-MM-dd HH:mm:ss'.");

public static final ConfigKey GEAFLOW_DSL_CONNECTOR_FORMAT = ConfigKeys
.key("geaflow.dsl.connector.format")
.defaultValue("text")
.description("Specifies the deserialization format for reading from external source like kafka, "
+ "possible option currently: json/text");


public static final ConfigKey GEAFLOW_DSL_CONNECTOR_FORMAT_JSON_IGNORE_PARSE_ERROR = ConfigKeys
.key("geaflow.dsl.connector.format.json.ignore-parse-error")
.defaultValue(false)
.description("for json format, skip fields and rows with parse errors instead of failing. "
+ "Fields are set to null in case of errors.");

public static final ConfigKey GEAFLOW_DSL_CONNECTOR_FORMAT_JSON_FAIL_ON_MISSING_FIELD = ConfigKeys
.key("geaflow.dsl.connector.format.json.fail-on-missing-field")
.defaultValue(false)
.description("for json format, whether to fail if a field is missing or not.");

/*************************************************
* FILE Connector Parameters.
*************************************************/
@@ -72,3 +90,5 @@ public class ConnectorConfigKeys implements Serializable {
.defaultValue(false)
.description("Whether skip the header for csv format.");
}


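For reference, a minimal sketch of how the new format keys could be set programmatically, assuming only the Configuration#put(String, String) call already used by the tests in this commit; the class and method names are illustrative, not part of the change:

import com.antgroup.geaflow.common.config.Configuration;

public class FormatConfigExample {

    public static Configuration jsonFormatConf() {
        Configuration conf = new Configuration();
        // Select the json format instead of the default "text".
        conf.put("geaflow.dsl.connector.format", "json");
        // Skip records that cannot be parsed instead of failing the job.
        conf.put("geaflow.dsl.connector.format.json.ignore-parse-error", "true");
        // Keep the default: fields missing from the input become null instead of failing.
        conf.put("geaflow.dsl.connector.format.json.fail-on-missing-field", "false");
        return conf;
    }
}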
@@ -0,0 +1,30 @@
package com.antgroup.geaflow.dsl.connector.api.serde;

import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.ConnectorConfigKeys;
import com.antgroup.geaflow.dsl.connector.api.serde.impl.JsonDeserializer;
import com.antgroup.geaflow.dsl.connector.api.serde.impl.RowTableDeserializer;
import com.antgroup.geaflow.dsl.connector.api.serde.impl.TextDeserializer;
import com.antgroup.geaflow.dsl.connector.api.util.ConnectorConstants;

public class DeserializerFactory {

public static <IN> TableDeserializer<IN> loadDeserializer(Configuration conf) {
String connectorFormat = conf.getString(ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT,
(String) ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT.getDefaultValue());
if (connectorFormat.equals(ConnectorConstants.CONNECTOR_FORMAT_JSON)) {
return (TableDeserializer<IN>) new JsonDeserializer();
} else {
return (TableDeserializer<IN>) new TextDeserializer();
}
}

public static <IN> TableDeserializer<IN> loadRowTableDeserializer() {
return (TableDeserializer<IN>) new RowTableDeserializer();
}

public static <IN> TableDeserializer<IN> loadTextDeserializer() {
return (TableDeserializer<IN>) new TextDeserializer();
}

}
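A hedged usage sketch of the factory (the class name, schema, and sample record below are illustrative only): when geaflow.dsl.connector.format is set to json the factory returns the new JsonDeserializer, and any other value, including the default text, falls back to TextDeserializer.

import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.type.primitive.BinaryStringType;
import com.antgroup.geaflow.common.type.primitive.IntegerType;
import com.antgroup.geaflow.dsl.common.data.Row;
import com.antgroup.geaflow.dsl.common.types.StructType;
import com.antgroup.geaflow.dsl.common.types.TableField;
import com.antgroup.geaflow.dsl.connector.api.serde.DeserializerFactory;
import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
import java.util.List;

public class DeserializerFactoryExample {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.put("geaflow.dsl.connector.format", "json");

        StructType schema = new StructType(
            new TableField("id", IntegerType.INSTANCE, false),
            new TableField("name", BinaryStringType.INSTANCE, true));

        // "json" selects the JsonDeserializer; any other value (including the
        // default "text") selects the TextDeserializer.
        TableDeserializer<String> deserializer = DeserializerFactory.loadDeserializer(conf);
        deserializer.init(conf, schema);

        List<Row> rows = deserializer.deserialize("{\"id\":1, \"name\":\"amy\"}");
        System.out.println(rows);
    }
}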
@@ -0,0 +1,79 @@
package com.antgroup.geaflow.dsl.connector.api.serde.impl;

import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.ConnectorConfigKeys;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.type.IType;
import com.antgroup.geaflow.dsl.common.data.Row;
import com.antgroup.geaflow.dsl.common.data.impl.ObjectRow;
import com.antgroup.geaflow.dsl.common.types.StructType;
import com.antgroup.geaflow.dsl.common.util.TypeCastUtil;
import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class JsonDeserializer implements TableDeserializer<String> {

private StructType schema;

private ObjectMapper mapper;

private boolean ignoreParseError;

private boolean failOnMissingField;


@Override
public void init(Configuration conf, StructType schema) {
this.schema = Objects.requireNonNull(schema);
this.mapper = new ObjectMapper();
this.ignoreParseError = conf.getBoolean(ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT_JSON_IGNORE_PARSE_ERROR);
this.failOnMissingField = conf.getBoolean(ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT_JSON_FAIL_ON_MISSING_FIELD);

}

@Override
public List<Row> deserialize(String record) {
if (record == null || record.isEmpty()) {
return Collections.emptyList();
}
Object[] values = new Object[schema.size()];
JsonNode jsonNode = null;
try {
jsonNode = mapper.readTree(record);
} catch (JsonProcessingException e) {
// handle exception according to configuration
if (ignoreParseError) {
// return empty list
return Collections.emptyList();
} else {
throw new GeaflowRuntimeException("fail to deserialize record " + record , e);
}
}
// Map each field declared in the schema to the matching json field.
for (int i = 0; i < schema.size(); i++) {
String fieldName = schema.getFieldNames().get(i);
if (failOnMissingField && !jsonNode.has(fieldName)) {
throw new GeaflowRuntimeException("Failed to deserialize record: " + record + ", missing field: " + fieldName);
}
JsonNode value = jsonNode.get(fieldName);
IType<?> type = schema.getType(i);
// cast the value to the type defined in the schema.
if (value != null) {
values[i] = TypeCastUtil.cast(value.asText(), type);
} else {
values[i] = null;
}

}
return Collections.singletonList(ObjectRow.create(values));
}

}
@@ -17,5 +17,7 @@
public class ConnectorConstants {

public static final String START_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
public static final String CONNECTOR_FORMAT_JSON = "json";
public static final String CONNECTOR_FORMAT_TEXT = "text";

}
@@ -0,0 +1,101 @@
package com.antgroup.geaflow.dsl.connector.api;

import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.type.primitive.BinaryStringType;
import com.antgroup.geaflow.common.type.primitive.IntegerType;
import com.antgroup.geaflow.dsl.common.data.Row;
import com.antgroup.geaflow.dsl.common.types.StructType;
import com.antgroup.geaflow.dsl.common.types.TableField;
import com.antgroup.geaflow.dsl.connector.api.serde.impl.JsonDeserializer;
import org.junit.Test;
import org.testng.Assert;

import java.util.Collections;
import java.util.List;

public class JsonDeserializerTest {

@Test
public void testDeserialize() {
JsonDeserializer deserializer = new JsonDeserializer();
StructType dataSchema = new StructType(
new TableField("id", IntegerType.INSTANCE, false),
new TableField("name", BinaryStringType.INSTANCE, true),
new TableField("age", IntegerType.INSTANCE, false)
);
deserializer.init(new Configuration(), dataSchema);
List<Row> row = deserializer.deserialize("{\"id\":1, \"name\":\"amy\", \"age\":10}");
List<Row> rowWithNull = deserializer.deserialize("{\"id\":1, \"name\":\"amy\"}");
Assert.assertEquals(row.get(0).getField(0, IntegerType.INSTANCE), 1);
Assert.assertEquals(row.get(0).getField(1, BinaryStringType.INSTANCE).toString(), "amy");
Assert.assertEquals(row.get(0).getField(2, IntegerType.INSTANCE), 10);
Assert.assertEquals(rowWithNull.get(0).getField(0, IntegerType.INSTANCE), 1);
Assert.assertEquals(rowWithNull.get(0).getField(1, BinaryStringType.INSTANCE).toString(), "amy");
Assert.assertEquals(rowWithNull.get(0).getField(2, IntegerType.INSTANCE), null);

}


@Test
public void testDeserializeEmptyString() {
JsonDeserializer deserializer = new JsonDeserializer();
StructType dataSchema = new StructType(
new TableField("id", IntegerType.INSTANCE, false),
new TableField("name", BinaryStringType.INSTANCE, true),
new TableField("age", IntegerType.INSTANCE, false)
);
deserializer.init(new Configuration(), dataSchema);
List<Row> rows = deserializer.deserialize("");
List<Row> testNullRows = deserializer.deserialize(null);
Assert.assertEquals(rows, Collections.emptyList());
Assert.assertEquals(testNullRows, Collections.emptyList());

}

@Test(expected = GeaflowRuntimeException.class)
public void testDeserializeParseError() {
JsonDeserializer deserializer = new JsonDeserializer();
StructType dataSchema = new StructType(
new TableField("id", IntegerType.INSTANCE, false),
new TableField("name", BinaryStringType.INSTANCE, true),
new TableField("age", IntegerType.INSTANCE, false)
);
deserializer.init(new Configuration(), dataSchema);
List<Row> rows = deserializer.deserialize("test");
}

@Test
public void testDeserializeIgnoreParseError() {
JsonDeserializer deserializer = new JsonDeserializer();
StructType dataSchema = new StructType(
new TableField("id", IntegerType.INSTANCE, false),
new TableField("name", BinaryStringType.INSTANCE, true),
new TableField("age", IntegerType.INSTANCE, false)
);
Configuration conf = new Configuration();
conf.put("geaflow.dsl.connector.format.json.ignore-parse-error", "true");
deserializer.init(conf, dataSchema);
List<Row> rows = deserializer.deserialize("test");
Assert.assertEquals(rows, Collections.emptyList());
}

@Test(expected = GeaflowRuntimeException.class)
public void testDeserializeFailOnMissingField() {
JsonDeserializer deserializer = new JsonDeserializer();
StructType dataSchema = new StructType(
new TableField("id", IntegerType.INSTANCE, false),
new TableField("name", BinaryStringType.INSTANCE, true),
new TableField("age", IntegerType.INSTANCE, false)
);
Configuration conf = new Configuration();
conf.put("geaflow.dsl.connector.format.json.fail-on-missing-field", "true");
deserializer.init(conf, dataSchema);
List<Row> rowWithMissingField = deserializer.deserialize("{\"id\":1, \"name\":\"amy\"}");

}

}
@@ -27,8 +27,8 @@
import com.antgroup.geaflow.dsl.connector.api.Offset;
import com.antgroup.geaflow.dsl.connector.api.Partition;
import com.antgroup.geaflow.dsl.connector.api.TableSource;
import com.antgroup.geaflow.dsl.connector.api.serde.DeserializerFactory;
import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
import com.antgroup.geaflow.dsl.connector.api.serde.impl.RowTableDeserializer;
import com.antgroup.geaflow.dsl.connector.hive.adapter.HiveVersionAdapter;
import com.antgroup.geaflow.dsl.connector.hive.adapter.HiveVersionAdapters;
import com.antgroup.geaflow.dsl.connector.hive.util.HiveUtils;
@@ -172,7 +172,7 @@ private boolean accept(Object[] partitionValues) {
@SuppressWarnings("unchecked")
@Override
public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
return (TableDeserializer<IN>) new RowTableDeserializer();
return DeserializerFactory.loadRowTableDeserializer();
}

@SuppressWarnings("unchecked")
@@ -25,8 +25,8 @@
import com.antgroup.geaflow.dsl.connector.api.Offset;
import com.antgroup.geaflow.dsl.connector.api.Partition;
import com.antgroup.geaflow.dsl.connector.api.TableSource;
import com.antgroup.geaflow.dsl.connector.api.serde.DeserializerFactory;
import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
import com.antgroup.geaflow.dsl.connector.api.serde.impl.RowTableDeserializer;
import com.antgroup.geaflow.dsl.connector.jdbc.util.JDBCUtils;
import java.io.IOException;
import java.sql.Connection;
@@ -137,7 +137,7 @@ public List<Partition> listPartitions() {

@Override
public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
return (TableDeserializer<IN>) new RowTableDeserializer();
return DeserializerFactory.loadRowTableDeserializer();
}

@Override
@@ -11,9 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.antgroup.geaflow.dsl.connector.kafka;

import com.antgroup.geaflow.api.context.RuntimeContext;
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.ConnectorConfigKeys;
@@ -26,8 +24,8 @@
import com.antgroup.geaflow.dsl.connector.api.Offset;
import com.antgroup.geaflow.dsl.connector.api.Partition;
import com.antgroup.geaflow.dsl.connector.api.TableSource;
import com.antgroup.geaflow.dsl.connector.api.serde.DeserializerFactory;
import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
import com.antgroup.geaflow.dsl.connector.api.serde.impl.TextDeserializer;
import com.antgroup.geaflow.dsl.connector.api.util.ConnectorConstants;
import com.antgroup.geaflow.dsl.connector.kafka.utils.KafkaConstants;
import java.io.IOException;
@@ -50,6 +48,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class KafkaTableSource implements TableSource {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTableSource.class);
@@ -63,6 +62,8 @@ public class KafkaTableSource {
private long windowSize;
private int startTime;
private Properties props;
private String connectorFormat;
private TableSchema schema;

private transient KafkaConsumer<String, String> consumer;

@@ -90,6 +91,7 @@ public void init(Configuration conf, TableSchema tableSchema) {
startTime = DateTimeUtil.toUnixTime(startTimeStr, ConnectorConstants.START_TIME_FORMAT);
}

this.schema = tableSchema;
this.connectorFormat = conf.getString(ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT,
(String) ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT.getDefaultValue());
this.props = new Properties();
props.setProperty(KafkaConstants.KAFKA_BOOTSTRAP_SERVERS, servers);
props.setProperty(KafkaConstants.KAFKA_KEY_DESERIALIZER,
@@ -99,7 +101,7 @@
props.setProperty(KafkaConstants.KAFKA_MAX_POLL_RECORDS,
String.valueOf(windowSize));
props.setProperty(KafkaConstants.KAFKA_GROUP_ID, groupId);
LOGGER.info("open kafka, servers is: {}, topic is:{}, config is:{}, schema is: {}",
LOGGER.info("open kafka, servers is: {}, topic is:{}, config is:{}, schema is: {}, connector format is : {}",
servers, topic, conf, tableSchema);
}

Expand All @@ -121,7 +123,7 @@ public List<Partition> listPartitions() {

@Override
public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
return (TableDeserializer<IN>) new TextDeserializer();
return DeserializerFactory.loadDeserializer(conf);
}

@Override
@@ -184,6 +186,7 @@ public <T> FetchData<T> fetch(Partition partition, Optional<Offset> startOffset,
}

Iterator<ConsumerRecord<String, String>> recordIterator = consumer.poll(POLL_TIMEOUT).iterator();

List<String> dataList = new ArrayList<>();
long responseMaxTimestamp = -1;
while (recordIterator.hasNext()) {
@@ -214,6 +217,7 @@ public void close() {
LOGGER.info("close");
}


public static class KafkaPartition implements Partition {

private final String topic;
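Roughly, the effect of the getDeserializer change on the Kafka source is sketched below; the helper method and its arguments are illustrative, and in a real job the GeaFlow runtime performs the init and deserialize calls after fetching messages from Kafka.

import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.dsl.common.data.Row;
import com.antgroup.geaflow.dsl.common.types.StructType;
import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
import com.antgroup.geaflow.dsl.connector.kafka.KafkaTableSource;
import java.util.List;

public class KafkaJsonFormatSketch {

    // recordValue is the String value of a polled Kafka message,
    // e.g. {"id":1, "name":"amy", "age":10}.
    public static List<Row> toRows(KafkaTableSource source, Configuration conf,
                                   StructType schema, String recordValue) {
        // The Kafka source used to hard-code a TextDeserializer; it now delegates
        // to DeserializerFactory, so geaflow.dsl.connector.format=json makes each
        // message value be parsed as a JSON object against the table schema.
        TableDeserializer<String> deserializer = source.getDeserializer(conf);
        deserializer.init(conf, schema);
        return deserializer.deserialize(recordValue);
    }
}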
