Commit fc694b32 authored by insun park's avatar insun park
Browse files

docs: AI 전문가 과정 강의자료 신규 추가 및 업데이트

parent 6a5f7213
import kfp
from kfp import dsl
from kfp.components import func_to_container_op
# ==============================================================================
# 재학습 트리거 및 조건부 실행 파이프라인 실습 코드
#
# 본 예제는 Kubeflow Pipelines(KFP)를 사용하여 간단한 ML 파이프라인을 구축하고,
# 특정 조건(모델 정확도)에 따라 다음 컴포넌트의 실행 여부를 결정하는 방법을 보여줍니다.
#
# 실제 프로덕션 환경에서는 데이터 변경, 모델 성능 저하 등을 감지하는 모니터링 시스템이
# 이 파이프라인을 주기적으로 또는 이벤트 기반으로 '트리거'하여 모델을 '재학습'하고,
# 성능이 개선되었을 경우에만 자동으로 배포하는 흐름을 자동화할 수 있습니다.
# ==============================================================================
# Component 1: 데이터 전처리 및 모델 학습
# 실제로는 데이터를 불러와 전처리하고 모델을 학습시키는 복잡한 과정이 포함됩니다.
# 여기서는 간단히 학습된 모델의 '정확도'를 반환하는 것으로 시뮬레이션합니다.
@func_to_container_op
def train_model(learning_rate: float) -> float:
"""간단한 모델을 학습하고 정확도를 반환하는 컴포넌트."""
print(f"모델 학습 시작... (learning_rate: {learning_rate})")
# 임의의 모델 정확도 반환 (실제로는 평가 데이터셋으로 측정)
accuracy = 0.95
print(f"모델 학습 완료. 정확도: {accuracy}")
return accuracy
# Component 2: 모델 배포
# 실제로는 학습된 모델을 서빙 환경에 배포하는 과정입니다.
# 여기서는 간단히 배포 메시지를 출력합니다.
@func_to_container_op
def deploy_model(accuracy: float):
"""(시뮬레이션) 학습된 모델을 프로덕션에 배포하는 컴포넌트."""
print(f"배포 시작... 성능 임계값 통과 (정확도: {accuracy})")
print("모델이 성공적으로 배포되었습니다.")
# Pipeline: 전체 ML 워크플로우를 정의합니다.
@dsl.pipeline(
name="Conditional Execution Pipeline",
description="모델 정확도에 따라 배포 여부를 결정하는 파이프라인"
)
def conditional_pipeline(
learning_rate: float = 0.01,
deploy_threshold: float = 0.9
):
"""
ML 파이프라인 정의:
1. 모델을 학습시킵니다.
2. 학습된 모델의 정확도가 임계값(deploy_threshold)보다 높을 경우에만 배포를 진행합니다.
"""
# 1. 모델 학습 컴포넌트 실행
train_task = train_model(learning_rate=learning_rate)
# 2. 조건부 실행: 학습된 모델의 정확도(train_task.output)를 확인
with dsl.Condition(train_task.output > deploy_threshold):
# 정확도가 deploy_threshold 보다 높을 때만 아래 deploy_model 컴포넌트 실행
deploy_model(accuracy=train_task.output)
if __name__ == '__main__':
# 파이프라인을 컴파일하여 YAML 파일로 저장합니다.
# 이 YAML 파일을 Kubeflow UI에 업로드하여 파이프라인을 실행할 수 있습니다.
kfp.compiler.Compiler().compile(
pipeline_func=conditional_pipeline,
package_path='conditional_pipeline.yaml'
)
print("Conditional pipeline compiled to conditional_pipeline.yaml")
\ No newline at end of file
import os
import json
import requests
import pandas as pd
from datetime import datetime
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
# ==============================================================================
# 데이터 드리프트 감지 및 슬랙 알림 시스템 실습 코드
#
# 본 예제는 Evidently AI를 사용하여 데이터 드리프트를 감지하고,
# 드리프트가 확인되면 Slack을 통해 알림을 보내는 과정을 보여줍니다.
#
# 실행 방법:
# 1. `pip install -r requirements.txt` 로 라이브러리를 설치합니다.
# 2. 아래 `SLACK_WEBHOOK_URL`을 실제 Slack Webhook URL로 변경합니다.
# 3. `python data_drift_detection.py`를 실행합니다.
# ==============================================================================
# --- 설정 ---
# TODO: 본인의 Slack Incoming Webhook URL을 입력하세요.
# Slack Webhook URL 생성 가이드: https://api.slack.com/messaging/webhooks
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK_URL")
def send_slack_notification(message: str):
"""Slack으로 알림 메시지를 전송합니다."""
try:
payload = {"text": message}
response = requests.post(SLACK_WEBHOOK_URL, data=json.dumps(payload), headers={'Content-Type': 'application/json'})
if response.status_code != 200:
raise ValueError(f"Slack 알림 실패: {response.status_code} {response.text}")
print("✅ Slack 알림을 성공적으로 보냈습니다.")
except Exception as e:
print(f"❌ Slack 알림 전송 중 에러 발생: {e}")
def check_data_drift():
"""
Evidently AI를 사용하여 데이터 드리프트를 감지하고,
결과에 따라 Slack 알림을 트리거합니다.
"""
print("데이터 드리프트 감지를 시작합니다...")
# 1. 데이터 준비 (시뮬레이션)
# 참조 데이터셋 (예: 모델 학습에 사용된 데이터)
reference_data = pd.DataFrame({
'feature1': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
'feature2': [10, 20, 30, 40, 50, 60, 70, 80, 90, 100],
'target': [0, 0, 0, 0, 1, 1, 1, 1, 1, 1]
})
# 현재 데이터셋 (예: 프로덕션에서 수집된 최신 데이터)
# feature1에서 의도적으로 분포 변화(드리프트) 발생시킴
current_data = pd.DataFrame({
'feature1': [5, 6, 7, 8, 9, 10, 11, 12, 13, 14], # 드리프트 발생
'feature2': [12, 22, 31, 42, 53, 61, 72, 81, 93, 101],
'target': [0, 0, 1, 0, 1, 1, 1, 0, 1, 1]
})
print("참조 데이터와 현재 데이터를 준비했습니다.")
# 2. Evidently AI 리포트 생성
data_drift_report = Report(metrics=[DataDriftPreset()])
data_drift_report.run(reference_data=reference_data, current_data=current_data)
# 3. 드리프트 결과 분석
report_dict = data_drift_report.as_dict()
is_drift_detected = report_dict['metrics'][0]['result']['dataset_drift']
number_of_drifted_features = report_dict['metrics'][0]['result']['number_of_drifted_columns']
print(f"📊 드리프트 감지 결과: {is_drift_detected}")
# 4. Slack 알림 전송 (드리프트가 감지된 경우)
if is_drift_detected:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = (
f"🚨 *데이터 드리프트 경고* 🚨\n\n"
f"- *시간*: {timestamp}\n"
f"- *상태*: 데이터 드리프트 감지됨\n"
f"- *드리프트된 피처 수*: {number_of_drifted_features}\n\n"
"상세 분석을 위해 MLOps 대시보드를 확인해주세요."
)
send_slack_notification(message)
else:
print("✅ 데이터 드리프트가 감지되지 않았습니다. 시스템이 안정적입니다.")
if __name__ == '__main__':
if "YOUR/SLACK/WEBHOOK_URL" in SLACK_WEBHOOK_URL:
print("⚠️ 경고: SLACK_WEBHOOK_URL이 설정되지 않았습니다. Slack 알림은 전송되지 않습니다.")
print("스크립트 상단의 URL을 실제 Slack Webhook URL로 변경해주세요.")
check_data_drift()
\ No newline at end of file
import os
import pandas as pd
from datetime import datetime, timedelta
import subprocess
import shutil
# --- 1. 실습 환경 설정 ---
# Feast Feature Repository를 저장할 디렉토리를 생성합니다.
repo_path = "feature_repo"
if os.path.exists(repo_path):
shutil.rmtree(repo_path)
os.makedirs(os.path.join(repo_path, "data"), exist_ok=True)
print(f"'{repo_path}' 디렉토리를 생성했습니다.")
# --- 2. 샘플 데이터 생성 ---
# 오프라인 스토어에 저장될 샘플 데이터를 생성합니다.
# 이 데이터는 운전자별 시간당 주행 통계를 나타냅니다.
end_date = datetime.now()
start_date = end_date - timedelta(days=7)
n_drivers = 10
driver_ids = [1000 + i for i in range(n_drivers)]
data = {
"event_timestamp": [],
"driver_id": [],
"conv_rate": [],
"acc_rate": [],
"avg_daily_trips": [],
}
for driver_id in driver_ids:
current_date = start_date
while current_date < end_date:
data["event_timestamp"].append(current_date)
data["driver_id"].append(driver_id)
data["conv_rate"].append(pd.np.random.uniform(0, 1))
data["acc_rate"].append(pd.np.random.uniform(0, 1))
data["avg_daily_trips"].append(pd.np.random.randint(0, 100))
current_date += timedelta(hours=1)
df = pd.DataFrame(data)
driver_stats_path = os.path.join(repo_path, "data", "driver_stats.parquet")
df.to_parquet(driver_stats_path)
print(f"샘플 데이터 '{driver_stats_path}'를 생성했습니다.")
# --- 3. Feature Repository 정의 ---
# Feast의 핵심 구성 요소(Entity, Feature View, Data Source 등)를 정의하는
# feature_store.yaml 파일과 피처 정의 python 파일을 생성합니다.
feature_store_yaml = f"""
project: my_driver_project
registry: {os.path.join(repo_path, 'data', 'registry.db')}
provider: local
online_store:
type: sqlite
path: {os.path.join(repo_path, 'data', 'online.db')}
"""
with open(os.path.join(repo_path, "feature_store.yaml"), "w") as f:
f.write(feature_store_yaml)
print(f"'{os.path.join(repo_path, 'feature_store.yaml')}' 파일을 생성했습니다.")
driver_repo_py = f"""
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource, ValueType
from feast.types import Float32, Int64
# 운전자 ID를 Entity로 정의합니다.
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id")
# 오프라인 데이터 소스를 정의합니다.
driver_stats_source = FileSource(
path="{driver_stats_path}",
event_timestamp_column="event_timestamp",
)
# Feature View를 정의하여 피처들을 그룹화하고 데이터 소스에 연결합니다.
driver_stats_fv = FeatureView(
name="driver_hourly_stats",
entities=[driver],
ttl=timedelta(days=1),
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64),
],
online=True,
source=driver_stats_source,
tags={{"team": "driver_performance"}},
)
"""
with open(os.path.join(repo_path, "driver_repo.py"), "w") as f:
f.write(driver_repo_py)
print(f"'{os.path.join(repo_path, 'driver_repo.py')}' 파일을 생성했습니다.")
# --- 4. 피처 등록 및 구체화 ---
# 'feast apply' 명령어를 실행하여 Feature Repository에 피처 정의를 등록(등록)합니다.
# 'feast materialize' 명령어로 오프라인 스토어의 데이터를 온라인 스토어로 로딩합니다.
print("\n--- Feast Apply 실행 ---")
subprocess.run(["feast", "apply"], cwd=repo_path, check=True)
print("\n--- Feast Materialize 실행 ---")
materialize_end_date = datetime.now()
materialize_start_date = materialize_end_date - timedelta(days=7)
subprocess.run([
"feast", "materialize",
materialize_end_date.strftime("%Y-%m-%d"),
materialize_start_date.strftime("%Y-%m-%d")
], cwd=repo_path, check=True)
# --- 5. 학습 데이터셋 생성 및 모델 학습 ---
# get_historical_features()를 사용하여 과거 시점의 피처들을 가져와 학습 데이터셋을 만듭니다.
print("\n--- 학습 데이터셋 생성 ---")
from feast import FeatureStore
from sklearn.linear_model import LinearRegression
# FeatureStore 객체를 초기화합니다.
store = FeatureStore(repo_path=repo_path)
# 학습에 사용할 Entity와 타임스탬프를 정의합니다.
entity_df = pd.DataFrame(
{
"event_timestamp": [
end_date - timedelta(days=1),
end_date - timedelta(hours=12),
end_date - timedelta(hours=6),
] * n_drivers,
"driver_id": sorted(driver_ids * 3),
}
)
# Feast를 통해 학습 데이터를 가져옵니다.
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
],
).to_df()
print("생성된 학습 데이터셋:")
print(training_df.head())
# 가상의 타겟 변수(label)를 생성합니다.
training_df["label_driver_good"] = (
training_df["conv_rate"] + training_df["acc_rate"] / 1000
) > 0.8
# 모델을 학습시킵니다.
features = ["conv_rate", "acc_rate", "avg_daily_trips"]
target = "label_driver_good"
reg = LinearRegression()
reg.fit(training_df[features], training_df[target])
print("\n간단한 선형 회귀 모델을 학습시켰습니다.")
# --- 6. 실시간 추론을 위한 온라인 피처 조회 ---
# get_online_features()를 사용하여 실시간 예측에 필요한 최신 피처를 조회합니다.
print("\n--- 온라인 피처 조회 및 실시간 예측 ---")
feature_vector = store.get_online_features(
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
],
entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}],
).to_dict()
online_features_df = pd.DataFrame.from_dict(feature_vector)
print("온라인 스토어에서 조회한 피처:")
print(online_features_df)
# 온라인 피처를 사용하여 예측을 수행합니다.
predictions = reg.predict(online_features_df[features])
print(f"\n실시간 예측 결과 (1001, 1002 드라이버): {predictions}")
# --- 7. 정리 ---
# 실습을 위해 생성했던 디렉토리를 삭제합니다.
# cleanup = input("생성된 feature_repo 디렉토리를 삭제하시겠습니까? (y/n): ")
# if cleanup.lower() == 'y':
# shutil.rmtree(repo_path)
# print(f"'{repo_path}' 디렉토리를 삭제했습니다.")
print(f"\n실습 완료! '{repo_path}'에 생성된 파일들을 확인해보세요.")
print(f"정리를 원하시면 'rm -rf {repo_path}' 명령어를 실행하세요.")
\ No newline at end of file
# 데이터 드리프트 감지 및 슬랙 알림 시스템 실습 코드용 라이브러리
pandas~=2.0.0
requests~=2.31.0
evidently~=0.4.0
# Feature Store 실습 코드용 라이브러리
feast[sqlite]~=0.36.0
scikit-learn~=1.3.0
\ No newline at end of file
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
import os
def get_model_size(model):
"""모델의 메모리 크기를 MB 단위로 계산합니다."""
param_size = 0
for param in model.parameters():
param_size += param.nelement() * param.element_size()
buffer_size = 0
for buffer in model.buffers():
buffer_size += buffer.nelement() * buffer.element_size()
size_all_mb = (param_size + buffer_size) / 1024**2
return size_all_mb
def main():
"""
Hugging Face의 사전 학습된 BERT 모델을 로드하고, 8-bit 양자화를 적용한 후
원본 모델과 양자화된 모델의 크기를 비교하고 추론 결과를 확인하는 스크립트입니다.
"""
# 사용할 모델과 디바이스 설정
model_id = "bert-base-uncased"
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"사용할 모델: {model_id}")
print(f"사용할 디바이스: {device}")
if device == "cpu":
print("\n경고: CUDA를 사용할 수 없어 CPU에서 실행합니다. bitsandbytes 양자화는 CUDA GPU에서만 효과가 있습니다.")
print("이 스크립트는 CUDA 환경에서 실행하는 것을 권장합니다.")
# --- 1. 원본 FP32 모델 로드 및 크기 확인 ---
print("\n--- 1. 원본 FP32 모델 로드 중... ---")
tokenizer = AutoTokenizer.from_pretrained(model_id)
fp32_model = AutoModelForSequenceClassification.from_pretrained(model_id)
# 모델을 디바이스로 이동
fp32_model.to(device)
fp32_model_size = get_model_size(fp32_model)
print(f"FP32 모델 크기: {fp32_model_size:.2f} MB")
# --- 2. 8-bit 양자화 모델 로드 및 크기 확인 ---
# `load_in_8bit=True` 옵션을 사용하여 모델을 8-bit로 양자화합니다.
# `device_map="auto"`는 모델 레이어를 사용 가능한 디바이스(GPU, CPU, RAM)에 자동으로 분배합니다.
print("\n--- 2. 8-bit 양자화 모델 로드 중... ---")
try:
int8_model = AutoModelForSequenceClassification.from_pretrained(
model_id,
load_in_8bit=True,
device_map="auto"
)
int8_model_size = get_model_size(int8_model)
print(f"INT8 모델 크기: {int8_model_size:.2f} MB")
size_reduction = 100 * (1 - int8_model_size / fp32_model_size)
print(f"\n메모리 사용량 감소율: {size_reduction:.2f}%")
except Exception as e:
print(f"8-bit 모델 로드 중 오류 발생: {e}")
print("bitsandbytes가 올바르게 설치되었는지, CUDA 환경이 맞는지 확인하세요.")
int8_model = None
# --- 3. 추론 파이프라인으로 결과 비교 ---
print("\n--- 3. 추론 결과 비교 ---")
text = "This is a great movie, I really enjoyed it!"
print(f"입력 텍스트: '{text}'")
# FP32 모델 추론
print("\nFP32 모델 추론 결과:")
classifier_fp32 = pipeline("text-classification", model=fp32_model, tokenizer=tokenizer, device=0 if device=="cuda" else -1)
result_fp32 = classifier_fp32(text)
print(result_fp32)
# INT8 모델 추론
if int8_model:
print("\nINT8 모델 추론 결과:")
# 양자화된 모델은 device_map을 통해 이미 GPU에 할당되어 있으므로 device=-1로 설정해도 GPU에서 실행됩니다.
classifier_int8 = pipeline("text-classification", model=int8_model, tokenizer=tokenizer)
result_int8 = classifier_int8(text)
print(result_int8)
print("\n실습 완료!")
print("양자화를 통해 모델 크기가 크게 줄어들면서도 추론 결과는 유사하게 유지되는 것을 확인할 수 있습니다.")
if __name__ == "__main__":
main()
\ No newline at end of file
# BERT 모델 8-bit 양자화 실습 코드용 라이브러리
torch~=2.1.0
transformers~=4.35.0
datasets~=2.15.0
accelerate~=0.24.0
bitsandbytes~=0.41.0
# Triton 마이그레이션 및 벤치마킹 실습 코드용 라이브러리
scikit-learn~=1.3.0
fastapi~=0.104.0
uvicorn~=0.24.0
onnx~=1.15.0
skl2onnx~=1.16.0
tritonclient[http]~=2.39.0
numpy~=1.26.0
\ No newline at end of file
import os
import shutil
import time
import threading
import numpy as np
import requests
import uvicorn
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
import tritonclient.http as httpclient
# --- 1. 기본 설정 ---
MODEL_DIR = "triton_model_repo"
MODEL_NAME = "sklearn_rf"
MODEL_VERSION = "1"
FASTAPI_PORT = 8001
TRITON_PORT = 8000
def prepare_model_and_repo():
"""
Scikit-learn 모델을 학습하고, ONNX로 변환한 후,
Triton 서버가 사용할 수 있는 모델 리포지토리 구조를 생성합니다.
"""
print("--- 1. 모델 학습 및 ONNX 변환 시작 ---")
# 간단한 분류 모델 학습
X, y = make_classification(n_samples=1000, n_features=10, n_classes=2, random_state=42)
model = RandomForestClassifier(random_state=42)
model.fit(X, y)
print("RandomForestClassifier 모델 학습 완료.")
# ONNX 변환을 위한 초기 타입 정의
initial_type = [("float_input", FloatTensorType([None, X.shape[1]]))]
onnx_model = convert_sklearn(model, initial_types=initial_type)
print("모델을 ONNX 형식으로 변환 완료.")
# Triton 모델 리포지토리 생성
model_repo_path = os.path.join(MODEL_DIR, MODEL_NAME, MODEL_VERSION)
if os.path.exists(MODEL_DIR):
shutil.rmtree(MODEL_DIR)
os.makedirs(model_repo_path)
# ONNX 모델 저장
onnx_model_path = os.path.join(model_repo_path, "model.onnx")
with open(onnx_model_path, "wb") as f:
f.write(onnx_model.SerializeToString())
print(f"ONNX 모델을 '{onnx_model_path}'에 저장했습니다.")
# Triton 설정 파일(config.pbtxt) 생성
config_pbtxt = f"""
name: "{MODEL_NAME}"
platform: "onnxruntime_onnx"
input [
{{
name: "float_input"
data_type: TYPE_FP32
dims: [ -1, {X.shape[1]} ]
}}
]
output [
{{
name: "output_label"
data_type: TYPE_INT64
dims: [ -1 ]
}},
{{
name: "output_probability"
data_type: TYPE_FP32
dims: [ -1, 2 ]
}}
]
dynamic_batching {{
preferred_batch_size: [4, 8]
max_queue_delay_microseconds: 100
}}
"""
config_path = os.path.join(MODEL_DIR, MODEL_NAME, "config.pbtxt")
with open(config_path, "w") as f:
f.write(config_pbtxt)
print(f"Triton 설정 파일을 '{config_path}'에 생성했습니다.")
return X[0].astype(np.float32) # 벤치마킹에 사용할 샘플 데이터 반환
def run_fastapi_server(stop_event):
"""
간단한 FastAPI 서버를 백그라운드 스레드에서 실행합니다.
Triton과 동일한 ONNX 모델을 서빙합니다. (실제로는 onnxruntime 사용 필요)
여기서는 간단히 scikit-learn 모델을 직접 사용합니다.
"""
from fastapi import FastAPI
from pydantic import BaseModel
# 모델 재학습 (간단한 예제를 위해)
X, y = make_classification(n_samples=1000, n_features=10, n_classes=2, random_state=42)
model = RandomForestClassifier(random_state=42)
model.fit(X, y)
app = FastAPI()
class Item(BaseModel):
data: list
@app.post("/predict")
def predict(item: Item):
data = np.array(item.data).reshape(1, -1)
prediction = model.predict(data)
return {"prediction": prediction.tolist()}
config = uvicorn.Config(app, host="0.0.0.0", port=FASTAPI_PORT, log_level="info")
server = uvicorn.Server(config)
# stop_event가 설정될 때까지 서버 실행
server.run()
# while not stop_event.is_set():
# # uvicorn.run()은 블로킹 함수이므로, 실제 백그라운드 실행을 위해서는
# # 별도 프로세스나 더 정교한 스레딩 제어가 필요합니다.
# # 이 예제에서는 시연을 위해 메인 스레드에서 제어합니다.
# pass
def benchmark_server(name, url, sample_data, n_requests=100):
"""서버 성능을 벤치마킹합니다."""
print(f"\n--- {name} 벤치마킹 시작 (요청 {n_requests}회) ---")
# Latency 측정 (순차 요청)
latencies = []
start_time = time.time()
for _ in range(n_requests):
req_start_time = time.time()
if "Triton" in name:
triton_client = httpclient.InferenceServerClient(url=url)
inputs = [httpclient.InferInput("float_input", [1, 10], "FP32")]
inputs[0].set_data_from_numpy(sample_data.reshape(1, -1), binary_data=True)
triton_client.infer(model_name=MODEL_NAME, inputs=inputs)
else:
requests.post(url, json={"data": sample_data.tolist()})
latencies.append(time.time() - req_start_time)
total_time = time.time() - start_time
avg_latency = np.mean(latencies) * 1000 # ms
throughput = n_requests / total_time # rps
print(f"평균 지연 시간 (Avg. Latency): {avg_latency:.2f} ms")
print(f"처리량 (Throughput): {throughput:.2f} req/s")
return avg_latency, throughput
def main():
"""메인 실행 함수"""
sample_data = prepare_model_and_repo()
print("\n--- 2. 서버 실행 준비 ---")
print(f"FastAPI 서버가 포트 {FASTAPI_PORT}에서 실행될 예정입니다.")
print("Triton 서버를 별도의 터미널에서 실행해주세요.")
print("\n" + "="*50)
print(" 다음 Docker 명령어를 사용하여 Triton 서버를 실행하세요:")
print(f" docker run --rm -p {TRITON_PORT}:8000 -p 8001:8001 -p 8002:8002 \\")
print(f" -v {os.getcwd()}/{MODEL_DIR}:/models nvcr.io/nvidia/tritonserver:23.10-py3 tritonserver --model-repository=/models")
print("="*50 + "\n")
input("Triton 서버가 실행되면 Enter 키를 눌러 벤치마킹을 시작하세요...")
# FastAPI 서버를 백그라운드 스레드에서 실행
# stop_event = threading.Event()
# fastapi_thread = threading.Thread(target=run_fastapi_server, args=(stop_event,))
# fastapi_thread.start()
# time.sleep(5) # 서버가 시작될 때까지 대기
print("\n[알림] 이 스크립트는 FastAPI 서버를 직접 실행하지 않습니다.")
print("별도 터미널에서 'python -m uvicorn fastapi_server:app --port 8001'과 같이 실행하거나,")
print("아래 코드를 활성화하여 스레드 기반으로 실행할 수 있습니다.")
print("이 데모에서는 Triton과의 비교에 집중합니다.")
# 벤치마킹 실행
triton_latency, triton_throughput = benchmark_server(
"NVIDIA Triton Server", f"localhost:{TRITON_PORT}", sample_data
)
# fastapi_latency, fastapi_throughput = benchmark_server(
# "FastAPI Server", f"http://localhost:{FASTAPI_PORT}/predict", sample_data
# )
print("\n\n--- 벤치마킹 결과 요약 ---")
print(f"| {'Server':<20} | {'Avg. Latency (ms)':<20} | {'Throughput (req/s)':<20} |")
print(f"| {'-'*20} | {'-'*20} | {'-'*20} |")
# print(f"| {'FastAPI Server':<20} | {fastapi_latency:<20.2f} | {fastapi_throughput:<20.2f} |")
print(f"| {'NVIDIA Triton Server':<20} | {triton_latency:<20.2f} | {triton_throughput:<20.2f} |")
print("\n[참고] FastAPI 벤치마크를 위해서는 위 코드의 주석을 해제하고,")
print("`run_fastapi_server` 함수 및 관련 스레드 코드를 활성화해야 합니다.")
# FastAPI 서버 종료
# stop_event.set()
# fastapi_thread.join()
print("\n실습 완료!")
if __name__ == "__main__":
main()
\ No newline at end of file
import os
import faiss
from dotenv import load_dotenv
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.docstore.document import Document
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain.chains import LLMChain
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
from sentence_transformers import CrossEncoder
import numpy as np
def setup_environment():
"""환경 변수를 로드하고 OpenAI API 키를 설정합니다."""
load_dotenv()
if not os.getenv("OPENAI_API_KEY"):
raise ValueError("OPENAI_API_KEY가 설정되지 않았습니다. .env 파일을 확인하세요.")
print("API 키 로드 완료.")
def prepare_documents():
"""지식 베이스로 사용할 샘플 문서를 준비합니다."""
return [
Document(page_content="LangChain은 LLM을 활용한 애플리케이션 개발을 돕는 프레임워크입니다. 다양한 모듈을 제공합니다.", metadata={"doc_id": 1}),
Document(page_content="RAG는 '검색 증강 생성'의 약자로, LLM이 외부 지식 베이스를 참조하여 답변을 생성하는 기술입니다.", metadata={"doc_id": 2}),
Document(page_content="고급 RAG 기법에는 쿼리를 여러 개로 변환하는 Multi-Query와 검색된 문서 순위를 재조정하는 Re-ranking이 있습니다.", metadata={"doc_id": 3}),
Document(page_content="Re-ranking에는 Bi-Encoder로 1차 검색 후, Cross-Encoder로 정교하게 순위를 다시 매기는 방식이 효과적입니다.", metadata={"doc_id": 4}),
Document(page_content="쿼리 변환(Query Transformation)은 사용자의 모호한 질문을 명확하게 만들어 검색 성능을 높이는 것을 목표로 합니다.", metadata={"doc_id": 5}),
Document(page_content="BM25는 키워드 기반 검색 알고리즘으로, 벡터 검색과 함께 사용하면(하이브리드 검색) 성능을 보완할 수 있습니다.", metadata={"doc_id": 6})
]
def build_hybrid_retriever(docs, embeddings_model):
"""키워드 검색(BM25)과 벡터 검색(FAISS)을 결합한 하이브리드 리트리버를 구축합니다."""
# 1. FAISS 벡터 스토어 및 리트리버
faiss_vectorstore = FAISS.from_documents(docs, embeddings_model)
faiss_retriever = faiss_vectorstore.as_retriever(search_kwargs={"k": 5})
# 2. BM25 리트리버
bm25_retriever = BM25Retriever.from_documents(docs)
bm25_retriever.k = 5
# 3. 앙상블 리트리버 (두 리트리버 결합)
ensemble_retriever = EnsembleRetriever(
retrievers=[bm25_retriever, faiss_retriever], weights=[0.5, 0.5]
)
return ensemble_retriever
def get_query_transformation_chain(llm):
"""사용자의 질문을 여러 관점의 질문 3개로 변환하는 체인을 생성합니다."""
prompt = PromptTemplate(
input_variables=["question"],
template="""당신은 AI 언어 모델입니다. 사용자의 질문을 검색에 더 효과적인 3개의 다른 버전으로 다시 작성해주세요.
다양한 관점에서 질문을 재구성해야 합니다. 각 질문은 한 줄로 구분해주세요.
원본 질문: {question}"""
)
return LLMChain(llm=llm, prompt=prompt, output_key="queries")
def rerank_documents(question, retrieved_docs):
"""CrossEncoder를 사용하여 검색된 문서들의 순위를 재조정합니다."""
model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2', max_length=512)
pairs = [(question, doc.page_content) for doc in retrieved_docs]
scores = model.predict(pairs)
# 점수가 높은 순으로 정렬
sorted_docs = sorted(zip(scores, retrieved_docs), key=lambda x: x[0], reverse=True)
return [doc for score, doc in sorted_docs]
def format_docs(docs):
"""검색된 문서들을 LLM 프롬프트에 넣기 좋은 형태로 포맷팅합니다."""
return "\n\n".join(doc.page_content for doc in docs)
def main():
"""고급 RAG 파이프라인을 시연하는 메인 함수"""
setup_environment()
docs = prepare_documents()
llm = ChatOpenAI(temperature=0)
embeddings = HuggingFaceEmbeddings(model_name="jhgan/ko-sbert-nli")
# 1. 하이브리드 리트리버 구축
hybrid_retriever = build_hybrid_retriever(docs, embeddings)
# 2. 쿼리 변환 체인 준비
query_transformer_chain = get_query_transformation_chain(llm)
# 3. 최종 답변 생성을 위한 프롬프트 및 체인
template = """당신은 주어진 컨텍스트를 바탕으로 질문에 답변하는 AI 어시스턴트입니다.
컨텍스트를 벗어난 답변은 하지 마세요.
컨텍스트:
{context}
질문:
{question}
답변:
"""
prompt = PromptTemplate.from_template(template)
final_rag_chain = (
RunnablePassthrough.assign(context=(lambda x: format_docs(x["documents"])))
| prompt
| llm
| StrOutputParser()
)
# --- 고급 RAG 파이프라인 실행 ---
question = "RAG의 검색 성능을 어떻게 높일 수 있을까?"
print(f"\n원본 질문: {question}")
# 쿼리 변환
transformed_queries_result = query_transformer_chain.invoke({"question": question})
transformed_queries = transformed_queries_result['queries'].strip().split('\n')
print(f"\n변환된 쿼리:\n{transformed_queries_result['queries']}")
# 변환된 각 쿼리로 문서 검색 및 종합
all_retrieved_docs = []
for q in transformed_queries:
all_retrieved_docs.extend(hybrid_retriever.invoke(q))
# 중복 제거
unique_docs = {doc.metadata['doc_id']: doc for doc in all_retrieved_docs}.values()
print(f"\n1차 검색된 문서 (중복 제거 전 {len(all_retrieved_docs)}개, 후 {len(unique_docs)}개)")
# 재순위화
reranked_docs = rerank_documents(question, list(unique_docs))
top_k_reranked = reranked_docs[:3] # 상위 3개만 사용
print("\n재순위화 후 선택된 최종 문서:")
for doc in top_k_reranked:
print(f"- {doc.page_content} (Doc ID: {doc.metadata['doc_id']})")
# 최종 답변 생성
print("\n--- 최종 답변 생성 ---")
answer = final_rag_chain.invoke({"documents": top_k_reranked, "question": question})
print(answer)
print("\n\n--- 비교: 기본 RAG 파이프라인 ---")
basic_retrieved_docs = hybrid_retriever.invoke(question)
basic_answer = final_rag_chain.invoke({"documents": basic_retrieved_docs, "question": question})
print("기본 RAG로 검색된 문서:")
for doc in basic_retrieved_docs:
print(f"- {doc.page_content} (Doc ID: {doc.metadata['doc_id']})")
print("\n기본 RAG 답변:")
print(basic_answer)
print("\n실습 완료!")
if __name__ == "__main__":
main()
\ No newline at end of file
import os
from typing import TypedDict, Annotated, Sequence
import operator
from dotenv import load_dotenv
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_openai import ChatOpenAI
from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolExecutor, ToolInvocation
# --- 1. 환경 설정 및 도구 정의 ---
load_dotenv()
# .env 파일에 TAVILY_API_KEY 설정 필요
tavily_tool = TavilySearchResults(max_results=3)
tools = [tavily_tool]
tool_executor = ToolExecutor(tools)
# --- 2. 에이전트 상태(State) 정의 ---
# 그래프의 모든 노드가 공유하는 상태 객체
class AgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
# 각 노드에서 다음 노드로 전달될 메시지 목록
# `operator.add`는 새 메시지가 기존 목록에 추가되도록 함
# --- 3. 에이전트 노드(Node) 생성 ---
# LLM 모델 정의
model = ChatOpenAI(temperature=0, streaming=True)
def create_agent_node(llm, system_message: str):
"""
주어진 시스템 메시지를 따르는 에이전트 노드를 생성하는 팩토리 함수.
이 노드는 LLM을 호출하여 응답(액션 또는 최종 답변)을 결정합니다.
"""
prompt = [
("system", system_message),
("placeholder", "{messages}"),
]
agent_runnable = ChatOpenAI(model="gpt-4-turbo", temperature=0).bind_tools(tools)
def agent_node(state: AgentState):
result = agent_runnable.invoke(state["messages"])
return {"messages": [result]}
return agent_node
def create_tool_node():
"""도구를 실행하는 노드를 생성합니다."""
def tool_node(state: AgentState):
tool_invocations = []
last_message = state["messages"][-1]
for tool_call in last_message.tool_calls:
tool_invocations.append(ToolInvocation(tool=tool_call["name"], tool_input=tool_call["args"]))
responses = tool_executor.batch(tool_invocations, return_exceptions=True)
# 응답을 BaseMessage 형태로 변환
from langchain_core.messages import ToolMessage
tool_messages = [
ToolMessage(content=str(response), tool_call_id=inv.tool_call_id)
for inv, response in zip(last_message.tool_calls, responses)
]
return {"messages": tool_messages}
return tool_node
# --- 4. 조건부 엣지(Conditional Edge)를 위한 함수 ---
def should_continue(state: AgentState):
"""
마지막 메시지에 tool_calls가 있으면 'continue' (도구 실행),
없으면 'end' (종료)를 반환하여 워크플로우를 분기합니다.
"""
if "tool_calls" in state["messages"][-1].additional_kwargs:
return "continue"
return "end"
# --- 5. 그래프 구축 ---
# 각 역할을 수행할 에이전트 정의
researcher_node = create_agent_node(
model,
"당신은 전문 리서처입니다. 주어진 주제에 대해 신뢰할 수 있는 출처를 바탕으로 상세하고 깊이 있는 정보를 찾아 정리해주세요."
)
blogger_node = create_agent_node(
model,
"당신은 IT 전문 블로거입니다. 주어진 리서치 결과를 바탕으로 독자들이 이해하기 쉽고 흥미를 느낄만한 블로그 포스트 초안을 작성해주세요."
)
critic_node = create_agent_node(
model,
"당신은 날카로운 비평가입니다. 작성된 블로그 초안을 검토하고, 내용의 정확성, 논리적 흐름, 표현의 명확성 등을 평가하여 구체적인 수정 제안을 해주세요."
)
tool_node = create_tool_node()
# StateGraph를 사용하여 워크플로우 정의
workflow = StateGraph(AgentState)
workflow.add_node("Researcher", researcher_node)
workflow.add_node("Blogger", blogger_node)
workflow.add_node("Critic", critic_node)
workflow.add_node("call_tool", tool_node)
# 각 노드 간의 흐름(엣지) 정의
workflow.set_entry_point("Researcher") # 시작점은 Researcher
workflow.add_edge("Researcher", "Blogger")
workflow.add_edge("Blogger", "Critic")
# 조건부 엣지: 비평 결과에 따라 흐름 분기
def after_critic(state: AgentState):
last_message = state["messages"][-1].content
if "수정할 필요 없이 훌륭합니다" in last_message:
return "end"
else:
# 피드백을 다시 Researcher에게 전달 (실제 구현에서는 더 정교한 로직 필요)
return "continue_research"
workflow.add_conditional_edges(
"Critic",
after_critic,
{
"end": END,
"continue_research": "Researcher" # 피드백을 바탕으로 리서치 재시작
}
)
# 도구 사용 후에는 다시 원래 노드로 돌아가야 함 (이 예제에서는 Researcher만 도구 사용 가능)
workflow.add_conditional_edges(
"Researcher",
should_continue,
{
"continue": "call_tool",
"end": "Blogger" # Researcher의 최종 결과물을 Blogger에게 전달
}
)
workflow.add_edge("call_tool", "Researcher")
# --- 6. 그래프 컴파일 및 실행 ---
# 그래프 컴파일
app = workflow.compile()
# 그래프 시각화 (pygraphviz 또는 pydot 필요)
try:
image_data = app.get_graph().draw_png()
with open("autonomous_agent_workflow.png", "wb") as f:
f.write(image_data)
print("워크플로우 그래프를 'autonomous_agent_workflow.png' 파일로 저장했습니다.")
except ImportError:
print("Graphviz가 설치되지 않아 그래프를 시각화할 수 없습니다.")
print("시각화를 원하시면 'pip install pygraphviz' 또는 'pip install pydot'을 실행하세요.")
# 실행
topic = "최신 AI 에이전트 기술 동향과 LangGraph의 역할"
initial_messages = [HumanMessage(content=f"주제: '{topic}'에 대한 리서치를 시작해주세요.")]
# 스트리밍 출력을 위해 invoke 대신 stream 사용
for s in app.stream({"messages": initial_messages}):
print(s)
print("---")
\ No newline at end of file
import os
from dotenv import load_dotenv
# advanced_rag_chatbot.py 스크립트의 main 함수를 가져옵니다.
# (실제 환경에서는 모듈로 분리하여 import하는 것이 좋습니다)
from autonomous_agent import main as run_advanced_rag_agent
def setup_langsmith_environment():
"""
LangSmith 연동을 위한 환경 변수를 설정하고 사용자에게 안내합니다.
.env 파일에 다음 변수들이 설정되어 있어야 합니다.
LANGCHAIN_TRACING_V2="true"
LANGCHAIN_ENDPOINT="https://api.smith.langchain.com"
LANGCHAIN_API_KEY="YOUR_LANGSMITH_API_KEY"
LANGCHAIN_PROJECT="YOUR_PROJECT_NAME" (예: "Advanced RAG Monitoring")
"""
load_dotenv()
required_vars = [
"LANGCHAIN_TRACING_V2",
"LANGCHAIN_ENDPOINT",
"LANGCHAIN_API_KEY",
"LANGCHAIN_PROJECT",
"OPENAI_API_KEY", # RAG 에이전트 실행에 필요
"TAVILY_API_KEY", # RAG 에이전트 실행에 필요
]
missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
raise ValueError(f"다음 환경 변수가 .env 파일에 설정되지 않았습니다: {', '.join(missing_vars)}")
if os.getenv("LANGCHAIN_TRACING_V2") != "true":
print("경고: LANGCHAIN_TRACING_V2가 'true'로 설정되지 않았습니다. LangSmith 추적이 활성화되지 않습니다.")
print("LangSmith 환경 변수 설정이 확인되었습니다.")
print(f"프로젝트 '{os.getenv('LANGCHAIN_PROJECT')}' 에 실행 결과가 기록됩니다.")
def main():
"""
LangSmith 환경 설정을 확인하고, 고급 RAG 에이전트를 실행하여
그 과정을 LangSmith으로 추적하는 실습을 진행합니다.
"""
print("--- LangSmith LLM 앱 추적 및 디버깅 실습 ---")
try:
setup_langsmith_environment()
except ValueError as e:
print(f"오류: {e}")
print("\n실습을 진행하기 전에 .env 파일에 필요한 모든 키를 설정해주세요.")
print("LangSmith 가입 및 API 키 발급은 https://smith.langchain.com/ 를 참고하세요.")
return
print("\n이제 'autonomous_agent.py'의 자율 에이전트를 실행합니다.")
print("실행이 완료된 후, LangSmith 프로젝트 페이지에서 전체 실행 과정을 추적할 수 있습니다.")
print("="*50)
try:
# autonomous_agent.py의 main 함수를 실행
run_advanced_rag_agent()
project_name = os.getenv('LANGCHAIN_PROJECT')
print("\n" + "="*50)
print("에이전트 실행이 완료되었습니다.")
print("LangSmith 대시보드에서 결과를 확인하세요.")
print(f"프로젝트 URL (예상): https://smith.langchain.com/o/YOUR_ORGANIZATION/projects/p/{project_name}")
except Exception as e:
print(f"\n에이전트 실행 중 오류가 발생했습니다: {e}")
print("API 키가 유효한지, 관련 라이브러리가 모두 설치되었는지 확인해주세요.")
if __name__ == "__main__":
main()
\ No newline at end of file
# 고급 RAG 기반 Q&A 챗봇 실습 코드용 라이브러리
langchain~=0.1.16
langchain-community~=0.0.32
langchain-openai~=0.1.3
faiss-cpu~=1.8.0
sentence-transformers~=2.6.1
unstructured~=0.13.2
python-dotenv~=1.0.1
openai~=1.16.2
# 자율 AI 에이전트 실습 코드용 라이브러리
langgraph~=0.0.30
tavily-python~=0.3.0
pygraphviz~=1.12
pydot~=1.4.2
\ No newline at end of file
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import accuracy_score
from fairlearn.datasets import fetch_adult
from fairlearn.metrics import MetricFrame, demographic_parity_difference, equalized_odds_difference
from fairlearn.postprocessing import ThresholdOptimizer
def run():
"""
Fairlearn을 사용하여 성인 소득 데이터셋의 성별 편향성을 탐지하고 완화하는 예제입니다.
1. 데이터 로드 및 전처리
2. 기준 모델 학습 및 성능/편향성 평가
3. Fairlearn의 ThresholdOptimizer를 사용한 편향성 완화
4. 완화 전후의 성능 및 편향성 지표 비교
"""
# 1. 데이터 로드 및 전처리
data = fetch_adult(as_frame=True)
X_raw = data.data
y = (data.target == '>50K').astype(int)
# 민감한 특성(성별) 분리
sensitive_features = X_raw['sex']
# 범주형 변수 인코딩 및 데이터 스케일링
X = X_raw.drop(columns=['sex'])
X = pd.get_dummies(X, drop_first=True)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
X_scaled = pd.DataFrame(X_scaled, columns=X.columns)
X_train, X_test, y_train, y_test, sf_train, sf_test = train_test_split(
X_scaled, y, sensitive_features, test_size=0.2, random_state=42, stratify=y
)
# 2. 기준 모델 학습 및 평가
baseline_model = LogisticRegression(solver='liblinear', random_state=42)
baseline_model.fit(X_train, y_train)
y_pred_baseline = baseline_model.predict(X_test)
# MetricFrame을 사용한 평가
metrics = {
'accuracy': accuracy_score,
'demographic_parity_difference': demographic_parity_difference,
'equalized_odds_difference': equalized_odds_difference
}
grouped_on_sex = MetricFrame(metrics=metrics,
y_true=y_test,
y_pred=y_pred_baseline,
sensitive_features=sf_test)
print("--- 기준 모델 평가 결과 ---")
print(grouped_on_sex.overall)
print("\n--- 성별에 따른 그룹별 평가 결과 ---")
print(grouped_on_sex.by_group)
# 3. Fairlearn ThresholdOptimizer를 사용한 편향성 완화
# `constraint="demographic_parity"`는 그룹 간 예측 결과의 비율을 유사하게 만듭니다.
postprocess_model = ThresholdOptimizer(
estimator=baseline_model,
constraints="demographic_parity", # or "equalized_odds"
objective="accuracy_score",
prefit=True
)
# 최적의 임계값을 찾기 위해 postprocess_model을 학습시킵니다.
postprocess_model.fit(X_train, y_train, sensitive_features=sf_train)
y_pred_postprocess = postprocess_model.predict(X_test, sensitive_features=sf_test)
# 완화 후 모델 평가
grouped_on_sex_postprocess = MetricFrame(metrics=metrics,
y_true=y_test,
y_pred=y_pred_postprocess,
sensitive_features=sf_test)
print("\n\n--- 편향성 완화 후 모델 평가 결과 ---")
print(grouped_on_sex_postprocess.overall)
print("\n--- 성별에 따른 그룹별 평가 결과 (완화 후) ---")
print(grouped_on_sex_postprocess.by_group)
# 4. 결과 비교 및 시각화
summary = pd.DataFrame({
"Baseline": grouped_on_sex.overall,
"ThresholdOptimizer": grouped_on_sex_postprocess.overall
})
print("\n\n--- 최종 비교 ---")
print(summary)
# 시각화
fig, axes = plt.subplots(1, 2, figsize=(12, 5), sharey=True)
summary.loc[['accuracy']].T.plot(kind='bar', ax=axes[0], title='Accuracy Comparison', legend=False)
axes[0].set_ylabel('Score')
axes[0].set_xticklabels(axes[0].get_xticklabels(), rotation=0)
summary.loc[['demographic_parity_difference', 'equalized_odds_difference']].T.plot(
kind='bar', ax=axes[1], title='Fairness Metrics Comparison'
)
axes[1].set_xticklabels(axes[1].get_xticklabels(), rotation=0)
plt.suptitle("Model Performance and Fairness Comparison")
plt.tight_layout(rect=[0, 0, 1, 0.96])
# 결과 이미지 파일로 저장
plt.savefig("ai lecture/source_code/part_14_ai_ethics/fairness_comparison.png")
print("\n결과 비교 그래프를 'fairness_comparison.png' 파일로 저장했습니다.")
plt.show()
if __name__ == '__main__':
run()
scikit-learn
pandas
numpy
matplotlib
fairlearn
shap
lime
\ No newline at end of file
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
import shap
import lime
import lime.lime_tabular
import matplotlib.pyplot as plt
def prepare_data():
"""가상의 대출 심사 데이터를 생성하고 준비합니다."""
print("--- 1. 데이터 준비 ---")
data = {
'나이': np.random.randint(20, 70, size=1000),
'소득': np.random.randint(2000, 15000, size=1000),
'부채': np.random.randint(0, 8000, size=1000),
'신용등급': np.random.choice(['A', 'B', 'C', 'D'], size=1000, p=[0.3, 0.4, 0.2, 0.1]),
'대출기록': np.random.choice(['있음', '없음'], size=1000, p=[0.6, 0.4])
}
df = pd.DataFrame(data)
# Target 변수 생성 (규칙 기반)
df['대출승인'] = (
(df['소득'] > 5000) &
(df['부채'] < df['소득'] * 0.4) &
(df['신용등급'].isin(['A', 'B']))
).astype(int)
# 범주형 변수 인코딩
for col in ['신용등급', '대출기록']:
le = LabelEncoder()
df[col] = le.fit_transform(df[col])
X = df.drop('대출승인', axis=1)
y = df['대출승인']
feature_names = list(X.columns)
class_names = ['거절', '승인']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
print("데이터 준비 완료.")
return X_train, X_test, y_train, y_test, feature_names, class_names
def train_model(X_train, y_train):
"""분류 모델을 학습시킵니다."""
print("\n--- 2. 모델 학습 ---")
model = RandomForestClassifier(random_state=42)
model.fit(X_train.values, y_train.values)
print("RandomForestClassifier 모델 학습 완료.")
return model
def analyze_with_lime(model, X_train, X_test, instance_idx, feature_names, class_names):
"""LIME을 사용하여 특정 예측 건을 분석하고 결과를 저장합니다."""
print(f"\n--- 3. LIME 분석 (설명 대상: Test 데이터 {instance_idx}번) ---")
explainer = lime.lime_tabular.LimeTabularExplainer(
training_data=X_train.values,
feature_names=feature_names,
class_names=class_names,
mode='classification'
)
instance = X_test.iloc[instance_idx].values
explanation = explainer.explain_instance(instance, model.predict_proba, num_features=5)
# LIME 결과 시각화 및 저장
fig = explanation.as_pyplot_figure()
lime_fig_path = "lime_report.png"
fig.savefig(lime_fig_path, bbox_inches='tight')
print(f"LIME 분석 결과를 '{lime_fig_path}' 파일로 저장했습니다.")
plt.close()
return explanation
def analyze_with_shap(model, X_train, X_test, instance_idx):
"""SHAP을 사용하여 로컬 및 글로벌 설명을 생성하고 결과를 저장합니다."""
print("\n--- 4. SHAP 분석 ---")
explainer = shap.TreeExplainer(model, X_train)
shap_values = explainer.shap_values(X_test)
# 로컬 설명 (Force Plot)
print(f"SHAP 로컬 분석 (설명 대상: Test 데이터 {instance_idx}번)")
instance = X_test.iloc[instance_idx]
force_plot = shap.force_plot(
explainer.expected_value[1],
shap_values[1][instance_idx, :],
instance,
matplotlib=True,
show=False
)
shap_force_plot_path = "shap_force_plot.png"
plt.savefig(shap_force_plot_path, bbox_inches='tight')
print(f"SHAP Force Plot을 '{shap_force_plot_path}' 파일로 저장했습니다.")
plt.close()
# 글로벌 설명 (Summary Plot)
print("SHAP 글로벌 분석 (전체 테스트 데이터)")
shap.summary_plot(shap_values[1], X_test, show=False)
shap_summary_plot_path = "shap_summary_plot.png"
plt.savefig(shap_summary_plot_path, bbox_inches='tight')
print(f"SHAP Summary Plot을 '{shap_summary_plot_path}' 파일로 저장했습니다.")
plt.close()
def generate_report(model, X_test, instance_idx, lime_exp):
"""분석 결과를 종합하여 텍스트 보고서를 생성합니다."""
print("\n--- 5. 최종 보고서 생성 ---")
instance = X_test.iloc[instance_idx]
prediction = model.predict(instance.values.reshape(1, -1))[0]
pred_proba = model.predict_proba(instance.values.reshape(1, -1))[0]
report = f"""
=================================================
XAI 기반 대출 거절 사유 분석 보고서
=================================================
고객 정보:
{instance.to_string()}
-------------------------------------------------
모델 예측 결과:
- 예측: {'승인' if prediction == 1 else '거절'}
- 거절 확률: {pred_proba[0]:.2%}
- 승인 확률: {pred_proba[1]:.2%}
-------------------------------------------------
LIME 기반 주요 거절 요인 분석:
(이 피처들이 '거절' 예측에 기여했습니다)
"""
# LIME 결과에서 거절에 영향을 미친 요인 추출
for feature, weight in lime_exp.as_list(label=0):
report += f"- {feature}\n"
report += """
-------------------------------------------------
결론:
LIME 분석에 따르면, 위 목록의 피처들이 이번 대출 신청의 '거절' 결정에 주요하게 작용한 것으로 보입니다.
SHAP 분석 결과(첨부 이미지 참조) 또한 이러한 경향을 뒷받침합니다.
첨부 파일:
1. lime_report.png (LIME 시각화)
2. shap_force_plot.png (SHAP 개별 예측 분석)
3. shap_summary_plot.png (SHAP 모델 전체 분석)
=================================================
"""
report_path = "xai_analysis_report.txt"
with open(report_path, "w", encoding="utf-8") as f:
f.write(report)
print(f"최종 분석 보고서를 '{report_path}' 파일로 저장했습니다.")
def main():
"""XAI 분석 및 보고서 생성 파이프라인을 실행합니다."""
X_train, X_test, y_train, y_test, feature_names, class_names = prepare_data()
model = train_model(X_train, y_train)
# 분석할 인스턴스 선택 (예: 첫 번째 거절 건)
refused_indices = y_test[y_test == 0].index
if len(refused_indices) > 0:
instance_to_explain_idx = refused_indices[0]
instance_to_explain_loc = X_test.index.get_loc(instance_to_explain_idx)
else:
print("테스트 데이터에 거절 건이 없어 임의의 데이터를 사용합니다.")
instance_to_explain_loc = 0
lime_explanation = analyze_with_lime(model, X_train, X_test, instance_to_explain_loc, feature_names, class_names)
analyze_with_shap(model, X_train, X_test, instance_to_explain_loc)
generate_report(model, X_test, instance_to_explain_loc, lime_explanation)
print("\n모든 실습 과정이 완료되었습니다.")
if __name__ == "__main__":
main()
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment