Ler fluxo duas vezes

127

Como você lê o mesmo fluxo de entrada duas vezes? É possível copiá-lo de alguma forma?

Preciso obter uma imagem da Web, salvá-la localmente e retornar a imagem salva. Eu apenas pensei que seria mais rápido usar o mesmo fluxo em vez de iniciar um novo fluxo no conteúdo baixado e depois lê-lo novamente.

Warpzit
fonte
1
Talvez use mark e reset
Vyacheslav Shylkin

Respostas:

113

Você pode usar org.apache.commons.io.IOUtils.copypara copiar o conteúdo do InputStream para uma matriz de bytes e, em seguida, ler repetidamente da matriz de bytes usando um ByteArrayInputStream. Por exemplo:

ByteArrayOutputStream baos = new ByteArrayOutputStream();
org.apache.commons.io.IOUtils.copy(in, baos);
byte[] bytes = baos.toByteArray();

// either
while (needToReadAgain) {
    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    yourReadMethodHere(bais);
}

// or
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
while (needToReadAgain) {
    bais.reset();
    yourReadMethodHere(bais);
}
Paul Grime
fonte
1
Eu acho que essa é a única solução válida, pois mark não é suportado para todos os tipos.
Warpzit
3
@ Paul Grime: IOUtils.toByeArray internamente também chama o método de cópia de dentro.
Ankit
4
Como o @Ankit diz, esta solução não é válida para mim, pois a entrada é lida internamente e não pode ser reutilizada.
Xtreme Biker
30
Sei que esse comentário está sem tempo, mas, aqui na primeira opção, se você ler o fluxo de entrada como uma matriz de bytes, isso não significa que você está carregando todos os dados na memória? o que pode ser um grande problema se você estiver carregando algo como arquivos grandes?
jaxkodex
2
Pode-se usar IOUtils.toByteArray (InputStream) para obter uma matriz de bytes em uma chamada.
útil
30

Dependendo da origem do InputStream, talvez não seja possível redefini-lo. Você pode verificar se mark()e reset()é suportado usando markSupported().

Se for, você pode chamar reset()o InputStream para retornar ao início. Caso contrário, você precisará ler o InputStream da fonte novamente.

Kevin Parker
fonte
1
O InputStream não suporta 'mark' - você pode chamar a marca em um IS, mas não faz nada. Da mesma forma, chamar a redefinição em um IS gerará uma exceção.
ayahuasca
4
@ayahuasca InputStreamsubsclasses como BufferedInputStreamsuporta 'marca'
Dmitry Bogdanovich
10

se o seu InputStreamsuporte usando mark, então você pode mark()seu inputStream e então reset(). se a sua InputStremmarca não suportar, você poderá usar a classe java.io.BufferedInputStream, para incorporar seu stream a algo BufferedInputStreamcomo este

    InputStream bufferdInputStream = new BufferedInputStream(yourInputStream);
    bufferdInputStream.mark(some_value);
    //read your bufferdInputStream 
    bufferdInputStream.reset();
    //read it again
wannas
fonte
1
Um fluxo de entrada em buffer pode apenas retornar ao tamanho do buffer; portanto, se a fonte não couber, não será possível voltar ao início.
Blanc L.
@ L.Blanc desculpe, mas isso não parece correto. Dê uma olhada BufferedInputStream.fill(), há a seção "buffer de expansão", onde o novo tamanho do buffer é comparado apenas a marklimite MAX_BUFFER_SIZE.
eugene82
8

Você pode agrupar o fluxo de entrada com PushbackInputStream. PushbackInputStream permite não ler (" escrever de volta ") bytes que já foram lidos, para que você possa fazer assim:

public class StreamTest {
  public static void main(String[] args) throws IOException {
    byte[] bytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };

    InputStream originalStream = new ByteArrayInputStream(bytes);

    byte[] readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 1 2 3

    readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 4 5 6

    // now let's wrap it with PushBackInputStream

    originalStream = new ByteArrayInputStream(bytes);

    InputStream wrappedStream = new PushbackInputStream(originalStream, 10); // 10 means that maximnum 10 characters can be "written back" to the stream

    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 1 2 3

