MQTT 数据获取与调试实战:五种方法实时接收设备消息
本文完整介绍从 EMQX Broker 获取 MQTT 数据的五种主流方式,涵盖 Dashboard 工具、命令行、桌面客户端、程序化订阅和生产级数据持久化方案。
目录
- 1. 前置条件
- 2. 方法一:EMQX Dashboard WebSocket 客户端(最便捷)
- 3. 方法二:mosquitto_sub 命令行工具(服务器端首选)
- 4. 方法三:MQTTX 桌面客户端(调试利器)
- 5. 方法四:Python 程序化订阅(生产级数据采集)
- 6. 方法五:EMQX 规则引擎持久化(企业级方案)
- 7. 理解数据格式
- 8. 数据解析实战
- 9. 查看设备活动状态
- 10. 故障排查指南
- 11. 方法选型总结
1. 前置条件
在开始数据获取之前,请确认以下条件已满足:
| 检查项 | 验证方法 |
|---|---|
| ✅ EMQX 已安装并运行 | sudo systemctl status emqx → active (running) |
| ✅ 已配置客户端认证 | Dashboard → 访问控制 → 认证 → 确认认证器已创建 |
| ✅ 设备已连接并发布数据 | Dashboard → 客户端 → 确认设备在线 |
| ✅ 已知目标主题 | 设备配置中的 Data Topic(如 data/test) |
如果你尚未完成 EMQX 部署,请先参考 EMQX 详细安装指南。
2. 方法一:EMQX Dashboard WebSocket 客户端(最便捷)
适用场景
- 快速验证设备是否在正常发布数据
- 无需安装任何额外工具
- 调试阶段的数据查看
操作步骤
- 登录 EMQX Dashboard:浏览器访问
http://<服务器公网IP>:18083 - 左侧菜单 → 问题诊断 → WebSocket 客户端
- 在 WebSocket 客户端页面中:
- 连接配置:
- Host:默认
127.0.0.1(Dashboard 所在服务器即为 EMQX,无需修改) - Port:默认
8083(MQTT over WebSocket 端口) - 路径:
/mqtt - Username / Password:填写已创建的认证用户凭据
- Host:默认
- 点击 连接 按钮
- 连接配置:
- 连接成功后,在 订阅 区域:
- Topic:输入你的数据主题,如
data/test - QoS:选择
1 - 点击 订阅
- Topic:输入你的数据主题,如
- 消息将实时显示在下方的消息列表中
┌──────────────────────────────────────────────────┐
│ EMQX Dashboard - WebSocket 客户端 │
│ │
│ 连接状态: ✅ 已连接 [断开] │
│ ──────────────────────────────────────────────── │
│ 订阅列表: │
│ Topic: data/test QoS: 1 [取消订阅] │
│ ──────────────────────────────────────────────── │
│ 消息列表: │
│ ┌─────────────────────────────────────────────┐ │
│ │ 14:30:01 [data/test] │ │
│ │ {"d":[{"tag":"Temp","value":25.3,"quality":0}]│ │
│ │ ,"ts":"2026-06-29T06:30:01+0000"} │ │
│ ├─────────────────────────────────────────────┤ │
│ │ 14:30:11 [data/test] │ │
│ │ {"d":[{"tag":"Temp","value":25.5,"quality":0}]│ │
│ │ ,"ts":"2026-06-29T06:30:11+0000"} │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 发布消息: [发布] │
│ Topic: [ ] QoS: [1▼] │
│ Payload: [ ] │
└──────────────────────────────────────────────────┘
重要说明
- 端口 8083 是 MQTT over WebSocket 端口,与 Dashboard 的 18083 不同(18083 是 HTTP 管理后台端口)
- Dashboard WebSocket 客户端默认连接
127.0.0.1:8083,这意味着它通过服务器本地回环地址连接,无需经过公网 - 如果你想从本地电脑通过 WebSocket 连接 EMQX(而不通过 Dashboard),需要在安全组中放行 8083 端口,并使用公网 IP
3. 方法二:mosquitto_sub 命令行工具(服务器端首选)
适用场景
- 在服务器端快速验证
- 自动化脚本和管道处理
- 轻量级,无需图形界面
3.1 安装 mosquitto-clients
bash
# CentOS / RHEL
sudo yum install epel-release -y
sudo yum install mosquitto -y
# Ubuntu / Debian
sudo apt update
sudo apt install mosquitto-clients -y
# 验证安装
mosquitto_sub --version
3.2 订阅命令详解
bash
# 基本订阅(需替换为你的实际凭据和主题)
mosquitto_sub \
-h 127.0.0.1 \ # Broker 地址,服务器端用 127.0.0.1
-p 1883 \ # MQTT 端口
-u <用户名> \ # 认证用户名
-P <密码> \ # 认证密码
-t "data/test" \ # 订阅主题(支持通配符 data/#)
-q 1 \ # QoS 等级
-v # 输出格式:主题 + 消息内容
各参数说明:
| 参数 | 含义 | 示例 |
|---|---|---|
-h | Broker 地址 | 127.0.0.1(本机)或 123.56.133.131(公网) |
-p | 端口 | 1883(标准 MQTT),8883(TLS) |
-u | 用户名 | EMQX 认证器中创建的用户 |
-P | 密码 | 对应的密码 |
-t | 主题 | data/test,data/# |
-q | QoS | 0 / 1 / 2 |
-v | 详细输出 | 显示”主题 消息内容” |
-i | Client ID | 自定义客户端标识 |
-d | 调试模式 | 打印协议交互细节 |
-c | 禁用 Clean Session | 断线重连后接收离线消息 |
3.3 实用技巧
bash
# 1. 将收到的消息保存到日志文件
mosquitto_sub -h 127.0.0.1 -p 1883 -u <用户> -P <密码> \
-t "data/#" -q 1 -v | tee -a mqtt_data.log
# 2. 带时间戳的订阅(每条消息前加时间)
mosquitto_sub -h 127.0.0.1 -p 1883 -u <用户> -P <密码> \
-t "data/#" -q 1 -v | while read line; do
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $line"
done
# 3. 过滤特定内容(结合 jq 解析 JSON)
mosquitto_sub -h 127.0.0.1 -p 1883 -u <用户> -P <密码> \
-t "data/test" -q 1 | while read msg; do
echo "$msg" | jq '.d[0].value'
done
# 4. 同时订阅多个主题
mosquitto_sub -h 127.0.0.1 -p 1883 -u <用户> -P <密码> \
-t "data/test" -t "cmd/test" -t "status/#" -q 1 -v
3.4 发布测试消息
bash
# 手动发布一条消息,用于测试订阅端是否能收到
mosquitto_pub -h 127.0.0.1 -p 1883 -u <用户> -P <密码> \
-t "data/test" \
-m '{"d":[{"tag":"TestTag","value":42.0,"quality":0}],"ts":"2026-06-29T06:00:00+0000"}' \
-q 1
4. 方法三:MQTTX 桌面客户端(调试利器)
适用场景
- 图形化调试,直观查看消息
- 同时管理多个 Broker 连接
- 进行主题测试和消息模拟
4.1 下载安装
- 官网下载:https://mqttx.app/(支持 Windows / macOS / Linux)
- 同时提供命令行版本 MQTTX CLI,适合脚本环境
4.2 创建新连接
- 打开 MQTTX,点击左侧 + 新建连接
- 填写连接参数:
| 参数 | 值 | 说明 |
|---|---|---|
| Name | My_Cloud_Broker | 自定义名称,仅用于识别 |
| Host | mqtt:// + 服务器公网IP | 示例:mqtt://123.56.133.131 |
| Port | 1883 | 标准 MQTT;TLS 用 8883 |
| Username | 你的认证用户名 | 与 EMQX 中创建的一致 |
| Password | 你的认证密码 | 与 EMQX 中创建的一致 |
| Client ID | 自动生成或自定义 | 推荐自定义,便于在 Dashboard 中识别 |
| MQTT Version | 5.0 或 3.1.1 | 根据需求选择 |
- 点击右上角 连接
4.3 订阅主题
连接成功后,在 添加订阅 区域:
- Topic:
data/test(或data/#匹配所有数据主题) - QoS:
1 - 点击 确认
4.4 MQTTX CLI 命令行方式
bash
# 安装(需 Node.js 环境)
npm install -g mqttx
# 订阅
mqttx sub -h 123.56.133.131 -p 1883 \
-u <用户名> -P <密码> \
-t "data/test" -q 1
# 发布测试消息
mqttx pub -h 123.56.133.131 -p 1883 \
-u <用户名> -P <密码> \
-t "data/test" \
-m '{"d":[{"tag":"TestTag","value":25.3,"quality":0}]}' \
-q 1
5. 方法四:Python 程序化订阅(生产级数据采集)
适用场景
- 需要将 MQTT 数据写入数据库
- 实时数据处理与分析
- 触发告警、联动控制等业务逻辑
5.1 安装依赖
bash
pip install paho-mqtt
5.2 基础订阅脚本
python
#!/usr/bin/env python3
"""
MQTT 数据订阅与采集脚本
功能:连接 EMQX Broker,订阅指定主题,实时接收并处理消息
"""
import json
import logging
import signal
import sys
from datetime import datetime
import paho.mqtt.client as mqtt
# ============ 配置区(请替换为实际值) ============
BROKER_HOST = "123.56.133.131" # 服务器公网 IP
BROKER_PORT = 1883 # MQTT 端口
USERNAME = "your_username" # EMQX 认证用户名
PASSWORD = "your_password" # EMQX 认证密码
CLIENT_ID = "python_subscriber_001"
TOPICS = [
("data/#", 1), # 订阅所有数据主题,QoS 1
]
# =================================================
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
# 优雅退出标志
running = True
def signal_handler(sig, frame):
"""处理 Ctrl+C 信号,优雅退出"""
global running
logger.info("收到退出信号,正在断开连接...")
running = False
def on_connect(client: mqtt.Client, userdata, flags, reason_code, properties=None):
"""
连接回调
reason_code 含义参考:
0: 连接成功
4: 用户名或密码错误
5: 未授权
"""
if reason_code == 0:
logger.info(f"成功连接到 Broker: {BROKER_HOST}:{BROKER_PORT}")
# 订阅主题
for topic, qos in TOPICS:
result, mid = client.subscribe(topic, qos)
if result == mqtt.MQTT_ERR_SUCCESS:
logger.info(f"已订阅: {topic} (QoS {qos})")
else:
logger.error(f"订阅失败: {topic} (错误码: {result})")
else:
reason_map = {
1: "协议版本不支持",
2: "Client ID 被拒绝",
3: "服务不可用",
4: "用户名或密码错误",
5: "未授权",
}
msg = reason_map.get(reason_code, f"未知错误 ({reason_code})")
logger.error(f"连接失败: {msg}")
def on_disconnect(client: mqtt.Client, userdata, reason_code, properties=None):
"""断开连接回调"""
if reason_code != 0:
logger.warning(f"意外断开连接 (原因码: {reason_code}),将自动重连...")
else:
logger.info("已正常断开连接")
def on_message(client: mqtt.Client, userdata, msg: mqtt.MQTTMessage):
"""接收消息回调"""
try:
payload_str = msg.payload.decode("utf-8")
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
# 尝试解析 JSON
try:
data = json.loads(payload_str)
logger.info(f"[{timestamp}] Topic: {msg.topic} | QoS: {msg.qos}")
logger.info(f" Data: {json.dumps(data, ensure_ascii=False, indent=2)}")
# ===== 在此处添加你的业务逻辑 =====
process_message(msg.topic, data)
# =================================
except json.JSONDecodeError:
logger.info(f"[{timestamp}] Topic: {msg.topic} | Payload (非JSON): {payload_str}")
except Exception as e:
logger.error(f"消息处理异常: {e}", exc_info=True)
def process_message(topic: str, data: dict):
"""
自定义消息处理逻辑
你可以在这里实现:写入数据库、触发告警、转发到其他系统等
"""
# 示例:提取传感器数值
if "d" in data:
for item in data["d"]:
tag_name = item.get("tag", "unknown")
value = item.get("value")
quality = item.get("quality", -1)
logger.info(f" → 传感器: {tag_name} = {value} (质量: {quality})")
def main():
"""主函数"""
# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# 创建 MQTT 客户端
client = mqtt.Client(
client_id=CLIENT_ID,
protocol=mqtt.MQTTv5, # 使用 MQTT 5.0
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
# 设置回调函数
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_message = on_message
# 设置认证凭据
client.username_pw_set(USERNAME, PASSWORD)
# 可选:设置遗嘱消息
client.will_set(
topic=f"status/{CLIENT_ID}",
payload="offline",
qos=1,
retain=True,
)
# 启用自动重连
client.reconnect_delay_set(min_delay=1, max_delay=30)
try:
logger.info(f"正在连接 {BROKER_HOST}:{BROKER_PORT}...")
client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
# 启动网络循环(非阻塞)
client.loop_start()
# 主循环保持运行
while running:
client.loop(timeout=1.0)
except ConnectionRefusedError:
logger.error(
"连接被拒绝!请检查:\n"
" 1. Broker 地址和端口是否正确\n"
" 2. 安全组/防火墙是否已放行端口\n"
" 3. 用户名和密码是否正确"
)
sys.exit(1)
except Exception as e:
logger.error(f"连接异常: {e}", exc_info=True)
sys.exit(1)
finally:
client.loop_stop()
client.disconnect()
logger.info("脚本已退出")
if __name__ == "__main__":
main()
5.3 扩展:写入数据库示例
python
# 在上述 process_message 函数中,可以添加以下逻辑:
import sqlite3 # 或使用 pymysql / psycopg2 / influxdb_client 等
DB_PATH = "mqtt_data.db"
def init_database():
"""初始化 SQLite 数据库"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS sensor_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
topic TEXT NOT NULL,
tag_name TEXT NOT NULL,
value REAL,
quality INTEGER,
device_ts TEXT,
received_at TEXT DEFAULT (datetime('now'))
)
""")
conn.commit()
return conn
def save_to_database(conn, topic, tag_name, value, quality, device_ts):
"""保存数据到数据库"""
cursor = conn.cursor()
cursor.execute(
"INSERT INTO sensor_data (topic, tag_name, value, quality, device_ts) VALUES (?, ?, ?, ?, ?)",
(topic, tag_name, value, quality, device_ts),
)
conn.commit()
6. 方法五:EMQX 规则引擎持久化(企业级方案)
适用场景
- 生产环境,需要可靠的数据持久化
- 数据量大,需要写入时序数据库(InfluxDB、TimescaleDB)
- 需要将数据桥接到 Kafka、RabbitMQ、MySQL 等外部系统
6.1 配置步骤(以 MySQL 为例)
- Dashboard → 数据集成 → 规则
- 点击 创建,填写规则 SQL:
sql
SELECT
payload.d as data_items,
payload.ts as device_timestamp,
topic,
clientid,
timestamp
FROM
"data/#"
- 点击 创建动作,选择 MySQL 作为数据桥接目标
- 填写 MySQL 连接信息(地址、端口、数据库、用户名、密码)
- 配置 SQL 模板(写入语句):
sql
INSERT INTO mqtt_data(topic, tag_name, value, quality, device_ts, client_id)
VALUES (
${topic},
${data_items[0].tag},
${data_items[0].value},
${data_items[0].quality},
${device_timestamp},
${clientid}
)
- 点击 创建 完成规则配置
规则引擎的优势在于:数据从设备 → Broker → 数据库的整条链路在 EMQX 内部完成,无需额外编写代码,可靠性高,性能好。
7. 理解数据格式
7.1 SimpleMQTT 默认格式
EdgeLink 的 SimpleMQTT 模块默认使用 Simple Payload Type,JSON 格式如下:
json
{
"d": [
{
"tag": "Tag点名称1",
"value": 25.3,
"quality": 0
},
{
"tag": "Tag点名称2",
"value": 68.5,
"quality": 0
}
],
"ts": "2026-06-29T06:30:01+0000"
}
7.2 字段详解
| 字段 | 类型 | 说明 |
|---|---|---|
d | Array | 数据点数组,包含一个或多个 Tag 的数据 |
d[].tag | String | Tag 点名称,在 Edgelink Studio 中定义 |
d[].value | Number | 当前数值(可能为整数或浮点数) |
d[].quality | Integer | 数据质量标志:0 = 正常,非 0 = 异常(具体含义取决于设备) |
ts | String | ISO 8601 时间戳(UTC 时区) |
7.3 其他 Payload Type 格式
| Payload Type | 格式特点 | 示例 |
|---|---|---|
| Simple | 标准 JSON,字段清晰 | {"d":[...], "ts":"..."} |
| Simple with quality | 与 Simple 类似,quality 字段更详细 | {"d":[...], "ts":"..."} |
| Compact | 减少字段名,节省带宽 | {"v":[25.3], "t":"..."}(取决于设备实现) |
8. 数据解析实战
8.1 时间戳转换
数据中的 ts 字段是 UTC 时间,需转换为本地时区:
python
from datetime import datetime, timezone, timedelta
# 原始 UTC 时间戳
ts_str = "2026-06-29T06:30:01+0000"
# 解析为 datetime 对象
utc_time = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%S%z")
# 转换为北京时间(UTC+8)
beijing_tz = timezone(timedelta(hours=8))
beijing_time = utc_time.astimezone(beijing_tz)
print(f"UTC: {utc_time}")
print(f"北京: {beijing_time.strftime('%Y-%m-%d %H:%M:%S')}")
# 输出:北京: 2026-06-29 14:30:01
8.2 异常数据过滤
python
def filter_valid_data(data: dict) -> list:
"""
过滤异常数据,仅返回 quality == 0 的"好"数据
"""
if "d" not in data:
return []
return [item for item in data["d"] if item.get("quality") == 0]
def detect_value_anomaly(value: float, min_val: float, max_val: float) -> bool:
"""
检测数值是否超出合理范围
示例:温度传感器合理范围 -40°C ~ 85°C
"""
return value < min_val or value > max_val
9. 查看设备活动状态
除了订阅消息,你还可以在 EMQX Dashboard 中直接查看设备的连接和活动信息:
9.1 查看在线客户端
Dashboard → 客户端 → 可看到所有当前连接的客户端列表,包括:
- Client ID
- 用户名
- IP 地址
- 连接时间
- Keep Alive 间隔
9.2 查看客户端详情
点击某个 Client ID,进入详情页:
| 标签页 | 可查看内容 |
|---|---|
| 基本信息 | 协议版本、连接状态、IP 地址、心跳间隔 |
| 订阅列表 | 该客户端当前订阅的所有主题及 QoS |
| 消息统计 | 发送/接收消息数量、字节数 |
| 会话 | 会话创建时间、过期时间、消息队列长度 |
9.3 实时监控面板
Dashboard → 监控 提供了全局概览:
- 连接数实时曲线
- 消息流入/流出速率(条/秒)
- 主题数和订阅数
10. 故障排查指南
10.1 收不到任何数据
按以下顺序逐项排查:
┌─ ① 设备是否在线? ─────────────────────────────────────┐
│ Dashboard → 客户端 → 确认设备 Client ID 在列表中, │
│ 状态为 "已连接" │
└────────────────────────────────────────────────────────┘
│
┌──── online? ────┐
│ YES │ NO
▼ ▼
┌─ ② 主题是否匹配? ───┐ ┌─ ③ 检查设备连接 ─────────────┐
│ 确认订阅的主题 │ │ A. 设备是否已配置 MQTT 连接? │
│ 与设备发布的主题 │ │ B. Host/Port 是否正确? │
│ 完全一致 │ │ C. 用户名密码是否正确? │
└────────────────────┘ │ D. 查看设备系统日志 │
│ └──────────────────────────────┘
┌─ YES ─┴── NO ──────────────────────────────────────┐
│ │ 使用通配符 # 订阅所有主题 │
│ │ 或检查设备配置中的 Data Topic 实际值 │
▼ ▼ │
┌─ ④ 数据是否在发送? ──────────────────────────────────┐
│ A. 检查设备的"定期上传"是否已启用(周期是多少) │
│ B. 检查 Tag 列表是否为空 │
│ C. 如果 Tag 值一直不变且未开启"变化上传",不会发送 │
│ D. 在 Dashboard 客户端详情页查看消息统计 │
└──────────────────────────────────────────────────────┘
│
┌─ YES ─┴── NO ──────────────────────────────────────┐
│ │ 确认设备 Tag 点配置正确 │
▼ │ 尝试重启设备或 MQTT 连接 │
┌─ ⑤ 认证/授权检查 ────────────────────────────────────┐
│ A. 确认 EMQX 中已创建认证器,且用户名密码正确 │
│ B. 如果配置了 ACL,检查是否允许该用户订阅该主题 │
│ C. 查看 EMQX 日志: │
│ grep -i 'auth\|denied' /var/log/emqx/emqx.log.* │
└──────────────────────────────────────────────────────┘
10.2 数据格式不一致
| 现象 | 可能原因 | 解决方法 |
|---|---|---|
| JSON 解析失败 | Payload Type 不是 Simple | 检查 Edgelink 配置中的 Payload Type 设置 |
| 缺少某些字段 | 设备版本差异 | 代码中做缺失字段的兼容处理 |
| 中文乱码 | 编码问题 | 确保使用 UTF-8 解码:payload.decode("utf-8") |
| 时间戳格式不同 | 设备 / 固件差异 | 使用 Python dateutil.parser.parse() 兼容解析 |
10.3 消息延迟大
- 检查网络延迟:
ping 服务器公网IP - 检查 Keep Alive 设置是否过小导致频繁重连
- 查看 Dashboard 监控面板的消息积压情况
- 检查服务器 CPU / 内存负载
10.4 消息丢失
- 确认发布和订阅的 QoS 等级(QoS 0 可能丢消息)
- 检查设备是否频繁断线重连(Clean Start 设置可能丢弃旧消息)
- 检查 EMQX 会话消息队列是否已满(默认 1000 条/客户端)
- 使用
mosquitto_sub -d开启调试模式查看协议细节
11. 方法选型总结
| 方法 | 难度 | 适用阶段 | 核心优势 | 局限性 |
|---|---|---|---|---|
| Dashboard WebSocket | ⭐ | 调试 | 零安装,即开即用 | 手动操作,无法自动化 |
| mosquitto_sub | ⭐ | 调试 | 命令行,可脚本化 | 需 SSH 登录服务器 |
| MQTTX 桌面版 | ⭐ | 调试 | 图形化,直观 | 需安装客户端 |
| Python 程序化 | ⭐⭐ | 开发/生产 | 灵活,可集成业务逻辑 | 需编写和维护代码 |
| EMQX 规则引擎 | ⭐⭐⭐ | 生产 | 高性能,免代码 | 仅限 EMQX 支持的数据源 |
推荐路径:先用 Dashboard WebSocket 快速验证 → 用 Python 脚本开发数据采集逻辑 → 生产环境切换到 EMQX 规则引擎实现高可靠数据持久化。
下一步:掌握数据获取后,可以深入学习 MQTT 协议核心概念详解,理解消息传递背后的原理,更好地设计你的 IoT 系统架构。
