Vamos construir um sistema realista:
- Serviço A (Orders): recebe um pedido via API e publica
OrderCreatedno Kafka. - Serviço B (Notifications): consome
OrderCreatede envia e-mail (simulado). - Implementaremos: retry com backoff e DLQ.
Arquitetura
flowchart LR C[Cliente] -->|"POST"| A["Serviço A<br/>(Pedidos)"] A -->|"publish<br/>Topic: pedidos"| K[Kafka] K -->|"subscribe"| B["Serviço B<br/>(Notificações)"] B -->|"falhou?"| DLQ["Topic: pedidos.dlq"]
Serviço A — Producer
go
package main
import (
"context"
"encoding/json"
"log"
"net/http"
"time"
"github.com/google/uuid"
"github.com/segmentio/kafka-go"
)
type Order struct {
OrderID string `json:"order_id"`
Customer string `json:"customer"`
Email string `json:"email"`
Amount float64 `json:"amount"`
CreatedAt time.Time `json:"created_at"`
}
var writer *kafka.Writer
func main() {
writer = &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "pedidos",
Balancer: &kafka.Hash{},
RequiredAcks: kafka.RequireAll,
}
defer writer.Close()
http.HandleFunc("/orders", createOrder)
log.Println("Serviço A rodando em :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
func createOrder(w http.ResponseWriter, r *http.Request) {
var p Order
if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
http.Error(w, "invalid payload", 400)
return
}
p.OrderID = uuid.New().String()
p.CreatedAt = time.Now()
payload, _ := json.Marshal(p)
msg := kafka.Message{
Key: []byte(p.OrderID),
Value: payload,
Headers: []kafka.Header{
{Key: "event-type", Value: []byte("OrderCreated")},
{Key: "event-id", Value: []byte(uuid.New().String())},
},
}
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
if err := writer.WriteMessages(ctx, msg); err != nil {
log.Printf("erro ao publicar: %v", err)
http.Error(w, "erro interno", 500)
return
}
w.WriteHeader(201)
json.NewEncoder(w).Encode(p)
log.Printf("pedido %s publicado", p.OrderID)
}Serviço B — Consumer com Retry e DLQ
go
package main
import (
"context"
"encoding/json"
"errors"
"log"
"math"
"math/rand"
"time"
"github.com/segmentio/kafka-go"
)
type Order struct {
OrderID string `json:"order_id"`
Customer string `json:"customer"`
Email string `json:"email"`
Amount float64 `json:"amount"`
}
const (
maxRetries = 3
)
var dlqWriter *kafka.Writer
func main() {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "pedidos",
GroupID: "notificacoes",
})
defer reader.Close()
dlqWriter = &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "pedidos.dlq",
RequiredAcks: kafka.RequireAll,
}
defer dlqWriter.Close()
log.Println("Serviço B (notificações) iniciado")
for {
msg, err := reader.FetchMessage(context.Background())
if err != nil {
log.Printf("erro ao ler: %v", err)
continue
}
if err := processWithRetry(msg); err != nil {
// todas as tentativas falharam → DLQ
log.Printf("esgotaram retries, enviando pra DLQ: %v", err)
if dlqErr := enviarParaDLQ(msg, err); dlqErr != nil {
log.Printf("falha ao enviar pra DLQ: %v", dlqErr)
// NÃO comita: vai tentar de novo na próxima execução
continue
}
}
// Comita offset (sucesso OU enviou pra DLQ)
if err := reader.CommitMessages(context.Background(), msg); err != nil {
log.Printf("erro ao comitar: %v", err)
}
}
}
// processWithRetry tenta processar com backoff exponencial + jitter
func processWithRetry(msg kafka.Message) error {
var lastErr error
for tentativa := 0; tentativa < maxRetries; tentativa++ {
if tentativa > 0 {
delay := backoffComJitter(tentativa)
log.Printf("tentativa %d/%d em %v", tentativa+1, maxRetries, delay)
time.Sleep(delay)
}
err := processMessage(msg)
if err == nil {
return nil // sucesso
}
// Se for erro permanente, não vale a pena retentar
if errors.Is(err, ErrPayloadInvalido) {
return err
}
lastErr = err
log.Printf("falhou tentativa %d: %v", tentativa+1, err)
}
return lastErr
}
// backoffComJitter: 1s, 2s, 4s, 8s... + aleatoriedade
func backoffComJitter(tentativa int) time.Duration {
base := math.Pow(2, float64(tentativa-1))
jitter := rand.Float64() // 0 a 1s
return time.Duration((base + jitter) * float64(time.Second))
}
var ErrPayloadInvalido = errors.New("payload inválido")
func processMessage(msg kafka.Message) error {
var p Order
if err := json.Unmarshal(msg.Value, &p); err != nil {
return ErrPayloadInvalido
}
// Aqui entraria a lógica real (enviar e-mail via SMTP, etc)
return enviarEmail(p)
}
func enviarEmail(p Order) error {
// simulação: 30% das vezes "falha"
if rand.Float64() < 0.3 {
return errors.New("smtp temporariamente indisponível")
}
log.Printf("e-mail enviado pra %s sobre pedido %s", p.Email, p.OrderID)
return nil
}
func enviarParaDLQ(msg kafka.Message, processErr error) error {
headers := append(msg.Headers,
kafka.Header{Key: "x-error", Value: []byte(processErr.Error())},
kafka.Header{Key: "x-original-topic", Value: []byte("pedidos")},
kafka.Header{Key: "x-failed-at", Value: []byte(time.Now().Format(time.RFC3339))},
)
dlqMsg := kafka.Message{
Key: msg.Key,
Value: msg.Value,
Headers: headers,
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return dlqWriter.WriteMessages(ctx, dlqMsg)
}Como Rodar Localmente
docker-compose.yml (Kafka em modo KRaft, sem ZooKeeper):
yaml
version: '3.8'
services:
kafka:
image: bitnami/kafka:3.6
ports:
- "9092:9092"
environment:
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXTbash
docker compose up -d
# Cria os topics
docker exec -it kafka kafka-topics.sh --create --topic pedidos --partitions 3 --bootstrap-server localhost:9092
docker exec -it kafka kafka-topics.sh --create --topic pedidos.dlq --partitions 1 --bootstrap-server localhost:9092
# Em terminais separados:
go run servico-a/main.go
go run servico-b/main.go
# Manda um pedido
curl -X POST http://localhost:8080/pedidos \
-H 'Content-Type: application/json' \
-d '{"customer":"João","email":"joao@email.com","amount":99.90}'