/*
 * Copyright (c) 2017 Intel Corporation
 *
 * SPDX-License-Identifier: Apache-2.0
 */
|
|
|
|
#include <logging/log.h>
|
|
LOG_MODULE_REGISTER(net_mqtt_publisher_sample, LOG_LEVEL_DBG);
|
|
|
|
#include <zephyr.h>
|
|
#include <net/socket.h>
|
|
#include <net/mqtt.h>
|
|
|
|
#include <random/rand32.h>
|
|
#include <misc/printk.h>
|
|
#include <string.h>
|
|
#include <errno.h>
|
|
|
|
#include "config.h"
|
|
|
|
/* Buffers for MQTT client. */
|
|
static u8_t rx_buffer[APP_MQTT_BUFFER_SIZE];
|
|
static u8_t tx_buffer[APP_MQTT_BUFFER_SIZE];
|
|
|
|
/* The mqtt client struct */
|
|
static struct mqtt_client client_ctx;
|
|
|
|
/* MQTT Broker details. */
|
|
static struct sockaddr_storage broker;
|
|
#if defined(CONFIG_MQTT_LIB_SOCKS)
|
|
static struct sockaddr_storage socks5_proxy;
|
|
#endif
|
|
|
|
static struct pollfd fds[1];
|
|
static int nfds;
|
|
|
|
static bool connected;
|
|
|
|
#if defined(CONFIG_MQTT_LIB_TLS)
|
|
|
|
#include "test_certs.h"
|
|
|
|
#define TLS_SNI_HOSTNAME "localhost"
|
|
#define APP_CA_CERT_TAG 1
|
|
#define APP_PSK_TAG 2
|
|
|
|
static sec_tag_t m_sec_tags[] = {
|
|
#if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
|
|
APP_CA_CERT_TAG,
|
|
#endif
|
|
#if defined(MBEDTLS_KEY_EXCHANGE__SOME__PSK_ENABLED)
|
|
APP_PSK_TAG,
|
|
#endif
|
|
};
|
|
|
|
/*
 * Register the TLS credentials used by the sample.
 *
 * Depending on the build, this adds the CA certificate under
 * APP_CA_CERT_TAG and/or the PSK and PSK identity under APP_PSK_TAG.
 *
 * Returns the result of the last tls_credential_add() call, the first
 * negative result if one of the earlier calls fails, or -EINVAL when no
 * credential type is compiled in.
 */
