Transforme Java Future em CompletableFuture

92

Java 8 apresenta CompletableFutureuma nova implementação de Future que pode ser combinada (inclui vários métodos Xxx). Gostaria de usar isso exclusivamente, mas muitas das bibliotecas que desejo usar retornam apenas Futureinstâncias não composíveis .

Existe uma maneira de encerrar as Futureinstâncias retornadas dentro de um CompleteableFuturepara que eu possa compô-lo?

Dan Midwood
fonte

Respostas:

56

Existe uma maneira, mas você não vai gostar. O método a seguir transforma um Future<T>em CompletableFuture<T>:

public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
  if (future.isDone())
    return transformDoneFuture(future);
  return CompletableFuture.supplyAsync(() -> {
    try {
      if (!future.isDone())
        awaitFutureIsDoneInForkJoinPool(future);
      return future.get();
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      // Normally, this should never happen inside ForkJoinPool
      Thread.currentThread().interrupt();
      // Add the following statement if the future doesn't have side effects
      // future.cancel(true);
      throw new RuntimeException(e);
    }
  });
}

private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
  CompletableFuture<T> cf = new CompletableFuture<>();
  T result;
  try {
    result = future.get();
  } catch (Throwable ex) {
    cf.completeExceptionally(ex);
    return cf;
  }
  cf.complete(result);
  return cf;
}

private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
    throws InterruptedException {
  ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
    @Override public boolean block() throws InterruptedException {
      try {
        future.get();
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
      return true;
    }
    @Override public boolean isReleasable() {
      return future.isDone();
    }
  });
}

Obviamente, o problema dessa abordagem é que, para cada Futuro , um fio será bloqueado para aguardar o resultado do Futuro - contradizendo a ideia de futuro. Em alguns casos, pode ser possível fazer melhor. Porém, em geral, não há solução sem esperar ativamente pelo resultado do Futuro .

nosid
fonte
1
Ha, isso é exatamente o que escrevi antes de pensar que deve haver uma maneira melhor. Mas acho que não
Dan Midwood,
12
Hmmm ... essa solução não come um dos fios da "piscina comum", só de esperar? Esses fios de "pool comum" nunca devem bloquear ... hmmmm ...
Peti
1
@Peti: Você está certo. No entanto, o ponto é, se você provavelmente está fazendo algo errado, independentemente de estar usando o pool comum ou um pool de threads ilimitado .
nosid
4
Pode não ser perfeito, mas o uso CompletableFuture.supplyAsync(supplier, new SinglethreadExecutor())pelo menos não bloquearia os threads comuns do pool.
MikeFHay
6
Por favor, nunca faça isso
Laymain
55

Se a biblioteca que você deseja usar também oferece um método de estilo de retorno de chamada além do estilo Future, você pode fornecer a ela um manipulador que completa o CompletableFuture sem qualquer bloqueio de thread extra. Igual a:

    AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file"));
    // ... 
    CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>();
    open.read(buffer, position, null, new CompletionHandler<Integer, Void>() {
        @Override
        public void completed(Integer result, Void attachment) {
            completableFuture.complete(buffer);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            completableFuture.completeExceptionally(exc);
        }
    });
    completableFuture.thenApply(...)

Sem o retorno de chamada, a única outra maneira que vejo para resolver isso é usar um loop de pesquisa que coloca todas as suas Future.isDone()verificações em um único encadeamento e, em seguida, chama complete sempre que um Future for gettable.

Kafkiano
fonte
Estou usando a biblioteca assíncrona Apache Http que aceita FutureCallback. Isso facilitou minha vida :)
Abhishek Gayakwad
11

Se o seu Futurefor o resultado de uma chamada a um ExecutorServicemétodo (por exemplo submit()), o mais fácil seria usar o CompletableFuture.runAsync(Runnable, Executor)método.

De

Runnbale myTask = ... ;
Future<?> future = myExecutor.submit(myTask);

para

Runnbale myTask = ... ;
CompletableFuture<?> future = CompletableFuture.runAsync(myTask, myExecutor);

O CompletableFutureé então criado "nativamente".

EDIT: Seguindo comentários de @SamMefford corrigidos por @MartinAndersson, se você quiser passar um Callable, você precisa chamar supplyAsync(), convertendo o Callable<T>em um Supplier<T>, por exemplo, com:

CompletableFuture.supplyAsync(() -> {
    try { return myCallable.call(); }
    catch (Exception ex) { throw new RuntimeException(ex); } // Or return default value
}, myExecutor);

