Multiprocessamento Python pool.map para vários argumentos

536

Na biblioteca de multiprocessamento Python, existe uma variante de pool.map que suporta vários argumentos?

text = "test"
def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text,case),case, 1)
    pool.close()
    pool.join()
user642897
fonte
4
Para minha surpresa, eu não poderia fazer partialnem lambdafazer isso. Eu acho que tem a ver com a maneira estranha como as funções são passadas para os subprocessos (via pickle).
Senderle
10
@senderle: Este é um bug no Python 2.6, mas foi corrigido a partir de 2,7: bugs.python.org/issue5228
unutbu
1
Basta substituir pool.map(harvester(text,case),case, 1) por: pool.apply_async(harvester(text,case),case, 1)
Tung Nguyen
3
@Syrtis_Major, não edite perguntas do OP que distorcem efetivamente as respostas que foram fornecidas anteriormente. Adicionando returnà harvester()resposta do @senderie transformada em imprecisa. Isso não ajuda futuros leitores.
Ricalsin 29/01
1
Eu diria que a solução mais fácil seria empacotar todos os argumentos em uma tupla e descompactá-los na função em execução. Fiz isso quando precisei enviar vários argumentos complicados para uma função sendo executada por um conjunto de processos.
HS Rathore

Respostas:

358

A resposta para isso é dependente da versão e da situação. A resposta mais geral para versões recentes do Python (desde 3.3) foi descrita pela primeira vez abaixo por JF Sebastian . 1 Ele usa o Pool.starmapmétodo, que aceita uma sequência de tuplas de argumento. Em seguida, descompacta automaticamente os argumentos de cada tupla e os passa para a função fornecida:

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

Para versões anteriores do Python, você precisará escrever uma função auxiliar para descompactar os argumentos explicitamente. Se você deseja usar with, também precisará escrever um wrapper para se transformar Poolem um gerenciador de contexto. (Obrigado ao muon por apontar isso.)

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

Em casos mais simples, com um segundo argumento fixo, você também pode usar partial, mas apenas no Python 2.7+.

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

1. Muito disso foi inspirado em sua resposta, que provavelmente deveria ter sido aceita. Mas como este está preso no topo, parecia melhor melhorá-lo para futuros leitores.

remetente
fonte
Parece-me que RAW_DATASET neste caso deve ser uma variável global? Enquanto eu quero que o column_harvester parcial altere o valor de maiúsculas e minúsculas em todas as chamadas de harvester (). Como conseguir isso?
Xgdgsc
A coisa mais importante aqui é atribuir =RAW_DATASETvalor padrão a case. Caso contrário, pool.mapirá confundir os vários argumentos.
Emerson Xu
1
Estou confuso, o que aconteceu com a textvariável no seu exemplo? Por que RAW_DATASETaparentemente é passado duas vezes. Eu acho que você pode ter um erro de digitação?
Dave
não tenho certeza por que usar with .. as .. me dá AttributeError: __exit__, mas funciona bem se eu apenas chamar pool = Pool();em seguida, feche manualmente pool.close()(python2.7)
múon
1
@ Muu, boa captura. Parece que os Poolobjetos não se tornam gerenciadores de contexto até o Python 3.3. Eu adicionei uma função de wrapper simples que retorna um Poolgerenciador de contexto.
Senderle #
501

existe uma variante do pool.map que suporta vários argumentos?

Python 3.3 inclui o pool.starmap()método :

#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    return a + b

def main():
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__=="__main__":
    freeze_support()
    main()

Para versões mais antigas:

#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support

def func(a, b):
    print a, b

def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)

def main():
    pool = Pool()
    a_args = [1,2,3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))

if __name__=="__main__":
    freeze_support()
    main()

Resultado

1 1
2 1
3 1

Observe como itertools.izip()e itertools.repeat()são usados ​​aqui.

Devido ao bug mencionado por @unutbu, você não pode usar functools.partial()recursos similares no Python 2.6, portanto, a função simples do wrapper func_star()deve ser definida explicitamente. Consulte também a solução alternativa sugerida poruptimebox .

jfs
fonte
1
F .: Você pode descompactar o tuple argumento na assinatura da func_starseguinte forma: def func_star((a, b)). Obviamente, isso funciona apenas para um número fixo de argumentos, mas se esse é o único caso que ele tem, é mais legível.
Björn Pollex 26/03
1
@ Space_C0wb0y: a f((a,b))sintaxe foi descontinuada e removida no py3k. E é desnecessário aqui.
JFS
talvez mais Python: func = lambda x: func(*x)em vez de definir uma função wrapper
dylam
1
@ zthomas.nc esta questão é sobre como suportar vários argumentos para o multiprocessamento pool.map. Se quiser saber como chamar um método em vez de uma função em um processo Python diferente via multiprocessamento, faça uma pergunta separada (se tudo mais falhar, você sempre poderá criar uma função global que agrupe a chamada de método semelhante à func_star()acima)
jfs
1
Eu gostaria que houvesse starstarmap.
Jun
141

