Introducción
En esta lección aprenderemos a implementar la comunicación MQTT entre el ESP32 y un broker Mosquitto para transmitir datos del sensor DHT11. MQTT (Message Queuing Telemetry Transport) es un protocolo de comunicación ligero y eficiente, ideal para aplicaciones IoT debido a su bajo consumo de ancho de banda y capacidad de manejar conexiones intermitentes.
Al finalizar esta clase, serás capaz de configurar un cliente MQTT en el ESP32, publicar datos de sensores en tiempo real y procesar estos datos usando Flask con Python, creando así un sistema completo de monitoreo IoT.
Conceptos Fundamentales
Para implementar exitosamente la comunicación MQTT en nuestro sistema, es esencial comprender los siguientes conceptos:
Protocolo MQTT
- Broker: Servidor central que recibe y distribuye mensajes entre clientes (Mosquitto en nuestro caso)
- Cliente: Dispositivos que se conectan al broker para publicar o suscribirse a tópicos (ESP32, aplicación Flask)
- Tópicos: Canales de comunicación organizados jerárquicamente (ej: "sensor/temperatura", "sensor/humedad")
- QoS (Quality of Service): Niveles de garantía de entrega de mensajes (0, 1, 2)
- Retain: Mantener el último mensaje publicado para nuevos suscriptores
Arquitectura del Sistema
- ESP32: Cliente MQTT que publica datos del DHT11
- Broker Mosquitto: Intermediario que gestiona la comunicación
- Flask App: Cliente MQTT suscriptor que procesa y visualiza datos
- Red WiFi: Infraestructura de comunicación local
Ventajas de MQTT para IoT
- Protocolo ligero con overhead mínimo
- Soporte para conexiones intermitentes
- Escalabilidad para múltiples dispositivos
- Diferentes niveles de QoS según necesidades
- Patrón publish-subscribe desacoplado
Implementación Práctica
Implementaremos el sistema completo comenzando con la configuración del ESP32 como cliente MQTT, seguido de la aplicación Flask como suscriptor.
Conexiones Hardware DHT11 - ESP32:
- VCC del DHT11 → Pin 3.3V del ESP32
- GND del DHT11 → Pin GND del ESP32
- DATA del DHT11 → Pin GPIO 4 del ESP32
- Resistor pull-up 10kΩ entre VCC y DATA del DHT11
Código ESP32 - Cliente MQTT Publisher
#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>
// Configuración WiFi
const char* ssid = "Tu_Red_WiFi";
const char* password = "Tu_Password";
// Configuración MQTT Broker
const char* mqtt_server = "192.168.1.100"; // IP del broker Mosquitto
const int mqtt_port = 1883;
const char* mqtt_user = "iot_user"; // Usuario MQTT (opcional)
const char* mqtt_password = "iot_pass"; // Password MQTT (opcional)
// Configuración DHT11
#define DHT_PIN 4
#define DHT_TYPE DHT11
DHT dht(DHT_PIN, DHT_TYPE);
// Configuración tópicos MQTT
const char* topic_temperatura = "sensor/temperatura";
const char* topic_humedad = "sensor/humedad";
const char* topic_datos = "sensor/datos"; // Tópico combinado JSON
// Objetos para WiFi y MQTT
WiFiClient espClient;
PubSubClient client(espClient);
// Variables de control
unsigned long lastMsg = 0;
const unsigned long MSG_INTERVAL = 5000; // Publicar cada 5 segundos
void setup() {
Serial.begin(115200);
// Inicializar DHT11
dht.begin();
Serial.println("DHT11 inicializado");
// Conectar a WiFi
setup_wifi();
// Configurar cliente MQTT
client.setServer(mqtt_server, mqtt_port);
client.setCallback(callback);
Serial.println("Sistema inicializado correctamente");
}
void setup_wifi() {
delay(10);
Serial.println();
Serial.print("Conectando a WiFi: ");
Serial.println(ssid);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println();
Serial.println("WiFi conectado exitosamente");
Serial.print("Dirección IP: ");
Serial.println(WiFi.localIP());
}
void callback(char* topic, byte* payload, unsigned int length) {
// Función callback para mensajes MQTT recibidos
Serial.print("Mensaje recibido en tópico: ");
Serial.print(topic);
Serial.print(". Mensaje: ");
String message = "";
for (int i = 0; i < length; i++) {
message += (char)payload[i];
}
Serial.println(message);
}
void reconnect() {
// Bucle hasta reconectar MQTT
while (!client.connected()) {
Serial.print("Intentando conexión MQTT...");
// Crear un ID de cliente único
String clientId = "ESP32Client-";
clientId += String(random(0xffff), HEX);
// Intentar conectar
if (client.connect(clientId.c_str(), mqtt_user, mqtt_password)) {
Serial.println("MQTT conectado");
// Publicar mensaje de conexión
client.publish("sensor/estado", "ESP32 conectado");
// Suscribirse a tópicos de control (opcional)
client.subscribe("sensor/control");
} else {
Serial.print("Error de conexión MQTT, rc=");
Serial.print(client.state());
Serial.println(" reintentando en 5 segundos");
delay(5000);
}
}
}
void publishSensorData() {
// Leer datos del DHT11
float humidity = dht.readHumidity();
float temperature = dht.readTemperature();
// Verificar si las lecturas son válidas
if (isnan(humidity) || isnan(temperature)) {
Serial.println("Error al leer datos del DHT11");
return;
}
// Publicar datos individuales
char tempString[8];
char humString[8];
dtostrf(temperature, 6, 2, tempString);
dtostrf(humidity, 6, 2, humString);
client.publish(topic_temperatura, tempString, true); // retain = true
client.publish(topic_humedad, humString, true);
// Crear JSON con todos los datos
StaticJsonDocument<200> jsonDoc;
jsonDoc["temperatura"] = temperature;
jsonDoc["humedad"] = humidity;
jsonDoc["timestamp"] = millis();
jsonDoc["dispositivo"] = "ESP32_DHT11";
char jsonBuffer[256];
serializeJson(jsonDoc, jsonBuffer);
client.publish(topic_datos, jsonBuffer, true);
// Log en Serial
Serial.print("Datos publicados - Temperatura: ");
Serial.print(temperature);
Serial.print("°C, Humedad: ");
Serial.print(humidity);
Serial.println("%");
Serial.print("JSON: ");
Serial.println(jsonBuffer);
}
void loop() {
if (!client.connected()) {
reconnect();
}
client.loop();
unsigned long now = millis();
if (now - lastMsg > MSG_INTERVAL) {
lastMsg = now;
publishSensorData();
}
// Pequeña pausa para evitar watchdog reset
delay(100);
}
Aplicación Flask - Cliente MQTT Subscriber
from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO
import paho.mqtt.client as mqtt
import json
import threading
import time
from datetime import datetime
app = Flask(__name__)
app.config['SECRET_KEY'] = 'tu_clave_secreta_aqui'
socketio = SocketIO(app, cors_allowed_origins="*")
# Configuración MQTT
MQTT_BROKER = "192.168.1.100" # IP del broker Mosquitto
MQTT_PORT = 1883
MQTT_USER = "iot_user" # Usuario MQTT (opcional)
MQTT_PASSWORD = "iot_pass" # Password MQTT (opcional)
MQTT_KEEPALIVE = 60
# Tópicos MQTT
TOPICS = [
"sensor/temperatura",
"sensor/humedad",
"sensor/datos",
"sensor/estado"
]
# Variables globales para almacenar datos
sensor_data = {
'temperatura': None,
'humedad': None,
'timestamp': None,
'dispositivo': 'Desconectado'
}
# Historial de datos (últimas 50 lecturas)
data_history = []
MAX_HISTORY = 50
class MQTTClient:
def __init__(self):
self.client = mqtt.Client()
self.client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect
self.connected = False
def on_connect(self, client, userdata, flags, rc):
"""Callback de conexión MQTT"""
if rc == 0:
print(f"Conectado al broker MQTT con código: {rc}")
self.connected = True
# Suscribirse a todos los tópicos
for topic in TOPICS:
client.subscribe(topic)
print(f"Suscrito al tópico: {topic}")
else:
print(f"Error de conexión MQTT: {rc}")
self.connected = False
def on_message(self, client, userdata, msg):
"""Callback para mensajes MQTT recibidos"""
topic = msg.topic
payload = msg.payload.decode('utf-8')
print(f"Mensaje recibido - Tópico: {topic}, Payload: {payload}")
try:
if topic == "sensor/datos":
# Procesar JSON completo
data = json.loads(payload)
self.update_sensor_data(data)
elif topic == "sensor/temperatura":
sensor_data['temperatura'] = float(payload)
sensor_data['timestamp'] = datetime.now().isoformat()
elif topic == "sensor/humedad":
sensor_data['humedad'] = float(payload)
sensor_data['timestamp'] = datetime.now().isoformat()
elif topic == "sensor/estado":
sensor_data['dispositivo'] = payload
# Enviar datos actualizados a clientes web via WebSocket
socketio.emit('sensor_update', sensor_data)
except Exception as e:
print(f"Error procesando mensaje MQTT: {e}")
def on_disconnect(self, client, userdata, rc):
"""Callback de desconexión MQTT"""
print(f"Desconectado del broker MQTT: {rc}")
self.connected = False
def update_sensor_data(self, data):
"""Actualizar datos del sensor con timestamp"""
global sensor_data, data_history
sensor_data.update({
'temperatura': data.get('temperatura'),
'humedad': data.get('humedad'),
'timestamp': datetime.now().isoformat(),
'dispositivo': data.get('dispositivo', 'ESP32')
})
# Agregar al historial
history_entry = sensor_data.copy()
data_history.append(history_entry)
# Mantener solo las últimas MAX_HISTORY lecturas
if len(data_history) > MAX_HISTORY:
data_history.pop(0)
def connect(self):
"""Conectar al broker MQTT"""
try:
self.client.connect(MQTT_BROKER, MQTT_PORT, MQTT_KEEPALIVE)
self.client.loop_start()
return True
except Exception as e:
print(f"Error conectando a MQTT: {e}")
return False
def disconnect(self):
"""Desconectar del broker MQTT"""
self.client.loop_stop()
self.client.disconnect()
# Instancia global del cliente MQTT
mqtt_client = MQTTClient()
@app.route('/')
def index():
"""Página principal del dashboard"""
return render_template('dashboard.html')
@app.route('/api/sensor-data')
def get_sensor_data():
"""API endpoint para obtener datos actuales del sensor"""
return jsonify(sensor_data)
@app.route('/api/history')
def get_history():
"""API endpoint para obtener historial de datos"""
return jsonify(data_history)
@app.route('/api/status')
def get_status():
"""API endpoint para obtener estado de conexión"""
return jsonify({
'mqtt_connected': mqtt_client.connected,
'data_count':