From c50a329421d934b27bf15b3f713f3d2becf9d186 Mon Sep 17 00:00:00 2001 From: Srinivasan Muralidharan Date: Sun, 12 Feb 2017 21:55:16 -0500 Subject: [PATCH] FAB-2203 handle chaincode launch serially https://jira.hyperledger.org/browse/FAB-2203 Chaincode-peer communication was made asynch to handle concurrent invokes. The state machine requires that launch (one time operation) requires serial communication to get into "ready" state. While it is not incorrect to have async communication for launch also, it makes it hard to write test cases against (we have to wait for CC to get to ready state). A better approach would be for the system to enforce that at the end of init the system woukd be in ready state for inits/invokes. This is achieved by sending ready state serially to the CC. Change-Id: Ib5d3ec1b6188d5aa933dbcb43adaf9995f75de2d Signed-off-by: Srinivasan Muralidharan --- core/chaincode/handler.go | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/core/chaincode/handler.go b/core/chaincode/handler.go index e70e0f34c66..72b87de3edb 100644 --- a/core/chaincode/handler.go +++ b/core/chaincode/handler.go @@ -67,6 +67,11 @@ type transactionContext struct { type nextStateInfo struct { msg *pb.ChaincodeMessage sendToCC bool + + //the only time we need to send synchronously is + //when launching the chaincode to take it to ready + //state (look for the panic when sending serial) + sendSync bool } //chaincode registered name is of the form @@ -261,7 +266,13 @@ func (handler *Handler) deregister() error { } func (handler *Handler) triggerNextState(msg *pb.ChaincodeMessage, send bool) { - handler.nextState <- &nextStateInfo{msg, send} + //this will send Async + handler.nextState <- &nextStateInfo{msg: msg, sendToCC: send, sendSync: false} +} + +func (handler *Handler) triggerNextStateSync(msg *pb.ChaincodeMessage) { + //this will send sync + handler.nextState <- &nextStateInfo{msg: msg, sendToCC: true, sendSync: true} } func (handler *Handler) waitForKeepaliveTimer() <-chan time.Time { @@ -361,8 +372,18 @@ func (handler *Handler) processStream() error { if nsInfo != nil && nsInfo.sendToCC { chaincodeLogger.Debugf("[%s]sending state message %s", shorttxid(in.Txid), in.Type.String()) - //if error bail in select - handler.serialSendAsync(in, errc) + //ready messages are sent sync + if nsInfo.sendSync { + if in.Type.String() != pb.ChaincodeMessage_READY.String() { + panic(fmt.Sprintf("[%s]Sync send can only be for READY state %s\n", shorttxid(in.Txid), in.Type.String())) + } + if err = handler.serialSend(in); err != nil { + return fmt.Errorf("[%s]Error sending ready message, ending stream: %s", shorttxid(in.Txid), err) + } + } else { + //if error bail in select + handler.serialSendAsync(in, errc) + } } } } @@ -1264,7 +1285,10 @@ func (handler *Handler) ready(ctxt context.Context, chainID string, txid string, return nil, err } - handler.triggerNextState(ccMsg, true) + //send the ready synchronously as the + //ready message is during launch and needs + //to happen before any init/invokes can sneak in + handler.triggerNextStateSync(ccMsg) return txctx.responseNotifier, nil }