Suscripción (subscribe) y monitoreo de mensajes en consola

Herramientas de línea de comandos, monitoreo en tiempo real, debugging

Módulo 3 ⏱️ 2 horas 🛠️ ESP32 + DHT11 🌐 Flask + MQTT

Introducción

En esta lección aprenderás a suscribirse y monitorear mensajes MQTT en tiempo real utilizando herramientas de línea de comandos y técnicas de debugging. Exploraremos cómo implementar un sistema de monitoreo completo que permita supervisar el flujo de datos entre el ESP32 con sensor DHT11 y el broker Mosquitto MQTT, integrándolo con una aplicación Flask para visualización web.

Las herramientas de monitoreo y suscripción son fundamentales para el debugging, análisis de rendimiento y mantenimiento de sistemas IoT, permitiendo detectar problemas de conectividad, pérdida de datos y comportamientos anómalos en tiempo real.

Conceptos Fundamentales

La suscripción y monitoreo de mensajes MQTT involucra varios componentes y conceptos clave que debemos dominar para implementar un sistema robusto de supervisión:

Suscripción MQTT (Subscribe)

  • Patrón Publisher-Subscriber: Modelo de comunicación donde los clientes se suscriben a tópicos específicos para recibir mensajes
  • Wildcard Patterns: Uso de '+' (single-level) y '#' (multi-level) para suscribirse a múltiples tópicos
  • QoS Levels: Quality of Service (0, 1, 2) que garantiza diferentes niveles de entrega de mensajes
  • Retained Messages: Mensajes que el broker mantiene para entregar inmediatamente a nuevos suscriptores

Herramientas de Monitoreo

  • mosquitto_sub: Cliente de línea de comandos para suscripción y monitoreo
  • MQTT Explorer: Herramienta gráfica para visualizar la estructura de tópicos
  • Logging y Debugging: Técnicas para capturar y analizar el flujo de mensajes
  • Timestamp y Metadata: Información adicional para análisis temporal

Arquitectura de Monitoreo

  • Cliente Suscriptor: Componente que recibe y procesa mensajes MQTT
  • Buffer de Mensajes: Almacenamiento temporal para procesamiento asíncrono
  • Filtros y Parsers: Procesamiento y clasificación de mensajes recibidos
  • Dashboard Web: Interfaz de usuario para visualización en tiempo real

Implementación Práctica

Implementaremos un sistema completo de suscripción y monitoreo que incluye el ESP32 publicando datos, herramientas de línea de comandos para monitoreo, y una aplicación Flask para visualización web.

Código ESP32 - Publisher de Datos DHT11


#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>

// Configuración WiFi
const char* ssid = "TU_WIFI";
const char* password = "TU_PASSWORD";

// Configuración MQTT
const char* mqtt_server = "192.168.1.100";
const int mqtt_port = 1883;
const char* mqtt_user = "iot_user";
const char* mqtt_password = "iot_pass";

// Configuración DHT11
#define DHT_PIN 4
#define DHT_TYPE DHT11
DHT dht(DHT_PIN, DHT_TYPE);

// Configuración MQTT Topics
const char* topic_temperature = "sensors/dht11/temperature";
const char* topic_humidity = "sensors/dht11/humidity";
const char* topic_status = "sensors/dht11/status";
const char* topic_data = "sensors/dht11/data";

WiFiClient espClient;
PubSubClient client(espClient);

unsigned long lastMsg = 0;
int msgCount = 0;

void setup() {
    Serial.begin(115200);
    dht.begin();
    
    // Conectar WiFi
    setup_wifi();
    
    // Configurar MQTT
    client.setServer(mqtt_server, mqtt_port);
    client.setCallback(callback);
    
    // Suscribirse a tópicos de control
    reconnect();
    
    // Publicar mensaje de inicio
    publishStatus("online", "DHT11 sensor initialized");
}

void setup_wifi() {
    delay(10);
    Serial.println();
    Serial.print("Conectando a ");
    Serial.println(ssid);
    
    WiFi.begin(ssid, password);
    
    while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
    }
    
    Serial.println("");
    Serial.println("WiFi conectado");
    Serial.println("IP address: ");
    Serial.println(WiFi.localIP());
}

