Processamento assíncrono com filas em Go: RabbitMQ, Kafka e NATS na prática

Toda API que escala encontra o mesmo problema: tarefas demoradas travam a resposta. Enviar e-mail, processar pagamento, gerar relatório, notificar outros serviços — nenhum cliente quer esperar segundos por isso. A solução é o processamento assíncrono via filas (message queues).

Em Go, a combinação é especialmente poderosa: goroutines leves consomem mensagens de forma concorrente, o broker cuida da persistência e entrega, e sua API principal responde em milissegundos. Mas qual broker escolher? RabbitMQ, Kafka ou NATS? E como garantir que cada mensagem seja processada exatamente uma vez, mesmo com falhas?

Neste post, vamos comparar os três principais brokers no ecossistema Go, mostrar implementações práticas e as armadilhas que você precisa evitar — com casos reais da Jacobus Software.

Por que usar filas em Go?

Antes de mergulhar nos brokers, vamos entender o valor:

  • Desacoplamento: o produtor não precisa saber quem consome.
  • Resiliência: se o consumidor cair, a mensagem fica na fila.
  • Elasticidade: aumente consumidores Go sob demanda.
  • Flow control: evite que picos de tráfego derrubem seus serviços.

Um exemplo clássico: usuário faz checkout, sistema publica evento PedidoCriado, e múltiplos consumidores (estoque, pagamento, notificação, logística) processam em paralelo. A API de checkout responde em 50ms, não em 5 segundos.

Comparativo: RabbitMQ vs Kafka vs NATS

CaracterísticaRabbitMQKafkaNATS (com JetStream)
ArquiteturaBroker inteligente (roteamento complexo)Log distribuído (particionado)Broker leve + JetStream para persistência
Garantia de entregaAt-most-once / At-least-onceAt-least-once (exactly-once com idempotência)At-least-once / Exactly-once (com idempotência)
PersistênciaSim (configurável)Sim (por padrão)Sim (JetStream)
RoteamentoExchanges, bindings, topicsTopics (particionados)Subjects (wildcard > e *)
ConsumoPush / PullPull (consumidores com grupo)Push / Pull (JetStream)
Ordem de mensagensGarantida por filaGarantida por partiçãoGarantida por stream
Throughput~50k msg/s~100k+ msg/sAlto ( > 100k msg/s)
Latência (P99)Baixa (~ms)Baixa (~ms)Muito baixa (sub-ms)
Complexidade operacionalMédiaAltaBaixa
Go client madurostreadway/amqpconfluent-kafka-go ou segmentio/kafka-gonats.go (oficial)

Quando escolher cada um?

  • NATS: Performance máxima, latência mínima, simplicidade. Para comunicação intra-serviço e IoT. NATS JetStream adiciona persistência Kafka-like. “NATS wins on speed and simplicity”.
  • RabbitMQ: Flexibilidade de roteamento e confiabilidade. “RabbitMQ excels at reliability and complex routing”. Para fluxos empresariais e mensageria entre sistemas heterogêneos.
  • Kafka: Escala massiva e replay histórico. “Kafka dominates in scalability and data streaming”. Para event sourcing, pipelines de dados e black friday.

Implementação prática em Go

1. NATS (recomendado da Jacobus para baixa latência)

NATS é escrito em Go, tem API nativa e é perfeito para microsserviços Go.

Produtor:

package main

import (
    "log"
    "github.com/nats-io/nats.go"
)

type OrderEvent struct {
    OrderID string `json:"order_id"`
    Status  string `json:"status"`
}

func main() {
    // Conecta ao NATS (pode ser embedado no binário Go)
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // Publica mensagem
    data := []byte(`{"order_id":"123","status":"created"}`)
    err = nc.Publish("orders.created", data)
    if err != nil {
        log.Fatal(err)
    }
    log.Println("Mensagem publicada")
}

Consumidor com JetStream (persistente e tolerante a falhas):

package main

import (
    "log"
    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    js, err := nc.JetStream(nats.PublishAsync)
    if err != nil {
        log.Fatal(err)
    }

    // Cria stream (como tópico Kafka)
    _, err = js.AddStream(&nats.StreamConfig{
        Name:     "ORDERS",
        Subjects: []string{"orders.*"},
        Storage:  nats.FileStorage, // persistente
    })
    if err != nil {
        log.Println("Stream pode já existir")
    }

    // Consome mensagens
    sub, err := js.Subscribe("orders.*", func(msg *nats.Msg) {
        log.Printf("Processando: %s", string(msg.Data))
        // Processamento idempotente aqui
        msg.Ack() // confirma processamento
    }, nats.ManualAck())
    if err != nil {
        log.Fatal(err)
    }
    defer sub.Unsubscribe()

    select {} // mantém rodando
}

