buffer circular eficiente?

109

Eu quero criar um buffer circular eficiente em python (com o objetivo de tirar médias dos valores inteiros no buffer).

Esta é uma maneira eficiente de usar uma lista para coletar valores?

def add_to_buffer( self, num ):
    self.mylist.pop( 0 )
    self.mylist.append( num )

O que seria mais eficiente (e por quê)?

Jedierikb
fonte
Esta não é uma maneira eficiente de implementar o buffer circular porque pop (0) é uma operação O (n) na lista. pop (0) remove o primeiro elemento da lista e todos os elementos devem ser deslocados para a esquerda. Em vez disso, use Collections.deque com o atributo maxlen. deque tem operação O (1) para anexar e pop.
Vlad Bezden

Respostas:

204

Eu usaria collections.dequecom um maxlenarg

>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
...     d.append(i)
... 
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)

Existe uma receita nos documentos para dequeisso é semelhante ao que você deseja. Minha afirmação de que é o mais eficiente repousa inteiramente no fato de que é implementado em C por uma equipe incrivelmente habilidosa que tem o hábito de produzir código de primeira linha.

Aaronasterling
fonte
7
1 Sim, são as baterias inclusas. As operações para o buffer circular são O (1) e, como você disse, a sobrecarga extra está em C, então ainda deve ser bastante rápida
John La Rooy
7
Eu não gosto desta solução porque a documentação não garante acesso aleatório O (1) quando maxlené definido. O (n) é compreensível quando o dequepode crescer até o infinito, mas se maxlenfor fornecido, a indexação de um elemento deve ser um tempo constante.
lvella
1
Meu palpite é que ele é implementado como uma lista vinculada e não como um array.
e-satis
1
Parece correto, se os tempos em minha resposta abaixo estiverem corretos.
djvg de
13

aparecer no topo de uma lista faz com que toda a lista seja copiada, por isso é ineficiente

Em vez disso, você deve usar uma lista / array de tamanho fixo e um índice que se move pelo buffer conforme você adiciona / remove itens

John La Rooy
fonte
4
Aceita. Não importa o quão elegante ou deselegante pareça ou qualquer linguagem que seja usada. Na realidade, quanto menos você incomodar o coletor de lixo (ou gerenciador de heap ou mecanismos de paginação / mapeamento ou o que quer que faça a mágica da memória), melhor.
@RocketSurgeon Não é mágico, é apenas um array cujo primeiro elemento é excluído. Portanto, para uma matriz de tamanho n, isso significa n-1 operações de cópia. Nenhum coletor de lixo ou dispositivo semelhante está envolvido aqui.
Cristão
3
Concordo. Fazer isso também é muito mais fácil do que algumas pessoas pensam. Basta usar um contador cada vez maior e usar o operador módulo (% arraylen) ao acessar o item.
Andre Blum
idem, você pode conferir meu post acima, foi assim que eu fiz
MoonCactus 05/12/2014
10

Com base na resposta do MoonCactus , aqui está uma circularlistaula. A diferença com sua versão é que aqui c[0]sempre dará o elemento mais antigo anexado, c[-1]o último elemento anexado, c[-2]o penúltimo ... Isso é mais natural para aplicativos.

c = circularlist(4)
c.append(1); print c, c[0], c[-1]    #[1]              1, 1
c.append(2); print c, c[0], c[-1]    #[1, 2]           1, 2
c.append(3); print c, c[0], c[-1]    #[1, 2, 3]        1, 3
c.append(8); print c, c[0], c[-1]    #[1, 2, 3, 8]     1, 8
c.append(10); print c, c[0], c[-1]   #[10, 2, 3, 8]    2, 10
c.append(11); print c, c[0], c[-1]   #[10, 11, 3, 8]   3, 11

Classe:

class circularlist(object):
    def __init__(self, size, data = []):
        """Initialization"""
        self.index = 0
        self.size = size
        self._data = list(data)[-size:]

    def append(self, value):
        """Append an element"""
        if len(self._data) == self.size:
            self._data[self.index] = value
        else:
            self._data.append(value)
        self.index = (self.index + 1) % self.size

    def __getitem__(self, key):
        """Get element by index, relative to the current index"""
        if len(self._data) == self.size:
            return(self._data[(key + self.index) % self.size])
        else:
            return(self._data[key])

    def __repr__(self):
        """Return string representation"""
        return self._data.__repr__() + ' (' + str(len(self._data))+' items)'

[Editado]:data Parâmetro opcional adicionado para permitir a inicialização de listas existentes, por exemplo:

