M03·07Projeto Guiado: Serviço A → Kafka → Serviço B

CAPÍTULO 07

Projeto Guiado: Serviço A → Kafka → Serviço B

Dois serviços conectados via Kafka com retry exponencial, DLQ e docker-compose para rodar localmente.

Por Thiago Souza14 min de leituraAtualizado em 2026-05

Vamos construir um sistema realista:

  • Serviço A (Orders): recebe um pedido via API e publica OrderCreated no Kafka.
  • Serviço B (Notifications): consome OrderCreated e envia e-mail (simulado).
  • Implementaremos: retry com backoff e DLQ.

Arquitetura

flowchart LR
  C[Cliente] -->|"POST"| A["Serviço A<br/>(Pedidos)"]
  A -->|"publish<br/>Topic: pedidos"| K[Kafka]
  K -->|"subscribe"| B["Serviço B<br/>(Notificações)"]
  B -->|"falhou?"| DLQ["Topic: pedidos.dlq"]

Serviço A — Producer

go
package main
 
import (
    "context"
    "encoding/json"
    "log"
    "net/http"
    "time"
 
    "github.com/google/uuid"
    "github.com/segmentio/kafka-go"
)
 
type Order struct {
    OrderID   string    `json:"order_id"`
    Customer  string    `json:"customer"`
    Email     string    `json:"email"`
    Amount    float64   `json:"amount"`
    CreatedAt time.Time `json:"created_at"`
}
 
var writer *kafka.Writer
 
func main() {
    writer = &kafka.Writer{
        Addr:         kafka.TCP("localhost:9092"),
        Topic:        "pedidos",
        Balancer:     &kafka.Hash{},
        RequiredAcks: kafka.RequireAll,
    }
    defer writer.Close()
 
    http.HandleFunc("/orders", createOrder)
    log.Println("Serviço A rodando em :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}
 
func createOrder(w http.ResponseWriter, r *http.Request) {
    var p Order
    if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
        http.Error(w, "invalid payload", 400)
        return
    }
    p.OrderID = uuid.New().String()
    p.CreatedAt = time.Now()
 
    payload, _ := json.Marshal(p)
 
    msg := kafka.Message{
        Key:   []byte(p.OrderID),
        Value: payload,
        Headers: []kafka.Header{
            {Key: "event-type", Value: []byte("OrderCreated")},
            {Key: "event-id", Value: []byte(uuid.New().String())},
        },
    }
 
    ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
    defer cancel()
 
    if err := writer.WriteMessages(ctx, msg); err != nil {
        log.Printf("erro ao publicar: %v", err)
        http.Error(w, "erro interno", 500)
        return
    }
 
    w.WriteHeader(201)
    json.NewEncoder(w).Encode(p)
    log.Printf("pedido %s publicado", p.OrderID)
}

Serviço B — Consumer com Retry e DLQ

go
package main
 
import (
    "context"
    "encoding/json"
    "errors"
    "log"
    "math"
    "math/rand"
    "time"
 
    "github.com/segmentio/kafka-go"
)
 
type Order struct {
    OrderID  string  `json:"order_id"`
    Customer string  `json:"customer"`
    Email    string  `json:"email"`
    Amount   float64 `json:"amount"`
}
 
const (
    maxRetries = 3
)
 
var dlqWriter *kafka.Writer
 
func main() {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "pedidos",
        GroupID: "notificacoes",
    })
    defer reader.Close()
 
    dlqWriter = &kafka.Writer{
        Addr:         kafka.TCP("localhost:9092"),
        Topic:        "pedidos.dlq",
        RequiredAcks: kafka.RequireAll,
    }
    defer dlqWriter.Close()
 
    log.Println("Serviço B (notificações) iniciado")
 
    for {
        msg, err := reader.FetchMessage(context.Background())
        if err != nil {
            log.Printf("erro ao ler: %v", err)
            continue
        }
 
        if err := processWithRetry(msg); err != nil {
            // todas as tentativas falharam → DLQ
            log.Printf("esgotaram retries, enviando pra DLQ: %v", err)
            if dlqErr := enviarParaDLQ(msg, err); dlqErr != nil {
                log.Printf("falha ao enviar pra DLQ: %v", dlqErr)
                // NÃO comita: vai tentar de novo na próxima execução
                continue
            }
        }
 
        // Comita offset (sucesso OU enviou pra DLQ)
        if err := reader.CommitMessages(context.Background(), msg); err != nil {
            log.Printf("erro ao comitar: %v", err)
        }
    }
}
 
// processWithRetry tenta processar com backoff exponencial + jitter
func processWithRetry(msg kafka.Message) error {
    var lastErr error
 
    for tentativa := 0; tentativa < maxRetries; tentativa++ {
        if tentativa > 0 {
            delay := backoffComJitter(tentativa)
            log.Printf("tentativa %d/%d em %v", tentativa+1, maxRetries, delay)
            time.Sleep(delay)
        }
 
        err := processMessage(msg)
        if err == nil {
            return nil // sucesso
        }
 
        // Se for erro permanente, não vale a pena retentar
        if errors.Is(err, ErrPayloadInvalido) {
            return err
        }
 
        lastErr = err
        log.Printf("falhou tentativa %d: %v", tentativa+1, err)
    }
 
    return lastErr
}
 
// backoffComJitter: 1s, 2s, 4s, 8s... + aleatoriedade
func backoffComJitter(tentativa int) time.Duration {
    base := math.Pow(2, float64(tentativa-1))
    jitter := rand.Float64() // 0 a 1s
    return time.Duration((base + jitter) * float64(time.Second))
}
 
var ErrPayloadInvalido = errors.New("payload inválido")
 
func processMessage(msg kafka.Message) error {
    var p Order
    if err := json.Unmarshal(msg.Value, &p); err != nil {
        return ErrPayloadInvalido
    }
 
    // Aqui entraria a lógica real (enviar e-mail via SMTP, etc)
    return enviarEmail(p)
}
 
func enviarEmail(p Order) error {
    // simulação: 30% das vezes "falha"
    if rand.Float64() < 0.3 {
        return errors.New("smtp temporariamente indisponível")
    }
    log.Printf("e-mail enviado pra %s sobre pedido %s", p.Email, p.OrderID)
    return nil
}
 
func enviarParaDLQ(msg kafka.Message, processErr error) error {
    headers := append(msg.Headers,
        kafka.Header{Key: "x-error", Value: []byte(processErr.Error())},
        kafka.Header{Key: "x-original-topic", Value: []byte("pedidos")},
        kafka.Header{Key: "x-failed-at", Value: []byte(time.Now().Format(time.RFC3339))},
    )
 
    dlqMsg := kafka.Message{
        Key:     msg.Key,
        Value:   msg.Value,
        Headers: headers,
    }
 
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    return dlqWriter.WriteMessages(ctx, dlqMsg)
}

Como Rodar Localmente

docker-compose.yml (Kafka em modo KRaft, sem ZooKeeper):

yaml
version: '3.8'
services:
  kafka:
    image: bitnami/kafka:3.6
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
bash
docker compose up -d
 
# Cria os topics
docker exec -it kafka kafka-topics.sh --create --topic pedidos --partitions 3 --bootstrap-server localhost:9092
docker exec -it kafka kafka-topics.sh --create --topic pedidos.dlq --partitions 1 --bootstrap-server localhost:9092
 
# Em terminais separados:
go run servico-a/main.go
go run servico-b/main.go
 
# Manda um pedido
curl -X POST http://localhost:8080/pedidos \
  -H 'Content-Type: application/json' \
  -d '{"customer":"João","email":"joao@email.com","amount":99.90}'