Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add failed offset to Flink error messages #336

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public RoutableKafkaIngressDeserializer(Map<String, RoutingConfig> routingConfig
public Message deserialize(ConsumerRecord<byte[], byte[]> input) {
final String topic = input.topic();
final byte[] payload = input.value();
final byte[] key = requireNonNullKey(input.key());
Copy link

@kazimirov999 kazimirov999 Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, I'd like your thoughts on the idea of incorporating the ability to skip records with null keys. Additionally, I'm thinking of making this functionality configurable. What are your thoughts on this?

kind: io.statefun.kafka.v1/ingress
spec:
  id: localhost:9092
  consumerGroupId: test-group-dev
  startupPosition:
    type: earliest
  properties:
    - isolation.level: read_committed
    - security.protocol: SSL
    - ssl.truststore.location: /tmp/flink/jks/truststore.jks
    - ssl.truststore.password: changeit
    - ssl.keystore.location: /tmp/flink/jks/kafka_keystore.jks
    - ssl.keystore.password: test
    - ssl.key.password: test
  topics:
    - topic: test-dev
      skipNullKeyRecords: true
      valueType: ua.Test
      targets:
        - ua.Test

default can be skipNullKeyRecords: false

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That might work. The NoOpDeserliazer just returns null, which could be done here as well.

private static class NoOpDeserializer implements KafkaIngressDeserializer<String> {

final byte[] key = requireNonNullKey(input);
final String id = new String(key, StandardCharsets.UTF_8);

final RoutingConfig routingConfig = routingConfigs.get(topic);
Expand All @@ -60,15 +60,16 @@ public Message deserialize(ConsumerRecord<byte[], byte[]> input) {
.build();
}

private byte[] requireNonNullKey(byte[] key) {
private byte[] requireNonNullKey(ConsumerRecord<byte[], byte[]> input) {
final byte[] key = input.key();

if (key == null) {
TypeName tpe = RoutableKafkaIngressBinderV1.KIND_TYPE;
throw new IllegalStateException(
"The "
+ tpe.namespace()
+ "/"
+ tpe.name()
+ " ingress requires a UTF-8 key set for each record.");
String errorMessage =
String.format(
"The %s/%s ingress requires a UTF-8 key set for each record. The message in topic \"%s\", partition %d at offset %d does not have a key set.",
tpe.namespace(), tpe.name(), input.topic(), input.partition(), input.offset());
throw new IllegalStateException(errorMessage);
}
return key;
}
Expand Down