From d37b3c226bc1b96c90e7a23effdab4888c431068 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9rique=20Mittelstaedt?= Date: Thu, 31 Aug 2023 21:15:48 +0100 Subject: [PATCH] add failed offset to Flink error messages --- .../v1/RoutableKafkaIngressDeserializer.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/binders/ingress/v1/RoutableKafkaIngressDeserializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/binders/ingress/v1/RoutableKafkaIngressDeserializer.java index 1689f466c..70146707f 100644 --- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/binders/ingress/v1/RoutableKafkaIngressDeserializer.java +++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/binders/ingress/v1/RoutableKafkaIngressDeserializer.java @@ -45,7 +45,7 @@ public RoutableKafkaIngressDeserializer(Map routingConfig public Message deserialize(ConsumerRecord input) { final String topic = input.topic(); final byte[] payload = input.value(); - final byte[] key = requireNonNullKey(input.key()); + final byte[] key = requireNonNullKey(input); final String id = new String(key, StandardCharsets.UTF_8); final RoutingConfig routingConfig = routingConfigs.get(topic); @@ -60,15 +60,16 @@ public Message deserialize(ConsumerRecord input) { .build(); } - private byte[] requireNonNullKey(byte[] key) { + private byte[] requireNonNullKey(ConsumerRecord input) { + final byte[] key = input.key(); + if (key == null) { TypeName tpe = RoutableKafkaIngressBinderV1.KIND_TYPE; - throw new IllegalStateException( - "The " - + tpe.namespace() - + "/" - + tpe.name() - + " ingress requires a UTF-8 key set for each record."); + String errorMessage = + String.format( + "The %s/%s ingress requires a UTF-8 key set for each record. The message in topic \"%s\", partition %d at offset %d does not have a key set.", + tpe.namespace(), tpe.name(), input.topic(), input.partition(), input.offset()); + throw new IllegalStateException(errorMessage); } return key; }