Combinar estrutura serializar operações assíncronas

8

Como faço para que os pipelines assíncronos que constituem a estrutura Combine sejam alinhados de forma síncrona (serial)?

Suponha que eu tenha 50 URLs dos quais desejo fazer o download dos recursos correspondentes e digamos que eu queira fazer um de cada vez. Eu sei como fazer isso com Operation / OperationQueue, por exemplo, usando uma subclasse Operation que não se declara concluída até que o download seja concluído. Como eu faria a mesma coisa usando o Combine?

No momento, tudo o que me ocorre é manter uma lista global dos URLs restantes e abrir uma, configurar esse pipeline para um download, fazer o download e, no sinkpipeline, repetir. Isso não parece muito parecido com um Combine.

Tentei criar uma matriz de URLs e mapeá-la para uma variedade de editores. Eu sei que posso "produzir" um editor e fazê-lo publicar no pipeline usando flatMap. Mas ainda estou fazendo o download simultaneamente. Não existe uma maneira combinada de percorrer a matriz de maneira controlada - ou existe?

(Também imaginei fazer algo com o Future, mas fiquei irremediavelmente confuso. Não estou acostumado a esse modo de pensar.)

mate
fonte

Respostas:

2

Eu apenas testei isso brevemente, mas, a princípio, parece que cada solicitação aguarda a conclusão da solicitação anterior antes de iniciar.

Estou postando esta solução em busca de feedback. Seja crítico se essa não for uma boa solução.

extension Collection where Element: Publisher {

    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        // If the collection is empty, we can't just create an arbititary publisher
        // so we return nil to indicate that we had nothing to serialize.
        if isEmpty { return nil }

        // We know at this point that it's safe to grab the first publisher.
        let first = self.first!

        // If there was only a single publisher then we can just return it.
        if count == 1 { return first.eraseToAnyPublisher() }

        // We're going to build up the output starting with the first publisher.
        var output = first.eraseToAnyPublisher()

        // We iterate over the rest of the publishers (skipping over the first.)
        for publisher in self.dropFirst() {
            // We build up the output by appending the next publisher.
            output = output.append(publisher).eraseToAnyPublisher()
        }

        return output
    }
}


Uma versão mais concisa desta solução (fornecida por @matt):

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            $0.append($1).eraseToAnyPublisher()
        }
    }
}
Clay Ellis
fonte
Excelente, obrigado. appendé exatamente o que eu estava procurando. - Seu código pode ser consideravelmente mais rígido; em particular, não há necessidade de retornar prematuramente no caso em que count == 1, porque nesse caso dropFirstestará vazio e simplesmente não executaremos um loop. E não há necessidade de manter a outputvariável, porque podemos usar em reducevez de for...in. Veja minha resposta para obter uma renderização mais precisa.
matt
3

Você pode criar um assinante personalizado onde recebe Subscribers.Demand.max (1) retornado. Nesse caso, o assinante solicitará o próximo valor somente quando receber um. O exemplo é para Int.publisher, mas algum atraso aleatório no mapa imita o tráfego de rede :-)

import PlaygroundSupport
import SwiftUI
import Combine

class MySubscriber: Subscriber {
  typealias Input = String
  typealias Failure = Never

  func receive(subscription: Subscription) {
    print("Received subscription", Thread.current.isMainThread)
    subscription.request(.max(1))
  }

  func receive(_ input: Input) -> Subscribers.Demand {
    print("Received input: \(input)", Thread.current.isMainThread)
    return .max(1)
  }

  func receive(completion: Subscribers.Completion<Never>) {
    DispatchQueue.main.async {
        print("Received completion: \(completion)", Thread.current.isMainThread)
        PlaygroundPage.current.finishExecution()
    }
  }
}

(110...120)
    .publisher.receive(on: DispatchQueue.global())
    .map {
        print(Thread.current.isMainThread, Thread.current)
        usleep(UInt32.random(in: 10000 ... 1000000))
        return String(format: "%02x", $0)
    }
    .subscribe(on: DispatchQueue.main)
    .subscribe(MySubscriber())

