Aqui está o algoritmo mais simples , se você deseja simplesmente soltar as mensagens quando elas chegarem muito rapidamente (em vez de colocá-las na fila, o que faz sentido porque a fila pode ficar arbitrariamente grande):
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
discard_message();
else:
forward_message();
allowance -= 1.0;
Não há estruturas de dados, temporizadores etc. nesta solução e ela funciona de maneira limpa :) Para ver isso, a 'permissão' cresce na velocidade de 5/8 unidades por segundo, no máximo, ou seja, no máximo cinco unidades por oito segundos. Cada mensagem encaminhada deduz uma unidade, portanto você não pode enviar mais de cinco mensagens a cada oito segundos.
Observe que rate
deve ser um número inteiro, ou seja, sem parte decimal diferente de zero, ou o algoritmo não funcionará corretamente (a taxa real não será rate/per
). Por exemplo rate=0.5; per=1.0;
, não funciona porque allowance
nunca aumentará para 1,0. Mas rate=1.0; per=2.0;
funciona bem.
allowance
. O tamanho do balde érate
. Aallowance += …
linha é uma otimização da adição de um token a cada taxa ÷ por segundo.Use este decorador @RateLimited (ratepersec) antes da sua função que enfileira.
Basicamente, isso verifica se 1 / taxa de segundos se passaram desde a última vez e, se não, aguarda o restante do tempo, caso contrário, não espera. Isso efetivamente limita a taxa / s. O decorador pode ser aplicado a qualquer função que você queira com taxa limitada.
No seu caso, se você quiser no máximo 5 mensagens por 8 segundos, use @RateLimited (0.625) antes da função sendToQueue.
fonte
time.clock()
não tem resolução suficiente no meu sistema, então eu adaptei o código e mudou para usotime.time()
time.clock()
, o que mede o tempo decorrido da CPU. O tempo da CPU pode correr muito mais rápido ou muito mais lentamente que o tempo "real". Você deseja usartime.time()
, em vez disso, que mede o tempo da parede (tempo "real").Um Token Bucket é bastante simples de implementar.
Comece com um balde com 5 fichas.
A cada 5/8 segundos: se o balde tiver menos de 5 tokens, adicione um.
Sempre que você desejar enviar uma mensagem: Se o bucket tiver ≥1 token, retire um token e envie a mensagem. Caso contrário, aguarde / solte a mensagem / o que for.
(obviamente, no código real, você usaria um contador inteiro em vez de tokens reais e poderá otimizar a cada etapa de 5 / 8s armazenando registros de data e hora)
Lendo a pergunta novamente, se o limite da taxa for totalmente redefinido a cada 8 segundos, eis uma modificação:
Comece com um registro de data e hora,
last_send
há muito tempo (por exemplo, na época). Além disso, comece com o mesmo balde de 5 token.Atinja a regra a cada 5/8 segundos.
Cada vez que você envia uma mensagem: Primeiro, verifique se há
last_send
≥ 8 segundos. Nesse caso, preencha o balde (configure-o para 5 fichas). Segundo, se houver tokens no bucket, envie a mensagem (caso contrário, solte / aguarde / etc.). Terceiro, definalast_send
agora.Isso deve funcionar para esse cenário.
Na verdade, eu escrevi um bot de IRC usando uma estratégia como essa (a primeira abordagem). Está em Perl, não em Python, mas aqui está um código para ilustrar:
A primeira parte aqui trata da adição de tokens ao balde. Você pode ver a otimização da adição de tokens com base no tempo (da segunda à última linha) e, em seguida, a última linha prende o conteúdo do depósito ao máximo (MESSAGE_BURST)
$ conn é uma estrutura de dados que é distribuída. Isso está dentro de um método que é executado rotineiramente (calcula quando da próxima vez que tiver algo a fazer e dorme por tanto tempo ou até obter tráfego de rede). A próxima parte do método trata do envio. É um pouco complicado, porque as mensagens têm prioridades associadas a elas.
Essa é a primeira fila, que é executada, não importa o quê. Mesmo que nossa conexão seja morta por inundações. Usado para coisas extremamente importantes, como responder ao PING do servidor. Em seguida, o restante das filas:
Por fim, o status do bucket é salvo de volta na estrutura de dados $ conn (na verdade, um pouco mais tarde no método; ele primeiro calcula quanto tempo terá mais trabalho)
Como você pode ver, o código real de manipulação de bucket é muito pequeno - cerca de quatro linhas. O restante do código é manipulação de fila prioritária. O bot tem filas prioritárias, de modo que, por exemplo, alguém conversando com ele não pode impedi-lo de executar suas importantes tarefas de chute / banimento.
fonte
para bloquear o processamento até que a mensagem possa ser enviada, enfileirando outras mensagens, a bela solução da antti também pode ser modificada assim:
apenas espera até que haja permissão suficiente para enviar a mensagem. para não começar com o dobro da taxa, a permissão também pode ser inicializada com 0.
fonte
(1-allowance) * (per/rate)
, você precisa adicionar a mesma quantidade alast_check
.Mantenha o tempo em que as últimas cinco linhas foram enviadas. Mantenha as mensagens na fila até o momento em que a quinta mensagem mais recente (se existir) for pelo menos 8 segundos no passado (com last_five como uma matriz de vezes):
fonte
Uma solução é anexar um carimbo de data / hora a cada item da fila e descartá-lo após 8 segundos. Você pode executar essa verificação sempre que a fila for adicionada.
Isso funciona apenas se você limitar o tamanho da fila a 5 e descartar quaisquer adições enquanto a fila estiver cheia.
fonte
Se alguém ainda estiver interessado, eu uso essa classe de chamada simples em conjunto com um armazenamento de valor de chave LRU cronometrado para limitar a taxa de solicitação por IP. Usa um deque, mas pode ser reescrito para ser usado com uma lista.
fonte
Apenas uma implementação em python de um código a partir da resposta aceita.
fonte
Que tal agora:
fonte
Eu precisava de uma variação no Scala. Aqui está:
Aqui está como ele pode ser usado:
fonte