RabbitMQ / AMQP: fila única, vários consumidores para a mesma mensagem?

144

Estou apenas começando a usar o RabbitMQ e o AMQP em geral.

  • Eu tenho uma fila de mensagens
  • Tenho vários consumidores, que gostaria de fazer coisas diferentes com a mesma mensagem .

A maior parte da documentação do RabbitMQ parece estar focada no round-robin, ou seja, onde uma única mensagem é consumida por um único consumidor, com a carga sendo distribuída entre cada consumidor. Este é realmente o comportamento que eu testemunho.

Um exemplo: o produtor possui uma única fila e envia mensagens a cada 2 segundos:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

E aqui está um consumidor:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

Se eu iniciar o consumidor duas vezes, posso ver que cada consumidor está consumindo mensagens alternativas no comportamento de rodízio. Por exemplo, vou ver as mensagens 1, 3, 5 em um terminal, 2, 4, 6 no outro .

Minha pergunta é:

  • Posso fazer com que cada consumidor receba as mesmas mensagens? Ou seja, ambos os consumidores recebem a mensagem 1, 2, 3, 4, 5, 6? Como é chamado o AMQP / RabbitMQ? Como é configurado normalmente?

  • Isso é comumente feito? Devo apenas fazer com que o Exchange direcione a mensagem para duas filas separadas, com um único consumidor?

mikemaccana
fonte
5
Eu não sou um especialista em RabbitMQ. No entanto, o que você tem agora é chamado de fila, mas o que você quer são tópicos, consulte este tutorial: rabbitmq.com/tutorials/tutorial-five-python.html , mais sobre filas versus tópicos: msdn.microsoft.com/en-us /library/windowsazure/hh367516.aspx
UrbanEsc
1
Eu acredito que ele quer fanout na verdade, embora os tópicos funcionem também e dêem mais controle mais tarde.
22412 roberthewolf #
Obrigado @UrbanEsc. Os tópicos parecem resolver o problema fazendo com que uma mensagem atinja várias filas e, portanto, seja consumida por cada fila de consumidores. O que me inclina ainda mais para o cenário de várias filas / consumidor único para o meu caso em particular.
Mikemaccana
1
Para 2018 (e mesmo para 2016 e anteriores), a resposta é usar algo como Kafka, IMO.
WattsInABox

Respostas:

115

Posso fazer com que cada consumidor receba as mesmas mensagens? Ou seja, ambos os consumidores recebem a mensagem 1, 2, 3, 4, 5, 6? Como é chamado o AMQP / RabbitMQ? Como é configurado normalmente?

Não, não se os consumidores estiverem na mesma fila. No guia de conceitos AMQP do RabbitMQ :

é importante entender que, no AMQP 0-9-1, as mensagens são balanceadas de carga entre os consumidores.

Isso parece implicar que o comportamento de rodízio em uma fila é um dado e não configurável. Ou seja, filas separadas são necessárias para que o mesmo ID de mensagem seja tratado por vários consumidores.

Isso é comumente feito? Devo apenas fazer com que o Exchange direcione a mensagem para duas filas separadas, com um único consumidor?

Não, não é, fila única / vários consumidores, com cada consumidor manipulando a mesma ID de mensagem não é possível. Ter a rota de troca da mensagem em duas filas separadas é realmente melhor.

Como não preciso de roteamento muito complexo, uma troca de fanout lidará com isso muito bem. Não foquei muito em Trocas anteriormente, pois o node-amqp tem o conceito de 'troca padrão', permitindo publicar mensagens diretamente em uma conexão, no entanto, a maioria das mensagens AMQP é publicada em uma troca específica.

Aqui está minha troca de fãs, tanto enviando quanto recebendo:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   

    var sendMessage = function(exchange, payload) {
      console.log('about to publish')
      var encoded_payload = JSON.stringify(payload);
      exchange.publish('', encoded_payload, {})
    }

    // Recieve messages
    connection.queue("my_queue_name", function(queue){
      console.log('Created queue')
      queue.bind(exchange, ''); 
      queue.subscribe(function (message) {
        console.log('subscribed to queue')
        var encoded_payload = unescape(message.data)
        var payload = JSON.parse(encoded_payload)
        console.log('Recieved a message:')
        console.log(payload)
      })
    })

    setInterval( function() {    
      var test_message = 'TEST '+count
      sendMessage(exchange, test_message)  
      count += 1;
    }, 2000) 
 })
})
mikemaccana
fonte
1
espalhar claramente era o que você queria. Não o ajudaria aqui, mas pensei em mencionar que o comportamento round-robin dentro de uma fila é configurável. int prefetchCount = 1; channel.basicQos(prefetchCount); Isso permitirá que cada consumidor receba uma mensagem assim que terminar com a anterior. Em vez de receber mensagens alternadas. Novamente, não resolve o seu problema, mas pode ser útil para as pessoas saberem. exemplo aqui http://www.rabbitmq.com/tutorials/tutorial-two-java.html em Fair Dispatch
Ommit
3
Para esclarecer: 'troca padrão' não é específico do nó-amqp. É um conceito geral do AMQP com as seguintes regras: quando qualquer mensagem é publicada para troca padrão, a chave de roteamento (com a qual essa mensagem foi publicada) tratada como nome da fila pelo broker do AMQP. Parece que você pode publicar diretamente nas filas. Mas você nao é. O broker simplesmente liga cada fila à troca padrão com a chave de roteamento igual ao nome da fila.
Ruslan Stelmachenko
2
Existe alguma alternativa para os tópicos do Apache activemq jms no rabbitmq em que nenhuma fila está envolvida, mas sim multicasting?
pantonis
Se o mesmo usuário fizer login em vários dispositivos, a mensagem receberá apenas um dispositivo. Como pode ser resolvido ou alguma idéia, por favor?
Rafiq
@ Rafiq, você deve fazer uma pergunta para isso.
Mikemaccana #
27

