Worker pools en Go: procesar tareas concurrentes de forma controlada

Cómo construir un worker pool en Go con channels, context, control de errores y cierre ordenado. Concurrencia con límites.

Cover for Worker pools en Go: procesar tareas concurrentes de forma controlada

Lanzar 10.000 goroutines es fácil. Controlarlas es el problema real de ingeniería.

Go te da goroutines baratas y channels para comunicarlas. Pero “barato” no significa “gratis”, y “fácil de crear” no equivale a “seguro de operar”. En producción, un go func() descontrolado por cada petición entrante es una forma elegante de tirar abajo tu propio servicio. Sin límite de concurrencia, sin control de errores, sin manera limpia de parar: eso no es arquitectura, es una bomba de relojería.

Un worker pool resuelve exactamente esto. Define un número fijo de goroutines que consumen tareas de un channel, las procesan y devuelven resultados. Nada más. Nada menos. Es el patrón más pragmático que existe para concurrencia controlada en Go.


Por qué necesitas worker pools

Si tu servicio recibe 500 peticiones por segundo y por cada una lanzas una goroutine que hace una llamada HTTP externa, vas a tener 500 conexiones simultáneas abiertas. Si el servicio externo tarda más de lo habitual, se acumulan. 1.000. 5.000. 20.000. Tu servicio consume toda la memoria disponible, el kernel empieza a rechazar conexiones, y el on-call recibe una alerta a las 3 de la madrugada.

Un worker pool limita la concurrencia a un número controlado. Si tienes 20 workers, nunca habrá más de 20 tareas ejecutándose a la vez. Las demás esperan en el channel. Es así de simple.

Los casos de uso más habituales:

  • Rate limiting hacia servicios externos: APIs con límites de peticiones por segundo.
  • Control de recursos: conexiones a base de datos, descriptores de fichero, memoria.
  • Procesamiento batch: miles de registros que hay que procesar en paralelo pero sin ahogar al sistema.
  • Pipelines de datos: leer de Kafka, procesar y escribir el resultado, todo con backpressure natural.

El channel actúa como cola de trabajo con backpressure incorporado. Si los workers no dan abasto, el channel se llena y el productor se bloquea hasta que hay hueco. No necesitas implementar nada extra: la mecánica de channels ya te lo da.


El patrón básico: jobs, workers, results

La estructura fundamental de un worker pool en Go tiene tres componentes:

  1. Un channel de jobs por donde entran las tareas.
  2. N goroutines (workers) que leen del channel de jobs, procesan la tarea y envían el resultado.
  3. Un channel de results por donde salen los resultados.
package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID    int
    Input string
}

type Result struct {
    JobID  int
    Output string
    Err    error
}

func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        // Simular trabajo
        time.Sleep(100 * time.Millisecond)
        results <- Result{
            JobID:  job.ID,
            Output: fmt.Sprintf("worker %d procesó: %s", id, job.Input),
        }
    }
}

func main() {
    const numWorkers = 5
    const numJobs = 20

    jobs := make(chan Job, numJobs)
    results := make(chan Result, numJobs)

    var wg sync.WaitGroup

    // Lanzar workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }

    // Enviar jobs
    for j := 1; j <= numJobs; j++ {
        jobs <- Job{ID: j, Input: fmt.Sprintf("tarea-%d", j)}
    }
    close(jobs)

    // Esperar a que terminen y cerrar results
    go func() {
        wg.Wait()
        close(results)
    }()

    // Recoger resultados
    for r := range results {
        if r.Err != nil {
            fmt.Printf("Job %d falló: %v\n", r.JobID, r.Err)
        } else {
            fmt.Printf("Job %d: %s\n", r.JobID, r.Output)
        }
    }
}

Fíjate en los tipos de los channels en la firma de worker: jobs <-chan Job es de solo lectura y results chan<- Result es de solo escritura. Esto no es decoración. El compilador te impide hacer operaciones incorrectas sobre el channel. Si un worker intenta cerrar jobs, no compila. Si intenta leer de results, tampoco. Usarlo así es una buena práctica que evita bugs sutiles.

