Use numpy array na memória compartilhada para multiprocessamento

111

Eu gostaria de usar uma matriz numpy na memória compartilhada para uso com o módulo de multiprocessamento. A dificuldade é usá-lo como um array numpy, e não apenas como um array ctypes.

from multiprocessing import Process, Array
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child processes
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Printing out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

Isso produz resultados como:

Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]

O array pode ser acessado de forma ctypes, por exemplo, arr[i]faz sentido. No entanto, não é uma matriz numpy e não posso executar operações como -1*arr, ou arr.sum(). Suponho que uma solução seria converter a matriz ctypes em uma matriz numpy. Porém (além de não poder fazer esse trabalho), não acredito que seria mais compartilhado.

Parece que haveria uma solução padrão para o que deve ser um problema comum.

Ian Langmore
fonte
1
Não é igual a este? stackoverflow.com/questions/5033799/...
pygabriel
1
Não é exatamente a mesma pergunta. A questão vinculada é perguntar sobre, em subprocessvez de multiprocessing.
Andrew

Respostas:

82

Para adicionar às respostas de @unutbu (não está mais disponível) e de @Henry Gomersall. Você pode usar shared_arr.get_lock()para sincronizar o acesso quando necessário:

shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i): # could be anything numpy accepts as an index such another numpy array
    with shared_arr.get_lock(): # synchronize access
        arr = np.frombuffer(shared_arr.get_obj()) # no data copying
        arr[i] = -arr[i]

Exemplo

import ctypes
import logging
import multiprocessing as mp

from contextlib import closing

import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 100, 11
    shared_arr = mp.Array(ctypes.c_double, N)
    arr = tonumpyarray(shared_arr)

    # fill with random values
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access the same slice
        stop_f = N // 10
        p.map_async(f, [slice(stop_f)]*M)

        # many processes access different slices of the same array
        assert M % 2 # odd
        step = N // 10
        p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
    p.join()
    assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_ # must be inherited, not passed as an argument

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def f(i):
    """synchronized."""
    with shared_arr.get_lock(): # synchronize access
        g(i)

def g(i):
    """no synchronization."""
    info("start %s" % (i,))
    arr = tonumpyarray(shared_arr)
    arr[i] = -1 * arr[i]
    info("end   %s" % (i,))

if __name__ == '__main__':
    mp.freeze_support()
    main()

Se você não precisa de acesso sincronizado ou cria seus próprios bloqueios, então mp.Array()é desnecessário. Você poderia usar mp.sharedctypes.RawArrayneste caso.

jfs
fonte
2
Bela resposta! Se eu quiser ter mais de um array compartilhado, cada um separadamente travável, mas com o número de arrays determinado em tempo de execução, isso é uma extensão direta do que você fez aqui?
Andrew
3
@Andrew: arrays compartilhados devem ser criados antes que os processos filho sejam gerados.
jfs de
Bom ponto sobre a ordem das operações. No entanto, era isso que eu tinha em mente: criar um número de matrizes compartilhadas especificado pelo usuário e, em seguida, gerar alguns processos filho. Isso é simples?
Andrew
1
@Chicony: você não pode alterar o tamanho do Array. Pense nisso como um bloco compartilhado de memória que precisava ser alocado antes que os processos filho fossem iniciados. Você não precisa usar toda a memória, por exemplo, você pode passar countpara numpy.frombuffer(). Você pode tentar fazer isso em um nível inferior usando mmapou algo como posix_ipcdiretamente para implementar um analógico do RawArray (pode envolver a cópia durante o redimensionamento) (ou procurar por uma biblioteca existente). Ou se sua tarefa permitir: copie os dados em partes (se não precisar de todos de uma vez). "Como redimensionar uma memória compartilhada" é uma boa pergunta à parte.
jfs
1
@umopapisdn: Pool()define o número de processos (o número de núcleos de CPU disponíveis é usado por padrão). Mé o número de vezes que a f()função é chamada.
jfs
21

O Arrayobjeto possui um get_obj()método associado a ele, que retorna o array ctypes que apresenta uma interface de buffer. Acho que o seguinte deve funcionar ...

