[ISSUE-344] Deserialize json in kafka source #354
Hi @Leomrlin, could you help review my PR?
A few suggestions for modification.
public void init(Configuration conf, StructType schema) {
this.schema = Objects.requireNonNull(schema);
this.mapper = new ObjectMapper();
this.ignoreParseError = conf.getBoolean(ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT_JSON_IGNORE_PARSE_ERROR,
It may be possible to invoke member methods of conf instead of static methods here?
Sure, will fix this.
if (ignoreParseError) {
// return row with null value
IntStream.range(0, schema.size()).forEach(i -> values[i] = null);
return Collections.singletonList(ObjectRow.create(values));
A record in which all fields are null is not very meaningful for graph construction and computation. Might it be more appropriate to skip such records?
Sure, will return an empty List in this case.
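For reference, the agreed behavior could be sketched roughly as below. This is only an illustration under assumptions: `SkippingDeserializer` and its `parser` function are hypothetical stand-ins, not the GeaFlow `TableDeserializer` API, and `IllegalStateException` stands in for the project's own exception type.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the deserializer under review; not the GeaFlow API.
final class SkippingDeserializer<T> {
    private final Function<String, T> parser;   // e.g. a JSON-to-row parser
    private final boolean ignoreParseError;

    SkippingDeserializer(Function<String, T> parser, boolean ignoreParseError) {
        this.parser = parser;
        this.ignoreParseError = ignoreParseError;
    }

    List<T> deserialize(String record) {
        try {
            return Collections.singletonList(parser.apply(record));
        } catch (RuntimeException e) {
            if (ignoreParseError) {
                // Skip the bad record instead of emitting an all-null row.
                return Collections.emptyList();
            }
            // Keep the original exception as the cause.
            throw new IllegalStateException("fail to deserialize record " + record, e);
        }
    }
}
```

With `ignoreParseError` enabled, a malformed record contributes nothing downstream, so no all-null rows reach graph construction.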
IntStream.range(0, schema.size()).forEach(i -> values[i] = null);
return Collections.singletonList(ObjectRow.create(values));
} else {
throw new GeaflowRuntimeException("fail to deserialize record " + record + " due to " + e.getMessage());
The exception e should be wrapped and then thrown.
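A minimal sketch of what wrapping means here: pass `e` as the cause rather than only concatenating `e.getMessage()`, so the original type and stack trace are preserved. `DeserializeException` is a hypothetical stand-in for `GeaflowRuntimeException`, and the sketch assumes that class offers a `(String, Throwable)` constructor.

```java
// Hypothetical stand-in for GeaflowRuntimeException;
// assumes a (String, Throwable) constructor is available.
final class DeserializeException extends RuntimeException {
    DeserializeException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class WrapExample {
    static int parseField(String record) {
        try {
            return Integer.parseInt(record);
        } catch (NumberFormatException e) {
            // Pass e as the cause so its type and stack trace survive.
            throw new DeserializeException("fail to deserialize record " + record, e);
        }
    }
}
```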
JsonNode value = jsonNode.get(fieldName);
IType<?> type = schema.getType(i);
if (value == null) {
if (failOnMissingField) {
Missing and null values are slightly different, and treating them the same risks broadening the meaning of the failOnMissingField configuration option.
Got what you mean. failOnMissingField should mean that no such field name exists in the JSON string. The other case is that the field name exists but its value is null.
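To make the distinction concrete: with Jackson, `JsonNode.get(fieldName)` returns `null` when the field is absent, and returns a node whose `isNull()` is true when the field is present with an explicit JSON null. The sketch below shows the same two cases with a plain `Map` so it runs without dependencies. `FieldReader` is a hypothetical helper, not code from this PR.

```java
import java.util.Map;

// Hypothetical helper; a Map stands in for the parsed JSON object.
final class FieldReader {
    // Returns the field value, which may be null when the field is present
    // but explicitly null. Throws only when the key is truly absent and
    // failOnMissingField is set.
    static Object read(Map<String, Object> json, String fieldName, boolean failOnMissingField) {
        if (!json.containsKey(fieldName)) {
            if (failOnMissingField) {
                throw new IllegalArgumentException("missing field: " + fieldName);
            }
            return null; // absent field, tolerated
        }
        return json.get(fieldName); // may be null: explicit JSON null
    }
}
```

Under this reading, `{"name": null}` never triggers failOnMissingField for `name`, while `{}` does.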
return (TableDeserializer<IN>) new TextDeserializer();
this.connectorFormat = conf.getString(ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT,
(String) ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT.getDefaultValue());
if (this.connectorFormat.equals(ConnectorConstants.CONNECTOR_FORMAT_JSON)) {
It's recommended to use a factory class, so that components can be modified or added/removed in the future more easily.
@Leomrlin sure. Another question: currently some table sources directly return a TableDeserializer object, for example HiveTableSource:
return new RowTableDeserializer();
If I introduce a factory class for TableDeserializer, do you think it's appropriate for all the TableSource implementations to use the factory class?
It would be ideal if all types of Deserializers could be unified into a factory class. Upon reviewing, I have noted that there are currently two basic implementation classes: RowTableDeserializer and TextDeserializer. In the future, there might be additional implementation classes. If they are exclusive to a plugin like Paimon, they can be included within the plugin package. I think that JsonDeserializer could serve as a basic implementation class and be incorporated into the factory class.
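One possible shape for such a factory is sketched below. Every name in it is hypothetical: `Deserializer` stands in for `TableDeserializer`, the two `*Sketch` classes stand in for the real implementations, and the format keys `"text"` and `"json"` are assumed rather than taken from `ConnectorConstants`.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for TableDeserializer.
interface Deserializer {
    String name();
}

final class TextDeserializerSketch implements Deserializer {
    public String name() { return "text"; }
}

final class JsonDeserializerSketch implements Deserializer {
    public String name() { return "json"; }
}

final class DeserializerFactory {
    // Format name -> constructor. A new basic implementation is one more entry.
    private static final Map<String, Supplier<Deserializer>> REGISTRY = new HashMap<>();

    static {
        REGISTRY.put("text", TextDeserializerSketch::new);
        REGISTRY.put("json", JsonDeserializerSketch::new);
    }

    private DeserializerFactory() {
    }

    static Deserializer create(String format) {
        Supplier<Deserializer> supplier = REGISTRY.get(format.toLowerCase(Locale.ROOT));
        if (supplier == null) {
            throw new IllegalArgumentException("unsupported connector format: " + format);
        }
        return supplier.get();
    }
}
```

A table source would then call `DeserializerFactory.create(connectorFormat)` instead of branching on the format string itself, and plugin-specific deserializers could stay inside their own packages.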
Hi @qingfei1994, I've reviewed the PR you submitted and outlined my suggestions for modification above. Thank you!
Hi @Leomrlin, I've updated my pull request as per your suggestions. Could you check again?
LGTM