Spark 를 사용하다보면 UDFs 를 사용하여 새로운 column 을 만드는 경우가 많이 있다.
그래서 Spark UDFs 에 대해서 정확하게 무엇을 말하고 어떻게 사용하는 지에 대해서 정리해보려고 한다.
정리는 아래의 공식 문서를 참고해서 정리해보았다.
https://spark.apache.org/docs/latest/sql-ref-functions-udf-scalar.html
Spark User-Defined Functions
Spark User-Defined Functions (UDFs) 는 하나의 행에서 동작하는 사용자가 프로그래밍할 수 있는 루틴을 말한다.
쉽게 말해서 하나의 행을 가져와서 사용자가 프로그래밍을 통해 원하는 값을 얻을 수 있도록 해주는 것을 말한다.
실제로 사용해보지 않으면 이해가 어려우니 문서에 나와있는 예제들을 통해 이해해보려고 한다.
UDFs 의 속성을 정의하기 위해 사용자는 이 클래스에 정의된 일부 메서드를 사용할 수 있다고 한다.
- asNonNullable(): UserDefinedFunction
Updates UserDefinedFunction to non-nullable. - asNondeterministic(): UserDefinedFunction
Updates UserDefinedFunction to nondeterministic. - withName(name: String): UserDefinedFunction
Updates UserDefinedFunction with a given name.
Spark UDFs Scala example
문서에는 Scala 와 Java 언어를 통해 예시를 보여주고 있다.
나는 먼저 Scala 를 통해 사용해볼 예정이고 추가로 pyspark 를 통해 사용해볼 예정이다.
Spark-shell 을 실행하고 Scala 를 통해 예제를 작성해보았다.
먼저 SparkSession 을 생성해준다.
필요한 라이브러리를 import 하고 SparkSession 을 생성해주었다.
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.udf
scala> val spark = SparkSession.builder().appName("Spark SQL UDF scala example").getOrCreate()
23/10/01 16:56:49 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@3255707f
다음으로 udf 함수를 만들어보자.
먼저 random 이라는 Math.random() 함수를 통해 랜덤으로 숫자를 입력받는 UDF 를 정의한다.
scala> val random = udf(() => Math.random())
random: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$2341/0x0000008801a62a50@7bea6d16,DoubleType,List(),Some(class[value[0]: double]),None,false,true)
정의된 UDF 를 등록할 때 asNondeterministic() 함수를 사용했는데 이 함수가 뭔지에 대해서도 자세히 알아봐야할 것 같다.
그리고 SQL 를 통해 UDF 의 결과를 확인해볼 수 있었다.
scala> spark.udf.register("random", random.asNondeterministic())
23/10/01 16:58:50 WARN SimpleFunctionRegistry: The function random replaced a previously registered function.
res0: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$2341/0x0000008801a62a50@7bea6d16,DoubleType,List(),Some(class[value[0]: double]),Some(random),false,false)
scala> spark.sql("select random()").show()
+------------------+
| random()|
+------------------+
|0.2942297413711151|
+------------------+
다음 예제는 하나의 argument 를 받는 UDF 함수이다.
x 라는 argument 를 입력받아 +1 을 해주는 UDF 를 작성하고 등록한 다음에 SQL 을 통해 UDF 함수의 결과를 확인해볼 수 있었다.
scala> val plusOne = udf((x: Int) => x + 1)
plusOne: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3817/0x000000880208d800@3695fa3a,IntegerType,List(Some(class[value[0]: int])),Some(class[value[0]: int]),None,false,true)
scala> spark.udf.register("plusOne", plusOne)
res2: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3817/0x000000880208d800@3695fa3a,IntegerType,List(Some(class[value[0]: int])),Some(class[value[0]: int]),Some(plusOne),false,true)
scala> spark.sql("select plusOne(5)").show()
+----------+
|plusOne(5)|
+----------+
| 6|
+----------+
다음 예제는 2개의 argument 를 입력받는 함수를 정의하고 등록까지 한번에 할 수 있었다.
scala> spark.udf.register("strLenScala", (_: String).length + (_: Int))
res4: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3970/0x00000088020cd108@574a5837,IntegerType,List(Some(class[value[0]: string]), Some(class[value[0]: int])),Some(class[value[0]: int]),Some(strLenScala),false,true)
scala> spark.sql("select strLenScala('test', 1)").show()
+--------------------+
|strLenScala(test, 1)|
+--------------------+
| 5|
+--------------------+
다음 예제는 WHERE 절에서 사용할 수 있는 UDF 이다.
scala> spark.udf.register("oneArgFilter", (n: Int) => { n > 5 })
res6: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$4008/0x0000008802036000@349d1ecc,BooleanType,List(Some(class[value[0]: int])),Some(class[value[0]: boolean]),Some(onArgFilter),false,true)
scala> spark.range(1, 10).createOrReplaceTempView("test")
scala> spark.sql("select * from test where oneArgFilter(id)").show()
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
그럼 이번에는 직접 조금 더 복잡한 코드를 작성해보려고 한다.
이름과 태어난 연도가 들어있는 Dataframe 을 만들었고 태어난 연도를 가져와 나이를 구하는 UDF 를 작성했고
withColumn 함수를 통해 UDF 를 사용해서 새로운 컬럼을 만들어주었다.
scala> val df = Seq(("Nam", 1993), ("Park", 1995), ("Min", 1997)).toDF("name", "birth_year")
df: org.apache.spark.sql.DataFrame = [name: string, birth_year: int]
scala> df.show()
+----+----------+
|name|birth_year|
+----+----------+
| Nam| 1993|
|Park| 1995|
| Min| 1997|
+----+----------+
scala> val calculateAge = udf((birth_year: Int) => { 2023 - birth_year + 1 })
calculateAge: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3127/0x0000007001ee8000@28353636,IntegerType,List(Some(class[value[0]: int])),Some(class[value[0]: int]),None,false,true)
scala> val df_with_age = df.withColumn("age", calculateAge(col("birth_year")))
df_with_age: org.apache.spark.sql.DataFrame = [name: string, birth_year: int ... 1 more field]
scala> df_with_age.show()
+----+----------+---+
|name|birth_year|age|
+----+----------+---+
| Nam| 1993| 31|
|Park| 1995| 29|
| Min| 1997| 27|
+----+----------+---+
Spark UDFs 를 어떻게 사용하느냐에 따라서 내가 원하는 결과를 만들어낼 수 있었고
UDFs 를 통해 보다 유연하게 데이터를 만들거나 가공할 수 있다는 것이 매우 좋게 느껴졌다.
또한 UDFs 를 잘 활용한다면 보다 쉽게 데이터를 가공할 수 있지 않을까 하는 생각도 해본다.
추가로 pyspark UDF 에 대해서도 아래의 문서를 참고해서 사용해볼 수 있다.
https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/
'데이터 엔지니어링 > Spark' 카테고리의 다른 글
Spark explode() 함수 사용시 주의할 점 (0) | 2024.02.11 |
---|---|
Spark explode() 사용해서 List 로 된 컬럼을 행으로 분리하기 (0) | 2023.10.01 |
Spark multi process error in macOS (0) | 2023.09.27 |
Spark JDBC Data Source Option (0) | 2023.09.26 |
spark config 리스트 정리 (spark.config.set) (0) | 2023.09.17 |