O que é um bom algoritmo de limitação de taxa?

155

Eu poderia usar algum pseudo-código, ou melhor, Python. Estou tentando implementar uma fila de limitação de taxa para um bot de Python IRC, e isso funciona parcialmente, mas se alguém acionar menos mensagens que o limite (por exemplo, o limite de taxa é de 5 mensagens por 8 segundos e a pessoa aciona apenas 4), e o próximo gatilho ocorre após os 8 segundos (por exemplo, 16 segundos depois), o bot envia a mensagem, mas a fila fica cheia e o bot aguarda 8 segundos, mesmo que não seja necessário desde que o período de 8 segundos expirou.

mini-homem
fonte

Respostas:

231

Aqui está o algoritmo mais simples , se você deseja simplesmente soltar as mensagens quando elas chegarem muito rapidamente (em vez de colocá-las na fila, o que faz sentido porque a fila pode ficar arbitrariamente grande):

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

Não há estruturas de dados, temporizadores etc. nesta solução e ela funciona de maneira limpa :) Para ver isso, a 'permissão' cresce na velocidade de 5/8 unidades por segundo, no máximo, ou seja, no máximo cinco unidades por oito segundos. Cada mensagem encaminhada deduz uma unidade, portanto você não pode enviar mais de cinco mensagens a cada oito segundos.

Observe que ratedeve ser um número inteiro, ou seja, sem parte decimal diferente de zero, ou o algoritmo não funcionará corretamente (a taxa real não será rate/per). Por exemplo rate=0.5; per=1.0;, não funciona porque allowancenunca aumentará para 1,0. Mas rate=1.0; per=2.0;funciona bem.

Antti Huima
fonte
4
Também vale ressaltar que a dimensão e a escala de 'time_passed' devem ser iguais a 'por', por exemplo, segundos.
21413 skaffman
2
Oi skaffman, obrigado pelos elogios --- Eu joguei fora da minha manga, mas com 99,9% de probabilidade que alguém tenha anteriormente surgiu com uma solução semelhante :)
Antti Huima
52
Esse é um algoritmo padrão - é um bucket de token, sem fila. O balde é allowance. O tamanho do balde é rate. A allowance += …linha é uma otimização da adição de um token a cada taxa ÷ por segundo.
derobert
5
@zwirbeltier O que você escreveu acima não é verdade. 'Allowance' é sempre limitado por 'rate' (veja a linha "// throttle"); portanto, ele permite apenas uma explosão de exatamente mensagens de 'rate' a qualquer momento específico, ou seja, 5.
Antti Huima
8
Isso é bom, mas pode exceder a taxa. Digamos que no momento 0 você encaminha 5 mensagens e, no momento N * (8/5) para N = 1, 2, ... você pode enviar outra mensagem, resultando em mais de 5 mensagens em um período de 8 segundos
mindvirus
48

Use este decorador @RateLimited (ratepersec) antes da sua função que enfileira.

Basicamente, isso verifica se 1 / taxa de segundos se passaram desde a última vez e, se não, aguarda o restante do tempo, caso contrário, não espera. Isso efetivamente limita a taxa / s. O decorador pode ser aplicado a qualquer função que você queira com taxa limitada.

No seu caso, se você quiser no máximo 5 mensagens por 8 segundos, use @RateLimited (0.625) antes da função sendToQueue.

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)
Carlos A. Ibarra
fonte
Gosto da ideia de usar um decorador para esse fim. Por que lastTimeCalled é uma lista? Além disso, eu duvido que isso vai funcionar quando vários segmentos estão chamando a mesma função RateLimited ...
Stephan202
8
É uma lista porque tipos simples como float são constantes quando capturados por um fechamento. Ao torná-lo uma lista, a lista é constante, mas seu conteúdo não. Sim, não é seguro para threads, mas pode ser facilmente corrigido com bloqueios.
Carlos A. Ibarra
time.clock()não tem resolução suficiente no meu sistema, então eu adaptei o código e mudou para usotime.time()
mtrbean
3
Para limitar a taxa, você definitivamente não deseja usar time.clock(), o que mede o tempo decorrido da CPU. O tempo da CPU pode correr muito mais rápido ou muito mais lentamente que o tempo "real". Você deseja usar time.time(), em vez disso, que mede o tempo da parede (tempo "real").
John Wiseman
1
BTW para sistemas de produção reais: implementar uma limitação de taxa com uma chamada sleep () pode não ser uma boa ideia, pois ele bloqueia o encadeamento e, portanto, impede que outro cliente o utilize.
Maresh
28

Um Token Bucket é bastante simples de implementar.

Comece com um balde com 5 fichas.

A cada 5/8 segundos: se o balde tiver menos de 5 tokens, adicione um.

Sempre que você desejar enviar uma mensagem: Se o bucket tiver ≥1 token, retire um token e envie a mensagem. Caso contrário, aguarde / solte a mensagem / o que for.

(obviamente, no código real, você usaria um contador inteiro em vez de tokens reais e poderá otimizar a cada etapa de 5 / 8s armazenando registros de data e hora)


Lendo a pergunta novamente, se o limite da taxa for totalmente redefinido a cada 8 segundos, eis uma modificação:

Comece com um registro de data e hora, last_sendhá muito tempo (por exemplo, na época). Além disso, comece com o mesmo balde de 5 token.

Atinja a regra a cada 5/8 segundos.

Cada vez que você envia uma mensagem: Primeiro, verifique se há last_send≥ 8 segundos. Nesse caso, preencha o balde (configure-o para 5 fichas). Segundo, se houver tokens no bucket, envie a mensagem (caso contrário, solte / aguarde / etc.). Terceiro, defina last_sendagora.

