Desejo usar um Stream
para paralelizar o processamento de um conjunto heterogêneo de arquivos JSON armazenados remotamente, de número desconhecido (o número de arquivos não é conhecido antecipadamente). Os arquivos podem variar amplamente em tamanho, de 1 registro JSON por arquivo até 100.000 registros em alguns outros arquivos. Um registro JSON nesse caso significa um objeto JSON independente, representado como uma linha no arquivo.
Eu realmente quero usar o Streams para isso e, por isso, implementei isso Spliterator
:
public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {
abstract protected JsonStreamSupport<METADATA> openInputStream(String path);
abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);
private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
private static final int MAX_BUFFER = 100;
private final Iterator<String> paths;
private JsonStreamSupport<METADATA> reader = null;
public JsonStreamSpliterator(Iterator<String> paths) {
this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
super(est, additionalCharacteristics);
this.paths = paths;
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
this(est, additionalCharacteristics, paths);
open(nextPath);
}
@Override
public boolean tryAdvance(Consumer<? super RECORD> action) {
if(reader == null) {
String path = takeNextPath();
if(path != null) {
open(path);
}
else {
return false;
}
}
Map<String, Object> json = reader.readJsonLine();
if(json != null) {
RECORD item = parse(reader.getMetadata(), json);
action.accept(item);
return true;
}
else {
reader.close();
reader = null;
return tryAdvance(action);
}
}
private void open(String path) {
reader = openInputStream(path);
}
private String takeNextPath() {
synchronized(paths) {
if(paths.hasNext()) {
return paths.next();
}
}
return null;
}
@Override
public Spliterator<RECORD> trySplit() {
String nextPath = takeNextPath();
if(nextPath != null) {
return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
@Override
protected JsonStreamSupport<METADATA> openInputStream(String path) {
return JsonStreamSpliterator.this.openInputStream(path);
}
@Override
protected RECORD parse(METADATA metaData, Map<String,Object> json) {
return JsonStreamSpliterator.this.parse(metaData, json);
}
};
}
else {
List<RECORD> records = new ArrayList<RECORD>();
while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
// loop
}
if(records.size() != 0) {
return records.spliterator();
}
else {
return null;
}
}
}
}
O problema que estou enfrentando é que, embora o Stream seja paralelamente bonito no início, eventualmente o maior arquivo fica sendo processado em um único thread. Acredito que a causa proximal está bem documentada: o spliterator é "desequilibrado".
Mais concretamente, parece que o trySplit
método não é chamado após um certo ponto do Stream.forEach
ciclo de vida do produto, portanto, a lógica extra para distribuir pequenos lotes no final do processo trySplit
raramente é executada.
Observe como todos os separadores retornados do trySplit compartilham o mesmo paths
iterador. Eu pensei que essa era uma maneira realmente inteligente de equilibrar o trabalho entre todos os separadores, mas não foi suficiente para alcançar um paralelismo completo.
Gostaria que o processamento paralelo prosseguisse primeiro entre os arquivos e, quando ainda houver poucos arquivos grandes, eu quero fazer um paralelo entre os pedaços dos arquivos restantes. Essa foi a intenção do else
bloco no final de trySplit
.
Existe uma maneira fácil / simples / canônica de contornar esse problema?
fonte
Long.MAX_VALUE
causa uma divisão excessiva e desnecessária, enquanto qualquer estimativa que não seja aLong.MAX_VALUE
causa de uma interrupção adicional, mata o paralelismo. Retornar uma mistura de estimativas precisas não parece levar a otimizações inteligentes.AbstractSpliterator
mas substituindo, otrySplit()
que é uma combinação ruim para algo diferenteLong.MAX_VALUE
, pois não está adaptando a estimativa de tamanhotrySplit()
. DepoistrySplit()
, a estimativa de tamanho deve ser reduzida pelo número de elementos que foram divididos.Respostas:
Você
trySplit
deve gerar divisões de tamanho igual, independentemente do tamanho dos arquivos subjacentes. Você deve tratar todos os arquivos como uma única unidade e preencher oArrayList
spliterator com o mesmo número de objetos JSON a cada vez. O número de objetos deve ser tal que o processamento de uma divisão leve entre 1 e 10 milissegundos: menor que 1 ms e você começa a se aproximar dos custos de entrega do lote para um encadeamento de trabalho, maior que isso e começa a arriscar a carga desigual da CPU devido a tarefas com granulação muito grossa.O spliterator não é obrigado a relatar uma estimativa de tamanho e você já está fazendo isso corretamente: sua estimativa é
Long.MAX_VALUE
, que é um valor especial que significa "ilimitado". No entanto, se você tiver muitos arquivos com um único objeto JSON, resultando em lotes de tamanho 1, isso prejudicará seu desempenho de duas maneiras: a sobrecarga de abrir-ler-fechar o arquivo pode se tornar um gargalo e, se você conseguir escapar isso, o custo da transferência do encadeamento pode ser significativo comparado ao custo do processamento de um item, causando novamente um gargalo.Há cinco anos, eu estava resolvendo um problema semelhante, você pode dar uma olhada na minha solução .
fonte
Long.MAX_VALUE
está descrevendo corretamente um tamanho desconhecido, mas isso não ajuda quando a implementação real do Stream apresenta um desempenho ruim. Mesmo usando o resultado doThreadLocalRandom.current().nextInt(100, 100_000)
tamanho estimado, obtém melhores resultados.ArraySpliterator
que tem um tamanho estimado (mesmo um tamanho exato). Portanto, a implementação do Stream verá o tamanho do array vsLong.MAX_VALUE
, considere isso desequilibrado e divida o separador "maior" (ignorando o queLong.MAX_VALUE
significa "desconhecido"), até que não possa ser dividido mais. Então, se não houver pedaços suficientes, ele dividirá os separadores baseados em matriz, utilizando seus tamanhos conhecidos. Sim, isso funciona muito bem, mas não contradiz minha afirmação de que você precisa de uma estimativa de tamanho, independentemente de quão ruim seja.Long.MAX_VALUE
seria.Após muita experimentação, ainda não consegui obter nenhum paralelismo adicional ao brincar com as estimativas de tamanho. Basicamente, qualquer valor que não
Long.MAX_VALUE
seja o de tender a fazer com que o spliterator termine muito cedo (e sem divisão), enquanto, por outro lado, umaLong.MAX_VALUE
estimativa fará comtrySplit
que seja chamado incansavelmente até retornarnull
.A solução que encontrei é compartilhar internamente recursos entre os separadores e deixá-los se reequilibrar.
Código de trabalho:
fonte