Como posso tornar uma construção universal mais eficiente?

16

Uma "construção universal" é uma classe de wrapper para um objeto seqüencial que permite que ele seja linearizado (uma forte condição de consistência para objetos simultâneos). Por exemplo, aqui está uma construção sem espera adaptada, em Java, a partir de [1], que pressupõe a existência de uma fila sem espera que satisfaça a interface WFQ(que requer apenas consenso único entre os segmentos) e assume uma Sequentialinterface:

public interface WFQ<T> // "FIFO" iteration
{
    int enqueue(T t); // returns the sequence number of t
    Iterable<T> iterateUntil(int max); // iterates until sequence max
}
public interface Sequential
{
    // Apply an invocation (method + arguments)
    // and get a response (return value + state)
    Response apply(Invocation i); 
}
public interface Factory<T> { T generate(); } // generate new default object
public interface Universal extends Sequential {}

public class SlowUniversal implements Universal
{
    Factory<? extends Sequential> generator;
    WFQ<Invocation> wfq = new WFQ<Invocation>();
    Universal(Factory<? extends Sequential> g) { generator = g; } 
    public Response apply(Invocation i)
    {
        int max = wfq.enqueue(i);
        Sequential s = generator.generate();
        for(Invocation invoc : wfq.iterateUntil(max))
            s.apply(invoc);
        return s.apply(i);
    }
}

Essa implementação não é muito satisfatória, pois é muito lenta (você se lembra de todas as invocações e precisa repeti-las a cada aplicação - temos tempo de execução linear no tamanho do histórico). Existe alguma maneira de estender as interfaces WFQe Sequential(de maneira razoável) para permitir salvar algumas etapas ao aplicar uma nova chamada?

Podemos tornar isso mais eficiente (tempo de execução não linear no tamanho do histórico, de preferência o uso da memória também diminui) sem perder a propriedade de espera?

Esclarecimento

Uma "construção universal" é um termo que eu tenho certeza que foi composto por [1], que aceita um objeto inseguro, mas compatível com o thread, que é generalizado pela Sequentialinterface. Usando uma fila sem espera, a primeira construção oferece uma versão linearizável do objeto , segura para threads, também sem espera (isso pressupõe applyoperações de determinismo e parada ).

Isso é ineficiente, pois o método é efetivamente fazer com que cada encadeamento local inicie a partir de um slate limpo e aplique todas as operações já registradas a ele. Em qualquer caso, isso funciona porque atinge a sincronização de forma eficaz usando a WFQdeterminar a ordem em que todas as operações devem ser aplicadas: cada chamada segmento applyvai ver o mesmo local, Sequentialobjeto, com a mesma sequência de Invocations aplicadas a ele.

Minha pergunta é se podemos (por exemplo) introduzir um processo de limpeza em segundo plano que atualize o "estado inicial" para que não tenhamos que reiniciar do zero. Isso não é tão simples quanto ter um ponteiro atômico com um ponteiro inicial - esses tipos de abordagens perdem facilmente a garantia sem espera. Minha suspeita é que alguma outra abordagem baseada em fila possa funcionar aqui.

Jargão:

  1. sem espera - independentemente do número de threads ou da tomada de decisão do agendador, applyterminará em um número comprovadamente limitado de instruções executadas para esse thread.
  2. livre de bloqueio - o mesmo que acima, mas admite a possibilidade de um tempo de execução ilimitado, apenas no caso de um número ilimitado de applyoperações estar sendo realizado em outros threads. Normalmente, os esquemas de sincronização otimistas se enquadram nessa categoria.
  3. bloqueio - eficiência à mercê do planejador.

Um exemplo de trabalho, conforme solicitado (agora em uma página que não expira)

[1] Herlihy e Shavit, a arte da programação de multiprocessadores .

VF1
fonte
A pergunta 1 só é responsável se soubermos o que "funciona" significa para você.
Robert Harvey
@RobertHarvey Eu o corrigi - tudo o que precisa "funcionar" é que o wrapper fique sem espera e todas as operações CopyableSequentialsejam válidas - a linearizabilidade deve seguir o fato de que é Sequential.
VF1
Há muitas palavras significativas nesta pergunta, mas estou lutando para reuni-las para entender exatamente o que você está tentando realizar. Você pode fornecer uma explicação sobre qual problema está tentando resolver e talvez diminuir um pouco o jargão?
21716 JimmyJames
@JimmyJames Eu elaborei um "comentário estendido" dentro da questão. Por favor, deixe-me saber se há algum outro jargão para esclarecer.
VF1 26/10
no primeiro parágrafo do comentário, você diz "objeto não seguro, mas compatível com thread" e "versão linearizável do objeto". Não está claro o que você quer dizer com isso, porque thread-safe e linearizable são realmente relevantes apenas para instruções executáveis, mas você as está usando para descrever objetos, que são dados. Presumo que a invocação (que não está definida) seja efetivamente um ponteiro de método e esse método não é seguro para threads. Não sei o que significa compatível com threads .
21716 JimmyJames

