Data quality en pipelines petits: validacions que valen la pena des del dia u

Validacions senzilles per evitar dades escombraries en pipelines reals: nuls, duplicats, tipus i alertes amb Python.

Cover for Data quality en pipelines petits: validacions que valen la pena des del dia u

Fa poc vaig revisar un pipeline que portava mesos funcionant “sense problemes”. Cada dia ingeria dades d’una API, les transformava i les carregava en una base de dades que alimentava un dashboard. Ningú es queixava. Fins que algú va fer una pregunta incòmoda: “Per què el revenue d’abril és un 20% més baix que el de l’any passat?”. Després de dos dies investigant, va resultar que l’API havia començat a retornar camps buits en un percentatge de registres i el pipeline els estava carregant alegrament com a nuls. Les mètriques es calculaven sobre dades incompletes. Ningú se’n va assabentar perquè no hi havia ni una sola validació.

Això no va passar en un sistema legacy d’una gran empresa. Va passar en un pipeline petit, mantingut per una persona, que feia exactament el que se li havia demanat: moure dades d’A a B. El problema no era el codi. Era l’absència total de controls sobre la qualitat del que es movia.


Data quality no és un producte enterprise

Quan sents “data quality” probablement penses en eines cares, equips dedicats i frameworks amb dashboards impressionants. I sí, això existeix. Però la realitat és que el 80% dels problemes de qualitat de dades es detecten amb validacions que pots escriure en 20 línies de Python.

La validació de dades no és un projecte. És un hàbit. I com tots els hàbits, costa més començar que mantenir-lo.

No necessites Great Expectations configurat amb un deployment complet per saber si les teves dades tenen nuls on no n’haurien de tenir. No necessites una data observability platform per detectar que l’API t’ha retornat zero registres quan normalment en retorna milers. Necessites asserts, ifs i un canal de Slack.


Les cinc validacions mínimes que sempre hauries de tenir

Després de trencar-me el cap amb pipelines que semblaven funcionar i no ho feien, vaig arribar a una llista de validacions que poso a tots els meus pipelines des del dia u. Són cinc i no porta més d’una hora implementar-les.

1. El pipeline ha rebut dades

Sembla obvi, però no ho és. APIs que retornen respostes buides, queries que no troben files noves, arxius CSV que arriben buits. Si el teu pipeline processa zero registres i no avisa, tens un problema silenciós.

def validate_not_empty(df: pd.DataFrame, source_name: str):
    """Valida que el DataFrame no esté vacío."""
    if df.empty:
        raise ValueError(
            f"[QUALITY] {source_name}: se recibieron 0 registros. "
            f"Esto no es normal. Revisa la fuente."
        )
    print(f"[QUALITY] {source_name}: {len(df)} registros recibidos. OK.")

2. Els camps obligatoris no són nuls

Cada dataset té camps que mai haurien de ser nuls. Una comanda sense order_id, un usuari sense email, una transacció sense amount. Si algun d’aquests arriba nul, o la font té un problema o la teva extracció s’ha trencat.

def validate_no_nulls(df: pd.DataFrame, required_columns: list[str]):
    """Valida que las columnas obligatorias no tengan nulos."""
    for col in required_columns:
        null_count = df[col].isnull().sum()
        if null_count > 0:
            null_pct = (null_count / len(df)) * 100
            raise ValueError(
                f"[QUALITY] Columna '{col}': {null_count} nulos "
                f"({null_pct:.1f}%). No debería tener nulos."
            )
    print(f"[QUALITY] Campos obligatorios sin nulos. OK.")

3. No hi ha duplicats on no n’hi hauria d’haver

Els duplicats són les escombraries més comunes en pipelines. De vegades l’API envia el mateix registre dues vegades. De vegades el teu pipeline s’executa dues vegades per un error de programació. De vegades la font té un bug. Si no ho verifiques, les teves mètriques s’inflen.

