Você pode reequilibrar um Spliterator desequilibrado de tamanho desconhecido?

12

Desejo usar um Streampara paralelizar o processamento de um conjunto heterogêneo de arquivos JSON armazenados remotamente, de número desconhecido (o número de arquivos não é conhecido antecipadamente). Os arquivos podem variar amplamente em tamanho, de 1 registro JSON por arquivo até 100.000 registros em alguns outros arquivos. Um registro JSON nesse caso significa um objeto JSON independente, representado como uma linha no arquivo.

Eu realmente quero usar o Streams para isso e, por isso, implementei isso Spliterator:

public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {

    abstract protected JsonStreamSupport<METADATA> openInputStream(String path);

    abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);

    private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
    private static final int MAX_BUFFER = 100;
    private final Iterator<String> paths;
    private JsonStreamSupport<METADATA> reader = null;

    public JsonStreamSpliterator(Iterator<String> paths) {
        this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
        super(est, additionalCharacteristics);
        this.paths = paths;
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
        this(est, additionalCharacteristics, paths);
        open(nextPath);
    }

    @Override
    public boolean tryAdvance(Consumer<? super RECORD> action) {
        if(reader == null) {
            String path = takeNextPath();
            if(path != null) {
                open(path);
            }
            else {
                return false;
            }
        }
        Map<String, Object> json = reader.readJsonLine();
        if(json != null) {
            RECORD item = parse(reader.getMetadata(), json);
            action.accept(item);
            return true;
        }
        else {
            reader.close();
            reader = null;
            return tryAdvance(action);
        }
    }

    private void open(String path) {
        reader = openInputStream(path);
    }

    private String takeNextPath() {
        synchronized(paths) {
            if(paths.hasNext()) {
                return paths.next();
            }
        }
        return null;
    }

    @Override
    public Spliterator<RECORD> trySplit() {
        String nextPath = takeNextPath();
        if(nextPath != null) {
            return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
                @Override
                protected JsonStreamSupport<METADATA> openInputStream(String path) {
                    return JsonStreamSpliterator.this.openInputStream(path);
                }
                @Override
                protected RECORD parse(METADATA metaData, Map<String,Object> json) {
                    return JsonStreamSpliterator.this.parse(metaData, json);
                }
            };              
        }
        else {
            List<RECORD> records = new ArrayList<RECORD>();
            while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
                // loop
            }
            if(records.size() != 0) {
                return records.spliterator();
            }
            else {
                return null;
            }
        }
    }
}

O problema que estou enfrentando é que, embora o Stream seja paralelamente bonito no início, eventualmente o maior arquivo fica sendo processado em um único thread. Acredito que a causa proximal está bem documentada: o spliterator é "desequilibrado".

Mais concretamente, parece que o trySplitmétodo não é chamado após um certo ponto do Stream.forEachciclo de vida do produto, portanto, a lógica extra para distribuir pequenos lotes no final do processo trySplitraramente é executada.

Observe como todos os separadores retornados do trySplit compartilham o mesmo pathsiterador. Eu pensei que essa era uma maneira realmente inteligente de equilibrar o trabalho entre todos os separadores, mas não foi suficiente para alcançar um paralelismo completo.

Gostaria que o processamento paralelo prosseguisse primeiro entre os arquivos e, quando ainda houver poucos arquivos grandes, eu quero fazer um paralelo entre os pedaços dos arquivos restantes. Essa foi a intenção do elsebloco no final de trySplit.

Existe uma maneira fácil / simples / canônica de contornar esse problema?

