Python Process Pool não demoníaco?

96

Seria possível criar um pool python que não seja demoníaco? Quero que um pool possa chamar uma função que tenha outro pool dentro.

Eu quero isso porque os processos deamon não podem criar processos. Especificamente, isso causará o erro:

AssertionError: daemonic processes are not allowed to have children

Por exemplo, considere o cenário em que function_ahá uma piscina que funciona function_be uma piscina que funciona function_c. Esta cadeia de funções falhará porque function_bestá sendo executado em um processo daemon e os processos daemon não podem criar processos.

Max
fonte
AFAIK, não, não é possível que todos os workers do pool estejam daemonizados e não seja possível injetar a dependência , BTW, não entendi a segunda parte da sua pergunta I want a pool to be able to call a function that has another pool insidee como isso interfere no fato dos workers serem daemonizados.
mouad de
4
Porque se a função a tem um pool que executa a função b que tem um pool que executa a função c, há um problema em b que ela está sendo executada em um processo daemon e os processos daemon não podem criar processos. AssertionError: daemonic processes are not allowed to have children
Máx.

Respostas:

118

A multiprocessing.pool.Poolclasse cria os processos de trabalho em seu __init__método, torna-os demoníacos e os inicia, e não é possível reconfigurar seus daemonatributos para Falseantes de serem iniciados (e depois disso não é mais permitido). Mas você pode criar sua própria subclasse de multiprocesing.pool.Pool( multiprocessing.Poolé apenas uma função de invólucro) e substituir sua própria multiprocessing.Processsubclasse, que é sempre não demoníaca, para ser usada para os processos de trabalho.

Aqui está um exemplo completo de como fazer isso. As partes importantes são as duas classes NoDaemonProcesse MyPoolno topo e para chamar pool.close()e pool.join()em sua MyPoolinstância no final.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()
Chris Arndt
fonte
1
Acabei de testar meu código novamente com Python 2.7 / 3.2 (depois de corrigir as linhas "imprimir") no Linux e Python 2.6 / 2.7 / 3.2 OS X. Linux e Python 2.7 / 3.2 no OS X funcionam bem, mas o código realmente permanece Python 2.6 no OS X (Lion). Este parece ser um bug no módulo de multiprocessamento, que foi corrigido, mas eu não verifiquei o rastreador de bug.
Chris Arndt
1
Obrigado! No Windows, você também precisa ligarmultiprocessing.freeze_support()
frmdstryr
2
Bom trabalho. Se alguém está tendo vazamento de memória com isso, tente usar "com fechamento (MyPool (processos = num_cpu)) como pool:" para descartar o pool corretamente
Chris Lucian
31
Quais são as desvantagens de usar em MyPoolvez do padrão Pool? Em outras palavras, em troca da flexibilidade de iniciar processos filho, quais custos devo pagar? (Se não houvesse custos, presumivelmente o padrão Poolteria usado processos não demoníacos).
máx.
4
@machen Sim, infelizmente é verdade. No Python 3.6, a Poolclasse foi amplamente refatorada, então Processnão é mais um atributo simples, mas um método, que retorna a instância do processo que obtém de um contexto . Tentei sobrescrever este método para retornar uma NoDaemonPoolinstância, mas isso resultou na exceção AssertionError: daemonic processes are not allowed to have childrenquando o Pool é usado.
Chris Arndt
26

Tive a necessidade de empregar um pool não demoníaco no Python 3.7 e acabei adaptando o código postado na resposta aceita. Abaixo está o snippet que cria o pool não demoníaco:

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(MyPool, self).__init__(*args, **kwargs)

Como a implementação atual de multiprocessingfoi amplamente refatorada para ser baseada em contextos, precisamos fornecer uma NoDaemonContextclasse que tenha nosso NoDaemonProcessatributo as. MyPoolirá então usar esse contexto em vez do padrão.

