1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2025-12-15 01:23:49 +01:00

gnrc_tcp: refactor tests

This commit is contained in:
Simon Brummer 2021-05-08 14:36:38 +02:00
parent 79ee4fd489
commit 876c24d1cc
13 changed files with 759 additions and 1084 deletions

View File

@ -1,42 +1,7 @@
Test description
==========
The entire GNRC TCP test consists of several test cases. Each test is ran via its own python script
in the tests directory.
1) 01-conn_lifecycle_as_client.py
This test covers TCP connection establishment and teardown with GNRC_TCP acting as tcp client.
2) 02-conn_lifecycle_as_server.py
This test covers TCP connection establishment and teardown with GNRC_TCP acting as tcp server.
3) 03-send_data.py
This test covers sending of a byte stream from GNRC_TCP to the host system.
The amount of data to send is large enough to force GNRC_TCP to split the given stream into
multiple packets.
4) 04-receive_data.py
This test covers receiving of a byte stream from the host system. The received data is
causing window opening and closing as well as data transmission over multiple packets.
5) 05-garbage-pkts.py
This test mostly is a regression test for issues that were found through fuzzing. It uses
`scapy` to interact with the node.
6) 06-receive_data_closed_conn.py
This test covers accessing received data after receiving a FIN packet. If the connection was closed
by the peer, a call to gnrc_tcp_recv must return directly with all currently received data
or zero if there is no data. The function must return immediately despite any given timeout.
7) 07-endpoint_construction.py
This test ensures the correctness of the endpoint construction.
8) 08-return_codes.py
This test tries to trigger all documented return codes from GNRC_TCPs functions.
9) 09-listen_accept_cycle.py
This test verifies that connection establishment via listen and accept can be repeated multiple
times.
The GNRC TCP test test all phases of a tcp connections lifecycle as a server or a client
as well as TCP behavior on incoming malformed packets.
Setup
==========

View File

@ -1,51 +0,0 @@
#!/usr/bin/env python3
# Copyright (C) 2018 Simon Brummer <simon.brummer@posteo.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import os
import sys
import threading
from testrunner import run
from shared_func import TcpServer, generate_port_number, get_host_tap_device, \
get_host_ll_addr, get_riot_if_id, verify_pktbuf_empty, \
sudo_guard
def tcp_server(port, shutdown_event):
with TcpServer(port, shutdown_event):
pass
def testfunc(child):
port = generate_port_number()
shutdown_event = threading.Event()
server_handle = threading.Thread(target=tcp_server, args=(port, shutdown_event))
server_handle.start()
target_addr = get_host_ll_addr(get_host_tap_device()) + '%' + get_riot_if_id(child)
# Setup RIOT Node to connect to host systems TCP Server
child.sendline('gnrc_tcp_tcb_init')
child.sendline('gnrc_tcp_open [{}]:{} 0'.format(target_addr, str(port)))
child.expect_exact('gnrc_tcp_open: returns 0')
# Close connection and verify that pktbuf is cleared
shutdown_event.set()
child.sendline('gnrc_tcp_close')
server_handle.join()
verify_pktbuf_empty(child)
print(os.path.basename(sys.argv[0]) + ': success')
if __name__ == '__main__':
sudo_guard()
sys.exit(run(testfunc, timeout=5, echo=False, traceback=True))

View File

