Gather attributes of downstream resources #61
Merged
Summary
The subscription controller now gathers "attributes" from downstream resources and reports them as `.status.attributes`. These attributes are open-ended and depend on the various downstream controllers, but currently include `jobId`, `numPartitions`, `startTime`, and more, as reported by the Kafka topic controller and Flink operator.
The Kafka topic controller now reports the actual number of partitions, which may diverge from the `kafka.numPartitions` hint. This means users can request a specific number of partitions with `.spec.hints.kafka.numPartitions` and then check `.status.attributes.numPartitions` to see the actual number of partitions.
Details
Attributes are collected from downstream resources and the job, not from upstream/input resources. For example, `numPartitions` would refer to an output Kafka topic, not an input Kafka topic.
Since it's possible for a sink/output table to have multiple physical tables under-the-hood (e.g. an adapter may create multiple Kafka topics for a single table), it's possible that attributes from different controllers will clash/conflict. We make a best effort by bubbling up the last seen attribute with a given key. This jibes with the strategy for applying "hints": they are applied across the entire set of output resources. In particular, setting the hint `kafka.numPartitions = N` will likely mean that `.status.attributes.numPartitions = N`, unless an adapter is doing something goofy.
Testing
Validated that the Flink operator and KafkaTopic controller bubble up attributes to Subscriptions.
A neat consequence of this feature is that you can now check the `state` of the underlying Flink job.
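To make the "last seen attribute wins" behavior from the Details section concrete, here is a minimal Python sketch. It is not the controller's actual code: the function names `merge_attributes` and `partitions_diverge`, the sample values, and the assumption that the hint is stored under the literal key `kafka.numPartitions` inside `.spec.hints` are all hypothetical and only for illustration.

```python
from typing import Dict, Iterable, Optional


def merge_attributes(downstream: Iterable[Dict[str, str]]) -> Dict[str, str]:
    """Merge attribute maps in order; on a key clash the last one seen wins."""
    merged: Dict[str, str] = {}
    for attributes in downstream:
        merged.update(attributes)
    return merged


def partitions_diverge(subscription: dict) -> Optional[bool]:
    """Compare the kafka.numPartitions hint with the reported numPartitions.

    Returns None when either value is missing, so "unknown" is not
    confused with "matches".
    """
    # Assumption: hints are a flat map keyed by "kafka.numPartitions".
    hint = subscription.get("spec", {}).get("hints", {}).get("kafka.numPartitions")
    actual = subscription.get("status", {}).get("attributes", {}).get("numPartitions")
    if hint is None or actual is None:
        return None
    return str(hint) != str(actual)


# Hypothetical attribute maps: two Kafka topics backing one table, plus a job.
topic_a = {"numPartitions": "4"}
topic_b = {"numPartitions": "8"}
job = {"jobId": "example-job", "state": "RUNNING"}

merged = merge_attributes([topic_a, job, topic_b])
subscription = {
    "spec": {"hints": {"kafka.numPartitions": "4"}},
    "status": {"attributes": merged},
}

print(merged["numPartitions"])           # 8 (topic_b was seen last)
print(partitions_diverge(subscription))  # True
```

The sketch shows why a clash is only a best-effort outcome: whichever resource is processed last determines the reported value, so the order in which downstream resources are visited matters.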