diff --git a/build.gradle b/build.gradle index 89878f8ea..6e36f5cf3 100644 --- a/build.gradle +++ b/build.gradle @@ -20,20 +20,31 @@ buildscript { version_tokens = opensearch_version.tokenize('-') opensearch_build = version_tokens[0] + '.0' job_scheduler_no_snapshot = opensearch_build + notifications_no_snapshot = opensearch_build if (buildVersionQualifier) { opensearch_build += "-${buildVersionQualifier}" job_scheduler_no_snapshot += "-${buildVersionQualifier}" + notifications_no_snapshot += "-${buildVersionQualifier}" } if (isSnapshot) { opensearch_build += "-SNAPSHOT" } opensearch_no_snapshot = opensearch_version.replace("-SNAPSHOT","") job_scheduler_resource_folder = "src/test/resources/job-scheduler" + + notifications_resource_folder = "src/test/resources/notifications" + notifications_core_resource_folder = "src/test/resources/notifications-core" // notification_version = System.getProperty("notification.version", opensearch_build) common_utils_version = System.getProperty("common_utils.version", opensearch_build) job_scheduler_version = System.getProperty("job_scheduler_version.version", opensearch_build) job_scheduler_build_download = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot + '/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-job-scheduler-' + job_scheduler_no_snapshot + '.zip' + notifications_version = System.getProperty("notifications.version", opensearch_build) + notifications_build_download = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot + + '/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-notifications-' + notifications_no_snapshot + '.zip' + notifications_core_build_download = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot + + '/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-notifications-core-' + notifications_no_snapshot + '.zip' + kotlin_version = System.getProperty("kotlin.version", "1.6.10") } @@ -160,10 +171,9 @@ dependencies { implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9' implementation "org.jetbrains:annotations:13.0" implementation project(path: ":${rootProject.name}-spi", configuration: 'shadow') - // implementation "org.opensearch:notification:${notification_version}" implementation "org.opensearch:common-utils:${common_utils_version}" implementation "com.github.seancfoley:ipaddress:5.3.3" - implementation "commons-codec:commons-codec:1.13" + implementation "commons-codec:commons-codec:${versions.commonscodec}" testImplementation "org.opensearch.test:framework:${opensearch_version}" testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" @@ -286,6 +296,44 @@ testClusters.integTest { } })) + plugin(provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + if (new File("$project.rootDir/$notifications_core_resource_folder").exists()) { + project.delete(files("$project.rootDir/$notifications_core_resource_folder")) + } + project.mkdir notifications_core_resource_folder + ant.get(src: notifications_core_build_download, + dest: notifications_core_resource_folder, + httpusecaches: false) + return fileTree(notifications_core_resource_folder).getSingleFile() + } + } + } + })) + + plugin(provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + if (new File("$project.rootDir/$notifications_resource_folder").exists()) { + project.delete(files("$project.rootDir/$notifications_resource_folder")) + } + project.mkdir notifications_resource_folder + ant.get(src: notifications_build_download, + dest: notifications_resource_folder, + httpusecaches: false) + return fileTree(notifications_resource_folder).getSingleFile() + } + } + } + })) + if (securityEnabled) { plugin(provider({ new RegularFile() { @@ -591,6 +639,20 @@ testClusters.mixedCluster { } })) + node.plugin(provider({ + new RegularFile() { + @Override + File getAsFile() { fileTree(notifications_core_resource_folder).getSingleFile() } + } + })) + + node.plugin(provider({ + new RegularFile() { + @Override + File getAsFile() { fileTree(notifications_resource_folder).getSingleFile() } + } + })) + if (mixedClusterFlag && node.name == "mixedCluster-1") { node.plugin(provider({ new RegularFile() { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 9074db28a..a21efdd4a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -43,6 +43,7 @@ import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX import org.opensearch.indexmanagement.indexstatemanagement.action.TransitionsAction +import org.opensearch.indexmanagement.indexstatemanagement.model.ErrorNotification import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.model.Policy import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getManagedIndexMetadata @@ -69,6 +70,8 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.isSafeToChange import org.opensearch.indexmanagement.indexstatemanagement.util.isSuccessfulDelete import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexMetadataIndexRequest +import org.opensearch.indexmanagement.indexstatemanagement.util.publishLegacyNotification +import org.opensearch.indexmanagement.indexstatemanagement.util.sendNotification import org.opensearch.indexmanagement.indexstatemanagement.util.shouldBackoff import org.opensearch.indexmanagement.indexstatemanagement.util.shouldChangePolicy import org.opensearch.indexmanagement.indexstatemanagement.util.updateDisableManagedIndexRequest @@ -778,9 +781,9 @@ object ManagedIndexRunner : private suspend fun publishErrorNotification(policy: Policy, managedIndexMetaData: ManagedIndexMetaData) { policy.errorNotification?.run { errorNotificationRetryPolicy.retry(logger) { - withContext(Dispatchers.IO) { - // destination.publish(null, compileTemplate(messageTemplate, managedIndexMetaData), hostDenyList) - } + val compiledMessage = compileTemplate(messageTemplate, managedIndexMetaData) + destination?.buildLegacyBaseMessage(null, compiledMessage)?.publishLegacyNotification(client) + channel?.sendNotification(client, ErrorNotification.CHANNEL_TITLE, managedIndexMetaData, compiledMessage) } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationAction.kt index 7437cd24f..0fb483a98 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationAction.kt @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.action import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination import org.opensearch.indexmanagement.indexstatemanagement.step.notification.AttemptNotificationStep import org.opensearch.indexmanagement.spi.indexstatemanagement.Action @@ -16,12 +17,15 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext import org.opensearch.script.Script class NotificationAction( - val destination: Destination, + val destination: Destination?, + val channel: Channel?, val messageTemplate: Script, index: Int ) : Action(name, index) { init { + require(destination != null || channel != null) { "Notification must contain a destination or channel" } + require(destination == null || channel == null) { "Notification can only contain a single destination or channel" } require(messageTemplate.lang == MUSTACHE) { "Notification message template must be a mustache script" } } @@ -36,13 +40,15 @@ class NotificationAction( override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) { builder.startObject(type) - builder.field(DESTINATION_FIELD, destination) + if (destination != null) builder.field(DESTINATION_FIELD, destination) + if (channel != null) builder.field(CHANNEL_FIELD, channel) builder.field(MESSAGE_TEMPLATE_FIELD, messageTemplate) builder.endObject() } override fun populateAction(out: StreamOutput) { - destination.writeTo(out) + out.writeOptionalWriteable(destination) + out.writeOptionalWriteable(channel) messageTemplate.writeTo(out) out.writeInt(actionIndex) } @@ -50,6 +56,7 @@ class NotificationAction( companion object { const val name = "notification" const val DESTINATION_FIELD = "destination" + const val CHANNEL_FIELD = "channel" const val MESSAGE_TEMPLATE_FIELD = "message_template" const val MUSTACHE = "mustache" } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationActionParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationActionParser.kt index e5493cca7..9f351f15c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationActionParser.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationActionParser.kt @@ -9,8 +9,10 @@ import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction.Companion.CHANNEL_FIELD import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction.Companion.DESTINATION_FIELD import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction.Companion.MESSAGE_TEMPLATE_FIELD +import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination import org.opensearch.indexmanagement.spi.indexstatemanagement.Action import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser @@ -18,15 +20,17 @@ import org.opensearch.script.Script class NotificationActionParser : ActionParser() { override fun fromStreamInput(sin: StreamInput): Action { - val destination = Destination(sin) + val destination = sin.readOptionalWriteable(::Destination) + val channel = sin.readOptionalWriteable(::Channel) val messageTemplate = Script(sin) val index = sin.readInt() - return NotificationAction(destination, messageTemplate, index) + return NotificationAction(destination, channel, messageTemplate, index) } override fun fromXContent(xcp: XContentParser, index: Int): Action { var destination: Destination? = null + var channel: Channel? = null var messageTemplate: Script? = null ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) @@ -36,13 +40,15 @@ class NotificationActionParser : ActionParser() { when (fieldName) { DESTINATION_FIELD -> destination = Destination.parse(xcp) + CHANNEL_FIELD -> channel = Channel.parse(xcp) MESSAGE_TEMPLATE_FIELD -> messageTemplate = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG) else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in NotificationAction.") } } return NotificationAction( - destination = requireNotNull(destination) { "NotificationAction destination is null" }, + destination = destination, + channel = channel, messageTemplate = requireNotNull(messageTemplate) { "NotificationAction message template is null" }, index = index ) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ErrorNotification.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ErrorNotification.kt index 8d57f72c8..589ff9b0d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ErrorNotification.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ErrorNotification.kt @@ -14,47 +14,58 @@ import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination import org.opensearch.script.Script import java.io.IOException data class ErrorNotification( - val destination: Destination, + val destination: Destination?, + val channel: Channel?, val messageTemplate: Script ) : ToXContentObject, Writeable { init { + require(destination != null || channel != null) { "ErrorNotification must contain a destination or channel" } + require(destination == null || channel == null) { "ErrorNotification can only contain a single destination or channel" } require(messageTemplate.lang == MUSTACHE) { "ErrorNotification message template must be a mustache script" } } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { - return builder.startObject() - .field(DESTINATION_FIELD, destination) + builder.startObject() + if (destination != null) builder.field(DESTINATION_FIELD, destination) + if (channel != null) builder.field(CHANNEL_FIELD, channel) + return builder .field(MESSAGE_TEMPLATE_FIELD, messageTemplate) .endObject() } @Throws(IOException::class) constructor(sin: StreamInput) : this( - Destination(sin), + sin.readOptionalWriteable(::Destination), + sin.readOptionalWriteable(::Channel), Script(sin) ) @Throws(IOException::class) override fun writeTo(out: StreamOutput) { - destination.writeTo(out) + out.writeOptionalWriteable(destination) + out.writeOptionalWriteable(channel) messageTemplate.writeTo(out) } companion object { const val DESTINATION_FIELD = "destination" + const val CHANNEL_FIELD = "channel" const val MESSAGE_TEMPLATE_FIELD = "message_template" const val MUSTACHE = "mustache" + const val CHANNEL_TITLE = "Index Management-ISM-Error Notification" @JvmStatic @Throws(IOException::class) fun parse(xcp: XContentParser): ErrorNotification { var destination: Destination? = null + var channel: Channel? = null var messageTemplate: Script? = null ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) @@ -63,14 +74,16 @@ data class ErrorNotification( xcp.nextToken() when (fieldName) { - DESTINATION_FIELD -> destination = Destination.parse(xcp) + DESTINATION_FIELD -> destination = if (xcp.currentToken() == Token.VALUE_NULL) null else Destination.parse(xcp) + CHANNEL_FIELD -> channel = if (xcp.currentToken() == Token.VALUE_NULL) null else Channel.parse(xcp) MESSAGE_TEMPLATE_FIELD -> messageTemplate = Script.parse(xcp, Script.DEFAULT_TEMPLATE_LANG) else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ErrorNotification.") } } return ErrorNotification( - destination = requireNotNull(destination) { "ErrorNotification destination is null" }, + destination = destination, + channel = channel, messageTemplate = requireNotNull(messageTemplate) { "ErrorNotification message template is null" } ) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Channel.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Channel.kt new file mode 100644 index 000000000..8f28d32b1 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Channel.kt @@ -0,0 +1,62 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.model.destination + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import java.io.IOException + +data class Channel(val id: String) : ToXContent, Writeable { + + init { + require(id.isNotEmpty()) { "Channel ID cannot be empty" } + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(ID, id) + .endObject() + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString() + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + } + + companion object { + const val ID = "id" + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): Channel { + var id: String? = null + + ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + when (fieldName) { + ID -> id = xcp.text() + else -> { + throw IllegalStateException("Unexpected field: $fieldName, while parsing Channel destination") + } + } + } + + return Channel(requireNotNull(id) { "Channel ID is null" }) + } + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Chime.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Chime.kt index d0c2d75d3..c7e5102ce 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Chime.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Chime.kt @@ -11,12 +11,9 @@ import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.io.stream.Writeable import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentBuilder -import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken -import org.opensearch.common.xcontent.XContentType -import org.opensearch.indexmanagement.opensearchapi.string import java.io.IOException import java.lang.IllegalStateException @@ -69,12 +66,8 @@ data class Chime(val url: String) : ToXContent, Writeable { } } - fun constructMessageContent(subject: String?, message: String?): String { - val messageContent: String? = if (Strings.isNullOrEmpty(subject)) message else "$subject \n\n $message" - val builder = XContentFactory.contentBuilder(XContentType.JSON) - builder.startObject() - .field("Content", messageContent) - .endObject() - return builder.string() + // Complete JSON structure is now constructed in the notification plugin + fun constructMessageContent(subject: String?, message: String): String { + return if (Strings.isNullOrEmpty(subject)) message else "$subject \n\n $message" } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Destination.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Destination.kt index b1a20cf46..ef65b1173 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Destination.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Destination.kt @@ -6,12 +6,6 @@ package org.opensearch.indexmanagement.indexstatemanagement.model.destination import org.apache.logging.log4j.LogManager -// import org.opensearch.alerting.destination.Notification -// import org.opensearch.alerting.destination.message.BaseMessage -// import org.opensearch.alerting.destination.message.ChimeMessage -// import org.opensearch.alerting.destination.message.CustomWebhookMessage -// import org.opensearch.alerting.destination.message.SlackMessage -// import org.opensearch.alerting.destination.response.DestinationResponse import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.io.stream.Writeable @@ -21,7 +15,10 @@ import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken -// import org.opensearch.indexmanagement.indexstatemanagement.util.isHostInDenylist +import org.opensearch.commons.destination.message.LegacyBaseMessage +import org.opensearch.commons.destination.message.LegacyChimeMessage +import org.opensearch.commons.destination.message.LegacyCustomWebhookMessage +import org.opensearch.commons.destination.message.LegacySlackMessage import org.opensearch.indexmanagement.opensearchapi.convertToMap import java.io.IOException @@ -105,41 +102,33 @@ data class Destination( } } -// @Throws(IOException::class) -// fun publish(compiledSubject: String?, compiledMessage: String, denyHostRanges: List): DestinationResponse { -// val destinationMessage: BaseMessage -// when (type) { -// DestinationType.CHIME -> { -// val messageContent = chime?.constructMessageContent(compiledSubject, compiledMessage) -// destinationMessage = ChimeMessage.Builder("chime_message") -// .withUrl(chime?.url) -// .withMessage(messageContent) -// .build() -// } -// DestinationType.SLACK -> { -// val messageContent = slack?.constructMessageContent(compiledSubject, compiledMessage) -// destinationMessage = SlackMessage.Builder("slack_message") -// .withUrl(slack?.url) -// .withMessage(messageContent) -// .build() -// } -// DestinationType.CUSTOM_WEBHOOK -> { -// destinationMessage = CustomWebhookMessage.Builder("custom_webhook") -// .withUrl(customWebhook?.url) -// .withScheme(customWebhook?.scheme) -// .withHost(customWebhook?.host) -// .withPort(customWebhook?.port) -// .withPath(customWebhook?.path) -// .withQueryParams(customWebhook?.queryParams) -// .withHeaderParams(customWebhook?.headerParams) -// .withMessage(compiledMessage).build() -// } -// } -// validateDestinationUri(destinationMessage, denyHostRanges) -// val response = Notification.publish(destinationMessage) as DestinationResponse -// logger.info("Message published for action type: $type, messageid: ${response.responseContent}, statuscode: ${response.statusCode}") -// return response -// } + @Throws(IOException::class) + fun buildLegacyBaseMessage(compiledSubject: String?, compiledMessage: String): LegacyBaseMessage { + val destinationMessage: LegacyBaseMessage + when (type) { + DestinationType.CHIME -> { + val messageContent = chime?.constructMessageContent(compiledSubject, compiledMessage) + destinationMessage = LegacyChimeMessage.Builder("chime_message") + .withUrl(chime?.url) + .withMessage(messageContent) + .build() + } + DestinationType.SLACK -> { + val messageContent = slack?.constructMessageContent(compiledSubject, compiledMessage) + destinationMessage = LegacySlackMessage.Builder("slack_message") + .withUrl(slack?.url) + .withMessage(messageContent) + .build() + } + DestinationType.CUSTOM_WEBHOOK -> { + destinationMessage = LegacyCustomWebhookMessage.Builder("custom_webhook") + .withUrl(getLegacyCustomWebhookMessageURL(customWebhook, compiledMessage)) + .withHeaderParams(customWebhook?.headerParams) + .withMessage(compiledMessage).build() + } + } + return destinationMessage + } fun constructResponseForDestinationType(type: DestinationType): Any { var content: Any? = null @@ -154,9 +143,15 @@ data class Destination( return content } -// private fun validateDestinationUri(destinationMessage: BaseMessage, denyHostRanges: List) { -// if (destinationMessage.isHostInDenylist(denyHostRanges)) { -// throw IllegalArgumentException("The destination address is invalid.") -// } -// } + private fun getLegacyCustomWebhookMessageURL(customWebhook: CustomWebhook?, compiledMessage: String): String { + return LegacyCustomWebhookMessage.Builder("custom_webhook") + .withUrl(customWebhook?.url) + .withScheme(customWebhook?.scheme) + .withHost(customWebhook?.host) + .withPort(customWebhook?.port) + .withPath(customWebhook?.path) + .withQueryParams(customWebhook?.queryParams) + .withMessage(compiledMessage) + .build().uri.toString() + } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Slack.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Slack.kt index 32cf40939..a3db82c97 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Slack.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/destination/Slack.kt @@ -11,12 +11,9 @@ import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.io.stream.Writeable import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentBuilder -import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken -import org.opensearch.common.xcontent.XContentType -import org.opensearch.indexmanagement.opensearchapi.string import java.io.IOException import java.lang.IllegalStateException @@ -69,12 +66,8 @@ data class Slack(val url: String) : ToXContent, Writeable { } } + // Complete JSON structure is now constructed in the notification plugin fun constructMessageContent(subject: String?, message: String): String { - val messageContent: String? = if (Strings.isNullOrEmpty(subject)) message else "$subject \n\n $message" - val builder = XContentFactory.contentBuilder(XContentType.JSON) - builder.startObject() - .field("text", messageContent) - .endObject() - return builder.string() + return if (Strings.isNullOrEmpty(subject)) message else "$subject \n\n $message" } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/notification/AttemptNotificationStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/notification/AttemptNotificationStep.kt index 6b48046e7..f531cbe90 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/notification/AttemptNotificationStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/notification/AttemptNotificationStep.kt @@ -5,11 +5,10 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.notification -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction -import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings +import org.opensearch.indexmanagement.indexstatemanagement.util.publishLegacyNotification +import org.opensearch.indexmanagement.indexstatemanagement.util.sendNotification import org.opensearch.indexmanagement.opensearchapi.convertToMap import org.opensearch.indexmanagement.spi.indexstatemanagement.Step import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData @@ -27,14 +26,11 @@ class AttemptNotificationStep(private val action: NotificationAction) : Step(nam override suspend fun execute(): Step { val context = this.context ?: return this val indexName = context.metadata.index - val hostDenyList = context.settings.getAsList(ManagedIndexSettings.HOST_DENY_LIST) val scriptService = context.scriptService try { - withContext(Dispatchers.IO) { - // action.destination.publish(null, compileTemplate(scriptService, action.messageTemplate, context.metadata), hostDenyList) - } - - // publish internally throws an error for any invalid responses so its safe to assume if we reach this point it was successful + val compiledMessage = compileTemplate(scriptService, action.messageTemplate, context.metadata) + action.destination?.buildLegacyBaseMessage(null, compiledMessage)?.publishLegacyNotification(context.client) + action.channel?.sendNotification(context.client, CHANNEL_TITLE, context.metadata, compiledMessage) // publish and send throws an error for any invalid responses so its safe to assume if we reach this point it was successful stepStatus = StepStatus.COMPLETED info = mapOf("message" to getSuccessMessage(indexName)) @@ -73,6 +69,7 @@ class AttemptNotificationStep(private val action: NotificationAction) : Step(nam companion object { const val name = "attempt_notification" + const val CHANNEL_TITLE = "Index Management-ISM-Notification Action" fun getFailedMessage(index: String) = "Failed to send notification [index=$index]" fun getSuccessMessage(index: String) = "Successfully sent notification [index=$index]" } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index 29d34155b..769b2d22a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -57,7 +57,6 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetry import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StateMetaData import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.search.builder.SearchSourceBuilder -// import java.net.InetAddress import java.time.Instant import java.time.temporal.ChronoUnit diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt new file mode 100644 index 000000000..db65accf0 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt @@ -0,0 +1,81 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +@file:JvmName("NotificationUtils") +package org.opensearch.indexmanagement.indexstatemanagement.util + +import org.opensearch.OpenSearchStatusException +import org.opensearch.client.Client +import org.opensearch.client.node.NodeClient +import org.opensearch.commons.destination.message.LegacyBaseMessage +import org.opensearch.commons.notifications.NotificationsPluginInterface +import org.opensearch.commons.notifications.action.LegacyPublishNotificationRequest +import org.opensearch.commons.notifications.action.LegacyPublishNotificationResponse +import org.opensearch.commons.notifications.action.SendNotificationResponse +import org.opensearch.commons.notifications.model.ChannelMessage +import org.opensearch.commons.notifications.model.EventSource +import org.opensearch.commons.notifications.model.SeverityType +import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.rest.RestStatus + +/** + * Extension function for publishing a notification to a legacy destination. + * + * We now support the new channels from the Notification plugin. But, we still need to support + * the old embedded legacy destinations that are directly on the policies in the error notifications + * or notification actions. So we have a separate API in the NotificationsPluginInterface that allows + * us to publish these old legacy ones directly. + */ +suspend fun LegacyBaseMessage.publishLegacyNotification(client: Client) { + val baseMessage = this + val res: LegacyPublishNotificationResponse = NotificationsPluginInterface.suspendUntil { + this.publishLegacyNotification( + (client as NodeClient), + LegacyPublishNotificationRequest(baseMessage), + it + ) + } + validateResponseStatus(RestStatus.fromCode(res.destinationResponse.statusCode), res.destinationResponse.responseContent) +} + +/** + * Extension function for publishing a notification to a channel in the Notification plugin. + */ +suspend fun Channel.sendNotification(client: Client, title: String, managedIndexMetaData: ManagedIndexMetaData, compiledMessage: String) { + val channel = this + val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil { + this.sendNotification( + (client as NodeClient), + managedIndexMetaData.getEventSource(title), + ChannelMessage(compiledMessage, null, null), + listOf(channel.id), + it + ) + } + validateResponseStatus(res.getStatus(), res.notificationEvent.eventSource.referenceId) +} + +fun ManagedIndexMetaData.getEventSource(title: String): EventSource { + return EventSource(title, indexUuid, SeverityType.INFO) +} + +/** + * all valid response status + */ +private val VALID_RESPONSE_STATUS = setOf( + RestStatus.OK.status, RestStatus.CREATED.status, RestStatus.ACCEPTED.status, + RestStatus.NON_AUTHORITATIVE_INFORMATION.status, RestStatus.NO_CONTENT.status, + RestStatus.RESET_CONTENT.status, RestStatus.PARTIAL_CONTENT.status, + RestStatus.MULTI_STATUS.status +) + +@Throws(OpenSearchStatusException::class) +fun validateResponseStatus(restStatus: RestStatus, responseContent: String) { + if (!VALID_RESPONSE_STATUS.contains(restStatus.status)) { + throw OpenSearchStatusException("Failed: $responseContent", restStatus) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt index 4288c88cf..726df1adf 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt @@ -39,6 +39,7 @@ import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.InjectSecurity import org.opensearch.commons.authuser.User +import org.opensearch.commons.notifications.NotificationsPluginInterface import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate @@ -222,6 +223,20 @@ suspend fun LockService.suspendUntil(block: LockService.(ActionListener) }) } +/** + * Converts [NotificationsPluginInterface] methods that take a callback into a kotlin suspending function. + * + * @param block - a block of code that is passed an [ActionListener] that should be passed to the NotificationsPluginInterface API. + */ +suspend fun NotificationsPluginInterface.suspendUntil(block: NotificationsPluginInterface.(ActionListener) -> Unit): T = + suspendCoroutine { cont -> + block(object : ActionListener { + override fun onResponse(response: T) = cont.resume(response) + + override fun onFailure(e: Exception) = cont.resumeWithException(e) + }) + } + fun Throwable.findRemoteTransportException(): RemoteTransportException? { if (this is RemoteTransportException) return this return this.cause?.findRemoteTransportException() diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index c83df1a68..bf2b61e1c 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -118,6 +118,13 @@ } } }, + "channel": { + "properties": { + "id": { + "type": "keyword" + } + } + }, "message_template": { "type": "object", "enabled": false @@ -304,6 +311,13 @@ } } }, + "channel": { + "properties": { + "id": { + "type": "keyword" + } + } + }, "message_template": { "type": "object", "enabled": false diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt index 7ddadd7b9..bab64d0b5 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt @@ -37,6 +37,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.StateFilter import org.opensearch.indexmanagement.indexstatemanagement.model.Transition import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig +import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Chime import org.opensearch.indexmanagement.indexstatemanagement.model.destination.CustomWebhook import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination @@ -186,7 +187,7 @@ fun randomNotificationActionConfig( messageTemplate: Script = randomTemplateScript("random message"), index: Int = 0 ): NotificationAction { - return NotificationAction(destination, messageTemplate, index) + return NotificationAction(destination, null, messageTemplate, index) } fun randomAllocationActionConfig(require: Map = emptyMap(), exclude: Map = emptyMap(), include: Map = emptyMap()): AllocationAction { @@ -242,9 +243,11 @@ fun randomCustomWebhook(): CustomWebhook { } fun randomTemplateScript( - source: String, - params: Map = emptyMap() -): Script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, source, params) + source: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + params: Map = emptyMap(), + scriptType: ScriptType = ScriptType.INLINE, + lang: String = Script.DEFAULT_TEMPLATE_LANG +): Script = Script(scriptType, lang, source, params) fun randomSnapshotActionConfig(repository: String = "repo", snapshot: String = "sp"): SnapshotAction { return SnapshotAction(repository, snapshot, index = 0) @@ -366,6 +369,10 @@ fun randomISMTemplate( ) } +fun randomChannel(id: String = OpenSearchRestTestCase.randomAlphaOfLength(10)): Channel { + return Channel(id = id) +} + fun Policy.toJsonString(): String { val builder = XContentFactory.jsonBuilder() return this.toXContent(builder).string() @@ -476,6 +483,11 @@ fun ISMTemplate.toJsonString(): String { return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string() } +fun Channel.toJsonString(): String { + val builder = XContentFactory.jsonBuilder() + return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string() +} + @Suppress("RethrowCaughtException") fun wait( timeout: Instant = Instant.ofEpochSecond(10), diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationActionIT.kt index e90790192..8b7aa558b 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/NotificationActionIT.kt @@ -23,6 +23,7 @@ import java.util.Locale class NotificationActionIT : IndexStateManagementRestTestCase() { private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT) + // TODO: this seems to have broken with the notification plugin // cannot test chime/slack in integ tests, but can test a custom webhook by // using the POST call to write to the local integTest cluster and verify that index has 1 doc @Suppress("UNCHECKED_CAST") @@ -48,7 +49,7 @@ class NotificationActionIT : IndexStateManagementRestTestCase() { ) ) val messageTemplate = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "{ \"testing\": 5 }", emptyMap()) - val actionConfig = NotificationAction(destination = destination, messageTemplate = messageTemplate, index = 0) + val actionConfig = NotificationAction(destination = destination, channel = null, messageTemplate = messageTemplate, index = 0) val states = listOf(State(name = "NotificationState", actions = listOf(actionConfig), transitions = emptyList())) val policy = Policy( id = policyID, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt index 23684072e..0119a24ba 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ActionTests.kt @@ -17,10 +17,13 @@ import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentType import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteAction +import org.opensearch.indexmanagement.indexstatemanagement.action.NotificationAction import org.opensearch.indexmanagement.indexstatemanagement.randomAllocationActionConfig +import org.opensearch.indexmanagement.indexstatemanagement.randomChannel import org.opensearch.indexmanagement.indexstatemanagement.randomByteSizeValue import org.opensearch.indexmanagement.indexstatemanagement.randomCloseActionConfig import org.opensearch.indexmanagement.indexstatemanagement.randomDeleteActionConfig +import org.opensearch.indexmanagement.indexstatemanagement.randomDestination import org.opensearch.indexmanagement.indexstatemanagement.randomForceMergeActionConfig import org.opensearch.indexmanagement.indexstatemanagement.randomIndexPriorityActionConfig import org.opensearch.indexmanagement.indexstatemanagement.randomNotificationActionConfig @@ -32,6 +35,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.randomRolloverActionC import org.opensearch.indexmanagement.indexstatemanagement.randomRollupActionConfig import org.opensearch.indexmanagement.indexstatemanagement.randomShrinkAction import org.opensearch.indexmanagement.indexstatemanagement.randomSnapshotActionConfig +import org.opensearch.indexmanagement.indexstatemanagement.randomTemplateScript import org.opensearch.indexmanagement.indexstatemanagement.randomTimeValueObject import org.opensearch.indexmanagement.indexstatemanagement.util.getFreeBytesThresholdHigh import org.opensearch.indexmanagement.opensearchapi.convertToMap @@ -96,6 +100,12 @@ class ActionTests : OpenSearchTestCase() { roundTripAction(randomReadOnlyActionConfig()) } + fun `test notification having both channel and destination fails`() { + assertFailsWith(IllegalArgumentException::class, "Expected IllegalArgumentException for notification using both destination and channel") { + NotificationAction(destination = randomDestination(), channel = randomChannel(), messageTemplate = randomTemplateScript(), index = 0) + } + } + fun `test rollover action round trip`() { roundTripAction(randomRolloverActionConfig()) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ErrorNotificationTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ErrorNotificationTests.kt new file mode 100644 index 000000000..0a2ddf546 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ErrorNotificationTests.kt @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.model + +import org.opensearch.indexmanagement.indexstatemanagement.randomChannel +import org.opensearch.indexmanagement.indexstatemanagement.randomDestination +import org.opensearch.indexmanagement.indexstatemanagement.randomTemplateScript +import org.opensearch.test.OpenSearchTestCase +import kotlin.test.assertFailsWith + +class ErrorNotificationTests : OpenSearchTestCase() { + fun `test error notification having both channel and destination fails`() { + assertFailsWith(IllegalArgumentException::class, "Expected IllegalArgumentException for error notification using both destination and channel") { + ErrorNotification(destination = randomDestination(), channel = randomChannel(), messageTemplate = randomTemplateScript()) + } + } + + fun `test error notification having neither channel or destination fails`() { + assertFailsWith(IllegalArgumentException::class, "Expected IllegalArgumentException for error notification having neither destination or channel") { + ErrorNotification(destination = null, channel = null, messageTemplate = randomTemplateScript()) + } + } + + fun `test error notification having non mustache lang fails`() { + assertFailsWith(IllegalArgumentException::class, "Expected IllegalArgumentException for error notification with non mustache lang") { + ErrorNotification(destination = randomDestination(), channel = null, messageTemplate = randomTemplateScript(lang = "painless")) + } + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt index 1fe960408..9d9aaf2f5 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/XContentTests.kt @@ -10,10 +10,12 @@ import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentType import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser import org.opensearch.indexmanagement.indexstatemanagement.action.RollupAction +import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Channel import org.opensearch.indexmanagement.indexstatemanagement.model.destination.DestinationType import org.opensearch.indexmanagement.indexstatemanagement.nonNullRandomConditions import org.opensearch.indexmanagement.indexstatemanagement.randomAllocationActionConfig import org.opensearch.indexmanagement.indexstatemanagement.randomChangePolicy +import org.opensearch.indexmanagement.indexstatemanagement.randomChannel import org.opensearch.indexmanagement.indexstatemanagement.randomCloseActionConfig import org.opensearch.indexmanagement.indexstatemanagement.randomDeleteActionConfig import org.opensearch.indexmanagement.indexstatemanagement.randomDestination @@ -262,6 +264,14 @@ class XContentTests : OpenSearchTestCase() { assertEquals("Round tripping ChangePolicy doesn't work", changePolicy, parsedChangePolicy) } + fun `test channel parsing`() { + val channel = randomChannel() + + val channelString = channel.toJsonString() + val parsedChannel = Channel.parse(parser(channelString)) + assertEquals("Round tripping Channel doesn't work", channel, parsedChannel) + } + private fun parser(xc: String): XContentParser { val parser = XContentType.JSON.xContent().createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, xc) parser.nextToken() diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt index 2e08c057c..207ad3845 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt @@ -6,8 +6,6 @@ package org.opensearch.indexmanagement.indexstatemanagement.util import org.opensearch.action.delete.DeleteRequest -// import org.opensearch.alerting.destination.message.BaseMessage -// import org.opensearch.alerting.destination.message.CustomWebhookMessage import org.opensearch.common.bytes.BytesReference import org.opensearch.common.unit.ByteSizeValue import org.opensearch.common.unit.TimeValue @@ -15,6 +13,8 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.destination.message.LegacyBaseMessage +import org.opensearch.commons.destination.message.LegacyCustomWebhookMessage import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction import org.opensearch.indexmanagement.indexstatemanagement.model.Conditions @@ -283,30 +283,6 @@ class ManagedIndexUtilsTests : OpenSearchTestCase() { ) } -// fun `test ips in denylist`() { -// val ips = listOf( -// "127.0.0.1", // 127.0.0.0/8 -// "10.0.0.1", // 10.0.0.0/8 -// "10.11.12.13", // 10.0.0.0/8 -// "172.16.0.1", // "172.16.0.0/12" -// "192.168.0.1", // 192.168.0.0/16" -// "0.0.0.1", // 0.0.0.0/8 -// "9.9.9.9" -// ) -// for (ip in ips) { -// val bm = createMessageWithHost(ip) -// assertEquals(true, bm.isHostInDenylist(HOST_DENY_LIST)) -// } -// } - -// fun `test url in denylist`() { -// val urls = listOf("https://www.amazon.com", "https://mytest.com", "https://mytest.com") -// for (url in urls) { -// val bm = createMessageWithURl(url) -// assertEquals(false, bm.isHostInDenylist(HOST_DENY_LIST)) -// } -// } - private fun contentParser(bytesReference: BytesReference): XContentParser { return XContentHelper.createParser( xContentRegistry(), LoggingDeprecationHandler.INSTANCE, @@ -314,30 +290,21 @@ class ManagedIndexUtilsTests : OpenSearchTestCase() { ) } - private val HOST_DENY_LIST = listOf( - "127.0.0.0/8", - "10.0.0.0/8", - "172.16.0.0/12", - "192.168.0.0/16", - "0.0.0.0/8", - "9.9.9.9" // ip - ) - -// private fun createMessageWithHost(host: String): BaseMessage { -// return CustomWebhookMessage.Builder("abc") -// .withHost(host) -// .withPath("incomingwebhooks/383c0e2b-d028-44f4-8d38-696754bc4574") -// .withMessage("{\"Content\":\"Message test\"}") -// .withMethod("POST") -// .withQueryParams(HashMap()).build() -// } -// -// private fun createMessageWithURl(url: String): BaseMessage { -// return CustomWebhookMessage.Builder("abc") -// .withUrl(url) -// .withPath("incomingwebhooks/383c0e2b-d028-44f4-8d38-696754bc4574") -// .withMessage("{\"Content\":\"Message test\"}") -// .withMethod("POST") -// .withQueryParams(HashMap()).build() -// } + private fun createMessageWithHost(host: String): LegacyBaseMessage { + return LegacyCustomWebhookMessage.Builder("abc") + .withHost(host) + .withPath("incomingwebhooks/383c0e2b-d028-44f4-8d38-696754bc4574") + .withMessage("{\"Content\":\"Message test\"}") + .withMethod("POST") + .withQueryParams(HashMap()).build() + } + + private fun createMessageWithURl(url: String): LegacyBaseMessage { + return LegacyCustomWebhookMessage.Builder("abc") + .withUrl(url) + .withPath("incomingwebhooks/383c0e2b-d028-44f4-8d38-696754bc4574") + .withMessage("{\"Content\":\"Message test\"}") + .withMethod("POST") + .withQueryParams(HashMap()).build() + } } diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index c83df1a68..bf2b61e1c 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -118,6 +118,13 @@ } } }, + "channel": { + "properties": { + "id": { + "type": "keyword" + } + } + }, "message_template": { "type": "object", "enabled": false @@ -304,6 +311,13 @@ } } }, + "channel": { + "properties": { + "id": { + "type": "keyword" + } + } + }, "message_template": { "type": "object", "enabled": false