@ -0,0 +1,458 @@
#!/usr/bin/env python3
# Copyright (C) 2021 Simon Brummer <simon.brummer@posteo.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import os
import sys
import random
import pexpect
import base64
from scapy.all import Ether, IPv6, TCP, raw, sendp
from helpers import Runner, RiotTcpServer, RiotTcpClient, HostTcpServer, HostTcpClient, \
generate_port_number, sudo_guard
@Runner(timeout=5)
def test_connection_lifecycle_as_client(child):
""" Open/close a single connection as tcp client """
# Setup Host as server
with HostTcpServer(generate_port_number()) as host_srv:
# Setup Riot as client
with RiotTcpClient(child, host_srv):
# Accept and close connection
host_srv.accept()
host_srv.close()
@Runner(timeout=10)
def test_connection_lifecycle_as_server(child):
""" Open/close a single connection as tcp server """
# Setup RIOT as server
with RiotTcpServer(child, generate_port_number()) as riot_srv:
# Setup Host as client
with HostTcpClient(riot_srv):
# Accept and close connection
riot_srv.accept(timeout_ms=1000)
riot_srv.close()
@Runner(timeout=5)
def test_send_data_from_riot_to_host(child):
""" Send Data from RIOT Node to Host system """
# Setup Host as server
with HostTcpServer(generate_port_number()) as host_srv:
# Setup Riot as client
with RiotTcpClient(child, host_srv) as riot_cli:
# Accept and close connection
host_srv.accept()
# Send Data from RIOT to Host system and verify reception
data = '0123456789' * 200
riot_cli.send(timeout_ms=0, payload_to_send=data)
host_srv.receive(data)
# Teardown connection
host_srv.close()
@Runner(timeout=5)
def test_send_data_from_host_to_riot(child):
""" Send Data from Host system to RIOT node """
# Setup RIOT as server
with RiotTcpServer(child, generate_port_number()) as riot_srv:
# Setup Host as client
with HostTcpClient(riot_srv) as host_cli:
# Accept and close connection
riot_srv.accept(timeout_ms=1000)
# Send Data from Host system to RIOT
data = '0123456789' * 200
host_cli.send(data)
riot_srv.receive(timeout_ms=1000, sent_payload=data)
riot_srv.close()
@Runner(timeout=5)
def test_gnrc_tcp_garbage_packets_short_payload(child):
""" Receive unusually short payload with timeout. Verifies fix for
https://github.com/RIOT-OS/RIOT/issues/11999
"""
# Setup RIOT as server
with RiotTcpServer(child, generate_port_number()) as riot_srv:
# Setup Host as client
with HostTcpClient(riot_srv) as host_cli:
# Accept new connection
riot_srv.accept(timeout_ms=2000)
# Receive 1 byte with timeout, this should block
child.sendline('gnrc_tcp_recv 1000000 1')
child.expect_exact('gnrc_tcp_recv: argc=3, '
'argv[0] = gnrc_tcp_recv, '
'argv[1] = 1000000, argv[2] = 1')
# Send 1 byte from host to RIOT
assert 1 == host_cli.sock.send(b"f")
# Receive 1 byte
child.expect_exact('gnrc_tcp_recv: received 1', timeout=20)
# Close connection
riot_srv.close()
@Runner(timeout=5)
def test_gnrc_tcp_garbage_packets_short_header(child):
""" This test verifies fix malformed tcp header. See
https://github.com/RIOT-OS/RIOT/issues/12086
"""
# Setup RIOT as server
with RiotTcpServer(child, generate_port_number()) as riot_srv:
# Construct HostTcpClient to lookup node properties
host_cli = HostTcpClient(riot_srv)
# Try to accept incoming connection from host system.
child.sendline('gnrc_tcp_accept 2000')
# Build malformed short header with SYN Flag
tcp_hdr = TCP(
dport=int(riot_srv.listen_port), flags="S", sport=2342, seq=1, dataofs=6
)
tcp_hdr = raw(tcp_hdr)[:-2]
sendp(
Ether(dst=riot_srv.mac) / IPv6(src=host_cli.address, dst=riot_srv.address) /
tcp_hdr, iface=host_cli.interface, verbose=0
)
# Establish normal tcp connection from host system to check if RIOT node
# was able to recover from malformed packet
with host_cli:
child.expect_exact('gnrc_tcp_accept: returns 0')
riot_srv.close()
@Runner(timeout=5)
def test_gnrc_tcp_garbage_packets_ack_instead_of_sym(child):
""" This test verfies that sending and ACK instead of a SYN.
doesn't break GNRC_TCP.
"""
# Setup RIOT as server
with RiotTcpServer(child, generate_port_number()) as riot_srv:
# Construct HostTcpClient to lookup node properties
host_cli = HostTcpClient(riot_srv)
# Try to accept incoming connection from host system.
# Use timeout of 15s discarding 1000 packages can take a while on smaller platforms
child.sendline('gnrc_tcp_accept 15000')
# Check if debug output is enabled. Send fewer packets on if it is disabled
# To ensure that the amount of generated output doesn't break the test
debug = child.expect(
[pexpect.TIMEOUT, r'GNRC_TCP: Enter "\S+", File: .+\(\d+\)\s'], timeout=1
)
if debug:
count = 10
else:
count = 1000
# see https://github.com/RIOT-OS/RIOT/pull/12001
provided_data = base64.b64decode("rwsQf2pekYLaU+exUBBwgPDKAAA=")
tcp_hdr = TCP(provided_data)
assert provided_data == raw(tcp_hdr)
# set destination port to application specific port
tcp_hdr.dport = int(riot_srv.listen_port)
sendp(
Ether(dst=riot_srv.mac) / IPv6(src=host_cli.address, dst=riot_srv.address) /
tcp_hdr, iface=host_cli.interface, verbose=0, count=count
)
# check if server actually still works
with host_cli:
child.expect_exact('gnrc_tcp_accept: returns 0')
riot_srv.close()
@Runner(timeout=5, skip=False)
def test_gnrc_tcp_garbage_packets_option_parsing(child):
""" This test verfies that malformed option don't break TCP
doesn't break GNRC_TCP. See: https://github.com/RIOT-OS/RIOT/issues/12086
"""
# Setup RIOT as server
with RiotTcpServer(child, generate_port_number()) as riot_srv:
# Construct HostTcpClient to lookup node properties
host_cli = HostTcpClient(riot_srv)
# Try to accept incoming connection from host system.
child.sendline('gnrc_tcp_accept 2000')
tcp_hdr = TCP(
dport=int(riot_srv.listen_port), flags="S", sport=2342, seq=1, dataofs=6
)
sendp(
Ether(dst=riot_srv.mac) / IPv6(src=host_cli.address, dst=riot_srv.address) /
tcp_hdr / b"\x50\x00\x00\x00", iface=host_cli.interface, verbose=0
)
# check if server actually still works
with host_cli:
child.expect_exact('gnrc_tcp_accept: returns 0')
riot_srv.close()
@Runner(timeout=5)
def test_gnrc_tcp_recv_behavior_on_closed_connection(child):
""" This test ensures that a gnrc_tcp_recv doesn't block if a connection
was closed already.
"""
# Setup Host as server
with HostTcpServer(generate_port_number()) as host_srv:
# Setup Riot as client
with RiotTcpClient(child, host_srv) as riot_cli:
# Accept connection
host_srv.accept()
# Transmit data to RIOT client and close connection from host side
data = 'test_data_'
host_srv.send(payload_to_send=data)
host_srv.close()
# Read half amount of data with huge timeout.
# Expectency: direct with verified test data. Timeout doesn't matter
half_data_len = int(len(data) / 2)
huge_timeout_ms = 1000000000
no_timeout_ms = 0
riot_cli.receive(timeout_ms=huge_timeout_ms, sent_payload=data[:half_data_len])
# Read half amount of data without timeout.
# Expectency: direct return with verified test data
riot_cli.receive(timeout_ms=no_timeout_ms, sent_payload=data[half_data_len:])
# All received data is read, try to read 1 byte with timeout.
# Expectency: fast return despite timeout since the connection is already closed
child.sendline('gnrc_tcp_recv {} 1'.format(huge_timeout_ms))
child.expect_exact('gnrc_tcp_recv: returns 0')
# All received data is read, try to read 1 byte without timeout.
# Expectency: fast return since the connection is already closed
child.sendline('gnrc_tcp_recv {} 1'.format(no_timeout_ms))
child.expect_exact('gnrc_tcp_recv: returns 0')
@Runner(timeout=1)
def test_gnrc_tcp_ep_from_str(child):
""" Verify Endpoint construction from string """
# Build funspec
valid = '[::]'
child.sendline('gnrc_tcp_ep_from_str {}'.format(valid))
child.expect_exact('gnrc_tcp_ep_from_str: returns 0')
child.expect_exact('Family: AF_INET6')
child.expect_exact('Addr: ::')
child.expect_exact('Port: 0')
child.expect_exact('Netif: 0')
# Build unspec with port
valid = '[::]:8080'
child.sendline('gnrc_tcp_ep_from_str {}'.format(valid))
child.expect_exact('gnrc_tcp_ep_from_str: returns 0')
child.expect_exact('Family: AF_INET6')
child.expect_exact('Addr: ::')
child.expect_exact('Port: 8080')
child.expect_exact('Netif: 0')
# Build unspec with interface and port
valid = '[::%5]:8080'
child.sendline('gnrc_tcp_ep_from_str {}'.format(valid))
child.expect_exact('gnrc_tcp_ep_from_str: returns 0')
child.expect_exact('Family: AF_INET6')
child.expect_exact('Addr: ::')
child.expect_exact('Port: 8080')
child.expect_exact('Netif: 5')
# Build addr, interface and port
valid = '[fe80::68bf:dbff:fe05:c35e%5]:8080'
child.sendline('gnrc_tcp_ep_from_str {}'.format(valid))
child.expect_exact('gnrc_tcp_ep_from_str: returns 0')
child.expect_exact('Family: AF_INET6')
child.expect_exact('Addr: fe80::68bf:dbff:fe05:c35e')
child.expect_exact('Port: 8080')
child.expect_exact('Netif: 5')
@Runner(timeout=0.5)
def test_gnrc_tcp_ep_from_str_returns_EINVAL(child):
""" Verify faulty endpoint construction from string return -EINVAL"""
# Error: No brackets
invalid = '::'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
# Error: Too much brackets
invalid = '[:]:]'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
# Error: Swapped bracket order
invalid = ']::['
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
# Error: Empty brackets
invalid = '[]'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
# Error First char before brackets
invalid = 'a[]'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
# Error token but no port
invalid = '[::]:'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
# Error invalid char in port
invalid = '[::]:103f2'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
# Error port out of range
invalid = '[::]:65536'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
# Error interface after bracket
invalid = '[::]%5'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
# Error interface token bur no interface
invalid = '[::%]'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
# Error: invalid interface
invalid = '[::%5a]'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
# Error: address to short
invalid = '[:%5]'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
@Runner(timeout=0.5)
def test_gnrc_tcp_accept_returns_EAGAIN(child):
""" gnrc_tcp_accept must return with -EAGAIN
if no connection is ready and timeout is 0
"""
riot_srv = RiotTcpServer(child, generate_port_number())
riot_srv.listen()
child.sendline('gnrc_tcp_accept 0')
child.expect_exact('gnrc_tcp_accept: returns -EAGAIN')
@Runner(timeout=1.5)
def test_gnrc_tcp_accept_returns_ETIMEDOUT(child):
""" gnrc_tcp_accept must return with -ETIMEDOUT
if no connection is ready and timeout is not 0
"""
riot_srv = RiotTcpServer(child, generate_port_number())
riot_srv.listen()
child.sendline('gnrc_tcp_accept 1000')
child.expect_exact('gnrc_tcp_accept: returns -ETIMEDOUT')
@Runner(timeout=1)
def test_gnrc_tcp_accept_returns_ENOMEM(child):
""" gnrc_tcp_accept must return with -ENOMEM
if all TCBs already handle a connection
"""
with RiotTcpServer(child, generate_port_number()) as riot_srv:
# Establish connection to ensure that all TCBs are in use
with HostTcpClient(riot_srv):
riot_srv.accept(timeout_ms=0)
# Out of memory accept should return immediately despite a huge timeout.
child.sendline('gnrc_tcp_accept 100000000')
child.expect_exact('gnrc_tcp_accept: returns -ENOMEM')
@Runner(timeout=10)
def test_connection_listen_accept_cycle(child, iterations=10):
""" This test verifies gnrc_tcp in a typical server role by
accepting a connection, exchange data, teardown connection, handle the next one
"""
# Setup RIOT Node as server
with RiotTcpServer(child, generate_port_number()) as riot_srv:
# Establish multiple connections iterativly
for i in range(iterations):
print('\n Running listen/accept iteration {}'.format(i), end='')
with HostTcpClient(riot_srv) as host_cli:
# Accept connection from host system
riot_srv.accept(timeout_ms=0)
# Send data from host to RIOT
data = '0123456789'
host_cli.send(payload_to_send=data)
riot_srv.receive(timeout_ms=500, sent_payload=data)
# Send data from RIOT to host
riot_srv.send(timeout_ms=500, payload_to_send=data)
host_cli.receive(sent_payload=data)
# Randomize connection teardown: The connections
# can't be either closed or aborted from either
# side. Regardless the type of the connection teardown
# riot_srv must be able to accept the next connection
# Note: python sockets don't offer abort...
dice_throw = random.randint(0, 3)
if dice_throw == 0:
riot_srv.close()
host_cli.close()
elif dice_throw == 1:
riot_srv.abort()
host_cli.close()
elif dice_throw == 2:
host_cli.close()
riot_srv.close()
elif dice_throw == 3:
host_cli.close()
riot_srv.abort()
if __name__ == '__main__':
sudo_guard(uses_scapy=True)
# Read and run all test functions.
script = sys.modules[__name__]
tests = [getattr(script, t) for t in script.__dict__
if type(getattr(script, t)).__name__ == 'function'
and t.startswith('test_')]
for test in tests:
res = test()
if (res != 0):
sys.exit(res)
print('\n' + os.path.basename(sys.argv[0]) + ': success\n')

