Refactor recovery, enhance error logging, adjust parallelism control in streaming processes.
shanicky committed Jan 3, 2025
1 parent 79405dc commit 3bb430b
Showing 4 changed files with 87 additions and 54 deletions.
66 changes: 32 additions & 34 deletions src/meta/src/barrier/context/recovery.rs
@@ -138,6 +138,38 @@ impl GlobalBarrierWorkerContextImpl {
.await
.context("clean dirty streaming jobs")?;

// This is a quick path to accelerate the process of dropping and canceling streaming jobs.
let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);

let mut active_streaming_nodes =
ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
.await?;

// Resolve actor info for recovery. If there's no actor to recover, most of the
// following steps will be no-op, while the compute nodes will still be reset.
// FIXME: Transactions should be used.
// TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
let mut info = if !self.env.opts.disable_automatic_parallelism_control {
info!("trigger offline scaling");
self.scale_actors(&active_streaming_nodes)
.await
.inspect_err(|err| {
warn!(error = %err.as_report(), "scale actors failed");
})?;

self.resolve_graph_info(None).await.inspect_err(|err| {
warn!(error = %err.as_report(), "resolve actor info failed");
})?
} else {
info!("trigger actor migration");
// Migrate actors in expired CN to newly joined one.
self.migrate_actors(&mut active_streaming_nodes)
.await
.inspect_err(|err| {
warn!(error = %err.as_report(), "migrate actors failed");
})?
};

// Mview progress needs to be recovered.
tracing::info!("recovering mview progress");
let background_jobs = {
@@ -172,47 +204,13 @@ impl GlobalBarrierWorkerContextImpl {
};
tracing::info!("recovered mview progress");

// This is a quick path to accelerate the process of dropping and canceling streaming jobs.
let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);

let mut active_streaming_nodes =
ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
.await?;

let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
info!(
"background streaming jobs: {:?} total {}",
background_streaming_jobs,
background_streaming_jobs.len()
);

// Resolve actor info for recovery. If there's no actor to recover, most of the
// following steps will be no-op, while the compute nodes will still be reset.
// FIXME: Transactions should be used.
// TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
let mut info = if !self.env.opts.disable_automatic_parallelism_control
&& background_streaming_jobs.is_empty()
{
info!("trigger offline scaling");
self.scale_actors(&active_streaming_nodes)
.await
.inspect_err(|err| {
warn!(error = %err.as_report(), "scale actors failed");
})?;

self.resolve_graph_info(None).await.inspect_err(|err| {
warn!(error = %err.as_report(), "resolve actor info failed");
})?
} else {
info!("trigger actor migration");
// Migrate actors in expired CN to newly joined one.
self.migrate_actors(&mut active_streaming_nodes)
.await
.inspect_err(|err| {
warn!(error = %err.as_report(), "migrate actors failed");
})?
};

if self.scheduled_barriers.pre_apply_drop_cancel(None) {
info = self.resolve_graph_info(None).await.inspect_err(|err| {
warn!(error = %err.as_report(), "resolve actor info failed");
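To make the intent of this refactor easier to follow, here is a minimal, self-contained sketch of the reordered recovery control flow. RecoveryCtx, WorkerSnapshot, and GraphInfo are stand-in types for illustration only, not the actual meta-service types: the point is that the drop/cancel fast path and the worker-node snapshot now run before the scaling-vs-migration decision, and offline scaling is no longer gated on the absence of background streaming jobs.

    // Simplified stand-ins; the real code uses ActiveStreamingWorkerNodes and
    // the resolved actor/graph info from the metadata manager.
    #[derive(Debug)]
    struct WorkerSnapshot;
    #[derive(Debug)]
    struct GraphInfo;

    struct RecoveryCtx {
        parallelism_control_disabled: bool,
    }

    impl RecoveryCtx {
        fn pre_apply_drop_cancel(&self) -> bool { false }
        fn snapshot_workers(&self) -> WorkerSnapshot { WorkerSnapshot }
        fn scale_actors(&self, _nodes: &WorkerSnapshot) -> Result<(), String> { Ok(()) }
        fn migrate_actors(&self, _nodes: &mut WorkerSnapshot) -> Result<GraphInfo, String> { Ok(GraphInfo) }
        fn resolve_graph_info(&self) -> Result<GraphInfo, String> { Ok(GraphInfo) }

        fn recover(&self) -> Result<GraphInfo, String> {
            // Quick path for dropped/cancelled jobs, then snapshot the workers,
            // both *before* mview progress recovery in the new ordering.
            let _ = self.pre_apply_drop_cancel();
            let mut workers = self.snapshot_workers();

            // Offline scaling vs. actor migration; the background-job guard is gone.
            let info = if !self.parallelism_control_disabled {
                self.scale_actors(&workers)?;
                self.resolve_graph_info()?
            } else {
                self.migrate_actors(&mut workers)?
            };

            // ... mview progress recovery and the rest of barrier recovery follow here.
            Ok(info)
        }
    }

    fn main() {
        let ctx = RecoveryCtx { parallelism_control_disabled: false };
        println!("{:?}", ctx.recover());
    }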
10 changes: 0 additions & 10 deletions src/meta/src/rpc/ddl_controller.rs
@@ -402,16 +402,6 @@ impl DdlController {
deferred = true;
}

if !deferred
&& !self
.metadata_manager
.list_background_creating_jobs()
.await?
.is_empty()
{
bail!("The system is creating jobs in the background, please try again later")
}

self.stream_manager
.alter_table_parallelism(table_id, parallelism.into(), deferred)
.await
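This hunk only removes code: the blanket "any background creating job exists" rejection for altering table parallelism is dropped here, and a narrower check lands in stream_manager.rs later in this commit. A hypothetical before/after predicate (the names are illustrative, not the real DdlController API) to show the policy change:

    // Before: reject whenever *any* background creating job exists (unless deferred).
    fn can_alter_before(deferred: bool, background_jobs: &[u32]) -> bool {
        deferred || background_jobs.is_empty()
    }

    // After: the blanket check is gone here; rejection is decided downstream and
    // only for background jobs related to the graph being rescheduled.
    fn can_alter_after(_deferred: bool, related_background_jobs: &[u32]) -> bool {
        related_background_jobs.is_empty()
    }

    fn main() {
        assert!(!can_alter_before(false, &[42]));
        assert!(can_alter_after(false, &[]));
    }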
47 changes: 37 additions & 10 deletions src/meta/src/stream/scale.rs
@@ -2265,6 +2265,24 @@ impl ScaleController {

Ok(())
}

pub async fn resolve_related_no_shuffle_jobs(
&self,
jobs: &[TableId],
) -> MetaResult<HashSet<TableId>> {
let RescheduleWorkingSet { related_jobs, .. } = self
.metadata_manager
.catalog_controller
.resolve_working_set_for_reschedule_tables(
jobs.iter().map(|id| id.table_id as _).collect(),
)
.await?;

Ok(related_jobs
.keys()
.map(|id| TableId::new(*id as _))
.collect())
}
}

/// At present, for table level scaling, we use the strategy `TableResizePolicy`.
@@ -2357,23 +2375,31 @@ impl GlobalStreamManager {
/// - `Ok(false)` if no jobs can be scaled;
/// - `Ok(true)` if some jobs are scaled, and it is possible that there are more jobs can be scaled.
async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
tracing::info!("trigger parallelism control");

let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

let background_streaming_jobs = self
.metadata_manager
.list_background_creating_jobs()
.await?;

if !background_streaming_jobs.is_empty() {
tracing::debug!(
"skipping parallelism control due to background jobs {:?}",
background_streaming_jobs
);
// skip if there are background creating jobs
return Ok(true);
}
let skipped_jobs = if !background_streaming_jobs.is_empty() {
let jobs = self
.scale_controller
.resolve_related_no_shuffle_jobs(&background_streaming_jobs)
.await?;

tracing::info!("trigger parallelism control");
tracing::info!(
"skipping parallelism control of background jobs {:?} and associated jobs {:?}",
background_streaming_jobs,
jobs
);

let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
jobs
} else {
HashSet::new()
};

let table_parallelisms: HashMap<_, _> = {
let streaming_parallelisms = self
@@ -2384,6 +2410,7 @@

streaming_parallelisms
.into_iter()
.filter(|(table_id, _)| !skipped_jobs.contains(&TableId::new(*table_id as _)))
.map(|(table_id, parallelism)| {
let table_parallelism = match parallelism {
StreamingParallelism::Adaptive => TableParallelism::Adaptive,
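A small sketch of the filtering step this hunk adds to trigger_parallelism_control, using simplified stand-in types (TableId as a plain u32, a toy Parallelism enum): instead of skipping the whole round whenever background creating jobs exist, those jobs and their NO SHUFFLE-related jobs are resolved into a skip set and filtered out of the tables considered for parallelism control.

    use std::collections::{HashMap, HashSet};

    type TableId = u32;

    #[derive(Debug)]
    enum Parallelism { Adaptive, Fixed(usize) }

    // Keep only the tables that are *not* in the skip set resolved from the
    // background creating jobs and their NO SHUFFLE-related jobs.
    fn filter_controllable(
        streaming_parallelisms: HashMap<TableId, Parallelism>,
        skipped_jobs: &HashSet<TableId>,
    ) -> HashMap<TableId, Parallelism> {
        streaming_parallelisms
            .into_iter()
            .filter(|(table_id, _)| !skipped_jobs.contains(table_id))
            .collect()
    }

    fn main() {
        let parallelisms = HashMap::from([(1, Parallelism::Adaptive), (2, Parallelism::Fixed(4))]);
        let skipped = HashSet::from([2]); // a background job and its related jobs
        println!("{:?}", filter_controllable(parallelisms, &skipped));
    }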
18 changes: 18 additions & 0 deletions src/meta/src/stream/stream_manager.rs
@@ -596,6 +596,24 @@ impl GlobalStreamManager {
) -> MetaResult<()> {
let _reschedule_job_lock = self.reschedule_lock_write_guard().await;

let background_jobs = self
.metadata_manager
.list_background_creating_jobs()
.await?;

if !background_jobs.is_empty() {
let related_jobs = self
.scale_controller
.resolve_related_no_shuffle_jobs(&background_jobs)
.await?;

for job in background_jobs {
if related_jobs.contains(&job) {
bail!("Cannot alter the job {} because the related job {} is currently being created", table_id, job.table_id);
}
}
}

let database_id = DatabaseId::new(
self.metadata_manager
.catalog_controller
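A sketch of the guard this hunk introduces into alter_table_parallelism, again with stand-in types and a pre-resolved related-job set standing in for the resolve_related_no_shuffle_jobs call: the alter is rejected only when a background creating job appears in the resolved related-job set, rather than whenever any background job exists at all.

    use std::collections::HashSet;

    type TableId = u32;

    // Reject the alter if any background creating job is in the related-job set
    // resolved for the reschedule; otherwise let it proceed.
    fn check_alter_allowed(
        background_jobs: &[TableId],
        related_jobs: &HashSet<TableId>,
    ) -> Result<(), String> {
        for job in background_jobs {
            if related_jobs.contains(job) {
                return Err(format!(
                    "cannot alter parallelism: related job {job} is still being created"
                ));
            }
        }
        Ok(())
    }

    fn main() {
        let background = vec![7u32];
        let related: HashSet<TableId> = HashSet::from([7, 8]);
        println!("{:?}", check_alter_allowed(&background, &related));
    }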
