Detecção de pico de sinal em dados de séries temporais em tempo real


Esta pergunta explora algoritmos robustos para detectar picos repentinos em dados de séries temporais em tempo real.

Considere o seguinte conjunto de dados:

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9 1, ...
     1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1 3, ... 
     2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

(Formato Matlab, mas não é sobre a linguagem, mas sobre o algoritmo)

Trama de dados

Você pode ver claramente que existem três picos grandes e alguns pequenos. Este conjunto de dados é um exemplo específico da classe de conjuntos de dados de séries temporais sobre a qual a pergunta se refere. Essa classe de conjuntos de dados possui dois recursos gerais:

  1. Há ruído básico com média geral
  2. Existem grandes " picos " ou " pontos de dados mais altos " que se desviam significativamente do ruído.

Vamos também assumir o seguinte:

  • a largura dos picos não pode ser determinada antecipadamente
  • a altura dos picos desvia clara e significativamente dos outros valores
  • o algoritmo usado deve calcular em tempo real (então mude a cada novo ponto de dados)

Para tal situação, um valor limite precisa ser construído, o que dispara sinais. No entanto, o valor limite não pode ser estático e deve ser determinado em tempo real com base em um algoritmo.

Minha pergunta: o que é um bom algoritmo para calcular esses limites em tempo real? Existem algoritmos específicos para essas situações? Quais são os algoritmos mais conhecidos?

Algoritmos robustos ou idéias úteis são todos muito apreciados. (pode responder em qualquer idioma: trata-se do algoritmo)

Não deve haver alguma exigência de altura absoluta por ser um pico além dos requisitos já dadas. Caso contrário, o pico no tempo 13 deve ser considerado um pico.
Concordo. Vamos supor que esses picos sejam os que precisamos considerar apenas.
Você pode estar fazendo a pergunta errada. Em vez de perguntar como você pode detectar sem demora, você pode perguntar se é possível detectar um certo tipo de sinal sem demora, considerando apenas o que é conhecido antes desse período ou o que precisa ser conhecido sobre um sinal para detectar algo com algum dado demora.
Eu costumava fazer isso para detectar uma mudança abrupta da intensidade da luz em um fotossensor. Fiz isso movendo a média e ignorando quaisquer pontos de dados que sejam maiores que um limite. Observe que esse limite é diferente do limite que determina um pico. Portanto, suponha que você inclua apenas pontos de dados que estejam dentro de um stddev em relação à sua média móvel e considere esses pontos de dados com mais de três stddev como picos. Esse algoritmo se saiu muito bem em nosso contexto de aplicação naquela época.
Ah entendo. Eu não estava esperando isso no formulário de código. Se eu tivesse visto essa pergunta anteriormente, provavelmente você obteria essa resposta muito mais rapidamente = D. De qualquer forma, minha aplicação naquele tempo foi detectar se o fotossensor está obstruído da fonte de luz ambiente (é por isso que precisamos da média móvel, pois a fonte de luz ambiente pode mudar gradualmente ao longo do tempo). Criamos isso como um jogo em que você deve passar o mouse sobre os sensores seguindo um padrão específico. = D
justhalf 28/03



Algoritmo robusto de detecção de pico (usando z-scores)

Eu vim com um algoritmo que funciona muito bem para esses tipos de conjuntos de dados. Ele é baseado no princípio da dispersão : se um novo ponto de dados está com um número x de desvios padrão distantes de alguma média móvel, o algoritmo sinaliza (também chamado de z-score ). O algoritmo é muito robusto porque constrói uma média móvel e um desvio separados , de modo que os sinais não corromperam o limite. Os sinais futuros são, portanto, identificados com aproximadamente a mesma precisão, independentemente da quantidade de sinais anteriores. O algoritmo leva 3 entradas: lag = the lag of the moving window, threshold = the z-score at which the algorithm signalse influence = the influence (between 0 and 1) of new signals on the mean and standard deviation. Por exemplo, um lagde 5 usará as últimas 5 observações para suavizar os dados. UMAthresholdde 3,5 sinalizará se um ponto de dados estiver a 3,5 desvios padrão da média móvel. E um influencede 0,5 fornece aos sinais metade da influência que os pontos de dados normais têm. Da mesma forma, um influencede 0 ignora completamente os sinais para recalcular o novo limite. Uma influência de 0 é, portanto, a opção mais robusta (mas assume a estacionariedade ); colocar a opção de influência em 1 é menos robusto. Para dados não estacionários, a opção de influência deve, portanto, ser colocada entre 0 e 1.

Funciona da seguinte maneira:


# Let y be a vector of timeseries data of at least length lag+2
# Let mean() be a function that calculates the mean
# Let std() be a function that calculates the standard deviaton
# Let absolute() be the absolute value function

# Settings (the ones below are examples: choose what is best for your data)
set lag to 5;          # lag 5 for the smoothing functions
set threshold to 3.5;  # 3.5 standard deviations for signal
set influence to 0.5;  # between 0 and 1, where 1 is normal influence, 0.5 is half

# Initialize variables
set signals to vector 0,...,0 of length of y;   # Initialize signal results
set filteredY to y(1),...,y(lag)                # Initialize filtered series
set avgFilter to null;                          # Initialize average filter
set stdFilter to null;                          # Initialize std. filter
set avgFilter(lag) to mean(y(1),...,y(lag));    # Initialize first value
set stdFilter(lag) to std(y(1),...,y(lag));     # Initialize first value

for i=lag+1,...,t do
  if absolute(y(i) - avgFilter(i-1)) > threshold*stdFilter(i-1) then
    if y(i) > avgFilter(i-1) then
      set signals(i) to +1;                     # Positive signal
      set signals(i) to -1;                     # Negative signal
    # Reduce influence of signal
    set filteredY(i) to influence*y(i) + (1-influence)*filteredY(i-1);
    set signals(i) to 0;                        # No signal
    set filteredY(i) to y(i);
  # Adjust the filters
  set avgFilter(i) to mean(filteredY(i-lag),...,filteredY(i));
  set stdFilter(i) to std(filteredY(i-lag),...,filteredY(i));

As regras práticas para a seleção de bons parâmetros para seus dados podem ser encontradas abaixo.


Demonstração de algoritmo robusto de limiarização

O código Matlab para esta demonstração pode ser encontrado aqui . Para usar a demonstração, basta executá-la e criar uma série temporal clicando no gráfico superior. O algoritmo começa a funcionar após o desenho do lagnúmero de observações.


Para a pergunta original, esse algoritmo fornecerá a seguinte saída ao usar as seguintes configurações lag = 30, threshold = 5, influence = 0::

Exemplo de algoritmo de limiar

Implementações em diferentes linguagens de programação:

Regras básicas para configurar o algoritmo

lag: o parâmetro lag determina quanto seus dados serão suavizados e quão adaptável é o algoritmo às alterações na média de longo prazo dos dados. Quanto mais estacionários seus dados, mais atrasos você deve incluir (isso deve melhorar a robustez do algoritmo). Se seus dados contiverem tendências que variam no tempo, considere a rapidez com que deseja que o algoritmo se adapte a essas tendências. Ou seja, se você colocar lag10, leva 10 'períodos' antes que o limite do algoritmo seja ajustado a quaisquer alterações sistemáticas na média de longo prazo. Portanto, escolha o lagparâmetro com base no comportamento de tendência dos seus dados e quão adaptável você deseja que o algoritmo seja.

influence: este parâmetro determina a influência dos sinais no limiar de detecção do algoritmo. Se colocado em 0, os sinais não influenciam o limiar, de modo que os sinais futuros sejam detectados com base em um limiar que é calculado com uma média e desvio padrão que não é influenciado por sinais passados. Outra maneira de pensar sobre isso é que, se você colocar a influência em 0, assume implicitamente a estacionariedade (ou seja, não importa quantos sinais existam, a série temporal sempre retorna à mesma média a longo prazo). Se esse não for o caso, você deve colocar o parâmetro de influência em algum lugar entre 0 e 1, dependendo da extensão em que os sinais possam influenciar sistematicamente a tendência variável dos dados no tempo. Por exemplo, se os sinais levarem a uma quebra estrutural da média de longo prazo da série temporal, o parâmetro de influência deve ser alto (próximo a 1) para que o limite possa se ajustar rapidamente a essas alterações.

threshold: o parâmetro threshold é o número de desvios padrão da média móvel acima do qual o algoritmo classificará um novo ponto de dados como sinal. Por exemplo, se um novo ponto de dados tiver 4,0 desvios padrão acima da média móvel e o parâmetro threshold estiver definido como 3,5, o algoritmo identificará o ponto de dados como um sinal. Este parâmetro deve ser definido com base em quantos sinais você espera. Por exemplo, se seus dados são normalmente distribuídos, um limite (ou: z-score) de 3,5 corresponde a uma probabilidade de sinalização de 0,00047 (de nesta tabela), o que implica que você espera um sinal uma vez a cada 2128 pontos de dados (1 / 0,00047). O limiar, portanto, influencia diretamente a sensibilidade do algoritmo e, portanto, também a frequência com que o algoritmo sinaliza. Examine seus próprios dados e determine um limite sensível que faça o sinal do algoritmo quando você desejar (algumas tentativas e erros podem ser necessários aqui para atingir um bom limite para o seu propósito).

AVISO: O código acima sempre passa por todos os pontos de dados toda vez que é executado. Ao implementar este código, certifique-se de dividir o cálculo do sinal em uma função separada (sem o loop). Então, quando uma nova datapoint chega, atualização filteredY, avgFiltere stdFilteruma vez. Não recalcule os sinais para todos os dados sempre que houver um novo ponto de dados (como no exemplo acima), que seria extremamente ineficiente e lento!

Outras maneiras de modificar o algoritmo (para possíveis melhorias) são:

  1. Use mediana em vez de média
  2. Use uma medida robusta de escala , como o MAD, em vez do desvio padrão
  3. Use uma margem de sinalização, para que o sinal não mude com muita frequência
  4. Alterar a maneira como o parâmetro de influência funciona
  5. Trate os sinais para cima e para baixo de maneira diferente (tratamento assimétrico)
  6. Crie um influenceparâmetro separado para a média e o padrão ( como feito nesta tradução rápida )

Citações conhecidas (acadêmicas) para esta resposta do StackOverflow:

Outro trabalho usando o algoritmo

Outras aplicações deste algoritmo

Links para outros algoritmos de detecção de pico

Se você usar esta função em algum lugar, por favor, credite-me ou esta resposta. Se você tiver alguma dúvida sobre esse algoritmo, poste-a nos comentários abaixo ou entre em contato comigo no LinkedIn .

