Digamos que eu solicite um arquivo json grande que contenha uma lista de muitos objetos. Não quero que eles estejam na memória de uma só vez, mas prefiro lê-los e processá-los um por um. Então, eu preciso transformar um System.IO.Stream
fluxo assíncrono em um IAsyncEnumerable<T>
. Como uso a nova System.Text.Json
API para fazer isso?
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
{
using (var stream = await httpResponse.Content.ReadAsStreamAsync())
{
// Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
}
}
}
c#
.net-core
.net-core-3.0
c#-8.0
system.text.json
Rick de Water
fonte
fonte
Utf8JsonReader
, por favor dê uma olhada em algumas github amostras e no existente rosca bemGetAsync
por si só retorna quando toda a resposta é recebida. Você precisa usarSendAsync
com `HttpCompletionOption.ResponseContentRead`. Depois de ter isso, você pode usar o JsonTextReader do JSON.NET . UsarSystem.Text.Json
para isso não é tão fácil quanto esse problema mostra . A funcionalidade não está disponível e implementá-lo em um baixo alocação utilizando estruturas não é trivialRespostas:
Sim, um serializador JSON (de) verdadeiramente de streaming seria uma boa melhoria de desempenho, em muitos lugares.
Infelizmente,
System.Text.Json
não faz isso no momento. Não tenho certeza se será no futuro - espero que sim! A desserialização do JSON por streaming é realmente um desafio.Você pode verificar se o Utf8Json extremamente rápido suporta, talvez.
No entanto, pode haver uma solução personalizada para sua situação específica, pois seus requisitos parecem restringir a dificuldade.
A idéia é ler manualmente um item da matriz por vez. Estamos fazendo uso do fato de que cada item da lista é, por si só, um objeto JSON válido.
Você pode pular manualmente o
[
(para o primeiro item) ou o,
(para cada próximo item). Acho que sua melhor aposta é usar o .NET CoreUtf8JsonReader
para determinar onde o objeto atual termina e alimentar os bytes digitalizadosJsonDeserializer
.Dessa forma, você está armazenando um pouco de buffer de um objeto por vez.
E já que estamos falando de desempenho, você pode obter a entrada de a
PipeReader
enquanto está nisso. :-)fonte
TL; DR Não é trivial
Parece que alguém já postou o código completo de uma
Utf8JsonStreamReader
estrutura que lê buffers de um fluxo e os alimenta para um Utf8JsonRreader, permitindo fácil desserialização comJsonSerializer.Deserialize<T>(ref newJsonReader, options);
. O código também não é trivial. A pergunta relacionada está aqui e a resposta está aqui .Porém, isso não é suficiente -
HttpClient.GetAsync
retornará somente depois que toda a resposta for recebida, armazenando essencialmente tudo na memória.Para evitar isso, HttpClient.GetAsync (string, HttpCompletionOption) deve ser usado com
HttpCompletionOption.ResponseHeadersRead
.O loop de desserialização também deve verificar o token de cancelamento e sair ou lançar se estiver sinalizado. Caso contrário, o loop continuará até que todo o fluxo seja recebido e processado.
Esse código é baseado no exemplo da resposta relacionada e usa
HttpCompletionOption.ResponseHeadersRead
e verifica o token de cancelamento. Ele pode analisar cadeias JSON que contêm uma matriz adequada de itens, por exemplo:A primeira chamada para
jsonStreamReader.Read()
se move para o início da matriz, enquanto a segunda se move para o início do primeiro objeto. O próprio loop termina quando o final da matriz (]
) é detectado.Fragmentos JSON, AKA streaming de JSON aka ... *
É bastante comum nos cenários de streaming ou log de eventos anexar objetos JSON individuais a um arquivo, um elemento por linha, por exemplo:
Este não é um documento JSON válido , mas os fragmentos individuais são válidos. Isso tem várias vantagens para cenários de big data / altamente simultâneos. A adição de um novo evento requer apenas o acréscimo de uma nova linha ao arquivo, não a análise e a reconstrução do arquivo inteiro. O processamento , especialmente o processamento paralelo , é mais fácil por dois motivos:
Usando um StreamReader
A maneira de alocar-y para fazer isso seria usar um TextReader, ler uma linha de cada vez e analisá-la com JsonSerializer.Deserialize :
Isso é muito mais simples que o código que desserializa uma matriz adequada. Existem dois problemas:
ReadLineAsync
não aceita um token de cancelamentoIsso pode ser suficiente, pois a tentativa de produzir os
ReadOnlySpan<Byte>
buffers necessários ao JsonSerializer.Deserialize não é trivial.Pipelines e SequenceReader
Para evitar alocações, precisamos obter a
ReadOnlySpan<byte>
partir do fluxo. Isso requer o uso de pipes System.IO.Pipeline e a estrutura SequenceReader . Uma Introdução ao SequenceReader, de Steve Gordon, explica como essa classe pode ser usada para ler dados de um fluxo usando delimitadores.Infelizmente,
SequenceReader
é uma ref struct, o que significa que não pode ser usado em métodos assíncronos ou locais. É por isso que Steve Gordon em seu artigo cria umO método para ler itens forma um ReadOnlySequence e retorna a posição final, para que o PipeReader possa retomar a partir dele. Infelizmente , queremos retornar um IEnumerable ou IAsyncEnumerable, e os métodos do iterador também não gostam
in
nem dosout
parâmetros.Poderíamos coletar os itens desserializados em uma Lista ou Fila e devolvê-los como um único resultado, mas isso ainda alocaria listas, buffers ou nós e teríamos que esperar que todos os itens em um buffer fossem desserializados antes de retornar:
Precisamos de algo que atue como um enumerável sem a necessidade de um método iterador, que funcione com assíncrono e não armazene em buffer tudo.
Adicionando canais para produzir um IAsyncEnumerable
ChannelReader.ReadAllAsync retorna um IAsyncEnumerable. Podemos retornar um ChannelReader de métodos que não poderiam funcionar como iteradores e ainda produzir um fluxo de elementos sem armazenar em cache.
Adaptando o código de Steve Gordon para usar canais, obtemos os ReadItems (ChannelWriter ...) e
ReadLastItem
métodos. O primeiro, lê um item de cada vez, usando uma nova linhaReadOnlySpan<byte> itemBytes
. Isso pode ser usado porJsonSerializer.Deserialize
. SeReadItems
não conseguir encontrar o delimitador, ele retornará sua posição para que o PipelineReader possa extrair o próximo pedaço do fluxo.Quando atingimos o último pedaço e não há outro delimitador, o ReadLastItem` lê os bytes restantes e os desserializa.
O código é quase idêntico ao de Steve Gordon. Em vez de escrever no console, escrevemos no ChannelWriter.
O
DeserializeToChannel<T>
método cria um leitor de pipeline na parte superior do fluxo, cria um canal e inicia uma tarefa de trabalho que analisa os pedaços e os envia ao canal:ChannelReader.ReceiveAllAsync()
pode ser usado para consumir todos os itens através de umIAsyncEnumerable<T>
:fonte
Parece que você precisa implantar seu próprio leitor de fluxo. Você precisa ler os bytes um por um e parar assim que a definição do objeto for concluída. Na verdade, é bastante de baixo nível. Como tal, você NÃO carregará o arquivo inteiro na RAM, mas participará da parte com a qual está lidando. Parece ser uma resposta?
fonte
Talvez você possa usar o
Newtonsoft.Json
serializador? https://www.newtonsoft.com/json/help/html/Performance.htmVeja especialmente a seção:
Editar
Você pode tentar desserializar valores do JsonTextReader, por exemplo
fonte
I don't want them to be in memory all at once, but I would rather read and process them one by one.
A classe relevante no JSON.NET é JsonTextReader.