Como acionar uma tarefa Airflow somente quando novas partições / dados estão disponíveis na tabela AWS athena usando o DAG em python?

9

Eu tenho um cenário como abaixo:

  1. Acione um Task 1e Task 2somente quando novos dados estiverem disponíveis para eles na tabela de origem (Athena). O acionador da Tarefa1 e da Tarefa2 deve ocorrer quando uma nova parição de dados em um dia.
  2. Disparar Task 3apenas após a conclusão Task 1eTask 2
  3. Acionar Task 4apenas a conclusão deTask 3

insira a descrição da imagem aqui

Meu código

from airflow import DAG

from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta

from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task1_partition_exists',
    database_name='DB',
    table_name='Table1',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task2_partition_exists',
    database_name='DB',
    table_name='Table2',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

execute_Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task1.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task2 = PostgresOperator(
    task_id='Task2',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task2.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)



execute_Task3 = PostgresOperator(
    task_id='Task3',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task3.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task4 = PostgresOperator(
    task_id='Task4',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task4",
    params={'limit': '50'},
    dag=dag
)



execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)

execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)

execute_Task4.set_upstream(execute_Task3)

Qual é a melhor maneira ideal de alcançá-lo?

pankaj
fonte
você está tendo problemas com esta solução?
Bernardo stearns reisen
@ Bernardostearnsreisen, Às vezes, o Task1e Task2vai em loop. Para mim, os dados são carregados na tabela de fontes Athena 10 AM CET.
pankaj 21/04
Indo em um loop, o fluxo de ar tenta novamente a Tarefa1 e a Tarefa2 várias vezes até que seja bem-sucedida
Bernardo stearns reisen
@Bernardostearnsreisen, yup exatamente
pankaj
11
@Bernardostearnsreisen, eu não sabia como premiar a recompensa :)
pankaj

Respostas:

1

Acredito que sua pergunta aborda dois grandes problemas:

  1. esquecendo de configurá- schedule_intervallo de maneira explícita, para que o @daily esteja configurando algo que você não está esperando.
  2. Como acionar e tentar novamente corretamente a execução do dag quando você depende de um evento externo para concluir a execução

a resposta curta: defina explicitamente o seu schedule_interval com um formato de tarefa cron e use operadores de sensores para verificar periodicamente

default_args={
        'retries': (endtime - starttime)*60/poke_time
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
     ....
    poke_time= 60*5 #<---- set a poke_time in seconds
    dag=dag)

onde startimeé a hora em que sua tarefa diária começará, endtimequal é a última hora do dia em que você deve verificar se um evento foi feito antes de sinalizar como falha e poke_timeé o intervalo em que você sensor_operatorverificará se o evento ocorreu.

Como abordar o trabalho cron explicitamente sempre que você estiver configurando seu dag@dailycomo você fez:

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

pelos documentos , você pode ver que está fazendo: @daily - Run once a day at midnight

O que agora faz sentido por que você está recebendo um erro de tempo limite e falha após 5 minutos porque você definiu 'retries': 1e 'retry_delay': timedelta(minutes=5). Por isso, tenta executar o dag à meia-noite, falha. tenta novamente 5 minutos depois e falha novamente, portanto, sinaliza como falha.

Então, basicamente, @daily run está definindo um trabalho cron implícito de:

@daily -> Run once a day at midnight -> 0 0 * * *

O formato do trabalho cron é do formato abaixo e você define o valor para *sempre que quiser dizer "todos".

Minute Hour Day_of_Month Month Day_of_Week

Então, @daily está dizendo basicamente executar isso a cada: minuto 0 hora 0 de todos os dias_de_mês de todos os meses de todos os dias_de_semana

Portanto, seu caso é executado a cada: minuto 0 hora 10 de todos os dias de todos os dias de todos os dias de uma semana. Isso é traduzido no formato de tarefa cron para:

0 10 * * *

Como acionar e tentar novamente corretamente a execução do dag quando você depende de um evento externo para concluir a execução

  1. você poderia desencadear uma dag no fluxo de ar a partir de um evento externo, usando o comando airflow trigger_dag. isso seria possível se, de alguma maneira, você pudesse acionar um script de função / python lambda para direcionar sua instância de fluxo de ar.

  2. Se você não pode acionar o dag externamente, use um operador de sensor como o OP, defina um tempo-poke para ele e defina um número razoável de tentativas.

Bernardo stearns reisen
fonte
Obrigado por isso. Além disso, se eu quiser acionar as tarefas com base no evento, e não no tempo, ou seja, assim que uma nova partição de dados estiver disponível na fonte, a próxima tarefa de origem `AWS Athena Tables` deve ser acionada. Então, como faço para agendar. O meu código atual é suficiente?
pankaj 22/04
@pankaj, vejo apenas duas alternativas. Eu não sei muito sobre AWS Athena, mas você poderia desencadear uma dag no fluxo de ar a partir de um evento externo, usando o comando airflow trigger_dag. isso seria possível se, de alguma maneira, você pudesse acionar um script de função / python lambda para direcionar sua instância de fluxo de ar.
Bernardo stearns reisen
a outra alternativa é mais ou menos o que você está fazendo, porque você não possui um gatilho baseado em evento, precisa verificar periodicamente se esse evento aconteceu. Portanto, o uso desta solução atual definiria uma tarefa cron por um intervalo de horas, executar o dag em uma alta frequência de minutos ... muitos falharão, mas serão capazes de capturar rapidamente após o evento acontecer
Bernardo stearns reisen
@ Bernado, eu descobri o pacote no Airflow chamado AwsGlueCatalogPartitionSensorjunto com o comando airflow {{ds_nodash}}para as saídas da partição. Minha pergunta então como agendar isso.
pankaj 22/04
@Benado, você pode dar uma olhada no meu código onde eu implementei a verificação acima mencionada e dar suas entradas
pankaj