Maneira adequada de criar fluxos de trabalho dinâmicos no Airflow

96

Problema

Existe alguma maneira no Airflow de criar um fluxo de trabalho de forma que o número de tarefas B. * seja desconhecido até a conclusão da Tarefa A? Eu olhei para subdags, mas parece que só pode funcionar com um conjunto estático de tarefas que devem ser determinadas na criação do Dag.

Os gatilhos dag funcionariam? E se sim, você poderia dar um exemplo.

Eu tenho um problema em que é impossível saber o número de tarefas B que serão necessárias para calcular a Tarefa C até que a Tarefa A seja concluída. Cada Tarefa B. * levará várias horas para ser computada e não pode ser combinada.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|

Idéia # 1

Não gosto desta solução porque tenho que criar um ExternalTaskSensor bloqueador e toda a Tarefa B. * levará entre 2 e 24 horas para ser concluída. Portanto, não considero essa solução viável. Certamente existe uma maneira mais fácil? Ou o Airflow não foi projetado para isso?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|

Editar 1:

No momento, essa pergunta ainda não tem uma boa resposta . Tenho sido contactado por várias pessoas à procura de uma solução.

costrouc
fonte
Todas as tarefas B * são semelhantes, no sentido de que podem ser criadas em um loop?
Daniel Lee
Sim, todas as tarefas B. * podem ser criadas rapidamente em um loop assim que a Tarefa A for concluída. A tarefa A leva cerca de 2 horas para ser concluída.
costrouc
Você encontrou uma solução para o problema? você se importaria de postar talvez?
Daniel Dubovski
3
Um recurso útil para a ideia nº 1: linkedin.com/pulse/…
Juan Riaza
1
Aqui está um artigo que escrevi explicando como fazer isso linkedin.com/pulse/dynamic-workflows-airflow-kyle-bridenstine
Kyle Bridenstine

Respostas:

30

Aqui está como fiz isso com uma solicitação semelhante sem subdags:

Primeiro crie um método que retorna todos os valores que você deseja

def values_function():
     return values

Em seguida, crie o método que irá gerar os trabalhos dinamicamente:

def group(number, **kwargs):
        #load the values if needed in the command you plan to execute
        dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
        return BashOperator(
                task_id='JOB_NAME_{}'.format(number),
                bash_command='script.sh {} {}'.format(dyn_value, number),
                dag=dag)

E então combine-os:

push_func = PythonOperator(
        task_id='push_func',
        provide_context=True,
        python_callable=values_function,
        dag=dag)

complete = DummyOperator(
        task_id='All_jobs_completed',
        dag=dag)

for i in values_function():
        push_func >> group(i) >> complete
Oleg Yamin
fonte
Onde os valores são definidos?
monksy
11
Em vez de, for i in values_function()eu esperaria algo parecido for i in push_func_output. O problema é que não consigo encontrar uma maneira de obter essa saída dinamicamente. A saída do PythonOperator estará no Xcom após a execução, mas não sei se posso fazer referência a ele na definição do DAG.
Ena
@Ena Você encontrou uma maneira de conseguir isso?
eldos de
1
@eldos veja minha resposta abaixo
Ena
1
E se tivéssemos que executar uma série de etapas dependentes de etapas dentro do loop? Haveria uma segunda cadeia de dependência dentro da groupfunção?
CodingInCircles de
12

Eu descobri uma maneira de criar fluxos de trabalho com base no resultado de tarefas anteriores.
Basicamente, o que você deseja fazer é ter duas subdags com o seguinte:

  1. Xcom envia uma lista (ou o que for necessário para criar o fluxo de trabalho dinâmico posteriormente) no subdag que é executado primeiro (consulte test1.py def return_list())
  2. Passe o objeto dag principal como um parâmetro para o seu segundo subdag
  3. Agora, se você tiver o objeto dag principal, poderá usá-lo para obter uma lista de suas instâncias de tarefa. A partir dessa lista de instâncias de tarefas, você pode filtrar uma tarefa da execução atual usando parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]), provavelmente seria possível adicionar mais filtros aqui.
  4. Com essa instância de tarefa, você pode usar xcom pull para obter o valor que você precisa, especificando o dag_id para aquele do primeiro subdag: dag_id='%s.%s' % (parent_dag_name, 'test1')
  5. Use a lista / valor para criar suas tarefas dinamicamente

