Java 8 Stream com processamento em lote

93

Tenho um grande arquivo que contém uma lista de itens.

Eu gostaria de criar um lote de itens, fazer uma solicitação HTTP com esse lote (todos os itens são necessários como parâmetros na solicitação HTTP). Posso fazer isso facilmente com um forloop, mas como amante do Java 8, quero tentar escrever isso com a estrutura Stream do Java 8 (e colher os benefícios do processamento lento).

Exemplo:

List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
  batch.add(data.get(i));
  if (batch.size() == BATCH_SIZE) process(batch);
}

if (batch.size() > 0) process(batch);

Eu quero fazer algo ao longo da linha de lazyFileStream.group(500).map(processBatch).collect(toList())

Qual seria a melhor forma de fazer isso?

Andy Dang
fonte
Não consigo descobrir como realizar o agrupamento, desculpe, mas Arquivos # linhas lerão preguiçosamente o conteúdo do arquivo.
Toby
1
então você basicamente precisa de um inverso de flatMap(+ um flatMap adicional para recolher os fluxos novamente)? Não acho que algo assim exista como um método conveniente na biblioteca padrão. Você terá que encontrar uma biblioteca de terceiros ou escrever a sua própria baseada em divisores e / ou um coletor emitindo um fluxo de fluxos
the8472
3
Talvez você possa combinar Stream.generatecom reader::readLinee limit, mas o problema é que os fluxos não vão bem com exceções. Além disso, isso provavelmente não é bem paralelizável. Acho que o forloop ainda é a melhor opção.
tobias_k
Acabei de adicionar um código de exemplo. Não acho que flatMap seja o caminho a percorrer. Suspeitando que eu poderia ter que escrever um Spliterator personalizado
Andy Dang
1
Estou cunhando o termo "abuso de stream" para perguntas como essa.
kervin

Respostas:

13

Nota! Esta solução lê todo o arquivo antes de executar o forEach.

Você poderia fazer isso com jOOλ , uma biblioteca que estende fluxos Java 8 para casos de uso de fluxo sequencial de thread único:

Seq.seq(lazyFileStream)              // Seq<String>
   .zipWithIndex()                   // Seq<Tuple2<String, Long>>
   .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
   .forEach((index, batch) -> {
       process(batch);
   });

Nos bastidores, zipWithIndex()é apenas:

static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
    final Iterator<T> it = stream.iterator();

    class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
        long index;

        @Override
        public boolean hasNext() {
            return it.hasNext();
        }

        @Override
        public Tuple2<T, Long> next() {
            return tuple(it.next(), index++);
        }
    }

    return seq(new ZipWithIndex());
}

... enquanto a groupBy()API é conveniente para:

default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
    return collect(Collectors.groupingBy(classifier));
}

(Isenção de responsabilidade: eu trabalho para a empresa por trás da jOOλ)

Lukas Eder
fonte
Uau. Isso é EXATAMENTE o que estou procurando. Nosso sistema normalmente processa fluxos de dados em sequência, portanto, seria uma boa opção para migrar para o Java 8.
Andy Dang
16
Observe que esta solução armazena desnecessariamente todo o fluxo de entrada no intermediário Map(ao contrário, por exemplo, da solução Ben Manes)
Tagir Valeev
123

Para completar, aqui está uma solução de Guava .

Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);

Na questão, a coleção está disponível, portanto, um fluxo não é necessário e pode ser escrito como,

Iterables.partition(data, batchSize).forEach(this::process);
Ben Manes
fonte
11
Lists.partitioné outra variação que eu deveria ter mencionado.
Ben Manes
2
isso é preguiçoso, certo? ele não chamará todo o conjunto Streampara a memória antes de processar o lote relevante
orirab
1
@orirab sim. É preguiçoso entre os lotes, pois irá consumir batchSizeelementos por iteração.
Ben Manes
Você poderia dar uma olhada stackoverflow.com/questions/58666190/…
gstackoverflow
57

A implementação pura do Java-8 também é possível:

