Quando executo algo como:
from multiprocessing import Pool
p = Pool(5)
def f(x):
return x*x
p.map(f, [1,2,3])
Funciona bem. No entanto, colocando isso como uma função de uma classe:
class calculate(object):
def run(self):
def f(x):
return x*x
p = Pool()
return p.map(f, [1,2,3])
cl = calculate()
print cl.run()
Dá-me o seguinte erro:
Exception in thread Thread-1:
Traceback (most recent call last):
File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/sw/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Eu vi um post de Alex Martelli lidando com o mesmo tipo de problema, mas não era explícito o suficiente.
python
multiprocessing
pickle
Mermoz
fonte
fonte
IPython.Parallel
, mas era possível contornar o problema empurrando os objetos para os nós. Parece muito chato contornar esse problema com o multiprocessamento.calculate
é selecionável, então parece que isso pode ser resolvido por 1) criando um objeto de função com um construtor que copia sobre umacalculate
instância e 2) passando uma instância desse objeto de função paraPool
omap
método de. Não?multiprocessing
módulo devem-se ao objetivo de ser uma implementação de plataforma cruzada e à falta de umafork(2)
chamada de sistema semelhante no Windows. Se você não se importa com o suporte ao Win32, pode haver uma solução alternativa baseada em processo mais simples. Ou, se você estiver preparado para usar threads em vez de processos, poderá substituífrom multiprocessing import Pool
-lofrom multiprocessing.pool import ThreadPool as Pool
.Respostas:
Também fiquei irritado com as restrições sobre que tipo de funções o pool.map poderia aceitar. Eu escrevi o seguinte para contornar isso. Parece funcionar, mesmo para o uso recursivo do parmap.
fonte
Não pude usar os códigos publicados até agora porque os códigos que usam "multiprocessing.Pool" não funcionam com expressões lambda e os códigos que não usam "multiprocessing.Pool" geram tantos processos quanto itens de trabalho.
Adaptei o código ao gerar uma quantidade predefinida de trabalhadores e apenas percorre a lista de entrada se houver um trabalhador ocioso. Também habilitei o modo "daemon" para os trabalhadores que o st ctrl-c trabalha conforme o esperado.
fonte
parmap
função?(None, None)
como o último item indicafun
que atingiu o final da sequência de itens para cada processo.O multiprocessamento e a decapagem são interrompidos e limitados, a menos que você pule para fora da biblioteca padrão.
Se você usar uma bifurcação de
multiprocessing
chamadapathos.multiprocesssing
, poderá usar diretamente classes e métodos de classe nasmap
funções do multiprocessamento . Isso ocorre porquedill
é usado em vez depickle
oucPickle
edill
pode serializar quase tudo em python.pathos.multiprocessing
também fornece uma função de mapa assíncrona ... e podemap
funcionar com vários argumentos (por exemplomap(math.pow, [1,2,3], [4,5,6])
)Veja as discussões: O que o multiprocessamento e o endro podem fazer juntos?
e: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
Ele até lida com o código que você escreveu inicialmente, sem modificações, e com o intérprete. Por que qualquer outra coisa mais frágil e específica para um único caso?
Obtenha o código aqui: https://github.com/uqfoundation/pathos
E, apenas para mostrar um pouco mais do que ele pode fazer:
fonte
amap
) que permite o uso de barras de progresso e outras programações assíncronas.Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)
multiprocess
sendo compatível com 2/3. Consulte stackoverflow.com/questions/27873093/… e pypi.python.org/pypi/multiprocess .pathos
teve uma nova versão estável e também é compatível com as versões 2.xe 3.x.Atualmente, não há solução para o seu problema, até onde eu saiba: a função que você atribui
map()
deve estar acessível através da importação do seu módulo. É por isso que o código de robert funciona: a funçãof()
pode ser obtida importando o seguinte código:Na verdade, adicionei uma seção "principal", porque segue as recomendações para a plataforma Windows ("Verifique se o módulo principal pode ser importado com segurança por um novo interpretador Python sem causar efeitos colaterais indesejados").
Também adicionei uma letra maiúscula na frente
Calculate
para seguir o PEP 8 . :)fonte
A solução por mrule está correta, mas possui um erro: se a criança enviar uma grande quantidade de dados, poderá preencher o buffer do pipe, bloqueando a criança
pipe.send()
, enquanto o pai estiver aguardando a saída da criançapipe.join()
. A solução é ler os dados da criança antes dejoin()
a criança. Além disso, a criança deve fechar a extremidade do cano dos pais para evitar um conflito. O código abaixo corrige isso. Lembre-se também de que issoparmap
cria um processo por elemento noX
. Uma solução mais avançada é usarmultiprocessing.cpu_count()
para dividirX
em vários pedaços e mesclar os resultados antes de retornar. Deixo isso como um exercício para o leitor, a fim de não estragar a concisão da bela resposta da mrule. ;)fonte
OSError: [Errno 24] Too many open files
. Eu acho que é necessário que haja algum tipo de limites sobre o número de processos para que ele funcione adequadamente ...Eu também lutei com isso. Eu tinha funções como membros de dados de uma classe, como um exemplo simplificado:
Eu precisava usar a função self.f em uma chamada Pool.map () de dentro da mesma classe e self.f não recebeu uma tupla como argumento. Como essa função foi incorporada em uma classe, não ficou claro para mim como escrever o tipo de invólucro sugerido por outras respostas.
Resolvi esse problema usando um wrapper diferente que usa uma tupla / lista, em que o primeiro elemento é a função e os elementos restantes são os argumentos dessa função, chamados eval_func_tuple (f_args). Usando isso, a linha problemática pode ser substituída por pool.map de retorno (eval_func_tuple, itertools.izip (itertools.repeat (self.f), list1, list2)). Aqui está o código completo:
Arquivo: util.py
Arquivo: main.py
Rodar main.py dará [11, 22, 33]. Fique à vontade para melhorar isso, por exemplo, eval_func_tuple também pode ser modificado para receber argumentos de palavras-chave.
Em outra nota, em outras respostas, a função "parmap" pode ser mais eficiente para o caso de mais Processos do que o número de CPUs disponíveis. Estou copiando uma versão editada abaixo. Este é o meu primeiro post e não tinha certeza se deveria editar diretamente a resposta original. Também renomei algumas variáveis.
fonte
Peguei a resposta de klaus se e aganders3 e criei um módulo documentado que é mais legível e contém um arquivo. Você pode apenas adicioná-lo ao seu projeto. Ele ainda tem uma barra de progresso opcional!
EDIT : adicionada sugestão @ alexander-mcfarlane e uma função de teste
fonte
join()
ao mesmo tempo e você terá apenas um flash100%
concluído notqdm
visor. A única vez que ele será útil é se cada processador tem uma carga de trabalho tendenciosatqdm()
para embrulhar a linha:result = [q_out.get() for _ in tqdm(sent)]
e ele funciona muito melhor - grande esforço que realmente aprecio isso para +1Sei que isso foi perguntado há mais de 6 anos, mas só queria adicionar minha solução, pois algumas das sugestões acima parecem terrivelmente complicadas, mas minha solução era realmente muito simples.
Tudo o que eu precisava fazer era agrupar a chamada pool.map () para uma função auxiliar. Passando o objeto de classe junto com args para o método como uma tupla, que se parecia um pouco com isso.
fonte
As funções definidas nas classes (mesmo dentro das funções dentro das classes) realmente não funcionam. No entanto, isso funciona:
fonte
Sei que essa pergunta foi feita há 8 anos e 10 meses, mas quero apresentar a minha solução:
Você só precisa transformar sua classe em um método estático. Mas também é possível com um método de classe:
Testado em Python 3.7.3
fonte
Modifiquei o método do klaus se porque, embora estivesse trabalhando para mim com pequenas listas, ele seria interrompido quando o número de itens fosse ~ 1000 ou superior. Em vez de enviar os trabalhos um de cada vez com a
None
condição de parada, carrego a fila de entrada de uma só vez e deixo os processos consumir até ficar vazio.Edit: infelizmente agora estou encontrando este erro no meu sistema: O limite de tamanho máximo da fila de multiprocessamento é 32767 , espero que as soluções alternativas lá ajudem.
fonte
Você pode executar seu código sem problemas se, de alguma forma, ignorar manualmente o
Pool
objeto da lista de objetos da classe, porque ele não épickle
capaz, como diz o erro. Você pode fazer isso com a__getstate__
função (veja aqui também) como segue. OPool
objeto vai tentar encontrar os__getstate__
e__setstate__
funções e executá-los se o considerar quando você executarmap
,map_async
etc:Então faça:
lhe dará a saída:
Eu testei o código acima no Python 3.xe funciona.
fonte
Não tenho certeza se essa abordagem foi adotada, mas uma solução alternativa que estou usando é:
A saída deve ser:
fonte
Existe a possibilidade de você querer aplicar esta função para cada instância diferente da classe. Então aqui está a solução para isso também
fonte
Aqui está a minha solução, que eu acho que é um pouco menos burra do que a maioria das outras aqui. É semelhante à resposta do nightowl.
fonte
De http://www.rueckstiess.net/research/snippets/show/ca1d7d90 e http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html
Podemos criar uma função externa e propagá-la com o auto-objeto da classe:
OU sem joblib:
fonte
Esta pode não ser uma solução muito boa, mas no meu caso, eu a resolvo assim.
Eu tive que passar
self
para a minha função, pois tenho que acessar atributos e funções da minha classe através dessa função. Isso está funcionando para mim. Correções e sugestões são sempre bem-vindas.fonte
Aqui está um boilerplate que escrevi para usar o pool de multiprocessamento em python3, especificamente python3.7.7 foi usado para executar os testes. Eu consegui minhas corridas mais rápidas usando
imap_unordered
. Basta conectar seu cenário e experimentá-lo. Você pode usartimeit
ou apenastime.time()
descobrir o que funciona melhor para você.No cenário acima,
imap_unordered
na verdade, parece ter o pior desempenho para mim. Experimente o seu caso e compare-o na máquina em que planeja executá-lo. Leia também sobre Process Pools . Felicidades!fonte