Envío de datos del sensor mediante MQTT (Mosquitto)

Configuración cliente MQTT en ESP32, publicación de datos de sensores

Módulo 2 ⏱️ 2 horas 🛠️ ESP32 + DHT11 🌐 Flask + MQTT

Introducción

En esta lección aprenderemos a implementar la comunicación MQTT entre el ESP32 y un broker Mosquitto para transmitir datos del sensor DHT11. MQTT (Message Queuing Telemetry Transport) es un protocolo de comunicación ligero y eficiente, ideal para aplicaciones IoT debido a su bajo consumo de ancho de banda y capacidad de manejar conexiones intermitentes.

Al finalizar esta clase, serás capaz de configurar un cliente MQTT en el ESP32, publicar datos de sensores en tiempo real y procesar estos datos usando Flask con Python, creando así un sistema completo de monitoreo IoT.

Conceptos Fundamentales

Para implementar exitosamente la comunicación MQTT en nuestro sistema, es esencial comprender los siguientes conceptos:

Protocolo MQTT

  • Broker: Servidor central que recibe y distribuye mensajes entre clientes (Mosquitto en nuestro caso)
  • Cliente: Dispositivos que se conectan al broker para publicar o suscribirse a tópicos (ESP32, aplicación Flask)
  • Tópicos: Canales de comunicación organizados jerárquicamente (ej: "sensor/temperatura", "sensor/humedad")
  • QoS (Quality of Service): Niveles de garantía de entrega de mensajes (0, 1, 2)
  • Retain: Mantener el último mensaje publicado para nuevos suscriptores

Arquitectura del Sistema

  • ESP32: Cliente MQTT que publica datos del DHT11
  • Broker Mosquitto: Intermediario que gestiona la comunicación
  • Flask App: Cliente MQTT suscriptor que procesa y visualiza datos
  • Red WiFi: Infraestructura de comunicación local

Ventajas de MQTT para IoT

  • Protocolo ligero con overhead mínimo
  • Soporte para conexiones intermitentes
  • Escalabilidad para múltiples dispositivos
  • Diferentes niveles de QoS según necesidades
  • Patrón publish-subscribe desacoplado

Implementación Práctica

Implementaremos el sistema completo comenzando con la configuración del ESP32 como cliente MQTT, seguido de la aplicación Flask como suscriptor.

Conexiones Hardware DHT11 - ESP32:
  • VCC del DHT11 → Pin 3.3V del ESP32
  • GND del DHT11 → Pin GND del ESP32
  • DATA del DHT11 → Pin GPIO 4 del ESP32
  • Resistor pull-up 10kΩ entre VCC y DATA del DHT11

Código ESP32 - Cliente MQTT Publisher


#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>

// Configuración WiFi
const char* ssid = "Tu_Red_WiFi";
const char* password = "Tu_Password";

// Configuración MQTT Broker
const char* mqtt_server = "192.168.1.100";  // IP del broker Mosquitto
const int mqtt_port = 1883;
const char* mqtt_user = "iot_user";         // Usuario MQTT (opcional)
const char* mqtt_password = "iot_pass";     // Password MQTT (opcional)

// Configuración DHT11
#define DHT_PIN 4
#define DHT_TYPE DHT11
DHT dht(DHT_PIN, DHT_TYPE);

// Configuración tópicos MQTT
const char* topic_temperatura = "sensor/temperatura";
const char* topic_humedad = "sensor/humedad";
const char* topic_datos = "sensor/datos";  // Tópico combinado JSON

// Objetos para WiFi y MQTT
WiFiClient espClient;
PubSubClient client(espClient);

// Variables de control
unsigned long lastMsg = 0;
const unsigned long MSG_INTERVAL = 5000;  // Publicar cada 5 segundos

void setup() {
    Serial.begin(115200);
    
    // Inicializar DHT11
    dht.begin();
    Serial.println("DHT11 inicializado");
    
    // Conectar a WiFi
    setup_wifi();
    
    // Configurar cliente MQTT
    client.setServer(mqtt_server, mqtt_port);
    client.setCallback(callback);
    
    Serial.println("Sistema inicializado correctamente");
}

