É possível ler de um InputStream com um tempo limite?

147

Especificamente, o problema é escrever um método como este:

int maybeRead(InputStream in, long timeout)

onde o valor de retorno é o mesmo que in.read () se os dados estiverem disponíveis dentro de 'timeout' milissegundos e -2 caso contrário. Antes do método retornar, todos os threads gerados devem sair.

Para evitar argumentos, o assunto aqui java.io.InputStream, conforme documentado pela Sun (qualquer versão do Java). Observe que isso não é tão simples quanto parece. Abaixo estão alguns fatos suportados diretamente pela documentação da Sun.

  1. O método in.read () pode ser ininterrupto.

  2. Quebrar o InputStream em um Reader ou InterruptibleChannel não ajuda, porque tudo o que essas classes podem fazer é chamar métodos do InputStream. Se fosse possível usar essas classes, seria possível escrever uma solução que apenas execute a mesma lógica diretamente no InputStream.

  3. É sempre aceitável que in.available () retorne 0.

  4. O método in.close () pode bloquear ou não fazer nada.

  5. Não existe uma maneira geral de eliminar outro segmento.

cinzento
fonte

Respostas:

83

Usando inputStream.available ()

É sempre aceitável que System.in.available () retorne 0.

Eu descobri o contrário - ele sempre retorna o melhor valor para o número de bytes disponíveis. Javadoc para InputStream.available():

Returns an estimate of the number of bytes that can be read (or skipped over) 
from this input stream without blocking by the next invocation of a method for 
this input stream.

Uma estimativa é inevitável devido ao tempo / estabilidade. A figura pode ser uma subestimação pontual porque novos dados estão chegando constantemente. No entanto, ele sempre "alcança" a próxima chamada - deve ser responsável por todos os dados recebidos, exceto pelos que chegam exatamente no momento da nova chamada. Retornar permanentemente 0 quando houver dados falha na condição acima.

Primeira advertência: as subclasses de concreto do InputStream são responsáveis ​​pela disponibilidade ()

InputStreamé uma classe abstrata. Não possui fonte de dados. Não faz sentido ter dados disponíveis. Portanto, o javadoc para available()também declara:

The available method for class InputStream always returns 0.

This method should be overridden by subclasses.

E, de fato, as classes concretas do fluxo de entrada substituem available (), fornecendo valores significativos, não 0s constantes.

Segunda Advertência: Certifique-se de usar retorno de carro ao digitar entrada no Windows.

Se estiver usando System.in, seu programa só recebe entrada quando seu shell de comando o entrega. Se você estiver usando redirecionamentos de arquivo / pipes (por exemplo, somefile> java myJavaApp ou algum comando | java myJavaApp), os dados de entrada geralmente são entregues imediatamente. No entanto, se você digitar manualmente a entrada, a transferência de dados poderá ser atrasada. Por exemplo, com o shell do cmd.exe do windows, os dados são armazenados em buffer no shell do cmd.exe. Os dados são passados ​​apenas para o programa java em execução após retorno de carro (control-m ou <enter>). Essa é uma limitação do ambiente de execução. Obviamente, InputStream.available () retornará 0 enquanto o shell armazenar em buffer os dados - esse é o comportamento correto; não há dados disponíveis nesse ponto. Assim que os dados estiverem disponíveis no shell, o método retornará um valor> 0. NB: Cygwin usa cmd.

Solução mais simples (sem bloqueio, portanto, não é necessário tempo limite)

Apenas use isto:

    byte[] inputData = new byte[1024];
    int result = is.read(inputData, 0, is.available());  
    // result will indicate number of bytes read; -1 for EOF with no data read.

Ou equivalente,

    BufferedReader br = new BufferedReader(new InputStreamReader(System.in, Charset.forName("ISO-8859-1")),1024);
    // ...
         // inside some iteration / processing logic:
         if (br.ready()) {
             int readCount = br.read(inputData, bufferOffset, inputData.length-bufferOffset);
         }

