Salvando Eventos de Alta Frequência em um Banco de Dados com Limite de Conexão

13

Temos uma situação em que tenho que lidar com um influxo maciço de eventos chegando ao nosso servidor, a cerca de 1000 eventos por segundo, em média (o pico pode ser ~ 2000).

O problema

Nosso sistema está hospedado no Heroku e usa um Heroku Postgres DB relativamente caro , que permite um máximo de 500 conexões de DB. Usamos o pool de conexões para conectar-se do servidor ao banco de dados.

Os eventos ocorrem mais rapidamente do que o pool de conexões do banco de dados pode suportar

O problema que temos é que os eventos são mais rápidos do que o pool de conexões pode suportar. Quando uma conexão termina a viagem de ida e volta da rede do servidor para o banco de dados, para que possa ser liberada de volta ao pool, mais neventos adicionais ocorrem.

Eventualmente, os eventos se acumulam, esperando para serem salvos e, como não há conexões disponíveis no pool, eles atingem o tempo limite e todo o sistema fica inoperante.

Resolvemos a emergência emitindo os eventos ofensivos de alta frequência em um ritmo mais lento dos clientes, mas ainda queremos saber como lidar com esses cenários no caso de precisarmos lidar com esses eventos de alta frequência.

Restrições

Outros clientes podem querer ler eventos simultaneamente

Outros clientes solicitam continuamente a leitura de todos os eventos com uma chave específica, mesmo que ainda não estejam salvos no banco de dados.

Um cliente pode consultar GET api/v1/events?clientId=1e obter todos os eventos enviados pelo cliente 1, mesmo que esses eventos ainda não tenham sido salvos no banco de dados.

Existem exemplos de "sala de aula" sobre como lidar com isso?

Soluções possíveis

Enfileire os eventos em nosso servidor

Podemos enfileirar os eventos no servidor (com a fila tendo uma simultaneidade máxima de 400 para que o pool de conexões não se esgote).

Essa é uma má ideia porque:

  • Ele consumirá a memória disponível do servidor. Os eventos enfileirados empilhados consumirão grandes quantidades de RAM.
  • Nossos servidores são reiniciados uma vez a cada 24 horas . Este é um limite rígido imposto pelo Heroku. O servidor pode reiniciar enquanto os eventos estão na fila, causando a perda dos eventos na fila.
  • Introduz o estado no servidor, prejudicando a escalabilidade. Se tivermos uma configuração para vários servidores e um cliente quiser ler todos os eventos enfileirados + salvos, não saberemos em qual servidor os eventos enfileirados estão.

Use uma fila de mensagens separada

Suponho que poderíamos usar uma fila de mensagens (como o RabbitMQ ?), Onde bombeamos as mensagens e, por outro lado, existe outro servidor que trata apenas de salvar os eventos no banco de dados.

Não tenho certeza se as filas de mensagens permitem a consulta de eventos em fila de espera (que ainda não foram salvos). Se outro cliente quiser ler as mensagens de outro cliente, posso obter as mensagens salvas do banco de dados e as mensagens pendentes da fila e concatená-los juntos para que eu possa enviá-los de volta ao cliente de solicitação de leitura.

Use vários bancos de dados, cada um salvando uma parte das mensagens com um servidor coordenador de banco de dados central para gerenciá-las

Outra solução que temos é usar vários bancos de dados, com um "coordenador de banco de dados / balanceador de carga" central. Ao receber um evento, esse coordenador escolheria um dos bancos de dados para escrever a mensagem. Isso deve permitir o uso de vários bancos de dados Heroku, aumentando o limite de conexão para 500 x número de bancos de dados.

Em uma consulta de leitura, esse coordenador pode emitir SELECTconsultas para cada banco de dados, mesclar todos os resultados e enviá-los de volta ao cliente que solicitou a leitura.

Essa é uma má ideia porque:

  • Essa ideia parece ... ahem ... excesso de engenharia? Seria um pesadelo para gerenciar também (backups etc.). É complicado criar e manter e, a menos que seja absolutamente necessário, soa como uma violação do KISS .
  • Sacrifica a consistência . Fazer transações em vários bancos de dados não é possível se seguirmos essa ideia.
