1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2025-12-26 15:03:53 +01:00

Merge pull request #9464 from haukepetersen/add_asymcute

net: add Asymcute, an asynchronous MQTT-SN client implementation
This commit is contained in:
Martine Lenders 2018-07-05 16:00:58 +02:00 committed by GitHub
commit 2e717b226d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 2355 additions and 6 deletions

View File

@ -659,6 +659,14 @@ ifneq (,$(filter openthread_contrib,$(USEMODULE)))
FEATURES_REQUIRED += cpp
endif
ifneq (,$(filter asymcute,$(USEMODULE)))
USEMODULE += sock_udp
USEMODULE += sock_util
USEMODULE += random
USEMODULE += event_timeout
USEMODULE += event_callback
endif
ifneq (,$(filter emcute,$(USEMODULE)))
USEMODULE += core_thread_flags
USEMODULE += sock_udp

View File

@ -0,0 +1,43 @@
# name of your application
APPLICATION = asymcute_mqttsn
# If no BOARD is found in the environment, use this default:
BOARD ?= native
# This has to be the absolute path to the RIOT base directory:
RIOTBASE ?= $(CURDIR)/../..
# Not all boards have enough memory to build the default configuration of this
# example...
BOARD_INSUFFICIENT_MEMORY := airfy-beacon chronos hifive1 microbit msb-430 \
msb-430h nrf51dongle nrf6310 nucleo-f030r8 \
nucleo-f031k6 nucleo-f042k6 nucleo-f070rb \
nucleo-f072rb nucleo-f303k8 nucleo-f334r8 \
nucleo-l031k6 nucleo-l053r8 stm32f0discovery \
telosb wsn430-v1_3b wsn430-v1_4 yunjia-nrf51822 z1
# Include packages that pull up and auto-init the link layer.
# NOTE: 6LoWPAN will be included if IEEE802.15.4 devices are present
USEMODULE += gnrc_netdev_default
USEMODULE += auto_init_gnrc_netif
# Specify the mandatory networking modules for IPv6 and UDP
USEMODULE += gnrc_sock_udp
USEMODULE += gnrc_ipv6_default
# Include MQTT-SN
USEMODULE += asymcute
# Add also the shell, some shell commands
USEMODULE += shell
USEMODULE += shell_commands
USEMODULE += ps
# For testing we also include the ping6 command and some stats
USEMODULE += gnrc_icmpv6_echo
# Comment this out to disable code in RIOT that does safety checking
# which is not needed in a production environment but helps in the
# development process:
DEVELHELP ?= 1
# Change this to 0 show compiler invocation lines by default:
QUIET ?= 1
include $(RIOTBASE)/Makefile.include

View File

@ -0,0 +1,38 @@
## About
This example application demonstrates the usage of the `Asymcute` MQTT-SN client
library. It provides a number of shell commands that can be used to trigger
selected procedures like connecting to a gateway, registration and subscription
of topics, and publishing of data.
## Setup
For this application to do anything useful, a running MQTT-SN gateway is needed.
If you don't have access to one, RIOT provides a simple way to start one locally
by providing a dedicated `rsmb` make target, type:
```
make rsmb
```
This will download, build, and run the Eclipse Mosquitto.rsmb 'Really Small
Message Broker' [(found here)](https://github.com/eclipse/mosquitto.rsmb).
## Usage
Simply type
```
help
```
for a list of available commands.
## NOTE 1
The UDP socket handling for IPv6 based endpoints in the `Mosquitto.rsmb`
implementation is buggy when it comes to handling link local addresses,
as the implementation does not remember the interface on which data comes in,
hindering it from sending out any responses.
Quick workaround: simply use global addresses
## NOTE 2
It also seems that the `Mosquitto.rsmb` implementation has a bug when it comes
to subscribing to topics: if a topic name was formerly registered and the same
topic name is later used for issuing a subscription request, the gateway will
assign a new topic ID to the same topic name, so publish messages to the
initially assigned topic ID will not be seen by that subscription.

View File

