Introducción
En esta clase aprenderemos a implementar la publicación de datos desde un ESP32 utilizando el protocolo MQTT. Configuraremos un cliente MQTT en el microcontrolador para enviar datos del sensor DHT11 de forma periódica a un broker Mosquitto, estableciendo la base de comunicación para nuestro sistema IoT completo.
Al finalizar esta lección, serás capaz de:
- Configurar un cliente MQTT en ESP32
- Establecer conexión con broker Mosquitto
- Publicar datos del sensor DHT11 en topics MQTT
- Implementar reconexión automática y manejo de errores
- Optimizar el consumo energético en el envío de datos
Conceptos Fundamentales
El protocolo MQTT (Message Queuing Telemetry Transport) es un protocolo de comunicación ligero diseñado específicamente para dispositivos IoT con recursos limitados. Su arquitectura publish/subscribe permite una comunicación eficiente entre dispositivos.
Componentes del Sistema MQTT:
- Publisher (ESP32): Dispositivo que envía datos a topics específicos
- Broker (Mosquitto): Servidor que gestiona la distribución de mensajes
- Subscriber (Flask App): Aplicación que recibe y procesa los datos
- Topics: Canales de comunicación organizados jerárquicamente
Características Clave de MQTT:
- QoS (Quality of Service): Niveles de garantía de entrega (0, 1, 2)
- Retain Flag: Mantiene el último mensaje publicado en un topic
- Last Will Testament: Mensaje automático cuando se pierde conexión
- Keep Alive: Mecanismo para mantener la conexión activa
Estructura de Topics Recomendada:
Para nuestro proyecto utilizaremos la estructura: iot/esp32/[device_id]/[sensor_type]
iot/esp32/device001/temperature
iot/esp32/device001/humidity
iot/esp32/device001/status
Implementación Práctica
Implementaremos un cliente MQTT completo en ESP32 que lee datos del sensor DHT11 y los publica periódicamente al broker Mosquitto.
Conexiones del Hardware:
DHT11 → ESP32:
- VCC → 3.3V
- GND → GND
- DATA → GPIO 4
Código Principal ESP32 - mqtt_publisher.ino
#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>
// Configuración WiFi
const char* ssid = "TU_RED_WIFI";
const char* password = "TU_PASSWORD";
// Configuración MQTT
const char* mqtt_server = "192.168.1.100"; // IP del broker Mosquitto
const int mqtt_port = 1883;
const char* mqtt_user = "esp32_user";
const char* mqtt_password = "esp32_pass";
const char* client_id = "ESP32_DHT11_001";
// Configuración DHT11
#define DHT_PIN 4
#define DHT_TYPE DHT11
DHT dht(DHT_PIN, DHT_TYPE);
// Topics MQTT
const char* temp_topic = "iot/esp32/device001/temperature";
const char* hum_topic = "iot/esp32/device001/humidity";
const char* status_topic = "iot/esp32/device001/status";
// Objetos principales
WiFiClient espClient;
PubSubClient client(espClient);
// Variables de control
unsigned long lastMsg = 0;
const long interval = 30000; // 30 segundos
int reconnectAttempts = 0;
const int maxReconnectAttempts = 5;
void setup() {
Serial.begin(115200);
Serial.println("Iniciando ESP32 MQTT Publisher...");
// Inicializar DHT11
dht.begin();
// Configurar WiFi
setup_wifi();
// Configurar MQTT
client.setServer(mqtt_server, mqtt_port);
client.setCallback(callback);
Serial.println("Sistema inicializado correctamente");
}
void setup_wifi() {
delay(10);
Serial.println();
Serial.print("Conectando a WiFi: ");
Serial.println(ssid);
WiFi.begin(ssid, password);
int attempts = 0;
while (WiFi.status() != WL_CONNECTED && attempts < 20) {
delay(500);
Serial.print(".");
attempts++;
}
if (WiFi.status() == WL_CONNECTED) {
Serial.println("");
Serial.println("WiFi conectado exitosamente!");
Serial.print("Dirección IP: ");
Serial.println(WiFi.localIP());
Serial.print("Intensidad señal (RSSI): ");
Serial.print(WiFi.RSSI());
Serial.println(" dBm");
} else {
Serial.println("Error: No se pudo conectar al WiFi");
ESP.restart(); // Reiniciar si no hay conexión
}
}
void callback(char* topic, byte* payload, unsigned int length) {
Serial.print("Mensaje recibido en topic: ");
Serial.print(topic);
Serial.print(". Mensaje: ");
String message;
for (int i = 0; i < length; i++) {
message += (char)payload[i];
}
Serial.println(message);
}
boolean reconnect() {
Serial.print("Intentando conexión MQTT...");
// Crear Last Will Testament
String willMessage = "{\"device_id\":\"" + String(client_id) +
"\",\"status\":\"offline\",\"timestamp\":" +
String(millis()) + "}";
if (client.connect(client_id, mqtt_user, mqtt_password,
status_topic, 1, true, willMessage.c_str())) {
Serial.println("Conectado al broker MQTT!");
// Publicar mensaje de estado online
StaticJsonDocument<200> statusDoc;
statusDoc["device_id"] = client_id;
statusDoc["status"] = "online";
statusDoc["ip_address"] = WiFi.localIP().toString();
statusDoc["rssi"] = WiFi.RSSI();
statusDoc["timestamp"] = millis();
String statusPayload;
serializeJson(statusDoc, statusPayload);
client.publish(status_topic, statusPayload.c_str(), true);
reconnectAttempts = 0;
return true;
} else {
Serial.print("Error de conexión MQTT, rc=");
Serial.print(client.state());
reconnectAttempts++;
return false;
}
}
void publishSensorData() {
// Leer datos del sensor DHT11
float humidity = dht.readHumidity();
float temperature = dht.readTemperature();
// Verificar si las lecturas son válidas
if (isnan(humidity) || isnan(temperature)) {
Serial.println("Error: No se pudieron leer datos del sensor DHT11");
// Publicar mensaje de error
StaticJsonDocument<150> errorDoc;
errorDoc["device_id"] = client_id;
errorDoc["error"] = "Sensor DHT11 no responde";
errorDoc["timestamp"] = millis();
String errorPayload;
serializeJson(errorDoc, errorPayload);
client.publish(status_topic, errorPayload.c_str());
return;
}
// Crear payload JSON para temperatura
StaticJsonDocument<200> tempDoc;
tempDoc["device_id"] = client_id;
tempDoc["sensor"] = "DHT11";
tempDoc["temperature"] = round(temperature * 10) / 10.0; // Una decimal
tempDoc["unit"] = "°C";
tempDoc["timestamp"] = millis();
String tempPayload;
serializeJson(tempDoc, tempPayload);
// Crear payload JSON para humedad
StaticJsonDocument<200> humDoc;
humDoc["device_id"] = client_id;
humDoc["sensor"] = "DHT11";
humDoc["humidity"] = round(humidity * 10) / 10.0; // Una decimal
humDoc["unit"] = "%";
humDoc["timestamp"] = millis();
String humPayload;
serializeJson(humDoc, humPayload);
// Publicar datos con QoS 1 (entrega garantizada)
boolean tempResult = client.publish(temp_topic, tempPayload.c_str(), false);
boolean humResult = client.publish(hum_topic, humPayload.c_str(), false);
if (tempResult && humResult) {
Serial.println("=== DATOS PUBLICADOS ===");
Serial.printf("Temperatura: %.1f°C\n", temperature);
Serial.printf("Humedad: %.1f%%\n", humidity);
Serial.println("Topics:");
Serial.println(" " + String(temp_topic));
Serial.println(" " + String(hum_topic));
Serial.println("========================");
} else {
Serial.println("Error: Fallo al publicar datos");
}
}
void loop() {
// Verificar conexión WiFi
if (WiFi.status() != WL_CONNECTED) {
Serial.println("WiFi desconectado, reintentando...");
setup_wifi();
}
// Mantener conexión MQTT
if (!client.connected()) {
if (reconnectAttempts < maxReconnectAttempts) {
if (reconnect()) {
Serial.println("Reconexión MQTT exitosa");
} else {
Serial.println("Fallo en reconexión MQTT, reintentando en 5 segundos...");
delay(5000);
}
} else {
Serial.println("Máximo de intentos alcanzado, reiniciando ESP32...");
ESP.restart();
}
}
// Mantener bucle MQTT activo
client.loop();
// Publicar datos cada intervalo definido
unsigned long now = millis();
if (now - lastMsg > interval) {
lastMsg = now;
publishSensorData();
}
// Pequeña pausa para no saturar el procesador
delay(100);
}
Script Python para Verificar Publicación - mqtt_subscriber_test.py
import paho.mqtt.client as mqtt
import json
from datetime import datetime
import time
# Configuración MQTT
MQTT_SERVER = "192.168.1.100"
MQTT_PORT = 1883
MQTT_USER = "esp32_user"
MQTT_PASSWORD = "esp32_pass"
# Topics a suscribirse
TOPICS = [
"iot/esp32/device001/temperature",
"iot/esp32/device001/humidity",
"iot/esp32/device001/status"
]
def on_connect(client, userdata, flags, rc):
"""Callback ejecutado al conectarse al broker"""
if rc == 0:
print("✅ Conectado exitosamente al broker MQTT")
print("🔔 Suscribiéndose a topics...")
for topic in TOPICS:
client.subscribe(topic, qos=1)
print(f" 📡 Suscrito a: {topic}")
print("\n⏳ Esperando mensajes del ESP32...")
print("-" * 60)
else:
print(f"❌ Error de conexión. Código: {rc}")
def on_message(client, userdata, msg):
"""Callback ejecutado al recibir un mensaje"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
topic = msg.topic
try:
# Decodificar payload JSON
payload = json.loads(msg.payload.decode())
print(f"📨 [{timestamp}] Topic: {topic}")
# Procesar según el tipo de mensaje
if "temperature" in topic:
temp = payload.get("temperature", "N/A")
device = payload.get("device_id", "Unknown")
print(f" 🌡️ Temperatura: {temp}°C (Dispositivo: {device})")
elif "humidity" in topic:
hum = payload.get("humidity", "N/A")
device = payload.get("device_id", "Unknown")
print(f" 💧 Humedad: {hum}% (Dispositivo: {device})")
elif "status" in topic:
status = payload.get("status", "Unknown")
device = payload.get("device_id", "Unknown")
ip = payload.get("ip_address", "N/A")
rssi = payload.get("rssi", "N/A")
print(f" 📊 Estado: {status} (Dispositivo: {device})")
if ip != "N/A":
print(f" 🌐 IP: {ip}, Señal: {rssi} dBm")
print("-" * 60)
except json.JSONDecodeError:
print(f"⚠️ [{timestamp}] Payload no es JSON válido: {msg.payload.decode()}")
def main():
client = mqtt.Client()
client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_SERVER, MQTT_PORT, 60)
try:
client.loop_forever()
except KeyboardInterrupt:
print("\n⏹️ Saliendo...")
client.disconnect()
if __name__ == "__main__":
main()