Consumir missatges de Kafka amb Go: exemple pràctic per a backend

Com crear un consumidor de Kafka en Go amb segmentio/kafka-go: missatges, commits, errors, context i shutdown ordenat.

Cover for Consumir missatges de Kafka amb Go: exemple pràctic per a backend

Els consumidors de Kafka són un dels terrenys on Go brilla amb força: processos de llarga durada, processament concurrent de missatges i desplegament com un binari estàtic sense dependències. Si véns de la JVM, on un consumidor Kafka necessita mig centenar de dependències transitives i 512 MB de heap per arrencar, la diferència amb Go és brutal.

He muntat consumidors Kafka en Go per al processament d’esdeveniments, sincronització de dades entre serveis i pipelines d’ingesta. En tots els casos, el patró és el mateix: llegir missatges, processar-los, gestionar errors, fer commit d’offsets i tancar neta quan arriba un senyal. El que canvia és la lògica de negoci del mig.

D’això va aquest article: construir un consumidor Kafka en Go pas a pas, amb codi que pots fer servir com a base real. Sense abstraccions innecessàries, sense frameworks màgics.


Per què Go per a consumidors Kafka

Go no és l’única opció, però encaixa especialment bé en aquest tipus de workloads per raons concretes:

  • Binaris estàtics: Un consumidor Kafka en Go compila a un binari de ~15 MB. Sense runtime, sense JVM, sense intèrpret. La imatge Docker resultant pot basar-se en scratch o distroless i pesar menys de 20 MB.
  • Goroutines i concurrència nativa: Processar missatges en paral·lel és natural amb goroutines i channels. No necessites thread pools explícits ni frameworks de concurrència.
  • Arrencada immediata: Un procés Go arrenca en mil·lisegons. A Kubernetes, això significa que un HPA pot escalar consumidors ràpidament quan el lag creix.
  • Baix consum de memòria: Un consumidor Kafka típic en Go consumeix entre 20-50 MB de RAM. L’equivalent en Java/Kotlin amb Spring Kafka part de 200-300 MB.
  • Context natiu: El patró de context en Go encaixa perfectament amb el cicle de vida d’un consumidor: propagació de cancel·lació, timeouts per missatge i shutdown ordenat.

No tot és perfecte. L’ecosistema de llibreries Kafka en Go és més reduït que en JVM. No hi ha equivalent directe a Kafka Streams o Spring Kafka. Però per a consumidors purs (que és el 80% dels casos), Go és més que suficient.


Kafka en 2 minuts: el just per entendre el codi

Si ja treballes amb Kafka cada dia, salta aquesta secció. Si no, aquests són els conceptes que necessites per seguir l’article:

  • Topic: Un canal de missatges amb nom. Els productors escriuen missatges a un topic, els consumidors els llegeixen.
  • Partició: Cada topic es divideix en particions. Les particions permeten paral·lelisme: pots tenir N consumidors llegint N particions en paral·lel.
  • Offset: Cada missatge dins d’una partició té un número seqüencial (offset). El consumidor porta el compte de fins on ha llegit.
  • Consumer Group: Un grup de consumidors que es reparteixen les particions d’un topic. Kafka garanteix que cada partició la llegeix exactament un consumidor del grup.
  • Commit: L’acte d’informar Kafka que has processat un missatge fins a cert offset. Si el consumidor cau, en reiniciar comença des de l’últim offset committed.

La relació clau: si un topic té 6 particions i el teu consumer group té 3 consumidors, cadascun llegeix 2 particions. Si puges a 6 consumidors, cadascun llegeix 1. Si puges a 7, un es queda sense particions (idle). El nombre de particions és el sostre de paral·lelisme per consumer group.


Triar llibreria: segmentio/kafka-go vs confluent-kafka-go

En Go hi ha dues opcions principals:

