python으로 하는 일반적인 기능은 구현이 가능한 자유도가 높은 operator
load_nps = PythonOperator(
dag=dag,
task_id='task_id',
python_callable=python_func,
params={
'table': 'delighted_nps',
'schema': 'raw_data'
},
)
from airflow.exceptions import AirflowException
def python_func(**cxt):
table = cxt["params"]["table"]
schema = cxt["params"]["schema"]
ex_date = cxt["execution_date"]
# do what you need to do
...
python operator에 정의된 params는 context 변수의 params key로 저장이 된다
2개의 태스크로 구성된 데이터 파이프라인 (DAG)
DAG 선언
Airflow의 DAG는 task들의 집합
하나의 Task는 하나의 Operator로 이루어짐
tag
를 통해 business owner를 적어놓는 것을 권장
tag는 여러 개 사용 가능
dag = DAG(
dag_id = 'my_first_dag',
start_date = datetime(2021,8,26),
catchup=False, # 미래에 활성화 시켜도 catchup 시키지 않음
tags=['example'],
schedule_interval = '0 2 * * *' # 2시 0분에 한번 도는 daily DAG
)
Task
print_hello
: PythonOperator로 구성되어 있으며 먼저 실행print_goodbye
: PythonOperator로 구성되어 있으며 두번째로 실행sudo su airflow로 사용자 전환을 한 후
/var/lib/airflow/dags
경로로 이동을 해서 아래 소스코드가 포함된 **HelloWorld.py 파일을 만든다**
성공적으로 만들면 위와 같이 명령어를 쳤을 때 저렇게 뜬다.
아래는 코드
## 전체 소스 코드
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
dag = DAG(
dag_id = 'my_first_dag',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule_interval = '0 2 * * *')
def print_hello():
print("hello!")
return "hello!"
def print_goodbye():
print("goodbye!")
return "goodbye!"
print_hello = PythonOperator(
task_id = 'print_hello',
#python_callable param points to the function you want to run
python_callable = print_hello,
#dag param points to the DAG that this task is a part of
dag = dag)
print_goodbye = PythonOperator(
task_id = 'print_goodbye',
python_callable = print_goodbye,
dag = dag)
#Assign the order of the tasks in our DAG
print_hello >> print_goodbye
Tree 뷰에서는 그 동안 실행되었던 record들이 나온다
Graph 탭에 들어가게 되면 Task 들 간의 순서가 graph 형식으로 표현이 된다
Airflow Decorator를 사용하게 되면 훨씬 프로그램이 더 직관적이게 된다.
소스코드
from airflow.decorators import task
@task
def print_hello():
print("hello!")
return "hello!"
@task
def print_goodbye():
print("goodbye!")
return "goodbye!"
with DAG(
dag_id = 'HelloWorld_v2',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *'
) as dag:
# Assign the tasks to the DAG in order
print_hello() >> print_goodbye()
# 이 함수 이름이 태스크 ID가 됨
함수들 앞에 데코레이터를 통해서 annotation을 한다
함수를 정의하면 그 자체가 task가 된다
DAG 내 task 함수 실행 순서를 지정해주면 된다. 또한 task id를 별도로 설정해주지 않으면, task 함수명이 task id가 된다.
참고로
with DAG(
dag_id = 'HelloWorld_v2',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *'
) as dag:
# Assign the tasks to the DAG in order
print_hello() >> print_goodbye()
# 이 함수 이름이 태스크 ID가 됨
위 방식처럼 하는 것이 context manage 방법이라고 한다.
operator와 task는 dag에 할당돼야 실행이 가능한데
1. dag=DAG()는 dag id를 태스크마다 명시해줘서 할당해줘야 해요
2. 근데 with 절을 사용하면 해당 오퍼레이터에 자동으로 dag를 할당해줘서 간결하게 작성 가능해요
3. decorator는 여러번 호출시에 재사용 가능해서 용이해요
1,2,3 순서대로 airflow가 버전업 되면서 쓰기 편하게 새로 생기는 기능들이라고 생각하면 편하다.