Limpar tópico Kafka

185

Existe uma maneira de limpar o tópico no kafka?

Enviei uma mensagem muito grande para um tópico de mensagem kafka na minha máquina local, agora estou recebendo um erro:

kafka.common.InvalidMessageSizeException: invalid message size

Aumentar o fetch.sizenão é o ideal aqui, porque na verdade não quero aceitar mensagens tão grandes.

Peter Klipfel
fonte

Respostas:

360

Atualize temporariamente o tempo de retenção no tópico para um segundo:

kafka-topics.sh --zookeeper <zkhost>:2181 --alter --topic <topic name> --config retention.ms=1000

E nas versões mais recentes do Kafka, você também pode fazê-lo com kafka-configs --entity-type topics

kafka-configs.sh --zookeeper <zkhost>:2181 --entity-type topics --alter --entity-name <topic name> --add-config retention.ms=1000

então aguarde a limpeza entrar em vigor (cerca de um minuto). Depois de limpo, restaure o retention.msvalor anterior .

steven appleyard
fonte
8
Essa é uma ótima resposta, mas você poderia adicionar uma descrição de como começar verificando o valor retention.ms atual do tópico?
Greg Dubicki
28
Eu não tenho certeza sobre como verificar a configuração atual, mas acredito que redefini-lo de volta à aparência padrão como:bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
Aspergillus oryzae
15
Ou dependendo da versão:--delete-config retention.ms
aspergillusOryzae
3
apenas um fyi, para o kafka v. 0.9.0.0, ele diz: ubuntu @ ip-172-31-21-201: /opt/kafka/kafka_2.10-0.9.0.0-SNAPSHOT$ bin / kafka-topics.sh - -zookeeper localhost: 2181 --alter --topic room-data --config retention.ms = 1000 AVISO: A alteração da configuração do tópico desse script foi descontinuada e pode ser removida em versões futuras. Indo para a frente, por favor use kafka-configs.sh para essa funcionalidade
Alper Akture
54
Parece que desde a versão 0.9.0, usar o kafka-topics.sh para alterar a configuração está obsoleto. A nova opção é usar o script kafka-configs.sh. e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000 Isso também permite verificar o período de retenção atual, por exemplo, kafka-configs --zookeeper <zkhost>: 2181 --describe - topic-type topics --entity-name <nome do tópico>
RHE
70

Para limpar a fila, você pode excluir o tópico:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

depois recrie-o:

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test
rjaiswal
fonte
14
Lembre-se de adicionar a linha delete.topic.enable=trueno arquivo config/server.properties, como a advertência impressa pelo comando mencionado dizNote: This will have no impact if delete.topic.enable is not set to true.
Patrizio Bertoni
3
Isso nem sempre é instantâneo. Às vezes, apenas marcará a exclusão e a exclusão real ocorrerá mais tarde.
Gaurav Khare
48

Aqui estão as etapas que eu sigo para excluir um tópico chamado MyTopic:

  1. Descreva o tópico e não use os IDs do broker
  2. Pare o daemon Apache Kafka para cada ID do broker listado.
  3. Conecte-se a cada intermediário e exclua a pasta de dados do tópico, por exemplo rm -rf /tmp/kafka-logs/MyTopic-0. Repita para outras partições e todas as réplicas
  4. Excluir os metadados tópico: zkCli.shem seguida,rmr /brokers/MyTopic
  5. Inicie o daemon Apache Kafka para cada máquina parada

Se você errar a etapa 3, o Apache Kafka continuará relatando o tópico como presente (por exemplo, quando você executar kafka-list-topic.sh).

Testado com Apache Kafka 0.8.0.