int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
         .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
         .forEach(batch -> process(batch));

Observe que, ao contrário do JOOl, ele pode funcionar bem em paralelo (desde que dataseja uma lista de acesso aleatório).

Tagir Valeev
fonte
1
e se seus dados forem realmente um fluxo? (digamos, linhas em um arquivo, ou mesmo da rede).
Omry Yadan
6
@OmryYadan, a questão era sobre ter uma opinião do List(veja data.size(), data.get()na pergunta). Estou respondendo à pergunta feita. Se você tiver outra pergunta, faça-a (embora eu ache que a pergunta do stream também já foi feita).
Tagir Valeev
1
Como processar os lotes em paralelo?
soup_boy
36

Solução Java 8 pura :

Podemos criar um coletor personalizado para fazer isso com elegância, que leva um batch sizee um Consumerpara processar cada lote:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;

import static java.util.Objects.requireNonNull;


/**
 * Collects elements in the stream and calls the supplied batch processor
 * after the configured batch size is reached.
 *
 * In case of a parallel stream, the batch processor may be called with
 * elements less than the batch size.
 *
 * The elements are not kept in memory, and the final result will be an
 * empty list.
 *
 * @param <T> Type of the elements being collected
 */
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {

    private final int batchSize;
    private final Consumer<List<T>> batchProcessor;


    /**
     * Constructs the batch collector
     *
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     */
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        batchProcessor = requireNonNull(batchProcessor);

        this.batchSize = batchSize;
        this.batchProcessor = batchProcessor;
    }

    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    public BiConsumer<List<T>, T> accumulator() {
        return (ts, t) -> {
            ts.add(t);
            if (ts.size() >= batchSize) {
                batchProcessor.accept(ts);
                ts.clear();
            }
        };
    }

    public BinaryOperator<List<T>> combiner() {
        return (ts, ots) -> {
            // process each parallel list without checking for batch size
            // avoids adding all elements of one to another
            // can be modified if a strict batching mode is required
            batchProcessor.accept(ts);
            batchProcessor.accept(ots);
            return Collections.emptyList();
        };
    }

    public Function<List<T>, List<T>> finisher() {
        return ts -> {
            batchProcessor.accept(ts);
            return Collections.emptyList();
        };
    }

    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}

Opcionalmente, crie uma classe de utilitário auxiliar:

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;

public class StreamUtils {

    /**
     * Creates a new batch collector
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     * @param <T> the type of elements being processed
     * @return a batch collector instance
     */
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        return new BatchCollector<T>(batchSize, batchProcessor);
    }
}

Exemplo de uso:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();

int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);

input.stream()
     .collect(StreamUtils.batchCollector(batchSize, batchProcessor));

Eu postei meu código no GitHub também, se alguém quiser dar uma olhada:

Link para Github

Rohitvats
fonte
1
Essa é uma boa solução, a menos que você não consiga colocar todos os elementos de seu stream na memória. Além disso, não funcionará em fluxos infinitos - o método de coleta é terminal, o que significa que, em vez de produzir fluxo de lotes, ele esperará até que o fluxo seja concluído e, em seguida, processará o resultado em lotes.
Alex Ackerman
2
@AlexAckerman um fluxo infinito significará que o finalizador nunca será chamado, mas o acumulador ainda será chamado, então os itens ainda serão processados. Além disso, requer apenas que o tamanho do lote de itens esteja na memória a qualquer momento.
Solubris de
@Solubris, você está certo! Que pena, obrigado por apontar isso - não vou deletar o comentário da referência, se alguém tiver a mesma ideia de como funciona o método collect.
Alex Ackerman
A lista enviada ao consumidor deve ser copiada para tornar a modificação segura, por exemplo: batchProcessor.accept (copyOf (ts))
Solubris
19

Escrevi um Spliterator personalizado para cenários como este. Ele preencherá listas de um determinado tamanho do Fluxo de entrada. A vantagem dessa abordagem é que ela executará processamento lento e funcionará com outras funções de fluxo.