View File

@ -1,42 +0,0 @@
#!/usr/bin/env python3
# Copyright (C) 2018 Simon Brummer <simon.brummer@posteo.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import os
import sys
from shared_func import Runner, RiotTcpServer, HostTcpClient, generate_port_number, \
sudo_guard
@Runner(timeout=10)
def test_lifecycle_as_server(child):
""" Open/close a single connection as tcp server """
# Setup RIOT as server
with RiotTcpServer(child, generate_port_number()) as riot_srv:
# Setup Host as client
with HostTcpClient(riot_srv.addr, riot_srv.listen_port):
# Accept and close connection
riot_srv.accept(timeout_ms=1000)
riot_srv.close()
if __name__ == '__main__':
sudo_guard()
# Read and run all test functions.
script = sys.modules[__name__]
tests = [getattr(script, t) for t in script.__dict__
if type(getattr(script, t)).__name__ == 'function'
and t.startswith('test_')]
for test in tests:
res = test()
if (res != 0):
sys.exit(res)
print(os.path.basename(sys.argv[0]) + ': success\n')

View File

@ -1,63 +0,0 @@
#!/usr/bin/env python3
# Copyright (C) 2018 Simon Brummer <simon.brummer@posteo.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import os
import sys
import threading
from testrunner import run
from shared_func import TcpServer, generate_port_number, get_host_tap_device, \
get_host_ll_addr, get_riot_if_id, setup_internal_buffer, \
write_data_to_internal_buffer, verify_pktbuf_empty, \
sudo_guard
def tcp_server(port, shutdown_event, expected_data):
with TcpServer(port, shutdown_event) as tcp_srv:
assert tcp_srv.recv(len(expected_data)) == expected_data
def testfunc(child):
port = generate_port_number()
shutdown_event = threading.Event()
# Try to send 2000 byte from RIOT to the Host System.
data = '0123456789' * 200
data_len = len(data)
# Verify that RIOT Applications internal buffer can hold test data.
assert setup_internal_buffer(child) >= data_len
server_handle = threading.Thread(target=tcp_server, args=(port, shutdown_event, data))
server_handle.start()
target_addr = get_host_ll_addr(get_host_tap_device()) + '%' + get_riot_if_id(child)
# Setup RIOT Node to connect to host systems TCP Server
child.sendline('gnrc_tcp_tcb_init')
child.sendline('gnrc_tcp_open [{}]:{} 0'.format(target_addr, str(port)))
child.expect_exact('gnrc_tcp_open: returns 0')
# Send data from RIOT Node to Linux
write_data_to_internal_buffer(child, data)
child.sendline('gnrc_tcp_send 0')
child.expect_exact('gnrc_tcp_send: sent ' + str(data_len))
# Close connection and verify that pktbuf is cleared
shutdown_event.set()
child.sendline('gnrc_tcp_close')
server_handle.join()
verify_pktbuf_empty(child)
print(os.path.basename(sys.argv[0]) + ': success')
if __name__ == '__main__':
sudo_guard()
sys.exit(run(testfunc, timeout=5, echo=False, traceback=True))

View File

