Consuming Kafka Messages with Go: A Practical Backend Example

How to build a Kafka consumer in Go with segmentio/kafka-go: messages, commits, errors, context and graceful shutdown.

Kafka consumers are one of the areas where Go truly shines: long-running processes, concurrent message processing, and deployment as a single static binary with no external dependencies. If you come from the JVM world, where a Kafka consumer easily pulls in dozens of transitive dependencies and a few hundred MB of heap just to start, the difference with Go is stark.

I’ve built Kafka consumers in Go for event processing, data synchronization between services, and ingestion pipelines. In all cases, the pattern is the same: read messages, process them, handle errors, commit offsets, and shut down cleanly when a signal arrives. What changes is the business logic in the middle.

That’s what this article is about: building a Kafka consumer in Go step by step, with code you can use as a real foundation. No unnecessary abstractions, no magic frameworks.


Why Go for Kafka Consumers

Go isn’t the only option, but it fits especially well for this type of workload for concrete reasons:

  • Static binaries: A Kafka consumer in Go typically compiles to a binary of around 15 MB, with the Go runtime built in: no JVM, no interpreter, no shared libraries to install. The resulting Docker image can be based on scratch or distroless and weigh less than 20 MB.
  • Goroutines and native concurrency: Processing messages in parallel is natural with goroutines and channels. You don’t need explicit thread pools or concurrency frameworks.
  • Instant startup: A Go process starts in milliseconds. In Kubernetes, this means an HPA can scale consumers quickly when lag grows.
  • Low memory usage: In my experience, a typical Kafka consumer in Go uses 20-50 MB of RAM, while the Java/Kotlin equivalent with Spring Kafka starts at 200-300 MB.
  • Native context: The context pattern in Go fits perfectly with the consumer lifecycle: cancellation propagation, per-message timeouts, and graceful shutdown.

Not everything is perfect. The Kafka library ecosystem in Go is smaller than the JVM's: there's no direct equivalent to Kafka Streams or Spring Kafka. But for plain consumers (the vast majority of cases), Go is more than sufficient.


Kafka in 2 Minutes: Just Enough to Understand the Code

If you work with Kafka daily, skip this section. If not, these are the concepts you need to follow the article:

  • Topic: A named message channel. Producers write messages to a topic, consumers read them.
  • Partition: Each topic is divided into partitions. Partitions enable parallelism: you can have N consumers reading N partitions in parallel.
  • Offset: Each message within a partition has a sequential number (offset). The consumer tracks how far it has read.
  • Consumer Group: A group of consumers that share the partitions of a topic. Kafka assigns each partition to exactly one consumer in the group at a time.
  • Commit: The act of informing Kafka that you’ve processed a message up to a certain offset. If the consumer crashes, upon restart it begins from the last committed offset.

The key relationship: if a topic has 6 partitions and your consumer group has 3 consumers, each reads 2 partitions. Scale to 6 consumers, each reads 1. Scale to 7, one ends up with no partitions (idle). The number of partitions is the parallelism ceiling per consumer group.


Choosing a Library: segmentio/kafka-go vs confluent-kafka-go

In Go there are two main options:

confluent-kafka-go

  • CGo wrapper over librdkafka (C).
  • Maximum performance and full compatibility with Kafka features.
  • Requires CGo. Prebuilt librdkafka is bundled for common platforms, but builds are slower and cross-compilation is more complex.
  • API closer to the C client, less idiomatic in Go.

segmentio/kafka-go

  • Pure Go implementation, no CGo dependencies.
  • Idiomatic API: Reader, Writer, Conn.
  • Trivial cross-compilation. Compiling for Linux from macOS works without changes.
  • Good performance for most workloads. In my experience, the gap with librdkafka only starts to matter at very high throughput (on the order of 100K msg/s).

My choice: segmentio/kafka-go. For the vast majority of backend consumers, the simple build, clean API, and absence of CGo more than compensate. I'd only choose confluent-kafka-go if I needed features kafka-go lacks, such as Kafka transactions or Schema Registry integration at the library level.

Install the dependency:

go get github.com/segmentio/kafka-go

Configuring the Consumer with kafka-go

The library offers two APIs: Reader (high-level, handles the consumer group) and Conn (low-level, direct connection). For consumers with a consumer group, Reader is what you want.

// internal/kafka/consumer.go
package kafka

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/segmentio/kafka-go"
)

type ConsumerConfig struct {
	Brokers        []string
	Topic          string
	GroupID        string
	MinBytes       int
	MaxBytes       int
	CommitInterval time.Duration
}

func DefaultConsumerConfig() ConsumerConfig {
	return ConsumerConfig{
		Brokers:        []string{"localhost:9092"},
		Topic:          "events",
		GroupID:        "my-service",
		MinBytes:       1,    // minimum 1 byte per fetch
		MaxBytes:       10e6, // maximum 10 MB per fetch
		CommitInterval: 0,    // 0 = synchronous commits; we commit explicitly after processing
	}
}

func NewReader(cfg ConsumerConfig) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers:        cfg.Brokers,
		Topic:          cfg.Topic,
		GroupID:        cfg.GroupID,
		MinBytes:       cfg.MinBytes,
		MaxBytes:       cfg.MaxBytes,
		CommitInterval: cfg.CommitInterval,
		StartOffset:    kafka.LastOffset,
		// kafka-go calls these loggers with printf-style format strings and arguments
		Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			slog.Debug(fmt.Sprintf(msg, args...))
		}),
		ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
			slog.Error(fmt.Sprintf(msg, args...))
		}),
	})
}

Key points:

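  • CommitInterval: 0: Commits are synchronous: each CommitMessages call blocks until the broker confirms the offset. Combined with FetchMessage, this gives us full control over when an offset is confirmed. (A value greater than 0 makes kafka-go send commits in the background at that interval.) More on this in the commits section.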
  • StartOffset: kafka.LastOffset: If the consumer group has no saved offset (first time), start from the latest message. The alternative is kafka.FirstOffset to consume the entire history.
  • GroupID: Identifies the consumer group. All processes with the same GroupID share the partitions.

Reading Messages in a Loop

The basic pattern is an infinite loop with reader.ReadMessage() or reader.FetchMessage(). The difference is crucial:

  • ReadMessage(): Reads the message and, when GroupID is set, commits its offset automatically, before your code has processed it.
  • FetchMessage(): Reads the message without committing. You decide when to call CommitMessages().

For a consumer with manual commit (which is what you want in production), use FetchMessage():

// internal/kafka/consumer.go

type MessageHandler func(ctx context.Context, msg kafka.Message) error

func Consume(ctx context.Context, reader *kafka.Reader, handler MessageHandler) error {
	slog.Info("starting kafka consumer",
		"topic", reader.Config().Topic,
		"group", reader.Config().GroupID,
	)

	for {
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			if ctx.Err() != nil {
				slog.Info("consumer context cancelled, stopping")
				return nil
			}
			slog.Error("error fetching message", "error", err)
			continue
		}

		slog.Debug("message received",
			"topic", msg.Topic,
			"partition", msg.Partition,
			"offset", msg.Offset,
			"key", string(msg.Key),
		)

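		if err := handler(ctx, msg); err != nil {
			slog.Error("error processing message",
				"error", err,
				"topic", msg.Topic,
				"partition", msg.Partition,
				"offset", msg.Offset,
			)
			// No commit. kafka-go does not rewind on a missing commit: the reader
			// keeps fetching, and once a later offset in this partition is committed,
			// this message is skipped for good. It is only reprocessed if the consumer
			// restarts (or loses the partition in a rebalance) before that happens.
			continue
		}

		// Commit with a context that ignores shutdown cancellation (Go 1.21+), so a
		// message that was fully processed still gets its offset committed after
		// SIGTERM. The timeout keeps a slow broker from blocking shutdown forever.
		commitCtx, cancelCommit := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
		err = reader.CommitMessages(commitCtx, msg)
		cancelCommit()
		if err != nil {
			slog.Error("error committing message",
				"error", err,
				"offset", msg.Offset,
			)
		}
	}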
}

This pattern has a fundamental property: at-least-once delivery. If the process crashes after processing a message but before committing, upon restart it will read that message again. Your handler must be idempotent.

The ctx.Err() check after the FetchMessage error is key. When you cancel the context (for example, upon receiving SIGTERM), FetchMessage returns an error. Without that check, the consumer would enter an infinite error loop.


Processing Messages: Deserialization and Business Logic

The MessageHandler receives a kafka.Message with raw bytes. You need to deserialize and execute your business logic. A clean approach is to separate deserialization from processing:

// internal/event/order.go
package event

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"time"

	"github.com/segmentio/kafka-go"
)

type OrderCreatedEvent struct {
	OrderID   string    `json:"order_id"`
	UserID    string    `json:"user_id"`
	Amount    float64   `json:"amount"`
	Currency  string    `json:"currency"`
	CreatedAt time.Time `json:"created_at"`
}

type OrderProcessor struct {
	// Your dependencies go here: repository, notification service, etc.
}

func NewOrderProcessor() *OrderProcessor {
	return &OrderProcessor{}
}

func (p *OrderProcessor) Handle(ctx context.Context, msg kafka.Message) error {
	var event OrderCreatedEvent
	if err := json.Unmarshal(msg.Value, &event); err != nil {
		return fmt.Errorf("deserializing order event: %w", err)
	}

	slog.Info("processing order",
		"order_id", event.OrderID,
		"user_id", event.UserID,
		"amount", event.Amount,
	)

	// Your business logic here.
	// Example: save to database, send notification, etc.
	if err := p.processOrder(ctx, event); err != nil {
		return fmt.Errorf("processing order %s: %w", event.OrderID, err)
	}

	return nil
}

func (p *OrderProcessor) processOrder(ctx context.Context, event OrderCreatedEvent) error {
	// Real implementation of business logic
	slog.Info("order processed successfully", "order_id", event.OrderID)
	return nil
}

Some deliberate decisions:

  • Direct json.Unmarshal: Works well for JSON. If you use Avro or Protobuf, substitute the corresponding deserializer.
  • Error wrapping with %w: Allows the upper layer to inspect the error type and decide whether to retry or discard. More on this in the errors section.
  • OrderProcessor struct with dependencies: Not loose functions. This facilitates dependency injection and testing.

Commit Strategies: Auto-commit vs Manual Commit

How and when you commit offsets defines the delivery guarantees of your consumer. There are three main strategies:

Periodic Auto-commit

reader := kafka.NewReader(kafka.ReaderConfig{
	// ...
	CommitInterval: 5 * time.Second, // Commit every 5 seconds
})

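kafka-go commits offsets in the background every N seconds: ReadMessage marks each message as committed as soon as it returns it, and CommitMessages calls are queued instead of sent immediately. Advantage: simplicity. Problems: if the process crashes before the next flush, the messages read since the last commit are reprocessed on restart; and because ReadMessage marks a message before your handler runs, a commit can be flushed for a message that was never processed, so a crash at the wrong moment loses it.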

Manual Commit per Message

This is what we’ve seen above: FetchMessage + CommitMessages after processing each message.

msg, _ := reader.FetchMessage(ctx)
// process...
reader.CommitMessages(ctx, msg)

Advantage: full control. The offset is committed only when the message is processed. Disadvantage: one commit per message means more calls to Kafka (more latency if you process thousands of messages per second).

Manual Batch Commit

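You accumulate messages, process them as a batch, and commit once per batch. The version below flushes only when the batch reaches batchSize; flushing after a maximum wait time T as well requires a timer alongside the fetch loop, because FetchMessage blocks until a message arrives: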

func ConsumeBatch(ctx context.Context, reader *kafka.Reader, batchSize int, handler func(ctx context.Context, msgs []kafka.Message) error) error {
	batch := make([]kafka.Message, 0, batchSize)

	for {
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			if ctx.Err() != nil {
				return nil
			}
			slog.Error("error fetching message", "error", err)
			continue
		}

		batch = append(batch, msg)

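		if len(batch) >= batchSize {
			if err := handler(ctx, batch); err != nil {
				slog.Error("error processing batch", "error", err)
				// Skipping the commit does not rewind the reader: the batch
				// is only reprocessed after a restart or rebalance.
				batch = batch[:0]
				continue
			}

			// Commit the whole batch. kafka-go commits the highest offset per
			// partition, so passing only the last message would leave any other
			// partitions in the batch uncommitted.
			if err := reader.CommitMessages(ctx, batch...); err != nil {
				slog.Error("error committing batch", "error", err)
			}

			batch = batch[:0]
		}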
	}
}

This strategy is the most efficient for high throughput, but it carries a risk: if the process crashes in the middle of a batch, all messages in the batch are reprocessed. Your handler must be idempotent.

My recommendation: start with manual commit per message. It’s the safest and easiest to reason about. Switch to batch only if throughput demands it and your logic is idempotent (which it should be anyway).


Error Handling: Transient vs Permanent

Not all errors are treated equally. A network timeout and malformed JSON require different strategies:

// internal/kafka/errors.go
package kafka

import "errors"

// PermanentError indicates an error that won't be resolved with retries.
// Example: malformed JSON, missing required field, failed validation.
type PermanentError struct {
	Err error
}

func (e *PermanentError) Error() string {
	return e.Err.Error()
}

func (e *PermanentError) Unwrap() error {
	return e.Err
}

func NewPermanentError(err error) *PermanentError {
	return &PermanentError{Err: err}
}

func IsPermanent(err error) bool {
	var permanent *PermanentError
	return errors.As(err, &permanent)
}

Now, in the consumer, the decision logic:

func ConsumeWithRetry(ctx context.Context, reader *kafka.Reader, handler MessageHandler, maxRetries int) error {
	if maxRetries < 1 {
		maxRetries = 1 // always run the handler at least once
	}

	for {
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			if ctx.Err() != nil {
				return nil
			}
			slog.Error("error fetching message", "error", err)
			continue
		}

		var processingErr error
		for attempt := 1; attempt <= maxRetries; attempt++ {
			processingErr = handler(ctx, msg)
			if processingErr == nil {
				break
			}

			if IsPermanent(processingErr) {
				slog.Error("permanent error, skipping message",
					"error", processingErr,
					"offset", msg.Offset,
					"partition", msg.Partition,
				)
				// Here you could send to a dead-letter topic
				break
			}

			slog.Warn("transient error, retrying",
				"error", processingErr,
				"attempt", attempt,
				"max_retries", maxRetries,
			)
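			if attempt == maxRetries {
				break // no point waiting after the last attempt
			}
			// Linear backoff, cut short if the consumer is shutting down
			select {
			case <-ctx.Done():
				return nil // leave the message uncommitted; it is redelivered after restart
			case <-time.After(time.Duration(attempt) * time.Second):
			}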
		}

		// Commit even if it failed: avoids blocking the partition
		if err := reader.CommitMessages(ctx, msg); err != nil {
			slog.Error("error committing", "error", err)
		}
	}
}

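The decision to commit even when processing fails is deliberate: a poison message should not hold back the partition. Note that with kafka-go, skipping the commit would not retry the message anyway, because the reader keeps fetching and the next successful commit moves the offset past it. Committing does mean the failed message is gone for good, so the usual complement is to send it to a dead-letter topic (DLT) before committing: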

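// sendToDeadLetter republishes a failed message with metadata about the failure.
// writer is assumed to be a *kafka.Writer whose Topic is the dead-letter topic.
func sendToDeadLetter(ctx context.Context, writer *kafka.Writer, msg kafka.Message, err error) error {
	// Copy the headers so appending never writes into msg.Headers' backing array.
	headers := append([]kafka.Header(nil), msg.Headers...)
	headers = append(headers,
		kafka.Header{Key: "original-topic", Value: []byte(msg.Topic)},
		kafka.Header{Key: "error", Value: []byte(err.Error())},
		kafka.Header{Key: "failed-at", Value: []byte(time.Now().UTC().Format(time.RFC3339))},
	)

	return writer.WriteMessages(ctx, kafka.Message{
		Key:     msg.Key,
		Value:   msg.Value,
		Headers: headers,
	})
}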

Graceful Shutdown with Context and System Signals

A Kafka consumer in production must close cleanly: stop reading messages, finish in-flight processing, and commit pending offsets. If the process dies abruptly, in-flight messages are reprocessed (not catastrophic if you’re idempotent, but wasteful).

The pattern is to intercept operating system signals and propagate cancellation through Go’s context:

// cmd/consumer/main.go
package main

import (
	"context"
	"log/slog"
	"os"
	"os/signal"
	"syscall"

	appkafka "myservice/internal/kafka"
	"myservice/internal/event"
)

func main() {
	slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelInfo,
	})))

	ctx, cancel := signal.NotifyContext(context.Background(),
		syscall.SIGINT,
		syscall.SIGTERM,
	)
	defer cancel()

	cfg := appkafka.DefaultConsumerConfig()
	cfg.Topic = "orders"
	cfg.GroupID = "order-processor"

	reader := appkafka.NewReader(cfg)
	defer func() {
		if err := reader.Close(); err != nil {
			slog.Error("error closing reader", "error", err)
		}
		slog.Info("kafka reader closed")
	}()

	processor := event.NewOrderProcessor()

	slog.Info("starting consumer",
		"topic", cfg.Topic,
		"group", cfg.GroupID,
		"brokers", cfg.Brokers,
	)

	if err := appkafka.Consume(ctx, reader, processor.Handle); err != nil {
		slog.Error("consumer error", "error", err)
		os.Exit(1)
	}

	slog.Info("consumer stopped gracefully")
}

signal.NotifyContext is the cleanest way to handle this in Go. It creates a context that cancels automatically when one of the specified signals arrives. When the context is cancelled:

  1. FetchMessage(ctx) returns an error.
  2. The loop detects ctx.Err() != nil and exits.
  3. defer reader.Close() closes the connection with Kafka.
  4. The process terminates cleanly.

In Kubernetes, the process receives SIGTERM when the pod is about to be terminated. You have 30 seconds by default (terminationGracePeriodSeconds) to shut down. With this pattern, shutdown usually takes milliseconds, plus whatever time the in-flight handler needs to finish.

If your processing is slow and you need a worker pool to parallelize messages, the context propagates equally to each goroutine in the pool. When SIGTERM arrives, all goroutines receive the cancellation signal and finish their current work.


Docker Compose with Kafka for Local Development

To develop and test the consumer you need a local Kafka cluster. Docker Compose with KRaft (no Zookeeper) is the simplest option:

# docker-compose.yml
services:
  kafka:
    image: apache/kafka:3.8.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      # Two client listeners: PLAINTEXT for other containers (kafka:19092),
      # PLAINTEXT_HOST for the Go code running on your machine (localhost:9092)
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:19092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
    healthcheck:
      test: ["CMD-SHELL", "/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list"]
      interval: 10s
      timeout: 5s
      retries: 5

  kafka-init:
    image: apache/kafka:3.8.0
    depends_on:
      kafka:
        condition: service_healthy
    entrypoint: ["/bin/sh", "-c"]
    command: |
      "
      /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:19092 --create --if-not-exists --topic orders --partitions 3 --replication-factor 1
      echo 'Topic orders created'
      "

Start it with docker compose up -d and you have a Kafka broker with an orders topic of 3 partitions ready to use.

To produce test messages, a small Go program that you run from the terminal (go run ./cmd/producer) is enough:

// cmd/producer/main.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
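	writer := &kafka.Writer{
		Addr:  kafka.TCP("localhost:9092"),
		Topic: "orders",
		// Hash sends messages with the same key to the same partition,
		// which preserves per-order ordering
		Balancer: &kafka.Hash{},
		// The default BatchTimeout is 1s, which makes each single-message
		// WriteMessages call wait up to a second before flushing
		BatchTimeout: 10 * time.Millisecond,
	}
	defer writer.Close()

	for i := 1; i <= 10; i++ {
		event := map[string]interface{}{
			"order_id":   fmt.Sprintf("ORD-%03d", i),
			"user_id":    fmt.Sprintf("USR-%03d", i),
			"amount":     float64(i) * 29.99,
			"currency":   "EUR",
			"created_at": time.Now().UTC(),
		}

		value, err := json.Marshal(event)
		if err != nil {
			log.Fatalf("encoding event: %v", err)
		}

		err = writer.WriteMessages(context.Background(), kafka.Message{
			Key:   []byte(fmt.Sprintf("ORD-%03d", i)),
			Value: value,
		})
		if err != nil {
			log.Fatalf("error writing message: %v", err)
		}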

		fmt.Printf("sent: %s\n", string(value))
	}
}

Testing with testcontainers

Unit tests are fine for business logic, but a Kafka consumer needs integration tests against a real Kafka. testcontainers-go lets you spin up a Kafka broker in a Docker container inside your tests:

// internal/kafka/consumer_test.go

//go:build integration

package kafka_test

import (
	"context"
	"encoding/json"
	"net"
	"strconv"
	"testing"
	"time"

	"github.com/segmentio/kafka-go"
	tc "github.com/testcontainers/testcontainers-go"
	// Aliased: this package is also named kafka and would clash with kafka-go
	tckafka "github.com/testcontainers/testcontainers-go/modules/kafka"
)

func TestConsumer_ProcessesMessages(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	// Start Kafka in a container
	kafkaContainer, err := tckafka.Run(ctx, "confluentinc/confluent-local:7.5.0",
		tckafka.WithClusterID("test-cluster"),
	)
	if err != nil {
		t.Fatalf("starting kafka container: %v", err)
	}
	defer func() {
		if err := tc.TerminateContainer(kafkaContainer); err != nil {
			t.Logf("terminating container: %v", err)
		}
	}()

	brokers, err := kafkaContainer.Brokers(ctx)
	if err != nil {
		t.Fatalf("getting brokers: %v", err)
	}

	topic := "test-orders"

	// Create the topic explicitly. CreateTopics must reach the controller,
	// so look it up first and dial it.
	conn, err := kafka.Dial("tcp", brokers[0])
	if err != nil {
		t.Fatalf("dialing broker: %v", err)
	}
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		t.Fatalf("finding controller: %v", err)
	}
	controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		t.Fatalf("dialing controller: %v", err)
	}
	defer controllerConn.Close()

	if err := controllerConn.CreateTopics(kafka.TopicConfig{
		Topic:             topic,
		NumPartitions:     1,
		ReplicationFactor: 1,
	}); err != nil {
		t.Fatalf("creating topic: %v", err)
	}

	// Produce a test message
	writer := &kafka.Writer{
		Addr:     kafka.TCP(brokers[0]),
		Topic:    topic,
		Balancer: &kafka.LeastBytes{},
	}

	event := map[string]interface{}{
		"order_id": "ORD-TEST-001",
		"user_id":  "USR-001",
		"amount":   99.99,
	}
	value, _ := json.Marshal(event)

	err = writer.WriteMessages(ctx, kafka.Message{
		Key:   []byte("ORD-TEST-001"),
		Value: value,
	})
	writer.Close()
	if err != nil {
		t.Fatalf("writing message: %v", err)
	}

	// Consume and verify
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  brokers,
		Topic:    topic,
		GroupID:  "test-group",
		MinBytes: 1,
		MaxBytes: 10e6,
	})
	defer reader.Close()

	msg, err := reader.ReadMessage(ctx)
	if err != nil {
		t.Fatalf("reading message: %v", err)
	}

	var received map[string]interface{}
	if err := json.Unmarshal(msg.Value, &received); err != nil {
		t.Fatalf("unmarshaling message: %v", err)
	}

	if received["order_id"] != "ORD-TEST-001" {
		t.Errorf("expected order_id ORD-TEST-001, got %v", received["order_id"])
	}
}

The test spins up a real Kafka container, produces a message, consumes it, and verifies the content. It takes a few seconds to start the container, but gives you real confidence that your consumer works against a real Kafka broker.

To run it:

go test -v -tags=integration ./internal/kafka/...

Put a //go:build integration constraint at the top of the test file: plain go test ./... then skips it, and it only runs when you pass -tags=integration.


Production Considerations

A Kafka consumer that works locally is not the same as one ready for production. These are the points you cannot ignore:

Consumer Lag Monitoring

Consumer lag is the difference between the latest offset of the topic and the committed offset of your consumer group. If lag grows, your consumer isn’t processing messages at the rate they arrive.

Expose metrics from the kafka-go reader:

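func reportMetrics(ctx context.Context, reader *kafka.Reader) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return // stop reporting on shutdown
		case <-ticker.C:
		}

		// Stats returns a snapshot accumulated since the previous call
		stats := reader.Stats()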
		slog.Info("consumer stats",
			"messages", stats.Messages,
			"bytes", stats.Bytes,
			"rebalances", stats.Rebalances,
			"timeouts", stats.Timeouts,
			"errors", stats.Errors,
			"dial_time_avg", stats.DialTime.Avg,
			"read_time_avg", stats.ReadTime.Avg,
			"wait_time_avg", stats.WaitTime.Avg,
			"fetch_size_avg", stats.FetchSize.Avg,
			"fetch_bytes_avg", stats.FetchBytes.Avg,
			"offset", stats.Offset,
			"lag", stats.Lag,
		)
	}
}

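In production, these metrics should go to Prometheus, Datadog, or your monitoring system. stats.Lag is the critical metric: configure an alert if it exceeds a threshold. Since a crashed consumer reports nothing at all, it is also worth measuring lag from the broker side (committed offsets versus log-end offsets per partition), so the alert still fires when no instance is running.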

Horizontal Scaling

To scale consumers, simply launch more instances with the same GroupID. Kafka will automatically rebalance partitions. Remember:

  • Maximum consumers = number of partitions. If you have 6 partitions, don’t scale beyond 6 instances.
  • Rebalancing has a cost: Every time an instance joins or leaves the group, Kafka reassigns partitions. During rebalancing, consumption pauses briefly.
  • In Kubernetes: Use a Deployment (not a StatefulSet). If you autoscale, drive the HPA from consumer lag exposed as an external metric (for example, via KEDA's Kafka scaler).

Idempotency

I repeat it because it’s what’s most often forgotten: your handler must be idempotent. With at-least-once delivery, you will receive duplicate messages. This happens when:

  • The consumer processes a message but crashes before committing.
  • A rebalance reassigns a partition and messages are reprocessed.
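  • The producer retries a send whose acknowledgment was lost and writes the same event twice (unless the producer is configured for idempotence).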

Idempotency strategies:

  • Unique key: Save the order_id (or the natural event identifier) and do an upsert instead of an insert.
  • Deduplication table: Save the IDs of processed events (or their topic, partition, and offset) and check before processing.
  • Idempotent operations by design: SET balance = 100 is idempotent, SET balance = balance + 100 is not.
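The unique-key strategy boils down to "write only if this ID hasn't been seen yet". Here is an in-memory sketch of that idea; orderStore and saveOnce are hypothetical, and in a real service the map would be a database table with a unique constraint (for example, INSERT ... ON CONFLICT DO NOTHING in PostgreSQL), so the check and the write happen atomically and survive restarts:

```go
package main

import (
	"fmt"
	"sync"
)

// orderStore is a hypothetical in-memory stand-in for a table with a unique key on order_id.
type orderStore struct {
	mu     sync.Mutex
	orders map[string]float64
}

func newOrderStore() *orderStore {
	return &orderStore{orders: make(map[string]float64)}
}

// saveOnce stores the order the first time its ID is seen and reports whether
// it was new. A redelivered duplicate becomes a no-op instead of a double write.
func (s *orderStore) saveOnce(orderID string, amount float64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.orders[orderID]; exists {
		return false
	}
	s.orders[orderID] = amount
	return true
}

func main() {
	store := newOrderStore()
	// ORD-001 arrives twice, as can happen with at-least-once delivery.
	for _, id := range []string{"ORD-001", "ORD-001", "ORD-002"} {
		fmt.Println(id, "new:", store.saveOnce(id, 29.99))
	}
	fmt.Println("stored orders:", len(store.orders)) // stored orders: 2
}
```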

Timeouts and Deadlines

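Each message should be processed with a timeout. Since the loop handles one message at a time, a hung handler blocks every partition this instance owns. The timeout only helps if the handler passes ctx down to its I/O calls: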

func handleWithTimeout(ctx context.Context, msg kafka.Message, handler MessageHandler, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	return handler(ctx, msg)
}

A 30-second timeout per message is a good starting point. Adjust according to your case.

Health Checks

In Kubernetes, your consumer needs a health endpoint. You can spin up a minimal HTTP server in parallel:

func startHealthServer(ctx context.Context, port string) {
	mux := http.NewServeMux()
	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("ok"))
	})

	server := &http.Server{Addr: ":" + port, Handler: mux}

	go func() {
		<-ctx.Done()
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		server.Shutdown(shutdownCtx)
	}()

	if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		slog.Error("health server error", "error", err)
	}
}

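Launch it in a goroutine from main() (for example, go startHealthServer(ctx, "8080")) and point the Kubernetes probes at /health. Keep in mind that this endpoint only proves the process is running, not that the consumer is making progress; the lag alert covers that.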


Final Project Structure

Putting it all together, the project looks like this:

kafka-consumer/
├── cmd/
│   ├── consumer/
│   │   └── main.go          # Consumer entry point
│   └── producer/
│       └── main.go          # Test producer
├── internal/
│   ├── event/
│   │   └── order.go         # Order event handler
│   └── kafka/
│       ├── consumer.go       # Configuration and consume loop
│       ├── consumer_test.go  # Integration tests
│       └── errors.go         # Error types
├── docker-compose.yml
├── go.mod
└── go.sum

If you come from an article about workers in Go, you’ll see the structure is almost identical. And it’s no coincidence: a Kafka consumer is, in essence, a worker that receives work from a topic instead of from an in-memory queue.


Kafka Without the JVM Complexity

A Kafka consumer in Go is built with a few pieces that fit together naturally: a segmentio/kafka-go reader with consumer group and manual commit, a consume loop based on FetchMessage + CommitMessages for full offset control, and typed handlers that deserialize the message and execute business logic. The rest is classifying errors between transient and permanent, ensuring graceful shutdown with signal.NotifyContext, and designing idempotent handlers because at-least-once delivery implies duplicates.

Go doesn’t have the JVM's Kafka ecosystem. There’s no Kafka Streams, no Spring Kafka with its 47 annotations. But for backend consumers (which are the majority of cases), what it offers is sufficient and considerably simpler: a binary of around 15 MB, a few tens of MB of RAM, startup in milliseconds, and native concurrency to parallelize processing.

What I like about this approach is that the consumer ends up being just another worker. The difference is where the work comes from, not how you process it. And that’s exactly what you want: reusable patterns that adapt to the transport without rewriting the logic.

If you need to process Kafka messages with real concurrency, combine what you’ve seen here with a worker pool: the reader feeds a channel, the pool processes messages in parallel, and the commit is made when all messages in the batch are processed.

OshyTech

Backend and data engineering focused on scalable systems, automation, and AI.

Copyright 2026 OshyTech. All Rights Reserved