diff --git a/tests/emcute/Makefile b/tests/emcute/Makefile new file mode 100644 index 0000000000..69c27ae01d --- /dev/null +++ b/tests/emcute/Makefile @@ -0,0 +1,38 @@ +include ../Makefile.tests_common + +RIOTBASE ?= $(CURDIR)/../.. + +export TAP ?= tap0 + +# use Ethernet as link-layer protocol +ifeq (native,$(BOARD)) + TERMFLAGS ?= $(TAP) +else + ETHOS_BAUDRATE ?= 115200 + CFLAGS += -DETHOS_BAUDRATE=$(ETHOS_BAUDRATE) + TERMDEPS += ethos + TERMPROG ?= sudo $(RIOTTOOLS)/ethos/ethos + TERMFLAGS ?= $(TAP) $(PORT) $(ETHOS_BAUDRATE) +endif +USEMODULE += auto_init_gnrc_netif +USEMODULE += gnrc_ipv6_default +USEMODULE += gnrc_sock_udp +USEMODULE += emcute +USEMODULE += od +USEMODULE += shell +USEMODULE += shell_commands +USEMODULE += sock_util + +CFLAGS += -DEMCUTE_TOPIC_MAXLEN="249" # 256 - 7 +CFLAGS += -DSTDIO_UART_RX_BUFSIZE="512" # Adapt to SHELL_BUFSIZE in app + +# The test requires some setup and to be run as root +# So it cannot currently be run +TEST_ON_CI_BLACKLIST += all + +.PHONY: ethos + +ethos: + $(Q)env -u CC -u CFLAGS make -C $(RIOTTOOLS)/ethos + +include $(RIOTBASE)/Makefile.include diff --git a/tests/emcute/Makefile.board.dep b/tests/emcute/Makefile.board.dep new file mode 100644 index 0000000000..b595b8605c --- /dev/null +++ b/tests/emcute/Makefile.board.dep @@ -0,0 +1,6 @@ +# Put board specific dependencies here +ifeq (native,$(BOARD)) + USEMODULE += netdev_tap +else + USEMODULE += stdio_ethos +endif diff --git a/tests/emcute/Makefile.ci b/tests/emcute/Makefile.ci new file mode 100644 index 0000000000..01583a2b37 --- /dev/null +++ b/tests/emcute/Makefile.ci @@ -0,0 +1,56 @@ +BOARD_INSUFFICIENT_MEMORY := \ + airfy-beacon \ + arduino-duemilanove \ + arduino-leonardo \ + arduino-mega2560 \ + arduino-nano \ + arduino-uno \ + atmega1284p \ + atmega328p \ + b-l072z-lrwan1 \ + blackpill-128kib \ + blackpill \ + bluepill-128kib \ + bluepill \ + calliope-mini \ + cc2650-launchpad \ + cc2650stk \ + chronos \ + derfmega128 \ + hifive1 \ + hifive1b \ + i-nucleo-lrwan1 \ + lsn50 \ + maple-mini \ + mega-xplained \ + microbit \ + microduino-corerf \ + msb-430 \ + msb-430h \ + nrf51dongle \ + nrf6310 \ + nucleo-f030r8 \ + nucleo-f031k6 \ + nucleo-f042k6 \ + nucleo-f070rb \ + nucleo-f072rb \ + nucleo-f103rb \ + nucleo-f303k8 \ + nucleo-f334r8 \ + nucleo-l031k6 \ + nucleo-l053r8 \ + nucleo-l073rz \ + opencm904 \ + saml10-xpro \ + saml11-xpro \ + spark-core \ + stm32f030f4-demo \ + stm32f0discovery \ + stm32l0538-disco \ + telosb \ + waspmote-pro \ + wsn430-v1_3b \ + wsn430-v1_4 \ + yunjia-nrf51822 \ + z1 \ + # diff --git a/tests/emcute/README.md b/tests/emcute/README.md new file mode 100644 index 0000000000..fc310d3659 --- /dev/null +++ b/tests/emcute/README.md @@ -0,0 +1,8 @@ +# Overview + +This is a test application for emcute. It is supposed to be run with the test +scripts in `tests/`: + +``` +BOARD=" make flash test" +``` diff --git a/tests/emcute/main.c b/tests/emcute/main.c new file mode 100644 index 0000000000..beb9e75d47 --- /dev/null +++ b/tests/emcute/main.c @@ -0,0 +1,367 @@ +/* + * Copyright (C) 2019 Freie Universität Berlin + * + * This file is subject to the terms and conditions of the GNU Lesser + * General Public License v2.1. See the file LICENSE in the top level + * directory for more details. + */ + +/** + * @ingroup tests + * @{ + * + * @file + * @brief emcute MQTT-SN test application + * + * @author Martine Sophie Lenders + * + * @} + */ + +#include +#include +#include + +#include "net/emcute.h" +#include "net/mqttsn.h" +#include "net/ipv6/addr.h" +#include "od.h" +#include "shell.h" +#include "thread.h" +#include "net/sock/util.h" + +/* get to maximum length for client ID ;-)*/ +#define EMCUTE_ID "emcute test app ......." +#define EMCUTE_PRIO (THREAD_PRIORITY_MAIN - 1) + +#define NUMOFTOPS (4U) +#define SHELL_BUFSIZE (512U) /* for sub with long topic */ + +static char _emcute_stack[THREAD_STACKSIZE_DEFAULT]; +static char _shell_buffer[SHELL_BUFSIZE]; +static uint8_t _pub_buf[EMCUTE_BUFSIZE]; + +static emcute_topic_t _topics[NUMOFTOPS]; +static emcute_sub_t _subscriptions[NUMOFTOPS]; +static char _topic_names[NUMOFTOPS][EMCUTE_TOPIC_MAXLEN + 1]; +static char _addr_str[IPV6_ADDR_MAX_STR_LEN]; + +static sock_udp_ep_t _gw = { .family = AF_INET6 }; + +static int _con(int argc, char **argv); +static int _discon(int argc, char **argv); +static int _reg(int argc, char **argv); +static int _pub(int argc, char **argv); +static int _sub(int argc, char **argv); +static int _unsub(int argc, char **argv); +static int _will(int argc, char **argv); +static int _info(int argc, char **argv); + +static const shell_command_t _shell_commands[] = { + { "con", "connect to a MQTT-SN broker", _con }, + { "discon", "disconnect from current broker", _discon }, + { "reg", "register to a topic", _reg }, + { "pub", "publish a number of bytes under a topic", _pub }, + { "sub", "subscribe to a topic", _sub }, + { "unsub", "unsubscribe from a topic", _unsub }, + { "will", "register a last will", _will }, + { "info", "print client state", _info }, + { NULL, NULL, NULL }, +}; + +static void *_emcute_thread(void *arg) +{ + (void)arg; + emcute_run(MQTTSN_DEFAULT_PORT, EMCUTE_ID); + return NULL; /* should never be reached */ +} + +static unsigned _get_qos(const char *str) +{ + int qos = atoi(str); + switch (qos) { + case 1: return EMCUTE_QOS_1; + case 2: return EMCUTE_QOS_2; + default: return EMCUTE_QOS_0; + } +} + +static void _on_pub(const emcute_topic_t *topic, void *data, size_t len) +{ + (void)data; + printf("### got publication of %u bytes for topic '%s' [%d] ###\n", + (unsigned)len, topic->name, (int)topic->id); +} + +static int _con(int argc, char **argv) +{ + char *topic = NULL; + char *message = NULL; + size_t len = 0; + + if (argc < 2) { + printf("usage %s [ ]\n", + argv[0]); + return 1; + } + + if (sock_udp_str2ep(&_gw, argv[1]) != 0) { + puts("error: unable to parse gateway address"); + _gw.port = 0; + return 1; + } + if (_gw.port == 0) { + _gw.port = MQTTSN_DEFAULT_PORT; + } + if (argc >= 4) { + topic = argv[2]; + message = argv[3]; + len = strlen(message); + } + + if (emcute_con(&_gw, true, topic, message, len, 0) != EMCUTE_OK) { + printf("error: unable to connect to %s\n", argv[1]); + _gw.port = 0; + return 1; + } + printf("success: connected to gateway at %s\n", argv[1]); + + return 0; +} + +static int _discon(int argc, char **argv) +{ + (void)argc; + (void)argv; + + int res = emcute_discon(); + if (res == EMCUTE_NOGW) { + puts("error: not connected to any broker"); + return 1; + } + else if (res != EMCUTE_OK) { + puts("error: unable to disconnect"); + return 1; + } + puts("success: disconnect successful"); + return 0; +} + +static int _topic_name_find(const char *name) +{ + int res = -1; + + for (unsigned i = 0; i < NUMOFTOPS; i++) { + if ((_topic_names[i][0] == '\0') && (res < 0)) { + res = i; + } + else if (strncmp(name, _topic_names[i], EMCUTE_TOPIC_MAXLEN) == 0) { + return i; + } + } + + return res; +} + +static int _reg(int argc, char **argv) +{ + emcute_topic_t *t; + int idx; + bool was_set = false; + + if (argc < 2) { + printf("usage: %s \n", argv[0]); + return 1; + } + + idx = _topic_name_find(argv[1]); + if (idx < 0) { + puts("error: no space left to register"); + return 1; + } + if (_topic_names[idx][0] != '\0') { + was_set = true; + } + else { + strncpy(_topic_names[idx], argv[1], EMCUTE_TOPIC_MAXLEN); + } + t = &_topics[idx]; + t->name = _topic_names[idx]; + if (emcute_reg(t) != EMCUTE_OK) { + puts("error: unable to obtain topic ID"); + if (was_set) { + _topic_names[idx][0] = '\0'; + } + return 1; + } + + printf("success: registered to topic '%s [%d]'\n", t->name, t->id); + + return 0; +} + +static int _pub(int argc, char **argv) +{ + unsigned flags = EMCUTE_QOS_0; + int len; + emcute_topic_t *t; + int idx; + + if (argc < 3) { + printf("usage: %s [QoS level]\n", argv[0]); + return 1; + } + + if (argc >= 4) { + flags |= _get_qos(argv[3]); + } + + idx = _topic_name_find(argv[1]); + if ((idx < 0) || !(_topics[idx].name)) { + puts("error: topic not registered"); + return 1; + } + t = &_topics[idx]; + len = atoi(argv[2]); + if ((unsigned)len > sizeof(_pub_buf)) { + printf("error: len %d > %lu\n", len, (unsigned long)sizeof(_pub_buf)); + return 1; + } + memset(_pub_buf, 92, len); + if (emcute_pub(t, _pub_buf, len, flags) != EMCUTE_OK) { + printf("error: unable to publish data to topic '%s [%d]'\n", + t->name, (int)t->id); + return 1; + } + + printf("success: published %d bytes to topic '%s [%d]'\n", + (int)len, t->name, t->id); + + return 0; +} + +static int _sub(int argc, char **argv) +{ + unsigned flags = EMCUTE_QOS_0; + int idx; + bool was_set = false; + + if (argc < 2) { + printf("usage: %s [QoS level]\n", argv[0]); + return 1; + } + + if (strlen(argv[1]) > EMCUTE_TOPIC_MAXLEN) { + puts("error: topic name exceeds maximum possible size"); + return 1; + } + if (argc >= 3) { + flags |= _get_qos(argv[2]); + } + + idx = _topic_name_find(argv[1]); + if (idx < 0) { + puts("error: no space to subscribe"); + } + + _subscriptions[idx].cb = _on_pub; + if (_topic_names[idx][0] != '\0') { + was_set = true; + } + else { + strncpy(_topic_names[idx], argv[1], EMCUTE_TOPIC_MAXLEN); + } + _subscriptions[idx].topic.name = _topic_names[idx]; + if (emcute_sub(&_subscriptions[idx], flags) != EMCUTE_OK) { + printf("error: unable to subscribe to %s\n", argv[1]); + if (was_set) { + _topic_names[idx][0] = '\0'; + } + memset(&_subscriptions[idx], 0, sizeof(emcute_sub_t)); + return 1; + } + + printf("success: now subscribed to %s\n", argv[1]); + return 0; +} + +static int _unsub(int argc, char **argv) +{ + int idx; + + if (argc < 2) { + printf("usage %s \n", argv[0]); + return 1; + } + + idx = _topic_name_find(argv[1]); + if ((idx < 0) || !_subscriptions[idx].topic.name) { + printf("error: no subscription for topic '%s' found\n", argv[1]); + } + else if (emcute_unsub(&_subscriptions[idx]) != EMCUTE_OK) { + printf("error: Unsubscription form '%s' failed\n", argv[1]); + } + else { + memset(&_subscriptions[idx], 0, sizeof(emcute_sub_t)); + printf("success: unsubscribed from '%s'\n", argv[1]); + return 0; + } + return 1; +} + +static int _will(int argc, char **argv) +{ + if (argc < 3) { + printf("usage %s \n", argv[0]); + return 1; + } + + if (emcute_willupd_topic(argv[1], 0) != EMCUTE_OK) { + puts("error: unable to update the last will topic"); + return 1; + } + if (emcute_willupd_msg(argv[2], strlen(argv[2])) != EMCUTE_OK) { + puts("error: unable to update the last will message"); + return 1; + } + + puts("success: updated last will topic and message"); + return 0; +} + +static int _info(int argc, char **argv) +{ + (void)argc; + (void)argv; + if (_gw.port > 0) { + printf("Broker: '[%s]:%u'\n", + ipv6_addr_to_str(_addr_str, (ipv6_addr_t *)_gw.addr.ipv6, + sizeof(_addr_str)), _gw.port); + puts("- Topics:"); + for (unsigned i = 0; i < NUMOFTOPS; i++) { + if (_topics[i].name) { + printf(" %2u: %s\n", _topics[i].id, + _topics[i].name); + } + } + puts("- Subscriptions:"); + for (unsigned i = 0; i < NUMOFTOPS; i++) { + if (_subscriptions[i].topic.name) { + printf(" %2u: %s\n", _subscriptions[i].topic.id, + _subscriptions[i].topic.name); + } + } + } + return 0; +} + +int main(void) +{ + puts("success: starting test application"); + /* start the emcute thread */ + thread_create(_emcute_stack, sizeof(_emcute_stack), EMCUTE_PRIO, 0, + _emcute_thread, NULL, "emcute"); + /* start shell */ + shell_run(_shell_commands, _shell_buffer, sizeof(_shell_buffer)); + return 0; +} diff --git a/tests/emcute/tests/01-run.py b/tests/emcute/tests/01-run.py new file mode 100755 index 0000000000..0be6064f19 --- /dev/null +++ b/tests/emcute/tests/01-run.py @@ -0,0 +1,482 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2018 Freie Universität Berlin +# +# This file is subject to the terms and conditions of the GNU Lesser +# General Public License v2.1. See the file LICENSE in the top level +# directory for more details. + +import os +import pprint +import random +import re +import socket +import sys +import subprocess +import time + +from scapy.all import Automaton, ATMT, log_runtime, MTU, raw, SimpleSocket +from scapy.contrib import mqttsn +from testrunner import run, utils + +TEST_INTERACTIVE_RETRIES = int(os.environ.get('TEST_INTERACTIVE_RETRIES') or 5) +TEST_INTERACTIVE_DELAY = int(os.environ.get('TEST_INTERACTIVE_DELAY') or 1) + +SERVER_PORT = 1883 +MODES = set(["pub", "sub", "sub_w_reg"]) +TIMEOUT = 1 + + +class MQTTSNServer(Automaton): + class MQTTSNServerSocket(SimpleSocket): + def __init__(self, server, *args, **kwargs): + super(MQTTSNServer.MQTTSNServerSocket, self)\ + .__init__(*args, **kwargs) + self.server = server + + def recv(self, x=MTU): + pkt, sa = self.ins.recvfrom(x) + self.server.last_remote = sa + return mqttsn.MQTTSN(pkt) + + def send(self, x): + assert self.server.last_remote is not None + try: + sx = raw(x) + x.sent_time = time.time() + self.outs.sendto(sx, self.server.last_remote) + except socket.error as msg: + log_runtime.error(msg) + + def __init__(self, *args, **kwargs): + kwargs["ll"] = MQTTSNServer.MQTTSNServerSocket + kwargs["recvsock"] = MQTTSNServer.MQTTSNServerSocket + self.last_remote = None + super(MQTTSNServer, self).__init__(*args, **kwargs) + + def parse_args(self, spawn, bind_addr, topic_name, mode, pub_interval, + qos_level=0, + data_len_start=1, data_len_end=1000, data_len_step=1, + bind_port=SERVER_PORT, family=socket.AF_INET, + type=socket.SOCK_DGRAM, proto=0, *args, **kwargs): + assert mode in MODES + super(MQTTSNServer, self).parse_args(*args, **kwargs) + self.spawn = spawn + self.topic_name = topic_name + self.mode = mode + self.pub_interval = pub_interval + self.qos_level = qos_level + self.data_len = data_len_start + self.data_len_end = data_len_end + self.data_len_step = data_len_step + self.last_mid = random.randint(0, 0xffff) + self.topics = [] + self.registered_topics = [] + self.subscriptions = [] + self.res = "" + + sock = socket.socket(family, type, proto) + res = socket.getaddrinfo(bind_addr, bind_port) + sockaddr = res[0][4] + sock.bind(sockaddr) + self.gw_addr = "[{}]:{}".format(sockaddr[0], sockaddr[1]) + self.socket_kargs = {"sock": sock, "server": self} + + # >>> private properties and methods <<< # + @property + def _qos_flags(self): + qos = min(self.qos_level, 2) + if qos < 0: + qos = mqttsn.QOS_NEG1 + return qos + + def _check_pkt_qos(self, pkt): + qos_types = [mqttsn.PUBLISH, mqttsn.SUBSCRIBE] + return (pkt.type not in qos_types) or (pkt.qos == self._qos_flags) + + def _get_tid(self, topic_name): + if topic_name not in self.topics: + self.topics.append(topic_name) + return self.topics.index(topic_name) + 1 + + def _get_topic_name(self, tid): + return self.topics[tid - 1] + + # >>> automaton states <<< # + @ATMT.state(initial=1) + def BEGIN(self): + utils.test_utils_interactive_sync(self.spawn, + TEST_INTERACTIVE_RETRIES, + TEST_INTERACTIVE_DELAY) + raise self.CONNECT_FROM_NODE() + + @ATMT.state() + def CONNECT_FROM_NODE(self): + self.spawn.sendline("con {}".format(self.gw_addr)) + raise self.WAITING(mqttsn.CONNECT) + + @ATMT.state() + def REGISTER_FROM_NODE(self): + self.spawn.sendline("reg {}".format(self.topic_name)) + raise self.WAITING(mqttsn.REGISTER) + + @ATMT.state() + def PUBLISH_FROM_NODE(self, topic_name): + if self.data_len < self.data_len_end: + self.spawn.sendline("pub {} {:d} {:d}" .format(topic_name, + self.data_len, + self.qos_level)) + raise self.WAITING(mqttsn.PUBLISH) + else: + raise self.END() + + @ATMT.state() + def SUBSCRIBE_FROM_NODE(self): + self.spawn.sendline("sub {} {}".format(self.topic_name, + self.qos_level)) + raise self.WAITING(mqttsn.SUBSCRIBE) + + @ATMT.state() + def PUBLISH_TO_NODE(self, subscription): + tid = subscription["tid"] + self.last_mid += 1 + mid = self.last_mid + if self.data_len == 0: + # send deliberately broken length packets + # (to small payload, len field < 256) + self.last_packet = mqttsn.MQTTSN(len=128) / mqttsn.MQTTSNPublish( + qos=self._qos_flags, tid=tid, mid=mid, data="128" + ) + self.send(self.last_packet) + # send deliberately broken length packets + # (to small payload, len field >= 256) + self.last_packet = mqttsn.MQTTSN(len=400) / mqttsn.MQTTSNPublish( + qos=self._qos_flags, tid=tid, mid=mid, data="400" + ) + self.send(self.last_packet) + # send deliberately broken length packets (too large payload) + self.last_packet = mqttsn.MQTTSN(len=10) / mqttsn.MQTTSNPublish( + qos=self._qos_flags, tid=tid, mid=mid, data="X" * 20 + ) + self.send(self.last_packet) + return subscription, mid + if self.data_len < self.data_len_end: + self.last_packet = mqttsn.MQTTSN() / mqttsn.MQTTSNPublish( + qos=self._qos_flags, tid=tid, mid=mid, data="X" * self.data_len + ) + self.send(self.last_packet) + return subscription, mid + else: + raise self.END() + + @ATMT.state() + def WAITING(self, exp_type, tid=None, mid=None): + return exp_type, mid, tid + + @ATMT.state(final=1) + def END(self): + self.spawn.sendline("info") + self.spawn.expect_exact("Broker: '{}'".format(self.gw_addr)) + self.spawn.expect_exact("- Topics") + for tid, topic_name in enumerate(self.registered_topics, 1): + self.spawn.expect_exact(" {:2d}: {}".format(tid, topic_name)) + self.spawn.expect_exact("- Subscriptions") + for sub in self.subscriptions: + self.spawn.expect_exact(" {:2d}: {}".format( + sub["tid"], sub["topic_name"].decode()) + ) + self.spawn.sendline("reboot") + return self.res + + @ATMT.state(error=1) + def UNEXPECTED_MESSAGE_TYPE(self, type, qos=None): + self.res += "\nUnexpected message type {} [{}]".format( + mqttsn.PACKET_TYPE[type], + mqttsn.QOS_LEVELS[qos] if qos is not None else "-", + ) + return self.res + + @ATMT.state(error=1) + def UNEXPECTED_PARAMETERS(self, pkt): + self.res += "\nUnexpected parameters \n" \ + " {}".format(repr(pkt)) + return self.res + + @ATMT.state(error=1) + def MESSAGE_TIMEOUT(self, exp_type): + self.res += "\n{} timed out".format(mqttsn.PACKET_TYPE[exp_type]) + return self.res + + # >>> automaton timeouts, conditions and actions <<< # + @ATMT.timeout(WAITING, TIMEOUT) + def timeout_message(self, args): + raise self.MESSAGE_TIMEOUT(args[0]) + + @ATMT.condition(PUBLISH_TO_NODE, prio=1) + def PUBLISH_asks_for_PUBACK(self, args): + subscription = args[0] + tid = subscription["tid"] + mid = args[1] + if self.last_packet.qos in [mqttsn.QOS_1, mqttsn.QOS_2]: + raise self.WAITING(mqttsn.PUBACK, tid, mid) + + @ATMT.condition(PUBLISH_TO_NODE, prio=2) + def wait_for_PUBLISH_on_node(self, args): + subscription = args[0] + if self.data_len > 0: + self.spawn.expect_exact( + "### got publication of {:d} bytes for topic " + "'{}' [{:d}] ###" + .format(self.data_len, subscription["topic_name"].decode(), + subscription["tid"])) + self.data_len += self.data_len_step + time.sleep(self.pub_interval) + raise self.PUBLISH_TO_NODE(subscription) + + @ATMT.receive_condition(WAITING, prio=1) + def receive_wrong_message(self, pkt, args): + exp_type = args[0] + if pkt.type != exp_type or not self._check_pkt_qos(pkt): + raise self.UNEXPECTED_MESSAGE_TYPE(pkt.type, pkt.qos) + + @ATMT.receive_condition(WAITING, prio=2) + def receive_unexpected_parameters(self, pkt, args): + if pkt.type == mqttsn.PUBLISH: + if self.data_len != len(pkt.data): + raise self.UNEXPECTED_PARAMETERS(pkt) + elif pkt.type == mqttsn.PUBACK: + exp_mid = args[1] + exp_tid = args[2] + if (exp_tid != pkt.tid) or (exp_mid != pkt.mid) or \ + (mqttsn.ACCEPTED != pkt.return_code): + raise self.UNEXPECTED_PARAMETERS(pkt) + + @ATMT.receive_condition(WAITING, prio=2) + def receive_CONNECT_mode_sub(self, pkt, args): + if pkt.type == mqttsn.CONNECT and self.mode == "sub": + raise self.SUBSCRIBE_FROM_NODE() + + @ATMT.receive_condition(WAITING, prio=2) + def receive_CONNECT_mode_pub_or_sub_w_reg(self, pkt, args): + if pkt.type == mqttsn.CONNECT and self.mode in ["pub", "sub_w_reg"]: + raise self.REGISTER_FROM_NODE() + + @ATMT.receive_condition(WAITING, prio=2) + def receive_REGISTER_mode_pub(self, pkt, args): + if pkt.type == mqttsn.REGISTER: + topic_name = pkt.topic_name.decode() + if self.mode in ["pub"]: + raise self.PUBLISH_FROM_NODE(topic_name) \ + .action_parameters(topic_name=topic_name, + mid=pkt.mid) + + @ATMT.receive_condition(WAITING, prio=3) + def receive_REGISTER_mode_sub_w_reg(self, pkt, args): + if pkt.type == mqttsn.REGISTER: + topic_name = pkt.topic_name.decode() + if self.mode in ["sub_w_reg"]: + raise self.SUBSCRIBE_FROM_NODE() \ + .action_parameters(topic_name=topic_name, + mid=pkt.mid) + + @ATMT.receive_condition(WAITING, prio=3) + def receive_PUBLISH(self, pkt, args): + if pkt.type == mqttsn.PUBLISH: + topic_name = self._get_topic_name(pkt.tid) + self.res += ":".join("{:02x}".format(c) for c in pkt.data) + self.data_len += self.data_len_step + raise self.PUBLISH_FROM_NODE(topic_name) \ + .action_parameters(topic_name=topic_name, + qos=pkt.qos, mid=pkt.mid, tid=pkt.tid) + + @ATMT.receive_condition(WAITING, prio=2) + def receive_SUBSCRIBE(self, pkt, args): + if pkt.type == mqttsn.SUBSCRIBE: + if pkt.tid_type in [mqttsn.TID_NORMAL, mqttsn.TID_SHORT]: + topic_name = pkt.topic_name + tid = self._get_tid(pkt.topic_name) + elif pkt.tid_type == mqttsn.TID_PREDEF: + tid = pkt.tid + topic_name = self._get_topic_name(tid) + else: + assert(False) + subscription = {"tid": tid, "topic_name": topic_name} + if subscription not in self.subscriptions: + self.subscriptions.append(subscription) + raise self.PUBLISH_TO_NODE(subscription) \ + .action_parameters(mid=pkt.mid, tid=tid) + + @ATMT.receive_condition(WAITING, prio=2) + def receive_PUBACK(self, pkt, args): + if pkt.type == mqttsn.PUBACK: + self.data_len += self.data_len_step + time.sleep(self.pub_interval) + raise self.PUBLISH_TO_NODE({ + "tid": pkt.tid, + "topic_name": self._get_topic_name(pkt.tid) + }) + + @ATMT.action(receive_CONNECT_mode_sub) + @ATMT.action(receive_CONNECT_mode_pub_or_sub_w_reg) + def send_CONNACK(self): + # send too large packet for reception buffer + # see https://github.com/RIOT-OS/RIOT/pull/12382 + self.last_packet = mqttsn.MQTTSN() / \ + mqttsn.MQTTSNConnack() / ("X" * 525) + self.send(self.last_packet) + # send deliberately broken length packets (too small len) + self.last_packet = mqttsn.MQTTSN(len=2) / mqttsn.MQTTSNConnack() + self.send(self.last_packet) + # send deliberately broken length packets (too large len) + self.last_packet = mqttsn.MQTTSN(len=3, type=mqttsn.CONNACK) + self.send(self.last_packet) + # send deliberately broken length packets (garbage payload) + self.last_packet = mqttsn.MQTTSN(len=128) / \ + mqttsn.MQTTSNConnack() / b"this is garbage" + self.send(self.last_packet) + self.last_packet = mqttsn.MQTTSN() / mqttsn.MQTTSNConnack() + self.send(self.last_packet) + self.spawn.expect_exact("success: connected to gateway at {}" + .format(self.gw_addr)) + + @ATMT.action(receive_REGISTER_mode_pub) + @ATMT.action(receive_REGISTER_mode_sub_w_reg) + def send_REGACK(self, topic_name, mid): + tid = self._get_tid(topic_name) + if topic_name not in self.registered_topics: + self.registered_topics.append(topic_name) + # send deliberately broken length packets (too small len) + self.last_packet = mqttsn.MQTTSN(len=4) / \ + mqttsn.MQTTSNRegack(mid=mid, tid=tid) + self.send(self.last_packet) + # send deliberately broken length packets (too large len) + # include valid MID for extra confusion + self.last_packet = mqttsn.MQTTSN(len=7, type=mqttsn.REGACK) / \ + bytes([tid >> 8, tid & 0xff, mid >> 8, mid & 0xff]) + self.send(self.last_packet) + # send deliberately broken length packets (garbage payload) + self.last_packet = mqttsn.MQTTSN(len=128) / \ + mqttsn.MQTTSNRegack(mid=mid, tid=tid) / b"this is garbage" + self.send(self.last_packet) + self.last_packet = mqttsn.MQTTSN() / \ + mqttsn.MQTTSNRegack(mid=mid, tid=tid) + self.send(self.last_packet) + self.spawn.expect_exact("success: registered to topic '{} [{:d}]'" + .format(topic_name, tid)) + + @ATMT.action(receive_PUBLISH) + def send_PUBACK_if_required(self, qos, topic_name, mid, tid): + if qos in (mqttsn.QOS_1, mqttsn.QOS_2): + # send deliberately broken length packets (too small len) + self.last_packet = mqttsn.MQTTSN(len=4) / \ + mqttsn.MQTTSNPuback(mid=mid, tid=tid) + self.send(self.last_packet) + # send deliberately broken length packets (too large len) + # include valid MID for extra confusion + self.last_packet = mqttsn.MQTTSN(len=7, type=mqttsn.PUBACK) / \ + bytes([tid >> 8, tid & 0xff, mid >> 8, mid & 0xff]) + self.send(self.last_packet) + # send deliberately broken length packets (garbage payload) + self.last_packet = mqttsn.MQTTSN(len=128) / \ + mqttsn.MQTTSNPuback(mid=mid, tid=tid) / b"this is garbage" + self.send(self.last_packet) + self.last_packet = mqttsn.MQTTSN() / mqttsn.MQTTSNPuback(mid=mid, + tid=tid) + self.send(self.last_packet) + self.spawn.expect_exact( + "success: published {:d} bytes to topic '{} [{:d}]'" + .format(self.data_len - self.data_len_step, topic_name, tid) + ) + time.sleep(self.pub_interval) + + @ATMT.action(receive_SUBSCRIBE) + def send_SUBACK(self, mid, tid): + self.last_packet = mqttsn.MQTTSN() / mqttsn.MQTTSNSuback( + tid=tid, mid=mid + ) + self.send(self.last_packet) + self.spawn.expect_exact("success: now subscribed to {}" + .format(self._get_topic_name(tid).decode())) + + +def check_and_search_output(cmd, pattern, res_group, *args, **kwargs): + if isinstance(cmd, str): + kwargs["shell"] = True + output = subprocess.check_output(cmd, *args, **kwargs).decode("utf-8") + for line in output.splitlines(): + m = re.search(pattern, line) + if m is not None: + return m.group(res_group) + return None + + +def get_bridge(tap): + res = check_and_search_output( + "command -v bridge", "^(.*bridge)", 1) + if res is not None: + res = check_and_search_output( + ["bridge", "link"], + r"{}.+master\s+(?P[^\s]+)".format(tap), + "master" + ) + return tap if res is None else res + + +def get_host_lladdr(tap): + res = check_and_search_output( + ["ip", "addr", "show", "dev", tap, "scope", "link"], + r"inet6\s+(?P[0-9A-Fa-f:]+)/\d+", + "lladdr" + ) + if res is None: + raise AssertionError( + "Can't find host link-local address on interface {}" + .format(tap) + ) + else: + return res + + +def testfunc(child): + tap = get_bridge(os.environ["TAP"]) + lladdr = get_host_lladdr(tap) + + time.sleep(1) + DATA_MAX_LEN = 512 - 9 # PUBLISH + 2 byte extra for length + TOPIC_MAX_LEN = 249 # see Makefile + for test_params in [ + {"qos_level": 0, "mode": "sub", "topic_name": "/test", + "data_len_start": 0, "data_len_end": DATA_MAX_LEN, + "data_len_step": 50}, + {"qos_level": 1, "mode": "sub", "topic_name": "/test", + "data_len_start": 0, "data_len_end": DATA_MAX_LEN, + "data_len_step": 50}, + {"qos_level": 1, "mode": "sub", + "topic_name": "/" + ("x" * (TOPIC_MAX_LEN - 1)), + "data_len_start": 8, "data_len_end": 9}, + {"qos_level": 1, "mode": "sub_w_reg", "topic_name": "/test", + "data_len_start": 8, "data_len_end": 9}, + {"qos_level": 0, "mode": "pub", "topic_name": "/test", + "data_len_start": 1, "data_len_end": DATA_MAX_LEN, + "data_len_step": 50}, + {"qos_level": 1, "mode": "pub", "topic_name": "/test", + "data_len_start": 1, "data_len_end": DATA_MAX_LEN, + "data_len_step": 50} + ]: + print("Run test case") + pprint.pprint(test_params, compact=False) + server = MQTTSNServer(child, pub_interval=.001, + family=socket.AF_INET6, + bind_addr=lladdr + "%" + tap, + bind_port=SERVER_PORT, **test_params) + try: + server.run() + finally: + server.stop() + server.socket_kargs["sock"].close() + time.sleep(1) + print("SUCCESS") + + +if __name__ == "__main__": + sys.exit(run(testfunc, timeout=TIMEOUT, echo=False))