Configurar ESP32 + DHT11 para publicar datos a Mosquitto

Implementación completa del hardware, configuración MQTT cliente

Módulo 7 ⏱️ 3 horas 🛠️ ESP32 + DHT11 🌐 Flask + MQTT

Introducción

En esta lección aprenderemos a configurar un sistema IoT completo utilizando ESP32 con sensor DHT11 para publicar datos de temperatura y humedad a un broker Mosquitto MQTT, y crear una aplicación Flask para visualizar estos datos en tiempo real. Este stack tecnológico es fundamental en aplicaciones industriales y sistemas de monitoreo distribuido.

Al finalizar esta clase, serás capaz de implementar un sistema completo de telemetría que puede escalarse a múltiples sensores y aplicaciones comerciales.

Conceptos Fundamentales

Arquitectura del Sistema

Nuestro sistema sigue una arquitectura distribuida basada en el patrón Publisher/Subscriber utilizando MQTT como protocolo de comunicación:

  • ESP32: Microcontrolador con WiFi integrado que actúa como cliente MQTT publisher
  • DHT11: Sensor digital de temperatura y humedad con protocolo de comunicación propietario
  • Mosquitto: Broker MQTT ligero que gestiona las comunicaciones entre clientes
  • Flask: Framework web que actúa como cliente MQTT subscriber y servidor HTTP

Protocolo MQTT

MQTT (Message Queuing Telemetry Transport) es un protocolo de mensajería ligero diseñado para dispositivos IoT:

  • QoS Levels: 0 (At most once), 1 (At least once), 2 (Exactly once)
  • Topics: Jerarquía de canales para organizar mensajes
  • Retain Messages: Mensajes que el broker mantiene para nuevos suscriptores
  • Keep Alive: Mecanismo para detectar clientes desconectados

Conexiones Hardware DHT11-ESP32

Esquema de conexión física:

  • VCC (DHT11): → 3.3V (ESP32)
  • DATA (DHT11): → GPIO 4 (ESP32) + Resistor pull-up 10kΩ a 3.3V
  • GND (DHT11): → GND (ESP32)

Implementación Práctica

1. Configuración del Broker Mosquitto

Primero configuramos el broker MQTT en nuestro sistema. Crear archivo de configuración:

mosquitto.conf


# Configuración básica de Mosquitto
port 1883
allow_anonymous true
listener 1883
protocol mqtt

# Configuración de logging
log_dest file /var/log/mosquitto/mosquitto.log
log_type error
log_type warning
log_type notice
log_type information

# Configuración de persistencia
persistence true
persistence_location /var/lib/mosquitto/

# Configuración de retención
retained_persistence true

2. Código ESP32 para Lectura DHT11 y Publicación MQTT

main.cpp - Cliente ESP32 MQTT Publisher


#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>

// Configuración WiFi
const char* ssid = "TU_RED_WIFI";
const char* password = "TU_PASSWORD_WIFI";

// Configuración MQTT
const char* mqtt_server = "192.168.1.100"; // IP del broker Mosquitto
const int mqtt_port = 1883;
const char* mqtt_client_id = "ESP32_DHT11_Client";
const char* temperature_topic = "sensors/dht11/temperature";
const char* humidity_topic = "sensors/dht11/humidity";
const char* status_topic = "sensors/dht11/status";

// Configuración DHT11
#define DHT_PIN 4
#define DHT_TYPE DHT11
DHT dht(DHT_PIN, DHT_TYPE);

// Clientes WiFi y MQTT
WiFiClient espClient;
PubSubClient client(espClient);

// Variables de control
unsigned long lastMsg = 0;
const long interval = 30000; // Publicar cada 30 segundos
int reconnectAttempts = 0;
const int maxReconnectAttempts = 5;

void setup() {
    Serial.begin(115200);
    Serial.println("Iniciando ESP32 DHT11 MQTT Publisher...");
    
    // Inicializar DHT11
    dht.begin();
    
    // Conectar a WiFi
    setup_wifi();
    
    // Configurar cliente MQTT
    client.setServer(mqtt_server, mqtt_port);
    client.setCallback(callback);
    
    // Mensaje de inicio
    Serial.println("Sistema iniciado correctamente");
}

