Introducción
En esta lección aprenderemos a configurar un sistema IoT completo utilizando ESP32 con sensor DHT11 para publicar datos de temperatura y humedad a un broker Mosquitto MQTT, y crear una aplicación Flask para visualizar estos datos en tiempo real. Este stack tecnológico es fundamental en aplicaciones industriales y sistemas de monitoreo distribuido.
Al finalizar esta clase, serás capaz de implementar un sistema completo de telemetría que puede escalarse a múltiples sensores y aplicaciones comerciales.
Conceptos Fundamentales
Arquitectura del Sistema
Nuestro sistema sigue una arquitectura distribuida basada en el patrón Publisher/Subscriber utilizando MQTT como protocolo de comunicación:
- ESP32: Microcontrolador con WiFi integrado que actúa como cliente MQTT publisher
- DHT11: Sensor digital de temperatura y humedad con protocolo de comunicación propietario
- Mosquitto: Broker MQTT ligero que gestiona las comunicaciones entre clientes
- Flask: Framework web que actúa como cliente MQTT subscriber y servidor HTTP
Protocolo MQTT
MQTT (Message Queuing Telemetry Transport) es un protocolo de mensajería ligero diseñado para dispositivos IoT:
- QoS Levels: 0 (At most once), 1 (At least once), 2 (Exactly once)
- Topics: Jerarquía de canales para organizar mensajes
- Retain Messages: Mensajes que el broker mantiene para nuevos suscriptores
- Keep Alive: Mecanismo para detectar clientes desconectados
Conexiones Hardware DHT11-ESP32
Esquema de conexión física:
- VCC (DHT11): → 3.3V (ESP32)
- DATA (DHT11): → GPIO 4 (ESP32) + Resistor pull-up 10kΩ a 3.3V
- GND (DHT11): → GND (ESP32)
Implementación Práctica
1. Configuración del Broker Mosquitto
Primero configuramos el broker MQTT en nuestro sistema. Crear archivo de configuración:
mosquitto.conf
# Configuración básica de Mosquitto
port 1883
allow_anonymous true
listener 1883
protocol mqtt
# Configuración de logging
log_dest file /var/log/mosquitto/mosquitto.log
log_type error
log_type warning
log_type notice
log_type information
# Configuración de persistencia
persistence true
persistence_location /var/lib/mosquitto/
# Configuración de retención
retained_persistence true
2. Código ESP32 para Lectura DHT11 y Publicación MQTT
main.cpp - Cliente ESP32 MQTT Publisher
#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>
// Configuración WiFi
const char* ssid = "TU_RED_WIFI";
const char* password = "TU_PASSWORD_WIFI";
// Configuración MQTT
const char* mqtt_server = "192.168.1.100"; // IP del broker Mosquitto
const int mqtt_port = 1883;
const char* mqtt_client_id = "ESP32_DHT11_Client";
const char* temperature_topic = "sensors/dht11/temperature";
const char* humidity_topic = "sensors/dht11/humidity";
const char* status_topic = "sensors/dht11/status";
// Configuración DHT11
#define DHT_PIN 4
#define DHT_TYPE DHT11
DHT dht(DHT_PIN, DHT_TYPE);
// Clientes WiFi y MQTT
WiFiClient espClient;
PubSubClient client(espClient);
// Variables de control
unsigned long lastMsg = 0;
const long interval = 30000; // Publicar cada 30 segundos
int reconnectAttempts = 0;
const int maxReconnectAttempts = 5;
void setup() {
Serial.begin(115200);
Serial.println("Iniciando ESP32 DHT11 MQTT Publisher...");
// Inicializar DHT11
dht.begin();
// Conectar a WiFi
setup_wifi();
// Configurar cliente MQTT
client.setServer(mqtt_server, mqtt_port);
client.setCallback(callback);
// Mensaje de inicio
Serial.println("Sistema iniciado correctamente");
}
void loop() {
if (!client.connected()) {
reconnect();
}
client.loop();
unsigned long now = millis();
if (now - lastMsg > interval) {
lastMsg = now;
readAndPublishSensorData();
}
delay(100); // Pequeño delay para estabilidad
}
void setup_wifi() {
delay(10);
Serial.println();
Serial.print("Conectando a WiFi: ");
Serial.println(ssid);
WiFi.begin(ssid, password);
int attempts = 0;
while (WiFi.status() != WL_CONNECTED && attempts < 20) {
delay(1000);
Serial.print(".");
attempts++;
}
if (WiFi.status() == WL_CONNECTED) {
Serial.println("");
Serial.println("WiFi conectado exitosamente");
Serial.print("Dirección IP: ");
Serial.println(WiFi.localIP());
Serial.print("Intensidad de señal (RSSI): ");
Serial.print(WiFi.RSSI());
Serial.println(" dBm");
} else {
Serial.println("Error: No se pudo conectar a WiFi");
ESP.restart();
}
}
void callback(char* topic, byte* payload, unsigned int length) {
Serial.print("Mensaje recibido [");
Serial.print(topic);
Serial.print("]: ");
String message;
for (int i = 0; i < length; i++) {
message += (char)payload[i];
}
Serial.println(message);
}
void reconnect() {
while (!client.connected() && reconnectAttempts < maxReconnectAttempts) {
Serial.print("Intentando conexión MQTT...");
if (client.connect(mqtt_client_id)) {
Serial.println(" conectado");
reconnectAttempts = 0;
// Publicar mensaje de estado
publishStatus("online");
// Suscribirse a topics de control si es necesario
client.subscribe("sensors/dht11/control");
} else {
Serial.print(" falló, rc=");
Serial.print(client.state());
Serial.println(" reintentando en 5 segundos");
reconnectAttempts++;
delay(5000);
}
}
if (reconnectAttempts >= maxReconnectAttempts) {
Serial.println("Máximo número de reintentos MQTT alcanzado. Reiniciando ESP32...");
ESP.restart();
}
}
void readAndPublishSensorData() {
// Leer datos del sensor DHT11
float humidity = dht.readHumidity();
float temperature = dht.readTemperature();
// Verificar si las lecturas son válidas
if (isnan(humidity) || isnan(temperature)) {
Serial.println("Error: Fallo en la lectura del sensor DHT11");
publishStatus("sensor_error");
return;
}
// Crear payload JSON para temperatura
StaticJsonDocument<200> tempDoc;
tempDoc["sensor_id"] = "DHT11_001";
tempDoc["timestamp"] = millis();
tempDoc["value"] = temperature;
tempDoc["unit"] = "°C";
tempDoc["location"] = "Laboratory_A";
char tempBuffer[256];
serializeJson(tempDoc, tempBuffer);
// Crear payload JSON para humedad
StaticJsonDocument<200> humDoc;
humDoc["sensor_id"] = "DHT11_001";
humDoc["timestamp"] = millis();
humDoc["value"] = humidity;
humDoc["unit"] = "%";
humDoc["location"] = "Laboratory_A";
char humBuffer[256];
serializeJson(humDoc, humBuffer);
// Publicar datos
bool tempPublished = client.publish(temperature_topic, tempBuffer, true);
bool humPublished = client.publish(humidity_topic, humBuffer, true);
if (tempPublished && humPublished) {
Serial.printf("Datos publicados - Temperatura: %.1f°C, Humedad: %.1f%%\n",
temperature, humidity);
publishStatus("active");
} else {
Serial.println("Error: Fallo al publicar datos MQTT");
publishStatus("publish_error");
}
}
void publishStatus(const char* status) {
StaticJsonDocument<200> statusDoc;
statusDoc["device_id"] = mqtt_client_id;
statusDoc["status"] = status;
statusDoc["timestamp"] = millis();
statusDoc["wifi_rssi"] = WiFi.RSSI();
statusDoc["free_heap"] = ESP.getFreeHeap();
char statusBuffer[256];
serializeJson(statusDoc, statusBuffer);
client.publish(status_topic, statusBuffer, true);
}
3. Aplicación Flask como Subscriber MQTT
app.py - Servidor Flask con Cliente MQTT
from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO, emit
import paho.mqtt.client as mqtt
import json
import threading
import time
from datetime import datetime
import sqlite3
import os
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key_here'
socketio = SocketIO(app, cors_allowed_origins="*")
# Configuración MQTT
MQTT_BROKER = "localhost" # Cambiar por IP del broker si está remoto
MQTT_PORT = 1883
MQTT_KEEPALIVE = 60
# Topics MQTT
TEMPERATURE_TOPIC = "sensors/dht11/temperature"
HUMIDITY_TOPIC = "sensors/dht11/humidity"
STATUS_TOPIC = "sensors/dht11/status"
# Variables globales para almacenar datos
latest_data = {
'temperature': None,
'humidity': None,
'status': None,
'last_update': None
}
# Base de datos SQLite para almacenar histórico
def init_database():
conn = sqlite3.connect('sensor_data.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS sensor_readings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
sensor_type TEXT NOT NULL,
value REAL NOT NULL,
unit TEXT NOT NULL,
sensor_id TEXT,
location TEXT
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS device_status (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
device_id TEXT NOT NULL,
status TEXT NOT NULL,
wifi_rssi INTEGER,
free_heap INTEGER
)
''')
conn.commit()
conn.close()
def save_sensor_data(sensor_type, data):
try:
conn = sqlite3.connect('sensor_data.db')
cursor = conn.cursor()
cursor.execute('''
INSERT INTO sensor_readings (sensor_type, value, unit, sensor_id, location)
VALUES (?, ?, ?, ?, ?)
''', (sensor_type, data['value'], data['unit'],
data.get('sensor_id', 'unknown'), data.get('location', 'unknown')))
conn.commit()
conn.close()
print(f"Datos guardados en BD: {sensor_type} = {data['value']}{data['unit']}")
except Exception as e:
print(f"Error guardando en BD: {e}")
def save_device_status(status_data):
try:
conn = sqlite3.connect('sensor_data.db')
cursor = conn.cursor()
cursor.execute('''
INSERT INTO device_status (device_id, status, wifi_rssi, free_heap)
VALUES (?, ?, ?, ?)
''', (status_data.get('device_id', 'unknown'), status_data.get('status', 'unknown'),
status_data.get('wifi_rssi'), status_data.get('free_heap')))
conn.commit()
conn.close()
except Exception as e:
print(f"Error guardando status en BD: {e}")
# Callbacks MQTT
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Conectado al broker MQTT exitosamente")
client.subscribe(TEMPERATURE_TOPIC)
client.subscribe(HUMIDITY_TOPIC)
client.subscribe(STATUS_TOPIC)
print(f"Suscrito a topics: {TEMPERATURE_TOPIC}, {HUMIDITY_TOPIC}, {STATUS_TOPIC}")
else:
print(f"Error conectando a MQTT, código: {rc}")
def on_message(client, userdata, msg):
try:
topic = msg.topic
payload = json.loads(msg.payload.decode())
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"Mensaje recibido [{topic}]: {payload}")