Suponha que eu tenha vários futuros e precise esperar até que algum deles falhe ou todos tenham sucesso.
Por exemplo: Let há 3 futuros: f1
, f2
, f3
.
Se
f1
for bem-sucedido ef2
falhar, não esperof3
(e devolvo a falha ao cliente).Se
f2
falhar enquantof1
ef3
ainda estiver em execução, não espero por eles (e retorno falha )Se
f1
for bem-sucedido e depoisf2
for bem - sucedido, continuo esperandof3
.
Como você o implementaria?
scala
concurrency
future
Michael
fonte
fonte
Respostas:
Em vez disso, você poderia usar uma compreensão para a seguinte:
val fut1 = Future{...} val fut2 = Future{...} val fut3 = Future{...} val aggFut = for{ f1Result <- fut1 f2Result <- fut2 f3Result <- fut3 } yield (f1Result, f2Result, f3Result)
Neste exemplo, os futuros 1, 2 e 3 são iniciados em paralelo. Então, na para compreensão, esperamos até que os resultados 1 e 2 e depois 3 estejam disponíveis. Se 1 ou 2 falhar, não esperaremos mais por 3. Se todos os 3 forem bem-sucedidos, o
aggFut
val manterá uma tupla com 3 slots, correspondendo aos resultados dos 3 futuros.Agora, se você precisa do comportamento em que deseja parar de esperar se, digamos, fut2 falhar primeiro, as coisas ficam um pouco mais complicadas. No exemplo acima, você teria que esperar que fut1 fosse concluído antes de perceber que fut2 falhou. Para resolver isso, você pode tentar algo assim:
val fut1 = Future{Thread.sleep(3000);1} val fut2 = Promise.failed(new RuntimeException("boo")).future val fut3 = Future{Thread.sleep(1000);3} def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = { val fut = if (futures.size == 1) futures.head._2 else Future.firstCompletedOf(futures.values) fut onComplete{ case Success(value) if (futures.size == 1)=> prom.success(value :: values) case Success(value) => processFutures(futures - value, value :: values, prom) case Failure(ex) => prom.failure(ex) } prom.future } val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]()) aggFut onComplete{ case value => println(value) }
Agora isso funciona corretamente, mas o problema é saber qual
Future
remover doMap
quando um foi concluído com êxito. Contanto que você tenha alguma maneira de correlacionar adequadamente um resultado com o Futuro que gerou esse resultado, algo assim funciona. Ele apenas recursivamente continua removendo Futuros concluídos do Mapa e, em seguida, chamandoFuture.firstCompletedOf
os restantesFutures
até que não haja mais nenhum, coletando os resultados ao longo do caminho. Não é bonito, mas se você realmente precisa do comportamento de que está falando, isso ou algo semelhante pode funcionar.fonte
fut2
falhar antesfut1
? Ainda vamos esperarfut1
nesse caso? Se quisermos, não é exatamente o que eu quero.onFailure
manipulador parafut2
a falhar rápido, e umonSuccess
noaggFut
para o sucesso alça. Um sucesso emaggFut
implicafut2
foi concluído com êxito, então você tem apenas um dos manipuladores chamado.Você pode usar uma promessa e enviar a ela a primeira falha ou o sucesso agregado final concluído:
def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = { val p = Promise[M[A]]() // the first Future to fail completes the promise in.foreach(_.onFailure{case i => p.tryFailure(i)}) // if the whole sequence succeeds (i.e. no failures) // then the promise is completed with the aggregated success Future.sequence(in).foreach(p trySuccess _) p.future }
Então você pode
Await
no resultadoFuture
se quiser bloquear, ou apenasmap
em outra coisa.A diferença com para compreensão é que aqui você obtém o erro do primeiro a falhar, enquanto com para compreensão você obtém o primeiro erro na ordem de percurso da coleção de entrada (mesmo se outra falhou primeiro). Por exemplo:
val f1 = Future { Thread.sleep(1000) ; 5 / 0 } val f2 = Future { 5 } val f3 = Future { None.get } Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)} // this waits one second, then prints "java.lang.ArithmeticException: / by zero" // the first to fail in traversal order
E:
val f1 = Future { Thread.sleep(1000) ; 5 / 0 } val f2 = Future { 5 } val f3 = Future { None.get } sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)} // this immediately prints "java.util.NoSuchElementException: None.get" // the 'actual' first to fail (usually...) // and it returns early (it does not wait 1 sec)
fonte
Aqui está uma solução sem usar atores.
import scala.util._ import scala.concurrent._ import java.util.concurrent.atomic.AtomicInteger // Nondeterministic. // If any failure, return it immediately, else return the final success. def allSucceed[T](fs: Future[T]*): Future[T] = { val remaining = new AtomicInteger(fs.length) val p = promise[T] fs foreach { _ onComplete { case s @ Success(_) => { if (remaining.decrementAndGet() == 0) { // Arbitrarily return the final success p tryComplete s } } case f @ Failure(_) => { p tryComplete f } } } p.future }
fonte
Você pode fazer isso apenas com o futuro. Aqui está uma implementação. Observe que isso não encerrará a execução mais cedo! Nesse caso, você precisa fazer algo mais sofisticado (e provavelmente implementar a interrupção você mesmo). Mas se você simplesmente não quer ficar esperando por algo que não vai funcionar, a chave é continuar esperando a primeira coisa terminar e parar quando não sobrar nada ou você encontrar uma exceção:
import scala.annotation.tailrec import scala.util.{Try, Success, Failure} import scala.concurrent._ import scala.concurrent.duration.Duration import ExecutionContext.Implicits.global @tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()): Either[Throwable, Seq[A]] = { val first = Future.firstCompletedOf(fs) Await.ready(first, Duration.Inf).value match { case None => awaitSuccess(fs, done) // Shouldn't happen! case Some(Failure(e)) => Left(e) case Some(Success(_)) => val (complete, running) = fs.partition(_.isCompleted) val answers = complete.flatMap(_.value) answers.find(_.isFailure) match { case Some(Failure(e)) => Left(e) case _ => if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done) else Right( answers.map(_.get) ++: done ) } } }
Aqui está um exemplo disso em ação quando tudo funciona bem:
scala> awaitSuccess(Seq(Future{ println("Hi!") }, Future{ Thread.sleep(1000); println("Fancy meeting you here!") }, Future{ Thread.sleep(2000); println("Bye!") } )) Hi! Fancy meeting you here! Bye! res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))
Mas quando algo dá errado:
scala> awaitSuccess(Seq(Future{ println("Hi!") }, Future{ Thread.sleep(1000); throw new Exception("boo"); () }, Future{ Thread.sleep(2000); println("Bye!") } )) Hi! res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo) scala> Bye!
fonte
Para este propósito, eu usaria um ator Akka. Ao contrário da compreensão para, ela falha assim que qualquer um dos futuros falha, então é um pouco mais eficiente nesse sentido.
class ResultCombiner(futs: Future[_]*) extends Actor { var origSender: ActorRef = null var futsRemaining: Set[Future[_]] = futs.toSet override def receive = { case () => origSender = sender for(f <- futs) f.onComplete(result => self ! if(result.isSuccess) f else false) case false => origSender ! SomethingFailed case f: Future[_] => futsRemaining -= f if(futsRemaining.isEmpty) origSender ! EverythingSucceeded } } sealed trait Result case object SomethingFailed extends Result case object EverythingSucceeded extends Result
Em seguida, crie o ator, envie uma mensagem para ele (para que ele saiba para onde enviar sua resposta) e aguarde a resposta.
val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3))) try { val f4: Future[Result] = actor ? () implicit val timeout = new Timeout(30 seconds) // or whatever Await.result(f4, timeout.duration).asInstanceOf[Result] match { case SomethingFailed => println("Oh noes!") case EverythingSucceeded => println("It all worked!") } } finally { // Avoid memory leaks: destroy the actor actor ! PoisonPill }
fonte
Esta pergunta foi respondida, mas estou postando minha solução de classe de valor (classes de valor foram adicionadas em 2.10), pois não há nenhuma aqui. Fique à vontade para criticar.
implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal { def concurrently = ConcurrentFuture(self) } case class ConcurrentFuture[A](future: Future[A]) extends AnyVal { def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future)) def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class } def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = { val p = Promise[B]() val inner = f(outer.future) inner.future onFailure { case t => p.tryFailure(t) } outer.future onFailure { case t => p.tryFailure(t) } inner.future onSuccess { case b => p.trySuccess(b) } ConcurrentFuture(p.future) }
ConcurrentFuture é um invólucro Future sem sobrecarga que altera o mapa Future / flatMap padrão de do-this-then-that para combinar-all-and-fail-if-any-fail. Uso:
def func1 : Future[Int] = Future { println("f1!");throw new RuntimeException; 1 } def func2 : Future[String] = Future { Thread.sleep(2000);println("f2!");"f2" } def func3 : Future[Double] = Future { Thread.sleep(2000);println("f3!");42.0 } val f : Future[(Int,String,Double)] = { for { f1 <- func1.concurrently f2 <- func2.concurrently f3 <- func3.concurrently } yield for { v1 <- f1 v2 <- f2 v3 <- f3 } yield (v1,v2,v3) }.future f.onFailure { case t => println("future failed $t") }
No exemplo acima, f1, f2 e f3 serão executados simultaneamente e, se houver falha em qualquer ordem, o futuro da tupla falhará imediatamente.
fonte
Você pode querer verificar a API Future do Twitter. Notavelmente o método Future.collect. Ele faz exatamente o que você deseja: https://twitter.github.io/scala_school/finagle.html
O código-fonte Future.scala está disponível aqui: https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala
fonte
Você pode usar isto:
val l = List(1, 6, 8) val f = l.map{ i => future { println("future " +i) Thread.sleep(i* 1000) if (i == 12) throw new Exception("6 is not legal.") i } } val f1 = Future.sequence(f) f1 onSuccess{ case l => { logInfo("onSuccess") l.foreach(i => { logInfo("h : " + i) }) } } f1 onFailure{ case l => { logInfo("onFailure") }
fonte