Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MACsec/ MKA config and stats support along with tests in snappi-ixnetwork #591

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
5b8dacc
Add MAcsec support and sample scripts
sasubrata Jan 31, 2025
d1c48be
Add support for picking encryption only engine traffic source IP and …
sasubrata Feb 1, 2025
4898c00
Refactor code
sasubrata Feb 2, 2025
d510845
Add support for Tx/ Rx SAK pool with one SAK as of now. Also added fe…
sasubrata Feb 2, 2025
f25614a
Add macsec metric processing.
dipendughosh Feb 5, 2025
7c51568
Add a very simple MKA test case to start MKA.
sasubrata Feb 6, 2025
19d0b0f
Add MKA support for OTG to IxNetwork. Also update test script
sasubrata Feb 8, 2025
be3d8da
Support incrementing PN
sasubrata Feb 9, 2025
6642ddf
Keep incrementing PN and PN count option at one place as SDM has only…
sasubrata Feb 10, 2025
e0eacac
Set end_station and include_sci TxSC setting for both MKA/ static key.
sasubrata Feb 10, 2025
4037ea0
Add rekey mode support
sasubrata Feb 10, 2025
deade11
Add key source properties
sasubrata Feb 10, 2025
e423255
Merge branch 'mka' into macsec
sasubrata Feb 11, 2025
47b4995
Add MKA metric support
sasubrata Feb 11, 2025
b17fa06
Update code and script according to changed model
sasubrata Feb 13, 2025
5e059a7
Remove trailing space
sasubrata Feb 13, 2025
abc8045
Add changes after moedel upgrade
sasubrata Feb 14, 2025
caef16b
Merge branch 'main' into macsec
sasubrata Feb 14, 2025
4bb5766
Add missing tests
sasubrata Feb 14, 2025
8933d41
Correct test metric device names and restore timeout
sasubrata Feb 14, 2025
0427ffb
Comment TODO items. Also do not create Rx channels in case of statele…
sasubrata Feb 17, 2025
4001269
Support multiple PSKs in MKA
sasubrata Feb 18, 2025
b370432
Add multiple PSKs in MACsec with MKA test
sasubrata Feb 18, 2025
1bf00d4
Change MKA script after model update
sasubrata Feb 18, 2025
858ab6f
Add rekey support in static MACsec
sasubrata Feb 19, 2025
a693c67
Support for PSK key duration and overlapping key. Also added supporte…
sasubrata Feb 19, 2025
12900fb
Support waiting on MACsec session state and then fetching stats.
sasubrata Feb 20, 2025
031718e
Add SSCI and salt processing in case of XPN ciphers.
sasubrata Feb 20, 2025
39f545f
Rework tests after model review/ rework
sasubrata Feb 21, 2025
8b4c761
Modify test to do incrementing PN
sasubrata Feb 25, 2025
7506396
Merge branch 'main' into macsec
sasubrata Mar 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions snappi_ixnetwork/device/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ def _configure_ipv4(self, ixn_eth, ethernet):
ipv4_addresses = ethernet.get("ipv4_addresses")
if ipv4_addresses is None:
return
if self._ngpf.is_ip_allowed == False:
self.logger.debug("Skip IPv4 configuration")
return

eth_name = ethernet.name
if eth_name not in self._ngpf.ether_v4gateway_map:
Expand All @@ -85,6 +88,9 @@ def _configure_ipv6(self, ixn_eth, ethernet):
ipv6_addresses = ethernet.get("ipv6_addresses")
if ipv6_addresses is None:
return
if self._ngpf.is_ip_allowed == False:
self.logger.debug("Skip IPv6 configuration")
return

eth_name = ethernet.name
if eth_name not in self._ngpf.ether_v6gateway_map:
Expand Down
333 changes: 333 additions & 0 deletions snappi_ixnetwork/device/macsec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,333 @@
from snappi_ixnetwork.device.base import Base
from snappi_ixnetwork.logger import get_ixnet_logger


class Macsec(Base):
_CIPHER = {
"cipher_suite": {
"ixn_attr": "cipherSuite",
"enum_map": {"gcm_aes_128": "aes128", "gcm_aes_256": "aes256", "gcm_aes_xpn_128": "aesxpn128", "gcm_aes_xpn_256": "aesxpn256"},
},
"confidentiality_offset": {
"ixn_attr": "confidentialityOffset",
"enum_map": {"zero": 0, "thirty": 30, "fifty": 50},
},
}

_TXSC = {
"end_station": "endStation",
"include_sci": "includeSci",
}

_TXSC_STATIC_KEY = {
"system_id": "systemId",
"port_id": "portId",
"confidentiality": "enableConfidentiality",
}

