Concepto de broker y tópicos en MQTT

Arquitectura MQTT, conceptos de broker, publisher, subscriber, topics y payload

Módulo 3 ⏱️ 2 horas 🛠️ ESP32 + DHT11 🌐 Flask + MQTT

Introducción

MQTT (Message Queuing Telemetry Transport) es un protocolo de comunicación ligero y eficiente, ideal para aplicaciones IoT donde se requiere una comunicación confiable entre dispositivos con recursos limitados. En esta clase exploraremos los conceptos fundamentales de MQTT, específicamente el broker y el sistema de topics, implementando una solución completa con ESP32, DHT11, Mosquitto y Flask.

La arquitectura MQTT sigue el patrón publish/subscribe, donde los dispositivos publican mensajes en topics específicos y otros dispositivos pueden suscribirse a esos topics para recibir los datos. Esto permite una comunicación desacoplada y escalable, perfecta para redes de sensores IoT.

Conceptos Fundamentales

¿Qué es un Broker MQTT?

El broker MQTT es el servidor central que actúa como intermediario entre todos los clientes MQTT. Es responsable de:

  • Recibir mensajes: Acepta todas las publicaciones de los clientes
  • Filtrar mensajes: Determina qué clientes deben recibir cada mensaje
  • Distribuir mensajes: Envía los mensajes a los suscriptores apropiados
  • Gestionar conexiones: Mantiene el estado de conexión de todos los clientes
  • Manejar la autenticación: Verifica credenciales y permisos

Arquitectura MQTT

La arquitectura MQTT consta de los siguientes elementos:

  • Publisher (Publicador): Cliente que envía mensajes a topics específicos
  • Subscriber (Suscriptor): Cliente que se suscribe a topics para recibir mensajes
  • Broker: Servidor que gestiona la comunicación entre publishers y subscribers
  • Topic: Canal de comunicación identificado por una cadena jerárquica
  • Payload: Contenido del mensaje (datos sensor, comandos, etc.)

Sistema de Topics

Los topics en MQTT son cadenas UTF-8 que siguen una estructura jerárquica usando "/" como separador:

  • Estructura jerárquica: casa/salon/temperatura
  • Wildcards de suscripción:
    • + (nivel único): casa/+/temperatura
    • # (multinivel): casa/salon/#
  • Buenas prácticas:
    • Usar nombres descriptivos y consistentes
    • Evitar caracteres especiales
    • Mantener una estructura lógica y escalable

Niveles de Calidad de Servicio (QoS)

  • QoS 0: At most once (máximo una vez) - No garantiza entrega
  • QoS 1: At least once (al menos una vez) - Garantiza entrega, posibles duplicados
  • QoS 2: Exactly once (exactamente una vez) - Garantiza entrega única

Implementación Práctica

Configuración del Broker Mosquitto

Primero, configuremos el broker Mosquitto. Crea un archivo de configuración mosquitto.conf:

Configuración Mosquitto (mosquitto.conf):


# Puerto de escucha
port 1883

# Permitir conexiones anónimas (solo para desarrollo)
allow_anonymous true

# Archivo de log
log_dest file /var/log/mosquitto/mosquitto.log
log_type error
log_type warning
log_type notice
log_type information

# Persistencia de mensajes
persistence true
persistence_location /var/lib/mosquitto/

# Configuración de retain
retained_persistence true

Implementación en ESP32

Implementemos un cliente MQTT en ESP32 que publique datos del sensor DHT11. Conexiones del DHT11:

  • VCC → 3.3V del ESP32
  • GND → GND del ESP32
  • DATA → GPIO 4 del ESP32

Ejemplo: Publisher ESP32 con DHT11


#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>

// Configuración WiFi
const char* ssid = "TU_WIFI";
const char* password = "TU_PASSWORD";

// Configuración MQTT
const char* mqtt_server = "192.168.1.100";
const int mqtt_port = 1883;
const char* client_id = "ESP32_Sensor_01";

// Configuración DHT11
#define DHT_PIN 4
#define DHT_TYPE DHT11
DHT dht(DHT_PIN, DHT_TYPE);

// Topics MQTT
const char* temp_topic = "hogar/salon/temperatura";
const char* hum_topic = "hogar/salon/humedad";
const char* status_topic = "hogar/salon/estado";

WiFiClient espClient;
PubSubClient client(espClient);

unsigned long lastMsg = 0;
const unsigned long MSG_INTERVAL = 5000; // 5 segundos

void setup() {
    Serial.begin(115200);
    dht.begin();
    
    setup_wifi();
    client.setServer(mqtt_server, mqtt_port);
    client.setCallback(callback);
    
    Serial.println("Sistema iniciado correctamente");
}

void setup_wifi() {
    delay(10);
    Serial.println();
    Serial.print("Conectando a ");
    Serial.println(ssid);
    
    WiFi.begin(ssid, password);
    
    while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
    }
    
    Serial.println("");
    Serial.println("WiFi conectado");
    Serial.print("Dirección IP: ");
    Serial.println(WiFi.localIP());
}

void callback(char* topic, byte* payload, unsigned int length) {
    Serial.print("Mensaje recibido [");
    Serial.print(topic);
    Serial.print("]: ");
    
    String message;
    for (int i = 0; i < length; i++) {
        message += (char)payload[i];
    }
    Serial.println(message);
}

void reconnect() {
    while (!client.connected()) {
        Serial.print("Intentando conexión MQTT...");
        
        if (client.connect(client_id)) {
            Serial.println("conectado");
            client.publish(status_topic, "ESP32 conectado");
            // Suscripción a topic de comandos
            client.subscribe("hogar/salon/comandos");
        } else {
            Serial.print("falló, rc=");
            Serial.print(client.state());
            Serial.println(" reintentando en 5 segundos");
            delay(5000);
        }
    }
}

