Vamos usar a biblioteca segmentio/kafka-go — é idiomática e fácil de ler.
bash
go get github.com/segmentio/kafka-goProducer Simples
go
package main
import (
"context"
"encoding/json"
"log"
"time"
"github.com/segmentio/kafka-go"
)
type OrderCreated struct {
OrderID string `json:"order_id"`
Customer string `json:"customer"`
Amount float64 `json:"amount"`
}
func main() {
// Cria um writer (producer)
writer := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "pedidos",
Balancer: &kafka.Hash{}, // usa hash da Key pra escolher partição
RequiredAcks: kafka.RequireAll, // acks=all → at-least-once
}
defer writer.Close()
order := OrderCreated{
OrderID: "pedido-123",
Customer: "joao@email.com",
Amount: 250.00,
}
payload, err := json.Marshal(order)
if err != nil {
log.Fatal("erro ao serializar:", err)
}
msg := kafka.Message{
Key: []byte(order.OrderID), // chave garante ordem
Value: payload,
Time: time.Now(),
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := writer.WriteMessages(ctx, msg); err != nil {
log.Fatal("erro ao publicar:", err)
}
log.Printf("pedido publicado: %s", order.OrderID)
}Pontos a destacar:
RequiredAcks: kafka.RequireAll→ garante que todas as réplicas receberam.Key: []byte(order.OrderID)→ todos os eventos do mesmo pedido vão pra mesma partição.Balancer: &kafka.Hash{}→ usa hash da key.
Consumer Simples
go
package main
import (
"context"
"encoding/json"
"log"
"github.com/segmentio/kafka-go"
)
type OrderCreated struct {
OrderID string `json:"order_id"`
Customer string `json:"customer"`
Amount float64 `json:"amount"`
}
func main() {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "pedidos",
GroupID: "faturamento", // consumer group
MinBytes: 1,
MaxBytes: 10e6,
})
defer reader.Close()
log.Println("consumer iniciado, aguardando mensagens...")
for {
// Lê mensagem (NÃO comita offset automaticamente)
msg, err := reader.FetchMessage(context.Background())
if err != nil {
log.Printf("erro ao ler: %v", err)
continue
}
var order OrderCreated
if err := json.Unmarshal(msg.Value, &order); err != nil {
log.Printf("payload inválido, vai pra DLQ: %v", err)
// (na vida real: enviaria pra DLQ aqui)
// mesmo assim comita pra não travar
reader.CommitMessages(context.Background(), msg)
continue
}
// Processa
if err := processOrder(order); err != nil {
log.Printf("erro ao processar pedido %s: %v", order.OrderID, err)
// não comita → próxima execução vai retentar
continue
}
// Só comita depois de processar com sucesso
if err := reader.CommitMessages(context.Background(), msg); err != nil {
log.Printf("erro ao comitar: %v", err)
}
log.Printf("processado: %s (offset=%d, partition=%d)",
order.OrderID, msg.Offset, msg.Partition)
}
}
func processOrder(p OrderCreated) error {
// sua lógica aqui (gerar nota, atualizar banco, etc)
log.Printf("processando pedido %s do cliente %s, valor R$%.2f",
p.OrderID, p.Customer, p.Amount)
return nil
}Pontos críticos:
- Usamos
FetchMessage+CommitMessagesem vez deReadMessage(que comita automaticamente). Isso nos dá controle. - Comitamos só após processar com sucesso → at-least-once.
- Se
processOrderfalhar, não comitamos → na próxima leitura, voltamos pra mesma mensagem.