Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WatchList(..) endpoint for the resource service #16726

Merged
merged 34 commits into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e8b49f8
storage: add backend interface and related types
boxofrad Mar 6, 2023
ae28bc8
storage: add conformance test suite
boxofrad Mar 6, 2023
531e995
storage: implement in-memory backend
boxofrad Mar 6, 2023
74eef9a
storage: better error messages
boxofrad Mar 6, 2023
2731ba8
storage: owner references should be anchored to a specific uid
boxofrad Mar 13, 2023
c25a7ec
Remove unused test function
boxofrad Mar 13, 2023
d5733f9
storage: clarify List docs
boxofrad Mar 14, 2023
56c4c3a
storage: fix potential out-of-order events
boxofrad Mar 14, 2023
033b612
storage: clarify WatchList consistency model
boxofrad Mar 15, 2023
f54bcb8
storage: s/Check-And-Set/Compare-And-Swap/
boxofrad Mar 15, 2023
0167a5b
storage: move event construction into publishEvent
boxofrad Mar 15, 2023
048712d
storage: document Read-Modify-Write patterns
boxofrad Mar 15, 2023
fbbbec9
storage: add clarifying comment about resource creation
boxofrad Mar 15, 2023
e17b50b
storage: clarify OwnerReferences consistency
boxofrad Mar 16, 2023
f441187
storage: separate ErrConflict into two errors
boxofrad Mar 17, 2023
84ff540
storage: make backends responsible for managing the version
boxofrad Mar 17, 2023
d7ac1c6
storage: make consistency an argument to Read
boxofrad Mar 17, 2023
ba94207
storage: add consistency parameter to List
boxofrad Mar 17, 2023
6e39fd5
storage: more correct consistency documentation
boxofrad Mar 17, 2023
4580e51
storage: fix integer alignment in inmem backend
boxofrad Mar 17, 2023
0748073
storage: support eventual consistency in conformance tests
boxofrad Mar 20, 2023
0a49eff
storage: correct eventLock comment
boxofrad Mar 20, 2023
8c75333
storage: rearrange inmem store files
boxofrad Mar 20, 2023
8bc4a80
storage: fix bug where watches could emit duplicate events
boxofrad Mar 21, 2023
730a73a
WatchList(..) endpoint for the resource service
analogue Mar 21, 2023
dc0f42b
Remove duplicate watch event
analogue Mar 22, 2023
7aaffec
Factor out resource.resolveType(..) so it can be re-used by all endpo…
analogue Mar 22, 2023
c11fbca
DRY server creation in tests to testServer(...)
analogue Mar 22, 2023
ed45648
Parallelize tests and shorten wait for no event
analogue Mar 22, 2023
2d85d82
Fix import
analogue Mar 22, 2023
28c49ff
make proto-format happy
analogue Mar 22, 2023
7e34f99
Merge branch 'main' into spatel/NET-2692-resource-service-watchlist-e…
analogue Mar 27, 2023
28caf48
Fix dupe WatchEvent creation
analogue Mar 27, 2023
e816a84
make proto-lint happy
analogue Mar 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions agent/grpc-external/services/resource/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ import (
"context"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/proto-public/pbresource"
)

Expand All @@ -13,6 +17,18 @@ type Server struct {
}

type Config struct {
registry Registry
backend Backend
}

//go:generate mockery --name Registry --inpackage
type Registry interface {
resource.Registry
}

//go:generate mockery --name Backend --inpackage
type Backend interface {
storage.Backend
}

