Introducción
En esta lección desarrollaremos una aplicación Flask completa que se suscribirá a datos MQTT enviados por nuestro ESP32 con sensor DHT11. Implementaremos persistencia de datos usando SQLAlchemy, crearemos APIs REST para consultar información histórica y construiremos un dashboard en tiempo real para visualizar los datos de temperatura y humedad.
Al final de esta clase, tendrán una aplicación backend robusta capaz de recibir, procesar, almacenar y servir datos de sensores IoT de manera profesional.
ESP32 + DHT11 → Mosquitto MQTT Broker → Flask App → Base de Datos SQLite → APIs REST + Dashboard Web
Conceptos Fundamentales
1. Flask como Cliente MQTT
Flask puede actuar como cliente MQTT usando la biblioteca paho-mqtt
, permitiendo suscribirse a tópicos y procesar mensajes en tiempo real. Implementaremos un patrón de threading para mantener la conexión MQTT activa mientras Flask maneja requests HTTP.
2. Persistencia de Datos con SQLAlchemy
Utilizaremos SQLAlchemy ORM para definir modelos de datos y gestionar operaciones CRUD de manera eficiente. Esto nos permite abstraer la lógica de base de datos y facilitar el mantenimiento del código.
3. Arquitectura de la Aplicación
- Capa de Comunicación MQTT: Cliente que recibe datos del ESP32
- Capa de Datos: Modelos SQLAlchemy para persistencia
- Capa de API: Endpoints REST para consultar datos
- Capa de Presentación: Dashboard web con visualización en tiempo real
4. Patrones de Diseño Implementados
- Observer Pattern: Para manejar eventos MQTT
- Repository Pattern: Para abstraer operaciones de base de datos
- Factory Pattern: Para configuración de la aplicación Flask
Implementación Práctica
Paso 1: Configuración del Entorno
Primero, instalamos las dependencias necesarias para nuestra aplicación Flask:
requirements.txt
Flask==2.3.3
Flask-SQLAlchemy==3.0.5
paho-mqtt==1.6.1
python-dotenv==1.0.0
flask-cors==4.0.0
requests==2.31.0
Paso 2: Estructura del Proyecto
flask_iot_app/
├── app.py
├── config.py
├── models.py
├── mqtt_client.py
├── requirements.txt
├── templates/
│ └── dashboard.html
└── static/
├── css/
└── js/
Paso 3: Configuración de la Base de Datos
models.py - Definición del Modelo de Datos
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
import json
db = SQLAlchemy()
class SensorData(db.Model):
"""Modelo para almacenar datos de sensores"""
__tablename__ = 'sensor_data'
id = db.Column(db.Integer, primary_key=True)
device_id = db.Column(db.String(50), nullable=False, index=True)
temperature = db.Column(db.Float, nullable=False)
humidity = db.Column(db.Float, nullable=False)
timestamp = db.Column(db.DateTime, default=datetime.utcnow, index=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
def __repr__(self):
return f'<SensorData {self.device_id}: T={self.temperature}°C, H={self.humidity}%>'
def to_dict(self):
"""Convierte el objeto a diccionario para JSON"""
return {
'id': self.id,
'device_id': self.device_id,
'temperature': self.temperature,
'humidity': self.humidity,
'timestamp': self.timestamp.isoformat(),
'created_at': self.created_at.isoformat()
}
@staticmethod
def get_latest_by_device(device_id, limit=10):
"""Obtiene los últimos registros de un dispositivo"""
return SensorData.query.filter_by(device_id=device_id)\
.order_by(SensorData.timestamp.desc())\
.limit(limit).all()
@staticmethod
def get_data_range(device_id, start_date, end_date):
"""Obtiene datos en un rango de fechas"""
return SensorData.query.filter(
SensorData.device_id == device_id,
SensorData.timestamp >= start_date,
SensorData.timestamp <= end_date
).order_by(SensorData.timestamp.asc()).all()
class DeviceStatus(db.Model):
"""Modelo para estado de dispositivos"""
__tablename__ = 'device_status'
id = db.Column(db.Integer, primary_key=True)
device_id = db.Column(db.String(50), unique=True, nullable=False)
last_seen = db.Column(db.DateTime, default=datetime.utcnow)
is_online = db.Column(db.Boolean, default=True)
battery_level = db.Column(db.Integer, nullable=True)
def to_dict(self):
return {
'device_id': self.device_id,
'last_seen': self.last_seen.isoformat(),
'is_online': self.is_online,
'battery_level': self.battery_level
}
Paso 4: Cliente MQTT Integrado
mqtt_client.py - Cliente MQTT con Threading
import paho.mqtt.client as mqtt
import json
import threading
import logging
from datetime import datetime
from models import db, SensorData, DeviceStatus
class MQTTClient:
def __init__(self, app=None, broker_host="localhost", broker_port=1883):
self.app = app
self.broker_host = broker_host
self.broker_port = broker_port
self.client = None
self.connected = False
self.logger = logging.getLogger(__name__)
if app:
self.init_app(app)
def init_app(self, app):
"""Inicializa el cliente MQTT con la aplicación Flask"""
self.app = app
self.broker_host = app.config.get('MQTT_BROKER_HOST', 'localhost')
self.broker_port = app.config.get('MQTT_BROKER_PORT', 1883)
# Configurar cliente MQTT
self.client = mqtt.Client(client_id="flask_iot_server", clean_session=True)
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
# Iniciar conexión en thread separado
self.start_mqtt_thread()
def start_mqtt_thread(self):
"""Inicia el cliente MQTT en un thread separado"""
def mqtt_thread():
try:
self.client.connect(self.broker_host, self.broker_port, 60)
self.client.loop_forever()
except Exception as e:
self.logger.error(f"Error conectando a MQTT: {e}")
thread = threading.Thread(target=mqtt_thread, daemon=True)
thread.start()
self.logger.info("Thread MQTT iniciado")
def on_connect(self, client, userdata, flags, rc):
"""Callback cuando se conecta al broker"""
if rc == 0:
self.connected = True
self.logger.info("Conectado al broker MQTT")
# Suscribirse a tópicos
topics = [
("sensor/dht11/+/temperature", 0),
("sensor/dht11/+/humidity", 0),
("sensor/dht11/+/data", 0),
("device/+/status", 0)
]
for topic, qos in topics:
client.subscribe(topic, qos)
self.logger.info(f"Suscrito a: {topic}")
else:
self.logger.error(f"Error conectando al broker. Código: {rc}")
def on_disconnect(self, client, userdata, rc):
"""Callback cuando se desconecta del broker"""
self.connected = False
self.logger.warning("Desconectado del broker MQTT")
def on_message(self, client, userdata, msg):
"""Procesa mensajes MQTT recibidos"""
try:
topic = msg.topic
payload = msg.payload.decode('utf-8')
self.logger.info(f"Mensaje recibido - Topic: {topic}, Payload: {payload}")
# Procesar según el tipo de tópico
if "data" in topic:
self.process_sensor_data(topic, payload)
elif "status" in topic:
self.process_device_status(topic, payload)
elif "temperature" in topic or "humidity" in topic:
self.process_individual_sensor(topic, payload)
except Exception as e:
self.logger.error(f"Error procesando mensaje MQTT: {e}")
def process_sensor_data(self, topic, payload):
"""Procesa datos completos del sensor"""
try:
# Extraer device_id del topic: sensor/dht11/ESP32_001/data
device_id = topic.split('/')[2]
# Parsear JSON
data = json.loads(payload)
with self.app.app_context():
# Crear registro en la base de datos
sensor_data = SensorData(
device_id=device_id,
temperature=float(data.get('temperature', 0)),
humidity=float(data.get('humidity', 0)),
timestamp=datetime.utcnow()
)
db.session.add(sensor_data)
# Actualizar estado del dispositivo
device_status = DeviceStatus.query.filter_by(device_id=device_id).first()
if not device_status:
device_status = DeviceStatus(device_id=device_id)
db.session.add(device_status)
device_status.last_seen = datetime.utcnow()
device_status.is_online = True
if 'battery' in data:
device_status.battery_level = int(data['battery'])
db.session.commit()
self.logger.info(f"Datos guardados para {device_id}: T={data.get('temperature')}°C, H={data.get('humidity')}%")
except Exception as e:
self.logger.error(f"Error procesando datos del sensor: {e}")
def process_device_status(self, topic, payload):
"""Procesa estado del dispositivo"""
try:
device_id = topic.split('/')[1]
status_data = json.loads(payload)
with self.app.app_context():
device_status = DeviceStatus.query.filter_by(device_id=device_id).first()
if not device_status:
device_status = DeviceStatus(device_id=device_id)
db.session.add(device_status)
device_status.last_seen = datetime.utcnow()
device_status.is_online = status_data.get('online', True)
if 'battery' in status_data:
device_status.battery_level = status_data['battery']
db.session.commit()
except Exception as e:
self.logger.error(f"Error procesando estado del dispositivo: {e}")
def process_individual_sensor(self, topic, payload):
"""Procesa lecturas individuales de temperatura/humedad"""
# Implementar si se necesita compatibilidad con mensajes individuales
pass
def publish(self, topic, payload, qos=0):
"""Publica un mensaje MQTT"""
if self.connected and self.client:
result = self.client.publish(topic, payload, qos)
return result.rc == mqtt.MQTT_ERR_SUCCESS
return False
Paso 5: Aplicación Flask Principal
app.py - Aplicación Flask con APIs REST
from flask import Flask, render_template, jsonify, request
from flask_cors import CORS
from datetime import datetime, timedelta
import logging
import os
from models import db, SensorData, DeviceStatus
from mqtt_client import MQTTClient
def create_app():
"""Factory para crear la aplicación Flask"""
app = Flask(__name__)
# Configuración
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev-secret-key')
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL', 'sqlite:///iot_sensors.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['MQTT_