Como começar com o Akka Streams? [fechadas]

222

A biblioteca do Akka Streams já vem com bastante documentação . No entanto, o principal problema para mim é que ele fornece muito material - sinto-me bastante impressionado com o número de conceitos que tenho que aprender. Muitos exemplos mostrados lá parecem muito pesados ​​e não podem ser facilmente traduzidos para casos de uso do mundo real e, portanto, são bastante esotéricos. Eu acho que dá muitos detalhes sem explicar como construir todos os componentes e como exatamente isso ajuda a resolver problemas específicos.

Existem fontes, sumidouros, fluxos, estágios de gráficos, gráficos parciais, materialização, DSL de um gráfico e muito mais e eu simplesmente não sei por onde começar. O guia de início rápido deve ser um ponto de partida, mas eu não o entendo. Ele apenas lança os conceitos mencionados acima sem explicá-los. Além disso, os exemplos de código não podem ser executados - faltam partes, o que torna mais ou menos impossível para mim seguir o texto.

Alguém pode explicar as fontes dos conceitos, sumidouros, fluxos, estágios do gráfico, gráficos parciais, materialização e talvez algumas outras coisas que eu perdi em palavras simples e com exemplos fáceis que não explicam todos os detalhes (e que provavelmente não são necessários de qualquer maneira em o início)?

kiritsuku
fonte
2
Para sua informação, isso está sendo discutido na meta
DavidG
10
Como a primeira pessoa a votar para encerrar isso (seguindo o tópico Meta), deixe-me primeiro dizer que sua resposta aqui é ótima . É realmente profundo e é certamente um recurso muito útil. No entanto, infelizmente, a pergunta que você fez é muito ampla para o Stack Overflow. Se, de alguma forma, sua resposta pode ser postada em uma pergunta com palavras diferentes, é incrível, mas acho que não. Aconselho vivamente a reenviar isso como uma postagem de blog ou algo semelhante que você e outras pessoas possam usar como recurso de referência em respostas futuras.
James Donnelly
2
Eu acho que escrever essa pergunta como um post no blog não seria eficaz. Sim, esta é uma pergunta ampla - e é realmente uma boa pergunta. Reduzir seu escopo não o melhoraria. A resposta fornecida é fantástica. Tenho certeza de que o Quora ficaria feliz em tirar negócios da SO para as grandes perguntas.
Mike Slinn 25/02
11
O @MikeSlinn não tenta discutir com o pessoal da SO as perguntas apropriadas, ele segue cegamente as regras. Contanto que a pergunta não seja removida, fico feliz e não sinto vontade de mudar para uma plataforma diferente.
Kiritsuku
2
@sschaef Como pedante. Sim, é claro, as regras não valem nada, seu grande eu sabe muito melhor e todos que tentam aplicar regras seguem apenas cegamente o hype. / discurso retórico. mais a sério, isso seria um ótimo complemento para a documentação beta, se você estiver nela. Você ainda pode aplicá-lo e colocá-lo lá em cima, mas pelo menos deve ver que não é um ótimo ajuste para o site principal.
Félix Gagnon-Grenier

Respostas:

506

Esta resposta é baseada na akka-streamversão 2.4.2. A API pode ser um pouco diferente em outras versões. A dependência pode ser consumida pelo sbt :

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2"

Tudo bem, vamos começar. A API do Akka Streams consiste em três tipos principais. Ao contrário dos Reativos reativos , esses tipos são muito mais poderosos e, portanto, mais complexos. Supõe-se que, para todos os exemplos de código, as seguintes definições já existam:

import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._

implicit val system = ActorSystem("TestSystem")
implicit val materializer = ActorMaterializer()
import system.dispatcher

As importinstruções são necessárias para as declarações de tipo. systemrepresenta o sistema de atores de Akka e materializero contexto de avaliação do fluxo. No nosso caso, usamos a ActorMaterializer, o que significa que os fluxos são avaliados em cima dos atores. Ambos os valores são marcados como implicit, o que dá ao compilador Scala a possibilidade de injetar essas duas dependências automaticamente sempre que necessário. Também importamos system.dispatcher, que é um contexto de execução Futures.

Uma nova API

