diff --git a/examples/suit_update/tests/01-run.py b/examples/suit_update/tests/01-run.py index 2462f11ad3..1cf3ca7b82 100755 --- a/examples/suit_update/tests/01-run.py +++ b/examples/suit_update/tests/01-run.py @@ -50,7 +50,7 @@ def notify(coap_server, client_url, version=None): assert not subprocess.call(cmd) -def publish(server_dir, server_url, app_ver, latest_name=None): +def publish(server_dir, server_url, app_ver, keys='default', latest_name=None): cmd = [ "make", "suit/publish", @@ -58,6 +58,7 @@ def publish(server_dir, server_url, app_ver, latest_name=None): "SUIT_COAP_SERVER={}".format(server_url), "APP_VER={}".format(app_ver), "RIOTBOOT_SKIP_COMPILE=1", + "SUIT_KEY={}".format(keys), ] if latest_name is not None: cmd.append("SUIT_MANIFEST_SIGNED_LATEST={}".format(latest_name)) @@ -73,6 +74,7 @@ def wait_for_update(child): def get_ipv6_addr(child): + child.expect_exact('>') child.sendline('ifconfig') if USE_ETHOS == 0: # Get device global address @@ -111,28 +113,45 @@ def ping6(client): return ping_ok -def testfunc(child): - """For one board test if specified application is updatable""" +def get_reachable_addr(child): + # Wait for suit_coap thread to start + child.expect_exact("suit_coap: started.") + child.expect_exact("Starting the shell") + # give some time for the network interface to be configured + time.sleep(1) + # Get address + client_addr = get_ipv6_addr(child) + # Verify address is reachable + ping6(client_addr) + return "[{}]".format(client_addr) - # Initial Setup and wait for address configuration - child.expect_exact("main(): This is RIOT!") +def app_version(child): # get version of currently running image # "Image Version: 0x00000000" child.expect(r"Image Version: (?P0x[0-9a-fA-F:]+)\r\n") - current_app_ver = int(child.match.group("app_ver"), 16) + app_ver = int(child.match.group("app_ver"), 16) + return app_ver - for version in [current_app_ver + 1, current_app_ver + 2]: - child.expect_exact("suit_coap: started.") - child.expect_exact("Starting the shell") - # give some time for the network interface to be configured - time.sleep(1) - # Get address, if using ethos it will change on each reboot - client_addr = get_ipv6_addr(child) - client = "[{}]".format(client_addr) - # Wait for suit_coap thread to start - # Ping6 - ping6(client_addr) + +def _test_invalid_version(child, client, app_ver): + publish(TMPDIR.name, COAP_HOST, app_ver - 1) + notify(COAP_HOST, client, app_ver - 1) + child.expect_exact("suit_coap: trigger received") + child.expect_exact("suit: verifying manifest signature...") + child.expect_exact("seq_nr <= running image") + + +def _test_invalid_signature(child, client, app_ver): + publish(TMPDIR.name, COAP_HOST, app_ver + 1, 'invalid_keys') + notify(COAP_HOST, client, app_ver + 1) + child.expect_exact("suit_coap: trigger received") + child.expect_exact("suit: verifying manifest signature...") + child.expect_exact("Unable to validate signature") + + +def _test_successful_update(child, client, app_ver): + for version in [app_ver + 1, app_ver + 2]: # Trigger update process, verify it validates manifest correctly publish(TMPDIR.name, COAP_HOST, version) notify(COAP_HOST, client, version) @@ -150,6 +169,30 @@ def testfunc(child): # Verify running slot child.expect(r"running from slot (\d+)\r\n") assert target_slot == int(child.match.group(1)), "BOOTED FROM SAME SLOT" + # Verify client is reachable and get address + client = get_reachable_addr(child) + + +def testfunc(child): + # Get current app_ver + current_app_ver = app_version(child) + # Verify client is reachable and get address + client = get_reachable_addr(child) + + def run(func): + if child.logfile == sys.stdout: + func(child, client, current_app_ver) + else: + try: + func(child, client, current_app_ver) + print(".", end="", flush=True) + except Exception as e: + print("FAILED") + raise e + + run(_test_invalid_signature) + run(_test_invalid_version) + run(_test_successful_update) print("TEST PASSED") @@ -159,7 +202,6 @@ if __name__ == "__main__": res = 1 aiocoap_process = start_aiocoap_fileserver() # TODO: wait for coap port to be available - res = run(testfunc, echo=True) except Exception as e: