Como o Asyncio realmente funciona?

120

Essa pergunta é motivada por outra pergunta: Como esperar no cdef?

Existem toneladas de artigos e postagens de blogs na web sobre asyncio, mas todos são muito superficiais. Não consegui encontrar nenhuma informação sobre como asynciorealmente é implementado e o que torna o I / O assíncrono. Eu estava tentando ler o código-fonte, mas são milhares de linhas de código C não do mais alto grau, muitas das quais lidam com objetos auxiliares, mas o mais crucial, é difícil conectar entre a sintaxe Python e o código C que ela traduziria para dentro.

A documentação do próprio Asycnio é ainda menos útil. Não há nenhuma informação sobre como funciona, apenas algumas orientações sobre como usá-lo, que às vezes também são enganosas / mal escritas.

Estou familiarizado com a implementação de corrotinas do Go e esperava que o Python fizesse a mesma coisa. Se fosse esse o caso, o código que sugeri no post com link acima teria funcionado. Como isso não aconteceu, agora estou tentando descobrir o porquê. Meu melhor palpite até agora é o seguinte, corrija-me onde estou errado:

  1. As definições de procedimento do formulário async def foo(): ...são, na verdade, interpretadas como métodos de herança de uma classe coroutine.
  2. Talvez, async defna verdade , seja dividido em vários métodos por awaitinstruções, onde o objeto, no qual esses métodos são chamados, é capaz de acompanhar o progresso feito durante a execução até o momento.
  3. Se o acima for verdadeiro, então, essencialmente, a execução de uma co-rotina se resume a chamar métodos de objeto de co-rotina por algum gerenciador global (loop?).
  4. O gerenciador global está de alguma forma (como?) Ciente de quando as operações de I / O são realizadas pelo código Python (apenas?) E é capaz de escolher um dos métodos de co-rotina pendentes para executar após o método de execução atual abandonar o controle (clique na awaitinstrução )

Em outras palavras, aqui está minha tentativa de "transformar" alguma asynciosintaxe em algo mais compreensível:

async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__():
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)


# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not every(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

Se meu palpite estiver correto: então eu tenho um problema. Como o I / O realmente acontece neste cenário? Em um tópico separado? Todo o interpretador está suspenso e o I / O ocorre fora do interpretador? O que exatamente significa I / O? Se meu procedimento python chamou procedimento C open()e, por sua vez, enviou interrupção ao kernel, cedendo o controle a ele, como o interpretador Python sabe sobre isso e é capaz de continuar executando algum outro código, enquanto o código do kernel faz a E / S real e até ele acorda o procedimento Python que enviou a interrupção originalmente? Como pode o interpretador Python, em princípio, estar ciente de que isso está acontecendo?

wvxvw
fonte
2
A maior parte da lógica é tratada pela implementação do loop de evento. Veja como o CPython BaseEventLoopé implementado: github.com/python/cpython/blob/…
Blender
@Blender ok, acho que finalmente encontrei o que queria, mas agora não entendo o motivo pelo qual o código foi escrito da maneira que estava. Por _run_onceque, que é realmente a única função útil em todo o módulo, tornou-se "privada"? A implementação é horrível, mas isso não é um problema. Por que a única função que você deseja chamar em um loop de evento está marcada como "não me chame"?
wvxvw
Essa é uma pergunta para a lista de discussão. Qual caso de uso exigiria que você tocasse _run_onceem primeiro lugar?
Blender
8
Isso não responde realmente à minha pergunta, no entanto. Como você resolveria qualquer problema útil usando apenas _run_once? asyncioé complexo e tem seus defeitos, mas mantenha a discussão civilizada. Não fale mal dos desenvolvedores por trás do código que você mesmo não entende.
Blender
1
@ user8371915 Se você acredita que há algo que eu não abordei, fique à vontade para adicionar ou comentar minha resposta.
Bharel

Respostas:

203

Como funciona o assíncio?

Antes de responder a esta pergunta, precisamos entender alguns termos básicos, pule-os se você já conhece algum deles.

Geradores

Geradores são objetos que nos permitem suspender a execução de uma função python. Os geradores selecionados pelo usuário são implementados usando a palavra-chave yield. Ao criar uma função normal contendo a yieldpalavra - chave, transformamos essa função em um gerador:

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

Como você pode ver, chamar next()o gerador faz com que o interpretador carregue o frame do teste e retorne o yieldvalor ed. Chamando next()novamente, faz com que o quadro seja carregado novamente na pilha do interpretador e continue com yieldoutro valor.

Na terceira vez que next()é chamado, nosso gerador estava pronto e StopIterationacionado.

Comunicando-se com um gerador

Uma característica menos conhecida dos geradores é o fato de que você pode se comunicar com eles usando dois métodos: send()e throw().

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

Ao chamar gen.send(), o valor é passado como um valor de retorno da yieldpalavra - chave.

gen.throw()por outro lado, permite lançar Exceptions dentro de geradores, com a exceção levantada no mesmo local que yieldfoi chamada.

Retornando valores de geradores

Retornar um valor de um gerador, resulta no valor sendo colocado dentro da StopIterationexceção. Podemos posteriormente recuperar o valor da exceção e usá-lo de acordo com nossa necessidade.

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