circularlist(4, [1, 2, 3, 4, 5])      #  [2, 3, 4, 5] (4 items)
circularlist(4, set([1, 2, 3, 4, 5])) #  [2, 3, 4, 5] (4 items)
circularlist(4, (1, 2, 3, 4, 5))      #  [2, 3, 4, 5] (4 items)
Basj
fonte
Boa adição. As listas Python já permitem índices negativos, mas (-1), por exemplo, não retornaria o valor esperado quando o buffer circular estiver cheio, pois a "última" adição à lista termina dentro da lista.
MoonCactus
1
Funciona @MoonCactus, veja os 6 exemplos que dei no topo da resposta; nas últimas, você pode ver que c[-1]é sempre o elemento certo. __getitem__faz direito.
Basj,
ah sim, quero dizer, o meu falhou, não o seu, desculpe: DI deixará meu comentário mais claro! - ah não posso, o comentário é muito antigo.
MoonCactus
boa solução simples. Eu adicionei um argumento opcional para permitir a inicialização da lista de dados existentes, é mais pythonpathetic dessa forma.
Orwellophile
9

O deque do Python é lento. Você também pode usar numpy.roll. Como você gira os números em uma matriz numpy de forma (n,) ou (n, 1)?

Neste benchmark, deque é 448ms. Numpy.roll tem 29ms http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/

Orvar Korvar
fonte
1
Mas numpy.rollretorna uma cópia do array, certo?
djvg de
3
Essa resposta é muito enganosa - o deque do Python parece ser bastante rápido, mas converter de e para arrays entorpecidos o torna consideravelmente mais lento nos benchmarks aos quais você vincula.
xitrium
7

ok com o uso da classe deque, mas para os requisitos da questão (média) esta é a minha solução:

>>> from collections import deque
>>> class CircularBuffer(deque):
...     def __init__(self, size=0):
...             super(CircularBuffer, self).__init__(maxlen=size)
...     @property
...     def average(self):  # TODO: Make type check for integer or floats
...             return sum(self)/len(self)
...
>>>
>>> cb = CircularBuffer(size=10)
>>> for i in range(20):
...     cb.append(i)
...     print "@%s, Average: %s" % (cb, cb.average)
...
@deque([0], maxlen=10), Average: 0
@deque([0, 1], maxlen=10), Average: 0
@deque([0, 1, 2], maxlen=10), Average: 1
@deque([0, 1, 2, 3], maxlen=10), Average: 1
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14
SmartElectron
fonte
Recebo TypeError: 'numpy.float64' object is not callableao tentar chamar o averagemétodo
scls
Sim ... na verdade, acho que o deque usa matrizes numpy internamente (depois de remover @property, ele funciona bem)
scls
17
Garanto que o deque não usa matrizes numpy internamente. collectionsfaz parte da biblioteca padrão, numpynão. Dependências de bibliotecas de terceiros criariam uma biblioteca padrão terrível.
6

Embora já haja um grande número de ótimas respostas aqui, não consegui encontrar nenhuma comparação direta de tempos para as opções mencionadas. Portanto, encontre minha humilde tentativa de comparação abaixo.

Apenas para fins de teste, a classe pode alternar entre um listbuffer baseado em-, um collections.dequebuffer baseado em e um Numpy.rollbuffer baseado em.

Observe que o updatemétodo adiciona apenas um valor por vez, para mantê-lo simples.

import numpy
import timeit
import collections


class CircularBuffer(object):
    buffer_methods = ('list', 'deque', 'roll')

    def __init__(self, buffer_size, buffer_method):
        self.content = None
        self.size = buffer_size
        self.method = buffer_method

    def update(self, scalar):
        if self.method == self.buffer_methods[0]:
            # Use list
            try:
                self.content.append(scalar)
                self.content.pop(0)
            except AttributeError:
                self.content = [0.] * self.size
        elif self.method == self.buffer_methods[1]:
            # Use collections.deque
            try:
                self.content.append(scalar)
            except AttributeError:
                self.content = collections.deque([0.] * self.size,
                                                 maxlen=self.size)
        elif self.method == self.buffer_methods[2]:
            # Use Numpy.roll
            try:
                self.content = numpy.roll(self.content, -1)
                self.content[-1] = scalar
            except IndexError:
                self.content = numpy.zeros(self.size, dtype=float)

# Testing and Timing
circular_buffer_size = 100
circular_buffers = [CircularBuffer(buffer_size=circular_buffer_size,
                                   buffer_method=method)
                    for method in CircularBuffer.buffer_methods]