    ((PushbackInputStream) wrappedStream).unread(readBytes, 0, readBytes.length);

    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 1 2 3


  }

  private static byte[] getBytes(InputStream is, int howManyBytes) throws IOException {
    System.out.print("Reading stream: ");

    byte[] buf = new byte[howManyBytes];

    int next = 0;
    for (int i = 0; i < howManyBytes; i++) {
      next = is.read();
      if (next > 0) {
        buf[i] = (byte) next;
      }
    }
    return buf;
  }

  private static void printBytes(byte[] buffer) throws IOException {
    System.out.print("Reading stream: ");

    for (int i = 0; i < buffer.length; i++) {
      System.out.print(buffer[i] + " ");
    }
    System.out.println();
  }


}

Observe que PushbackInputStream armazena buffer interno de bytes para que ele realmente crie um buffer na memória que retenha os bytes "gravados de volta".

Conhecendo essa abordagem, podemos ir além e combiná-la com o FilterInputStream. FilterInputStream armazena o fluxo de entrada original como um representante. Isso permite criar uma nova definição de classe que permite " não ler " dados originais automaticamente. A definição desta classe é a seguinte:

public class TryReadInputStream extends FilterInputStream {
  private final int maxPushbackBufferSize;

  /**
  * Creates a <code>FilterInputStream</code>
  * by assigning the  argument <code>in</code>
  * to the field <code>this.in</code> so as
  * to remember it for later use.
  *
  * @param in the underlying input stream, or <code>null</code> if
  *           this instance is to be created without an underlying stream.
  */
  public TryReadInputStream(InputStream in, int maxPushbackBufferSize) {
    super(new PushbackInputStream(in, maxPushbackBufferSize));
    this.maxPushbackBufferSize = maxPushbackBufferSize;
  }

  /**
   * Reads from input stream the <code>length</code> of bytes to given buffer. The read bytes are still avilable
   * in the stream
   *
   * @param buffer the destination buffer to which read the data
   * @param offset  the start offset in the destination <code>buffer</code>
   * @aram length how many bytes to read from the stream to buff. Length needs to be less than
   *        <code>maxPushbackBufferSize</code> or IOException will be thrown
   *
   * @return number of bytes read
   * @throws java.io.IOException in case length is
   */
  public int tryRead(byte[] buffer, int offset, int length) throws IOException {
    validateMaxLength(length);

    // NOTE: below reading byte by byte instead of "int bytesRead = is.read(firstBytes, 0, maxBytesOfResponseToLog);"
    // because read() guarantees to read a byte

    int bytesRead = 0;

    int nextByte = 0;

    for (int i = 0; (i < length) && (nextByte >= 0); i++) {
      nextByte = read();
      if (nextByte >= 0) {
        buffer[offset + bytesRead++] = (byte) nextByte;
      }
    }

    if (bytesRead > 0) {
      ((PushbackInputStream) in).unread(buffer, offset, bytesRead);
    }

    return bytesRead;

  }

  public byte[] tryRead(int maxBytesToRead) throws IOException {
    validateMaxLength(maxBytesToRead);

    ByteArrayOutputStream baos = new ByteArrayOutputStream(); // as ByteArrayOutputStream to dynamically allocate internal bytes array instead of allocating possibly large buffer (if maxBytesToRead is large)

    // NOTE: below reading byte by byte instead of "int bytesRead = is.read(firstBytes, 0, maxBytesOfResponseToLog);"
    // because read() guarantees to read a byte

    int nextByte = 0;

    for (int i = 0; (i < maxBytesToRead) && (nextByte >= 0); i++) {
      nextByte = read();
      if (nextByte >= 0) {
        baos.write((byte) nextByte);
      }
    }

    byte[] buffer = baos.toByteArray();

    if (buffer.length > 0) {
      ((PushbackInputStream) in).unread(buffer, 0, buffer.length);
    }

    return buffer;

  }

  private void validateMaxLength(int length) throws IOException {
    if (length > maxPushbackBufferSize) {
      throw new IOException(
        "Trying to read more bytes than maxBytesToRead. Max bytes: " + maxPushbackBufferSize + ". Trying to read: " +
        length);
    }
  }

}

