airflow default_args error - Invalid arguments were: **kwargs: {'provide_context': False}
2024. 2. 7. 14:38
데이터 엔지니어링/Airflow
에러 발생 airlfow 를 사용하다가 아래와 같은 에러가 발생했다. airflow.exceptions.AirflowException: Invalid arguments were passed to _PythonDecoratedOperator (task_id: check_steps__1). Invalid arguments were: **kwargs: {'provide_context': False} 에러 내용은 Invalid arguments were: **kwargs: {'provide_context': False} 라는 내용으로 default_args 에 설정해준 provide_context 의 값이 False 되어있어서 발생한 에러로 보였다. 에러가 났을 때 dag default_args 의 내용은 다음..
airflow Default Arguments
2024. 2. 7. 11:27
데이터 엔지니어링/Airflow
airflow 에서 DAG 를 생성할 때 사용되는 default arguments 에 대해서 정리해보려고 한다. Default Arguments default_args 에 대해서 이해하기 위해 airflow concept 에서 다음과 같이 확인할 수 있었다. https://airflow.apache.org/docs/apache-airflow/2.6.2/core-concepts/dags.html#default-arguments DAGs — Airflow Documentation airflow.apache.org 설명을 해석해보면, DAG 안에 있는 많은 Operator 들이 같은 설정을 해주는 경우가 있는데 모든 Operator 들에게 개별적으로 지정하는 대신 DAG 를 생성할 때 default_args ..
airflow Dynamic Task Mapping
2023. 12. 28. 14:27
데이터 엔지니어링/Airflow
airflow 를 사용하면서 하나의 task 안에서 여러 개의 작업이 수행될 때 각 작업의 상태를 확인하거나 모든 작업에 동일한 값을 넘겨주고 싶은 경우가 있다. 이럴 때 airflow 제공하는 것이 airflow Dynamic Task Mapping 이라는 기능이다. 이 기능은 airflow 2.3.0 버전에서 새롭게 추가된 기능이라고 한다. 이 기능 외에도 다양한 기능이 추가된 것으로 알고 있다. airflow 2.3.0 의 release note 는 아래에서 확인할 수 있다. https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-3-0-2022-04-30 Release Notes — Airflow Docu..
3. 공공데이터포털 데이터 전처리하기(2)
2023. 1. 2. 19:08
데이터 엔지니어링/Airflow
개요 지난 번에 1704 건의 데이터를 전처리하기 앞서 1 건의 데이터를 가져와 내가 원하는 데이터로 변환하는 부분까지 해봤다. 여러 삽질 끝에 원하는 데이터를 만들 수 있었고 1 건의 데이터만 가져와서 테스트해봤기 때문에 이번에는 모든 데이터를 전처리해서 내가 원하는 데이터로 만들어봤다. 모든 데이터 전처리하기 나는 1704개의 데이터를 일괄 처리하기 힘들 것 같다고 생각이 들었고 한 건의 데이터마다 전처리 후에 추가해주면 되지 않을까 하는 생각이 들었다. 그래서 1건의 데이터를 가져와 전처리하고 추가해주는 소스를 작성하고 결과를 확인해봤다. 우선 기본적으로 이전에 적었던 내용을 기본으로 전처리를 진행했다. 크게 틀은 벗어나지 않았던 것 같다. 2. 공공데이터포털 데이터 전처리하기(1) 개요 공공데이터..
2. 공공데이터포털 데이터 전처리하기 (1)
2022. 12. 31. 20:47
데이터 엔지니어링/Airflow
개요 공공데이터포털에서 데이터를 json 타입으로 가져오는 것까지 해봤고 데이터를 가져오는 과정을 Airflow DAG 로 만들어 작업을 수행해 원하는 디렉토리에 저장하는 과정까지 해봤다. 이번에는 그렇게 가져온 json 타입의 데이터를 가져와 원하는 데이터로 만드는 과정을 진행해보려고 한다. 그렇게 원하는 데이터가 만들어지면 Mysql 에 테이블을 생성하고 테이블에 저장하는 과정까지 생각하고 있다. 그래서 나는 Pandas 를 사용해 전처리를 진행하려고 하고 이러한 과정을 jupyter Notebook 에서 사용해보려고 한다. 해보기 앞서, 나는 다음과 같이 데이터를 만들어보고 싶다. 아래와 같이 컬럼명으로 되어있는 시간을 값으로 넣고 그 시간에 대한 혼잡도를 같이 넣고 싶었다. 하고나서는 쉬울줄 알았..
1. 공공데이터포털 데이터 가져오기
2022. 12. 25. 23:37
데이터 엔지니어링/Airflow
공공데이터포털 데이터 가져오기 내가 사용하기로한 공공데이터포털의 데이터("서울교통공사_지하철혼잡도정보")를 가져오기 위해서 제공하는 API 를 통해 데이터를 가져올 수 있다. 포털에서 API 를 생성할 수 있도록 제공해주는 기능이 있는데 인증키를 등록하고 API 를 호출할 수 있는 URL 을 생성해준다. 그래서 다음과 같이 URL 을 실행하면 다음과 같이 데이터를 확인할 수 있다. curl -X GET "https://api.odcloud.kr/api/15071311/v1/uddi:b3803d43-ffe3-4d17-9024-fd6cfa37c284?page=1&perPage=10&serviceKey=[서비스키]" -H "accept: application/json" -H "Authorization: [인증..
0. Airflow 데이터 파이프라인 만들어보기 (토이 프로젝트)
2022. 12. 25. 22:50
데이터 엔지니어링/Airflow
개요 개인적으로 공부할겸 Airflow 를 사용해서 데이터 파이프라인을 만들어보려고 한다. 어떤 데이터를 사용해볼까 생각하다 공공데이터포털 이란 곳에서 공공데이터를 제공해주고 있어 사용해보기로 했다. https://www.data.go.kr 공공데이터 포털 국가에서 보유하고 있는 다양한 데이터를『공공데이터의 제공 및 이용 활성화에 관한 법률(제11956호)』에 따라 개방하여 국민들이 보다 쉽고 용이하게 공유•활용할 수 있도록 공공데이터(Datase www.data.go.kr 공공데이터포털에서는 다양한 데이터 타입으로 데이터를 제공하고 있어 어떤 데이터를 다루어볼지 그리고 어떤 형식으로 데이터를 가져올지 등에 대해서 생각해봐야했다. 나는 수많은 데이터 중에서 서울교통공사_지하철혼잡도정보 라는 데이터를 가져..
[Error] airflow.exceptions.AirflowException: The webserver is already running under PID
2022. 11. 18. 18:50
데이터 엔지니어링/Airflow
Airflow 테스트 환경을 만들고 서버를 종료했다고 생각했는데 다시 서버를 실행하려고 하니 다음과 같은 에러가 발생했다. (airflow) ~/workspace/airflow $ airflow webserver --port 8080 ____________ _____________ ____ |__( )_________ __/__ /________ __ ____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/ Traceback (most recent call last): File "/Users/jaynam/workspace/airflow/..