Módulo 12

Gestión de datos en la nube

Proyecto Final Integral

ESP32 Mecatrónica IoT UNAM

Introducción Teórica

La gestión de datos en la nube representa el paradigma fundamental para el desarrollo de sistemas IoT escalables y eficientes. En el contexto de la mecatrónica moderna, la capacidad de almacenar, procesar y analizar grandes volúmenes de datos generados por dispositivos como el ESP32 es crucial para la toma de decisiones inteligentes y la optimización de procesos industriales.

Los sistemas de cloud computing ofrecen ventajas significativas: escalabilidad automática, alta disponibilidad, procesamiento distribuido, y acceso global a los datos. Para aplicaciones de mecatrónica e IoT, esto se traduce en la capacidad de monitorear equipos en tiempo real, predecir fallas, optimizar mantenimiento y mejorar la eficiencia operativa.

Proveedores Cloud

  • AWS IoT Core: Plataforma IoT completa
  • Google Cloud IoT: Machine Learning integrado
  • Microsoft Azure: Soluciones industriales
  • Firebase: Desarrollo rápido

Tipos de Datos IoT

  • Telemetría: Sensores en tiempo real
  • Comandos: Control remoto
  • Metadatos: Configuración dispositivos
  • Eventos: Alertas y notificaciones

Beneficios Clave

  • Escalabilidad: Millones de dispositivos
  • Disponibilidad: 99.9% uptime
  • Seguridad: Cifrado extremo a extremo
  • Analytics: Insights automáticos

Explicación Técnica Detallada

La arquitectura cloud para IoT se basa en varios componentes fundamentales que trabajan de manera integrada para proporcionar una solución completa de gestión de datos.

Arquitectura IoT Cloud

Capas principales:

  1. Edge Layer: Dispositivos ESP32
  2. Gateway Layer: Concentradores de datos
  3. Network Layer: Conectividad (WiFi, MQTT)
  4. Data Processing: ETL y análisis
  5. Application Layer: Dashboards y APIs

Protocolos de Datos

Formatos y protocolos:

  • JSON: Intercambio ligero
  • MQTT: Pub/Sub eficiente
  • HTTP/HTTPS: APIs REST
  • CoAP: Dispositivos limitados
  • WebSocket: Tiempo real

Conexión básica ESP32 a AWS IoT

AWS IoT Connection
#include 
#include 
#include 
#include 
#include 

// Configuración WiFi
const char* ssid = "tu_wifi";
const char* password = "tu_password";

// Configuración AWS IoT
const char* aws_endpoint = "your-endpoint.iot.region.amazonaws.com";
const char* aws_topic = "esp32/telemetry";
const char* aws_topic_sub = "esp32/commands";

// Certificados (almacenar en PROGMEM para ESP32)
static const char aws_root_ca_pem[] PROGMEM = R"EOF(
-----BEGIN CERTIFICATE-----
// Tu certificado raíz aquí
-----END CERTIFICATE-----
)EOF";

static const char certificate_pem_crt[] PROGMEM = R"KEY(
-----BEGIN CERTIFICATE-----
// Tu certificado de dispositivo aquí
-----END CERTIFICATE-----
)KEY";

static const char private_pem_key[] PROGMEM = R"KEY(
-----BEGIN PRIVATE KEY-----
// Tu clave privada aquí
-----END PRIVATE KEY-----
)KEY";

// Objetos
WiFiClientSecure net;
PubSubClient client(net);
DHT dht(4, DHT22);

// Variables
float temperature = 0;
float humidity = 0;
unsigned long lastMsg = 0;
const long interval = 5000; // 5 segundos

void setup() {
    Serial.begin(115200);
    
    // Inicializar sensor
    dht.begin();
    
    // Configurar WiFi
    connectToWiFi();
    
    // Configurar AWS IoT
    configureAWS();
    
    Serial.println("Sistema iniciado correctamente");
}

void loop() {
    if (!client.connected()) {
        connectToAWS();
    }
    client.loop();
    
    unsigned long now = millis();
    if (now - lastMsg > interval) {
        lastMsg = now;
        
        // Leer sensores
        readSensors();
        
        // Enviar telemetría
        sendTelemetry();
    }
    
    delay(100);
}

void connectToWiFi() {
    WiFi.begin(ssid, password);
    Serial.print("Conectando a WiFi");
    
    while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
    }
    
    Serial.println();
    Serial.println("WiFi conectado!");
    Serial.print("Dirección IP: ");
    Serial.println(WiFi.localIP());
}

void configureAWS() {
    // Configurar certificados
    net.setCACert(aws_root_ca_pem);
    net.setCertificate(certificate_pem_crt);
    net.setPrivateKey(private_pem_key);
    
    // Configurar cliente MQTT
    client.setServer(aws_endpoint, 8883);
    client.setCallback(messageReceived);
    
    Serial.println("AWS IoT configurado");
}

void connectToAWS() {
    while (!client.connected()) {
        Serial.print("Conectando a AWS IoT...");
        
        if (client.connect("ESP32Client")) {
            Serial.println(" conectado!");
            
            // Suscribirse a tópicos
            client.subscribe(aws_topic_sub);
            Serial.println("Suscrito a: " + String(aws_topic_sub));
            
        } else {
            Serial.print(" falló, rc=");
            Serial.print(client.state());
            Serial.println(" reintentando en 5 segundos");
            delay(5000);
        }
    }
}