Agora eu testei isso na minha instalação de fluxo de ar local e funciona bem. Não sei se a parte xcom pull terá problemas se houver mais de uma instância do dag em execução ao mesmo tempo, mas provavelmente você usaria uma chave única ou algo assim para identificar exclusivamente o xcom valor que você deseja. Provavelmente, seria possível otimizar o passo 3. para ter 100% de certeza de obter uma tarefa específica do dag principal atual, mas para meu uso isso funciona bem o suficiente, acho que só é necessário um objeto task_instance para usar xcom_pull.

Além disso, eu limpo os xcoms para a primeira subdag antes de cada execução, apenas para ter certeza de que não recebo nenhum valor errado acidentalmente.

Sou muito ruim em explicar, então espero que o código a seguir deixe tudo claro:

test1.py

from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator

log = logging.getLogger(__name__)


def test1(parent_dag_name, start_date, schedule_interval):
    dag = DAG(
        '%s.test1' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    def return_list():
        return ['test1', 'test2']

    list_extract_folder = PythonOperator(
        task_id='list',
        dag=dag,
        python_callable=return_list
    )

    clean_xcoms = PostgresOperator(
        task_id='clean_xcoms',
        postgres_conn_id='airflow_db',
        sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
        dag=dag)

    clean_xcoms >> list_extract_folder

    return dag

test2.py

from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator

log = logging.getLogger(__name__)


def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
    dag = DAG(
        '%s.test2' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date
    )

    if len(parent_dag.get_active_runs()) > 0:
        test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
            dag_id='%s.%s' % (parent_dag_name, 'test1'),
            task_ids='list')
        if test_list:
            for i in test_list:
                test = DummyOperator(
                    task_id=i,
                    dag=dag
                )

    return dag

e o fluxo de trabalho principal:

test.py

from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdags.test1 import test1
from subdags.test2 import test2

DAG_NAME = 'test-dag'

dag = DAG(DAG_NAME,
          description='Test workflow',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 24))

test1 = SubDagOperator(
    subdag=test1(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval),
    task_id='test1',
    dag=dag
)

test2 = SubDagOperator(
    subdag=test2(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval,
                 parent_dag=dag),
    task_id='test2',
    dag=dag
)

test1 >> test2
Christopher Beck
fonte
no Airflow 1.9, eles não carregaram quando adicionados à pasta DAG, e está faltando alguma coisa?
Anthony Keane
@AnthonyKeane você colocou test1.py e test2.py em uma pasta chamada subdags em sua pasta dag?
Christopher Beck
Eu fiz sim. Copiei os dois arquivos para subdags e coloquei o test.py na pasta dag, ainda obterá este erro. DAG quebrado: [/home/airflow/gcs/dags/test.py] Nenhum módulo denominado subdags.test1 Observação: estou usando o Google Cloud Composer (Airflow gerenciado do Google 1.9.0)
Anthony Keane
@AnthonyKeane é o único erro que você vê nos logs? DAG quebrado pode ser causado pelo subdag com um erro de compilação.
Christopher Beck
3
Olá @Christopher Beck, achei MEU erro que precisava adicionar _ _init_ _.pyà pasta subdags. erro de novato
Anthony Keane
8

Sim, é possível. Criei um DAG de exemplo que demonstra isso.

import airflow
from airflow.operators.python_operator import PythonOperator
import os
from airflow.models import Variable
import logging
from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator

main_dag_id = 'DynamicWorkflow2'

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True
}

dag = DAG(
    main_dag_id,
    schedule_interval="@once",
    default_args=args)


def start(*args, **kwargs):

    value = Variable.get("DynamicWorkflow_Group1")
    logging.info("Current DynamicWorkflow_Group1 value is " + str(value))


