Skip to content

Commit

Permalink
Handle exception in Flint client
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <daichen@amazon.com>
  • Loading branch information
dai-chen committed May 16, 2023
1 parent e2ba9d0 commit c0ca5f9
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ public interface FlintClient {
*/
void createIndex(String indexName, FlintMetadata metadata);

/**
* Does Flint index with the given name exist
*
* @param indexName index name
* @return true if the index exists, otherwise false
*/
boolean exists(String indexName);

/**
* Retrieve metadata in a Flint index.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.client.RestClient;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetMappingsRequest;
import org.opensearch.client.indices.GetMappingsResponse;
import org.opensearch.cluster.metadata.MappingMetadata;
Expand Down Expand Up @@ -42,19 +43,31 @@ public void createIndex(String indexName, FlintMetadata metadata) {
request.mapping(buildIndexMapping(metadata));

client.indices().create(request, RequestOptions.DEFAULT);
} catch (Exception e) {
throw new IllegalStateException("Failed to create Flint index", e);
}
}

@Override
public boolean exists(String indexName) {
try (RestHighLevelClient client = createClient()) {
return client.indices()
.exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
} catch (IOException e) {
throw new IllegalStateException("Failed to check if Flint index exists", e);
}
}

@Override
public FlintMetadata getIndexMetadata(String indexName) {
try (RestHighLevelClient client = createClient()) {
GetMappingsRequest request = new GetMappingsRequest().indices(indexName);
GetMappingsResponse response = client.indices().getMapping(request, RequestOptions.DEFAULT);
GetMappingsResponse response =
client.indices().getMapping(request, RequestOptions.DEFAULT);

return parseIndexMapping(response.mappings().get(indexName));
} catch (IOException e) {
return null;
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata", e);
}
}

Expand All @@ -64,11 +77,13 @@ private RestHighLevelClient createClient() {
}

private Map<String, Object> buildIndexMapping(FlintMetadata metadata) {
// Convert from {"field": "int"} to {"field": {"type": "int"}}
Map<String, Object> fieldTypes =
metadata.getSchema().entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> Map.of("type", entry.getValue())));

return Map.of(
"properties", fieldTypes,
"_meta", metadata.getMeta());
Expand All @@ -77,14 +92,15 @@ private Map<String, Object> buildIndexMapping(FlintMetadata metadata) {
@SuppressWarnings("unchecked")
private FlintMetadata parseIndexMapping(MappingMetadata mapping) {
Map<String, Object> source = mapping.getSourceAsMap();

// Parse {"field": {"type": "int"}} to {"field": "int"}
Map<String, String> schema =
((Map<String, Object>) source.get("properties"))
.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> ((Map<String, String>) entry.getValue()).get("type")));

return new FlintMetadata(schema,
(Map<String, Object>) source.get("_meta"));
return new FlintMetadata(schema, (Map<String, Object>) source.get("_meta"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,36 @@ import org.opensearch.flint.OpenSearchSuite
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
import org.scalatest.matchers.should.Matchers

import scala.collection.JavaConverters._

class FlintOpenSearchClientSuite
extends AnyFlatSpec
with OpenSearchSuite {
with OpenSearchSuite
with Matchers {

/** Lazy initialize after container started. */
lazy val flintClient = new FlintOpenSearchClient(openSearchHost, openSearchPort)

behavior of "Flint OpenSearch client"

it should "create index successfully" in {
val indexName = "test"
val schema = Map("age" -> "integer").asJava
val meta =
Map("index" ->
Map("kind" -> "SkippingIndex").asJava.asInstanceOf[Object]
).asJava
flintClient.createIndex("test", new FlintMetadata(schema, meta))
flintClient.createIndex(indexName, new FlintMetadata(schema, meta))

flintClient.exists(indexName) shouldBe true
flintClient.getIndexMetadata(indexName) should have (
'schema (schema),
'meta (meta))
}

val metadata = flintClient.getIndexMetadata("test")
metadata.getSchema shouldBe schema
metadata.getMeta shouldBe meta
it should "return false if index not exist" in {
flintClient.exists("non-exist-index") shouldBe false
}
}

0 comments on commit c0ca5f9

Please sign in to comment.