Como recuperar o valor de retorno de uma função passada para multiprocessing.Process?

190

No código de exemplo abaixo, gostaria de recuperar o valor de retorno da função worker. Como posso fazer isso? Onde esse valor é armazenado?

Código de exemplo:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs

Resultado:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

Não consigo encontrar o atributo relevante nos objetos armazenados jobs.

blz
fonte

Respostas:

189

Use variável compartilhada para se comunicar. Por exemplo, assim:

import multiprocessing

def worker(procnum, return_dict):
    '''worker function'''
    print str(procnum) + ' represent!'
    return_dict[procnum] = procnum


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print return_dict.values()
vartec
fonte
46
Eu recomendaria usar um multiprocessing.Queue, em vez de um Manageraqui. Usar um Managerrequer gerar um processo totalmente novo, que é um exagero quando um Queuefaria.
19/04
1
@ano: Gostaria de saber, se usarmos o objeto Queue (), não podemos ter certeza da ordem quando cada processo retornar o valor. Quero dizer, se precisamos da ordem no resultado, para fazer o próximo trabalho. Como poderíamos certeza de onde exatamente qual a saída é a partir do qual processo
Catbuilts
4
@ Catbuilts Você pode retornar uma tupla de cada processo, em que um valor é o valor de retorno real de seu interesse e o outro é um identificador exclusivo do processo. Mas também me pergunto por que você precisa saber qual processo está retornando qual valor. Se isso é o que você realmente precisa saber sobre o processo, ou precisa se correlacionar entre sua lista de entradas e a lista de saídas? Nesse caso, eu recomendaria o uso multiprocessing.Pool.mappara processar sua lista de itens de trabalho.
Dano
5
advertências para funções com apenas um único argumento : deve usar args=(my_function_argument, ). Observe a ,vírgula aqui! Ou então o Python irá reclamar de "falta de argumentos posicionais". Levei 10 minutos para descobrir. Verifique também o uso manual (na seção "classe de processo").
yuqli
2
@vartec Uma desvantagem do uso de um dicionário multipriocessing.Manager () é que pickles (serializa) o objeto que ele retorna, portanto, ele possui um gargalo fornecido pela biblioteca de pickles com tamanho máximo de 2GiB para o objeto retornar. Existe alguma outra maneira de fazer isso, evitando a serialização do objeto retornado?
Hirschme
68

Acho que a abordagem sugerida por @sega_sai é a melhor. Mas ele realmente precisa de um exemplo de código, então aqui vai:

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))

O que imprimirá os valores de retorno:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

Se você estiver familiarizado com map(o Python 2 embutido), isso não deve ser muito desafiador. Caso contrário, dê uma olhada no link de sega_Sai .

Observe como é necessário pouco código. (Observe também como os processos são reutilizados).

Marca
fonte
1
Alguma idéia de por que meu getpid()retorno tem o mesmo valor? Estou executando o Python3
zelusp
Não tenho certeza de como o Pool distribui tarefas pelos trabalhadores. Talvez todos possam acabar no mesmo trabalhador se forem realmente rápidos? Isso acontece de forma consistente? Além disso, se você adicionar um atraso?
Mark
Eu também pensei que era uma questão relacionada à velocidade, mas quando eu alimento pool.mapum intervalo de 1.000.000 usando mais de 10 processos, vejo no máximo dois pids diferentes.
Zelusp
1
Então não tenho certeza. Eu acho que seria interessante abrir uma pergunta separada para isso.
Mark
Se as coisas que você deseja enviar uma função diferente para cada processo, de uso pool.apply_async: docs.python.org/3/library/...
Kyle
24

Este exemplo mostra como usar uma lista de instâncias de multiprocessamento.Pipe para retornar seqüências de caracteres de um número arbitrário de processos:

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()

Resultado:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

Esta solução utiliza menos recursos que um multiprocessamento.Queue que usa

  • um cano
  • pelo menos um bloqueio
  • um buffer
  • um fio

ou um multiprocessing.SimpleQueue que usa

  • um cano
  • pelo menos um bloqueio

É muito instrutivo procurar a fonte para cada um desses tipos.