Solução mais rica (preenche o buffer no máximo dentro do período de tempo limite)

Declare isso:

public static int readInputStreamWithTimeout(InputStream is, byte[] b, int timeoutMillis)
     throws IOException  {
     int bufferOffset = 0;
     long maxTimeMillis = System.currentTimeMillis() + timeoutMillis;
     while (System.currentTimeMillis() < maxTimeMillis && bufferOffset < b.length) {
         int readLength = java.lang.Math.min(is.available(),b.length-bufferOffset);
         // can alternatively use bufferedReader, guarded by isReady():
         int readResult = is.read(b, bufferOffset, readLength);
         if (readResult == -1) break;
         bufferOffset += readResult;
     }
     return bufferOffset;
 }

Então use isto:

    byte[] inputData = new byte[1024];
    int readCount = readInputStreamWithTimeout(System.in, inputData, 6000);  // 6 second timeout
    // readCount will indicate number of bytes read; -1 for EOF with no data read.
Glen Best
fonte
1
Se is.available() > 1024essa sugestão falhar. Certamente existem fluxos que retornam zero. SSLSockets, por exemplo, até recentemente. Você não pode confiar nisso.
Marquês de Lorne
O caso 'is.available ()> 1024' é tratado especificamente via readLength.
Glen Best,
Comentário sobre SSLSockets incorreto - retorna 0 para disponível se não houver dados no buffer. Conforme minha resposta. Javadoc: "Se não houver bytes em buffer no soquete, e o soquete não tiver sido fechado usando close, então disponível retornará 0."
Glen Best
@GlenBest Meu comentário re SSLSocket não está incorreto. Até recentemente [minha ênfase], costumava retornar zero o tempo todo. Você está falando sobre o presente. Estou falando de toda a história do JSSE, e trabalhei desde antes de ser incluído no Java 1.4 em 2002 .
Marquês de Lorne
Alterando as condições do loop while para "while (is.available ()> 0 && System.currentTimeMillis () <maxTimeMillis && bufferOffset <comprimento b) {", poupou-me uma tonelada de sobrecarga da CPU.
Logic1
65

Supondo que seu fluxo não seja suportado por um soquete (então você não pode usá-lo Socket.setSoTimeout()), acho que a maneira padrão de resolver esse tipo de problema é usar um futuro.

Suponha que eu tenha o seguinte executor e fluxos:

    ExecutorService executor = Executors.newFixedThreadPool(2);
    final PipedOutputStream outputStream = new PipedOutputStream();
    final PipedInputStream inputStream = new PipedInputStream(outputStream);

Eu tenho um escritor que grava alguns dados e aguarda 5 segundos antes de gravar o último dado e fechar o fluxo:

    Runnable writeTask = new Runnable() {
        @Override
        public void run() {
            try {
                outputStream.write(1);
                outputStream.write(2);
                Thread.sleep(5000);
                outputStream.write(3);
                outputStream.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };
    executor.submit(writeTask);

A maneira normal de ler isso é a seguinte. A leitura bloqueará indefinidamente os dados e, portanto, isso será concluído em 5s:

    long start = currentTimeMillis();
    int readByte = 1;
    // Read data without timeout
    while (readByte >= 0) {
        readByte = inputStream.read();
        if (readByte >= 0)
            System.out.println("Read: " + readByte);
    }
    System.out.println("Complete in " + (currentTimeMillis() - start) + "ms");

quais saídas:

Read: 1
Read: 2
Read: 3
Complete in 5001ms

Se houvesse um problema mais fundamental, como o escritor não responder, o leitor iria bloquear para sempre. Se eu encerrar a leitura em um futuro, posso controlar o tempo limite da seguinte maneira:

    int readByte = 1;
    // Read data with timeout
    Callable<Integer> readTask = new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            return inputStream.read();
        }
    };
    while (readByte >= 0) {
        Future<Integer> future = executor.submit(readTask);
        readByte = future.get(1000, TimeUnit.MILLISECONDS);
        if (readByte >= 0)
            System.out.println("Read: " + readByte);
    }

