mirror of
https://github.com/RIOT-OS/RIOT.git
synced 2025-12-24 22:13:52 +01:00
Merge pull request #12647 from javierfileiv/mqtt-paho
paho-mqtt: add package and example
This commit is contained in:
commit
d918c8e06a
59
examples/paho-mqtt/Makefile
Normal file
59
examples/paho-mqtt/Makefile
Normal file
@ -0,0 +1,59 @@
|
||||
APPLICATION = paho-mqtt-example
|
||||
|
||||
# If no BOARD is found in the environment, use this default:
|
||||
BOARD ?= native
|
||||
|
||||
# This has to be the absolute path to the RIOT base directory:
|
||||
RIOTBASE ?= $(CURDIR)/../..
|
||||
|
||||
# Comment this out to disable code in RIOT that does safety checking
|
||||
# which is not needed in a production environment but helps in the
|
||||
# development process:
|
||||
DEVELHELP ?= 1
|
||||
|
||||
# Change this to 0 show compiler invocation lines by default:
|
||||
QUIET ?= 1
|
||||
|
||||
WIFI_SSID ?= "Your_WiFi_name"
|
||||
WIFI_PASS ?= "Your_secure_password"
|
||||
|
||||
USEMODULE += shell
|
||||
USEMODULE += shell_commands
|
||||
USEMODULE += ps
|
||||
USEMODULE += netdev_default
|
||||
USEPKG += paho-mqtt
|
||||
|
||||
# paho-mqtt depends on TCP support, choose the stack you want
|
||||
LWIP_IPV4 ?= 0
|
||||
|
||||
ifneq (0, $(LWIP_IPV4))
|
||||
USEMODULE += ipv4_addr
|
||||
USEMODULE += lwip_arp
|
||||
USEMODULE += lwip_ipv4
|
||||
USEMODULE += lwip_dhcp_auto
|
||||
USEMODULE += lwip_sock_udp
|
||||
CFLAGS += -DETHARP_SUPPORT_STATIC_ENTRIES=1
|
||||
LWIP_IPV6 ?= 0
|
||||
else
|
||||
LWIP_IPV6 ?= 1
|
||||
endif
|
||||
|
||||
ifneq (0, $(LWIP_IPV6))
|
||||
USEMODULE += ipv6_addr
|
||||
USEMODULE += lwip_ipv6_autoconfig
|
||||
endif
|
||||
|
||||
USEMODULE += lwip_netdev
|
||||
USEMODULE += lwip lwip_sock_ip
|
||||
USEMODULE += lwip_tcp lwip_sock_tcp
|
||||
USEMODULE += lwip_sock_async
|
||||
|
||||
USEMODULE += sock_async_event
|
||||
####
|
||||
|
||||
include $(RIOTBASE)/Makefile.include
|
||||
|
||||
ifneq (,$(filter arch_esp,$(FEATURES_USED)))
|
||||
CFLAGS += -DESP_WIFI_SSID=\"$(WIFI_SSID)\"
|
||||
CFLAGS += -DESP_WIFI_PASS=\"$(WIFI_PASS)\"
|
||||
endif
|
||||
9
examples/paho-mqtt/Makefile.board.dep
Normal file
9
examples/paho-mqtt/Makefile.board.dep
Normal file
@ -0,0 +1,9 @@
|
||||
# Put board specific dependencies here
|
||||
|
||||
ifneq (,$(filter arch_esp,$(FEATURES_USED)))
|
||||
USEMODULE += esp_wifi
|
||||
endif
|
||||
|
||||
ifeq ($(BOARD),native)
|
||||
USEMODULE += netdev_default
|
||||
endif
|
||||
29
examples/paho-mqtt/Makefile.ci
Normal file
29
examples/paho-mqtt/Makefile.ci
Normal file
@ -0,0 +1,29 @@
|
||||
BOARD_INSUFFICIENT_MEMORY := \
|
||||
airfy-beacon \
|
||||
calliope-mini \
|
||||
i-nucleo-lrwan1 \
|
||||
im880b \
|
||||
microbit \
|
||||
nrf6310 \
|
||||
nucleo-f030r8 \
|
||||
nucleo-f031k6 \
|
||||
nucleo-f042k6 \
|
||||
nucleo-f070rb \
|
||||
nucleo-f072rb \
|
||||
nucleo-f303k8 \
|
||||
nucleo-f334r8 \
|
||||
nucleo-l031k6 \
|
||||
nucleo-l053r8 \
|
||||
nucleo-f302r8 \
|
||||
nrf51dongle \
|
||||
stm32f030f4-demo \
|
||||
stm32f0discovery \
|
||||
stm32l0538-disco \
|
||||
bluepill \
|
||||
blackpill \
|
||||
hifive1 \
|
||||
hifive1b \
|
||||
saml11-xpro \
|
||||
saml10-xpro \
|
||||
yunjia-nrf51822 \
|
||||
#
|
||||
50
examples/paho-mqtt/README.md
Normal file
50
examples/paho-mqtt/README.md
Normal file
@ -0,0 +1,50 @@
|
||||
### About
|
||||
This application demonstrates the usage of the Eclipse paho MQTT package in RIOT.
|
||||
|
||||
### Setup
|
||||
For using this example, two prerequisites have to be fulfilled:
|
||||
|
||||
1. You need a running MQTT broker like Mosquitto broker for example. Take a look at
|
||||
[Mosquitto Broker](https://mosquitto.org/). Check online any guide that will
|
||||
help you setting up the broker into some port (a).
|
||||
For example this one for debian base linux users
|
||||
[How to setup a Mosquitto MQTT Server and receive data](https://www.digitalocean.com/community/questions/how-to-setup-a-mosquitto-mqtt-server-and-receive-data-from-owntracks).
|
||||
|
||||
2. Your RIOT node needs to be able to speak to that broker at the same port you set in 1.
|
||||
|
||||
### Setting up RIOT `native` on Linux
|
||||
- Run `sudo ./dist/tools/tapsetup/tapsetup -c 1`
|
||||
|
||||
## Running the example
|
||||
- Run on RIOT's root directory:
|
||||
|
||||
make -C examples/paho-mqtt all term
|
||||
|
||||
- To connect to a broker, use the `con` command:
|
||||
```
|
||||
con <broker ip addr> [port] [clientID] [user] [password] [keepalivetime]
|
||||
```
|
||||
* *broker ip addr*: IPv6 or IPv4 broker address.
|
||||
* *port*: broker port. Default 1883
|
||||
* *client ID*: is the client id you set up on the broker. Default can be set
|
||||
through DEFAULT_MQTT_CLIENT_ID in your makefile. Otherwise is an empty string.
|
||||
* *user*: the one set in the broker, check online tutorial to do it regarding chosen broker.
|
||||
Default user can be set through DEFAULT_MQTT_USER in your makefile. Otherwise is an empty string.
|
||||
* *password*: the one set in the broker, check online tutorial to do it regarding chosen broker.
|
||||
Default user can be set through DEFAULT_MQTT_PWD in your makefile. Otherwise is an empty string.
|
||||
* *keepalivetime*: keep alive in seconds for your client. Default 10 secs.
|
||||
|
||||
- To subscribe to a topic, run `sub` with the topic name as parameter and a QoS
|
||||
level between 1 to 3, e.g.
|
||||
```
|
||||
sub hello/world 2
|
||||
```
|
||||
- To unsubscribe to a topic, run `unsub` with the topic name e.g.
|
||||
```
|
||||
unsub hello/world
|
||||
```
|
||||
|
||||
- For publishing, use the `pub` command with a QoS level between 1 to 3:
|
||||
```
|
||||
pub hello/world "One more beer, please." 2
|
||||
```
|
||||
312
examples/paho-mqtt/main.c
Normal file
312
examples/paho-mqtt/main.c
Normal file
@ -0,0 +1,312 @@
|
||||
/*
|
||||
* Copyright (C) 2019 Javier FILEIV <javier.fileiv@gmail.com>
|
||||
*
|
||||
* This file is subject to the terms and conditions of the GNU Lesser
|
||||
* General Public License v2.1. See the file LICENSE in the top level
|
||||
* directory for more details.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @ingroup examples
|
||||
* @{
|
||||
*
|
||||
* @file main.c
|
||||
* @brief Example using MQTT Paho package from RIOT
|
||||
*
|
||||
* @author Javier FILEIV <javier.fileiv@gmail.com>
|
||||
*
|
||||
* @}
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <stdbool.h>
|
||||
#include "xtimer.h"
|
||||
#include "shell.h"
|
||||
#include "thread.h"
|
||||
#include "mutex.h"
|
||||
#include "paho_mqtt.h"
|
||||
#include "MQTTClient.h"
|
||||
|
||||
#define BUF_SIZE 1024
|
||||
#define MQTT_VERSION_v311 4 /* MQTT v3.1.1 version is 4 */
|
||||
#define COMMAND_TIMEOUT_MS 4000
|
||||
|
||||
#ifndef DEFAULT_MQTT_CLIENT_ID
|
||||
#define DEFAULT_MQTT_CLIENT_ID ""
|
||||
#endif
|
||||
|
||||
#ifndef DEFAULT_MQTT_USER
|
||||
#define DEFAULT_MQTT_USER ""
|
||||
#endif
|
||||
|
||||
#ifndef DEFAULT_MQTT_PWD
|
||||
#define DEFAULT_MQTT_PWD ""
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief Default MQTT port
|
||||
*/
|
||||
#define DEFAULT_MQTT_PORT 1883
|
||||
|
||||
/**
|
||||
* @brief Keepalive timeout in seconds
|
||||
*/
|
||||
#define DEFAULT_KEEPALIVE_SEC 10
|
||||
|
||||
#ifndef MAX_LEN_TOPIC
|
||||
#define MAX_LEN_TOPIC 100
|
||||
#endif
|
||||
|
||||
#ifndef MAX_TOPICS
|
||||
#define MAX_TOPICS 4
|
||||
#endif
|
||||
|
||||
#define IS_CLEAN_SESSION 1
|
||||
#define IS_RETAINED_MSG 0
|
||||
|
||||
static MQTTClient client;
|
||||
static Network network;
|
||||
static int topic_cnt = 0;
|
||||
static char _topic_to_subscribe[MAX_TOPICS][MAX_LEN_TOPIC];
|
||||
|
||||
static unsigned get_qos(const char *str)
|
||||
{
|
||||
int qos = atoi(str);
|
||||
|
||||
switch (qos) {
|
||||
case 1: return QOS1;
|
||||
case 2: return QOS2;
|
||||
default: return QOS0;
|
||||
}
|
||||
}
|
||||
|
||||
static void _on_msg_received(MessageData *data)
|
||||
{
|
||||
printf("paho_mqtt_example: message received on topic"
|
||||
" %.*s: %.*s\n",
|
||||
(int)data->topicName->lenstring.len,
|
||||
data->topicName->lenstring.data, (int)data->message->payloadlen,
|
||||
(char *)data->message->payload);
|
||||
}
|
||||
|
||||
static int _cmd_discon(int argc, char **argv)
|
||||
{
|
||||
(void)argc;
|
||||
(void)argv;
|
||||
|
||||
topic_cnt = 0;
|
||||
int res = MQTTDisconnect(&client);
|
||||
if (res < 0) {
|
||||
printf("mqtt_example: Unable to disconnect\n");
|
||||
}
|
||||
else {
|
||||
printf("mqtt_example: Disconnect successful\n");
|
||||
}
|
||||
|
||||
NetworkDisconnect(&network);
|
||||
return res;
|
||||
}
|
||||
|
||||
static int _cmd_con(int argc, char **argv)
|
||||
{
|
||||
if (argc < 2) {
|
||||
printf(
|
||||
"usage: %s <brokerip addr> [port] [clientID] [user] [password] "
|
||||
"[keepalivetime]\n",
|
||||
argv[0]);
|
||||
return 1;
|
||||
}
|
||||
|
||||
char *remote_ip = argv[1];
|
||||
|
||||
int ret = -1;
|
||||
|
||||
/* ensure client isn't connected in case of a new connection */
|
||||
if (client.isconnected) {
|
||||
printf("mqtt_example: client already connected, disconnecting it\n");
|
||||
MQTTDisconnect(&client);
|
||||
NetworkDisconnect(&network);
|
||||
}
|
||||
|
||||
int port = DEFAULT_MQTT_PORT;
|
||||
if (argc > 2) {
|
||||
port = atoi(argv[2]);
|
||||
}
|
||||
|
||||
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
|
||||
data.MQTTVersion = MQTT_VERSION_v311;
|
||||
|
||||
data.clientID.cstring = DEFAULT_MQTT_CLIENT_ID;
|
||||
if (argc > 3) {
|
||||
data.username.cstring = argv[3];
|
||||
}
|
||||
|
||||
data.username.cstring = DEFAULT_MQTT_USER;
|
||||
if (argc > 4) {
|
||||
data.username.cstring = argv[4];
|
||||
}
|
||||
|
||||
data.password.cstring = DEFAULT_MQTT_PWD;
|
||||
if (argc > 5) {
|
||||
data.password.cstring = argv[5];
|
||||
}
|
||||
|
||||
data.keepAliveInterval = DEFAULT_KEEPALIVE_SEC;
|
||||
if (argc > 6) {
|
||||
data.keepAliveInterval = atoi(argv[6]);
|
||||
}
|
||||
|
||||
data.cleansession = IS_CLEAN_SESSION;
|
||||
data.willFlag = 0;
|
||||
|
||||
printf("mqtt_example: Connecting to MQTT Broker from %s %d\n",
|
||||
remote_ip, port);
|
||||
printf("mqtt_example: Trying to connect to %s , port: %d\n",
|
||||
remote_ip, port);
|
||||
ret = NetworkConnect(&network, remote_ip, port);
|
||||
if (ret < 0) {
|
||||
printf("mqtt_example: Unable to connect\n");
|
||||
return ret;
|
||||
}
|
||||
|
||||
printf("user:%s clientId:%s password:%s\n", data.username.cstring,
|
||||
data.clientID.cstring, data.password.cstring);
|
||||
ret = MQTTConnect(&client, &data);
|
||||
if (ret < 0) {
|
||||
printf("mqtt_example: Unable to connect client %d\n", ret);
|
||||
_cmd_discon(0, NULL);
|
||||
return ret;
|
||||
}
|
||||
else {
|
||||
printf("mqtt_example: Connection successfully\n");
|
||||
}
|
||||
|
||||
return (ret > 0) ? 0 : 1;
|
||||
}
|
||||
|
||||
static int _cmd_pub(int argc, char **argv)
|
||||
{
|
||||
enum QoS qos = QOS0;
|
||||
|
||||
if (argc < 3) {
|
||||
printf("usage: %s <topic name> <string msg> [QoS level]\n",
|
||||
argv[0]);
|
||||
return 1;
|
||||
}
|
||||
if (argc == 4) {
|
||||
qos = get_qos(argv[3]);
|
||||
}
|
||||
MQTTMessage message;
|
||||
message.qos = qos;
|
||||
message.retained = IS_RETAINED_MSG;
|
||||
message.payload = argv[2];
|
||||
message.payloadlen = strlen(message.payload);
|
||||
|
||||
int rc;
|
||||
if ((rc = MQTTPublish(&client, argv[1], &message)) < 0) {
|
||||
printf("mqtt_example: Unable to publish (%d)\n", rc);
|
||||
}
|
||||
else {
|
||||
printf("mqtt_example: Message (%s) has been published to topic %s"
|
||||
"with QOS %d\n",
|
||||
(char *)message.payload, argv[1], (int)message.qos);
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int _cmd_sub(int argc, char **argv)
|
||||
{
|
||||
enum QoS qos = QOS0;
|
||||
|
||||
if (argc < 2) {
|
||||
printf("usage: %s <topic name> [QoS level]\n", argv[0]);
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (argc >= 3) {
|
||||
qos = get_qos(argv[2]);
|
||||
}
|
||||
|
||||
if (topic_cnt > MAX_TOPICS) {
|
||||
printf("mqtt_example: Already subscribed to max %d topics,"
|
||||
"call 'unsub' command\n", topic_cnt);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (strlen(argv[1]) > MAX_LEN_TOPIC) {
|
||||
printf("mqtt_example: Not subscribing, topic too long %s\n", argv[1]);
|
||||
return -1;
|
||||
}
|
||||
strncpy(_topic_to_subscribe[topic_cnt], argv[1], strlen(argv[1]));
|
||||
|
||||
printf("mqtt_example: Subscribing to %s\n", _topic_to_subscribe[topic_cnt]);
|
||||
int ret = MQTTSubscribe(&client,
|
||||
_topic_to_subscribe[topic_cnt], qos, _on_msg_received);
|
||||
if (ret < 0) {
|
||||
printf("mqtt_example: Unable to subscribe to %s (%d)\n",
|
||||
_topic_to_subscribe[topic_cnt], ret);
|
||||
_cmd_discon(0, NULL);
|
||||
}
|
||||
else {
|
||||
printf("mqtt_example: Now subscribed to %s, QOS %d\n",
|
||||
argv[1], (int) qos);
|
||||
topic_cnt++;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int _cmd_unsub(int argc, char **argv)
|
||||
{
|
||||
if (argc < 2) {
|
||||
printf("usage %s <topic name>\n", argv[0]);
|
||||
return 1;
|
||||
}
|
||||
|
||||
int ret = MQTTUnsubscribe(&client, argv[1]);
|
||||
|
||||
if (ret < 0) {
|
||||
printf("mqtt_example: Unable to unsubscribe from topic: %s\n", argv[1]);
|
||||
_cmd_discon(0, NULL);
|
||||
}
|
||||
else {
|
||||
printf("mqtt_example: Unsubscribed from topic:%s\n", argv[1]);
|
||||
topic_cnt--;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
static const shell_command_t shell_commands[] =
|
||||
{
|
||||
{ "con", "connect to MQTT broker", _cmd_con },
|
||||
{ "discon", "disconnect from the current broker", _cmd_discon },
|
||||
{ "pub", "publish something", _cmd_pub },
|
||||
{ "sub", "subscribe topic", _cmd_sub },
|
||||
{ "unsub", "unsubscribe from topic", _cmd_unsub },
|
||||
{ NULL, NULL, NULL }
|
||||
};
|
||||
|
||||
static unsigned char buf[BUF_SIZE];
|
||||
static unsigned char readbuf[BUF_SIZE];
|
||||
|
||||
int main(void)
|
||||
{
|
||||
#ifdef MODULE_LWIP
|
||||
/* let LWIP initialize */
|
||||
xtimer_sleep(1);
|
||||
#endif
|
||||
|
||||
NetworkInit(&network);
|
||||
|
||||
MQTTClientInit(&client, &network, COMMAND_TIMEOUT_MS, buf, BUF_SIZE,
|
||||
readbuf,
|
||||
BUF_SIZE);
|
||||
printf("Running mqtt paho example. Type help for commands info\n");
|
||||
|
||||
MQTTStartTask(&client);
|
||||
|
||||
char line_buf[SHELL_DEFAULT_BUFSIZE];
|
||||
shell_run(shell_commands, line_buf, SHELL_DEFAULT_BUFSIZE);
|
||||
return 0;
|
||||
}
|
||||
10
pkg/paho-mqtt/Makefile
Normal file
10
pkg/paho-mqtt/Makefile
Normal file
@ -0,0 +1,10 @@
|
||||
PKG_NAME = paho-mqtt
|
||||
PKG_URL = https://github.com/eclipse/paho.mqtt.embedded-c.git
|
||||
PKG_VERSION = 29ab2aa29c5e47794284376d7f8386cfd54c3eed
|
||||
PKG_LICENSE = EDL
|
||||
|
||||
include $(RIOTBASE)/pkg/pkg.mk
|
||||
|
||||
all:
|
||||
"$(MAKE)" -C $(PKG_SOURCE_DIR)/MQTTPacket/src/ -f $(CURDIR)/Makefile.$(PKG_NAME)-packet
|
||||
"$(MAKE)" -C $(PKG_SOURCE_DIR)/MQTTClient-C/src/ -f $(CURDIR)/Makefile.$(PKG_NAME)
|
||||
4
pkg/paho-mqtt/Makefile.dep
Normal file
4
pkg/paho-mqtt/Makefile.dep
Normal file
@ -0,0 +1,4 @@
|
||||
USEMODULE += xtimer
|
||||
USEMODULE += paho-mqtt-contrib
|
||||
USEMODULE += paho-mqtt-packet
|
||||
USEMODULE += tsrb
|
||||
8
pkg/paho-mqtt/Makefile.include
Normal file
8
pkg/paho-mqtt/Makefile.include
Normal file
@ -0,0 +1,8 @@
|
||||
INCLUDES += -I$(PKGDIRBASE)/paho-mqtt/MQTTClient-C/src/
|
||||
INCLUDES += -I$(PKGDIRBASE)/paho-mqtt/MQTTPacket/src
|
||||
INCLUDES += -I$(RIOTBASE)/pkg/paho-mqtt/include
|
||||
|
||||
DIRS += $(RIOTBASE)/pkg/paho-mqtt/contrib
|
||||
|
||||
#define to use MQTT Paho as a task using MQTTStartTask()
|
||||
CFLAGS += -DMQTT_TASK=1
|
||||
10
pkg/paho-mqtt/Makefile.paho-mqtt
Normal file
10
pkg/paho-mqtt/Makefile.paho-mqtt
Normal file
@ -0,0 +1,10 @@
|
||||
MODULE = paho-mqtt
|
||||
|
||||
RIOT_MQTT_IFACE_H = paho_mqtt.h
|
||||
|
||||
CFLAGS += -DMQTTCLIENT_PLATFORM_HEADER="$(RIOT_MQTT_IFACE_H)"
|
||||
CFLAGS += -Wno-sign-compare
|
||||
CFLAGS += -Wno-unused-parameter
|
||||
CFLAGS += -Wno-address-of-packed-member
|
||||
|
||||
include $(RIOTBASE)/Makefile.base
|
||||
5
pkg/paho-mqtt/Makefile.paho-mqtt-packet
Normal file
5
pkg/paho-mqtt/Makefile.paho-mqtt-packet
Normal file
@ -0,0 +1,5 @@
|
||||
MODULE = paho-mqtt-packet
|
||||
|
||||
CFLAGS += -Wno-unused-parameter
|
||||
|
||||
include $(RIOTBASE)/Makefile.base
|
||||
3
pkg/paho-mqtt/contrib/Makefile
Normal file
3
pkg/paho-mqtt/contrib/Makefile
Normal file
@ -0,0 +1,3 @@
|
||||
MODULE = paho-mqtt-contrib
|
||||
|
||||
include $(RIOTBASE)/Makefile.base
|
||||
231
pkg/paho-mqtt/contrib/riot_iface.c
Normal file
231
pkg/paho-mqtt/contrib/riot_iface.c
Normal file
@ -0,0 +1,231 @@
|
||||
/*
|
||||
* Copyright (C) 2019 Javier FILEIV <javier.fileiv@gmail.com>
|
||||
*
|
||||
* This file is subject to the terms and conditions of the GNU Lesser
|
||||
* General Public License v2.1. See the file LICENSE in the top level
|
||||
* directory for more details.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @brief MQTT common RIOT interface functions
|
||||
*
|
||||
* @author Javier FILEIV <javier.fileiv@gmail.com>
|
||||
*/
|
||||
#include <string.h>
|
||||
#include <errno.h>
|
||||
|
||||
#ifdef MODULE_IPV6_ADDR
|
||||
#include "net/ipv6/addr.h"
|
||||
#endif
|
||||
#ifdef MODULE_IPV4_ADDR
|
||||
#include "net/ipv4/addr.h"
|
||||
#endif
|
||||
#include "net/sock/tcp.h"
|
||||
#include "paho_mqtt.h"
|
||||
#include "MQTTClient.h"
|
||||
#include "xtimer.h"
|
||||
#include "tsrb.h"
|
||||
#include "log.h"
|
||||
|
||||
#define ENABLE_DEBUG (0)
|
||||
#include "debug.h"
|
||||
|
||||
#define IP_MAX_LEN_ADDRESS (39) /*IPv6 max length */
|
||||
|
||||
#ifndef TSRB_MAX_SIZE
|
||||
#define TSRB_MAX_SIZE (1024)
|
||||
#endif
|
||||
|
||||
#ifdef MODULE_LWIP
|
||||
static uint8_t buffer[TSRB_MAX_SIZE];
|
||||
static uint8_t _temp_buf[TSRB_MAX_SIZE];
|
||||
static tsrb_t tsrb_lwip_tcp;
|
||||
#endif
|
||||
|
||||
#ifndef PAHO_MQTT_YIELD_MS
|
||||
#define PAHO_MQTT_YIELD_MS (10)
|
||||
#endif
|
||||
|
||||
static int mqtt_read(struct Network *n, unsigned char *buf, int len,
|
||||
int timeout_ms)
|
||||
{
|
||||
int _timeout;
|
||||
int _len;
|
||||
void *_buf;
|
||||
int rc = -1;
|
||||
|
||||
if (IS_USED(MODULE_LWIP)) {
|
||||
/* As LWIP doesn't support packet reading byte per byte and
|
||||
* PAHO MQTT reads like that to decode it on the fly,
|
||||
* we read TSRB_MAX_SIZE at once and keep them in a ring buffer.
|
||||
*/
|
||||
_buf = _temp_buf;
|
||||
_len = TSRB_MAX_SIZE;
|
||||
_timeout = 0;
|
||||
}
|
||||
else {
|
||||
_buf = buf;
|
||||
_len = len;
|
||||
_timeout = timeout_ms;
|
||||
}
|
||||
|
||||
uint64_t send_tick = xtimer_now64().ticks64 +
|
||||
xtimer_ticks_from_usec64(timeout_ms * US_PER_MS).ticks64;
|
||||
do {
|
||||
rc = sock_tcp_read(&n->sock, _buf, _len, _timeout);
|
||||
if (rc == -EAGAIN) {
|
||||
rc = 0;
|
||||
}
|
||||
|
||||
if (IS_USED(MODULE_LWIP)) {
|
||||
if (rc > 0) {
|
||||
tsrb_add(&tsrb_lwip_tcp, _temp_buf, rc);
|
||||
}
|
||||
|
||||
rc = tsrb_get(&tsrb_lwip_tcp, buf, len);
|
||||
}
|
||||
} while (rc < len && xtimer_now64().ticks64 < send_tick && rc >= 0);
|
||||
|
||||
#ifdef ENABLE_DEBUG
|
||||
if (IS_USED(MODULE_LWIP) && rc > 0) {
|
||||
DEBUG("MQTT buf asked for %d, available to read %d\n",
|
||||
rc, tsrb_avail(&tsrb_lwip_tcp));
|
||||
for (int i = 0; i < rc; i++) {
|
||||
DEBUG("0x%02X ", buf[i]);
|
||||
}
|
||||
DEBUG("\n");
|
||||
}
|
||||
#endif
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int mqtt_write(struct Network *n, unsigned char *buf, int len,
|
||||
int timeout_ms)
|
||||
{
|
||||
/* timeout is controlled by upper layer in PAHO */
|
||||
(void) timeout_ms;
|
||||
|
||||
return sock_tcp_write(&n->sock, buf, len);
|
||||
}
|
||||
|
||||
void NetworkInit(Network *n)
|
||||
{
|
||||
if (IS_USED(MODULE_LWIP)) {
|
||||
tsrb_init(&tsrb_lwip_tcp, buffer, TSRB_MAX_SIZE);
|
||||
}
|
||||
n->mqttread = mqtt_read;
|
||||
n->mqttwrite = mqtt_write;
|
||||
}
|
||||
|
||||
int NetworkConnect(Network *n, char *addr_ip, int port)
|
||||
{
|
||||
int ret =-1;
|
||||
sock_tcp_ep_t remote = SOCK_IPV4_EP_ANY;
|
||||
char _local_ip[IP_MAX_LEN_ADDRESS];
|
||||
|
||||
strncpy(_local_ip, addr_ip, sizeof(_local_ip));
|
||||
if (IS_USED(MODULE_IPV4_ADDR) &&
|
||||
ipv4_addr_from_str((ipv4_addr_t *)&remote.addr, _local_ip)) {
|
||||
remote.port = port;
|
||||
}
|
||||
|
||||
/* ipvN_addr_from_str modifies input buffer */
|
||||
strncpy(_local_ip, addr_ip, sizeof(_local_ip));
|
||||
if (IS_USED(MODULE_IPV6_ADDR) && (remote.port == 0) &&
|
||||
ipv6_addr_from_str((ipv6_addr_t *)&remote.addr, _local_ip)) {
|
||||
remote.port = port;
|
||||
remote.family = AF_INET6;
|
||||
}
|
||||
|
||||
if (remote.port == 0) {
|
||||
LOG_ERROR("Error: unable to parse destination address\n");
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = sock_tcp_connect(&n->sock, &remote, 0, 0);
|
||||
return ret;
|
||||
}
|
||||
|
||||
void NetworkDisconnect(Network *n)
|
||||
{
|
||||
sock_tcp_disconnect(&n->sock);
|
||||
}
|
||||
|
||||
void TimerInit(Timer *timer)
|
||||
{
|
||||
timer->set_ticks.ticks64 = 0;
|
||||
timer->ticks_timeout.ticks64 = 0;
|
||||
}
|
||||
|
||||
char TimerIsExpired(Timer *timer)
|
||||
{
|
||||
return (TimerLeftMS(timer) == 0);
|
||||
}
|
||||
|
||||
void TimerCountdownMS(Timer *timer, unsigned int timeout_ms)
|
||||
{
|
||||
timer->set_ticks = xtimer_now64();
|
||||
timer->ticks_timeout = xtimer_ticks_from_usec64(timeout_ms * US_PER_MS);
|
||||
}
|
||||
|
||||
void TimerCountdown(Timer *timer, unsigned int timeout_s)
|
||||
{
|
||||
TimerCountdownMS(timer, timeout_s * MS_PER_SEC);
|
||||
}
|
||||
|
||||
int TimerLeftMS(Timer *timer)
|
||||
{
|
||||
xtimer_ticks64_t diff_ticks = xtimer_diff64(xtimer_now64(),
|
||||
timer->set_ticks); /* should be always greater than 0 */
|
||||
if (xtimer_less64(diff_ticks, timer->ticks_timeout)) {
|
||||
diff_ticks = xtimer_diff64(timer->ticks_timeout, diff_ticks);
|
||||
return (xtimer_usec_from_ticks64(diff_ticks) / US_PER_MS);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void MutexInit(Mutex *mutex)
|
||||
{
|
||||
mutex_init(&mutex->lock);
|
||||
}
|
||||
|
||||
int MutexLock(Mutex *mutex)
|
||||
{
|
||||
mutex_lock(&mutex->lock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int MutexUnlock(Mutex *mutex)
|
||||
{
|
||||
mutex_unlock(&mutex->lock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void *mqtt_riot_run(void *arg)
|
||||
{
|
||||
MQTTClient *client = (MQTTClient *)arg;
|
||||
assert(client);
|
||||
|
||||
while (1) {
|
||||
int rc;
|
||||
MutexLock(&client->mutex);
|
||||
if ((rc = MQTTYield(client, PAHO_MQTT_YIELD_MS)) != 0) {
|
||||
LOG_DEBUG("riot_iface: error while MQTTYield()(%d)\n", rc);
|
||||
}
|
||||
MutexUnlock(&client->mutex);
|
||||
/* let other threads do their work */
|
||||
xtimer_usleep(MQTT_YIELD_POLLING_MS * US_PER_MS);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int ThreadStart(Thread *thread, void (*fn)(void *), void *arg)
|
||||
{
|
||||
(void) fn;
|
||||
thread->pid = thread_create(thread->stack, sizeof(thread->stack),
|
||||
MQTT_THREAD_PRIORITY,
|
||||
THREAD_CREATE_STACKTEST, mqtt_riot_run, arg,
|
||||
"paho_mqtt_riot");
|
||||
return thread->pid;
|
||||
}
|
||||
9
pkg/paho-mqtt/doc.txt
Normal file
9
pkg/paho-mqtt/doc.txt
Normal file
@ -0,0 +1,9 @@
|
||||
/**
|
||||
* @defgroup pkg_paho_mqtt PAHO MQTT framework
|
||||
* @ingroup pkg
|
||||
* @brief The Eclipse Paho project provides open-source client implementations of MQTT for embedded systems
|
||||
* @see https://github.com/eclipse/paho.mqtt.embedded-c
|
||||
*
|
||||
* The Eclipse Paho project provides open-source client
|
||||
* implementations of MQTT.
|
||||
*/
|
||||
198
pkg/paho-mqtt/include/paho_mqtt.h
Normal file
198
pkg/paho-mqtt/include/paho_mqtt.h
Normal file
@ -0,0 +1,198 @@
|
||||
/*
|
||||
* Copyright (C) 2019 Javier FILEIV <javier.fileiv@gmail.com>
|
||||
*
|
||||
* This file is subject to the terms and conditions of the GNU Lesser
|
||||
* General Public License v2.1. See the file LICENSE in the top level
|
||||
* directory for more details.
|
||||
*
|
||||
*/
|
||||
|
||||
/**
|
||||
* @addtogroup pkg_paho_mqtt
|
||||
* @{
|
||||
*
|
||||
* @file
|
||||
* @brief Network MQTT interface definitions
|
||||
*
|
||||
* @author Javier FILEIV <javier.fileiv@gmail.com>
|
||||
*/
|
||||
#ifndef PAHO_MQTT_H
|
||||
#define PAHO_MQTT_H
|
||||
|
||||
#include "mutex.h"
|
||||
#include "xtimer.h"
|
||||
#include "thread.h"
|
||||
#include "net/sock/tcp.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief RIOT's mqtt paho thread priority
|
||||
*/
|
||||
#ifndef MQTT_THREAD_PRIORITY
|
||||
#define MQTT_THREAD_PRIORITY (THREAD_PRIORITY_MAIN - 1)
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief RIOT's mqtt paho thread stack size
|
||||
*/
|
||||
|
||||
#ifndef MQTT_THREAD_STACKSIZE
|
||||
#define MQTT_THREAD_STACKSIZE (THREAD_STACKSIZE_LARGE)
|
||||
#endif
|
||||
/**
|
||||
* @brief MQTT thread YIELD polling time in msecs
|
||||
*/
|
||||
#ifndef MQTT_YIELD_POLLING_MS
|
||||
#define MQTT_YIELD_POLLING_MS (30)
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief struct to get time references within mqtt paho
|
||||
*
|
||||
*/
|
||||
typedef struct {
|
||||
xtimer_ticks64_t set_ticks; /**< timeout ticks */
|
||||
xtimer_ticks64_t ticks_timeout; /**< timeout in ticks */
|
||||
} Timer;
|
||||
|
||||
/**
|
||||
* @brief Initialize timer struct
|
||||
*
|
||||
* @param timer timer to init
|
||||
*/
|
||||
void TimerInit(Timer *timer);
|
||||
|
||||
/**
|
||||
* @brief is timer expired?
|
||||
*
|
||||
* @param timer timer to check
|
||||
*
|
||||
* @return 1 if timer expired, 0 otherwise
|
||||
*/
|
||||
char TimerIsExpired(Timer *timer);
|
||||
|
||||
/**
|
||||
* @brief start timer set to milli seconds
|
||||
*
|
||||
* @param timer timer to start
|
||||
* @param msecs time to set in msecs
|
||||
*/
|
||||
void TimerCountdownMS(Timer *timer, unsigned int msecs);
|
||||
|
||||
/**
|
||||
* @brief start timer set to seconds
|
||||
*
|
||||
* @param timer timer to start
|
||||
* @param secs time to set in secs
|
||||
*/
|
||||
void TimerCountdown(Timer *timer, unsigned int secs);
|
||||
|
||||
/**
|
||||
* @brief Returns millisecs left in timer
|
||||
*
|
||||
* @param timer timer to check
|
||||
*
|
||||
* @return msecs left
|
||||
*/
|
||||
int TimerLeftMS(Timer *timer);
|
||||
|
||||
/**
|
||||
* @brief Network struct within mqtt paho
|
||||
*/
|
||||
typedef struct Network {
|
||||
sock_tcp_t sock; /**< socket number */
|
||||
/**
|
||||
* @brief read internal function
|
||||
*/
|
||||
int (*mqttread) (struct Network*, unsigned char*, int, int);
|
||||
/**
|
||||
* @brief write internal function
|
||||
*/
|
||||
int (*mqttwrite) (struct Network*, unsigned char*, int, int);
|
||||
} Network;
|
||||
|
||||
/**
|
||||
* @brief Initialize network struct
|
||||
*
|
||||
* @param n network struct
|
||||
*/
|
||||
void NetworkInit(Network *n);
|
||||
|
||||
/**
|
||||
* @brief Connect network to host
|
||||
*
|
||||
* @param n network struct
|
||||
* @param address_ip IP address to connect to
|
||||
* @param port_number port to connect to
|
||||
*
|
||||
* @return 0 if success, !=0 otherwise
|
||||
*/
|
||||
int NetworkConnect(Network *n, char* address_ip, int port_number);
|
||||
|
||||
/**
|
||||
* @brief Disconnect network
|
||||
*
|
||||
* @param n network struct
|
||||
*/
|
||||
void NetworkDisconnect(Network *n);
|
||||
|
||||
/**
|
||||
* @brief Mutex struct within mqtt paho
|
||||
*/
|
||||
typedef struct {
|
||||
mutex_t lock; /**< MQTT thread mutex*/
|
||||
} Mutex;
|
||||
|
||||
/**
|
||||
* @brief Initialize mutex struct
|
||||
*
|
||||
* @param mutex pointer
|
||||
*/
|
||||
void MutexInit(Mutex *mutex);
|
||||
|
||||
/**
|
||||
* @brief Locks mutex struct
|
||||
*
|
||||
* @param mutex pointer
|
||||
*
|
||||
* @return 0 if success, !=0 otherwise
|
||||
*/
|
||||
int MutexLock(Mutex *mutex);
|
||||
|
||||
/**
|
||||
* @brief Unlocks mutex struct
|
||||
*
|
||||
* @param mutex pointer
|
||||
*
|
||||
* @return 0 if success, !=0 otherwise
|
||||
*/
|
||||
int MutexUnlock(Mutex *mutex);
|
||||
|
||||
/**
|
||||
* @brief Thread struct within mqtt paho
|
||||
*/
|
||||
typedef struct {
|
||||
char stack[MQTT_THREAD_STACKSIZE]; /**< stack for MQTT thread*/
|
||||
kernel_pid_t pid; /**< MQTT thread pid*/
|
||||
} Thread;
|
||||
|
||||
/**
|
||||
* @brief Start new thread
|
||||
*
|
||||
* @param thread to start
|
||||
* @param fn pointer function to execute
|
||||
* @param arg arguments to pass to that fn
|
||||
*
|
||||
* @return 0 if success, !=0 otherwise
|
||||
*/
|
||||
int ThreadStart(Thread *thread, void (*fn)(void *), void *arg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /* PAHO_MQTT_H */
|
||||
/** @} */
|
||||
@ -0,0 +1,30 @@
|
||||
From cbb94538de6b28513685779f542587129f888db6 Mon Sep 17 00:00:00 2001
|
||||
From: Alexandre Abadie <alexandre.abadie@inria.fr>
|
||||
Date: Sun, 7 Jun 2020 10:27:56 +0200
|
||||
Subject: [PATCH 1/1] MQTTClient-C: skip SUCCESS enum on STM32L1/L4/WB cpus
|
||||
|
||||
---
|
||||
MQTTClient-C/src/MQTTClient.h | 6 ++++++
|
||||
1 file changed, 6 insertions(+)
|
||||
|
||||
diff --git a/MQTTClient-C/src/MQTTClient.h b/MQTTClient-C/src/MQTTClient.h
|
||||
index b612341..a051199 100755
|
||||
--- a/MQTTClient-C/src/MQTTClient.h
|
||||
+++ b/MQTTClient-C/src/MQTTClient.h
|
||||
@@ -54,7 +54,13 @@
|
||||
enum QoS { QOS0, QOS1, QOS2, SUBFAIL=0x80 };
|
||||
|
||||
/* all failure return codes must be negative */
|
||||
+#if defined(CPU_FAM_STM32L4) || defined(CPU_FAM_STM32L1) || defined(CPU_FAM_STM32WB) || defined(CPU_FAM_STM32G4)
|
||||
+/* the SUCCESS enum is also defined with stm32 L1, L4 and WB families.
|
||||
+ Since it contains the same value, we just skip its definition here for them. */
|
||||
+enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1};
|
||||
+#else
|
||||
enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
|
||||
+#endif
|
||||
|
||||
/* The Platform specific header must define the Network and Timer structures and functions
|
||||
* which operate on them.
|
||||
--
|
||||
2.25.1
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user