Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
refactor: implement failover based distributed etcd lock (#146)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZuLiangWang authored and ShiKaiWi committed Apr 20, 2023
1 parent 19f90f5 commit f8d24fd
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 1 deletion.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.19

require (
github.com/AlekSi/gocov-xml v1.0.0
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230116070525-9430fe7d3219
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230330101212-9add015b62cf
github.com/axw/gocov v1.1.0
github.com/caarlos0/env/v6 v6.10.1
github.com/julienschmidt/httprouter v1.3.0
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,16 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/CeresDB/ceresdbproto v1.0.1-0.20230330080500-1c3bf4e803ef h1:uWHypLA/6JxWGMYY/ohQgQq9j9CgMOJaQTnh3YErQCQ=
github.com/CeresDB/ceresdbproto v1.0.1-0.20230330080500-1c3bf4e803ef/go.mod h1:6qWvbMz3dvjcyKVZkg1L/yKb6OrD8r+/rk5Q1GWhyWQ=
github.com/CeresDB/ceresdbproto v1.0.1-0.20230330101212-9add015b62cf h1:RZqP97cwSEW1bg0K/cB9jfwXjY7ykiEd25hl6YgzlQY=
github.com/CeresDB/ceresdbproto v1.0.1-0.20230330101212-9add015b62cf/go.mod h1:6qWvbMz3dvjcyKVZkg1L/yKb6OrD8r+/rk5Q1GWhyWQ=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230116070525-9430fe7d3219 h1:xI3o/UcsSX0S15+NXhhOnoY0tNvlxgBo3g1inXxhmrU=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230116070525-9430fe7d3219/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230330080500-1c3bf4e803ef h1:LkT/saqReicelzVPfUcF23kEkJ14cWMi4LBaQ0X/ag8=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230330080500-1c3bf4e803ef/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230330101212-9add015b62cf h1:WOLpPZuC5azSCxEyiZ236PMxUXgluGM7u8PwIK6yLXA=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230330101212-9add015b62cf/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down
174 changes: 174 additions & 0 deletions server/coordinator/watch/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

package coordinator

import (
"context"
"fmt"
"strconv"
"strings"
"sync"

"github.com/CeresDB/ceresdbproto/golang/pkg/metaeventpb"
"github.com/CeresDB/ceresmeta/pkg/log"
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/pkg/errors"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

const (
shardPath = "shards"
keySep = "/"
)

type ShardRegisterEvent struct {
ShardID storage.ShardID
NewLeaderNode string
}

type ShardExpireEvent struct {
ShardID storage.ShardID
OldLeaderNode string
}

type ShardEventCallback interface {
OnShardRegistered(event ShardRegisterEvent) error
OnShardExpired(event ShardExpireEvent) error
}

// ShardWatch used to watch the distributed lock of shard, and provide the corresponding callback function.
type ShardWatch struct {
rootPath string
etcdClient *clientv3.Client
eventCallbacks []ShardEventCallback

lock sync.RWMutex
isRunning bool
cancel context.CancelFunc
}

func NewWatch(rootPath string, client *clientv3.Client) *ShardWatch {
return &ShardWatch{
rootPath: rootPath,
etcdClient: client,
eventCallbacks: []ShardEventCallback{},
}
}

func (w *ShardWatch) Start(ctx context.Context) error {
w.lock.Lock()
defer w.lock.Unlock()

if w.isRunning {
return nil
}

shardKeyPrefix := encodeShardKeyPrefix(w.rootPath, shardPath)
if err := w.startWatch(ctx, shardKeyPrefix); err != nil {
return errors.WithMessage(err, "etcd register watch failed")
}

w.isRunning = true
return nil
}

func (w *ShardWatch) Stop(_ context.Context) error {
w.lock.Lock()
defer w.lock.Unlock()

w.isRunning = false
w.cancel()
return nil
}

func (w *ShardWatch) RegisteringEventCallback(eventCallback ShardEventCallback) {
w.eventCallbacks = append(w.eventCallbacks, eventCallback)
}

func (w *ShardWatch) startWatch(ctx context.Context, path string) error {
log.Info("register shard watch", zap.String("watchPath", path))
go func() {
ctxWithCancel, cancel := context.WithCancel(ctx)
w.cancel = cancel
respChan := w.etcdClient.Watch(ctxWithCancel, path, clientv3.WithPrefix(), clientv3.WithPrevKV())
for resp := range respChan {
for _, event := range resp.Events {
if err := w.processEvent(event); err != nil {
log.Error("process event", zap.Error(err))
}
}
}
}()
return nil
}

func (w *ShardWatch) processEvent(event *clientv3.Event) error {
switch event.Type {
case mvccpb.DELETE:
shardID, err := decodeShardKey(string(event.Kv.Key))
if err != nil {
return err
}
shardLockValue, err := convertShardLockValueToPB(event.PrevKv.Value)
if err != nil {
return err
}
log.Info("receive delete event", zap.String("preKV", fmt.Sprintf("%v", event.PrevKv)), zap.String("event", fmt.Sprintf("%v", event)), zap.Uint64("shardID", shardID), zap.String("oldLeader", shardLockValue.NodeName))
for _, callback := range w.eventCallbacks {
if err := callback.OnShardExpired(ShardExpireEvent{
ShardID: storage.ShardID(shardID),
OldLeaderNode: shardLockValue.NodeName,
}); err != nil {
return err
}
}
case mvccpb.PUT:
shardID, err := decodeShardKey(string(event.Kv.Key))
if err != nil {
return err
}
shardLockValue, err := convertShardLockValueToPB(event.Kv.Value)
if err != nil {
return err
}
log.Info("receive put event", zap.String("event", fmt.Sprintf("%v", event)), zap.Uint64("shardID", shardID), zap.String("oldLeader", shardLockValue.NodeName))
for _, callback := range w.eventCallbacks {
if err := callback.OnShardRegistered(ShardRegisterEvent{
ShardID: storage.ShardID(shardID),
NewLeaderNode: shardLockValue.NodeName,
}); err != nil {
return err
}
}
}
return nil
}

func decodeShardKey(keyPath string) (uint64, error) {
pathList := strings.Split(keyPath, keySep)
shardID, err := strconv.ParseUint(pathList[len(pathList)-1], 10, 64)
if err != nil {
return 0, errors.WithMessage(err, "decode etcd event key failed")
}
return shardID, nil
}

func encodeShardKeyPrefix(rootPath, shardPath string) string {
return strings.Join([]string{rootPath, shardPath}, keySep)
}

func encodeShardKey(rootPath string, shardPath string, shardID uint64) string {
shardKeyPrefix := encodeShardKeyPrefix(rootPath, shardPath)
return strings.Join([]string{shardKeyPrefix, strconv.FormatUint(shardID, 10)}, keySep)
}

func convertShardLockValueToPB(value []byte) (*metaeventpb.ShardLockValue, error) {
shardLockValue := &metaeventpb.ShardLockValue{}
if err := proto.Unmarshal(value, shardLockValue); err != nil {
return shardLockValue, errors.WithMessage(err, "unmarshal shardLockValue failed")
}
return shardLockValue, nil
}
74 changes: 74 additions & 0 deletions server/coordinator/watch/watch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

package coordinator

import (
"context"
"testing"
"time"

"github.com/CeresDB/ceresdbproto/golang/pkg/metaeventpb"
"github.com/CeresDB/ceresmeta/server/etcdutil"
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/protobuf/proto"
)

const (
TestRootPath = "/rootPath"
TestShardPath = "shards"
TestShardID = 1
TestNodeName = "testNode"
)

func TestWatch(t *testing.T) {
re := require.New(t)
ctx := context.Background()

_, client, _ := etcdutil.PrepareEtcdServerAndClient(t)
watch := NewWatch(TestRootPath, client)
err := watch.Start(ctx)
re.NoError(err)

testCallback := testShardEventCallback{
result: 0,
re: re,
}

watch.RegisteringEventCallback(&testCallback)

// Valid that callback function is executed and the params are as expected.
b, err := proto.Marshal(&metaeventpb.ShardLockValue{NodeName: TestNodeName})
re.NoError(err)

keyPath := encodeShardKey(TestRootPath, TestShardPath, TestShardID)
_, err = client.Put(ctx, keyPath, string(b))
re.NoError(err)
time.Sleep(time.Millisecond * 10)
re.Equal(2, testCallback.result)

_, err = client.Delete(ctx, keyPath, clientv3.WithPrevKV())
re.NoError(err)
time.Sleep(time.Millisecond * 10)
re.Equal(1, testCallback.result)
}

type testShardEventCallback struct {
result int
re *require.Assertions
}

func (c *testShardEventCallback) OnShardRegistered(event ShardRegisterEvent) error {
c.result = 2
c.re.Equal(storage.ShardID(TestShardID), event.ShardID)
c.re.Equal(TestNodeName, event.NewLeaderNode)
return nil
}

func (c *testShardEventCallback) OnShardExpired(event ShardExpireEvent) error {
c.result = 1
c.re.Equal(storage.ShardID(TestShardID), event.ShardID)
c.re.Equal(TestNodeName, event.OldLeaderNode)
return nil
}

0 comments on commit f8d24fd

Please sign in to comment.