Estou tentando processar o código abaixo usando multithreading no nível do pedido.
List<String> orders = Arrays.asList("order1", "order2",
"order3", "order4", "order1");
Execução sequencial atual:
orders.stream().forEach(order -> {
rules.forEach(rule -> {
finalList.add(beanMapper.getBean(rule)
.applyRule(createTemplate.apply(getMetaData.apply(rule), command),
order));
});
});
Eu tentei usar:
orders.parallelStream().forEach(order -> {}} // code snippet.
Mas está mudando a ordem rules.forEach (rule -> {}} .
Por exemplo:
Entrada:
List<String> orders = Arrays.asList("order1", "order2",
"order3", "order4", "order1");
List<String> rules = Arrays.asList("rule1", "rule2", "rule3");
Saída esperada:
order1 with rule1, rule2, rule3
order2 with rule1, rule2, rule3
Saída real com parallelStream()
:
order1 with rule3, rule1, rule2
order1 with rule2, rule1, rule3
Não estou preocupado com a ordem das ordens , mas estou preocupado com a ordem das regras . Os pedidos podem processar em qualquer ordem, mas as regras devem ser executadas na mesma ordem para cada pedido.
Por favor ajude.
java
multithreading
java-8
java-stream
Mayank Bisht
fonte
fonte
Você adiciona elementos ao
finalList
de diferentes segmentos ao mesmo tempo. Isso está causando resultados conflitantes da aplicação de regras a diferentes pedidos (as regras não estão sendo agrupadas por seus pedidos).Você pode corrigi-lo criando uma lista temporária para cada uma
order
e mesclando sincronamente todas as listas temporárias em afinalList
.Aqui está como você pode fazer isso usando a API de fluxo (Java 9+):
Nota:
Collectors.flatMapping()
é usado aqui em vez de simplesflatMap
executar o mapeamento plano de forma síncrona durante a coleta de fluxo.Analógico Java 8:
fonte
beanMapper.getBean(rule) .applyRule(createTemplate.apply(getMetaData.apply(rule), command), order)
não é uma função pura, portanto não pode ser usada em paralelo. Tente remover todos os efeitos colaterais dele;ConcurrentModificationException
o rastreamento de pilha pode ajudar a localizá-los.Isso vai funcionar?
Resultado
fonte
Se for esse o caso, não há espaço para o paralelismo real.
Quando
e
são as únicas execuções válidas para 2 pedidos e 2 regras,
e
é considerado inválido, isso não é paralelismo, apenas randomização de
order
s, presumivelmente sem ganho. Se você está "entediado" comorder1
a primeira vez o tempo todo, pode embaralhar a lista, mas isso é tudo:Nem mesmo o streaming é necessário, apenas dois loops aninhados. Teste: https://ideone.com/qI3dqd
Resposta original
Não, não tem. Os
order
s podem se sobrepor, mas a ordem derule
s para cada pedido é mantida. Por que um não paraleloforEach
faria outra coisa?Código de exemplo:
Teste: https://ideone.com/95Cybg
Exemplo de saída:
A ordem de
order
s é mista, mas osrule
sempre são 1-2-3. Acho que sua saída simplesmente ocultou os pares (na verdade você não mostrou como foi gerada).É claro que ele pode ser estendido com alguns atrasos, portanto o processamento de
order
s se sobrepõe:Teste: https://ideone.com/cSFaqS
Exemplo de saída:
Isso pode ser algo que você viu, apenas sem a
orderx
parte. Com osorder
s visíveis, pode ser rastreado querule
continua chegando como 1-2-3, pororder
. Além disso, sua lista de exemplos continhaorder1
duas vezes, o que com certeza não ajudou a ver o que estava acontecendo.fonte
order
s não podem se sobrepor (rule
talvez sejam esses stateful e existam em um número limitado de cópias, talvez apenas uma única?). Mas geralmente não há paralelismo sem que as coisas funcionem paralelamente, afinal esse é o ponto principal do paralelismo.Se você não se importa de tentar a biblioteca de terceiros. Aqui está um exemplo com minha biblioteca: abacus-util
E você pode até especificar o número da linha:
A ordem de
rule
será mantida.A propósito, como está no fluxo paralelo, o código
...finalList.add(...
provavelmente não funcionará. Eu acho que é melhor coletar o resultado para listar:também é possível, mesmo que você queira manter a ordem
order
por algum motivo posteriormente:fonte