5.6. mqtt_client — MQTT client

MQTT is a publish-subscribe-based lightweight messaging protocol.

The driver runs its processing code in a thread that communicates with the MQTT broker on one side using channels, and with the application on the other side using queues.

This means the application has to set up a channel that is already ready to communicate with the MQTT server (for example a connected TCP socket), and spawn the thread running the MQTT client.

5.6.1. MQTT Notes

MQTT requires the client to send packets regularly, otherwise the broker will disconnect it. Any packet from the client to the broker fulfills the requirement, so either a regular stream of publish messages or MQTT ping packets sent as needed will work. The keep alive interval is set by the client on connect, and can be changed from the default in mqtt_conn_options_t.
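
As a minimal sketch of the "ping as needed" approach, the hypothetical helper below decides when the application should send a ping. It is not part of the mqtt_client API. It assumes the application records the time, in seconds, at which it last sent a packet, and it pings once half of the keep alive interval has elapsed. The half-interval margin is an illustrative choice, not something the driver requires.

```c
#include <stdint.h>

/* Hypothetical helper, not part of the mqtt_client API.
   Returns 1 if at least half of the keep alive interval has passed
   since the last packet was sent to the broker, otherwise 0. */
static int ping_is_due(uint32_t now_s,
                       uint32_t last_sent_s,
                       uint32_t keep_alive_s)
{
    uint32_t idle_s;

    /* In MQTT a keep alive of zero turns the mechanism off. */
    if (keep_alive_s == 0) {
        return (0);
    }

    /* Unsigned subtraction stays correct if the clock wraps around. */
    idle_s = now_s - last_sent_s;

    return (idle_s >= keep_alive_s / 2);
}
```

In an application loop, mqtt_client_ping() would be called whenever the helper returns 1, and the timestamp would be updated after every publish or ping.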

Note

The current client does not gracefully handle a disconnect of the underlying channel (e.g. TCP connection) to the broker, and requires a complete restart of the MQTT client to recover.

5.6.2. Basic MQTT client usage

Basic example of initializing MQTT over TCP (error checking left out for brevity).

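#include "simba.h"

static struct socket_t server_sock;
static struct mqtt_client_t client;

/* Stack of the MQTT client thread. The size is an example value. */
static THRD_STACK(stack, 1024);

/* Called by the client thread when the broker publishes a message. */
static size_t on_publish(struct mqtt_client_t *client_p,
                         const char *topic_p,
                         void *chin_p,
                         size_t size)
{
    uint8_t buf[32];

    /* Do not read values that would overflow the buffer, including
       its null termination. */
    if (size > sizeof(buf) - 1) {
        return (0);
    }

    chan_read(chin_p, buf, size);
    buf[size] = '\0';
    std_printf(OSTR("on_publish: %s\r\n"), &buf[0]);

    /* Number of bytes read from the input channel. */
    return (size);
}

struct inet_addr_t remote_host_address;

/* Connect a TCP socket to the broker. */
inet_aton("127.0.0.1", &remote_host_address.ip);
remote_host_address.port = 1883;
socket_open_tcp(&server_sock);
socket_connect(&server_sock, &remote_host_address);

/* The connected socket is both the output and the input channel. */
mqtt_client_init(&client,
                 "mqtt_client",
                 NULL,
                 &server_sock,
                 &server_sock,
                 on_publish,
                 NULL);

/* Start the thread running the MQTT client. */
thrd_spawn(mqtt_client_main,
           &client,
           0,
           stack,
           sizeof(stack));

/* Connect to the broker with default connection options. */
mqtt_client_connect(&client, NULL);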

Source code: src/inet/mqtt_client.h, src/inet/mqtt_client.c

Test code: tst/inet/mqtt_client/main.c

Test coverage: src/inet/mqtt_client.c

Example code: examples/mqtt_client/main.c


Defines

DEFAULT_KEEP_ALIVE_S

Default MQTT keep alive interval in seconds.

Typedefs

typedef size_t (*mqtt_on_publish_t)(struct mqtt_client_t *client_p, const char *topic_p, void *chin_p, size_t size)

Prototype of the on-publish callback function.

Return
Number of bytes read from the input channel.
Parameters
  • client_p: The client.
  • topic_p: The received topic.
  • chin_p: The channel to read the value from.
  • size: Number of bytes of the value to read from chin_p.

typedef int (*mqtt_on_error_t)(struct mqtt_client_t *client_p, int error)

Prototype of the on-error callback function.

Return
zero(0) or negative error code.
Parameters
  • client_p: The client.
  • error: The number of the error that occurred.
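
For illustration, a minimal on-error callback matching the prototype above could simply count and report errors. The counter and the use of printf() are assumptions made to keep the sketch standalone; a Simba application would more likely use std_printf() or a log object. How the client treats the return value is not specified here, so the sketch just returns zero.

```c
#include <stdio.h>

/* Forward declaration so the sketch compiles on its own; the real
   definition comes from mqtt_client.h. */
struct mqtt_client_t;

/* Hypothetical application state: number of errors reported so far. */
static int error_count = 0;

/* On-error callback with the mqtt_on_error_t signature. */
static int on_error(struct mqtt_client_t *client_p, int error)
{
    (void)client_p; /* Unused in this sketch. */

    error_count++;
    printf("mqtt error %d (total %d)\n", error, error_count);

    return (0);
}
```

Such a function would be passed as the last argument to mqtt_client_init().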

Enums

enum mqtt_client_state_t

Client states.

Values:

mqtt_client_state_disconnected_t
mqtt_client_state_connected_t
mqtt_client_state_connecting_t

enum mqtt_qos_t

Quality of Service.

Values:

mqtt_qos_0_t = 0
mqtt_qos_1_t = 1
mqtt_qos_2_t = 2

Functions

int mqtt_client_init(struct mqtt_client_t *self_p, const char *name_p, struct log_object_t *log_object_p, void *chout_p, void *chin_p, mqtt_on_publish_t on_publish, mqtt_on_error_t on_error)

Initialize given MQTT client.

Return
zero(0) or negative error code.
Parameters
  • self_p: MQTT client.
  • name_p: Name of the thread.
  • log_object_p: Log object.
  • chout_p: Output channel for client to server packets.
  • chin_p: Input channel for server to client packets.
  • on_publish: On-publish callback function. Called when the server publishes a message.
  • on_error: On-error callback function. Called when an error occurs. If NULL, a default handler is used.

void *mqtt_client_main(void *arg_p)

MQTT client thread.

Return
Never returns.
Parameters
  • arg_p: MQTT client.

int mqtt_client_connect(struct mqtt_client_t *self_p, struct mqtt_conn_options_t *options_p)

Establish a connection to the server.

Warning
If options_p is set, all members of the struct that are not explicitly used must be set to zero. It is suggested to do this by calling memset(options_p, 0, sizeof(*options_p)); before setting the needed members.
Return
zero(0) or negative error code.
Parameters
  • self_p: MQTT client.
  • options_p: MQTT connection options. May be NULL. The pointer only needs to be valid for the duration of the function call.
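
The zero-then-set pattern from the warning above can be sketched as follows. The struct definitions are local stand-ins that mirror the members documented in this section, included only so the sketch compiles without Simba; in an application they come from mqtt_client.h. The helper conn_options_init() is hypothetical, and the sketch assumes the string size excludes any null termination.

```c
#include <stddef.h>
#include <string.h>

/* Local stand-ins mirroring the documented members, so the sketch
   compiles on its own. Remove them when building against Simba. */
typedef enum {
    mqtt_qos_0_t = 0,
    mqtt_qos_1_t = 1,
    mqtt_qos_2_t = 2
} mqtt_qos_t;

struct mqtt_string_t {
    const void *buf_p;
    size_t size;
};

struct mqtt_application_message_t {
    struct mqtt_string_t topic;
    struct mqtt_string_t payload;
    mqtt_qos_t qos;
};

struct mqtt_conn_options_t {
    struct mqtt_string_t client_id;
    struct mqtt_application_message_t will;
    struct mqtt_string_t user_name;
    struct mqtt_string_t password;
    int keep_alive_s;
};

/* Hypothetical helper: zero all members, then set only the client
   identifier and the keep alive interval. */
static void conn_options_init(struct mqtt_conn_options_t *options_p,
                              const char *client_id_p,
                              int keep_alive_s)
{
    memset(options_p, 0, sizeof(*options_p));

    options_p->client_id.buf_p = client_id_p;
    options_p->client_id.size = strlen(client_id_p);
    options_p->keep_alive_s = keep_alive_s;
}
```

The initialized struct would then be passed to mqtt_client_connect() as options_p. The will, user name and password stay zeroed and are therefore unused.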

int mqtt_client_disconnect(struct mqtt_client_t *self_p)

Disconnect from the server.

Return
zero(0) or negative error code.
Parameters
  • self_p: MQTT client.

int mqtt_client_ping(struct mqtt_client_t *self_p)

Send a ping request to the server (broker) and wait for the ping response.

Return
zero(0) or negative error code.
Parameters
  • self_p: MQTT client.

int mqtt_client_publish(struct mqtt_client_t *self_p, struct mqtt_application_message_t *message_p)

Publish given message.

Return
zero(0) or negative error code.
Parameters
  • self_p: MQTT client.
  • message_p: The message to publish, with its topic, payload and quality of service. The payload may be NULL.
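
A message to publish can be prepared along the following lines. The struct definitions are local stand-ins mirroring the members documented in this section, so the sketch compiles without Simba, and message_init() is a hypothetical helper rather than part of the API.

```c
#include <stddef.h>
#include <string.h>

/* Local stand-ins mirroring the documented members, so the sketch
   compiles on its own. Remove them when building against Simba. */
typedef enum {
    mqtt_qos_0_t = 0,
    mqtt_qos_1_t = 1,
    mqtt_qos_2_t = 2
} mqtt_qos_t;

struct mqtt_string_t {
    const void *buf_p;
    size_t size;
};

struct mqtt_application_message_t {
    struct mqtt_string_t topic;
    struct mqtt_string_t payload;
    mqtt_qos_t qos;
};

/* Hypothetical helper: fill in topic, payload and quality of
   service. The topic size excludes the null termination. */
static void message_init(struct mqtt_application_message_t *message_p,
                         const char *topic_p,
                         const void *payload_p,
                         size_t payload_size,
                         mqtt_qos_t qos)
{
    memset(message_p, 0, sizeof(*message_p));

    message_p->topic.buf_p = topic_p;
    message_p->topic.size = strlen(topic_p);
    message_p->payload.buf_p = payload_p;
    message_p->payload.size = payload_size;
    message_p->qos = qos;
}
```

The prepared message would then be passed to mqtt_client_publish() as message_p.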

int mqtt_client_subscribe(struct mqtt_client_t *self_p, struct mqtt_application_message_t *message_p)

Subscribe to given message.

Return
zero(0) or negative error code.
Parameters
  • self_p: MQTT client.
  • message_p: The message to subscribe to. The payload part of the message is not used. The topic may use wildcards, given that the server supports it.
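
To illustrate what wildcards in a subscription topic mean, the sketch below matches a topic name against a topic filter the way the MQTT specification describes it: '+' matches exactly one topic level and '#' matches all remaining levels. The matching itself is done by the broker, not by this client, and topic_matches() is a hypothetical function written only for illustration.

```c
#include <stddef.h>

/* Returns 1 if the topic name matches the topic filter, otherwise 0.
   Illustrative only: validation of the filter and the special
   handling of topics starting with '$' are left out. */
static int topic_matches(const char *filter_p, const char *topic_p)
{
    while (*filter_p != '\0') {
        if (*filter_p == '#') {
            /* Multi-level wildcard: matches the rest of the topic. */
            return (1);
        }

        if (*filter_p == '+') {
            /* Single-level wildcard: skip one level of the topic. */
            while ((*topic_p != '\0') && (*topic_p != '/')) {
                topic_p++;
            }

            filter_p++;
        } else if (*filter_p == *topic_p) {
            filter_p++;
            topic_p++;
        } else if ((*topic_p == '\0')
                   && (filter_p[0] == '/')
                   && (filter_p[1] == '#')
                   && (filter_p[2] == '\0')) {
            /* A filter ending in "/#" also matches the parent level. */
            return (1);
        } else {
            return (0);
        }
    }

    /* The filter is consumed; the topic must be consumed as well. */
    return (*topic_p == '\0');
}
```

For example, the filter "sensors/+/temp" matches "sensors/kitchen/temp" but not "sensors/kitchen/humidity".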

int mqtt_client_unsubscribe(struct mqtt_client_t *self_p, struct mqtt_application_message_t *message_p)

Unsubscribe from given message.

Return
zero(0) or negative error code.
Parameters
  • self_p: MQTT client.
  • message_p: The message to unsubscribe from. Only the topic in the message is used.

struct mqtt_string_t
#include <mqtt_client.h>

An MQTT style string, given as a buffer and its length in bytes.

Public Members

const void *buf_p
size_t size

struct mqtt_client_t
#include <mqtt_client.h>

MQTT client.

Public Members

const char *name_p
struct log_object_t *log_object_p
int state
struct (anonymous) message
  • int type
  • void *data_p
struct (anonymous) transport
  • void *out_p
  • void *in_p
struct (anonymous) control
  • struct queue_t out
  • struct queue_t in
mqtt_on_publish_t on_publish
mqtt_on_error_t on_error

struct mqtt_application_message_t
#include <mqtt_client.h>

MQTT application message.

Public Members

struct mqtt_string_t topic
struct mqtt_string_t payload
mqtt_qos_t qos

struct mqtt_conn_options_t
#include <mqtt_client.h>

MQTT Connection options.

Public Members

struct mqtt_string_t client_id

Should be 1-23 [0-9a-zA-Z] characters as per [MQTT-3.1.3-5].
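
The constraint above can be checked before connecting. The function below is a hypothetical helper, not part of the mqtt_client API, and assumes the client identifier is available as a null-terminated string.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: returns 1 if the identifier consists of
   1-23 characters from [0-9a-zA-Z], otherwise 0. */
static int client_id_is_valid(const char *client_id_p)
{
    size_t length = strlen(client_id_p);
    size_t i;
    char c;

    if ((length < 1) || (length > 23)) {
        return (0);
    }

    for (i = 0; i < length; i++) {
        c = client_id_p[i];

        if (!(((c >= '0') && (c <= '9'))
              || ((c >= 'a') && (c <= 'z'))
              || ((c >= 'A') && (c <= 'Z')))) {
            return (0);
        }
    }

    return (1);
}
```

Brokers may accept longer identifiers or other characters, so a failed check only means the identifier falls outside the range documented here.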

struct mqtt_application_message_t will

Optional Last Will and Testament to be sent on unclean disconnect.

struct mqtt_string_t user_name

Optional user name for broker authentication.

struct mqtt_string_t password

Optional password for broker authentication.

int keep_alive_s

Keep alive interval in seconds.