import torch from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline import os import time import numpy as np def get_model_size(model): """모델의 메모리 크기를 MB 단위로 계산합니다.""" param_size = 0 for param in model.parameters(): param_size += param.nelement() * param.element_size() buffer_size = 0 for buffer in model.buffers(): buffer_size += buffer.nelement() * buffer.element_size() size_all_mb = (param_size + buffer_size) / 1024**2 return size_all_mb def benchmark_performance(pipe, text, num_runs=100): """ 파이프라인의 성능(지연 시간 및 처리량)을 벤치마킹합니다. GPU 예열(warm-up) 단계를 포함하여 더 정확한 측정치를 얻습니다. :param pipe: 벤치마킹할 Hugging Face 파이프라인 :param text: 추론에 사용할 입력 텍스트 :param num_runs: 벤치마킹을 위해 실행할 횟수 :return: (평균 지연 시간(ms), 처리량(inferences/sec)) """ latencies = [] # GPU 예열(warm-up)을 위해 몇 번의 추론을 미리 실행합니다. print("성능 측정을 위한 예열 중...") for _ in range(10): _ = pipe(text) # 본격적인 성능 측정을 시작합니다. print(f"{num_runs}회 추론으로 성능을 측정합니다...") for _ in range(num_runs): start_time = time.time() _ = pipe(text) end_time = time.time() latencies.append((end_time - start_time) * 1000) # ms 단위로 저장 # 평균 지연 시간과 처리량을 계산합니다. avg_latency_ms = np.mean(latencies) throughput = 1000 / avg_latency_ms if avg_latency_ms > 0 else 0 return avg_latency_ms, throughput def main(): """ Hugging Face의 사전 학습된 BERT 모델을 로드하고, 8-bit 양자화를 적용한 후 원본 모델과 양자화된 모델의 크기, 성능(지연 시간, 처리량)을 비교하고 추론 결과를 확인하는 스크립트입니다. """ # 사용할 모델과 디바이스 설정 model_id = "bert-base-uncased" device = "cuda" if torch.cuda.is_available() else "cpu" print(f"사용할 모델: {model_id}") print(f"사용할 디바이스: {device}") if device == "cpu": print( "\n경고: CUDA를 사용할 수 없어 CPU에서 실행합니다. bitsandbytes 양자화는 CUDA GPU에서만 효과가 있습니다." ) print("이 스크립트는 CUDA 환경에서 실행하는 것을 권장합니다.") # --- 1. 원본 FP32 모델 로드 및 크기 확인 --- print("\n--- 1. 원본 FP32 모델 로드 중... ---") tokenizer = AutoTokenizer.from_pretrained(model_id) fp32_model = AutoModelForSequenceClassification.from_pretrained(model_id) # 모델을 디바이스로 이동 fp32_model.to(device) fp32_model_size = get_model_size(fp32_model) print(f"FP32 모델 크기: {fp32_model_size:.2f} MB") # --- 2. 8-bit 양자화 모델 로드 및 크기 확인 --- # `load_in_8bit=True` 옵션을 사용하여 모델을 8-bit로 양자화합니다. # `device_map="auto"`는 모델 레이어를 사용 가능한 디바이스(GPU, CPU, RAM)에 자동으로 분배합니다. print("\n--- 2. 8-bit 양자화 모델 로드 중... ---") int8_model = None int8_model_size = 0 try: int8_model = AutoModelForSequenceClassification.from_pretrained( model_id, load_in_8bit=True, device_map="auto" ) # 양자화된 모델의 파라미터는 `int8`이지만, 계산 중에는 다른 자료형을 사용할 수 있어 # 정확한 크기 계산을 위해 `get_memory_footprint`를 사용하는 것이 더 적합합니다. int8_model_size = int8_model.get_memory_footprint() / 1024**2 print(f"INT8 모델 크기: {int8_model_size:.2f} MB") except Exception as e: print(f"8-bit 모델 로드 중 오류 발생: {e}") print("bitsandbytes가 올바르게 설치되었는지, CUDA 환경이 맞는지 확인하세요.") # --- 3. 추론 파이프라인 생성 --- print("\n--- 3. 추론 파이프라인 생성 ---") text = "This is a great movie, I really enjoyed it!" print(f"입력 텍스트: '{text}'") classifier_fp32 = pipeline( "text-classification", model=fp32_model, tokenizer=tokenizer, device=0 if device == "cuda" else -1, ) print("FP32 파이프라인 생성 완료.") classifier_int8 = None if int8_model: # 양자화된 모델은 device_map을 통해 이미 GPU에 할당되어 있으므로 device 설정이 필요 없습니다. classifier_int8 = pipeline( "text-classification", model=int8_model, tokenizer=tokenizer ) print("INT8 파이프라인 생성 완료.") # --- 4. 성능 벤치마킹 --- print("\n--- 4. 성능 벤치마킹 ---") fp32_latency, fp32_throughput = benchmark_performance(classifier_fp32, text) int8_latency, int8_throughput = (0, 0) if classifier_int8: int8_latency, int8_throughput = benchmark_performance(classifier_int8, text) # --- 5. 결과 요약 및 비교 --- print("\n\n--- 5. 최종 성능 비교 ---") print("=" * 50) header = f"{'모델':<10} | {'모델 크기(MB)':<15} | {'평균 지연시간(ms)':<20} | {'처리량(inf/sec)':<15}" print(header) print("-" * len(header) * 2) fp32_results = f"{'FP32':<10} | {fp32_model_size:<15.2f} | {fp32_latency:<20.2f} | {fp32_throughput:<15.2f}" print(fp32_results) if int8_model: int8_results = f"{'INT8':<10} | {int8_model_size:<15.2f} | {int8_latency:<20.2f} | {int8_throughput:<15.2f}" print(int8_results) print("-" * len(header) * 2) size_reduction = 100 * (1 - int8_model_size / fp32_model_size) latency_improvement = ( 100 * (fp32_latency - int8_latency) / fp32_latency if fp32_latency > 0 else 0 ) throughput_improvement = ( 100 * (int8_throughput - fp32_throughput) / fp32_throughput if fp32_throughput > 0 else 0 ) print(f"📊 모델 크기 감소율: {size_reduction:.2f}%") print(f"⏱️ 지연 시간 개선율: {latency_improvement:.2f}%") print(f"🚀 처리량 향상률: {throughput_improvement:.2f}%") print("=" * 50) # --- 6. 추론 결과 비교 --- print("\n--- 6. 추론 결과 비교 ---") print(f"입력 텍스트: '{text}'") # FP32 모델 추론 print("\nFP32 모델 추론 결과:") result_fp32 = classifier_fp32(text) print(result_fp32) # INT8 모델 추론 if classifier_int8: print("\nINT8 모델 추론 결과:") result_int8 = classifier_int8(text) print(result_int8) print("\n실습 완료!") print( "양자화를 통해 모델 크기가 크게 줄어들고, 지연 시간이 감소하며 처리량이 향상되는 것을 확인할 수 있습니다." ) if __name__ == "__main__": main()