quais saídas:

Read: 1
Read: 2
Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
    at java.util.concurrent.FutureTask.get(FutureTask.java:91)
    at test.InputStreamWithTimeoutTest.main(InputStreamWithTimeoutTest.java:74)

Eu posso pegar a TimeoutException e fazer a limpeza que eu quiser.

Ian Jones
fonte
14
Mas e o segmento de bloqueio ?! Ele permanecerá na memória até o aplicativo terminar? Se eu estiver correto, isso pode produzir threads intermináveis, o aplicativo é muito carregado e ainda mais, impede que outros threads usem seu pool, que possui os threads ocupados e bloqueados. Por favor me corrija se eu estiver errado. Obrigado.
Muhammad Gelbana 25/09/12
4
Muhammad Gelbana, você está certo: o thread de bloqueio read () permanece em execução e isso não está correto. No entanto, encontrei uma maneira de evitar isso: quando o tempo limite chegar, feche o thread de chamada do fluxo de entrada (no meu caso, fecho o soquete bluetooth do Android do qual o fluxo de entrada vem). Quando você faz isso, a chamada read () retornará imediatamente. Bem, no meu caso, eu uso a sobrecarga int read (byte []), e essa retorna imediatamente. Talvez a sobrecarga int read () gerasse uma IOException, pois não sei o que retornaria ... Na minha opinião, essa é a solução adequada.
Emmanuel Touzery
5
-1, pois a leitura dos threads permanece bloqueada até o término do aplicativo.
Ortwin Angermeier
11
@ortang Isso é o que eu quis dizer com "pegar a TimeoutException e fazer qualquer limpeza ..." Por exemplo, eu posso querer matar o thread de leitura: ... catch (TimeoutException e) {executor.shutdownNow (); }
Ian Jones
12
executer.shutdownNownão vai matar o fio. Ele tentará interrompê-lo, sem efeito. Não há limpeza possível e esse é um problema sério.
Marko Topolnik
22

Se o seu InputStream for apoiado por um Socket, você poderá definir um tempo limite de Socket (em milissegundos) usando setSoTimeout . Se a chamada read () não desbloquear dentro do tempo limite especificado, lançará uma SocketTimeoutException.

Apenas certifique-se de chamar setSoTimeout no soquete antes de fazer a chamada read ().

Templários
fonte
18

Eu questionaria a declaração do problema em vez de apenas aceitá-la cegamente. Você só precisa de tempos limite no console ou na rede. Se o último que você possui Socket.setSoTimeout()e os HttpURLConnection.setReadTimeout()dois fazem exatamente o que é necessário, desde que você os configure corretamente ao construí-los / adquiri-los. Deixando-o para um ponto arbitrário mais tarde no aplicativo, quando tudo o que você tem é o InputStream é um design ruim, levando a uma implementação muito complicada.

Marquês de Lorne
fonte
10
Há outras situações em que uma leitura pode bloquear por um tempo significativo; por exemplo, ao ler de uma unidade de fita, de uma unidade de rede montada remotamente ou de um HFS com um robô de fita no back-end. (Mas o principal impulso da sua resposta está correta.)
Stephen C
1
@StephenC +1 para seu comentário e exemplos. Para adicionar mais um exemplo, um caso simples pode ser onde as conexões de soquete foram feitas corretamente, mas a tentativa de leitura foi bloqueada, pois os dados foram buscados no DB, mas de alguma forma isso não aconteceu (digamos que o DB não estava respondendo e a consulta foi concluída) no estado Bloqueado). Nesse cenário, você precisa ter uma maneira de atingir explicitamente o tempo limite da operação de leitura no soquete.
sactiw
1
O objetivo da abstração InputStream é não pensar na implementação subjacente. É justo discutir sobre os prós e contras das respostas postadas. Mas, para questionar a declaração do problema, não vai ajudar o disussion
pellucide
2
O InputStream funciona em um fluxo e bloqueia, mas não fornece um mecanismo de tempo limite. Portanto, a abstração InputStream não é uma abstração adequadamente projetada. Portanto, pedir uma maneira de tempo limite em um fluxo não exige muito. Portanto, a pergunta está pedindo uma solução para um problema muito prático. A maioria das implementações subjacentes será bloqueada. Essa é a própria essência de um fluxo. Soquetes, arquivos, tubulações serão bloqueados se o outro lado do fluxo não estiver pronto com novos dados.
pellucide
2
@EJP. Eu não sei como você conseguiu isso. Eu não concordo com você. A declaração do problema "como atingir o tempo limite em um InputStream" é válida. Como a estrutura não fornece uma maneira de atingir o tempo limite, é apropriado fazer essa pergunta.
pellucide
7

