Existe uma maneira de limpar o tópico no kafka?
Enviei uma mensagem muito grande para um tópico de mensagem kafka na minha máquina local, agora estou recebendo um erro:
kafka.common.InvalidMessageSizeException: invalid message size
Aumentar o fetch.size
não é o ideal aqui, porque na verdade não quero aceitar mensagens tão grandes.
apache-kafka
purge
Peter Klipfel
fonte
fonte
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
--delete-config retention.ms
e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000
Isso também permite verificar o período de retenção atual, por exemplo, kafka-configs --zookeeper <zkhost>: 2181 --describe - topic-type topics --entity-name <nome do tópico>Para limpar a fila, você pode excluir o tópico:
depois recrie-o:
fonte
delete.topic.enable=true
no arquivoconfig/server.properties
, como a advertência impressa pelo comando mencionado dizNote: This will have no impact if delete.topic.enable is not set to true.
Aqui estão as etapas que eu sigo para excluir um tópico chamado
MyTopic
:rm -rf /tmp/kafka-logs/MyTopic-0
. Repita para outras partições e todas as réplicaszkCli.sh
em seguida,rmr /brokers/MyTopic
Se você errar a etapa 3, o Apache Kafka continuará relatando o tópico como presente (por exemplo, quando você executar
kafka-list-topic.sh
).Testado com Apache Kafka 0.8.0.
fonte
./zookeeper-shell.sh localhost:2181
e./kafka-topics.sh --list --zookeeper localhost:2181
zookeeper-client
em vez dezkCli.sh
(tentou em Cloudera CDH5)Embora a resposta aceita esteja correta, esse método foi preterido. A configuração do tópico agora deve ser feita via
kafka-configs
.As configurações definidas por esse método podem ser exibidas com o comando
fonte
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --delete-config retention.ms --entity-name MyTopic
Testado no Kafka 0.8.2, para o exemplo de início rápido: Primeiro, adicione uma linha ao arquivo server.properties na pasta config:
então, você pode executar este comando:
fonte
From kafka 1.1
Limpar um tópico
aguarde 1 minuto, para garantir que kafka limpe o tópico remova a configuração e, em seguida, vá para o valor padrão
fonte
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config rentention.ms=100
O kafka não possui um método direto para o tópico de limpeza / limpeza (Filas), mas pode fazer isso excluindo esse tópico e recriá-lo.
primeiro, verifique se o arquivo sever.properties possui e, se não, adicione
delete.topic.enable=true
em seguida, excluir tópico
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic
depois crie-o novamente.
fonte
Às vezes, se você tiver um cluster saturado (muitas partições, ou usando dados de tópico criptografados, ou SSL, ou o controlador estiver em um nó inválido ou a conexão estiver irregular, levará muito tempo para limpar o tópico) .
Eu sigo estas etapas, principalmente se você estiver usando o Avro.
1: Execute com ferramentas kafka:
2: Executar no nó de registro do esquema:
kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning
3: Coloque a retenção de tópicos de volta à configuração original, quando o tópico estiver vazio.
Espero que isso ajude alguém, pois não é facilmente anunciado.
fonte
kafka-avro-console-consumer
não é necessárioATUALIZAÇÃO: Esta resposta é relevante para o Kafka 0.6. Para o Kafka 0.8 e posterior, consulte a resposta do @Patrick.
Sim, pare o kafka e exclua manualmente todos os arquivos do subdiretório correspondente (é fácil encontrá-lo no diretório de dados kafka). Após o kafka reiniciar, o tópico ficará vazio.
fonte
A abordagem mais simples é definir que a data dos arquivos de log individuais seja mais antiga que o período de retenção. Em seguida, o corretor deve limpá-los e removê-los para você dentro de alguns segundos. Isso oferece várias vantagens:
Na minha experiência com o Kafka 0.7.x, remover os arquivos de log e reiniciar o broker pode levar a exceções de deslocamento inválidas para determinados consumidores. Isso aconteceria porque o broker reiniciaria as compensações em zero (na ausência de arquivos de log existentes) e um consumidor que estava consumindo anteriormente do tópico se reconectaria para solicitar um deslocamento [uma vez válido] específico. Se esse deslocamento estiver fora dos limites dos novos logs de tópicos, nenhum dano será causado e o consumidor continuará no início ou no final. Porém, se o deslocamento cair dentro dos limites dos novos logs de tópico, o broker tentará buscar o conjunto de mensagens, mas falhará porque o deslocamento não se alinha a uma mensagem real.
Isso pode ser atenuado também limpando as compensações do consumidor no tratador para esse tópico. Mas se você não precisa de um tópico virgem e apenas deseja remover o conteúdo existente, simplesmente 'tocar' alguns logs de tópicos é muito mais fácil e confiável do que parar os intermediários, excluir logs de tópicos e limpar certos nós do tratador .
fonte
O conselho de Thomas é ótimo, mas infelizmente
zkCli
nas versões antigas do Zookeeper (por exemplo, 3.3.6) não parecem dar suportermr
. Por exemplo, compare a implementação da linha de comando no Zookeeper moderno com a versão 3.3 .Se você se deparar com uma versão antiga do Zookeeper, uma solução é usar uma biblioteca cliente como o zc.zk para Python. Para pessoas não familiarizadas com o Python, é necessário instalá-lo usando o pip ou o easy_install . Em seguida, inicie um shell Python (
python
) e você pode fazer:ou mesmo
se você deseja remover todos os tópicos do Kafka.
fonte
Para limpar todas as mensagens de um tópico específico usando o seu grupo de aplicativos (GroupName deve ser o mesmo que o nome do grupo kafka do aplicativo).
./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group
fonte
Após a resposta do @steven appleyard, executei os seguintes comandos no Kafka 2.2.0 e eles funcionaram para mim.
fonte
Muitas ótimas respostas por aqui, mas entre elas, não encontrei uma sobre o docker. Passei algum tempo para descobrir que o uso do contêiner do corretor está errado neste caso (obviamente !!!)
e eu deveria ter usado em
zookeeper:2181
vez do--zookeeper localhost:2181
meu arquivo de composiçãoo comando correto seria
Espero que economize o tempo de alguém.
Além disso, esteja ciente de que as mensagens não serão excluídas imediatamente e isso acontecerá quando o segmento do log for fechado.
fonte
localhost:2181
... Por exemplo, você está entendendo errado os recursos de rede do Docker. Além disso, nem todos os contêineres do Zookeeper possuemkafka-topics
, portanto, é melhor não usá-lo dessa maneira. As instalações mais recentes do Kafka permitem--bootstrap-servers
alterar um tópico em vez de--zookeeper
you can use
--zookeeper zookeeper: 2181` do contêiner Kafka é o meu ponto. Ou até mesmo cumprimentar a linha Zookeeper do arquivo server.propertiesNão foi possível adicionar como comentário por causa do tamanho: Não tenho certeza se isso é verdade, além de atualizar retention.ms e retention.bytes, mas notei que a política de limpeza de tópicos deve ser "delete" (padrão), se "compact", ela será mantenha as mensagens por mais tempo, ou seja, se for "compacto", você deverá especificar também delete.retention.ms .
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1
Também teve que monitorar as compensações mais antigas / mais recentes, devem ser as mesmas para confirmar que isso ocorreu com êxito, também pode verificar o du -h / tmp / kafka-logs / test-topic-3-100- *
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762
O outro problema é que você precisa primeiro obter a configuração atual para se lembrar de reverter após a exclusão ser bem-sucedida:
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
fonte
Outra abordagem, bastante manual, para eliminar um tópico é:
nos corretores:
sudo service kafka stop
sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*
no tratador:
sudo /usr/lib/zookeeper/bin/zkCli.sh
rmr /brokers/topic/<some_topic_name>
nos corretores novamente:
sudo service kafka start
fonte
Isso deve dar
retention.ms
configurado. Em seguida, você pode usar o comando alter acima para mudar para 1 segundo (e depois voltar ao padrão).fonte
No Java, usando o novo em
AdminZkClient
vez do descontinuadoAdminUtils
:fonte
AdminClient
orKafkaAdminClient