print("Hello")

PlaygroundPage.current.needsIndefiniteExecution = true

Impressão para parque infantil ...

Hello
Received subscription true
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 6e false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 6f false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 70 false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 71 false
false <NSThread: 0x60000007cc80>{number = 9, name = (null)}
Received input: 72 false
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 73 false
false <NSThread: 0x600000064780>{number = 5, name = (null)}
Received input: 74 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 75 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 76 false
false <NSThread: 0x60000004dc80>{number = 8, name = (null)}
Received input: 77 false
false <NSThread: 0x600000053400>{number = 3, name = (null)}
Received input: 78 false
Received completion: finished true

ATUALIZAÇÃO finalmente encontrei .flatMap(maxPublishers: ), o que me força a atualizar esse tópico interessante com uma abordagem um pouco diferente. Por favor, veja que estou usando a fila global para agendamento, não apenas algum atraso aleatório, apenas para garantir que o recebimento do fluxo serializado não seja um comportamento "aleatório" ou "sortudo" :-)

import PlaygroundSupport
import Combine
import Foundation

PlaygroundPage.current.needsIndefiniteExecution = true

let A = (1 ... 9)
    .publisher
    .flatMap(maxPublishers: .max(1)) { value in
        [value].publisher
            .flatMap { value in
                Just(value)
                    .delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: DispatchQueue.global())
        }
}
.sink { value in
    print(value, "A")
}

let B = (1 ... 9)
    .publisher
    .flatMap { value in
        [value].publisher
            .flatMap { value in
                Just(value)
                    .delay(for: .milliseconds(Int.random(in: 0 ... 100)), scheduler: RunLoop.main)
        }
}
.sink { value in
    print("     ",value, "B")
}

impressões

1 A
      4 B
      5 B
      7 B
      1 B
      2 B
      8 B
      6 B
2 A
      3 B
      9 B
3 A
4 A
5 A
6 A
7 A
8 A
9 A

Baseado aqui escrito

.serialize ()?

definido por Clay Ellis, a resposta aceita pode ser substituída por

.publisher.flatMap (maxPublishers: .max (1)) {$ 0}

enquanto a versão "não serializada" deve usar

.publisher.flatMap {$ 0}

"exemplo do mundo real"

import PlaygroundSupport
import Foundation
import Combine

let path = "postman-echo.com/get"
let urls: [URL] = "... which proves the downloads are happening serially .-)".map(String.init).compactMap { (parameter) in
    var components = URLComponents()
    components.scheme = "https"
    components.path = path
    components.queryItems = [URLQueryItem(name: parameter, value: nil)]
    return components.url
}
//["https://postman-echo.com/get?]
struct Postman: Decodable {
    var args: [String: String]
}


let collection = urls.compactMap { value in
        URLSession.shared.dataTaskPublisher(for: value)
        .tryMap { data, response -> Data in
            return data
        }
        .decode(type: Postman.self, decoder: JSONDecoder())
        .catch {_ in
            Just(Postman(args: [:]))
    }
}

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            return $0.append($1).eraseToAnyPublisher()
        }
    }
}

var streamA = ""
let A = collection
    .publisher.flatMap{$0}

    .sink(receiveCompletion: { (c) in
        print(streamA, "     ", c, "    .publisher.flatMap{$0}")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamA)
    })


var streamC = ""
let C = collection
    .serialize()?

    .sink(receiveCompletion: { (c) in
        print(streamC, "     ", c, "    .serialize()?")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamC)
    })

var streamD = ""
let D = collection
    .publisher.flatMap(maxPublishers: .max(1)){$0}

    .sink(receiveCompletion: { (c) in
        print(streamD, "     ", c, "    .publisher.flatMap(maxPublishers: .max(1)){$0}")
    }, receiveValue: { (postman) in
        print(postman.args.keys.joined(), terminator: "", to: &streamD)
    })

PlaygroundPage.current.needsIndefiniteExecution = true

