Base de datos ligera (SQLite) para almacenar lecturas de sensores

Integración SQLite con Flask, modelos de datos, almacenamiento histórico de sensores

Módulo 4 ⏱️ 3 horas 🛠️ ESP32 + DHT11 🌐 Flask + MQTT

Introducción

En esta clase aprenderemos a implementar una base de datos ligera SQLite para almacenar las lecturas de sensores provenientes de nuestro ESP32 equipado con DHT11. SQLite es perfecta para aplicaciones IoT ya que es embebida, no requiere servidor y ofrece excelente rendimiento para el volumen de datos típico de sensores.

Integraremos SQLite con Flask para crear un sistema completo de almacenamiento histórico que recibirá datos vía MQTT desde nuestro ESP32, los almacenará en base de datos y proporcionará endpoints para consultar el histórico de temperaturas y humedad.

Objetivo de la clase: Crear un sistema completo de almacenamiento histórico para datos de sensores utilizando SQLite, Flask y MQTT, incluyendo modelos de datos optimizados y consultas eficientes.

Conceptos Fundamentales

¿Por qué SQLite para IoT?

SQLite es la elección ideal para proyectos IoT por varias razones fundamentales:

  • Base de datos embebida: No requiere servidor independiente, se ejecuta en el mismo proceso que la aplicación
  • Ligera: La biblioteca completa ocupa menos de 1MB
  • Sin configuración: No requiere instalación ni administración de servidor
  • ACID compliant: Garantiza la integridad de los datos
  • Multiplataforma: Funciona en cualquier sistema operativo
  • Rendimiento: Excelente para aplicaciones con hasta 100,000 lecturas por día

Diseño de Esquema para Datos de Sensores

Para optimizar el almacenamiento de datos de sensores, consideramos estos aspectos:

  • Indexación temporal: Índices en timestamps para consultas rápidas por rango de fechas
  • Tipos de datos eficientes: REAL para sensores, INTEGER para timestamps
  • Constraints: Validación de rangos de datos para evitar lecturas erróneas
  • Particionamiento lógico: Separación por tipo de sensor o dispositivo

Integración con Flask y SQLAlchemy

SQLAlchemy ORM nos proporciona:

  • Mapeo objeto-relacional: Trabajar con objetos Python en lugar de SQL directo
  • Migraciones: Evolución controlada del esquema de base de datos
  • Connection pooling: Gestión eficiente de conexiones
  • Validación: Validación automática de tipos y constraints

Patrones de Almacenamiento para Sensores

Implementaremos patrones específicos para datos de sensores:

  • Time-series storage: Almacenamiento optimizado para series temporales
  • Batch inserts: Inserción por lotes para mejor rendimiento
  • Data retention: Políticas de retención para controlar el crecimiento
  • Aggregations: Pre-cálculo de estadísticas (promedios, máximos, mínimos)

Implementación Práctica

1. Configuración ESP32 para Envío de Datos

Primero configuramos nuestro ESP32 para enviar datos estructurados vía MQTT:

Código ESP32 - Sensor DHT11 con MQTT:


#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>
#include <time.h>

// Configuración DHT11
#define DHT_PIN 4
#define DHT_TYPE DHT11
DHT dht(DHT_PIN, DHT_TYPE);

// Configuración WiFi
const char* ssid = "TU_WIFI";
const char* password = "TU_PASSWORD";

// Configuración MQTT
const char* mqtt_server = "192.168.1.100";
const int mqtt_port = 1883;
const char* mqtt_topic = "sensors/dht11/data";
const char* device_id = "ESP32_DHT11_001";

WiFiClient espClient;
PubSubClient client(espClient);

// Variables de control
unsigned long lastReading = 0;
const long interval = 30000; // Lectura cada 30 segundos
int readingCount = 0;

void setup() {
    Serial.begin(115200);
    dht.begin();
    
    // Conectar WiFi
    WiFi.begin(ssid, password);
    Serial.print("Conectando a WiFi");
    
    while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
    }
    
    Serial.println();
    Serial.print("Conectado! IP: ");
    Serial.println(WiFi.localIP());
    
    // Configurar cliente MQTT
    client.setServer(mqtt_server, mqtt_port);
    client.setCallback(onMqttMessage);
    
    // Configurar sincronización de tiempo NTP
    configTime(0, 0, "pool.ntp.org");
    
    Serial.println("ESP32 DHT11 con SQLite Storage iniciado");
}

void loop() {
    // Mantener conexión MQTT
    if (!client.connected()) {
        reconnectMQTT();
    }
    client.loop();
    
    // Leer sensores según intervalo
    unsigned long now = millis();
    if (now - lastReading > interval) {
        readAndPublishSensorData();
        lastReading = now;
    }
    
    delay(100);
}

void readAndPublishSensorData() {
    // Leer DHT11
    float humidity = dht.readHumidity();
    float temperature = dht.readTemperature();
    
    // Validar lecturas
    if (isnan(humidity) || isnan(temperature)) {
        Serial.println("Error: Fallo en lectura DHT11");
        return;
    }
    
    // Validar rangos razonables
    if (temperature < -40 || temperature > 80) {
        Serial.println("Error: Temperatura fuera de rango");
        return;
    }
    
    if (humidity < 0 || humidity > 100) {
        Serial.println("Error: Humedad fuera de rango");
        return;
    }
    
    // Obtener timestamp Unix
    time_t now = time(nullptr);
    
    // Crear JSON estructurado
    StaticJsonDocument<200> doc;
    doc["device_id"] = device_id;
    doc["timestamp"] = now;
    doc["reading_id"] = readingCount++;
    doc["temperature"] = round(temperature * 100) / 100.0; // 2 decimales
    doc["humidity"] = round(humidity * 100) / 100.0;
    doc["battery_level"] = 100; // Simulado para ESP32 con alimentación
    doc["signal_strength"] = WiFi.RSSI();
    
    // Convertir a string JSON
    String jsonString;
    serializeJson(doc, jsonString);
    
    // Publicar vía MQTT
    if (client.publish(mqtt_topic, jsonString.c_str())) {
        Serial.println("Datos enviados: " + jsonString);
    } else {
        Serial.println("Error enviando datos MQTT");
    }
}

