Multiprocessamento: Como usar Pool.map em uma função definida em uma classe?

179

Quando executo algo como:

from multiprocessing import Pool

p = Pool(5)
def f(x):
     return x*x

p.map(f, [1,2,3])

Funciona bem. No entanto, colocando isso como uma função de uma classe:

class calculate(object):
    def run(self):
        def f(x):
            return x*x

        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

Dá-me o seguinte erro:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/sw/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Eu vi um post de Alex Martelli lidando com o mesmo tipo de problema, mas não era explícito o suficiente.

Mermoz
fonte
1
"isso como uma função de uma classe"? Você pode postar o código que realmente obtém o erro real. Sem o código real, podemos apenas adivinhar o que você está fazendo de errado.
31510 S.Lott
Como observação geral, existem módulos de decapagem mais poderosos que o módulo de decapagem padrão do Python (como o módulo picloud mencionado nesta resposta ).
precisa
1
Eu tive um problema semelhante com fechamentos IPython.Parallel, mas era possível contornar o problema empurrando os objetos para os nós. Parece muito chato contornar esse problema com o multiprocessamento.
Alex S
Aqui calculateé selecionável, então parece que isso pode ser resolvido por 1) criando um objeto de função com um construtor que copia sobre uma calculateinstância e 2) passando uma instância desse objeto de função para Poolo mapmétodo de. Não?
Rd11 17/07/2014
1
@ math Eu não acredito que nenhuma das "alterações recentes" do Python será de alguma ajuda. Algumas limitações do multiprocessingmódulo devem-se ao objetivo de ser uma implementação de plataforma cruzada e à falta de uma fork(2)chamada de sistema semelhante no Windows. Se você não se importa com o suporte ao Win32, pode haver uma solução alternativa baseada em processo mais simples. Ou, se você estiver preparado para usar threads em vez de processos, poderá substituí from multiprocessing import Pool-lo from multiprocessing.pool import ThreadPool as Pool.
Aya

Respostas:

69

Também fiquei irritado com as restrições sobre que tipo de funções o pool.map poderia aceitar. Eu escrevi o seguinte para contornar isso. Parece funcionar, mesmo para o uso recursivo do parmap.

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p, c) in pipe]

if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))
mrule
fonte
1
Isso funcionou muito bem para mim, obrigado. Eu encontrei um ponto fraco: tentei usar o parmap em algumas funções que contornavam um padrão e obtivemos o PicklingError novamente. Eu não descobri uma solução para isso, apenas refiz meu código para não usar o padrão.
sans
2
Isso não funciona no Python 2.7.2 (padrão, 12 de junho de 2011, 15:08:59) [MSC v.1500 32 bits (Intel)] no win32
ubershmekel
3
Isso funciona no Python 2.7.3 Aug 1,2012, 05:14:39. Isso não funciona em iterables gigantes -> causa um erro de OSE: [Erro 24] Muitos arquivos abertos devido ao número de pipes abertos.
Eiyrioü de Kauyf
Essa solução gera um processo para cada item de trabalho. A solução do "klaus se" abaixo é mais eficiente.
ypnos 12/07/2013
85

Não pude usar os códigos publicados até agora porque os códigos que usam "multiprocessing.Pool" não funcionam com expressões lambda e os códigos que não usam "multiprocessing.Pool" geram tantos processos quanto itens de trabalho.

Adaptei o código ao gerar uma quantidade predefinida de trabalhadores e apenas percorre a lista de entrada se houver um trabalhador ocioso. Também habilitei o modo "daemon" para os trabalhadores que o st ctrl-c trabalha conforme o esperado.

import multiprocessing


def fun(f, q_in, q_out):
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))


def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True
        p.start()

    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [q_in.put((None, None)) for _ in range(nprocs)]
    res = [q_out.get() for _ in range(len(sent))]

    [p.join() for p in proc]

    return [x for i, x in sorted(res)]


