Uso de Flask + WebSockets para actualizar valores en vivo

WebSockets con Flask-SocketIO, comunicación bidireccional, actualizaciones en tiempo real

Módulo 5 ⏱️ 3 horas 🛠️ ESP32 + DHT11 🌐 Flask + MQTT

Introducción

En el mundo del IoT, la capacidad de mostrar datos en tiempo real es fundamental para crear interfaces de usuario efectivas. Los WebSockets nos permiten establecer una comunicación bidireccional entre el servidor Flask y el navegador web, eliminando la necesidad de actualizar constantemente la página para obtener nuevos datos.

En esta lección, integraremos Flask-SocketIO con nuestro stack ESP32 + DHT11 + Mosquitto MQTT para crear un dashboard que actualice automáticamente los valores de temperatura y humedad en tiempo real. Aprenderemos a manejar eventos, emitir datos desde el servidor y crear una interfaz web dinámica que responda instantáneamente a los cambios de los sensores.

Objetivo de la clase: Crear un sistema completo de monitoreo en tiempo real que permita visualizar instantáneamente los datos del sensor DHT11 conectado al ESP32, utilizando WebSockets para la comunicación bidireccional entre el servidor Flask y múltiples clientes web.

Conceptos Fundamentales

¿Qué son los WebSockets?

Los WebSockets son un protocolo de comunicación que permite establecer una conexión persistente y bidireccional entre el cliente (navegador) y el servidor. A diferencia de HTTP tradicional, los WebSockets mantienen la conexión abierta, permitiendo que tanto el cliente como el servidor envíen datos en cualquier momento.

Flask-SocketIO: Ventajas Clave

  • Comunicación en Tiempo Real: Actualizaciones instantáneas sin polling
  • Bidireccionalidad: El servidor puede enviar datos al cliente sin solicitud previa
  • Eficiencia: Menor overhead de red comparado con HTTP polling
  • Múltiples Clientes: Soporte nativo para broadcast a múltiples conexiones
  • Fallback Automático: Si WebSockets no está disponible, utiliza long-polling

Arquitectura del Sistema Completo

Nuestro sistema integrará los siguientes componentes:

  • ESP32 + DHT11: Captura datos cada 5 segundos
  • Mosquitto MQTT: Broker de mensajes para desacoplar componentes
  • Flask + SocketIO: Servidor web con capacidades WebSocket
  • Cliente Web: Dashboard HTML/JavaScript con actualizaciones en vivo

Eventos y Namespaces en SocketIO

SocketIO organiza la comunicación mediante eventos personalizados y puede usar namespaces para separar diferentes tipos de comunicación:

  • Eventos del Sistema: connect, disconnect
  • Eventos Personalizados: sensor_data, alert, status_update
  • Namespaces: /sensors, /alerts para organizar diferentes flujos de datos

Implementación Práctica

Paso 1: Configuración del ESP32 con DHT11

Primero, configuraremos el ESP32 para enviar datos del DHT11 vía MQTT cada 5 segundos:

Código ESP32 - Sensor DHT11 + MQTT:


#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>

// Configuración WiFi
const char* ssid = "TU_WIFI";
const char* password = "TU_PASSWORD";

// Configuración MQTT
const char* mqtt_server = "192.168.1.100";
const int mqtt_port = 1883;
const char* mqtt_topic = "sensors/dht11";
const char* device_id = "ESP32_DHT11_01";

// Configuración DHT11
#define DHT_PIN 4
#define DHT_TYPE DHT11
DHT dht(DHT_PIN, DHT_TYPE);

// Clientes WiFi y MQTT
WiFiClient espClient;
PubSubClient client(espClient);

// Variables de control
unsigned long lastMsg = 0;
const unsigned long MSG_INTERVAL = 5000; // 5 segundos
int msgCount = 0;