void readSensors() {
    temperature = dht.readTemperature();
    humidity = dht.readHumidity();
    
    // Verificar si las lecturas son válidas
    if (isnan(temperature) || isnan(humidity)) {
        Serial.println("Error leyendo sensor DHT!");
        return;
    }
    
    Serial.printf("Temp: %.2f°C, Hum: %.2f%%\n", temperature, humidity);
}

void sendTelemetry() {
    // Crear payload JSON
    StaticJsonDocument<300> telemetry;
    telemetry["timestamp"] = millis();
    telemetry["device_id"] = "ESP32_001";
    telemetry["temperature"] = temperature;
    telemetry["humidity"] = humidity;
    telemetry["rssi"] = WiFi.RSSI();
    telemetry["free_heap"] = ESP.getFreeHeap();
    
    // Agregar geolocalización (ejemplo)
    JsonObject location = telemetry.createNestedObject("location");
    location["latitude"] = 19.4326;
    location["longitude"] = -99.1332;
    
    // Convertir a string
    char buffer[512];
    serializeJson(telemetry, buffer);
    
    // Publicar a AWS IoT
    if (client.publish(aws_topic, buffer)) {
        Serial.println("Telemetría enviada: " + String(buffer));
    } else {
        Serial.println("Error enviando telemetría");
    }
}

void messageReceived(char* topic, byte* payload, unsigned int length) {
    Serial.println("Mensaje recibido en tópico: " + String(topic));
    
    // Convertir payload a string
    String message = "";
    for (int i = 0; i < length; i++) {
        message += (char)payload[i];
    }
    
    Serial.println("Mensaje: " + message);
    
    // Procesar comandos
    StaticJsonDocument<200> doc;
    deserializeJson(doc, message);
    
    if (doc["command"] == "restart") {
        Serial.println("Reiniciando dispositivo...");
        delay(1000);
        ESP.restart();
    } else if (doc["command"] == "status") {
        sendTelemetry(); // Enviar estado actual
    }
}

Almacenamiento y Base de Datos

Bases de Datos SQL

Casos de uso:

  • Relaciones complejas entre datos
  • Transacciones ACID requeridas
  • Consultas complejas con JOINs
  • Reporting y analytics tradicional

Ejemplos: PostgreSQL, MySQL, SQL Server

Bases de Datos NoSQL

Casos de uso:

  • Escalamiento horizontal masivo
  • Datos no estructurados o semi-estructurados
  • Alto throughput de escritura
  • Flexibilidad de esquema

Ejemplos: DynamoDB, MongoDB, Cassandra

Ejercicios Prácticos Visuales

1

Configuración Cloud Básica

Básico 20 min

Objetivo: Configurar ESP32 para enviar datos a Firebase Realtime Database.

Materiales:

  • ESP32 DevKit
  • Cuenta de Firebase
  • Sensor DHT22
  • Resistencias y cables

Pasos:

  1. Crear proyecto en Firebase
  2. Configurar Realtime Database
  3. Obtener credenciales API
  4. Implementar código ESP32
Código Firebase:
Firebase Connection
#include 
#include 
#include 

// Firebase config
#define FIREBASE_HOST "tu-proyecto.firebaseio.com"
#define FIREBASE_AUTH "tu-secret-key"

// WiFi credentials
const char* ssid = "tu_wifi";
const char* password = "tu_password";

// Firebase objects
FirebaseData firebaseData;
FirebaseConfig config;
FirebaseAuth auth;

// Sensor
DHT dht(4, DHT22);

void setup() {
    Serial.begin(115200);
    dht.begin();
    
    // Connect WiFi
    WiFi.begin(ssid, password);
    while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
    }
    
    // Firebase config
    config.host = FIREBASE_HOST;
    config.signer.tokens.legacy_token = FIREBASE_AUTH;
    
    Firebase.begin(&config, &auth);
    Firebase.reconnectWiFi(true);
    
    Serial.println("Sistema iniciado");
}

void loop() {
    float temp = dht.readTemperature();
    float hum = dht.readHumidity();
    
    if (!isnan(temp) && !isnan(hum)) {
        String path = "/sensors/" + String(millis());
        
        FirebaseJson json;
        json.set("temperature", temp);
        json.set("humidity", hum);
        json.set("timestamp", millis());
        
        if (Firebase.setJSON(firebaseData, path, json)) {
            Serial.println("Datos enviados: T=" + String(temp) + "°C, H=" + String(hum) + "%");
        } else {
            Serial.println("Error: " + firebaseData.errorReason());
        }
    }
    
    delay(10000); // Enviar cada 10 segundos
}
2

Dashboard en Tiempo Real

Intermedio 45 min

Objetivo: Crear dashboard web que muestre datos de sensores en tiempo real.

Materiales:

  • ESP32 con sensores
  • Servidor web (Node.js/Python)
  • Base de datos (MongoDB/Firebase)
  • Framework frontend (React/Vue)

Características:

  • Gráficas en tiempo real
  • Alertas automáticas
  • Histórico de datos
  • Control remoto de actuadores
Backend Node.js con Socket.IO:
Node.js Server
const express = require('express');
const http = require('http');
const socketIO = require('socket.io');
const mqtt = require('mqtt');
const mongoose = require('mongoose');

const app = express();
const server = http.createServer(app);
const io = socketIO(server);

// MongoDB connection
mongoose.connect('mongodb://localhost/iot_data');

// Data schema
const SensorData = mongoose.model('SensorData', {
    deviceId: String,
    temperature: Number,
    humidity: Number,
    timestamp: { type: Date, default: Date.now }
});

