Eu estive olhando para o novo rx java 2 e não tenho muita certeza de entender a idéia de backpressure
mais ...
Estou ciente de que temos Observable
que não tem backpressure
suporte eFlowable
tem.
Assim, com base no exemplo, digamos que eu tenho flowable
com interval
:
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
Isso vai falhar após cerca de 128 valores, e isso é bastante óbvio que estou consumindo mais devagar do que recebendo itens.
Mas então nós temos o mesmo com Observable
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
Isso não trava, mesmo quando eu coloco algum atraso no consumo, ainda funciona. Para fazer o Flowable
trabalho, digamos que eu coloque o onBackpressureDrop
operador, a falha desapareceu, mas nem todos os valores são emitidos.
Portanto, a pergunta básica que não consigo encontrar resposta atualmente em minha cabeça é por que devo me preocupar backpressure
quando posso usar o plain Observable
ainda receber todos os valores sem gerenciar o buffer
? Ou talvez do outro lado, quais vantagens backpressure
me dão em favor do gerenciamento e manuseio do consumidor?
Respostas:
O que a contrapressão manifesta na prática são buffers limitados,
Flowable.observeOn
possui um buffer de 128 elementos que é drenado o mais rápido que o fluxo de fluxo pode suportar. Você pode aumentar esse tamanho do buffer individualmente para lidar com a origem explosiva e todas as práticas de gerenciamento de contrapressão ainda se aplicam a partir da 1.x.Observable.observeOn
possui um buffer ilimitado que continua coletando os elementos e seu aplicativo pode ficar sem memória.Você pode usar,
Observable
por exemplo:Você pode usar,
Flowable
por exemplo:fonte
Maybe
,Single
eCompletable
pode sempre ser usado em vez deFlowable
quando eles são semanticamente apropriado?Maybe
,Single
, eCompletable
são muito pequeno demais para ter qualquer necessidade do conceito de contrapressão. Não há chance de um produtor emitir itens mais rapidamente do que pode ser consumido, uma vez que itens de 1 a 1 serão produzidos ou consumidos.Contrapressão é quando o observável (editor) está criando mais eventos do que o assinante pode suportar. Assim, você pode fazer com que os assinantes faltem aos eventos ou uma enorme fila de eventos, o que acaba levando à falta de memória.
Flowable
leva em consideração a contrapressão.Observable
não. É isso aí.isso me lembra um funil que quando transborda muito líquido. O Flowable pode ajudar a não fazer isso acontecer:
com tremenda contrapressão:
mas com o uso de fluidos, há muito menos contrapressão:
O Rxjava2 possui algumas estratégias de contrapressão que você pode usar, dependendo do seu caso de uso. Por estratégia, quero dizer que o Rxjava2 fornece uma maneira de lidar com objetos que não podem ser processados devido ao estouro (contrapressão).
aqui estão as estratégias. Não vou passar por todos eles, mas, por exemplo, se você não quiser se preocupar com os itens que estão em excesso, pode usar uma estratégia de queda como esta:
observable.toFlowable (BackpressureStrategy.DROP)
Tanto quanto eu sei, deve haver um limite de 128 itens na fila, depois disso pode haver um estouro (contrapressão). Mesmo que não seja 128, é próximo desse número. Espero que isso ajude alguém.
se você precisar alterar o tamanho do buffer de 128, parece que isso pode ser feito assim (mas observe as restrições de memória:
no desenvolvimento de software, geralmente a estratégia de contrapressão significa dizer ao emissor que diminua um pouco, pois o consumidor não consegue lidar com a velocidade dos eventos emitidos.
fonte
O fato de que seu
Flowable
travado após emitir 128 valores sem manipulação de contrapressão não significa que sempre travará após exatamente 128 valores: às vezes travará após 10, e às vezes não travará. Acredito que foi isso que aconteceu quando você tentou o exemploObservable
- não houve contrapressão; portanto, seu código funcionou normalmente, da próxima vez que não. A diferença no RxJava 2 é que não há mais conceito de contrapressão emObservable
s e não há como lidar com isso. Se você estiver projetando uma sequência reativa que provavelmente exigirá manipulação explícita de contrapressão - entãoFlowable
é a sua melhor escolha.fonte
interval
sembackpressure
, eu esperaria algum comportamento ou problemas estranhos?