diff --git a/dist/pythonlibs/riotctrl_shell/gnrc.py b/dist/pythonlibs/riotctrl_shell/gnrc.py index d0c97dc63c..31f8839b46 100644 --- a/dist/pythonlibs/riotctrl_shell/gnrc.py +++ b/dist/pythonlibs/riotctrl_shell/gnrc.py @@ -137,6 +137,221 @@ class GNRCICMPv6EchoParser(ShellInteractionParser): return res +class GNRCIPv6NIBNeighShowParser(ShellInteractionParser): + def __init__(self): + self.c_neigh = re.compile(r"(?P[0-9a-f:]+)\s+" + r"dev\s+#(?P\d+)\s+" + r"lladdr\s+(?P[0-9A-F:]+)?" + r"(\s+(?Prouter))?" + r"(\s+((?P[A-Z_]+)|-))?" + r"(\s+(?P[A-Z_]+))?$") + + def parse(self, cmd_output): + """ + Parses output of GNRCIPv6NIB::nib_neigh_show() + + >>> parser = GNRCIPv6NIBNeighShowParser() + >>> res = parser.parse("2001:db8::1 dev #5 lladdr AB:CD:EF:01:23:45 " + ... "router REACHABLE REGISTERED\\n" + ... "2001:db8::2 dev #5 lladdr -") + >>> len(res) + 2 + >>> sorted(res[0]) + ['ar_state', 'iface', 'ipv6_addr', 'l2addr', 'nud_state', 'router'] + >>> res[0]["ipv6_addr"] + '2001:db8::1' + >>> res[0]["iface"] + 5 + >>> res[0]["l2addr"] + 'AB:CD:EF:01:23:45' + >>> res[0]["router"] + True + >>> res[0]["nud_state"] + 'REACHABLE' + >>> res[0]["ar_state"] + 'REGISTERED' + >>> sorted(res[1]) + ['iface', 'ipv6_addr'] + >>> res[1]["ipv6_addr"] + '2001:db8::2' + >>> res[1]["iface"] + 5 + """ + res = [] + for line in cmd_output.splitlines(): + m = self.c_neigh.search(line) + if m is not None: + res.append({k: v for k, v in m.groupdict().items() + if v is not None}) + res[-1]["iface"] = int(res[-1]["iface"]) + if "router" in res[-1]: + res[-1]["router"] = True + return res + + +class GNRCIPv6NIBPrefixShowParser(ShellInteractionParser): + def __init__(self): + self.c_prefix = re.compile(r"(?P[0-9a-f:]+)/" + r"(?P\d+)\s+" + r"dev\s+#(?P\d+)" + r"(\s+expires (?P\d+) sec)?" + r"(\s+deprecates (?P\d+) sec)?$") + + def parse(self, cmd_output): + """ + Parses output of GNRCIPv6NIB::nib_prefix_show() + + >>> parser = GNRCIPv6NIBPrefixShowParser() + >>> res = parser.parse("2001:db8::/64 dev #5 expires 4999 sec " + ... "deprecates 3999 sec\\n" + ... "2001:db8:1::/64 dev #6") + >>> len(res) + 2 + >>> sorted(res[0]) + ['iface', 'pref_sec', 'prefix', 'prefix_len', 'valid_sec'] + >>> res[0]["prefix"] + '2001:db8::' + >>> res[0]["prefix_len"] + 64 + >>> sorted(res[1]) + ['iface', 'prefix', 'prefix_len'] + """ + res = [] + for line in cmd_output.splitlines(): + m = self.c_prefix.search(line) + if m is not None: + pfx = {k: v for k, v in m.groupdict().items() if v is not None} + pfx["prefix_len"] = int(pfx["prefix_len"]) + pfx["iface"] = int(pfx["iface"]) + if "valid_sec" in pfx: + pfx["valid_sec"] = int(pfx["valid_sec"]) + if "pref_sec" in pfx: + pfx["pref_sec"] = int(pfx["pref_sec"]) + res.append(pfx) + return res + + +class GNRCIPv6NIBRouteShowParser(ShellInteractionParser): + def __init__(self): + self.c_route = re.compile(r"(?Pdefault(?P\*)?|" + r"(?P[0-9a-f:]+)/" + r"(?P\d+))\s+" + r"(via\s+(?P[0-9a-f:]+)\s+)?" + r"dev\s+#(?P\d+)$") + + def parse(self, cmd_output): + """ + Parses output of GNRCIPv6NIB::nib_route_show() + + >>> parser = GNRCIPv6NIBRouteShowParser() + >>> res = parser.parse("2001:db8::/64 dev #5\\n" + ... "2001:db8:1::/64 via fe80::1 dev #5\\n" + ... "default via fe80::2 dev #5\\n" + ... "default* via fe80::3 dev #6\\n") + >>> len(res) + 4 + >>> sorted(res[0]) + ['iface', 'route'] + >>> sorted(res[0]["route"]) + ['prefix', 'prefix_len'] + >>> res[0]["route"]["prefix"] + '2001:db8::' + >>> res[0]["route"]["prefix_len"] + 64 + >>> res[0]["iface"] + 5 + >>> sorted(res[1]) + ['iface', 'next_hop', 'route'] + >>> res[1]["route"]["prefix"] + '2001:db8:1::' + >>> res[1]["route"]["prefix_len"] + 64 + >>> res[1]["next_hop"] + 'fe80::1' + >>> res[1]["iface"] + 5 + >>> sorted(res[2]) + ['iface', 'next_hop', 'route'] + >>> sorted(res[2]["route"]) + ['default'] + >>> res[2]["route"]["default"] + True + >>> res[2]["next_hop"] + 'fe80::2' + >>> res[2]["iface"] + 5 + >>> sorted(res[3]) + ['iface', 'next_hop', 'route'] + >>> sorted(res[3]["route"]) + ['default', 'primary'] + >>> res[3]["route"]["default"] + True + >>> res[3]["route"]["primary"] + True + >>> res[3]["next_hop"] + 'fe80::3' + >>> res[3]["iface"] + 6 + """ + res = [] + for line in cmd_output.splitlines(): + m = self.c_route.search(line) + if m is not None: + fte = {k: v for k, v in m.groupdict().items() if v is not None} + fte['iface'] = int(fte['iface']) + if "prefix" in fte and fte["prefix"] is not None and \ + "prefix_len" in fte and fte["prefix_len"] is not None: + fte["route"] = {"prefix": fte["prefix"], + "prefix_len": int(fte["prefix_len"])} + elif fte["route"].startswith("default"): + fte["route"] = {"default": True} + else: + raise ValueError("Unexpected route value {}".format(fte)) + if "primary" in fte and fte["primary"] is not None: + fte["route"]["primary"] = True + if fte.get("next_hop") is None: + fte.pop("next_hop", None) + fte.pop("prefix", None) + fte.pop("prefix_len", None) + fte.pop("primary", None) + res.append(fte) + return res + + +class GNRCIPv6NIBABRShowParser(ShellInteractionParser): + def __init__(self): + self.c_abr = re.compile(r"(?P[0-9a-f:]+)\s+" + r"v(?P\d+)\s+expires\s+" + r"(?P\d+)min$") + + def parse(self, cmd_output): + """ + Parses output of GNRCIPv6NIB::nib_abr_show() + + >>> parser = GNRCIPv6NIBABRShowParser() + >>> res = parser.parse("2001:db8::abcd:ef01 v43 expires 1400min") + >>> len(res) + 1 + >>> sorted(res[0]) + ['addr', 'valid_min', 'version'] + >>> res[0]["addr"] + '2001:db8::abcd:ef01' + >>> res[0]["version"] + 43 + >>> res[0]["valid_min"] + 1400 + """ + res = [] + for line in cmd_output.splitlines(): + m = self.c_abr.search(line) + if m is not None: + abr = m.groupdict() + abr["version"] = int(abr["version"]) + abr["valid_min"] = int(abr["valid_min"]) + res.append(abr) + return res + + class GNRCPktbufStatsResults(dict): def is_empty(self): """ @@ -284,6 +499,102 @@ class GNRCICMPv6Echo(ShellInteraction): return self.cmd(cmd, timeout=cmd_timeout, async_=async_) +class GNRCIPv6NIB(ShellInteraction): + NEIGH = "neigh" + PREFIX = "prefix" + ROUTE = "route" + ABR = "abr" + + @ShellInteraction.check_term + def nib_cmd(self, cmd, args=None, timeout=-1, async_=False): + return self.cmd(self._create_cmd(cmd, args), + timeout=timeout, async_=async_) + + def nib_neigh_show(self, iface=None, timeout=-1, async_=False): + return self._nib_show(self.NEIGH, iface, timeout, async_) + + def nib_neigh_add(self, iface, ipv6_addr, l2addr=None, + timeout=-1, async_=False): + args = [iface, ipv6_addr] + if l2addr: + args.append(l2addr) + return self._nib_add(self.NEIGH, args, timeout, async_) + + def nib_neigh_del(self, iface, ipv6_addr, timeout=-1, async_=False): + return self._nib_del(self.NEIGH, iface, ipv6_addr, timeout, async_) + + def nib_prefix_show(self, iface=None, timeout=-1, async_=False): + return self._nib_show(self.PREFIX, iface, timeout, async_) + + def nib_prefix_add(self, iface, prefix, valid_sec=None, pref_sec=None, + timeout=-1, async_=False): + if valid_sec is None and pref_sec is not None: + raise ValueError("pref_sec provided with no valid_sec") + args = [iface, prefix] + if valid_sec: + args.append(int(valid_sec)) + if pref_sec: + args.append(int(pref_sec)) + return self._nib_add(self.PREFIX, args, timeout, async_) + + def nib_prefix_del(self, iface, prefix, timeout=-1, async_=False): + return self._nib_del(self.PREFIX, iface, prefix, timeout, async_) + + def nib_route_show(self, iface=None, timeout=-1, async_=False): + return self._nib_show(self.ROUTE, iface, timeout, async_) + + def nib_route_add(self, iface, prefix, next_hop, ltime_sec=None, + timeout=-1, async_=False): + args = [iface, prefix, next_hop] + if ltime_sec: + args.append(int(ltime_sec)) + return self._nib_add(self.ROUTE, args, timeout, async_) + + def nib_route_del(self, iface, prefix, timeout=-1, async_=False): + return self._nib_del(self.ROUTE, iface, prefix, timeout, async_) + + def nib_abr_show(self, timeout=-1, async_=False): + return self._nib_show(self.ABR, timeout=timeout, async_=async_) + + def nib_abr_add(self, ipv6_addr, timeout=-1, async_=False): + args = [ipv6_addr] + return self._nib_add(self.ABR, args, timeout, async_) + + def nib_abr_del(self, ipv6_addr, timeout=-1, async_=False): + args = ["del", ipv6_addr] + return self._nib_error_cmd(self.ABR, args, timeout, async_) + + @staticmethod + def _create_cmd(cmd, args=None): + cmd_str = "nib {cmd}".format(cmd=cmd) + if args is not None: + cmd_str += " {args}".format(args=" ".join(str(a) for a in args)) + return cmd_str + + @ShellInteraction.check_term + def _nib_error_cmd(self, cmd, args=None, timeout=-1, async_=False): + cmd_str = self._create_cmd(cmd, args) + res = self.cmd(cmd_str, timeout=timeout, async_=async_) + # nib manipulation commands only show command string on success + if res.strip() != cmd_str: + raise RuntimeError(repr(res.strip()) + "!=" + repr(cmd_str)) + return res + + def _nib_show(self, view, iface=None, timeout=-1, async_=False): + args = ["show"] + if iface: + args.append(iface) + return self.nib_cmd(view, args, timeout, async_) + + def _nib_add(self, view, args, timeout=-1, async_=False): + args.insert(0, "add") + return self._nib_error_cmd(view, args, timeout, async_) + + def _nib_del(self, view, iface, item, timeout=-1, async_=False): + args = ["del", iface, item] + return self._nib_error_cmd(view, args, timeout, async_) + + class GNRCPktbufStats(ShellInteraction): @ShellInteraction.check_term def pktbuf_stats(self, timeout=-1, async_=False): diff --git a/dist/pythonlibs/riotctrl_shell/tests/test_gnrc.py b/dist/pythonlibs/riotctrl_shell/tests/test_gnrc.py index 87b095ae27..ce3501944e 100644 --- a/dist/pythonlibs/riotctrl_shell/tests/test_gnrc.py +++ b/dist/pythonlibs/riotctrl_shell/tests/test_gnrc.py @@ -4,6 +4,8 @@ # General Public License v2.1. See the file LICENSE in the top level # directory for more details. +import pytest + import riotctrl_shell.gnrc from .common import init_ctrl @@ -71,6 +73,196 @@ def test_ping6_parser_missing_rtts(): assert len(ping_res["replies"]) == 3 +@pytest.mark.parametrize( + "cmd,args,expected", + [("neigh", None, "nib neigh"), + ("route", ("show",), "nib route show"), + ("snafu", ("foobar", "hello", "world"), "nib snafu foobar hello world")] +) +def test_nib_cmd(cmd, args, expected): + rc = init_ctrl() + si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc) + res = si.nib_cmd(cmd, args) + # mock just returns last input + assert res == expected + + +@pytest.mark.parametrize( + "method,iface,expected", + [("nib_neigh_show", None, "nib neigh show"), + ("nib_neigh_show", 5, "nib neigh show 5"), + ("nib_prefix_show", None, "nib prefix show"), + ("nib_prefix_show", 5, "nib prefix show 5"), + ("nib_route_show", None, "nib route show"), + ("nib_route_show", 5, "nib route show 5")] +) +def test_nib_show(method, iface, expected): + rc = init_ctrl() + si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc) + res = getattr(si, method)(iface) + # mock just returns last input + assert res == expected + + +def test_nib_abr_show(): + rc = init_ctrl() + si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc) + res = si.nib_abr_show() + # mock just returns last input + assert res == "nib abr show" + + +@pytest.mark.parametrize( + "method,iface,arg,expected", + [("nib_neigh_del", 42, "dead:beef::1", "nib neigh del 42 dead:beef::1"), + ("nib_prefix_del", 42, "dead:beef::/64", + "nib prefix del 42 dead:beef::/64"), + ("nib_route_del", 42, "::", "nib route del 42 ::")] +) +def test_nib_del(method, iface, arg, expected): + rc = init_ctrl() + si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc) + res = getattr(si, method)(iface, arg) + # mock just returns last input + assert res == expected + + +@pytest.mark.parametrize( + "method,iface,arg", + [("nib_neigh_del", 42, "not an address"), + ("nib_prefix_del", 42, "not an address"), + ("nib_route_del", 42, "not an address")] +) +def test_nib_del_error(method, iface, arg): + rc = init_ctrl(output="This is representing some really unformatted error") + si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc) + with pytest.raises(RuntimeError): + getattr(si, method)(iface, arg) + + +def test_nib_abr_del(): + rc = init_ctrl() + si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc) + res = si.nib_abr_del("c0ff::ee") + # mock just returns last input + assert res == "nib abr del c0ff::ee" + + +def test_nib_abr_del_err(): + rc = init_ctrl(output="This is representing some really unformatted error") + si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc) + with pytest.raises(RuntimeError): + si.nib_abr_del("c0ff::ee") + + +@pytest.mark.parametrize( + "l2addr,expected", + [(None, "nib neigh add 12 affe::1"), + ("ab:cd:ef", "nib neigh add 12 affe::1 ab:cd:ef")] +) +def test_nib_neigh_add(l2addr, expected): + rc = init_ctrl() + si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc) + res = si.nib_neigh_add(12, "affe::1", l2addr) + assert res == expected + + +@pytest.mark.parametrize( + "valid_sec,pref_sec,expected", + [(None, None, "nib prefix add 12 dead:c0ff:ee::/45"), + (120, None, "nib prefix add 12 dead:c0ff:ee::/45 120"), + (120, 60, "nib prefix add 12 dead:c0ff:ee::/45 120 60")] +) +def test_nib_prefix_add(valid_sec, pref_sec, expected): + rc = init_ctrl() + si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc) + res = si.nib_prefix_add(12, "dead:c0ff:ee::/45", valid_sec, pref_sec) + assert res == expected + + +@pytest.mark.parametrize( + "valid_sec,pref_sec", + [(None, None), (120, None), (120, 60)] +) +def test_nib_prefix_add_runtime_error(valid_sec, pref_sec): + rc = init_ctrl(output="This is representing some really unformatted error") + si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc) + with pytest.raises(RuntimeError): + si.nib_prefix_add(12, "dead:c0ff:ee::/45", valid_sec, pref_sec) + + +def test_nib_prefix_add_value_error(): + rc = init_ctrl() + si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc) + with pytest.raises(ValueError): + si.nib_prefix_add(12, "dead:c0ff:ee::/45", pref_sec=60) + + +@pytest.mark.parametrize( + "route,ltime_sec,expected", + [("default", None, "nib route add 12 default fe80::1"), + ("dead:c0ff:ee::/45", None, "nib route add 12 dead:c0ff:ee::/45 fe80::1"), + ("default", 60, "nib route add 12 default fe80::1 60"), + ("dead:c0ff:ee::/45", 120, + "nib route add 12 dead:c0ff:ee::/45 fe80::1 120")] +) +def test_nib_route_add(route, ltime_sec, expected): + rc = init_ctrl() + si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc) + res = si.nib_route_add(12, route, "fe80::1", ltime_sec) + assert res == expected + + +def test_nib_abr_add(): + rc = init_ctrl() + si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc) + res = si.nib_abr_add("dead:c0ff:ee::1") + assert res == "nib abr add dead:c0ff:ee::1" + + +def test_nib_abr_add_error(): + rc = init_ctrl(output="This is representing some really unformatted error") + si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc) + with pytest.raises(RuntimeError): + si.nib_abr_add("dead:c0ff:ee::1") + + +@pytest.mark.parametrize( + "parser", + [riotctrl_shell.gnrc.GNRCIPv6NIBNeighShowParser, + riotctrl_shell.gnrc.GNRCIPv6NIBPrefixShowParser, + riotctrl_shell.gnrc.GNRCIPv6NIBRouteShowParser, + riotctrl_shell.gnrc.GNRCIPv6NIBABRShowParser] +) +def test_nib_parsers_empty(parser): + """ + Most NIB parser tests were already done in doctests. + Just ensure with this test, that empty input generates an empty list + """ + res = parser().parse("") + assert res == [] + + +@pytest.mark.parametrize( + "parser", + [riotctrl_shell.gnrc.GNRCIPv6NIBNeighShowParser, + riotctrl_shell.gnrc.GNRCIPv6NIBPrefixShowParser, + riotctrl_shell.gnrc.GNRCIPv6NIBRouteShowParser, + riotctrl_shell.gnrc.GNRCIPv6NIBABRShowParser] +) +def test_nib_parsers_garbage(parser): + """ + Most NIB parser tests were already done in doctests. + Just ensure with this test, that garbage input generates an empty list + """ + res = parser().parse(""" + Lorem ipsum dolor sit amet, + consectetur adipiscing elit, + sed do eiusmod tempor incididunt ut labore + et dolore magna aliqua.""") + assert res == [] + + def test_pktbuf(): rc = init_ctrl() si = riotctrl_shell.gnrc.GNRCPktbufStats(rc)