Commit

Signed-off-by: Louis Chu <clingzhi@amazon.com>
noCharger committed Sep 23, 2024
1 parent 88a8463 commit 0f7ebaf
Showing 21 changed files with 567 additions and 242 deletions.
AsyncQuerySchedulerRequest.java
@@ -10,20 +10,67 @@

import java.time.Instant;

-/** Represents a job request for a scheduled task. */
+/**
+ * Represents a job request for a scheduled task.
+ */
@Builder
@Data
public class AsyncQuerySchedulerRequest {
-  protected String accountId;
-  // Scheduler jobid is the opensearch index name until we support multiple jobs per index
-  protected String jobId;
-  protected String dataSource;
-  protected String scheduledQuery;
-  protected String queryLang;
-  protected Object schedule;
-  protected boolean enabled;
-  protected Instant lastUpdateTime;
-  protected Instant enabledTime;
-  protected Long lockDurationSeconds;
-  protected Double jitter;
-}

+
+  /**
+   * The AWS accountId used to identify the resource.
+   */
+  String accountId;
+
+  /**
+   * The unique identifier for the scheduler job.
+   */
+  String jobId;
+
+  /**
+   * The name of the data source on which the scheduled query will be executed.
+   */
+  String dataSource;
+
+  /**
+   * The scheduled query to be executed.
+   */
+  String scheduledQuery;
+
+  /**
+   * The language in which the query is written, such as SQL, PPL (Piped Processing Language), etc.
+   */
+  String queryLang;
+
+  /**
+   * The interval expression defining the frequency of the job execution.
+   * Typically expressed as a time-based pattern (e.g. 5 minutes).
+   */
+  String interval;
+
+  /**
+   * Indicates whether the scheduled job is currently enabled or not.
+   */
+  boolean enabled;
+
+  /**
+   * The timestamp of the last update made to this job.
+   */
+  Instant lastUpdateTime;
+
+  /**
+   * The timestamp when this job was enabled.
+   */
+  Instant enabledTime;
+
+  /**
+   * The duration, in seconds, for which the job remains locked.
+   * This lock is used to prevent concurrent executions of the same job, ensuring that only one instance of the job runs at a time.
+   */
+  Long lockDurationSeconds;
+
+  /**
+   * The jitter value to add randomness to the execution schedule, helping to avoid the thundering herd problem.
+   */
+  Double jitter;
+}
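
For reference, a minimal sketch of how a caller could assemble one of these requests through the Lombok-generated builder (note the new interval string replacing the old untyped schedule field). The account id, index name, query, and "sql" language tag below are illustrative values, not taken from this commit:

    import java.time.Instant
    import org.opensearch.flint.common.scheduler.model.AsyncQuerySchedulerRequest

    val now = Instant.now()
    val request = AsyncQuerySchedulerRequest
      .builder()
      .accountId("123456789012") // hypothetical AWS account id
      .jobId("flint_myglue_default_logs_skipping_index") // job id = OpenSearch index name
      .dataSource("myglue")
      .scheduledQuery("REFRESH SKIPPING INDEX ON myglue.default.logs")
      .queryLang("sql") // assumed tag; the commit does not pin the exact format
      .interval("5 minutes")
      .enabled(true)
      .enabledTime(now)
      .lastUpdateTime(now)
      .lockDurationSeconds(300L) // boxed to java.lang.Long via Predef
      .jitter(0.1) // boxed to java.lang.Double via Predef
      .build()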

This file was deleted. (The deleted file's name and contents are collapsed in this view.)

FlintOptions.java
@@ -178,8 +178,13 @@ public int getSocketTimeoutMillis() {
public String getDataSourceName() {
return options.getOrDefault(DATA_SOURCE_NAME, "");
}

-  public String getClientId() {
+  /**
+   * Get the AWS accountId from the cluster name.
+   * Flint cluster name is in the format of "accountId:clusterName".
+   * @return the AWS accountId
+   */
+  public String getAWSAccountId() {
String clusterName = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", "");
String[] parts = clusterName.split(":");
return parts.length == 2 ? parts[0] : "";
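
The parsing above is deliberately permissive: any FLINT_CLUSTER_NAME value without exactly one ":" separator yields an empty account id rather than an error. A standalone sketch of the same behavior, with made-up inputs:

    // Mirrors getAWSAccountId(): "accountId:clusterName" -> accountId, otherwise "".
    def parseAccountId(clusterName: String): String = {
      val parts = clusterName.split(":")
      if (parts.length == 2) parts(0) else ""
    }

    parseAccountId("123456789012:my-flint-cluster") // "123456789012" (hypothetical)
    parseAccountId("my-flint-cluster")              // "" - no accountId prefix
    parseAccountId("a:b:c")                         // "" - too many separators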
FlintSpark.scala
@@ -5,8 +5,6 @@

package org.opensearch.flint.spark

-import java.time.Instant
-
import scala.collection.JavaConverters._

import org.json4s.{Formats, NoTypeHints}
@@ -16,19 +14,17 @@ import org.opensearch.flint.common.metadata.log.{FlintMetadataLogService, Optimi
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.common.scheduler.AsyncQueryScheduler
-import org.opensearch.flint.common.scheduler.model.{AsyncQuerySchedulerRequest, LangType}
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder
import org.opensearch.flint.core.metadata.log.FlintMetadataLogServiceBuilder
-import org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilder
-import org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilder.AsyncQuerySchedulerAction
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._
-import org.opensearch.flint.spark.scheduler.util.RefreshQueryGenerator
+import org.opensearch.flint.spark.scheduler.{AsyncQuerySchedulerBuilder, FlintSparkJobSchedulingService}
+import org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerBuilder.AsyncQuerySchedulerAction
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
import org.opensearch.flint.spark.skipping.recommendations.DataTypeSkippingStrategy
@@ -120,6 +116,12 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
}
} else {
val metadata = index.metadata()
+val jobSchedulingService = FlintSparkJobSchedulingService.create(
+  index,
+  spark,
+  flintAsyncQueryScheduler,
+  flintSparkConf,
+  flintIndexMonitor)
tx
.initialLog(latest => latest.state == EMPTY || latest.state == DELETED)
.transientLog(latest => latest.copy(state = CREATING))
@@ -134,9 +136,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
flintIndexMetadataService
.updateIndexMetadata(indexName, metadata.copy(latestId = Some(latest.id)))
}
-if (isExternalSchedulerEnabled(index)) {
-  handleAsyncQueryScheduler(index, AsyncQuerySchedulerAction.SCHEDULE)
-}
+jobSchedulingService.handleJob(index, AsyncQuerySchedulerAction.SCHEDULE)
})
}
}
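
Every mutation in this file follows the same optimistic-transaction shape, which the hunks below keep while swapping only the commit body: validate the current index state, record a transient state while the work runs, then persist a final state on success. A condensed sketch of the pattern using the create flow's states (the final ACTIVE state is inferred, not shown in the visible hunks; other operations substitute their own states):

    // Condensed from the call sites in this diff; not a verbatim excerpt.
    tx
      .initialLog(latest => latest.state == EMPTY || latest.state == DELETED) // precondition
      .transientLog(latest => latest.copy(state = CREATING)) // visible while the work runs
      .finalLog(latest => latest.copy(state = ACTIVE)) // persisted if commit succeeds
      .commit(_ => {
        // do the work, e.g. write index metadata, then hand the job off
        jobSchedulingService.handleJob(index, AsyncQuerySchedulerAction.SCHEDULE)
      })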
@@ -254,21 +254,20 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
withTransaction[Boolean](indexName, "Delete Flint index") { tx =>
if (flintClient.exists(indexName)) {
val index = describeIndex(indexName)
+val jobSchedulingService = FlintSparkJobSchedulingService.create(
+  index.get,
+  spark,
+  flintAsyncQueryScheduler,
+  flintSparkConf,
+  flintIndexMonitor)
tx
.initialLog(latest =>
latest.state == ACTIVE || latest.state == REFRESHING || latest.state == FAILED)
.transientLog(latest => latest.copy(state = DELETING))
.finalLog(latest => latest.copy(state = DELETED))
.commit(_ => {
-if (isExternalSchedulerEnabled(index.get)) {
-  handleAsyncQueryScheduler(index.get, AsyncQuerySchedulerAction.UNSCHEDULE)
-  true
-} else {
-  // TODO: share same transaction for now
-  flintIndexMonitor.stopMonitor(indexName)
-  stopRefreshingJob(indexName)
-  true
-}
+jobSchedulingService.handleJob(index.get, AsyncQuerySchedulerAction.UNSCHEDULE)
+true
})
} else {
logInfo("Flint index to be deleted doesn't exist")
@@ -289,14 +288,18 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
if (flintClient.exists(indexName)) {
val index = describeIndex(indexName)
val options = index.get.options
+val jobSchedulingService = FlintSparkJobSchedulingService.create(
+  index.get,
+  spark,
+  flintAsyncQueryScheduler,
+  flintSparkConf,
+  flintIndexMonitor)
tx
.initialLog(latest => latest.state == DELETED)
.transientLog(latest => latest.copy(state = VACUUMING))
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {
-if (isExternalSchedulerEnabled(index.get)) {
-  handleAsyncQueryScheduler(index.get, AsyncQuerySchedulerAction.REMOVE)
-}
+jobSchedulingService.handleJob(index.get, AsyncQuerySchedulerAction.REMOVE)
flintClient.deleteIndex(indexName)
flintIndexMetadataService.deleteIndexMetadata(indexName)

@@ -463,6 +466,12 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
tx: OptimisticTransaction[Option[String]]): Option[String] = {
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
+val jobSchedulingService = FlintSparkJobSchedulingService.create(
+  index,
+  spark,
+  flintAsyncQueryScheduler,
+  flintSparkConf,
+  flintIndexMonitor)
tx
.initialLog(latest =>
latest.state == REFRESHING && latest.entryVersion == indexLogEntry.entryVersion)
@@ -471,14 +480,8 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
.commit(_ => {
flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata)
logInfo("Update index options complete")
-if (isExternalSchedulerEnabled(index)) {
-  handleAsyncQueryScheduler(index, AsyncQuerySchedulerAction.UNSCHEDULE)
-  None
-} else {
-  flintIndexMonitor.stopMonitor(indexName)
-  stopRefreshingJob(indexName)
-  None
-}
+jobSchedulingService.handleJob(index, AsyncQuerySchedulerAction.UNSCHEDULE)
+None
})
}

@@ -487,7 +490,12 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
tx: OptimisticTransaction[Option[String]]): Option[String] = {
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
-val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
+val jobSchedulingService = FlintSparkJobSchedulingService.create(
+  index,
+  spark,
+  flintAsyncQueryScheduler,
+  flintSparkConf,
+  flintIndexMonitor)
tx
.initialLog(latest =>
latest.state == ACTIVE && latest.entryVersion == indexLogEntry.entryVersion)
@@ -501,56 +509,8 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
.commit(_ => {
flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata)
logInfo("Update index options complete")
-if (isExternalSchedulerEnabled(index)) {
-  handleAsyncQueryScheduler(index, AsyncQuerySchedulerAction.UPDATE)
-  None
-} else {
-  indexRefresh.start(spark, flintSparkConf)
-}
+jobSchedulingService.handleJob(index, AsyncQuerySchedulerAction.UPDATE)
+None
})
}

-private def handleAsyncQueryScheduler(
-    index: FlintSparkIndex,
-    action: AsyncQuerySchedulerAction): Unit = {
-  val dataSource = flintSparkConf.flintOptions().getDataSourceName()
-  val clientId = flintSparkConf.flintOptions().getClientId()
-  val indexName = index.name()
-
-  logInfo(s"handleAsyncQueryScheduler invoked: $action")
-
-  val baseRequest = AsyncQuerySchedulerRequest
-    .builder()
-    .accountId(clientId)
-    .jobId(indexName)
-    .dataSource(dataSource)
-
-  val request = action match {
-    case AsyncQuerySchedulerAction.SCHEDULE | AsyncQuerySchedulerAction.UPDATE =>
-      val currentTime = Instant.now()
-      baseRequest
-        .scheduledQuery(RefreshQueryGenerator.generateRefreshQuery(index))
-        .queryLang(LangType.SQL)
-        .schedule(index.options.refreshInterval())
-        .enabled(true)
-        .enabledTime(currentTime)
-        .lastUpdateTime(currentTime)
-        .build()
-    case _ => baseRequest.build()
-  }
-
-  action match {
-    case AsyncQuerySchedulerAction.SCHEDULE => flintAsyncQueryScheduler.scheduleJob(request)
-    case AsyncQuerySchedulerAction.UPDATE => flintAsyncQueryScheduler.updateJob(request)
-    case AsyncQuerySchedulerAction.UNSCHEDULE => flintAsyncQueryScheduler.unscheduleJob(request)
-    case AsyncQuerySchedulerAction.REMOVE => flintAsyncQueryScheduler.removeJob(request)
-    case _ => throw new IllegalArgumentException(s"Unsupported action: $action")
-  }
-}
-
-private def isExternalSchedulerEnabled(index: FlintSparkIndex): Boolean = {
-  val autoRefresh = index.options.autoRefresh()
-  val schedulerModeExternal = index.options.isExternalSchedulerEnabled()
-  autoRefresh && schedulerModeExternal
-}
}
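
FlintSparkJobSchedulingService itself is not among the files shown here, so the following is only a sketch of how an external-scheduler implementation might centralize the dispatch that the deleted handleAsyncQueryScheduler performed inline. The AsyncQueryScheduler methods (scheduleJob, updateJob, unscheduleJob, removeJob) and the request builder are visible in this diff; the class name and everything else are assumptions:

    import org.opensearch.flint.common.scheduler.AsyncQueryScheduler
    import org.opensearch.flint.common.scheduler.model.AsyncQuerySchedulerRequest
    import org.opensearch.flint.spark.FlintSparkIndex
    import org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerBuilder.AsyncQuerySchedulerAction

    // Hypothetical strategy object behind the new FlintSparkJobSchedulingService seam.
    class ExternalJobSchedulingServiceSketch(scheduler: AsyncQueryScheduler) {

      def handleJob(index: FlintSparkIndex, action: AsyncQuerySchedulerAction): Unit = {
        // A real implementation would also populate the query, language, and
        // interval for SCHEDULE/UPDATE, as the deleted method did.
        val request = AsyncQuerySchedulerRequest.builder().jobId(index.name()).build()
        action match {
          case AsyncQuerySchedulerAction.SCHEDULE => scheduler.scheduleJob(request)
          case AsyncQuerySchedulerAction.UPDATE => scheduler.updateJob(request)
          case AsyncQuerySchedulerAction.UNSCHEDULE => scheduler.unscheduleJob(request)
          case AsyncQuerySchedulerAction.REMOVE => scheduler.removeJob(request)
        }
      }
    }

Whatever the real implementation looks like, the payoff is already visible above: each transaction commit now makes a single handleJob call instead of repeating the isExternalSchedulerEnabled branch and the monitor/refresh teardown at every call site.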
