Manipulando Exceções de Tarefas Java ExecutorService

213

Estou tentando usar a ThreadPoolExecutorclasse Java para executar um grande número de tarefas pesadas com um número fixo de threads. Cada uma das tarefas possui muitos locais nos quais pode falhar devido a exceções.

Subclassifiquei ThreadPoolExecutore substituí o afterExecutemétodo que deve fornecer quaisquer exceções não detectadas encontradas durante a execução de uma tarefa. No entanto, não consigo fazê-lo funcionar.

Por exemplo:

public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

A saída deste programa é "Está tudo bem - situação normal!" mesmo que o único Runnable enviado ao conjunto de encadeamentos emita uma exceção. Alguma pista do que está acontecendo aqui?

Obrigado!

Tom
fonte
você nunca consultou o futuro da tarefa, o que aconteceu lá. O executor de serviço ou o programa inteiro não será travado. A exceção é capturada e é agrupada em ExecutionException. E ele voltará a mostrar se você chamar future.get (). PS: O future.isDone () [Leia o nome real da API] retornará verdadeiro, mesmo quando o executável terminar incorretamente. Porque a tarefa é feita de verdade.
Jai Pandit

Respostas:

156

Dos documentos :

Nota: Quando as ações são incluídas em tarefas (como FutureTask) explicitamente ou por meio de métodos como envio, esses objetos de tarefa capturam e mantêm exceções computacionais e, portanto, não causam encerramento abrupto, e as exceções internas não são passadas para este método .

Quando você envia um Runnable, ele será envolvido em um futuro.

Seu afterExecute deve ser algo como isto:

public final class ExtendedExecutor extends ThreadPoolExecutor {

    // ...

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.out.println(t);
        }
    }
}
n
fonte
7
Obrigado, acabei usando esta solução. Além disso, caso alguém esteja interessado: outros sugeriram não subclassificar o ExecutorService, mas fiz assim mesmo porque queria monitorar as tarefas à medida que elas são concluídas, em vez de esperar que todas terminem e depois chamar get () em todos os futuros retornados. .
Tom
1
Outra abordagem para subclasse o executor é a subclasse FutureTask e substituir seu método de 'feito'
nºs
1
Tom >> Você pode por favor postar o seu trecho de código de exemplo onde uma subclasse ExecutorService para monitorar tarefas enquanto completam ...
jagamot
1
Esta resposta não funcionará se você estiver usando ComplableFuture.runAsync, pois o afterExecute conterá um objeto que é pacote privado e não há maneira de acessar a lança. Eu consegui contornar a ligação. Veja minha resposta abaixo.
mmm
2
Temos que verificar se o futuro está completo usando future.isDone()? Como afterExecuteé executado após a Runnableconclusão, presumo future.isDone()sempre retornos true.
Searene
248

AVISO : Observe que esta solução bloqueará o segmento de chamada.


Se você deseja processar exceções geradas pela tarefa, geralmente é melhor usá-lo Callabledo que Runnable.

Callable.call() tem permissão para lançar exceções verificadas e elas são propagadas de volta para o segmento de chamada:

Callable task = ...
Future future = executor.submit(task);
try {
   future.get();
} catch (ExecutionException ex) {
   ex.getCause().printStackTrace();
}

Se Callable.call()lança uma exceção, ela será envolta em ExecutionExceptione lançada por Future.get().

É provável que seja muito preferível à subclasse ThreadPoolExecutor. Também oferece a oportunidade de reenviar a tarefa se a exceção for recuperável.

skaffman
fonte
5
> Callable.call () tem permissão para lançar exceções verificadas e elas são propagadas de volta para o segmento de chamada: Observe que a exceção lançada será propagada para o segmento de chamada apenas se future.get()ou sua versão sobrecarregada for chamada.
Nylated 27/08/14
16
É perfeito, mas o que fazer se eu executar tarefas em paralelo e não quiser bloquear a execução?
Grigory Kislin
43
Não use esta solução, pois ela quebra todo o propósito de usar o ExecutorService. Um ExecutorService é um mecanismo de execução assíncrono capaz de executar tarefas em segundo plano. Se você chamar future.get () logo após executar, ele bloqueará o thread de chamada até que a tarefa seja concluída.
user1801374
2
Esta solução não deve ter uma classificação tão alta. Future.get () funciona de forma síncrona e irá atuar como um bloqueador até que a Runnable ou mobilizável foram executados e como dito acima derrotas a finalidade de usar o Serviço Executor
Super Hans
2
Como #nhylated apontou, isso merece um erro do jdk. Se Future.get () não for chamado, qualquer exceção não capturada de Callable será ignorada silenciosamente. Um projeto muito ruim ... passou apenas mais de um dia para descobrir que uma biblioteca usava isso e o jdk ignorou silenciosamente as exceções. E isso ainda existe no jdk12.
Ben Jiang
18

