Tempo limite em uma chamada de função

300

Estou chamando uma função em Python que sei que pode parar e me forçar a reiniciar o script.

Como chamo a função ou em que a envolvo para que, se demorar mais de 5 segundos, o script a cancele e faça outra coisa?

Teifion
fonte

Respostas:

227

Você pode usar o pacote de sinal se estiver executando no UNIX:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print("Forever is over!")
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print("sec")
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print(exc)
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

10 segundos após a chamada alarm.alarm(10), o manipulador é chamado. Isso gera uma exceção que você pode interceptar a partir do código Python comum.

Este módulo não funciona bem com threads (mas quem é?)

Observe que, como criamos uma exceção quando o tempo limite ocorre, ele pode ser capturado e ignorado dentro da função, por exemplo, em uma dessas funções:

def loop_forever():
    while 1:
        print('sec')
        try:
            time.sleep(10)
        except:
            continue
piro
fonte
5
Eu uso o Python 2.5.4. Existe um erro: Traceback (última chamada mais recente): arquivo "aa.py", linha 85, em func signal.signal (signal.SIGALRM, manipulador) AttributeError: o objeto 'module' não tem atributo 'SIGALRM'
flypen
11
@flypen é porque signal.alarme os relacionados SIGALRMnão estão disponíveis nas plataformas Windows.
Double AA
2
Se houver muitos processos e cada chamada signal.signal--- todos eles funcionarão corretamente? Cada signal.signalchamada não cancela uma "simultânea"?
brownian
1
Aviso para aqueles que desejam usar isso com uma extensão C: O manipulador de sinal Python não será chamado até que a função C retorne o controle ao intérprete Python. Para este caso de uso, use de ATOzTOA resposta: stackoverflow.com/a/14924210/1286628
wkschwartz
13
Eu segundo o aviso sobre tópicos. signal.alarm funciona apenas no thread principal. Eu tentei usar isso nas visualizações do Django - falha imediata com palavreado apenas sobre o thread principal.
JL Peyret #
154

Você pode usar multiprocessing.Processpara fazer exatamente isso.

Código

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate
        p.terminate()
        p.join()
ATOzTOA
fonte
36
Como posso obter o valor de retorno do método de destino?
bad_keypoints
4
Isso não parece funcionar se a função chamada ficar presa em um bloco de E / S.
sudo
4
@bad_keypoints Veja esta resposta: stackoverflow.com/a/10415215/1384471 Basicamente, você passa uma lista na qual coloca a resposta.
Peter
1
@sudo e remova o join(). que faz com que o número x de subprocessos simultâneos esteja em execução até que eles terminem o trabalho ou a quantidade definida em join(10). Caso você tenha uma E / S de bloqueio para 10 processos, usando o join (10), você os configurou para aguardar todos eles no máximo 10 por EACH processo iniciado. Use o sinalizador daemon como este exemplo stackoverflow.com/a/27420072/2480481 . Claro que você pode passar a flag daemon=Truediretamente para multiprocessing.Process()funcionar.
M3nda
2
@ATOzTOA O problema com esta solução, pelo menos para os meus propósitos, é que ela potencialmente não permite que as crianças pisem depois de si mesmas. De documentação de terminar a funçãoterminate() ... Note that exit handlers and finally clauses, etc., will not be executed. Note that descendant processes of the process will not be terminated – they will simply become orphaned.
abalcerek
78

Como chamo a função ou em que a envolvo para que, se demorar mais de 5 segundos, o script a cancele?

Eu publiquei uma essência que resolve esta questão / problema com um decorador e um threading.Timer. Aqui está com um colapso.

Importações e configurações para compatibilidade

Foi testado com Python 2 e 3. Também deve funcionar em Unix / Linux e Windows.

Primeiro as importações. Eles tentam manter o código consistente, independentemente da versão do Python:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

Use código independente da versão:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

Agora importamos nossa funcionalidade da biblioteca padrão.

exit_after decorador

Em seguida, precisamos de uma função para finalizar a main()partir do thread filho:

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