Eis uma nova palavra-chave: yield from

Python 3.4 veio com a adição de uma nova palavra-chave: yield from. O que essa palavra-chave nos permite fazer, é passar em qualquer next(), send()e throw()em um gerador de mais interna aninhada. Se o gerador interno retornar um valor, também será o valor de retorno de yield from:

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

Escrevi um artigo para elaborar mais sobre este tópico.

Juntando tudo

Ao introduzir a nova palavra-chave yield fromno Python 3.4, agora éramos capazes de criar geradores dentro de geradores que, assim como um túnel, passam os dados de um lado para outro dos geradores mais internos para os mais externos. Isso gerou um novo significado para geradores - corrotinas .

Corrotinas são funções que podem ser interrompidas e retomadas durante a execução. Em Python, eles são definidos usando a async defpalavra - chave. Assim como os geradores, eles também usam sua própria forma, yield fromque é await. Antes asynce depois de awaitintroduzidos no Python 3.5, criamos corrotinas exatamente da mesma maneira que os geradores foram criados (com em yield fromvez de await).

async def inner():
    return 1

async def outer():
    await inner()

Como todo iterador ou gerador que implementa o __iter__()método, as corrotinas implementam o __await__()que permite que elas continuem sempre que await coroé chamado.

Há um belo diagrama de sequência dentro da documentação do Python que você deve verificar.

Em asyncio, além das funções de co-rotina, temos 2 objetos importantes: tarefas e futuros .

Futuros

Futuros são objetos que têm o __await__()método implementado e sua função é manter um determinado estado e resultado. O estado pode ser um dos seguintes:

  1. PENDENTE - futuro não tem nenhum resultado ou conjunto de exceção.
  2. CANCELADO - o futuro foi cancelado usando fut.cancel()
  3. FINISHED - o futuro foi concluído, seja por um conjunto de resultados usando fut.set_result()ou por um conjunto de exceção usandofut.set_exception()

O resultado, assim como você adivinhou, pode ser um objeto Python, que será retornado, ou uma exceção que pode ser gerada.

Outra característica importante dos futureobjetos é que eles contêm um método chamado add_done_callback(). Este método permite que as funções sejam chamadas assim que a tarefa for concluída - seja ela gerada uma exceção ou concluída.

Tarefas

Objetos de tarefa são futuros especiais, que envolvem corrotinas e se comunicam com as corrotinas mais internas e externas. Cada vez que uma co-rotina sai do awaitfuturo, o futuro é passado de volta para a tarefa (assim como em yield from), e a tarefa o recebe.

Em seguida, a tarefa se liga ao futuro. Fá-lo invocando add_done_callback()o futuro. A partir de agora, se o futuro estiver pronto, seja cancelado, aprovado uma exceção ou um objeto Python como resultado, o retorno de chamada da tarefa será chamado e voltará à existência.

Asyncio

A última questão candente que devemos responder é - como o IO é implementado?

Bem no fundo do assíncio, temos um loop de eventos. Um ciclo de eventos de tarefas. O trabalho do loop de eventos é chamar as tarefas sempre que estiverem prontas e coordenar todo esse esforço em uma única máquina de trabalho.

A parte IO do loop de eventos é construída sobre uma única função crucial chamada select. Selecionar é uma função de bloqueio, implementada pelo sistema operacional subjacente, que permite aguardar nos soquetes por dados de entrada ou saída. Quando os dados são recebidos, ele desperta e retorna os soquetes que receberam os dados, ou os soquetes que estão prontos para gravação.

Quando você tenta receber ou enviar dados por um soquete através do assíncio, o que realmente acontece a seguir é que o soquete é primeiro verificado se possui algum dado que pode ser lido ou enviado imediatamente. Se seu .send()buffer estiver cheio ou .recv()vazio, o socket é registrado para a selectfunção (simplesmente adicionando-o a uma das listas, rlistfor recve wlistfor send) e a função apropriada é awaitum futureobjeto recém-criado , ligado a esse socket.

Quando todas as tarefas disponíveis estão esperando por futuros, o loop de eventos chama selecte espera. Quando um dos soquetes tem dados de entrada, ou seu sendbuffer é drenado, asyncio verifica o futuro objeto vinculado a esse soquete e define como concluído.

Agora toda a mágica acontece. O futuro está pronto, a tarefa que se adicionou antes com add_done_callback()volta à vida e chama .send()a co-rotina que retoma a co-rotina mais interna (por causa da awaitcadeia) e você lê os dados recém-recebidos de um buffer próximo. foi derramado até.

Cadeia de método novamente, em caso de recv():

  1. select.select espera.
  2. Um soquete pronto, com dados é retornado.
  3. Os dados do soquete são movidos para um buffer.
  4. future.set_result() é chamado.
  5. A tarefa que se adicionou add_done_callback()agora foi ativada.
  6. A tarefa chama .send()a co-rotina que vai até a co-rotina mais interna e a desperta.
  7. Os dados estão sendo lidos do buffer e devolvidos ao nosso humilde usuário.

Em resumo, o asyncio usa recursos de gerador, que permitem pausar e retomar funções. Ele usa yield fromrecursos que permitem a passagem de dados para frente e para trás do gerador interno para o externo. Ele usa tudo isso para interromper a execução da função enquanto aguarda a conclusão do IO (usando a selectfunção OS ).