A explicação para esse comportamento está correta no javadoc para afterExecute :

Nota: Quando as ações são incluídas em tarefas (como FutureTask) explicitamente ou por meio de métodos como envio, esses objetos de tarefa capturam e mantêm exceções computacionais e, portanto, não causam encerramento abrupto, e as exceções internas não são passadas para este método .

Drew Wills
fonte
10

Eu consegui contornar o problema, executando o executável fornecido enviado ao executor.

CompletableFuture.runAsync(() -> {
        try {
              runnable.run();
        } catch (Throwable e) {
              Log.info(Concurrency.class, "runAsync", e);
        }
}, executorService);
mmm
fonte
3
Você pode melhorar a legibilidade usando o whenComplete()método de CompletableFuture.
Eduard Wirch
@EduardWirch isso funciona, mas você não pode retroceder uma exceção do whenComplete ()
Akshat
7

Estou usando a VerboseRunnableclasse do jcabi-log , que engole todas as exceções e as registra. Muito conveniente, por exemplo:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    Runnable() {
      public void run() { 
        // the code, which may throw
      }
    },
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);
yegor256
fonte
3

Outra solução seria usar o ManagedTask e o ManagedTaskListener .

Você precisa de um Callable ou Runnable que implemente a interface ManagedTask .

O método getManagedTaskListenerretorna a instância que você deseja.

public ManagedTaskListener getManagedTaskListener() {

E você implementa no ManagedTaskListener o taskDonemétodo:

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
    if (exception != null) {
        LOGGER.log(Level.SEVERE, exception.getMessage());
    }
}

Mais detalhes sobre o ciclo de vida da tarefa gerenciada e o ouvinte .

CSchulz
fonte
2

Isso funciona

  • É derivado do SingleThreadExecutor, mas você pode adaptá-lo facilmente
  • Código Java 8 lamdas, mas fácil de corrigir

Ele criará um Executor com um único thread, que pode executar muitas tarefas; e aguardará a atual terminar a execução para começar com a próxima

Em caso de erro ou exceção do uncaugth, o uncaughtExceptionHandler o capturará

classe final pública SingleThreadExecutorWithExceptions {

    public static ExecutorService newSingleThreadExecutorWithExceptions (final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {

        Fábrica ThreadFactory = (executável executável) -> {
            thread final newThread = new Thread (executável, "SingleThreadExecutorWithExceptions");
            newThread.setUncaughtExceptionHandler ((thread final caugthThread, final Throwable throwable) -> {
                uncaughtExceptionHandler.uncaughtException (caugthThread, throwable);
            });
            return newThread;
        };
        retornar novo FinalizableDelegatedExecutorService
                (novo ThreadPoolExecutor (1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue (),
                        fábrica){


                    nulo protegido afterExecute (executável executável, jogável jogável) {
                        super.afterExecute (executável, jogável);
                        if (throwable == null && instance executável de Future) {
                            experimentar {
                                Futuro futuro = (Futuro) executável;
                                if (future.isDone ()) {
                                    future.get ();
                                }
                            } catch (CancellationException ce) {
                                jogável = ce;
                            } catch (ExecutionException ee) {
                                throwable = ee.getCause ();
                            } catch (InterruptedException, por exemplo) {
                                Thread.currentThread (). Interrupt (); // ignorar / redefinir
                            }
                        }
                        if (lançável! = nulo) {
                            uncaughtExceptionHandler.uncaughtException (Thread.currentThread (), throwable);
                        }
                    }
                });
    }



    classe estática privada FinalizableDelegatedExecutorService
            estende DelegatedExecutorService {
        FinalizableDelegatedExecutorService (executor ExecutorService) {
            super (executor);
        }
        nulo protegido finalize () {
            super.shutdown ();
        }
    }

    / **
     * Uma classe de wrapper que expõe apenas os métodos ExecutorService
     * de uma implementação de ExecutorService.
     * /
    classe estática privada DelegatedExecutorService estende AbstractExecutorService {
        final privado ExecutorService e;
        DelegatedExecutorService (executorService executor) {e = executor; }
        Public void execute (comando executável) {e.execute (comando); }
        shutdown vazio público () {e.shutdown (); }
        lista pública shutdownNow () {return e.shutdownNow (); }
        public boolean isShutdown () {return e.isShutdown (); }
        public boolean isTerminated () {return e.isTerminated (); }
        público booleano waititTermination (tempo limite longo, unidade TimeUnit)
                lança InterruptedException {
            return e.awaitTermination (tempo limite, unidade);
        }
        Envio futuro público (tarefa executável) {
            retornar e.submit (tarefa);
        }
        Envio futuro público (tarefa que pode ser chamada) {
            retornar e.submit (tarefa);
        }
        Envio futuro público (tarefa executável, resultado T) {
            retornar e.submit (tarefa, resultado);
        }
        lista pública> invokeAll (Coleção> tarefas)
                lança InterruptedException {
            retornar e.invokeAll (tarefas);
        }
        lista pública> invokeAll (Collection> tarefas,
                                             tempo limite longo, unidade TimeUnit)
                lança InterruptedException {
            return e.invokeAll (tarefas, tempo limite, unidade);
        }
        public T invokeAny (Coleção> tarefas)
                lança InterruptedException, ExecutionException {
            retornar e.invokeAny (tarefas);
        }
        public T invokeAny (Coleção> tarefas,
                               tempo limite longo, unidade TimeUnit)
                lança InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny (tarefas, tempo limite, unidade);
        }
    }



    private SingleThreadExecutorWithExceptions () {}
}
obesga_tirant
fonte
Usando finalize é um pouco instável, infelizmente, uma vez que só vai ser chamado de "mais tarde, quando as coletas coletor de lixo ele" (ou talvez não no caso de um segmento, não sei) ...
rogerdpack
1