void callback(char* topic, byte* payload, unsigned int length) {
    Serial.print("Mensaje recibido [");
    Serial.print(topic);
    Serial.print("] ");
    
    String message;
    for (int i = 0; i < length; i++) {
        message += (char)payload[i];
    }
    Serial.println(message);
    
    // Procesar comandos de control
    if (String(topic) == "sensors/dht11/control") {
        if (message == "reset") {
            msgCount = 0;
            publishStatus("reset", "Message counter reset");
        } else if (message == "status") {
            publishStatus("active", "Sensor functioning normally");
        }
    }
}

void reconnect() {
    while (!client.connected()) {
        Serial.print("Intentando conexión MQTT...");
        
        String clientId = "ESP32Client-";
        clientId += String(random(0xffff), HEX);
        
        if (client.connect(clientId.c_str(), mqtt_user, mqtt_password)) {
            Serial.println("conectado");
            
            // Suscribirse a tópicos de control
            client.subscribe("sensors/dht11/control");
            
            // Publicar mensaje de conexión
            publishStatus("connected", "MQTT connection established");
        } else {
            Serial.print("falló, rc=");
            Serial.print(client.state());
            Serial.println(" intentando de nuevo en 5 segundos");
            delay(5000);
        }
    }
}

void publishStatus(const char* status, const char* message) {
    StaticJsonDocument<200> doc;
    doc["device_id"] = "ESP32_DHT11_001";
    doc["timestamp"] = millis();
    doc["status"] = status;
    doc["message"] = message;
    doc["ip"] = WiFi.localIP().toString();
    
    char jsonBuffer[256];
    serializeJson(doc, jsonBuffer);
    
    client.publish(topic_status, jsonBuffer, true); // Retained message
    
    Serial.print("Status publicado: ");
    Serial.println(jsonBuffer);
}

void publishSensorData() {
    float humidity = dht.readHumidity();
    float temperature = dht.readTemperature();
    
    if (isnan(humidity) || isnan(temperature)) {
        Serial.println("Error leyendo DHT11!");
        publishStatus("error", "Failed to read from DHT11 sensor");
        return;
    }
    
    // Publicar datos individuales
    client.publish(topic_temperature, String(temperature).c_str());
    client.publish(topic_humidity, String(humidity).c_str());
    
    // Publicar datos combinados en JSON
    StaticJsonDocument<300> doc;
    doc["device_id"] = "ESP32_DHT11_001";
    doc["timestamp"] = millis();
    doc["message_id"] = ++msgCount;
    doc["temperature"] = temperature;
    doc["humidity"] = humidity;
    doc["heat_index"] = dht.computeHeatIndex(temperature, humidity, false);
    doc["wifi_rssi"] = WiFi.RSSI();
    doc["free_heap"] = ESP.getFreeHeap();
    
    char jsonBuffer[400];
    serializeJson(doc, jsonBuffer);
    
    client.publish(topic_data, jsonBuffer);
    
    Serial.print("Datos publicados: ");
    Serial.println(jsonBuffer);
}

void loop() {
    if (!client.connected()) {
        reconnect();
    }
    client.loop();
    
    unsigned long now = millis();
    if (now - lastMsg > 5000) { // Publicar cada 5 segundos
        lastMsg = now;
        publishSensorData();
    }
    
    delay(100);
}
            

Herramientas de Línea de Comandos - Monitoreo Básico


# Suscribirse a todos los tópicos del sensor DHT11
mosquitto_sub -h localhost -p 1883 -u iot_user -P iot_pass -t "sensors/dht11/#" -v

# Monitorear solo datos de temperatura con timestamp
mosquitto_sub -h localhost -p 1883 -u iot_user -P iot_pass -t "sensors/dht11/temperature" -v --pretty

# Suscripción con formato personalizado y logging
mosquitto_sub -h localhost -p 1883 -u iot_user -P iot_pass -t "sensors/dht11/data" -F "@Y-@m-@d @H:@M:@S [%t] %p" > sensor_log.txt

# Monitoreo de todos los tópicos con wildcards
mosquitto_sub -h localhost -p 1883 -u iot_user -P iot_pass -t "sensors/+/+" -v

# Debugging de conexiones con verbose
mosquitto_sub -h localhost -p 1883 -u iot_user -P iot_pass -t "sensors/dht11/status" -v -d

# Monitoreo con filtro JSON usando jq
mosquitto_sub -h localhost -p 1883 -u iot_user -P iot_pass -t "sensors/dht11/data" | jq '.temperature'
            

Script Python - Monitor MQTT Avanzado