if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))
klaus se
fonte
2
Como você faria com que uma barra de progresso funcionasse corretamente com essa parmapfunção?
shockburner
2
Uma pergunta - usei essa solução, mas notei que os processos python que gerava permaneciam ativos na memória. Alguma reflexão rápida sobre como matá-los quando o seu parmap sair?
CompEcon
1
@ klaus-se Eu sei que estamos desencorajados de apenas agradecer nos comentários, mas sua resposta é muito valiosa para mim, eu não pude resistir. Eu gostaria de poder dar a você mais do que apenas uma reputação ...
deshtop 30/07/2015
2
@greole passando (None, None)como o último item indica funque atingiu o final da sequência de itens para cada processo.
precisa saber é o seguinte
4
@deshtop: você pode com uma recompensa, se você tem reputação suficiente a si mesmo :-)
Mark
57

O multiprocessamento e a decapagem são interrompidos e limitados, a menos que você pule para fora da biblioteca padrão.

Se você usar uma bifurcação de multiprocessingchamada pathos.multiprocesssing, poderá usar diretamente classes e métodos de classe nas mapfunções do multiprocessamento . Isso ocorre porque dillé usado em vez de pickleou cPicklee dillpode serializar quase tudo em python.

pathos.multiprocessingtambém fornece uma função de mapa assíncrona ... e pode mapfuncionar com vários argumentos (por exemplo map(math.pow, [1,2,3], [4,5,6]))

Veja as discussões: O que o multiprocessamento e o endro podem fazer juntos?

e: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

Ele até lida com o código que você escreveu inicialmente, sem modificações, e com o intérprete. Por que qualquer outra coisa mais frágil e específica para um único caso?

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
...  def run(self):
...   def f(x):
...    return x*x
...   p = Pool()
...   return p.map(f, [1,2,3])
... 
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]

Obtenha o código aqui: https://github.com/uqfoundation/pathos

E, apenas para mostrar um pouco mais do que ele pode fazer:

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> p = Pool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]
Mike McKerns
fonte
1
O pathos.multiprocessing também possui um mapa assíncrono ( amap) que permite o uso de barras de progresso e outras programações assíncronas.
Mike McKerns
Eu gosto do pathos.multiprocessing, que pode servir quase uma substituição imediata de mapa não paralelo enquanto desfruta do multiprocessamento. Eu tenho um invólucro simples de pathos.multiprocessing.map, para que seja mais eficiente em termos de memória ao processar uma estrutura de dados grande somente leitura em vários núcleos, consulte este repositório git .
Fashandge
Parece interessante, mas não é instalado. Este é o pip mensagem dá:Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)
xApple
1
Sim. Não lanço há algum tempo, pois dividi a funcionalidade em pacotes separados e também converti em código compatível com 2/3. Muito do acima foi modularizado, multiprocesssendo compatível com 2/3. Consulte stackoverflow.com/questions/27873093/… e pypi.python.org/pypi/multiprocess .
Mike McKerns
3
@xApple: Assim como um acompanhamento, pathosteve uma nova versão estável e também é compatível com as versões 2.xe 3.x.
Mike McKerns
40

Atualmente, não há solução para o seu problema, até onde eu saiba: a função que você atribui map()deve estar acessível através da importação do seu módulo. É por isso que o código de robert funciona: a função f()pode ser obtida importando o seguinte código:

def f(x):
    return x*x

class Calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])

if __name__ == '__main__':
    cl = Calculate()
    print cl.run()

Na verdade, adicionei uma seção "principal", porque segue as recomendações para a plataforma Windows ("Verifique se o módulo principal pode ser importado com segurança por um novo interpretador Python sem causar efeitos colaterais indesejados").

Também adicionei uma letra maiúscula na frente Calculatepara seguir o PEP 8 . :)

Eric O Lebigot
fonte
18

A solução por mrule está correta, mas possui um erro: se a criança enviar uma grande quantidade de dados, poderá preencher o buffer do pipe, bloqueando a criança pipe.send(), enquanto o pai estiver aguardando a saída da criança pipe.join(). A solução é ler os dados da criança antes de join()a criança. Além disso, a criança deve fechar a extremidade do cano dos pais para evitar um conflito. O código abaixo corrige isso. Lembre-se também de que isso parmapcria um processo por elemento no X. Uma solução mais avançada é usar multiprocessing.cpu_count()para dividir Xem vários pedaços e mesclar os resultados antes de retornar. Deixo isso como um exercício para o leitor, a fim de não estragar a concisão da bela resposta da mrule. ;)

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(ppipe, cpipe,x):
        ppipe.close()
        cpipe.send(f(x))
        cpipe.close()
    return fun