Thomas Bratt
fonte
2
em 0.8.1 ./zookeeper-shell.sh localhost:2181e./kafka-topics.sh --list --zookeeper localhost:2181
pdeschen
Pode usar zookeeper-clientem vez de zkCli.sh(tentou em Cloudera CDH5)
Martin Tapp
1
Isso exclui o tópico, não os dados dentro dele. Isso requer que o Broker seja parado. Isso é, na melhor das hipóteses, um hack. A resposta de Steven Appleyard é realmente a melhor absoluta.
Jeff Maass
1
Esta foi a única maneira no momento em que foi escrito.
Thomas Bratt
2
Trabalhou para mim no Kafka 0.8.2.1, embora os topis no tratador estivessem em / brokers / topics / <topic name here>
codecraig 17/15
44

Embora a resposta aceita esteja correta, esse método foi preterido. A configuração do tópico agora deve ser feita via kafka-configs.

kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic

As configurações definidas por esse método podem ser exibidas com o comando

kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
Shane Perry
fonte
2
Também vale a pena adicionar:kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --delete-config retention.ms --entity-name MyTopic
NoBrainer
38

Testado no Kafka 0.8.2, para o exemplo de início rápido: Primeiro, adicione uma linha ao arquivo server.properties na pasta config:

delete.topic.enable=true

então, você pode executar este comando:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Patrick
fonte
6

From kafka 1.1

Limpar um tópico

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --add-config retention.ms=100

aguarde 1 minuto, para garantir que kafka limpe o tópico remova a configuração e, em seguida, vá para o valor padrão

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --delete-config retention.ms
user644265
fonte
1
Eu acho que você tem uma flecha extra. No meu, eu era capaz de correrbin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config rentention.ms=100
Will
4

O kafka não possui um método direto para o tópico de limpeza / limpeza (Filas), mas pode fazer isso excluindo esse tópico e recriá-lo.

primeiro, verifique se o arquivo sever.properties possui e, se não, adicione delete.topic.enable=true

em seguida, excluir tópico bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic

depois crie-o novamente.

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
Manish Jaiswal
fonte
4

Às vezes, se você tiver um cluster saturado (muitas partições, ou usando dados de tópico criptografados, ou SSL, ou o controlador estiver em um nó inválido ou a conexão estiver irregular, levará muito tempo para limpar o tópico) .

Eu sigo estas etapas, principalmente se você estiver usando o Avro.

1: Execute com ferramentas kafka:

bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>

2: Executar no nó de registro do esquema:

kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning

3: Coloque a retenção de tópicos de volta à configuração original, quando o tópico estiver vazio.

bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>

Espero que isso ajude alguém, pois não é facilmente anunciado.

Ben Coughlan
fonte
Nota: kafka-avro-console-consumernão é necessário
OneCricketeer 27/01/19
4

ATUALIZAÇÃO: Esta resposta é relevante para o Kafka 0.6. Para o Kafka 0.8 e posterior, consulte a resposta do @Patrick.

Sim, pare o kafka e exclua manualmente todos os arquivos do subdiretório correspondente (é fácil encontrá-lo no diretório de dados kafka). Após o kafka reiniciar, o tópico ficará vazio.

Incêndios
fonte
Isso requer derrubar o Broker e, na melhor das hipóteses, é um hack. A resposta de Steven Appleyard é realmente a melhor absoluta.
Jeff Maass
@MaasSql Eu concordo. :) Esta resposta tem dois anos, sobre a versão 0.6. As funcionalidades "alterar tópico" e "excluir tópico" foram implementadas posteriormente.
Wildfire
A resposta de Steven Appleyard é tão hacky quanto esta.
precisa saber é o seguinte
Ter um aplicativo gerenciando a exclusão de seus próprios dados de uma maneira suportada é muito menos invasivo do que desativar o aplicativo e excluir o que você acha que são todos os seus arquivos de dados e depois ativá-lo novamente.
Nick
3

A abordagem mais simples é definir que a data dos arquivos de log individuais seja mais antiga que o período de retenção. Em seguida, o corretor deve limpá-los e removê-los para você dentro de alguns segundos. Isso oferece várias vantagens:

  1. Não há necessidade de derrubar corretores, é uma operação de tempo de execução.
  2. Evita a possibilidade de exceções de deslocamento inválidas (mais sobre isso abaixo).

