Skip to content

Commit

Permalink
Use CSI driver to determine unique name for migrated in-tree plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiawei0227 committed May 8, 2021
1 parent 1a608e1 commit 17859c8
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 5 deletions.
17 changes: 17 additions & 0 deletions pkg/controller/volume/attachdetach/attach_detach_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"k8s.io/kubernetes/pkg/features"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi"
"k8s.io/kubernetes/pkg/volume/csimigration"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
Expand Down Expand Up @@ -725,6 +726,22 @@ func (adc *attachDetachController) processVolumeAttachments() error {
err)
continue
}
pluginName := plugin.GetPluginName()
if adc.csiMigratedPluginManager.IsMigrationEnabledForPlugin(pluginName) {
plugin, _ = adc.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
// podNamespace is not needed here for Azurefile as the volumeName generated will be the same with or without podNamespace
volumeSpec, err = csimigration.TranslateInTreeSpecToCSI(volumeSpec, "" /* podNamespace */, adc.intreeToCSITranslator)
if err != nil {
klog.Errorf(
"Failed to translate intree volumeSpec to CSI volumeSpec for volume:%q, va.Name:%q, nodeName:%q: %v",
*pvName,
va.Name,
nodeName,
pluginName,
err)
continue
}
}
volumeName, err := volumeutil.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
if err != nil {
klog.Errorf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,23 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
kcache "k8s.io/client-go/tools/cache"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi"
"k8s.io/kubernetes/pkg/volume/gcepd"
"k8s.io/kubernetes/pkg/volume/util"
)

const (
intreePDUniqueNamePrefix = "kubernetes.io/gce-pd/"
csiPDUniqueNamePrefix = "kubernetes.io/csi/pd.csi.storage.gke.io^projects/UNSPECIFIED/zones/UNSPECIFIED/disks/"
)

func Test_NewAttachDetachController_Positive(t *testing.T) {
Expand Down Expand Up @@ -348,6 +359,7 @@ type vaTest struct {
vaName string
vaNodeName string
vaAttachStatus bool
csiMigration bool
expected_attaches map[string][]string
expected_detaches map[string][]string
}
Expand Down Expand Up @@ -386,6 +398,16 @@ func Test_ADC_VolumeAttachmentRecovery(t *testing.T) {
expected_attaches: map[string][]string{},
expected_detaches: map[string][]string{"mynode-1": {"vol1"}},
},
{
testName: "CSI Migration",
volName: "vol1",
podNodeName: "mynode-1",
pvName: "pv1",
vaName: "va1",
vaNodeName: "mynode-1",
vaAttachStatus: false,
csiMigration: true,
},
} {
t.Run(tc.testName, func(t *testing.T) {
volumeAttachmentRecoveryTestCase(t, tc)
Expand All @@ -396,7 +418,17 @@ func Test_ADC_VolumeAttachmentRecovery(t *testing.T) {
func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
fakeKubeClient := controllervolumetesting.CreateTestClient()
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, time.Second*1)
plugins := controllervolumetesting.CreateTestPlugin()
var plugins []volume.VolumePlugin
if tc.csiMigration {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, tc.csiMigration)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationGCE, tc.csiMigration)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InTreePluginGCEUnregister, tc.csiMigration)()

plugins = gcepd.ProbeVolumePlugins()
plugins = append(plugins, csi.ProbeVolumePlugins()...)
} else {
plugins = controllervolumetesting.CreateTestPlugin()
}
nodeInformer := informerFactory.Core().V1().Nodes().Informer()
podInformer := informerFactory.Core().V1().Pods().Informer()
pvInformer := informerFactory.Core().V1().PersistentVolumes().Informer()
Expand Down Expand Up @@ -443,6 +475,32 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
nodeInformer.GetIndexer().Add(&nodeToAdd)
}