def parmap(f,X):
    pipe=[Pipe() for x in X]
    proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)]
    [p.start() for p in proc]
    ret = [p.recv() for (p,c) in pipe]
    [p.join() for p in proc]
    return ret

if __name__ == '__main__':
    print parmap(lambda x:x**x,range(1,5))
Bob McElrath
fonte
Como você escolhe o número de processos?
precisa saber é o seguinte
No entanto, morre muito rapidamente por causa do erro OSError: [Errno 24] Too many open files. Eu acho que é necessário que haja algum tipo de limites sobre o número de processos para que ele funcione adequadamente ...
patapouf_ai
13

Eu também lutei com isso. Eu tinha funções como membros de dados de uma classe, como um exemplo simplificado:

from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # Needed to do something like this (the following line won't work)
        return pool.map(self.f,list1,list2)  

Eu precisava usar a função self.f em uma chamada Pool.map () de dentro da mesma classe e self.f não recebeu uma tupla como argumento. Como essa função foi incorporada em uma classe, não ficou claro para mim como escrever o tipo de invólucro sugerido por outras respostas.

Resolvi esse problema usando um wrapper diferente que usa uma tupla / lista, em que o primeiro elemento é a função e os elementos restantes são os argumentos dessa função, chamados eval_func_tuple (f_args). Usando isso, a linha problemática pode ser substituída por pool.map de retorno (eval_func_tuple, itertools.izip (itertools.repeat (self.f), list1, list2)). Aqui está o código completo:

Arquivo: util.py

def add(a, b): return a+b

def eval_func_tuple(f_args):
    """Takes a tuple of a function and args, evaluates and returns result"""
    return f_args[0](*f_args[1:])  

Arquivo: main.py

from multiprocessing import Pool
import itertools
import util  

pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # The following line will now work
        return pool.map(util.eval_func_tuple, 
            itertools.izip(itertools.repeat(self.f), list1, list2)) 

if __name__ == '__main__':
    myExample = Example(util.add)
    list1 = [1, 2, 3]
    list2 = [10, 20, 30]
    print myExample.add_lists(list1, list2)  

Rodar main.py dará [11, 22, 33]. Fique à vontade para melhorar isso, por exemplo, eval_func_tuple também pode ser modificado para receber argumentos de palavras-chave.

Em outra nota, em outras respostas, a função "parmap" pode ser mais eficiente para o caso de mais Processos do que o número de CPUs disponíveis. Estou copiando uma versão editada abaixo. Este é o meu primeiro post e não tinha certeza se deveria editar diretamente a resposta original. Também renomei algumas variáveis.

from multiprocessing import Process, Pipe  
from itertools import izip  

def spawn(f):  
    def fun(pipe,x):  
        pipe.send(f(x))  
        pipe.close()  
    return fun  

def parmap(f,X):  
    pipe=[Pipe() for x in X]  
    processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]  
    numProcesses = len(processes)  
    processNum = 0  
    outputList = []  
    while processNum < numProcesses:  
        endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)  
        for proc in processes[processNum:endProcessNum]:  
            proc.start()  
        for proc in processes[processNum:endProcessNum]:  
            proc.join()  
        for proc,c in pipe[processNum:endProcessNum]:  
            outputList.append(proc.recv())  
        processNum = endProcessNum  
    return outputList    

if __name__ == '__main__':  
    print parmap(lambda x:x**x,range(1,5))         
Brandt
fonte
8

Peguei a resposta de klaus se e aganders3 e criei um módulo documentado que é mais legível e contém um arquivo. Você pode apenas adicioná-lo ao seu projeto. Ele ainda tem uma barra de progresso opcional!

"""
The ``processes`` module provides some convenience functions
for using parallel processes in python.

Adapted from http://stackoverflow.com/a/16071616/287297

Example usage:

    print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True)

Comments:

"It spawns a predefined amount of workers and only iterates through the input list
 if there exists an idle worker. I also enabled the "daemon" mode for the workers so
 that KeyboardInterupt works as expected."

Pitfalls: all the stdouts are sent back to the parent stdout, intertwined.

Alternatively, use this fork of multiprocessing: 
https://github.com/uqfoundation/multiprocess
"""