O Akka Streams possui estas propriedades principais:

  • Eles implementam a especificação Reactive Streams , cujos três objetivos principais são contrapressão, limites assíncronos e sem bloqueio e interoperabilidade entre diferentes implementações também se aplicam totalmente ao Akka Streams.
  • Eles fornecem uma abstração para um mecanismo de avaliação para os fluxos, que é chamado Materializer.
  • Programas são formulados como blocos de construção reutilizáveis, que são representados como os três principais tipos Source, Sinke Flow. Os blocos de construção formam um gráfico cuja avaliação se baseia Materializere precisa ser explicitamente acionada.

A seguir, será apresentada uma introdução mais profunda sobre como usar os três tipos principais.

Fonte

A Sourceé um criador de dados, serve como fonte de entrada para o fluxo. Cada Sourceum possui um único canal de saída e nenhum canal de entrada. Todos os dados fluem através do canal de saída para o que estiver conectado ao Source.

Fonte

Imagem tirada em boldradius.com .

A Sourcepode ser criado de várias maneiras:

scala> val s = Source.empty
s: akka.stream.scaladsl.Source[Nothing,akka.NotUsed] = ...

scala> val s = Source.single("single element")
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> val s = Source(1 to 3)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val s = Source(Future("single value from a Future"))
s: akka.stream.scaladsl.Source[String,akka.NotUsed] = ...

scala> s runForeach println
res0: scala.concurrent.Future[akka.Done] = ...
single value from a Future

Nos casos acima, alimentamos os Sourcedados finitos, o que significa que eles terminarão eventualmente. Não se deve esquecer que os Fluxos Reativos são preguiçosos e assíncronos por padrão. Isso significa que é necessário solicitar explicitamente a avaliação do fluxo. No Akka Streams, isso pode ser feito através dos run*métodos. A função runForeachnão seria diferente da conhecida foreachfunção - por meio da runadição, fica explícito que solicitamos uma avaliação do fluxo. Como os dados finitos são chatos, continuamos com um infinito:

scala> val s = Source.repeat(5)
s: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> s take 3 runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
5
5
5

Com o takemétodo, podemos criar um ponto de parada artificial que nos impede de avaliar indefinidamente. Como o suporte ao ator é incorporado, também podemos alimentar facilmente o fluxo com mensagens enviadas a um ator:

def run(actor: ActorRef) = {
  Future { Thread.sleep(300); actor ! 1 }
  Future { Thread.sleep(200); actor ! 2 }
  Future { Thread.sleep(100); actor ! 3 }
}
val s = Source
  .actorRef[Int](bufferSize = 0, OverflowStrategy.fail)
  .mapMaterializedValue(run)

scala> s runForeach println
res1: scala.concurrent.Future[akka.Done] = ...
3
2
1

Podemos ver que os Futuressão executados assincronamente em diferentes threads, o que explica o resultado. No exemplo acima, um buffer para os elementos recebidos não é necessário e, portanto OverflowStrategy.fail, podemos configurar que o fluxo falhe em um estouro de buffer. Especialmente através dessa interface de ator, podemos alimentar o fluxo através de qualquer fonte de dados. Não importa se os dados são criados pelo mesmo encadeamento, por um diferente, por outro processo ou se vierem de um sistema remoto pela Internet.

Pia

A Sinké basicamente o oposto de a Source. É o ponto final de um fluxo e, portanto, consome dados. A Sinkpossui um único canal de entrada e nenhum canal de saída. Sinkssão especialmente necessários quando queremos especificar o comportamento do coletor de dados de maneira reutilizável e sem avaliar o fluxo. Os run*métodos já conhecidos não nos permitem essas propriedades, portanto, é preferível usá-lo Sink.

Pia

Imagem tirada em boldradius.com .

Um pequeno exemplo de um Sinkem ação:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](elem => println(s"sink received: $elem"))
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val flow = source to sink
flow: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> flow.run()
res3: akka.NotUsed = NotUsed
sink received: 1
sink received: 2
sink received: 3

A conexão de a Sourcea Sinkpode ser feita com o tométodo Ele retorna um assim chamado RunnableFlow, que é como veremos mais tarde uma forma especial de a Flow- um fluxo que pode ser executado apenas chamando seu run()método.

Fluxo executável

Imagem tirada em boldradius.com .

É claro que é possível encaminhar todos os valores que chegam a um coletor para um ator:

val actor = system.actorOf(Props(new Actor {
  override def receive = {
    case msg => println(s"actor received: $msg")
  }
}))

