Publicación (publish) de datos desde ESP32

Implementación de cliente MQTT en ESP32, envío periódico de datos del DHT11

Módulo 3 ⏱️ 2 horas 🛠️ ESP32 + DHT11 🌐 Flask + MQTT

Introducción

En esta clase aprenderemos a implementar la publicación de datos desde un ESP32 utilizando el protocolo MQTT. Configuraremos un cliente MQTT en el microcontrolador para enviar datos del sensor DHT11 de forma periódica a un broker Mosquitto, estableciendo la base de comunicación para nuestro sistema IoT completo.

Al finalizar esta lección, serás capaz de:

  • Configurar un cliente MQTT en ESP32
  • Establecer conexión con broker Mosquitto
  • Publicar datos del sensor DHT11 en topics MQTT
  • Implementar reconexión automática y manejo de errores
  • Optimizar el consumo energético en el envío de datos

Conceptos Fundamentales

El protocolo MQTT (Message Queuing Telemetry Transport) es un protocolo de comunicación ligero diseñado específicamente para dispositivos IoT con recursos limitados. Su arquitectura publish/subscribe permite una comunicación eficiente entre dispositivos.

Componentes del Sistema MQTT:

  • Publisher (ESP32): Dispositivo que envía datos a topics específicos
  • Broker (Mosquitto): Servidor que gestiona la distribución de mensajes
  • Subscriber (Flask App): Aplicación que recibe y procesa los datos
  • Topics: Canales de comunicación organizados jerárquicamente

Características Clave de MQTT:

  • QoS (Quality of Service): Niveles de garantía de entrega (0, 1, 2)
  • Retain Flag: Mantiene el último mensaje publicado en un topic
  • Last Will Testament: Mensaje automático cuando se pierde conexión
  • Keep Alive: Mecanismo para mantener la conexión activa

Estructura de Topics Recomendada:

Para nuestro proyecto utilizaremos la estructura: iot/esp32/[device_id]/[sensor_type]

  • iot/esp32/device001/temperature
  • iot/esp32/device001/humidity
  • iot/esp32/device001/status

Implementación Práctica

Implementaremos un cliente MQTT completo en ESP32 que lee datos del sensor DHT11 y los publica periódicamente al broker Mosquitto.

Conexiones del Hardware:

DHT11 → ESP32:

  • VCC → 3.3V
  • GND → GND
  • DATA → GPIO 4

Código Principal ESP32 - mqtt_publisher.ino


#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>

// Configuración WiFi
const char* ssid = "TU_RED_WIFI";
const char* password = "TU_PASSWORD";

// Configuración MQTT
const char* mqtt_server = "192.168.1.100";  // IP del broker Mosquitto
const int mqtt_port = 1883;
const char* mqtt_user = "esp32_user";
const char* mqtt_password = "esp32_pass";
const char* client_id = "ESP32_DHT11_001";

// Configuración DHT11
#define DHT_PIN 4
#define DHT_TYPE DHT11
DHT dht(DHT_PIN, DHT_TYPE);

// Topics MQTT
const char* temp_topic = "iot/esp32/device001/temperature";
const char* hum_topic = "iot/esp32/device001/humidity";
const char* status_topic = "iot/esp32/device001/status";

// Objetos principales
WiFiClient espClient;
PubSubClient client(espClient);

// Variables de control
unsigned long lastMsg = 0;
const long interval = 30000; // 30 segundos
int reconnectAttempts = 0;
const int maxReconnectAttempts = 5;

void setup() {
    Serial.begin(115200);
    Serial.println("Iniciando ESP32 MQTT Publisher...");
    
    // Inicializar DHT11
    dht.begin();
    
    // Configurar WiFi
    setup_wifi();
    
    // Configurar MQTT
    client.setServer(mqtt_server, mqtt_port);
    client.setCallback(callback);
    
    Serial.println("Sistema inicializado correctamente");
}

