9. Airflow

 

1. 기존 데이터 파이프라인(Cron Script)의 문제점

  1. 실패 복구
    언제 어떻게 다시 실행할 것인가?
  2. 모니터링
    잘 돌아가고 있는지 확인하기 어려움
  3. 의존성 관리
    데이터 파이프라인 간의 의존성이 있는 경우 상위 파이프라인이 잘 동작하고 있는지 파악이 어려움
  4. 확장성
    중앙화하여 관리하는 툴이 없어 분산된 환경에서 파이프라인들을 관리하기 어려움
  5. 배포
    새로운 workflow를 배포하기 어려움

2. Airflow

Airflow는 workflow를 작성하고 스케줄링하고 모니터링하는 작업을 프로그래밍할 수 있게 해주는 플랫폼

  • Python 프로그래밍 가능
  • 분산된 환경에서 확장성 있음
  • 웹 대시보드 (UI) 제공
  • 커스터마이징이 가능

3. Workflow

의존성(DAG)으로 연결된 작업(task)들의 집합

4. Airflow의 구성요소

  1. 웹 서버
    웹 대시보드 UI
  2. 스케줄러
    workflow가 언제 실행되는지 관리
  3. Metastore
    메타데이터 관리
  4. Executor
    테스크가 어떻게 실행되는지 정의
  5. Worker
    테스크를 실행하는 프로세스

5. Operator

작업(task)을 정의하는데 사용

  1. Action Operators
    실제 연산을 수행
  2. Transfer Operators
    데이터를 옮김
  3. Sensor Operators
    테스크를 실행시킬 트리거를 기다림

6. Task

Operator를 실행시키면 task가 된다
Task = Operator instance

7. Airflow의 구조

7.1 One Node Architecture

  • Web Server
  • Metastore
  • Scheduler
  • Executor
  1. DAG(task)를 작성하여 workflow를 생성
  2. DAG를 실행시킬 때 scheduler는 DagRun 오브젝트(DAG instance)를 생성
  3. DagRun 오브젝트는 task instance를 생성
  4. Worker가 task 수행 후 DagRun의 상태를 “완료”로 바꿔놓는다

8. 실습

8.1 설치 및 계정 생성

8.1.1 설치 및 실행

$ pip install apache-airflow
$ airflow webserver
$ airflow db init  # DB 초기화

8.1.2 계정 생성

$ airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin

8.2 Scheduler 생성

$ airflow scheduler

9. 내장 Operators

9.1 BashOperator

Bash function 사용

9.2 PythonOperator

Python function 사용

9.3 EmailOperator

Email 전송 시 사용

10. DAG 작성 후 테스트

$ airflow tasks test nft-pipeline creating_table 2021-01-01

11. NFT 예제

요약글
from datetime import datetime
import json
from pandas import json_normalize

from airflow import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator


default_args = {
    'start_date': datetime(2023, 1, 1),
}

def _processing_nft(ti):
    data = ti.xcom_pull(task_ids='extract_nft')
    if data is None:
        raise ValueError("data is empty")

    assets         = data['assets'][0]
    asset_contract = assets['asset_contract']

    processed_nft = json_normalize({
        'token_id': assets['token_id'],
        'name': asset_contract['name'],
        'image_url': asset_contract['image_url']
    })
    processed_nft.to_csv('/tmp/processed_nft.csv', index=False, header=False)


with DAG(
    dag_id='nft-pipeline',
    schedule_interval='@daily',
    default_args=default_args,
    tags=['nft'],
    catchup=True
) as dag:

    creating_table = SqliteOperator(
        task_id='creating_table',
        sqlite_conn_id='db_sqlite',
        sql='''
            CREATE TABLE IF NOT EXISTS nfts (
                token_id    TEXT PRIMARY KEY,
                name        TEXT NOT NULL,
                image_url   TEXT NOT NULL
            )
        '''
    )

    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='opensea_api',
        endpoint='api/v1/assets?collection=doodles-official&limit=1',
    )

    extract_nft = SimpleHttpOperator(
        task_id='extract_nft',
        http_conn_id='opensea_api',
        endpoint='api/v1/assets?collection=doodles-official&limit=1',
        method='GET',
        response_filter=lambda res: json.loads(res.text),
        log_response=True
    )

    process_nft = PythonOperator(
        task_id='process_nft',
        python_callable=_processing_nft,
    )

    store_nft = BashOperator(
        task_id='store_nft',
        bash_command='echo -e ".separator ","\n.import /tmp/processed_nft.csv nfts" | sqlite3 /root/airflow/airflow.db'
    )

    creating_table >> is_api_available >> extract_nft >> process_nft >> store_nft