A documentação do multiprocessing
módulo mostra como passar uma fila para um processo iniciado com multiprocessing.Process
. Mas como posso compartilhar uma fila com processos de trabalho assíncronos iniciados apply_async
? Eu não preciso de junção dinâmica ou qualquer outra coisa, apenas uma forma de os trabalhadores (repetidamente) reportarem seus resultados de volta à base.
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
q = multiprocessing.Queue()
workers = pool.apply_async(worker, (33, q))
Esta falha com:
RuntimeError: Queue objects should only be shared between processes through inheritance
. Eu entendo o que isso significa e entendo o conselho para herdar em vez de exigir decapagem / retirada da colheita (e todas as restrições especiais do Windows). Mas como faço para passar na fila de uma maneira que funcione? Não consigo encontrar um exemplo e tentei várias alternativas que falharam de várias maneiras. Ajuda por favor?
queue.Queue()
não é adequado para isso?queue.Queue
foi construído para threading, usando travas na memória. Em um ambiente multiprocesso, cada subprocesso obteria sua própria cópia de umaqueue.Queue()
instância em seu próprio espaço de memória, uma vez que os subprocessos não compartilham memória (principalmente).multiprocessing.Pool
já tem uma fila de resultados compartilhada, não há necessidade de envolver adicionalmente aManager.Queue
.Manager.Queue
é umaqueue.Queue
(fila de multithreading) sob o capô, localizada em um processo de servidor separado e exposta por meio de proxies. Isso adiciona sobrecarga em comparação com a fila interna do Pool. Ao contrário de confiar no tratamento de resultados nativo do Pool, os resultados noManager.Queue
também não têm garantia de serem solicitados.Os processos de trabalho não são iniciados com
.apply_async()
, isso já acontece quando você instanciaPool
. O que é iniciado quando você ligapool.apply_async()
é um novo "trabalho". Os processos de trabalho do Pool executam amultiprocessing.pool.worker
função sob o capô. Esta função se encarrega de processar novas "tarefas" transferidas por meio do pool internoPool._inqueue
e de enviar os resultados de volta ao pai peloPool._outqueue
. Seu especificadofunc
será executado emmultiprocessing.pool.worker
.func
só precisa dereturn
algo e o resultado será automaticamente enviado de volta para o pai..apply_async()
imediatamente (de forma assíncrona) retorna umAsyncResult
objeto (alias paraApplyResult
). Você precisa chamar.get()
(está bloqueando) naquele objeto para receber o resultado real. Outra opção seria registrar uma função de retorno de chamada , que é acionada assim que o resultado fica pronto.Exemplo de saída:
Nota: Especificar o
timeout
parâmetro -para.get()
não interromperá o processamento real da tarefa dentro do trabalhador, apenas desbloqueia o pai em espera levantando amultiprocessing.TimeoutError
.fonte
error_callback
parâmetro -paraapply_async
, então não mudou muito desde então.