Problema
Existe alguma maneira no Airflow de criar um fluxo de trabalho de forma que o número de tarefas B. * seja desconhecido até a conclusão da Tarefa A? Eu olhei para subdags, mas parece que só pode funcionar com um conjunto estático de tarefas que devem ser determinadas na criação do Dag.
Os gatilhos dag funcionariam? E se sim, você poderia dar um exemplo.
Eu tenho um problema em que é impossível saber o número de tarefas B que serão necessárias para calcular a Tarefa C até que a Tarefa A seja concluída. Cada Tarefa B. * levará várias horas para ser computada e não pode ser combinada.
|---> Task B.1 --|
|---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
| .... |
|---> Task B.N --|
Idéia # 1
Não gosto desta solução porque tenho que criar um ExternalTaskSensor bloqueador e toda a Tarefa B. * levará entre 2 e 24 horas para ser concluída. Portanto, não considero essa solução viável. Certamente existe uma maneira mais fácil? Ou o Airflow não foi projetado para isso?
Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C
Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
|-- Task B.1 --|
|-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
| .... |
|-- Task B.N --|
Editar 1:
No momento, essa pergunta ainda não tem uma boa resposta . Tenho sido contactado por várias pessoas à procura de uma solução.
Respostas:
Aqui está como fiz isso com uma solicitação semelhante sem subdags:
Primeiro crie um método que retorna todos os valores que você deseja
Em seguida, crie o método que irá gerar os trabalhos dinamicamente:
E então combine-os:
fonte
for i in values_function()
eu esperaria algo parecidofor i in push_func_output
. O problema é que não consigo encontrar uma maneira de obter essa saída dinamicamente. A saída do PythonOperator estará no Xcom após a execução, mas não sei se posso fazer referência a ele na definição do DAG.group
função?Eu descobri uma maneira de criar fluxos de trabalho com base no resultado de tarefas anteriores.
Basicamente, o que você deseja fazer é ter duas subdags com o seguinte:
def return_list()
)parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]
), provavelmente seria possível adicionar mais filtros aqui.dag_id='%s.%s' % (parent_dag_name, 'test1')
Agora eu testei isso na minha instalação de fluxo de ar local e funciona bem. Não sei se a parte xcom pull terá problemas se houver mais de uma instância do dag em execução ao mesmo tempo, mas provavelmente você usaria uma chave única ou algo assim para identificar exclusivamente o xcom valor que você deseja. Provavelmente, seria possível otimizar o passo 3. para ter 100% de certeza de obter uma tarefa específica do dag principal atual, mas para meu uso isso funciona bem o suficiente, acho que só é necessário um objeto task_instance para usar xcom_pull.
Além disso, eu limpo os xcoms para a primeira subdag antes de cada execução, apenas para ter certeza de que não recebo nenhum valor errado acidentalmente.
Sou muito ruim em explicar, então espero que o código a seguir deixe tudo claro:
test1.py
test2.py
e o fluxo de trabalho principal:
test.py
fonte
_ _init_ _.py
à pasta subdags. erro de novatoSim, é possível. Criei um DAG de exemplo que demonstra isso.
Antes de executar o DAG, crie essas três variáveis de fluxo de ar
Você verá que o DAG parte deste
Para isso depois de executado
Você pode ver mais informações sobre este DAG em meu artigo sobre a criação de fluxos de trabalho dinâmicos no Airflow .
fonte
OA: "Existe alguma maneira no Airflow de criar um fluxo de trabalho de forma que o número de tarefas B. * seja desconhecido até a conclusão da Tarefa A?"
A resposta curta é não. O Airflow criará o fluxo DAG antes de começar a executá-lo.
Dito isso, chegamos a uma conclusão simples, que não temos essa necessidade. Quando você deseja paralelizar algum trabalho, deve avaliar os recursos disponíveis e não o número de itens a processar.
Fizemos assim: geramos dinamicamente um número fixo de tarefas, digamos 10, que dividirão o trabalho. Por exemplo, se precisarmos processar 100 arquivos, cada tarefa processará 10 deles. Vou postar o código mais tarde hoje.
Atualizar
Aqui está o código, desculpe pelo atraso.
Explicação do código:
Aqui temos uma única tarefa inicial e uma única tarefa final (ambas fictícias).
Então, desde a tarefa inicial com o loop for, criamos 10 tarefas com o mesmo python que pode ser chamado. As tarefas são criadas na função create_dynamic_task.
Para cada python que pode ser chamado, passamos como argumentos o número total de tarefas paralelas e o índice da tarefa atual.
Suponha que você tenha 1000 itens para elaborar: a primeira tarefa receberá na entrada que deve elaborar o primeiro bloco de 10 blocos. Ele dividirá os 1000 itens em 10 blocos e elaborará o primeiro.
fonte
parallelTask
não está definida: estou faltando alguma coisa?O que acho que você está procurando é a criação de DAG dinamicamente. Encontrei esse tipo de situação há alguns dias após algumas pesquisas que encontrei neste blog .
Geração de Tarefa Dinâmica
Configurando o fluxo de trabalho DAG
É assim que nosso DAG se parece depois de colocar o código junto
Foi uma grande esperança de ajuda também vai ajudar outra pessoa
fonte
Acho que encontrei uma solução mais agradável para isso em https://github.com/mastak/airflow_multi_dagrun , que usa o enfileiramento simples de DagRuns disparando vários dagruns, semelhante a TriggerDagRuns . A maioria dos créditos vai para https://github.com/mastak , embora eu tenha que corrigir alguns detalhes para que funcione com o fluxo de ar mais recente.
A solução usa um operador personalizado que aciona vários DagRuns :
Você pode então enviar vários dagruns da função chamável em seu PythonOperator, por exemplo:
Eu criei um fork com o código em https://github.com/flinz/airflow_multi_dagrun
fonte
O gráfico de tarefas não é gerado em tempo de execução. Em vez disso, o gráfico é construído quando é obtido pelo Airflow de sua pasta dags. Portanto, não será realmente possível ter um gráfico diferente para o trabalho toda vez que ele for executado. Você pode configurar um trabalho para construir um gráfico baseado em uma consulta no momento do carregamento . Esse gráfico permanecerá o mesmo para cada execução depois disso, o que provavelmente não é muito útil.
Você pode criar um gráfico que executa tarefas diferentes em cada execução com base nos resultados da consulta usando um operador de filial.
O que fiz foi pré-configurar um conjunto de tarefas e, em seguida, pegar os resultados da consulta e distribuí-los pelas tarefas. Isso provavelmente é melhor de qualquer maneira, porque se sua consulta retornar muitos resultados, você provavelmente não vai querer inundar o agendador com muitas tarefas simultâneas de qualquer maneira. Para ficar ainda mais seguro, também usei um pool para garantir que minha simultaneidade não saia do controle com uma consulta inesperadamente grande.
fonte
for tasks in tasks
loop do meu exemplo, excluo o objeto sobre o qual estou iterando. Essa é uma má ideia. Em vez disso, obtenha uma lista das chaves e repita - ou ignore as exclusões. Da mesma forma, se xcom_pull retornar None (em vez de uma lista ou lista vazia), o loop for também falha. Pode-se querer executar o xcom_pull antes do 'for', e então verificar se ele é Nenhum - ou certifique-se de que haja pelo menos uma lista vazia lá. YMMV. Boa sorte!open_order_task
?Não entende qual é o problema?
Aqui está um exemplo padrão. Agora, se na função subdag substituir
for i in range(5):
porfor i in range(random.randint(0, 10)):
, tudo funcionará. Agora imagine que o operador 'start' coloque os dados em um arquivo e, em vez de um valor aleatório, a função lerá esses dados. Então, o operador 'start' afetará o número de tarefas.O problema estará apenas no display na IU, pois ao entrar no subdag, o número de tarefas será igual à última lida do arquivo / banco de dados / XCom no momento. O que automaticamente dá uma restrição em vários lançamentos de um dag ao mesmo tempo.
fonte
Achei este post do Medium que é muito parecido com esta questão. No entanto, ele está cheio de erros de digitação e não funcionou quando tentei implementá-lo.
Minha resposta ao acima é a seguinte:
Se você estiver criando tarefas dinamicamente, deve fazê-lo iterando sobre algo que não foi criado por uma tarefa upstream ou pode ser definido independentemente dessa tarefa. Aprendi que você não pode passar datas de execução ou outras variáveis de fluxo de ar para algo fora de um modelo (por exemplo, uma tarefa) como muitos outros apontaram antes. Veja também este post .
fonte