Skip to content

Commit

Permalink
Revert "refactor: refactor source executor (part 2) (#15104)"
Browse files Browse the repository at this point in the history
This reverts commit 2592880.
  • Loading branch information
TennyZhuang committed Feb 21, 2024
1 parent 4197ad5 commit a4dbf2d
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 84 deletions.
172 changes: 91 additions & 81 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_connector::source::reader::desc::{SourceDesc, SourceDescBuilder};
use risingwave_connector::source::{
BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitId, SplitMetaData,
BoxChunkSourceStream, ConnectorState, SourceContext, SourceCtrlOpts, SplitMetaData,
};
use risingwave_connector::ConnectorParams;
use risingwave_storage::StateStore;
Expand Down Expand Up @@ -138,21 +138,13 @@ impl<S: StateStore> SourceExecutor<S> {
]
}

/// - `should_trim_state`: whether to trim state for dropped splits.
///
/// For scaling, the connector splits can be migrated to other actors, but
/// won't be added or removed. Actors should not trim states for splits that
/// are moved to other actors.
///
/// For source split change, split will not be migrated and we can trim states
/// for deleted splits.
/// Returns `target_states` if split changed. Otherwise `None`.
async fn apply_split_change<const BIASED: bool>(
&mut self,
source_desc: &SourceDesc,
stream: &mut StreamReaderWithPause<BIASED, StreamChunk>,
split_assignment: &HashMap<ActorId, Vec<SplitImpl>>,
should_trim_state: bool,
) -> StreamExecutorResult<()> {
) -> StreamExecutorResult<Option<Vec<SplitImpl>>> {
self.metrics
.source_split_change_count
.with_label_values(
Expand All @@ -164,96 +156,82 @@ impl<S: StateStore> SourceExecutor<S> {
)
.inc();
if let Some(target_splits) = split_assignment.get(&self.actor_ctx.id).cloned() {
if self
.update_state_if_changed(target_splits, should_trim_state)
.await?
{
self.rebuild_stream_reader(source_desc, stream).await?;
if let Some(target_state) = self.update_state_if_changed(Some(target_splits)).await? {
tracing::info!(
actor_id = self.actor_ctx.id,
state = ?target_state,
"apply split change"
);

self.replace_stream_reader_with_target_state(
source_desc,
stream,
target_state.clone(),
)
.await?;

return Ok(Some(target_state));
}
}

Ok(())
Ok(None)
}

/// Returns `true` if split changed. Otherwise `false`.
/// Returns `target_states` if split changed. Otherwise `None`.
///
/// Note: `update_state_if_changed` will modify `updated_splits_in_epoch`
async fn update_state_if_changed(
&mut self,
target_splits: Vec<SplitImpl>,
should_trim_state: bool,
) -> StreamExecutorResult<bool> {
state: ConnectorState,
) -> StreamExecutorResult<ConnectorState> {
let core = self.stream_source_core.as_mut().unwrap();

let target_splits: HashMap<_, _> = target_splits
let target_splits: HashMap<_, _> = state
.unwrap()
.into_iter()
.map(|split| (split.id(), split))
.collect();

let mut target_state: HashMap<SplitId, SplitImpl> =
HashMap::with_capacity(target_splits.len());
let mut target_state: Vec<SplitImpl> = Vec::with_capacity(target_splits.len());

let mut split_changed = false;

// Checks added splits
for (split_id, split) in target_splits {
if let Some(s) = core.latest_split_info.get(&split_id) {
// For existing splits, we should use the latest offset from the cache.
// `target_splits` is from meta and contains the initial offset.
target_state.insert(split_id, s.clone());
for (split_id, split) in &target_splits {
if let Some(s) = core.updated_splits_in_epoch.get(split_id) {
// existing split, no change, clone from cache
target_state.push(s.clone())
} else {
split_changed = true;
// write new assigned split to state cache. snapshot is base on cache.

let initial_state = if let Some(recover_state) = core
.split_state_store
.try_recover_from_state_store(&split)
.try_recover_from_state_store(split)
.await?
{
recover_state
} else {
split
split.clone()
};

core.updated_splits_in_epoch
.entry(split_id.clone())
.entry(split.id())
.or_insert_with(|| initial_state.clone());

target_state.insert(split_id, initial_state);
target_state.push(initial_state);
}
}

// Checks dropped splits
for existing_split_id in core.latest_split_info.keys() {
if !target_state.contains_key(existing_split_id) {
if !target_splits.contains_key(existing_split_id) {
tracing::info!("split dropping detected: {}", existing_split_id);
split_changed = true;
}
}

if split_changed {
tracing::info!(
actor_id = self.actor_ctx.id,
state = ?target_state,
"apply split change"
);

core.updated_splits_in_epoch
.retain(|split_id, _| target_state.get(split_id).is_some());

let dropped_splits = core
.latest_split_info
.extract_if(|split_id, _| target_state.get(split_id).is_none())
.map(|(_, split)| split)
.collect_vec();

if should_trim_state && !dropped_splits.is_empty() {
// trim dropped splits' state
core.split_state_store.trim_state(&dropped_splits).await?;
}

core.latest_split_info = target_state;
}

Ok(split_changed)
Ok(split_changed.then_some(target_state))
}

/// Rebuild stream if there is a err in stream
Expand All @@ -278,17 +256,17 @@ impl<S: StateStore> SourceExecutor<S> {
core.source_id.to_string(),
]);

self.rebuild_stream_reader(source_desc, stream).await
let target_state = core.latest_split_info.values().cloned().collect();
self.replace_stream_reader_with_target_state(source_desc, stream, target_state)
.await
}

async fn rebuild_stream_reader<const BIASED: bool>(
async fn replace_stream_reader_with_target_state<const BIASED: bool>(
&mut self,
source_desc: &SourceDesc,
stream: &mut StreamReaderWithPause<BIASED, StreamChunk>,
target_state: Vec<SplitImpl>,
) -> StreamExecutorResult<()> {
let core = self.stream_source_core.as_mut().unwrap();
let target_state: Vec<SplitImpl> = core.latest_split_info.values().cloned().collect();

tracing::info!(
"actor {:?} apply source split change to {:?}",
self.actor_ctx.id,
Expand All @@ -306,21 +284,56 @@ impl<S: StateStore> SourceExecutor<S> {
Ok(())
}

/// - `target_state`: the new split info from barrier. `None` if no split update.
/// - `should_trim_state`: whether to trim state for dropped splits.
///
/// For scaling, the connector splits can be migrated to other actors, but
/// won't be added or removed. Actors should not trim states for splits that
/// are moved to other actors.
///
/// For source split change, split will not be migrated and we can trim states
/// for deleted splits.
async fn persist_state_and_clear_cache(
&mut self,
epoch: EpochPair,
// target_state is Some means split change (or migration) happened.
target_state: Option<Vec<SplitImpl>>,
should_trim_state: bool,
) -> StreamExecutorResult<()> {
let core = self.stream_source_core.as_mut().unwrap();

let cache = core
let mut cache = core
.updated_splits_in_epoch
.values()
.map(|split_impl| split_impl.to_owned())
.collect_vec();

if let Some(target_splits) = target_state {
let target_split_ids: HashSet<_> =
target_splits.iter().map(|split| split.id()).collect();

cache.retain(|split| target_split_ids.contains(&split.id()));

let dropped_splits = core
.latest_split_info
.extract_if(|split_id, _| !target_split_ids.contains(split_id))
.map(|(_, split)| split)
.collect_vec();

if should_trim_state && !dropped_splits.is_empty() {
// trim dropped splits' state
core.split_state_store.trim_state(&dropped_splits).await?;
}

core.latest_split_info = target_splits
.into_iter()
.map(|split| (split.id(), split))
.collect();
}

if !cache.is_empty() {
tracing::debug!(actor_id = self.actor_ctx.id, state = ?cache, "take snapshot");
core.split_state_store.set_states(cache).await?;
core.split_state_store.set_states(cache).await?
}

// commit anyway, even if no message saved
Expand Down Expand Up @@ -458,6 +471,9 @@ impl<S: StateStore> SourceExecutor<S> {

let epoch = barrier.epoch;

let mut target_state = None;
let mut should_trim_state = false;

if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => stream.pause_stream(),
Expand All @@ -469,29 +485,23 @@ impl<S: StateStore> SourceExecutor<S> {
"source change split received"
);

self.apply_split_change(
&source_desc,
&mut stream,
actor_splits,
true,
)
.await?;
target_state = self
.apply_split_change(&source_desc, &mut stream, actor_splits)
.await?;
should_trim_state = true;
}

Mutation::Update(UpdateMutation { actor_splits, .. }) => {
self.apply_split_change(
&source_desc,
&mut stream,
actor_splits,
false,
)
.await?;
target_state = self
.apply_split_change(&source_desc, &mut stream, actor_splits)
.await?;
}
_ => {}
}
}

self.persist_state_and_clear_cache(epoch).await?;
self.persist_state_and_clear_cache(epoch, target_state, should_trim_state)
.await?;

self.metrics
.source_row_per_barrier
Expand Down
11 changes: 8 additions & 3 deletions src/stream/src/executor/source/state_table_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,14 @@ impl<S: StateStore> SourceStateTableHandler<S> {
where
SS: SplitMetaData,
{
for split_impl in states {
self.set(split_impl.id(), split_impl.encode_to_json())
.await?;
if states.is_empty() {
// TODO should be a clear Error Code
bail!("states require not null");
} else {
for split_impl in states {
self.set(split_impl.id(), split_impl.encode_to_json())
.await?;
}
}
Ok(())
}
Expand Down

0 comments on commit a4dbf2d

Please sign in to comment.