Skip to content

Commit

Permalink
Merge "Add Committer service API interface."
Browse files Browse the repository at this point in the history
  • Loading branch information
mastersingh24 authored and Gerrit Code Review committed Nov 10, 2016
2 parents e918b6f + 41e842f commit 3508592
Show file tree
Hide file tree
Showing 6 changed files with 340 additions and 229 deletions.
15 changes: 13 additions & 2 deletions core/committer/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ limitations under the License.

package committer

import (
"github.com/hyperledger/fabric/protos"
)

// Committer is the interface supported by committers
// The only committer is noopssinglechain committer.
// The interface is intentionally sparse with the sole
Expand All @@ -24,6 +28,13 @@ package committer
// more support (such as Gossip) this interface will
// change
type Committer interface {
//Start registers and opens communications
Start() error

// Commit block to the ledger
CommitBlock(block *protos.Block2) error

// Get recent block sequence number
LedgerHeight() (uint64, error)

// Gets blocks with sequence numbers provided in the slice
GetBlocks(blockSeqs []uint64) []*protos.Block2
}
84 changes: 84 additions & 0 deletions core/committer/committer_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package committer

import (
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/protos"
pb "github.com/hyperledger/fabric/protos"
"github.com/op/go-logging"
)

//--------!!!IMPORTANT!!-!!IMPORTANT!!-!!IMPORTANT!!---------
// This is used merely to complete the loop for the "skeleton"
// path so we can reason about and modify committer component
// more effectively using code.

var logger *logging.Logger // package-level logger

func init() {
logger = logging.MustGetLogger("committer")
}

type LedgerCommitter struct {
ledger ledger.ValidatedLedger
}

// NewLedgerCommitter is a factory function to create an instance of the committer
func NewLedgerCommitter(ledger ledger.ValidatedLedger) *LedgerCommitter {
return &LedgerCommitter{ledger}
}

// CommitBlock commits block to into the ledger
func (lc *LedgerCommitter) CommitBlock(block *protos.Block2) error {
if _, _, err := lc.ledger.RemoveInvalidTransactionsAndPrepare(block); err != nil {
return err
}
if err := lc.ledger.Commit(); err != nil {
return err
}
return nil
}

// LedgerHeight returns recently committed block sequence number
func (lc *LedgerCommitter) LedgerHeight() (uint64, error) {
var info *pb.BlockchainInfo
var err error
if info, err = lc.ledger.GetBlockchainInfo(); err != nil {
logger.Errorf("Cannot get blockchain info, %s\n", info)
return uint64(0), err
}

return info.Height, nil
}

// GetBlocks used to retrieve blocks with sequence numbers provided in the slice
func (lc *LedgerCommitter) GetBlocks(blockSeqs []uint64) []*protos.Block2 {
blocks := make([]*protos.Block2, 0)

for _, seqNum := range blockSeqs {
var block *protos.Block2
var err error
if block, err = lc.ledger.GetBlockByNumber(seqNum); err != nil {
logger.Errorf("Could not able to acquire block num %d, from the ledger skipping...\n", seqNum)
continue
}
blocks = append(blocks, block)
}

return blocks
}
73 changes: 73 additions & 0 deletions core/committer/committer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package committer

import (
"os"
"testing"

"github.com/hyperledger/fabric/core/ledger/kvledger"
"github.com/hyperledger/fabric/core/ledger/testutil"
"github.com/hyperledger/fabric/protos"
"github.com/stretchr/testify/assert"
)

func TestKVLedgerBlockStorage(t *testing.T) {
conf := kvledger.NewConf("/tmp/tests/ledger/", 0)
defer os.RemoveAll("/tmp/tests/ledger/")

ledger, _ := kvledger.NewKVLedger(conf)
defer ledger.Close()

committer := NewLedgerCommitter(ledger)

var err error

height, err := committer.LedgerHeight()
assert.Equal(t, uint64(0), height)
assert.NoError(t, err)

bcInfo, _ := ledger.GetBlockchainInfo()
testutil.AssertEquals(t, bcInfo, &protos.BlockchainInfo{
Height: 0, CurrentBlockHash: nil, PreviousBlockHash: nil})

simulator, _ := ledger.NewTxSimulator()
simulator.SetState("ns1", "key1", []byte("value1"))
simulator.SetState("ns1", "key2", []byte("value2"))
simulator.SetState("ns1", "key3", []byte("value3"))
simulator.Done()

simRes, _ := simulator.GetTxSimulationResults()
block1 := testutil.ConstructBlockForSimulationResults(t, [][]byte{simRes})

err = committer.CommitBlock(block1)
assert.NoError(t, err)

height, err = committer.LedgerHeight()
assert.Equal(t, uint64(1), height)
assert.NoError(t, err)

blocks := committer.GetBlocks([]uint64{1})
assert.Equal(t, 1, len(blocks))
assert.NoError(t, err)

bcInfo, _ = ledger.GetBlockchainInfo()
serBlock1, _ := protos.ConstructSerBlock2(block1)
block1Hash := serBlock1.ComputeHash()
testutil.AssertEquals(t, bcInfo, &protos.BlockchainInfo{
Height: 1, CurrentBlockHash: block1Hash, PreviousBlockHash: []byte{}})
}
168 changes: 168 additions & 0 deletions core/committer/noopssinglechain/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package noopssinglechain

