Estrategias para escalar: múltiples dispositivos y múltiples sensores

Arquitecturas escalables, load balancing, gestión de múltiples dispositivos

Módulo 6 ⏱️ 2 horas 🛠️ ESP32 + DHT11 🌐 Flask + MQTT

Introducción

En el desarrollo de sistemas IoT profesionales, la capacidad de escalar desde un prototipo con un solo dispositivo y sensor hacia una arquitectura que maneje cientos o miles de dispositivos es crucial. Esta clase aborda las estrategias y técnicas necesarias para implementar sistemas IoT escalables utilizando ESP32, múltiples sensores DHT11, Mosquitto MQTT y Flask.

Aprenderemos a diseñar arquitecturas que puedan crecer orgánicamente, implementar patrones de load balancing, gestionar la identidad única de dispositivos y manejar eficientemente grandes volúmenes de datos de sensores en tiempo real.

Objetivo de aprendizaje: Al finalizar esta clase, serás capaz de diseñar e implementar sistemas IoT escalables que manejen múltiples dispositivos ESP32 con sensores DHT11, utilizando patrones de arquitectura profesionales.

Conceptos Fundamentales

Principios de Escalabilidad en IoT

La escalabilidad en sistemas IoT implica varios desafíos únicos que no encontramos en aplicaciones web tradicionales:

  • Escalabilidad Horizontal: Agregar más dispositivos sin degradar el rendimiento
  • Gestión de Identidad: Identificación única y gestión de estado de cada dispositivo
  • Tolerancia a Fallos: El sistema debe continuar funcionando aunque algunos dispositivos fallen
  • Distribución Geográfica: Dispositivos pueden estar distribuidos en diferentes ubicaciones
  • Heterogeneidad: Diferentes tipos de sensores y capacidades de dispositivos

Patrones de Arquitectura para Escalabilidad

1. Patrón de Identificación Única

Cada dispositivo debe tener un identificador único que permita:

  • Rastreo individual de estado y datos
  • Configuración específica por dispositivo
  • Mantenimiento y debugging selectivo

2. Patrón de Temas MQTT Jerárquicos

Estructura de temas que escale eficientemente:

  • edificio/piso/habitacion/dispositivo/sensor/tipo
  • Permite suscripciones granulares y wildcards
  • Facilita el enrutamiento y filtrado de mensajes

3. Patrón de Load Balancing

  • Broker Clustering: Múltiples instancias de Mosquitto
  • Sharding por Ubicación: Distribución geográfica de brokers
  • Client Load Balancing: Dispositivos se conectan al broker menos cargado

4. Patrón de Gestión de Estado Distribuido

Para manejar el estado de múltiples dispositivos:

  • Base de datos time-series para datos de sensores
  • Cache distribuido para estado actual
  • Event sourcing para historial de comandos

Implementación Práctica

1. ESP32 con Identificación Única y Múltiples Sensores

Implementamos un sistema donde cada ESP32 puede manejar múltiples sensores DHT11 y se identifica de manera única:

Código ESP32 - Múltiples Sensores Escalable


#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>
#include <Preferences.h>

// Configuración de red
const char* ssid = "MI_RED_WIFI";
const char* password = "MI_PASSWORD";
const char* mqtt_server = "192.168.1.100";
const int mqtt_port = 1883;

// Configuración de sensores DHT11
#define MAX_SENSORS 4
struct SensorConfig {
  int pin;
  String location;
  DHT* sensor;
};

SensorConfig sensors[MAX_SENSORS] = {
  {2, "living_room", nullptr},
  {4, "bedroom", nullptr},
  {5, "kitchen", nullptr},
  {18, "bathroom", nullptr}
};

WiFiClient espClient;
PubSubClient client(espClient);
Preferences preferences;

// Variables de identificación única
String deviceId;
String buildingId;
String floorId;

// Variables de control
unsigned long lastSensorRead = 0;
const unsigned long sensorInterval = 30000; // 30 segundos
unsigned long lastHeartbeat = 0;
const unsigned long heartbeatInterval = 60000; // 1 minuto

