728x90
반응형

Pyspark 에서 Window 함수를 사용해 rank 나 row number 등과 같은 결과를 계산할 수 있다.
직접 사용해보았지만 제대로 이해하고 사용하고 있는 것 같지 않아서 정리해보려고  한다.

 

 

Window Functions

Pyspark window 함수는 frame 과 partition 과 같은 행(row) 의 그룹에서 입력된 모든 row 를 하나의 값으로 반환한다.
Window 함수의 핵심은 여러 개의 row 를 하나의 값으로 사용한다는 것이다.

 

Pyspark window 함수는 3가지 종류의 함수로 구분된다고 한다.

  • Ranking Functions
  • Analytic Functions
  • Aggregate Functions

 

아래의 테이블은 Window Functions 를 정리한 함수들이다. 참고하면 좋을 것 같다.

출처 - https://sparkbyexamples.com/pyspark/pyspark-window-functions/

 

 

Window Functions 직접 사용해보기

Window 함수가 무엇을 말하는지 이해해보았으니 직접 사용해보면서 익혀보려고 한다.

 

다음과 같이 하나의 Pyspark Dataframe 을 생성해주었다.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pyspark_window_functions").getOrCreate()

columns = ["class", "std_name", "std_gender", "std_avg"]

simpleData = (
    ("class1", "James", "male", 88),
    ("class2", "Michael", "female", 90),
    ("class2", "Robert", "male", 85),
    ("class3", "Maria", "female", 78),
    ("class1", "James", "male", 88),
    ("class3", "Scott", "male", 85),
    ("class2", "Jen", "female", 85),
    ("class3", "Jeff", "male", 90),
    ("class1", "Kumar", "male", 80),
)


df = spark.createDataFrame(data=simpleData, schema=columns)

df.printSchema()

df.show(truncate=False)

 

실행 결과

root
 |-- class: string (nullable = true)
 |-- std_name: string (nullable = true)
 |-- std_gender: string (nullable = true)
 |-- std_avg: long (nullable = true)

+------+--------+----------+-------+
|class |std_name|std_gender|std_avg|
+------+--------+----------+-------+
|class1|James   |male      |88     |
|class2|Michael |female    |90     |
|class2|Robert  |male      |85     |
|class3|Maria   |female    |78     |
|class1|James   |male      |88     |
|class3|Scott   |male      |85     |
|class2|Jen     |female    |85     |
|class3|Jeff    |male      |90     |
|class1|Kumar   |male      |80     |
+------+--------+----------+-------+

 

이 데이터를 가지고 Window 함수를 사용해볼 예정이다.

 

 

학급 별 학생들의 번호 부여하기 - row_number()

학생들을 학급 별로 번호를 부여하고싶다. 그러면 다음과 같이 코드를 작성할 수 있다.

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

number_windowSpec = Window.partitionBy("class").orderBy("std_name")

df.withColumn("row_number", row_number().over(number_windowSpec)).show()

 

실행 결과

+------+--------+----------+-------+----------+
| class|std_name|std_gender|std_avg|row_number|
+------+--------+----------+-------+----------+
|class1|   James|      male|     88|         1|
|class1|   James|      male|     88|         2|
|class1|   Kumar|      male|     80|         3|
|class2|     Jen|    female|     85|         1|
|class2| Michael|    female|     90|         2|
|class2|  Robert|      male|     85|         3|
|class3|    Jeff|      male|     90|         1|
|class3|   Maria|    female|     78|         2|
|class3|   Scott|      male|     85|         3|
+------+--------+----------+-------+----------+

 

실행 결과를 확인해보면 class 별로 구분되어 Row 의 번호가 생성된 것을 확인할 수 있었다.
만약 partitionBy 를 해서 파티션을 구분하지 않으면 어떻게 될까?

 

다음과 같이 경고 문구가 출력된다.

24/03/14 16:03:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/14 16:03:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/03/14 16:03:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

 

경고 문구는 Window Operation 을 위한 파티션이 정의되어있지 않다고 하고 모든 데이터를 단일 파티션으로 설정하게 되면 성능 저하가 발생할 수 있다고 한다.

 

 

학급별 학생들의 평균 점수로 성적 순위 구하기 - rank()

이번에는 학급별로 학생들의 평균 점수를 가지고 성적 순위를 구해보려고 한다.
rank() 함수를 사용해서 구해보았다.

from pyspark.sql.functions import rank

rank_windowSpec = Window.partitionBy("class").orderBy("str_avg")

df.withColumn("rank", rank().over(rank_windowSpec)).show()

 

실행 결과

+------+--------+----------+-------+----+
| class|std_name|std_gender|std_avg|rank|
+------+--------+----------+-------+----+
|class1|   Kumar|      male|     80|   1|
|class1|   James|      male|     88|   2|
|class1|   James|      male|     88|   2|
|class2|  Robert|      male|     85|   1|
|class2|     Jen|    female|     85|   1|
|class2| Michael|    female|     90|   3|
|class3|   Maria|    female|     78|   1|
|class3|   Scott|      male|     85|   2|
|class3|    Jeff|      male|     90|   3|
+------+--------+----------+-------+----+

 