El sync.WaitGroup coordina el cierre. Cuando todos los workers terminan (porque jobs se cerró y vaciaron el channel), el wg.Wait() se desbloquea y cierra results. Entonces el range results en main termina de forma natural.


Implementación paso a paso

Vamos a construir un worker pool más robusto, paso a paso. Partimos de algo real: un servicio que necesita validar URLs en paralelo.

Paso 1: definir los tipos

type URLJob struct {
    ID  int
    URL string
}

type URLResult struct {
    JobID      int
    URL        string
    StatusCode int
    Duration   time.Duration
    Err        error
}

Siempre incluye un campo Err en el resultado. Los workers van a fallar, y el resultado debe transportar ese error de vuelta al consumidor sin romper el flujo.

Paso 2: el worker

func urlWorker(id int, jobs <-chan URLJob, results chan<- URLResult, wg *sync.WaitGroup) {
    defer wg.Done()
    client := &http.Client{Timeout: 5 * time.Second}

    for job := range jobs {
        start := time.Now()
        resp, err := client.Get(job.URL)
        duration := time.Since(start)

        result := URLResult{
            JobID:    job.ID,
            URL:      job.URL,
            Duration: duration,
        }

        if err != nil {
            result.Err = err
        } else {
            result.StatusCode = resp.StatusCode
            resp.Body.Close()
        }

        results <- result
    }
}

Cada worker tiene su propio http.Client con timeout. No comparten cliente porque no es necesario: http.Client es seguro para uso concurrente, pero cada worker teniendo el suyo simplifica el razonamiento. El resp.Body.Close() dentro del else es obligatorio: si no cierras el body, estás filtrando conexiones TCP.

Paso 3: orquestar

func validateURLs(urls []string, concurrency int) []URLResult {
    jobs := make(chan URLJob, len(urls))
    results := make(chan URLResult, len(urls))
    var wg sync.WaitGroup

    // Lanzar workers
    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go urlWorker(i, jobs, results, &wg)
    }

    // Enviar jobs
    for i, url := range urls {
        jobs <- URLJob{ID: i, URL: url}
    }
    close(jobs)

    // Cerrar results cuando todos los workers terminen
    go func() {
        wg.Wait()
        close(results)
    }()

    // Recoger resultados
    var allResults []URLResult
    for r := range results {
        allResults = append(allResults, r)
    }

    return allResults
}

El buffer del channel de jobs tiene tamaño len(urls) porque enviamos todas las tareas de golpe. En un escenario real donde los jobs llegan de forma continua (un stream de Kafka, un endpoint HTTP), usarías un buffer más pequeño para que el backpressure funcione correctamente.


Cancelación con context

El ejemplo anterior funciona, pero no se puede cancelar. Si necesitas parar el pool porque el usuario canceló la petición, porque excediste un timeout global, o porque recibiste una señal del sistema operativo, necesitas context.

func urlWorkerWithCtx(
    ctx context.Context,
    id int,
    jobs <-chan URLJob,
    results chan<- URLResult,
    wg *sync.WaitGroup,
) {
    defer wg.Done()
    client := &http.Client{Timeout: 5 * time.Second}

    for job := range jobs {
        // Comprobar cancelación antes de procesar
        select {
        case <-ctx.Done():
            return
        default:
        }

        start := time.Now()
        req, err := http.NewRequestWithContext(ctx, http.MethodGet, job.URL, nil)
        if err != nil {
            results <- URLResult{JobID: job.ID, URL: job.URL, Err: err}
            continue
        }

        resp, err := client.Do(req)
        duration := time.Since(start)

        result := URLResult{
            JobID:    job.ID,
            URL:      job.URL,
            Duration: duration,
        }

        if err != nil {
            result.Err = err
        } else {
            result.StatusCode = resp.StatusCode
            resp.Body.Close()
        }

        results <- result
    }
}

