tests/gnrc_rpl_srh: introduce dedicated gnrc_rpl_srh test
This commit is contained in:
parent
d9f26dbbe2
commit
df3d1be3e6
52
tests/gnrc_rpl_srh/Makefile
Normal file
52
tests/gnrc_rpl_srh/Makefile
Normal file
@ -0,0 +1,52 @@
|
||||
DEVELHELP := 1
|
||||
# name of your application
|
||||
include ../Makefile.tests_common
|
||||
|
||||
BOARD_INSUFFICIENT_MEMORY := arduino-duemilanove arduino-mega2560 arduino-uno \
|
||||
hifive1 mega-xplained msb-430 msb-430h \
|
||||
nucleo-f030r8 nucleo-f031k6 nucleo-f042k6 \
|
||||
nucleo-f070rb nucleo-f072rb nucleo-f303k8 \
|
||||
nucleo-f334r8 nucleo-l031k6 nucleo-l053r8 \
|
||||
stm32f0discovery telosb thingy52 waspmote-pro \
|
||||
wsn430-v1_3b wsn430-v1_4 z1
|
||||
# chronos, mips-malta, and ruuvitag boards don't support ethos
|
||||
BOARD_BLACKLIST := chronos mips-malta ruuvitag
|
||||
|
||||
export TAP ?= tap0
|
||||
|
||||
# use Ethernet as link-layer protocol
|
||||
ifeq (native,$(BOARD))
|
||||
USEMODULE += netdev_tap
|
||||
|
||||
TERMFLAGS ?= $(TAP)
|
||||
else
|
||||
USEMODULE += ethos
|
||||
|
||||
ETHOS_BAUDRATE ?= 115200
|
||||
CFLAGS += -DETHOS_BAUDRATE=$(ETHOS_BAUDRATE) -DUSE_ETHOS_FOR_STDIO
|
||||
TERMDEPS += ethos
|
||||
TERMPROG ?= sudo $(RIOTTOOLS)/ethos/ethos
|
||||
TERMFLAGS ?= $(TAP) $(PORT) $(ETHOS_BAUDRATE)
|
||||
endif
|
||||
USEMODULE += auto_init_gnrc_netif
|
||||
# Specify the mandatory networking modules for IPv6
|
||||
USEMODULE += gnrc_ipv6_router_default
|
||||
USEMODULE += gnrc_icmpv6_error
|
||||
USEMODULE += gnrc_pktdump
|
||||
USEMODULE += gnrc_pktbuf_cmd
|
||||
# IPv6 extension headers
|
||||
USEMODULE += gnrc_rpl_srh
|
||||
USEMODULE += od
|
||||
# Add also the shell, some shell commands
|
||||
USEMODULE += shell
|
||||
USEMODULE += shell_commands
|
||||
USEMODULE += ps
|
||||
|
||||
# TEST_ON_CI_WHITELIST += all
|
||||
|
||||
.PHONY: ethos
|
||||
|
||||
ethos:
|
||||
$(Q)env -u CC -u CFLAGS make -C $(RIOTTOOLS)/ethos
|
||||
|
||||
include $(RIOTBASE)/Makefile.include
|
||||
34
tests/gnrc_rpl_srh/README.md
Normal file
34
tests/gnrc_rpl_srh/README.md
Normal file
@ -0,0 +1,34 @@
|
||||
# `gnrc_ipv6_ext` test
|
||||
|
||||
This test utilizes [scapy] to test the GNRC's RPL source routing header
|
||||
handling.
|
||||
|
||||
To test, compile and flash the application to any board of your liking (since
|
||||
`ethos` is used to communicate with non-native boards it really doesn't matter
|
||||
as long as the application fits).
|
||||
|
||||
```
|
||||
make flash
|
||||
```
|
||||
|
||||
And run the tests using
|
||||
|
||||
```
|
||||
sudo make test
|
||||
```
|
||||
|
||||
Note that root privileges are required since `scapy` needs to construct Ethernet
|
||||
frames to properly communicate over the TAP interface.
|
||||
|
||||
The tests succeeds if you see the string `SUCCESS`.
|
||||
|
||||
If any problems are encountered (i.e. if the test prints the sting `FAILED`),
|
||||
set the echo parameter in the `run()` function at the bottom of the test script
|
||||
(tests/01-run.py) to `True`. The test script will then offer a more detailed
|
||||
output.
|
||||
|
||||
It might be that due to `scapy`'s sniffer not picking up an expected packet
|
||||
sometimes that the test application hangs for a while and then issues `FAILED`.
|
||||
Just restart the test in that case.
|
||||
|
||||
[scapy]: https://scapy.readthedocs.io/en/latest/
|
||||
77
tests/gnrc_rpl_srh/main.c
Normal file
77
tests/gnrc_rpl_srh/main.c
Normal file
@ -0,0 +1,77 @@
|
||||
/*
|
||||
* Copyright (C) 2015 Freie Universität Berlin
|
||||
*
|
||||
* This file is subject to the terms and conditions of the GNU Lesser
|
||||
* General Public License v2.1. See the file LICENSE in the top level
|
||||
* directory for more details.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @ingroup tests
|
||||
* @{
|
||||
*
|
||||
* @file
|
||||
* @brief Tests extension header handling of gnrc stack.
|
||||
*
|
||||
* @author Hauke Petersen <hauke.petersen@fu-berlin.de>
|
||||
* @author Takuo Yonezawa <Yonezawa-T2@mail.dnp.co.jp>
|
||||
*
|
||||
* @}
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
|
||||
#include "shell.h"
|
||||
#include "net/gnrc/pktbuf.h"
|
||||
#include "net/gnrc/pktdump.h"
|
||||
#include "net/gnrc/netreg.h"
|
||||
|
||||
static char line_buf[SHELL_DEFAULT_BUFSIZE];
|
||||
static gnrc_netreg_entry_t ip_entry = GNRC_NETREG_ENTRY_INIT_PID(
|
||||
0, KERNEL_PID_UNDEF
|
||||
);
|
||||
|
||||
static inline void _ipreg_usage(char *cmd)
|
||||
{
|
||||
printf("Usage: %s {reg|unreg}", cmd);
|
||||
}
|
||||
|
||||
static int _ipreg(int argc, char **argv)
|
||||
{
|
||||
if (argc < 2) {
|
||||
_ipreg_usage(argv[0]);
|
||||
return 1;
|
||||
}
|
||||
else if (strcmp("reg", argv[1]) == 0) {
|
||||
if (ip_entry.target.pid != KERNEL_PID_UNDEF) {
|
||||
puts("Already registered to protocol number 59");
|
||||
return 1;
|
||||
}
|
||||
gnrc_netreg_entry_init_pid(&ip_entry, PROTNUM_IPV6_NONXT,
|
||||
gnrc_pktdump_pid);
|
||||
gnrc_netreg_register(GNRC_NETTYPE_IPV6, &ip_entry);
|
||||
puts("Registered to protocol number 59");
|
||||
}
|
||||
else if (strcmp("unreg", argv[1]) == 0) {
|
||||
puts("Unregistered from protocol number 59");
|
||||
gnrc_netreg_unregister(GNRC_NETTYPE_IPV6, &ip_entry);
|
||||
gnrc_netreg_entry_init_pid(&ip_entry, 0, KERNEL_PID_UNDEF);
|
||||
}
|
||||
else {
|
||||
_ipreg_usage(argv[0]);
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static const shell_command_t shell_commands[] = {
|
||||
{ "ip", "Registers pktdump to protocol number 59 (no next header)", _ipreg },
|
||||
{ NULL, NULL, NULL }
|
||||
};
|
||||
|
||||
int main(void)
|
||||
{
|
||||
shell_run(shell_commands, line_buf, SHELL_DEFAULT_BUFSIZE);
|
||||
return 0;
|
||||
}
|
||||
374
tests/gnrc_rpl_srh/tests/01-run.py
Executable file
374
tests/gnrc_rpl_srh/tests/01-run.py
Executable file
@ -0,0 +1,374 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Copyright (C) 2018 Freie Universität Berlin
|
||||
#
|
||||
# This file is subject to the terms and conditions of the GNU Lesser
|
||||
# General Public License v2.1. See the file LICENSE in the top level
|
||||
# directory for more details.
|
||||
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import sys
|
||||
import subprocess
|
||||
import threading
|
||||
|
||||
from scapy.all import Ether, IPv6, \
|
||||
IPv6ExtHdrHopByHop, IPv6ExtHdrDestOpt, \
|
||||
IPv6ExtHdrFragment, IPv6ExtHdrRouting, \
|
||||
ICMPv6ParamProblem, ICMPv6TimeExceeded, \
|
||||
sendp, srp1, sniff
|
||||
from testrunner import run
|
||||
|
||||
|
||||
EXT_HDR_NH = {
|
||||
IPv6ExtHdrHopByHop: 0,
|
||||
IPv6ExtHdrRouting: 43,
|
||||
IPv6ExtHdrFragment: 44,
|
||||
# IPSec headers currently unsupported by scapy
|
||||
IPv6ExtHdrDestOpt: 60,
|
||||
# Mobility header currently unsupported by scapy
|
||||
}
|
||||
|
||||
|
||||
class Sniffer(threading.Thread):
|
||||
def __init__(self, iface, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.stop_filter = None
|
||||
self.stopped = False
|
||||
self.iface = iface
|
||||
self.ps = []
|
||||
self.enter_loop = threading.Event()
|
||||
self.sniff_results = threading.Event()
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
self.enter_loop.wait()
|
||||
self.enter_loop.clear()
|
||||
if self.stopped:
|
||||
return
|
||||
if self.stop_filter:
|
||||
self.ps = sniff(stop_filter=self.stop_filter,
|
||||
iface=self.iface, timeout=5)
|
||||
self.stop_filter = None
|
||||
self.sniff_results.set()
|
||||
|
||||
def start_sniff(self, stop_filter):
|
||||
self.stop_filter = stop_filter
|
||||
self.enter_loop.set()
|
||||
|
||||
def wait_for_sniff_results(self):
|
||||
res = []
|
||||
if self.sniff_results.wait(5):
|
||||
self.sniff_results.clear()
|
||||
res = self.ps
|
||||
self.ps = []
|
||||
return res
|
||||
|
||||
def stop(self):
|
||||
self.stopped = True
|
||||
self.enter_loop.set()
|
||||
self.join()
|
||||
|
||||
|
||||
sniffer = None
|
||||
|
||||
|
||||
def check_and_search_output(cmd, pattern, res_group, *args, **kwargs):
|
||||
output = subprocess.check_output(cmd, *args, **kwargs).decode("utf-8")
|
||||
for line in output.splitlines():
|
||||
m = re.search(pattern, line)
|
||||
if m is not None:
|
||||
return m.group(res_group)
|
||||
return None
|
||||
|
||||
|
||||
def get_bridge(tap):
|
||||
res = check_and_search_output(
|
||||
["bridge", "link"],
|
||||
r"{}.+master\s+(?P<master>[^\s]+)".format(tap),
|
||||
"master"
|
||||
)
|
||||
return tap if res is None else res
|
||||
|
||||
|
||||
def get_host_lladdr(tap):
|
||||
res = check_and_search_output(
|
||||
["ip", "addr", "show", "dev", tap, "scope", "link"],
|
||||
r"inet6\s+(?P<lladdr>[0-9A-Fa-f:]+)/\d+",
|
||||
"lladdr"
|
||||
)
|
||||
if res is None:
|
||||
raise AssertionError(
|
||||
"Can't find host link-local address on interface {}".format(tap)
|
||||
)
|
||||
else:
|
||||
return res
|
||||
|
||||
|
||||
def get_host_hwaddr(tap):
|
||||
res = check_and_search_output(
|
||||
["ip", "addr", "show", "dev", tap, "scope", "link"],
|
||||
r"link[^\s]+\s+(?P<hwaddr>[0-9A-Fa-f:]+)",
|
||||
"hwaddr"
|
||||
)
|
||||
if res is None:
|
||||
raise AssertionError(
|
||||
"Can't find host hardware address on interface {}".format(tap)
|
||||
)
|
||||
else:
|
||||
return res
|
||||
|
||||
|
||||
def pktbuf_empty(child):
|
||||
child.sendline("pktbuf")
|
||||
child.expect(r"packet buffer: first byte: (?P<first_byte>0x[0-9a-fA-F]+), "
|
||||
r"last byte: 0x[0-9a-fA-F]+ \(size: (?P<size>\d+)\)")
|
||||
first_byte = child.match.group("first_byte")
|
||||
size = child.match.group("size")
|
||||
child.expect(
|
||||
r"~ unused: {} \(next: (\(nil\)|0), size: {}\) ~".format(
|
||||
first_byte, size))
|
||||
|
||||
|
||||
def register_protnum(child):
|
||||
child.sendline("ip reg")
|
||||
child.expect("Registered to protocol number 59")
|
||||
|
||||
|
||||
def unregister(child):
|
||||
child.sendline("ip unreg")
|
||||
child.expect(r"Unregistered from protocol number \d")
|
||||
|
||||
|
||||
def get_first_interface(child):
|
||||
child.sendline("ifconfig")
|
||||
child.expect(r"Iface\s+(\d+)")
|
||||
return int(child.match.group(1))
|
||||
|
||||
|
||||
def add_ipv6_address(child, iface, ipv6_addr):
|
||||
child.sendline("ifconfig {} add {}".format(iface, ipv6_addr))
|
||||
child.expect(r"success: added [a-f0-9:]+/\d+ to interface \d+")
|
||||
|
||||
|
||||
def del_ipv6_address(child, iface, ipv6_addr):
|
||||
child.sendline("ifconfig {} del {}".format(iface, ipv6_addr))
|
||||
child.expect(r"success: removed [a-f0-9:]+ to interface \d+")
|
||||
|
||||
|
||||
def add_neighbor(child, iface, ipv6_addr, hw_addr):
|
||||
child.sendline("nib neigh add {} {} {}".format(iface, ipv6_addr, hw_addr))
|
||||
child.sendline("nib neigh")
|
||||
child.expect(r"{} dev #{} lladdr {}".format(ipv6_addr.lower(), iface,
|
||||
hw_addr.upper()))
|
||||
|
||||
|
||||
def del_neighbor(child, iface, ipv6_addr):
|
||||
child.sendline("nib neigh del {} {}".format(iface, ipv6_addr))
|
||||
|
||||
|
||||
def test_wrong_type(child, iface, hw_dst, ll_dst, ll_src):
|
||||
p = srp1(Ether(dst=hw_dst) / IPv6(dst=ll_dst, src=ll_src) /
|
||||
IPv6ExtHdrRouting(type=255, segleft=1, addresses=["abcd::1"]),
|
||||
iface=iface, timeout=1, verbose=0)
|
||||
assert(p is not None)
|
||||
assert(ICMPv6ParamProblem in p)
|
||||
assert(p[ICMPv6ParamProblem].code == 0) # erroneous header field encountered
|
||||
assert(p[ICMPv6ParamProblem].ptr == 42) # routing header type field
|
||||
pktbuf_empty(child)
|
||||
|
||||
|
||||
def test_seg_left_gt_len_addresses(child, iface, hw_dst, ll_dst, ll_src):
|
||||
# send routing header with no (0) addresses but segleft set to a value
|
||||
# larger than 0
|
||||
p = srp1(Ether(dst=hw_dst) / IPv6(dst=ll_dst, src=ll_src) /
|
||||
IPv6ExtHdrRouting(type=3, segleft=18, addresses=[]),
|
||||
iface=iface, timeout=1, verbose=0)
|
||||
assert(p is not None)
|
||||
assert(ICMPv6ParamProblem in p)
|
||||
assert(p[ICMPv6ParamProblem].code == 0) # erroneous header field encountered
|
||||
assert(p[ICMPv6ParamProblem].ptr == 43) # segleft field
|
||||
pktbuf_empty(child)
|
||||
|
||||
|
||||
def test_multicast_dst(child, iface, hw_dst, ll_dst, ll_src):
|
||||
# sniffing for ICMPv6 parameter problem message
|
||||
sniffer.start_sniff(lambda p: p.haslayer(ICMPv6ParamProblem))
|
||||
# send routing header with multicast destination
|
||||
sendp(Ether(dst=hw_dst) / IPv6(dst="ff02::1", src=ll_src) /
|
||||
IPv6ExtHdrRouting(type=3, segleft=1, addresses=["abcd::1"]),
|
||||
iface=iface, verbose=0)
|
||||
ps = sniffer.wait_for_sniff_results()
|
||||
p = [p for p in ps if ICMPv6ParamProblem in p]
|
||||
assert(len(p) > 0)
|
||||
p = p[0]
|
||||
assert(p[ICMPv6ParamProblem].code == 0) # erroneous header field encountered
|
||||
assert(p[ICMPv6ParamProblem].ptr == 24) # IPv6 headers destination field
|
||||
pktbuf_empty(child)
|
||||
|
||||
|
||||
def test_multicast_addr(child, iface, hw_dst, ll_dst, ll_src):
|
||||
# Send routing header with multicast address in its destinations
|
||||
p = srp1(Ether(dst=hw_dst) / IPv6(dst=ll_dst, src=ll_src) /
|
||||
IPv6ExtHdrRouting(type=3, segleft=1, addresses=["ff02::1"]),
|
||||
iface=iface, timeout=1, verbose=0)
|
||||
assert(p is not None)
|
||||
assert(ICMPv6ParamProblem in p)
|
||||
assert(p[ICMPv6ParamProblem].code == 0) # erroneous header field encountered
|
||||
assert(p[ICMPv6ParamProblem].ptr == 48) # first address in routing header
|
||||
pktbuf_empty(child)
|
||||
|
||||
|
||||
def test_multiple_addrs_of_mine_uncomp(child, iface, hw_dst, ll_dst, ll_src):
|
||||
dummy = "affe::1"
|
||||
# add dummy IPv6 address
|
||||
dst_iface = get_first_interface(child)
|
||||
add_ipv6_address(child, dst_iface, dummy)
|
||||
p = srp1(Ether(dst=hw_dst) / IPv6(dst=ll_dst, src=ll_src) /
|
||||
IPv6ExtHdrRouting(type=3, segleft=3, addresses=[ll_dst, ll_src,
|
||||
dummy]),
|
||||
iface=iface, timeout=1, verbose=0)
|
||||
assert(p is not None)
|
||||
assert(ICMPv6ParamProblem in p)
|
||||
assert(p[ICMPv6ParamProblem].code == 0) # erroneous header field encountered
|
||||
assert(p[ICMPv6ParamProblem].ptr == 40+8+(2 * 16)) # dummy in routing header
|
||||
pktbuf_empty(child)
|
||||
del_ipv6_address(child, dst_iface, dummy)
|
||||
|
||||
|
||||
def test_forward_uncomp(child, iface, hw_dst, ll_dst, ll_src):
|
||||
dummy = "affe::1"
|
||||
hl = random.randint(2, 255)
|
||||
# sniffing for packets to dummy
|
||||
sniffer.start_sniff(lambda p: p[Ether].src == hw_dst)
|
||||
# add dummy IPv6 address
|
||||
dst_iface = get_first_interface(child)
|
||||
hw_src = get_host_hwaddr(iface)
|
||||
add_neighbor(child, dst_iface, dummy, hw_src)
|
||||
sendp(Ether(dst=hw_dst) / IPv6(dst=ll_dst, src=ll_src, hlim=hl) /
|
||||
IPv6ExtHdrRouting(type=3, segleft=1, addresses=[dummy]),
|
||||
iface=iface, verbose=0)
|
||||
ps = sniffer.wait_for_sniff_results()
|
||||
p = [p for p in ps if p[Ether].src == hw_dst]
|
||||
assert(len(p) > 0)
|
||||
p = p[0]
|
||||
assert(IPv6 in p)
|
||||
assert(IPv6ExtHdrRouting in p)
|
||||
assert(p[IPv6].src == ll_src)
|
||||
assert(p[IPv6].dst == dummy)
|
||||
assert(p[IPv6].hlim == (hl - 1))
|
||||
assert(p[IPv6ExtHdrRouting].type == 3)
|
||||
assert(p[IPv6ExtHdrRouting].segleft == 0)
|
||||
pktbuf_empty(child)
|
||||
del_neighbor(child, dst_iface, dummy)
|
||||
|
||||
|
||||
def test_forward_uncomp_not_first_ext_hdr(child, iface, hw_dst, ll_dst, ll_src):
|
||||
dummy = "affe::1"
|
||||
hl = random.randint(2, 255)
|
||||
# sniffing for packets to dummy
|
||||
sniffer.start_sniff(lambda p: p[Ether].src == hw_dst)
|
||||
# add dummy IPv6 address
|
||||
dst_iface = get_first_interface(child)
|
||||
hw_src = get_host_hwaddr(iface)
|
||||
add_neighbor(child, dst_iface, dummy, hw_src)
|
||||
sendp(Ether(dst=hw_dst) / IPv6(dst=ll_dst, src=ll_src, hlim=hl) /
|
||||
IPv6ExtHdrHopByHop() /
|
||||
IPv6ExtHdrRouting(type=3, segleft=1, addresses=[dummy]),
|
||||
iface=iface, verbose=0)
|
||||
ps = sniffer.wait_for_sniff_results()
|
||||
p = [p for p in ps if p[Ether].src == hw_dst]
|
||||
assert(len(p) > 0)
|
||||
p = p[0]
|
||||
assert(IPv6 in p)
|
||||
assert(IPv6ExtHdrRouting in p)
|
||||
assert(p[IPv6].src == ll_src)
|
||||
assert(p[IPv6].dst == dummy)
|
||||
assert(p[IPv6].hlim == (hl - 1))
|
||||
assert(p[IPv6ExtHdrRouting].type == 3)
|
||||
assert(p[IPv6ExtHdrRouting].segleft == 0)
|
||||
pktbuf_empty(child)
|
||||
del_neighbor(child, dst_iface, dummy)
|
||||
|
||||
|
||||
def test_seq_left_0(child, iface, hw_dst, ll_dst, ll_src):
|
||||
register_protnum(child)
|
||||
sendp(Ether(dst=hw_dst) / IPv6(dst=ll_dst, src=ll_src) /
|
||||
IPv6ExtHdrRouting(type=3, segleft=0), iface=iface, verbose=0)
|
||||
# we are the target, so the packet should be dumped
|
||||
# empty snip
|
||||
child.expect(r"~~ SNIP 0 - size:\s+0 byte, type: NETTYPE_UNDEF \(\d+\)")
|
||||
ipv6_payload_len = 0
|
||||
# parsed routing header
|
||||
child.expect(r"~~ SNIP 1 - size:\s+(\d+) byte, type: NETTYPE_\w+ \(\d+\)")
|
||||
ipv6_payload_len += int(child.match.group(1))
|
||||
# NH = 59 (IPV6_NONXT), len = 0x00, routing type = 3, segments left = 0
|
||||
child.expect(r"00000000 3B 00 03 00 00 00 00 00")
|
||||
# IPv6 header
|
||||
child.expect(r"~~ SNIP 2 - size:\s+40 byte, type: NETTYPE_IPV6 \(\d+\)")
|
||||
child.expect_exact(r"length: {} next header: {}".format(
|
||||
ipv6_payload_len, EXT_HDR_NH[IPv6ExtHdrRouting]
|
||||
))
|
||||
child.expect_exact(r"destination address: {}".format(ll_dst))
|
||||
pktbuf_empty(child)
|
||||
unregister(child)
|
||||
|
||||
|
||||
def test_time_exc(child, iface, hw_dst, ll_dst, ll_src):
|
||||
dummy = "affe::1"
|
||||
p = srp1(Ether(dst=hw_dst) / IPv6(dst=ll_dst, hlim=1, src=ll_src) /
|
||||
IPv6ExtHdrRouting(type=3, segleft=1, addresses=[dummy]),
|
||||
iface=iface, timeout=1, verbose=0)
|
||||
assert(p is not None)
|
||||
assert(ICMPv6TimeExceeded in p)
|
||||
assert(p[ICMPv6TimeExceeded].code == 0)
|
||||
pktbuf_empty(child)
|
||||
|
||||
|
||||
def testfunc(child):
|
||||
global sniffer
|
||||
tap = get_bridge(os.environ["TAP"])
|
||||
|
||||
lladdr_src = get_host_lladdr(tap)
|
||||
child.sendline("ifconfig")
|
||||
child.expect("HWaddr: (?P<hwaddr>[A-Fa-f:0-9]+)")
|
||||
hwaddr_dst = child.match.group("hwaddr").lower()
|
||||
child.expect("(?P<lladdr>fe80::[A-Fa-f:0-9]+)")
|
||||
lladdr_dst = child.match.group("lladdr").lower()
|
||||
sniffer = Sniffer(tap)
|
||||
sniffer.start()
|
||||
|
||||
def run(func):
|
||||
if child.logfile == sys.stdout:
|
||||
func(child, tap, hwaddr_dst, lladdr_dst, lladdr_src)
|
||||
else:
|
||||
try:
|
||||
func(child, tap, hwaddr_dst, lladdr_dst, lladdr_src)
|
||||
print(".", end="", flush=True)
|
||||
except Exception as e:
|
||||
print("FAILED")
|
||||
raise e
|
||||
|
||||
run(test_wrong_type)
|
||||
run(test_seg_left_gt_len_addresses)
|
||||
run(test_multicast_dst)
|
||||
run(test_multicast_addr)
|
||||
run(test_multiple_addrs_of_mine_uncomp)
|
||||
run(test_forward_uncomp)
|
||||
run(test_forward_uncomp_not_first_ext_hdr)
|
||||
# compressed tests hard to implement with scapy and also covered in
|
||||
# unittests
|
||||
run(test_seq_left_0)
|
||||
run(test_time_exc)
|
||||
print("SUCCESS")
|
||||
sniffer.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if os.geteuid() != 0:
|
||||
print("\x1b[1;31mThis test requires root privileges.\n"
|
||||
"It's constructing and sending Ethernet frames.\x1b[0m\n",
|
||||
file=sys.stderr)
|
||||
sys.exit(1)
|
||||
sys.exit(run(testfunc, timeout=1, echo=False))
|
||||
Loading…
x
Reference in New Issue
Block a user