void publishSensorData() {
    // Leer datos del sensor
    float temperature = dht.readTemperature();
    float humidity = dht.readHumidity();
    
    // Verificar si las lecturas son válidas
    if (isnan(humidity) || isnan(temperature)) {
        Serial.println("Error al leer del sensor DHT!");
        client.publish(status_topic, "Error de sensor");
        return;
    }
    
    // Crear JSON con los datos
    StaticJsonDocument<200> doc;
    doc["temperature"] = temperature;
    doc["humidity"] = humidity;
    doc["timestamp"] = millis();
    doc["sensor_id"] = client_id;
    
    char json_string[200];
    serializeJson(doc, json_string);
    
    // Publicar datos individuales
    client.publish(temp_topic, String(temperature).c_str(), true);
    client.publish(hum_topic, String(humidity).c_str(), true);
    
    // Publicar JSON completo
    client.publish("hogar/salon/datos", json_string, true);
    
    // Log en serial
    Serial.printf("Temperatura: %.2f°C, Humedad: %.2f%%\n", temperature, humidity);
}

void loop() {
    if (!client.connected()) {
        reconnect();
    }
    client.loop();
    
    unsigned long now = millis();
    if (now - lastMsg > MSG_INTERVAL) {
        lastMsg = now;
        publishSensorData();
    }
    
    delay(100);
}

Aplicación Flask como Subscriber

Implementemos una aplicación Flask que se suscriba a los topics MQTT y muestre los datos en tiempo real:

Servidor Flask (app.py):


from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO, emit
import paho.mqtt.client as mqtt
import json
import threading
import time
from datetime import datetime

app = Flask(__name__)
app.config['SECRET_KEY'] = 'mqtt_secret_key'
socketio = SocketIO(app, cors_allowed_origins="*")

# Configuración MQTT
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_TOPICS = [
    ("hogar/salon/temperatura", 0),
    ("hogar/salon/humedad", 0),
    ("hogar/salon/datos", 0),
    ("hogar/salon/estado", 0)
]

# Almacenamiento de datos
sensor_data = {
    "temperature": 0,
    "humidity": 0,
    "timestamp": "",
    "status": "Desconectado"
}

class MQTTClient:
    def __init__(self):
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_disconnect = self.on_disconnect
        
    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            print("Conectado al broker MQTT")
            sensor_data["status"] = "Conectado al broker"
            # Suscribirse a todos los topics
            for topic, qos in MQTT_TOPICS:
                client.subscribe(topic, qos)
                print(f"Suscrito a: {topic}")
        else:
            print(f"Error de conexión MQTT: {rc}")
            sensor_data["status"] = f"Error conexión: {rc}"
    
    def on_message(self, client, userdata, msg):
        topic = msg.topic
        message = msg.payload.decode('utf-8')
        
        print(f"Mensaje recibido - Topic: {topic}, Mensaje: {message}")
        
        try:
            if topic == "hogar/salon/temperatura":
                sensor_data["temperature"] = float(message)
            elif topic == "hogar/salon/humedad":
                sensor_data["humidity"] = float(message)
            elif topic == "hogar/salon/datos":
                # Procesar JSON completo
                data = json.loads(message)
                sensor_data["temperature"] = data.get("temperature", 0)
                sensor_data["humidity"] = data.get("humidity", 0)
            elif topic == "hogar/salon/estado":
                sensor_data["status"] = message
            
            # Actualizar timestamp
            sensor_data["timestamp"] = datetime.now().strftime("%H:%M:%S")
            
            # Emitir datos a todos los clientes conectados via WebSocket
            socketio.emit('sensor_update', sensor_data)
            
        except Exception as e:
            print(f"Error procesando mensaje: {e}")
    
    def on_disconnect(self, client, userdata, rc):
        print("Desconectado del broker MQTT")
        sensor_data["status"] = "Desconectado"
    
    def connect_and_loop(self):
        try:
            self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
            self.client.loop_forever()
        except Exception as e:
            print(f"Error conectando a MQTT: {e}")
            sensor_data["status"] = f"Error: {e}"

# Crear instancia del cliente MQTT
mqtt_client = MQTTClient()

@app.route('/')
def dashboard():
    return render_template('dashboard.html', data=sensor_data)

@app.route('/api/data')
def get_data():
    return jsonify(sensor_data)

@app.route('/api/publish/<topic>/<message>')
def publish_message(topic, message):
    try:
        full_topic = f"hogar/salon/{topic}"
        mqtt_client.client.publish(full_topic, message)
        return jsonify({"status": "success", "topic": full_topic, "message": message})
    except Exception as e:
        return jsonify({"status": "error", "error": str(e)})

@socketio.on('connect')
def handle_connect():
    print('Cliente WebSocket conectado')
    emit('sensor_update', sensor_data)

@socketio.on('disconnect')
def handle_disconnect():
    print('Cliente WebSocket desconectado')

def start_mqtt_client():
    """Función para iniciar el cliente MQTT en un hilo separado"""
    mqtt_client.connect_and_loop()

if __name__ == '__main__':
    # Iniciar cliente MQTT en hilo separado
    mqtt_thread = threading.Thread(target=start_mqtt_client)
    mqtt_thread.daemon = True
    mqtt_thread.start()
    # Iniciar servidor Flask con SocketIO
    socketio.run(app, host='0.0.0.0', port=5000, debug=True)