@ -0,0 +1,554 @@
/*
* Copyright (C) 2015 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @ingroup examples
* @{
*
* @file
* @brief Example application for demonstrating RIOT's MQTT-SN library
* Asymcute
*
* @author Hauke Petersen <hauke.petersen@fu-berlin.de>
*
* @}
*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <limits.h>
#include "shell.h"
#include "mutex.h"
#include "thread.h"
#include "net/asymcute.h"
#include "net/ipv6/addr.h"
#include "net/sock/udp.h"
#include "net/sock/util.h"
#ifndef REQ_CTX_NUMOF
#define REQ_CTX_NUMOF (8U)
#endif
#ifndef SUB_CTX_NUMOF
#define SUB_CTX_NUMOF (8U)
#endif
#ifndef TOPIC_BUF_NUMOF
#define TOPIC_BUF_NUMOF (8U + SUB_CTX_NUMOF)
#endif
/* needed for the `ping6` shell command */
#define MAIN_QUEUE_SIZE (8)
static msg_t _main_msg_queue[MAIN_QUEUE_SIZE];
#define LISTENER_PRIO (THREAD_PRIORITY_MAIN - 1)
static char listener_stack[ASYMCUTE_LISTENER_STACKSIZE];
static asymcute_con_t _connection;
static asymcute_req_t _reqs[REQ_CTX_NUMOF];
static asymcute_sub_t _subscriptions[SUB_CTX_NUMOF];
static asymcute_topic_t _topics[TOPIC_BUF_NUMOF];
static asymcute_req_t *_get_req_ctx(void)
{
for (unsigned i = 0; i < REQ_CTX_NUMOF; i++) {
if (!asymcute_req_in_use(&_reqs[i])) {
return &_reqs[i];
}
}
puts("error: no request context available\n");
return NULL;
}
static asymcute_sub_t *_get_sub_ctx(void)
{
for (unsigned i = 0; i < SUB_CTX_NUMOF; i++) {
if (!asymcute_sub_active(&_subscriptions[i])) {
return &_subscriptions[i];
}
}
return NULL;
}
static asymcute_sub_t *_find_sub(const char *name)
{
for (unsigned i = 0; i < SUB_CTX_NUMOF; i++) {
if (asymcute_sub_active(&_subscriptions[i]) &&
strcmp(_subscriptions[i].topic->name, name) == 0) {
return &_subscriptions[i];
}
}
return NULL;
}
static uint16_t _topic_parse_pre(const char *name)
{
if (strncmp(name, "pre_", 4) == 0) {
return (uint16_t)atoi(&name[4]);
}
return 0;
}
static int _topic_init(asymcute_topic_t *t, const char *name)
{
uint16_t id = _topic_parse_pre(name);
if (id != 0) {
name = NULL;
}
if (asymcute_topic_init(t, name, id) != ASYMCUTE_OK) {
return 1;
}
return 0;
}
static int _topic_find(asymcute_topic_t *t, const char *name)
{
size_t len = strlen(name);
uint16_t id = _topic_parse_pre(name);
if ((id != 0) || (len == 2)) {
if (t) {
return _topic_init(t, name);
}
return 0;
}
/* need to find topic in list of registered ones */
for (unsigned i = 0; i < TOPIC_BUF_NUMOF; i++) {
if (asymcute_topic_is_reg(&_topics[i]) &&
(strncmp(name, _topics[i].name, sizeof(_topics[i].name)) == 0)) {
if (t) {
memcpy(t, &_topics[i], sizeof(asymcute_topic_t));
}
return 0;
}
}
return 1;
}
static void _topics_clear(void)
{
memset(_topics, 0, sizeof(_topics));
}
static asymcute_topic_t *_topic_get_free(void)
{
for (unsigned i = 0; i < TOPIC_BUF_NUMOF; i++) {
if (!asymcute_topic_is_init(&_topics[i])) {
return &_topics[i];
}
}
return NULL;
}
static void _topic_print_help(void)
{
puts(" topic can be\n"
" - short topic: 2 byte string\n"
" - predefined topic id: pre_XXXXX (e.g. pre_738)\n"
" - normal: any string\n");
}
static int _qos_parse(int argc, char **argv, int pos, unsigned *flags)
{
if (argc <= pos) {
return 0;
}
/* parse QoS level */
int qos = atoi(argv[pos]);
switch (qos) {
case 0: *flags = MQTTSN_QOS_0; break;
case 1: *flags = MQTTSN_QOS_1; break;
case 2: *flags = MQTTSN_QOS_2; break;
default: return -1;
}
return qos;
}
static void _on_con_evt(asymcute_req_t *req, unsigned evt_type)
{
printf("Request %p: ", (void *)req);
switch (evt_type) {
case ASYMCUTE_TIMEOUT:
puts("Timeout");
break;
case ASYMCUTE_REJECTED:
puts("Rejected by gateway");
break;
case ASYMCUTE_CONNECTED:
puts("Connection to gateway established");
break;
case ASYMCUTE_DISCONNECTED:
puts("Connection to gateway closed");
_topics_clear();
break;
case ASYMCUTE_REGISTERED:
puts("Topic registered");
break;
case ASYMCUTE_PUBLISHED:
puts("Data was published");
break;
case ASYMCUTE_SUBSCRIBED:
puts("Subscribed topic");
break;
case ASYMCUTE_UNSUBSCRIBED:
puts("Unsubscribed topic");
break;
case ASYMCUTE_CANCELED:
puts("Canceled");
break;
default:
puts("unknown event");
break;
}
}
static void _on_pub_evt(const asymcute_sub_t *sub, unsigned evt_type,
const void *data, size_t len, void *arg)
{
(void)arg;
if (evt_type == ASYMCUTE_PUBLISHED) {
char *in = (char *)data;
printf("subscription to topic #%i [%s]: NEW DATA\n",
(int)sub->topic->id, sub->topic->name);
printf(" data -> ");
for (size_t i = 0; i < len; i++) {
printf("%c", in[i]);
}
puts("");
printf(" -> %u bytes\n", (unsigned)len);
}
else if (evt_type == ASYMCUTE_CANCELED) {
printf("subscription -> topic #%i [%s]: CANCELED\n",
(int)sub->topic->id, sub->topic->name);
}
}
static int _ok(asymcute_req_t *req)
{
printf("Request %p: issued\n", (void *)req);
return 0;
}
static int _cmd_connect(int argc, char **argv)
{
if (argc < 3) {
printf("usage %s <cli id> <addr> [<will topic> <will msg>]\n",
argv[0]);
return 1;
}
/* get sock ep */
sock_udp_ep_t ep;
if (sock_udp_str2ep(&ep, argv[2]) != 0) {
puts("error: unable to parse gateway address");
return 1;
}
if (ep.port == 0) {
ep.port = MQTTSN_DEFAULT_PORT;
}
/* get request context */
asymcute_req_t *req = _get_req_ctx();
if (req == NULL) {
return 1;
}
if (asymcute_connect(&_connection, req, &ep, argv[1], true, NULL)
!= ASYMCUTE_OK) {
puts("error: failed to issue CONNECT request");
return 1;
}
return _ok(req);
}
static int _cmd_disconnect(int argc, char **argv)
{
(void)argc;
(void)argv;
/* get request context */
asymcute_req_t *req = _get_req_ctx();
if (req == NULL) {
return 1;
}
if (asymcute_disconnect(&_connection, req) != ASYMCUTE_OK) {
puts("error: failed to issue DISCONNECT request");
return 1;
}
return _ok(req);
}
static int _cmd_reg(int argc, char **argv)
{
if (argc < 2) {
printf("usage: %s <topic name>\n", argv[0]);
_topic_print_help();
return 1;
}
if (_topic_find(NULL, argv[1]) == 0) {
puts("success: topic already registered (or no registration needed)\n");
return 0;
}
/* find unused slot */
asymcute_topic_t *t = NULL;
for (unsigned i = 0; i < TOPIC_BUF_NUMOF; i++) {
if (!asymcute_topic_is_reg(&_topics[i])) {
t = &_topics[i];
break;
}
}
if (t == NULL) {
puts("error: no empty slot left for storing the topic\n");
return 1;
}
/* send registration request */
asymcute_req_t *req = _get_req_ctx();
if (req == NULL) {
return 1;
}
if (_topic_init(t, argv[1]) != 0) {
puts("error: unable to initialize topic");
return 1;
}
if (asymcute_register(&_connection, req, t) != ASYMCUTE_OK) {
puts("error: unable to send REGISTER request\n");
return 1;
}
return _ok(req);
}
static int _cmd_unreg(int argc, char **argv)
{
if (argc < 2) {
printf("usage: %s <topic name>\n", argv[0]);
return 1;
}
unsigned i = 0;
for (; i < TOPIC_BUF_NUMOF; i++) {
if (strcmp(argv[1], _topics[i].name) == 0) {
for (unsigned s = 0; s < SUB_CTX_NUMOF; s++) {
if (_subscriptions[i].topic == &_topics[i]) {
puts("error: topic used in active subscription");
return 1;
}
}
memset(&_topics[i], 0, sizeof(asymcute_topic_t));
puts("success: removed local entry for given topic");
break;
}
}
if (i == TOPIC_BUF_NUMOF) {
puts("error: unable to find topic in list of registered topics");
}
return 0;
}
static int _cmd_pub(int argc, char **argv)
{
if (argc < 3) {
printf("usage: %s <topic> <data> [QoS level]\n", argv[0]);
_topic_print_help();
return 1;
}
/* parse and register topic */
asymcute_topic_t t;
if (_topic_find(&t, argv[1]) != 0) {
puts("error: given topic is not registered");
return 1;
}
/* parse QoS level */
unsigned flags = 0;
int qos = _qos_parse(argc, argv, 3, &flags);
if (qos < 0) {
puts("error: unable to parse QoS level");
return 1;
}
/* get request context */
asymcute_req_t *req = _get_req_ctx();
if (req == NULL) {
return 1;
}
/* publish data */
size_t len = strlen(argv[2]);
if (asymcute_publish(&_connection, req, &t, argv[2], len, flags) !=
ASYMCUTE_OK) {
puts("error: unable to send PUBLISH message");
return 1;
}
if (qos == 0) {
printf("Request %p: issued (one way)\n", (void *)req);
return 0;
}
return _ok(req);
}
static int _cmd_sub(int argc, char **argv)
{
if (argc < 2) {
printf("usage: %s <topic> [QoS level]\n", argv[0]);
_topic_print_help();
return 1;
}
/* parse QoS level */
unsigned flags = 0;
int qos = _qos_parse(argc, argv, 2, &flags);
if (qos < 0) {
puts("error: unable to parse QoS level");
return 1;
}
/* get request context */
asymcute_req_t *req = _get_req_ctx();
if (req == NULL) {
return 1;
}
/* get subscription context */
asymcute_sub_t *sub = _get_sub_ctx();
if (sub == NULL) {
puts("error: unable to allocate subscription context");
return 1;
}
/* parse topic */
asymcute_topic_t *t = _topic_get_free();
if (t == NULL) {
puts("error: no free topic memory");
return 1;
}
if (_topic_init(t, argv[1]) != 0) {
puts("error: unable to initialize topic");
return 1;
}
printf("using req %p, sub %p\n", (void *)req, (void *)sub);
if (asymcute_subscribe(&_connection, req, sub, t, _on_pub_evt, NULL, flags)
!= ASYMCUTE_OK) {
asymcute_topic_reset(t);
puts("error: unable to send SUBSCRIBE request");
return 1;
}
return _ok(req);
}
static int _cmd_unsub(int argc, char **argv)
{
if (argc < 2) {
printf("usage: %s <topic>\n", argv[0]);
return 1;
}
/* find active subscription for given topic name */
asymcute_sub_t *sub = _find_sub(argv[1]);
if (sub == NULL) {
puts("error: no subscription for given topic found");
return 1;
}
/* get request context */
asymcute_req_t *req = _get_req_ctx();
if (req == NULL) {
return 1;
}
/* issue unsubscribe request */
if (asymcute_unsubscribe(&_connection, req, sub) != ASYMCUTE_OK) {
puts("error: unable to send UNSUBSCRIBE request");
return 1;
}
return _ok(req);
}
static int _cmd_info(int argc, char **argv)
{
(void)argc;
(void)argv;
puts("--- Asymcute MQTT-SN client state ---");
puts("Topics:");
for (unsigned i = 0; i < TOPIC_BUF_NUMOF; i++) {
printf("topic #%2u - ", i);
if (asymcute_topic_is_reg(&_topics[i])) {
printf("[registered] id: %u, name: %s\n",
(unsigned)_topics[i].id, _topics[i].name);
}
else {
puts("[unused]");
}
}
puts("Subscriptions:");
for (unsigned i = 0; i < SUB_CTX_NUMOF; i++) {
printf("sub #%2u - ", i);
if (asymcute_sub_active(&_subscriptions[i])) {
printf("[subscribed] id: %u, name: %s\n",
(unsigned)_subscriptions[i].topic->id,
_subscriptions[i].topic->name);
}
else {
puts("[unused]");
}
}
return 0;
}
static const shell_command_t shell_commands[] = {
{ "connect", "connect to MQTT-SN gateway", _cmd_connect },
{ "disconnect", "disconnect from MQTT-SN gateway", _cmd_disconnect },
{ "reg", "register a given topic", _cmd_reg },
{ "unreg", "remove a topic registration [locally]", _cmd_unreg },
{ "pub", "publish data", _cmd_pub },
{ "sub", "subscribe to topic", _cmd_sub },
{ "unsub", "unsubscribe from topic", _cmd_unsub },
{ "info", "print state information", _cmd_info },
{ NULL, NULL, NULL },
};
int main(void)
{
puts("Asymcute MQTT-SN example application\n");
puts("Type 'help' to get started and have a look at the README.md for more"
"information.");
/* setup the connection context */
asymcute_listener_run(&_connection, listener_stack, sizeof(listener_stack),
LISTENER_PRIO, _on_con_evt);
/* we need a message queue for the thread running the shell in order to
* receive potentially fast incoming networking packets */
msg_init_queue(_main_msg_queue, MAIN_QUEUE_SIZE);
/* start shell */
char line_buf[SHELL_DEFAULT_BUFSIZE];
shell_run(shell_commands, line_buf, SHELL_DEFAULT_BUFSIZE);
/* should be never reached */
return 0;
}

View File