Nik Kyriakides
fonte
3
Onde está o seu gargalo? Você está mencionando seu conjunto de conexões, mas isso influencia apenas o paralelismo, não a velocidade por inserção. Se você tiver 500 conexões e, por exemplo, 2000QPS, isso funcionará bem se cada consulta for concluída dentro de 250ms, o que é muito demorado. Por que isso está acima de 15ms? Observe também que, ao usar um PaaS, você oferece oportunidades significativas de otimização, como dimensionar o hardware do banco de dados ou usar réplicas de leitura para reduzir a carga no banco de dados primário. Heroku não vale a pena, a menos que a implantação seja seu maior problema.
amon
@ amon O gargalo é realmente o pool de conexão. Eu executei ANALYZEas próprias consultas e elas não são um problema. Também construí um protótipo para testar a hipótese do conjunto de conexões e verifiquei que esse é realmente o problema. O banco de dados e o próprio servidor vivem em máquinas diferentes, daí a latência. Além disso, não queremos desistir do Heroku, a menos que seja absolutamente necessário, não estar preocupado com implantações é uma grande vantagem para nós.
Nik Kyriakides 22/09
1
Dito isto, entendo que existem micro otimizações que eu poderia fazer que me ajudarão a resolver o problema atual . Gostaria de saber se existe uma solução arquitetural escalável para o meu problema.
Nik Kyriakides
3
Como exatamente você verificou se o pool de conexões é o problema? @amon está correto em seus cálculos. Tente emitir select nullem 500 conexões. Aposto que você encontrará que o pool de conexões não é o problema lá.
usr
1
Se selecionar nulo for problemático, provavelmente você está certo. Embora fosse interessante onde todo esse tempo é gasto. Nenhuma rede é tão lenta.
usr

Respostas:

9

Fluxo de entrada

Não está claro se seus 1000 eventos / segundo representam picos ou se é uma carga contínua:

  • se for um pico, você pode usar uma fila de mensagens como buffer para espalhar a carga no servidor de banco de dados por um período maior;
  • se for carga constante, a fila de mensagens sozinha não é suficiente, porque o servidor do banco de dados nunca poderá recuperar o atraso. Então você precisaria pensar em um banco de dados distribuído.

Solução proposta

Intuitivamente, nos dois casos, eu optaria por um fluxo de eventos baseado em Kafka :

  • Todos os eventos são publicados sistematicamente em um tópico kafka
  • Um consumidor assinaria os eventos e os armazenaria no banco de dados.
  • Um processador de consultas manipulará as solicitações dos clientes e consultará o banco de dados.

Isso é altamente escalável em todos os níveis:

  • Se o servidor de banco de dados for o gargalo, basta adicionar vários consumidores. Cada um pode se inscrever no tópico e gravar em um servidor de banco de dados diferente. No entanto, se a distribuição ocorrer aleatoriamente entre os servidores de banco de dados, o processador de consultas não poderá prever o servidor de banco de dados a ser utilizado e precisará consultar vários servidores de banco de dados. Isso pode levar a um novo gargalo no lado da consulta.
  • Portanto, o esquema de distribuição do banco de dados pode ser antecipado organizando o fluxo de eventos em vários tópicos (por exemplo, usando grupos de chaves ou propriedades, para particionar o banco de dados de acordo com uma lógica previsível).
  • Se um servidor de mensagens não for suficiente para lidar com uma crescente inundação de eventos de entrada, você poderá adicionar partições kafka para distribuir tópicos kafka em vários servidores físicos.

Oferecendo eventos ainda não gravados no banco de dados para clientes

Você deseja que seus clientes possam acessar também as informações ainda no canal e ainda não gravadas no banco de dados. Isso é um pouco mais delicado.

Opção 1: usando um cache para complementar consultas de banco de dados

Não analisei em profundidade, mas a primeira ideia que me veio à mente seria tornar o (s) processador (es) de consulta um (s) consumidor (es) dos tópicos kafka, mas em um grupo diferente de consumidores kafka . O processador de solicitação receberá todas as mensagens que o gravador de banco de dados receberá, mas de forma independente. Ele poderia mantê-los em um cache local. As consultas seriam executadas no cache do banco de dados + (+ eliminação de duplicatas).

O design ficaria assim:

insira a descrição da imagem aqui

A escalabilidade dessa camada de consulta pode ser alcançada adicionando mais processadores de consulta (cada um em seu próprio grupo de consumidores).

Opção 2: projetar uma API dupla

Uma abordagem melhor do IMHO seria oferecer uma API dupla (use o mecanismo do grupo de consumidores separado):

  • uma API de consulta para acessar eventos no banco de dados e / ou fazer análises
  • uma API de streaming que apenas encaminha mensagens diretamente do tópico

A vantagem é que você deixa o cliente decidir o que é interessante. Isso pode evitar que você mescle sistematicamente os dados do banco de dados com os dados recém-descontados, quando o cliente estiver interessado apenas em novos eventos recebidos. Se a mesclagem delicada entre eventos novos e arquivados for realmente necessária, o cliente precisará organizá-lo.

