mirror of
https://github.com/RIOT-OS/RIOT.git
synced 2025-12-25 06:23:53 +01:00
Merge pull request #21853 from crasbe/pr/tests_zep_random_port
tests/net: use random, free port for socket_zep&gnrc_netif_ieeee802154
This commit is contained in:
commit
a8f0171bc1
@ -6,6 +6,7 @@
|
||||
# General Public License v2.1. See the file LICENSE in the top level
|
||||
# directory for more details.
|
||||
|
||||
import os
|
||||
import sys
|
||||
import socket
|
||||
from testrunner import run
|
||||
@ -16,9 +17,22 @@ ZEP_V2_VERSION = 2
|
||||
ZEP_V2_TYPE_DATA = 1
|
||||
|
||||
|
||||
def get_random_free_port() -> int:
|
||||
# we let the OS determine a free port and then return it
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
s.bind(("127.0.0.1", 0))
|
||||
# quickly reuse the port after the return
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
return s.getsockname()[1]
|
||||
|
||||
|
||||
local_port = get_random_free_port()
|
||||
remote_port = get_random_free_port()
|
||||
|
||||
|
||||
def testfunc(child):
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
|
||||
s.bind(("", 17754))
|
||||
s.bind(("", local_port))
|
||||
child.expect(r"l2_addr: ([0-9A-F:]+)\r\n")
|
||||
dst = int(child.match.group(1).replace(':', ''), base=16)
|
||||
|
||||
@ -35,7 +49,7 @@ def testfunc(child):
|
||||
ZEP2(ver=ZEP_V2_VERSION, type=ZEP_V2_TYPE_DATA, channel=26,
|
||||
length=len(payload)) / payload
|
||||
)
|
||||
s.sendto(packet, ("localhost", 17755))
|
||||
s.sendto(packet, ("localhost", remote_port))
|
||||
child.expect("PKTDUMP: data received:")
|
||||
child.expect("00000000 7E 33 F3 00")
|
||||
child.expect("~~ PKT - 2 snips, total size: 16 byte")
|
||||
@ -49,7 +63,7 @@ def testfunc(child):
|
||||
ZEP2(ver=ZEP_V2_VERSION, type=ZEP_V2_TYPE_DATA, channel=26,
|
||||
length=len(payload)) / payload
|
||||
)
|
||||
s.sendto(packet, ("localhost", 17755))
|
||||
s.sendto(packet, ("localhost", remote_port))
|
||||
res = child.expect([TIMEOUT, EOF, "PKTDUMP: data received:"])
|
||||
# we actually want the timeout here. The application either
|
||||
# receives a bogus packet or crashes in an error case case
|
||||
@ -57,4 +71,5 @@ def testfunc(child):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
os.environ['TERMFLAGS'] = '-z "0.0.0.0:%d,localhost:%d"' % (remote_port, local_port)
|
||||
sys.exit(run(testfunc, timeout=1))
|
||||
|
||||
@ -12,15 +12,24 @@ import socket
|
||||
from testrunner import run
|
||||
|
||||
|
||||
def get_random_free_port() -> int:
|
||||
# we let the OS determine a free port and then return it
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
s.bind(("127.0.0.1", 0))
|
||||
# quickly reuse the port after the return
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
return s.getsockname()[1]
|
||||
|
||||
|
||||
IEEE802154_FRAME_LEN_MAX = 127
|
||||
ZEP_DATA_HEADER_SIZE = 32
|
||||
FCS_LEN = 2
|
||||
RCVBUF_LEN = IEEE802154_FRAME_LEN_MAX + ZEP_DATA_HEADER_SIZE + FCS_LEN
|
||||
zep_params = {
|
||||
"local_addr": "127.0.0.1",
|
||||
"local_port": 12345,
|
||||
"local_port": get_random_free_port(),
|
||||
"remote_addr": "127.0.0.1",
|
||||
"remote_port": 17754,
|
||||
"remote_port": get_random_free_port(),
|
||||
}
|
||||
s = None
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user