diff --git a/sys/posix/sockets/posix_sockets.c b/sys/posix/sockets/posix_sockets.c index 2ba1658f37..5815d93707 100644 --- a/sys/posix/sockets/posix_sockets.c +++ b/sys/posix/sockets/posix_sockets.c @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -37,6 +38,10 @@ #include "net/sock/udp.h" #include "net/sock/tcp.h" +#if IS_USED(MODULE_SOCK_ASYNC) +#include "net/sock/async.h" +#endif + /* enough to create sockets both with socket() and accept() */ #define _ACTUAL_SOCKET_POOL_SIZE (SOCKET_POOL_SIZE + \ (SOCKET_POOL_SIZE * SOCKET_TCP_QUEUE_SIZE)) @@ -78,6 +83,9 @@ typedef struct { #ifdef MODULE_SOCK_TCP sock_tcp_t *queue_array; unsigned queue_array_len; +#endif +#if IS_USED(MODULE_SOCK_ASYNC) + atomic_uint available; #endif sock_tcp_ep_t local; /* to store bind before connect/listen */ } socket_t; @@ -105,6 +113,9 @@ static socket_t *_get_free_socket(void) { for (int i = 0; i < _ACTUAL_SOCKET_POOL_SIZE; i++) { if (_socket_pool[i].domain == AF_UNSPEC) { +#if IS_USED(MODULE_SOCK_ASYNC) + atomic_init(&_socket_pool[i].available, 0U); +#endif return &_socket_pool[i]; } } @@ -338,6 +349,68 @@ static const vfs_file_ops_t socket_ops = { .write = socket_write, }; +#if IS_USED(MODULE_SOCK_ASYNC) +static void _async_cb(void *sock, sock_async_flags_t type, + void *arg) +{ + socket_t *socket = arg; + + (void)sock; + if (type & SOCK_ASYNC_MSG_RECV) { + atomic_fetch_add(&socket->available, 1); +#if IS_USED(MODULE_POSIX_SELECT) + thread_flags_set(sock->socket->selecting_thread, + POSIX_SELECT_THREAD_FLAG); +#endif + } +} + +static void _sock_set_cb(socket_t *socket) +{ + union { + void (*sock_pool)(void *, sock_async_flags_t, void *); +#ifdef MODULE_SOCK_IP + sock_ip_cb_t ip; +#endif +#ifdef MODULE_SOCK_TCP + sock_tcp_cb_t tcp; + sock_tcp_queue_cb_t tcp_queue; +#endif +#ifdef MODULE_SOCK_UDP + sock_udp_cb_t udp; +#endif + } callback = { .sock_pool = _async_cb }; + + switch (socket->type) { +#ifdef MODULE_SOCK_IP + case SOCK_RAW: + sock_ip_set_cb(&socket->sock.ip, callback.ip, socket); + break; +#endif +#ifdef MODULE_SOCK_TCP + case SOCK_STREAM: + /* is a TCP client socket */ + if (socket->queue_array == NULL) { + sock_tcp_set_cb(&socket->sock.tcp.sock, callback.tcp, socket); + } + /* is a TCP listening socket */ + else { + sock_tcp_queue_set_cb(&socket->sock.tcp.queue, + callback.tcp_queue, socket); + } + break; +#endif +#ifdef MODULE_SOCK_UDP + case SOCK_DGRAM: + sock_udp_set_cb(&socket->sock->udp, callback.udp, socket); + break; +#endif + default: + break; + } +} +#endif + int socket(int domain, int type, int protocol) { int res = 0; @@ -472,6 +545,9 @@ int accept(int socket, struct sockaddr *restrict address, new_s->queue_array = NULL; new_s->queue_array_len = 0; new_s->sock = (socket_sock_t *)sock; +#if IS_USED(MODULE_SOCK_ASYNC) + _sock_set_cb(new_s); +#endif memset(&s->local, 0, sizeof(sock_tcp_ep_t)); } break; @@ -605,6 +681,10 @@ static int _bind_connect(socket_t *s, const struct sockaddr *address, return -1; } s->sock = sock; +#if IS_USED(MODULE_SOCK_ASYNC) + _sock_set_cb(s); +#endif + return 0; } @@ -804,6 +884,9 @@ int listen(int socket, int backlog) } if (res == 0) { s->sock = sock; +#if IS_USED(MODULE_SOCK_ASYNC) + _sock_set_cb(s); +#endif } else { errno = -res; @@ -878,6 +961,9 @@ static ssize_t socket_recvfrom(socket_t *s, void *restrict buffer, break; } if ((res >= 0) && (address != NULL) && (address_len != NULL)) { +#ifdef MODULE_SOCK_ASYNC + atomic_fetch_sub(&s->available, 1); +#endif switch (s->type) { #ifdef MODULE_SOCK_TCP case SOCK_STREAM: @@ -1071,6 +1157,18 @@ bool posix_socket_is(int fd) return IS_USED(MODULE_SOCK_ASYNC) && (_get_socket(fd) != NULL); } +unsigned posix_socket_avail(int fd) +{ +#if IS_USED(MODULE_SOCK_ASYNC) + socket_t *socket = _get_socket(fd); + + return (socket != NULL) ? atomic_load(&socket->available) : 0U; +#else + (void)fd; + return 0U; +#endif +} + /** * @} */