Como implementar paralelo, atrasado de tal forma que o loop for paralelizado para quando a saída fica abaixo de um limite?

8

Suponha que eu tenha o seguinte código:

from scipy import *
import multiprocessing as mp
num_cores = mp.cpu_count()
from joblib import Parallel, delayed
import matplotlib.pyplot as plt

def func(x,y):
    return y/x
def main(y, xmin,xmax, dx):
    x = arange(xmin,xmax,dx)
    output = Parallel(n_jobs=num_cores)(delayed(func)(i, y) for i in x)
    return x, asarray(output)
def demo():
    x,z = main(2.,1.,30.,.1)
    plt.plot(x,z, label='All values')
    plt.plot(x[z>.1],z[z>.1], label='desired range') ## This is better to do in main()
    plt.show()

demo()

Eu quero calcular a saída apenas até a saída> um determinado número (pode-se supor que os elementos da saída diminuam monotonicamente com o aumento de x) e depois pare (NÃO calculando todos os valores de x e depois classificando, isso é ineficiente para o meu objetivo). Existe alguma maneira de fazer isso usando paralelo, atrasado ou qualquer outro multiprocessamento?

user247534
fonte
Você pode usar numpy também. Eu adicionei alguns números. A seleção [z> .1] na função demo deve ser feita na função principal para tornar o código mais eficiente.
user247534
Eu sei que seria uma bagunça, mas eu criaria uma lista, passaria para a função e a função acrescentaria o resultado a essa lista. Então, fora, eu verificaria se a lista contém um número maior que isso e encerraria os threads de alguma forma. Agora que penso sobre isso, provavelmente existem métodos mais inteligentes para fazer isso, como filas
Maxxik CZ

Respostas:

1

Não havia nenhum output > a given numberespecificado, então eu inventei um. após o teste, tive que reverter a condição para uma operação adequada output < a given number.

Eu usaria um pool, iniciaria os processos com uma função de retorno de chamada para verificar a condição de parada e, em seguida, encerraria o pool quando estiver pronto. mas isso causaria uma condição de corrida que permitiria que os resultados fossem omitidos da execução de processos que não tinham permissão para concluir. Eu acho que esse método tem uma modificação mínima no seu código e é muito fácil de ler. A ordem da lista NÃO é garantida.

Prós: muito pouca sobrecarga
Contras: podem ter resultados ausentes.

Método 1)

from scipy import *
import multiprocessing

import matplotlib.pyplot as plt


def stop_condition_callback(ret):
        output.append(ret)
        if ret < stop_condition:
            worker_pool.terminate()


def func(x, y, ):
    return y / x


def main(y, xmin, xmax, dx):
    x = arange(xmin, xmax, dx)
    print("Number of calculations: %d" % (len(x)))

    # add calculations to the pool
    for i in x:
        worker_pool.apply_async(func, (i, y,), callback=stop_condition_callback)

    # wait for the pool to finish/terminate
    worker_pool.close()
    worker_pool.join()

    print("Number of results: %d" % (len(output)))
    return x, asarray(output)


def demo():
    x, z_list = main(2., 1., 30., .1)
    plt.plot(z_list, label='desired range')
    plt.show()


output = []
stop_condition = 0.1

worker_pool = multiprocessing.Pool()
demo()

Esse método possui mais sobrecarga, mas permitirá processos que começaram a terminar. Método 2)

from scipy import *
import multiprocessing

import matplotlib.pyplot as plt


def stop_condition_callback(ret):
    if ret is not None:
        if ret < stop_condition:
            worker_stop.value = 1
        else:
            output.append(ret)


def func(x, y, ):
    if worker_stop.value != 0:
        return None
    return y / x


def main(y, xmin, xmax, dx):
    x = arange(xmin, xmax, dx)
    print("Number of calculations: %d" % (len(x)))

    # add calculations to the pool
    for i in x:
        worker_pool.apply_async(func, (i, y,), callback=stop_condition_callback)

    # wait for the pool to finish/terminate
    worker_pool.close()
    worker_pool.join()

    print("Number of results: %d" % (len(output)))
    return x, asarray(output)


def demo():
    x, z_list = main(2., 1., 30., .1)
    plt.plot(z_list, label='desired range')
    plt.show()


output = []
worker_stop = multiprocessing.Value('i', 0)
stop_condition = 0.1

worker_pool = multiprocessing.Pool()
demo()

Método 3) Prós: Nenhum resultado será deixado de lado
Contras: Este passo está muito além do que você normalmente faria.

pegue o método 1 e adicione

def stopPoolButLetRunningTaskFinish(pool):
    # Pool() shutdown new task from being started, by emptying the query all worker processes draw from
    while pool._task_handler.is_alive() and pool._inqueue._reader.poll():
        pool._inqueue._reader.recv()
    # Send sentinels to all worker processes
    for a in range(len(pool._pool)):
            pool._inqueue.put(None)

Então mude stop_condition_callback

def stop_condition_callback(ret):
    if ret[1] < stop_condition:
        #worker_pool.terminate()
        stopPoolButLetRunningTaskFinish(worker_pool)
    else:
        output.append(ret)
Ron
fonte
0

Eu usaria o Dask para executar em paralelo e, especificamente, a interface de futuros para feedback em tempo real dos resultados à medida que eles são concluídos. Quando terminar, você poderá cancelar os futuros restantes em andamento, conceder os desnecessários para concluir de forma assíncrona ou fechar o cluster.

from dask.distributed import Client, as_completed
client = Client()  # defaults to ncores workers, one thread each
y, xmin, xmax, dx = 2.,1.,30.,.1

def func(x, y):
    return x, y/x
x = arange(xmin,xmax,dx)
outx = []
output = []
futs = [client.submit(func, val, y) for val in x]
for future in as_completed(futs):
    outs = future.result()
    outx.append(outs[0])
    output.append(outs[1])
    if outs[1] < 0.1:
        break

Notas: - Suponho que você quis dizer "menor que", porque, caso contrário, o primeiro valor já passa ( y / xmin > 0.1) - as saídas não são garantidas na ordem em que você as inseriu, se desejar buscar resultados à medida que ficarem prontos, mas com tal cálculo rápido, talvez sempre sejam (é por isso que a função retornou também o valor da entrada) - se você parar de calcular, a saída será mais curta que o conjunto completo de entradas, por isso não tenho certeza do que deseja impressão.

mdurant
fonte