Python threading.timer - função de repetição a cada 'n' segundos

94

Quero disparar uma função a cada 0,5 segundos e ser capaz de iniciar e parar e zerar o cronômetro. Não tenho muito conhecimento de como os threads do Python funcionam e estou tendo dificuldades com o cronômetro do Python.

No entanto, continuo recebendo RuntimeError: threads can only be started oncequando executo threading.timer.start()duas vezes. Existe uma solução alternativa para isso? Tentei me inscrever threading.timer.cancel()antes de cada início.

Pseudo-código:

t=threading.timer(0.5,function)
while True:
    t.cancel()
    t.start()
user1431282
fonte

Respostas:

112

A melhor maneira é iniciar o encadeamento do cronômetro uma vez. Dentro do thread do cronômetro, você codificaria o seguinte

class MyThread(Thread):
    def __init__(self, event):
        Thread.__init__(self)
        self.stopped = event

    def run(self):
        while not self.stopped.wait(0.5):
            print("my thread")
            # call a function

No código que iniciou o cronômetro, você pode então seto evento interrompido para interromper o cronômetro.

stopFlag = Event()
thread = MyThread(stopFlag)
thread.start()
# this will stop the timer
stopFlag.set()
Hans então
fonte
4
Então ele vai terminar o sono e parar depois. Não há como suspender à força um thread em python. Esta é uma decisão de design feita pelos desenvolvedores de Python. No entanto, o resultado líquido será o mesmo. Seu thread ainda funcionará (hiberne) por um curto período, mas não executará sua função.
Hans Então
13
Bem, na verdade, se você quiser interromper o encadeamento do cronômetro imediatamente, use um threading.Evente em waitvez de sleep. Então, para acordá-lo, basta definir o evento. Você nem mesmo precisa do self.stoppedthen porque apenas verifica a sinalização do evento.
nneonneo,
3
O evento seria usado estritamente para interromper o encadeamento do cronômetro. Normalmente, o event.waitiria apenas expirar e agir como um adormecido, mas se você quisesse parar (ou interromper de outra forma o thread), você configuraria o evento do thread e ele seria ativado imediatamente.
nneonneo
2
Eu atualizei minha resposta para usar event.wait (). Obrigado pelas sugestões.
Hans Então
1
só uma pergunta, como posso reiniciar o tópico depois disso? chamando thread.start()me dáthreads can only be started once
Motassem MK
32

De Equivalent de setInterval em python :

import threading

def setInterval(interval):
    def decorator(function):
        def wrapper(*args, **kwargs):
            stopped = threading.Event()

            def loop(): # executed in another thread
                while not stopped.wait(interval): # until stopped
                    function(*args, **kwargs)

            t = threading.Thread(target=loop)
            t.daemon = True # stop if the program exits
            t.start()
            return stopped
        return wrapper
    return decorator

Uso:

@setInterval(.5)
def function():
    "..."

stop = function() # start timer, the first call is in .5 seconds
stop.set() # stop the loop
stop = function() # start new timer
# ...
stop.set() 

Ou aqui está a mesma funcionalidade, mas como uma função autônoma em vez de um decorador :

cancel_future_calls = call_repeatedly(60, print, "Hello, World")
# ...
cancel_future_calls() 

Veja como fazer isso sem usar threads .

jfs
fonte
como você mudaria o intervalo ao usar um decorador? Digamos que eu queira mudar .5s em tempo de execução para 1 segundo ou algo assim?
lightxx
@lightxx: apenas use @setInterval(1).
jfs
hm. então ou sou um pouco lento ou você me entendeu mal. Eu quis dizer em tempo de execução. Eu sei que posso alterar o decorador no código-fonte a qualquer momento. o que, por exemplo, eu tinha três funções, cada uma decorada com um @setInterval (n). agora, em tempo de execução, quero alterar o intervalo da função 2, mas deixar as funções 1 e 3 sozinhas.
lightxx
@lightxx: você pode usar uma interface diferente, por exemplo stop = repeat(every=second, call=your_function); ...; stop(),.
jfs
31

