From 4bdf40fab388e9901983e71ab6d9e10ea8d31b1e Mon Sep 17 00:00:00 2001
From: "opensearch-trigger-bot[bot]"
 <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com>
Date: Mon, 23 Sep 2024 16:05:33 -0700
Subject: [PATCH] getAllIndexMetadata by pattern becomes optional (#682) (#688)

(cherry picked from commit 05e7f237082fef5db79d3d994fb46b7cf9595469)

Signed-off-by: Sean Kao <seankao@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
---
 .../metadata/FlintIndexMetadataService.java   |  14 +-
 .../opensearch/flint/core/FlintClient.java    |  10 ++
 .../core/storage/FlintOpenSearchClient.java   |  17 +++
 .../FlintOpenSearchIndexMetadataService.scala |   9 +-
 .../opensearch/flint/spark/FlintSpark.scala   |  19 ++-
 .../core/FlintOpenSearchClientSuite.scala     |  11 ++
 ...penSearchIndexMetadataServiceITSuite.scala |   9 +-
 .../FlintSparkIndexDescribeITSuite.scala      | 120 ++++++++++++++++++
 8 files changed, 195 insertions(+), 14 deletions(-)
 create mode 100644 integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexDescribeITSuite.scala

diff --git a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintIndexMetadataService.java b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintIndexMetadataService.java
index b990998a9..a31fc9a78 100644
--- a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintIndexMetadataService.java
+++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintIndexMetadataService.java
@@ -25,14 +25,22 @@ public interface FlintIndexMetadataService {
    */
   FlintMetadata getIndexMetadata(String indexName);
 
+  /**
+   * Whether the service supports retrieving metadata for Flint indexes by index pattern.
+   *
+   * @return true if supported, otherwise false
+   */
+  boolean supportsGetByIndexPattern();
+
   /**
    * Retrieve all metadata for Flint index whose name matches the given pattern.
+   * If get by index pattern is not supported, then the provided names must be full index names.
    *
-   * @param indexNamePattern index name pattern
-   * @return map where the keys are the matched index names, and the values are
+   * @param indexNamePatterns index full names or patterns
+   * @return map where the keys are the (matched) index names, and the values are
    *         corresponding index metadata
    */
-  Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePattern);
+  Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePatterns);
 
   /**
    * Update metadata for a Flint index.
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
index 29b5f6de9..6ce344c3f 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
@@ -8,6 +8,8 @@
 import org.opensearch.flint.common.metadata.FlintMetadata;
 import org.opensearch.flint.core.storage.FlintWriter;
 
+import java.util.List;
+
 /**
  * Flint index client that provides API for metadata and data operations
  * on a Flint index regardless of concrete storage.
@@ -30,6 +32,14 @@ public interface FlintClient {
    */
   boolean exists(String indexName);
 