2. RabbitMQ (roteamento flexível)

Para cenários que exigem roteamento complexo (exchanges, bindings, tópicos dinâmicos).

Produtor:

package main

import (
    "log"
    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err)
    }
    defer ch.Close()

    // Declara fila
    q, err := ch.QueueDeclare("orders", true, false, false, false, nil)
    if err != nil {
        log.Fatal(err)
    }

    // Publica mensagem
    err = ch.Publish("", q.Name, false, false, amqp.Publishing{
        ContentType: "application/json",
        Body:        []byte(`{"order_id":"123"}`),
    })
    if err != nil {
        log.Fatal(err)
    }
}

Consumidor com worker pool em Go:

func consumeWithWorkerPool(conn *amqp.Connection, queueName string, workers int) {
    ch, _ := conn.Channel()
    ch.Qos(1, 0, false) // prefetch = 1 (fair dispatch)

    msgs, _ := ch.Consume(queueName, "", false, false, false, false, nil)

    // Worker pool com goroutines
    for i := 0; i < workers; i++ {
        go func(workerID int) {
            for msg := range msgs {
                log.Printf("Worker %d processando: %s", workerID, msg.Body)
                // processa...
                msg.Ack(false) // confirma manual
            }
        }(i)
    }
    select {}
}

3. Kafka (alta throughput)

Para pipelines de dados, event sourcing e cenários de replay.

Produtor:

package main

import (
    "log"
    "github.com/segmentio/kafka-go"
    "context"
)

func main() {
    writer := kafka.NewWriter(kafka.WriterConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "orders",
    })
    defer writer.Close()

    err := writer.WriteMessages(context.Background(),
        kafka.Message{
            Key:   []byte("order_123"),
            Value: []byte(`{"order_id":"123","status":"created"}`),
        },
    )
    if err != nil {
        log.Fatal(err)
    }
}

Consumidor com grupo de consumidores:

reader := kafka.NewReader(kafka.ReaderConfig{
    Brokers: []string{"localhost:9092"},
    Topic:   "orders",
    GroupID: "order-processor",
})

for {
    msg, err := reader.ReadMessage(context.Background())
    if err != nil {
        log.Printf("erro: %v", err)
        continue
    }
    log.Printf("Mensagem: %s", string(msg.Value))
    // processa de forma idempotente
    // reader.CommitMessages(...) após processar
}

Padrões de consumo para Go

1. Worker Pool com canais

Combine goroutines com canais para escalar consumo de forma controlada:

type WorkerPool struct {
    taskQueue chan Message
    workers   int
}

func (wp *WorkerPool) Start() {
    for i := 0; i < wp.workers; i++ {
        go wp.worker()
    }
}

func (wp *WorkerPool) worker() {
    for msg := range wp.taskQueue {
        wp.processMessage(msg)
    }
}

2. Backpressure e rate limiting

Evite sobrecarregar serviços downstream com time.Ticker ou semáforos:

limiter := time.Tick(100 * time.Millisecond) // 10 msg/s
for msg := range messages {
    <-limiter
    go process(msg)
}

3. Graceful shutdown

Em Go, use context e sync.WaitGroup para drenar mensagens antes de desligar:

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()

var wg sync.WaitGroup
for i := 0; i < workers; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        for msg := range consumer.Messages() {
            select {
            case <-ctx.Done():
                consumer.Close()
                return
            default:
                process(msg)
            }
        }
    }()
}
<-ctx.Done()
wg.Wait() // espera workers terminarem

Idempotência: o guardião contra duplicação

A maioria dos brokers entrega mensagens “pelo menos uma vez” (at-least-once). Isso significa que, em caso de falhas ou reinicializações, a mesma mensagem pode ser entregue múltiplas vezes.

Para uma operação financeira, como uma cobrança, uma duplicata pode ser catastrófica. Processar um pedido de estoque duas vezes gera problemas sérios. A solução é garantir que o processamento seja idempotente: processar a mesma mensagem duas vezes tenha o mesmo efeito que processá-la uma única vez.

Estratégias de idempotência em Go

1. ID único de mensagem + Redis SETNX