public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
    return batchSize <= 0
        ? Stream.of(stream.collect(Collectors.toList()))
        : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}

private static class BatchSpliterator<E> implements Spliterator<List<E>> {

    private final Spliterator<E> base;
    private final int batchSize;

    public BatchSpliterator(Spliterator<E> base, int batchSize) {
        this.base = base;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final List<E> batch = new ArrayList<>(batchSize);
        for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
            ;
        if (batch.isEmpty())
            return false;
        action.accept(batch);
        return true;
    }

    @Override
    public Spliterator<List<E>> trySplit() {
        if (base.estimateSize() <= batchSize)
            return null;
        final Spliterator<E> splitBase = this.base.trySplit();
        return splitBase == null ? null
                : new BatchSpliterator<>(splitBase, batchSize);
    }

    @Override
    public long estimateSize() {
        final double baseSize = base.estimateSize();
        return baseSize == 0 ? 0
                : (long) Math.ceil(baseSize / (double) batchSize);
    }

    @Override
    public int characteristics() {
        return base.characteristics();
    }

}
Bruce Hamilton
fonte
Realmente util. Se alguém quiser colocar em lote alguns critérios personalizados (por exemplo, tamanho da coleção em bytes), você pode delegar seu predicado personalizado e usá-lo no loop for como uma condição (o loop while será mais legível então)
pls
Não tenho certeza se a implementação está correta. Por exemplo, se o fluxo de base for SUBSIZEDas divisões retornadas, trySplitpodem ter mais itens do que antes da divisão (se a divisão acontecer no meio do lote).
Malte
@Malt se meu entendimento Spliteratorsestá correto, então trySplitsempre devo particionar os dados em duas partes aproximadamente iguais para que o resultado nunca seja maior que o original?
Bruce Hamilton
@BruceHamilton Infelizmente, de acordo com os documentos, as partes não podem ser aproximadamente iguais. Eles devem ser iguais:if this Spliterator is SUBSIZED, then estimateSize() for this spliterator before splitting must be equal to the sum of estimateSize() for this and the returned Spliterator after splitting.
Malte
Sim, isso é consistente com o meu entendimento da divisão do Spliterator. No entanto, estou tendo dificuldade em entender como "as divisões retornadas de trySplit podem ter mais itens do que antes da divisão", você poderia explicar o que quer dizer aqui?
Bruce Hamilton
13

Tínhamos um problema semelhante para resolver. Queríamos pegar um stream que fosse maior do que a memória do sistema (iterando por todos os objetos em um banco de dados) e randomizar a ordem da melhor maneira possível - pensamos que não haveria problema em armazenar 10.000 itens em buffer e randomizá-los.

O alvo era uma função que incorporava um fluxo.

Das soluções propostas aqui, parece haver uma gama de opções:

  • Use várias bibliotecas adicionais não Java 8
  • Comece com algo que não seja um stream - por exemplo, uma lista de acesso aleatório
  • Tem um fluxo que pode ser facilmente dividido em um divisor

Nosso instinto era originalmente usar um coletor personalizado, mas isso significava sair do streaming. A solução de coletor personalizado acima é muito boa e quase a usamos.

Aqui está uma solução que engana, usando o fato de que Streams pode fornecer um Iteratorque você pode usar como uma saída de emergência para permitir que você faça algo extra que os fluxos não suportam. O Iteratoré convertido de volta para um fluxo usando outro bit de StreamSupportfeitiçaria Java 8 .

/**
 * An iterator which returns batches of items taken from another iterator
 */