if tc.csiMigration {
newNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: tc.podNodeName,
Labels: map[string]string{
"name": tc.podNodeName,
},
Annotations: map[string]string{
util.ControllerManagedAttachAnnotation: "true",
},
},
Status: v1.NodeStatus{
VolumesAttached: []v1.AttachedVolume{
{
Name: v1.UniqueVolumeName(csiPDUniqueNamePrefix + tc.volName),
DevicePath: "fake/path",
},
},
},
}
_, err = adc.kubeClient.CoreV1().Nodes().Update(context.TODO(), newNode, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Run failed with error. Failed to create a new pod: <%v>", err)
}
nodeInformer.GetIndexer().Add(&newNode)
}
// Create and add objects requested by the test
if tc.podName != "" {
newPod := controllervolumetesting.NewPodWithVolume(tc.podName, tc.volName, tc.podNodeName)
Expand All @@ -461,7 +519,7 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
pvInformer.GetIndexer().Add(newPv)
}
if tc.vaName != "" {
newVa := controllervolumetesting.NewVolumeAttachment("va1", "pv1", "mynode-1", false)
newVa := controllervolumetesting.NewVolumeAttachment(tc.vaName, tc.pvName, tc.vaNodeName, tc.vaAttachStatus)
_, err = adc.kubeClient.StorageV1().VolumeAttachments().Create(context.TODO(), newVa, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Run failed with error. Failed to create a new volumeAttachment: <%v>", err)
Expand Down Expand Up @@ -497,8 +555,34 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
go adc.desiredStateOfWorldPopulator.Run(stopCh)
defer close(stopCh)

// Verify if expected attaches and detaches have happened
testPlugin := plugins[0].(*controllervolumetesting.TestPlugin)
if tc.csiMigration {
verifyExpectedVolumeState(t, adc, tc)
} else {
// Verify if expected attaches and detaches have happened
testPlugin := plugins[0].(*controllervolumetesting.TestPlugin)
verifyAttachDetachCalls(t, testPlugin, tc)
}

}

func verifyExpectedVolumeState(t *testing.T, adc *attachDetachController, tc vaTest) {
// Since csi migration is turned on, the attach state for the PV should be in CSI format.
attachedState := adc.actualStateOfWorld.GetAttachState(
v1.UniqueVolumeName(csiPDUniqueNamePrefix+tc.volName), types.NodeName(tc.vaNodeName))
if attachedState != cache.AttachStateAttached {
t.Fatalf("Expected attachedState %v, but it is %v", cache.AttachStateAttached, attachedState)
}

// kubernetes.io/gce-pd/<volName> should not be marked when CSI Migration is on
// so it should be in detach status
attachedState = adc.actualStateOfWorld.GetAttachState(
v1.UniqueVolumeName(intreePDUniqueNamePrefix+tc.volName), types.NodeName(tc.vaNodeName))
if attachedState != cache.AttachStateDetached {
t.Fatalf("Expected attachedState not to be %v, but it is %v", cache.AttachStateDetached, attachedState)
}
}

func verifyAttachDetachCalls(t *testing.T, testPlugin *controllervolumetesting.TestPlugin, tc vaTest) {
for tries := 0; tries <= 10; tries++ { // wait & try few times before failing the test
expected_op_map := tc.expected_attaches
plugin_map := testPlugin.GetAttachedVolumes()
Expand Down Expand Up @@ -543,5 +627,4 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) {
if testPlugin.GetErrorEncountered() {
t.Fatalf("Fatal error encountered in the testing volume plugin")
}

}
10 changes: 10 additions & 0 deletions pkg/controller/volume/attachdetach/testing/testvolumespec.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,16 @@ func CreateTestClient() *fake.Clientset {
}
attachVolumeToNode("lostVolumeName", nodeName)
}
fakeClient.AddReactor("update", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
updateAction := action.(core.UpdateAction)
node := updateAction.GetObject().(*v1.Node)
for index, n := range nodes.Items {
if n.Name == node.Name {
nodes.Items[index] = *node
}
}
return true, updateAction.GetObject(), nil
})
fakeClient.AddReactor("list", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
obj := &v1.NodeList{}
obj.Items = append(obj.Items, nodes.Items...)
Expand Down

0 comments on commit 17859c8

Please sign in to comment.