// MQTT client
const mqttClient = mqtt.connect('mqtt://localhost:1883');

mqttClient.on('connect', () => {
    console.log('MQTT connected');
    mqttClient.subscribe('esp32/telemetry');
});

mqttClient.on('message', async (topic, message) => {
    try {
        const data = JSON.parse(message.toString());
        
        // Save to database
        const sensorData = new SensorData({
            deviceId: data.device_id,
            temperature: data.temperature,
            humidity: data.humidity
        });
        
        await sensorData.save();
        
        // Emit to connected clients
        io.emit('sensorData', data);
        
        console.log('Data processed:', data);
    } catch (error) {
        console.error('Error processing message:', error);
    }
});

// REST API endpoints
app.use(express.json());
app.use(express.static('public'));

app.get('/api/sensors/latest', async (req, res) => {
    try {
        const latestData = await SensorData.find()
            .sort({ timestamp: -1 })
            .limit(10);
        res.json(latestData);
    } catch (error) {
        res.status(500).json({ error: error.message });
    }
});

app.get('/api/sensors/history/:hours', async (req, res) => {
    try {
        const hours = parseInt(req.params.hours);
        const since = new Date(Date.now() - (hours * 60 * 60 * 1000));
        
        const data = await SensorData.find({
            timestamp: { $gte: since }
        }).sort({ timestamp: 1 });
        
        res.json(data);
    } catch (error) {
        res.status(500).json({ error: error.message });
    }
});

// Socket.IO connection handling
io.on('connection', (socket) => {
    console.log('Client connected');
    
    socket.on('disconnect', () => {
        console.log('Client disconnected');
    });
    
    // Send command to device
    socket.on('sendCommand', (command) => {
        mqttClient.publish('esp32/commands', JSON.stringify(command));
    });
});

server.listen(3000, () => {
    console.log('Server running on port 3000');
});
3

Analytics y Machine Learning

Avanzado 60 min

Objetivo: Implementar análisis predictivo de datos de sensores.

Tecnologías:

  • Python con scikit-learn
  • TensorFlow/PyTorch
  • Apache Spark
  • Jupyter Notebooks

Casos de uso:

  • Predicción de fallos
  • Detección de anomalías
  • Optimización energética
  • Mantenimiento predictivo
Script Python para Análisis:
Python Analytics
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta

class IoTAnalytics:
    def __init__(self):
        self.scaler = StandardScaler()
        self.anomaly_detector = IsolationForest(
            contamination=0.1,
            random_state=42
        )
    
    def load_data(self, source='mongodb'):
        """Cargar datos desde diferentes fuentes"""
        if source == 'mongodb':
            from pymongo import MongoClient
            client = MongoClient('mongodb://localhost:27017/')
            db = client.iot_data
            
            data = list(db.sensordata.find())
            df = pd.DataFrame(data)
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            
        elif source == 'csv':
            df = pd.read_csv('sensor_data.csv')
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            
        return df
    
    def preprocess_data(self, df):
        """Preprocesar datos para análisis"""
        # Remover valores faltantes
        df = df.dropna()
        
        # Crear características temporales
        df['hour'] = df['timestamp'].dt.hour
        df['day_of_week'] = df['timestamp'].dt.dayofweek
        df['month'] = df['timestamp'].dt.month
        
        # Calcular estadísticas móviles
        df['temp_rolling_mean'] = df['temperature'].rolling(window=10).mean()
        df['hum_rolling_std'] = df['humidity'].rolling(window=10).std()
        
        # Detectar cambios bruscos
        df['temp_diff'] = df['temperature'].diff()
        df['hum_diff'] = df['humidity'].diff()
        
        return df
    
    def detect_anomalies(self, df):
        """Detectar anomalías en los datos"""
        features = ['temperature', 'humidity', 'temp_diff', 'hum_diff']
        
        # Preparar datos
        X = df[features].dropna()
        X_scaled = self.scaler.fit_transform(X)
        
        # Detectar anomalías
        anomalies = self.anomaly_detector.fit_predict(X_scaled)
        
        # Agregar resultados al dataframe
        df_clean = df.dropna(subset=features)
        df_clean['anomaly'] = anomalies
        df_clean['anomaly_score'] = self.anomaly_detector.score_samples(X_scaled)
        
        return df_clean
    
    def predict_maintenance(self, df):
        """Predecir necesidad de mantenimiento"""
        # Simular score de salud del dispositivo
        df['health_score'] = 100 - (
            abs(df['temperature'] - df['temperature'].mean()) * 2 +
            abs(df['humidity'] - df['humidity'].mean()) * 1.5 +
            abs(df['temp_diff'].fillna(0)) * 5
        )
        
        # Predecir mantenimiento cuando health_score < 70
        df['needs_maintenance'] = df['health_score'] < 70
        
        return df
    
    def generate_insights(self, df):
        """Generar insights automáticos"""
        insights = {
            'total_records': len(df),
            'date_range': {
                'start': df['timestamp'].min(),
                'end': df['timestamp'].max()
            },
            'temperature_stats': {
                'mean': df['temperature'].mean(),
                'min': df['temperature'].min(),
                'max': df['temperature'].max(),
                'std': df['temperature'].std()
            },
            'humidity_stats': {
                'mean': df['humidity'].mean(),
                'min': df['humidity'].min(),
                'max': df['humidity'].max(),
                'std': df['humidity'].std()
            },
            'anomalies': {
                'total': len(df[df['anomaly'] == -1]),
                'percentage': len(df[df['anomaly'] == -1]) / len(df) * 100
            },
            'maintenance_alerts': len(df[df['needs_maintenance'] == True])
        }
        
        return insights
    
    def create_dashboard_data(self, df):
        """Preparar datos para dashboard"""
        # Agrupar por hora para gráficas
        hourly = df.set_index('timestamp').resample('1H').agg({
            'temperature': ['mean', 'min', 'max'],
            'humidity': ['mean', 'min', 'max'],
            'anomaly': lambda x: (x == -1).sum(),
            'needs_maintenance': 'any'
        }).round(2)
        
        # Convertir a formato JSON para frontend
        dashboard_data = {
            'timeseries': hourly.to_dict('records'),
            'timestamps': hourly.index.strftime('%Y-%m-%d %H:%M:%S').tolist(),
            'alerts': df[df['anomaly'] == -1][['timestamp', 'temperature', 'humidity']].to_dict('records'),
            'maintenance': df[df['needs_maintenance']][['timestamp', 'health_score']].to_dict('records')
        }
        
        return dashboard_data