Isso deve funcionar para esse cenário.


Na verdade, eu escrevi um bot de IRC usando uma estratégia como essa (a primeira abordagem). Está em Perl, não em Python, mas aqui está um código para ilustrar:

A primeira parte aqui trata da adição de tokens ao balde. Você pode ver a otimização da adição de tokens com base no tempo (da segunda à última linha) e, em seguida, a última linha prende o conteúdo do depósito ao máximo (MESSAGE_BURST)

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$ conn é uma estrutura de dados que é distribuída. Isso está dentro de um método que é executado rotineiramente (calcula quando da próxima vez que tiver algo a fazer e dorme por tanto tempo ou até obter tráfego de rede). A próxima parte do método trata do envio. É um pouco complicado, porque as mensagens têm prioridades associadas a elas.

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

Essa é a primeira fila, que é executada, não importa o quê. Mesmo que nossa conexão seja morta por inundações. Usado para coisas extremamente importantes, como responder ao PING do servidor. Em seguida, o restante das filas:

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

Por fim, o status do bucket é salvo de volta na estrutura de dados $ conn (na verdade, um pouco mais tarde no método; ele primeiro calcula quanto tempo terá mais trabalho)

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

Como você pode ver, o código real de manipulação de bucket é muito pequeno - cerca de quatro linhas. O restante do código é manipulação de fila prioritária. O bot tem filas prioritárias, de modo que, por exemplo, alguém conversando com ele não pode impedi-lo de executar suas importantes tarefas de chute / banimento.

derobert
fonte
Estou faltando alguma coisa ... parece que isso seria limitá-lo a uma mensagem a cada 8 segundos depois de passar a primeira 5
chills42
@ chills42: Sim, li a pergunta errada ... veja a segunda metade da resposta.
Derobert 20/03/09
@ chills: se last_send for <8 segundos, você não adiciona tokens ao bucket. Se o seu bucket contiver tokens, você poderá enviar a mensagem; caso contrário você não pode (você já enviado 5 mensagens nos últimos 8 segundos)
derobert
3
Eu apreciaria se as pessoas que votaram contra isso explicassem o porquê ... Eu gostaria de corrigir os problemas que você vê, mas isso é difícil de fazer sem feedback!
Derobert 20/03/09
10

para bloquear o processamento até que a mensagem possa ser enviada, enfileirando outras mensagens, a bela solução da antti também pode ser modificada assim:

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

apenas espera até que haja permissão suficiente para enviar a mensagem. para não começar com o dobro da taxa, a permissão também pode ser inicializada com 0.

san
fonte
5
Quando você dorme (1-allowance) * (per/rate), você precisa adicionar a mesma quantidade a last_check.
Alp
2

Mantenha o tempo em que as últimas cinco linhas foram enviadas. Mantenha as mensagens na fila até o momento em que a quinta mensagem mais recente (se existir) for pelo menos 8 segundos no passado (com last_five como uma matriz de vezes):

now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()
Pesto
fonte
Desde que você a revisou, eu não sou.
Pesto
Você está armazenando cinco carimbos de hora e os deslocando repetidamente na memória (ou executando operações de lista vinculada). Estou armazenando um contador inteiro e um carimbo de data e hora. E apenas fazendo aritmética e atribuir.
Derobert 20/03/09
2
Exceto que o meu funcionará melhor se tentar enviar 5 linhas, mas apenas mais 3 forem permitidas no período. O seu permitirá o envio dos três primeiros e forçará uma espera de 8 segundos antes de enviar 4 e 5. O meu permitirá que o 4 e o 5 sejam enviados 8 segundos após a quarta e a quinta linhas mais recentes.
Pesto
1
Mas, sobre o assunto, o desempenho pode ser aprimorado através do uso de uma lista vinculada circular de comprimento 5, apontando para o quinto envio mais recente, substituindo-o em novo envio e movendo o ponteiro para frente.
Pesto
para um bot irc com velocidade limitadora de taxa não é um problema. Eu prefiro a solução de lista, pois é mais legível. a resposta que foi dada é confusa por causa da revisão, mas também não há nada errado.
21139 jheriko
2

Uma solução é anexar um carimbo de data / hora a cada item da fila e descartá-lo após 8 segundos. Você pode executar essa verificação sempre que a fila for adicionada.

Isso funciona apenas se você limitar o tamanho da fila a 5 e descartar quaisquer adições enquanto a fila estiver cheia.

jheriko
fonte
1

Se alguém ainda estiver interessado, eu uso essa classe de chamada simples em conjunto com um armazenamento de valor de chave LRU cronometrado para limitar a taxa de solicitação por IP. Usa um deque, mas pode ser reescrito para ser usado com uma lista.

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")
sanyi
fonte
1

Apenas uma implementação em python de um código a partir da resposta aceita.

import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check;
        scope.last_check = current;
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if (scope.allowance > rate):
          scope.allowance = rate
        if (scope.allowance < 1):
          pass
        else:
          fn()
          scope.allowance = scope.allowance - 1
    return throttler
Hodza
fonte
Foi-me sugerido que sugiro que você adicione um exemplo de uso do seu código .
Luc
0

Que tal agora:

long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }

    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }

    return false;
}

fonte
0

Eu precisava de uma variação no Scala. Aqui está:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A  B) extends (A  B) {

  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance  = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }

}

Aqui está como ele pode ser usado:

val f = Limiter((5d, 8d), { 
  _: Unit  
    println(System.currentTimeMillis) 
})
while(true){f(())}
Landon Kuhn
fonte