Como verificar o status da tarefa no Celery?

91

Como verificar se uma tarefa está sendo executada no aipo (especificamente, estou usando o celery-django)?

Eu li a documentação e pesquisei, mas não consigo ver uma chamada como:

my_example_task.state() == RUNNING

Meu caso de uso é que tenho um serviço externo (java) para transcodificação. Quando envio um documento para ser transcodificado, quero verificar se a tarefa que executa esse serviço está em execução e, caso não esteja, (re) iniciá-la.

Estou usando as versões estáveis ​​atuais - 2.4, acredito.

Marcin
fonte

Respostas:

97

Retorne o task_id (que é fornecido a partir de .delay ()) e pergunte à instância de aipo depois sobre o estado:

x = method.delay(1,2)
print x.task_id

Ao perguntar, obtenha um novo AsyncResult usando este task_id:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
Gregor
fonte
10
Obrigado, mas e se eu não tiver acesso a x?
Marcin
4
Onde você enfileira seus empregos com aipo? Lá você deve retornar o task_id para rastrear o trabalho no futuro.
Gregor
Ao contrário de @Marcin, esta resposta não usa o método estático Task.AsyncResult () como a fábrica do AsyncResult, que reutiliza a configuração de back-end, caso contrário, um erro é gerado ao tentar obter o resultado.
ArnauOrriols
2
@Chris A controvérsia com o código @gregor está na instanciação de async_result. No seu caso de uso, você já tem a instância, está pronto para prosseguir. Mas o que acontecerá se você tiver apenas o ID da tarefa e precisar instanciar uma async_resultinstância para poder fazer a chamada async_result.get()? Esta é uma instância da AsyncResultclasse, mas você não pode usar a classe bruta celery.result.AsyncResult, você precisa obter a classe da função envolvida por app.task(). No seu caso, você fariaasync_result = run_instance.AsyncResult('task-id')
ArnauOrriols
1
but you cannot use the raw class celery.result.AsyncResult, you need to get the class from the function wrapped by app.task(). - Acho que é assim que deveria ser usado. Leia o código: github.com/celery/celery/blob/…
nevelis
69

Criar um AsyncResultobjeto a partir do id da tarefa é a forma recomendada no FAQ para obter o status da tarefa quando a única coisa que você tem é o id da tarefa.

No entanto, a partir do Celery 3.x, existem advertências significativas que podem morder as pessoas se não derem atenção a elas. Realmente depende do cenário de caso de uso específico.

Por padrão, o Celery não registra um estado de "execução".

Para que o Celery registre que uma tarefa está em execução, você deve definir task_track_startedcomo True. Aqui está uma tarefa simples que testa isso:

@app.task(bind=True)
def test(self):
    print self.AsyncResult(self.request.id).state

Quando task_track_startedé False, que é o padrão, a exibição do estado é PENDINGmesmo que a tarefa tenha sido iniciada. Se você definir task_track_startedcomo True, o estado será STARTED.

O estado PENDINGsignifica "Não sei".

Um AsyncResultcom o estado PENDINGnão significa nada mais do que o fato de que Celery não conhece o status da tarefa. Isso pode ser por uma série de razões.

Por um lado, AsyncResultpode ser construído com ids de tarefa inválidos. Essas "tarefas" serão consideradas pendentes pelo Celery:

>>> task.AsyncResult("invalid").status
'PENDING'

Ok, então ninguém vai alimentar ids obviamente inválidos para AsyncResult. É justo, mas também tem efeito que AsyncResulttambém considerará uma tarefa que foi executada com sucesso, mas que o aipo esqueceu como sendo PENDING. Novamente, em alguns cenários de casos de uso, isso pode ser um problema. Parte do problema gira em torno de como o Celery está configurado para manter os resultados das tarefas, porque depende da disponibilidade das "marcas para exclusão" no backend de resultados. ("Tombstones" é o termo usado na documentação do Celery para os blocos de dados que registram como a tarefa terminou.) O uso AsyncResultnão funcionará se task_ignore_resultfor True. Um problema mais irritante é que o Celery expira as marcas de exclusão por padrão. oresult_expiresconfiguração padrão é 24 horas. Portanto, se você iniciar uma tarefa e registrar a id no armazenamento de longo prazo, e mais 24 horas depois, criar um AsyncResultcom ela, o status será PENDING.