Como T Callable.call() throws Exception;lança uma exceção e T Supplier.get();não, você precisa capturar a exceção para que os protótipos sejam compatíveis.

Matthieu
fonte
1
Ou, se você estiver usando Callable <T> em vez de Runnable, tente supplyAsync:CompletableFuture<T> future = CompletableFuture.supplyAsync(myCallable, myExecutor);
Sam Mefford
@SamMefford obrigado, editei para incluir essa informação.
Matthieu
supplyAsyncrecebe um Supplier. O código não será compilado se você tentar passar um Callable.
Martin Andersson
@MartinAndersson isso mesmo, obrigado. Eu editei ainda mais para converter a Callable<T>em a Supplier<T>.
Matthieu
10

Publiquei um pequeno projeto de futuro que tenta fazer melhor do que o caminho direto na resposta.

A ideia principal é usar o único thread (e, claro, não apenas um loop de rotação) para verificar todos os estados Futures dentro, o que ajuda a evitar o bloqueio de um thread de um pool para cada transformação Future -> CompletableFuture.

Exemplo de uso:

Future oldFuture = ...;
CompletableFuture profit = Futurity.shift(oldFuture);
Dmitry Spikhalskiy
fonte
Isso parece interessante. Ele está usando um thread de timer? Por que essa não é a resposta aceita?
Kira
@Kira Sim, basicamente usa um thread de cronômetro para aguardar todos os futuros enviados.
Dmitry Spikhalskiy
7

Sugestão:

http://www.thedevpiece.com/converting-old-java-future-to-completablefuture/

Mas, basicamente:

public class CompletablePromiseContext {
    private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor();

    public static void schedule(Runnable r) {
        SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS);
    }
}

E, o CompletablePromise:

public class CompletablePromise<V> extends CompletableFuture<V> {
    private Future<V> future;

    public CompletablePromise(Future<V> future) {
        this.future = future;
        CompletablePromiseContext.schedule(this::tryToComplete);
    }

    private void tryToComplete() {
        if (future.isDone()) {
            try {
                complete(future.get());
            } catch (InterruptedException e) {
                completeExceptionally(e);
            } catch (ExecutionException e) {
                completeExceptionally(e.getCause());
            }
            return;
        }

        if (future.isCancelled()) {
            cancel(true);
            return;
        }

        CompletablePromiseContext.schedule(this::tryToComplete);
    }
}

Exemplo:

public class Main {
    public static void main(String[] args) {
        final ExecutorService service = Executors.newSingleThreadExecutor();
        final Future<String> stringFuture = service.submit(() -> "success");
        final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture);

        completableFuture.whenComplete((result, failure) -> {
            System.out.println(result);
        });
    }
}
Gabriel francisco
fonte
isto é bastante simples de raciocinar e elegante e se encaixa na maioria dos casos de uso. Eu faria o CompletablePromiseContext não estático e tomaria o parâmetro para o intervalo de verificação (que é definido como 1 ms aqui) e, em seguida, sobrecarregaria o CompletablePromise<V>construtor para ser capaz de fornecer o seu próprio CompletablePromiseContextintervalo de verificação possivelmente diferente (mais longo) para longa duração Future<V>onde você não não precisa absolutamente ser capaz de executar callback (ou compor) imediatamente após terminar, e você também pode ter uma instância de CompletablePromiseContextpara assistir a um conjunto de Future(no caso de você ter muitos)
Dexter Legaspi
5

Deixe-me sugerir outra (espero, melhor) opção: https://github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata / concorrente

Resumidamente, a ideia é a seguinte:

  1. Apresente a CompletableTask<V>interface - a união do CompletionStage<V>+RunnableFuture<V>
  2. Warp ExecutorServicepara retornar CompletableTaskde submit(...)métodos (em vez de Future<V>)
  3. Feito, temos Futuros executáveis ​​E combináveis.

A implementação usa uma implementação alternativa CompletionStage (preste atenção, CompletionStage em vez de CompletableFuture):

Uso:

J8ExecutorService exec = J8Executors.newCachedThreadPool();
CompletionStage<String> = exec
   .submit( someCallableA )
   .thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b)
   .thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c); 
Valery Silaev
fonte
2
Pequena atualização: o código foi movido para um projeto separado, github.com/vsilaev/tascalate-concurrent , e agora é possível usar o Executor-s de caixa out-of-the-way de java.util.concurrent.
Valery Silaev