Se você deseja monitorar a execução da tarefa, pode girar 1 ou 2 threads (talvez mais dependendo da carga) e usá-los para executar tarefas de um wrapper ExecutionCompletionService.

Cristian Botiza
fonte
0

Se você ExecutorServicevem de uma fonte externa (ou seja, não é possível subclassificar ThreadPoolExecutore substituir afterExecute()), você pode usar um proxy dinâmico para obter o comportamento desejado:

public static ExecutorService errorAware(final ExecutorService executor) {
    return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class[] {ExecutorService.class},
            (proxy, method, args) -> {
                if (method.getName().equals("submit")) {
                    final Object arg0 = args[0];
                    if (arg0 instanceof Runnable) {
                        args[0] = new Runnable() {
                            @Override
                            public void run() {
                                final Runnable task = (Runnable) arg0;
                                try {
                                    task.run();
                                    if (task instanceof Future<?>) {
                                        final Future<?> future = (Future<?>) task;

                                        if (future.isDone()) {
                                            try {
                                                future.get();
                                            } catch (final CancellationException ce) {
                                                // Your error-handling code here
                                                ce.printStackTrace();
                                            } catch (final ExecutionException ee) {
                                                // Your error-handling code here
                                                ee.getCause().printStackTrace();
                                            } catch (final InterruptedException ie) {
                                                Thread.currentThread().interrupt();
                                            }
                                        }
                                    }
                                } catch (final RuntimeException re) {
                                    // Your error-handling code here
                                    re.printStackTrace();
                                    throw re;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    } else if (arg0 instanceof Callable<?>) {
                        args[0] = new Callable<Object>() {
                            @Override
                            public Object call() throws Exception {
                                final Callable<?> task = (Callable<?>) arg0;
                                try {
                                    return task.call();
                                } catch (final Exception e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    }
                }
                return method.invoke(executor, args);
            });
}
Graves
fonte
0

Isso é por causa AbstractExecutorService :: submitestá envolvendo seu runnableem RunnableFuture(nada FutureTask) como abaixo

AbstractExecutorService.java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
    execute(ftask);
    return ftask;
}

Em seguida execute, passará para Workere Worker.run()chamará o abaixo.

ThreadPoolExecutor.java

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();           /////////HERE////////
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Finalmente, task.run();no código acima, a chamada será chamada FutureTask.run(). Aqui está o código do manipulador de exceção, por isso você NÃO está recebendo a exceção esperada.

class FutureTask<V> implements RunnableFuture<V>

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {   /////////HERE////////
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
Kanagavelu Sugumar
fonte
0

Isso é semelhante à solução da mmm, mas um pouco mais compreensível. Faça com que suas tarefas estendam uma classe abstrata que envolva o método run ().

public abstract Task implements Runnable {

    public abstract void execute();

    public void run() {
      try {
        execute();
      } catch (Throwable t) {
        // handle it  
      }
    }
}


public MySampleTask extends Task {
    public void execute() {
        // heavy, error-prone code here
    }
}
ccleve
fonte
-4

Em vez de subclassificar ThreadPoolExecutor, eu forneceria uma instância ThreadFactory que cria novos Threads e fornece a eles um UncaughtExceptionHandler

Kevin
fonte
3
Eu tentei isso também, mas o método uncaughtException nunca parece ser chamado. Acredito que isso ocorre porque um segmento de trabalho na classe ThreadPoolExecutor está capturando as exceções.
Tom
5
O método uncaughtException não é chamado porque o método de envio do ExecutorService está envolvendo o Callable / Runnable em um futuro; a exceção está sendo capturada lá.
Emil Sit