Todas as "tarefas reais" começam no PENDINGestado. Portanto, realizar PENDINGuma tarefa pode significar que a tarefa foi solicitada, mas nunca avançou além disso (por qualquer motivo). Ou pode significar que a tarefa foi executada, mas Celery esqueceu seu estado.

Ai! AsyncResultnão vai funcionar para mim. O que mais eu posso fazer?

Prefiro controlar as metas do que as tarefas em si . Eu mantenho algumas informações sobre as tarefas, mas elas são secundárias para manter o controle dos objetivos. As metas são armazenadas em armazenamento independente do Aipo. Quando uma solicitação precisa realizar um cálculo depende de algum objetivo ter sido alcançado, ele verifica se o objetivo já foi alcançado, se sim, então ele usa este objetivo em cache, caso contrário, ele inicia a tarefa que afetará o objetivo e envia para o cliente que fez a solicitação HTTP uma resposta que indica que ele deve esperar por um resultado.


Os nomes de variáveis ​​e hiperlinks acima são para Celery 4.x. Em 3.x as variáveis e hiperlinks correspondentes são: CELERY_TRACK_STARTED, CELERY_IGNORE_RESULT, CELERY_TASK_RESULT_EXPIRES.

Louis
fonte
Portanto, se eu quiser verificar o resultado mais tarde (talvez até dentro de outro processo), estou melhor com minha própria implementação? Armazenando o resultado no banco de dados manualmente?
Franklin Yu
Sim, eu separaria o controle de "meta" de "tarefas". Escrevi "realizar um cálculo que depende de algum objetivo". Normalmente, o "objetivo" também é um cálculo. Por exemplo, se eu quiser mostrar o artigo X para um usuário, devo convertê-lo de XML para HTML, mas antes disso, devo ter resolvido todas as referências bibliográficas. (X é como um artigo de jornal.) Eu verifico se a meta "artigo X com todas as referências bibliográficas resolvidas" existe e uso-a em vez de tentar verificar o status da tarefa de uma tarefa de Celery que teria calculado a meta que desejo.
Louis
E a informação "artigo X com todas as referências bibliográficas resolvidas" é armazenada em um cache de memória e armazenada em um banco de dados eXist-db.
Louis
61

Todo Taskobjeto possui uma .requestpropriedade, que o contém AsyncRequest. Consequentemente, a linha a seguir fornece o estado de uma Tarefa task:

task.AsyncResult(task.request.id).state
Marcin
fonte
2
Existe uma maneira de armazenar a porcentagem de progresso de uma tarefa?
patrick
4
Quando faço isso, obtenho um AsyncResult permanentemente PENDENTE, mesmo se esperar o tempo suficiente para a tarefa terminar. Existe uma maneira de fazer isso ver mudanças de estado? Acredito que meu back-end está configurado e tentei definir CELERY_TRACK_STARTED = True sem sucesso.
dstromberg
1
@dstromberg Infelizmente, já se passaram 4 anos desde que isso foi um problema para mim, então não posso ajudar. É quase certo que você precisa configurar o aipo para rastrear o status.
Marcin
16

Você também pode criar estados personalizados e atualizar seu valor na execução da tarefa. Este exemplo é do docs:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states

msangel
fonte
11

Pergunta antiga, mas recentemente me deparei com esse problema.

Se você está tentando obter o task_id, pode fazer assim:

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

Agora você sabe exatamente o que é task_id e pode usá-lo para obter o AsyncResult:

# grab the AsyncResult 
result = celery.result.AsyncResult(task_id)

# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print result.status
SUCCESS

