MQTT数据获取与调试实战
MQTT数据获取与调试实战

MQTT数据获取与调试实战

MQTT 数据获取与调试实战:五种方法实时接收设备消息

本文完整介绍从 EMQX Broker 获取 MQTT 数据的五种主流方式,涵盖 Dashboard 工具、命令行、桌面客户端、程序化订阅和生产级数据持久化方案。


目录

  • 1. 前置条件
  • 2. 方法一:EMQX Dashboard WebSocket 客户端(最便捷)
  • 3. 方法二:mosquitto_sub 命令行工具(服务器端首选)
  • 4. 方法三:MQTTX 桌面客户端(调试利器)
  • 5. 方法四:Python 程序化订阅(生产级数据采集)
  • 6. 方法五:EMQX 规则引擎持久化(企业级方案)
  • 7. 理解数据格式
  • 8. 数据解析实战
  • 9. 查看设备活动状态
  • 10. 故障排查指南
  • 11. 方法选型总结

1. 前置条件

在开始数据获取之前,请确认以下条件已满足:

检查项验证方法
✅ EMQX 已安装并运行sudo systemctl status emqx → active (running)
✅ 已配置客户端认证Dashboard → 访问控制 → 认证 → 确认认证器已创建
✅ 设备已连接并发布数据Dashboard → 客户端 → 确认设备在线
✅ 已知目标主题设备配置中的 Data Topic(如 data/test

如果你尚未完成 EMQX 部署,请先参考 EMQX 详细安装指南。


2. 方法一:EMQX Dashboard WebSocket 客户端(最便捷)

适用场景

  • 快速验证设备是否在正常发布数据
  • 无需安装任何额外工具
  • 调试阶段的数据查看

操作步骤

  1. 登录 EMQX Dashboard:浏览器访问 http://<服务器公网IP>:18083
  2. 左侧菜单 → 问题诊断 → WebSocket 客户端
  3. 在 WebSocket 客户端页面中:
    • 连接配置
      • Host:默认 127.0.0.1(Dashboard 所在服务器即为 EMQX,无需修改)
      • Port:默认 8083(MQTT over WebSocket 端口)
      • 路径:/mqtt
      • Username / Password:填写已创建的认证用户凭据
    • 点击 连接 按钮
  4. 连接成功后,在 订阅 区域:
    • Topic:输入你的数据主题,如 data/test
    • QoS:选择 1
    • 点击 订阅
  5. 消息将实时显示在下方的消息列表中
┌──────────────────────────────────────────────────┐
│  EMQX Dashboard - WebSocket 客户端                │
│                                                    │
│  连接状态: ✅ 已连接                      [断开]    │
│  ──────────────────────────────────────────────── │
│  订阅列表:                                         │
│  Topic: data/test    QoS: 1              [取消订阅] │
│  ──────────────────────────────────────────────── │
│  消息列表:                                         │
│  ┌─────────────────────────────────────────────┐  │
│  │ 14:30:01 [data/test]                        │  │
│  │ {"d":[{"tag":"Temp","value":25.3,"quality":0}]│  │
│  │ ,"ts":"2026-06-29T06:30:01+0000"}           │  │
│  ├─────────────────────────────────────────────┤  │
│  │ 14:30:11 [data/test]                        │  │
│  │ {"d":[{"tag":"Temp","value":25.5,"quality":0}]│  │
│  │ ,"ts":"2026-06-29T06:30:11+0000"}           │  │
│  └─────────────────────────────────────────────┘  │
│                                                    │
│  发布消息:                        [发布]            │
│  Topic: [              ]  QoS: [1▼]               │
│  Payload: [                              ]         │
└──────────────────────────────────────────────────┘

重要说明

  • 端口 8083 是 MQTT over WebSocket 端口,与 Dashboard 的 18083 不同(18083 是 HTTP 管理后台端口)
  • Dashboard WebSocket 客户端默认连接 127.0.0.1:8083,这意味着它通过服务器本地回环地址连接,无需经过公网
  • 如果你想从本地电脑通过 WebSocket 连接 EMQX(而不通过 Dashboard),需要在安全组中放行 8083 端口,并使用公网 IP

3. 方法二:mosquitto_sub 命令行工具(服务器端首选)

适用场景

  • 在服务器端快速验证
  • 自动化脚本和管道处理
  • 轻量级,无需图形界面

3.1 安装 mosquitto-clients

bash

# CentOS / RHEL
sudo yum install epel-release -y
sudo yum install mosquitto -y

# Ubuntu / Debian
sudo apt update
sudo apt install mosquitto-clients -y

# 验证安装
mosquitto_sub --version

3.2 订阅命令详解

bash

# 基本订阅(需替换为你的实际凭据和主题)
mosquitto_sub \
  -h 127.0.0.1 \        # Broker 地址,服务器端用 127.0.0.1
  -p 1883 \             # MQTT 端口
  -u <用户名> \          # 认证用户名
  -P <密码> \            # 认证密码
  -t "data/test" \       # 订阅主题(支持通配符 data/#)
  -q 1 \                # QoS 等级
  -v                    # 输出格式:主题 + 消息内容

各参数说明:

参数含义示例
-hBroker 地址127.0.0.1(本机)或 123.56.133.131(公网)
-p端口1883(标准 MQTT),8883(TLS)
-u用户名EMQX 认证器中创建的用户
-P密码对应的密码
-t主题data/testdata/#
-qQoS0 / 1 / 2
-v详细输出显示”主题 消息内容”
-iClient ID自定义客户端标识
-d调试模式打印协议交互细节
-c禁用 Clean Session断线重连后接收离线消息

3.3 实用技巧

bash

# 1. 将收到的消息保存到日志文件
mosquitto_sub -h 127.0.0.1 -p 1883 -u <用户> -P <密码> \
  -t "data/#" -q 1 -v | tee -a mqtt_data.log

# 2. 带时间戳的订阅(每条消息前加时间)
mosquitto_sub -h 127.0.0.1 -p 1883 -u <用户> -P <密码> \
  -t "data/#" -q 1 -v | while read line; do
  echo "[$(date '+%Y-%m-%d %H:%M:%S')] $line"
done

# 3. 过滤特定内容(结合 jq 解析 JSON)
mosquitto_sub -h 127.0.0.1 -p 1883 -u <用户> -P <密码> \
  -t "data/test" -q 1 | while read msg; do
  echo "$msg" | jq '.d[0].value'
done

# 4. 同时订阅多个主题
mosquitto_sub -h 127.0.0.1 -p 1883 -u <用户> -P <密码> \
  -t "data/test" -t "cmd/test" -t "status/#" -q 1 -v

3.4 发布测试消息

bash

# 手动发布一条消息,用于测试订阅端是否能收到
mosquitto_pub -h 127.0.0.1 -p 1883 -u <用户> -P <密码> \
  -t "data/test" \
  -m '{"d":[{"tag":"TestTag","value":42.0,"quality":0}],"ts":"2026-06-29T06:00:00+0000"}' \
  -q 1

4. 方法三:MQTTX 桌面客户端(调试利器)

适用场景

  • 图形化调试,直观查看消息
  • 同时管理多个 Broker 连接
  • 进行主题测试和消息模拟

4.1 下载安装

4.2 创建新连接

  1. 打开 MQTTX,点击左侧 + 新建连接
  2. 填写连接参数:
参数说明
NameMy_Cloud_Broker自定义名称,仅用于识别
Hostmqtt:// + 服务器公网IP示例:mqtt://123.56.133.131
Port1883标准 MQTT;TLS 用 8883
Username你的认证用户名与 EMQX 中创建的一致
Password你的认证密码与 EMQX 中创建的一致
Client ID自动生成或自定义推荐自定义,便于在 Dashboard 中识别
MQTT Version5.0 或 3.1.1根据需求选择
  1. 点击右上角 连接

4.3 订阅主题

连接成功后,在 添加订阅 区域:

  • Topicdata/test(或 data/# 匹配所有数据主题)
  • QoS1
  • 点击 确认

4.4 MQTTX CLI 命令行方式

bash

# 安装(需 Node.js 环境)
npm install -g mqttx

# 订阅
mqttx sub -h 123.56.133.131 -p 1883 \
  -u <用户名> -P <密码> \
  -t "data/test" -q 1

# 发布测试消息
mqttx pub -h 123.56.133.131 -p 1883 \
  -u <用户名> -P <密码> \
  -t "data/test" \
  -m '{"d":[{"tag":"TestTag","value":25.3,"quality":0}]}' \
  -q 1

5. 方法四:Python 程序化订阅(生产级数据采集)

适用场景

  • 需要将 MQTT 数据写入数据库
  • 实时数据处理与分析
  • 触发告警、联动控制等业务逻辑

5.1 安装依赖

bash

pip install paho-mqtt

5.2 基础订阅脚本

python

#!/usr/bin/env python3
"""
MQTT 数据订阅与采集脚本
功能:连接 EMQX Broker,订阅指定主题,实时接收并处理消息
"""

import json
import logging
import signal
import sys
from datetime import datetime

import paho.mqtt.client as mqtt

# ============ 配置区(请替换为实际值) ============
BROKER_HOST = "123.56.133.131"   # 服务器公网 IP
BROKER_PORT = 1883                # MQTT 端口
USERNAME = "your_username"        # EMQX 认证用户名
PASSWORD = "your_password"        # EMQX 认证密码
CLIENT_ID = "python_subscriber_001"
TOPICS = [
    ("data/#", 1),                # 订阅所有数据主题,QoS 1
]
# =================================================

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# 优雅退出标志
running = True


def signal_handler(sig, frame):
    """处理 Ctrl+C 信号,优雅退出"""
    global running
    logger.info("收到退出信号,正在断开连接...")
    running = False


def on_connect(client: mqtt.Client, userdata, flags, reason_code, properties=None):
    """
    连接回调
    reason_code 含义参考:
      0: 连接成功
      4: 用户名或密码错误
      5: 未授权
    """
    if reason_code == 0:
        logger.info(f"成功连接到 Broker: {BROKER_HOST}:{BROKER_PORT}")
        # 订阅主题
        for topic, qos in TOPICS:
            result, mid = client.subscribe(topic, qos)
            if result == mqtt.MQTT_ERR_SUCCESS:
                logger.info(f"已订阅: {topic} (QoS {qos})")
            else:
                logger.error(f"订阅失败: {topic} (错误码: {result})")
    else:
        reason_map = {
            1: "协议版本不支持",
            2: "Client ID 被拒绝",
            3: "服务不可用",
            4: "用户名或密码错误",
            5: "未授权",
        }
        msg = reason_map.get(reason_code, f"未知错误 ({reason_code})")
        logger.error(f"连接失败: {msg}")


def on_disconnect(client: mqtt.Client, userdata, reason_code, properties=None):
    """断开连接回调"""
    if reason_code != 0:
        logger.warning(f"意外断开连接 (原因码: {reason_code}),将自动重连...")
    else:
        logger.info("已正常断开连接")


def on_message(client: mqtt.Client, userdata, msg: mqtt.MQTTMessage):
    """接收消息回调"""
    try:
        payload_str = msg.payload.decode("utf-8")
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]

        # 尝试解析 JSON
        try:
            data = json.loads(payload_str)
            logger.info(f"[{timestamp}] Topic: {msg.topic} | QoS: {msg.qos}")
            logger.info(f"  Data: {json.dumps(data, ensure_ascii=False, indent=2)}")

            # ===== 在此处添加你的业务逻辑 =====
            process_message(msg.topic, data)
            # =================================

        except json.JSONDecodeError:
            logger.info(f"[{timestamp}] Topic: {msg.topic} | Payload (非JSON): {payload_str}")

    except Exception as e:
        logger.error(f"消息处理异常: {e}", exc_info=True)


def process_message(topic: str, data: dict):
    """
    自定义消息处理逻辑
    你可以在这里实现:写入数据库、触发告警、转发到其他系统等
    """
    # 示例:提取传感器数值
    if "d" in data:
        for item in data["d"]:
            tag_name = item.get("tag", "unknown")
            value = item.get("value")
            quality = item.get("quality", -1)
            logger.info(f"  → 传感器: {tag_name} = {value} (质量: {quality})")


def main():
    """主函数"""
    # 注册信号处理器
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # 创建 MQTT 客户端
    client = mqtt.Client(
        client_id=CLIENT_ID,
        protocol=mqtt.MQTTv5,       # 使用 MQTT 5.0
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    )

    # 设置回调函数
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    # 设置认证凭据
    client.username_pw_set(USERNAME, PASSWORD)

    # 可选:设置遗嘱消息
    client.will_set(
        topic=f"status/{CLIENT_ID}",
        payload="offline",
        qos=1,
        retain=True,
    )

    # 启用自动重连
    client.reconnect_delay_set(min_delay=1, max_delay=30)

    try:
        logger.info(f"正在连接 {BROKER_HOST}:{BROKER_PORT}...")
        client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)

        # 启动网络循环(非阻塞)
        client.loop_start()

        # 主循环保持运行
        while running:
            client.loop(timeout=1.0)

    except ConnectionRefusedError:
        logger.error(
            "连接被拒绝!请检查:\n"
            "  1. Broker 地址和端口是否正确\n"
            "  2. 安全组/防火墙是否已放行端口\n"
            "  3. 用户名和密码是否正确"
        )
        sys.exit(1)
    except Exception as e:
        logger.error(f"连接异常: {e}", exc_info=True)
        sys.exit(1)
    finally:
        client.loop_stop()
        client.disconnect()
        logger.info("脚本已退出")


