세계 나라 정보 API 사용 DAG 작성
https://restcountries.com/에 가면 세부 사항을 찾을 수 있음
https://restcountries.com/v3/all를 호출하여 나라별로 다양한 정보를 얻을 수 있음
{"name": {"common": "South Korea", "official": "Republic of Korea", …
"area": 100210.0,
"population": 51780579, …}
Full Refresh로 구현해서 매번 국가 정보를 읽어오게 할 것!
API 결과에서 아래 3개의 정보를 추출하여 Redshift에 각자 스키마 밑에 테이블 생성
단 이 DAG는 UTC로 매주 토요일 오전 6시 30분에 실행되게 만들어볼 것!
숙제는 개인 github에 repo를 만든 후 제출할 것!
Country DAG 코드 리뷰
import requests
@task
def extract_transform():
response = requests.get('<https://restcountries.com/v3/all>')
countries = response.json() # dictionary list
records = []
for country in countries:
name = country['name']['common']
population = country['population']
area = country['area']
records.append([name, population, area])
return records
with DAG(
dag_id = 'CountryInfo',
start_date = datetime(2023,5,30),
catchup=False,
tags=['API'],
schedule = '30 6 * * 6' # 마지막 자릿수: 0 - Sunday, …, 6 - Saturday
) as dag:
results = extract_transform()
load("keeyong", "country_info", results)
-- 테이블은 아래와 같이 생성하면 된다.
CREATE TABLE {schema}.{table} (
name varchar(256) primary key, --DW가 PK를 보장하진 않고 옵티마이저에 힌트를 주는 정도
population int,
area float
);
Docker 환경에서 실행 데모 예시
코드 예시
위에는 없지만 롤백 이후 raise 코드를 넣어주는 것이 좋다 → 디버깅에 유리