Eu não usei as classes do pacote Java NIO, mas parece que elas podem ser de alguma ajuda aqui. Especificamente, java.nio.channels.Channels e java.nio.channels.InterruptibleChannel .

jt.
fonte
2
+1: Não acredito que exista uma maneira confiável de fazer o que o OP está pedindo apenas com o InputStream. No entanto, o nio foi criado para esse fim, entre outros.
Eddie
2
O OP já descartou isso basicamente. InputStreams estão inerentemente bloqueando e podem ser ininterruptos.
Marquês de Lorne
5

Aqui está uma maneira de obter um NIO FileChannel do System.in e verificar a disponibilidade de dados usando um tempo limite, que é um caso especial do problema descrito na pergunta. Execute-o no console, não digite nenhuma entrada e aguarde os resultados. Foi testado com sucesso no Java 6 no Windows e Linux.

import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;

public class Main {

    static final ByteBuffer buf = ByteBuffer.allocate(4096);

    public static void main(String[] args) {

        long timeout = 1000 * 5;

        try {
            InputStream in = extract(System.in);
            if (! (in instanceof FileInputStream))
                throw new RuntimeException(
                        "Could not extract a FileInputStream from STDIN.");

            try {
                int ret = maybeAvailable((FileInputStream)in, timeout);
                System.out.println(
                        Integer.toString(ret) + " bytes were read.");

            } finally {
                in.close();
            }

        } catch (Exception e) {
            throw new RuntimeException(e);
        }

    }

    /* unravels all layers of FilterInputStream wrappers to get to the
     * core InputStream
     */
    public static InputStream extract(InputStream in)
            throws NoSuchFieldException, IllegalAccessException {

        Field f = FilterInputStream.class.getDeclaredField("in");
        f.setAccessible(true);

        while( in instanceof FilterInputStream )
            in = (InputStream)f.get((FilterInputStream)in);

        return in;
    }

    /* Returns the number of bytes which could be read from the stream,
     * timing out after the specified number of milliseconds.
     * Returns 0 on timeout (because no bytes could be read)
     * and -1 for end of stream.
     */
    public static int maybeAvailable(final FileInputStream in, long timeout)
            throws IOException, InterruptedException {

        final int[] dataReady = {0};
        final IOException[] maybeException = {null};
        final Thread reader = new Thread() {
            public void run() {                
                try {
                    dataReady[0] = in.getChannel().read(buf);
                } catch (ClosedByInterruptException e) {
                    System.err.println("Reader interrupted.");
                } catch (IOException e) {
                    maybeException[0] = e;
                }
            }
        };

        Thread interruptor = new Thread() {
            public void run() {
                reader.interrupt();
            }
        };

        reader.start();
        for(;;) {

            reader.join(timeout);
            if (!reader.isAlive())
                break;

            interruptor.start();
            interruptor.join(1000);
            reader.join(1000);
            if (!reader.isAlive())
                break;

            System.err.println("We're hung");
            System.exit(1);
        }

        if ( maybeException[0] != null )
            throw maybeException[0];

        return dataReady[0];
    }
}

Curiosamente, ao executar o programa no NetBeans 6.5 e não no console, o tempo limite não funciona e a chamada para System.exit () é realmente necessária para eliminar os threads zumbis. O que acontece é que o encadeamento do interruptor bloqueia (!) A chamada para reader.interrupt (). Outro programa de teste (não mostrado aqui) também tenta fechar o canal, mas isso também não funciona.