if __name__ == "__main__":
    main()

5.3 扩展:写入数据库示例

python

# 在上述 process_message 函数中,可以添加以下逻辑:

import sqlite3  # 或使用 pymysql / psycopg2 / influxdb_client 等

DB_PATH = "mqtt_data.db"

def init_database():
    """初始化 SQLite 数据库"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS sensor_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            topic TEXT NOT NULL,
            tag_name TEXT NOT NULL,
            value REAL,
            quality INTEGER,
            device_ts TEXT,
            received_at TEXT DEFAULT (datetime('now'))
        )
    """)
    conn.commit()
    return conn


def save_to_database(conn, topic, tag_name, value, quality, device_ts):
    """保存数据到数据库"""
    cursor = conn.cursor()
    cursor.execute(
        "INSERT INTO sensor_data (topic, tag_name, value, quality, device_ts) VALUES (?, ?, ?, ?, ?)",
        (topic, tag_name, value, quality, device_ts),
    )
    conn.commit()

6. 方法五:EMQX 规则引擎持久化(企业级方案)

适用场景

  • 生产环境,需要可靠的数据持久化
  • 数据量大,需要写入时序数据库(InfluxDB、TimescaleDB)
  • 需要将数据桥接到 Kafka、RabbitMQ、MySQL 等外部系统