impressões

.w.h i.c hporves ht edownloadsa erh appeninsg eriall y.-)       finished     .publisher.flatMap{$0}
... which proves the downloads are happening serially .-)       finished     .publisher.flatMap(maxPublishers: .max(1)){$0}
... which proves the downloads are happening serially .-)       finished     .serialize()?

Parece-me muito útil em outros cenários também. Tente usar o valor padrão de maxPublishers no próximo snippet e compare os resultados :-)

import Combine

let sequencePublisher = Publishers.Sequence<Range<Int>, Never>(sequence: 0..<Int.max)
let subject = PassthroughSubject<String, Never>()

let handle = subject
    .zip(sequencePublisher.print())
    //.publish
    .flatMap(maxPublishers: .max(1), { (pair)  in
        Just(pair)
    })
    .print()
    .sink { letters, digits in
        print(letters, digits)
    }

"Hello World!".map(String.init).forEach { (s) in
    subject.send(s)
}
subject.send(completion: .finished)
user3441734
fonte
@matt coletor não funciona de maneira diferente, apenas ao receber retorno Subsribers.Demand.unlimited ... Pode estar usando o instrumento adequado, como a fila serial e o Data.init? (contentsOf url: URL) é a melhor opção no seu cenário . Se você precisar somar dois Int, faça como [lhs: Int, rhs: Int] .reduce .... ??? Usarei Data.init? (ContentsOf url: URL) dentro de Receive (_ input :) do MySerialDownloaderSubscriber.
user3441734 24/01
@matt, por favor, veja a resposta atualizada. Combinar é emocionante, mas (pelo menos para mim) é muito difícil de entender ...
user3441734 25/01
Sim eu entendo! Com o maxPublishersparâmetro, adicionamos contrapressão. Isso acontece com o que eu disse na minha pergunta: "Eu sei que posso" produzir "um editor e fazê-lo publicar no pipeline usando o flatMap. Mas, ainda assim, continuo fazendo o download simultaneamente". Bem, com o maxPublishersparâmetro, eles não são simultâneos.
matt
@matt sim, coletor de chamadas do próprio assinante do editor com Subscribers.Demand.unlimited, flatMap tem o mesmo efeito que definir o próprio assinante do editor com um valor diferente, no nosso caso de uso .max (1). Acabei de adicionar outro exemplo com cenário diferente, onde é tão utilizável.
User3441734 25/01
2

Em todas as outras estruturas reativas, isso é realmente fácil; você apenas usa concatpara concatenar e nivelar os resultados em uma etapa e, em seguida, você pode reduceos resultados em uma matriz final. A Apple dificulta isso porque Publisher.Concatenatenão possui sobrecarga que aceite uma variedade de editores. Há estranheza semelhante com Publisher.Merge. Sinto que isso tem a ver com o fato de que eles retornam editores genéricos aninhados em vez de apenas retornar um único tipo genérico como rx Observable. Eu acho que você pode simplesmente chamar Concatenarem um loop e, em seguida, reduza os resultados concatenados em uma única matriz, mas eu realmente espero que eles resolvam esse problema no próximo lançamento. Certamente, há a necessidade de concaturar mais de 2 publicadores e mesclar mais de 4 publicadores (e as sobrecargas para esses dois operadores nem são consistentes, o que é estranho).

EDITAR:

Voltei a isso e descobri que você pode concatrar uma variedade arbitrária de editores e eles serão emitidos em sequência. Não tenho ideia de por que não existe uma função ConcatenateManypara fazer isso por você, mas parece que, desde que você esteja disposto a usar um editor de tipo apagado, não é tão difícil escrever você mesmo. Este exemplo mostra que a mesclagem emite na ordem temporal, enquanto concat emite na ordem da combinação:

import PlaygroundSupport
import SwiftUI
import Combine