from multiprocessing import Process, Array
import scipy
import numpy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    a = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(a[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(a,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%a[:2]

    b = numpy.frombuffer(a.get_obj())

    b[0] = 10.0
    print a[0]

Quando executado, ele imprime o primeiro elemento de aagora sendo 10.0, mostrando ae bsão apenas duas visualizações na mesma memória.

Para ter certeza de que ele ainda é seguro para multiprocessadores, acredito que você terá que usar os métodos acquiree releaseexistentes no Arrayobjeto a, e seu bloqueio embutido para garantir que todos sejam acessados ​​com segurança (embora eu não seja um especialista no módulo multiprocessador).

Henry Gomersall
fonte
não funcionará sem sincronização como @unutbu demonstrou em sua resposta (agora excluída).
jfs
1
Presumivelmente, se você quiser apenas acessar o pós-processamento do array, isso pode ser feito de forma limpa, sem se preocupar com problemas de simultaneidade e bloqueio.
Henry Gomersall
neste caso, você não precisa mp.Array.
jfs
1
O código de processamento pode exigir matrizes bloqueadas, mas a interpretação pós-processamento dos dados pode não necessariamente. Acho que isso vem da compreensão de qual é exatamente o problema. Claramente, acessar dados compartilhados simultaneamente exigirá alguma proteção, o que achei que seria óbvio!
Henry Gomersall
16

Embora as respostas já dadas sejam boas, há uma solução muito mais fácil para esse problema, desde que duas condições sejam atendidas:

  1. Você está em um padrão POSIX sistema operacional (por exemplo, Linux, Mac OSX); e
  2. Seus processos filho precisam de acesso somente leitura ao array compartilhado.

Nesse caso, você não precisa se preocupar em tornar as variáveis ​​explicitamente compartilhadas, pois os processos filhos serão criados usando um fork. Uma criança bifurcada compartilha automaticamente o espaço de memória dos pais. No contexto do multiprocessamento Python, isso significa que ele compartilha todas as variáveis ​​de nível de módulo ; observe que isso não se aplica a argumentos que você passa explicitamente para seus processos filho ou para as funções que você chama em um multiprocessing.Poolou outro.

Um exemplo simples:

import multiprocessing
import numpy as np

# will hold the (implicitly mem-shared) data
data_array = None

# child worker function
def job_handler(num):
    # built-in id() returns unique memory ID of a variable
    return id(data_array), np.sum(data_array)

def launch_jobs(data, num_jobs=5, num_worker=4):
    global data_array
    data_array = data

    pool = multiprocessing.Pool(num_worker)
    return pool.map(job_handler, range(num_jobs))

# create some random data and execute the child jobs
mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

# this will print 'True' on POSIX OS, since the data was shared
print(np.all(np.asarray(mem_ids) == id(data_array)))
EelkeSpaak
fonte
3
+1 Informação realmente valiosa. Você pode explicar por que apenas os vars de nível de módulo são compartilhados? Por que os vars locais não fazem parte do espaço de memória dos pais? Por exemplo, por que isso não pode funcionar se eu tenho uma função F com var local V e uma função G dentro de F que faz referência a V?
Coffee_Table
5
Aviso: esta resposta é um pouco enganosa. O processo filho recebe uma cópia do estado do processo pai, incluindo variáveis ​​globais, no momento da bifurcação. Os estados não estão de forma alguma sincronizados e irão divergir a partir desse momento. Esta técnica pode ser útil em alguns cenários (por exemplo: bifurcação de processos filho ad-hoc em que cada um manipula um instantâneo do processo pai e, em seguida, termina), mas é inútil em outros (por exemplo: processos filho de longa duração que precisam compartilhar e sincronizar dados com o processo pai).
David Stein,
4
@EelkeSpaak: Sua declaração - "uma criança bifurcada automaticamente compartilha o espaço da memória dos pais" - está incorreta. Se eu tiver um processo filho que deseja monitorar o estado do processo pai, de maneira estritamente somente leitura, a bifurcação não me levará lá: a criança apenas vê um instantâneo do estado pai no momento da bifurcação. Na verdade, era exatamente isso que eu estava tentando fazer (seguindo sua resposta) quando descobri essa limitação. Daí o pós-escrito em sua resposta. Resumindo: o estado pai não é "compartilhado", mas apenas copiado para o filho. Isso não é "compartilhar" no sentido usual.
David Stein,
2
Estou enganado ao pensar que esta é uma situação de cópia na gravação, pelo menos em sistemas posix? Ou seja, após a bifurcação, acho que a memória é compartilhada até que novos dados sejam gravados, ponto em que uma cópia é criada. Portanto, sim, é verdade que os dados não são exatamente "compartilhados", mas podem fornecer um aumento de desempenho potencialmente enorme. Se o seu processo for somente leitura, não haverá sobrecarga de cópia! Eu entendi o ponto corretamente?
remetente
2
@senderle Sim, foi exatamente isso que eu quis dizer! Daí meu ponto (2) na resposta sobre acesso somente leitura.
EelkeSpaak 01 de
11

Eu escrevi um pequeno módulo python que usa memória compartilhada POSIX para compartilhar matrizes entorpecidas entre interpretadores python. Talvez você ache útil.

https://pypi.python.org/pypi/SharedArray

Funciona assim:

import numpy as np
import SharedArray as sa

# Create an array in shared memory
a = sa.create("test1", 10)

# Attach it as a different array. This can be done from another
# python interpreter as long as it runs on the same computer.
b = sa.attach("test1")

# See how they are actually sharing the same memory block
a[0] = 42
print(b[0])

# Destroying a does not affect b.
del a
print(b[0])

# See how "test1" is still present in shared memory even though we
# destroyed the array a.
sa.list()

# Now destroy the array "test1" from memory.
sa.delete("test1")

# The array b is not affected, but once you destroy it then the
# data are lost.
print(b[0])
esteira
fonte
8

Você pode usar o sharedmemmódulo: https://bitbucket.org/cleemesser/numpy-sharedmem

Aqui está seu código original, desta vez usando memória compartilhada que se comporta como uma matriz NumPy (observe a última instrução adicional que chama uma sum()função NumPy ):

from multiprocessing import Process
import sharedmem
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = sharedmem.empty(N)
    arr[:] = unshared_arr.copy()
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

    # Perform some NumPy operation
    print arr.sum()
Velimir Mlaker
fonte
1
Observação: isto não está mais sendo desenvolvido e parece não funcionar no linux github.com/sturlamolden/sharedmem-numpy/issues/4
DC
numpy-sharedmem pode não estar em desenvolvimento, mas ainda funciona no Linux, verifique github.com/vmlaker/benchmark-sharedmem .
Velimir Mlaker