6.1 配置步骤(以 MySQL 为例)

  1. Dashboard → 数据集成 → 规则
  2. 点击 创建,填写规则 SQL:

sql

SELECT
  payload.d as data_items,
  payload.ts as device_timestamp,
  topic,
  clientid,
  timestamp
FROM
  "data/#"
  1. 点击 创建动作,选择 MySQL 作为数据桥接目标
  2. 填写 MySQL 连接信息(地址、端口、数据库、用户名、密码)
  3. 配置 SQL 模板(写入语句):

sql

INSERT INTO mqtt_data(topic, tag_name, value, quality, device_ts, client_id)
VALUES (
  ${topic},
  ${data_items[0].tag},
  ${data_items[0].value},
  ${data_items[0].quality},
  ${device_timestamp},
  ${clientid}
)
  1. 点击 创建 完成规则配置

规则引擎的优势在于:数据从设备 → Broker → 数据库的整条链路在 EMQX 内部完成,无需额外编写代码,可靠性高,性能好。


7. 理解数据格式

7.1 SimpleMQTT 默认格式

EdgeLink 的 SimpleMQTT 模块默认使用 Simple Payload Type,JSON 格式如下:

json

{
    "d": [
        {
            "tag": "Tag点名称1",
            "value": 25.3,
            "quality": 0
        },
        {
            "tag": "Tag点名称2",
            "value": 68.5,
            "quality": 0
        }
    ],
    "ts": "2026-06-29T06:30:01+0000"
}

