Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integrate job-scheduler into observability #609

Merged
merged 1 commit into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 47 additions & 3 deletions opensearch-observability/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ buildscript {
}
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
kotlin_version = System.getProperty("kotlin.version", "1.6.0")
job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build)
}

repositories {
Expand Down Expand Up @@ -65,6 +66,7 @@ opensearchplugin {
name 'opensearch-observability'
description 'OpenSearch Plugin for OpenSearch Dashboards Observability'
classname "org.opensearch.observability.ObservabilityPlugin"
extendedPlugins = ['opensearch-job-scheduler']
}

allOpen {
Expand Down Expand Up @@ -130,6 +132,7 @@ dependencies {
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9"
implementation "${group}:common-utils:${common_utils_version}"
implementation group: 'com.google.guava', name: 'guava', version: '31.0.1-jre'
compileOnly "${group}:opensearch-job-scheduler-spi:${job_scheduler_version}"
testImplementation(
'org.assertj:assertj-core:3.16.1',
'org.junit.jupiter:junit-jupiter-api:5.6.2'
Expand Down Expand Up @@ -240,9 +243,31 @@ integTest {
Zip bundle = (Zip) project.getTasks().getByName("bundlePlugin");
integTest.dependsOn(bundle)
integTest.getClusters().forEach { c -> c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile())) }
String jobSchedulerURL = "https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/" + opensearch_version.replace("-SNAPSHOT", "") + "/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-job-scheduler-" + opensearch_build.replace("-SNAPSHOT", "") + ".zip"

testClusters.integTest {
testDistribution = "INTEG_TEST"
// need to install job-scheduler first, need to assemble job-scheduler first
plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
File dir = new File("src/test/resources/job-scheduler")
if (!dir.exists()) {
dir.mkdirs()
}
File file = new File(dir, "opensearch-job-scheduler-" + opensearch_build + ".zip")
if (!file.exists()) {
new URL(jobSchedulerURL).withInputStream{ ins -> file.withOutputStream{ it << ins }}
}
return fileTree("src/test/resources/job-scheduler").getSingleFile()
}
}
}
}))

// Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1
if (_numNodes > 1) numberOfNodes = _numNodes
// When running integration tests it doesn't forward the --debug-jvm to the cluster anymore
Expand All @@ -260,7 +285,7 @@ testClusters.integTest {

String bwcVersion = "1.2.0-SNAPSHOT"
String baseName = "obsBwcCluster"
String bwcFilePath = "src/test/kotlin/org/opensearch/observability/resources/bwc/"
String bwcFilePath = "src/test/resources/bwc/"
String remoteFileURL = "https://github.com/opensearch-project/observability/releases/download/1.2.0.0/opensearch-observability-1.2.0.0.zip"

2.times {i ->
Expand All @@ -275,15 +300,15 @@ String remoteFileURL = "https://github.com/opensearch-project/observability/rele
return new RegularFile() {
@Override
File getAsFile() {
File dir = new File(bwcFilePath + bwcVersion)
File dir = new File(bwcFilePath + "observability/" + bwcVersion)
if (!dir.exists()) {
dir.mkdirs()
}
File file = new File(dir, "opensearch-observability-1.2.0.0-SNAPSHOT.zip")
if (!file.exists()) {
new URL(remoteFileURL).withInputStream{ ins -> file.withOutputStream{ it << ins }}
}
return fileTree(bwcFilePath + bwcVersion).getSingleFile()
return fileTree(bwcFilePath + "observability/" + bwcVersion).getSingleFile()
}
}
}
Expand All @@ -301,6 +326,25 @@ task prepareBwcTests {
dependsOn bundle
doLast {
plugins = [
provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
File dir = new File(bwcFilePath + "job-scheduler/" + project.version)
if (!dir.exists()) {
dir.mkdirs()
}
File file = new File(dir, "opensearch-job-scheduler-" + project.version + ".zip")
if (!file.exists()) {
new URL(jobSchedulerURL).withInputStream{ ins -> file.withOutputStream{ it << ins }}
}
return fileTree(bwcFilePath + "job-scheduler/" + project.version).getSingleFile()
}
}
}
}),
project.getObjects().fileProperty().value(bundle.getArchiveFile())
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@ import org.opensearch.common.settings.SettingsFilter
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.jobscheduler.spi.JobSchedulerExtension
import org.opensearch.jobscheduler.spi.ScheduledJobParser
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
import org.opensearch.observability.action.CreateObservabilityObjectAction
import org.opensearch.observability.action.DeleteObservabilityObjectAction
import org.opensearch.observability.action.GetObservabilityObjectAction
import org.opensearch.observability.action.UpdateObservabilityObjectAction
import org.opensearch.observability.index.ObservabilityIndex
import org.opensearch.observability.resthandler.ObservabilityRestHandler
import org.opensearch.observability.resthandler.SchedulerRestHandler
import org.opensearch.observability.scheduler.ObservabilityJobParser
import org.opensearch.observability.scheduler.ObservabilityJobRunner
import org.opensearch.observability.settings.PluginSettings
import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.Plugin
Expand All @@ -40,7 +46,7 @@ import java.util.function.Supplier
* Entry point of the OpenSearch Observability plugin.
* This class initializes the rest handlers.
*/
class ObservabilityPlugin : Plugin(), ActionPlugin {
class ObservabilityPlugin : Plugin(), ActionPlugin, JobSchedulerExtension {

companion object {
const val PLUGIN_NAME = "opensearch-observability"
Expand Down Expand Up @@ -90,7 +96,8 @@ class ObservabilityPlugin : Plugin(), ActionPlugin {
nodesInCluster: Supplier<DiscoveryNodes>
): List<RestHandler> {
return listOf(
ObservabilityRestHandler()
ObservabilityRestHandler(),
SchedulerRestHandler() // TODO: tmp rest handler only for POC purpose
)
}

Expand All @@ -117,4 +124,20 @@ class ObservabilityPlugin : Plugin(), ActionPlugin {
)
)
}

override fun getJobType(): String {
return "observability"
}

override fun getJobIndex(): String {
return SchedulerRestHandler.SCHEDULED_JOB_INDEX
}

override fun getJobRunner(): ScheduledJobRunner {
return ObservabilityJobRunner
}

override fun getJobParser(): ScheduledJobParser {
return ObservabilityJobParser
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ internal object RestTag {
const val OPERATIONAL_PANEL_FIELD = "operationalPanel"
const val APPLICATION_FIELD = "application"
const val TIMESTAMP_FIELD = "timestamp"
const val SCHEDULE_INFO_TAG = "schedule"
const val SCHEDULED_JOB_TYPE_TAG = "jobType"
const val ID_FIELD = "id"
const val IS_ENABLED_TAG = "isEnabled"
private val INCLUDE_ID = Pair(OBJECT_ID_FIELD, "true")
private val EXCLUDE_ACCESS = Pair(ACCESS_LIST_FIELD, "false")
val REST_OUTPUT_PARAMS: Params = ToXContent.MapParams(mapOf(INCLUDE_ID))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package org.opensearch.observability.model

import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.schedule.Schedule
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser
import org.opensearch.observability.ObservabilityPlugin.Companion.LOG_PREFIX
import org.opensearch.observability.model.RestTag.ACCESS_LIST_FIELD
import org.opensearch.observability.model.RestTag.CREATED_TIME_FIELD
import org.opensearch.observability.model.RestTag.ID_FIELD
import org.opensearch.observability.model.RestTag.IS_ENABLED_TAG
import org.opensearch.observability.model.RestTag.OBJECT_ID_FIELD
import org.opensearch.observability.model.RestTag.SCHEDULED_JOB_TYPE_TAG
import org.opensearch.observability.model.RestTag.SCHEDULE_INFO_TAG
import org.opensearch.observability.model.RestTag.TENANT_FIELD
import org.opensearch.observability.model.RestTag.UPDATED_TIME_FIELD
import org.opensearch.observability.security.UserAccessManager.DEFAULT_TENANT
import org.opensearch.observability.util.logger
import org.opensearch.observability.util.stringList
import java.io.IOException
import java.time.Instant

/**
* TODO: this whole class is for poc purpose. As for actual implementation, it depends on the data model of Metric.
*/
internal data class ScheduledJobDoc(
val id: String,
val updatedTime: Instant,
val createdTime: Instant,
val tenant: String,
val access: List<String>,
val jobType: JobType,
val scheduleInfo: Schedule,
val enabled: Boolean
) : ScheduledJobParameter, BaseModel {

internal enum class JobType { Metrics, Uptime }

internal companion object {
private val log by logger(ScheduledJobDoc::class.java)

/**
* Parse the data from parser and create ScheduledJobDoc object
* @param parser data referenced at parser
* @param userId use this id if not available in the json
* @return created ScheduledJobDoc object
*/
@JvmStatic
@Throws(IOException::class)
@Suppress("ComplexMethod")
fun parse(parser: XContentParser, userId: String? = null): ScheduledJobDoc {
var id: String? = userId
var updatedTime: Instant? = null
var createdTime: Instant? = null
var tenant: String? = null
var access: List<String> = listOf()
var jobType: JobType? = null
var scheduleInfo: Schedule? = null
var enabled = false

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser)
while (XContentParser.Token.END_OBJECT != parser.nextToken()) {
val fieldName = parser.currentName()
parser.nextToken()
when (fieldName) {
OBJECT_ID_FIELD -> id = parser.text()
UPDATED_TIME_FIELD -> updatedTime = Instant.ofEpochMilli(parser.longValue())
CREATED_TIME_FIELD -> createdTime = Instant.ofEpochMilli(parser.longValue())
TENANT_FIELD -> tenant = parser.text()
ACCESS_LIST_FIELD -> access = parser.stringList()
SCHEDULED_JOB_TYPE_TAG -> jobType = JobType.valueOf(parser.text())
SCHEDULE_INFO_TAG -> scheduleInfo = ScheduleParser.parse(parser)
IS_ENABLED_TAG -> enabled = parser.booleanValue()
else -> {
parser.skipChildren()
log.info("$LOG_PREFIX:ScheduledJobDoc Skipping Unknown field $fieldName")
}
}
}

id ?: throw IllegalArgumentException("$ID_FIELD field absent")
updatedTime ?: throw IllegalArgumentException("$UPDATED_TIME_FIELD field absent")
createdTime ?: throw IllegalArgumentException("$CREATED_TIME_FIELD field absent")
tenant = tenant ?: DEFAULT_TENANT
jobType ?: throw IllegalArgumentException("$SCHEDULED_JOB_TYPE_TAG field absent")
scheduleInfo ?: throw IllegalArgumentException("$SCHEDULE_INFO_TAG field absent")

return ScheduledJobDoc(
id,
updatedTime,
createdTime,
tenant,
access,
jobType,
scheduleInfo,
enabled
)
}
}

/**
* create XContentBuilder from this object using [XContentFactory.jsonBuilder()]
* @param params XContent parameters
* @return created XContentBuilder object
*/
fun toXContent(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): XContentBuilder? {
return toXContent(XContentFactory.jsonBuilder(), params)
}

override fun writeTo(output: StreamOutput) {
output.writeString(id)
output.writeInstant(updatedTime)
output.writeInstant(createdTime)
output.writeString(tenant)
output.writeStringCollection(access)
output.writeEnum(jobType)
output.writeEnum(jobType) // jobType is read twice in constructor
output.writeOptionalWriteable(scheduleInfo)
output.writeBoolean(enabled)
}

/**
* {ref toXContent}
*/
override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder {
builder!!
builder.startObject()
if (params?.paramAsBoolean(ID_FIELD, false) == true) {
builder.field(ID_FIELD, id)
}
builder.field(UPDATED_TIME_FIELD, updatedTime.toEpochMilli())
.field(CREATED_TIME_FIELD, createdTime.toEpochMilli())
.field(TENANT_FIELD, tenant)
if (params?.paramAsBoolean(ACCESS_LIST_FIELD, true) == true && access.isNotEmpty()) {
builder.field(ACCESS_LIST_FIELD, access)
}

builder.field(SCHEDULE_INFO_TAG)
schedule.toXContent(builder, ToXContent.EMPTY_PARAMS)

builder.field(SCHEDULED_JOB_TYPE_TAG, jobType)
.field(IS_ENABLED_TAG, enabled)
builder.endObject()
return builder
}

override fun getName(): String {
return "poc name" // TODO: placeholder e.g. metric.name
}

override fun getLastUpdateTime(): Instant {
return updatedTime
}

override fun getEnabledTime(): Instant {
return createdTime
}

override fun getSchedule(): Schedule {
return scheduleInfo
}

override fun isEnabled(): Boolean {
return enabled
}
}
Loading