1. Redshift 웨어하우스를 대상으로 구성

데이터 웨어하우스로 Amazon Redshift를 사용하는 경우, 추출 후 데이터를 로드하기 위해 S3와 통합하는 것은 매우 간단합니다. 다음 단계를 따릅니다.

  1. IAM역할이 아직 없는 경우 생성합니다.

    [AWS] S3 생성하기 + IAM 설정하기

  2. 생성한 IAM 역할을 Redshift 클러스터와 연결합니다.

    AWS Redshift Cluster 생성하기

  3. redshift 자격증명과 방금생성한 IAM 이름을 사용하여 pipline.conf파일에 하나의 섹션을 추가합니다. AWS Redshift 콘솔에서 Redshift 클러스터 연결 정보를 찾을 수 있습니다.

    Untitled

2. Redshift 웨어하우스에 데이터 로드

copy 명령어를 통해 로드

Redshift클러스터를 쿼리하는데 사용하는 SQL 클라이언트나 Boto3 라이브러리를 사용하는 파이썬 스크립트에서 SQL문으로 실행할 수 있습니다. COPY는 로드중인 데이터를 대상 테이블의 기존 행에 추가합니다.

COPY table_name 
[ column_list ]
FROM source_file
authorization
[ [ FORMAT ] [ AS ] data_format ]
[ parameter [ argument ] [, .. ] ]

모든 [] 항목은 선택사항입니다.

이를 4장에서 지정된 IAM 역할 권한 부여와 S3 버킷의 파일을 사용하면, 그리고 열 순서를 지정하여 SQL 클라이언트에서 실행할 때 다음과 같습니다.

COPY my_schema.my_table (column_1, column_2, .... )
FROM 's3://bucket-name/file.csv'
iam_role 'arn:aws:iam::222:role/RedshiftloadRole';

파이썬 스크립트에서 Boto3라이브러리를 사용해 COPY구현하기

  1. Redshift클러스터에서 copy 명령을 실행하기 위해 psycopg2를, pipline.conf파일을 읽기위해 configparser라이브러리를 가져옵니다.

    import boto3 
    import configparser 
    import psycopg2
    
  2. psycopg2.connect 함수와 pipline.conf파일에 저장된 자격증명을 사용하여 redshift클러스터에 연결

    parser = configparser.ConfigParser() 
    parser.read("pipeline.conf") 
    dbname = parser.get("aws_creds", "database") 
    user = parser.get("aws一creds","username") 
    password = parser. get( "aws_creds" , "password" ) 
    host = parser,get("aws_creds", "host") 
    port = parser.get("aws_creds", "port") 
    
    # redshift 클러스터에 연결 
    rs_conn = psycopg2.connect( "dbname=" + dbname
    	+	" user=" + user
    	+	" password=" + password
    	+	" host=" + host
    	+	" port=" + port)
    
  3. 이제 psycopg2 Cursor객체를 사용하여 copy 명령을 사용할 수 있다. copy명령을 실행하되 pipline.conf파일에서 해당 값을 불러옵니다.

    # account_id 및 iam_role을 로드
    parser = configparser.ConfigParser()
    parser.read("pipeline.conf")
    account_id = parser.get( "aws_boto_credentials" ,"account_id " )
    iam_role = parser.get("aws_creds", "iam_role")
    bucket_name = parser.get("aws_boto_credentials","bucket_name")
    
    # Redshift에 파일을 로드하기 위해 COPY 명령을 실행
    filepath = ( "s3: //"
    	+ bucket_name
    	+ "/order_extract.csv" )
    role_string = ("arn:aws: iam: :"
    	+ account_id
    	+ " : role/" + iam_role )
    
    sql = "COPY public.Orders"
    sql = sql + " from %s "
    sql = sql + " iam_role %s;"
    
    # cursor 객체를 생성하고 COPY를 실행
    cur = rs_conn . cursor()
    cur.execute(sql,(file_path, role_string))
    
    # cursor를 총료하고 트랜잭션을 커 밋
    cur . close()
    rs_conn . commit ()
    
    # 연결욜 종료
    rs_conn . close ()
    

전체로드 및 증분