7.2 字段详解

字段类型说明
dArray数据点数组,包含一个或多个 Tag 的数据
d[].tagStringTag 点名称,在 Edgelink Studio 中定义
d[].valueNumber当前数值(可能为整数或浮点数)
d[].qualityInteger数据质量标志:0 = 正常,非 0 = 异常(具体含义取决于设备)
tsStringISO 8601 时间戳(UTC 时区)

7.3 其他 Payload Type 格式

Payload Type格式特点示例
Simple标准 JSON,字段清晰{"d":[...], "ts":"..."}
Simple with quality与 Simple 类似,quality 字段更详细{"d":[...], "ts":"..."}
Compact减少字段名,节省带宽{"v":[25.3], "t":"..."}(取决于设备实现)

8. 数据解析实战

8.1 时间戳转换

数据中的 ts 字段是 UTC 时间,需转换为本地时区:

python

from datetime import datetime, timezone, timedelta

# 原始 UTC 时间戳
ts_str = "2026-06-29T06:30:01+0000"

# 解析为 datetime 对象
utc_time = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%S%z")

# 转换为北京时间(UTC+8)
beijing_tz = timezone(timedelta(hours=8))
beijing_time = utc_time.astimezone(beijing_tz)

print(f"UTC: {utc_time}")
print(f"北京: {beijing_time.strftime('%Y-%m-%d %H:%M:%S')}")
# 输出:北京: 2026-06-29 14:30:01

