1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2025-12-26 23:11:19 +01:00

tests: provide tests for 6LBR DHCPv6 client

This commit is contained in:
Martine S. Lenders 2020-02-07 16:51:42 +01:00
parent db463a3373
commit 8cafcc3ebf
No known key found for this signature in database
GPG Key ID: CCD317364F63286F
7 changed files with 354 additions and 0 deletions

View File

@ -0,0 +1,10 @@
if MODULE_ETHOS
config GNRC_DHCPV6_CLIENT_6LBR_STATIC_ROUTE
default y
depends on MODULE_GNRC_DHCPV6_CLIENT_6LBR && KCONFIG_MODULE_GNRC_DHCPV6
config GNRC_NETIF_IPV6_ADDRS_NUMOF
# CONFIG_GNRC_DHCPV6_CLIENT_6LBR_STATIC_ROUTE=1 requires one more address
# for `fe80::2`.
default 3
depends on KCONFIG_MODULE_GNRC_NETIF
endif # MODULE_ETHOS

View File

@ -0,0 +1,52 @@
DEVELHELP := 1
include $(CURDIR)/../Makefile.tests_common
export TAP ?= tap0
GNRC_NETIF_NUMOF := 2
USEMODULE += auto_init_gnrc_netif
USEMODULE += gnrc_dhcpv6_client_6lbr
USEMODULE += gnrc_netdev_default
USEMODULE += gnrc_pktdump
USEMODULE += gnrc_sixlowpan_border_router_default
USEMODULE += ps
USEMODULE += shell
USEMODULE += shell_commands
# use Ethernet as link-layer protocol
ifeq (native,$(BOARD))
TERMFLAGS += -z [::1]:17754
else
ETHOS_BAUDRATE ?= 115200
CFLAGS += -DETHOS_BAUDRATE=$(ETHOS_BAUDRATE)
TERMDEPS += ethos
TERMPROG ?= sudo $(RIOTTOOLS)/ethos/ethos
TERMFLAGS ?= $(TAP) $(PORT) $(ETHOS_BAUDRATE)
STATIC_ROUTES ?= 1
endif
# The test requires some setup and to be run as root
# So it cannot currently be run on CI
TEST_ON_CI_BLACKLIST += all
# As there is an 'app.config' we want to explicitly disable Kconfig by setting
# the variable to empty
SHOULD_RUN_KCONFIG ?=
include $(RIOTBASE)/Makefile.include
ifndef CONFIG_GNRC_DHCPV6_CLIENT_6LBR_STATIC_ROUTE
ifeq (1,$(STATIC_ROUTES))
CFLAGS += -DCONFIG_GNRC_DHCPV6_CLIENT_6LBR_STATIC_ROUTE=1
# CONFIG_GNRC_DHCPV6_CLIENT_6LBR_STATIC_ROUTE=1 requires one more address for
# `fe80::2`.
CFLAGS += -DCONFIG_GNRC_NETIF_IPV6_ADDRS_NUMOF=3
endif
endif
ifeq (,$(filter native,$(BOARD)))
.PHONY: ethos
ethos:
$(Q)env -u CC -u CFLAGS make -C $(RIOTBASE)/dist/tools/ethos
endif

View File

@ -0,0 +1,6 @@
# Put board specific dependencies here
ifeq (native,$(BOARD))
USEMODULE += socket_zep
else
USEMODULE += stdio_ethos
endif

View File

@ -0,0 +1,56 @@
BOARD_INSUFFICIENT_MEMORY := \
airfy-beacon \
arduino-duemilanove \
arduino-leonardo \
arduino-mega2560 \
arduino-nano \
arduino-uno \
atmega1284p \
atmega328p \
b-l072z-lrwan1 \
blackpill-128kib \
blackpill \
bluepill-128kib \
bluepill \
calliope-mini \
cc2650-launchpad \
cc2650stk \
derfmega128 \
hifive1 \
hifive1b \
i-nucleo-lrwan1 \
lsn50 \
maple-mini \
mega-xplained \
microbit \
microduino-corerf \
msb-430 \
msb-430h \
nrf51dongle \
nrf6310 \
nucleo-f030r8 \
nucleo-f031k6 \
nucleo-f042k6 \
nucleo-f070rb \
nucleo-f072rb \
nucleo-f103rb \
nucleo-f302r8 \
nucleo-f303k8 \
nucleo-f334r8 \
nucleo-l031k6 \
nucleo-l053r8 \
nucleo-l073rz \
opencm904 \
saml10-xpro \
saml11-xpro \
spark-core \
stm32f030f4-demo \
stm32f0discovery \
stm32l0538-disco \
telosb \
waspmote-pro \
wsn430-v1_3b \
wsn430-v1_4 \
yunjia-nrf51822 \
z1 \
#