def resetTasksStatus(task_id, execution_date):
    logging.info("Resetting: " + task_id + " " + execution_date)

    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    check_dag = dagbag.dags[main_dag_id]
    session = settings.Session()

    my_task = check_dag.get_task(task_id)
    ti = TaskInstance(my_task, execution_date)
    state = ti.current_state()
    logging.info("Current state of " + task_id + " is " + str(state))
    ti.set_state(None, session)
    state = ti.current_state()
    logging.info("Updated state of " + task_id + " is " + str(state))


def bridge1(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 2

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date']))


def bridge2(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 3

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date']))


def end(*args, **kwargs):
    logging.info("Ending")


def doSomeWork(name, index, *args, **kwargs):
    # Do whatever work you need to do
    # Here I will just create a new file
    os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt')


starting_task = PythonOperator(
    task_id='start',
    dag=dag,
    provide_context=True,
    python_callable=start,
    op_args=[])

# Used to connect the stream in the event that the range is zero
bridge1_task = PythonOperator(
    task_id='bridge1',
    dag=dag,
    provide_context=True,
    python_callable=bridge1,
    op_args=[])

DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))

for index in range(int(DynamicWorkflow_Group1)):
    dynamicTask = PythonOperator(
        task_id='firstGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['firstGroup', index])

    starting_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge1_task)

# Used to connect the stream in the event that the range is zero
bridge2_task = PythonOperator(
    task_id='bridge2',
    dag=dag,
    provide_context=True,
    python_callable=bridge2,
    op_args=[])

DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2))

for index in range(int(DynamicWorkflow_Group2)):
    dynamicTask = PythonOperator(
        task_id='secondGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['secondGroup', index])

    bridge1_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge2_task)

ending_task = PythonOperator(
    task_id='end',
    dag=dag,
    provide_context=True,
    python_callable=end,
    op_args=[])

DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3))

for index in range(int(DynamicWorkflow_Group3)):

    # You can make this logic anything you'd like
    # I chose to use the PythonOperator for all tasks
    # except the last task will use the BashOperator
    if index < (int(DynamicWorkflow_Group3) - 1):
        dynamicTask = PythonOperator(
            task_id='thirdGroup_' + str(index),
            dag=dag,
            provide_context=True,
            python_callable=doSomeWork,
            op_args=['thirdGroup', index])
    else:
        dynamicTask = BashOperator(
            task_id='thirdGroup_' + str(index),
            bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt',
            dag=dag)

    bridge2_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(ending_task)

# If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream
# and your tasks will run simultaneously instead of in your desired stream order.
starting_task.set_downstream(bridge1_task)
bridge1_task.set_downstream(bridge2_task)
bridge2_task.set_downstream(ending_task)

Antes de executar o DAG, crie essas três variáveis ​​de fluxo de ar

airflow variables --set DynamicWorkflow_Group1 1

airflow variables --set DynamicWorkflow_Group2 0

airflow variables --set DynamicWorkflow_Group3 0

Você verá que o DAG parte deste

insira a descrição da imagem aqui

Para isso depois de executado

insira a descrição da imagem aqui

Você pode ver mais informações sobre este DAG em meu artigo sobre a criação de fluxos de trabalho dinâmicos no Airflow .

Kyle Bridenstine
fonte
1
Mas o que acontece se você tiver vários DagRun's deste DAG. Todos eles compartilham as mesmas variáveis?
Mar-k
1
Sim, eles usariam a mesma variável; Eu abordo isso no meu artigo no final. Você precisaria criar a variável dinamicamente e usar o id de execução dag no nome da variável. Meu exemplo é simples apenas para demonstrar a possibilidade dinâmica, mas você precisará torná-lo com qualidade de produção :)
Kyle Bridenstine
As pontes são necessárias ao criar tarefas dinâmicas? Leu seu artigo na íntegra momentaneamente, mas queria perguntar. Estou lutando para criar uma tarefa dinâmica baseada em uma tarefa upstream agora e estou começando a descobrir onde errei. Meu problema atual é que, por algum motivo, não consigo fazer com que o DAG sincronize com o DAG-Bag. Meu DAG sincronizou quando eu estava usando uma lista estática no módulo, mas parou quando troquei essa lista estática para ser construída a partir de uma tarefa upstream.
lucid_goose
6

