πŸ’» Lab: Airflow

# Set up the Airflow folders and user permissions
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
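
The steps below assume the official quick-start docker-compose.yaml is already in the working directory. If it is not, it can be fetched the same way as airflow.sh below, with the version pinned to match:

# Download the official quick-start compose file (assumed prerequisite for docker-compose up)
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'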
# Note: CLI wrapper script setup
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/airflow.sh'
chmod +x airflow.sh

# Check the installation info
./airflow.sh airflow info

Lab 1️⃣ hello_world.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def helloworld():
    print("hello world")

with DAG(
    dag_id="hello_world_dag",
    start_date=datetime(2023, 5, 25),
    schedule_interval="@hourly",  # run at the top of every hour
    catchup=False,  # do not backfill intervals before the deploy time
) as dag:

    task1 = PythonOperator(
        task_id="hello_world",
        python_callable=helloworld,
    )

    task1
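
Before relying on the schedule, the task can be smoke-tested with the CLI wrapper from above; `airflow tasks test` runs a single task once without recording any state (a sketch, assuming the compose stack and airflow.sh are in place):

# Run the hello_world task once, outside the scheduler
./airflow.sh airflow tasks test hello_world_dag hello_world 2023-05-25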
# Start Airflow
# On the first run, the containers come up within about 2 minutes
docker-compose up -d

# Access the web UI
localhost:8080
- ID: airflow
- PW: airflow
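
Note that newly discovered DAGs start out paused (dags_are_paused_at_creation defaults to True), so the @hourly schedule only takes effect once the DAG is unpaused, either with the toggle in the UI or via the CLI:

# Unpause the DAG so the scheduler starts triggering it
./airflow.sh airflow dags unpause hello_world_dag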
# Clean up the resources in use
docker-compose down --volumes --rmi all

# Delete the working folder
cd .. && rm -r airflow-docker

Lab 2️⃣ download_rocket_launches.py

# Create the working folder
mkdir -p airflow-docker/dags && cd airflow-docker

# Create the docker-compose.yaml file (quoting EOF keeps the shell from expanding anything inside)
cat <<'EOF' > docker-compose.yaml
version: '3.7'
# ====================================== AIRFLOW ENVIRONMENT VARIABLES =======================================
x-environment: &airflow_environment
  - AIRFLOW__CORE__EXECUTOR=LocalExecutor
  - AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
  - AIRFLOW__CORE__LOAD_EXAMPLES=False
  - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://airflow:airflow@postgres:5432/airflow
  - AIRFLOW__CORE__STORE_DAG_CODE=True
  - AIRFLOW__CORE__STORE_SERIALIZED_DAGS=True
  - AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True

x-airflow-image: &airflow_image apache/airflow:2.0.0-python3.8
# ====================================== /AIRFLOW ENVIRONMENT VARIABLES ======================================
services:
  postgres:
    image: postgres:12-alpine
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"

  init:
    image: *airflow_image
    depends_on:
      - postgres
    environment: *airflow_environment
    entrypoint: /bin/bash
    command: -c 'airflow db init && airflow users create --username admin --password admin --firstname Anonymous --lastname Admin --role Admin --email admin@example.org'

  webserver:
    image: *airflow_image
    restart: always
    depends_on:
      - postgres
    ports:
      - "8080:8080"
    volumes:
      - logs:/opt/airflow/logs
    environment: *airflow_environment
    command: webserver

  scheduler:
    image: *airflow_image
    restart: always
    depends_on:
      - postgres
    volumes:
      - logs:/opt/airflow/logs
      - ./dags:/opt/airflow/dags
    environment: *airflow_environment
    command: scheduler

volumes:
  logs:
EOF
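
Before starting the stack, the generated file can be sanity-checked; `docker-compose config` parses the YAML and resolves the anchors (*airflow_image, *airflow_environment) without starting anything:

# Validate the compose file and print the fully resolved configuration
docker-compose config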
# Save the following DAG as dags/download_rocket_launches.py
import json
import pathlib

import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="download_rocket_launches",
    description="Download rocket pictures of recently launched rockets.",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval="@daily",
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'",  # noqa: E501
    dag=dag,
)

def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as img_file:  # avoid shadowing the outer file handle
                    img_file.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")


get_pictures = PythonOperator(
    task_id="get_pictures", python_callable=_get_pictures, dag=dag
)

notify = BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
    dag=dag,
)

download_launches >> get_pictures >> notify
# μ•„λž˜μ˜ λΆ€λΆ„ 쀑, ls~ λŠ” μ‹€μ œ κ²½λ‘œκ°€ μ—†μœΌλ―€λ‘œ 파일 λ‚΄μ—μ„œ μˆ˜μ •
# bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
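
As in the first lab, individual tasks can be smoke-tested before relying on the schedule. Here the CLI lives inside the running scheduler container, so the test goes through docker-compose exec (a sketch; any recent date works for the test run):

# Run the download task once inside the scheduler container (no state is recorded)
docker-compose exec scheduler airflow tasks test download_rocket_launches download_launches 2023-05-25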

# Run
docker-compose up -d

# Access the web UI
localhost:8080
ID: admin
PW: admin

# After finishing the lab, delete the resources
docker-compose down
cd .. && rm -r airflow-docker

3️⃣ Additional examples

Log preprocessing and forwarding (requires a log server and sample data)

Step by step: build a data pipeline with Airflow

GitHub - kyokin78/airflow: Build a data pipeline with Apache Airflow

Log processing flow

[Figure: log processing flow (7_4_1.png)]

DAG structure

[Figure: DAG structure (7_4_2.png)]