Eu acho que o abaixo será melhor

def multi_run_wrapper(args):
   return add(*args)
def add(x,y):
    return x+y
if __name__ == "__main__":
    from multiprocessing import Pool
    pool = Pool(4)
    results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
    print results

resultado

[3, 5, 7]
imotai
fonte
16
Solução mais fácil. Há uma pequena otimização; remova a função wrapper e descompacte-a argsdiretamente add, ela funciona para qualquer número de argumentos:def add(args): (x,y) = args
Ahmed
1
você também pode usar uma lambdafunção em vez de definirmulti_run_wrapper(..)
Andre Holzner
2
hm ... na verdade, usando um lambdanão funciona porque pool.map(..)tenta pickle a função dada
Andre Holzner
Como você usa isso se deseja armazenar o resultado de adduma lista?
Vivek Subramanian
@ Ahmed Eu gosto de como é, porque IMHO a chamada do método deve falhar, sempre que o número de parâmetro não estiver correto.
Michael Dorner
56

Usando Python 3.3+ compool.starmap():

from multiprocessing.dummy import Pool as ThreadPool 

def write(i, x):
    print(i, "---", x)

a = ["1","2","3"]
b = ["4","5","6"] 

pool = ThreadPool(2)
pool.starmap(write, zip(a,b)) 
pool.close() 
pool.join()

Resultado:

1 --- 4
2 --- 5
3 --- 6

Você também pode compactar () mais argumentos se desejar: zip(a,b,c,d,e)

Caso você queira que um valor constante seja passado como argumento, você deve usar import itertoolse, zip(itertools.repeat(constant), a)em seguida, por exemplo.

user136036
fonte
2
Esta é uma resposta duplicada quase exata como a de @JFSebastian em 2011 (com mais de 60 votos).
Mike McKerns
29
Não. Antes de tudo, ele removeu muitas coisas desnecessárias e afirma claramente que é para o python 3.3+ e se destina a iniciantes que buscam uma resposta simples e limpa. Como iniciante, levei algum tempo para entender dessa maneira (sim, com as postagens do JFSebastians) e foi por isso que escrevi minha postagem para ajudar outros iniciantes, porque a postagem dele simplesmente dizia "existe um mapa de estrelas", mas não explicava - isso é o que meu post pretende. Portanto, não há absolutamente nenhuma razão para me criticar com dois votos negativos.
user136036
Em 2011, não havia "+" no python 3.3 + ... então, obviamente.
Mike McKerns
27

Tendo aprendido sobre itertools na resposta do JF Sebastian , decidi dar um passo adiante e escrever um parmappacote que cuida da paralelização, oferta mape starmapfunções no python-2.7 e python-3.2 (e mais tarde também) que podem receber qualquer número de argumentos posicionais .

Instalação

pip install parmap

Como paralelizar:

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

Fiz upload do parmap no PyPI e em um repositório do github .

Como exemplo, a pergunta pode ser respondida da seguinte maneira:

import parmap

def harvester(case, text):
    X = case[0]
    text+ str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)
zeehio
fonte
20

# "Como receber vários argumentos".

def f1(args):
    a, b, c = args[0] , args[1] , args[2]
    return a+b+c

if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool(4) 

    result1 = pool.map(f1, [ [1,2,3] ])
    print(result1)
Dane Lee
fonte
2
Limpo e elegante.
Prav001 8/08/19
1
Não entendo por que tenho que rolar até aqui para encontrar a melhor resposta.
toti 27/04
12