Esta classe tem dois métodos. Um para ler no buffer existente (a definição é análoga à chamadapublic int read(byte b[], int off, int len) da classe InputStream). Segundo, que retorna um novo buffer (isso pode ser mais eficaz se o tamanho do buffer para leitura for desconhecido).

Agora vamos ver nossa classe em ação:

public class StreamTest2 {
  public static void main(String[] args) throws IOException {
    byte[] bytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };

    InputStream originalStream = new ByteArrayInputStream(bytes);

    byte[] readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 1 2 3

    readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 4 5 6

    // now let's use our TryReadInputStream

    originalStream = new ByteArrayInputStream(bytes);

    InputStream wrappedStream = new TryReadInputStream(originalStream, 10);

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); // NOTE: no manual call to "unread"(!) because TryReadInputStream handles this internally
    printBytes(readBytes); // prints 1 2 3

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); 
    printBytes(readBytes); // prints 1 2 3

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3);
    printBytes(readBytes); // prints 1 2 3

    // we can also call normal read which will actually read the bytes without "writing them back"
    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 1 2 3

    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 4 5 6

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); // now we can try read next bytes
    printBytes(readBytes); // prints 7 8 9

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); 
    printBytes(readBytes); // prints 7 8 9


  }



}
walkeros
fonte
5

Se você estiver usando uma implementação de InputStream, poderá verificar o resultado e InputStream#markSupported()informar se pode ou não usar o método mark()/reset() .

Se você puder marcar o fluxo ao ler, ligue reset()para voltar para começar.

Se não conseguir, será necessário abrir um fluxo novamente.

Outra solução seria converter InputStream em matriz de bytes e, em seguida, iterar sobre a matriz quantas vezes você precisar. Você pode encontrar várias soluções neste post Converter InputStream em matriz de bytes em Java usando bibliotecas de terceiros ou não. Cuidado, se o conteúdo lido for muito grande, você poderá ter alguns problemas de memória.

Por fim, se você precisar ler uma imagem, use:

BufferedImage image = ImageIO.read(new URL("http://www.example.com/images/toto.jpg"));

O uso ImageIO#read(java.net.URL)também permite usar o cache.

alain.janinm
fonte
1
uma palavra de aviso ao usar ImageIO#read(java.net.URL): alguns servidores da Web e CDNs podem rejeitar chamadas simples (ou seja, sem um Agente do Usuário que faça o servidor acreditar que a chamada é proveniente de um navegador da Web) feita por ImageIO#read. Nesse caso, usando a URLConnection.openConnection()configuração do agente do usuário para essa conexão + usando `ImageIO.read (InputStream), na maioria das vezes, fará o truque.
Clint Eastwood
InputStreamnão é uma interface
Brice
3

E se:

if (stream.markSupported() == false) {

        // lets replace the stream object
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        IOUtils.copy(stream, baos);
        stream.close();
        stream = new ByteArrayInputStream(baos.toByteArray());
        // now the stream should support 'mark' and 'reset'

    }
Anshuman Chatterjee
fonte
5
Essa é uma péssima ideia. Você coloca todo o conteúdo do fluxo na memória assim.
Niels Doucet
3

Para dividir um InputStreamem dois, evitando carregar todos os dados na memória e processá-los independentemente:

  1. Crie algumas OutputStream, precisamente:PipedOutputStream
  2. Conecte cada PipedOutputStream a um PipedInputStream, esses PipedInputStreamsão os retornados InputStream.
  3. Conecte o InputStream de origem com o recém-criado OutputStream. Então, tudo o que é lido no sourcing InputStreamseria escrito em ambos OutputStream. Não é necessário implementar isso, porque isso já é feito em TeeInputStream(commons.io).
  4. Em um encadeamento separado, leia todo o inputStream da fonte e implicitamente os dados de entrada são transferidos para o inputStreams de destino.

    public static final List<InputStream> splitInputStream(InputStream input) 
        throws IOException 
    { 
        Objects.requireNonNull(input);      
    
        PipedOutputStream pipedOut01 = new PipedOutputStream();
        PipedOutputStream pipedOut02 = new PipedOutputStream();
    
        List<InputStream> inputStreamList = new ArrayList<>();
        inputStreamList.add(new PipedInputStream(pipedOut01));
        inputStreamList.add(new PipedInputStream(pipedOut02));
    
        TeeOutputStream tout = new TeeOutputStream(pipedOut01, pipedOut02);
    
        TeeInputStream tin = new TeeInputStream(input, tout, true);
    
        Executors.newSingleThreadExecutor().submit(tin::readAllBytes);  
    
        return Collections.unmodifiableList(inputStreamList);
    }