# Modules #
import multiprocessing
from tqdm import tqdm

################################################################################
def apply_function(func_to_apply, queue_in, queue_out):
    while not queue_in.empty():
        num, obj = queue_in.get()
        queue_out.put((num, func_to_apply(obj)))

################################################################################
def prll_map(func_to_apply, items, cpus=None, verbose=False):
    # Number of processes to use #
    if cpus is None: cpus = min(multiprocessing.cpu_count(), 32)
    # Create queues #
    q_in  = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    # Process list #
    new_proc  = lambda t,a: multiprocessing.Process(target=t, args=a)
    processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)]
    # Put all the items (objects) in the queue #
    sent = [q_in.put((i, x)) for i, x in enumerate(items)]
    # Start them all #
    for proc in processes:
        proc.daemon = True
        proc.start()
    # Display progress bar or not #
    if verbose:
        results = [q_out.get() for x in tqdm(range(len(sent)))]
    else:
        results = [q_out.get() for x in range(len(sent))]
    # Wait for them to finish #
    for proc in processes: proc.join()
    # Return results #
    return [x for i, x in sorted(results)]

################################################################################
def test():
    def slow_square(x):
        import time
        time.sleep(2)
        return x**2
    objs    = range(20)
    squares = prll_map(slow_square, objs, 4, verbose=True)
    print "Result: %s" % squares

EDIT : adicionada sugestão @ alexander-mcfarlane e uma função de teste

xApple
fonte
um problema com sua barra de progresso ... A barra mede apenas quão ineficientemente a carga de trabalho foi dividida entre os processadores. Se a carga de trabalho estiver perfeitamente dividida, todos os processadores farão join()ao mesmo tempo e você terá apenas um flash 100%concluído no tqdmvisor. A única vez que ele será útil é se cada processador tem uma carga de trabalho tendenciosa
Alexander McFarlane
1
mover tqdm()para embrulhar a linha: result = [q_out.get() for _ in tqdm(sent)]e ele funciona muito melhor - grande esforço que realmente aprecio isso para +1
Alexander McFarlane
Obrigado por esse conselho, vou tentar e atualizar a resposta!
XApple
A resposta é atualizada e a barra de progresso funciona muito melhor!
xApple
8

Sei que isso foi perguntado há mais de 6 anos, mas só queria adicionar minha solução, pois algumas das sugestões acima parecem terrivelmente complicadas, mas minha solução era realmente muito simples.

Tudo o que eu precisava fazer era agrupar a chamada pool.map () para uma função auxiliar. Passando o objeto de classe junto com args para o método como uma tupla, que se parecia um pouco com isso.

def run_in_parallel(args):
    return args[0].method(args[1])

myclass = MyClass()
method_args = [1,2,3,4,5,6]
args_map = [ (myclass, arg) for arg in method_args ]
pool = Pool()
pool.map(run_in_parallel, args_map)
nightowl
fonte
7

As funções definidas nas classes (mesmo dentro das funções dentro das classes) realmente não funcionam. No entanto, isso funciona:

def f(x):
    return x*x

class calculate(object):
    def run(self):
        p = Pool()
    return p.map(f, [1,2,3])

cl = calculate()
print cl.run()
Robert
fonte
15
obrigado, mas acho um pouco sujo definir a função fora da classe. A classe deve agrupar tudo o que precisa para realizar uma determinada tarefa.
21810 Mermoz #
3
@Memoz: "A classe deve agrupar tudo o que precisa" Sério? Não consigo encontrar muitos exemplos disso. A maioria das classes depende de outras classes ou funções. Por que chamar uma dependência de classe de "suja"? O que há de errado com uma dependência?
S.Lott
Bem, a função não deve modificar dados de classe existentes - porque modificaria a versão no outro processo - para que pudesse ser um método estático. Você pode escolher um método estático: stackoverflow.com/questions/1914261/… Ou, para algo tão trivial, você pode usar um lambda.
22710 Robert
6

