From f8d24fdf5c49212e47540d738340511cddf66aae Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Thu, 30 Mar 2023 19:45:28 +0800 Subject: [PATCH] refactor: implement failover based distributed etcd lock (#146) --- go.mod | 2 +- go.sum | 8 ++ server/coordinator/watch/watch.go | 174 +++++++++++++++++++++++++ server/coordinator/watch/watch_test.go | 74 +++++++++++ 4 files changed, 257 insertions(+), 1 deletion(-) create mode 100644 server/coordinator/watch/watch.go create mode 100644 server/coordinator/watch/watch_test.go diff --git a/go.mod b/go.mod index db463aac..0f5f672b 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/AlekSi/gocov-xml v1.0.0 - github.com/CeresDB/ceresdbproto/golang v0.0.0-20230116070525-9430fe7d3219 + github.com/CeresDB/ceresdbproto/golang v0.0.0-20230330101212-9add015b62cf github.com/axw/gocov v1.1.0 github.com/caarlos0/env/v6 v6.10.1 github.com/julienschmidt/httprouter v1.3.0 diff --git a/go.sum b/go.sum index a6e0f4cd..d0033485 100644 --- a/go.sum +++ b/go.sum @@ -18,8 +18,16 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I= github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/CeresDB/ceresdbproto v1.0.1-0.20230330080500-1c3bf4e803ef h1:uWHypLA/6JxWGMYY/ohQgQq9j9CgMOJaQTnh3YErQCQ= +github.com/CeresDB/ceresdbproto v1.0.1-0.20230330080500-1c3bf4e803ef/go.mod h1:6qWvbMz3dvjcyKVZkg1L/yKb6OrD8r+/rk5Q1GWhyWQ= +github.com/CeresDB/ceresdbproto v1.0.1-0.20230330101212-9add015b62cf h1:RZqP97cwSEW1bg0K/cB9jfwXjY7ykiEd25hl6YgzlQY= +github.com/CeresDB/ceresdbproto v1.0.1-0.20230330101212-9add015b62cf/go.mod h1:6qWvbMz3dvjcyKVZkg1L/yKb6OrD8r+/rk5Q1GWhyWQ= github.com/CeresDB/ceresdbproto/golang v0.0.0-20230116070525-9430fe7d3219 
h1:xI3o/UcsSX0S15+NXhhOnoY0tNvlxgBo3g1inXxhmrU= github.com/CeresDB/ceresdbproto/golang v0.0.0-20230116070525-9430fe7d3219/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4= +github.com/CeresDB/ceresdbproto/golang v0.0.0-20230330080500-1c3bf4e803ef h1:LkT/saqReicelzVPfUcF23kEkJ14cWMi4LBaQ0X/ag8= +github.com/CeresDB/ceresdbproto/golang v0.0.0-20230330080500-1c3bf4e803ef/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4= +github.com/CeresDB/ceresdbproto/golang v0.0.0-20230330101212-9add015b62cf h1:WOLpPZuC5azSCxEyiZ236PMxUXgluGM7u8PwIK6yLXA= +github.com/CeresDB/ceresdbproto/golang v0.0.0-20230330101212-9add015b62cf/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/server/coordinator/watch/watch.go b/server/coordinator/watch/watch.go new file mode 100644 index 00000000..894327de --- /dev/null +++ b/server/coordinator/watch/watch.go @@ -0,0 +1,174 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. 
+
+package coordinator
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"strings"
+	"sync"
+
+	"github.com/CeresDB/ceresdbproto/golang/pkg/metaeventpb"
+	"github.com/CeresDB/ceresmeta/pkg/log"
+	"github.com/CeresDB/ceresmeta/server/storage"
+	"github.com/pkg/errors"
+	"go.etcd.io/etcd/api/v3/mvccpb"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.uber.org/zap"
+	"google.golang.org/protobuf/proto"
+)
+
+const (
+	// shardPath is the key segment appended to the root path to build the watched prefix.
+	shardPath = "shards"
+	// keySep separates the segments of an etcd key.
+	keySep = "/"
+)
+
+// ShardRegisterEvent is passed to callbacks when a PUT event is observed on a shard key.
+type ShardRegisterEvent struct {
+	ShardID       storage.ShardID
+	NewLeaderNode string
+}
+
+// ShardExpireEvent is passed to callbacks when a DELETE event is observed on a shard key.
+type ShardExpireEvent struct {
+	ShardID       storage.ShardID
+	OldLeaderNode string
+}
+
+// ShardEventCallback receives the shard events decoded by ShardWatch.
+type ShardEventCallback interface {
+	OnShardRegistered(event ShardRegisterEvent) error
+	OnShardExpired(event ShardExpireEvent) error
+}
+
+// ShardWatch used to watch the distributed lock of shard, and provide the corresponding callback function.
+type ShardWatch struct {
+	rootPath   string
+	etcdClient *clientv3.Client
+
+	// lock guards every field below.
+	lock           sync.RWMutex
+	eventCallbacks []ShardEventCallback
+	isRunning      bool
+	cancel         context.CancelFunc
+}
+
+// NewWatch builds a stopped ShardWatch that uses client to watch the shard keys under rootPath.
+func NewWatch(rootPath string, client *clientv3.Client) *ShardWatch {
+	return &ShardWatch{
+		rootPath:       rootPath,
+		etcdClient:     client,
+		eventCallbacks: []ShardEventCallback{},
+	}
+}
+
+// Start launches the watch on the shard key prefix. It does nothing when the watch is already running.
+// The watch context is derived from ctx, so cancelling ctx also ends the watch.
+func (w *ShardWatch) Start(ctx context.Context) error {
+	w.lock.Lock()
+	defer w.lock.Unlock()
+
+	if w.isRunning {
+		return nil
+	}
+
+	shardKeyPrefix := encodeShardKeyPrefix(w.rootPath, shardPath)
+	if err := w.startWatch(ctx, shardKeyPrefix); err != nil {
+		return errors.WithMessage(err, "etcd register watch failed")
+	}
+
+	w.isRunning = true
+	return nil
+}
+
+// Stop cancels the watch context created by Start. It is safe to call on a watch that was
+// never started or that has already been stopped.
+func (w *ShardWatch) Stop(_ context.Context) error {
+	w.lock.Lock()
+	defer w.lock.Unlock()
+
+	w.isRunning = false
+	// cancel is nil until Start has run; calling a nil func would panic.
+	if w.cancel != nil {
+		w.cancel()
+		w.cancel = nil
+	}
+	return nil
+}
+
+// RegisteringEventCallback adds eventCallback to the callbacks invoked for every shard event.
+func (w *ShardWatch) RegisteringEventCallback(eventCallback ShardEventCallback) {
+	w.lock.Lock()
+	defer w.lock.Unlock()
+
+	w.eventCallbacks = append(w.eventCallbacks, eventCallback)
+}
+
+// startWatch spawns the goroutine that watches every key prefixed by path and feeds each
+// received event to processEvent. The caller must hold w.lock.
+func (w *ShardWatch) startWatch(ctx context.Context, path string) error {
+	log.Info("register shard watch", zap.String("watchPath", path))
+	// Create the cancel function before the goroutine starts so that Stop never observes a
+	// missing or concurrently written w.cancel.
+	ctxWithCancel, cancel := context.WithCancel(ctx)
+	w.cancel = cancel
+	go func() {
+		respChan := w.etcdClient.Watch(ctxWithCancel, path, clientv3.WithPrefix(), clientv3.WithPrevKV())
+		for resp := range respChan {
+			if err := resp.Err(); err != nil {
+				log.Error("watch response", zap.Error(err))
+			}
+			for _, event := range resp.Events {
+				if err := w.processEvent(event); err != nil {
+					log.Error("process event", zap.Error(err))
+				}
+			}
+		}
+	}()
+	return nil
+}
+
+// snapshotCallbacks returns a copy of the registered callbacks so that they can be invoked
+// without holding w.lock.
+func (w *ShardWatch) snapshotCallbacks() []ShardEventCallback {
+	w.lock.RLock()
+	defer w.lock.RUnlock()
+
+	return append([]ShardEventCallback(nil), w.eventCallbacks...)
+}
+
+// processEvent decodes a single etcd event and dispatches it to the registered callbacks.
+// A DELETE event is reported through OnShardExpired using the previous value of the key, a PUT
+// event through OnShardRegistered using the new value; other event types are ignored. The first
+// decoding or callback error stops the processing of the event and is returned.
+func (w *ShardWatch) processEvent(event *clientv3.Event) error {
+	switch event.Type {
+	case mvccpb.DELETE:
+		shardID, err := decodeShardKey(string(event.Kv.Key))
+		if err != nil {
+			return err
+		}
+		// PrevKv is a pointer and may be nil; report an error rather than panic in the watch goroutine.
+		if event.PrevKv == nil {
+			return errors.Errorf("delete event has no previous value, key:%s", string(event.Kv.Key))
+		}
+		shardLockValue, err := convertShardLockValueToPB(event.PrevKv.Value)
+		if err != nil {
+			return err
+		}
+		log.Info("receive delete event", zap.String("preKV", fmt.Sprintf("%v", event.PrevKv)), zap.String("event", fmt.Sprintf("%v", event)), zap.Uint64("shardID", shardID), zap.String("oldLeader", shardLockValue.NodeName))
+		for _, callback := range w.snapshotCallbacks() {
+			if err := callback.OnShardExpired(ShardExpireEvent{
+				ShardID:       storage.ShardID(shardID),
+				OldLeaderNode: shardLockValue.NodeName,
+			}); err != nil {
+				return err
+			}
+		}
+	case mvccpb.PUT:
+		shardID, err := decodeShardKey(string(event.Kv.Key))
+		if err != nil {
+			return err
+		}
+		shardLockValue, err := convertShardLockValueToPB(event.Kv.Value)
+		if err != nil {
+			return err
+		}
+		log.Info("receive put event", zap.String("event", fmt.Sprintf("%v", event)), zap.Uint64("shardID", shardID), zap.String("newLeader", shardLockValue.NodeName))
+		for _, callback := range w.snapshotCallbacks() {
+			if err := callback.OnShardRegistered(ShardRegisterEvent{
+				ShardID:       storage.ShardID(shardID),
+				NewLeaderNode: shardLockValue.NodeName,
+			}); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// decodeShardKey parses the last keySep-separated segment of keyPath as a base-10 shard ID.
+func decodeShardKey(keyPath string) (uint64, error) {
+	pathList := strings.Split(keyPath, keySep)
+	shardID, err := strconv.ParseUint(pathList[len(pathList)-1], 10, 64)
+	if err != nil {
+		return 0, errors.WithMessage(err, "decode etcd event key failed")
+	}
+	return shardID, nil
+}
+
+// encodeShardKeyPrefix joins rootPath and shardPath with keySep.
+func encodeShardKeyPrefix(rootPath, shardPath string) string {
+	return strings.Join([]string{rootPath, shardPath}, keySep)
+}
+
+// encodeShardKey builds the key of a shard: the shard key prefix followed by the decimal shard ID.
+func encodeShardKey(rootPath string, shardPath string, shardID uint64) string {
+	shardKeyPrefix := encodeShardKeyPrefix(rootPath, shardPath)
+	return strings.Join([]string{shardKeyPrefix, strconv.FormatUint(shardID, 10)}, keySep)
+}
+
+// convertShardLockValueToPB unmarshals value into a ShardLockValue. It returns nil and an
+// error when value cannot be unmarshalled.
+func convertShardLockValueToPB(value []byte) (*metaeventpb.ShardLockValue, error) {
+	shardLockValue := &metaeventpb.ShardLockValue{}
+	if err := proto.Unmarshal(value, shardLockValue); err != nil {
+		return nil, errors.WithMessage(err, "unmarshal shardLockValue failed")
+	}
+	return shardLockValue, nil
+}
diff --git a/server/coordinator/watch/watch_test.go b/server/coordinator/watch/watch_test.go
new file mode 100644
index 00000000..c6135aae
--- /dev/null
+++ b/server/coordinator/watch/watch_test.go
@@ -0,0 +1,109 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+package coordinator
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/CeresDB/ceresdbproto/golang/pkg/metaeventpb"
+	"github.com/CeresDB/ceresmeta/server/etcdutil"
+	"github.com/CeresDB/ceresmeta/server/storage"
+	"github.com/stretchr/testify/require"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"google.golang.org/protobuf/proto"
+)
+
+const (
+	TestRootPath  = "/rootPath"
+	TestShardPath = "shards"
+	TestShardID   = 1
+	TestNodeName  = "testNode"
+
+	// testEventTimeout bounds how long the test waits for a watch event.
+	testEventTimeout = 5 * time.Second
+	// testPutRetryInterval is the pause between repeated writes while waiting for the first put event.
+	testPutRetryInterval = 100 * time.Millisecond
+)
+
+// TestWatch checks that a put and a delete on a shard key reach the registered callback with the expected shard ID and node name.
+func TestWatch(t *testing.T) {
+	re := require.New(t)
+	ctx := context.Background()
+
+	_, client, _ := etcdutil.PrepareEtcdServerAndClient(t)
+	watch := NewWatch(TestRootPath, client)
+	callback := newTestShardEventCallback()
+	watch.RegisteringEventCallback(callback)
+	re.NoError(watch.Start(ctx))
+	defer func() { re.NoError(watch.Stop(ctx)) }()
+
+	b, err := proto.Marshal(&metaeventpb.ShardLockValue{NodeName: TestNodeName})
+	re.NoError(err)
+	keyPath := encodeShardKey(TestRootPath, TestShardPath, TestShardID)
+
+	// The watch is established asynchronously by Start, so repeat the write until the
+	// first put event shows up instead of relying on a fixed sleep.
+	registered := waitRegistered(ctx, t, client, callback, keyPath, string(b))
+	re.Equal(storage.ShardID(TestShardID), registered.ShardID)
+	re.Equal(TestNodeName, registered.NewLeaderNode)
+
+	_, err = client.Delete(ctx, keyPath, clientv3.WithPrevKV())
+	re.NoError(err)
+	select {
+	case expired := <-callback.expired:
+		re.Equal(storage.ShardID(TestShardID), expired.ShardID)
+		re.Equal(TestNodeName, expired.OldLeaderNode)
+	case <-time.After(testEventTimeout):
+		t.Fatal("timed out waiting for shard expire event")
+	}
+}
+
+// waitRegistered writes value to key until the callback reports a register event or testEventTimeout elapses.
+func waitRegistered(ctx context.Context, t *testing.T, client *clientv3.Client, callback *testShardEventCallback, key, value string) ShardRegisterEvent {
+	t.Helper()
+	deadline := time.After(testEventTimeout)
+	for {
+		_, err := client.Put(ctx, key, value)
+		require.NoError(t, err)
+		select {
+		case event := <-callback.registered:
+			return event
+		case <-time.After(testPutRetryInterval):
+		case <-deadline:
+			t.Fatal("timed out waiting for shard register event")
+		}
+	}
+}
+
+// testShardEventCallback forwards every received event to a channel so that assertions run on the test goroutine.
+type testShardEventCallback struct {
+	registered chan ShardRegisterEvent
+	expired    chan ShardExpireEvent
+}
+
+func newTestShardEventCallback() *testShardEventCallback {
+	return &testShardEventCallback{
+		registered: make(chan ShardRegisterEvent, 64),
+		expired:    make(chan ShardExpireEvent, 64),
+	}
+}
+
+// OnShardRegistered records the event; it drops the event instead of blocking the caller when the buffer is full.
+func (c *testShardEventCallback) OnShardRegistered(event ShardRegisterEvent) error {
+	select {
+	case c.registered <- event:
+	default:
+	}
+	return nil
+}
+
+// OnShardExpired records the event; it drops the event instead of blocking the caller when the buffer is full.
+func (c *testShardEventCallback) OnShardExpired(event ShardExpireEvent) error {
+	select {
+	case c.expired <- event:
+	default:
+	}
+	return nil
+}