Configurar Mosquitto como broker central

Configuración avanzada del broker, optimización de rendimiento

Módulo 7 ⏱️ 2 horas 🛠️ ESP32 + DHT11 🌐 Flask + MQTT

Introducción

En esta clase aprenderemos a configurar Mosquitto como broker MQTT central para un sistema IoT completo que integra sensores DHT11 conectados a ESP32, transmisión de datos via MQTT y visualización web con Flask. Mosquitto actuará como el núcleo de comunicación que permite la interoperabilidad entre dispositivos IoT y aplicaciones web.

Al finalizar esta sesión, tendrás un broker MQTT completamente configurado y optimizado, capaz de manejar múltiples clientes ESP32 y servir datos en tiempo real a aplicaciones Flask con alta disponibilidad y rendimiento.

Prerrequisitos: Conocimientos básicos de Linux/Windows, protocolo MQTT, programación en C++ (Arduino IDE) y Python.

Conceptos Fundamentales

¿Qué es Mosquitto?

Eclipse Mosquitto es un broker MQTT de código abierto que implementa las versiones 5.0, 3.1.1 y 3.1 del protocolo MQTT. Es ligero, eficiente y adecuado para uso en dispositivos de baja potencia hasta servidores completos.

Características principales de Mosquitto:

  • Ligero y eficiente: Consume pocos recursos del sistema
  • Escalable: Soporta miles de conexiones simultáneas
  • Seguridad avanzada: SSL/TLS, autenticación por usuario/contraseña
  • Persistencia: Almacena mensajes y suscripciones
  • Bridge/Clustering: Conecta múltiples brokers
  • Logging detallado: Para monitoreo y debugging

Arquitectura del sistema IoT:

Flujo de datos:
DHT11 → ESP32 (Publisher) → Mosquitto Broker → Flask App (Subscriber) → Dashboard Web

Patrones de comunicación MQTT:

  • Publish/Subscribe: Desacoplamiento entre productores y consumidores
  • Topics jerárquicos: home/sensors/temperature, home/sensors/humidity
  • QoS (Quality of Service): 0 (Fire and forget), 1 (At least once), 2 (Exactly once)
  • Retain messages: Último mensaje disponible para nuevos suscriptores
  • Last Will Testament: Mensaje automático cuando cliente se desconecta

Implementación Práctica

1. Instalación y Configuración Básica de Mosquitto

Instalación en Ubuntu/Debian:


# Actualizar repositorios
sudo apt update

# Instalar Mosquitto broker y cliente
sudo apt install mosquitto mosquitto-clients

# Verificar instalación
mosquitto --help
mosquitto_pub --help
mosquitto_sub --help

# Iniciar servicio
sudo systemctl start mosquitto
sudo systemctl enable mosquitto

# Verificar estado
sudo systemctl status mosquitto
            

Configuración básica - /etc/mosquitto/mosquitto.conf:


# Puerto por defecto
port 1883

# Permitir conexiones anónimas (solo para desarrollo)
allow_anonymous true

# Archivo de log
log_dest file /var/log/mosquitto/mosquitto.log

# Nivel de log (error, warning, notice, information, debug)
log_type all

# Persistencia de datos
persistence true
persistence_location /var/lib/mosquitto/

# Archivo de persistencia
persistence_file mosquitto.db

# Intervalo de guardado automático (segundos)
autosave_interval 1800

# Mensaje de conexión por defecto
connection_messages true

# Estadísticas del broker
sys_interval 10
            

2. Configuración Avanzada para Producción

Configuración optimizada - /etc/mosquitto/conf.d/iot_production.conf:


# ==============================================
# CONFIGURACIÓN PARA SISTEMA IoT PROFESIONAL
# ==============================================

# Puertos de escucha
listener 1883 0.0.0.0
listener 8883 0.0.0.0

# Configuración SSL/TLS para puerto 8883
cafile /etc/mosquitto/ca_certificates/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
require_certificate false

# Autenticación
allow_anonymous false
password_file /etc/mosquitto/passwd

# Límites de conexión y rendimiento
max_connections 1000
max_inflight_messages 100
max_queued_messages 1000

# Timeouts optimizados para ESP32
keepalive_interval 60

# Límites de mensaje
message_size_limit 8192

# Configuración de memoria
memory_limit 67108864

# WebSockets para aplicaciones web
listener 9001
protocol websockets

# Logging detallado
log_dest file /var/log/mosquitto/mosquitto.log
log_dest stdout
log_type error
log_type warning  
log_type notice
log_type information
log_timestamp true
log_timestamp_format %Y-%m-%dT%H:%M:%S

# Bridge para conexión con otros brokers (opcional)
# connection bridge-01
# address mqtt.ejemplo.com:1883
# topic sensors/# out 0
# topic commands/# in 0
            

Crear usuarios para el sistema:


# Crear archivo de contraseñas
sudo mosquitto_passwd -c /etc/mosquitto/passwd esp32_user
sudo mosquitto_passwd /etc/mosquitto/passwd flask_app
sudo mosquitto_passwd /etc/mosquitto/passwd admin_user

# Verificar usuarios creados
sudo cat /etc/mosquitto/passwd

# Reiniciar Mosquitto para aplicar cambios
sudo systemctl restart mosquitto
            

3. Código ESP32 con DHT11

Código completo ESP32 - sensor_node.ino:


#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>

// Configuración WiFi
const char* ssid = "TU_RED_WIFI";
const char* password = "TU_PASSWORD_WIFI";

// Configuración MQTT
const char* mqtt_server = "192.168.1.100";  // IP del broker Mosquitto
const int mqtt_port = 1883;
const char* mqtt_user = "esp32_user";
const char* mqtt_password = "esp32_pass";
const char* client_id = "ESP32_DHT11_001";