E aqui está o próprio decorador:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

Uso

E aqui está o uso que responde diretamente à sua pergunta sobre como sair após 5 segundos !:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

Demo:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

A segunda chamada de função não será concluída. Em vez disso, o processo deve sair com um retorno!

KeyboardInterrupt nem sempre para um fio adormecido

Observe que o sono nem sempre será interrompido por uma interrupção do teclado, no Python 2 no Windows, por exemplo:

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

nem é provável que interrompa o código em execução nas extensões, a menos que verifique explicitamente PyErr_CheckSignals(), consulte Cython, Python e KeyboardInterrupt ignorados

Eu evitaria dormir um fio por mais de um segundo, em qualquer caso - é uma eternidade no tempo do processador.

Como chamo a função ou em que a envolvo para que, se demorar mais de 5 segundos, o script a cancele e faça outra coisa?

Para capturá-lo e fazer outra coisa, você pode capturar o KeyboardInterrupt.

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
Aaron Hall
fonte
Ainda não li todo o seu post, mas me perguntei: e se o flush for 0? Isso seria interpretado como falso na declaração if abaixo, certo?
Koenraad van Duin
2
Por que preciso ligar thread.interrupt_main(), por que não consigo gerar uma exceção diretamente?
Anirban Nag 'tintinmj' 30/07/19
Alguma idéia de encerrar multiprocessing.connection.Clientcom isso? - Tentando resolver: stackoverflow.com/questions/57817955/…
wwii
51

Eu tenho uma proposta diferente, que é uma função pura (com a mesma API da sugestão de encadeamento) e parece funcionar bem (com base nas sugestões deste tópico)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result
Alex
fonte
3
Você também deve restaurar o manipulador de sinal original. Veja stackoverflow.com/questions/492519/…
Martin Konecny
9
Mais uma observação: O método de sinal Unix funciona apenas se você o estiver aplicando na thread principal. A aplicação em um sub-thread lança uma exceção e não funcionará.
Martin Konecny
12
Esta não é a melhor solução, pois funciona apenas no Linux.
max
17
Max, not true - funciona em qualquer unix compatível com POSIX. Acho que seu comentário deve ser mais preciso, não funciona no Windows.
Chris Johnson
6
Você deve evitar definir kwargs para um ditado vazio. Uma pegadinha comum do Python é que os argumentos padrão das funções são mutáveis. Para que esse dicionário seja compartilhado em todas as chamadas para timeout. É muito melhor definir o padrão como Nonee, na primeira linha da função, adicionar kwargs = kwargs or {}. Args está bem porque as tuplas não são mutáveis.
scottmrogowski
32

Passei por esse segmento ao procurar uma chamada de tempo limite em testes de unidade. Não encontrei nada simples nas respostas ou nos pacotes de terceiros; por isso, escrevi o decorador abaixo. Você pode entrar no código:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

Então, é tão simples como esgotar o tempo limite de um teste ou qualquer função que você desejar:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...
Rico
fonte
14
Tenha cuidado, pois isso não encerra a função após o tempo limite ser atingido!
22416 Sylvain
Observe que, no Windows, isso gera um processo totalmente novo - que pode demorar até o tempo limite, talvez muito, se as dependências demorarem muito para serem configuradas.
Aaron Hall
1
Sim, isso precisa de alguns ajustes. Deixa os fios indo para sempre.
Sudo #
2
IDK, se esta for a melhor maneira, mas você pode tentar / capturar Exceptiondentro de func_wrapper e fazer pool.close()após a captura para garantir que o encadeamento sempre morra depois, não importa o que aconteça . Então você pode jogar TimeoutErrorou o que quiser depois. Parece funcionar para mim.
Sudo #
2
Isso é útil, mas depois de ter feito isso muitas vezes, entendo RuntimeError: can't start new thread. Ainda funcionará se eu ignorá-lo ou há algo mais que eu possa fazer para contornar isso? Desde já, obrigado!
21417 Benjie
20