O link para movingstd está quebrado, mas você pode encontrar uma descrição do mesmo aqui
@reasra Acontece que a função não precisa de um desvio padrão móvel após a reescrita. Ele agora pode ser usado com simples built-in funções Matlab :)
Estou tentando o código Matlab para obter alguns dados do acelerômetro, mas por algum motivo o thresholdgráfico se torna uma linha verde plana após um grande pico de até 20 nos dados, e permanece assim pelo resto do gráfico ... Se Eu removo o sike, isso não acontece, então parece ser causado pelo pico nos dados. Alguma idéia do que poderia estar acontecendo? Eu sou um novato em Matlab, então eu não consigo entender ...
Magnus W
@BadCash Você pode fornecer um exemplo (com os dados)? Talvez faça sua própria pergunta aqui no SO e me diga o link?
Existem muitas maneiras de melhorar esse algo; portanto, seja criativo (tratamento diferente para cima / baixo; mediana em vez de média; std robusto; escrever o código como uma função com eficiência de memória; margem de limiar para que o sinal não mude com muita frequência etc.) .).

Aqui está a Python/ numpyimplementação do algoritmo de escore z suavizado (veja a resposta acima ). Você pode encontrar a essência aqui .

#!/usr/bin/env python
# Implementation of algorithm from
import numpy as np
import pylab

def thresholding_algo(y, lag, threshold, influence):
    signals = np.zeros(len(y))
    filteredY = np.array(y)
    avgFilter = [0]*len(y)
    stdFilter = [0]*len(y)
    avgFilter[lag - 1] = np.mean(y[0:lag])
    stdFilter[lag - 1] = np.std(y[0:lag])
    for i in range(lag, len(y)):
        if abs(y[i] - avgFilter[i-1]) > threshold * stdFilter [i-1]:
            if y[i] > avgFilter[i-1]:
                signals[i] = 1
                signals[i] = -1

            filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i-1]
            avgFilter[i] = np.mean(filteredY[(i-lag+1):i+1])
            stdFilter[i] = np.std(filteredY[(i-lag+1):i+1])
            signals[i] = 0
            filteredY[i] = y[i]
            avgFilter[i] = np.mean(filteredY[(i-lag+1):i+1])
            stdFilter[i] = np.std(filteredY[(i-lag+1):i+1])

    return dict(signals = np.asarray(signals),
                avgFilter = np.asarray(avgFilter),
                stdFilter = np.asarray(stdFilter))

Abaixo está o teste no mesmo conjunto de dados que gera o mesmo gráfico da resposta original para R/Matlab

# Data
y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,

# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0

# Run algo with settings from above
result = thresholding_algo(y, lag=lag, threshold=threshold, influence=influence)

# Plot result
pylab.plot(np.arange(1, len(y)+1), y)

pylab.plot(np.arange(1, len(y)+1),
           result["avgFilter"], color="cyan", lw=2)

pylab.plot(np.arange(1, len(y)+1),
           result["avgFilter"] + threshold * result["stdFilter"], color="green", lw=2)

pylab.plot(np.arange(1, len(y)+1),
           result["avgFilter"] - threshold * result["stdFilter"], color="green", lw=2)

pylab.step(np.arange(1, len(y)+1), result["signals"], color="red", lw=2)
pylab.ylim(-1.5, 1.5)
Por aqui, 'y' é realmente o sinal e 'sinais' é o conjunto de pontos de dados, estou certo em entender?
@TheTank yé a matriz de dados que você transmite, ou signalsé a matriz de saída que indica para cada ponto de dados se esse ponto de dados é um "pico significativo", dadas as configurações usadas. +1-1y[i]
19419 Jean-Paul

Uma abordagem é detectar picos com base na seguinte observação:

  • O tempo t é um pico se (y (t)> y (t-1)) && (y (t)> y (t + 1))

Evita falsos positivos, aguardando até que a tendência de alta termine. Não é exatamente "tempo real" no sentido de que ele perderá o pico em um dt. a sensibilidade pode ser controlada exigindo uma margem para comparação. Há uma troca entre a detecção de ruído e o atraso de tempo na detecção. Você pode enriquecer o modelo adicionando mais parâmetros:

  • Pico se (y (t) - y (t-dt)> m) && (y (t) - y (t + dt)> m)

onde dt e m são parâmetros para controlar a sensibilidade versus atraso

Isto é o que você obtém com o algoritmo mencionado: insira a descrição da imagem aqui

Aqui está o código para reproduzir o gráfico em python:

import numpy as np
import matplotlib.pyplot as plt
input = np.array([ 1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1.1,  1. ,  0.8,  0.9,
    1. ,  1.2,  0.9,  1. ,  1. ,  1.1,  1.2,  1. ,  1.5,  1. ,  3. ,
    2. ,  5. ,  3. ,  2. ,  1. ,  1. ,  1. ,  0.9,  1. ,  1. ,  3. ,
    2.6,  4. ,  3. ,  3.2,  2. ,  1. ,  1. ,  1. ,  1. ,  1. ])
signal = (input > np.roll(input,1)) & (input > np.roll(input,-1))
plt.plot(signal.nonzero()[0], input[signal], 'ro')

Ao definir m = 0.5, você pode obter um sinal mais limpo com apenas um falso positivo: insira a descrição da imagem aqui

Anterior = melhor, então todos os picos são significativos. Obrigado! Muito legal!
Como eu mudaria a sensibilidade?
Posso pensar em duas abordagens: 1: defina m para um valor maior, para que apenas picos maiores sejam detectados. 2: em vez de calcular y (t) - y (t-dt) (e y (t) - y (t + dt)), você integra de t-dt a t (et a t + dt).
Por quais critérios você está rejeitando os outros 7 picos?
Existe um problema com picos planas, uma vez que o que fazer é basicamente detecção de borda 1-D (como convoluting o sinal com [1 0 -1])

No processamento de sinal, a detecção de pico geralmente é feita via transformada wavelet. Você basicamente faz uma transformação de wavelet discreta nos dados de séries temporais. Cruzamentos de zero nos coeficientes de detalhes retornados corresponderão a picos no sinal da série temporal. Você obtém diferentes amplitudes de pico detectadas em diferentes níveis de coeficiente de detalhes, o que fornece uma resolução em vários níveis.

Sua resposta me deixou neste artigo e nesta resposta que me ajudará a construir um bom algoritmo para minha implementação. Obrigado!
@cklin Você pode explicar como você calcula o cruzamento de zero dos coefs de wavelet, pois eles não estão na mesma escala de tempo da série temporal original. Alguma referência a esse uso?
amigos estão dizendo sobre horace

Tentamos usar o algoritmo de escore z suavizado em nosso conjunto de dados, o que resulta em super-sensibilidade ou sub-sensibilidade (dependendo de como os parâmetros são ajustados), com pouco meio-termo. No sinal de tráfego de nosso site, observamos uma linha de base de baixa frequência que representa o ciclo diário e, mesmo com os melhores parâmetros possíveis (mostrados abaixo), ela ainda é interrompida principalmente no 4º dia, porque a maioria dos pontos de dados é reconhecida como anomalia .

Com base no algoritmo original do z-score, criamos uma maneira de resolver esse problema através da filtragem reversa. Os detalhes do algoritmo modificado e sua aplicação na atribuição de tráfego comercial de TV são publicados no blog da equipe .

insira a descrição da imagem aqui

É legal ver que o algoritmo foi um ponto de partida para sua versão mais avançada. Seus dados têm um padrão muito particular; portanto, faria mais sentido primeiro remover o padrão usando alguma outra técnica e depois aplicar o algo nos resíduos. Como alternativa, convém usar uma janela centralizada em vez de atrasada para calcular a média / Outro comentário: sua solução se move da direita para a esquerda para identificar picos, mas isso não é possível em aplicativos em tempo real (é por isso que o algo original é tão simplista, porque informações futuras são inacessíveis).

Na topologia computacional, a idéia de homologia persistente leva a uma solução eficiente - rápida como números de classificação -. Ele não detecta apenas picos, quantifica o "significado" dos picos de uma maneira natural que permite selecionar os picos significativos para você.

Resumo do algoritmo. Em um cenário unidimensional (série temporal, sinal com valor real), o algoritmo pode ser facilmente descrito pela figura a seguir:

Picos mais persistentes

Pense no gráfico da função (ou em seu conjunto de subnível) como uma paisagem e considere um nível decrescente de água começando no nível infinito (ou 1,8 nesta figura). Enquanto o nível diminui, nas máximas ilhas locais surgem. Em mínimos locais, essas ilhas se fundem. Um detalhe nessa idéia é que a ilha que apareceu mais tarde no tempo é mesclada na ilha mais antiga. A "persistência" de uma ilha é o seu tempo de nascimento menos o seu tempo de morte. Os comprimentos das barras azuis representam a persistência, que é o "significado" mencionado acima de um pico.

Eficiência. Não é muito difícil encontrar uma implementação que seja executada em tempo linear - na verdade, é um loop único e simples - depois que os valores da função foram classificados. Portanto, essa implementação deve ser rápida na prática e também é facilmente implementada.

Referências. Uma descrição completa da história e referências à motivação da homologia persistente (um campo em topologia algébrica computacional) podem ser encontradas aqui:

S. Huber
Esse algoritmo é muito mais rápido e preciso do que, por exemplo, scipy.signal.find_peaks. Para uma série temporal "real" com 1053896 pontos de dados, detectou 137516 picos (13%). A ordem dos picos (primeiro o mais significativo) permite extrair os picos mais significativos. Ele fornece o início, o pico e o fim de cada pico. Funciona bem com dados barulhentos.
Por dados em tempo real, você quer dizer o chamado algoritmo on-line, no qual os pontos de dados são recebidos repetidamente. A importância de um pico pode ser determinada por valores no futuro. Seria bom estender o algoritmo para ficar online, modificando os resultados anteriores sem sacrificar muito a complexidade do tempo.
S. Huber

Foi encontrado outro algoritmo de GH Palshikar em Algoritmos Simples para Detecção de Picos em Séries Temporais .

O algoritmo é assim:

algorithm peak1 // one peak detection algorithms that uses peak function S1 

input T = x1, x2, …, xN, N // input time-series of N points 
input k // window size around the peak 
input h // typically 1 <= h <= 3 
output O // set of peaks detected in T 

O = empty set // initially empty 

    for (i = 1; i < n; i++) do
        // compute peak function value for each of the N points in T 
        a[i] = S1(k,i,xi,T); 
    end for 

    Compute the mean m' and standard deviation s' of all positive values in array a; 

    for (i = 1; i < n; i++) do // remove local peaks which are “small” in global context 
        if (a[i] > 0 && (a[i] – m') >( h * s')) then O = O + {xi}; 
        end if 
    end for 

    Order peaks in O in terms of increasing index in T 

    // retain only one peak out of any set of peaks within distance k of each other 

    for every adjacent pair of peaks xi and xj in O do 
        if |j – i| <= k then remove the smaller value of {xi, xj} from O 
        end if 
    end for 


  • O artigo fornece 5 algoritmos diferentes para detecção de pico
  • Os algoritmos funcionam com dados brutos de séries temporais (não é necessária suavização)


  • Difícil de determinar ke de hantemão
  • Os picos não podem ser baixos (como o terceiro pico nos meus dados de teste)