Basta ler o tutorial rabbitmq . Você publica mensagem para troca, não para fila; em seguida, é roteado para as filas apropriadas. No seu caso, você deve vincular uma fila separada para cada consumidor. Dessa forma, eles podem consumir mensagens de forma totalmente independente.

driushkin
fonte
26

As duas últimas respostas estão quase corretas - eu tenho vários aplicativos que geram mensagens que precisam terminar com diferentes consumidores, portanto o processo é muito simples.

Se você desejar vários consumidores com a mesma mensagem, siga o procedimento a seguir.

Crie várias filas, uma para cada aplicativo que receberá a mensagem, nas propriedades de cada fila, "vincule" uma marca de roteamento à troca amq.direct. Altere seu aplicativo de publicação para enviar para amq.direct e use a tag de roteamento (não uma fila). O AMQP copiará a mensagem em cada fila com a mesma ligação. Funciona como um encanto :)

Exemplo: digamos que eu possuo uma string JSON que eu gere, eu a publico na central "amq.direct" usando a tag de roteamento "new-sales-order", tenho uma fila para meu aplicativo order_printer que imprime o pedido, tenho um fila para o meu sistema de cobrança que enviará uma cópia do pedido e faturará ao cliente, e eu tenho um sistema de arquivamento na web em que arquivo os pedidos por razões históricas / de conformidade e tenho uma interface da web do cliente em que os pedidos são rastreados quando outras informações são exibidas. uma ordem.

Então, minhas filas são: order_printer, order_billing, order_archive e order_tracking Todos têm a tag de ligação "new-sales-order" vinculada a eles, todos os 4 obterão os dados JSON.

Essa é a maneira ideal de enviar dados sem que o aplicativo de publicação conheça ou se preocupe com os aplicativos de recebimento.

z900collector
fonte
8

Sim, cada consumidor pode receber as mesmas mensagens. consulte http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http: //www.rabbitmq. com / tutoriais / tutorial-cinco-python.html

para diferentes maneiras de rotear mensagens. Eu sei que eles são para python e java, mas é bom entender os princípios, decidir o que você está fazendo e depois descobrir como fazê-lo em JS. Parece que você deseja fazer um fanout simples ( tutorial 3 ), que envia as mensagens para todas as filas conectadas à central.

A diferença com o que você está fazendo e com o que você quer fazer é basicamente que você irá configurar e trocar ou digitar fanout. As trocas de fanout enviam todas as mensagens para todas as filas conectadas. Cada fila terá um consumidor que terá acesso a todas as mensagens separadamente.

Sim, isso geralmente é feito, é um dos recursos do AMPQ.

robthewolf
fonte
ótima resposta, exceto por 'isso é comumente feito?' Eu estava me referindo a 'fazer com que cada consumidor recebesse as mesmas mensagens' - o que geralmente não é feito (consumidores na mesma fila sempre rodam com o robin). Provavelmente minha culpa por não ser clara o suficiente.
Mikemaccana
Na verdade, atrevo-me a dizer que depende do que você deseja usá-lo. Você tem duas opções básicas de pub / sub ou filas de trabalho. Sua configuração original era uma fila de trabalho, mas o que você queria era um pub / sub fanout. Eles apontam que o uso comum aqui é totalmente dependente do que você deseja fazer.
22412 Roblhewolf #
Certamente, mas em uma fila de trabalho, a mesma mensagem (por exemplo, o mesmo ID de mensagem) não é tratada por diferentes consumidores - é implicitamente round robin. Novamente, isso provavelmente é minha culpa por não ser suficientemente claro.
Mikemaccana
parece que estamos falando de propósitos diferentes aqui.
214123 roberthewolf #
Desculpe a confusão. Se houver alguma maneira de ter uma fila de trabalho em que os consumidores da mesma fila lidem com o mesmo ID de mensagem, indique-me uma referência. Caso contrário, continuarei acreditando no que li em outro lugar.
Mikemaccana
3