Hay dos niveles de cancelación aquí:

  1. El select al inicio del loop: si el context se canceló entre job y job, el worker sale inmediatamente sin procesar más tareas.
  2. http.NewRequestWithContext: si el context se cancela durante la petición HTTP, esta se aborta inmediatamente.

Para el orquestador, el cambio es mínimo:

func validateURLsWithTimeout(urls []string, concurrency int, timeout time.Duration) []URLResult {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    jobs := make(chan URLJob, len(urls))
    results := make(chan URLResult, len(urls))
    var wg sync.WaitGroup

    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go urlWorkerWithCtx(ctx, i, jobs, results, &wg)
    }

    // Enviar jobs respetando cancelación
    go func() {
        defer close(jobs)
        for i, url := range urls {
            select {
            case jobs <- URLJob{ID: i, URL: url}:
            case <-ctx.Done():
                return
            }
        }
    }()

    go func() {
        wg.Wait()
        close(results)
    }()

    var allResults []URLResult
    for r := range results {
        allResults = append(allResults, r)
    }

    return allResults
}

El envío de jobs ahora también usa select con el context. Si el timeout se cumple a mitad del envío, dejamos de enviar. Sin esto, el productor podría quedarse bloqueado intentando escribir en un channel que nadie lee (porque los workers ya salieron).


Manejo de errores en workers

Hay varias estrategias para errores en workers. La que elijas depende de tu caso de uso.

Estrategia 1: error por resultado (la más común)

Cada resultado lleva su propio error. El consumidor decide qué hacer con cada fallo individualmente.

for r := range results {
    if r.Err != nil {
        log.Printf("Job %d falló: %v", r.JobID, r.Err)
        continue
    }
    procesarResultado(r)
}

Es la más flexible. Si 3 de 100 jobs fallan, procesas los 97 que fueron bien y logueas los 3 que fallaron. Ideal para procesamiento batch donde los fallos individuales no invalidan el lote completo.

Estrategia 2: fallar rápido (cancel on first error)

Si un solo error invalida toda la operación, cancela el context al primer fallo:

func validateURLsFailFast(urls []string, concurrency int) ([]URLResult, error) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    jobs := make(chan URLJob, len(urls))
    results := make(chan URLResult, len(urls))
    var wg sync.WaitGroup

    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go urlWorkerWithCtx(ctx, i, jobs, results, &wg)
    }

    go func() {
        defer close(jobs)
        for i, url := range urls {
            select {
            case jobs <- URLJob{ID: i, URL: url}:
            case <-ctx.Done():
                return
            }
        }
    }()

    go func() {
        wg.Wait()
        close(results)
    }()

    var allResults []URLResult
    for r := range results {
        if r.Err != nil {
            cancel() // Cancela todos los workers
            return allResults, fmt.Errorf("job %d falló: %w", r.JobID, r.Err)
        }
        allResults = append(allResults, r)
    }

    return allResults, nil
}

Al llamar a cancel(), los workers que están procesando abortan su petición HTTP actual (gracias a NewRequestWithContext), y los que están esperando jobs salen al comprobar ctx.Done().

Estrategia 3: acumular errores

Recoger todos los errores y devolverlos agrupados:

var errs []error
for r := range results {
    if r.Err != nil {
        errs = append(errs, fmt.Errorf("job %d: %w", r.JobID, r.Err))
    }
    allResults = append(allResults, r)
}

if len(errs) > 0 {
    return allResults, errors.Join(errs...)
}

errors.Join (Go 1.20+) combina múltiples errores en uno solo que se puede inspeccionar con errors.Is y errors.As. Útil cuando necesitas reportar todos los fallos al caller pero no quieres parar a la primera.


Cierre ordenado: cerrar channels correctamente

