From 37df29f7d323f3a57ac1721008b99ba6a2b1e099 Mon Sep 17 00:00:00 2001 From: Zhongnan Su Date: Tue, 10 May 2022 11:35:45 -0700 Subject: [PATCH] integrate job-scheduler into observability (#609) Signed-off-by: Zhongnan Su --- opensearch-observability/build.gradle | 50 ++++- .../observability/ObservabilityPlugin.kt | 27 ++- .../opensearch/observability/model/RestTag.kt | 4 + .../observability/model/ScheduledJobDoc.kt | 171 ++++++++++++++++++ .../resthandler/SchedulerRestHandler.kt | 155 ++++++++++++++++ .../scheduler/ObservabilityJobParser.kt | 22 +++ .../scheduler/ObservabilityJobRunner.kt | 37 ++++ ...ch.jobscheduler.spi.JobSchedulerExtension} | 0 .../observability/ObservabilityPluginIT.kt | 4 + .../bwc/job-scheduler}/.gitignore | 0 .../resources/bwc/observability/.gitignore | 4 + .../test/resources/job-scheduler/.gitignore | 4 + 12 files changed, 473 insertions(+), 5 deletions(-) create mode 100644 opensearch-observability/src/main/kotlin/org/opensearch/observability/model/ScheduledJobDoc.kt create mode 100644 opensearch-observability/src/main/kotlin/org/opensearch/observability/resthandler/SchedulerRestHandler.kt create mode 100644 opensearch-observability/src/main/kotlin/org/opensearch/observability/scheduler/ObservabilityJobParser.kt create mode 100644 opensearch-observability/src/main/kotlin/org/opensearch/observability/scheduler/ObservabilityJobRunner.kt rename opensearch-observability/src/main/resources/META-INF/services/{com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobSchedulerExtension => org.opensearch.jobscheduler.spi.JobSchedulerExtension} (100%) rename opensearch-observability/src/test/{kotlin/org/opensearch/observability/resources/bwc => resources/bwc/job-scheduler}/.gitignore (100%) create mode 100644 opensearch-observability/src/test/resources/bwc/observability/.gitignore create mode 100644 opensearch-observability/src/test/resources/job-scheduler/.gitignore diff --git a/opensearch-observability/build.gradle b/opensearch-observability/build.gradle index 7c2541cef..bff4b01b1 100644 --- a/opensearch-observability/build.gradle +++ b/opensearch-observability/build.gradle @@ -23,6 +23,7 @@ buildscript { } common_utils_version = System.getProperty("common_utils.version", opensearch_build) kotlin_version = System.getProperty("kotlin.version", "1.6.0") + job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build) } repositories { @@ -65,6 +66,7 @@ opensearchplugin { name 'opensearch-observability' description 'OpenSearch Plugin for OpenSearch Dashboards Observability' classname "org.opensearch.observability.ObservabilityPlugin" + extendedPlugins = ['opensearch-job-scheduler'] } allOpen { @@ -130,6 +132,7 @@ dependencies { implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9" implementation "${group}:common-utils:${common_utils_version}" implementation group: 'com.google.guava', name: 'guava', version: '31.0.1-jre' + compileOnly "${group}:opensearch-job-scheduler-spi:${job_scheduler_version}" testImplementation( 'org.assertj:assertj-core:3.16.1', 'org.junit.jupiter:junit-jupiter-api:5.6.2' @@ -240,9 +243,31 @@ integTest { Zip bundle = (Zip) project.getTasks().getByName("bundlePlugin"); integTest.dependsOn(bundle) integTest.getClusters().forEach { c -> c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile())) } +String jobSchedulerURL = "https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/" + opensearch_version.replace("-SNAPSHOT", "") + "/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-job-scheduler-" + opensearch_build.replace("-SNAPSHOT", "") + ".zip" testClusters.integTest { testDistribution = "INTEG_TEST" + // need to install job-scheduler first, need to assemble job-scheduler first + plugin(provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + File dir = new File("src/test/resources/job-scheduler") + if (!dir.exists()) { + dir.mkdirs() + } + File file = new File(dir, "opensearch-job-scheduler-" + opensearch_build + ".zip") + if (!file.exists()) { + new URL(jobSchedulerURL).withInputStream{ ins -> file.withOutputStream{ it << ins }} + } + return fileTree("src/test/resources/job-scheduler").getSingleFile() + } + } + } + })) + // Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1 if (_numNodes > 1) numberOfNodes = _numNodes // When running integration tests it doesn't forward the --debug-jvm to the cluster anymore @@ -260,7 +285,7 @@ testClusters.integTest { String bwcVersion = "1.2.0-SNAPSHOT" String baseName = "obsBwcCluster" -String bwcFilePath = "src/test/kotlin/org/opensearch/observability/resources/bwc/" +String bwcFilePath = "src/test/resources/bwc/" String remoteFileURL = "https://github.com/opensearch-project/observability/releases/download/1.2.0.0/opensearch-observability-1.2.0.0.zip" 2.times {i -> @@ -275,7 +300,7 @@ String remoteFileURL = "https://github.com/opensearch-project/observability/rele return new RegularFile() { @Override File getAsFile() { - File dir = new File(bwcFilePath + bwcVersion) + File dir = new File(bwcFilePath + "observability/" + bwcVersion) if (!dir.exists()) { dir.mkdirs() } @@ -283,7 +308,7 @@ String remoteFileURL = "https://github.com/opensearch-project/observability/rele if (!file.exists()) { new URL(remoteFileURL).withInputStream{ ins -> file.withOutputStream{ it << ins }} } - return fileTree(bwcFilePath + bwcVersion).getSingleFile() + return fileTree(bwcFilePath + "observability/" + bwcVersion).getSingleFile() } } } @@ -301,6 +326,25 @@ task prepareBwcTests { dependsOn bundle doLast { plugins = [ + provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + File dir = new File(bwcFilePath + "job-scheduler/" + project.version) + if (!dir.exists()) { + dir.mkdirs() + } + File file = new File(dir, "opensearch-job-scheduler-" + project.version + ".zip") + if (!file.exists()) { + new URL(jobSchedulerURL).withInputStream{ ins -> file.withOutputStream{ it << ins }} + } + return fileTree(bwcFilePath + "job-scheduler/" + project.version).getSingleFile() + } + } + } + }), project.getObjects().fileProperty().value(bundle.getArchiveFile()) ] } diff --git a/opensearch-observability/src/main/kotlin/org/opensearch/observability/ObservabilityPlugin.kt b/opensearch-observability/src/main/kotlin/org/opensearch/observability/ObservabilityPlugin.kt index bdf7890ac..cc46da157 100644 --- a/opensearch-observability/src/main/kotlin/org/opensearch/observability/ObservabilityPlugin.kt +++ b/opensearch-observability/src/main/kotlin/org/opensearch/observability/ObservabilityPlugin.kt @@ -19,12 +19,18 @@ import org.opensearch.common.settings.SettingsFilter import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.env.Environment import org.opensearch.env.NodeEnvironment +import org.opensearch.jobscheduler.spi.JobSchedulerExtension +import org.opensearch.jobscheduler.spi.ScheduledJobParser +import org.opensearch.jobscheduler.spi.ScheduledJobRunner import org.opensearch.observability.action.CreateObservabilityObjectAction import org.opensearch.observability.action.DeleteObservabilityObjectAction import org.opensearch.observability.action.GetObservabilityObjectAction import org.opensearch.observability.action.UpdateObservabilityObjectAction import org.opensearch.observability.index.ObservabilityIndex import org.opensearch.observability.resthandler.ObservabilityRestHandler +import org.opensearch.observability.resthandler.SchedulerRestHandler +import org.opensearch.observability.scheduler.ObservabilityJobParser +import org.opensearch.observability.scheduler.ObservabilityJobRunner import org.opensearch.observability.settings.PluginSettings import org.opensearch.plugins.ActionPlugin import org.opensearch.plugins.Plugin @@ -40,7 +46,7 @@ import java.util.function.Supplier * Entry point of the OpenSearch Observability plugin. * This class initializes the rest handlers. */ -class ObservabilityPlugin : Plugin(), ActionPlugin { +class ObservabilityPlugin : Plugin(), ActionPlugin, JobSchedulerExtension { companion object { const val PLUGIN_NAME = "opensearch-observability" @@ -90,7 +96,8 @@ class ObservabilityPlugin : Plugin(), ActionPlugin { nodesInCluster: Supplier ): List { return listOf( - ObservabilityRestHandler() + ObservabilityRestHandler(), + SchedulerRestHandler() // TODO: tmp rest handler only for POC purpose ) } @@ -117,4 +124,20 @@ class ObservabilityPlugin : Plugin(), ActionPlugin { ) ) } + + override fun getJobType(): String { + return "observability" + } + + override fun getJobIndex(): String { + return SchedulerRestHandler.SCHEDULED_JOB_INDEX + } + + override fun getJobRunner(): ScheduledJobRunner { + return ObservabilityJobRunner + } + + override fun getJobParser(): ScheduledJobParser { + return ObservabilityJobParser + } } diff --git a/opensearch-observability/src/main/kotlin/org/opensearch/observability/model/RestTag.kt b/opensearch-observability/src/main/kotlin/org/opensearch/observability/model/RestTag.kt index 1bc390049..3cb519aca 100644 --- a/opensearch-observability/src/main/kotlin/org/opensearch/observability/model/RestTag.kt +++ b/opensearch-observability/src/main/kotlin/org/opensearch/observability/model/RestTag.kt @@ -33,6 +33,10 @@ internal object RestTag { const val OPERATIONAL_PANEL_FIELD = "operationalPanel" const val APPLICATION_FIELD = "application" const val TIMESTAMP_FIELD = "timestamp" + const val SCHEDULE_INFO_TAG = "schedule" + const val SCHEDULED_JOB_TYPE_TAG = "jobType" + const val ID_FIELD = "id" + const val IS_ENABLED_TAG = "isEnabled" private val INCLUDE_ID = Pair(OBJECT_ID_FIELD, "true") private val EXCLUDE_ACCESS = Pair(ACCESS_LIST_FIELD, "false") val REST_OUTPUT_PARAMS: Params = ToXContent.MapParams(mapOf(INCLUDE_ID)) diff --git a/opensearch-observability/src/main/kotlin/org/opensearch/observability/model/ScheduledJobDoc.kt b/opensearch-observability/src/main/kotlin/org/opensearch/observability/model/ScheduledJobDoc.kt new file mode 100644 index 000000000..42ba7b786 --- /dev/null +++ b/opensearch-observability/src/main/kotlin/org/opensearch/observability/model/ScheduledJobDoc.kt @@ -0,0 +1,171 @@ +package org.opensearch.observability.model + +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.jobscheduler.spi.ScheduledJobParameter +import org.opensearch.jobscheduler.spi.schedule.Schedule +import org.opensearch.jobscheduler.spi.schedule.ScheduleParser +import org.opensearch.observability.ObservabilityPlugin.Companion.LOG_PREFIX +import org.opensearch.observability.model.RestTag.ACCESS_LIST_FIELD +import org.opensearch.observability.model.RestTag.CREATED_TIME_FIELD +import org.opensearch.observability.model.RestTag.ID_FIELD +import org.opensearch.observability.model.RestTag.IS_ENABLED_TAG +import org.opensearch.observability.model.RestTag.OBJECT_ID_FIELD +import org.opensearch.observability.model.RestTag.SCHEDULED_JOB_TYPE_TAG +import org.opensearch.observability.model.RestTag.SCHEDULE_INFO_TAG +import org.opensearch.observability.model.RestTag.TENANT_FIELD +import org.opensearch.observability.model.RestTag.UPDATED_TIME_FIELD +import org.opensearch.observability.security.UserAccessManager.DEFAULT_TENANT +import org.opensearch.observability.util.logger +import org.opensearch.observability.util.stringList +import java.io.IOException +import java.time.Instant + +/** + * TODO: this whole class is for poc purpose. As for actual implementation, it depends on the data model of Metric. + */ +internal data class ScheduledJobDoc( + val id: String, + val updatedTime: Instant, + val createdTime: Instant, + val tenant: String, + val access: List, + val jobType: JobType, + val scheduleInfo: Schedule, + val enabled: Boolean +) : ScheduledJobParameter, BaseModel { + + internal enum class JobType { Metrics, Uptime } + + internal companion object { + private val log by logger(ScheduledJobDoc::class.java) + + /** + * Parse the data from parser and create ScheduledJobDoc object + * @param parser data referenced at parser + * @param userId use this id if not available in the json + * @return created ScheduledJobDoc object + */ + @JvmStatic + @Throws(IOException::class) + @Suppress("ComplexMethod") + fun parse(parser: XContentParser, userId: String? = null): ScheduledJobDoc { + var id: String? = userId + var updatedTime: Instant? = null + var createdTime: Instant? = null + var tenant: String? = null + var access: List = listOf() + var jobType: JobType? = null + var scheduleInfo: Schedule? = null + var enabled = false + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser) + while (XContentParser.Token.END_OBJECT != parser.nextToken()) { + val fieldName = parser.currentName() + parser.nextToken() + when (fieldName) { + OBJECT_ID_FIELD -> id = parser.text() + UPDATED_TIME_FIELD -> updatedTime = Instant.ofEpochMilli(parser.longValue()) + CREATED_TIME_FIELD -> createdTime = Instant.ofEpochMilli(parser.longValue()) + TENANT_FIELD -> tenant = parser.text() + ACCESS_LIST_FIELD -> access = parser.stringList() + SCHEDULED_JOB_TYPE_TAG -> jobType = JobType.valueOf(parser.text()) + SCHEDULE_INFO_TAG -> scheduleInfo = ScheduleParser.parse(parser) + IS_ENABLED_TAG -> enabled = parser.booleanValue() + else -> { + parser.skipChildren() + log.info("$LOG_PREFIX:ScheduledJobDoc Skipping Unknown field $fieldName") + } + } + } + + id ?: throw IllegalArgumentException("$ID_FIELD field absent") + updatedTime ?: throw IllegalArgumentException("$UPDATED_TIME_FIELD field absent") + createdTime ?: throw IllegalArgumentException("$CREATED_TIME_FIELD field absent") + tenant = tenant ?: DEFAULT_TENANT + jobType ?: throw IllegalArgumentException("$SCHEDULED_JOB_TYPE_TAG field absent") + scheduleInfo ?: throw IllegalArgumentException("$SCHEDULE_INFO_TAG field absent") + + return ScheduledJobDoc( + id, + updatedTime, + createdTime, + tenant, + access, + jobType, + scheduleInfo, + enabled + ) + } + } + + /** + * create XContentBuilder from this object using [XContentFactory.jsonBuilder()] + * @param params XContent parameters + * @return created XContentBuilder object + */ + fun toXContent(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): XContentBuilder? { + return toXContent(XContentFactory.jsonBuilder(), params) + } + + override fun writeTo(output: StreamOutput) { + output.writeString(id) + output.writeInstant(updatedTime) + output.writeInstant(createdTime) + output.writeString(tenant) + output.writeStringCollection(access) + output.writeEnum(jobType) + output.writeEnum(jobType) // jobType is read twice in constructor + output.writeOptionalWriteable(scheduleInfo) + output.writeBoolean(enabled) + } + + /** + * {ref toXContent} + */ + override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder { + builder!! + builder.startObject() + if (params?.paramAsBoolean(ID_FIELD, false) == true) { + builder.field(ID_FIELD, id) + } + builder.field(UPDATED_TIME_FIELD, updatedTime.toEpochMilli()) + .field(CREATED_TIME_FIELD, createdTime.toEpochMilli()) + .field(TENANT_FIELD, tenant) + if (params?.paramAsBoolean(ACCESS_LIST_FIELD, true) == true && access.isNotEmpty()) { + builder.field(ACCESS_LIST_FIELD, access) + } + + builder.field(SCHEDULE_INFO_TAG) + schedule.toXContent(builder, ToXContent.EMPTY_PARAMS) + + builder.field(SCHEDULED_JOB_TYPE_TAG, jobType) + .field(IS_ENABLED_TAG, enabled) + builder.endObject() + return builder + } + + override fun getName(): String { + return "poc name" // TODO: placeholder e.g. metric.name + } + + override fun getLastUpdateTime(): Instant { + return updatedTime + } + + override fun getEnabledTime(): Instant { + return createdTime + } + + override fun getSchedule(): Schedule { + return scheduleInfo + } + + override fun isEnabled(): Boolean { + return enabled + } +} diff --git a/opensearch-observability/src/main/kotlin/org/opensearch/observability/resthandler/SchedulerRestHandler.kt b/opensearch-observability/src/main/kotlin/org/opensearch/observability/resthandler/SchedulerRestHandler.kt new file mode 100644 index 000000000..28f3082d8 --- /dev/null +++ b/opensearch-observability/src/main/kotlin/org/opensearch/observability/resthandler/SchedulerRestHandler.kt @@ -0,0 +1,155 @@ +package org.opensearch.observability.resthandler + +import org.opensearch.action.ActionListener +import org.opensearch.action.index.IndexRequest +import org.opensearch.action.index.IndexResponse +import org.opensearch.client.node.NodeClient +import org.opensearch.common.xcontent.json.JsonXContent +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule +import org.opensearch.observability.model.RestTag +import org.opensearch.observability.model.ScheduledJobDoc +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.BaseRestHandler.RestChannelConsumer +import org.opensearch.rest.BytesRestResponse +import org.opensearch.rest.RestChannel +import org.opensearch.rest.RestHandler +import org.opensearch.rest.RestRequest +import org.opensearch.rest.RestResponse +import org.opensearch.rest.RestStatus +import java.io.IOException +import java.time.Instant +import java.time.temporal.ChronoUnit + +/** + * TODO: This REST handler is for POC to verify that job-scheduler workflow can run in Observability. + * In the future this will be removed. Scheduling won't have it's own REST API. It always comes with create Metic API + */ +internal class SchedulerRestHandler : BaseRestHandler() { + companion object { + private const val SCHEDULE_ACTION = "observability_jobs_actions" + const val SCHEDULED_JOB_INDEX = ".opensearch-observability-job" + private const val OBSERVABILITY_SCHEDULE_URL = "_plugins/poc/_schedule" + private const val CONSTANT = 10 + } + + /** + * {@inheritDoc} + */ + override fun getName(): String { + return SCHEDULE_ACTION + } + + /** + * {@inheritDoc} + */ + override fun routes(): List { + return listOf( + /** + * Create a new object + * Request URL: POST OBSERVABILITY_URL + * Request body: Ref [org.opensearch.observability.model.CreateObservabilityObjectRequest] + * Response body: Ref [org.opensearch.observability.model.CreateObservabilityObjectResponse] + */ + RestHandler.Route(RestRequest.Method.POST, OBSERVABILITY_SCHEDULE_URL) + ) + } + + /** + * {@inheritDoc} + */ + override fun responseParams(): Set { + return setOf( + RestTag.OBJECT_ID_FIELD, + RestTag.OBJECT_ID_LIST_FIELD, + RestTag.OBJECT_TYPE_FIELD, + RestTag.SORT_FIELD_FIELD, + RestTag.SORT_ORDER_FIELD, + RestTag.FROM_INDEX_FIELD, + RestTag.MAX_ITEMS_FIELD + ) + } + + private fun executePostRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + // TODO: Indexing a scheduled job will happen within the workflow of creating a metric. Below is for POC only. + + if (request.method() == RestRequest.Method.POST) { + val sampleId = "id" + Math.random() * CONSTANT + val scheduledJob = ScheduledJobDoc( + sampleId, + Instant.now(), + Instant.now(), + "__user__", + listOf(), + ScheduledJobDoc.JobType.Metrics, + IntervalSchedule( + Instant.now(), + 1, + ChronoUnit.MINUTES + ), + true, + ) + val indexRequest: IndexRequest = IndexRequest() + .index(SCHEDULED_JOB_INDEX) + .id(sampleId) + .source(scheduledJob.toXContent()) + + return RestChannelConsumer { restChannel: RestChannel -> + // index the job parameter + + client.index( + indexRequest, + object : ActionListener { + override fun onResponse(indexResponse: IndexResponse) { + try { + val restResponse: RestResponse = BytesRestResponse( + RestStatus.OK, + indexResponse.toXContent(JsonXContent.contentBuilder(), null) + ) + restChannel.sendResponse(restResponse) + } catch (e: IOException) { + restChannel.sendResponse( + BytesRestResponse( + RestStatus.INTERNAL_SERVER_ERROR, + e.message + ) + ) + } + } + + override fun onFailure(e: Exception) { + restChannel.sendResponse( + BytesRestResponse( + RestStatus.INTERNAL_SERVER_ERROR, + e.message + ) + ) + } + } + ) + } + } else { + return RestChannelConsumer { restChannel: RestChannel -> + restChannel.sendResponse( + BytesRestResponse( + RestStatus.METHOD_NOT_ALLOWED, + request.method().toString() + " is not allowed." + ) + ) + } + } + } + + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + return when (request.method()) { + RestRequest.Method.POST -> executePostRequest(request, client) + else -> RestChannelConsumer { + it.sendResponse( + BytesRestResponse( + RestStatus.METHOD_NOT_ALLOWED, + "${request.method()} is not allowed" + ) + ) + } + } + } +} diff --git a/opensearch-observability/src/main/kotlin/org/opensearch/observability/scheduler/ObservabilityJobParser.kt b/opensearch-observability/src/main/kotlin/org/opensearch/observability/scheduler/ObservabilityJobParser.kt new file mode 100644 index 000000000..0278f58a9 --- /dev/null +++ b/opensearch-observability/src/main/kotlin/org/opensearch/observability/scheduler/ObservabilityJobParser.kt @@ -0,0 +1,22 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.observability.scheduler + +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.jobscheduler.spi.JobDocVersion +import org.opensearch.jobscheduler.spi.ScheduledJobParameter +import org.opensearch.jobscheduler.spi.ScheduledJobParser +import org.opensearch.observability.model.ScheduledJobDoc + +internal object ObservabilityJobParser : ScheduledJobParser { + /** + * {@inheritDoc} + */ + override fun parse(xContentParser: XContentParser, id: String, jobDocVersion: JobDocVersion): ScheduledJobParameter { + xContentParser.nextToken() + return ScheduledJobDoc.parse(xContentParser, id) + } +} diff --git a/opensearch-observability/src/main/kotlin/org/opensearch/observability/scheduler/ObservabilityJobRunner.kt b/opensearch-observability/src/main/kotlin/org/opensearch/observability/scheduler/ObservabilityJobRunner.kt new file mode 100644 index 000000000..9f52113c3 --- /dev/null +++ b/opensearch-observability/src/main/kotlin/org/opensearch/observability/scheduler/ObservabilityJobRunner.kt @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.observability.scheduler + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import org.opensearch.jobscheduler.spi.JobExecutionContext +import org.opensearch.jobscheduler.spi.ScheduledJobParameter +import org.opensearch.jobscheduler.spi.ScheduledJobRunner +import org.opensearch.observability.ObservabilityPlugin.Companion.LOG_PREFIX +import org.opensearch.observability.model.ScheduledJobDoc +import org.opensearch.observability.util.logger + +internal object ObservabilityJobRunner : ScheduledJobRunner { + private val log by logger(ObservabilityJobRunner::class.java) + private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) + + override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) { + if (job !is ScheduledJobDoc) { + log.warn("$LOG_PREFIX:job is not of type ScheduledJobDoc:${job.javaClass.name}") + throw IllegalArgumentException("job is not of type ScheduledJobDoc:${job.javaClass.name}") + } + + scope.launch { + val scheduledJob: ScheduledJobDoc = job + val jobType = scheduledJob.jobType + // TODO: Add logic to retrieve metric and update metric index. E,g. run PPL/SQL query via transport API + // and write to metric index after some processing. + + log.info("POC: Running job type: ${jobType.name}") + } + } +} diff --git a/opensearch-observability/src/main/resources/META-INF/services/com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobSchedulerExtension b/opensearch-observability/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension similarity index 100% rename from opensearch-observability/src/main/resources/META-INF/services/com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobSchedulerExtension rename to opensearch-observability/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension diff --git a/opensearch-observability/src/test/kotlin/org/opensearch/observability/ObservabilityPluginIT.kt b/opensearch-observability/src/test/kotlin/org/opensearch/observability/ObservabilityPluginIT.kt index 02478273e..0b9984ec7 100644 --- a/opensearch-observability/src/test/kotlin/org/opensearch/observability/ObservabilityPluginIT.kt +++ b/opensearch-observability/src/test/kotlin/org/opensearch/observability/ObservabilityPluginIT.kt @@ -20,6 +20,10 @@ class ObservabilityPluginIT : OpenSearchIntegTestCase() { nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()) val nodesInfoResponse = client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet() val pluginInfos = nodesInfoResponse.nodes[0].getInfo(PluginsAndModules::class.java).pluginInfos + assertTrue( + pluginInfos.stream() + .anyMatch { pluginInfo: PluginInfo -> pluginInfo.name == "opensearch-job-scheduler" } + ) assertTrue( pluginInfos.stream() .anyMatch { pluginInfo: PluginInfo -> pluginInfo.name == "opensearch-observability" } diff --git a/opensearch-observability/src/test/kotlin/org/opensearch/observability/resources/bwc/.gitignore b/opensearch-observability/src/test/resources/bwc/job-scheduler/.gitignore similarity index 100% rename from opensearch-observability/src/test/kotlin/org/opensearch/observability/resources/bwc/.gitignore rename to opensearch-observability/src/test/resources/bwc/job-scheduler/.gitignore diff --git a/opensearch-observability/src/test/resources/bwc/observability/.gitignore b/opensearch-observability/src/test/resources/bwc/observability/.gitignore new file mode 100644 index 000000000..99f6ad50f --- /dev/null +++ b/opensearch-observability/src/test/resources/bwc/observability/.gitignore @@ -0,0 +1,4 @@ +# ignore artifacts in this directory +* +*/ +!.gitignore diff --git a/opensearch-observability/src/test/resources/job-scheduler/.gitignore b/opensearch-observability/src/test/resources/job-scheduler/.gitignore new file mode 100644 index 000000000..99f6ad50f --- /dev/null +++ b/opensearch-observability/src/test/resources/job-scheduler/.gitignore @@ -0,0 +1,4 @@ +# ignore artifacts in this directory +* +*/ +!.gitignore