NetworkBoundResource com corotinas Kotlin

8

Você tem alguma idéia de como implementar o padrão de repositório com as rotinas de rede NetworkBoundResource e Kotlin? Sei que podemos lançar uma corotina dentro de um GlobalScope, mas isso pode levar ao vazamento de corotina. Gostaria de passar um viewModelScope como parâmetro, mas é um pouco complicado quando se trata de implementação (porque meu repositório não conhece um CoroutineScope de nenhum ViewModel).

abstract class NetworkBoundResource<ResultType, RequestType>
@MainThread constructor(
    private val coroutineScope: CoroutineScope
) {

    private val result = MediatorLiveData<Resource<ResultType>>()

    init {
        result.value = Resource.loading(null)
        @Suppress("LeakingThis")
        val dbSource = loadFromDb()
        result.addSource(dbSource) { data ->
            result.removeSource(dbSource)
            if (shouldFetch(data)) {
                fetchFromNetwork(dbSource)
            } else {
                result.addSource(dbSource) { newData ->
                    setValue(Resource.success(newData))
                }
            }
        }
    }

    @MainThread
    private fun setValue(newValue: Resource<ResultType>) {
        if (result.value != newValue) {
            result.value = newValue
        }
    }

    private fun fetchFromNetwork(dbSource: LiveData<ResultType>) {
        val apiResponse = createCall()
        result.addSource(dbSource) { newData ->
            setValue(Resource.loading(newData))
        }
        result.addSource(apiResponse) { response ->
            result.removeSource(apiResponse)
            result.removeSource(dbSource)
            when (response) {
                is ApiSuccessResponse -> {
                    coroutineScope.launch(Dispatchers.IO) {
                        saveCallResult(processResponse(response))

                        withContext(Dispatchers.Main) {
                            result.addSource(loadFromDb()) { newData ->
                                setValue(Resource.success(newData))
                            }
                        }
                    }
                }

                is ApiEmptyResponse -> {
                    coroutineScope.launch(Dispatchers.Main) {
                        result.addSource(loadFromDb()) { newData ->
                            setValue(Resource.success(newData))
                        }
                    }
                }

                is ApiErrorResponse -> {
                    onFetchFailed()
                    result.addSource(dbSource) { newData ->
                        setValue(Resource.error(response.errorMessage, newData))
                    }
                }
            }
        }
    }
}
Kamil Szustak
fonte
2
IMHO, o repositório deve expor suspendfunções ou retornar Channel/ Flowobjetos, dependendo da natureza da API. As reais rotinas são configuradas no modelo de exibição. LiveDataé apresentado pelo viewmodel, não pelo repositório.
CommonsWare 21/10/19
@CommonsWare Então você propõe reescrever o NetworkBoundResource para retornar dados reais (ou Recurso <T>), sem usar o LiveData nele e no repositório?
Kamil Szustak
Você é quem quer usar NetworkBoundResource. Meus comentários são mais gerais: IMHO, uma implementação de repositório Kotlin deve expor APIs relacionadas a corotinas.
CommonsWare 21/10/19
Gostaria de agradecer a todos vocês por me ajudarem com esta pergunta e várias respostas. E graças a @CommonsWare cuja dica me ajudou a fazer o meu melhor código (de novo)
Valerio
1
Eu diria isso mais como uma preferência pessoal. LiveDatanão possui o poder das corotinas RxJava ou Kotlin. LiveDataé muito bom para as comunicações da "última milha" da atividade ou fragmento e foi projetado com isso em mente. E para aplicativos pequenos, se você quiser pular repositórios e apenas ViewModelconversar diretamente com um RoomDatabase, tudo LiveDatabem.
CommonsWare

Respostas:

7

A resposta @ N1hk funciona corretamente, esta é apenas uma implementação diferente que não usa o flatMapConcatoperador (está marcada como FlowPreviewneste momento)

@FlowPreview
@ExperimentalCoroutinesApi
abstract class NetworkBoundResource<ResultType, RequestType> {

