CS

[PostgreSQL] Tibero to PostgreSQL 개별 테이블 이관하기 (pandas 사용)

뭉치v

0. 이기종 DBMS 마이그레이션?

 이기종 DBMS 테이블 이관은 오라클의 expdp/impdp 처럼 DBMS에서 제공하는 방식들을 적용 할수 없어 고민이 필요한 작업이다.

 보통 1)CSV 파일 EXPORT/IMPORT 2)이기종 DBLINK 3)별도 솔루션으로 테이블을 넘기곤 하는데 대용량인 경우 1번 csv 파일로 떨궈서 넘기는건 거의 불가능하다😮
 이번에 Tibero의 특정 테이블을 PostgreSQL로 이관 업무가 생겨서, 많은 레퍼런스들을 기대하며 폭풍 구글링 시작.. 하지만 소스가 티베로인 이관 프로그램은 찾기가 어려웠다.. 어라..없나?😥

이후 유사 업무가 반복 될 것 같아 파이썬 코드로 ODBC로 티베로에 연결 후 데이터를 가져와 타겟 Postgresql에 넣는 기능을 구현했다.

postgresql tibero to postgresql


소스(Tibero) ↔ 로컬(window) ↔ 타겟(PostgreSQL)  5GB 테이블 이관 작업

1. 로컬(window) 환경에서 python으로 SOURCE, TARGET 연결하기

Python 설치 버전에 맞는 ODBC 32bit 또는 64bit 설치가 필요하다.
둘이 bit가 다르면 "ERROR [IM014][Microsoft][ODBC 드라이버 관리자] 지정된 DSN은 드라이버와 응용 프로그램 간 아키텍처 불일치를 포함합니다." 이 에러가 출력된다=.=

1) Windows 용 티베로 ODBC 64bit 설치

- 티베로 사이트에서 다운로드 진행
- https://technet.tmax.co.kr/ko/front/download/viewDownload.do?cmProductCode=0301&version_seq=PVER-20150504-000001&doc_type_cd=DN#binary
- tibero6-bin-FS07_CS_2005-windows64_2008-254895-20221012002446.zip
2) ODBC 환경설정
- ./client/win64/bin/tbodbc_driver_installer_6_64.exe 실행
3) ODBC 64bit에 DSN 생성해주기

2. 데이터 읽고 쓰기 진행

1) pyodbc와 pandas로 tibero 데이터 읽어오기 (from Tibero)
2) sqlalchemy와 pandas로 postgresql로 데이터 넣어주기 (to PostgreSQL)
3) [이슈핸들] 소스 데이터에 0x00 처리
- PostgreSQL에서 0x00을 인식 할 수 없어서 해당 값을 전부 replace 해주는 전처리 진행
- (참고) PostgreSQL의 bytea 데이터타입이 0x00을 저장 가능하다는데, 어차피 다시 변환해주어야 되는 번거로움이 있어 아예 소스 읽어오면서 replace로 진행
4) 이관 속도 개선 진행
- postgresql의 insert 속도는 많이 알려진대로 COPY >>>> multiple row insert > single row 순이다.
- pandas의 to_sql 함수에서 method값은 insert 방식을 정의하는데, default는 single row 이고, multi는 multiple row를 지원한다.
- multi로 설정후 진행했는데, 너무 느려 다시 폭풍 구글링 시작🤔

- 찾다보니 pandas에서 COPY를 활용 할 수 있는 방법을 제공하는데, 아래 코드에 psql_insert_copy 로 정의하여 사용했다.
- multi로 170초 걸리는게 psql_insert_copy 적용 후 5초로 줄었다. (WoW)
- 구글링하면 engine 파라미터 추가, batch 작업 처리 등등 과거 버전의 게시글이 많은데, 현재 기준으로는psql_insert_copy 방식 쓰는게 가장 빠르고 간편하다고 결론을 내렸다.
5) 기타 정보
- pandas 의 if_exists='replace'로 하면 target 테이블 drop 후 create 부터 진행된다.
- sqlalchemy.types로 타겟쪽 타입을 하나씩 조정할 수 있다. (DB에 DDL은 미리 해놓고 데이터만 옮기는게 편해보임)
- data type을 명시하지 않으면 소스 데이터의 형식에 따라서 float, text 등으로 결정 된다.