# Ejemplo de uso
if __name__ == "__main__":
    analytics = IoTAnalytics()
    
    # Cargar y procesar datos
    df = analytics.load_data()
    df = analytics.preprocess_data(df)
    df = analytics.detect_anomalies(df)
    df = analytics.predict_maintenance(df)
    
    # Generar insights
    insights = analytics.generate_insights(df)
    print("Insights generados:", insights)
    
    # Crear datos para dashboard
    dashboard_data = analytics.create_dashboard_data(df)
    
    # Visualización
    plt.figure(figsize=(15, 10))
    
    plt.subplot(2, 2, 1)
    plt.plot(df['timestamp'], df['temperature'])
    plt.scatter(df[df['anomaly'] == -1]['timestamp'], 
               df[df['anomaly'] == -1]['temperature'], 
               color='red', label='Anomalías')
    plt.title('Temperatura vs Tiempo')
    plt.xlabel('Tiempo')
    plt.ylabel('Temperatura (°C)')
    plt.legend()
    
    plt.subplot(2, 2, 2)
    plt.plot(df['timestamp'], df['humidity'])
    plt.scatter(df[df['anomaly'] == -1]['timestamp'], 
               df[df['anomaly'] == -1]['humidity'], 
               color='red', label='Anomalías')
    plt.title('Humedad vs Tiempo')
    plt.xlabel('Tiempo')
    plt.ylabel('Humedad (%)')
    plt.legend()
    
    plt.subplot(2, 2, 3)
    plt.plot(df['timestamp'], df['health_score'])
    plt.axhline(y=70, color='r', linestyle='--', label='Umbral Mantenimiento')
    plt.title('Score de Salud del Dispositivo')
    plt.xlabel('Tiempo')
    plt.ylabel('Score de Salud')
    plt.legend()
    
    plt.subplot(2, 2, 4)
    hourly_anomalies = df.set_index('timestamp').resample('1H')['anomaly'].apply(lambda x: (x == -1).sum())
    plt.bar(range(len(hourly_anomalies)), hourly_anomalies.values)
    plt.title('Anomalías por Hora')
    plt.xlabel('Hora')
    plt.ylabel('Número de Anomalías')
    
    plt.tight_layout()
    plt.show()
4

Edge Computing con Local Storage

Avanzado 40 min

Objetivo: Implementar almacenamiento local como backup cuando no hay conectividad.

Características:

  • Buffer circular en SPIFFS
  • Sincronización automática
  • Compresión de datos
  • Queue de prioridades
ESP32 Edge Storage:
Edge Computing
#include 
#include 
#include 
#include 

class EdgeDataManager {
private:
    String bufferFile = "/data_buffer.json";
    String configFile = "/config.json";
    int maxBufferSize = 1000;
    
public:
    bool initStorage() {
        if (!SPIFFS.begin(true)) {
            Serial.println("Error inicializando SPIFFS");
            return false;
        }
        return true;
    }
    
    bool storeDataLocally(const JsonObject& data) {
        File file = SPIFFS.open(bufferFile, FILE_APPEND);
        if (!file) {
            Serial.println("Error abriendo buffer file");
            return false;
        }
        
        String jsonString;
        serializeJson(data, jsonString);
        file.println(jsonString);
        file.close();
        
        // Verificar tamaño del buffer
        if (getBufferSize() > maxBufferSize) {
            compactBuffer();
        }
        
        return true;
    }
    
    int syncToCloud() {
        if (WiFi.status() != WL_CONNECTED) {
            return 0; // No connectivity
        }
        
        File file = SPIFFS.open(bufferFile, FILE_READ);
        if (!file) {
            return 0; // No data to sync
        }
        
        int syncCount = 0;
        String tempFile = "/temp_buffer.json";
        File tempBuffer = SPIFFS.open(tempFile, FILE_WRITE);
        
        while (file.available()) {
            String line = file.readStringUntil('\n');
            line.trim();
            
            if (line.length() > 0) {
                if (sendToCloud(line)) {
                    syncCount++;
                } else {
                    // Failed to send, keep in buffer
                    tempBuffer.println(line);
                }
            }
        }
        
        file.close();
        tempBuffer.close();
        
        // Replace buffer with remaining data
        SPIFFS.remove(bufferFile);
        SPIFFS.rename(tempFile, bufferFile);
        
        return syncCount;
    }
    
private:
    bool sendToCloud(const String& jsonData) {
        HTTPClient http;
        http.begin("https://api.tu-servicio.com/data");
        http.addHeader("Content-Type", "application/json");
        
        int httpResponseCode = http.POST(jsonData);
        http.end();
        
        return (httpResponseCode == 200);
    }
    