Usando threads de cronômetro-

from threading import Timer,Thread,Event


class perpetualTimer():

   def __init__(self,t,hFunction):
      self.t=t
      self.hFunction = hFunction
      self.thread = Timer(self.t,self.handle_function)

   def handle_function(self):
      self.hFunction()
      self.thread = Timer(self.t,self.handle_function)
      self.thread.start()

   def start(self):
      self.thread.start()

   def cancel(self):
      self.thread.cancel()

def printer():
    print 'ipsem lorem'

t = perpetualTimer(5,printer)
t.start()

isso pode ser interrompido por t.cancel()

swapnil jariwala
fonte
3
Acredito que esse código tenha um bug no cancelmétodo. Quando é chamado, o encadeamento 1) não está em execução ou 2) está em execução. Em 1) estamos esperando para executar a função, então cancelar funcionará bem. em 2) estamos executando atualmente, então cancelar não terá efeito na execução atual. além disso, a execução atual é reprogramada para não ter efeito no futuro.
Rich Episcopo
1
Este código cria um novo encadeamento sempre que o cronômetro diminui. Este é um desperdício colossal em comparação com a resposta aceita.
Adrian W,
Esta solução deve ser evitada, pelo motivo mencionado acima: ela cria um novo thread a cada vez
Pynchia
17

Aprimorando um pouco a resposta de Hans Then , podemos apenas criar uma subclasse da função Timer. O seguinte torna-se o nosso código de "cronômetro de repetição" inteiro e pode ser usado como um substituto para threading.Timer com todos os mesmos argumentos:

from threading import Timer

class RepeatTimer(Timer):
    def run(self):
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)

Exemplo de uso:

def dummyfn(msg="foo"):
    print(msg)

timer = RepeatTimer(1, dummyfn)
timer.start()
time.sleep(5)
timer.cancel()

produz a seguinte saída:

foo
foo
foo
foo

e

timer = RepeatTimer(1, dummyfn, args=("bar",))
timer.start()
time.sleep(5)
timer.cancel()

produz

bar
bar
bar
bar
right2clicky
fonte
Essa abordagem me permitirá iniciar / cancelar / iniciar / cancelar o encadeamento do cronômetro?
Paul Knopf,
1
Não. Embora essa abordagem permita que você faça qualquer coisa que faria com um cronômetro comum, você não pode fazer isso com um cronômetro comum. Uma vez que start / cancel está relacionado ao thread subjacente, se você tentar .start () um thread que foi previamente .cancel () 'ed, então você obterá uma exceção RuntimeError: threads can only be started once,.
right2clicky
Solução realmente elegante! Estranho que eles não incluíram apenas uma classe que faz isso.
Roger Dahl de
esta solução é muito impressionante, mas eu me esforcei para entender como ela foi projetada simplesmente lendo a documentação da interface do Python3 threading Timer . A resposta parece basear-se no conhecimento da implementação, entrando no threading.pypróprio módulo.
Adam.at.Epsilon
14

No interesse de fornecer uma resposta correta usando o Timer conforme solicitado pelo OP, aprimorarei a resposta de swapnil jariwala :

from threading import Timer


class InfiniteTimer():
    """A Timer class that does not stop, unless you want it to."""

    def __init__(self, seconds, target):
        self._should_continue = False
        self.is_running = False
        self.seconds = seconds
        self.target = target
        self.thread = None

    def _handle_target(self):
        self.is_running = True
        self.target()
        self.is_running = False
        self._start_timer()

    def _start_timer(self):
        if self._should_continue: # Code could have been running when cancel was called.
            self.thread = Timer(self.seconds, self._handle_target)
            self.thread.start()

    def start(self):
        if not self._should_continue and not self.is_running:
            self._should_continue = True
            self._start_timer()
        else:
            print("Timer already started or running, please wait if you're restarting.")

    def cancel(self):
        if self.thread is not None:
            self._should_continue = False # Just in case thread is running and cancel fails.
            self.thread.cancel()
        else:
            print("Timer never started or failed to initialize.")