# print the result returned 
print result.result
4
Cesar Rios
fonte
3
Não há absolutamente nenhuma necessidade de criar seu próprio ID de tarefa e passá-lo para apply_async. O objeto retornado por apply_async é um AsyncResultobjeto, que possui o id da tarefa que o Celery gerou.
Louis
1
Corrija-me se eu estiver errado, mas às vezes não é útil gerar um UUID com base em algumas entradas, de modo que todas as chamadas que obtêm as mesmas entradas obtenham o mesmo UUID? IOW, talvez às vezes seja útil especificar seu task_id.
dstromberg
1
@dstromberg A pergunta feita pelo OP é "como eu verifico o status da tarefa" e a resposta aqui é "Se você está tentando obter o task_id ...". Nem a verificação do status da tarefa, nem a obtenção de task_idexigir que você mesmo gere um ID de tarefa. Em seu comentário, você imaginou um motivo que vai além de "como faço para verificar o status da tarefa" e "Se você está tentando obter o task_id ...` Ótimo se você tiver essa necessidade, mas não é o caso aqui. (Além disso, usar uuid()para gerar um id de tarefa não faz absolutamente nada além do que o Celery faz por padrão.)
Louis
Concordo que o OP não perguntou especificamente como obter IDs de tarefas previsíveis, mas a resposta à pergunta do OP atualmente é "rastrear o ID da tarefa e fazer x". Parece-me que rastrear o ID da tarefa é impraticável em uma ampla variedade de situações, de modo que a resposta pode não ser realmente satisfatória. Essa resposta me ajuda a resolver meu caso de uso (se eu puder superar outras limitações observadas) pelo mesmo motivo que @dstromberg aponta - seja ou não motivado por esse motivo.
claytond
6

Basta usar esta API do FAQ do aipo

result = app.AsyncResult(task_id)

Isso funciona bem.

David Ding
fonte
1

Resposta de 2020:

#### tasks.py
@celery.task()
def mytask(arg1):
    print(arg1)

#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
    process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
    state = process.state
    return f"Thanks for your patience, your job {process.task_id} \
             is being processed. Status {state}"
Adrian Garcia Moreno
fonte
0

Experimentar:

task.AsyncResult(task.request.id).state

isso fornecerá o status da Tarefa de aipo. Se a tarefa de aipo já estiver no estado de FALHA, ela lançará uma exceção:

raised unexpected: KeyError('exc_type',)

gogasca
fonte
0

Encontrei informações úteis no

Guia dos Trabalhadores do Projeto de Aipo trabalhadores-inspetores

No meu caso, estou verificando se o Celery está funcionando.

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
    state = 'FAILURE'
else:
    state = str(task.state) 

Você pode brincar com inspecionar para atender às suas necessidades.

zerocog
fonte
0
  • Primeiro, em seu aplicativo de aipo:

vi my_celery_apps / app1.py

app = Celery(worker_name)
  • e, em seguida, mude para o arquivo de tarefa, importe o aplicativo do seu módulo de aplicativo aipo.

vi tasks / task1.py

from my_celery_apps.app1 import app

app.AsyncResult(taskid)

try:
   if task.state.lower() != "success":
        return
except:
    """ do something """

Você ZhengChuan
fonte
-1

Além da abordagem programática acima, o status da Tarefa de flores pode ser facilmente visto.

Monitoramento em tempo real usando Celery Events. Flower é uma ferramenta baseada na web para monitorar e administrar clusters de Celery.

  1. Progresso e histórico da tarefa
  2. Capacidade de mostrar detalhes da tarefa (argumentos, hora de início, tempo de execução e mais)
  3. Gráficos e estatísticas

Documento oficial: Flor - ferramenta de monitoramento de aipo

Instalação:

$ pip install flower

Uso:

http://localhost:5555
Roshan Bagdiya
fonte
-1
res = method.delay()
    
print(f"id={res.id}, state={res.state}, status={res.status} ")

print(res.get())
Saurabh I
fonte
2
Por favor, não poste apenas código como resposta, mas também forneça uma explicação sobre o que seu código faz e como ele resolve o problema da questão. As respostas com uma explicação geralmente são mais úteis e de melhor qualidade, e têm maior probabilidade de atrair votos positivos.
Mark Rotteveel