// Configuración de retry y reconexión
int reconnectAttempts = 0;
const int maxReconnectAttempts = 5;
unsigned long lastReconnectAttempt = 0;
const unsigned long reconnectInterval = 5000;

void setup() {
  Serial.begin(115200);
  
  // Inicializar preferencias para almacenamiento persistente
  preferences.begin("iot_device", false);
  
  // Generar o recuperar ID único del dispositivo
  setupDeviceIdentity();
  
  // Inicializar sensores DHT11
  setupSensors();
  
  // Conectar WiFi
  setupWiFi();
  
  // Configurar MQTT
  client.setServer(mqtt_server, mqtt_port);
  client.setCallback(onMqttMessage);
  
  // Conectar MQTT
  connectMQTT();
  
  Serial.println("Dispositivo inicializado correctamente");
  Serial.println("Device ID: " + deviceId);
}

void setupDeviceIdentity() {
  // Generar ID único basado en MAC address si no existe
  deviceId = preferences.getString("deviceId", "");
  
  if (deviceId.length() == 0) {
    uint8_t mac[6];
    WiFi.macAddress(mac);
    deviceId = "ESP32_" + String(mac[3], HEX) + String(mac[4], HEX) + String(mac[5], HEX);
    preferences.putString("deviceId", deviceId);
  }
  
  // Configuración de ubicación (puede venir de configuración web)
  buildingId = preferences.getString("buildingId", "building_01");
  floorId = preferences.getString("floorId", "floor_02");
}

void setupSensors() {
  Serial.println("Inicializando sensores DHT11...");
  
  for (int i = 0; i < MAX_SENSORS; i++) {
    sensors[i].sensor = new DHT(sensors[i].pin, DHT11);
    sensors[i].sensor->begin();
    
    Serial.println("Sensor " + String(i) + " en pin " + 
                   String(sensors[i].pin) + " - Ubicación: " + sensors[i].location);
    delay(100);
  }
}

void setupWiFi() {
  delay(10);
  Serial.println("Conectando a WiFi...");
  WiFi.begin(ssid, password);
  
  int attempts = 0;
  while (WiFi.status() != WL_CONNECTED && attempts < 20) {
    delay(500);
    Serial.print(".");
    attempts++;
  }
  
  if (WiFi.status() == WL_CONNECTED) {
    Serial.println("");
    Serial.println("WiFi conectado");
    Serial.println("IP address: " + WiFi.localIP().toString());
  } else {
    Serial.println("Error: No se pudo conectar a WiFi");
    ESP.restart();
  }
}

void connectMQTT() {
  while (!client.connected() && reconnectAttempts < maxReconnectAttempts) {
    Serial.print("Intentando conexión MQTT...");
    
    // Crear cliente ID único
    String clientId = deviceId + "_" + String(random(0xffff), HEX);
    
    // Last Will Testament para detectar desconexiones
    String lwt_topic = buildingId + "/" + floorId + "/" + deviceId + "/status";
    
    if (client.connect(clientId.c_str(), lwt_topic.c_str(), 1, true, "offline")) {
      Serial.println(" conectado");
      
      // Suscribirse a comandos para este dispositivo
      String command_topic = buildingId + "/" + floorId + "/" + deviceId + "/command/+";
      client.subscribe(command_topic.c_str());
      
      // Publicar estado online
      client.publish(lwt_topic.c_str(), "online", true);
      
      // Publicar configuración del dispositivo
      publishDeviceConfig();
      
      reconnectAttempts = 0;
    } else {
      Serial.print(" falló, rc=");
      Serial.print(client.state());
      Serial.println(" reintentando en 5 segundos");
      
      reconnectAttempts++;
      delay(5000);
    }
  }
  
  if (reconnectAttempts >= maxReconnectAttempts) {
    Serial.println("Máximo número de intentos MQTT alcanzado. Reiniciando...");
    ESP.restart();
  }
}

