Skip to content

Commit

Permalink
FAB-11177 Multithreaded test.
Browse files Browse the repository at this point in the history
Change-Id: I6ded1d27df0a5c4e0690dceb128bea2de4b49ed8
Signed-off-by: rickr <cr22rc@gmail.com>
  • Loading branch information
cr22rc committed Jul 16, 2018
1 parent c5c31b8 commit 4cf7f82
Show file tree
Hide file tree
Showing 8 changed files with 1,631 additions and 20 deletions.
15 changes: 12 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@
<tag>fabric-sdk-java-1.0</tag>
</scm>
<properties>
<grpc.version>1.13.1</grpc.version><!-- CURRENT_GRPC_VERSION -->
<grpc.version>1.13.2</grpc.version><!-- CURRENT_GRPC_VERSION -->
<protobuf.version>3.6.0</protobuf.version>
<bouncycastle.version>1.60</bouncycastle.version>
<httpclient.version>4.5.6</httpclient.version>
<skipITs>true</skipITs>
<alpn-boot-version>8.1.7.v20160121</alpn-boot-version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jacoco.version>0.7.9</jacoco.version>
<log4j.version>1.2.17</log4j.version>
<org.hyperledger.fabric.sdktest.ITSuite>IntegrationSuite.java</org.hyperledger.fabric.sdktest.ITSuite>
<gpg.executable>gpg2</gpg.executable>
</properties>
Expand Down Expand Up @@ -97,7 +98,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<version>4.1.25.Final</version>
<version>4.1.26.Final</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
Expand Down Expand Up @@ -142,9 +143,17 @@
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<version>${log4j.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/log4j/apache-log4j-extras -->
<dependency>
<groupId>log4j</groupId>
<artifactId>apache-log4j-extras</artifactId>
<version>${log4j.version}</version>
</dependency>


<!--<dependency>-->
<!--<groupId>org.apache.logging.log4j</groupId>-->
<!--<artifactId>log4j-core</artifactId>-->
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/hyperledger/fabric/sdk/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -3370,6 +3370,8 @@ private ProposalResponse sendProposalSerially(TransactionRequest proposalRequest

for (Peer peer : peers) {

proposalRequest.submitted = false;

try {

Collection<ProposalResponse> proposalResponses = sendProposal(proposalRequest, Collections.singletonList(peer));
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/org/hyperledger/fabric/sdk/Orderer.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class Orderer implements Serializable {
private Channel channel;
private transient volatile OrdererClient ordererClient = null;
private transient byte[] clientTLSCertificateDigest;
private String channelName = "";

Orderer(String name, String url, Properties properties) throws InvalidArgumentException {

Expand Down Expand Up @@ -104,6 +105,7 @@ public String getUrl() {
void unsetChannel() {

channel = null;
channelName = "";

}

Expand All @@ -127,6 +129,7 @@ void setChannel(Channel channel) throws InvalidArgumentException {
}

this.channel = channel;
this.channelName = channel.getName();

}

Expand Down Expand Up @@ -192,6 +195,7 @@ synchronized void shutdown(boolean force) {
}
shutdown = true;
channel = null;
channelName = "";

if (ordererClient != null) {
OrdererClient torderClientDeliver = ordererClient;
Expand Down Expand Up @@ -219,6 +223,6 @@ protected void finalize() throws Throwable {

@Override
public String toString() {
return "Orderer: " + name + "(" + url + ")";
return "Orderer-" + channelName + "-" + name + "(" + url + ")";
}
} // end Orderer
101 changes: 86 additions & 15 deletions src/main/java/org/hyperledger/fabric/sdk/OrdererClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class OrdererClient {
private final ManagedChannelBuilder channelBuilder;
private boolean shutdown = false;
private static final Log logger = LogFactory.getLog(OrdererClient.class);
private static final boolean TRACELEVEL = logger.isTraceEnabled();
private ManagedChannel managedChannel = null;
private final String name;
private final String url;
Expand Down Expand Up @@ -76,7 +77,7 @@ class OrdererClient {
try {
tempOrdererWaitTimeMilliSecs = Long.parseLong(ordererWaitTimeMilliSecsString);
} catch (NumberFormatException e) {
logger.warn(format("Orderer %s wait time %s not parsable.", name, ordererWaitTimeMilliSecsString), e);
logger.warn(format("Orderer %s wait time %s not parsable.", this.toString(), ordererWaitTimeMilliSecsString), e);
}

ordererWaitTimeMilliSecs = tempOrdererWaitTimeMilliSecs;
Expand All @@ -90,6 +91,7 @@ synchronized void shutdown(boolean force) {
return;
}
shutdown = true;
logger.debug(format("Shutdown %s", this.toString()));
ManagedChannel lchannel = managedChannel;
managedChannel = null;
if (lchannel == null) {
Expand Down Expand Up @@ -117,21 +119,38 @@ public void finalize() {
}

Ab.BroadcastResponse sendTransaction(Common.Envelope envelope) throws Exception {
logger.trace(this.toString() + " OrdererClient.sendTransaction entered.");
StreamObserver<Common.Envelope> nso = null;

if (shutdown) {
throw new TransactionException("Orderer client is shutdown");
}

ManagedChannel lmanagedChannel = managedChannel;
if (TRACELEVEL && lmanagedChannel != null) {
logger.trace(format("%s managed channel isTerminated: %b, isShutdown: %b, state: %s", this.toString(),
lmanagedChannel.isTerminated(), lmanagedChannel.isShutdown(), lmanagedChannel.getState(false).name()));
}

if (lmanagedChannel == null || lmanagedChannel.isTerminated() || lmanagedChannel.isShutdown()) {

if (lmanagedChannel != null && lmanagedChannel.isTerminated()) {
logger.warn(format("%s managed channel was marked terminated", this.toString()));
}
if (lmanagedChannel != null && lmanagedChannel.isShutdown()) {
logger.warn(format("%s managed channel was marked shutdown.", this.toString()));
}

lmanagedChannel = channelBuilder.build();
managedChannel = lmanagedChannel;

}

if (TRACELEVEL && lmanagedChannel != null) {
logger.trace(format("%s managed channel isTerminated: %b, isShutdown: %b, state: %s", this.toString(),
lmanagedChannel.isTerminated(), lmanagedChannel.isShutdown(), lmanagedChannel.getState(false).name()));
}

try {
final CountDownLatch finishLatch = new CountDownLatch(1);
AtomicBroadcastGrpc.AtomicBroadcastStub broadcast = AtomicBroadcastGrpc.newStub(lmanagedChannel);
Expand All @@ -143,11 +162,11 @@ Ab.BroadcastResponse sendTransaction(Common.Envelope envelope) throws Exception
@Override
public void onNext(Ab.BroadcastResponse resp) {
// logger.info("Got Broadcast response: " + resp);
logger.debug("resp status value: " + resp.getStatusValue() + ", resp: " + resp.getStatus());
logger.debug(this.toString() + " resp status value: " + resp.getStatusValue() + ", resp: " + resp.getStatus());
if (resp.getStatus() == Common.Status.SUCCESS) {
ret[0] = resp;
} else {
throwable[0] = new TransactionException(format("Channel %s orderer %s status returned failure code %d (%s) during order registration",
throwable[0] = new TransactionException(format("Channel %s orderer %s status returned failure code %d (%s) during orderer next",
channelName, name, resp.getStatusValue(), resp.getStatus().name()));
}
finishLatch.countDown();
Expand All @@ -157,6 +176,17 @@ public void onNext(Ab.BroadcastResponse resp) {
@Override
public void onError(Throwable t) {
if (!shutdown) {
ManagedChannel lmanagedChannel = managedChannel;
managedChannel = null;
if (lmanagedChannel == null) {
logger.error(this.toString() + " managed channel was null.");

} else {

logger.error(format("%s managed channel isTerminated: %b, isShutdown: %b, state: %s", this.toString(),
lmanagedChannel.isTerminated(), lmanagedChannel.isShutdown(), lmanagedChannel.getState(false).name()));

}
logger.error(format("Received error on channel %s, orderer %s, url %s, %s",
channelName, name, url, t.getMessage()), t);
}
Expand All @@ -166,6 +196,7 @@ public void onError(Throwable t) {

@Override
public void onCompleted() {
logger.trace(this.toString() + " onComplete received.");
finishLatch.countDown();
}
};
Expand All @@ -177,7 +208,7 @@ public void onCompleted() {
try {
if (!finishLatch.await(ordererWaitTimeMilliSecs, TimeUnit.MILLISECONDS)) {
TransactionException ste = new TransactionException(format("Channel %s, send transactions failed on orderer %s. Reason: timeout after %d ms.",
channelName, name, ordererWaitTimeMilliSecs));
channelName, this.toString(), ordererWaitTimeMilliSecs));
logger.error("sendTransaction error " + ste.getMessage(), ste);
throw ste;
}
Expand All @@ -190,14 +221,14 @@ public void onCompleted() {
}
//get full stack trace
TransactionException ste = new TransactionException(format("Channel %s, send transaction failed on orderer %s. Reason: %s",
channelName, name, throwable[0].getMessage()), throwable[0]);
logger.error("sendTransaction error " + ste.getMessage(), ste);
channelName, this.toString(), throwable[0].getMessage()), throwable[0]);
logger.error(this.toString() + "sendTransaction error " + ste.getMessage(), ste);
throw ste;
}
logger.debug("Done waiting for reply! Got:" + ret[0]);
logger.debug(this.toString() + "Done waiting for reply! Got:" + ret[0]);

} catch (InterruptedException e) {
logger.error(e);
logger.error(this.toString(), e);

}

Expand All @@ -223,21 +254,41 @@ public void onCompleted() {

DeliverResponse[] sendDeliver(Common.Envelope envelope) throws TransactionException {

logger.trace(this.toString() + " OrdererClient.sendDeliver entered.");

if (shutdown) {
throw new TransactionException("Orderer client is shutdown");
}

StreamObserver<Common.Envelope> nso = null;

ManagedChannel lmanagedChannel = managedChannel;
if (TRACELEVEL && lmanagedChannel != null) {
logger.trace(format("%s managed channel isTerminated: %b, isShutdown: %b, state: %s", this.toString(),
lmanagedChannel.isTerminated(), lmanagedChannel.isShutdown(), lmanagedChannel.getState(false).name()));
}

if (lmanagedChannel == null || lmanagedChannel.isTerminated() || lmanagedChannel.isShutdown()) {

if (lmanagedChannel != null && lmanagedChannel.isTerminated()) {
logger.warn(format("%s managed channel was marked terminated", this.toString()));
}
if (lmanagedChannel != null && lmanagedChannel.isShutdown()) {
logger.warn(format("%s managed channel was marked shutdown.", this.toString()));
}
lmanagedChannel = channelBuilder.build();
managedChannel = lmanagedChannel;

}

if (TRACELEVEL && lmanagedChannel != null) {
logger.trace(format("%s managed channel isTerminated: %b, isShutdown: %b, state: %s", this.toString(),
lmanagedChannel.isTerminated(), lmanagedChannel.isShutdown(), lmanagedChannel.getState(false).name()));
}
/*
return lchannel != null && !lchannel.isShutdown() && !lchannel.isTerminated() && ConnectivityState.READY.equals(lchannel.getState(true));
*/

try {

AtomicBroadcastGrpc.AtomicBroadcastStub broadcast = AtomicBroadcastGrpc.newStub(lmanagedChannel);
Expand All @@ -254,9 +305,10 @@ DeliverResponse[] sendDeliver(Common.Envelope envelope) throws TransactionExcept
public void onNext(DeliverResponse resp) {

// logger.info("Got Broadcast response: " + resp);
logger.debug("resp status value: " + resp.getStatusValue() + ", resp: " + resp.getStatus() + ", type case: " + resp.getTypeCase());
logger.debug(this.toString() + "sendDeliver resp status value: " + resp.getStatusValue() + ", resp: " + resp.getStatus() + ", type case: " + resp.getTypeCase());

if (done) {
logger.trace(this.toString() + " sendDeliver done!");
return;
}

Expand All @@ -275,6 +327,18 @@ public void onNext(DeliverResponse resp) {
@Override
public void onError(Throwable t) {
if (!shutdown) {

ManagedChannel lmanagedChannel = managedChannel;
managedChannel = null;
if (lmanagedChannel == null) {
logger.error(this.toString() + " managed channel was null.");

} else {

logger.error(format("%s managed channel isTerminated: %b, isShutdown: %b, state: %s", this.toString(),
lmanagedChannel.isTerminated(), lmanagedChannel.isShutdown(), lmanagedChannel.getState(false).name()));

}
logger.error(format("Received error on channel %s, orderer %s, url %s, %s",
channelName, name, url, t.getMessage()), t);
}
Expand All @@ -284,7 +348,7 @@ public void onError(Throwable t) {

@Override
public void onCompleted() {
logger.trace("onCompleted");
logger.trace(this.toString() + "onCompleted.");
finishLatch.countDown();
}
};
Expand All @@ -296,43 +360,50 @@ public void onCompleted() {
try {
if (!finishLatch.await(ordererWaitTimeMilliSecs, TimeUnit.MILLISECONDS)) {
TransactionException ex = new TransactionException(format(
"Channel %s sendDeliver time exceeded for orderer %s, timed out at %d ms.", channelName, name, ordererWaitTimeMilliSecs));
"Channel %s sendDeliver time exceeded for orderer %s, timed out at %d ms.", channelName, this.toString(), ordererWaitTimeMilliSecs));
logger.error(ex.getMessage(), ex);
throw ex;
}
logger.trace("Done waiting for reply!");
logger.trace(this.toString() + " Done waiting for reply!");

} catch (InterruptedException e) {
logger.error(e);
logger.error(this.toString() + " " + e.getMessage(), e);
}

if (!throwableList.isEmpty()) {
Throwable throwable = throwableList.get(0);
TransactionException e = new TransactionException(format(
"Channel %s sendDeliver failed on orderer %s. Reason: %s", channelName, name, throwable.getMessage()), throwable);
"Channel %s sendDeliver failed on orderer %s. Reason: %s", channelName, this.toString(), throwable.getMessage()), throwable);
logger.error(e.getMessage(), e);
throw e;
}

return retList.toArray(new DeliverResponse[retList.size()]);
} catch (Throwable t) {
managedChannel = null;
logger.error(this.toString() + " received error " + t.getMessage(), t);
throw t;

} finally {
if (null != nso) {

try {
logger.debug(this.toString() + "completed.");
nso.onCompleted();
} catch (Exception e) { //Best effort only report on debug
logger.debug(format("Exception completing sendDeliver with channel %s, name %s, url %s %s",
channelName, name, url, e.getMessage()), e);
channelName, this.toString(), url, e.getMessage()), e);
}

}
}
}

@Override
public String toString() {
return "OrdererClient-" + channelName + "-" + name + "(" + url + ")";
}

boolean isChannelActive() {
ManagedChannel lchannel = managedChannel;
return lchannel != null && !lchannel.isShutdown() && !lchannel.isTerminated() && ConnectivityState.READY.equals(lchannel.getState(true));
Expand Down
Loading

0 comments on commit 4cf7f82

Please sign in to comment.