fonte
não funciona no Mac OS, nem com o JDK 1.6 nem com o JDK 1.7. A interrupção é reconhecida apenas após pressionar retorno durante a leitura.
Mostowski Collapse
4

Como disse jt, o NIO é a melhor (e correta) solução. Se você realmente está preso a um InputStream, você pode

  1. Gera um thread cujo trabalho exclusivo é ler o InputStream e colocar o resultado em um buffer que pode ser lido no seu thread original sem bloquear. Isso deve funcionar bem se você tiver apenas uma instância do fluxo. Caso contrário, você poderá eliminar o encadeamento usando os métodos descontinuados na classe Thread, embora isso possa causar vazamentos de recursos.

  2. Confie em isAvailable para indicar dados que podem ser lidos sem bloqueio. No entanto, em alguns casos (como em Sockets), pode levar uma leitura potencialmente bloqueadora para isAvailable para relatar algo diferente de 0.


fonte
5
Socket.setSoTimeout()é uma solução igualmente correta e muito mais simples. Or HttpURLConnection.setReadTimeout().
Marquês de Lorne
3
@EJP - estes são apenas "igualmente corretos" em determinadas circunstâncias; por exemplo, se o fluxo de entrada for um fluxo de soquete / fluxo de conexão HTTP.
Stephen C
1
@Stephen C NIO é apenas não bloqueante e selecionável nas mesmas circunstâncias. Não há E / S de arquivo sem bloqueio, por exemplo.
Marquês de Lorne
2
@EJP mas há tubulação sem bloqueio IO (System.in), non-blocking I / O para arquivos (no disco local) é um absurdo
woky
1
@EJP Na maioria (todos?), O Unices System.in é na verdade um pipe (se você não disse ao shell para substituí-lo por um arquivo) e, como um pipe, pode ser não-bloqueante.
Woky
0

Inspirado nesta resposta , criei uma solução um pouco mais orientada a objetos.

Isso é válido apenas se você pretende ler caracteres

Você pode substituir o BufferedReader e implementar algo como isto:

public class SafeBufferedReader extends BufferedReader{

    private long millisTimeout;

    ( . . . )

    @Override
    public int read(char[] cbuf, int off, int len) throws IOException {
        try {
            waitReady();
        } catch(IllegalThreadStateException e) {
            return 0;
        }
        return super.read(cbuf, off, len);
    }

    protected void waitReady() throws IllegalThreadStateException, IOException {
        if(ready()) return;
        long timeout = System.currentTimeMillis() + millisTimeout;
        while(System.currentTimeMillis() < timeout) {
            if(ready()) return;
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                break; // Should restore flag
            }
        }
        if(ready()) return; // Just in case.
        throw new IllegalThreadStateException("Read timed out");
    }
}

Aqui está um exemplo quase completo.

Estou retornando 0 em alguns métodos, você deve alterá-lo para -2 para atender às suas necessidades, mas acho que 0 é mais adequado ao contrato BufferedReader. Nada de errado aconteceu, basta ler 0 caracteres. O método readLine é um assassino de desempenho horrível. Você deve criar um BufferedReader totalmente novo se realmente quiser usar o readLin e. No momento, não é seguro para threads. Se alguém chamar uma operação enquanto o readLines estiver aguardando uma linha, ele produzirá resultados inesperados

Não gosto de retornar -2 onde estou. Eu lançaria uma exceção, porque algumas pessoas podem apenas verificar se int <0 deve considerar o EOS. De qualquer forma, esses métodos afirmam que "não pode bloquear", você deve verificar se essa afirmação é realmente verdadeira e simplesmente não a substitui.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.nio.CharBuffer;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/**
 * 
 * readLine
 * 
 * @author Dario
 *
 */
public class SafeBufferedReader extends BufferedReader{

    private long millisTimeout;