    int getBufferSize() {
        File file = SPIFFS.open(bufferFile, FILE_READ);
        if (!file) return 0;
        
        int count = 0;
        while (file.available()) {
            file.readStringUntil('\n');
            count++;
        }
        file.close();
        return count;
    }
    
    void compactBuffer() {
        // Keep only the most recent 70% of data
        File file = SPIFFS.open(bufferFile, FILE_READ);
        if (!file) return;
        
        int totalLines = getBufferSize();
        int keepLines = totalLines * 0.7;
        int skipLines = totalLines - keepLines;
        
        String tempFile = "/compact_buffer.json";
        File compactBuffer = SPIFFS.open(tempFile, FILE_WRITE);
        
        // Skip old data
        for (int i = 0; i < skipLines && file.available(); i++) {
            file.readStringUntil('\n');
        }
        
        // Keep recent data
        while (file.available()) {
            String line = file.readStringUntil('\n');
            compactBuffer.println(line);
        }
        
        file.close();
        compactBuffer.close();
        
        SPIFFS.remove(bufferFile);
        SPIFFS.rename(tempFile, bufferFile);
    }
};

Proyecto Aplicado: Sistema IoT Completo de Gestión de Datos

Plataforma Industrial IoT con Analytics Avanzado

Desarrolla una plataforma completa de gestión de datos IoT que integre múltiples ESP32, almacenamiento en la nube, procesamiento en tiempo real, analytics predictivo y dashboards interactivos para monitoreo industrial.

Arquitectura del Sistema:

Componentes Hardware:
  • 3x ESP32 DevKit (diferentes sensores)
  • DHT22 (temperatura/humedad)
  • MPU6050 (acelerómetro/giroscopio)
  • BMP280 (presión atmosférica)
  • Sensores de corriente ACS712
  • Display OLED 128x64
  • Gateway Raspberry Pi (opcional)
Componentes Software:
  • AWS IoT Core / Google Cloud IoT
  • MongoDB / DynamoDB
  • Node.js + Express API
  • React Dashboard con D3.js
  • Python Analytics con scikit-learn
  • Docker para microservicios
  • Grafana para visualización

Especificaciones Técnicas:

ESP32 Master Node - Gestión Completa
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

// Network Configuration
const char* ssid = "INDUSTRIAL_WIFI";
const char* password = "SecurePass123";

// Cloud Configuration
const char* mqtt_server = "your-iot-endpoint.amazonaws.com";
const char* mqtt_user = "your_username";
const char* mqtt_password = "your_password";
const int mqtt_port = 8883;

// Device Configuration
const String deviceId = "ESP32_MASTER_001";
const String location = "Planta_Industrial_A";
const String zone = "Zona_Produccion_1";

// Hardware Objects
WiFiClientSecure espClient;
PubSubClient client(espClient);
DHT dht(4, DHT22);
MPU6050 mpu;
Adafruit_BMP280 bmp;
SSD1306 display(0x3c, 21, 22);
WiFiUDP ntpUDP;
NTPClient timeClient(ntpUDP, "pool.ntp.org", -21600, 60000); // UTC-6

// Sensor Data Structure
struct SensorData {
    float temperature;
    float humidity;
    float pressure;
    float altitude;
    float accelX, accelY, accelZ;
    float gyroX, gyroY, gyroZ;
    float current;
    int rssi;
    unsigned long timestamp;
    String status;
};

// Global Variables
SensorData currentData;
bool cloudConnected = false;
int dataBufferCount = 0;
unsigned long lastSensorRead = 0;
unsigned long lastCloudSync = 0;
unsigned long lastDisplayUpdate = 0;
unsigned long lastHealthCheck = 0;

// Configuration
const unsigned long sensorInterval = 2000;  // 2 seconds
const unsigned long cloudInterval = 10000;  // 10 seconds
const unsigned long displayInterval = 1000; // 1 second
const unsigned long healthInterval = 30000; // 30 seconds

// Data Quality and Edge Intelligence
class DataQualityManager {
private:
    float tempHistory[10];
    float humHistory[10];
    int historyIndex = 0;
    bool historyFull = false;
    
public:
    bool validateSensorData(const SensorData& data) {
        // Temperature validation (-40 to 85°C for industrial)
        if (data.temperature < -40 || data.temperature > 85) {
            Serial.println("Warning: Temperature out of range");
            return false;
        }
        
        // Humidity validation (0-100%)
        if (data.humidity < 0 || data.humidity > 100) {
            Serial.println("Warning: Humidity out of range");
            return false;
        }
        
        // Pressure validation (300-1100 hPa)
        if (data.pressure < 300 || data.pressure > 1100) {
            Serial.println("Warning: Pressure out of range");
            return false;
        }
        
        return true;
    }
    
