diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index 3917f2855f..0b780d0de9 100644 --- a/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -21,6 +21,14 @@ public interface FlintClient { */ void createIndex(String indexName, FlintMetadata metadata); + /** + * Does Flint index with the given name exist + * + * @param indexName index name + * @return true if the index exists, otherwise false + */ + boolean exists(String indexName); + /** * Retrieve metadata in a Flint index. * diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 851c1bfb48..c4c53178c9 100644 --- a/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -13,6 +13,7 @@ import org.opensearch.client.RestClient; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.CreateIndexRequest; +import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.client.indices.GetMappingsRequest; import org.opensearch.client.indices.GetMappingsResponse; import org.opensearch.cluster.metadata.MappingMetadata; @@ -42,7 +43,18 @@ public void createIndex(String indexName, FlintMetadata metadata) { request.mapping(buildIndexMapping(metadata)); client.indices().create(request, RequestOptions.DEFAULT); + } catch (Exception e) { + throw new IllegalStateException("Failed to create Flint index", e); + } + } + + @Override + public boolean exists(String indexName) { + try (RestHighLevelClient client = createClient()) { + return client.indices() + .exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT); } catch (IOException e) { + throw new IllegalStateException("Failed to check if Flint index exists", e); } } @@ -50,11 +62,12 @@ public void createIndex(String indexName, FlintMetadata metadata) { public FlintMetadata getIndexMetadata(String indexName) { try (RestHighLevelClient client = createClient()) { GetMappingsRequest request = new GetMappingsRequest().indices(indexName); - GetMappingsResponse response = client.indices().getMapping(request, RequestOptions.DEFAULT); + GetMappingsResponse response = + client.indices().getMapping(request, RequestOptions.DEFAULT); return parseIndexMapping(response.mappings().get(indexName)); - } catch (IOException e) { - return null; + } catch (Exception e) { + throw new IllegalStateException("Failed to get Flint index metadata", e); } } @@ -64,11 +77,13 @@ private RestHighLevelClient createClient() { } private Map buildIndexMapping(FlintMetadata metadata) { + // Convert from {"field": "int"} to {"field": {"type": "int"}} Map fieldTypes = metadata.getSchema().entrySet().stream() .collect(Collectors.toMap( Map.Entry::getKey, entry -> Map.of("type", entry.getValue()))); + return Map.of( "properties", fieldTypes, "_meta", metadata.getMeta()); @@ -77,6 +92,8 @@ private Map buildIndexMapping(FlintMetadata metadata) { @SuppressWarnings("unchecked") private FlintMetadata parseIndexMapping(MappingMetadata mapping) { Map source = mapping.getSourceAsMap(); + + // Parse {"field": {"type": "int"}} to {"field": "int"} Map schema = ((Map) source.get("properties")) .entrySet().stream() @@ -84,7 +101,6 @@ private FlintMetadata parseIndexMapping(MappingMetadata mapping) { Map.Entry::getKey, entry -> ((Map) entry.getValue()).get("type"))); - return new FlintMetadata(schema, - (Map) source.get("_meta")); + return new FlintMetadata(schema, (Map) source.get("_meta")); } } diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 18e4863699..fe2ca14f5c 100644 --- a/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -9,26 +9,36 @@ import org.opensearch.flint.OpenSearchSuite import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper +import org.scalatest.matchers.should.Matchers import scala.collection.JavaConverters._ class FlintOpenSearchClientSuite extends AnyFlatSpec - with OpenSearchSuite { + with OpenSearchSuite + with Matchers { + /** Lazy initialize after container started. */ lazy val flintClient = new FlintOpenSearchClient(openSearchHost, openSearchPort) + behavior of "Flint OpenSearch client" + it should "create index successfully" in { + val indexName = "test" val schema = Map("age" -> "integer").asJava val meta = Map("index" -> Map("kind" -> "SkippingIndex").asJava.asInstanceOf[Object] ).asJava - flintClient.createIndex("test", new FlintMetadata(schema, meta)) + flintClient.createIndex(indexName, new FlintMetadata(schema, meta)) + + flintClient.exists(indexName) shouldBe true + flintClient.getIndexMetadata(indexName) should have ( + 'schema (schema), + 'meta (meta)) + } - val metadata = flintClient.getIndexMetadata("test") - metadata.getSchema shouldBe schema - metadata.getMeta shouldBe meta + it should "return false if index not exist" in { + flintClient.exists("non-exist-index") shouldBe false } }