E o melhor de tudo? Enquanto uma função está em pausa, outra pode correr e intercalar com o delicado tecido, que é asyncio.

Bharel
fonte
12
Se houver necessidade de mais alguma explicação, não hesite em comentar. A propósito, não tenho certeza se deveria ter escrito isso como um artigo de blog ou uma resposta em stackoverflow. A pergunta é longa para responder.
Bharel
1
Em um soquete assíncrono, a tentativa de enviar ou receber dados verifica primeiro o buffer do SO. Se você estiver tentando receber e não houver dados no buffer, a função de recebimento subjacente retornará um valor de erro que se propagará como uma exceção no Python. O mesmo ocorre com o envio e um buffer cheio. Quando a exceção é levantada, o Python, por sua vez, envia esses sockets para a função select que suspende o processo. Mas não é assim que o asyncio funciona, é como o select e os sockets funcionam, que também é altamente específico do sistema operacional.
Bharel 01 de
2
@ user8371915 Sempre aqui para ajudar :-) Lembre-se que para entender o Asyncio você deve saber como funcionam os geradores, sua comunicação e yield fromfuncionamento. No entanto, observei no início que ele pode ser ignorado caso o leitor já saiba sobre ele :-) Mais alguma coisa que você acha que devo acrescentar?
Bharel
2
As coisas antes da seção de Assíncio são talvez as mais críticas, pois são a única coisa que a linguagem realmente faz por si mesma. O também selectpode se qualificar, pois é assim que as chamadas de sistema de E / S sem bloqueio funcionam no sistema operacional. As asyncioconstruções reais e o loop de evento são apenas código no nível do aplicativo criado a partir dessas coisas.
MisterMiyagi
3
Este post contém informações sobre backbone de E / S assíncronas em Python. Obrigado por uma explicação tão gentil.
mjkim
83

Falar sobre async/awaite asyncionão é a mesma coisa. A primeira é uma construção fundamental de baixo nível (corrotinas), enquanto a última é uma biblioteca que usa essas construções. Por outro lado, não existe uma resposta definitiva única.

O seguinte é uma descrição geral de como async/awaite asynciobibliotecas -como trabalho. Ou seja, pode haver outros truques no topo (existem ...), mas eles são irrelevantes, a menos que você os construa sozinho. A diferença deve ser insignificante, a menos que você já saiba o suficiente para não ter que fazer essa pergunta.

1. Corrotinas versus sub-rotinas em uma casca de noz

Assim como as sub-rotinas (funções, procedimentos, ...), co-rotinas (geradores, ...) são uma abstração da pilha de chamadas e do ponteiro de instrução: há uma pilha de peças de código em execução e cada uma está em uma instrução específica.

A distinção de defversus async defé apenas para maior clareza. A diferença real é returnversus yield. A partir disso, awaitou yield fromveja a diferença de chamadas individuais para pilhas inteiras.

1.1. Sub-rotinas

Uma sub-rotina representa um novo nível de pilha para armazenar variáveis ​​locais e uma única passagem de suas instruções para chegar ao fim. Considere uma sub-rotina como esta:

def subfoo(bar):
     qux = 3
     return qux * bar

Quando você o executa, isso significa

  1. alocar espaço de pilha para barequx
  2. executar recursivamente a primeira instrução e pular para a próxima instrução
  3. uma vez por vez return, envia seu valor para a pilha de chamadas
  4. limpe a pilha (1.) e o ponteiro de instrução (2.)

Notavelmente, 4. significa que uma sub-rotina sempre começa no mesmo estado. Tudo o que é exclusivo da função em si é perdido na conclusão. Uma função não pode ser retomada, mesmo se houver instruções depois return.

root -\
  :    \- subfoo --\
  :/--<---return --/
  |
  V

1.2. Corrotinas como sub-rotinas persistentes

Uma co-rotina é como uma sub-rotina, mas pode sair sem destruir seu estado. Considere uma co-rotina como esta:

 def cofoo(bar):
      qux = yield bar  # yield marks a break point
      return qux

Quando você o executa, isso significa

  1. alocar espaço de pilha para barequx
  2. executar recursivamente a primeira instrução e pular para a próxima instrução
    1. uma vez por vez yield, envia seu valor para a pilha de chamada, mas armazena a pilha e o ponteiro de instrução
    2. uma vez chamando para yield, restaure a pilha e o ponteiro de instrução e empurre os argumentos paraqux
  3. uma vez por vez return, envia seu valor para a pilha de chamadas
  4. limpe a pilha (1.) e o ponteiro de instrução (2.)

Observe a adição de 2.1 e 2.2 - uma co-rotina pode ser suspensa e reiniciada em pontos predefinidos. Isso é semelhante a como uma sub-rotina é suspensa durante a chamada de outra sub-rotina. A diferença é que a co-rotina ativa não está estritamente ligada à sua pilha de chamada. Em vez disso, uma co-rotina suspensa faz parte de uma pilha separada e isolada.

root -\
  :    \- cofoo --\
  :/--<+--yield --/
  |    :
  V    :

