Scala: Listar [Futuro] para Futuro [Listar] desconsiderando futuros com falha

116

Estou procurando uma maneira de converter uma lista de comprimento arbitrário de Futuros em um Futuro de Lista. Estou usando o Playframework, então, no final das contas, o que eu realmente quero é um Future[Result], mas para tornar as coisas mais simples, vamos apenas dizer que Future[List[Int]]a maneira normal de fazer isso seria usar, Future.sequence(...)mas há uma reviravolta ... cerca de 10-20 futuros nele, e não é incomum que um desses futuros falhe (eles estão fazendo solicitações externas de serviço da web). Em vez de tentar novamente todos eles caso um deles falhe, eu gostaria de poder pegar aqueles que tiveram sucesso e devolvê-los.

Por exemplo, fazer o seguinte não funciona

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success
import scala.util.Failure

val listOfFutures = Future.successful(1) :: Future.failed(new Exception("Failure")) :: 
                    Future.successful(3) :: Nil

val futureOfList = Future.sequence(listOfFutures)

futureOfList onComplete {
  case Success(x) => println("Success!!! " + x)
  case Failure(ex) => println("Failed !!! " + ex)
}

scala> Failed !!! java.lang.Exception: Failure

Em vez de obter a única exceção, gostaria de poder extrair o 1 e o 3 de lá. Tentei usar Future.fold, mas aparentemente só chama Future.sequencenos bastidores.

Obrigado antecipadamente pela ajuda!

Joe
fonte

Respostas:

146

O truque é primeiro certificar-se de que nenhum dos futuros falhou. .recoveré seu amigo aqui, você pode combiná-lo com mappara converter todos os Future[T]resultados em Future[Try[T]]]instâncias, que certamente serão futuros de sucesso.

observação: você pode usar Optionou Eithertambém aqui, mas Tryé a maneira mais limpa se você deseja especificamente interceptar exceções

def futureToFutureTry[T](f: Future[T]): Future[Try[T]] =
  f.map(Success(_)).recover { case x => Failure(x)}

val listOfFutures = ...
val listOfFutureTrys = listOfFutures.map(futureToFutureTry(_))

Em seguida, use Future.sequencecomo antes, para lhe dar umFuture[List[Try[T]]]

val futureListOfTrys = Future.sequence(listOfFutureTrys)

Em seguida, filtre:

val futureListOfSuccesses = futureListOfTrys.map(_.filter(_.isSuccess))

Você pode até retirar as falhas específicas, se precisar delas:

val futureListOfFailures = futureListOfTrys.map(_.filter(_.isFailure))
Kevin Wright
fonte
Obrigado! .recoverera de fato a peça que faltava para mim.
Joe
20
Você pode usar em _.collect{ case Success(x) => x}vez de _.filter(_.isSuccess)se livrar Tryem tipo de futureListOfSuccesses.
Senia
43
No scala 2010 .recover(x => Failure(x))não é válido, use em .recover({case e => Failure(e)})vez disso
FGRibreau
Acho que você está perdendo o futuro wrapper: def futureToFutureOfExperimente [A] (f: Future [A]): ​​Future [Try [A]] = {val p = Promise [Try [A]] () f.map {a => p.success (scala.util.Success (a))} .recover {case x: Throwable => p.success (Failure (x))} p.future}
Dario
não tão. Estou mapeando um futuro para outro futuro, uma promessa intermediária não é necessária e seria um desperdício
Kevin Wright
12

Scala 2.12 tem uma melhoria em Future.transformque se presta a uma resposta com menos códigos.

val futures = Seq(Future{1},Future{throw new Exception})

// instead of `map` and `recover`, use `transform`
val seq = Future.sequence(futures.map(_.transform(Success(_)))) 

val successes = seq.map(_.collect{case Success(x)=>x})
successes
//res1: Future[Seq[Int]] = Future(Success(List(1)))

val failures = seq.map(_.collect{case Failure(x)=>x})
failures
//res2: Future[Seq[Throwable]] = Future(Success(List(java.lang.Exception)))
WeiChing 林 煒 清
fonte
11

Tentei a resposta de Kevin e encontrei uma falha na minha versão do Scala (2.11.5) ... Corrigi isso e escrevi alguns testes adicionais se alguém estiver interessado ... aqui está a minha versão>

implicit class FutureCompanionOps(val f: Future.type) extends AnyVal {

    /** Given a list of futures `fs`, returns the future holding the list of Try's of the futures from `fs`.
      * The returned future is completed only once all of the futures in `fs` have been completed.
      */
    def allAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
      val listOfFutureTrys: List[Future[Try[T]]] = fItems.map(futureToFutureTry)
      Future.sequence(listOfFutureTrys)
    }

    def futureToFutureTry[T](f: Future[T]): Future[Try[T]] = {
      f.map(Success(_)) .recover({case x => Failure(x)})
    }

    def allFailedAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
      allAsTrys(fItems).map(_.filter(_.isFailure))
    }

    def allSucceededAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = {
      allAsTrys(fItems).map(_.filter(_.isSuccess))
    }
}