scala> val sink = Sink.actorRef[Int](actor, onCompleteMessage = "stream completed")
sink: akka.stream.scaladsl.Sink[Int,akka.NotUsed] = ...

scala> val runnable = Source(1 to 3) to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res3: akka.NotUsed = NotUsed
actor received: 1
actor received: 2
actor received: 3
actor received: stream completed

Fluxo

As fontes de dados e os coletores são ótimos se você precisar de uma conexão entre os fluxos Akka e um sistema existente, mas não se pode realmente fazer nada com eles. Os fluxos são a última peça que falta na abstração base do Akka Streams. Eles atuam como um conector entre diferentes fluxos e podem ser usados ​​para transformar seus elementos.

Fluxo

Imagem tirada em boldradius.com .

Se um Flowestá conectado a Sourceum novo, Sourceé o resultado. Da mesma forma, um Flowconectado a um Sinkcria um novo Sink. E um Flowconectado com a Sourcee a Sinkresulta em a RunnableFlow. Portanto, eles ficam entre a entrada e o canal de saída, mas por si só não correspondem a um dos sabores, desde que não estejam conectados a um Sourceou a Sink.

Stream completo

Imagem tirada em boldradius.com .

Para entender melhor Flows, veremos alguns exemplos:

scala> val source = Source(1 to 3)
source: akka.stream.scaladsl.Source[Int,akka.NotUsed] = ...

scala> val sink = Sink.foreach[Int](println)
sink: akka.stream.scaladsl.Sink[Int,scala.concurrent.Future[akka.Done]] = ...

scala> val invert = Flow[Int].map(elem => elem * -1)
invert: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val doubler = Flow[Int].map(elem => elem * 2)
doubler: akka.stream.scaladsl.Flow[Int,Int,akka.NotUsed] = ...

scala> val runnable = source via invert via doubler to sink
runnable: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> runnable.run()
res10: akka.NotUsed = NotUsed
-2
-4
-6

Através do viamétodo, podemos conectar a Sourcecom a Flow. Precisamos especificar o tipo de entrada porque o compilador não pode inferir isso para nós. Como já podemos ver neste exemplo simples, os fluxos inverte doublesão completamente independentes de quaisquer produtores e consumidores de dados. Eles apenas transformam os dados e os encaminham para o canal de saída. Isso significa que podemos reutilizar um fluxo entre vários fluxos:

scala> val s1 = Source(1 to 3) via invert to sink
s1: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> val s2 = Source(-3 to -1) via invert to sink
s2: akka.stream.scaladsl.RunnableGraph[akka.NotUsed] = ...

scala> s1.run()
res10: akka.NotUsed = NotUsed
-1
-2
-3

scala> s2.run()
res11: akka.NotUsed = NotUsed
3
2
1

s1e s2representam fluxos completamente novos - eles não compartilham dados através de seus blocos de construção.

Fluxos de dados não ligados

Antes de seguirmos em frente, devemos revisitar alguns dos principais aspectos dos Fluxos Reativos. Um número ilimitado de elementos pode chegar a qualquer ponto e colocar um fluxo em diferentes estados. Além de um fluxo executável, que é o estado usual, um fluxo pode ser interrompido por um erro ou por um sinal que indica que nenhum dado adicional chegará. Um fluxo pode ser modelado de maneira gráfica, marcando eventos em uma linha do tempo, como é o caso aqui:

Mostra que um fluxo é uma sequência de eventos em andamento ordenados no tempo

Imagem retirada da introdução à Programação Reativa que você está perdendo .

Já vimos fluxos executáveis ​​nos exemplos da seção anterior. Temos um RunnableGraphsempre que um fluxo pode realmente ser materializado, o que significa que a Sinkestá conectado a um Source. Até agora, sempre nos materializamos com o valor Unit, que pode ser visto nos tipos:

val source: Source[Int, NotUsed] = Source(1 to 3)
val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(x => x)