Existe uma bifurcação do multiprocessingchamado pathos ( nota: use a versão no github ) que não precisa starmap- as funções map refletem a API do mapa do python, assim o mapa pode receber vários argumentos. Com pathos, geralmente você também pode fazer multiprocessamento no intérprete, em vez de ficar preso no __main__bloco. O Pathos deve ser lançado, após algumas atualizações leves - principalmente a conversão para o python 3.x.

  Python 2.7.5 (default, Sep 30 2013, 20:15:49) 
  [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
  Type "help", "copyright", "credits" or "license" for more information.
  >>> def func(a,b):
  ...     print a,b
  ...
  >>>
  >>> from pathos.multiprocessing import ProcessingPool    
  >>> pool = ProcessingPool(nodes=4)
  >>> pool.map(func, [1,2,3], [1,1,1])
  1 1
  2 1
  3 1
  [None, None, None]
  >>>
  >>> # also can pickle stuff like lambdas 
  >>> result = pool.map(lambda x: x**2, range(10))
  >>> result
  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  >>>
  >>> # also does asynchronous map
  >>> result = pool.amap(pow, [1,2,3], [4,5,6])
  >>> result.get()
  [1, 32, 729]
  >>>
  >>> # or can return a map iterator
  >>> result = pool.imap(pow, [1,2,3], [4,5,6])
  >>> result
  <processing.pool.IMapIterator object at 0x110c2ffd0>
  >>> list(result)
  [1, 32, 729]

pathostem várias maneiras pelas quais você pode obter o comportamento exato de starmap.

>>> def add(*x):
...   return sum(x)
... 
>>> x = [[1,2,3],[4,5,6]]
>>> import pathos
>>> import numpy as np
>>> # use ProcessPool's map and transposing the inputs
>>> pp = pathos.pools.ProcessPool()
>>> pp.map(add, *np.array(x).T)
[6, 15]
>>> # use ProcessPool's map and a lambda to apply the star
>>> pp.map(lambda x: add(*x), x)
[6, 15]
>>> # use a _ProcessPool, which has starmap
>>> _pp = pathos.pools._ProcessPool()
>>> _pp.starmap(add, x)
[6, 15]
>>> 
Mike McKerns
fonte
Quero observar que isso não aborda a estrutura na pergunta original. [[1,2,3], [4,5,6]] descompactaria com o mapa estelar para [pow (1,2,3), pow (4,5,6)], não [pow (1,4) , pow (2,5), pow (3, 6)]. Se você não tiver um bom controle sobre as entradas que estão sendo passadas para sua função, pode ser necessário reestruturá-las primeiro.
Scott
@ Scott: ah, eu não percebi isso ... mais de 5 anos atrás. Vou fazer uma pequena atualização. Obrigado.
Mike McKerns
8

Você pode usar as duas funções a seguir para evitar escrever um invólucro para cada nova função:

import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))

Use a função functioncom as listas de argumentos arg_0, arg_1e arg_2como segue:

pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()
M. Toya
fonte
8

Uma solução melhor para python2:

from multiprocessing import Pool
def func((i, (a, b))):
    print i, a, b
    return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])

2 3 4

1 2 3

0 1 2

Fora[]:

[3, 5, 7]

xmduhan
fonte
7

Outra alternativa simples é agrupar os parâmetros de função em uma tupla e, em seguida, agrupar os parâmetros que também devem ser passados ​​nas tuplas. Talvez isso não seja o ideal quando se lida com grandes quantidades de dados. Eu acredito que faria cópias para cada tupla.

from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

Fornece a saída em alguma ordem aleatória:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
Alex Klibisz
fonte
De fato, ainda está procurando uma maneira melhor :(
Fábio Dias
6

Uma maneira melhor é usar o decorador em vez de escrever a função do invólucro manualmente. Especialmente quando você tem muitas funções para mapear, o decorador economiza seu tempo evitando escrever wrapper para todas as funções. Normalmente, uma função decorada não é selecionável, no entanto, podemos usá-la functoolspara contorná-la. Mais disscusões podem ser encontradas aqui .

Aqui o exemplo

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y

Então você pode mapeá-lo com argumentos compactados

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()

Obviamente, você sempre pode usar Pool.starmapno Python 3 (> = 3.3), como mencionado em outras respostas.

Syrtis Major
fonte
Os resultados não são os esperados: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] Eu esperaria: [0,1,2,3,4,5,6,7,8, 9,1,2,3,4,5,6,7,8,9,10,2,3,4,5,6,7,8,9,10,11, ...
Tedo Vrbanec
Os resultados de @TedoVrbanec devem ser apenas [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]. Se você quiser o último, você pode usar em itertools.productvez de zip.
Syrtis Major
4

Outra maneira é passar uma lista de listas para uma rotina de um argumento:

import os
from multiprocessing import Pool

def task(args):
    print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]

pool = Pool()

pool.map(task, [
        [1,2],
        [3,4],
        [5,6],
        [7,8]
    ])

Pode-se construir uma lista de argumentos com o método favorito.

Adobe
fonte
Essa é uma maneira fácil, mas você precisa alterar suas funções originais. Além disso, algumas vezes lembram as funções de outras pessoas que podem não ser modificadas.
precisa saber é o seguinte
Eu vou dizer que isso segue o Python zen. Deve haver uma e apenas uma maneira óbvia de fazê-lo. Se por acaso você é o autor da função de chamada, você deve usar este método; em outros casos, podemos usar o método de imotai.
Nehem
Minha escolha é usar uma tupla e, em seguida, desembrulhá-las imediatamente como a primeira coisa na primeira linha.
Nehem
3