import (
"fmt"
"time"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/chaincode"
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/core/ledger/kvledger"
"github.com/hyperledger/fabric/protos"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
putils "github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"
"github.com/spf13/viper"
"golang.org/x/net/context"
"google.golang.org/grpc"
)

var logger *logging.Logger // package-level logger

func init() {
logger = logging.MustGetLogger("committer")
}

// DeliverService used to communicate with orderers to obtain
// new block and send the to the committer service
type DeliverService struct {
client orderer.AtomicBroadcast_DeliverClient
windowSize uint64
unAcknowledged uint64
committer *committer.LedgerCommitter
}

// NewDeliverService construction function to create and initilize
// delivery service instance
func NewDeliverService() *DeliverService {
if viper.GetBool("peer.committer.enabled") {
logger.Infof("Creating committer for single noops endorser")

var opts []grpc.DialOption
opts = append(opts, grpc.WithInsecure())
opts = append(opts, grpc.WithTimeout(3*time.Second))
opts = append(opts, grpc.WithBlock())
endpoint := viper.GetString("peer.committer.ledger.orderer")
conn, err := grpc.Dial(endpoint, opts...)
if err != nil {
logger.Errorf("Cannot dial to %s, because of %s", endpoint, err)
return nil
}
var abc orderer.AtomicBroadcast_DeliverClient
abc, err = orderer.NewAtomicBroadcastClient(conn).Deliver(context.TODO())
if err != nil {
logger.Errorf("Unable to initialize atomic broadcast, due to %s", err)
return nil
}

deliverService := &DeliverService{
// Atomic Broadcast Deliver Clienet
client: abc,
// Instance of RawLedger
committer: committer.NewLedgerCommitter(kvledger.GetLedger(string(chaincode.DefaultChain))),
windowSize: 10,
}
return deliverService
}
logger.Infof("Committer disabled")
return nil
}

// Start the delivery service to read the block via delivery
// protocol from the orderers
func (d *DeliverService) Start() error {
if err := d.seekOldest(); err != nil {
return err
}

d.readUntilClose()
return nil
}

func (d *DeliverService) seekOldest() error {
return d.client.Send(&orderer.DeliverUpdate{
Type: &orderer.DeliverUpdate_Seek{
Seek: &orderer.SeekInfo{
Start: orderer.SeekInfo_OLDEST,
WindowSize: d.windowSize,
},
},
})
}

func (d *DeliverService) readUntilClose() {
for {
msg, err := d.client.Recv()
if err != nil {
return
}

switch t := msg.Type.(type) {
case *orderer.DeliverResponse_Error:
if t.Error == common.Status_SUCCESS {
fmt.Println("ERROR! Received success in error field")
return
}
fmt.Println("Got error ", t)
case *orderer.DeliverResponse_Block:
block := &protos.Block2{}
for _, d := range t.Block.Data.Data {
if d != nil {
if tx, err := putils.GetEndorserTxFromBlock(d); err != nil {
fmt.Printf("Error getting tx from block(%s)\n", err)
} else if tx != nil {
if t, err := proto.Marshal(tx); err == nil {
block.Transactions = append(block.Transactions, t)
} else {
fmt.Printf("Cannot marshal transactoins %s\n", err)
}
} else {
fmt.Printf("Nil tx from block\n")
}
}
}
// Once block is constructed need to commit into the ledger
if err = d.committer.CommitBlock(block); err != nil {
fmt.Printf("Got error while committing(%s)\n", err)
} else {
fmt.Printf("Commit success, created a block!\n")
}

d.unAcknowledged++
if d.unAcknowledged >= d.windowSize/2 {
fmt.Println("Sending acknowledgement")
err = d.client.Send(&orderer.DeliverUpdate{
Type: &orderer.DeliverUpdate_Acknowledgement{
Acknowledgement: &orderer.Acknowledgement{
Number: t.Block.Header.Number,
},
},
})
if err != nil {
return
}
d.unAcknowledged = 0
}
default:
fmt.Println("Received unknown: ", t)
return
}
}
}
Loading

0 comments on commit 3508592

Please sign in to comment.