Para Sourcee para Sinko segundo parâmetro de tipo e para Flowo terceiro parâmetro de tipo, denote o valor materializado. Ao longo desta resposta, o significado completo da materialização não deve ser explicado. No entanto, mais detalhes sobre materialização podem ser encontrados na documentação oficial . Por enquanto, a única coisa que precisamos saber é que o valor materializado é o que obtemos quando executamos um fluxo. Como estávamos interessados ​​apenas em efeitos colaterais até agora, obtivemos Unito valor materializado. A exceção a isso foi a materialização de uma pia, que resultou em a Future. Nos devolveu umFuture, pois esse valor pode indicar quando o fluxo conectado ao coletor foi finalizado. Até agora, os exemplos de código anteriores eram bons para explicar o conceito, mas também eram chatos, porque lidávamos apenas com fluxos finitos ou com infinitos muito simples. Para torná-lo mais interessante, a seguir, um fluxo completo assíncrono e ilimitado deve ser explicado.

Exemplo de ClickStream

Como exemplo, queremos ter um fluxo que capture eventos de clique. Para torná-lo mais desafiador, digamos que também queremos agrupar eventos de clique que acontecem em um curto espaço de tempo um após o outro. Dessa forma, poderíamos descobrir facilmente cliques duplos, triplos ou dez vezes maiores. Além disso, queremos filtrar todos os cliques únicos. Respire fundo e imagine como você resolveria esse problema de maneira imperativa. Aposto que ninguém seria capaz de implementar uma solução que funcione corretamente na primeira tentativa. De uma maneira reativa, esse problema é trivial de resolver. De fato, a solução é tão simples e direta de implementar que podemos até expressá-la em um diagrama que descreve diretamente o comportamento do código:

A lógica do exemplo do fluxo de cliques

Imagem retirada da introdução à Programação Reativa que você está perdendo .

As caixas cinzas são funções que descrevem como um fluxo é transformado em outro. Com a throttlefunção que acumulamos cliques em 250 milissegundos, as funções mape filterdevem ser auto-explicativas. As esferas coloridas representam um evento e as setas mostram como elas fluem através de nossas funções. Posteriormente nas etapas de processamento, obtemos cada vez menos elementos que fluem através de nosso fluxo, pois os agrupamos e os filtramos. O código para esta imagem seria algo como isto:

val multiClickStream = clickStream
    .throttle(250.millis)
    .map(clickEvents => clickEvents.length)
    .filter(numberOfClicks => numberOfClicks >= 2)

Toda a lógica pode ser representada em apenas quatro linhas de código! Em Scala, poderíamos escrever ainda mais:

val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2)

A definição de clickStreamé um pouco mais complexa, mas esse é apenas o caso, porque o programa de exemplo é executado na JVM, onde a captura de eventos de clique não é facilmente possível. Outra complicação é que o Akka por padrão não fornece a throttlefunção. Em vez disso, tivemos que escrever sozinhos. Como essa função é (como é o caso para as funções mapou filter) reutilizável em diferentes casos de uso, não conto essas linhas com o número de linhas necessárias para implementar a lógica. No entanto, em linguagens imperativas, é normal que a lógica não possa ser reutilizada com tanta facilidade e que as diferentes etapas lógicas ocorram em um só lugar, em vez de serem aplicadas seqüencialmente, o que significa que provavelmente teríamos deformado nosso código com a lógica de limitação. O exemplo de código completo está disponível como umessência e não será discutido aqui ainda mais.

Exemplo de SimpleWebServer

O que deveria ser discutido é outro exemplo. Embora o fluxo de cliques seja um bom exemplo para permitir que o Akka Streams lide com um exemplo do mundo real, ele não tem o poder de mostrar execução paralela em ação. O próximo exemplo deve representar um pequeno servidor Web que pode lidar com várias solicitações em paralelo. O servidor da Web deve aceitar conexões de entrada e receber seqüências de bytes delas que representam sinais ASCII imprimíveis. Essas seqüências de bytes ou seqüências de caracteres devem ser divididas em todos os caracteres de nova linha em partes menores. Depois disso, o servidor deve responder ao cliente com cada uma das linhas de divisão. Como alternativa, ele poderia fazer outra coisa com as linhas e fornecer um token de resposta especial, mas queremos mantê-lo simples neste exemplo e, portanto, não apresentar nenhum recurso sofisticado. Lembrar, o servidor precisa ser capaz de lidar com várias solicitações ao mesmo tempo, o que basicamente significa que nenhuma solicitação pode bloquear qualquer outra solicitação de execução posterior. A solução de todos esses requisitos pode ser difícil de uma maneira imperativa - com o Akka Streams, no entanto, não precisamos de mais do que algumas linhas para resolver qualquer um deles. Primeiro, vamos ter uma visão geral sobre o próprio servidor:

