Summary Table 구현: Airflow와 Redshift로 간단한 ELT를 구현하는 방법을 알아보자


Summary table: 써머리 테이블 예

MAU 써머리 테이블 생성 DAG

  TO_CHAR(A.ts, 'YYYY-MM') AS month,
  COUNT(DISTINCT B.userid) AS mau
FROM raw_data.session_timestamp A
JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid

사용자별 Channel 정보를 요약하는 써머리 테이블 생성 DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime
from datetime import timedelta

from airflow import AirflowException

import requests
import logging
import psycopg2

from airflow.exceptions import AirflowException

def get_Redshift_connection():
    """Return a cursor opened through the ``redshift_dev_db`` Airflow connection."""
    redshift_hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    connection = redshift_hook.get_conn()
    return connection.cursor()

def execSQL(**context):
    """Rebuild ``{schema}.{table}`` from a SELECT statement using CTAS and a table swap.

    The SELECT is first materialized into ``{schema}.temp_{table}``. Only if that
    staging table contains at least one row is the existing target table dropped
    and the staging table renamed into its place.

    Args:
        **context: Airflow task context. ``context['params']`` must provide
            ``schema``, ``table`` and ``sql`` (the SELECT used for the CTAS).

    Raises:
        ValueError: If the staging table ends up with no rows.
        AirflowException: If swapping the staging table into place fails; a
            ROLLBACK is issued before the exception is raised.
    """
    params = context['params']
    schema = params['schema']
    table = params['table']
    select_sql = params['sql']

    cur = get_Redshift_connection()

    # Build the staging table; the previous version only assembled this
    # statement and never executed it.
    sql = f"""DROP TABLE IF EXISTS {schema}.temp_{table};CREATE TABLE {schema}.temp_{table} AS """
    sql += select_sql
    cur.execute(sql)

    # Refuse to replace the target table with an empty result.
    cur.execute(f"""SELECT COUNT(1) FROM {schema}.temp_{table}""")
    count = cur.fetchone()[0]
    if count == 0:
        raise ValueError(f"{schema}.{table} didn't have any record")

    try:
        # Swap the staging table in and commit within the same statement batch.
        sql = f"""DROP TABLE IF EXISTS {schema}.{table};ALTER TABLE {schema}.temp_{table} RENAME to {table};"""
        sql += "COMMIT;"
        cur.execute(sql)
    except Exception as e:
        # Undo the partial swap so the log message below is actually true.
        cur.execute("ROLLBACK;")
        logging.error('Failed to sql. Completed ROLLBACK!')
        raise AirflowException(
            f"Failed to replace {schema}.{table} with {schema}.temp_{table}: {e}"
        ) from e

# DAG that hosts the channel-summary build task; scheduled with '@once' and
# catchup disabled.
dag = DAG(
    dag_id = "Build_Summary_channel",
    start_date = datetime(2021,12,10),
    schedule_interval = '@once',
    catchup = False
)

# Task that calls execSQL, passing the target schema, the target table and the
# SELECT statement through `params`.
# NOTE(review): this call is truncated in this excerpt — the body of the SELECT,
# the closing triple quote, the closing brace of `params` and the closing
# parenthesis of the call are not present. Restore them from the original DAG
# file before running; as written, the string literal never terminates.
execsql = PythonOperator(
    task_id = 'execsql',
    python_callable = execSQL,
    params = {
        'schema' : 'hajuny129',
        'table': 'channel_summary',
        'sql' : """SELECT
    provide_context = True,
    dag = dag

CTAS 부분을 아예 별도의 환경설정 파일로 떼어내면 어떨까?

일별 NPS 계산해보자