Dito isso, devo alertar que há pelo menos 2 ressalvas para essa abordagem:

  1. Ele ainda depende dos detalhes de implementação do multiprocessingpacote e, portanto, pode falhar a qualquer momento.
  2. Existem razões válidas para multiprocessingtornar tão difícil o uso de processos não demoníacos, muitos dos quais são explicados aqui . O mais atraente na minha opinião é:

    Quanto a permitir que threads filhos gerem seus próprios filhos, usar o subprocesso corre o risco de criar um pequeno exército de 'netos' zumbis se as threads pai ou filho terminarem antes que o subprocesso seja concluído e retorne.

Massimiliano
fonte
Em relação a ressalva: Meu caso de uso é parallelising tarefas, mas os netos retornar informações para seus pais que, em informações por sua vez volta a seus pais depois de fazer algum processamento local necessário. Conseqüentemente, cada nível / ramificação tem uma espera explícita para todas as suas folhas. A advertência ainda se aplica se você tiver que esperar explicitamente a conclusão dos processos gerados?
A_A
Obtendo o erro AttributeError: module 'multiprocessing' has no attribute 'pool'no Python 3.8.0
Nyxynyx
@Nyxynyx Não se esqueçaimport multiprocessing.pool
Chris Arndt
22

O módulo de multiprocessamento tem uma interface agradável para usar pools com processos ou threads. Dependendo do seu caso de uso atual, você pode considerar o uso multiprocessing.pool.ThreadPoolpara seu Pool externo, o que resultará em threads (que permitem gerar processos de dentro) em vez de processos.

Pode ser limitado pelo GIL, mas no meu caso particular (eu testei ambos) , o tempo de inicialização dos processos externos, Poolconforme criado aqui, superou em muito a solução com ThreadPool.


É realmente fácil de trocar Processespara Threads. Leia mais sobre como usar uma ThreadPoolsolução aqui ou aqui .

timmwagener
fonte
Obrigado - isso me ajudou muito - ótimo uso de threading aqui (para gerar processos que realmente funcionam bem)
trance_dude
1
Para pessoas que procuram uma solução prática que provavelmente se aplica à sua situação, é esta.
abanana
6

Em algumas versões do Python substituindo Piscina padrão para personalizado pode levantar erro: AssertionError: group argument must be None for now.

Aqui encontrei uma solução que pode ajudar:

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, val):
        pass


class NoDaemonProcessPool(multiprocessing.pool.Pool):

    def Process(self, *args, **kwds):
        proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
        proc.__class__ = NoDaemonProcess

        return proc
Atterratio
fonte
4

concurrent.futures.ProcessPoolExecutornão tem essa limitação. Ele pode ter um pool de processos aninhados sem nenhum problema:

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'{pid()=} {i=} {square=}')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'{pid()=} {sum_squares=}')

if __name__ == "__main__":
    main()

O código de demonstração acima foi testado com Python 3.8.

Crédito: resposta por jfs

Acumenus
fonte
1
Esta agora é claramente a melhor solução, pois requer alterações mínimas.
DreamFlasher
1
funciona perfeitamente! ... como nota lateral, usar uma criança multiprocessing.Pooldentro de um ProcessPoolExecutor.Pooltambém é possível!
raphael
3

O problema que encontrei foi ao tentar importar globais entre módulos, fazendo com que a linha ProcessPool () fosse avaliada várias vezes.

globals.py

from processing             import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls,*args,**kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    __metaclass__ = SingletonMeta
    """     
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children

    The root cause is that importing this file from different modules causes this file to be reevalutated each time, 
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug    
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

Em seguida, importe com segurança de outro lugar em seu código

from globals import Globals
Globals().shared_manager      
Globals().shared_process_pool
Globals().shared_thread_pool  
Globals().shared_lock         
James McGuigan
fonte
2

Tenho visto pessoas lidando com esse problema usando celeryo fork do multiprocessingchamado billiard (extensões de pool de multiprocessamento), que permite que processos demoníacos gerem filhos. A solução é simplesmente substituir o multiprocessingmódulo por:

import billiard as multiprocessing
Tomasz Bartkowiak
fonte