_RXSC_STATIC_KEY = {
"dut_system_id": "dutSciMac",
"dut_sci_port_id": "dutSciPortId",
"dut_msb_xpn": "dutMsbOfXpn",
}

def __init__(self, ngpf):
super(Macsec, self).__init__()
self._ngpf = ngpf
self.is_dynamic_key = False
self.logger = get_ixnet_logger(__name__)

def config(self, device):
self.logger.debug("Configuring MACsec")
macsec = device.get("macsec")
if macsec is None:
return
self.is_dynamic_key = self._is_dynamic_key(device)
self._config_ethernet_interfaces(device)

def _is_valid(self, ethernet_name, secy):
is_valid = True
if secy is None:
is_valid = False
else:
self.logger.debug("Validating SecY of etherner interface %s" % (ethernet_name))
static_key = secy.get("static_key")
tx = secy.get("tx")
rx = secy.get("rx")
txscs = rxscs = tx_rekey_mode = replay_protection = replay_window = None
if not tx is None:
txscs = tx.scs
if not tx.static_key is None:
tx_rekey_mode = tx.static_key.rekey_mode
if not rx is None:
replay_protection = rx.replay_protection
replay_window = rx.replay_window
if not rx.static_key is None:
rxscs = rx.static_key.scs
crypto_engine = secy.get("crypto_engine")
if not self.is_dynamic_key:
if txscs is None:
self._ngpf.api.add_error(
"TxSC not added when key generation is static".format(
name=ethernet_name
)
)
is_valid = False
elif len(txscs) > 1:
self._ngpf.api.add_error(
"More than one TxSC added when key generation is static".format(
name=ethernet_name
)
)
is_valid = False
if crypto_engine is None:
self._ngpf.api.add_error(
"Replay protectin cannot be true when engine type is stateless_encryption_only".format(
name=ethernet_name
)
)
is_valid = False
elif crypto_engine.choice == "stateless_encryption_only":
if not tx_rekey_mode is None:
if tx_rekey_mode.choice == "pn_based":
self._ngpf.api.add_error(
"PN based rekey not supported when engine type is stateless_encryption_only".format(
name=ethernet_name
)
)
is_valid = False
if not replay_protection is None:
if replay_protection == True:
self._ngpf.api.add_error(
"Replay protectin cannot be true when engine type is stateless_encryption_only".format(
name=ethernet_name
)
)
is_valid = False
if not replay_protection is None:
if replay_protection == True:
self._ngpf.api.add_error(
"Replay protectin cannot be true when engine type is stateless_encryption_only".format(
name=ethernet_name
)
)
is_valid = False
else:
self._ngpf.api.add_error(
"SecY crypto_engine is not set to stateless_encryption_only. Only tateless_encryption_only engine type is supported as of now.".format(
name=ethernet_name
)
)
is_valid = False
if is_valid == True:
self.logger.debug("MACsec validation success")
else:
self.logger.debug("MACsec validation failure")
return is_valid

def _is_ip_allowed(self, device):
self.logger.debug("Checking if IPv4/ v6 is allowed")
is_allowed = True
macsec = device.get("macsec")
if macsec is None:
return is_allowed
ethernet_interfaces = macsec.get("ethernet_interfaces")
if ethernet_interfaces is None:
return is_allowed
for ethernet_interface in ethernet_interfaces:
secy = ethernet_interface.get("secy")
if secy.crypto_engine.choice == "stateless_encryption_only":
is_allowed = False
break
return is_allowed

def _is_dynamic_key(self, device):
self.logger.debug("Checking if MKA is confgured")
is_mka = False
mka = device.get("mka")
if not mka is None:
is_mka = True
return is_mka

def _config_ethernet_interfaces(self, device):
self.logger.debug("Configuring MACsec interfaces")
macsec = device.get("macsec")
ethernet_interfaces = macsec.get("ethernet_interfaces")
if ethernet_interfaces is None:
return
for ethernet_interface in ethernet_interfaces:
ethernet_name = ethernet_interface.get("eth_name")
self._ngpf.working_dg = self._ngpf.api.ixn_objects.get_working_dg(
ethernet_name
)
if not self._is_valid(ethernet_name, ethernet_interface.get("secy")):
continue
self._config_secy(device, ethernet_interface)

