s3에 올려둔 csv 데이터를 가져와서 pandas로 데이터프레임 형식으로 데이터를 읽은 후, 원하는 테이블에 import하고 싶었다.
일단 boto3을 사용하여 s3 데이터를 불러온다.
s3 데이터를 가져오기 위해선, 접근 키 아이디와 비밀번호, 그리고 지역이 필요하다.
접근 키 아이디와 비밀번호는 AWS 계정을 생성한 후 IAM에서 사용자를 생성하는 과정에서 확인할 수 있다. 참고로 IAM은 AWS 리소스에 대한 접근을 안전하게 제어할 수 있는 서비스다.
아무튼 IAM을 생성하고 나서, 해당 키를 csv 파일로 보관해놨기 때문에 쉽게 찾을 수 있었다. 다운을 안받았다면 AWS 콘솔에서 IAM을 검색하고 사용하면 된다.
지역도 같이 확인한다. 혹시 지역이 정확히 기억이 안난다면, 다음 페이지를 확인하자.
https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
필자의 경우 Asia Pacific (Seoul)로 지역이 설정되어있었기 때문에, ap-northeast-2를 가져왔다.
이제 코드를 보자.
import boto3
access_key_id = "발급받은 키 id"
access_key_pw = "발급받은 키 pw"
region = "지역" # ap-northeast-2
# create s3 client
s3 = boto3.client('s3', aws_access_key_id=access_key_id, aws_secret_access_key=access_key_pw, region_name=region)
연결이 잘 됐다면, 이제 s3의 버킷명과 원하는 파일의 경로 및 파일명을 가져온다.
s3_bucket_name = "버킷 이름"
s3_file_path = "파일 경로 및 파일 이름" # "data/info.csv" 와 같은 형식
obj = s3.get_object(Bucket=s3_bucket_name, Key=s3_file_path)
df = pd.read_csv(io.BytesIO(obj['Body'].read())
print(df) # check database
데이터를 잘 받아왔으면, 이제 데이터베이스 엔진을 연결하고, to_sql 함수를 이용해 데이터프레임을 바로 원하는 테이블에 집어넣는다.
engine = create_enging(Config.SQLALCHEMY_DATABASE_URI)
conn = engine.connect()
df.to_sql(name='TB_TEST', con=engine, if_exists='replace', index=False)
필자같은 경우엔 Config 파일에 db 관련 정보를 다 담아놨기 떄문에, Config.SQL_DATABSE_URI 변수를 불러와 사용했다.
후, conn 변수에 engine과 연결한 것을 담아두고, 이를 to_sql 할 때 사용했다.
그리고 to_sql의 name은 저장할 테이블명이다.
만약, 테이블이 있는 경우에 대치(replace)하고 싶다면 위의 코드처럼 if_exists에 replace를 입력한다. 추가적으로 덧붙이고 싶을 땐 append를 사용하는데, 더 자세한 정보는 공식 문서를 참조하자.
https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html
🗒 전체 코드
mport boto3
access_key_id = "발급받은 키 id"
access_key_pw = "발급받은 키 pw"
region = "지역" # ap-northeast-2
# create s3 client
s3 = boto3.client('s3', aws_access_key_id=access_key_id, aws_secret_access_key=access_key_pw, region_name=region)
s3_bucket_name = "버킷 이름"
s3_file_path = "파일 경로 및 파일 이름"
obj = s3.get_object(Bucket=s3_bucket_name, Key=s3_file_path)
df = pd.read_csv(io.BytesIO(obj['Body'].read()))
print(df)
engine = create_engine(Config.SQLALCHEMY_DATABASE_URI)
conn = engine.connect()
df.to_sql(name='TB_TEST', con=engine, if_exists='replace', index=False)
'개발 관련 > etc' 카테고리의 다른 글
IntelliJ에서 java와 python 같이 사용하기 (0) | 2022.08.04 |
---|---|
아스키(ASCII) (0) | 2022.01.05 |
[regex] 정규 표현식 기호 정리 (0) | 2021.12.04 |
[regex] Remove text between brackets (0) | 2021.12.04 |
[intelliJ] 자주 쓰는 단축키 (0) | 2021.10.13 |