void setup_wifi() {
    delay(10);
    Serial.println();
    Serial.print("Conectando a WiFi: ");
    Serial.println(ssid);
    
    WiFi.begin(ssid, password);
    
    int attempts = 0;
    while (WiFi.status() != WL_CONNECTED && attempts < 20) {
        delay(500);
        Serial.print(".");
        attempts++;
    }
    
    if (WiFi.status() == WL_CONNECTED) {
        Serial.println("");
        Serial.println("WiFi conectado exitosamente!");
        Serial.print("Dirección IP: ");
        Serial.println(WiFi.localIP());
        Serial.print("Intensidad señal (RSSI): ");
        Serial.print(WiFi.RSSI());
        Serial.println(" dBm");
    } else {
        Serial.println("Error: No se pudo conectar al WiFi");
        ESP.restart(); // Reiniciar si no hay conexión
    }
}

void callback(char* topic, byte* payload, unsigned int length) {
    Serial.print("Mensaje recibido en topic: ");
    Serial.print(topic);
    Serial.print(". Mensaje: ");
    
    String message;
    for (int i = 0; i < length; i++) {
        message += (char)payload[i];
    }
    Serial.println(message);
}

boolean reconnect() {
    Serial.print("Intentando conexión MQTT...");
    
    // Crear Last Will Testament
    String willMessage = "{\"device_id\":\"" + String(client_id) + 
                        "\",\"status\":\"offline\",\"timestamp\":" + 
                        String(millis()) + "}";
    
    if (client.connect(client_id, mqtt_user, mqtt_password, 
                      status_topic, 1, true, willMessage.c_str())) {
        Serial.println("Conectado al broker MQTT!");
        
        // Publicar mensaje de estado online
        StaticJsonDocument<200> statusDoc;
        statusDoc["device_id"] = client_id;
        statusDoc["status"] = "online";
        statusDoc["ip_address"] = WiFi.localIP().toString();
        statusDoc["rssi"] = WiFi.RSSI();
        statusDoc["timestamp"] = millis();
        
        String statusPayload;
        serializeJson(statusDoc, statusPayload);
        
        client.publish(status_topic, statusPayload.c_str(), true);
        
        reconnectAttempts = 0;
        return true;
    } else {
        Serial.print("Error de conexión MQTT, rc=");
        Serial.print(client.state());
        reconnectAttempts++;
        return false;
    }
}

void publishSensorData() {
    // Leer datos del sensor DHT11
    float humidity = dht.readHumidity();
    float temperature = dht.readTemperature();
    
    // Verificar si las lecturas son válidas
    if (isnan(humidity) || isnan(temperature)) {
        Serial.println("Error: No se pudieron leer datos del sensor DHT11");
        
        // Publicar mensaje de error
        StaticJsonDocument<150> errorDoc;
        errorDoc["device_id"] = client_id;
        errorDoc["error"] = "Sensor DHT11 no responde";
        errorDoc["timestamp"] = millis();
        
        String errorPayload;
        serializeJson(errorDoc, errorPayload);
        client.publish(status_topic, errorPayload.c_str());
        return;
    }
    
    // Crear payload JSON para temperatura
    StaticJsonDocument<200> tempDoc;
    tempDoc["device_id"] = client_id;
    tempDoc["sensor"] = "DHT11";
    tempDoc["temperature"] = round(temperature * 10) / 10.0; // Una decimal
    tempDoc["unit"] = "°C";
    tempDoc["timestamp"] = millis();
    
    String tempPayload;
    serializeJson(tempDoc, tempPayload);
    
    // Crear payload JSON para humedad
    StaticJsonDocument<200> humDoc;
    humDoc["device_id"] = client_id;
    humDoc["sensor"] = "DHT11";
    humDoc["humidity"] = round(humidity * 10) / 10.0; // Una decimal
    humDoc["unit"] = "%";
    humDoc["timestamp"] = millis();
    
    String humPayload;
    serializeJson(humDoc, humPayload);
    
    // Publicar datos con QoS 1 (entrega garantizada)
    boolean tempResult = client.publish(temp_topic, tempPayload.c_str(), false);
    boolean humResult = client.publish(hum_topic, humPayload.c_str(), false);
    
    if (tempResult && humResult) {
        Serial.println("=== DATOS PUBLICADOS ===");
        Serial.printf("Temperatura: %.1f°C\n", temperature);
        Serial.printf("Humedad: %.1f%%\n", humidity);
        Serial.println("Topics:");
        Serial.println("  " + String(temp_topic));
        Serial.println("  " + String(hum_topic));
        Serial.println("========================");
    } else {
        Serial.println("Error: Fallo al publicar datos");
    }
}

