1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2025-12-25 06:23:53 +01:00

riotctrl_shell: provide pktbuf interaction

This commit is contained in:
Martine S. Lenders 2020-06-24 14:23:33 +02:00
parent 98219e6746
commit d11b5b2b54
No known key found for this signature in database
GPG Key ID: CCD317364F63286F
2 changed files with 143 additions and 0 deletions

View File

@ -74,6 +74,96 @@ class GNRCICMPv6EchoParser(ShellInteractionParser):
return None
class GNRCPktbufStatsResults(dict):
def is_empty(self):
"""
Returns true if the packet buffer stats indicate that the packet buffer
is empty
"""
if "first_byte" not in self or \
"first_unused" not in self or \
"start" not in self["first_unused"] or \
"size" not in self["first_unused"]:
raise ValueError("{} has no items 'first_byte' or 'first_unused' "
"or 'first_unused' has no items 'start' or 'size'"
.format(self))
else:
return (self["first_byte"] == self["first_unused"]["start"]) and \
(self["size"] == self["first_unused"]["size"])
def fullest_capacity(self):
"""
Returns the packet buffer usage at its fullest capacity when the
command was called
"""
if "last_byte_used" not in self or "size" not in self:
raise ValueError("{} has no items 'last_byte_used' or 'size'"
.format(self))
else:
return self["last_byte_used"] / self["size"]
class GNRCPktbufStatsParser(ShellInteractionParser):
@staticmethod
def _init_res(first_byte, last_byte, size):
return GNRCPktbufStatsResults((
("first_byte", int(first_byte, base=16)),
("last_byte", int(last_byte, base=16)),
("size", int(size)),
))
@staticmethod
def _set_last_byte_used(res, last_byte_used):
res["last_byte_used"] = int(last_byte_used)
@staticmethod
def _set_first_unused(res, first_unused):
res["first_unused"] = {
"start": int(first_unused["start"], base=16)
}
if first_unused["next"] is not None:
res["first_unused"]["next"] = int(first_unused["next"], base=16)
if "next" not in res["first_unused"] or \
not res["first_unused"]["next"]:
res["first_unused"]["next"] = None
res["first_unused"]["size"] = int(first_unused["size"])
def parse(self, cmd_output):
c_init1 = re.compile(r"packet buffer: "
r"first byte: 0x(?P<first_byte>[0-9A-Fa-f]+), "
r"last byte: 0x(?P<last_byte>[0-9A-Fa-f]+) "
r"\(size: +(?P<size>\d+)\)")
c_init2 = re.compile(r" position of last byte used: (\d+)")
c_unused = re.compile(r"~ unused: 0x(?P<start>[0-9A-Fa-f]+) "
r"\(next: (0x(?P<next>[0-9A-Fa-f]+)|\(nil\)), "
r"size: +(?P<size>\d+)\) ~")
res = None
for line in cmd_output.splitlines():
if res is None:
m = c_init1.match(line)
if m is not None:
res = self._init_res(**m.groupdict())
# no sense in further parsing if we did not find the first line
# yet. If we found it just continue parsing with next line
continue
elif "last_byte_used" not in res:
m = c_init2.match(line)
if m is not None:
self._set_last_byte_used(res, m.group(1))
continue
elif "first_unused" not in res:
m = c_unused.match(line)
if m is not None:
self._set_first_unused(res, m.groupdict())
if res is not None and "last_byte_used" not in res:
# Could not parse second line of header => something went wrong
return None
else:
# Just return res (might be also None if first line of header was
# not found)
return res
# ==== ShellInteractions ====
class GNRCICMPv6Echo(ShellInteraction):
@ -91,3 +181,9 @@ class GNRCICMPv6Echo(ShellInteraction):
# wait a second longer than all pings
cmd_timeout = ((timeout / 1000) * count) + 1
return self.cmd(cmd, timeout=cmd_timeout, async_=async_)
class GNRCPktbufStats(ShellInteraction):
@ShellInteraction.check_term
def pktbuf_stats(self, timeout=-1, async_=False):
return self.cmd("pktbuf", timeout=timeout, async_=async_)

View File

@ -54,3 +54,50 @@ def test_ping6_parser_missing_rtts():
--- ::1 PING statistics ---
3 packets transmitted, 3 packets received, 0% packet loss""")
assert ping_res is None
def test_pktbuf():
rc = init_ctrl()
si = riotctrl_shell.gnrc.GNRCPktbufStats(rc)
res = si.pktbuf_stats()
# mock just returns last input
assert res == "pktbuf"
def test_pktbuf_parser_success_empty():
parser = riotctrl_shell.gnrc.GNRCPktbufStatsParser()
pktbuf_res = parser.parse("""
packet buffer: first byte: 0x5660dce0, last byte: 0x5660fce0 (size: 8192)
position of last byte used: 1792
~ unused: 0x5660dce0 (next: (nil), size: 8192) ~""")
assert pktbuf_res is not None
assert pktbuf_res["first_byte"] > 0
assert "start" in pktbuf_res["first_unused"]
assert pktbuf_res.is_empty()
def test_pktbuf_parser_success_not_empty():
parser = riotctrl_shell.gnrc.GNRCPktbufStatsParser()
pktbuf_res = parser.parse("""
packet buffer: first byte: 0x5660dce0, last byte: 0x5660fce0 (size: 8192)
position of last byte used: 1792
~ unused: 0x5660de00 (next: (nil), size: 7904) ~""")
assert pktbuf_res is not None
assert pktbuf_res["first_byte"] > 0
assert "start" in pktbuf_res["first_unused"]
assert not pktbuf_res.is_empty()
def test_pktbuf_parser_empty():
parser = riotctrl_shell.gnrc.GNRCPktbufStatsParser()
pktbuf_res = parser.parse("")
assert pktbuf_res is None
def test_pktbuf_parser_2nd_header_not_found():
parser = riotctrl_shell.gnrc.GNRCPktbufStatsParser()
pktbuf_res = parser.parse(
"packet buffer: first byte: 0x5668ace0, last byte: 0x5668cce0 "
"(size: 8192)"
)
assert pktbuf_res is None