Não é possível selecionar <type 'instancemethod'> ao usar o multiprocessamento Pool.map ()

218

Estou tentando usar multiprocessinga Pool.map()função de dividir o trabalho simultaneamente. Quando uso o seguinte código, ele funciona bem:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

No entanto, quando eu o uso em uma abordagem mais orientada a objetos, ele não funciona. A mensagem de erro exibida é:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

Isso ocorre quando o seguinte é o meu programa principal:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

e a seguinte é a minha someClassturma:

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

Alguém sabe qual poderia ser o problema ou uma maneira fácil de contornar isso?

ventolin
fonte
4
se f é uma função aninhada, há um erro semelhantePicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed
ggg 09/02

Respostas:

122

O problema é que o multiprocessamento deve separar as coisas para separá-las entre os processos, e os métodos vinculados não são selecionáveis. A solução alternativa (se você considera "fácil" ou não ;-) é adicionar a infraestrutura ao seu programa para permitir a seleção de tais métodos, registrando-a no método de biblioteca padrão copy_reg .

Por exemplo, a contribuição de Steven Bethard para esse thread (no final do thread) mostra uma abordagem perfeitamente viável para permitir a seleção / remoção de pickp do método copy_reg.

Alex Martelli
fonte
Isso é ótimo, obrigado. Parece ter progredido de alguma maneira, de qualquer maneira: Usando o código em pastebin.ca/1693348 Agora recebo um RuntimeError: profundidade máxima de recursão excedida. Olhei em volta e uma postagem no fórum recomendou aumentar a profundidade máxima para 1500 (a partir do padrão 1000), mas não tive alegria por lá. Para ser sincero, não consigo ver qual parte (do meu código, pelo menos) pode estar se descontrolando, a menos que, por algum motivo, o código esteja decapando e descolando em um loop, devido a pequenas alterações que eu fiz para fazer Código de Steven OO'd?
ventolin
1
Seus _pickle_methodretornos self._unpickle_method, um método vinculado; então é claro que pickle agora tenta escolher ISSO - e faz o que você pediu: ligando _pickle_methodrecursivamente. Ou seja, ao OOinserir o código dessa maneira, você inevitavelmente introduziu uma recursão infinita. Sugiro voltar ao código de Steven (e não adorar no altar do OO quando não for apropriado: muitas coisas em Python são melhor executadas de maneira mais funcional, e essa é uma).
Alex Martelli
15
Para o super super preguiçoso , consulte a única resposta que deu ao trabalho de postar o código não-mutilado real ...
Cerin
2
Outra forma de correção / contornar o problema de decapagem está usando endro, veja a minha resposta stackoverflow.com/questions/8804830/...
rocksportrocker
74

Todas essas soluções são feias porque o multiprocessamento e a decapagem são interrompidos e limitados, a menos que você pule para fora da biblioteca padrão.

Se você usar uma bifurcação de multiprocessingchamada pathos.multiprocesssing, poderá usar diretamente classes e métodos de classe nas mapfunções do multiprocessamento . Isso ocorre porque dillé usado em vez de pickleou cPicklee dillpode serializar quase tudo em python.

pathos.multiprocessingtambém fornece uma função de mapa assíncrona ... e pode mapfuncionar com vários argumentos (por exemplo map(math.pow, [1,2,3], [4,5,6]))

Veja: O que o multiprocessamento e o endro podem fazer juntos?

e: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

E apenas para ser explícito, você pode fazer exatamente o que deseja fazer em primeiro lugar, e pode fazê-lo a partir do intérprete, se quiser.

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

Obtenha o código aqui: https://github.com/uqfoundation/pathos

Mike McKerns
fonte
3
Você pode atualizar esta resposta com base no pathos.pp porque o pathos.multiprocessing não existe mais?
Saheel Godhane
10
Eu sou o pathosautor. A versão a que você está se referindo tem vários anos. Experimente a versão no github, Você pode usar pathos.ppou github.com/uqfoundation/ppft .
precisa saber é o seguinte
1
ou github.com/uqfoundation/pathos . @SaheelGodhane: Uma nova versão está atrasada, mas deve sair em breve.
Mike McKerns
3
Primeiro pip install setuptoolsentão pip install git+https://github.com/uqfoundation/pathos.git@master. Isso obterá as dependências apropriadas. Uma nova versão está quase pronta ... agora quase tudo pathostambém roda no Windows e é 3.xcompatível.
Mike McKerns
1
@Rika: Sim. mapas de bloqueio, iterativo e assíncrono estão disponíveis.
Mike McKerns
35

Você também pode definir um __call__()método dentro do seu someClass(), que chama someClass.go()e depois passa uma instância someClass()para o pool. Este objeto é selecionável e funciona bem (para mim) ...

dorvak
fonte
3
Isso é muito mais fácil do que a técnica proposta por Alex Martelli, mas você está limitado a enviar apenas um método por classe para seu pool de multiprocessamento.
preterido
6
Um outro detalhe a ter em mente é que é apenas o objeto (instância de classe) que é captado, não a própria classe. Portanto, se você alterou algum atributo de classe a partir de seus valores padrão, essas alterações não serão propagadas para os diferentes processos. A solução alternativa é garantir que tudo que sua função precisa seja armazenado como um atributo de instância.
preterido
2
@dorvak você poderia mostrar um exemplo simples __call__()? Acho que a sua resposta pode ser a mais limpa - estou lutando para entender esse erro e, pela primeira vez, venho atender. A propósito, também esta resposta ajuda a esclarecer o que o multiprocessamento faz: [ stackoverflow.com/a/20789937/305883] #
user305883 14/16
1
Você pode dar um exemplo disso?
frmsaul
1
Há uma nova resposta postada (atualmente abaixo desta) com um código de exemplo para isso.
Aaron
22