Isso significa que co-rotinas suspensas podem ser armazenadas livremente ou movidas entre pilhas. Qualquer pilha de chamadas que tenha acesso a uma co-rotina pode decidir retomá-la.

1.3. Atravessando a pilha de chamadas

Até agora, nossa co-rotina apenas desce na pilha de chamadas com yield. Uma sub-rotina pode descer e subir na pilha de chamadas com returne (). Para completar, as corrotinas também precisam de um mecanismo para subir na pilha de chamadas. Considere uma co-rotina como esta:

def wrap():
    yield 'before'
    yield from cofoo()
    yield 'after'

Quando você o executa, isso significa que ele ainda aloca a pilha e o ponteiro de instrução como uma sub-rotina. Quando ele é suspenso, ainda é como armazenar uma sub-rotina.

No entanto, yield fromfaz ambos . Ele suspende a pilha e o ponteiro de instrução wrap e é executado cofoo. Observe que wrappermanece suspenso até cofooterminar completamente. Sempre que cofoosuspende ou algo é enviado, cofooé conectado diretamente à pilha de chamada.

1.4. Corrotinas até o fim

Conforme estabelecido, yield frompermite conectar dois escopos em outro intermediário. Quando aplicado recursivamente, significa que o topo da pilha pode ser conectado à parte inferior da pilha.

root -\
  :    \-> coro_a -yield-from-> coro_b --\
  :/ <-+------------------------yield ---/
  |    :
  :\ --+-- coro_a.send----------yield ---\
  :                             coro_b <-/

Observe isso roote coro_bnão se conheçam. Isso torna as co-rotinas muito mais limpas do que os callbacks: as corrotinas ainda são construídas em uma relação 1: 1 como as sub-rotinas. As corrotinas suspendem e retomam toda a pilha de execução existente até um ponto de chamada regular.

Notavelmente, rootpoderia ter um número arbitrário de corrotinas para retomar. No entanto, nunca pode retomar mais de um ao mesmo tempo. As co-rotinas da mesma raiz são concorrentes, mas não paralelas!

1,5. Python's asynceawait

A explicação até agora usou explicitamente o vocabulário yielde yield fromdos geradores - a funcionalidade subjacente é a mesma. A nova sintaxe Python3.5 asynce awaitexiste principalmente para maior clareza.

def foo():  # subroutine?
     return None

def foo():  # coroutine?
     yield from foofoo()  # generator? coroutine?

async def foo():  # coroutine!
     await foofoo()  # coroutine!
     return None

As instruções async fore async withsão necessárias porque você quebraria a yield from/awaitcadeia com as instruções fore nuas with.

2. Anatomia de um loop de evento simples

Por si só, uma co-rotina não tem o conceito de ceder o controle a outra co-rotina. Ele só pode ceder o controle ao chamador na parte inferior de uma pilha de co-rotinas. Esse chamador pode então mudar para outra co-rotina e executá-la.

Este nó raiz de várias co-rotinas é comumente um loop de eventos : na suspensão, uma co-rotina produz um evento no qual deseja retomar. Por sua vez, o loop de eventos é capaz de esperar com eficiência que esses eventos ocorram. Isso permite que ele decida qual co-rotina executar a seguir ou como esperar antes de retomar.

Tal design implica que existe um conjunto de eventos predefinidos que o loop entende. Várias corrotinas awaitentre si, até que finalmente um evento é awaited. Este evento pode se comunicar diretamente com o loop de eventos por meio yielddo controle.

loop -\
  :    \-> coroutine --await--> event --\
  :/ <-+----------------------- yield --/
  |    :
  |    :  # loop waits for event to happen
  |    :
  :\ --+-- send(reply) -------- yield --\
  :        coroutine <--yield-- event <-/

A chave é que a suspensão da co-rotina permite que o loop de eventos e os eventos se comuniquem diretamente. A pilha de co-rotina intermediária não requer nenhum conhecimento sobre qual loop a está executando, nem como os eventos funcionam.

2.1.1. Eventos no tempo

O evento mais simples de lidar é chegar a um determinado momento. Este é um bloco fundamental de código encadeado também: um encadeamento repetidamente sleepaté que uma condição seja verdadeira. No entanto, uma sleepexecução normal bloqueia por si só - queremos que outras corrotinas não sejam bloqueadas. Em vez disso, queremos dizer ao loop de eventos quando ele deve retomar a pilha de co-rotinas atual.

2.1.2. Definindo um Evento

Um evento é simplesmente um valor que podemos identificar - seja por meio de um enum, um tipo ou outra identidade. Podemos definir isso com uma classe simples que armazena nosso tempo alvo. Além de armazenar as informações do evento, podemos permitir a awaituma classe diretamente.

class AsyncSleep:
    """Event to sleep until a point in time"""
    def __init__(self, until: float):
        self.until = until

    # used whenever someone ``await``s an instance of this Event
    def __await__(self):
        # yield this Event to the loop
        yield self

    def __repr__(self):
        return '%s(until=%.1f)' % (self.__class__.__name__, self.until)

Essa classe apenas armazena o evento - não diz como tratá-lo de fato.

A única característica especial é __await__- é o que a awaitpalavra - chave procura. Praticamente, é um iterador, mas não está disponível para a máquina de iteração regular.

