Introducción
En esta clase aprenderemos a implementar una base de datos ligera SQLite para almacenar las lecturas de sensores provenientes de nuestro ESP32 equipado con DHT11. SQLite es perfecta para aplicaciones IoT ya que es embebida, no requiere servidor y ofrece excelente rendimiento para el volumen de datos típico de sensores.
Integraremos SQLite con Flask para crear un sistema completo de almacenamiento histórico que recibirá datos vía MQTT desde nuestro ESP32, los almacenará en base de datos y proporcionará endpoints para consultar el histórico de temperaturas y humedad.
Conceptos Fundamentales
¿Por qué SQLite para IoT?
SQLite es la elección ideal para proyectos IoT por varias razones fundamentales:
- Base de datos embebida: No requiere servidor independiente, se ejecuta en el mismo proceso que la aplicación
- Ligera: La biblioteca completa ocupa menos de 1MB
- Sin configuración: No requiere instalación ni administración de servidor
- ACID compliant: Garantiza la integridad de los datos
- Multiplataforma: Funciona en cualquier sistema operativo
- Rendimiento: Excelente para aplicaciones con hasta 100,000 lecturas por día
Diseño de Esquema para Datos de Sensores
Para optimizar el almacenamiento de datos de sensores, consideramos estos aspectos:
- Indexación temporal: Índices en timestamps para consultas rápidas por rango de fechas
- Tipos de datos eficientes: REAL para sensores, INTEGER para timestamps
- Constraints: Validación de rangos de datos para evitar lecturas erróneas
- Particionamiento lógico: Separación por tipo de sensor o dispositivo
Integración con Flask y SQLAlchemy
SQLAlchemy ORM nos proporciona:
- Mapeo objeto-relacional: Trabajar con objetos Python en lugar de SQL directo
- Migraciones: Evolución controlada del esquema de base de datos
- Connection pooling: Gestión eficiente de conexiones
- Validación: Validación automática de tipos y constraints
Patrones de Almacenamiento para Sensores
Implementaremos patrones específicos para datos de sensores:
- Time-series storage: Almacenamiento optimizado para series temporales
- Batch inserts: Inserción por lotes para mejor rendimiento
- Data retention: Políticas de retención para controlar el crecimiento
- Aggregations: Pre-cálculo de estadísticas (promedios, máximos, mínimos)
Implementación Práctica
1. Configuración ESP32 para Envío de Datos
Primero configuramos nuestro ESP32 para enviar datos estructurados vía MQTT:
Código ESP32 - Sensor DHT11 con MQTT:
#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>
#include <time.h>
// Configuración DHT11
#define DHT_PIN 4
#define DHT_TYPE DHT11
DHT dht(DHT_PIN, DHT_TYPE);
// Configuración WiFi
const char* ssid = "TU_WIFI";
const char* password = "TU_PASSWORD";
// Configuración MQTT
const char* mqtt_server = "192.168.1.100";
const int mqtt_port = 1883;
const char* mqtt_topic = "sensors/dht11/data";
const char* device_id = "ESP32_DHT11_001";
WiFiClient espClient;
PubSubClient client(espClient);
// Variables de control
unsigned long lastReading = 0;
const long interval = 30000; // Lectura cada 30 segundos
int readingCount = 0;
void setup() {
Serial.begin(115200);
dht.begin();
// Conectar WiFi
WiFi.begin(ssid, password);
Serial.print("Conectando a WiFi");
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println();
Serial.print("Conectado! IP: ");
Serial.println(WiFi.localIP());
// Configurar cliente MQTT
client.setServer(mqtt_server, mqtt_port);
client.setCallback(onMqttMessage);
// Configurar sincronización de tiempo NTP
configTime(0, 0, "pool.ntp.org");
Serial.println("ESP32 DHT11 con SQLite Storage iniciado");
}
void loop() {
// Mantener conexión MQTT
if (!client.connected()) {
reconnectMQTT();
}
client.loop();
// Leer sensores según intervalo
unsigned long now = millis();
if (now - lastReading > interval) {
readAndPublishSensorData();
lastReading = now;
}
delay(100);
}
void readAndPublishSensorData() {
// Leer DHT11
float humidity = dht.readHumidity();
float temperature = dht.readTemperature();
// Validar lecturas
if (isnan(humidity) || isnan(temperature)) {
Serial.println("Error: Fallo en lectura DHT11");
return;
}
// Validar rangos razonables
if (temperature < -40 || temperature > 80) {
Serial.println("Error: Temperatura fuera de rango");
return;
}
if (humidity < 0 || humidity > 100) {
Serial.println("Error: Humedad fuera de rango");
return;
}
// Obtener timestamp Unix
time_t now = time(nullptr);
// Crear JSON estructurado
StaticJsonDocument<200> doc;
doc["device_id"] = device_id;
doc["timestamp"] = now;
doc["reading_id"] = readingCount++;
doc["temperature"] = round(temperature * 100) / 100.0; // 2 decimales
doc["humidity"] = round(humidity * 100) / 100.0;
doc["battery_level"] = 100; // Simulado para ESP32 con alimentación
doc["signal_strength"] = WiFi.RSSI();
// Convertir a string JSON
String jsonString;
serializeJson(doc, jsonString);
// Publicar vía MQTT
if (client.publish(mqtt_topic, jsonString.c_str())) {
Serial.println("Datos enviados: " + jsonString);
} else {
Serial.println("Error enviando datos MQTT");
}
}
void reconnectMQTT() {
while (!client.connected()) {
Serial.print("Conectando MQTT...");
String clientId = "ESP32Client-" + String(random(0xffff), HEX);
if (client.connect(clientId.c_str())) {
Serial.println("Conectado al broker MQTT");
// Suscribirse a tópico de configuración
client.subscribe("config/dht11/interval");
} else {
Serial.print("Fallo conexión MQTT, rc=");
Serial.print(client.state());
Serial.println(" Reintentando en 5 segundos");
delay(5000);
}
}
}
void onMqttMessage(char* topic, byte* payload, unsigned int length) {
String message;
for (int i = 0; i < length; i++) {
message += (char)payload[i];
}
Serial.println("Mensaje recibido [" + String(topic) + "]: " + message);
// Procesar configuración remota
if (String(topic) == "config/dht11/interval") {
int newInterval = message.toInt() * 1000; // Convertir a ms
if (newInterval >= 5000 && newInterval <= 300000) { // 5s a 5min
// interval = newInterval; // Actualizar intervalo
Serial.println("Intervalo actualizado a: " + String(newInterval/1000) + " segundos");
}
}
}
2. Configuración de Base de Datos SQLite
Creamos la estructura de base de datos optimizada para datos de sensores:
Models.py - Modelos SQLAlchemy:
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime, timedelta
import json
db = SQLAlchemy()
class Device(db.Model):
__tablename__ = 'devices'
id = db.Column(db.Integer, primary_key=True)
device_id = db.Column(db.String(50), unique=True, nullable=False, index=True)
device_name = db.Column(db.String(100), nullable=False)
location = db.Column(db.String(100))
device_type = db.Column(db.String(50), nullable=False)
is_active = db.Column(db.Boolean, default=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
last_seen = db.Column(db.DateTime)
# Relación con lecturas
sensor_readings = db.relationship('SensorReading', backref='device', lazy='dynamic')
def __repr__(self):
return f'<Device {self.device_id}>'
def to_dict(self):
return {
'id': self.id,
'device_id': self.device_id,
'device_name': self.device_name,
'location': self.location,
'device_type': self.device_type,
'is_active': self.is_active,
'last_seen': self.last_seen.isoformat() if self.last_seen else None
}
class SensorReading(db.Model):
__tablename__ = 'sensor_readings'
id = db.Column(db.Integer, primary_key=True)
device_id = db.Column(db.String(50), db.ForeignKey('devices.device_id'), nullable=False, index=True)
timestamp = db.Column(db.DateTime, nullable=False, index=True)
reading_id = db.Column(db.Integer) # ID secuencial del dispositivo
# Datos del sensor DHT11
temperature = db.Column(db.Float, nullable=False)
humidity = db.Column(db.Float, nullable=False)
# Metadatos
battery_level = db.Column(db.Integer)
signal_strength = db.Column(db.Integer)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
# Constraints para validar rangos
__table_args__ = (
db.CheckConstraint('temperature >= -40 AND temperature <= 80', name='check_temperature_range'),
db.CheckConstraint('humidity >= 0 AND humidity <= 100', name='check_humidity_range'),
db.CheckConstraint('battery_level >= 0 AND battery_level <= 100', name='check_battery_range'),
db.Index('idx_device_timestamp', 'device_id', 'timestamp'),
)
def __repr__(self):
return f'<SensorReading {self.device_id} - {self.timestamp}>'
def to_dict(self):
return {
'id': self.id,
'device_id': self.device_id,
'timestamp': self.timestamp.isoformat(),
'temperature': self.temperature,
'humidity': self.humidity,
'battery_level': self.battery_level,
'signal_strength': self.signal_strength
}
class HourlyStats(db.Model):
__tablename__ = 'hourly_stats'
id = db.Column(db.Integer, primary_key=True)
device_id = db.Column(db.String(50), db.ForeignKey('devices.device_id'), nullable=False)
hour_timestamp = db.Column(db.DateTime, nullable=False) # Inicio de la hora
# Estadísticas de temperatura
temp_min = db.Column(db.Float)
temp_max = db.Column(db.Float)
temp_avg = db.Column(db.Float)
temp_count = db.Column(db.Integer)
# Estadísticas de humedad
humidity_min = db.Column(db.Float)
humidity_max = db.Column(db.Float)
humidity_avg = db.Column(db.Float)
humidity_count = db.Column(db.Integer)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
__table_args__ = (
db.UniqueConstraint('device_id', 'hour_timestamp', name='unique_device_hour'),
db.Index('idx_device_hour', 'device_id', 'hour_timestamp'),