Sei que essa pergunta foi feita há 8 anos e 10 meses, mas quero apresentar a minha solução:

from multiprocessing import Pool

class Test:

    def __init__(self):
        self.main()

    @staticmethod
    def methodForMultiprocessing(x):
        print(x*x)

    def main(self):
        if __name__ == "__main__":
            p = Pool()
            p.map(Test.methodForMultiprocessing, list(range(1, 11)))
            p.close()

TestObject = Test()

Você só precisa transformar sua classe em um método estático. Mas também é possível com um método de classe:

from multiprocessing import Pool

class Test:

    def __init__(self):
        self.main()

    @classmethod
    def methodForMultiprocessing(cls, x):
        print(x*x)

    def main(self):
        if __name__ == "__main__":
            p = Pool()
            p.map(Test.methodForMultiprocessing, list(range(1, 11)))
            p.close()

TestObject = Test()

Testado em Python 3.7.3

TornaxO7
fonte
3

Modifiquei o método do klaus se porque, embora estivesse trabalhando para mim com pequenas listas, ele seria interrompido quando o número de itens fosse ~ 1000 ou superior. Em vez de enviar os trabalhos um de cada vez com a Nonecondição de parada, carrego a fila de entrada de uma só vez e deixo os processos consumir até ficar vazio.

from multiprocessing import cpu_count, Queue, Process

def apply_func(f, q_in, q_out):
    while not q_in.empty():
        i, x = q_in.get()
        q_out.put((i, f(x)))

# map a function using a pool of processes
def parmap(f, X, nprocs = cpu_count()):
    q_in, q_out   = Queue(), Queue()
    proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)]
    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [p.start() for p in proc]
    res = [q_out.get() for _ in sent]
    [p.join() for p in proc]

    return [x for i,x in sorted(res)]

Edit: infelizmente agora estou encontrando este erro no meu sistema: O limite de tamanho máximo da fila de multiprocessamento é 32767 , espero que as soluções alternativas lá ajudem.

aganders3
fonte
1

Você pode executar seu código sem problemas se, de alguma forma, ignorar manualmente o Poolobjeto da lista de objetos da classe, porque ele não é picklecapaz, como diz o erro. Você pode fazer isso com a __getstate__função (veja aqui também) como segue. O Poolobjeto vai tentar encontrar os __getstate__e __setstate__funções e executá-los se o considerar quando você executar map, map_asyncetc:

class calculate(object):
    def __init__(self):
        self.p = Pool()
    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['p']
        return self_dict
    def __setstate__(self, state):
        self.__dict__.update(state)

    def f(self, x):
        return x*x
    def run(self):
        return self.p.map(self.f, [1,2,3])

Então faça:

cl = calculate()
cl.run()

lhe dará a saída:

[1, 4, 9]

Eu testei o código acima no Python 3.xe funciona.

Amir
fonte
0

Não tenho certeza se essa abordagem foi adotada, mas uma solução alternativa que estou usando é:

from multiprocessing import Pool

t = None

def run(n):
    return t.f(n)

class Test(object):
    def __init__(self, number):
        self.number = number

    def f(self, x):
        print x * self.number

    def pool(self):
        pool = Pool(2)
        pool.map(run, range(10))

if __name__ == '__main__':
    t = Test(9)
    t.pool()
    pool = Pool(2)
    pool.map(run, range(10))

A saída deve ser:

0
9
18
27
36
45
54
63
72
81
0
9
18
27
36
45
54
63
72
81
CpILL
fonte
0
class Calculate(object):
  # Your instance method to be executed
  def f(self, x, y):
    return x*y

if __name__ == '__main__':
  inp_list = [1,2,3]
  y = 2
  cal_obj = Calculate()
  pool = Pool(2)
  results = pool.map(lambda x: cal_obj.f(x, y), inp_list)

Existe a possibilidade de você querer aplicar esta função para cada instância diferente da classe. Então aqui está a solução para isso também

class Calculate(object):
  # Your instance method to be executed
  def __init__(self, x):
    self.x = x

  def f(self, y):
    return self.x*y

if __name__ == '__main__':
  inp_list = [Calculate(i) for i in range(3)]
  y = 2
  pool = Pool(2)
  results = pool.map(lambda x: x.f(y), inp_list)