OA: "Existe alguma maneira no Airflow de criar um fluxo de trabalho de forma que o número de tarefas B. * seja desconhecido até a conclusão da Tarefa A?"

A resposta curta é não. O Airflow criará o fluxo DAG antes de começar a executá-lo.

Dito isso, chegamos a uma conclusão simples, que não temos essa necessidade. Quando você deseja paralelizar algum trabalho, deve avaliar os recursos disponíveis e não o número de itens a processar.

Fizemos assim: geramos dinamicamente um número fixo de tarefas, digamos 10, que dividirão o trabalho. Por exemplo, se precisarmos processar 100 arquivos, cada tarefa processará 10 deles. Vou postar o código mais tarde hoje.

Atualizar

Aqui está o código, desculpe pelo atraso.

from datetime import datetime, timedelta

import airflow
from airflow.operators.dummy_operator import DummyOperator

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 8),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

dag = airflow.DAG(
    'parallel_tasks_v1',
    schedule_interval="@daily",
    catchup=False,
    default_args=args)

# You can read this from variables
parallel_tasks_total_number = 10

start_task = DummyOperator(
    task_id='start_task',
    dag=dag
)


# Creates the tasks dynamically.
# Each one will elaborate one chunk of data.
def create_dynamic_task(current_task_number):
    return DummyOperator(
        provide_context=True,
        task_id='parallel_task_' + str(current_task_number),
        python_callable=parallelTask,
        # your task will take as input the total number and the current number to elaborate a chunk of total elements
        op_args=[current_task_number, int(parallel_tasks_total_number)],
        dag=dag)


end = DummyOperator(
    task_id='end',
    dag=dag)

for page in range(int(parallel_tasks_total_number)):
    created_task = create_dynamic_task(page)
    start_task >> created_task
    created_task >> end

Explicação do código:

Aqui temos uma única tarefa inicial e uma única tarefa final (ambas fictícias).

Então, desde a tarefa inicial com o loop for, criamos 10 tarefas com o mesmo python que pode ser chamado. As tarefas são criadas na função create_dynamic_task.

Para cada python que pode ser chamado, passamos como argumentos o número total de tarefas paralelas e o índice da tarefa atual.

Suponha que você tenha 1000 itens para elaborar: a primeira tarefa receberá na entrada que deve elaborar o primeiro bloco de 10 blocos. Ele dividirá os 1000 itens em 10 blocos e elaborará o primeiro.

Ena
fonte
1
Esta é uma boa solução, contanto que você não precise de uma tarefa específica por item (como progresso, resultado, sucesso / falha,
novas
@Ena parallelTasknão está definida: estou faltando alguma coisa?
Anthony Keane
2
@AnthonyKeane É a função python que você deve chamar para realmente fazer algo. Conforme comentado no código, tomará como entrada o número total e o número atual para elaborar um bloco de elementos totais.
Ena
4

O que acho que você está procurando é a criação de DAG dinamicamente. Encontrei esse tipo de situação há alguns dias após algumas pesquisas que encontrei neste blog .

Geração de Tarefa Dinâmica

start = DummyOperator(
    task_id='start',
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag)

def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task

Configurando o fluxo de trabalho DAG

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # Use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    # Extract table names and fields to be processed
    tables = configFile['tables']

    # In this loop tasks are created for each table defined in the YAML file
    for table in tables:
        for table, fieldName in table.items():
            # In our example, first step in the workflow for each table is to get SQL data from db.
            # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                                 'getSQLData',
                                                 {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                                  'dbname': configFile['dbname']})

            # Second step is upload data to s3
            upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                                 'uploadDataToS3',
                                                 {'previous_task_id': '{}-getSQLData'.format(table),
                                                  'bucket_name': configFile['bucket_name'],
                                                  'prefix': configFile['prefix']})

            # This is where the magic lies. The idea is that
            # once tasks are generated they should linked with the
            # dummy operators generated in the start and end tasks. 
            # Then you are done!
            start >> get_sql_data_task
            get_sql_data_task >> upload_to_s3_task
            upload_to_s3_task >> end