public class BatchingIterator<T> implements Iterator<List<T>> {
    /**
     * Given a stream, convert it to a stream of batches no greater than the
     * batchSize.
     * @param originalStream to convert
     * @param batchSize maximum size of a batch
     * @param <T> type of items in the stream
     * @return a stream of batches taken sequentially from the original stream
     */
    public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
        return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
    }

    private static <T> Stream<T> asStream(Iterator<T> iterator) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator,ORDERED),
            false);
    }

    private int batchSize;
    private List<T> currentBatch;
    private Iterator<T> sourceIterator;

    public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.sourceIterator = sourceIterator;
    }

    @Override
    public boolean hasNext() {
        prepareNextBatch();
        return currentBatch!=null && !currentBatch.isEmpty();
    }

    @Override
    public List<T> next() {
        return currentBatch;
    }

    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(sourceIterator.next());
        }
    }
}

Um exemplo simples de uso seria assim:

@Test
public void getsBatches() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        .forEach(System.out::println);
}

As estampas acima

[A, B, C]
[D, E, F]

Para nosso caso de uso, queríamos embaralhar os lotes e depois mantê-los como um fluxo - parecia com isto:

@Test
public void howScramblingCouldBeDone() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
        .map(list -> {
            Collections.shuffle(list); return list; })
        .flatMap(List::stream)
        .forEach(System.out::println);
}

Isso produz algo como (é aleatório, tão diferente a cada vez)

A
C
B
E
D
F

O segredo aqui é que sempre há um fluxo, então você pode operar em um fluxo de lotes ou fazer alguma coisa com cada lote e depois flatMapvoltar para um fluxo. Mesmo melhor, todos os itens acima só funciona como a final forEachou collectou outras expressões de terminação PULL os dados através da corrente.

Acontece que iteratoré um tipo especial de operação de término em um fluxo e não faz com que todo o fluxo seja executado e entre na memória! Obrigado ao pessoal do Java 8 pelo design brilhante!

Ashley Frieze
fonte
E é muito bom que você itere totalmente sobre cada lote quando ele é coletado e persista em um List- você não pode adiar a iteração dos elementos dentro do lote porque o consumidor pode querer pular um lote inteiro, e se você não consumiu o elementos então eles não estariam pulando muito longe. (Eu implementei um deles em C #, embora fosse substancialmente mais fácil.)
ErikE
9

Você também pode usar RxJava :

Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));

ou

Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();

ou

Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();
frhack
fonte
8

Você também pode dar uma olhada na reação de ciclope , eu sou o autor desta biblioteca. Ele implementa a interface jOOλ (e, por extensão, JDK 8 Streams), mas ao contrário do JDK 8 Parallel Streams, ele se concentra em operações assíncronas (como potencialmente bloquear chamadas de E / S assíncronas). JDK Parallel Streams, por contraste, concentra-se no paralelismo de dados para operações vinculadas à CPU. Ele funciona gerenciando agregados de tarefas baseadas no futuro, mas apresenta uma API de fluxo estendida padrão para os usuários finais.

Este código de amostra pode ajudá-lo a começar

LazyFutureStream.parallelCommonBuilder()
                .react(data)
                .grouped(BATCH_SIZE)                  
                .map(this::process)
                .run();

Há um tutorial sobre batching aqui

E um tutorial mais geral aqui

Para usar seu próprio Thread Pool (que provavelmente é mais apropriado para bloquear I / O), você pode iniciar o processamento com

     LazyReact reactor = new LazyReact(40);

     reactor.react(data)
            .grouped(BATCH_SIZE)                  
            .map(this::process)
            .run();
John McClean
fonte
3

Exemplo puro de Java 8 que também funciona com fluxos paralelos.

Como usar:

Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed();
CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));

A declaração e implementação do método:

public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor)
{
    List<ElementType> newBatch = new ArrayList<>(batchSize);

    stream.forEach(element -> {
        List<ElementType> fullBatch;

        synchronized (newBatch)
        {
            if (newBatch.size() < batchSize)
            {
                newBatch.add(element);
                return;
            }
            else
            {
                fullBatch = new ArrayList<>(newBatch);
                newBatch.clear();
                newBatch.add(element);
            }
        }

        batchProcessor.accept(fullBatch);
    });

    if (newBatch.size() > 0)
        batchProcessor.accept(new ArrayList<>(newBatch));
}
Nicolas Lacombe
fonte
2