RabbitMQ / AMQP: fila única, vários consumidores para a mesma mensagem e atualização de página.

rabbit.on('ready', function () {    });
    sockjs_chat.on('connection', function (conn) {

        conn.on('data', function (message) {
            try {
                var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));

                if (obj.header == "register") {

                    // Connect to RabbitMQ
                    try {
                        conn.exchange = rabbit.exchange(exchange, { type: 'topic',
                            autoDelete: false,
                            durable: false,
                            exclusive: false,
                            confirm: true
                        });

                        conn.q = rabbit.queue('my-queue-'+obj.agentID, {
                            durable: false,
                            autoDelete: false,
                            exclusive: false
                        }, function () {
                            conn.channel = 'my-queue-'+obj.agentID;
                            conn.q.bind(conn.exchange, conn.channel);

                            conn.q.subscribe(function (message) {
                                console.log("[MSG] ---> " + JSON.stringify(message));
                                conn.write(JSON.stringify(message) + "\n");
                            }).addCallback(function(ok) {
                                ctag[conn.channel] = ok.consumerTag; });
                        });
                    } catch (err) {
                        console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
                    }

                } else if (obj.header == "typing") {

                    var reply = {
                        type: 'chatMsg',
                        msg: utils.escp(obj.msga),
                        visitorNick: obj.channel,
                        customField1: '',
                        time: utils.getDateTime(),
                        channel: obj.channel
                    };

                    conn.exchange.publish('my-queue-'+obj.agentID, reply);
                }

            } catch (err) {
                console.log("ERROR ----> " + err.stack);
            }
        });

        // When the visitor closes or reloads a page we need to unbind from RabbitMQ?
        conn.on('close', function () {
            try {

                // Close the socket
                conn.close();

                // Close RabbitMQ           
               conn.q.unsubscribe(ctag[conn.channel]);

            } catch (er) {
                console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
            }
        });
    });
durai
fonte
1

Para obter o comportamento desejado, basta fazer com que cada consumidor consuma em sua própria fila. Você precisará usar um tipo de troca não direta (tópico, cabeçalho, fanout) para obter a mensagem para todas as filas de uma só vez.

Skylos
fonte
1

Como eu avalio o seu caso, é:

  • Eu tenho uma fila de mensagens (sua fonte para receber mensagens, vamos chamá-lo de q111)

  • Tenho vários consumidores, que gostaria de fazer coisas diferentes com a mesma mensagem.

Seu problema aqui é que, enquanto 3 mensagens são recebidas por essa fila, a mensagem 1 é consumida por um consumidor A, outros consumidores B e C consomem as mensagens 2 e 3. Onde você precisar de uma configuração em que rabbitmq passe as mesmas cópias de todas essas três mensagens (1,2,3) para todos os três consumidores conectados (A, B, C) simultaneamente.

Embora muitas configurações possam ser feitas para isso, uma maneira simples é usar o seguinte conceito de duas etapas:

  • Use um rabbitmq-shovel dinâmico para capturar mensagens da fila desejada (q111) e publique em uma troca de fanout (troca criada e dedicada exclusivamente para essa finalidade).
  • Agora reconfigure seus consumidores A, B e C (que estavam ouvindo a fila (q111)) para ouvir diretamente essa troca de Fanout usando uma fila exclusiva e anônima para cada consumidor.

Nota: Ao usar esse conceito, não consuma diretamente da fila de origem (q111), pois as mensagens já consumidas não serão enviadas para a sua troca de Fanout.

Se você acha que isso não atende exatamente aos seus requisitos ... fique à vontade para postar suas sugestões :-)

Anshuman Banerjee
fonte
0

Eu acho que você deve verificar o envio de suas mensagens usando o trocador de fan-out . Dessa forma, você receberá a mesma mensagem para diferentes consumidores, abaixo da tabela, o RabbitMQ está criando filas diferentes para cada um desses novos consumidores / assinantes.

Este é o link para ver o exemplo do tutorial em javascript https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html

Alejandro Serret
fonte
-1

Há uma opção interessante nesse cenário que não encontrei nas respostas aqui.

Você pode Nack mensagens com o recurso "refileirar" em um consumidor para processá-las em outro. De um modo geral, não é o caminho certo, mas talvez seja bom o suficiente para alguém.

https://www.rabbitmq.com/nack.html

E cuidado com os loops (quando todos os concorrentes concordam em remarcar mensagem)!

Alexus1024
fonte
1
Eu recomendaria muito contra isso, pois ele não aumenta de forma alguma. Não há ordem para os consumidores, você não pode garantir que o consumidor B, que não a coloca novamente na fila, recebe a mensagem antes do consumidor A, que a processa e a coloca novamente na fila, os loops mencionados são um problema. Como você diz "isso geralmente não é o caminho certo", e não consigo pensar em um cenário em que isso seria melhor do que as outras respostas.
Kevin Streicher