Alex R
fonte
2
Você precisa de uma estimativa de tamanho. Pode ser totalmente falso, desde que reflita aproximadamente a proporção da sua divisão desequilibrada. Caso contrário, o fluxo não saberá que as divisões estão desequilibradas e parará assim que um certo número de partes tiver sido criado.
Holger
@ Holger, você pode elaborar "irá parar quando um certo número de partes tiver sido criado" ou me indicar a fonte JDK para isso? Qual é o número de pedaços onde ele pára?
Alex R
O código é irrelevante, pois mostraria muitos detalhes de implementação irrelevantes, que poderiam mudar a qualquer momento. O ponto relevante é que a implementação tenta chamar a divisão com freqüência suficiente, para que cada segmento de trabalho (ajustado ao número de núcleos da CPU) tenha algo a fazer. Para compensar diferenças imprevisíveis no tempo de computação, provavelmente produzirá ainda mais pedaços do que os threads de trabalho para permitir o roubo do trabalho e usar os tamanhos estimados como heurísticos (por exemplo, para decidir qual subdivisor deve ser dividido). Veja também stackoverflow.com/a/48174508/2711488
Holger
Fiz algumas experiências para tentar entender o seu comentário. As heurísticas parecem ser bastante primitivas. Parece que o retorno Long.MAX_VALUEcausa uma divisão excessiva e desnecessária, enquanto qualquer estimativa que não seja a Long.MAX_VALUEcausa de uma interrupção adicional, mata o paralelismo. Retornar uma mistura de estimativas precisas não parece levar a otimizações inteligentes.
Alex R
Não estou afirmando que a estratégia da implementação foi muito inteligente, mas pelo menos funciona para alguns cenários com tamanhos estimados (caso contrário, havia muito mais relatórios de erros sobre isso). Parece que houve alguns erros do seu lado durante os experimentos. Por exemplo, no código da sua pergunta, você está estendendo, AbstractSpliteratormas substituindo, o trySplit()que é uma combinação ruim para algo diferente Long.MAX_VALUE, pois não está adaptando a estimativa de tamanho trySplit(). Depois trySplit(), a estimativa de tamanho deve ser reduzida pelo número de elementos que foram divididos.
Holger

Respostas:

0

Você trySplitdeve gerar divisões de tamanho igual, independentemente do tamanho dos arquivos subjacentes. Você deve tratar todos os arquivos como uma única unidade e preencher o ArrayListspliterator com o mesmo número de objetos JSON a cada vez. O número de objetos deve ser tal que o processamento de uma divisão leve entre 1 e 10 milissegundos: menor que 1 ms e você começa a se aproximar dos custos de entrega do lote para um encadeamento de trabalho, maior que isso e começa a arriscar a carga desigual da CPU devido a tarefas com granulação muito grossa.

O spliterator não é obrigado a relatar uma estimativa de tamanho e você já está fazendo isso corretamente: sua estimativa é Long.MAX_VALUE, que é um valor especial que significa "ilimitado". No entanto, se você tiver muitos arquivos com um único objeto JSON, resultando em lotes de tamanho 1, isso prejudicará seu desempenho de duas maneiras: a sobrecarga de abrir-ler-fechar o arquivo pode se tornar um gargalo e, se você conseguir escapar isso, o custo da transferência do encadeamento pode ser significativo comparado ao custo do processamento de um item, causando novamente um gargalo.

Há cinco anos, eu estava resolvendo um problema semelhante, você pode dar uma olhada na minha solução .

Marko Topolnik
fonte
Sim, você "não é obrigado a relatar uma estimativa de tamanho" e Long.MAX_VALUEestá descrevendo corretamente um tamanho desconhecido, mas isso não ajuda quando a implementação real do Stream apresenta um desempenho ruim. Mesmo usando o resultado do ThreadLocalRandom.current().nextInt(100, 100_000)tamanho estimado, obtém melhores resultados.
21919 Holger
Ele teve um bom desempenho nos meus casos de uso, onde o custo computacional de cada item foi substancial. Eu estava conseguindo facilmente 98% do uso total da CPU e taxa de transferência escalada quase linearmente com paralelismo. Basicamente, é importante ajustar o tamanho do lote para que o processamento demore entre 1 e 10 milissegundos. Isso está muito acima dos custos de transferência de encadeamentos e não é muito longo para causar problemas de granularidade de tarefas. Publiquei resultados de benchmark no final deste post .
Marko Topolnik 29/10/19
Sua solução separa uma ArraySpliteratorque tem um tamanho estimado (mesmo um tamanho exato). Portanto, a implementação do Stream verá o tamanho do array vs Long.MAX_VALUE, considere isso desequilibrado e divida o separador "maior" (ignorando o que Long.MAX_VALUEsignifica "desconhecido"), até que não possa ser dividido mais. Então, se não houver pedaços suficientes, ele dividirá os separadores baseados em matriz, utilizando seus tamanhos conhecidos. Sim, isso funciona muito bem, mas não contradiz minha afirmação de que você precisa de uma estimativa de tamanho, independentemente de quão ruim seja.
Holger
OK, então parece ser um mal-entendido --- porque você não precisa de uma estimativa de tamanho na entrada. Apenas nas divisões individuais, e você sempre pode ter isso.
Marko Topolnik 29/10/19
Bem, meu primeiro comentário foi " Você precisa de uma estimativa de tamanho. Pode ser totalmente falso, desde que reflita aproximadamente a proporção da sua divisão desequilibrada " . O ponto principal aqui foi que o código do OP cria outro separador que contém um único elemento, mas ainda informando um tamanho desconhecido. É isso que torna a implementação do Stream desamparada. Qualquer número estimado para o novo spliterator sendo significativamente menor do que Long.MAX_VALUEseria.
21919 Holger
0

