
refactor: refactor source executor (part 1) #15103

Merged
merged 1 commit into main from 02-16-refactor_refactor_SourceExecutor on Feb 19, 2024

Conversation

xxchan
Member

@xxchan xxchan commented Feb 18, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Rename stream_source_splits and state_cache according to how they are updated.

The two fields are very similar and easy to mix up; in fact, one of them alone may be enough. This PR only does the renaming first, to keep the review easy.
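
For context, a minimal sketch of the two fields after the rename, with simplified placeholder types (the real StreamSourceCore has more fields and different types):

    // Illustrative sketch only, not the actual RisingWave definitions.
    use std::collections::HashMap;

    type SplitId = String;    // placeholder for the real split id type
    type SplitState = String; // placeholder for the per-split offset/state

    #[allow(dead_code)]
    struct StreamSourceCore {
        // Latest known state of every split currently assigned to this executor,
        // updated on each data chunk (formerly `stream_source_splits`).
        latest_split_info: HashMap<SplitId, SplitState>,
        // State of only the splits updated in the current epoch, persisted to the
        // state table at each barrier and then cleared (formerly `state_cache`).
        updated_splits_in_epoch: HashMap<SplitId, SplitState>,
    }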

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features; see Sqlsmith: SQL feature generation #7934.)
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@xxchan xxchan changed the title from "refactor: refactor SourceExecutor" to "refactor: refactor source executor (part 1)" Feb 18, 2024
@xxchan xxchan marked this pull request as ready for review February 18, 2024 03:37
Comment on lines +442 to +453
        self.stream_source_core.latest_split_info.get_mut(id).map(
            |origin_split| {
                origin_split.update_in_place(offset.clone())?;
                Ok::<_, anyhow::Error>((id.clone(), origin_split.clone()))
            },
        )
    })
    .try_collect()?;

-   self.stream_source_core.state_cache.extend(state);
+   self.stream_source_core
+       .updated_splits_in_epoch
+       .extend(state);
Member Author

We can see they are updated in the same way on data chunk. Plan to unify them later.
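
A rough sketch of one possible way to unify them later (names like dirty_in_epoch, apply_chunk, and take_epoch_delta are only for illustration and are not in this PR): keep a single split map and track which split ids changed in the current epoch.

    use std::collections::{HashMap, HashSet};

    type SplitId = String;
    type SplitState = String;

    struct Core {
        // Single source of truth for all assigned splits.
        latest_split_info: HashMap<SplitId, SplitState>,
        // Ids of splits whose offsets advanced in the current epoch.
        dirty_in_epoch: HashSet<SplitId>,
    }

    impl Core {
        // Called for each data chunk with the (split id, new state) pairs seen in it.
        fn apply_chunk(&mut self, updates: impl IntoIterator<Item = (SplitId, SplitState)>) {
            for (id, state) in updates {
                self.latest_split_info.insert(id.clone(), state);
                self.dirty_in_epoch.insert(id);
            }
        }

        // Called at each barrier: return the per-epoch delta to persist, then clear it.
        fn take_epoch_delta(&mut self) -> HashMap<SplitId, SplitState> {
            let dirty: Vec<SplitId> = self.dirty_in_epoch.drain().collect();
            dirty
                .into_iter()
                .filter_map(|id| {
                    let state = self.latest_split_info.get(&id)?.clone();
                    Some((id, state))
                })
                .collect()
        }
    }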

Comment on lines -255 to 274
    // fetch the newest offset, either it's in cache (before barrier)
    // or in state table (just after barrier)
    let target_state = if core.state_cache.is_empty() {
        for ele in &mut *split_info {
            if let Some(recover_state) = core
                .split_state_store
                .try_recover_from_state_store(ele)
                .await?
            {
                *ele = recover_state;
            }
        }
        split_info.to_owned()
    } else {
        core.state_cache
            .values()
            .map(|split_impl| split_impl.to_owned())
            .collect_vec()
    };

Member Author

Actually this is a bug: when state_cache is non-empty, we shouldn't use it alone to rebuild the stream reader, since it only contains the splits updated in this epoch.

Member Author

However, this is hard to trigger. In my local testing I found that rebuild_stream_reader_from_error isn't triggered even when external Kafka/PG is killed...

Contributor

Actually this is a bug: when state_cache is non-empty, we shouldn't use it alone to rebuild the stream reader, since it only contains the splits updated in this epoch.

The source attempts to recover from the latest successful offset, so we always recover from the cache.
The case you mentioned occurs only when some partitions have no new data in an epoch, which indicates partition imbalance and rarely happens with MQs.

Contributor

In my local testing I found that rebuild_stream_reader_from_error isn't triggered even when external Kafka/PG is killed...

The Kafka SDK handles broker timeouts internally and will keep trying to reconnect. For Kinesis, the logic is triggered when a network issue happens. 😇

Member Author

when some partitions have no new data in an epoch, which indicates partition imbalance and rarely happens with MQs

I don't think it's that unlikely. Imagine the source is idle for a while and then only one message is produced. It can also happen when the user's key distribution is imbalanced due to their business logic.

Comment on lines -516 to -519
    if let Some(target_state) = &target_state {
        latest_split_info = target_state.clone();
    }

Member Author

I think the local variable latest_split_info can be replaced by the field.

Contributor

@StrikeW StrikeW left a comment

LGTM, PTAL @shanicky

@xxchan xxchan requested a review from tabVersion February 18, 2024 05:54

let target_state = core.latest_split_info.values().cloned().collect();
Contributor

I have some correctness concerns here. It seems you are resetting the process to the offset at which the last successful barrier arrived; call that time T0.
If an error then occurs at time T1 and requires rebuilding the source internally, the logic here may lead to reading the data from T0 to T1 twice.

Contributor

I think the expected logic here is to take a union of state_cache and split_info, so that every assigned split is reset to its latest offset.
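
A minimal sketch of what such a union could look like (the helper target_state_for_rebuild and the simplified types are only illustrative, not the code in this PR):

    use std::collections::HashMap;

    type SplitId = String;
    type SplitState = String;

    // Start from the full split assignment and overlay the fresher per-epoch
    // offsets, so every assigned split is reset to its latest known offset.
    fn target_state_for_rebuild(
        assigned_splits: &HashMap<SplitId, SplitState>,
        updated_in_epoch: &HashMap<SplitId, SplitState>,
    ) -> Vec<SplitState> {
        assigned_splits
            .iter()
            .map(|(id, state)| updated_in_epoch.get(id).unwrap_or(state).clone())
            .collect()
    }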

Member Author

It seems you are resetting the process to the offset at which the last successful barrier arrived

No. latest_split_info is also updated on every message chunk, so its offset is always up to date.

Member Author

This is indeed very confusing, which is also why I wanted to keep just one of them.

[screenshot omitted]

Contributor

@tabVersion tabVersion left a comment

rest lgtm
thanks for finding bugs.

-   // state cache may be stale
-   for existing_split_id in core.stream_source_splits.keys() {
+   // Checks dropped splits
+   for existing_split_id in core.latest_split_info.keys() {
Contributor

Also delete the removed items from latest_split_info here, because it is treated as the ground truth of the split assignment when recovering internally.
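
A minimal sketch of the suggestion (drop_removed_splits, newly_assigned, and the simplified types are hypothetical; see the author's reply below for how the merged code actually handles this):

    use std::collections::{HashMap, HashSet};

    type SplitId = String;
    type SplitState = String;

    // Keep only the splits that are still assigned after the update, so that
    // `latest_split_info` stays a faithful ground truth for internal recovery.
    fn drop_removed_splits(
        latest_split_info: &mut HashMap<SplitId, SplitState>,
        newly_assigned: &HashSet<SplitId>,
    ) {
        latest_split_info.retain(|id, _| newly_assigned.contains(id));
    }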

Member Author

Actually it is updated later in persist_state_and_clear_cache using target_state. This is how it worked previously, and this PR doesn't intend to change it; #15104 changes exactly this.
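
Roughly, that flow could be sketched like this (the signature is simplified and flush_to_state_table is a made-up stand-in for the state-table write; the real function in the source executor differs):

    use std::collections::HashMap;

    type SplitId = String;
    type SplitState = String;

    struct Core {
        latest_split_info: HashMap<SplitId, SplitState>,
        updated_splits_in_epoch: HashMap<SplitId, SplitState>,
    }

    impl Core {
        // Sketch: at each barrier, persist the per-epoch updates, clear the cache,
        // and refresh the split map from `target_state`.
        fn persist_state_and_clear_cache(&mut self, target_state: Vec<(SplitId, SplitState)>) {
            let to_persist = std::mem::take(&mut self.updated_splits_in_epoch);
            flush_to_state_table(&to_persist); // hypothetical stand-in for the state-table write
            self.latest_split_info = target_state.into_iter().collect();
        }
    }

    fn flush_to_state_table(_updates: &HashMap<SplitId, SplitState>) {
        // Placeholder: the real implementation writes to the split state store.
    }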

@xxchan xxchan added this pull request to the merge queue Feb 19, 2024
Merged via the queue into main with commit 3ce0996 Feb 19, 2024
35 of 36 checks passed
@xxchan xxchan deleted the 02-16-refactor_refactor_SourceExecutor branch February 19, 2024 05:46
TennyZhuang added a commit that referenced this pull request Feb 21, 2024