net/asymcute: switch to sock_asnyc_event

This commit is contained in:
Hauke Petersen 2020-12-16 12:32:26 +01:00
parent b5a36e9af7
commit ee01950ed7
3 changed files with 118 additions and 180 deletions

View File

@ -802,6 +802,7 @@ endif
ifneq (,$(filter asymcute,$(USEMODULE))) ifneq (,$(filter asymcute,$(USEMODULE)))
USEMODULE += sock_udp USEMODULE += sock_udp
USEMODULE += sock_util USEMODULE += sock_util
USEMODULE += sock_async_event
USEMODULE += random USEMODULE += random
USEMODULE += event_timeout USEMODULE += event_timeout
USEMODULE += event_callback USEMODULE += event_callback

View File

@ -169,22 +169,6 @@ extern "C" {
#define ASYMCUTE_HANDLER_STACKSIZE (THREAD_STACKSIZE_DEFAULT) #define ASYMCUTE_HANDLER_STACKSIZE (THREAD_STACKSIZE_DEFAULT)
#endif #endif
#ifndef ASYMCUTE_LISTENER_PRIO
/**
* @brief Default priority for an Asymcute listener thread
*
* @note Must be of higher priority than @ref ASYMCUTE_HANDLER_PRIO
*/
#define ASYMCUTE_LISTENER_PRIO (THREAD_PRIORITY_MAIN - 3)
#endif
#ifndef ASYMCUTE_LISTENER_STACKSIZE
/**
* @brief Default stack size for an Asymcute listener thread
*/
#define ASYMCUTE_LISTENER_STACKSIZE (THREAD_STACKSIZE_DEFAULT)
#endif
/** /**
* @brief Return values used by public Asymcute functions * @brief Return values used by public Asymcute functions
*/ */
@ -196,6 +180,7 @@ enum {
ASYMCUTE_BUSY = -4, /**< error: context already in use */ ASYMCUTE_BUSY = -4, /**< error: context already in use */
ASYMCUTE_REGERR = -5, /**< error: registration invalid */ ASYMCUTE_REGERR = -5, /**< error: registration invalid */
ASYMCUTE_SUBERR = -6, /**< error: subscription invalid */ ASYMCUTE_SUBERR = -6, /**< error: subscription invalid */
ASYMCUTE_SENDERR = -7, /**< error: unable to sent packet */
}; };
/** /**
@ -295,7 +280,6 @@ struct asymcute_req {
struct asymcute_con { struct asymcute_con {
mutex_t lock; /**< synchronization lock */ mutex_t lock; /**< synchronization lock */
sock_udp_t sock; /**< socket used by a connections */ sock_udp_t sock; /**< socket used by a connections */
sock_udp_ep_t server_ep; /**< the gateway's UDP endpoint */
asymcute_req_t *pending; /**< list holding pending requests */ asymcute_req_t *pending; /**< list holding pending requests */
asymcute_sub_t *subscriptions; /**< list holding active subscriptions */ asymcute_sub_t *subscriptions; /**< list holding active subscriptions */
asymcute_evt_cb_t user_cb; /**< event callback provided by user */ asymcute_evt_cb_t user_cb; /**< event callback provided by user */
@ -471,25 +455,6 @@ static inline bool asymcute_topic_equal(const asymcute_topic_t *a,
int asymcute_topic_init(asymcute_topic_t *topic, const char *topic_name, int asymcute_topic_init(asymcute_topic_t *topic, const char *topic_name,
uint16_t topic_id); uint16_t topic_id);
/**
* @brief Start a listener thread
*
* @note Must have higher priority then the handler thread (defined by
* @ref ASYMCUTE_HANDLER_PRIO)
*
* @param[in] con connection context to use for this connection
* @param[in] stack stack used to run the listener thread
* @param[in] stacksize size of @p stack in bytes
* @param[in] priority priority of the listener thread created by this function
* @param[in] callback user callback for notification about connection related
* events
*
* @return ASYMCUTE_OK on success
* @return ASYMCUTE_BUSY if connection context is already in use
*/
int asymcute_listener_run(asymcute_con_t *con, char *stack, size_t stacksize,
char priority, asymcute_evt_cb_t callback);
/** /**
* @brief Start the global Asymcute handler thread for processing timeouts and * @brief Start the global Asymcute handler thread for processing timeouts and
* keep alive events * keep alive events
@ -517,16 +482,17 @@ bool asymcute_is_connected(const asymcute_con_t *con);
* @param[in] cli_id client ID to register with the gateway * @param[in] cli_id client ID to register with the gateway
* @param[in] clean set `true` to start a clean session * @param[in] clean set `true` to start a clean session
* @param[in] will last will (currently not implemented) * @param[in] will last will (currently not implemented)
* @param[in] callback user callback triggered on defined events
* *
* @return ASYMCUTE_OK if CONNECT message has been sent * @return ASYMCUTE_OK if CONNECT message has been sent
* @return ASYMCUTE_NOTSUP if last will was given (temporary until implemented) * @return ASYMCUTE_NOTSUP if last will was given (temporary until implemented)
* @return ASYMCUTE_OVERFLOW if @p cli_id is larger than ASYMCUTE_ID_MAXLEN * @return ASYMCUTE_OVERFLOW if @p cli_id is larger than ASYMCUTE_ID_MAXLEN
* @return ASYMCUTE_GWERR if the connection is not in idle state * @return ASYMCUTE_GWERR if initializing the socket for the connection failed
* @return ASYMCUTE_BUSY if the given request context is already in use * @return ASYMCUTE_BUSY if the connection or the request context are in use
*/ */
int asymcute_connect(asymcute_con_t *con, asymcute_req_t *req, int asymcute_connect(asymcute_con_t *con, asymcute_req_t *req,
sock_udp_ep_t *server, const char *cli_id, bool clean, sock_udp_ep_t *server, const char *cli_id, bool clean,
asymcute_will_t *will); asymcute_will_t *will, asymcute_evt_cb_t callback);
/** /**
* @brief Close the given connection * @brief Close the given connection

View File

@ -21,11 +21,11 @@
#include <assert.h> #include <assert.h>
#include <limits.h> #include <limits.h>
#include "log.h"
#include "random.h" #include "random.h"
#include "byteorder.h" #include "byteorder.h"
#include "timex.h" #include "timex.h"
#include "net/sock/async/event.h"
#include "net/asymcute.h" #include "net/asymcute.h"
#define ENABLE_DEBUG 0 #define ENABLE_DEBUG 0
@ -53,10 +53,11 @@
#define LEN_PINGRESP (2U) #define LEN_PINGRESP (2U)
#define MIN_PKT_LEN (2)
/* Internally used connection states */ /* Internally used connection states */
enum { enum {
UNINITIALIZED = 0, /**< connection context is not initialized */ NOTCON = 0, /**< not connected to any gateway */
NOTCON, /**< not connected to any gateway */
CONNECTING, /**< connection is being setup */ CONNECTING, /**< connection is being setup */
CONNECTED, /**< connection is established */ CONNECTED, /**< connection is established */
TEARDOWN, /**< connection is being torn down */ TEARDOWN, /**< connection is being torn down */
@ -175,14 +176,19 @@ static void _compile_sub_unsub(asymcute_req_t *req, asymcute_con_t *con,
req->arg = sub; req->arg = sub;
} }
static void _req_resend(asymcute_req_t *req, asymcute_con_t *con) static ssize_t _req_resend(asymcute_req_t *req, asymcute_con_t *con, int initial)
{ {
ssize_t n = sock_udp_send(&con->sock, req->data, req->data_len, NULL);
/* if sending the initial packet fails we do not set the retry timer, as we
* handle the return value directly */
if (!((initial == 1) && (n < MIN_PKT_LEN))) {
event_timeout_set(&req->to_timer, RETRY_TO); event_timeout_set(&req->to_timer, RETRY_TO);
sock_udp_send(&con->sock, req->data, req->data_len, &con->server_ep); }
return n;
} }
/* @pre con is locked */ /* @pre con is locked */
static void _req_send(asymcute_req_t *req, asymcute_con_t *con, static int _req_send(asymcute_req_t *req, asymcute_con_t *con,
asymcute_to_cb_t cb) asymcute_to_cb_t cb)
{ {
/* initialize request */ /* initialize request */
@ -195,13 +201,20 @@ static void _req_send(asymcute_req_t *req, asymcute_con_t *con,
req->next = con->pending; req->next = con->pending;
con->pending = req; con->pending = req;
/* send request */ /* send request */
_req_resend(req, con); ssize_t n = _req_resend(req, con, 1);
if (n < MIN_PKT_LEN) {
req->con = NULL;
mutex_unlock(&req->lock);
return ASYMCUTE_SENDERR;
}
return ASYMCUTE_OK;
} }
static void _req_send_once(asymcute_req_t *req, asymcute_con_t *con) static int _req_send_once(asymcute_req_t *req, asymcute_con_t *con)
{ {
sock_udp_send(&con->sock, req->data, req->data_len, &con->server_ep); ssize_t n = sock_udp_send(&con->sock, req->data, req->data_len, NULL);
mutex_unlock(&req->lock); mutex_unlock(&req->lock);
return (n >= MIN_PKT_LEN) ? ASYMCUTE_OK : ASYMCUTE_SENDERR;
} }
static void _req_cancel(asymcute_req_t *req) static void _req_cancel(asymcute_req_t *req)
@ -234,6 +247,9 @@ static void _disconnect(asymcute_con_t *con, uint8_t state)
} }
con->subscriptions = NULL; con->subscriptions = NULL;
} }
if (state == NOTCON) {
sock_udp_close(&con->sock);
}
con->state = state; con->state = state;
} }
@ -248,7 +264,7 @@ static void _on_req_timeout(void *arg)
if (req->retry_cnt--) { if (req->retry_cnt--) {
/* resend the packet */ /* resend the packet */
_req_resend(req, req->con); _req_resend(req, req->con, 0);
return; return;
} }
else { else {
@ -271,6 +287,7 @@ static unsigned _on_con_timeout(asymcute_con_t *con, asymcute_req_t *req)
(void)req; (void)req;
con->state = NOTCON; con->state = NOTCON;
sock_udp_close(&con->sock);
return ASYMCUTE_TIMEOUT; return ASYMCUTE_TIMEOUT;
} }
@ -278,7 +295,7 @@ static unsigned _on_discon_timeout(asymcute_con_t *con, asymcute_req_t *req)
{ {
(void)req; (void)req;
con->state = NOTCON; _disconnect(con, NOTCON);
return ASYMCUTE_DISCONNECTED; return ASYMCUTE_DISCONNECTED;
} }
@ -309,7 +326,7 @@ static void _on_keepalive_evt(void *arg)
if (con->keepalive_retry_cnt) { if (con->keepalive_retry_cnt) {
/* (re)send keep alive ping and set dedicated retransmit timer */ /* (re)send keep alive ping and set dedicated retransmit timer */
uint8_t ping[2] = { 2, MQTTSN_PINGREQ }; uint8_t ping[2] = { 2, MQTTSN_PINGREQ };
sock_udp_send(&con->sock, ping, sizeof(ping), &con->server_ep); sock_udp_send(&con->sock, ping, sizeof(ping), NULL);
con->keepalive_retry_cnt--; con->keepalive_retry_cnt--;
event_timeout_set(&con->keepalive_timer, RETRY_TO); event_timeout_set(&con->keepalive_timer, RETRY_TO);
mutex_unlock(&con->lock); mutex_unlock(&con->lock);
@ -360,7 +377,6 @@ static void _on_disconnect(asymcute_con_t *con, size_t len)
} }
mutex_unlock(&con->lock); mutex_unlock(&con->lock);
con->user_cb(req, ASYMCUTE_DISCONNECTED); con->user_cb(req, ASYMCUTE_DISCONNECTED);
} }
static void _on_pingreq(asymcute_con_t *con) static void _on_pingreq(asymcute_con_t *con)
@ -368,7 +384,7 @@ static void _on_pingreq(asymcute_con_t *con)
/* simply reply with a PINGRESP message */ /* simply reply with a PINGRESP message */
mutex_lock(&con->lock); mutex_lock(&con->lock);
uint8_t resp[2] = { LEN_PINGRESP, MQTTSN_PINGRESP }; uint8_t resp[2] = { LEN_PINGRESP, MQTTSN_PINGRESP };
sock_udp_send(&con->sock, resp, sizeof(resp), &con->server_ep); sock_udp_send(&con->sock, resp, sizeof(resp), NULL);
mutex_unlock(&con->lock); mutex_unlock(&con->lock);
} }
@ -440,7 +456,7 @@ static void _on_publish(asymcute_con_t *con, uint8_t *data,
uint8_t pkt[7] = { 7, MQTTSN_PUBACK, 0, 0, 0, 0, ret }; uint8_t pkt[7] = { 7, MQTTSN_PUBACK, 0, 0, 0, 0, ret };
/* copy topic and message id */ /* copy topic and message id */
memcpy(&pkt[2], &data[pos + 2], 4); memcpy(&pkt[2], &data[pos + 2], 4);
sock_udp_send(&con->sock, pkt, 7, &con->server_ep); sock_udp_send(&con->sock, pkt, 7, NULL);
} }
/* release the context and notify the user (on success) */ /* release the context and notify the user (on success) */
@ -541,21 +557,28 @@ static void _on_unsuback(asymcute_con_t *con, const uint8_t *data, size_t len)
con->user_cb(req, ASYMCUTE_UNSUBSCRIBED); con->user_cb(req, ASYMCUTE_UNSUBSCRIBED);
} }
static void _on_data(asymcute_con_t *con, size_t pkt_len, sock_udp_ep_t *remote) void *_eventloop(void *arg)
{ {
if (pkt_len < 2) { (void)arg;
return; event_queue_init(&_queue);
event_loop(&_queue);
/* should never be reached */
return NULL;
} }
void _on_pkt(sock_udp_t *sock, sock_async_flags_t type, void *arg)
{
asymcute_con_t *con = (asymcute_con_t *)arg;
if (type & SOCK_ASYNC_MSG_RECV) {
ssize_t pkt_len = sock_udp_recv(sock, con->rxbuf,
CONFIG_ASYMCUTE_BUFSIZE, 0, NULL);
if (pkt_len >= MIN_PKT_LEN) {
size_t len; size_t len;
size_t pos = _len_get(con->rxbuf, &len); size_t pos = _len_get(con->rxbuf, &len);
/* make sure the incoming data was send by 'our' gateway */
if (!sock_udp_ep_equal(&con->server_ep, remote)) {
return;
}
/* validate incoming data: verify message length */ /* validate incoming data: verify message length */
if ((pkt_len <= pos) || (pkt_len < len)) { if (((size_t)pkt_len <= pos) || ((size_t)pkt_len < len)) {
/* length field of MQTT-SN packet seems to be invalid -> drop the pkt */ /* length field of MQTT-SN packet seems to be invalid -> drop the pkt */
return; return;
} }
@ -594,84 +617,13 @@ static void _on_data(asymcute_con_t *con, size_t pkt_len, sock_udp_ep_t *remote)
break; break;
} }
} }
void *_listener(void *arg)
{
asymcute_con_t *con = arg;
/* create a socket for this listener, using an ephemeral port */
sock_udp_ep_t local = SOCK_IPV6_EP_ANY;
if (sock_udp_create(&con->sock, &local, NULL, 0) != 0) {
LOG_ERROR("[asymcute] error creating listener socket\n");
return NULL;
} }
while (1) {
sock_udp_ep_t remote;
int n = sock_udp_recv(&con->sock, con->rxbuf, CONFIG_ASYMCUTE_BUFSIZE,
SOCK_NO_TIMEOUT, &remote);
if (n > 0) {
_on_data(con, (size_t)n, &remote);
}
}
/* should never be reached */
return NULL;
}
void *_handler(void *arg)
{
(void)arg;
event_queue_init(&_queue);
event_loop(&_queue);
/* should never be reached */
return NULL;
}
int asymcute_listener_run(asymcute_con_t *con, char *stack, size_t stacksize,
char priority, asymcute_evt_cb_t callback)
{
/* make sure con is not running */
assert(con);
assert((priority > 0) && (priority < THREAD_PRIORITY_IDLE - 1));
assert(callback);
int ret = ASYMCUTE_OK;
/* make sure the connection context is not already used */
mutex_lock(&con->lock);
if (con->state != UNINITIALIZED) {
ret = ASYMCUTE_BUSY;
goto end;
}
/* initialize the connection context */
memset(con, 0, sizeof(asymcute_con_t));
random_bytes((uint8_t *)&con->last_id, 2);
event_callback_init(&con->keepalive_evt, _on_keepalive_evt, con);
event_timeout_init(&con->keepalive_timer, &_queue, &con->keepalive_evt.super);
con->keepalive_retry_cnt = CONFIG_ASYMCUTE_N_RETRY;
con->state = NOTCON;
con->user_cb = callback;
/* start listener thread */
thread_create(stack,
stacksize,
priority,
THREAD_CREATE_WOUT_YIELD,
_listener,
con,
"asymcute_listener");
end:
mutex_unlock(&con->lock);
return ret;
} }
void asymcute_handler_run(void) void asymcute_handler_run(void)
{ {
thread_create(_stack, sizeof(_stack), ASYMCUTE_HANDLER_PRIO, thread_create(_stack, sizeof(_stack), ASYMCUTE_HANDLER_PRIO,
0, _handler, NULL, "asymcute_main"); THREAD_CREATE_STACKTEST, _eventloop, NULL, "asymcute_main");
} }
int asymcute_topic_init(asymcute_topic_t *topic, const char *topic_name, int asymcute_topic_init(asymcute_topic_t *topic, const char *topic_name,
@ -723,7 +675,7 @@ bool asymcute_is_connected(const asymcute_con_t *con)
int asymcute_connect(asymcute_con_t *con, asymcute_req_t *req, int asymcute_connect(asymcute_con_t *con, asymcute_req_t *req,
sock_udp_ep_t *server, const char *cli_id, bool clean, sock_udp_ep_t *server, const char *cli_id, bool clean,
asymcute_will_t *will) asymcute_will_t *will, asymcute_evt_cb_t callback)
{ {
assert(con); assert(con);
assert(req); assert(req);
@ -744,7 +696,7 @@ int asymcute_connect(asymcute_con_t *con, asymcute_req_t *req,
/* check if the context is not already connected to any gateway */ /* check if the context is not already connected to any gateway */
mutex_lock(&con->lock); mutex_lock(&con->lock);
if (con->state != NOTCON) { if (con->state != NOTCON) {
ret = ASYMCUTE_GWERR; ret = ASYMCUTE_BUSY;
goto end; goto end;
} }
/* get mutual access to the request context */ /* get mutual access to the request context */
@ -753,10 +705,26 @@ int asymcute_connect(asymcute_con_t *con, asymcute_req_t *req,
goto end; goto end;
} }
/* prepare the connection context */ /* initialize the connection context */
memset(con, 0, sizeof(asymcute_con_t));
random_bytes((uint8_t *)&con->last_id, 2);
con->keepalive_retry_cnt = CONFIG_ASYMCUTE_N_RETRY;
event_callback_init(&con->keepalive_evt, _on_keepalive_evt, con);
event_timeout_init(&con->keepalive_timer, &_queue, &con->keepalive_evt.super);
con->user_cb = callback;
con->state = CONNECTING; con->state = CONNECTING;
strncpy(con->cli_id, cli_id, sizeof(con->cli_id)); strncpy(con->cli_id, cli_id, sizeof(con->cli_id));
memcpy(&con->server_ep, server, sizeof(con->server_ep));
/* create a socket for this listener, using an ephemeral port */
sock_udp_ep_t local = SOCK_IPV6_EP_ANY;
local.port = 0;
local.netif = server->netif;
if (sock_udp_create(&con->sock, &local, server, 0) != 0) {
con->state = NOTCON;
ret = ASYMCUTE_GWERR;
goto end;
}
sock_udp_event_init(&con->sock, &_queue, _on_pkt, con);
/* compile and send connect message */ /* compile and send connect message */
req->msg_id = 0; req->msg_id = 0;
@ -767,7 +735,10 @@ int asymcute_connect(asymcute_con_t *con, asymcute_req_t *req,
byteorder_htobebufs(&req->data[4], CONFIG_ASYMCUTE_KEEPALIVE); byteorder_htobebufs(&req->data[4], CONFIG_ASYMCUTE_KEEPALIVE);
memcpy(&req->data[6], cli_id, id_len); memcpy(&req->data[6], cli_id, id_len);
req->data_len = (size_t)req->data[0]; req->data_len = (size_t)req->data[0];
_req_send(req, con, _on_con_timeout); ret = _req_send(req, con, _on_con_timeout);
if (ret != ASYMCUTE_OK) {
_disconnect(con, NOTCON);
}
end: end:
mutex_unlock(&con->lock); mutex_unlock(&con->lock);
@ -801,7 +772,7 @@ int asymcute_disconnect(asymcute_con_t *con, asymcute_req_t *req)
req->data[0] = 2; req->data[0] = 2;
req->data[1] = MQTTSN_DISCONNECT; req->data[1] = MQTTSN_DISCONNECT;
req->data_len = 2; req->data_len = 2;
_req_send(req, con, _on_discon_timeout); ret = _req_send(req, con, _on_discon_timeout);
end: end:
mutex_unlock(&con->lock); mutex_unlock(&con->lock);
@ -855,7 +826,7 @@ int asymcute_register(asymcute_con_t *con, asymcute_req_t *req,
req->data_len = (pos + 5 + topic_len); req->data_len = (pos + 5 + topic_len);
/* send the request */ /* send the request */
_req_send(req, con, NULL); ret = _req_send(req, con, NULL);
end: end:
mutex_unlock(&con->lock); mutex_unlock(&con->lock);
@ -917,10 +888,10 @@ int asymcute_publish(asymcute_con_t *con, asymcute_req_t *req,
/* publish selected data */ /* publish selected data */
if (flags & MQTTSN_QOS_1) { if (flags & MQTTSN_QOS_1) {
_req_send(req, con, NULL); ret = _req_send(req, con, NULL);
} }
else { else {
_req_send_once(req, con); ret = _req_send_once(req, con);
} }
end: end:
@ -978,7 +949,7 @@ int asymcute_subscribe(asymcute_con_t *con, asymcute_req_t *req,
/* send SUBSCRIBE message */ /* send SUBSCRIBE message */
_compile_sub_unsub(req, con, sub, MQTTSN_SUBSCRIBE); _compile_sub_unsub(req, con, sub, MQTTSN_SUBSCRIBE);
_req_send(req, con, _on_suback_timeout); ret = _req_send(req, con, _on_suback_timeout);
end: end:
mutex_unlock(&con->lock); mutex_unlock(&con->lock);
@ -1012,7 +983,7 @@ int asymcute_unsubscribe(asymcute_con_t *con, asymcute_req_t *req,
/* prepare and send UNSUBSCRIBE message */ /* prepare and send UNSUBSCRIBE message */
_compile_sub_unsub(req, con, sub, MQTTSN_UNSUBSCRIBE); _compile_sub_unsub(req, con, sub, MQTTSN_UNSUBSCRIBE);
_req_send(req, con, NULL); ret = _req_send(req, con, NULL);
end: end:
mutex_unlock(&con->lock); mutex_unlock(&con->lock);