Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition in agent Traceflow controller #5954

Merged
merged 1 commit into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 21 additions & 22 deletions pkg/agent/controller/traceflow/traceflow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ const (
)

type traceflowState struct {
name string
name string
// Used to uniquely identify Traceflow.
uid types.UID
tag int8
liveTraffic bool
droppedOnly bool
Expand Down Expand Up @@ -268,10 +270,17 @@ func (c *Controller) syncTraceflow(traceflowName string) error {
if tf.Status.DataplaneTag != 0 {
start := false
c.runningTraceflowsMutex.Lock()
if _, ok := c.runningTraceflows[tf.Status.DataplaneTag]; !ok {
tfState, ok := c.runningTraceflows[tf.Status.DataplaneTag]
c.runningTraceflowsMutex.Unlock()
// This may happen if a Traceflow is assigned with a tag that was just released from an old Traceflow but
// the agent hasn't processed the deletion event of the old Traceflow yet.
if ok && tfState.uid != tf.UID {
klog.V(2).InfoS("Found a stale Traceflow associated with the dataplane tag, cleaning it up", "tag", tf.Status.DataplaneTag, "currentTraceflow", traceflowName, "staleTraceflow", tfState.name)
c.cleanupTraceflow(tfState.name)
start = true
} else if !ok {
start = true
}
c.runningTraceflowsMutex.Unlock()
if start {
err = c.startTraceflow(tf)
}
Expand Down Expand Up @@ -336,7 +345,7 @@ func (c *Controller) startTraceflow(tf *crdv1beta1.Traceflow) error {
// Store Traceflow to cache.
c.runningTraceflowsMutex.Lock()
tfState := traceflowState{
name: tf.Name, tag: tf.Status.DataplaneTag,
uid: tf.UID, name: tf.Name, tag: tf.Status.DataplaneTag,
liveTraffic: liveTraffic, droppedOnly: tf.Spec.DroppedOnly && liveTraffic,
receiverOnly: receiverOnly, isSender: isSender}
c.runningTraceflows[tfState.tag] = &tfState
Expand Down Expand Up @@ -570,29 +579,19 @@ func (c *Controller) errorTraceflowCRD(tf *crdv1beta1.Traceflow, reason string)
return c.crdClient.CrdV1beta1().Traceflows().Patch(context.TODO(), tf.Name, types.MergePatchType, payloads, metav1.PatchOptions{}, "status")
}

// Delete Traceflow from cache.
func (c *Controller) deleteTraceflowState(tfName string) *traceflowState {
// Delete Traceflow state and OVS flows.
func (c *Controller) cleanupTraceflow(tfName string) {
c.runningTraceflowsMutex.Lock()
defer c.runningTraceflowsMutex.Unlock()
// Controller could have deallocated the tag and cleared the DataplaneTag
// field in the Traceflow Status, so try looking up the tag from the
// cache by Traceflow name.
for tag, tfState := range c.runningTraceflows {
if tfName == tfState.name {
// This must be executed before deleting the tag from runningTraceflows, otherwise it may uninstall another
// Traceflow's flows if the tag is reassigned.
if err := c.ofClient.UninstallTraceflowFlows(uint8(tag)); err != nil {
klog.ErrorS(err, "Failed to uninstall Traceflow flows", "Traceflow", tfName, "state", tfState)
}
delete(c.runningTraceflows, tag)
return tfState
}
}
return nil
}

// Delete Traceflow state and OVS flows.
func (c *Controller) cleanupTraceflow(tfName string) {
tfState := c.deleteTraceflowState(tfName)
if tfState != nil {
err := c.ofClient.UninstallTraceflowFlows(uint8(tfState.tag))
if err != nil {
klog.Errorf("Failed to uninstall Traceflow %s flows: %v", tfName, err)
break
}
}
}
85 changes: 81 additions & 4 deletions pkg/agent/controller/traceflow/traceflow_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,8 @@ func TestSyncTraceflow(t *testing.T) {
tcs := []struct {
name string
tf *crdv1beta1.Traceflow
tfState *traceflowState
existingState *traceflowState
newState *traceflowState
expectedCalls func(mockOFClient *openflowtest.MockClient)
}{
{
Expand All @@ -698,11 +699,83 @@ func TestSyncTraceflow(t *testing.T) {
DataplaneTag: 1,
},
},
tfState: &traceflowState{
existingState: &traceflowState{
name: "tf1",
uid: "uid1",
tag: 1,
},
newState: &traceflowState{
name: "tf1",
uid: "uid1",
tag: 1,
},
},
{
name: "traceflow in running phase with empty state",
tf: &crdv1beta1.Traceflow{
ObjectMeta: metav1.ObjectMeta{Name: "tf1", UID: "uid1"},
Spec: crdv1beta1.TraceflowSpec{
Source: crdv1beta1.Source{
Namespace: pod1.Namespace,
Pod: pod1.Name,
},
Destination: crdv1beta1.Destination{
Namespace: pod2.Namespace,
Pod: pod2.Name,
},
},
Status: crdv1beta1.TraceflowStatus{
Phase: crdv1beta1.Running,
DataplaneTag: 1,
},
},
newState: &traceflowState{
name: "tf1",
uid: "uid1",
tag: 1,
isSender: true,
},
expectedCalls: func(mockOFClient *openflowtest.MockClient) {
mockOFClient.EXPECT().InstallTraceflowFlows(uint8(1), false, false, false, nil, uint32(1), uint16(20))
mockOFClient.EXPECT().SendTraceflowPacket(uint8(1), gomock.Any(), ofPortPod1, int32(-1))
},
},
{
name: "traceflow in running phase with conflict state",
tf: &crdv1beta1.Traceflow{
ObjectMeta: metav1.ObjectMeta{Name: "tf1", UID: "uid1"},
Spec: crdv1beta1.TraceflowSpec{
Source: crdv1beta1.Source{
Namespace: pod1.Namespace,
Pod: pod1.Name,
},
Destination: crdv1beta1.Destination{
Namespace: pod2.Namespace,
Pod: pod2.Name,
},
},
Status: crdv1beta1.TraceflowStatus{
Phase: crdv1beta1.Running,
DataplaneTag: 1,
},
},
existingState: &traceflowState{
name: "tf1",
uid: "uid2",
tag: 1,
},
newState: &traceflowState{
name: "tf1",
uid: "uid1",
tag: 1,
isSender: true,
},
expectedCalls: func(mockOFClient *openflowtest.MockClient) {
mockOFClient.EXPECT().UninstallTraceflowFlows(uint8(1))
mockOFClient.EXPECT().InstallTraceflowFlows(uint8(1), false, false, false, nil, uint32(1), uint16(20))
mockOFClient.EXPECT().SendTraceflowPacket(uint8(1), gomock.Any(), ofPortPod1, int32(-1))
},
},
{
name: "traceflow in failed phase",
tf: &crdv1beta1.Traceflow{
Expand All @@ -722,7 +795,7 @@ func TestSyncTraceflow(t *testing.T) {
DataplaneTag: 1,
},
},
tfState: &traceflowState{
existingState: &traceflowState{
name: "tf1",
tag: 1,
},
Expand All @@ -740,13 +813,17 @@ func TestSyncTraceflow(t *testing.T) {
tfc.crdInformerFactory.Start(stopCh)
tfc.crdInformerFactory.WaitForCacheSync(stopCh)

tfc.runningTraceflows[tt.tf.Status.DataplaneTag] = tt.tfState
if tt.existingState != nil {
tfc.runningTraceflows[tt.tf.Status.DataplaneTag] = tt.existingState
}
antoninbas marked this conversation as resolved.
Show resolved Hide resolved

if tt.expectedCalls != nil {
tt.expectedCalls(tfc.mockOFClient)
}

err := tfc.syncTraceflow(tt.tf.Name)
require.NoError(t, err)
assert.Equal(t, tt.newState, tfc.runningTraceflows[tt.tf.Status.DataplaneTag])
})
}
}
Expand Down
Loading