Skip to content

Commit

Permalink
FAB-2203 handle chaincode launch serially
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-2203

Chaincode-peer communication was made asynch to handle concurrent
invokes. The state machine requires that launch (one time operation)
requires serial communication to get into "ready" state. While it is not
incorrect to have async communication for launch also, it makes it hard
to write test cases against (we have to wait for CC to get to ready state).

A better approach would be for the system to enforce that at the end of init
the system woukd be in ready state for inits/invokes. This is achieved by
sending ready state serially to the CC.

Change-Id: Ib5d3ec1b6188d5aa933dbcb43adaf9995f75de2d
Signed-off-by: Srinivasan Muralidharan <muralisr@us.ibm.com>
  • Loading branch information
Srinivasan Muralidharan committed Feb 13, 2017
1 parent 9a2d8fc commit c50a329
Showing 1 changed file with 28 additions and 4 deletions.
32 changes: 28 additions & 4 deletions core/chaincode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ type transactionContext struct {
type nextStateInfo struct {
msg *pb.ChaincodeMessage
sendToCC bool

//the only time we need to send synchronously is
//when launching the chaincode to take it to ready
//state (look for the panic when sending serial)
sendSync bool
}

//chaincode registered name is of the form
Expand Down Expand Up @@ -261,7 +266,13 @@ func (handler *Handler) deregister() error {
}

func (handler *Handler) triggerNextState(msg *pb.ChaincodeMessage, send bool) {
handler.nextState <- &nextStateInfo{msg, send}
//this will send Async
handler.nextState <- &nextStateInfo{msg: msg, sendToCC: send, sendSync: false}
}

func (handler *Handler) triggerNextStateSync(msg *pb.ChaincodeMessage) {
//this will send sync
handler.nextState <- &nextStateInfo{msg: msg, sendToCC: true, sendSync: true}
}

func (handler *Handler) waitForKeepaliveTimer() <-chan time.Time {
Expand Down Expand Up @@ -361,8 +372,18 @@ func (handler *Handler) processStream() error {

if nsInfo != nil && nsInfo.sendToCC {
chaincodeLogger.Debugf("[%s]sending state message %s", shorttxid(in.Txid), in.Type.String())
//if error bail in select
handler.serialSendAsync(in, errc)
//ready messages are sent sync
if nsInfo.sendSync {
if in.Type.String() != pb.ChaincodeMessage_READY.String() {
panic(fmt.Sprintf("[%s]Sync send can only be for READY state %s\n", shorttxid(in.Txid), in.Type.String()))
}
if err = handler.serialSend(in); err != nil {
return fmt.Errorf("[%s]Error sending ready message, ending stream: %s", shorttxid(in.Txid), err)
}
} else {
//if error bail in select
handler.serialSendAsync(in, errc)
}
}
}
}
Expand Down Expand Up @@ -1264,7 +1285,10 @@ func (handler *Handler) ready(ctxt context.Context, chainID string, txid string,
return nil, err
}

handler.triggerNextState(ccMsg, true)
//send the ready synchronously as the
//ready message is during launch and needs
//to happen before any init/invokes can sneak in
handler.triggerNextStateSync(ccMsg)

return txctx.responseNotifier, nil
}
Expand Down

0 comments on commit c50a329

Please sign in to comment.