void setup() {
    Serial.begin(115200);
    
    // Inicializar DHT11
    dht.begin();
    Serial.println("DHT11 inicializado");
    
    // Conectar WiFi
    setup_wifi();
    
    // Configurar MQTT
    client.setServer(mqtt_server, mqtt_port);
    client.setCallback(callback);
}

void setup_wifi() {
    delay(10);
    Serial.println();
    Serial.print("Conectando a ");
    Serial.println(ssid);
    
    WiFi.begin(ssid, password);
    
    while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
    }
    
    Serial.println("");
    Serial.println("WiFi conectado!");
    Serial.print("Dirección IP: ");
    Serial.println(WiFi.localIP());
}

void callback(char* topic, byte* message, unsigned int length) {
    Serial.print("Mensaje recibido en tópico: ");
    Serial.print(topic);
    Serial.print(". Mensaje: ");
    String messageTemp;
    
    for (int i = 0; i < length; i++) {
        Serial.print((char)message[i]);
        messageTemp += (char)message[i];
    }
    Serial.println();
}

void reconnect() {
    while (!client.connected()) {
        Serial.print("Intentando conexión MQTT...");
        
        if (client.connect(device_id)) {
            Serial.println("¡Conectado!");
            // Suscribirse a tópicos de control si es necesario
            client.subscribe("sensors/control");
        } else {
            Serial.print("Error, rc=");
            Serial.print(client.state());
            Serial.println(" Intentando de nuevo en 5 segundos");
            delay(5000);
        }
    }
}

void loop() {
    if (!client.connected()) {
        reconnect();
    }
    client.loop();
    
    unsigned long now = millis();
    if (now - lastMsg > MSG_INTERVAL) {
        lastMsg = now;
        
        // Leer datos del DHT11
        float temperature = dht.readTemperature();
        float humidity = dht.readHumidity();
        
        // Verificar si las lecturas son válidas
        if (isnan(temperature) || isnan(humidity)) {
            Serial.println("Error leyendo el sensor DHT11!");
            return;
        }
        
        // Crear JSON con los datos
        StaticJsonDocument<200> doc;
        doc["device_id"] = device_id;
        doc["timestamp"] = now;
        doc["message_count"] = ++msgCount;
        doc["temperature"] = round(temperature * 10) / 10.0; // 1 decimal
        doc["humidity"] = round(humidity * 10) / 10.0;       // 1 decimal
        doc["wifi_rssi"] = WiFi.RSSI();
        
        // Convertir a string y publicar
        char buffer[256];
        serializeJson(doc, buffer);
        
        Serial.print("Publicando: ");
        Serial.println(buffer);
        
        client.publish(mqtt_topic, buffer);
    }
}
            

Paso 2: Servidor Flask con SocketIO

Ahora crearemos el servidor Flask que se conecta al broker MQTT y usa WebSockets para enviar datos a los clientes:

app.py - Servidor Flask con SocketIO:


from flask import Flask, render_template, request
from flask_socketio import SocketIO, emit, join_room, leave_room
import paho.mqtt.client as mqtt
import json
import threading
import time
from datetime import datetime
import logging

# Configuración de logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuración Flask
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret_key_for_socketio'

# Configuración SocketIO
socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading')

# Configuración MQTT
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC = "sensors/dht11"

# Variables globales para almacenar datos
sensor_data = {
    'temperature': 0,
    'humidity': 0,
    'last_update': None,
    'device_status': 'disconnected',
    'message_count': 0
}

connected_clients = 0

# Cliente MQTT
mqtt_client = mqtt.Client()

def on_mqtt_connect(client, userdata, flags, rc):
    """Callback cuando se conecta al broker MQTT"""
    if rc == 0:
        logger.info("Conectado al broker MQTT")
        client.subscribe(MQTT_TOPIC)
        sensor_data['device_status'] = 'connected'
        # Notificar a todos los clientes WebSocket
        socketio.emit('status_update', {
            'status': 'mqtt_connected',
            'message': 'Conectado al broker MQTT'
        })
    else:
        logger.error(f"Error conectando al MQTT broker: {rc}")