    bool detectAnomalies(const SensorData& data) {
        if (!historyFull && historyIndex < 10) {
            tempHistory[historyIndex] = data.temperature;
            humHistory[historyIndex] = data.humidity;
            historyIndex++;
            if (historyIndex >= 10) historyFull = true;
            return false; // Not enough history
        }
        
        // Calculate moving average
        float tempAvg = 0, humAvg = 0;
        for (int i = 0; i < 10; i++) {
            tempAvg += tempHistory[i];
            humAvg += humHistory[i];
        }
        tempAvg /= 10;
        humAvg /= 10;
        
        // Update history with current data
        tempHistory[historyIndex % 10] = data.temperature;
        humHistory[historyIndex % 10] = data.humidity;
        historyIndex++;
        
        // Detect significant deviations (>3 standard deviations)
        float tempDev = abs(data.temperature - tempAvg);
        float humDev = abs(data.humidity - humAvg);
        
        if (tempDev > 5.0 || humDev > 10.0) {
            Serial.println("ANOMALY DETECTED: Significant sensor deviation");
            return true;
        }
        
        return false;
    }
};

// Edge Storage Manager
class EdgeStorageManager {
private:
    const char* dataFile = "/sensor_buffer.jsonl";
    const char* configFile = "/device_config.json";
    const int maxBufferSize = 500;
    
public:
    bool init() {
        return SPIFFS.begin(true);
    }
    
    bool storeData(const SensorData& data) {
        StaticJsonDocument<512> doc;
        doc["deviceId"] = deviceId;
        doc["location"] = location;
        doc["zone"] = zone;
        doc["timestamp"] = data.timestamp;
        doc["temperature"] = data.temperature;
        doc["humidity"] = data.humidity;
        doc["pressure"] = data.pressure;
        doc["altitude"] = data.altitude;
        
        JsonObject accel = doc.createNestedObject("accelerometer");
        accel["x"] = data.accelX;
        accel["y"] = data.accelY;
        accel["z"] = data.accelZ;
        
        JsonObject gyro = doc.createNestedObject("gyroscope");
        gyro["x"] = data.gyroX;
        gyro["y"] = data.gyroY;
        gyro["z"] = data.gyroZ;
        
        doc["current"] = data.current;
        doc["rssi"] = data.rssi;
        doc["status"] = data.status;
        
        File file = SPIFFS.open(dataFile, FILE_APPEND);
        if (!file) {
            Serial.println("Failed to open data file for writing");
            return false;
        }
        
        serializeJson(doc, file);
        file.println();
        file.close();
        
        // Manage buffer size
        dataBufferCount++;
        if (dataBufferCount > maxBufferSize) {
            compactBuffer();
        }
        
        return true;
    }
    
    int syncWithCloud() {
        File file = SPIFFS.open(dataFile, FILE_READ);
        if (!file) {
            return 0;
        }
        
        int syncCount = 0;
        String line;
        
        File tempFile = SPIFFS.open("/temp_buffer.jsonl", FILE_WRITE);
        
        while (file.available()) {
            line = file.readStringUntil('\n');
            line.trim();
            
            if (line.length() > 0) {
                if (publishToCloud(line)) {
                    syncCount++;
                } else {
                    // Failed to publish, keep in buffer
                    tempFile.println(line);
                }
            }
        }
        
        file.close();
        tempFile.close();
        
        // Replace original file with remaining data
        SPIFFS.remove(dataFile);
        if (SPIFFS.exists("/temp_buffer.jsonl")) {
            SPIFFS.rename("/temp_buffer.jsonl", dataFile);
            dataBufferCount = getBufferCount();
        } else {
            dataBufferCount = 0;
        }
        
        return syncCount;
    }
    
private:
    bool publishToCloud(const String& jsonData) {
        if (!client.connected()) {
            return false;
        }
        
        String topic = "industrial/" + location + "/" + zone + "/telemetry";
        return client.publish(topic.c_str(), jsonData.c_str());
    }
    
    void compactBuffer() {
        // Remove oldest 30% of data when buffer is full
        File file = SPIFFS.open(dataFile, FILE_READ);
        if (!file) return;
        
        int linesToSkip = maxBufferSize * 0.3;
        int currentLine = 0;
        String line;
        
        File newFile = SPIFFS.open("/compact_buffer.jsonl", FILE_WRITE);
        
        while (file.available()) {
            line = file.readStringUntil('\n');
            
            if (currentLine >= linesToSkip) {
                newFile.println(line);
            }
            currentLine++;
        }
        
        file.close();
        newFile.close();
        
        SPIFFS.remove(dataFile);
        SPIFFS.rename("/compact_buffer.jsonl", dataFile);
        dataBufferCount = getBufferCount();
    }
    
    int getBufferCount() {
        File file = SPIFFS.open(dataFile, FILE_READ);
        if (!file) return 0;
        
        int count = 0;
        while (file.available()) {
            file.readStringUntil('\n');
            count++;
        }
        file.close();
        return count;
    }
};

// Global objects
DataQualityManager dataQuality;
EdgeStorageManager edgeStorage;

void setup() {
    Serial.begin(115200);
    Serial.println("Iniciando ESP32 Master Node...");
    
    // Initialize hardware
    initializeHardware();
    
    // Initialize edge storage
    if (!edgeStorage.init()) {
        Serial.println("Error: Failed to initialize edge storage");
    }
    
    // Connect to WiFi
    connectToWiFi();
    
    // Initialize NTP
    timeClient.begin();
    
    // Connect to cloud
    connectToCloud();
    
    Serial.println("ESP32 Master Node iniciado correctamente");
}

