Skip to content

Commit

Permalink
Fix endless PVC await (#3130)
Browse files Browse the repository at this point in the history
This fixes an issue where `WaitForFirstConsumer` PVCs could cause
deadlocks due to the downstream consumer waiting for the PVC to be
bound.

We now attempt to fetch the PVC's storage class and the corresponding
`volumeBindingMode`. If we can find the bind mode, then we consider a
`WaitForFirstConsumer` PVC ready once it's `Pending`. The existing
behavior is preserved if we can't find the bind mode, or if it's
`Immediate`.

An e2e test is included. I'm omitting unit tests as this might end up
getting refactored in some later PRs.

Fixes #895.
  • Loading branch information
blampe authored Jul 25, 2024
1 parent 641e0e6 commit 645df55
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 24 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
- Added java as supported language for CustomResource overlays. (https://github.com/pulumi/pulumi-kubernetes/pull/3120)
- Status messages reported during updates are now more accurately scoped to the
affected resource. (https://github.com/pulumi/pulumi-kubernetes/pull/3128)
- `PersistentVolumeClaims` with a bind mode of `WaitForFirstConsumer` will no
longer hang indefinitely. (https://github.com/pulumi/pulumi-kubernetes/pull/3130)

## 4.15.0 (July 9, 2024)

Expand Down
81 changes: 61 additions & 20 deletions provider/pkg/await/awaiters.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,13 @@ import (
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
logger "github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
)

const (
statusAvailable = "Available"
statusBound = "Bound"
)

// createAwaitConfig specifies on which conditions we are to consider a resource created and fully
// initialized. For example, we might consider a `Deployment` created and initialized only when the
// live number of Pods reaches the minimum liveness threshold. `pool` and `disco` are provided
Expand Down Expand Up @@ -244,7 +242,7 @@ var awaiters = map[string]awaitSpec{
awaitCreation: untilCoreV1PersistentVolumeInitialized,
},
coreV1PersistentVolumeClaim: {
awaitCreation: untilCoreV1PersistentVolumeClaimBound,
awaitCreation: untilCoreV1PersistentVolumeClaimReady,
},
coreV1Pod: {
awaitCreation: awaitPodInit,
Expand Down Expand Up @@ -436,7 +434,7 @@ func untilCoreV1NamespaceDeleted(config deleteAwaitConfig) error {
return err
}

statusPhase, _ := openapi.Pluck(ns.Object, "status", "phase")
statusPhase, _, _ := unstructured.NestedString(ns.Object, "status", "phase")
logger.V(3).Infof("Namespace %q status received: %#v", ns.GetName(), statusPhase)
if statusPhase == "" {
return nil
Expand All @@ -458,15 +456,17 @@ func untilCoreV1NamespaceDeleted(config deleteAwaitConfig) error {
// --------------------------------------------------------------------------

func untilCoreV1PersistentVolumeInitialized(c createAwaitConfig) error {
available := string(corev1.VolumeAvailable)
bound := string(corev1.VolumeBound)
pvAvailableOrBound := func(pv *unstructured.Unstructured) bool {
statusPhase, _ := openapi.Pluck(pv.Object, "status", "phase")
logger.V(3).Infof("Persistent volume %q status received: %#v", pv.GetName(), statusPhase)
if statusPhase == statusAvailable {
c.logStatus(diag.Info, "✅ PVC marked available")
} else if statusPhase == statusBound {
c.logStatus(diag.Info, "✅ PVC has been bound")
phase, _ := openapi.Pluck(pv.Object, "status", "phase")
logger.V(3).Infof("Persistent volume %q status received: %#v", pv.GetName(), phase)
if phase == available {
c.logStatus(diag.Info, "✅ PV marked available")
} else if phase == bound {
c.logStatus(diag.Info, "✅ PV has been bound")
}
return statusPhase == statusAvailable || statusPhase == statusBound
return phase == available || phase == bound
}

client, err := c.clientSet.ResourceClientForObject(c.currentOutputs)
Expand All @@ -483,19 +483,60 @@ func untilCoreV1PersistentVolumeInitialized(c createAwaitConfig) error {

// --------------------------------------------------------------------------

func untilCoreV1PersistentVolumeClaimBound(c createAwaitConfig) error {
pvcBound := func(pvc *unstructured.Unstructured) bool {
statusPhase, _ := openapi.Pluck(pvc.Object, "status", "phase")
logger.V(3).Infof("Persistent volume claim %s status received: %#v", pvc.GetName(), statusPhase)
return statusPhase == statusBound
func untilCoreV1PersistentVolumeClaimReady(c createAwaitConfig) error {
var bindMode string
pvcReady := func(pvc *unstructured.Unstructured) bool {
// Lookup the PVC's storage class once it's available.
if bindMode == "" {
b, err := pvcBindMode(c.ctx, c.clientSet, pvc)
if err != nil {
c.logStatus(diag.Warning, err.Error())
}
bindMode = b
}

phase, _, _ := unstructured.NestedString(pvc.Object, "status", "phase")
logger.V(3).Infof("Persistent volume claim %s status received: %#v", pvc.GetName(), phase)

if bindMode == string(storagev1.VolumeBindingWaitForFirstConsumer) {
return phase == string(corev1.ClaimPending)
}
return phase == string(corev1.ClaimBound)
}

client, err := c.clientSet.ResourceClientForObject(c.currentOutputs)
if err != nil {
return err
}
return watcher.ForObject(c.ctx, client, c.currentOutputs.GetName()).
WatchUntil(pvcBound, 5*time.Minute)
WatchUntil(pvcReady, 5*time.Minute)
}

// pvcBindMode attempts to fetch a PVC's StorageClass and returns its
// volumeBindingMode.
//
// The storage class name is read from the PVC's spec.storageClassName. An
// error is returned when that field is missing, empty, or not a string, when
// a StorageClass client cannot be obtained, or when the StorageClass cannot
// be fetched. If the StorageClass exists but carries no volumeBindingMode
// field, an empty string and a nil error are returned.
func pvcBindMode(
	ctx context.Context,
	clientset *clients.DynamicClientSet,
	pvc *unstructured.Unstructured,
) (string, error) {
	// Inspect the PVC first: this is a purely local check, so there is no
	// point in building a StorageClass client when no class is named.
	name, _, err := unstructured.NestedString(pvc.Object, "spec", "storageClassName")
	if err != nil {
		return "", fmt.Errorf("reading storage class name of %q: %w", pvc.GetName(), err)
	}
	if name == "" {
		return "", fmt.Errorf("no storage class found for %q", pvc.GetName())
	}

	// StorageClass is a cluster-scoped kind, hence the empty namespace.
	gvk := storagev1.SchemeGroupVersion.WithKind("StorageClass")
	scClient, err := clientset.ResourceClient(gvk, "")
	if err != nil {
		return "", fmt.Errorf("getting storageclass client: %w", err)
	}

	sc, err := scClient.Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return "", fmt.Errorf("getting storageclass %q: %w", name, err)
	}

	// The "found" result is deliberately ignored: an absent field yields "",
	// which callers can treat as an unknown bind mode.
	bindMode, _, err := unstructured.NestedString(sc.Object, "volumeBindingMode")
	if err != nil {
		return "", fmt.Errorf("reading volumeBindingMode of storageclass %q: %w", name, err)
	}
	return bindMode, nil
}

// --------------------------------------------------------------------------
Expand Down
9 changes: 5 additions & 4 deletions provider/pkg/await/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
logger "github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -334,7 +336,6 @@ func (dia *deploymentInitAwaiter) await(
timeout,
aggregateErrorTicker <-chan time.Time,
) error {

// Before we start processing any ReplicaSet, PVC or Pod events, we need to wait until the Deployment controller
// has seen and updated the status of the Deployment object.
if err := dia.waitUntilDeploymentControllerReconciles(deploymentEvents, timeout); err != nil {
Expand Down Expand Up @@ -551,7 +552,7 @@ func (dia *deploymentInitAwaiter) processDeploymentEvent(event watch.Event) {
dia.replicaSetAvailable = condition["reason"] == "NewReplicaSetAvailable" && isProgressing
}

if condition["type"] == statusAvailable {
if condition["type"] == string(appsv1.DeploymentAvailable) {
dia.deploymentAvailable = condition["status"] == trueStatus
if !dia.deploymentAvailable {
rawReason, hasReason := condition["reason"]
Expand Down Expand Up @@ -749,7 +750,7 @@ func (dia *deploymentInitAwaiter) checkPersistentVolumeClaimStatus() {

// Success only occurs when there are no PersistentVolumeClaims
// defined, or when all PVCs have a status of 'Bound'
if phase != statusBound {
if phase != string(corev1.ClaimBound) {
allPVCsReady = false
message := fmt.Sprintf(
"PersistentVolumeClaim: [%s] is not ready. status.phase currently at: %s", pvc.GetName(), phase)
Expand Down Expand Up @@ -855,7 +856,7 @@ func (dia *deploymentInitAwaiter) getFailedPersistentValueClaims() []string {
failed := make([]string, 0)
for _, pvc := range dia.pvcs {
phase, _ := openapi.Pluck(pvc.Object, "status", "phase")
if phase != statusBound {
if phase != string(corev1.ClaimBound) {
failed = append(failed, pvc.GetName())
}
}
Expand Down
20 changes: 20 additions & 0 deletions tests/sdk/java/await_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,23 @@ func TestAwaitDaemonSet(t *testing.T) {
_, err := test.CurrentStack().Up(context.Background())
assert.ErrorContains(t, err, `the Kubernetes API server reported that "default/await-daemonset" failed to fully initialize or become live: timed out waiting for the condition`)
}

// TestAwaitPVC exercises a PersistentVolumeClaim whose StorageClass uses the
// WaitForFirstConsumer volume binding mode. The first update deploys only the
// PVC and expects it to be reported as Pending; the second update switches to
// the step2 program, which adds a Deployment that mounts the PVC.
func TestAwaitPVC(t *testing.T) {
	t.Parallel()

	test := pulumitest.NewPulumiTest(t,
		"testdata/await/pvc",
		opttest.SkipInstall(),
	)
	t.Cleanup(func() {
		test.Destroy()
	})

	// A WaitForFirstConsumer PVC should still be Pending.
	up := test.Up()
	assert.Equal(t, "Pending", up.Outputs["status"].Value)

	// Adding a Deployment to consume the PVC should succeed. The result is
	// not inspected, so it is not assigned.
	// NOTE(review): presumably Up fails the test itself on error — confirm
	// against the pulumitest version in use.
	test.UpdateSource("testdata/await/pvc/step2")
	test.Up()
}
37 changes: 37 additions & 0 deletions tests/sdk/java/testdata/await/pvc/Pulumi.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
name: pvc
runtime: yaml
description: |
Tests PVC with WaitForFirstConsumer (https://github.com/pulumi/pulumi-kubernetes/issues/895)
resources:
ns:
type: kubernetes:core/v1:Namespace

provider:
type: pulumi:providers:kubernetes
properties:
namespace: ${ns.metadata.name}

sc:
type: kubernetes:storage.k8s.io/v1:StorageClass
properties:
volumeBindingMode: WaitForFirstConsumer
provisioner: rancher.io/local-path
options:
provider: ${provider}

pvc:
type: kubernetes:core/v1:PersistentVolumeClaim
properties:
spec:
accessModes:
- ReadWriteOnce
storageClassName: ${sc.metadata.name}
resources:
requests:
storage: 1Mi
options:
provider: ${provider}

outputs:
status: ${pvc.status.phase}
64 changes: 64 additions & 0 deletions tests/sdk/java/testdata/await/pvc/step2/Pulumi.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
name: pvc
runtime: yaml
description: |
Tests PVC with WaitForFirstConsumer (https://github.com/pulumi/pulumi-kubernetes/issues/895)
resources:
ns:
type: kubernetes:core/v1:Namespace

provider:
type: pulumi:providers:kubernetes
properties:
namespace: ${ns.metadata.name}

sc:
type: kubernetes:storage.k8s.io/v1:StorageClass
properties:
volumeBindingMode: WaitForFirstConsumer
provisioner: rancher.io/local-path
options:
provider: ${provider}

pvc:
type: kubernetes:core/v1:PersistentVolumeClaim
properties:
spec:
accessModes:
- ReadWriteOnce
storageClassName: ${sc.metadata.name}
resources:
requests:
storage: 1Mi
options:
provider: ${provider}

# Add a deployment to consume the PVC.
deployment:
type: kubernetes:apps/v1:Deployment
properties:
spec:
replicas: 1
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:stable-alpine3.17-slim
volumeMounts:
- name: local
mountPath: /usr/share/nginx/html
volumes:
- name: local
persistentVolumeClaim:
claimName: ${pvc.metadata.name}
options:
provider: ${provider}

outputs:
status: ${pvc.status.phase}

0 comments on commit 645df55

Please sign in to comment.