1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2025-12-27 07:21:18 +01:00

tests/lwip: add regression test for multiple simultaneous connect case

This commit is contained in:
Martine Lenders 2020-02-21 21:48:43 +01:00 committed by Martine S. Lenders
parent a7ef50636e
commit 37df4ea7e2
No known key found for this signature in database
GPG Key ID: CCD317364F63286F

View File

@ -14,6 +14,7 @@ import subprocess
import time
import types
import pexpect
import socket
DEFAULT_TIMEOUT = 5
@ -268,6 +269,53 @@ def test_tcpv6_send(board_group, application, env=None):
client.expect_exact(u"could not send")
def test_tcpv6_multiconnect(board_group, application, env=None):
if any(b.name != "native" for b in board_group.boards):
# run test only with native
print("SKIP_TEST INFO found non-native board")
return
env_client = os.environ.copy()
if env is not None:
env_client.update(env)
env_client.update(board_group.boards[0].to_env())
env_server = os.environ.copy()
if env is not None:
env_server.update(env)
env_server.update(board_group.boards[1].to_env())
with pexpect.spawnu("make", ["-C", application, "term"], env=env_client,
timeout=DEFAULT_TIMEOUT) as client, \
pexpect.spawnu("make", ["-C", application, "term"], env=env_server,
timeout=DEFAULT_TIMEOUT) as server:
port = random.randint(0x0000, 0xffff)
server_ip = get_ipv6_address(server)
client_ip = get_ipv6_address(client)
try:
connect_addr = socket.getaddrinfo(
"%s%%tapbr0" % server_ip, port)[0][4]
except socket.gaierror as e:
print("SKIP_TEST INFO", e)
return
server.sendline(u"tcp server start %d" % port)
# wait for neighbor discovery to be done
time.sleep(5)
client.sendline(u"tcp connect %s %d" % (server_ip, port))
server.expect(u"TCP client \\[%s\\]:[0-9]+ connected" % client_ip)
with socket.socket(socket.AF_INET6) as sock:
sock.connect(connect_addr)
server.expect(u"Error on TCP accept \\[-[0-9]+\\]")
client.sendline(u"tcp disconnect")
server.expect(u"TCP connection to \\[%s\\]:[0-9]+ reset" % client_ip)
client.sendline(u"tcp connect %s %d" % (server_ip, port))
server.expect(u"TCP client \\[%s\\]:[0-9]+ connected" % client_ip)
client.sendline(u"tcp disconnect")
server.expect(u"TCP connection to \\[%s\\]:[0-9]+ reset" % client_ip)
with socket.socket(socket.AF_INET6) as sock:
sock.connect(connect_addr)
server.expect(u"TCP client \\[[0-9a-f:]+\\]:[0-9]+ connected")
server.expect(u"TCP connection to \\[[0-9a-f:]+\\]:[0-9]+ reset")
def test_triple_send(board_group, application, env=None):
env_sender = os.environ.copy()
if env is not None:
@ -312,4 +360,4 @@ if __name__ == "__main__":
TestStrategy().execute([BoardGroup((Board("native", "tap0"),
Board("native", "tap1")))],
[test_ipv6_send, test_udpv6_send, test_tcpv6_send,
test_triple_send])
test_tcpv6_multiconnect, test_triple_send])