Postgresql Idle in transaction
2024. 3. 14. 18:10
데이터 엔지니어링/Database
다음 명령어를 통해 postgresql 의 활성화되어있는 프로스를 확인해보면 Idle in transaction 상태가 보인다. select * from pg_stat_activity; 찾아보니 이런 경우는 postgresql 에 transaction 이 잡혀있지만 아무런 동작도 하지 않으면 idle in transaction 상태가 된다고 한다. 이런 경우가 많이 발생하게 되면 connection 은 잡고있지만 아무것도 하지 않게 되어 리소스가 낭비된다. 이러한 문제를 해결하는 방법 중 하나는 다음과 같이 수동으로 해당 프로세스의 connection 을 terminate 해주어야 한다. terminate 를 할 때에는 process id 인 pid 를 가져와 terminate 를 실행하면 된다. sel..
Pyspark Window function
2024. 3. 14. 16:48
데이터 엔지니어링/Spark
Pyspark 에서 Window 함수를 사용해 rank 나 row number 등과 같은 결과를 계산할 수 있다. 직접 사용해보았지만 제대로 이해하고 사용하고 있는 것 같지 않아서 정리해보려고 한다. Window Functions Pyspark window 함수는 frame 과 partition 과 같은 행(row) 의 그룹에서 입력된 모든 row 를 하나의 값으로 반환한다. Window 함수의 핵심은 여러 개의 row 를 하나의 값으로 사용한다는 것이다. Pyspark window 함수는 3가지 종류의 함수로 구분된다고 한다. Ranking Functions Analytic Functions Aggregate Functions 아래의 테이블은 Window Functions 를 정리한 함수들이다. 참고하..
Postgresql auto increment 적용하기
2024. 2. 16. 19:13
데이터 엔지니어링/Database
Mysql 에서는 auto_increment 라는 속성을 부여해주면 자동으로 증가하게 만들어줄 수 있다. Postgres 에서는 auto_increment 라는 속성을 부여할 수 없기 때문에 다른 방법을 통해서 증가하게 만들어주어야 한다. 그래서 postgres 의 auto increment 에는 어떤 방법들이 있는지 정리해보려고 한다. Sequence 객체를 사용해서 auto increment 하기 postgres 에는 Sequence 라고 하는 number generator 가 있다. 더 쉽게 말해서 순차적인 값을 생성해주는 객체라고 생각하면 된다. Sequence 는 다음과 같이 명령어를 통해 생성할 수 있다. CREATE SEQUENCE seq_user_id INCREMENT 1 START 1 M..
Spark explode() 함수 사용시 주의할 점
2024. 2. 11. 18:16
데이터 엔지니어링/Spark
spark explode() 함수는 리스트 타입의 컬럼에서 각각의 element 를 하나의 row 로 펼쳐주는 기능을 한다. explode() 함수의 사용 방법에 대해서는 다음과 같이 이전에 정리해두었다. https://jaynamm.tistory.com/entry/Spark-explode-%EC%82%AC%EC%9A%A9%ED%95%B4%EC%84%9C-List-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EB%B6%84%EB%A6%AC%ED%95%98%EA%B8%B0 Spark explode() 사용해서 List 로 된 컬럼을 행으로 분리하기 Spark Dataframe 에 다음과 같이 리스트 형태로 들어간 컬럼이 있을 것이다. scala> val df = Seq(("Nam", List("A"..
airflow default_args error - Invalid arguments were: **kwargs: {'provide_context': False}
2024. 2. 7. 14:38
데이터 엔지니어링/Airflow
에러 발생 airlfow 를 사용하다가 아래와 같은 에러가 발생했다. airflow.exceptions.AirflowException: Invalid arguments were passed to _PythonDecoratedOperator (task_id: check_steps__1). Invalid arguments were: **kwargs: {'provide_context': False} 에러 내용은 Invalid arguments were: **kwargs: {'provide_context': False} 라는 내용으로 default_args 에 설정해준 provide_context 의 값이 False 되어있어서 발생한 에러로 보였다. 에러가 났을 때 dag default_args 의 내용은 다음..
airflow Default Arguments
2024. 2. 7. 11:27
데이터 엔지니어링/Airflow
airflow 에서 DAG 를 생성할 때 사용되는 default arguments 에 대해서 정리해보려고 한다. Default Arguments default_args 에 대해서 이해하기 위해 airflow concept 에서 다음과 같이 확인할 수 있었다. https://airflow.apache.org/docs/apache-airflow/2.6.2/core-concepts/dags.html#default-arguments DAGs — Airflow Documentation airflow.apache.org 설명을 해석해보면, DAG 안에 있는 많은 Operator 들이 같은 설정을 해주는 경우가 있는데 모든 Operator 들에게 개별적으로 지정하는 대신 DAG 를 생성할 때 default_args ..
airflow Dynamic Task Mapping
2023. 12. 28. 14:27
데이터 엔지니어링/Airflow
airflow 를 사용하면서 하나의 task 안에서 여러 개의 작업이 수행될 때 각 작업의 상태를 확인하거나 모든 작업에 동일한 값을 넘겨주고 싶은 경우가 있다. 이럴 때 airflow 제공하는 것이 airflow Dynamic Task Mapping 이라는 기능이다. 이 기능은 airflow 2.3.0 버전에서 새롭게 추가된 기능이라고 한다. 이 기능 외에도 다양한 기능이 추가된 것으로 알고 있다. airflow 2.3.0 의 release note 는 아래에서 확인할 수 있다. https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-3-0-2022-04-30 Release Notes — Airflow Docu..
Postgresql 컬럼 기본값(default) 설정
2023. 10. 5. 23:38
데이터 엔지니어링/Database
PostgreSQL 에서 컬럼을 설정할 때 기본값(default) 에 대해서 정리해보려고 한다. 기본값 (default) 이란 PostgreSQL 에서 컬럼의 Default 는 해당 컬럼에 데이터를 삽입할 때 명시적으로 값을 제공하지 않을 때 사용되는 기본값을 정의하는데 사용된다. 쉽게 말해서 어떠한 동작을 하지 않을 때 기본적으로 Default 에 명시된 값이 들어간다는 말이다. 예를 들어 Default 는 다음과 같이 설정할 수 있다. CREATE TABLE test_table ( id serial PRIMARY KEY, name VARCHAR(255) DEFAULT '아무개' ); name 에 아무런 값을 등록하지 않으면 '아무개' 라는 값이 들어가게 된다. 그리고 Default 는 다음과 같이 수..