El cierre mal gestionado es la fuente número uno de deadlocks y panics en worker pools. Las reglas son simples pero inflexibles:

  1. Solo el productor cierra el channel de jobs. Los workers nunca cierran jobs.
  2. Solo la goroutine que sabe que todos los workers terminaron cierra el channel de results. Esto suele ser una goroutine que hace wg.Wait().
  3. Nunca envíes a un channel cerrado. Provoca panic.
  4. Nunca cierres un channel más de una vez. También provoca panic.

El patrón correcto que hemos visto:

// 1. Productor envía y cierra jobs
go func() {
    defer close(jobs) // Se cierra cuando el productor termina
    for _, item := range items {
        select {
        case jobs <- item:
        case <-ctx.Done():
            return
        }
    }
}()

// 2. Goroutine dedicada espera y cierra results
go func() {
    wg.Wait()        // Espera a todos los workers
    close(results)   // Solo entonces cierra results
}()

// 3. Main recoge resultados
for r := range results {
    // ...
}

¿Qué pasa si no sigues este orden? Un escenario típico de deadlock:

// MAL: deadlock si results no tiene buffer suficiente
for j := range jobList {
    jobs <- j
}
close(jobs)

// Esto nunca se ejecuta si los workers llenaron results
// y están bloqueados esperando a que alguien lea
wg.Wait()
close(results)

El problema: el productor envía todos los jobs de forma síncrona. Si el channel de results se llena y nadie lo está leyendo todavía, los workers se bloquean al intentar enviar el resultado. Y como el productor no puede avanzar hasta enviar todo, y nadie lee results hasta que el productor cierra jobs… deadlock.

La solución es siempre enviar los jobs en una goroutine separada, como en los ejemplos anteriores.


Escalado dinámico de workers

A veces un número fijo de workers no es suficiente. Puede que quieras escalar según la carga: más workers cuando hay muchos jobs pendientes, menos cuando la cola está vacía.

type Pool struct {
    jobs       chan Job
    results    chan Result
    maxWorkers int
    ctx        context.Context
    cancel     context.CancelFunc
    wg         sync.WaitGroup
}

func NewPool(maxWorkers, jobBuffer int) *Pool {
    ctx, cancel := context.WithCancel(context.Background())
    return &Pool{
        jobs:       make(chan Job, jobBuffer),
        results:    make(chan Result, jobBuffer),
        maxWorkers: maxWorkers,
        ctx:        ctx,
        cancel:     cancel,
    }
}

func (p *Pool) Start(numWorkers int) {
    for i := 0; i < numWorkers; i++ {
        p.addWorker()
    }
}

func (p *Pool) addWorker() {
    p.wg.Add(1)
    go func() {
        defer p.wg.Done()
        for {
            select {
            case <-p.ctx.Done():
                return
            case job, ok := <-p.jobs:
                if !ok {
                    return
                }
                result := process(job)
                select {
                case p.results <- result:
                case <-p.ctx.Done():
                    return
                }
            }
        }
    }()
}

func (p *Pool) ScaleUp(n int) {
    for i := 0; i < n; i++ {
        p.addWorker()
    }
}

func (p *Pool) Submit(job Job) {
    select {
    case p.jobs <- job:
    case <-p.ctx.Done():
    }
}

func (p *Pool) Shutdown() {
    close(p.jobs)
    p.wg.Wait()
    close(p.results)
}

func (p *Pool) Stop() {
    p.cancel()
    p.wg.Wait()
    close(p.results)
}

Shutdown es un cierre ordenado: deja que los workers terminen los jobs que ya tienen. Stop es un cierre inmediato: cancela el context y los workers salen en cuanto puedan.

Escalar hacia abajo es más complicado. No puedes matar una goroutine desde fuera en Go: tiene que decidir salir ella misma. Una opción es usar un channel de “quit” individual por worker, o simplemente aceptar que reducir workers significa no lanzar nuevos cuando los actuales terminen sus tareas.

