diff --git a/flint-commons/src/main/scala/org/opensearch/flint/common/scheduler/model/AsyncQuerySchedulerRequest.java b/flint-commons/src/main/scala/org/opensearch/flint/common/scheduler/model/AsyncQuerySchedulerRequest.java index a7f5ce023..dea0a9e62 100644 --- a/flint-commons/src/main/scala/org/opensearch/flint/common/scheduler/model/AsyncQuerySchedulerRequest.java +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/scheduler/model/AsyncQuerySchedulerRequest.java @@ -10,20 +10,67 @@ import java.time.Instant; -/** Represents a job request for a scheduled task. */ +/** + * Represents a job request for a scheduled task. + */ @Builder @Data public class AsyncQuerySchedulerRequest { - protected String accountId; - // Scheduler jobid is the opensearch index name until we support multiple jobs per index - protected String jobId; - protected String dataSource; - protected String scheduledQuery; - protected String queryLang; - protected Object schedule; - protected boolean enabled; - protected Instant lastUpdateTime; - protected Instant enabledTime; - protected Long lockDurationSeconds; - protected Double jitter; -} + + /** + * The AWS accountId used to identify the resource. + */ + String accountId; + + /** + * The unique identifier for the scheduler job. + */ + String jobId; + + /** + * The name of the data source on which the scheduled query will be executed. + */ + String dataSource; + + /** + * The scheduled query to be executed. + */ + String scheduledQuery; + + /** + * The language in which the query is written, such as SQL, PPL (Piped Processing Language), etc. + */ + String queryLang; + + /** + * The interval expression defining the frequency of the job execution. + * Typically expressed as a time-based pattern (e.g. 5 minutes). + */ + String interval; + + /** + * Indicates whether the scheduled job is currently enabled or not. + */ + boolean enabled; + + /** + * The timestamp of the last update made to this job. + */ + Instant lastUpdateTime; + + /** + * The timestamp when this job was enabled. + */ + Instant enabledTime; + + /** + * The duration, in seconds, for which the job remains locked. + * This lock is used to prevent concurrent executions of the same job, ensuring that only one instance of the job runs at a time. + */ + Long lockDurationSeconds; + + /** + * The jitter value to add randomness to the execution schedule, helping to avoid the thundering herd problem. 
+ */ + Double jitter; +} \ No newline at end of file diff --git a/flint-core/src/main/java/org/opensearch/flint/core/scheduler/util/IntervalSchedulerParser.java b/flint-core/src/main/java/org/opensearch/flint/core/scheduler/util/IntervalSchedulerParser.java deleted file mode 100644 index 13d8db131..000000000 --- a/flint-core/src/main/java/org/opensearch/flint/core/scheduler/util/IntervalSchedulerParser.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.core.scheduler.util; - -import org.apache.spark.sql.execution.streaming.Triggers; -import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; -import org.opensearch.jobscheduler.spi.schedule.Schedule; - -import java.time.Instant; -import java.time.temporal.ChronoUnit; - -public class IntervalSchedulerParser { - - public static Schedule parse(Object schedule) { - if (schedule == null) { - return null; - } - - if (schedule instanceof Schedule) { - return (Schedule) schedule; - } - - if (schedule instanceof scala.Option) { - scala.Option option = (scala.Option) schedule; - if (option.isDefined()) { - Object value = option.get(); - if (value instanceof String) { - return parseStringSchedule((String) value); - } - } - return null; - } - - if (schedule instanceof String) { - return parseStringSchedule((String) schedule); - } - - throw new IllegalArgumentException("Schedule must be a String, Option[String], or Schedule object for parsing."); - } - - public static IntervalSchedule parseStringSchedule(String scheduleStr) { - Long millis = Triggers.convert(scheduleStr); - - // Convert milliseconds to minutes (rounding down) - int minutes = (int) (millis / (60 * 1000)); - - // Use the current time as the start time - Instant startTime = Instant.now(); - - return new IntervalSchedule(startTime, minutes, ChronoUnit.MINUTES); - } -} \ No newline at end of file diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index cce07957e..e505cf45d 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -178,8 +178,13 @@ public int getSocketTimeoutMillis() { public String getDataSourceName() { return options.getOrDefault(DATA_SOURCE_NAME, ""); } - - public String getClientId() { + + /** + * Get the AWS accountId from the cluster name. + * Flint cluster name is in the format of "accountId:clusterName". + * @return the AWS accountId + */ + public String getAWSAccountId() { String clusterName = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", ""); String[] parts = clusterName.split(":"); return parts.length == 2 ? 
parts[0] : ""; diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 975421e60..3f8076b63 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -5,8 +5,6 @@ package org.opensearch.flint.spark -import java.time.Instant - import scala.collection.JavaConverters._ import org.json4s.{Formats, NoTypeHints} @@ -16,19 +14,17 @@ import org.opensearch.flint.common.metadata.log.{FlintMetadataLogService, Optimi import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY import org.opensearch.flint.common.scheduler.AsyncQueryScheduler -import org.opensearch.flint.common.scheduler.model.{AsyncQuerySchedulerRequest, LangType} import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder import org.opensearch.flint.core.metadata.log.FlintMetadataLogServiceBuilder -import org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilder -import org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilder.AsyncQuerySchedulerAction import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._ import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._ -import org.opensearch.flint.spark.scheduler.util.RefreshQueryGenerator +import org.opensearch.flint.spark.scheduler.{AsyncQuerySchedulerBuilder, FlintSparkJobSchedulingService} +import org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerBuilder.AsyncQuerySchedulerAction import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer import org.opensearch.flint.spark.skipping.recommendations.DataTypeSkippingStrategy @@ -120,6 +116,12 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w } } else { val metadata = index.metadata() + val jobSchedulingService = FlintSparkJobSchedulingService.create( + index, + spark, + flintAsyncQueryScheduler, + flintSparkConf, + flintIndexMonitor) tx .initialLog(latest => latest.state == EMPTY || latest.state == DELETED) .transientLog(latest => latest.copy(state = CREATING)) @@ -134,9 +136,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w flintIndexMetadataService .updateIndexMetadata(indexName, metadata.copy(latestId = Some(latest.id))) } - if (isExternalSchedulerEnabled(index)) { - handleAsyncQueryScheduler(index, AsyncQuerySchedulerAction.SCHEDULE) - } + jobSchedulingService.handleJob(index, AsyncQuerySchedulerAction.SCHEDULE) }) } } @@ -254,21 +254,20 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w withTransaction[Boolean](indexName, "Delete Flint index") { tx => if (flintClient.exists(indexName)) { val index = describeIndex(indexName) + val jobSchedulingService = FlintSparkJobSchedulingService.create( + index.get, + spark, + flintAsyncQueryScheduler, + flintSparkConf, + flintIndexMonitor) tx 
.initialLog(latest => latest.state == ACTIVE || latest.state == REFRESHING || latest.state == FAILED) .transientLog(latest => latest.copy(state = DELETING)) .finalLog(latest => latest.copy(state = DELETED)) .commit(_ => { - if (isExternalSchedulerEnabled(index.get)) { - handleAsyncQueryScheduler(index.get, AsyncQuerySchedulerAction.UNSCHEDULE) - true - } else { - // TODO: share same transaction for now - flintIndexMonitor.stopMonitor(indexName) - stopRefreshingJob(indexName) - true - } + jobSchedulingService.handleJob(index.get, AsyncQuerySchedulerAction.UNSCHEDULE) + true }) } else { logInfo("Flint index to be deleted doesn't exist") @@ -289,14 +288,18 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w if (flintClient.exists(indexName)) { val index = describeIndex(indexName) val options = index.get.options + val jobSchedulingService = FlintSparkJobSchedulingService.create( + index.get, + spark, + flintAsyncQueryScheduler, + flintSparkConf, + flintIndexMonitor) tx .initialLog(latest => latest.state == DELETED) .transientLog(latest => latest.copy(state = VACUUMING)) .finalLog(_ => NO_LOG_ENTRY) .commit(_ => { - if (isExternalSchedulerEnabled(index.get)) { - handleAsyncQueryScheduler(index.get, AsyncQuerySchedulerAction.REMOVE) - } + jobSchedulingService.handleJob(index.get, AsyncQuerySchedulerAction.REMOVE) flintClient.deleteIndex(indexName) flintIndexMetadataService.deleteIndexMetadata(indexName) @@ -463,6 +466,12 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w tx: OptimisticTransaction[Option[String]]): Option[String] = { val indexName = index.name val indexLogEntry = index.latestLogEntry.get + val jobSchedulingService = FlintSparkJobSchedulingService.create( + index, + spark, + flintAsyncQueryScheduler, + flintSparkConf, + flintIndexMonitor) tx .initialLog(latest => latest.state == REFRESHING && latest.entryVersion == indexLogEntry.entryVersion) @@ -471,14 +480,8 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w .commit(_ => { flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata) logInfo("Update index options complete") - if (isExternalSchedulerEnabled(index)) { - handleAsyncQueryScheduler(index, AsyncQuerySchedulerAction.UNSCHEDULE) - None - } else { - flintIndexMonitor.stopMonitor(indexName) - stopRefreshingJob(indexName) - None - } + jobSchedulingService.handleJob(index, AsyncQuerySchedulerAction.UNSCHEDULE) + None }) } @@ -487,7 +490,12 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w tx: OptimisticTransaction[Option[String]]): Option[String] = { val indexName = index.name val indexLogEntry = index.latestLogEntry.get - val indexRefresh = FlintSparkIndexRefresh.create(indexName, index) + val jobSchedulingService = FlintSparkJobSchedulingService.create( + index, + spark, + flintAsyncQueryScheduler, + flintSparkConf, + flintIndexMonitor) tx .initialLog(latest => latest.state == ACTIVE && latest.entryVersion == indexLogEntry.entryVersion) @@ -501,56 +509,8 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w .commit(_ => { flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata) logInfo("Update index options complete") - if (isExternalSchedulerEnabled(index)) { - handleAsyncQueryScheduler(index, AsyncQuerySchedulerAction.UPDATE) - None - } else { - indexRefresh.start(spark, flintSparkConf) - } + jobSchedulingService.handleJob(index, AsyncQuerySchedulerAction.UPDATE) + None }) } - - 
private def handleAsyncQueryScheduler( - index: FlintSparkIndex, - action: AsyncQuerySchedulerAction): Unit = { - val dataSource = flintSparkConf.flintOptions().getDataSourceName() - val clientId = flintSparkConf.flintOptions().getClientId() - val indexName = index.name() - - logInfo(s"handleAsyncQueryScheduler invoked: $action") - - val baseRequest = AsyncQuerySchedulerRequest - .builder() - .accountId(clientId) - .jobId(indexName) - .dataSource(dataSource) - - val request = action match { - case AsyncQuerySchedulerAction.SCHEDULE | AsyncQuerySchedulerAction.UPDATE => - val currentTime = Instant.now() - baseRequest - .scheduledQuery(RefreshQueryGenerator.generateRefreshQuery(index)) - .queryLang(LangType.SQL) - .schedule(index.options.refreshInterval()) - .enabled(true) - .enabledTime(currentTime) - .lastUpdateTime(currentTime) - .build() - case _ => baseRequest.build() - } - - action match { - case AsyncQuerySchedulerAction.SCHEDULE => flintAsyncQueryScheduler.scheduleJob(request) - case AsyncQuerySchedulerAction.UPDATE => flintAsyncQueryScheduler.updateJob(request) - case AsyncQuerySchedulerAction.UNSCHEDULE => flintAsyncQueryScheduler.unscheduleJob(request) - case AsyncQuerySchedulerAction.REMOVE => flintAsyncQueryScheduler.removeJob(request) - case _ => throw new IllegalArgumentException(s"Unsupported action: $action") - } - } - - private def isExternalSchedulerEnabled(index: FlintSparkIndex): Boolean = { - val autoRefresh = index.options.autoRefresh() - val schedulerModeExternal = index.options.isExternalSchedulerEnabled() - autoRefresh && schedulerModeExternal - } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala index 72767d591..984e3cd13 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala @@ -9,12 +9,11 @@ import java.util.Collections import scala.collection.JavaConverters.mapAsJavaMapConverter -import org.opensearch.flint.core.scheduler.util.IntervalSchedulerParser.parseStringSchedule -import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.CHECKPOINT_LOCATION -import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.SCHEDULER_MODE +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{CHECKPOINT_LOCATION, REFRESH_INTERVAL, SCHEDULER_MODE} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode +import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser import org.apache.spark.sql.catalog.Column import org.apache.spark.sql.catalyst.util.CharVarcharUtils @@ -159,7 +158,7 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) { } /** - * Updates the options with a default checkpoint location if not already set. + * Updates the options with default values for Create and Alter index operations.
* * @param indexName * The index name string @@ -176,25 +175,36 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) { val updatedOptions = new scala.collection.mutable.HashMap[String, String]() ++= options.options - options.checkpointLocation(indexName, flintSparkConf) match { - case Some(location) => updatedOptions += (CHECKPOINT_LOCATION.toString -> location) - case None => // Do nothing + // Add checkpoint location if not present + options.checkpointLocation(indexName, flintSparkConf).foreach { location => + updatedOptions += (CHECKPOINT_LOCATION.toString -> location) } - // Update scheduler mode by default - if (options.autoRefresh() && !updatedOptions.contains(SCHEDULER_MODE.toString)) { - val externalSchedulerEnabled = flintSparkConf.isExternalSchedulerEnabled - // If refresh interval is not set, or it is set but the interval is smaller than the threshold, use spark internal scheduler - val shouldUseExternalScheduler = options.refreshInterval().exists { interval => - parseStringSchedule( - flintSparkConf.externalSchedulerIntervalThreshold()).getInterval >= parseStringSchedule( - interval).getInterval - } - if (externalSchedulerEnabled && shouldUseExternalScheduler) { + // Update scheduler mode and refresh interval only if auto refresh is enabled + if (!options.autoRefresh()) { + return FlintSparkIndexOptions(updatedOptions.toMap) + } + + val externalSchedulerEnabled = flintSparkConf.isExternalSchedulerEnabled + val thresholdInterval = + IntervalSchedulerParser.parse(flintSparkConf.externalSchedulerIntervalThreshold()) + val currentInterval = options.refreshInterval().map(IntervalSchedulerParser.parse) + + ( + externalSchedulerEnabled, + currentInterval, + updatedOptions.get(SCHEDULER_MODE.toString)) match { + case (true, Some(interval), _) if interval.getInterval >= thresholdInterval.getInterval => + updatedOptions += (SCHEDULER_MODE.toString -> SchedulerMode.EXTERNAL.toString) + case (true, None, Some("external")) => + updatedOptions += (REFRESH_INTERVAL.toString -> flintSparkConf + .externalSchedulerIntervalThreshold()) + case (true, None, None) => updatedOptions += (SCHEDULER_MODE.toString -> SchedulerMode.EXTERNAL.toString) - } else { + updatedOptions += (REFRESH_INTERVAL.toString -> flintSparkConf + .externalSchedulerIntervalThreshold()) + case _ => updatedOptions += (SCHEDULER_MODE.toString -> SchedulerMode.INTERNAL.toString) - } } FlintSparkIndexOptions(updatedOptions.toMap) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala index ad9bc7acf..d343fd999 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala @@ -10,7 +10,6 @@ import java.util.Collections import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions, FlintSparkValidationHelper} import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh} import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode} -import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE @@ -109,9 +108,7 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) 
.addCheckpointLocation(options.checkpointLocation(), flintSparkConf.isCheckpointMandatory) .addRefreshInterval(options.refreshInterval()) .addAvailableNowTrigger( - options - .isExternalSchedulerEnabled() || options - .incrementalRefresh()) + options.isExternalSchedulerEnabled() || options.incrementalRefresh()) .addOutputMode(options.outputMode()) .options(options.extraSinkOptions()) } diff --git a/flint-core/src/main/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilder.java b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/AsyncQuerySchedulerBuilder.java similarity index 90% rename from flint-core/src/main/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilder.java rename to flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/AsyncQuerySchedulerBuilder.java index 94fcd7f4d..9865081c8 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilder.java +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/AsyncQuerySchedulerBuilder.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.core.scheduler; +package org.opensearch.flint.spark.scheduler; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -30,7 +30,7 @@ public enum AsyncQuerySchedulerAction { public static AsyncQueryScheduler build(FlintOptions options) { String className = options.getCustomAsyncQuerySchedulerClass(); - logger.info("className: {}", className); + logger.info("Attempting to instantiate AsyncQueryScheduler with class name: {}", className); if (className.isEmpty()) { return new OpenSearchAsyncQueryScheduler(options); diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala new file mode 100644 index 000000000..26456c7a4 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala @@ -0,0 +1,72 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.scheduler + +import java.time.Instant + +import org.opensearch.flint.common.scheduler.AsyncQueryScheduler +import org.opensearch.flint.common.scheduler.model.{AsyncQuerySchedulerRequest, LangType} +import org.opensearch.flint.spark.FlintSparkIndex +import org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerBuilder.AsyncQuerySchedulerAction +import org.opensearch.flint.spark.scheduler.util.RefreshQueryGenerator + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.flint.config.FlintSparkConf + +/** + * External scheduling service for Flint Spark jobs. + * + * This class implements the FlintSparkJobSchedulingService interface and provides functionality + * to handle job scheduling, updating, unscheduling, and removal using an external + * AsyncQueryScheduler. 
+ * + * @param flintAsyncQueryScheduler + * The AsyncQueryScheduler used for job management + * @param flintSparkConf + * The Flint Spark configuration + */ +class FlintSparkJobExternalSchedulingService( + flintAsyncQueryScheduler: AsyncQueryScheduler, + flintSparkConf: FlintSparkConf) + extends FlintSparkJobSchedulingService + with Logging { + + override def handleJob(index: FlintSparkIndex, action: AsyncQuerySchedulerAction): Unit = { + val dataSource = flintSparkConf.flintOptions().getDataSourceName() + val clientId = flintSparkConf.flintOptions().getAWSAccountId() + val indexName = index.name() + + logInfo(s"handleJob invoked with action: $action") + + val baseRequest = AsyncQuerySchedulerRequest + .builder() + .accountId(clientId) + .jobId(indexName) + .dataSource(dataSource) + + val request = action match { + case AsyncQuerySchedulerAction.SCHEDULE | AsyncQuerySchedulerAction.UPDATE => + val currentTime = Instant.now() + baseRequest + .scheduledQuery(RefreshQueryGenerator.generateRefreshQuery(index)) + .queryLang(LangType.SQL) + .interval(index.options.refreshInterval().get) + .enabled(true) + .enabledTime(currentTime) + .lastUpdateTime(currentTime) + .build() + case _ => baseRequest.build() + } + + action match { + case AsyncQuerySchedulerAction.SCHEDULE => flintAsyncQueryScheduler.scheduleJob(request) + case AsyncQuerySchedulerAction.UPDATE => flintAsyncQueryScheduler.updateJob(request) + case AsyncQuerySchedulerAction.UNSCHEDULE => flintAsyncQueryScheduler.unscheduleJob(request) + case AsyncQuerySchedulerAction.REMOVE => flintAsyncQueryScheduler.removeJob(request) + case _ => throw new IllegalArgumentException(s"Unsupported action: $action") + } + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala new file mode 100644 index 000000000..6ae6726fa --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobInternalSchedulingService.scala @@ -0,0 +1,94 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.scheduler + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexMonitor} +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh +import org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerBuilder.AsyncQuerySchedulerAction + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.flint.config.FlintSparkConf + +/** + * Internal scheduling service for Flint Spark jobs. + * + * This class implements the FlintSparkJobSchedulingService interface and provides functionality + * to handle job scheduling, updating, and unscheduling using internal Spark mechanisms and + * FlintSparkIndexMonitor. + * + * @param spark + * The SparkSession + * @param flintIndexMonitor + * The FlintSparkIndexMonitor used for index monitoring + */ +class FlintSparkJobInternalSchedulingService( + spark: SparkSession, + flintIndexMonitor: FlintSparkIndexMonitor) + extends FlintSparkJobSchedulingService + with Logging { + + /** + * Handles job-related actions for a given Flint Spark index. + * + * This method processes different actions (schedule, update, unschedule) for a Flint Spark + * index using internal scheduling mechanisms.
+ * + * @param index + * The FlintSparkIndex to be processed + * @param action + * The AsyncQuerySchedulerAction to be performed + */ + override def handleJob(index: FlintSparkIndex, action: AsyncQuerySchedulerAction): Unit = { + val indexName = index.name() + + action match { + case AsyncQuerySchedulerAction.SCHEDULE => + logInfo("Scheduling index state monitor") + flintIndexMonitor.startMonitor(indexName) + case AsyncQuerySchedulerAction.UPDATE => + logInfo("Updating index state monitor") + flintIndexMonitor.startMonitor(indexName) + startRefreshingJob(index) + case AsyncQuerySchedulerAction.UNSCHEDULE => + logInfo("Stopping index state monitor") + flintIndexMonitor.stopMonitor(indexName) + stopRefreshingJob(indexName) + case AsyncQuerySchedulerAction.REMOVE => // No-op + case _ => throw new IllegalArgumentException(s"Unsupported action: $action") + } + } + + /** + * Starts a refreshing job for the given Flint Spark index. + * + * @param index + * The FlintSparkIndex for which to start the refreshing job + */ + private def startRefreshingJob(index: FlintSparkIndex): Unit = { + logInfo(s"Starting refreshing job for index ${index.name()}") + val indexRefresh = FlintSparkIndexRefresh.create(index.name(), index) + indexRefresh.start(spark, new FlintSparkConf(spark.conf.getAll.toMap.asJava)) + } + + /** + * Stops the refreshing job for the given index name. + * + * @param indexName + * The name of the index for which to stop the refreshing job + */ + private def stopRefreshingJob(indexName: String): Unit = { + logInfo(s"Terminating refreshing job $indexName") + val job = spark.streams.active.find(_.name == indexName) + if (job.isDefined) { + job.get.stop() + } else { + logWarning("Refreshing job not found") + } + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala new file mode 100644 index 000000000..294b0fcb2 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobSchedulingService.scala @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.scheduler + +import org.opensearch.flint.common.scheduler.AsyncQueryScheduler +import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexMonitor} +import org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerBuilder.AsyncQuerySchedulerAction + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.flint.config.FlintSparkConf + +/** + * Trait defining the interface for Flint Spark job scheduling services. + */ +trait FlintSparkJobSchedulingService { + + /** + * Handles a job action for a given Flint Spark index. + * + * @param index + * The FlintSparkIndex to be processed + * @param action + * The AsyncQuerySchedulerAction to be performed + */ + def handleJob(index: FlintSparkIndex, action: AsyncQuerySchedulerAction): Unit + + /** + * Checks if the external scheduler is enabled for a given Flint Spark index. 
+ * + * @param index + * The FlintSparkIndex to check + * @return + * true if external scheduler is enabled, false otherwise + */ + def isExternalSchedulerEnabled(index: FlintSparkIndex): Boolean = { + val autoRefresh = index.options.autoRefresh() + val schedulerModeExternal = index.options.isExternalSchedulerEnabled() + autoRefresh && schedulerModeExternal + } +} + +/** + * Companion object for FlintSparkJobSchedulingService. Provides a factory method to create + * appropriate scheduling service instances. + */ +object FlintSparkJobSchedulingService { + + /** + * Creates a FlintSparkJobSchedulingService instance based on the index configuration. + * + * @param index + * The FlintSparkIndex for which the service is created + * @param spark + * The SparkSession + * @param flintAsyncQueryScheduler + * The AsyncQueryScheduler + * @param flintSparkConf + * The FlintSparkConf configuration + * @param flintIndexMonitor + * The FlintSparkIndexMonitor + * @return + * An instance of FlintSparkJobSchedulingService + */ + def create( + index: FlintSparkIndex, + spark: SparkSession, + flintAsyncQueryScheduler: AsyncQueryScheduler, + flintSparkConf: FlintSparkConf, + flintIndexMonitor: FlintSparkIndexMonitor): FlintSparkJobSchedulingService = { + if (index.options.isExternalSchedulerEnabled()) { + new FlintSparkJobExternalSchedulingService(flintAsyncQueryScheduler, flintSparkConf) + } else { + new FlintSparkJobInternalSchedulingService(spark, flintIndexMonitor) + } + } +} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/scheduler/OpenSearchAsyncQueryScheduler.java b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/OpenSearchAsyncQueryScheduler.java similarity index 96% rename from flint-core/src/main/java/org/opensearch/flint/core/scheduler/OpenSearchAsyncQueryScheduler.java rename to flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/OpenSearchAsyncQueryScheduler.java index eb2370848..19532254b 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/scheduler/OpenSearchAsyncQueryScheduler.java +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/OpenSearchAsyncQueryScheduler.java @@ -3,13 +3,12 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.core.scheduler; +package org.opensearch.flint.spark.scheduler; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; -import lombok.SneakyThrows; import org.apache.commons.io.IOUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -32,9 +31,8 @@ import org.opensearch.flint.common.scheduler.model.AsyncQuerySchedulerRequest; import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; -import org.opensearch.flint.core.scheduler.util.IntervalSchedulerParser; +import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser; import org.opensearch.flint.core.storage.OpenSearchClientUtils; -import org.opensearch.index.engine.DocumentMissingException; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; import org.opensearch.jobscheduler.spi.schedule.Schedule; import org.opensearch.rest.RestStatus; @@ -43,6 +41,11 @@ import java.nio.charset.StandardCharsets; import java.time.Instant; +/** + * OpenSearch implementation of the AsyncQueryScheduler interface. 
+ * This class manages the scheduling, unscheduling, updating, and removal of asynchronous query jobs + * using OpenSearch as the backend storage and scheduling mechanism. + */ public class OpenSearchAsyncQueryScheduler implements AsyncQueryScheduler { public static final String SCHEDULER_INDEX_NAME = ".async-query-scheduler"; private static final String SCHEDULER_INDEX_MAPPING_FILE_NAME = "async-query-scheduler-index-mapping.yml"; @@ -248,8 +251,8 @@ private static String serializeRequest(AsyncQuerySchedulerRequest request) { json.put("jitter", request.getJitter()); } - if (request.getSchedule() != null) { - Schedule parsedSchedule = IntervalSchedulerParser.parse(request.getSchedule()); + if (request.getInterval() != null) { + Schedule parsedSchedule = IntervalSchedulerParser.parse(request.getInterval()); json.set("schedule", serializeSchedule(parsedSchedule)); } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/util/IntervalSchedulerParser.java b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/util/IntervalSchedulerParser.java new file mode 100644 index 000000000..8745681b9 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/util/IntervalSchedulerParser.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.scheduler.util; + +import org.apache.parquet.Strings; +import org.apache.spark.sql.execution.streaming.Triggers; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; + +/** + * Utility class for parsing interval schedules. + */ +public class IntervalSchedulerParser { + + /** + * Parses a schedule string into an IntervalSchedule. 
+ * + * @param scheduleStr the schedule string to parse + * @return the parsed IntervalSchedule + * @throws IllegalArgumentException if the schedule string is invalid + */ + public static IntervalSchedule parse(String scheduleStr) { + if (Strings.isNullOrEmpty(scheduleStr)) { + throw new IllegalArgumentException("Schedule string must not be null or empty."); + } + + Long millis = Triggers.convert(scheduleStr); + + // Convert milliseconds to minutes (rounding down) + int minutes = (int) (millis / (60 * 1000)); + + // Use the current time as the start time + Instant startTime = Instant.now(); + + return new IntervalSchedule(startTime, minutes, ChronoUnit.MINUTES); + } +} \ No newline at end of file diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/util/RefreshQueryGenerator.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/util/RefreshQueryGenerator.scala index 429964455..510e0b9d5 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/util/RefreshQueryGenerator.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/util/RefreshQueryGenerator.scala @@ -27,9 +27,9 @@ object RefreshQueryGenerator { case skippingIndex: FlintSparkSkippingIndex => s"REFRESH SKIPPING INDEX ON ${skippingIndex.tableName}" case coveringIndex: FlintSparkCoveringIndex => - s"REFRESH INDEX ${index.name()} ON ${coveringIndex.tableName}" - case _: FlintSparkMaterializedView => - s"REFRESH MATERIALIZED VIEW ${index.name()}" + s"REFRESH INDEX ${coveringIndex.indexName} ON ${coveringIndex.tableName}" + case materializedView: FlintSparkMaterializedView => + s"REFRESH MATERIALIZED VIEW ${materializedView.mvName}" case _ => throw new IllegalArgumentException( s"Unsupported index type: ${index.getClass.getSimpleName}") diff --git a/flint-core/src/test/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilderTest.java b/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilderTest.java similarity index 94% rename from flint-core/src/test/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilderTest.java rename to flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilderTest.java index e9cf74ab6..67b5afee5 100644 --- a/flint-core/src/test/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilderTest.java +++ b/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilderTest.java @@ -9,6 +9,8 @@ import org.opensearch.flint.common.scheduler.AsyncQueryScheduler; import org.opensearch.flint.common.scheduler.model.AsyncQuerySchedulerRequest; import org.opensearch.flint.core.FlintOptions; +import org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerBuilder; +import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; diff --git a/flint-core/src/test/java/org/opensearch/flint/core/scheduler/util/IntervalSchedulerParserTest.java b/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/util/IntervalSchedulerParserTest.java similarity index 54% rename from flint-core/src/test/java/org/opensearch/flint/core/scheduler/util/IntervalSchedulerParserTest.java rename to flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/util/IntervalSchedulerParserTest.java index 409f41ddc..2ad1fea9c 100644 --- 
a/flint-core/src/test/java/org/opensearch/flint/core/scheduler/util/IntervalSchedulerParserTest.java +++ b/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/util/IntervalSchedulerParserTest.java @@ -6,6 +6,7 @@ package org.opensearch.flint.core.scheduler.util; import org.junit.Test; +import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; import org.opensearch.jobscheduler.spi.schedule.Schedule; @@ -13,40 +14,18 @@ import java.time.temporal.ChronoUnit; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; - public class IntervalSchedulerParserTest { - @Test + @Test(expected = IllegalArgumentException.class) public void testParseNull() { - assertNull(IntervalSchedulerParser.parse(null)); - } - - @Test - public void testParseScheduleInstance() { - Schedule schedule = new IntervalSchedule(Instant.now(), 5, ChronoUnit.MINUTES); - assertEquals(schedule, IntervalSchedulerParser.parse(schedule)); - } - - @Test - public void testParseScalaOptionWithString() { - scala.Option option = scala.Option.apply("5 minutes"); - Schedule result = IntervalSchedulerParser.parse(option); - assertTrue(result instanceof IntervalSchedule); + IntervalSchedulerParser.parse(null); } - @Test - public void testParseScalaOptionEmpty() { - scala.Option option = scala.Option.empty(); - assertNull(IntervalSchedulerParser.parse(option)); - } - - @Test - public void testParseScalaOptionWithNonString() { - scala.Option option = scala.Option.apply(5); - assertNull(IntervalSchedulerParser.parse(option)); + @Test(expected = IllegalArgumentException.class) + public void testParseEmptyString() { + IntervalSchedulerParser.parse(""); } @Test @@ -59,40 +38,35 @@ public void testParseString() { } @Test(expected = IllegalArgumentException.class) - public void testParseInvalidObject() { - IntervalSchedulerParser.parse(new Object()); + public void testParseInvalidFormat() { + IntervalSchedulerParser.parse("invalid format"); } @Test public void testParseStringScheduleMinutes() { - IntervalSchedule result = IntervalSchedulerParser.parseStringSchedule("5 minutes"); + IntervalSchedule result = IntervalSchedulerParser.parse("5 minutes"); assertEquals(5, result.getInterval()); assertEquals(ChronoUnit.MINUTES, result.getUnit()); } @Test public void testParseStringScheduleHours() { - IntervalSchedule result = IntervalSchedulerParser.parseStringSchedule("2 hours"); + IntervalSchedule result = IntervalSchedulerParser.parse("2 hours"); assertEquals(120, result.getInterval()); assertEquals(ChronoUnit.MINUTES, result.getUnit()); } @Test public void testParseStringScheduleDays() { - IntervalSchedule result = IntervalSchedulerParser.parseStringSchedule("1 day"); + IntervalSchedule result = IntervalSchedulerParser.parse("1 day"); assertEquals(1440, result.getInterval()); assertEquals(ChronoUnit.MINUTES, result.getUnit()); } - @Test(expected = IllegalArgumentException.class) - public void testParseStringScheduleInvalidFormat() { - IntervalSchedulerParser.parseStringSchedule("invalid format"); - } - @Test public void testParseStringScheduleStartTime() { Instant before = Instant.now(); - IntervalSchedule result = IntervalSchedulerParser.parseStringSchedule("30 minutes"); + IntervalSchedule result = IntervalSchedulerParser.parse("30 minutes"); Instant after = Instant.now(); assertTrue(result.getStartTime().isAfter(before) || result.getStartTime().equals(before)); diff --git 
a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/scheduler/util/RefreshQueryGeneratorSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/scheduler/util/RefreshQueryGeneratorSuite.scala index 72082ef01..1cd83c38d 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/scheduler/util/RefreshQueryGeneratorSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/scheduler/util/RefreshQueryGeneratorSuite.scala @@ -26,7 +26,7 @@ class RefreshQueryGeneratorTest extends SparkFunSuite with Matchers { test("generateRefreshQuery should return correct query for FlintSparkCoveringIndex") { val mockIndex = mock(classOf[FlintSparkCoveringIndex]) - when(mockIndex.name()).thenReturn("testIndex") + when(mockIndex.indexName).thenReturn("testIndex") when(mockIndex.tableName).thenReturn("testTable") val result = RefreshQueryGenerator.generateRefreshQuery(mockIndex) @@ -35,7 +35,7 @@ class RefreshQueryGeneratorTest extends SparkFunSuite with Matchers { test("generateRefreshQuery should return correct query for FlintSparkMaterializedView") { val mockIndex = mock(classOf[FlintSparkMaterializedView]) - when(mockIndex.name()).thenReturn("testMV") + when(mockIndex.mvName).thenReturn("testMV") val result = RefreshQueryGenerator.generateRefreshQuery(mockIndex) result shouldBe "REFRESH MATERIALIZED VIEW testMV" diff --git a/integ-test/src/integration/scala/org/opensearch/flint/core/scheduler/OpenSearchAsyncQuerySchedulerITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/core/scheduler/OpenSearchAsyncQuerySchedulerITSuite.scala index f7baceb64..5731b8dfc 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/core/scheduler/OpenSearchAsyncQuerySchedulerITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/core/scheduler/OpenSearchAsyncQuerySchedulerITSuite.scala @@ -18,6 +18,7 @@ import org.opensearch.flint.common.scheduler.model.AsyncQuerySchedulerRequest import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.OpenSearchClientUtils import org.opensearch.flint.spark.FlintSparkSuite +import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler import org.scalatest.matchers.should.Matchers class OpenSearchAsyncQuerySchedulerITSuite extends FlintSparkSuite with Matchers { @@ -49,7 +50,7 @@ class OpenSearchAsyncQuerySchedulerITSuite extends FlintSparkSuite with Matchers // Update job with new query and schedule val updatedRequest = - AsyncQuerySchedulerRequest.builder().jobId(testJobId).schedule("5 minutes").build() + AsyncQuerySchedulerRequest.builder().jobId(testJobId).interval("5 minutes").build() scheduler.updateJob(updatedRequest) val (_, _, _, updatedSchedule) = verifyJob(testJobId) updatedSchedule shouldBe "5 minutes" @@ -73,7 +74,7 @@ class OpenSearchAsyncQuerySchedulerITSuite extends FlintSparkSuite with Matchers AsyncQuerySchedulerRequest .builder() .jobId(testJobId) - .schedule("2 minutes") + .interval("2 minutes") .lastUpdateTime(Instant.now()) .build() assertThrows[IllegalArgumentException] { @@ -121,7 +122,7 @@ class OpenSearchAsyncQuerySchedulerITSuite extends FlintSparkSuite with Matchers .dataSource("test_datasource") .scheduledQuery("SELECT * FROM test_table") .queryLang("SQL") - .schedule("1 minutes") + .interval("1 minutes") .enabled(true) .lastUpdateTime(Instant.now()) .build() diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala 
b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index c94e38296..aac06a2c1 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -19,6 +19,7 @@ import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row +import org.apache.spark.sql.flint.config.FlintSparkConf import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, OPTIMIZER_RULE_COVERING_INDEX_ENABLED} class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { @@ -145,6 +146,11 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { test("create covering index with external scheduler") { withTempDir { checkpointDir => + setFlintSparkConf( + FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS, + "org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerForSqlIT") + setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, true) + sql(s""" | CREATE INDEX $testIndex ON $testTable | (name, age) @@ -160,8 +166,8 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { flint.describeIndex(testFlintIndex) shouldBe defined indexData.count() shouldBe 0 - // Refresh all present source data as of now - sql(s"REFRESH INDEX $testIndex ON $testTable") + // Query the index after 25 seconds to give the scheduled refresh time to complete + Thread.sleep(25000) flint.queryIndex(testFlintIndex).count() shouldBe 2 // New data won't be refreshed until refresh statement triggered @@ -172,9 +178,10 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { |""".stripMargin) flint.queryIndex(testFlintIndex).count() shouldBe 2 - // New data is refreshed incrementally - sql(s"REFRESH INDEX $testIndex ON $testTable") - flint.queryIndex(testFlintIndex).count() shouldBe 3 + // Drop index with test scheduler + sql(s"DROP INDEX $testIndex ON $testTable") + conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key) + conf.unsetConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED.key) } } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 8bc622f53..f569bf123 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -21,6 +21,7 @@ import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE +import org.apache.spark.sql.flint.config.FlintSparkConf class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { @@ -84,6 +85,11 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { test("create materialized view with auto refresh and external scheduler") { withTempDir { checkpointDir => + setFlintSparkConf( + FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS, + "org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerForSqlIT") + setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, true) + sql(s""" | CREATE MATERIALIZED VIEW $testMvName | AS $testQuery @@ -100,8 +106,8 @@
flint.describeIndex(testFlintIndex) shouldBe defined indexData.count() shouldBe 0 - // Refresh all present source data as of now - sql(s"REFRESH MATERIALIZED VIEW $testMvName") + // Query the index after 25 seconds to give the scheduled refresh time to complete + Thread.sleep(25000) flint.queryIndex(testFlintIndex).count() shouldBe 3 // New data won't be refreshed until refresh statement triggered @@ -111,9 +117,10 @@ | """.stripMargin) flint.queryIndex(testFlintIndex).count() shouldBe 3 - // New data is refreshed incrementally - sql(s"REFRESH MATERIALIZED VIEW $testMvName") - flint.queryIndex(testFlintIndex).count() shouldBe 4 + // Drop index with test scheduler + sql(s"DROP MATERIALIZED VIEW $testMvName") + conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key) + conf.unsetConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED.key) } } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 828470530..2e5b4d2ab 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -20,6 +20,7 @@ import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.{ExplainSuiteHelper, Row} import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE +import org.apache.spark.sql.flint.config.FlintSparkConf import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuiteHelper { @@ -64,6 +65,11 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit test("create skipping index with auto refresh and external scheduler") { withTempDir { checkpointDir => + setFlintSparkConf( + FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS, + "org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerForSqlIT") + setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, true) + sql(s""" | CREATE SKIPPING INDEX ON $testTable | ( year PARTITION ) @@ -79,8 +85,8 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit flint.describeIndex(testIndex) shouldBe defined indexData.count() shouldBe 0 - // Refresh all present source data as of now - sql(s"REFRESH SKIPPING INDEX ON $testTable") + // Query the index after 25 seconds to give the scheduled refresh time to complete + Thread.sleep(25000) flint.queryIndex(testIndex).count() shouldBe 2 // New data won't be refreshed until refresh statement triggered @@ -91,8 +97,9 @@ |""".stripMargin) flint.queryIndex(testIndex).count() shouldBe 2 - sql(s"REFRESH SKIPPING INDEX ON $testTable") - flint.queryIndex(testIndex).count() shouldBe 3 + sql(s"DROP SKIPPING INDEX ON $testTable") + conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key) + conf.unsetConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED.key) } } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/scheduler/AsyncQuerySchedulerForSqlIT.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/scheduler/AsyncQuerySchedulerForSqlIT.scala new file mode 100644 index 000000000..c330fdd27 --- /dev/null +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/scheduler/AsyncQuerySchedulerForSqlIT.scala @@ -0,0 +1,73 @@ +/* + * Copyright
OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.scheduler + +import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} + +import org.opensearch.flint.common.scheduler.AsyncQueryScheduler +import org.opensearch.flint.common.scheduler.model.AsyncQuerySchedulerRequest + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession + +/** + * An implementation of AsyncQueryScheduler for Spark SQL integration testing. + * + * This scheduler uses a dedicated single-threaded ScheduledExecutorService to run asynchronous + * SQL queries. It's designed to schedule jobs with a short delay for testing purposes, avoiding + * the use of the global ExecutionContext to prevent resource conflicts in a Spark environment. + */ +class AsyncQuerySchedulerForSqlIT extends AsyncQueryScheduler with AutoCloseable with Logging { + + lazy val spark = SparkSession.builder().getOrCreate() + + private val executorService: ScheduledExecutorService = + Executors.newSingleThreadScheduledExecutor() + + override def scheduleJob(asyncQuerySchedulerRequest: AsyncQuerySchedulerRequest): Unit = { + // Schedule the job to run after 100ms + executorService.schedule( + new Runnable { + override def run(): Unit = { + logInfo(s"scheduleJob starting...${asyncQuerySchedulerRequest.getScheduledQuery}") + spark.sql(asyncQuerySchedulerRequest.getScheduledQuery) + logInfo("scheduleJob complete") + } + }, + 100, + TimeUnit.MILLISECONDS) + + logInfo("scheduleJob method returned") + } + + override def updateJob(asyncQuerySchedulerRequest: AsyncQuerySchedulerRequest): Unit = { + logInfo("updateJob method returned") + } + + override def unscheduleJob(asyncQuerySchedulerRequest: AsyncQuerySchedulerRequest): Unit = { + logInfo("unscheduleJob method returned") + } + + override def removeJob(asyncQuerySchedulerRequest: AsyncQuerySchedulerRequest): Unit = { + logInfo("removeJob method returned") + } + + /** + * Closes the scheduler and releases resources. + */ + override def close(): Unit = { + logInfo("Shutting down AsyncQuerySchedulerForSqlIT") + executorService.shutdown() + try { + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + executorService.shutdownNow() + } + } catch { + case _: InterruptedException => + executorService.shutdownNow() + } + } +}
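
A minimal usage sketch of the reworked request model, assuming the Lombok @Builder generated from the fields above; every literal value below (account id, job id, data source, query) is a hypothetical placeholder:

import java.time.Instant
import org.opensearch.flint.common.scheduler.model.{AsyncQuerySchedulerRequest, LangType}

// The untyped `schedule` field is gone; callers now pass a plain interval string.
val request = AsyncQuerySchedulerRequest
  .builder()
  .accountId("123456789012") // hypothetical AWS account id
  .jobId("flint_mydb_mytable_skipping_index") // hypothetical Flint index name
  .dataSource("mys3") // hypothetical data source name
  .scheduledQuery("REFRESH SKIPPING INDEX ON mydb.mytable")
  .queryLang(LangType.SQL)
  .interval("5 minutes") // parsed into an IntervalSchedule by IntervalSchedulerParser
  .enabled(true)
  .enabledTime(Instant.now())
  .lastUpdateTime(Instant.now())
  .build()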
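
A sketch of how a call site drives the new scheduling abstraction, assuming index, spark, flintSparkConf, and flintIndexMonitor are in scope as they are inside FlintSpark:

import org.opensearch.flint.spark.scheduler.{AsyncQuerySchedulerBuilder, FlintSparkJobSchedulingService}
import org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerBuilder.AsyncQuerySchedulerAction

// Builds the OpenSearch-backed scheduler unless a custom class is configured.
val scheduler = AsyncQuerySchedulerBuilder.build(flintSparkConf.flintOptions())

// The factory selects the external or internal implementation from the index
// options, so call sites no longer branch on isExternalSchedulerEnabled.
val schedulingService = FlintSparkJobSchedulingService.create(
  index, spark, scheduler, flintSparkConf, flintIndexMonitor)

schedulingService.handleJob(index, AsyncQuerySchedulerAction.SCHEDULE)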
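
And a sketch of the tightened IntervalSchedulerParser contract, following the unit tests in this change (intervals normalize to whole minutes; null, empty, and unparseable input throw IllegalArgumentException):

import java.time.temporal.ChronoUnit
import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser

val schedule = IntervalSchedulerParser.parse("2 hours")
assert(schedule.getInterval == 120) // hours are converted to minutes
assert(schedule.getUnit == ChronoUnit.MINUTES)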