timeit_iterations = 1e4
timeit_setup = 'from __main__ import circular_buffers'
timeit_results = []
for i, cb in enumerate(circular_buffers):
    # We add a convenient number of convenient values (see equality test below)
    code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
        i, circular_buffer_size)
    # Testing
    eval(code)
    buffer_content = [item for item in cb.content]
    assert buffer_content == range(circular_buffer_size)
    # Timing
    timeit_results.append(
        timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations)))
    print '{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
        cb.method, timeit_results[-1],
        timeit_results[-1] / timeit_iterations * 1e3)

No meu sistema, isso produz:

list:  total 1.06s (0.11ms per iteration)
deque: total 0.87s (0.09ms per iteration)
roll:  total 6.27s (0.63ms per iteration)
djvg
fonte
4

Que tal a solução do Python Cookbook , incluindo uma reclassificação da instância do buffer de anel quando ela ficar cheia?

class RingBuffer:
    """ class that implements a not-yet-full buffer """
    def __init__(self,size_max):
        self.max = size_max
        self.data = []

    class __Full:
        """ class that implements a full buffer """
        def append(self, x):
            """ Append an element overwriting the oldest one. """
            self.data[self.cur] = x
            self.cur = (self.cur+1) % self.max
        def get(self):
            """ return list of elements in correct order """
            return self.data[self.cur:]+self.data[:self.cur]

    def append(self,x):
        """append an element at the end of the buffer"""
        self.data.append(x)
        if len(self.data) == self.max:
            self.cur = 0
            # Permanently change self's class from non-full to full
            self.__class__ = self.__Full

    def get(self):
        """ Return a list of elements from the oldest to the newest. """
        return self.data

# sample usage
if __name__=='__main__':
    x=RingBuffer(5)
    x.append(1); x.append(2); x.append(3); x.append(4)
    print(x.__class__, x.get())
    x.append(5)
    print(x.__class__, x.get())
    x.append(6)
    print(x.data, x.get())
    x.append(7); x.append(8); x.append(9); x.append(10)
    print(x.data, x.get())

A escolha de design notável na implementação é que, uma vez que esses objetos passam por uma transição de estado não reversível em algum ponto de suas vidas - de buffer não cheio para buffer cheio (e mudanças de comportamento nesse ponto) - eu modelei isso mudando self.__class__. Isso funciona mesmo no Python 2.2, contanto que ambas as classes tenham os mesmos slots (por exemplo, funciona bem para duas classes clássicas, como RingBuffer e __Fullnesta receita).

Alterar a classe de uma instância pode ser estranho em muitas linguagens, mas é uma alternativa pitônica a outras maneiras de representar mudanças de estado ocasionais, massivas, irreversíveis e discretas que afetam amplamente o comportamento, como nesta receita. Ainda bem que Python oferece suporte para todos os tipos de classes.

Crédito: Sébastien Keim

d8aninja
fonte
Fiz alguns testes de velocidade desse vs deque. Isso é cerca de 7 vezes mais lento que o deque.
PolyMesh
@PolyMesh incrível, você deve avisar o autor!
d8aninja
1
Qua seria o ponto disso? É um antigo documento publicado. O objetivo do meu comentário é permitir que outras pessoas saibam que essa resposta está desatualizada e, em vez disso, usar deque.
PolyMesh
@PolyMesh provavelmente ainda estava mais lento quando ele o publicou; as instruções para entrar em contato com o autor estão na introdução do livro. Estou apenas relatando uma alternativa possível. Além disso, "Se apenas a velocidade fosse a melhor métrica; infelizmente, pode ser apenas uma boa."
d8aninja
3

Você também pode ver esta receita Python bastante antiga .

Aqui está minha própria versão com a matriz NumPy:

#!/usr/bin/env python

import numpy as np

class RingBuffer(object):
    def __init__(self, size_max, default_value=0.0, dtype=float):
        """initialization"""
        self.size_max = size_max

        self._data = np.empty(size_max, dtype=dtype)
        self._data.fill(default_value)

        self.size = 0

    def append(self, value):
        """append an element"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value 

        self.size += 1

        if self.size == self.size_max:
            self.__class__  = RingBufferFull

    def get_all(self):
        """return a list of elements from the oldest to the newest"""
        return(self._data)

    def get_partial(self):
        return(self.get_all()[0:self.size])

    def __getitem__(self, key):
        """get element"""
        return(self._data[key])

    def __repr__(self):
        """return string representation"""
        s = self._data.__repr__()
        s = s + '\t' + str(self.size)
        s = s + '\t' + self.get_all()[::-1].__repr__()
        s = s + '\t' + self.get_partial()[::-1].__repr__()
        return(s)

class RingBufferFull(RingBuffer):
    def append(self, value):
        """append an element when buffer is full"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value
