diff --git a/pkg/kubelet/volumemanager/cache/BUILD b/pkg/kubelet/volumemanager/cache/BUILD index 76a34e9f614f7..cfe44cf64a9da 100644 --- a/pkg/kubelet/volumemanager/cache/BUILD +++ b/pkg/kubelet/volumemanager/cache/BUILD @@ -40,6 +40,7 @@ go_test( "//pkg/volume:go_default_library", "//pkg/volume/testing:go_default_library", "//pkg/volume/util:go_default_library", + "//pkg/volume/util/operationexecutor:go_default_library", "//pkg/volume/util/types:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", diff --git a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go index 08fe393e5a2a2..c5d84ee3cde1d 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world.go @@ -24,7 +24,7 @@ import ( "fmt" "sync" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog" @@ -59,7 +59,7 @@ type ActualStateOfWorld interface { // volume, reset the pod's remountRequired value. // If a volume with the name volumeName does not exist in the list of // attached volumes, an error is returned. - AddPodToVolume(podName volumetypes.UniquePodName, podUID types.UID, volumeName v1.UniqueVolumeName, mounter volume.Mounter, blockVolumeMapper volume.BlockVolumeMapper, outerVolumeSpecName string, volumeGidValue string, volumeSpec *volume.Spec) error + AddPodToVolume(operationexecutor.MarkVolumeOpts) error // MarkRemountRequired marks each volume that is successfully attached and // mounted for the specified pod as requiring remount (if the plugin for the @@ -68,13 +68,13 @@ type ActualStateOfWorld interface { // pod update. MarkRemountRequired(podName volumetypes.UniquePodName) - // SetVolumeGloballyMounted sets the GloballyMounted value for the given - // volume. 
When set to true this value indicates that the volume is mounted - // to the underlying device at a global mount point. This global mount point - // must unmounted prior to detach. + // SetDeviceMountState sets device mount state for the given volume. When deviceMountState is set to DeviceGloballyMounted + // then device is mounted at a global mount point. When it is set to DeviceMountUncertain then also it means volume + // MAY be globally mounted at a global mount point. In both cases - the volume must be unmounted from + // global mount point prior to detach. // If a volume with the name volumeName does not exist in the list of // attached volumes, an error is returned. - SetVolumeGloballyMounted(volumeName v1.UniqueVolumeName, globallyMounted bool, devicePath, deviceMountPath string) error + SetDeviceMountState(volumeName v1.UniqueVolumeName, deviceMountState operationexecutor.DeviceMountState, devicePath, deviceMountPath string) error // DeletePodFromVolume removes the given pod from the given volume in the // cache indicating the volume has been successfully unmounted from the pod. @@ -127,6 +127,10 @@ type ActualStateOfWorld interface { // actual state of the world. GetMountedVolumes() []MountedVolume + // GetAllMountedVolumes returns list of all possibly mounted volumes including + // those that are in VolumeMounted state and VolumeMountUncertain state. + GetAllMountedVolumes() []MountedVolume + // GetMountedVolumesForPod generates and returns a list of volumes that are // successfully attached and mounted for the specified pod based on the // current actual state of the world. @@ -165,10 +169,15 @@ type MountedVolume struct { type AttachedVolume struct { operationexecutor.AttachedVolume - // GloballyMounted indicates that the volume is mounted to the underlying - // device at a global mount point. This global mount point must unmounted - // prior to detach. 
- GloballyMounted bool + // DeviceMountState indicates if device has been globally mounted or is not. + DeviceMountState operationexecutor.DeviceMountState +} + +// DeviceMayBeMounted returns true if device is mounted in global path or is in +// uncertain state. +func (av AttachedVolume) DeviceMayBeMounted() bool { + return av.DeviceMountState == operationexecutor.DeviceGloballyMounted || + av.DeviceMountState == operationexecutor.DeviceMountUncertain } // NewActualStateOfWorld returns a new instance of ActualStateOfWorld. @@ -245,10 +254,9 @@ type attachedVolume struct { // this volume implements the volume.Attacher interface pluginIsAttachable bool - // globallyMounted indicates that the volume is mounted to the underlying - // device at a global mount point. This global mount point must be unmounted - // prior to detach. - globallyMounted bool + // deviceMountState stores information that tells us if device is mounted + // globally or not + deviceMountState operationexecutor.DeviceMountState // devicePath contains the path on the node where the volume is attached for // attachable volumes @@ -301,6 +309,11 @@ type mountedPod struct { // fsResizeRequired indicates the underlying volume has been successfully // mounted to this pod but its size has been expanded after that. fsResizeRequired bool + + // volumeMountStateForPod stores state of volume mount for the pod. 
if it is: + // - VolumeMounted: means volume for pod has been successfully mounted + // - VolumeMountUncertain: means volume for pod may not be mounted, but it must be unmounted + volumeMountStateForPod operationexecutor.VolumeMountState } func (asw *actualStateOfWorld) MarkVolumeAsAttached( @@ -318,24 +331,8 @@ func (asw *actualStateOfWorld) MarkVolumeAsDetached( asw.DeleteVolume(volumeName) } -func (asw *actualStateOfWorld) MarkVolumeAsMounted( - podName volumetypes.UniquePodName, - podUID types.UID, - volumeName v1.UniqueVolumeName, - mounter volume.Mounter, - blockVolumeMapper volume.BlockVolumeMapper, - outerVolumeSpecName string, - volumeGidValue string, - volumeSpec *volume.Spec) error { - return asw.AddPodToVolume( - podName, - podUID, - volumeName, - mounter, - blockVolumeMapper, - outerVolumeSpecName, - volumeGidValue, - volumeSpec) +func (asw *actualStateOfWorld) MarkVolumeAsMounted(markVolumeOpts operationexecutor.MarkVolumeOpts) error { + return asw.AddPodToVolume(markVolumeOpts) } func (asw *actualStateOfWorld) AddVolumeToReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName) { @@ -354,12 +351,50 @@ func (asw *actualStateOfWorld) MarkVolumeAsUnmounted( func (asw *actualStateOfWorld) MarkDeviceAsMounted( volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error { - return asw.SetVolumeGloballyMounted(volumeName, true /* globallyMounted */, devicePath, deviceMountPath) + return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceGloballyMounted, devicePath, deviceMountPath) +} + +func (asw *actualStateOfWorld) MarkDeviceAsUncertain( + volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error { + return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceMountUncertain, devicePath, deviceMountPath) +} + +func (asw *actualStateOfWorld) MarkVolumeMountAsUncertain(markVolumeOpts operationexecutor.MarkVolumeOpts) error { + markVolumeOpts.VolumeMountState = 
operationexecutor.VolumeMountUncertain + return asw.AddPodToVolume(markVolumeOpts) } func (asw *actualStateOfWorld) MarkDeviceAsUnmounted( volumeName v1.UniqueVolumeName) error { - return asw.SetVolumeGloballyMounted(volumeName, false /* globallyMounted */, "", "") + return asw.SetDeviceMountState(volumeName, operationexecutor.DeviceNotMounted, "", "") +} + +func (asw *actualStateOfWorld) GetDeviceMountState(volumeName v1.UniqueVolumeName) operationexecutor.DeviceMountState { + asw.RLock() + defer asw.RUnlock() + + volumeObj, volumeExists := asw.attachedVolumes[volumeName] + if !volumeExists { + return operationexecutor.DeviceNotMounted + } + + return volumeObj.deviceMountState +} + +func (asw *actualStateOfWorld) GetVolumeMountState(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) operationexecutor.VolumeMountState { + asw.RLock() + defer asw.RUnlock() + + volumeObj, volumeExists := asw.attachedVolumes[volumeName] + if !volumeExists { + return operationexecutor.VolumeNotMounted + } + + podObj, podExists := volumeObj.mountedPods[podName] + if !podExists { + return operationexecutor.VolumeNotMounted + } + return podObj.volumeMountStateForPod } // addVolume adds the given volume to the cache indicating the specified @@ -405,7 +440,7 @@ func (asw *actualStateOfWorld) addVolume( mountedPods: make(map[volumetypes.UniquePodName]mountedPod), pluginName: volumePlugin.GetPluginName(), pluginIsAttachable: pluginIsAttachable, - globallyMounted: false, + deviceMountState: operationexecutor.DeviceNotMounted, devicePath: devicePath, } } else { @@ -420,15 +455,15 @@ func (asw *actualStateOfWorld) addVolume( return nil } -func (asw *actualStateOfWorld) AddPodToVolume( - podName volumetypes.UniquePodName, - podUID types.UID, - volumeName v1.UniqueVolumeName, - mounter volume.Mounter, - blockVolumeMapper volume.BlockVolumeMapper, - outerVolumeSpecName string, - volumeGidValue string, - volumeSpec *volume.Spec) error { +func (asw *actualStateOfWorld) 
AddPodToVolume(markVolumeOpts operationexecutor.MarkVolumeOpts) error { + podName := markVolumeOpts.PodName + podUID := markVolumeOpts.PodUID + volumeName := markVolumeOpts.VolumeName + mounter := markVolumeOpts.Mounter + blockVolumeMapper := markVolumeOpts.BlockVolumeMapper + outerVolumeSpecName := markVolumeOpts.OuterVolumeSpecName + volumeGidValue := markVolumeOpts.VolumeGidVolume + volumeSpec := markVolumeOpts.VolumeSpec asw.Lock() defer asw.Unlock() @@ -442,20 +477,21 @@ func (asw *actualStateOfWorld) AddPodToVolume( podObj, podExists := volumeObj.mountedPods[podName] if !podExists { podObj = mountedPod{ - podName: podName, - podUID: podUID, - mounter: mounter, - blockVolumeMapper: blockVolumeMapper, - outerVolumeSpecName: outerVolumeSpecName, - volumeGidValue: volumeGidValue, - volumeSpec: volumeSpec, + podName: podName, + podUID: podUID, + mounter: mounter, + blockVolumeMapper: blockVolumeMapper, + outerVolumeSpecName: outerVolumeSpecName, + volumeGidValue: volumeGidValue, + volumeSpec: volumeSpec, + volumeMountStateForPod: markVolumeOpts.VolumeMountState, } } // If pod exists, reset remountRequired value podObj.remountRequired = false + podObj.volumeMountStateForPod = markVolumeOpts.VolumeMountState asw.attachedVolumes[volumeName].mountedPods[podName] = podObj - return nil } @@ -554,8 +590,8 @@ func (asw *actualStateOfWorld) MarkFSResizeRequired( } } -func (asw *actualStateOfWorld) SetVolumeGloballyMounted( - volumeName v1.UniqueVolumeName, globallyMounted bool, devicePath, deviceMountPath string) error { +func (asw *actualStateOfWorld) SetDeviceMountState( + volumeName v1.UniqueVolumeName, deviceMountState operationexecutor.DeviceMountState, devicePath, deviceMountPath string) error { asw.Lock() defer asw.Unlock() @@ -566,7 +602,7 @@ func (asw *actualStateOfWorld) SetVolumeGloballyMounted( volumeName) } - volumeObj.globallyMounted = globallyMounted + volumeObj.deviceMountState = deviceMountState volumeObj.deviceMountPath = deviceMountPath if devicePath != 
"" { volumeObj.devicePath = devicePath @@ -628,6 +664,10 @@ func (asw *actualStateOfWorld) PodExistsInVolume( podObj, podExists := volumeObj.mountedPods[podName] if podExists { + // if volume mount was uncertain we should keep trying to mount the volume + if podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain { + return false, volumeObj.devicePath, nil + } if podObj.remountRequired { return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName) } @@ -668,9 +708,30 @@ func (asw *actualStateOfWorld) GetMountedVolumes() []MountedVolume { mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */) for _, volumeObj := range asw.attachedVolumes { for _, podObj := range volumeObj.mountedPods { - mountedVolume = append( - mountedVolume, - getMountedVolume(&podObj, &volumeObj)) + if podObj.volumeMountStateForPod == operationexecutor.VolumeMounted { + mountedVolume = append( + mountedVolume, + getMountedVolume(&podObj, &volumeObj)) + } + } + } + return mountedVolume +} + +// GetAllMountedVolumes returns all volumes which could be locally mounted for a pod. 
+func (asw *actualStateOfWorld) GetAllMountedVolumes() []MountedVolume { + asw.RLock() + defer asw.RUnlock() + mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */) + for _, volumeObj := range asw.attachedVolumes { + for _, podObj := range volumeObj.mountedPods { + if podObj.volumeMountStateForPod == operationexecutor.VolumeMounted || + podObj.volumeMountStateForPod == operationexecutor.VolumeMountUncertain { + mountedVolume = append( + mountedVolume, + getMountedVolume(&podObj, &volumeObj)) + } + } } @@ -683,10 +744,12 @@ func (asw *actualStateOfWorld) GetMountedVolumesForPod( defer asw.RUnlock() mountedVolume := make([]MountedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */) for _, volumeObj := range asw.attachedVolumes { - if podObj, podExists := volumeObj.mountedPods[podName]; podExists { - mountedVolume = append( - mountedVolume, - getMountedVolume(&podObj, &volumeObj)) + for mountedPodName, podObj := range volumeObj.mountedPods { + if mountedPodName == podName && podObj.volumeMountStateForPod == operationexecutor.VolumeMounted { + mountedVolume = append( + mountedVolume, + getMountedVolume(&podObj, &volumeObj)) + } } } @@ -699,7 +762,7 @@ func (asw *actualStateOfWorld) GetGloballyMountedVolumes() []AttachedVolume { globallyMountedVolumes := make( []AttachedVolume, 0 /* len */, len(asw.attachedVolumes) /* cap */) for _, volumeObj := range asw.attachedVolumes { - if volumeObj.globallyMounted { + if volumeObj.deviceMountState == operationexecutor.DeviceGloballyMounted { globallyMountedVolumes = append( globallyMountedVolumes, asw.newAttachedVolume(&volumeObj)) @@ -749,7 +812,7 @@ func (asw *actualStateOfWorld) newAttachedVolume( DevicePath: attachedVolume.devicePath, DeviceMountPath: attachedVolume.deviceMountPath, PluginName: attachedVolume.pluginName}, - GloballyMounted: attachedVolume.globallyMounted, + DeviceMountState: attachedVolume.deviceMountState, } } diff --git 
a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go index fd703f8d3dfac..e44f958a4ad3e 100644 --- a/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go +++ b/pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/volume" volumetesting "k8s.io/kubernetes/pkg/volume/testing" "k8s.io/kubernetes/pkg/volume/util" + "k8s.io/kubernetes/pkg/volume/util/operationexecutor" volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) @@ -220,9 +221,16 @@ func Test_AddPodToVolume_Positive_ExistingVolumeNewNode(t *testing.T) { } // Act - err = asw.AddPodToVolume( - podName, pod.UID, generatedVolumeName, mounter, mapper, volumeSpec.Name(), "" /* volumeGidValue */, volumeSpec) - + markVolumeOpts := operationexecutor.MarkVolumeOpts{ + PodName: podName, + PodUID: pod.UID, + VolumeName: generatedVolumeName, + Mounter: mounter, + BlockVolumeMapper: mapper, + OuterVolumeSpecName: volumeSpec.Name(), + VolumeSpec: volumeSpec, + } + err = asw.AddPodToVolume(markVolumeOpts) // Assert if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) @@ -287,16 +295,22 @@ func Test_AddPodToVolume_Positive_ExistingVolumeExistingNode(t *testing.T) { t.Fatalf("NewBlockVolumeMapper failed. Expected: Actual: <%v>", err) } - err = asw.AddPodToVolume( - podName, pod.UID, generatedVolumeName, mounter, mapper, volumeSpec.Name(), "" /* volumeGidValue */, volumeSpec) + markVolumeOpts := operationexecutor.MarkVolumeOpts{ + PodName: podName, + PodUID: pod.UID, + VolumeName: generatedVolumeName, + Mounter: mounter, + BlockVolumeMapper: mapper, + OuterVolumeSpecName: volumeSpec.Name(), + VolumeSpec: volumeSpec, + } + err = asw.AddPodToVolume(markVolumeOpts) if err != nil { t.Fatalf("AddPodToVolume failed. 
Expected: Actual: <%v>", err) } // Act - err = asw.AddPodToVolume( - podName, pod.UID, generatedVolumeName, mounter, mapper, volumeSpec.Name(), "" /* volumeGidValue */, volumeSpec) - + err = asw.AddPodToVolume(markVolumeOpts) // Assert if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) @@ -388,8 +402,16 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) { t.Fatalf("NewBlockVolumeMapper failed. Expected: Actual: <%v>", err) } - err = asw.AddPodToVolume( - podName1, pod1.UID, generatedVolumeName1, mounter1, mapper1, volumeSpec1.Name(), "" /* volumeGidValue */, volumeSpec1) + markVolumeOpts1 := operationexecutor.MarkVolumeOpts{ + PodName: podName1, + PodUID: pod1.UID, + VolumeName: generatedVolumeName1, + Mounter: mounter1, + BlockVolumeMapper: mapper1, + OuterVolumeSpecName: volumeSpec1.Name(), + VolumeSpec: volumeSpec1, + } + err = asw.AddPodToVolume(markVolumeOpts1) if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) } @@ -406,8 +428,16 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) { t.Fatalf("NewBlockVolumeMapper failed. Expected: Actual: <%v>", err) } - err = asw.AddPodToVolume( - podName2, pod2.UID, generatedVolumeName1, mounter2, mapper2, volumeSpec2.Name(), "" /* volumeGidValue */, volumeSpec2) + markVolumeOpts2 := operationexecutor.MarkVolumeOpts{ + PodName: podName2, + PodUID: pod2.UID, + VolumeName: generatedVolumeName1, + Mounter: mounter2, + BlockVolumeMapper: mapper2, + OuterVolumeSpecName: volumeSpec2.Name(), + VolumeSpec: volumeSpec2, + } + err = asw.AddPodToVolume(markVolumeOpts2) if err != nil { t.Fatalf("AddPodToVolume failed. 
Expected: Actual: <%v>", err) } @@ -421,7 +451,6 @@ func Test_AddTwoPodsToVolume_Positive(t *testing.T) { verifyVolumeExistsWithSpecNameInVolumeAsw(t, podName2, volumeSpec2.Name(), asw) verifyVolumeSpecNameInVolumeAsw(t, podName1, []*volume.Spec{volumeSpec1}, asw) verifyVolumeSpecNameInVolumeAsw(t, podName2, []*volume.Spec{volumeSpec2}, asw) - } // Calls AddPodToVolume() to add pod to empty data struct @@ -484,9 +513,16 @@ func Test_AddPodToVolume_Negative_VolumeDoesntExist(t *testing.T) { } // Act - err = asw.AddPodToVolume( - podName, pod.UID, volumeName, mounter, mapper, volumeSpec.Name(), "" /* volumeGidValue */, volumeSpec) - + markVolumeOpts := operationexecutor.MarkVolumeOpts{ + PodName: podName, + PodUID: pod.UID, + VolumeName: volumeName, + Mounter: mounter, + BlockVolumeMapper: mapper, + OuterVolumeSpecName: volumeSpec.Name(), + VolumeSpec: volumeSpec, + } + err = asw.AddPodToVolume(markVolumeOpts) // Assert if err == nil { t.Fatalf("AddPodToVolume did not fail. Expected: <\"no volume with the name ... 
exists in the list of attached volumes\"> Actual: ") @@ -556,6 +592,76 @@ func Test_MarkDeviceAsMounted_Positive_NewVolume(t *testing.T) { verifyVolumeExistsInGloballyMountedVolumes(t, generatedVolumeName, asw) } +func TestUncertainVolumeMounts(t *testing.T) { + // Arrange + volumePluginMgr, plugin := volumetesting.GetTestVolumePluginMgr(t) + asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr) + devicePath := "fake/device/path" + + pod1 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + UID: "pod1uid", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "volume-name-1", + VolumeSource: v1.VolumeSource{ + GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{ + PDName: "fake-device1", + }, + }, + }, + }, + }, + } + volumeSpec1 := &volume.Spec{Volume: &pod1.Spec.Volumes[0]} + generatedVolumeName1, err := util.GetUniqueVolumeNameFromSpec( + plugin, volumeSpec1) + require.NoError(t, err) + + err = asw.MarkVolumeAsAttached(generatedVolumeName1, volumeSpec1, "" /* nodeName */, devicePath) + if err != nil { + t.Fatalf("MarkVolumeAsAttached failed. Expected: Actual: <%v>", err) + } + podName1 := util.GetUniquePodName(pod1) + + mounter1, err := plugin.NewMounter(volumeSpec1, pod1, volume.VolumeOptions{}) + if err != nil { + t.Fatalf("NewMounter failed. Expected: Actual: <%v>", err) + } + + markVolumeOpts1 := operationexecutor.MarkVolumeOpts{ + PodName: podName1, + PodUID: pod1.UID, + VolumeName: generatedVolumeName1, + Mounter: mounter1, + OuterVolumeSpecName: volumeSpec1.Name(), + VolumeSpec: volumeSpec1, + VolumeMountState: operationexecutor.VolumeMountUncertain, + } + err = asw.AddPodToVolume(markVolumeOpts1) + if err != nil { + t.Fatalf("AddPodToVolume failed. 
Expected: Actual: <%v>", err) + } + mountedVolumes := asw.GetMountedVolumesForPod(podName1) + volumeFound := false + for _, volume := range mountedVolumes { + if volume.InnerVolumeSpecName == volumeSpec1.Name() { + volumeFound = true + } + } + if volumeFound { + t.Fatalf("expected volume %s to be not found in asw", volumeSpec1.Name()) + } + + volExists, _, _ := asw.PodExistsInVolume(podName1, generatedVolumeName1) + if volExists { + t.Fatalf("expected volume %s to not exist in asw", generatedVolumeName1) + } +} + func verifyVolumeExistsInGloballyMountedVolumes( t *testing.T, expectedVolumeName v1.UniqueVolumeName, asw ActualStateOfWorld) { globallyMountedVolumes := asw.GetGloballyMountedVolumes() diff --git a/pkg/kubelet/volumemanager/metrics/BUILD b/pkg/kubelet/volumemanager/metrics/BUILD index 385d70102e1d7..d39ab810a2342 100644 --- a/pkg/kubelet/volumemanager/metrics/BUILD +++ b/pkg/kubelet/volumemanager/metrics/BUILD @@ -37,6 +37,7 @@ go_test( "//pkg/volume:go_default_library", "//pkg/volume/testing:go_default_library", "//pkg/volume/util:go_default_library", + "//pkg/volume/util/operationexecutor:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", diff --git a/pkg/kubelet/volumemanager/metrics/metrics_test.go b/pkg/kubelet/volumemanager/metrics/metrics_test.go index 3c630d0061ca7..33f4d27eb015a 100644 --- a/pkg/kubelet/volumemanager/metrics/metrics_test.go +++ b/pkg/kubelet/volumemanager/metrics/metrics_test.go @@ -27,6 +27,7 @@ import ( volumetesting "k8s.io/kubernetes/pkg/volume/testing" "k8s.io/kubernetes/pkg/volume/util" + "k8s.io/kubernetes/pkg/volume/util/operationexecutor" ) func TestMetricCollection(t *testing.T) { @@ -77,8 +78,17 @@ func TestMetricCollection(t *testing.T) { t.Fatalf("MarkVolumeAsAttached failed. 
Expected: Actual: <%v>", err) } - err = asw.AddPodToVolume( - podName, pod.UID, generatedVolumeName, mounter, mapper, volumeSpec.Name(), "", volumeSpec) + markVolumeOpts := operationexecutor.MarkVolumeOpts{ + PodName: podName, + PodUID: pod.UID, + VolumeName: generatedVolumeName, + Mounter: mounter, + BlockVolumeMapper: mapper, + OuterVolumeSpecName: volumeSpec.Name(), + VolumeSpec: volumeSpec, + VolumeMountState: operationexecutor.VolumeMounted, + } + err = asw.AddPodToVolume(markVolumeOpts) if err != nil { t.Fatalf("AddPodToVolume failed. Expected: Actual: <%v>", err) } diff --git a/pkg/kubelet/volumemanager/populator/BUILD b/pkg/kubelet/volumemanager/populator/BUILD index 09896dcadd009..cc3044a4bbf99 100644 --- a/pkg/kubelet/volumemanager/populator/BUILD +++ b/pkg/kubelet/volumemanager/populator/BUILD @@ -63,6 +63,7 @@ go_test( "//pkg/volume/csimigration:go_default_library", "//pkg/volume/testing:go_default_library", "//pkg/volume/util:go_default_library", + "//pkg/volume/util/operationexecutor:go_default_library", "//pkg/volume/util/types:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", diff --git a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go index 7d872deb8e891..9f887a1830598 100644 --- a/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go +++ b/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator_test.go @@ -43,6 +43,7 @@ import ( "k8s.io/kubernetes/pkg/volume/csimigration" volumetesting "k8s.io/kubernetes/pkg/volume/testing" "k8s.io/kubernetes/pkg/volume/util" + "k8s.io/kubernetes/pkg/volume/util/operationexecutor" "k8s.io/kubernetes/pkg/volume/util/types" ) @@ -854,8 +855,16 @@ func reconcileASW(asw cache.ActualStateOfWorld, dsw cache.DesiredStateOfWorld, t if err != nil { t.Fatalf("Unexpected error when 
MarkVolumeAsAttached: %v", err) } - err = asw.MarkVolumeAsMounted(volumeToMount.PodName, volumeToMount.Pod.UID, - volumeToMount.VolumeName, nil, nil, volumeToMount.OuterVolumeSpecName, volumeToMount.VolumeGidValue, volumeToMount.VolumeSpec) + markVolumeOpts := operationexecutor.MarkVolumeOpts{ + PodName: volumeToMount.PodName, + PodUID: volumeToMount.Pod.UID, + VolumeName: volumeToMount.VolumeName, + OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName, + VolumeGidVolume: volumeToMount.VolumeGidValue, + VolumeSpec: volumeToMount.VolumeSpec, + VolumeMountState: operationexecutor.VolumeMounted, + } + err = asw.MarkVolumeAsMounted(markVolumeOpts) if err != nil { t.Fatalf("Unexpected error when MarkVolumeAsMounted: %v", err) } diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go index 3524446d5d402..adb2e2038cd02 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go @@ -164,9 +164,21 @@ func (rc *reconciler) reconcile() { // referenced by a pod that was deleted and is now referenced by another // pod is unmounted from the first pod before being mounted to the new // pod. + rc.unmountVolumes() + // Next we mount required volumes. This function could also trigger + // attach if kubelet is responsible for attaching volumes. + // If underlying PVC was resized while in-use then this function also handles volume + // resizing. + rc.mountAttachVolumes() + + // Ensure devices that should be detached/unmounted are detached/unmounted. + rc.unmountDetachDevices() +} + +func (rc *reconciler) unmountVolumes() { // Ensure volumes that should be unmounted are unmounted. 
- for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() { + for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() { if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) { // Volume is mounted, unmount it klog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", "")) @@ -184,7 +196,9 @@ func (rc *reconciler) reconcile() { } } } +} +func (rc *reconciler) mountAttachVolumes() { // Ensure volumes that should be attached/mounted are attached/mounted. for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() { volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName) @@ -274,13 +288,14 @@ func (rc *reconciler) reconcile() { } } } +} - // Ensure devices that should be detached/unmounted are detached/unmounted. +func (rc *reconciler) unmountDetachDevices() { for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() { // Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting. 
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) && !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) { - if attachedVolume.GloballyMounted { + if attachedVolume.DeviceMayBeMounted() { // Volume is globally mounted to device, unmount it klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", "")) err := rc.operationExecutor.UnmountDevice( @@ -625,15 +640,18 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*re klog.Errorf("Could not add volume information to actual state of world: %v", err) continue } - err = rc.actualStateOfWorld.MarkVolumeAsMounted( - volume.podName, - types.UID(volume.podName), - volume.volumeName, - volume.mounter, - volume.blockVolumeMapper, - volume.outerVolumeSpecName, - volume.volumeGidValue, - volume.volumeSpec) + markVolumeOpts := operationexecutor.MarkVolumeOpts{ + PodName: volume.podName, + PodUID: types.UID(volume.podName), + VolumeName: volume.volumeName, + Mounter: volume.mounter, + BlockVolumeMapper: volume.blockVolumeMapper, + OuterVolumeSpecName: volume.outerVolumeSpecName, + VolumeGidVolume: volume.volumeGidValue, + VolumeSpec: volume.volumeSpec, + VolumeMountState: operationexecutor.VolumeMounted, + } + err = rc.actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) if err != nil { klog.Errorf("Could not add pod to volume information to actual state of world: %v", err) continue diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go index 0fccd8d7567bc..9fde0713d92a6 100644 --- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go @@ -51,9 +51,11 @@ const ( reconcilerLoopSleepDuration time.Duration = 1 * time.Nanosecond // waitForAttachTimeout is the maximum amount of time a // operationexecutor.Mount call will wait for a volume to be attached. 
- waitForAttachTimeout time.Duration = 1 * time.Second - nodeName k8stypes.NodeName = k8stypes.NodeName("mynodename") - kubeletPodsDir string = "fake-dir" + waitForAttachTimeout time.Duration = 1 * time.Second + nodeName k8stypes.NodeName = k8stypes.NodeName("mynodename") + kubeletPodsDir string = "fake-dir" + testOperationBackOffDuration time.Duration = 100 * time.Millisecond + reconcilerSyncWaitDuration time.Duration = 10 * time.Second ) func hasAddedPods() bool { return true } @@ -336,7 +338,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) { // Act dsw.DeletePodFromVolume(podName, generatedVolumeName) - waitForDetach(t, fakePlugin, generatedVolumeName, asw) + waitForDetach(t, generatedVolumeName, asw) // Assert assert.NoError(t, volumetesting.VerifyTearDownCallCount( @@ -428,7 +430,7 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) { // Act dsw.DeletePodFromVolume(podName, generatedVolumeName) - waitForDetach(t, fakePlugin, generatedVolumeName, asw) + waitForDetach(t, generatedVolumeName, asw) // Assert assert.NoError(t, volumetesting.VerifyTearDownCallCount( @@ -739,7 +741,7 @@ func Test_Run_Positive_BlockVolumeAttachMapUnmapDetach(t *testing.T) { // Act dsw.DeletePodFromVolume(podName, generatedVolumeName) - waitForDetach(t, fakePlugin, generatedVolumeName, asw) + waitForDetach(t, generatedVolumeName, asw) // Assert assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount( @@ -855,7 +857,7 @@ func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) { // Act dsw.DeletePodFromVolume(podName, generatedVolumeName) - waitForDetach(t, fakePlugin, generatedVolumeName, asw) + waitForDetach(t, generatedVolumeName, asw) // Assert assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount( @@ -1134,7 +1136,7 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { // resize operation and clear the fsResizeRequired flag for volume. 
go reconciler.Run(wait.NeverStop) - waitErr := retryWithExponentialBackOff(500*time.Millisecond, func() (done bool, err error) { + waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) { mounted, _, err := asw.PodExistsInVolume(podName, volumeName) return mounted && err == nil, nil }) @@ -1145,13 +1147,385 @@ func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) { } } +func Test_UncertainDeviceGlobalMounts(t *testing.T) { + fsMode := v1.PersistentVolumeFilesystem + var tests = []struct { + name string + deviceState operationexecutor.DeviceMountState + unmountDeviceCallCount int + volumeName string + supportRemount bool + }{ + { + name: "timed out operations should result in device marked as uncertain", + deviceState: operationexecutor.DeviceMountUncertain, + unmountDeviceCallCount: 1, + volumeName: volumetesting.TimeoutOnMountDeviceVolumeName, + }, + { + name: "failed operation should result in not-mounted device", + deviceState: operationexecutor.DeviceNotMounted, + unmountDeviceCallCount: 0, + volumeName: volumetesting.FailMountDeviceVolumeName, + }, + { + name: "timeout followed by failed operation should result in non-mounted device", + deviceState: operationexecutor.DeviceNotMounted, + unmountDeviceCallCount: 0, + volumeName: volumetesting.TimeoutAndFailOnMountDeviceVolumeName, + }, + { + name: "success followed by timeout operation should result in mounted device", + deviceState: operationexecutor.DeviceGloballyMounted, + unmountDeviceCallCount: 1, + volumeName: volumetesting.SuccessAndTimeoutDeviceName, + supportRemount: true, + }, + { + name: "success followed by failed operation should result in mounted device", + deviceState: operationexecutor.DeviceGloballyMounted, + unmountDeviceCallCount: 1, + volumeName: volumetesting.SuccessAndFailOnMountDeviceName, + supportRemount: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + + pv := &v1.PersistentVolume{ + 
ObjectMeta: metav1.ObjectMeta{ + Name: tc.volumeName, + UID: "pvuid", + }, + Spec: v1.PersistentVolumeSpec{ + ClaimRef: &v1.ObjectReference{Name: "pvc"}, + VolumeMode: &fsMode, + }, + } + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc", + UID: "pvcuid", + }, + Spec: v1.PersistentVolumeClaimSpec{ + VolumeName: tc.volumeName, + }, + } + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + UID: "pod1uid", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "volume-name", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc.Name, + }, + }, + }, + }, + }, + } + + volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) + fakePlugin.SupportsRemount = tc.supportRemount + + dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) + asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) + kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{ + Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)), + DevicePath: "fake/path", + }) + fakeRecorder := &record.FakeRecorder{} + fakeHandler := volumetesting.NewBlockVolumePathHandler() + oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( + kubeClient, + volumePluginMgr, + fakeRecorder, + false, /* checkNodeCapabilitiesBeforeMount */ + fakeHandler)) + + reconciler := NewReconciler( + kubeClient, + true, /* controllerAttachDetachEnabled */ + reconcilerLoopSleepDuration, + waitForAttachTimeout, + nodeName, + dsw, + asw, + hasAddedPods, + oex, + &mount.FakeMounter{}, + hostutil.NewFakeHostUtil(nil), + volumePluginMgr, + kubeletPodsDir) + volumeSpec := &volume.Spec{PersistentVolume: pv} + podName := util.GetUniquePodName(pod) + volumeName, err := dsw.AddPodToVolume( + podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */) + // Assert + if err != nil { + t.Fatalf("AddPodToVolume failed. 
Expected: <no error> Actual: <%v>", err)
+			}
+			dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
+
+			// Start the reconciler to fill ASW.
+			stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
+			go func() {
+				reconciler.Run(stopChan)
+				close(stoppedChan)
+			}()
+			waitForVolumeToExistInASW(t, volumeName, asw)
+			if tc.volumeName == volumetesting.TimeoutAndFailOnMountDeviceVolumeName {
+				// Wait up to 10s for the reconciler to catch up
+				time.Sleep(reconcilerSyncWaitDuration)
+			}
+
+			if tc.volumeName == volumetesting.SuccessAndFailOnMountDeviceName ||
+				tc.volumeName == volumetesting.SuccessAndTimeoutDeviceName {
+				// wait for mount and then break it via remount
+				waitForMount(t, fakePlugin, volumeName, asw)
+				asw.MarkRemountRequired(podName)
+				time.Sleep(reconcilerSyncWaitDuration)
+			}
+
+			if tc.deviceState == operationexecutor.DeviceMountUncertain {
+				waitForUncertainGlobalMount(t, volumeName, asw)
+			}
+
+			if tc.deviceState == operationexecutor.DeviceGloballyMounted {
+				waitForMount(t, fakePlugin, volumeName, asw)
+			}
+
+			dsw.DeletePodFromVolume(podName, volumeName)
+			waitForDetach(t, volumeName, asw)
+			err = volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
+			if err != nil {
+				t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
+			}
+		})
+	}
+}
+
+func Test_UncertainVolumeMountState(t *testing.T) {
+	fsMode := v1.PersistentVolumeFilesystem
+	var tests = []struct {
+		name                   string
+		volumeState            operationexecutor.VolumeMountState
+		unmountDeviceCallCount int
+		unmountVolumeCount     int
+		volumeName             string
+		supportRemount         bool
+	}{
+		{
+			name:                   "timed out operations should result in volume marked as uncertain",
+			volumeState:            operationexecutor.VolumeMountUncertain,
+			unmountDeviceCallCount: 1,
+			unmountVolumeCount:     1,
+			volumeName:             volumetesting.TimeoutOnSetupVolumeName,
+		},
+		{
+			name:                   "failed operation should result in not-mounted volume",
+			volumeState:            operationexecutor.VolumeNotMounted,
+			unmountDeviceCallCount: 0,
+			
unmountVolumeCount: 0, + volumeName: volumetesting.FailOnSetupVolumeName, + }, + { + name: "timeout followed by failed operation should result in non-mounted volume", + volumeState: operationexecutor.VolumeNotMounted, + unmountDeviceCallCount: 0, + unmountVolumeCount: 0, + volumeName: volumetesting.TimeoutAndFailOnSetupVolumeName, + }, + { + name: "success followed by timeout operation should result in mounted volume", + volumeState: operationexecutor.VolumeMounted, + unmountDeviceCallCount: 1, + unmountVolumeCount: 1, + volumeName: volumetesting.SuccessAndTimeoutSetupVolumeName, + supportRemount: true, + }, + { + name: "success followed by failed operation should result in mounted volume", + volumeState: operationexecutor.VolumeMounted, + unmountDeviceCallCount: 1, + unmountVolumeCount: 1, + volumeName: volumetesting.SuccessAndFailOnSetupVolumeName, + supportRemount: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + pv := &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: tc.volumeName, + UID: "pvuid", + }, + Spec: v1.PersistentVolumeSpec{ + ClaimRef: &v1.ObjectReference{Name: "pvc"}, + VolumeMode: &fsMode, + }, + } + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc", + UID: "pvcuid", + }, + Spec: v1.PersistentVolumeClaimSpec{ + VolumeName: tc.volumeName, + }, + } + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + UID: "pod1uid", + }, + Spec: v1.PodSpec{ + Volumes: []v1.Volume{ + { + Name: "volume-name", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc.Name, + }, + }, + }, + }, + }, + } + + volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) + fakePlugin.SupportsRemount = tc.supportRemount + dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) + asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) + kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{ + Name: 
v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
+				DevicePath: "fake/path",
+			})
+			fakeRecorder := &record.FakeRecorder{}
+			fakeHandler := volumetesting.NewBlockVolumePathHandler()
+			oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
+				kubeClient,
+				volumePluginMgr,
+				fakeRecorder,
+				false, /* checkNodeCapabilitiesBeforeMount */
+				fakeHandler))
+
+			reconciler := NewReconciler(
+				kubeClient,
+				true, /* controllerAttachDetachEnabled */
+				reconcilerLoopSleepDuration,
+				waitForAttachTimeout,
+				nodeName,
+				dsw,
+				asw,
+				hasAddedPods,
+				oex,
+				&mount.FakeMounter{},
+				hostutil.NewFakeHostUtil(nil),
+				volumePluginMgr,
+				kubeletPodsDir)
+			volumeSpec := &volume.Spec{PersistentVolume: pv}
+			podName := util.GetUniquePodName(pod)
+			volumeName, err := dsw.AddPodToVolume(
+				podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
+			// Assert
+			if err != nil {
+				t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
+			}
+			dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
+
+			// Start the reconciler to fill ASW.
+			stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
+			go func() {
+				reconciler.Run(stopChan)
+				close(stoppedChan)
+			}()
+			waitForVolumeToExistInASW(t, volumeName, asw)
+			if tc.volumeName == volumetesting.TimeoutAndFailOnSetupVolumeName {
+				// Wait up to 10s for the reconciler to catch up
+				time.Sleep(reconcilerSyncWaitDuration)
+			}
+
+			if tc.volumeName == volumetesting.SuccessAndFailOnSetupVolumeName ||
+				tc.volumeName == volumetesting.SuccessAndTimeoutSetupVolumeName {
+				// wait for mount and then break it via remount
+				waitForMount(t, fakePlugin, volumeName, asw)
+				asw.MarkRemountRequired(podName)
+				time.Sleep(reconcilerSyncWaitDuration)
+			}
+
+			if tc.volumeState == operationexecutor.VolumeMountUncertain {
+				waitForUncertainPodMount(t, volumeName, asw)
+			}
+
+			if tc.volumeState == operationexecutor.VolumeMounted {
+				waitForMount(t, fakePlugin, volumeName, asw)
+			}
+
+			dsw.DeletePodFromVolume(podName, volumeName)
+			waitForDetach(t, volumeName, asw)
+
+			volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
+			volumetesting.VerifyTearDownCallCount(tc.unmountVolumeCount, fakePlugin)
+		})
+	}
+
+}
+
+func waitForUncertainGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
+	// check if volume is globally mounted in uncertain state
+	err := retryWithExponentialBackOff(
+		testOperationBackOffDuration,
+		func() (bool, error) {
+			unmountedVolumes := asw.GetUnmountedVolumes()
+			for _, v := range unmountedVolumes {
+				if v.VolumeName == volumeName && v.DeviceMountState == operationexecutor.DeviceMountUncertain {
+					return true, nil
+				}
+			}
+			return false, nil
+		},
+	)
+
+	if err != nil {
+		t.Fatalf("expected volumes %s to be mounted in uncertain state globally", volumeName)
+	}
+}
+
+func waitForUncertainPodMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
+	// check if volume is locally pod mounted in uncertain state
+	err := retryWithExponentialBackOff(
+		
testOperationBackOffDuration, + func() (bool, error) { + allMountedVolumes := asw.GetAllMountedVolumes() + for _, v := range allMountedVolumes { + if v.VolumeName == volumeName { + return true, nil + } + } + return false, nil + }, + ) + + if err != nil { + t.Fatalf("expected volumes %s to be mounted in uncertain state for pod", volumeName) + } +} + func waitForMount( t *testing.T, fakePlugin *volumetesting.FakeVolumePlugin, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) { err := retryWithExponentialBackOff( - time.Duration(500*time.Millisecond), + testOperationBackOffDuration, func() (bool, error) { mountedVolumes := asw.GetMountedVolumes() for _, mountedVolume := range mountedVolumes { @@ -1169,13 +1543,27 @@ func waitForMount( } } +func waitForVolumeToExistInASW(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) { + err := retryWithExponentialBackOff( + testOperationBackOffDuration, + func() (bool, error) { + if asw.VolumeExists(volumeName) { + return true, nil + } + return false, nil + }, + ) + if err != nil { + t.Fatalf("Timed out waiting for volume %q to be exist in asw.", volumeName) + } +} + func waitForDetach( t *testing.T, - fakePlugin *volumetesting.FakeVolumePlugin, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) { err := retryWithExponentialBackOff( - time.Duration(500*time.Millisecond), + testOperationBackOffDuration, func() (bool, error) { if asw.VolumeExists(volumeName) { return false, nil diff --git a/pkg/kubelet/volumemanager/volume_manager.go b/pkg/kubelet/volumemanager/volume_manager.go index 3fe144a073fa4..300cf7159b8a5 100644 --- a/pkg/kubelet/volumemanager/volume_manager.go +++ b/pkg/kubelet/volumemanager/volume_manager.go @@ -430,6 +430,7 @@ func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, e // getUnmountedVolumes fetches the current list of mounted volumes from // the actual state of the world, and uses it to process the list of // expectedVolumes. 
It returns a list of unmounted volumes.
+// The list also includes volumes that may be mounted in uncertain state.
 func (vm *volumeManager) getUnmountedVolumes(podName types.UniquePodName, expectedVolumes []string) []string {
 	mountedVolumes := sets.NewString()
 	for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
diff --git a/pkg/volume/csi/BUILD b/pkg/volume/csi/BUILD
index 1b4bf551c8f51..f81314c7565ed 100644
--- a/pkg/volume/csi/BUILD
+++ b/pkg/volume/csi/BUILD
@@ -20,6 +20,7 @@ go_library(
         "//pkg/volume:go_default_library",
         "//pkg/volume/csi/nodeinfomanager:go_default_library",
         "//pkg/volume/util:go_default_library",
+        "//pkg/volume/util/types:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/api/storage/v1:go_default_library",
         "//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
@@ -37,6 +38,8 @@ go_library(
         "//staging/src/k8s.io/csi-translation-lib/plugins:go_default_library",
         "//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library",
         "//vendor/google.golang.org/grpc:go_default_library",
+        "//vendor/google.golang.org/grpc/codes:go_default_library",
+        "//vendor/google.golang.org/grpc/status:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
         "//vendor/k8s.io/utils/strings:go_default_library",
     ],
@@ -63,6 +66,7 @@ go_test(
         "//pkg/volume/csi/fake:go_default_library",
         "//pkg/volume/testing:go_default_library",
         "//pkg/volume/util:go_default_library",
+        "//pkg/volume/util/types:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/api/storage/v1:go_default_library",
         "//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
diff --git a/pkg/volume/csi/csi_attacher.go b/pkg/volume/csi/csi_attacher.go
index 1ca94ddfdfa80..fbe710fbfb55d 100644
--- a/pkg/volume/csi/csi_attacher.go
+++ b/pkg/volume/csi/csi_attacher.go
@@ -36,6 +36,7 @@ import (
 	"k8s.io/apimachinery/pkg/watch"
 	
"k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/volume" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) const ( @@ -219,7 +220,7 @@ func (c *csiAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) { return deviceMountPath, nil } -func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) (err error) { +func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string) error { klog.V(4).Infof(log("attacher.MountDevice(%s, %s)", devicePath, deviceMountPath)) if deviceMountPath == "" { @@ -246,6 +247,43 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo return errors.New(log("attacher.MountDevice failed to get CSIPersistentVolumeSource: %v", err)) } + // lets check if node/unstage is supported + if c.csiClient == nil { + c.csiClient, err = newCsiDriverClient(csiDriverName(csiSource.Driver)) + if err != nil { + return errors.New(log("attacher.MountDevice failed to create newCsiDriverClient: %v", err)) + } + } + csi := c.csiClient + + ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + defer cancel() + // Check whether "STAGE_UNSTAGE_VOLUME" is set + stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx) + if err != nil { + return err + } + + // Get secrets and publish context required for mountDevice + nodeName := string(c.plugin.host.GetNodeName()) + publishContext, err := c.plugin.getPublishContext(c.k8s, csiSource.VolumeHandle, csiSource.Driver, nodeName) + + if err != nil { + return volumetypes.NewTransientOperationFailure(err.Error()) + } + + nodeStageSecrets := map[string]string{} + // we only require secrets if csiSource has them and volume has NodeStage capability + if csiSource.NodeStageSecretRef != nil && stageUnstageSet { + nodeStageSecrets, err = getCredentialsFromSecret(c.k8s, csiSource.NodeStageSecretRef) + if err != nil { + err = fmt.Errorf("fetching NodeStageSecretRef %s/%s failed: %v", + 
csiSource.NodeStageSecretRef.Namespace, csiSource.NodeStageSecretRef.Name, err) + // if we failed to fetch secret then that could be a transient error + return volumetypes.NewTransientOperationFailure(err.Error()) + } + } + // Store volume metadata for UnmountDevice. Keep it around even if the // driver does not support NodeStage, UnmountDevice still needs it. if err = os.MkdirAll(deviceMountPath, 0750); err != nil { @@ -265,7 +303,9 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo return err } defer func() { - if err != nil { + // Only if there was an error and volume operation was considered + // finished, we should remove the directory. + if err != nil && volumetypes.IsOperationFinishedError(err) { // clean up metadata klog.Errorf(log("attacher.MountDevice failed: %v", err)) if err := removeMountDir(c.plugin, deviceMountPath); err != nil { @@ -274,41 +314,12 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo } }() - if c.csiClient == nil { - c.csiClient, err = newCsiDriverClient(csiDriverName(csiSource.Driver)) - if err != nil { - return errors.New(log("attacher.MountDevice failed to create newCsiDriverClient: %v", err)) - } - } - csi := c.csiClient - - ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) - defer cancel() - // Check whether "STAGE_UNSTAGE_VOLUME" is set - stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx) - if err != nil { - return err - } if !stageUnstageSet { klog.Infof(log("attacher.MountDevice STAGE_UNSTAGE_VOLUME capability not set. Skipping MountDevice...")) // defer does *not* remove the metadata file and it's correct - UnmountDevice needs it there. 
return nil } - // Start MountDevice - nodeName := string(c.plugin.host.GetNodeName()) - publishContext, err := c.plugin.getPublishContext(c.k8s, csiSource.VolumeHandle, csiSource.Driver, nodeName) - - nodeStageSecrets := map[string]string{} - if csiSource.NodeStageSecretRef != nil { - nodeStageSecrets, err = getCredentialsFromSecret(c.k8s, csiSource.NodeStageSecretRef) - if err != nil { - err = fmt.Errorf("fetching NodeStageSecretRef %s/%s failed: %v", - csiSource.NodeStageSecretRef.Namespace, csiSource.NodeStageSecretRef.Name, err) - return err - } - } - //TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI accessMode := v1.ReadWriteOnce if spec.PersistentVolume.Spec.AccessModes != nil { @@ -336,7 +347,7 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo } klog.V(4).Infof(log("attacher.MountDevice successfully requested NodeStageVolume [%s]", deviceMountPath)) - return nil + return err } var _ volume.Detacher = &csiAttacher{} diff --git a/pkg/volume/csi/csi_attacher_test.go b/pkg/volume/csi/csi_attacher_test.go index 075601ad9ca5d..41c646cf8399f 100644 --- a/pkg/volume/csi/csi_attacher_test.go +++ b/pkg/volume/csi/csi_attacher_test.go @@ -44,7 +44,9 @@ import ( featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" + fakecsi "k8s.io/kubernetes/pkg/volume/csi/fake" volumetest "k8s.io/kubernetes/pkg/volume/testing" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) var ( @@ -1054,73 +1056,107 @@ func TestAttacherGetDeviceMountPath(t *testing.T) { func TestAttacherMountDevice(t *testing.T) { pvName := "test-pv" + nonFinalError := volumetypes.NewUncertainProgressError("") + transientError := volumetypes.NewTransientOperationFailure("") + testCases := []struct { - testName string - volName string - devicePath string - deviceMountPath string - stageUnstageSet bool - shouldFail bool - spec *volume.Spec + testName string + volName 
string + devicePath string + deviceMountPath string + stageUnstageSet bool + shouldFail bool + createAttachment bool + exitError error + spec *volume.Spec }{ { - testName: "normal PV", - volName: "test-vol1", - devicePath: "path1", - deviceMountPath: "path2", - stageUnstageSet: true, - spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false), + testName: "normal PV", + volName: "test-vol1", + devicePath: "path1", + deviceMountPath: "path2", + stageUnstageSet: true, + createAttachment: true, + spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false), }, { - testName: "normal PV with mount options", - volName: "test-vol1", - devicePath: "path1", - deviceMountPath: "path2", - stageUnstageSet: true, - spec: volume.NewSpecFromPersistentVolume(makeTestPVWithMountOptions(pvName, 10, testDriver, "test-vol1", []string{"test-op"}), false), + testName: "normal PV with mount options", + volName: "test-vol1", + devicePath: "path1", + deviceMountPath: "path2", + stageUnstageSet: true, + createAttachment: true, + spec: volume.NewSpecFromPersistentVolume(makeTestPVWithMountOptions(pvName, 10, testDriver, "test-vol1", []string{"test-op"}), false), }, { - testName: "no vol name", - volName: "", - devicePath: "path1", - deviceMountPath: "path2", - stageUnstageSet: true, - shouldFail: true, - spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, ""), false), + testName: "normal PV but with missing attachment should result in no-change", + volName: "test-vol1", + devicePath: "path1", + deviceMountPath: "path2", + stageUnstageSet: true, + createAttachment: false, + shouldFail: true, + exitError: transientError, + spec: volume.NewSpecFromPersistentVolume(makeTestPVWithMountOptions(pvName, 10, testDriver, "test-vol1", []string{"test-op"}), false), }, { - testName: "no device path", - volName: "test-vol1", - devicePath: "", - deviceMountPath: "path2", - stageUnstageSet: true, - shouldFail: 
false, - spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false), + testName: "no vol name", + volName: "", + devicePath: "path1", + deviceMountPath: "path2", + stageUnstageSet: true, + shouldFail: true, + createAttachment: true, + spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, ""), false), }, { - testName: "no device mount path", - volName: "test-vol1", - devicePath: "path1", - deviceMountPath: "", - stageUnstageSet: true, - shouldFail: true, - spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false), + testName: "no device path", + volName: "test-vol1", + devicePath: "", + deviceMountPath: "path2", + stageUnstageSet: true, + shouldFail: false, + createAttachment: true, + spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false), }, { - testName: "stage_unstage cap not set", - volName: "test-vol1", - devicePath: "path1", - deviceMountPath: "path2", - stageUnstageSet: false, - spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false), + testName: "no device mount path", + volName: "test-vol1", + devicePath: "path1", + deviceMountPath: "", + stageUnstageSet: true, + shouldFail: true, + createAttachment: true, + spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false), }, { - testName: "failure with volume source", - volName: "test-vol1", - devicePath: "path1", - deviceMountPath: "path2", - shouldFail: true, - spec: volume.NewSpecFromVolume(makeTestVol(pvName, testDriver)), + testName: "stage_unstage cap not set", + volName: "test-vol1", + devicePath: "path1", + deviceMountPath: "path2", + stageUnstageSet: false, + createAttachment: true, + spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, "test-vol1"), false), + }, + { + testName: "failure with volume source", + volName: "test-vol1", + devicePath: "path1", + 
deviceMountPath: "path2", + shouldFail: true, + createAttachment: true, + spec: volume.NewSpecFromVolume(makeTestVol(pvName, testDriver)), + }, + { + testName: "pv with nodestage timeout should result in in-progress device", + volName: fakecsi.NodeStageTimeOut_VolumeID, + devicePath: "path1", + deviceMountPath: "path2", + stageUnstageSet: true, + createAttachment: true, + spec: volume.NewSpecFromPersistentVolume(makeTestPV(pvName, 10, testDriver, fakecsi.NodeStageTimeOut_VolumeID), false), + exitError: nonFinalError, + shouldFail: true, }, } @@ -1146,18 +1182,20 @@ func TestAttacherMountDevice(t *testing.T) { nodeName := string(csiAttacher.plugin.host.GetNodeName()) attachID := getAttachmentName(tc.volName, testDriver, nodeName) - // Set up volume attachment - attachment := makeTestAttachment(attachID, nodeName, pvName) - _, err := csiAttacher.k8s.StorageV1().VolumeAttachments().Create(attachment) - if err != nil { - t.Fatalf("failed to attach: %v", err) + if tc.createAttachment { + // Set up volume attachment + attachment := makeTestAttachment(attachID, nodeName, pvName) + _, err := csiAttacher.k8s.StorageV1().VolumeAttachments().Create(attachment) + if err != nil { + t.Fatalf("failed to attach: %v", err) + } + go func() { + fakeWatcher.Delete(attachment) + }() } - go func() { - fakeWatcher.Delete(attachment) - }() // Run - err = csiAttacher.MountDevice(tc.spec, tc.devicePath, tc.deviceMountPath) + err := csiAttacher.MountDevice(tc.spec, tc.devicePath, tc.deviceMountPath) // Verify if err != nil { @@ -1170,6 +1208,10 @@ func TestAttacherMountDevice(t *testing.T) { t.Errorf("test should fail, but no error occurred") } + if tc.exitError != nil && reflect.TypeOf(tc.exitError) != reflect.TypeOf(err) { + t.Fatalf("expected exitError: %v got: %v", tc.exitError, err) + } + // Verify call goes through all the way numStaged := 1 if !tc.stageUnstageSet { diff --git a/pkg/volume/csi/csi_client.go b/pkg/volume/csi/csi_client.go index 99c3181a0aa9f..1dce2eccc4cf7 100644 --- 
a/pkg/volume/csi/csi_client.go +++ b/pkg/volume/csi/csi_client.go @@ -28,11 +28,14 @@ import ( csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" api "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog" "k8s.io/kubernetes/pkg/volume" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) type csiClient interface { @@ -213,6 +216,7 @@ func (c *csiDriverClient) NodePublishVolume( if targetPath == "" { return errors.New("missing target path") } + if c.nodeV1ClientCreator == nil { return errors.New("failed to call NodePublishVolume. nodeV1ClientCreator is nil") @@ -255,6 +259,9 @@ func (c *csiDriverClient) NodePublishVolume( } _, err = nodeClient.NodePublishVolume(ctx, req) + if err != nil && !isFinalError(err) { + return volumetypes.NewUncertainProgressError(err.Error()) + } return err } @@ -374,6 +381,9 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context, } _, err = nodeClient.NodeStageVolume(ctx, req) + if err != nil && !isFinalError(err) { + return volumetypes.NewUncertainProgressError(err.Error()) + } return err } @@ -613,3 +623,27 @@ func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, } return metrics, nil } + +func isFinalError(err error) bool { + // Sources: + // https://github.com/grpc/grpc/blob/master/doc/statuscodes.md + // https://github.com/container-storage-interface/spec/blob/master/spec.md + st, ok := status.FromError(err) + if !ok { + // This is not gRPC error. The operation must have failed before gRPC + // method was called, otherwise we would get gRPC error. + // We don't know if any previous volume operation is in progress, be on the safe side. 
+ return false + } + switch st.Code() { + case codes.Canceled, // gRPC: Client Application cancelled the request + codes.DeadlineExceeded, // gRPC: Timeout + codes.Unavailable, // gRPC: Server shutting down, TCP connection broken - previous volume operation may be still in progress. + codes.ResourceExhausted, // gRPC: Server temporarily out of resources - previous volume operation may be still in progress. + codes.Aborted: // CSI: Operation pending for volume + return false + } + // All other errors mean that operation either did not + // even start or failed. It is for sure not in progress. + return true +} diff --git a/pkg/volume/csi/csi_client_test.go b/pkg/volume/csi/csi_client_test.go index cef0e814fc6eb..83080da25753a 100644 --- a/pkg/volume/csi/csi_client_test.go +++ b/pkg/volume/csi/csi_client_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume/csi/fake" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) type fakeCsiDriverClient struct { @@ -156,6 +157,9 @@ func (c *fakeCsiDriverClient) NodePublishVolume( } _, err := c.nodeClient.NodePublishVolume(ctx, req) + if err != nil && !isFinalError(err) { + return volumetypes.NewUncertainProgressError(err.Error()) + } return err } @@ -201,6 +205,9 @@ func (c *fakeCsiDriverClient) NodeStageVolume(ctx context.Context, } _, err := c.nodeClient.NodeStageVolume(ctx, req) + if err != nil && !isFinalError(err) { + return volumetypes.NewUncertainProgressError(err.Error()) + } return err } diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go index 2cce6de579ce2..ad53d124b783d 100644 --- a/pkg/volume/csi/csi_mounter.go +++ b/pkg/volume/csi/csi_mounter.go @@ -36,6 +36,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" utilstrings "k8s.io/utils/strings" ) @@ -117,7 +118,8 @@ func (c *csiMountMgr) 
SetUpAt(dir string, mounterArgs volume.MounterArgs) error csi, err := c.csiClientGetter.Get() if err != nil { - return errors.New(log("mounter.SetUpAt failed to get CSI client: %v", err)) + return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to get CSI client: %v", err)) + } ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) defer cancel() @@ -199,7 +201,8 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error nodeName := string(c.plugin.host.GetNodeName()) c.publishContext, err = c.plugin.getPublishContext(c.k8s, volumeHandle, string(driverName), nodeName) if err != nil { - return err + // we could have a transient error associated with fetching publish context + return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to fetch publishContext: %v", err)) } publishContext = c.publishContext } @@ -218,8 +221,8 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error if secretRef != nil { nodePublishSecrets, err = getCredentialsFromSecret(c.k8s, secretRef) if err != nil { - return fmt.Errorf("fetching NodePublishSecretRef %s/%s failed: %v", - secretRef.Namespace, secretRef.Name, err) + return volumetypes.NewTransientOperationFailure(fmt.Sprintf("fetching NodePublishSecretRef %s/%s failed: %v", + secretRef.Namespace, secretRef.Name, err)) } } @@ -227,7 +230,7 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error // Inject pod information into volume_attributes podAttrs, err := c.podAttributes() if err != nil { - return errors.New(log("mounter.SetUpAt failed to assemble volume attributes: %v", err)) + return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to assemble volume attributes: %v", err)) } if podAttrs != nil { if volAttribs == nil { @@ -254,10 +257,13 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error ) if err != nil { - if removeMountDirErr := 
removeMountDir(c.plugin, dir); removeMountDirErr != nil { - klog.Error(log("mounter.SetupAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, removeMountDirErr)) + // If operation finished with error then we can remove the mount directory. + if volumetypes.IsOperationFinishedError(err) { + if removeMountDirErr := removeMountDir(c.plugin, dir); removeMountDirErr != nil { + klog.Error(log("mounter.SetupAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, removeMountDirErr)) + } } - return errors.New(log("mounter.SetupAt failed: %v", err)) + return err } c.supportsSELinux, err = c.kubeVolHost.GetHostUtil().GetSELinuxSupport(dir) @@ -269,21 +275,13 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error // The following logic is derived from https://github.com/kubernetes/kubernetes/issues/66323 // if fstype is "", then skip fsgroup (could be indication of non-block filesystem) // if fstype is provided and pv.AccessMode == ReadWriteOnly, then apply fsgroup - err = c.applyFSGroup(fsType, mounterArgs.FsGroup) if err != nil { - // attempt to rollback mount. - fsGrpErr := fmt.Errorf("applyFSGroup failed for vol %s: %v", c.volumeID, err) - if unpubErr := csi.NodeUnpublishVolume(ctx, c.volumeID, dir); unpubErr != nil { - klog.Error(log("NodeUnpublishVolume failed for [%s]: %v", c.volumeID, unpubErr)) - return fsGrpErr - } - - if unmountErr := removeMountDir(c.plugin, dir); unmountErr != nil { - klog.Error(log("removeMountDir failed for [%s]: %v", dir, unmountErr)) - return fsGrpErr - } - return fsGrpErr + // At this point mount operation is successful: + // 1. Since volume can not be used by the pod because of invalid permissions, we must return error + // 2. Since mount is successful, we must record volume as mounted in uncertain state, so it can be + // cleaned up. 
+ return volumetypes.NewUncertainProgressError(fmt.Sprintf("applyFSGroup failed for vol %s: %v", c.volumeID, err)) } klog.V(4).Infof(log("mounter.SetUp successfully requested NodePublish [%s]", dir)) diff --git a/pkg/volume/csi/csi_mounter_test.go b/pkg/volume/csi/csi_mounter_test.go index 985e208faa0ff..ef73b31ad9754 100644 --- a/pkg/volume/csi/csi_mounter_test.go +++ b/pkg/volume/csi/csi_mounter_test.go @@ -36,7 +36,9 @@ import ( featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/volume" + fakecsi "k8s.io/kubernetes/pkg/volume/csi/fake" "k8s.io/kubernetes/pkg/volume/util" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) var ( @@ -396,6 +398,113 @@ func TestMounterSetUpSimple(t *testing.T) { } } +func TestMounterSetupWithStatusTracking(t *testing.T) { + fakeClient := fakeclient.NewSimpleClientset() + plug, tmpDir := newTestPlugin(t, fakeClient) + defer os.RemoveAll(tmpDir) + nonFinalError := volumetypes.NewUncertainProgressError("non-final-error") + transientError := volumetypes.NewTransientOperationFailure("transient-error") + + testCases := []struct { + name string + podUID types.UID + spec func(string, []string) *volume.Spec + shouldFail bool + exitError error + createAttachment bool + }{ + { + name: "setup with correct persistent volume source should result in finish exit status", + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + spec: func(fsType string, options []string) *volume.Spec { + pvSrc := makeTestPV("pv1", 20, testDriver, "vol1") + pvSrc.Spec.CSI.FSType = fsType + pvSrc.Spec.MountOptions = options + return volume.NewSpecFromPersistentVolume(pvSrc, false) + }, + createAttachment: true, + }, + { + name: "setup with missing attachment should result in nochange", + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + spec: func(fsType string, options []string) *volume.Spec { + return volume.NewSpecFromPersistentVolume(makeTestPV("pv3", 20, testDriver, "vol4"), 
false) + }, + exitError: transientError, + createAttachment: false, + shouldFail: true, + }, + { + name: "setup with timeout errors on NodePublish", + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + spec: func(fsType string, options []string) *volume.Spec { + return volume.NewSpecFromPersistentVolume(makeTestPV("pv4", 20, testDriver, fakecsi.NodePublishTimeOut_VolumeID), false) + }, + createAttachment: true, + exitError: nonFinalError, + shouldFail: true, + }, + { + name: "setup with missing secrets should result in nochange exit", + podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())), + spec: func(fsType string, options []string) *volume.Spec { + pv := makeTestPV("pv5", 20, testDriver, "vol6") + pv.Spec.PersistentVolumeSource.CSI.NodePublishSecretRef = &api.SecretReference{ + Name: "foo", + Namespace: "default", + } + return volume.NewSpecFromPersistentVolume(pv, false) + }, + exitError: transientError, + createAttachment: true, + shouldFail: true, + }, + } + + for _, tc := range testCases { + registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t) + t.Run(tc.name, func(t *testing.T) { + mounter, err := plug.NewMounter( + tc.spec("ext4", []string{}), + &api.Pod{ObjectMeta: meta.ObjectMeta{UID: tc.podUID, Namespace: testns}}, + volume.VolumeOptions{}, + ) + if mounter == nil { + t.Fatal("failed to create CSI mounter") + } + + csiMounter := mounter.(*csiMountMgr) + csiMounter.csiClient = setupClient(t, true) + + if csiMounter.volumeLifecycleMode != storagev1beta1.VolumeLifecyclePersistent { + t.Fatal("unexpected volume mode: ", csiMounter.volumeLifecycleMode) + } + + if tc.createAttachment { + attachID := getAttachmentName(csiMounter.volumeID, string(csiMounter.driverName), string(plug.host.GetNodeName())) + attachment := makeTestAttachment(attachID, "test-node", csiMounter.spec.Name()) + _, err = csiMounter.k8s.StorageV1().VolumeAttachments().Create(attachment) + if err != nil { + t.Fatalf("failed to setup VolumeAttachment: %v", err) + } + } + 
err = csiMounter.SetUp(volume.MounterArgs{}) + + if tc.exitError != nil && reflect.TypeOf(tc.exitError) != reflect.TypeOf(err) { + t.Fatalf("expected exitError: %+v got: %+v", tc.exitError, err) + } + + if tc.shouldFail && err == nil { + t.Fatalf("expected failure but Setup succeeded") + } + + if !tc.shouldFail && err != nil { + t.Fatalf("expected success got mounter.Setup failed with: %v", err) + } + }) + } +} + func TestMounterSetUpWithInline(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIInlineVolume, true)() diff --git a/pkg/volume/csi/fake/BUILD b/pkg/volume/csi/fake/BUILD index c2c6b44d4f595..0d50b64946dc1 100644 --- a/pkg/volume/csi/fake/BUILD +++ b/pkg/volume/csi/fake/BUILD @@ -11,6 +11,8 @@ go_library( deps = [ "//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library", "//vendor/google.golang.org/grpc:go_default_library", + "//vendor/google.golang.org/grpc/codes:go_default_library", + "//vendor/google.golang.org/grpc/status:go_default_library", ], ) diff --git a/pkg/volume/csi/fake/fake_client.go b/pkg/volume/csi/fake/fake_client.go index 19072963da022..0ce54f50ad826 100644 --- a/pkg/volume/csi/fake/fake_client.go +++ b/pkg/volume/csi/fake/fake_client.go @@ -21,9 +21,17 @@ import ( "errors" "strings" + csipb "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) - csipb "github.com/container-storage-interface/spec/lib/go/csi" +const ( + // NodePublishTimeout_VolumeID is volume id that will result in NodePublish operation to timeout + NodePublishTimeOut_VolumeID = "node-publish-timeout" + // NodeStageTimeOut_VolumeID is a volume id that will result in NodeStage operation to timeout + NodeStageTimeOut_VolumeID = "node-stage-timeout" ) // IdentityClient is a CSI identity client used for testing @@ -158,6 +166,12 @@ func (f *NodeClient) NodePublishVolume(ctx 
context.Context, req *csipb.NodePubli if !strings.Contains(fsTypes, fsType) { return nil, errors.New("invalid fstype") } + + if req.GetVolumeId() == NodePublishTimeOut_VolumeID { + timeoutErr := status.Errorf(codes.DeadlineExceeded, "timeout exceeded") + return nil, timeoutErr + } + f.nodePublishedVolumes[req.GetVolumeId()] = CSIVolume{ VolumeHandle: req.GetVolumeId(), Path: req.GetTargetPath(), @@ -214,6 +228,11 @@ func (f *NodeClient) NodeStageVolume(ctx context.Context, req *csipb.NodeStageVo return nil, errors.New("invalid fstype") } + if req.GetVolumeId() == NodeStageTimeOut_VolumeID { + timeoutErr := status.Errorf(codes.DeadlineExceeded, "timeout exceeded") + return nil, timeoutErr + } + f.nodeStagedVolumes[req.GetVolumeId()] = csiVol return &csipb.NodeStageVolumeResponse{}, nil } diff --git a/pkg/volume/testing/BUILD b/pkg/volume/testing/BUILD index 8d9502fd3316d..b585365f1ee4f 100644 --- a/pkg/volume/testing/BUILD +++ b/pkg/volume/testing/BUILD @@ -18,6 +18,7 @@ go_library( "//pkg/volume/util/hostutil:go_default_library", "//pkg/volume/util/recyclerclient:go_default_library", "//pkg/volume/util/subpath:go_default_library", + "//pkg/volume/util/types:go_default_library", "//pkg/volume/util/volumepathhandler:go_default_library", "//staging/src/k8s.io/api/authentication/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 0134d1b207771..0bdad8924bad2 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -51,6 +51,7 @@ import ( "k8s.io/kubernetes/pkg/volume/util/hostutil" "k8s.io/kubernetes/pkg/volume/util/recyclerclient" "k8s.io/kubernetes/pkg/volume/util/subpath" + volumetypes "k8s.io/kubernetes/pkg/volume/util/types" "k8s.io/kubernetes/pkg/volume/util/volumepathhandler" ) @@ -66,6 +67,35 @@ const ( TimeoutAttachNode = "timeout-attach-node" // The node is marked as multi-attach which means it is allowed to attach the volume to 
multiple nodes. MultiAttachNode = "multi-attach-node" + // TimeoutOnSetupVolumeName will cause Setup call to timeout but volume will finish mounting. + TimeoutOnSetupVolumeName = "timeout-setup-volume" + // FailOnSetupVolumeName will cause setup call to fail + FailOnSetupVolumeName = "fail-setup-volume" + //TimeoutAndFailOnSetupVolumeName will first timeout and then fail the setup + TimeoutAndFailOnSetupVolumeName = "timeout-and-fail-setup-volume" + // SuccessAndTimeoutSetupVolumeName will cause first mount operation to succeed but subsequent attempts to timeout + SuccessAndTimeoutSetupVolumeName = "success-and-timeout-setup-volume-name" + // SuccessAndFailOnSetupVolumeName will cause first mount operation to succeed but subsequent attempts to fail + SuccessAndFailOnSetupVolumeName = "success-and-failed-setup-device-name" + + // TimeoutOnMountDeviceVolumeName will cause MountDevice call to timeout but Setup will finish. + TimeoutOnMountDeviceVolumeName = "timeout-mount-device-volume" + // TimeoutAndFailOnMountDeviceVolumeName will cause first MountDevice call to timeout but second call will fail + TimeoutAndFailOnMountDeviceVolumeName = "timeout-and-fail-mount-device-name" + // FailMountDeviceVolumeName will cause MountDevice operation on volume to fail + FailMountDeviceVolumeName = "fail-mount-device-volume-name" + // SuccessAndTimeoutDeviceName will cause first mount operation to succeed but subsequent attempts to timeout + SuccessAndTimeoutDeviceName = "success-and-timeout-device-name" + // SuccessAndFailOnMountDeviceName will cause first mount operation to succeed but subsequent attempts to fail + SuccessAndFailOnMountDeviceName = "success-and-failed-mount-device-name" + + deviceNotMounted = "deviceNotMounted" + deviceMountUncertain = "deviceMountUncertain" + deviceMounted = "deviceMounted" + + volumeNotMounted = "volumeNotMounted" + volumeMountUncertain = "volumeMountUncertain" + volumeMounted = "volumeMounted" ) // fakeVolumeHost is useful for testing volume 
plugins. @@ -345,6 +375,7 @@ type FakeVolumePlugin struct { VolumeLimitsError error LimitKey string ProvisionDelaySeconds int + SupportsRemount bool // Add callbacks as needed WaitForAttachHook func(spec *Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error) @@ -383,6 +414,8 @@ func (plugin *FakeVolumePlugin) getFakeVolume(list *[]*FakeVolume) *FakeVolume { UnmountDeviceHook: plugin.UnmountDeviceHook, } volume.VolumesAttached = make(map[string]types.NodeName) + volume.DeviceMountState = make(map[string]string) + volume.VolumeMountState = make(map[string]string) *list = append(*list, volume) return volume } @@ -420,7 +453,7 @@ func (plugin *FakeVolumePlugin) CanSupport(spec *Spec) bool { } func (plugin *FakeVolumePlugin) RequiresRemount() bool { - return false + return plugin.SupportsRemount } func (plugin *FakeVolumePlugin) SupportsMountOption() bool { @@ -784,7 +817,9 @@ type FakeVolume struct { VolName string Plugin *FakeVolumePlugin MetricsNil - VolumesAttached map[string]types.NodeName + VolumesAttached map[string]types.NodeName + DeviceMountState map[string]string + VolumeMountState map[string]string // Add callbacks as needed WaitForAttachHook func(spec *Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error) @@ -835,7 +870,50 @@ func (fv *FakeVolume) CanMount() error { func (fv *FakeVolume) SetUp(mounterArgs MounterArgs) error { fv.Lock() defer fv.Unlock() + err := fv.setupInternal(mounterArgs) fv.SetUpCallCount++ + return err +} + +func (fv *FakeVolume) setupInternal(mounterArgs MounterArgs) error { + if fv.VolName == TimeoutOnSetupVolumeName { + fv.VolumeMountState[fv.VolName] = volumeMountUncertain + return volumetypes.NewUncertainProgressError("time out on setup") + } + + if fv.VolName == FailOnSetupVolumeName { + fv.VolumeMountState[fv.VolName] = volumeNotMounted + return fmt.Errorf("mounting volume failed") + } + + if fv.VolName == TimeoutAndFailOnSetupVolumeName { + _, ok := 
fv.VolumeMountState[fv.VolName] + if !ok { + fv.VolumeMountState[fv.VolName] = volumeMountUncertain + return volumetypes.NewUncertainProgressError("time out on setup") + } + fv.VolumeMountState[fv.VolName] = volumeNotMounted + return fmt.Errorf("mounting volume failed") + + } + + if fv.VolName == SuccessAndFailOnSetupVolumeName { + _, ok := fv.VolumeMountState[fv.VolName] + if ok { + fv.VolumeMountState[fv.VolName] = volumeNotMounted + return fmt.Errorf("mounting volume failed") + } + } + + if fv.VolName == SuccessAndTimeoutSetupVolumeName { + _, ok := fv.VolumeMountState[fv.VolName] + if ok { + fv.VolumeMountState[fv.VolName] = volumeMountUncertain + return volumetypes.NewUncertainProgressError("time out on setup") + } + } + + fv.VolumeMountState[fv.VolName] = volumeNotMounted return fv.SetUpAt(fv.getPath(), mounterArgs) } @@ -1036,19 +1114,64 @@ func (fv *FakeVolume) GetDeviceMountPath(spec *Spec) (string, error) { return "", nil } -func (fv *FakeVolume) MountDevice(spec *Spec, devicePath string, deviceMountPath string) error { +func (fv *FakeVolume) mountDeviceInternal(spec *Spec, devicePath string, deviceMountPath string) error { fv.Lock() defer fv.Unlock() + if spec.Name() == TimeoutOnMountDeviceVolumeName { + fv.DeviceMountState[spec.Name()] = deviceMountUncertain + return volumetypes.NewUncertainProgressError("mount failed") + } + + if spec.Name() == FailMountDeviceVolumeName { + fv.DeviceMountState[spec.Name()] = deviceNotMounted + return fmt.Errorf("error mounting disk: %s", devicePath) + } + + if spec.Name() == TimeoutAndFailOnMountDeviceVolumeName { + _, ok := fv.DeviceMountState[spec.Name()] + if !ok { + fv.DeviceMountState[spec.Name()] = deviceMountUncertain + return volumetypes.NewUncertainProgressError("timed out mounting error") + } + fv.DeviceMountState[spec.Name()] = deviceNotMounted + return fmt.Errorf("error mounting disk: %s", devicePath) + } + + if spec.Name() == SuccessAndTimeoutDeviceName { + _, ok := fv.DeviceMountState[spec.Name()] + if ok 
{ + fv.DeviceMountState[spec.Name()] = deviceMountUncertain + return volumetypes.NewUncertainProgressError("error mounting state") + } + } + + if spec.Name() == SuccessAndFailOnMountDeviceName { + _, ok := fv.DeviceMountState[spec.Name()] + if ok { + return fmt.Errorf("error mounting disk: %s", devicePath) + } + } + fv.DeviceMountState[spec.Name()] = deviceMounted fv.MountDeviceCallCount++ return nil } +func (fv *FakeVolume) MountDevice(spec *Spec, devicePath string, deviceMountPath string) error { + return fv.mountDeviceInternal(spec, devicePath, deviceMountPath) +} + func (fv *FakeVolume) GetMountDeviceCallCount() int { fv.RLock() defer fv.RUnlock() return fv.MountDeviceCallCount } +func (fv *FakeVolume) GetUnmountDeviceCallCount() int { + fv.RLock() + defer fv.RUnlock() + return fv.UnmountDeviceCallCount +} + func (fv *FakeVolume) Detach(volumeName string, nodeName types.NodeName) error { fv.Lock() defer fv.Unlock() @@ -1304,6 +1427,28 @@ func VerifyMountDeviceCallCount( expectedMountDeviceCallCount) } +func VerifyUnmountDeviceCallCount(expectedCallCount int, fakeVolumePlugin *FakeVolumePlugin) error { + detachers := fakeVolumePlugin.GetDetachers() + if len(detachers) == 0 && (expectedCallCount == 0) { + return nil + } + actualCallCount := 0 + for _, detacher := range detachers { + actualCallCount = detacher.GetUnmountDeviceCallCount() + if expectedCallCount == 0 && actualCallCount == expectedCallCount { + return nil + } + + if (expectedCallCount > 0) && (actualCallCount >= expectedCallCount) { + return nil + } + } + + return fmt.Errorf( + "Expected DeviceUnmount Call %d, got %d", + expectedCallCount, actualCallCount) +} + // VerifyZeroMountDeviceCallCount ensures that all Attachers for this plugin // have a zero MountDeviceCallCount. Otherwise it returns an error. 
func VerifyZeroMountDeviceCallCount(fakeVolumePlugin *FakeVolumePlugin) error { @@ -1358,9 +1503,18 @@ func VerifyZeroSetUpCallCount(fakeVolumePlugin *FakeVolumePlugin) error { func VerifyTearDownCallCount( expectedTearDownCallCount int, fakeVolumePlugin *FakeVolumePlugin) error { - for _, unmounter := range fakeVolumePlugin.GetUnmounters() { + unmounters := fakeVolumePlugin.GetUnmounters() + if len(unmounters) == 0 && (expectedTearDownCallCount == 0) { + return nil + } + + for _, unmounter := range unmounters { actualCallCount := unmounter.GetTearDownCallCount() - if actualCallCount >= expectedTearDownCallCount { + if expectedTearDownCallCount == 0 && actualCallCount == expectedTearDownCallCount { + return nil + } + + if (expectedTearDownCallCount > 0) && (actualCallCount >= expectedTearDownCallCount) { return nil } } diff --git a/pkg/volume/util/operationexecutor/operation_executor.go b/pkg/volume/util/operationexecutor/operation_executor.go index c95029e6fd7f0..463770f1730fe 100644 --- a/pkg/volume/util/operationexecutor/operation_executor.go +++ b/pkg/volume/util/operationexecutor/operation_executor.go @@ -160,23 +160,48 @@ func NewOperationExecutor( } } +// MarkVolumeOpts is a struct to pass arguments to MountVolume functions +type MarkVolumeOpts struct { + PodName volumetypes.UniquePodName + PodUID types.UID + VolumeName v1.UniqueVolumeName + Mounter volume.Mounter + BlockVolumeMapper volume.BlockVolumeMapper + OuterVolumeSpecName string + VolumeGidVolume string + VolumeSpec *volume.Spec + VolumeMountState VolumeMountState +} + // ActualStateOfWorldMounterUpdater defines a set of operations updating the actual // state of the world cache after successful mount/unmount. 
type ActualStateOfWorldMounterUpdater interface { // Marks the specified volume as mounted to the specified pod - MarkVolumeAsMounted(podName volumetypes.UniquePodName, podUID types.UID, volumeName v1.UniqueVolumeName, mounter volume.Mounter, blockVolumeMapper volume.BlockVolumeMapper, outerVolumeSpecName string, volumeGidValue string, volumeSpec *volume.Spec) error + MarkVolumeAsMounted(markVolumeOpts MarkVolumeOpts) error // Marks the specified volume as unmounted from the specified pod MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error + // MarkVolumeMountAsUncertain marks state of volume mount for the pod uncertain + MarkVolumeMountAsUncertain(markVolumeOpts MarkVolumeOpts) error + // Marks the specified volume as having been globally mounted. MarkDeviceAsMounted(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error + // MarkDeviceAsUncertain marks device state in global mount path as uncertain + MarkDeviceAsUncertain(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath string) error + // Marks the specified volume as having its global mount unmounted. MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error // Marks the specified volume's file system resize request is finished. MarkVolumeAsResized(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error + + // GetDeviceMountState returns mount state of the device in global path + GetDeviceMountState(volumeName v1.UniqueVolumeName) DeviceMountState + + // GetVolumeMountState returns mount state of the volume for the Pod + GetVolumeMountState(volumName v1.UniqueVolumeName, podName volumetypes.UniquePodName) VolumeMountState } // ActualStateOfWorldAttacherUpdater defines a set of operations updating the @@ -354,6 +379,35 @@ type VolumeToMount struct { DesiredSizeLimit *resource.Quantity } +// DeviceMountState represents device mount state in a global path. 
+type DeviceMountState string + +const ( + // DeviceGloballyMounted means device has been globally mounted successfully + DeviceGloballyMounted DeviceMountState = "DeviceGloballyMounted" + + // DeviceMountUncertain means device may not be mounted but a mount operation may be + // in-progress which can cause device mount to succeed. + DeviceMountUncertain DeviceMountState = "DeviceMountUncertain" + + // DeviceNotMounted means device has not been mounted globally. + DeviceNotMounted DeviceMountState = "DeviceNotMounted" +) + +// VolumeMountState represents volume mount state in a path local to the pod. +type VolumeMountState string + +const ( + // VolumeMounted means volume has been mounted in pod's local path + VolumeMounted VolumeMountState = "VolumeMounted" + + // VolumeMountUncertain means volume may or may not be mounted in pods' local path + VolumeMountUncertain VolumeMountState = "VolumeMountUncertain" + + // VolumeNotMounted means volume has not been mounted in pod's local path + VolumeNotMounted VolumeMountState = "VolumeNotMounted" +) + // GenerateMsgDetailed returns detailed msgs for volumes to mount func (volume *VolumeToMount) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) { detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID) diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 91b86de990dc8..d453c0496084a 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -580,6 +580,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( devicePath, deviceMountPath) if err != nil { + og.markDeviceErrorState(volumeToMount, devicePath, deviceMountPath, err, actualStateOfWorld) // On failure, return error. Caller will log and retry. 
return volumeToMount.GenerateError("MountVolume.MountDevice failed", err) } @@ -621,7 +622,19 @@ func (og *operationGenerator) GenerateMountVolumeFunc( FsGroup: fsGroup, DesiredSize: volumeToMount.DesiredSizeLimit, }) + // Update actual state of world + markOpts := MarkVolumeOpts{ + PodName: volumeToMount.PodName, + PodUID: volumeToMount.Pod.UID, + VolumeName: volumeToMount.VolumeName, + Mounter: volumeMounter, + OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName, + VolumeGidVolume: volumeToMount.VolumeGidValue, + VolumeSpec: volumeToMount.VolumeSpec, + VolumeMountState: VolumeMounted, + } if mountErr != nil { + og.markVolumeErrorState(volumeToMount, markOpts, mountErr, actualStateOfWorld) // On failure, return error. Caller will log and retry. return volumeToMount.GenerateError("MountVolume.SetUp failed", mountErr) } @@ -647,16 +660,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( } } - // Update actual state of world - markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted( - volumeToMount.PodName, - volumeToMount.Pod.UID, - volumeToMount.VolumeName, - volumeMounter, - nil, - volumeToMount.OuterVolumeSpecName, - volumeToMount.VolumeGidValue, - volumeToMount.VolumeSpec) + markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markOpts) if markVolMountedErr != nil { // On failure, return error. Caller will log and retry. 
return volumeToMount.GenerateError("MountVolume.MarkVolumeAsMounted failed", markVolMountedErr) @@ -679,6 +683,49 @@ func (og *operationGenerator) GenerateMountVolumeFunc( } } +func (og *operationGenerator) markDeviceErrorState(volumeToMount VolumeToMount, devicePath, deviceMountPath string, mountError error, actualStateOfWorld ActualStateOfWorldMounterUpdater) { + if volumetypes.IsOperationFinishedError(mountError) && + actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) == DeviceMountUncertain { + // Only devices which were uncertain can be marked as unmounted + markDeviceUnmountError := actualStateOfWorld.MarkDeviceAsUnmounted(volumeToMount.VolumeName) + if markDeviceUnmountError != nil { + klog.Errorf(volumeToMount.GenerateErrorDetailed("MountDevice.MarkDeviceAsUnmounted failed", markDeviceUnmountError).Error()) + } + return + } + + if volumetypes.IsUncertainProgressError(mountError) && + actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) == DeviceNotMounted { + // only devices which are not mounted can be marked as uncertain. We do not want to mark a device + // which was previously marked as mounted here as uncertain. 
+ markDeviceUncertainError := actualStateOfWorld.MarkDeviceAsUncertain(volumeToMount.VolumeName, devicePath, deviceMountPath) + if markDeviceUncertainError != nil { + klog.Errorf(volumeToMount.GenerateErrorDetailed("MountDevice.MarkDeviceAsUncertain failed", markDeviceUncertainError).Error()) + } + } + +} + +func (og *operationGenerator) markVolumeErrorState(volumeToMount VolumeToMount, markOpts MarkVolumeOpts, mountError error, actualStateOfWorld ActualStateOfWorldMounterUpdater) { + if volumetypes.IsOperationFinishedError(mountError) && + actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeMountUncertain { + t := actualStateOfWorld.MarkVolumeAsUnmounted(volumeToMount.PodName, volumeToMount.VolumeName) + if t != nil { + klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeAsUnmounted failed", t).Error()) + } + return + } + + if volumetypes.IsUncertainProgressError(mountError) && + actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeNotMounted { + t := actualStateOfWorld.MarkVolumeMountAsUncertain(markOpts) + if t != nil { + klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", t).Error()) + } + } + +} + func (og *operationGenerator) GenerateUnmountVolumeFunc( volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, @@ -982,16 +1029,18 @@ func (og *operationGenerator) GenerateMapVolumeFunc( return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError) } - // Update actual state of world - markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted( - volumeToMount.PodName, - volumeToMount.Pod.UID, - volumeToMount.VolumeName, - nil, - blockVolumeMapper, - volumeToMount.OuterVolumeSpecName, - volumeToMount.VolumeGidValue, - volumeToMount.VolumeSpec) + markVolumeOpts := MarkVolumeOpts{ + PodName: volumeToMount.PodName, + PodUID: volumeToMount.Pod.UID, + 
VolumeName: volumeToMount.VolumeName, + BlockVolumeMapper: blockVolumeMapper, + OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName, + VolumeGidVolume: volumeToMount.VolumeGidValue, + VolumeSpec: volumeToMount.VolumeSpec, + VolumeMountState: VolumeMounted, + } + + markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts) if markVolMountedErr != nil { // On failure, return error. Caller will log and retry. return volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed", markVolMountedErr) diff --git a/pkg/volume/util/types/types.go b/pkg/volume/util/types/types.go index 2811cd1dc6c73..1a34a7d31533a 100644 --- a/pkg/volume/util/types/types.go +++ b/pkg/volume/util/types/types.go @@ -51,6 +51,57 @@ func (o *GeneratedOperations) Run() (eventErr, detailedErr error) { return o.OperationFunc() } +// TransientOperationFailure indicates operation failed with a transient error +// and may fix itself when retried. +type TransientOperationFailure struct { + msg string +} + +func (err *TransientOperationFailure) Error() string { + return err.msg +} + +// NewTransientOperationFailure creates an instance of TransientOperationFailure error +func NewTransientOperationFailure(msg string) *TransientOperationFailure { + return &TransientOperationFailure{msg: msg} +} + +// UncertainProgressError indicates operation failed with a non-final error +// and operation may be in-progress in background. +type UncertainProgressError struct { + msg string +} + +func (err *UncertainProgressError) Error() string { + return err.msg +} + +// NewUncertainProgressError creates an instance of UncertainProgressError type +func NewUncertainProgressError(msg string) *UncertainProgressError { + return &UncertainProgressError{msg: msg} +} + +// IsOperationFinishedError checks if given error is of type that indicates +// operation is finished with a FINAL error. 
+func IsOperationFinishedError(err error) bool { + if _, ok := err.(*UncertainProgressError); ok { + return false + } + if _, ok := err.(*TransientOperationFailure); ok { + return false + } + return true +} + +// IsUncertainProgressError checks if given error is of type that indicates +// operation might be in-progress in background. +func IsUncertainProgressError(err error) bool { + if _, ok := err.(*UncertainProgressError); ok { + return true + } + return false +} + const ( // VolumeResizerKey is key that will be used to store resizer used // for resizing PVC. The generated key/value pair will be added diff --git a/pkg/volume/volume.go b/pkg/volume/volume.go index 31fa8216e3ec6..0efabf9250878 100644 --- a/pkg/volume/volume.go +++ b/pkg/volume/volume.go @@ -19,7 +19,7 @@ package volume import ( "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -128,7 +128,12 @@ type Mounter interface { // content should be owned by 'fsGroup' so that it can be // accessed by the pod. This may be called more than once, so // implementations must be idempotent. + // It could return following types of errors: + // - TransientOperationFailure + // - UncertainProgressError + // - Error of any other type should be considered a final error SetUp(mounterArgs MounterArgs) error + // SetUpAt prepares and mounts/unpacks the volume to the // specified directory path, which may or may not exist yet. // The mount point and its content should be owned by @@ -247,6 +252,10 @@ type DeviceMounter interface { // MountDevice mounts the disk to a global path which // individual pods can then bind mount // Note that devicePath can be empty if the volume plugin does not implement any of Attach and WaitForAttach methods. 
+ // It could return following types of errors: + // - TransientOperationFailure + // - UncertainProgressError + // - Error of any other type should be considered a final error MountDevice(spec *Spec, devicePath string, deviceMountPath string) error }