From 03ac7681fac95926c63f029e9ccf1663cc914fbc Mon Sep 17 00:00:00 2001 From: Daniel Rohe Date: Thu, 14 Dec 2017 21:13:24 +0100 Subject: [PATCH] ISSUE-143 nested txn --- etc/scripts/run_etcd_docker.sh | 2 +- .../java/com/coreos/jetcd/kv/TxnResponse.java | 13 +++++ .../src/main/java/com/coreos/jetcd/op/Op.java | 51 ++++++++++++++++++- .../coreos/jetcd/internal/impl/KVTest.java | 27 ++++++++++ .../jetcd/internal/impl/TxnResponseTest.java | 7 +++ 5 files changed, 97 insertions(+), 3 deletions(-) diff --git a/etc/scripts/run_etcd_docker.sh b/etc/scripts/run_etcd_docker.sh index b377fc392..c53d874ce 100755 --- a/etc/scripts/run_etcd_docker.sh +++ b/etc/scripts/run_etcd_docker.sh @@ -15,7 +15,7 @@ # limitations under the License. # -ETCD_VERSION="v3.2" +ETCD_VERSION="v3.3" export SCRIPT_PATH="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" export ROOT=$SCRIPT_PATH/../../ diff --git a/jetcd-core/src/main/java/com/coreos/jetcd/kv/TxnResponse.java b/jetcd-core/src/main/java/com/coreos/jetcd/kv/TxnResponse.java index 960231823..f1a114039 100644 --- a/jetcd-core/src/main/java/com/coreos/jetcd/kv/TxnResponse.java +++ b/jetcd-core/src/main/java/com/coreos/jetcd/kv/TxnResponse.java @@ -19,6 +19,7 @@ import static com.coreos.jetcd.api.ResponseOp.ResponseCase.RESPONSE_DELETE_RANGE; import static com.coreos.jetcd.api.ResponseOp.ResponseCase.RESPONSE_PUT; import static com.coreos.jetcd.api.ResponseOp.ResponseCase.RESPONSE_RANGE; +import static com.coreos.jetcd.api.ResponseOp.ResponseCase.RESPONSE_TXN; import com.coreos.jetcd.data.AbstractResponse; import java.util.List; @@ -34,6 +35,7 @@ public class TxnResponse extends AbstractResponse putResponses; private List getResponses; private List deleteResponses; + private List txnResponses; public TxnResponse(com.coreos.jetcd.api.TxnResponse txnResponse) { @@ -88,4 +90,15 @@ public synchronized List getPutResponses() { return putResponses; } + + public synchronized List getTxnResponses() { + if (txnResponses == null) { + txnResponses = getResponse().getResponsesList().stream() + .filter((responseOp) -> responseOp.getResponseCase() == RESPONSE_TXN) + .map(responseOp -> new TxnResponse(responseOp.getResponseTxn())) + .collect(Collectors.toList()); + } + + return txnResponses; + } } diff --git a/jetcd-core/src/main/java/com/coreos/jetcd/op/Op.java b/jetcd-core/src/main/java/com/coreos/jetcd/op/Op.java index 75a1c67b1..c5ce34109 100644 --- a/jetcd-core/src/main/java/com/coreos/jetcd/op/Op.java +++ b/jetcd-core/src/main/java/com/coreos/jetcd/op/Op.java @@ -23,12 +23,15 @@ import com.coreos.jetcd.api.PutRequest; import com.coreos.jetcd.api.RangeRequest; import com.coreos.jetcd.api.RequestOp; +import com.coreos.jetcd.api.TxnRequest; import com.coreos.jetcd.data.ByteSequence; import com.coreos.jetcd.options.DeleteOption; import com.coreos.jetcd.options.GetOption; import com.coreos.jetcd.options.PutOption; import com.google.protobuf.ByteString; +import java.util.Arrays; + /** * Etcd Operation. */ @@ -38,7 +41,7 @@ public abstract class Op { * Operation type. */ public enum Type { - PUT, RANGE, DELETE_RANGE, + PUT, RANGE, DELETE_RANGE, TXN } protected final Type type; @@ -64,6 +67,10 @@ public static DeleteOp delete(ByteSequence key, DeleteOption option) { return new DeleteOp(ByteString.copyFrom(key.getBytes()), option); } + public static TxnOp txn(Cmp[] cmps, Op[] thenOps, Op[] elseOps) { + return new TxnOp(cmps, thenOps, elseOps); + } + public static final class PutOp extends Op { private final ByteString value; @@ -135,4 +142,44 @@ RequestOp toRequestOp() { return RequestOp.newBuilder().setRequestDeleteRange(delete).build(); } } -} \ No newline at end of file + + public static final class TxnOp extends Op { + + private Cmp[] cmps; + + private Op[] thenOps; + + private Op[] elseOps; + + protected TxnOp(Cmp[] cmps, Op[] thenOps, Op[] elseOps) { + super(Type.TXN, null); + this.cmps = cmps; + this.thenOps = thenOps; + this.elseOps = elseOps; + } + + RequestOp toRequestOp() { + TxnRequest.Builder txn = TxnRequest.newBuilder(); + + if (cmps != null) { + for (Cmp cmp : cmps) { + txn.addCompare(cmp.toCompare()); + } + } + + if (thenOps != null) { + for (Op thenOp : thenOps) { + txn.addSuccess(thenOp.toRequestOp()); + } + } + + if (elseOps != null) { + for (Op elseOp : elseOps) { + txn.addFailure(elseOp.toRequestOp()); + } + } + + return RequestOp.newBuilder().setRequestTxn(txn).build(); + } + } +} diff --git a/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/KVTest.java b/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/KVTest.java index da991d53a..51e9181df 100644 --- a/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/KVTest.java +++ b/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/KVTest.java @@ -194,4 +194,31 @@ public void testTxn() throws Exception { test.assertEquals(getResp.getKvs().size(), 1); test.assertEquals(getResp.getKvs().get(0).getValue().toStringUtf8(), putValue.toStringUtf8()); } + + @Test + public void testNestedTxn() throws Exception { + ByteSequence foo = ByteSequence.fromString("txn_foo"); + ByteSequence bar = ByteSequence.fromString("txn_bar"); + ByteSequence barz = ByteSequence.fromString("txn_barz"); + ByteSequence abc = ByteSequence.fromString("txn_abc"); + ByteSequence oneTwoThree = ByteSequence.fromString("txn_123"); + + Txn txn = kvClient.txn(); + Cmp cmp = new Cmp(foo, Cmp.Op.EQUAL, CmpTarget.version(0)); + CompletableFuture txnResp = txn.If(cmp) + .Then(Op.put(foo, bar, PutOption.DEFAULT), + Op.txn(null, + new Op[] {Op.put(abc, oneTwoThree, PutOption.DEFAULT)}, + null)) + .Else(Op.put(foo, barz, PutOption.DEFAULT)).commit(); + txnResp.get(); + + GetResponse getResp = kvClient.get(foo).get(); + test.assertEquals(getResp.getKvs().size(), 1); + test.assertEquals(getResp.getKvs().get(0).getValue().toStringUtf8(), bar.toStringUtf8()); + + GetResponse getResp2 = kvClient.get(abc).get(); + test.assertEquals(getResp2.getKvs().size(), 1); + test.assertEquals(getResp2.getKvs().get(0).getValue().toStringUtf8(), oneTwoThree.toStringUtf8()); + } } diff --git a/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/TxnResponseTest.java b/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/TxnResponseTest.java index 68a243cbe..5812e9184 100644 --- a/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/TxnResponseTest.java +++ b/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/TxnResponseTest.java @@ -37,6 +37,7 @@ public void setUp() { .setResponseDeleteRange(DeleteRangeResponse.getDefaultInstance())) .addResponses(ResponseOp.newBuilder() .setResponseRange(RangeResponse.getDefaultInstance())) + .addResponses(ResponseOp.newBuilder().setResponseTxn(com.coreos.jetcd.api.TxnResponse.getDefaultInstance())) .build(); txnResponse = new TxnResponse(response); } @@ -55,4 +56,10 @@ public void getPutResponsesTest() { public void getGetResponsesTest() { assertThat(txnResponse.getGetResponses().size()).isEqualTo(1); } + + @Test + public void getTxnResponsesTest() { + assertThat(txnResponse.getTxnResponses().size()).isEqualTo(1); + } + }