forked from etcd-io/etcd
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Introduce grpc-1.30+ compatible client/v3/naming API.
This is not yet implementation, just API and tests to be filled with implementation in next CLs, tracked by: etcd-io#12652 We propose here 3 packages: - clientv3/naming/endpoints -> That is abstraction layer over etcd that allows to write, read & watch Endpoints information. It's independent from GRPC API. It hides the storage details. - clientv3/naming/endpoints/internal -> That contains the grpc's compatible Update class to preserve the internal JSON mashalling format. - clientv3/naming/resolver -> That implements the GRPC resolver API, such that etcd can be used for connection.Dial in grpc. Please see the grpc_naming.md document changes & grpcproxy/cluster.go new integration, to see how the new abstractions work. Signed-off-by: Chao Chen <chaochn@amazon.com>
- Loading branch information
Showing
10 changed files
with
532 additions
and
34 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
package endpoints | ||
|
||
import ( | ||
"context" | ||
|
||
"go.etcd.io/etcd/clientv3" | ||
) | ||
|
||
// Endpoint represents a single address the connection can be established with. | ||
// | ||
// Inspired by: https://pkg.go.dev/google.golang.org/grpc/resolver#Address. | ||
// Please document etcd version since which version each field is supported. | ||
type Endpoint struct { | ||
// Addr is the server address on which a connection will be established. | ||
// Since etcd 3.1 | ||
Addr string | ||
|
||
// Metadata is the information associated with Addr, which may be used | ||
// to make load balancing decision. | ||
// Since etcd 3.1 | ||
Metadata interface{} | ||
} | ||
|
||
type Operation uint8 | ||
|
||
const ( | ||
// Add indicates an Endpoint is added. | ||
Add Operation = iota | ||
// Delete indicates an existing address is deleted. | ||
Delete | ||
) | ||
|
||
// Update describes a single edit action of an Endpoint. | ||
type Update struct { | ||
// Op - action Add or Delete. | ||
Op Operation | ||
Key string | ||
Endpoint Endpoint | ||
} | ||
|
||
// WatchChannel is used to deliver notifications about endpoints updates. | ||
type WatchChannel chan []*Update | ||
|
||
// Key2EndpointMap maps etcd key into struct describing the endpoint. | ||
type Key2EndpointMap map[string]Endpoint | ||
|
||
// UpdateWithOpts describes endpoint update (add or delete) together | ||
// with etcd options (e.g. to attach an endpoint to a lease). | ||
type UpdateWithOpts struct { | ||
Update | ||
Opts []clientv3.OpOption | ||
} | ||
|
||
// NewAddUpdateOpts constructs UpdateWithOpts for endpoint registration. | ||
func NewAddUpdateOpts(key string, endpoint Endpoint, opts ...clientv3.OpOption) *UpdateWithOpts { | ||
return &UpdateWithOpts{Update: Update{Op: Add, Key: key, Endpoint: endpoint}, Opts: opts} | ||
} | ||
|
||
// NewDeleteUpdateOpts constructs UpdateWithOpts for endpoint deletion. | ||
func NewDeleteUpdateOpts(key string, opts ...clientv3.OpOption) *UpdateWithOpts { | ||
return &UpdateWithOpts{Update: Update{Op: Delete, Key: key}, Opts: opts} | ||
} | ||
|
||
// Manager can be used to add/remove & inspect endpoints stored in etcd for | ||
// a particular target. | ||
type Manager interface { | ||
// Update allows to atomically add/remove a few endpoints from etcd. | ||
Update(ctx context.Context, updates []*UpdateWithOpts) error | ||
|
||
// AddEndpoint registers a single endpoint in etcd. | ||
// For more advanced use-cases use the Update method. | ||
AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error | ||
// DeleteEndpoint deletes a single endpoint stored in etcd. | ||
// For more advanced use-cases use the Update method. | ||
DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error | ||
|
||
// List returns all the endpoints for the current target as a map. | ||
List(ctx context.Context) (Key2EndpointMap, error) | ||
// NewWatchChannel creates a channel that populates or endpoint updates. | ||
// Cancel the 'ctx' to close the watcher. | ||
NewWatchChannel(ctx context.Context) (WatchChannel, error) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
package endpoints | ||
|
||
// TODO: The API is not yet implemented. | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"go.etcd.io/etcd/clientv3" | ||
"go.etcd.io/etcd/clientv3/naming/endpoints/internal" | ||
) | ||
|
||
type endpointManager struct { | ||
// TODO: To be implemented, tracked by: https://github.com/etcd-io/etcd/issues/12652 | ||
} | ||
|
||
func NewManager(client *clientv3.Client, target string) (Manager, error) { | ||
// To be implemented (https://github.com/etcd-io/etcd/issues/12652) | ||
return nil, fmt.Errorf("Not implemented yet") | ||
} | ||
|
||
func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) error { | ||
// TODO: For loop in a single transaction: | ||
internalUpdate := &internal.Update{} // translate UpdateWithOpts into json format. | ||
switch internalUpdate.Op { | ||
//case internal.Add: | ||
// var v []byte | ||
// if v, err = json.Marshal(internalUpdate); err != nil { | ||
// return status.Error(codes.InvalidArgument, err.Error()) | ||
// } | ||
// _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...) | ||
//case internal.Delete: | ||
// _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...) | ||
//default: | ||
// return status.Error(codes.InvalidArgument, "naming: bad naming op") | ||
} | ||
return fmt.Errorf("Not implemented yet") | ||
} | ||
|
||
func (m *endpointManager) AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error { | ||
return m.Update(ctx, []*UpdateWithOpts{NewAddUpdateOpts(key, endpoint, opts...)}) | ||
} | ||
|
||
func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error { | ||
return m.Update(ctx, []*UpdateWithOpts{NewDeleteUpdateOpts(key, opts...)}) | ||
} | ||
|
||
func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) { | ||
return nil, fmt.Errorf("Not implemented yet") | ||
|
||
// TODO: Implementation to be inspired by: | ||
// Next gets the next set of updates from the etcd resolver. | ||
//// Calls to Next should be serialized; concurrent calls are not safe since | ||
//// there is no way to reconcile the update ordering. | ||
//func (gw *gRPCWatcher) Next() ([]*naming.Update, error) { | ||
// if gw.wch == nil { | ||
// // first Next() returns all addresses | ||
// return gw.firstNext() | ||
// } | ||
// if gw.err != nil { | ||
// return nil, gw.err | ||
// } | ||
// | ||
// // process new events on target/* | ||
// wr, ok := <-gw.wch | ||
// if !ok { | ||
// gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error()) | ||
// return nil, gw.err | ||
// } | ||
// if gw.err = wr.Err(); gw.err != nil { | ||
// return nil, gw.err | ||
// } | ||
// | ||
// updates := make([]*naming.Update, 0, len(wr.Events)) | ||
// for _, e := range wr.Events { | ||
// var jupdate naming.Update | ||
// var err error | ||
// switch e.Type { | ||
// case etcd.EventTypePut: | ||
// err = json.Unmarshal(e.Kv.Value, &jupdate) | ||
// jupdate.Op = naming.Add | ||
// case etcd.EventTypeDelete: | ||
// err = json.Unmarshal(e.PrevKv.Value, &jupdate) | ||
// jupdate.Op = naming.Delete | ||
// default: | ||
// continue | ||
// } | ||
// if err == nil { | ||
// updates = append(updates, &jupdate) | ||
// } | ||
// } | ||
// return updates, nil | ||
//} | ||
// | ||
//func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) { | ||
// // Use serialized request so resolution still works if the target etcd | ||
// // server is partitioned away from the quorum. | ||
// resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable()) | ||
// if gw.err = err; err != nil { | ||
// return nil, err | ||
// } | ||
// | ||
// updates := make([]*naming.Update, 0, len(resp.Kvs)) | ||
// for _, kv := range resp.Kvs { | ||
// var jupdate naming.Update | ||
// if err := json.Unmarshal(kv.Value, &jupdate); err != nil { | ||
// continue | ||
// } | ||
// updates = append(updates, &jupdate) | ||
// } | ||
// | ||
// opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()} | ||
// gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...) | ||
// return updates, nil | ||
//} | ||
} | ||
|
||
func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) { | ||
// TODO: Implementation | ||
return nil, fmt.Errorf("Not implemented yet") | ||
} |
Oops, something went wrong.