diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index e3545e3602c6d..15bb523cda7cf 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -57,6 +57,7 @@ import ( "k8s.io/kubernetes/pkg/features" proxyutil "k8s.io/kubernetes/pkg/proxy/util" "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/csi" "k8s.io/kubernetes/pkg/volume/csimigration" volumeutil "k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" @@ -725,6 +726,22 @@ func (adc *attachDetachController) processVolumeAttachments() error { err) continue } + pluginName := plugin.GetPluginName() + if adc.csiMigratedPluginManager.IsMigrationEnabledForPlugin(pluginName) { + plugin, _ = adc.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName) + // podNamespace is not needed here for Azurefile as the volumeName generated will be the same with or without podNamespace + volumeSpec, err = csimigration.TranslateInTreeSpecToCSI(volumeSpec, "" /* podNamespace */, adc.intreeToCSITranslator) + if err != nil { + klog.Errorf( + "Failed to translate intree volumeSpec to CSI volumeSpec for volume:%q, va.Name:%q, nodeName:%q: %v", + *pvName, + va.Name, + nodeName, + pluginName, + err) + continue + } + } volumeName, err := volumeutil.GetUniqueVolumeNameFromSpec(plugin, volumeSpec) if err != nil { klog.Errorf( diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go index 36845402e8f3d..18d5359485e03 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller_test.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller_test.go @@ -26,12 +26,23 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" kcache "k8s.io/client-go/tools/cache" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" + "k8s.io/kubernetes/pkg/volume/csi" + "k8s.io/kubernetes/pkg/volume/gcepd" + "k8s.io/kubernetes/pkg/volume/util" +) + +const ( + intreePDUniqueNamePrefix = "kubernetes.io/gce-pd/" + csiPDUniqueNamePrefix = "kubernetes.io/csi/pd.csi.storage.gke.io^projects/UNSPECIFIED/zones/UNSPECIFIED/disks/" ) func Test_NewAttachDetachController_Positive(t *testing.T) { @@ -348,6 +359,7 @@ type vaTest struct { vaName string vaNodeName string vaAttachStatus bool + csiMigration bool expected_attaches map[string][]string expected_detaches map[string][]string } @@ -386,6 +398,16 @@ func Test_ADC_VolumeAttachmentRecovery(t *testing.T) { expected_attaches: map[string][]string{}, expected_detaches: map[string][]string{"mynode-1": {"vol1"}}, }, + { + testName: "CSI Migration", + volName: "vol1", + podNodeName: "mynode-1", + pvName: "pv1", + vaName: "va1", + vaNodeName: "mynode-1", + vaAttachStatus: false, + csiMigration: true, + }, } { t.Run(tc.testName, func(t *testing.T) { volumeAttachmentRecoveryTestCase(t, tc) @@ -396,7 +418,17 @@ func Test_ADC_VolumeAttachmentRecovery(t *testing.T) { func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) { fakeKubeClient := controllervolumetesting.CreateTestClient() informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, time.Second*1) - plugins := controllervolumetesting.CreateTestPlugin() + var plugins []volume.VolumePlugin + if tc.csiMigration { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigration, tc.csiMigration)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIMigrationGCE, tc.csiMigration)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InTreePluginGCEUnregister, tc.csiMigration)() + + plugins = gcepd.ProbeVolumePlugins() + plugins = append(plugins, csi.ProbeVolumePlugins()...) + } else { + plugins = controllervolumetesting.CreateTestPlugin() + } nodeInformer := informerFactory.Core().V1().Nodes().Informer() podInformer := informerFactory.Core().V1().Pods().Informer() pvInformer := informerFactory.Core().V1().PersistentVolumes().Informer() @@ -443,6 +475,32 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) { nodeInformer.GetIndexer().Add(&nodeToAdd) } + if tc.csiMigration { + newNode := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: tc.podNodeName, + Labels: map[string]string{ + "name": tc.podNodeName, + }, + Annotations: map[string]string{ + util.ControllerManagedAttachAnnotation: "true", + }, + }, + Status: v1.NodeStatus{ + VolumesAttached: []v1.AttachedVolume{ + { + Name: v1.UniqueVolumeName(csiPDUniqueNamePrefix + tc.volName), + DevicePath: "fake/path", + }, + }, + }, + } + _, err = adc.kubeClient.CoreV1().Nodes().Update(context.TODO(), newNode, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Run failed with error. Failed to create a new pod: <%v>", err) + } + nodeInformer.GetIndexer().Add(&newNode) + } // Create and add objects requested by the test if tc.podName != "" { newPod := controllervolumetesting.NewPodWithVolume(tc.podName, tc.volName, tc.podNodeName) @@ -461,7 +519,7 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) { pvInformer.GetIndexer().Add(newPv) } if tc.vaName != "" { - newVa := controllervolumetesting.NewVolumeAttachment("va1", "pv1", "mynode-1", false) + newVa := controllervolumetesting.NewVolumeAttachment(tc.vaName, tc.pvName, tc.vaNodeName, tc.vaAttachStatus) _, err = adc.kubeClient.StorageV1().VolumeAttachments().Create(context.TODO(), newVa, metav1.CreateOptions{}) if err != nil { t.Fatalf("Run failed with error. Failed to create a new volumeAttachment: <%v>", err) @@ -497,8 +555,34 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) { go adc.desiredStateOfWorldPopulator.Run(stopCh) defer close(stopCh) - // Verify if expected attaches and detaches have happened - testPlugin := plugins[0].(*controllervolumetesting.TestPlugin) + if tc.csiMigration { + verifyExpectedVolumeState(t, adc, tc) + } else { + // Verify if expected attaches and detaches have happened + testPlugin := plugins[0].(*controllervolumetesting.TestPlugin) + verifyAttachDetachCalls(t, testPlugin, tc) + } + +} + +func verifyExpectedVolumeState(t *testing.T, adc *attachDetachController, tc vaTest) { + // Since csi migration is turned on, the attach state for the PV should be in CSI format. + attachedState := adc.actualStateOfWorld.GetAttachState( + v1.UniqueVolumeName(csiPDUniqueNamePrefix+tc.volName), types.NodeName(tc.vaNodeName)) + if attachedState != cache.AttachStateAttached { + t.Fatalf("Expected attachedState %v, but it is %v", cache.AttachStateAttached, attachedState) + } + + // kubernetes.io/gce-pd/ should not be marked when CSI Migration is on + // so it should be in detach status + attachedState = adc.actualStateOfWorld.GetAttachState( + v1.UniqueVolumeName(intreePDUniqueNamePrefix+tc.volName), types.NodeName(tc.vaNodeName)) + if attachedState != cache.AttachStateDetached { + t.Fatalf("Expected attachedState not to be %v, but it is %v", cache.AttachStateDetached, attachedState) + } +} + +func verifyAttachDetachCalls(t *testing.T, testPlugin *controllervolumetesting.TestPlugin, tc vaTest) { for tries := 0; tries <= 10; tries++ { // wait & try few times before failing the test expected_op_map := tc.expected_attaches plugin_map := testPlugin.GetAttachedVolumes() @@ -543,5 +627,4 @@ func volumeAttachmentRecoveryTestCase(t *testing.T, tc vaTest) { if testPlugin.GetErrorEncountered() { t.Fatalf("Fatal error encountered in the testing volume plugin") } - } diff --git a/pkg/controller/volume/attachdetach/testing/testvolumespec.go b/pkg/controller/volume/attachdetach/testing/testvolumespec.go index 1a176f74fe7a1..3fd8ce9705773 100644 --- a/pkg/controller/volume/attachdetach/testing/testvolumespec.go +++ b/pkg/controller/volume/attachdetach/testing/testvolumespec.go @@ -158,6 +158,16 @@ func CreateTestClient() *fake.Clientset { } attachVolumeToNode("lostVolumeName", nodeName) } + fakeClient.AddReactor("update", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) { + updateAction := action.(core.UpdateAction) + node := updateAction.GetObject().(*v1.Node) + for index, n := range nodes.Items { + if n.Name == node.Name { + nodes.Items[index] = *node + } + } + return true, updateAction.GetObject(), nil + }) fakeClient.AddReactor("list", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) { obj := &v1.NodeList{} obj.Items = append(obj.Items, nodes.Items...)