Estou usando o Apache Kafka para mensagens. Implementei o produtor e o consumidor em Java. Como podemos obter o número de mensagens em um tópico?
java
messages
apache-kafka
Chetan
fonte
fonte
Não é java, mas pode ser útil
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker>: <port> --topic <topic-name> --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
fonte
bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -1 | awk -F ":" '{sum += $3} END {print sum}' 13818663 bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -2 | awk -F ":" '{sum += $3} END {print sum}' 12434609
E então a diferença retorna mensagens pendentes reais no tópico? Estou correcto?Na verdade, eu uso isso para comparar meu POC. O item que você deseja usar ConsumerOffsetChecker. Você pode executá-lo usando o script bash como abaixo.
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group testgroup
E abaixo está o resultado: Como você pode ver na caixa vermelha, 999 é o número de mensagens atualmente no tópico.
Atualização: ConsumerOffsetChecker está obsoleto desde 0.10.0, você pode querer começar a usar ConsumerGroupCommand.
fonte
Como
ConsumerOffsetChecker
não é mais compatível, você pode usar este comando para verificar todas as mensagens no tópico:bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \ --group my-group \ --bootstrap-server localhost:9092 \ --describe
Onde
LAG
está a contagem de mensagens na partição do tópico:Você também pode tentar usar o kafkacat . Este é um projeto de código aberto que pode ajudá-lo a ler mensagens de um tópico e partição e imprimi-los em stdout. Aqui está um exemplo que lê as últimas 10 mensagens do
sample-kafka-topic
tópico e depois sai:kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e
fonte
Às vezes, o interesse é saber o número de mensagens em cada partição, por exemplo, ao testar um particionador personalizado. As etapas seguintes foram testadas para funcionar com o Kafka 0.10.2.1-2 do Confluent 3.2. Dado um tópico Kafka
kt
e a seguinte linha de comando:$ kafka-run-class kafka.tools.GetOffsetShell \ --broker-list host01:9092,host02:9092,host02:9092 --topic kt
Isso imprime a saída de amostra mostrando a contagem de mensagens nas três partições:
kt:2:6138 kt:1:6123 kt:0:6137
O número de linhas pode ser mais ou menos dependendo do número de partições para o tópico.
fonte
Use https://prestodb.io/docs/current/connector/kafka-tutorial.html
Um super motor SQL, fornecido pelo Facebook, que se conecta a várias fontes de dados (Cassandra, Kafka, JMX, Redis ...).
PrestoDB está sendo executado como um servidor com trabalhadores opcionais (há um modo autônomo sem trabalhadores extras), então você usa um pequeno JAR executável (chamado presto CLI) para fazer consultas.
Depois de configurar bem o servidor Presto, você pode usar o SQL tradicional:
SELECT count(*) FROM TOPIC_NAME;
fonte
Comando Apache Kafka para obter mensagens não tratadas em todas as partições de um tópico:
kafka-run-class kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group test_group
Impressões:
Group Topic Pid Offset logSize Lag Owner test_group test 0 11051 11053 2 none test_group test 1 10810 10812 2 none test_group test 2 11027 11028 1 none
A coluna 6 são as mensagens não tratadas. Adicione-os assim:
kafka-run-class kafka.tools.ConsumerOffsetChecker --topic test --zookeeper localhost:2181 --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} END {print sum}'
awk lê as linhas, pula a linha do cabeçalho e adiciona a 6ª coluna e no final imprime a soma.
Impressões
5
fonte
Execute o seguinte (supondo que
kafka-console-consumer.sh
esteja no caminho):kafka-console-consumer.sh --from-beginning \ --bootstrap-server yourbroker:9092 --property print.key=true \ --property print.value=false --property print.partition \ --topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
fonte
--new-consumer
porque essa opção não está mais disponível (ou aparentemente necessária)Para obter todas as mensagens armazenadas para o tópico pode-se buscar o consumidor no início e no final do stream de cada partição e somar os resultados
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream() .map(p -> new TopicPartition(topic, p.partition())) .collect(Collectors.toList()); consumer.assign(partitions); consumer.seekToEnd(Collections.emptySet()); Map<TopicPartition, Long> endPartitions = partitions.stream() .collect(Collectors.toMap(Function.identity(), consumer::position)); consumer.seekToBeginning(Collections.emptySet()); System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
fonte
Usando o cliente Java de Kafka 2.11-1.0.0, você pode fazer o seguinte:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test")); while(true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); // after each message, query the number of messages of the topic Set<TopicPartition> partitions = consumer.assignment(); Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions); for(TopicPartition partition : offsets.keySet()) { System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition)); } } }
O resultado é mais ou menos assim:
offset = 10, key = null, value = un partition test is at 13 offset = 11, key = null, value = deux partition test is at 13 offset = 12, key = null, value = trois partition test is at 13
fonte
seekToEnd(..)
eseekToBeginning(..)
métodos que mudam o estado daconsumer
.Nas versões mais recentes do Kafka Manager, há uma coluna intitulada Summed Recent Offsets .
fonte
Eu tive essa mesma dúvida e é assim que estou fazendo isso, de um KafkaConsumer, em Kotlin:
Código muito aproximado, já que acabei de fazer isso funcionar, mas basicamente você deseja subtrair o deslocamento inicial do tópico do deslocamento final e essa será a contagem de mensagens atual para o tópico.
Você não pode simplesmente confiar no deslocamento final por causa de outras configurações (política de limpeza, retenção-ms, etc.) que podem acabar causando a exclusão de mensagens antigas do seu tópico. Os deslocamentos apenas "movem" para a frente, portanto, é o deslocamento inicial que avançará próximo ao deslocamento final (ou eventualmente para o mesmo valor, se o tópico não contiver nenhuma mensagem agora).
Basicamente, o deslocamento final representa o número geral de mensagens que passaram por esse tópico, e a diferença entre os dois representa o número de mensagens que o tópico contém no momento.
fonte
Trechos de documentos de Kafka
Suspensão de uso em 0.9.0.0
O kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) foi descontinuado. No futuro, use kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) para esta funcionalidade.
Estou executando o corretor Kafka com SSL habilitado para servidor e cliente. Abaixo do comando eu uso
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x
onde / tmp / ssl_config é como abaixo
fonte
Se você tiver acesso à interface JMX do servidor, os deslocamentos inicial e final estão presentes em:
(você precisa substituir
TOPICNAME
&PARTITIONNUMBER
). Lembre-se de que você precisa verificar cada uma das réplicas de determinada partição ou descobrir qual dos brokers é o líder de uma determinada partição (e isso pode mudar com o tempo).Como alternativa, você pode usar os métodos Kafka Consumer
beginningOffsets
eendOffsets
.fonte
Eu não tentei fazer isso sozinho, mas parece fazer sentido.
Você também pode usar
kafka.tools.ConsumerOffsetChecker
( fonte ).fonte
A maneira mais simples que encontrei é usar a API REST do Kafdrop
/topic/topicName
e especificar o cabeçalho key:"Accept"
/ value:"application/json"
para obter uma resposta JSON.Isso está documentado aqui .
fonte
Você pode usar kafkatool . Verifique este link -> http://www.kafkatool.com/download.html
fonte