7.1 Introduction to Apache Airflow#
Airflow ๋?#
Pythonย ๊ธฐ๋ฐ์ย workflowย tool ์ ๋๋ค.
workflowย ๋ฅผย ๊ตฌ์ฑํ๋ย ๋ฐฉ๋ฒ์ย DAGย ์ย taskย ๋ผ๋ย ๋จ์๋กย ๋ณต์กํ๊ณ ย ๋ค์ํ๊ฒย ๊ตฌ์ฑ ๊ฐ๋ฅํฉ๋๋ค.
Task
: airflow์ operator, etl๋ฑ์ ์์ ๋ค์ด ๊ฐ๊ฐ์ task๊ฐ ๋๋ฉฐ, task๊ฐ ์์ ์ง์ ์ด ๊ฐ๋ฅํฉ๋๋ค.DAG(Directes Acyclic Graph)
: ๋ฐฉํฅ์ฑ ๋น์ํ ๊ทธ๋ํ๋ก ๋ฐ์ดํฐํ์ดํ๋ผ์ธ ํ๋์ ๋จ์, ํ๋์ DAG์๋ ํ๋ ์ด์์ task๋ก ๊ตฌ์ฑ๋ฉ๋๋ค.Airflow๋ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ํ์คํฌ๋ฅผ ์์ฝํ๊ณ ๊ฒฐ๊ณผ๋ฅผ ๋ชจ๋ํฐ๋งํ๋
์น ์๋ฒ
,์ค์ผ์ค๋ฌ
๋ฐ์์ปค ํ๋ก์ธ์ค
๋ผ๋ ์ธ ๊ฐ์ง ์ฃผ์ ์ปดํฌ๋ํธ๋ก ๊ตฌ์ฑ!
๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
์ ๊ด๋ฆฌํ๊ธฐ ์ํ Airflow#
๐ก ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์
๋ฐฉํฅ์ฑ ๋น์ํ ๊ทธ๋ํ(DAG)
์Task
์ ์ด์ ๋ํ์์กด์ฑ
์ ์ ์ํฉ๋๋ค.๐ก ์๋ ๊ฐ ํ์คํฌ ๊ทธ๋ํ๋ฅผ ์คํํ ์ ์๋ ์ฌ๋ฌ ์ํฌํ๋ก ๊ด๋ฆฌ ์์คํ ์ด ๊ฐ๋ฐ๋์๊ณ , ๊ทธ ์ค Airflow๋ ๋ฐฐ์น ์งํฅ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ ๊ตฌํ์ ์ํด ํนํ๋ ์ฃผ์ ๊ธฐ๋ฅ์ ๊ฐ์ง๊ณ ์์ต๋๋ค.
Airflow์ ์ฅ๋จ์ #
์ฅ์ :#
ํ์ด์ฌ ์ฝ๋๋ก ์์ฑ๋์ด, ์ฝ๊ณ ๊ฐ๋จํ๊ฒ ๋ฐฐ์น, ์ํฌํ๋ก์ฐ ๊ตฌ์ฑ์ด ๊ฐ๋ฅํฉ๋๋ค.
ํ์ด์ฌ ๊ธฐ๋ฐ์ผ๋ก ๊ตฌํ๋์ด ์๊ธฐ ๋๋ฌธ์, ํ์ด์ฌ ์ธ์ด์์ ๊ตฌํํ ์ ์๋ ๋๋ถ๋ถ์ ๋ฐฉ๋ฒ์ ์ฌ์ฉํ์ฌ ๋ณต์กํ ์ปค์คํ ํ์ดํ๋ผ์ธ ๊ตฌ์ถ์ด ๊ฐ๋ฅํฉ๋๋ค.
ํ์ ํ
Backfill
๊ธฐ๋ฅ์ ์ฌ์ฉํ์ฌ ๊ณผ๊ฑฐ ๋ฐ์ดํฐ๋ฅผ ์์ฝ๊ฒ ์ฌ์ฒ๋ฆฌํ ์ ์๊ธฐ ๋๋ฌธ์ ์ฝ๋๋ฅผ ๋ณ๊ฒฝํ ํ ์ฌ์์ฑ์ด ํ์ํ ๋ฐ์ดํฐ ์ฌ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํฉ๋๋ค.
๋จ์ :#
๋ฐ๋ณต์ ์ด๊ฑฐ๋ ๋ฐฐ์นํ์คํฌ์ ๊ธฐ๋ฅ์ด์ ์ด ๋ง์ถฐ์ ธ ์๊ธฐ ๋๋ฌธ์, ์คํธ๋ฆฌ๋ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ์ ์ ํฉํ์ง ์์ต๋๋ค.
Airflow์ ์ฃผ์ ์ปดํฌ๋ํธ#
Airflow ์์ปค
: ์์ฝ๋ task๋ฅผ ์ ํํ๊ณ ์คํํฉ๋๋ค.Airflow ์ค์ผ์ค๋ฌ
: DAG๋ฅผ ๋ถ์, ํ ์์ ์์ DAG์ ์ค์ผ์ค์ด ์ง๋ ๊ฒฝ์ฐ ์์ปค์ DAG์ ํ์คํฌ๋ฅผ ์์ฝํฉ๋๋ค.Airflow ์น์๋ฒ
: ์ค์ผ์ค๋ฌ์์ ๋ถ์ํ DAG๋ฅผ ์๊ฐํํ๊ณ DAG ์คํ๊ณผ ๊ฒฐ๊ณผ๋ฅผ ํ์ธํ ์ ์๋ ์ฃผ์ ์ธํฐํ์ด์ค๋ฅผ ์ ๊ณตํฉ๋๋ค.
์ด์ธ ์ปดํฌ๋ํธ#
Airflow database
: ์ต์ด ์ค์น์ ๊ธฐ๋ณธ์ผ๋ก sqlite๊ฐ ๊ธฐ๋ณธ์ผ๋ก ์ค์น๋ฉ๋๋ค.Airflow queue
: ๋ฉํฐ๋ ธ๋ ๊ตฌ์ฑ์ธ ๊ฒฝ์ฐ์๋ง ์ฌ์ฉํฉ๋๋ค.
Airflow ์ค์ผ์ฅด๋ง#
๐ก Airflow์ ์ค์ผ์ค ๊ฐ๊ฒฉ์ Cron๊ตฌ๋ฌธ์ ํ์ฉํ์ฌ ์ฃผ๋ก ์ค์ ํฉ๋๋ค.
ํฌ๋ก ๊ตฌ๋ฌธ ์์ |
์ค๋ช |
---|---|
0 * * * * |
๋งค์๊ฐ (์ ์ ์คํ) |
0 0 * * * |
๋งค์ผ (์์ ์ ์คํ) |
0 0 * * 0 |
๋งค์ฃผ (์ผ์์ผ ์์ ์ ์คํ) |
0 0 1 * * |
๋งค์ 1์ผ ์์ |
45 23 * * SAT |
๋งค์ฃผ ํ ์์ผ 23์ 45๋ถ |
0 0 * * MON, WED, FRI |
๋งค์ฃผ ์, ํ, ๊ธ ์์ ์ ์คํ |
0 0 * * MON-FRI |
๋งค์ฃผ ์~๊ธ ์์ ์ ์คํ |
0 0,12 * * * |
๋งค์ผ ์์ ๋ฐ ์คํ 12์ ์คํ |
๋ฐฑํ(backfill)#
DAG์ ๊ณผ๊ฑฐ ์์ ์ ์ง์ ํด ์คํํ๋ ํ๋ก์ธ์ค ์ ๋๋ค.
DAG์ ๊ณผ๊ฑฐ ์์ ๋ ์ง๋ฅผ ์ง์ ํ๊ณ ํด๋น DAG๋ฅผ ํ์ฑํํ๋ฉด ๊ณผ๊ฑฐ ์์ ์ดํ๋ถํฐ ํ์ฌ์๊ฐ๊น์ง์ ๋ชจ๋ ์ค์ผ์ฅด ๊ฐ๊ฒฉ์ด ์์ฑ๋ฉ๋๋ค.
Backfill์ ๊ฒฝ์ฐ
catchup
์ด๋ผ๋ ๋งค๊ฐ๋ณ์์ ์ํด ์ ์ด๋ฉ๋๋ค. (True/False)
๐ ๋งค์ผ 0์ 0๋ถ์ ์คํ๋๋ DAG์์ #
DAG(
dag_id='test_dag',
schedule_interval='0 0 * * *',
start_date=datetime.datetime(2023, 1, 1)
)
์์ ๊ฐ์ด ์ค์ ํ๊ฒ ๋๋ฉด, ์ต์ด ์คํ์ผ์๋ 2023๋ 1์ 2์ผ 0์ 0๋ถ์ด๋ฉฐ, 2023๋ 1์ 1์ผ 0์ ~ 2023๋ 1์ 1์ผ 23์ 59๋ถ ๊น์ง์ ๋ฐ์ดํฐ๋ฅผ ๊ธฐ์ค์ผ๋ก ์์ ํฉ๋๋ค.
๋ง์ฝ 2021๋ 1์ 4์ผ์ start_date๊ฐ 2021-01-01์ธ DAG๋ฅผ ์์ฑํ์ฌ ONํ๋ค๋ฉด 2021-01-01, 2021-01-02, 2021-01-03์ ํด๋นํ๋ ์ค์ผ์ฅด์ด ํธ๋ฆฌ๊ฑฐ๋์ด ์คํ๋ฉ๋๋ค.
Start_date
: ์ฒ์ DAG๊ฐ ์คํ๋๋ ์๊ฐ์ด ์๋, COPYํ๊ธฐ ์์ํ ์๊ฐ ์ ๋๋ค.Execution_date
: ์คํ์๊ฐ์ด ์๋, ์ผ๋ จ์ ์ฃผ๋ฌธ๋ฒํธ ๊ฐ๋ (crontab์คํ์์ )์ฆ๋ถ ์ ์ฌํ๋ ๊ฒฝ์ฐ์๋ง ํ์ฉ ๊ฐ๋ฅํฉ๋๋ค.
ํ์ดํ๋ผ์ธ ์คํจ ํน์ ์ฌ์ฒ๋ฆฌ๊ฐ ํ์ํ ๊ฒฝ์ฐ ์ฌ์ฉํ๋ ๋ณ์๊ฐ ์ ๋๋ค.
Airflow๋ ๋ง๋ฅ?!#
Airflow๋ scheduling ๊ธฐ๋ฐ์ batch ์ฉ workflow ๋๊ตฌ์ ๋๋ค.
์๋์ ๊ฐ์ ๊ธฐ๋ฅ์ ์ด๋ ค์์ด ์์ต๋๋ค.
streaming ์์ (๋ง์ดํฌ๋ก ๋ฐฐ์น์์ผ๋ก ๋ถ๋ถ์ ์ผ๋ก ๊ตฌํ์ ๊ฐ๋ฅ)
๋ฌดํํ ์คํ๋๋ ์์
Airflow ์ธ๋ถ ์์์ ์ํด trigger ๋๋ scheduling ๋ฐฉ์ (API๋ก ํธ๋ฆฌ๊ฑฐ๋ ๊ฐ๋ฅ, ์ธ๋ถ API๊ฐ DAG์ ์ฐ๋ฅด๋ ๊ฑด ์ด๋ ค์)
์๋์ ๊ฐ์ ์์ ์ airflow์์ ์ ํฉํ์ง ์์ ์ ์์ต๋๋ค.
์ง์ฐ์ ํ์ฉํ์ง ์๋ ์์ ์ ์ค์ผ์ค๋ง
Airflow worker ๋ด์์ ๊ณ ๋ถํ ์์ ์ ์ฒ๋ฆฌ (๋ด๋ถ์์ ๊ณ ์ฐ์ฐ ์์ ์ ํ๊ธฐ๋ณด๋จ ์ ์ถํ๋ ํํ๋ก ๊ตฌํํฉ๋๋ค.)