Variantes

Propus o kafka porque foi projetado para volumes muito altos com mensagens persistentes, para que você possa reiniciar os servidores, se necessário.

Você pode construir uma arquitetura semelhante com o RabbitMQ. No entanto, se você precisar de filas persistentes, isso poderá diminuir o desempenho . Além disso, até onde eu sei, a única maneira de obter o consumo paralelo das mesmas mensagens por vários leitores (por exemplo, gravador + cache) com o RabbitMQ é clonar as filas . Portanto, uma maior escalabilidade pode ter um preço mais alto.

Christophe
fonte
Estelar; Como assim a distributed database (for example using a specialization of the server by group of keys)? Também porque Kafka em vez de RabbitMQ? Existe uma razão específica para escolher uma sobre a outra?
Nik Kyriakides 23/09
@NicholasKyriakides Thanks! 1) Eu estava simplesmente pensando em vários servidores de banco de dados independentes, mas com um esquema de particionamento claro (chave, geografia etc.) que poderia ser usado para despachar efetivamente os comandos. 2) Intuitivamente , talvez porque o Kafka seja projetado para uma taxa de transferência muito alta, com mensagens persistentes que precisam reiniciar seus servidores?). Eu não tenho certeza de que RabbitMQ é tão flexível para os cenários distribuídos, e as filas persistentes diminuir o desempenho
Christophe
Para 1) Portanto, isso é bastante semelhante à minha Use multiple databasesideia, mas você está dizendo que eu não deveria distribuir aleatoriamente (ou round-robin) as mensagens para cada um dos bancos de dados. Certo?
Nik Kyriakides 23/09
Sim. Meu primeiro pensamento seria não optar pela distribuição aleatória, porque isso poderia aumentar a carga de processamento para as consultas (ou seja, consulta de vários bancos de dados múltiplos na maioria das vezes). Você também pode considerar os mecanismos de banco de dados distribuídos (por exemplo, Ignite?). Mas fazer qualquer escolha informada exigiria uma boa compreensão dos padrões de uso do banco de dados (o que mais existe no banco de dados, com que frequência é consultado, que tipo de consulta, existem restrições transacionais além de eventos individuais, etc ...).
Christophe
3
Só quero dizer que, embora o kafka possa proporcionar uma taxa de transferência muito alta, provavelmente está além das necessidades da maioria das pessoas. Eu descobri que lidar com o kafka e sua API foi um grande erro para nós. RabbitMQ não é desleixo e tem interface que você esperaria de um MQ
imel96
11

Meu palpite é que você precisa explorar mais cuidadosamente uma abordagem que você rejeitou

  • Enfileire os eventos em nosso servidor

Minha sugestão seria começar a ler os vários artigos publicados sobre a arquitetura LMAX . Eles conseguiram fazer o trabalho em lotes de alto volume para seus casos de uso, e pode ser possível fazer com que suas compensações se pareçam mais com as deles.

Além disso, você pode querer ver se consegue tirar as leituras do caminho - idealmente, você gostaria de poder escalá-las independentemente das gravações. Isso pode significar examinar o CQRS (segregação de responsabilidade da consulta de comando).

O servidor pode reiniciar enquanto os eventos estão na fila, causando a perda dos eventos na fila.

Em um sistema distribuído, acho que você pode ter certeza de que as mensagens serão perdidas. Você pode mitigar parte do impacto disso, ponderando suas barreiras de sequência (por exemplo - garantindo que a gravação no armazenamento durável ocorra antes que o evento seja compartilhado fora do sistema).

  • Use vários bancos de dados, cada um salvando uma parte das mensagens com um servidor coordenador de banco de dados central para gerenciá-las

Talvez - seria mais provável que você analisasse os limites de sua empresa para ver se há lugares naturais para fragmentar os dados.

Há casos em que a perda de dados é uma troca aceitável?

Bem, suponho que poderia haver, mas não era para onde eu estava indo. O ponto é que o design deveria ter incorporado a robustez necessária para progredir diante da perda de mensagens.

O que isso geralmente parece é um modelo baseado em pull com notificações. O provedor grava as mensagens em uma loja durável solicitada. O consumidor puxa as mensagens da loja, acompanhando sua própria marca d'água. As notificações por push são usadas como um dispositivo de redução de latência - mas se a notificação for perdida, a mensagem ainda será buscada (eventualmente) porque o consumidor está recebendo um agendamento regular (a diferença é que, se a notificação for recebida, a solicitação ocorrerá mais cedo) )

