Casos de uso de fluxos Kafka para adicionar armazenamento global

8

Ao definir uma topologia em fluxos kafka, um armazenamento de estado global pode ser adicionado. Ele precisará de um tópico de origem e também de um ProcessorSupplier. O processador recebe registros e pode teoricamente transformá-los antes de adicioná-los à loja. Porém, em caso de restauração, os registros são inseridos diretamente do tópico de origem (changelog) no armazenamento de estado global, ignorando eventuais transformações feitas no processador.

   +-------------+             +-------------+              +---------------+
   |             |             |             |              |    global     |
   |source topic  ------------->  processor  +-------------->    state      |
   |(changelog)  |             |             |              |    store      |
   +-------------+             +-------------+              +---------------+
          |                                                         ^
          |                                                         |
          +---------------------------------------------------------+
              record directly inserted during restoration

StreamsBuilder # addGlobalStore (StoreBuilder storeBuilder, tópico String, Consumido consumido, ProcessorSupplier stateUpdateSupplier) Adiciona um StateStore global à topologia.

Conforme documentação

NOTA: você não deve usar o Processador para inserir registros transformados no armazenamento de estado global . Este armazenamento usa o tópico de origem como log de alterações e durante a restauração inserirá registros diretamente da fonte . Este ProcessorNode deve ser usado para manter a StateStore atualizada.

Em paralelo, como o principal erro está atualmente aberto no rastreador de erros kafka: O processador personalizado KAFKA-7663 fornecido no addGlobalStore não é usado ao restaurar o estado do tópico, o que explica exatamente o que é indicado na documentação, mas parece ser um erro aceito.

Gostaria de saber se o KAFKA-7663 é realmente um bug ou não. De acordo com a documentação, parece ter sido projetado assim, nesse caso, eu luto para entender o caso de uso.
Alguém pode explicar os principais casos de uso dessa API de baixo nível? A única coisa em que consigo pensar é processar efeitos colaterais, como, por exemplo, fazer algumas operações de log no processador.

Pergunta de bônus: se o tópico de origem atuar como o log de alterações da loja global, quando um registro for excluído do tópico porque a retenção expirou, ele será removido do armazenamento de estado global? Ou a remoção só ocorrerá na loja após uma restauração completa da loja do changelog.

ampliação
fonte
2
Observe que a documentação mais antiga não estava indicando o problema e acabamos de atualizar o documento como "correção intermediária".
Matthias J. Sax

Respostas:

8

Sim, isso é um pouco estranho, mas a documentação está correta. O Processador para um armazenamento de estado global não deve fazer nada nos registros, mas persistir no armazenamento.

AFAIK, essa não é uma questão filosófica, apenas prática. O motivo é simplesmente o comportamento que você observa ... O Streams trata o tópico de entrada como um tópico do log de alterações do armazenamento e, portanto, ignora o processador (bem como a desserialização) durante a restauração.

A razão pela qual a restauração de estado ignora qualquer processamento é que geralmente os dados em um registro de alterações são idênticos aos dados no armazenamento, portanto, seria realmente errado fazer algo novo. Além disso, é mais eficiente apenas tirar os bytes do fio e gravá-los em massa nas lojas do estado. Eu digo "normalmente" porque, neste caso, o tópico de entrada não é exatamente como um tópico normal do changelog, pois não recebe suas gravações durante as vendas da loja.

Pelo que vale, também luto para entender o caso de uso. Aparentemente, devemos:

  1. Livre-se totalmente desse processador e sempre apenas despeje os dados binários dos fios nas lojas, assim como a restauração.
  2. Redesenhe as lojas globais para permitir transformações arbitrárias antes da loja global. Poderíamos:
    • continue a usar o tópico de entrada e desserialize e chame os processadores também durante a restauração, OU
    • adicione um registro de alterações real para lojas globais, para que pesquisássemos o tópico de entrada, aplicássemos algumas transformações e depois gravássemos na loja global e no registro de alterações global da loja. Em seguida, podemos usar o changelog (não a entrada) para restauração e replicação.

A propósito, se você deseja o último comportamento, pode aproximar-se agora aplicando suas transformações e usando to(my-global-changelog)para fabricar um tópico "changelog". Em seguida, você criaria a loja global para ler a partir da sua em my-global-changelogvez da entrada.

Portanto, para lhe dar uma resposta direta, o KAFKA-7663 não é um bug. Vou comentar sobre o ticket propondo transformá-lo em uma solicitação de recurso.

Resposta bônus: Os tópicos que atuam como registros de alterações para armazenamentos estaduais não devem ser configurados com retenção. Na prática, isso significa que você deve impedir o crescimento infinito ativando a compactação e desabilitando a retenção de logs.

Na prática, dados antigos perdendo a retenção e sendo descartados não são um "evento", e os consumidores não têm como saber se / quando isso acontece. Portanto, não é possível remover dados dos armazenamentos estaduais em resposta a esse não evento. Isso aconteceria como você descreve ... os registros ficariam parados na loja global indefinidamente. Se / quando uma instância for substituída, a nova seria restaurada a partir da entrada e (obviamente) somente receberia registros existentes no tópico naquele momento. Assim, o cluster Streams como um todo acabaria com uma visão inconsistente do estado global. É por isso que você deve desativar a retenção.

A maneira correta de "descartar" dados antigos da loja seria apenas escrever uma lápide para a chave desejada no tópico de entrada. Isso seria propagado corretamente para todos os membros do cluster, aplicado corretamente durante a restauração e compactado corretamente pelos intermediários.

Espero que tudo isso ajude. Definitivamente, fale com o ticket e ajude-nos a moldar a API para ser mais intuitiva!

John
fonte
Sim, definitivamente ajuda muito. Obrigado por esta resposta detalhada :)
zoom
2
Para esclarecer "Os tópicos que atuam como registros de alterações para armazenamentos de estado não devem ser configurados com retenção". Isso significa que você não deve configurar o tópico para expirar os dados após um certo período de tempo ou após um certo limite de tamanho ter sido excedido. Em vez disso, os dados devem ser mantidos "para sempre" no tópico, e a habilitação da compactação ajuda a garantir que o tópico ainda não possa crescer fora dos limites.
Michael G. Noll
Eu estava procurando a explicação. Muito obrigado
SunilS