def validate_no_duplicates(df: pd.DataFrame, key_columns: list[str]):
    """Valida que no haya filas duplicadas por las columnas clave."""
    duplicates = df.duplicated(subset=key_columns, keep=False)
    dup_count = duplicates.sum()

    if dup_count > 0:
        sample = df[duplicates].head(3)[key_columns]
        raise ValueError(
            f"[QUALITY] {dup_count} filas duplicadas por {key_columns}.\n"
            f"Ejemplo:\n{sample.to_string()}"
        )
    print(f"[QUALITY] Sin duplicados por {key_columns}. OK.")

4. Els tipus de dades són els esperats

Un camp total que arriba com a string en lloc de numèric. Un created_at que de sobte té format americà en lloc d’ISO. Un camp booleà que ara inclou “yes”, “no”, “maybe”. Si no valides tipus, les transformacions posteriors fallen de formes impredictibles.

def validate_types(df: pd.DataFrame, expected_types: dict):
    """
    Valida que los tipos de las columnas sean los esperados.
    expected_types: {'total_amount': 'float64', 'order_id': 'object'}
    """
    errors = []
    for col, expected in expected_types.items():
        actual = str(df[col].dtype)
        if actual != expected:
            errors.append(
                f"  - '{col}': esperado {expected}, recibido {actual}"
            )

    if errors:
        raise TypeError(
            f"[QUALITY] Tipos incorrectos:\n" + "\n".join(errors)
        )
    print(f"[QUALITY] Tipos de datos correctos. OK.")

5. Els valors estan dins de rangs raonables

Una comanda amb import negatiu. Una data de l’any 1900. Un percentatge de 350%. Aquests valors tècnicament no són nuls ni duplicats, però són escombraries. Les validacions de rang són l’últim filtre abans que una dada absurda arribi a producció.

def validate_ranges(df: pd.DataFrame, range_rules: dict):
    """
    Valida que los valores estén dentro de rangos esperados.
    range_rules: {'total_amount': {'min': 0, 'max': 100000}}
    """
    for col, rules in range_rules.items():
        if 'min' in rules:
            violations = (df[col] < rules['min']).sum()
            if violations > 0:
                raise ValueError(
                    f"[QUALITY] '{col}': {violations} valores por debajo "
                    f"del mínimo ({rules['min']})"
                )
        if 'max' in rules:
            violations = (df[col] > rules['max']).sum()
            if violations > 0:
                raise ValueError(
                    f"[QUALITY] '{col}': {violations} valores por encima "
                    f"del máximo ({rules['max']})"
                )
    print(f"[QUALITY] Rangos dentro de los esperados. OK.")

Ajuntant-ho tot: un validador complet

A la pràctica, aquestes validacions s’executen com un pas del pipeline, entre la ingestió i la transformació. Si alguna falla, el pipeline s’atura i algú rep una alerta.

import pandas as pd

def validate_orders(df: pd.DataFrame):
    """Ejecuta todas las validaciones sobre los pedidos ingestados."""
    # 1. No vacío
    validate_not_empty(df, source_name="orders")

    # 2. Campos obligatorios sin nulos
    validate_no_nulls(df, required_columns=[
        'order_id', 'customer_id', 'total_amount', 'status'
    ])

    # 3. Sin duplicados por order_id
    validate_no_duplicates(df, key_columns=['order_id'])

    # 4. Tipos correctos
    validate_types(df, expected_types={
        'order_id': 'object',
        'customer_id': 'object',
        'total_amount': 'float64',
    })

    # 5. Rangos razonables
    validate_ranges(df, range_rules={
        'total_amount': {'min': 0, 'max': 50000},
    })

    print(f"[QUALITY] Todas las validaciones pasaron. Pipeline continúa.")

Nuls: l’enemic que sempre torna

Els nuls mereixen un tractament a part perquè no tots són iguals. Hi ha nuls que signifiquen “no es va proporcionar la dada”, nuls que signifiquen “l’API ha fallat”, i nuls que signifiquen “aquest camp no aplica”. Tractar-los tots igual és un error.

