Merge pull request #13947 from benpicco/riot-bus_multibus

core/msg: add message bus
This commit is contained in:
Koen Zandberg 2020-04-29 12:24:12 +02:00 committed by GitHub
commit 2af3ea586a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 512 additions and 10 deletions

View File

@ -1,5 +1,5 @@
# exclude submodule sources from *.c wildcard source selection
SRC := $(filter-out init.c mbox.c msg.c panic.c thread_flags.c,$(wildcard *.c))
SRC := $(filter-out init.c mbox.c msg.c msg_bus.c panic.c thread_flags.c,$(wildcard *.c))
# enable submodules
SUBMODULES := 1

214
core/include/msg_bus.h Normal file
View File

@ -0,0 +1,214 @@
/*
* Copyright (C) 2020 ML!PA Consulting GmbH
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @ingroup core_msg
*
* @experimental
*
* @{
*
* @file
* @brief Messaging Bus API for inter process message broadcast.
*
* @author Benjamin Valentin <benjamin.valentin@ml-pa.com>
*/
#ifndef MSG_BUS_H
#define MSG_BUS_H
#include <assert.h>
#include <stdint.h>
#include "list.h"
#include "msg.h"
#ifdef __cplusplus
extern "C" {
#endif
/**
* @brief A message bus is just a list of subscribers.
*/
typedef struct {
list_node_t subs; /**< List of subscribers to the bus */
uint16_t id; /**< Message Bus ID */
} msg_bus_t;
/**
* @brief Message bus subscriber entry.
* Should not be modified by the user.
*/
typedef struct {
list_node_t next; /**< next subscriber */
uint32_t event_mask; /**< Bitmask of event classes */
kernel_pid_t pid; /**< Subscriber PID */
} msg_bus_entry_t;
/**
* @brief Initialize a message bus.
*
* Must be called by the owner of a ``msg_bus_t`` struct.
*
* Message busses are considered to be long-running and must be
* created before any threads can attach to them.
*
* There can be a maximum number of 2047 busses in total.
*/
void msg_bus_init(msg_bus_t *bus);
/**
* @brief Get the message type of a message bus message.
*
* The `type` field of the`msg_t` also encodes the message bus ID,
* so use this function to get the real 5 bit message type.
*
* @param[in] msg A message that was received over a bus
*
* @return The message type
*/
static inline uint8_t msg_bus_get_type(const msg_t *msg)
{
return msg->type & 0x1F;
}
/**
* @brief Check if a message originates from a certain bus
*
* If a thread is attached to multiple busses, this function can be used
* to determine if a message originated from a certain bus.
*
* @param[in] bus The bus to check for
* @param[in] msg The received message
*
* @return True if the messages @p m was sent over @p bus
* False otherwise.
*/
static inline bool msg_is_from_bus(const msg_bus_t *bus, const msg_t *msg)
{
return bus->id == (msg->type >> 5);
}
/**
* @brief Attach a thread to a message bus.
*
* This attaches a message bus subscriber entry to a message bus.
* Subscribe to events on the bus using @ref msg_bus_detach.
* The thread will then receive events with a matching type that are
* posted on the bus.
*
* Events can be received with @ref msg_receive.
*
* @param[in] bus The message bus to attach to
* @param[in] entry Message bus subscriber entry
*/
void msg_bus_attach(msg_bus_t *bus, msg_bus_entry_t *entry);
/**
* @brief Remove a thread from a message bus.
*
* This removes the calling thread from the message bus.
*
* @note Call this function before the thread terminates.
*
* @param[in] bus The message bus from which to detach
* @param[in] entry Message bus subscriber entry
*/
void msg_bus_detach(msg_bus_t *bus, msg_bus_entry_t *entry);
/**
* @brief Get the message bus entry for the current thread.
*
* Traverse the message bus to find the subscriber entry for the
* current thread.
*
* @param[in] bus The message bus to seach
*
* @return The subscriber entry for the current thread.
* NULL if the thread is not attached to @p bus.
*/
msg_bus_entry_t *msg_bus_get_entry(msg_bus_t *bus);
/**
* @brief Subscribe to an event on the message bus.
*
* @pre The @p entry has been attached to a bus with @ref msg_bus_attach.
*
* @param[in] entry The message bus entry
* @param[in] type The event type to subscribe to (range: 031)
*/
static inline void msg_bus_subscribe(msg_bus_entry_t *entry, uint8_t type)
{
assert(type < 32);
entry->event_mask |= (1 << type);
}
/**
* @brief Unsubscribe from an event on the message bus.
*
* @pre The @p entry has been attached to a bus with @ref msg_bus_attach.
*
* @param[in] entry The message bus entry
* @param[in] type The event type to unsubscribe (range: 031)
*/
static inline void msg_bus_unsubscribe(msg_bus_entry_t *entry, uint8_t type)
{
assert(type < 32);
entry->event_mask &= ~(1 << type);
}
/**
* @brief Post a pre-assembled message to a bus.
*
* This function sends a message to all threads listening on the bus which are
* listening for messages with the message type of @p m.
*
* It behaves identical to @see msg_bus_post, but sends a pre-defined message.
*
* @note The message is expected to have the event ID encoded in the lower 5 bits
* and the bus ID encoded in the upper 11 bits of the message type.
*
* @param[in] m The message to post the bus
* @param[in] bus The message bus to post the message on
*
* @return The number of threads the message was sent to.
*/
int msg_send_bus(msg_t *m, msg_bus_t *bus);
/**
* @brief Post a message to a bus.
*
* This function sends a message to all threads listening on the bus which are
* listening for messages of @p type.
*
* It is safe to call this function from interrupt context.
*
* @param[in] bus The message bus to post this on
* @param[in] type The event type (range: 031)
* @param[in] arg Optional event parameter
*
* @return The number of threads the event was posted to.
*/
static inline int msg_bus_post(msg_bus_t *bus, uint8_t type, char *arg)
{
assert(type < 32);
msg_t m = {
.type = type | ((bus->id) << 5),
.content.ptr = arg,
};
return msg_send_bus(&m, bus);
}
#ifdef __cplusplus
}
#endif
#endif /* MSG_BUS_H */
/** @} */

