Ao definir uma topologia em fluxos kafka, um armazenamento de estado global pode ser adicionado. Ele precisará de um tópico de origem e também de um ProcessorSupplier
. O processador recebe registros e pode teoricamente transformá-los antes de adicioná-los à loja. Porém, em caso de restauração, os registros são inseridos diretamente do tópico de origem (changelog) no armazenamento de estado global, ignorando eventuais transformações feitas no processador.
+-------------+ +-------------+ +---------------+
| | | | | global |
|source topic -------------> processor +--------------> state |
|(changelog) | | | | store |
+-------------+ +-------------+ +---------------+
| ^
| |
+---------------------------------------------------------+
record directly inserted during restoration
StreamsBuilder # addGlobalStore (StoreBuilder storeBuilder, tópico String, Consumido consumido, ProcessorSupplier stateUpdateSupplier) Adiciona um StateStore global à topologia.
Conforme documentação
NOTA: você não deve usar o Processador para inserir registros transformados no armazenamento de estado global . Este armazenamento usa o tópico de origem como log de alterações e durante a restauração inserirá registros diretamente da fonte . Este ProcessorNode deve ser usado para manter a StateStore atualizada.
Em paralelo, como o principal erro está atualmente aberto no rastreador de erros kafka: O processador personalizado KAFKA-7663 fornecido no addGlobalStore não é usado ao restaurar o estado do tópico, o que explica exatamente o que é indicado na documentação, mas parece ser um erro aceito.
Gostaria de saber se o KAFKA-7663 é realmente um bug ou não. De acordo com a documentação, parece ter sido projetado assim, nesse caso, eu luto para entender o caso de uso.
Alguém pode explicar os principais casos de uso dessa API de baixo nível? A única coisa em que consigo pensar é processar efeitos colaterais, como, por exemplo, fazer algumas operações de log no processador.
Pergunta de bônus: se o tópico de origem atuar como o log de alterações da loja global, quando um registro for excluído do tópico porque a retenção expirou, ele será removido do armazenamento de estado global? Ou a remoção só ocorrerá na loja após uma restauração completa da loja do changelog.
fonte
Respostas:
Sim, isso é um pouco estranho, mas a documentação está correta. O Processador para um armazenamento de estado global não deve fazer nada nos registros, mas persistir no armazenamento.
AFAIK, essa não é uma questão filosófica, apenas prática. O motivo é simplesmente o comportamento que você observa ... O Streams trata o tópico de entrada como um tópico do log de alterações do armazenamento e, portanto, ignora o processador (bem como a desserialização) durante a restauração.
A razão pela qual a restauração de estado ignora qualquer processamento é que geralmente os dados em um registro de alterações são idênticos aos dados no armazenamento, portanto, seria realmente errado fazer algo novo. Além disso, é mais eficiente apenas tirar os bytes do fio e gravá-los em massa nas lojas do estado. Eu digo "normalmente" porque, neste caso, o tópico de entrada não é exatamente como um tópico normal do changelog, pois não recebe suas gravações durante as vendas da loja.
Pelo que vale, também luto para entender o caso de uso. Aparentemente, devemos:
A propósito, se você deseja o último comportamento, pode aproximar-se agora aplicando suas transformações e usando
to(my-global-changelog)
para fabricar um tópico "changelog". Em seguida, você criaria a loja global para ler a partir da sua emmy-global-changelog
vez da entrada.Portanto, para lhe dar uma resposta direta, o KAFKA-7663 não é um bug. Vou comentar sobre o ticket propondo transformá-lo em uma solicitação de recurso.
Resposta bônus: Os tópicos que atuam como registros de alterações para armazenamentos estaduais não devem ser configurados com retenção. Na prática, isso significa que você deve impedir o crescimento infinito ativando a compactação e desabilitando a retenção de logs.
Na prática, dados antigos perdendo a retenção e sendo descartados não são um "evento", e os consumidores não têm como saber se / quando isso acontece. Portanto, não é possível remover dados dos armazenamentos estaduais em resposta a esse não evento. Isso aconteceria como você descreve ... os registros ficariam parados na loja global indefinidamente. Se / quando uma instância for substituída, a nova seria restaurada a partir da entrada e (obviamente) somente receberia registros existentes no tópico naquele momento. Assim, o cluster Streams como um todo acabaria com uma visão inconsistente do estado global. É por isso que você deve desativar a retenção.
A maneira correta de "descartar" dados antigos da loja seria apenas escrever uma lápide para a chave desejada no tópico de entrada. Isso seria propagado corretamente para todos os membros do cluster, aplicado corretamente durante a restauração e compactado corretamente pelos intermediários.
Espero que tudo isso ajude. Definitivamente, fale com o ticket e ajude-nos a moldar a API para ser mais intuitiva!
fonte