ShikharDua
fonte
0

Aqui está a minha solução, que eu acho que é um pouco menos burra do que a maioria das outras aqui. É semelhante à resposta do nightowl.

someclasses = [MyClass(), MyClass(), MyClass()]

def method_caller(some_object, some_method='the method'):
    return getattr(some_object, some_method)()

othermethod = partial(method_caller, some_method='othermethod')

with Pool(6) as pool:
    result = pool.map(othermethod, someclasses)
Erlend Aune
fonte
0

De http://www.rueckstiess.net/research/snippets/show/ca1d7d90 e http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html

Podemos criar uma função externa e propagá-la com o auto-objeto da classe:

from joblib import Parallel, delayed
def unwrap_self(arg, **kwarg):
    return square_class.square_int(*arg, **kwarg)

class square_class:
    def square_int(self, i):
        return i * i

    def run(self, num):
        results = []
        results = Parallel(n_jobs= -1, backend="threading")\
            (delayed(unwrap_self)(i) for i in zip([self]*len(num), num))
        print(results)

OU sem joblib:

from multiprocessing import Pool
import time

def unwrap_self_f(arg, **kwarg):
    return C.f(*arg, **kwarg)

class C:
    def f(self, name):
        print 'hello %s,'%name
        time.sleep(5)
        print 'nice to meet you.'

    def run(self):
        pool = Pool(processes=2)
        names = ('frank', 'justin', 'osi', 'thomas')
        pool.map(unwrap_self_f, zip([self]*len(names), names))

if __name__ == '__main__':
    c = C()
    c.run()
Bob Baxley
fonte
0

Esta pode não ser uma solução muito boa, mas no meu caso, eu a resolvo assim.

from multiprocessing import Pool

def foo1(data):
    self = data.get('slf')
    lst = data.get('lst')
    return sum(lst) + self.foo2()

class Foo(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def foo2(self):
        return self.a**self.b   

    def foo(self):
        p = Pool(5)
        lst = [1, 2, 3]
        result = p.map(foo1, (dict(slf=self, lst=lst),))
        return result

if __name__ == '__main__':
    print(Foo(2, 4).foo())

Eu tive que passar selfpara a minha função, pois tenho que acessar atributos e funções da minha classe através dessa função. Isso está funcionando para mim. Correções e sugestões são sempre bem-vindas.

Muhammad Hassan
fonte
0

Aqui está um boilerplate que escrevi para usar o pool de multiprocessamento em python3, especificamente python3.7.7 foi usado para executar os testes. Eu consegui minhas corridas mais rápidas usando imap_unordered. Basta conectar seu cenário e experimentá-lo. Você pode usar timeitou apenas time.time()descobrir o que funciona melhor para você.

import multiprocessing
import time

NUMBER_OF_PROCESSES = multiprocessing.cpu_count()
MP_FUNCTION = 'starmap'  # 'imap_unordered' or 'starmap' or 'apply_async'

def process_chunk(a_chunk):
    print(f"processig mp chunk {a_chunk}")
    return a_chunk


map_jobs = [1, 2, 3, 4]

result_sum = 0

s = time.time()
if MP_FUNCTION == 'imap_unordered':
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
    for i in pool.imap_unordered(process_chunk, map_jobs):
        result_sum += i
elif MP_FUNCTION == 'starmap':
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
    try:
        map_jobs = [(i, ) for i in map_jobs]
        result_sum = pool.starmap(process_chunk, map_jobs)
        result_sum = sum(result_sum)
    finally:
        pool.close()
        pool.join()
elif MP_FUNCTION == 'apply_async':
    with multiprocessing.Pool(processes=NUMBER_OF_PROCESSES) as pool:
        result_sum = [pool.apply_async(process_chunk, [i, ]).get() for i in map_jobs]
    result_sum = sum(result_sum)
print(f"result_sum is {result_sum}, took {time.time() - s}s")

No cenário acima, imap_unorderedna verdade, parece ter o pior desempenho para mim. Experimente o seu caso e compare-o na máquina em que planeja executá-lo. Leia também sobre Process Pools . Felicidades!

Radtek
fonte