Consulte Mensagens confiáveis ​​sem transações distribuídas, de Udi Dahan (já referenciado por Andy ) e Dados poliglotas de Greg Young.

VoiceOfUnreason
fonte
In a distributed system, I think you can be pretty confident that messages are going to get lost. Verdade? Há casos em que a perda de dados é uma troca aceitável? Fiquei com a impressão de que perder dados = falha.
Nik Kyriakides
1
@NicholasKyriakides, geralmente não é aceitável, portanto, a OP sugeriu a possibilidade de gravar em uma loja durável antes de emitir o evento. Confira este artigo e este vídeo de Udi Dahan, onde ele aborda o problema com mais detalhes.
Andy
6

Se bem entendi, o fluxo atual é:

  1. Receber e evento (presumo que através de HTTP?)
  2. Solicite uma conexão do pool.
  3. Inserir o evento no banco de dados
  4. Libere a conexão com o pool.

Nesse caso, acho que a primeira alteração no design seria parar de ter seu código de manipulação uniforme retornando conexões ao pool em todos os eventos. Em vez disso, crie um pool de processos / threads de inserção 1 a 1 com o número de conexões com o banco de dados. Cada um deles manterá uma conexão de banco de dados dedicada.

Usando algum tipo de fila simultânea, você faz com que esses encadeamentos puxem as mensagens da fila simultânea e as insiram. Em teoria, eles nunca precisam retornar a conexão ao pool ou solicitar uma nova, mas você pode precisar criar um tratamento caso a conexão fique ruim. Pode ser mais fácil eliminar o encadeamento / processo e iniciar um novo.

Isso deve eliminar efetivamente a sobrecarga do conjunto de conexões. Obviamente, você precisará executar pelo menos 1000 eventos / conexões por segundo em cada conexão. Convém tentar diferentes números de conexões, pois ter 500 conexões trabalhando nas mesmas tabelas pode criar uma contenção no banco de dados, mas essa é uma questão totalmente diferente. Outra coisa a considerar é o uso de inserções em lote, ou seja, cada thread extrai várias mensagens e as envia de uma só vez. Além disso, evite ter várias conexões tentando atualizar as mesmas linhas.

JimmyJames
fonte
5

Premissas

Vou assumir que a carga que você descreve é ​​constante, pois esse é o cenário mais difícil de resolver.

Também vou assumir que você tem alguma maneira de executar cargas de trabalho de longa duração acionadas fora do processo de seu aplicativo da web.

Solução

Supondo que você tenha identificado corretamente seu gargalo - latência entre seu processo e o banco de dados do Postgres - esse é o principal problema a ser resolvido. A solução precisa levar em conta sua restrição de consistência com outros clientes que desejam ler os eventos assim que possível após o recebimento.

Para resolver o problema de latência, você precisa trabalhar de maneira a minimizar a quantidade de latência incorrida por evento a ser armazenada. Essa é a principal coisa que você precisa alcançar se não estiver disposto ou não puder alterar o hardware . Como você está nos serviços PaaS e não tem controle sobre hardware ou rede, a única maneira de reduzir a latência por evento será com algum tipo de gravação em lote de eventos.

Você precisará armazenar localmente uma fila de eventos que seja liberada e gravada periodicamente no seu banco de dados, assim que atingir um determinado tamanho ou depois de um período de tempo decorrido. Um processo precisará monitorar essa fila para acionar a liberação para a loja. Deve haver muitos exemplos de como gerenciar uma fila simultânea que é liberada periodicamente no seu idioma de escolha - Aqui está um exemplo em C # , do coletor periódico de lotes da popular biblioteca de registro Serilog.

Essa resposta do SO descreve a maneira mais rápida de liberar dados no Postgres - embora isso exija que o lote armazene a fila no disco e é provável que haja um problema a ser resolvido quando o disco desaparecer após a reinicialização no Heroku.

Limitação

Outra resposta já mencionou o CQRS , e essa é a abordagem correta para resolver a restrição. Você deseja hidratar os modelos de leitura à medida que cada evento é processado - um padrão Mediador pode ajudar a encapsular um evento e distribuí-lo para vários manipuladores em processo. Portanto, um manipulador pode adicionar o evento ao seu modelo de leitura que está na memória que os clientes podem consultar e outro manipulador pode ser responsável por enfileirar o evento para sua eventual gravação em lote.