En la práctica, el escalado dinámico rara vez es necesario. Un pool con un número fijo de workers bien dimensionado cubre el 90% de los casos. Solo lo necesitas si la carga es muy variable y el coste de tener workers ociosos es significativo (por ejemplo, si cada worker mantiene una conexión costosa abierta).


Patrón semáforo como alternativa

Si lo único que necesitas es limitar el número de goroutines simultáneas y no necesitas channels de results ni la estructura completa de un worker pool, el paquete golang.org/x/sync/semaphore ofrece una solución más ligera:

import "golang.org/x/sync/semaphore"

func processAll(ctx context.Context, items []string, maxConcurrency int64) error {
    sem := semaphore.NewWeighted(maxConcurrency)
    var wg sync.WaitGroup
    var mu sync.Mutex
    var firstErr error

    for _, item := range items {
        if err := sem.Acquire(ctx, 1); err != nil {
            return err // Context cancelado
        }

        wg.Add(1)
        go func(item string) {
            defer wg.Done()
            defer sem.Release(1)

            if err := processItem(ctx, item); err != nil {
                mu.Lock()
                if firstErr == nil {
                    firstErr = err
                }
                mu.Unlock()
            }
        }(item)
    }

    wg.Wait()
    return firstErr
}

semaphore.Acquire bloquea cuando ya hay maxConcurrency goroutines ejecutándose. Es como un worker pool implícito: en lugar de pre-crear N workers que leen de un channel, creas goroutines bajo demanda pero nunca más de N a la vez.

La ventaja: menos boilerplate. La desventaja: pierdes la estructura clara de jobs/results. Para tareas fire-and-forget o cuando solo te importa si hubo error (no recoger resultados individuales), el semáforo es suficiente.

También puedes simular un semáforo con un channel sin necesidad de dependencias externas:

func processAllWithChan(ctx context.Context, items []string, maxConcurrency int) error {
    sem := make(chan struct{}, maxConcurrency)
    var wg sync.WaitGroup
    errCh := make(chan error, 1)

    for _, item := range items {
        select {
        case sem <- struct{}{}:
        case <-ctx.Done():
            break
        }

        wg.Add(1)
        go func(item string) {
            defer wg.Done()
            defer func() { <-sem }()

            if err := processItem(ctx, item); err != nil {
                select {
                case errCh <- err:
                default:
                }
            }
        }(item)
    }

    wg.Wait()

    select {
    case err := <-errCh:
        return err
    default:
        return nil
    }
}

Un channel de struct{} con buffer N funciona como semáforo. Enviar al channel “adquiere” un slot, recibir lo “libera”. Cero dependencias extra.


errgroup: la opción simple para muchos casos

Si estás pensando “esto del worker pool tiene mucho boilerplate para lo que necesito”, probablemente tengas razón. Para muchos escenarios, golang.org/x/sync/errgroup hace exactamente lo que necesitas con una fracción del código:

import "golang.org/x/sync/errgroup"

func processAllSimple(ctx context.Context, items []string) error {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(10) // Máximo 10 goroutines simultáneas

    for _, item := range items {
        item := item // Go < 1.22
        g.Go(func() error {
            return processItem(ctx, item)
        })
    }

    return g.Wait()
}

errgroup.SetLimit(10) hace que g.Go() bloquee cuando ya hay 10 goroutines ejecutándose. Es un worker pool con semáforo integrado, cancelación automática al primer error, y Wait() que devuelve el primer error encontrado. Exactamente lo mismo que construimos a mano en secciones anteriores, pero en 10 líneas.

errgroup.WithContext cancela el context derivado cuando cualquier función devuelve un error. Esto significa que el resto de goroutines pueden detectar la cancelación a través de ctx.Done().

Cuándo errgroup es suficiente

  • Procesar una lista finita de tareas en paralelo con límite de concurrencia.
  • Cuando solo necesitas saber si hubo error, no recoger resultados individuales.
  • Fan-out simple: lanzar N operaciones paralelas y esperar a que todas terminen.