Na minha experiência com o Kafka 0.7.x, remover os arquivos de log e reiniciar o broker pode levar a exceções de deslocamento inválidas para determinados consumidores. Isso aconteceria porque o broker reiniciaria as compensações em zero (na ausência de arquivos de log existentes) e um consumidor que estava consumindo anteriormente do tópico se reconectaria para solicitar um deslocamento [uma vez válido] específico. Se esse deslocamento estiver fora dos limites dos novos logs de tópicos, nenhum dano será causado e o consumidor continuará no início ou no final. Porém, se o deslocamento cair dentro dos limites dos novos logs de tópico, o broker tentará buscar o conjunto de mensagens, mas falhará porque o deslocamento não se alinha a uma mensagem real.

Isso pode ser atenuado também limpando as compensações do consumidor no tratador para esse tópico. Mas se você não precisa de um tópico virgem e apenas deseja remover o conteúdo existente, simplesmente 'tocar' alguns logs de tópicos é muito mais fácil e confiável do que parar os intermediários, excluir logs de tópicos e limpar certos nós do tratador .

Andrew Carter
fonte
como "definir a data dos arquivos de log individuais como mais antigos que o período de retenção"? obrigado #
23416
3

O conselho de Thomas é ótimo, mas infelizmente zkClinas versões antigas do Zookeeper (por exemplo, 3.3.6) não parecem dar suporte rmr. Por exemplo, compare a implementação da linha de comando no Zookeeper moderno com a versão 3.3 .

Se você se deparar com uma versão antiga do Zookeeper, uma solução é usar uma biblioteca cliente como o zc.zk para Python. Para pessoas não familiarizadas com o Python, é necessário instalá-lo usando o pip ou o easy_install . Em seguida, inicie um shell Python ( python) e você pode fazer:

import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic') 

ou mesmo

zk.delete_recursive('brokers')

se você deseja remover todos os tópicos do Kafka.

Mark Butler
fonte
2

Para limpar todas as mensagens de um tópico específico usando o seu grupo de aplicativos (GroupName deve ser o mesmo que o nome do grupo kafka do aplicativo).

./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group

user4713340
fonte
Há um problema com essa abordagem (testada em 0.8.1.1). Se um aplicativo assinar dois (ou mais) tópicos: topic1 e topic2 e o consumidor do console limpar o topic1, infelizmente também excluirá o deslocamento do consumidor não relacionado para o topic2, que causa a reprodução de todas as mensagens do topic2.
JSH
2

Após a resposta do @steven appleyard, executei os seguintes comandos no Kafka 2.2.0 e eles funcionaram para mim.

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --describe

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --add-config retention.ms=1000

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --delete-config retention.ms
abbas
fonte
Isso parece duplicar outras respostas
OneCricketeer 14/01
2

Muitas ótimas respostas por aqui, mas entre elas, não encontrei uma sobre o docker. Passei algum tempo para descobrir que o uso do contêiner do corretor está errado neste caso (obviamente !!!)

## this is wrong!
docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
        at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
        at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
        at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
        at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
        at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)

e eu deveria ter usado em zookeeper:2181vez do --zookeeper localhost:2181meu arquivo de composição

## this might be an option, but as per comment below not all zookeeper images can have this script included
docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000

o comando correto seria

docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000

Espero que economize o tempo de alguém.

Além disso, esteja ciente de que as mensagens não serão excluídas imediatamente e isso acontecerá quando o segmento do log for fechado.