// Configuración DHT11
#define DHT_PIN 4
#define DHT_TYPE DHT11
DHT dht(DHT_PIN, DHT_TYPE);

// Topics MQTT
const char* topic_temperature = "home/sensors/temperature";
const char* topic_humidity = "home/sensors/humidity";
const char* topic_status = "home/sensors/status";
const char* topic_commands = "home/commands/esp32_001";

// Variables globales
WiFiClient espClient;
PubSubClient client(espClient);
unsigned long lastMsg = 0;
const long interval = 30000;  // Enviar datos cada 30 segundos

// Variables para reconexión
unsigned long lastReconnectAttempt = 0;
const long reconnectInterval = 5000;

void setup() {
  Serial.begin(115200);
  dht.begin();
  
  // Configurar LED interno para indicar estado
  pinMode(LED_BUILTIN, OUTPUT);
  
  Serial.println("Iniciando sensor DHT11 con MQTT...");
  
  // Conectar a WiFi
  setup_wifi();
  
  // Configurar cliente MQTT
  client.setServer(mqtt_server, mqtt_port);
  client.setCallback(callback);
  
  // Conectar al broker MQTT
  reconnect();
}

void setup_wifi() {
  delay(10);
  Serial.println();
  Serial.print("Conectando a ");
  Serial.println(ssid);
  
  WiFi.mode(WIFI_STA);
  WiFi.begin(ssid, password);
  
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  
  randomSeed(micros());
  
  Serial.println("");
  Serial.println("WiFi conectado!");
  Serial.print("IP: ");
  Serial.println(WiFi.localIP());
  Serial.print("RSSI: ");
  Serial.println(WiFi.RSSI());
}

void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Mensaje recibido [");
  Serial.print(topic);
  Serial.print("]: ");
  
  String message;
  for (int i = 0; i < length; i++) {
    message += (char)payload[i];
  }
  Serial.println(message);
  
  // Procesar comandos
  if (String(topic) == topic_commands) {
    if (message == "reboot") {
      Serial.println("Reiniciando ESP32...");
      ESP.restart();
    } else if (message == "status") {
      publishStatus();
    }
  }
}

boolean reconnect() {
  if (client.connect(client_id, mqtt_user, mqtt_password, topic_status, 1, true, "offline")) {
    Serial.println("Conectado al broker MQTT");
    
    // Publicar estado online
    client.publish(topic_status, "online", true);
    
    // Suscribirse a comandos
    client.subscribe(topic_commands);
    
    // LED encendido = conexión exitosa
    digitalWrite(LED_BUILTIN, HIGH);
    
    return true;
  } else {
    Serial.print("Error de conexión MQTT, rc=");
    Serial.print(client.state());
    digitalWrite(LED_BUILTIN, LOW);
    return false;
  }
}

void publishSensorData() {
  float humidity = dht.readHumidity();
  float temperature = dht.readTemperature();
  
  // Verificar lecturas válidas
  if (isnan(humidity) || isnan(temperature)) {
    Serial.println("Error leyendo sensor DHT11!");
    return;
  }
  
  // Crear JSON con datos del sensor
  StaticJsonDocument<200> doc;
  doc["device_id"] = client_id;
  doc["timestamp"] = millis();
  doc["temperature"] = temperature;
  doc["humidity"] = humidity;
  doc["rssi"] = WiFi.RSSI();
  
  char jsonString[200];
  serializeJson(doc, jsonString);
  
  // Publicar datos individuales (para compatibilidad)
  client.publish(topic_temperature, String(temperature).c_str(), true);
  client.publish(topic_humidity, String(humidity).c_str(), true);
  
  // Publicar JSON completo
  client.publish("home/sensors/data", jsonString, true);
  
  Serial.println("Datos publicados:");
  Serial.println(jsonString);
}

void publishStatus() {
  StaticJsonDocument<300> doc;
  doc["device_id"] = client_id;
  doc["uptime"] = millis();
  doc["free_heap"] = ESP.getFreeHeap();
  doc["wifi_rssi"] = WiFi.RSSI();
  doc["ip_address"] = WiFi.localIP().toString();
  
  char statusJson[300];
  serializeJson(doc, statusJson);
  
  client.publish("home/sensors/device_status", statusJson, true);
}

void loop() {
  // Verificar conexión MQTT
  if (!client.connected()) {
    unsigned long now = millis();
    if (now - lastReconnectAttempt > reconnectInterval) {
      lastReconnectAttempt = now;
      if (reconnect()) {
        lastReconnectAttempt = 0;
      }
    }
  } else {
    client.loop();
  }
  
  // Publicar datos del sensor
  unsigned long now = millis();
  if (now - lastMsg > interval) {
    lastMsg = now;
    publishSensorData();
  }
  
  // Monitoreo de memoria
  if (ESP.getFreeHeap() < 10000) {
    Serial.println("¡Advertencia: Poca memoria libre!");
  }
}
            

4. Aplicación Flask para Dashboard

Aplicación Flask principal - app.py:


from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO, emit
import paho.mqtt.client as mqtt
import json
import threading
import time
from datetime import datetime
import sqlite3
import os

app = Flask(__name__)
app.config['SECRET_KEY'] = 'tu_clave_secreta_aqui'
socketio = SocketIO(app, cors_allowed_origins="*")

# Configuración MQTT
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_USER = "flask_app"
MQTT_PASSWORD = "flask_pass"
MQTT_KEEPALIVE = 60

# Variables globales para datos en tiempo real
sensor_data = {
    "temperature": 0.0,
    "humidity": 0.0,
    "last_update": None,
    "device_status": "offline"
}

# Base de datos SQLite
DATABASE = 'sensor_data.db'

def init_database