Skip to content

Commit

Permalink
grpc add sync load balancer
Browse files Browse the repository at this point in the history
  • Loading branch information
caopingcp authored and vipwzw committed Mar 24, 2022
1 parent c0c2d0b commit 339a5a6
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 3 deletions.
18 changes: 15 additions & 3 deletions rpc/grpcclient/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package grpcclient

import (
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -35,9 +36,20 @@ func NewMainChainClient(cfg *types.Chain33Config, grpcaddr string) (types.Chain3
Timeout: time.Second * 20,
PermitWithoutStream: true,
}
conn, err := grpc.Dial(NewMultipleURL(paraRemoteGrpcClient), grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(paraChainGrpcRecSize)),
grpc.WithKeepaliveParams(kp))

var conn *grpc.ClientConn
var err error
useLBSync := types.Conf(cfg, "config.consensus.sub.para").IsEnable("useGrpcLBSync")
if useLBSync {
conn, err = grpc.Dial(NewSyncURL(paraRemoteGrpcClient), grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(paraChainGrpcRecSize)),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, SyncLbName)),
grpc.WithKeepaliveParams(kp))
} else {
conn, err = grpc.Dial(NewMultipleURL(paraRemoteGrpcClient), grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(paraChainGrpcRecSize)),
grpc.WithKeepaliveParams(kp))
}
if err != nil {
return nil, err
}
Expand Down
89 changes: 89 additions & 0 deletions rpc/grpcclient/syncBalancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package grpcclient

import (
"errors"
"math/rand"
"sync"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/resolver"
)

func init() {
balancer.Register(newBuilder())
}

// SyncLbName is the name of sync balancer.
const SyncLbName = "sync"

// attributeKey is the type used as the key to store AddrInfo in the Attributes
// field of resolver.Address.
type attributeKey struct{}

// AddrInfo will be stored inside Address metadata in order to use weighted balancer.
type AddrInfo struct {
State int
}

// SetAddrInfo returns a copy of addr in which the Attributes field is updated
// with addrInfo.
func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address {
addr.Attributes = attributes.New()
addr.Attributes = addr.Attributes.WithValues(attributeKey{}, addrInfo)
return addr
}

// GetAddrInfo returns the AddrInfo stored in the Attributes fields of addr.
func GetAddrInfo(addr resolver.Address) AddrInfo {
v := addr.Attributes.Value(attributeKey{})
ai, _ := v.(AddrInfo)
return ai
}

// NewBuilder creates a new weight balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(SyncLbName, &syncPickerBuilder{}, base.Config{HealthCheck: false})
}

type syncPickerBuilder struct{}

// Build return picker
func (*syncPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
var scs []balancer.SubConn
for subConn, addr := range info.ReadySCs {
node := GetAddrInfo(addr.Address)
if node.State == SYNC {
scs = append(scs, subConn)
}
}
return &syncPicker{
subConns: scs,
}
}

type syncPicker struct {
// subConns is the snapshot of the roundrobin balancer when this picker was
// created. The slice is immutable. Each Get() will do a round robin
// selection from it and return the selected SubConn.
subConns []balancer.SubConn

mu sync.Mutex
}

// Pick return subConn
func (p *syncPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
p.mu.Lock()
defer p.mu.Unlock()

if len(p.subConns) == 0 {
return balancer.PickResult{}, errors.New("no SubConn is sync")
}
index := rand.Intn(len(p.subConns))
sc := p.subConns[index]
return balancer.PickResult{SubConn: sc}, nil
}
144 changes: 144 additions & 0 deletions rpc/grpcclient/syncResolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package grpcclient

import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/33cn/chain33/types"
"google.golang.org/grpc"
"google.golang.org/grpc/resolver"
)

// scheme 自定义grpc负载局衡名
const syncScheme = "sync"

// sync url前缀
const syncPrefix = syncScheme + separator

// 节点同步状态
const (
UNKNOWN = 0
NOTSYNC = 1
SYNC = 2
)

func init() {
resolver.Register(&syncBuilder{})
}

// NewSyncURL 创建url
func NewSyncURL(url string) string {
return syncPrefix + url
}

type syncBuilder struct{}

// Build 为给定目标创建一个新的`resolver`,当调用`grpc.Dial()`时执行
func (*syncBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r := &syncResolver{
cc: cc,
stopC: make(chan struct{}, 1),
}

urls := strings.Split(target.Endpoint, ",")
if len(urls) < 1 {
return nil, fmt.Errorf("invalid target address %v", target)
}

for _, url := range urls {
host, port, err := parseTarget(url, defaultGrpcPort)
if err != nil {
return nil, err
}
if host != "" {
addr := resolver.Address{Addr: host + ":" + port}
state := getSyncState(addr)
r.SetServiceList(addr.Addr, state)
}
}
r.start()
return r, nil
}

// Scheme return syncScheme
func (*syncBuilder) Scheme() string {
return syncScheme
}

type syncResolver struct {
cc resolver.ClientConn
serverList sync.Map //服务列表
stopC chan struct{}
}

func (s *syncResolver) start() {
s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
go s.watcher()
}

// ResolveNow 监视目标更新
func (s *syncResolver) ResolveNow(rn resolver.ResolveNowOptions) {}

// Close 关闭
func (s *syncResolver) Close() {
s.stopC <- struct{}{}
}

// watcher 监听节点同步状态
func (s *syncResolver) watcher() {
hint := time.NewTicker(10 * time.Second)
defer hint.Stop()
for {
select {
case <-s.stopC:
return
case <-hint.C:
for _, addr := range s.getServices() {
state := getSyncState(addr)
s.SetServiceList(addr.Addr, state)
}
s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
}
}
}

// SetServiceList 设置服务地址
func (s *syncResolver) SetServiceList(key string, val int) {
//获取服务地址
addr := resolver.Address{Addr: key}
//把节点同步状态存储到resolver.Address的元数据中
addr = SetAddrInfo(addr, AddrInfo{State: val})
s.serverList.Store(key, addr)
}

// GetServices 获取服务地址
func (s *syncResolver) getServices() []resolver.Address {
addrs := make([]resolver.Address, 0, 10)
s.serverList.Range(func(k, v interface{}) bool {
addrs = append(addrs, v.(resolver.Address))
return true
})
return addrs
}

func getSyncState(addr resolver.Address) int {
conn, err := grpc.Dial(addr.Addr, grpc.WithInsecure())
defer conn.Close()

if err != nil {
return UNKNOWN
}
grpcClient := types.NewChain33Client(conn)
req := &types.ReqNil{}
reply, err := grpcClient.IsSync(context.Background(), req)
if err != nil {
return UNKNOWN
}
if !reply.IsOk {
return NOTSYNC
}
return SYNC
}

0 comments on commit 339a5a6

Please sign in to comment.