def on_mqtt_message(client, userdata, msg):
    """Callback cuando se recibe un mensaje MQTT"""
    try:
        # Decodificar el mensaje JSON
        payload = json.loads(msg.payload.decode())
        
        # Actualizar datos globales
        sensor_data.update({
            'temperature': payload.get('temperature', 0),
            'humidity': payload.get('humidity', 0),
            'last_update': datetime.now().strftime('%H:%M:%S'),
            'device_status': 'active',
            'message_count': payload.get('message_count', 0),
            'wifi_rssi': payload.get('wifi_rssi', 0),
            'device_id': payload.get('device_id', 'Unknown')
        })
        
        logger.info(f"Datos recibidos: T={payload.get('temperature')}°C, H={payload.get('humidity')}%")
        
        # Emitir datos a todos los clientes WebSocket conectados
        socketio.emit('sensor_data', sensor_data)
        
        # Verificar alertas
        check_alerts(sensor_data)
        
    except Exception as e:
        logger.error(f"Error procesando mensaje MQTT: {e}")

def check_alerts(data):
    """Verificar condiciones de alerta y enviar notificaciones"""
    alerts = []
    
    if data['temperature'] > 30:
        alerts.append({
            'type': 'warning',
            'message': f'Temperatura alta: {data["temperature"]}°C',
            'timestamp': data['last_update']
        })
    
    if data['temperature'] < 10:
        alerts.append({
            'type': 'info',
            'message': f'Temperatura baja: {data["temperature"]}°C',
            'timestamp': data['last_update']
        })
    
    if data['humidity'] > 80:
        alerts.append({
            'type': 'warning',
            'message': f'Humedad alta: {data["humidity"]}%',
            'timestamp': data['last_update']
        })
    
    # Emitir alertas si existen
    for alert in alerts:
        socketio.emit('alert', alert)

def setup_mqtt():
    """Configurar y conectar cliente MQTT"""
    mqtt_client.on_connect = on_mqtt_connect
    mqtt_client.on_message = on_mqtt_message
    
    try:
        mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
        mqtt_client.loop_start()
        logger.info("Cliente MQTT iniciado")
    except Exception as e:
        logger.error(f"Error conectando MQTT: {e}")

# Rutas Flask
@app.route('/')
def index():
    """Página principal con dashboard"""
    return render_template('dashboard.html')

@app.route('/api/current-data')
def get_current_data():
    """API REST para obtener datos actuales"""
    return sensor_data

# Eventos SocketIO
@socketio.on('connect')
def handle_connect():
    """Cuando un cliente se conecta vía WebSocket"""
    global connected_clients
    connected_clients += 1
    
    logger.info(f'Cliente conectado. Total: {connected_clients}')
    
    # Enviar datos actuales al cliente recién conectado
    emit('sensor_data', sensor_data)
    emit('status_update', {
        'status': 'connected',
        'message': f'Conectado al dashboard. Clientes activos: {connected_clients}',
        'clients_count': connected_clients
    })

@socketio.on('disconnect')
def handle_disconnect():
    """Cuando un cliente se desconecta"""
    global connected_clients
    connected_clients = max(0, connected_clients - 1)
    
    logger.info(f'Cliente desconectado. Total: {connected_clients}')

@socketio.on('request_data')
def handle_data_request():
    """Cuando el cliente solicita datos actualizados"""
    emit('sensor_data', sensor_data)

@socketio.on('join_room')
def handle_join_room(data):
    """Permitir que clientes se unan a rooms específicos"""
    room = data.get('room', 'general')
    join_room(room)
    emit('status_update', {
        'status': 'joined_room',
        'message': f'Unido a la sala: {room}'
    })

@socketio.on('leave_room')
def handle_leave_room(data):
    """Permitir que clientes salgan de rooms"""
    room = data.get('room', 'general')
    leave_room(room)
    emit('status_update', {
        'status': 'left_room',
        'message': f'Salido de la sala: {room}'
    })