Respostas:

1

Aqui está uma explicação e exemplo de como isso é realizado. Deixe-me saber se há peças que não estão claras.

Gist com fonte

Universal

Inicialização:

Os índices de encadeamento são aplicados de maneira atomicamente incrementada. Isso é gerenciado usando um AtomicIntegernome nextIndex. Esses índices são atribuídos aos threads por meio de uma ThreadLocalinstância que se inicializa, obtendo o próximo índice nextIndexe incrementando-o. Isso acontece na primeira vez que o índice de cada thread é recuperado na primeira vez. A ThreadLocalé criado para rastrear a última sequência criada por este encadeamento. É inicializado 0. A referência seqüencial do objeto de fábrica é passada e armazenada. Duas AtomicReferenceArrayinstâncias são criadas de tamanho n. O objeto final é atribuído a cada referência, tendo sido inicializado com o estado inicial fornecido pela Sequentialfábrica. né o número máximo de threads permitido. Cada elemento nessas matrizes 'pertence' ao índice de encadeamento correspondente.

Aplicar método:

Este é o método que faz o trabalho interessante. Faz o seguinte:

  • Crie um novo nó para esta chamada: mine
  • Defina esse novo nó na matriz de anúncios no índice do encadeamento atual

Em seguida, o loop de seqüenciamento começa. Ele continuará até que a chamada atual tenha sido sequenciada:

  1. encontre um nó na matriz de anúncios usando a sequência do último nó criado por este encadeamento. Mais sobre isso mais tarde.
  2. se um nó for encontrado na etapa 2, ele ainda não foi sequenciado, continue com ele; caso contrário, concentre-se apenas na chamada atual. Isso tentará apenas ajudar um outro nó por chamada.
  3. Qualquer que seja o nó selecionado na etapa 3, continue tentando sequenciá-lo após o último nó sequenciado (outros encadeamentos podem interferir.) Independentemente do sucesso, defina a referência principal do encadeamento atual para a sequência retornada por decideNext()

A chave para o loop aninhado descrito acima é o decideNext()método Para entender isso, precisamos olhar para a classe Node.

Classe de nó

Esta classe especifica nós em uma lista duplamente vinculada. Não há muita ação nesta classe. A maioria dos métodos são métodos de recuperação simples que devem ser bastante auto-explicativos.

método da cauda

isso retorna uma instância de nó especial com uma sequência de 0. Ele simplesmente atua como um marcador de posição até que uma chamada a substitua.

Propriedades e inicialização

  • seq: o número de sequência, inicializado em -1 (significando sem sequência)
  • invocation: o valor da chamada de apply(). Situado na construção.
  • next: AtomicReferencepara o link direto. uma vez atribuído, isso nunca será alterado
  • previous: AtomicReferencepara o link reverso atribuído após o seqüenciamento e limpo portruncate()

Decidir Próximo

Este método é apenas um no Nó com lógica não trivial. Em poucas palavras, um nó é oferecido como candidato para ser o próximo nó na lista vinculada. O compareAndSet()método verificará se sua referência é nula e, em caso afirmativo, defina a referência para o candidato. Se a referência já estiver definida, não fará nada. Esta operação é atômica, portanto, se dois candidatos forem oferecidos no mesmo momento, apenas um será selecionado. Isso garante que apenas um nó será selecionado como o próximo. Se o nó candidato estiver selecionado, sua sequência será configurada para o próximo valor e o link anterior será definido para este nó.

Voltando à classe Universal, aplique o método ...

Depois de chamar decideNext()o último nó seqüenciado (quando marcado) com nosso nó ou um nó da announcematriz, há duas ocorrências possíveis: 1. O nó foi sequenciado com êxito 2. Algum outro encadeamento antecipou esse encadeamento.