Cada mensagem carrega um ID único. Antes de processar, verifique se ele já foi processado:

import "github.com/redis/go-redis/v9"

func processWithIdempotency(ctx context.Context, msg Message, processFunc func() error) error {
    key := "processed:" + msg.ID

    // Tenta marcar como processado (apenas se não existir)
    ok, err := rdb.SetNX(ctx, key, "1", 24*time.Hour).Result()
    if err != nil {
        return err
    }
    if !ok {
        // Já foi processado antes
        log.Printf("Mensagem %s já processada, ignorando", msg.ID)
        return nil
    }

    // Primeira vez: processa
    return processFunc()
}

2. Tabela de idempotência no PostgreSQL (para atomicidade com transações)

Para operações que escrevem no banco, use uma tabela de idempotência na mesma transação:

func processOrder(ctx context.Context, tx *sql.Tx, msg OrderMessage) error {
    // Tabela idempotency_keys com UNIQUE(idempotency_key)
    _, err := tx.ExecContext(ctx,
        "INSERT INTO idempotency_keys (key, created_at) VALUES ($1, NOW())",
        msg.IdempotencyKey,
    )
    if err != nil {
        if isUniqueViolation(err) {
            return nil // já processado
        }
        return err
    }

    // Atualiza pedido (mesma transação)
    _, err = tx.ExecContext(ctx,
        "UPDATE orders SET status = $1 WHERE id = $2",
        msg.Status, msg.OrderID,
    )
    return err
}

3. Idempotência na lógica de negócio

Às vezes, a própria lógica já é idempotente: definir um status para “pago” ou marcar “enviado” não causa dano em repetição.

Watermill: uma abstração unificada

Implementar cada broker diretamente é trabalhoso. Watermill é uma biblioteca Go open-source que abstrai a complexidade dos principais brokers, oferecendo uma API unificada. Ela suporta 13 Pub/Subs, incluindo Kafka, Redis Streams, NATS, RabbitMQ e Google Pub/Sub.

import "github.com/ThreeDotsLabs/watermill"

pubSub, _ := googlecloud.NewPublisher(googlecloud.PublisherConfig{...})
// o mesmo código funciona com Kafka, RabbitMQ, NATS...

Caso real: processamento de pedidos com NATS na Jacobus

Um cliente do varejo online enfrentava picos de tráfego 10x maiores durante promoções. O checkout síncrono travava, gerando abandono de carrinho.

Solução com NATS + Go:

  1. API de checkout publica evento OrderCreated no NATS JetStream.
  2. Três consumidores Go processam em paralelo:
    • Pagamento (idempotente via Redis SETNX)
    • Estoque (worker pool de 50 goroutines)
    • Notificação (envio de e-mail assíncrono)

Resultado: checkout responde em 50ms (antes 2s). Zero perda de mensagens mesmo com pico de 50k pedidos/minuto. Migração concluída em 2 semanas.

Erros comuns e como evitá-los

Ack antes de processar: se o processamento falhar, a mensagem é perdida. Sempre confirme (Ack) após o processamento bem-sucedido.

Message fetch size muito alto: consumidores Go podem travar ao baixar mensagens grandes. Use prefetch limitado (ch.Qos(1, 0, false) no RabbitMQ).

Goroutines infinitas sem controle: para cada mensagem, uma goroutine pode vazar. Use worker pool.

Ignorar dead-letter queues: mensagens que falham repetidamente devem ir para uma fila de dead-letter para análise posterior.

Conclusão: escolha o broker certo, implemente com Go

Cada broker tem seu lugar:

  • NATS + JetStream: a escolha padrão para novos projetos na Jacobus. Performance, simplicidade e persistência.
  • RabbitMQ: roteamento complexo e sistemas empresariais.
  • Kafka: pipelines massivos e event sourcing.

Go, com suas goroutines e ecossistema maduro, é a linguagem ideal para consumir filas de forma concorrente e eficiente. Combine com idempotência e worker pools, e seu sistema estará pronto para escalar sem sustos.

Na Jacobus Software, usamos NATS como espinha dorsal de comunicação assíncrona em Go. O resultado: sistemas que processam milhões de eventos por dia com latência de milissegundos e operação simples.


📨 Quer processamento assíncrono robusto no seu sistema Go?

Projetamos arquiteturas com RabbitMQ, Kafka ou NATS que desacoplam seus serviços e garantem resiliência. Fale com a Jacobus.

👉 Fale com a Jacobus Software

Rolar para cima