M03·06Exemplos Práticos com Golang

CAPÍTULO 06

Exemplos Práticos com Golang

Producer e consumer completos com kafka-go, com controle manual de offset para garantia at-least-once.

Por Thiago Souza8 min de leituraAtualizado em 2026-05

Vamos usar a biblioteca segmentio/kafka-go — é idiomática e fácil de ler.

bash
go get github.com/segmentio/kafka-go

Producer Simples

go
package main
 
import (
    "context"
    "encoding/json"
    "log"
    "time"
 
    "github.com/segmentio/kafka-go"
)
 
type OrderCreated struct {
    OrderID  string  `json:"order_id"`
    Customer string  `json:"customer"`
    Amount   float64 `json:"amount"`
}
 
func main() {
    // Cria um writer (producer)
    writer := &kafka.Writer{
        Addr:         kafka.TCP("localhost:9092"),
        Topic:        "pedidos",
        Balancer:     &kafka.Hash{}, // usa hash da Key pra escolher partição
        RequiredAcks: kafka.RequireAll, // acks=all → at-least-once
    }
    defer writer.Close()
 
    order := OrderCreated{
        OrderID:  "pedido-123",
        Customer: "joao@email.com",
        Amount:   250.00,
    }
 
    payload, err := json.Marshal(order)
    if err != nil {
        log.Fatal("erro ao serializar:", err)
    }
 
    msg := kafka.Message{
        Key:   []byte(order.OrderID), // chave garante ordem
        Value: payload,
        Time:  time.Now(),
    }
 
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
 
    if err := writer.WriteMessages(ctx, msg); err != nil {
        log.Fatal("erro ao publicar:", err)
    }
 
    log.Printf("pedido publicado: %s", order.OrderID)
}

Pontos a destacar:

  • RequiredAcks: kafka.RequireAll → garante que todas as réplicas receberam.
  • Key: []byte(order.OrderID) → todos os eventos do mesmo pedido vão pra mesma partição.
  • Balancer: &kafka.Hash{} → usa hash da key.

Consumer Simples

go
package main
 
import (
    "context"
    "encoding/json"
    "log"
 
    "github.com/segmentio/kafka-go"
)
 
type OrderCreated struct {
    OrderID  string  `json:"order_id"`
    Customer string  `json:"customer"`
    Amount   float64 `json:"amount"`
}
 
func main() {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  []string{"localhost:9092"},
        Topic:    "pedidos",
        GroupID:  "faturamento", // consumer group
        MinBytes: 1,
        MaxBytes: 10e6,
    })
    defer reader.Close()
 
    log.Println("consumer iniciado, aguardando mensagens...")
 
    for {
        // Lê mensagem (NÃO comita offset automaticamente)
        msg, err := reader.FetchMessage(context.Background())
        if err != nil {
            log.Printf("erro ao ler: %v", err)
            continue
        }
 
        var order OrderCreated
        if err := json.Unmarshal(msg.Value, &order); err != nil {
            log.Printf("payload inválido, vai pra DLQ: %v", err)
            // (na vida real: enviaria pra DLQ aqui)
            // mesmo assim comita pra não travar
            reader.CommitMessages(context.Background(), msg)
            continue
        }
 
        // Processa
        if err := processOrder(order); err != nil {
            log.Printf("erro ao processar pedido %s: %v", order.OrderID, err)
            // não comita → próxima execução vai retentar
            continue
        }
 
        // Só comita depois de processar com sucesso
        if err := reader.CommitMessages(context.Background(), msg); err != nil {
            log.Printf("erro ao comitar: %v", err)
        }
 
        log.Printf("processado: %s (offset=%d, partition=%d)",
            order.OrderID, msg.Offset, msg.Partition)
    }
}
 
func processOrder(p OrderCreated) error {
    // sua lógica aqui (gerar nota, atualizar banco, etc)
    log.Printf("processando pedido %s do cliente %s, valor R$%.2f",
        p.OrderID, p.Customer, p.Amount)
    return nil
}

Pontos críticos:

  • Usamos FetchMessage + CommitMessages em vez de ReadMessage (que comita automaticamente). Isso nos dá controle.
  • Comitamos só após processar com sucesso → at-least-once.
  • Se processOrder falhar, não comitamos → na próxima leitura, voltamos pra mesma mensagem.