Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE-344] Deserialize json in kafka source #354

Merged
merged 6 commits into from
May 6, 2024

Conversation

qingfei1994
Copy link
Contributor

What changes were proposed in this pull request?

  • new configuration for connector.format
  • new JsonDeserializer extends TableDeserializer to deserialize string to row
  • return JsonDeserializer according to configuration in Kafka Source

How was this PR tested?

  • Tests have Added for the changes
  • Production environment verified

@qingfei1994
Copy link
Contributor Author

Hi @Leomrlin could you help reivew my pr?

Copy link
Collaborator

@Leomrlin Leomrlin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few suggestions for modification.

public void init(Configuration conf, StructType schema) {
this.schema = Objects.requireNonNull(schema);
this.mapper = new ObjectMapper();
this.ignoreParseError = conf.getBoolean(ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT_JSON_IGNORE_PARSE_ERROR,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be possible to invoke member methods of conf instead of static methods here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will fix this.

if (ignoreParseError) {
// return row with null value
IntStream.range(0, schema.size()).forEach(i -> values[i] = null);
return Collections.singletonList(ObjectRow.create(values));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Record where all fields are null might not be very meaningful in terms of graph construction and computation, and it might be more appropriate to skip such records?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will return empty List in this case.

IntStream.range(0, schema.size()).forEach(i -> values[i] = null);
return Collections.singletonList(ObjectRow.create(values));
} else {
throw new GeaflowRuntimeException("fail to deserialize record " + record + " due to " + e.getMessage());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception e should be wrapped and then thrown.

JsonNode value = jsonNode.get(fieldName);
IType<?> type = schema.getType(i);
if (value == null) {
if (failOnMissingField) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There might be a slight difference between missing and null values, potentially risking broadening the meaning of the failOnMissingField configuration option.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got what you mean,. faliOnMissField should mean no such field name exists in the json string. Another case is that the field name still exists, but value is null.

return (TableDeserializer<IN>) new TextDeserializer();
this.connectorFormat = conf.getString(ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT,
(String) ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT.getDefaultValue());
if (this.connectorFormat.equals(ConnectorConstants.CONNECTOR_FORMAT_JSON)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's recommended to use a factory class, so that components can be modified or added/removed in the future more easily.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Leomrlin sure, another question is currently some table sources directly return TableDeserializer object, for example HiveTableSource return new RowTableDeserializer();. If I introduce a factory class for TableDeserializer, do you think it's appropriate for all the TableSource to use factory class?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Leomrlin sure, another question is currently some table sources directly return TableDeserializer object, for example HiveTableSource return new RowTableDeserializer();. If I introduce a factory class for TableDeserializer, do you think it's appropriate for all the TableSource to use factory class?

It would be ideal if all types of Deserializers could be unified into a factory class. Upon reviewing, I have noted that there are currently two basic implementation classes: RowTableDeserializer and TextDeserializer. In the future, there might be additional implementation classes. If they are exclusive to a plugin like Paimon, they can be included within the plugin package. I think that JsonDeserializer could serve as a basic implementation class and be incorporated into the factory class.

@Leomrlin
Copy link
Collaborator

qingfei1994

Hi @qingfei1994 ,

I've reviewed the PR you submitted and wanted to share a few points of feedback. I've outlined my suggestions for modification.Thank you!

@qingfei1994
Copy link
Contributor Author

Hi @Leomrlin, I've updated my pull request as per your suggestion, could you further check?

@Leomrlin
Copy link
Collaborator

Leomrlin commented May 6, 2024

LGTM

@Leomrlin Leomrlin merged commit 526a107 into TuGraph-family:master May 6, 2024
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants