Eu envio mensagens String para Kafka V. 0.8 com a API do Java Producer. Se o tamanho da mensagem for de cerca de 15 MB, recebo a MessageSizeTooLargeException
. Tentei definir message.max.bytes
para 40 MB, mas ainda recebo a exceção. As mensagens pequenas funcionaram sem problemas.
(A exceção aparece no produtor, não tenho um consumidor neste aplicativo.)
O que posso fazer para me livrar dessa exceção?
Minha configuração de produtor de exemplo
private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}
Log de erros:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
java
apache-kafka
Sonson123
fonte
fonte
Respostas:
Você precisa ajustar três (ou quatro) propriedades:
fetch.message.max.bytes
- determinará o maior tamanho de uma mensagem que pode ser buscada pelo consumidor.replica.fetch.max.bytes
- permitirá que as réplicas nos corretores enviem mensagens dentro do cluster e certifique-se de que as mensagens sejam replicadas corretamente. Se for muito pequeno, a mensagem nunca será replicada e, portanto, o consumidor nunca verá a mensagem porque ela nunca será confirmada (totalmente replicada).message.max.bytes
- este é o maior tamanho da mensagem que pode ser recebida pelo corretor de um produtor.max.message.bytes
- este é o maior tamanho da mensagem que o corretor permitirá que seja anexado ao tópico. Este tamanho é pré-compressão validado. (O padrão é do corretormessage.max.bytes
.)Eu descobri da maneira mais difícil sobre o número 2 - você não recebe NENHUMA exceção, mensagem ou aviso de Kafka, então lembre-se de levar isso em consideração ao enviar mensagens grandes.
fonte
message.max.bytes
no código-fonte. Mas eu tenho que definir esses valores na configuração do servidor Kafkaconfig/server.properties
. Agora também mensagens maiores funcionam :).fetch.message.max.bytes
memória para CADA partição. Isso significa que se você usar um grande número parafetch.message.max.bytes
combinado com um grande número de partições, consumirá muita memória. Na verdade, como o processo de replicação entre as corretoras também é um consumidor especializado, isso também consumirá memória nas corretoras.max.message.bytes
configuração por tópico que pode ser inferior à do corretormessage.max.bytes
./.*fetch.*bytes/
não parecem ser limites rígidos: "Este não é um máximo absoluto, se [...] maior que este valor, o lote de registro será ainda será devolvido para garantir que o progresso possa ser feito. "Pequenas alterações necessárias para o Kafka 0.10 e o novo consumidor em comparação com a resposta doughing_man :
message.max.bytes
ereplica.fetch.max.bytes
.message.max.bytes
deve ser igual ou menor (*) do quereplica.fetch.max.bytes
.max.request.size
para enviar a mensagem maior.max.partition.fetch.bytes
para receber mensagens maiores.(*) Leia os comentários para saber mais sobre
message.max.bytes
<=replica.fetch.max.bytes
fonte
message.max.bytes
precisa ser menor do quereplica.fetch.max.bytes
?replica.fetch.max.bytes
deveria ser estritamente maior paramessage.max.bytes
. Um funcionário da Confluent confirmou hoje cedo o que eu suspeitava: que as duas quantidades podem, de fato, ser iguais.message.max.bytes<replica.fetch.max.bytes
oumessage.max.bytes=replica.fetch.max.bytes
@Kostas?Você precisa substituir as seguintes propriedades:
Configurações do corretor ($ KAFKA_HOME / config / server.properties)
Configurações do consumidor ($ KAFKA_HOME / config / consumer.properties)
Esta etapa não funcionou para mim. Eu adicionei ao aplicativo do consumidor e estava funcionando bem
Reinicie o servidor.
consulte esta documentação para obter mais informações: http://kafka.apache.org/08/configuration.html
fonte
A ideia é ter o mesmo tamanho de mensagem enviada do Produtor Kafka para o Corretor Kafka e depois recebida pelo Consumidor Kafka, ou seja,
Produtor Kafka -> Corretor Kafka -> Consumidor Kafka
Suponha que se o requisito for enviar 15 MB de mensagem, o produtor , o corretor e o consumidor , todos os três, precisam estar sincronizados.
Kafka Produtor envia 15 MB -> Kafka Broker Permite / Armazena 15 MB -> Kafka Consumidor recebe 15 MB
A configuração, portanto, deve ser:
a) no corretor:
b) no consumidor:
fonte
É importante lembrar que o
message.max.bytes
atributo deve estar sincronizado com afetch.message.max.bytes
propriedade do consumidor . o tamanho de busca deve ser pelo menos tão grande quanto o tamanho máximo da mensagem, caso contrário, pode haver uma situação em que os produtores podem enviar mensagens maiores do que o consumidor pode consumir / buscar. Pode valer a pena dar uma olhada nisso.Qual versão do Kafka você está usando? Forneça também mais alguns rastreios de detalhes que você está obtendo. há algo como ...
payload size of xxxx larger than 1000000
aparecendo no log?fonte
A resposta de @laughing_man é bastante precisa. Mesmo assim, eu queria dar uma recomendação que aprendi com o especialista em Kafka, Stephane Maarek, do Quora.
Kafka não foi feito para lidar com mensagens grandes.
Sua API deve usar armazenamento em nuvem (Ex AWS S3), e apenas enviar para Kafka ou qualquer corretor de mensagens uma referência de S3. Você deve encontrar um lugar para manter seus dados, talvez seja uma unidade de rede, talvez seja o que for, mas não deve ser um agente de mensagens.
Agora, se você não quiser ir com a solução acima
O tamanho máximo da mensagem é 1 MB (a configuração em seus corretores é chamada
message.max.bytes
) Apache Kafka . Se você realmente precisasse muito, você poderia aumentar esse tamanho e certificar-se de aumentar os buffers de rede para seus produtores e consumidores.E se você realmente se preocupa em dividir sua mensagem, certifique-se de que cada divisão de mensagem tenha exatamente a mesma chave para que seja enviada para a mesma partição, e seu conteúdo de mensagem deve relatar um "id de parte" para que seu consumidor possa reconstruir totalmente a mensagem .
Você também pode explorar a compactação, se sua mensagem for baseada em texto (gzip, snappy, compactação lz4), o que pode reduzir o tamanho dos dados, mas não por mágica.
Novamente, você precisa usar um sistema externo para armazenar esses dados e apenas enviar uma referência externa para Kafka. Essa é uma arquitetura muito comum, que você deve seguir e amplamente aceita.
Tenha isso em mente que o Kafka funciona melhor apenas se as mensagens forem grandes em quantidade, mas não em tamanho.
Fonte: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka
fonte
Para pessoas que usam landoop kafka: você pode passar os valores de configuração nas variáveis de ambiente como:
E se você estiver usando rdkafka, passe a mensagem.max.bytes na configuração do produtor como:
Da mesma forma, para o consumidor,
fonte