Cuándo necesitas un worker pool real

  • Los jobs llegan de forma continua (no una lista finita).
  • Necesitas un channel de resultados para procesar outputs individualmente.
  • Necesitas escalado dinámico de workers.
  • Necesitas lógica de retry por worker.
  • Necesitas métricas por worker (jobs procesados, latencia, errores).

Ejemplo real: procesar webhooks de una API en paralelo

Vamos a juntar todo en un ejemplo más cercano a producción. Un servicio que recibe webhooks, los encola y los procesa con un pool de workers. Cada webhook requiere hacer una llamada a una API externa y persistir el resultado.

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "sync"
    "time"
)

type Webhook struct {
    ID        string          `json:"id"`
    EventType string          `json:"event_type"`
    Payload   json.RawMessage `json:"payload"`
    Received  time.Time
}

type ProcessResult struct {
    WebhookID string
    Success   bool
    Duration  time.Duration
    Err       error
}

type WebhookProcessor struct {
    jobs       chan Webhook
    results    chan ProcessResult
    numWorkers int
    client     *http.Client
    ctx        context.Context
    cancel     context.CancelFunc
    wg         sync.WaitGroup
}

func NewWebhookProcessor(numWorkers, queueSize int) *WebhookProcessor {
    ctx, cancel := context.WithCancel(context.Background())
    return &WebhookProcessor{
        jobs:       make(chan Webhook, queueSize),
        results:    make(chan ProcessResult, queueSize),
        numWorkers: numWorkers,
        client:     &http.Client{Timeout: 10 * time.Second},
        ctx:        ctx,
        cancel:     cancel,
    }
}

func (wp *WebhookProcessor) Start() {
    // Lanzar workers
    for i := 0; i < wp.numWorkers; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }

    // Goroutine para cerrar results cuando todos los workers terminan
    go func() {
        wp.wg.Wait()
        close(wp.results)
    }()

    // Goroutine para procesar resultados
    go wp.collectResults()
}

func (wp *WebhookProcessor) worker(id int) {
    defer wp.wg.Done()
    log.Printf("Worker %d iniciado", id)

    for {
        select {
        case <-wp.ctx.Done():
            log.Printf("Worker %d: context cancelado, saliendo", id)
            return
        case webhook, ok := <-wp.jobs:
            if !ok {
                log.Printf("Worker %d: channel cerrado, saliendo", id)
                return
            }
            result := wp.processWebhook(webhook)
            select {
            case wp.results <- result:
            case <-wp.ctx.Done():
                return
            }
        }
    }
}

func (wp *WebhookProcessor) processWebhook(wh Webhook) ProcessResult {
    start := time.Now()

    // Aquí iría la lógica real: llamar a una API, escribir en DB, etc.
    // Simulamos con un sleep
    time.Sleep(50 * time.Millisecond)

    // Ejemplo: si el tipo de evento no es válido, devolver error
    validEvents := map[string]bool{
        "order.created":  true,
        "order.updated":  true,
        "payment.success": true,
        "payment.failed":  true,
    }

    if !validEvents[wh.EventType] {
        return ProcessResult{
            WebhookID: wh.ID,
            Success:   false,
            Duration:  time.Since(start),
            Err:       fmt.Errorf("evento desconocido: %s", wh.EventType),
        }
    }

    return ProcessResult{
        WebhookID: wh.ID,
        Success:   true,
        Duration:  time.Since(start),
    }
}

func (wp *WebhookProcessor) collectResults() {
    var processed, failed int
    for r := range wp.results {
        if r.Err != nil {
            failed++
            log.Printf("Webhook %s falló (%v): %v", r.WebhookID, r.Duration, r.Err)
        } else {
            processed++
            log.Printf("Webhook %s procesado (%v)", r.WebhookID, r.Duration)
        }
    }
    log.Printf("Resumen: %d procesados, %d fallidos", processed, failed)
}