void loop() {
    unsigned long now = millis();
    
    // Update NTP time
    timeClient.update();
    
    // Read sensors
    if (now - lastSensorRead >= sensorInterval) {
        readAllSensors();
        lastSensorRead = now;
    }
    
    // Update display
    if (now - lastDisplayUpdate >= displayInterval) {
        updateDisplay();
        lastDisplayUpdate = now;
    }
    
    // Cloud sync
    if (now - lastCloudSync >= cloudInterval) {
        handleCloudOperations();
        lastCloudSync = now;
    }
    
    // Health check
    if (now - lastHealthCheck >= healthInterval) {
        performHealthCheck();
        lastHealthCheck = now;
    }
    
    // Handle MQTT
    if (client.connected()) {
        client.loop();
    }
    
    delay(100);
}

void initializeHardware() {
    // Initialize I2C
    Wire.begin(21, 22);
    
    // Initialize sensors
    dht.begin();
    
    if (!mpu.begin()) {
        Serial.println("Error: MPU6050 not found");
    } else {
        mpu.setAccelerometerRange(MPU6050_RANGE_8_G);
        mpu.setGyroRange(MPU6050_RANGE_500_DEG);
    }
    
    if (!bmp.begin()) {
        Serial.println("Error: BMP280 not found");
    }
    
    // Initialize display
    display.init();
    display.flipScreenVertically();
    display.setFont(ArialMT_Plain_10);
    
    Serial.println("Hardware initialized");
}

void readAllSensors() {
    // Read environmental sensors
    currentData.temperature = dht.readTemperature();
    currentData.humidity = dht.readHumidity();
    currentData.pressure = bmp.readPressure() / 100.0F; // hPa
    currentData.altitude = bmp.readAltitude(1013.25); // Standard sea level pressure
    
    // Read motion sensors
    sensors_event_t a, g, temp;
    mpu.getEvent(&a, &g, &temp);
    currentData.accelX = a.acceleration.x;
    currentData.accelY = a.acceleration.y;
    currentData.accelZ = a.acceleration.z;
    currentData.gyroX = g.gyro.x;
    currentData.gyroY = g.gyro.y;
    currentData.gyroZ = g.gyro.z;
    
    // Read current sensor (simulated)
    currentData.current = analogRead(A0) * (5.0 / 4095.0); // Convert to current
    
    // System info
    currentData.rssi = WiFi.RSSI();
    currentData.timestamp = timeClient.getEpochTime();
    
    // Validate data quality
    if (dataQuality.validateSensorData(currentData)) {
        currentData.status = "OK";
        
        // Check for anomalies
        if (dataQuality.detectAnomalies(currentData)) {
            currentData.status = "ANOMALY";
            sendAlert("Anomaly detected in sensor readings");
        }
    } else {
        currentData.status = "ERROR";
    }
    
    // Store data locally
    edgeStorage.storeData(currentData);
    
    // Print to serial for debugging
    Serial.printf("T:%.2f H:%.2f P:%.2f Status:%s\n", 
                  currentData.temperature, currentData.humidity, 
                  currentData.pressure, currentData.status.c_str());
}

void handleCloudOperations() {
    if (!client.connected()) {
        connectToCloud();
    }
    
    if (cloudConnected) {
        // Sync buffered data
        int synced = edgeStorage.syncWithCloud();
        if (synced > 0) {
            Serial.printf("Synced %d records to cloud\n", synced);
        }
        
        // Send current data
        sendCurrentDataToCloud();
    }
}

void sendCurrentDataToCloud() {
    StaticJsonDocument<1024> doc;
    
    // Device info
    doc["deviceId"] = deviceId;
    doc["location"] = location;
    doc["zone"] = zone;
    doc["timestamp"] = currentData.timestamp;
    
    // Sensor readings
    JsonObject sensors = doc.createNestedObject("sensors");
    sensors["temperature"] = currentData.temperature;
    sensors["humidity"] = currentData.humidity;
    sensors["pressure"] = currentData.pressure;
    sensors["altitude"] = currentData.altitude;
    
    JsonObject motion = sensors.createNestedObject("motion");
    JsonObject accel = motion.createNestedObject("accelerometer");
    accel["x"] = currentData.accelX;
    accel["y"] = currentData.accelY;
    accel["z"] = currentData.accelZ;
    
    JsonObject gyro = motion.createNestedObject("gyroscope");
    gyro["x"] = currentData.gyroX;
    gyro["y"] = currentData.gyroY;
    gyro["z"] = currentData.gyroZ;
    
    sensors["current"] = currentData.current;
    
    // System info
    JsonObject system = doc.createNestedObject("system");
    system["rssi"] = currentData.rssi;
    system["freeHeap"] = ESP.getFreeHeap();
    system["uptime"] = millis();
    system["bufferCount"] = dataBufferCount;
    system["status"] = currentData.status;
    
    String jsonString;
    serializeJson(doc, jsonString);
    
    String topic = "industrial/" + location + "/" + zone + "/telemetry";
    client.publish(topic.c_str(), jsonString.c_str());
}

void connectToWiFi() {
    WiFi.begin(ssid, password);
    Serial.print("Connecting to WiFi");
    
    int attempts = 0;
    while (WiFi.status() != WL_CONNECTED && attempts < 20) {
        delay(1000);
        Serial.print(".");
        attempts++;
    }
    
    if (WiFi.status() == WL_CONNECTED) {
        Serial.println("\nWiFi connected!");
        Serial.print("IP address: ");
        Serial.println(WiFi.localIP());
    } else {
        Serial.println("\nFailed to connect to WiFi");
    }
}

