MLOps에서 중요한 workflow management 중에서 Airflow를 실습해 보자.
Airflow 실습 환경 구축
실습을 할 수 있는 환경을 구축할 수 있는 두 가지 방법이 있다.
- pip install을 통해 local에 설정하기
- Docker container를 사용해서 docker image로 실행하기 (독립적인 공간에서 수행)
여기서 당연히 더 확장성이 좋은 docker container기반 환경에서 실습해보려고 한다.
Dockerfile로 Airflow 실행
우선 도커 이미지를 생성하고, 해당 컨테이너를 실행시켜 보자.
도커 파일은 다음과 같다.
FROM python:3.8-slim
# 환경 변수 설정
ENV AIRFLOW_HOME=/usr/local/airflow
# 필요한 시스템 라이브러리 설치
RUN apt-get update && \
apt-get install -y gcc libc-dev vim && \
rm -rf /var/lib/apt/lists/*
# Airflow 설치
RUN pip install 'apache-airflow==2.8.1' \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.8.txt"
# Airflow 환경 설정
RUN mkdir -p $AIRFLOW_HOME
WORKDIR $AIRFLOW_HOME
# Airflow 데이터베이스 초기화
RUN airflow db init
# Airflow DAGs 폴더에 DAG 파일 복사
COPY my_dag.py $AIRFLOW_HOME/dags/
# Airflow 웹 서버 포트 노출
EXPOSE 8080
# 사용자 계정 생성 및 웹 서버/스케줄러 실행
CMD bash -c "airflow db migrate && sleep 5 && \
airflow users create --username admin --firstname Admin --lastname User --role Admin --email admin@example.com --password admin && \
airflow webserver -p 8080 & airflow scheduler"
이렇게 Dockerfile을 만들고 위에서 정의한 my_dag.py 파일을 생성해 주자.
vim my_dag.py
import os
print(os)
이렇게 가볍게 만들고 이미지를 빌드해 주자.
docker build -t my_airflow .
🚩 만약 ERROR: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running? 이런 에러가 나오면 Docker app을 실행해주면 된다!
생성된 이미지 목록 확인 명령어는 다음과 같다.
docker images
이렇게 잘 생성된 걸 볼 수 있다. 이제 생성된 이미지를 실행해 보자.
docker run --name my_airflow_container -d -p 8080:8080 my_airflow:latest
실행하고 docker ps로 실행된 컨테이너를 확인해 보면 잘 떠있는 걸 확인할 수 있다.
http://localhost:8080으로 접속하면 airflow 화면이 잘 나온다.
username, password를 입력하면 해당 화면이 잘 나온다.
DAG 실습 1
문장에 있는 각 단어를 프린트해 보는 간단한 DAG로 airflow를 실행해 보자.
아래 DAG는 하루에 한 번씩 실행되는 코드로 작성했다.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
# DAG 정의
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 11, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'hello_airflow_dag',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
# 출력 함수 정의
def print_word(word):
print(word)
# 문장
sentence = "hello airflow dag. fast campus lecture! we can do it."
# 각 단어를 순차적으로 출력하는 Task 생성
prev_task = None
for i, word in enumerate(sentence.split()):
task = PythonOperator(
task_id=f'print_word_{i}',
python_callable=print_word,
op_kwargs={'word': word},
dag=dag,
)
if prev_task:
prev_task >> task
prev_task = task
이렇게 파일을 만들어주고 일단 도커 컨테이너 내부에 접속해 보자
이 안에서 다음 명령어를 치면 사전에 DAG 에러를 확인할 수 있다.
airflow dags reserialize
지금은 우리가 작성한 코드에서 에러가 안 나니 페이지에서 기다리면 이렇게 hello_airflow_dag가 나온다.
들어가서 실행 버튼을 클릭하면 이렇게 잘 실행되는 걸 확인할 수 있다.
첫 번째 task에 들어가서 로그를 보면 hello가 잘 찍힌 걸 볼 수 있고
두 번째 task에 들어가서 보면 airflow가 잘 찍힌 걸 볼 수 있다.
DAG 실습 2
이제 실제 MLOps 환경에서 진행되는 코드를 예시로 들어보자.
이런 플로우를 만들어보자.
플로우는 피처 엔지니어를 진행한 뒤에 하나의 task에서는 randomforest로, 다른 하나는 gradient boosting 기반으로 모델을 돌려서 둘 중 최고 성능의 모델을 선택하는 DAG를 작성해 보자.
물론 실제 현업에서는 이것보다 더 복잡한 구조로 이루어져 있을 텐데 일단 실습 삼아 진행해 보자.
아래 코드는 iris 데이터셋을 받아서 분할하고, XCom을 사용해서 데이터를 저장한다.
💡 XCom이란?
Cross-Communication의 약자로 태스크 간 작은 데이터(데이터, 파라미터 등)를 교환하는 데 사용한다. 데이터는 Airflow의 메타데이터 데이터베이스에 저장되며, 이를 통해 태스크 간 데이터를 공유한다.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 11, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'model_training_selection',
default_args=default_args,
description='A simple DAG for model training and selection',
schedule_interval=timedelta(days=1),
)
def feature_engineering(**kwargs):
from sklearn.datasets import load_iris
import pandas as pd
iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
y = pd.Series(iris.target)
# 데이터 분할
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)
# XCom을 사용하여 데이터 저장
ti = kwargs['ti']
ti.xcom_push(key='X_train', value=X_train.to_json())
ti.xcom_push(key='X_test', value=X_test.to_json())
ti.xcom_push(key='y_train', value=y_train.to_json(orient='records'))
ti.xcom_push(key='y_test', value=y_test.to_json(orient='records'))
def train_model(model_name, **kwargs):
ti = kwargs['ti']
X_train = pd.read_json(ti.xcom_pull(key='X_train', task_ids='feature_engineering'))
X_test = pd.read_json(ti.xcom_pull(key='X_test', task_ids='feature_engineering'))
y_train = pd.read_json(ti.xcom_pull(key='y_train', task_ids='feature_engineering'), typ='series')
y_test = pd.read_json(ti.xcom_pull(key='y_test', task_ids='feature_engineering'), typ='series')
if model_name == 'RandomForest':
model = RandomForestClassifier()
elif model_name == 'GradientBoosting':
model = GradientBoostingClassifier()
else:
raise ValueError("Unsupported model: " + model_name)
model.fit(X_train, y_train)
predictions = model.predict(X_test)
performance = accuracy_score(y_test, predictions)
ti.xcom_push(key=f'performance_{model_name}', value=performance)
def select_best_model(**kwargs):
ti = kwargs['ti']
rf_performance = ti.xcom_pull(key='performance_RandomForest', task_ids='train_rf')
gb_performance = ti.xcom_pull(key='performance_GradientBoosting', task_ids='train_gb')
best_model = 'RandomForest' if rf_performance > gb_performance else 'GradientBoosting'
print(f"Best model is {best_model} with performance {max(rf_performance, gb_performance)}")
return best_model
with dag:
t1 = PythonOperator(
task_id='feature_engineering',
python_callable=feature_engineering,
)
t2 = PythonOperator(
task_id='train_rf',
python_callable=train_model,
op_kwargs={'model_name': 'RandomForest'},
provide_context=True,
)
t3 = PythonOperator(
task_id='train_gb',
python_callable=train_model,
op_kwargs={'model_name': 'GradientBoosting'},
provide_context=True,
)
t4 = PythonOperator(
task_id='select_best_model',
python_callable=select_best_model,
provide_context=True,
)
t1 >> [t2, t3] >> t4
이렇게 파일을 만들어주고 저장하고 airflow dags reserialize를 실행해 보자.
그럼 이렇게 No module named 'pandas'라는 에러가 떠있을 거다. 설치해 주면 된다!
pip install pandas scikit-learn
에러를 해결해 주면 이렇게 잘 나오는 걸 볼 수 있다.
실행시켜 주고 성공한 다음 마지막 태스크의 로그를 보면 최고 성능 모델 로그도 잘 찍힌 걸 볼 수 있다.
'MLOps' 카테고리의 다른 글
젠킨스 CI/CD 개요 및 실습 (0) | 2024.11.22 |
---|---|
쿠버네티스 환경 구축 및 주요 구성 (0) | 2024.11.19 |
[MLflow] 로컬 파일 시스템에 기록된 실험 데이터 확인 방법 (0) | 2024.11.02 |
Docker 기반 Airflow, MLFlow, FastAPI, Streamlit 적용한 MLOps 프로젝트 후기 (5) | 2024.10.12 |
Docker로 mlflow 실행할때 OSError: [Errno 30] Read-only file system: '/mlflow' 에러 발생 (5) | 2024.10.09 |