1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2025-12-24 22:13:52 +01:00

Merge pull request #12440 from miri64/tests/fix/gnrc_rpl_srh

tests/gnrc_rpl_srh: fix test assumption
This commit is contained in:
Cenk Gündoğan 2019-10-14 14:43:30 +02:00 committed by GitHub
commit b4cb32a8ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 28 additions and 22 deletions

View File

@ -99,12 +99,10 @@ int gnrc_rpl_srh_process(ipv6_hdr_t *ipv6, gnrc_rpl_srh_t *rh, void **err_ptr)
if (ipv6_addr_is_multicast(&ipv6->dst)) {
DEBUG("RPL SRH: found a multicast destination address - discard\n");
*err_ptr = &ipv6->dst;
return GNRC_IPV6_EXT_RH_ERROR;
}
if (ipv6_addr_is_multicast(&addr)) {
DEBUG("RPL SRH: found a multicast addres in next address - discard\n");
*err_ptr = current_address;
return GNRC_IPV6_EXT_RH_ERROR;
}

View File

@ -85,7 +85,7 @@ static void test_rpl_srh_dst_multicast(void)
static const ipv6_addr_t mcast = IPV6_MCAST_ADDR;
gnrc_rpl_srh_t *srh;
uint8_t *vec;
void *err_ptr;
void *err_ptr = NULL;
int res;
_init_hdrs(&srh, &vec, &mcast);
@ -96,7 +96,7 @@ static void test_rpl_srh_dst_multicast(void)
res = gnrc_rpl_srh_process(&hdr, srh, &err_ptr);
TEST_ASSERT_EQUAL_INT(res, GNRC_IPV6_EXT_RH_ERROR);
TEST_ASSERT((&hdr.dst) == err_ptr);
TEST_ASSERT_NULL(err_ptr);
}
static void test_rpl_srh_route_multicast(void)
@ -106,7 +106,7 @@ static void test_rpl_srh_route_multicast(void)
static const ipv6_addr_t dst = IPV6_DST;
gnrc_rpl_srh_t *srh;
uint8_t *vec;
void *err_ptr;
void *err_ptr = NULL;
int res;
_init_hdrs(&srh, &vec, &dst);
@ -117,7 +117,7 @@ static void test_rpl_srh_route_multicast(void)
res = gnrc_rpl_srh_process(&hdr, srh, &err_ptr);
TEST_ASSERT_EQUAL_INT(res, GNRC_IPV6_EXT_RH_ERROR);
TEST_ASSERT(vec == err_ptr);
TEST_ASSERT_NULL(err_ptr);
}
static void test_rpl_srh_too_many_seg_left(void)

View File

@ -13,7 +13,7 @@ import sys
import subprocess
import threading
from scapy.all import Ether, IPv6, \
from scapy.all import Ether, IPv6, UDP, \
IPv6ExtHdrHopByHop, IPv6ExtHdrDestOpt, \
IPv6ExtHdrFragment, IPv6ExtHdrRouting, \
ICMPv6ParamProblem, ICMPv6TimeExceeded, \
@ -194,29 +194,37 @@ def test_seg_left_gt_len_addresses(child, iface, hw_dst, ll_dst, ll_src):
def test_multicast_dst(child, iface, hw_dst, ll_dst, ll_src):
# sniffing for ICMPv6 parameter problem message
sniffer.start_sniff(lambda p: p.haslayer(ICMPv6ParamProblem))
sniffer.start_sniff(lambda p: p.haslayer(ICMPv6ParamProblem) or
(p.haslayer(UDP) and (p[IPv6].dst != "ff02::1")))
# send routing header with multicast destination
sendp(Ether(dst=hw_dst) / IPv6(dst="ff02::1", src=ll_src) /
IPv6ExtHdrRouting(type=3, segleft=1, addresses=["abcd::1"]),
iface=iface, verbose=0)
IPv6ExtHdrRouting(type=3, segleft=1, addresses=["abcd::1"]) /
UDP(dport=2606), iface=iface, verbose=0)
ps = sniffer.wait_for_sniff_results()
p = [p for p in ps if ICMPv6ParamProblem in p]
assert(len(p) > 0)
p = p[0]
assert(p[ICMPv6ParamProblem].code == 0) # erroneous header field encountered
assert(p[ICMPv6ParamProblem].ptr == 24) # IPv6 headers destination field
p = [p for p in ps if (ICMPv6ParamProblem in p) or
((UDP in p) and (p[UDP].dport == 2606) and
(p[IPv6].dst != "ff02::1"))]
# packet should be discarded silently:
# see https://tools.ietf.org/html/rfc6554#section-4.2
assert(len(p) == 0)
pktbuf_empty(child)
def test_multicast_addr(child, iface, hw_dst, ll_dst, ll_src):
# sniffing for ICMPv6 parameter problem message
sniffer.start_sniff(lambda p: p.haslayer(ICMPv6ParamProblem) or
(p.haslayer(UDP) and (p[IPv6].dst != ll_dst)))
# Send routing header with multicast address in its destinations
p = srp1(Ether(dst=hw_dst) / IPv6(dst=ll_dst, src=ll_src) /
IPv6ExtHdrRouting(type=3, segleft=1, addresses=["ff02::1"]),
iface=iface, timeout=1, verbose=0)
assert(p is not None)
assert(ICMPv6ParamProblem in p)
assert(p[ICMPv6ParamProblem].code == 0) # erroneous header field encountered
assert(p[ICMPv6ParamProblem].ptr == 48) # first address in routing header
sendp(Ether(dst=hw_dst) / IPv6(dst=ll_dst, src=ll_src) /
IPv6ExtHdrRouting(type=3, segleft=1, addresses=["abcd::1"]) /
UDP(dport=2606), iface=iface, verbose=0)
ps = sniffer.wait_for_sniff_results()
p = [p for p in ps if (ICMPv6ParamProblem in p) or
((UDP in p) and (p[UDP].dport == 2606) and
(p[IPv6].dst != ll_dst))]
# packet should be discarded silently:
# see https://tools.ietf.org/html/rfc6554#section-4.2
assert(len(p) == 0)
pktbuf_empty(child)