scls
fonte
4
+1 para usar numpy, mas -1 para não implementar um buffer circular. Da maneira como você implementou, você está transferindo todos os dados sempre que adiciona um único elemento, isso custa O(n)tempo. Para implementar um buffer circular apropriado , você deve ter um índice e uma variável de tamanho, e você precisa lidar corretamente com o caso em que os dados 'envolvem' o final do buffer. Ao recuperar dados, pode ser necessário concatenar duas seções no início e no final do buffer.
Bas Swinckels
2

Este não requer nenhuma biblioteca. Ele cria uma lista e depois circula por índice.

O espaço físico é muito pequeno (sem biblioteca) e é executado duas vezes mais rápido do que retirar da fila, pelo menos. Isso é bom para calcular médias móveis, mas esteja ciente de que os itens não são mantidos classificados por idade como acima.

class CircularBuffer(object):
    def __init__(self, size):
        """initialization"""
        self.index= 0
        self.size= size
        self._data = []

    def record(self, value):
        """append an element"""
        if len(self._data) == self.size:
            self._data[self.index]= value
        else:
            self._data.append(value)
        self.index= (self.index + 1) % self.size

    def __getitem__(self, key):
        """get element by index like a regular array"""
        return(self._data[key])

    def __repr__(self):
        """return string representation"""
        return self._data.__repr__() + ' (' + str(len(self._data))+' items)'

    def get_all(self):
        """return a list of all the elements"""
        return(self._data)

Para obter o valor médio, por exemplo:

q= CircularBuffer(1000000);
for i in range(40000):
    q.record(i);
print "capacity=", q.size
print "stored=", len(q.get_all())
print "average=", sum(q.get_all()) / len(q.get_all())

Resulta em:

capacity= 1000000
stored= 40000
average= 19999

real 0m0.024s
user 0m0.020s
sys  0m0.000s

Isso é cerca de 1/3 do tempo equivalente com desenfileiramento.

MoonCactus
fonte
1
Não deveria __getitem__ser um pouco mais poderoso self._data[(key + self._index + 1) % self._size]:?
Mateen Ulhaq
Por que você deseja mudar para +1? Agora, sim, veja a variante Basj abaixo para a ideia
MoonCactus
1

Eu tive esse problema antes de fazer a programação serial. Na época, há pouco mais de um ano, também não consegui encontrar nenhuma implementação eficiente, então acabei escrevendo uma como uma extensão C e também está disponível em pypi sob uma licença do MIT. É superbásico, só lida com buffers de chars assinados de 8 bits, mas é de comprimento flexível, então você pode usar Struct ou algo em cima dele se precisar de algo diferente de chars. Vejo agora, com uma pesquisa no Google, que existem várias opções atualmente, então você pode querer dar uma olhada nelas também.

sirlark
fonte
1

