Compartilhando uma fila de resultados entre vários processos

92

A documentação do multiprocessingmódulo mostra como passar uma fila para um processo iniciado com multiprocessing.Process. Mas como posso compartilhar uma fila com processos de trabalho assíncronos iniciados apply_async? Eu não preciso de junção dinâmica ou qualquer outra coisa, apenas uma forma de os trabalhadores (repetidamente) reportarem seus resultados de volta à base.

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    q = multiprocessing.Queue()
    workers = pool.apply_async(worker, (33, q))

Esta falha com: RuntimeError: Queue objects should only be shared between processes through inheritance. Eu entendo o que isso significa e entendo o conselho para herdar em vez de exigir decapagem / retirada da colheita (e todas as restrições especiais do Windows). Mas como faço para passar na fila de uma maneira que funcione? Não consigo encontrar um exemplo e tentei várias alternativas que falharam de várias maneiras. Ajuda por favor?

alexis
fonte

Respostas:

133

Tente usar multiprocessing.Manager para gerenciar sua fila e também para torná-la acessível a diferentes trabalhadores.

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()
    workers = pool.apply_async(worker, (33, q))
enderskill
fonte
Isso mesmo, obrigado! Houve um problema não relacionado com a chamada assíncrona em meu código original, então copiei a correção para sua resposta também.
alexis
16
Alguma explicação por que queue.Queue()não é adequado para isso?
mrgloom
@mrgloom: queue.Queuefoi construído para threading, usando travas na memória. Em um ambiente multiprocesso, cada subprocesso obteria sua própria cópia de uma queue.Queue()instância em seu próprio espaço de memória, uma vez que os subprocessos não compartilham memória (principalmente).
LeoRochael
@alexis Como obter os elementos do Manager (). Queue () depois que vários workers inseriram dados nele?
MSS
10

multiprocessing.Pooljá tem uma fila de resultados compartilhada, não há necessidade de envolver adicionalmente a Manager.Queue. Manager.Queueé uma queue.Queue(fila de multithreading) sob o capô, localizada em um processo de servidor separado e exposta por meio de proxies. Isso adiciona sobrecarga em comparação com a fila interna do Pool. Ao contrário de confiar no tratamento de resultados nativo do Pool, os resultados no Manager.Queuetambém não têm garantia de serem solicitados.

Os processos de trabalho não são iniciados com .apply_async(), isso já acontece quando você instancia Pool. O que é iniciado quando você liga pool.apply_async()é um novo "trabalho". Os processos de trabalho do Pool executam a multiprocessing.pool.workerfunção sob o capô. Esta função se encarrega de processar novas "tarefas" transferidas por meio do pool interno Pool._inqueuee de enviar os resultados de volta ao pai pelo Pool._outqueue. Seu especificado funcserá executado em multiprocessing.pool.worker. funcsó precisa de returnalgo e o resultado será automaticamente enviado de volta para o pai.

.apply_async() imediatamente (de forma assíncrona) retorna um AsyncResultobjeto (alias para ApplyResult). Você precisa chamar .get()(está bloqueando) naquele objeto para receber o resultado real. Outra opção seria registrar uma função de retorno de chamada , que é acionada assim que o resultado fica pronto.

from multiprocessing import Pool

def busy_foo(i):
    """Dummy function simulating cpu-bound work."""
    for _ in range(int(10e6)):  # do stuff
        pass
    return i

if __name__ == '__main__':

    with Pool(4) as pool:
        print(pool._outqueue)  # DEMO
        results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
        # `.apply_async()` immediately returns AsyncResult (ApplyResult) object
        print(results[0])  # DEMO
        results = [res.get() for res in results]
        print(f'result: {results}')       

Exemplo de saída:

<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
<multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Nota: Especificar o timeoutparâmetro -para .get()não interromperá o processamento real da tarefa dentro do trabalhador, apenas desbloqueia o pai em espera levantando a multiprocessing.TimeoutError.

Darkonauta
fonte
Interessante, vou experimentar na primeira chance que tiver. Certamente não funcionou assim em 2012.
alexis,
@alexis Python 2.7 (2010) de forma relevante aqui está faltando apenas o gerenciador de contexto e o error_callbackparâmetro -para apply_async, então não mudou muito desde então.
Darkonaut