void reconnectMQTT() {
    while (!client.connected()) {
        Serial.print("Conectando MQTT...");
        
        String clientId = "ESP32Client-" + String(random(0xffff), HEX);
        
        if (client.connect(clientId.c_str())) {
            Serial.println("Conectado al broker MQTT");
            // Suscribirse a tópico de configuración
            client.subscribe("config/dht11/interval");
        } else {
            Serial.print("Fallo conexión MQTT, rc=");
            Serial.print(client.state());
            Serial.println(" Reintentando en 5 segundos");
            delay(5000);
        }
    }
}

void onMqttMessage(char* topic, byte* payload, unsigned int length) {
    String message;
    for (int i = 0; i < length; i++) {
        message += (char)payload[i];
    }
    
    Serial.println("Mensaje recibido [" + String(topic) + "]: " + message);
    
    // Procesar configuración remota
    if (String(topic) == "config/dht11/interval") {
        int newInterval = message.toInt() * 1000; // Convertir a ms
        if (newInterval >= 5000 && newInterval <= 300000) { // 5s a 5min
            // interval = newInterval; // Actualizar intervalo
            Serial.println("Intervalo actualizado a: " + String(newInterval/1000) + " segundos");
        }
    }
}

2. Configuración de Base de Datos SQLite

Creamos la estructura de base de datos optimizada para datos de sensores:

Models.py - Modelos SQLAlchemy:


from flask_sqlalchemy import SQLAlchemy
from datetime import datetime, timedelta
import json

db = SQLAlchemy()

class Device(db.Model):
    __tablename__ = 'devices'
    
    id = db.Column(db.Integer, primary_key=True)
    device_id = db.Column(db.String(50), unique=True, nullable=False, index=True)
    device_name = db.Column(db.String(100), nullable=False)
    location = db.Column(db.String(100))
    device_type = db.Column(db.String(50), nullable=False)
    is_active = db.Column(db.Boolean, default=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    last_seen = db.Column(db.DateTime)
    
    # Relación con lecturas
    sensor_readings = db.relationship('SensorReading', backref='device', lazy='dynamic')
    
    def __repr__(self):
        return f'<Device {self.device_id}>'
    
    def to_dict(self):
        return {
            'id': self.id,
            'device_id': self.device_id,
            'device_name': self.device_name,
            'location': self.location,
            'device_type': self.device_type,
            'is_active': self.is_active,
            'last_seen': self.last_seen.isoformat() if self.last_seen else None
        }

class SensorReading(db.Model):
    __tablename__ = 'sensor_readings'
    
    id = db.Column(db.Integer, primary_key=True)
    device_id = db.Column(db.String(50), db.ForeignKey('devices.device_id'), nullable=False, index=True)
    timestamp = db.Column(db.DateTime, nullable=False, index=True)
    reading_id = db.Column(db.Integer)  # ID secuencial del dispositivo
    
    # Datos del sensor DHT11
    temperature = db.Column(db.Float, nullable=False)
    humidity = db.Column(db.Float, nullable=False)
    
    # Metadatos
    battery_level = db.Column(db.Integer)
    signal_strength = db.Column(db.Integer)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    
    # Constraints para validar rangos
    __table_args__ = (
        db.CheckConstraint('temperature >= -40 AND temperature <= 80', name='check_temperature_range'),
        db.CheckConstraint('humidity >= 0 AND humidity <= 100', name='check_humidity_range'),
        db.CheckConstraint('battery_level >= 0 AND battery_level <= 100', name='check_battery_range'),
        db.Index('idx_device_timestamp', 'device_id', 'timestamp'),
    )
    
    def __repr__(self):
        return f'<SensorReading {self.device_id} - {self.timestamp}>'
    
    def to_dict(self):
        return {
            'id': self.id,
            'device_id': self.device_id,
            'timestamp': self.timestamp.isoformat(),
            'temperature': self.temperature,
            'humidity': self.humidity,
            'battery_level': self.battery_level,
            'signal_strength': self.signal_strength
        }

class HourlyStats(db.Model):
    __tablename__ = 'hourly_stats'
    
    id = db.Column(db.Integer, primary_key=True)
    device_id = db.Column(db.String(50), db.ForeignKey('devices.device_id'), nullable=False)
    hour_timestamp = db.Column(db.DateTime, nullable=False)  # Inicio de la hora
    
    # Estadísticas de temperatura
    temp_min = db.Column(db.Float)
    temp_max = db.Column(db.Float)
    temp_avg = db.Column(db.Float)
    temp_count = db.Column(db.Integer)
    
    # Estadísticas de humedad
    humidity_min = db.Column(db.Float)
    humidity_max = db.Column(db.Float)
    humidity_avg = db.Column(db.Float)
    humidity_count = db.Column(db.Integer)
    
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    
    __table_args__ = (
        db.UniqueConstraint('device_id', 'hour_timestamp', name='unique_device_hour'),
        db.Index('idx_device_hour', 'device_id', 'hour_timestamp'),