tentando criar subdags dinâmicos a partir do pai dag com base na matriz de nomes de arquivos

10

Estou tentando mover arquivos s3 de um bloco "não excluído" (o que significa que não posso excluir os arquivos) para o GCS usando o fluxo de ar. Não posso garantir que novos arquivos estejam lá todos os dias, mas devo verificar se há novos arquivos todos os dias.

meu problema é a criação dinâmica de subdags. Se houver arquivos, preciso de subdags. Se não houver arquivos, não preciso de subdags. Meu problema é as configurações upstream / downstream. No meu código, ele detecta arquivos, mas não inicia os subdags como deveriam. Estou sentindo falta de algo.

aqui está o meu código:

from airflow import models
from  airflow.utils.helpers import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.utils import dates
from airflow.models import Variable
import logging

args = {
    'owner': 'Airflow',
    'start_date': dates.days_ago(1),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_success': True,
}

bucket = 'mybucket'
prefix = 'myprefix/'
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = []

parent_dag = models.DAG(
    dag_id='My_Ingestion',
    default_args=args,
    schedule_interval='@daily',
    catchup=False
)

def Check_For_Files(**kwargs):
    s3 = S3Hook(aws_conn_id='S3_BOX')
    s3.get_conn()
    bucket = bucket
    LastBDEXDate = int(Variable.get("last_publish_date"))
    maxdate = LastBDEXDate
    files = s3.list_keys(bucket_name=bucket, prefix='myprefix/file')
    for file in files:
        print(file)
        print(file.split("_")[-2])
        print(file.split("_")[-2][-8:])  ##proves I can see a date in the file name is ok.
        maxdate = maxdate if maxdate > int(file.split("_")[-2][-8:]) else int(file.split("_")[-2][-8:])
    if maxdate > LastBDEXDate:
        return 'Start_Process'
    return 'finished'

def create_subdag(dag_parent, dag_id_child_prefix, file_name):
    # dag params
    dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix)

    # dag
    subdag = models.DAG(dag_id=dag_id_child,
              default_args=args,
              schedule_interval=None)

    # operators
    s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
        task_id=dag_id_child,
        bucket=bucket,
        prefix=file_name,
        dest_gcs_conn_id='GCP_Account',
        dest_gcs='gs://my_files/To_Process/',
        replace=False,
        gzip=True,
        dag=subdag)


    return subdag

def create_subdag_operator(dag_parent, filename, index):
    tid_subdag = 'file_{}'.format(index)
    subdag = create_subdag(dag_parent, tid_subdag, filename)
    sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
    return sd_op

def create_subdag_operators(dag_parent, file_list):
    subdags = [create_subdag_operator(dag_parent, file, file_list.index(file)) for file in file_list]
    # chain subdag-operators together
    chain(*subdags)
    return subdags

check_for_files = BranchPythonOperator(
    task_id='Check_for_s3_Files',
    provide_context=True,
    python_callable=Check_For_Files,
    dag=parent_dag
)

finished = DummyOperator(
    task_id='finished',
    dag=parent_dag
)

decision_to_continue = DummyOperator(
    task_id='Start_Process',
    dag=parent_dag
)

if len(files) > 0:
    subdag_ops = create_subdag_operators(parent_dag, files)
    check_for_files >> decision_to_continue >> subdag_ops[0] >> subdag_ops[-1] >> finished


check_for_files >> finished
arcee123
fonte
Que tipo de trabalho é executado no back-end desses DAGS, são esses sparktrabalhos ou algum pythonscript e o que você está usando para executá-lo como livyoutro método
ashwin agrawal
Me desculpe, eu não entendo a pergunta. você pode por favor reafirmar?
arcee123 23/02
Quero dizer, você está usando apenas scripts python simples e não está usando nenhum trabalho de faísca, certo?
ashwin agrawal 23/02
Sim. operadores simples que são padrão no fluxo de ar. Desejo adicionar operadores existentes a uma taxa dinâmica com base em arquivos sinalizados no S3. Desejo inserir no GCS.
arcee123 24/02
Por que filesuma lista vazia?
Oluwafemi Sule

Respostas:

3

Abaixo está a maneira recomendada de criar um DAG dinâmico ou sub-DAG no fluxo de ar, embora existam outras maneiras também, mas acho que isso seria amplamente aplicável ao seu problema.

Primeiro, crie um arquivo (yaml/csv)que inclua a lista de todos os s3arquivos e locais. No caso de você ter escrito uma função para armazená-los na lista, eu diria que os armazene em um yamlarquivo separado e carregue-o em tempo de execução no ambiente de fluxo de ar e, em seguida, crie DAGs.

Abaixo está um yamlarquivo de exemplo : dynamicDagConfigFile.yaml