    fun asFlow() = flow {
        emit(Resource.loading(null))

        val dbValue = loadFromDb().first()
        if (shouldFetch(dbValue)) {
            emit(Resource.loading(dbValue))
            when (val apiResponse = fetchFromNetwork()) {
                is ApiSuccessResponse -> {
                    saveNetworkResult(processResponse(apiResponse))
                    emitAll(loadFromDb().map { Resource.success(it) })
                }
                is ApiErrorResponse -> {
                    onFetchFailed()
                    emitAll(loadFromDb().map { Resource.error(apiResponse.errorMessage, it) })
                }
            }
        } else {
            emitAll(loadFromDb().map { Resource.success(it) })
        }
    }

    protected open fun onFetchFailed() {
        // Implement in sub-classes to handle errors
    }

    @WorkerThread
    protected open fun processResponse(response: ApiSuccessResponse<RequestType>) = response.body

    @WorkerThread
    protected abstract suspend fun saveNetworkResult(item: RequestType)

    @MainThread
    protected abstract fun shouldFetch(data: ResultType?): Boolean

    @MainThread
    protected abstract fun loadFromDb(): Flow<ResultType>

    @MainThread
    protected abstract suspend fun fetchFromNetwork(): ApiResponse<RequestType>
}
Juan Cruz Soler
fonte
1
Não é melhor emitir Resource.error no caso ApiErrorResponse?
Kamil Szustak 14/11/19
Que tipo de retorno de veiculação de modernização deve ser?
Mahmood Ali
@MahmoodAli suspende a diversão someData (@ Query / @ Path): ApiResponse <List <Postitems>> ... gerencie de acordo com seus dados
USMAN osman
3

Atualização (2020-05-27):

Uma maneira que é mais idiomática para a linguagem Kotlin do que meus exemplos anteriores, usa as APIs de fluxo e empresta a resposta de Juan pode ser representada como uma função autônoma como a seguinte:

inline fun <ResultType, RequestType> networkBoundResource(
    crossinline query: () -> Flow<ResultType>,
    crossinline fetch: suspend () -> RequestType,
    crossinline saveFetchResult: suspend (RequestType) -> Unit,
    crossinline onFetchFailed: (Throwable) -> Unit = { Unit },
    crossinline shouldFetch: (ResultType) -> Boolean = { true }
) = flow<Resource<ResultType>> {
    emit(Resource.Loading(null))
    val data = query().first()

    val flow = if (shouldFetch(data)) {
        emit(Resource.Loading(data))

        try {
            saveFetchResult(fetch())
            query().map { Resource.Success(it) }
        } catch (throwable: Throwable) {
            onFetchFailed(throwable)
            query().map { Resource.Error(throwable, it) }
        }
    } else {
        query().map { Resource.Success(it) }
    }

    emitAll(flow)
}

Resposta original:

É assim que eu tenho feito isso usando o livedata-ktxartefato; não é necessário passar em nenhum CoroutineScope. A classe também usa apenas um tipo em vez de dois (por exemplo, ResultType / RequestType), pois eu sempre acabo usando um adaptador em outro lugar para mapear esses tipos.

import androidx.lifecycle.LiveData
import androidx.lifecycle.liveData
import androidx.lifecycle.map
import nihk.core.Resource

// Adapted from: https://developer.android.com/topic/libraries/architecture/coroutines
abstract class NetworkBoundResource<T> {

    fun asLiveData() = liveData<Resource<T>> {
        emit(Resource.Loading(null))

        if (shouldFetch(query())) {
            val disposable = emitSource(queryObservable().map { Resource.Loading(it) })

            try {
                val fetchedData = fetch()
                // Stop the previous emission to avoid dispatching the saveCallResult as `Resource.Loading`.
                disposable.dispose()
                saveFetchResult(fetchedData)
                // Re-establish the emission as `Resource.Success`.
                emitSource(queryObservable().map { Resource.Success(it) })
            } catch (e: Exception) {
                onFetchFailed(e)
                emitSource(queryObservable().map { Resource.Error(e, it) })
            }
        } else {
            emitSource(queryObservable().map { Resource.Success(it) })
        }
    }