#!/usr/bin/env python3
import paho.mqtt.client as mqtt
import json
import datetime
import logging
import argparse
import signal
import sys
from collections import defaultdict, deque
import statistics

class MQTTMonitor:
    def __init__(self, host="localhost", port=1883, username=None, password=None):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        
        # Estadísticas
        self.message_count = defaultdict(int)
        self.last_messages = defaultdict(lambda: deque(maxlen=10))
        self.start_time = datetime.datetime.now()
        
        # Configurar logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('mqtt_monitor.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
        
        # Configurar cliente MQTT
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message
        
        if username and password:
            self.client.username_pw_set(username, password)
        
        # Manejar señales para cierre limpio
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
    
    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            self.logger.info(f"Conectado al broker MQTT {self.host}:{self.port}")
            # Suscribirse a todos los tópicos de sensores
            client.subscribe("sensors/#")
            client.subscribe("$SYS/#")  # Tópicos del sistema
            self.logger.info("Suscrito a tópicos: sensors/# y $SYS/#")
        else:
            self.logger.error(f"Error de conexión: {rc}")
    
    def on_disconnect(self, client, userdata, rc):
        if rc != 0:
            self.logger.warning("Desconexión inesperada del broker MQTT")
        else:
            self.logger.info("Desconectado del broker MQTT")
    
    def on_message(self, client, userdata, msg):
        topic = msg.topic
        payload = msg.payload.decode('utf-8')
        timestamp = datetime.datetime.now()
        
        # Actualizar estadísticas
        self.message_count[topic] += 1
        self.last_messages[topic].append({
            'timestamp': timestamp,
            'payload': payload,
            'size': len(payload)
        })
        
        # Logging básico
        self.logger.info(f"[{topic}] {payload}")
        
        # Procesamiento especial para datos de sensores
        if topic.startswith("sensors/"):
            self.process_sensor_message(topic, payload, timestamp)
        
        # Procesamiento de mensajes del sistema
        elif topic.startswith("$SYS/"):
            self.process_system_message(topic, payload, timestamp)
    
    def process_sensor_message(self, topic, payload, timestamp):
        """Procesar mensajes específicos de sensores"""
        try:
            # Intentar parsear JSON
            if payload.startswith('{'):
                data = json.loads(payload)
                
                if 'temperature' in data and 'humidity' in data:
                    # Análisis de datos DHT11
                    temp = data['temperature']
                    hum = data['humidity']
                    
                    # Alertas de valores anómalos
                    if temp > 40 or temp < -10:
                        self.logger.warning(f"Temperatura anómala: {temp}°C")
                    if hum < 10 or hum > 90:
                        self.logger.warning(f"Humedad anómala: {hum}%")
            else:
                # Mensaje no JSON: loggear tamaño y muestra parcial
                preview = payload[:120] + ('…' if len(payload) > 120 else '')
                self.logger.info(f"Payload no JSON ({len(payload)} bytes): {preview}")
        except Exception as e:
            self.logger.error(f"Error procesando mensaje de sensor: {e}")

        # Estadísticas cada 30 mensajes
        total_msgs = sum(self.message_count.values())
        if total_msgs % 30 == 0:
            self.print_stats()

    def process_system_message(self, topic, payload, timestamp):
        # Ejemplo simple: loggear cambios de clientes conectados
        if topic.endswith("/clients/connected"):
            self.logger.info(f"Clientes conectados: {payload}")

    def print_stats(self):
        self.logger.info("=== Estadísticas MQTT ===")
        for topic, count in list(self.message_count.items())[:10]:
            self.logger.info(f"{topic}: {count} mensajes")
        self.logger.info("=========================")

    def signal_handler(self, signum, frame):
        self.logger.info("Señal recibida, cerrando monitor...")
        try:
            self.client.loop_stop()
            self.client.disconnect()
        finally:
            sys.exit(0)

def main():
    parser = argparse.ArgumentParser(description='Monitor MQTT avanzado')
    parser.add_argument('--host', default='localhost')
    parser.add_argument('--port', type=int, default=1883)
    parser.add_argument('--user')
    parser.add_argument('--password')
    args = parser.parse_args([])  # ajustar si se ejecuta externamente

    monitor = MQTTMonitor(host=args.host, port=args.port, username=args.user, password=args.password)
    monitor.client.connect(monitor.host, monitor.port, 60)
    monitor.client.loop_forever()

if __name__ == '__main__':
    main()