@ -1,65 +0,0 @@
#!/usr/bin/env python3
# Copyright (C) 2018 Simon Brummer <simon.brummer@posteo.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import os
import sys
import threading
from testrunner import run
from shared_func import TcpServer, generate_port_number, get_host_tap_device, \
get_host_ll_addr, get_riot_if_id, setup_internal_buffer, \
read_data_from_internal_buffer, verify_pktbuf_empty, \
sudo_guard
def tcp_server(port, shutdown_event, data):
with TcpServer(port, shutdown_event) as tcp_srv:
tcp_srv.send(data)
def testfunc(child):
port = generate_port_number()
shutdown_event = threading.Event()
# Try to receive 2000 bytes sent from the Host System.
data = '0123456789' * 200
data_len = len(data)
# Verify that RIOT Applications internal buffer can hold test data.
assert setup_internal_buffer(child) >= data_len
server_handle = threading.Thread(target=tcp_server, args=(port, shutdown_event, data))
server_handle.start()
target_addr = get_host_ll_addr(get_host_tap_device()) + '%' + get_riot_if_id(child)
# Setup RIOT Node to connect to Hostsystems TCP Server
child.sendline('gnrc_tcp_tcb_init')
child.sendline('gnrc_tcp_open [{}]:{} 0'.format(target_addr, str(port)))
child.expect_exact('gnrc_tcp_open: returns 0')
# Accept Data sent by the host system
child.sendline('gnrc_tcp_recv 1000000 ' + str(data_len))
child.expect_exact('gnrc_tcp_recv: received ' + str(data_len), timeout=20)
# Close connection and verify that pktbuf is cleared
shutdown_event.set()
child.sendline('gnrc_tcp_close')
server_handle.join()
verify_pktbuf_empty(child)
# Verify received Data
assert read_data_from_internal_buffer(child, data_len) == data
print(os.path.basename(sys.argv[0]) + ': success')
if __name__ == '__main__':
sudo_guard()
sys.exit(run(testfunc, timeout=5, echo=False, traceback=True))

View File

@ -1,160 +0,0 @@
#!/usr/bin/env python3
# Copyright (C) 2019 Freie Universität Berlin
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import base64
import pexpect
import os
import socket
import sys
from scapy.all import Ether, IPv6, TCP, raw, \
sendp
from testrunner import run
from shared_func import sudo_guard, get_host_tap_device, get_host_ll_addr, \
get_riot_if_id, get_riot_l2_addr, get_riot_ll_addr, \
generate_port_number, verify_pktbuf_empty
def testfunc(func):
def runner(child):
tap = get_host_tap_device()
host_ll = get_host_ll_addr(tap)
dst_if = get_riot_if_id(child)
dst_ll = get_riot_ll_addr(child)
dst_l2 = get_riot_l2_addr(child)
port = generate_port_number()
# Setup RIOT Node wait for incoming connections from host system
child.sendline('gnrc_tcp_tcb_init')
child.expect_exact('gnrc_tcp_tcb_init: argc=1, argv[0] = gnrc_tcp_tcb_init')
child.sendline('gnrc_tcp_listen [::]:{}'.format(port))
child.expect(r'gnrc_tcp_listen: argc=2, '
r'argv\[0\] = gnrc_tcp_listen, '
r'argv\[1\] = \[::\]:(\d+)\r\n')
assert int(child.match.group(1)) == port
child.sendline('gnrc_tcp_accept 15000')
try:
print("- {} ".format(func.__name__), end="")
if child.logfile == sys.stdout:
func(child, tap, host_ll, dst_if, dst_l2, dst_ll, port)
print("")
else:
try:
func(child, tap, host_ll, dst_if, dst_l2, dst_ll, port)
print("SUCCESS")
except Exception as e:
print("FAILED")
raise e
finally:
child.sendline('gnrc_tcp_close')
child.sendline('gnrc_tcp_stop_listen')
return runner
@testfunc
def test_short_payload(child, src_if, src_ll, dst_if, dst_l2, dst_ll, dst_port):
# see https://github.com/RIOT-OS/RIOT/issues/11999
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
sock.settimeout(child.timeout)
addr_info = socket.getaddrinfo(dst_ll + '%' + src_if, dst_port,
type=socket.SOCK_STREAM)
sock.connect(addr_info[0][-1])
child.expect_exact('gnrc_tcp_accept: returns 0')
child.sendline('gnrc_tcp_recv 1000000 1')
child.expect_exact('gnrc_tcp_recv: argc=3, '
'argv[0] = gnrc_tcp_recv, '
'argv[1] = 1000000, argv[2] = 1')
assert 1 == sock.send(b"f")
child.expect_exact('gnrc_tcp_recv: received 1', timeout=20)
verify_pktbuf_empty(child)
@testfunc
def test_short_header(child, src_if, src_ll, dst_if, dst_l2, dst_ll, dst_port):
# see https://github.com/RIOT-OS/RIOT/issues/12086
tcp_hdr = TCP(dport=dst_port, flags="S", sport=2342, seq=1, dataofs=6)
# shorten header
tcp_hdr = raw(tcp_hdr)[:-2]
sendp(Ether(dst=dst_l2) / IPv6(src=src_ll, dst=dst_ll) / tcp_hdr,
iface=src_if, verbose=0)
# let server command return to later check packet buffer
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
sock.settimeout(child.timeout)
addr_info = socket.getaddrinfo(dst_ll + '%' + src_if, dst_port,
type=socket.SOCK_STREAM)
sock.connect(addr_info[0][-1])
child.expect_exact('gnrc_tcp_accept: returns 0')
verify_pktbuf_empty(child)
@testfunc
def test_send_ack_instead_of_syn(child, src_if, src_ll,
dst_if, dst_l2, dst_ll, dst_port):
# check if any debug output is generated during `@testfunc` decorator
# execution. If so, send fewer packets to prevent the target node
# from being overwhelmed by packets and output.
debug = child.expect([pexpect.TIMEOUT,
r'GNRC_TCP: Enter "\S+", File: .+\(\d+\)\s'], timeout=1)
if debug:
count = 10
else:
count = 1000
# see https://github.com/RIOT-OS/RIOT/pull/12001
provided_data = base64.b64decode("rwsQf2pekYLaU+exUBBwgPDKAAA=")
tcp_hdr = TCP(provided_data)
assert provided_data == raw(tcp_hdr)
# set destination port to application specific port
tcp_hdr.dport = dst_port
sendp(Ether(dst=dst_l2) / IPv6(src=src_ll, dst=dst_ll) / tcp_hdr,
iface=src_if, verbose=0, count=count)
# check if server actually still works
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
sock.settimeout(child.timeout)
addr_info = socket.getaddrinfo(dst_ll + '%' + src_if, dst_port,
type=socket.SOCK_STREAM)
sock.connect(addr_info[0][-1])
child.expect_exact('gnrc_tcp_accept: returns 0')
verify_pktbuf_empty(child)
@testfunc
def test_option_parsing_term(child, src_if, src_ll,
dst_if, dst_l2, dst_ll, dst_port):
# see https://github.com/RIOT-OS/RIOT/issues/12086
tcp_hdr = TCP(dport=dst_port, flags="S", sport=2342, seq=1, dataofs=6)
sendp(Ether(dst=dst_l2) / IPv6(src=src_ll, dst=dst_ll) / tcp_hdr /
b"\x50\x00\x00\x00", iface=src_if, verbose=0)
# check if server actually still works
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
sock.settimeout(child.timeout)
addr_info = socket.getaddrinfo(dst_ll + '%' + src_if, dst_port,
type=socket.SOCK_STREAM)
sock.connect(addr_info[0][-1])
child.expect_exact('gnrc_tcp_accept: returns 0')
verify_pktbuf_empty(child)
if __name__ == "__main__":
sudo_guard(uses_scapy=True)
script = sys.modules[__name__]
tests = [getattr(script, t) for t in script.__dict__
if type(getattr(script, t)).__name__ == "function"
and t.startswith("test_")]
for test in tests:
res = run(test, timeout=10, echo=False)
if res != 0:
sys.exit(res)
print(os.path.basename(sys.argv[0]) + ": success\n")