A próxima etapa é verificar se o nó foi criado para esta chamada. Isso pode acontecer porque esse encadeamento o sequenciou com êxito ou algum outro encadeamento o pegou da announcematriz e o sequenciou para nós. Se não foi sequenciado, o processo é repetido. Caso contrário, a chamada será concluída limpando a matriz de anúncios no índice desse encadeamento e retornando o valor resultante da chamada. A matriz de anúncio é limpa para garantir que não haja referências ao nó existente que impeçam a coleta de lixo do nó e, portanto, mantenha todos os nós na lista vinculada a partir desse ponto em tempo real no heap.

Avaliar método

Agora que o nó da chamada foi sequenciado com êxito, a chamada precisa ser avaliada. Para fazer isso, o primeiro passo é garantir que as chamadas anteriores a esta tenham sido avaliadas. Se eles não tiverem esse segmento, não esperarão, mas farão esse trabalho imediatamente.

Método GuarantePrior

O ensurePrior()método faz esse trabalho verificando o nó anterior na lista vinculada. Se o estado não estiver definido, o nó anterior será avaliado. Nó que é recursivo. Se o nó anterior ao nó anterior não tiver sido avaliado, ele chamará a avaliação desse nó e assim por diante.

Agora que o nó anterior é conhecido por ter um estado, podemos avaliar esse nó. O último nó é recuperado e designado a uma variável local. Se essa referência for nula, significa que algum outro encadeamento antecipou esse e já avaliou esse nó; definindo seu estado. Caso contrário, o estado do nó anterior é passado para o Sequentialmétodo de aplicação do objeto, juntamente com a invocação desse nó. O estado retornado é definido no nó e o truncate()método é chamado, limpando o link para trás do nó, pois ele não é mais necessário.

Método MoveForward

O método avançar tentará mover todas as referências principais para este nó se elas ainda não estiverem apontando para algo mais adiante. Isso é para garantir que, se um encadeamento parar de chamar, seu cabeçalho não reterá uma referência a um nó que não é mais necessário. O compareAndSet()método garantirá que apenas atualizemos o nó se algum outro encadeamento não o tiver alterado desde que foi recuperado.

Anunciar matriz e ajudar

A chave para tornar essa abordagem livre de espera, em vez de simplesmente sem bloqueio, é que não podemos assumir que o agendador de encadeamentos dará prioridade a cada encadeamento quando necessário. Se cada thread simplesmente tentar sequenciar seus próprios nós, é possível que um thread possa ser continuamente esvaziado sob carga. Para explicar essa possibilidade, cada thread tentará primeiro 'ajudar' outros threads que talvez não possam ser sequenciados.

A idéia básica é que, conforme cada thread cria nós com êxito, as sequências atribuídas aumentam monotonicamente. Se um segmento ou segmentos estão antecipando continuamente outro segmento, o índice usado para encontrar nós não sequenciados na announcematriz avançará. Mesmo que todos os encadeamentos que atualmente tentam sequenciar um determinado nó sejam continuamente esvaziados por outro encadeamento, eventualmente todos os encadeamentos tentarão sequenciar esse nó. Para ilustrar, construiremos um exemplo com três threads.

No ponto de partida, a cabeça dos três threads e os elementos de anúncio estão apontados para o tailnó. O lastSequencepara cada thread é 0.

Neste ponto, o Thread 1 é executado com uma invocação. Ele verifica a matriz de anunciantes quanto à sua última sequência (zero), que é o nó que está atualmente programado para indexar. Sequencia o nó e lastSequenceestá definido como 1.

O segmento 2 agora é executado com uma invocação, verifica a matriz de anúncios na sua última sequência (zero) e vê que não precisa de ajuda e, portanto, tenta sequenciar sua invocação. É bem-sucedido e agora lastSequenceestá definido como 2.

O segmento 3 agora é executado e também vê que o nó em announce[0]já está sequenciado e sequencia sua própria invocação. Agora lastSequenceestá definido como 3.