let p = Just<Int>(1).append(2).append(3).delay(for: .seconds(0.25), scheduler: RunLoop.main).eraseToAnyPublisher()
let q = Just<Int>(4).append(5).append(6).eraseToAnyPublisher()
let r = Just<Int>(7).append(8).append(9).delay(for: .seconds(0.5), scheduler: RunLoop.main).eraseToAnyPublisher()
let concatenated: AnyPublisher<Int, Never> = [q,r].reduce(p) { total, next in
  total.append(next).eraseToAnyPublisher()
}

var subscriptions = Set<AnyCancellable>()

concatenated
  .sink(receiveValue: { v in
    print("concatenated: \(v)")
  }).store(in: &subscriptions)

Publishers
  .MergeMany([p,q,r])
  .sink(receiveValue: { v in
    print("merge: \(v)")
  }).store(in: &subscriptions)
Josh Homann
fonte
Sim, você provavelmente adivinhou que escolhi um número grande como 50 intencionalmente.
matt
Existe um MergeMany. Não entendo por que não existe um ConcatenateMany. Rx swift possui Observable.concat e Reactive Swift possui flatMap (.concat), portanto isso é estranho; talvez esteja perdendo alguma coisa. Eu continuarei procurando developer.apple.com/documentation/combine/publishers/mergemany
Josh Homann
Seria concatserializado (nas outras estruturas reativas)?
matt
Sim. Para uma Sequência de sequências, você tem apenas uma maneira de achatar, ou seja, coloque os elementos de uma sequência interna após outra, exatamente como Sequence.flatMap em swift. Quando você tem uma sequência assíncrona, deve considerar a dimensão temporal ao achatar. Portanto, você pode emitir os elementos de todas as seqüências internas na ordem temporal (mesclagem) ou emitir os elementos de cada sequência interna na ordem das sequências (concat). Veja o diagrama de mármore: rxmarbles.com/#concat vs rxmarbles.com/#merge
Josh Homann
Observe que .appendé um operador que cria um Publisher.Concatenate.
rob mayoff
2

Da pergunta original:

Tentei criar uma matriz de URLs e mapeá-la para uma variedade de editores. Eu sei que posso "produzir" um editor e fazê-lo publicar no pipeline usando flatMap. Mas ainda estou fazendo o download simultaneamente. Não existe uma maneira combinada de percorrer a matriz de maneira controlada - ou existe?


Aqui está um exemplo de brinquedo para substituir o verdadeiro problema:

let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
collection.publisher
    .flatMap() {$0}
    .sink {print($0)}.store(in:&self.storage)

Isso emite os números inteiros de 1 a 10 em ordem aleatória, chegando em momentos aleatórios. O objetivo é fazer algo com collectionisso fará com que ele emita os números inteiros de 1 a 10 em ordem.


Agora vamos mudar apenas uma coisa: na linha

.flatMap {$0}

nós adicionamos o maxPublishersparâmetro:

let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
collection.publisher
    .flatMap(maxPublishers:.max(1)) {$0}
    .sink {print($0)}.store(in:&self.storage)

Presto, nós agora fazer emitir os inteiros de 1 a 10, em ordem, com intervalos aleatórios entre eles.


Vamos aplicar isso ao problema original. Para demonstrar, preciso de uma conexão à Internet bastante lenta e de um recurso bastante grande para baixar. Primeiro, farei isso com o comum .flatMap:

let eph = URLSessionConfiguration.ephemeral
let session = URLSession(configuration: eph)
let url = "https://photojournal.jpl.nasa.gov/tiff/PIA23172.tif"
let collection = [url, url, url]
    .map {URL(string:$0)!}
    .map {session.dataTaskPublisher(for: $0)
        .eraseToAnyPublisher()
}
collection.publisher.setFailureType(to: URLError.self)
    .handleEvents(receiveOutput: {_ in print("start")})
    .flatMap() {$0}
    .map {$0.data}
    .sink(receiveCompletion: {comp in
        switch comp {
        case .failure(let err): print("error", err)
        case .finished: print("finished")
        }
    }, receiveValue: {_ in print("done")})
    .store(in:&self.storage)

O resultado é

