Tenho um grande arquivo que contém uma lista de itens.
Eu gostaria de criar um lote de itens, fazer uma solicitação HTTP com esse lote (todos os itens são necessários como parâmetros na solicitação HTTP). Posso fazer isso facilmente com um for
loop, mas como amante do Java 8, quero tentar escrever isso com a estrutura Stream do Java 8 (e colher os benefícios do processamento lento).
Exemplo:
List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
batch.add(data.get(i));
if (batch.size() == BATCH_SIZE) process(batch);
}
if (batch.size() > 0) process(batch);
Eu quero fazer algo ao longo da linha de
lazyFileStream.group(500).map(processBatch).collect(toList())
Qual seria a melhor forma de fazer isso?
java
java-8
batch-processing
java-stream
Andy Dang
fonte
fonte
flatMap
(+ um flatMap adicional para recolher os fluxos novamente)? Não acho que algo assim exista como um método conveniente na biblioteca padrão. Você terá que encontrar uma biblioteca de terceiros ou escrever a sua própria baseada em divisores e / ou um coletor emitindo um fluxo de fluxosStream.generate
comreader::readLine
elimit
, mas o problema é que os fluxos não vão bem com exceções. Além disso, isso provavelmente não é bem paralelizável. Acho que ofor
loop ainda é a melhor opção.Respostas:
Nota! Esta solução lê todo o arquivo antes de executar o forEach.
Você poderia fazer isso com jOOλ , uma biblioteca que estende fluxos Java 8 para casos de uso de fluxo sequencial de thread único:
Nos bastidores,
zipWithIndex()
é apenas:... enquanto a
groupBy()
API é conveniente para:(Isenção de responsabilidade: eu trabalho para a empresa por trás da jOOλ)
fonte
Map
(ao contrário, por exemplo, da solução Ben Manes)Para completar, aqui está uma solução de Guava .
Na questão, a coleção está disponível, portanto, um fluxo não é necessário e pode ser escrito como,
fonte
Lists.partition
é outra variação que eu deveria ter mencionado.Stream
para a memória antes de processar o lote relevantebatchSize
elementos por iteração.A implementação pura do Java-8 também é possível:
Observe que, ao contrário do JOOl, ele pode funcionar bem em paralelo (desde que
data
seja uma lista de acesso aleatório).fonte
List
(vejadata.size()
,data.get()
na pergunta). Estou respondendo à pergunta feita. Se você tiver outra pergunta, faça-a (embora eu ache que a pergunta do stream também já foi feita).Solução Java 8 pura :
Podemos criar um coletor personalizado para fazer isso com elegância, que leva um
batch size
e umConsumer
para processar cada lote:Opcionalmente, crie uma classe de utilitário auxiliar:
Exemplo de uso:
Eu postei meu código no GitHub também, se alguém quiser dar uma olhada:
Link para Github
fonte
Escrevi um Spliterator personalizado para cenários como este. Ele preencherá listas de um determinado tamanho do Fluxo de entrada. A vantagem dessa abordagem é que ela executará processamento lento e funcionará com outras funções de fluxo.
fonte
SUBSIZED
as divisões retornadas,trySplit
podem ter mais itens do que antes da divisão (se a divisão acontecer no meio do lote).Spliterators
está correto, entãotrySplit
sempre devo particionar os dados em duas partes aproximadamente iguais para que o resultado nunca seja maior que o original?if this Spliterator is SUBSIZED, then estimateSize() for this spliterator before splitting must be equal to the sum of estimateSize() for this and the returned Spliterator after splitting.
Tínhamos um problema semelhante para resolver. Queríamos pegar um stream que fosse maior do que a memória do sistema (iterando por todos os objetos em um banco de dados) e randomizar a ordem da melhor maneira possível - pensamos que não haveria problema em armazenar 10.000 itens em buffer e randomizá-los.
O alvo era uma função que incorporava um fluxo.
Das soluções propostas aqui, parece haver uma gama de opções:
Nosso instinto era originalmente usar um coletor personalizado, mas isso significava sair do streaming. A solução de coletor personalizado acima é muito boa e quase a usamos.
Aqui está uma solução que engana, usando o fato de que
Stream
s pode fornecer umIterator
que você pode usar como uma saída de emergência para permitir que você faça algo extra que os fluxos não suportam. OIterator
é convertido de volta para um fluxo usando outro bit deStreamSupport
feitiçaria Java 8 .Um exemplo simples de uso seria assim:
As estampas acima
Para nosso caso de uso, queríamos embaralhar os lotes e depois mantê-los como um fluxo - parecia com isto:
Isso produz algo como (é aleatório, tão diferente a cada vez)
O segredo aqui é que sempre há um fluxo, então você pode operar em um fluxo de lotes ou fazer alguma coisa com cada lote e depois
flatMap
voltar para um fluxo. Mesmo melhor, todos os itens acima só funciona como a finalforEach
oucollect
ou outras expressões de terminação PULL os dados através da corrente.Acontece que
iterator
é um tipo especial de operação de término em um fluxo e não faz com que todo o fluxo seja executado e entre na memória! Obrigado ao pessoal do Java 8 pelo design brilhante!fonte
List
- você não pode adiar a iteração dos elementos dentro do lote porque o consumidor pode querer pular um lote inteiro, e se você não consumiu o elementos então eles não estariam pulando muito longe. (Eu implementei um deles em C #, embora fosse substancialmente mais fácil.)Você também pode usar RxJava :
ou
ou
fonte
Você também pode dar uma olhada na reação de ciclope , eu sou o autor desta biblioteca. Ele implementa a interface jOOλ (e, por extensão, JDK 8 Streams), mas ao contrário do JDK 8 Parallel Streams, ele se concentra em operações assíncronas (como potencialmente bloquear chamadas de E / S assíncronas). JDK Parallel Streams, por contraste, concentra-se no paralelismo de dados para operações vinculadas à CPU. Ele funciona gerenciando agregados de tarefas baseadas no futuro, mas apresenta uma API de fluxo estendida padrão para os usuários finais.
Este código de amostra pode ajudá-lo a começar
Há um tutorial sobre batching aqui
E um tutorial mais geral aqui
Para usar seu próprio Thread Pool (que provavelmente é mais apropriado para bloquear I / O), você pode iniciar o processamento com
fonte
Exemplo puro de Java 8 que também funciona com fluxos paralelos.
Como usar:
A declaração e implementação do método:
fonte
Com toda a justiça, dê uma olhada na elegante solução Vavr :
fonte
Exemplo simples usando Spliterator
A resposta de Bruce é mais abrangente, mas eu estava procurando por algo rápido e sujo para processar vários arquivos.
fonte
esta é uma solução Java pura que é avaliada lentamente.
fonte
Você pode usar apache.commons:
A parte de particionamento é feita sem preguiça, mas depois que a lista é particionada, você obtém os benefícios de trabalhar com fluxos (por exemplo, usar fluxos paralelos, adicionar filtros, etc.). Outras respostas sugeriram soluções mais elaboradas, mas às vezes a legibilidade e a manutenção são mais importantes (e às vezes não são :-))
fonte
Isso poderia ser feito facilmente usando o Reactor :
fonte
Com
Java 8
ecom.google.common.collect.Lists
, você pode fazer algo como:Aqui
T
está o tipo dos itens na lista de entrada eU
o tipo dos itens na lista de saídaE você pode usá-lo assim:
fonte