spark 를 통해 JDBC 를 통해 데이터베이스의 테이블을 불러올 때 사용하는 옵션에 대해서 정리해보려고 한다.
Spark JDBC To Other Databases
해당 내용은 야래의 spark 공식 문서에서 확인할 수 있다.
https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
문서를 보면 다음과 같이 설명이 되어있다.
Spark SQL 은 JDBC 를 사용해서 다른 데이터베이스들로부터 데이터를 읽을 수 있는 data source 를 포함하고 있다.
이 기능은 JdbcRDD 를 사용하는 것보다 선호된다. 그 이유는 DataFrame 으로 결과를 반환할 수 있고 더 쉽게 Spark SQL 을 사용해 쉽게 처리할 수 있고 또 다른 데이터 소스들과 조인할 수 있기 때문이다.
JDBC data source 는 사용자가 CalssTag 를 제공하지 않아도 되기 때문에 Java 또는 Python 에서 사용하기 쉽다.
spark classpath 에서 특정 데이터베이스에 대한 JDBC driver 가 필요하다.
다음 예를 통해 spark 를 사용해 postgres 에 연결하는 방법을 알려주고 있다.
./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
그럼 이제 Data Source Option 에 대해서 알아보고 정리해보려고 한다. Spark 는 다음과 같이 대소문자 구분 옵션을 지원한다고 한다.
그리고 JDBC 의 Data Source option 은 아래의 설정을 통해 설정할 수 있다.
- DataFrameReader 의 .option / .options method
- DataFrameWriter 의 .option / .options method
더 쉽게 말하면 DataFameReader 와 DataFrameWriter 의 .option 또는 .options 메서드를 통해 설정할 수 있다.
직접 사용해본다면 쉽게 이해할 수 있을 거라고 생각한다.
문서에 나와있는 예를 들어 다음과 같이 사용해볼 수 있다.
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.load()
jdbcDF2 = spark.read \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.option("customSchema", "id DECIMAL(38, 0), name STRING") \
.load()
# Saving data to a JDBC source
jdbcDF.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.save()
jdbcDF2.write \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# Specifying create table column data types on write
jdbcDF.write \
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
Data Source Option
그럼 기본적으로 자주 사용하는 option 들에 대해서 정리해봐야겠다.
이 외의 옵션들에 대해서는 필요할 때 문서를 참고하면 될 것 같다.
Property Name | Default | Meaning | Scope |
url | (none) | 데이터베이스에 연결하기 위한 JDBC URL [사용 방법] jdbc:subprotocol:subname [예시] jdbc:postgresql://localhost/test?user=fred&password=secret |
read/write |
query | (none) | Spark 를 통해 데이터를 읽어올 때 Query 를 사용할 수 있다. [사용 방법] SELECT <columns> FROM (<user_specified_query>) spark_gen_alias [예시] spark.read.format("jdbc") .option("url", jdbcUrl) .option("query", "select c1, c2 from t1") .load() [주의 사항] dbtable 옵션과 같이 사용할 수 없음 partitionColumn 옵션과 같이 사용할 수 없음 사용하고 싶다면, dbtable 옵션을 사용하여 하위 쿼리를 지정할 수 있고 dbtable 의 일부로 제공된 하위 쿼리 별칭을 사용하여 파티션 열을 한정할 수 있음 |
read/write |
driver | (none) | URL 에 연결하기 위해 사용하는 JDBC driver 의 class name. | read/write |
paritionColumn lowerBound upperBound |
(none) | 이 옵션을 따로 사용할 수 없고 다 같이 사용해야 한다. 하나씩 별개의 옵션을 지정할 수 없다. 3개의 옵션이 세트라고 생각하면 된다. 또한 numPartitions 옵션도 지정해주어야 한다. paritionColumn 은 파티션을 결정할 컬럼을 지정한다. 해당 테이블의 숫자, 날짜 또는 타임스탬프의 컬럼이어야 한다. lowerBound, upperBound 는 가져올 파티션의 범위를 지정해준다. 테이블의 행을 필터링하는 것이 아니라 partition stride 를 결정할 때 사용한다. 따라서 모든 행이 분할되어 반환된다. 이 옵션을 읽을때만 적용된다. [예시] spark.read.format("jdbc") .option("url", jdbcUrl) .option("dbtable", "(select c1, c2 from t1) as subq") .option("partitionColumn", "c1") .option("lowerBound", "1") .option("upperBound", "100") .option("numPartitions", "3") .load() |
read |
numPartitions | (none) | 테이블을 읽거나 쓸 때 병렬 처리에 사용할 수 있는 파티션의 최대 수를 말한다. 또한 JDBC connections 의 최대 수를 결정한다. 쓰기를 할 때 파티션의 수가 초과되면 쓰기 전에 coalesce(numPartitions) 를 호출해서 파티션의 수를 줄입니다. |
read/write |
fetchsize | 0 | JDBC 의 fetch size, 즉 한번 실행할 때마다 가져오는 행(row)의 개수. 기본적으로 낮은 fetch size 가 JDBC 드라이버 성능에 도움이 된다. 예를 들어, Oracle 은 10 rows 라고 한다. |
read |
batchsize | 1000 | JDBC 의 batch size, 즉 한번 실행할 때마다 저장하는 행(row) 의 개수. batchsize 를 설정하면 JDBC 드라이버 성능에 도움이 된다. 이 옵션은 write 할 때만 적용된다. |
write |
truncate | false | JDBC writer 와 관련된 옵션이다. SaveMode.Overwrite 가 enabled 일 때 이 옵션은 이미 존재하는 테이블을 drop 하는 대신에 truncate 하고 다시 생성한다. 테이블을 삭제했을 때의 메타데이터가 삭제되는 것을 방지할 수 있다. drop table 을 사용하기 위해서는 false 로 해야한다. 각 데이터베이스마다 truncate 마다 동작이 다르기 때문에 주의해야 한다. 참고로 PostgresDialect 는 JDBCDialect 를 지원하지 않는다고 한다. |
write |
'데이터 엔지니어링 > Spark' 카테고리의 다른 글
Spark explode() 사용해서 List 로 된 컬럼을 행으로 분리하기 (0) | 2023.10.01 |
---|---|
Spark User Defined Functions (UDFs) (0) | 2023.10.01 |
Spark multi process error in macOS (0) | 2023.09.27 |
spark config 리스트 정리 (spark.config.set) (0) | 2023.09.17 |
Spark SQL function - ifnull(), nullif(), nvl(), nvl2() (0) | 2022.07.09 |