Introducción
En esta clase aprenderemos a crear endpoints REST usando Flask para exponer los datos de sensores recolectados a través de MQTT desde nuestro ESP32 con DHT11. Los endpoints REST nos permitirán acceder a los datos de temperatura y humedad de manera estructurada y estandarizada, facilitando la integración con aplicaciones web, móviles o sistemas de terceros.
Al finalizar esta sesión, serás capaz de diseñar e implementar una API REST completa que sirva datos en tiempo real y históricos, aplicando las mejores prácticas de desarrollo de APIs y manejo de datos JSON.
Conceptos Fundamentales
¿Qué es una API REST?
REST (Representational State Transfer) es un estilo arquitectónico para diseñar servicios web. Una API REST utiliza métodos HTTP estándar para realizar operaciones sobre recursos identificados por URLs.
Principios fundamentales de REST:
- Stateless: Cada petición contiene toda la información necesaria
- Uniform Interface: Interfaz consistente usando métodos HTTP estándar
- Resource-Based: Los datos se exponen como recursos con URLs únicas
- Representation: Los recursos pueden tener múltiples representaciones (JSON, XML, etc.)
Métodos HTTP en REST:
- GET: Obtener datos (lectura)
- POST: Crear nuevos recursos
- PUT: Actualizar recursos existentes
- DELETE: Eliminar recursos
- PATCH: Actualización parcial de recursos
Serialización JSON
JSON (JavaScript Object Notation) es el formato estándar para intercambio de datos en APIs REST modernas. Es ligero, legible y compatible con prácticamente todos los lenguajes de programación.
Códigos de Estado HTTP:
- 200 OK: Petición exitosa
- 201 Created: Recurso creado exitosamente
- 400 Bad Request: Error en la petición del cliente
- 404 Not Found: Recurso no encontrado
- 500 Internal Server Error: Error del servidor
Implementación Práctica
Paso 1: Configuración del ESP32 con DHT11
Primero configuramos nuestro ESP32 para enviar datos del DHT11 vía MQTT:
Código ESP32 - Envío de datos del sensor:
#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <ArduinoJson.h>
// Configuración WiFi
const char* ssid = "TU_RED_WIFI";
const char* password = "TU_PASSWORD";
// Configuración MQTT
const char* mqtt_server = "192.168.1.100";
const char* mqtt_topic = "sensors/dht11";
const char* device_id = "ESP32_DHT11_001";
// Configuración DHT11
#define DHTPIN 4
#define DHTTYPE DHT11
DHT dht(DHTPIN, DHTTYPE);
WiFiClient espClient;
PubSubClient client(espClient);
unsigned long lastMsg = 0;
const long interval = 30000; // Enviar cada 30 segundos
void setup() {
Serial.begin(115200);
dht.begin();
// Conectar WiFi
setup_wifi();
// Configurar MQTT
client.setServer(mqtt_server, 1883);
client.setCallback(callback);
}
void setup_wifi() {
delay(10);
Serial.println();
Serial.print("Conectando a ");
Serial.println(ssid);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
randomSeed(micros());
Serial.println("");
Serial.println("WiFi conectado");
Serial.println("IP: ");
Serial.println(WiFi.localIP());
}
void callback(char* topic, byte* payload, unsigned int length) {
// Manejar mensajes entrantes si es necesario
}
void reconnect() {
while (!client.connected()) {
Serial.print("Intentando conexión MQTT...");
if (client.connect(device_id)) {
Serial.println("conectado");
} else {
Serial.print("falló, rc=");
Serial.print(client.state());
Serial.println(" reintentando en 5 segundos");
delay(5000);
}
}
}
void loop() {
if (!client.connected()) {
reconnect();
}
client.loop();
unsigned long now = millis();
if (now - lastMsg > interval) {
lastMsg = now;
// Leer sensor DHT11
float humidity = dht.readHumidity();
float temperature = dht.readTemperature();
// Verificar si las lecturas son válidas
if (isnan(humidity) || isnan(temperature)) {
Serial.println("Error leyendo el sensor DHT11!");
return;
}
// Crear objeto JSON
StaticJsonDocument<200> doc;
doc["device_id"] = device_id;
doc["timestamp"] = now;
doc["temperature"] = temperature;
doc["humidity"] = humidity;
doc["location"] = "Laboratorio IoT";
char buffer[256];
serializeJson(doc, buffer);
// Publicar datos
client.publish(mqtt_topic, buffer);
Serial.println("Datos enviados:");
Serial.println(buffer);
}
}
Paso 2: Servidor Flask con MQTT y almacenamiento de datos
Instalación de dependencias:
pip install flask paho-mqtt sqlite3 flask-cors
Servidor Flask completo con API REST:
from flask import Flask, jsonify, request
from flask_cors import CORS
import paho.mqtt.client as mqtt
import sqlite3
import json
import threading
import datetime
from typing import Dict, List, Optional
app = Flask(__name__)
CORS(app)
# Configuración
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC = "sensors/dht11"
DATABASE = "sensor_data.db"
# Variables globales para datos en tiempo real
latest_data = {}
connected_devices = set()
class DatabaseManager:
def __init__(self, db_path: str):
self.db_path = db_path
self.init_database()
def init_database(self):
"""Inicializar la base de datos SQLite"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS sensor_readings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
device_id TEXT NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
temperature REAL NOT NULL,
humidity REAL NOT NULL,
location TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
def insert_reading(self, device_id: str, temperature: float,
humidity: float, location: str = None):
"""Insertar nueva lectura en la base de datos"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO sensor_readings (device_id, temperature, humidity, location)
VALUES (?, ?, ?, ?)
''', (device_id, temperature, humidity, location))
conn.commit()
conn.close()
def get_latest_readings(self, limit: int = 10) -> List[Dict]:
"""Obtener las últimas lecturas"""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM sensor_readings
ORDER BY timestamp DESC
LIMIT ?
''', (limit,))
rows = cursor.fetchall()
conn.close()
return [dict(row) for row in rows]
def get_readings_by_device(self, device_id: str, limit: int = 50) -> List[Dict]:
"""Obtener lecturas por dispositivo"""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM sensor_readings
WHERE device_id = ?
ORDER BY timestamp DESC
LIMIT ?
''', (device_id, limit))
rows = cursor.fetchall()
conn.close()
return [dict(row) for row in rows]
def get_readings_by_date_range(self, start_date: str,
end_date: str) -> List[Dict]:
"""Obtener lecturas por rango de fechas"""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM sensor_readings
WHERE timestamp BETWEEN ? AND ?
ORDER BY timestamp DESC
''', (start_date, end_date))
rows = cursor.fetchall()
conn.close()
return [dict(row) for row in rows]
# Inicializar gestor de base de datos
db = DatabaseManager(DATABASE)
# Configuración MQTT
def on_connect(client, userdata, flags, rc):
"""Callback de conexión MQTT"""
if rc == 0:
print(f"Conectado al broker MQTT con código: {rc}")
client.subscribe(MQTT_TOPIC)
else:
print(f"Error conectando al broker MQTT: {rc}")
def on_message(client, userdata, msg):
"""Callback para mensajes MQTT recibidos"""
global latest_data, connected_devices
try:
# Decodificar mensaje JSON
payload = json.loads(msg.payload.decode())
# Actualizar datos en tiempo real
device_id = payload.get('device_id')
latest_data[device_id] = {
'temperature': payload.get('temperature'),
'humidity': payload.get('humidity'),
'timestamp': datetime.datetime.now().isoformat(),
'location': payload.get('location'),
'device_id': device_id
}
# Agregar dispositivo a la lista de conectados
connected_devices.add(device_id)
# Guardar en base de datos
db.insert_reading(
device_id=device_id,
temperature=payload.get('temperature'),
humidity=payload.get('humidity'),
location=payload.get('location')
)
print(f"Datos recibidos de {device_id}: T={payload.get('temperature')}°C, H={payload.get('humidity')}%")
except json.JSONDecodeError:
print("Error decodificando mensaje JSON")
except Exception as e:
print(f"Error procesando mensaje: {e}")
# Inicializar cliente MQTT
mqtt_client = mqtt.Client()
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
def start_mqtt():
"""Iniciar cliente MQTT en hilo separado"""
try:
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
mqtt_client.loop_forever()
except Exception as e:
print(f"Error conectando a MQTT: {e}")
# ENDPOINTS REST API
@app.route('/api/health', methods=['GET'])
def health_check():
"""Endpoint de verificación de salud de la API"""
return jsonify({
'status': 'healthy',
'timestamp': datetime.datetime.now().isoformat(),
'connected_devices': len(connected_devices),
'mqtt_connected': mqtt_client.is_connected()
}), 200
@app.route('/api/sensors/current', methods=['GET'])
def get_current_readings():
"""Obtener lecturas actuales de todos los sensores"""
if not latest_data:
return jsonify({
'error': 'No hay datos disponibles',
'message': 'No se han recibido datos de sensores'
}), 404
return jsonify({
'data': latest_data,
'count': len(latest_data),
'timestamp': datetime.datetime.now().isoformat()
}), 200
@app.route('/api/sensors/<device_id>/current', methods=['GET'])
def get_current_reading_by_device(device_id: str):
"""Obtener lectura actual de un dispositivo específico"""
if device_id not in latest_data:
return jsonify({
'error': 'Dispositivo no encontrado',
'message': f'No hay datos para el dispositivo {device_id}'
}), 404
return jsonify({
'data': latest_data[device_id],
'timestamp': datetime.datetime.now().isoformat()
}), 200
@app.route('/api/sensors/history', methods=['GET'])
def get_historical_readings():
"""Obtener historial de lecturas con paginación"""
limit = request.args.get('limit', 50, type=int)
limit = min(limit, 1000) # Máximo 1000 registros
try:
readings = db.get_latest_readings(limit