Exemplo muito simples de usar a fila de multiprocessamento, pool e bloqueio

91

Tentei ler a documentação em http://docs.python.org/dev/library/multiprocessing.html, mas ainda estou lutando com o multiprocessamento Queue, Pool e Locking. E, por enquanto, consegui construir o exemplo abaixo.

Em relação à Fila e ao Pool, não tenho certeza se entendi o conceito da maneira certa, então me corrija se estiver errado. O que estou tentando alcançar é processar 2 solicitações de cada vez (a lista de dados tem 8 neste exemplo), então, o que devo usar? Pool para criar 2 processos que podem lidar com duas filas diferentes (2 no máximo) ou devo apenas usar a Queue para processar 2 entradas de cada vez? O bloqueio seria imprimir as saídas corretamente.

import multiprocessing
import time

data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
        ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)


def mp_handler(var1):
    for indata in var1:
        p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
        p.start()


def mp_worker(inputs, the_time):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

if __name__ == '__main__':
    mp_handler(data)
thclpr
fonte

Respostas:

129

A melhor solução para seu problema é utilizar a Pool. Usar se Queueter uma funcionalidade separada de "alimentação de fila" é provavelmente um exagero.

Aqui está uma versão ligeiramente reorganizada do seu programa, desta vez com apenas 2 processos agrupados em a Pool. Acredito que seja a maneira mais fácil de fazer, com alterações mínimas no código original:

import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker((inputs, the_time)):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

def mp_handler():
    p = multiprocessing.Pool(2)
    p.map(mp_worker, data)

if __name__ == '__main__':
    mp_handler()

Observe que a mp_worker()função agora aceita um único argumento (uma tupla dos dois argumentos anteriores) porque a map()função fragmenta seus dados de entrada em sublistas, cada sublista fornecida como um único argumento para sua função de trabalho.

Resultado:

Processs a  Waiting 2 seconds
Processs b  Waiting 4 seconds
Process a   DONE
Processs c  Waiting 6 seconds
Process b   DONE
Processs d  Waiting 8 seconds
Process c   DONE
Processs e  Waiting 1 seconds
Process e   DONE
Processs f  Waiting 3 seconds
Process d   DONE
Processs g  Waiting 5 seconds
Process f   DONE
Processs h  Waiting 7 seconds
Process g   DONE
Process h   DONE

Edite de acordo com o comentário de @Thales abaixo:

Se você quiser "um bloqueio para cada limite de pool" para que seus processos sejam executados em pares tandem, ala:

A espera B espera | A pronto, B feito | C esperando, D esperando | C feito, D feito | ...

em seguida, altere a função de manipulador para lançar pools (de 2 processos) para cada par de dados:

def mp_handler():
    subdata = zip(data[0::2], data[1::2])
    for task1, task2 in subdata:
        p = multiprocessing.Pool(2)
        p.map(mp_worker, (task1, task2))

Agora sua saída é:

 Processs a Waiting 2 seconds
 Processs b Waiting 4 seconds
 Process a  DONE
 Process b  DONE
 Processs c Waiting 6 seconds
 Processs d Waiting 8 seconds
 Process c  DONE
 Process d  DONE
 Processs e Waiting 1 seconds
 Processs f Waiting 3 seconds
 Process e  DONE
 Process f  DONE
 Processs g Waiting 5 seconds
 Processs h Waiting 7 seconds
 Process g  DONE
 Process h  DONE
Velimir Mlaker
fonte
Obrigado pelo exemplo simples e direto de como fazer isso, mas como eu poderia aplicar o bloqueio para cada limite de pool? Quer dizer, se você executar o código, gostaria de ver algo como "A esperando B esperando | A pronto, b pronto | C esperando, D esperando | C pronto, D pronto"
thclpr
2
Em outras palavras, você não quer que C comece até que A e B estejam prontos?
Velimir Mlaker
Exatamente, posso fazer isso usando multiprocessing.Process, mas não consigo descobrir como fazer isso usando pool
thclpr
Muito obrigado, trabalhe como pretendido, mas na função mp_handler você está referenciando os dados variáveis ​​em vez de var1 :)
thclpr
Ok, obrigado, eu removi var1completamente, referindo-me a global data.
Velimir Mlaker
8

Isso pode não estar 100% relacionado à pergunta, mas na minha pesquisa por um exemplo de uso de multiprocessamento com uma fila, isso apareceu primeiro no google.

Este é um exemplo básico de classe que você pode instanciar e colocar itens em uma fila e pode esperar até que a fila termine. Isso é tudo que eu precisava.