Com toda a justiça, dê uma olhada na elegante solução Vavr :

Stream.ofAll(data).grouped(BATCH_SIZE).forEach(this::process);
Nolequen
fonte
1

Exemplo simples usando Spliterator

    // read file into stream, try-with-resources
    try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
        //skip header
        Spliterator<String> split = stream.skip(1).spliterator();
        Chunker<String> chunker = new Chunker<String>();
        while(true) {              
            boolean more = split.tryAdvance(chunker::doSomething);
            if (!more) {
                break;
            }
        }           
    } catch (IOException e) {
        e.printStackTrace();
    }
}

static class Chunker<T> {
    int ct = 0;
    public void doSomething(T line) {
        System.out.println(ct++ + " " + line.toString());
        if (ct % 100 == 0) {
            System.out.println("====================chunk=====================");               
        }           
    }       
}

A resposta de Bruce é mais abrangente, mas eu estava procurando por algo rápido e sujo para processar vários arquivos.

rhinmass
fonte
1

esta é uma solução Java pura que é avaliada lentamente.

public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
    List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable 
    currentBatch.add(new ArrayList<T>(batchSize));
    return Stream.concat(stream
      .sequential()                   
      .map(new Function<T, List<T>>(){
          public List<T> apply(T t){
              currentBatch.get(0).add(t);
              return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
            }
      }), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
                .limit(1)
    ).filter(Objects::nonNull);
}
Ele eu
fonte
1

Você pode usar apache.commons:

ListUtils.partition(ListOfLines, 500).stream()
                .map(partition -> processBatch(partition)
                .collect(Collectors.toList());

A parte de particionamento é feita sem preguiça, mas depois que a lista é particionada, você obtém os benefícios de trabalhar com fluxos (por exemplo, usar fluxos paralelos, adicionar filtros, etc.). Outras respostas sugeriram soluções mais elaboradas, mas às vezes a legibilidade e a manutenção são mais importantes (e às vezes não são :-))

Tal Joffe
fonte
Não tenho certeza de quem votou contra, mas seria bom entender o porquê. Eu dei uma resposta que complementou as outras respostas para pessoas que não podem usar Goiaba
Tal Joffe
Você está processando uma lista aqui, não um fluxo.
Drakemor
@Drakemor, estou processando um fluxo de sublistas. observe a chamada da função stream ()
Tal Joffe
Mas primeiro você a transforma em uma lista de sublistas, que não funcionará corretamente para dados de fluxo verdadeiros . Aqui está a referência à partição: commons.apache.org/proper/commons-collections/apidocs/org/…
Drakemor
1
TBH: Não entendo totalmente o seu argumento, mas acho que podemos concordar em discordar. Eu editei minha resposta para refletir nossa conversa aqui. Obrigado pela discussão
Tal Joffe
1

Isso poderia ser feito facilmente usando o Reactor :

Flux.fromStream(fileReader.lines().onClose(() -> safeClose(fileReader)))
            .map(line -> someProcessingOfSingleLine(line))
            .buffer(BUFFER_SIZE)
            .subscribe(apiService::makeHttpRequest);
Alex
fonte
0

Com Java 8e com.google.common.collect.Lists, você pode fazer algo como:

public class BatchProcessingUtil {
    public static <T,U> List<U> process(List<T> data, int batchSize, Function<List<T>, List<U>> processFunction) {
        List<List<T>> batches = Lists.partition(data, batchSize);
        return batches.stream()
                .map(processFunction) // Send each batch to the process function
                .flatMap(Collection::stream) // flat results to gather them in 1 stream
                .collect(Collectors.toList());
    }
}

Aqui Testá o tipo dos itens na lista de entrada e Uo tipo dos itens na lista de saída

E você pode usá-lo assim:

List<String> userKeys = [... list of user keys]
List<Users> users = BatchProcessingUtil.process(
    userKeys,
    10, // Batch Size
    partialKeys -> service.getUsers(partialKeys)
);
Josebui
fonte