confluent-kafka-go

  • Wrapper CGo sobre librdkafka (C).
  • Rendiment màxim i compatibilitat total amb les features de Kafka.
  • Requereix librdkafka instal·lada o linkada estàticament (compilació més lenta, cross-compilation més complicada).
  • API més propera al client C, menys idiomàtica en Go.

segmentio/kafka-go

  • Implementació pura en Go, sense dependències CGo.
  • API idiomàtica: Reader, Writer, Conn.
  • Cross-compilation trivial. Compilar per a Linux des de macOS funciona sense canvis.
  • Rendiment excel·lent per a la majoria de casos (no hi ha diferència pràctica per sota de 100K msg/s).

La meva elecció: segmentio/kafka-go. Per al 90% de consumidors backend, la simplicitat de compilació, l’API neta i l’absència de CGo compensen amb escreix. Només triaria confluent-kafka-go si necessités features molt específiques com transaccions de Kafka o Schema Registry integrat a nivell de llibreria.

Instal·lem la dependència:

go get github.com/segmentio/kafka-go

Configurar el consumidor amb kafka-go

La llibreria ofereix dues APIs: Reader (alt nivell, gestiona el consumer group) i Conn (baix nivell, connexió directa). Per a consumidors amb consumer group, Reader és el que vols.

// internal/kafka/consumer.go
package kafka

import (
	\"context\"
	\"log/slog\"
	\"time\"

	\"github.com/segmentio/kafka-go\"
)

type ConsumerConfig struct {
	Brokers        []string
	Topic          string
	GroupID        string
	MinBytes       int
	MaxBytes       int
	CommitInterval time.Duration
}

func DefaultConsumerConfig() ConsumerConfig {
	return ConsumerConfig{
		Brokers:        []string{\"localhost:9092\"},
		Topic:          \"events\",
		GroupID:        \"my-service\",
		MinBytes:       1,          // 1 byte mínim per fetch
		MaxBytes:       10e6,       // 10 MB màxim per fetch
		CommitInterval: 0,          // 0 = commit manual
	}
}

func NewReader(cfg ConsumerConfig) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:        cfg.Brokers,
		Topic:          cfg.Topic,
		GroupID:        cfg.GroupID,
		MinBytes:       cfg.MinBytes,
		MaxBytes:       cfg.MaxBytes,
		CommitInterval: cfg.CommitInterval,
		StartOffset:    kafka.LastOffset,
		Logger:         kafka.LoggerFunc(func(msg string, args ...interface{}) {
			slog.Debug(msg, \"args\", args)
		}),
		ErrorLogger:    kafka.LoggerFunc(func(msg string, args ...interface{}) {
			slog.Error(msg, \"args\", args)
		}),
	})
}

Punts importants:

  • CommitInterval: 0: Desactiva l’auto-commit. Volem control total sobre quan es confirma un offset. Més sobre això a la secció de commits.
  • StartOffset: kafka.LastOffset: Si el consumer group no té offset desat (primera vegada), comença des de l’últim missatge. L’alternativa és kafka.FirstOffset per consumir tot l’històric.
  • GroupID: Identifica el consumer group. Tots els processos amb el mateix GroupID es reparteixen les particions.

Llegir missatges en bucle

El patró bàsic és un bucle infinit amb reader.ReadMessage() o reader.FetchMessage(). La diferència és crucial:

  • ReadMessage(): Llegeix el missatge i fa commit automàticament.
  • FetchMessage(): Llegeix el missatge sense fer commit. Tu decideixes quan cridar CommitMessages().

Per a un consumidor amb commit manual (que és el que vols en producció), usa FetchMessage():

// internal/kafka/consumer.go

type MessageHandler func(ctx context.Context, msg kafka.Message) error

