💻 Hands-on: Airflow#
# Set up the Airflow folders and user permissions
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
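On macOS or Windows the id -u mapping is not meaningful inside the container; the official quick-start notes that a fixed UID can be used instead:
# Per the official quick-start, non-Linux hosts can set a fixed UID
echo "AIRFLOW_UID=50000" > .env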
# Note - CLI setup
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/airflow.sh'
chmod +x airflow.sh
# Check the installation info
./airflow.sh info
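The wrapper forwards its arguments to the airflow CLI running in a container, so any subcommand works; per the official quick-start it can also open a shell or Python REPL:
# Run an arbitrary airflow CLI subcommand through the wrapper
./airflow.sh dags list
# Interactive shell / Python REPL inside the Airflow image
./airflow.sh bash
./airflow.sh python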
Hands-on 1️⃣ hello_world.py#
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime


def helloworld():
    print("hello world")


with DAG(
    dag_id="hello_world_dag",
    start_date=datetime(2023, 5, 25),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    task1 = PythonOperator(
        task_id="hello_world",
        python_callable=helloworld,
    )

    task1
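To try the task without waiting for the hourly schedule, the airflow CLI's tasks test command runs a single task instance without recording state in the database. A sketch using the airflow.sh wrapper downloaded above (the date is just an example):
# One-off test run of the task, independent of the scheduler
./airflow.sh tasks test hello_world_dag hello_world 2023-05-25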
# Run Airflow
# On the first run, it takes up to about 2 minutes to come up
docker-compose up -d
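Before opening the UI, it is worth confirming that the webserver and scheduler containers report a healthy status:
# List the containers and their health status
docker-compose ps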
# Access the web UI
localhost:8080
- ID: airflow
- PW: airflow
# Clean up the resources used
docker-compose down --volumes --rmi all
# Delete the example folder
cd .. && rm -r airflow-docker
Hands-on 2️⃣ download_rocket_launches.py#
# Create the example folder
mkdir -p airflow-docker/dags && cd airflow-docker
# Create the docker-compose.yaml file
cat <<EOF > docker-compose.yaml
version: '3.7'
# ====================================== AIRFLOW ENVIRONMENT VARIABLES =======================================
x-environment: &airflow_environment
  - AIRFLOW__CORE__EXECUTOR=LocalExecutor
  - AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
  - AIRFLOW__CORE__LOAD_EXAMPLES=False
  - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://airflow:airflow@postgres:5432/airflow
  - AIRFLOW__CORE__STORE_DAG_CODE=True
  - AIRFLOW__CORE__STORE_SERIALIZED_DAGS=True
  - AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True

x-airflow-image: &airflow_image apache/airflow:2.0.0-python3.8
# ====================================== /AIRFLOW ENVIRONMENT VARIABLES ======================================
services:
  postgres:
    image: postgres:12-alpine
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"

  init:
    image: *airflow_image
    depends_on:
      - postgres
    environment: *airflow_environment
    entrypoint: /bin/bash
    command: -c 'airflow db init && airflow users create --username admin --password admin --firstname Anonymous --lastname Admin --role Admin --email admin@example.org'

  webserver:
    image: *airflow_image
    restart: always
    depends_on:
      - postgres
    ports:
      - "8080:8080"
    volumes:
      - logs:/opt/airflow/logs
    environment: *airflow_environment
    command: webserver

  scheduler:
    image: *airflow_image
    restart: always
    depends_on:
      - postgres
    volumes:
      - logs:/opt/airflow/logs
      - ./dags:/opt/airflow/dags
    environment: *airflow_environment
    command: scheduler

volumes:
  logs:
EOF
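The x-environment and x-airflow-image entries are YAML anchors (&airflow_environment, &airflow_image) that the services reuse through the *airflow_environment and *airflow_image aliases. To verify the heredoc produced valid YAML and to see the file with all anchors expanded:
# Validate the compose file and print it with anchors resolved
docker-compose config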
Save the following DAG as dags/download_rocket_launches.py:
import json
import pathlib

import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="download_rocket_launches",
    description="Download rocket pictures of recently launched rockets.",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval="@daily",
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'",  # noqa: E501
    dag=dag,
)


def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")


get_pictures = PythonOperator(
    task_id="get_pictures", python_callable=_get_pictures, dag=dag
)

notify = BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
    dag=dag,
)

download_launches >> get_pictures >> notify
# In the line below, the path passed to ls does not actually exist on the host, so edit it inside the DAG file as needed
# bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
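Note that with the LocalExecutor, tasks run inside the scheduler container, so /tmp/images exists there rather than on the host. One way to inspect the downloaded files after a run:
# List the images inside the scheduler container, where the tasks actually executed
docker-compose exec scheduler ls /tmp/images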
# Run
docker-compose up -d
# Access the web UI
localhost:8080
ID: admin
PW: admin
# After finishing the hands-on, delete the resources
docker-compose down
cd .. && rm -r airflow-docker
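A plain docker-compose down keeps the named logs volume; adding -v removes it as well:
# Also remove the named volumes declared in the compose file
docker-compose down -v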
3️⃣ Additional examples#
Log preprocessing and delivery (requires a log server and status data)#
Step by step: build a data pipeline with Airflow
GitHub - kyokin78/airflow: Build a data pipeline with Apache Airflow