8.2 异常数据过滤

python

def filter_valid_data(data: dict) -> list:
    """
    过滤异常数据,仅返回 quality == 0 的"好"数据
    """
    if "d" not in data:
        return []
    return [item for item in data["d"] if item.get("quality") == 0]


def detect_value_anomaly(value: float, min_val: float, max_val: float) -> bool:
    """
    检测数值是否超出合理范围
    示例:温度传感器合理范围 -40°C ~ 85°C
    """
    return value < min_val or value > max_val

9. 查看设备活动状态

除了订阅消息,你还可以在 EMQX Dashboard 中直接查看设备的连接和活动信息:

9.1 查看在线客户端

Dashboard → 客户端 → 可看到所有当前连接的客户端列表,包括:

  • Client ID
  • 用户名
  • IP 地址
  • 连接时间
  • Keep Alive 间隔

9.2 查看客户端详情

点击某个 Client ID,进入详情页:

标签页可查看内容
基本信息协议版本、连接状态、IP 地址、心跳间隔
订阅列表该客户端当前订阅的所有主题及 QoS
消息统计发送/接收消息数量、字节数
会话会话创建时间、过期时间、消息队列长度

9.3 实时监控面板

Dashboard → 监控 提供了全局概览:

  • 连接数实时曲线
  • 消息流入/流出速率(条/秒)
  • 主题数和订阅数

10. 故障排查指南

10.1 收不到任何数据

按以下顺序逐项排查:

