tests: mqtt: Add test to verify publish reception
Add MQTT test to verify PUBLISH message reception. Signed-off-by: Robert Lubos <robert.lubos@nordicsemi.no>
This commit is contained in:
parent
0f7698be8b
commit
b5fd15de0c
|
@ -0,0 +1,9 @@
|
|||
cmake_minimum_required(VERSION 3.8.2)
|
||||
include($ENV{ZEPHYR_BASE}/cmake/app/boilerplate.cmake NO_POLICY_SCOPE)
|
||||
project(mqtt_subscriber)
|
||||
|
||||
target_include_directories(app PRIVATE
|
||||
$ENV{ZEPHYR_BASE}/subsys/net/ip
|
||||
)
|
||||
FILE(GLOB app_sources src/*.c)
|
||||
target_sources(app PRIVATE ${app_sources})
|
|
@ -0,0 +1,34 @@
|
|||
CONFIG_NETWORKING=y
|
||||
CONFIG_NET_TCP=y
|
||||
CONFIG_ENTROPY_GENERATOR=y
|
||||
CONFIG_TEST_RANDOM_GENERATOR=y
|
||||
CONFIG_NET_LOG=y
|
||||
CONFIG_INIT_STACKS=y
|
||||
|
||||
CONFIG_NET_IPV6_RA_RDNSS=y
|
||||
CONFIG_NET_IF_UNICAST_IPV6_ADDR_COUNT=3
|
||||
CONFIG_NET_IF_MCAST_IPV6_ADDR_COUNT=2
|
||||
|
||||
CONFIG_STDOUT_CONSOLE=y
|
||||
|
||||
# Enable IPv6 support
|
||||
CONFIG_NET_IPV6=n
|
||||
# Enable IPv4 support
|
||||
CONFIG_NET_IPV4=y
|
||||
|
||||
# Enable the MQTT Lib
|
||||
CONFIG_MQTT_LIB=y
|
||||
|
||||
CONFIG_NET_CONFIG_SETTINGS=y
|
||||
CONFIG_NET_CONFIG_MY_IPV6_ADDR="2001:db8::1"
|
||||
CONFIG_NET_CONFIG_PEER_IPV6_ADDR="2001:db8::2"
|
||||
|
||||
CONFIG_NET_CONFIG_MY_IPV4_ADDR="192.0.2.1"
|
||||
CONFIG_NET_CONFIG_PEER_IPV4_ADDR="192.0.2.2"
|
||||
|
||||
CONFIG_MAIN_STACK_SIZE=2048
|
||||
|
||||
# For IPv6
|
||||
CONFIG_NET_BUF_DATA_SIZE=256
|
||||
|
||||
CONFIG_ZTEST=y
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Copyright (c) 2017 Intel Corporation
|
||||
*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
#ifndef __CONFIG_H__
|
||||
#define __CONFIG_H__
|
||||
|
||||
#ifdef CONFIG_NET_CONFIG_SETTINGS
|
||||
#ifdef CONFIG_NET_IPV6
|
||||
#define ZEPHYR_ADDR CONFIG_NET_CONFIG_MY_IPV6_ADDR
|
||||
#define SERVER_ADDR CONFIG_NET_CONFIG_PEER_IPV6_ADDR
|
||||
#else
|
||||
#define ZEPHYR_ADDR CONFIG_NET_CONFIG_MY_IPV4_ADDR
|
||||
#define SERVER_ADDR CONFIG_NET_CONFIG_PEER_IPV4_ADDR
|
||||
#endif
|
||||
#else
|
||||
#ifdef CONFIG_NET_IPV6
|
||||
#define ZEPHYR_ADDR "2001:db8::1"
|
||||
#define SERVER_ADDR "2001:db8::2"
|
||||
#else
|
||||
#define ZEPHYR_ADDR "192.168.1.101"
|
||||
#define SERVER_ADDR "192.168.1.10"
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#define SERVER_PORT 1883
|
||||
|
||||
#define APP_SLEEP_MSECS 500
|
||||
#define APP_TX_RX_TIMEOUT 300
|
||||
#define APP_NET_INIT_TIMEOUT 1000
|
||||
|
||||
#define APP_CONNECT_TRIES 10
|
||||
|
||||
#define APP_MAX_ITERATIONS 100
|
||||
|
||||
#define MQTT_CLIENTID "zephyr_pubsub"
|
||||
|
||||
#endif
|
|
@ -0,0 +1,26 @@
|
|||
/*
|
||||
* Copyright (c) 2017 Intel Corporation
|
||||
*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
#include <ztest.h>
|
||||
|
||||
extern void test_mqtt_connect(void);
|
||||
extern void test_mqtt_subscribe(void);
|
||||
extern void test_mqtt_publish_short(void);
|
||||
extern void test_mqtt_publish_long(void);
|
||||
extern void test_mqtt_unsubscribe(void);
|
||||
extern void test_mqtt_disconnect(void);
|
||||
|
||||
void test_main(void)
|
||||
{
|
||||
ztest_test_suite(mqtt_test,
|
||||
ztest_unit_test(test_mqtt_connect),
|
||||
ztest_unit_test(test_mqtt_subscribe),
|
||||
ztest_unit_test(test_mqtt_publish_short),
|
||||
ztest_unit_test(test_mqtt_publish_long),
|
||||
ztest_unit_test(test_mqtt_unsubscribe),
|
||||
ztest_unit_test(test_mqtt_disconnect));
|
||||
ztest_run_test_suite(mqtt_test);
|
||||
}
|
|
@ -0,0 +1,462 @@
|
|||
/*
|
||||
* Copyright (c) 2017 Intel Corporation
|
||||
*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
#define LOG_MODULE_NAME net_test
|
||||
#define NET_LOG_LEVEL LOG_LEVEL_WRN
|
||||
|
||||
#include <ztest.h>
|
||||
#include <net/socket.h>
|
||||
#include <net/mqtt.h>
|
||||
|
||||
#include <string.h>
|
||||
#include <errno.h>
|
||||
|
||||
#include "../../mqtt_pubsub/src/config.h"
|
||||
|
||||
#define BUFFER_SIZE 32
|
||||
|
||||
static u8_t rx_buffer[BUFFER_SIZE];
|
||||
static u8_t tx_buffer[BUFFER_SIZE];
|
||||
static struct mqtt_client client_ctx;
|
||||
static struct sockaddr broker;
|
||||
static struct pollfd fds[1];
|
||||
static int nfds;
|
||||
static bool connected;
|
||||
static int payload_left;
|
||||
static const u8_t *payload;
|
||||
|
||||
static const u8_t payload_short[] = "Short payload";
|
||||
|
||||
/* Generated by http://www.lipsum.com/
|
||||
* 2 paragraphs, 171 words, 1210 bytes of Lorem Ipsum
|
||||
*/
|
||||
static const u8_t payload_long[] =
|
||||
"Lorem ipsum dolor sit amet, consectetur adipiscing elit. Suspendisse "
|
||||
"eleifend odio sit amet scelerisque tincidunt. In orci ligula, egestas "
|
||||
"ut neque sit amet, fringilla malesuada dolor. Fusce rhoncus nunc in "
|
||||
"lacus tincidunt lobortis. Donec aliquam lectus gravida fermentum "
|
||||
"egestas. Curabitur eu ex pretium, dapibus massa in, pretium tellus. "
|
||||
"Suspendisse ac efficitur magna, ut convallis nisl. Duis sed sapien "
|
||||
"odio. Aliquam efficitur sed tellus sit amet eleifend. Sed facilisis "
|
||||
"ligula aliquam erat ornare hendrerit. Aenean tincidunt nunc a nulla "
|
||||
"vestibulum mollis. Vivamus fringilla euismod nisi sit amet malesuada. "
|
||||
"Vivamus consequat ultricies metus sed feugiat. Aenean malesuada "
|
||||
"cursus sem."
|
||||
"\n"
|
||||
"Vestibulum tempor diam et aliquam tristique. Nullam condimentum felis "
|
||||
"et convallis finibus. Aliquam erat volutpat. Nam et blandit tortor. "
|
||||
"Nullam pharetra arcu aliquam, suscipit mi eu, faucibus lacus. "
|
||||
"Pellentesque eleifend nulla sit amet purus maximus, et elementum "
|
||||
"lectus fringilla. Praesent consectetur diam eget tellus molestie "
|
||||
"tempor. Class aptent taciti sociosqu ad litora torquent per conubia "
|
||||
"nostra, per inceptos himenaeos. Nam tincidunt urna augue, nec cursus "
|
||||
"arcu eleifend nec. Donec semper tellus in leo nullam."
|
||||
"\n";
|
||||
|
||||
static void broker_init(void)
|
||||
{
|
||||
#if defined(CONFIG_NET_IPV6)
|
||||
struct sockaddr_in6 *broker6 = net_sin6(&broker);
|
||||
|
||||
broker6->sin6_family = AF_INET6;
|
||||
broker6->sin6_port = htons(SERVER_PORT);
|
||||
inet_pton(AF_INET6, SERVER_ADDR, &broker6->sin6_addr);
|
||||
#else
|
||||
struct sockaddr_in *broker4 = net_sin(&broker);
|
||||
|
||||
broker4->sin_family = AF_INET;
|
||||
broker4->sin_port = htons(SERVER_PORT);
|
||||
inet_pton(AF_INET, SERVER_ADDR, &broker4->sin_addr);
|
||||
#endif
|
||||
}
|
||||
|
||||
static void prepare_fds(struct mqtt_client *client)
|
||||
{
|
||||
if (client->transport.type == MQTT_TRANSPORT_NON_SECURE) {
|
||||
fds[0].fd = client->transport.tcp.sock;
|
||||
}
|
||||
|
||||
fds[0].events = ZSOCK_POLLIN;
|
||||
nfds = 1;
|
||||
}
|
||||
|
||||
static void clear_fds(void)
|
||||
{
|
||||
nfds = 0;
|
||||
}
|
||||
|
||||
static void wait(int timeout)
|
||||
{
|
||||
if (nfds > 0) {
|
||||
if (poll(fds, nfds, timeout) < 0) {
|
||||
TC_PRINT("poll error: %d\n", errno);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void publish_handler(struct mqtt_client *const client,
|
||||
const struct mqtt_evt *evt)
|
||||
{
|
||||
int rc;
|
||||
u8_t buf[16];
|
||||
u32_t offset = 0;
|
||||
|
||||
if (evt->result != 0) {
|
||||
TC_PRINT("MQTT PUBLISH error: %d\n", evt->result);
|
||||
goto error;
|
||||
}
|
||||
|
||||
if (payload_left != evt->param.publish.message.payload.len) {
|
||||
TC_PRINT("Invalid payload length: %d\n",
|
||||
evt->param.publish.message.payload.len);
|
||||
goto error;
|
||||
}
|
||||
|
||||
while (payload_left > 0) {
|
||||
wait(APP_SLEEP_MSECS);
|
||||
rc = mqtt_read_publish_payload(client, buf, sizeof(buf));
|
||||
if (rc <= 0 && rc != -EAGAIN) {
|
||||
TC_PRINT("Failed to receive payload, err: %d\n", -rc);
|
||||
goto error;
|
||||
}
|
||||
|
||||
if (memcmp(payload + offset, buf, rc) != 0) {
|
||||
TC_PRINT("Invalid payload content\n");
|
||||
goto error;
|
||||
}
|
||||
|
||||
payload_left -= rc;
|
||||
offset += rc;
|
||||
}
|
||||
|
||||
return;
|
||||
|
||||
error:
|
||||
payload_left = -1;
|
||||
}
|
||||
|
||||
void mqtt_evt_handler(struct mqtt_client *const client,
|
||||
const struct mqtt_evt *evt)
|
||||
{
|
||||
int err;
|
||||
|
||||
switch (evt->type) {
|
||||
case MQTT_EVT_CONNACK:
|
||||
if (evt->result != 0) {
|
||||
TC_PRINT("MQTT connect failed %d\n", evt->result);
|
||||
break;
|
||||
}
|
||||
|
||||
connected = true;
|
||||
TC_PRINT("[%s:%d] MQTT_EVT_CONNACK: Connected!\n",
|
||||
__func__, __LINE__);
|
||||
|
||||
break;
|
||||
|
||||
case MQTT_EVT_DISCONNECT:
|
||||
TC_PRINT("[%s:%d] MQTT_EVT_DISCONNECT: disconnected %d\n",
|
||||
__func__, __LINE__, evt->result);
|
||||
|
||||
connected = false;
|
||||
payload_left = -1;
|
||||
clear_fds();
|
||||
|
||||
break;
|
||||
|
||||
case MQTT_EVT_PUBLISH:
|
||||
publish_handler(client, evt);
|
||||
|
||||
break;
|
||||
|
||||
case MQTT_EVT_PUBACK:
|
||||
if (evt->result != 0) {
|
||||
TC_PRINT("MQTT PUBACK error %d\n", evt->result);
|
||||
payload_left = -1;
|
||||
break;
|
||||
}
|
||||
|
||||
TC_PRINT("[%s:%d] MQTT_EVT_PUBACK packet id: %u\n",
|
||||
__func__, __LINE__, evt->param.puback.message_id);
|
||||
|
||||
break;
|
||||
|
||||
case MQTT_EVT_PUBREC:
|
||||
if (evt->result != 0) {
|
||||
TC_PRINT("MQTT PUBREC error %d\n", evt->result);
|
||||
break;
|
||||
}
|
||||
|
||||
TC_PRINT("[%s:%d] MQTT_EVT_PUBREC packet id: %u\n",
|
||||
__func__, __LINE__, evt->param.pubrec.message_id);
|
||||
|
||||
const struct mqtt_pubrel_param rel_param = {
|
||||
.message_id = evt->param.pubrec.message_id
|
||||
};
|
||||
|
||||
err = mqtt_publish_qos2_release(client, &rel_param);
|
||||
if (err != 0) {
|
||||
TC_PRINT("Failed to send MQTT PUBREL: %d\n",
|
||||
err);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case MQTT_EVT_PUBCOMP:
|
||||
if (evt->result != 0) {
|
||||
TC_PRINT("MQTT PUBCOMP error %d\n", evt->result);
|
||||
break;
|
||||
}
|
||||
|
||||
TC_PRINT("[%s:%d] MQTT_EVT_PUBCOMP packet id: %u\n",
|
||||
__func__, __LINE__, evt->param.pubcomp.message_id);
|
||||
|
||||
break;
|
||||
|
||||
case MQTT_EVT_SUBACK:
|
||||
if (evt->result != 0) {
|
||||
TC_PRINT("MQTT SUBACK error %d\n", evt->result);
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
TC_PRINT("[%s:%d] items: %d packet id: %u\n", __func__,
|
||||
__LINE__, evt->param.suback.return_codes.len,
|
||||
evt->param.suback.message_id);
|
||||
|
||||
|
||||
break;
|
||||
|
||||
case MQTT_EVT_UNSUBACK:
|
||||
if (evt->result != 0) {
|
||||
TC_PRINT("MQTT UNSUBACK error %d\n", evt->result);
|
||||
break;
|
||||
}
|
||||
|
||||
TC_PRINT("[%s:%d] packet id: %u\n", __func__, __LINE__,
|
||||
evt->param.unsuback.message_id);
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
TC_PRINT("[%s:%d] Invalid MQTT packet\n", __func__, __LINE__);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static char *get_mqtt_topic(void)
|
||||
{
|
||||
return "sensors";
|
||||
}
|
||||
|
||||
static void client_init(struct mqtt_client *client)
|
||||
{
|
||||
mqtt_client_init(client);
|
||||
|
||||
broker_init();
|
||||
|
||||
/* MQTT client configuration */
|
||||
client->broker = &broker;
|
||||
client->evt_cb = mqtt_evt_handler;
|
||||
client->client_id.utf8 = (u8_t *)MQTT_CLIENTID;
|
||||
client->client_id.size = strlen(MQTT_CLIENTID);
|
||||
client->password = NULL;
|
||||
client->user_name = NULL;
|
||||
client->protocol_version = MQTT_VERSION_3_1_1;
|
||||
client->transport.type = MQTT_TRANSPORT_NON_SECURE;
|
||||
|
||||
client->rx_buf = rx_buffer;
|
||||
client->rx_buf_size = sizeof(rx_buffer);
|
||||
client->tx_buf = tx_buffer;
|
||||
client->tx_buf_size = sizeof(tx_buffer);
|
||||
}
|
||||
|
||||
/* In this routine we block until the connected variable is 1 */
|
||||
static int try_to_connect(struct mqtt_client *client)
|
||||
{
|
||||
int rc, i = 0;
|
||||
|
||||
while (i++ < APP_CONNECT_TRIES && !connected) {
|
||||
|
||||
client_init(&client_ctx);
|
||||
|
||||
rc = mqtt_connect(client);
|
||||
if (rc != 0) {
|
||||
k_sleep(APP_SLEEP_MSECS);
|
||||
continue;
|
||||
}
|
||||
|
||||
prepare_fds(client);
|
||||
|
||||
wait(APP_SLEEP_MSECS);
|
||||
mqtt_input(client);
|
||||
|
||||
if (!connected) {
|
||||
mqtt_abort(client);
|
||||
}
|
||||
}
|
||||
|
||||
if (connected) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return -EINVAL;
|
||||
}
|
||||
|
||||
static int test_connect(void)
|
||||
{
|
||||
int rc;
|
||||
|
||||
rc = try_to_connect(&client_ctx);
|
||||
if (rc != 0) {
|
||||
return TC_FAIL;
|
||||
}
|
||||
|
||||
return TC_PASS;
|
||||
}
|
||||
|
||||
static int test_subscribe(void)
|
||||
{
|
||||
int rc;
|
||||
struct mqtt_topic topic;
|
||||
struct mqtt_subscription_list sub;
|
||||
|
||||
topic.topic.utf8 = get_mqtt_topic();
|
||||
topic.topic.size = strlen(topic.topic.utf8);
|
||||
topic.qos = MQTT_QOS_1_AT_LEAST_ONCE;
|
||||
sub.list = &topic;
|
||||
sub.list_count = 1;
|
||||
sub.message_id = sys_rand32_get();
|
||||
|
||||
rc = mqtt_subscribe(&client_ctx, &sub);
|
||||
if (rc != 0) {
|
||||
return TC_FAIL;
|
||||
}
|
||||
|
||||
wait(APP_SLEEP_MSECS);
|
||||
rc = mqtt_input(&client_ctx);
|
||||
if (rc != 0) {
|
||||
return TC_FAIL;
|
||||
}
|
||||
|
||||
return TC_PASS;
|
||||
}
|
||||
|
||||
static int test_publish(enum mqtt_qos qos)
|
||||
{
|
||||
int rc;
|
||||
struct mqtt_publish_param param;
|
||||
|
||||
payload_left = strlen(payload);
|
||||
|
||||
param.message.topic.qos = qos;
|
||||
param.message.topic.topic.utf8 = (u8_t *)get_mqtt_topic();
|
||||
param.message.topic.topic.size =
|
||||
strlen(param.message.topic.topic.utf8);
|
||||
param.message.payload.data = (u8_t *)payload;
|
||||
param.message.payload.len = payload_left;
|
||||
param.message_id = sys_rand32_get();
|
||||
param.dup_flag = 0;
|
||||
param.retain_flag = 0;
|
||||
|
||||
rc = mqtt_publish(&client_ctx, ¶m);
|
||||
if (rc != 0) {
|
||||
return TC_FAIL;
|
||||
}
|
||||
|
||||
while (payload_left > 0) {
|
||||
wait(APP_SLEEP_MSECS);
|
||||
rc = mqtt_input(&client_ctx);
|
||||
if (rc != 0) {
|
||||
return TC_FAIL;
|
||||
}
|
||||
}
|
||||
|
||||
if (payload_left != 0) {
|
||||
return TC_FAIL;
|
||||
}
|
||||
|
||||
return TC_PASS;
|
||||
}
|
||||
|
||||
static int test_unsubscribe(void)
|
||||
{
|
||||
int rc;
|
||||
struct mqtt_topic topic;
|
||||
struct mqtt_subscription_list unsub;
|
||||
|
||||
topic.topic.utf8 = get_mqtt_topic();
|
||||
topic.topic.size = strlen(topic.topic.utf8);
|
||||
unsub.list = &topic;
|
||||
unsub.list_count = 1;
|
||||
unsub.message_id = sys_rand32_get();
|
||||
|
||||
rc = mqtt_unsubscribe(&client_ctx, &unsub);
|
||||
if (rc != 0) {
|
||||
return TC_FAIL;
|
||||
}
|
||||
|
||||
wait(APP_SLEEP_MSECS);
|
||||
rc = mqtt_input(&client_ctx);
|
||||
if (rc != 0) {
|
||||
return TC_FAIL;
|
||||
}
|
||||
|
||||
return TC_PASS;
|
||||
}
|
||||
|
||||
static int test_disconnect(void)
|
||||
{
|
||||
int rc;
|
||||
|
||||
rc = mqtt_disconnect(&client_ctx);
|
||||
if (rc != 0) {
|
||||
return TC_FAIL;
|
||||
}
|
||||
|
||||
wait(APP_SLEEP_MSECS);
|
||||
rc = mqtt_input(&client_ctx);
|
||||
if (rc != 0) {
|
||||
return TC_FAIL;
|
||||
}
|
||||
|
||||
if (connected) {
|
||||
return TC_FAIL;
|
||||
}
|
||||
|
||||
return TC_PASS;
|
||||
}
|
||||
|
||||
void test_mqtt_connect(void)
|
||||
{
|
||||
zassert_true(test_connect() == TC_PASS, NULL);
|
||||
}
|
||||
|
||||
void test_mqtt_subscribe(void)
|
||||
{
|
||||
zassert_true(test_subscribe() == TC_PASS, NULL);
|
||||
}
|
||||
|
||||
void test_mqtt_publish_short(void)
|
||||
{
|
||||
payload = payload_short;
|
||||
zassert_true(test_publish(MQTT_QOS_0_AT_MOST_ONCE) == TC_PASS, NULL);
|
||||
}
|
||||
|
||||
void test_mqtt_publish_long(void)
|
||||
{
|
||||
payload = payload_long;
|
||||
zassert_true(test_publish(MQTT_QOS_1_AT_LEAST_ONCE) == TC_PASS, NULL);
|
||||
}
|
||||
|
||||
void test_mqtt_unsubscribe(void)
|
||||
{
|
||||
zassert_true(test_unsubscribe() == TC_PASS, NULL);
|
||||
}
|
||||
|
||||
void test_mqtt_disconnect(void)
|
||||
{
|
||||
zassert_true(test_disconnect() == TC_PASS, NULL);
|
||||
}
|
|
@ -0,0 +1,8 @@
|
|||
common:
|
||||
depends_on: netif
|
||||
platform_whitelist: native_posix qemu_x86 qemu_cortex_m3
|
||||
tests:
|
||||
net.mqtt.pubsub:
|
||||
min_ram: 16
|
||||
tags: net mqtt
|
||||
harness: net
|
Loading…
Reference in New Issue