📌 서론
이번 글에서는 간단하게 Airflow의 DAGs를 테스트해 보자. Dockerfile과 docker-compose.yml 파일을 이용해서 실행해 보는 걸로 시작해 보자
Airflow란?
Apache Airflow는 워크플로우 관리 플랫폼이다. 이를 통해 다양한 작업(task)을 자동화하고 관리할 수 있다. 복잡한 데이터 파이프라인, ETL 작업, 머신러닝 모델 학습 및 배포와 같은 반복적인 작업을 자동으로 실행하고 모니터링하는 데 사용된다.
Airflow 주요 기능:
- DAG라는 개념을 통해 작업들을 정의한다.
- 스케줄링: 특정 시간이나 주기에 맞춰 작업을 자동으로 실행한다.
- 의존성 관리: 작업 간의 의존성을 정의하여 특정 작업이 완료된 후에 다른 작업이 실행되도록 한다.
- 모니터링: 각 작업의 상태, 성공 또는 실패 여부를 시각적으로 확인할 수 있다.
DAG란?
DAG는 Directed Acyclic Graph의 약자로, 방향성 비순환 그래프를 의미한다. 이는 Airflow에서 작업의 흐름을 나타내는 구조이다. DAG는 개별 작업(task)들 간의 실행 순서를 정의하고, 그 작업들이 어떤 순서로 실행되어야 하는지를 보여준다.
DAG 주요 기능:
- Directed: 작업들은 특정한 방향으로 흐른다. 즉, 작업 A가 먼저 실행된 후 작업 B가 실행된다는 의미이다.
- Acyclic: 순환이 없어야 한다. 즉, 작업의 흐름이 되돌아가는 경우가 없어야 한다. 예를 들어, 작업 A가 작업 B에 의존하고, 작업 B가 다시 작업 A에 의존하는 순환 구조는 허용되지 않는다.
- Graph: 작업들이 그래프 형태로 구성된다. 각 노드는 개별 작업(task)을 나타내고, 엣지(edge)는 작업 간의 의존성을 나타낸다.
만약 데이터 파이프라인을 관리한다고 가정할 때:
- Task 1: 데이터 수집
- Task 2: 데이터 전처리
- Task 3: 데이터 분석
- Task 4: 분석 결과 저장
이러한 작업들은 특정한 순서로 실행되어야 한다. 먼저 데이터를 수집한 후, 전처리를 하고, 그다음 분석을 한 후 결과를 저장해야 한다. 이 작업들의 순서를 Airflow에서는 DAG로 정의하게 된다.
실습 진행할 트리구조
프로젝트 트리구조는 다음과 같다.
your_project/
│
├── dags/
│ └── your_dag_file.py # 여기에 DAG 파일이 있어야 한다.
├── tmp/
├── Dockerfile
└── docker-compose.yml
Dockerfile 생성
먼저 docker-compose.yml 파일에서 실행할 Dockerfile을 만들어주자.
FROM python:3.10-slim
# 환경 변수 설정
ENV AIRFLOW_HOME=/usr/local/airflow
# 시스템 업데이트 및 필요한 패키지 설치
RUN apt-get update && \
apt-get install -y gcc libc-dev
# Airflow 및 Slack Provider 설치
RUN pip install apache-airflow apache-airflow-providers-slack pandas scikit-learn
# Airflow 홈 디렉터리 생성 및 작업 디렉터리 설정
RUN mkdir -p $AIRFLOW_HOME
WORKDIR $AIRFLOW_HOME
# Airflow 데이터베이스 초기화
RUN airflow db init
# dags 폴더 복사
COPY dags/ $AIRFLOW_HOME/dags/
# 포트 노출
EXPOSE 8080
# Airflow 웹 서버 및 스케줄러 실행, 그리고 관리자 계정 생성
CMD airflow webserver -p 8080 & \
airflow scheduler
- ENV 명령을 통해 AIRFLOW_HOME이라는 환경 변수를 /usr/local/airflow(컨테이너 내부 경로)로 설정한다. 이는 Airflow가 사용할 홈 디렉터리를 지정하는 것으로, DAG 파일 및 설정 파일이 이 디렉터리에서 관리된다.
- COPY 명령어는 호스트 시스템에 있는 dags/ 폴더를 컨테이너 내의 $AIRFLOW_HOME/dags/ 경로로 복사한다. DAG 파일이 컨테이너 내부에서 실행될 수 있도록 복사하는 것이다.
- EXPOSE 8080 명령어는 컨테이너에서 사용하는 포트 8080을 외부로 노출한다. 이 포트를 통해 Airflow 웹 서버에 접근할 수 있다.
- CMD 명령어는 컨테이너가 시작될 때 실행될 명령어를 지정한다. Airflow 웹 서버와 스케줄러를 동시에 실행하고, 웹 서버는 포트 8080에서 실행된다. &를 사용하여 웹 서버와 스케줄러를 백그라운드에서 동시에 실행한다.
- Webserver는 주로 Airflow UI를 제공하는 역할을 한다. 이를 통해 DAG를 시각적으로 관리하고, 실행 상태나 로그 등을 확인할 수 있다. 그러나 webserver 자체는 DAG를 실제로 실행하지 않는다.
- Scheduler는 DAG를 실행하는 핵심 역할을 담당한다. DAG의 정의에 따라 스케줄 된 작업을 트리거하고, 실행할 작업의 상태를 관리한다. 스케줄러는 주기적으로 DAG들을 확인하고, 각 작업이 실행 가능한 시점이 되었을 때 실행을 트리거한다.
그래서 webserver는 UI만 제공하고, 실제로 DAG의 실행 및 작업 관리는 scheduler가 담당하기 때문에, 둘 다 동시에 실행해야 Airflow가 제대로 동작한다. UI에서 DAG를 트리거하거나 확인한 후, 그 작업이 실제로 실행되기 위해서는 스케줄러가 반드시 실행 중이어야 한다.
docker-compose.yml
version: "3" # Docker Compose의 버전을 정의한다. 버전 3은 네트워크, 볼륨 등 다양한 기능을 쉽게 설정할 수 있도록 지원한다.
services: # 하나 이상의 컨테이너(서비스)를 정의하는 섹션
airflow: # 서비스 이름 (컨테이너 이름)
build: . # Dockerfile 위치. 현재 디렉터리에서 Dockerfile을 사용하여 이미지를 빌드한다.
ports: # 호스트와 컨테이너 간의 포트 매핑을 정의한다.
- "8080:8080" # 호스트의 8080 포트를 컨테이너의 8080 포트로 매핑한다. Airflow 웹 UI는 8080 포트를 사용한다.
volumes: # 호스트와 컨테이너 간에 공유할 파일 또는 디렉터리를 정의한다.
- ./dags:/usr/local/airflow/dags # 호스트의 현재 디렉터리 내의 dags 폴더를 컨테이너의 /usr/local/airflow/dags로 마운트한다. 이를 통해 DAG 파일을 컨테이너 내에서 사용할 수 있다.
- ./tmp:/tmp
environment: # 환경 변수를 정의한다. 컨테이너 내에서 사용될 변수들을 설정한다.
- AIRFLOW__CORE__LOAD_EXAMPLES=False # 기본적으로 Airflow는 예제 DAGs를 로드한다. 이를 False로 설정하여 예제 DAGs를 로드하지 않도록 한다.
command: > # 컨테이너 실행 시 수행할 명령어를 정의한다.
bash -c "airflow db migrate && sleep 5 && airflow users create --username [유저 이름] --firstname [유저 이름] --lastname [유저 성] --role Admin --email [유저 이메일] --password [유저 비밀번호] && airflow webserver & airflow scheduler"
# 컨테이너 시작 시 Airflow 데이터베이스 마이그레이션을 수행하고, 기본 관리자 계정을 생성한다. 이후, 웹 서버와 스케줄러를 동시에 백그라운드에서 실행한다.
도커 컨테이너 실행
이제 docker-compose.yml 파일이 있는 경로에서 도커를 빌드하고 실행시켜 보자.
docker-compose up --build
백그라운드에서 실행하고 싶다면 -d 옵션을 추가해 주면 된다.
docker-compose up --build -d
중간에 no table such as job 뭐 이런 에러가 뜨면 다시 airflow db init 명령어를 실행해 주면 해결된다..(로컬에서는 명령어 한줄한줄 입력할 수 있지만 도커는 그게 아니기 때문에 그냥 도커 다 삭제하고 다시 빌드부터 해야 한다.)
Airflow 웹 UI 접속
Docker Compose가 성공적으로 실행되면, 웹 브라우저에서 Airflow UI에 접속할 수 있다.
위 주소에 접속하면 로그인 창이 뜰 텐데 아까 위에서 docker-compose.yml에 만들어준 계정(이메일&비밀번호)으로 로그인하면 된다!
이렇게 DAGs 목록이 보이면 성공한 것이다!
⚡️ dags 폴더 안에 여러 파일이 있는데 몇 개는 뜨고 몇 개는 안 뜬다면 아마 로드되고 있는 중일 것이다. 파일이 로드되는데 5~10분 정도 소요되는 경우도 있다.
⚡️ dags 폴더에 .py 파이썬 파일이 있는데 목록에 안 뜬다면 아마 경로문제일 가능성이 높다! 경로가 다 맞아도 목록에 안 뜬다면 아예 cfg 파일들을 다 삭제하고 다시 재실행해보면 웬만해서 다 해결된다....
DAG test 1 - 파이썬 함수 실행
실제로 DAG를 테스트해 보자. 아주 간단하게 파이썬 함수를 실행하는 태스크이다.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def print_hello():
print("Hello Airflow!")
# Define the DAG
with DAG('first_dags',
description='A simple DAG',
schedule_interval='0 0 * * *', # 분/시/일/월/요일 -> 0 0: 매일 자정에 실행
start_date=datetime(2024, 9, 25),
catchup=False) as dag:
# Task 1: Print "Hello Airflow!"
task1 = PythonOperator(task_id='print_hello_task',
python_callable=print_hello,
dag=dag)
# Task 2: Dummy task
task2 = DummyOperator(task_id='dummy_task',
dag=dag)
# Define the task dependencies
task1 >> task2
first_dag이라는 이름의 DAG가 매일 실행되도록 설정되었으며, 작업(task)으로는 start와 end라는 DummyOperator가 포함되어 있다. 이는 단순히 실행 흐름을 확인하기 위한 기본적인 DAG이다.
- catchup 옵션은 DAG의 스케줄이 과거의 실행 시간에 대해 어떻게 동작할지를 제어하는 설정이다. 기본적으로 Airflow는 DAG가 실행될 예정이었던 과거의 모든 실행 시간을 “따라잡기(catchup)“하려고 한다. 즉, 만약 DAG가 하루 동안 실행되지 않았다면, 그 하루 동안 미뤄졌던 실행을 모두 수행하려고 한다.
- catchup=True (기본값): DAG가 설정된 시작일 이후의 모든 예약된 실행을 한 번에 수행하려고 한다. 예를 들어, start_date가 1일로 설정되어 있고, 현재 날짜가 10일이라면, 1일부터 9일까지의 실행을 모두 수행하려고 시도한다.
- catchup=False: 과거의 예약된 실행을 무시하고, 현재 날짜의 실행만 수행한다. 즉, 과거에 실행되지 않은 DAG 실행은 무시된다.
- PythonOperator: 이 오퍼레이터는 실제로 작업을 수행할 때 사용된다. python_callable이라는 파라미터에 지정된 함수를 실행한다. 즉, 이 오퍼레이터는 특정한 Python 함수나 작업을 실행하는 데 사용된다.
- DummyOperator: 이 오퍼레이터는 아무런 작업을 수행하지 않는다. 주로 DAG의 작업 흐름을 시각적으로 정의하거나, 작업 간 의존성을 표현할 때 사용된다. 즉, 아무 일도 하지 않는 빈 작업을 정의할 때 사용한다.
- dag=dag의 의미는 Python 작업(또는 다른 작업)을 특정 DAG에 연결하는 것이다. 이는 DAG 내에 작업을 추가할 때 해당 작업이 어떤 DAG에 속하는지를 명확히 해주기 위한 설정이다.
- task1 >> task2는 DAG 내에서 작업 간의 의존성(dependency)을 정의하는 표현이다. 이는 작업 실행 순서를 나타내며, 먼저 task1이 실행된 후 task2가 실행되어야 함을 의미한다.
이렇게 파일을 만들어주면 목록에 다음과 같이 뜬다.
왼쪽 스위치를 켜주고(pause하지 않겠다는 의미) 상세 페이지에 들어가 보자
여기서 Graph 탭을 클릭하고 태스크(블록)를 클릭해야 Logs 탭이 나온다. 그 Logs 탭에 들어가 보자.
이렇게 Hello AirFlow! 가 잘 출력된 걸 확인할 수 있다!
실행이 완료되면 왼쪽에 어두운 초록으로 되는데 다시 재실행(재생버튼)을 하면 형광 초록으로 현황을 볼 수 있다.
DAG test 2 - 병렬 실행
이제 Airflow에서 작업을 병렬로 실행될 수 있는 DAG를 정의해 보자.
각 작업은 BashOperator를 사용하여 간단한 Bash 명령을 실행하며, 작업 간의 의존성을 설정하여 병렬로 실행되도록 한다.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
# DAG 기본 설정
default_args = {
'owner': 'admin',
'start_date': datetime(2024, 9, 25),
'retries': 1, # 실패 시 재시도 횟수
}
- owner: 이 DAG의 소유자를 admin으로 설정
- start_date: DAG가 처음 실행될 날짜를 2024년 9월 25일로 설정. 이는 DAG가 이 날짜 이후에만 실행될 수 있다는 의미이다
- retries: 작업이 실패했을 때 몇 번까지 재시도할지를 설정. 여기서는 1번까지 재시도하도록 설정되었다.
# DAG 정의
dag = DAG(
'example_parallel_dag', # DAG의 ID
default_args=default_args,
description='A DAG with tasks running in parallel',
schedule_interval='@daily', # 매일 실행
)
# Task 정의
task1 = BashOperator(
task_id='task1',
bash_command='echo "Task 1"',
dag=dag,
)
task2 = BashOperator(
task_id='task2',
bash_command='echo "Task 2"',
dag=dag,
)
task3 = BashOperator(
task_id='task3',
bash_command='echo "Task 3"',
dag=dag,
)
# 의존성 설정: task1이 완료되면 task2와 task3이 병렬로 실행
task1 >> [task2, task3]
화면에서 로그를 보면 다음과 같이 잘 출력된 걸 확인할 수 있다.
DAG test 3 - slack 연동
slack에도 연동해서 추후에 모델의 결과를 slack 채널에 메시지를 보내는 방법으로도 사용해 볼 수 있다. 이번 코드는 아주 간단하게 slack 연동까지만 해보자
(아래 과정을 시작하기 전에 slack의 webhook을 미리 설정해놓아야 한다.)
from airflow import DAG
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.dates import days_ago
import os
os.environ['NO_PROXY'] = '*' # mac에서 airflow로 외부 요청할 때 이슈가 있음. 하여 해당 코드 추가 필요
dag = DAG(
dag_id="slack_test",
start_date=days_ago(1),
max_active_runs=1,
catchup=False,
schedule_interval="@once",
)
send_slack_message = SlackWebhookOperator(
task_id="send_slack",
slack_webhook_conn_id="slack_webhook",
message="Hello slack",
dag=dag,
)
send_slack_message
- SlackWebhookOperator: 이 오퍼레이터는 Airflow가 Slack Webhook을 통해 메시지를 보낼 수 있도록 설정하는 오퍼레이터이다.
이 상황에서 최초 실행은 에러가 난다. 에러를 해석해 보면 slack_webhook ID가 없어서 난 에러다. 이 ID는 수동으로 추가해 줄 수 있다.
메뉴에서 Admin > Connections에 들어가서 + 버튼을 클릭해서 Connection ID를 추가해 준다.
이렇게 slack_webhook ID를 추가해 주면 된다.
Webhook Token에는 Webhook URL 뒤에 붙어있는 값을 잘라서 입력해 주면 된다.
이렇게 추가해 주고 다시 실행해 주면 slack에 메시지가 잘 날아오는 걸 확인할 수 있다!!
DAG test 4 - 머신러닝 파이프라인 구축 (Single Model)
이번에는 Iris 데이터셋을 사용하여 RandomForestClassifier 모델을 학습, 저장, 평가하는 DAG이다. 각 단계는 Airflow의 PythonOperator를 사용하여 Task로 정의되어 있고, 데이터는 XCom(Cross Communication)이라는 Airflow 기능을 통해 Task 간에 전달된다.
XCom(Cross Communication)이란?
XCom은 Airflow에서 작업 간 데이터 공유를 가능하게 하는 메커니즘이다. XCom을 통해 한 작업에서 생성된 데이터를 다른 작업으로 전달할 수 있다.
- xcom_push: 데이터를 XCom에 저장하는 방법이다. key-value 쌍으로 데이터를 저장할 수 있다.
- xcom_pull: XCom에 저장된 데이터를 가져오는 방법이다. 이전 작업에서 저장된 데이터를 key를 사용하여 불러올 수 있다.
데이터 준비 작업
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.datasets import load_iris
import joblib # 모델을 파일로 저장하고 불러오기 위해 사용
default_args = {
'owner': 'admin',
'start_date': datetime(2023, 9, 22),
'retries': 1,
}
# 1. 데이터 준비 함수
def prepare_data(**context):
iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
y = pd.Series(iris.target)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# XCom을 사용하여 데이터를 전달
context['ti'].xcom_push(key='X_train', value=X_train.to_json())
context['ti'].xcom_push(key='X_test', value=X_test.to_json())
context['ti'].xcom_push(key='y_train', value=y_train.to_json(orient='records'))
context['ti'].xcom_push(key='y_test', value=y_test.to_json(orient='records'))
위 함수는 Iris 데이터셋을 로드하고, train 및 test 데이터셋으로 분할한 후, XCom을 통해 다른 작업으로 데이터를 전달한다.
ti는 task instance를 의미하며, 예약어는 아니지만 Airflow의 컨텍스트 객체를 통해 제공되는 기본 객체 중 하나다. Airflow에서 작업을 실행할 때, 각 작업은 실행 시점의 TaskInstance 객체를 가지고 있으며, 이를 통해 해당 작업에 대한 여러 메타데이터에 접근할 수 있다.
context['ti']는 이 TaskInstance 객체를 가져와서 작업 간의 데이터를 관리하거나 상태를 확인할 수 있게 해 준다. 즉, ti는 TaskInstance 객체의 약어로 자주 사용된다.
모델 학습 작업
def train_model(**context):
ti = context['ti']
X_train = pd.read_json(ti.xcom_pull(key='X_train'))
y_train = pd.read_json(ti.xcom_pull(key='y_train'), typ='series')
# RandomForest 모델 학습
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# 모델을 파일로 저장
model_path = '/tmp/random_forest_model.pkl'
joblib.dump(model, model_path)
# 모델 파일 경로를 XCom에 저장
context['ti'].xcom_push(key='model_path', value=model_path)
이 함수는 XCom에서 학습 데이터를 가져와서 RandomForestClassifier를 학습시키고, 학습된 모델을 /tmp/random_forest_model.pkl에 저장한다.
- joblib.dump: 학습된 모델을 파일로 저장
- xcom_pull: prepare_data에서 전달된 데이터를 가져오기 위해 XCom에서 데이터를 꺼내 사용.
- xcom_push: 저장된 모델의 경로를 다른 작업에서 사용할 수 있도록 XCom에 저장.
모델 평가 작업
# 3. 모델 평가 함수 (파일에서 모델을 로드하여 평가)
def evaluate_model(**context):
ti = context['ti']
# XCom에서 모델 파일 경로를 가져와서 로드
model_path = ti.xcom_pull(key='model_path')
model = joblib.load(model_path)
X_test = pd.read_json(ti.xcom_pull(key='X_test'))
y_test = pd.read_json(ti.xcom_pull(key='y_test'), typ='series')
# 모델 예측 및 성능 평가
predictions = model.predict(X_test)
accuracy = accuracy_score(y_test, predictions)
print(f"RandomForest Model Accuracy: {accuracy}")
# 성능을 XCom에 저장
context['ti'].xcom_push(key='performance', value=accuracy)
이 함수는 XCom에서 저장된 모델 파일을 가져와서 테스트 데이터를 사용해 예측을 수행하고, 모델의 정확도를 계산한다. 이후, 정확도를 XCom에 저장한다.
# DAG 정의
dag = DAG(
'iris_ml_training_pipeline_single_model',
default_args=default_args,
description='A machine learning pipeline using RandomForest on Iris dataset',
schedule_interval='@daily',
catchup=False # 과거 실행 주기를 무시
)
# Task 정의
prepare_data_task = PythonOperator(
task_id='prepare_data',
python_callable=prepare_data,
provide_context=True,
dag=dag,
)
train_model_task = PythonOperator(
task_id='train_random_forest',
python_callable=train_model,
provide_context=True,
dag=dag,
)
evaluate_model_task = PythonOperator(
task_id='evaluate_random_forest',
python_callable=evaluate_model,
provide_context=True,
dag=dag,
)
# Task 의존성 설정
prepare_data_task >> train_model_task >> evaluate_model_task
해당 코드를 작성하고 ui를 보면 이렇게 잘 성공한 걸 확인할 수 있다.
그리고 각 태스크의 로그를 봐도 전부 성공한 걸 확인할 수 있다.
두 번째 태스크의 로그를 보면 모델이 잘 저장된 걸 확인할 수 있고,
세 번째 태스크의 로그를 보면 모델의 정확도도 출력된 걸 확인할 수 있다.
이렇게 tmp 폴더에도 모델이 잘 생성된 걸 확인할 수 있다.
DAG test 5 - 머신러닝 파이프라인 구축 (Multi Model)
이제 여러 머신러닝 모델을 학습하고 성능을 비교해서 가장 성능이 좋은 모델을 선택하는 Airflow DAG를 작성해 보자.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
from sklearn.datasets import load_iris
import joblib # 모델을 파일로 저장하고 불러오기 위해 사용
default_args = {
'owner': 'admin',
'start_date': datetime(2023, 9, 22),
'retries': 1,
}
# 1. 데이터 준비 함수
def prepare_data(**context):
iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
y = pd.Series(iris.target)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# XCom을 사용하여 데이터를 전달
context['ti'].xcom_push(key='X_train', value=X_train.to_json())
context['ti'].xcom_push(key='X_test', value=X_test.to_json())
context['ti'].xcom_push(key='y_train', value=y_train.to_json(orient='records'))
context['ti'].xcom_push(key='y_test', value=y_test.to_json(orient='records'))
# 2. 모델 학습 함수 (모델을 파일로 저장)
def train_model(model_name, **context):
ti = context['ti']
X_train = pd.read_json(ti.xcom_pull(key='X_train'))
y_train = pd.read_json(ti.xcom_pull(key='y_train'), typ='series')
# 모델 선택
if model_name == 'RandomForest':
model = RandomForestClassifier(n_estimators=100, random_state=42)
elif model_name == 'GradientBoosting':
model = GradientBoostingClassifier(random_state=42)
elif model_name == 'SVM':
model = SVC()
else:
raise ValueError(f"Unsupported model: {model_name}")
# 모델 학습
model.fit(X_train, y_train)
# 모델을 파일로 저장
model_path = f'/tmp/{model_name}_model.pkl'
joblib.dump(model, model_path)
# 모델 파일 경로와 성능 평가를 위한 키를 XCom에 저장
context['ti'].xcom_push(key=f'model_path_{model_name}', value=model_path)
# 3. 모델 평가 함수 (파일에서 모델을 로드하여 평가)
def evaluate_model(model_name, **context):
ti = context['ti']
# XCom에서 모델 파일 경로를 가져와서 로드
model_path = ti.xcom_pull(key=f'model_path_{model_name}')
model = joblib.load(model_path)
X_test = pd.read_json(ti.xcom_pull(key='X_test'))
y_test = pd.read_json(ti.xcom_pull(key='y_test'), typ='series')
# 모델 예측 및 성능 평가
predictions = model.predict(X_test)
accuracy = accuracy_score(y_test, predictions)
print(f"{model_name} Model Accuracy: {accuracy}")
# 성능을 XCom에 저장
context['ti'].xcom_push(key=f'performance_{model_name}', value=accuracy)
# 4. 최고 성능 모델 선택
def select_best_model(**context):
ti = context['ti']
# 각 모델의 성능을 XCom에서 가져오기
rf_performance = ti.xcom_pull(key='performance_RandomForest')
gb_performance = ti.xcom_pull(key='performance_GradientBoosting')
svm_performance = ti.xcom_pull(key='performance_SVM')
# 모델 성능 비교
performances = {
'RandomForest': rf_performance,
'GradientBoosting': gb_performance,
'SVM': svm_performance
}
best_model = max(performances, key=performances.get)
best_performance = performances[best_model]
print(f"Best Model: {best_model} with accuracy {best_performance}")
context['ti'].xcom_push(key='best_model', value=best_model)
# DAG 정의
dag = DAG(
'iris_ml_training_pipeline_multiple_models',
default_args=default_args,
description='A machine learning pipeline using multiple models on Iris dataset',
schedule_interval='@daily',
catchup=False # 과거 실행 주기를 무시
)
# Task 정의
prepare_data_task = PythonOperator(
task_id='prepare_data',
python_callable=prepare_data,
provide_context=True,
dag=dag,
)
train_rf_task = PythonOperator(
task_id='train_random_forest',
python_callable=train_model,
op_kwargs={'model_name': 'RandomForest'},
provide_context=True,
dag=dag,
)
train_gb_task = PythonOperator(
task_id='train_gradient_boosting',
python_callable=train_model,
op_kwargs={'model_name': 'GradientBoosting'},
provide_context=True,
dag=dag,
)
train_svm_task = PythonOperator(
task_id='train_svm',
python_callable=train_model,
op_kwargs={'model_name': 'SVM'},
provide_context=True,
dag=dag,
)
evaluate_rf_task = PythonOperator(
task_id='evaluate_random_forest',
python_callable=evaluate_model,
op_kwargs={'model_name': 'RandomForest'},
provide_context=True,
dag=dag,
)
evaluate_gb_task = PythonOperator(
task_id='evaluate_gradient_boosting',
python_callable=evaluate_model,
op_kwargs={'model_name': 'GradientBoosting'},
provide_context=True,
dag=dag,
)
evaluate_svm_task = PythonOperator(
task_id='evaluate_svm',
python_callable=evaluate_model,
op_kwargs={'model_name': 'SVM'},
provide_context=True,
dag=dag,
)
select_best_model_task = PythonOperator(
task_id='select_best_model',
python_callable=select_best_model,
provide_context=True,
dag=dag,
)
# Task 의존성 설정
prepare_data_task >> [train_rf_task, train_gb_task, train_svm_task]
train_rf_task >> evaluate_rf_task
train_gb_task >> evaluate_gb_task
train_svm_task >> evaluate_svm_task
[evaluate_rf_task, evaluate_gb_task, evaluate_svm_task] >> select_best_model_task
- prepare_data_task가 완료된 후, 세 가지 모델 (train_rf_task, train_gb_task, train_svm_task)을 병렬로 학습한다.
- 각 모델 학습이 완료된 후, 개별적으로 평가 (evaluate_rf_task, evaluate_gb_task, evaluate_svm_task)가 이루어진다.
- 모든 모델 평가가 완료되면 select_best_model_task에서 성능을 비교하여 가장 좋은 모델을 선택한다.
이렇게 하고 실행시키면 다음과 같이 best model도 잘 출력되는 걸 확인할 수 있다.
tmp 폴더에도 모델 피클 파일이 잘 생성된 걸 확인할 수 있다.
DAG test 6 - 머신러닝 파이프라인 구축 (MLFlow 적용)
위 과정에서 MLFlow까지 적용해 보자.
아래 코드를 실행하기 전에 로컬에서 MLFlow 서버가 실행 중이어야 한다.
👇 MLFlow 설명은 아래 링크에서 확인해 볼 수 있다!
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
from sklearn.datasets import load_iris
import joblib
import os
import mlflow
import mlflow.sklearn # sklearn 모델을 로깅할 때 사용
os.environ['NO_PROXY'] = '*' # mac에서 airflow로 외부 요청할 때 이슈가 있음. 하여 해당 코드 추가 필요
# MLflow 설정 (실제 환경에 맞게 수정)
mlflow.set_tracking_uri("http://host.docker.internal:5000")
mlflow.set_experiment("Iris_Model_Training") # 실험 이름 설정
default_args = {
'owner': 'admin',
'start_date': datetime(2023, 9, 22),
'retries': 1,
}
# 1. 데이터 준비 함수
def prepare_data(**context):
iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
y = pd.Series(iris.target)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# XCom을 사용하여 데이터를 전달
context['ti'].xcom_push(key='X_train', value=X_train.to_json())
context['ti'].xcom_push(key='X_test', value=X_test.to_json())
context['ti'].xcom_push(key='y_train', value=y_train.to_json(orient='records'))
context['ti'].xcom_push(key='y_test', value=y_test.to_json(orient='records'))
# 2. 모델 학습 함수
def train_model(model_name, **context):
ti = context['ti']
X_train = pd.read_json(ti.xcom_pull(key='X_train'))
y_train = pd.read_json(ti.xcom_pull(key='y_train'), typ='series')
# MLflow 실험 실행
with mlflow.start_run(run_name=model_name):
if model_name == 'RandomForest':
model = RandomForestClassifier(n_estimators=100, random_state=42)
elif model_name == 'GradientBoosting':
model = GradientBoostingClassifier(random_state=42)
elif model_name == 'SVM':
model = SVC()
else:
raise ValueError(f"Unsupported model: {model_name}")
# 모델 학습
model.fit(X_train, y_train)
# 모델 로깅
mlflow.sklearn.log_model(model, model_name)
mlflow.log_param("model_name", model_name)
# 모델을 파일로 저장
model_path = f'/tmp/{model_name}_model.pkl'
joblib.dump(model, model_path)
context['ti'].xcom_push(key=f'model_path_{model_name}', value=model_path)
# 3. 모델 평가 함수
def evaluate_model(model_name, **context):
ti = context['ti']
model_path = ti.xcom_pull(key=f'model_path_{model_name}')
model = joblib.load(model_path)
X_test = pd.read_json(ti.xcom_pull(key='X_test'))
y_test = pd.read_json(ti.xcom_pull(key='y_test'), typ='series')
predictions = model.predict(X_test)
accuracy = accuracy_score(y_test, predictions)
# MLflow에 메트릭 로깅
with mlflow.start_run(run_name=model_name):
mlflow.log_metric("accuracy", accuracy)
print(f"{model_name} Model Accuracy: {accuracy}")
context['ti'].xcom_push(key=f'performance_{model_name}', value=accuracy)
# 4. 최고 성능 모델 선택
def select_best_model(**context):
ti = context['ti']
rf_performance = ti.xcom_pull(key='performance_RandomForest')
gb_performance = ti.xcom_pull(key='performance_GradientBoosting')
svm_performance = ti.xcom_pull(key='performance_SVM')
performances = {
'RandomForest': rf_performance,
'GradientBoosting': gb_performance,
'SVM': svm_performance
}
best_model = max(performances, key=performances.get)
best_performance = performances[best_model]
print(f"Best Model: {best_model} with accuracy {best_performance}")
context['ti'].xcom_push(key='best_model', value=best_model)
# 5. Slack 메시지 전송 함수
def send_slack_notification(**context):
ti = context['ti']
best_model = ti.xcom_pull(key='best_model')
rf_performance = ti.xcom_pull(key='performance_RandomForest')
gb_performance = ti.xcom_pull(key='performance_GradientBoosting')
svm_performance = ti.xcom_pull(key='performance_SVM')
message = (f"Best Model: *{best_model}*\n"
f"RandomForest Accuracy: {rf_performance}\n"
f"GradientBoosting Accuracy: {gb_performance}\n"
f"SVM Accuracy: {svm_performance}")
slack_notification = SlackWebhookOperator(
task_id='send_slack_notification_task',
slack_webhook_conn_id='slack_webhook',
message=message,
username='airflow_bot',
dag=context['dag']
)
# Slack 메시지를 실제로 전송
slack_notification.execute(context=context)
# DAG 정의
dag = DAG(
'iris_ml_training_pipeline_with_mlflow',
default_args=default_args,
description='A machine learning pipeline with MLflow logging on Iris dataset',
schedule_interval='@daily',
catchup=False
)
# Task 정의
prepare_data_task = PythonOperator(
task_id='prepare_data',
python_callable=prepare_data,
provide_context=True,
dag=dag,
)
train_rf_task = PythonOperator(
task_id='train_random_forest',
python_callable=train_model,
op_kwargs={'model_name': 'RandomForest'},
provide_context=True,
dag=dag,
)
train_gb_task = PythonOperator(
task_id='train_gradient_boosting',
python_callable=train_model,
op_kwargs={'model_name': 'GradientBoosting'},
provide_context=True,
dag=dag,
)
train_svm_task = PythonOperator(
task_id='train_svm',
python_callable=train_model,
op_kwargs={'model_name': 'SVM'},
provide_context=True,
dag=dag,
)
evaluate_rf_task = PythonOperator(
task_id='evaluate_random_forest',
python_callable=evaluate_model,
op_kwargs={'model_name': 'RandomForest'},
provide_context=True,
dag=dag,
)
evaluate_gb_task = PythonOperator(
task_id='evaluate_gradient_boosting',
python_callable=evaluate_model,
op_kwargs={'model_name': 'GradientBoosting'},
provide_context=True,
dag=dag,
)
evaluate_svm_task = PythonOperator(
task_id='evaluate_svm',
python_callable=evaluate_model,
op_kwargs={'model_name': 'SVM'},
provide_context=True,
dag=dag,
)
select_best_model_task = PythonOperator(
task_id='select_best_model',
python_callable=select_best_model,
provide_context=True,
dag=dag,
)
# Slack 메시지 전송 Task
slack_notification_task = PythonOperator(
task_id='send_slack_notification',
python_callable=send_slack_notification,
provide_context=True,
dag=dag,
)
# Task 의존성 설정
prepare_data_task >> [train_rf_task, train_gb_task, train_svm_task]
train_rf_task >> evaluate_rf_task
train_gb_task >> evaluate_gb_task
train_svm_task >> evaluate_svm_task
[evaluate_rf_task, evaluate_gb_task, evaluate_svm_task] >> select_best_model_task >> slack_notification_task
이렇게 모델을 학습하고 해당 결과를 slack까지 전달하는 코드를 작성해 봤다.
해당 DAG를 실행하면 MLFlow에 잘 추적되는 것도 확인할 수 있다!
그리고 slack에도 모델의 결과 알림이 잘 오는걸 확인할 수 있다.
일단 이렇게 airflow를 사용하는 방법을 익혀봤다. 아직 완전 기초적인 내용뿐이라 그냥 그렇구나 오케이.. 오케이.. 하고 넘어갔지만 나중에 진행할 MLOps 프로젝트에서 이 워크플로우에 대해서 더 깊게 탐구해 봐야겠다.
'MLOps' 카테고리의 다른 글
Docker 기반 Airflow, MLFlow, FastAPI, Streamlit 적용한 MLOps 프로젝트 후기 (5) | 2024.10.12 |
---|---|
Docker로 mlflow 실행할때 OSError: [Errno 30] Read-only file system: '/mlflow' 에러 발생 (5) | 2024.10.09 |
FastAPI와 Docker를 활용한 S3 모델 서빙 및 배포 방법 (5) | 2024.09.24 |
TinyBERT로 감정 분석 모델 학습부터 AWS S3에 모델 업로드 (3) | 2024.09.23 |
MLFlow로 머신러닝 모델 실험 관리 및 Tag로 stage 표시 (3) | 2024.09.22 |