    private long millisInterval = 100;

    private int lookAheadLine;

    public SafeBufferedReader(Reader in, int sz, long millisTimeout) {
        super(in, sz);
        this.millisTimeout = millisTimeout;
    }

    public SafeBufferedReader(Reader in, long millisTimeout) {
        super(in);
        this.millisTimeout = millisTimeout;
    }



    /**
     * This is probably going to kill readLine performance. You should study BufferedReader and completly override the method.
     * 
     * It should mark the position, then perform its normal operation in a nonblocking way, and if it reaches the timeout then reset position and throw IllegalThreadStateException
     * 
     */
    @Override
    public String readLine() throws IOException {
        try {
            waitReadyLine();
        } catch(IllegalThreadStateException e) {
            //return null; //Null usually means EOS here, so we can't.
            throw e;
        }
        return super.readLine();
    }

    @Override
    public int read() throws IOException {
        try {
            waitReady();
        } catch(IllegalThreadStateException e) {
            return -2; // I'd throw a runtime here, as some people may just be checking if int < 0 to consider EOS
        }
        return super.read();
    }

    @Override
    public int read(char[] cbuf) throws IOException {
        try {
            waitReady();
        } catch(IllegalThreadStateException e) {
            return -2;  // I'd throw a runtime here, as some people may just be checking if int < 0 to consider EOS
        }
        return super.read(cbuf);
    }

    @Override
    public int read(char[] cbuf, int off, int len) throws IOException {
        try {
            waitReady();
        } catch(IllegalThreadStateException e) {
            return 0;
        }
        return super.read(cbuf, off, len);
    }

    @Override
    public int read(CharBuffer target) throws IOException {
        try {
            waitReady();
        } catch(IllegalThreadStateException e) {
            return 0;
        }
        return super.read(target);
    }

    @Override
    public void mark(int readAheadLimit) throws IOException {
        super.mark(readAheadLimit);
    }

    @Override
    public Stream<String> lines() {
        return super.lines();
    }

    @Override
    public void reset() throws IOException {
        super.reset();
    }

    @Override
    public long skip(long n) throws IOException {
        return super.skip(n);
    }

    public long getMillisTimeout() {
        return millisTimeout;
    }

    public void setMillisTimeout(long millisTimeout) {
        this.millisTimeout = millisTimeout;
    }

    public void setTimeout(long timeout, TimeUnit unit) {
        this.millisTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
    }

    public long getMillisInterval() {
        return millisInterval;
    }

    public void setMillisInterval(long millisInterval) {
        this.millisInterval = millisInterval;
    }

    public void setInterval(long time, TimeUnit unit) {
        this.millisInterval = TimeUnit.MILLISECONDS.convert(time, unit);
    }

    /**
     * This is actually forcing us to read the buffer twice in order to determine a line is actually ready.
     * 
     * @throws IllegalThreadStateException
     * @throws IOException
     */
    protected void waitReadyLine() throws IllegalThreadStateException, IOException {
        long timeout = System.currentTimeMillis() + millisTimeout;
        waitReady();

        super.mark(lookAheadLine);
        try {
            while(System.currentTimeMillis() < timeout) {
                while(ready()) {
                    int charInt = super.read();
                    if(charInt==-1) return; // EOS reached
                    char character = (char) charInt;
                    if(character == '\n' || character == '\r' ) return;
                }
                try {
                    Thread.sleep(millisInterval);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // Restore flag
                    break;
                }
            }
        } finally {
            super.reset();
        }
        throw new IllegalThreadStateException("readLine timed out");

    }

    protected void waitReady() throws IllegalThreadStateException, IOException {
        if(ready()) return;
        long timeout = System.currentTimeMillis() + millisTimeout;
        while(System.currentTimeMillis() < timeout) {
            if(ready()) return;
            try {
                Thread.sleep(millisInterval);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Restore flag
                break;
            }
        }
        if(ready()) return; // Just in case.
        throw new IllegalThreadStateException("read timed out");
    }

}
DGoiko
fonte