def tick():
    print('ipsem lorem')

# Example Usage
t = InfiniteTimer(0.5, tick)
t.start()
Bill Schumacher
fonte
3

Eu mudei algum código no código swapnil-jariwala para fazer um pequeno relógio de console.

from threading import Timer, Thread, Event
from datetime import datetime

class PT():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

def printer():
    tempo = datetime.today()
    h,m,s = tempo.hour, tempo.minute, tempo.second
    print(f"{h}:{m}:{s}")


t = PT(1, printer)
t.start()

RESULTADO

>>> 11:39:11
11:39:12
11:39:13
11:39:14
11:39:15
11:39:16
...

Temporizador com interface gráfica tkinter

Este código coloca o cronômetro do relógio em uma pequena janela com tkinter

from threading import Timer, Thread, Event
from datetime import datetime
import tkinter as tk

app = tk.Tk()
lab = tk.Label(app, text="Timer will start in a sec")
lab.pack()


class perpetualTimer():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


def printer():
    tempo = datetime.today()
    clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)
    try:
        lab['text'] = clock
    except RuntimeError:
        exit()


t = perpetualTimer(1, printer)
t.start()
app.mainloop()

Um exemplo de jogo de flashcards (mais ou menos)

from threading import Timer, Thread, Event
from datetime import datetime


class perpetualTimer():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


x = datetime.today()
start = x.second


def printer():
    global questions, counter, start
    x = datetime.today()
    tempo = x.second
    if tempo - 3 > start:
        show_ans()
    #print("\n{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="")
    print()
    print("-" + questions[counter])
    counter += 1
    if counter == len(answers):
        counter = 0


def show_ans():
    global answers, c2
    print("It is {}".format(answers[c2]))
    c2 += 1
    if c2 == len(answers):
        c2 = 0


questions = ["What is the capital of Italy?",
             "What is the capital of France?",
             "What is the capital of England?",
             "What is the capital of Spain?"]

answers = "Rome", "Paris", "London", "Madrid"

counter = 0
c2 = 0
print("Get ready to answer")
t = perpetualTimer(3, printer)
t.start()

resultado:

Get ready to answer
>>> 
-What is the capital of Italy?
It is Rome

-What is the capital of France?
It is Paris

-What is the capital of England?
...
Giovanni G. PY
fonte
se hFunction está bloqueando, isso não atrasaria os horários de início subsequentes? Talvez você pudesse trocar as linhas de modo que handle_function inicie o cronômetro primeiro e depois chame hFunction?
Bigode
2

Eu tive que fazer isso para um projeto. O que acabei fazendo foi iniciar um thread separado para a função

t = threading.Thread(target =heartbeat, args=(worker,))
t.start()

**** batimento cardíaco é minha função, trabalhador é um dos meus argumentos ****

dentro da minha função de batimento cardíaco:

def heartbeat(worker):

    while True:
        time.sleep(5)
        #all of my code

Portanto, quando eu inicio o thread, a função espera repetidamente 5 segundos, executa todo o meu código e faz isso indefinidamente. Se você quiser encerrar o processo, basta encerrar o thread.

Andrew Wilkins
fonte
1

Implementei uma classe que funciona como um cronômetro.

Deixo o link aqui caso alguém precise: https://github.com/ivanhalencp/python/tree/master/xTimer

Ivan Coppes
fonte
3
Embora isso possa teoricamente responder à pergunta, seria preferível incluir as partes essenciais da resposta aqui e fornecer o link para referência.
Karl Richter
1
from threading import Timer
def TaskManager():
    #do stuff
    t = Timer( 1, TaskManager )
    t.start()

