When you need an MQTT client in Python, you almost always end up looking at paho-mqtt. It is mature, battle-tested, and the most popular option by far. The awkward part is that its API is built around callbacks, while modern Python services usually live in asyncio: FastAPI, background workers, async clients, and one shared event loop.
I ran into exactly this problem on an IoT project. I needed an MQTT client that could fit into an async application without a pile of adapters, and I wanted subscriptions to be managed objects rather than callbacks attached to a long-lived client.
FastStream had a similar gap for a long time. MQTT support was requested back in November 2023, but it was not obvious what the underlying driver should be. Wrapping paho-mqtt in an async layer is possible, but the foundation is still a callback/threading model. For a FastStream integration, that felt like the wrong base. So I wrote a separate driver, zmqtt, and then built MQTT support for FastStream on top of it.
My name is Boris Alekseev. I am a senior software engineer and lead at Raiffeisen Bank, one of the FastStream maintainers, and the author of zmqtt. In this article I will show what an MQTT service in Python can look like now: async handlers, managed subscriptions, tests without a real broker, and AsyncAPI documentation. I will also explain why this required a new driver in the first place.
Why start with a new driver?
paho-mqtt is a mature project and the de facto standard MQTT client for Python. It supports the popular MQTT protocol versions: 5.0, 3.1.1, and 3.1. There are async wrappers around paho, but that still was not enough for FastStream.
FastStream does not just need to wait for the next message from the broker. It needs to route a stream of messages between subscribers, manage subscriber lifecycles, and avoid turning every subscription into a separate MQTT connection.
With paho-mqtt, you essentially get two choices:
- Build a complex routing layer on top of somebody else's callback model.
- Create a separate connection per subscription.
The first option is unpleasant to maintain. The second is wasteful enough to be a non-starter.
I also wanted a more convenient API: a subscription object instead of a callback, a built-in request/response implementation, and strongly typed data structures instead of generic containers for MQTT 5.0 properties.
| Criterion | paho-mqtt | zmqtt |
|---|---|---|
| Execution model | Callback API, threading | asyncio |
| Subscriptions |
subscribe() + callbacks |
Subscription as a lifecycle-managed object |
| Request/response | No high-level abstraction out of the box |
request() for MQTT 5.0: subscribes to a reply topic and waits for the response |
| FastStream integration | Needs a separate routing and lifecycle layer | Written specifically as an async foundation for FastStream |
| Maturity | Years in production, the most popular option | New library, fewer than 100 stars :) |
Why FastStream is useful here
FastStream's author, Nikita Pastukhov, described the idea of FastStream as a feature-rich broker client: a layer that adds routing, serialization, validation, observability, documentation, and testing tools so you can write less infrastructure code and focus on application logic.
If zmqtt handles the low-level network interaction with the MQTT protocol, FastStream handles the higher-level application concerns: serialization and deserialization, Dependency Injection, observability and tracing, AsyncAPI documentation, and testing utilities.
Install it like this:
pip install "faststream[mqtt]"
To run the examples locally, start an MQTT broker in a separate terminal:
docker run --rm -p 1883:1883 eclipse-mosquitto:2 \
mosquitto -c /mosquitto-no-auth.conf
A small service
Here is a complete example you can put into main.py and run. The application subscribes to device temperature readings, extracts device_id from the MQTT topic, and validates the payload as a float. After startup, it publishes a test message so you can see the handler in action.
from typing import Annotated
from faststream import FastStream, Path
from faststream.mqtt import MQTTBroker
broker = MQTTBroker()
app = FastStream(broker)
@broker.subscriber("temperature-sensors/{device_id}/temperature")
async def handle_temperature(
temperature: float,
device_id: Annotated[str, Path()],
) -> None:
print(f"{device_id}: {temperature}")
@app.after_startup
async def publish_test_message() -> None:
await broker.publish(
21.5,
"temperature-sensors/thermostat-1/temperature",
)
Run it:
faststream run main:app
FastStream sends an MQTT SUBSCRIBE for the topic filter temperature-sensors/+/temperature. When a message arrives on temperature-sensors/thermostat-1/temperature, the handler receives device_id="thermostat-1" and temperature=21.5.
Runtime features
MQTT versions
MQTT 3.1.1 and MQTT 5.0 are supported. MQTT 3.1 is not supported in zmqtt yet, but it is on the roadmap.
Concurrent message processing
By default, a subscriber processes messages sequentially. If you want to process several messages at the same time, set max_workers.
@broker.subscriber(
"temperature-sensors/{device_id}/temperature",
max_workers=5,
)
async def handle_temperature(
temperature: float,
device_id: Annotated[str, Path()],
) -> None:
print(f"{device_id}: {temperature}")
Shared subscriptions
MQTT shared subscriptions let multiple clients subscribe to the same topic so that each message is delivered to only one client in the group. This is similar to a Kafka consumer group.
@broker.subscriber("workers/jobs/#", shared="pool-a")
async def handle_job(job: dict) -> None:
print(job)
The broker receives a SUBSCRIBE for $share/pool-a/workers/jobs/#.
Acknowledgment
In MQTT, QoS describes delivery between the broker and the client, not whether your business logic successfully processed the message. So the idiomatic MQTT flow usually keeps the protocol-level ack inside the client: receive the packet, acknowledge it, then call application code.
FastStream still lets you control this explicitly. You can send the acknowledgment for QoS 1, or start the protocol-level acknowledgment exchange for QoS 2 after user code has run. Treat this more as lifecycle control than as a recommendation to turn MQTT into a task queue with business-level acknowledgments.
from faststream import AckPolicy, FastStream
from faststream.mqtt import MQTTBroker, MQTTMessage, QoS
broker = MQTTBroker()
app = FastStream(broker)
@broker.subscriber(
"jobs/run",
qos=QoS.AT_LEAST_ONCE,
ack_policy=AckPolicy.MANUAL,
)
async def work(payload: dict, msg: MQTTMessage) -> None:
try:
print(payload)
finally:
await msg.ack()
By default, a message is acknowledged when it is received. For most MQTT use cases, that is the behavior you want.
Publish
You can publish directly through the broker:
await broker.publish(
{"value": 21.5, "unit": "celsius"},
"devices/thermostat-1/rooms/kitchen/temperature",
qos=QoS.AT_LEAST_ONCE,
retain=True,
)
If publishing is part of a handler, it is often cleaner to declare the publisher with a decorator.
@broker.subscriber("devices/{device_id}/commands")
@broker.publisher("devices/events")
async def handle_command(
device_id: Annotated[str, Path()],
command: dict,
) -> dict:
print(f"Command for device {device_id}: {command}")
return {"device_id": device_id, "command": command}
Request/response
In MQTT 5.0, request/response is built on standard PUBLISH properties: Response Topic and Correlation Data. If you do not pass a correlation ID explicitly, FastStream generates one for you.
The core MQTT 5.0 logic lives in zmqtt: the client creates a temporary reply topic, subscribes to it, publishes the request, and waits for the first response message. If no response arrives in time, you get a timeout error.
broker = MQTTBroker(version="5.0")
reply = await broker.request(
{"request_id": "req-42"},
"devices/thermostat-1/status",
timeout=5.0,
)
print(reply.body)
MQTT 3.1.1 does not have those properties, so FastStream supports request/response through an explicit reply_to topic. It works as long as the message handler knows, by convention, where it should send the response:
reply = await broker.request(
"ping",
"devices/thermostat-1/status",
reply_to="devices/replies/thermostat-1",
timeout=5.0,
)
The FastStream broker subscribes to reply_to, publishes the request, and waits for the first response in that topic. Calling request() with MQTT 3.1.1 and no reply_to is an error.
AsyncAPI
Let's return to the original example, install uvicorn, and create an ASGI app from the FastStream application:
pip install uvicorn
app = FastStream(broker).as_asgi(asyncapi_path="/docs")
Run it:
faststream run main:app
Now app is a full ASGI application. Besides working with the broker, it can serve HTTP endpoints. The AsyncAPI specification is available at http://localhost:8000/docs and looks like this:
The UI also includes a Try It Out button. You can simulate sending a message into the application, or, by selecting the relevant checkbox, publish a message directly to the broker. That lets you test a handler without a separate publisher client, very much like Swagger UI.
Testing
FastStream provides an async context manager called TestMQTTBroker for testing applications. When you publish through it, it calls handlers directly, without a real MQTT broker. Inside the context manager, handle_temperature.mock records the same call that the handler receives, which is very convenient in tests.
test_example.py:
import pytest
from faststream.mqtt import MQTTBroker, TestMQTTBroker
broker = MQTTBroker()
@broker.subscriber("devices/{device_name}/temperature")
async def handle_temperature(payload: dict) -> None:
print(payload)
@pytest.mark.asyncio
async def test_collect_temperature() -> None:
async with TestMQTTBroker(broker) as test_broker:
await test_broker.publish(
{"value": 21.5, "unit": "celsius"},
topic="devices/thermostat-1/temperature",
)
handle_temperature.mock.assert_called_once_with({"value": 21.5, "unit": "celsius"})
Run it:
pytest test_example.py
Conclusion
MQTT services in Python can now be written in the same declarative style as FastStream applications for other brokers. The handler describes application logic, while the framework handles subscriptions, validation, tests, and infrastructure integration.
zmqtt remains a standalone library. If you need a small asyncio MQTT client for MQTT 3.1.1 or 5.0 with zero external dependencies, you can use it without FastStream.
If you use MQTT in Python, I would be glad to hear your feedback: how you use MQTT in your projects, and what problems you have run into. And if the project looks useful, please consider starring zmqtt. It helps the project become easier to discover.















