Objetos de memória compartilhada no multiprocessamento

123

Suponha que eu tenha uma matriz numpy grande de memória, que tenha uma função funcque aceite essa matriz gigante como entrada (junto com alguns outros parâmetros). funccom parâmetros diferentes pode ser executado em paralelo. Por exemplo:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

Se eu usar a biblioteca de multiprocessamento, essa matriz gigante será copiada várias vezes em diferentes processos.

Existe uma maneira de permitir que diferentes processos compartilhem a mesma matriz? Este objeto de matriz é somente leitura e nunca será modificado.

O que é mais complicado, se arr não é uma matriz, mas um objeto python arbitrário, existe uma maneira de compartilhá-la?

[EDITADO]

Eu li a resposta, mas ainda estou um pouco confuso. Como fork () é copiado na gravação, não devemos invocar nenhum custo adicional ao gerar novos processos na biblioteca de multiprocessamento python. Mas o código a seguir sugere uma enorme sobrecarga:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

saída (e, a propósito, o custo aumenta à medida que o tamanho da matriz aumenta, então eu suspeito que ainda haja sobrecarga relacionada à cópia de memória):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

Por que existe uma sobrecarga tão grande, se não copiamos a matriz? E que parte a memória compartilhada me salva?

vendeta
fonte
Você olhou para os documentos , certo?
Lev Levitsky
@FrancisAvila existe uma maneira de compartilhar não apenas array, mas objetos python arbitrários?
Vendetta
1
@LevLevitsky eu tenho que perguntar, existe uma maneira de compartilhar não apenas array, mas objetos python arbitrários?
Vendetta
2
Esta resposta explica bem por que objetos Python arbitrários não podem ser compartilhados.
Janne Karila

Respostas:

121

Se você usar um sistema operacional que usa fork()semântica de cópia na gravação (como qualquer unix comum), desde que você nunca altere sua estrutura de dados, ela estará disponível para todos os processos filhos sem ocupar memória adicional. Você não precisará fazer nada de especial (exceto tenha certeza absoluta de não alterar o objeto).

A coisa mais eficiente que você pode fazer pelo seu problema seria compactar sua matriz em uma estrutura de matriz eficiente (usando numpyou array), coloque-a na memória compartilhada, envolva-a multiprocessing.Arraye passe-a para suas funções. Esta resposta mostra como fazer isso .

Se você deseja um objeto compartilhado gravável , precisará envolvê-lo com algum tipo de sincronização ou bloqueio. multiprocessingfornece dois métodos para fazer isso : um usando memória compartilhada (adequado para valores simples, matrizes ou ctypes) ou umManager proxy, em que um processo retém a memória e um gerente arbitra o acesso a ela de outros processos (mesmo em uma rede).

o Manager abordagem pode ser usada com objetos Python arbitrários, mas será mais lenta que o equivalente usando memória compartilhada porque os objetos precisam ser serializados / desserializados e enviados entre processos.

Existem inúmeras bibliotecas e abordagens de processamento paralelo disponíveis no Python . multiprocessingé uma biblioteca excelente e bem arredondada, mas se você tiver necessidades especiais, talvez uma das outras abordagens seja melhor.

Francis Avila
fonte
25
Só para observar, no fork do Python (), na verdade, significa cópia no acesso (porque apenas o acesso ao objeto alterará sua contagem de ref).
Fabio Zadrozny
3
@FabioZadrozny Copiaria realmente o objeto inteiro ou apenas a página de memória que contém sua refcount?
zigg
5
AFAIK, apenas a página de memória que contém o refcount (portanto, 4kb em cada acesso ao objeto).
Fabio Zadrozny
1
@max Use um fechamento. A função atribuída apply_asyncdeve fazer referência ao objeto compartilhado no escopo diretamente, e não através de seus argumentos.
Francis Avila
3
@FrancisAvila como você usa um fechamento? A função que você atribui ao apply_async não deve ser selecionável? Ou isso é apenas uma restrição map_async?
GermanK
17