View File

@ -1,84 +0,0 @@
#!/usr/bin/env python3
# Copyright (C) 2019 Simon Brummer <simon.brummer@posteo.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import os
import sys
import threading
from testrunner import run
from shared_func import TcpServer, generate_port_number, get_host_tap_device, \
get_host_ll_addr, get_riot_if_id, setup_internal_buffer, \
read_data_from_internal_buffer, verify_pktbuf_empty, \
sudo_guard
def tcp_server(port, shutdown_event, data):
with TcpServer(port, shutdown_event) as tcp_srv:
tcp_srv.send(data)
def testfunc(child):
port = generate_port_number()
shutdown_event = threading.Event()
# Try to receive 10 bytes sent from the Host System.
data = 'test_data_'
data_len = len(data)
assert (data_len % 2) == 0
# Verify that RIOT Applications internal buffer can hold test data.
assert setup_internal_buffer(child) >= data_len
server_handle = threading.Thread(target=tcp_server, args=(port, shutdown_event, data))
server_handle.start()
target_addr = get_host_ll_addr(get_host_tap_device()) + '%' + get_riot_if_id(child)
# Setup RIOT Node to connect to Hostsystems TCP Server
child.sendline('gnrc_tcp_tcb_init')
child.sendline('gnrc_tcp_open [{}]:{} 0'.format(target_addr, str(port)))
child.expect_exact('gnrc_tcp_open: returns 0')
# Initiate connection teardown from test host
shutdown_event.set()
half_data_len = int(data_len / 2)
# Read half the test data with timeout. Verify Buffer contents
child.sendline('gnrc_tcp_recv 1000000 ' + str(half_data_len))
child.expect_exact('gnrc_tcp_recv: received ' + str(half_data_len))
assert read_data_from_internal_buffer(child, half_data_len) == data[:half_data_len]
# Read half the test data without timeout. Verify Buffer contents
child.sendline('gnrc_tcp_recv 0 ' + str(half_data_len))
child.expect_exact('gnrc_tcp_recv: received ' + str(half_data_len))
assert read_data_from_internal_buffer(child, half_data_len) == data[half_data_len:]
# Buffer should have been read entirely and the connection was closed, there can be no new data.
# Reading with a timeout must return 0 not -ETIMEOUT
child.sendline('gnrc_tcp_recv 1000000 ' + str(half_data_len))
child.expect_exact('gnrc_tcp_recv: returns 0')
# Reading without a timeout must return 0 not -EAGAIN.
child.sendline('gnrc_tcp_recv 0 ' + str(half_data_len))
child.expect_exact('gnrc_tcp_recv: returns 0')
# Close connection
child.sendline('gnrc_tcp_close')
server_handle.join()
verify_pktbuf_empty(child)
# Verify received Data
print(os.path.basename(sys.argv[0]) + ': success')
if __name__ == '__main__':
sudo_guard()
sys.exit(run(testfunc, timeout=5, echo=False, traceback=True))

View File

@ -1,155 +0,0 @@
#!/usr/bin/env python3
# Copyright (C) 2020 Simon Brummer <simon.brummer@posteo.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import sys
import os
from testrunner import run
from shared_func import sudo_guard
def test_build_unspec(child):
valid = '[::]'
child.sendline('gnrc_tcp_ep_from_str {}'.format(valid))
child.expect_exact('gnrc_tcp_ep_from_str: returns 0')
child.expect_exact('Family: AF_INET6')
child.expect_exact('Addr: ::')
child.expect_exact('Port: 0')
child.expect_exact('Netif: 0')
def test_build_unspec_with_port(child):
valid = '[::]:8080'
child.sendline('gnrc_tcp_ep_from_str {}'.format(valid))
child.expect_exact('gnrc_tcp_ep_from_str: returns 0')
child.expect_exact('Family: AF_INET6')
child.expect_exact('Addr: ::')
child.expect_exact('Port: 8080')
child.expect_exact('Netif: 0')
def test_build_unspec_with_interface_and_port(child):
valid = '[::%5]:8080'
child.sendline('gnrc_tcp_ep_from_str {}'.format(valid))
child.expect_exact('gnrc_tcp_ep_from_str: returns 0')
child.expect_exact('Family: AF_INET6')
child.expect_exact('Addr: ::')
child.expect_exact('Port: 8080')
child.expect_exact('Netif: 5')
def test_build_addr_with_interface_and_port(child):
valid = '[fe80::68bf:dbff:fe05:c35e%5]:8080'
child.sendline('gnrc_tcp_ep_from_str {}'.format(valid))
child.expect_exact('gnrc_tcp_ep_from_str: returns 0')
child.expect_exact('Family: AF_INET6')
child.expect_exact('Addr: fe80::68bf:dbff:fe05:c35e')
child.expect_exact('Port: 8080')
child.expect_exact('Netif: 5')
def test_einval_no_brackets(child):
invalid = '::'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
def test_einval_much_brackets(child):
invalid = '[:]:]'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
def test_einval_swaped_brackets(child):
invalid = ']::['
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
def test_einval_no_addr_in_brackets(child):
invalid = '[]'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
def test_einval_first_char_no_open_bracket(child):
invalid = 'a[]'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
def test_einval_port_token_but_no_number(child):
invalid = '[::]:'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
def test_einval_port_inval_char(child):
invalid = '[::]:103f2'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
def test_einval_port_out_of_range(child):
invalid = '[::]:65536'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
def test_einval_interface_id_after_bracket(child):
invalid = '[::]%5'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
def test_einval_interface_id_token_but_no_number(child):
invalid = '[::%]'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
def test_einval_interface_id_inval_char(child):
invalid = '[::%5a]'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
def test_einval_interface_addr_to_short(child):
invalid = '[:%5]'
child.sendline('gnrc_tcp_ep_from_str {}'.format(invalid))
child.expect_exact('gnrc_tcp_ep_from_str: returns -EINVAL')
def main(child):
# Read and run all test functions.
script = sys.modules[__name__]
tests = [getattr(script, t) for t in script.__dict__
if type(getattr(script, t)).__name__ == "function"
and t.startswith("test_")]
res = 0
for test in tests:
try:
test(child)
print('- {} SUCCESS'.format(test.__name__))
except Exception:
res = -1
print('- {} FAILED'.format(test.__name__))
return res
if __name__ == '__main__':
sudo_guard()
res = run(main, timeout=0.5, echo=False)
if res != 0:
sys.exit(res)
print(os.path.basename(sys.argv[0]) + ": success\n")

