O teclado interrompe o pool de multiprocessamento do python

136

Como posso lidar com eventos KeyboardInterrupt com os pools de multiprocessamento do python? Aqui está um exemplo simples:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

Ao executar o código acima, ele KeyboardInterrupté gerado quando pressiono ^C, mas o processo simplesmente trava nesse momento e eu tenho que eliminá-lo externamente.

Quero poder pressionar ^Ca qualquer momento e fazer com que todos os processos sejam encerrados normalmente.

Fragsworth
fonte
Eu resolvi meu problema usando psutil, você pode ver a solução aqui: stackoverflow.com/questions/32160054/...
Tiago Albineli Motta

Respostas:

137

Este é um bug do Python. Ao aguardar uma condição em threading.Condition.wait (), KeyboardInterrupt nunca é enviado. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

A exceção KeyboardInterrupt não será entregue até wait () retornar e nunca retornar, portanto a interrupção nunca acontece. KeyboardInterrupt quase certamente deve interromper uma espera de condição.

Observe que isso não acontece se um tempo limite for especificado; cond.wait (1) receberá a interrupção imediatamente. Portanto, uma solução alternativa é especificar um tempo limite. Para fazer isso, substitua

    results = pool.map(slowly_square, range(40))

com

    results = pool.map_async(slowly_square, range(40)).get(9999999)

ou similar.

Glenn Maynard
fonte
3
Esse bug está no rastreador oficial do python em algum lugar? Estou tendo problemas para encontrá-lo, mas provavelmente não estou usando os melhores termos de pesquisa.
31810 Joseph Garvin
18
Este bug foi arquivado como [Problema 8296] [1]. [1]: bugs.python.org/issue8296
Andrey Vlasovskikh
1
Aqui está um hack que corrige pool.imap () da mesma maneira, tornando possível o Ctrl-C ao interagir com o imap. Capture a exceção e chame pool.terminate () e seu programa será encerrado. gist.github.com/626518
Alexander Ljungberg
6
Isso não resolve as coisas. Às vezes, recebo o comportamento esperado quando pressiono Control + C, outras vezes não. Não sei por que, mas parece que talvez o KeyboardInterrupt seja recebido por um dos processos aleatoriamente, e só recebo o comportamento correto se o processo pai for o que o captura.
Ryan C. Thompson
6
Isso não funciona para mim com o Python 3.6.1 no Windows. Recebo toneladas de rastreamentos de pilha e outro lixo quando faço o Ctrl-C, ou seja, o mesmo que sem essa solução alternativa. Na verdade nenhuma das soluções que eu tentei de esta discussão parecem funcionar ...
szx
56

Pelo que descobri recentemente, a melhor solução é configurar os processos de trabalho para ignorar completamente o SIGINT e restringir todo o código de limpeza ao processo pai. Isso corrige o problema para os processos de trabalho ocioso e ocupado e não requer nenhum código de manipulação de erros nos processos filhos.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

A explicação e o código de exemplo completo podem ser encontrados em http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ e http://github.com/jreese/multiprocessing-keyboardinterrupt, respectivamente.

John Reese
fonte
4
Olá John. Sua solução não realiza a mesma coisa que a minha, sim, infelizmente, complicada. Esconde-se atrás do time.sleep(10)processo principal. Se você remover esse sono, ou se você esperar que o processo tente ingressar na piscina, o que você deve fazer para garantir que os trabalhos sejam concluídos, você ainda sofrerá com o mesmo problema que o processo principal não faz. recebe o KeyboardInterrupt enquanto espera em uma joinoperação de votação .
bboe
No caso em que usei esse código na produção, o time.sleep () fazia parte de um loop que verificaria o status de cada processo filho e, em seguida, reiniciaria certos processos com atraso, se necessário. Em vez de join () que aguardaria a conclusão de todos os processos, ele os verificaria individualmente, garantindo que o processo mestre permanecesse responsivo.
John Reese
2
Portanto, foi mais uma espera ocupada (talvez com pequenos intervalos entre verificações) que pesquisou a conclusão do processo por outro método, em vez de ingressar? Se for esse o caso, talvez seja melhor incluir esse código em sua postagem no blog, pois você pode garantir que todos os trabalhadores tenham concluído antes de tentar ingressar.
bboe
4
Isso não funciona. Somente as crianças recebem o sinal. Os pais nunca a recebem, portanto pool.terminate()nunca são executados. Ter as crianças ignorando o sinal não realiza nada. A resposta de @ Glenn resolve o problema.
Cerin
1
Minha versão disso está em gist.github.com/admackin/003dd646e5fadee8b8d6 ; ele não chama, .join()exceto na interrupção - simplesmente verifica manualmente o resultado do .apply_async()uso AsyncResult.ready()para ver se está pronto, o que significa que terminamos corretamente.
Andy MacKinlay
29

Por alguns motivos, apenas exceções herdadas da Exceptionclasse base são tratadas normalmente. Como solução alternativa, você pode aumentar novamente o seu KeyboardInterruptcomo uma Exceptioninstância:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Normalmente você obteria a seguinte saída:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

Então, se você acertar ^C, receberá:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
Andrey Vlasovskikh
fonte
2
Parece que esta não é uma solução completa. Se um KeyboardInterruptchegar enquanto multiprocessingestiver realizando sua própria troca de dados IPC, try..catchele não será ativado (obviamente).
Andrey Vlasovskikh 03/04/10
Você pode substituir raise KeyboardInterruptErrorpor um return. Você só precisa garantir que o processo filho termine assim que o KeyboardInterrupt for recebido. O valor de retorno parece ser ignorado, mainainda o KeyboardInterrupt é recebido.
Bernhard
8

