diff --git a/dist/pythonlibs/riotctrl_shell/loramac.py b/dist/pythonlibs/riotctrl_shell/loramac.py new file mode 100644 index 0000000000..45e98b708b --- /dev/null +++ b/dist/pythonlibs/riotctrl_shell/loramac.py @@ -0,0 +1,94 @@ +# Copyright (C) 2021 Inria +# +# This file is subject to the terms and conditions of the GNU Lesser +# General Public License v2.1. See the file LICENSE in the top level +# directory for more details. + +""" +Shell interactions related to Semtech Loramac package + +Defines shell command interactions related to Semtech Loramac package +""" + +import re + +from riotctrl.shell import ShellInteraction + +# ==== Parsers ==== + + +class LoramacHelpParser(): + def __init__(self): + self.eeprom_c = re.compile(r'Usage: loramac \<([\w+\|?]+)\>') + + def has_eeprom(self, cmd_output): + for line in cmd_output.splitlines(): + m = self.eeprom_c.search(line) + if m is not None: + if 'save' in m.group(1): + return True + return False + + +class LoramacUpLinkCounterParser(): + def __init__(self): + self.uplink_c = re.compile(r'Uplink Counter: (\d+)') + + def uplink_count(self, cmd_output): + for line in cmd_output.splitlines(): + m = self.uplink_c.search(line) + if m is not None: + return m.group(1) + raise RuntimeError + +# ==== ShellInteractions ==== + + +class Loramac(ShellInteraction): + @ShellInteraction.check_term + def loramac_cmd(self, args=None, timeout=-1, async_=False): + cmd = "loramac" + if args is not None: + cmd += " {args}".format(args=" ".join(str(a) for a in args)) + return self.cmd(cmd, timeout=timeout, async_=False) + + def loramac_join(self, mode, timeout=-1, async_=False): + res = self.loramac_cmd(args=("join", mode), + timeout=timeout, async_=async_) + if "succeeded" in res: + return res + raise RuntimeError(res) + + def loramac_tx(self, payload, cnf=False, port=2, timeout=-1, async_=False): + payload = '\"' + payload + '\"' + res = self.loramac_cmd( + args=("tx", payload, 'cnf' if cnf else "uncnf", port), + timeout=timeout, async_=async_ + ) + if "success" in res: + return res + raise RuntimeError(res) + + def loramac_set(self, key, value, timeout=-1, async_=False): + res = self.loramac_cmd(args=("set", key, value), + timeout=timeout, async_=async_) + if "Usage" in res: + raise RuntimeError(res) + return res + + def loramac_get(self, key, timeout=-1, async_=False): + res = self.loramac_cmd(args=("get", key), + timeout=timeout, async_=async_) + if "Usage" in res: + raise RuntimeError(res) + return res + + def loramac_eeprom_save(self, timeout=-1, async_=False): + return self.loramac_cmd(args=("save",), timeout=timeout, async_=async_) + + def loramac_eeprom_erase(self, timeout=-1, async_=False): + return self.loramac_cmd(args=("erase",), timeout=timeout, + async_=async_) + + def loramac_help(self, timeout=-1, async_=False): + return self.loramac_cmd(args=("help",), timeout=timeout, async_=async_) diff --git a/dist/pythonlibs/riotctrl_shell/tests/test_loramac.py b/dist/pythonlibs/riotctrl_shell/tests/test_loramac.py new file mode 100644 index 0000000000..d3fad42047 --- /dev/null +++ b/dist/pythonlibs/riotctrl_shell/tests/test_loramac.py @@ -0,0 +1,141 @@ +# Copyright (C) 2021 Inria +# This file is subject to the terms and conditions of the GNU Lesser +# General Public License v2.1. See the file LICENSE in the top level +# directory for more details. + +import pytest +import riotctrl_shell.loramac + +from .common import init_ctrl + + +def test_loramac_join_otaa_failed(): + rc = init_ctrl(output=""" +loramac join otaa +Join procedure failed! + """) + shell = riotctrl_shell.loramac.Loramac(rc) + with pytest.raises(RuntimeError) as error: + shell.loramac_join('otaa') + assert "Join procedure failed!" in str(error.value) + + +def test_loramac_join_otaa_succeed(): + rc = init_ctrl(output=""" +loramac join otaa +Join procedure succeeded! + """) + shell = riotctrl_shell.loramac.Loramac(rc) + res = shell.loramac_join('otaa') + assert "Join procedure succeeded!" in res + + +def test_loramac_join_abp_succeed(): + rc = init_ctrl(output=""" +loramac join abp +Join procedure succeeded! + """) + shell = riotctrl_shell.loramac.Loramac(rc) + res = shell.loramac_join('abp') + assert "Join procedure succeeded!" in res + + +def test_loramac_tx_not_joined(): + rc = init_ctrl(output=""" +loramac tx "Hello RIOT!" +Cannot send: not joined + """) + shell = riotctrl_shell.loramac.Loramac(rc) + with pytest.raises(RuntimeError) as error: + shell.loramac_tx("Hello RIOT!") + assert "Cannot send: not joined" in str(error.value) + + +def test_loramac_tx_succeed(): + rc = init_ctrl(output=""" +loramac tx "Hello RIOT!" +Message sent with success + """) + shell = riotctrl_shell.loramac.Loramac(rc) + res = shell.loramac_tx("Hello RIOT!") + assert "Message sent with success" in res + + +def test_loramac_help_parser_has_eeprom_no(): + rc = init_ctrl(output=""" +loramac help +Usage: loramac + """) + shell = riotctrl_shell.loramac.Loramac(rc) + parser = riotctrl_shell.loramac.LoramacHelpParser() + assert not parser.has_eeprom(shell.loramac_help()) + + +def test_loramac_help_parser_has_eeprom_yes(): + rc = init_ctrl(output=""" +loramac help +Usage: loramac + """) + shell = riotctrl_shell.loramac.Loramac(rc) + parser = riotctrl_shell.loramac.LoramacHelpParser() + assert parser.has_eeprom(shell.loramac_help()) + + +def test_loramac_set_fail(): + rc = init_ctrl(output=""" +loramac set dr 123 +Usage: loramac set dr <0..16> + """) + shell = riotctrl_shell.loramac.Loramac(rc) + with pytest.raises(RuntimeError) as error: + shell.loramac_set('dr', 123) + assert "Usage: loramac set dr <0..16>" in str(error.value) + + +def test_loramac_set_succeed(): + rc = init_ctrl(output="loramac set dr 5") + shell = riotctrl_shell.loramac.Loramac(rc) + shell.loramac_set('dr', 5) + assert rc.term.last_command == "loramac set dr 5" + + +def test_loramac_get_fail(): + rc = init_ctrl(output=""" +loramac get dxe +Usage: loramac get + """) # noqa: E501 + shell = riotctrl_shell.loramac.Loramac(rc) + with pytest.raises(RuntimeError) as error: + shell.loramac_get('dxe') + assert "Usage: loramac get" in str(error.value) + + +def test_loramac_get_succeed(): + rc = init_ctrl(output=""" +loramac get dr +DATARATE: 5 + """) + shell = riotctrl_shell.loramac.Loramac(rc) + res = shell.loramac_get('dr') + assert "Usage: loramac get" not in res + + +def test_loramac_uplink_counter_parser(): + rc = init_ctrl(output=""" +loramac get ul_cnt +Uplink Counter: 32 + """) + shell = riotctrl_shell.loramac.Loramac(rc) + parser = riotctrl_shell.loramac.LoramacUpLinkCounterParser() + assert parser.uplink_count(shell.loramac_get('ul_cnt')) == '32' + + +def test_loramac_uplink_counter_parser_fail(): + rc = init_ctrl(output=""" +loramac get ul_cnt +Uplink Counter: asdfasdf + """) + shell = riotctrl_shell.loramac.Loramac(rc) + parser = riotctrl_shell.loramac.LoramacUpLinkCounterParser() + with pytest.raises(RuntimeError): + parser.uplink_count(shell.loramac_get('ul_cnt'))