Data quality en pipelines pequeños: validaciones que merecen la pena desde el día uno
Validaciones sencillas para evitar datos basura en pipelines reales: nulos, duplicados, tipos y alertas con Python.

Hace poco revisé un pipeline que llevaba meses funcionando “sin problemas”. Cada día ingería datos de una API, los transformaba y los cargaba en una base de datos que alimentaba un dashboard. Nadie se quejaba. Hasta que alguien hizo una pregunta incómoda: “¿Por qué el revenue de abril es un 20% más bajo que el del año pasado?”. Después de dos días investigando, resultó que la API había empezado a devolver campos vacíos en un porcentaje de registros y el pipeline los estaba cargando alegremente como nulos. Las métricas se calculaban sobre datos incompletos. Nadie se enteró porque no había una sola validación.
Esto no pasó en un sistema legacy de una gran empresa. Pasó en un pipeline pequeño, mantenido por una persona, que hacía exactamente lo que se le pidió: mover datos de A a B. El problema no era el código. Era la ausencia total de controles sobre la calidad de lo que se movía.
Data quality no es un producto enterprise
Cuando oyes “data quality” probablemente pienses en herramientas caras, equipos dedicados y frameworks con dashboards impresionantes. Y sí, eso existe. Pero la realidad es que el 80% de los problemas de calidad de datos se detectan con validaciones que puedes escribir en 20 líneas de Python.
La validación de datos no es un proyecto. Es un hábito. Y como todos los hábitos, cuesta más empezar que mantenerlo.
No necesitas Great Expectations configurado con un deployment completo para saber si tus datos tienen nulos donde no deberían. No necesitas un data observability platform para detectar que la API te devolvió cero registros cuando normalmente devuelve miles. Necesitas asserts, ifs y un canal de Slack.
Las cinco validaciones mínimas que siempre deberías tener
Después de romperme la cabeza con pipelines que parecían funcionar y no lo hacían, llegué a una lista de validaciones que pongo en todos mis pipelines desde el día uno. Son cinco y no lleva más de una hora implementarlas.
1. El pipeline recibió datos
Parece obvio, pero no lo es. APIs que devuelven respuestas vacías, queries que no encuentran filas nuevas, archivos CSV que llegan vacíos. Si tu pipeline procesa cero registros y no avisa, tienes un problema silencioso.
def validate_not_empty(df: pd.DataFrame, source_name: str):
"""Valida que el DataFrame no esté vacío."""
if df.empty:
raise ValueError(
f"[QUALITY] {source_name}: se recibieron 0 registros. "
f"Esto no es normal. Revisa la fuente."
)
print(f"[QUALITY] {source_name}: {len(df)} registros recibidos. OK.")2. Los campos obligatorios no son nulos
Cada dataset tiene campos que nunca deberían ser nulos. Un pedido sin order_id, un usuario sin email, una transacción sin amount. Si alguno de estos llega nulo, o la fuente tiene un problema o tu extracción se rompió.
def validate_no_nulls(df: pd.DataFrame, required_columns: list[str]):
"""Valida que las columnas obligatorias no tengan nulos."""
for col in required_columns:
null_count = df[col].isnull().sum()
if null_count > 0:
null_pct = (null_count / len(df)) * 100
raise ValueError(
f"[QUALITY] Columna '{col}': {null_count} nulos "
f"({null_pct:.1f}%). No debería tener nulos."
)
print(f"[QUALITY] Campos obligatorios sin nulos. OK.")3. No hay duplicados donde no debería haberlos
Los duplicados son la basura más común en pipelines. A veces la API envía el mismo registro dos veces. A veces tu pipeline se ejecuta dos veces por un error de programación. A veces la fuente tiene un bug. Si no verificas, tus métricas se inflan.
def validate_no_duplicates(df: pd.DataFrame, key_columns: list[str]):
"""Valida que no haya filas duplicadas por las columnas clave."""
duplicates = df.duplicated(subset=key_columns, keep=False)
dup_count = duplicates.sum()
if dup_count > 0:
sample = df[duplicates].head(3)[key_columns]
raise ValueError(
f"[QUALITY] {dup_count} filas duplicadas por {key_columns}.\n"
f"Ejemplo:\n{sample.to_string()}"
)
print(f"[QUALITY] Sin duplicados por {key_columns}. OK.")4. Los tipos de datos son los esperados
Un campo total que llega como string en vez de numérico. Un created_at que de repente tiene formato americano en vez de ISO. Un campo booleano que ahora incluye “yes”, “no”, “maybe”. Si no validas tipos, las transformaciones posteriores fallan de formas impredecibles.
def validate_types(df: pd.DataFrame, expected_types: dict):
"""
Valida que los tipos de las columnas sean los esperados.
expected_types: {'total_amount': 'float64', 'order_id': 'object'}
"""
errors = []
for col, expected in expected_types.items():
actual = str(df[col].dtype)
if actual != expected:
errors.append(
f" - '{col}': esperado {expected}, recibido {actual}"
)
if errors:
raise TypeError(
f"[QUALITY] Tipos incorrectos:\n" + "\n".join(errors)
)
print(f"[QUALITY] Tipos de datos correctos. OK.")5. Los valores están dentro de rangos razonables
Un pedido con importe negativo. Una fecha en el año 1900. Un porcentaje de 350%. Estos valores técnicamente no son nulos ni duplicados, pero son basura. Las validaciones de rango son el último filtro antes de que un dato absurdo llegue a producción.
def validate_ranges(df: pd.DataFrame, range_rules: dict):
"""
Valida que los valores estén dentro de rangos esperados.
range_rules: {'total_amount': {'min': 0, 'max': 100000}}
"""
for col, rules in range_rules.items():
if 'min' in rules:
violations = (df[col] < rules['min']).sum()
if violations > 0:
raise ValueError(
f"[QUALITY] '{col}': {violations} valores por debajo "
f"del mínimo ({rules['min']})"
)
if 'max' in rules:
violations = (df[col] > rules['max']).sum()
if violations > 0:
raise ValueError(
f"[QUALITY] '{col}': {violations} valores por encima "
f"del máximo ({rules['max']})"
)
print(f"[QUALITY] Rangos dentro de los esperados. OK.")Juntando todo: un validador completo
En la práctica, estas validaciones se ejecutan como un paso del pipeline, entre la ingestión y la transformación. Si alguna falla, el pipeline se detiene y alguien recibe una alerta.
import pandas as pd
def validate_orders(df: pd.DataFrame):
"""Ejecuta todas las validaciones sobre los pedidos ingestados."""
# 1. No vacío
validate_not_empty(df, source_name="orders")
# 2. Campos obligatorios sin nulos
validate_no_nulls(df, required_columns=[
'order_id', 'customer_id', 'total_amount', 'status'
])
# 3. Sin duplicados por order_id
validate_no_duplicates(df, key_columns=['order_id'])
# 4. Tipos correctos
validate_types(df, expected_types={
'order_id': 'object',
'customer_id': 'object',
'total_amount': 'float64',
})
# 5. Rangos razonables
validate_ranges(df, range_rules={
'total_amount': {'min': 0, 'max': 50000},
})
print(f"[QUALITY] Todas las validaciones pasaron. Pipeline continúa.")Nulos: el enemigo que siempre vuelve
Los nulos merecen un tratamiento aparte porque no todos son iguales. Hay nulos que significan “no se proporcionó el dato”, nulos que significan “la API falló”, y nulos que significan “este campo no aplica”. Tratarlos a todos igual es un error.
Estrategia para manejar nulos
| Tipo de nulo | Ejemplo | Acción |
|---|---|---|
| Campo obligatorio nulo | order_id vacío | Rechazar el registro |
| Campo opcional nulo | phone_number vacío | Aceptar, rellenar con valor por defecto o marcar |
| Nulo por fallo de la fuente | Toda una columna nula | Alertar y detener el pipeline |
| Nulo por diseño | discount_code cuando no hay descuento | Aceptar como está |
def handle_nulls(df: pd.DataFrame) -> pd.DataFrame:
"""Estrategia de nulos para pedidos."""
# Campos obligatorios: rechazar si son nulos
critical_nulls = df[['order_id', 'total_amount']].isnull().any(axis=1)
rejected = df[critical_nulls]
if len(rejected) > 0:
print(f"[QUALITY] Rechazados {len(rejected)} registros por nulos críticos")
df = df[~critical_nulls].copy()
# Campos opcionales: rellenar con valor por defecto
df['discount_code'] = df['discount_code'].fillna('NONE')
df['notes'] = df['notes'].fillna('')
# Detección de anomalía: columna entera nula
for col in df.columns:
if df[col].isnull().all():
raise ValueError(
f"[QUALITY] La columna '{col}' está completamente nula. "
f"Posible fallo en la fuente."
)
return dfDuplicados: no siempre es tan simple como drop_duplicates
La deduplicación ingenua es peligrosa. Un df.drop_duplicates() sin pensar puede eliminar registros legítimos o quedarse con la versión equivocada de un dato.
Preguntas antes de deduplicar
- ¿Qué define un duplicado? No siempre es la fila entera. A veces es solo una clave de negocio.
- ¿Con cuál te quedas? El más reciente, el primero, el más completo.
- ¿Los “duplicados” son realmente duplicados? A veces son actualizaciones del mismo registro.
def deduplicate_orders(df: pd.DataFrame) -> pd.DataFrame:
"""
Deduplicación inteligente de pedidos.
Se queda con el registro más reciente por order_id.
"""
before = len(df)
# Ordenar por fecha de ingestión descendente
df = df.sort_values('_ingested_at', ascending=False)
# Quedarse con el primer registro (más reciente) de cada order_id
df = df.drop_duplicates(subset=['order_id'], keep='first')
after = len(df)
removed = before - after
if removed > 0:
pct = (removed / before) * 100
print(f"[QUALITY] Deduplicación: {removed} duplicados eliminados ({pct:.1f}%)")
# Si hay demasiados duplicados, algo raro pasa
if pct > 10:
print(
f"[WARNING] Tasa de duplicados inusualmente alta ({pct:.1f}%). "
f"Revisar la fuente."
)
return df.reset_index(drop=True)Usando Pandera para validaciones declarativas
Si las validaciones manuales se te quedan cortas o quieres algo más estructurado, Pandera es una librería que permite definir schemas de validación de forma declarativa. No es enterprise, no requiere infraestructura, y se integra directamente con pandas.
import pandera as pa
from pandera import Column, Check, DataFrameSchema
order_schema = DataFrameSchema({
"order_id": Column(
str,
Check.str_length(min_value=1),
nullable=False,
unique=True
),
"customer_id": Column(
str,
nullable=False
),
"total_amount": Column(
float,
Check.in_range(min_value=0, max_value=50000),
nullable=False
),
"status": Column(
str,
Check.isin(['pending', 'completed', 'shipped', 'cancelled']),
nullable=False
),
"order_created_at": Column(
"datetime64[ns]",
nullable=False
),
})
# Validar un DataFrame
try:
order_schema.validate(df, lazy=True)
print("[QUALITY] Schema Pandera validado correctamente.")
except pa.errors.SchemaErrors as err:
print(f"[QUALITY] Errores de validación:\n{err.failure_cases}")Lo que me gusta de Pandera es que el schema sirve como documentación viva. Cualquiera que mire el código sabe exactamente qué esperar de esos datos.
Alertas: de nada sirve validar si nadie se entera
La validación más perfecta del mundo es inútil si falla y nadie lo sabe. El pipeline se detiene, los datos no llegan, y tres días después alguien pregunta por qué el dashboard está vacío.
Principio básico de alertas en pipelines
No alertes de todo. Alerta de lo que requiere acción. Si mandas 50 alertas diarias, la gente las ignora. Si mandas una cuando de verdad hay un problema, la gente actúa.
Implementación sencilla con Slack
import requests
import os
from datetime import datetime
def send_alert(message: str, severity: str = "warning"):
"""Envía una alerta a Slack cuando una validación falla."""
webhook_url = os.getenv("SLACK_WEBHOOK_URL")
if not webhook_url:
print(f"[ALERT] {severity.upper()}: {message}")
return
emoji = "🔴" if severity == "critical" else "🟡"
payload = {
"text": (
f"{emoji} *Pipeline Alert [{severity.upper()}]*\n"
f"_{datetime.now().strftime('%Y-%m-%d %H:%M')}_\n\n"
f"{message}"
)
}
try:
requests.post(webhook_url, json=payload, timeout=10)
except requests.RequestException as e:
print(f"[ALERT] Error enviando alerta: {e}")Integrar alertas en el validador
def validate_orders_with_alerts(df: pd.DataFrame):
"""Validaciones con alertas automáticas."""
try:
validate_not_empty(df, source_name="orders")
except ValueError as e:
send_alert(str(e), severity="critical")
raise
try:
validate_no_nulls(df, required_columns=[
'order_id', 'customer_id', 'total_amount'
])
except ValueError as e:
send_alert(str(e), severity="warning")
# Decidir si continuar o parar
raise
try:
validate_no_duplicates(df, key_columns=['order_id'])
except ValueError as e:
send_alert(str(e), severity="warning")
# Los duplicados se pueden limpiar, no paramos
print("[QUALITY] Continuando después de limpiar duplicados...")
print("[QUALITY] Validación completa.")Cuándo escalar: de scripts a herramientas
Las validaciones manuales con Python son suficientes para la mayoría de pipelines pequeños. Pero hay un punto donde necesitas algo más:
| Situación | Lo que uso |
|---|---|
| Un pipeline, un desarrollador | Funciones Python + asserts |
| Varios pipelines, mismo equipo | Pandera + alertas centralizadas |
| Múltiples equipos consumiendo datos | Great Expectations o Soda |
| Data contracts entre equipos | Schemas versionados + CI/CD |
Mi consejo: empieza con lo mínimo. Un assert len(df) > 0 es infinitamente mejor que nada. Puedes evolucionar hacia Pandera o Great Expectations cuando el dolor lo justifique, no antes.
Lo que he aprendido a base de errores
Cada vez que un pipeline me ha dado problemas serios, la causa raíz ha sido la misma: datos malos que entraron sin control. No un bug en el código, no un fallo de infraestructura. Datos que no deberían haber pasado.
Las validaciones que he descrito aquí no son sofisticadas. Son las que pondrías en cualquier API para no aceptar basura: campos obligatorios, tipos correctos, rangos razonables. La diferencia es que en pipelines de datos mucha gente se olvida de ponerlas, porque el pipeline “funciona” aunque los datos sean incorrectos.
La pregunta no es si tus datos van a tener problemas de calidad. Es cuánto vas a tardar en enterarte.
Ponle validaciones a tu pipeline. Las más simples. Hoy. Tu yo del futuro te lo agradecerá cuando alguien pregunte por qué las métricas no cuadran y puedas responder en cinco minutos en vez de en dos días.