2.2.1. Esperando um evento

Agora que temos um evento, como as corrotinas reagem a ele? Devemos ser capazes de expressar o equivalente a sleeppor awaiting nosso evento. Para ver melhor o que está acontecendo, esperamos duas vezes na metade do tempo:

import time

async def asleep(duration: float):
    """await that ``duration`` seconds pass"""
    await AsyncSleep(time.time() + duration / 2)
    await AsyncSleep(time.time() + duration / 2)

Podemos instanciar e executar essa co-rotina diretamente. Semelhante a um gerador, o uso de coroutine.sendexecuta a co-rotina até o yieldresultado.

coroutine = asleep(100)
while True:
    print(coroutine.send(None))
    time.sleep(0.1)

Isso nos dá dois AsyncSleepeventos e um StopIterationquando a co-rotina é concluída. Observe que o único atraso é do time.sleeploop! Cada AsyncSleepum armazena apenas um deslocamento da hora atual.

2.2.2. Evento + Sono

Neste ponto, temos dois mecanismos separados à nossa disposição:

  • AsyncSleep Eventos que podem ser gerados de dentro de uma co-rotina
  • time.sleep que pode esperar sem impactar as corrotinas

Notavelmente, esses dois são ortogonais: nenhum afeta ou ativa o outro. Como resultado, podemos criar nossa própria estratégia sleeppara atender ao atraso de um AsyncSleep.

2.3. Um ciclo de eventos ingênuo

Se tivermos várias corrotinas, cada uma pode nos dizer quando deseja ser acordada. Podemos então esperar até que o primeiro deles queira ser retomado, depois o que vem depois e assim por diante. Notavelmente, em cada ponto, só nos importamos com qual é o próximo .

Isso torna o agendamento direto:

  1. classificar as corrotinas pelo tempo de despertar desejado
  2. escolha o primeiro que quer acordar
  3. espere até este ponto no tempo
  4. execute esta co-rotina
  5. repita a partir de 1.

Uma implementação trivial não precisa de nenhum conceito avançado. A listpermite classificar as corrotinas por data. Esperar é normal time.sleep. A execução de corrotinas funciona exatamente como antes com coroutine.send.

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    # store wake-up-time and coroutines
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting:
        # 2. pick the first coroutine that wants to wake up
        until, coroutine = waiting.pop(0)
        # 3. wait until this point in time
        time.sleep(max(0.0, until - time.time()))
        # 4. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])

Claro, isso tem muito espaço para melhorias. Podemos usar um heap para a fila de espera ou uma tabela de despacho para eventos. Também poderíamos buscar valores de retorno de StopIteratione atribuí-los à co-rotina. No entanto, o princípio fundamental permanece o mesmo.

2.4. Espera Cooperativa

O AsyncSleepevento e o runloop de evento são uma implementação totalmente funcional de eventos cronometrados.

async def sleepy(identifier: str = "coroutine", count=5):
    for i in range(count):
        print(identifier, 'step', i + 1, 'at %.2f' % time.time())
        await asleep(0.1)

run(*(sleepy("coroutine %d" % j) for j in range(5)))

Isso alterna cooperativamente entre cada uma das cinco co-rotinas, suspendendo cada uma por 0,1 segundos. Mesmo que o loop de eventos seja síncrono, ele ainda executa o trabalho em 0,5 segundos em vez de 2,5 segundos. Cada co-rotina mantém o estado e atua independentemente.

3. Loop de evento de I / O

Um loop de evento compatível sleepé adequado para pesquisa . No entanto, esperar por E / S em um identificador de arquivo pode ser feito com mais eficiência: o sistema operacional implementa E / S e, portanto, sabe quais identificadores estão prontos. Idealmente, um loop de evento deve oferecer suporte a um evento "pronto para I / O" explícito.

3.1. A selectchamada

Python já tem uma interface para consultar o sistema operacional para ler as alças de E / S. Quando chamado com identificadores para ler ou gravar, ele retorna os identificadores prontos para ler ou gravar:

readable, writeable, _ = select.select(rlist, wlist, xlist, timeout)

Por exemplo, podemos openescrever um arquivo e esperar que esteja pronto:

write_target = open('/tmp/foo')
readable, writeable, _ = select.select([], [write_target], [])

Uma vez que o select retorna, writeablecontém nosso arquivo aberto.

3.2. Evento de I / O básico

Semelhante à AsyncSleepsolicitação, precisamos definir um evento para E / S. Com a selectlógica subjacente , o evento deve se referir a um objeto legível - digamos um openarquivo. Além disso, armazenamos quantos dados devemos ler.

class AsyncRead:
    def __init__(self, file, amount=1):
        self.file = file
        self.amount = amount
        self._buffer = ''

    def __await__(self):
        while len(self._buffer) < self.amount:
            yield self
            # we only get here if ``read`` should not block
            self._buffer += self.file.read(1)
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.file, self.amount, len(self._buffer)
        )

Como acontece com a AsyncSleepmaioria, apenas armazenamos os dados necessários para a chamada de sistema subjacente. Desta vez, __await__pode ser reiniciado várias vezes - até que o nosso desejado amountseja lido. Além disso, obtemos returno resultado de E / S em vez de apenas retomar.