O stopitpacote, encontrado no pypi, parece lidar bem com os tempos limite.

Eu gosto do @stopit.threading_timeoutabledecorador, que adiciona um timeoutparâmetro à função decorada, que faz o que você espera, para a função.

Confira no pypi: https://pypi.python.org/pypi/stopit

egeland
fonte
1
É muito útil e seguro para threads! Obrigado e mais um! Esta é a melhor opção que encontrei até agora e ainda melhor do que a resposta aceita !!
Yahia
Biblioteca afirma que algumas funcionalidades não funcionam no Windows.
Stefan Simik 03/06/19
16

Existem muitas sugestões, mas nenhuma usa concurrent.futures, que eu acho que é a maneira mais legível de lidar com isso.

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

Super simples de ler e manter.

Fazemos um pool, enviamos um único processo e esperamos até 5 segundos antes de gerar um TimeoutError que você pode capturar e manipular da maneira que precisar.

Nativo para python 3.2+ e portado para 2.7 (futuros de instalação do pip).

A alternância entre fios e processos é tão simples como a substituição ProcessPoolExecutorcom ThreadPoolExecutor.

Se você deseja encerrar o processo com o tempo limite, sugiro pesquisar no Pebble .

Brian
fonte
2
O que significa "Aviso: isso não encerra a função se o tempo limite" significa?
Scott Stafford
5
@ScottStafford Os processos / threads não terminam apenas porque um TimeoutError foi gerado. Portanto, o processo ou o encadeamento ainda tentará executar até a conclusão e não retornará automaticamente o controle no tempo limite.
Brian
Isso me permite salvar resultados intermediários na época? por exemplo, se eu tenho uma função recursiva que defino o tempo limite como 5 e nesse período tenho resultados parciais, como escrevo a função para retornar os resultados parciais no tempo limite?
SumNeuron 16/03/19
Estou usando exatamente isso, no entanto, tenho 1000 tarefas, cada uma é permitida 5 segundos antes do tempo limite. Meu problema é que os núcleos ficam entupidos em tarefas que nunca terminam porque o tempo limite é aplicado apenas no total de tarefas e não em tarefas individuais. concurrent.futures não fornece uma solução para este auge.
Bastiaan
12

