Eu tenho um cenário como abaixo:
- Acione um
Task 1
eTask 2
somente quando novos dados estiverem disponíveis para eles na tabela de origem (Athena). O acionador da Tarefa1 e da Tarefa2 deve ocorrer quando uma nova parição de dados em um dia. - Disparar
Task 3
apenas após a conclusãoTask 1
eTask 2
- Acionar
Task 4
apenas a conclusão deTask 3
Meu código
from airflow import DAG
from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': yesterday,
'email': FAILURE_EMAILS,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
task_id='athena_wait_for_Task1_partition_exists',
database_name='DB',
table_name='Table1',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
task_id='athena_wait_for_Task2_partition_exists',
database_name='DB',
table_name='Table2',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
execute_Task1 = PostgresOperator(
task_id='Task1',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task1.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task2 = PostgresOperator(
task_id='Task2',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task2.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task3 = PostgresOperator(
task_id='Task3',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task3.sql",
params={'limit': '50'},
trigger_rule='all_success',
dag=dag
)
execute_Task4 = PostgresOperator(
task_id='Task4',
postgres_conn_id='REDSHIFT_CONN',
sql="/sql/flow/Task4",
params={'limit': '50'},
dag=dag
)
execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)
execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)
execute_Task4.set_upstream(execute_Task3)
Qual é a melhor maneira ideal de alcançá-lo?
Task1
eTask2
vai em loop. Para mim, os dados são carregados na tabela de fontes Athena 10 AM CET.Respostas:
Acredito que sua pergunta aborda dois grandes problemas:
schedule_interval
lo de maneira explícita, para que o @daily esteja configurando algo que você não está esperando.a resposta curta: defina explicitamente o seu schedule_interval com um formato de tarefa cron e use operadores de sensores para verificar periodicamente
onde
startime
é a hora em que sua tarefa diária começará,endtime
qual é a última hora do dia em que você deve verificar se um evento foi feito antes de sinalizar como falha epoke_time
é o intervalo em que vocêsensor_operator
verificará se o evento ocorreu.Como abordar o trabalho cron explicitamente sempre que você estiver configurando seu dag
@daily
como você fez:pelos documentos , você pode ver que está fazendo:
@daily - Run once a day at midnight
O que agora faz sentido por que você está recebendo um erro de tempo limite e falha após 5 minutos porque você definiu
'retries': 1
e'retry_delay': timedelta(minutes=5)
. Por isso, tenta executar o dag à meia-noite, falha. tenta novamente 5 minutos depois e falha novamente, portanto, sinaliza como falha.Então, basicamente, @daily run está definindo um trabalho cron implícito de:
O formato do trabalho cron é do formato abaixo e você define o valor para
*
sempre que quiser dizer "todos".Minute Hour Day_of_Month Month Day_of_Week
Então, @daily está dizendo basicamente executar isso a cada: minuto 0 hora 0 de todos os dias_de_mês de todos os meses de todos os dias_de_semana
Portanto, seu caso é executado a cada: minuto 0 hora 10 de todos os dias de todos os dias de todos os dias de uma semana. Isso é traduzido no formato de tarefa cron para:
Como acionar e tentar novamente corretamente a execução do dag quando você depende de um evento externo para concluir a execução
você poderia desencadear uma dag no fluxo de ar a partir de um evento externo, usando o comando
airflow trigger_dag
. isso seria possível se, de alguma maneira, você pudesse acionar um script de função / python lambda para direcionar sua instância de fluxo de ar.Se você não pode acionar o dag externamente, use um operador de sensor como o OP, defina um tempo-poke para ele e defina um número razoável de tentativas.
fonte
airflow trigger_dag
. isso seria possível se, de alguma maneira, você pudesse acionar um script de função / python lambda para direcionar sua instância de fluxo de ar.AwsGlueCatalogPartitionSensor
junto com o comando airflow{{ds_nodash}}
para as saídas da partição. Minha pergunta então como agendar isso.