servidor

Basicamente, existem apenas três componentes principais. O primeiro precisa aceitar conexões de entrada. O segundo precisa lidar com solicitações recebidas e o terceiro precisa enviar uma resposta. A implementação de todos esses três componentes é apenas um pouco mais complicada do que a implementação do fluxo de cliques:

def mkServer(address: String, port: Int)(implicit system: ActorSystem, materializer: Materializer): Unit = {
  import system.dispatcher

  val connectionHandler: Sink[Tcp.IncomingConnection, Future[Unit]] =
    Sink.foreach[Tcp.IncomingConnection] { conn =>
      println(s"Incoming connection from: ${conn.remoteAddress}")
      conn.handleWith(serverLogic)
    }

  val incomingCnnections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] =
    Tcp().bind(address, port)

  val binding: Future[Tcp.ServerBinding] =
    incomingCnnections.to(connectionHandler).run()

  binding onComplete {
    case Success(b) =>
      println(s"Server started, listening on: ${b.localAddress}")
    case Failure(e) =>
      println(s"Server could not be bound to $address:$port: ${e.getMessage}")
  }
}

A função mkServerleva (além do endereço e da porta do servidor) também um sistema ator e um materializador como parâmetros implícitos. O fluxo de controle do servidor é representado por binding, que pega uma fonte de conexões de entrada e as encaminha para um coletor de conexões de entrada. Dentro de connectionHandler, que é nossa pia, lidamos com todas as conexões pelo fluxo serverLogic, que serão descritas mais adiante. bindingretorna umFuture, que termina quando o servidor foi iniciado ou o início falhou, o que pode acontecer quando a porta já é tomada por outro processo. O código, no entanto, não reflete completamente o gráfico, pois não podemos ver um bloco de construção que lida com respostas. A razão para isso é que a conexão já fornece essa lógica por si só. É um fluxo bidirecional e não apenas unidirecional como os fluxos que vimos nos exemplos anteriores. Como foi o caso da materialização, tais fluxos complexos não serão explicados aqui. A documentação oficial possui bastante material para cobrir gráficos de fluxo mais complexos. Por enquanto, basta saber que Tcp.IncomingConnectionrepresenta uma conexão que sabe como receber solicitações e como enviar respostas. A parte que ainda está faltando é aserverLogicbloco de construção. Pode ficar assim:

lógica do servidor

Mais uma vez, somos capazes de dividir a lógica em vários blocos de construção simples que, juntos, formam o fluxo do nosso programa. Primeiro, queremos dividir nossa sequência de bytes em linhas, o que devemos fazer sempre que encontrarmos um caractere de nova linha. Depois disso, os bytes de cada linha precisam ser convertidos em uma sequência, porque trabalhar com bytes brutos é complicado. No geral, poderíamos receber um fluxo binário de um protocolo complicado, o que tornaria o trabalho com os dados brutos recebidos extremamente desafiador. Depois de termos uma sequência legível, podemos criar uma resposta. Por razões de simplicidade, a resposta pode ser qualquer coisa no nosso caso. No final, temos que converter de volta nossa resposta em uma sequência de bytes que podem ser enviados por fio. O código para toda a lógica pode ser assim:

val serverLogic: Flow[ByteString, ByteString, Unit] = {
  val delimiter = Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true)

  val receiver = Flow[ByteString].map { bytes =>
    val message = bytes.utf8String
    println(s"Server received: $message")
    message
  }

  val responder = Flow[String].map { message =>
    val answer = s"Server hereby responds to message: $message\n"
    ByteString(answer)
  }

  Flow[ByteString]
    .via(delimiter)
    .via(receiver)
    .via(responder)
}

Já sabemos que serverLogicé um fluxo que leva ae ByteStringtem que produzir a ByteString. Com delimiterisso, podemos dividir um ByteStringem partes menores - no nosso caso, isso precisa acontecer sempre que um caractere de nova linha ocorre. receiveré o fluxo que pega todas as seqüências de bytes divididos e as converte em uma sequência. Obviamente, essa é uma conversão perigosa, pois apenas caracteres ASCII imprimíveis devem ser convertidos em uma string, mas, para as nossas necessidades, é bom o suficiente. responderé o último componente e é responsável por criar uma resposta e converter a resposta em uma sequência de bytes. Ao contrário do gráfico, não dividimos esse último componente em dois, pois a lógica é trivial. No final, conectamos todos os fluxos através doviafunção. Nesse ponto, pode-se perguntar se cuidamos da propriedade multiusuário mencionada no início. E de fato o fizemos, mesmo que isso não seja óbvio imediatamente. Ao olhar para este gráfico, deve ficar mais claro:

