💻 Hands-on: Airflow#

# Create the folders Airflow needs and set user permissions
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
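# (Reference) The steps below assume the official docker-compose.yaml is already in this
# directory (it may have been downloaded in an earlier step); if not, it can be fetched the same way as airflow.sh:
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'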
# Note: set up the CLI wrapper script
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/airflow.sh'
chmod +x airflow.sh

# Check the installation info
./airflow.sh airflow info
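# (Reference) The wrapper can also open a shell or a Python REPL inside the container
./airflow.sh bash
./airflow.sh python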

Hands-on 1️⃣ hello_world.py#

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def helloworld():
    print("hello world")

with DAG(
    dag_id="hello_world_dag",
    start_date=datetime(2023, 5, 25),
    schedule_interval="@hourly",
    catchup=False,
) as dag:

    task1 = PythonOperator(
        task_id="hello_world",
        python_callable=helloworld,
    )

    task1
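For reference only (not part of the original hands-on), here is a minimal sketch of the same DAG with a second task added, to show the >> dependency operator that hands-on 2 uses. The goodbye() callable, the dag_id, and a file name such as dags/hello_world_chain.py are illustrative assumptions.

# Minimal sketch: same DAG plus a second task chained with >>
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def helloworld():
    print("hello world")


def goodbye():
    print("goodbye world")


with DAG(
    dag_id="hello_world_chain_dag",
    start_date=datetime(2023, 5, 25),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    task1 = PythonOperator(task_id="hello_world", python_callable=helloworld)
    task2 = PythonOperator(task_id="goodbye_world", python_callable=goodbye)

    # hello_world runs first, then goodbye_world
    task1 >> task2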
# Run Airflow
# On the first run, it takes up to about 2 minutes to come up
docker-compose up -d

# Access the web UI
localhost:8080
- ID: airflow
- PW: airflow
# Clean up the resources used
docker-compose down --volumes --rmi all

# Remove the working directory
cd .. && rm -r airflow-docker

Hands-on 2️⃣ download_rocket_launches.py#

# Create the working directory
mkdir -p airflow-docker/dags && cd airflow-docker

# Create the docker-compose.yaml file
cat <<'EOF' > docker-compose.yaml
version: '3.7'
# ====================================== AIRFLOW ENVIRONMENT VARIABLES =======================================
x-environment: &airflow_environment
  - AIRFLOW__CORE__EXECUTOR=LocalExecutor
  - AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
  - AIRFLOW__CORE__LOAD_EXAMPLES=False
  - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://airflow:airflow@postgres:5432/airflow
  - AIRFLOW__CORE__STORE_DAG_CODE=True
  - AIRFLOW__CORE__STORE_SERIALIZED_DAGS=True
  - AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True

x-airflow-image: &airflow_image apache/airflow:2.0.0-python3.8
# ====================================== /AIRFLOW ENVIRONMENT VARIABLES ======================================
services:
  postgres:
    image: postgres:12-alpine
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"

  init:
    image: *airflow_image
    depends_on:
      - postgres
    environment: *airflow_environment
    entrypoint: /bin/bash
    command: -c 'airflow db init && airflow users create --username admin --password admin --firstname Anonymous --lastname Admin --role Admin --email admin@example.org'

  webserver:
    image: *airflow_image
    restart: always
    depends_on:
      - postgres
    ports:
      - "8080:8080"
    volumes:
      - logs:/opt/airflow/logs
    environment: *airflow_environment
    command: webserver

  scheduler:
    image: *airflow_image
    restart: always
    depends_on:
      - postgres
    volumes:
      - logs:/opt/airflow/logs
      - ./dags:/opt/airflow/dags
    environment: *airflow_environment
    command: scheduler

volumes:
  logs:
EOF
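# Save the DAG below as dags/download_rocket_launches.py (the scheduler mounts ./dags into the container)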
import json
import pathlib

import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="download_rocket_launches",
    description="Download rocket pictures of recently launched rockets.",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval="@daily",
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'",  # noqa: E501
    dag=dag,
)

def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")


get_pictures = PythonOperator(
    task_id="get_pictures", python_callable=_get_pictures, dag=dag
)

notify = BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
    dag=dag,
)

download_launches >> get_pictures >> notify
# Note: in the line below, the path passed to ls does not actually exist, so edit it inside the file as needed
# bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
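For reference, a minimal standalone sketch (not part of the DAG file; assumes network access and the requests package) that calls the same Launch Library 2 endpoint and prints the image URLs _get_pictures() would download:

# Standalone sketch: inspect what the pipeline downloads, outside Airflow
import requests

response = requests.get("https://ll.thespacedevs.com/2.0.0/launch/upcoming")
response.raise_for_status()
launches = response.json()

# Same fields the DAG reads: results[*].image
for launch in launches["results"]:
    print(launch["image"])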

# Run
docker-compose up -d

# Access the web UI
localhost:8080
ID: admin
PW: admin

# After finishing the exercise, clean up the resources
docker-compose down
cd .. && rm -r airflow-docker

3️⃣ Additional example#

Log preprocessing and delivery (requires a log server and sample data)#

Step by step: build a data pipeline with Airflow

GitHub - kyokin78/airflow: Build a data pipeline with Apache Airflow

Log processing flow#

[Figure: log processing flow (../../_images/7_4_1.png)]

DAG structure#

[Figure: DAG structure (../../_images/7_4_2.png)]
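The actual DAG lives in the linked kyokin78/airflow repository; purely as an illustration of the preprocessing-and-delivery flow described above (not the repository's code), a hypothetical skeleton might look like the following, with task names, paths, and callables all assumed:

# Hypothetical skeleton only; see the linked repository for the real pipeline
import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def _preprocess_logs():
    # Placeholder: parse/clean the raw logs fetched by the previous task
    print("preprocessing /tmp/raw_logs ...")


with DAG(
    dag_id="log_preprocessing_example",
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval="@daily",
) as dag:
    fetch_logs = BashOperator(
        task_id="fetch_logs",
        bash_command="echo 'fetch raw logs from the log server to /tmp/raw_logs'",
    )
    preprocess_logs = PythonOperator(
        task_id="preprocess_logs",
        python_callable=_preprocess_logs,
    )
    deliver_logs = BashOperator(
        task_id="deliver_logs",
        bash_command="echo 'deliver the preprocessed logs to the target store'",
    )

    fetch_logs >> preprocess_logs >> deliver_logs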