TaskManager()

Aqui está uma pequena amostra, que ajudará a entender melhor como funciona. function taskManager () no final cria uma chamada de função atrasada para ela mesma.

Tente mudar a variável "dalay" e você será capaz de ver a diferença

from threading import Timer, _sleep

# ------------------------------------------
DATA = []
dalay = 0.25 # sec
counter = 0
allow_run = True
FIFO = True

def taskManager():

    global counter, DATA, delay, allow_run
    counter += 1

    if len(DATA) > 0:
        if FIFO:
            print("["+str(counter)+"] new data: ["+str(DATA.pop(0))+"]")
        else:
            print("["+str(counter)+"] new data: ["+str(DATA.pop())+"]")

    else:
        print("["+str(counter)+"] no data")

    if allow_run:
        #delayed method/function call to it self
        t = Timer( dalay, taskManager )
        t.start()

    else:
        print(" END task-manager: disabled")

# ------------------------------------------
def main():

    DATA.append("data from main(): 0")
    _sleep(2)
    DATA.append("data from main(): 1")
    _sleep(2)


# ------------------------------------------
print(" START task-manager:")
taskManager()

_sleep(2)
DATA.append("first data")

_sleep(2)
DATA.append("second data")

print(" START main():")
main()
print(" END main():")

_sleep(2)
DATA.append("last data")

allow_run = False
ch3ll0v3k
fonte
1
você também pode explicar um pouco mais por que isso funciona?
minocha de
seu exemplo foi um pouco confuso, o primeiro bloco de código foi tudo o que você precisava dizer.
Partack
1

Eu gosto da resposta do right2clicky, especialmente porque não requer que um Thread seja derrubado e um novo criado toda vez que o Timer for ativado. Além disso, é uma substituição fácil criar uma classe com um retorno de chamada do cronômetro que é chamado periodicamente. Este é meu caso de uso normal:

class MyClass(RepeatTimer):
    def __init__(self, period):
        super().__init__(period, self.on_timer)

    def on_timer(self):
        print("Tick")


if __name__ == "__main__":
    mc = MyClass(1)
    mc.start()
    time.sleep(5)
    mc.cancel()
Frank p
fonte
1

Esta é uma implementação alternativa usando função em vez de classe. Inspirado por @Andrew Wilkins acima.

Porque esperar é mais preciso do que dormir (leva em consideração o tempo de execução da função):

import threading

PING_ON = threading.Event()

def ping():
  while not PING_ON.wait(1):
    print("my thread %s" % str(threading.current_thread().ident))

t = threading.Thread(target=ping)
t.start()

sleep(5)
PING_ON.set()
Paul Kenjora
fonte
1

Eu vim com outra solução com a classe SingleTon. Por favor, diga-me se houver algum vazamento de memória aqui.

import time,threading

class Singleton:
  __instance = None
  sleepTime = 1
  executeThread = False

  def __init__(self):
     if Singleton.__instance != None:
        raise Exception("This class is a singleton!")
     else:
        Singleton.__instance = self

  @staticmethod
  def getInstance():
     if Singleton.__instance == None:
        Singleton()
     return Singleton.__instance


  def startThread(self):
     self.executeThread = True
     self.threadNew = threading.Thread(target=self.foo_target)
     self.threadNew.start()
     print('doing other things...')


  def stopThread(self):
     print("Killing Thread ")
     self.executeThread = False
     self.threadNew.join()
     print(self.threadNew)


  def foo(self):
     print("Hello in " + str(self.sleepTime) + " seconds")


  def foo_target(self):
     while self.executeThread:
        self.foo()
        print(self.threadNew)
        time.sleep(self.sleepTime)

        if not self.executeThread:
           break


sClass = Singleton()
sClass.startThread()
time.sleep(5)
sClass.getInstance().stopThread()

sClass.getInstance().sleepTime = 2
sClass.startThread()
Yunus
fonte