Filtrando um sinal digital on-line em tempo real usando python
7
Atualmente, estou tentando aplicar um filtro passa-banda a um sinal em tempo real. Há amostras chegando com uma taxa de amostragem constante e eu gostaria de calcular o sinal filtrado passa-banda correspondente.
Qual seria a melhor forma de fazer isso? Preciso filtrar todo (ou pelo menos um pouquinho) o sinal toda vez que chegam algumas amostras novas ou existe uma maneira (como a DFT deslizante) em que é possível determinar com eficiência a nova parte do filtro filtrado sinal?
Gostaria de usar um filtro butterworth (para análise offline, atualmente estou usando a manteiga e o filtro de scipy). Sei que essa função pode retornar um atraso no filtro, mas não sei como usá-la para obter um sinal constante.
A mecânica fundamental da execução do processamento digital de auido em tempo real em uma plataforma de PC de uso geral baseada em X-windows baseia-se no uso de uma família de arquiteturas com buffer duplo.
Nesta arquitetura, o som que chega através de um microfone / entrada de linha é primeiro convertido em amostras via placa de som ADC e, em seguida, é preenchido em um buffer de entrada na taxa de amostragem Fs selecionada pelo usuário. Quando esse buffer está cheio, primeiro o hardware da placa de som notifica o sistema operacional e, em seguida, o sistema operacional notifica o seu programa. E seu programa pode acessar o bloco e iniciar o processamento de amostras nesse bloco.
No entanto, ao mesmo tempo em que está ocupado com o bloco atual, seu programa já forneceu outro (o segundo) buffer a ser preenchido pela placa de áudio com as amostras que chegam enquanto você processa o buffer preenchido anteriormente. Quando esse buffer disponível no momento é completamente processado, é necessário começar a processar o próximo buffer imediatamente, sem atrasos, o que é uma necessidade fundamental para a reprodução de áudio sem cliques. Dessa maneira, você pode criar um processo de áudio suave sem falhas e rachaduras.
Além disso, se você fará uma filtragem com base em FIR ou IIR, poderá filtrar todo o buffer de uma só vez como o do caso FIR ou ir amostra recursivamente por amostra para um caso IIR.
O tamanho do buffer é importante para o atraso inicial do processo. Portanto, se você for muito grande, terá que esperar até que os dois buffers sejam preenchidos antes de produzir qualquer coisa. Por outro lado, se você tomar os buffers muito curtos, o sistema ficará sobrecarregado pelas interrupções de entrada.
Uma escolha ideal é entre 128 e 1024 amostras. Esses comprimentos de buffer são apropriados para processamento posterior do tipo FFT. Também é possível aumentar o número de buffers para obter uma taxa de transferência mais robusta sob diferentes condições de carga do sistema. Mas pelo menos dois buffers são necessários.
Embora eu esteja processando o sinal de EEG, posso aplicar perfeitamente isso, obrigado!
BSTadlbauer
11
Essa é exatamente a descrição da arquitetura de buffer em cascata do GNU Radio, a propósito.
Marcus Müller
11
Abordei sua postagem na extensão da minha resposta, @ Fat32; Espero que gostem :)
Marcus Müller
@ MarcusMüller; Obrigado pela cooperação. I apreciam;)
Fat32
5
Preciso filtrar todo (ou pelo menos um pouquinho) o sinal toda vez que chegam algumas amostras novas ou existe uma maneira (como a DFT deslizante) em que é possível determinar com eficiência a nova parte do filtro filtrado sinal?
Os filtros digitais não funcionam assim - basicamente, o FIR ou IIR clássico pode funcionar em cada nova amostra . Você realmente deve ler sobre o que são esses filtros e como as pessoas os modelam.
Eu gostaria de usar um filtro butterworth
Bem, há muitas implementações disso por aí,
Atualmente, estou usando manteiga e filtro de scipy
dos quais você já conhece um!
Agora, um filtro butterworth é uma coisa recursiva; portanto, para calcular a próxima parte do sinal amostrado, você precisará do último estado. Esse é exatamente o "estado de atraso do filtro zi" que lfilterretorna e pode receber a próxima chamada como ziparâmetro.
mas não sei como usá-lo para obter um sinal constante.
Eu acho que você quer dizer "alcançar filtragem contínua".
Agora, dito isso, o ponto é que você está se preparando para escrever sua própria arquitetura de streaming. Eu não faria isso. Use uma estrutura existente. Por exemplo, há o GNU Radio, que permite definir gráficos de fluxo de processamento de sinal em Python, e também é inerentemente multithread, usa implementações de algoritmos altamente otimizadas, possui muitas facilidades de entrada e saída e vem com uma enorme biblioteca de blocos de processamento de sinal , que pode ser escrito em Python ou C ++, se você precisar fazer isso.
Por exemplo, um gráfico de fluxo que coleta amostras de uma placa de som, filtra-as com butterworth e as grava em um arquivo é:
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
##################################################
# GNU Radio Python Flow Graph
# Title: Butterworth Test
# Generated: Mon Feb 8 16:17:18 2016
##################################################
from gnuradio import audio
from gnuradio import blocks
from gnuradio import eng_notation
from gnuradio import filter
from gnuradio import gr
from gnuradio.eng_option import eng_option
from gnuradio.filter import firdes
from optparse import OptionParser
class butterworth_test(gr.top_block):
def __init__(self):
gr.top_block.__init__(self, "Butterworth Test")
##################################################
# Variables
##################################################
self.samp_rate = samp_rate = 48000
##################################################
# Blocks
##################################################
# taps from scipy.butter!
self.iir_filter_xxx_0 = filter.iir_filter_ffd(([1.0952627450621233e-05, 0.00013143152940745496, 0.0007228734117410033, 0.0024095780391366808, 0.005421550588057537, 0.008674480940892064, 0.010120227764374086, 0.008674480940892081, 0.005421550588057554, 0.0024095780391366955, 0.0007228734117410089, 0.00013143152940745594, 1.0952627450621367e-05]), ([1.0, -4.4363862740719835, 10.215121830052535, -15.374408118154847, 16.57333784740102, -13.325056987818655, 8.133543488903097, -3.77641064765334, 1.3181452681671835, -0.3361758629961047, 0.05930166356243964, -0.0064815521348275, 0.00033130678123743994]), False)
self.blocks_file_sink_0 = blocks.file_sink(gr.sizeof_float*1, "", False)
self.blocks_file_sink_0.set_unbuffered(False)
self.audio_source_0 = audio.source(samp_rate, "", True)
##################################################
# Connections
##################################################
self.connect((self.audio_source_0, 0), (self.iir_filter_xxx_0, 0))
self.connect((self.iir_filter_xxx_0, 0), (self.blocks_file_sink_0, 0))
def main(top_block_cls=butterworth_test, options=None):
tb = top_block_cls()
tb.start()
try:
raw_input('Press Enter to quit: ')
except EOFError:
pass
tb.stop()
tb.wait()
if __name__ == '__main__':
main()
Observe que esse código foi gerado automaticamente a partir de um gráfico de fluxo gráfico que eu apenas cliquei usando o gnuradio-companionprograma:
Se você quiser saber mais sobre como implementar gráficos de fluxo de processamento de sinal em Python, vá para os tutoriais guiados por rádio da GNU .
EDIT : Gostei bastante da resposta da @ Fat32! O que ele descreve como uma arquitetura de buffer duplo é bem parecido com o que o GNU Radio faz:
Um bloco upstream produz amostras em pedaços de tamanhos arbitrários, grava-os no buffer do anel de saída (que é representado como uma seta na figura acima) e notifica seus blocos downstream de que há novos dados.
O bloco downstream é notificado, verifica se há espaço suficiente em seu buffer de saída para processar as amostras que estão em seu buffer de anel de entrada (que é o mesmo que o buffer de saída do bloco upstream), processa-os. Quando finalizado, informa o (s) bloco (s) a montante que utilizou o buffer do anel de entrada (que pode ser reutilizado pelos blocos a montante como saída) e os blocos a jusante sobre a disponibilidade de novas amostras.
Agora, com o GNU Radio sendo multiencadeado, o bloco upstream já pode estar produzindo amostras novamente; em um aplicativo normal do GNU Radio, quase todos os blocos estão "ativos" simultaneamente e as coisas se adaptam muito bem em máquinas com várias CPUs.
Portanto, a principal tarefa do GNU Radio é fornecer a você essa infraestrutura de buffer, a limpeza de notificações e encadeamentos, a API clara do bloco de processamento de sinal e algo para definir como tudo está conectado, para que você não precise escrever o que o Fat32 descreve nela. publique-se! Observe que não é tão fácil fazer a organização do fluxo de amostra, e o GNU Radio elimina a dureza e permite que você se concentre no que você quer fazer: DSP.
Obrigado! Eu olhei para o GNU Radio, mas como processarei um sinal de EEG, precisaria criar meu próprio módulo para usar o gráfico de fluxo, porque cada amostra tem um registro de data e hora que deve ser rastreável durante todo o processo de filtragem.
BSTadlbauer
Você não precisa de um bloqueio para isso. As amostras são consecutivas, e com uma taxa de amostragem fixa, o tempo é directamente disponível como a taxa de amostragem de corrente
Marcus Müller
@ MarcusMüller; Essa arquitetura do GNU Radio, que você descreve, é realmente o que a filosofia moderna oferece para uso. Flexibilidade, facilidade de codificação e, o mais importante, concentrar-se em qual é o seu objetivo principal (processamento DSP), em vez de como alcançar isso usando detalhes intrincados de baixo nível (como o que acontece quando você tenta implementar a técnica de buffer duplo usando chamadas da API do Win32 !)
Fat32
@ Fat32, devemos reduzir o estilo de marketing, mas: Sim, de fato, e isso tudo, além de oferecer uma arquitetura eficiente de buffer de anel de cópia zero e uma extensa biblioteca de blocos que usam código otimizado manualmente no x86, é MMX, SSE, de SSE2, extensões AVX e ARM NEON quando aplicável usando o VOLK Vector Optimized Biblioteca do Núcleo :)
Marcus Müller
@ MarcusMüller Eu tive um tempo livre ontem, então eu examinei mais profundamente o GNU Radio e isso pareceria bastante útil, só que eu não entendo como posso "manualmente" alimentar amostras nos blocos sp, porque (todos) os turorials são feito para fontes de áudio provenientes de um dispositivo de hardware. Você não sabe onde posso encontrar um histórico, etc. para empurrar manualmente amostras para a cadeia? PS as amostras de EEG vir de uma camada especial (LSL - Lab vivo Layer) e tem cerca de 64 canais por amostra
Os filtros digitais não funcionam assim - basicamente, o FIR ou IIR clássico pode funcionar em cada nova amostra . Você realmente deve ler sobre o que são esses filtros e como as pessoas os modelam.
Bem, há muitas implementações disso por aí,
dos quais você já conhece um!
Agora, um filtro butterworth é uma coisa recursiva; portanto, para calcular a próxima parte do sinal amostrado, você precisará do último estado. Esse é exatamente o "estado de atraso do filtro zi" que
lfilter
retorna e pode receber a próxima chamada comozi
parâmetro.Eu acho que você quer dizer "alcançar filtragem contínua".
Agora, dito isso, o ponto é que você está se preparando para escrever sua própria arquitetura de streaming. Eu não faria isso. Use uma estrutura existente. Por exemplo, há o GNU Radio, que permite definir gráficos de fluxo de processamento de sinal em Python, e também é inerentemente multithread, usa implementações de algoritmos altamente otimizadas, possui muitas facilidades de entrada e saída e vem com uma enorme biblioteca de blocos de processamento de sinal , que pode ser escrito em Python ou C ++, se você precisar fazer isso.
Por exemplo, um gráfico de fluxo que coleta amostras de uma placa de som, filtra-as com butterworth e as grava em um arquivo é:
Observe que esse código foi gerado automaticamente a partir de um gráfico de fluxo gráfico que eu apenas cliquei usando o
gnuradio-companion
programa:Se você quiser saber mais sobre como implementar gráficos de fluxo de processamento de sinal em Python, vá para os tutoriais guiados por rádio da GNU .
EDIT : Gostei bastante da resposta da @ Fat32! O que ele descreve como uma arquitetura de buffer duplo é bem parecido com o que o GNU Radio faz:
Um bloco upstream produz amostras em pedaços de tamanhos arbitrários, grava-os no buffer do anel de saída (que é representado como uma seta na figura acima) e notifica seus blocos downstream de que há novos dados.
O bloco downstream é notificado, verifica se há espaço suficiente em seu buffer de saída para processar as amostras que estão em seu buffer de anel de entrada (que é o mesmo que o buffer de saída do bloco upstream), processa-os. Quando finalizado, informa o (s) bloco (s) a montante que utilizou o buffer do anel de entrada (que pode ser reutilizado pelos blocos a montante como saída) e os blocos a jusante sobre a disponibilidade de novas amostras.
Agora, com o GNU Radio sendo multiencadeado, o bloco upstream já pode estar produzindo amostras novamente; em um aplicativo normal do GNU Radio, quase todos os blocos estão "ativos" simultaneamente e as coisas se adaptam muito bem em máquinas com várias CPUs.
Portanto, a principal tarefa do GNU Radio é fornecer a você essa infraestrutura de buffer, a limpeza de notificações e encadeamentos, a API clara do bloco de processamento de sinal e algo para definir como tudo está conectado, para que você não precise escrever o que o Fat32 descreve nela. publique-se! Observe que não é tão fácil fazer a organização do fluxo de amostra, e o GNU Radio elimina a dureza e permite que você se concentre no que você quer fazer: DSP.
fonte