void setup_wifi() {
    delay(10);
    Serial.println();
    Serial.print("Conectando a WiFi: ");
    Serial.println(ssid);
    
    WiFi.begin(ssid, password);
    
    while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
    }
    
    Serial.println();
    Serial.println("WiFi conectado exitosamente");
    Serial.print("Dirección IP: ");
    Serial.println(WiFi.localIP());
}

void callback(char* topic, byte* payload, unsigned int length) {
    // Función callback para mensajes MQTT recibidos
    Serial.print("Mensaje recibido en tópico: ");
    Serial.print(topic);
    Serial.print(". Mensaje: ");
    
    String message = "";
    for (int i = 0; i < length; i++) {
        message += (char)payload[i];
    }
    Serial.println(message);
}

void reconnect() {
    // Bucle hasta reconectar MQTT
    while (!client.connected()) {
        Serial.print("Intentando conexión MQTT...");
        
        // Crear un ID de cliente único
        String clientId = "ESP32Client-";
        clientId += String(random(0xffff), HEX);
        
        // Intentar conectar
        if (client.connect(clientId.c_str(), mqtt_user, mqtt_password)) {
            Serial.println("MQTT conectado");
            
            // Publicar mensaje de conexión
            client.publish("sensor/estado", "ESP32 conectado");
            
            // Suscribirse a tópicos de control (opcional)
            client.subscribe("sensor/control");
            
        } else {
            Serial.print("Error de conexión MQTT, rc=");
            Serial.print(client.state());
            Serial.println(" reintentando en 5 segundos");
            delay(5000);
        }
    }
}

void publishSensorData() {
    // Leer datos del DHT11
    float humidity = dht.readHumidity();
    float temperature = dht.readTemperature();
    
    // Verificar si las lecturas son válidas
    if (isnan(humidity) || isnan(temperature)) {
        Serial.println("Error al leer datos del DHT11");
        return;
    }
    
    // Publicar datos individuales
    char tempString[8];
    char humString[8];
    dtostrf(temperature, 6, 2, tempString);
    dtostrf(humidity, 6, 2, humString);
    
    client.publish(topic_temperatura, tempString, true);  // retain = true
    client.publish(topic_humedad, humString, true);
    
    // Crear JSON con todos los datos
    StaticJsonDocument<200> jsonDoc;
    jsonDoc["temperatura"] = temperature;
    jsonDoc["humedad"] = humidity;
    jsonDoc["timestamp"] = millis();
    jsonDoc["dispositivo"] = "ESP32_DHT11";
    
    char jsonBuffer[256];
    serializeJson(jsonDoc, jsonBuffer);
    
    client.publish(topic_datos, jsonBuffer, true);
    
    // Log en Serial
    Serial.print("Datos publicados - Temperatura: ");
    Serial.print(temperature);
    Serial.print("°C, Humedad: ");
    Serial.print(humidity);
    Serial.println("%");
    Serial.print("JSON: ");
    Serial.println(jsonBuffer);
}

void loop() {
    if (!client.connected()) {
        reconnect();
    }
    client.loop();
    
    unsigned long now = millis();
    if (now - lastMsg > MSG_INTERVAL) {
        lastMsg = now;
        publishSensorData();
    }
    
    // Pequeña pausa para evitar watchdog reset
    delay(100);
}
            

Aplicación Flask - Cliente MQTT Subscriber


from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO
import paho.mqtt.client as mqtt
import json
import threading
import time
from datetime import datetime

app = Flask(__name__)
app.config['SECRET_KEY'] = 'tu_clave_secreta_aqui'
socketio = SocketIO(app, cors_allowed_origins="*")

# Configuración MQTT
MQTT_BROKER = "192.168.1.100"  # IP del broker Mosquitto
MQTT_PORT = 1883
MQTT_USER = "iot_user"         # Usuario MQTT (opcional)
MQTT_PASSWORD = "iot_pass"     # Password MQTT (opcional)
MQTT_KEEPALIVE = 60

