5.6. mqtt_client — MQTT client

MQTT is a publish-subscribe-based lightweight messaging protocol.

Note

This driver only implements the MQTT protocol, not the transport layer (normally TCP). That has to be set up using channels.

The driver runs its processing code in a thread that communicates with the MQTT broker on one side using channels, and with the application on the other side using queues.

This means the application has to set up the transport channels, which must already be ready to communicate with the MQTT server (e.g. a connected TCP socket), and spawn the thread running the MQTT client.

Basic example of initializing MQTT over TCP (error checking left out for brevity).

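static struct mqtt_client_t client;
static struct socket_t server_sock;
/* Stack size chosen for illustration only. */
static THRD_STACK(stack, 1024);

static size_t on_publish(struct mqtt_client_t *client_p,
                         const char *topic_p,
                         void *chin_p,
                         size_t size)
{
    uint8_t buf[32];

    /* Leave room for the null terminator. Larger values are not
       handled in this example. */
    if (size > sizeof(buf) - 1) {
        return (0);
    }

    chan_read(chin_p, buf, size);
    buf[size] = '\0';
    std_printf(OSTR("on_publish: %s\r\n"), &buf[0]);

    /* Number of bytes read from the input channel. */
    return (size);
}

struct inet_addr_t remote_host_address;

/* Set up the transport layer: a TCP connection to the broker. */
inet_aton("127.0.0.1", &remote_host_address.ip);
remote_host_address.port = 1883;
socket_open_tcp(&server_sock);
socket_connect(&server_sock, &remote_host_address);

/* The connected socket serves as both output and input channel. */
mqtt_client_init(&client,
                 "mqtt_client",
                 NULL,
                 &server_sock,
                 &server_sock,
                 on_publish,
                 NULL);

/* Spawn the thread running the MQTT client. */
thrd_spawn(mqtt_client_main,
           &client,
           0,
           stack,
           sizeof(stack));

/* Establish the MQTT connection over the transport. */
mqtt_client_connect(&client);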

Source code: src/inet/mqtt_client.h, src/inet/mqtt_client.c

Test code: tst/inet/mqtt_client/main.c

Test coverage: src/inet/mqtt_client.c

Example code: examples/mqtt_client/main.c


Typedefs

typedef size_t (*mqtt_on_publish_t)(struct mqtt_client_t *client_p, const char *topic_p, void *chin_p, size_t size)

Prototype of the on-publish callback function.

Return
Number of bytes read from the input channel.
Parameters
  • client_p: The client.
  • topic_p: The received topic.
  • chin_p: The channel to read the value from.
  • size: Number of bytes of the value to read from chin_p.
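
The callback contract above (read up to size bytes from the channel and return the number of bytes read) can be sketched with a fixed-size buffer. This is a minimal, standalone illustration and not the driver's code: struct mem_chan_t, mem_chan_read() and last_value are hypothetical stand-ins for a Simba channel, chan_read() and application storage, and the client parameter is typed as void * so the sketch compiles without the Simba headers.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical in-memory stand-in for an input channel. */
struct mem_chan_t {
    const uint8_t *data_p;
    size_t left;
};

/* Stand-in for chan_read(): copies up to size bytes and returns
   the number of bytes copied. */
static size_t mem_chan_read(void *chin_p, void *buf_p, size_t size)
{
    struct mem_chan_t *chan_p = chin_p;

    assert(chan_p != NULL);

    if (size > chan_p->left) {
        size = chan_p->left;
    }

    memcpy(buf_p, chan_p->data_p, size);
    chan_p->data_p += size;
    chan_p->left -= size;

    return (size);
}

/* Last received value, truncated to fit (illustrative storage). */
static char last_value[32];

/* Same parameter order as mqtt_on_publish_t. Reads the whole value
   in small chunks, keeps what fits in last_value and drops the
   rest, then returns the number of bytes read from the channel. */
static size_t on_publish_bounded(void *client_p,
                                 const char *topic_p,
                                 void *chin_p,
                                 size_t size)
{
    uint8_t scratch[8];
    size_t total = 0;
    size_t kept = 0;

    (void)client_p;
    (void)topic_p;

    while (total < size) {
        size_t chunk = size - total;
        size_t room = sizeof(last_value) - 1 - kept;
        size_t n;

        if (chunk > sizeof(scratch)) {
            chunk = sizeof(scratch);
        }

        n = mem_chan_read(chin_p, scratch, chunk);

        if (n == 0) {
            break;
        }

        if (n < room) {
            room = n;
        }

        memcpy(&last_value[kept], scratch, room);
        kept += room;
        total += n;
    }

    last_value[kept] = '\0';

    return (total);
}
```

Reading in chunks keeps the stack usage constant regardless of the value size, at the cost of truncating values that do not fit in the application buffer.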

typedef int (*mqtt_on_error_t)(struct mqtt_client_t *client_p, int error)

Prototype of the on-error callback function.

Return
zero(0) or negative error code.
Parameters
  • client_p: The client.
  • error: The error code of the error that occurred.

Enums

enum mqtt_client_state_t

Values:

mqtt_client_state_disconnected_t
mqtt_client_state_connected_t
mqtt_client_state_connecting_t

enum mqtt_qos_t

Quality of Service.

Values:

mqtt_qos_0_t = 0
mqtt_qos_1_t = 1
mqtt_qos_2_t = 2
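
In MQTT 3.1.1 the three levels mean at most once (0), at least once (1) and exactly once (2) delivery, and the level travels in bits 2 and 1 of the first byte of a PUBLISH packet. The sketch below shows that encoding. It is an illustration of the wire format, not the driver's internal code: publish_first_byte() and qos_from_first_byte() are hypothetical helpers, and the enum is a local mirror of the one documented here.

```c
#include <assert.h>
#include <stdint.h>

/* Local mirror of the enum documented in this section. */
enum mqtt_qos_t {
    mqtt_qos_0_t = 0,
    mqtt_qos_1_t = 1,
    mqtt_qos_2_t = 2
};

/* Build the first byte of a PUBLISH fixed header:
   bits 7-4: packet type (3 for PUBLISH)
   bit 3:    DUP flag
   bits 2-1: QoS level
   bit 0:    RETAIN flag */
static uint8_t publish_first_byte(enum mqtt_qos_t qos, int dup, int retain)
{
    assert(((int)qos >= 0) && ((int)qos <= 2));

    return ((uint8_t)((3 << 4)
                      | ((dup ? 1 : 0) << 3)
                      | ((int)qos << 1)
                      | (retain ? 1 : 0)));
}

/* Extract the QoS level from the first byte of a PUBLISH packet. */
static enum mqtt_qos_t qos_from_first_byte(uint8_t byte)
{
    return ((enum mqtt_qos_t)((byte >> 1) & 0x3));
}
```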

Functions

int mqtt_client_init(struct mqtt_client_t *self_p, const char *name_p, struct log_object_t *log_object_p, void *chout_p, void *chin_p, mqtt_on_publish_t on_publish, mqtt_on_error_t on_error)

Initialize given MQTT client.

Return
zero(0) or negative error code.
Parameters
  • self_p: MQTT client.
  • name_p: Name of the thread.
  • log_object_p: Log object.
  • chout_p: Output channel for client to server packets.
  • chin_p: Input channel for server to client packets.
  • on_publish: On-publish callback function. Called when the server publishes a message.
  • on_error: On-error callback function. Called when an error occurs. If NULL, a default handler is used.

void *mqtt_client_main(void *arg_p)

MQTT client thread.

Return
Never returns.
Parameters
  • arg_p: MQTT client.

int mqtt_client_connect(struct mqtt_client_t *self_p)

Establish a connection to the server.

Return
zero(0) or negative error code.
Parameters
  • self_p: MQTT client.

int mqtt_client_disconnect(struct mqtt_client_t *self_p)

Disconnect from the server.

Return
zero(0) or negative error code.
Parameters
  • self_p: MQTT client.

int mqtt_client_ping(struct mqtt_client_t *self_p)

Send a ping request to the server (broker) and wait for the ping response.

Return
zero(0) or negative error code.
Parameters
  • self_p: MQTT client.
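
For orientation, the ping exchange is the smallest one in MQTT 3.1.1: both PINGREQ and PINGRESP are two-byte packets with no payload. The sketch below only shows the bytes on the wire. How the driver builds and checks them internally is not documented here, and pingreq_packet and is_pingresp() are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* PINGREQ: packet type 12 in the upper nibble, remaining length 0. */
static const uint8_t pingreq_packet[2] = { 0xc0, 0x00 };

/* PINGRESP: packet type 13 in the upper nibble, remaining length 0.
   Returns 1 if the buffer holds exactly one PINGRESP, otherwise 0. */
static int is_pingresp(const uint8_t *buf_p, size_t size)
{
    assert(buf_p != NULL);

    return ((size == 2) && (buf_p[0] == 0xd0) && (buf_p[1] == 0x00));
}
```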

int mqtt_client_publish(struct mqtt_client_t *self_p, struct mqtt_application_message_t *message_p)

Publish given message.

Return
zero(0) or negative error code.
Parameters
  • self_p: MQTT client.
  • message_p: The message to publish. Holds the topic, the payload and its size in bytes, and the quality of service. The payload may be NULL.
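
Filling in a message before publishing could look like the sketch below. It rests on assumptions: the struct is a local mirror of struct mqtt_application_message_t with the layout suggested by the member listing in this section (topic and payload each as a buffer pointer plus a size), the topic size is assumed to exclude the null terminator, and message_init() is a hypothetical helper that is not part of the driver.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef enum {
    mqtt_qos_0_t = 0,
    mqtt_qos_1_t = 1,
    mqtt_qos_2_t = 2
} mqtt_qos_t;

/* Local mirror of the message layout (assumed, see the text above). */
struct mqtt_application_message_t {
    struct {
        const char *buf_p;
        size_t size;
    } topic;
    struct {
        const void *buf_p;
        size_t size;
    } payload;
    mqtt_qos_t qos;
};

/* Hypothetical helper: fill in all fields of a message. The buffers
   are referenced, not copied, so they must outlive the message. */
static void message_init(struct mqtt_application_message_t *message_p,
                         const char *topic_p,
                         const void *payload_p,
                         size_t payload_size,
                         mqtt_qos_t qos)
{
    assert((message_p != NULL) && (topic_p != NULL));

    message_p->topic.buf_p = topic_p;
    message_p->topic.size = strlen(topic_p);
    message_p->payload.buf_p = payload_p;
    message_p->payload.size = payload_size;
    message_p->qos = qos;
}
```

With the real header included instead of the mirror, the initialized message would then be passed as message_p to mqtt_client_publish().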

int mqtt_client_subscribe(struct mqtt_client_t *self_p, struct mqtt_application_message_t *message_p)

Subscribe to given message.

Return
zero(0) or negative error code.
Parameters
  • self_p: MQTT client.
  • message_p: The message to subscribe to. The payload part of the message is not used. The topic may use wildcards, given that the server supports it.
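
The wildcard semantics are defined by MQTT and evaluated by the server, not by this client: "+" matches exactly one topic level and "#" matches any number of remaining levels. The sketch below illustrates those rules for well-formed filters. topic_matches() is a hypothetical helper, it does not validate the filter, and it ignores the special handling of topics starting with "$".

```c
#include <assert.h>
#include <stddef.h>

/* Returns 1 if topic_p matches the topic filter filter_p, else 0. */
static int topic_matches(const char *filter_p, const char *topic_p)
{
    assert((filter_p != NULL) && (topic_p != NULL));

    while (*filter_p != '\0') {
        if (*filter_p == '#') {
            /* Multi-level wildcard: matches the rest of the topic. */
            return (1);
        }

        if (*filter_p == '+') {
            /* Single-level wildcard: skip one whole topic level. */
            while ((*topic_p != '\0') && (*topic_p != '/')) {
                topic_p++;
            }

            filter_p++;
            continue;
        }

        if (*filter_p != *topic_p) {
            /* A trailing "/#" also matches the parent level itself,
               so "a/#" matches "a". */
            if ((*topic_p == '\0')
                && (filter_p[0] == '/')
                && (filter_p[1] == '#')
                && (filter_p[2] == '\0')) {
                return (1);
            }

            return (0);
        }

        filter_p++;
        topic_p++;
    }

    /* The filter is consumed; the topic must be consumed as well. */
    return (*topic_p == '\0');
}
```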

int mqtt_client_unsubscribe(struct mqtt_client_t *self_p, struct mqtt_application_message_t *message_p)

Unsubscribe from given message.

Return
zero(0) or negative error code.
Parameters
  • self_p: MQTT client.
  • message_p: The message to unsubscribe from. Only the topic in the message is used.

struct mqtt_client_t
#include <mqtt_client.h>

MQTT client.

Public Members

const char *name_p
struct log_object_t *log_object_p
int state
struct {
    int type
    void *data_p
} message
struct {
    void *out_p
    void *in_p
} transport
struct {
    struct queue_t out
    struct queue_t in
} control
mqtt_on_publish_t on_publish
mqtt_on_error_t on_error

struct mqtt_application_message_t
#include <mqtt_client.h>

MQTT application message.

Public Members

struct {
    const char *buf_p
    size_t size
} topic
struct {
    const void *buf_p
    size_t size
} payload
mqtt_qos_t qos