Preciso ligar para um serviço upstream (Serviço de Blob do Azure) para enviar dados para um OutputStream, que então eu preciso dar a volta e enviá-los de volta ao cliente, através de akka. Sem akka (e apenas o código do servlet), eu apenas pegava o ServletOutputStream e o passava para o método do serviço azure.
O mais próximo que eu posso tentar tropeçar, e claramente isso está errado, é algo como isto
Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
blobClient.download(os);
return os;
});
ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);
sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());
A idéia é que eu estou chamando um serviço upstream para obter um fluxo de saída preenchido chamando blobClient.download (os);
Parece que a função lambda é chamada e retornada, mas depois falha, porque não há dados ou algo assim. Como se eu não devesse ter essa função lambda fazendo o trabalho, mas talvez retorne algum objeto que faça o trabalho? Não tenho certeza.
Como se faz isso?
fonte
download
? Ele transmite dadosos
e retorna apenas quando os dados são gravados?Respostas:
O problema real aqui é que a API do Azure não foi projetada para contrapressão. Não há como o fluxo de saída sinalizar ao Azure que ele não está pronto para mais dados. Em outras palavras: se o Azure envia dados mais rapidamente do que você é capaz de consumi-los, haverá alguma falha feia de estouro de buffer em algum lugar.
Aceitando esse fato, a próxima melhor coisa que podemos fazer é:
Source.lazySource
para iniciar o download de dados apenas quando houver demanda a jusante (ou seja, a fonte está sendo executada e os dados estão sendo solicitados).download
chamada em algum outro encadeamento para continuar executando sem impedir que a fonte seja retornada. Uma maneira de fazer isso é com umFuture
(não sei quais são as melhores práticas de Java, mas deve funcionar bem de qualquer maneira). Embora isso não importe inicialmente, você pode precisar escolher um contexto de execução diferente desystem.dispatcher
- tudo depende sedownload
está bloqueando ou não.Peço desculpas antecipadamente se esse código Java estiver malformado - eu uso o Akka com Scala, portanto, tudo isso é olhar para a API da Akka Java e a referência de sintaxe Java.
fonte
A
OutputStream
neste caso é o do "valor materializado"Source
e só será criado uma vez que o fluxo é executado (ou "materializado" em um fluxo de execução). A execução está fora de seu controle, uma vez que você entrega oSource
Akka HTTP e que mais tarde executará sua fonte..mapMaterializedValue(matval -> ...)
geralmente é usado para transformar o valor materializado, mas, como é invocado como parte da materialização, você pode usá-lo para fazer efeitos colaterais, como enviar o matval em uma mensagem, exatamente como você descobriu, não há necessariamente nada de errado em mesmo que pareça descolada. É importante entender que o fluxo não concluirá sua materialização e ficará em execução até que o lambda seja concluído. Isso significa problemas sedownload()
estiver bloqueando em vez de executar algum trabalho em um encadeamento diferente e retornar imediatamente.Existe, no entanto, outra solução:
Source.preMaterialize()
ela materializa a fonte e fornece umPair
valor materializado e uma novaSource
que pode ser usado para consumir a fonte já iniciada:Observe que há algumas coisas adicionais em que pensar no seu código, o mais importante se a
blobClient.download(os)
chamada bloquear até que seja concluída e você chamar isso do ator; nesse caso, você deve garantir que seu ator não passe fome pelo despachante e pare outros atores em sua aplicação sejam executados (consulte os documentos da Akka: https://doc.akka.io/docs/akka/current/typed/dispatchers.html#blocking-needs-careful-management ).fonte