From 47b3ac6f7041b7285f426ce914ffcbd8ae7e5db2 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Mon, 25 Mar 2019 16:43:15 -0400 Subject: [PATCH] move snapshotter implementation to its own package pkg/snapshotter --- cmd/csi-snapshotter/main.go | 19 +- cmd/csi-snapshotter/main_test.go | 161 +++++++++++ pkg/connection/connection.go | 251 ------------------ pkg/controller/csi_handler.go | 5 +- pkg/controller/snapshot_controller_base.go | 3 +- .../snapshotter.go | 9 +- .../snapshotter_test.go | 4 +- 7 files changed, 183 insertions(+), 269 deletions(-) create mode 100644 cmd/csi-snapshotter/main_test.go delete mode 100644 pkg/connection/connection.go rename pkg/{controller => snapshotter}/snapshotter.go (94%) rename pkg/{controller => snapshotter}/snapshotter_test.go (99%) diff --git a/cmd/csi-snapshotter/main.go b/cmd/csi-snapshotter/main.go index 55173f4da..7aea51ad0 100644 --- a/cmd/csi-snapshotter/main.go +++ b/cmd/csi-snapshotter/main.go @@ -33,10 +33,10 @@ import ( "k8s.io/klog" "github.com/container-storage-interface/spec/lib/go/csi" - "github.com/golang/glog" "github.com/kubernetes-csi/csi-lib-utils/connection" csirpc "github.com/kubernetes-csi/csi-lib-utils/rpc" "github.com/kubernetes-csi/external-snapshotter/pkg/controller" + "github.com/kubernetes-csi/external-snapshotter/pkg/snapshotter" clientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned" snapshotscheme "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/scheme" @@ -55,7 +55,7 @@ const ( // Command line flags var ( - snapshotter = flag.String("snapshotter", "", "This option is deprecated.") + snapshotterName = flag.String("snapshotter", "", "This option is deprecated.") kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.") connectionTimeout = flag.Duration("connection-timeout", 0, "The --connection-timeout flag is deprecated") csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.") @@ -85,7 +85,8 @@ func main() { if *connectionTimeout != 0 { klog.Warning("--connection-timeout is deprecated and will have no effect") } - if *snapshotter != "" { + + if *snapshotterName != "" { klog.Warning("--snapshotter is deprecated and will have no effect") } @@ -140,13 +141,13 @@ func main() { defer cancel() // Find driver name - *snapshotter, err = csirpc.GetDriverName(ctx, csiConn) + *snapshotterName, err = csirpc.GetDriverName(ctx, csiConn) if err != nil { klog.Errorf("error getting CSI driver name: %v", err) os.Exit(1) } - klog.V(2).Infof("CSI driver name: %q", *snapshotter) + klog.V(2).Infof("CSI driver name: %q", *snapshotterName) // Check it's ready if err = csirpc.ProbeForever(csiConn, csiTimeout); err != nil { @@ -162,7 +163,7 @@ func main() { os.Exit(1) } if !supportsCreateSnapshot { - klog.Errorf("CSI driver %s does not support ControllerCreateSnapshot", *snapshotter) + klog.Errorf("CSI driver %s does not support ControllerCreateSnapshot", *snapshotterName) os.Exit(1) } @@ -171,13 +172,13 @@ func main() { os.Exit(1) } - klog.V(2).Infof("Start NewCSISnapshotController with snapshotter [%s] kubeconfig [%s] connectionTimeout [%+v] csiAddress [%s] createSnapshotContentRetryCount [%d] createSnapshotContentInterval [%+v] resyncPeriod [%+v] snapshotNamePrefix [%s] snapshotNameUUIDLength [%d]", *snapshotter, *kubeconfig, *connectionTimeout, *csiAddress, createSnapshotContentRetryCount, *createSnapshotContentInterval, *resyncPeriod, *snapshotNamePrefix, snapshotNameUUIDLength) + klog.V(2).Infof("Start NewCSISnapshotController with snapshotter [%s] kubeconfig [%s] connectionTimeout [%+v] csiAddress [%s] createSnapshotContentRetryCount [%d] createSnapshotContentInterval [%+v] resyncPeriod [%+v] snapshotNamePrefix [%s] snapshotNameUUIDLength [%d]", *snapshotterName, *kubeconfig, *connectionTimeout, *csiAddress, createSnapshotContentRetryCount, *createSnapshotContentInterval, *resyncPeriod, *snapshotNamePrefix, snapshotNameUUIDLength) - snapShotter := controller.NewSnapshotter(csiConn) + snapShotter := snapshotter.NewSnapshotter(csiConn) ctrl := controller.NewCSISnapshotController( snapClient, kubeClient, - *snapshotter, + *snapshotterName, factory.Volumesnapshot().V1alpha1().VolumeSnapshots(), factory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(), factory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(), diff --git a/cmd/csi-snapshotter/main_test.go b/cmd/csi-snapshotter/main_test.go new file mode 100644 index 000000000..f13aba72b --- /dev/null +++ b/cmd/csi-snapshotter/main_test.go @@ -0,0 +1,161 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "fmt" + "testing" + + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/golang/mock/gomock" + "github.com/kubernetes-csi/csi-lib-utils/connection" + "github.com/kubernetes-csi/csi-test/driver" + + "google.golang.org/grpc" +) + +func Test_supportsControllerCreateSnapshot(t *testing.T) { + tests := []struct { + name string + output *csi.ControllerGetCapabilitiesResponse + injectError bool + expectError bool + expectResult bool + }{ + { + name: "success", + output: &csi.ControllerGetCapabilitiesResponse{ + Capabilities: []*csi.ControllerServiceCapability{ + { + Type: &csi.ControllerServiceCapability_Rpc{ + Rpc: &csi.ControllerServiceCapability_RPC{ + Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, + }, + }, + }, + { + Type: &csi.ControllerServiceCapability_Rpc{ + Rpc: &csi.ControllerServiceCapability_RPC{ + Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, + }, + }, + }, + }, + }, + expectError: false, + expectResult: true, + }, + { + name: "gRPC error", + output: nil, + injectError: true, + expectError: true, + expectResult: false, + }, + { + name: "no create snapshot", + output: &csi.ControllerGetCapabilitiesResponse{ + Capabilities: []*csi.ControllerServiceCapability{ + { + Type: &csi.ControllerServiceCapability_Rpc{ + Rpc: &csi.ControllerServiceCapability_RPC{ + Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, + }, + }, + }, + }, + }, + expectError: false, + expectResult: false, + }, + { + name: "empty capability", + output: &csi.ControllerGetCapabilitiesResponse{ + Capabilities: []*csi.ControllerServiceCapability{ + { + Type: nil, + }, + }, + }, + expectError: false, + expectResult: false, + }, + { + name: "no capabilities", + output: &csi.ControllerGetCapabilitiesResponse{ + Capabilities: []*csi.ControllerServiceCapability{}, + }, + expectError: false, + expectResult: false, + }, + } + + mockController, driver, _, controllerServer, csiConn, err := createMockServer(t) + if err != nil { + t.Fatal(err) + } + defer mockController.Finish() + defer driver.Stop() + defer csiConn.Close() + + for _, test := range tests { + + in := &csi.ControllerGetCapabilitiesRequest{} + + out := test.output + var injectedErr error + if test.injectError { + injectedErr = fmt.Errorf("mock error") + } + + // Setup expectation + controllerServer.EXPECT().ControllerGetCapabilities(gomock.Any(), in).Return(out, injectedErr).Times(1) + + ok, err := supportsControllerCreateSnapshot(context.Background(), csiConn) + if test.expectError && err == nil { + t.Errorf("test %q: Expected error, got none", test.name) + } + if !test.expectError && err != nil { + t.Errorf("test %q: got error: %v", test.name, err) + } + if err == nil && test.expectResult != ok { + t.Errorf("test fail expected result %t but got %t\n", test.expectResult, ok) + } + } +} + +func createMockServer(t *testing.T) (*gomock.Controller, *driver.MockCSIDriver, *driver.MockIdentityServer, *driver.MockControllerServer, *grpc.ClientConn, error) { + // Start the mock server + mockController := gomock.NewController(t) + identityServer := driver.NewMockIdentityServer(mockController) + controllerServer := driver.NewMockControllerServer(mockController) + drv := driver.NewMockCSIDriver(&driver.MockCSIDriverServers{ + Identity: identityServer, + Controller: controllerServer, + }) + drv.Start() + + // Create a client connection to it + addr := drv.Address() + csiConn, err := connection.Connect(addr) + if err != nil { + return nil, nil, nil, nil, nil, err + } + + return mockController, drv, identityServer, controllerServer, csiConn, nil +} diff --git a/pkg/connection/connection.go b/pkg/connection/connection.go deleted file mode 100644 index 760ae2cdb..000000000 --- a/pkg/connection/connection.go +++ /dev/null @@ -1,251 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package connection - -import ( - "context" - "fmt" - - "github.com/container-storage-interface/spec/lib/go/csi" - "github.com/golang/protobuf/ptypes" - "github.com/golang/protobuf/ptypes/timestamp" - "github.com/kubernetes-csi/csi-lib-utils/connection" - "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" - "google.golang.org/grpc" - "k8s.io/api/core/v1" - "k8s.io/klog" -) - -// CSIConnection is gRPC connection to a remote CSI driver and abstracts all -// CSI calls. -type CSIConnection interface { - // GetDriverName returns driver name as discovered by GetPluginInfo() - // gRPC call. - GetDriverName(ctx context.Context) (string, error) - - // SupportsControllerCreateSnapshot returns true if the CSI driver reports - // CREATE_DELETE_SNAPSHOT in ControllerGetCapabilities() gRPC call. - SupportsControllerCreateSnapshot(ctx context.Context) (bool, error) - - // SupportsControllerListSnapshots returns true if the CSI driver reports - // LIST_SNAPSHOTS in ControllerGetCapabilities() gRPC call. - SupportsControllerListSnapshots(ctx context.Context) (bool, error) - - // CreateSnapshot creates a snapshot for a volume - CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (driverName string, snapshotId string, timestamp int64, size int64, readyToUse bool, err error) - - // DeleteSnapshot deletes a snapshot from a volume - DeleteSnapshot(ctx context.Context, snapshotID string, snapshotterCredentials map[string]string) (err error) - - // GetSnapshotStatus returns if a snapshot is ready to use, creation time, and restore size. - GetSnapshotStatus(ctx context.Context, snapshotID string) (bool, int64, int64, error) - - // Probe checks that the CSI driver is ready to process requests - Probe(ctx context.Context) error - - // Close the connection - Close() error -} - -type csiConnection struct { - conn *grpc.ClientConn -} - -var ( - _ CSIConnection = &csiConnection{} -) - -// New returns a CSI connection object. -func New(address string) (CSIConnection, error) { - conn, err := connection.Connect(address) - if err != nil { - return nil, err - } - return &csiConnection{ - conn: conn, - }, nil -} - -func (c *csiConnection) GetDriverName(ctx context.Context) (string, error) { - client := csi.NewIdentityClient(c.conn) - - req := csi.GetPluginInfoRequest{} - - rsp, err := client.GetPluginInfo(ctx, &req) - if err != nil { - return "", err - } - name := rsp.GetName() - if name == "" { - return "", fmt.Errorf("name is empty") - } - return name, nil -} - -func (c *csiConnection) Probe(ctx context.Context) error { - client := csi.NewIdentityClient(c.conn) - - req := csi.ProbeRequest{} - - _, err := client.Probe(ctx, &req) - if err != nil { - return err - } - return nil -} - -func (c *csiConnection) SupportsControllerCreateSnapshot(ctx context.Context) (bool, error) { - client := csi.NewControllerClient(c.conn) - req := csi.ControllerGetCapabilitiesRequest{} - - rsp, err := client.ControllerGetCapabilities(ctx, &req) - if err != nil { - return false, err - } - caps := rsp.GetCapabilities() - for _, cap := range caps { - if cap == nil { - continue - } - rpc := cap.GetRpc() - if rpc == nil { - continue - } - if rpc.GetType() == csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT { - return true, nil - } - } - return false, nil -} - -func (c *csiConnection) SupportsControllerListSnapshots(ctx context.Context) (bool, error) { - client := csi.NewControllerClient(c.conn) - req := csi.ControllerGetCapabilitiesRequest{} - - rsp, err := client.ControllerGetCapabilities(ctx, &req) - if err != nil { - return false, err - } - caps := rsp.GetCapabilities() - for _, cap := range caps { - if cap == nil { - continue - } - rpc := cap.GetRpc() - if rpc == nil { - continue - } - if rpc.GetType() == csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS { - return true, nil - } - } - return false, nil -} - -func (c *csiConnection) CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, int64, int64, bool, error) { - klog.V(5).Infof("CSI CreateSnapshot: %s", snapshotName) - if volume.Spec.CSI == nil { - return "", "", 0, 0, false, fmt.Errorf("CSIPersistentVolumeSource not defined in spec") - } - - client := csi.NewControllerClient(c.conn) - - driverName, err := c.GetDriverName(ctx) - if err != nil { - return "", "", 0, 0, false, err - } - - req := csi.CreateSnapshotRequest{ - SourceVolumeId: volume.Spec.CSI.VolumeHandle, - Name: snapshotName, - Parameters: parameters, - Secrets: snapshotterCredentials, - } - - rsp, err := client.CreateSnapshot(ctx, &req) - if err != nil { - return "", "", 0, 0, false, err - } - - klog.V(5).Infof("CSI CreateSnapshot: %s driver name [%s] snapshot ID [%s] time stamp [%d] size [%d] readyToUse [%v]", snapshotName, driverName, rsp.Snapshot.SnapshotId, rsp.Snapshot.CreationTime, rsp.Snapshot.SizeBytes, rsp.Snapshot.ReadyToUse) - creationTime, err := timestampToUnixTime(rsp.Snapshot.CreationTime) - if err != nil { - return "", "", 0, 0, false, err - } - return driverName, rsp.Snapshot.SnapshotId, creationTime, rsp.Snapshot.SizeBytes, rsp.Snapshot.ReadyToUse, nil -} - -func (c *csiConnection) DeleteSnapshot(ctx context.Context, snapshotID string, snapshotterCredentials map[string]string) (err error) { - client := csi.NewControllerClient(c.conn) - - req := csi.DeleteSnapshotRequest{ - SnapshotId: snapshotID, - Secrets: snapshotterCredentials, - } - - if _, err := client.DeleteSnapshot(ctx, &req); err != nil { - return err - } - - return nil -} - -func (c *csiConnection) GetSnapshotStatus(ctx context.Context, snapshotID string) (bool, int64, int64, error) { - client := csi.NewControllerClient(c.conn) - - req := csi.ListSnapshotsRequest{ - SnapshotId: snapshotID, - } - - rsp, err := client.ListSnapshots(ctx, &req) - if err != nil { - return false, 0, 0, err - } - - if rsp.Entries == nil || len(rsp.Entries) == 0 { - return false, 0, 0, fmt.Errorf("can not find snapshot for snapshotID %s", snapshotID) - } - - creationTime, err := timestampToUnixTime(rsp.Entries[0].Snapshot.CreationTime) - if err != nil { - return false, 0, 0, err - } - return rsp.Entries[0].Snapshot.ReadyToUse, creationTime, rsp.Entries[0].Snapshot.SizeBytes, nil -} - -func (c *csiConnection) Close() error { - return c.conn.Close() -} - -func logGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - klog.V(5).Infof("GRPC call: %s", method) - klog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req)) - err := invoker(ctx, method, req, reply, cc, opts...) - klog.V(5).Infof("GRPC response: %s", protosanitizer.StripSecrets(reply)) - klog.V(5).Infof("GRPC error: %v", err) - return err -} - -func timestampToUnixTime(t *timestamp.Timestamp) (int64, error) { - time, err := ptypes.Timestamp(t) - if err != nil { - return -1, err - } - // TODO: clean this up, we probably don't need this translation layer - // and can just use time.Time - return time.UnixNano(), nil -} diff --git a/pkg/controller/csi_handler.go b/pkg/controller/csi_handler.go index 2f38bb015..2abee0978 100644 --- a/pkg/controller/csi_handler.go +++ b/pkg/controller/csi_handler.go @@ -23,6 +23,7 @@ import ( "time" crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1" + "github.com/kubernetes-csi/external-snapshotter/pkg/snapshotter" "k8s.io/api/core/v1" ) @@ -36,7 +37,7 @@ type Handler interface { // csiHandler is a handler that calls CSI to create/delete volume snapshot. type csiHandler struct { - snapshotter Snapshotter + snapshotter snapshotter.Snapshotter timeout time.Duration snapshotNamePrefix string snapshotNameUUIDLength int @@ -44,7 +45,7 @@ type csiHandler struct { // NewCSIHandler returns a handler which includes the csi connection and Snapshot name details func NewCSIHandler( - snapshotter Snapshotter, + snapshotter snapshotter.Snapshotter, timeout time.Duration, snapshotNamePrefix string, snapshotNameUUIDLength int, diff --git a/pkg/controller/snapshot_controller_base.go b/pkg/controller/snapshot_controller_base.go index 7e362624b..5458b1bcf 100644 --- a/pkg/controller/snapshot_controller_base.go +++ b/pkg/controller/snapshot_controller_base.go @@ -24,6 +24,7 @@ import ( clientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned" storageinformers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions/volumesnapshot/v1alpha1" storagelisters "github.com/kubernetes-csi/external-snapshotter/pkg/client/listers/volumesnapshot/v1alpha1" + "github.com/kubernetes-csi/external-snapshotter/pkg/snapshotter" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -81,7 +82,7 @@ func NewCSISnapshotController( pvcInformer coreinformers.PersistentVolumeClaimInformer, createSnapshotContentRetryCount int, createSnapshotContentInterval time.Duration, - snapshotter Snapshotter, + snapshotter snapshotter.Snapshotter, timeout time.Duration, resyncPeriod time.Duration, snapshotNamePrefix string, diff --git a/pkg/controller/snapshotter.go b/pkg/snapshotter/snapshotter.go similarity index 94% rename from pkg/controller/snapshotter.go rename to pkg/snapshotter/snapshotter.go index 3e24df3ee..ee5031dfc 100644 --- a/pkg/controller/snapshotter.go +++ b/pkg/snapshotter/snapshotter.go @@ -14,14 +14,13 @@ See the License for the specific language governing permissions and limitations under the License. */ -package controller +package snapshotter import ( "context" "fmt" "github.com/container-storage-interface/spec/lib/go/csi" - "github.com/golang/glog" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/timestamp" csirpc "github.com/kubernetes-csi/csi-lib-utils/rpc" @@ -29,8 +28,10 @@ import ( "google.golang.org/grpc" "k8s.io/api/core/v1" + "k8s.io/klog" ) +// Snapshotter implements CreateSnapshot/DeleteSnapshot operations against a remote CSI driver. type Snapshotter interface { // CreateSnapshot creates a snapshot for a volume CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (driverName string, snapshotId string, timestamp int64, size int64, readyToUse bool, err error) @@ -53,7 +54,7 @@ func NewSnapshotter(conn *grpc.ClientConn) Snapshotter { } func (s *snapshot) CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, int64, int64, bool, error) { - glog.V(5).Infof("CSI CreateSnapshot: %s", snapshotName) + klog.V(5).Infof("CSI CreateSnapshot: %s", snapshotName) if volume.Spec.CSI == nil { return "", "", 0, 0, false, fmt.Errorf("CSIPersistentVolumeSource not defined in spec") } @@ -77,7 +78,7 @@ func (s *snapshot) CreateSnapshot(ctx context.Context, snapshotName string, volu return "", "", 0, 0, false, err } - glog.V(5).Infof("CSI CreateSnapshot: %s driver name [%s] snapshot ID [%s] time stamp [%d] size [%d] readyToUse [%v]", snapshotName, driverName, rsp.Snapshot.SnapshotId, rsp.Snapshot.CreationTime, rsp.Snapshot.SizeBytes, rsp.Snapshot.ReadyToUse) + klog.V(5).Infof("CSI CreateSnapshot: %s driver name [%s] snapshot ID [%s] time stamp [%d] size [%d] readyToUse [%v]", snapshotName, driverName, rsp.Snapshot.SnapshotId, rsp.Snapshot.CreationTime, rsp.Snapshot.SizeBytes, rsp.Snapshot.ReadyToUse) creationTime, err := timestampToUnixTime(rsp.Snapshot.CreationTime) if err != nil { return "", "", 0, 0, false, err diff --git a/pkg/controller/snapshotter_test.go b/pkg/snapshotter/snapshotter_test.go similarity index 99% rename from pkg/controller/snapshotter_test.go rename to pkg/snapshotter/snapshotter_test.go index 56da8dc76..dbe973d38 100644 --- a/pkg/controller/snapshotter_test.go +++ b/pkg/snapshotter/snapshotter_test.go @@ -1,5 +1,5 @@ /* -Copyright 2019 The Kubernetes Authors. +Copyright 2018 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package controller +package snapshotter import ( "context"