매일 지난 30일 간의 정보들만 Full refresh 형식으로 구현하게 됩니다.
먼저 라이브러리 설치
pip3 install yfinance
Extract/Transform: Yahoo Finance API 호출
Yahoo Finance API를 호출하여 애플 주식 정보 수집하고 파싱
기본으로 지난 한달의 주식 가격을 리턴해줌
import yfinance as yf
@task
def get_historical_prices(symbol):
# extract
ticket = yf.Ticker(symbol)
data = ticket.history() #pandas 데이터 프레임 형태로 리턴
records = []
# transform
for index, row in data.iterrows():
date = index.strftime('%Y-%m-%d %H:%M:%S')
records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])
return records
Load: Redshift의 테이블을 업데이트
실행 데모
앞서 코드 실행
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp
import yfinance as yf
import pandas as pd
import logging
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
@task
def get_historical_prices(symbol):
ticket = yf.Ticker(symbol)
data = ticket.history()
records = []
for index, row in data.iterrows():
date = index.strftime('%Y-%m-%d %H:%M:%S')
records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])
return records
@task
def load(schema, table, records):
logging.info("load started")
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
cur.execute(f"""
CREATE TABLE {schema}.{table} (
date date,
"open" float,
high float,
low float,
close float,
volume bigint
);""")
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
sql = f"INSERT INTO {schema}.{table} VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
print(sql)
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except Exception as error:
print(error)
cur.execute("ROLLBACK;")
raise # error를 raise
logging.info("load done")
with DAG(
dag_id = 'UpdateSymbol',
start_date = datetime(2023,5,30),
catchup=False,
tags=['API'],
schedule = '0 10 * * *'
) as dag:
results = get_historical_prices("AAPL")
load("hajuny129", "stock_info", results)
Docker 스케쥴러와 워커 컨테이너에 로그인해서 yfinance 모듈 설치가 필요
docker container에 루트 유저로 로그인하는 방법
<aside> 💡 **구현 DAG의 세부 사항 - Incremental Update로 구현 기존 테이블 내용 임시 테이블로 복사 - 가져온 데이터 적재 - 임시 테이블 중복 제거
</aside>