Integración de Flask con MQTT (cliente suscriptor)

Cliente MQTT en Python, suscripción a topics, manejo de mensajes en Flask

Módulo 4 ⏱️ 3 horas 🛠️ ESP32 + DHT11 🌐 Flask + MQTT

Introducción

En esta lección aprenderemos a crear un cliente MQTT suscriptor utilizando Flask y Python para recibir y procesar datos de sensores enviados por un ESP32. Implementaremos una aplicación web que se conectará a un broker Mosquitto MQTT, se suscribirá a topics específicos y mostrará los datos de temperatura y humedad en tiempo real a través de una interfaz web moderna.

Al finalizar esta clase, serás capaz de integrar completamente un sistema IoT que capture datos con ESP32 + DHT11, los transmita via MQTT y los visualice en una aplicación web Flask profesional.

Stack Tecnológico Completo:
  • Hardware: ESP32 + Sensor DHT11
  • Broker MQTT: Mosquitto
  • Backend: Flask + Paho MQTT Client
  • Frontend: HTML + CSS + JavaScript (WebSockets)

Conceptos Fundamentales

Arquitectura del Sistema

Nuestro sistema IoT completo funciona con la siguiente arquitectura:

  • Publicador (ESP32): Lee datos del sensor DHT11 y los publica al broker MQTT
  • Broker MQTT (Mosquitto): Intermediario que recibe y distribuye mensajes
  • Suscriptor (Flask): Cliente Python que se suscribe a topics y procesa mensajes
  • Interfaz Web: Dashboard que muestra datos en tiempo real

Patrón Publicador-Suscriptor en MQTT

MQTT implementa un patrón de comunicación asíncrona donde:

  • Topics: Canales de comunicación organizados jerárquicamente (ej: sensor/temperatura, sensor/humedad)
  • QoS (Quality of Service): Niveles de garantía de entrega (0, 1, 2)
  • Retain: El broker mantiene el último mensaje para nuevos suscriptores
  • Keep Alive: Mecanismo para detectar conexiones perdidas

Integración Flask + MQTT

Para integrar MQTT con Flask necesitamos:

  • Threading: MQTT client debe ejecutarse en hilo separado
  • Callbacks: Funciones que manejan eventos MQTT
  • Data Storage: Variables globales o base de datos para almacenar datos
  • WebSockets: Para actualizar la interfaz en tiempo real
Diagrama de Conexiones DHT11 + ESP32:
  • VCC (DHT11): Pin 3.3V del ESP32
  • GND (DHT11): Pin GND del ESP32
  • DATA (DHT11): Pin GPIO 4 del ESP32
  • Resistor Pull-up: 4.7kΩ entre VCC y DATA (opcional, muchos módulos lo incluyen)

Implementación Práctica

Paso 1: Configuración del ESP32 (Publicador MQTT)

Primero configuramos el ESP32 para leer el sensor DHT11 y publicar los datos via MQTT:

Código ESP32 - Publicador MQTT con DHT11

#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>

// Configuración WiFi
const char* ssid = "TU_RED_WIFI";
const char* password = "TU_PASSWORD";

// Configuración MQTT
const char* mqtt_server = "192.168.1.100";  // IP del broker Mosquitto
const int mqtt_port = 1883;
const char* mqtt_user = "mqtt_user";        // Usuario MQTT (opcional)
const char* mqtt_pass = "mqtt_pass";        // Password MQTT (opcional)

// Configuración DHT11
#define DHT_PIN 4
#define DHT_TYPE DHT11
DHT dht(DHT_PIN, DHT_TYPE);

// Topics MQTT
const char* temp_topic = "esp32/sensor/temperatura";
const char* hum_topic = "esp32/sensor/humedad";
const char* status_topic = "esp32/status";

// Clientes WiFi y MQTT
WiFiClient espClient;
PubSubClient client(espClient);

// Variables de timing
unsigned long lastMsg = 0;
const unsigned long MSG_INTERVAL = 5000;  // 5 segundos

void setup() {
    Serial.begin(115200);
    
    // Inicializar DHT11
    dht.begin();
    Serial.println("DHT11 inicializado");
    
    // Conectar WiFi
    setup_wifi();
    
    // Configurar MQTT
    client.setServer(mqtt_server, mqtt_port);
    client.setCallback(callback);
    
    Serial.println("Setup completado");
}

void setup_wifi() {
    delay(10);
    Serial.println();
    Serial.print("Conectando a ");
    Serial.println(ssid);
    
    WiFi.begin(ssid, password);
    
    while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
    }
    
    Serial.println("");
    Serial.println("WiFi conectado");
    Serial.println("IP address: ");
    Serial.println(WiFi.localIP());
}

void callback(char* topic, byte* payload, unsigned int length) {
    // Callback para mensajes MQTT recibidos (si necesario)
    Serial.print("Mensaje recibido [");
    Serial.print(topic);
    Serial.print("] ");
    
    String message = "";
    for (int i = 0; i < length; i++) {
        message += (char)payload[i];
    }
    Serial.println(message);
}

void reconnect() {
    while (!client.connected()) {
        Serial.print("Intentando conexión MQTT...");
        
        String clientId = "ESP32Client-";
        clientId += String(random(0xffff), HEX);
        
        if (client.connect(clientId.c_str(), mqtt_user, mqtt_pass)) {
            Serial.println("conectado");
            
            // Publicar mensaje de estado
            client.publish(status_topic, "ESP32 Conectado", true);
            
            // Suscribirse a topics de control (opcional)
            client.subscribe("esp32/control/#");
            
        } else {
            Serial.print("falló, rc=");
            Serial.print(client.state());
            Serial.println(" reintentando en 5 segundos");
            delay(5000);
        }
    }
}