Algumas limitações à solução de Steven Bethard:

Quando você registra seu método de classe como uma função, surpreendentemente o destruidor de sua classe é chamado toda vez que o processamento do método é concluído. Portanto, se você tiver uma instância da sua classe que chama n vezes seu método, os membros podem desaparecer entre duas execuções e você pode receber uma mensagem malloc: *** error for object 0x...: pointer being freed was not allocated(por exemplo, abrir arquivo de membro) ou pure virtual method called, terminate called without an active exception(o que significa que o tempo de vida de um objeto de membro que usei foi menor que o que eu pensei). Eu consegui isso ao lidar com n maior que o tamanho da piscina. Aqui está um pequeno exemplo:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

Resultado:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

O __call__método não é tão equivalente, porque [Nenhum, ...] é lido a partir dos resultados:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

Portanto, nenhum dos dois métodos é satisfatório ...

Eric H.
fonte
7
Você Nonevolta porque sua definição de __call__está faltando o return: deveria ser return self.process_obj(i).
Torek
1
@Eric eu estava recebendo o mesmo erro e eu tentei esta solução, no entanto eu comecei a receber novo erro como "cPickle.PicklingError: Não é possível pickle < 'função' type>: atributo de pesquisa embutido .function falhou". Você sabe o que pode ser uma provável razão por trás disso?
Naman
15

Há outro atalho que você pode usar, embora possa ser ineficiente, dependendo do que está em suas instâncias de classe.

Como todos disseram, o problema é que o multiprocessingcódigo precisa selecionar as coisas que envia para os subprocessos que foram iniciados, e o selecionador não usa métodos de instância.

No entanto, em vez de enviar o método da instância, você pode enviar a instância da classe real, mais o nome da função a ser chamada, para uma função comum que então usa getattrpara chamar o método da instância, criando o método vinculado no Poolsubprocesso. Isso é semelhante à definição de um __call__método, exceto que você pode chamar mais de uma função de membro.

Roubar o código do @ EricH. Da sua resposta e anotá-lo um pouco (redigitei-o, portanto, todo o nome muda e tal, por algum motivo isso parecia mais fácil do que copiar e colar :-)) para ilustrar toda a mágica:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

A saída mostra que, de fato, o construtor é chamado uma vez (no pid original) e o destruidor é chamado 9 vezes (uma vez para cada cópia feita = 2 ou 3 vezes por processo de trabalho conjunto, conforme necessário, mais uma vez no original processo). Isso geralmente é bom, como neste caso, já que o seletor padrão faz uma cópia de toda a instância e (semi-) a preenche secretamente - nesse caso, fazendo:

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

- é por isso que, embora o destruidor seja chamado oito vezes nos três processos de trabalho, ele é reduzido de 1 a 0 a cada vez - mas é claro que você ainda pode ter problemas dessa maneira. Se necessário, você pode fornecer o seu próprio __setstate__:

    def __setstate__(self, adict):
        self.count = adict['count']

neste caso, por exemplo.

torek
fonte
1
Esta é de longe a melhor resposta para este problema, porque é o mais fácil de aplicar para o comportamento padrão não-pickle capaz
Matt Taylor
12

Você também pode definir um __call__()método dentro do seu someClass(), que chama someClass.go()e depois passa uma instância someClass()para o pool. Este objeto é selecionável e funciona bem (para mim) ...

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()
parisjohn
fonte
3

A solução da parisjohn acima funciona bem comigo. Além disso, o código parece limpo e fácil de entender. No meu caso, existem algumas funções para chamar usando Pool, então modifiquei o código de parisjohn um pouco abaixo. Eu fiz call para poder chamar várias funções, e os nomes das funções são passados ​​no argumento dict de go():

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()
neobot
fonte
1

Uma solução potencialmente trivial para isso é mudar para o uso multiprocessing.dummy . Esta é uma implementação baseada em thread da interface de multiprocessamento que parece não ter esse problema no Python 2.7. Não tenho muita experiência aqui, mas essa mudança rápida de importação me permitiu chamar apply_async em um método de classe.

Alguns bons recursos sobre multiprocessing.dummy:

https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one-line/

David Parks
fonte
1

Nesse caso simples, onde someClass.fnão está herdando nenhum dado da classe e não anexando nada à classe, uma solução possível seria separá- fla para que ela possa ser decapada:

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))
mhh
fonte
1

Por que não usar func separado?

def func(*args, **kwargs):
    return inst.method(args, kwargs)

print pool.map(func, arr)
0script0
fonte
1

Corri para o mesmo problema, mas descobri que existe um codificador JSON que pode ser usado para mover esses objetos entre processos.

from pyVmomi.VmomiSupport import VmomiJSONEncoder

Use isto para criar sua lista:

jsonSerialized = json.dumps(pfVmomiObj, cls=VmomiJSONEncoder)

Em seguida, na função mapeada, use isso para recuperar o objeto:

pfVmomiObj = json.loads(jsonSerialized)
George
fonte
0

Atualização: no dia em que este artigo foi escrito, os nomeadosTuples são selecionáveis ​​(começando com python 2.7)

O problema aqui é que os processos filho não conseguem importar a classe do objeto - nesse caso, a classe P -, no caso de um projeto de modelo múltiplo, a classe P deve ser importável em qualquer lugar em que o processo filho seja usado

uma solução rápida é torná-lo importável, afetando-o para globais ()

globals()["P"] = P
rachid el kedmiri
fonte