Aqui está outra maneira de fazê-lo: o IMHO é mais simples e elegante do que qualquer outra resposta fornecida.

Este programa possui uma função que aceita dois parâmetros, os imprime e também imprime a soma:

import multiprocessing

def main():

    with multiprocessing.Pool(10) as pool:
        params = [ (2, 2), (3, 3), (4, 4) ]
        pool.starmap(printSum, params)
    # end with

# end function

def printSum(num1, num2):
    mySum = num1 + num2
    print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function

if __name__ == '__main__':
    main()

saída é:

num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8

Veja os documentos python para mais informações:

https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool

Em particular, verifique a starmapfunção.

Estou usando o Python 3.6, não tenho certeza se isso funcionará com versões mais antigas do Python

Por que não há um exemplo muito direto como esse nos documentos, não tenho certeza.

cdahms
fonte
2

No python 3.4.4, você pode usar multiprocessing.get_context () para obter um objeto de contexto para usar vários métodos de inicialização:

import multiprocessing as mp

def foo(q, h, w):
    q.put(h + ' ' + w)
    print(h + ' ' + w)

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,'hello', 'world'))
    p.start()
    print(q.get())
    p.join()

Ou você simplesmente substitui

pool.map(harvester(text,case),case, 1)

de:

pool.apply_async(harvester(text,case),case, 1)
Tung Nguyen
fonte
2

Há muitas respostas aqui, mas nenhuma parece fornecer código compatível com Python 2/3 que funcione em qualquer versão. Se você deseja que seu código funcione , isso funcionará para a versão do Python:

# For python 2/3 compatibility, define pool context manager
# to support the 'with' statement in Python 2
if sys.version_info[0] == 2:
    from contextlib import contextmanager
    @contextmanager
    def multiprocessing_context(*args, **kwargs):
        pool = multiprocessing.Pool(*args, **kwargs)
        yield pool
        pool.terminate()
else:
    multiprocessing_context = multiprocessing.Pool

Depois disso, você pode usar o multiprocessamento da maneira regular do Python 3, como quiser. Por exemplo:

def _function_to_run_for_each(x):
       return x.lower()
with multiprocessing_context(processes=3) as pool:
    results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim'])    print(results)

funcionará em Python 2 ou Python 3.

cgnorthcutt
fonte
1

Na documentação oficial afirma que ele suporta apenas um argumento iterável. Eu gosto de usar o apply_async nesses casos. No seu caso, eu faria:

from multiprocessing import Process, Pool, Manager

text = "test"
def harvester(text, case, q = None):
 X = case[0]
 res = text+ str(X)
 if q:
  q.put(res)
 return res


def block_until(q, results_queue, until_counter=0):
 i = 0
 while i < until_counter:
  results_queue.put(q.get())
  i+=1

if __name__ == '__main__':
 pool = multiprocessing.Pool(processes=6)
 case = RAW_DATASET
 m = Manager()
 q = m.Queue()
 results_queue = m.Queue() # when it completes results will reside in this queue
 blocking_process = Process(block_until, (q, results_queue, len(case)))
 blocking_process.start()
 for c in case:
  try:
   res = pool.apply_async(harvester, (text, case, q = None))
   res.get(timeout=0.1)
  except:
   pass
 blocking_process.join()
roj4s
fonte
1
text = "test"

def unpack(args):
    return args[0](*args[1:])

def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    # args is a list of tuples 
    # with the function to execute as the first item in each tuple
    args = [(harvester, text, c) for c in case]
    # doing it this way, we can pass any function
    # and we don't need to define a wrapper for each different function
    # if we need to use more than one
    pool.map(unpack, args)
    pool.close()
    pool.join()
Jaime
fonte
1

Este é um exemplo da rotina que eu uso para passar vários argumentos para uma função de um argumento usada em um fork do pool.imap :

from multiprocessing import Pool

# Wrapper of the function to map:
class makefun:
    def __init__(self, var2):
        self.var2 = var2
    def fun(self, i):
        var2 = self.var2
        return var1[i] + var2

# Couple of variables for the example:
var1 = [1, 2, 3, 5, 6, 7, 8]
var2 = [9, 10, 11, 12]

# Open the pool:
pool = Pool(processes=2)

# Wrapper loop
for j in range(len(var2)):
    # Obtain the function to map
    pool_fun = makefun(var2[j]).fun

    # Fork loop
    for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0):
        print(var1[i], '+' ,var2[j], '=', value)

# Close the pool
pool.close()
A. Nodar
fonte
-3

para python2, você pode usar esse truque

def fun(a,b):
    return a+b

pool = multiprocessing.Pool(processes=6)
b=233
pool.map(lambda x:fun(x,b),range(1000))
Hz Shang
fonte
por que b = 233. derrota o objetivo da pergunta
como - se