Estou usando o cliente Confluent.Kafka .NET versão 1.3.0. Estou seguindo os documentos :
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "server1, server2",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true,
EnableAutoOffsetStore = false,
GroupId = this.groupId,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = this.kafkaUsername,
SaslPassword = this.kafkaPassword,
};
using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
var cancellationToken = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cancellationToken.Cancel();
};
consumer.Subscribe("my-topic");
while (true)
{
try
{
var consumerResult = consumer.Consume();
// process message
consumer.StoreOffset(consumerResult);
}
catch (ConsumeException e)
{
// log
}
catch (KafkaException e)
{
// log
}
catch (OperationCanceledException e)
{
// log
}
}
}
O problema é que, mesmo que eu comente a linha consumer.StoreOffset(consumerResult);
, continuo recebendo a próxima mensagem não consumida na próxima vez que consumir , ou seja, o deslocamento continua aumentando, o que não parece ser o que a documentação afirma, ou seja, pelo menos uma entrega .
Mesmo que eu defina EnableAutoCommit = false
e remova 'EnableAutoOffsetStore = false' da configuração e substitua consumer.StoreOffset(consumerResult)
por consumer.Commit()
, ainda vejo o mesmo comportamento, ou seja, mesmo que eu comente Commit
, continuo recebendo as próximas mensagens não consumidas.
Sinto que estou perdendo algo fundamental aqui, mas não consigo entender o que. Qualquer ajuda é apreciada!
EnableAutoCommit
está definido. Digamos que simEnableAutoCommit = false
, e quando euConsume
recebo de volta a mensagem com deslocamento 11. Eu esperava continuar recebendo a mesma mensagem com o deslocamento 11 repetidas vezes, se o processamento da mensagem continuar sendo exibido e, portanto, nãoCommit
for feita a chamada .Consume
) usandoCommit
depois de já ter discutido oSubscribe
tópico. O Kafka (como na biblioteca do cliente) nos bastidores mantém todos os deslocamentos que ele enviou ao aplicativoConsume
e os envia linearmente. Portanto, para reprocessar uma mensagem como em um cenário de falha, você deve rastreá-la em seu código, procurar compensar e iniciar o processamento da mensagem e também saber o que pular se já tiver sido processado em solicitações anteriores. Não estou familiarizado com a biblioteca .net, mas isso realmente não deve importar, pois esse é o design kafka.Respostas:
Desculpe, ainda não posso adicionar comentários. O consumidor Kafka consome mensagens em lotes, portanto , talvez você ainda itere o lote pré-buscado pelo encadeamento em segundo plano .
Você pode verificar se o seu consumidor realmente confirma o deslocamento ou não usando o kafka util
kafka-consumer-groups.sh
fonte
Convém ter uma lógica de repetição para processar cada uma de suas mensagens por um número fixo de vezes, como digamos 5. Se não for bem-sucedida durante essas 5 tentativas, você poderá adicionar esta mensagem a outro tópico para lidar com todos mensagens com falha que têm precedência sobre o tópico atual. Ou você pode adicionar a mensagem com falha ao mesmo tópico para que ela seja capturada mais tarde quando todas as outras mensagens forem consumidas.
Se o processamento de qualquer mensagem for bem-sucedido nessas 5 tentativas, você poderá pular para a próxima mensagem na fila.
fonte