David Cullen
fonte
Qual seria a melhor maneira de fazer isso sem tornar os tubos uma variável global?
Nickpick # 25/16
Coloquei todos os dados e códigos globais em uma função principal e funciona da mesma maneira. Isso responde à sua pergunta?
David Cullen
o canal sempre precisa ser lido antes que qualquer novo valor possa ser adicionado (enviado) a ele?
Nickpick
+1, boa resposta. Mas sobre a solução ser mais eficiente, a desvantagem é que você está criando uma Pipepor processo versus umaQueue para todos os processos. Não sei se isso acaba sendo mais eficiente em todos os casos.
sudo
2
Essa resposta causa um conflito se o objeto retornado for grande. Em vez de fazer o proc.join () primeiro, primeiro tentaria recv () o valor de retorno e depois faria a junção.
L. Pes
22

Por alguma razão, não consegui encontrar um exemplo geral de como fazer isso em Queuequalquer lugar (mesmo os exemplos de documentos do Python não geram vários processos), então aqui está o que eu comecei a trabalhar depois de 10 tentativas:

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queueé uma fila de bloqueio de segurança de thread que você pode usar para armazenar os valores de retorno dos processos filhos. Então você tem que passar a fila para cada processo. Algo menos óbvio aqui é que você tem que get()partir da fila antes de joinos Processes ou então a fila enche e bloqueia tudo.

Atualização para aqueles que são orientados a objetos (testados no Python 3.4):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)
sudo
fonte
18

Para qualquer pessoa que esteja procurando como obter um valor Processusando Queue:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    print queue.get()  # Prints {"foo": True}
    p.join()
Matthew Moisen
fonte
1
quando coloco algo em uma fila no meu processo de trabalho, minha associação nunca é alcançada. Alguma idéia de como isso pode acontecer?
Laurens Koppenol
@LaurensKoppenol, você quer dizer que seu código principal fica em p.join () permanentemente e nunca continua? Seu processo tem um loop infinito?
Matthew Moisen
4
Sim, ele fica ali infinitamente. Todos os meus trabalhadores terminam (o loop na função do trabalhador termina, a declaração de impressão é impressa posteriormente para todos os trabalhadores). A junção não faz nada. Se eu remover o Queueda minha função, deixe-me passar ojoin()
Laurens Koppenol 10/10
@LaurensKoppenol Talvez você não esteja ligando queue.put(ret)antes de ligar p.start()? Nesse caso, o segmento de trabalho será interrompido para queue.get()sempre. Você pode replicar isso copiando meu snippet acima enquanto comenta queue.put(ret).
Matthew Moisen
Eu editei esta resposta, o que queue.get()tem que acontecer antes do p.join(). Agora funciona para mim.
Jfunk #
10

Você pode usar o exitbuilt-in para definir o código de saída de um processo. Pode ser obtido a partir do exitcodeatributo do processo:

import multiprocessing

def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result

Resultado:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
David Cullen
fonte
4
Esteja avisado de que essa abordagem pode se tornar confusa. Os processos geralmente devem sair com o código de saída 0, se forem concluídos sem erros. Se você tiver alguma coisa monitorando os códigos de saída do processo do sistema, poderá vê-los como erros.
ferrouswheel
1
Perfeito se você apenas deseja gerar uma exceção no processo pai por erro.
crizCraig
5

O pacote pebble possui uma ótima alavancagem de abstração, o multiprocessing.Pipeque torna isso bastante simples:

from pebble import concurrent

@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg

future = function(1, kwarg=1)

print(future.result())

Exemplo de: https://pythonhosted.org/Pebble/#concurrent-decorators

erikreed
fonte
3

Pensei em simplificar os exemplos mais simples copiados de cima, trabalhando para mim no Py3.6. O mais simples é multiprocessing.Pool:

import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))

Você pode definir o número de processos no pool com, por exemplo Pool(processes=5),. No entanto, o padrão é a contagem de CPU, portanto, deixe em branco para tarefas associadas à CPU. (As tarefas vinculadas à E / S geralmente se adequam aos threads de qualquer maneira, pois os threads estão aguardando a maior parte do tempo para compartilhar um núcleo da CPU.) PoolTambém se aplica à otimização de chunking .

