Skip to content
This repository has been archived by the owner on Oct 21, 2020. It is now read-only.

Commit

Permalink
Snapshot: Rework controller
Browse files Browse the repository at this point in the history
This is a cumulative patch changing controller to be conformant with the
proposal mainly with regards to status conditions. Most of the changes
were made by Xing Yang with contributions by Huamin Chen and Jing Xu.
  • Loading branch information
tsmetana committed Sep 7, 2017
1 parent 2a0d51a commit f696e70
Show file tree
Hide file tree
Showing 12 changed files with 920 additions and 186 deletions.
10 changes: 6 additions & 4 deletions snapshot/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,25 @@ A Volume plugin must provide `RegisterPlugin()` to return plugin struct, `GetPlu
import (
"k8s.io/client-go/pkg/api/v1"

crdv1 "github.com/rootfs/snapshot/pkg/apis/crd/v1"
"github.com/rootfs/snapshot/pkg/cloudprovider"
crdv1 "github.com/kubernetes-incubator/external-storage/snapshot/pkg/apis/crd/v1"
"github.com/kubernetes-incubator/external-storage/snapshot/pkg/cloudprovider"
)

type VolumePlugin interface {
// Init inits volume plugin
Init(cloudprovider.Interface)
// SnapshotCreate creates a VolumeSnapshot from a PersistentVolumeSpec
SnapshotCreate(*v1.PersistentVolume) (*crdv1.VolumeSnapshotDataSource, error)
SnapshotCreate(*v1.PersistentVolume, *map[string]string) (*crdv1.VolumeSnapshotDataSource, *[]crdv1.VolumeSnapshotCondition, error)
// SnapshotDelete deletes a VolumeSnapshot
// PersistentVolume is provided for volume types, if any, that need PV Spec to delete snapshot
SnapshotDelete(*crdv1.VolumeSnapshotDataSource, *v1.PersistentVolume) error
// SnapshotRestore restores (promotes) a volume snapshot into a volume
SnapshotRestore(*crdv1.VolumeSnapshotData, *v1.PersistentVolumeClaim, string, map[string]string) (*v1.PersistentVolumeSource, map[string]string, error)
// Describe volume snapshot status.
// return true if the snapshot is ready
DescribeSnapshot(snapshotData *crdv1.VolumeSnapshotData) (isCompleted bool, err error)
DescribeSnapshot(snapshotData *crdv1.VolumeSnapshotData) (snapConditions *[]crdv1.VolumeSnapshotCondition, isCompleted bool, err error)
// FindSnapshot finds a VolumeSnapshot by matching metadata
FindSnapshot(tags *map[string]string) (*crdv1.VolumeSnapshotDataSource, *[]crdv1.VolumeSnapshotCondition, error)
// VolumeDelete deletes a PV
// TODO in the future pass kubernetes client for certain volumes (e.g. rbd) so they can access storage class to retrieve secret
VolumeDelete(pv *v1.PersistentVolume) error
Expand Down
12 changes: 10 additions & 2 deletions snapshot/pkg/apis/crd/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,24 @@ type VolumeSnapshotStatus struct {
// +optional
CreationTimestamp metav1.Time `json:"creationTimestamp" protobuf:"bytes,1,opt,name=creationTimestamp"`

// Representes the lates available observations about the volume snapshot
// Representes the latest available observations about the volume snapshot
Conditions []VolumeSnapshotCondition `json:"conditions" protobuf:"bytes,2,rep,name=conditions"`
}

type VolumeSnapshotConditionType string

// These are valid conditions of a volume snapshot.
const (
// VolumeSnapshotReady is added when the snapshot has been successfully created and is ready to be used.
// VolumeSnapshotConditionPending means the snapshot is cut and the application
// can resume accessing data if core_v1.ConditionStatus is True. It corresponds
// to "Uploading" in GCE PD or "Pending" in AWS and core_v1.ConditionStatus is True.
// It also corresponds to "Creating" in OpenStack Cinder and core_v1.ConditionStatus
// is Unknown.
VolumeSnapshotConditionPending VolumeSnapshotConditionType = "Pending"
// VolumeSnapshotConditionReady is added when the snapshot has been successfully created and is ready to be used.
VolumeSnapshotConditionReady VolumeSnapshotConditionType = "Ready"
// VolumeSnapshotConditionError means an error occurred during snapshot creation.
VolumeSnapshotConditionError VolumeSnapshotConditionType = "Error"
)

// VolumeSnapshot Condition describes the state of a volume snapshot at a certain point.
Expand Down
70 changes: 57 additions & 13 deletions snapshot/pkg/cloudprovider/providers/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ type VolumeOptions struct {
// VolumeOptions specifies volume snapshot options.
type SnapshotOptions struct {
VolumeId string
Tags *map[string]string
}

// Volumes is an interface for managing cloud-provisioned volumes
Expand Down Expand Up @@ -348,14 +349,17 @@ type Volumes interface {
DisksAreAttached(map[types.NodeName][]KubernetesVolumeID) (map[types.NodeName]map[KubernetesVolumeID]bool, error)

// Create an EBS volume snapshot
CreateSnapshot(snapshotOptions *SnapshotOptions) (snapshotId string, err error)
CreateSnapshot(snapshotOptions *SnapshotOptions) (snapshotId string, status string, err error)

// Delete an EBS volume snapshot
DeleteSnapshot(snapshotId string) (bool, error)

// Describe an EBS volume snapshot status for create or delete.
// return status (completed or pending or error), and error
DescribeSnapshot(snapshotId string) (isCompleted bool, err error)
DescribeSnapshot(snapshotId string) (status string, isCompleted bool, err error)

// Find snapshot by tags
FindSnapshot(tags map[string]string) ([]string, []string, error)
}

// InstanceGroups is an interface for managing cloud-managed instance groups / autoscaling instance groups
Expand Down Expand Up @@ -1904,20 +1908,31 @@ func (c *Cloud) DisksAreAttached(nodeDisks map[types.NodeName][]KubernetesVolume
}

// CreateSnapshot creates an EBS volume snapshot
func (c *Cloud) CreateSnapshot(snapshotOptions *SnapshotOptions) (snapshotId string, err error) {
func (c *Cloud) CreateSnapshot(snapshotOptions *SnapshotOptions) (snapshotId string, status string, err error) {
request := &ec2.CreateSnapshotInput{}
request.VolumeId = aws.String(snapshotOptions.VolumeId)
request.DryRun = aws.Bool(false)
descriptions := "Created by Kubernetes for volume " + snapshotOptions.VolumeId
request.Description = aws.String(descriptions)
res, err := c.ec2.CreateSnapshot(request)
if err != nil {
return "", err
return "", "", err
}
if res == nil {
return "", fmt.Errorf("nil CreateSnapshotResponse")
return "", "", fmt.Errorf("nil CreateSnapshotResponse")
}
if snapshotOptions.Tags != nil {
awsID := awsVolumeID(aws.StringValue(res.SnapshotId))
// apply tags
if err := c.tagging.createTags(c.ec2, string(awsID), ResourceLifecycleOwned, *snapshotOptions.Tags); err != nil {
_, delerr := c.DeleteSnapshot(*res.SnapshotId)
if delerr != nil {
return "", "", fmt.Errorf("error tagging snapshot %s, could not delete the snapshot: %v", *res.SnapshotId, delerr)
}
return "", "", fmt.Errorf("error tagging snapshot %s: %v", *res.SnapshotId, err)
}
}
return *res.SnapshotId, nil
return *res.SnapshotId, *res.State, nil

}

Expand All @@ -1934,29 +1949,58 @@ func (c *Cloud) DeleteSnapshot(snapshotId string) (bool, error) {
}

// DescribeSnapshot returns the status of the snapshot
func (c *Cloud) DescribeSnapshot(snapshotId string) (isCompleted bool, err error) {
func (c *Cloud) DescribeSnapshot(snapshotId string) (status string, isCompleted bool, err error) {
request := &ec2.DescribeSnapshotsInput{
SnapshotIds: []*string{
aws.String(snapshotId),
},
}
result, err := c.ec2.DescribeSnapshots(request)
if err != nil {
return false, err
return "", false, err
}
if len(result) != 1 {
return false, fmt.Errorf("wrong result from DescribeSnapshots: %#v", result)
return "", false, fmt.Errorf("wrong result from DescribeSnapshots: %#v", result)
}
if result[0].State == nil {
return false, fmt.Errorf("missing state from DescribeSnapshots: %#v", result)
return "", false, fmt.Errorf("missing state from DescribeSnapshots: %#v", result)
}
if *result[0].State == ec2.SnapshotStateCompleted {
return true, nil
return *result[0].State, true, nil
}
if *result[0].State == ec2.SnapshotStateError {
return false, fmt.Errorf("snapshot state is error: %s", *result[0].StateMessage)
return *result[0].State, false, fmt.Errorf("snapshot state is error: %s", *result[0].StateMessage)
}
if *result[0].State == ec2.SnapshotStatePending {
return *result[0].State, false, nil
}
return *result[0].State, false, fmt.Errorf("unknown state")
}

// FindSnapshot returns the found snapshot
func (c *Cloud) FindSnapshot(tags map[string]string) ([]string, []string, error) {
request := &ec2.DescribeSnapshotsInput{}
for k, v := range tags {
filter := &ec2.Filter{}
filter.SetName(k)
filter.SetValues([]*string{&v})

request.Filters = append(request.Filters, filter)
}

result, err := c.ec2.DescribeSnapshots(request)
if err != nil {
return nil, nil, err
}
var snapshotIDs, statuses []string
for _, snapshot := range result {
id := *snapshot.SnapshotId
status := *snapshot.State
glog.Infof("found %s, status %s", id, status)
snapshotIDs = append(snapshotIDs, id)
statuses = append(statuses, status)
}
return false, fmt.Errorf(*result[0].StateMessage)
return snapshotIDs, statuses, nil
}

// Gets the current load balancer state
Expand Down
22 changes: 16 additions & 6 deletions snapshot/pkg/cloudprovider/providers/gce/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,10 @@ type Disks interface {

// Describe a GCE PD volume snapshot status for create or delete.
// return status (completed or pending or error), and error
DescribeSnapshot(snapshotToGet string) (isCompleted bool, err error)
DescribeSnapshot(snapshotToGet string) (status string, isCompleted bool, err error)

// Find snapshot by tags
FindSnapshot(tags map[string]string) ([]string, []string, error)

// GetAutoLabelsForPD returns labels to apply to PersistentVolume
// representing this PD, namely failure domain and zone.
Expand Down Expand Up @@ -2843,19 +2846,25 @@ func (gce *GCECloud) CreateDiskFromSnapshot(snapshot string,
return err
}

func (gce *GCECloud) DescribeSnapshot(snapshotToGet string) (isCompleted bool, err error) {
func (gce *GCECloud) DescribeSnapshot(snapshotToGet string) (status string, isCompleted bool, err error) {
snapshot, err := gce.getSnapshotByName(snapshotToGet)
if err != nil {
return false, err
return "", false, err
}
//no snapshot is found
if snapshot == nil {
return false, fmt.Errorf("snapshot %s is found", snapshotToGet)
return "", false, fmt.Errorf("snapshot %s is found", snapshotToGet)
}
if snapshot.Status == "READY" {
return true, nil
return snapshot.Status, true, nil
}
return false, nil
return snapshot.Status, false, nil
}

// FindSnapshot returns the found snapshots
func (gce *GCECloud) FindSnapshot(tags map[string]string) ([]string, []string, error) {
var snapshotIDs, statuses []string
return snapshotIDs, statuses, nil
}

func (gce *GCECloud) DeleteSnapshot(snapshotToDelete string) error {
Expand Down Expand Up @@ -2892,6 +2901,7 @@ func (gce *GCECloud) getSnapshotByName(snapshotName string) (*gceSnapshot, error
return nil, nil
}

// TODO: CreateSnapshot should return snapshot status
func (gce *GCECloud) CreateSnapshot(diskName string, zone string, snapshotName string, tags map[string]string) error {
isManaged := false
for _, managedZone := range gce.managedZones {
Expand Down
Loading

0 comments on commit f696e70

Please sign in to comment.