Skip to content

Commit

Permalink
Merge branch 'main' into pr/issues/667
Browse files Browse the repository at this point in the history
  • Loading branch information
LantaoJin authored Oct 25, 2024
2 parents c8c03e4 + 54f4fa7 commit af0fa75
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# This should match the owning team set up in https://github.com/orgs/opensearch-project/teams
* @joshuali925 @dai-chen @rupal-bq @mengweieric @vamsi-amazon @penghuo @seankao-az @anirudha @kaituo @YANG-DB @LantaoJin
* @joshuali925 @dai-chen @rupal-bq @mengweieric @vamsi-amazon @penghuo @seankao-az @anirudha @kaituo @YANG-DB @noCharger @LantaoJin @ykmr1224
3 changes: 2 additions & 1 deletion MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ This document contains a list of maintainers in this repo. See [opensearch-proje
| Chen Dai | [dai-chen](https://github.com/dai-chen) | Amazon |
| Vamsi Manohar | [vamsi-amazon](https://github.com/vamsi-amazon) | Amazon |
| Peng Huo | [penghuo](https://github.com/penghuo) | Amazon |
| Lior Perry | [YANG-DB](https://github.com/YANG-DB) | Amazon |
| Lior Perry | [YANG-DB](https://github.com/YANG-DB) | Amazon |
| Sean Kao | [seankao-az](https://github.com/seankao-az) | Amazon |
| Anirudha Jadhav | [anirudha](https://github.com/anirudha) | Amazon |
| Kaituo Li | [kaituo](https://github.com/kaituo) | Amazon |
| Louis Chu | [noCharger](https://github.com/noCharger) | Amazon |
| Lantao Jin | [LantaoJin](https://github.com/LantaoJin) | Amazon |
| Tomoyuki Morita | [ykmr1224](https://github.com/ykmr1224) | Amazon |
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
val internalSchedulingService =
new FlintSparkJobInternalSchedulingService(spark, flintIndexMonitor)
new FlintSparkJobInternalSchedulingService(spark, flintSparkConf, flintIndexMonitor)
val externalSchedulingService =
new FlintSparkJobExternalSchedulingService(flintAsyncQueryScheduler, flintSparkConf)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ object FlintSparkIndexOptions {
.externalSchedulerIntervalThreshold())
case (false, _, Some("external")) =>
throw new IllegalArgumentException(
"spark.flint.job.externalScheduler.enabled is false but refresh interval is set to external scheduler mode")
"spark.flint.job.externalScheduler.enabled is false but scheduler_mode is set to external")
case _ =>
updatedOptions += (SCHEDULER_MODE.toString -> SchedulerMode.INTERNAL.toString)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf
*/
class FlintSparkJobInternalSchedulingService(
spark: SparkSession,
flintSparkConf: FlintSparkConf,
flintIndexMonitor: FlintSparkIndexMonitor)
extends FlintSparkJobSchedulingService
with Logging {
Expand All @@ -55,12 +56,9 @@ class FlintSparkJobInternalSchedulingService(
index: FlintSparkIndex,
action: AsyncQuerySchedulerAction): Option[String] = {
val indexName = index.name()

action match {
case AsyncQuerySchedulerAction.SCHEDULE => None // No-op
case AsyncQuerySchedulerAction.UPDATE =>
logInfo("Scheduling index state monitor")
flintIndexMonitor.startMonitor(indexName)
startRefreshingJob(index)
case AsyncQuerySchedulerAction.UNSCHEDULE =>
logInfo("Stopping index state monitor")
Expand All @@ -81,7 +79,17 @@ class FlintSparkJobInternalSchedulingService(
private def startRefreshingJob(index: FlintSparkIndex): Option[String] = {
logInfo(s"Starting refreshing job for index ${index.name()}")
val indexRefresh = FlintSparkIndexRefresh.create(index.name(), index)
indexRefresh.start(spark, new FlintSparkConf(spark.conf.getAll.toMap.asJava))
val jobId = indexRefresh.start(spark, flintSparkConf)

// NOTE: Resolution for previous concurrency issue
// This code addresses a previously identified concurrency issue with recoverIndex
// where scheduled FlintSparkIndexMonitorTask couldn't detect the active Spark streaming job ID. The issue
// was caused by starting the FlintSparkIndexMonitor before the Spark streaming job was fully
// initialized. In this fixed version, we start the monitor after the streaming job has been
// initiated, ensuring that the job ID is available for detection.
logInfo("Scheduling index state monitor")
flintIndexMonitor.startMonitor(index.name())
jobId
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object FlintSparkJobSchedulingService {
if (isExternalSchedulerEnabled(index)) {
new FlintSparkJobExternalSchedulingService(flintAsyncQueryScheduler, flintSparkConf)
} else {
new FlintSparkJobInternalSchedulingService(spark, flintIndexMonitor)
new FlintSparkJobInternalSchedulingService(spark, flintSparkConf, flintIndexMonitor)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ class FlintSparkIndexBuilderSuite
None,
None,
Some(
"spark.flint.job.externalScheduler.enabled is false but refresh interval is set to external scheduler mode")),
"spark.flint.job.externalScheduler.enabled is false but scheduler_mode is set to external")),
(
"set external mode when interval above threshold and no mode specified",
true,
Expand Down

0 comments on commit af0fa75

Please sign in to comment.