Geralmente essa estrutura simples funciona para Ctrl- Cno Pool:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

Como foi afirmado em alguns posts semelhantes:

Capture a interrupção do teclado no Python sem tentar, exceto

igco
fonte
1
Isso também teria que ser feito em cada um dos processos de trabalho e ainda poderá falhar se o KeyboardInterrupt for gerado enquanto a biblioteca de multiprocessamento estiver inicializando.
21813 MarioVilas
7

A resposta votada não aborda a questão central, mas um efeito colateral semelhante.

Jesse Noller, o autor da biblioteca de multiprocessamento, explica como lidar corretamente com CTRL + C ao usar multiprocessing.Poolem uma postagem de blog antiga .

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()
noxdafox
fonte
Descobri que ProcessPoolExecutor também tem o mesmo problema. A única correção que eu era capaz de encontrar era para chamar os.setpgrp()de dentro do futuro
portforwardpodcast
1
Claro, a única diferença é que ProcessPoolExecutornão suporta as funções do inicializador. No Unix, você pode alavancar a forkestratégia desativando o manipulador de sinais no processo principal antes de criar o pool e reativá-lo posteriormente. No seixo , silencio SIGINTnos processos filho por padrão. Não estou ciente do motivo pelo qual eles não fazem o mesmo com os Python Pools. No final, o usuário pode redefinir o SIGINTmanipulador caso ele / ela queira se machucar.
noxdafox 12/09
Essa solução parece impedir que o Ctrl-C interrompa também o processo principal.
Paul Price
1
Acabei de testar no Python 3.5 e funciona, qual versão do Python você está usando? Qual SO?
Noxdafox
5

Parece que há dois problemas que fazem exceções, enquanto o multiprocessamento é irritante. A primeira (observada por Glenn) é que você precisa usar map_asynccom um tempo limite em vez de mapobter uma resposta imediata (ou seja, não termine de processar a lista inteira). O segundo (observado por Andrey) é que o multiprocessamento não captura exceções que não são herdadas de Exception(por exemplo, SystemExit). Então, aqui está minha solução que lida com os dois:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results
Paul Price
fonte
1
Não notei nenhuma penalidade de desempenho, mas, no meu caso, a vida functioné bastante longa (centenas de segundos).
Paul Price,
Na verdade, isso não é mais o caso, pelo menos dos meus olhos e experiência. Se você capturar a exceção do teclado nos processos filhos individuais e capturá-la mais uma vez no processo principal, poderá continuar usando mape tudo estará bem. @Linux Cli Aikforneceu uma solução abaixo que produz esse comportamento. O uso map_asyncnem sempre é desejado se o thread principal depender dos resultados dos processos filho.
Code Doggo
4

Descobri, por enquanto, a melhor solução é não usar o recurso multiprocessing.pool, mas sim rolar sua própria funcionalidade de pool. Forneci um exemplo demonstrando o erro com apply_async, bem como um exemplo mostrando como evitar o uso completo da funcionalidade do pool.

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

bboe
fonte
Funciona como um encanto. É uma solução limpa e não algum tipo de hack (/ me pensa) .btw, o truque com .get (99999), conforme proposto por outros, prejudica muito o desempenho.
21313 Walter Walter
Eu não notei nenhuma penalidade de desempenho ao usar um tempo limite, embora eu esteja usando 9999 em vez de 999999. A exceção é quando uma exceção que não herda da classe Exception é gerada: então é necessário aguardar até que o tempo limite seja acertar. A solução para isso é capturar todas as exceções (veja minha solução).
Paul Price
1

Eu sou um novato em Python. Eu estava procurando em todos os lugares por respostas e encontrei este e alguns outros blogs e vídeos do youtube. Eu tentei copiar, colar o código do autor acima e reproduzi-lo no meu python 2.7.13 no windows 7 de 64 bits. É perto do que eu quero alcançar.

Fiz meus processos filhos ignorar o ControlC e fazer com que o processo pai fosse encerrado. Parece que ignorar o processo filho evita esse problema para mim.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

A parte iniciada em pool.terminate()nunca parece ser executada.

Linux Cli Aik
fonte
Eu só descobri isso também! Sinceramente, acho que essa é a melhor solução para um problema como esse. A solução aceita força map_asynco usuário, o que eu particularmente não gosto. Em muitas situações, como a minha, o thread principal precisa aguardar a conclusão dos processos individuais. Esta é uma das razões pelas quais mapexiste!
Code Doggo 20/06/19
1

Você pode tentar usar o método apply_async de um objeto Pool, assim:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

Resultado:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

Uma vantagem desse método é que os resultados processados ​​antes da interrupção serão retornados no dicionário de resultados:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
bparker856
fonte
Exemplo glorioso e completo
eMTy
-5

Estranhamente, parece que você precisa lidar KeyboardInterruptcom as crianças também. Eu esperava que isso funcionasse como está escrito ... tente mudar slowly_squarepara:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

Isso deve funcionar como você esperava.

D.Shawley
fonte
1
Eu tentei isso e, na verdade, ele não finaliza todo o conjunto de tarefas. Ele finaliza os trabalhos em execução no momento, mas o script ainda atribui os trabalhos restantes na chamada pool.map como se tudo estivesse normal.
Fragsworth
isso está bom, mas você pode perder o controle dos erros que ocorrem. retornar o erro com um rastreamento de pilha pode funcionar para que o processo pai possa dizer que ocorreu um erro, mas ainda não sai imediatamente quando o erro ocorre.
mehtunguh