@ -103,6 +103,9 @@ endif
ifneq (,$(filter gcoap,$(USEMODULE)))
DIRS += net/application_layer/gcoap
endif
ifneq (,$(filter asymcute,$(USEMODULE)))
DIRS += net/application_layer/asymcute
endif
ifneq (,$(filter emcute,$(USEMODULE)))
DIRS += net/application_layer/emcute
endif

View File

@ -84,6 +84,10 @@
#include "ndn-riot/ndn.h"
#endif
#ifdef MODULE_ASYMCUTE
#include "net/asymcute.h"
#endif
#define ENABLE_DEBUG (0)
#include "debug.h"
@ -164,6 +168,10 @@ void auto_init(void)
extern void rdcli_simple_run(void);
rdcli_simple_run();
#endif
#ifdef MODULE_ASYMCUTE
DEBUG("Auto init Asymcute\n");
asymcute_handler_run();
#endif
/* initialize network devices */
#ifdef MODULE_AUTO_INIT_GNRC_NETIF

561
sys/include/net/asymcute.h Normal file
View File

@ -0,0 +1,561 @@
/*
* Copyright (C) 2018 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @defgroup net_asymcute MQTT-SN Client (Asymcute)
* @ingroup net
* @brief Asymcute is an asynchronous MQTT-SN implementation
*
* # About
* `Asymcute` is a asynchronous MQTT-SN client implementation, aiming at
* providing the user a high degree of flexibility. It provides a flexible
* interface that allows users to issue any number of concurrent requests to
* one or more different gateways simultaneously.
*
* # Implementation state
*
* Implemented features:
* - Connecting to multiple gateways simultaneously
* - Registration of topic names
* - Publishing of data (QoS 0 and QoS 1)
* - Subscription to topics
* - Pre-defined topic IDs as well as short and normal topic names
*
* Missing features:
* - Gateway discovery process not implemented
* - Last will feature not implemented
* - No support for QoS level 2
* - No support for wildcard characters in topic names when subscribing
* - Actual granted QoS level on subscription is ignored
*
* @{
* @file
* @brief Asymcute MQTT-SN interface definition
*
* @author Hauke Petersen <hauke.petersen@fu-berlin.de>
*/
#ifndef NET_ASYMCUTE_H
#define NET_ASYMCUTE_H
#include <stdint.h>
#include <stddef.h>
#include <stdbool.h>
#include "assert.h"
#include "event/timeout.h"
#include "event/callback.h"
#include "net/mqttsn.h"
#include "net/sock/udp.h"
#include "net/sock/util.h"
#ifdef __cplusplus
extern "C" {
#endif
#ifndef ASYMCUTE_BUFSIZE
/**
* @brief Default buffer size used for receive and request buffers
*/
#define ASYMCUTE_BUFSIZE (128U)
#endif
#ifndef ASYMCUTE_HANDLER_PRIO
/**
* @brief Default priority for Asymcute's handler thread
*/
#define ASYMCUTE_HANDLER_PRIO (THREAD_PRIORITY_MAIN - 2)
#endif
#ifndef ASYMCUTE_HANDLER_STACKSIZE
/**
* @brief Default stack size for Asymcute's handler thread
*/
#define ASYMCUTE_HANDLER_STACKSIZE (THREAD_STACKSIZE_DEFAULT)
#endif
#ifndef ASYMCUTE_LISTENER_PRIO
/**
* @brief Default priority for an Asymcute listener thread
*
* @note Must be of higher priority than @ref ASYMCUTE_HANDLER_PRIO
*/
#define ASYMCUTE_LISTENER_PRIO (THREAD_PRIORITY_MAIN - 3)
#endif
#ifndef ASYMCUTE_LISTENER_STACKSIZE
/**
* @brief Default stack size for an Asymcute listener thread
*/
#define ASYMCUTE_LISTENER_STACKSIZE (THREAD_STACKSIZE_DEFAULT)
#endif
#ifndef ASYMCUTE_ID_MAXLEN
/**
* @brief Maximum client ID length
*
* @note Must be less than (256 - 8) and less than (ASYMCUTE_BUFSIZE - 8)
*/
#define ASYMCUTE_ID_MAXLEN (32U)
#endif
#ifndef ASYMCUTE_TOPIC_MAXLEN
/**
* @brief Maximum topic length
*
* @note Must be less than (256 - 8) AND less than (ASYMCUTE_BUFSIZE - 8).
*/
#define ASYMCUTE_TOPIC_MAXLEN (32U)
#endif
#ifndef ASYMCUTE_KEEPALIVE
/**
* @brief Keep alive interval [in s] communicated to the gateway
*
* For the default value, see spec v1.2, section 7.2 -> T_WAIT: > 5 min
*/
#define ASYMCUTE_KEEPALIVE (360) /* -> 6 min*/
#endif
#ifndef ASYMCUTE_KEEPALIVE_PING
/**
* @brief Interval to use for sending periodic ping messages
*
* The default behavior of this implementation is to send ping messages as soon
* as three quarters of the keep alive interval have passed.
*
* @note Must be less than ASYMCUTE_KEEPALIVE
*/
#define ASYMCUTE_KEEPALIVE_PING ((ASYMCUTE_KEEPALIVE / 4) * 3)
#endif
#ifndef ASYMCUTE_T_RETRY
/**
* @brief Resend interval [in seconds]
*
* For the default value, see spec v1.2, section 7.2 -> T_RETRY: 10 to 15 sec
*/
#define ASYMCUTE_T_RETRY (10U) /* -> 10 sec */
#endif
#ifndef ASYMCUTE_N_RETRY
/**
* @brief Number of retransmissions until requests time out
*
* For the default value, see spec v1.2, section 7.2 -> N_RETRY: 3-5
*/
#define ASYMCUTE_N_RETRY (3U)
#endif
/**
* @brief Return values used by public Asymcute functions
*/
enum {
ASYMCUTE_OK = 0, /**< all is good */
ASYMCUTE_OVERFLOW = -1, /**< error: insufficient buffer space */
ASYMCUTE_GWERR = -2, /**< error: bad gateway connection state */
ASYMCUTE_NOTSUP = -3, /**< error: feature not supported */
ASYMCUTE_BUSY = -4, /**< error: context already in use */
ASYMCUTE_REGERR = -5, /**< error: registration invalid */
ASYMCUTE_SUBERR = -6, /**< error: subscription invalid */
};
/**
* @brief Possible event types passed to the event callback
*/
enum {
ASYMCUTE_TIMEOUT, /**< request timed out */
ASYMCUTE_CANCELED, /**< request was canceled */
ASYMCUTE_REJECTED, /**< request was rejected */
ASYMCUTE_CONNECTED, /**< connected to gateway */
ASYMCUTE_DISCONNECTED, /**< connection got disconnected */
ASYMCUTE_REGISTERED, /**< topic was registered */
ASYMCUTE_PUBLISHED, /**< data was published */
ASYMCUTE_SUBSCRIBED, /**< client was subscribed to topic */
ASYMCUTE_UNSUBSCRIBED, /**< client was unsubscribed from topic */
};
/**
* @brief Forward type declaration for connections contexts
*/
typedef struct asymcute_con asymcute_con_t;
/**
* @brief Forward type declaration for request contexts
*/
typedef struct asymcute_req asymcute_req_t;
/**
* @brief Forward type declaration for subscription contexts
*/
typedef struct asymcute_sub asymcute_sub_t;
/**
* @brief Forward type declaration for topic definitions
*/
typedef struct asymcute_topic asymcute_topic_t;
/**
* @brief Forward type declaration for last will definitions
*/
typedef struct asymcute_will asymcute_will_t;
/**
* @brief Event callback used for communicating connection and request related
* events to the user
*
* @param[in] req pointer to the request context that triggered the event,
* may be NULL of unsolicited events
* @param[in] evt_type type of the event
*/
typedef void(*asymcute_evt_cb_t)(asymcute_req_t *req, unsigned evt_type);
/**
* @brief Callback triggered on events for active subscriptions
*
* @param[in] sub pointer to subscription context triggering this event
* @param[in] evt_type type of the event
* @param[in] data incoming data for PUBLISHED events, may be NULL
* @param[in] len length of @p data in bytes
* @param[in] arg user supplied argument
*/
typedef void(*asymcute_sub_cb_t)(const asymcute_sub_t *sub, unsigned evt_type,
const void *data, size_t len, void *arg);
/**
* @brief Context specific timeout callback, only used internally
*
* @internal
*
* @param[in] con connection context for this timeout
* @param[in] req request that timed out
*
* @return Event type to communicate to the user
*/
typedef unsigned(*asymcute_to_cb_t)(asymcute_con_t *con, asymcute_req_t *req);
/**
* @brief Asymcute request context
*/
struct asymcute_req {
mutex_t lock; /**< synchronization lock */
struct asymcute_req *next; /**< the requests list entry */
asymcute_con_t *con; /**< connection the request is using */
asymcute_to_cb_t cb; /**< internally used callback */
void *arg; /**< internally used additional state */
event_callback_t to_evt; /**< timeout event */
event_timeout_t to_timer; /**< timeout timer */
uint8_t data[ASYMCUTE_BUFSIZE]; /**< buffer holding the request's data */
size_t data_len; /**< length of the request packet in byte */
uint16_t msg_id; /**< used message id for this request */
uint8_t retry_cnt; /**< retransmission counter */
};
/**
* @brief Asymcute connection context
*/
struct asymcute_con {
mutex_t lock; /**< synchronization lock */
sock_udp_t sock; /**< socket used by a connections */
sock_udp_ep_t server_ep; /**< the gateway's UDP endpoint */
asymcute_req_t *pending; /**< list holding pending requests */
asymcute_sub_t *subscriptions; /**< list holding active subscriptions */
asymcute_evt_cb_t user_cb; /**< event callback provided by user */
event_callback_t keepalive_evt; /**< keep alive event */
event_timeout_t keepalive_timer; /**< keep alive timer */
uint16_t last_id; /**< last used message ID for this
* connection */
uint8_t keepalive_retry_cnt; /**< keep alive transmission counter */
uint8_t state; /**< connection state */
uint8_t rxbuf[ASYMCUTE_BUFSIZE]; /**< connection specific receive buf */
char cli_id[ASYMCUTE_ID_MAXLEN + 1];/**< buffer to store client ID */
};
/**
* @brief Data-structure for holding topics and their registration status
*/
struct asymcute_topic {
asymcute_con_t *con; /**< connection used for registration */
char name[ASYMCUTE_TOPIC_MAXLEN + 1]; /**< topic string (ACSII only) */
uint8_t flags; /**< normal, short, or pre-defined */
uint16_t id; /**< topic id */
};
/**
* @brief Data-structure holding the state of subscriptions
*/
struct asymcute_sub {
asymcute_sub_t *next; /**< the subscriptions list entry */
asymcute_topic_t *topic; /**< topic we subscribe to */
asymcute_sub_cb_t cb; /**< called on incoming data */
void *arg; /**< user supplied callback argument */
};
/**
* @brief Data structure for defining a last will
*/
struct asymcute_will {
const char *topic; /**< last will topic */
void *msg; /**< last will message content */
size_t msg_len; /**< length of last will message content */
};
/**
* @brief Check if a given request context is currently used
*
* @param[in] req request context to check
*
* @return true if context is currently used
* @return false if context is not used
*/
static inline bool asymcute_req_in_use(const asymcute_req_t *req)
{
assert(req);
return (req->con != NULL);
}
/**
* @brief Check if a given subscription is currently active
*
* @param[in] sub subscription context to check
*
* @return true if subscription is active
* @return false if subscription is not active
*/
static inline bool asymcute_sub_active(const asymcute_sub_t *sub)
{
assert(sub);
return (sub->topic != NULL);
}
/**
* @brief Reset the given topic
*
* @warning Make sure that the given topic is not used by any subscription or
* last will when calling this function
*
* @param[out] topic topic to reset
*/
static inline void asymcute_topic_reset(asymcute_topic_t *topic)
{
assert(topic);
memset(topic, 0, sizeof(asymcute_topic_t));
}
/**
* @brief Check if a given topic is currently registered with a gateway
*
* @param[in] topic topic to check
*
* @return true if topic is registered
* @return false if topic is not registered
*/
static inline bool asymcute_topic_is_reg(const asymcute_topic_t *topic)
{
assert(topic);
return (topic->con != NULL);
}
/**
* @brief Check if a given topic is initialized
*
* @param[in] topic topic to check
*
* @return true if topic is initialized
* @return false if topic is not initialized
*/
static inline bool asymcute_topic_is_init(const asymcute_topic_t *topic)
{
assert(topic);
return (topic->name[0] != '\0');
}
/**
* @brief Compare two given topics and check if they are equal
*
* @param[in] a topic A
* @param[in] b topic B
*
* @return true if both topics are equal
* @return false if topics differ
*/
static inline bool asymcute_topic_equal(const asymcute_topic_t *a,
const asymcute_topic_t *b)
{
assert(a);
assert(b);
return ((a->flags == b->flags) && (a->id == b->id));
}
/**
* @brief Initialize the given topic
*
* @param[out] topic topic to initialize
* @param[in] topic_name topic name (ASCII), may be NULL if topic should use
* a pre-defined topic ID
* @param[in] topic_id pre-defined topic ID, or don't care if @p topic_name
* is given
*
* @return ASYMCUTE_OK on success
* @return ASYMCUTE_REGERR if topic is already registered
* @return ASYMCUTE_OVERFLOW if topic name does not fit into buffer or if pre-
* defined topic ID is invalid
*/
int asymcute_topic_init(asymcute_topic_t *topic, const char *topic_name,
uint16_t topic_id);
/**
* @brief Start a listener thread
*
* @note Must have higher priority then the handler thread (defined by
* @ref ASYMCUTE_HANDLER_PRIO)
*
* @param[in] con connection context to use for this connection
* @param[in] stack stack used to run the listener thread
* @param[in] stacksize size of @p stack in bytes
* @param[in] priority priority of the listener thread created by this function
* @param[in] callback user callback for notification about connection related
* events
*
* @return ASYMCUTE_OK on success
* @return ASYMCUTE_BUSY if connection context is already in use
*/
int asymcute_listener_run(asymcute_con_t *con, char *stack, size_t stacksize,
char priority, asymcute_evt_cb_t callback);
/**
* @brief Start the global Asymcute handler thread for processing timeouts and
* keep alive events
*
* This function is typically called during system initialization.
*/
void asymcute_handler_run(void);
/**
* @brief Check if the given connection context is connected to a gateway
*
* @param[in] con connection to check
*
* @return true if context is connected
* @return false if not connected
*/
bool asymcute_is_connected(const asymcute_con_t *con);
/**
* @brief Connect to the given MQTT-SN gateway
*
* @param[in,out] con connection to use
* @param[in,out] req request context to use for CONNECT procedure
* @param[in] server UDP endpoint of the target gateway
* @param[in] cli_id client ID to register with the gateway
* @param[in] clean set `true` to start a clean session
* @param[in] will last will (currently not implemented)
*
* @return ASYMCUTE_OK if CONNECT message has been sent
* @return ASYMCUTE_NOTSUP if last will was given (temporary until implemented)
* @return ASYMCUTE_OVERFLOW if @p cli_id is larger than ASYMCUTE_ID_MAXLEN
* @return ASYMCUTE_GWERR if the connection is not in idle state
* @return ASYMCUTE_BUSY if the given request context is already in use
*/
int asymcute_connect(asymcute_con_t *con, asymcute_req_t *req,
sock_udp_ep_t *server, const char *cli_id, bool clean,
asymcute_will_t *will);
/**
* @brief Close the given connection
*
* @param[in,out] con connection to close
* @param[in,out] req request context to use for DISCONNECT procedure
*
* @return ASYMCUTE_OK if DISCONNECT message has been sent
* @return ASYMCUTE_GWERR if connection context is not connected
* @return ASYMCUTE_BUSY if the given request context is already in use
*/
int asymcute_disconnect(asymcute_con_t *con, asymcute_req_t *req);
/**
* @brief Register a given topic with the connected gateway
*
* @param[in] con connection to use
* @param[in,out] req request context to use for REGISTER procedure
* @param[in,out] topic topic to register
*
* @return ASYMCUTE_OK if REGISTER message has been sent
* @return ASYMCUTE_REGERR if topic is already registered
* @return ASYMCUTE_GWERR if not connected to a gateway
* @return ASYMCUTE_BUSY if the given request context is already in use
*/
int asymcute_register(asymcute_con_t *con, asymcute_req_t *req,
asymcute_topic_t *topic);
/**
* @brief Publish the given data to the given topic
*
* @param[in] con connection to use
* @param[in,out] req request context used for PUBLISH procedure
* @param[in] topic publish data to this topic
* @param[in] data actual payload to send
* @param[in] data_len size of @p data in bytes
* @param[in] flags additional flags (QoS level, DUP, and RETAIN)
*
* @return ASYMCUTE_OK if PUBLISH message has been sent
* @return ASYMCUTE_NOTSUP if unsupported flags have been set
* @return ASYMCUTE_OVERFLOW if data does not fit into transmit buffer
* @return ASYMCUTE_REGERR if given topic is not registered
* @return ASYMCUTE_GWERR if not connected to a gateway
* @return ASYMCUTE_BUSY if the given request context is already in use
*/
int asymcute_publish(asymcute_con_t *con, asymcute_req_t *req,
const asymcute_topic_t *topic,
const void *data, size_t data_len, uint8_t flags);
/**
* @brief Subscribe to a given topic
*
* @param[in] con connection to use
* @param[in,out] req request context used for SUBSCRIBE procedure
* @param[out] sub subscription context to store subscription state
* @param[in,out] topic topic to subscribe to, must be initialized (see
* asymcute_topic_init())
* @param[in] callback user callback triggered on events for this subscription
* @param[in] arg user supplied argument passed to the event callback
* @param[in] flags additional flags (QoS level and DUP)
*
* @return ASYMCUTE_OK if SUBSCRIBE message has been sent
* @return ASYMCUTE_NOTSUP if invalid or unsupported flags have been set
* @return ASYMCUTE_REGERR if topic is not initialized
* @return ASYMCUTE_GWERR if not connected to a gateway
* @return ASYMCUTE_SUBERR if already subscribed to the given topic
* @return ASYMCUTE_BUSY if the given request context is already in use
*/
int asymcute_subscribe(asymcute_con_t *con, asymcute_req_t *req,
asymcute_sub_t *sub, asymcute_topic_t *topic,
asymcute_sub_cb_t callback, void *arg, uint8_t flags);
/**
* @brief Cancel an active subscription
*
* @param[in] con connection to use
* @param[in,out] req request context used for UNSUBSCRIBE procedure
* @param[in,out] sub subscription to cancel
*
* @return ASYMCUTE_OK if UNSUBSCRIBE message has been sent
* @return ASYMCUTE_SUBERR if subscription is not currently active
* @return ASYMCUTE_GWERR if not connected to a gateway
* @return ASYMCUTE_BUSY if the given request context is already in use
*/
int asymcute_unsubscribe(asymcute_con_t *con, asymcute_req_t *req,
asymcute_sub_t *sub);
#ifdef __cplusplus
}
#endif
#endif /* NET_ASYMCUTE_H */
/** @} */