3.3. Aumentando um loop de evento com I / O de leitura

A base para nosso loop de eventos ainda é a rundefinida anteriormente. Primeiro, precisamos rastrear as solicitações de leitura. Este não é mais um cronograma classificado, apenas mapeamos solicitações de leitura para corrotinas.

# new
waiting_read = {}  # type: Dict[file, coroutine]

Uma vez que select.selectleva um parâmetro de tempo limite, podemos usá-lo no lugar de time.sleep.

# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(reads), [], [])

Isso nos dá todos os arquivos legíveis - se houver algum, executamos a co-rotina correspondente. Se não houver nenhum, esperamos o suficiente para que nossa corrotina atual seja executada.

# new - reschedule waiting coroutine, run readable coroutine
if readable:
    waiting.append((until, coroutine))
    waiting.sort()
    coroutine = waiting_read[readable[0]]

Finalmente, temos que realmente ouvir as solicitações de leitura.

# new
if isinstance(command, AsyncSleep):
    ...
elif isinstance(command, AsyncRead):
    ...

3.4. Juntar as peças

O texto acima foi um pouco simplificado. Precisamos fazer algumas mudanças para não deixar as corrotinas adormecidas de fome, se sempre pudermos ler. Precisamos lidar com o fato de não termos nada para ler ou nada pelo que esperar. No entanto, o resultado final ainda se encaixa em 30 LOC.

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    waiting_read = {}  # type: Dict[file, coroutine]
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting or waiting_read:
        # 2. wait until the next coroutine may run or read ...
        try:
            until, coroutine = waiting.pop(0)
        except IndexError:
            until, coroutine = float('inf'), None
            readable, _, _ = select.select(list(waiting_read), [], [])
        else:
            readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
        # ... and select the appropriate one
        if readable and time.time() < until:
            if until and coroutine:
                waiting.append((until, coroutine))
                waiting.sort()
            coroutine = waiting_read.pop(readable[0])
        # 3. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension ...
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])
        # ... or register reads
        elif isinstance(command, AsyncRead):
            waiting_read[command.file] = coroutine

3,5. E / S cooperativa

Os AsyncSleep, AsyncReade runimplementações estão agora totalmente funcional para dormir e / ou leitura. Da mesma forma que para sleepy, podemos definir um auxiliar para testar a leitura:

async def ready(path, amount=1024*32):
    print('read', path, 'at', '%d' % time.time())
    with open(path, 'rb') as file:
        result = return await AsyncRead(file, amount)
    print('done', path, 'at', '%d' % time.time())
    print('got', len(result), 'B')

run(sleepy('background', 5), ready('/dev/urandom'))

Executando isso, podemos ver que nosso I / O é intercalado com a tarefa de espera:

id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B

4. E / S sem bloqueio

Embora a E / S em arquivos transmita o conceito, não é realmente adequado para uma biblioteca como asyncio: a selectchamada sempre retorna para arquivos , e ambos opene readpode bloquear indefinidamente . Isso bloqueia todas as co-rotinas de um loop de evento - o que é ruim. Bibliotecas como o aiofilesuso de threads e sincronização para falsificar E / S sem bloqueio e eventos no arquivo.

No entanto, os soquetes permitem E / S sem bloqueio - e sua latência inerente o torna muito mais crítico. Quando usado em um loop de evento, a espera por dados e a nova tentativa podem ser quebradas sem bloquear nada.

4.1. Evento de I / O sem bloqueio

Semelhante ao nosso AsyncRead, podemos definir um evento suspend-and-read para sockets. Em vez de pegar um arquivo, pegamos um socket - que deve ser não bloqueador. Além disso, nossos __await__usos em socket.recvvez de file.read.

class AsyncRecv:
    def __init__(self, connection, amount=1, read_buffer=1024):
        assert not connection.getblocking(), 'connection must be non-blocking for async recv'
        self.connection = connection
        self.amount = amount
        self.read_buffer = read_buffer
        self._buffer = b''

    def __await__(self):
        while len(self._buffer) < self.amount:
            try:
                self._buffer += self.connection.recv(self.read_buffer)
            except BlockingIOError:
                yield self
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.connection, self.amount, len(self._buffer)
        )

Em contraste com AsyncRead, __await__executa E / S verdadeiramente sem bloqueio. Quando os dados estão disponíveis, ele sempre lê. Quando nenhum dado está disponível, ele sempre é suspenso. Isso significa que o loop de eventos só é bloqueado enquanto realizamos um trabalho útil.

4.2. Desbloqueando o loop de eventos

No que diz respeito ao loop de eventos, nada muda muito. O evento a ser escutado ainda é o mesmo que o dos arquivos - um descritor de arquivo marcado como pronto por select.

# old
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
    waiting_read[command.connection] = coroutine

Neste ponto, deve ser óbvio que AsyncReade AsyncRecvsão o mesmo tipo de evento. Poderíamos facilmente refatorá-los para ser um evento com um componente de E / S intercambiável. Na verdade, o loop de eventos, as corrotinas e os eventos separam claramente um planejador, código intermediário arbitrário e a E / S real.

4.3. O lado feio do I / O sem bloqueio

