Cancelar uma tarefa já em execução com o Celery?

91

Tenho lido o documento e pesquisado, mas não consigo encontrar uma resposta direta:

Você pode cancelar uma tarefa já em execução? (como na tarefa foi iniciada, demora um pouco e precisa ser cancelada no meio)

Eu encontrei isso no documento no FAQ do Celery

>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

Mas não estou certo se isso irá cancelar tarefas enfileiradas ou se irá interromper um processo em execução em um trabalhador. Obrigado por qualquer luz que você possa lançar!

dcoffey3296
fonte

Respostas:

179

revoke cancela a execução da tarefa. Se uma tarefa for revogada, os trabalhadores ignoram a tarefa e não a executam. Se você não usar revogações persistentes, sua tarefa pode ser executada após a reinicialização do trabalhador.

http://docs.celeryproject.org/en/latest/userguide/workers.html#worker-persistent-revokes

revoke tem uma opção de término que é False por padrão. Se você precisa matar a tarefa em execução, você precisa definir terminate como True .

>>> from celery.task.control import revoke
>>> revoke(task_id, terminate=True)

http://docs.celeryproject.org/en/latest/userguide/workers.html#revoke-revoking-tasks

mais
fonte
3
Essa é exatamente a explicação que eu procurava, obrigado!
dcoffey3296
1
Isso funciona em um ambiente distribuído? Quero dizer, se eu tiver trabalhadores em várias máquinas executando tarefas. O aipo rastreia em qual máquina a tarefa está sendo executada?
ksrini
1
É verdade. A comunicação com os trabalhadores ocorre por meio do corretor.
mher
4
result.revoke (terminate = True) deve fazer a mesma coisa que revoke (task_id, terminate = True)
CamHart
9
Além disso, usar a opção encerrar é "um último recurso para administradores", conforme documentos recentes do Celery. Você corre o risco de encerrar outra tarefa que foi iniciada recentemente nesse trabalhador.
kouk
38

No Celery 3.1, a API de revogação de tarefas foi alterada.

De acordo com o FAQ de aipo , você deve usar result.revoke:

>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

ou se você tiver apenas o ID da tarefa:

>>> from proj.celery import app
>>> app.control.revoke(task_id)
Rockallite
fonte
25

A resposta de @ 0x00mh está correta, no entanto documentos recentes sobre aipo dizem que usar a terminateopção é " um último recurso para administradores " porque você pode encerrar acidentalmente outra tarefa que começou a ser executada nesse meio tempo. Possivelmente, uma solução melhor é combinar terminate=Truecom signal='SIGUSR1'(o que faz com que a exceção SoftTimeLimitExceeded seja gerada na tarefa).

kouk
fonte
2
Essa solução funcionou muito bem para mim. Quando SoftTimeLimitExceededé gerado em minha tarefa, minha lógica de limpeza personalizada (implementada via try/ except/ finally) é chamada. Isso é muito melhor, na minha opinião, do que o que AbortableTaskoferece ( docs.celeryproject.org/en/latest/reference/… ). Com o último, você precisa de um back - end de resultado do banco de dados e tem que verificar manual e repetidamente o status de uma tarefa em andamento para ver se ela foi abortada.
David Schneider
2
Como isso fica melhor, pelo que eu entendi se houver alguma outra tarefa capturada pelo processo, ela será interrompida de qualquer maneira, apenas uma exceção diferente será lançada.
marxin
Se eu usar worker_prefetch_multiplier = 1desde que tenho apenas algumas tarefas de longa execução, o encerramento deve funcionar - já que nenhuma outra tarefa será efetuada pelo encerramento - entendi corretamente? @spicyramen
maffe
1

Veja as seguintes opções para tarefas: time_limit , soft_time_limit (ou você pode defini-lo para trabalhadores). Se você deseja controlar não apenas o tempo de execução, consulte o argumento expires do método apply_async.

simplylizz
fonte