Merge pull request #14441 from miri64/riotctrl_shell/feat/netif
riotctrl_shell: provide netif interactions and parsers
This commit is contained in:
commit
db5070c772
382
dist/pythonlibs/riotctrl_shell/netif.py
vendored
Normal file
382
dist/pythonlibs/riotctrl_shell/netif.py
vendored
Normal file
@ -0,0 +1,382 @@
|
||||
# Copyright (C) 2020 Freie Universität Berlin
|
||||
#
|
||||
# This file is subject to the terms and conditions of the GNU Lesser
|
||||
# General Public License v2.1. See the file LICENSE in the top level
|
||||
# directory for more details.
|
||||
|
||||
"""
|
||||
Shell interactions related to network interfaces
|
||||
|
||||
Defines shell command interactions related to network interfaces
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from riotctrl.shell import ShellInteraction, ShellInteractionParser
|
||||
|
||||
|
||||
# ==== Parsers ====
|
||||
|
||||
class IfconfigListParser(ShellInteractionParser):
|
||||
def __init__(self):
|
||||
self.iface_c = re.compile(r"Iface\s+(?P<name>\S+)\s")
|
||||
# option values are repetitions of at least one non white space
|
||||
# separated by at most one whitespace
|
||||
# e.g. for MCS: 1 (BPSK, rate 1/2, 2x frequency repetition) MTU :1280
|
||||
# "1 (BPSK, rate 1/2, 2x frequency repetition)" belongs to the option
|
||||
# value, "MTU" does not
|
||||
self.option_c = re.compile(r"^(?P<option>[^:]+):\s?"
|
||||
r"(?P<value>\S+(\s\S+)*)$")
|
||||
# options are evaluated before flags, so all options that don't contain
|
||||
# colons are flags
|
||||
self.flag_c = re.compile(r"^(?P<flag>[^:]+)$")
|
||||
self.ipv6_c = re.compile(r"inet6 (?P<type>addr|group): "
|
||||
r"(?P<addr>[0-9a-f:]+)(\s+"
|
||||
r"scope:\s+(?P<scope>\S+)"
|
||||
r"(?P<anycast>\s+\[anycast\])?\s+"
|
||||
r"(?P<state>\S+))?$")
|
||||
self.bl_header_c = re.compile(r"(?P<mode>White|Black)-listed "
|
||||
r"link layer addresses:")
|
||||
self.bl_c = re.compile(r"\d+: (?P<addr>[0-9a-f]{2}(:[0-9a-f]{2})*)$")
|
||||
|
||||
def parse(self, cmd_output):
|
||||
"""
|
||||
Parses output of Ifconfig::ifconfig_list()
|
||||
|
||||
See tests (tests/test_netif_list_parse.py) for further possible items:
|
||||
|
||||
>>> parser = IfconfigListParser()
|
||||
>>> res = parser.parse("Iface WP_01 HWAddr: ab:cd 6LO PROMISC\\n"
|
||||
... "Iface ET_01 HWaddr: 01:23:45:67:89:AB\\n")
|
||||
>>> len(res)
|
||||
2
|
||||
>>> res["WP_01"]["hwaddr"]
|
||||
'ab:cd'
|
||||
>>> sorted(res["WP_01"]["flags"])
|
||||
['6LO', 'PROMISC']
|
||||
>>> res["ET_01"]["hwaddr"]
|
||||
'01:23:45:67:89:AB'
|
||||
"""
|
||||
netifs = None
|
||||
current = None
|
||||
parse_ipv6 = False
|
||||
parse_blacklist = False
|
||||
stats_parser = IfconfigStatsParser()
|
||||
offset = 0
|
||||
for line in cmd_output.splitlines():
|
||||
m = self.iface_c.search(line)
|
||||
if m is not None:
|
||||
name = m.group("name")
|
||||
if netifs is None:
|
||||
netifs = {}
|
||||
current = netifs[name] = {}
|
||||
# Go ahead in line to not confuse options parser
|
||||
line = line[m.end():]
|
||||
offset += m.end() + 1
|
||||
if current is not None:
|
||||
# XXX checking for IPv4 address might also go here
|
||||
if "ipv6_addrs" not in current:
|
||||
parse_ipv6 = self.ipv6_c.search(line) is not None
|
||||
if not parse_ipv6 and not parse_blacklist:
|
||||
self._parse_netif_option(current, line)
|
||||
if parse_ipv6:
|
||||
parse_ipv6 = self._parse_ipv6(current, line)
|
||||
elif parse_blacklist:
|
||||
m = self.bl_c.search(line)
|
||||
if m is not None:
|
||||
if "blacklist" in current:
|
||||
current["blacklist"].append(m.group("addr"))
|
||||
else:
|
||||
current["whitelist"].append(m.group("addr"))
|
||||
else:
|
||||
parse_blacklist = False
|
||||
elif not parse_blacklist and \
|
||||
"blacklist" not in current and \
|
||||
"whitelist" not in current:
|
||||
m = self.bl_header_c.search(line)
|
||||
if m is not None:
|
||||
if m.group("mode") == "Black":
|
||||
current["blacklist"] = []
|
||||
else:
|
||||
current["whitelist"] = []
|
||||
parse_blacklist = True
|
||||
m = stats_parser.header_c.search(line)
|
||||
if m is not None:
|
||||
stats = stats_parser.parse(cmd_output[offset:])
|
||||
if stats is not None:
|
||||
current["stats"] = stats
|
||||
# assume stats to be always last
|
||||
break
|
||||
offset += len(line)
|
||||
return netifs
|
||||
|
||||
@staticmethod
|
||||
def _snake_case(option):
|
||||
"""
|
||||
Converts all option names parsed by _parse_netif_option() to snake_case
|
||||
|
||||
>>> IfconfigListParser._snake_case("Max. Retrans.")
|
||||
'max_retrans'
|
||||
"""
|
||||
return re.sub(r"\W+", "_", option.strip().lower()).strip("_")
|
||||
|
||||
@staticmethod
|
||||
def _convert_value(value_str):
|
||||
"""
|
||||
Tries to converts an option value parsed by _parse_netif_option() to
|
||||
int or float if possible
|
||||
|
||||
>>> IfconfigListParser._convert_value("12345")
|
||||
12345
|
||||
>>> IfconfigListParser._convert_value("0xf")
|
||||
15
|
||||
>>> IfconfigListParser._convert_value("3.14")
|
||||
3.14
|
||||
>>> IfconfigListParser._convert_value("3.14ispi")
|
||||
'3.14ispi'
|
||||
"""
|
||||
try:
|
||||
# try to convert to int
|
||||
return int(value_str)
|
||||
except ValueError:
|
||||
try:
|
||||
# next try to convert to int from hex
|
||||
if value_str.startswith("0x"):
|
||||
return int(value_str[2:], base=16)
|
||||
else:
|
||||
return int(value_str, base=16)
|
||||
except ValueError:
|
||||
try:
|
||||
# next try to convert to float
|
||||
return float(value_str)
|
||||
except ValueError:
|
||||
# lastly just use the string
|
||||
return value_str
|
||||
|
||||
def _parse_netif_option(self, netif, line):
|
||||
"""Parses all the options found before the IP address listing"""
|
||||
# remove potential content before line like logging tag, date etc.
|
||||
line = line.split(" ")[-1]
|
||||
# options and flags are separated by two spaces
|
||||
for token in line.strip().split(" "):
|
||||
# ensure there are no whitespaces at start or end => output bug
|
||||
assert token == token.strip()
|
||||
m = self.option_c.search(token)
|
||||
if m is not None:
|
||||
option = self._snake_case(m.group("option"))
|
||||
value_str = m.group("value").strip()
|
||||
netif[option] = self._convert_value(value_str)
|
||||
m = self.flag_c.search(token)
|
||||
if m is not None:
|
||||
flag = m.group("flag")
|
||||
if "flags" in netif:
|
||||
netif["flags"].append(flag)
|
||||
else:
|
||||
netif["flags"] = [flag]
|
||||
|
||||
def _parse_ipv6(self, netif, line):
|
||||
"""
|
||||
Parses IPv6 unicast, anycast, and multicast addresses
|
||||
"""
|
||||
m = self.ipv6_c.search(line)
|
||||
if m is not None:
|
||||
addr = m.groupdict()
|
||||
typ = addr.pop("type")
|
||||
if typ == "addr": # unicast address
|
||||
# reformat anycast item if existent
|
||||
if addr.get("anycast") is None:
|
||||
addr.pop("anycast", None)
|
||||
else:
|
||||
addr["anycast"] = True
|
||||
if "ipv6_addrs" in netif:
|
||||
netif["ipv6_addrs"].append(addr)
|
||||
else:
|
||||
netif["ipv6_addrs"] = [addr]
|
||||
else: # multicast address
|
||||
for key in set(addr):
|
||||
# remove empty matches
|
||||
if addr[key] is None:
|
||||
del addr[key]
|
||||
if "ipv6_groups" in netif:
|
||||
netif["ipv6_groups"].append(addr)
|
||||
else:
|
||||
netif["ipv6_groups"] = [addr]
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class IfconfigStatsParser(ShellInteractionParser):
|
||||
def __init__(self):
|
||||
self.header_c = re.compile(r"Statistics for (?P<module>.+)$")
|
||||
self.rx_c = re.compile(r"RX packets\s+(?P<packets>\d+)\s+"
|
||||
r"bytes\s+(?P<bytes>\d+)$")
|
||||
self.tx_c = re.compile(r"TX packets\s+(?P<packets>\d+)\s+"
|
||||
r"\(Multicast:\s+(?P<multicast>\d+)\)\s+"
|
||||
r"bytes\s+(?P<bytes>\d+)$")
|
||||
self.tx_err_c = re.compile(r"TX succeeded\s+(?P<succeeded>\d+)\s+"
|
||||
r"errors\s+(?P<errors>\d+)$")
|
||||
|
||||
def parse(self, cmd_output):
|
||||
"""
|
||||
Parses output of Ifconfig::ifconfig_stats or the statistics part of
|
||||
Ifconfig::ifconfig_list.
|
||||
|
||||
:param cmd_output(str): output of Ifconfig::ifconfig_stats or
|
||||
Ifconfig::ifconfig_list
|
||||
:return: dictionary with one entry per statistics module, each module
|
||||
containing an entry 'rx' and 'tx' with respective statistics
|
||||
|
||||
>>> parser = IfconfigStatsParser()
|
||||
>>> res = parser.parse(
|
||||
... "Statistics for IPv6\\n"
|
||||
... " RX packets 14 bytes 1104\\n"
|
||||
... " TX packets 3 (Multicast: 1) bytes 192\\n"
|
||||
... " TX succeeded 3 errors 0\\n")
|
||||
>>> sorted(res)
|
||||
['IPv6']
|
||||
>>> sorted(res["IPv6"])
|
||||
['rx', 'tx']
|
||||
>>> sorted(res["IPv6"]["rx"])
|
||||
['bytes', 'packets']
|
||||
>>> sorted(res["IPv6"]["tx"])
|
||||
['bytes', 'errors', 'multicast', 'packets', 'succeeded']
|
||||
>>> res["IPv6"]["rx"]["bytes"]
|
||||
1104
|
||||
"""
|
||||
stats = None
|
||||
current = None
|
||||
for line in cmd_output.splitlines():
|
||||
line = line.strip()
|
||||
m = self.header_c.search(line)
|
||||
if m is not None:
|
||||
module = m.group("module")
|
||||
if stats is None:
|
||||
stats = {}
|
||||
current = stats[module] = {}
|
||||
if current is not None:
|
||||
if "rx" not in current:
|
||||
m = self.rx_c.search(line)
|
||||
if m is not None:
|
||||
current["rx"] = {k: int(v)
|
||||
for k, v in m.groupdict().items()}
|
||||
elif "tx" not in current:
|
||||
m = self.tx_c.search(line)
|
||||
if m is not None:
|
||||
current["tx"] = {k: int(v)
|
||||
for k, v in m.groupdict().items()}
|
||||
elif "tx" in current:
|
||||
m = self.tx_err_c.search(line)
|
||||
if m is not None:
|
||||
current["tx"].update(
|
||||
{k: int(v) for k, v in m.groupdict().items()}
|
||||
)
|
||||
return stats
|
||||
|
||||
|
||||
# ==== ShellInteractions ====
|
||||
|
||||
class Ifconfig(ShellInteraction):
|
||||
def ifconfig_list(self, netif=None, timeout=-1, async_=False):
|
||||
return self.ifconfig_cmd(netif=netif, timeout=timeout, async_=async_)
|
||||
|
||||
def ifconfig_cmd(self, netif=None, args=None, timeout=-1, async_=False):
|
||||
cmd = "ifconfig"
|
||||
if netif is not None:
|
||||
cmd += " {netif}".format(netif=netif)
|
||||
if args is not None:
|
||||
if netif is None:
|
||||
raise ValueError("netif required when args are provided")
|
||||
cmd += " {args}".format(args=" ".join(args))
|
||||
return self.cmd(cmd, timeout=timeout, async_=False)
|
||||
|
||||
def ifconfig_help(self, netif, timeout=-1, async_=False):
|
||||
return self.ifconfig_cmd(netif=netif, args=("help",),
|
||||
timeout=timeout, async_=async_)
|
||||
|
||||
def ifconfig_set(self, netif, key, value, timeout=-1, async_=False):
|
||||
return self._ifconfig_success_cmd(netif=netif,
|
||||
args=("set", key, value),
|
||||
timeout=timeout, async_=async_)
|
||||
|
||||
def ifconfig_up(self, netif, timeout=-1, async_=False):
|
||||
self._ifconfig_error_cmd(netif=netif, args=("up",),
|
||||
timeout=timeout, async_=async_)
|
||||
|
||||
def ifconfig_down(self, netif, timeout=-1, async_=False):
|
||||
self._ifconfig_error_cmd(netif=netif, args=("down",),
|
||||
timeout=timeout, async_=async_)
|
||||
|
||||
def ifconfig_add(self, netif, addr, anycast=False,
|
||||
timeout=-1, async_=False):
|
||||
args = ["add", addr]
|
||||
if anycast:
|
||||
args.append("anycast")
|
||||
return self._ifconfig_success_cmd(netif=netif, args=args,
|
||||
timeout=timeout, async_=async_)
|
||||
|
||||
def ifconfig_del(self, netif, addr, timeout=-1, async_=False):
|
||||
return self._ifconfig_success_cmd(netif=netif, args=("del", addr),
|
||||
timeout=timeout, async_=async_)
|
||||
|
||||
def ifconfig_flag(self, netif, flag, enable=True,
|
||||
timeout=-1, async_=False):
|
||||
return self._ifconfig_success_cmd(
|
||||
netif=netif, args=("{}{}".format("" if enable else "-", flag),),
|
||||
timeout=timeout, async_=async_
|
||||
)
|
||||
|
||||
def ifconfig_l2filter_add(self, netif, addr, timeout=-1, async_=False):
|
||||
return self._ifconfig_success_cmd(
|
||||
netif=netif, args=("l2filter", "add", addr),
|
||||
timeout=timeout, async_=async_
|
||||
)
|
||||
|
||||
def ifconfig_l2filter_del(self, netif, addr, timeout=-1, async_=False):
|
||||
return self._ifconfig_success_cmd(
|
||||
netif=netif, args=("l2filter", "del", addr),
|
||||
timeout=timeout, async_=async_
|
||||
)
|
||||
|
||||
def ifconfig_stats(self, netif, module, timeout=-1, async_=False):
|
||||
res = self.ifconfig_cmd(netif=netif, args=("stats", module),
|
||||
timeout=timeout, async_=async_)
|
||||
if "Statistics for " in res:
|
||||
return res
|
||||
raise RuntimeError(res)
|
||||
|
||||
def ifconfig_stats_reset(self, netif, module, timeout=-1, async_=False):
|
||||
res = self.ifconfig_cmd(netif=netif, args=("stats", module, "reset"),
|
||||
timeout=timeout, async_=async_)
|
||||
if "Reset statistics for module " in res:
|
||||
return res
|
||||
raise RuntimeError(res)
|
||||
|
||||
def _ifconfig_success_cmd(self, netif=None, args=None,
|
||||
timeout=-1, async_=False):
|
||||
"""For commands that have a success output"""
|
||||
res = self.ifconfig_cmd(netif=netif, args=args,
|
||||
timeout=timeout, async_=async_)
|
||||
if "success" in res:
|
||||
return res
|
||||
raise RuntimeError(res)
|
||||
|
||||
def _ifconfig_error_cmd(self, netif=None, args=None,
|
||||
timeout=-1, async_=False):
|
||||
"""For commands that only have an error output"""
|
||||
res = self.ifconfig_cmd(netif=netif, args=args,
|
||||
timeout=timeout, async_=async_)
|
||||
if "error" in res:
|
||||
raise RuntimeError(res)
|
||||
|
||||
|
||||
class TXTSnd(ShellInteraction):
|
||||
def netif_txtsnd(self, netif, target, data, timeout=-1, async_=False):
|
||||
cmd = "txtsnd {netif} {target} {data}".format(
|
||||
netif=netif,
|
||||
target=target,
|
||||
data=data
|
||||
)
|
||||
res = self.cmd(cmd)
|
||||
if "error" in res or "usage" in res:
|
||||
raise RuntimeError(res)
|
||||
return res
|
||||
18
dist/pythonlibs/riotctrl_shell/tests/common.py
vendored
18
dist/pythonlibs/riotctrl_shell/tests/common.py
vendored
@ -10,11 +10,19 @@ class MockSpawn():
|
||||
# set some expected attributes
|
||||
self.before = None
|
||||
self.echo = False
|
||||
self.output = None
|
||||
self.last_command = None
|
||||
|
||||
def sendline(self, line, *args, **kwargs):
|
||||
# just echo last input for before (what replwrap is assembling output
|
||||
# from)
|
||||
self.before = line
|
||||
self.last_command = line
|
||||
if self.output is None:
|
||||
# just echo last input for before (what replwrap is assembling
|
||||
# output from)
|
||||
self.before = line
|
||||
else:
|
||||
# use pre-configured output in case command expects a specific
|
||||
# output
|
||||
self.before = self.output
|
||||
|
||||
def expect_exact(self, *args, **kwargs):
|
||||
# always match on prompt with replwrap
|
||||
@ -27,8 +35,10 @@ class MockRIOTCtrl():
|
||||
"""
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.term = MockSpawn()
|
||||
self.last_command = None
|
||||
|
||||
|
||||
def init_ctrl():
|
||||
def init_ctrl(output=None):
|
||||
rc = MockRIOTCtrl("foobar", env={"BOARD": "native"})
|
||||
rc.term.output = output
|
||||
return rc
|
||||
|
||||
287
dist/pythonlibs/riotctrl_shell/tests/test_netif.py
vendored
Normal file
287
dist/pythonlibs/riotctrl_shell/tests/test_netif.py
vendored
Normal file
@ -0,0 +1,287 @@
|
||||
# Copyright (C) 2020 Freie Universität Berlin
|
||||
#
|
||||
# This file is subject to the terms and conditions of the GNU Lesser
|
||||
# General Public License v2.1. See the file LICENSE in the top level
|
||||
# directory for more details.
|
||||
|
||||
import pytest
|
||||
import riotctrl_shell.netif
|
||||
|
||||
from .common import init_ctrl
|
||||
|
||||
|
||||
def test_ifconfig_stats_parser():
|
||||
cmd_output = """
|
||||
Statistics for Layer 2
|
||||
RX packets 4 bytes 400
|
||||
TX packets 1 (Multicast: 1) bytes 78
|
||||
TX succeeded 1 errors 0
|
||||
Statistics for IPv6
|
||||
RX packets 4 bytes 344
|
||||
TX packets 1 (Multicast: 1) bytes 64
|
||||
TX succeeded 1 errors 0"""
|
||||
parser = riotctrl_shell.netif.IfconfigStatsParser()
|
||||
res = parser.parse(cmd_output)
|
||||
assert len(res) == 2
|
||||
assert res["Layer 2"]["rx"]["packets"] == 4
|
||||
assert res["Layer 2"]["rx"]["bytes"] == 400
|
||||
assert res["Layer 2"]["tx"]["packets"] == 1
|
||||
assert res["Layer 2"]["tx"]["multicast"] == 1
|
||||
assert res["Layer 2"]["tx"]["bytes"] == 78
|
||||
assert res["Layer 2"]["tx"]["succeeded"] == 1
|
||||
assert res["Layer 2"]["tx"]["errors"] == 0
|
||||
assert res["IPv6"]["rx"]["packets"] == 4
|
||||
assert res["IPv6"]["rx"]["bytes"] == 344
|
||||
assert res["IPv6"]["tx"]["packets"] == 1
|
||||
assert res["IPv6"]["tx"]["multicast"] == 1
|
||||
assert res["IPv6"]["tx"]["bytes"] == 64
|
||||
assert res["IPv6"]["tx"]["succeeded"] == 1
|
||||
assert res["IPv6"]["tx"]["errors"] == 0
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"args,expected",
|
||||
[((), "ifconfig"), (("foobar",), "ifconfig foobar")]
|
||||
)
|
||||
def test_ifconfig_list(args, expected):
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
res = si.ifconfig_list(*args)
|
||||
# mock just returns last input
|
||||
assert res == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"args,expected",
|
||||
[((), "ifconfig"), (("foobar",), "ifconfig foobar")]
|
||||
)
|
||||
def test_ifconfig_cmd_empty(args, expected):
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
res = si.ifconfig_cmd(*args)
|
||||
assert res == expected
|
||||
|
||||
|
||||
def test_ifconfig_cmd_error():
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
with pytest.raises(ValueError):
|
||||
si.ifconfig_cmd(args=("test", "12345"))
|
||||
|
||||
|
||||
def test_ifconfig_help():
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
res = si.ifconfig_help("foobar")
|
||||
assert res == "ifconfig foobar help"
|
||||
|
||||
|
||||
def test_ifconfig_set():
|
||||
rc = init_ctrl(output="success: address set")
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
res = si.ifconfig_set("foobar", "addr", "42:de:ad:c0:ff:ee")
|
||||
assert res == "success: address set"
|
||||
assert rc.term.last_command == "ifconfig foobar set addr 42:de:ad:c0:ff:ee"
|
||||
|
||||
|
||||
def test_ifconfig_set_error():
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
with pytest.raises(RuntimeError):
|
||||
si.ifconfig_set("foobar", "addr", "42:de:ad:c0:ff:ee")
|
||||
assert rc.term.last_command == "ifconfig foobar set addr 42:de:ad:c0:ff:ee"
|
||||
|
||||
|
||||
def test_ifconfig_up():
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
si.ifconfig_up("foobar")
|
||||
assert rc.term.last_command == "ifconfig foobar up"
|
||||
|
||||
|
||||
def test_ifconfig_up_error():
|
||||
rc = init_ctrl("error: unable to set link foobar")
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
with pytest.raises(RuntimeError):
|
||||
si.ifconfig_up("foobar")
|
||||
assert rc.term.last_command == "ifconfig foobar up"
|
||||
|
||||
|
||||
def test_ifconfig_down():
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
si.ifconfig_down("foobar")
|
||||
assert rc.term.last_command == "ifconfig foobar down"
|
||||
|
||||
|
||||
def test_ifconfig_down_error():
|
||||
rc = init_ctrl("error: unable to set link foobar")
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
with pytest.raises(RuntimeError):
|
||||
si.ifconfig_down("foobar")
|
||||
assert rc.term.last_command == "ifconfig foobar down"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"kwargs,expected",
|
||||
[({"netif": "foobar", "addr": "dead:coff:ee::/64"},
|
||||
"ifconfig foobar add dead:coff:ee::/64"),
|
||||
({"netif": "foobar", "addr": "dead:coff:ee::/64", "anycast": False},
|
||||
"ifconfig foobar add dead:coff:ee::/64"),
|
||||
({"netif": "foobar", "addr": "dead:coff:ee::/64", "anycast": True},
|
||||
"ifconfig foobar add dead:coff:ee::/64 anycast")]
|
||||
)
|
||||
def test_ifconfig_add(kwargs, expected):
|
||||
rc = init_ctrl(output="success: added address to interface")
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
res = si.ifconfig_add(**kwargs)
|
||||
assert res == "success: added address to interface"
|
||||
assert rc.term.last_command == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"kwargs,expected",
|
||||
[({"netif": "foobar", "addr": "dead:coff:ee::/64"},
|
||||
"ifconfig foobar add dead:coff:ee::/64"),
|
||||
({"netif": "foobar", "addr": "dead:coff:ee::/64", "anycast": False},
|
||||
"ifconfig foobar add dead:coff:ee::/64"),
|
||||
({"netif": "foobar", "addr": "dead:coff:ee::/64", "anycast": True},
|
||||
"ifconfig foobar add dead:coff:ee::/64 anycast")]
|
||||
)
|
||||
def test_ifconfig_add_error(kwargs, expected):
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
with pytest.raises(RuntimeError):
|
||||
si.ifconfig_add(**kwargs)
|
||||
assert rc.term.last_command == expected
|
||||
|
||||
|
||||
def test_ifconfig_del():
|
||||
rc = init_ctrl(output="success: removed address from interface")
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
res = si.ifconfig_del("foobar", "dead:coff:ee::/64")
|
||||
assert res == "success: removed address from interface"
|
||||
assert rc.term.last_command == "ifconfig foobar del dead:coff:ee::/64"
|
||||
|
||||
|
||||
def test_ifconfig_del_error():
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
with pytest.raises(RuntimeError):
|
||||
si.ifconfig_del("foobar", "dead:coff:ee::/64")
|
||||
assert rc.term.last_command == "ifconfig foobar del dead:coff:ee::/64"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"kwargs,expected",
|
||||
[({"netif": "foobar", "flag": "6lo"}, "ifconfig foobar 6lo"),
|
||||
({"netif": "foobar", "flag": "6lo", "enable": True},
|
||||
"ifconfig foobar 6lo"),
|
||||
({"netif": "foobar", "flag": "6lo", "enable": False},
|
||||
"ifconfig foobar -6lo")]
|
||||
)
|
||||
def test_ifconfig_flag(kwargs, expected):
|
||||
rc = init_ctrl(output="success: set option")
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
res = si.ifconfig_flag(**kwargs)
|
||||
assert res == "success: set option"
|
||||
assert rc.term.last_command == expected
|
||||
|
||||
|
||||
def test_ifconfig_flag_error():
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
with pytest.raises(RuntimeError):
|
||||
si.ifconfig_flag("foobar", "6lo", False)
|
||||
assert rc.term.last_command == "ifconfig foobar -6lo"
|
||||
|
||||
|
||||
def test_ifconfig_l2filter_add():
|
||||
rc = init_ctrl(output="successfully added address to filter")
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
res = si.ifconfig_l2filter_add("foobar", "ab:cd:ef:01:23:45")
|
||||
assert res == "successfully added address to filter"
|
||||
assert rc.term.last_command == \
|
||||
"ifconfig foobar l2filter add ab:cd:ef:01:23:45"
|
||||
|
||||
|
||||
def test_ifconfig_l2filter_add_error():
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
with pytest.raises(RuntimeError):
|
||||
si.ifconfig_l2filter_add("foobar", "ab:cd:ef:01:23:45")
|
||||
assert rc.term.last_command == \
|
||||
"ifconfig foobar l2filter add ab:cd:ef:01:23:45"
|
||||
|
||||
|
||||
def test_ifconfig_l2filter_del():
|
||||
rc = init_ctrl(output="successfully removed address to filter")
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
res = si.ifconfig_l2filter_del("foobar", "ab:cd:ef:01:23:45")
|
||||
assert res == "successfully removed address to filter"
|
||||
assert rc.term.last_command == \
|
||||
"ifconfig foobar l2filter del ab:cd:ef:01:23:45"
|
||||
|
||||
|
||||
def test_ifconfig_l2filter_del_error():
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
with pytest.raises(RuntimeError):
|
||||
si.ifconfig_l2filter_del("foobar", "ab:cd:ef:01:23:45")
|
||||
assert rc.term.last_command == \
|
||||
"ifconfig foobar l2filter del ab:cd:ef:01:23:45"
|
||||
|
||||
|
||||
def test_ifconfig_stats():
|
||||
rc = init_ctrl(output=" Statistics for Layer 2\n RX ...")
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
res = si.ifconfig_stats("foobar", "l2")
|
||||
assert res.startswith(" Statistics for ")
|
||||
assert rc.term.last_command == \
|
||||
"ifconfig foobar stats l2"
|
||||
|
||||
|
||||
def test_ifconfig_stats_error():
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
with pytest.raises(RuntimeError):
|
||||
si.ifconfig_stats("foobar", "l2")
|
||||
assert rc.term.last_command == \
|
||||
"ifconfig foobar stats l2"
|
||||
|
||||
|
||||
def test_ifconfig_stats_reset():
|
||||
rc = init_ctrl(output="Reset statistics for module Layer 2!")
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
res = si.ifconfig_stats_reset("foobar", "l2")
|
||||
assert res == "Reset statistics for module Layer 2!"
|
||||
assert rc.term.last_command == \
|
||||
"ifconfig foobar stats l2 reset"
|
||||
|
||||
|
||||
def test_ifconfig_stats_reset_error():
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.netif.Ifconfig(rc)
|
||||
with pytest.raises(RuntimeError):
|
||||
si.ifconfig_stats_reset("foobar", "l2")
|
||||
assert rc.term.last_command == \
|
||||
"ifconfig foobar stats l2 reset"
|
||||
|
||||
|
||||
def test_txtsnd():
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.netif.TXTSnd(rc)
|
||||
res = si.netif_txtsnd("foobar", "bcast", "abcdef")
|
||||
assert res == "txtsnd foobar bcast abcdef"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"error_msg",
|
||||
["error: foobar", "usage: txtsnd foobar"]
|
||||
)
|
||||
def test_txtsnd_error(error_msg):
|
||||
rc = init_ctrl(output=error_msg)
|
||||
si = riotctrl_shell.netif.TXTSnd(rc)
|
||||
with pytest.raises(RuntimeError):
|
||||
si.netif_txtsnd("foobar", "bcast", "abcdef")
|
||||
assert rc.term.last_command == "txtsnd foobar bcast abcdef"
|
||||
366
dist/pythonlibs/riotctrl_shell/tests/test_netif_list_parse.py
vendored
Normal file
366
dist/pythonlibs/riotctrl_shell/tests/test_netif_list_parse.py
vendored
Normal file
@ -0,0 +1,366 @@
|
||||
# Copyright (C) 2020 Freie Universität Berlin
|
||||
#
|
||||
# This file is subject to the terms and conditions of the GNU Lesser
|
||||
# General Public License v2.1. See the file LICENSE in the top level
|
||||
# directory for more details.
|
||||
|
||||
"""Tests various example outputs of Ifconfig::ifconfig_list()"""
|
||||
|
||||
import riotctrl_shell.netif
|
||||
|
||||
|
||||
def test_ifconfig_list_parser1():
|
||||
cmd_output = """
|
||||
Iface 5 HWaddr: E2:BC:7D:CB:F5:50
|
||||
L2-PDU:1500 MTU:1500 HL:64 RTR
|
||||
RTR_ADV
|
||||
Source address length: 6
|
||||
Link type: wired
|
||||
inet6 addr: fe80::e0bc:7dff:fecb:f550 scope: link VAL
|
||||
inet6 addr: fe80::e0bc:7dff:fecb:f551 scope: link [anycast] TNT[2]
|
||||
inet6 group: ff02::2
|
||||
inet6 group: ff02::1
|
||||
inet6 group: ff02::1:ffcb:f550
|
||||
|
||||
White-listed link layer addresses:
|
||||
0: ab:cd:ef:01:23:45
|
||||
|
||||
Statistics for Layer 2
|
||||
RX packets 14 bytes 1300
|
||||
TX packets 3 (Multicast: 1) bytes 234
|
||||
TX succeeded 3 errors 0
|
||||
Statistics for IPv6
|
||||
RX packets 14 bytes 1104
|
||||
TX packets 3 (Multicast: 1) bytes 192
|
||||
TX succeeded 3 errors 0"""
|
||||
parser = riotctrl_shell.netif.IfconfigListParser()
|
||||
res = parser.parse(cmd_output)
|
||||
assert len(res) == 1
|
||||
assert "5" in res
|
||||
assert len(res["5"]["flags"]) == 2
|
||||
assert "RTR" in res["5"]["flags"]
|
||||
assert "RTR_ADV" in res["5"]["flags"]
|
||||
assert res["5"]["hwaddr"] == "E2:BC:7D:CB:F5:50"
|
||||
assert res["5"]["l2_pdu"] == 1500
|
||||
assert res["5"]["mtu"] == 1500
|
||||
assert res["5"]["hl"] == 64
|
||||
assert res["5"]["source_address_length"] == 6
|
||||
assert res["5"]["link_type"] == "wired"
|
||||
assert len(res["5"]["ipv6_addrs"]) == 2
|
||||
assert {"addr": "fe80::e0bc:7dff:fecb:f550", "scope": "link",
|
||||
"state": "VAL"} in res["5"]["ipv6_addrs"]
|
||||
assert {"addr": "fe80::e0bc:7dff:fecb:f551", "scope": "link",
|
||||
"state": "TNT[2]", "anycast": True} in res["5"]["ipv6_addrs"]
|
||||
assert len(res["5"]["ipv6_groups"]) == 3
|
||||
assert {"addr": "ff02::1"} in res["5"]["ipv6_groups"]
|
||||
assert {"addr": "ff02::2"} in res["5"]["ipv6_groups"]
|
||||
assert {"addr": "ff02::1:ffcb:f550"} in res["5"]["ipv6_groups"]
|
||||
assert len(res["5"]["whitelist"]) == 1
|
||||
# white and blacklist are mutually exclusive
|
||||
assert "blacklist" not in res["5"]
|
||||
assert "ab:cd:ef:01:23:45" in res["5"]["whitelist"]
|
||||
assert len(res["5"]["stats"]) == 2
|
||||
assert res["5"]["stats"]["Layer 2"]["rx"]["packets"] == 14
|
||||
assert res["5"]["stats"]["Layer 2"]["rx"]["bytes"] == 1300
|
||||
assert res["5"]["stats"]["Layer 2"]["tx"]["packets"] == 3
|
||||
assert res["5"]["stats"]["Layer 2"]["tx"]["multicast"] == 1
|
||||
assert res["5"]["stats"]["Layer 2"]["tx"]["bytes"] == 234
|
||||
assert res["5"]["stats"]["Layer 2"]["tx"]["succeeded"] == 3
|
||||
assert res["5"]["stats"]["Layer 2"]["tx"]["errors"] == 0
|
||||
assert res["5"]["stats"]["IPv6"]["rx"]["packets"] == 14
|
||||
assert res["5"]["stats"]["IPv6"]["rx"]["bytes"] == 1104
|
||||
assert res["5"]["stats"]["IPv6"]["tx"]["packets"] == 3
|
||||
assert res["5"]["stats"]["IPv6"]["tx"]["multicast"] == 1
|
||||
assert res["5"]["stats"]["IPv6"]["tx"]["bytes"] == 192
|
||||
assert res["5"]["stats"]["IPv6"]["tx"]["succeeded"] == 3
|
||||
assert res["5"]["stats"]["IPv6"]["tx"]["errors"] == 0
|
||||
|
||||
|
||||
def test_ifconfig_list_parser2():
|
||||
cmd_output = """
|
||||
Iface 7 HWaddr: 76:F5:98:9F:40:22
|
||||
L2-PDU:1500 MTU:1500 HL:64 RTR
|
||||
Source address length: 6
|
||||
Link type: wired
|
||||
inet6 addr: fe80::74f5:98ff:fe9f:4022 scope: link VAL
|
||||
inet6 addr: fe80::2 scope: link VAL
|
||||
inet6 group: ff02::2
|
||||
inet6 group: ff02::1
|
||||
inet6 group: ff02::1:ff9f:4022
|
||||
inet6 group: ff02::1:ff00:2
|
||||
|
||||
Iface 6 HWaddr: 2D:4A Channel: 26 Page: 0 NID: 0x23 PHY: O-QPSK
|
||||
|
||||
Long HWaddr: 5A:9D:93:86:22:08:65:7B
|
||||
TX-Power: 0dBm State: IDLE max. Retrans.: 3 CSMA Retries: 4
|
||||
AUTOACK ACK_REQ CSMA L2-PDU:102 MTU:1280 HL:64 RTR
|
||||
RTR_ADV 6LO IPHC
|
||||
Source address length: 8
|
||||
Link type: wireless
|
||||
inet6 addr: fe80::589d:9386:2208:657b scope: link VAL
|
||||
inet6 group: ff02::2
|
||||
inet6 group: ff02::1
|
||||
inet6 group: ff02::1:ff08:657b"""
|
||||
parser = riotctrl_shell.netif.IfconfigListParser()
|
||||
res = parser.parse(cmd_output)
|
||||
assert len(res) == 2
|
||||
assert "7" in res
|
||||
assert len(res["7"]["flags"]) == 1
|
||||
assert "RTR" in res["7"]["flags"]
|
||||
assert res["7"]["source_address_length"] == 6
|
||||
assert res["7"]["link_type"] == "wired"
|
||||
assert len(res["7"]["ipv6_addrs"]) == 2
|
||||
assert {"addr": "fe80::74f5:98ff:fe9f:4022", "scope": "link",
|
||||
"state": "VAL"} in res["7"]["ipv6_addrs"]
|
||||
assert {"addr": "fe80::2", "scope": "link",
|
||||
"state": "VAL"} in res["7"]["ipv6_addrs"]
|
||||
assert len(res["7"]["ipv6_groups"]) == 4
|
||||
assert {"addr": "ff02::1"} in res["7"]["ipv6_groups"]
|
||||
assert {"addr": "ff02::2"} in res["7"]["ipv6_groups"]
|
||||
assert {"addr": "ff02::1:ff9f:4022"} in res["7"]["ipv6_groups"]
|
||||
assert {"addr": "ff02::1:ff00:2"} in res["7"]["ipv6_groups"]
|
||||
assert "6" in res
|
||||
assert len(res["6"]["flags"]) == 7
|
||||
assert "AUTOACK" in res["6"]["flags"]
|
||||
assert "ACK_REQ" in res["6"]["flags"]
|
||||
assert "CSMA" in res["6"]["flags"]
|
||||
assert "RTR" in res["6"]["flags"]
|
||||
assert "RTR_ADV" in res["6"]["flags"]
|
||||
assert "6LO" in res["6"]["flags"]
|
||||
assert "IPHC" in res["6"]["flags"]
|
||||
assert res["6"]["hwaddr"] == "2D:4A"
|
||||
assert res["6"]["channel"] == 26
|
||||
assert res["6"]["page"] == 0
|
||||
assert res["6"]["nid"] == 0x23
|
||||
assert res["6"]["phy"] == "O-QPSK"
|
||||
assert res["6"]["long_hwaddr"] == "5A:9D:93:86:22:08:65:7B"
|
||||
assert res["6"]["tx_power"] == "0dBm"
|
||||
assert res["6"]["state"] == "IDLE"
|
||||
assert res["6"]["max_retrans"] == 3
|
||||
assert res["6"]["csma_retries"] == 4
|
||||
assert res["6"]["l2_pdu"] == 102
|
||||
assert res["6"]["mtu"] == 1280
|
||||
assert res["6"]["hl"] == 64
|
||||
assert res["6"]["source_address_length"] == 8
|
||||
assert res["6"]["link_type"] == "wireless"
|
||||
assert len(res["6"]["ipv6_addrs"]) == 1
|
||||
assert {"addr": "fe80::589d:9386:2208:657b", "scope": "link",
|
||||
"state": "VAL"} in res["6"]["ipv6_addrs"]
|
||||
assert {"addr": "ff02::1"} in res["6"]["ipv6_groups"]
|
||||
assert {"addr": "ff02::2"} in res["6"]["ipv6_groups"]
|
||||
assert {"addr": "ff02::1:ff08:657b"} in res["6"]["ipv6_groups"]
|
||||
|
||||
|
||||
def test_ifconfig_list_parser3():
|
||||
cmd_output = """
|
||||
ifconfig
|
||||
Iface 3 HWaddr: 26:01:24:C0 Frequency: 869524963Hz BW: 125kHz SF: 12 CR: 4/5 Link: up
|
||||
TX-Power: 14dBm State: SLEEP Demod margin.: 0 Num gateways.: 0
|
||||
IQ_INVERT
|
||||
RX_SINGLE OTAA L2-PDU:2559
|
||||
""" # noqa: E501
|
||||
parser = riotctrl_shell.netif.IfconfigListParser()
|
||||
res = parser.parse(cmd_output)
|
||||
assert len(res) == 1
|
||||
assert "3" in res
|
||||
print(res)
|
||||
assert len(res["3"]["flags"]) == 3
|
||||
assert "IQ_INVERT" in res["3"]["flags"]
|
||||
assert "RX_SINGLE" in res["3"]["flags"]
|
||||
assert "OTAA" in res["3"]["flags"]
|
||||
assert res["3"]["hwaddr"] == "26:01:24:C0"
|
||||
assert res["3"]["frequency"] == "869524963Hz"
|
||||
assert res["3"]["bw"] == "125kHz"
|
||||
assert res["3"]["sf"] == 12
|
||||
assert res["3"]["cr"] == "4/5"
|
||||
assert res["3"]["link"] == "up"
|
||||
assert res["3"]["tx_power"] == "14dBm"
|
||||
assert res["3"]["state"] == "SLEEP"
|
||||
assert res["3"]["demod_margin"] == 0
|
||||
assert res["3"]["num_gateways"] == 0
|
||||
assert res["3"]["l2_pdu"] == 2559
|
||||
|
||||
|
||||
def test_ifconfig_list_parser4():
|
||||
cmd_output = """
|
||||
> ifconfig 7
|
||||
2020-07-06 12:13:37,636 # ifconfig 7
|
||||
2020-07-06 12:13:37,653 # Iface 7 HWaddr: 3A:A4 Channel: 26 NID: 0x23 PHY: MR-O-QPSK
|
||||
2020-07-06 12:13:37,654 # chip rate: 2000 rate mode: 0
|
||||
2020-07-06 12:13:37,655 # Long HWaddr: 22:68:31:23:59:F5:D2:38
|
||||
2020-07-06 12:13:37,669 # TX-Power: 0dBm State: IDLE max. Retrans.: 3 CSMA Retries: 4
|
||||
2020-07-06 12:13:37,671 # AUTOACK ACK_REQ CSMA L2-PDU:2022 MTU:1280 HL:64 RTR
|
||||
2020-07-06 12:13:37,671 # 6LO IPHC
|
||||
2020-07-06 12:13:37,672 # Source address length: 8
|
||||
2020-07-06 12:13:37,684 # Link type: wireless
|
||||
2020-07-06 12:13:37,686 # inet6 addr: fe80::2068:3123:59f5:d238 scope: link VAL
|
||||
2020-07-06 12:13:37,686 # inet6 group: ff02::2
|
||||
2020-07-06 12:13:37,687 # inet6 group: ff02::1
|
||||
2020-07-06 12:13:37,688 # inet6 group: ff02::1:fff5:d238
|
||||
2020-07-06 12:13:37,700 #
|
||||
2020-07-06 12:13:37,701 # Statistics for Layer 2
|
||||
2020-07-06 12:13:37,702 # RX packets 1 bytes 43
|
||||
2020-07-06 12:13:37,703 # TX packets 6 (Multicast: 6) bytes 258
|
||||
2020-07-06 12:13:37,704 # TX succeeded 6 errors 0
|
||||
2020-07-06 12:13:37,716 # Statistics for IPv6
|
||||
2020-07-06 12:13:37,717 # RX packets 1 bytes 64
|
||||
2020-07-06 12:13:37,718 # TX packets 6 (Multicast: 6) bytes 384
|
||||
2020-07-06 12:13:37,719 # TX succeeded 6 errors 0 """ # noqa: E501
|
||||
parser = riotctrl_shell.netif.IfconfigListParser()
|
||||
res = parser.parse(cmd_output)
|
||||
assert len(res) == 1
|
||||
assert "7" in res
|
||||
assert len(res["7"]["flags"]) == 6
|
||||
assert "AUTOACK" in res["7"]["flags"]
|
||||
assert "ACK_REQ" in res["7"]["flags"]
|
||||
assert "CSMA" in res["7"]["flags"]
|
||||
assert "RTR" in res["7"]["flags"]
|
||||
assert "6LO" in res["7"]["flags"]
|
||||
assert "IPHC" in res["7"]["flags"]
|
||||
assert res["7"]["hwaddr"] == "3A:A4"
|
||||
assert res["7"]["channel"] == 26
|
||||
assert "page" not in res["7"]
|
||||
assert res["7"]["nid"] == 0x23
|
||||
assert res["7"]["phy"] == "MR-O-QPSK"
|
||||
assert res["7"]["chip_rate"] == 2000
|
||||
assert res["7"]["rate_mode"] == 0
|
||||
assert res["7"]["long_hwaddr"] == "22:68:31:23:59:F5:D2:38"
|
||||
assert res["7"]["tx_power"] == "0dBm"
|
||||
assert res["7"]["state"] == "IDLE"
|
||||
assert res["7"]["max_retrans"] == 3
|
||||
assert res["7"]["csma_retries"] == 4
|
||||
assert res["7"]["l2_pdu"] == 2022
|
||||
assert res["7"]["mtu"] == 1280
|
||||
assert res["7"]["hl"] == 64
|
||||
assert res["7"]["source_address_length"] == 8
|
||||
assert res["7"]["link_type"] == "wireless"
|
||||
assert len(res["7"]["ipv6_addrs"]) == 1
|
||||
assert {"addr": "fe80::2068:3123:59f5:d238", "scope": "link",
|
||||
"state": "VAL"} in res["7"]["ipv6_addrs"]
|
||||
assert len(res["7"]["ipv6_groups"]) == 3
|
||||
assert {"addr": "ff02::1"} in res["7"]["ipv6_groups"]
|
||||
assert {"addr": "ff02::2"} in res["7"]["ipv6_groups"]
|
||||
assert {"addr": "ff02::1:fff5:d238"} in res["7"]["ipv6_groups"]
|
||||
assert len(res["7"]["stats"]) == 2
|
||||
assert res["7"]["stats"]["Layer 2"]["rx"]["packets"] == 1
|
||||
assert res["7"]["stats"]["Layer 2"]["rx"]["bytes"] == 43
|
||||
assert res["7"]["stats"]["Layer 2"]["tx"]["packets"] == 6
|
||||
assert res["7"]["stats"]["Layer 2"]["tx"]["multicast"] == 6
|
||||
assert res["7"]["stats"]["Layer 2"]["tx"]["bytes"] == 258
|
||||
assert res["7"]["stats"]["Layer 2"]["tx"]["succeeded"] == 6
|
||||
assert res["7"]["stats"]["Layer 2"]["tx"]["errors"] == 0
|
||||
assert res["7"]["stats"]["IPv6"]["rx"]["packets"] == 1
|
||||
assert res["7"]["stats"]["IPv6"]["rx"]["bytes"] == 64
|
||||
assert res["7"]["stats"]["IPv6"]["tx"]["packets"] == 6
|
||||
assert res["7"]["stats"]["IPv6"]["tx"]["multicast"] == 6
|
||||
assert res["7"]["stats"]["IPv6"]["tx"]["bytes"] == 384
|
||||
assert res["7"]["stats"]["IPv6"]["tx"]["succeeded"] == 6
|
||||
assert res["7"]["stats"]["IPv6"]["tx"]["errors"] == 0
|
||||
|
||||
|
||||
def test_ifconfig_list_parser5():
|
||||
cmd_output = """
|
||||
> ifconfig 7
|
||||
Iface 7 HWaddr: 3A:A4 Channel: 26 NID: 0x23 PHY: MR-OFDM
|
||||
Option: 1 MCS: 0 (BPSK, rate 1/2, 4x frequency repetition)
|
||||
Long HWaddr: 22:68:31:23:59:F5:D2:38
|
||||
TX-Power: 0dBm State: IDLE max. Retrans.: 3 CSMA Retries: 4
|
||||
AUTOACK ACK_REQ CSMA L2-PDU:2022 MTU:1280 HL:64 RTR
|
||||
6LO IPHC
|
||||
Source address length: 8
|
||||
Link type: wireless
|
||||
inet6 addr: fe80::2068:3123:59f5:d238 scope: link VAL
|
||||
inet6 group: ff02::2
|
||||
inet6 group: ff02::1
|
||||
inet6 group: ff02::1:fff5:d238
|
||||
|
||||
Statistics for Layer 2
|
||||
RX packets 1 bytes 43
|
||||
TX packets 7 (Multicast: 7) bytes 301
|
||||
TX succeeded 7 errors 0
|
||||
Statistics for IPv6
|
||||
RX packets 1 bytes 64
|
||||
TX packets 7 (Multicast: 7) bytes 448
|
||||
TX succeeded 7 errors 0""" # noqa: E501
|
||||
parser = riotctrl_shell.netif.IfconfigListParser()
|
||||
res = parser.parse(cmd_output)
|
||||
# not much changed compared to test_ifconfig_list_parser4, so only check
|
||||
# mainly differences
|
||||
assert len(res) == 1
|
||||
assert "7" in res
|
||||
assert len(res["7"]["flags"]) == 6
|
||||
assert "page" not in res["7"]
|
||||
assert res["7"]["phy"] == "MR-OFDM"
|
||||
assert res["7"]["option"] == 1
|
||||
assert res["7"]["mcs"] == "0 (BPSK, rate 1/2, 4x frequency repetition)"
|
||||
assert "chip_rate" not in res["7"]
|
||||
assert "rate_mode" not in res["7"]
|
||||
assert len(res["7"]["ipv6_addrs"]) == 1
|
||||
assert len(res["7"]["ipv6_groups"]) == 3
|
||||
assert len(res["7"]["stats"]) == 2
|
||||
assert res["7"]["stats"]["Layer 2"]["rx"]["packets"] == 1
|
||||
assert res["7"]["stats"]["Layer 2"]["rx"]["bytes"] == 43
|
||||
assert res["7"]["stats"]["Layer 2"]["tx"]["packets"] == 7
|
||||
assert res["7"]["stats"]["Layer 2"]["tx"]["multicast"] == 7
|
||||
assert res["7"]["stats"]["Layer 2"]["tx"]["bytes"] == 301
|
||||
assert res["7"]["stats"]["Layer 2"]["tx"]["succeeded"] == 7
|
||||
assert res["7"]["stats"]["Layer 2"]["tx"]["errors"] == 0
|
||||
assert res["7"]["stats"]["IPv6"]["rx"]["packets"] == 1
|
||||
assert res["7"]["stats"]["IPv6"]["rx"]["bytes"] == 64
|
||||
assert res["7"]["stats"]["IPv6"]["tx"]["packets"] == 7
|
||||
assert res["7"]["stats"]["IPv6"]["tx"]["multicast"] == 7
|
||||
assert res["7"]["stats"]["IPv6"]["tx"]["bytes"] == 448
|
||||
assert res["7"]["stats"]["IPv6"]["tx"]["succeeded"] == 7
|
||||
assert res["7"]["stats"]["IPv6"]["tx"]["errors"] == 0
|
||||
|
||||
|
||||
def test_ifconfig_list_parser6():
|
||||
cmd_output = """
|
||||
ifconfig 7
|
||||
Iface 7 HWaddr: 3A:A4 Channel: 26 NID: 0x23 PHY: MR-FSK
|
||||
modulation index: 1 2-FSK symbol rate: 50 kHz FEC: none BW: 400kHz
|
||||
Long HWaddr: 22:68:31:23:59:F5:D2:38
|
||||
TX-Power: 0dBm State: IDLE max. Retrans.: 3 CSMA Retries: 4
|
||||
AUTOACK ACK_REQ CSMA L2-PDU:2022 MTU:1280 HL:64 RTR
|
||||
6LO IPHC
|
||||
Source address length: 8
|
||||
Link type: wireless
|
||||
inet6 addr: fe80::2068:3123:59f5:d238 scope: link VAL
|
||||
inet6 group: ff02::2
|
||||
inet6 group: ff02::1
|
||||
inet6 group: ff02::1:fff5:d238
|
||||
|
||||
Statistics for Layer 2
|
||||
RX packets 1 bytes 43
|
||||
TX packets 8 (Multicast: 8) bytes 344
|
||||
TX succeeded 8 errors 0
|
||||
Statistics for IPv6
|
||||
RX packets 1 bytes 64
|
||||
TX packets 8 (Multicast: 8) bytes 512
|
||||
TX succeeded 8 errors 0""" # noqa: E501
|
||||
parser = riotctrl_shell.netif.IfconfigListParser()
|
||||
res = parser.parse(cmd_output)
|
||||
# not much changed compared to test_ifconfig_list_parser4 and 5, so only
|
||||
# check mainly differences
|
||||
assert len(res) == 1
|
||||
assert "7" in res
|
||||
assert len(res["7"]["flags"]) == 7
|
||||
assert "AUTOACK" in res["7"]["flags"]
|
||||
assert "ACK_REQ" in res["7"]["flags"]
|
||||
assert "CSMA" in res["7"]["flags"]
|
||||
assert "RTR" in res["7"]["flags"]
|
||||
assert "6LO" in res["7"]["flags"]
|
||||
assert "IPHC" in res["7"]["flags"]
|
||||
assert "2-FSK" in res["7"]["flags"]
|
||||
assert "page" not in res["7"]
|
||||
assert res["7"]["phy"] == "MR-FSK"
|
||||
assert res["7"]["modulation_index"] == 1
|
||||
assert res["7"]["symbol_rate"] == "50 kHz"
|
||||
assert res["7"]["fec"] == "none"
|
||||
assert res["7"]["bw"] == "400kHz"
|
||||
assert "chip_rate" not in res["7"]
|
||||
assert "rate_mode" not in res["7"]
|
||||
assert "option" not in res["7"]
|
||||
assert "mcs" not in res["7"]
|
||||
assert len(res["7"]["ipv6_addrs"]) == 1
|
||||
assert len(res["7"]["ipv6_groups"]) == 3
|
||||
2
dist/pythonlibs/riotctrl_shell/tox.ini
vendored
2
dist/pythonlibs/riotctrl_shell/tox.ini
vendored
@ -12,7 +12,7 @@ deps =
|
||||
pytest
|
||||
-rrequirements.txt
|
||||
commands =
|
||||
pytest -v
|
||||
pytest -v --doctest-modules
|
||||
|
||||
[testenv:flake8]
|
||||
deps = flake8
|
||||
|
||||
@ -540,6 +540,7 @@ static void _netif_list(netif_t *iface)
|
||||
_print_iface_name(iface);
|
||||
printf(" ");
|
||||
|
||||
/* XXX divide options and flags by at least two spaces! */
|
||||
res = netif_get_opt(iface, NETOPT_ADDRESS, 0, hwaddr, sizeof(hwaddr));
|
||||
if (res >= 0) {
|
||||
char hwaddr_str[res * 3];
|
||||
@ -678,6 +679,7 @@ static void _netif_list(netif_t *iface)
|
||||
line_thresh++;
|
||||
}
|
||||
#endif
|
||||
/* XXX divide options and flags by at least two spaces! */
|
||||
line_thresh = _newline(0U, line_thresh);
|
||||
line_thresh = _netif_list_flag(iface, NETOPT_PROMISCUOUSMODE, "PROMISC ",
|
||||
line_thresh);
|
||||
@ -694,19 +696,20 @@ static void _netif_list(netif_t *iface)
|
||||
line_thresh = _netif_list_flag(iface, NETOPT_CSMA, "CSMA ",
|
||||
line_thresh);
|
||||
line_thresh += _LINE_THRESHOLD + 1; /* enforce linebreak after this option */
|
||||
line_thresh = _netif_list_flag(iface, NETOPT_AUTOCCA, "AUTOCCA ",
|
||||
line_thresh = _netif_list_flag(iface, NETOPT_AUTOCCA, "AUTOCCA ",
|
||||
line_thresh);
|
||||
line_thresh = _netif_list_flag(iface, NETOPT_IQ_INVERT, "IQ_INVERT ",
|
||||
line_thresh = _netif_list_flag(iface, NETOPT_IQ_INVERT, "IQ_INVERT ",
|
||||
line_thresh);
|
||||
line_thresh = _netif_list_flag(iface, NETOPT_SINGLE_RECEIVE, "RX_SINGLE ",
|
||||
line_thresh = _netif_list_flag(iface, NETOPT_SINGLE_RECEIVE, "RX_SINGLE ",
|
||||
line_thresh);
|
||||
line_thresh = _netif_list_flag(iface, NETOPT_CHANNEL_HOP, "CHAN_HOP ",
|
||||
line_thresh = _netif_list_flag(iface, NETOPT_CHANNEL_HOP, "CHAN_HOP ",
|
||||
line_thresh);
|
||||
line_thresh = _netif_list_flag(iface, NETOPT_OTAA, "OTAA ",
|
||||
line_thresh = _netif_list_flag(iface, NETOPT_OTAA, "OTAA ",
|
||||
line_thresh);
|
||||
/* XXX divide options and flags by at least two spaces! */
|
||||
res = netif_get_opt(iface, NETOPT_MAX_PDU_SIZE, 0, &u16, sizeof(u16));
|
||||
if (res > 0) {
|
||||
printf("L2-PDU:%" PRIu16 " ", u16);
|
||||
printf("L2-PDU:%" PRIu16 " ", u16);
|
||||
line_thresh++;
|
||||
}
|
||||
#ifdef MODULE_GNRC_IPV6
|
||||
@ -737,6 +740,7 @@ static void _netif_list(netif_t *iface)
|
||||
#endif
|
||||
#endif
|
||||
res = netif_get_opt(iface, NETOPT_SRC_LEN, 0, &u16, sizeof(u16));
|
||||
/* XXX divide options and flags by at least two spaces before this line! */
|
||||
if (res >= 0) {
|
||||
printf("Source address length: %" PRIu16, u16);
|
||||
line_thresh++;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user