diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/binders/ingress/v1/RoutableKafkaIngressDeserializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/binders/ingress/v1/RoutableKafkaIngressDeserializer.java index 1689f466c..70146707f 100644 --- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/binders/ingress/v1/RoutableKafkaIngressDeserializer.java +++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/binders/ingress/v1/RoutableKafkaIngressDeserializer.java @@ -45,7 +45,7 @@ public RoutableKafkaIngressDeserializer(Map routingConfig public Message deserialize(ConsumerRecord input) { final String topic = input.topic(); final byte[] payload = input.value(); - final byte[] key = requireNonNullKey(input.key()); + final byte[] key = requireNonNullKey(input); final String id = new String(key, StandardCharsets.UTF_8); final RoutingConfig routingConfig = routingConfigs.get(topic); @@ -60,15 +60,16 @@ public Message deserialize(ConsumerRecord input) { .build(); } - private byte[] requireNonNullKey(byte[] key) { + private byte[] requireNonNullKey(ConsumerRecord input) { + final byte[] key = input.key(); + if (key == null) { TypeName tpe = RoutableKafkaIngressBinderV1.KIND_TYPE; - throw new IllegalStateException( - "The " - + tpe.namespace() - + "/" - + tpe.name() - + " ingress requires a UTF-8 key set for each record."); + String errorMessage = + String.format( + "The %s/%s ingress requires a UTF-8 key set for each record. The message in topic \"%s\", partition %d at offset %d does not have a key set.", + tpe.namespace(), tpe.name(), input.topic(), input.partition(), input.offset()); + throw new IllegalStateException(errorMessage); } return key; }