Skip to content

Commit

Permalink
Merge pull request #265 from danielrohe/master
Browse files Browse the repository at this point in the history
api:support nested txn
  • Loading branch information
xiang90 authored Jan 18, 2018
2 parents e3a4f38 + 03ac768 commit 82edd9f
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 3 deletions.
2 changes: 1 addition & 1 deletion etc/scripts/run_etcd_docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.
#

ETCD_VERSION="v3.2"
ETCD_VERSION="v3.3"

export SCRIPT_PATH="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
export ROOT=$SCRIPT_PATH/../../
Expand Down
13 changes: 13 additions & 0 deletions jetcd-core/src/main/java/com/coreos/jetcd/kv/TxnResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.coreos.jetcd.api.ResponseOp.ResponseCase.RESPONSE_DELETE_RANGE;
import static com.coreos.jetcd.api.ResponseOp.ResponseCase.RESPONSE_PUT;
import static com.coreos.jetcd.api.ResponseOp.ResponseCase.RESPONSE_RANGE;
import static com.coreos.jetcd.api.ResponseOp.ResponseCase.RESPONSE_TXN;

import com.coreos.jetcd.data.AbstractResponse;
import java.util.List;
Expand All @@ -34,6 +35,7 @@ public class TxnResponse extends AbstractResponse<com.coreos.jetcd.api.TxnRespon
private List<PutResponse> putResponses;
private List<GetResponse> getResponses;
private List<DeleteResponse> deleteResponses;
private List<TxnResponse> txnResponses;


public TxnResponse(com.coreos.jetcd.api.TxnResponse txnResponse) {
Expand Down Expand Up @@ -88,4 +90,15 @@ public synchronized List<PutResponse> getPutResponses() {

return putResponses;
}

public synchronized List<TxnResponse> getTxnResponses() {
if (txnResponses == null) {
txnResponses = getResponse().getResponsesList().stream()
.filter((responseOp) -> responseOp.getResponseCase() == RESPONSE_TXN)
.map(responseOp -> new TxnResponse(responseOp.getResponseTxn()))
.collect(Collectors.toList());
}

return txnResponses;
}
}
51 changes: 49 additions & 2 deletions jetcd-core/src/main/java/com/coreos/jetcd/op/Op.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import com.coreos.jetcd.api.PutRequest;
import com.coreos.jetcd.api.RangeRequest;
import com.coreos.jetcd.api.RequestOp;
import com.coreos.jetcd.api.TxnRequest;
import com.coreos.jetcd.data.ByteSequence;
import com.coreos.jetcd.options.DeleteOption;
import com.coreos.jetcd.options.GetOption;
import com.coreos.jetcd.options.PutOption;
import com.google.protobuf.ByteString;

import java.util.Arrays;

/**
* Etcd Operation.
*/
Expand All @@ -38,7 +41,7 @@ public abstract class Op {
* Operation type.
*/
public enum Type {
PUT, RANGE, DELETE_RANGE,
PUT, RANGE, DELETE_RANGE, TXN
}

protected final Type type;
Expand All @@ -64,6 +67,10 @@ public static DeleteOp delete(ByteSequence key, DeleteOption option) {
return new DeleteOp(ByteString.copyFrom(key.getBytes()), option);
}

public static TxnOp txn(Cmp[] cmps, Op[] thenOps, Op[] elseOps) {
return new TxnOp(cmps, thenOps, elseOps);
}

public static final class PutOp extends Op {

private final ByteString value;
Expand Down Expand Up @@ -135,4 +142,44 @@ RequestOp toRequestOp() {
return RequestOp.newBuilder().setRequestDeleteRange(delete).build();
}
}
}

public static final class TxnOp extends Op {

private Cmp[] cmps;

private Op[] thenOps;

private Op[] elseOps;

protected TxnOp(Cmp[] cmps, Op[] thenOps, Op[] elseOps) {
super(Type.TXN, null);
this.cmps = cmps;
this.thenOps = thenOps;
this.elseOps = elseOps;
}

RequestOp toRequestOp() {
TxnRequest.Builder txn = TxnRequest.newBuilder();

if (cmps != null) {
for (Cmp cmp : cmps) {
txn.addCompare(cmp.toCompare());
}
}

if (thenOps != null) {
for (Op thenOp : thenOps) {
txn.addSuccess(thenOp.toRequestOp());
}
}

if (elseOps != null) {
for (Op elseOp : elseOps) {
txn.addFailure(elseOp.toRequestOp());
}
}

return RequestOp.newBuilder().setRequestTxn(txn).build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,31 @@ public void testTxn() throws Exception {
test.assertEquals(getResp.getKvs().size(), 1);
test.assertEquals(getResp.getKvs().get(0).getValue().toStringUtf8(), putValue.toStringUtf8());
}

@Test
public void testNestedTxn() throws Exception {
ByteSequence foo = ByteSequence.fromString("txn_foo");
ByteSequence bar = ByteSequence.fromString("txn_bar");
ByteSequence barz = ByteSequence.fromString("txn_barz");
ByteSequence abc = ByteSequence.fromString("txn_abc");
ByteSequence oneTwoThree = ByteSequence.fromString("txn_123");

Txn txn = kvClient.txn();
Cmp cmp = new Cmp(foo, Cmp.Op.EQUAL, CmpTarget.version(0));
CompletableFuture<com.coreos.jetcd.kv.TxnResponse> txnResp = txn.If(cmp)
.Then(Op.put(foo, bar, PutOption.DEFAULT),
Op.txn(null,
new Op[] {Op.put(abc, oneTwoThree, PutOption.DEFAULT)},
null))
.Else(Op.put(foo, barz, PutOption.DEFAULT)).commit();
txnResp.get();

GetResponse getResp = kvClient.get(foo).get();
test.assertEquals(getResp.getKvs().size(), 1);
test.assertEquals(getResp.getKvs().get(0).getValue().toStringUtf8(), bar.toStringUtf8());

GetResponse getResp2 = kvClient.get(abc).get();
test.assertEquals(getResp2.getKvs().size(), 1);
test.assertEquals(getResp2.getKvs().get(0).getValue().toStringUtf8(), oneTwoThree.toStringUtf8());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public void setUp() {
.setResponseDeleteRange(DeleteRangeResponse.getDefaultInstance()))
.addResponses(ResponseOp.newBuilder()
.setResponseRange(RangeResponse.getDefaultInstance()))
.addResponses(ResponseOp.newBuilder().setResponseTxn(com.coreos.jetcd.api.TxnResponse.getDefaultInstance()))
.build();
txnResponse = new TxnResponse(response);
}
Expand All @@ -55,4 +56,10 @@ public void getPutResponsesTest() {
public void getGetResponsesTest() {
assertThat(txnResponse.getGetResponses().size()).isEqualTo(1);
}

@Test
public void getTxnResponsesTest() {
assertThat(txnResponse.getTxnResponses().size()).isEqualTo(1);
}

}

0 comments on commit 82edd9f

Please sign in to comment.