Consuma a mesma mensagem novamente se o processamento da mensagem falhar

10

Estou usando o cliente Confluent.Kafka .NET versão 1.3.0. Estou seguindo os documentos :

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "server1, server2",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = true,
    EnableAutoOffsetStore = false,
    GroupId = this.groupId,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = this.kafkaUsername,
    SaslPassword = this.kafkaPassword,
};

using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
    var cancellationToken = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cancellationToken.Cancel();
    };

    consumer.Subscribe("my-topic");
    while (true)
    {
        try
        {
            var consumerResult = consumer.Consume();
            // process message
            consumer.StoreOffset(consumerResult);
        }
        catch (ConsumeException e)
        {
            // log
        }
        catch (KafkaException e)
        {
            // log
        }
        catch (OperationCanceledException e)
        {
            // log
        }
    }
}

O problema é que, mesmo que eu comente a linha consumer.StoreOffset(consumerResult);, continuo recebendo a próxima mensagem não consumida na próxima vez que consumir , ou seja, o deslocamento continua aumentando, o que não parece ser o que a documentação afirma, ou seja, pelo menos uma entrega .

Mesmo que eu defina EnableAutoCommit = falsee remova 'EnableAutoOffsetStore = false' da configuração e substitua consumer.StoreOffset(consumerResult)por consumer.Commit(), ainda vejo o mesmo comportamento, ou seja, mesmo que eu comente Commit, continuo recebendo as próximas mensagens não consumidas.

Sinto que estou perdendo algo fundamental aqui, mas não consigo entender o que. Qualquer ajuda é apreciada!

havij
fonte
As mensagens já foram retornadas ao aplicativo do ponto de vista kafka; portanto, quando você confirma, elas são salvas como as últimas compensações confirmadas, mas o consumo continuará retornando as próximas mensagens, independentemente de você ter consumido ou não. Qual é a sua expectativa aqui? Poderia, por favor, elaborar o que você espera que aconteça antes / depois de confirmar e consumir?
Sagar Veeram 12/03
As mensagens não são recuperadas até que você use a busca de deslocamento. Isso afetará o consumo e as mensagens serão retornadas do deslocamento de busca.
Sagar Veeram 12/03
@ user2683814 Na minha postagem, mencionei dois cenários, dependendo do que EnableAutoCommitestá definido. Digamos que sim EnableAutoCommit = false, e quando eu Consumerecebo de volta a mensagem com deslocamento 11. Eu esperava continuar recebendo a mesma mensagem com o deslocamento 11 repetidas vezes, se o processamento da mensagem continuar sendo exibido e, portanto, não Commitfor feita a chamada .
havij 12/03
Não, esse não é o caso. Você não pode controlar o que poll ( Consume) usando Commitdepois de já ter discutido o Subscribetópico. O Kafka (como na biblioteca do cliente) nos bastidores mantém todos os deslocamentos que ele enviou ao aplicativo Consumee os envia linearmente. Portanto, para reprocessar uma mensagem como em um cenário de falha, você deve rastreá-la em seu código, procurar compensar e iniciar o processamento da mensagem e também saber o que pular se já tiver sido processado em solicitações anteriores. Não estou familiarizado com a biblioteca .net, mas isso realmente não deve importar, pois esse é o design kafka.
Sagar Veeram 12/03
Eu acho que você precisa usar a combinação de assinatura e atribuição e pode precisar de diferentes consumidores para dar suporte ao seu caso de uso. Em caso de falhas, use atribuir / procurar compensar partições de tópicos com um consumidor para reprocessar mensagens e, para processamento normal, usar outro consumidor com fluxo de assinatura / consumo / confirmação.
Sagar Veeram 12/03

Respostas:

0

Convém ter uma lógica de repetição para processar cada uma de suas mensagens por um número fixo de vezes, como digamos 5. Se não for bem-sucedida durante essas 5 tentativas, você poderá adicionar esta mensagem a outro tópico para lidar com todos mensagens com falha que têm precedência sobre o tópico atual. Ou você pode adicionar a mensagem com falha ao mesmo tópico para que ela seja capturada mais tarde quando todas as outras mensagens forem consumidas.

Se o processamento de qualquer mensagem for bem-sucedido nessas 5 tentativas, você poderá pular para a próxima mensagem na fila.

Raju Dasupally
fonte