Estou tendo muitos problemas para tentar entender como a fila de multiprocessamento funciona em python e como implementá-la. Digamos que eu tenha dois módulos python que acessam dados de um arquivo compartilhado, vamos chamar esses dois módulos de escritor e leitor. Meu plano é fazer com que o leitor e o gravador coloquem as solicitações em duas filas de multiprocessamento separadas e, em seguida, um terceiro processo coloque essas solicitações em um loop e as execute como tal.
Meu principal problema é que eu realmente não sei como implementar multiprocessing.queue corretamente, você não pode realmente instanciar o objeto para cada processo, já que serão filas separadas, como você se certifica de que todos os processos se relacionam a uma fila compartilhada (ou neste caso, filas)
Respostas:
Este é um exemplo simples de um leitor e um gravador compartilhando uma única fila ... O gravador envia vários números inteiros para o leitor; quando o gravador fica sem números, ele envia 'DONE', que permite que o leitor saiba que saiu do loop de leitura.
from multiprocessing import Process, Queue import time import sys def reader_proc(queue): ## Read from the queue; this will be spawned as a separate Process while True: msg = queue.get() # Read from the queue and do nothing if (msg == 'DONE'): break def writer(count, queue): ## Write to the queue for ii in range(0, count): queue.put(ii) # Write 'count' numbers into the queue queue.put('DONE') if __name__=='__main__': pqueue = Queue() # writer() writes to pqueue from _this_ process for count in [10**4, 10**5, 10**6]: ### reader_proc() reads from pqueue as a separate process reader_p = Process(target=reader_proc, args=((pqueue),)) reader_p.daemon = True reader_p.start() # Launch reader_proc() as a separate python process _start = time.time() writer(count, pqueue) # Send a lot of stuff to reader() reader_p.join() # Wait for the reader to finish print("Sending {0} numbers to Queue() took {1} seconds".format(count, (time.time() - _start)))
fonte
em "
from queue import Queue
" não há nenhum módulo chamadoqueue
, em vez dissomultiprocessing
deve ser usado. Portanto, deve ser semelhante a "from multiprocessing import Queue
"fonte
multiprocessing.Queue
é correto. O normalQueue.Queue
é usado para threads de python . Ao tentar usarQueue.Queue
com multiprocessamento, cópias do objeto Fila serão criadas em cada processo filho e os processos filho nunca serão atualizados. Basicamente,Queue.Queue
funciona usando um objeto compartilhado global emultiprocessing.Queue
funciona usando IPC. Consulte: stackoverflow.com/questions/925100/…Aqui está um uso muito simples de
multiprocessing.Queue
emultiprocessing.Process
que permite aos chamadores enviar um "evento" mais argumentos para um processo separado que despacha o evento para um método "do_" no processo. (Python 3.4+)import multiprocessing as mp import collections Msg = collections.namedtuple('Msg', ['event', 'args']) class BaseProcess(mp.Process): """A process backed by an internal queue for simple one-way message passing. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.queue = mp.Queue() def send(self, event, *args): """Puts the event and args as a `Msg` on the queue """ msg = Msg(event, args) self.queue.put(msg) def dispatch(self, msg): event, args = msg handler = getattr(self, "do_%s" % event, None) if not handler: raise NotImplementedError("Process has no handler for [%s]" % event) handler(*args) def run(self): while True: msg = self.queue.get() self.dispatch(msg)
Uso:
class MyProcess(BaseProcess): def do_helloworld(self, arg1, arg2): print(arg1, arg2) if __name__ == "__main__": process = MyProcess() process.start() process.send('helloworld', 'hello', 'world')
O
send
acontece no processo pai, odo_*
acontece no processo filho.Eu deixei de fora qualquer tratamento de exceção que obviamente interromperia o loop de execução e sairia do processo filho. Você também pode personalizá-lo substituindo
run
para controlar o bloqueio ou qualquer outra coisa.Isso é realmente útil apenas em situações em que você tem um único processo de trabalho, mas acho que é uma resposta relevante a essa pergunta para demonstrar um cenário comum com um pouco mais de orientação a objetos.
fonte
Eu dei uma olhada em várias respostas em overflow de pilha e na web enquanto tentava configurar uma maneira de fazer multiprocessamento usando filas para passar grandes dataframes de pandas. Pareceu-me que cada resposta estava reiterando o mesmo tipo de solução, sem qualquer consideração da multiplicidade de casos extremos que definitivamente encontraremos ao configurar cálculos como esses. O problema é que há muitas coisas em jogo ao mesmo tempo. O número de tarefas, o número de trabalhadores, a duração de cada tarefa e possíveis exceções durante a execução da tarefa. Tudo isso torna a sincronização complicada e a maioria das respostas não aborda como você pode fazer isso. Portanto, esta é a minha opinião depois de mexer por algumas horas, espero que seja genérico o suficiente para a maioria das pessoas considerá-lo útil.
Algumas reflexões antes de qualquer exemplo de codificação. Como
queue.Empty
ouqueue.qsize()
qualquer outro método semelhante não é confiável para o controle de fluxo, qualquer código do mesmowhile True: try: task = pending_queue.get_nowait() except queue.Empty: break
é falso. Isso matará o trabalhador, mesmo se, milissegundos depois, outra tarefa aparecer na fila. O trabalhador não irá se recuperar e depois de um tempo TODOS os trabalhadores desaparecerão, pois eles encontram a fila momentaneamente vazia. O resultado final será que a função de multiprocessamento principal (aquela com join () nos processos) retornará sem que todas as tarefas tenham sido concluídas. Agradável. Boa sorte na depuração se você tiver milhares de tarefas e algumas estiverem faltando.
A outra questão é o uso de valores sentinela. Muitas pessoas sugeriram adicionar um valor de sentinela na fila para sinalizar o fim da fila. Mas sinalizar para quem exatamente? Se houver N trabalhadores, assumindo que N seja o número de núcleos disponíveis mais ou menos, um único valor sentinela sinalizará apenas o fim da fila para um trabalhador. Todos os outros trabalhadores ficarão sentados esperando por mais trabalho quando não houver mais nenhum. Exemplos típicos que vi são
while True: task = pending_queue.get() if task == SOME_SENTINEL_VALUE: break
Um trabalhador obterá o valor da sentinela enquanto o restante aguardará indefinidamente. Nenhuma postagem que encontrei mencionou que você precisa enviar o valor sentinela para a fila PELO MENOS quantas vezes você tiver trabalhadores, para que TODOS eles o recebam.
O outro problema é o tratamento de exceções durante a execução da tarefa. Novamente, eles devem ser capturados e gerenciados. Além disso, se você tiver uma
completed_tasks
fila, deverá contar independentemente de forma determinística quantos itens estão na fila antes de decidir que o trabalho está concluído. Mais uma vez, confiar nos tamanhos das filas está fadado ao fracasso e retorna resultados inesperados.No exemplo abaixo, a
par_proc()
função receberá uma lista de tarefas incluindo as funções com as quais essas tarefas devem ser executadas junto com quaisquer argumentos e valores nomeados.import multiprocessing as mp import dill as pickle import queue import time import psutil SENTINEL = None def do_work(tasks_pending, tasks_completed): # Get the current worker's name worker_name = mp.current_process().name while True: try: task = tasks_pending.get_nowait() except queue.Empty: print(worker_name + ' found an empty queue. Sleeping for a while before checking again...') time.sleep(0.01) else: try: if task == SENTINEL: print(worker_name + ' no more work left to be done. Exiting...') break print(worker_name + ' received some work... ') time_start = time.perf_counter() work_func = pickle.loads(task['func']) result = work_func(**task['task']) tasks_completed.put({work_func.__name__: result}) time_end = time.perf_counter() - time_start print(worker_name + ' done in {} seconds'.format(round(time_end, 5))) except Exception as e: print(worker_name + ' task failed. ' + str(e)) tasks_completed.put({work_func.__name__: None}) def par_proc(job_list, num_cpus=None): # Get the number of cores if not num_cpus: num_cpus = psutil.cpu_count(logical=False) print('* Parallel processing') print('* Running on {} cores'.format(num_cpus)) # Set-up the queues for sending and receiving data to/from the workers tasks_pending = mp.Queue() tasks_completed = mp.Queue() # Gather processes and results here processes = [] results = [] # Count tasks num_tasks = 0 # Add the tasks to the queue for job in job_list: for task in job['tasks']: expanded_job = {} num_tasks = num_tasks + 1 expanded_job.update({'func': pickle.dumps(job['func'])}) expanded_job.update({'task': task}) tasks_pending.put(expanded_job) # Use as many workers as there are cores (usually chokes the system so better use less) num_workers = num_cpus # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more # work left to be done. for c in range(num_workers): tasks_pending.put(SENTINEL) print('* Number of tasks: {}'.format(num_tasks)) # Set-up and start the workers for c in range(num_workers): p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed)) p.name = 'worker' + str(c) processes.append(p) p.start() # Gather the results completed_tasks_counter = 0 while completed_tasks_counter < num_tasks: results.append(tasks_completed.get()) completed_tasks_counter = completed_tasks_counter + 1 for p in processes: p.join() return results
E aqui está um teste para executar o código acima contra
def test_parallel_processing(): def heavy_duty1(arg1, arg2, arg3): return arg1 + arg2 + arg3 def heavy_duty2(arg1, arg2, arg3): return arg1 * arg2 * arg3 task_list = [ {'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, ] results = par_proc(task_list) job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())]) job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())]) assert job1 == 15 assert job2 == 21
mais outro com algumas exceções
def test_parallel_processing_exceptions(): def heavy_duty1_raises(arg1, arg2, arg3): raise ValueError('Exception raised') return arg1 + arg2 + arg3 def heavy_duty2(arg1, arg2, arg3): return arg1 * arg2 * arg3 task_list = [ {'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, ] results = par_proc(task_list) job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())]) job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())]) assert not job1 assert job2 == 21
Espero que isso seja útil.
fonte
Implementamos duas versões disso, uma um pool multi thread simples que pode executar muitos tipos de callables, tornando nossas vidas muito mais fáceis e a segunda versão que usa processos , que é menos flexível em termos de callables e requer uma chamada extra para endro.
Definir frozen_pool como true congelará a execução até que finish_pool_queue seja chamada em qualquer uma das classes.
Versão do Tópico:
''' Created on Nov 4, 2019 @author: Kevin ''' from threading import Lock, Thread from Queue import Queue import traceback from helium.loaders.loader_retailers import print_info from time import sleep import signal import os class ThreadPool(object): def __init__(self, queue_threads, *args, **kwargs): self.frozen_pool = kwargs.get('frozen_pool', False) self.print_queue = kwargs.get('print_queue', True) self.pool_results = [] self.lock = Lock() self.queue_threads = queue_threads self.queue = Queue() self.threads = [] for i in range(self.queue_threads): t = Thread(target=self.make_pool_call) t.daemon = True t.start() self.threads.append(t) def make_pool_call(self): while True: if self.frozen_pool: #print '--> Queue is frozen' sleep(1) continue item = self.queue.get() if item is None: break call = item.get('call', None) args = item.get('args', []) kwargs = item.get('kwargs', {}) keep_results = item.get('keep_results', False) try: result = call(*args, **kwargs) if keep_results: self.lock.acquire() self.pool_results.append((item, result)) self.lock.release() except Exception as e: self.lock.acquire() print e traceback.print_exc() self.lock.release() os.kill(os.getpid(), signal.SIGUSR1) self.queue.task_done() def finish_pool_queue(self): self.frozen_pool = False while self.queue.unfinished_tasks > 0: if self.print_queue: print_info('--> Thread pool... %s' % self.queue.unfinished_tasks) sleep(5) self.queue.join() for i in range(self.queue_threads): self.queue.put(None) for t in self.threads: t.join() del self.threads[:] def get_pool_results(self): return self.pool_results def clear_pool_results(self): del self.pool_results[:]
Versão do processo:
''' Created on Nov 4, 2019 @author: Kevin ''' import traceback from helium.loaders.loader_retailers import print_info from time import sleep import signal import os from multiprocessing import Queue, Process, Value, Array, JoinableQueue, Lock,\ RawArray, Manager from dill import dill import ctypes from helium.misc.utils import ignore_exception from mem_top import mem_top import gc class ProcessPool(object): def __init__(self, queue_processes, *args, **kwargs): self.frozen_pool = Value(ctypes.c_bool, kwargs.get('frozen_pool', False)) self.print_queue = kwargs.get('print_queue', True) self.manager = Manager() self.pool_results = self.manager.list() self.queue_processes = queue_processes self.queue = JoinableQueue() self.processes = [] for i in range(self.queue_processes): p = Process(target=self.make_pool_call) p.start() self.processes.append(p) print 'Processes', self.queue_processes def make_pool_call(self): while True: if self.frozen_pool.value: sleep(1) continue item_pickled = self.queue.get() if item_pickled is None: #print '--> Ending' self.queue.task_done() break item = dill.loads(item_pickled) call = item.get('call', None) args = item.get('args', []) kwargs = item.get('kwargs', {}) keep_results = item.get('keep_results', False) try: result = call(*args, **kwargs) if keep_results: self.pool_results.append(dill.dumps((item, result))) else: del call, args, kwargs, keep_results, item, result except Exception as e: print e traceback.print_exc() os.kill(os.getpid(), signal.SIGUSR1) self.queue.task_done() def finish_pool_queue(self, callable=None): self.frozen_pool.value = False while self.queue._unfinished_tasks.get_value() > 0: if self.print_queue: print_info('--> Process pool... %s' % (self.queue._unfinished_tasks.get_value())) if callable: callable() sleep(5) for i in range(self.queue_processes): self.queue.put(None) self.queue.join() self.queue.close() for p in self.processes: with ignore_exception: p.join(10) with ignore_exception: p.terminate() with ignore_exception: del self.processes[:] def get_pool_results(self): return self.pool_results def clear_pool_results(self): del self.pool_results[:]
Ligue com:
tp = ThreadPool(queue_threads=2) tp.queue.put({'call': test, 'args': [random.randint(0, 100)]}) tp.finish_pool_queue()
ou
pp = ProcessPool(queue_processes=2) pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]})) pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]})) pp.finish_pool_queue()
fonte
Acabei de fazer um exemplo simples e geral para demonstrar a passagem de uma mensagem por uma Fila entre 2 programas independentes. Não responde diretamente à pergunta do OP, mas deve ser suficientemente claro indicando o conceito.
Servidor:
multiprocessing-queue-manager-server.py
import asyncio import concurrent.futures import multiprocessing import multiprocessing.managers import queue import sys import threading from typing import Any, AnyStr, Dict, Union class QueueManager(multiprocessing.managers.BaseManager): def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue: pass def get_queue(ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue: global q if not ident in q: q[ident] = multiprocessing.Queue() return q[ident] q: Dict[Union[AnyStr, int, type(None)], multiprocessing.Queue] = dict() delattr(QueueManager, 'get_queue') def init_queue_manager_server(): if not hasattr(QueueManager, 'get_queue'): QueueManager.register('get_queue', get_queue) def serve(no: int, term_ev: threading.Event): manager: QueueManager with QueueManager(authkey=QueueManager.__name__.encode()) as manager: print(f"Server address {no}: {manager.address}") while not term_ev.is_set(): try: item: Any = manager.get_queue().get(timeout=0.1) print(f"Client {no}: {item} from {manager.address}") except queue.Empty: continue async def main(n: int): init_queue_manager_server() term_ev: threading.Event = threading.Event() executor: concurrent.futures.ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor() i: int for i in range(n): asyncio.ensure_future(asyncio.get_running_loop().run_in_executor(executor, serve, i, term_ev)) # Gracefully shut down try: await asyncio.get_running_loop().create_future() except asyncio.CancelledError: term_ev.set() executor.shutdown() raise if __name__ == '__main__': asyncio.run(main(int(sys.argv[1])))
Cliente:
multiprocessing-queue-manager-client.py
import multiprocessing import multiprocessing.managers import os import sys from typing import AnyStr, Union class QueueManager(multiprocessing.managers.BaseManager): def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue: pass delattr(QueueManager, 'get_queue') def init_queue_manager_client(): if not hasattr(QueueManager, 'get_queue'): QueueManager.register('get_queue') def main(): init_queue_manager_client() manager: QueueManager = QueueManager(sys.argv[1], authkey=QueueManager.__name__.encode()) manager.connect() message = f"A message from {os.getpid()}" print(f"Message to send: {message}") manager.get_queue().put(message) if __name__ == '__main__': main()
Uso
Servidor:
N
é um número inteiro que indica quantos servidores devem ser criados. Copie um dos<server-address-N>
resultados do servidor e torne-o o primeiro argumento de cada ummultiprocessing-queue-manager-client.py
.Cliente:
python3 multiprocessing-queue-manager-client.py <server-address-1>
Resultado
Servidor:
Client 1: <item> from <server-address-1>
Síntese: https://gist.github.com/89062d639e40110c61c2f88018a8b0e5
UPD : criou um pacote aqui .
Servidor:
import ipcq with ipcq.QueueManagerServer(address=ipcq.Address.DEFAULT, authkey=ipcq.AuthKey.DEFAULT) as server: server.get_queue().get()
Cliente:
import ipcq client = ipcq.QueueManagerClient(address=ipcq.Address.DEFAULT, authkey=ipcq.AuthKey.DEFAULT) client.get_queue().put('a message')
fonte