A biblioteca do Akka Streams já vem com bastante documentação . No entanto, o principal problema para mim é que ele fornece muito material - sinto-me bastante impressionado com o número de conceitos que tenho que aprender. Muitos exemplos mostrados lá parecem muito pesados e não podem ser facilmente traduzidos para casos de uso do mundo real e, portanto, são bastante esotéricos. Eu acho que dá muitos detalhes sem explicar como construir todos os componentes e como exatamente isso ajuda a resolver problemas específicos.
Existem fontes, sumidouros, fluxos, estágios de gráficos, gráficos parciais, materialização, DSL de um gráfico e muito mais e eu simplesmente não sei por onde começar. O guia de início rápido deve ser um ponto de partida, mas eu não o entendo. Ele apenas lança os conceitos mencionados acima sem explicá-los. Além disso, os exemplos de código não podem ser executados - faltam partes, o que torna mais ou menos impossível para mim seguir o texto.
Alguém pode explicar as fontes dos conceitos, sumidouros, fluxos, estágios do gráfico, gráficos parciais, materialização e talvez algumas outras coisas que eu perdi em palavras simples e com exemplos fáceis que não explicam todos os detalhes (e que provavelmente não são necessários de qualquer maneira em o início)?
fonte
Respostas:
Esta resposta é baseada na
akka-stream
versão2.4.2
. A API pode ser um pouco diferente em outras versões. A dependência pode ser consumida pelo sbt :Tudo bem, vamos começar. A API do Akka Streams consiste em três tipos principais. Ao contrário dos Reativos reativos , esses tipos são muito mais poderosos e, portanto, mais complexos. Supõe-se que, para todos os exemplos de código, as seguintes definições já existam:
As
import
instruções são necessárias para as declarações de tipo.system
representa o sistema de atores de Akka ematerializer
o contexto de avaliação do fluxo. No nosso caso, usamos aActorMaterializer
, o que significa que os fluxos são avaliados em cima dos atores. Ambos os valores são marcados comoimplicit
, o que dá ao compilador Scala a possibilidade de injetar essas duas dependências automaticamente sempre que necessário. Também importamossystem.dispatcher
, que é um contexto de execuçãoFutures
.Uma nova API
O Akka Streams possui estas propriedades principais:
Materializer
.Source
,Sink
eFlow
. Os blocos de construção formam um gráfico cuja avaliação se baseiaMaterializer
e precisa ser explicitamente acionada.A seguir, será apresentada uma introdução mais profunda sobre como usar os três tipos principais.
Fonte
A
Source
é um criador de dados, serve como fonte de entrada para o fluxo. CadaSource
um possui um único canal de saída e nenhum canal de entrada. Todos os dados fluem através do canal de saída para o que estiver conectado aoSource
.Imagem tirada em boldradius.com .
A
Source
pode ser criado de várias maneiras:Nos casos acima, alimentamos os
Source
dados finitos, o que significa que eles terminarão eventualmente. Não se deve esquecer que os Fluxos Reativos são preguiçosos e assíncronos por padrão. Isso significa que é necessário solicitar explicitamente a avaliação do fluxo. No Akka Streams, isso pode ser feito através dosrun*
métodos. A funçãorunForeach
não seria diferente da conhecidaforeach
função - por meio darun
adição, fica explícito que solicitamos uma avaliação do fluxo. Como os dados finitos são chatos, continuamos com um infinito:Com o
take
método, podemos criar um ponto de parada artificial que nos impede de avaliar indefinidamente. Como o suporte ao ator é incorporado, também podemos alimentar facilmente o fluxo com mensagens enviadas a um ator:Podemos ver que os
Futures
são executados assincronamente em diferentes threads, o que explica o resultado. No exemplo acima, um buffer para os elementos recebidos não é necessário e, portantoOverflowStrategy.fail
, podemos configurar que o fluxo falhe em um estouro de buffer. Especialmente através dessa interface de ator, podemos alimentar o fluxo através de qualquer fonte de dados. Não importa se os dados são criados pelo mesmo encadeamento, por um diferente, por outro processo ou se vierem de um sistema remoto pela Internet.Pia
A
Sink
é basicamente o oposto de aSource
. É o ponto final de um fluxo e, portanto, consome dados. ASink
possui um único canal de entrada e nenhum canal de saída.Sinks
são especialmente necessários quando queremos especificar o comportamento do coletor de dados de maneira reutilizável e sem avaliar o fluxo. Osrun*
métodos já conhecidos não nos permitem essas propriedades, portanto, é preferível usá-loSink
.Imagem tirada em boldradius.com .
Um pequeno exemplo de um
Sink
em ação:A conexão de a
Source
aSink
pode ser feita com oto
método Ele retorna um assim chamadoRunnableFlow
, que é como veremos mais tarde uma forma especial de aFlow
- um fluxo que pode ser executado apenas chamando seurun()
método.Imagem tirada em boldradius.com .
É claro que é possível encaminhar todos os valores que chegam a um coletor para um ator:
Fluxo
As fontes de dados e os coletores são ótimos se você precisar de uma conexão entre os fluxos Akka e um sistema existente, mas não se pode realmente fazer nada com eles. Os fluxos são a última peça que falta na abstração base do Akka Streams. Eles atuam como um conector entre diferentes fluxos e podem ser usados para transformar seus elementos.
Imagem tirada em boldradius.com .
Se um
Flow
está conectado aSource
um novo,Source
é o resultado. Da mesma forma, umFlow
conectado a umSink
cria um novoSink
. E umFlow
conectado com aSource
e aSink
resulta em aRunnableFlow
. Portanto, eles ficam entre a entrada e o canal de saída, mas por si só não correspondem a um dos sabores, desde que não estejam conectados a umSource
ou aSink
.Imagem tirada em boldradius.com .
Para entender melhor
Flows
, veremos alguns exemplos:Através do
via
método, podemos conectar aSource
com aFlow
. Precisamos especificar o tipo de entrada porque o compilador não pode inferir isso para nós. Como já podemos ver neste exemplo simples, os fluxosinvert
edouble
são completamente independentes de quaisquer produtores e consumidores de dados. Eles apenas transformam os dados e os encaminham para o canal de saída. Isso significa que podemos reutilizar um fluxo entre vários fluxos:s1
es2
representam fluxos completamente novos - eles não compartilham dados através de seus blocos de construção.Fluxos de dados não ligados
Antes de seguirmos em frente, devemos revisitar alguns dos principais aspectos dos Fluxos Reativos. Um número ilimitado de elementos pode chegar a qualquer ponto e colocar um fluxo em diferentes estados. Além de um fluxo executável, que é o estado usual, um fluxo pode ser interrompido por um erro ou por um sinal que indica que nenhum dado adicional chegará. Um fluxo pode ser modelado de maneira gráfica, marcando eventos em uma linha do tempo, como é o caso aqui:
Imagem retirada da introdução à Programação Reativa que você está perdendo .
Já vimos fluxos executáveis nos exemplos da seção anterior. Temos um
RunnableGraph
sempre que um fluxo pode realmente ser materializado, o que significa que aSink
está conectado a umSource
. Até agora, sempre nos materializamos com o valorUnit
, que pode ser visto nos tipos:Para
Source
e paraSink
o segundo parâmetro de tipo e paraFlow
o terceiro parâmetro de tipo, denote o valor materializado. Ao longo desta resposta, o significado completo da materialização não deve ser explicado. No entanto, mais detalhes sobre materialização podem ser encontrados na documentação oficial . Por enquanto, a única coisa que precisamos saber é que o valor materializado é o que obtemos quando executamos um fluxo. Como estávamos interessados apenas em efeitos colaterais até agora, obtivemosUnit
o valor materializado. A exceção a isso foi a materialização de uma pia, que resultou em aFuture
. Nos devolveu umFuture
, pois esse valor pode indicar quando o fluxo conectado ao coletor foi finalizado. Até agora, os exemplos de código anteriores eram bons para explicar o conceito, mas também eram chatos, porque lidávamos apenas com fluxos finitos ou com infinitos muito simples. Para torná-lo mais interessante, a seguir, um fluxo completo assíncrono e ilimitado deve ser explicado.Exemplo de ClickStream
Como exemplo, queremos ter um fluxo que capture eventos de clique. Para torná-lo mais desafiador, digamos que também queremos agrupar eventos de clique que acontecem em um curto espaço de tempo um após o outro. Dessa forma, poderíamos descobrir facilmente cliques duplos, triplos ou dez vezes maiores. Além disso, queremos filtrar todos os cliques únicos. Respire fundo e imagine como você resolveria esse problema de maneira imperativa. Aposto que ninguém seria capaz de implementar uma solução que funcione corretamente na primeira tentativa. De uma maneira reativa, esse problema é trivial de resolver. De fato, a solução é tão simples e direta de implementar que podemos até expressá-la em um diagrama que descreve diretamente o comportamento do código:
Imagem retirada da introdução à Programação Reativa que você está perdendo .
As caixas cinzas são funções que descrevem como um fluxo é transformado em outro. Com a
throttle
função que acumulamos cliques em 250 milissegundos, as funçõesmap
efilter
devem ser auto-explicativas. As esferas coloridas representam um evento e as setas mostram como elas fluem através de nossas funções. Posteriormente nas etapas de processamento, obtemos cada vez menos elementos que fluem através de nosso fluxo, pois os agrupamos e os filtramos. O código para esta imagem seria algo como isto:Toda a lógica pode ser representada em apenas quatro linhas de código! Em Scala, poderíamos escrever ainda mais:
A definição de
clickStream
é um pouco mais complexa, mas esse é apenas o caso, porque o programa de exemplo é executado na JVM, onde a captura de eventos de clique não é facilmente possível. Outra complicação é que o Akka por padrão não fornece athrottle
função. Em vez disso, tivemos que escrever sozinhos. Como essa função é (como é o caso para as funçõesmap
oufilter
) reutilizável em diferentes casos de uso, não conto essas linhas com o número de linhas necessárias para implementar a lógica. No entanto, em linguagens imperativas, é normal que a lógica não possa ser reutilizada com tanta facilidade e que as diferentes etapas lógicas ocorram em um só lugar, em vez de serem aplicadas seqüencialmente, o que significa que provavelmente teríamos deformado nosso código com a lógica de limitação. O exemplo de código completo está disponível como umessência e não será discutido aqui ainda mais.Exemplo de SimpleWebServer
O que deveria ser discutido é outro exemplo. Embora o fluxo de cliques seja um bom exemplo para permitir que o Akka Streams lide com um exemplo do mundo real, ele não tem o poder de mostrar execução paralela em ação. O próximo exemplo deve representar um pequeno servidor Web que pode lidar com várias solicitações em paralelo. O servidor da Web deve aceitar conexões de entrada e receber seqüências de bytes delas que representam sinais ASCII imprimíveis. Essas seqüências de bytes ou seqüências de caracteres devem ser divididas em todos os caracteres de nova linha em partes menores. Depois disso, o servidor deve responder ao cliente com cada uma das linhas de divisão. Como alternativa, ele poderia fazer outra coisa com as linhas e fornecer um token de resposta especial, mas queremos mantê-lo simples neste exemplo e, portanto, não apresentar nenhum recurso sofisticado. Lembrar, o servidor precisa ser capaz de lidar com várias solicitações ao mesmo tempo, o que basicamente significa que nenhuma solicitação pode bloquear qualquer outra solicitação de execução posterior. A solução de todos esses requisitos pode ser difícil de uma maneira imperativa - com o Akka Streams, no entanto, não precisamos de mais do que algumas linhas para resolver qualquer um deles. Primeiro, vamos ter uma visão geral sobre o próprio servidor:
Basicamente, existem apenas três componentes principais. O primeiro precisa aceitar conexões de entrada. O segundo precisa lidar com solicitações recebidas e o terceiro precisa enviar uma resposta. A implementação de todos esses três componentes é apenas um pouco mais complicada do que a implementação do fluxo de cliques:
A função
mkServer
leva (além do endereço e da porta do servidor) também um sistema ator e um materializador como parâmetros implícitos. O fluxo de controle do servidor é representado porbinding
, que pega uma fonte de conexões de entrada e as encaminha para um coletor de conexões de entrada. Dentro deconnectionHandler
, que é nossa pia, lidamos com todas as conexões pelo fluxoserverLogic
, que serão descritas mais adiante.binding
retorna umFuture
, que termina quando o servidor foi iniciado ou o início falhou, o que pode acontecer quando a porta já é tomada por outro processo. O código, no entanto, não reflete completamente o gráfico, pois não podemos ver um bloco de construção que lida com respostas. A razão para isso é que a conexão já fornece essa lógica por si só. É um fluxo bidirecional e não apenas unidirecional como os fluxos que vimos nos exemplos anteriores. Como foi o caso da materialização, tais fluxos complexos não serão explicados aqui. A documentação oficial possui bastante material para cobrir gráficos de fluxo mais complexos. Por enquanto, basta saber queTcp.IncomingConnection
representa uma conexão que sabe como receber solicitações e como enviar respostas. A parte que ainda está faltando é aserverLogic
bloco de construção. Pode ficar assim:Mais uma vez, somos capazes de dividir a lógica em vários blocos de construção simples que, juntos, formam o fluxo do nosso programa. Primeiro, queremos dividir nossa sequência de bytes em linhas, o que devemos fazer sempre que encontrarmos um caractere de nova linha. Depois disso, os bytes de cada linha precisam ser convertidos em uma sequência, porque trabalhar com bytes brutos é complicado. No geral, poderíamos receber um fluxo binário de um protocolo complicado, o que tornaria o trabalho com os dados brutos recebidos extremamente desafiador. Depois de termos uma sequência legível, podemos criar uma resposta. Por razões de simplicidade, a resposta pode ser qualquer coisa no nosso caso. No final, temos que converter de volta nossa resposta em uma sequência de bytes que podem ser enviados por fio. O código para toda a lógica pode ser assim:
Já sabemos que
serverLogic
é um fluxo que leva aeByteString
tem que produzir aByteString
. Comdelimiter
isso, podemos dividir umByteString
em partes menores - no nosso caso, isso precisa acontecer sempre que um caractere de nova linha ocorre.receiver
é o fluxo que pega todas as seqüências de bytes divididos e as converte em uma sequência. Obviamente, essa é uma conversão perigosa, pois apenas caracteres ASCII imprimíveis devem ser convertidos em uma string, mas, para as nossas necessidades, é bom o suficiente.responder
é o último componente e é responsável por criar uma resposta e converter a resposta em uma sequência de bytes. Ao contrário do gráfico, não dividimos esse último componente em dois, pois a lógica é trivial. No final, conectamos todos os fluxos através dovia
função. Nesse ponto, pode-se perguntar se cuidamos da propriedade multiusuário mencionada no início. E de fato o fizemos, mesmo que isso não seja óbvio imediatamente. Ao olhar para este gráfico, deve ficar mais claro:O
serverLogic
componente nada mais é do que um fluxo que contém fluxos menores. Este componente recebe uma entrada, que é uma solicitação, e produz uma saída, que é a resposta. Como os fluxos podem ser construídos várias vezes e todos funcionam independentemente um do outro, conseguimos com isso aninhar nossa propriedade multiusuário. Cada solicitação é tratada dentro de sua própria solicitação e, portanto, uma solicitação de execução curta pode substituir uma solicitação de execução longa iniciada anteriormente. Caso você tenha se perguntado, a definiçãoserverLogic
disso foi mostrada anteriormente, é claro, pode ser escrita muito mais curta, incorporando a maioria de suas definições internas:Um teste do servidor da web pode ser assim:
Para que o exemplo de código acima funcione corretamente, primeiro precisamos iniciar o servidor, que é representado pelo
startServer
script:O exemplo de código completo deste servidor TCP simples pode ser encontrado aqui . Não somos apenas capazes de escrever um servidor com o Akka Streams, mas também o cliente. Pode ser assim:
O cliente TCP de código completo pode ser encontrado aqui . O código parece bastante semelhante, mas, ao contrário do servidor, não precisamos mais gerenciar as conexões de entrada.
Gráficos complexos
Nas seções anteriores, vimos como podemos construir programas simples a partir dos fluxos. No entanto, na realidade, muitas vezes não basta confiar apenas em funções já incorporadas para construir fluxos mais complexos. Se queremos poder usar o Akka Streams para programas arbitrários, precisamos saber como criar nossas próprias estruturas de controle personalizadas e fluxos combináveis que nos permitem lidar com a complexidade de nossos aplicativos. A boa notícia é que o Akka Streams foi projetado para se adaptar às necessidades dos usuários e, para oferecer uma breve introdução às partes mais complexas do Akka Streams, adicionamos mais alguns recursos ao nosso exemplo de cliente / servidor.
Uma coisa que ainda não podemos fazer é fechar uma conexão. Nesse ponto, começa a ficar um pouco mais complicado, porque a API de fluxo que vimos até agora não nos permite interromper um fluxo em um ponto arbitrário. No entanto, existe a
GraphStage
abstração, que pode ser usada para criar estágios arbitrários de processamento gráfico com qualquer número de portas de entrada ou saída. Vamos primeiro dar uma olhada no lado do servidor, onde apresentamos um novo componente, chamadocloseConnection
:Essa API parece muito mais complicada do que a API de fluxo. Não é de admirar, temos que fazer muitos passos imperativos aqui. Em troca, temos mais controle sobre o comportamento de nossos fluxos. No exemplo acima, especificamos apenas uma porta de entrada e uma saída e as disponibilizamos para o sistema substituindo o
shape
valor. Além disso, definimos um chamadoInHandler
e aOutHandler
, que são, nessa ordem, responsáveis por receber e emitir elementos. Se você olhou atentamente para o exemplo completo do fluxo de cliques, já deve reconhecer esses componentes. NoInHandler
agarramos um elemento e, se for uma string com um único caractere'q'
, queremos fechar o fluxo. Para dar ao cliente a chance de descobrir que o fluxo será fechado em breve, emitimos a string"BYE"
e então fechamos o palco imediatamente depois. OcloseConnection
componente pode ser combinado com um fluxo por meio dovia
método, que foi introduzido na seção sobre fluxos.Além de poder fechar conexões, também seria bom se pudéssemos mostrar uma mensagem de boas-vindas a uma conexão recém-criada. Para fazer isso, mais uma vez temos que ir um pouco mais longe:
A função
serverLogic
agora aceita a conexão recebida como parâmetro. Dentro de seu corpo, usamos uma DSL que nos permite descrever um comportamento complexo do fluxo. Comwelcome
isso, criamos um fluxo que pode emitir apenas um elemento - a mensagem de boas-vindas.logic
é o que foi descritoserverLogic
na seção anterior. A única diferença notável é que nós adicionamoscloseConnection
a ele. Agora, na verdade, vem a parte interessante da DSL. AGraphDSL.create
função disponibiliza um construtorb
, que é usado para expressar o fluxo como um gráfico. Com a~>
função, é possível conectar portas de entrada e saída entre si. OConcat
componente usado no exemplo pode concatenar elementos e aqui é usado para anexar a mensagem de boas-vindas na frente dos outros elementos que saem dointernalLogic
. Na última linha, apenas disponibilizamos a porta de entrada da lógica do servidor e a porta de saída do fluxo concatenado, porque todas as outras portas permanecerão um detalhe de implementação doserverLogic
componente. Para uma introdução detalhada ao DSL gráfico do Akka Streams, visite a seção correspondente na documentação oficial . O exemplo de código completo do servidor TCP complexo e de um cliente que pode se comunicar com ele pode ser encontrado aqui . Sempre que você abrir uma nova conexão a partir do cliente, deverá receber uma mensagem de boas-vindas e, digitando"q"
-o, deverá receber uma mensagem informando que a conexão foi cancelada.Ainda existem alguns tópicos que não foram abordados por esta resposta. Especialmente a materialização pode assustar um leitor ou outro, mas tenho certeza de que, com o material abordado aqui, todos devem poder dar os próximos passos sozinhos. Como já foi dito, a documentação oficial é um bom lugar para continuar aprendendo sobre o Akka Streams.
fonte