Encontrei o mesmo problema e escrevi uma pequena classe de utilitário de memória compartilhada para contornar isso.

Estou usando multiprocessing.RawArray(sem bloqueio), e também o acesso às matrizes não é sincronizado (sem bloqueio), tenha cuidado para não atirar nos próprios pés.

Com a solução, recebo acelerações por um fator de aproximadamente 3 em um i7 quad-core.

Aqui está o código: sinta-se à vontade para usá-lo e melhorá-lo, e informe quaisquer bugs.

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))
martin.preinfalk
fonte
Acabei de perceber que você precisa configurar suas matrizes de memória compartilhada antes de criar o pool de multiprocessamento, ainda não sei o porquê, mas definitivamente não funcionará ao contrário.
martin.preinfalk
o motivo é que o pool de multiprocessamento chama fork () quando o pool é instanciado; portanto, nada depois disso terá acesso ao ponteiro para qualquer mem compartilhado criado posteriormente.
Xiv
Quando tentei esse código no py35, recebi uma exceção no multiprocessing.sharedctypes.py, então acho que esse código é apenas para o py2.
Dr. Hillier Dániel
11

Este é o caso de uso pretendido para Ray , que é uma biblioteca para Python paralelo e distribuído. Sob o capô, ele serializa objetos usando o layout de dados Apache Arrow (que é um formato de cópia zero) e os armazena em um armazenamento de objetos de memória compartilhada para que possam ser acessados ​​por vários processos sem criar cópias.

O código seria semelhante ao seguinte.

import numpy as np
import ray

ray.init()

@ray.remote
def func(array, param):
    # Do stuff.
    return 1

array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)

result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)

Se você não ligar ray.put, o array ainda será armazenado na memória compartilhada, mas isso será feito uma vez por chamada func, o que não é o que você deseja.

Observe que isso funcionará não apenas para matrizes, mas também para objetos que contêm matrizes , por exemplo, dicionários que mapeiam entradas para matrizes como abaixo.

Você pode comparar o desempenho da serialização no Ray versus pickle executando o seguinte no IPython.

import numpy as np
import pickle
import ray

ray.init()

x = {i: np.ones(10**7) for i in range(20)}

# Time Ray.
%time x_id = ray.put(x)  # 2.4s
%time new_x = ray.get(x_id)  # 0.00073s

# Time pickle.
%time serialized = pickle.dumps(x)  # 2.6s
%time deserialized = pickle.loads(serialized)  # 1.9s

A serialização com Ray é apenas um pouco mais rápida que a pickle, mas a desserialização é 1000x mais rápida por causa do uso de memória compartilhada (esse número, é claro, depende do objeto).

Veja a documentação do Ray . Você pode ler mais sobre serialização rápida usando Ray e Arrow . Note que eu sou um dos desenvolvedores Ray.

Robert Nishihara
fonte
1
Ray parece bom! Mas, eu tentei usar essa biblioteca antes, mas infelizmente percebi que Ray não suporta janelas. Espero que vocês possam suportar o Windows o mais rápido possível. Obrigado, desenvolvedores!
Hzzkygcs
5

Como Robert Nishihara mencionou, o Apache Arrow facilita isso, especificamente com o armazenamento de objetos na memória do Plasma, no qual Ray é construído.

Fiz cérebro de plasma especificamente para esta razão - carregamento rápido e recarga de grandes objetos em um aplicativo Flask. É um namespace de objeto de memória compartilhada para objetos serializáveis ​​do Apache Arrow, incluindo pickle'd bytestrings geradas por pickle.dumps(...).

A principal diferença com o Apache Ray e o Plasma é que ele controla os IDs de objetos para você. Quaisquer processos, threads ou programas em execução localmente podem compartilhar os valores das variáveis ​​chamando o nome de qualquer Brainobjeto.

$ pip install brain-plasma
$ plasma_store -m 10000000 -s /tmp/plasma

from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/)

brain['a'] = [1]*10000

brain['a']
# >>> [1,1,1,1,...]
russellthehippo
fonte