void loop() {
    if (!client.connected()) {
        reconnect();
    }
    client.loop();
    
    unsigned long now = millis();
    if (now - lastMsg > interval) {
        lastMsg = now;
        readAndPublishSensorData();
    }
    
    delay(100); // Pequeño delay para estabilidad
}

void setup_wifi() {
    delay(10);
    Serial.println();
    Serial.print("Conectando a WiFi: ");
    Serial.println(ssid);
    
    WiFi.begin(ssid, password);
    
    int attempts = 0;
    while (WiFi.status() != WL_CONNECTED && attempts < 20) {
        delay(1000);
        Serial.print(".");
        attempts++;
    }
    
    if (WiFi.status() == WL_CONNECTED) {
        Serial.println("");
        Serial.println("WiFi conectado exitosamente");
        Serial.print("Dirección IP: ");
        Serial.println(WiFi.localIP());
        Serial.print("Intensidad de señal (RSSI): ");
        Serial.print(WiFi.RSSI());
        Serial.println(" dBm");
    } else {
        Serial.println("Error: No se pudo conectar a WiFi");
        ESP.restart();
    }
}

void callback(char* topic, byte* payload, unsigned int length) {
    Serial.print("Mensaje recibido [");
    Serial.print(topic);
    Serial.print("]: ");
    
    String message;
    for (int i = 0; i < length; i++) {
        message += (char)payload[i];
    }
    Serial.println(message);
}

void reconnect() {
    while (!client.connected() && reconnectAttempts < maxReconnectAttempts) {
        Serial.print("Intentando conexión MQTT...");
        
        if (client.connect(mqtt_client_id)) {
            Serial.println(" conectado");
            reconnectAttempts = 0;
            
            // Publicar mensaje de estado
            publishStatus("online");
            
            // Suscribirse a topics de control si es necesario
            client.subscribe("sensors/dht11/control");
            
        } else {
            Serial.print(" falló, rc=");
            Serial.print(client.state());
            Serial.println(" reintentando en 5 segundos");
            reconnectAttempts++;
            delay(5000);
        }
    }
    
    if (reconnectAttempts >= maxReconnectAttempts) {
        Serial.println("Máximo número de reintentos MQTT alcanzado. Reiniciando ESP32...");
        ESP.restart();
    }
}

void readAndPublishSensorData() {
    // Leer datos del sensor DHT11
    float humidity = dht.readHumidity();
    float temperature = dht.readTemperature();
    
    // Verificar si las lecturas son válidas
    if (isnan(humidity) || isnan(temperature)) {
        Serial.println("Error: Fallo en la lectura del sensor DHT11");
        publishStatus("sensor_error");
        return;
    }
    
    // Crear payload JSON para temperatura
    StaticJsonDocument<200> tempDoc;
    tempDoc["sensor_id"] = "DHT11_001";
    tempDoc["timestamp"] = millis();
    tempDoc["value"] = temperature;
    tempDoc["unit"] = "°C";
    tempDoc["location"] = "Laboratory_A";
    
    char tempBuffer[256];
    serializeJson(tempDoc, tempBuffer);
    
    // Crear payload JSON para humedad
    StaticJsonDocument<200> humDoc;
    humDoc["sensor_id"] = "DHT11_001";
    humDoc["timestamp"] = millis();
    humDoc["value"] = humidity;
    humDoc["unit"] = "%";
    humDoc["location"] = "Laboratory_A";
    
    char humBuffer[256];
    serializeJson(humDoc, humBuffer);
    
    // Publicar datos
    bool tempPublished = client.publish(temperature_topic, tempBuffer, true);
    bool humPublished = client.publish(humidity_topic, humBuffer, true);
    
    if (tempPublished && humPublished) {
        Serial.printf("Datos publicados - Temperatura: %.1f°C, Humedad: %.1f%%\n", 
                     temperature, humidity);
        publishStatus("active");
    } else {
        Serial.println("Error: Fallo al publicar datos MQTT");
        publishStatus("publish_error");
    }
}