Sua resposta não está certa. O buffer circular principal tem dois princípios (https://en.wikipedia.org/wiki/Circular_buffer )

  1. O lenth do buffer é definido;
  2. Primeiro a entrar, primeiro a sair;
  3. Quando você adiciona ou exclui um item, os outros itens não devem se mover

seu código abaixo:

def add_to_buffer( self, num ):
    self.mylist.pop( 0 )
    self.mylist.append( num )

Vamos considerar uma situação em que a lista está cheia, usando seu código:

self.mylist = [1, 2, 3, 4, 5]

agora acrescentamos 6, a lista é alterada para

self.mylist = [2, 3, 4, 5, 6]

os itens esperados 1 na lista mudaram de posição

seu código é uma fila, não um buffer circular.

A resposta do Basj, eu acho que é a mais eficiente.

A propósito, um buffer de círculo pode melhorar o desempenho da operação para adicionar um item.

Johnny Wong
fonte
1

Do Github:

class CircularBuffer:

    def __init__(self, size):
        """Store buffer in given storage."""
        self.buffer = [None]*size
        self.low = 0
        self.high = 0
        self.size = size
        self.count = 0

    def isEmpty(self):
        """Determines if buffer is empty."""
        return self.count == 0

    def isFull(self):
        """Determines if buffer is full."""
        return self.count == self.size

    def __len__(self):
        """Returns number of elements in buffer."""
        return self.count

    def add(self, value):
        """Adds value to buffer, overwrite as needed."""
        if self.isFull():
            self.low = (self.low+1) % self.size
        else:
            self.count += 1
        self.buffer[self.high] = value
        self.high = (self.high + 1) % self.size

    def remove(self):
        """Removes oldest value from non-empty buffer."""
        if self.count == 0:
            raise Exception ("Circular Buffer is empty");
        value = self.buffer[self.low]
        self.low = (self.low + 1) % self.size
        self.count -= 1
        return value

    def __iter__(self):
        """Return elements in the circular buffer in order using iterator."""
        idx = self.low
        num = self.count
        while num > 0:
            yield self.buffer[idx]
            idx = (idx + 1) % self.size
            num -= 1

    def __repr__(self):
        """String representation of circular buffer."""
        if self.isEmpty():
            return 'cb:[]'

        return 'cb:[' + ','.join(map(str,self)) + ']'

https://github.com/heineman/python-data-structures/blob/master/2.%20Ubiquitous%20Lists/circBuffer.py

Ijaz Ahmad Khan
fonte
0

A pergunta original era: buffer circular " eficiente ". De acordo com esta eficiência solicitada, a resposta de aaronasterling parece ser definitivamente correta. Usando uma classe dedicada programada em Python e comparando o processamento do tempo com as coleções.deque mostra uma aceleração de x5,2 vezes com deque! Aqui está um código muito simples para testar isso:

class cb:
    def __init__(self, size):
        self.b = [0]*size
        self.i = 0
        self.sz = size
    def append(self, v):
        self.b[self.i] = v
        self.i = (self.i + 1) % self.sz

b = cb(1000)
for i in range(10000):
    b.append(i)
# called 200 times, this lasts 1.097 second on my laptop

from collections import deque
b = deque( [], 1000 )
for i in range(10000):
    b.append(i)
# called 200 times, this lasts 0.211 second on my laptop

Para transformar um deque em uma lista, basta usar:

my_list = [v for v in my_deque]

Você então obterá acesso aleatório O (1) aos itens deque. Claro, isso só é valioso se você precisar fazer muitos acessos aleatórios ao deque depois de configurá-lo uma vez.

Schmouk
fonte
0

Isso aplica o mesmo princípio a alguns buffers destinados a conter as mensagens de texto mais recentes.

import time
import datetime
import sys, getopt

class textbffr(object):
    def __init__(self, size_max):
        #initialization
        self.posn_max = size_max-1
        self._data = [""]*(size_max)
        self.posn = self.posn_max

    def append(self, value):
        #append an element
        if self.posn == self.posn_max:
            self.posn = 0
            self._data[self.posn] = value   
        else:
            self.posn += 1
            self._data[self.posn] = value

    def __getitem__(self, key):
        #return stored element
        if (key + self.posn+1) > self.posn_max:
            return(self._data[key - (self.posn_max-self.posn)])
        else:
            return(self._data[key + self.posn+1])


def print_bffr(bffr,bffer_max): 
    for ind in range(0,bffer_max):
        stored = bffr[ind]
        if stored != "":
            print(stored)
    print ( '\n' )

def make_time_text(time_value):
    return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2)
      + str(time_value.hour).zfill(2) +  str(time_value.minute).zfill(2)
      + str(time_value.second).zfill(2))


def main(argv):
    #Set things up 
    starttime = datetime.datetime.now()
    log_max = 5
    status_max = 7
    log_bffr = textbffr(log_max)
    status_bffr = textbffr(status_max)
    scan_count = 1

    #Main Loop
    # every 10 secounds write a line with the time and the scan count.
    while True: 

        time_text = make_time_text(datetime.datetime.now())
        #create next messages and store in buffers
        status_bffr.append(str(scan_count).zfill(6) + " :  Status is just fine at : " + time_text)
        log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ")

        #print whole buffers so far
        print_bffr(log_bffr,log_max)
        print_bffr(status_bffr,status_max)

        time.sleep(2)
        scan_count += 1 

if __name__ == '__main__':
    main(sys.argv[1:])  
David Torrens
fonte
0

Você pode verificar esse buffer circular com base em uma matriz numpy de tamanho predefinido. A ideia é que você crie um buffer (aloque memória para o array numpy) e depois acrescente a ele. A inserção e recuperação de dados é muito rápida. Criei este módulo para um propósito semelhante ao que você precisa. No meu caso, tenho um dispositivo que gera dados inteiros. Eu li os dados e os coloquei no buffer circular para análise e processamento futuros.

Valentyn
fonte