View File

@ -1,69 +0,0 @@
#!/usr/bin/env python3
# Copyright (C) 2021 Simon Brummer <simon.brummer@posteo.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import sys
import os
from shared_func import Runner, RiotTcpServer, HostTcpClient, generate_port_number, \
sudo_guard
@Runner(timeout=0.5)
def test_gnrc_tcp_accept_returns_EAGAIN(child):
""" gnrc_tcp_accept must return with -EAGAIN
if no connection is ready and timeout is 0
"""
riot_srv = RiotTcpServer(child, generate_port_number())
riot_srv.listen()
child.sendline('gnrc_tcp_accept 0')
child.expect_exact('gnrc_tcp_accept: returns -EAGAIN')
@Runner(timeout=1.5)
def test_gnrc_tcp_accept_returns_ETIMEDOUT(child):
""" gnrc_tcp_accept must return with -ETIMEDOUT
if no connection is ready and timeout is not 0
"""
riot_srv = RiotTcpServer(child, generate_port_number())
riot_srv.listen()
child.sendline('gnrc_tcp_accept 1000')
child.expect_exact('gnrc_tcp_accept: returns -ETIMEDOUT')
@Runner(timeout=1)
def test_gnrc_tcp_accept_returns_ENOMEM(child):
""" gnrc_tcp_accept must return with -ENOMEM
if all TCBs already handle a connection
"""
with RiotTcpServer(child, generate_port_number()) as riot_srv:
# Establish connection to ensure that all TCBs are in use
with HostTcpClient(riot_srv.addr, riot_srv.listen_port):
riot_srv.accept(timeout_ms=0)
# Faulty accept should return immediately despite a huge timeout.
child.sendline('gnrc_tcp_accept 100000000')
child.expect_exact('gnrc_tcp_accept: returns -ENOMEM')
if __name__ == '__main__':
sudo_guard()
# Read and run all test functions.
script = sys.modules[__name__]
tests = [getattr(script, t) for t in script.__dict__
if type(getattr(script, t)).__name__ == 'function'
and t.startswith('test_')]
for test in tests:
res = test()
if (res != 0):
sys.exit(res)
print(os.path.basename(sys.argv[0]) + ': success\n')

View File

@ -1,78 +0,0 @@
#!/usr/bin/env python3
# Copyright (C) 2021 Simon Brummer <simon.brummer@posteo.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import sys
import os
import random
from shared_func import Runner, RiotTcpServer, HostTcpClient, generate_port_number, \
sudo_guard
@Runner(timeout=10)
def test_listen_accept_cycle(child, iterations=10):
""" This test verifies gnrc_tcp in a typical server role by
accepting a connection, exchange data, teardown connection, handle the next one
"""
# Setup RIOT Node as server
with RiotTcpServer(child, generate_port_number()) as riot_srv:
# Establish multiple connections iterativly
for i in range(iterations):
print(' Running iteration {}'.format(i))
with HostTcpClient(riot_srv.addr, riot_srv.listen_port) as host_cli:
# Accept connection from host system
riot_srv.accept(timeout_ms=0)
# Send data from host to RIOT
data = '0123456789'
host_cli.send(payload_to_send=data)
riot_srv.receive(timeout_ms=500, sent_payload=data)
# Send data from RIOT to host
riot_srv.send(timeout_ms=500, payload_to_send=data)
host_cli.receive(sent_payload=data)
# Randomize connection teardown: The connections
# can't be either closed or aborted from either
# side. Regardless type of the connection teardown
# riot_srv must be able to accept the next connection
# Note: python sockets don't offer abort...
dice_throw = random.randint(0, 3)
if dice_throw == 0:
riot_srv.close()
host_cli.close()
elif dice_throw == 1:
riot_srv.abort()
host_cli.close()
elif dice_throw == 2:
host_cli.close()
riot_srv.close()
elif dice_throw == 3:
host_cli.close()
riot_srv.abort()
if __name__ == '__main__':
sudo_guard()
# Read and run all test functions.
script = sys.modules[__name__]
tests = [getattr(script, t) for t in script.__dict__
if type(getattr(script, t)).__name__ == 'function'
and t.startswith('test_')]
for test in tests:
res = test()
if (res != 0):
sys.exit(res)
print(os.path.basename(sys.argv[0]) + ': success\n')

View File