실행 결과를 확인해보니 가장 낮은 평균 점수가 순위가 높게 나와서 다음과 같이 내림차순으로 다시 정렬해주었다.
내림 차순으로 정렬하기 위해서 desc() 함수를 추가로 불러와서 적용해주었다.

from pyspark.sql.functions import rank, desc

rank_windowSpec = Window.partitionBy("class").orderBy(desc("std_avg"))

df.withColumn("rank", rank().over(rank_windowSpec)).show()

 

실행 결과

+------+--------+----------+-------+----+
| class|std_name|std_gender|std_avg|rank|
+------+--------+----------+-------+----+
|class1|   James|      male|     88|   1|
|class1|   James|      male|     88|   1|
|class1|   Kumar|      male|     80|   3|
|class2| Michael|    female|     90|   1|
|class2|  Robert|      male|     85|   2|
|class2|     Jen|    female|     85|   2|
|class3|    Jeff|      male|     90|   1|
|class3|   Scott|      male|     85|   2|
|class3|   Maria|    female|     78|   3|
+------+--------+----------+-------+----+

 

평균 점수대로 순위가 입력된 것을 확인할 수 있다.

 

그런데 여기서 같은 점수인 경우 순위가 같게 나오고 중간 순위를 제외하고나서 다음 순위가 오게 된다.
예를 들어, 아래의 결과와 같이 90점, 90점, 88점 의 평균 점수가 있을 때
90점 1등, 90점 1등, 88점 3등 이 되어 중간에 2등이 사라지게 된다.

|class1|   James|      male|     88|   1|
|class1|   James|      male|     88|   1|
|class1|   Kumar|      male|     80|   3|

 

만약에 중간에 순위를 제외하지 않고 연속적으로 사용하고 싶다면 dense_rank() 함수를 사용할 수 있다.

from pyspark.sql.functions import dense_rank, desc

rank_windowSpec = Window.partitionBy("class").orderBy(desc("std_avg"))

df.withColumn("rank", dense_rank().over(rank_windowSpec)).show()

 

실행 결과

+------+--------+----------+-------+----+
| class|std_name|std_gender|std_avg|rank|
+------+--------+----------+-------+----+
|class1|   James|      male|     88|   1|
|class1|   James|      male|     88|   1|
|class1|   Kumar|      male|     80|   2|
|class2| Michael|    female|     90|   1|
|class2|  Robert|      male|     85|   2|
|class2|     Jen|    female|     85|   2|
|class3|    Jeff|      male|     90|   1|
|class3|   Scott|      male|     85|   2|
|class3|   Maria|    female|     78|   3|
+------+--------+----------+-------+----+

 

실행 결과를 보면 아까와 다르게 1등이 2명이어서 2등이 사라지는 것이 아니라 2등도 포함되어 나오게 된다.
따라서, dense_rank() 함수를 사용하게 되면 중간의 gap 이 사라지게 된다.

|class1|   James|      male|     88|   1|
|class1|   James|      male|     88|   1|
|class1|   Kumar|      male|     80|   2|

 

 

학급 별 평균, 최고, 최저 점수 구하기 - avg(), min(), max()

이번에는 학급별로 평균 점수와 최고, 최저 점수를 구해보자.
avg(), min(), max() 함수를 통해 구해주었다.

from pyspark.sql.functions import avg, min, max

calc_windowSpec = Window.partitionBy("class")

calc_df = (
    df.withColumn("avg", avg("std_avg").over(calc_windowSpec))
    .withColumn("min", min("std_avg").over(calc_windowSpec))
    .withColumn("max", max("std_avg").over(calc_windowSpec))
)

calc_df.select("class", "avg", "min", "max").dropDuplicates(["class"]).show()

 

실행 결과

+------+-----------------+---+---+
| class|              avg|min|max|
+------+-----------------+---+---+
|class1|85.33333333333333| 80| 88|
|class2|86.66666666666667| 85| 90|
|class3|84.33333333333333| 78| 90|
+------+-----------------+---+---+

 

 

자 이렇게 학생 데이터를 통해 window 함수를 다루어보았는데 내가 사용한 window 함수 외에도 다양한 함수가 있다.
아직 다 써보지는 않았지만 어떻게 사용하는지 이해했다고 생각이 되고 앞으로도 spark 를 사용할 예정이기 때문에 사용할 기회가 된다면 내가 원하는 컬럼에 맞게 window 함수를 사용해서 만들어보봐야겠다.

 

 

참고 사이트

https://sparkbyexamples.com/pyspark/pyspark-window-functions/

 

PySpark Window Functions - Spark By {Examples}

PySpark Window functions are used to calculate results, such as the rank, row number, etc., over a range of input rows. In this article, I've explained

sparkbyexamples.com

 

728x90
반응형
복사했습니다!