Skip to content

Commit

Permalink
Add jvm aware setting and max num docs settings for batching docs for…
Browse files Browse the repository at this point in the history
… percolate queries (opensearch-project#1435)

* add jvm aware and max docs settings for batching docs for percolate queries

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* fix stats logging

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* add queryfieldnames field in findings mapping

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

---------

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
  • Loading branch information
eirsep committed Mar 7, 2024
1 parent d1027da commit f5752b7
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ import org.opensearch.core.xcontent.XContentParser
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.index.IndexModule
import org.opensearch.monitor.jvm.JvmStats
import org.opensearch.painless.spi.Allowlist
import org.opensearch.painless.spi.AllowlistLoader
import org.opensearch.painless.spi.PainlessExtension
import org.opensearch.painless.spi.Whitelist
import org.opensearch.painless.spi.WhitelistLoader
import org.opensearch.percolator.PercolatorPluginExt
import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.ReloadablePlugin
Expand All @@ -125,8 +126,8 @@ import java.util.function.Supplier
*/
internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, PercolatorPluginExt() {

override fun getContextWhitelists(): Map<ScriptContext<*>, List<Whitelist>> {
val whitelist = WhitelistLoader.loadFromResourceFiles(javaClass, "org.opensearch.alerting.txt")
override fun getContextAllowlists(): Map<ScriptContext<*>, List<Allowlist>> {
val whitelist = AllowlistLoader.loadFromResourceFiles(javaClass, "org.opensearch.alerting.txt")
return mapOf(TriggerScript.CONTEXT to listOf(whitelist))
}

Expand Down Expand Up @@ -272,6 +273,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerTriggerService(TriggerService(scriptService))
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
.registerJvmStats(JvmStats.jvmStats())
.registerWorkflowService(WorkflowService(client, xContentRegistry))
.registerConsumers()
.registerDestinationSettings()
Expand Down Expand Up @@ -329,6 +331,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.ALERT_HISTORY_MAX_DOCS,
AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD,
AlertingSettings.ALERTING_MAX_MONITORS,
AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY,
AlertingSettings.REQUEST_TIMEOUT,
AlertingSettings.MAX_ACTION_THROTTLE_VALUE,
AlertingSettings.FILTER_BY_BACKEND_ROLES,
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.monitor.jvm.JvmStats
import org.opensearch.script.ScriptService
import org.opensearch.threadpool.ThreadPool

Expand All @@ -36,6 +37,7 @@ data class MonitorRunnerExecutionContext(
var alertService: AlertService? = null,
var docLevelMonitorQueries: DocLevelMonitorQueries? = null,
var workflowService: WorkflowService? = null,
var jvmStats: JvmStats? = null,

@Volatile var retryPolicy: BackoffPolicy? = null,
@Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.commons.alerting.util.isBucketLevelMonitor
import org.opensearch.core.action.ActionListener
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.monitor.jvm.JvmStats
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.TemplateScript
Expand Down Expand Up @@ -134,6 +135,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
return this
}

fun registerJvmStats(jvmStats: JvmStats): MonitorRunnerService {
this.monitorCtx.jvmStats = jvmStats
return this
}

// Must be called after registerClusterService and registerSettings in AlertingPlugin
fun registerConsumers(): MonitorRunnerService {
monitorCtx.retryPolicy = BackoffPolicy.constantBackoff(
Expand Down Expand Up @@ -258,11 +264,19 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
when (job) {
is Workflow -> {
launch {
logger.debug(
"PERF_DEBUG: executing workflow ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
)
runJob(job, periodStart, periodEnd, false)
}
}
is Monitor -> {
launch {
logger.debug(
"PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
)
runJob(job, periodStart, periodEnd, false)
}
}
Expand Down Expand Up @@ -307,7 +321,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
val runResult = if (monitor.isBucketLevelMonitor()) {
BucketLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
} else if (monitor.isDocLevelMonitor()) {
DocumentLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
DocumentLevelMonitorRunner().runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
} else {
QueryLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,29 @@ class AlertingSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/** Defines the threshold percentage of heap size in bytes till which we accumulate docs in memory before we query against percolate query
* index in document level monitor execution.
*/
val PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = Setting.intSetting(
"plugins.alerting.monitor.percolate_query_docs_size_memory_percentage_limit",
10,
0,
100,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/** Defines the threshold of the maximum number of docs accumulated in memory to query against percolate query index in document
* level monitor execution. The docs are being collected from searching on shards of indices mentioned in the
* monitor input indices field. When the number of in-memory docs reaches or exceeds threshold we immediately perform percolate
* query with the current set of docs and clear the cache and repeat the process till we have queried all indices in current
* execution
*/
val PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY = Setting.intSetting(
"plugins.alerting.monitor.percolate_query_max_num_docs_in_memory",
300000, 1000,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

val INPUT_TIMEOUT = Setting.positiveTimeSetting(
"plugins.alerting.input_timeout",
LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
executionId
)
} else if (delegateMonitor.isDocLevelMonitor()) {
return DocumentLevelMonitorRunner.runMonitor(
return DocumentLevelMonitorRunner().runMonitor(
delegateMonitor,
monitorCtx,
periodStart,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
},
"fields": {
"type": "text"
},
"query_field_names": {
"type": "keyword"
}
}
},
Expand Down

0 comments on commit f5752b7

Please sign in to comment.