Excelente, fácil de usar e confiável projeto de timeout do PyPi ( https://pypi.org/project/timeout-decorator/ )

instalação :

pip install timeout-decorator

Uso :

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()
Gil
fonte
2
Agradeço a solução clara. Mas alguém poderia explicar como essa biblioteca funciona, especialmente ao lidar com multithreading. Pessoalmente, receio usar um machanismo desconhecido para lidar com threads ou sinais.
Wsysuper # 25/18
@wsysuper o lib tem 2 modos de operação: novo tópico aberto ou um novo subprocesso (que supõem para ser thread-safe)
Gil
isso funcionou muito bem para mim!
Florian Heigl
6

Eu sou o autor de wrapt_timeout_decorator

A maioria das soluções apresentadas aqui funciona de maneira soberba no Linux à primeira vista - porque temos fork () e sinais () -, mas no Windows as coisas parecem um pouco diferentes. E quando se trata de sub-threads no Linux, você não pode mais usar sinais.

Para gerar um processo no Windows, ele precisa ser selecionável - e muitas funções decoradas ou métodos de classe não são.

Portanto, você precisa usar um seletor melhor, como endro e multiprocesso (não pickle e multiprocessamento) - é por isso que você não pode usar ProcessPoolExecutor (ou apenas com funcionalidade limitada).

Para o tempo limite em si - Você precisa definir o que o tempo limite significa - porque no Windows levará um tempo considerável (e não determinável) para gerar o processo. Isso pode ser complicado em tempos curtos. Vamos supor, a geração do processo leva cerca de 0,5 segundos (facilmente !!!). Se você der um tempo limite de 0,2 segundos, o que deve acontecer? A função deve expirar após 0,5 + 0,2 segundos (deixe o método funcionar por 0,2 segundos)? Ou o processo chamado expirará após 0,2 segundos (nesse caso, a função decorada SEMPRE excederá o tempo limite, porque nesse momento nem sequer é gerada)?

Os decoradores aninhados também podem ser desagradáveis ​​e você não pode usar sinais em um sub-rosca. Se você deseja criar um decorador de plataforma cruzada verdadeiramente universal, tudo isso precisa ser levado em consideração (e testado).

Outros problemas estão passando exceções de volta para o chamador, bem como problemas de logon (se usado na função decorada - o log de arquivos em outro processo NÃO é suportado)

Tentei cobrir todos os casos extremos. Você pode examinar o pacote wrapt_timeout_decorator ou, pelo menos, testar suas próprias soluções inspiradas nos unittests usados ​​lá.

@ Alexis Eggermont - infelizmente não tenho pontos suficientes para comentar - talvez alguém possa notificá-lo - acho que resolvi seu problema de importação.

bitranox
fonte
3

timeout-decoratornão funciona no sistema windows, pois o windows não suportava signalbem.

Se você usar o decorador de tempo limite no sistema Windows, obterá o seguinte

AttributeError: module 'signal' has no attribute 'SIGALRM'

Alguns sugeriram usar, use_signals=Falsemas não funcionaram para mim.

O autor @bitranox criou o seguinte pacote:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

Exemplo de código:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

Dá a seguinte exceção:

TimeoutError: Function mytest timed out after 5 seconds
Até parece
fonte
Isso soa como uma solução muito boa. Estranhamente, a linha from wrapt_timeout_decorator import * parece matar algumas das minhas outras importações. Por exemplo eu recebo ModuleNotFoundError: No module named 'google.appengine', mas eu não receber este erro se eu fizer wrapt_timeout_decorator não importação
Alexis Eggermont
@AlexisEggermont Eu estava prestes a usar isso com o appengine ... por isso estou muito curioso se esse erro persistir?
PascalVKooten
2

Podemos usar sinais para o mesmo. Eu acho que o exemplo abaixo será útil para você. É muito simples comparado aos threads.

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"
AR
fonte
1
Seria melhor escolher uma exceção específica e capturar apenas ela. Nua try: ... except: ...é sempre uma má ideia.
Hivert
Eu concordo com você hivert.
AR
Embora eu entenda o motivo, como sysadmin / integrator eu discordo - o código python é notório por negligenciar o tratamento de erros, e lidar com o que você espera não é bom o suficiente para um software de qualidade. você pode lidar com as 5 coisas que planeja E uma estratégia genérica para outras coisas. "Traceback, None" não é uma estratégia, é um insulto.
Florian Heigl
2
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)
Hal Canary
fonte
7
Embora esse código pode responder à pergunta, fornecendo contexto adicional a respeito de como e / ou porque ele resolve o problema iria melhorar a resposta é valor a longo prazo
Dan Cornilescu
1

Eu tinha uma necessidade de nestable interrupções programadas (que SIGALARM não pode fazer) que não vai ficar bloqueado por time.sleep (que a abordagem baseada em threads não pode fazer). Acabei copiando e modificando levemente o código a partir daqui: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

O próprio código:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

e um exemplo de uso:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'
James
fonte
Isso também usa sinal, portanto, não funcionará se chamado de um thread.
precisa saber é o seguinte
0

Aqui está uma pequena melhoria na solução baseada em encadeamento fornecida.

O código abaixo suporta exceções :

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

Invocando-o com um tempo limite de 5 segundos:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)
diemacht
fonte
1
Isso criará uma nova exceção, ocultando o retorno original. Veja minha versão abaixo ...
Meitham
1
Isso também é inseguro, como se dentro de runFunctionCatchExceptions()certas funções Python, a obtenção de GIL fosse chamada. Por exemplo, o seguinte nunca, ou por muito tempo, o retorno se for chamado dentro da função: eval(2**9999999999**9999999999). Veja stackoverflow.com/questions/22138190/…
Mikko Ohtamaa