728x90
반응형

Spark 를 사용하다보면 UDFs 를 사용하여 새로운 column 을 만드는 경우가 많이 있다.
그래서 Spark UDFs 에 대해서 정확하게 무엇을 말하고 어떻게 사용하는 지에 대해서 정리해보려고 한다.

 

정리는 아래의 공식 문서를 참고해서 정리해보았다.

https://spark.apache.org/docs/latest/sql-ref-functions-udf-scalar.html

 

Scalar User Defined Functions (UDFs) - Spark 3.5.0 Documentation

 

spark.apache.org

 

 

Spark User-Defined Functions 

Spark User-Defined Functions (UDFs) 는 하나의 행에서 동작하는 사용자가 프로그래밍할 수 있는 루틴을 말한다.
쉽게 말해서 하나의 행을 가져와서 사용자가 프로그래밍을 통해 원하는 값을 얻을 수 있도록 해주는 것을 말한다.

 

실제로 사용해보지 않으면 이해가 어려우니 문서에 나와있는 예제들을 통해 이해해보려고 한다.

 

UDFs 의 속성을 정의하기 위해 사용자는 이 클래스에 정의된 일부 메서드를 사용할 수 있다고 한다.

  • asNonNullable(): UserDefinedFunction
    Updates UserDefinedFunction to non-nullable.
  • asNondeterministic(): UserDefinedFunction
    Updates UserDefinedFunction to nondeterministic.
  • withName(name: String): UserDefinedFunction
    Updates UserDefinedFunction with a given name.

 

Spark UDFs Scala example

문서에는 Scala 와 Java 언어를 통해 예시를 보여주고 있다.
나는 먼저 Scala 를 통해 사용해볼 예정이고 추가로 pyspark 를 통해 사용해볼 예정이다.
Spark-shell 을 실행하고 Scala 를 통해 예제를 작성해보았다.

 

먼저 SparkSession 을 생성해준다.
필요한 라이브러리를 import 하고 SparkSession 을 생성해주었다.

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.udf

scala> val spark = SparkSession.builder().appName("Spark SQL UDF scala example").getOrCreate()
23/10/01 16:56:49 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@3255707f

 

 

다음으로 udf 함수를 만들어보자.
먼저 random 이라는 Math.random() 함수를 통해 랜덤으로 숫자를 입력받는 UDF 를 정의한다.

scala> val random = udf(() => Math.random())
random: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$2341/0x0000008801a62a50@7bea6d16,DoubleType,List(),Some(class[value[0]: double]),None,false,true)

 

 

정의된 UDF 를 등록할 때 asNondeterministic() 함수를 사용했는데 이 함수가 뭔지에 대해서도 자세히 알아봐야할 것 같다.
그리고 SQL 를 통해 UDF 의 결과를 확인해볼 수 있었다.

scala> spark.udf.register("random", random.asNondeterministic())
23/10/01 16:58:50 WARN SimpleFunctionRegistry: The function random replaced a previously registered function.
res0: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$2341/0x0000008801a62a50@7bea6d16,DoubleType,List(),Some(class[value[0]: double]),Some(random),false,false)

scala> spark.sql("select random()").show()
+------------------+
|          random()|
+------------------+
|0.2942297413711151|
+------------------+

 

 

다음 예제는 하나의 argument 를 받는 UDF 함수이다.
x 라는 argument 를 입력받아 +1 을 해주는 UDF 를 작성하고 등록한 다음에 SQL 을 통해 UDF 함수의 결과를 확인해볼 수 있었다.

scala> val plusOne = udf((x: Int) => x + 1)
plusOne: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3817/0x000000880208d800@3695fa3a,IntegerType,List(Some(class[value[0]: int])),Some(class[value[0]: int]),None,false,true)

scala> spark.udf.register("plusOne", plusOne)
res2: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3817/0x000000880208d800@3695fa3a,IntegerType,List(Some(class[value[0]: int])),Some(class[value[0]: int]),Some(plusOne),false,true)

scala> spark.sql("select plusOne(5)").show()
+----------+
|plusOne(5)|
+----------+
|         6|
+----------+

 

 

다음 예제는 2개의 argument 를 입력받는 함수를 정의하고 등록까지 한번에 할 수 있었다.

scala> spark.udf.register("strLenScala", (_: String).length + (_: Int))
res4: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3970/0x00000088020cd108@574a5837,IntegerType,List(Some(class[value[0]: string]), Some(class[value[0]: int])),Some(class[value[0]: int]),Some(strLenScala),false,true)

scala> spark.sql("select strLenScala('test', 1)").show()
+--------------------+
|strLenScala(test, 1)|
+--------------------+
|                   5|
+--------------------+

 

 

다음 예제는 WHERE 절에서 사용할 수 있는 UDF 이다.

scala> spark.udf.register("oneArgFilter", (n: Int) => { n > 5 })
res6: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$4008/0x0000008802036000@349d1ecc,BooleanType,List(Some(class[value[0]: int])),Some(class[value[0]: boolean]),Some(onArgFilter),false,true)

scala> spark.range(1, 10).createOrReplaceTempView("test")

scala> spark.sql("select * from test where oneArgFilter(id)").show()
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

 

 

그럼 이번에는 직접 조금 더 복잡한 코드를 작성해보려고 한다.
이름과 태어난 연도가 들어있는 Dataframe 을 만들었고 태어난 연도를 가져와 나이를 구하는 UDF 를 작성했고
withColumn 함수를 통해 UDF 를 사용해서 새로운 컬럼을 만들어주었다.

scala> val df = Seq(("Nam", 1993), ("Park", 1995), ("Min", 1997)).toDF("name", "birth_year")
df: org.apache.spark.sql.DataFrame = [name: string, birth_year: int]

scala> df.show()
+----+----------+
|name|birth_year|
+----+----------+
| Nam|      1993|
|Park|      1995|
| Min|      1997|
+----+----------+

scala> val calculateAge = udf((birth_year: Int) => { 2023 - birth_year + 1 })
calculateAge: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3127/0x0000007001ee8000@28353636,IntegerType,List(Some(class[value[0]: int])),Some(class[value[0]: int]),None,false,true)

scala> val df_with_age = df.withColumn("age", calculateAge(col("birth_year")))
df_with_age: org.apache.spark.sql.DataFrame = [name: string, birth_year: int ... 1 more field]

scala> df_with_age.show()
+----+----------+---+
|name|birth_year|age|
+----+----------+---+
| Nam|      1993| 31|
|Park|      1995| 29|
| Min|      1997| 27|
+----+----------+---+

 

 

Spark UDFs 를 어떻게 사용하느냐에 따라서 내가 원하는 결과를 만들어낼 수 있었고
UDFs 를 통해 보다 유연하게 데이터를 만들거나 가공할 수 있다는 것이 매우 좋게 느껴졌다.
또한 UDFs 를 잘 활용한다면 보다 쉽게 데이터를 가공할 수 있지 않을까 하는 생각도 해본다.

 

추가로 pyspark UDF 에 대해서도 아래의 문서를 참고해서 사용해볼 수 있다.

https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/

 

PySpark UDF (User Defined Function) - Spark By {Examples}

PySpark UDF Example PySpark UDF (a.k.a User Defined Function) is the most useful feature of Spark SQL & DataFrame that is used to extend the PySpark

sparkbyexamples.com

 

728x90
반응형
복사했습니다!