diff --git a/tests/gnrc_dhcpv6_client_6lbr/Kconfig b/tests/gnrc_dhcpv6_client_6lbr/Kconfig new file mode 100644 index 0000000000..a45569e9a3 --- /dev/null +++ b/tests/gnrc_dhcpv6_client_6lbr/Kconfig @@ -0,0 +1,10 @@ +if MODULE_ETHOS +config GNRC_DHCPV6_CLIENT_6LBR_STATIC_ROUTE + default y + depends on MODULE_GNRC_DHCPV6_CLIENT_6LBR && KCONFIG_MODULE_GNRC_DHCPV6 +config GNRC_NETIF_IPV6_ADDRS_NUMOF + # CONFIG_GNRC_DHCPV6_CLIENT_6LBR_STATIC_ROUTE=1 requires one more address + # for `fe80::2`. + default 3 + depends on KCONFIG_MODULE_GNRC_NETIF +endif # MODULE_ETHOS diff --git a/tests/gnrc_dhcpv6_client_6lbr/Makefile b/tests/gnrc_dhcpv6_client_6lbr/Makefile new file mode 100644 index 0000000000..486152ac1c --- /dev/null +++ b/tests/gnrc_dhcpv6_client_6lbr/Makefile @@ -0,0 +1,52 @@ +DEVELHELP := 1 +include $(CURDIR)/../Makefile.tests_common + +export TAP ?= tap0 +GNRC_NETIF_NUMOF := 2 + +USEMODULE += auto_init_gnrc_netif +USEMODULE += gnrc_dhcpv6_client_6lbr +USEMODULE += gnrc_netdev_default +USEMODULE += gnrc_pktdump +USEMODULE += gnrc_sixlowpan_border_router_default +USEMODULE += ps +USEMODULE += shell +USEMODULE += shell_commands + +# use Ethernet as link-layer protocol +ifeq (native,$(BOARD)) + TERMFLAGS += -z [::1]:17754 +else + ETHOS_BAUDRATE ?= 115200 + CFLAGS += -DETHOS_BAUDRATE=$(ETHOS_BAUDRATE) + TERMDEPS += ethos + TERMPROG ?= sudo $(RIOTTOOLS)/ethos/ethos + TERMFLAGS ?= $(TAP) $(PORT) $(ETHOS_BAUDRATE) + STATIC_ROUTES ?= 1 +endif + +# The test requires some setup and to be run as root +# So it cannot currently be run on CI +TEST_ON_CI_BLACKLIST += all + +# As there is an 'app.config' we want to explicitly disable Kconfig by setting +# the variable to empty +SHOULD_RUN_KCONFIG ?= + +include $(RIOTBASE)/Makefile.include + +ifndef CONFIG_GNRC_DHCPV6_CLIENT_6LBR_STATIC_ROUTE +ifeq (1,$(STATIC_ROUTES)) + CFLAGS += -DCONFIG_GNRC_DHCPV6_CLIENT_6LBR_STATIC_ROUTE=1 + # CONFIG_GNRC_DHCPV6_CLIENT_6LBR_STATIC_ROUTE=1 requires one more address for + # `fe80::2`. + CFLAGS += -DCONFIG_GNRC_NETIF_IPV6_ADDRS_NUMOF=3 +endif +endif + +ifeq (,$(filter native,$(BOARD))) +.PHONY: ethos + +ethos: + $(Q)env -u CC -u CFLAGS make -C $(RIOTBASE)/dist/tools/ethos +endif diff --git a/tests/gnrc_dhcpv6_client_6lbr/Makefile.board.dep b/tests/gnrc_dhcpv6_client_6lbr/Makefile.board.dep new file mode 100644 index 0000000000..c20e0e5e31 --- /dev/null +++ b/tests/gnrc_dhcpv6_client_6lbr/Makefile.board.dep @@ -0,0 +1,6 @@ +# Put board specific dependencies here +ifeq (native,$(BOARD)) + USEMODULE += socket_zep +else + USEMODULE += stdio_ethos +endif diff --git a/tests/gnrc_dhcpv6_client_6lbr/Makefile.ci b/tests/gnrc_dhcpv6_client_6lbr/Makefile.ci new file mode 100644 index 0000000000..914efb276a --- /dev/null +++ b/tests/gnrc_dhcpv6_client_6lbr/Makefile.ci @@ -0,0 +1,56 @@ +BOARD_INSUFFICIENT_MEMORY := \ + airfy-beacon \ + arduino-duemilanove \ + arduino-leonardo \ + arduino-mega2560 \ + arduino-nano \ + arduino-uno \ + atmega1284p \ + atmega328p \ + b-l072z-lrwan1 \ + blackpill-128kib \ + blackpill \ + bluepill-128kib \ + bluepill \ + calliope-mini \ + cc2650-launchpad \ + cc2650stk \ + derfmega128 \ + hifive1 \ + hifive1b \ + i-nucleo-lrwan1 \ + lsn50 \ + maple-mini \ + mega-xplained \ + microbit \ + microduino-corerf \ + msb-430 \ + msb-430h \ + nrf51dongle \ + nrf6310 \ + nucleo-f030r8 \ + nucleo-f031k6 \ + nucleo-f042k6 \ + nucleo-f070rb \ + nucleo-f072rb \ + nucleo-f103rb \ + nucleo-f302r8 \ + nucleo-f303k8 \ + nucleo-f334r8 \ + nucleo-l031k6 \ + nucleo-l053r8 \ + nucleo-l073rz \ + opencm904 \ + saml10-xpro \ + saml11-xpro \ + spark-core \ + stm32f030f4-demo \ + stm32f0discovery \ + stm32l0538-disco \ + telosb \ + waspmote-pro \ + wsn430-v1_3b \ + wsn430-v1_4 \ + yunjia-nrf51822 \ + z1 \ + # diff --git a/tests/gnrc_dhcpv6_client_6lbr/README.md b/tests/gnrc_dhcpv6_client_6lbr/README.md new file mode 100644 index 0000000000..4f1cec624f --- /dev/null +++ b/tests/gnrc_dhcpv6_client_6lbr/README.md @@ -0,0 +1,30 @@ +# `gnrc_dhcpv6_client_6lbr` test + +This test utilizes [scapy] to test the DHCPv6 client configuration for a 6LoWPAN +border router. + +To test, compile and flash the application to any board of your liking (since +`ethos` is used to communicate with non-native boards it really doesn't matter +as long as the application fits). + +``` +make flash +``` + +And run the tests using + +``` +sudo make test +``` + +Note that root privileges are required since `scapy` needs to construct Ethernet +frames to properly communicate over the TAP interface. + +The tests succeeds if you see the string `SUCCESS`. + +If any problems are encountered (i.e. if the test prints the sting `FAILED`), +set the echo parameter in the `run()` function at the bottom of the test script +(tests/01-run.py) to `True`. The test script will then offer a more detailed +output. + +[scapy]: https://scapy.readthedocs.io/en/latest/ diff --git a/tests/gnrc_dhcpv6_client_6lbr/main.c b/tests/gnrc_dhcpv6_client_6lbr/main.c new file mode 100644 index 0000000000..e0eeb3ec91 --- /dev/null +++ b/tests/gnrc_dhcpv6_client_6lbr/main.c @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2020 Freie Universität Berlin + * + * This file is subject to the terms and conditions of the GNU Lesser + * General Public License v2.1. See the file LICENSE in the top level + * directory for more details. + */ + +/** + * @{ + * + * @file + * @author Martine Lenders + */ + +#include "shell.h" + +int main(void) +{ + char line_buf[SHELL_DEFAULT_BUFSIZE]; + shell_run(NULL, line_buf, SHELL_DEFAULT_BUFSIZE); + + /* should be never reached */ + return 0; +} + +/** @} */ diff --git a/tests/gnrc_dhcpv6_client_6lbr/tests/01-run.py b/tests/gnrc_dhcpv6_client_6lbr/tests/01-run.py new file mode 100755 index 0000000000..21853385a0 --- /dev/null +++ b/tests/gnrc_dhcpv6_client_6lbr/tests/01-run.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2018 Freie Universität Berlin +# +# This file is subject to the terms and conditions of the GNU Lesser +# General Public License v2.1. See the file LICENSE in the top level +# directory for more details. + +import os +import pexpect +import random +import sys +import time + +from scapy.all import AsyncSniffer, sendp, Ether, IPv6, UDP +from scapy.all import DHCP6_Solicit, DHCP6_Advertise, DHCP6_Request, DHCP6_Reply +from scapy.all import DHCP6OptClientId, DHCP6OptServerId, DHCP6OptIA_PD +from scapy.all import DUID_LL, DHCP6OptIAPrefix +from testrunner import run + + +TIMEOUT = 1 + + +def get_upstream_netif(child): + child.sendline("ifconfig") + child.sendline("help") # workaround to spot end of ifconfig output + candidate = None + while True: # Search for an interface that does _not_ contain 6LO flag + if candidate is None: + child.expect(r"Iface\s+([^\s]+)\s+") + candidate = child.match.group(1) + res = child.expect([r"\b6LO\b", r"Iface\s+([^\s]+)\s+", "Command", + pexpect.TIMEOUT], timeout=.2) + if res > 0: + break + candidate = None + # wait for a line in "help" + child.expect("reboot") + return candidate + + +def get_downstream_netif(child): + child.sendline("ifconfig") + child.sendline("help") # workaround to spot end of ifconfig output + candidate = None + while True: # Search for an interface that does _not_ contain 6LO flag + if candidate is None: + child.expect(r"Iface\s+([^\s]+)\s+") + candidate = child.match.group(1) + res = child.expect([r"\b6LO\b", r"Iface\s+([^\s]+)\s+", "Command", + pexpect.TIMEOUT], timeout=.2) + if res == 0: + break + elif res == 1: + candidate = child.match.group(1) + elif res == 2: + break + else: + candidate = None + # wait for a line in "help" + child.expect("reboot") + return candidate + + +def get_hwaddrs(child, netif): + hwaddrs = [] + child.sendline("ifconfig {}".format(netif)) + child.expect(r"HWaddr:\s+(([A-Fa-f0-9]{2}:?)+)\s") + hwaddrs.append(child.match.group(1).lower()) + if len(hwaddrs[0]) == 5: # short address + res = child.expect([pexpect.TIMEOUT, + r"Long HWaddr:\s+(([A-Fa-f0-9]{2}:?)+)\s"]) + if res > 0: + hwaddrs.append(child.match.group(1).lower()) + return hwaddrs + + +def start_sniffer(iface, count=None, stop_filter=None): + sniffer = AsyncSniffer( + iface=iface, + filter="udp and dst port 547", + count=count, + stop_filter=stop_filter, + ) + sniffer.start() + return sniffer + + +def wait_for_dhcpv6_pkt(iface, sniffer=None, timeout=5): + if sniffer is None: + sniffer = start_sniffer(iface, count=1) + sniffer.join(timeout=timeout) + if sniffer.results is None: + raise TimeoutError("Sniffing for DHCPv6 traffic timed out") + return [p for p in sniffer.results + # filter out packets only belonging to stop_filter if it existed + if sniffer.kwargs.get("stop_filter") is None or + sniffer.kwargs["stop_filter"](p)][-1] + + +def build_reply_headers(pkt): + src_ether = pkt[Ether].src + src_ip = pkt[IPv6].src + sport = pkt[UDP].sport + dport = pkt[UDP].dport + return Ether(dst=src_ether) / IPv6(dst=src_ip) / \ + UDP(sport=dport, dport=sport) + + +def testfunc(child): + iface = os.environ["TAP"] + + pkt = wait_for_dhcpv6_pkt(iface) + # the packet was a solicit + assert DHCP6_Solicit in pkt + # check if the sender is the upstream interface of the node + upstream_netif = get_upstream_netif(child) + print(upstream_netif) + upstream_hwaddrs = get_hwaddrs(child, upstream_netif) + assert DHCP6OptClientId in pkt and DUID_LL in pkt[DHCP6OptClientId].duid + assert pkt[DHCP6OptClientId].duid[DUID_LL].lladdr in upstream_hwaddrs + # and it is asking for a prefix delegation + assert DHCP6OptIA_PD in pkt + + # reply to solicit with advertise and a prefix provided + trid = pkt[DHCP6_Solicit].trid + srv_duid = "aa:bb:cc:dd:ee:ff" + cli_id = DHCP6OptClientId(duid=pkt[DHCP6OptClientId].duid) + srv_id = DHCP6OptServerId(duid=DUID_LL(lladdr=srv_duid)) + prefix = "2001:db8:{:x}:{:x}::".format( + random.randint(0, 0xffff), + random.randint(0, 0xffff) + ) + ia_pd = DHCP6OptIA_PD(T1=12000, T2=13000, iaid=pkt[DHCP6OptIA_PD].iaid, + iapdopt=[ + DHCP6OptIAPrefix(preflft=14000, validlft=15000, + prefix=prefix, plen=64)]) + # start sniffer to catch incoming request + sniffer = start_sniffer(iface, + stop_filter=lambda pkt: DHCP6_Request in pkt) + sendp(build_reply_headers(pkt) / DHCP6_Advertise(trid=trid) / + cli_id / srv_id / ia_pd, iface=iface, verbose=False) + + # wait for request + pkt = wait_for_dhcpv6_pkt(iface, sniffer) + # the packet was indeed a request + assert DHCP6_Request in pkt + # and from the client + assert DHCP6OptClientId in pkt and DUID_LL in pkt[DHCP6OptClientId].duid + assert pkt[DHCP6OptClientId].duid[DUID_LL].lladdr in upstream_hwaddrs + # and it is trying to talk to this server + assert DHCP6OptServerId in pkt and DUID_LL in pkt[DHCP6OptServerId].duid + assert pkt[DHCP6OptServerId].duid[DUID_LL].lladdr == srv_duid + # and is still asking for a prefix delegation + assert DHCP6OptIA_PD in pkt + + # reply to request with reply and a prefix provided + trid = pkt[DHCP6_Request].trid + sendp(build_reply_headers(pkt) / DHCP6_Reply(trid=trid) / + cli_id / srv_id / ia_pd, iface=iface, verbose=False) + time.sleep(1) + + # check if global address was configured + child.sendline("ifconfig {}".format(get_downstream_netif(child))) + # remove one trailing ':' from prefix just to be safe ;-) + child.expect(r"inet6 addr:\s+{}[0-9a-fA-F:]+\s" + .format(prefix[:-1])) + print("SUCCESS") + + +if __name__ == "__main__": + sys.exit(run(testfunc, timeout=TIMEOUT, echo=True))