O principal benefício do CQRS é que você desacopla seus modelos conceituais de leitura e gravação - que é uma maneira elegante de dizer que você escreve em um modelo e que lê de outro modelo totalmente diferente. Para obter benefícios de escalabilidade do CQRS, geralmente você deseja garantir que cada modelo seja armazenado separadamente, de maneira ideal para seus padrões de uso. Nesse caso, podemos usar um modelo de leitura agregado - por exemplo, um cache Redis, ou simplesmente na memória - para garantir que nossas leituras sejam rápidas e consistentes, enquanto ainda usamos nosso banco de dados transacional para gravar nossos dados.

Andrew Best
fonte
3

Os eventos ocorrem mais rapidamente do que o pool de conexões do banco de dados pode suportar

Este é um problema se cada processo precisar de uma conexão com o banco de dados. O sistema deve ser projetado para que você tenha um conjunto de trabalhadores em que cada trabalhador precise apenas de uma conexão com o banco de dados e cada trabalhador possa processar vários eventos.

A fila de mensagens pode ser usada com esse design. Você precisa de produtores de mensagens que enviam eventos para a fila de mensagens e os trabalhadores (consumidores) processam as mensagens da fila.

Outros clientes podem querer ler eventos simultaneamente

Essa restrição só é possível se os eventos armazenados no banco de dados sem nenhum processamento (eventos brutos). Se os eventos estiverem sendo processados ​​antes de serem armazenados no banco de dados, a única maneira de obtê-los é pelo banco de dados.

Se os clientes querem apenas consultar eventos brutos, sugiro usar o mecanismo de pesquisa como o Elastic Search. Você ainda receberá a API de consulta / pesquisa gratuitamente.

Como parece importante a consulta de eventos antes que eles sejam salvos no banco de dados, uma solução simples como o Elastic Search deve funcionar. Basicamente, você apenas armazena todos os eventos e não duplica os mesmos dados, copiando-os para o banco de dados.

O dimensionamento da pesquisa elástica é fácil, mas mesmo com a configuração básica, é de alto desempenho.

Quando você precisar processar, seu processo poderá obter os eventos do ES, processá-los e armazená-los no banco de dados. Não sei qual é o nível de desempenho que você precisa desse processamento, mas seria completamente separado da consulta aos eventos do ES. Você não deve ter problema de conexão, pois pode ter um número fixo de trabalhadores e cada um com uma conexão com o banco de dados.

imel96
fonte
2

Eventos de 1k ou 2k (5KB) por segundo não é muito para um banco de dados se ele tiver um esquema e mecanismo de armazenamento apropriados. Conforme sugerido por @eddyce, um mestre com um ou mais escravos pode separar as consultas de leitura da confirmação de gravações. O uso de menos conexões com o banco de dados proporcionará uma melhor taxa de transferência geral.

Outros clientes podem querer ler eventos simultaneamente

Para esses pedidos, eles precisariam também ler do banco de dados mestre, pois haveria um atraso na replicação para os escravos de leitura.

Eu usei o MySQL (Percona) com o mecanismo TokuDB para gravações de volume muito alto. Há também o mecanismo MyRocks baseado no LSMtrees, que é bom para cargas de gravação. Para ambos os mecanismos e, provavelmente, também para o PostgreSQL, existem configurações para o isolamento de transações e o comportamento de sincronização de confirmação, que podem aumentar drasticamente a capacidade de gravação. No passado, aceitávamos até 1s de dados perdidos que eram relatados ao cliente db como confirmados. Em outros casos, havia SSDs com bateria para evitar perdas.

Afirma-se que o Amazon RDS Aurora no sabor MySQL possui uma taxa de transferência de gravação 6x mais alta com replicação de custo zero (semelhante aos escravos que compartilham um sistema de arquivos com o mestre). O sabor do Aurora PostgreSQL também possui um mecanismo de replicação avançado diferente.

karmakaze
fonte
TBH qualquer banco de dados bem administrado em hardware suficiente deve ser capaz de lidar com essa carga. O problema do OP não parece ser o desempenho do banco de dados, mas a latência da conexão; Meu palpite é que a Heroku, como um provedor de PaaS, está vendendo a eles uma instância do Postgres em uma região diferente da AWS.
amon
1

Eu soltaria o heroku todos juntos, ou seja, eu abandonaria uma abordagem centralizada: várias gravações que atingem o pico da conexão máxima do pool são uma das principais razões pelas quais os clusters db foram inventados, principalmente porque você não carrega a gravação db (s) com solicitações de leitura que podem ser executadas por outros db's no cluster, além disso, eu tentaria com uma topologia mestre-escravo - como alguém já mencionou, ter suas próprias instalações de db permitiria ajustar o conjunto sistema para garantir que o tempo de propagação da consulta seja tratado corretamente.

Boa sorte

Edoardo
fonte