job: dynamic-dag
bucket_name: 'bucket-name'
prefix: 'bucket-prefix'
S3Files:
    - File1: 'S3Loc1'
    - File2: 'S3Loc2'
    - File3: 'S3Loc3'

Você pode modificar sua Check_For_Filesfunção para armazená-las em um yamlarquivo.

Agora podemos avançar para a criação dinâmica de dag:

Primeiro, defina duas tarefas usando operadores fictícios, a tarefa inicial e final. Tais tarefas são aquelas em que iremos desenvolver nossas DAGtarefas, criando dinamicamente tarefas entre elas:

start = DummyOperator(
    task_id='start',
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag)

DAG dinâmico: usaremos PythonOperatorsno fluxo de ar. A função deve receber como argumentos o ID da tarefa; uma função python a ser executada, ou seja, o python_callable para o operador Python; e um conjunto de argumentos a serem usados ​​durante a execução.

Inclua um argumento the task id. Assim, podemos trocar dados entre tarefas geradas de maneira dinâmica, por exemplo, via XCOM.

Você pode especificar sua função de operação dentro deste dag dinâmico s3_to_gcs_op.

def createDynamicDAG(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task

Finalmente, com base no local presente no arquivo yaml, você pode criar dags dinâmicos, primeiro leia o yamlarquivo como abaixo e crie o dag dinâmico:

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    #Extract file list
    S3Files = configFile['S3Files']

    #In this loop tasks are created for each table defined in the YAML file
    for S3File in S3Files:
        for S3File, fieldName in S3File.items():

            #Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_s3_files = createDynamicDAG('{}-getS3Data'.format(S3File), 
                                            'getS3Data', 
                                            {}) #your configs here.

            #Second step is upload S3 to GCS
            upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(S3File), 'uploadDataS3ToGCS', {'previous_task_id':'{}-'})

#write your configs again here like S3 bucket name prefix extra or read from yaml file, and other GCS config.

Definição final do DAG:

A ideia é que

#once tasks are generated they should linked with the
#dummy operators generated in the start and end tasks. 
start >> get_s3_files
get_s3_files >> upload_s3_toGCS
upload_s3_toGCS >> end

Código completo do fluxo de ar em ordem:

import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

start = DummyOperator(
    task_id='start',
    dag=dag
)


def createDynamicDAG(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task


end = DummyOperator(
    task_id='end',
    dag=dag)



with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    configFile = yaml.safe_load(f)

    #Extract file list
    S3Files = configFile['S3Files']

    #In this loop tasks are created for each table defined in the YAML file
    for S3File in S3Files:
        for S3File, fieldName in S3File.items():

            #Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_s3_files = createDynamicDAG('{}-getS3Data'.format(S3File), 
                                            'getS3Data', 
                                            {}) #your configs here.

            #Second step is upload S3 to GCS
            upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(S3File), 'uploadDataS3ToGCS', {'previous_task_id':'{}-'})

#write your configs again here like S3 bucket name prefix extra or read from yaml file, and other GCS config.


start >> get_s3_files
get_s3_files >> upload_s3_toGCS
upload_s3_toGCS >> end
ashwin agrawal
fonte
Muito obrigado. então um dos problemas que tive foi o que acontece se não houver novos arquivos? Um dos problemas que enfrento é que sempre haverá arquivos neste local, mas não garantimos que NOVOS arquivos sejam puxados, o que significa que a seção upload_s3_toGCSnão existirá e ocorrerá um erro no fluxo de ar.
arcee123 25/02
Você pode resolver o problema removendo os arquivos do yamlarquivo depois que todos esses arquivos forem carregados no GCS, dessa forma, apenas novos arquivos estarão presentes no yamlarquivo. E caso não haja novos arquivos, o yamlarquivo estará vazio e nenhum dag dinâmico será criado. É por isso que o yamlarquivo é uma opção muito melhor em comparação ao armazenamento de arquivos em uma lista.
ashwin agrawal 25/02
O yamlarquivo também ajudará a manter o log dos arquivos s3 de alguma maneira, se suponha que alguns dos arquivos s3 não sejam carregados no GCS, você também poderá manter um sinalizador correspondente a esse arquivo e tentar novamente na próxima execução do DAG.
ashwin agrawal 25/02
E se não houver novos arquivos, você poderá colocar uma ifcondição no DAG que verificará novos arquivos nos yamlarquivos se houver novos arquivos executados, caso contrário, pule-o.
ashwin agrawal 25/02
o problema aqui é que os downstreams estão definidos. se os downstreams forem definidos sem os trabalhos reais (porque não existem arquivos), ocorrerá um erro.
arcee123 26/02