Agora o segmento 1 é chamado novamente. Ele verifica a matriz de anúncios no índice 1 e descobre que ela já está sequenciada. Simultaneamente, o Thread 2 é chamado. Ele verifica a matriz de anúncios no índice 2 e descobre que ela já está sequenciada. Ambos Thread 1 e Thread 2 agora tentar sequenciar seus próprios nós. O segmento 2 vence e sequencia sua invocação. ElelastSequence definido como 4. Enquanto isso, o segmento três foi chamado. Ele verifica o índice lastSequence(mod 3) e descobre que o nó em announce[0]não foi sequenciado. O segmento 2 é novamente chamado ao mesmo tempo que o segmento 1 está na segunda tentativa. Tópico 1localiza uma chamada não sequencial na announce[1]qual é o nó recém-criado pelo Thread 2 . Ele tenta sequenciar a chamada do Thread 2 e obtém êxito. O segmento 2 encontra seu próprio nó announce[1]e foi sequenciado. É definido lastSequencecomo 5. O thread 3 é chamado e descobre que o nó no qual o thread 1 colocado announce[0]ainda não é sequenciado e tenta fazer isso. Enquanto isso, o Thread 2 também foi chamado e pré-impõe o Thread 3. Sequencia seu nó e o define lastSequencepara 6.

Pobre Thread 1 . Mesmo que o Thread 3 esteja tentando sequenciá-lo, ambos os threads foram continuamente impedidos pelo agendador. Mas neste momento. Tópico 2 também está agora apontando para announce[0](6 mod 3). Todos os três encadeamentos estão configurados para tentar sequenciar a mesma chamada. Não importa qual thread seja bem-sucedido, o próximo nó a ser sequenciado será a chamada em espera do Thread 1, ou seja, o nó referenciado por announce[0].

Isso é inevitável. Para que os encadeamentos sejam antecipados, outros encadeamentos devem ser nós de seqüenciamento e, ao fazê-lo, eles moverão continuamente seuslastSequence frente. Se o nó de um determinado encadeamento não for continuamente sequenciado, eventualmente todos os encadeamentos apontarão para seu índice na matriz de anúncios. Nenhum encadeamento fará mais nada até que o nó que está tentando ajudar seja sequenciado, o pior cenário é que todos os encadeamentos estejam apontando para o mesmo nó não sequenciado. Portanto, o tempo necessário para sequenciar qualquer chamada é uma função do número de threads e não do tamanho da entrada.

JimmyJames
fonte
Você se importaria de colocar alguns trechos de código no pastebin? Muitas coisas (como a lista vinculada sem bloqueio) podem ser simplesmente declaradas como tal? É um pouco difícil entender sua resposta como um todo quando há tantos detalhes. De qualquer forma, isso parece promissor, eu certamente gostaria de descobrir quais garantias ele oferece.
VF1 28/10
Isso certamente parece uma implementação livre de bloqueio válida, mas está faltando a questão fundamental que me preocupa. O requisito de linearizabilidade requer a presença de um "histórico válido", que, no caso da implementação de lista vinculada, precisa de um ponteiro previouse nextpara ser válido. Manter e criar um histórico válido de maneira sem espera parece difícil.
VF1 #
@ VF1 Não sei ao certo qual problema não foi resolvido. Tudo o que você mencionou no restante do comentário é abordado no exemplo que dei, pelo que posso dizer.
JimmyJames
Você desistiu da propriedade sem espera .
VF1 #
@ VF1 Como você figura?
JimmyJames
0

Minha resposta anterior realmente não responde à pergunta corretamente, mas como o OP a considera útil, vou deixar como está. Com base no código no link da pergunta, aqui está minha tentativa. Fiz apenas testes realmente básicos sobre isso, mas parece calcular as médias corretamente. Comentários bem-vindos quanto a se isso é adequadamente sem espera.

OBSERVAÇÃO : removi a interface Universal e a tornei uma classe. Ter o Universal sendo composto de Sequenciais, além de ser um, parece uma complicação desnecessária, mas posso estar perdendo alguma coisa. Na classe média, marquei a variável de estado comovolatile . Isso não é necessário para fazer o código funcionar. Ser conservador (uma boa idéia com o encadeamento) e impedir que cada thread faça todos os cálculos (uma vez).

Sequencial e Fábrica

public interface Sequential<E, S, R>
{ 
  R apply(S priorState);

  S state();

  default boolean isApplied()
  {
    return state() != null;
  }
}

public interface Factory<E, S, R>
{
   S initial();

   Sequential<E, S, R> generate(E input);
}

Universal

import java.util.concurrent.ConcurrentLinkedQueue;

public class Universal<I, S, R> 
{
  private final Factory<I, S, R> generator;
  private final ConcurrentLinkedQueue<Sequential<I, S, R>> wfq = new ConcurrentLinkedQueue<>();
  private final ThreadLocal<Sequential<I, S, R>> last = new ThreadLocal<>();