View File

@ -0,0 +1,30 @@
# `gnrc_dhcpv6_client_6lbr` test
This test utilizes [scapy] to test the DHCPv6 client configuration for a 6LoWPAN
border router.
To test, compile and flash the application to any board of your liking (since
`ethos` is used to communicate with non-native boards it really doesn't matter
as long as the application fits).
```
make flash
```
And run the tests using
```
sudo make test
```
Note that root privileges are required since `scapy` needs to construct Ethernet
frames to properly communicate over the TAP interface.
The tests succeeds if you see the string `SUCCESS`.
If any problems are encountered (i.e. if the test prints the sting `FAILED`),
set the echo parameter in the `run()` function at the bottom of the test script
(tests/01-run.py) to `True`. The test script will then offer a more detailed
output.
[scapy]: https://scapy.readthedocs.io/en/latest/

View File

@ -0,0 +1,27 @@
/*
* Copyright (C) 2020 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @{
*
* @file
* @author Martine Lenders <m.lenders@fu-berlin.de>
*/
#include "shell.h"
int main(void)
{
char line_buf[SHELL_DEFAULT_BUFSIZE];
shell_run(NULL, line_buf, SHELL_DEFAULT_BUFSIZE);
/* should be never reached */
return 0;
}
/** @} */

View File