Vladimir Semashkin
fonte
Você pode executar o corretor muito bem. O problema é localhost:2181... Por exemplo, você está entendendo errado os recursos de rede do Docker. Além disso, nem todos os contêineres do Zookeeper possuem kafka-topics, portanto, é melhor não usá-lo dessa maneira. As instalações mais recentes do Kafka permitem --bootstrap-serversalterar um tópico em vez de--zookeeper
OneCricketeer 14/01
1
Ainda assim, o executivo no contêiner do Zookeeper parece errado. you can use --zookeeper zookeeper: 2181` do contêiner Kafka é o meu ponto. Ou até mesmo cumprimentar a linha Zookeeper do arquivo server.properties
OneCricketeer 16/01
@ cricket_007 ei, obrigado por isso realmente, eu corrigi a resposta, deixe-me saber se algo ainda está errado por lá
Vladimir Semashkin
1

Não foi possível adicionar como comentário por causa do tamanho: Não tenho certeza se isso é verdade, além de atualizar retention.ms e retention.bytes, mas notei que a política de limpeza de tópicos deve ser "delete" (padrão), se "compact", ela será mantenha as mensagens por mais tempo, ou seja, se for "compacto", você deverá especificar também delete.retention.ms .

./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

Também teve que monitorar as compensações mais antigas / mais recentes, devem ser as mesmas para confirmar que isso ocorreu com êxito, também pode verificar o du -h / tmp / kafka-logs / test-topic-3-100- *

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762

O outro problema é que você precisa primeiro obter a configuração atual para se lembrar de reverter após a exclusão ser bem-sucedida: ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics

kisna
fonte
1

Outra abordagem, bastante manual, para eliminar um tópico é:

nos corretores:

  1. parar corretor kafka
    sudo service kafka stop
  2. excluir todos os arquivos de log da partição (isso deve ser feito em todos os intermediários)
    sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*

no tratador:

  1. executar interface de linha de comando zookeeper
    sudo /usr/lib/zookeeper/bin/zkCli.sh
  2. use zkCli para remover os metadados do tópico
    rmr /brokers/topic/<some_topic_name>

nos corretores novamente:

  1. reiniciar o serviço do broker
    sudo service kafka start
Danny Mor
fonte
Você precisa parar e remover os arquivos de cada broker com uma réplica, o que significa que você pode ter um tempo de inatividade do cliente ao fazer isso.
OneCricketeer
1
você está correto, este apenas permite que você realmente veja onde algumas coisas são armazenadas e gerenciadas pelo Kafka. mas essa abordagem de força bruta definitivamente não é para um sistema em execução de produção.
Danny Mor
1
./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic

Isso deve dar retention.msconfigurado. Em seguida, você pode usar o comando alter acima para mudar para 1 segundo (e depois voltar ao padrão).

Topic:myTopic   PartitionCount:6        ReplicationFactor:1     Configs:retention.ms=86400000
tushararora19
fonte
1

No Java, usando o novo em AdminZkClientvez do descontinuado AdminUtils:

  public void reset() {
    try (KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200_000,
        5000, 10, Time.SYSTEM, "metricGroup", "metricType")) {

      for (Map.Entry<String, List<PartitionInfo>> entry : listTopics().entrySet()) {
        deleteTopic(entry.getKey(), zkClient);
      }
    }
  }

  private void deleteTopic(String topic, KafkaZkClient zkClient) {

    // skip Kafka internal topic
    if (topic.startsWith("__")) {
      return;
    }

    System.out.println("Resetting Topic: " + topic);
    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
    adminZkClient.deleteTopic(topic);

    // deletions are not instantaneous
    boolean success = false;
    int maxMs = 5_000;
    while (maxMs > 0 && !success) {
      try {
        maxMs -= 100;
        adminZkClient.createTopic(topic, 1, 1, new Properties(), null);
        success = true;
      } catch (TopicExistsException ignored) {
      }
    }

    if (!success) {
      Assert.fail("failed to create " + topic);
    }
  }

  private Map<String, List<PartitionInfo>> listTopics() {
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
    props.put("group.id", "test-container-consumer-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Map<String, List<PartitionInfo>> topics = consumer.listTopics();
    consumer.close();

    return topics;
  }
Michael Böckling
fonte
Você não precisa do Zookeeper. Use AdminClientorKafkaAdminClient
OneCricketeer 14/01