servidor e lógica do servidor combinados

O serverLogiccomponente nada mais é do que um fluxo que contém fluxos menores. Este componente recebe uma entrada, que é uma solicitação, e produz uma saída, que é a resposta. Como os fluxos podem ser construídos várias vezes e todos funcionam independentemente um do outro, conseguimos com isso aninhar nossa propriedade multiusuário. Cada solicitação é tratada dentro de sua própria solicitação e, portanto, uma solicitação de execução curta pode substituir uma solicitação de execução longa iniciada anteriormente. Caso você tenha se perguntado, a definição serverLogicdisso foi mostrada anteriormente, é claro, pode ser escrita muito mais curta, incorporando a maioria de suas definições internas:

val serverLogic = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(msg => s"Server hereby responds to message: $msg\n")
  .map(ByteString(_))

Um teste do servidor da web pode ser assim:

$ # Client
$ echo "Hello World\nHow are you?" | netcat 127.0.0.1 6666
Server hereby responds to message: Hello World
Server hereby responds to message: How are you?

Para que o exemplo de código acima funcione corretamente, primeiro precisamos iniciar o servidor, que é representado pelo startServerscript:

$ # Server
$ ./startServer 127.0.0.1 6666
[DEBUG] Server started, listening on: /127.0.0.1:6666
[DEBUG] Incoming connection from: /127.0.0.1:37972
[DEBUG] Server received: Hello World
[DEBUG] Server received: How are you?

O exemplo de código completo deste servidor TCP simples pode ser encontrado aqui . Não somos apenas capazes de escrever um servidor com o Akka Streams, mas também o cliente. Pode ser assim:

val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .map(println)
  .map(_ ⇒ StdIn.readLine("> "))
  .map(_+"\n")
  .map(ByteString(_))

connection.join(flow).run()

O cliente TCP de código completo pode ser encontrado aqui . O código parece bastante semelhante, mas, ao contrário do servidor, não precisamos mais gerenciar as conexões de entrada.

Gráficos complexos

Nas seções anteriores, vimos como podemos construir programas simples a partir dos fluxos. No entanto, na realidade, muitas vezes não basta confiar apenas em funções já incorporadas para construir fluxos mais complexos. Se queremos poder usar o Akka Streams para programas arbitrários, precisamos saber como criar nossas próprias estruturas de controle personalizadas e fluxos combináveis ​​que nos permitem lidar com a complexidade de nossos aplicativos. A boa notícia é que o Akka Streams foi projetado para se adaptar às necessidades dos usuários e, para oferecer uma breve introdução às partes mais complexas do Akka Streams, adicionamos mais alguns recursos ao nosso exemplo de cliente / servidor.

Uma coisa que ainda não podemos fazer é fechar uma conexão. Nesse ponto, começa a ficar um pouco mais complicado, porque a API de fluxo que vimos até agora não nos permite interromper um fluxo em um ponto arbitrário. No entanto, existe a GraphStageabstração, que pode ser usada para criar estágios arbitrários de processamento gráfico com qualquer número de portas de entrada ou saída. Vamos primeiro dar uma olhada no lado do servidor, onde apresentamos um novo componente, chamado closeConnection:

val closeConnection = new GraphStage[FlowShape[String, String]] {
  val in = Inlet[String]("closeConnection.in")
  val out = Outlet[String]("closeConnection.out")

  override val shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush() = grab(in) match {
        case "q" ⇒
          push(out, "BYE")
          completeStage()
        case msg ⇒
          push(out, s"Server hereby responds to message: $msg\n")
      }
    })
    setHandler(out, new OutHandler {
      override def onPull() = pull(in)
    })
  }
}

