Skip to main content
Docs: template-rust

MQTT

This feature provides a client and a service adapter for your interfaces over the MQTT protocol, built on the rumqttc crate. It lets you connect applications built with the same or different technologies β€” check all of our templates and the MQTT feature in other templates that support it.

  • Use an MQTT client in place of your local implementation to receive data from a remote service.
  • Use an MQTT service adapter to expose your implementation as a remote service.
note

This feature requires api and core.

tip

The MQTT broker is not part of the template. To run a client and a service you need a broker (for example Mosquitto) reachable by both.

File overview for module​

With our example API definition

Hello World API (click to expand)
schema: apigear.module/1.0
name: io.world
version: "1.0.0"

interfaces:
- name: Hello
properties:
- { name: last, type: Message }
operations:
- name: say
params:
- { name: msg, type: Message }
- { name: when, type: When }
return:
type: int
signals:
- name: justSaid
params:
- { name: msg, type: Message }
enums:
- name: When
members:
- { name: Now, value: 0 }
- { name: Soon, value: 1 }
- { name: Never, value: 2 }
structs:
- name: Message
fields:
- { name: content, type: string }

the following files are generated. The purpose and content of each file is explained below.

πŸ“‚io_world
┣ πŸ“‚src
┃ ┣ πŸ“‚mqtt
┃ ┃ ┣ πŸ“œmod.rs
┃ ┃ ┣ πŸ“œhello_client.rs # MQTT client adapter for Hello
┃ ┃ β”— πŸ“œhello_service.rs # MQTT service adapter for Hello
┃ β”— πŸ“œlib.rs
┣ πŸ“‚tests
┃ ┣ πŸ“œmqtt_common.rs # broker test helper
┃ β”— πŸ“œmqtt_hello_test.rs # round-trip tests for Hello
...

The adapters speak the agreed ApiGear MQTT (MQTT 5) wire scheme, so a Rust client or service interoperates with the services and clients generated by the other ApiGear templates (C++, Qt, Python, …) over the same broker. For module io.world / interface Hello the topics are:

MessageTopicDirection
operation requestio.world/Hello/rpc/<op>client β†’ service
operation replythe request's MQTT 5 ResponseTopic (io.world/Hello/rpc/<op>/<clientId>/result)service β†’ client
property change requestio.world/Hello/set/<prop>client β†’ service
property notificationio.world/Hello/prop/<prop> (retained)service β†’ client
signalio.world/Hello/sig/<sig>service β†’ client

Operation replies are correlated using the request's MQTT 5 CorrelationData, which the service echoes back. Because property notifications are retained, a client that connects later still receives the current value (the MQTT scheme has no separate state topic).

See ApiGear over MQTT for the topic structure and payload format.

MQTT client adapter​

The file πŸ“œhello_client.rs contains HelloMqttClient, the MQTT client version of the Hello interface. It implements HelloTrait, so you use it like a local implementation. It takes a shared rumqttc v5 AsyncClient and a unique client_id (used to route RPC replies), subscribes to the interface's topics, and decodes incoming messages.

let client = Arc::new(HelloMqttClient::new(Arc::new(mqtt_async_client), "hello-client"));
client.subscribe_topics().await.expect("subscribe to topics");

Properties​

A getter (here last()) returns the locally cached value last received from the service. A setter (here set_last()) publishes a change request; the local value updates when the service confirms the change. Subscribe to changes through the Publisher returned by publisher().

note

Property notifications are retained on the broker, so a client connecting after the service started still receives the current property values.

Operations​

Operations are published as request messages and awaited:

let result = client.say(&message, WhenEnum::Now).await;

Signals​

Do not emit signals from a client. Subscribe to signals through the Publisher; incoming signal messages are delivered on the matching broadcast channel.

MQTT service adapter​

The file πŸ“œhello_service.rs contains HelloMqttService, which wraps a local Hello implementation and exposes it over MQTT. It applies incoming operation and property-change requests to your local object and publishes property changes and signals back to clients.

  • Properties β€” a change on your local object (or a client request) is published to all clients.
  • Operations β€” a request is run on your local object; the result is returned only to the requesting client.
  • Signals β€” a signal emitted by your local object is forwarded to all clients.

Use the adapters​

The generated examples crate ships ready-to-run mqtt_server and mqtt_client binaries. The client creates a rumqttc v5 AsyncClient, hands it to the adapter, and pumps the MQTT event loop so the adapter receives RPC replies, property changes and signals. The incoming MQTT 5 CorrelationData is forwarded to the adapter so it can match RPC replies:

use rumqttc::v5::mqttbytes::v5::Packet;
use rumqttc::v5::{AsyncClient, Event, MqttOptions};
use std::sync::Arc;
use std::time::Duration;

let client_id = "hello-client";
let mut opts = MqttOptions::new(client_id, "127.0.0.1", 1883);
opts.set_keep_alive(Duration::from_secs(5));
let (mqtt, mut eventloop) = AsyncClient::new(opts, 64);

let client = Arc::new(HelloMqttClient::new(Arc::new(mqtt), client_id));
client.subscribe_topics().await.expect("subscribe to topics");

// Drive the event loop so incoming messages reach the adapter.
let pump = client.clone();
tokio::spawn(async move {
loop {
if let Ok(Event::Incoming(Packet::Publish(p))) = eventloop.poll().await {
let topic = String::from_utf8_lossy(&p.topic);
let correlation_data = p.properties.as_ref().and_then(|pr| pr.correlation_data.as_deref());
pump.handle_message(&topic, &p.payload, correlation_data);
}
}
});

// Use the client like a local Hello implementation; the call awaits the RPC reply:
let _ = client.say(&Default::default(), WhenEnum::Now).await;

Start a broker, then run the two binaries in separate terminals (override the broker port with the MQTT_PORT environment variable, default 1883):

mosquitto -p 1883 &
cargo run -p rust_hello_world_examples --bin mqtt_server
cargo run -p rust_hello_world_examples --bin mqtt_client

Tests​

The MQTT feature generates round-trip tests in πŸ“œtests/mqtt_hello_test.rs, backed by the helper in πŸ“œmqtt_common.rs. They exercise a real client ↔ service round-trip over a live broker, so they are marked #[ignore] and skipped by default. Run them against a broker the way CI does:

mosquitto -p 1883 &
cargo test --manifest-path goldenmaster/Cargo.toml -- --ignored