3. Python source code

  • Migration Tibero to PostgreSQL for one table
    • 커스텀 필요 : source,target connection info, v_sql, TARGET_TABLE_NAME, TARGET_SCHEMA_NAME
    • 필요 python package 설치 : pip install psycopg2-binary sqlalchemy pandas
# Created by (024.05.13)
from sqlalchemy import create_engine
import pandas as pd
import time
import warnings
import pyodbc
import re
from io import StringIO
import csv

# ignore warning messages
warnings.filterwarnings(action='ignore')

# ----------------------------------
# 0. Setting DB Connection info of Source, Target.
# SOURCE : Tibero(ODBC 연결)
# TARGET : PostgreSQL
# ----------------------------------
def tibero_connect():
    DSN_NAME = "SOURCE_STG"
    DB_USER = "DBA"
    DB_PW = "1!DBA"
    conn = pyodbc.connect('DSN=' + DSN_NAME + ';UID=' + DB_USER + ';PWD=' + DB_PW)
    return conn

def postgres_connect():
    DB_USER = 'DBA'
    DB_PW = '1!DBA'
    DB_NAME = 'AAA'
    HOST = '10.1.1.1'
    PORT = '5432'

    url = 'postgresql://{}:{}@{}:{}/{}'.format(DB_USER, DB_PW, HOST, PORT, DB_NAME)
    pg_eg = create_engine(url, client_encoding='utf8')
    return pg_eg

# ----------------------------------
# Custom insert method of to_sql
# ----------------------------------
def psql_insert_copy(table, conn, keys, data_iter):
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name

        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
            table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)

# ----------------------------------
# 1. Creating dataframe from source.
# ----------------------------------
print("Current : ", time.strftime(('%Y-%m-%d %H:%M:%S')))
start_time1 = time.time()
print('*---------------------*')
print('Start creating dataframe from source.')
tb_conn = tibero_connect()

v_sql = """
select *
from DBA_TEST a
"""

df = pd.read_sql(v_sql, tb_conn)

end_time1 = time.time()
print('End creating dataframe.')
print('*---------------------*')

# ----------------------------------
# 0x00 에러 전처리(특이 케이스, optional)
# ----------------------------------
# re_null = re.compile(pattern='\x00')
# df.replace(regex=re_null, value=' ', inplace=True)

# ----------------------------------
# 2. Insert dataframe into target.
# ----------------------------------
TARGET_TABLE_NAME = 'dba_test'
TARGET_SCHEMA_NAME = 'public'
print('*---------------------*')
print('Start df.to_sql into target.')
start_time2 = time.time()
pg_engine = postgres_connect()
df.to_sql(name=TARGET_TABLE_NAME,     # 타겟 테이블
          schema=TARGET_SCHEMA_NAME,                        # 타겟 스키마
          con=pg_engine,                            # 타겟 connection
          if_exists='append',                       # {'fail', 'replace:테이블 교체', 'append:기존 테이블 사용'), default 'fail'
          chunksize=10000,
          index=False,
          method=psql_insert_copy                   # Insert method {none:single row, 'multi':multiple row, psql_insert_copy:copy clause 사용}
          )
print('End df.to_sql.')
print('*---------------------*')

# Close tibero connection
tb_conn.close()

end_time2 = time.time()

# Summary
print('*---------------------*')
print("Total elapsed : %s seconds." % (round(time.time() - start_time1)))
print("Creating dataframe from source : %s seconds" % round((end_time1 - start_time1)))
print("Insert dataframe of to_sql : %s seconds" % round((end_time2 - start_time2)))
print('*---------------------*')

 

참고

 

반응형

댓글

💲 추천 글