Em princípio, o que você deve fazer neste ponto é replicar a lógica de readcomo um recvpara AsyncRecv. No entanto, isso é muito mais feio agora - você tem que lidar com os retornos iniciais quando as funções bloqueiam dentro do kernel, mas fornecem o controle para você. Por exemplo, abrir uma conexão em vez de abrir um arquivo é muito mais demorado:

# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
    connection.connect((url, port))
except BlockingIOError:
    pass

Para encurtar a história, o que resta são algumas dezenas de linhas de tratamento de exceções. Os eventos e o loop de eventos já funcionam neste ponto.

id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5

Termo aditivo

Exemplo de código no github

MisterMiyagi
fonte
Usar yield selfem AsyncSleep me dá Task got back yielderro, por que isso? Vejo que o código em asyncio.Futures usa isso. Usar um rendimento mínimo funciona bem.
Ron Serruya de
1
Os loops de eventos geralmente esperam apenas seus próprios eventos. Geralmente, você não pode misturar eventos e loops de eventos entre bibliotecas; os eventos mostrados aqui funcionam apenas com o loop de eventos mostrado. Especificamente, o asyncio usa apenas Nenhum (ou seja, um rendimento mínimo) como um sinal para o loop de eventos. Os eventos interagem diretamente com o objeto de loop de evento para registrar wakeups.
MisterMiyagi
12

Seu corodesugaring é conceitualmente correto, mas ligeiramente incompleto.

awaitnão suspende incondicionalmente, mas apenas se encontrar uma chamada de bloqueio. Como ele sabe que uma chamada está bloqueando? Isso é decidido pelo código que está sendo aguardado. Por exemplo, uma implementação aguardada de leitura de soquete pode ser desugared para:

def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except EWOULDBLOCK:
        event_loop.add_reader(sock.fileno, current_task())
        return SUSPEND

No assíncio real, o código equivalente modifica o estado de a em Futurevez de retornar valores mágicos, mas o conceito é o mesmo. Quando apropriadamente adaptado a um objeto do tipo gerador, o código acima pode ser awaiteditado.

Do lado do chamador, quando sua co-rotina contém:

data = await read(sock, 1024)

Ele se transforma em algo próximo a:

data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)

Pessoas familiarizadas com geradores tendem a descrever o que precede em termos de yield fromqual faz a suspensão automaticamente.

A cadeia de suspensão continua todo o caminho até o loop de evento, que percebe que a co-rotina está suspensa, remove-a do conjunto executável e continua a executar corrotinas executáveis, se houver. Se nenhuma co-rotina puder ser executada, o loop espera select()até que um descritor de arquivo no qual uma co-rotina esteja interessada esteja pronto para E / S. (O loop de eventos mantém um mapeamento de descritor de arquivo para corrotina.)

No exemplo acima, uma vez que select()diz ao loop de eventos que socké legível, ele será adicionado novamente coroao conjunto executável, de forma que continuará a partir do ponto de suspensão.

Em outras palavras:

  1. Tudo acontece no mesmo segmento por padrão.

  2. O loop de eventos é responsável por programar as corrotinas e despertá-las quando o que quer que estejam esperando (normalmente uma chamada IO que normalmente seria bloqueada ou um tempo limite) estiver pronto.

Para insights sobre os loops de evento de direção de corrotina, recomendo esta palestra de Dave Beazley, onde ele demonstra a codificação de um loop de evento do zero na frente do público ao vivo.

user4815162342
fonte
Obrigado, isso é mais próximo do que estou procurando, mas, isso ainda não explica por async.wait_for()que não faz o que deveria ... Por que é um problema tão grande adicionar um retorno de chamada ao loop de evento e informá-lo processar quantos retornos de chamada forem necessários, incluindo o que você acabou de adicionar? Minha frustração com asyncioé em parte devido ao fato de que o conceito subjacente é muito simples e, por exemplo, o Emacs Lisp teve implementação por muito tempo, sem usar chavões ... (ou seja create-async-processe accept-process-output- e isso é tudo o que é necessário ... (cont.)
wvxvw
10
@wvxvw Fiz o máximo que pude para responder à pergunta que você postou, o máximo possível, visto que apenas o último parágrafo contém seis perguntas. E assim continuamos - não é que wait_for não faça o que deveria (faz, é uma corrotina que você deve esperar), é que suas expectativas não correspondem ao que o sistema foi projetado e implementado para fazer. Acho que seu problema poderia ser comparado a asyncio se o loop de eventos estivesse em execução em um thread separado, mas não sei os detalhes do seu caso de uso e, honestamente, sua atitude não torna muito divertido ajudá-lo.
user4815162342
5
@wvxvw My frustration with asyncio is in part due to the fact that the underlying concept is very simple, and, for example, Emacs Lisp had implementation for ages, without using buzzwords...- Nada impede você de implementar este conceito simples sem chavões para o Python, então :) Por que você usa esse asyncio feio afinal? Implemente o seu do zero. Por exemplo, você pode começar criando sua própria async.wait_for()função que faz exatamente o que deveria.
Mikhail Gerasimov
1
@MikhailGerasimov você parece pensar que é uma pergunta retórica. Mas, eu gostaria de dissipar o mistério para você. A linguagem é projetada para falar com os outros. Não posso escolher para os outros a língua que falam, mesmo que acredite que a língua que falam seja lixo, o melhor que posso fazer é tentar convencê-los de que é esse o caso. Em outras palavras, se eu fosse livre para escolher, nunca escolheria Python para começar, muito menos asyncio. Mas, em princípio, essa decisão não é minha. Sou coagido a usar linguagem lixo por meio de en.wikipedia.org/wiki/Ultimatum_game .
wvxvw
4