@ -0,0 +1,173 @@
#!/usr/bin/env python3
# Copyright (C) 2018 Freie Universität Berlin
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import os
import pexpect
import random
import sys
import time
from scapy.all import AsyncSniffer, sendp, Ether, IPv6, UDP
from scapy.all import DHCP6_Solicit, DHCP6_Advertise, DHCP6_Request, DHCP6_Reply
from scapy.all import DHCP6OptClientId, DHCP6OptServerId, DHCP6OptIA_PD
from scapy.all import DUID_LL, DHCP6OptIAPrefix
from testrunner import run
TIMEOUT = 1
def get_upstream_netif(child):
child.sendline("ifconfig")
child.sendline("help") # workaround to spot end of ifconfig output
candidate = None
while True: # Search for an interface that does _not_ contain 6LO flag
if candidate is None:
child.expect(r"Iface\s+([^\s]+)\s+")
candidate = child.match.group(1)
res = child.expect([r"\b6LO\b", r"Iface\s+([^\s]+)\s+", "Command",
pexpect.TIMEOUT], timeout=.2)
if res > 0:
break
candidate = None
# wait for a line in "help"
child.expect("reboot")
return candidate
def get_downstream_netif(child):
child.sendline("ifconfig")
child.sendline("help") # workaround to spot end of ifconfig output
candidate = None
while True: # Search for an interface that does _not_ contain 6LO flag
if candidate is None:
child.expect(r"Iface\s+([^\s]+)\s+")
candidate = child.match.group(1)
res = child.expect([r"\b6LO\b", r"Iface\s+([^\s]+)\s+", "Command",
pexpect.TIMEOUT], timeout=.2)
if res == 0:
break
elif res == 1:
candidate = child.match.group(1)
elif res == 2:
break
else:
candidate = None
# wait for a line in "help"
child.expect("reboot")
return candidate
def get_hwaddrs(child, netif):
hwaddrs = []
child.sendline("ifconfig {}".format(netif))
child.expect(r"HWaddr:\s+(([A-Fa-f0-9]{2}:?)+)\s")
hwaddrs.append(child.match.group(1).lower())
if len(hwaddrs[0]) == 5: # short address
res = child.expect([pexpect.TIMEOUT,
r"Long HWaddr:\s+(([A-Fa-f0-9]{2}:?)+)\s"])
if res > 0:
hwaddrs.append(child.match.group(1).lower())
return hwaddrs
def start_sniffer(iface, count=None, stop_filter=None):
sniffer = AsyncSniffer(
iface=iface,
filter="udp and dst port 547",
count=count,
stop_filter=stop_filter,
)
sniffer.start()
return sniffer
def wait_for_dhcpv6_pkt(iface, sniffer=None, timeout=5):
if sniffer is None:
sniffer = start_sniffer(iface, count=1)
sniffer.join(timeout=timeout)
if sniffer.results is None:
raise TimeoutError("Sniffing for DHCPv6 traffic timed out")
return [p for p in sniffer.results
# filter out packets only belonging to stop_filter if it existed
if sniffer.kwargs.get("stop_filter") is None or
sniffer.kwargs["stop_filter"](p)][-1]
def build_reply_headers(pkt):
src_ether = pkt[Ether].src
src_ip = pkt[IPv6].src
sport = pkt[UDP].sport
dport = pkt[UDP].dport
return Ether(dst=src_ether) / IPv6(dst=src_ip) / \
UDP(sport=dport, dport=sport)
def testfunc(child):
iface = os.environ["TAP"]
pkt = wait_for_dhcpv6_pkt(iface)
# the packet was a solicit
assert DHCP6_Solicit in pkt
# check if the sender is the upstream interface of the node
upstream_netif = get_upstream_netif(child)
print(upstream_netif)
upstream_hwaddrs = get_hwaddrs(child, upstream_netif)
assert DHCP6OptClientId in pkt and DUID_LL in pkt[DHCP6OptClientId].duid
assert pkt[DHCP6OptClientId].duid[DUID_LL].lladdr in upstream_hwaddrs
# and it is asking for a prefix delegation
assert DHCP6OptIA_PD in pkt
# reply to solicit with advertise and a prefix provided
trid = pkt[DHCP6_Solicit].trid
srv_duid = "aa:bb:cc:dd:ee:ff"
cli_id = DHCP6OptClientId(duid=pkt[DHCP6OptClientId].duid)
srv_id = DHCP6OptServerId(duid=DUID_LL(lladdr=srv_duid))
prefix = "2001:db8:{:x}:{:x}::".format(
random.randint(0, 0xffff),
random.randint(0, 0xffff)
)
ia_pd = DHCP6OptIA_PD(T1=12000, T2=13000, iaid=pkt[DHCP6OptIA_PD].iaid,
iapdopt=[
DHCP6OptIAPrefix(preflft=14000, validlft=15000,
prefix=prefix, plen=64)])
# start sniffer to catch incoming request
sniffer = start_sniffer(iface,
stop_filter=lambda pkt: DHCP6_Request in pkt)
sendp(build_reply_headers(pkt) / DHCP6_Advertise(trid=trid) /
cli_id / srv_id / ia_pd, iface=iface, verbose=False)
# wait for request
pkt = wait_for_dhcpv6_pkt(iface, sniffer)
# the packet was indeed a request
assert DHCP6_Request in pkt
# and from the client
assert DHCP6OptClientId in pkt and DUID_LL in pkt[DHCP6OptClientId].duid
assert pkt[DHCP6OptClientId].duid[DUID_LL].lladdr in upstream_hwaddrs
# and it is trying to talk to this server
assert DHCP6OptServerId in pkt and DUID_LL in pkt[DHCP6OptServerId].duid
assert pkt[DHCP6OptServerId].duid[DUID_LL].lladdr == srv_duid
# and is still asking for a prefix delegation
assert DHCP6OptIA_PD in pkt
# reply to request with reply and a prefix provided
trid = pkt[DHCP6_Request].trid
sendp(build_reply_headers(pkt) / DHCP6_Reply(trid=trid) /
cli_id / srv_id / ia_pd, iface=iface, verbose=False)
time.sleep(1)
# check if global address was configured
child.sendline("ifconfig {}".format(get_downstream_netif(child)))
# remove one trailing ':' from prefix just to be safe ;-)
child.expect(r"inet6 addr:\s+{}[0-9a-fA-F:]+\s"
.format(prefix[:-1]))
print("SUCCESS")
if __name__ == "__main__":
sys.exit(run(testfunc, timeout=TIMEOUT, echo=True))