def _config_secy(self, device, ethernet_interface):
self.logger.debug("Configuring SecY")
secy = ethernet_interface.get("secy")
if secy is None:
return
ethernet_name = ethernet_interface.get("eth_name")
ixn_ethernet = self._ngpf.api.ixn_objects.get_object(ethernet_name)
if secy.crypto_engine.choice == "stateless_encryption_only":
self.logger.debug("Configuring SecY: stateless_encryption_only")
ixn_staticmacsec = self.create_node_elemet(
ixn_ethernet, "staticMacsec", secy.get("name")
)
self._ngpf.set_device_info(secy, ixn_staticmacsec)
if not self.is_dynamic_key:
self._config_cipher(secy, ixn_staticmacsec)
self._config_secy_engine_encryption_only(device, ethernet_interface, ixn_staticmacsec)
self._config_tx(secy, ixn_staticmacsec)
if not secy.crypto_engine.choice == "stateless_encryption_only":
self._config_rx(secy, ixn_staticmacsec)

def _config_cipher(self, secy, ixn_staticmacsec):
self.logger.debug("Configuring cipher from static key")
static_key = secy.get("static_key")
self.configure_multivalues(static_key, ixn_staticmacsec, Macsec._CIPHER)

def _config_tx(self, secy, ixn_staticmacsec):
self.logger.debug("Configuring Tx properties")
tx = secy.get("tx")
if tx is None:
return
self._config_txsc(secy, ixn_staticmacsec)
if not self.is_dynamic_key:
self._config_rekey_mode(tx.static_key.rekey_mode, ixn_staticmacsec)

def _config_rx(self, secy, ixn_staticmacsec):
self.logger.debug("Configuring Rx properties")
rx = secy.get("rx")
if rx is None:
return
#TODO: replay protection, replay window.
self.logger.debug("replay_protection %s replay_window %s" % (rx.replay_protection, rx.replay_window))
if not self.is_dynamic_key:
self._config_rxsc(secy, ixn_staticmacsec)

def _config_txsc(self, secy, ixn_staticmacsec):
self.logger.debug("Configuring TxSC")
tx = secy.get("tx")
txscs = tx.get("scs")
txsc = txscs[0]
self.logger.debug("end_station %s include_sci %s" % (txsc.end_station, txsc.include_sci))
self.configure_multivalues(txsc, ixn_staticmacsec, Macsec._TXSC)
if not self.is_dynamic_key:
self.configure_multivalues(txsc.static_key, ixn_staticmacsec, Macsec._TXSC_STATIC_KEY)
tx_saks = txsc.static_key.saks
ixn_staticmacsec["txSakPoolSize"] = len(tx_saks)
saks = []
sscis = []
salts = []
for tx_sak in tx_saks:
saks.append(tx_sak.sak)
sscis.append(tx_sak.ssci)
salts.append(tx_sak.salt)
ixn_tx_sak_pool = self.create_node_elemet(
ixn_staticmacsec, "txSakPool", name=None
)
static_key = secy.get("static_key")
cipher_suite = static_key.cipher_suite
if cipher_suite == "gcm_aes_128" or cipher_suite == "gcm_aes_xpn_128":
ixn_tx_sak_pool["txSak128"] = self.multivalue(saks)
elif cipher_suite == "gcm_aes_256" or cipher_suite == "gcm_aes_xpn_256":
ixn_tx_sak_pool["txSak256"] = self.multivalue(saks)
if cipher_suite == "gcm_aes_xpn_128" or cipher_suite == "gcm_aes_xpn_256":
ixn_tx_sak_pool["txSsci"] = self.multivalue(sscis)
ixn_tx_sak_pool["txSalt"] = self.multivalue(salts)

def _config_rxsc(self, secy, ixn_staticmacsec):
self.logger.debug("Configuring RxSC")
rx = secy.get("rx")
rx_sk = rx.static_key
rxscs = rx_sk.get("scs")
rxsc = rxscs[0]
self.configure_multivalues(rxsc, ixn_staticmacsec, Macsec._RXSC_STATIC_KEY)
rx_saks = rxsc.saks
ixn_staticmacsec["rxSakPoolSize"] = len(rx_saks)
saks = []
sscis = []
salts = []
for rx_sak in rx_saks:
saks.append(rx_sak.sak)
sscis.append(rx_sak.ssci)
salts.append(rx_sak.salt)
ixn_rx_sak_pool = self.create_node_elemet(
ixn_staticmacsec, "rxSakPool", name=None
)
static_key = secy.get("static_key")
cipher_suite = static_key.cipher_suite
if cipher_suite == "gcm_aes_128" or cipher_suite == "gcm_aes_xpn_128":
ixn_rx_sak_pool["rxSak128"] = self.multivalue(saks)
elif cipher_suite == "gcm_aes_256" or cipher_suite == "gcm_aes_xpn_256":
ixn_rx_sak_pool["rxSak256"] = self.multivalue(saks)
if cipher_suite == "gcm_aes_xpn_128" or cipher_suite == "gcm_aes_xpn_256":
ixn_rx_sak_pool["rxSsci"] = self.multivalue(sscis)
ixn_rx_sak_pool["rxSalt"] = self.multivalue(salts)

