Skip to content

Commit

Permalink
Add no transaction class to avoid impact on existing IT
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <daichen@amazon.com>
  • Loading branch information
dai-chen committed Oct 30, 2023
1 parent 0bc782f commit 51689b1
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* initial log (precondition)
* => transient log (with pending operation to do)
* => final log (after operation succeeds)
* For example, "empty" => creating (operation is to create index) => created
* For example, "empty" => creating (operation is to create index) => active
*
* @param <T> result type
*/
Expand Down Expand Up @@ -46,4 +46,29 @@ public interface OptimisticTransaction<T> {
* @return result
*/
T execute(Supplier<T> operation);

/**
* No optimistic transaction.
*/
class NoOptimisticTransaction<T> implements OptimisticTransaction<T> {
@Override
public OptimisticTransaction<T> initialLog(Predicate<FlintMetadataLogEntry> initialCondition) {
return this;
}

@Override
public OptimisticTransaction<T> transientLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
return this;
}

@Override
public OptimisticTransaction<T> finalLog(Function<FlintMetadataLogEntry, FlintMetadataLogEntry> action) {
return this;
}

@Override
public T execute(Supplier<T> operation) {
return operation.get();
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.flint.core.auth.AWSRequestSigningApacheInterceptor;
import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NoOptimisticTransaction;
import org.opensearch.index.query.AbstractQueryBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
Expand Down Expand Up @@ -70,7 +71,16 @@ public FlintOpenSearchClient(FlintOptions options) {
}

@Override public <T> OptimisticTransaction<T> startTransaction(String indexName) {
return new OpenSearchOptimisticTransaction<>(indexName, this);
String metaLogIndexName = ".query_request_history_mys3";
try (RestHighLevelClient client = createClient()) {
if (client.indices().exists(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
return new OpenSearchOptimisticTransaction<>(this, indexName, metaLogIndexName);
} else {
return new NoOptimisticTransaction<>();
}
} catch (IOException e) {
throw new IllegalStateException("Failed to check if index metadata log index exists " + metaLogIndexName, e);
}
}

@Override public void createIndex(String indexName, FlintMetadata metadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.flint.core.storage;

import java.io.IOException;
import java.util.Base64;
import java.util.Objects;
import java.util.function.Function;
Expand Down Expand Up @@ -39,7 +40,7 @@ public class OpenSearchOptimisticTransaction<T> implements OptimisticTransaction
/**
* Reuse query request index as Flint metadata log store
*/
private final String metadataLogIndexName = ".query_request_history_mys3"; // TODO: get suffix ds name from Spark conf
private final String metadataLogIndexName;

/**
* Doc id for latest log entry (Naming rule is static so no need to query Flint index metadata)
Expand All @@ -50,9 +51,10 @@ public class OpenSearchOptimisticTransaction<T> implements OptimisticTransaction
private Function<FlintMetadataLogEntry, FlintMetadataLogEntry> transientAction = null;
private Function<FlintMetadataLogEntry, FlintMetadataLogEntry> finalAction = null;

public OpenSearchOptimisticTransaction(String flintIndexName, FlintClient flintClient) {
this.latestId = Base64.getEncoder().encodeToString(flintIndexName.getBytes());
public OpenSearchOptimisticTransaction(FlintClient flintClient, String flintIndexName, String metadataLogIndexName) {
this.flintClient = flintClient;
this.latestId = Base64.getEncoder().encodeToString(flintIndexName.getBytes());
this.metadataLogIndexName = metadataLogIndexName;
}

@Override
Expand All @@ -79,16 +81,22 @@ public T execute(Supplier<T> operation) {
Objects.requireNonNull(transientAction);
Objects.requireNonNull(finalAction);

FlintMetadataLogEntry latest = getLatestLogEntry();
if (initialCondition.test(latest)) {
// TODO: log entry can be same?
createOrUpdateDoc(transientAction.apply(latest));
T result = operation.get();
// TODO: don't get latest, use previous entry again. (set seqno in update)
createOrUpdateDoc(finalAction.apply(getLatestLogEntry()));
return result;
} else {
throw new IllegalStateException();
try {
FlintMetadataLogEntry latest = getLatestLogEntry();
if (initialCondition.test(latest)) {
// TODO: log entry can be same?
createOrUpdateDoc(transientAction.apply(latest));

T result = operation.get();

// TODO: don't get latest, use previous entry again. (set seqno in update)
createOrUpdateDoc(finalAction.apply(getLatestLogEntry()));
return result;
} else {
throw new IllegalStateException();
}
} catch (IOException e) {
throw new RuntimeException();
}
}

Expand All @@ -101,15 +109,15 @@ private FlintMetadataLogEntry getLatestLogEntry() {
response.getSeqNo(),
response.getPrimaryTerm(),
response.getSourceAsMap());
} catch (Exception e) {
} catch (Exception e) { // TODO: resource not found exception?
return new FlintMetadataLogEntry("", -1, -1, "empty", "mys3", "");
}
}

private void createOrUpdateDoc(FlintMetadataLogEntry logEntry) {
private void createOrUpdateDoc(FlintMetadataLogEntry logEntry) throws IOException {
try (RestHighLevelClient client = flintClient.createClient()) {
DocWriteResponse response;
if (logEntry.id().isEmpty()) {
if (logEntry.id().isEmpty()) { // TODO: Only create before initialLog for the first time
logEntry.id_$eq(latestId);
response = client.index(
new IndexRequest()
Expand All @@ -131,8 +139,6 @@ private void createOrUpdateDoc(FlintMetadataLogEntry logEntry) {
// Update seqNo and primaryTerm in log entry object
logEntry.seqNo_$eq(response.getSeqNo()); // TODO: convert log entry to Java class?
logEntry.primaryTerm_$eq(response.getPrimaryTerm());
} catch (Exception e) {
throw new RuntimeException(e); // TODO: handle expression properly
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.opensearch.client.opensearch.OpenSearchClient
import org.opensearch.client.transport.rest_client.RestClientTransport
import org.opensearch.flint.OpenSearchSuite
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NoOptimisticTransaction
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
Expand All @@ -30,6 +31,11 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M

behavior of "Flint OpenSearch client"

it should "start no optimistic transaction if metadata log index doesn't exists" in {
val transaction = flintClient.startTransaction("test")
transaction shouldBe a[NoOptimisticTransaction[AnyRef]]
}

it should "create index successfully" in {
val indexName = "test"
val content =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConv

import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.get.GetRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.update.UpdateRequest
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.CreateIndexRequest
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.FlintSparkSuite
import org.scalatest.matchers.should.Matchers
Expand Down Expand Up @@ -43,21 +47,41 @@ class FlintOpenSearchTransactionITSuite extends FlintSparkSuite with Matchers {
super.afterEach()
}

test("normal transition from initial to transient to final log") {
test("should transit from initial to final if initial is empty") {
flintClient
.startTransaction(testFlintIndex)
.initialLog(latest => {
latest.state shouldBe "empty"
true
})
.transientLog(latest => latest.copy(state = "creating"))
.finalLog(latest => latest.copy(state = "created"))
.finalLog(latest => latest.copy(state = "active"))
.execute(() => {
latestLogEntry should contain("state" -> "creating")
null
})

latestLogEntry should contain("state" -> "created")
latestLogEntry should contain("state" -> "active")
}

test("should transit from initial to final if initial is not empty but meet precondition") {
// Create doc first to simulate this scenario
createLatestLogEntry(FlintMetadataLogEntry(id = testLatestId, state = "active"))

flintClient
.startTransaction(testFlintIndex)
.initialLog(latest => {
latest.state shouldBe "active"
true
})
.transientLog(latest => latest.copy(state = "refreshing"))
.finalLog(latest => latest.copy(state = "active"))
.execute(() => {
latestLogEntry should contain("state" -> "refreshing")
null
})

latestLogEntry should contain("state" -> "active")
}

test("should exit if initial log entry doesn't meet precondition") {
Expand All @@ -71,10 +95,42 @@ class FlintOpenSearchTransactionITSuite extends FlintSparkSuite with Matchers {
}
}

ignore("should exit if initial log entry updated by others when adding transient log entry") {
the[IllegalStateException] thrownBy {
flintClient
.startTransaction(testFlintIndex)
.initialLog(_ => true)
.transientLog(latest => {
// This update will happen first and thus cause version conflict
updateLatestLogEntry(latest, "deleting")

latest.copy(state = "creating")
})
.finalLog(latest => latest)
.execute(() => {})
}
}

private def latestLogEntry: Map[String, AnyRef] = {
val response = openSearchClient
.get(new GetRequest(testMetadataLogIndex, testLatestId), RequestOptions.DEFAULT)

Option(response.getSourceAsMap).getOrElse(Collections.emptyMap()).asScala.toMap
}

private def createLatestLogEntry(latest: FlintMetadataLogEntry): Unit = {
openSearchClient.index(
new IndexRequest()
.index(testMetadataLogIndex)
.id(testLatestId)
.source(latest.toJson, XContentType.JSON),
RequestOptions.DEFAULT)
}

private def updateLatestLogEntry(latest: FlintMetadataLogEntry, newState: String): Unit = {
openSearchClient.update(
new UpdateRequest(testMetadataLogIndex, testLatestId)
.doc(latest.copy(state = newState).toJson, XContentType.JSON),
RequestOptions.DEFAULT)
}
}

0 comments on commit 51689b1

Please sign in to comment.