728x90
반응형

spark 를 통해 JDBC 를 통해 데이터베이스의 테이블을 불러올 때 사용하는 옵션에 대해서 정리해보려고 한다.

 

Spark JDBC To Other Databases

해당 내용은 야래의 spark 공식 문서에서 확인할 수 있다.

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

 

JDBC To Other Databases - Spark 3.5.0 Documentation

 

spark.apache.org

 

문서를 보면 다음과 같이 설명이 되어있다.

 

Spark SQL 은 JDBC 를 사용해서 다른 데이터베이스들로부터 데이터를 읽을 수 있는 data source 를 포함하고 있다.
이 기능은 JdbcRDD 를 사용하는 것보다 선호된다. 그 이유는 DataFrame 으로 결과를 반환할 수 있고 더 쉽게 Spark SQL 을 사용해 쉽게 처리할 수 있고 또 다른 데이터 소스들과 조인할 수 있기 때문이다.
JDBC data source 는 사용자가 CalssTag 를 제공하지 않아도 되기 때문에 Java 또는 Python 에서 사용하기 쉽다.

 

spark classpath 에서 특정 데이터베이스에 대한 JDBC driver 가 필요하다.
다음 예를 통해 spark 를 사용해 postgres 에 연결하는 방법을 알려주고 있다.

./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

 

 

그럼 이제 Data Source Option 에 대해서 알아보고 정리해보려고 한다. Spark 는 다음과 같이 대소문자 구분 옵션을 지원한다고 한다.
그리고 JDBC 의 Data Source option 은 아래의 설정을 통해 설정할 수 있다.

 

  • DataFrameReader 의 .option / .options method
  • DataFrameWriter  의 .option / .options method

 

더 쉽게 말하면 DataFameReaderDataFrameWriter 의 .option 또는 .options 메서드를 통해 설정할 수 있다.
직접 사용해본다면 쉽게 이해할 수 있을 거라고 생각한다.

 

문서에 나와있는 예를 들어 다음과 같이 사용해볼 수 있다.

# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .option("customSchema", "id DECIMAL(38, 0), name STRING") \
    .load()

# Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

 

 

Data Source Option

그럼 기본적으로 자주 사용하는 option 들에 대해서 정리해봐야겠다.
이 외의 옵션들에 대해서는 필요할 때 문서를 참고하면 될 것 같다.

Property Name Default Meaning Scope
url (none) 데이터베이스에 연결하기 위한 JDBC URL

[사용 방법]
jdbc:subprotocol:subname

[예시]
jdbc:postgresql://localhost/test?user=fred&password=secret
read/write
query (none) Spark 를 통해 데이터를 읽어올 때 Query 를 사용할 수 있다.

[사용 방법]
SELECT <columns> FROM (<user_specified_query>) spark_gen_alias

[예시]
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("query", "select c1, c2 from t1")
.load()

[주의 사항]
dbtable 옵션과 같이 사용할 수 없음
partitionColumn 옵션과 같이 사용할 수 없음
사용하고 싶다면, dbtable 옵션을 사용하여 하위 쿼리를 지정할 수 있고 dbtable 의 일부로 제공된 하위 쿼리 별칭을 사용하여 파티션 열을 한정할 수 있음
read/write
driver (none) URL 에 연결하기 위해 사용하는 JDBC driver 의 class name. read/write
paritionColumn
lowerBound
upperBound
(none) 이 옵션을 따로 사용할 수 없고 다 같이 사용해야 한다.
하나씩 별개의 옵션을 지정할 수 없다. 3개의 옵션이 세트라고 생각하면 된다.
또한 numPartitions 옵션도 지정해주어야 한다.

paritionColumn 은 파티션을 결정할 컬럼을 지정한다.
해당 테이블의 숫자, 날짜 또는 타임스탬프의 컬럼이어야 한다.

lowerBound, upperBound 는 가져올 파티션의 범위를 지정해준다.
테이블의 행을 필터링하는 것이 아니라 partition stride 를 결정할 때 사용한다. 따라서 모든 행이 분할되어 반환된다. 이 옵션을 읽을때만 적용된다.

[예시]
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "(select c1, c2 from t1) as subq")
.option("partitionColumn", "c1")
.option("lowerBound", "1")
.option("upperBound", "100")
.option("numPartitions", "3")
.load()
read
numPartitions (none) 테이블을 읽거나 쓸 때 병렬 처리에 사용할 수 있는 파티션의 최대 수를 말한다. 또한 JDBC connections 의 최대 수를 결정한다.
쓰기를 할 때 파티션의 수가 초과되면 쓰기 전에 coalesce(numPartitions) 를 호출해서 파티션의 수를 줄입니다.
read/write
fetchsize 0 JDBC 의 fetch size, 즉 한번 실행할 때마다 가져오는 행(row)의 개수.
기본적으로 낮은 fetch size 가 JDBC 드라이버 성능에 도움이 된다.
예를 들어, Oracle 은 10 rows 라고 한다.
read
batchsize 1000 JDBC 의 batch size, 즉 한번 실행할 때마다 저장하는 행(row) 의 개수.
batchsize 를 설정하면 JDBC 드라이버 성능에 도움이 된다.
이 옵션은 write 할 때만 적용된다.
write
truncate false JDBC writer 와 관련된 옵션이다.
SaveMode.Overwrite 가 enabled 일 때 이 옵션은 이미 존재하는 테이블을 drop 하는 대신에 truncate 하고 다시 생성한다.
테이블을 삭제했을 때의 메타데이터가 삭제되는 것을 방지할 수 있다.
drop table 을 사용하기 위해서는 false 로 해야한다.
각 데이터베이스마다 truncate 마다 동작이 다르기 때문에 주의해야 한다.
참고로 PostgresDialect 는 JDBCDialect 를 지원하지 않는다고 한다.
write

 

728x90
반응형
복사했습니다!