View File

@ -25,6 +25,7 @@
#include <assert.h>
#include "sched.h"
#include "msg.h"
#include "msg_bus.h"
#include "list.h"
#include "thread.h"
#if MODULE_CORE_THREAD_FLAGS
@ -95,7 +96,7 @@ static int _msg_send(msg_t *m, kernel_pid_t target_pid, bool block,
m->sender_pid = sched_active_pid;
if (target == NULL) {
DEBUG("msg_send(): target thread does not exist\n");
DEBUG("msg_send(): target thread %d does not exist\n", target_pid);
irq_restore(state);
return -1;
}
@ -187,41 +188,85 @@ int msg_send_to_self(msg_t *m)
return res;
}
int msg_send_int(msg_t *m, kernel_pid_t target_pid)
static int _msg_send_oneway(msg_t *m, kernel_pid_t target_pid)
{
#ifdef DEVELHELP
if (!pid_is_valid(target_pid)) {
DEBUG("msg_send(): target_pid is invalid, continuing anyways\n");
DEBUG("%s: target_pid is invalid, continuing anyways\n", __func__);
}
#endif /* DEVELHELP */
thread_t *target = (thread_t *)sched_threads[target_pid];
if (target == NULL) {
DEBUG("msg_send_int(): target thread does not exist\n");
DEBUG("%s: target thread %d does not exist\n", __func__, target_pid);
return -1;
}
m->sender_pid = KERNEL_PID_ISR;
if (target->status == STATUS_RECEIVE_BLOCKED) {
DEBUG("msg_send_int: Direct msg copy from %" PRIkernel_pid " to %"
PRIkernel_pid ".\n", thread_getpid(), target_pid);
DEBUG("%s: Direct msg copy from %" PRIkernel_pid " to %"
PRIkernel_pid ".\n", __func__, thread_getpid(), target_pid);
/* copy msg to target */
msg_t *target_message = (msg_t *)target->wait_data;
*target_message = *m;
sched_set_status(target, STATUS_PENDING);
/* Interrupts are disabled here, we can set / re-use
sched_context_switch_request. */
sched_context_switch_request = 1;
return 1;
}
else {
DEBUG("msg_send_int: Receiver not waiting.\n");
DEBUG("%s: Receiver not waiting.\n", __func__);
return (queue_msg(target, m));
}
}
int msg_send_int(msg_t *m, kernel_pid_t target_pid)
{
int res;
m->sender_pid = KERNEL_PID_ISR;
res = _msg_send_oneway(m, target_pid);
return res;
}
int msg_send_bus(msg_t *m, msg_bus_t *bus)
{
const bool in_irq = irq_is_in();
const uint32_t event_mask = (1 << (m->type & 0x1F));
int count = 0;
m->sender_pid = in_irq ? KERNEL_PID_ISR : sched_active_pid;
unsigned state = irq_disable();
for (list_node_t *e = bus->subs.next; e; e = e->next) {
msg_bus_entry_t *subscriber = container_of(e, msg_bus_entry_t, next);
if ((subscriber->event_mask & event_mask) == 0) {
continue;
}
if (_msg_send_oneway(m, subscriber->pid) > 0) {
++count;
}
}
irq_restore(state);
if (sched_context_switch_request && !in_irq) {
thread_yield_higher();
}
return count;
}
int msg_send_receive(msg_t *m, msg_t *reply, kernel_pid_t target_pid)
{
assert(sched_active_pid != target_pid);

73
core/msg_bus.c Normal file
View File

@ -0,0 +1,73 @@
/*
* Copyright (C) 2020 ML!PA Consulting GmbH
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @ingroup core_msg
*
* @{
*
* @file
* @brief Messaging Bus API for inter process message broadcast.
*
* @author Benjamin Valentin <benjamin.valentin@ml-pa.com>
*
* @}
*/
#include "irq.h"
#include "msg_bus.h"
#include "thread.h"
void msg_bus_init(msg_bus_t *bus)
{
static uint16_t bus_count;
bus->subs.next = NULL;
bus->id = bus_count++;
}
void msg_bus_attach(msg_bus_t *bus, msg_bus_entry_t *entry)
{
unsigned state;
entry->next.next = NULL;
entry->event_mask = 0;
entry->pid = sched_active_pid;
state = irq_disable();
list_add(&bus->subs, &entry->next);
irq_restore(state);
}
void msg_bus_detach(msg_bus_t *bus, msg_bus_entry_t *entry)
{
unsigned state;
state = irq_disable();
list_remove(&bus->subs, &entry->next);
irq_restore(state);
}
msg_bus_entry_t *msg_bus_get_entry(msg_bus_t *bus)
{
msg_bus_entry_t *s = NULL;
unsigned state = irq_disable();
for (list_node_t *e = bus->subs.next; e; e = e->next) {
msg_bus_entry_t *subscriber = container_of(e, msg_bus_entry_t, next);
if (subscriber->pid == sched_active_pid) {
s = subscriber;
break;
}
}
irq_restore(state);
return s;
}

View File

@ -0,0 +1,5 @@
include ../Makefile.tests_common
USEMODULE += core_msg_bus
include $(RIOTBASE)/Makefile.include

View File

@ -0,0 +1,10 @@
BOARD_INSUFFICIENT_MEMORY := \
arduino-duemilanove \
arduino-leonardo \
arduino-nano \
arduino-uno \
atmega328p \
nucleo-f031k6 \
nucleo-f042k6 \
stm32f030f4-demo \
#

129
tests/thread_msg_bus/main.c Normal file
View File

@ -0,0 +1,129 @@
/*
* Copyright (C) 2020 ML!PA Consulting GmbH
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @ingroup tests
* @{
*
* @file
* @brief Message bus test application
*
* @author Benjamin Valentin <benjamin.valentin@ml-pa.com>
*
* @}
*/
#include <stdio.h>
#include <inttypes.h>
#include "thread.h"
#include "msg.h"
#include "msg_bus.h"
char t1_stack[THREAD_STACKSIZE_MAIN];
char t2_stack[THREAD_STACKSIZE_MAIN];
char t3_stack[THREAD_STACKSIZE_MAIN];
kernel_pid_t p_main, p1, p2, p3;
void *thread1(void *arg)
{
msg_t msg;
msg_bus_entry_t sub;
puts("THREAD 1 start");
msg_bus_attach(arg, &sub);
msg_bus_subscribe(&sub, 23);
msg_bus_subscribe(&sub, 24);
msg_receive(&msg);
/* check if the message came from the right bus */
assert(msg_is_from_bus(arg, &msg));
printf("T1 recv: %s (type=%d)\n",
(char*) msg.content.ptr, msg_bus_get_type(&msg));
msg_bus_detach(arg, &sub);
return NULL;
}
void *thread2(void *arg)
{
msg_t msg;
msg_bus_entry_t sub;
puts("THREAD 2 start");
msg_bus_attach(arg, &sub);
msg_bus_subscribe(&sub, 24);
msg_receive(&msg);
/* check if the message came from the right bus */
assert(msg_is_from_bus(arg, &msg));
printf("T2 recv: %s (type=%d)\n",
(char*) msg.content.ptr, msg_bus_get_type(&msg));
msg_bus_detach(arg, &sub);
return NULL;
}
void *thread3(void *arg)
{
msg_t msg;
msg_bus_entry_t sub;
puts("THREAD 3 start");
msg_bus_attach(arg, &sub);
msg_bus_subscribe(&sub, 23);
msg_receive(&msg);
/* check if the message came from the right bus */
assert(msg_is_from_bus(arg, &msg));
printf("T3 recv: %s (type=%d)\n",
(char*) msg.content.ptr, msg_bus_get_type(&msg));
msg_bus_detach(arg, &sub);
return NULL;
}
int main(void)
{
msg_bus_t my_bus;
msg_bus_init(&my_bus);
p_main = sched_active_pid;
p1 = thread_create(t1_stack, sizeof(t1_stack), THREAD_PRIORITY_MAIN - 3,
THREAD_CREATE_STACKTEST, thread1, &my_bus, "nr1");
p2 = thread_create(t2_stack, sizeof(t2_stack), THREAD_PRIORITY_MAIN - 2,
THREAD_CREATE_STACKTEST, thread2, &my_bus, "nr2");
p3 = thread_create(t3_stack, sizeof(t3_stack), THREAD_PRIORITY_MAIN - 1,
THREAD_CREATE_STACKTEST, thread3, &my_bus, "nr3");
puts("THREADS CREATED");
const char hello[] = "Hello Threads!";
for (int id = 22; id < 25; ++id) {
int woken = msg_bus_post(&my_bus, id, (void*)hello);
printf("Posted event %d to %d threads\n", id, woken);
}
puts("SUCCESS");
return 0;
}

View File

@ -0,0 +1,26 @@
#!/usr/bin/env python3
import sys
from testrunner import run
def testfunc(child):
child.expect_exact('THREAD 1 start')
child.expect_exact('THREAD 2 start')
child.expect_exact('THREAD 3 start')
child.expect_exact('THREADS CREATED')
child.expect_exact('Posted event 22 to 0 threads')
child.expect_exact('T1 recv: Hello Threads! (type=23)')
child.expect_exact('T3 recv: Hello Threads! (type=23)')
child.expect_exact('Posted event 23 to 2 threads')
child.expect_exact('T2 recv: Hello Threads! (type=24)')
child.expect_exact('Posted event 24 to 1 threads')
child.expect_exact('SUCCESS')
if __name__ == "__main__":
sys.exit(run(testfunc))