1. Introdução ao Google Cloud Composer
O Objetivo deste tutorial, é passar os conceitos básicos de Cloud Composer / Airflow para orquestração de pipelines dedados construídos em Google Cloud Platform.
2. Passo a passo
Vamos supor que o ambiente do Airflow já foi criado para seu projeto do GCP. Digite composer na barra de pesquisa do GCP, que abre uma Interface conforme mostrado abaixo:
Por fim a execução da tarefa, com o comando :
bash_command = ‘bq extract–destination_format CSV –field_delimiter “;” –print_header=TRUEvtex.tbl_cie_food.
Extraio o conteúdo da tabela vtex.tbl_cie_food para um arquivo no format CSV , com header e delimitador ; (ponto e virgula).
Após a extração o arquivo é enviado para o bucket do projeto:
gs://cie_vtex/FOOD/QUARTA/CIE_FOOD_$(date +”%Y%m%d” ).csv’
Onde $( date +”%Y%m%d” ).csv criará arquivos com Ano/Mes/Dia a cada nova execução da DAG.
Salve este arquivo como <<file_name>> .py e coloque-o na pasta DAGs (esta pasta pode ser aberta clicando na opção DAGs na GUI e arrastando o arquivo para dentro da pasta.
Em seguida, abra o ambiente do Airflow clicando na opção Airflow
Nome: É o nome do seu projeto no qual o ambiente do Composer foi criado, este será anexado ao seu projeto. Se você deseja criar uma programação entre projetos, é necessário estabelecer uma conexão de projeto https://cloud.google.com/composer/docs/how-to/managing/connections
Airflow: Isso abrirá a interface de programação , onde você pode ver o fluxo de trabalho, acionar um fluxo de trabalho (DAG), verificar os registros etc.
DAGs: Directed Acyclic Graph é uma coleção de todas as tarefas que você deseja executar, organizadas de uma forma que reflita seus relacionamentos e dependências. Um DAG é definido em um script Python, que representa a estruturados DAGs (tarefas e suas dependências) como código.
DAG de amostra : vamos criar e entender um DAG de amostra, digamos que você queira criar e agendar um fluxo de trabalho, que pega uma determinada tabela e transforma em um arquivo . Csv e faz a entrega em um determinado Bucket , para ser consumido pela área solicitante.
—————————————– início do código ————————————————
importairflow
fromairflow
import DAG
fromairflow.operators.bash_operator import BashOperator
from datetime import datetime,timedelta
default_args = {
‘start_date’: datetime(2021, 4, 16),
‘retries’: 36,
‘retry_delay’:timedelta(minutes=5),
‘depends_on_past’: False,
}
dag = DAG(
‘FOOD_quarta,
default_args=default_args,
description=’FOOD_quarta,
schedule_interval=’0 9 * * WED,
catchup=False,
max_active_runs=1,
concurrency=3
)
FOOD_segunda = BashOperator(
task_id=’FOOD_quarta,
bash_command=’bq extract–destination_format CSV –field_delimiter “;”–print_header=TRUE vtex.tbl_cie_foodgs://cie_vtex/FOOD/QUARTA/CIE_FOOD_$( date +”%Y%m%d” ).csv’,
dag=dag,
depends_on_past=False)
FOOD_quarta
—————————————– fim do código————————————————
Em uma DAG, primeiro você precisa importar as bibliotecas que deseja usar, neste exemplo, importei os modelos de biblioteca padrão usados para executar o DAG, depois o operador from airflow.operators.bash_operator import BashOperator e ,em seguida, as bibliotecas from datetime import datetime, timedelta para agendamento .
Agora definimos nossos locais de intervalo e argumento padrão, em default_args para oDAG, como data de início, novas tentativas e há muitos outros que podem ser definidos conforme necessário, mas aqui defini apenas alguns que são necessários.
Agora o nome da DAG , sendo o schedule_interval o item mais importante para os agendamentos.
Neste caso o processo executa todas as quartas-feiras. ás 09:00 horas AM. Usei como apoio o link https://airflow.apache.org/docs/apache-airflow/1.10.1/scheduler.html novamente,há muitos parâmetros que podem ser definidos, usei apenas os que são necessários.
Clique nas DAG escolhidas, e isso abrirá os detalhes do fluxo de trabalho conforme mostrado abaixo, Existem vários estágios mencionados como agendado, ignorado,sucesso, falha, execução, etc. Aqui, todas as DAGs estão em sucesso, sendo mostrado como Verde Escuro.
Em Task instance details , todos os detalhes da execução
Então,isso é tudo para criar e agendar uma DAG. Há muito que você pode explorar e fazer por meio do Composer / Airflow. Este foi um exemplo simples e prático aplicado no ambiente de produção, em Google Cloud Platform.
Gostou do artigo? Leia agora nossos estudos sobre Funções Mais Legíveis em Javascript com Option Object.