┌─ ① 设备是否在线? ─────────────────────────────────────┐
│  Dashboard → 客户端 → 确认设备 Client ID 在列表中,     │
│  状态为 "已连接"                                        │
└────────────────────────────────────────────────────────┘
                            │
                   ┌──── online? ────┐
                   │ YES             │ NO
                   ▼                 ▼
┌─ ② 主题是否匹配? ───┐  ┌─ ③ 检查设备连接 ─────────────┐
│ 确认订阅的主题     │  │ A. 设备是否已配置 MQTT 连接? │
│ 与设备发布的主题   │  │ B. Host/Port 是否正确?      │
│ 完全一致           │  │ C. 用户名密码是否正确?      │
└────────────────────┘  │ D. 查看设备系统日志         │
          │             └──────────────────────────────┘
  ┌─ YES ─┴── NO ──────────────────────────────────────┐
  │          │  使用通配符 # 订阅所有主题               │
  │          │  或检查设备配置中的 Data Topic 实际值     │
  ▼          ▼                                         │
┌─ ④ 数据是否在发送? ──────────────────────────────────┐
│ A. 检查设备的"定期上传"是否已启用(周期是多少)      │
│ B. 检查 Tag 列表是否为空                              │
│ C. 如果 Tag 值一直不变且未开启"变化上传",不会发送    │
│ D. 在 Dashboard 客户端详情页查看消息统计              │
└──────────────────────────────────────────────────────┘
          │
  ┌─ YES ─┴── NO ──────────────────────────────────────┐
  │          │  确认设备 Tag 点配置正确                 │
  ▼          │  尝试重启设备或 MQTT 连接                │
┌─ ⑤ 认证/授权检查 ────────────────────────────────────┐
│ A. 确认 EMQX 中已创建认证器,且用户名密码正确        │
│ B. 如果配置了 ACL,检查是否允许该用户订阅该主题      │
│ C. 查看 EMQX 日志:                                  │
│    grep -i 'auth\|denied' /var/log/emqx/emqx.log.*   │
└──────────────────────────────────────────────────────┘

10.2 数据格式不一致

现象可能原因解决方法
JSON 解析失败Payload Type 不是 Simple检查 Edgelink 配置中的 Payload Type 设置
缺少某些字段设备版本差异代码中做缺失字段的兼容处理
中文乱码编码问题确保使用 UTF-8 解码:payload.decode("utf-8")
时间戳格式不同设备 / 固件差异使用 Python dateutil.parser.parse() 兼容解析

10.3 消息延迟大

  1. 检查网络延迟:ping 服务器公网IP
  2. 检查 Keep Alive 设置是否过小导致频繁重连
  3. 查看 Dashboard 监控面板的消息积压情况
  4. 检查服务器 CPU / 内存负载

10.4 消息丢失

  1. 确认发布和订阅的 QoS 等级(QoS 0 可能丢消息)
  2. 检查设备是否频繁断线重连(Clean Start 设置可能丢弃旧消息)
  3. 检查 EMQX 会话消息队列是否已满(默认 1000 条/客户端)
  4. 使用 mosquitto_sub -d 开启调试模式查看协议细节

11. 方法选型总结

方法难度适用阶段核心优势局限性
Dashboard WebSocket调试零安装,即开即用手动操作,无法自动化
mosquitto_sub调试命令行,可脚本化需 SSH 登录服务器
MQTTX 桌面版调试图形化,直观需安装客户端
Python 程序化⭐⭐开发/生产灵活,可集成业务逻辑需编写和维护代码
EMQX 规则引擎⭐⭐⭐生产高性能,免代码仅限 EMQX 支持的数据源

推荐路径:先用 Dashboard WebSocket 快速验证 → 用 Python 脚本开发数据采集逻辑 → 生产环境切换到 EMQX 规则引擎实现高可靠数据持久化。


下一步:掌握数据获取后,可以深入学习 MQTT 协议核心概念详解,理解消息传递背后的原理,更好地设计你的 IoT 系统架构。