dist/tools: provide tool to parse pktbuf output
This commit is contained in:
parent
d5a5b18da3
commit
697f16eab7
18
dist/tools/pktbuf-stats/README.md
vendored
Normal file
18
dist/tools/pktbuf-stats/README.md
vendored
Normal file
@ -0,0 +1,18 @@
|
||||
`pktbuf` parser
|
||||
===============
|
||||
|
||||
This parses the output of the command `pktbuf` provided by the module
|
||||
`gnrc_pktbuf_cmd` for the `gnrc_pktbuf_static` implementation.
|
||||
|
||||
The command expects the ELF file of the binary the `pktbuf` command was executed
|
||||
in to get some binary information on structs potentially stored in the packet
|
||||
buffer.
|
||||
|
||||
A packet buffer dump can also be provided as a file. If not provided, it is read
|
||||
from STDIN.
|
||||
|
||||
```sh
|
||||
./pktbuf-stats.py <ELF file> [<pktbuf-dump>]
|
||||
```
|
||||
|
||||
Requires GDB to be installed
|
||||
739
dist/tools/pktbuf-stats/pktbuf-stats.py
vendored
Executable file
739
dist/tools/pktbuf-stats/pktbuf-stats.py
vendored
Executable file
@ -0,0 +1,739 @@
|
||||
#! /usr/bin/env python3
|
||||
#
|
||||
# Copyright (C) 2020 Freie Universität Berlin
|
||||
#
|
||||
# This file is subject to the terms and conditions of the GNU Lesser
|
||||
# General Public License v2.1. See the file LICENSE in the top level
|
||||
# directory for more details.
|
||||
#
|
||||
# @author Martine Lenders <m.lenders@fu-berlin.de>
|
||||
|
||||
"""
|
||||
Script to parse the output of the `pktbuf` (provided by the `gnrc_pktbuf_cmd`
|
||||
pseudo-module) command and the `gnrc_pktbuf_stats()` function.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import collections
|
||||
import pprint
|
||||
import re
|
||||
import subprocess
|
||||
import os
|
||||
import socket
|
||||
import struct
|
||||
import sys
|
||||
|
||||
NETTYPES = {}
|
||||
PKTSNIP_STRUCT = {
|
||||
"name": "gnrc_pktsnip_t",
|
||||
"endianness": "<"
|
||||
}
|
||||
NETTYPE_STRUCTS = {
|
||||
"GNRC_NETTYPE_NETIF": {
|
||||
"name": "gnrc_netif_hdr_t",
|
||||
"endianness": "<",
|
||||
"subparser": 'gnrc_netif_parser'
|
||||
},
|
||||
"GNRC_NETTYPE_IPV6": {
|
||||
"name": "ipv6_hdr_t",
|
||||
"endianness": "!",
|
||||
"subparser": "ipv6_hdr_parser"
|
||||
},
|
||||
"GNRC_NETTYPE_IPV6_EXT": {
|
||||
"name": "ipv6_ext_t",
|
||||
"endianness": "!",
|
||||
},
|
||||
"GNRC_NETTYPE_ICMPV6": {
|
||||
"name": "icmpv6_hdr_t",
|
||||
"endianness": "!",
|
||||
},
|
||||
"GNRC_NETTYPE_UDP": {
|
||||
"name": "udp_hdr_t",
|
||||
"endianness": "!",
|
||||
},
|
||||
}
|
||||
S = {
|
||||
1: "B",
|
||||
2: "H",
|
||||
4: "L",
|
||||
8: "Q",
|
||||
}
|
||||
PROTNUMS = {n: name[8:] for name, n in vars(socket).items()
|
||||
if name.startswith("IPPROTO")}
|
||||
|
||||
|
||||
class NoDebugSymbolsError(Exception):
|
||||
"""
|
||||
Error to convey that no debugging symbols were found in the provided ELF
|
||||
file
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class LineNotFound(Exception):
|
||||
"""
|
||||
Error to convey that GDB output was not found
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class InconsistentPktbuf(Exception):
|
||||
"""
|
||||
Error to indicate inconsistencies in the packet buffer stats
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
def _exec_gdb(elffile, commands):
|
||||
ex = ["gdb", elffile]
|
||||
for c in commands:
|
||||
ex.extend(["-ex", c])
|
||||
ex.extend(["-ex", "quit"])
|
||||
p = subprocess.Popen(ex, stderr=subprocess.DEVNULL, stdout=subprocess.PIPE,
|
||||
env={"CLANG": "en_US"})
|
||||
return p.stdout
|
||||
|
||||
|
||||
def _check_debug_symbols(line):
|
||||
if re.search(r"No debugging symbols found in ", line):
|
||||
raise NoDebugSymbolsError(line.strip().strip("()"))
|
||||
|
||||
|
||||
def _parse_struct_str(members_str, members=None, level=0):
|
||||
in_member_name = re.search(r"^\s+\w", members_str) is not None
|
||||
for char in members_str:
|
||||
if char == '{':
|
||||
level += 1
|
||||
if level == 1:
|
||||
in_member_name = True
|
||||
elif char == '}':
|
||||
level -= 1
|
||||
elif level == 1:
|
||||
if in_member_name and re.match(r"\w", char):
|
||||
if members is None:
|
||||
members = [char]
|
||||
else:
|
||||
members[-1] += char
|
||||
elif char == ',':
|
||||
members.append('')
|
||||
in_member_name = True
|
||||
elif re.match(r"\s", char) is None:
|
||||
in_member_name = False
|
||||
return members, level
|
||||
|
||||
|
||||
def _parse_struct_print(line):
|
||||
m = re.match(r"^\$1 = (.*)?$", line)
|
||||
if m is not None:
|
||||
return _parse_struct_str(m.group(1))
|
||||
return None, 0
|
||||
|
||||
|
||||
def get_struct_size(elffile, struct):
|
||||
"""
|
||||
Determines the size in bytes of a struct from a given ELF file.
|
||||
|
||||
Parameters:
|
||||
elffile (string-like): Path to an ELF file.
|
||||
struct (string-like): Type name of a struct.
|
||||
|
||||
Returns:
|
||||
int: Size of the struct in bytes.
|
||||
|
||||
Raises:
|
||||
LineNotFound: When the line with the size of the struct can not be
|
||||
found in the GDB output.
|
||||
"""
|
||||
for line in _exec_gdb(elffile,
|
||||
["print sizeof(*(({}*)_pktbuf))"
|
||||
.format(struct)]).readlines():
|
||||
line = line.decode()
|
||||
_check_debug_symbols(line)
|
||||
m = re.match(r"^\$1 = (\d+)$", line)
|
||||
if m is not None:
|
||||
return int(m.group(1))
|
||||
raise LineNotFound("Unable to find line of size of {}"
|
||||
.format(struct))
|
||||
|
||||
|
||||
def get_struct_members(elffile, struct):
|
||||
"""
|
||||
Determines the members of a struct from a given ELF file.
|
||||
|
||||
Parameters:
|
||||
elffile (string-like): Path to an ELF file.
|
||||
struct (string-like): Type name of a struct.
|
||||
|
||||
Returns:
|
||||
list: Names of the members of the struct as list of strings.
|
||||
|
||||
Raises:
|
||||
LineNotFound: When the output with the members of the struct can not be
|
||||
found in the GDB output.
|
||||
"""
|
||||
level = 0
|
||||
struct_members = None
|
||||
for line in _exec_gdb(elffile,
|
||||
["print/d *({}*)_pktbuf"
|
||||
.format(struct)]).readlines():
|
||||
assert (level > 0) or (struct_members is None)
|
||||
line = line.decode()
|
||||
_check_debug_symbols(line)
|
||||
if level > 0:
|
||||
struct_members, level = _parse_struct_str(
|
||||
line, struct_members, level
|
||||
)
|
||||
else:
|
||||
struct_members, level = _parse_struct_print(line)
|
||||
if struct_members is not None and (level == 0):
|
||||
return struct_members
|
||||
raise LineNotFound("Unable to find line of {} members".format(struct))
|
||||
|
||||
|
||||
def get_struct_member_size(elffile, struct, member):
|
||||
"""
|
||||
Determines the size in bytes of a member of a struct from a given ELF file.
|
||||
|
||||
Parameters:
|
||||
elffile (string-like): Path to an ELF file.
|
||||
struct (string-like): Type name of a struct.
|
||||
member (string-like): Name of a member of the given struct.
|
||||
|
||||
Returns:
|
||||
int: Size of the member of the struct in bytes.
|
||||
|
||||
Raises:
|
||||
LineNotFound: When the line with the size of the member of the struct
|
||||
can not be found in the GDB output.
|
||||
"""
|
||||
for line in _exec_gdb(elffile,
|
||||
["print sizeof((({}*)_pktbuf)->{})"
|
||||
.format(struct, member)]).readlines():
|
||||
line = line.decode()
|
||||
_check_debug_symbols(line)
|
||||
m = re.match(r"^\$1 = (\d+)$", line)
|
||||
if m is not None:
|
||||
return int(m.group(1))
|
||||
raise LineNotFound("Unable to find line of size of {} in {}"
|
||||
.format(member, struct))
|
||||
|
||||
|
||||
def get_struct(elffile, struct_name):
|
||||
"""
|
||||
Gets struct definition dictionary of a struct from a given ELF file.
|
||||
|
||||
Parameters:
|
||||
elffile (string-like): Path to an ELF file.
|
||||
struct_name (string-like): Type name of a struct.
|
||||
|
||||
Returns:
|
||||
dict: Dictionary containing
|
||||
- "name": the type name of the struct as a string.
|
||||
- "endianness": Endianness of the struct in `struct` notation as
|
||||
string.
|
||||
- "subparser" (optional): Name of a subparser function as string,
|
||||
to parse the byte string to human readable output.
|
||||
- "members": OrderedDict of the members of the struct with the name
|
||||
of the member as key and the size of the member in bytes as
|
||||
value. The order of the keys is in accordance with the member in
|
||||
memory.
|
||||
- "size": Total size of the struct (including padding) in bytes.
|
||||
|
||||
Raises:
|
||||
LineNotFound: When lines regarding the struct can not be found in the
|
||||
GDB output
|
||||
NotImplementedError: If no base information for the struct can be
|
||||
found. Extend NETTYPE_STRUCTS in this case accordingly.
|
||||
"""
|
||||
global PKTSNIP_STRUCT, NETTYPE_STRUCTS
|
||||
d = None
|
||||
if struct_name == PKTSNIP_STRUCT["name"]:
|
||||
d = PKTSNIP_STRUCT
|
||||
else:
|
||||
for v in NETTYPE_STRUCTS.values():
|
||||
if struct_name == v["name"]:
|
||||
d = v
|
||||
if d is None:
|
||||
raise NotImplementedError("Parsing of struct {} not implemented"
|
||||
.format(struct_name))
|
||||
if "members" not in d:
|
||||
d["members"] = collections.OrderedDict([
|
||||
(m, get_struct_member_size(elffile, struct_name, m))
|
||||
for m in get_struct_members(elffile, struct_name)
|
||||
])
|
||||
d["size"] = get_struct_size(elffile, struct_name)
|
||||
return d
|
||||
|
||||
|
||||
def to_nettype(elffile, number):
|
||||
"""
|
||||
Gets a gnrc_nettype name by its number.
|
||||
|
||||
Parameters:
|
||||
elffile (string-like): Path to an ELF file.
|
||||
number (number-like): Integer representation of a gnrc_nettype.
|
||||
|
||||
Returns:
|
||||
str: Name of the gnrc_nettype associated with the given number.
|
||||
|
||||
Raises:
|
||||
LineNotFound: If the nettype can not be found in the GDB output.
|
||||
"""
|
||||
if number in NETTYPES:
|
||||
return NETTYPES[number]
|
||||
for line in _exec_gdb(elffile,
|
||||
["print (gnrc_nettype_t){:d}"
|
||||
.format(number)]).readlines():
|
||||
line = line.decode()
|
||||
_check_debug_symbols(line)
|
||||
m = re.match(r"^\$1 = ([0-9A-Z_]+)$", line)
|
||||
if m is not None:
|
||||
NETTYPES[number] = m.group(1)
|
||||
return m.group(1)
|
||||
raise LineNotFound("Unable to find line of nettype")
|
||||
|
||||
|
||||
def parse_struct(struct_dict, byts):
|
||||
"""
|
||||
Parses a struct from a given bytes chunk
|
||||
|
||||
Parameters:
|
||||
struct_dict (dict): a struct descriptor dictionary as returned by
|
||||
get_struct()
|
||||
byts (dict): bytes to parse as a chunk descriptor dictionary as
|
||||
provided in the result of parse_hexdump()
|
||||
|
||||
Returns:
|
||||
dict: dictionary containing the name of the struct members as keys
|
||||
and their value as value. The "padding" key contains a byte string of
|
||||
the data padding the struct.
|
||||
"""
|
||||
if struct_dict["size"] > len(byts["data"]):
|
||||
return None
|
||||
size = sum(s for s in struct_dict["members"].values())
|
||||
vals = struct.unpack(
|
||||
struct_dict["endianness"] +
|
||||
"".join(S.get(size, '{}s'.format(size))
|
||||
for size in struct_dict["members"].values()),
|
||||
byts["data"][:size]
|
||||
)
|
||||
res = {k: vals[i] for i, k in enumerate(struct_dict["members"])}
|
||||
res["padding"] = byts["data"][size:byts["size"]]
|
||||
return res
|
||||
|
||||
|
||||
def parse_hexdump(dump):
|
||||
"""
|
||||
Generator to parse the packet buffer dump into a processable dictionary
|
||||
|
||||
Parameters:
|
||||
dump (string): The output of the `gnrcp_pktbuf_stats()` function / the
|
||||
`pktbuf` command.
|
||||
Outputs of multiples invocation are possible to be parsed. They each
|
||||
will be outputted one by one when iterating over the generator.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary describing the state of the packet buffer:
|
||||
- "line" (int): The summary line of the packet buffer.
|
||||
- "first_byte" (int): address of the first byte of the packet
|
||||
buffer.
|
||||
- "last_byte" (int): address of the last byte of the packet buffer
|
||||
- "size" (int): size in bytes of the packet buffer.
|
||||
- "last_byte_used" (int): the maximum number of byte used by the
|
||||
packet buffer at that point in time.
|
||||
- "segments" (list): list of segment dictionaries marked in the
|
||||
packet buffer. There are two types of segments "unused" and
|
||||
"chunk". "unused" segments are not in use and have the following
|
||||
members:
|
||||
- "type" (str): Marking it as "unused" segment.
|
||||
- "name" (str): Human-readable name for the unused segment.
|
||||
- "start" (int): Start address of the segment.
|
||||
- "size" (int): Size in bytes of the segment.
|
||||
- "next" (int, NoneType): Start address of the next "unused"
|
||||
segment in the packet buffer. `None` if there is no next
|
||||
"unused" segment.
|
||||
"chunk" segments are in use and have the following members:
|
||||
- "type" (str): Marking it as "chunk" segment.
|
||||
- "name" (str): Human-readable name for the chunk.
|
||||
- "start" (int): Start address of the segment.
|
||||
- "size" (int): Size in bytes of the segment.
|
||||
- "content" (list): List of dictionaries of the content of the
|
||||
chunk. May contain raw data ("type": "raw") that can be
|
||||
parsed with parse_struct().
|
||||
"""
|
||||
pktbuf = {"segments": []}
|
||||
number_of_bytes = 0
|
||||
current_bytes = None
|
||||
chunk_bytes = 0
|
||||
for line in dump.readlines():
|
||||
if "size" not in pktbuf:
|
||||
m = re.search(r"packet buffer: first byte: 0x([0-9A-Fa-f]+), "
|
||||
r"last byte: 0x([0-9A-Fa-f]+) \(size: +(\d+)\)",
|
||||
line)
|
||||
if m is not None:
|
||||
pktbuf["line"] = line.strip()
|
||||
pktbuf["first_byte"] = int(m.group(1), base=16)
|
||||
pktbuf["last_byte"] = int(m.group(2), base=16)
|
||||
pktbuf["size"] = int(m.group(3))
|
||||
else:
|
||||
if number_of_bytes >= pktbuf["size"]:
|
||||
res = pktbuf
|
||||
pktbuf = {"segments": []}
|
||||
number_of_bytes = 0
|
||||
yield res
|
||||
m = re.search(r" position of last byte used: (\d+)", line)
|
||||
if m is not None:
|
||||
pktbuf["last_byte_used"] = int(m.group(1))
|
||||
continue
|
||||
m = re.search(r"~ unused: 0x([0-9A-Fa-f]+) "
|
||||
r"\(next: (0x([0-9A-Fa-f]+)|\(nil\)), "
|
||||
r"size: +(\d+)\) ~", line)
|
||||
if m is not None:
|
||||
nxt = m.group(3)
|
||||
size = int(m.group(4))
|
||||
start = int(m.group(1), base=16)
|
||||
pktbuf["segments"].append({
|
||||
"type": "unused",
|
||||
"name": "unused 0x{:x}".format(start),
|
||||
"start": start,
|
||||
"size": size,
|
||||
"next": int(nxt, base=16) if nxt is not None else None,
|
||||
})
|
||||
number_of_bytes += size
|
||||
continue
|
||||
if current_bytes is None:
|
||||
m = re.search(r"=+ chunk +(\d+) "
|
||||
r"\(0x([0-9A-Fa-f]+) size: +(\d+)\) =+", line)
|
||||
if m is not None:
|
||||
current_bytes = bytearray(b"")
|
||||
chunk_bytes = int(m.group(3))
|
||||
start = int(m.group(2), base=16)
|
||||
pktbuf["segments"].append({
|
||||
"type": "chunk",
|
||||
"name": "chunk {}".format(m.group(1)),
|
||||
"start": start,
|
||||
"size": chunk_bytes,
|
||||
"content": [{"data": current_bytes, "type": "raw",
|
||||
"size": chunk_bytes, "start": start}],
|
||||
})
|
||||
number_of_bytes += chunk_bytes
|
||||
continue
|
||||
if (len(pktbuf["segments"])) and \
|
||||
(pktbuf["segments"][-1]["type"] == "chunk"):
|
||||
m = re.search(r"[0-9A-Fa-f]{8}(.*)$", line)
|
||||
if m is not None:
|
||||
byts = bytes.fromhex(
|
||||
"".join(re.findall(" ([0-9A-Fa-f]{2})", m.group(1)))
|
||||
)
|
||||
current_bytes.extend(byts)
|
||||
chunk_bytes -= len(byts)
|
||||
if not chunk_bytes:
|
||||
current_bytes = None
|
||||
continue
|
||||
if number_of_bytes:
|
||||
yield pktbuf
|
||||
|
||||
|
||||
def empty_pktbuf(pktbuf):
|
||||
"""
|
||||
Checks if the packet buffer is empty.
|
||||
|
||||
Parameters:
|
||||
pktbuf (dict): packet buffer descriptor as returned by parse_hexdump().
|
||||
|
||||
Returns:
|
||||
True, if packet buffer is empty.
|
||||
False, if packet buffer is not empty.
|
||||
"""
|
||||
return (len(pktbuf["segments"]) == 1) and \
|
||||
(pktbuf["first_byte"] == pktbuf["segments"][0]["start"]) and \
|
||||
((pktbuf["last_byte"] - pktbuf["first_byte"]) ==
|
||||
pktbuf["segments"][0]["size"])
|
||||
|
||||
|
||||
def in_pktbuf(pktbuf, addr):
|
||||
"""
|
||||
Checks if a given address is in the packet buffer.
|
||||
|
||||
Parameters:
|
||||
pktbuf (dict): packet buffer descriptor as returned by parse_hexdump().
|
||||
addr (int): Address to check.
|
||||
|
||||
Returns:
|
||||
True, if the address is in the packet buffer.
|
||||
False, if the address is not in the packet buffer.
|
||||
"""
|
||||
return addr is not None and \
|
||||
(pktbuf["first_byte"] <= addr <
|
||||
(pktbuf["first_byte"] + pktbuf["size"]))
|
||||
|
||||
|
||||
def in_segment(segment, addr):
|
||||
"""
|
||||
Checks if a given address is in the packet buffer segment.
|
||||
|
||||
Parameters:
|
||||
segment (dict): packet buffer segment descriptor as they exist in the
|
||||
result of parse_hexdump().
|
||||
addr (int): Address to check.
|
||||
|
||||
Returns:
|
||||
True, if the address is in the packet buffer segment.
|
||||
False, if the address is not in the packet buffer segment.
|
||||
"""
|
||||
return addr is not None and \
|
||||
(segment["start"] <= addr < (segment["start"] + segment["size"]))
|
||||
|
||||
|
||||
def _is_file(f):
|
||||
if not os.path.exists(f):
|
||||
raise ValueError("File {} does not exist".format(f))
|
||||
return f
|
||||
|
||||
|
||||
def _round_up_to_8(number):
|
||||
return int(((number + 7) // 8) * 8)
|
||||
|
||||
|
||||
def _tidyup_pktsnip(elffile, pktsnip):
|
||||
size = sum(s for s in PKTSNIP_STRUCT["members"].values())
|
||||
pktsnip["type"] = to_nettype(elffile, pktsnip["type"])
|
||||
pktsnip["next"] = {"addr": pktsnip["next"]}
|
||||
pktsnip["data"] = {"addr": pktsnip["data"]}
|
||||
pktsnip["padding"] = pktsnip["padding"][
|
||||
:(_round_up_to_8(PKTSNIP_STRUCT["size"]) - size)
|
||||
]
|
||||
|
||||
|
||||
def identify_pktsnip(elffile, content, pktbuf):
|
||||
"""
|
||||
Tries to identify a pktsnip from a given chunk content descriptor.
|
||||
|
||||
Parameters:
|
||||
elffile (str): Path to an ELF file.
|
||||
content (dict): chunk content descriptor as returned in the result of
|
||||
parse_hexdump().
|
||||
pktbuf (dict): packet buffer descriptor as returned by parse_hexdump().
|
||||
|
||||
Returns:
|
||||
dict: a dictionary describing the packet snip struct as returned by
|
||||
parse_struct() and brought into a human readable form.
|
||||
NoneType: None when no pktsnip can be identified.
|
||||
"""
|
||||
pktsnip = None
|
||||
idx = None
|
||||
# find first fitting raw content marker
|
||||
for i, c in enumerate(content):
|
||||
if (c["size"] >= PKTSNIP_STRUCT["size"]) and (c["type"] == "raw"):
|
||||
idx = i
|
||||
break
|
||||
if idx is None:
|
||||
return None
|
||||
while (pktsnip is None) and (idx < len(content)):
|
||||
c = content[idx]
|
||||
pktsnip = parse_struct(PKTSNIP_STRUCT, c)
|
||||
if (pktsnip is not None) and \
|
||||
((pktsnip["next"] == 0) or in_pktbuf(pktbuf, pktsnip["next"])) and \
|
||||
((pktsnip["data"] == 0) or in_pktbuf(pktbuf, pktsnip["data"])):
|
||||
size = _round_up_to_8(PKTSNIP_STRUCT["size"])
|
||||
new_data = c["data"][size:]
|
||||
new_size = c["size"] - size
|
||||
c["type"] = "gnrc_pktsnip"
|
||||
c["data"] = pktsnip
|
||||
c["size"] = size
|
||||
_tidyup_pktsnip(elffile, pktsnip)
|
||||
if not len(new_data):
|
||||
# end of content
|
||||
break
|
||||
else:
|
||||
pktsnip = None
|
||||
size = 8
|
||||
new_data = c["data"][size:]
|
||||
new_size = c["size"] - size
|
||||
if not len(new_data):
|
||||
# end of content
|
||||
break
|
||||
elif (idx > 0) and (content[idx - 1]["type"] == "raw"):
|
||||
prev = idx - 1
|
||||
# merge new raw with previous raw
|
||||
content[prev]["data"] += c["data"][:size]
|
||||
content[prev]["size"] += size
|
||||
c["start"] = content[prev]["start"] + content[prev]["size"]
|
||||
c["data"] = new_data
|
||||
c["size"] -= size
|
||||
continue
|
||||
else:
|
||||
c["data"] = new_data
|
||||
c["size"] = size
|
||||
content.insert(idx + 1, {
|
||||
"data": new_data,
|
||||
"size": new_size,
|
||||
"type": "raw",
|
||||
"start": c["start"] + size,
|
||||
})
|
||||
idx += 1
|
||||
return pktsnip
|
||||
|
||||
|
||||
def identify_struct(elffile, content, pktsnip):
|
||||
"""
|
||||
Tries to identify a struct in the given content via the provided pktsnip.
|
||||
|
||||
Parameters:
|
||||
elffile: Path to an ELF file.
|
||||
content (dict): chunk content descriptor as returned in the result of
|
||||
parse_hexdump().
|
||||
pktsnip: Packet snip descriptor, pointing its data pointer to content.
|
||||
|
||||
Returns:
|
||||
dict: struct descriptor as returned by parse_struct() or by a provided
|
||||
sub-parser for that struct in NETTYPE_STRUCTS.
|
||||
"""
|
||||
global NETTYPE_STRUCTS
|
||||
nettype = pktsnip["type"]
|
||||
struct_dict = get_struct(elffile, NETTYPE_STRUCTS[nettype]["name"])
|
||||
idx = None
|
||||
offset = pktsnip["data"]["addr"] - content[0]["start"]
|
||||
# find first fitting raw content marker
|
||||
for i, c in enumerate(content):
|
||||
if (c["size"] >= struct_dict["size"]) and (c["type"] == "raw") and \
|
||||
((c["start"] + c["size"]) > pktsnip["data"]["addr"]):
|
||||
idx = i
|
||||
offset = pktsnip["data"]["addr"] - content[idx]["start"]
|
||||
break
|
||||
offset -= c["size"]
|
||||
if (offset < 0):
|
||||
# something is broken or already parsed => ignore this
|
||||
return
|
||||
if idx is None:
|
||||
# something is broken or already parsed => ignore this
|
||||
return
|
||||
c = content[idx]
|
||||
if c["type"] not in ["raw", nettype.lower()]:
|
||||
raise InconsistentPktbuf("pointer 0x{:x} points to two distinct types"
|
||||
.format(pktsnip["data"]["addr"]))
|
||||
size = _round_up_to_8(pktsnip["size"])
|
||||
if offset > 0:
|
||||
new_data = c["data"][offset:]
|
||||
new_size = c["size"] - offset
|
||||
new_start = c["start"] + offset
|
||||
content.insert(idx, {
|
||||
"start": c["start"],
|
||||
"type": "raw",
|
||||
"data": c["data"][:offset],
|
||||
"size": offset,
|
||||
})
|
||||
c["data"] = new_data
|
||||
c["size"] = new_size
|
||||
c["start"] = new_start
|
||||
if size < c["size"]:
|
||||
new_data = c["data"][size:]
|
||||
new_size = c["size"] - size
|
||||
new_start = c["start"] + size
|
||||
c["data"] = c["data"][:size]
|
||||
c["size"] = size
|
||||
content.insert(idx + 1, {
|
||||
"start": new_start,
|
||||
"type": "raw",
|
||||
"data": new_data,
|
||||
"size": new_size,
|
||||
})
|
||||
c["type"] = nettype.lower()
|
||||
if "subparser" in struct_dict:
|
||||
subparser = struct_dict["subparser"]
|
||||
if isinstance(subparser, str):
|
||||
subparser = globals()[subparser]
|
||||
c["data"] = subparser(parse_struct(struct_dict, c))
|
||||
else:
|
||||
c["data"] = parse_struct(struct_dict, c)
|
||||
|
||||
|
||||
def main():
|
||||
global PKTSNIP_STRUCT
|
||||
args_parser = argparse.ArgumentParser(
|
||||
description="Analyze `pktbuf` command output"
|
||||
)
|
||||
args_parser.add_argument("elffile", type=_is_file,
|
||||
help="The elffile of the application `pktbuf` "
|
||||
"was executed in")
|
||||
args_parser.add_argument("dump", type=argparse.FileType("r"),
|
||||
default=sys.stdin,
|
||||
help="Output of one or more `pktbuf` executions "
|
||||
"(default: stdin)")
|
||||
args = args_parser.parse_args()
|
||||
get_struct(args.elffile, PKTSNIP_STRUCT["name"])
|
||||
for i, pktbuf in enumerate(parse_hexdump(args.dump), 1):
|
||||
if empty_pktbuf(pktbuf):
|
||||
print("pktbuf output {} shows an empty pktbuf".format(i))
|
||||
pprint.pprint(pktbuf)
|
||||
continue
|
||||
pktsnip = None
|
||||
while True:
|
||||
for nr, segment in enumerate(pktbuf["segments"]):
|
||||
if segment["type"] != "chunk":
|
||||
continue
|
||||
pktsnip = identify_pktsnip(args.elffile, segment["content"],
|
||||
pktbuf)
|
||||
if pktsnip is not None:
|
||||
break
|
||||
if pktsnip is None:
|
||||
break
|
||||
for nr, segment in enumerate(pktbuf["segments"]):
|
||||
if in_segment(segment, pktsnip["next"]["addr"]):
|
||||
pktsnip["next"]["name"] = {
|
||||
"segment": segment["name"],
|
||||
"offset": pktsnip["next"]["addr"] - segment["start"]
|
||||
}
|
||||
if in_segment(segment, pktsnip["data"]["addr"]):
|
||||
if segment["type"] == "chunk":
|
||||
identify_struct(args.elffile, segment["content"],
|
||||
pktsnip)
|
||||
pktsnip["data"]["name"] = {
|
||||
"segment": segment["name"],
|
||||
"offset": pktsnip["data"]["addr"] - segment["start"]
|
||||
}
|
||||
print("pktbuf output {}".format(i))
|
||||
pprint.pprint(pktbuf)
|
||||
|
||||
|
||||
# Subparsers
|
||||
def gnrc_netif_parser(data_dict):
|
||||
"""
|
||||
Sub-parser for GNRC netif headers.
|
||||
"""
|
||||
src_len = data_dict["src_l2addr_len"]
|
||||
dst_len = data_dict["dst_l2addr_len"]
|
||||
if (src_len > 0) and (len(data_dict["padding"]) >= src_len):
|
||||
data_dict["src_l2addr"] = struct.unpack(
|
||||
"!{}s".format(src_len),
|
||||
data_dict["padding"][:src_len]
|
||||
)
|
||||
data_dict["padding"] = data_dict["padding"][src_len:]
|
||||
else:
|
||||
data_dict["src_l2addr"] = b""
|
||||
if (dst_len > 0) and (len(data_dict["padding"]) >= dst_len):
|
||||
data_dict["dst_l2addr"] = struct.unpack(
|
||||
"!{}s".format(dst_len),
|
||||
data_dict["padding"][:dst_len]
|
||||
)
|
||||
data_dict["padding"] = data_dict["padding"][dst_len:]
|
||||
else:
|
||||
data_dict["dst_l2addr"] = b""
|
||||
return data_dict
|
||||
|
||||
|
||||
def ipv6_hdr_parser(data_dict):
|
||||
"""
|
||||
Sub-parser for IPv6 headers.
|
||||
"""
|
||||
data_dict["nh"] = PROTNUMS.get(data_dict["nh"], data_dict["nh"])
|
||||
data_dict["version"] = data_dict["v_tc_fl"] >> 28
|
||||
data_dict["tc"] = {"ecn": (data_dict["v_tc_fl"] & 0x0c000000) >> 26,
|
||||
"dscp": (data_dict["v_tc_fl"] & 0x03f00000) >> 20}
|
||||
data_dict["fl"] = data_dict["v_tc_fl"] & 0x000fffff
|
||||
del data_dict["v_tc_fl"]
|
||||
return data_dict
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
x
Reference in New Issue
Block a user