Lembre-se de fechar o inputStreams depois de consumido e feche o encadeamento que é executado: TeeInputStream.readAllBytes()

No caso, você precisa dividi-lo em váriosInputStream , em vez de apenas dois. Substitua no fragmento anterior de código a classe TeeOutputStreampara sua própria implementação, que encapsularia a List<OutputStream>e substituiria a OutputStreaminterface:

public final class TeeListOutputStream extends OutputStream {
    private final List<? extends OutputStream> branchList;

    public TeeListOutputStream(final List<? extends OutputStream> branchList) {
        Objects.requireNonNull(branchList);
        this.branchList = branchList;
    }

    @Override
    public synchronized void write(final int b) throws IOException {
        for (OutputStream branch : branchList) {
            branch.write(b);
        }
    }

    @Override
    public void flush() throws IOException {
        for (OutputStream branch : branchList) {
            branch.flush();
        }
    }

    @Override
    public void close() throws IOException {
        for (OutputStream branch : branchList) {
            branch.close();
        }
    }
}
Zeugor
fonte
Por favor, você poderia explicar um pouco mais o passo 4? Por que temos que acionar a leitura manualmente? Por que a leitura de qualquer pipedInputStream NÃO aciona a leitura da fonte inputStream? E por que chamamos isso assincronamente?
Дмитрий Кулешов
2

Converta o fluxo de entrada em bytes e passe-o para a função savefile onde você monta o mesmo no fluxo de entrada. Também na função original, use bytes para usar em outras tarefas

Maneesh
fonte
5
Digo uma má idéia neste caso, a matriz resultante pode ser enorme e irá roubar o dispositivo de memória.
9119 Kevin Parker
0

Caso alguém esteja executando um aplicativo Spring Boot, e você queira ler o corpo da resposta de um RestTemplate (e é por isso que desejo ler um fluxo duas vezes), existe uma maneira mais limpa de fazer isso.

Primeiro de tudo, você precisa usar o Spring StreamUtilspara copiar o fluxo para uma String:

String text = StreamUtils.copyToString(response.getBody(), Charset.defaultCharset()))

Mas isso não é tudo. Você também precisa usar uma fábrica de solicitações que possa armazenar em buffer o fluxo para você, assim:

ClientHttpRequestFactory factory = new BufferingClientHttpRequestFactory(new SimpleClientHttpRequestFactory());
RestTemplate restTemplate = new RestTemplate(factory);

Ou, se você estiver usando o bean de fábrica, então (este é o Kotlin, no entanto):

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun createRestTemplate(): RestTemplate = RestTemplateBuilder()
  .requestFactory { BufferingClientHttpRequestFactory(SimpleClientHttpRequestFactory()) }
  .additionalInterceptors(loggingInterceptor)
  .build()

Fonte: https://objectpartners.com/2018/03/01/log-your-resttemplate-request-and-response-westout-destroying-the-body/

milosmns
fonte
0

Se você estiver usando o RestTemplate para fazer chamadas http, basta adicionar um interceptador. O corpo da resposta é armazenado em cache pela implementação do ClientHttpResponse. Agora o fluxo de entrada pode ser recuperado da reposição quantas vezes for necessário

ClientHttpRequestInterceptor interceptor =  new ClientHttpRequestInterceptor() {

            @Override
            public ClientHttpResponse intercept(HttpRequest request, byte[] body,
                    ClientHttpRequestExecution execution) throws IOException {
                ClientHttpResponse  response = execution.execute(request, body);

                  // additional work before returning response
                  return response 
            }
        };

    // Add the interceptor to RestTemplate Instance 

         restTemplate.getInterceptors().add(interceptor); 
Noman Khan
fonte