É assim que nosso DAG se parece depois de colocar o código junto insira a descrição da imagem aqui

import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

start = DummyOperator(
    task_id='start',
    dag=dag
)


def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        # Eval is used since the callableFunction var is of type string
        # while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable=eval(callableFunction),
        op_kwargs=args,
        xcom_push=True,
        dag=dag,
    )
    return task


end = DummyOperator(
    task_id='end',
    dag=dag)

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    # Extract table names and fields to be processed
    tables = configFile['tables']

    # In this loop tasks are created for each table defined in the YAML file
    for table in tables:
        for table, fieldName in table.items():
            # In our example, first step in the workflow for each table is to get SQL data from db.
            # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                                 'getSQLData',
                                                 {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                                  'dbname': configFile['dbname']})

            # Second step is upload data to s3
            upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                                 'uploadDataToS3',
                                                 {'previous_task_id': '{}-getSQLData'.format(table),
                                                  'bucket_name': configFile['bucket_name'],
                                                  'prefix': configFile['prefix']})

            # This is where the magic lies. The idea is that
            # once tasks are generated they should linked with the
            # dummy operators generated in the start and end tasks. 
            # Then you are done!
            start >> get_sql_data_task
            get_sql_data_task >> upload_to_s3_task
            upload_to_s3_task >> end

Foi uma grande esperança de ajuda também vai ajudar outra pessoa

Muhammad Bin Ali
fonte
Você conseguiu isso sozinho? Eu cansei. Mas eu falhei.
Newt
Sim, funcionou para mim. Que problema você está enfrentando?
Muhammad Bin Ali
1
Deixa comigo. Meu problema foi resolvido. Obrigado. Eu simplesmente não entendi a maneira certa de ler as variáveis ​​de ambiente nas imagens do docker.
Newt
e se os itens da tabela mudarem, portanto, não podemos colocá-los em um arquivo yaml estático?
FrankZhu
3

Acho que encontrei uma solução mais agradável para isso em https://github.com/mastak/airflow_multi_dagrun , que usa o enfileiramento simples de DagRuns disparando vários dagruns, semelhante a TriggerDagRuns . A maioria dos créditos vai para https://github.com/mastak , embora eu tenha que corrigir alguns detalhes para que funcione com o fluxo de ar mais recente.

A solução usa um operador personalizado que aciona vários DagRuns :

from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow.utils import timezone


class TriggerMultiDagRunOperator(TriggerDagRunOperator):
    CREATED_DAGRUN_KEY = 'created_dagrun_key'

    @apply_defaults
    def __init__(self, op_args=None, op_kwargs=None,
                 *args, **kwargs):
        super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self, context):

        context.update(self.op_kwargs)
        session = settings.Session()
        created_dr_ids = []
        for dro in self.python_callable(*self.op_args, **context):
            if not dro:
                break
            if not isinstance(dro, DagRunOrder):
                dro = DagRunOrder(payload=dro)

            now = timezone.utcnow()
            if dro.run_id is None:
                dro.run_id = 'trig__' + now.isoformat()

            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                execution_date=now,
                state=State.RUNNING,
                conf=dro.payload,
                external_trigger=True,
            )
            created_dr_ids.append(dr.id)
            self.log.info("Created DagRun %s, %s", dr, now)

        if created_dr_ids:
            session.commit()
            context['ti'].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids)
        else:
            self.log.info("No DagRun created")
        session.close()

Você pode então enviar vários dagruns da função chamável em seu PythonOperator, por exemplo:

