riotctrl_shell.gnrc: provide ipv6_nib interactions and parsers

This commit is contained in:
Martine S. Lenders 2020-07-08 16:47:57 +02:00
parent 4dc82bce70
commit 94c8b19d68
No known key found for this signature in database
GPG Key ID: CCD317364F63286F
2 changed files with 503 additions and 0 deletions

View File

@ -137,6 +137,221 @@ class GNRCICMPv6EchoParser(ShellInteractionParser):
return res return res
class GNRCIPv6NIBNeighShowParser(ShellInteractionParser):
def __init__(self):
self.c_neigh = re.compile(r"(?P<ipv6_addr>[0-9a-f:]+)\s+"
r"dev\s+#(?P<iface>\d+)\s+"
r"lladdr\s+(?P<l2addr>[0-9A-F:]+)?"
r"(\s+(?P<router>router))?"
r"(\s+((?P<nud_state>[A-Z_]+)|-))?"
r"(\s+(?P<ar_state>[A-Z_]+))?$")
def parse(self, cmd_output):
"""
Parses output of GNRCIPv6NIB::nib_neigh_show()
>>> parser = GNRCIPv6NIBNeighShowParser()
>>> res = parser.parse("2001:db8::1 dev #5 lladdr AB:CD:EF:01:23:45 "
... "router REACHABLE REGISTERED\\n"
... "2001:db8::2 dev #5 lladdr -")
>>> len(res)
2
>>> sorted(res[0])
['ar_state', 'iface', 'ipv6_addr', 'l2addr', 'nud_state', 'router']
>>> res[0]["ipv6_addr"]
'2001:db8::1'
>>> res[0]["iface"]
5
>>> res[0]["l2addr"]
'AB:CD:EF:01:23:45'
>>> res[0]["router"]
True
>>> res[0]["nud_state"]
'REACHABLE'
>>> res[0]["ar_state"]
'REGISTERED'
>>> sorted(res[1])
['iface', 'ipv6_addr']
>>> res[1]["ipv6_addr"]
'2001:db8::2'
>>> res[1]["iface"]
5
"""
res = []
for line in cmd_output.splitlines():
m = self.c_neigh.search(line)
if m is not None:
res.append({k: v for k, v in m.groupdict().items()
if v is not None})
res[-1]["iface"] = int(res[-1]["iface"])
if "router" in res[-1]:
res[-1]["router"] = True
return res
class GNRCIPv6NIBPrefixShowParser(ShellInteractionParser):
def __init__(self):
self.c_prefix = re.compile(r"(?P<prefix>[0-9a-f:]+)/"
r"(?P<prefix_len>\d+)\s+"
r"dev\s+#(?P<iface>\d+)"
r"(\s+expires (?P<valid_sec>\d+) sec)?"
r"(\s+deprecates (?P<pref_sec>\d+) sec)?$")
def parse(self, cmd_output):
"""
Parses output of GNRCIPv6NIB::nib_prefix_show()
>>> parser = GNRCIPv6NIBPrefixShowParser()
>>> res = parser.parse("2001:db8::/64 dev #5 expires 4999 sec "
... "deprecates 3999 sec\\n"
... "2001:db8:1::/64 dev #6")
>>> len(res)
2
>>> sorted(res[0])
['iface', 'pref_sec', 'prefix', 'prefix_len', 'valid_sec']
>>> res[0]["prefix"]
'2001:db8::'
>>> res[0]["prefix_len"]
64
>>> sorted(res[1])
['iface', 'prefix', 'prefix_len']
"""
res = []
for line in cmd_output.splitlines():
m = self.c_prefix.search(line)
if m is not None:
pfx = {k: v for k, v in m.groupdict().items() if v is not None}
pfx["prefix_len"] = int(pfx["prefix_len"])
pfx["iface"] = int(pfx["iface"])
if "valid_sec" in pfx:
pfx["valid_sec"] = int(pfx["valid_sec"])
if "pref_sec" in pfx:
pfx["pref_sec"] = int(pfx["pref_sec"])
res.append(pfx)
return res
class GNRCIPv6NIBRouteShowParser(ShellInteractionParser):
def __init__(self):
self.c_route = re.compile(r"(?P<route>default(?P<primary>\*)?|"
r"(?P<prefix>[0-9a-f:]+)/"
r"(?P<prefix_len>\d+))\s+"
r"(via\s+(?P<next_hop>[0-9a-f:]+)\s+)?"
r"dev\s+#(?P<iface>\d+)$")
def parse(self, cmd_output):
"""
Parses output of GNRCIPv6NIB::nib_route_show()
>>> parser = GNRCIPv6NIBRouteShowParser()
>>> res = parser.parse("2001:db8::/64 dev #5\\n"
... "2001:db8:1::/64 via fe80::1 dev #5\\n"
... "default via fe80::2 dev #5\\n"
... "default* via fe80::3 dev #6\\n")
>>> len(res)
4
>>> sorted(res[0])
['iface', 'route']
>>> sorted(res[0]["route"])
['prefix', 'prefix_len']
>>> res[0]["route"]["prefix"]
'2001:db8::'
>>> res[0]["route"]["prefix_len"]
64
>>> res[0]["iface"]
5
>>> sorted(res[1])
['iface', 'next_hop', 'route']
>>> res[1]["route"]["prefix"]
'2001:db8:1::'
>>> res[1]["route"]["prefix_len"]
64
>>> res[1]["next_hop"]
'fe80::1'
>>> res[1]["iface"]
5
>>> sorted(res[2])
['iface', 'next_hop', 'route']
>>> sorted(res[2]["route"])
['default']
>>> res[2]["route"]["default"]
True
>>> res[2]["next_hop"]
'fe80::2'
>>> res[2]["iface"]
5
>>> sorted(res[3])
['iface', 'next_hop', 'route']
>>> sorted(res[3]["route"])
['default', 'primary']
>>> res[3]["route"]["default"]
True
>>> res[3]["route"]["primary"]
True
>>> res[3]["next_hop"]
'fe80::3'
>>> res[3]["iface"]
6
"""
res = []
for line in cmd_output.splitlines():
m = self.c_route.search(line)
if m is not None:
fte = {k: v for k, v in m.groupdict().items() if v is not None}
fte['iface'] = int(fte['iface'])
if "prefix" in fte and fte["prefix"] is not None and \
"prefix_len" in fte and fte["prefix_len"] is not None:
fte["route"] = {"prefix": fte["prefix"],
"prefix_len": int(fte["prefix_len"])}
elif fte["route"].startswith("default"):
fte["route"] = {"default": True}
else:
raise ValueError("Unexpected route value {}".format(fte))
if "primary" in fte and fte["primary"] is not None:
fte["route"]["primary"] = True
if fte.get("next_hop") is None:
fte.pop("next_hop", None)
fte.pop("prefix", None)
fte.pop("prefix_len", None)
fte.pop("primary", None)
res.append(fte)
return res
class GNRCIPv6NIBABRShowParser(ShellInteractionParser):
def __init__(self):
self.c_abr = re.compile(r"(?P<addr>[0-9a-f:]+)\s+"
r"v(?P<version>\d+)\s+expires\s+"
r"(?P<valid_min>\d+)min$")
def parse(self, cmd_output):
"""
Parses output of GNRCIPv6NIB::nib_abr_show()
>>> parser = GNRCIPv6NIBABRShowParser()
>>> res = parser.parse("2001:db8::abcd:ef01 v43 expires 1400min")
>>> len(res)
1
>>> sorted(res[0])
['addr', 'valid_min', 'version']
>>> res[0]["addr"]
'2001:db8::abcd:ef01'
>>> res[0]["version"]
43
>>> res[0]["valid_min"]
1400
"""
res = []
for line in cmd_output.splitlines():
m = self.c_abr.search(line)
if m is not None:
abr = m.groupdict()
abr["version"] = int(abr["version"])
abr["valid_min"] = int(abr["valid_min"])
res.append(abr)
return res
class GNRCPktbufStatsResults(dict): class GNRCPktbufStatsResults(dict):
def is_empty(self): def is_empty(self):
""" """
@ -284,6 +499,102 @@ class GNRCICMPv6Echo(ShellInteraction):
return self.cmd(cmd, timeout=cmd_timeout, async_=async_) return self.cmd(cmd, timeout=cmd_timeout, async_=async_)
class GNRCIPv6NIB(ShellInteraction):
NEIGH = "neigh"
PREFIX = "prefix"
ROUTE = "route"
ABR = "abr"
@ShellInteraction.check_term
def nib_cmd(self, cmd, args=None, timeout=-1, async_=False):
return self.cmd(self._create_cmd(cmd, args),
timeout=timeout, async_=async_)
def nib_neigh_show(self, iface=None, timeout=-1, async_=False):
return self._nib_show(self.NEIGH, iface, timeout, async_)
def nib_neigh_add(self, iface, ipv6_addr, l2addr=None,
timeout=-1, async_=False):
args = [iface, ipv6_addr]
if l2addr:
args.append(l2addr)
return self._nib_add(self.NEIGH, args, timeout, async_)
def nib_neigh_del(self, iface, ipv6_addr, timeout=-1, async_=False):
return self._nib_del(self.NEIGH, iface, ipv6_addr, timeout, async_)
def nib_prefix_show(self, iface=None, timeout=-1, async_=False):
return self._nib_show(self.PREFIX, iface, timeout, async_)
def nib_prefix_add(self, iface, prefix, valid_sec=None, pref_sec=None,
timeout=-1, async_=False):
if valid_sec is None and pref_sec is not None:
raise ValueError("pref_sec provided with no valid_sec")
args = [iface, prefix]
if valid_sec:
args.append(int(valid_sec))
if pref_sec:
args.append(int(pref_sec))
return self._nib_add(self.PREFIX, args, timeout, async_)
def nib_prefix_del(self, iface, prefix, timeout=-1, async_=False):
return self._nib_del(self.PREFIX, iface, prefix, timeout, async_)
def nib_route_show(self, iface=None, timeout=-1, async_=False):
return self._nib_show(self.ROUTE, iface, timeout, async_)
def nib_route_add(self, iface, prefix, next_hop, ltime_sec=None,
timeout=-1, async_=False):
args = [iface, prefix, next_hop]
if ltime_sec:
args.append(int(ltime_sec))
return self._nib_add(self.ROUTE, args, timeout, async_)
def nib_route_del(self, iface, prefix, timeout=-1, async_=False):
return self._nib_del(self.ROUTE, iface, prefix, timeout, async_)
def nib_abr_show(self, timeout=-1, async_=False):
return self._nib_show(self.ABR, timeout=timeout, async_=async_)
def nib_abr_add(self, ipv6_addr, timeout=-1, async_=False):
args = [ipv6_addr]
return self._nib_add(self.ABR, args, timeout, async_)
def nib_abr_del(self, ipv6_addr, timeout=-1, async_=False):
args = ["del", ipv6_addr]
return self._nib_error_cmd(self.ABR, args, timeout, async_)
@staticmethod
def _create_cmd(cmd, args=None):
cmd_str = "nib {cmd}".format(cmd=cmd)
if args is not None:
cmd_str += " {args}".format(args=" ".join(str(a) for a in args))
return cmd_str
@ShellInteraction.check_term
def _nib_error_cmd(self, cmd, args=None, timeout=-1, async_=False):
cmd_str = self._create_cmd(cmd, args)
res = self.cmd(cmd_str, timeout=timeout, async_=async_)
# nib manipulation commands only show command string on success
if res.strip() != cmd_str:
raise RuntimeError(repr(res.strip()) + "!=" + repr(cmd_str))
return res
def _nib_show(self, view, iface=None, timeout=-1, async_=False):
args = ["show"]
if iface:
args.append(iface)
return self.nib_cmd(view, args, timeout, async_)
def _nib_add(self, view, args, timeout=-1, async_=False):
args.insert(0, "add")
return self._nib_error_cmd(view, args, timeout, async_)
def _nib_del(self, view, iface, item, timeout=-1, async_=False):
args = ["del", iface, item]
return self._nib_error_cmd(view, args, timeout, async_)
class GNRCPktbufStats(ShellInteraction): class GNRCPktbufStats(ShellInteraction):
@ShellInteraction.check_term @ShellInteraction.check_term
def pktbuf_stats(self, timeout=-1, async_=False): def pktbuf_stats(self, timeout=-1, async_=False):