void publishDeviceConfig() {
  StaticJsonDocument<512> doc;
  doc["deviceId"] = deviceId;
  doc["buildingId"] = buildingId;
  doc["floorId"] = floorId;
  doc["firmware_version"] = "1.0.0";
  doc["ip_address"] = WiFi.localIP().toString();
  doc["mac_address"] = WiFi.macAddress();
  
  JsonArray sensor_array = doc.createNestedArray("sensors");
  for (int i = 0; i < MAX_SENSORS; i++) {
    JsonObject sensor = sensor_array.createNestedObject();
    sensor["id"] = i;
    sensor["type"] = "DHT11";
    sensor["location"] = sensors[i].location;
    sensor["pin"] = sensors[i].pin;
  }
  
  String config_json;
  serializeJson(doc, config_json);
  
  String topic = buildingId + "/" + floorId + "/" + deviceId + "/config";
  client.publish(topic.c_str(), config_json.c_str(), true);
}

void readAndPublishSensors() {
  for (int i = 0; i < MAX_SENSORS; i++) {
    float humidity = sensors[i].sensor->readHumidity();
    float temperature = sensors[i].sensor->readTemperature();
    
    if (!isnan(humidity) && !isnan(temperature)) {
      // Crear JSON con datos del sensor
      StaticJsonDocument<200> doc;
      doc["deviceId"] = deviceId;
      doc["sensorId"] = i;
      doc["location"] = sensors[i].location;
      doc["temperature"] = round(temperature * 100.0) / 100.0;
      doc["humidity"] = round(humidity * 100.0) / 100.0;
      doc["timestamp"] = millis();
      doc["rssi"] = WiFi.RSSI();
      
      String sensor_json;
      serializeJson(doc, sensor_json);
      
      // Publicar en tópico jerárquico
      String topic = buildingId + "/" + floorId + "/" + deviceId + 
                    "/sensor/" + sensors[i].location + "/data";
      
      if (client.publish(topic.c_str(), sensor_json.c_str())) {
        Serial.println("Sensor " + String(i) + " (" + sensors[i].location + 
                      "): T=" + String(temperature) + "°C, H=" + String(humidity) + "%");
      } else {
        Serial.println("Error publicando datos del sensor " + String(i));
      }
    } else {
      Serial.println("Error leyendo sensor " + String(i) + " en " + sensors[i].location);
    }
    
    delay(100); // Pequeña pausa entre lecturas
  }
}

void publishHeartbeat() {
  StaticJsonDocument<200> doc;
  doc["deviceId"] = deviceId;
  doc["uptime"] = millis();
  doc["free_heap"] = ESP.getFreeHeap();
  doc["wifi_rssi"] = WiFi.RSSI();
  doc["active_sensors"] = MAX_SENSORS;
  
  String heartbeat_json;
  serializeJson(doc, heartbeat_json);
  
  String topic = buildingId + "/" + floorId + "/" + deviceId + "/heartbeat";
  client.publish(topic.c_str(), heartbeat_json.c_str());
}

void onMqttMessage(char* topic, byte* payload, unsigned int length) {
  String message = "";
  for (int i = 0; i < length; i++) {
    message += (char)payload[i];
  }
  
  Serial.println("Mensaje recibido [" + String(topic) + "]: " + message);
  
  // Parsear comando
  StaticJsonDocument<200> doc;
  deserializeJson(doc, message);
  
  String command = doc["command"];
  
  if (command == "restart") {
    Serial.println("Reiniciando dispositivo por comando remoto");
    ESP.restart();
  } else if (command == "set_interval") {
    int new_interval = doc["interval"];
    if (new_interval >= 5000) { // Mínimo 5 segundos
      preferences.putUInt("sensorInterval", new_interval);
      Serial.println("Nuevo intervalo configurado: " + String(new_interval) + "ms");
    }
  }
}

void loop() {
  // Mantener conexión MQTT
  if (!client.connected()) {
    connectMQTT();
  }
  client.loop();
  
  unsigned long currentMillis = millis();
  
  // Leer y publicar datos de sensores
  if (currentMillis - lastSensorRead >= sensorInterval) {
    readAndPublishSensors();
    lastSensorRead = currentMillis;
  }
  
  // Publicar heartbeat
  if (currentMill