Introducción
En esta lección aprenderás a suscribirse y monitorear mensajes MQTT en tiempo real utilizando herramientas de línea de comandos y técnicas de debugging. Exploraremos cómo implementar un sistema de monitoreo completo que permita supervisar el flujo de datos entre el ESP32 con sensor DHT11 y el broker Mosquitto MQTT, integrándolo con una aplicación Flask para visualización web.
Las herramientas de monitoreo y suscripción son fundamentales para el debugging, análisis de rendimiento y mantenimiento de sistemas IoT, permitiendo detectar problemas de conectividad, pérdida de datos y comportamientos anómalos en tiempo real.
Conceptos Fundamentales
La suscripción y monitoreo de mensajes MQTT involucra varios componentes y conceptos clave que debemos dominar para implementar un sistema robusto de supervisión:
Suscripción MQTT (Subscribe)
- Patrón Publisher-Subscriber: Modelo de comunicación donde los clientes se suscriben a tópicos específicos para recibir mensajes
- Wildcard Patterns: Uso de '+' (single-level) y '#' (multi-level) para suscribirse a múltiples tópicos
- QoS Levels: Quality of Service (0, 1, 2) que garantiza diferentes niveles de entrega de mensajes
- Retained Messages: Mensajes que el broker mantiene para entregar inmediatamente a nuevos suscriptores
Herramientas de Monitoreo
- mosquitto_sub: Cliente de línea de comandos para suscripción y monitoreo
- MQTT Explorer: Herramienta gráfica para visualizar la estructura de tópicos
- Logging y Debugging: Técnicas para capturar y analizar el flujo de mensajes
- Timestamp y Metadata: Información adicional para análisis temporal
Arquitectura de Monitoreo
- Cliente Suscriptor: Componente que recibe y procesa mensajes MQTT
- Buffer de Mensajes: Almacenamiento temporal para procesamiento asíncrono
- Filtros y Parsers: Procesamiento y clasificación de mensajes recibidos
- Dashboard Web: Interfaz de usuario para visualización en tiempo real
Implementación Práctica
Implementaremos un sistema completo de suscripción y monitoreo que incluye el ESP32 publicando datos, herramientas de línea de comandos para monitoreo, y una aplicación Flask para visualización web.
Código ESP32 - Publisher de Datos DHT11
#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>
// Configuración WiFi
const char* ssid = "TU_WIFI";
const char* password = "TU_PASSWORD";
// Configuración MQTT
const char* mqtt_server = "192.168.1.100";
const int mqtt_port = 1883;
const char* mqtt_user = "iot_user";
const char* mqtt_password = "iot_pass";
// Configuración DHT11
#define DHT_PIN 4
#define DHT_TYPE DHT11
DHT dht(DHT_PIN, DHT_TYPE);
// Configuración MQTT Topics
const char* topic_temperature = "sensors/dht11/temperature";
const char* topic_humidity = "sensors/dht11/humidity";
const char* topic_status = "sensors/dht11/status";
const char* topic_data = "sensors/dht11/data";
WiFiClient espClient;
PubSubClient client(espClient);
unsigned long lastMsg = 0;
int msgCount = 0;
void setup() {
Serial.begin(115200);
dht.begin();
// Conectar WiFi
setup_wifi();
// Configurar MQTT
client.setServer(mqtt_server, mqtt_port);
client.setCallback(callback);
// Suscribirse a tópicos de control
reconnect();
// Publicar mensaje de inicio
publishStatus("online", "DHT11 sensor initialized");
}
void setup_wifi() {
delay(10);
Serial.println();
Serial.print("Conectando a ");
Serial.println(ssid);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("");
Serial.println("WiFi conectado");
Serial.println("IP address: ");
Serial.println(WiFi.localIP());
}
void callback(char* topic, byte* payload, unsigned int length) {
Serial.print("Mensaje recibido [");
Serial.print(topic);
Serial.print("] ");
String message;
for (int i = 0; i < length; i++) {
message += (char)payload[i];
}
Serial.println(message);
// Procesar comandos de control
if (String(topic) == "sensors/dht11/control") {
if (message == "reset") {
msgCount = 0;
publishStatus("reset", "Message counter reset");
} else if (message == "status") {
publishStatus("active", "Sensor functioning normally");
}
}
}
void reconnect() {
while (!client.connected()) {
Serial.print("Intentando conexión MQTT...");
String clientId = "ESP32Client-";
clientId += String(random(0xffff), HEX);
if (client.connect(clientId.c_str(), mqtt_user, mqtt_password)) {
Serial.println("conectado");
// Suscribirse a tópicos de control
client.subscribe("sensors/dht11/control");
// Publicar mensaje de conexión
publishStatus("connected", "MQTT connection established");
} else {
Serial.print("falló, rc=");
Serial.print(client.state());
Serial.println(" intentando de nuevo en 5 segundos");
delay(5000);
}
}
}
void publishStatus(const char* status, const char* message) {
StaticJsonDocument<200> doc;
doc["device_id"] = "ESP32_DHT11_001";
doc["timestamp"] = millis();
doc["status"] = status;
doc["message"] = message;
doc["ip"] = WiFi.localIP().toString();
char jsonBuffer[256];
serializeJson(doc, jsonBuffer);
client.publish(topic_status, jsonBuffer, true); // Retained message
Serial.print("Status publicado: ");
Serial.println(jsonBuffer);
}
void publishSensorData() {
float humidity = dht.readHumidity();
float temperature = dht.readTemperature();
if (isnan(humidity) || isnan(temperature)) {
Serial.println("Error leyendo DHT11!");
publishStatus("error", "Failed to read from DHT11 sensor");
return;
}
// Publicar datos individuales
client.publish(topic_temperature, String(temperature).c_str());
client.publish(topic_humidity, String(humidity).c_str());
// Publicar datos combinados en JSON
StaticJsonDocument<300> doc;
doc["device_id"] = "ESP32_DHT11_001";
doc["timestamp"] = millis();
doc["message_id"] = ++msgCount;
doc["temperature"] = temperature;
doc["humidity"] = humidity;
doc["heat_index"] = dht.computeHeatIndex(temperature, humidity, false);
doc["wifi_rssi"] = WiFi.RSSI();
doc["free_heap"] = ESP.getFreeHeap();
char jsonBuffer[400];
serializeJson(doc, jsonBuffer);
client.publish(topic_data, jsonBuffer);
Serial.print("Datos publicados: ");
Serial.println(jsonBuffer);
}
void loop() {
if (!client.connected()) {
reconnect();
}
client.loop();
unsigned long now = millis();
if (now - lastMsg > 5000) { // Publicar cada 5 segundos
lastMsg = now;
publishSensorData();
}
delay(100);
}
Herramientas de Línea de Comandos - Monitoreo Básico
# Suscribirse a todos los tópicos del sensor DHT11
mosquitto_sub -h localhost -p 1883 -u iot_user -P iot_pass -t "sensors/dht11/#" -v
# Monitorear solo datos de temperatura con timestamp
mosquitto_sub -h localhost -p 1883 -u iot_user -P iot_pass -t "sensors/dht11/temperature" -v --pretty
# Suscripción con formato personalizado y logging
mosquitto_sub -h localhost -p 1883 -u iot_user -P iot_pass -t "sensors/dht11/data" -F "@Y-@m-@d @H:@M:@S [%t] %p" > sensor_log.txt
# Monitoreo de todos los tópicos con wildcards
mosquitto_sub -h localhost -p 1883 -u iot_user -P iot_pass -t "sensors/+/+" -v
# Debugging de conexiones con verbose
mosquitto_sub -h localhost -p 1883 -u iot_user -P iot_pass -t "sensors/dht11/status" -v -d
# Monitoreo con filtro JSON usando jq
mosquitto_sub -h localhost -p 1883 -u iot_user -P iot_pass -t "sensors/dht11/data" | jq '.temperature'
Script Python - Monitor MQTT Avanzado
#!/usr/bin/env python3
import paho.mqtt.client as mqtt
import json
import datetime
import logging
import argparse
import signal
import sys
from collections import defaultdict, deque
import statistics
class MQTTMonitor:
def __init__(self, host="localhost", port=1883, username=None, password=None):
self.host = host
self.port = port
self.username = username
self.password = password
# Estadísticas
self.message_count = defaultdict(int)
self.last_messages = defaultdict(lambda: deque(maxlen=10))
self.start_time = datetime.datetime.now()
# Configurar logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('mqtt_monitor.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
# Configurar cliente MQTT
self.client = mqtt.Client()
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
if username and password:
self.client.username_pw_set(username, password)
# Manejar señales para cierre limpio
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
self.logger.info(f"Conectado al broker MQTT {self.host}:{self.port}")
# Suscribirse a todos los tópicos de sensores
client.subscribe("sensors/#")
client.subscribe("$SYS/#") # Tópicos del sistema
self.logger.info("Suscrito a tópicos: sensors/# y $SYS/#")
else:
self.logger.error(f"Error de conexión: {rc}")
def on_disconnect(self, client, userdata, rc):
if rc != 0:
self.logger.warning("Desconexión inesperada del broker MQTT")
else:
self.logger.info("Desconectado del broker MQTT")
def on_message(self, client, userdata, msg):
topic = msg.topic
payload = msg.payload.decode('utf-8')
timestamp = datetime.datetime.now()
# Actualizar estadísticas
self.message_count[topic] += 1
self.last_messages[topic].append({
'timestamp': timestamp,
'payload': payload,
'size': len(payload)
})
# Logging básico
self.logger.info(f"[{topic}] {payload}")
# Procesamiento especial para datos de sensores
if topic.startswith("sensors/"):
self.process_sensor_message(topic, payload, timestamp)
# Procesamiento de mensajes del sistema
elif topic.startswith("$SYS/"):
self.process_system_message(topic, payload, timestamp)
def process_sensor_message(self, topic, payload, timestamp):
"""Procesar mensajes específicos de sensores"""
try:
# Intentar parsear JSON
if payload.startswith('{'):
data = json.loads(payload)
if 'temperature' in data and 'humidity' in data:
# Análisis de datos DHT11
temp = data['temperature']
hum = data['humidity']
# Alertas de valores anómalos
if temp > 40 or temp < -10:
self.logger.warning(f"Temperatura anómala: {temp}°C")
if hum < 10 or hum > 90:
self.logger.warning(f"Humedad anómala: {hum}%")
else:
# Mensaje no JSON: loggear tamaño y muestra parcial
preview = payload[:120] + ('…' if len(payload) > 120 else '')
self.logger.info(f"Payload no JSON ({len(payload)} bytes): {preview}")
except Exception as e:
self.logger.error(f"Error procesando mensaje de sensor: {e}")
# Estadísticas cada 30 mensajes
total_msgs = sum(self.message_count.values())
if total_msgs % 30 == 0:
self.print_stats()
def process_system_message(self, topic, payload, timestamp):
# Ejemplo simple: loggear cambios de clientes conectados
if topic.endswith("/clients/connected"):
self.logger.info(f"Clientes conectados: {payload}")
def print_stats(self):
self.logger.info("=== Estadísticas MQTT ===")
for topic, count in list(self.message_count.items())[:10]:
self.logger.info(f"{topic}: {count} mensajes")
self.logger.info("=========================")
def signal_handler(self, signum, frame):
self.logger.info("Señal recibida, cerrando monitor...")
try:
self.client.loop_stop()
self.client.disconnect()
finally:
sys.exit(0)
def main():
parser = argparse.ArgumentParser(description='Monitor MQTT avanzado')
parser.add_argument('--host', default='localhost')
parser.add_argument('--port', type=int, default=1883)
parser.add_argument('--user')
parser.add_argument('--password')
args = parser.parse_args([]) # ajustar si se ejecuta externamente
monitor = MQTTMonitor(host=args.host, port=args.port, username=args.user, password=args.password)
monitor.client.connect(monitor.host, monitor.port, 60)
monitor.client.loop_forever()
if __name__ == '__main__':
main()