Estratègia per gestionar nuls

Tipus de nulExempleAcció
Camp obligatori nulorder_id buitRebutjar el registre
Camp opcional nulphone_number buitAcceptar, omplir amb valor per defecte o marcar
Nul per fallada de la fontTota una columna nul·laAlertar i aturar el pipeline
Nul per dissenydiscount_code quan no hi ha descompteAcceptar tal com està
def handle_nulls(df: pd.DataFrame) -> pd.DataFrame:
    """Estrategia de nulos para pedidos."""
    # Campos obligatorios: rechazar si son nulos
    critical_nulls = df[['order_id', 'total_amount']].isnull().any(axis=1)
    rejected = df[critical_nulls]
    if len(rejected) > 0:
        print(f"[QUALITY] Rechazados {len(rejected)} registros por nulos críticos")

    df = df[~critical_nulls].copy()

    # Campos opcionales: rellenar con valor por defecto
    df['discount_code'] = df['discount_code'].fillna('NONE')
    df['notes'] = df['notes'].fillna('')

    # Detección de anomalía: columna entera nula
    for col in df.columns:
        if df[col].isnull().all():
            raise ValueError(
                f"[QUALITY] La columna '{col}' está completamente nula. "
                f"Posible fallo en la fuente."
            )

    return df

Duplicats: no sempre és tan simple com drop_duplicates

La deduplicació ingènua és perillosa. Un df.drop_duplicates() sense pensar pot eliminar registres legítims o quedar-se amb la versió equivocada d’una dada.

Preguntes abans de deduplicar

  1. Què defineix un duplicat? No sempre és la fila sencera. De vegades és només una clau de negoci.
  2. Amb quin et quedes? El més recent, el primer, el més complet.
  3. Els “duplicats” són realment duplicats? De vegades són actualitzacions del mateix registre.
def deduplicate_orders(df: pd.DataFrame) -> pd.DataFrame:
    """
    Deduplicación inteligente de pedidos.
    Se queda con el registro más reciente por order_id.
    """
    before = len(df)

    # Ordenar por fecha de ingestión descendente
    df = df.sort_values('_ingested_at', ascending=False)

    # Quedarse con el primer registro (más reciente) de cada order_id
    df = df.drop_duplicates(subset=['order_id'], keep='first')

    after = len(df)
    removed = before - after

    if removed > 0:
        pct = (removed / before) * 100
        print(f"[QUALITY] Deduplicación: {removed} duplicados eliminados ({pct:.1f}%)")

        # Si hay demasiados duplicados, algo raro pasa
        if pct > 10:
            print(
                f"[WARNING] Tasa de duplicados inusualmente alta ({pct:.1f}%). "
                f"Revisar la fuente."
            )

    return df.reset_index(drop=True)

Usant Pandera per a validacions declaratives

Si les validacions manuals se’t queden curtes o vols alguna cosa més estructurada, Pandera és una llibreria que permet definir schemas de validació de forma declarativa. No és enterprise, no requereix infraestructura, i s’integra directament amb pandas.

import pandera as pa
from pandera import Column, Check, DataFrameSchema

order_schema = DataFrameSchema({
    "order_id": Column(
        str,
        Check.str_length(min_value=1),
        nullable=False,
        unique=True
    ),
    "customer_id": Column(
        str,
        nullable=False
    ),
    "total_amount": Column(
        float,
        Check.in_range(min_value=0, max_value=50000),
        nullable=False
    ),
    "status": Column(
        str,
        Check.isin(['pending', 'completed', 'shipped', 'cancelled']),
        nullable=False
    ),
    "order_created_at": Column(
        "datetime64[ns]",
        nullable=False
    ),
})

# Validar un DataFrame
try:
    order_schema.validate(df, lazy=True)
    print("[QUALITY] Schema Pandera validado correctamente.")
except pa.errors.SchemaErrors as err:
    print(f"[QUALITY] Errores de validación:\n{err.failure_cases}")

