Skip to content

Commit

Permalink
[processor/k8sattributes] Handle all resource deletion event types (#…
Browse files Browse the repository at this point in the history
…27847)

**Description:**

The k8s go client's cache expects OnDelete handlers to handle objects of
type DeletedFinalStateUnknown when the cache's watch mechanism misses a
delete and notices later. This changes the processor to handle such
deletes as if they were normal, rather than logging an error and
dropping the change.

**Link to tracking Issue:**
#27632

**Testing:**

Only what you see in the unit tests. I am open to suggestions, but I
don't see this being a code path we can reasonably cover in the e2e test
suite.

Verified manually locally on a kind cluster.
* Stood up two deployments loosely based off e2e testing resources, one
w/ a collector built from this branch and the other
docker.io/otel/opentelemetry-collector-contrib:latest.
* Both included an additional container in the collector pod I used to
fiddle with iptables rules.
* Added rules to reject traffic to/from the kube api server
* Deleted some namespaces containing deployments generating telemetry.
* Restored connectivity by removing the iptables rules.
* Observed the collector built from this branch was silent (aside from
the junk the k8s client logs due to the broken connection)
* Observed the latest
([0.87.0](https://hub.docker.com/layers/otel/opentelemetry-collector-contrib/0.87.0/images/sha256-77cdd395b828b09cb920c671966f09a87a40611aa6107443146086f2046f4a9a?context=explore))
collector logged a handful of errors for the deleted resources
(api_v1.Pod, and apps_v1.ReplicaSet. I probably just didn't wait long
enough for Namespace.)

```
2023-10-19T02:18:37.781Z        error   kube/client.go:236      object received was not of type api_v1.Pod      {"kind": "processor", "name": "k8sattributes", "pipeline": "metrics", "received": {"Key":"src1/telemetrygen-patched-766d55cbcb-8zktr","Obj":{"metadata":{"name":"telemetrygen-patched-766d55cbcb-8zktr","namespace":"src1","uid":"be5d2268-c8b0-434d-b3b8-8b18083c7a8b","creat
ionTimestamp":"2023-10-19T02:01:08Z","labels":{"app":"telemetrygen-patched","pod-template-hash":"766d55cbcb"},"ownerReferences":[{"apiVersion":"apps/v1","kind":"ReplicaSet","name":"telemetrygen-patched-766d55cbcb","uid":"a887d67a-d5d6-4269-b520-45dbb4f1cd82","controller":true,"blockOwnerDeletion":true}]},"spec":{"containers":[{"name":"telemetrygen","image":"localhost/telemetrygen
:latest","resources":{}}],"nodeName":"manual-e2e-testing-control-plane"},"status":{"podIP":"10.244.0.56","startTime":"2023-10-19T02:01:08Z","containerStatuses":[{"name":"telemetrygen","state":{},"lastState":{},"ready":false,"restartCount":0,"image":"","imageID":"","containerID":"containerd://2821ef32cd8bf93a13414504c0f8f0c016c84be49d6ffdbd475d7e4681e90c51"}]}}}}
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube.(*WatchClient).handlePodDelete
        github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor@v0.87.0/internal/kube/client.go:236
k8s.io/client-go/tools/cache.ResourceEventHandlerFuncs.OnDelete
        k8s.io/client-go@v0.28.2/tools/cache/controller.go:253
...

2023-10-19T02:19:03.970Z        error   kube/client.go:868      object received was not of type apps_v1.ReplicaSet      {"kind": "processor", "name": "k8sattributes", "pipeline": "metrics", "received": {"Key":"src1/telemetrygen-stable-5c444bb8b8","Obj":{"metadata":{"name":"telemetrygen-stable-5c444bb8b8","namespace":"src1","uid":"d37707ff-b308-4339-8543-a1caf5705ea8","creationTimestamp":null,"ownerReferences":[{"apiVersion":"apps/v1","kind":"Deployment","name":"telemetrygen-stable","uid":"c421276e-e1bf-40c5-85e1-e92e30363da5","controller":true,"blockOwnerDeletion":true}]},"spec":{"selector":null,"template":{"metadata":{"creationTimestamp":null},"spec":{"containers":null}}},"status":{"replicas":0}}}}
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube.(*WatchClient).handleReplicaSetDelete
        github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor@v0.87.0/internal/kube/client.go:868
k8s.io/client-go/tools/cache.ResourceEventHandlerFuncs.OnDelete
        k8s.io/client-go@v0.28.2/tools/cache/controller.go:253
k8s.io/client-go/tools/cache.(*processorListener).run.func1
        k8s.io/client-go@v0.28.2/tools/cache/shared_informer.go:979
k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
...
```
**Documentation:** N/A - it is not clear to me whether or not this
should land on the changelog. Its impact on users is marginal.

Signed-off-by: Christian Kruse <ctkruse99@gmail.com>
  • Loading branch information
c-kruse authored Oct 19, 2023
1 parent 4b41ec6 commit b0f0dda
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 4 deletions.
16 changes: 13 additions & 3 deletions processor/k8sattributesprocessor/internal/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (c *WatchClient) handlePodUpdate(_, newPod interface{}) {

func (c *WatchClient) handlePodDelete(obj interface{}) {
observability.RecordPodDeleted()
if pod, ok := obj.(*api_v1.Pod); ok {
if pod, ok := ignoreDeletedFinalStateUnknown(obj).(*api_v1.Pod); ok {
c.forgetPod(pod)
} else {
c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", obj))
Expand Down Expand Up @@ -259,7 +259,7 @@ func (c *WatchClient) handleNamespaceUpdate(_, newNamespace interface{}) {

func (c *WatchClient) handleNamespaceDelete(obj interface{}) {
observability.RecordNamespaceDeleted()
if namespace, ok := obj.(*api_v1.Namespace); ok {
if namespace, ok := ignoreDeletedFinalStateUnknown(obj).(*api_v1.Namespace); ok {
c.m.Lock()
if ns, ok := c.Namespaces[namespace.Name]; ok {
// When a namespace is deleted all the pods(and other k8s objects in that namespace) in that namespace are deleted before it.
Expand Down Expand Up @@ -859,7 +859,7 @@ func (c *WatchClient) handleReplicaSetUpdate(_, newRS interface{}) {

func (c *WatchClient) handleReplicaSetDelete(obj interface{}) {
observability.RecordReplicaSetDeleted()
if replicaset, ok := obj.(*apps_v1.ReplicaSet); ok {
if replicaset, ok := ignoreDeletedFinalStateUnknown(obj).(*apps_v1.ReplicaSet); ok {
c.m.Lock()
key := string(replicaset.UID)
delete(c.ReplicaSets, key)
Expand Down Expand Up @@ -915,3 +915,13 @@ func (c *WatchClient) getReplicaSet(uid string) (*ReplicaSet, bool) {
}
return nil, false
}

// ignoreDeletedFinalStateUnknown returns the object wrapped in
// DeletedFinalStateUnknown. Useful in OnDelete resource event handlers that do
// not need the additional context.
func ignoreDeletedFinalStateUnknown(obj interface{}) interface{} {
if obj, ok := obj.(cache.DeletedFinalStateUnknown); ok {
return obj.Obj
}
return obj
}
29 changes: 28 additions & 1 deletion processor/k8sattributesprocessor/internal/kube/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"k8s.io/apimachinery/pkg/selection"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
)
Expand Down Expand Up @@ -250,6 +251,14 @@ func TestReplicaSetHandler(t *testing.T) {
// test delete replicaset
c.handleReplicaSetDelete(updatedReplicaset)
assert.Equal(t, len(c.ReplicaSets), 0)
// test delete replicaset when DeletedFinalStateUnknown
c.handleReplicaSetAdd(replicaset)
require.Equal(t, len(c.ReplicaSets), 1)
c.handleReplicaSetDelete(cache.DeletedFinalStateUnknown{
Obj: replicaset,
})
assert.Equal(t, len(c.ReplicaSets), 0)

}

func TestPodHostNetwork(t *testing.T) {
Expand Down Expand Up @@ -427,13 +436,14 @@ func TestPodDelete(t *testing.T) {
assert.False(t, deleteRequest.ts.Before(tsBeforeDelete))
assert.False(t, deleteRequest.ts.After(time.Now()))

// delete when DeletedFinalStateUnknown
c.deleteQueue = c.deleteQueue[:0]
pod = &api_v1.Pod{}
pod.Name = "podC"
pod.Status.PodIP = "2.2.2.2"
pod.UID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
tsBeforeDelete = time.Now()
c.handlePodDelete(pod)
c.handlePodDelete(cache.DeletedFinalStateUnknown{Obj: pod})
assert.Equal(t, 5, len(c.Pods))
assert.Equal(t, 5, len(c.deleteQueue))
deleteRequest = c.deleteQueue[0]
Expand Down Expand Up @@ -464,6 +474,23 @@ func TestNamespaceDelete(t *testing.T) {
assert.Equal(t, 2, len(c.Namespaces))
got := c.Namespaces["namespaceA"]
assert.Equal(t, "namespaceA", got.Name)
// delete non-existent namespace when DeletedFinalStateUnknown
c.handleNamespaceDelete(cache.DeletedFinalStateUnknown{Obj: namespace})
assert.Equal(t, 2, len(c.Namespaces))
got = c.Namespaces["namespaceA"]
assert.Equal(t, "namespaceA", got.Name)

// delete namespace A
namespace.Name = "namespaceA"
c.handleNamespaceDelete(namespace)
assert.Equal(t, 1, len(c.Namespaces))
got = c.Namespaces["namespaceB"]
assert.Equal(t, "namespaceB", got.Name)

// delete namespace B when DeletedFinalStateUnknown
namespace.Name = "namespaceB"
c.handleNamespaceDelete(cache.DeletedFinalStateUnknown{Obj: namespace})
assert.Equal(t, 0, len(c.Namespaces))
}

func TestDeleteQueue(t *testing.T) {
Expand Down

0 comments on commit b0f0dda

Please sign in to comment.