from airflow.operators.dagrun_operator import DagRunOrder
from airflow.models import DAG
from airflow.operators import TriggerMultiDagRunOperator
from airflow.utils.dates import days_ago


def generate_dag_run(**kwargs):
    for i in range(10):
        order = DagRunOrder(payload={'my_variable': i})
        yield order

args = {
    'start_date': days_ago(1),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='simple_trigger',
    max_active_runs=1,
    schedule_interval='@hourly',
    default_args=args,
)

gen_target_dag_run = TriggerMultiDagRunOperator(
    task_id='gen_target_dag_run',
    dag=dag,
    trigger_dag_id='common_target',
    python_callable=generate_dag_run
)

Eu criei um fork com o código em https://github.com/flinz/airflow_multi_dagrun

flinz
fonte
3

O gráfico de tarefas não é gerado em tempo de execução. Em vez disso, o gráfico é construído quando é obtido pelo Airflow de sua pasta dags. Portanto, não será realmente possível ter um gráfico diferente para o trabalho toda vez que ele for executado. Você pode configurar um trabalho para construir um gráfico baseado em uma consulta no momento do carregamento . Esse gráfico permanecerá o mesmo para cada execução depois disso, o que provavelmente não é muito útil.

Você pode criar um gráfico que executa tarefas diferentes em cada execução com base nos resultados da consulta usando um operador de filial.

O que fiz foi pré-configurar um conjunto de tarefas e, em seguida, pegar os resultados da consulta e distribuí-los pelas tarefas. Isso provavelmente é melhor de qualquer maneira, porque se sua consulta retornar muitos resultados, você provavelmente não vai querer inundar o agendador com muitas tarefas simultâneas de qualquer maneira. Para ficar ainda mais seguro, também usei um pool para garantir que minha simultaneidade não saia do controle com uma consulta inesperadamente grande.

"""
 - This is an idea for how to invoke multiple tasks based on the query results
"""
import logging
from datetime import datetime

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from include.run_celery_task import runCeleryTask

########################################################################