    abstract suspend fun query(): T
    abstract fun queryObservable(): LiveData<T>
    abstract suspend fun fetch(): T
    abstract suspend fun saveFetchResult(data: T)
    open fun onFetchFailed(exception: Exception) = Unit
    open fun shouldFetch(data: T) = true
}

Como o @CommonsWare disse nos comentários, no entanto, seria melhor apenas expor a Flow<T>. Aqui está o que eu tentei fazer para fazer isso. Observe que eu não usei esse código na produção, portanto, cuidado com o comprador.

import kotlinx.coroutines.flow.*
import nihk.core.Resource

abstract class NetworkBoundResource<T> {

    fun asFlow(): Flow<Resource<T>> = flow {
        val flow = query()
            .onStart { emit(Resource.Loading<T>(null)) }
            .flatMapConcat { data ->
                if (shouldFetch(data)) {
                    emit(Resource.Loading(data))

                    try {
                        saveFetchResult(fetch())
                        query().map { Resource.Success(it) }
                    } catch (throwable: Throwable) {
                        onFetchFailed(throwable)
                        query().map { Resource.Error(throwable, it) }
                    }
                } else {
                    query().map { Resource.Success(it) }
                }
            }

        emitAll(flow)
    }

    abstract fun query(): Flow<T>
    abstract suspend fun fetch(): T
    abstract suspend fun saveFetchResult(data: T)
    open fun onFetchFailed(throwable: Throwable) = Unit
    open fun shouldFetch(data: T) = true
}
N1hk
fonte
O Flowcódigo irá fazer a solicitação de rede novamente quando os dados nas mudanças de banco de dados, eu postei uma nova resposta que mostra como lidar com isso
Juan Cruz Soler
Nos meus testes, coloquei um ponto de interrupção dentro do flatMapConcatbloco e também dentro do query().map { Resource.Success(it) }bloco, depois inseri um item no banco de dados. Somente o último ponto de interrupção foi atingido. Em outras palavras, a solicitação de rede não é feita novamente quando os dados no banco de dados são alterados.
N1hk 17/11/19
Se você colocar um ponto de interrupção aqui if (shouldFetch(data)), verá que ele é chamado duas vezes. A primeira vez que você obtém os resultados do banco de dados e a segunda quando saveFetchResult(fetch())é chamada #
Juan Cruz Soler
E se você pensar sobre isso, é o que você deseja quando usa um Flow. Você está salvando algo no banco de dados e deseja que o Room informe sobre essa alteração e chame seu flatMapConcatcódigo novamente. Você não estava usando um Flow<T>e usando Tem vez se você não quer que o comportamento
Juan Cruz Soler
3
Você está certo, eu não entendi o código. Como flatMapConcatretornará um novo fluxo a ser observado, o fluxo inicial não será mais chamado. As duas respostas se comportam da mesma maneira, por isso manterei as minhas como uma maneira diferente de implementá-las. Desculpe a confusão e obrigado pela sua explicação!
Juan Cruz Soler
0

Eu sou novo na Kotlin Coroutine. Acabei de encontrar este problema esta semana.

Eu acho que se você seguir o padrão de repositório mencionado no post acima, minha opinião é livre para passar um CoroutineScope no NetworkBoundResource . O CoroutineScope pode ser um dos parâmetros da função no Repositório , que retorna um LiveData, como:

suspend fun getData(scope: CoroutineScope): LiveDate<T>

Passe o viewmodelscope do escopo interno como o CoroutineScope ao chamar getData () no seu ViewModel, para que o NetworkBoundResource funcione no viewmodelscope e seja vinculado ao ciclo de vida do Viewmodel. A corotina no NetworkBoundResource será cancelada quando o ViewModel estiver morto, o que seria um benefício.

Para usar o viewmodelscope do escopo incorporado , não se esqueça de adicionar abaixo em seu build.gradle.

implementation 'androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0-alpha01'
Freddie
fonte