MQTT
This feature provides a client and a service adapter for your interfaces over the MQTT protocol, built on the rumqttc crate. It lets you connect applications built with the same or different technologies; see our other templates and the MQTT feature in those that support it.
- Use an MQTT client in place of your local implementation to receive data from a remote service.
- Use an MQTT service adapter to expose your implementation as a remote service.
This feature requires api and core.
The MQTT broker is not part of the template. To run a client and a service you need a broker (for example Mosquitto) reachable by both.
File overview for module
With our example API definition
Hello World API
schema: apigear.module/1.0
name: io.world
version: "1.0.0"

interfaces:
  - name: Hello
    properties:
      - { name: last, type: Message }
    operations:
      - name: say
        params:
          - { name: msg, type: Message }
          - { name: when, type: When }
        return:
          type: int
    signals:
      - name: justSaid
        params:
          - { name: msg, type: Message }
enums:
  - name: When
    members:
      - { name: Now, value: 0 }
      - { name: Soon, value: 1 }
      - { name: Never, value: 2 }
structs:
  - name: Message
    fields:
      - { name: content, type: string }
the following files are generated. The purpose and content of each file is explained below.
📂io_world
 ┣ 📂src
 ┃ ┣ 📂mqtt
 ┃ ┃ ┣ 📜mod.rs
 ┃ ┃ ┣ 📜hello_client.rs    # MQTT client adapter for Hello
 ┃ ┃ ┗ 📜hello_service.rs   # MQTT service adapter for Hello
 ┃ ┗ 📜lib.rs
 ┣ 📂tests
 ┃ ┣ 📜mqtt_common.rs       # broker test helper
 ┃ ┗ 📜mqtt_hello_test.rs   # round-trip tests for Hello
 ...
The adapters speak the agreed ApiGear MQTT (MQTT 5) wire scheme, so a Rust client or service interoperates with the services and clients generated by the other ApiGear templates (C++, Qt, Python, …) over the same broker. For module io.world / interface Hello the topics are:
| Message | Topic | Direction |
|---|---|---|
| operation request | io.world/Hello/rpc/<op> | client → service |
| operation reply | the request's MQTT 5 ResponseTopic (io.world/Hello/rpc/<op>/<clientId>/result) | service → client |
| property change request | io.world/Hello/set/<prop> | client → service |
| property notification | io.world/Hello/prop/<prop> (retained) | service → client |
| signal | io.world/Hello/sig/<sig> | service → client |
Operation replies are correlated using the request's MQTT 5 CorrelationData, which the service echoes back. Because property notifications are retained, a client that connects later still receives the current value (the MQTT scheme has no separate state topic).
See ApiGear over MQTT for the topic structure and payload format.
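The topic table above can be sketched as a small formatting helper. This is only an illustration of the scheme: `Topics` and its methods are hypothetical names, not part of the generated code, and the generated adapters may build their topic strings differently.

```rust
/// Hypothetical helper (not generated code) that formats the topics
/// listed in the table for one module/interface pair.
pub struct Topics {
    module: String,
    interface: String,
}

impl Topics {
    pub fn new(module: &str, interface: &str) -> Self {
        Self {
            module: module.to_string(),
            interface: interface.to_string(),
        }
    }

    /// Common prefix, e.g. "io.world/Hello".
    fn base(&self) -> String {
        format!("{}/{}", self.module, self.interface)
    }

    /// Operation request topic (client -> service).
    pub fn rpc(&self, op: &str) -> String {
        format!("{}/rpc/{}", self.base(), op)
    }

    /// Reply topic a client would send as the MQTT 5 ResponseTopic.
    pub fn rpc_result(&self, op: &str, client_id: &str) -> String {
        format!("{}/{}/result", self.rpc(op), client_id)
    }

    /// Property change request topic (client -> service).
    pub fn set(&self, prop: &str) -> String {
        format!("{}/set/{}", self.base(), prop)
    }

    /// Retained property notification topic (service -> client).
    pub fn prop(&self, prop: &str) -> String {
        format!("{}/prop/{}", self.base(), prop)
    }

    /// Signal topic (service -> client).
    pub fn sig(&self, sig: &str) -> String {
        format!("{}/sig/{}", self.base(), sig)
    }
}
```

For the example API, `Topics::new("io.world", "Hello").rpc_result("say", "hello-client")` yields `io.world/Hello/rpc/say/hello-client/result`, which matches the reply topic pattern in the table.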
MQTT client adapter
The file 📜hello_client.rs contains HelloMqttClient, the MQTT client version of the Hello interface. It implements HelloTrait, so you use it like a local implementation. It takes a shared rumqttc v5 AsyncClient and a unique client_id (used to route RPC replies), subscribes to the interface's topics, and decodes incoming messages.
let client = Arc::new(HelloMqttClient::new(Arc::new(mqtt_async_client), "hello-client"));
client.subscribe_topics().await.expect("subscribe to topics");
Properties
A getter (here last()) returns the locally cached value last received from the service. A setter (here set_last()) publishes a change request; the local value updates when the service confirms the change. Subscribe to changes through the Publisher returned by publisher().
Property notifications are retained on the broker, so a client connecting after the service started still receives the current property values.
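The getter/setter behaviour described above can be illustrated with a std-only stand-in. `CachedProperty`, `request_set` and `on_notification` are hypothetical names chosen for this sketch, and the `outbox` vector replaces the real MQTT publish; the generated HelloMqttClient is not structured this way necessarily.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for a property cached inside a client adapter.
pub struct CachedProperty {
    /// Last value received from the service.
    value: Mutex<String>,
    /// Change requests that a real client would publish to `.../set/<prop>`.
    outbox: Mutex<Vec<String>>,
}

impl CachedProperty {
    pub fn new() -> Self {
        Self {
            value: Mutex::new(String::new()),
            outbox: Mutex::new(Vec::new()),
        }
    }

    /// Getter: returns the locally cached value only.
    pub fn get(&self) -> String {
        self.value.lock().unwrap().clone()
    }

    /// Setter: records a change request but leaves the cache untouched.
    pub fn request_set(&self, value: &str) {
        self.outbox.lock().unwrap().push(value.to_string());
    }

    /// Called when a (possibly retained) property notification arrives.
    pub fn on_notification(&self, value: &str) {
        *self.value.lock().unwrap() = value.to_string();
    }

    /// Requests recorded so far (stand-in for published messages).
    pub fn pending_requests(&self) -> Vec<String> {
        self.outbox.lock().unwrap().clone()
    }
}
```

In this sketch, calling `request_set("hi")` leaves `get()` unchanged until `on_notification("hi")` runs, mirroring the rule that the local value updates only once the service confirms the change.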
Operations
Operations are published as request messages and awaited:
let result = client.say(&message, WhenEnum::Now).await;
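How a reply finds its way back to the awaiting call can be sketched with a table of pending calls keyed by correlation data. This is a simplified, synchronous illustration using std channels; `PendingCalls`, `register` and `complete` are hypothetical names, the counter-based correlation bytes are an assumption, and the generated adapter is async and may work differently.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Hypothetical table of in-flight operation calls.
pub struct PendingCalls {
    next_id: Mutex<u64>,
    waiting: Mutex<HashMap<Vec<u8>, Sender<Vec<u8>>>>,
}

impl PendingCalls {
    pub fn new() -> Self {
        Self {
            next_id: Mutex::new(0),
            waiting: Mutex::new(HashMap::new()),
        }
    }

    /// Registers a call. Returns the correlation data to attach to the
    /// request and a receiver on which the reply payload will arrive.
    pub fn register(&self) -> (Vec<u8>, Receiver<Vec<u8>>) {
        let mut next = self.next_id.lock().unwrap();
        *next += 1;
        // Assumption: a big-endian counter is unique enough per client.
        let correlation = (*next).to_be_bytes().to_vec();
        let (tx, rx) = channel();
        self.waiting.lock().unwrap().insert(correlation.clone(), tx);
        (correlation, rx)
    }

    /// Hands a reply to the call with the same correlation data.
    /// Returns false when no such call is waiting.
    pub fn complete(&self, correlation: &[u8], payload: &[u8]) -> bool {
        match self.waiting.lock().unwrap().remove(correlation) {
            Some(tx) => tx.send(payload.to_vec()).is_ok(),
            None => false,
        }
    }
}
```

Because each entry is removed when its reply arrives, replies can come back in any order and a duplicate reply for the same correlation data is ignored.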
Signals
Do not emit signals from a client. Subscribe to signals through the Publisher; incoming signal messages are delivered on the matching broadcast channel.
MQTT service adapter
The file 📜hello_service.rs contains HelloMqttService, which wraps a local Hello implementation and exposes it over MQTT. It applies incoming operation and property-change requests to your local object and publishes property changes and signals back to clients.
- Properties: a change on your local object (or a client request) is published to all clients.
- Operations: a request is run on your local object; the result is returned only to the requesting client.
- Signals: a signal emitted by your local object is forwarded to all clients.
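On the service side, the first step for an incoming message is deciding whether it is an operation request or a property change request. The sketch below shows one way to do that from the topic alone, assuming the topic layout documented for this feature; `Incoming` and `classify` are hypothetical names and not part of HelloMqttService.

```rust
/// Hypothetical classification of a message received by a service adapter.
#[derive(Debug, PartialEq)]
pub enum Incoming<'a> {
    /// `<base>/rpc/<op>`: run the operation on the local object.
    Operation(&'a str),
    /// `<base>/set/<prop>`: apply the property change to the local object.
    PropertySet(&'a str),
    /// Anything else (notifications, signals, reply topics, other interfaces).
    Ignored,
}

/// Classifies `topic` relative to `base`, e.g. "io.world/Hello".
pub fn classify<'a>(base: &str, topic: &'a str) -> Incoming<'a> {
    // Strip "<base>/" and bail out if the topic belongs to something else.
    let rest = match topic.strip_prefix(base).and_then(|r| r.strip_prefix('/')) {
        Some(rest) => rest,
        None => return Incoming::Ignored,
    };
    match rest.split_once('/') {
        // Exactly one more segment is expected after "rpc" or "set".
        Some(("rpc", op)) if !op.is_empty() && !op.contains('/') => Incoming::Operation(op),
        Some(("set", prop)) if !prop.is_empty() && !prop.contains('/') => {
            Incoming::PropertySet(prop)
        }
        _ => Incoming::Ignored,
    }
}
```

With base `io.world/Hello`, the topic `io.world/Hello/rpc/say` classifies as `Operation("say")`, while a client's reply topic such as `io.world/Hello/rpc/say/hello-client/result` is ignored, so a service does not mistake replies for requests.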
Use the adapters
The generated examples crate ships ready-to-run mqtt_server and mqtt_client binaries. The client creates a rumqttc v5 AsyncClient, hands it to the adapter, and pumps the MQTT event loop so the adapter receives RPC replies, property changes and signals. The incoming MQTT 5 CorrelationData is forwarded to the adapter so it can match RPC replies:
use rumqttc::v5::mqttbytes::v5::Packet;
use rumqttc::v5::{AsyncClient, Event, MqttOptions};
use std::sync::Arc;
use std::time::Duration;

let client_id = "hello-client";
let mut opts = MqttOptions::new(client_id, "127.0.0.1", 1883);
opts.set_keep_alive(Duration::from_secs(5));
let (mqtt, mut eventloop) = AsyncClient::new(opts, 64);

let client = Arc::new(HelloMqttClient::new(Arc::new(mqtt), client_id));
client.subscribe_topics().await.expect("subscribe to topics");

// Drive the event loop so incoming messages reach the adapter.
let pump = client.clone();
tokio::spawn(async move {
    loop {
        if let Ok(Event::Incoming(Packet::Publish(p))) = eventloop.poll().await {
            let topic = String::from_utf8_lossy(&p.topic);
            let correlation_data = p.properties.as_ref().and_then(|pr| pr.correlation_data.as_deref());
            pump.handle_message(&topic, &p.payload, correlation_data);
        }
    }
});

// Use the client like a local Hello implementation; the call awaits the RPC reply:
let _ = client.say(&Default::default(), WhenEnum::Now).await;
Start a broker, then run the two binaries in separate terminals (override the broker port with the MQTT_PORT environment variable, default 1883):
mosquitto -p 1883 &
cargo run -p rust_hello_world_examples --bin mqtt_server
cargo run -p rust_hello_world_examples --bin mqtt_client
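If you write your own binary and want the same MQTT_PORT override, a minimal sketch could look like the following. How the shipped example binaries read the variable is not shown here; `parse_port` and `broker_port` are hypothetical helpers, and falling back to 1883 on an unparsable value is an assumption of this sketch.

```rust
use std::env;

/// Default broker port, as documented for the example binaries.
const DEFAULT_MQTT_PORT: u16 = 1883;

/// Parses an optional port string, falling back to the default when the
/// value is missing or not a valid u16 (assumed behaviour).
pub fn parse_port(raw: Option<&str>) -> u16 {
    raw.and_then(|s| s.trim().parse::<u16>().ok())
        .unwrap_or(DEFAULT_MQTT_PORT)
}

/// Reads the broker port from the MQTT_PORT environment variable.
pub fn broker_port() -> u16 {
    parse_port(env::var("MQTT_PORT").ok().as_deref())
}
```

The result of `broker_port()` can then be passed as the port argument when constructing the MQTT options for your client or service.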
Tests
The MQTT feature generates round-trip tests in 📜tests/mqtt_hello_test.rs, backed by the helper in 📜mqtt_common.rs. They exercise a real client ↔ service round-trip over a live broker, so they are marked #[ignore] and skipped by default. Run them against a broker the way CI does:
mosquitto -p 1883 &
cargo test --manifest-path goldenmaster/Cargo.toml -- --ignored