Fazendo uma tarefa assíncrona no Flask

96

Estou escrevendo um aplicativo no Flask, que funciona muito bem, exceto que WSGIé síncrono e bloqueador. Tenho uma tarefa em particular que chama uma API de terceiros e essa tarefa pode levar vários minutos para ser concluída. Eu gostaria de fazer essa ligação (na verdade, é uma série de ligações) e deixá-la funcionar. enquanto o controle é devolvido ao Flask.

Minha visão é semelhante a:

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    # do stuff
    return Response(
        mimetype='application/json',
        status=200
    )

Agora, o que eu quero fazer é ter a linha

final_file = audio_class.render_audio()

execute e forneça um retorno de chamada a ser executado quando o método retornar, enquanto o Flask pode continuar a processar as solicitações. Esta é a única tarefa que preciso que o Flask execute de forma assíncrona, e gostaria de alguns conselhos sobre a melhor forma de implementar isso.

Eu olhei para Twisted e Klein, mas não tenho certeza se eles são exageros, já que talvez Threading fosse suficiente. Ou talvez o aipo seja uma boa escolha para isso?

Darwin Tech
fonte
Eu costumo usar aipo para isso ... pode ser um exagero, mas afaik threading não funciona bem em ambientes web (iirc ...)
Joran Beasley
Certo. Sim - eu estava investigando o Celery. Pode ser uma boa abordagem. Fácil de implementar com Flask?
Darwin Tech
heh eu costumo usar um servidor de socket também (flask-socketio) e sim eu achei que era bem fácil ... a parte mais difícil foi instalar tudo
Joran Beasley
4
Eu recomendaria verificar isso . Esse cara escreve ótimos tutoriais para flask em geral, e este é ótimo para entender como integrar tarefas assíncronas em um aplicativo flask.
atlspin em

Respostas:

100

Eu usaria o Celery para lidar com a tarefa assíncrona para você. Você precisará instalar um corretor para servir como sua fila de tarefas (RabbitMQ e Redis são recomendados).

app.py:

from flask import Flask
from celery import Celery

broker_url = 'amqp://guest@localhost'          # Broker URL for RabbitMQ task queue

app = Flask(__name__)    
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig')      # Your celery configurations in a celeryconfig.py

@celery.task(bind=True)
def some_long_task(self, x, y):
    # Do some long task
    ...

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    some_long_task.delay(x, y)                 # Call your async task and pass whatever necessary variables
    return Response(
        mimetype='application/json',
        status=200
    )

Execute seu aplicativo Flask e inicie outro processo para executar seu trabalhador de aipo.

$ celery worker -A app.celery --loglevel=debug

Também gostaria de referir-se a Miguel Gringberg escrever-se para um mais em guia de profundidade para usar aipo com garrafa.

Connie
fonte
34

O threading é outra solução possível. Embora a solução baseada em Celery seja melhor para aplicativos em escala, se você não está esperando muito tráfego no terminal em questão, o encadeamento é uma alternativa viável.

Esta solução é baseada na apresentação PyCon 2016 Flask at Scale de Miguel Grinberg , especificamente o slide 41 em seu conjunto de slides. Seu código também está disponível no github para os interessados ​​no código-fonte original.

Da perspectiva do usuário, o código funciona da seguinte maneira:

  1. Você faz uma chamada para o terminal que executa a tarefa de longa duração.
  2. Este terminal retorna 202 Aceito com um link para verificar o status da tarefa.
  3. As chamadas para o link de status retornam 202 enquanto a taks ainda está em execução e retornam 200 (e o resultado) quando a tarefa é concluída.

Para converter uma chamada de API em uma tarefa em segundo plano, basta adicionar o decorador @async_api.

Aqui está um exemplo completo:

from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid

tasks = {}

app = Flask(__name__)
api = Api(app)


@app.before_first_request
def before_first_request():
    """Start a background thread that cleans up old tasks."""
    def clean_old_tasks():
        """
        This function cleans up old tasks from our in-memory data structure.
        """
        global tasks
        while True:
            # Only keep tasks that are running or that finished less than 5
            # minutes ago.
            five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
            tasks = {task_id: task for task_id, task in tasks.items()
                     if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago}
            time.sleep(60)

    if not current_app.config['TESTING']:
        thread = threading.Thread(target=clean_old_tasks)
        thread.start()


def async_api(wrapped_function):
    @wraps(wrapped_function)
    def new_function(*args, **kwargs):
        def task_call(flask_app, environ):
            # Create a request context similar to that of the original request
            # so that the task can have access to flask.g, flask.request, etc.
            with flask_app.request_context(environ):
                try:
                    tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['return_value'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise
                finally:
                    # We record the time of the response, to help in garbage
                    # collecting old tasks
                    tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow())

                    # close the database session (if any)

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task_thread': threading.Thread(
            target=task_call, args=(current_app._get_current_object(),
                               request.environ))}
        tasks[task_id]['task_thread'].start()

        # Return a 202 response, with a link that the client can use to
        # obtain task status
        print(url_for('gettaskstatus', task_id=task_id))
        return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
    return new_function


class GetTaskStatus(Resource):
    def get(self, task_id):
        """
        Return status about an asynchronous task. If this request returns a 202
        status code, it means that task hasn't finished yet. Else, the response
        from the task is returned.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'return_value' not in task:
            return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
        return task['return_value']


class CatchAll(Resource):
    @async_api
    def get(self, path=''):
        # perform some intensive processing
        print("starting processing task, path: '%s'" % path)
        time.sleep(10)
        print("completed processing task, path: '%s'" % path)
        return f'The answer is: {path}'


api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')


if __name__ == '__main__':
    app.run(debug=True)
Jurgen Strydom
fonte
Quando uso este código, recebo o erro werkzeug.routing.BuildError: Não foi possível construir o url para o endpoint 'gettaskstatus' com os valores ['task_id'] Estou faltando alguma coisa?
Nicolas Dufaur
10

Você também pode tentar usar multiprocessing.Processcom daemon=True; o process.start()método não bloqueia e você pode retornar uma resposta / status imediatamente ao chamador enquanto sua função cara é executada em segundo plano.

Eu experimentei um problema semelhante ao trabalhar com o framework Falcon e usar o daemonprocesso ajudou.

Você precisaria fazer o seguinte:

from multiprocessing import Process

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    heavy_process = Process(  # Create a daemonic process with heavy "my_func"
        target=my_func,
        daemon=True
    )
    heavy_process.start()
    return Response(
        mimetype='application/json',
        status=200
    )

# Define some heavy function
def my_func():
    time.sleep(10)
    print("Process finished")

Você deve obter uma resposta imediatamente e, após 10s, deverá ver uma mensagem impressa no console.

NOTA: Lembre-se de que os daemonicprocessos não podem gerar nenhum processo filho.

Tomasz Bartkowiak
fonte
assíncrono é um certo tipo de simultaneidade que não é threading nem multiprocessamento. Threading é, no entanto, muito mais próximo em propósito como tarefa assíncrona,
tortal
3
Eu não entendo seu ponto. O autor está falando sobre uma tarefa assíncrona, que é a tarefa executada "em segundo plano", de forma que o chamador não bloqueia até obter uma resposta. Gerar um processo deamon é um exemplo de onde tal assincronismo pode ser alcançado.
Tomasz Bartkowiak
e se o /render/<id>endpoint espera algo como resultado my_func()?
Will Gu
Você pode my_funcenviar resposta / pulsação para algum outro terminal, por exemplo. Ou você pode estabelecer e compartilhar alguma fila de mensagens através da qual você pode se comunicar commy_func
Tomasz Bartkowiak