Docs: template-rust

MQTT

This feature provides a client and a service adapter for your interfaces over the MQTT protocol, built on the rumqttc crate. It lets you connect applications built with the same or different technologies: see the full list of our templates and the MQTT feature in the other templates that support it.

  • Use an MQTT client in place of your local implementation to receive data from a remote service.
  • Use an MQTT service adapter to expose your implementation as a remote service.
note

This feature requires api and core.

tip

The MQTT broker is not part of the template. To run a client and a service you need a broker (for example Mosquitto) reachable by both.

File overview for module

With our example API definition

Hello World API
schema: apigear.module/1.0
name: io.world
version: "1.0.0"

interfaces:
  - name: Hello
    properties:
      - { name: last, type: Message }
    operations:
      - name: say
        params:
          - { name: msg, type: Message }
          - { name: when, type: When }
        return:
          type: int
    signals:
      - name: justSaid
        params:
          - { name: msg, type: Message }
enums:
  - name: When
    members:
      - { name: Now, value: 0 }
      - { name: Soon, value: 1 }
      - { name: Never, value: 2 }
structs:
  - name: Message
    fields:
      - { name: content, type: string }

the following files are generated. The purpose and content of each file is explained below.

📂io_world
┣ 📂src
┃ ┣ 📂mqtt
┃ ┃ ┣ 📜mod.rs
┃ ┃ ┣ 📜hello_client.rs # MQTT client adapter for Hello
┃ ┃ ┗ 📜hello_service.rs # MQTT service adapter for Hello
┃ ┗ 📜lib.rs
┣ 📂tests
┃ ┣ 📜mqtt_common.rs # broker test helper
┃ ┗ 📜mqtt_hello_test.rs # round-trip tests for Hello
...

The adapters map your interface onto MQTT topics: operations are published as request messages, while properties, signals and the service's state use dedicated topics. The service publishes property and state messages as retained, so a client that connects later still receives the current state.
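To make the mapping above more concrete, here is a minimal sketch of how message kinds could be turned into topic names and a retained flag. The topic layout (`<module>/<interface>/<kind>/<member>`), the segment names, and every identifier below are assumptions made for illustration; the generated adapters define their own topic strings, so treat hello_client.rs and hello_service.rs as the reference.

```rust
/// Kinds of messages the adapters exchange (illustrative names, not generated code).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageKind {
    Property,
    OperationRequest,
    Signal,
    ServiceState,
}

impl MessageKind {
    /// Hypothetical topic segment for this kind of message.
    fn segment(self) -> &'static str {
        match self {
            MessageKind::Property => "prop",
            MessageKind::OperationRequest => "rpc",
            MessageKind::Signal => "sig",
            MessageKind::ServiceState => "state",
        }
    }

    /// Property and state messages are retained so that late joiners see them.
    /// Requests and signals are assumed here to be one-shot and not retained.
    pub fn retained(self) -> bool {
        matches!(self, MessageKind::Property | MessageKind::ServiceState)
    }
}

/// Builds a topic string in the assumed layout.
pub fn topic(module: &str, interface: &str, kind: MessageKind, member: &str) -> String {
    format!("{}/{}/{}/{}", module, interface, kind.segment(), member)
}
```

With this layout, `topic("io.world", "Hello", MessageKind::Property, "last")` yields `io.world/Hello/prop/last`, and only the property and state kinds report `retained() == true`.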

MQTT client adapter

The file 📜hello_client.rs contains HelloMqttClient, the MQTT client version of the Hello interface. It implements HelloTrait, so you use it like a local implementation. It takes a shared rumqttc AsyncClient, subscribes to the interface's topics, and decodes incoming messages.

let client = Arc::new(HelloMqttClient::new(Arc::new(mqtt_async_client)));
client.subscribe_topics().await.expect("subscribe to topics");

Properties

A getter (here last()) returns the locally cached value last received from the service. A setter (here set_last()) publishes a change request; the local value updates when the service confirms the change. Subscribe to changes through the Publisher returned by publisher().

note

Property and state messages are retained on the broker, so a client connecting after the service started still receives the current property values.
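The getter and setter semantics described above can be modelled without any networking. The sketch below is not the generated code: `CachedProperty`, its methods, and the stand-in `Message` struct are hypothetical names that only illustrate that a set request leaves the cached value untouched until the service's update arrives.

```rust
use std::sync::Mutex;

/// Stand-in for the generated `Message` struct (a single `content` field).
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Message {
    pub content: String,
}

/// Illustrative model of a client-side property; not the generated adapter.
#[derive(Default)]
pub struct CachedProperty {
    /// Value last received from the service.
    cached: Mutex<Message>,
    /// Change requests that would be published to the broker.
    outbox: Mutex<Vec<Message>>,
}

impl CachedProperty {
    /// Getter: returns the cached value without a network round-trip.
    pub fn get(&self) -> Message {
        self.cached.lock().unwrap().clone()
    }

    /// Setter: only queues a change request; the cache is left untouched.
    pub fn request_set(&self, value: Message) {
        self.outbox.lock().unwrap().push(value);
    }

    /// Called when a property message arrives from the service.
    /// Returns true if the cached value actually changed.
    pub fn on_service_update(&self, value: Message) -> bool {
        let mut cached = self.cached.lock().unwrap();
        if *cached == value {
            return false;
        }
        *cached = value;
        true
    }

    /// Number of change requests queued so far.
    pub fn pending_requests(&self) -> usize {
        self.outbox.lock().unwrap().len()
    }
}
```

In this model, a retained property message delivered right after subscribing is simply the first call to `on_service_update`, which is how a late-joining client would end up with the current value.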

Operations

Operations are published as request messages and awaited:

let result = client.say(&message, WhenEnum::Now).await;
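Awaiting an operation implies that a response has to be matched to the call that triggered it. The source does not describe how the generated client does this, so the following is only a sketch under the assumption of a numeric correlation id per call; `PendingCalls`, `register` and `resolve` are hypothetical names, and a standard-library channel stands in for the async machinery.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Illustrative bookkeeping for in-flight operation calls (not generated code).
#[derive(Default)]
pub struct PendingCalls {
    next_id: Mutex<u64>,
    waiting: Mutex<HashMap<u64, Sender<i32>>>,
}

impl PendingCalls {
    /// Registers a call and returns its id plus the receiver to wait on.
    pub fn register(&self) -> (u64, Receiver<i32>) {
        let mut next = self.next_id.lock().unwrap();
        let id = *next;
        *next += 1;
        let (tx, rx) = channel();
        self.waiting.lock().unwrap().insert(id, tx);
        (id, rx)
    }

    /// Routes a response to the matching caller.
    /// Returns false when the id is unknown or the caller stopped waiting.
    pub fn resolve(&self, id: u64, result: i32) -> bool {
        match self.waiting.lock().unwrap().remove(&id) {
            Some(tx) => tx.send(result).is_ok(),
            None => false,
        }
    }
}
```

The result type is `i32` because `say` returns `int` in the example API. Each id is resolved at most once, so a duplicate or stray response is ignored instead of reaching another caller.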

Signals

Do not emit signals from a client. Subscribe to signals through the Publisher; incoming signal messages are delivered on the matching broadcast channel.
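The fan-out of an incoming signal to local subscribers can be pictured as follows. This is a standard-library model, not the generated Publisher: `SignalHub`, `subscribe`, `deliver` and the `JustSaid` payload type are hypothetical names, and plain `mpsc` channels stand in for the broadcast channel mentioned above.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Stand-in payload for the `justSaid` signal.
#[derive(Debug, Clone, PartialEq)]
pub struct JustSaid {
    pub content: String,
}

/// Illustrative fan-out of incoming signals to local subscribers.
#[derive(Default)]
pub struct SignalHub {
    subscribers: Mutex<Vec<Sender<JustSaid>>>,
}

impl SignalHub {
    /// Registers a new subscriber and returns its receiving end.
    pub fn subscribe(&self) -> Receiver<JustSaid> {
        let (tx, rx) = channel();
        self.subscribers.lock().unwrap().push(tx);
        rx
    }

    /// Delivers a decoded signal to every live subscriber and forgets
    /// subscribers whose receiver was dropped. Returns how many received it.
    pub fn deliver(&self, signal: JustSaid) -> usize {
        let mut subs = self.subscribers.lock().unwrap();
        subs.retain(|tx| tx.send(signal.clone()).is_ok());
        subs.len()
    }
}
```

Note that the hub only delivers signals that arrived from the service; it offers no way for the client side to originate one, in line with the rule above.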

MQTT service adapter

The file 📜hello_service.rs contains HelloMqttService, which wraps a local Hello implementation and exposes it over MQTT. It applies incoming operation and property-change requests to your local object and publishes property changes and signals back to clients.

  • Properties: a change on your local object (or a client request) is published to all clients.
  • Operations: a request is run on your local object; the result is returned only to the requesting client.
  • Signals: a signal emitted by your local object is forwarded to all clients.
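The three routing rules in the list above can be summarised in a few lines. The types below are purely illustrative (`ServiceEvent`, `Audience` and `audience` do not exist in the generated code), and identifying a requester by a string id is an assumption.

```rust
/// Something that happened on the service side (illustrative names).
pub enum ServiceEvent {
    PropertyChanged { name: String },
    OperationFinished { requester: String, result: i32 },
    SignalEmitted { name: String },
}

/// Who should receive the resulting message.
#[derive(Debug, PartialEq)]
pub enum Audience {
    AllClients,
    Only(String),
}

/// Applies the routing rules: property changes and signals go to everyone,
/// an operation result goes back to the requesting client only.
pub fn audience(event: &ServiceEvent) -> Audience {
    match event {
        ServiceEvent::PropertyChanged { .. } | ServiceEvent::SignalEmitted { .. } => {
            Audience::AllClients
        }
        ServiceEvent::OperationFinished { requester, .. } => Audience::Only(requester.clone()),
    }
}
```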

Use the adapters

The generated examples crate ships ready-to-run mqtt_server and mqtt_client binaries. The client creates a rumqttc AsyncClient, hands it to the adapter, and pumps the MQTT event loop so the adapter receives state, property changes and signals:

use rumqttc::{AsyncClient, Event, MqttOptions, Packet};
use std::sync::Arc;
use std::time::Duration;

let mut opts = MqttOptions::new("hello-client", "127.0.0.1", 1883);
opts.set_keep_alive(Duration::from_secs(5));
let (mqtt, mut eventloop) = AsyncClient::new(opts, 64);

let client = Arc::new(HelloMqttClient::new(Arc::new(mqtt)));
client.subscribe_topics().await.expect("subscribe to topics");

// Drive the event loop so incoming messages reach the adapter.
let pump = client.clone();
tokio::spawn(async move {
    loop {
        if let Ok(Event::Incoming(Packet::Publish(p))) = eventloop.poll().await {
            pump.handle_message(&p.topic, &p.payload);
        }
    }
});

// Use the client like a local Hello implementation:
let _ = client.say(&Default::default(), WhenEnum::Now).await;

Start a broker, then run the two binaries in separate terminals (override the broker port with the MQTT_PORT environment variable, default 1883):

mosquitto -p 1883 &
cargo run -p rust_hello_world_examples --bin mqtt_server
cargo run -p rust_hello_world_examples --bin mqtt_client
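If you write your own binary and want the same port override, a sketch along these lines would do. Only the variable name MQTT_PORT and the default 1883 come from the text above; the helper names are hypothetical, and falling back to the default on an unparsable value is an assumption, not necessarily what the shipped examples do.

```rust
use std::env;

/// Default MQTT broker port.
pub const DEFAULT_MQTT_PORT: u16 = 1883;

/// Parses an optional port override, falling back to the default when the
/// value is missing or not a valid u16 (assumed behaviour).
pub fn parse_port(raw: Option<&str>) -> u16 {
    raw.and_then(|s| s.trim().parse::<u16>().ok())
        .unwrap_or(DEFAULT_MQTT_PORT)
}

/// Reads the broker port from the `MQTT_PORT` environment variable.
pub fn broker_port() -> u16 {
    parse_port(env::var("MQTT_PORT").ok().as_deref())
}
```

The result of `broker_port()` can then be passed as the third argument to `MqttOptions::new` in place of the literal 1883.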

Tests

The MQTT feature generates round-trip tests in 📜tests/mqtt_hello_test.rs, backed by the helper in 📜mqtt_common.rs. They exercise a real client ↔ service round-trip over a live broker, so they are marked #[ignore] and skipped by default. Run them against a broker the way CI does:

mosquitto -p 1883 &
cargo test --manifest-path goldenmaster/Cargo.toml -- --ignored