1. 기존 데이터 파이프라인(Cron Script)의 문제점
- 실패 복구
언제 어떻게 다시 실행할 것인가? - 모니터링
잘 돌아가고 있는지 확인하기 어려움 - 의존성 관리
데이터 파이프라인 간의 의존성이 있는 경우 상위 파이프라인이 잘 동작하고 있는지 파악이 어려움 - 확장성
중앙화하여 관리하는 툴이 없어 분산된 환경에서 파이프라인들을 관리하기 어려움 - 배포
새로운 workflow를 배포하기 어려움
2. Airflow
Airflow는 workflow를 작성하고 스케줄링하고 모니터링하는 작업을 프로그래밍할 수 있게 해주는 플랫폼
- Python 프로그래밍 가능
- 분산된 환경에서 확장성 있음
- 웹 대시보드 (UI) 제공
- 커스터마이징이 가능
3. Workflow
의존성(DAG)으로 연결된 작업(task)들의 집합
4. Airflow의 구성요소
- 웹 서버
웹 대시보드 UI - 스케줄러
workflow가 언제 실행되는지 관리 - Metastore
메타데이터 관리 - Executor
테스크가 어떻게 실행되는지 정의 - Worker
테스크를 실행하는 프로세스
5. Operator
작업(task)을 정의하는데 사용
- Action Operators
실제 연산을 수행 - Transfer Operators
데이터를 옮김 - Sensor Operators
테스크를 실행시킬 트리거를 기다림
6. Task
Operator를 실행시키면 task가 된다
Task = Operator instance
7. Airflow의 구조
7.1 One Node Architecture
- Web Server
- Metastore
- Scheduler
- Executor
- DAG(task)를 작성하여 workflow를 생성
- DAG를 실행시킬 때 scheduler는 DagRun 오브젝트(DAG instance)를 생성
- DagRun 오브젝트는 task instance를 생성
- Worker가 task 수행 후 DagRun의 상태를 “완료”로 바꿔놓는다
8. 실습
8.1 설치 및 계정 생성
8.1.1 설치 및 실행
$ pip install apache-airflow
$ airflow webserver
$ airflow db init # DB 초기화
8.1.2 계정 생성
$ airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin
8.2 Scheduler 생성
$ airflow scheduler
9. 내장 Operators
9.1 BashOperator
Bash function 사용
9.2 PythonOperator
Python function 사용
9.3 EmailOperator
Email 전송 시 사용
10. DAG 작성 후 테스트
$ airflow tasks test nft-pipeline creating_table 2021-01-01
11. NFT 예제
요약글
from datetime import datetime
import json
from pandas import json_normalize
from airflow import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
default_args = {
'start_date': datetime(2023, 1, 1),
}
def _processing_nft(ti):
data = ti.xcom_pull(task_ids='extract_nft')
if data is None:
raise ValueError("data is empty")
assets = data['assets'][0]
asset_contract = assets['asset_contract']
processed_nft = json_normalize({
'token_id': assets['token_id'],
'name': asset_contract['name'],
'image_url': asset_contract['image_url']
})
processed_nft.to_csv('/tmp/processed_nft.csv', index=False, header=False)
with DAG(
dag_id='nft-pipeline',
schedule_interval='@daily',
default_args=default_args,
tags=['nft'],
catchup=True
) as dag:
creating_table = SqliteOperator(
task_id='creating_table',
sqlite_conn_id='db_sqlite',
sql='''
CREATE TABLE IF NOT EXISTS nfts (
token_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
image_url TEXT NOT NULL
)
'''
)
is_api_available = HttpSensor(
task_id='is_api_available',
http_conn_id='opensea_api',
endpoint='api/v1/assets?collection=doodles-official&limit=1',
)
extract_nft = SimpleHttpOperator(
task_id='extract_nft',
http_conn_id='opensea_api',
endpoint='api/v1/assets?collection=doodles-official&limit=1',
method='GET',
response_filter=lambda res: json.loads(res.text),
log_response=True
)
process_nft = PythonOperator(
task_id='process_nft',
python_callable=_processing_nft,
)
store_nft = BashOperator(
task_id='store_nft',
bash_command='echo -e ".separator ","\n.import /tmp/processed_nft.csv nfts" | sqlite3 /root/airflow/airflow.db'
)
creating_table >> is_api_available >> extract_nft >> process_nft >> store_nft
PREVIOUSEtc