Tudo se resume aos dois principais desafios que asyncio está enfrentando:

  • Como realizar I / O múltiplo em um único thread?
  • Como implementar multitarefa cooperativa?

A resposta ao primeiro ponto já existe há muito tempo e é chamada de loop de seleção . Em python, é implementado no módulo seletores .

A segunda questão está relacionada ao conceito de co-rotina , ou seja, funções que podem interromper sua execução e ser restauradas posteriormente. Em python, as corrotinas são implementadas usando geradores e o rendimento da instrução. Isso é o que se esconde por trás da sintaxe async / await .

Mais recursos nesta resposta .


EDIT: Endereçando seu comentário sobre goroutines:

O equivalente mais próximo de uma goroutina em asyncio não é, na verdade, uma co-rotina, mas uma tarefa (veja a diferença na documentação ). Em python, uma co-rotina (ou gerador) não sabe nada sobre os conceitos de loop de evento ou I / O. É simplesmente uma função que pode interromper sua execução yieldenquanto mantém seu estado atual, para que possa ser restaurada posteriormente. A yield fromsintaxe permite encadea-los de forma transparente.

Agora, dentro de uma tarefa de assíncio, a co-rotina na base da cadeia sempre acaba rendendo um futuro . Esse futuro então borbulha no ciclo de eventos e é integrado ao mecanismo interno. Quando o futuro é definido como concluído por algum outro retorno de chamada interno, o loop de eventos pode restaurar a tarefa enviando o futuro de volta à cadeia de co-rotina.


EDIT: Resolvendo algumas das perguntas em sua postagem:

Como o I / O realmente acontece neste cenário? Em um tópico separado? Todo o interpretador está suspenso e o I / O ocorre fora do interpretador?

Não, nada acontece em um tópico. A E / S é sempre gerenciada pelo loop de eventos, principalmente por meio de descritores de arquivo. No entanto, o registro desses descritores de arquivo é geralmente oculto por corrotinas de alto nível, fazendo o trabalho sujo para você.

O que exatamente significa I / O? Se meu procedimento python chamou procedimento C open () e, por sua vez, enviou interrupção para o kernel, cedendo o controle a ele, como o interpretador Python sabe sobre isso e é capaz de continuar executando algum outro código, enquanto o código do kernel faz o real I / O e até que acorde o procedimento Python que enviou a interrupção originalmente? Como pode o interpretador Python, em princípio, estar ciente de que isso está acontecendo?

Uma E / S é qualquer chamada de bloqueio. No assíncio, todas as operações de I / O devem passar pelo loop de eventos, pois como você disse, o loop de eventos não tem como saber que uma chamada de bloqueio está sendo realizada em algum código síncrono. Isso significa que você não deve usar um síncrono openno contexto de uma co-rotina. Em vez disso, use uma biblioteca dedicada, como aiofiles, que fornece uma versão assíncrona de open.

Vincent
fonte
Dizer que as corrotinas são implementadas usando yield fromnão diz nada. yield fromé apenas uma construção de sintaxe, não é um bloco de construção fundamental que os computadores podem executar. Da mesma forma, para selecionar o loop. Sim, as corrotinas em Go também usam loop de seleção, mas o que eu estava tentando fazer funcionaria em Go, mas não em Python. Preciso de respostas mais detalhadas para entender por que não funcionou.
wvxvw
Desculpe ... não, na verdade não. "futuro", "tarefa", "forma transparente", "rendimento de" são apenas chavões, não são objetos do domínio da programação. a programação tem variáveis, procedimentos e estruturas. Portanto, dizer que "goroutine é uma tarefa" é apenas uma declaração circular que levanta uma questão. No final das contas, uma explicação do que asyncio, para mim, se resumiria a um código C que ilustra o que a sintaxe do Python foi traduzida.
wvxvw
Para explicar melhor por que sua resposta não responde à minha pergunta: com todas as informações que você forneceu, não tenho ideia de por que minha tentativa a partir do código que postei na pergunta vinculada não funcionou. Estou absolutamente certo de que poderia escrever o loop de eventos de forma que esse código funcionasse. Na verdade, seria assim que eu escreveria um loop de eventos, se tivesse que escrever um.
wvxvw
7
@wvxvw eu discordo. Esses não são "chavões", mas conceitos de alto nível que foram implementados em muitas bibliotecas. Por exemplo, uma tarefa assíncio, um gevent greenlet e uma goroutine correspondem todos à mesma coisa: uma unidade de execução que pode ser executada simultaneamente em um único thread. Além disso, não acho que C seja necessário para entender asyncio, a menos que você queira entrar no funcionamento interno dos geradores python.
Vincent
@wvxvw Veja minha segunda edição. Isso deve eliminar alguns equívocos do caminho.
Vincent