View File

@ -4,6 +4,8 @@
# General Public License v2.1. See the file LICENSE in the top level # General Public License v2.1. See the file LICENSE in the top level
# directory for more details. # directory for more details.
import pytest
import riotctrl_shell.gnrc import riotctrl_shell.gnrc
from .common import init_ctrl from .common import init_ctrl
@ -71,6 +73,196 @@ def test_ping6_parser_missing_rtts():
assert len(ping_res["replies"]) == 3 assert len(ping_res["replies"]) == 3
@pytest.mark.parametrize(
"cmd,args,expected",
[("neigh", None, "nib neigh"),
("route", ("show",), "nib route show"),
("snafu", ("foobar", "hello", "world"), "nib snafu foobar hello world")]
)
def test_nib_cmd(cmd, args, expected):
rc = init_ctrl()
si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc)
res = si.nib_cmd(cmd, args)
# mock just returns last input
assert res == expected
@pytest.mark.parametrize(
"method,iface,expected",
[("nib_neigh_show", None, "nib neigh show"),
("nib_neigh_show", 5, "nib neigh show 5"),
("nib_prefix_show", None, "nib prefix show"),
("nib_prefix_show", 5, "nib prefix show 5"),
("nib_route_show", None, "nib route show"),
("nib_route_show", 5, "nib route show 5")]
)
def test_nib_show(method, iface, expected):
rc = init_ctrl()
si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc)
res = getattr(si, method)(iface)
# mock just returns last input
assert res == expected
def test_nib_abr_show():
rc = init_ctrl()
si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc)
res = si.nib_abr_show()
# mock just returns last input
assert res == "nib abr show"
@pytest.mark.parametrize(
"method,iface,arg,expected",
[("nib_neigh_del", 42, "dead:beef::1", "nib neigh del 42 dead:beef::1"),
("nib_prefix_del", 42, "dead:beef::/64",
"nib prefix del 42 dead:beef::/64"),
("nib_route_del", 42, "::", "nib route del 42 ::")]
)
def test_nib_del(method, iface, arg, expected):
rc = init_ctrl()
si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc)
res = getattr(si, method)(iface, arg)
# mock just returns last input
assert res == expected
@pytest.mark.parametrize(
"method,iface,arg",
[("nib_neigh_del", 42, "not an address"),
("nib_prefix_del", 42, "not an address"),
("nib_route_del", 42, "not an address")]
)
def test_nib_del_error(method, iface, arg):
rc = init_ctrl(output="This is representing some really unformatted error")
si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc)
with pytest.raises(RuntimeError):
getattr(si, method)(iface, arg)
def test_nib_abr_del():
rc = init_ctrl()
si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc)
res = si.nib_abr_del("c0ff::ee")
# mock just returns last input
assert res == "nib abr del c0ff::ee"
def test_nib_abr_del_err():
rc = init_ctrl(output="This is representing some really unformatted error")
si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc)
with pytest.raises(RuntimeError):
si.nib_abr_del("c0ff::ee")
@pytest.mark.parametrize(
"l2addr,expected",
[(None, "nib neigh add 12 affe::1"),
("ab:cd:ef", "nib neigh add 12 affe::1 ab:cd:ef")]
)
def test_nib_neigh_add(l2addr, expected):
rc = init_ctrl()
si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc)
res = si.nib_neigh_add(12, "affe::1", l2addr)
assert res == expected
@pytest.mark.parametrize(
"valid_sec,pref_sec,expected",
[(None, None, "nib prefix add 12 dead:c0ff:ee::/45"),
(120, None, "nib prefix add 12 dead:c0ff:ee::/45 120"),
(120, 60, "nib prefix add 12 dead:c0ff:ee::/45 120 60")]
)
def test_nib_prefix_add(valid_sec, pref_sec, expected):
rc = init_ctrl()
si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc)
res = si.nib_prefix_add(12, "dead:c0ff:ee::/45", valid_sec, pref_sec)
assert res == expected
@pytest.mark.parametrize(
"valid_sec,pref_sec",
[(None, None), (120, None), (120, 60)]
)
def test_nib_prefix_add_runtime_error(valid_sec, pref_sec):
rc = init_ctrl(output="This is representing some really unformatted error")
si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc)
with pytest.raises(RuntimeError):
si.nib_prefix_add(12, "dead:c0ff:ee::/45", valid_sec, pref_sec)
def test_nib_prefix_add_value_error():
rc = init_ctrl()
si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc)
with pytest.raises(ValueError):
si.nib_prefix_add(12, "dead:c0ff:ee::/45", pref_sec=60)
@pytest.mark.parametrize(
"route,ltime_sec,expected",
[("default", None, "nib route add 12 default fe80::1"),
("dead:c0ff:ee::/45", None, "nib route add 12 dead:c0ff:ee::/45 fe80::1"),
("default", 60, "nib route add 12 default fe80::1 60"),
("dead:c0ff:ee::/45", 120,
"nib route add 12 dead:c0ff:ee::/45 fe80::1 120")]
)
def test_nib_route_add(route, ltime_sec, expected):
rc = init_ctrl()
si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc)
res = si.nib_route_add(12, route, "fe80::1", ltime_sec)
assert res == expected
def test_nib_abr_add():
rc = init_ctrl()
si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc)
res = si.nib_abr_add("dead:c0ff:ee::1")
assert res == "nib abr add dead:c0ff:ee::1"
def test_nib_abr_add_error():
rc = init_ctrl(output="This is representing some really unformatted error")
si = riotctrl_shell.gnrc.GNRCIPv6NIB(rc)
with pytest.raises(RuntimeError):
si.nib_abr_add("dead:c0ff:ee::1")
@pytest.mark.parametrize(
"parser",
[riotctrl_shell.gnrc.GNRCIPv6NIBNeighShowParser,
riotctrl_shell.gnrc.GNRCIPv6NIBPrefixShowParser,
riotctrl_shell.gnrc.GNRCIPv6NIBRouteShowParser,
riotctrl_shell.gnrc.GNRCIPv6NIBABRShowParser]
)
def test_nib_parsers_empty(parser):
"""
Most NIB parser tests were already done in doctests.
Just ensure with this test, that empty input generates an empty list
"""
res = parser().parse("")
assert res == []
@pytest.mark.parametrize(
"parser",
[riotctrl_shell.gnrc.GNRCIPv6NIBNeighShowParser,
riotctrl_shell.gnrc.GNRCIPv6NIBPrefixShowParser,
riotctrl_shell.gnrc.GNRCIPv6NIBRouteShowParser,
riotctrl_shell.gnrc.GNRCIPv6NIBABRShowParser]
)
def test_nib_parsers_garbage(parser):
"""
Most NIB parser tests were already done in doctests.
Just ensure with this test, that garbage input generates an empty list
"""
res = parser().parse("""
Lorem ipsum dolor sit amet,
consectetur adipiscing elit,
sed do eiusmod tempor incididunt ut labore
et dolore magna aliqua.""")
assert res == []
def test_pktbuf(): def test_pktbuf():
rc = init_ctrl() rc = init_ctrl()
si = riotctrl_shell.gnrc.GNRCPktbufStats(rc) si = riotctrl_shell.gnrc.GNRCPktbufStats(rc)