Skip to content

Commit

Permalink
Improve TLV parser (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
emontnemery authored May 30, 2023
1 parent 6c4be23 commit 530502c
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 39 deletions.
93 changes: 75 additions & 18 deletions python_otbr_api/tlv_parser.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Parse datasets TLV encoded as specified by Thread."""
from __future__ import annotations

from dataclasses import dataclass, field
from enum import IntEnum
import struct

Expand Down Expand Up @@ -54,42 +55,98 @@ class MeshcopTLVType(IntEnum):
JOINERADVERTISEMENT = 241


def _encode_item(tag: MeshcopTLVType, data: str) -> bytes:
"""Encode a dataset item to TLV format."""
if tag == MeshcopTLVType.NETWORKNAME:
data_bytes = bytes(data, "utf-8")
else:
data_bytes = bytes.fromhex(data)
@dataclass
class MeshcopTLVItem:
"""Base class for TLV items."""

tag: int
data: bytes

def __str__(self) -> str:
"""Return a string representation."""
return self.data.hex()


@dataclass
class Channel(MeshcopTLVItem):
"""Channel."""

channel: int = field(init=False)

def __post_init__(self) -> None:
"""Decode the channel."""
self.channel = int.from_bytes(self.data, "big")
if not self.channel:
raise TLVError(f"invalid channel '{self.channel}'")


@dataclass
class NetworkName(MeshcopTLVItem):
"""Network name."""

name: str = field(init=False)

data_len = len(data_bytes)
return struct.pack(f"!BB{data_len}s", tag, data_len, data_bytes)
def __post_init__(self) -> None:
"""Decode the name."""
try:
self.name = self.data.decode()
except UnicodeDecodeError as err:
raise TLVError(f"invalid network name '{self.data.hex()}'") from err

def __str__(self) -> str:
return self.name


@dataclass
class Timestamp(MeshcopTLVItem):
"""Timestamp."""

authoritative: bool = field(init=False)
seconds: int = field(init=False)
ticks: int = field(init=False)

def __post_init__(self) -> None:
"""Decode the timestamp."""
# The timestamps are packed in 8 bytes:
# [seconds 48 bits][ticks 15 bits][authoritative flag 1 bit]
unpacked: int = struct.unpack("!Q", self.data)[0]
self.authoritative = bool(unpacked & 1)
self.seconds = unpacked >> 16
self.ticks = (unpacked >> 1) & 0x7FF

def encode_tlv(items: dict[MeshcopTLVType, str]) -> str:

def _encode_item(item: MeshcopTLVItem) -> bytes:
"""Encode a dataset item to TLV format."""
data_len = len(item.data)
return struct.pack(f"!BB{data_len}s", item.tag, data_len, item.data)


def encode_tlv(items: dict[MeshcopTLVType, MeshcopTLVItem]) -> str:
"""Encode a TLV encoded dataset to a hex string.
Raises if the TLV is invalid.
"""
result = b""

for item_type, item in items.items():
result += _encode_item(item_type, item)
for item in items.values():
result += _encode_item(item)

return result.hex()


def _parse_item(tag: MeshcopTLVType, data: bytes) -> str:
def _parse_item(tag: MeshcopTLVType, data: bytes) -> MeshcopTLVItem:
"""Parse a TLV encoded dataset item."""
if tag == MeshcopTLVType.ACTIVETIMESTAMP:
return Timestamp(tag, data)
if tag == MeshcopTLVType.CHANNEL:
return Channel(tag, data)
if tag == MeshcopTLVType.NETWORKNAME:
try:
return data.decode()
except UnicodeDecodeError as err:
raise TLVError(f"invalid network name '{data.hex()}'") from err
return NetworkName(tag, data)

return data.hex()
return MeshcopTLVItem(tag, data)


def parse_tlv(data: str) -> dict[MeshcopTLVType, str]:
def parse_tlv(data: str) -> dict[MeshcopTLVType, MeshcopTLVItem]:
"""Parse a TLV encoded dataset.
Raises if the TLV is invalid.
Expand Down
91 changes: 70 additions & 21 deletions tests/test_tlv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,51 @@

import pytest

from python_otbr_api.tlv_parser import MeshcopTLVType, TLVError, encode_tlv, parse_tlv
from python_otbr_api.tlv_parser import (
Timestamp,
Channel,
MeshcopTLVItem,
MeshcopTLVType,
NetworkName,
TLVError,
encode_tlv,
parse_tlv,
)


def test_encode_tlv() -> None:
"""Test the TLV parser."""
dataset = {
MeshcopTLVType.ACTIVETIMESTAMP: "0000000000010000",
MeshcopTLVType.CHANNEL: "00000f",
MeshcopTLVType.CHANNELMASK: "0004001fffe0",
MeshcopTLVType.EXTPANID: "1111111122222222",
MeshcopTLVType.MESHLOCALPREFIX: "fdad70bfe5aa15dd",
MeshcopTLVType.NETWORKKEY: "00112233445566778899aabbccddeeff",
MeshcopTLVType.NETWORKNAME: "OpenThreadDemo",
MeshcopTLVType.PANID: "1234",
MeshcopTLVType.PSKC: "445f2b5ca6f2a93a55ce570a70efeecb",
MeshcopTLVType.SECURITYPOLICY: "02a0f7f8",
MeshcopTLVType.ACTIVETIMESTAMP: MeshcopTLVItem(
MeshcopTLVType.ACTIVETIMESTAMP, bytes.fromhex("0000000000010000")
),
MeshcopTLVType.CHANNEL: MeshcopTLVItem(
MeshcopTLVType.CHANNEL, bytes.fromhex("00000f")
),
MeshcopTLVType.CHANNELMASK: MeshcopTLVItem(
MeshcopTLVType.CHANNELMASK, bytes.fromhex("0004001fffe0")
),
MeshcopTLVType.EXTPANID: MeshcopTLVItem(
MeshcopTLVType.EXTPANID, bytes.fromhex("1111111122222222")
),
MeshcopTLVType.MESHLOCALPREFIX: MeshcopTLVItem(
MeshcopTLVType.MESHLOCALPREFIX, bytes.fromhex("fdad70bfe5aa15dd")
),
MeshcopTLVType.NETWORKKEY: MeshcopTLVItem(
MeshcopTLVType.NETWORKKEY, bytes.fromhex("00112233445566778899aabbccddeeff")
),
MeshcopTLVType.NETWORKNAME: NetworkName(
MeshcopTLVType.NETWORKNAME, "OpenThreadDemo".encode()
),
MeshcopTLVType.PANID: MeshcopTLVItem(
MeshcopTLVType.PANID, bytes.fromhex("1234")
),
MeshcopTLVType.PSKC: MeshcopTLVItem(
MeshcopTLVType.PSKC, bytes.fromhex("445f2b5ca6f2a93a55ce570a70efeecb")
),
MeshcopTLVType.SECURITYPOLICY: MeshcopTLVItem(
MeshcopTLVType.SECURITYPOLICY, bytes.fromhex("02a0f7f8")
),
}
dataset_tlv = encode_tlv(dataset)
assert (
Expand All @@ -39,16 +68,36 @@ def test_parse_tlv() -> None:
)
dataset = parse_tlv(dataset_tlv)
assert dataset == {
MeshcopTLVType.CHANNEL: "00000f",
MeshcopTLVType.PANID: "1234",
MeshcopTLVType.EXTPANID: "1111111122222222",
MeshcopTLVType.NETWORKNAME: "OpenThreadDemo",
MeshcopTLVType.PSKC: "445f2b5ca6f2a93a55ce570a70efeecb",
MeshcopTLVType.NETWORKKEY: "00112233445566778899aabbccddeeff",
MeshcopTLVType.MESHLOCALPREFIX: "fdad70bfe5aa15dd",
MeshcopTLVType.SECURITYPOLICY: "02a0f7f8",
MeshcopTLVType.ACTIVETIMESTAMP: "0000000000010000",
MeshcopTLVType.CHANNELMASK: "0004001fffe0",
MeshcopTLVType.CHANNEL: Channel(
MeshcopTLVType.CHANNEL, bytes.fromhex("00000f")
),
MeshcopTLVType.PANID: MeshcopTLVItem(
MeshcopTLVType.PANID, bytes.fromhex("1234")
),
MeshcopTLVType.EXTPANID: MeshcopTLVItem(
MeshcopTLVType.EXTPANID, bytes.fromhex("1111111122222222")
),
MeshcopTLVType.NETWORKNAME: NetworkName(
MeshcopTLVType.NETWORKNAME, "OpenThreadDemo".encode()
),
MeshcopTLVType.PSKC: MeshcopTLVItem(
MeshcopTLVType.PSKC, bytes.fromhex("445f2b5ca6f2a93a55ce570a70efeecb")
),
MeshcopTLVType.NETWORKKEY: MeshcopTLVItem(
MeshcopTLVType.NETWORKKEY, bytes.fromhex("00112233445566778899aabbccddeeff")
),
MeshcopTLVType.MESHLOCALPREFIX: MeshcopTLVItem(
MeshcopTLVType.MESHLOCALPREFIX, bytes.fromhex("fdad70bfe5aa15dd")
),
MeshcopTLVType.SECURITYPOLICY: MeshcopTLVItem(
MeshcopTLVType.SECURITYPOLICY, bytes.fromhex("02a0f7f8")
),
MeshcopTLVType.ACTIVETIMESTAMP: Timestamp(
MeshcopTLVType.ACTIVETIMESTAMP, bytes.fromhex("0000000000010000")
),
MeshcopTLVType.CHANNELMASK: MeshcopTLVItem(
MeshcopTLVType.CHANNELMASK, bytes.fromhex("0004001fffe0")
),
}


Expand Down

0 comments on commit 530502c

Please sign in to comment.