(Observe que o método worker não pode ser aninhado em um método. Inicialmente, defini meu método worker dentro do método que faz a chamada pool.mappara manter tudo independente, mas os processos não puderam importá-lo e joguei "AttributeError : Não é possível selecionar o objeto local outer_method..inner_method ". Mais aqui . Ele pode estar dentro de uma classe.)

(Aprecie a pergunta original especificada e 'represent!'não a impressão time.sleep(), mas sem ela achei que algum código estava sendo executado simultaneamente quando não estava.)


O Py3 ProcessPoolExecutortambém possui duas linhas ( .mapretorna um gerador, portanto você precisa do list()):

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    print(list(executor.map(worker, range(10))))

Com Processes simples :

import multiprocessing
import time

def worker(x, queue):
    time.sleep(1)
    queue.put(x)

queue = multiprocessing.SimpleQueue()
tasks = range(10)

for task in tasks:
    multiprocessing.Process(target=worker, args=(task, queue,)).start()

for _ in tasks:
    print(queue.get())

Use SimpleQueuese tudo que você precisa é pute get. O primeiro loop inicia todos os processos, antes que o segundo faça as queue.getchamadas de bloqueio . Acho que também não há motivo para ligar p.join().

Chris
fonte
2

Uma solução simples:

import multiprocessing

output=[]
data = range(0,10)

def f(x):
    return x**2

def handler():
    p = multiprocessing.Pool(64)
    r=p.map(f, data)
    return r

if __name__ == '__main__':
    output.append(handler())

print(output[0])

Resultado:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Rubens_Zimbres
fonte
2

Se você estiver usando o Python 3, poderá usar concurrent.futures.ProcessPoolExecutorcomo uma abstração conveniente:

from concurrent.futures import ProcessPoolExecutor

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(worker, range(5))))

Resultado:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
Aleph Aleph
fonte
0

Modifiquei a resposta da vartec um pouco, pois precisava obter os códigos de erro da função. (Obrigado vertec !!! é um truque incrível)

Isso também pode ser feito com um, manager.listmas acho melhor colocá-lo em um ditado e armazenar uma lista nele. Dessa forma, mantemos a função e os resultados, pois não podemos ter certeza da ordem em que a lista será preenchida.

from multiprocessing import Process
import time
import datetime
import multiprocessing


def func1(fn, m_list):
    print 'func1: starting'
    time.sleep(1)
    m_list[fn] = "this is the first function"
    print 'func1: finishing'
    # return "func1"  # no need for return since Multiprocess doesnt return it =(

def func2(fn, m_list):
    print 'func2: starting'
    time.sleep(3)
    m_list[fn] = "this is function 2"
    print 'func2: finishing'
    # return "func2"

def func3(fn, m_list):
    print 'func3: starting'
    time.sleep(9)
    # if fail wont join the rest because it never populate the dict
    # or do a try/except to get something in return.
    raise ValueError("failed here")
    # if we want to get the error in the manager dict we can catch the error
    try:
        raise ValueError("failed here")
        m_list[fn] = "this is third"
    except:
        m_list[fn] = "this is third and it fail horrible"
        # print 'func3: finishing'
        # return "func3"


def runInParallel(*fns):  # * is to accept any input in list
    start_time = datetime.datetime.now()
    proc = []
    manager = multiprocessing.Manager()
    m_list = manager.dict()
    for fn in fns:
        # print fn
        # print dir(fn)
        p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()  # 5 is the time out

    print datetime.datetime.now() - start_time
    return m_list, proc

if __name__ == '__main__':
    manager, proc = runInParallel(func1, func2, func3)
    # print dir(proc[0])
    # print proc[0]._name
    # print proc[0].name
    # print proc[0].exitcode

    # here you can check what did fail
    for i in proc:
        print i.name, i.exitcode  # name was set up in the Process line 53

    # here will only show the function that worked and where able to populate the 
    # manager dict
    for i, j in manager.items():
        print dir(i)  # things you can do to the function
        print i, j
pelos
fonte