Como você testa a unidade de uma tarefa de aipo?

Respostas:

61

É possível testar tarefas de forma síncrona usando qualquer biblioteca de teste de unidade disponível. Normalmente faço 2 sessões de teste diferentes ao trabalhar com tarefas de aipo. O primeiro (como estou sugerindo abaixo) é completamente síncrono e deve ser aquele que garante que o algoritmo faça o que deve fazer. A segunda sessão usa todo o sistema (incluindo o corretor) e garante que não estou tendo problemas de serialização ou qualquer outro problema de distribuição ou comunicação.

Assim:

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y

E seu teste:

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)

Espero que ajude!

FlaPer87
fonte
1
Isso funciona, exceto em tarefas que usam um HttpDispatchTask - docs.celeryproject.org/en/latest/userguide/remote-tasks.html onde eu tenho que definir celery.conf.CELERY_ALWAYS_EAGER = True, mas mesmo com a configuração celery.conf.CELERY_IMPORTS = ('celery.task.http') o teste falha com NotRegistered: celery.task.http.HttpDispatchTask
davidmytton
Estranho, tem certeza de que não está tendo problemas de importação? Este teste funciona (observe que estou falsificando a resposta para que ele retorne o que o aipo espera). Além disso, os módulos definidos em CELERY_IMPORTS serão importados durante a inicialização dos workers , para evitar isso sugiro que você chame celery.loader.import_default_modules().
FlaPer87 de
Eu também sugiro que você dê uma olhada aqui . Ele zomba da solicitação http. Não sei se isso ajuda, acho que você quer testar um serviço que está instalado e funcionando, não é?
FlaPer87 de
52

Eu uso isso:

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
    ...

Documentos: http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager

CELERY_ALWAYS_EAGER permite que você execute sua tarefa de forma síncrona e você não precisa de um servidor de celery.

Guettli
fonte
1
Eu acho que isso está desatualizado - eu entendo ImportError: No module named celeryconfig.
Daenyth,
7
Acredito que acima assume que o módulo celeryconfig.pyexiste em um pacote. Consulte docs.celeryproject.org/en/latest/getting-started/… .
Kamil Sindi
1
Eu sei que é antigo, mas você pode fornecer um exemplo completo de como iniciar as tarefas addda pergunta do OP em uma TestCaseclasse?
Kruupös
@ MaxChrétien, desculpe, não posso fornecer um exemplo completo, pois não uso mais aipo. Você pode editar minha pergunta, se tiver pontos de reputação suficientes. Se você não tiver o suficiente, diga o que devo copiar e colar nesta resposta.
guettli
1
@ miken32 obrigado. Como a resposta mais recente de alguma forma resolve o problema com o qual eu queria ajudar, acabei de deixar um comentário de que a documentação oficial do 4.0 desencoraja o uso CELERY_TASK_ALWAYS_EAGERpara testes de unidade.
Krassowski
33

Depende do que exatamente você deseja testar.

  • Teste o código da tarefa diretamente. Não chame "task.delay (...)" apenas chame "task (...)" de seus testes de unidade.
  • Use CELERY_ALWAYS_EAGER . Isso fará com que suas tarefas sejam chamadas imediatamente no ponto em que você disser "task.delay (...)", para que você possa testar todo o caminho (mas não qualquer comportamento assíncrono).
slacy
fonte
24

teste de unidade

import unittest

from myproject.myapp import celeryapp

class TestMyCeleryWorker(unittest.TestCase):

  def setUp(self):
      celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)

py.test fixtures

# conftest.py
from myproject.myapp import celeryapp

@pytest.fixture(scope='module')
def celery_app(request):
    celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
    return celeryapp

# test_tasks.py
def test_some_task(celery_app):
    ...

Adendo: faça com que send_task respeite ansioso

from celery import current_app

def send_task(name, args=(), kwargs={}, **opts):
    # https://github.com/celery/celery/issues/581
    task = current_app.tasks[name]
    return task.apply(args, kwargs, **opts)

current_app.send_task = send_task
Kamil Sindi
fonte
22

Para quem está no Celery 4, é:

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)

Como os nomes das configurações foram alterados e precisam ser atualizados se você optar por atualizar, consulte

https://docs.celeryproject.org/en/latest/history/whatsnew-4.0.html?highlight=what%20is%20new#lowercase-setting-names