void loop() {
    if (!client.connected()) {
        reconnect();
    }
    client.loop();
    
    unsigned long now = millis();
    if (now - lastMsg > MSG_INTERVAL) {
        lastMsg = now;
        
        // Leer datos del sensor
        float temperature = dht.readTemperature();
        float humidity = dht.readHumidity();
        
        // Verificar si las lecturas son válidas
        if (isnan(temperature) || isnan(humidity)) {
            Serial.println("Error leyendo el sensor DHT11!");
            
            // Publicar error
            client.publish(status_topic, "Error: Sensor DHT11 no responde");
            return;
        }
        
        // Crear JSON con los datos
        StaticJsonDocument<200> doc;
        doc["device_id"] = "ESP32-001";
        doc["timestamp"] = millis();
        doc["temperatura"] = temperature;
        doc["humedad"] = humidity;
        
        String json_string;
        serializeJson(doc, json_string);
        
        // Publicar datos individuales
        client.publish(temp_topic, String(temperature).c_str(), true);
        client.publish(hum_topic, String(humidity).c_str(), true);
        
        // Publicar JSON completo
        client.publish("esp32/sensor/data", json_string.c_str());
        
        // Debug por serial
        Serial.printf("Temperatura: %.2f°C, Humedad: %.2f%%\n", temperature, humidity);
        Serial.println("Datos publicados a MQTT");
    }
}

Paso 2: Configuración de Mosquitto MQTT Broker

Configuramos el broker MQTT Mosquitto en el servidor:

Archivo de configuración: /etc/mosquitto/mosquitto.conf

# Configuración básica de Mosquitto
pid_file /var/run/mosquitto.pid
persistence true
persistence_location /var/lib/mosquitto/

# Puerto de escucha
port 1883

# Logs
log_dest file /var/log/mosquitto/mosquitto.log
log_type all

# Autenticación (opcional)
allow_anonymous false
password_file /etc/mosquitto/passwd

# ACL (Access Control List) - opcional
acl_file /etc/mosquitto/acl.conf

# WebSockets para clientes web (opcional)
listener 9001
protocol websockets

Comandos de configuración del broker:

# Instalar Mosquitto
sudo apt update
sudo apt install mosquitto mosquitto-clients

# Crear usuario MQTT
sudo mosquitto_passwd -c /etc/mosquitto/passwd mqtt_user

# Reiniciar servicio
sudo systemctl restart mosquitto
sudo systemctl enable mosquitto

# Verificar estado
sudo systemctl status mosquitto

# Test de publicación
mosquitto_pub -h localhost -t "test/topic" -m "Hola MQTT" -u mqtt_user -P mqtt_pass

# Test de suscripción
mosquitto_sub -h localhost -t "test/topic" -u mqtt_user -P mqtt_pass

Paso 3: Cliente Flask MQTT Suscriptor

Implementamos la aplicación Flask que actúa como cliente suscriptor MQTT:

requirements.txt - Dependencias Python:

Flask==2.3.3
paho-mqtt==1.6.1
Flask-SocketIO==5.3.6
eventlet==0.33.3
python-dotenv==1.0.0

app.py - Aplicación Flask principal:

from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO, emit
import paho.mqtt.client as mqtt
import json
import threading
import time
from datetime import datetime
import os
from collections import deque

app = Flask(__name__)
app.config['SECRET_KEY'] = 'tu_clave_secreta_aqui'
socketio = SocketIO(app, cors_allowed_origins="*", async_mode='eventlet')

# Configuración MQTT
MQTT_BROKER = "localhost"  # o la IP de tu broker
MQTT_PORT = 1883
MQTT_USER = "mqtt_user"
MQTT_PASS = "mqtt_pass"

# Topics a los que nos suscribiremos
MQTT_TOPICS = [
    ("esp32/sensor/temperatura", 0),
    ("esp32/sensor/humedad", 0),
    ("esp32/sensor/data", 0),
    ("esp32/status", 0)
]

# Almacenamiento de datos en memoria
sensor_data = {
    'temperatura': None,
    'humedad': None,
    'timestamp': None,
    'status': 'Desconectado'
}

# Historial de datos (últimos 100 registros)
data_history = deque(maxlen=100)

# Cliente MQTT global
mqtt_client = None
mqtt_connected = False

class MQTTClient:
    def __init__(self):
        self.client = mqtt.Client()
        self.client.username_pw_set(MQTT_USER, MQTT_PASS)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_disconnect = self.on_disconnect
        self.client.on_log = self.on_log
        
    def on_connect(self, client, userdata, flags, rc):
        global mqtt_connected
        if rc == 0:
            mqtt_connected = True
            print("Conectado al broker MQTT")
            
            # Suscribirse a todos los topics
            for topic, qos in MQTT_TOPICS:
                client.subscribe(topic, qos)
                print(f"Suscrito al topic: {topic}")
                
            # Notificar conexión via WebSocket
            socketio.emit('mqtt_status', {'connected': True}, broadcast=True)
            
        else:
            mqtt_connected = False
            print(f"Error de conexión MQTT: {rc}")
            socketio.emit('mqtt_status', {'connected': False, 'error': rc}, broadcast=True)
    
    def on_message(self, client, userdata, msg):
        global sensor_data, data_history
        
        try:
            topic = msg.topic
            payload = msg.payload.decode('utf-8')
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            
            print(f"Mensaje recibido - Topic: