-
Notifications
You must be signed in to change notification settings - Fork 386
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Create and maintain connection store for Antrea flows in conntrack
Created connection store that stores the flows by polling conntrack module every 5s. cxnStore updates the flow if it is already there or add a new flow based on 5 tuple map. In addition, we add local pod information by querying interfaceStore. We added ipCache in interfaceStore for this purpose. Unit tests and testing on local setup is done with custom apps. Issue: #712
- Loading branch information
1 parent
b4a9688
commit c17da5e
Showing
19 changed files
with
819 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
package connections | ||
|
||
import ( | ||
"sync" | ||
"time" | ||
|
||
"k8s.io/klog" | ||
|
||
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter" | ||
"github.com/vmware-tanzu/antrea/pkg/agent/interfacestore" | ||
"github.com/vmware-tanzu/antrea/pkg/agent/openflow" | ||
) | ||
|
||
var _ ConnectionStore = new(connectionStore) | ||
|
||
type ConnectionStore interface { | ||
Run(stopCh <-chan struct{}) | ||
} | ||
|
||
type connectionStore struct { | ||
connections map[flowexporter.ConnectionKey]flowexporter.Connection // Add 5-tuple as string array | ||
connTrackPoller ConnTrackPoller | ||
ifaceStore interfacestore.InterfaceStore | ||
mutex sync.Mutex | ||
} | ||
|
||
func NewConnectionStore(connTrack ConnTrackPoller, ifaceStore interfacestore.InterfaceStore) *connectionStore { | ||
return &connectionStore{ | ||
connections: make(map[flowexporter.ConnectionKey]flowexporter.Connection), | ||
connTrackPoller: connTrack, | ||
ifaceStore: ifaceStore, | ||
} | ||
} | ||
|
||
// Run polls the conntrack module periodically to get connections. These connections are used | ||
// to build connection store. | ||
func (cs *connectionStore) Run(stopCh <-chan struct{}) { | ||
klog.Infof("Starting conntrack polling") | ||
for { | ||
select { | ||
case <-stopCh: | ||
break | ||
case <-time.After(flowexporter.PollInterval): | ||
_, err := cs.poll() | ||
if err != nil { | ||
klog.Errorf("Error during conntrack poll cycle: %v", err) | ||
} | ||
} | ||
} | ||
} | ||
|
||
// addOrUpdateConn updates the connection if it is already present, i.e., update timestamp, counters etc., | ||
// or add a new Connection by 5-tuple of the flow along with local Pod and PodNameSpace. | ||
func (cs *connectionStore) addOrUpdateConn(conn *flowexporter.Connection) { | ||
connKey := flowexporter.NewConnectionKey(conn) | ||
|
||
existingConn, exists := cs.getConnByKey(connKey) | ||
|
||
cs.mutex.Lock() | ||
defer cs.mutex.Unlock() | ||
if exists { | ||
// Update the necessary fields that are used in generating flow records. | ||
// Can same 5-tuple flow get deleted and added to conntrack table? If so use ID. | ||
existingConn.StopTime = conn.StopTime | ||
existingConn.OriginalBytes = conn.OriginalBytes | ||
existingConn.OriginalPackets = conn.OriginalPackets | ||
existingConn.ReverseBytes = conn.ReverseBytes | ||
existingConn.ReversePackets = conn.ReversePackets | ||
// Reassign the flow to update the map | ||
cs.connections[connKey] = *existingConn | ||
klog.V(4).Infof("Antrea flow updated: %v", existingConn) | ||
} else { | ||
var srcFound, dstFound bool | ||
sIface, srcFound := cs.ifaceStore.GetInterfaceByIP(conn.TupleOrig.SourceAddress.String()) | ||
dIface, dstFound := cs.ifaceStore.GetInterfaceByIP(conn.TupleReply.SourceAddress.String()) | ||
if !srcFound && !dstFound { | ||
klog.Warningf("Cannot map any of the IP %s or %s to a local Pod", conn.TupleOrig.SourceAddress.String(), conn.TupleReply.SourceAddress.String()) | ||
} | ||
if srcFound && sIface.Type == interfacestore.ContainerInterface { | ||
conn.SourcePodName = sIface.ContainerInterfaceConfig.PodName | ||
conn.SourcePodNamespace = sIface.ContainerInterfaceConfig.PodNamespace | ||
} | ||
if dstFound && dIface.Type == interfacestore.ContainerInterface { | ||
conn.DestinationPodName = dIface.ContainerInterfaceConfig.PodName | ||
conn.DestinationPodNamespace = dIface.ContainerInterfaceConfig.PodNamespace | ||
} | ||
klog.V(2).Infof("New Antrea flow added: %v", conn) | ||
// Add new antrea connection to connection store | ||
cs.connections[connKey] = *conn | ||
} | ||
} | ||
|
||
func (cs *connectionStore) getConnByKey(flowTuple flowexporter.ConnectionKey) (*flowexporter.Connection, bool) { | ||
cs.mutex.Lock() | ||
defer cs.mutex.Unlock() | ||
conn, found := cs.connections[flowTuple] | ||
return &conn, found | ||
} | ||
|
||
// poll returns number of filtered connections after poll cycle | ||
func (cs *connectionStore) poll() (int, error) { | ||
klog.V(2).Infof("Polling conntrack") | ||
|
||
filteredConns, err := cs.connTrackPoller.DumpFlows(openflow.CtZone) | ||
if err != nil { | ||
klog.Errorf("Error when dumping flows from conntrack: %v", err) | ||
return 0, err | ||
} | ||
// Update only the Connection store. IPFIX records are generated based on Connection store. | ||
for _, conn := range filteredConns { | ||
cs.addOrUpdateConn(conn) | ||
} | ||
klog.V(2).Infof("Conntrack polling successful") | ||
|
||
return len(filteredConns), nil | ||
} |
Oops, something went wrong.