  public Universal(Factory<I, S, R> g)
  { 
    generator = g;
  }

  public R apply(I invocation)
  {
    Sequential<I, S, R> newSequential = generator.generate(invocation);
    wfq.add(newSequential);

    Sequential<I, S, R> last = null;
    S prior = generator.initial(); 

    for (Sequential<I, S, R> i : wfq) {
      if (!i.isApplied() || newSequential == i) {
        R r = i.apply(prior);

        if (i == newSequential) {
          wfq.remove(last.get());
          last.set(newSequential);

          return r;
        }
      }

      prior = i.state();
    }

    throw new IllegalStateException("Houston, we have a problem");
  }
}

Média

public class Average implements Sequential<Integer, Average.State, Double>
{
  private final Integer invocation;
  private volatile State state;

  private Average(Integer invocation)
  {
    this.invocation = invocation;
  }

  @Override
  public Double apply(State prior)
  {
    System.out.println(Thread.currentThread() + " " + invocation + " prior " + prior);

    state = prior.add(invocation);

    return ((double) state.sum)/ state.count;
  }

  @Override
  public State state()
  {
    return state;
  }

  public static class AverageFactory implements Factory<Integer, State, Double> 
  {
    @Override
    public State initial()
    {
      return new State(0, 0);
    }

    @Override
    public Average generate(Integer i)
    {
      return new Average(i);
    }
  }

  public static class State
  {
    private final int sum;
    private final int count;

    private State(int sum, int count)
    {
      this.sum = sum;
      this.count = count;
    }

    State add(int value)
    {
      return new State(sum + value, count + 1);
    }

    @Override
    public String toString()
    {
      return sum + " / " + count;
    }
  }
}

Código de demonstração

private static final int THREADS = 10;
private static final int SIZE = 50;

public static void main(String... args)
{
  Average.AverageFactory factory = new Average.AverageFactory();

  Universal<Integer, Average.State, Double> universal = new Universal<>(factory);

  for (int i = 0; i < THREADS; i++)
  {
    new Thread(new Test(i * SIZE, universal)).start();
  }
}

static class Test implements Runnable
{
  final int start;
  final Universal<Integer, Average.State, Double> universal;

  Test(int start, Universal<Integer, Average.State, Double> universal)
  {
    this.start = start;
    this.universal = universal;
  }

  @Override
  public void run()
  {
    for (int i = start; i < start + SIZE; i++)
    {
      System.out.println(Thread.currentThread() + " " + i);

      System.out.println(System.nanoTime() + " " + Thread.currentThread() + " " + i + " result " + universal.apply(i));
    }
  }
}

Fiz algumas edições no código ao publicá-lo aqui. Deve estar tudo bem, mas deixe-me saber se você tiver problemas com isso.

JimmyJames
fonte
Você não precisa manter sua outra resposta em aberto (atualizei minha pergunta anteriormente para tirar conclusões relevantes). Infelizmente, essa resposta também não responde à pergunta, já que na verdade não libera nenhuma memória da memória wfq, portanto você ainda precisa percorrer toda a história - o tempo de execução não melhorou, exceto por um fator constante.
VF1
@ Vf1 O tempo necessário para percorrer a lista inteira para verificar se foi calculado será minúsculo em comparação com a realização de cada cálculo. Como os estados anteriores não são necessários, deve ser possível remover os estados iniciais. O teste é difícil e pode exigir o uso de uma coleção personalizada, mas adicionei uma pequena alteração.
precisa saber é o seguinte
@ VF1 Atualizado para uma implementação que parece funcionar com testes básicos. Não tenho certeza de que seja seguro, mas, no topo da minha cabeça, se o universal estivesse ciente dos threads que estão trabalhando com ele, ele poderia acompanhar cada thread e remover elementos quando todos os threads passarem por eles com segurança.
21416 JimmyJames #
@ VF1 Analisando o código para ConcurrentLinkedQueue, o método de oferta possui um loop muito parecido com o que você afirmou que fez com que a outra resposta fosse sem espera. Procure o comentário "Lost CAS race to
other
"Deve ser possível remover os estados iniciais" - exatamente. Ele deve ser , mas a sua fácil introduzir sutilmente código que perde espera liberdade. Um esquema de rastreamento de threads pode funcionar. Por fim, não tenho acesso à fonte CLQ, você se importaria de vincular?
VF1