from celery.app.control importInspect# Inspect all nodes.
i =Inspect()# Show the items that have an ETA or are scheduled for later processing
i.scheduled()# Show tasks that are currently active.
i.active()# Show tasks that have been claimed by workers
i.reserved()
Eu tentei isso, mas é muito lento (como 1 segundo). Estou usando de forma sincrônica em um aplicativo tornado para monitorar o progresso, por isso tem que ser rápido.
JulienFr
41
Isso não retornará uma lista de tarefas na fila que ainda precisam ser processadas.
Ed J
9
Use i.reserved()para obter uma lista de tarefas na fila.
Banana
4
Alguém já experimentou que i.reserved () não terá uma lista precisa de tarefas ativas? Tenho tarefas em execução que não aparecem na lista. Estou no django-aipo == 3.1.10
Seperman
6
Ao especificar o trabalhador Eu tive que usar uma lista como argumento: inspect(['celery@Flatty']). Enorme melhoria na velocidade inspect().
Adversus
42
Se você estiver usando rabbitMQ, use isto no terminal:
sudo rabbitmqctl list_queues
imprimirá uma lista de filas com o número de tarefas pendentes. por exemplo:
Estou familiarizado com isso quando tenho privilégios de sudo, mas quero que um usuário do sistema sem privilégios possa verificar - alguma sugestão?
sage
Além disso, você pode canalizar isso grep -e "^celery\s" | cut -f2para extrair que, 166se quiser processar esse número posteriormente, digamos para estatísticas.
Jamesc #
22
Se você não usa tarefas priorizadas, isso é bastante simples se você estiver usando o Redis. Para que a tarefa conte:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
Porém, as tarefas priorizadas usam uma chave diferente em redis , portanto, a imagem completa é um pouco mais complicada. A imagem completa é que você precisa consultar o redis para todas as prioridades da tarefa. Em python (e do projeto Flower), isso se parece com:
PRIORITY_SEP ='\x06\x16'
DEFAULT_PRIORITY_STEPS =[0,3,6,9]def make_queue_name_for_pri(queue, pri):"""Make a queue name for redis
Celery uses PRIORITY_SEP to separate different priorities of tasks into
different queues in Redis. Each queue-priority combination becomes a key in
redis with names like:
- batch1\x06\x163 <-- P3 queue named batch1
There's more information about this in Github, but it doesn't look like it
will change any time soon:
- https://github.com/celery/kombu/issues/422
In that ticket the code below, from the Flower project, is referenced:
- https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135
:param queue: The name of the queue to make a name for.
:param pri: The priority to make a name with.
:return: A name for the queue-priority pair.
"""if pri notin DEFAULT_PRIORITY_STEPS:raiseValueError('Priority not in priority steps')return'{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri)if pri else(queue,'','')))def get_queue_length(queue_name='celery'):"""Get the number of tasks in a celery queue.
:param queue_name: The name of the queue you want to inspect.
:return: the number of items in the queue.
"""
priority_names =[make_queue_name_for_pri(queue_name, pri)for pri in
DEFAULT_PRIORITY_STEPS]
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],)return sum([r.llen(x)for x in priority_names])
Se você deseja obter uma tarefa real, pode usar algo como:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0-1
A partir daí, você precisará desserializar a lista retornada. No meu caso, consegui fazer isso com algo como:
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],)
l = r.lrange('celery',0,-1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))
Apenas esteja avisado de que a desserialização pode demorar um pouco e você precisará ajustar os comandos acima para trabalhar com várias prioridades.
Atualizei o acima para lidar com tarefas priorizadas. Progresso!
Mlissner
1
Apenas para explicar, o DATABASE_NUMBERusado por padrão é 0e o QUEUE_NAMEé celery, portanto redis-cli -n 0 llen celery, retornará o número de mensagens na fila.
Vineet Bansal
Para o meu aipo, o nome da fila é em '{{{0}}}{1}{2}'vez de '{0}{1}{2}'. Fora isso, isso funciona perfeitamente!
Se você estiver usando o Celery + Django, a maneira mais simples de inspecionar tarefas usando comandos diretamente do seu terminal no seu ambiente virtual ou usando um caminho completo para o aipo:
Se você tem um projeto de definição, pode usarcelery -A my_proj inspect reserved
sashaboulouds
6
Uma solução de copiar e colar para Redis com serialização json:
def get_celery_queue_items(queue_name):import base64
import json
# Get a configured instance of a celery app:from yourproject.celery import app as celery_app
with celery_app.pool.acquire(block=True)as conn:
tasks = conn.default_channel.client.lrange(queue_name,0,-1)
decoded_tasks =[]for task in tasks:
j = json.loads(task)
body = json.loads(base64.b64decode(j['body']))
decoded_tasks.append(body)return decoded_tasks
Funciona com o Django. Só não se esqueça de mudar yourproject.celery.
Se você estiver usando o serializador de pickle, poderá alterar a body =linha para body = pickle.loads(base64.b64decode(j['body'])).
Jim Hunziker
4
O módulo de inspeção de aipo parece estar ciente apenas das tarefas da perspectiva dos trabalhadores. Se você quiser visualizar as mensagens que estão na fila (ainda a serem puxadas pelos trabalhadores), sugiro usar o pyrabbit , que pode interagir com a API api rabbitmq http para recuperar todos os tipos de informações da fila.
Acho que a única maneira de obter as tarefas que estão aguardando é manter uma lista de tarefas iniciadas e deixar que a tarefa se remova da lista quando for iniciada.
Se o que você deseja inclui a tarefa que está sendo processada, mas ainda não foi concluída, você pode manter uma lista de tarefas e verificar seus estados:
from tasks import add
result = add.delay(4,4)
result.ready()# True if finished
Ou deixe o Celery armazenar os resultados com CELERY_RESULT_BACKEND e verifique quais de suas tarefas não estão lá.
@daveoncode Acho que não há informações suficientes para responder de maneira útil. Você pode abrir sua própria pergunta. Eu não acho que seria uma duplicata deste se você especificar que deseja recuperar as informações em python. Eu voltaria para stackoverflow.com/a/19465670/9843399 , que é o que baseei minha resposta, e certifique-se de que funcione primeiro.
Caleb Syring
@CalebSyring Esta é a primeira abordagem que realmente me mostra as tarefas na fila. Muito agradável. O único problema para mim é que a lista anexada parece não funcionar. Alguma idéia de como eu posso fazer a função de retorno de chamada escrever para a lista?
Varlor 07/07
@ Varlor Sinto muito, alguém fez uma edição imprópria da minha resposta. Você pode procurar no histórico de edições a resposta original, que provavelmente funcionará para você. Estou trabalhando para consertar isso. (Edição: Acabei de entrar e rejeitei a edição, que tinha um óbvio erro de python. Deixe-me saber se isso resolveu o seu problema ou não.)
Caleb Syring 07/07
@CalebSyring Agora eu usei seu código em uma classe, tendo a lista como um atributo de classe funciona!
Varlor 8/07
2
Tanto quanto eu sei, o Aipo não fornece API para examinar tarefas que estão aguardando na fila. Isso é específico do broker. Se você usar o Redis como um broker, por exemplo, examinar as tarefas que estão aguardando na celeryfila (padrão) é tão simples quanto:
conectar-se ao banco de dados do broker
listar itens na celerylista (comando LRANGE, por exemplo)
Lembre-se de que essas são tarefas que esperam serem escolhidas pelos trabalhadores disponíveis. Seu cluster pode ter algumas tarefas em execução - elas não estarão nesta lista, pois já foram selecionadas.
Cheguei à conclusão de que a melhor maneira de obter o número de trabalhos em uma fila é usar o rabbitmqctlque foi sugerido várias vezes aqui. Para permitir que qualquer usuário escolhido execute o comando, sudoeu segui as instruções aqui (pulei a edição da parte do perfil, pois não me importo de digitar sudo antes do comando.)
Também peguei o jamesc grepe o cutsnippet e o envolvi em chamadas de subprocesso.
from subprocess importPopen, PIPE
p1 =Popen(["sudo","rabbitmqctl","list_queues","-p","[name of your virtula host"], stdout=PIPE)
p2 =Popen(["grep","-e","^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 =Popen(["cut","-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()print("number of jobs on queue: %i"% int(p3.communicate()[0]))
from celery.task.control import inspect
def key_in_list(k, l):return bool([Truefor i in l if k in i.values()])def check_task(task_id):
task_value_dict = inspect().active().values()for task_list in task_value_dict:if self.key_in_list(task_id, task_list):returnTruereturnFalse
Se você controlar o código das tarefas, poderá solucionar o problema, deixando uma tarefa disparar uma nova tentativa trivial na primeira vez em que for executada e, em seguida, verificando inspect().reserved(). A nova tentativa registra a tarefa com o resultado final e o aipo pode ver isso. A tarefa deve aceitar selfou contextcomo primeiro parâmetro para que possamos acessar a contagem de novas tentativas.
Respostas:
EDIT: consulte outras respostas para obter uma lista de tarefas na fila.
Você deve procurar aqui: Guia do Aipo - Inspecionando Trabalhadores
Basicamente isso:
Dependendo do que você quer
fonte
i.reserved()
para obter uma lista de tarefas na fila.inspect(['celery@Flatty'])
. Enorme melhoria na velocidadeinspect()
.Se você estiver usando rabbitMQ, use isto no terminal:
imprimirá uma lista de filas com o número de tarefas pendentes. por exemplo:
o número na coluna da direita é o número de tarefas na fila. acima, a fila de aipo tem 166 tarefas pendentes.
fonte
grep -e "^celery\s" | cut -f2
para extrair que,166
se quiser processar esse número posteriormente, digamos para estatísticas.Se você não usa tarefas priorizadas, isso é bastante simples se você estiver usando o Redis. Para que a tarefa conte:
Porém, as tarefas priorizadas usam uma chave diferente em redis , portanto, a imagem completa é um pouco mais complicada. A imagem completa é que você precisa consultar o redis para todas as prioridades da tarefa. Em python (e do projeto Flower), isso se parece com:
Se você deseja obter uma tarefa real, pode usar algo como:
A partir daí, você precisará desserializar a lista retornada. No meu caso, consegui fazer isso com algo como:
Apenas esteja avisado de que a desserialização pode demorar um pouco e você precisará ajustar os comandos acima para trabalhar com várias prioridades.
fonte
DATABASE_NUMBER
usado por padrão é0
e oQUEUE_NAME
écelery
, portantoredis-cli -n 0 llen celery
, retornará o número de mensagens na fila.'{{{0}}}{1}{2}'
vez de'{0}{1}{2}'
. Fora isso, isso funciona perfeitamente!Para recuperar tarefas do back-end, use este
fonte
Se você estiver usando o Celery + Django, a maneira mais simples de inspecionar tarefas usando comandos diretamente do seu terminal no seu ambiente virtual ou usando um caminho completo para o aipo:
Doc : http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers
Além disso, se você estiver usando o Aipo + RabbitMQ, poderá inspecionar a lista de filas usando o seguinte comando:
Mais informações : https://linux.die.net/man/1/rabbitmqctl
fonte
celery -A my_proj inspect reserved
Uma solução de copiar e colar para Redis com serialização json:
Funciona com o Django. Só não se esqueça de mudar
yourproject.celery
.fonte
body =
linha parabody = pickle.loads(base64.b64decode(j['body']))
.O módulo de inspeção de aipo parece estar ciente apenas das tarefas da perspectiva dos trabalhadores. Se você quiser visualizar as mensagens que estão na fila (ainda a serem puxadas pelos trabalhadores), sugiro usar o pyrabbit , que pode interagir com a API api rabbitmq http para recuperar todos os tipos de informações da fila.
Um exemplo pode ser encontrado aqui: Recuperar o comprimento da fila com o Aipo (RabbitMQ, Django)
fonte
Acho que a única maneira de obter as tarefas que estão aguardando é manter uma lista de tarefas iniciadas e deixar que a tarefa se remova da lista quando for iniciada.
Com rabbitmqctl e list_queues, você pode obter uma visão geral de quantas tarefas estão aguardando, mas não as tarefas em si: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
Se o que você deseja inclui a tarefa que está sendo processada, mas ainda não foi concluída, você pode manter uma lista de tarefas e verificar seus estados:
Ou deixe o Celery armazenar os resultados com CELERY_RESULT_BACKEND e verifique quais de suas tarefas não estão lá.
fonte
Isso funcionou para mim no meu aplicativo:
active_jobs
será uma lista de cadeias que correspondem às tarefas na fila.Não se esqueça de trocar CELERY_APP_INSTANCE pelos seus.
Obrigado a @ashish por me apontar na direção certa com sua resposta aqui: https://stackoverflow.com/a/19465670/9843399
fonte
jobs
é sempre zero ... alguma ideia?Tanto quanto eu sei, o Aipo não fornece API para examinar tarefas que estão aguardando na fila. Isso é específico do broker. Se você usar o Redis como um broker, por exemplo, examinar as tarefas que estão aguardando na
celery
fila (padrão) é tão simples quanto:celery
lista (comando LRANGE, por exemplo)Lembre-se de que essas são tarefas que esperam serem escolhidas pelos trabalhadores disponíveis. Seu cluster pode ter algumas tarefas em execução - elas não estarão nesta lista, pois já foram selecionadas.
fonte
Cheguei à conclusão de que a melhor maneira de obter o número de trabalhos em uma fila é usar o
rabbitmqctl
que foi sugerido várias vezes aqui. Para permitir que qualquer usuário escolhido execute o comando,sudo
eu segui as instruções aqui (pulei a edição da parte do perfil, pois não me importo de digitar sudo antes do comando.)Também peguei o jamesc
grep
e ocut
snippet e o envolvi em chamadas de subprocesso.fonte
fonte
Se você controlar o código das tarefas, poderá solucionar o problema, deixando uma tarefa disparar uma nova tentativa trivial na primeira vez em que for executada e, em seguida, verificando
inspect().reserved()
. A nova tentativa registra a tarefa com o resultado final e o aipo pode ver isso. A tarefa deve aceitarself
oucontext
como primeiro parâmetro para que possamos acessar a contagem de novas tentativas.Esta solução é independente do corretor, ou seja. você não precisa se preocupar se está usando o RabbitMQ ou o Redis para armazenar as tarefas.
EDIT: após o teste, descobri que isso é apenas uma solução parcial. O tamanho do reservado é limitado à configuração de pré-busca para o trabalhador.
fonte
Com
subprocess.run
:Cuidado para mudar
my_proj
comyour_proj
fonte