# Tópicos MQTT
TOPICS = [
    "sensor/temperatura",
    "sensor/humedad", 
    "sensor/datos",
    "sensor/estado"
]

# Variables globales para almacenar datos
sensor_data = {
    'temperatura': None,
    'humedad': None,
    'timestamp': None,
    'dispositivo': 'Desconectado'
}

# Historial de datos (últimas 50 lecturas)
data_history = []
MAX_HISTORY = 50

class MQTTClient:
    def __init__(self):
        self.client = mqtt.Client()
        self.client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_disconnect = self.on_disconnect
        self.connected = False
        
    def on_connect(self, client, userdata, flags, rc):
        """Callback de conexión MQTT"""
        if rc == 0:
            print(f"Conectado al broker MQTT con código: {rc}")
            self.connected = True
            
            # Suscribirse a todos los tópicos
            for topic in TOPICS:
                client.subscribe(topic)
                print(f"Suscrito al tópico: {topic}")
                
        else:
            print(f"Error de conexión MQTT: {rc}")
            self.connected = False
    
    def on_message(self, client, userdata, msg):
        """Callback para mensajes MQTT recibidos"""
        topic = msg.topic
        payload = msg.payload.decode('utf-8')
        
        print(f"Mensaje recibido - Tópico: {topic}, Payload: {payload}")
        
        try:
            if topic == "sensor/datos":
                # Procesar JSON completo
                data = json.loads(payload)
                self.update_sensor_data(data)
                
            elif topic == "sensor/temperatura":
                sensor_data['temperatura'] = float(payload)
                sensor_data['timestamp'] = datetime.now().isoformat()
                
            elif topic == "sensor/humedad":
                sensor_data['humedad'] = float(payload)
                sensor_data['timestamp'] = datetime.now().isoformat()
                
            elif topic == "sensor/estado":
                sensor_data['dispositivo'] = payload
            
            # Enviar datos actualizados a clientes web via WebSocket
            socketio.emit('sensor_update', sensor_data)
            
        except Exception as e:
            print(f"Error procesando mensaje MQTT: {e}")
    
    def on_disconnect(self, client, userdata, rc):
        """Callback de desconexión MQTT"""
        print(f"Desconectado del broker MQTT: {rc}")
        self.connected = False
    
    def update_sensor_data(self, data):
        """Actualizar datos del sensor con timestamp"""
        global sensor_data, data_history
        
        sensor_data.update({
            'temperatura': data.get('temperatura'),
            'humedad': data.get('humedad'),
            'timestamp': datetime.now().isoformat(),
            'dispositivo': data.get('dispositivo', 'ESP32')
        })
        
        # Agregar al historial
        history_entry = sensor_data.copy()
        data_history.append(history_entry)
        
        # Mantener solo las últimas MAX_HISTORY lecturas
        if len(data_history) > MAX_HISTORY:
            data_history.pop(0)
    
    def connect(self):
        """Conectar al broker MQTT"""
        try:
            self.client.connect(MQTT_BROKER, MQTT_PORT, MQTT_KEEPALIVE)
            self.client.loop_start()
            return True
        except Exception as e:
            print(f"Error conectando a MQTT: {e}")
            return False
    
    def disconnect(self):
        """Desconectar del broker MQTT"""
        self.client.loop_stop()
        self.client.disconnect()

# Instancia global del cliente MQTT
mqtt_client = MQTTClient()

@app.route('/')
def index():
    """Página principal del dashboard"""
    return render_template('dashboard.html')

@app.route('/api/sensor-data')
def get_sensor_data():
    """API endpoint para obtener datos actuales del sensor"""
    return jsonify(sensor_data)

@app.route('/api/history')
def get_history():
    """API endpoint para obtener historial de datos"""
    return jsonify(data_history)

@app.route('/api/status')
def get_status():
    """API endpoint para obtener estado de conexión"""
    return jsonify({
        'mqtt_connected': mqtt_client.connected,
        'data_count':