from multiprocessing import JoinableQueue
from multiprocessing.context import Process


class Renderer:
    queue = None

    def __init__(self, nb_workers=2):
        self.queue = JoinableQueue()
        self.processes = [Process(target=self.upload) for i in range(nb_workers)]
        for p in self.processes:
            p.start()

    def render(self, item):
        self.queue.put(item)

    def upload(self):
        while True:
            item = self.queue.get()
            if item is None:
                break

            # process your item here

            self.queue.task_done()

    def terminate(self):
        """ wait until queue is empty and terminate processes """
        self.queue.join()
        for p in self.processes:
            p.terminate()

r = Renderer()
r.render(item1)
r.render(item2)
r.terminate()
linqu
fonte
2
O que são item1e item2? São algum tipo de tarefa ou função, que será executada em dois processos diferentes?
Zelphir Kaltstahl
2
sim, eles são tarefas ou parâmetros de entrada que são processados ​​de forma paralela.
linqu
8

Aqui está meu goto pessoal para este tópico:

Síntese aqui, (solicitações de pull bem-vindas!): Https://gist.github.com/thorsummoner/b5b1dfcff7e7fdd334ec

import multiprocessing
import sys

THREADS = 3

# Used to prevent multiple threads from mixing thier output
GLOBALLOCK = multiprocessing.Lock()


def func_worker(args):
    """This function will be called by each thread.
    This function can not be a class method.
    """
    # Expand list of args into named args.
    str1, str2 = args
    del args

    # Work
    # ...



    # Serial-only Portion
    GLOBALLOCK.acquire()
    print(str1)
    print(str2)
    GLOBALLOCK.release()


def main(argp=None):
    """Multiprocessing Spawn Example
    """
    # Create the number of threads you want
    pool = multiprocessing.Pool(THREADS)

    # Define two jobs, each with two args.
    func_args = [
        ('Hello', 'World',), 
        ('Goodbye', 'World',), 
    ]


    try:
        # Spawn up to 9999999 jobs, I think this is the maximum possible.
        # I do not know what happens if you exceed this.
        pool.map_async(func_worker, func_args).get(9999999)
    except KeyboardInterrupt:
        # Allow ^C to interrupt from any thread.
        sys.stdout.write('\033[0m')
        sys.stdout.write('User Interupt\n')
    pool.close()

if __name__ == '__main__':
    main()
ThorSummoner
fonte
1
Não tenho certeza se .map_async () é melhor do que .map () de alguma forma.
ThorSummoner
3
O argumento para get()é um tempo limite, não tem nada a ver com o número de jobs que são iniciados.
mata,
@mata então, isso deve ser usado em um loop de votação? .get(timeout=1)? e está certo apenas dizer .get()para obter a lista completa?
ThorSummoner
Sim, .get()aguarda indefinidamente até que todos os resultados estejam disponíveis e retorna a lista de resultados. Você pode usar um loop de pesquisa para verificar se os resultados do clima estão disponíveis ou pode passar uma função de retorno de chamada na map_async()chamada que será invocada para cada resultado assim que estiver disponível.
mata,
2

Para todos que usam editores como o Komodo Edit (win10), adicione sys.stdout.flush()a:

def mp_worker((inputs, the_time)):
    print " Process %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs
    sys.stdout.flush()

ou como primeira linha para:

    if __name__ == '__main__':
       sys.stdout.flush()

Isso ajuda a ver o que acontece durante a execução do script; em vez de olhar para a caixa preta da linha de comando.

ZF007
fonte
1

Aqui está um exemplo do meu código (para pool encadeado, mas apenas altere o nome da classe e você terá o pool de processos):

def execute_run(rp): 
   ... do something 

pool = ThreadPoolExecutor(6)
for mat in TESTED_MATERIAL:
    for en in TESTED_ENERGIES:
        for ecut in TESTED_E_CUT:
            rp = RunParams(
                simulations, DEST_DIR,
                PARTICLE, mat, 960, 0.125, ecut, en
            )
            pool.submit(execute_run, rp)
pool.join()

Basicamente:

  • pool = ThreadPoolExecutor(6) cria um pool para 6 threads
  • Então você tem um monte de for's que adicionam tarefas ao pool
  • pool.submit(execute_run, rp) adiciona uma tarefa ao pool, primeiro um argumento é uma função chamada em um thread / processo, o restante dos argumentos são passados ​​para a função chamada.
  • pool.join aguarda até que todas as tarefas sejam concluídas.
jb.
fonte
2
Observe que você está usando concurrent.futures, mas o OP está perguntando sobre multiprocessinge Python 2.7.
Tim Peters