Worker pools in Go: processing concurrent tasks in a controlled way

How to build a worker pool in Go with channels, context, error handling and graceful shutdown. Concurrency with limits.

Launching 10,000 goroutines is easy. Controlling them is the real engineering problem.

Go gives you cheap goroutines and channels to communicate between them. But “cheap” doesn’t mean “free”, and “easy to create” doesn’t equal “safe to operate”. In production, an uncontrolled go func() for every incoming request is an elegant way to bring down your own service. No concurrency limit, no error handling, no clean way to stop: that’s not architecture, it’s a time bomb.

A worker pool solves exactly this. It defines a fixed number of goroutines that consume tasks from a channel, process them and return results. Nothing more. Nothing less. It’s the most pragmatic pattern for controlled concurrency in Go.


Why you need worker pools

If your service receives 500 requests per second and for each one you launch a goroutine that makes an external HTTP call taking about a second, you'll have roughly 500 connections open at any given moment (in-flight requests ≈ arrival rate × latency). If the external service slows down, they pile up. 1,000. 5,000. 20,000. Your service consumes all available memory, the kernel starts rejecting connections, and the on-call engineer gets an alert at 3am.

A worker pool limits concurrency to a controlled number. If you have 20 workers, there will never be more than 20 tasks running at once. The rest wait in the channel. It’s that simple.

The most common use cases:

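  • Throttling calls to external services: APIs with concurrency or per-second request limits (a pool caps concurrency; a strict per-second quota still needs a rate limiter on top).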
  • Resource control: database connections, file descriptors, memory.
  • Batch processing: thousands of records that need to be processed in parallel without overwhelming the system.
  • Data pipelines: reading from Kafka, processing and writing the result, all with natural backpressure.

The channel acts as a work queue with built-in backpressure. If workers can’t keep up, the channel fills up and the producer blocks until there’s room. You don’t need to implement anything extra: the channel mechanics already give you that.
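A minimal sketch of that blocking point, assuming a buffered channel with room for two jobs and no workers reading from it. The hypothetical `trySend` helper uses a non-blocking `select` only to observe when a plain send would have blocked:

```go
package main

import "fmt"

// trySend attempts a non-blocking send. It returns false when the buffer is
// full, which is exactly the point where a plain `ch <- v` would block.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// fillQueue sends n values into a queue of the given capacity that no worker
// drains, and reports which sends would have gone through without blocking.
func fillQueue(capacity, n int) []bool {
	jobs := make(chan int, capacity) // nobody reads: workers that can't keep up
	accepted := make([]bool, n)
	for i := 0; i < n; i++ {
		accepted[i] = trySend(jobs, i)
	}
	return accepted
}

func main() {
	fmt.Println(fillQueue(2, 4)) // [true true false false]
}
```

With a plain `jobs <- v` instead of `trySend`, the third send would simply wait until a worker received a value; that wait is the backpressure.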


The basic pattern: jobs, workers, results

The fundamental structure of a worker pool in Go has three components:

  1. A jobs channel where tasks come in.
  2. N goroutines (workers) that read from the jobs channel, process the task and send the result.
  3. A results channel where results come out.
package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID    int
    Input string
}

type Result struct {
    JobID  int
    Output string
    Err    error
}

func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        // Simulate work
        time.Sleep(100 * time.Millisecond)
        results <- Result{
            JobID:  job.ID,
            Output: fmt.Sprintf("worker %d processed: %s", id, job.Input),
        }
    }
}

func main() {
    const numWorkers = 5
    const numJobs = 20

    jobs := make(chan Job, numJobs)
    results := make(chan Result, numJobs)

    var wg sync.WaitGroup

    // Launch workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go worker(i, jobs, results, &wg)
    }

    // Send jobs
    for j := 1; j <= numJobs; j++ {
        jobs <- Job{ID: j, Input: fmt.Sprintf("task-%d", j)}
    }
    close(jobs)

    // Wait for them to finish and close results
    go func() {
        wg.Wait()
        close(results)
    }()

    // Collect results
    for r := range results {
        if r.Err != nil {
            fmt.Printf("Job %d failed: %v\n", r.JobID, r.Err)
        } else {
            fmt.Printf("Job %d: %s\n", r.JobID, r.Output)
        }
    }
}

Notice the channel types in the worker signature: jobs <-chan Job is read-only and results chan<- Result is write-only. This isn’t decoration. The compiler prevents you from performing incorrect operations on the channel. If a worker tries to close jobs, it won’t compile. If it tries to read from results, it won’t either. Using them this way is a good practice that avoids subtle bugs.

The sync.WaitGroup coordinates the shutdown. When all workers finish (because jobs was closed and they drained the channel), wg.Wait() unblocks and closes results. Then the range results in main terminates naturally.


Step-by-step implementation

Let’s build a more robust worker pool, step by step. We start from something real: a service that needs to validate URLs in parallel.

Step 1: define the types

type URLJob struct {
    ID  int
    URL string
}

type URLResult struct {
    JobID      int
    URL        string
    StatusCode int
    Duration   time.Duration
    Err        error
}

Always include an Err field in the result. Workers are going to fail, and the result must carry that error back to the consumer without breaking the flow.

Step 2: the worker

func urlWorker(id int, jobs <-chan URLJob, results chan<- URLResult, wg *sync.WaitGroup) {
    defer wg.Done()
    client := &http.Client{Timeout: 5 * time.Second}

    for job := range jobs {
        start := time.Now()
        resp, err := client.Get(job.URL)
        duration := time.Since(start)

        result := URLResult{
            JobID:    job.ID,
            URL:      job.URL,
            Duration: duration,
        }

        if err != nil {
            result.Err = err
        } else {
            result.StatusCode = resp.StatusCode
            resp.Body.Close()
        }

        results <- result
    }
}

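Each worker creates its own http.Client with a timeout. That's a stylistic choice, not a requirement: http.Client is safe for concurrent use, and since these clients don't set a Transport, they all share http.DefaultTransport and its connection pool anyway, so a single client shared by every worker works just as well. The resp.Body.Close() inside the else is mandatory: if you don't close the body, the underlying connection is never released and you leak TCP connections.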
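A sketch of the shared-client alternative, using a local `httptest` server in place of real URLs. The hypothetical `checkStatus` helper also reads the body to EOF before closing it: per the net/http documentation, a body that isn't both read to EOF and closed may prevent the transport from reusing the keep-alive connection.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"sync"
	"time"
)

// checkStatus performs a GET and returns the status code. The body is read to
// EOF and closed so the keep-alive connection can go back to the pool.
func checkStatus(client *http.Client, url string) (int, error) {
	resp, err := client.Get(url)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		return resp.StatusCode, err
	}
	return resp.StatusCode, nil
}

// runDemo checks a local test server (a stand-in for the URLs being validated)
// from several goroutines that all share one client.
func runDemo(workers int) ([]int, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusTeapot)
		fmt.Fprint(w, "body that must be drained")
	}))
	defer srv.Close()

	client := &http.Client{Timeout: 5 * time.Second} // shared: safe for concurrent use
	codes := make([]int, workers)
	errs := make([]error, workers)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			codes[i], errs[i] = checkStatus(client, srv.URL) // each goroutine writes its own slot
		}(i)
	}
	wg.Wait()
	return codes, errors.Join(errs...) // nil when every request succeeded
}

func main() {
	codes, err := runDemo(4)
	fmt.Println(codes, err) // [418 418 418 418] <nil>
}
```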

Step 3: orchestrate

func validateURLs(urls []string, concurrency int) []URLResult {
    jobs := make(chan URLJob, len(urls))
    results := make(chan URLResult, len(urls))
    var wg sync.WaitGroup

    // Launch workers
    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go urlWorker(i, jobs, results, &wg)
    }

    // Send jobs
    for i, url := range urls {
        jobs <- URLJob{ID: i, URL: url}
    }
    close(jobs)

    // Close results when all workers finish
    go func() {
        wg.Wait()
        close(results)
    }()

    // Collect results
    var allResults []URLResult
    for r := range results {
        allResults = append(allResults, r)
    }

    return allResults
}

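Both channels are buffered to len(urls), and that matters here: the jobs buffer lets the function enqueue every task without blocking, and the results buffer guarantees workers never block on send, even though nobody reads results until all jobs have been sent. In a real scenario where jobs arrive continuously (a Kafka stream, an HTTP endpoint), you'd use a smaller buffer so backpressure works correctly, and you'd send jobs from a separate goroutine so the caller stays free to read results.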


Cancellation with context

The previous example works, but it can’t be cancelled. If you need to stop the pool because the user cancelled the request, because you exceeded a global timeout, or because you received an OS signal, you need context.

func urlWorkerWithCtx(
    ctx context.Context,
    id int,
    jobs <-chan URLJob,
    results chan<- URLResult,
    wg *sync.WaitGroup,
) {
    defer wg.Done()
    client := &http.Client{Timeout: 5 * time.Second}

    for job := range jobs {
        // Check cancellation before processing
        select {
        case <-ctx.Done():
            return
        default:
        }

        start := time.Now()
        req, err := http.NewRequestWithContext(ctx, http.MethodGet, job.URL, nil)
        if err != nil {
            results <- URLResult{JobID: job.ID, URL: job.URL, Err: err}
            continue
        }

        resp, err := client.Do(req)
        duration := time.Since(start)

        result := URLResult{
            JobID:    job.ID,
            URL:      job.URL,
            Duration: duration,
        }

        if err != nil {
            result.Err = err
        } else {
            result.StatusCode = resp.StatusCode
            resp.Body.Close()
        }

        results <- result
    }
}

There are two levels of cancellation here:

  1. The select at the start of the loop: if the context was cancelled between jobs, the worker exits immediately without processing more tasks.
  2. http.NewRequestWithContext: if the context is cancelled during the HTTP request, it’s aborted immediately.

For the orchestrator, the change is minimal:

func validateURLsWithTimeout(urls []string, concurrency int, timeout time.Duration) []URLResult {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    jobs := make(chan URLJob, len(urls))
    results := make(chan URLResult, len(urls))
    var wg sync.WaitGroup

    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go urlWorkerWithCtx(ctx, i, jobs, results, &wg)
    }

    // Send jobs respecting cancellation
    go func() {
        defer close(jobs)
        for i, url := range urls {
            select {
            case jobs <- URLJob{ID: i, URL: url}:
            case <-ctx.Done():
                return
            }
        }
    }()

    go func() {
        wg.Wait()
        close(results)
    }()

    var allResults []URLResult
    for r := range results {
        allResults = append(allResults, r)
    }

    return allResults
}

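Job sending now also uses select with the context. If the timeout expires halfway through sending, we stop sending. With the len(urls) buffer used here a send can't actually block, but with the smaller buffer you'd use for a continuous stream, the producer could otherwise get stuck trying to write to a channel nobody reads (because the workers already exited).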


Error handling in workers

There are several strategies for errors in workers. Which one you choose depends on your use case.

Strategy 1: error per result (the most common)

Each result carries its own error. The consumer decides what to do with each failure individually.

for r := range results {
    if r.Err != nil {
        log.Printf("Job %d failed: %v", r.JobID, r.Err)
        continue
    }
    processResult(r)
}

It’s the most flexible. If 3 of 100 jobs fail, you process the 97 that succeeded and log the 3 that failed. Ideal for batch processing where individual failures don’t invalidate the whole batch.

Strategy 2: fail fast (cancel on first error)

If a single error invalidates the entire operation, cancel the context on the first failure:

func validateURLsFailFast(urls []string, concurrency int) ([]URLResult, error) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    jobs := make(chan URLJob, len(urls))
    results := make(chan URLResult, len(urls))
    var wg sync.WaitGroup

    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go urlWorkerWithCtx(ctx, i, jobs, results, &wg)
    }

    go func() {
        defer close(jobs)
        for i, url := range urls {
            select {
            case jobs <- URLJob{ID: i, URL: url}:
            case <-ctx.Done():
                return
            }
        }
    }()

    go func() {
        wg.Wait()
        close(results)
    }()

    var allResults []URLResult
    for r := range results {
        if r.Err != nil {
            cancel() // Cancel all workers
            return allResults, fmt.Errorf("job %d failed: %w", r.JobID, r.Err)
        }
        allResults = append(allResults, r)
    }

    return allResults, nil
}

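By calling cancel(), workers that are mid-request abort their current HTTP request (thanks to NewRequestWithContext). The producer sees ctx.Done(), stops sending and closes jobs, so idle workers leave their range loop, and any worker that still receives a job returns at its initial ctx.Done() check. Because results is buffered to len(urls), workers that keep sending after we've stopped reading never block, so returning early doesn't leak goroutines.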

Strategy 3: accumulate errors

Collect all errors and return them grouped:

var errs []error
for r := range results {
    if r.Err != nil {
        errs = append(errs, fmt.Errorf("job %d: %w", r.JobID, r.Err))
    }
    allResults = append(allResults, r)
}

if len(errs) > 0 {
    return allResults, errors.Join(errs...)
}

errors.Join (Go 1.20+) combines multiple errors into a single one that can be inspected with errors.Is and errors.As. Useful when you need to report all failures to the caller but don’t want to stop at the first one.


Graceful shutdown: closing channels correctly

Poorly managed shutdown is one of the most common sources of deadlocks and panics in worker pools. The rules are simple but inflexible:

  1. Only the producer closes the jobs channel. Workers never close jobs.
  2. Only the goroutine that knows all workers have finished closes the results channel. This is usually a goroutine doing wg.Wait().
  3. Never send to a closed channel. It causes a panic.
  4. Never close a channel more than once. Also causes a panic.
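Rules 3 and 4 are easy to check for yourself. This sketch uses a hypothetical `panics` helper that runs a function and reports, via `recover`, whether it panicked; it also shows that receiving from a closed channel is not an error:

```go
package main

import "fmt"

// panics reports whether f panicked, recovering so the program keeps running.
func panics(f func()) (didPanic bool) {
	defer func() {
		if recover() != nil {
			didPanic = true
		}
	}()
	f()
	return false
}

func main() {
	ch := make(chan int, 1)
	close(ch)

	fmt.Println(panics(func() { ch <- 1 }))   // rule 3, send on closed channel: true
	fmt.Println(panics(func() { close(ch) })) // rule 4, close of closed channel: true

	// Receiving is fine: zero value and ok == false once the channel is drained
	v, ok := <-ch
	fmt.Println(v, ok) // 0 false
}
```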

The correct pattern we’ve seen:

// 1. Producer sends and closes jobs
go func() {
    defer close(jobs) // Closed when the producer finishes
    for _, item := range items {
        select {
        case jobs <- item:
        case <-ctx.Done():
            return
        }
    }
}()

// 2. Dedicated goroutine waits and closes results
go func() {
    wg.Wait()        // Wait for all workers
    close(results)   // Only then close results
}()

// 3. Main collects results
for r := range results {
    // ...
}

What happens if you don’t follow this order? A typical deadlock scenario:

// BAD: deadlock if results doesn't have enough buffer
for j := range jobList {
    jobs <- j
}
close(jobs)

// This never executes if workers filled results
// and are blocked waiting for someone to read
wg.Wait()
close(results)

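The problem: the same goroutine that should be reading results is busy sending jobs and then waiting. Once results fills up, workers block trying to send their result, stop taking jobs, the jobs buffer fills, and the producer blocks too. Even if every job fits in the buffer, wg.Wait() then waits for workers that are stuck sending to results, which nobody will read until wg.Wait() returns. Deadlock.

The solution is to keep the collecting goroutine free to read results: send jobs from a separate goroutine, and run wg.Wait() plus close(results) in another one, as in the context-aware examples above.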
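A minimal sketch of that fix, using unbuffered channels on purpose so correctness doesn't depend on any buffer size. `squareAll` is hypothetical, and squaring stands in for real work:

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll runs a pool with unbuffered jobs and results channels. It can't
// deadlock: the caller only reads results, a producer goroutine sends jobs,
// and a third goroutine waits for the workers before closing results.
func squareAll(inputs []int, numWorkers int) []int {
	jobs := make(chan int)    // unbuffered on purpose
	results := make(chan int) // unbuffered on purpose
	var wg sync.WaitGroup

	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range jobs {
				results <- n * n // placeholder work
			}
		}()
	}

	// Producer in its own goroutine: the caller never blocks on sending
	go func() {
		defer close(jobs)
		for _, n := range inputs {
			jobs <- n
		}
	}()

	// Wait-and-close in its own goroutine: the caller never blocks on wg.Wait
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []int
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	fmt.Println(len(squareAll([]int{1, 2, 3, 4, 5}, 3))) // 5
}
```

Results arrive in completion order, not input order; carry an ID in the result (as Result.JobID does above) if you need to match them back.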


Dynamic worker scaling

Sometimes a fixed number of workers isn’t enough. You might want to scale based on load: more workers when there are many pending jobs, fewer when the queue is empty.

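type Pool struct {
    jobs       chan Job
    results    chan Result
    maxWorkers int
    ctx        context.Context
    cancel     context.CancelFunc
    wg         sync.WaitGroup

    mu        sync.Mutex // Guards workers
    workers   int        // Workers launched so far (never above maxWorkers)
    closeOnce sync.Once  // Makes sure results is closed only once
}

func NewPool(maxWorkers, jobBuffer int) *Pool {
    ctx, cancel := context.WithCancel(context.Background())
    return &Pool{
        jobs:       make(chan Job, jobBuffer),
        results:    make(chan Result, jobBuffer),
        maxWorkers: maxWorkers,
        ctx:        ctx,
        cancel:     cancel,
    }
}

func (p *Pool) Start(numWorkers int) {
    p.ScaleUp(numWorkers)
}

// addWorker launches one worker, unless maxWorkers has been reached.
func (p *Pool) addWorker() bool {
    p.mu.Lock()
    defer p.mu.Unlock()
    if p.workers >= p.maxWorkers {
        return false
    }
    p.workers++
    p.wg.Add(1)
    go func() {
        defer p.wg.Done()
        for {
            select {
            case <-p.ctx.Done():
                return
            case job, ok := <-p.jobs:
                if !ok {
                    return
                }
                result := process(job) // process is your per-job function (not shown)
                select {
                case p.results <- result:
                case <-p.ctx.Done():
                    return
                }
            }
        }
    }()
    return true
}

// ScaleUp adds up to n workers without exceeding maxWorkers.
// Don't call it after Shutdown or Stop.
func (p *Pool) ScaleUp(n int) {
    for i := 0; i < n; i++ {
        if !p.addWorker() {
            return
        }
    }
}

func (p *Pool) Submit(job Job) {
    select {
    case p.jobs <- job:
    case <-p.ctx.Done():
    }
}

// Shutdown must be called once, after the last Submit: sending on the
// closed jobs channel panics. Keep reading results meanwhile, or workers
// block on a full results channel and wg.Wait never returns.
func (p *Pool) Shutdown() {
    close(p.jobs)
    p.wg.Wait()
    p.cancel() // Release the context's resources
    p.closeOnce.Do(func() { close(p.results) })
}

func (p *Pool) Stop() {
    p.cancel()
    p.wg.Wait()
    p.closeOnce.Do(func() { close(p.results) })
}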

Shutdown is a graceful shutdown: it lets workers finish the jobs they already have. Stop is an immediate shutdown: it cancels the context and workers exit as soon as they can.

Scaling down is more complex. You can't kill a goroutine from outside in Go: it has to decide to exit itself. One option is to give each worker its own “quit” channel and close it when you want that worker to stop; the other is to accept that scaling down only happens as workers exit on their own, by not launching replacements for them.

In practice, dynamic scaling is rarely necessary. A pool with a well-sized fixed number of workers covers the vast majority of cases. You only need it if the load is highly variable and the cost of having idle workers is significant (for example, if each worker holds an expensive connection open).


Semaphore pattern as an alternative

If all you need is to limit the number of simultaneous goroutines and don’t need results channels or the full worker pool structure, the golang.org/x/sync/semaphore package offers a lighter solution:

import "golang.org/x/sync/semaphore"

func processAll(ctx context.Context, items []string, maxConcurrency int64) error {
    sem := semaphore.NewWeighted(maxConcurrency)
    var wg sync.WaitGroup
    var mu sync.Mutex
    var firstErr error

    for _, item := range items {
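        if err := sem.Acquire(ctx, 1); err != nil {
            wg.Wait()  // Let goroutines already started finish before returning
            return err // Context cancelled
        }

        wg.Add(1)
        go func(item string) {
            defer wg.Done()
            defer sem.Release(1)

            // processItem is your own per-item function (not shown)
            if err := processItem(ctx, item); err != nil {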
                mu.Lock()
                if firstErr == nil {
                    firstErr = err
                }
                mu.Unlock()
            }
        }(item)
    }

    wg.Wait()
    return firstErr
}

semaphore.Acquire blocks when maxConcurrency goroutines are already running. It’s like an implicit worker pool: instead of pre-creating N workers that read from a channel, you create goroutines on demand but never more than N at once.

The advantage: less boilerplate. The disadvantage: you lose the clear jobs/results structure. For fire-and-forget tasks or when you only care about whether there was an error (not collecting individual results), the semaphore is enough.

You can also simulate a semaphore with a channel without needing external dependencies:

func processAllWithChan(ctx context.Context, items []string, maxConcurrency int) error {
    sem := make(chan struct{}, maxConcurrency)
    var wg sync.WaitGroup
    errCh := make(chan error, 1)

    for _, item := range items {
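        select {
        case sem <- struct{}{}:
        case <-ctx.Done():
            // A bare break here would only exit the select, not the loop,
            // and the goroutine below would start without holding a slot
            wg.Wait()
            return ctx.Err()
        }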

        wg.Add(1)
        go func(item string) {
            defer wg.Done()
            defer func() { <-sem }()

            if err := processItem(ctx, item); err != nil {
                select {
                case errCh <- err:
                default:
                }
            }
        }(item)
    }

    wg.Wait()

    select {
    case err := <-errCh:
        return err
    default:
        return nil
    }
}

A struct{} channel with buffer N works as a semaphore. Sending to the channel “acquires” a slot, receiving “releases” it. Zero extra dependencies.
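To see the cap in action, this sketch uses the same `chan struct{}` semaphore (with a hypothetical `maxInFlight` helper and `time.Sleep` standing in for real work) and records the highest number of tasks running at once. It uses `atomic.Int32`, so it assumes Go 1.19+:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// maxInFlight launches one goroutine per task, limited by a channel semaphore
// of size limit, and returns the peak number of tasks observed running at once.
func maxInFlight(tasks, limit int) int32 {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var current, peak atomic.Int32

	for i := 0; i < tasks; i++ {
		sem <- struct{}{} // acquire: blocks while limit goroutines hold a slot
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot

			n := current.Add(1)
			// Record n as the new peak if it's higher (CAS loop for concurrent updates)
			for {
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond) // simulated work
			current.Add(-1)
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println(maxInFlight(50, 4) <= 4) // true
}
```

The counter is decremented before the slot is released, so the observed peak can never exceed the buffer size.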


errgroup: the simple option for many cases

If you’re thinking “this worker pool has too much boilerplate for what I need”, you’re probably right. For many scenarios, golang.org/x/sync/errgroup does exactly what you need with a fraction of the code:

import "golang.org/x/sync/errgroup"

func processAllSimple(ctx context.Context, items []string) error {
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(10) // Maximum 10 simultaneous goroutines

    for _, item := range items {
        item := item // Only needed before Go 1.22 (per-iteration loop variables)
        g.Go(func() error {
            return processItem(ctx, item)
        })
    }

    return g.Wait()
}

errgroup.SetLimit(10) makes g.Go() block until fewer than 10 of the group's goroutines are still running. Combined with WithContext, you get bounded concurrency, automatic cancellation on the first error, and a Wait() that returns the first error encountered: roughly what we built by hand in previous sections (in its semaphore form, with goroutines created on demand), but in about 10 lines.

errgroup.WithContext cancels the derived context when any function returns an error. This means the rest of the goroutines can detect the cancellation through ctx.Done().

When errgroup is enough

  • Processing a finite list of tasks in parallel with a concurrency limit.
  • When you only need to know if there was an error, not collect individual results.
  • Simple fan-out: launching N parallel operations and waiting for all to finish.

When you need a real worker pool

  • Jobs arrive continuously (not a finite list).
  • You need a results channel to process outputs individually.
  • You need dynamic worker scaling.
  • You need per-worker retry logic.
  • You need per-worker metrics (jobs processed, latency, errors).

Real example: processing webhooks from an API in parallel

Let’s put everything together in an example closer to production. A service that receives webhooks, queues them and processes them with a worker pool. Each webhook requires making a call to an external API and persisting the result.

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "sync"
    "time"
)

type Webhook struct {
    ID        string          `json:"id"`
    EventType string          `json:"event_type"`
    Payload   json.RawMessage `json:"payload"`
    Received  time.Time
}

type ProcessResult struct {
    WebhookID string
    Success   bool
    Duration  time.Duration
    Err       error
}

type WebhookProcessor struct {
    jobs       chan Webhook
    results    chan ProcessResult
    numWorkers int
    client     *http.Client
    ctx        context.Context
    cancel     context.CancelFunc
    wg         sync.WaitGroup
}

func NewWebhookProcessor(numWorkers, queueSize int) *WebhookProcessor {
    ctx, cancel := context.WithCancel(context.Background())
    return &WebhookProcessor{
        jobs:       make(chan Webhook, queueSize),
        results:    make(chan ProcessResult, queueSize),
        numWorkers: numWorkers,
        client:     &http.Client{Timeout: 10 * time.Second},
        ctx:        ctx,
        cancel:     cancel,
    }
}

func (wp *WebhookProcessor) Start() {
    // Launch workers
    for i := 0; i < wp.numWorkers; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }

    // Goroutine to close results when all workers finish
    go func() {
        wp.wg.Wait()
        close(wp.results)
    }()

    // Goroutine to process results
    go wp.collectResults()
}

func (wp *WebhookProcessor) worker(id int) {
    defer wp.wg.Done()
    log.Printf("Worker %d started", id)

    for {
        select {
        case <-wp.ctx.Done():
            log.Printf("Worker %d: context cancelled, exiting", id)
            return
        case webhook, ok := <-wp.jobs:
            if !ok {
                log.Printf("Worker %d: channel closed, exiting", id)
                return
            }
            result := wp.processWebhook(webhook)
            select {
            case wp.results <- result:
            case <-wp.ctx.Done():
                return
            }
        }
    }
}

func (wp *WebhookProcessor) processWebhook(wh Webhook) ProcessResult {
    start := time.Now()

    // Real logic would go here: call an API, write to DB, etc.
    // Simulating with a sleep
    time.Sleep(50 * time.Millisecond)

    // Example: if the event type is not valid, return error
    validEvents := map[string]bool{
        "order.created":   true,
        "order.updated":   true,
        "payment.success": true,
        "payment.failed":  true,
    }

    if !validEvents[wh.EventType] {
        return ProcessResult{
            WebhookID: wh.ID,
            Success:   false,
            Duration:  time.Since(start),
            Err:       fmt.Errorf("unknown event: %s", wh.EventType),
        }
    }

    return ProcessResult{
        WebhookID: wh.ID,
        Success:   true,
        Duration:  time.Since(start),
    }
}

func (wp *WebhookProcessor) collectResults() {
    var processed, failed int
    for r := range wp.results {
        if r.Err != nil {
            failed++
            log.Printf("Webhook %s failed (%v): %v", r.WebhookID, r.Duration, r.Err)
        } else {
            processed++
            log.Printf("Webhook %s processed (%v)", r.WebhookID, r.Duration)
        }
    }
    log.Printf("Summary: %d processed, %d failed", processed, failed)
}

func (wp *WebhookProcessor) Enqueue(wh Webhook) error {
    select {
    case wp.jobs <- wh:
        return nil
    case <-wp.ctx.Done():
        return fmt.Errorf("processor stopped")
    default:
        return fmt.Errorf("queue full, dropping webhook %s", wh.ID)
    }
}

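// Shutdown must only be called after producers (e.g. the HTTP server) have
// stopped calling Enqueue: sending on the closed jobs channel would panic.
func (wp *WebhookProcessor) Shutdown(timeout time.Duration) {
    log.Println("Starting graceful shutdown...")

    // 1. Stop accepting new jobs; workers drain whatever is already queued
    close(wp.jobs)

    // 2. Wait for workers to finish with timeout
    done := make(chan struct{})
    go func() {
        wp.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        log.Println("All workers finished cleanly")
    case <-time.After(timeout):
        log.Println("Timeout reached, forcing shutdown")
        wp.cancel()
        // Workers exit at their next select; a job already inside
        // processWebhook runs to completion, since it doesn't take the context
        <-done
    }
    wp.cancel() // Release the context's resources (no-op if already cancelled)
}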

Key points from this example:

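  • Enqueue with default: if the queue is full, it rejects the webhook immediately rather than blocking. In an HTTP handler, blocking would tie up the request (and its connection) for as long as the queue stays full; returning 503 lets the sender retry later.
  • Shutdown with timeout: closes jobs and waits a reasonable time. If workers don't finish, it cancels the context and waits for them to exit. That isn't instantaneous: processWebhook doesn't receive the context, so an in-flight job runs to completion, and since select picks randomly among ready cases, a worker may still take a queued job before it sees ctx.Done().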
  • collectResults as a separate goroutine: decouples result processing from the workers. You could replace the log.Printf calls with metrics, alerts, or database writes.
  • The worker uses double select: both when reading from jobs and when sending to results, always with ctx.Done() as an alternative. If you don’t put the select when sending to results, a worker could get stuck forever if nobody reads results and the context is cancelled.

To use it in a real HTTP service:

func main() {
    processor := NewWebhookProcessor(10, 100) // 10 workers, queue of 100
    processor.Start()

    http.HandleFunc("/webhook", func(w http.ResponseWriter, r *http.Request) {
        var wh Webhook
        if err := json.NewDecoder(r.Body).Decode(&wh); err != nil {
            http.Error(w, "invalid payload", http.StatusBadRequest)
            return
        }
        wh.Received = time.Now()

        if err := processor.Enqueue(wh); err != nil {
            http.Error(w, err.Error(), http.StatusServiceUnavailable)
            return
        }

        w.WriteHeader(http.StatusAccepted)
    })

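    // Note: this main never calls processor.Shutdown. In production, run an
    // http.Server, catch SIGINT/SIGTERM with signal.NotifyContext, call
    // server.Shutdown first (so no handler can still call Enqueue) and only
    // then processor.Shutdown.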
    log.Fatal(http.ListenAndServe(":8080", nil))
}

When worker pools are overkill

Not everything needs a worker pool. Sometimes the complexity isn’t justified.

You don’t need it if:

  • You only need to run 3-5 operations in parallel. An errgroup or even loose goroutines with a WaitGroup are sufficient.
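  • The processing is so fast that concurrency doesn't add anything. If each task takes a few microseconds, the cost of channel operations and goroutine scheduling is no longer negligible next to the work itself, and a plain sequential loop may well be faster. Measure with a benchmark before assuming concurrency helps.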
  • You only have one producer and one consumer. A direct channel between them is already a perfectly valid pipeline.

You need it if:

  • The load is high and continuous, and you need to control how many resources it consumes.
  • You need backpressure: the producer should slow down when workers can’t keep up.
  • You need metrics and observability per worker.
  • The task involves network calls, disk I/O or access to shared resources that degrade under excessive concurrency.

The decision is pragmatic. If errgroup.SetLimit(10) solves your problem, use it. If you need graceful shutdown, scaling, metrics and retries, build the pool. What makes no sense is launching goroutines without control and hoping everything will be fine.

Concurrency in Go is a powerful tool, but power without control isn’t engineering. It’s luck.

OshyTech

Backend and data engineering focused on scalable systems, automation, and AI.

Copyright 2026 OshyTech. All Rights Reserved