void publishStatus(const char* status) {
    StaticJsonDocument<200> statusDoc;
    statusDoc["device_id"] = mqtt_client_id;
    statusDoc["status"] = status;
    statusDoc["timestamp"] = millis();
    statusDoc["wifi_rssi"] = WiFi.RSSI();
    statusDoc["free_heap"] = ESP.getFreeHeap();
    
    char statusBuffer[256];
    serializeJson(statusDoc, statusBuffer);
    
    client.publish(status_topic, statusBuffer, true);
}

3. Aplicación Flask como Subscriber MQTT

app.py - Servidor Flask con Cliente MQTT


from flask import Flask, render_template, jsonify
from flask_socketio import SocketIO, emit
import paho.mqtt.client as mqtt
import json
import threading
import time
from datetime import datetime
import sqlite3
import os

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key_here'
socketio = SocketIO(app, cors_allowed_origins="*")

# Configuración MQTT
MQTT_BROKER = "localhost"  # Cambiar por IP del broker si está remoto
MQTT_PORT = 1883
MQTT_KEEPALIVE = 60

# Topics MQTT
TEMPERATURE_TOPIC = "sensors/dht11/temperature"
HUMIDITY_TOPIC = "sensors/dht11/humidity"
STATUS_TOPIC = "sensors/dht11/status"

# Variables globales para almacenar datos
latest_data = {
    'temperature': None,
    'humidity': None,
    'status': None,
    'last_update': None
}

# Base de datos SQLite para almacenar histórico
def init_database():
    conn = sqlite3.connect('sensor_data.db')
    cursor = conn.cursor()
    
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS sensor_readings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
            sensor_type TEXT NOT NULL,
            value REAL NOT NULL,
            unit TEXT NOT NULL,
            sensor_id TEXT,
            location TEXT
        )
    ''')
    
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS device_status (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
            device_id TEXT NOT NULL,
            status TEXT NOT NULL,
            wifi_rssi INTEGER,
            free_heap INTEGER
        )
    ''')
    
    conn.commit()
    conn.close()

def save_sensor_data(sensor_type, data):
    try:
        conn = sqlite3.connect('sensor_data.db')
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT INTO sensor_readings (sensor_type, value, unit, sensor_id, location)
            VALUES (?, ?, ?, ?, ?)
        ''', (sensor_type, data['value'], data['unit'], 
              data.get('sensor_id', 'unknown'), data.get('location', 'unknown')))
        
        conn.commit()
        conn.close()
        print(f"Datos guardados en BD: {sensor_type} = {data['value']}{data['unit']}")
    except Exception as e:
        print(f"Error guardando en BD: {e}")

def save_device_status(status_data):
    try:
        conn = sqlite3.connect('sensor_data.db')
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT INTO device_status (device_id, status, wifi_rssi, free_heap)
            VALUES (?, ?, ?, ?)
        ''', (status_data.get('device_id', 'unknown'), status_data.get('status', 'unknown'),
              status_data.get('wifi_rssi'), status_data.get('free_heap')))
        
        conn.commit()
        conn.close()
    except Exception as e:
        print(f"Error guardando status en BD: {e}")

# Callbacks MQTT
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Conectado al broker MQTT exitosamente")
        client.subscribe(TEMPERATURE_TOPIC)
        client.subscribe(HUMIDITY_TOPIC)
        client.subscribe(STATUS_TOPIC)
        print(f"Suscrito a topics: {TEMPERATURE_TOPIC}, {HUMIDITY_TOPIC}, {STATUS_TOPIC}")
    else:
        print(f"Error conectando a MQTT, código: {rc}")

def on_message(client, userdata, msg):
    try:
        topic = msg.topic
        payload = json.loads(msg.payload.decode())
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        
        print(f"Mensaje recibido [{topic}]: {payload}")