multiprocessamento: como faço para compartilhar um dicionário entre vários processos?

113

Um programa que cria vários processos que funcionam em uma fila que pode ser juntada, Q pode ser juntada e pode, eventualmente, manipular um dicionário global Dpara armazenar os resultados. (assim, cada processo filho pode usar Dpara armazenar seu resultado e também ver quais resultados os outros processos filho estão produzindo)

Se eu imprimir o dicionário D em um processo filho, vejo as modificações que foram feitas nele (ou seja, em D). Mas depois que o processo principal une Q, se eu imprimir D, é um dict vazio!

Eu entendo que é um problema de sincronização / bloqueio. Alguém pode me dizer o que está acontecendo aqui e como posso sincronizar o acesso ao D?

dop
fonte
1
Isso não funciona conforme o esperado, pelo menos no python 3.7.2 usando osx 10.14.4 Dict não está sincronizado e seu conteúdo é reescrito por outros processos. No entanto, <code> multiprocessing.Manager (). List () </code> funciona conforme o esperado.
Andrew Druchenko,

Respostas:

162

Uma resposta geral envolve o uso de um Managerobjeto. Adaptado dos documentos:

from multiprocessing import Process, Manager

def f(d):
    d[1] += '1'
    d['2'] += 2

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    d[1] = '1'
    d['2'] = 2

    p1 = Process(target=f, args=(d,))
    p2 = Process(target=f, args=(d,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print d

Resultado:

$ python mul.py 
{1: '111', '2': 6}
remetente
fonte
4
Obrigado, remetente. Na verdade, D = multiprocessing.Manager (). Dict () resolve meu problema. Eu estava usando D = dict ().
dop
3
@LorenzoBelli, se está perguntando se o acesso ao gerente está sincronizado, acredito que sim. multiprocessing.Manager()retorna uma instância deSyncManager , cujo nome sugere tanto!
remetente em
@senderle Eu quero compartilhar o estado aleatório numpy de um processo pai com um processo filho. Tentei usar, Managermas ainda não tive sorte. Você poderia dar uma olhada na minha pergunta aqui e ver se você pode oferecer uma solução? Ainda posso obter números aleatórios diferentes se fizer isso np.random.seed(None)toda vez que gerar um número aleatório, mas isso não me permite usar o estado aleatório do processo pai, que não é o que desejo. Qualquer ajuda é muito apreciada.
Amir
1
@RadioControlled feliz em escrever uma atualização, mas brevemente, embora eu não ache que você possa fazer isso acontecer diretamente, você pode facilmente criar um novo dicionário gerenciado com as mesmas chaves e valores e usá-lo em vez do original. Isso é adequado para o seu caso?
remetente
1
@senderle, foi o que acabei fazendo. Portanto, a resposta seria que você teria que fazer exatamente isso.
Rádio Controlado em
25

multiprocessamento não é como threading. Cada processo filho receberá uma cópia da memória do processo principal. Geralmente o estado é compartilhado por meio de comunicação (tubos / soquetes), sinais ou memória compartilhada.

O multiprocessamento disponibiliza algumas abstrações para o seu caso de uso - estado compartilhado que é tratado como local pelo uso de proxies ou memória compartilhada: http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes

Seções relevantes:

Jeremy Brown
fonte
1
Muito obrigado. Você me levou à solução / a: multiprocessing.Manager (). Dict ().
dop
Alguém pode explicar o que significa a afirmação "Cada processo filho receberá uma cópia da memória do processo principal".
Itsme2003
@ Itsme2003 por padrão, um processo gerado não tem acesso à memória do processo pai (esta é uma das principais diferenças para threads). Portanto, quando um processo precisa de um objeto do processo pai, ele deve criar uma cópia dele (em vez de obter uma referência para o objeto real). A resposta acima explica como compartilhar objetos entre processos.
Niklas Mertsch
Porque isso é frequentemente errado: Contanto que você não modifique o objeto, pelo menos na configuração normal do Linux, o objeto só será realmente armazenado uma vez na memória. Ele será copiado assim que for alterado. Isso pode ser muito importante se você precisar economizar memória e não modificar o objeto.
Rádio Controlado em
16

Eu gostaria de compartilhar meu próprio trabalho, que é mais rápido que o dict do Manager e é mais simples e estável do que a biblioteca pyshmht que usa toneladas de memória e não funciona no Mac OS. Embora meu dict funcione apenas para strings simples e seja imutável atualmente. Eu uso a implementação de teste linear e armazeno chaves e pares de valores em um bloco de memória separado após a tabela.

from mmap import mmap
import struct
from timeit import default_timer
from multiprocessing import Manager
from pyshmht import HashTable


class shared_immutable_dict:
    def __init__(self, a):
        self.hs = 1 << (len(a) * 3).bit_length()
        kvp = self.hs * 4
        ht = [0xffffffff] * self.hs
        kvl = []
        for k, v in a.iteritems():
            h = self.hash(k)
            while ht[h] != 0xffffffff:
                h = (h + 1) & (self.hs - 1)
            ht[h] = kvp
            kvp += self.kvlen(k) + self.kvlen(v)
            kvl.append(k)
            kvl.append(v)

        self.m = mmap(-1, kvp)
        for p in ht:
            self.m.write(uint_format.pack(p))
        for x in kvl:
            if len(x) <= 0x7f:
                self.m.write_byte(chr(len(x)))
            else:
                self.m.write(uint_format.pack(0x80000000 + len(x)))
            self.m.write(x)

    def hash(self, k):
        h = hash(k)
        h = (h + (h >> 3) + (h >> 13) + (h >> 23)) * 1749375391 & (self.hs - 1)
        return h

    def get(self, k, d=None):
        h = self.hash(k)
        while True:
            x = uint_format.unpack(self.m[h * 4:h * 4 + 4])[0]
            if x == 0xffffffff:
                return d
            self.m.seek(x)
            if k == self.read_kv():
                return self.read_kv()
            h = (h + 1) & (self.hs - 1)

    def read_kv(self):
        sz = ord(self.m.read_byte())
        if sz & 0x80:
            sz = uint_format.unpack(chr(sz) + self.m.read(3))[0] - 0x80000000
        return self.m.read(sz)

    def kvlen(self, k):
        return len(k) + (1 if len(k) <= 0x7f else 4)

    def __contains__(self, k):
        return self.get(k, None) is not None

    def close(self):
        self.m.close()

uint_format = struct.Struct('>I')


def uget(a, k, d=None):
    return to_unicode(a.get(to_str(k), d))


def uin(a, k):
    return to_str(k) in a


def to_unicode(s):
    return s.decode('utf-8') if isinstance(s, str) else s


def to_str(s):
    return s.encode('utf-8') if isinstance(s, unicode) else s


def mmap_test():
    n = 1000000
    d = shared_immutable_dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'mmap speed: %d gets per sec' % (n / (default_timer() - start_time))


def manager_test():
    n = 100000
    d = Manager().dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'manager speed: %d gets per sec' % (n / (default_timer() - start_time))


def shm_test():
    n = 1000000
    d = HashTable('tmp', n)
    d.update({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'shm speed: %d gets per sec' % (n / (default_timer() - start_time))


if __name__ == '__main__':
    mmap_test()
    manager_test()
    shm_test()

No meu laptop, os resultados de desempenho são:

mmap speed: 247288 gets per sec
manager speed: 33792 gets per sec
shm speed: 691332 gets per sec

exemplo de uso simples:

ht = shared_immutable_dict({'a': '1', 'b': '2'})
print ht.get('a')
Aliaxia
fonte
14
Github? Documentação? como podemos usar esta ferramenta?
Pavlos Panteliadis
10

Além de @senderle aqui, alguns também podem estar se perguntando como usar a funcionalidade do multiprocessing.Pool.

O bom é que existe um .Pool()método para a managerinstância que imita todas as APIs familiares de nível superior multiprocessing.

from itertools import repeat
import multiprocessing as mp
import os
import pprint

def f(d: dict) -> None:
    pid = os.getpid()
    d[pid] = "Hi, I was written by process %d" % pid

if __name__ == '__main__':
    with mp.Manager() as manager:
        d = manager.dict()
        with manager.Pool() as pool:
            pool.map(f, repeat(d, 10))
        # `d` is a DictProxy object that can be converted to dict
        pprint.pprint(dict(d))

Resultado:

$ python3 mul.py 
{22562: 'Hi, I was written by process 22562',
 22563: 'Hi, I was written by process 22563',
 22564: 'Hi, I was written by process 22564',
 22565: 'Hi, I was written by process 22565',
 22566: 'Hi, I was written by process 22566',
 22567: 'Hi, I was written by process 22567',
 22568: 'Hi, I was written by process 22568',
 22569: 'Hi, I was written by process 22569',
 22570: 'Hi, I was written by process 22570',
 22571: 'Hi, I was written by process 22571'}

Este é um exemplo ligeiramente diferente em que cada processo apenas registra seu ID de processo no DictProxyobjeto global d.

Brad Solomon
fonte
3

Talvez você possa tentar pyshmht , que compartilha uma extensão de tabela de hash baseada em memória para Python.

Aviso prévio

  1. Não foi totalmente testado, apenas para sua referência.

  2. Atualmente carece de mecanismos de bloqueio / sem para multiprocessamento.

felix021
fonte