111
sys/include/net/mqttsn.h Normal file
View File

@ -0,0 +1,111 @@
/*
* Copyright (C) 2018 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @defgroup net_mqttsn MQTT-SN Defines
* @ingroup net
* @brief Shared definitions for MQTT-SN
*
* @see http://mqtt.org/new/wp-content/uploads/2009/06/MQTT-SN_spec_v1.2.pdf
*
* @{
* @file
* @brief Generic MQTT-SN definitions
*
* @author Hauke Petersen <hauke.petersen@fu-berlin.de>
*/
#ifndef NET_MQTTSN_H
#define NET_MQTTSN_H
#ifdef __cplusplus
extern "C" {
#endif
#ifndef MQTTSN_DEFAULT_PORT
/**
* @brief Default UDP port for MQTT-SN servers
*/
#define MQTTSN_DEFAULT_PORT (1883U)
#endif
/**
* @brief MQTT-SN flags
*
* All MQTT-SN functions only support a sub-set of the available flags. It is up
* to the user to only supply valid/supported flags to a function.
*
* Refer to the MQTT-SN spec section 5.3.4 for further information.
*/
enum {
MQTTSN_DUP = 0x80, /**< duplicate flag */
MQTTSN_QOS_MASK = 0x60, /**< QoS level mask */
MQTTSN_QOS_2 = 0x40, /**< QoS level 2 */
MQTTSN_QOS_1 = 0x20, /**< QoS level 1 */
MQTTSN_QOS_0 = 0x00, /**< QoS level 0 */
MQTTSN_RETAIN = 0x10, /**< retain flag */
MQTTSN_WILL = 0x08, /**< will flag, used during CONNECT */
MQTTSN_CS = 0x04, /**< clean session flag */
MQTTSN_TIT_MASK = 0x03, /**< topic ID type mask */
MQTTSN_TIT_SHORT = 0x02, /**< topic ID: short */
MQTTSN_TIT_PREDEF = 0x01, /**< topic ID: pre-defined */
MQTTSN_TIT_NORMAL = 0x00, /**< topic ID: normal */
};
/**
* @brief MQTT-SN message types
*/
enum {
MQTTSN_ADVERTISE = 0x00, /**< advertise message */
MQTTSN_SEARCHGW = 0x01, /**< search gateway message */
MQTTSN_GWINFO = 0x02, /**< gateway info message */
MQTTSN_CONNECT = 0x04, /**< connect message */
MQTTSN_CONNACK = 0x05, /**< connection acknowledgment message */
MQTTSN_WILLTOPICREQ = 0x06, /**< will topic request */
MQTTSN_WILLTOPIC = 0x07, /**< will topic */
MQTTSN_WILLMSGREQ = 0x08, /**< will message request */
MQTTSN_WILLMSG = 0x09, /**< will message */
MQTTSN_REGISTER = 0x0a, /**< topic registration request */
MQTTSN_REGACK = 0x0b, /**< topic registration acknowledgment */
MQTTSN_PUBLISH = 0x0c, /**< publish message */
MQTTSN_PUBACK = 0x0d, /**< publish acknowledgment */
MQTTSN_PUBCOMP = 0x0e, /**< publish received (QoS 2) */
MQTTSN_PUBREC = 0x0f, /**< publish complete (QoS 2) */
MQTTSN_PUBREL = 0x10, /**< publish release (QoS 2) */
MQTTSN_SUBSCRIBE = 0x12, /**< subscribe message */
MQTTSN_SUBACK = 0x13, /**< subscription acknowledgment */
MQTTSN_UNSUBSCRIBE = 0x14, /**< unsubscribe message */
MQTTSN_UNSUBACK = 0x15, /**< unsubscription acknowledgment */
MQTTSN_PINGREQ = 0x16, /**< ping request */
MQTTSN_PINGRESP = 0x17, /**< ping response */
MQTTSN_DISCONNECT = 0x18, /**< disconnect message */
MQTTSN_WILLTOPICUPD = 0x1a, /**< will topic update request */
MQTTSN_WILLTOPICRESP = 0x1b, /**< will topic update response */
MQTTSN_WILLMSGUPD = 0x1c, /**< will message update request */
MQTTSN_WILLMSGRESP = 0x1d, /**< will topic update response */
};
/**
* @brief MQTT-SN return codes
*
* @see MQTT-SN spec v1.2, section 5.3.10, table 5
*/
enum {
MQTTSN_ACCEPTED = 0x00, /**< accepted */
MQTTSN_REJ_CONGESTION = 0x01, /**< rejected: congestion */
MQTTSN_REJ_INV_TOPIC_ID = 0x02, /**< rejected: invalid topic id */
MQTTSN_REJ_NOTSUP = 0x03, /**< rejected: not supported */
};
#ifdef __cplusplus
}
#endif
#endif /* NET_MQTTSN_H */
/** @} */