@ -0,0 +1,299 @@
#!/usr/bin/env python3
# Copyright (C) 2018 Simon Brummer <simon.brummer@posteo.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import sys
import os
import re
import socket
import random
import testrunner
class Runner:
def __init__(self, timeout, echo=False, skip=False):
self.timeout = timeout
self.echo = echo
self.skip = skip
def __call__(self, fn):
if self.skip:
print('\n- "{}": SKIPPED'.format(fn.__name__), end='')
return 0
res = -1
try:
res = testrunner.run(fn, self.timeout, self.echo)
finally:
if res == 0:
print('- "{}": SUCCESS'.format(fn.__name__), end='')
else:
print('- "{}": FAILED'.format(fn.__name__), end='')
return res
class _HostTcpNode:
def __init__(self):
self.opened = False
self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.interface = self._get_interface()
self.address = self._get_ip_address(self.interface)
def send(self, payload_to_send):
self.sock.send(payload_to_send.encode('utf-8'))
def receive(self, sent_payload):
total_bytes = len(sent_payload)
assert self.sock.recv(total_bytes, socket.MSG_WAITALL).decode('utf-8') == sent_payload
def close(self):
self.sock.close()
self.opened = False
def _get_interface(self):
# Check if given tap device is part of a network bridge
# if so use bridged interface instead of given tap device
tap = os.environ["TAPDEV"]
result = os.popen('bridge link show dev {}'.format(tap))
bridge = re.search('master (.*) state', result.read())
return bridge.group(1).strip() if bridge else tap
def _get_ip_address(self, interface):
result = os.popen('ip addr show dev ' + interface + ' scope link')
return re.search('inet6 (.*)/64', result.read()).group(1).strip()
class HostTcpServer(_HostTcpNode):
def __init__(self, listen_port):
super().__init__()
self.listening = False
self.listen_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.listen_port = listen_port
def __enter__(self):
if not self.listening:
self.listen()
return self
def __exit__(self, _1, _2, _3):
if self.listening:
self.stop_listen()
def listen(self):
self.listen_sock.bind(('::', self.listen_port))
self.listen_sock.listen(1)
self.listening = True
def accept(self):
self.sock, _ = self.listen_sock.accept()
def stop_listen(self):
self.listen_sock.close()
self.listening = False
class HostTcpClient(_HostTcpNode):
def __init__(self, target):
super().__init__()
self.target_addr = str(target.address)
self.target_port = str(target.listen_port)
def __enter__(self):
if not self.opened:
self.open()
return self
def __exit__(self, _1, _2, _3):
if self.opened:
self.close()
def open(self):
addrinfo = socket.getaddrinfo(
self.target_addr + '%' + self.interface,
self.target_port,
type=socket.SOCK_STREAM
)
self.sock.connect(addrinfo[0][-1])
self.opened = True
class _RiotTcpNode:
def __init__(self, child):
self.child = child
self.opened = False
self.interface = self._get_interface()
self.address = self._get_ip_address()
self.mac = self._get_mac_address()
def tcb_init(self):
self.child.sendline('gnrc_tcp_tcb_init')
self.child.expect_exact('gnrc_tcp_tcb_init: returns')
def send(self, timeout_ms, payload_to_send):
total_bytes = len(payload_to_send)
# Verify that internal buffer can hold the given amount of data
assert self._setup_internal_buffer() >= total_bytes
# Write data to RIOT nodes internal buffer
self._write_data_to_internal_buffer(payload_to_send)
# Send buffer contents via tcp
self.child.sendline('gnrc_tcp_send {}'.format(str(timeout_ms)))
self.child.expect_exact('gnrc_tcp_send: sent {}'.format(str(total_bytes)))
# Verify that packet buffer is empty
self._verify_pktbuf_empty()
def receive(self, timeout_ms, sent_payload):
total_bytes = len(sent_payload)
# Verify that internal Buffer can hold the test data
assert self._setup_internal_buffer() >= total_bytes
# Receive Data
self.child.sendline('gnrc_tcp_recv {} {}'.format(timeout_ms, str(total_bytes)))
self.child.expect_exact('gnrc_tcp_recv: received ' + str(total_bytes), timeout=20)
# Readout internal buffer content of RIOT Note
assert self._read_data_from_internal_buffer(total_bytes) == sent_payload
# Verify that packet buffer is empty
self._verify_pktbuf_empty()
def close(self):
self.child.sendline('gnrc_tcp_close')
self.child.expect_exact('gnrc_tcp_close: returns')
self._verify_pktbuf_empty()
self.opened = False
def abort(self):
self.child.sendline('gnrc_tcp_abort')
self.child.expect_exact('gnrc_tcp_abort: returns')
self._verify_pktbuf_empty()
self.opened = False
def _get_interface(self):
self.child.sendline('ifconfig')
self.child.expect(r'Iface\s+(\d+)\s')
return self.child.match.group(1).strip()
def _get_ip_address(self):
self.child.sendline('ifconfig')
self.child.expect(r'(fe80:[0-9a-f:]+)\s')
return self.child.match.group(1).strip()
def _get_mac_address(self):
self.child.sendline('ifconfig')
self.child.expect(r'HWaddr: ([A-F-a-f0-9:]+)\s')
return self.child.match.group(1)
def _setup_internal_buffer(self):
self.child.sendline('buffer_init')
self.child.sendline('buffer_get_max_size')
self.child.expect(r'returns (\d+)\s')
return int(self.child.match.group(1).strip())
def _write_data_to_internal_buffer(self, data):
# This is a workaround to avoid the restrictions of the RIOT Shell
# Buffersize by filling an internal buffer piece by piece.
# The internal buffers current content is via tcp
# by calling gnrc_tcp_send.
CHUNK_SIZE = 100
for i in range(0, len(data), CHUNK_SIZE):
self.child.sendline('buffer_write ' + str(i) + ' ' + data[i: CHUNK_SIZE + i])
def _read_data_from_internal_buffer(self, bytes_to_read):
self.child.sendline('buffer_read 0 ' + str(bytes_to_read))
self.child.expect('<begin>(.*)<end>')
return self.child.match.group(1)
def _verify_pktbuf_empty(self):
self.child.sendline('pktbuf')
self.child.expect(r'first byte: (0x[0-9a-fA-F]+).*\(size: (\d+)\)')
pktbuf_addr = self.child.match.group(1)
pktbuf_size = self.child.match.group(2)
self.child.expect(r'~ unused: {} \(next: (\(nil\)|0), size: {}\) ~'.format(
pktbuf_addr, pktbuf_size)
)
class RiotTcpServer(_RiotTcpNode):
def __init__(self, child, listen_port):
super().__init__(child)
self.listening = False
self.listen_port = str(listen_port)
self.tcb_init()
def __enter__(self):
if not self.listening:
self.listen()
return self
def __exit__(self, _1, _2, _3):
if self.listening:
self.stop_listen()
def listen(self):
self.child.sendline('gnrc_tcp_listen [::]:{}'.format(self.listen_port))
self.child.expect_exact('gnrc_tcp_listen: returns 0')
self.listening = True
def accept(self, timeout_ms):
self.child.sendline('gnrc_tcp_accept {}'.format(str(timeout_ms)))
self.child.expect_exact('gnrc_tcp_accept: returns 0')
self.opened = True
def stop_listen(self):
self.child.sendline('gnrc_tcp_stop_listen')
self.child.expect_exact('gnrc_tcp_stop_listen: returns')
self._verify_pktbuf_empty()
self.listening = False
class RiotTcpClient(_RiotTcpNode):
def __init__(self, child, target):
super().__init__(child)
self.target_addr = target.address + '%' + self.interface
self.target_port = str(target.listen_port)
self.tcb_init()
def __enter__(self):
if not self.opened:
self.open()
return self
def __exit__(self, _1, _2, _3):
if self.opened:
self.close()
def open(self):
self.child.sendline('gnrc_tcp_open [{}]:{} 0'.format(self.target_addr, self.target_port))
self.child.expect_exact('gnrc_tcp_open: returns 0')
self.opened = True
def generate_port_number():
return random.randint(1024, 65535)
def sudo_guard(uses_scapy=False):
sudo_required = uses_scapy or (os.environ.get("BOARD", "") != "native")
if sudo_required and os.geteuid() != 0:
print("\x1b[1;31mThis test requires root privileges.\n"
"It uses `./dist/tools/ethos/start_networking.sh` as term" +
(" and it's constructing and sending Ethernet frames."
if uses_scapy else "") + "\x1b[0m\n",
file=sys.stderr)
sys.exit(1)

View File

