Introducción
En el desarrollo de sistemas IoT profesionales, la capacidad de escalar desde un prototipo con un solo dispositivo y sensor hacia una arquitectura que maneje cientos o miles de dispositivos es crucial. Esta clase aborda las estrategias y técnicas necesarias para implementar sistemas IoT escalables utilizando ESP32, múltiples sensores DHT11, Mosquitto MQTT y Flask.
Aprenderemos a diseñar arquitecturas que puedan crecer orgánicamente, implementar patrones de load balancing, gestionar la identidad única de dispositivos y manejar eficientemente grandes volúmenes de datos de sensores en tiempo real.
Conceptos Fundamentales
Principios de Escalabilidad en IoT
La escalabilidad en sistemas IoT implica varios desafíos únicos que no encontramos en aplicaciones web tradicionales:
- Escalabilidad Horizontal: Agregar más dispositivos sin degradar el rendimiento
- Gestión de Identidad: Identificación única y gestión de estado de cada dispositivo
- Tolerancia a Fallos: El sistema debe continuar funcionando aunque algunos dispositivos fallen
- Distribución Geográfica: Dispositivos pueden estar distribuidos en diferentes ubicaciones
- Heterogeneidad: Diferentes tipos de sensores y capacidades de dispositivos
Patrones de Arquitectura para Escalabilidad
1. Patrón de Identificación Única
Cada dispositivo debe tener un identificador único que permita:
- Rastreo individual de estado y datos
- Configuración específica por dispositivo
- Mantenimiento y debugging selectivo
2. Patrón de Temas MQTT Jerárquicos
Estructura de temas que escale eficientemente:
edificio/piso/habitacion/dispositivo/sensor/tipo
- Permite suscripciones granulares y wildcards
- Facilita el enrutamiento y filtrado de mensajes
3. Patrón de Load Balancing
- Broker Clustering: Múltiples instancias de Mosquitto
- Sharding por Ubicación: Distribución geográfica de brokers
- Client Load Balancing: Dispositivos se conectan al broker menos cargado
4. Patrón de Gestión de Estado Distribuido
Para manejar el estado de múltiples dispositivos:
- Base de datos time-series para datos de sensores
- Cache distribuido para estado actual
- Event sourcing para historial de comandos
Implementación Práctica
1. ESP32 con Identificación Única y Múltiples Sensores
Implementamos un sistema donde cada ESP32 puede manejar múltiples sensores DHT11 y se identifica de manera única:
Código ESP32 - Múltiples Sensores Escalable
#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>
#include <Preferences.h>
// Configuración de red
const char* ssid = "MI_RED_WIFI";
const char* password = "MI_PASSWORD";
const char* mqtt_server = "192.168.1.100";
const int mqtt_port = 1883;
// Configuración de sensores DHT11
#define MAX_SENSORS 4
struct SensorConfig {
int pin;
String location;
DHT* sensor;
};
SensorConfig sensors[MAX_SENSORS] = {
{2, "living_room", nullptr},
{4, "bedroom", nullptr},
{5, "kitchen", nullptr},
{18, "bathroom", nullptr}
};
WiFiClient espClient;
PubSubClient client(espClient);
Preferences preferences;
// Variables de identificación única
String deviceId;
String buildingId;
String floorId;
// Variables de control
unsigned long lastSensorRead = 0;
const unsigned long sensorInterval = 30000; // 30 segundos
unsigned long lastHeartbeat = 0;
const unsigned long heartbeatInterval = 60000; // 1 minuto
// Configuración de retry y reconexión
int reconnectAttempts = 0;
const int maxReconnectAttempts = 5;
unsigned long lastReconnectAttempt = 0;
const unsigned long reconnectInterval = 5000;
void setup() {
Serial.begin(115200);
// Inicializar preferencias para almacenamiento persistente
preferences.begin("iot_device", false);
// Generar o recuperar ID único del dispositivo
setupDeviceIdentity();
// Inicializar sensores DHT11
setupSensors();
// Conectar WiFi
setupWiFi();
// Configurar MQTT
client.setServer(mqtt_server, mqtt_port);
client.setCallback(onMqttMessage);
// Conectar MQTT
connectMQTT();
Serial.println("Dispositivo inicializado correctamente");
Serial.println("Device ID: " + deviceId);
}
void setupDeviceIdentity() {
// Generar ID único basado en MAC address si no existe
deviceId = preferences.getString("deviceId", "");
if (deviceId.length() == 0) {
uint8_t mac[6];
WiFi.macAddress(mac);
deviceId = "ESP32_" + String(mac[3], HEX) + String(mac[4], HEX) + String(mac[5], HEX);
preferences.putString("deviceId", deviceId);
}
// Configuración de ubicación (puede venir de configuración web)
buildingId = preferences.getString("buildingId", "building_01");
floorId = preferences.getString("floorId", "floor_02");
}
void setupSensors() {
Serial.println("Inicializando sensores DHT11...");
for (int i = 0; i < MAX_SENSORS; i++) {
sensors[i].sensor = new DHT(sensors[i].pin, DHT11);
sensors[i].sensor->begin();
Serial.println("Sensor " + String(i) + " en pin " +
String(sensors[i].pin) + " - Ubicación: " + sensors[i].location);
delay(100);
}
}
void setupWiFi() {
delay(10);
Serial.println("Conectando a WiFi...");
WiFi.begin(ssid, password);
int attempts = 0;
while (WiFi.status() != WL_CONNECTED && attempts < 20) {
delay(500);
Serial.print(".");
attempts++;
}
if (WiFi.status() == WL_CONNECTED) {
Serial.println("");
Serial.println("WiFi conectado");
Serial.println("IP address: " + WiFi.localIP().toString());
} else {
Serial.println("Error: No se pudo conectar a WiFi");
ESP.restart();
}
}
void connectMQTT() {
while (!client.connected() && reconnectAttempts < maxReconnectAttempts) {
Serial.print("Intentando conexión MQTT...");
// Crear cliente ID único
String clientId = deviceId + "_" + String(random(0xffff), HEX);
// Last Will Testament para detectar desconexiones
String lwt_topic = buildingId + "/" + floorId + "/" + deviceId + "/status";
if (client.connect(clientId.c_str(), lwt_topic.c_str(), 1, true, "offline")) {
Serial.println(" conectado");
// Suscribirse a comandos para este dispositivo
String command_topic = buildingId + "/" + floorId + "/" + deviceId + "/command/+";
client.subscribe(command_topic.c_str());
// Publicar estado online
client.publish(lwt_topic.c_str(), "online", true);
// Publicar configuración del dispositivo
publishDeviceConfig();
reconnectAttempts = 0;
} else {
Serial.print(" falló, rc=");
Serial.print(client.state());
Serial.println(" reintentando en 5 segundos");
reconnectAttempts++;
delay(5000);
}
}
if (reconnectAttempts >= maxReconnectAttempts) {
Serial.println("Máximo número de intentos MQTT alcanzado. Reiniciando...");
ESP.restart();
}
}
void publishDeviceConfig() {
StaticJsonDocument<512> doc;
doc["deviceId"] = deviceId;
doc["buildingId"] = buildingId;
doc["floorId"] = floorId;
doc["firmware_version"] = "1.0.0";
doc["ip_address"] = WiFi.localIP().toString();
doc["mac_address"] = WiFi.macAddress();
JsonArray sensor_array = doc.createNestedArray("sensors");
for (int i = 0; i < MAX_SENSORS; i++) {
JsonObject sensor = sensor_array.createNestedObject();
sensor["id"] = i;
sensor["type"] = "DHT11";
sensor["location"] = sensors[i].location;
sensor["pin"] = sensors[i].pin;
}
String config_json;
serializeJson(doc, config_json);
String topic = buildingId + "/" + floorId + "/" + deviceId + "/config";
client.publish(topic.c_str(), config_json.c_str(), true);
}
void readAndPublishSensors() {
for (int i = 0; i < MAX_SENSORS; i++) {
float humidity = sensors[i].sensor->readHumidity();
float temperature = sensors[i].sensor->readTemperature();
if (!isnan(humidity) && !isnan(temperature)) {
// Crear JSON con datos del sensor
StaticJsonDocument<200> doc;
doc["deviceId"] = deviceId;
doc["sensorId"] = i;
doc["location"] = sensors[i].location;
doc["temperature"] = round(temperature * 100.0) / 100.0;
doc["humidity"] = round(humidity * 100.0) / 100.0;
doc["timestamp"] = millis();
doc["rssi"] = WiFi.RSSI();
String sensor_json;
serializeJson(doc, sensor_json);
// Publicar en tópico jerárquico
String topic = buildingId + "/" + floorId + "/" + deviceId +
"/sensor/" + sensors[i].location + "/data";
if (client.publish(topic.c_str(), sensor_json.c_str())) {
Serial.println("Sensor " + String(i) + " (" + sensors[i].location +
"): T=" + String(temperature) + "°C, H=" + String(humidity) + "%");
} else {
Serial.println("Error publicando datos del sensor " + String(i));
}
} else {
Serial.println("Error leyendo sensor " + String(i) + " en " + sensors[i].location);
}
delay(100); // Pequeña pausa entre lecturas
}
}
void publishHeartbeat() {
StaticJsonDocument<200> doc;
doc["deviceId"] = deviceId;
doc["uptime"] = millis();
doc["free_heap"] = ESP.getFreeHeap();
doc["wifi_rssi"] = WiFi.RSSI();
doc["active_sensors"] = MAX_SENSORS;
String heartbeat_json;
serializeJson(doc, heartbeat_json);
String topic = buildingId + "/" + floorId + "/" + deviceId + "/heartbeat";
client.publish(topic.c_str(), heartbeat_json.c_str());
}
void onMqttMessage(char* topic, byte* payload, unsigned int length) {
String message = "";
for (int i = 0; i < length; i++) {
message += (char)payload[i];
}
Serial.println("Mensaje recibido [" + String(topic) + "]: " + message);
// Parsear comando
StaticJsonDocument<200> doc;
deserializeJson(doc, message);
String command = doc["command"];
if (command == "restart") {
Serial.println("Reiniciando dispositivo por comando remoto");
ESP.restart();
} else if (command == "set_interval") {
int new_interval = doc["interval"];
if (new_interval >= 5000) { // Mínimo 5 segundos
preferences.putUInt("sensorInterval", new_interval);
Serial.println("Nuevo intervalo configurado: " + String(new_interval) + "ms");
}
}
}
void loop() {
// Mantener conexión MQTT
if (!client.connected()) {
connectMQTT();
}
client.loop();
unsigned long currentMillis = millis();
// Leer y publicar datos de sensores
if (currentMillis - lastSensorRead >= sensorInterval) {
readAndPublishSensors();
lastSensorRead = currentMillis;
}
// Publicar heartbeat
if (currentMill