Python: Como posso executar funções Python em paralelo?

109

Eu pesquisei primeiro e não consegui encontrar uma resposta para minha pergunta. Estou tentando executar várias funções em paralelo em Python.

Eu tenho algo assim:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

Eu quero chamar func1 e func2 e executá-los ao mesmo tempo. As funções não interagem entre si ou no mesmo objeto. No momento, tenho que esperar que func1 termine antes que func2 comece. Como faço algo como abaixo:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

Quero ser capaz de criar os dois diretórios quase ao mesmo tempo, porque a cada minuto estou contando quantos arquivos estão sendo criados. Se o diretório não estiver lá, isso prejudicará meu tempo.

lmcadory
fonte
1
Você pode querer redesenhar isso; se você está contando o número de arquivos / pastas a cada minuto, está criando uma condição de corrida. Que tal fazer com que cada função atualize um contador ou use um arquivo de bloqueio para garantir que o processo periódico não atualize a contagem até que ambas as funções tenham concluído a execução?

Respostas:

163

Você pode usar threadingou multiprocessing.

Devido às peculiaridades do CPython , threadingé improvável que alcance o verdadeiro paralelismo. Por esse motivo, multiprocessinggeralmente é uma aposta melhor.

Aqui está um exemplo completo:

from multiprocessing import Process

def func1():
  print 'func1: starting'
  for i in xrange(10000000): pass
  print 'func1: finishing'

def func2():
  print 'func2: starting'
  for i in xrange(10000000): pass
  print 'func2: finishing'

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()

A mecânica de iniciar / juntar processos filho pode ser facilmente encapsulada em uma função ao longo das linhas de runBothFunc:

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)
NPE
fonte
4
Usei seu código, mas as funções ainda não iniciaram ao mesmo tempo.
lmcadory
4
@Lamar McAdory: Por favor, explique o que exatamente você quer dizer com "ao mesmo tempo", talvez dando um exemplo concreto do que você fez, do que esperava que acontecesse e do que realmente aconteceu.
NPE
4
@Lamar: Você nunca pode ter nenhuma garantia de "exatamente ao mesmo tempo" e pensar que pode é totalmente errado. Dependendo de quantos cpus você tem, a carga da máquina, o tempo de muitas coisas que acontecem no computador, tudo terá uma influência no tempo de início dos threads / processos. Além disso, como os processos são iniciados logo após a criação, a sobrecarga de criação de um processo também deve ser calculada na diferença de tempo que você vê.
Martin
1
é possível obter uma lista dos resultados de cada função? Digamos que cada função retorne um valor diferente, os valores podem ser acrescentados a alguma lista que pode ser usada posteriormente? talvez anexando o resultado a uma lista global?
Pelos
1
Se minhas funções usam parâmetros e quando eu passo os parâmetros ao chamá-los de processos separados, eles não são executados simultaneamente. Você pode ajudar
user2910372
18

Isso pode ser feito elegantemente com o Ray , um sistema que permite paralelizar e distribuir facilmente seu código Python.

Para paralelizar seu exemplo, você precisa definir suas funções com o @ray.remotedecorador e, em seguida, invocá-las com .remote.

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

Se você passar o mesmo argumento para ambas as funções e o argumento for grande, uma maneira mais eficiente de fazer isso é usando ray.put(). Isso evita que o grande argumento seja serializado duas vezes e crie duas cópias de memória dele:

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

Se func1()e func2()retornar resultados, você precisará reescrever o código da seguinte maneira:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func1.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

Existem várias vantagens em usar o Ray em vez do módulo de multiprocessamento . Em particular, o mesmo código será executado em uma única máquina, bem como em um cluster de máquinas. Para obter mais vantagens do Ray, consulte esta postagem relacionada .

Ion Stoica
fonte
18

Se suas funções estão principalmente fazendo trabalho de E / S (e menos trabalho de CPU) e você tem Python 3.2+, você pode usar um ThreadPoolExecutor :

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

Se suas funções estão fazendo principalmente trabalho de CPU (e menos trabalho de E / S) e você tem Python 2.6+, você pode usar o módulo de multiprocessamento :

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

run_cpu_tasks_in_parallel([
    lambda: print('CPU task 1 running!'),
    lambda: print('CPU task 2 running!'),
])
David Foster
fonte
Esta é uma boa resposta. Como identificar a partir do resultado para as tarefas vinculadas de E / S usando concurrent.futures qual completou? Basicamente, em vez de funções lamba, se tivermos funções normais, como identificar o resultado mapeado para a função chamada?
Tragaknight de
Esqueça, encontrei uma maneira - em vez de run_cpu_tasks_in_parallel ([lambda: print ('CPU task 1 running!'), Lambda: print ('CPU task 2 running!'),]) Use isto - results = run_io_tasks_in_parallel ([lambda: {'is_something1': func1 ()}, lambda: {'is_something2': func2 ()},])
Tragaknight
5

Se você é um usuário do Windows e usa python 3, então esta postagem irá ajudá-lo a fazer programação paralela em python. Quando você executa a programação de pool de uma biblioteca de multiprocessamento normal, obterá um erro em relação à função principal em seu programa. Isso ocorre porque o Windows não tem funcionalidade fork (). O post abaixo está dando uma solução para o problema mencionado.

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

Como estava usando o python 3, mudei o programa um pouco assim:

from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

Após esta função, o código do problema acima também é alterado um pouco assim:

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])

E obtive o resultado como:

[1, 8, 27, 64, 125, 216]

Estou pensando que este post pode ser útil para alguns dos usuários do Windows.

Arun Sooraj
fonte
4

Não há como garantir que duas funções serão executadas em sincronia uma com a outra, o que parece ser o que você deseja fazer.

O melhor que você pode fazer é dividir a função em várias etapas e, em seguida, esperar que ambas terminem em pontos críticos de sincronização usando Process.joincomo as menções de resposta do @aix.

Isto é melhor que time.sleep(10) porque você não pode garantir os tempos exatos. Com a espera explicitamente, você está dizendo que as funções devem ser executadas naquela etapa antes de passar para a próxima, em vez de presumir que será feito em 10ms, o que não é garantido com base no que mais está acontecendo na máquina.

Davy8
fonte
1

Parece que você tem uma única função que precisa chamar em dois parâmetros diferentes. Isso pode ser feito elegantemente usando uma combinação de concurrent.futurese mapcom Python 3.2+

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_secs(seconds):
  time.sleep(seconds)
  print(f'{seconds} has been processed')

secs_list = [2,4, 6, 8, 10, 12]

Agora, se sua operação for vinculada a IO, você pode usar o ThreadPoolExecutorcomo tal:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

Observe como mapé usado aqui paramap sua função para a lista de argumentos.

Agora, se sua função for limitada pela CPU, você pode usar ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

Se não tiver certeza, você pode simplesmente tentar os dois e ver qual oferece melhores resultados.

Finalmente, se quiser imprimir seus resultados, você pode simplesmente fazer o seguinte:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)
  for result in results:
    print(result)
BICube
fonte