Essa API parece muito mais complicada do que a API de fluxo. Não é de admirar, temos que fazer muitos passos imperativos aqui. Em troca, temos mais controle sobre o comportamento de nossos fluxos. No exemplo acima, especificamos apenas uma porta de entrada e uma saída e as disponibilizamos para o sistema substituindo o shapevalor. Além disso, definimos um chamado InHandlere a OutHandler, que são, nessa ordem, responsáveis ​​por receber e emitir elementos. Se você olhou atentamente para o exemplo completo do fluxo de cliques, já deve reconhecer esses componentes. No InHandleragarramos um elemento e, se for uma string com um único caractere 'q', queremos fechar o fluxo. Para dar ao cliente a chance de descobrir que o fluxo será fechado em breve, emitimos a string"BYE"e então fechamos o palco imediatamente depois. O closeConnectioncomponente pode ser combinado com um fluxo por meio do viamétodo, que foi introduzido na seção sobre fluxos.

Além de poder fechar conexões, também seria bom se pudéssemos mostrar uma mensagem de boas-vindas a uma conexão recém-criada. Para fazer isso, mais uma vez temos que ir um pouco mais longe:

def serverLogic
    (conn: Tcp.IncomingConnection)
    (implicit system: ActorSystem)
    : Flow[ByteString, ByteString, NotUsed]
    = Flow.fromGraph(GraphDSL.create() { implicit b ⇒
  import GraphDSL.Implicits._
  val welcome = Source.single(ByteString(s"Welcome port ${conn.remoteAddress}!\n"))
  val logic = b.add(internalLogic)
  val concat = b.add(Concat[ByteString]())
  welcome ~> concat.in(0)
  logic.outlet ~> concat.in(1)

  FlowShape(logic.in, concat.out)
})

A função serverLogic agora aceita a conexão recebida como parâmetro. Dentro de seu corpo, usamos uma DSL que nos permite descrever um comportamento complexo do fluxo. Com welcomeisso, criamos um fluxo que pode emitir apenas um elemento - a mensagem de boas-vindas. logicé o que foi descrito serverLogicna seção anterior. A única diferença notável é que nós adicionamos closeConnectiona ele. Agora, na verdade, vem a parte interessante da DSL. A GraphDSL.createfunção disponibiliza um construtor b, que é usado para expressar o fluxo como um gráfico. Com a ~>função, é possível conectar portas de entrada e saída entre si. O Concatcomponente usado no exemplo pode concatenar elementos e aqui é usado para anexar a mensagem de boas-vindas na frente dos outros elementos que saem dointernalLogic. Na última linha, apenas disponibilizamos a porta de entrada da lógica do servidor e a porta de saída do fluxo concatenado, porque todas as outras portas permanecerão um detalhe de implementação do serverLogiccomponente. Para uma introdução detalhada ao DSL gráfico do Akka Streams, visite a seção correspondente na documentação oficial . O exemplo de código completo do servidor TCP complexo e de um cliente que pode se comunicar com ele pode ser encontrado aqui . Sempre que você abrir uma nova conexão a partir do cliente, deverá receber uma mensagem de boas-vindas e, digitando "q"-o, deverá receber uma mensagem informando que a conexão foi cancelada.

Ainda existem alguns tópicos que não foram abordados por esta resposta. Especialmente a materialização pode assustar um leitor ou outro, mas tenho certeza de que, com o material abordado aqui, todos devem poder dar os próximos passos sozinhos. Como já foi dito, a documentação oficial é um bom lugar para continuar aprendendo sobre o Akka Streams.

kiritsuku
fonte
4
@monksy Eu não planejava publicar isso em nenhum outro lugar. Sinta-se à vontade para republicar isso no seu blog, se quiser. Atualmente, a API é estável na maioria das partes, o que significa que você provavelmente nem precisa se preocupar com manutenção (a maioria dos artigos de blog sobre o Akka Streams por aí está desatualizada, pois mostra uma API que não existe mais).
Kiritsuku #
3
Não vai desaparecer. Por que deveria?
Kiritsuku #
2
@sschaef Pode muito bem desaparecer, pois a questão está fora de tópico e foi encerrada como tal.
DavidG
7
@ Magisch Lembre-se sempre: "Nós não excluímos um bom conteúdo". Não tenho muita certeza, mas acho que essa resposta pode se qualificar, apesar de tudo.
Deduplicator
9
Esta publicação pode ser boa para o novo recurso Documentação do Stack Overflow - uma vez que é aberto para o Scala.
SL Barth - Restabelece Monica