void connectToCloud() {
    if (WiFi.status() != WL_CONNECTED) {
        cloudConnected = false;
        return;
    }
    
    client.setServer(mqtt_server, mqtt_port);
    client.setCallback(onCloudMessage);
    
    Serial.print("Connecting to cloud...");
    
    if (client.connect(deviceId.c_str(), mqtt_user, mqtt_password)) {
        Serial.println(" connected!");
        cloudConnected = true;
        
        // Subscribe to command topics
        String commandTopic = "industrial/" + location + "/" + zone + "/commands/" + deviceId;
        client.subscribe(commandTopic.c_str());
        
        String broadcastTopic = "industrial/" + location + "/" + zone + "/commands/broadcast";
        client.subscribe(broadcastTopic.c_str());
        
    } else {
        Serial.println(" failed!");
        cloudConnected = false;
    }
}

void onCloudMessage(char* topic, byte* payload, unsigned int length) {
    String message = "";
    for (int i = 0; i < length; i++) {
        message += (char)payload[i];
    }
    
    Serial.println("Command received: " + message);
    
    StaticJsonDocument<256> doc;
    deserializeJson(doc, message);
    
    String command = doc["command"];
    
    if (command == "restart") {
        Serial.println("Restarting device...");
        delay(1000);
        ESP.restart();
    } else if (command == "status") {
        sendCurrentDataToCloud();
    } else if (command == "sync") {
        int synced = edgeStorage.syncWithCloud();
        Serial.printf("Manual sync completed: %d records\n", synced);
    }
}

void updateDisplay() {
    display.clear();
    
    // Title
    display.setFont(ArialMT_Plain_10);
    display.drawString(0, 0, "ESP32 Master Node");
    
    // Status line
    String status = cloudConnected ? "Cloud: ON" : "Cloud: OFF";
    status += " | WiFi: " + String(WiFi.RSSI()) + "dBm";
    display.drawString(0, 12, status);
    
    // Sensor data
    display.drawString(0, 24, "Temp: " + String(currentData.temperature, 1) + "°C");
    display.drawString(0, 36, "Hum:  " + String(currentData.humidity, 1) + "%");
    display.drawString(0, 48, "Pres: " + String(currentData.pressure, 1) + "hPa");
    
    // Buffer info
    display.drawString(75, 24, "Buf: " + String(dataBufferCount));
    display.drawString(75, 36, "Heap: " + String(ESP.getFreeHeap()/1024) + "KB");
    display.drawString(75, 48, "Status: " + currentData.status);
    
    display.display();
}

void performHealthCheck() {
    // Check memory
    if (ESP.getFreeHeap() < 50000) {
        Serial.println("WARNING: Low memory");
        sendAlert("Low memory warning");
    }
    
    // Check WiFi
    if (WiFi.status() != WL_CONNECTED) {
        Serial.println("WARNING: WiFi disconnected");
        connectToWiFi();
    }
    
    // Check cloud connection
    if (!cloudConnected) {
        Serial.println("WARNING: Cloud disconnected");
        connectToCloud();
    }
    
    Serial.println("Health check completed");
}

void sendAlert(const String& message) {
    if (cloudConnected) {
        StaticJsonDocument<256> alert;
        alert["deviceId"] = deviceId;
        alert["location"] = location;
        alert["zone"] = zone;
        alert["timestamp"] = timeClient.getEpochTime();
        alert["alertType"] = "WARNING";
        alert["message"] = message;
        alert["severity"] = "MEDIUM";
        
        String alertJson;
        serializeJson(alert, alertJson);
        
        String alertTopic = "industrial/" + location + "/" + zone + "/alerts";
        client.publish(alertTopic.c_str(), alertJson.c_str());
        
        Serial.println("Alert sent: " + message);
    }
}

Evaluación y Troubleshooting

Problemas Comunes

  • Conectividad intermitente:
    • Implementar reconexión automática
    • Buffer local robusto
    • Heartbeat monitoring
  • Autenticación fallida:
    • Verificar certificados SSL/TLS
    • Validar credenciales
    • Comprobar permisos IoT
  • Pérdida de datos:
    • Queue de mensajes MQTT
    • Almacenamiento persistente
    • Confirmación de recepción
  • Latencia alta:
    • Optimizar payload JSON
    • Batch processing
    • Compresión de datos

Criterios de Evaluación

  • Escalabilidad (25 pts):
    • Soporte >1000 dispositivos
    • Throughput >10k msg/sec
    • Auto-scaling configurado
  • Confiabilidad (25 pts):
    • 99.9% uptime
    • Tolerancia a fallos
    • Backup y recovery
  • Rendimiento (25 pts):
    • Latencia <100ms
    • Procesamiento tiempo real
    • Optimización recursos
  • Seguridad (25 pts):
    • Cifrado extremo a extremo
    • Autenticación robusta
    • Audit logging

Herramientas de Debugging y Monitoreo

Monitoreo en Tiempo Real:
  • AWS CloudWatch / Google Monitoring
  • Grafana + Prometheus
  • ELK Stack (Elasticsearch, Logstash, Kibana)
  • New Relic / DataDog
Testing y Validación:
  • MQTT.fx / MQTT Explorer
  • Postman para APIs REST
  • Load testing con JMeter
  • Unit tests con Jest/PyTest
Desarrollo y Deploy:
  • Docker Compose
  • Kubernetes
  • CI/CD con GitHub Actions
  • Infrastructure as Code (Terraform)