Como posso enviar mensagens grandes com Kafka (acima de 15 MB)?

119

Eu envio mensagens String para Kafka V. 0.8 com a API do Java Producer. Se o tamanho da mensagem for de cerca de 15 MB, recebo a MessageSizeTooLargeException. Tentei definir message.max.bytespara 40 MB, mas ainda recebo a exceção. As mensagens pequenas funcionaram sem problemas.

(A exceção aparece no produtor, não tenho um consumidor neste aplicativo.)

O que posso fazer para me livrar dessa exceção?

Minha configuração de produtor de exemplo

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Log de erros:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
Sonson123
fonte
5
Meu primeiro instinto seria pedir a você para dividir esta mensagem enorme em várias menores: - / Meu palpite é que isso não é possível por algum motivo, mas você pode querer reconsiderar mesmo assim: Mensagens enormes geralmente significam que há uma falha de design algum lugar que realmente deva ser consertado.
Aaron Digulla
1
Obrigado, mas tornaria minha lógica muito mais complexa. Por que é uma ideia usar o Kafka para mensagens com cerca de 15 MB? 1 MB é o limite máximo de tamanho de mensagem que pode ser usado? Não encontrei muito sobre o limite de tamanho da mensagem na documentação do Kafka.
Sonson123
2
Isso não tem nenhuma relação com o Kafka ou qualquer outro sistema de processamento de mensagens. Meu raciocínio: se algo der errado com seu arquivo de 15 MB, limpar a bagunça depois será muito caro. É por isso que geralmente divido arquivos grandes em muitas tarefas menores (que podem ser executadas em paralelo também).
Aaron Digulla
você já usou alguma compressão? você poderia compartilhar mais alguns detalhes, é meio difícil adivinhar algo com apenas uma única palavra
user2720864

Respostas:

182

Você precisa ajustar três (ou quatro) propriedades:

  • Lado do consumidor: fetch.message.max.bytes- determinará o maior tamanho de uma mensagem que pode ser buscada pelo consumidor.
  • Lado do corretor: replica.fetch.max.bytes- permitirá que as réplicas nos corretores enviem mensagens dentro do cluster e certifique-se de que as mensagens sejam replicadas corretamente. Se for muito pequeno, a mensagem nunca será replicada e, portanto, o consumidor nunca verá a mensagem porque ela nunca será confirmada (totalmente replicada).
  • Lado do corretor: message.max.bytes- este é o maior tamanho da mensagem que pode ser recebida pelo corretor de um produtor.
  • Lado do corretor (por tópico): max.message.bytes- este é o maior tamanho da mensagem que o corretor permitirá que seja anexado ao tópico. Este tamanho é pré-compressão validado. (O padrão é do corretor message.max.bytes.)

Eu descobri da maneira mais difícil sobre o número 2 - você não recebe NENHUMA exceção, mensagem ou aviso de Kafka, então lembre-se de levar isso em consideração ao enviar mensagens grandes.

homem rindo
fonte
3
Ok, você e o usuário 2720864 estavam corretos. Eu tinha apenas definido o message.max.bytesno código-fonte. Mas eu tenho que definir esses valores na configuração do servidor Kafka config/server.properties. Agora também mensagens maiores funcionam :).
Sonson123
3
Há alguma desvantagem conhecida em definir esses valores muito altos?
Ivan Balashov
7
Sim. No lado do consumidor, você aloca fetch.message.max.bytesmemória para CADA partição. Isso significa que se você usar um grande número para fetch.message.max.bytescombinado com um grande número de partições, consumirá muita memória. Na verdade, como o processo de replicação entre as corretoras também é um consumidor especializado, isso também consumirá memória nas corretoras.
laugh_man
3
Observe que também há uma max.message.bytesconfiguração por tópico que pode ser inferior à do corretor message.max.bytes.
Peter Davis
1
De acordo com o doc oficial, os parâmetros do lado do consumidor e aqueles relativos à replicação entre corretores /.*fetch.*bytes/não parecem ser limites rígidos: "Este não é um máximo absoluto, se [...] maior que este valor, o lote de registro será ainda será devolvido para garantir que o progresso possa ser feito. "
Bluu
56

Pequenas alterações necessárias para o Kafka 0.10 e o novo consumidor em comparação com a resposta doughing_man :

  • Corretor: Sem alterações, você ainda precisa aumentar as propriedades message.max.bytese replica.fetch.max.bytes. message.max.bytesdeve ser igual ou menor (*) do que replica.fetch.max.bytes.
  • Produtor: Aumente max.request.sizepara enviar a mensagem maior.
  • Consumidor: Aumente max.partition.fetch.bytespara receber mensagens maiores.

(*) Leia os comentários para saber mais sobre message.max.bytes<=replica.fetch.max.bytes

Sascha Vetter
fonte
2
Você sabe por que message.max.bytesprecisa ser menor do que replica.fetch.max.bytes?
Kostas
2
" replica.fetch.max.bytes (padrão: 1 MB) - Tamanho máximo dos dados que um corretor pode replicar. Deve ser maior do que message.max.bytes ou um corretor aceitará mensagens e não as replicará. potencial perda de dados. " Fonte: handling-large-messages-kafka
Sascha Vetter
2
Obrigado por me responder com um link. Isso também parece ecoar o que o guia Cloudera sugere. Ambos, entretanto, estão errados - observe que eles não oferecem nenhuma razão técnica do porquê replica.fetch.max.bytes deveria ser estritamente maior para message.max.bytes. Um funcionário da Confluent confirmou hoje cedo o que eu suspeitava: que as duas quantidades podem, de fato, ser iguais.
Kostas
2
Há alguma atualização sobre message.max.bytes<replica.fetch.max.bytesou message.max.bytes=replica.fetch.max.bytes@Kostas?
Sascha Vetter
2
Sim, eles podem ser iguais: mail-archive.com/[email protected]/msg25494.html (Ismael trabalha para a Confluent)
Kostas
13

Você precisa substituir as seguintes propriedades:

Configurações do corretor ($ KAFKA_HOME / config / server.properties)

  • replica.fetch.max.bytes
  • message.max.bytes

Configurações do consumidor ($ KAFKA_HOME / config / consumer.properties)
Esta etapa não funcionou para mim. Eu adicionei ao aplicativo do consumidor e estava funcionando bem

  • fetch.message.max.bytes

Reinicie o servidor.

consulte esta documentação para obter mais informações: http://kafka.apache.org/08/configuration.html

user2550587
fonte
1
para o consumidor da linha de comando, preciso usar a sinalização --fetch-size = <bytes>. Parece não ler o arquivo consumer.properties (kafka 0.8.1). Eu também recomendaria ativar a compactação do lado do produtor usando a opção compression.codec.
Ziggy Eunicien
O comentário de Ziggy funcionou para mim kafka 0.8.1.1. Obrigado!
James
será que fetch.message.max.bytes é substituído por max.partition.fetch.bytes em ConsumerConfig?
s_bei
12

A ideia é ter o mesmo tamanho de mensagem enviada do Produtor Kafka para o Corretor Kafka e depois recebida pelo Consumidor Kafka, ou seja,

Produtor Kafka -> Corretor Kafka -> Consumidor Kafka

Suponha que se o requisito for enviar 15 MB de mensagem, o produtor , o corretor e o consumidor , todos os três, precisam estar sincronizados.

Kafka Produtor envia 15 MB -> Kafka Broker Permite / Armazena 15 MB -> Kafka Consumidor recebe 15 MB

A configuração, portanto, deve ser:

a) no corretor:

message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

b) no consumidor:

fetch.message.max.bytes=15728640
Ravi
fonte
2
será que fetch.message.max.bytes é substituído por max.partition.fetch.bytes em ConsumerConfig?
s_bei
7

É importante lembrar que o message.max.bytesatributo deve estar sincronizado com a fetch.message.max.bytespropriedade do consumidor . o tamanho de busca deve ser pelo menos tão grande quanto o tamanho máximo da mensagem, caso contrário, pode haver uma situação em que os produtores podem enviar mensagens maiores do que o consumidor pode consumir / buscar. Pode valer a pena dar uma olhada nisso.
Qual versão do Kafka você está usando? Forneça também mais alguns rastreios de detalhes que você está obtendo. há algo como ... payload size of xxxx larger than 1000000aparecendo no log?

user2720864
fonte
1
Eu atualizei minha pergunta com mais informações: Kafka versão 2.8.0-0.8.0; agora só preciso do produtor.
Sonson123
6

A resposta de @laughing_man é bastante precisa. Mesmo assim, eu queria dar uma recomendação que aprendi com o especialista em Kafka, Stephane Maarek, do Quora.

Kafka não foi feito para lidar com mensagens grandes.

Sua API deve usar armazenamento em nuvem (Ex AWS S3), e apenas enviar para Kafka ou qualquer corretor de mensagens uma referência de S3. Você deve encontrar um lugar para manter seus dados, talvez seja uma unidade de rede, talvez seja o que for, mas não deve ser um agente de mensagens.

Agora, se você não quiser ir com a solução acima

O tamanho máximo da mensagem é 1 MB (a configuração em seus corretores é chamada message.max.bytes) Apache Kafka . Se você realmente precisasse muito, você poderia aumentar esse tamanho e certificar-se de aumentar os buffers de rede para seus produtores e consumidores.

E se você realmente se preocupa em dividir sua mensagem, certifique-se de que cada divisão de mensagem tenha exatamente a mesma chave para que seja enviada para a mesma partição, e seu conteúdo de mensagem deve relatar um "id de parte" para que seu consumidor possa reconstruir totalmente a mensagem .

Você também pode explorar a compactação, se sua mensagem for baseada em texto (gzip, snappy, compactação lz4), o que pode reduzir o tamanho dos dados, mas não por mágica.

Novamente, você precisa usar um sistema externo para armazenar esses dados e apenas enviar uma referência externa para Kafka. Essa é uma arquitetura muito comum, que você deve seguir e amplamente aceita.

Tenha isso em mente que o Kafka funciona melhor apenas se as mensagens forem grandes em quantidade, mas não em tamanho.

Fonte: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka

Bhanu Hoysala
fonte
4
Você pode querer notar que "sua" recomendação é uma cópia quase palavra por palavra da recomendação Quora de Stéphane Maarek em quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka
Mike
Kafka trabalha com mensagens grandes, absolutamente nenhum problema. A página de introdução na página inicial do Kafka até faz referência a ele como um sistema de armazenamento.
calloc_org
3

Para pessoas que usam landoop kafka: você pode passar os valores de configuração nas variáveis ​​de ambiente como:

docker run -d --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083  -p 9581-9585:9581-9585 -p 9092:9092
 -e KAFKA_TOPIC_MAX_MESSAGE_BYTES=15728640 -e KAFKA_REPLICA_FETCH_MAX_BYTES=15728640  landoop/fast-data-dev:latest `

E se você estiver usando rdkafka, passe a mensagem.max.bytes na configuração do produtor como:

  const producer = new Kafka.Producer({
        'metadata.broker.list': 'localhost:9092',
        'message.max.bytes': '15728640',
        'dr_cb': true
    });

Da mesma forma, para o consumidor,

  const kafkaConf = {
   "group.id": "librd-test",
   "fetch.message.max.bytes":"15728640",
   ... .. }                                                                                                                                                                                                                                                      
informante
fonte