func NewServer(cfg Config) *Server {
Expand Down Expand Up @@ -50,7 +66,13 @@ func (s *Server) Delete(ctx context.Context, req *pbresource.DeleteRequest) (*pb
return &pbresource.DeleteResponse{}, nil
}

func (s *Server) Watch(req *pbresource.WatchRequest, ws pbresource.ResourceService_WatchServer) error {
// TODO
return nil
func (s *Server) resolveType(typ *pbresource.Type) (*resource.Registration, error) {
v, ok := s.registry.Resolve(typ)
if ok {
return &v, nil
}
return nil, status.Errorf(
codes.InvalidArgument,
"resource type %s not registered", resource.ToGVK(typ),
)
}
8 changes: 0 additions & 8 deletions agent/grpc-external/services/resource/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,3 @@ func TestDelete_TODO(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, resp)
}

func TestWatch_TODO(t *testing.T) {
server := NewServer(Config{})
client := testClient(t, server)
wc, err := client.Watch(context.Background(), &pbresource.WatchRequest{})
require.NoError(t, err)
require.NotNil(t, wc)
}
43 changes: 43 additions & 0 deletions agent/grpc-external/services/resource/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package resource

import (
"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/proto-public/pbresource"
)

func (s *Server) WatchList(req *pbresource.WatchListRequest, stream pbresource.ResourceService_WatchListServer) error {
// check type exists
if _, err := s.resolveType(req.Type); err != nil {
return err
}

unversionedType := storage.UnversionedTypeFrom(req.Type)
watch, err := s.backend.WatchList(
stream.Context(),
unversionedType,
req.Tenancy,
req.NamePrefix,
)
if err != nil {
return err
}
Comment on lines +21 to +23
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder to myself 😄

When #16619 is merged, we'll need to update this to handle ErrWatchClosed (which I've just noticed is in the wrong package too).


for {
event, err := watch.Next(stream.Context())
if err != nil {
return err
}

// drop versions that don't match
if event.Resource.Id.Type.GroupVersion != req.Type.GroupVersion {
continue
}

if err = stream.Send(&pbresource.WatchEvent{
Operation: event.Operation,
Resource: event.Resource,
}); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit 🐜 I think you could simplify this to just:

Suggested change
if err = stream.Send(&pbresource.WatchEvent{
Operation: event.Operation,
Resource: event.Resource,
}); err != nil {
if err = stream.Send(event); err != nil {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Fixed in 28caf48

return err
}
}
}
222 changes: 222 additions & 0 deletions agent/grpc-external/services/resource/watch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package resource

import (
"context"
"errors"
"io"
"testing"
"time"

"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/storage/inmem"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/prototest"

"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

func TestWatchList_TypeNotFound(t *testing.T) {
t.Parallel()
server := testServer(t)
client := testClient(t, server)

stream, err := client.WatchList(context.Background(), &pbresource.WatchListRequest{
Type: typev1,
Tenancy: tenancy,
NamePrefix: "",
})
require.NoError(t, err)
rspCh := handleResourceStream(t, stream)

err = mustGetError(t, rspCh)
require.Equal(t, codes.InvalidArgument.String(), status.Code(err).String())
require.Contains(t, err.Error(), "resource type mesh/v1/service not registered")
}

func TestWatchList_GroupVersionMatches(t *testing.T) {
t.Parallel()
server := testServer(t)
client := testClient(t, server)
server.registry.Register(resource.Registration{Type: typev1})
ctx := context.Background()

// create a watch
stream, err := client.WatchList(ctx, &pbresource.WatchListRequest{
Type: typev1,
Tenancy: tenancy,
NamePrefix: "",
})
require.NoError(t, err)
rspCh := handleResourceStream(t, stream)

// insert and verify upsert event received
r1, err := server.backend.WriteCAS(ctx, resourcev1)
require.NoError(t, err)
rsp := mustGetResource(t, rspCh)
require.Equal(t, pbresource.WatchEvent_OPERATION_UPSERT, rsp.Operation)
prototest.AssertDeepEqual(t, r1, rsp.Resource)

// update and verify upsert event received
r2 := clone(r1)
r2, err = server.backend.WriteCAS(ctx, r2)
require.NoError(t, err)
rsp = mustGetResource(t, rspCh)
require.Equal(t, pbresource.WatchEvent_OPERATION_UPSERT, rsp.Operation)
prototest.AssertDeepEqual(t, r2, rsp.Resource)

// delete and verify delete event received
err = server.backend.DeleteCAS(ctx, r2.Id, r2.Version)
require.NoError(t, err)
rsp = mustGetResource(t, rspCh)
require.Equal(t, pbresource.WatchEvent_OPERATION_DELETE, rsp.Operation)
}

func TestWatchList_GroupVersionMismatch(t *testing.T) {
// Given a watch on typev2 that only differs from typev1 by GroupVersion
// When a resource of typev1 is created/updated/deleted
// Then no watch events should be emitted
t.Parallel()
server := testServer(t)
client := testClient(t, server)
ctx := context.Background()
server.registry.Register(resource.Registration{Type: typev1})
server.registry.Register(resource.Registration{Type: typev2})

// create a watch for typev2
stream, err := client.WatchList(ctx, &pbresource.WatchListRequest{
Type: typev2,
Tenancy: tenancy,
NamePrefix: "",
})
require.NoError(t, err)
rspCh := handleResourceStream(t, stream)

// insert
r1, err := server.backend.WriteCAS(ctx, resourcev1)
require.NoError(t, err)

// update
r2 := clone(r1)
r2, err = server.backend.WriteCAS(ctx, r2)
require.NoError(t, err)

// delete
err = server.backend.DeleteCAS(ctx, r2.Id, r2.Version)
require.NoError(t, err)

// verify no events received
mustGetNoResource(t, rspCh)
}

func testServer(t *testing.T) *Server {
t.Helper()

backend, err := inmem.NewBackend()
require.NoError(t, err)
go backend.Run(testContext(t))

registry := resource.NewRegistry()
return NewServer(Config{registry: registry, backend: backend})
}

func mustGetNoResource(t *testing.T, ch <-chan resourceOrError) {
t.Helper()

select {
case rsp := <-ch:
require.NoError(t, rsp.err)
require.Nil(t, rsp.rsp, "expected nil response with no error")
case <-time.After(250 * time.Millisecond):
return
}
}

func mustGetResource(t *testing.T, ch <-chan resourceOrError) *pbresource.WatchEvent {
t.Helper()

select {
case rsp := <-ch:
require.NoError(t, rsp.err)
return rsp.rsp
case <-time.After(1 * time.Second):
t.Fatal("timeout waiting for WatchListResponse")
return nil
}
}

func mustGetError(t *testing.T, ch <-chan resourceOrError) error {
t.Helper()

select {
case rsp := <-ch:
require.Error(t, rsp.err)
return rsp.err
case <-time.After(2 * time.Second):
t.Fatal("timeout waiting for WatchListResponse")
return nil
}
}

func handleResourceStream(t *testing.T, stream pbresource.ResourceService_WatchListClient) <-chan resourceOrError {
t.Helper()

rspCh := make(chan resourceOrError)
go func() {
for {
rsp, err := stream.Recv()
if errors.Is(err, io.EOF) ||
errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) {
return
}
rspCh <- resourceOrError{
rsp: rsp,
err: err,
}
}
}()
return rspCh
}

var (
tenancy = &pbresource.Tenancy{
Partition: "default",
Namespace: "default",
PeerName: "local",
}
typev1 = &pbresource.Type{
Group: "mesh",
GroupVersion: "v1",
Kind: "service",
}
typev2 = &pbresource.Type{
Group: "mesh",
GroupVersion: "v2",
Kind: "service",
}
resourcev1 = &pbresource.Resource{
Id: &pbresource.ID{
Uid: "someUid",
Name: "someName",
Type: typev1,
Tenancy: tenancy,
},
Version: "",
}
)

type resourceOrError struct {
rsp *pbresource.WatchEvent
err error
}

func clone[T proto.Message](v T) T { return proto.Clone(v).(T) }

func testContext(t *testing.T) context.Context {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
return ctx
}
2 changes: 1 addition & 1 deletion agent/grpc-middleware/rate_limit_mappings.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var rpcRateLimitSpecs = map[string]rate.OperationType{
"/hashicorp.consul.resource.ResourceService/Delete": rate.OperationTypeWrite,
"/hashicorp.consul.resource.ResourceService/List": rate.OperationTypeRead,
"/hashicorp.consul.resource.ResourceService/Read": rate.OperationTypeRead,
"/hashicorp.consul.resource.ResourceService/Watch": rate.OperationTypeRead,
"/hashicorp.consul.resource.ResourceService/WatchList": rate.OperationTypeRead,
"/hashicorp.consul.resource.ResourceService/Write": rate.OperationTypeWrite,
"/hashicorp.consul.resource.ResourceService/WriteStatus": rate.OperationTypeWrite,
"/hashicorp.consul.serverdiscovery.ServerDiscoveryService/WatchServers": rate.OperationTypeRead,
Expand Down
Loading