El que m’agrada de Pandera és que l’schema serveix com a documentació viva. Qualsevol persona que miri el codi sap exactament què esperar d’aquelles dades.


Alertes: de res serveix validar si ningú se n’assabenta

La validació més perfecta del món és inútil si falla i ningú ho sap. El pipeline s’atura, les dades no arriben, i tres dies després algú pregunta per què el dashboard és buit.

Principi bàsic d’alertes en pipelines

No alertis de tot. Alerta del que requereix acció. Si envies 50 alertes diàries, la gent les ignora. Si n’envies una quan de veritat hi ha un problema, la gent actua.

Implementació senzilla amb Slack

import requests
import os
from datetime import datetime

def send_alert(message: str, severity: str = "warning"):
    """Envía una alerta a Slack cuando una validación falla."""
    webhook_url = os.getenv("SLACK_WEBHOOK_URL")

    if not webhook_url:
        print(f"[ALERT] {severity.upper()}: {message}")
        return

    emoji = "🔴" if severity == "critical" else "🟡"
    payload = {
        "text": (
            f"{emoji} *Pipeline Alert [{severity.upper()}]*\n"
            f"_{datetime.now().strftime('%Y-%m-%d %H:%M')}_\n\n"
            f"{message}"
        )
    }

    try:
        requests.post(webhook_url, json=payload, timeout=10)
    except requests.RequestException as e:
        print(f"[ALERT] Error enviando alerta: {e}")

Integrar alertes al validador

def validate_orders_with_alerts(df: pd.DataFrame):
    """Validaciones con alertas automáticas."""
    try:
        validate_not_empty(df, source_name="orders")
    except ValueError as e:
        send_alert(str(e), severity="critical")
        raise

    try:
        validate_no_nulls(df, required_columns=[
            'order_id', 'customer_id', 'total_amount'
        ])
    except ValueError as e:
        send_alert(str(e), severity="warning")
        # Decidir si continuar o parar
        raise

    try:
        validate_no_duplicates(df, key_columns=['order_id'])
    except ValueError as e:
        send_alert(str(e), severity="warning")
        # Los duplicados se pueden limpiar, no paramos
        print("[QUALITY] Continuando después de limpiar duplicados...")

    print("[QUALITY] Validación completa.")

Quan escalar: de scripts a eines

Les validacions manuals amb Python són suficients per a la majoria de pipelines petits. Però hi ha un punt on necessites alguna cosa més:

SituacióEl que faig servir
Un pipeline, un desenvolupadorFuncions Python + asserts
Diversos pipelines, mateix equipPandera + alertes centralitzades
Múltiples equips consumint dadesGreat Expectations o Soda
Data contracts entre equipsSchemas versionats + CI/CD

El meu consell: comença amb el mínim. Un assert len(df) > 0 és infinitament millor que res. Pots evolucionar cap a Pandera o Great Expectations quan el dolor ho justifiqui, no abans.


El que he après a base d’errors

Cada cop que un pipeline m’ha donat problemes seriosos, la causa arrel ha estat la mateixa: dades dolentes que van entrar sense control. No un bug al codi, no una fallada d’infraestructura. Dades que no haurien d’haver passat.

Les validacions que he descrit aquí no són sofisticades. Són les que posaries a qualsevol API per no acceptar escombraries: camps obligatoris, tipus correctes, rangs raonables. La diferència és que en pipelines de dades molta gent s’oblida de posar-les, perquè el pipeline “funciona” encara que les dades siguin incorrectes.

La pregunta no és si les teves dades tindran problemes de qualitat. És quant trigaràs a assabentar-te’n.

Posa-li validacions al teu pipeline. Les més simples. Avui. El teu jo del futur t’ho agrairà quan algú pregunti per què les mètriques no quadren i puguis respondre en cinc minuts en lloc de dos dies.

OshyTech

Enginyeria backend i de dades orientada a sistemes escalables, automatització i IA.

Navegació

Copyright 2026 OshyTech. Tots els drets reservats