Skip to content

Commit

Permalink
Merge "[FAB-5568] Add filtered block event"
Browse files Browse the repository at this point in the history
  • Loading branch information
yacovm authored and Gerrit Code Review committed Oct 2, 2017
2 parents 87c5305 + e2f285c commit 04f62ad
Show file tree
Hide file tree
Showing 10 changed files with 279 additions and 90 deletions.
14 changes: 11 additions & 3 deletions core/committer/committer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,17 @@ func (lc *LedgerCommitter) GetPvtDataAndBlockByNum(seqNum uint64) (*ledger.Block

// postCommit publish event or handle other tasks once block committed to the ledger
func (lc *LedgerCommitter) postCommit(block *common.Block) {
// send block event *after* the block has been committed
if err := producer.SendProducerBlockEvent(block); err != nil {
logger.Errorf("Error publishing block %d, because: %v", block.Header.Number, err)
// create/send block events *after* the block has been committed
bevent, fbevent, channelID, err := producer.CreateBlockEvents(block)
if err != nil {
logger.Errorf("Channel [%s] Error processing block events for block number [%d]: %s", channelID, block.Header.Number, err)
} else {
if err := producer.Send(bevent); err != nil {
logger.Errorf("Channel [%s] Error sending block event for block number [%d]: %s", channelID, block.Header.Number, err)
}
if err := producer.Send(fbevent); err != nil {
logger.Errorf("Channel [%s] Error sending filtered block event for block number [%d]: %s", channelID, block.Header.Number, err)
}
}
}

Expand Down
9 changes: 7 additions & 2 deletions core/scc/cscc/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,13 @@ func joinChain(chainID string, block *common.Block) pb.Response {

peer.InitChain(chainID)

if err := producer.SendProducerBlockEvent(block); err != nil {
cnflogger.Errorf("Error sending block event %s", err)
bevent, _, _, err := producer.CreateBlockEvents(block)
if err != nil {
cnflogger.Errorf("Error processing block events for block number [%d]: %s", block.Header.Number, err)
} else {
if err := producer.Send(bevent); err != nil {
cnflogger.Errorf("Channel [%s] Error sending block event for block number [%d]: %s", chainID, block.Header.Number, err)
}
}

return shim.Success(nil)
Expand Down
76 changes: 50 additions & 26 deletions events/producer/eventhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,57 +19,75 @@ package producer
import (
"fmt"

"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
)

// SendProducerBlockEvent sends block event to clients
func SendProducerBlockEvent(block *common.Block) error {
// CreateBlockEvents creates block events for a block. It removes the RW set
// and creates a block event and a filtered block event. Sending the events
// is the responsibility of the code that calls this function.
func CreateBlockEvents(block *common.Block) (bevent *pb.Event, fbevent *pb.Event, channelID string, err error) {
logger.Debugf("Entry")
defer logger.Debugf("Exit")
bevent := &common.Block{}
bevent.Header = block.Header
bevent.Metadata = block.Metadata
bevent.Data = &common.BlockData{}
var channelId string
for _, d := range block.Data.Data {

blockForEvent := &common.Block{}
filteredBlockForEvent := &pb.FilteredBlock{}
filteredTxArray := []*pb.FilteredTransaction{}
blockForEvent.Header = block.Header
blockForEvent.Metadata = block.Metadata
blockForEvent.Data = &common.BlockData{}
txsFltr := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])

for txIndex, d := range block.Data.Data {
ebytes := d
if ebytes != nil {
if env, err := utils.GetEnvelopeFromBlock(ebytes); err != nil {
logger.Errorf("error getting tx from block(%s)\n", err)
logger.Errorf("error getting tx from block: %s", err)
} else if env != nil {
// get the payload from the envelope
payload, err := utils.GetPayload(env)
if err != nil {
return fmt.Errorf("could not extract payload from envelope, err %s", err)
return nil, nil, "", fmt.Errorf("could not extract payload from envelope: %s", err)
}

chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return err
return nil, nil, "", err
}
channelId = chdr.ChannelId
channelID = chdr.ChannelId

if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION {
logger.Debugf("Channel [%s]: Block event for block number [%d] contains transaction id: %s", channelId, block.Header.Number, chdr.TxId)
logger.Debugf("Channel [%s]: Block event for block number [%d] contains transaction id: %s", channelID, block.Header.Number, chdr.TxId)
tx, err := utils.GetTransaction(payload.Data)
if err != nil {
return fmt.Errorf("error unmarshalling transaction payload for block event: %s", err)
return nil, nil, "", fmt.Errorf("error unmarshalling transaction payload for block event: %s", err)
}
chaincodeActionPayload, err := utils.GetChaincodeActionPayload(tx.Actions[0].Payload)
if err != nil {
return fmt.Errorf("error unmarshalling transaction action payload for block event: %s", err)
return nil, nil, "", fmt.Errorf("error unmarshalling transaction action payload for block event: %s", err)
}
propRespPayload, err := utils.GetProposalResponsePayload(chaincodeActionPayload.Action.ProposalResponsePayload)
if err != nil {
return fmt.Errorf("error unmarshalling proposal response payload for block event: %s", err)
return nil, nil, "", fmt.Errorf("error unmarshalling proposal response payload for block event: %s", err)
}
//ENDORSER_ACTION, ProposalResponsePayload.Extension field contains ChaincodeAction
caPayload, err := utils.GetChaincodeAction(propRespPayload.Extension)
if err != nil {
return fmt.Errorf("error unmarshalling chaincode action for block event: %s", err)
return nil, nil, "", fmt.Errorf("error unmarshalling chaincode action for block event: %s", err)
}

ccEvent, err := utils.GetChaincodeEvents(caPayload.Events)
filteredTx := &pb.FilteredTransaction{Txid: chdr.TxId, TxValidationCode: txsFltr.Flag(txIndex)}

if ccEvent != nil {
filteredCcEvent := ccEvent
// nil out ccevent payload
filteredCcEvent.Payload = nil
filteredTx.CcEvent = filteredCcEvent
}
filteredTxArray = append(filteredTxArray, filteredTx)
// Drop read write set from transaction before sending block event
// Performance issue with chaincode deploy txs and causes nodejs grpc
// to hit max message size bug
Expand All @@ -78,40 +96,46 @@ func SendProducerBlockEvent(block *common.Block) error {
caPayload.Results = nil
chaincodeActionPayload.Action.ProposalResponsePayload, err = utils.GetBytesProposalResponsePayload(propRespPayload.ProposalHash, caPayload.Response, caPayload.Results, caPayload.Events, caPayload.ChaincodeId)
if err != nil {
return fmt.Errorf("error marshalling tx proposal payload for block event: %s", err)
return nil, nil, "", fmt.Errorf("error marshalling tx proposal payload for block event: %s", err)
}
tx.Actions[0].Payload, err = utils.GetBytesChaincodeActionPayload(chaincodeActionPayload)
if err != nil {
return fmt.Errorf("error marshalling tx action payload for block event: %s", err)
return nil, nil, "", fmt.Errorf("error marshalling tx action payload for block event: %s", err)
}
payload.Data, err = utils.GetBytesTransaction(tx)
if err != nil {
return fmt.Errorf("error marshalling payload for block event: %s", err)
return nil, nil, "", fmt.Errorf("error marshalling payload for block event: %s", err)
}
env.Payload, err = utils.GetBytesPayload(payload)
if err != nil {
return fmt.Errorf("error marshalling tx envelope for block event: %s", err)
return nil, nil, "", fmt.Errorf("error marshalling tx envelope for block event: %s", err)
}
ebytes, err = utils.GetBytesEnvelope(env)
if err != nil {
return fmt.Errorf("cannot marshal transaction %s", err)
return nil, nil, "", fmt.Errorf("cannot marshal transaction %s", err)
}
}
}
}
bevent.Data.Data = append(bevent.Data.Data, ebytes)
blockForEvent.Data.Data = append(blockForEvent.Data.Data, ebytes)
}
filteredBlockForEvent.ChannelId = channelID
filteredBlockForEvent.Number = block.Header.Number
filteredBlockForEvent.FilteredTx = filteredTxArray

logger.Infof("Channel [%s]: Sending event for block number [%d]", channelId, block.Header.Number)

return Send(CreateBlockEvent(bevent))
return CreateBlockEvent(blockForEvent), CreateFilteredBlockEvent(filteredBlockForEvent), channelID, nil
}

//CreateBlockEvent creates a Event from a Block
func CreateBlockEvent(te *common.Block) *pb.Event {
return &pb.Event{Event: &pb.Event_Block{Block: te}}
}

//CreateFilteredBlockEvent creates a Event from a FilteredBlock
func CreateFilteredBlockEvent(te *pb.FilteredBlock) *pb.Event {
return &pb.Event{Event: &pb.Event_FilteredBlock{FilteredBlock: te}}
}

//CreateChaincodeEvent creates a Event from a ChaincodeEvent
func CreateChaincodeEvent(te *pb.ChaincodeEvent) *pb.Event {
return &pb.Event{Event: &pb.Event_ChaincodeEvent{ChaincodeEvent: te}}
Expand Down
2 changes: 2 additions & 0 deletions events/producer/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ func AddEventType(eventType pb.EventType) error {
switch eventType {
case pb.EventType_BLOCK:
gEventProcessor.eventConsumers[eventType] = &genericHandlerList{handlers: make(map[*handler]bool)}
case pb.EventType_FILTEREDBLOCK:
gEventProcessor.eventConsumers[eventType] = &genericHandlerList{handlers: make(map[*handler]bool)}
case pb.EventType_CHAINCODE:
gEventProcessor.eventConsumers[eventType] = &chaincodeHandlerList{handlers: make(map[string]map[string]map[*handler]bool)}
case pb.EventType_REJECTION:
Expand Down
2 changes: 2 additions & 0 deletions events/producer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func getInterestKey(interest pb.Interest) string {
switch interest.EventType {
case pb.EventType_BLOCK:
key = "/" + strconv.Itoa(int(pb.EventType_BLOCK))
case pb.EventType_FILTEREDBLOCK:
key = "/" + strconv.Itoa(int(pb.EventType_FILTEREDBLOCK))
case pb.EventType_REJECTION:
key = "/" + strconv.Itoa(int(pb.EventType_REJECTION))
case pb.EventType_CHAINCODE:
Expand Down
21 changes: 16 additions & 5 deletions events/producer/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ var ehServer *EventsServer
func (a *Adapter) GetInterestedEvents() ([]*ehpb.Interest, error) {
return []*ehpb.Interest{
&ehpb.Interest{EventType: ehpb.EventType_BLOCK},
&ehpb.Interest{EventType: ehpb.EventType_FILTEREDBLOCK},
&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event1"}}},
&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event2"}}},
&ehpb.Interest{EventType: ehpb.EventType_REGISTER, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event3"}}},
Expand All @@ -81,7 +82,7 @@ func (a *Adapter) updateCountNotify() {

func (a *Adapter) Recv(msg *ehpb.Event) (bool, error) {
switch x := msg.Event.(type) {
case *ehpb.Event_Block, *ehpb.Event_ChaincodeEvent, *ehpb.Event_Register, *ehpb.Event_Unregister:
case *ehpb.Event_Block, *ehpb.Event_ChaincodeEvent, *ehpb.Event_Register, *ehpb.Event_Unregister, *ehpb.Event_FilteredBlock:
a.updateCountNotify()
case nil:
// The field is not set.
Expand Down Expand Up @@ -213,9 +214,19 @@ func TestReceiveAnyMessage(t *testing.T) {

adapter.count = 1
block := testutil.ConstructTestBlock(t, 1, 10, 100)
if err = SendProducerBlockEvent(block); err != nil {

bevent, fbevent, _, err := CreateBlockEvents(block)
if err != nil {
t.Fail()
t.Logf("Error sending message %s", err)
t.Logf("Error processing block for events %s", err)
}
if err = Send(bevent); err != nil {
t.Fail()
t.Logf("Error sending block event: %s", err)
}
if err = Send(fbevent); err != nil {
t.Fail()
t.Logf("Error sending filtered block event: %s", err)
}

emsg := createTestChaincodeEvent("0xffffffff", "event2")
Expand All @@ -224,8 +235,8 @@ func TestReceiveAnyMessage(t *testing.T) {
t.Logf("Error sending message %s", err)
}

//receive 2 messages - a block and a chaincode event
for i := 0; i < 2; i++ {
//receive 3 messages - a block, a filtered block, and a chaincode event
for i := 0; i < 3; i++ {
select {
case <-adapter.notfy:
case <-time.After(5 * time.Second):
Expand Down
3 changes: 3 additions & 0 deletions events/producer/register_internal_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func getMessageType(e *pb.Event) pb.EventType {
return pb.EventType_CHAINCODE
case *pb.Event_Rejection:
return pb.EventType_REJECTION
case *pb.Event_FilteredBlock:
return pb.EventType_FILTEREDBLOCK
default:
return -1
}
Expand All @@ -53,4 +55,5 @@ func addInternalEventTypes() {
AddEventType(pb.EventType_CHAINCODE)
AddEventType(pb.EventType_REJECTION)
AddEventType(pb.EventType_REGISTER)
AddEventType(pb.EventType_FILTEREDBLOCK)
}
2 changes: 2 additions & 0 deletions protos/peer/admin.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 04f62ad

Please sign in to comment.