diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 431ff1199..ad02ac78a 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,2 +1,2 @@ # This should match the owning team set up in https://github.com/orgs/opensearch-project/teams -* @joshuali925 @dai-chen @rupal-bq @mengweieric @vamsi-amazon @penghuo @seankao-az @anirudha @kaituo @YANG-DB @LantaoJin +* @joshuali925 @dai-chen @rupal-bq @mengweieric @vamsi-amazon @penghuo @seankao-az @anirudha @kaituo @YANG-DB @noCharger @LantaoJin @ykmr1224 diff --git a/MAINTAINERS.md b/MAINTAINERS.md index 1a35fcc69..b7ead807d 100644 --- a/MAINTAINERS.md +++ b/MAINTAINERS.md @@ -12,9 +12,10 @@ This document contains a list of maintainers in this repo. See [opensearch-proje | Chen Dai | [dai-chen](https://github.com/dai-chen) | Amazon | | Vamsi Manohar | [vamsi-amazon](https://github.com/vamsi-amazon) | Amazon | | Peng Huo | [penghuo](https://github.com/penghuo) | Amazon | -| Lior Perry | [YANG-DB](https://github.com/YANG-DB) | Amazon | +| Lior Perry | [YANG-DB](https://github.com/YANG-DB) | Amazon | | Sean Kao | [seankao-az](https://github.com/seankao-az) | Amazon | | Anirudha Jadhav | [anirudha](https://github.com/anirudha) | Amazon | | Kaituo Li | [kaituo](https://github.com/kaituo) | Amazon | | Louis Chu | [noCharger](https://github.com/noCharger) | Amazon | | Lantao Jin | [LantaoJin](https://github.com/LantaoJin) | Amazon | +| Tomoyuki Morita | [ykmr1224](https://github.com/ykmr1224) | Amazon | diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 309877fdd..e5805731b 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -566,7 +566,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w val indexName = index.name val indexLogEntry = index.latestLogEntry.get val internalSchedulingService = - new FlintSparkJobInternalSchedulingService(spark, flintIndexMonitor) + new FlintSparkJobInternalSchedulingService(spark, flintSparkConf, flintIndexMonitor) val externalSchedulingService = new FlintSparkJobExternalSchedulingService(flintAsyncQueryScheduler, flintSparkConf) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala index 4bfc50c55..fc1a611ef 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala @@ -257,7 +257,7 @@ object FlintSparkIndexOptions { .externalSchedulerIntervalThreshold()) case (false, _, Some("external")) => throw new IllegalArgumentException( - "spark.flint.job.externalScheduler.enabled is false but refresh interval is set to external scheduler mode") + "spark.flint.job.externalScheduler.enabled is false but scheduler_mode is set to external") case _ => updatedOptions += (SCHEDULER_MODE.toString -> SchedulerMode.INTERNAL.toString) } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala index d22eff2c9..8928357c7 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf */ class FlintSparkJobInternalSchedulingService( spark: SparkSession, + flintSparkConf: FlintSparkConf, flintIndexMonitor: FlintSparkIndexMonitor) extends FlintSparkJobSchedulingService with Logging { @@ -55,12 +56,9 @@ class FlintSparkJobInternalSchedulingService( index: FlintSparkIndex, action: AsyncQuerySchedulerAction): Option[String] = { val indexName = index.name() - action match { case AsyncQuerySchedulerAction.SCHEDULE => None // No-op case AsyncQuerySchedulerAction.UPDATE => - logInfo("Scheduling index state monitor") - flintIndexMonitor.startMonitor(indexName) startRefreshingJob(index) case AsyncQuerySchedulerAction.UNSCHEDULE => logInfo("Stopping index state monitor") @@ -81,7 +79,17 @@ class FlintSparkJobInternalSchedulingService( private def startRefreshingJob(index: FlintSparkIndex): Option[String] = { logInfo(s"Starting refreshing job for index ${index.name()}") val indexRefresh = FlintSparkIndexRefresh.create(index.name(), index) - indexRefresh.start(spark, new FlintSparkConf(spark.conf.getAll.toMap.asJava)) + val jobId = indexRefresh.start(spark, flintSparkConf) + + // NOTE: Resolution for previous concurrency issue + // This code addresses a previously identified concurrency issue with recoverIndex + // where scheduled FlintSparkIndexMonitorTask couldn't detect the active Spark streaming job ID. The issue + // was caused by starting the FlintSparkIndexMonitor before the Spark streaming job was fully + // initialized. In this fixed version, we start the monitor after the streaming job has been + // initiated, ensuring that the job ID is available for detection. + logInfo("Scheduling index state monitor") + flintIndexMonitor.startMonitor(index.name()) + jobId } /** diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala index 6e25d8a8c..b813c7dd0 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala @@ -70,7 +70,7 @@ object FlintSparkJobSchedulingService { if (isExternalSchedulerEnabled(index)) { new FlintSparkJobExternalSchedulingService(flintAsyncQueryScheduler, flintSparkConf) } else { - new FlintSparkJobInternalSchedulingService(spark, flintIndexMonitor) + new FlintSparkJobInternalSchedulingService(spark, flintSparkConf, flintIndexMonitor) } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala index 063c32074..4ff5b5adb 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala @@ -208,7 +208,7 @@ class FlintSparkIndexBuilderSuite None, None, Some( - "spark.flint.job.externalScheduler.enabled is false but refresh interval is set to external scheduler mode")), + "spark.flint.job.externalScheduler.enabled is false but scheduler_mode is set to external")), ( "set external mode when interval above threshold and no mode specified", true,