start
start
start
done
done
done
finished

o que mostra que estamos fazendo os três downloads simultaneamente. Ok, agora mude

    .flatMap() {$0}

para

    .flatMap(maxPublishers:.max(1) {$0}

O resultado agora é:

start
done
start
done
start
done
finished

Agora, estamos fazendo o download em série, que é o problema originalmente a ser resolvido.


acrescentar

De acordo com o princípio do TIMTOWTDI, podemos encadear os editores appendpara serializá-los:

let collection = (1 ... 10).map {
    Just($0).delay(
        for: .seconds(Double.random(in:1...5)),
        scheduler: DispatchQueue.main)
        .eraseToAnyPublisher()
}
let pub = collection.dropFirst().reduce(collection.first!) {
    return $0.append($1).eraseToAnyPublisher()
}

O resultado é um editor que serializa os editores atrasados ​​na coleção original. Vamos provar isso assinando:

pub.sink {print($0)}.store(in:&self.storage)

Com certeza, os números inteiros agora chegam em ordem (com intervalos aleatórios entre).


Podemos encapsular a criação de pubuma coleção de editores com uma extensão na coleção, conforme sugerido por Clay Ellis:

extension Collection where Element: Publisher {
    func serialize() -> AnyPublisher<Element.Output, Element.Failure>? {
        guard let start = self.first else { return nil }
        return self.dropFirst().reduce(start.eraseToAnyPublisher()) {
            return $0.append($1).eraseToAnyPublisher()
        }
    }
}
mate
fonte
1

Aqui está um código de playground de uma página que descreve uma possível abordagem. A idéia principal é transformar chamadas de API assíncronas em cadeia de Futureeditores, criando assim um pipeline serial.

Entrada: intervalo de int de 1 a 10 que assina de forma assíncrona na fila de segundo plano convertida em cadeias

Demonstração de chamada direta para API assíncrona:

let group = DispatchGroup()
inputValues.map {
    group.enter()
    asyncCall(input: $0) { (output, _) in
        print(">> \(output), in \(Thread.current)")
        group.leave()
    }
}
group.wait()

Resultado:

>> 1, in <NSThread: 0x7fe76264fff0>{number = 4, name = (null)}
>> 3, in <NSThread: 0x7fe762446b90>{number = 3, name = (null)}
>> 5, in <NSThread: 0x7fe7624461f0>{number = 5, name = (null)}
>> 6, in <NSThread: 0x7fe762461ce0>{number = 6, name = (null)}
>> 10, in <NSThread: 0x7fe76246a7b0>{number = 7, name = (null)}
>> 4, in <NSThread: 0x7fe764c37d30>{number = 8, name = (null)}
>> 7, in <NSThread: 0x7fe764c37cb0>{number = 9, name = (null)}
>> 8, in <NSThread: 0x7fe76246b540>{number = 10, name = (null)}
>> 9, in <NSThread: 0x7fe7625164b0>{number = 11, name = (null)}
>> 2, in <NSThread: 0x7fe764c37f50>{number = 12, name = (null)}

Demonstração do pipeline combinado:

Resultado:

>> got 1
>> got 2
>> got 3
>> got 4
>> got 5
>> got 6
>> got 7
>> got 8
>> got 9
>> got 10
>>>> finished with true

Código:

import Cocoa
import Combine
import PlaygroundSupport

// Assuming there is some Asynchronous API with
// (eg. process Int input value during some time and generates String result)
func asyncCall(input: Int, completion: @escaping (String, Error?) -> Void) {
    DispatchQueue.global(qos: .background).async {
            sleep(.random(in: 1...5)) // wait for random Async API output
            completion("\(input)", nil)
        }
}

// There are some input values to be processed serially
let inputValues = Array(1...10)

// Prepare one pipeline item based on Future, which trasform Async -> Sync
func makeFuture(input: Int) -> AnyPublisher<Bool, Error> {
    Future<String, Error> { promise in
        asyncCall(input: input) { (value, error) in
            if let error = error {
                promise(.failure(error))
            } else {
                promise(.success(value))
            }
        }
    }
    .receive(on: DispatchQueue.main)
    .map {
        print(">> got \($0)") // << sideeffect of pipeline item
        return true
    }
    .eraseToAnyPublisher()
}

// Create pipeline trasnforming input values into chain of Future publishers
var subscribers = Set<AnyCancellable>()
let pipeline =
    inputValues
    .reduce(nil as AnyPublisher<Bool, Error>?) { (chain, value) in
        if let chain = chain {
            return chain.flatMap { _ in
                makeFuture(input: value)
            }.eraseToAnyPublisher()
        } else {
            return makeFuture(input: value)
        }
    }

// Execute pipeline
pipeline?
    .sink(receiveCompletion: { _ in
        // << do something on completion if needed
    }) { output in
        print(">>>> finished with \(output)")
    }
    .store(in: &subscribers)

PlaygroundPage.current.needsIndefiniteExecution = true
Asperi
fonte
0

Use flatMap(maxPublishers:transform:)com .max(1), por exemplo

func imagesPublisher(for urls: [URL]) -> AnyPublisher<UIImage, URLError> {
    Publishers.Sequence(sequence: urls.map { self.imagePublisher(for: $0) })
        .flatMap(maxPublishers: .max(1)) { $0 }
        .eraseToAnyPublisher()
}

Onde

func imagePublisher(for url: URL) -> AnyPublisher<UIImage, URLError> {
    URLSession.shared.dataTaskPublisher(for: url)
        .compactMap { UIImage(data: $0.data) }
        .receive(on: RunLoop.main)
        .eraseToAnyPublisher()
}

e

var imageRequests: AnyCancellable?

func fetchImages() {
    imageRequests = imagesPublisher(for: urls).sink(receiveCompletion: { completion in
        switch completion {
        case .finished:
            print("done")
        case .failure(let error):
            print("failed", error)
        }
    }, receiveValue: { image in
        // do whatever you want with the images as they come in
    })
}

Isso resultou em:

serial

Mas devemos reconhecer que você recebe um grande sucesso de desempenho fazendo-os sequencialmente, assim. Por exemplo, se eu aumentar para 6 por vez, é mais do que o dobro da velocidade:

concorrente

Pessoalmente, eu recomendaria o download sequencial apenas se for absolutamente necessário (o que, ao baixar uma série de imagens / arquivos, quase certamente não é o caso). Sim, a execução simultânea de solicitações pode fazer com que elas não terminem em uma ordem específica, mas apenas usamos uma estrutura que é independente da ordem (por exemplo, um dicionário em vez de uma matriz simples), mas os ganhos de desempenho são tão significativos que geralmente valem a pena.

Mas, se você deseja que eles sejam baixados sequencialmente, o maxPublishersparâmetro pode conseguir isso.

Roubar
fonte
Sim, é isso que minha resposta já diz: stackoverflow.com/a/59889993/341994 , bem como a resposta que eu concedeu a recompensa a stackoverflow.com/a/59889174/341994
matt
E veja também agora meu livro apeth.com/UnderstandingCombine/operators/…
matt
A propósito, falando seqüencialmente, fiz um ótimo uso da sua operação assíncrona seqüencial para uma tarefa diferente, obrigado por escrevê-la
matt
@matt - Lol. Confesso que não vi que você havia encontrado a maxPublishersopção. E eu não teria falado sobre “não faça serial” se tivesse notado que era você (como eu sei que você entende completamente os prós e contras de serial versus simultâneo). Eu literalmente vi apenas "Quero baixar um arquivo de cada vez", encontrei recentemente a maxPublishersopção de outra coisa que estava fazendo (a saber, fornecer uma solução moderna para essa pergunta ) e pensei em compartilhar a solução Combine I tinha inventado. Eu não quis ser tão derivado.
Rob
1
Sim, era a solução mencionada em stackoverflow.com/a/48104095/1271826 da qual eu estava falando antes; Achei isso muito útil.
matt