func Consume(ctx context.Context, reader *kafka.Reader, handler MessageHandler) error {
	slog.Info(\"starting kafka consumer\",
		\"topic\", reader.Config().Topic,
		\"group\", reader.Config().GroupID,
	)

	for {
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			if ctx.Err() != nil {
				slog.Info(\"consumer context cancelled, stopping\")
				return nil
			}
			slog.Error(\"error fetching message\", \"error\", err)
			continue
		}

		slog.Debug(\"message received\",
			\"topic\", msg.Topic,
			\"partition\", msg.Partition,
			\"offset\", msg.Offset,
			\"key\", string(msg.Key),
		)

		if err := handler(ctx, msg); err != nil {
			slog.Error(\"error processing message\",
				\"error\", err,
				\"topic\", msg.Topic,
				\"partition\", msg.Partition,
				\"offset\", msg.Offset,
			)
			// No fem commit: el missatge es reprocessarà
			continue
		}

		if err := reader.CommitMessages(ctx, msg); err != nil {
			slog.Error(\"error committing message\",
				\"error\", err,
				\"offset\", msg.Offset,
			)
		}
	}
}

Aquest patró té una propietat fonamental: at-least-once delivery. Si el procés cau després de processar un missatge però abans de fer commit, en reiniciar tornarà a llegir aquell missatge. El teu handler ha de ser idempotent.

El check de ctx.Err() després de l’error de FetchMessage és clau. Quan cancel·les el context (per exemple, en rebre SIGTERM), FetchMessage retorna un error. Sense aquell check, el consumidor entraria en un bucle infinit d’errors.


Processar missatges: deserialització i lògica de negoci

El MessageHandler rep un kafka.Message amb bytes crus. Necessites deserialitzar i executar la teva lògica de negoci. Un enfocament net és separar la deserialització del processament:

// internal/event/order.go
package event

import (
	\"context\"
	\"encoding/json\"
	\"fmt\"
	\"log/slog\"
	\"time\"

	\"github.com/segmentio/kafka-go\"
)

type OrderCreatedEvent struct {
	OrderID   string    `json:\"order_id\"`
	UserID    string    `json:\"user_id\"`
	Amount    float64   `json:\"amount\"`
	Currency  string    `json:\"currency\"`
	CreatedAt time.Time `json:\"created_at\"`
}

type OrderProcessor struct {
	// Aquí anirien les teves dependències: repositori, servei de notificacions, etc.
}

func NewOrderProcessor() *OrderProcessor {
	return &OrderProcessor{}
}

func (p *OrderProcessor) Handle(ctx context.Context, msg kafka.Message) error {
	var event OrderCreatedEvent
	if err := json.Unmarshal(msg.Value, &event); err != nil {
		return fmt.Errorf(\"deserializing order event: %w\", err)
	}

	slog.Info(\"processing order\",
		\"order_id\", event.OrderID,
		\"user_id\", event.UserID,
		\"amount\", event.Amount,
	)

	// La teva lògica de negoci aquí.
	// Exemple: desar a base de dades, enviar notificació, etc.
	if err := p.processOrder(ctx, event); err != nil {
		return fmt.Errorf(\"processing order %s: %w\", event.OrderID, err)
	}

	return nil
}

func (p *OrderProcessor) processOrder(ctx context.Context, event OrderCreatedEvent) error {
	// Implementació real de la lògica de negoci
	slog.Info(\"order processed successfully\", \"order_id\", event.OrderID)
	return nil
}

Algunes decisions deliberades:

  • json.Unmarshal directe: Per a JSON funciona bé. Si uses Avro o Protobuf, substitueix pel deserialitzador corresponent.
  • Wrapping d’errors amb %w: Permet a la capa superior inspeccionar el tipus d’error i decidir si reintenta o descarta. Més sobre això a la secció d’errors.
  • Struct OrderProcessor amb dependències: No funcions soltes. Això facilita la injecció de dependències i el testing.

Estratègies de commit: auto-commit vs commit manual

Com i quan fas commit d’offsets defineix les garanties de lliurament del teu consumidor. Hi ha tres estratègies principals:

Auto-commit periòdic

reader := kafka.NewReader(kafka.ReaderConfig{
	// ...
	CommitInterval: 5 * time.Second, // Commit cada 5 segons
})

Kafka-go fa commit automàticament dels offsets dels missatges llegits cada N segons. Avantatge: simplicitat. Problema: si el procés cau entre un commit automàtic i el següent, perds la referència dels missatges processats i es reproces en reiniciar.

Commit manual per missatge

És el que hem vist a dalt: FetchMessage + CommitMessages després de processar cada missatge.

msg, _ := reader.FetchMessage(ctx)
// processar...
reader.CommitMessages(ctx, msg)

Avantatge: control total. L’offset es fa commit només quan el missatge està processat. Desavantatge: un commit per cada missatge implica més crides a Kafka (més latència si processen milers de missatges per segon).

Commit manual per lot

Acumules N missatges o esperes T temps, processes el lot, i fas commit de l’últim offset del lot:

func ConsumeBatch(ctx context.Context, reader *kafka.Reader, batchSize int, handler func(ctx context.Context, msgs []kafka.Message) error) error {
	batch := make([]kafka.Message, 0, batchSize)

	for {
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			if ctx.Err() != nil {
				return nil
			}
			slog.Error(\"error fetching message\", \"error\", err)
			continue
		}

		batch = append(batch, msg)

		if len(batch) >= batchSize {
			if err := handler(ctx, batch); err != nil {
				slog.Error(\"error processing batch\", \"error\", err)
				batch = batch[:0]
				continue
			}

			// Commit de l'últim missatge del lot
			last := batch[len(batch)-1]
			if err := reader.CommitMessages(ctx, last); err != nil {
				slog.Error(\"error committing batch\", \"error\", err)
			}

			batch = batch[:0]
		}
	}
}

Aquesta estratègia és la més eficient per a alt throughput, però té un risc: si el procés cau a meitat d’un lot, es reproces tots els missatges del lot. El teu handler ha de ser idempotent.

La meva recomanació: comença amb commit manual per missatge. És el més segur i fàcil de raonar. Passa a batch només si el throughput ho exigeix i la teva lògica és idempotent (que hauria de ser-ho de totes formes).


Gestió d’errors: transitoris vs permanents

No tots els errors es tracten igual. Un timeout de xarxa i un JSON mal format requereixen estratègies diferents:

// internal/kafka/errors.go
package kafka

import \"errors\"

// PermanentError indica un error que no es resoldrà amb reintents.
// Exemple: JSON mal format, camp obligatori absent, validació fallida.
type PermanentError struct {
	Err error
}

func (e *PermanentError) Error() string {
	return e.Err.Error()
}

func (e *PermanentError) Unwrap() error {
	return e.Err
}

func NewPermanentError(err error) *PermanentError {
	return &PermanentError{Err: err}
}

func IsPermanent(err error) bool {
	var permanent *PermanentError
	return errors.As(err, &permanent)
}

Ara, al consumidor, la lògica de decisió:

func ConsumeWithRetry(ctx context.Context, reader *kafka.Reader, handler MessageHandler, maxRetries int) error {
	for {
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			if ctx.Err() != nil {
				return nil
			}
			slog.Error(\"error fetching message\", \"error\", err)
			continue
		}

		var processingErr error
		for attempt := 1; attempt <= maxRetries; attempt++ {
			processingErr = handler(ctx, msg)
			if processingErr == nil {
				break
			}

			if IsPermanent(processingErr) {
				slog.Error(\"permanent error, skipping message\",
					\"error\", processingErr,
					\"offset\", msg.Offset,
					\"partition\", msg.Partition,
				)
				// Aquí podries enviar a un dead-letter topic
				break
			}

			slog.Warn(\"transient error, retrying\",
				\"error\", processingErr,
				\"attempt\", attempt,
				\"max_retries\", maxRetries,
			)
			time.Sleep(time.Duration(attempt) * time.Second) // Backoff lineal
		}

		// Commit fins i tot si ha fallat: evita bloquejar la partició
		if err := reader.CommitMessages(ctx, msg); err != nil {
			slog.Error(\"error committing\", \"error\", err)
		}
	}
}

La decisió de fer commit fins i tot quan el processament falla és deliberada. Si no ho fas, el consumidor es queda encallat al mateix missatge indefinidament. L’alternativa és enviar els missatges fallits a un dead-letter topic (DLT) abans de fer commit:

func sendToDeadLetter(ctx context.Context, writer *kafka.Writer, msg kafka.Message, err error) error {
	return writer.WriteMessages(ctx, kafka.Message{
		Key:   msg.Key,
		Value: msg.Value,
		Headers: append(msg.Headers,
			kafka.Header{Key: \"original-topic\", Value: []byte(msg.Topic)},
			kafka.Header{Key: \"error\", Value: []byte(err.Error())},
			kafka.Header{Key: \"failed-at\", Value: []byte(time.Now().UTC().Format(time.RFC3339))},
		),
	})
}

Shutdown ordenat amb context i senyals del sistema

Un consumidor Kafka en producció ha de tancar neta: deixar de llegir missatges, acabar el processament en curs i fer commit del pendent. Si el procés mor de cop, els missatges en vol es reproces (que no és catastròfic si ets idempotent, però és un malbaratament).

El patró és interceptar senyals del sistema operatiu i propagar la cancel·lació a través del context de Go:

// cmd/consumer/main.go
package main

import (
	\"context\"
	\"log/slog\"
	\"os\"
	\"os/signal\"
	\"syscall\"

	appkafka \"myservice/internal/kafka\"
	\"myservice/internal/event\"
)

func main() {
	slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelInfo,
	})))

	ctx, cancel := signal.NotifyContext(context.Background(),
		syscall.SIGINT,
		syscall.SIGTERM,
	)
	defer cancel()

	cfg := appkafka.DefaultConsumerConfig()
	cfg.Topic = \"orders\"
	cfg.GroupID = \"order-processor\"

	reader := appkafka.NewReader(cfg)
	defer func() {
		if err := reader.Close(); err != nil {
			slog.Error(\"error closing reader\", \"error\", err)
		}
		slog.Info(\"kafka reader closed\")
	}()

	processor := event.NewOrderProcessor()

	slog.Info(\"starting consumer\",
		\"topic\", cfg.Topic,
		\"group\", cfg.GroupID,
		\"brokers\", cfg.Brokers,
	)

	if err := appkafka.Consume(ctx, reader, processor.Handle); err != nil {
		slog.Error(\"consumer error\", \"error\", err)
		os.Exit(1)
	}

	slog.Info(\"consumer stopped gracefully\")
}

signal.NotifyContext és la forma més neta de gestionar això en Go. Crea un context que es cancel·la automàticament quan arriba un dels senyals especificats. En cancel·lar-se el context:

  1. FetchMessage(ctx) retorna un error.
  2. El bucle detecta ctx.Err() != nil i surt.
  3. defer reader.Close() tanca la connexió amb Kafka.
  4. El procés acaba net.

A Kubernetes, el procés rebrà SIGTERM quan el pod va a morir. Tens 30 segons per defecte (terminationGracePeriodSeconds) per tancar. Amb aquest patró, el shutdown tarda mil·lisegons.

Si el teu processament és lent i necessites un worker pool per paral·lelitzar missatges, el context es propaga igual a cada goroutine del pool. Quan arriba SIGTERM, totes les goroutines reben el senyal de cancel·lació i acaben la feina en curs.


Docker Compose amb Kafka per al desenvolupament local

Per desenvolupar i provar el consumidor necessites un cluster Kafka local. Docker Compose amb KRaft (sense Zookeeper) és l’opció més simple:

# docker-compose.yml
services:
  kafka:
    image: apache/kafka:3.8.0
    container_name: kafka
    ports:
      - \"9092:9092\"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
    healthcheck:
      test: [\"CMD-SHELL\", \"/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list\"]
      interval: 10s
      timeout: 5s
      retries: 5

  kafka-init:
    image: apache/kafka:3.8.0
    depends_on:
      kafka:
        condition: service_healthy
    entrypoint: [\"/bin/sh\", \"-c\"]
    command: |
      \"
      /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --create --if-not-exists --topic orders --partitions 3 --replication-factor 1
      echo 'Topic orders created'
      \"

Arranques amb docker compose up -d i tens un broker Kafka amb un topic orders de 3 particions llest per usar.

Per produir missatges de prova des del terminal:

// cmd/producer/main.go
package main

import (
	\"context\"
	\"encoding/json\"
	\"fmt\"
	\"log\"
	\"time\"

	\"github.com/segmentio/kafka-go\"
)

func main() {
	writer := &kafka.Writer{
		Addr:     kafka.TCP(\"localhost:9092\"),
		Topic:    \"orders\",
		Balancer: &kafka.LeastBytes{},
	}
	defer writer.Close()

	for i := 1; i <= 10; i++ {
		event := map[string]interface{}{
			\"order_id\":   fmt.Sprintf(\"ORD-%03d\", i),
			\"user_id\":    fmt.Sprintf(\"USR-%03d\", i),
			\"amount\":     float64(i) * 29.99,
			\"currency\":   \"EUR\",
			\"created_at\": time.Now().UTC(),
		}

		value, _ := json.Marshal(event)

		err := writer.WriteMessages(context.Background(), kafka.Message{
			Key:   []byte(fmt.Sprintf(\"ORD-%03d\", i)),
			Value: value,
		})
		if err != nil {
			log.Fatalf(\"error writing message: %v\", err)
		}

		fmt.Printf(\"sent: %s\n\", string(value))
	}
}

Testing amb testcontainers

Els tests unitaris estan bé per a la lògica de negoci, però un consumidor Kafka necessita tests d’integració contra un Kafka real. testcontainers-go et permet aixecar un broker Kafka en un contenidor Docker dins dels teus tests:

// internal/kafka/consumer_test.go
package kafka_test

import (
	\"context\"
	\"encoding/json\"
	\"testing\"
	\"time\"

	\"github.com/segmentio/kafka-go\"
	tc \"github.com/testcontainers/testcontainers-go\"
	\"github.com/testcontainers/testcontainers-go/modules/kafka\"
)

func TestConsumer_ProcessesMessages(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	// Aixecar Kafka en un contenidor
	kafkaContainer, err := kafka.Run(ctx, \"confluentinc/confluent-local:7.5.0\",
		kafka.WithClusterID(\"test-cluster\"),
	)
	if err != nil {
		t.Fatalf(\"starting kafka container: %v\", err)
	}
	defer func() {
		if err := tc.TerminateContainer(kafkaContainer); err != nil {
			t.Logf(\"terminating container: %v\", err)
		}
	}()

	brokers, err := kafkaContainer.Brokers(ctx)
	if err != nil {
		t.Fatalf(\"getting brokers: %v\", err)
	}

	topic := \"test-orders\"

	// Crear el topic
	conn, err := kafka.DialLeader(ctx, \"tcp\", brokers[0], topic, 0)
	if err != nil {
		// Si falla, intentar crear via l'API d'admin
		adminConn, _ := kafka.Dial(\"tcp\", brokers[0])
		defer adminConn.Close()
		adminConn.CreateTopics(kafka.TopicConfig{
			Topic:             topic,
			NumPartitions:     1,
			ReplicationFactor: 1,
		})
	} else {
		conn.Close()
	}

	// Produir un missatge de prova
	writer := &kafka.Writer{
		Addr:     kafka.TCP(brokers[0]),
		Topic:    topic,
		Balancer: &kafka.LeastBytes{},
	}

	event := map[string]interface{}{
		\"order_id\": \"ORD-TEST-001\",
		\"user_id\":  \"USR-001\",
		\"amount\":   99.99,
	}
	value, _ := json.Marshal(event)

	err = writer.WriteMessages(ctx, kafka.Message{
		Key:   []byte(\"ORD-TEST-001\"),
		Value: value,
	})
	writer.Close()
	if err != nil {
		t.Fatalf(\"writing message: %v\", err)
	}

	// Consumir i verificar
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  brokers,
		Topic:    topic,
		GroupID:  \"test-group\",
		MinBytes: 1,
		MaxBytes: 10e6,
	})
	defer reader.Close()

	msg, err := reader.ReadMessage(ctx)
	if err != nil {
		t.Fatalf(\"reading message: %v\", err)
	}

	var received map[string]interface{}
	if err := json.Unmarshal(msg.Value, &received); err != nil {
		t.Fatalf(\"unmarshaling message: %v\", err)
	}

	if received[\"order_id\"] != \"ORD-TEST-001\" {
		t.Errorf(\"expected order_id ORD-TEST-001, got %v\", received[\"order_id\"])
	}
}

El test aixeca un contenidor Kafka real, produeix un missatge, el consumeix i verifica el contingut. Tarda uns segons en arrencar el contenidor, però et dóna confiança real que el teu consumidor funciona contra un broker Kafka de debò.

Per executar-lo:

go test -v -tags=integration ./internal/kafka/...

Pots usar build tags per separar els tests d’integració dels unitaris i no executar-los a cada go test ./....


Consideracions de producció

Un consumidor Kafka que funciona en local no és el mateix que un preparat per a producció. Aquests són els punts que no pots ignorar:

Monitorització del consumer lag

El consumer lag és la diferència entre l’offset més recent del topic i l’offset commitejat pel teu consumer group. Si el lag creix, el teu consumidor no està processant missatges al ritme que arriben.

Exposa mètriques del reader de kafka-go:

func reportMetrics(reader *kafka.Reader) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		stats := reader.Stats()
		slog.Info(\"consumer stats\",
			\"messages\", stats.Messages,
			\"bytes\", stats.Bytes,
			\"rebalances\", stats.Rebalances,
			\"timeouts\", stats.Timeouts,
			\"errors\", stats.Errors,
			\"dial_time_avg\", stats.DialTime.Avg,
			\"read_time_avg\", stats.ReadTime.Avg,
			\"wait_time_avg\", stats.WaitTime.Avg,
			\"fetch_size_avg\", stats.FetchSize.Avg,
			\"fetch_bytes_avg\", stats.FetchBytes.Avg,
			\"offset\", stats.Offset,
			\"lag\", stats.Lag,
		)
	}
}

En producció, aquestes mètriques haurien d’anar a Prometheus, Datadog o el teu sistema de monitorització. El stats.Lag és la mètrica crítica: configura una alerta si supera un llindar.

Escalat horitzontal

Per escalar consumidors, simplement llança més instàncies amb el mateix GroupID. Kafka reequilibrarà les particions automàticament. Recorda:

  • Màxim de consumidors = nombre de particions. Si tens 6 particions, no escalis més enllà de 6 instàncies.
  • El reequilibri té cost: Cada vegada que una instància entra o surt del grup, Kafka reasigna particions. Durant el reequilibri, el consum es pausa breument.
  • A Kubernetes: Usa un Deployment (no un StatefulSet) i configura un HPA basat en el consumer lag.

Idempotència

Ho repeteixo perquè és el que més s’oblida: el teu handler ha de ser idempotent. Amb at-least-once delivery, rebràs missatges duplicats. Això passa quan:

  • El consumidor processa un missatge però cau abans de commitear.
  • Un reequilibri reasigna una partició i es reproces missatges.
  • Kafka té un bug (rar, però passa).

Estratègies d’idempotència:

  • Clau única: Desa l’order_id (o l’identificador natural de l’esdeveniment) i fes un upsert en lloc d’un insert.
  • Taula de deduplicació: Desa els offsets processats i verifica abans de processar.
  • Operacions idempotents per disseny: SET balance = 100 és idempotent, SET balance = balance + 100 no ho és.

Timeouts i deadlines

Cada missatge s’hauria de processar amb un timeout. Si un handler es queda penjat, bloqueja aquella partició:

func handleWithTimeout(ctx context.Context, msg kafka.Message, handler MessageHandler, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	return handler(ctx, msg)
}

Un timeout de 30 segons per missatge és un bon punt de partida. Ajusta’l segons el teu cas.

Health checks

A Kubernetes, el teu consumidor necessita un endpoint de salut. Pots aixecar un servidor HTTP mínim en paral·lel:

func startHealthServer(ctx context.Context, port string) {
	mux := http.NewServeMux()
	mux.HandleFunc(\"/health\", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte(\"ok\"))
	})

	server := &http.Server{Addr: \":\" + port, Handler: mux}

	go func() {
		<-ctx.Done()
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		server.Shutdown(shutdownCtx)
	}()

	if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		slog.Error(\"health server error\", \"error\", err)
	}
}

Llança’l en una goroutine des de main() i configura els probes de Kubernetes per apuntar a /health.


Estructura final del projecte

En juntar-ho tot, el projecte queda així:

kafka-consumer/
├── cmd/
│   ├── consumer/
│   │   └── main.go          # Punt d'entrada del consumidor
│   └── producer/
│       └── main.go          # Productor de proves
├── internal/
│   ├── event/
│   │   └── order.go         # Handler d'esdeveniments de comandes
│   └── kafka/
│       ├── consumer.go       # Configuració i bucle de consum
│       ├── consumer_test.go  # Tests d'integració
│       └── errors.go         # Tipus d'error
├── docker-compose.yml
├── go.mod
└── go.sum

Si véns d’un article sobre workers en Go, veuràs que l’estructura és gairebé idèntica. I no és casualitat: un consumidor Kafka és, en essència, un worker que rep feina des d’un topic en lloc des d’una cua en memòria.


Kafka sense la complexitat de la JVM

Un consumidor Kafka en Go es construeix amb poques peces que encaixen de forma natural: un reader de segmentio/kafka-go amb consumer group i commit manual, un bucle de consum basat en FetchMessage + CommitMessages per al control total de l’offset, i handlers tipats que deserialitzen el missatge i executen la lògica de negoci. La resta és classificar errors entre transitoris i permanents, garantir un shutdown ordenat amb signal.NotifyContext, i dissenyar handlers idempotents perquè at-least-once delivery implica duplicats.

Go no té l’ecosistema Kafka de la JVM. No hi ha Kafka Streams, no hi ha Spring Kafka amb les seves 47 anotacions. Però per a consumidors backend (que són la majoria dels casos), el que ofereix és suficient i considerablement més simple. Un binari de 15 MB, 30 MB de RAM, arrencada en mil·lisegons i concurrència nativa per paral·lelitzar el processament.

El que m’agrada d’aquest enfocament és que el consumidor acaba sent un worker més. Si véns de l’article de workers en Go, hauràs vist que l’estructura és gairebé idèntica. La diferència és d’on ve la feina, no de com la processen. I això és exactament el que vols: patrons reutilitzables que s’adapten al transport sense reescriure la lògica.

Si necessites processar missatges Kafka amb concurrència real, combina el que has vist aquí amb un worker pool: el reader alimenta un channel, el pool processa els missatges en paral·lel i el commit es fa quan tots els missatges del batch estan processats.

OshyTech

Enginyeria backend i de dades orientada a sistemes escalables, automatització i IA.

Navegació

Copyright 2026 OshyTech. Tots els drets reservats