insira a descrição da imagem aqui

Papel realmente interessante. S4 parece ser uma função melhor para usar em sua opinião. Mas o mais importante é esclarecer quando k <i <Nk não é verdadeiro. Como definir a função S1 (S2, ..) para i = 0, simplesmente não dividi por 2 e ignorei o primeiro operando; para todos os outros, incluí os dois operandos, mas para i <= k havia menos operandos à esquerda então à direita #
daniels_pa 15/11

Aqui está uma implementação do algoritmo Smooth-z-score (acima) em Golang. Ele assume uma fatia de []int16(amostras de PCM 16 bits). Você pode encontrar uma essência aqui .

Settings (the ones below are examples: choose what is best for your data)
set lag to 5;          # lag 5 for the smoothing functions
set threshold to 3.5;  # 3.5 standard deviations for signal
set influence to 0.5;  # between 0 and 1, where 1 is normal influence, 0.5 is half

// ZScore on 16bit WAV samples
func ZScore(samples []int16, lag int, threshold float64, influence float64) (signals []int16) {
    //lag := 20
    //threshold := 3.5
    //influence := 0.5

    signals = make([]int16, len(samples))
    filteredY := make([]int16, len(samples))
    for i, sample := range samples[0:lag] {
        filteredY[i] = sample
    avgFilter := make([]int16, len(samples))
    stdFilter := make([]int16, len(samples))

    avgFilter[lag] = Average(samples[0:lag])
    stdFilter[lag] = Std(samples[0:lag])

    for i := lag + 1; i < len(samples); i++ {

        f := float64(samples[i])

        if float64(Abs(samples[i]-avgFilter[i-1])) > threshold*float64(stdFilter[i-1]) {
            if samples[i] > avgFilter[i-1] {
                signals[i] = 1
            } else {
                signals[i] = -1
            filteredY[i] = int16(influence*f + (1-influence)*float64(filteredY[i-1]))
            avgFilter[i] = Average(filteredY[(i - lag):i])
            stdFilter[i] = Std(filteredY[(i - lag):i])
        } else {
            signals[i] = 0
            filteredY[i] = samples[i]
            avgFilter[i] = Average(filteredY[(i - lag):i])
            stdFilter[i] = Std(filteredY[(i - lag):i])


// Average a chunk of values
func Average(chunk []int16) (avg int16) {
    var sum int64
    for _, sample := range chunk {
        if sample < 0 {
            sample *= -1
        sum += int64(sample)
    return int16(sum / int64(len(chunk)))
Você tentou replicar a saída de exemplo de demonstração do Matlab / R? Essa deve ser uma boa confirmação da qualidade.
19417 Jean-Paul

Aqui está uma implementação em C ++ do algoritmo z-score suavizado desta resposta

std::vector<int> smoothedZScore(std::vector<float> input)
    //lag 5 for the smoothing functions
    int lag = 5;
    //3.5 standard deviations for signal
    float threshold = 3.5;
    //between 0 and 1, where 1 is normal influence, 0.5 is half
    float influence = .5;

    if (input.size() <= lag + 2)
        std::vector<int> emptyVec;
        return emptyVec;

    //Initialise variables
    std::vector<int> signals(input.size(), 0.0);
    std::vector<float> filteredY(input.size(), 0.0);
    std::vector<float> avgFilter(input.size(), 0.0);
    std::vector<float> stdFilter(input.size(), 0.0);
    std::vector<float> subVecStart(input.begin(), input.begin() + lag);
    avgFilter[lag] = mean(subVecStart);
    stdFilter[lag] = stdDev(subVecStart);

    for (size_t i = lag + 1; i < input.size(); i++)
        if (std::abs(input[i] - avgFilter[i - 1]) > threshold * stdFilter[i - 1])
            if (input[i] > avgFilter[i - 1])
                signals[i] = 1; //# Positive signal
                signals[i] = -1; //# Negative signal
            //Make influence lower
            filteredY[i] = influence* input[i] + (1 - influence) * filteredY[i - 1];
            signals[i] = 0; //# No signal
            filteredY[i] = input[i];
        //Adjust the filters
        std::vector<float> subVec(filteredY.begin() + i - lag, filteredY.begin() + i);
        avgFilter[i] = mean(subVec);
        stdFilter[i] = stdDev(subVec);
    return signals;
Advertência: Na verdade, essa implementação não fornece um método para calcular a média e o desvio padrão. Para C ++ 11, um método fácil pode ser encontrado aqui:

Esse problema é semelhante ao que encontrei em um curso de sistemas híbridos / embarcados, mas estava relacionado à detecção de falhas quando a entrada de um sensor é barulhenta. Usamos um filtro Kalman para estimar / prever o estado oculto do sistema e, em seguida, usamos análises estatísticas para determinar a probabilidade de ocorrência de uma falha . Estávamos trabalhando com sistemas lineares, mas existem variantes não lineares. Lembro que a abordagem era surpreendentemente adaptável, mas exigia um modelo da dinâmica do sistema.

Peter G
O filtro Kalman é interessante, mas não consigo encontrar um algoritmo aplicável para minha finalidade. Eu aprecio muito a resposta, porém, e examinarei alguns documentos de detecção de pico como este para ver se consigo aprender com qualquer um dos algoritmos. Obrigado!

Implementação de C ++

#include <iostream>
#include <vector>
#include <algorithm>
#include <unordered_map>
#include <cmath>
#include <iterator>
#include <numeric>

using namespace std;

typedef long double ld;
typedef unsigned int uint;
typedef std::vector<ld>::iterator vec_iter_ld;

 * Overriding the ostream operator for pretty printing vectors.
template<typename T>
std::ostream &operator<<(std::ostream &os, std::vector<T> vec) {
    os << "[";
    if (vec.size() != 0) {
        std::copy(vec.begin(), vec.end() - 1, std::ostream_iterator<T>(os, " "));
        os << vec.back();
    os << "]";
    return os;

 * This class calculates mean and standard deviation of a subvector.
 * This is basically stats computation of a subvector of a window size qual to "lag".
class VectorStats {
     * Constructor for VectorStats class.
     * @param start - This is the iterator position of the start of the window,
     * @param end   - This is the iterator position of the end of the window,
    VectorStats(vec_iter_ld start, vec_iter_ld end) {
        this->start = start;
        this->end = end;

     * This method calculates the mean and standard deviation using STL function.
     * This is the Two-Pass implementation of the Mean & Variance calculation.
    void compute() {
        ld sum = std::accumulate(start, end, 0.0);
        uint slice_size = std::distance(start, end);
        ld mean = sum / slice_size;
        std::vector<ld> diff(slice_size);
        std::transform(start, end, diff.begin(), [mean](ld x) { return x - mean; });
        ld sq_sum = std::inner_product(diff.begin(), diff.end(), diff.begin(), 0.0);
        ld std_dev = std::sqrt(sq_sum / slice_size);

        this->m1 = mean;
        this->m2 = std_dev;

    ld mean() {
        return m1;

    ld standard_deviation() {
        return m2;

    vec_iter_ld start;
    vec_iter_ld end;
    ld m1;
    ld m2;

 * This is the implementation of the Smoothed Z-Score Algorithm.
 * This is direction translation of
 * @param input - input signal
 * @param lag - the lag of the moving window
 * @param threshold - the z-score at which the algorithm signals
 * @param influence - the influence (between 0 and 1) of new signals on the mean and standard deviation
 * @return a hashmap containing the filtered signal and corresponding mean and standard deviation.
unordered_map<string, vector<ld>> z_score_thresholding(vector<ld> input, int lag, ld threshold, ld influence) {
    unordered_map<string, vector<ld>> output;

    uint n = (uint) input.size();
    vector<ld> signals(input.size());
    vector<ld> filtered_input(input.begin(), input.end());
    vector<ld> filtered_mean(input.size());
    vector<ld> filtered_stddev(input.size());

    VectorStats lag_subvector_stats(input.begin(), input.begin() + lag);
    filtered_mean[lag - 1] = lag_subvector_stats.mean();
    filtered_stddev[lag - 1] = lag_subvector_stats.standard_deviation();

    for (int i = lag; i < n; i++) {
        if (abs(input[i] - filtered_mean[i - 1]) > threshold * filtered_stddev[i - 1]) {
            signals[i] = (input[i] > filtered_mean[i - 1]) ? 1.0 : -1.0;
            filtered_input[i] = influence * input[i] + (1 - influence) * filtered_input[i - 1];
        } else {
            signals[i] = 0.0;
            filtered_input[i] = input[i];
        VectorStats lag_subvector_stats(filtered_input.begin() + (i - lag), filtered_input.begin() + i);
        filtered_mean[i] = lag_subvector_stats.mean();
        filtered_stddev[i] = lag_subvector_stats.standard_deviation();

    output["signals"] = signals;
    output["filtered_mean"] = filtered_mean;
    output["filtered_stddev"] = filtered_stddev;

    return output;

int main() {
    vector<ld> input = {1.0, 1.0, 1.1, 1.0, 0.9, 1.0, 1.0, 1.1, 1.0, 0.9, 1.0, 1.1, 1.0, 1.0, 0.9, 1.0, 1.0, 1.1, 1.0,
                        1.0, 1.0, 1.0, 1.1, 0.9, 1.0, 1.1, 1.0, 1.0, 0.9, 1.0, 1.1, 1.0, 1.0, 1.1, 1.0, 0.8, 0.9, 1.0,
                        1.2, 0.9, 1.0, 1.0, 1.1, 1.2, 1.0, 1.5, 1.0, 3.0, 2.0, 5.0, 3.0, 2.0, 1.0, 1.0, 1.0, 0.9, 1.0,
                        1.0, 3.0, 2.6, 4.0, 3.0, 3.2, 2.0, 1.0, 1.0, 0.8, 4.0, 4.0, 2.0, 2.5, 1.0, 1.0, 1.0};

    int lag = 30;
    ld threshold = 5.0;
    ld influence = 0.0;
    unordered_map<string, vector<ld>> output = z_score_thresholding(input, lag, threshold, influence);
    cout << output["signals"] << endl;
Seguindo a solução proposta por Jean-Paul, eu implementei seu algoritmo em C #

public class ZScoreOutput
    public List<double> input;
    public List<int> signals;
    public List<double> avgFilter;
    public List<double> filtered_stddev;

public static class ZScore
    public static ZScoreOutput StartAlgo(List<double> input, int lag, double threshold, double influence)
        // init variables!
        int[] signals = new int[input.Count];
        double[] filteredY = new List<double>(input).ToArray();
        double[] avgFilter = new double[input.Count];
        double[] stdFilter = new double[input.Count];

        var initialWindow = new List<double>(filteredY).Skip(0).Take(lag).ToList();

        avgFilter[lag - 1] = Mean(initialWindow);
        stdFilter[lag - 1] = StdDev(initialWindow);

        for (int i = lag; i < input.Count; i++)
            if (Math.Abs(input[i] - avgFilter[i - 1]) > threshold * stdFilter[i - 1])
                signals[i] = (input[i] > avgFilter[i - 1]) ? 1 : -1;
                filteredY[i] = influence * input[i] + (1 - influence) * filteredY[i - 1];
                signals[i] = 0;
                filteredY[i] = input[i];

            // Update rolling average and deviation
            var slidingWindow = new List<double>(filteredY).Skip(i - lag).Take(lag+1).ToList();

            var tmpMean = Mean(slidingWindow);
            var tmpStdDev = StdDev(slidingWindow);

            avgFilter[i] = Mean(slidingWindow);
            stdFilter[i] = StdDev(slidingWindow);

        // Copy to convenience class 
        var result = new ZScoreOutput();
        result.input = input;
        result.avgFilter       = new List<double>(avgFilter);
        result.signals         = new List<int>(signals);
        result.filtered_stddev = new List<double>(stdFilter);

        return result;

    private static double Mean(List<double> list)
        // Simple helper function! 
        return list.Average();

    private static double StdDev(List<double> values)
        double ret = 0;
        if (values.Count() > 0)
            double avg = values.Average();
            double sum = values.Sum(d => Math.Pow(d - avg, 2));
            ret = Math.Sqrt((sum) / (values.Count() - 1));
        return ret;

Exemplo de uso:

var input = new List<double> {1.0, 1.0, 1.1, 1.0, 0.9, 1.0, 1.0, 1.1, 1.0, 0.9, 1.0,
    1.1, 1.0, 1.0, 0.9, 1.0, 1.0, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 0.9, 1.0, 1.1, 1.0, 1.0, 0.9,
    1.0, 1.1, 1.0, 1.0, 1.1, 1.0, 0.8, 0.9, 1.0, 1.2, 0.9, 1.0, 1.0, 1.1, 1.2, 1.0, 1.5, 1.0,
    3.0, 2.0, 5.0, 3.0, 2.0, 1.0, 1.0, 1.0, 0.9, 1.0, 1.0, 3.0, 2.6, 4.0, 3.0, 3.2, 2.0, 1.0,
    1.0, 0.8, 4.0, 4.0, 2.0, 2.5, 1.0, 1.0, 1.0};

int lag = 30;
double threshold = 5.0;
double influence = 0.0;

var output = ZScore.StartAlgo(input, lag, threshold, influence);
Ocean Airdrop
Aqui está uma implementação em C do Z-score Smoothed de Jean-Paul para o microcontrolador Arduino usado para fazer leituras do acelerômetro e decidir se a direção de um impacto veio da esquerda ou da direita. Isso funciona muito bem, pois este dispositivo retorna um sinal devolvido. Aqui está uma entrada para este algoritmo de detecção de pico do dispositivo - mostrando um impacto da direita seguido por e da esquerda. Você pode ver o pico inicial e a oscilação do sensor.

insira a descrição da imagem aqui

#include <stdio.h>
#include <math.h>
#include <string.h>

#define SAMPLE_LENGTH 1000

float stddev(float data[], int len);
float mean(float data[], int len);
void thresholding(float y[], int signals[], int lag, float threshold, float influence);

void thresholding(float y[], int signals[], int lag, float threshold, float influence) {
    memset(signals, 0, sizeof(float) * SAMPLE_LENGTH);
    float filteredY[SAMPLE_LENGTH];
    memcpy(filteredY, y, sizeof(float) * SAMPLE_LENGTH);
    float avgFilter[SAMPLE_LENGTH];
    float stdFilter[SAMPLE_LENGTH];

    avgFilter[lag - 1] = mean(y, lag);
    stdFilter[lag - 1] = stddev(y, lag);

    for (int i = lag; i < SAMPLE_LENGTH; i++) {
        if (fabsf(y[i] - avgFilter[i-1]) > threshold * stdFilter[i-1]) {
            if (y[i] > avgFilter[i-1]) {
                signals[i] = 1;
            } else {
                signals[i] = -1;
            filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i-1];
        } else {
            signals[i] = 0;
        avgFilter[i] = mean(filteredY + i-lag, lag);
        stdFilter[i] = stddev(filteredY + i-lag, lag);

float mean(float data[], int len) {
    float sum = 0.0, mean = 0.0;

    int i;
    for(i=0; i<len; ++i) {
        sum += data[i];

    mean = sum/len;
    return mean;


float stddev(float data[], int len) {
    float the_mean = mean(data, len);
    float standardDeviation = 0.0;

    int i;
    for(i=0; i<len; ++i) {
        standardDeviation += pow(data[i] - the_mean, 2);

    return sqrt(standardDeviation/len);

int main() {
    printf("Hello, World!\n");
    int lag = 100;
    float threshold = 5;
    float influence = 0;
    float y[]=  {1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,

    int signal[SAMPLE_LENGTH];

    thresholding(y, signal,  lag, threshold, influence);

    return 0;

O resultado é dela com influência = 0

insira a descrição da imagem aqui

Não é ótimo, mas aqui com influência = 1

insira a descrição da imagem aqui

o que é muito bom


Aqui está uma implementação Java real baseada no resposta Groovy publicada anteriormente. (Eu sei que já existem implementações de Groovy e Kotlin, mas para alguém como eu, que só fez Java, é um verdadeiro aborrecimento descobrir como converter entre outras linguagens e Java).

(Os resultados correspondem aos gráficos de outras pessoas)

Implementação de algoritmo

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import org.apache.commons.math3.stat.descriptive.SummaryStatistics;

public class SignalDetector {

    public HashMap<String, List> analyzeDataForSignals(List<Double> data, int lag, Double threshold, Double influence) {

        // init stats instance
        SummaryStatistics stats = new SummaryStatistics();

        // the results (peaks, 1 or -1) of our algorithm
        List<Integer> signals = new ArrayList<Integer>(Collections.nCopies(data.size(), 0));

        // filter out the signals (peaks) from our original list (using influence arg)
        List<Double> filteredData = new ArrayList<Double>(data);

        // the current average of the rolling window
        List<Double> avgFilter = new ArrayList<Double>(Collections.nCopies(data.size(), 0.0d));

        // the current standard deviation of the rolling window
        List<Double> stdFilter = new ArrayList<Double>(Collections.nCopies(data.size(), 0.0d));

        // init avgFilter and stdFilter
        for (int i = 0; i < lag; i++) {
        avgFilter.set(lag - 1, stats.getMean());
        stdFilter.set(lag - 1, Math.sqrt(stats.getPopulationVariance())); // getStandardDeviation() uses sample variance

        // loop input starting at end of rolling window
        for (int i = lag; i < data.size(); i++) {

            // if the distance between the current value and average is enough standard deviations (threshold) away
            if (Math.abs((data.get(i) - avgFilter.get(i - 1))) > threshold * stdFilter.get(i - 1)) {

                // this is a signal (i.e. peak), determine if it is a positive or negative signal
                if (data.get(i) > avgFilter.get(i - 1)) {
                    signals.set(i, 1);
                } else {
                    signals.set(i, -1);

                // filter this signal out using influence
                filteredData.set(i, (influence * data.get(i)) + ((1 - influence) * filteredData.get(i - 1)));
            } else {
                // ensure this signal remains a zero
                signals.set(i, 0);
                // ensure this value is not filtered
                filteredData.set(i, data.get(i));

            // update rolling average and deviation
            for (int j = i - lag; j < i; j++) {
            avgFilter.set(i, stats.getMean());
            stdFilter.set(i, Math.sqrt(stats.getPopulationVariance()));

        HashMap<String, List> returnMap = new HashMap<String, List>();
        returnMap.put("signals", signals);
        returnMap.put("filteredData", filteredData);
        returnMap.put("avgFilter", avgFilter);
        returnMap.put("stdFilter", stdFilter);

        return returnMap;

    } // end

Método principal

import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

public class Main {

    public static void main(String[] args) throws Exception {
        DecimalFormat df = new DecimalFormat("#0.000");

        ArrayList<Double> data = new ArrayList<Double>(Arrays.asList(1d, 1d, 1.1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 0.9d, 1d,
                1.1d, 1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 1d, 1d, 1d, 1.1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d,
                1.1d, 1d, 0.8d, 0.9d, 1d, 1.2d, 0.9d, 1d, 1d, 1.1d, 1.2d, 1d, 1.5d, 1d, 3d, 2d, 5d, 3d, 2d, 1d, 1d, 1d,
                0.9d, 1d, 1d, 3d, 2.6d, 4d, 3d, 3.2d, 2d, 1d, 1d, 0.8d, 4d, 4d, 2d, 2.5d, 1d, 1d, 1d));

        SignalDetector signalDetector = new SignalDetector();
        int lag = 30;
        double threshold = 5;
        double influence = 0;

        HashMap<String, List> resultsMap = signalDetector.analyzeDataForSignals(data, lag, threshold, influence);
        // print algorithm params
        System.out.println("lag: " + lag + "\t\tthreshold: " + threshold + "\t\tinfluence: " + influence);

        System.out.println("Data size: " + data.size());
        System.out.println("Signals size: " + resultsMap.get("signals").size());

        // print data
        for (double d : data) {
            System.out.print(df.format(d) + "\t");

        // print signals
        List<Integer> signalsList = resultsMap.get("signals");
        for (int i : signalsList) {
            System.out.print(df.format(i) + "\t");

        // print filtered data
        System.out.print("Filtered Data:\t");
        List<Double> filteredDataList = resultsMap.get("filteredData");
        for (double d : filteredDataList) {
            System.out.print(df.format(d) + "\t");

        // print running average
        System.out.print("Avg Filter:\t");
        List<Double> avgFilterList = resultsMap.get("avgFilter");
        for (double d : avgFilterList) {
            System.out.print(df.format(d) + "\t");

        // print running std
        System.out.print("Std filter:\t");
        List<Double> stdFilterList = resultsMap.get("stdFilter");
        for (double d : stdFilterList) {
            System.out.print(df.format(d) + "\t");

        for (int i = 0; i < signalsList.size(); i++) {
            if (signalsList.get(i) != 0) {
                System.out.println("Point " + i + " gave signal " + signalsList.get(i));


lag: 30     threshold: 5.0      influence: 0.0
Data size: 74
Signals size: 74
Data:           1.000   1.000   1.100   1.000   0.900   1.000   1.000   1.100   1.000   0.900   1.000   1.100   1.000   1.000   0.900   1.000   1.000   1.100   1.000   1.000   1.000   1.000   1.100   0.900   1.000   1.100   1.000   1.000   0.900   1.000   1.100   1.000   1.000   1.100   1.000   0.800   0.900   1.000   1.200   0.900   1.000   1.000   1.100   1.200   1.000   1.500   1.000   3.000   2.000   5.000   3.000   2.000   1.000   1.000   1.000   0.900   1.000   1.000   3.000   2.600   4.000   3.000   3.200   2.000   1.000   1.000   0.800   4.000   4.000   2.000   2.500   1.000   1.000   1.000   
Signals:        0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   1.000   0.000   1.000   1.000   1.000   1.000   1.000   0.000   0.000   0.000   0.000   0.000   0.000   1.000   1.000   1.000   1.000   1.000   1.000   0.000   0.000   0.000   1.000   1.000   1.000   1.000   0.000   0.000   0.000   
Filtered Data:  1.000   1.000   1.100   1.000   0.900   1.000   1.000   1.100   1.000   0.900   1.000   1.100   1.000   1.000   0.900   1.000   1.000   1.100   1.000   1.000   1.000   1.000   1.100   0.900   1.000   1.100   1.000   1.000   0.900   1.000   1.100   1.000   1.000   1.100   1.000   0.800   0.900   1.000   1.200   0.900   1.000   1.000   1.100   1.200   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   0.900   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   0.800   0.800   0.800   0.800   0.800   1.000   1.000   1.000   
Avg Filter:     0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   1.003   1.003   1.007   1.007   1.003   1.007   1.010   1.003   1.000   0.997   1.003   1.003   1.003   1.000   1.003   1.010   1.013   1.013   1.013   1.010   1.010   1.010   1.010   1.010   1.007   1.010   1.010   1.003   1.003   1.003   1.007   1.007   1.003   1.003   1.003   1.000   1.000   1.007   1.003   0.997   0.983   0.980   0.973   0.973   0.970   
Std filter:     0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.060   0.060   0.063   0.063   0.060   0.063   0.060   0.071   0.073   0.071   0.080   0.080   0.080   0.077   0.080   0.087   0.085   0.085   0.085   0.083   0.083   0.083   0.083   0.083   0.081   0.079   0.079   0.080   0.080   0.080   0.077   0.077   0.075   0.075   0.075   0.073   0.073   0.063   0.071   0.080   0.078   0.083   0.089   0.089   0.086   

Point 45 gave signal 1
Point 47 gave signal 1
Point 48 gave signal 1
Point 49 gave signal 1
Point 50 gave signal 1
Point 51 gave signal 1
Point 58 gave signal 1
Point 59 gave signal 1
Point 60 gave signal 1
Point 61 gave signal 1
Point 62 gave signal 1
Point 63 gave signal 1
Point 67 gave signal 1
Point 68 gave signal 1
Point 69 gave signal 1
Point 70 gave signal 1

Gráficos mostrando dados e resultados da execução em java


Apêndice 1 à resposta original: Matlabe Rtraduções

Código Matlab

function [signals,avgFilter,stdFilter] = ThresholdingAlgo(y,lag,threshold,influence)
% Initialise signal results
signals = zeros(length(y),1);
% Initialise filtered series
filteredY = y(1:lag+1);
% Initialise filters
avgFilter(lag+1,1) = mean(y(1:lag+1));
stdFilter(lag+1,1) = std(y(1:lag+1));
% Loop over all datapoints y(lag+2),...,y(t)
for i=lag+2:length(y)
    % If new value is a specified number of deviations away
    if abs(y(i)-avgFilter(i-1)) > threshold*stdFilter(i-1)
        if y(i) > avgFilter(i-1)
            % Positive signal
            signals(i) = 1;
            % Negative signal
            signals(i) = -1;
        % Make influence lower
        filteredY(i) = influence*y(i)+(1-influence)*filteredY(i-1);
        % No signal
        signals(i) = 0;
        filteredY(i) = y(i);
    % Adjust the filters
    avgFilter(i) = mean(filteredY(i-lag:i));
    stdFilter(i) = std(filteredY(i-lag:i));
% Done, now return results


% Data
y = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1,...
    1 1 1.1 0.9 1 1.1 1 1 0.9 1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1,...
    1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1,...
    1 3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

% Settings
lag = 30;
threshold = 5;
influence = 0;

% Get results
[signals,avg,dev] = ThresholdingAlgo(y,lag,threshold,influence);

figure; subplot(2,1,1); hold on;
x = 1:length(y); ix = lag+1:length(y);
area(x(ix),avg(ix)+threshold*dev(ix),'FaceColor',[0.9 0.9 0.9],'EdgeColor','none');
area(x(ix),avg(ix)-threshold*dev(ix),'FaceColor',[1 1 1],'EdgeColor','none');
stairs(signals,'r','LineWidth',1.5); ylim([-1.5 1.5]);

Código R

ThresholdingAlgo <- function(y,lag,threshold,influence) {
  signals <- rep(0,length(y))
  filteredY <- y[0:lag]
  avgFilter <- NULL
  stdFilter <- NULL
  avgFilter[lag] <- mean(y[0:lag], na.rm=TRUE)
  stdFilter[lag] <- sd(y[0:lag], na.rm=TRUE)
  for (i in (lag+1):length(y)){
    if (abs(y[i]-avgFilter[i-1]) > threshold*stdFilter[i-1]) {
      if (y[i] > avgFilter[i-1]) {
        signals[i] <- 1;
      } else {
        signals[i] <- -1;
      filteredY[i] <- influence*y[i]+(1-influence)*filteredY[i-1]
    } else {
      signals[i] <- 0
      filteredY[i] <- y[i]
    avgFilter[i] <- mean(filteredY[(i-lag):i], na.rm=TRUE)
    stdFilter[i] <- sd(filteredY[(i-lag):i], na.rm=TRUE)


# Data
y <- c(1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,

lag       <- 30
threshold <- 5
influence <- 0

# Run algo with lag = 30, threshold = 5, influence = 0
result <- ThresholdingAlgo(y,lag,threshold,influence)

# Plot result
par(mfrow = c(2,1),oma = c(2,2,0,0) + 0.1,mar = c(0,0,2,1) + 0.2)

Este código (ambos os idiomas) produzirá o seguinte resultado para os dados da pergunta original:

Exemplo de limiar do código do Matlab

Apêndice 2 da resposta original: Matlab código de demonstração

(clique para criar dados)

Demonstração do Matlab

function [] = RobustThresholdingDemo()

lag         = 5;       % lag for the smoothing
threshold   = 3.5;     % number of away from the mean to signal
influence   = 0.3;     % when signal: how much influence for new data? (between 0 and 1)
                       % 1 is normal influence, 0.5 is half      


function [signals,avgFilter,stdFilter] = ThresholdingAlgo(y,lag,threshold,influence)
signals = zeros(length(y),1);
filteredY = y(1:lag+1);
avgFilter(lag+1,1) = mean(y(1:lag+1));
stdFilter(lag+1,1) = std(y(1:lag+1));
for i=lag+2:length(y)
    if abs(y(i)-avgFilter(i-1)) > threshold*stdFilter(i-1)
        if y(i) > avgFilter(i-1)
            signals(i) = 1;
            signals(i) = -1;
        filteredY(i) = influence*y(i)+(1-influence)*filteredY(i-1);
        signals(i) = 0;
        filteredY(i) = y(i);
    avgFilter(i) = mean(filteredY(i-lag:i));
    stdFilter(i) = std(filteredY(i-lag:i));

% Demo screen function
function [] = DemoScreen(n,lag,threshold,influence)
figure('Position',[200 100,1000,500]);
title(sprintf(['Draw data points (%.0f max)      [settings: lag = %.0f, '...
    'threshold = %.2f, influence = %.2f]'],n,lag,threshold,influence));
ylim([0 5]); xlim([0 50]);
H = gca; subplot(2,1,1);
set(H, 'YLimMode', 'manual'); set(H, 'XLimMode', 'manual');
set(H, 'YLim', get(H,'YLim')); set(H, 'XLim', get(H,'XLim'));
xg = []; yg = [];
for i=1:n
        [xi,yi] = ginput(1);
    xg = [xg xi]; yg = [yg yi];
    if i == 1
        subplot(2,1,1); hold on;
        plot(H, xg(i),yg(i),'r.'); 
    if length(xg) > lag
        [signals,avg,dev] = ...
            'FaceColor',[0.9 0.9 0.9],'EdgeColor','none');
            'FaceColor',[1 1 1],'EdgeColor','none');
        subplot(2,1,2); hold on; title('Signal output');
        ylim([-2 2]); xlim([0 50]); hold off;
    subplot(2,1,1); hold on;
    for j=2:i
        plot(xg([j-1:j]),yg([j-1:j]),'r'); plot(H,xg(j),yg(j),'r.');

Aqui está minha tentativa de criar uma solução Ruby para o "Smoothed z-score algo" a partir da resposta aceita:

module ThresholdingAlgoMixin
  def mean(array)
    array.reduce(&:+) / array.size.to_f

  def stddev(array)
    array_mean = mean(array)
    Math.sqrt(array.reduce(0.0) { |a, b| a.to_f + ((b.to_f - array_mean) ** 2) } / array.size.to_f)

  def thresholding_algo(lag: 5, threshold: 3.5, influence: 0.5)
    return nil if size < lag * 2, 0).tap do |signals|
      filtered =

      initial_slice = take(lag)
      avg_filter = - 1, 0.0) + [mean(initial_slice)]
      std_filter = - 1, 0.0) + [stddev(initial_slice)]
      (lag..size-1).each do |idx|
        prev = idx - 1
        if (fetch(idx) - avg_filter[prev]).abs > threshold * std_filter[prev]
          signals[idx] = fetch(idx) > avg_filter[prev] ? 1 : -1
          filtered[idx] = (influence * fetch(idx)) + ((1-influence) * filtered[prev])

        filtered_slice = filtered[idx-lag..prev]
        avg_filter[idx] = mean(filtered_slice)
        std_filter[idx] = stddev(filtered_slice)

E exemplo de uso:

test_data = [
  1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1,
  1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1.1, 1,
  1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5,
  1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3, 2.6, 4, 3, 3.2, 2,
  1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1

puts test_data.thresholding_algo.inspect

# Output: [
#   0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
#   0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0,
#   0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1,
#   1, 1, 0, 0, 0, -1, -1, 0, 0, 0, 0, 0, 0, 0, 0
# ]
Uma versão iterativa em python / numpy para a resposta está aqui. Esse código é mais rápido que o cálculo da média e do desvio padrão a cada atraso para grandes dados (100000+).

def peak_detection_smoothed_zscore_v2(x, lag, threshold, influence):
    iterative smoothed z-score algorithm
    Implementation of algorithm from
    import numpy as np
    labels = np.zeros(len(x))
    filtered_y = np.array(x)
    avg_filter = np.zeros(len(x))
    std_filter = np.zeros(len(x))
    var_filter = np.zeros(len(x))

    avg_filter[lag - 1] = np.mean(x[0:lag])
    std_filter[lag - 1] = np.std(x[0:lag])
    var_filter[lag - 1] = np.var(x[0:lag])
    for i in range(lag, len(x)):
        if abs(x[i] - avg_filter[i - 1]) > threshold * std_filter[i - 1]:
            if x[i] > avg_filter[i - 1]:
                labels[i] = 1
                labels[i] = -1
            filtered_y[i] = influence * x[i] + (1 - influence) * filtered_y[i - 1]
            labels[i] = 0
            filtered_y[i] = x[i]
        # update avg, var, std
        avg_filter[i] = avg_filter[i - 1] + 1. / lag * (filtered_y[i] - filtered_y[i - lag])
        var_filter[i] = var_filter[i - 1] + 1. / lag * ((filtered_y[i] - avg_filter[i - 1]) ** 2 - (
            filtered_y[i - lag] - avg_filter[i - 1]) ** 2 - (filtered_y[i] - filtered_y[i - lag]) ** 2 / lag)
        std_filter[i] = np.sqrt(var_filter[i])

    return dict(signals=labels,
Tranfer Will

Pensei em fornecer minha implementação Julia do algoritmo para outros. A essência pode ser encontrada aqui

using Statistics
using Plots
function SmoothedZscoreAlgo(y, lag, threshold, influence)
    # Julia implimentation of
    n = length(y)
    signals = zeros(n) # init signal results
    filteredY = copy(y) # init filtered series
    avgFilter = zeros(n) # init average filter
    stdFilter = zeros(n) # init std filter
    avgFilter[lag - 1] = mean(y[1:lag]) # init first value
    stdFilter[lag - 1] = std(y[1:lag]) # init first value

    for i in range(lag, stop=n-1)
        if abs(y[i] - avgFilter[i-1]) > threshold*stdFilter[i-1]
            if y[i] > avgFilter[i-1]
                signals[i] += 1 # postive signal
                signals[i] += -1 # negative signal
            # Make influence lower
            filteredY[i] = influence*y[i] + (1-influence)*filteredY[i-1]
            signals[i] = 0
            filteredY[i] = y[i]
        avgFilter[i] = mean(filteredY[i-lag+1:i])
        stdFilter[i] = std(filteredY[i-lag+1:i])
    return (signals = signals, avgFilter = avgFilter, stdFilter = stdFilter)

# Data
y = [1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,

# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0

results = SmoothedZscoreAlgo(y, lag, threshold, influence)
upper_bound = results[:avgFilter] + threshold * results[:stdFilter]
lower_bound = results[:avgFilter] - threshold * results[:stdFilter]
x = 1:length(y)

yplot = plot(x,y,color="blue", label="Y",legend=:topleft)
yplot = plot!(x,upper_bound, color="green", label="Upper Bound",legend=:topleft)
yplot = plot!(x,results[:avgFilter], color="cyan", label="Average Filter",legend=:topleft)
yplot = plot!(x,lower_bound, color="green", label="Lower Bound",legend=:topleft)
signalplot = plot(x,results[:signals],color="red",label="Signals",legend=:topleft)


Aqui está uma implementação Groovy (Java) do algoritmo z-score suavizado ( veja a resposta acima ).

 * "Smoothed zero-score alogrithm" shamelessly copied from
 *  Uses a rolling mean and a rolling deviation (separate) to identify peaks in a vector
 * @param y - The input vector to analyze
 * @param lag - The lag of the moving window (i.e. how big the window is)
 * @param threshold - The z-score at which the algorithm signals (i.e. how many standard deviations away from the moving mean a peak (or signal) is)
 * @param influence - The influence (between 0 and 1) of new signals on the mean and standard deviation (how much a peak (or signal) should affect other values near it)
 * @return - The calculated averages (avgFilter) and deviations (stdFilter), and the signals (signals)

public HashMap<String, List<Object>> thresholdingAlgo(List<Double> y, Long lag, Double threshold, Double influence) {
    //init stats instance
    SummaryStatistics stats = new SummaryStatistics()

    //the results (peaks, 1 or -1) of our algorithm
    List<Integer> signals = new ArrayList<Integer>(Collections.nCopies(y.size(), 0))
    //filter out the signals (peaks) from our original list (using influence arg)
    List<Double> filteredY = new ArrayList<Double>(y)
    //the current average of the rolling window
    List<Double> avgFilter = new ArrayList<Double>(Collections.nCopies(y.size(), 0.0d))
    //the current standard deviation of the rolling window
    List<Double> stdFilter = new ArrayList<Double>(Collections.nCopies(y.size(), 0.0d))
    //init avgFilter and stdFilter
    (0..lag-1).each { stats.addValue(y[it as int]) }
    avgFilter[lag - 1 as int] = stats.getMean()
    stdFilter[lag - 1 as int] = Math.sqrt(stats.getPopulationVariance()) //getStandardDeviation() uses sample variance (not what we want)
    //loop input starting at end of rolling window
    (lag..y.size()-1).each { i ->
        //if the distance between the current value and average is enough standard deviations (threshold) away
        if (Math.abs((y[i as int] - avgFilter[i - 1 as int]) as Double) > threshold * stdFilter[i - 1 as int]) {
            //this is a signal (i.e. peak), determine if it is a positive or negative signal
            signals[i as int] = (y[i as int] > avgFilter[i - 1 as int]) ? 1 : -1
            //filter this signal out using influence
            filteredY[i as int] = (influence * y[i as int]) + ((1-influence) * filteredY[i - 1 as int])
        } else {
            //ensure this signal remains a zero
            signals[i as int] = 0
            //ensure this value is not filtered
            filteredY[i as int] = y[i as int]
        //update rolling average and deviation
        (i - lag..i-1).each { stats.addValue(filteredY[it as int] as Double) }
        avgFilter[i as int] = stats.getMean()
        stdFilter[i as int] = Math.sqrt(stats.getPopulationVariance()) //getStandardDeviation() uses sample variance (not what we want)

    return [
        signals  : signals,
        avgFilter: avgFilter,
        stdFilter: stdFilter

Abaixo está um teste no mesmo conjunto de dados que produz os mesmos resultados da implementação Python / numpy acima .

    // Data
    def y = [1d, 1d, 1.1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 1d,
         1d, 1d, 1.1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 1.1d, 1d, 0.8d, 0.9d, 1d, 1.2d, 0.9d, 1d,
         1d, 1.1d, 1.2d, 1d, 1.5d, 1d, 3d, 2d, 5d, 3d, 2d, 1d, 1d, 1d, 0.9d, 1d,
         1d, 3d, 2.6d, 4d, 3d, 3.2d, 2d, 1d, 1d, 0.8d, 4d, 4d, 2d, 2.5d, 1d, 1d, 1d]

    // Settings
    def lag = 30
    def threshold = 5
    def influence = 0

    def thresholdingResults = thresholdingAlgo((List<Double>) y, (Long) lag, (Double) threshold, (Double) influence)

    println y.size()
    println thresholdingResults.signals.size()
    println thresholdingResults.signals

    thresholdingResults.signals.eachWithIndex { x, idx ->
        if (x) {
            println y[idx]

Aqui está uma versão Scala (não-idiomática) do algoritmo de z-score suavizado :

  * Smoothed zero-score alogrithm shamelessly copied from
  * Uses a rolling mean and a rolling deviation (separate) to identify peaks in a vector
  * @param y - The input vector to analyze
  * @param lag - The lag of the moving window (i.e. how big the window is)
  * @param threshold - The z-score at which the algorithm signals (i.e. how many standard deviations away from the moving mean a peak (or signal) is)
  * @param influence - The influence (between 0 and 1) of new signals on the mean and standard deviation (how much a peak (or signal) should affect other values near it)
  * @return - The calculated averages (avgFilter) and deviations (stdFilter), and the signals (signals)
private def smoothedZScore(y: Seq[Double], lag: Int, threshold: Double, influence: Double): Seq[Int] = {
  val stats = new SummaryStatistics()

  // the results (peaks, 1 or -1) of our algorithm
  val signals = mutable.ArrayBuffer.fill(y.length)(0)

  // filter out the signals (peaks) from our original list (using influence arg)
  val filteredY =[mutable.ArrayBuffer]

  // the current average of the rolling window
  val avgFilter = mutable.ArrayBuffer.fill(y.length)(0d)

  // the current standard deviation of the rolling window
  val stdFilter = mutable.ArrayBuffer.fill(y.length)(0d)

  // init avgFilter and stdFilter
  y.take(lag).foreach(s => stats.addValue(s))

  avgFilter(lag - 1) = stats.getMean
  stdFilter(lag - 1) = Math.sqrt(stats.getPopulationVariance) // getStandardDeviation() uses sample variance (not what we want)

  // loop input starting at end of rolling window
  y.zipWithIndex.slice(lag, y.length - 1).foreach {
    case (s: Double, i: Int) =>
      // if the distance between the current value and average is enough standard deviations (threshold) away
      if (Math.abs(s - avgFilter(i - 1)) > threshold * stdFilter(i - 1)) {
        // this is a signal (i.e. peak), determine if it is a positive or negative signal
        signals(i) = if (s > avgFilter(i - 1)) 1 else -1
        // filter this signal out using influence
        filteredY(i) = (influence * s) + ((1 - influence) * filteredY(i - 1))
      } else {
        // ensure this signal remains a zero
        signals(i) = 0
        // ensure this value is not filtered
        filteredY(i) = s

      // update rolling average and deviation
      filteredY.slice(i - lag, i).foreach(s => stats.addValue(s))
      avgFilter(i) = stats.getMean
      stdFilter(i) = Math.sqrt(stats.getPopulationVariance) // getStandardDeviation() uses sample variance (not what we want)


  signals.zipWithIndex.foreach {
    case(x: Int, idx: Int) =>
      if (x == 1) {
        println(idx + " " + y(idx))

  val data = { case (s: Double, i: Int) => Map("x" -> i, "y" -> s, "name" -> "y", "row" -> "data") } ++ { case (s: Double, i: Int) => Map("x" -> i, "y" -> s, "name" -> "avgFilter", "row" -> "data") } ++ { case (s: Double, i: Int) => Map("x" -> i, "y" -> (s - threshold * stdFilter(i)), "name" -> "lower", "row" -> "data") } ++ { case (s: Double, i: Int) => Map("x" -> i, "y" -> (s + threshold * stdFilter(i)), "name" -> "upper", "row" -> "data") } ++ { case (s: Int, i: Int) => Map("x" -> i, "y" -> s, "name" -> "signal", "row" -> "signal") }

  Vegas("Smoothed Z")
    .encodeX("x", Quant)
    .encodeY("y", Quant)
    .encodeRow("row", Ordinal)

  return signals

Aqui está um teste que retorna os mesmos resultados que as versões Python e Groovy:

val y = List(1d, 1d, 1.1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 1d,
  1d, 1d, 1.1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 1.1d, 1d, 0.8d, 0.9d, 1d, 1.2d, 0.9d, 1d,
  1d, 1.1d, 1.2d, 1d, 1.5d, 1d, 3d, 2d, 5d, 3d, 2d, 1d, 1d, 1d, 0.9d, 1d,
  1d, 3d, 2.6d, 4d, 3d, 3.2d, 2d, 1d, 1d, 0.8d, 4d, 4d, 2d, 2.5d, 1d, 1d, 1d)

val lag = 30
val threshold = 5d
val influence = 0d

smoothedZScore(y, lag, threshold, influence)

vegas gráfico de resultado

Gist aqui

Eu precisava de algo assim no meu projeto Android. Pensei que eu poderia devolver a implementação do Kotlin .

* Smoothed zero-score alogrithm shamelessly copied from
* Uses a rolling mean and a rolling deviation (separate) to identify peaks in a vector
* @param y - The input vector to analyze
* @param lag - The lag of the moving window (i.e. how big the window is)
* @param threshold - The z-score at which the algorithm signals (i.e. how many standard deviations away from the moving mean a peak (or signal) is)
* @param influence - The influence (between 0 and 1) of new signals on the mean and standard deviation (how much a peak (or signal) should affect other values near it)
* @return - The calculated averages (avgFilter) and deviations (stdFilter), and the signals (signals)
fun smoothedZScore(y: List<Double>, lag: Int, threshold: Double, influence: Double): Triple<List<Int>, List<Double>, List<Double>> {
    val stats = SummaryStatistics()
    // the results (peaks, 1 or -1) of our algorithm
    val signals = MutableList<Int>(y.size, { 0 })
    // filter out the signals (peaks) from our original list (using influence arg)
    val filteredY = ArrayList<Double>(y)
    // the current average of the rolling window
    val avgFilter = MutableList<Double>(y.size, { 0.0 })
    // the current standard deviation of the rolling window
    val stdFilter = MutableList<Double>(y.size, { 0.0 })
    // init avgFilter and stdFilter
    y.take(lag).forEach { s -> stats.addValue(s) }
    avgFilter[lag - 1] = stats.mean
    stdFilter[lag - 1] = Math.sqrt(stats.populationVariance) // getStandardDeviation() uses sample variance (not what we want)
    //loop input starting at end of rolling window
    (lag..y.size - 1).forEach { i ->
        //if the distance between the current value and average is enough standard deviations (threshold) away
        if (Math.abs(y[i] - avgFilter[i - 1]) > threshold * stdFilter[i - 1]) {
            //this is a signal (i.e. peak), determine if it is a positive or negative signal
            signals[i] = if (y[i] > avgFilter[i - 1]) 1 else -1
            //filter this signal out using influence
            filteredY[i] = (influence * y[i]) + ((1 - influence) * filteredY[i - 1])
        } else {
            //ensure this signal remains a zero
            signals[i] = 0
            //ensure this value is not filtered
            filteredY[i] = y[i]
        //update rolling average and deviation
        (i - lag..i - 1).forEach { stats.addValue(filteredY[it]) }
        avgFilter[i] = stats.getMean()
        stdFilter[i] = Math.sqrt(stats.getPopulationVariance()) //getStandardDeviation() uses sample variance (not what we want)
    return Triple(signals, avgFilter, stdFilter)

insira a descrição da imagem aqui

Aqui está uma versão Fortran alterada do algoritmo z-score . Ele é alterado especificamente para a detecção de pico (ressonância) nas funções de transferência no espaço de frequência (cada alteração possui um pequeno comentário no código).

A primeira modificação avisa o usuário se houver uma ressonância próxima ao limite inferior do vetor de entrada, indicada por um desvio padrão superior a um determinado limite (10% neste caso). Isso significa simplesmente que o sinal não é plano o suficiente para a detecção inicializar os filtros corretamente.

A segunda modificação é que apenas o valor mais alto de um pico é adicionado aos picos encontrados. Isso é alcançado comparando cada valor de pico encontrado com a magnitude de seus antecessores (lag) e seus sucessores (lag).

A terceira mudança é respeitar que os picos de ressonância geralmente mostram alguma forma de simetria em torno da frequência de ressonância. Portanto, é natural calcular a média e o padrão simetricamente em torno do ponto de dados atual (em vez de apenas para os predecessores). Isso resulta em um melhor comportamento de detecção de pico.

As modificações têm o efeito de que todo o sinal deve ser conhecido antecipadamente pela função, que é o caso usual para a detecção de ressonância (algo como o Exemplo Matlab de Jean-Paul, onde os pontos de dados são gerados em tempo real não funciona).

function PeakDetect(y,lag,threshold, influence)
    implicit none
    ! Declaring part
    real, dimension(:), intent(in) :: y
    integer, dimension(size(y)) :: PeakDetect
    real, dimension(size(y)) :: filteredY, avgFilter, stdFilter
    integer :: lag, ii
    real :: threshold, influence

    ! Executing part
    PeakDetect = 0
    filteredY = 0.0
    filteredY(1:lag+1) = y(1:lag+1)
    avgFilter = 0.0
    avgFilter(lag+1) = mean(y(1:2*lag+1))
    stdFilter = 0.0
    stdFilter(lag+1) = std(y(1:2*lag+1))

    if (stdFilter(lag+1)/avgFilter(lag+1)>0.1) then ! If the coefficient of variation exceeds 10%, the signal is too uneven at the start, possibly because of a peak.
1001        format(1X,'Warning: Peak detection might have failed, as there may be a peak at the edge of the frequency range.',/)
    end if
    do ii = lag+2, size(y)
        if (abs(y(ii) - avgFilter(ii-1)) > threshold * stdFilter(ii-1)) then
            ! Find only the largest outstanding value which is only the one greater than its predecessor and its successor
            if (y(ii) > avgFilter(ii-1) .AND. y(ii) > y(ii-1) .AND. y(ii) > y(ii+1)) then
                PeakDetect(ii) = 1
            end if
            filteredY(ii) = influence * y(ii) + (1 - influence) * filteredY(ii-1)
            filteredY(ii) = y(ii)
        end if
        ! Modified with respect to the original code. Mean and standard deviation are calculted symmetrically around the current point
        avgFilter(ii) = mean(filteredY(ii-lag:ii+lag))
        stdFilter(ii) = std(filteredY(ii-lag:ii+lag))
    end do
end function PeakDetect

real function mean(y)
    !> @brief Calculates the mean of vector y
    implicit none
    ! Declaring part
    real, dimension(:), intent(in) :: y
    integer :: N
    ! Executing part
    N = max(1,size(y))
    mean = sum(y)/N
end function mean

real function std(y)
    !> @brief Calculates the standard deviation of vector y
    implicit none
    ! Declaring part
    real, dimension(:), intent(in) :: y
    integer :: N
    ! Executing part
    N = max(1,size(y))
    std = sqrt((N*dot_product(y,y) - sum(y)**2) / (N*(N-1)))
end function std

Para minha aplicação, o algoritmo funciona como um encanto! insira a descrição da imagem aqui


Se você obteve seus dados em uma tabela de banco de dados, aqui está uma versão SQL de um algoritmo simples de z-score:

with data_with_zscore as (
        value / (avg(value) over ()) as pct_of_mean,
        (value - avg(value) over ()) / (stdev(value) over ()) as z_score
    from {{tablename}}  where datetime > '2018-11-26' and datetime < '2018-12-03'

-- select all
select * from data_with_zscore 

-- select only points greater than a certain threshold
select * from data_with_zscore where z_score > abs(2)
Ocean Airdrop
Seu código faz algo diferente do algoritmo que propus. Sua consulta simplesmente calcula z-scores ([data point - mean] / std), mas não incorpora a lógica do meu algoritmo que ignora sinais passados ​​ao calcular novos limites de sinal. Você também ignora os três parâmetros (atraso, influência, limite). Você poderia revisar sua resposta para incorporar a lógica real?
31418 Jean-Paul
Sim, você está certo. No começo, pensei que poderia me safar da versão simplificada acima. Desde então, peguei sua solução completa e a transportai para C #. Veja minha resposta abaixo. Quando tiver mais tempo, voltarei a visitar esta versão do SQL e incorporarei seu algoritmo. A propósito, obrigado por uma ótima resposta e explicação visual.
Ocean Airdrop
Não há problema e feliz que o algoritmo possa ajudá-lo! Obrigado pelo envio do C #, que ainda estava faltando. Vou adicioná-lo à lista de traduções!

Versão em Python que funciona com fluxos em tempo real (não recalcula todos os pontos de dados na chegada de cada novo ponto de dados). Você pode ajustar o que a função de classe retorna - para os meus propósitos, eu apenas precisava dos sinais.

import numpy as np

class real_time_peak_detection():
    def __init__(self, array, lag, threshold, influence):
        self.y = list(array)
        self.length = len(self.y)
        self.lag = lag
        self.threshold = threshold
        self.influence = influence
        self.signals = [0] * len(self.y)
        self.filteredY = np.array(self.y).tolist()
        self.avgFilter = [0] * len(self.y)
        self.stdFilter = [0] * len(self.y)
        self.avgFilter[self.lag - 1] = np.mean(self.y[0:self.lag]).tolist()
        self.stdFilter[self.lag - 1] = np.std(self.y[0:self.lag]).tolist()

    def thresholding_algo(self, new_value):
        i = len(self.y) - 1
        self.length = len(self.y)
        if i < self.lag:
            return 0
        elif i == self.lag:
            self.signals = [0] * len(self.y)
            self.filteredY = np.array(self.y).tolist()
            self.avgFilter = [0] * len(self.y)
            self.stdFilter = [0] * len(self.y)
            self.avgFilter[self.lag - 1] = np.mean(self.y[0:self.lag]).tolist()
            self.stdFilter[self.lag - 1] = np.std(self.y[0:self.lag]).tolist()
            return 0

        self.signals += [0]
        self.filteredY += [0]
        self.avgFilter += [0]
        self.stdFilter += [0]

        if abs(self.y[i] - self.avgFilter[i - 1]) > self.threshold * self.stdFilter[i - 1]:
            if self.y[i] > self.avgFilter[i - 1]:
                self.signals[i] = 1
                self.signals[i] = -1

            self.filteredY[i] = self.influence * self.y[i] + (1 - self.influence) * self.filteredY[i - 1]
            self.avgFilter[i] = np.mean(self.filteredY[(i - self.lag):i])
            self.stdFilter[i] = np.std(self.filteredY[(i - self.lag):i])
            self.signals[i] = 0
            self.filteredY[i] = self.y[i]
            self.avgFilter[i] = np.mean(self.filteredY[(i - self.lag):i])
            self.stdFilter[i] = np.std(self.filteredY[(i - self.lag):i])

        return self.signals[i]
Eu me permiti criar uma versão javascript dele. Pode ser útil. O javascript deve ser uma transcrição direta do Pseudocódigo fornecido acima. Disponível como pacote npm e repositório github:

Tradução Javascript:

// javascript port of: /programming/22583391/peak-signal-detection-in-realtime-timeseries-data/48895639#48895639

function sum(a) {
    return a.reduce((acc, val) => acc + val)

function mean(a) {
    return sum(a) / a.length

function stddev(arr) {
    const arr_mean = mean(arr)
    const r = function(acc, val) {
        return acc + ((val - arr_mean) * (val - arr_mean))
    return Math.sqrt(arr.reduce(r, 0.0) / arr.length)

function smoothed_z_score(y, params) {
    var p = params || {}
    // init cooefficients
    const lag = p.lag || 5
    const threshold = p.threshold || 3.5
    const influence = p.influece || 0.5

    if (y === undefined || y.length < lag + 2) {
        throw ` ## y data array to short(${y.length}) for given lag of ${lag}`
    //console.log(`lag, threshold, influence: ${lag}, ${threshold}, ${influence}`)

    // init variables
    var signals = Array(y.length).fill(0)
    var filteredY = y.slice(0)
    const lead_in = y.slice(0, lag)
    //console.log("1: " + lead_in.toString())

    var avgFilter = []
    avgFilter[lag - 1] = mean(lead_in)
    var stdFilter = []
    stdFilter[lag - 1] = stddev(lead_in)
    //console.log("2: " + stdFilter.toString())

    for (var i = lag; i < y.length; i++) {
        //console.log(`${y[i]}, ${avgFilter[i-1]}, ${threshold}, ${stdFilter[i-1]}`)
        if (Math.abs(y[i] - avgFilter[i - 1]) > (threshold * stdFilter[i - 1])) {
            if (y[i] > avgFilter[i - 1]) {
                signals[i] = +1 // positive signal
            } else {
                signals[i] = -1 // negative signal
            // make influence lower
            filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i - 1]
        } else {
            signals[i] = 0 // no signal
            filteredY[i] = y[i]

        // adjust the filters
        const y_lag = filteredY.slice(i - lag, i)
        avgFilter[i] = mean(y_lag)
        stdFilter[i] = stddev(y_lag)

    return signals

module.exports = smoothed_z_score
Se o valor limite ou outros critérios dependem de valores futuros, a única solução (sem uma máquina do tempo ou outro conhecimento de valores futuros) é adiar qualquer decisão até que se tenha valores futuros suficientes. Se você deseja um nível acima de uma média que mede, por exemplo, 20 pontos, terá que esperar até que tenha pelo menos 19 pontos à frente de qualquer decisão de pico, ou então o próximo novo ponto poderá reduzir completamente seu limiar há 19 pontos .

Seu gráfico atual não tem picos ... a menos que você saiba de antemão que o próximo ponto não é 1e99, que depois de redimensionar a dimensão Y do gráfico, ficaria nivelado até esse ponto.

Como eu disse antes, podemos assumir que, se ocorrer um pico, ele é tão grande quanto os picos da imagem e se desvia significativamente dos valores 'normais'.
Se você sabe quão grandes os picos serão antecipadamente, pré-defina sua média e / ou limite para um valor abaixo desse valor.
E é exatamente isso que não sei de antemão.
31414 Jean-Paul
Você acabou de se contradizer e escreveu que os picos são conhecidos pelo tamanho da imagem. Ou você sabe disso ou não.
Estou tentando explicar para você. Você entendeu a idéia agora? 'Como identificar picos significativamente grandes'. Você pode abordar o problema estatisticamente ou com um algoritmo inteligente. Com .. As large as in the pictureeu quis dizer: para situações semelhantes, onde há picos significativos e ruído básico.

E aqui vem a implementação PHP do algo ZSCORE:

$y = array(1,7,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,

function mean($data, $start, $len) {
    $avg = 0;
    for ($i = $start; $i < $start+ $len; $i ++)
        $avg += $data[$i];
    return $avg / $len;

function stddev($data, $start,$len) {
    $mean = mean($data,$start,$len);
    $dev = 0;
    for ($i = $start; $i < $start+$len; $i++) 
        $dev += (($data[$i] - $mean) * ($data[$i] - $mean));
    return sqrt($dev / $len);

function zscore($data, $len, $lag= 20, $threshold = 1, $influence = 1) {

    $signals = array();
    $avgFilter = array();
    $stdFilter = array();
    $filteredY = array();
    $avgFilter[$lag - 1] = mean($data, 0, $lag);
    $stdFilter[$lag - 1] = stddev($data, 0, $lag);

    for ($i = 0; $i < $len; $i++) {
        $filteredY[$i] = $data[$i];
        $signals[$i] = 0;

    for ($i=$lag; $i < $len; $i++) {
        if (abs($data[$i] - $avgFilter[$i-1]) > $threshold * $stdFilter[$lag - 1]) {
            if ($data[$i] > $avgFilter[$i-1]) {
                $signals[$i] = 1;
            else {
                $signals[$i] = -1;
            $filteredY[$i] = $influence * $data[$i] + (1 - $influence) * $filteredY[$i-1];
        else {
            $signals[$i] = 0;
            $filteredY[$i] = $data[$i];

        $avgFilter[$i] = mean($filteredY, $i - $lag, $lag);
        $stdFilter[$i] = stddev($filteredY, $i - $lag, $lag);
    return $signals;

$sig = zscore($y, count($y));

print_r($y); echo "<br><br>";
print_r($sig); echo "<br><br>";

for ($i = 0; $i < count($y); $i++) echo $i. " " . $y[$i]. " ". $sig[$i]."<br>";

Em vez de comparar um máximo com a média, também é possível comparar o máximo com o mínimo adjacente, onde os mínimos são definidos apenas acima de um limite de ruído. Se o máximo local for> 3 vezes (ou outro fator de confiança) ou mínimos adjacentes, esse máximo será um pico. A determinação do pico é mais precisa com janelas móveis mais amplas. A descrição acima usa um cálculo centralizado no meio da janela, a propósito, em vez de um cálculo no final da janela (== lag).

Observe que um máximo deve ser visto como um aumento no sinal antes e uma diminuição depois.


A função scipy.signal.find_peaks, como o próprio nome sugere, é útil para isso. Mas é importante entender bem seus parâmetros widthe threshold, distance acima de tudo,prominence obter uma boa extração de pico.

De acordo com meus testes e a documentação, o conceito de destaque é "o conceito útil" para manter os bons picos e descartar os barulhentos.

O que é destaque (topográfico) ? É "a altura mínima necessária para descer do topo até qualquer terreno mais alto" , como pode ser visto aqui:

A ideia é:

Quanto maior a proeminência, mais "importante" é o pico.


Versão orientada a objeto do algoritmo z-score usando mordern C +++

template<typename T>
class FindPeaks{
    std::vector<T> m_input_signal;                      // stores input vector
    std::vector<T> m_array_peak_positive;               
    std::vector<T> m_array_peak_negative;               

    FindPeaks(const std::vector<T>& t_input_signal): m_input_signal{t_input_signal}{ }

    void estimate(){
        int lag{5};
        T threshold{ 5 };                                                                                       // set a threshold
        T influence{ 0.5 };                                                                                    // value between 0 to 1, 1 is normal influence and 0.5 is half the influence

        std::vector<T> filtered_signal(m_input_signal.size(), 0.0);                                             // placeholdered for smooth signal, initialie with all zeros
        std::vector<int> signal(m_input_signal.size(), 0);                                                          // vector that stores where the negative and positive located
        std::vector<T> avg_filtered(m_input_signal.size(), 0.0);                                                // moving averages
        std::vector<T> std_filtered(m_input_signal.size(), 0.0);                                                // moving standard deviation

        avg_filtered[lag] = findMean(m_input_signal.begin(), m_input_signal.begin() + lag);                         // pass the iteartor to vector
        std_filtered[lag] = findStandardDeviation(m_input_signal.begin(), m_input_signal.begin() + lag);

        for (size_t iLag = lag + 1; iLag < m_input_signal.size(); ++iLag) {                                         // start index frm 
            if (std::abs(m_input_signal[iLag] - avg_filtered[iLag - 1]) > threshold * std_filtered[iLag - 1]) {     // check if value is above threhold             
                if ((m_input_signal[iLag]) > avg_filtered[iLag - 1]) {
                    signal[iLag] = 1;                                                                               // assign positive signal
                else {
                    signal[iLag] = -1;                                                                                  // assign negative signal
                filtered_signal[iLag] = influence * m_input_signal[iLag] + (1 - influence) * filtered_signal[iLag - 1];        // exponential smoothing
            else {
                signal[iLag] = 0;                                                                                         // no signal
                filtered_signal[iLag] = m_input_signal[iLag];

            avg_filtered[iLag] = findMean(filtered_signal.begin() + (iLag - lag), filtered_signal.begin() + iLag);
            std_filtered[iLag] = findStandardDeviation(filtered_signal.begin() + (iLag - lag), filtered_signal.begin() + iLag);


        for (size_t iSignal = 0; iSignal < m_input_signal.size(); ++iSignal) {
            if (signal[iSignal] == 1) {
                m_array_peak_positive.emplace_back(m_input_signal[iSignal]);                                        // store the positive peaks
            else if (signal[iSignal] == -1) {
                m_array_peak_negative.emplace_back(m_input_signal[iSignal]);                                         // store the negative peaks
        printVoltagePeaks(signal, m_input_signal);


    std::pair< std::vector<T>, std::vector<T> > get_peaks()
        return std::make_pair(m_array_peak_negative, m_array_peak_negative);


template<typename T1, typename T2 >
void printVoltagePeaks(std::vector<T1>& m_signal, std::vector<T2>& m_input_signal) {
    std::ofstream output_file("./voltage_peak.csv");
    std::ostream_iterator<T2> output_iterator_voltage(output_file, ",");
    std::ostream_iterator<T1> output_iterator_signal(output_file, ",");
    std::copy(m_input_signal.begin(), m_input_signal.end(), output_iterator_voltage);
    output_file << "\n";
    std::copy(m_signal.begin(), m_signal.end(), output_iterator_signal);

template<typename iterator_type>
typename std::iterator_traits<iterator_type>::value_type findMean(iterator_type it, iterator_type end)
    /* function that receives iterator to*/
    typename std::iterator_traits<iterator_type>::value_type sum{ 0.0 };
    int counter = 0;
    while (it != end) {
        sum += *(it++);
    return sum / counter;

template<typename iterator_type>
typename std::iterator_traits<iterator_type>::value_type findStandardDeviation(iterator_type it, iterator_type end)
    auto mean = findMean(it, end);
    typename std::iterator_traits<iterator_type>::value_type sum_squared_error{ 0.0 };
    int counter{ 0 };
    while (it != end) {
        sum_squared_error += std::pow((*(it++) - mean), 2);
    auto standard_deviation = std::sqrt(sum_squared_error / (counter - 1));
    return standard_deviation;
Boa tradução. Seria um pouco melhor se o objeto também salva os filtered_signal, signal, avg_filterede std_filteredvariáveis como privadas e só atualiza essas matrizes de uma vez quando um novo datapoint chega (agora os loops de código sobre todos os pontos de dados cada vez que ele é chamado). Isso melhoraria o desempenho do seu código e se adequaria à estrutura OOP ainda melhor.