Após muita experimentação, ainda não consegui obter nenhum paralelismo adicional ao brincar com as estimativas de tamanho. Basicamente, qualquer valor que não Long.MAX_VALUEseja o de tender a fazer com que o spliterator termine muito cedo (e sem divisão), enquanto, por outro lado, uma Long.MAX_VALUEestimativa fará com trySplitque seja chamado incansavelmente até retornar null.

A solução que encontrei é compartilhar internamente recursos entre os separadores e deixá-los se reequilibrar.

Código de trabalho:

public class AwsS3LineSpliterator<LINE> extends AbstractSpliterator<AwsS3LineInput<LINE>> {

    public final static class AwsS3LineInput<LINE> {
        final public S3ObjectSummary s3ObjectSummary;
        final public LINE lineItem;
        public AwsS3LineInput(S3ObjectSummary s3ObjectSummary, LINE lineItem) {
            this.s3ObjectSummary = s3ObjectSummary;
            this.lineItem = lineItem;
        }
    }

    private final class InputStreamHandler {
        final S3ObjectSummary file;
        final InputStream inputStream;
        InputStreamHandler(S3ObjectSummary file, InputStream is) {
            this.file = file;
            this.inputStream = is;
        }
    }

    private final Iterator<S3ObjectSummary> incomingFiles;

    private final Function<S3ObjectSummary, InputStream> fileOpener;

    private final Function<InputStream, LINE> lineReader;

    private final Deque<S3ObjectSummary> unopenedFiles;

    private final Deque<InputStreamHandler> openedFiles;

    private final Deque<AwsS3LineInput<LINE>> sharedBuffer;

    private final int maxBuffer;

    private AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener,
            Function<InputStream, LINE> lineReader,
            Deque<S3ObjectSummary> unopenedFiles, Deque<InputStreamHandler> openedFiles, Deque<AwsS3LineInput<LINE>> sharedBuffer,
            int maxBuffer) {
        super(Long.MAX_VALUE, 0);
        this.incomingFiles = incomingFiles;
        this.fileOpener = fileOpener;
        this.lineReader = lineReader;
        this.unopenedFiles = unopenedFiles;
        this.openedFiles = openedFiles;
        this.sharedBuffer = sharedBuffer;
        this.maxBuffer = maxBuffer;
    }

    public AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener, Function<InputStream, LINE> lineReader, int maxBuffer) {
        this(incomingFiles, fileOpener, lineReader, new ConcurrentLinkedDeque<>(), new ConcurrentLinkedDeque<>(), new ArrayDeque<>(maxBuffer), maxBuffer);
    }

    @Override
    public boolean tryAdvance(Consumer<? super AwsS3LineInput<LINE>> action) {
        AwsS3LineInput<LINE> lineInput;
        synchronized(sharedBuffer) {
            lineInput=sharedBuffer.poll();
        }
        if(lineInput != null) {
            action.accept(lineInput);
            return true;
        }
        InputStreamHandler handle = openedFiles.poll();
        if(handle == null) {
            S3ObjectSummary unopenedFile = unopenedFiles.poll();
            if(unopenedFile == null) {
                return false;
            }
            handle = new InputStreamHandler(unopenedFile, fileOpener.apply(unopenedFile));
        }
        for(int i=0; i < maxBuffer; ++i) {
            LINE line = lineReader.apply(handle.inputStream);
            if(line != null) {
                synchronized(sharedBuffer) {
                    sharedBuffer.add(new AwsS3LineInput<LINE>(handle.file, line));
                }
            }
            else {
                return tryAdvance(action);
            }
        }
        openedFiles.addFirst(handle);
        return tryAdvance(action);
    }

    @Override
    public Spliterator<AwsS3LineInput<LINE>> trySplit() {
        synchronized(incomingFiles) {
            if (incomingFiles.hasNext()) {
                unopenedFiles.add(incomingFiles.next());
                return new AwsS3LineSpliterator<LINE>(incomingFiles, fileOpener, lineReader, unopenedFiles, openedFiles, sharedBuffer, maxBuffer);
            } else {
                return null;
            }
        }
    }
}
Alex R
fonte