Introducción
MQTT (Message Queuing Telemetry Transport) es un protocolo de comunicación ligero y eficiente, ideal para aplicaciones IoT donde se requiere una comunicación confiable entre dispositivos con recursos limitados. En esta clase exploraremos los conceptos fundamentales de MQTT, específicamente el broker y el sistema de topics, implementando una solución completa con ESP32, DHT11, Mosquitto y Flask.
La arquitectura MQTT sigue el patrón publish/subscribe, donde los dispositivos publican mensajes en topics específicos y otros dispositivos pueden suscribirse a esos topics para recibir los datos. Esto permite una comunicación desacoplada y escalable, perfecta para redes de sensores IoT.
Conceptos Fundamentales
¿Qué es un Broker MQTT?
El broker MQTT es el servidor central que actúa como intermediario entre todos los clientes MQTT. Es responsable de:
- Recibir mensajes: Acepta todas las publicaciones de los clientes
- Filtrar mensajes: Determina qué clientes deben recibir cada mensaje
- Distribuir mensajes: Envía los mensajes a los suscriptores apropiados
- Gestionar conexiones: Mantiene el estado de conexión de todos los clientes
- Manejar la autenticación: Verifica credenciales y permisos
Arquitectura MQTT
La arquitectura MQTT consta de los siguientes elementos:
- Publisher (Publicador): Cliente que envía mensajes a topics específicos
- Subscriber (Suscriptor): Cliente que se suscribe a topics para recibir mensajes
- Broker: Servidor que gestiona la comunicación entre publishers y subscribers
- Topic: Canal de comunicación identificado por una cadena jerárquica
- Payload: Contenido del mensaje (datos sensor, comandos, etc.)
Sistema de Topics
Los topics en MQTT son cadenas UTF-8 que siguen una estructura jerárquica usando "/" como separador:
- Estructura jerárquica:
casa/salon/temperatura
- Wildcards de suscripción:
+
(nivel único):casa/+/temperatura
#
(multinivel):casa/salon/#
- Buenas prácticas:
- Usar nombres descriptivos y consistentes
- Evitar caracteres especiales
- Mantener una estructura lógica y escalable
Niveles de Calidad de Servicio (QoS)
- QoS 0: At most once (máximo una vez) - No garantiza entrega
- QoS 1: At least once (al menos una vez) - Garantiza entrega, posibles duplicados
- QoS 2: Exactly once (exactamente una vez) - Garantiza entrega única
Implementación Práctica
Configuración del Broker Mosquitto
Primero, configuremos el broker Mosquitto. Crea un archivo de configuración mosquitto.conf
:
Configuración Mosquitto (mosquitto.conf):
# Puerto de escucha
port 1883
# Permitir conexiones anónimas (solo para desarrollo)
allow_anonymous true
# Archivo de log
log_dest file /var/log/mosquitto/mosquitto.log
log_type error
log_type warning
log_type notice
log_type information
# Persistencia de mensajes
persistence true
persistence_location /var/lib/mosquitto/
# Configuración de retain
retained_persistence true
Implementación en ESP32
Implementemos un cliente MQTT en ESP32 que publique datos del sensor DHT11. Conexiones del DHT11:
- VCC → 3.3V del ESP32
- GND → GND del ESP32
- DATA → GPIO 4 del ESP32
Ejemplo: Publisher ESP32 con DHT11
#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>
// Configuración WiFi
const char* ssid = "TU_WIFI";
const char* password = "TU_PASSWORD";
// Configuración MQTT
const char* mqtt_server = "192.168.1.100";
const int mqtt_port = 1883;
const char* client_id = "ESP32_Sensor_01";
// Configuración DHT11
#define DHT_PIN 4
#define DHT_TYPE DHT11
DHT dht(DHT_PIN, DHT_TYPE);
// Topics MQTT
const char* temp_topic = "hogar/salon/temperatura";
const char* hum_topic = "hogar/salon/humedad";
const char* status_topic = "hogar/salon/estado";
WiFiClient espClient;
PubSubClient client(espClient);
unsigned long lastMsg = 0;
const unsigned long MSG_INTERVAL = 5000; // 5 segundos
void setup() {
Serial.begin(115200);
dht.begin();
setup_wifi();
client.setServer(mqtt_server, mqtt_port);
client.setCallback(callback);
Serial.println("Sistema iniciado correctamente");
}
void setup_wifi() {
delay(10);
Serial.println();
Serial.print("Conectando a ");
Serial.println(ssid);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("");
Serial.println("WiFi conectado");
Serial.print("Dirección IP: ");
Serial.println(WiFi.localIP());
}
void callback(char* topic, byte* payload, unsigned int length) {
Serial.print("Mensaje recibido [");
Serial.print(topic);
Serial.print("]: ");
String message;
for (int i = 0; i < length; i++) {
message += (char)payload[i];
}
Serial.println(message);
}
void reconnect() {
while (!client.connected()) {
Serial.print("Intentando conexión MQTT...");
if (client.connect(client_id)) {
Serial.println("conectado");
client.publish(status_topic, "ESP32 conectado");
// Suscripción a topic de comandos
client.subscribe("hogar/salon/comandos");
} else {
Serial.print("falló, rc=");
Serial.print(client.state());
Serial.println(" reintentando en 5 segundos");
delay(5000);
}
}
}
void publishSensorData() {
// Leer datos del sensor
float temperature = dht.readTemperature();
float humidity = dht.readHumidity();
// Verificar si las lecturas son válidas
if (isnan(humidity) || isnan(temperature)) {
Serial.println("Error al leer del sensor DHT!");
client.publish(status_topic, "Error de sensor");
return;
}
// Crear JSON con los datos
StaticJsonDocument<200> doc;
doc["temperature"] = temperature;
doc["humidity"] = humidity;
doc["timestamp"] = millis();
doc["sensor_id"] = client_id;
char json_string[200];
serializeJson(doc, json_string);
// Publicar datos individuales
client.publish(temp_topic, String(temperature).c_str(), true);
client.publish(hum_topic, String(humidity).c_str(), true);
// Publicar JSON completo
client.publish("hogar/salon/datos", json_string, true);
// Log en serial
Serial.printf("Temperatura: %.2f°C, Humedad: %.2f%%\n", temperature, humidity);
}
void loop() {
if (!client.connected()) {
reconnect();
}
client.loop();
unsigned long now = millis();
if (now - lastMsg > MSG_INTERVAL) {
lastMsg = now;
publishSensorData();
}
delay(100);
}
Aplicación Flask como Subscriber
Implementemos una aplicación Flask que se suscriba a los topics MQTT y muestre los datos en tiempo real:
Servidor Flask (app.py):
from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO, emit
import paho.mqtt.client as mqtt
import json
import threading
import time
from datetime import datetime
app = Flask(__name__)
app.config['SECRET_KEY'] = 'mqtt_secret_key'
socketio = SocketIO(app, cors_allowed_origins="*")
# Configuración MQTT
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_TOPICS = [
("hogar/salon/temperatura", 0),
("hogar/salon/humedad", 0),
("hogar/salon/datos", 0),
("hogar/salon/estado", 0)
]
# Almacenamiento de datos
sensor_data = {
"temperature": 0,
"humidity": 0,
"timestamp": "",
"status": "Desconectado"
}
class MQTTClient:
def __init__(self):
self.client = mqtt.Client()
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
print("Conectado al broker MQTT")
sensor_data["status"] = "Conectado al broker"
# Suscribirse a todos los topics
for topic, qos in MQTT_TOPICS:
client.subscribe(topic, qos)
print(f"Suscrito a: {topic}")
else:
print(f"Error de conexión MQTT: {rc}")
sensor_data["status"] = f"Error conexión: {rc}"
def on_message(self, client, userdata, msg):
topic = msg.topic
message = msg.payload.decode('utf-8')
print(f"Mensaje recibido - Topic: {topic}, Mensaje: {message}")
try:
if topic == "hogar/salon/temperatura":
sensor_data["temperature"] = float(message)
elif topic == "hogar/salon/humedad":
sensor_data["humidity"] = float(message)
elif topic == "hogar/salon/datos":
# Procesar JSON completo
data = json.loads(message)
sensor_data["temperature"] = data.get("temperature", 0)
sensor_data["humidity"] = data.get("humidity", 0)
elif topic == "hogar/salon/estado":
sensor_data["status"] = message
# Actualizar timestamp
sensor_data["timestamp"] = datetime.now().strftime("%H:%M:%S")
# Emitir datos a todos los clientes conectados via WebSocket
socketio.emit('sensor_update', sensor_data)
except Exception as e:
print(f"Error procesando mensaje: {e}")
def on_disconnect(self, client, userdata, rc):
print("Desconectado del broker MQTT")
sensor_data["status"] = "Desconectado"
def connect_and_loop(self):
try:
self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
self.client.loop_forever()
except Exception as e:
print(f"Error conectando a MQTT: {e}")
sensor_data["status"] = f"Error: {e}"
# Crear instancia del cliente MQTT
mqtt_client = MQTTClient()
@app.route('/')
def dashboard():
return render_template('dashboard.html', data=sensor_data)
@app.route('/api/data')
def get_data():
return jsonify(sensor_data)
@app.route('/api/publish/<topic>/<message>')
def publish_message(topic, message):
try:
full_topic = f"hogar/salon/{topic}"
mqtt_client.client.publish(full_topic, message)
return jsonify({"status": "success", "topic": full_topic, "message": message})
except Exception as e:
return jsonify({"status": "error", "error": str(e)})
@socketio.on('connect')
def handle_connect():
print('Cliente WebSocket conectado')
emit('sensor_update', sensor_data)
@socketio.on('disconnect')
def handle_disconnect():
print('Cliente WebSocket desconectado')
def start_mqtt_client():
"""Función para iniciar el cliente MQTT en un hilo separado"""
mqtt_client.connect_and_loop()
if __name__ == '__main__':
# Iniciar cliente MQTT en hilo separado
mqtt_thread = threading.Thread(target=start_mqtt_client)
mqtt_thread.daemon = True
mqtt_thread.start()
# Iniciar servidor Flask con SocketIO
socketio.run(app, host='0.0.0.0', port=5000, debug=True)