-
Notifications
You must be signed in to change notification settings - Fork 596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: refactor source executor (part 2) #15104
Conversation
Current dependencies on/for this PR:
This stack of pull requests is managed by Graphite. |
if states.is_empty() { | ||
// TODO should be a clear Error Code | ||
bail!("states require not null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't find this check useful?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is ok to remove the check, no need any more
if let Some(target_splits) = target_state { | ||
let target_split_ids: HashSet<_> = | ||
target_splits.iter().map(|split| split.id()).collect(); | ||
|
||
cache.retain(|split| target_split_ids.contains(&split.id())); | ||
|
||
let dropped_splits = core | ||
.latest_split_info | ||
.extract_if(|split_id, _| !target_split_ids.contains(split_id)) | ||
.map(|(_, split)| split) | ||
.collect_vec(); | ||
|
||
if should_trim_state && !dropped_splits.is_empty() { | ||
// trim dropped splits' state | ||
core.split_state_store.trim_state(&dropped_splits).await?; | ||
} | ||
|
||
core.latest_split_info = target_splits | ||
.into_iter() | ||
.map(|split| (split.id(), split)) | ||
.collect(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This section is moved to update_state_if_changed
now
Oops, deterministic test failed |
// existing split, no change, clone from cache | ||
target_state.push(s.clone()) | ||
for (split_id, split) in target_splits { | ||
if let Some(s) = core.latest_split_info.get(&split_id) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line is a minor change. I think using latest_split_info
is better than updated_splits_in_epoch
here.
d42f9c7
to
6975649
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
This reverts commit 2592880.
This reverts commit 2592880.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
This refactor reduces the
target_state
variable passed around. It's hard to understand and error-prone since we already have the fields have similar meanings.We update the states immediately instead.
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.