@ -1,280 +0,0 @@
#!/usr/bin/env python3
# Copyright (C) 2018 Simon Brummer <simon.brummer@posteo.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import sys
import os
import re
import socket
import random
import testrunner
class Runner:
def __init__(self, timeout, echo=False, skip=False):
self.timeout = timeout
self.echo = echo
self.skip = skip
def __call__(self, fn):
if self.skip:
print('- Test "{}": SKIPPED'.format(fn.__name__))
return 0
res = -1
try:
res = testrunner.run(fn, self.timeout, self.echo)
finally:
if res == 0:
print('- Test "{}": SUCCESS'.format(fn.__name__))
else:
print('- Test "{}": FAILED'.format(fn.__name__))
return res
class TcpServer:
def __init__(self, port, shutdown_event):
self._port = port
self._shutdown = shutdown_event
def __enter__(self):
self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind(('::', self._port))
self.sock.listen(1)
self.conn, self.addr = self.sock.accept()
return self
def __exit__(self, exc, exc_val, exc_trace):
self._shutdown.wait()
self.conn.close()
self.sock.close()
def send(self, data):
self.conn.send(data.encode('utf-8'))
def recv(self, number_of_bytes):
return self.conn.recv(number_of_bytes, socket.MSG_WAITALL).decode('utf-8')
class HostTcpNode:
def __init__(self):
self.opened = False
self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.interface = get_host_tap_device()
def send(self, payload_to_send):
self.sock.send(payload_to_send.encode('utf-8'))
def receive(self, sent_payload):
total_bytes = len(sent_payload)
assert self.sock.recv(total_bytes, socket.MSG_WAITALL).decode('utf-8') == sent_payload
def close(self):
self.sock.close()
self.opened = False
class HostTcpClient(HostTcpNode):
def __init__(self, target_addr, target_port):
super().__init__()
self.target_addr = target_addr
self.target_port = target_port
def __enter__(self):
if not self.opened:
self.open()
return self
def __exit__(self, _1, _2, _3):
if self.opened:
self.close()
def open(self):
addrinfo = socket.getaddrinfo(
self.target_addr + '%' + self.interface,
self.target_port,
type=socket.SOCK_STREAM
)
self.sock.connect(addrinfo[0][-1])
self.opened = True
class RiotTcpNode:
def __init__(self, child):
self.child = child
self.opened = False
# Query local address of RIOT Node
self.addr = get_riot_ll_addr(self.child)
def tcb_init(self):
self.child.sendline('gnrc_tcp_tcb_init')
self.child.expect_exact('gnrc_tcp_tcb_init: returns')
def send(self, timeout_ms, payload_to_send):
total_bytes = len(payload_to_send)
# Verify that internal buffer can hold the given amount of data
assert setup_internal_buffer(self.child) >= total_bytes
# Write data to RIOT nodes internal buffer
write_data_to_internal_buffer(self.child, payload_to_send)
# Send buffer contents via tcp
self.child.sendline('gnrc_tcp_send {}'.format(str(timeout_ms)))
self.child.expect_exact('gnrc_tcp_send: sent {}'.format(str(total_bytes)))
# Verify that packet buffer is empty
verify_pktbuf_empty(self.child)
def receive(self, timeout_ms, sent_payload):
total_bytes = len(sent_payload)
# Verify that internal Buffer can hold the test data
assert setup_internal_buffer(self.child) >= total_bytes
# Receive Data
self.child.sendline('gnrc_tcp_recv {} {}'.format(timeout_ms, str(total_bytes)))
self.child.expect_exact('gnrc_tcp_recv: received ' + str(total_bytes), timeout=20)
# Readout internal buffer content of RIOT Note
assert read_data_from_internal_buffer(self.child, total_bytes) == sent_payload
# Verify that packet buffer is empty
verify_pktbuf_empty(self.child)
def close(self):
self.child.sendline('gnrc_tcp_close')
self.child.expect_exact('gnrc_tcp_close: returns')
verify_pktbuf_empty(self.child)
self.opened = False
def abort(self):
self.child.sendline('gnrc_tcp_abort')
self.child.expect_exact('gnrc_tcp_abort: returns')
verify_pktbuf_empty(self.child)
self.opened = False
class RiotTcpServer(RiotTcpNode):
def __init__(self, child, listen_port, listen_addr='[::]'):
super().__init__(child)
self.listening = False
self.listen_port = str(listen_port)
self.listen_addr = str(listen_addr)
self.tcb_init()
def __enter__(self):
if not self.listening:
self.listen()
self.listening = True
return self
def __exit__(self, _1, _2, _3):
if self.listening:
self.stop_listen()
self.listening = False
def listen(self):
self.child.sendline('gnrc_tcp_listen {}:{}'.format(self.listen_addr, self.listen_port))
self.child.expect_exact('gnrc_tcp_listen: returns 0')
def accept(self, timeout_ms):
self.child.sendline('gnrc_tcp_accept {}'.format(str(timeout_ms)))
self.child.expect_exact('gnrc_tcp_accept: returns 0')
self.opened = True
def stop_listen(self):
self.child.sendline('gnrc_tcp_stop_listen')
self.child.expect_exact('gnrc_tcp_stop_listen: returns')
verify_pktbuf_empty(self.child)
def generate_port_number():
return random.randint(1024, 65535)
def get_host_tap_device():
# Check if given tap device is part of a network bridge
# if so use bridged interface instead of given tap device
tap = os.environ["TAPDEV"]
result = os.popen('bridge link show dev {}'.format(tap))
bridge = re.search('master (.*) state', result.read())
return bridge.group(1).strip() if bridge else tap
def get_host_ll_addr(interface):
result = os.popen('ip addr show dev ' + interface + ' scope link')
return re.search('inet6 (.*)/64', result.read()).group(1).strip()
def get_riot_l2_addr(child):
child.sendline('ifconfig')
child.expect(r'HWaddr: ([A-F-a-f0-9:]+)\s')
return child.match.group(1)
def get_riot_ll_addr(child):
child.sendline('ifconfig')
child.expect(r'(fe80:[0-9a-f:]+)\s')
return child.match.group(1).strip()
def get_riot_if_id(child):
child.sendline('ifconfig')
child.expect(r'Iface\s+(\d+)\s')
return child.match.group(1).strip()
def setup_internal_buffer(child):
child.sendline('buffer_init')
child.sendline('buffer_get_max_size')
child.expect(r'returns (\d+)\s')
return int(child.match.group(1).strip())
def write_data_to_internal_buffer(child, data):
# This is a workaround to avoid the restrictions of the RIOT Shell Buffersize by
# filling an internal buffer piece by piece. The internal buffers current content is via tcp
# by calling gnrc_tcp_send.
CHUNK_SIZE = 100
for i in range(0, len(data), CHUNK_SIZE):
child.sendline('buffer_write ' + str(i) + ' ' + data[i: CHUNK_SIZE + i])
def read_data_from_internal_buffer(child, bytes_to_read):
child.sendline('buffer_read 0 ' + str(bytes_to_read))
child.expect('<begin>(.*)<end>')
return child.match.group(1)
def verify_pktbuf_empty(child):
child.sendline('pktbuf')
child.expect(r'first byte: (0x[0-9a-fA-F]+).*\(size: (\d+)\)')
pktbuf_addr = child.match.group(1)
pktbuf_size = child.match.group(2)
child.expect(r'~ unused: {} \(next: (\(nil\)|0), size: {}\) ~'.format(pktbuf_addr, pktbuf_size))
def sudo_guard(uses_scapy=False):
sudo_required = uses_scapy or (os.environ.get("BOARD", "") != "native")
if sudo_required and os.geteuid() != 0:
print("\x1b[1;31mThis test requires root privileges.\n"
"It uses `./dist/tools/ethos/start_networking.sh` as term" +
(" and it's constructing and sending Ethernet frames."
if uses_scapy else "") + "\x1b[0m\n",
file=sys.stderr)
sys.exit(1)