Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the deadlock between the exporter and the conntrack polling goroutines #2429

Merged
merged 1 commit into from
Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 1 addition & 14 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (cs *ConntrackConnectionStore) Poll() ([]int, error) {
// Reset IsPresent flag for all connections in connection map before dumping flows in conntrack module.
// if the connection does not exist in conntrack table and has been exported, we will delete it from connection map.
deleteIfStaleOrResetConn := func(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error {
if !conn.IsPresent && conn.DoneExport {
if !conn.IsPresent && conn.DyingAndDoneExport {
if err := cs.DeleteConnWithoutLock(key); err != nil {
return err
}
Expand Down Expand Up @@ -247,16 +247,3 @@ func (cs *ConntrackConnectionStore) DeleteConnWithoutLock(connKey flowexporter.C
metrics.TotalAntreaConnectionsInConnTrackTable.Dec()
return nil
}

// SetExportDone sets the DoneExport field of the conntrack connection stored
// under connKey to true. It takes cs.mutex for the duration of the call.
// It returns an error if no connection with the given key exists in the
// connection map.
func (cs *ConntrackConnectionStore) SetExportDone(connKey flowexporter.ConnectionKey) error {
	cs.mutex.Lock()
	defer cs.mutex.Unlock()

	conn, found := cs.connections[connKey]
	if !found {
		return fmt.Errorf("connection with key %v does not exist in connection map", connKey)
	}
	// Guard clause above keeps the success path left-aligned instead of
	// nesting it in an else branch after a return.
	conn.DoneExport = true
	return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func generateUpdatedConns(conns []*flowexporter.Connection) []*flowexporter.Conn
updatedConns := make([]*flowexporter.Connection, length)
for i := 0; i < len(conns); i++ {
// replace deleted connection with new connection
if conns[i].DoneExport == true {
if conns[i].DyingAndDoneExport == true {
conns[i] = getNewConn()
} else { // update rest of connections
conns[i].OriginalPackets += 5
Expand All @@ -136,9 +136,9 @@ func generateUpdatedConns(conns []*flowexporter.Connection) []*flowexporter.Conn
}
randomNum := getRandomNum(int64(length - testNumOfDeletedConns))
for i := randomNum; i < testNumOfDeletedConns+randomNum; i++ {
// hardcode DoneExport here for testing deletion of connections
// hardcode DyingAndDoneExport here for testing deletion of connections
// not valid for testing update and export of records
updatedConns[i].DoneExport = true
updatedConns[i].DyingAndDoneExport = true
}
return updatedConns
}
Expand All @@ -154,7 +154,7 @@ func getNewConn() *flowexporter.Connection {
StartTime: time.Now().Add(-time.Duration(randomNum1) * time.Second),
StopTime: time.Now(),
IsPresent: true,
DoneExport: false,
DyingAndDoneExport: false,
FlowKey: flowKey,
OriginalPackets: 10,
OriginalBytes: 100,
Expand Down
13 changes: 4 additions & 9 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,15 +301,10 @@ func (exp *flowExporter) sendFlowRecords() error {
exp.numDataSetsSent = exp.numDataSetsSent + 1

if flowexporter.IsConnectionDying(&record.Conn) {
// If the connection is in dying state or connection is not in conntrack table,
// we will delete the flow records from records map.
klog.V(2).Infof("Deleting the inactive flow records with key: %v from record map", key)
if err := exp.flowRecords.DeleteFlowRecordWithoutLock(key); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function DeleteFlowRecordWithoutLock(key) can be removed, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Done.

return err
}
if err := exp.conntrackConnStore.SetExportDone(key); err != nil {
return err
}
// If the connection is in dying state or connection is not in conntrack
// table, we set the DyingAndDoneExport flag to do the deletion later.
record.DyingAndDoneExport = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a future improvement, maybe we should just change ForAllFlowRecordsDo so that updateOrSendFlowRecord uses a flow record pointer instead of a copy of the stored flow record. This whole code is executed with the lock anyway.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Alternatively, I was thinking of storing a pointer to the record in the record map instead of the value.
As this code is being refactored and the flow record map will mostly be removed in PR #2360, I tried to keep the changes to a minimum.

exp.flowRecords.AddFlowRecordWithoutLock(&key, &record)
} else {
exp.flowRecords.ValidateAndUpdateStats(key, record)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/flowexporter/exporter/exporter_perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func addConnsAndGetRecords(connStore *connections.ConntrackConnectionStore) *flo
StopTime: time.Now(),
LastExportTime: time.Now().Add(-time.Duration(randomNum1)*time.Millisecond - testActiveFlowTimeout),
IsPresent: true,
DoneExport: false,
DyingAndDoneExport: false,
FlowKey: flowKey,
OriginalPackets: 100,
OriginalBytes: 10,
Expand Down
6 changes: 4 additions & 2 deletions pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func runSendFlowRecordTests(t *testing.T, flowExp *flowExporter, isIPv6 bool) {
connKey := flowexporter.NewConnectionKey(conn)
flowExp.conntrackConnStore.AddOrUpdateConn(conn)
flowExp.flowRecords = flowrecords.NewFlowRecords()
err := flowExp.flowRecords.AddOrUpdateFlowRecord(connKey, conn)
err := flowExp.conntrackConnStore.ForAllConnectionsDo(flowExp.flowRecords.AddOrUpdateFlowRecord)
assert.NoError(t, err)
flowExp.numDataSetsSent = 0

Expand Down Expand Up @@ -498,10 +498,12 @@ func runSendFlowRecordTests(t *testing.T, flowExp *flowExporter, isIPv6 bool) {
assert.Equal(t, getNumOfConnections(flowExp.denyConnStore), 0)
}
if tt.isRecordActive && flowexporter.IsConnectionDying(conn) {
err = flowExp.conntrackConnStore.ForAllConnectionsDo(flowExp.flowRecords.AddOrUpdateFlowRecord)
assert.NoError(t, err)
_, recPresent := flowExp.flowRecords.GetFlowRecordFromMap(&connKey)
assert.Falsef(t, recPresent, "record should not be in the map")
connection, _ := flowExp.conntrackConnStore.GetConnByKey(connKey)
assert.True(t, connection.DoneExport)
assert.True(t, connection.DyingAndDoneExport)
}
})
}
Expand Down
38 changes: 22 additions & 16 deletions pkg/agent/flowexporter/flowrecords/flow_records.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package flowrecords

import (
"fmt"
"sync"
"time"

Expand All @@ -37,11 +36,12 @@ func NewFlowRecords() *FlowRecords {

// AddOrUpdateFlowRecord adds or updates the flow record in the record map given the connection.
// It makes a copy of the connection object to record, to avoid race conditions between the
// connection store and the flow exporter.
// connection store and the flow exporter. We expect caller to hold the lock for
// the connection store.
func (fr *FlowRecords) AddOrUpdateFlowRecord(key flowexporter.ConnectionKey, conn *flowexporter.Connection) error {
// If the connection is in dying state and the corresponding flow records are already
// exported, then there is no need to add or update the record.
if flowexporter.IsConnectionDying(conn) && conn.DoneExport {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check whether connection is dying here again?

Changed the name of the flag. Removed this extra check here and other places as well.

// If the connection is in dying state and is already exported, then there is
// no need to add or update the record.
if conn.DyingAndDoneExport {
return nil
}

Expand All @@ -63,8 +63,19 @@ func (fr *FlowRecords) AddOrUpdateFlowRecord(key flowexporter.ConnectionKey, con
IsIPv6: isIPv6,
LastExportTime: conn.StartTime,
IsActive: true,
DyingAndDoneExport: false,
}
} else {
// If the connection is in dying state and the corresponding flow records are already
// exported, then update the DyingAndDoneExport flag on the connection.
if record.DyingAndDoneExport {
// It is safe to update the connection as we hold the connection map
// lock when calling this function.
conn.DyingAndDoneExport = true
delete(fr.recordsMap, key)
klog.V(2).InfoS("Deleting the inactive flow records in record map", "FlowKey", key)
return nil
}
// set IsActive flag to true when there are changes either in stats or TCP state
if (conn.OriginalPackets > record.PrevPackets) || (conn.ReversePackets > record.PrevReversePackets) || record.Conn.TCPState != conn.TCPState {
record.IsActive = true
Expand All @@ -83,6 +94,12 @@ func (fr *FlowRecords) AddFlowRecordToMap(connKey *flowexporter.ConnectionKey, r
fr.recordsMap[*connKey] = *record
}

// AddFlowRecordWithoutLock adds the flow record to the record map given the connection key.
// Caller is expected to grab the lock for the record map.
func (fr *FlowRecords) AddFlowRecordWithoutLock(connKey *flowexporter.ConnectionKey, record *flowexporter.FlowRecord) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe you can address naming consistency with the function above (AddFlowRecordToMap) in a separate PR, or implement the approach I suggested above if it is acceptable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this can be done. Hope you are ok with the explanation in the other comment.

fr.recordsMap[*connKey] = *record
}

// GetFlowRecordFromMap gets the flow record from record map given connection key.
// This is used only for unit tests.
func (fr *FlowRecords) GetFlowRecordFromMap(connKey *flowexporter.ConnectionKey) (*flowexporter.FlowRecord, bool) {
Expand All @@ -92,17 +109,6 @@ func (fr *FlowRecords) GetFlowRecordFromMap(connKey *flowexporter.ConnectionKey)
return &record, exists
}

// DeleteFlowRecordWithoutLock removes the flow record stored under connKey
// from the record map. It does not take any lock itself; the caller is
// expected to grab the lock. It returns an error if no record exists for
// connKey.
func (fr *FlowRecords) DeleteFlowRecordWithoutLock(connKey flowexporter.ConnectionKey) error {
	if _, found := fr.recordsMap[connKey]; !found {
		return fmt.Errorf("flow record with key %v doesn't exist in map", connKey)
	}
	delete(fr.recordsMap, connKey)
	return nil
}

// ValidateAndUpdateStats validates and updates the flow record given the connection
// key. Caller is expected to grab lock.
func (fr *FlowRecords) ValidateAndUpdateStats(connKey flowexporter.ConnectionKey, record flowexporter.FlowRecord) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/agent/flowexporter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ type Connection struct {
StopTime time.Time
// IsPresent flag helps in cleaning up connections when they are not in conntrack table anymore.
IsPresent bool
// DoneExport marks whether the related flow records are already exported or not so that we can
// DyingAndDoneExport marks whether the related flow records are already exported or not so that we can
// safely delete the connection from the connection map.
DoneExport bool
DyingAndDoneExport bool
Zone uint16
Mark uint32
StatusFlag uint32
Expand Down Expand Up @@ -88,5 +88,6 @@ type FlowRecord struct {
PrevReverseBytes uint64
IsIPv6 bool
LastExportTime time.Time
DyingAndDoneExport bool
IsActive bool
}