static int tls_init(void)
{
	int ret = -EINVAL;

#if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
	ret = tls_credential_add(APP_CA_CERT_TAG,
				 TLS_CREDENTIAL_CA_CERTIFICATE,
				 ca_certificate, sizeof(ca_certificate));
	if (ret < 0) {
		LOG_ERR("Failed to register public certificate: %d", ret);
		return ret;
	}
#endif

#if defined(MBEDTLS_KEY_EXCHANGE__SOME__PSK_ENABLED)
	ret = tls_credential_add(APP_PSK_TAG, TLS_CREDENTIAL_PSK,
				 client_psk, sizeof(client_psk));
	if (ret < 0) {
		LOG_ERR("Failed to register PSK: %d", ret);
		return ret;
	}

	/* The identity is registered with one byte fewer than its sizeof. */
	ret = tls_credential_add(APP_PSK_TAG, TLS_CREDENTIAL_PSK_ID,
				 client_psk_id, sizeof(client_psk_id) - 1);
	if (ret < 0) {
		LOG_ERR("Failed to register PSK ID: %d", ret);
	}
#endif

	return ret;
}
|
|
|
|
#endif /* CONFIG_MQTT_LIB_TLS */
|
|
|
|
static void prepare_fds(struct mqtt_client *client)
|
|
{
|
|
if (client->transport.type == MQTT_TRANSPORT_NON_SECURE) {
|
|
fds[0].fd = client->transport.tcp.sock;
|
|
}
|
|
#if defined(CONFIG_MQTT_LIB_TLS)
|
|
else if (client->transport.type == MQTT_TRANSPORT_SECURE) {
|
|
fds[0].fd = client->transport.tls.sock;
|
|
}
|
|
#endif
|
|
|
|
fds[0].events = ZSOCK_POLLIN;
|
|
nfds = 1;
|
|
}
|
|
|
|
static void clear_fds(void)
|
|
{
|
|
nfds = 0;
|
|
}
|
|
|
|
static void wait(int timeout)
|
|
{
|
|
if (nfds > 0) {
|
|
if (poll(fds, nfds, timeout) < 0) {
|
|
printk("poll error: %d\n", errno);
|
|
}
|
|
}
|
|
}
|
|
|
|
void mqtt_evt_handler(struct mqtt_client *const client,
|
|
const struct mqtt_evt *evt)
|
|
{
|
|
int err;
|
|
|
|
switch (evt->type) {
|
|
case MQTT_EVT_CONNACK:
|
|
if (evt->result != 0) {
|
|
printk("MQTT connect failed %d\n", evt->result);
|
|
break;
|
|
}
|
|
|
|
connected = true;
|
|
printk("[%s:%d] MQTT client connected!\n", __func__, __LINE__);
|
|
|
|
break;
|
|
|
|
case MQTT_EVT_DISCONNECT:
|
|
printk("[%s:%d] MQTT client disconnected %d\n", __func__,
|
|
__LINE__, evt->result);
|
|
|
|
connected = false;
|
|
clear_fds();
|
|
|
|
break;
|
|
|
|
case MQTT_EVT_PUBACK:
|
|
if (evt->result != 0) {
|
|
printk("MQTT PUBACK error %d\n", evt->result);
|
|
break;
|
|
}
|
|
|
|
printk("[%s:%d] PUBACK packet id: %u\n", __func__, __LINE__,
|
|
evt->param.puback.message_id);
|
|
|
|
break;
|
|
|
|
case MQTT_EVT_PUBREC:
|
|
if (evt->result != 0) {
|
|
printk("MQTT PUBREC error %d\n", evt->result);
|
|
break;
|
|
}
|
|
|
|
printk("[%s:%d] PUBREC packet id: %u\n", __func__, __LINE__,
|
|
evt->param.pubrec.message_id);
|
|
|
|
const struct mqtt_pubrel_param rel_param = {
|
|
.message_id = evt->param.pubrec.message_id
|
|
};
|
|
|
|
err = mqtt_publish_qos2_release(client, &rel_param);
|
|
if (err != 0) {
|
|
printk("Failed to send MQTT PUBREL: %d\n", err);
|
|
}
|
|
|
|
break;
|
|
|
|
case MQTT_EVT_PUBCOMP:
|
|
if (evt->result != 0) {
|
|
printk("MQTT PUBCOMP error %d\n", evt->result);
|
|
break;
|
|
}
|
|
|
|
printk("[%s:%d] PUBCOMP packet id: %u\n", __func__, __LINE__,
|
|
evt->param.pubcomp.message_id);
|
|
|
|
break;
|
|
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
|
|
/*
 * Build the payload for the next publish.
 *
 * With APP_BLUEMIX_TOPIC the payload is a small text record carrying a
 * random 8-bit "temperature"; otherwise it is "DOORS:OPEN_QoS<n>", where
 * <n> is the digit for `qos`.
 *
 * The returned pointer refers to a static buffer that is overwritten by
 * the next call.
 */
static char *get_mqtt_payload(enum mqtt_qos qos)
{
#if APP_BLUEMIX_TOPIC
	static char payload[30];
	u8_t temperature = (u8_t)sys_rand32_get();

	snprintk(payload, sizeof(payload), "{d:{temperature:%d}}",
		 temperature);
#else
	static char payload[] = "DOORS:OPEN_QoSx";
	size_t last = strlen(payload) - 1;

	/* Replace the trailing placeholder character with the QoS digit. */
	payload[last] = '0' + qos;
#endif

	return payload;
}
|
|
|
|
/*
 * Return the topic to publish on: the Bluemix event topic assembled from
 * the BLUEMIX_* configuration strings when APP_BLUEMIX_TOPIC is set,
 * "sensors" otherwise. The result is a string literal and must not be
 * modified.
 */
static char *get_mqtt_topic(void)
{
	char *topic;

#if APP_BLUEMIX_TOPIC
	topic = "iot-2/type/" BLUEMIX_DEVTYPE "/id/" BLUEMIX_DEVID
		"/evt/" BLUEMIX_EVENT "/fmt/" BLUEMIX_FORMAT;
#else
	topic = "sensors";
#endif

	return topic;
}
|
|
|
|
static int publish(struct mqtt_client *client, enum mqtt_qos qos)
|
|
{
|
|
struct mqtt_publish_param param;
|
|
|
|
param.message.topic.qos = qos;
|
|
param.message.topic.topic.utf8 = (u8_t *)get_mqtt_topic();
|
|
param.message.topic.topic.size =
|
|
strlen(param.message.topic.topic.utf8);
|
|
param.message.payload.data = get_mqtt_payload(qos);
|
|
param.message.payload.len =
|
|
strlen(param.message.payload.data);
|
|
param.message_id = sys_rand32_get();
|
|
param.dup_flag = 0U;
|
|
param.retain_flag = 0U;
|
|
|
|
return mqtt_publish(client, ¶m);
|
|
}
|
|
|
|
/* Map a return code to a status word: 0 is "OK", anything else "ERROR". */
#define RC_STR(rc) ((rc) == 0 ? "OK" : "ERROR")

/*
 * Print "[<function>:<line>] <func>: <rc> <status>" for the call named by
 * the string `func`.
 *
 * `rc` is expanded twice, so pass a plain variable rather than an
 * expression with side effects.
 */
#define PRINT_RESULT(func, rc) \
	printk("[%s:%d] %s: %d <%s>\n", __func__, __LINE__, \
	       (func), (rc), RC_STR(rc))
|
|
|
|
static void broker_init(void)
|
|
{
|
|
#if defined(CONFIG_NET_IPV6)
|
|
struct sockaddr_in6 *broker6 = (struct sockaddr_in6 *)&broker;
|
|
|
|
broker6->sin6_family = AF_INET6;
|
|
broker6->sin6_port = htons(SERVER_PORT);
|
|
inet_pton(AF_INET6, SERVER_ADDR, &broker6->sin6_addr);
|
|
|
|
#if defined(CONFIG_MQTT_LIB_SOCKS)
|
|
struct sockaddr_in6 *proxy6 = (struct sockaddr_in6 *)&socks5_proxy;
|
|
|
|
proxy6->sin6_family = AF_INET6;
|
|
proxy6->sin6_port = htons(SOCKS5_PROXY_PORT);
|
|
inet_pton(AF_INET6, SOCKS5_PROXY_ADDR, &proxy6->sin6_addr);
|
|
#endif
|
|
#else
|
|
struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker;
|
|
|
|
broker4->sin_family = AF_INET;
|
|
broker4->sin_port = htons(SERVER_PORT);
|
|
inet_pton(AF_INET, SERVER_ADDR, &broker4->sin_addr);
|
|
|
|
#if defined(CONFIG_MQTT_LIB_SOCKS)
|
|
struct sockaddr_in *proxy4 = (struct sockaddr_in *)&socks5_proxy;
|
|
|
|
proxy4->sin_family = AF_INET;
|
|
proxy4->sin_port = htons(SOCKS5_PROXY_PORT);
|
|
inet_pton(AF_INET, SOCKS5_PROXY_ADDR, &proxy4->sin_addr);
|
|
#endif
|
|
#endif
|
|
}
|
|
|
|
static void client_init(struct mqtt_client *client)
|
|
{
|
|
mqtt_client_init(client);
|
|
|
|
broker_init();
|
|
|
|
/* MQTT client configuration */
|
|
client->broker = &broker;
|
|
client->evt_cb = mqtt_evt_handler;
|
|
client->client_id.utf8 = (u8_t *)MQTT_CLIENTID;
|
|
client->client_id.size = strlen(MQTT_CLIENTID);
|
|
client->password = NULL;
|
|
client->user_name = NULL;
|
|
client->protocol_version = MQTT_VERSION_3_1_1;
|
|
|
|
/* MQTT buffers configuration */
|
|
client->rx_buf = rx_buffer;
|
|
client->rx_buf_size = sizeof(rx_buffer);
|
|
client->tx_buf = tx_buffer;
|
|
client->tx_buf_size = sizeof(tx_buffer);
|
|
|
|
/* MQTT transport configuration */
|
|
#if defined(CONFIG_MQTT_LIB_TLS)
|
|
client->transport.type = MQTT_TRANSPORT_SECURE;
|
|
|
|
struct mqtt_sec_config *tls_config = &client->transport.tls.config;
|
|
|
|
tls_config->peer_verify = 2;
|
|
tls_config->cipher_list = NULL;
|
|
tls_config->sec_tag_list = m_sec_tags;
|
|
tls_config->sec_tag_count = ARRAY_SIZE(m_sec_tags);
|
|
#if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
|
|
tls_config->hostname = TLS_SNI_HOSTNAME;
|
|
#else
|
|
tls_config->hostname = NULL;
|
|
#endif
|
|
|
|
#else
|
|
#if defined(CONFIG_MQTT_LIB_SOCKS)
|
|
client->transport.type = MQTT_TRANSPORT_SOCKS;
|
|
client->transport.socks5.proxy = &socks5_proxy;
|
|
#else
|
|
client->transport.type = MQTT_TRANSPORT_NON_SECURE;
|
|
#endif
|
|
#endif
|
|
}
|
|
|
|
/* In this routine we block until the connected variable is 1 */
|
|
static int try_to_connect(struct mqtt_client *client)
|
|
{
|
|
int rc, i = 0;
|
|
|
|
while (i++ < APP_CONNECT_TRIES && !connected) {
|
|
|
|
client_init(client);
|
|
|
|
rc = mqtt_connect(client);
|
|
if (rc != 0) {
|
|
PRINT_RESULT("mqtt_connect", rc);
|
|
k_sleep(APP_SLEEP_MSECS);
|
|
continue;
|
|
}
|
|
|
|
prepare_fds(client);
|
|
|
|
wait(APP_SLEEP_MSECS);
|
|
mqtt_input(client);
|
|
|
|
if (!connected) {
|
|
mqtt_abort(client);
|
|
}
|
|
}
|
|
|
|
if (connected) {
|
|
return 0;
|
|
}
|
|
|
|
return -EINVAL;
|
|
}
|
|
|
|
static int process_mqtt_and_sleep(struct mqtt_client *client, int timeout)
|
|
{
|
|
s64_t remaining = timeout;
|
|
s64_t start_time = k_uptime_get();
|
|
int rc;
|
|
|
|
while (remaining > 0 && connected) {
|
|
wait(remaining);
|
|
|
|
rc = mqtt_live(client);
|
|
if (rc != 0) {
|
|
PRINT_RESULT("mqtt_live", rc);
|
|
return rc;
|
|
}
|
|
|
|
rc = mqtt_input(client);
|
|
if (rc != 0) {
|
|
PRINT_RESULT("mqtt_input", rc);
|
|
return rc;
|
|
}
|
|
|
|
remaining = timeout + start_time - k_uptime_get();
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
/* Return from the enclosing void function when `rc` is non-zero. */
#define SUCCESS_OR_EXIT(rc) do { if ((rc) != 0) { return; } } while (0)

/*
 * Break out of the enclosing loop when `rc` is non-zero.
 *
 * This deliberately stays a plain brace block: wrapping it in
 * do { } while (0) would make the `break` leave the wrapper instead of
 * the caller's loop.
 */
#define SUCCESS_OR_BREAK(rc) { if ((rc) != 0) { break; } }
|
|
|
|
static void publisher(void)
|
|
{
|
|
int i, rc;
|
|
|
|
printk("attempting to connect: ");
|
|
rc = try_to_connect(&client_ctx);
|
|
PRINT_RESULT("try_to_connect", rc);
|
|
SUCCESS_OR_EXIT(rc);
|
|
|
|
i = 0;
|
|
while (i++ < APP_MAX_ITERATIONS && connected) {
|
|
rc = mqtt_ping(&client_ctx);
|
|
PRINT_RESULT("mqtt_ping", rc);
|
|
SUCCESS_OR_BREAK(rc);
|
|
|
|
rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
|
|
SUCCESS_OR_BREAK(rc);
|
|
|
|
rc = publish(&client_ctx, MQTT_QOS_0_AT_MOST_ONCE);
|
|
PRINT_RESULT("mqtt_publish", rc);
|
|
SUCCESS_OR_BREAK(rc);
|
|
|
|
rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
|
|
SUCCESS_OR_BREAK(rc);
|
|
|
|
rc = publish(&client_ctx, MQTT_QOS_1_AT_LEAST_ONCE);
|
|
PRINT_RESULT("mqtt_publish", rc);
|
|
SUCCESS_OR_BREAK(rc);
|
|
|
|
rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
|
|
SUCCESS_OR_BREAK(rc);
|
|
|
|
rc = publish(&client_ctx, MQTT_QOS_2_EXACTLY_ONCE);
|
|
PRINT_RESULT("mqtt_publish", rc);
|
|
SUCCESS_OR_BREAK(rc);
|
|
|
|
rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
|
|
SUCCESS_OR_BREAK(rc);
|
|
}
|
|
|
|
rc = mqtt_disconnect(&client_ctx);
|
|
PRINT_RESULT("mqtt_disconnect", rc);
|
|
|
|
wait(APP_SLEEP_MSECS);
|
|
rc = mqtt_input(&client_ctx);
|
|
PRINT_RESULT("mqtt_input", rc);
|
|
|
|
printk("\nBye!\n");
|
|
}
|
|
|
|
/*
 * Application entry point: registers the TLS credentials when TLS support
 * is built in (the result is only printed), then runs publisher sessions
 * forever with a k_sleep(5000) pause between them.
 */
void main(void)
{
#if defined(CONFIG_MQTT_LIB_TLS)
	int rc = tls_init();

	PRINT_RESULT("tls_init", rc);
#endif

	for (;;) {
		publisher();
		k_sleep(5000);
	}
}
|