void loop() {
    // Verificar conexión WiFi
    if (WiFi.status() != WL_CONNECTED) {
        Serial.println("WiFi desconectado, reintentando...");
        setup_wifi();
    }
    
    // Mantener conexión MQTT
    if (!client.connected()) {
        if (reconnectAttempts < maxReconnectAttempts) {
            if (reconnect()) {
                Serial.println("Reconexión MQTT exitosa");
            } else {
                Serial.println("Fallo en reconexión MQTT, reintentando en 5 segundos...");
                delay(5000);
            }
        } else {
            Serial.println("Máximo de intentos alcanzado, reiniciando ESP32...");
            ESP.restart();
        }
    }
    
    // Mantener bucle MQTT activo
    client.loop();
    
    // Publicar datos cada intervalo definido
    unsigned long now = millis();
    if (now - lastMsg > interval) {
        lastMsg = now;
        publishSensorData();
    }
    
    // Pequeña pausa para no saturar el procesador
    delay(100);
}
            

Script Python para Verificar Publicación - mqtt_subscriber_test.py


import paho.mqtt.client as mqtt
import json
from datetime import datetime
import time

# Configuración MQTT
MQTT_SERVER = "192.168.1.100"
MQTT_PORT = 1883
MQTT_USER = "esp32_user"
MQTT_PASSWORD = "esp32_pass"

# Topics a suscribirse
TOPICS = [
    "iot/esp32/device001/temperature",
    "iot/esp32/device001/humidity", 
    "iot/esp32/device001/status"
]

def on_connect(client, userdata, flags, rc):
    """Callback ejecutado al conectarse al broker"""
    if rc == 0:
        print("✅ Conectado exitosamente al broker MQTT")
        print("🔔 Suscribiéndose a topics...")
        
        for topic in TOPICS:
            client.subscribe(topic, qos=1)
            print(f"   📡 Suscrito a: {topic}")
        
        print("\n⏳ Esperando mensajes del ESP32...")
        print("-" * 60)
    else:
        print(f"❌ Error de conexión. Código: {rc}")

def on_message(client, userdata, msg):
    """Callback ejecutado al recibir un mensaje"""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    topic = msg.topic
    
    try:
        # Decodificar payload JSON
        payload = json.loads(msg.payload.decode())
        
        print(f"📨 [{timestamp}] Topic: {topic}")
        
        # Procesar según el tipo de mensaje
        if "temperature" in topic:
            temp = payload.get("temperature", "N/A")
            device = payload.get("device_id", "Unknown")
            print(f"   🌡️  Temperatura: {temp}°C (Dispositivo: {device})")
            
        elif "humidity" in topic:
            hum = payload.get("humidity", "N/A")
            device = payload.get("device_id", "Unknown")
            print(f"   💧 Humedad: {hum}% (Dispositivo: {device})")
            
        elif "status" in topic:
            status = payload.get("status", "Unknown")
            device = payload.get("device_id", "Unknown")
            ip = payload.get("ip_address", "N/A")
            rssi = payload.get("rssi", "N/A")
            print(f"   📊 Estado: {status} (Dispositivo: {device})")
            if ip != "N/A":
                print(f"      🌐 IP: {ip}, Señal: {rssi} dBm")
        
        print("-" * 60)
        
    except json.JSONDecodeError:
        print(f"⚠️  [{timestamp}] Payload no es JSON válido: {msg.payload.decode()}")

def main():
    client = mqtt.Client()
    client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(MQTT_SERVER, MQTT_PORT, 60)
    try:
        client.loop_forever()
    except KeyboardInterrupt:
        print("\n⏹️  Saliendo...")
        client.disconnect()

if __name__ == "__main__":
    main()