default_args = {
    'owner': 'airflow',
    'catchup': False,
    'depends_on_past': False,
    'start_date': datetime(2019, 7, 2, 19, 50, 00),
    'email': ['rotten@stackoverflow'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'max_active_runs': 1
}

dag = DAG('dynamic_tasks_example', default_args=default_args, schedule_interval=None)

totalBuckets = 5

get_orders_query = """
select 
    o.id,
    o.customer
from 
    orders o
where
    o.created_at >= current_timestamp at time zone 'UTC' - '2 days'::interval
    and
    o.is_test = false
    and
    o.is_processed = false
"""

###########################################################################################################

# Generate a set of tasks so we can parallelize the results
def createOrderProcessingTask(bucket_number):
    return PythonOperator( 
                           task_id=f'order_processing_task_{bucket_number}',
                           python_callable=runOrderProcessing,
                           pool='order_processing_pool',
                           op_kwargs={'task_bucket': f'order_processing_task_{bucket_number}'},
                           provide_context=True,
                           dag=dag
                          )


# Fetch the order arguments from xcom and doStuff() to them
def runOrderProcessing(task_bucket, **context):
    orderList = context['ti'].xcom_pull(task_ids='get_open_orders', key=task_bucket)

    if orderList is not None:
        for order in orderList:
            logging.info(f"Processing Order with Order ID {order[order_id]}, customer ID {order[customer_id]}")
            doStuff(**op_kwargs)


# Discover the orders we need to run and group them into buckets for processing
def getOpenOrders(**context):
    myDatabaseHook = PostgresHook(postgres_conn_id='my_database_conn_id')

    # initialize the task list buckets
    tasks = {}
    for task_number in range(0, totalBuckets):
        tasks[f'order_processing_task_{task_number}'] = []

    # populate the task list buckets
    # distribute them evenly across the set of buckets
    resultCounter = 0
    for record in myDatabaseHook.get_records(get_orders_query):

        resultCounter += 1
        bucket = (resultCounter % totalBuckets)

        tasks[f'order_processing_task_{bucket}'].append({'order_id': str(record[0]), 'customer_id': str(record[1])})

    # push the order lists into xcom
    for task in tasks:
        if len(tasks[task]) > 0:
            logging.info(f'Task {task} has {len(tasks[task])} orders.')
            context['ti'].xcom_push(key=task, value=tasks[task])
        else:
            # if we didn't have enough tasks for every bucket
            # don't bother running that task - remove it from the list
            logging.info(f"Task {task} doesn't have any orders.")
            del(tasks[task])

    return list(tasks.keys())

###################################################################################################


# this just makes sure that there aren't any dangling xcom values in the database from a crashed dag
clean_xcoms = MySqlOperator(
    task_id='clean_xcoms',
    mysql_conn_id='airflow_db',
    sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
    dag=dag)


# Ideally we'd use BranchPythonOperator() here instead of PythonOperator so that if our
# query returns fewer results than we have buckets, we don't try to run them all.
# Unfortunately I couldn't get BranchPythonOperator to take a list of results like the
# documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now.
get_orders_task = PythonOperator(
                                 task_id='get_orders',
                                 python_callable=getOpenOrders,
                                 provide_context=True,
                                 dag=dag
                                )
get_orders_task.set_upstream(clean_xcoms)

# set up the parallel tasks -- these are configured at compile time, not at run time:
for bucketNumber in range(0, totalBuckets):
    taskBucket = createOrderProcessingTask(bucketNumber)
    taskBucket.set_upstream(get_orders_task)


###################################################################################################
podre
fonte
Observe que parece que pode ser possível criar subdags em tempo real como resultado de uma tarefa, no entanto, a maioria da documentação sobre subdags que eu achei altamente recomendável ficar longe desse recurso, pois ele causa mais problemas do que resolve na maioria dos casos. Eu vi sugestões de que subdags podem ser removidos como um recurso integrado em breve.
podre de
Observe também que, no for tasks in tasksloop do meu exemplo, excluo o objeto sobre o qual estou iterando. Essa é uma má ideia. Em vez disso, obtenha uma lista das chaves e repita - ou ignore as exclusões. Da mesma forma, se xcom_pull retornar None (em vez de uma lista ou lista vazia), o loop for também falha. Pode-se querer executar o xcom_pull antes do 'for', e então verificar se ele é Nenhum - ou certifique-se de que haja pelo menos uma lista vazia lá. YMMV. Boa sorte!
podre de
1
o que está no open_order_task?
alltej
Você está certo, isso é um erro de digitação no meu exemplo. Deve ser get_orders_task.set_upstream (). Eu resolvo isso.
podre
0

Não entende qual é o problema?

Aqui está um exemplo padrão. Agora, se na função subdag substituir for i in range(5):por for i in range(random.randint(0, 10)):, tudo funcionará. Agora imagine que o operador 'start' coloque os dados em um arquivo e, em vez de um valor aleatório, a função lerá esses dados. Então, o operador 'start' afetará o número de tarefas.

O problema estará apenas no display na IU, pois ao entrar no subdag, o número de tarefas será igual à última lida do arquivo / banco de dados / XCom no momento. O que automaticamente dá uma restrição em vários lançamentos de um dag ao mesmo tempo.

Denis Shcheglov
fonte
-1

Achei este post do Medium que é muito parecido com esta questão. No entanto, ele está cheio de erros de digitação e não funcionou quando tentei implementá-lo.

Minha resposta ao acima é a seguinte:

Se você estiver criando tarefas dinamicamente, deve fazê-lo iterando sobre algo que não foi criado por uma tarefa upstream ou pode ser definido independentemente dessa tarefa. Aprendi que você não pode passar datas de execução ou outras variáveis ​​de fluxo de ar para algo fora de um modelo (por exemplo, uma tarefa) como muitos outros apontaram antes. Veja também este post .

MarMat
fonte
Se você der uma olhada em meu comentário, verá que é realmente possível criar tarefas com base no resultado das tarefas anteriores.
Christopher Beck