def _config_rekey_mode(self, rekey_mode, ixn_staticmacsec):
self.logger.debug("Configuring rekey settings")
if rekey_mode.choice == "dont_rekey":
#ixn_staticmacsec["rekeyMode"] = "timerBased"
ixn_staticmacsec["rekeyBehaviour"] = "dontRekey"
elif rekey_mode.choice == "timer_based":
timer_based = rekey_mode.timer_based
#ixn_staticmacsec["rekeyMode"] = "timerBased"
if timer_based.choice == "fixed_count":
ixn_staticmacsec["rekeyBehaviour"] = "rekeyFixedCount"
ixn_staticmacsec["periodicRekeyAttempts"] = timer_based.fixed_count
ixn_staticmacsec["periodicRekeyInterval"] = timer_based.interval
elif timer_based.choice == "continuous":
ixn_staticmacsec["rekeyBehaviour"] = "rekeyContinuous"
ixn_staticmacsec["periodicRekeyInterval"] = timer_based.interval
#elif rekey_mode.choice == "pn_based":
# ixn_staticmacsec["rekeyMode"] = "pnBased"
# ixn_staticmacsec["rekeyBehaviour"] = "rekeyContinuous"

def _config_secy_engine_encryption_only(self, device, ethernet_interface, ixn_staticmacsec):
secy = ethernet_interface.get("secy")
self._config_secy_engine_encryption_only_tx_pn(secy, ixn_staticmacsec)
self._config_secy_engine_encryption_only_traffic(device, ethernet_interface, ixn_staticmacsec)

def _config_secy_engine_encryption_only_traffic(self, device, ethernet_interface, ixn_staticmacsec):
ethernet_name = ethernet_interface.get("eth_name")
secy = ethernet_interface.get("secy")
self.logger.debug("Configuring stateless encryption traffic from ethernet %s secy %s" % (ethernet_name, secy.get("name")))
ethernets = device.get("ethernets")
for ethernet in ethernets:
if ethernet.get("name") == ethernet_name:
ipv4_addresses = ethernet.get("ipv4_addresses")
if ipv4_addresses is None:
raise Exception("IPv4 not configured on ethernet %s" % ethernet_name)
elif len(ipv4_addresses) > 1:
raise Exception("More than one IPv4 address configured on ethernet %s" % ethernet_name)
ipv4_address = ipv4_addresses[0].address
ipv4_gateway_mac = ipv4_addresses[0].gateway_mac
if ipv4_gateway_mac is None:
raise Exception("IPv4 gateway mac not configured for IPv4 address %s on ethernet %s" % (ethernet_name, ipv4_address))
elif not ipv4_gateway_mac.choice == "value" or ipv4_gateway_mac.value is None:
raise Exception("IPv4 gateway static mac not configured for IPv4 address %s on ethernet %s" % (ethernet_name, ipv4_address))
ipv4_gateway_static_mac = ipv4_addresses[0].gateway_mac.value
ixn_staticmacsec["sourceIp"] = self.multivalue(ipv4_address)
ixn_staticmacsec["dutMac"] = self.multivalue(ipv4_gateway_static_mac)
break

def _config_secy_engine_encryption_only_tx_pn(self, secy, ixn_staticmacsec):
engine_tx_pn = secy.crypto_engine.stateless_encryption_only.tx_pn
if engine_tx_pn is None:
return
self.logger.debug("Configuring Tx PN of stateless encryption only engine secy %s" % secy.get("name"))
if engine_tx_pn.choice == "fixed_pn":
ixn_staticmacsec["incrementingPn"] = False
engine_tx_pn_fixed = engine_tx_pn.fixed
ixn_staticmacsec["fixedPn"] = engine_tx_pn_fixed.pn
ixn_staticmacsec["mvFixedXpn"] = self.multivalue(engine_tx_pn_fixed.xpn)
elif engine_tx_pn.choice == "incrementing_pn":
ixn_staticmacsec["incrementingPn"] = True
engine_tx_pn_incr = engine_tx_pn.incrementing
ixn_staticmacsec["packetCountPn"] = engine_tx_pn_incr.count
ixn_staticmacsec["firstPn"] = engine_tx_pn_incr.first_pn
ixn_staticmacsec["mvFirstXpn"] = self.multivalue(engine_tx_pn_incr.first_xpn)
Loading
Loading