diff --git a/dist/pythonlibs/riotctrl_shell/gnrc.py b/dist/pythonlibs/riotctrl_shell/gnrc.py index bf84cc675d..e127e9e190 100644 --- a/dist/pythonlibs/riotctrl_shell/gnrc.py +++ b/dist/pythonlibs/riotctrl_shell/gnrc.py @@ -74,6 +74,96 @@ class GNRCICMPv6EchoParser(ShellInteractionParser): return None +class GNRCPktbufStatsResults(dict): + def is_empty(self): + """ + Returns true if the packet buffer stats indicate that the packet buffer + is empty + """ + if "first_byte" not in self or \ + "first_unused" not in self or \ + "start" not in self["first_unused"] or \ + "size" not in self["first_unused"]: + raise ValueError("{} has no items 'first_byte' or 'first_unused' " + "or 'first_unused' has no items 'start' or 'size'" + .format(self)) + else: + return (self["first_byte"] == self["first_unused"]["start"]) and \ + (self["size"] == self["first_unused"]["size"]) + + def fullest_capacity(self): + """ + Returns the packet buffer usage at its fullest capacity when the + command was called + """ + if "last_byte_used" not in self or "size" not in self: + raise ValueError("{} has no items 'last_byte_used' or 'size'" + .format(self)) + else: + return self["last_byte_used"] / self["size"] + + +class GNRCPktbufStatsParser(ShellInteractionParser): + @staticmethod + def _init_res(first_byte, last_byte, size): + return GNRCPktbufStatsResults(( + ("first_byte", int(first_byte, base=16)), + ("last_byte", int(last_byte, base=16)), + ("size", int(size)), + )) + + @staticmethod + def _set_last_byte_used(res, last_byte_used): + res["last_byte_used"] = int(last_byte_used) + + @staticmethod + def _set_first_unused(res, first_unused): + res["first_unused"] = { + "start": int(first_unused["start"], base=16) + } + if first_unused["next"] is not None: + res["first_unused"]["next"] = int(first_unused["next"], base=16) + if "next" not in res["first_unused"] or \ + not res["first_unused"]["next"]: + res["first_unused"]["next"] = None + res["first_unused"]["size"] = int(first_unused["size"]) + + def parse(self, cmd_output): + c_init1 = re.compile(r"packet buffer: " + r"first byte: 0x(?P[0-9A-Fa-f]+), " + r"last byte: 0x(?P[0-9A-Fa-f]+) " + r"\(size: +(?P\d+)\)") + c_init2 = re.compile(r" position of last byte used: (\d+)") + c_unused = re.compile(r"~ unused: 0x(?P[0-9A-Fa-f]+) " + r"\(next: (0x(?P[0-9A-Fa-f]+)|\(nil\)), " + r"size: +(?P\d+)\) ~") + res = None + for line in cmd_output.splitlines(): + if res is None: + m = c_init1.match(line) + if m is not None: + res = self._init_res(**m.groupdict()) + # no sense in further parsing if we did not find the first line + # yet. If we found it just continue parsing with next line + continue + elif "last_byte_used" not in res: + m = c_init2.match(line) + if m is not None: + self._set_last_byte_used(res, m.group(1)) + continue + elif "first_unused" not in res: + m = c_unused.match(line) + if m is not None: + self._set_first_unused(res, m.groupdict()) + if res is not None and "last_byte_used" not in res: + # Could not parse second line of header => something went wrong + return None + else: + # Just return res (might be also None if first line of header was + # not found) + return res + + # ==== ShellInteractions ==== class GNRCICMPv6Echo(ShellInteraction): @@ -91,3 +181,9 @@ class GNRCICMPv6Echo(ShellInteraction): # wait a second longer than all pings cmd_timeout = ((timeout / 1000) * count) + 1 return self.cmd(cmd, timeout=cmd_timeout, async_=async_) + + +class GNRCPktbufStats(ShellInteraction): + @ShellInteraction.check_term + def pktbuf_stats(self, timeout=-1, async_=False): + return self.cmd("pktbuf", timeout=timeout, async_=async_) diff --git a/dist/pythonlibs/riotctrl_shell/tests/test_gnrc.py b/dist/pythonlibs/riotctrl_shell/tests/test_gnrc.py index 1abdb68c40..624e645bf7 100644 --- a/dist/pythonlibs/riotctrl_shell/tests/test_gnrc.py +++ b/dist/pythonlibs/riotctrl_shell/tests/test_gnrc.py @@ -54,3 +54,50 @@ def test_ping6_parser_missing_rtts(): --- ::1 PING statistics --- 3 packets transmitted, 3 packets received, 0% packet loss""") assert ping_res is None + + +def test_pktbuf(): + rc = init_ctrl() + si = riotctrl_shell.gnrc.GNRCPktbufStats(rc) + res = si.pktbuf_stats() + # mock just returns last input + assert res == "pktbuf" + + +def test_pktbuf_parser_success_empty(): + parser = riotctrl_shell.gnrc.GNRCPktbufStatsParser() + pktbuf_res = parser.parse(""" +packet buffer: first byte: 0x5660dce0, last byte: 0x5660fce0 (size: 8192) + position of last byte used: 1792 +~ unused: 0x5660dce0 (next: (nil), size: 8192) ~""") + assert pktbuf_res is not None + assert pktbuf_res["first_byte"] > 0 + assert "start" in pktbuf_res["first_unused"] + assert pktbuf_res.is_empty() + + +def test_pktbuf_parser_success_not_empty(): + parser = riotctrl_shell.gnrc.GNRCPktbufStatsParser() + pktbuf_res = parser.parse(""" +packet buffer: first byte: 0x5660dce0, last byte: 0x5660fce0 (size: 8192) + position of last byte used: 1792 +~ unused: 0x5660de00 (next: (nil), size: 7904) ~""") + assert pktbuf_res is not None + assert pktbuf_res["first_byte"] > 0 + assert "start" in pktbuf_res["first_unused"] + assert not pktbuf_res.is_empty() + + +def test_pktbuf_parser_empty(): + parser = riotctrl_shell.gnrc.GNRCPktbufStatsParser() + pktbuf_res = parser.parse("") + assert pktbuf_res is None + + +def test_pktbuf_parser_2nd_header_not_found(): + parser = riotctrl_shell.gnrc.GNRCPktbufStatsParser() + pktbuf_res = parser.parse( + "packet buffer: first byte: 0x5668ace0, last byte: 0x5668cce0 " + "(size: 8192)" + ) + assert pktbuf_res is None