func (wp *WebhookProcessor) Enqueue(wh Webhook) error {
    select {
    case wp.jobs <- wh:
        return nil
    case <-wp.ctx.Done():
        return fmt.Errorf("procesador detenido")
    default:
        return fmt.Errorf("cola llena, descartando webhook %s", wh.ID)
    }
}

func (wp *WebhookProcessor) Shutdown(timeout time.Duration) {
    log.Println("Iniciando cierre ordenado...")

    // 1. Dejar de aceptar nuevos jobs
    close(wp.jobs)

    // 2. Esperar a que los workers terminen con timeout
    done := make(chan struct{})
    go func() {
        wp.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        log.Println("Todos los workers terminaron limpiamente")
    case <-time.After(timeout):
        log.Println("Timeout alcanzado, forzando cierre")
        wp.cancel()
        <-done // Esperar a que los workers salgan tras la cancelación
    }
}

Puntos clave de este ejemplo:

  • Enqueue con default: si la cola está llena, rechaza el webhook inmediatamente en lugar de bloquear. En un handler HTTP, bloquear significaría que la petición se queda colgada.
  • Shutdown con timeout: cierra jobs y espera un tiempo razonable. Si los workers no terminan, cancela el context y fuerza la salida.
  • collectResults como goroutine separada: desacopla el procesamiento de resultados de los workers. Podrías sustituir los log.Printf por métricas, alertas, o escritura a base de datos.
  • El worker usa select doble: tanto para leer de jobs como para enviar a results, siempre con ctx.Done() como alternativa. Si no pones el select al enviar a results, un worker podría quedarse bloqueado para siempre si nadie lee resultados y el context se cancela.

Para usarlo en un servicio HTTP real:

func main() {
    processor := NewWebhookProcessor(10, 100) // 10 workers, cola de 100
    processor.Start()

    http.HandleFunc("/webhook", func(w http.ResponseWriter, r *http.Request) {
        var wh Webhook
        if err := json.NewDecoder(r.Body).Decode(&wh); err != nil {
            http.Error(w, "payload inválido", http.StatusBadRequest)
            return
        }
        wh.Received = time.Now()

        if err := processor.Enqueue(wh); err != nil {
            http.Error(w, err.Error(), http.StatusServiceUnavailable)
            return
        }

        w.WriteHeader(http.StatusAccepted)
    })

    // Escuchar señales del sistema para cierre ordenado
    // (en producción usarías signal.NotifyContext)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

Cuándo los worker pools son excesivos

No todo necesita un worker pool. A veces la complejidad no se justifica.

No lo necesitas si:

  • Solo tienes que hacer 3-5 operaciones en paralelo. Un errgroup o incluso goroutines sueltas con un WaitGroup son suficientes.
  • El procesamiento es tan rápido que la concurrencia no aporta nada. Si cada tarea tarda 1ms, probablemente vayas más rápido en secuencial (el overhead de channels y goroutines no es cero).
  • Solo tienes un productor y un consumidor. Un channel directo entre ambos ya es un pipeline perfectamente válido.

Lo necesitas si:

  • La carga es alta y continua, y necesitas controlar cuántos recursos consume.
  • Necesitas backpressure: que el productor se ralentice cuando los workers no dan abasto.
  • Necesitas métricas y observabilidad por worker.
  • La tarea implica llamadas de red, I/O de disco o acceso a recursos compartidos que se degradan bajo concurrencia excesiva.

La decisión es pragmática. Si un errgroup.SetLimit(10) resuelve tu problema, úsalo. Si necesitas cierre ordenado, escalado, métricas y reintentos, construye el pool. Lo que no tiene sentido es lanzar goroutines sin control y esperar que todo salga bien.

La concurrencia en Go es una herramienta poderosa, pero el poder sin control no es ingeniería. Es suerte.

OshyTech

Ingeniería backend y de datos orientada a sistemas escalables, automatización e IA.

Navegación

Copyright 2026 OshyTech. Todos los derechos reservados