Como ultilizar o Ack do RabbitMQ de forma eficiente.

RMAG news

A Motivação por Trás Deste Artigo

Recentemente, participei de um debate no ambiente de trabalho sobre como lidar com o reconhecimento da leitura de mensagens no RabbitMQ.

Alguns colegas argumentaram que baseados em experiências anteriores a melhor abordagem para essa aplicação seria usar o auto-ack. Eles relataram que, ao utilizar o ack manual, um erro inesperado ocorreu durante o processamento da mensagem, resultando na sua volta para a fila e em seu reprocessamento várias vezes.

Inspirado por essa discussão e por este post do blog de Luiz Carlos Faria, decidi escrever sobre por que provavelmente esse comportamento ocorreu e qual, na minha visão, seria a melhor maneira de lidar com o reconhecimento de leitura dessas mensagens e o tratamento adequado em caso de erros.

Para chegar ao assunto principal, preciso explicar o que é, e o que o RabbitMQ nos oferce.

O RabbitMQ é um poderoso message broker que facilita a comunicação entre diferentes partes de um software. O RabbitMQ oferece diversos recursos para lidar com a publicação e o recebimento de mensagens. Os principais recursos abordados neste artigo são:

Queues
Exchanges
Acknowledgement
Dead Letter Exchange

Queues:

A principal função de uma fila é evitar a execução imediata de uma tarefa que consome muitos recursos e evitar a necessidade de esperar que ela seja concluída.

No modelo de mensagens do RabbitMQ, o produtor (producer) nunca envia uma mensagem diretamente para uma fila.

Para uma mensagem chegar a uma fila no RabbitMQ, ela precisa passar por um recurso chamado Exchange. Mesmo ao enviar uma mensagem para uma fila sem definir uma exchange, como mostrado no exemplo abaixo, a mensagem passará por uma exchange conhecida como default exchange.

package main

func main() {
conn, _ := amqp.Dial(“amqp://guest:guest@localhost:5672/”)
defer conn.Close()
ch, _ := conn.Channel()
defer ch.Close()
queue, _ := ch.QueueDeclare(
“TransactionCompleted”,
true,
false,
false,
false,
nil,
)
message := message{
ID: “1”,
}
payload, _ := json.Marshal(message)
ch.Publish(
“”,
queue.Name,
false,
false,
amqp.Publishing{
ContentType: “text/plain”,
Body: []byte(payload),
},
)
}

Exchanges:

As exchanges são um recurso que tem como propósito gerenciar e direcionar as mensagens publicadas no RabbitMQ.

Existem alguns tipos de exchanges disponíveis: direct, topic, headers e fanout. Caso queira saber mais, este artigo pode ajudar.

declare

ch.ExchangeDeclare(
“Transaction”,
“topic”,
true,
false,
false,
false,
nil,
)
queue, _ := ch.QueueDeclare(
“ProcessTransaction”,
true,
false,
false,
false,
nil,
)
eventName := “out.requested”
ch.QueueBind(
queue.Name, // name
eventName, // key
“Transaction”, // exchange
false, // noWait
nil, // args
)

publish

func (r *RabbitMQAdapter) Publish(eventName string, data any) error {
payload, err := json.Marshal(data)
if err != nil {
return err
}
return r.channel.Publish(
r.exchange, // exchange
eventName, // key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: “text/plain”,
Body: []byte(payload),
}, // msg
)
}

func main() {
// …
rabbitmq := RabbitMQAdapter{
connection: conn,
channel: ch,
}
message := message{
ID: “1”,
}
rabbitmq.Publish(eventName, message)
}

consume

func (r *RabbitMQAdapter) Consume(queueName string, callback func(data any) error) error {
msgCh, _ := r.channel.Consume(
queueName,
“”, // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
for msg := range msgCh {
callback(msg.Body)
}
return nil
}

func main() {
// …
rabbitmq.Consume(“ProcessTransaction”, func(data any) error {
// process transaction …
return nil
})
}

Acknowledgement:

Quando o RabbitMQ entrega uma mensagem a um consumidor, ele precisa saber quando considerar que a mensagem foi enviada com sucesso. O RabbitMQ nos oferece duas formas de lidar com isso: auto-ack e ack manual.

No auto-ack, o RabbitMQ descarta a mensagem assim que ela é consumida.

No ack manual, temos o controle de quando emitir essa confirmação e se ela é positiva ou negativa.

exemplo de ack manual

func (r *RabbitMQAdapter) Consume(queueName string, callback func(data any) error) error {
msgCh, _ := r.channel.Consume(
queueName,
“”, // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
for msg := range msgCh {
if err := callback(msg.Body); err != nil {
msg.Nack(
false, // multiple
true, // requeue
)
} else {
msg.Ack(
false, // multiple
)
}
}
return nil
}

Dead Letter Exchange:

As mensagens de uma fila podem ser “dead-lettered”, o que significa que essas mensagens são republicadas em uma exchange quando qualquer um dos quatro eventos a seguir ocorrer:

A mensagem é reconhecida negativamente por um consumidor usando basic.reject ou basic.nack com requeue definido como false.
A mensagem expira devido ao TTL (tempo de vida) por mensagem.
A mensagem é descartada porque sua fila excedeu um limite de comprimento.
A mensagem é retornada mais vezes para uma fila quorum do que o limite de entrega.

Como evitar o reprocessamento de mensagens.

Agora que entendemos os recursos oferecidos pelo RabbitMQ, vamos analisar o possível motivo do comportamento mencionado no debate e como isso poderia ter sido evitado.

É muito provável que a opção de requeue ao chamar o método nack ou reject estivesse ativa.

msg.Nack(
false, // multiple
true, // requeue
)
// ou…
msg.Reject(
true, // requeue
)

Tendo isso em vista, para evitar o reprocessamento da mensagem, poderíamos simplesmente colocar a flag de requeue da mensagem como false.

msg.Nack(
false, // multiple
false, // requeue
)
// ou…
msg.Reject(
false, // requeue
)

Dessa forma, a mensagem não voltaria para a fila e seria descartada, assim como acontece no auto-ack.

Como ultilizar o ack manual de forma eficiente.

Por padrão, ao utilizar Nack ou Reject, a mensagem é perdida, porém, nem sempre é o que queremos. Para esse caso, o RabbitMQ nos oferece a Dead Letter Exchange. Tendo esse recurso ativo em uma fila, ao rejeitarmos uma mensagem, ela é encaminhada para essa exchange, que por sua vez direciona a mensagem para os interessados.

dlxName := “ProcessingError”
ch.ExchangeDeclare(
dlxName, // name
“fanout”, // kind
true, // durable
false, // autoDelete
false, // internal
false, // noWait
nil, // args
)
dlq, _ := ch.QueueDeclare(
“NotifyCustomer”,
true,
false,
false,
false,
nil,
)
ch.QueueBind(dlq.Name, “”, dlxName, false, nil)
queue, _ := ch.QueueDeclare(
“ProcessTransaction”,
true,
false,
false,
false,
amqp.Table{
“x-dead-letter-exchange”: dlxName,
},
)

Conclusão

Com os recursos apresentados acima, o problema inicialmente discutido neste artigo, que é o reprocessamento indevido das mensagens, pode ser solucionado.

O ack manual com a opção requeue desativada, juntamente com a Dead Letter Exchange, formam uma poderosa combinação para lidar com erros na sua aplicação.