Theoretical Introduction
Cloud data management is a core building block for scalable, efficient IoT systems. In modern mechatronics, the ability to store, process, and analyze the large volumes of data generated by devices such as the ESP32 is crucial for informed decision-making and for optimizing industrial processes.
Cloud computing platforms offer significant advantages: automatic scaling, high availability, distributed processing, and global access to data. For mechatronics and IoT applications, this translates into the ability to monitor equipment in real time, predict failures, optimize maintenance, and improve operational efficiency.
Cloud Providers
- AWS IoT Core: Full-featured IoT platform
- Google Cloud IoT: Integrated machine learning (note: the Google Cloud IoT Core service was retired in August 2023)
- Microsoft Azure: Industrial solutions
- Firebase: Rapid development
IoT Data Types
- Telemetry: Real-time sensor readings
- Commands: Remote control
- Metadata: Device configuration
- Events: Alerts and notifications
Key Benefits
- Scalability: Millions of devices
- Availability: 99.9% uptime (a typical SLA target; the actual figure depends on the provider and service)
- Security: End-to-end encryption
- Analytics: Automated insights
Detailed Technical Explanation
The cloud architecture for IoT is built from several core components that work together to provide a complete data management solution.
IoT Cloud Architecture
Main layers:
- Edge Layer: ESP32 devices
- Gateway Layer: Data concentrators
- Network Layer: Connectivity (WiFi, MQTT)
- Data Processing: ETL and analysis
- Application Layer: Dashboards and APIs
Data Protocols
Formats and protocols:
- JSON: Lightweight data interchange format
- MQTT: Efficient publish/subscribe messaging
- HTTP/HTTPS: REST APIs
- CoAP: Constrained devices
- WebSocket: Real-time, bidirectional communication
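MQTT's publish/subscribe model routes messages by topic, and subscribers may use the wildcards `+` (exactly one level) and `#` (all remaining levels, last position only). The following is a minimal Python sketch of that matching rule, intended only to illustrate how a filter such as `esp32/+` selects topics. The function name `topic_matches` is hypothetical, and the sketch deliberately ignores the special handling of topics that begin with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic name matches a subscription filter.

    '+' matches exactly one topic level; '#' matches the parent level and
    everything below it, and is only valid as the last level of the filter.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: accept only when it closes the filter.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            return False

    # No '#' seen: both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("esp32/+", "esp32/telemetry"))      # True
print(topic_matches("esp32/+", "esp32/telemetry/raw"))  # False
print(topic_matches("esp32/#", "esp32/telemetry/raw"))  # True
```

In practice the broker performs this matching; a device only needs to choose topic names that make useful filters possible, for example one level per site, zone, and message type.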
Basic ESP32 connection to AWS IoT
#include <WiFi.h>
#include <WiFiClientSecure.h>
#include <PubSubClient.h>
#include <ArduinoJson.h>
#include <DHT.h>
// WiFi configuration
const char* ssid = "tu_wifi";
const char* password = "tu_password";

// AWS IoT configuration
const char* aws_endpoint = "your-endpoint.iot.region.amazonaws.com";
const char* aws_topic = "esp32/telemetry";
const char* aws_topic_sub = "esp32/commands";

// Certificates (stored in PROGMEM on the ESP32)
static const char aws_root_ca_pem[] PROGMEM = R"EOF(
-----BEGIN CERTIFICATE-----
// Your root CA certificate here
-----END CERTIFICATE-----
)EOF";

static const char certificate_pem_crt[] PROGMEM = R"KEY(
-----BEGIN CERTIFICATE-----
// Your device certificate here
-----END CERTIFICATE-----
)KEY";

static const char private_pem_key[] PROGMEM = R"KEY(
-----BEGIN PRIVATE KEY-----
// Your private key here
-----END PRIVATE KEY-----
)KEY";

// Objects
WiFiClientSecure net;
PubSubClient client(net);
DHT dht(4, DHT22);  // DHT22 data pin on GPIO 4

// Variables
float temperature = 0;
float humidity = 0;
unsigned long lastMsg = 0;
const long interval = 5000; // 5 seconds
void setup() {
  Serial.begin(115200);

  // Initialize the sensor
  dht.begin();

  // Connect to WiFi
  connectToWiFi();

  // Configure AWS IoT
  configureAWS();

  Serial.println("System started successfully");
}

void loop() {
  if (!client.connected()) {
    connectToAWS();
  }
  client.loop();

  unsigned long now = millis();
  if (now - lastMsg > interval) {
    lastMsg = now;

    // Read sensors
    readSensors();

    // Send telemetry
    sendTelemetry();
  }
  delay(100);
}

void connectToWiFi() {
  WiFi.begin(ssid, password);
  Serial.print("Connecting to WiFi");

  // Blocks until the access point accepts the connection
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }

  Serial.println();
  Serial.println("WiFi connected!");
  Serial.print("IP address: ");
  Serial.println(WiFi.localIP());
}

void configureAWS() {
  // Load the certificates used for mutual TLS
  net.setCACert(aws_root_ca_pem);
  net.setCertificate(certificate_pem_crt);
  net.setPrivateKey(private_pem_key);

  // Configure the MQTT client (8883 is the MQTT-over-TLS port)
  client.setServer(aws_endpoint, 8883);
  client.setCallback(messageReceived);

  Serial.println("AWS IoT configured");
}

void connectToAWS() {
  while (!client.connected()) {
    Serial.print("Connecting to AWS IoT...");

    if (client.connect("ESP32Client")) {
      Serial.println(" connected!");

      // Subscribe to the command topic
      client.subscribe(aws_topic_sub);
      Serial.println("Subscribed to: " + String(aws_topic_sub));
    } else {
      Serial.print(" failed, rc=");
      Serial.print(client.state());
      Serial.println(" retrying in 5 seconds");
      delay(5000);
    }
  }
}
void readSensors() {
  temperature = dht.readTemperature();
  humidity = dht.readHumidity();

  // Check that the readings are valid
  if (isnan(temperature) || isnan(humidity)) {
    Serial.println("Error reading DHT sensor!");
    return;
  }

  Serial.printf("Temp: %.2f°C, Hum: %.2f%%\n", temperature, humidity);
}

void sendTelemetry() {
  // Do not publish when the last sensor read failed
  if (isnan(temperature) || isnan(humidity)) {
    Serial.println("Skipping telemetry: invalid sensor reading");
    return;
  }

  // Build the JSON payload
  StaticJsonDocument<300> telemetry;
  telemetry["timestamp"] = millis();  // milliseconds since boot, not wall-clock time
  telemetry["device_id"] = "ESP32_001";
  telemetry["temperature"] = temperature;
  telemetry["humidity"] = humidity;
  telemetry["rssi"] = WiFi.RSSI();
  telemetry["free_heap"] = ESP.getFreeHeap();

  // Add geolocation (example values)
  JsonObject location = telemetry.createNestedObject("location");
  location["latitude"] = 19.4326;
  location["longitude"] = -99.1332;

  // Serialize to a string
  char buffer[512];
  serializeJson(telemetry, buffer);

  // Publish to AWS IoT
  if (client.publish(aws_topic, buffer)) {
    Serial.println("Telemetry sent: " + String(buffer));
  } else {
    Serial.println("Error sending telemetry");
  }
}

void messageReceived(char* topic, byte* payload, unsigned int length) {
  Serial.println("Message received on topic: " + String(topic));

  // Convert the payload to a string
  String message = "";
  for (unsigned int i = 0; i < length; i++) {
    message += (char)payload[i];
  }
  Serial.println("Message: " + message);

  // Parse the command and ignore malformed JSON
  StaticJsonDocument<200> doc;
  DeserializationError err = deserializeJson(doc, message);
  if (err) {
    Serial.print("Invalid JSON command: ");
    Serial.println(err.c_str());
    return;
  }

  if (doc["command"] == "restart") {
    Serial.println("Restarting device...");
    delay(1000);
    ESP.restart();
  } else if (doc["command"] == "status") {
    sendTelemetry();  // Send the current state
  }
}
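The reconnect loop in the sketch above waits a fixed 5 seconds between attempts. A common refinement is exponential backoff, where the wait doubles after each failure up to a ceiling, optionally with random jitter so that many devices do not retry in lockstep. The sketch below only computes the delay schedule; the function name and its parameters (`base`, `cap`, `jitter`) are illustrative assumptions, not part of the original code.

```python
import random
from typing import List, Optional


def backoff_delays(base: float = 5.0, cap: float = 60.0, attempts: int = 5,
                   jitter: float = 0.0,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Return the wait (in seconds) before each reconnect attempt.

    The delay doubles on every attempt and is clamped to `cap`. With
    jitter > 0, up to `jitter * delay` extra seconds are added at random.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(attempts):
        delay = min(cap, base * (2 ** attempt))
        delay += rng.uniform(0.0, jitter * delay)
        delays.append(delay)
    return delays


print(backoff_delays())  # [5.0, 10.0, 20.0, 40.0, 60.0]
```

On the ESP32 the same schedule could replace the constant `delay(5000)`, resetting the attempt counter after a successful connection.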
Storage and Databases
SQL Databases
Use cases:
- Complex relationships between data
- ACID transactions required
- Complex queries with JOINs
- Traditional reporting and analytics
Examples: PostgreSQL, MySQL, SQL Server
NoSQL Databases
Use cases:
- Massive horizontal scaling
- Unstructured or semi-structured data
- High write throughput
- Schema flexibility
Examples: DynamoDB, MongoDB, Cassandra
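To make the "complex queries with JOINs" use case concrete, here is a small sketch using Python's built-in `sqlite3` module with an in-memory database. The table and column names (`devices`, `readings`, `zone`) and the sample rows are invented for illustration; a production system would use one of the servers listed above.

```python
import sqlite3


def build_demo_db() -> sqlite3.Connection:
    """Create an in-memory database with two related tables and sample rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE devices (device_id TEXT PRIMARY KEY, zone TEXT NOT NULL)"
    )
    conn.execute(
        """CREATE TABLE readings (
               device_id   TEXT NOT NULL REFERENCES devices(device_id),
               ts          INTEGER NOT NULL,
               temperature REAL,
               humidity    REAL
           )"""
    )
    conn.executemany(
        "INSERT INTO devices VALUES (?, ?)",
        [("ESP32_001", "zone_a"), ("ESP32_002", "zone_b")],
    )
    conn.executemany(
        "INSERT INTO readings VALUES (?, ?, ?, ?)",
        [
            ("ESP32_001", 1, 20.0, 50.0),
            ("ESP32_001", 2, 22.0, 52.0),
            ("ESP32_002", 1, 30.0, 40.0),
        ],
    )
    conn.commit()
    return conn


def avg_temperature_by_zone(conn: sqlite3.Connection) -> dict:
    """Join readings with device metadata and average temperature per zone."""
    rows = conn.execute(
        """SELECT d.zone, AVG(r.temperature)
             FROM readings AS r
             JOIN devices AS d ON d.device_id = r.device_id
            GROUP BY d.zone
            ORDER BY d.zone"""
    ).fetchall()
    return dict(rows)


print(avg_temperature_by_zone(build_demo_db()))
# {'zone_a': 21.0, 'zone_b': 30.0}
```

A document store would typically embed the zone in every reading instead, trading the JOIN for duplicated metadata and simpler writes.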
Visual Hands-On Exercises
Objective: Configure an ESP32 to send data to Firebase Realtime Database.
Materials:
- ESP32 DevKit
- Firebase account
- DHT22 sensor
- Resistors and jumper wires
Steps:
- Create a Firebase project
- Set up the Realtime Database
- Obtain the API credentials
- Implement the ESP32 code
Firebase code:
#include <WiFi.h>
#include <FirebaseESP32.h>
#include <DHT.h>
// Firebase config
#define FIREBASE_HOST "tu-proyecto.firebaseio.com"
#define FIREBASE_AUTH "tu-secret-key"
// WiFi credentials
const char* ssid = "tu_wifi";
const char* password = "tu_password";
// Firebase objects
FirebaseData firebaseData;
FirebaseConfig config;
FirebaseAuth auth;
// Sensor
DHT dht(4, DHT22);
void setup() {
Serial.begin(115200);
dht.begin();
// Connect WiFi
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
// Firebase config
config.host = FIREBASE_HOST;
config.signer.tokens.legacy_token = FIREBASE_AUTH;
Firebase.begin(&config, &auth);
Firebase.reconnectWiFi(true);
Serial.println("System started");
}
void loop() {
float temp = dht.readTemperature();
float hum = dht.readHumidity();
if (!isnan(temp) && !isnan(hum)) {
String path = "/sensors/" + String(millis());
FirebaseJson json;
json.set("temperature", temp);
json.set("humidity", hum);
json.set("timestamp", millis());
if (Firebase.setJSON(firebaseData, path, json)) {
Serial.println("Data sent: T=" + String(temp) + "°C, H=" + String(hum) + "%");
} else {
Serial.println("Error: " + firebaseData.errorReason());
}
}
delay(10000); // Send every 10 seconds
}
Objective: Build a web dashboard that displays sensor data in real time.
Materials:
- ESP32 with sensors
- Web server (Node.js/Python)
- Database (MongoDB/Firebase)
- Frontend framework (React/Vue)
Features:
- Real-time charts
- Automatic alerts
- Historical data
- Remote control of actuators
Node.js backend with Socket.IO:
const express = require('express');
const http = require('http');
const socketIO = require('socket.io');
const mqtt = require('mqtt');
const mongoose = require('mongoose');
const app = express();
const server = http.createServer(app);
const io = socketIO(server);
// MongoDB connection
mongoose.connect('mongodb://localhost/iot_data');
// Data schema
const SensorData = mongoose.model('SensorData', {
deviceId: String,
temperature: Number,
humidity: Number,
timestamp: { type: Date, default: Date.now }
});
// MQTT client
const mqttClient = mqtt.connect('mqtt://localhost:1883');
mqttClient.on('connect', () => {
console.log('MQTT connected');
mqttClient.subscribe('esp32/telemetry');
});
mqttClient.on('message', async (topic, message) => {
try {
const data = JSON.parse(message.toString());
// Save to database
const sensorData = new SensorData({
deviceId: data.device_id,
temperature: data.temperature,
humidity: data.humidity
});
await sensorData.save();
// Emit to connected clients
io.emit('sensorData', data);
console.log('Data processed:', data);
} catch (error) {
console.error('Error processing message:', error);
}
});
// REST API endpoints
app.use(express.json());
app.use(express.static('public'));
app.get('/api/sensors/latest', async (req, res) => {
try {
const latestData = await SensorData.find()
.sort({ timestamp: -1 })
.limit(10);
res.json(latestData);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.get('/api/sensors/history/:hours', async (req, res) => {
try {
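const hours = parseInt(req.params.hours, 10);
// Reject non-numeric or non-positive values before querying
if (Number.isNaN(hours) || hours <= 0) {
return res.status(400).json({ error: 'hours must be a positive integer' });
}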
const since = new Date(Date.now() - (hours * 60 * 60 * 1000));
const data = await SensorData.find({
timestamp: { $gte: since }
}).sort({ timestamp: 1 });
res.json(data);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
// Socket.IO connection handling
io.on('connection', (socket) => {
console.log('Client connected');
socket.on('disconnect', () => {
console.log('Client disconnected');
});
// Send command to device
socket.on('sendCommand', (command) => {
mqttClient.publish('esp32/commands', JSON.stringify(command));
});
});
server.listen(3000, () => {
console.log('Server running on port 3000');
});
Objective: Implement predictive analytics on sensor data.
Technologies:
- Python with scikit-learn
- TensorFlow/PyTorch
- Apache Spark
- Jupyter Notebooks
Use cases:
- Failure prediction
- Anomaly detection
- Energy optimization
- Predictive maintenance
Python analysis script:
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
class IoTAnalytics:
    def __init__(self):
        self.scaler = StandardScaler()
        self.anomaly_detector = IsolationForest(
            contamination=0.1,
            random_state=42
        )

    def load_data(self, source='mongodb'):
        """Load data from one of the supported sources."""
        if source == 'mongodb':
            from pymongo import MongoClient
            client = MongoClient('mongodb://localhost:27017/')
            db = client.iot_data
            # Assumes the collection is named "sensordata". Mongoose pluralizes
            # model names by default, so check the actual name in your database.
            data = list(db.sensordata.find())
            df = pd.DataFrame(data)
            df['timestamp'] = pd.to_datetime(df['timestamp'])
        elif source == 'csv':
            df = pd.read_csv('sensor_data.csv')
            df['timestamp'] = pd.to_datetime(df['timestamp'])
        else:
            raise ValueError(f"Unsupported source: {source}")
        return df

    def preprocess_data(self, df):
        """Preprocess the data for analysis."""
        # Drop missing values and order by time so rolling/diff are meaningful
        df = df.dropna().sort_values('timestamp').copy()

        # Create time-based features
        df['hour'] = df['timestamp'].dt.hour
        df['day_of_week'] = df['timestamp'].dt.dayofweek
        df['month'] = df['timestamp'].dt.month

        # Compute rolling statistics
        df['temp_rolling_mean'] = df['temperature'].rolling(window=10).mean()
        df['hum_rolling_std'] = df['humidity'].rolling(window=10).std()

        # Detect abrupt changes
        df['temp_diff'] = df['temperature'].diff()
        df['hum_diff'] = df['humidity'].diff()
        return df

    def detect_anomalies(self, df):
        """Detect anomalies in the data."""
        features = ['temperature', 'humidity', 'temp_diff', 'hum_diff']

        # Prepare the data
        X = df[features].dropna()
        X_scaled = self.scaler.fit_transform(X)

        # Detect anomalies (-1 = anomaly, 1 = normal)
        anomalies = self.anomaly_detector.fit_predict(X_scaled)

        # Add the results to a copy of the dataframe
        df_clean = df.dropna(subset=features).copy()
        df_clean['anomaly'] = anomalies
        df_clean['anomaly_score'] = self.anomaly_detector.score_samples(X_scaled)
        return df_clean

    def predict_maintenance(self, df):
        """Flag readings that suggest maintenance is needed."""
        # Simulated device health score (a heuristic, not a trained model)
        df['health_score'] = 100 - (
            abs(df['temperature'] - df['temperature'].mean()) * 2 +
            abs(df['humidity'] - df['humidity'].mean()) * 1.5 +
            abs(df['temp_diff'].fillna(0)) * 5
        )

        # Flag maintenance when health_score < 70
        df['needs_maintenance'] = df['health_score'] < 70
        return df

    def generate_insights(self, df):
        """Generate automatic insights."""
        insights = {
            'total_records': len(df),
            'date_range': {
                'start': df['timestamp'].min(),
                'end': df['timestamp'].max()
            },
            'temperature_stats': {
                'mean': df['temperature'].mean(),
                'min': df['temperature'].min(),
                'max': df['temperature'].max(),
                'std': df['temperature'].std()
            },
            'humidity_stats': {
                'mean': df['humidity'].mean(),
                'min': df['humidity'].min(),
                'max': df['humidity'].max(),
                'std': df['humidity'].std()
            },
            'anomalies': {
                'total': len(df[df['anomaly'] == -1]),
                'percentage': len(df[df['anomaly'] == -1]) / len(df) * 100
            },
            'maintenance_alerts': int(df['needs_maintenance'].sum())
        }
        return insights

    def create_dashboard_data(self, df):
        """Prepare data for the dashboard."""
        # Aggregate by hour for charts
        hourly = df.set_index('timestamp').resample('1H').agg({
            'temperature': ['mean', 'min', 'max'],
            'humidity': ['mean', 'min', 'max'],
            'anomaly': lambda x: (x == -1).sum(),
            'needs_maintenance': 'any'
        }).round(2)

        # Build plain dict/list structures for the frontend
        dashboard_data = {
            'timeseries': hourly.to_dict('records'),
            'timestamps': hourly.index.strftime('%Y-%m-%d %H:%M:%S').tolist(),
            'alerts': df[df['anomaly'] == -1][['timestamp', 'temperature', 'humidity']].to_dict('records'),
            'maintenance': df[df['needs_maintenance']][['timestamp', 'health_score']].to_dict('records')
        }
        return dashboard_data
# Usage example
if __name__ == "__main__":
    analytics = IoTAnalytics()

    # Load and process the data
    df = analytics.load_data()
    df = analytics.preprocess_data(df)
    df = analytics.detect_anomalies(df)
    df = analytics.predict_maintenance(df)

    # Generate insights
    insights = analytics.generate_insights(df)
    print("Generated insights:", insights)

    # Build the dashboard data
    dashboard_data = analytics.create_dashboard_data(df)

    # Visualization
    plt.figure(figsize=(15, 10))

    plt.subplot(2, 2, 1)
    plt.plot(df['timestamp'], df['temperature'])
    plt.scatter(df[df['anomaly'] == -1]['timestamp'],
                df[df['anomaly'] == -1]['temperature'],
                color='red', label='Anomalies')
    plt.title('Temperature vs Time')
    plt.xlabel('Time')
    plt.ylabel('Temperature (°C)')
    plt.legend()

    plt.subplot(2, 2, 2)
    plt.plot(df['timestamp'], df['humidity'])
    plt.scatter(df[df['anomaly'] == -1]['timestamp'],
                df[df['anomaly'] == -1]['humidity'],
                color='red', label='Anomalies')
    plt.title('Humidity vs Time')
    plt.xlabel('Time')
    plt.ylabel('Humidity (%)')
    plt.legend()

    plt.subplot(2, 2, 3)
    plt.plot(df['timestamp'], df['health_score'])
    plt.axhline(y=70, color='r', linestyle='--', label='Maintenance threshold')
    plt.title('Device Health Score')
    plt.xlabel('Time')
    plt.ylabel('Health score')
    plt.legend()

    plt.subplot(2, 2, 4)
    hourly_anomalies = df.set_index('timestamp').resample('1H')['anomaly'].apply(lambda x: (x == -1).sum())
    plt.bar(range(len(hourly_anomalies)), hourly_anomalies.values)
    plt.title('Anomalies per Hour')
    plt.xlabel('Hour')
    plt.ylabel('Number of anomalies')

    plt.tight_layout()
    plt.show()
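The health score used in `predict_maintenance` is a simple heuristic: 100 minus weighted absolute deviations of temperature and humidity from their means, minus five times the step change in temperature. The following dependency-free sketch recomputes it on a tiny hand-made series so the arithmetic can be checked by hand. The function name `health_scores` and the sample values are illustrative assumptions.

```python
from statistics import mean
from typing import List, Sequence


def health_scores(temps: Sequence[float], hums: Sequence[float]) -> List[float]:
    """Recompute the heuristic health score for each reading.

    score = 100 - (2*|T - mean(T)| + 1.5*|H - mean(H)| + 5*|T - T_prev|)
    The first reading has no predecessor, so its step change counts as 0.
    """
    t_mean, h_mean = mean(temps), mean(hums)
    scores = []
    prev_t = None
    for t, h in zip(temps, hums):
        t_diff = 0.0 if prev_t is None else abs(t - prev_t)
        penalty = abs(t - t_mean) * 2 + abs(h - h_mean) * 1.5 + t_diff * 5
        scores.append(100 - penalty)
        prev_t = t
    return scores


# Three stable readings followed by a 20 °C jump; humidity is constant.
scores = health_scores([20.0, 20.0, 20.0, 40.0], [50.0, 50.0, 50.0, 50.0])
print(scores)                      # [90.0, 90.0, 90.0, -30.0]
print([s < 70 for s in scores])    # [False, False, False, True]
```

Note that the score is not bounded below: a large jump drives it negative, so it is better read as a relative indicator than as a percentage.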
Objective: Implement local storage as a backup for when there is no connectivity.
Features:
- Circular buffer in SPIFFS
- Automatic synchronization
- Data compression
- Priority queue
ESP32 edge storage:
#include <SPIFFS.h>
#include <ArduinoJson.h>
#include <WiFi.h>
#include <HTTPClient.h>
class EdgeDataManager {
private:
String bufferFile = "/data_buffer.json";
String configFile = "/config.json";
int maxBufferSize = 1000;
public:
bool initStorage() {
if (!SPIFFS.begin(true)) {
Serial.println("Error initializing SPIFFS");
return false;
}
return true;
}
bool storeDataLocally(const JsonObject& data) {
File file = SPIFFS.open(bufferFile, FILE_APPEND);
if (!file) {
Serial.println("Error opening buffer file");
return false;
}
String jsonString;
serializeJson(data, jsonString);
file.println(jsonString);
file.close();
// Verificar tamaño del buffer
if (getBufferSize() > maxBufferSize) {
compactBuffer();
}
return true;
}
int syncToCloud() {
if (WiFi.status() != WL_CONNECTED) {
return 0; // No connectivity
}
File file = SPIFFS.open(bufferFile, FILE_READ);
if (!file) {
return 0; // No data to sync
}
int syncCount = 0;
String tempFile = "/temp_buffer.json";
File tempBuffer = SPIFFS.open(tempFile, FILE_WRITE);
while (file.available()) {
String line = file.readStringUntil('\n');
line.trim();
if (line.length() > 0) {
if (sendToCloud(line)) {
syncCount++;
} else {
// Failed to send, keep in buffer
tempBuffer.println(line);
}
}
}
file.close();
tempBuffer.close();
// Replace buffer with remaining data
SPIFFS.remove(bufferFile);
SPIFFS.rename(tempFile, bufferFile);
return syncCount;
}
private:
bool sendToCloud(const String& jsonData) {
HTTPClient http;
http.begin("https://api.tu-servicio.com/data");
http.addHeader("Content-Type", "application/json");
int httpResponseCode = http.POST(jsonData);
http.end();
return (httpResponseCode >= 200 && httpResponseCode < 300); // any 2xx response counts as success
}
int getBufferSize() {
File file = SPIFFS.open(bufferFile, FILE_READ);
if (!file) return 0;
int count = 0;
while (file.available()) {
file.readStringUntil('\n');
count++;
}
file.close();
return count;
}
void compactBuffer() {
// Keep only the most recent 70% of data
File file = SPIFFS.open(bufferFile, FILE_READ);
if (!file) return;
int totalLines = getBufferSize();
int keepLines = totalLines * 0.7;
int skipLines = totalLines - keepLines;
String tempFile = "/compact_buffer.json";
File compactBuffer = SPIFFS.open(tempFile, FILE_WRITE);
// Skip old data
for (int i = 0; i < skipLines && file.available(); i++) {
file.readStringUntil('\n');
}
// Keep recent data
while (file.available()) {
String line = file.readStringUntil('\n');
compactBuffer.println(line);
}
file.close();
compactBuffer.close();
SPIFFS.remove(bufferFile);
SPIFFS.rename(tempFile, bufferFile);
}
};
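The store-and-forward behaviour of `EdgeDataManager` can be modelled in a few lines of Python, which is handy for testing the logic off-device. This sketch is an assumption-laden analogue rather than a port: it keeps records in memory instead of SPIFFS, and it discards the single oldest record when full (via `deque(maxlen=...)`) instead of compacting to 70% as the class above does. The class and method names are hypothetical.

```python
from collections import deque
from typing import Callable, Deque, List


class StoreAndForwardBuffer:
    """Bounded FIFO buffer that retains records until they are delivered."""

    def __init__(self, max_records: int = 1000) -> None:
        # A deque with maxlen silently drops the oldest item when full.
        self._records: Deque[str] = deque(maxlen=max_records)

    def store(self, record: str) -> None:
        self._records.append(record)

    def sync(self, send: Callable[[str], bool]) -> int:
        """Try to send every record; keep the ones whose send() returns False."""
        sent = 0
        remaining: Deque[str] = deque(maxlen=self._records.maxlen)
        while self._records:
            record = self._records.popleft()
            if send(record):
                sent += 1
            else:
                remaining.append(record)  # delivery failed, retry on next sync
        self._records = remaining
        return sent

    def pending(self) -> List[str]:
        return list(self._records)


buf = StoreAndForwardBuffer(max_records=3)
for name in ["a", "b", "c", "d", "e"]:
    buf.store(name)
print(buf.pending())                      # ['c', 'd', 'e']
print(buf.sync(lambda rec: rec != "d"))   # 2
print(buf.pending())                      # ['d']
```

The `send` callable stands in for `sendToCloud`; passing a function that fails on purpose makes it easy to verify that undelivered records survive a sync.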
Applied Project: Complete IoT Data Management System
Industrial IoT Platform with Advanced Analytics
Build a complete IoT data management platform that integrates multiple ESP32 boards, cloud storage, real-time processing, predictive analytics, and interactive dashboards for industrial monitoring.
System Architecture:
Hardware Components:
- 3x ESP32 DevKit (different sensors)
- DHT22 (temperature/humidity)
- MPU6050 (accelerometer/gyroscope)
- BMP280 (barometric pressure)
- ACS712 current sensors
- 128x64 OLED display
- Raspberry Pi gateway (optional)
Software Components:
- AWS IoT Core / Google Cloud IoT
- MongoDB / DynamoDB
- Node.js + Express API
- React dashboard with D3.js
- Python analytics with scikit-learn
- Docker for microservices
- Grafana for visualization
Technical Specifications:
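#include <WiFi.h>
#include <WiFiClientSecure.h>
#include <PubSubClient.h>
#include <ArduinoJson.h>
#include <DHT.h>
#include <Wire.h>
#include <Adafruit_MPU6050.h>
#include <Adafruit_BMP280.h>
#include <SSD1306.h>
#include <SPIFFS.h>
#include <NTPClient.h>
#include <WiFiUdp.h>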
// Network Configuration
const char* ssid = "INDUSTRIAL_WIFI";
const char* password = "SecurePass123";
// Cloud Configuration
const char* mqtt_server = "your-iot-endpoint.amazonaws.com";
const char* mqtt_user = "your_username";
const char* mqtt_password = "your_password";
const int mqtt_port = 8883;
// Device Configuration
const String deviceId = "ESP32_MASTER_001";
const String location = "Planta_Industrial_A";
const String zone = "Zona_Produccion_1";
// Hardware Objects
WiFiClientSecure espClient;
PubSubClient client(espClient);
DHT dht(4, DHT22);
Adafruit_MPU6050 mpu; // Adafruit driver: begin(), getEvent() and MPU6050_RANGE_* used below belong to it
Adafruit_BMP280 bmp;
SSD1306 display(0x3c, 21, 22);
WiFiUDP ntpUDP;
NTPClient timeClient(ntpUDP, "pool.ntp.org", -21600, 60000); // UTC-6
// Sensor Data Structure
struct SensorData {
float temperature;
float humidity;
float pressure;
float altitude;
float accelX, accelY, accelZ;
float gyroX, gyroY, gyroZ;
float current;
int rssi;
unsigned long timestamp;
String status;
};
// Global Variables
SensorData currentData;
bool cloudConnected = false;
int dataBufferCount = 0;
unsigned long lastSensorRead = 0;
unsigned long lastCloudSync = 0;
unsigned long lastDisplayUpdate = 0;
unsigned long lastHealthCheck = 0;
// Configuration
const unsigned long sensorInterval = 2000; // 2 seconds
const unsigned long cloudInterval = 10000; // 10 seconds
const unsigned long displayInterval = 1000; // 1 second
const unsigned long healthInterval = 30000; // 30 seconds
// Data Quality and Edge Intelligence
class DataQualityManager {
private:
float tempHistory[10];
float humHistory[10];
int historyIndex = 0;
bool historyFull = false;
public:
bool validateSensorData(const SensorData& data) {
// Temperature validation (-40 to 85°C for industrial)
if (data.temperature < -40 || data.temperature > 85) {
Serial.println("Warning: Temperature out of range");
return false;
}
// Humidity validation (0-100%)
if (data.humidity < 0 || data.humidity > 100) {
Serial.println("Warning: Humidity out of range");
return false;
}
// Pressure validation (300-1100 hPa)
if (data.pressure < 300 || data.pressure > 1100) {
Serial.println("Warning: Pressure out of range");
return false;
}
return true;
}
bool detectAnomalies(const SensorData& data) {
if (!historyFull && historyIndex < 10) {
tempHistory[historyIndex] = data.temperature;
humHistory[historyIndex] = data.humidity;
historyIndex++;
if (historyIndex >= 10) historyFull = true;
return false; // Not enough history
}
// Calculate moving average
float tempAvg = 0, humAvg = 0;
for (int i = 0; i < 10; i++) {
tempAvg += tempHistory[i];
humAvg += humHistory[i];
}
tempAvg /= 10;
humAvg /= 10;
// Update history with current data
tempHistory[historyIndex % 10] = data.temperature;
humHistory[historyIndex % 10] = data.humidity;
historyIndex++;
// Detect significant deviations from the moving average (fixed thresholds: 5 °C, 10 %RH)
float tempDev = abs(data.temperature - tempAvg);
float humDev = abs(data.humidity - humAvg);
if (tempDev > 5.0 || humDev > 10.0) {
Serial.println("ANOMALY DETECTED: Significant sensor deviation");
return true;
}
return false;
}
};
// Edge Storage Manager
class EdgeStorageManager {
private:
const char* dataFile = "/sensor_buffer.jsonl";
const char* configFile = "/device_config.json";
const int maxBufferSize = 500;
public:
bool init() {
return SPIFFS.begin(true);
}
bool storeData(const SensorData& data) {
StaticJsonDocument<512> doc;
doc["deviceId"] = deviceId;
doc["location"] = location;
doc["zone"] = zone;
doc["timestamp"] = data.timestamp;
doc["temperature"] = data.temperature;
doc["humidity"] = data.humidity;
doc["pressure"] = data.pressure;
doc["altitude"] = data.altitude;
JsonObject accel = doc.createNestedObject("accelerometer");
accel["x"] = data.accelX;
accel["y"] = data.accelY;
accel["z"] = data.accelZ;
JsonObject gyro = doc.createNestedObject("gyroscope");
gyro["x"] = data.gyroX;
gyro["y"] = data.gyroY;
gyro["z"] = data.gyroZ;
doc["current"] = data.current;
doc["rssi"] = data.rssi;
doc["status"] = data.status;
File file = SPIFFS.open(dataFile, FILE_APPEND);
if (!file) {
Serial.println("Failed to open data file for writing");
return false;
}
serializeJson(doc, file);
file.println();
file.close();
// Manage buffer size
dataBufferCount++;
if (dataBufferCount > maxBufferSize) {
compactBuffer();
}
return true;
}
int syncWithCloud() {
File file = SPIFFS.open(dataFile, FILE_READ);
if (!file) {
return 0;
}
int syncCount = 0;
String line;
File tempFile = SPIFFS.open("/temp_buffer.jsonl", FILE_WRITE);
while (file.available()) {
line = file.readStringUntil('\n');
line.trim();
if (line.length() > 0) {
if (publishToCloud(line)) {
syncCount++;
} else {
// Failed to publish, keep in buffer
tempFile.println(line);
}
}
}
file.close();
tempFile.close();
// Replace original file with remaining data
SPIFFS.remove(dataFile);
if (SPIFFS.exists("/temp_buffer.jsonl")) {
SPIFFS.rename("/temp_buffer.jsonl", dataFile);
dataBufferCount = getBufferCount();
} else {
dataBufferCount = 0;
}
return syncCount;
}
private:
bool publishToCloud(const String& jsonData) {
if (!client.connected()) {
return false;
}
String topic = "industrial/" + location + "/" + zone + "/telemetry";
return client.publish(topic.c_str(), jsonData.c_str());
}
void compactBuffer() {
// Remove oldest 30% of data when buffer is full
File file = SPIFFS.open(dataFile, FILE_READ);
if (!file) return;
int linesToSkip = maxBufferSize * 0.3;
int currentLine = 0;
String line;
File newFile = SPIFFS.open("/compact_buffer.jsonl", FILE_WRITE);
while (file.available()) {
line = file.readStringUntil('\n');
if (currentLine >= linesToSkip) {
newFile.println(line);
}
currentLine++;
}
file.close();
newFile.close();
SPIFFS.remove(dataFile);
SPIFFS.rename("/compact_buffer.jsonl", dataFile);
dataBufferCount = getBufferCount();
}
int getBufferCount() {
File file = SPIFFS.open(dataFile, FILE_READ);
if (!file) return 0;
int count = 0;
while (file.available()) {
file.readStringUntil('\n');
count++;
}
file.close();
return count;
}
};
// Global objects
DataQualityManager dataQuality;
EdgeStorageManager edgeStorage;
void setup() {
Serial.begin(115200);
Serial.println("Starting ESP32 Master Node...");
// Initialize hardware
initializeHardware();
// Initialize edge storage
if (!edgeStorage.init()) {
Serial.println("Error: Failed to initialize edge storage");
}
// Connect to WiFi
connectToWiFi();
// Initialize NTP
timeClient.begin();
// Connect to cloud
connectToCloud();
Serial.println("ESP32 Master Node started successfully");
}
void loop() {
unsigned long now = millis();
// Update NTP time
timeClient.update();
// Read sensors
if (now - lastSensorRead >= sensorInterval) {
readAllSensors();
lastSensorRead = now;
}
// Update display
if (now - lastDisplayUpdate >= displayInterval) {
updateDisplay();
lastDisplayUpdate = now;
}
// Cloud sync
if (now - lastCloudSync >= cloudInterval) {
handleCloudOperations();
lastCloudSync = now;
}
// Health check
if (now - lastHealthCheck >= healthInterval) {
performHealthCheck();
lastHealthCheck = now;
}
// Handle MQTT
if (client.connected()) {
client.loop();
}
delay(100);
}
void initializeHardware() {
// Initialize I2C
Wire.begin(21, 22);
// Initialize sensors
dht.begin();
if (!mpu.begin()) {
Serial.println("Error: MPU6050 not found");
} else {
mpu.setAccelerometerRange(MPU6050_RANGE_8_G);
mpu.setGyroRange(MPU6050_RANGE_500_DEG);
}
if (!bmp.begin()) {
Serial.println("Error: BMP280 not found");
}
// Initialize display
display.init();
display.flipScreenVertically();
display.setFont(ArialMT_Plain_10);
Serial.println("Hardware initialized");
}
void readAllSensors() {
// Read environmental sensors
currentData.temperature = dht.readTemperature();
currentData.humidity = dht.readHumidity();
currentData.pressure = bmp.readPressure() / 100.0F; // hPa
currentData.altitude = bmp.readAltitude(1013.25); // Standard sea level pressure
// Read motion sensors
sensors_event_t a, g, temp;
mpu.getEvent(&a, &g, &temp);
currentData.accelX = a.acceleration.x;
currentData.accelY = a.acceleration.y;
currentData.accelZ = a.acceleration.z;
currentData.gyroX = g.gyro.x;
currentData.gyroY = g.gyro.y;
currentData.gyroZ = g.gyro.z;
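// Read current sensor (simulated): raw ADC count converted to volts.
// The ESP32 ADC is 12-bit with a nominal 3.3 V full scale; a real ACS712 reading
// still needs its zero-current offset and sensitivity applied to obtain amperes.
currentData.current = analogRead(A0) * (3.3 / 4095.0);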
// System info
currentData.rssi = WiFi.RSSI();
currentData.timestamp = timeClient.getEpochTime();
// Validate data quality
if (dataQuality.validateSensorData(currentData)) {
currentData.status = "OK";
// Check for anomalies
if (dataQuality.detectAnomalies(currentData)) {
currentData.status = "ANOMALY";
sendAlert("Anomaly detected in sensor readings");
}
} else {
currentData.status = "ERROR";
}
// Store data locally
edgeStorage.storeData(currentData);
// Print to serial for debugging
Serial.printf("T:%.2f H:%.2f P:%.2f Status:%s\n",
currentData.temperature, currentData.humidity,
currentData.pressure, currentData.status.c_str());
}
void handleCloudOperations() {
if (!client.connected()) {
connectToCloud();
}
if (cloudConnected) {
// Sync buffered data
int synced = edgeStorage.syncWithCloud();
if (synced > 0) {
Serial.printf("Synced %d records to cloud\n", synced);
}
// Send current data
sendCurrentDataToCloud();
}
}
void sendCurrentDataToCloud() {
StaticJsonDocument<1024> doc;
// Device info
doc["deviceId"] = deviceId;
doc["location"] = location;
doc["zone"] = zone;
doc["timestamp"] = currentData.timestamp;
// Sensor readings
JsonObject sensors = doc.createNestedObject("sensors");
sensors["temperature"] = currentData.temperature;
sensors["humidity"] = currentData.humidity;
sensors["pressure"] = currentData.pressure;
sensors["altitude"] = currentData.altitude;
JsonObject motion = sensors.createNestedObject("motion");
JsonObject accel = motion.createNestedObject("accelerometer");
accel["x"] = currentData.accelX;
accel["y"] = currentData.accelY;
accel["z"] = currentData.accelZ;
JsonObject gyro = motion.createNestedObject("gyroscope");
gyro["x"] = currentData.gyroX;
gyro["y"] = currentData.gyroY;
gyro["z"] = currentData.gyroZ;
sensors["current"] = currentData.current;
// System info
JsonObject system = doc.createNestedObject("system");
system["rssi"] = currentData.rssi;
system["freeHeap"] = ESP.getFreeHeap();
system["uptime"] = millis();
system["bufferCount"] = dataBufferCount;
system["status"] = currentData.status;
String jsonString;
serializeJson(doc, jsonString);
String topic = "industrial/" + location + "/" + zone + "/telemetry";
client.publish(topic.c_str(), jsonString.c_str());
}
void connectToWiFi() {
WiFi.begin(ssid, password);
Serial.print("Connecting to WiFi");
int attempts = 0;
while (WiFi.status() != WL_CONNECTED && attempts < 20) {
delay(1000);
Serial.print(".");
attempts++;
}
if (WiFi.status() == WL_CONNECTED) {
Serial.println("\nWiFi connected!");
Serial.print("IP address: ");
Serial.println(WiFi.localIP());
} else {
Serial.println("\nFailed to connect to WiFi");
}
}
void connectToCloud() {
if (WiFi.status() != WL_CONNECTED) {
cloudConnected = false;
return;
}
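client.setServer(mqtt_server, mqtt_port);
// PubSubClient's default packet buffer is 256 bytes; the nested telemetry JSON is larger
client.setBufferSize(1024);
// Note: port 8883 is TLS. espClient still needs its certificates configured
// (setCACert, plus setCertificate/setPrivateKey for AWS IoT), as in the basic example.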
client.setCallback(onCloudMessage);
Serial.print("Connecting to cloud...");
if (client.connect(deviceId.c_str(), mqtt_user, mqtt_password)) {
Serial.println(" connected!");
cloudConnected = true;
// Subscribe to command topics
String commandTopic = "industrial/" + location + "/" + zone + "/commands/" + deviceId;
client.subscribe(commandTopic.c_str());
String broadcastTopic = "industrial/" + location + "/" + zone + "/commands/broadcast";
client.subscribe(broadcastTopic.c_str());
} else {
Serial.println(" failed!");
cloudConnected = false;
}
}
void onCloudMessage(char* topic, byte* payload, unsigned int length) {
String message = "";
for (int i = 0; i < length; i++) {
message += (char)payload[i];
}
Serial.println("Command received: " + message);
StaticJsonDocument<256> doc;
deserializeJson(doc, message);
String command = doc["command"];
if (command == "restart") {
Serial.println("Restarting device...");
delay(1000);
ESP.restart();
} else if (command == "status") {
sendCurrentDataToCloud();
} else if (command == "sync") {
int synced = edgeStorage.syncWithCloud();
Serial.printf("Manual sync completed: %d records\n", synced);
}
}
void updateDisplay() {
display.clear();
// Title
display.setFont(ArialMT_Plain_10);
display.drawString(0, 0, "ESP32 Master Node");
// Status line
String status = cloudConnected ? "Cloud: ON" : "Cloud: OFF";
status += " | WiFi: " + String(WiFi.RSSI()) + "dBm";
display.drawString(0, 12, status);
// Sensor data
display.drawString(0, 24, "Temp: " + String(currentData.temperature, 1) + "°C");
display.drawString(0, 36, "Hum: " + String(currentData.humidity, 1) + "%");
display.drawString(0, 48, "Pres: " + String(currentData.pressure, 1) + "hPa");
// Buffer info
display.drawString(75, 24, "Buf: " + String(dataBufferCount));
display.drawString(75, 36, "Heap: " + String(ESP.getFreeHeap()/1024) + "KB");
display.drawString(75, 48, "Status: " + currentData.status);
display.display();
}
void performHealthCheck() {
// Check memory
if (ESP.getFreeHeap() < 50000) {
Serial.println("WARNING: Low memory");
sendAlert("Low memory warning");
}
// Check WiFi
if (WiFi.status() != WL_CONNECTED) {
Serial.println("WARNING: WiFi disconnected");
connectToWiFi();
}
// Check cloud connection
if (!cloudConnected) {
Serial.println("WARNING: Cloud disconnected");
connectToCloud();
}
Serial.println("Health check completed");
}
void sendAlert(const String& message) {
if (cloudConnected) {
StaticJsonDocument<256> alert;
alert["deviceId"] = deviceId;
alert["location"] = location;
alert["zone"] = zone;
alert["timestamp"] = timeClient.getEpochTime();
alert["alertType"] = "WARNING";
alert["message"] = message;
alert["severity"] = "MEDIUM";
String alertJson;
serializeJson(alert, alertJson);
String alertTopic = "industrial/" + location + "/" + zone + "/alerts";
client.publish(alertTopic.c_str(), alertJson.c_str());
Serial.println("Alert sent: " + message);
}
}
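The edge anomaly check in `DataQualityManager::detectAnomalies` compares each new reading with the mean of the previous ten and flags it when the gap exceeds a fixed threshold. The Python sketch below mirrors that logic for a single signal so it can be exercised on a desktop. The class name `DeviationDetector` is hypothetical, and the defaults (window of 10, threshold of 5.0) are taken from the temperature branch of the C++ code.

```python
from collections import deque


class DeviationDetector:
    """Flag readings that deviate from the moving average of recent values."""

    def __init__(self, window: int = 10, threshold: float = 5.0) -> None:
        self.window = window
        self.threshold = threshold
        self.history = deque(maxlen=window)

    def update(self, value: float) -> bool:
        # Until the window is full there is not enough history to judge.
        if len(self.history) < self.window:
            self.history.append(value)
            return False

        # Average of the previous `window` readings (current one excluded).
        average = sum(self.history) / len(self.history)

        # Record the reading; the oldest value drops out automatically.
        self.history.append(value)
        return abs(value - average) > self.threshold


detector = DeviationDetector()
readings = [25.0] * 10 + [25.5, 40.0]
print([detector.update(r) for r in readings])
# ten times False while the window fills, then False for 25.5 and True for 40.0
```

Because the flagged value is still stored in the history, one outlier raises the average for the next ten readings; a variant that skips storing flagged values is a reasonable alternative when spikes are expected to be isolated.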
Evaluation and Troubleshooting
Common Problems
- Intermittent connectivity:
- Implement automatic reconnection
- Robust local buffer
- Heartbeat monitoring
- Authentication failures:
- Verify SSL/TLS certificates
- Validate credentials
- Check IoT permissions (policies)
- Data loss:
- MQTT message queue
- Persistent storage
- Delivery acknowledgment
- High latency:
- Optimize the JSON payload
- Batch processing
- Data compression
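Two of the latency remedies above, batching and compression, combine naturally: several readings are serialized as one compact JSON array and then compressed. The sketch below uses only Python's standard `json` and `zlib` modules. The function names and the sample readings are illustrative assumptions, and the actual size reduction depends on how repetitive the data is.

```python
import json
import zlib
from typing import Any, Dict, List


def pack_batch(readings: List[Dict[str, Any]]) -> bytes:
    """Serialize readings as compact JSON (no extra spaces) and compress."""
    raw = json.dumps(readings, separators=(",", ":")).encode("utf-8")
    return zlib.compress(raw, level=9)


def unpack_batch(blob: bytes) -> List[Dict[str, Any]]:
    """Reverse pack_batch: decompress, then parse the JSON array."""
    return json.loads(zlib.decompress(blob).decode("utf-8"))


# Fifty similar readings, as a device might buffer between uploads.
batch = [
    {"device_id": "ESP32_001", "temperature": 25.0 + i * 0.1, "humidity": 60.0}
    for i in range(50)
]
raw_size = len(json.dumps(batch).encode("utf-8"))
packed = pack_batch(batch)
print(raw_size, len(packed))          # compressed size is much smaller here
print(unpack_batch(packed) == batch)  # True
```

Batching trades freshness for efficiency, so it suits historical telemetry better than alerts, which should still be sent individually.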
Evaluation Criteria
- Scalability (25 pts):
- Support for >1000 devices
- Throughput >10k msg/sec
- Auto-scaling configured
- Reliability (25 pts):
- 99.9% uptime
- Fault tolerance
- Backup and recovery
- Performance (25 pts):
- Latency <100 ms
- Real-time processing
- Resource optimization
- Security (25 pts):
- End-to-end encryption
- Strong authentication
- Audit logging
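The 99.9% uptime criterion is easier to reason about as a downtime budget. The short calculation below converts an availability percentage into allowed downtime for a given period; the function name is a hypothetical helper, and the 30-day month and 365-day year are simplifying assumptions.

```python
def allowed_downtime_minutes(availability_pct: float, period_hours: float) -> float:
    """Minutes of downtime permitted in a period at a given availability."""
    return period_hours * 60 * (1 - availability_pct / 100)


monthly = allowed_downtime_minutes(99.9, 30 * 24)
yearly = allowed_downtime_minutes(99.9, 365 * 24)
print(round(monthly, 1))      # 43.2 minutes per 30-day month
print(round(yearly / 60, 2))  # 8.76 hours per year
```

In other words, a single outage of about three quarters of an hour uses up the whole monthly budget, which is why the reliability criterion is paired with fault tolerance and recovery.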
Debugging and Monitoring Tools
Real-Time Monitoring:
- AWS CloudWatch / Google Monitoring
- Grafana + Prometheus
- ELK Stack (Elasticsearch, Logstash, Kibana)
- New Relic / DataDog
Testing and Validation:
- MQTT.fx / MQTT Explorer
- Postman for REST APIs
- Load testing with JMeter
- Unit tests with Jest/PyTest
Development and Deployment:
- Docker Compose
- Kubernetes
- CI/CD with GitHub Actions
- Infrastructure as Code (Terraform)