okrutny
fonte
11
De acordo com os documentos oficiais , o uso de "task_always_eager" (anterior "CELERY_ALWAYS_EAGER") não é adequado para teste de unidade. Em vez disso, eles propõem algumas outras ótimas maneiras de testar a unidade do seu aplicativo Celery.
Krassowski
4
Vou apenas acrescentar que a razão pela qual você não quer tarefas ansiosas em seus testes de unidade é porque você não está testando, por exemplo, a serialização de parâmetros que acontecerá quando você estiver usando o código em produção.
maldito
15

A partir do Celery 3.0 , uma maneira de definir CELERY_ALWAYS_EAGERno Django é:

from django.test import TestCase, override_settings

from .foo import foo_celery_task

class MyTest(TestCase):

    @override_settings(CELERY_ALWAYS_EAGER=True)
    def test_foo(self):
        self.assertTrue(foo_celery_task.delay())
Aaron Lelevier
fonte
7

Desde o Celery v4.0 , os acessórios py.test são fornecidos para iniciar um trabalhador de aipo apenas para o teste e são desligados quando concluído:

def test_myfunc_is_executed(celery_session_worker):
    # celery_session_worker: <Worker: [email protected] (running)>
    assert myfunc.delay().wait(3)

Entre outros acessórios descritos em http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test , você pode alterar as opções padrão de aipo redefinindo o celery_configacessório desta forma:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'accept_content': ['json', 'pickle'],
        'result_serializer': 'pickle',
    }

Por padrão, o trabalhador de teste usa um broker na memória e back-end de resultado. Não há necessidade de usar um Redis ou RabbitMQ local se não estiver testando recursos específicos.

Alanjds
fonte
Caro downvoter, você gostaria de compartilhar por que essa é uma resposta ruim? Sinceramente obrigado.
alanjds
2
Não funcionou para mim, o conjunto de testes apenas trava. Você poderia fornecer mais algum contexto? (Eu não votei ainda;)).
dualidade_
No meu caso, tive que definir explicitamente o fixture celey_config para usar o corretor de memória e o back
sanzoghenzo
5

referência usando pytest.

def test_add(celery_worker):
    mytask.delay()

se você usar o flask, defina a configuração do aplicativo

    CELERY_BROKER_URL = 'memory://'
    CELERY_RESULT_BACKEND = 'cache+memory://'

e em conftest.py

@pytest.fixture
def app():
    yield app   # Your actual Flask application

@pytest.fixture
def celery_app(app):
    from celery.contrib.testing import tasks   # need it
    yield celery_app    # Your actual Flask-Celery application
Yoge
fonte
2

No meu caso (e suponho que muitos outros), tudo que eu queria era testar a lógica interna de uma tarefa usando o pytest.

TL; DR; acabou zombando de tudo ( OPÇÃO 2 )


Exemplo de caso de uso :

proj/tasks.py

@shared_task(bind=True)
def add_task(self, a, b):
    return a+b;

tests/test_tasks.py

from proj import add_task

def test_add():
    assert add_task(1, 2) == 3, '1 + 2 should equal 3'

mas desde shared_task decorador faz muita lógica interna de aipo, não é realmente um teste de unidade.

Então, para mim, havia 2 opções:

OPÇÃO 1: lógica interna separada

proj/tasks_logic.py

def internal_add(a, b):
    return a + b;

proj/tasks.py

from .tasks_logic import internal_add

@shared_task(bind=True)
def add_task(self, a, b):
    return internal_add(a, b);

Isso parece muito estranho e, além de torná-lo menos legível, é necessário extrair e transmitir manualmente os atributos que fazem parte da solicitação, por exemplo, task_id , caso você precise, o que torna a lógica menos pura.

OPÇÃO 2:
zombando de partes internas de aipo

tests/__init__.py

# noinspection PyUnresolvedReferences
from celery import shared_task

from mock import patch


def mock_signature(**kwargs):
    return {}


def mocked_shared_task(*decorator_args, **decorator_kwargs):
    def mocked_shared_decorator(func):
        func.signature = func.si = func.s = mock_signature
        return func

    return mocked_shared_decorator

patch('celery.shared_task', mocked_shared_task).start()

que me permite simular o objeto de solicitação (novamente, no caso de você precisar de coisas da solicitação, como o id ou o contador de novas tentativas.

tests/test_tasks.py

from proj import add_task

class MockedRequest:
    def __init__(self, id=None):
        self.id = id or 1


class MockedTask:
    def __init__(self, id=None):
        self.request = MockedRequest(id=id)


def test_add():
    mocked_task = MockedTask(id=3)
    assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'

Esta solução é muito mais manual, mas me dá o controle que preciso para testar de fato a unidade , sem me repetir e sem perder o alcance do aipo.

Daniel Dubovski
fonte