Estou tentando mover arquivos s3 de um bloco "não excluído" (o que significa que não posso excluir os arquivos) para o GCS usando o fluxo de ar. Não posso garantir que novos arquivos estejam lá todos os dias, mas devo verificar se há novos arquivos todos os dias.
meu problema é a criação dinâmica de subdags. Se houver arquivos, preciso de subdags. Se não houver arquivos, não preciso de subdags. Meu problema é as configurações upstream / downstream. No meu código, ele detecta arquivos, mas não inicia os subdags como deveriam. Estou sentindo falta de algo.
aqui está o meu código:
from airflow import models
from airflow.utils.helpers import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.utils import dates
from airflow.models import Variable
import logging
args = {
'owner': 'Airflow',
'start_date': dates.days_ago(1),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_success': True,
}
bucket = 'mybucket'
prefix = 'myprefix/'
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = []
parent_dag = models.DAG(
dag_id='My_Ingestion',
default_args=args,
schedule_interval='@daily',
catchup=False
)
def Check_For_Files(**kwargs):
s3 = S3Hook(aws_conn_id='S3_BOX')
s3.get_conn()
bucket = bucket
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = s3.list_keys(bucket_name=bucket, prefix='myprefix/file')
for file in files:
print(file)
print(file.split("_")[-2])
print(file.split("_")[-2][-8:]) ##proves I can see a date in the file name is ok.
maxdate = maxdate if maxdate > int(file.split("_")[-2][-8:]) else int(file.split("_")[-2][-8:])
if maxdate > LastBDEXDate:
return 'Start_Process'
return 'finished'
def create_subdag(dag_parent, dag_id_child_prefix, file_name):
# dag params
dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix)
# dag
subdag = models.DAG(dag_id=dag_id_child,
default_args=args,
schedule_interval=None)
# operators
s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
task_id=dag_id_child,
bucket=bucket,
prefix=file_name,
dest_gcs_conn_id='GCP_Account',
dest_gcs='gs://my_files/To_Process/',
replace=False,
gzip=True,
dag=subdag)
return subdag
def create_subdag_operator(dag_parent, filename, index):
tid_subdag = 'file_{}'.format(index)
subdag = create_subdag(dag_parent, tid_subdag, filename)
sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
return sd_op
def create_subdag_operators(dag_parent, file_list):
subdags = [create_subdag_operator(dag_parent, file, file_list.index(file)) for file in file_list]
# chain subdag-operators together
chain(*subdags)
return subdags
check_for_files = BranchPythonOperator(
task_id='Check_for_s3_Files',
provide_context=True,
python_callable=Check_For_Files,
dag=parent_dag
)
finished = DummyOperator(
task_id='finished',
dag=parent_dag
)
decision_to_continue = DummyOperator(
task_id='Start_Process',
dag=parent_dag
)
if len(files) > 0:
subdag_ops = create_subdag_operators(parent_dag, files)
check_for_files >> decision_to_continue >> subdag_ops[0] >> subdag_ops[-1] >> finished
check_for_files >> finished
python
airflow
directed-acyclic-graphs
arcee123
fonte
fonte
spark
trabalhos ou algumpython
script e o que você está usando para executá-lo comolivy
outro métodofiles
uma lista vazia?Respostas:
Abaixo está a maneira recomendada de criar um DAG dinâmico ou sub-DAG no fluxo de ar, embora existam outras maneiras também, mas acho que isso seria amplamente aplicável ao seu problema.
Primeiro, crie um arquivo
(yaml/csv)
que inclua a lista de todos oss3
arquivos e locais. No caso de você ter escrito uma função para armazená-los na lista, eu diria que os armazene em umyaml
arquivo separado e carregue-o em tempo de execução no ambiente de fluxo de ar e, em seguida, crie DAGs.Abaixo está um
yaml
arquivo de exemplo :dynamicDagConfigFile.yaml
Você pode modificar sua
Check_For_Files
função para armazená-las em umyaml
arquivo.Agora podemos avançar para a criação dinâmica de dag:
Primeiro, defina duas tarefas usando operadores fictícios, a tarefa inicial e final. Tais tarefas são aquelas em que iremos desenvolver nossas
DAG
tarefas, criando dinamicamente tarefas entre elas:DAG dinâmico: usaremos
PythonOperators
no fluxo de ar. A função deve receber como argumentos o ID da tarefa; uma função python a ser executada, ou seja, o python_callable para o operador Python; e um conjunto de argumentos a serem usados durante a execução.Inclua um argumento the
task id
. Assim, podemos trocar dados entre tarefas geradas de maneira dinâmica, por exemplo, viaXCOM
.Você pode especificar sua função de operação dentro deste dag dinâmico
s3_to_gcs_op
.Finalmente, com base no local presente no arquivo yaml, você pode criar dags dinâmicos, primeiro leia o
yaml
arquivo como abaixo e crie o dag dinâmico:Definição final do DAG:
A ideia é que
Código completo do fluxo de ar em ordem:
fonte
upload_s3_toGCS
não existirá e ocorrerá um erro no fluxo de ar.yaml
arquivo depois que todos esses arquivos forem carregados no GCS, dessa forma, apenas novos arquivos estarão presentes noyaml
arquivo. E caso não haja novos arquivos, oyaml
arquivo estará vazio e nenhum dag dinâmico será criado. É por isso que oyaml
arquivo é uma opção muito melhor em comparação ao armazenamento de arquivos em uma lista.yaml
arquivo também ajudará a manter o log dos arquivos s3 de alguma maneira, se suponha que alguns dos arquivos s3 não sejam carregados no GCS, você também poderá manter um sinalizador correspondente a esse arquivo e tentar novamente na próxima execução do DAG.if
condição no DAG que verificará novos arquivos nosyaml
arquivos se houver novos arquivos executados, caso contrário, pule-o.