Pyspark Window function
Pyspark 에서 Window 함수를 사용해 rank 나 row number 등과 같은 결과를 계산할 수 있다.
직접 사용해보았지만 제대로 이해하고 사용하고 있는 것 같지 않아서 정리해보려고 한다.
Window Functions
Pyspark window 함수는 frame 과 partition 과 같은 행(row) 의 그룹에서 입력된 모든 row 를 하나의 값으로 반환한다.
Window 함수의 핵심은 여러 개의 row 를 하나의 값으로 사용한다는 것이다.
Pyspark window 함수는 3가지 종류의 함수로 구분된다고 한다.
- Ranking Functions
- Analytic Functions
- Aggregate Functions
아래의 테이블은 Window Functions 를 정리한 함수들이다. 참고하면 좋을 것 같다.
Window Functions 직접 사용해보기
Window 함수가 무엇을 말하는지 이해해보았으니 직접 사용해보면서 익혀보려고 한다.
다음과 같이 하나의 Pyspark Dataframe 을 생성해주었다.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("pyspark_window_functions").getOrCreate()
columns = ["class", "std_name", "std_gender", "std_avg"]
simpleData = (
("class1", "James", "male", 88),
("class2", "Michael", "female", 90),
("class2", "Robert", "male", 85),
("class3", "Maria", "female", 78),
("class1", "James", "male", 88),
("class3", "Scott", "male", 85),
("class2", "Jen", "female", 85),
("class3", "Jeff", "male", 90),
("class1", "Kumar", "male", 80),
)
df = spark.createDataFrame(data=simpleData, schema=columns)
df.printSchema()
df.show(truncate=False)
실행 결과
root
|-- class: string (nullable = true)
|-- std_name: string (nullable = true)
|-- std_gender: string (nullable = true)
|-- std_avg: long (nullable = true)
+------+--------+----------+-------+
|class |std_name|std_gender|std_avg|
+------+--------+----------+-------+
|class1|James |male |88 |
|class2|Michael |female |90 |
|class2|Robert |male |85 |
|class3|Maria |female |78 |
|class1|James |male |88 |
|class3|Scott |male |85 |
|class2|Jen |female |85 |
|class3|Jeff |male |90 |
|class1|Kumar |male |80 |
+------+--------+----------+-------+
이 데이터를 가지고 Window 함수를 사용해볼 예정이다.
학급 별 학생들의 번호 부여하기 - row_number()
학생들을 학급 별로 번호를 부여하고싶다. 그러면 다음과 같이 코드를 작성할 수 있다.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
number_windowSpec = Window.partitionBy("class").orderBy("std_name")
df.withColumn("row_number", row_number().over(number_windowSpec)).show()
실행 결과
+------+--------+----------+-------+----------+
| class|std_name|std_gender|std_avg|row_number|
+------+--------+----------+-------+----------+
|class1| James| male| 88| 1|
|class1| James| male| 88| 2|
|class1| Kumar| male| 80| 3|
|class2| Jen| female| 85| 1|
|class2| Michael| female| 90| 2|
|class2| Robert| male| 85| 3|
|class3| Jeff| male| 90| 1|
|class3| Maria| female| 78| 2|
|class3| Scott| male| 85| 3|
+------+--------+----------+-------+----------+
실행 결과를 확인해보면 class 별로 구분되어 Row 의 번호가 생성된 것을 확인할 수 있었다.
만약 partitionBy 를 해서 파티션을 구분하지 않으면 어떻게 될까?
다음과 같이 경고 문구가 출력된다.
24/03/14 16:03:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/14 16:03:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/14 16:03:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
경고 문구는 Window Operation 을 위한 파티션이 정의되어있지 않다고 하고 모든 데이터를 단일 파티션으로 설정하게 되면 성능 저하가 발생할 수 있다고 한다.
학급별 학생들의 평균 점수로 성적 순위 구하기 - rank()
이번에는 학급별로 학생들의 평균 점수를 가지고 성적 순위를 구해보려고 한다.
rank() 함수를 사용해서 구해보았다.
from pyspark.sql.functions import rank
rank_windowSpec = Window.partitionBy("class").orderBy("str_avg")
df.withColumn("rank", rank().over(rank_windowSpec)).show()
실행 결과
+------+--------+----------+-------+----+
| class|std_name|std_gender|std_avg|rank|
+------+--------+----------+-------+----+
|class1| Kumar| male| 80| 1|
|class1| James| male| 88| 2|
|class1| James| male| 88| 2|
|class2| Robert| male| 85| 1|
|class2| Jen| female| 85| 1|
|class2| Michael| female| 90| 3|
|class3| Maria| female| 78| 1|
|class3| Scott| male| 85| 2|
|class3| Jeff| male| 90| 3|
+------+--------+----------+-------+----+
실행 결과를 확인해보니 가장 낮은 평균 점수가 순위가 높게 나와서 다음과 같이 내림차순으로 다시 정렬해주었다.
내림 차순으로 정렬하기 위해서 desc() 함수를 추가로 불러와서 적용해주었다.
from pyspark.sql.functions import rank, desc
rank_windowSpec = Window.partitionBy("class").orderBy(desc("std_avg"))
df.withColumn("rank", rank().over(rank_windowSpec)).show()
실행 결과
+------+--------+----------+-------+----+
| class|std_name|std_gender|std_avg|rank|
+------+--------+----------+-------+----+
|class1| James| male| 88| 1|
|class1| James| male| 88| 1|
|class1| Kumar| male| 80| 3|
|class2| Michael| female| 90| 1|
|class2| Robert| male| 85| 2|
|class2| Jen| female| 85| 2|
|class3| Jeff| male| 90| 1|
|class3| Scott| male| 85| 2|
|class3| Maria| female| 78| 3|
+------+--------+----------+-------+----+
평균 점수대로 순위가 입력된 것을 확인할 수 있다.
그런데 여기서 같은 점수인 경우 순위가 같게 나오고 중간 순위를 제외하고나서 다음 순위가 오게 된다.
예를 들어, 아래의 결과와 같이 90점, 90점, 88점 의 평균 점수가 있을 때
90점 1등, 90점 1등, 88점 3등 이 되어 중간에 2등이 사라지게 된다.
|class1| James| male| 88| 1|
|class1| James| male| 88| 1|
|class1| Kumar| male| 80| 3|
만약에 중간에 순위를 제외하지 않고 연속적으로 사용하고 싶다면 dense_rank() 함수를 사용할 수 있다.
from pyspark.sql.functions import dense_rank, desc
rank_windowSpec = Window.partitionBy("class").orderBy(desc("std_avg"))
df.withColumn("rank", dense_rank().over(rank_windowSpec)).show()
실행 결과
+------+--------+----------+-------+----+
| class|std_name|std_gender|std_avg|rank|
+------+--------+----------+-------+----+
|class1| James| male| 88| 1|
|class1| James| male| 88| 1|
|class1| Kumar| male| 80| 2|
|class2| Michael| female| 90| 1|
|class2| Robert| male| 85| 2|
|class2| Jen| female| 85| 2|
|class3| Jeff| male| 90| 1|
|class3| Scott| male| 85| 2|
|class3| Maria| female| 78| 3|
+------+--------+----------+-------+----+
실행 결과를 보면 아까와 다르게 1등이 2명이어서 2등이 사라지는 것이 아니라 2등도 포함되어 나오게 된다.
따라서, dense_rank() 함수를 사용하게 되면 중간의 gap 이 사라지게 된다.
|class1| James| male| 88| 1|
|class1| James| male| 88| 1|
|class1| Kumar| male| 80| 2|
학급 별 평균, 최고, 최저 점수 구하기 - avg(), min(), max()
이번에는 학급별로 평균 점수와 최고, 최저 점수를 구해보자.
avg(), min(), max() 함수를 통해 구해주었다.
from pyspark.sql.functions import avg, min, max
calc_windowSpec = Window.partitionBy("class")
calc_df = (
df.withColumn("avg", avg("std_avg").over(calc_windowSpec))
.withColumn("min", min("std_avg").over(calc_windowSpec))
.withColumn("max", max("std_avg").over(calc_windowSpec))
)
calc_df.select("class", "avg", "min", "max").dropDuplicates(["class"]).show()
실행 결과
+------+-----------------+---+---+
| class| avg|min|max|
+------+-----------------+---+---+
|class1|85.33333333333333| 80| 88|
|class2|86.66666666666667| 85| 90|
|class3|84.33333333333333| 78| 90|
+------+-----------------+---+---+
자 이렇게 학생 데이터를 통해 window 함수를 다루어보았는데 내가 사용한 window 함수 외에도 다양한 함수가 있다.
아직 다 써보지는 않았지만 어떻게 사용하는지 이해했다고 생각이 되고 앞으로도 spark 를 사용할 예정이기 때문에 사용할 기회가 된다면 내가 원하는 컬럼에 맞게 window 함수를 사용해서 만들어보봐야겠다.
참고 사이트
https://sparkbyexamples.com/pyspark/pyspark-window-functions/
PySpark Window Functions - Spark By {Examples}
PySpark Window functions are used to calculate results, such as the rank, row number, etc., over a range of input rows. In this article, I've explained
sparkbyexamples.com