View File

@ -1,5 +1,6 @@
/*
* Copyright (C) 2017 Kaspar Schleiser <kaspar@schleiser.de>
* 2018 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
@ -15,19 +16,23 @@
* @{
*
* @file
* @brief sock utility function definitions
* @brief sock utility function definitions
*
* @author Kaspar Schleiser <kaspar@schleiser.de>
* @author Kaspar Schleiser <kaspar@schleiser.de>
* @author Hauke Petersen <hauke.petersen@fu-berlin.de>
*/
#ifndef NET_SOCK_UTIL_H
#define NET_SOCK_UTIL_H
#include <stdbool.h>
#include "net/sock/udp.h"
#ifdef __cplusplus
extern "C" {
#endif
/**
* @brief Format UDP endpoint to string and port
*
@ -73,6 +78,21 @@ int sock_urlsplit(const char *url, char *hostport, char *urlpath);
*/
int sock_udp_str2ep(sock_udp_ep_t *ep_out, const char *str);
/**
* @brief Compare the two given UDP endpoints
*
* The given endpoint identifiers are compared by checking their address family,
* their addresses, and their port value.
*
* @param[in] a Endpoint A
* @param[in] b Endpoint B
*
* @return true if given endpoint identifiers point to the same destination
* @return false if given endpoint identifiers do not point to the same
* destination, or if the address family is unknown
*/
bool sock_udp_ep_equal(const sock_udp_ep_t *a, const sock_udp_ep_t *b);
/**
* @name helper definitions
* @{

View File

@ -0,0 +1,3 @@
MODULE = asymcute
include $(RIOTBASE)/Makefile.base

View File

@ -0,0 +1,973 @@
/*
* Copyright (C) 2018 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @ingroup net_asymcute
* @{
*
* @file
* @brief Asynchronous MQTT-SN implementation
*
* @author Hauke Petersen <hauke.petersen@fu-berlin.de>
*
* @}
*/
#include <limits.h>
#include "log.h"
#include "random.h"
#include "byteorder.h"
#include "net/asymcute.h"
#define ENABLE_DEBUG (0)
#include "debug.h"
#define PROTOCOL_VERSION (0x01)
#define RETRY_TO (ASYMCUTE_T_RETRY * US_PER_SEC)
#define KEEPALIVE_TO (ASYMCUTE_KEEPALIVE_PING * US_PER_SEC)
#define VALID_PUBLISH_FLAGS (MQTTSN_QOS_1 | MQTTSN_DUP | MQTTSN_RETAIN)
#define VALID_SUBSCRIBE_FLAGS (MQTTSN_QOS_1 | MQTTSN_DUP)
#define MINLEN_CONNACK (3U)
#define MINLEN_DISCONNECT (2U)
#define MINLEN_REGACK (7U)
#define MINLEN_PUBACK (7U)
#define MINLEN_SUBACK (8U)
#define MINLEN_UNSUBACK (4U)
#define IDPOS_REGACK (4U)
#define IDPOS_PUBACK (4U)
#define IDPOS_SUBACK (5U)
#define IDPOS_UNSUBACK (2U)
#define LEN_PINGRESP (2U)
/* Internally used connection states */
enum {
UNINITIALIZED = 0, /**< connection context is not initialized */
NOTCON, /**< not connected to any gateway */
CONNECTING, /**< connection is being setup */
CONNECTED, /**< connection is established */
TEARDOWN, /**< connection is being torn down */
};
/* the main handler thread needs a stack and a message queue */
static event_queue_t _queue;
static char _stack[ASYMCUTE_HANDLER_STACKSIZE];
/* necessary forward function declarations */
static void _on_req_timeout(void *arg);
static size_t _len_set(uint8_t *buf, size_t len)
{
if (len < (0xff - 7)) {
buf[0] = len + 1;
return 1;
}
else {
buf[0] = 0x01;
byteorder_htobebufs(&buf[1], (uint16_t)(len + 3));
return 3;
}
}
static size_t _len_get(uint8_t *buf, size_t *len)
{
if (buf[0] != 0x01) {
*len = (uint16_t)buf[0];
return 1;
}
else {
*len = byteorder_bebuftohs(&buf[1]);
return 3;
}
}
/* @pre con is locked */
static uint16_t _msg_id_next(asymcute_con_t *con)
{
if (++con->last_id == 0) {
return ++con->last_id;
}
return con->last_id;
}
/* @pre con is locked */
static asymcute_req_t *_req_preprocess(asymcute_con_t *con,
size_t msg_len, size_t min_len,
const uint8_t *buf, unsigned id_pos)
{
/* verify message length */
if (msg_len < min_len) {
return NULL;
}
uint16_t msg_id = (buf == NULL) ? 0 : byteorder_bebuftohs(&buf[id_pos]);
asymcute_req_t *res = NULL;
asymcute_req_t *iter = con->pending;
if (iter == NULL) {
return NULL;
}
if (iter->msg_id == msg_id) {
res = iter;
con->pending = iter->next;
}
while (iter && !res) {
if (iter->next && (iter->next->msg_id == msg_id)) {
res = iter->next;
iter->next = iter->next->next;
}
iter = iter->next;
}
if (res) {
res->con = NULL;
event_timeout_clear(&res->to_timer);
}
return res;
}
/* @pre con is locked */
static void _req_remove(asymcute_con_t *con, asymcute_req_t *req)
{
if (con->pending == req) {
con->pending = con->pending->next;
}
for (asymcute_req_t *cur = con->pending; cur; cur = cur->next) {
if (cur->next == req) {
cur->next = cur->next->next;
}
}
req->con = NULL;
}
/* @pre con is locked */
static void _compile_sub_unsub(asymcute_req_t *req, asymcute_con_t *con,
asymcute_sub_t *sub, uint8_t type)
{
size_t topic_len = strlen(sub->topic->name);
size_t pos = _len_set(req->data, (topic_len + 4));
req->msg_id = _msg_id_next(con);
req->data[pos] = type;
req->data[pos + 1] = sub->topic->flags;
byteorder_htobebufs(&req->data[pos + 2], req->msg_id);
memcpy(&req->data[pos + 4], sub->topic->name, topic_len);
req->data_len = (pos + 4 + topic_len);
req->arg = (void *)sub;
}
static void _req_resend(asymcute_req_t *req, asymcute_con_t *con)
{
event_timeout_set(&req->to_timer, RETRY_TO);
sock_udp_send(&con->sock, req->data, req->data_len, &con->server_ep);
}
/* @pre con is locked */
static void _req_send(asymcute_req_t *req, asymcute_con_t *con,
asymcute_to_cb_t cb)
{
/* initialize request */
req->con = con;
req->cb = cb;
req->retry_cnt = ASYMCUTE_N_RETRY;
event_callback_init(&req->to_evt, _on_req_timeout, (void *)req);
event_timeout_init(&req->to_timer, &_queue, &req->to_evt.super);
/* add request to the pending queue (if non-con request) */
req->next = con->pending;
con->pending = req;
/* send request */
_req_resend(req, con);
}
static void _req_send_once(asymcute_req_t *req, asymcute_con_t *con)
{
sock_udp_send(&con->sock, req->data, req->data_len, &con->server_ep);
mutex_unlock(&req->lock);
}
static void _req_cancel(asymcute_req_t *req)
{
asymcute_con_t *con = req->con;
event_timeout_clear(&req->to_timer);
req->con = NULL;
mutex_unlock(&req->lock);
con->user_cb(req, ASYMCUTE_CANCELED);
}
static void _sub_cancel(asymcute_sub_t *sub)
{
sub->cb(sub, ASYMCUTE_CANCELED, NULL, 0, sub->arg);
sub->topic = NULL;
}
/* @pre con is locked */
static void _disconnect(asymcute_con_t *con, uint8_t state)
{
if (con->state == CONNECTED) {
/* cancel all pending requests */
event_timeout_clear(&con->keepalive_timer);
for (asymcute_req_t *req = con->pending; req; req = req->next) {
_req_cancel(req);
}
con->pending = NULL;
for (asymcute_sub_t *sub = con->subscriptions; sub; sub = sub->next) {
_sub_cancel(sub);
}
con->subscriptions = NULL;
}
con->state = state;
}
static void _on_req_timeout(void *arg)
{
asymcute_req_t *req = (asymcute_req_t *)arg;
/* only process the timeout, if the request is still active */
if (req->con == NULL) {
return;
}
if (req->retry_cnt--) {
/* resend the packet */
_req_resend(req, req->con);
return;
}
else {
asymcute_con_t *con = req->con;
mutex_lock(&con->lock);
_req_remove(con, req);
/* communicate timeout to outer world */
unsigned ret = ASYMCUTE_TIMEOUT;
if (req->cb) {
ret = req->cb(con, req);
}
mutex_unlock(&req->lock);
mutex_unlock(&con->lock);
con->user_cb(req, ret);
}
}
static unsigned _on_con_timeout(asymcute_con_t *con, asymcute_req_t *req)
{
(void)req;
con->state = NOTCON;
return ASYMCUTE_TIMEOUT;
}
static unsigned _on_discon_timeout(asymcute_con_t *con, asymcute_req_t *req)
{
(void)req;
con->state = NOTCON;
return ASYMCUTE_DISCONNECTED;
}
static unsigned _on_suback_timeout(asymcute_con_t *con, asymcute_req_t *req)
{
(void)con;
/* reset the subscription context */
asymcute_sub_t *sub = (asymcute_sub_t *)req->arg;
sub->topic = NULL;
return ASYMCUTE_TIMEOUT;
}
static void _on_keepalive_evt(void *arg)
{
asymcute_con_t *con = (asymcute_con_t *)arg;
mutex_lock(&con->lock);
if (con->state != CONNECTED) {
mutex_unlock(&con->lock);
return;
}
if (con->keepalive_retry_cnt) {
/* (re)send keep alive ping and set dedicated retransmit timer */
uint8_t ping[2] = { 2, MQTTSN_PINGREQ };
sock_udp_send(&con->sock, ping, sizeof(ping), &con->server_ep);
con->keepalive_retry_cnt--;
event_timeout_set(&con->keepalive_timer, RETRY_TO);
mutex_unlock(&con->lock);
}
else {
_disconnect(con, NOTCON);
mutex_unlock(&con->lock);
con->user_cb(NULL, ASYMCUTE_DISCONNECTED);
}
}
static void _on_connack(asymcute_con_t *con, const uint8_t *data, size_t len)
{
mutex_lock(&con->lock);
asymcute_req_t *req = _req_preprocess(con, len, MINLEN_CONNACK, NULL, 0);
if (req == NULL) {
mutex_unlock(&con->lock);
return;
}
/* check return code and mark connection as established */
unsigned ret = ASYMCUTE_REJECTED;
if (data[2] == MQTTSN_ACCEPTED) {
con->state = CONNECTED;
/* start keep alive timer */
event_timeout_set(&con->keepalive_timer, KEEPALIVE_TO);
ret = ASYMCUTE_CONNECTED;
}
mutex_unlock(&req->lock);
mutex_unlock(&con->lock);
con->user_cb(req, ret);
}
static void _on_disconnect(asymcute_con_t *con, size_t len)
{
mutex_lock(&con->lock);
/* we might have triggered the DISCONNECT process ourselves, so make sure
* the pending request is being handled */
asymcute_req_t *req = _req_preprocess(con, len, MINLEN_DISCONNECT, NULL, 0);
/* put the connection back to NOTCON in any case and let the user know */
_disconnect(con, NOTCON);
if (req) {
mutex_unlock(&req->lock);
}
mutex_unlock(&con->lock);
con->user_cb(req, ASYMCUTE_DISCONNECTED);
}
static void _on_pingreq(asymcute_con_t *con)
{
/* simply reply with a PINGRESP message */
mutex_lock(&con->lock);
uint8_t resp[2] = { LEN_PINGRESP, MQTTSN_PINGRESP };
sock_udp_send(&con->sock, resp, sizeof(resp), &con->server_ep);
mutex_unlock(&con->lock);
}
static void _on_pingresp(asymcute_con_t *con)
{
mutex_lock(&con->lock);
/* only handle ping resp message if we are actually waiting for a reply */
if (con->keepalive_retry_cnt < ASYMCUTE_N_RETRY) {
event_timeout_clear(&con->keepalive_timer);
con->keepalive_retry_cnt = ASYMCUTE_N_RETRY;
event_timeout_set(&con->keepalive_timer, KEEPALIVE_TO);
}
mutex_unlock(&con->lock);
}
static void _on_regack(asymcute_con_t *con, const uint8_t *data, size_t len)
{
mutex_lock(&con->lock);
asymcute_req_t *req = _req_preprocess(con, len, MINLEN_REGACK,
data, IDPOS_REGACK);
if (req == NULL) {
mutex_unlock(&con->lock);
return;
}
/* check return code */
unsigned ret = ASYMCUTE_REJECTED;
if (data[6] == MQTTSN_ACCEPTED) {
/* finish the registration by applying the topic id */
asymcute_topic_t *topic = (asymcute_topic_t *)req->arg;
topic->id = byteorder_bebuftohs(&data[2]);
topic->con = con;
ret = ASYMCUTE_REGISTERED;
}
/* finally notify the user and free the request */
mutex_unlock(&req->lock);
mutex_unlock(&con->lock);
con->user_cb(req, ret);
}
static void _on_publish(asymcute_con_t *con, uint8_t *data,
size_t pos, size_t len)
{
/* verify message length */
if (len < (pos + 6)) {
return;
}
uint16_t topic_id = byteorder_bebuftohs(&data[pos + 2]);
/* find any subscription for that topic */
mutex_lock(&con->lock);
asymcute_sub_t *sub = NULL;
for (asymcute_sub_t *cur = con->subscriptions; cur; cur = cur->next) {
if (cur->topic->id == topic_id) {
sub = cur;
break;
}
}
/* send PUBACK if needed (QoS > 0 or on invalid topic ID) */
if ((sub == NULL) || (data[pos + 1] & MQTTSN_QOS_1)) {
uint8_t ret = (sub) ? MQTTSN_ACCEPTED : MQTTSN_REJ_INV_TOPIC_ID;
uint8_t pkt[7] = { 7, MQTTSN_PUBACK, 0, 0, 0, 0, ret };
/* copy topic and message id */
memcpy(&pkt[2], &data[pos + 2], 4);
sock_udp_send(&con->sock, pkt, 7, &con->server_ep);
}
/* release the context and notify the user (on success) */
mutex_unlock(&con->lock);
if (sub) {
sub->cb(sub, ASYMCUTE_PUBLISHED,
&data[pos + 6], (len - (pos + 6)), sub->arg);
}
}
static void _on_puback(asymcute_con_t *con, const uint8_t *data, size_t len)
{
mutex_lock(&con->lock);
asymcute_req_t *req = _req_preprocess(con, len, MINLEN_PUBACK,
data, IDPOS_PUBACK);
if (req == NULL) {
mutex_unlock(&con->lock);
return;
}
unsigned ret = (data[6] == MQTTSN_ACCEPTED) ?
ASYMCUTE_PUBLISHED : ASYMCUTE_REJECTED;
mutex_unlock(&req->lock);
mutex_unlock(&con->lock);
con->user_cb(req, ret);
}
static void _on_suback(asymcute_con_t *con, const uint8_t *data, size_t len)
{
mutex_lock(&con->lock);
asymcute_req_t *req = _req_preprocess(con, len, MINLEN_SUBACK,
data, IDPOS_SUBACK);
if (req == NULL) {
mutex_unlock(&con->lock);
return;
}
unsigned ret = ASYMCUTE_REJECTED;
if (data[7] == MQTTSN_ACCEPTED) {
/* parse and apply assigned topic id */
asymcute_sub_t *sub = (asymcute_sub_t *)req->arg;
sub->topic->id = byteorder_bebuftohs(&data[3]);
sub->topic->con = con;
/* insert subscription to connection context */
sub->next = con->subscriptions;
con->subscriptions = sub;
ret = ASYMCUTE_SUBSCRIBED;
}
/* notify the user */
mutex_unlock(&req->lock);
mutex_unlock(&con->lock);
con->user_cb(req, ret);
}
static void _on_unsuback(asymcute_con_t *con, const uint8_t *data, size_t len)
{
mutex_lock(&con->lock);
asymcute_req_t *req = _req_preprocess(con, len, MINLEN_UNSUBACK,
data, IDPOS_UNSUBACK);
if (req == NULL) {
mutex_unlock(&con->lock);
return;
}
/* remove subscription from list */
asymcute_sub_t *sub = (asymcute_sub_t *)req->arg;
if (con->subscriptions == sub) {
con->subscriptions = sub->next;
}
else {
for (asymcute_sub_t *e = con->subscriptions; e && e->next; e = e->next) {
if (e->next == sub) {
e->next = e->next->next;
break;
}
}
}
/* reset subscription context */
sub->topic = NULL;
/* notify user */
mutex_unlock(&req->lock);
mutex_unlock(&con->lock);
con->user_cb(req, ASYMCUTE_UNSUBSCRIBED);
}
static void _on_data(asymcute_con_t *con, size_t pkt_len, sock_udp_ep_t *remote)
{
size_t len;
size_t pos = _len_get(con->rxbuf, &len);
/* make sure the incoming data was send by 'our' gateway */
if (!sock_udp_ep_equal(&con->server_ep, remote)) {
return;
}
/* validate incoming data: verify message length */
if ((pkt_len < 2) ||
(pkt_len <= pos) || (pkt_len < len)) {
/* length field of MQTT-SN packet seems to be invalid -> drop the pkt */
return;
}
/* figure out required action based on message type */
uint8_t type = con->rxbuf[pos];
switch (type) {
case MQTTSN_CONNACK:
_on_connack(con, con->rxbuf, len);
break;
case MQTTSN_DISCONNECT:
_on_disconnect(con, len);
break;
case MQTTSN_PINGREQ:
_on_pingreq(con);
break;
case MQTTSN_PINGRESP:
_on_pingresp(con);
break;
case MQTTSN_REGACK:
_on_regack(con, con->rxbuf, len);
break;
case MQTTSN_PUBLISH:
_on_publish(con, con->rxbuf, pos, len);
break;
case MQTTSN_PUBACK:
_on_puback(con, con->rxbuf, len);
break;
case MQTTSN_SUBACK:
_on_suback(con, con->rxbuf, len);
break;
case MQTTSN_UNSUBACK:
_on_unsuback(con, con->rxbuf, len);
break;
default:
break;
}
}
void *_listener(void *arg)
{
asymcute_con_t *con = (asymcute_con_t *)arg;
/* create a socket for this listener, using an ephemeral port */
sock_udp_ep_t local = SOCK_IPV6_EP_ANY;
if (sock_udp_create(&con->sock, &local, NULL, 0) != 0) {
LOG_ERROR("[asymcute] error creating listener socket\n");
return NULL;
}
while (1) {
sock_udp_ep_t remote;
int n = sock_udp_recv(&con->sock, con->rxbuf, ASYMCUTE_BUFSIZE,
SOCK_NO_TIMEOUT, &remote);
if (n > 0) {
_on_data(con, (size_t)n, &remote);
}
}
/* should never be reached */
return NULL;
}
void *_handler(void *arg)
{
(void)arg;
event_queue_init(&_queue);
event_loop(&_queue);
/* should never be reached */
return NULL;
}
int asymcute_listener_run(asymcute_con_t *con, char *stack, size_t stacksize,
char priority, asymcute_evt_cb_t callback)
{
/* make sure con is not running */
assert(con);
assert((priority > 0) && (priority < THREAD_PRIORITY_IDLE - 1));
assert(callback);
int ret = ASYMCUTE_OK;
/* make sure the connection context is not already used */
mutex_lock(&con->lock);
if (con->state != UNINITIALIZED) {
ret = ASYMCUTE_BUSY;
goto end;
}
/* initialize the connection context */
memset(con, 0, sizeof(asymcute_con_t));
random_bytes((uint8_t *)&con->last_id, 2);
event_callback_init(&con->keepalive_evt, _on_keepalive_evt, con);
event_timeout_init(&con->keepalive_timer, &_queue, &con->keepalive_evt.super);
con->keepalive_retry_cnt = ASYMCUTE_N_RETRY;
con->state = NOTCON;
con->user_cb = callback;
/* start listener thread */
thread_create(stack,
stacksize,
priority,
THREAD_CREATE_WOUT_YIELD,
_listener,
con,
"asymcute_listener");
end:
mutex_unlock(&con->lock);
return ret;
}
void asymcute_handler_run(void)
{
thread_create(_stack, sizeof(_stack), ASYMCUTE_HANDLER_PRIO,
0, _handler, NULL, "asymcute_main");
}
int asymcute_topic_init(asymcute_topic_t *topic, const char *topic_name,
uint16_t topic_id)
{
assert(topic);
size_t len = 0;
if (asymcute_topic_is_reg(topic)) {
return ASYMCUTE_REGERR;
}
if (topic_name == NULL) {
if ((topic_id == 0) || (topic_id == UINT16_MAX)) {
return ASYMCUTE_OVERFLOW;
}
}
else {
len = strlen(topic_name);
if ((len == 0) || (len > ASYMCUTE_TOPIC_MAXLEN)) {
return ASYMCUTE_OVERFLOW;
}
}
/* reset given topic */
asymcute_topic_reset(topic);
/* pre-defined topic ID? */
if (topic_name == NULL) {
topic->id = topic_id;
topic->flags = MQTTSN_TIT_PREDEF;
memcpy(topic->name, &topic_id, 2);
topic->name[2] = '\0';
}
else {
strncpy(topic->name, topic_name, sizeof(topic->name));
if (len == 2) {
memcpy(&topic->id, topic_name, 2);
topic->flags = MQTTSN_TIT_SHORT;
}
}
return ASYMCUTE_OK;
}
bool asymcute_is_connected(const asymcute_con_t *con)
{
return (con->state == CONNECTED);
}
int asymcute_connect(asymcute_con_t *con, asymcute_req_t *req,
sock_udp_ep_t *server, const char *cli_id, bool clean,
asymcute_will_t *will)
{
assert(con);
assert(req);
assert(server);
assert(cli_id);
int ret = ASYMCUTE_OK;
size_t id_len = strlen(cli_id);
/* the will feature is not yet supported */
if (will) {
return ASYMCUTE_NOTSUP;
}
/* make sure the client ID will fit into the dedicated buffer */
if (id_len > ASYMCUTE_ID_MAXLEN) {
return ASYMCUTE_OVERFLOW;
}
/* check if the context is not already connected to any gateway */
mutex_lock(&con->lock);
if (con->state != NOTCON) {
ret = ASYMCUTE_GWERR;
goto end;
}
/* get mutual access to the request context */
if (mutex_trylock(&req->lock) != 1) {
ret = ASYMCUTE_BUSY;
goto end;
}
/* prepare the connection context */
con->state = CONNECTING;
strncpy(con->cli_id, cli_id, sizeof(con->cli_id));
memcpy(&con->server_ep, server, sizeof(con->server_ep));
/* compile and send connect message */
req->msg_id = 0;
req->data[0] = (uint8_t)(id_len + 6);
req->data[1] = MQTTSN_CONNECT;
req->data[2] = ((clean) ? MQTTSN_CS : 0);
req->data[3] = PROTOCOL_VERSION;
byteorder_htobebufs(&req->data[4], ASYMCUTE_KEEPALIVE);
memcpy(&req->data[6], cli_id, id_len);
req->data_len = (size_t)req->data[0];
_req_send(req, con, _on_con_timeout);
end:
mutex_unlock(&con->lock);
return ret;
}
int asymcute_disconnect(asymcute_con_t *con, asymcute_req_t *req)
{
assert(con);
assert(req);
int ret = ASYMCUTE_OK;
/* check if we are actually connected */
mutex_lock(&con->lock);
if (!asymcute_is_connected(con)) {
ret = ASYMCUTE_GWERR;
goto end;
}
/* get mutual access to the request context */
if (mutex_trylock(&req->lock) != 1) {
ret = ASYMCUTE_BUSY;
goto end;
}
/* put connection into TEARDOWN state */
_disconnect(con, TEARDOWN);
/* prepare and send disconnect message */
req->msg_id = 0;
req->data[0] = 2;
req->data[1] = MQTTSN_DISCONNECT;
req->data_len = 2;
_req_send(req, con, _on_discon_timeout);
end:
mutex_unlock(&con->lock);
return ret;
}
int asymcute_register(asymcute_con_t *con, asymcute_req_t *req,
asymcute_topic_t *topic)
{
assert(con);
assert(req);
assert(topic);
int ret = ASYMCUTE_OK;
/* test if topic is already registered */
if (asymcute_topic_is_reg(topic)) {
return ASYMCUTE_REGERR;
}
/* make sure we are connected */
mutex_lock(&con->lock);
if (!asymcute_is_connected(con)) {
ret = ASYMCUTE_GWERR;
goto end;
}
/* get mutual access to the request context */
if (mutex_trylock(&req->lock) != 1) {
ret = ASYMCUTE_BUSY;
goto end;
}
/* prepare topic */
req->arg = (void *)topic;
size_t topic_len = strlen(topic->name);
/* prepare registration request */
req->msg_id = _msg_id_next(con);
size_t pos = _len_set(req->data, (topic_len + 5));
req->data[pos] = MQTTSN_REGISTER;
byteorder_htobebufs(&req->data[pos + 1], 0);
byteorder_htobebufs(&req->data[pos + 3], req->msg_id);
memcpy(&req->data[pos + 5], topic->name, topic_len);
req->data_len = (pos + 5 + topic_len);
/* send the request */
_req_send(req, con, NULL);
end:
mutex_unlock(&con->lock);
return ret;
}
int asymcute_publish(asymcute_con_t *con, asymcute_req_t *req,
const asymcute_topic_t *topic,
const void *data, size_t data_len, uint8_t flags)
{
assert(con);
assert(req);
assert(topic);
assert((data_len == 0) || data);
int ret = ASYMCUTE_OK;
/* check for valid flags */
if ((flags & VALID_PUBLISH_FLAGS) != flags) {
return ASYMCUTE_NOTSUP;
}
/* check for message size */
if ((data_len + 9) > ASYMCUTE_BUFSIZE) {
return ASYMCUTE_OVERFLOW;
}
/* make sure topic is registered */
if (!asymcute_topic_is_reg(topic) || (topic->con != con)) {
return ASYMCUTE_REGERR;
}
/* check if we are connected to a gateway */
mutex_lock(&con->lock);
if (!asymcute_is_connected(con)) {
ret = ASYMCUTE_GWERR;
goto end;
}
/* make sure request context is clear to be used */
if (mutex_trylock(&req->lock) != 1) {
ret = ASYMCUTE_BUSY;
goto end;
}
/* get message id */
req->msg_id = _msg_id_next(con);
/* assemble message */
size_t pos = _len_set(req->data, data_len + 6);
req->data[pos] = MQTTSN_PUBLISH;
req->data[pos + 1] = (flags | topic->flags);
byteorder_htobebufs(&req->data[pos + 2], topic->id);
byteorder_htobebufs(&req->data[pos + 4], req->msg_id);
memcpy(&req->data[pos + 6], data, data_len);
req->data_len = (pos + 6 + data_len);
/* publish selected data */
if (flags & MQTTSN_QOS_1) {
_req_send(req, con, NULL);
}
else {
_req_send_once(req, con);
}
end:
mutex_unlock(&con->lock);
return ret;
}
int asymcute_subscribe(asymcute_con_t *con, asymcute_req_t *req,
asymcute_sub_t *sub, asymcute_topic_t *topic,
asymcute_sub_cb_t callback, void *arg, uint8_t flags)
{
assert(con);
assert(req);
assert(sub);
assert(topic);
assert(callback);
int ret = ASYMCUTE_OK;
/* check flags for validity */
if ((flags & VALID_SUBSCRIBE_FLAGS) != flags) {
return ASYMCUTE_NOTSUP;
}
/* is topic initialized? (though it does not need to be registered) */
if (!asymcute_topic_is_init(topic)) {
return ASYMCUTE_REGERR;
}
/* check if we are connected to a gateway */
mutex_lock(&con->lock);
if (!asymcute_is_connected(con)) {
ret = ASYMCUTE_GWERR;
goto end;
}
/* check if we are already subscribed to the given topic */
for (asymcute_sub_t *sub = con->subscriptions; sub; sub = sub->next) {
if (asymcute_topic_equal(topic, sub->topic)) {
ret = ASYMCUTE_SUBERR;
goto end;
}
}
/* make sure request context is clear to be used */
if (mutex_trylock(&req->lock) != 1) {
ret = ASYMCUTE_BUSY;
goto end;
}
/* prepare subscription context */
sub->cb = callback;
sub->arg = arg;
sub->topic = topic;
/* send SUBSCRIBE message */
_compile_sub_unsub(req, con, sub, MQTTSN_SUBSCRIBE);
_req_send(req, con, _on_suback_timeout);
end:
mutex_unlock(&con->lock);
return ret;
}
int asymcute_unsubscribe(asymcute_con_t *con, asymcute_req_t *req,
asymcute_sub_t *sub)
{
assert(con);
assert(req);
assert(sub);
int ret = ASYMCUTE_OK;
/* make sure the subscription is actually active */
if (!asymcute_sub_active(sub)) {
return ASYMCUTE_SUBERR;
}
/* check if we are connected to a gateway */
mutex_lock(&con->lock);
if (!asymcute_is_connected(con)) {
ret = ASYMCUTE_GWERR;
goto end;
}
/* make sure request context is clear to be used */
if (mutex_trylock(&req->lock) != 1) {
ret = ASYMCUTE_BUSY;
goto end;
}
/* prepare and send UNSUBSCRIBE message */
_compile_sub_unsub(req, con, sub, MQTTSN_UNSUBSCRIBE);
_req_send(req, con, NULL);
end:
mutex_unlock(&con->lock);
return ret;
}

View File

@ -1,5 +1,6 @@
/*
* Copyright (C) 2017 Kaspar Schleiser <kaspar@schleiser.de>
* 2018 Freie Universität Berlin
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
@ -7,11 +8,14 @@
*/
/**
* @ingroup net_sock_util
* @ingroup net_sock_util
* @{
*
* @file
* @brief sock utility functions implementation
* @author Kaspar Schleiser <kaspar@schleiser.de>
* @brief sock utility functions implementation
*
* @author Kaspar Schleiser <kaspar@schleiser.de>
* @author Hauke Petersen <hauke.petersen@fu-berlin.de>
* @}
*/
@ -181,3 +185,26 @@ int sock_udp_str2ep(sock_udp_ep_t *ep_out, const char *str)
#endif
return -EINVAL;
}
bool sock_udp_ep_equal(const sock_udp_ep_t *a, const sock_udp_ep_t *b)
{
assert(a && b);
/* compare family and port */
if ((a->family != b->family) || (a->port != b->port)) {
return false;
}
/* compare addresses */
switch (a->family) {
#ifdef SOCK_HAS_IPV6
case AF_INET6:
return (memcmp(a->addr.ipv6, b->addr.ipv6, 16) == 0);
#endif
case AF_INET:
return (memcmp(a->addr.ipv4, b->addr.ipv4, 4) == 0);
default:
return false;
}
}