// Tests... 



  // allAsTrys tests
  //
  test("futureToFutureTry returns Success if no exception") {
    val future =  Future.futureToFutureTry(Future{"mouse"})
    Thread.sleep(0, 100)
    val futureValue = future.value
    assert(futureValue == Some(Success(Success("mouse"))))
  }
  test("futureToFutureTry returns Failure if exception thrown") {
    val future =  Future.futureToFutureTry(Future{throw new IllegalStateException("bad news")})
    Thread.sleep(5)            // need to sleep a LOT longer to get Exception from failure case... interesting.....
    val futureValue = future.value

    assertResult(true) {
      futureValue match {
        case Some(Success(Failure(error: IllegalStateException)))  => true
      }
    }
  }
  test("Future.allAsTrys returns Nil given Nil list as input") {
    val future =  Future.allAsTrys(Nil)
    assert ( Await.result(future, 100 nanosecond).isEmpty )
  }
  test("Future.allAsTrys returns successful item even if preceded by failing item") {
    val future1 =  Future{throw new IllegalStateException("bad news")}
    var future2 = Future{"dog"}

    val futureListOfTrys =  Future.allAsTrys(List(future1,future2))
    val listOfTrys =  Await.result(futureListOfTrys, 10 milli)
    System.out.println("successItem:" + listOfTrys);

    assert(listOfTrys(0).failed.get.getMessage.contains("bad news"))
    assert(listOfTrys(1) == Success("dog"))
  }
  test("Future.allAsTrys returns successful item even if followed by failing item") {
    var future1 = Future{"dog"}
    val future2 =  Future{throw new IllegalStateException("bad news")}

    val futureListOfTrys =  Future.allAsTrys(List(future1,future2))
    val listOfTrys =  Await.result(futureListOfTrys,  10 milli)
    System.out.println("successItem:" + listOfTrys);

    assert(listOfTrys(1).failed.get.getMessage.contains("bad news"))
    assert(listOfTrys(0) == Success("dog"))
  }
  test("Future.allFailedAsTrys returns the failed item and only that item") {
    var future1 = Future{"dog"}
    val future2 =  Future{throw new IllegalStateException("bad news")}

    val futureListOfTrys =  Future.allFailedAsTrys(List(future1,future2))
    val listOfTrys =  Await.result(futureListOfTrys,  10 milli)
    assert(listOfTrys(0).failed.get.getMessage.contains("bad news"))
    assert(listOfTrys.size == 1)
  }
  test("Future.allSucceededAsTrys returns the succeeded item and only that item") {
    var future1 = Future{"dog"}
    val future2 =  Future{throw new IllegalStateException("bad news")}

    val futureListOfTrys =  Future.allSucceededAsTrys(List(future1,future2))
    val listOfTrys =  Await.result(futureListOfTrys,  10 milli)
    assert(listOfTrys(0) == Success("dog"))
    assert(listOfTrys.size == 1)
  }
Chris Bedford
fonte
7

Acabei de me deparar com esta pergunta e tenho outra solução a oferecer:

def allSuccessful[A, M[X] <: TraversableOnce[X]](in: M[Future[A]])
                                                (implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], 
                                                 executor: ExecutionContext): Future[M[A]] = {
    in.foldLeft(Future.successful(cbf(in))) {
      (fr, fa)(for (r ← fr; a ← fa) yield r += a) fallbackTo fr
    } map (_.result())
}

A ideia aqui é que dentro da dobra você está esperando que o próximo elemento da lista seja concluído (usando a sintaxe para compreensão) e se o próximo falhar, você simplesmente volta para o que já tem.

Idan Waisman
fonte
Não gosto do nome, mas gosto da maneira como é feito, direto da sequência impl
crak
1

Você pode facilmente agrupar resultados futuros com opção e, em seguida, nivelar a lista:

def futureToFutureOption[T](f: Future[T]): Future[Option[T]] =
    f.map(Some(_)).recover {
      case e => None
    }
val listOfFutureOptions = listOfFutures.map(futureToFutureOption(_))

val futureListOfOptions = Future.sequence(listOfFutureOptions)

val futureListOfSuccesses = futureListOfOptions.flatten
Amir Hossein Javan
fonte
Apenas no caso de outra pessoa encontrar um erro com Some na primeira função, a primeira função pode ser reescrita para evitar erro do compilador: def futureToFutureOption [T] (f: Future [T]): Future [Option [T]] = f.map (Option (_)). recover {case e => None}
Zee
0

Você também pode coletar resultados bem-sucedidos e malsucedidos em listas diferentes:

def safeSequence[A](futures: List[Future[A]]): Future[(List[Throwable], List[A])] = {
  futures.foldLeft(Future.successful((List.empty[Throwable], List.empty[A]))) { (flist, future) =>
    flist.flatMap { case (elist, alist) =>
      future
        .map { success => (elist, alist :+ success) }
        .recover { case error: Throwable => (elist :+ error, alist) }
    }
  }
}
Evgeniy Lyutikov
fonte