+  /**
+   * Get all index names that match the given pattern.
+   *
+   * @param indexNamePatterns index name patterns
+   * @return list of index names
+   */
+  List<String> getIndexNames(String... indexNamePatterns);
+
   /**
    * Delete a Flint index.
    *
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
index ef97f65ac..da22e3751 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
@@ -9,6 +9,7 @@
 import org.opensearch.client.RequestOptions;
 import org.opensearch.client.indices.CreateIndexRequest;
 import org.opensearch.client.indices.GetIndexRequest;
+import org.opensearch.client.indices.GetIndexResponse;
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.xcontent.XContentType;
 import org.opensearch.flint.common.metadata.FlintMetadata;
@@ -18,7 +19,10 @@
 import scala.Option;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
 /**
  * Flint client implementation for OpenSearch storage.
@@ -65,6 +69,19 @@ public boolean exists(String indexName) {
     }
   }
 
+  @Override
+  public List<String> getIndexNames(String... indexNamePatterns) {
+    LOG.info("Getting Flint index names for pattern " + String.join(",", indexNamePatterns));
+    String[] osIndexNamePatterns = Arrays.stream(indexNamePatterns).map(this::sanitizeIndexName).toArray(String[]::new);
+    try (IRestHighLevelClient client = createClient()) {
+      GetIndexRequest request = new GetIndexRequest(osIndexNamePatterns);
+      GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);
+      return Arrays.stream(response.getIndices()).collect(Collectors.toList());
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to get Flint index names for pattern " + String.join(", ", indexNamePatterns), e);
+    }
+  }
+
   @Override
   public void deleteIndex(String indexName) {
     LOG.info("Deleting Flint index " + indexName);
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala
index fad2f1b63..765460da7 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala
@@ -46,9 +46,12 @@ class FlintOpenSearchIndexMetadataService(options: FlintOptions)
       }
   }
 
-  override def getAllIndexMetadata(indexNamePattern: String*): util.Map[String, FlintMetadata] = {
-    logInfo(s"Fetching all Flint index metadata for pattern ${indexNamePattern.mkString(",")}");
-    val indexNames = indexNamePattern.map(OpenSearchClientUtils.sanitizeIndexName)
+  override def supportsGetByIndexPattern(): Boolean = true
+
+  override def getAllIndexMetadata(
+      indexNamePatterns: String*): util.Map[String, FlintMetadata] = {
+    logInfo(s"Fetching all Flint index metadata for pattern ${indexNamePatterns.mkString(",")}");
+    val indexNames = indexNamePatterns.map(OpenSearchClientUtils.sanitizeIndexName)
     var client: IRestHighLevelClient = null
     try {
       client = OpenSearchClientUtils.createClient(options)
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 88b8c38cc..fa0835d86 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -168,9 +168,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
   def describeIndexes(indexNamePattern: String): Seq[FlintSparkIndex] = {
     logInfo(s"Describing indexes with pattern $indexNamePattern")
     if (flintClient.exists(indexNamePattern)) {
-      flintIndexMetadataService
-        .getAllIndexMetadata(indexNamePattern)
-        .asScala
+      getAllIndexMetadata(indexNamePattern)
         .map { case (indexName, metadata) =>
           attachLatestLogEntry(indexName, metadata)
         }
@@ -368,6 +366,21 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
     }
   }
 
+  private def getAllIndexMetadata(indexNamePattern: String): Map[String, FlintMetadata] = {
+    if (flintIndexMetadataService.supportsGetByIndexPattern) {
+      flintIndexMetadataService
+        .getAllIndexMetadata(indexNamePattern)
+        .asScala
+        .toMap
+    } else {
+      val indexNames = flintClient.getIndexNames(indexNamePattern).asScala.toArray
+      flintIndexMetadataService
+        .getAllIndexMetadata(indexNames: _*)
+        .asScala
+        .toMap
+    }
+  }
+
   /**
    * Attaches latest log entry to metadata if available.
    *
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
index 2dc6016b2..a2c2d26f6 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
@@ -65,6 +65,17 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
     (settings \ "index.number_of_replicas").extract[String] shouldBe "2"
   }
 
+  it should "get all index names with the given index name pattern" in {
+    val metadata = FlintOpenSearchIndexMetadataService.deserialize(
+      """{"properties": {"test": { "type": "integer" } } }""")
+    flintClient.createIndex("flint_test_1_index", metadata)
+    flintClient.createIndex("flint_test_2_index", metadata)
+
+    val indexNames = flintClient.getIndexNames("flint_*_index")
+    indexNames should have size 2
+    indexNames should contain allOf ("flint_test_1_index", "flint_test_2_index")
+  }
+
   it should "convert index name to all lowercase" in {
     val indexName = "flint_ELB_logs_index"
     flintClient.createIndex(
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchIndexMetadataServiceITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchIndexMetadataServiceITSuite.scala
index c5bd75951..c7256013c 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchIndexMetadataServiceITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchIndexMetadataServiceITSuite.scala
@@ -108,13 +108,12 @@ class FlintOpenSearchIndexMetadataServiceITSuite
 }
 
 class TestIndexMetadataService extends FlintIndexMetadataService {
-  override def getIndexMetadata(indexName: String): FlintMetadata = {
-    null
-  }
+  override def getIndexMetadata(indexName: String): FlintMetadata = null
 
-  override def getAllIndexMetadata(indexNamePattern: String*): util.Map[String, FlintMetadata] = {
+  override def supportsGetByIndexPattern(): Boolean = true
+
+  override def getAllIndexMetadata(indexNamePattern: String*): util.Map[String, FlintMetadata] =
     null
-  }
 
   override def updateIndexMetadata(indexName: String, metadata: FlintMetadata): Unit = {}
 
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexDescribeITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexDescribeITSuite.scala
new file mode 100644
index 000000000..594765b8a
--- /dev/null
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexDescribeITSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark
+
+import java.util
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}
+import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService
+import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
+import org.scalatest.matchers.should.Matchers
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.flint.config.FlintSparkConf
+
+class FlintSparkIndexDescribeITSuite extends FlintSparkSuite with Matchers {
+
+  /** Test table and index name */
+  private val testTable = "spark_catalog.default.covering_sql_test"
+  private val testIndexMatch1 = "name_and_age_1"
+  private val testIndexMatch2 = "name_and_age_2"
+  private val testIndexOther = "address"
+  private val testFlintIndexMatch1 =
+    FlintSparkCoveringIndex.getFlintIndexName(testIndexMatch1, testTable)
+  private val testFlintIndexMatch2 =
+    FlintSparkCoveringIndex.getFlintIndexName(testIndexMatch2, testTable)
+  private val testFlintIndexOther =
+    FlintSparkCoveringIndex.getFlintIndexName(testIndexOther, testTable)
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+
+    createPartitionedAddressTable(testTable)
+
+    flint
+      .coveringIndex()
+      .name(testIndexMatch1)
+      .onTable(testTable)
+      .addIndexColumns("name", "age")
+      .create()
+
+    flint
+      .coveringIndex()
+      .name(testIndexMatch2)
+      .onTable(testTable)
+      .addIndexColumns("name", "age")
+      .create()
+
+    flint
+      .coveringIndex()
+      .name(testIndexOther)
+      .onTable(testTable)
+      .addIndexColumns("address")
+      .create()
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+
+    // Delete all test indices
+    deleteTestIndex(testFlintIndexMatch1, testFlintIndexMatch2, testFlintIndexOther)
+    sql(s"DROP TABLE $testTable")
+  }
+
+  test("describe all indexes matching a pattern") {
+    val indexNamePattern = FlintSparkCoveringIndex.getFlintIndexName("name_and_age_*", testTable)
+    val indexes = flint.describeIndexes(indexNamePattern)
+    indexes should have size 2
+    indexes.map(_.name) should contain allOf (testFlintIndexMatch1, testFlintIndexMatch2)
+  }
+
+  test(
+    "describe all indexes matching a pattern with custom index metadata service implementation without get-by-pattern support") {
+    setFlintSparkConf(
+      FlintSparkConf.CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS,
+      classOf[NoGetByPatternSupportIndexMetadataService].getName)
+    val testFlint = new FlintSpark(spark)
+    val indexNamePattern = FlintSparkCoveringIndex.getFlintIndexName("name_and_age_*", testTable)
+    val indexes = testFlint.describeIndexes(indexNamePattern)
+    indexes should have size 2
+    indexes.map(_.name) should contain allOf (testFlintIndexMatch1, testFlintIndexMatch2)
+  }
+}
+
+class NoGetByPatternSupportIndexMetadataService extends FlintIndexMetadataService with Logging {
+
+  /**
+   * Cannot directly extend FlintOpenSearchIndexMetadataService because
+   * FlintIndexMetadataServiceBuilder expects custom implementation takes no arguments in its
+   * constructor
+   */
+  private val indexMetadataService = new FlintOpenSearchIndexMetadataService(
+    FlintSparkConf().flintOptions())
+
+  override def supportsGetByIndexPattern(): Boolean = false
+
+  /**
+   * Does not match index names by pattern. The input is expected to be a list of full index names
+   */
+  override def getAllIndexMetadata(indexNames: String*): util.Map[String, FlintMetadata] = {
+    logInfo(s"Fetching all Flint index metadata for indexes ${indexNames.mkString(",")}");
+    indexNames
+      .map(index => index -> getIndexMetadata(index))
+      .toMap
+      .asJava
+  }
+
+  override def getIndexMetadata(indexName: String): FlintMetadata =
+    indexMetadataService.getIndexMetadata(indexName)
+
+  override def updateIndexMetadata(indexName: String, metadata: FlintMetadata): Unit =
+    indexMetadataService.updateIndexMetadata(indexName, metadata)
+
+  override def deleteIndexMetadata(indexName: String): Unit =
+    indexMetadataService.deleteIndexMetadata(indexName)
+}