Commit 80cdf0be authored by Administrator's avatar Administrator
Browse files

initial draft

parents
import logging
from typing import Dict, Any, List
import os
logger = logging.getLogger(__name__)
class ConfigValidator:
"""설정 파일 검증"""
@staticmethod
def validate_evaluation_config(config: Dict[str, Any]) -> List[str]:
"""평가 설정 검증"""
errors = []
# 임계값 검증
thresholds = config.get('EVALUATION_THRESHOLDS', {})
for metric, threshold in thresholds.items():
if not isinstance(threshold, (int, float)):
errors.append(f"Invalid threshold type for {metric}: {type(threshold)}")
elif threshold <= 0:
errors.append(f"Invalid threshold value for {metric}: {threshold}")
# 가중치 검증
weights = config.get('METRIC_WEIGHTS', {})
total_weight = sum(weights.values())
if abs(total_weight - 1.0) > 0.01:
errors.append(f"Metric weights sum to {total_weight}, should be 1.0")
# 파일 경로 검증
data_paths = config.get('DATA_PATHS', {})
for path_name, path_value in data_paths.items():
if not os.path.exists(path_value):
errors.append(f"Path does not exist for {path_name}: {path_value}")
return errors
@staticmethod
def validate_and_fix_config(config: Dict[str, Any]) -> Dict[str, Any]:
"""설정 검증 및 자동 수정"""
errors = ConfigValidator.validate_evaluation_config(config)
if errors:
logger.warning(f"Config validation errors found: {errors}")
# 자동 수정 시도
fixed_config = config.copy()
# 임계값 자동 조정
if 'EVALUATION_THRESHOLDS' in fixed_config:
for metric, threshold in fixed_config['EVALUATION_THRESHOLDS'].items():
if threshold <= 0:
# 기본값으로 설정
default_thresholds = {
'chamfer_distance': 0.1,
'emd': 0.15,
'class_accuracy': 0.8
}
fixed_config['EVALUATION_THRESHOLDS'][metric] = default_thresholds.get(metric, 0.1)
logger.info(f"Auto-fixed threshold for {metric}")
return fixed_config
import logging
import traceback
import numpy as np
from typing import Any, Dict, Optional, List, Callable
from functools import wraps
logger = logging.getLogger(__name__)
class EvaluationExceptionHandler:
"""평가 시스템 전용 예외 처리기"""
@staticmethod
def handle_metric_calculation_error(metric_name: str, default_value: float = 0.0):
"""메트릭 계산 오류 처리 데코레이터"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except ZeroDivisionError as e:
logger.error(f"Zero division error in {metric_name}: {str(e)}")
return default_value
except ValueError as e:
logger.error(f"Value error in {metric_name}: {str(e)}")
return default_value
except KeyError as e:
logger.error(f"Key error in {metric_name}: {str(e)}")
return default_value
except MemoryError as e:
logger.error(f"Memory error in {metric_name}: {str(e)}")
return default_value
except Exception as e:
logger.error(f"Unexpected error in {metric_name}: {str(e)}")
logger.debug(traceback.format_exc())
return default_value
return wrapper
return decorator
@staticmethod
def validate_metric_result(metric_name: str, result: Any, expected_range: tuple) -> bool:
"""메트릭 결과 검증"""
if not isinstance(result, (int, float)):
logger.warning(f"{metric_name}: Invalid result type {type(result)}")
return False
# 무한대 값 검증
if np.isinf(result):
logger.warning(f"{metric_name}: Result is infinite: {result}")
return False
# NaN 값 검증
if np.isnan(result):
logger.warning(f"{metric_name}: Result is NaN: {result}")
return False
if not (expected_range[0] <= result <= expected_range[1]):
logger.warning(f"{metric_name}: Result {result} out of expected range {expected_range}")
return False
return True
@staticmethod
def handle_vertices_key_error(mesh_data: Dict, fallback_vertices: List[List[float]] = None) -> Dict:
"""vertices 키 누락 오류 처리"""
if 'vertices' not in mesh_data:
logger.warning("vertices 키가 누락되었습니다. 대체 데이터 생성 중...")
if fallback_vertices is None:
# 기본 정육면체 vertices 생성
fallback_vertices = [
[0, 0, 0], [1, 0, 0], [1, 1, 0], [0, 1, 0], # 하단
[0, 0, 1], [1, 0, 1], [1, 1, 1], [0, 1, 1] # 상단
]
mesh_data['vertices'] = fallback_vertices
# faces도 생성
if 'faces' not in mesh_data:
mesh_data['faces'] = [
[0, 1, 2], [0, 2, 3], # 하단
[4, 7, 6], [4, 6, 5], # 상단
[0, 4, 5], [0, 5, 1], # 앞면
[2, 6, 7], [2, 7, 3], # 뒷면
[0, 3, 7], [0, 7, 4], # 왼쪽
[1, 5, 6], [1, 6, 2] # 오른쪽
]
return mesh_data
@staticmethod
def handle_infinite_values(value: float, metric_name: str, max_value: float = 1000.0) -> float:
"""무한대 값 처리"""
if np.isinf(value):
if value > 0: # 양의 무한대
logger.warning(f"{metric_name}: 양의 무한대 값 감지, {max_value}로 대체")
return max_value
else: # 음의 무한대
logger.warning(f"{metric_name}: 음의 무한대 값 감지, 0으로 대체")
return 0.0
if np.isnan(value):
logger.warning(f"{metric_name}: NaN 값 감지, 0으로 대체")
return 0.0
return value
@staticmethod
def safe_divide(numerator: float, denominator: float, default_value: float = 0.0) -> float:
"""안전한 나눗셈"""
if denominator == 0 or np.isclose(denominator, 0):
logger.warning(f"0으로 나누기 시도: {numerator} / {denominator}")
return default_value
result = numerator / denominator
# 무한대 값 검사
if np.isinf(result):
logger.warning(f"나눗셈 결과가 무한대: {numerator} / {denominator}")
return default_value
return result
@staticmethod
def validate_mesh_data(mesh_data: Dict) -> bool:
"""메시 데이터 검증"""
required_keys = ['vertices', 'faces']
for key in required_keys:
if key not in mesh_data:
logger.error(f"메시 데이터에 필수 키 '{key}'가 누락되었습니다")
return False
if not mesh_data[key]:
logger.error(f"메시 데이터의 '{key}'가 비어있습니다")
return False
# vertices 검증
vertices = mesh_data['vertices']
if not isinstance(vertices, (list, np.ndarray)):
logger.error("vertices는 리스트 또는 numpy 배열이어야 합니다")
return False
if len(vertices) == 0:
logger.error("vertices가 비어있습니다")
return False
# faces 검증
faces = mesh_data['faces']
if not isinstance(faces, (list, np.ndarray)):
logger.error("faces는 리스트 또는 numpy 배열이어야 합니다")
return False
if len(faces) == 0:
logger.error("faces가 비어있습니다")
return False
return True
@staticmethod
def create_robust_error_handler(metric_name: str, default_value: float = 0.0) -> Callable:
"""견고한 오류 처리기 생성"""
def error_handler(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
result = func(*args, **kwargs)
# 결과 검증
if result is None:
logger.warning(f"{metric_name}: 결과가 None입니다")
return default_value
if isinstance(result, float):
if np.isinf(result):
logger.warning(f"{metric_name}: 결과가 무한대입니다")
return default_value
if np.isnan(result):
logger.warning(f"{metric_name}: 결과가 NaN입니다")
return default_value
return result
except KeyError as e:
logger.error(f"{metric_name}: 키 오류 - {str(e)}")
return default_value
except ValueError as e:
logger.error(f"{metric_name}: 값 오류 - {str(e)}")
return default_value
except MemoryError as e:
logger.error(f"{metric_name}: 메모리 오류 - {str(e)}")
return default_value
except Exception as e:
logger.error(f"{metric_name}: 예상치 못한 오류 - {str(e)}")
logger.debug(traceback.format_exc())
return default_value
return wrapper
return error_handler
"""
상세한 로깅 시스템
각 지표 계산 과정의 상세 로깅과 중간 결과 저장을 담당합니다.
"""
import logging
import json
import os
import time
import psutil
import traceback
from datetime import datetime
from typing import Dict, Any, Optional, List
from contextlib import contextmanager
class DetailedLogger:
"""상세한 로깅을 담당하는 클래스"""
def __init__(self, log_dir: str = "logs", enable_performance_monitoring: bool = True):
"""
상세 로거 초기화
Args:
log_dir (str): 로그 파일 저장 디렉토리
enable_performance_monitoring (bool): 성능 모니터링 활성화 여부
"""
self.log_dir = log_dir
self.enable_performance_monitoring = enable_performance_monitoring
# 로그 디렉토리 생성
os.makedirs(log_dir, exist_ok=True)
# 세션 ID 설정 (로거 설정 전에)
self.current_session_id = datetime.now().strftime("%Y%m%d_%H%M%S")
# 중간 결과 저장용
self.intermediate_results = []
# 로거 설정
self._setup_loggers()
# 성능 모니터링
if self.enable_performance_monitoring:
self.performance_monitor = PerformanceMonitor()
else:
self.performance_monitor = None
def _setup_loggers(self):
"""로거들을 설정합니다."""
# 메인 로거
self.main_logger = logging.getLogger('evaluation_main')
self.main_logger.setLevel(logging.INFO)
# 메트릭별 로거
self.metric_loggers = {}
for metric in ['2d_map', '3d_map', 'chamfer_distance', 'emd', 'class_accuracy']:
logger = logging.getLogger(f'evaluation_{metric}')
logger.setLevel(logging.DEBUG)
self.metric_loggers[metric] = logger
# 오류 로거
self.error_logger = logging.getLogger('evaluation_error')
self.error_logger.setLevel(logging.ERROR)
# 성능 로거
self.performance_logger = logging.getLogger('evaluation_performance')
self.performance_logger.setLevel(logging.INFO)
# 핸들러 설정
self._setup_handlers()
def _setup_handlers(self):
"""로그 핸들러들을 설정합니다."""
# 메인 로그 파일 핸들러
main_handler = logging.FileHandler(
os.path.join(self.log_dir, f'evaluation_main_{self.current_session_id}.log'),
encoding='utf-8'
)
main_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
self.main_logger.addHandler(main_handler)
# 메트릭별 로그 파일 핸들러
for metric, logger in self.metric_loggers.items():
handler = logging.FileHandler(
os.path.join(self.log_dir, f'evaluation_{metric}_{self.current_session_id}.log'),
encoding='utf-8'
)
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
))
logger.addHandler(handler)
# 오류 로그 파일 핸들러
error_handler = logging.FileHandler(
os.path.join(self.log_dir, f'evaluation_error_{self.current_session_id}.log'),
encoding='utf-8'
)
error_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s\n%(pathname)s:%(lineno)d\n%(funcName)s\n'
))
self.error_logger.addHandler(error_handler)
# 성능 로그 파일 핸들러
if self.enable_performance_monitoring:
perf_handler = logging.FileHandler(
os.path.join(self.log_dir, f'evaluation_performance_{self.current_session_id}.log'),
encoding='utf-8'
)
perf_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
))
self.performance_logger.addHandler(perf_handler)
# 콘솔 핸들러 (모든 로거에 추가)
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
for logger in [self.main_logger, self.error_logger, self.performance_logger]:
logger.addHandler(console_handler)
for logger in self.metric_loggers.values():
logger.addHandler(console_handler)
def log_metric_start(self, metric_name: str, **kwargs):
"""메트릭 계산 시작을 로깅합니다."""
logger = self.metric_loggers.get(metric_name, self.main_logger)
logger.info(f"=== {metric_name.upper()} 계산 시작 ===")
# 입력 파라미터 로깅
for key, value in kwargs.items():
if isinstance(value, (dict, list)):
logger.debug(f"입력 파라미터 {key}: {type(value).__name__} (크기: {len(value)})")
else:
logger.debug(f"입력 파라미터 {key}: {value}")
# 성능 모니터링 시작
if self.performance_monitor:
self.performance_monitor.start_metric(metric_name)
def log_metric_step(self, metric_name: str, step: str, details: Dict[str, Any] = None):
"""메트릭 계산 단계를 로깅합니다."""
logger = self.metric_loggers.get(metric_name, self.main_logger)
logger.info(f"[{metric_name}] {step}")
if details:
for key, value in details.items():
if isinstance(value, (dict, list)):
logger.debug(f" {key}: {type(value).__name__} (크기: {len(value)})")
else:
logger.debug(f" {key}: {value}")
def log_metric_result(self, metric_name: str, result: Any, execution_time: float = None):
"""메트릭 계산 결과를 로깅합니다."""
logger = self.metric_loggers.get(metric_name, self.main_logger)
logger.info(f"=== {metric_name.upper()} 계산 완료 ===")
logger.info(f"결과: {result}")
if execution_time:
logger.info(f"실행 시간: {execution_time:.2f}초")
# 성능 모니터링 종료
if self.performance_monitor:
self.performance_monitor.end_metric(metric_name, execution_time)
# 중간 결과 저장
self._save_intermediate_result(metric_name, result, execution_time)
def log_metric_error(self, metric_name: str, error: Exception, context: Dict[str, Any] = None):
"""메트릭 계산 오류를 로깅합니다."""
logger = self.metric_loggers.get(metric_name, self.error_logger)
logger.error(f"=== {metric_name.upper()} 계산 오류 ===")
logger.error(f"오류 타입: {type(error).__name__}")
logger.error(f"오류 메시지: {str(error)}")
logger.error(f"스택 트레이스:\n{traceback.format_exc()}")
if context:
logger.error("오류 발생 컨텍스트:")
for key, value in context.items():
logger.error(f" {key}: {value}")
# 성능 모니터링 종료 (오류 시)
if self.performance_monitor:
self.performance_monitor.end_metric(metric_name, None, error=True)
def log_system_info(self):
"""시스템 정보를 로깅합니다."""
self.main_logger.info("=== 시스템 정보 ===")
self.main_logger.info(f"CPU 개수: {psutil.cpu_count()}")
self.main_logger.info(f"메모리 총량: {psutil.virtual_memory().total / (1024**3):.2f} GB")
self.main_logger.info(f"사용 가능 메모리: {psutil.virtual_memory().available / (1024**3):.2f} GB")
self.main_logger.info(f"디스크 사용량: {psutil.disk_usage('/').percent:.1f}%")
def log_evaluation_start(self, model_path: str, reference_path: str):
"""평가 시작을 로깅합니다."""
self.main_logger.info("=" * 60)
self.main_logger.info("3D 객체인식 평가 시작")
self.main_logger.info("=" * 60)
self.main_logger.info(f"모델 파일: {model_path}")
self.main_logger.info(f"참조 파일: {reference_path}")
self.main_logger.info(f"세션 ID: {self.current_session_id}")
# 시스템 정보 로깅
self.log_system_info()
def log_evaluation_complete(self, results: Dict[str, Any]):
"""평가 완료를 로깅합니다."""
self.main_logger.info("=" * 60)
self.main_logger.info("3D 객체인식 평가 완료")
self.main_logger.info("=" * 60)
if 'comprehensive_score' in results:
self.main_logger.info(f"종합 점수: {results['comprehensive_score']:.2f}")
if 'grade' in results:
self.main_logger.info(f"성능 등급: {results['grade']}")
# 개별 지표 결과 로깅
if 'metrics' in results:
self.main_logger.info("개별 지표 결과:")
for metric_name, value in results['metrics'].items():
self.main_logger.info(f" {metric_name}: {value}")
# 성능 요약 로깅
if self.performance_monitor:
self.performance_monitor.log_summary(self.performance_logger)
def _save_intermediate_result(self, metric_name: str, result: Any, execution_time: float = None):
"""중간 결과를 저장합니다."""
intermediate_result = {
'timestamp': datetime.now().isoformat(),
'metric_name': metric_name,
'result': result,
'execution_time': execution_time,
'session_id': self.current_session_id
}
self.intermediate_results.append(intermediate_result)
# JSONL 파일에 저장
intermediate_file = os.path.join(
self.log_dir,
f"intermediate_results_{self.current_session_id}.jsonl"
)
with open(intermediate_file, 'a', encoding='utf-8') as f:
f.write(json.dumps(intermediate_result, ensure_ascii=False) + '\n')
@contextmanager
def metric_context(self, metric_name: str, **kwargs):
"""메트릭 계산을 위한 컨텍스트 매니저"""
start_time = time.time()
try:
self.log_metric_start(metric_name, **kwargs)
yield
execution_time = time.time() - start_time
self.log_metric_result(metric_name, "계산 완료", execution_time)
except Exception as e:
execution_time = time.time() - start_time
self.log_metric_error(metric_name, e, {'execution_time': execution_time})
raise
class PerformanceMonitor:
"""성능 모니터링을 담당하는 클래스"""
def __init__(self):
"""성능 모니터 초기화"""
self.metric_times = {}
self.metric_memory = {}
self.metric_cpu = {}
self.current_metric = None
self.start_time = None
self.start_memory = None
self.start_cpu = None
def start_metric(self, metric_name: str):
"""메트릭 계산 시작 시점을 기록합니다."""
self.current_metric = metric_name
self.start_time = time.time()
self.start_memory = psutil.virtual_memory().used
self.start_cpu = psutil.cpu_percent()
def end_metric(self, metric_name: str, execution_time: float = None, error: bool = False):
"""메트릭 계산 종료 시점을 기록합니다."""
if self.current_metric != metric_name:
return
if execution_time is None:
execution_time = time.time() - self.start_time
end_memory = psutil.virtual_memory().used
end_cpu = psutil.cpu_percent()
# 메트릭별 성능 데이터 저장
self.metric_times[metric_name] = execution_time
self.metric_memory[metric_name] = {
'start': self.start_memory,
'end': end_memory,
'peak': end_memory - self.start_memory
}
self.metric_cpu[metric_name] = {
'start': self.start_cpu,
'end': end_cpu,
'average': (self.start_cpu + end_cpu) / 2
}
# 상태 초기화
self.current_metric = None
self.start_time = None
self.start_memory = None
self.start_cpu = None
def log_summary(self, logger: logging.Logger):
"""성능 요약을 로깅합니다."""
logger.info("=== 성능 요약 ===")
total_time = sum(self.metric_times.values())
logger.info(f"총 실행 시간: {total_time:.2f}초")
logger.info("메트릭별 실행 시간:")
for metric, time_taken in self.metric_times.items():
percentage = (time_taken / total_time) * 100 if total_time > 0 else 0
logger.info(f" {metric}: {time_taken:.2f}초 ({percentage:.1f}%)")
logger.info("메트릭별 메모리 사용량:")
for metric, memory_data in self.metric_memory.items():
peak_mb = memory_data['peak'] / (1024**2)
logger.info(f" {metric}: {peak_mb:.2f} MB")
logger.info("메트릭별 CPU 사용률:")
for metric, cpu_data in self.metric_cpu.items():
logger.info(f" {metric}: 평균 {cpu_data['average']:.1f}%")
# 전역 로거 인스턴스
_global_logger = None
def get_logger(log_dir: str = "logs", enable_performance_monitoring: bool = True) -> DetailedLogger:
"""전역 로거 인스턴스를 반환합니다."""
global _global_logger
if _global_logger is None:
_global_logger = DetailedLogger(log_dir, enable_performance_monitoring)
return _global_logger
def reset_logger():
"""전역 로거를 리셋합니다."""
global _global_logger
_global_logger = None
import logging
from typing import Dict, Any
logger = logging.getLogger(__name__)
class NormalizationValidator:
"""정규화 결과 검증 클래스"""
@staticmethod
def validate_normalization_results(results: Dict[str, Any]) -> bool:
"""
정규화 결과의 합리성 검증
Args:
results: 메트릭별 정규화 결과
Returns:
검증 통과 여부
"""
warnings = []
for metric, data in results.items():
raw_value = data.get('raw_value', 0)
normalized = data.get('normalized', 0)
# 비정상적인 정규화 결과 검증
if normalized > 90 and raw_value > 0.1:
warnings.append(f"{metric}: 높은 원본 값({raw_value})에 높은 정규화 점수({normalized})")
if normalized < 10 and raw_value < 0.01:
warnings.append(f"{metric}: 낮은 원본 값({raw_value})에 낮은 정규화 점수({normalized})")
if warnings:
for warning in warnings:
logger.warning(f"Normalization validation warning: {warning}")
return False
return True
"""
성능 모니터링 유틸리티
메모리 사용량, CPU 사용률, 실행 시간 등을 모니터링합니다.
"""
import time
import psutil
import threading
import logging
from typing import Dict, List, Optional, Callable
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime
@dataclass
class PerformanceMetrics:
"""성능 메트릭 데이터 클래스"""
timestamp: datetime
cpu_percent: float
memory_used_mb: float
memory_percent: float
disk_io_read_mb: float
disk_io_write_mb: float
network_io_sent_mb: float
network_io_recv_mb: float
class PerformanceMonitor:
"""성능 모니터링을 담당하는 클래스"""
def __init__(self, monitoring_interval: float = 1.0, log_threshold_mb: float = 100.0):
"""
성능 모니터 초기화
Args:
monitoring_interval (float): 모니터링 간격 (초)
log_threshold_mb (float): 로그 임계값 (MB)
"""
self.monitoring_interval = monitoring_interval
self.log_threshold_mb = log_threshold_mb
self.is_monitoring = False
self.monitoring_thread = None
self.metrics_history: List[PerformanceMetrics] = []
self.callbacks: List[Callable[[PerformanceMetrics], None]] = []
# 초기 시스템 상태
self.initial_disk_io = psutil.disk_io_counters()
self.initial_network_io = psutil.net_io_counters()
# 로거 설정
self.logger = logging.getLogger('performance_monitor')
def start_monitoring(self):
"""성능 모니터링을 시작합니다."""
if self.is_monitoring:
return
self.is_monitoring = True
self.monitoring_thread = threading.Thread(target=self._monitoring_loop, daemon=True)
self.monitoring_thread.start()
self.logger.info("성능 모니터링이 시작되었습니다.")
def stop_monitoring(self):
"""성능 모니터링을 중지합니다."""
if not self.is_monitoring:
return
self.is_monitoring = False
if self.monitoring_thread:
self.monitoring_thread.join(timeout=2.0)
self.logger.info("성능 모니터링이 중지되었습니다.")
def _monitoring_loop(self):
"""모니터링 루프"""
while self.is_monitoring:
try:
metrics = self._collect_metrics()
self.metrics_history.append(metrics)
# 콜백 실행
for callback in self.callbacks:
try:
callback(metrics)
except Exception as e:
self.logger.error(f"콜백 실행 중 오류: {e}")
# 메모리 사용량이 임계값을 초과하면 경고
if metrics.memory_used_mb > self.log_threshold_mb:
self.logger.warning(
f"높은 메모리 사용량 감지: {metrics.memory_used_mb:.2f} MB "
f"({metrics.memory_percent:.1f}%)"
)
time.sleep(self.monitoring_interval)
except Exception as e:
self.logger.error(f"모니터링 중 오류: {e}")
time.sleep(self.monitoring_interval)
def _collect_metrics(self) -> PerformanceMetrics:
"""현재 시스템 메트릭을 수집합니다."""
# CPU 사용률
cpu_percent = psutil.cpu_percent()
# 메모리 사용량
memory = psutil.virtual_memory()
memory_used_mb = memory.used / (1024**2)
memory_percent = memory.percent
# 디스크 I/O
disk_io = psutil.disk_io_counters()
disk_io_read_mb = (disk_io.read_bytes - self.initial_disk_io.read_bytes) / (1024**2)
disk_io_write_mb = (disk_io.write_bytes - self.initial_disk_io.write_bytes) / (1024**2)
# 네트워크 I/O
network_io = psutil.net_io_counters()
network_io_sent_mb = (network_io.bytes_sent - self.initial_network_io.bytes_sent) / (1024**2)
network_io_recv_mb = (network_io.bytes_recv - self.initial_network_io.bytes_recv) / (1024**2)
return PerformanceMetrics(
timestamp=datetime.now(),
cpu_percent=cpu_percent,
memory_used_mb=memory_used_mb,
memory_percent=memory_percent,
disk_io_read_mb=disk_io_read_mb,
disk_io_write_mb=disk_io_write_mb,
network_io_sent_mb=network_io_sent_mb,
network_io_recv_mb=network_io_recv_mb
)
def add_callback(self, callback: Callable[[PerformanceMetrics], None]):
"""성능 메트릭 콜백을 추가합니다."""
self.callbacks.append(callback)
def remove_callback(self, callback: Callable[[PerformanceMetrics], None]):
"""성능 메트릭 콜백을 제거합니다."""
if callback in self.callbacks:
self.callbacks.remove(callback)
def get_current_metrics(self) -> Optional[PerformanceMetrics]:
"""현재 메트릭을 반환합니다."""
if not self.metrics_history:
return None
return self.metrics_history[-1]
def get_metrics_summary(self) -> Dict:
"""메트릭 요약을 반환합니다."""
if not self.metrics_history:
return {}
cpu_values = [m.cpu_percent for m in self.metrics_history]
memory_values = [m.memory_used_mb for m in self.metrics_history]
return {
'total_samples': len(self.metrics_history),
'monitoring_duration': (
self.metrics_history[-1].timestamp - self.metrics_history[0].timestamp
).total_seconds(),
'cpu': {
'min': min(cpu_values),
'max': max(cpu_values),
'avg': sum(cpu_values) / len(cpu_values)
},
'memory': {
'min_mb': min(memory_values),
'max_mb': max(memory_values),
'avg_mb': sum(memory_values) / len(memory_values)
},
'peak_memory_mb': max(memory_values),
'total_disk_read_mb': self.metrics_history[-1].disk_io_read_mb,
'total_disk_write_mb': self.metrics_history[-1].disk_io_write_mb,
'total_network_sent_mb': self.metrics_history[-1].network_io_sent_mb,
'total_network_recv_mb': self.metrics_history[-1].network_io_recv_mb
}
def clear_history(self):
"""메트릭 히스토리를 초기화합니다."""
self.metrics_history.clear()
self.logger.info("성능 메트릭 히스토리가 초기화되었습니다.")
class MemoryProfiler:
"""메모리 프로파일링을 담당하는 클래스"""
def __init__(self):
"""메모리 프로파일러 초기화"""
self.snapshots: List[Dict] = []
self.logger = logging.getLogger('memory_profiler')
def take_snapshot(self, label: str = None):
"""메모리 스냅샷을 생성합니다."""
process = psutil.Process()
memory_info = process.memory_info()
memory_percent = process.memory_percent()
snapshot = {
'timestamp': datetime.now(),
'label': label,
'rss_mb': memory_info.rss / (1024**2), # 실제 메모리 사용량
'vms_mb': memory_info.vms / (1024**2), # 가상 메모리 사용량
'percent': memory_percent,
'num_threads': process.num_threads(),
'num_fds': process.num_fds() if hasattr(process, 'num_fds') else 0
}
self.snapshots.append(snapshot)
if label:
self.logger.info(f"메모리 스냅샷 [{label}]: {snapshot['rss_mb']:.2f} MB")
return snapshot
def get_memory_diff(self, start_label: str, end_label: str) -> Dict:
"""두 스냅샷 간의 메모리 차이를 계산합니다."""
start_snapshot = None
end_snapshot = None
for snapshot in self.snapshots:
if snapshot['label'] == start_label:
start_snapshot = snapshot
elif snapshot['label'] == end_label:
end_snapshot = snapshot
if not start_snapshot or not end_snapshot:
return {}
return {
'start_label': start_label,
'end_label': end_label,
'rss_diff_mb': end_snapshot['rss_mb'] - start_snapshot['rss_mb'],
'vms_diff_mb': end_snapshot['vms_mb'] - start_snapshot['vms_mb'],
'percent_diff': end_snapshot['percent'] - start_snapshot['percent'],
'time_diff_seconds': (
end_snapshot['timestamp'] - start_snapshot['timestamp']
).total_seconds()
}
def clear_snapshots(self):
"""스냅샷을 초기화합니다."""
self.snapshots.clear()
self.logger.info("메모리 스냅샷이 초기화되었습니다.")
@contextmanager
def monitor_performance(monitor: PerformanceMonitor, label: str = None):
"""성능 모니터링 컨텍스트 매니저"""
if label:
monitor.take_snapshot(f"{label}_start")
monitor.start_monitoring()
try:
yield monitor
finally:
monitor.stop_monitoring()
if label:
monitor.take_snapshot(f"{label}_end")
@contextmanager
def memory_profiling(profiler: MemoryProfiler, label: str = None):
"""메모리 프로파일링 컨텍스트 매니저"""
start_snapshot = profiler.take_snapshot(f"{label}_start" if label else "start")
try:
yield profiler
finally:
end_snapshot = profiler.take_snapshot(f"{label}_end" if label else "end")
if label:
diff = profiler.get_memory_diff(f"{label}_start", f"{label}_end")
if diff:
profiler.logger.info(
f"메모리 프로파일링 [{label}]: "
f"RSS 차이: {diff['rss_diff_mb']:.2f} MB, "
f"실행 시간: {diff['time_diff_seconds']:.2f}초"
)
# 전역 인스턴스
_global_performance_monitor = None
_global_memory_profiler = None
def get_performance_monitor() -> PerformanceMonitor:
"""전역 성능 모니터 인스턴스를 반환합니다."""
global _global_performance_monitor
if _global_performance_monitor is None:
_global_performance_monitor = PerformanceMonitor()
return _global_performance_monitor
def get_memory_profiler() -> MemoryProfiler:
"""전역 메모리 프로파일러 인스턴스를 반환합니다."""
global _global_memory_profiler
if _global_memory_profiler is None:
_global_memory_profiler = MemoryProfiler()
return _global_memory_profiler
import cv2
import numpy as np
from typing import Dict, List, Tuple
import logging
logger = logging.getLogger(__name__)
class ReferenceDataExtractor:
"""참조 이미지에서 Ground Truth 데이터 추출"""
def __init__(self):
self.detector = self._initialize_detector()
def _initialize_detector(self):
"""객체 감지기 초기화"""
# YOLO 또는 다른 객체 감지기 사용
try:
import ultralytics
# 더 강력한 YOLO 모델 사용 (yolov8s)
return ultralytics.YOLO('yolov8s.pt')
except ImportError:
logger.warning("YOLO not available, using OpenCV detector")
return None
def extract_reference_data(self, image_path: str) -> Dict:
"""
참조 이미지에서 객체 정보 추출
Args:
image_path: 참조 이미지 경로
Returns:
추출된 객체 정보
"""
image = cv2.imread(image_path)
if image is None:
raise ValueError(f"Cannot load image: {image_path}")
# 객체 감지 수행
if self.detector:
logger.info(f"YOLO 객체 감지 시작 - 이미지 크기: {image.shape}")
results = self.detector(image)
objects = self._parse_yolo_results(results)
# 상세한 디버깅 로그 추가
logger.info(f"YOLO 원본 감지 결과: {len(results)}개")
logger.info(f"confidence 필터링 후: {len(objects)}개 객체")
# 감지된 객체들의 상세 정보 로그
for i, obj in enumerate(objects):
logger.info(f" 객체 {i+1}: bbox={obj['bbox']}, confidence={obj['confidence']:.3f}, class={obj['class']}")
if len(objects) == 0:
logger.warning("YOLO에서 객체를 감지하지 못했습니다. OpenCV fallback 사용")
objects = self._extract_with_opencv(image)
logger.info(f"OpenCV fallback 결과: {len(objects)}개 객체")
else:
logger.info("YOLO 감지기 없음, OpenCV fallback 직접 사용")
objects = self._extract_with_opencv(image)
logger.info(f"OpenCV fallback 결과: {len(objects)}개 객체")
return {
'objects': objects,
'bounding_boxes': [obj['bbox'] for obj in objects],
'class_labels': [obj['class'] for obj in objects],
'confidence_scores': [obj['confidence'] for obj in objects]
}
def _parse_yolo_results(self, results) -> List[Dict]:
"""YOLO 결과 파싱 (개선된 버전)"""
objects = []
confidence_threshold = 0.001 # 극도로 낮은 threshold로 설정 (0.01 → 0.001)
logger.info(f"YOLO 결과 파싱 시작 - confidence threshold: {confidence_threshold}")
total_detections = 0
filtered_by_confidence = 0
filtered_by_size = 0
for result in results:
boxes = result.boxes
if boxes is not None:
total_detections += len(boxes)
for box in boxes:
confidence = box.conf[0].cpu().numpy()
# confidence 필터링 추가
if confidence >= confidence_threshold:
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
class_id = int(box.cls[0].cpu().numpy())
# 바운딩 박스 크기 검증 (매우 관대한 필터)
width = x2 - x1
height = y2 - y1
if width > 1 and height > 1: # 최소 크기 필터 매우 완화 (5 → 1)
objects.append({
'bbox': [x1, y1, x2, y2],
'confidence': float(confidence),
'class': class_id
})
logger.debug(f" 감지된 객체: confidence={confidence:.3f}, size=({width:.1f}x{height:.1f}), class={class_id}")
else:
filtered_by_size += 1
logger.debug(f" 크기 필터링: confidence={confidence:.3f}, size=({width:.1f}x{height:.1f})")
else:
filtered_by_confidence += 1
logger.debug(f" confidence 필터링: {confidence:.3f} < {confidence_threshold}")
logger.info(f"YOLO 파싱 결과: 총 {total_detections}개 감지, confidence 필터링 {filtered_by_confidence}개, 크기 필터링 {filtered_by_size}개, 최종 {len(objects)}개 객체")
# 객체가 감지되지 않은 경우 더 낮은 threshold로 재시도
if len(objects) == 0:
logger.warning("기본 threshold에서 객체 감지 실패, 더 낮은 threshold로 재시도")
confidence_threshold = 0.0001 # 극도로 낮은 threshold로 재시도
for result in results:
boxes = result.boxes
if boxes is not None:
for box in boxes:
confidence = box.conf[0].cpu().numpy()
if confidence >= confidence_threshold:
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
class_id = int(box.cls[0].cpu().numpy())
width = x2 - x1
height = y2 - y1
if width > 0.5 and height > 0.5: # 극도로 낮은 크기 필터
objects.append({
'bbox': [x1, y1, x2, y2],
'confidence': float(confidence),
'class': class_id
})
return objects
def _extract_with_opencv(self, image: np.ndarray) -> List[Dict]:
"""OpenCV를 사용한 개선된 객체 추출"""
try:
# 이미지 전처리
processed_image = self._preprocess_image(image)
# 다중 방법으로 객체 탐지
objects = []
# 방법 1: 개선된 Canny 엣지 검출
objects.extend(self._detect_with_improved_canny(processed_image))
# 방법 2: 히스토그램 균등화 + 그래디언트 기반 탐지
objects.extend(self._detect_with_histogram_equalization(processed_image))
# 방법 3: 동적 임계값 기반 탐지
objects.extend(self._detect_with_dynamic_threshold(processed_image))
# 중복 제거 및 필터링
objects = self._filter_and_merge_objects(objects)
return objects
except Exception as e:
logger.error(f"OpenCV 객체 추출 중 오류 발생: {str(e)}")
# 오류 발생 시 기본 방법 사용
return self._extract_with_basic_opencv(image)
def _preprocess_image(self, image: np.ndarray) -> np.ndarray:
"""이미지 전처리 (대비 개선, 노이즈 제거, 엣지 강화)"""
# 그레이스케일 변환
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# 히스토그램 균등화
equalized = cv2.equalizeHist(gray)
# 가우시안 블러로 노이즈 제거
blurred = cv2.GaussianBlur(equalized, (5, 5), 0)
# 대비 개선
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
enhanced = clahe.apply(blurred)
return enhanced
def _detect_with_improved_canny(self, image: np.ndarray) -> List[Dict]:
"""개선된 Canny 엣지 검출"""
objects = []
# 동적 임계값 계산
median = np.median(image)
lower = int(max(0, 0.7 * median))
upper = int(min(255, 1.3 * median))
# Canny 엣지 검출
edges = cv2.Canny(image, lower, upper)
# 모폴로지 연산으로 엣지 강화
kernel = np.ones((3, 3), np.uint8)
edges = cv2.morphologyEx(edges, cv2.MORPH_CLOSE, kernel)
# 컨투어 찾기
contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
for contour in contours:
area = cv2.contourArea(contour)
if area > 500: # 최소 면적 필터링
x, y, w, h = cv2.boundingRect(contour)
if w > 30 and h > 30: # 최소 크기 필터링
objects.append({
'bbox': [x, y, x + w, y + h],
'confidence': 0.7,
'class': 0,
'method': 'improved_canny'
})
return objects
def _detect_with_histogram_equalization(self, image: np.ndarray) -> List[Dict]:
"""히스토그램 균등화 + 그래디언트 기반 탐지"""
objects = []
# Sobel 그래디언트 계산
grad_x = cv2.Sobel(image, cv2.CV_64F, 1, 0, ksize=3)
grad_y = cv2.Sobel(image, cv2.CV_64F, 0, 1, ksize=3)
gradient_magnitude = np.sqrt(grad_x**2 + grad_y**2)
# 그래디언트 크기 정규화
gradient_magnitude = np.uint8(255 * gradient_magnitude / np.max(gradient_magnitude))
# 동적 임계값 적용
threshold = np.mean(gradient_magnitude) + np.std(gradient_magnitude)
binary = cv2.threshold(gradient_magnitude, threshold, 255, cv2.THRESH_BINARY)[1]
# 모폴로지 연산
kernel = np.ones((5, 5), np.uint8)
binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel)
# 컨투어 찾기
contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
for contour in contours:
area = cv2.contourArea(contour)
if area > 800: # 최소 면적 필터링
x, y, w, h = cv2.boundingRect(contour)
if w > 40 and h > 40: # 최소 크기 필터링
objects.append({
'bbox': [x, y, x + w, y + h],
'confidence': 0.75,
'class': 0,
'method': 'histogram_equalization'
})
return objects
def _detect_with_dynamic_threshold(self, image: np.ndarray) -> List[Dict]:
"""동적 임계값 기반 탐지"""
objects = []
# Otsu 임계값 계산
threshold_value, binary = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
# 적응적 임계값도 시도
adaptive_binary = cv2.adaptiveThreshold(
image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2
)
# 두 결과를 결합
combined = cv2.bitwise_or(binary, adaptive_binary)
# 모폴로지 연산
kernel = np.ones((3, 3), np.uint8)
combined = cv2.morphologyEx(combined, cv2.MORPH_CLOSE, kernel)
# 컨투어 찾기
contours, _ = cv2.findContours(combined, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
for contour in contours:
area = cv2.contourArea(contour)
if area > 600: # 최소 면적 필터링
x, y, w, h = cv2.boundingRect(contour)
if w > 35 and h > 35: # 최소 크기 필터링
objects.append({
'bbox': [x, y, x + w, y + h],
'confidence': 0.65,
'class': 0,
'method': 'dynamic_threshold'
})
return objects
def _filter_and_merge_objects(self, objects: List[Dict]) -> List[Dict]:
"""중복 제거 및 객체 필터링"""
if not objects:
return []
# IoU 기반 중복 제거
filtered_objects = []
for obj in objects:
is_duplicate = False
for existing_obj in filtered_objects:
if self._calculate_iou(obj['bbox'], existing_obj['bbox']) > 0.5:
# 더 높은 신뢰도를 가진 객체 유지
if obj['confidence'] > existing_obj['confidence']:
filtered_objects.remove(existing_obj)
filtered_objects.append(obj)
is_duplicate = True
break
if not is_duplicate:
filtered_objects.append(obj)
# 신뢰도 순으로 정렬
filtered_objects.sort(key=lambda x: x['confidence'], reverse=True)
# 상위 5개 객체만 반환
return filtered_objects[:5]
def _calculate_iou(self, bbox1: List[float], bbox2: List[float]) -> float:
"""IoU 계산"""
x1_1, y1_1, x2_1, y2_1 = bbox1
x1_2, y1_2, x2_2, y2_2 = bbox2
# 교집합 영역 계산
x1_i = max(x1_1, x1_2)
y1_i = max(y1_1, y1_2)
x2_i = min(x2_1, x2_2)
y2_i = min(y2_1, y2_2)
if x2_i <= x1_i or y2_i <= y1_i:
return 0.0
intersection = (x2_i - x1_i) * (y2_i - y1_i)
area1 = (x2_1 - x1_1) * (y2_1 - y1_1)
area2 = (x2_2 - x1_2) * (y2_2 - y1_2)
union = area1 + area2 - intersection
return intersection / union if union > 0 else 0.0
def _extract_with_basic_opencv(self, image: np.ndarray) -> List[Dict]:
"""기본 OpenCV 방법 (오류 발생 시 대체) - 개선된 버전"""
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# 더 관대한 Canny 엣지 검출 (임계값 완화)
edges = cv2.Canny(gray, 30, 100) # 50, 150 → 30, 100
# 컨투어 찾기
contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
objects = []
for contour in contours:
area = cv2.contourArea(contour)
if area > 500: # 최소 면적 필터링 완화 (1000 → 500)
x, y, w, h = cv2.boundingRect(contour)
if w > 20 and h > 20: # 최소 크기 필터링 완화 (50 → 20)
objects.append({
'bbox': [x, y, x + w, y + h],
'confidence': 0.3, # 더 낮은 신뢰도 (0.5 → 0.3)
'class': 0,
'method': 'basic_fallback'
})
# 객체가 여전히 없는 경우 더 관대한 방법 시도
if len(objects) == 0:
logger.warning("기본 OpenCV 방법으로 객체 감지 실패, 더 관대한 방법 시도")
objects = self._extract_with_adaptive_opencv(image)
return objects
def _extract_with_adaptive_opencv(self, image: np.ndarray) -> List[Dict]:
"""적응적 OpenCV 방법 (더 관대한 임계값)"""
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# 적응적 임계값 적용
adaptive_thresh = cv2.adaptiveThreshold(
gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2
)
# 모폴로지 연산으로 노이즈 제거
kernel = np.ones((3, 3), np.uint8)
cleaned = cv2.morphologyEx(adaptive_thresh, cv2.MORPH_CLOSE, kernel)
# 컨투어 찾기
contours, _ = cv2.findContours(cleaned, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
objects = []
for contour in contours:
area = cv2.contourArea(contour)
if area > 200: # 매우 낮은 면적 임계값 (500 → 200)
x, y, w, h = cv2.boundingRect(contour)
if w > 10 and h > 10: # 매우 낮은 크기 임계값 (20 → 10)
objects.append({
'bbox': [x, y, x + w, y + h],
'confidence': 0.2, # 매우 낮은 신뢰도 (0.3 → 0.2)
'class': 0,
'method': 'adaptive_fallback'
})
# 여전히 객체가 없는 경우 이미지 전체를 하나의 객체로 처리
if len(objects) == 0:
logger.warning("모든 OpenCV 방법 실패, 이미지 전체를 하나의 객체로 처리")
h, w = image.shape[:2]
objects.append({
'bbox': [0, 0, w, h],
'confidence': 0.1, # 최소 신뢰도
'class': 0,
'method': 'full_image_fallback'
})
return objects
def _create_enhanced_fallback_data(self, image_path: str) -> Dict:
"""향상된 fallback 데이터 생성 (객체 감지 실패 시)"""
logger.info("향상된 fallback 데이터 생성 시작")
# 이미지 로드
image = cv2.imread(image_path)
if image is None:
raise ValueError(f"Cannot load image: {image_path}")
h, w = image.shape[:2]
# 이미지를 여러 영역으로 분할하여 객체로 처리
objects = []
# 방법 1: 이미지를 4개 영역으로 분할
quarter_w, quarter_h = w // 2, h // 2
regions = [
[0, 0, quarter_w, quarter_h], # 좌상단
[quarter_w, 0, w, quarter_h], # 우상단
[0, quarter_h, quarter_w, h], # 좌하단
[quarter_w, quarter_h, w, h] # 우하단
]
for i, region in enumerate(regions):
x1, y1, x2, y2 = region
objects.append({
'bbox': [x1, y1, x2, y2],
'confidence': 0.4, # 중간 신뢰도
'class': i % 3, # 클래스 다양화
'method': 'region_split'
})
# 방법 2: 이미지 전체를 하나의 객체로도 추가
objects.append({
'bbox': [0, 0, w, h],
'confidence': 0.3,
'class': 0,
'method': 'full_image'
})
logger.info(f"향상된 fallback 데이터 생성 완료: {len(objects)}개 객체")
return {
'objects': objects,
'bounding_boxes': [obj['bbox'] for obj in objects],
'class_labels': [obj['class'] for obj in objects],
'confidence_scores': [obj['confidence'] for obj in objects]
}
def create_unified_ground_truth(self, reference_image_path: str, model_3d_path: str = None) -> Dict:
"""
표준화된 Ground Truth 데이터 생성
Args:
reference_image_path: 참조 이미지 경로
model_3d_path: 3D 모델 경로 (선택사항)
Returns:
표준화된 Ground Truth 데이터
"""
try:
# 참조 이미지에서 객체 정보 추출
reference_data = self.extract_reference_data(reference_image_path)
# 객체 감지 결과 검증 및 개선
if len(reference_data['objects']) == 0:
logger.warning("객체 감지 실패, 대체 Ground Truth 생성")
reference_data = self._create_enhanced_fallback_data(reference_image_path)
# 표준 메시 구조 생성
unified_ground_truth = {
'vertices': self._generate_standard_vertices(reference_data),
'faces': self._generate_standard_faces(reference_data),
'center': self._calculate_center(reference_data),
'scale': self._calculate_scale(reference_data),
'objects': reference_data['objects'],
'bounding_boxes': reference_data['bounding_boxes'],
'class_labels': reference_data['class_labels'],
'confidence_scores': reference_data['confidence_scores'],
'metadata': {
'source_image': reference_image_path,
'extraction_method': 'yolo' if self.detector else 'opencv',
'timestamp': self._get_timestamp(),
'version': '1.0',
'object_count': len(reference_data['objects']),
'fallback_used': len(reference_data['objects']) == 0
}
}
# 데이터 검증
self._validate_ground_truth_structure(unified_ground_truth)
logger.info(f"Ground Truth 생성 완료: {len(reference_data['objects'])}개 객체, {len(unified_ground_truth['vertices'])}개 vertices")
return unified_ground_truth
except Exception as e:
logger.error(f"Ground Truth 생성 중 오류 발생: {str(e)}")
# 오류 발생 시 기본 구조 반환
return self._create_fallback_ground_truth(reference_image_path)
def _generate_standard_vertices(self, reference_data: Dict) -> List[List[float]]:
"""표준 vertices 생성"""
vertices = []
for bbox in reference_data['bounding_boxes']:
x1, y1, x2, y2 = bbox
# 바운딩 박스를 3D 정육면체로 변환
# Z축은 기본값으로 설정
z_min, z_max = 0.0, 1.0
# 8개 꼭짓점 생성 (정육면체)
cube_vertices = [
[x1, y1, z_min], [x2, y1, z_min], [x2, y2, z_min], [x1, y2, z_min], # 하단
[x1, y1, z_max], [x2, y1, z_max], [x2, y2, z_max], [x1, y2, z_max] # 상단
]
vertices.extend(cube_vertices)
# 객체가 없는 경우 기본 정육면체 생성
if not vertices:
# 이미지 전체를 하나의 객체로 처리
logger.warning("객체 감지 실패. 이미지 전체를 Ground Truth로 사용")
# 이미지 크기 기반 3D 박스 생성
image = cv2.imread(reference_image_path)
if image is not None:
h, w = image.shape[:2]
# 이미지 크기에 비례한 3D 박스 생성
scale_factor = min(w, h) / 100.0 # 정규화
vertices = [
[0, 0, 0], [scale_factor, 0, 0], [scale_factor, scale_factor, 0], [0, scale_factor, 0], # 하단
[0, 0, scale_factor], [scale_factor, 0, scale_factor], [scale_factor, scale_factor, scale_factor], [0, scale_factor, scale_factor] # 상단
]
# bounding_boxes도 업데이트
reference_data['bounding_boxes'] = [[0, 0, w, h]]
reference_data['objects'] = [{
'bbox': [0, 0, w, h],
'confidence': 0.5,
'class': 0
}]
else:
# 이미지 로드 실패 시 기본값
vertices = [
[0, 0, 0], [1, 0, 0], [1, 1, 0], [0, 1, 0],
[0, 0, 1], [1, 0, 1], [1, 1, 1], [0, 1, 1]
]
return vertices
def _generate_standard_faces(self, reference_data: Dict) -> List[List[int]]:
"""표준 faces 생성"""
faces = []
# 각 바운딩 박스마다 12개 삼각형 면 생성 (정육면체)
for i, bbox in enumerate(reference_data['bounding_boxes']):
base_idx = i * 8 # 각 객체마다 8개 꼭짓점
# 정육면체의 12개 면 (각 면은 2개 삼각형으로 구성)
cube_faces = [
# 하단 면
[base_idx, base_idx+1, base_idx+2], [base_idx, base_idx+2, base_idx+3],
# 상단 면
[base_idx+4, base_idx+7, base_idx+6], [base_idx+4, base_idx+6, base_idx+5],
# 앞면
[base_idx, base_idx+4, base_idx+5], [base_idx, base_idx+5, base_idx+1],
# 뒷면
[base_idx+2, base_idx+6, base_idx+7], [base_idx+2, base_idx+7, base_idx+3],
# 왼쪽 면
[base_idx, base_idx+3, base_idx+7], [base_idx, base_idx+7, base_idx+4],
# 오른쪽 면
[base_idx+1, base_idx+5, base_idx+6], [base_idx+1, base_idx+6, base_idx+2]
]
faces.extend(cube_faces)
# 객체가 없는 경우 기본 정육면체 면 생성
if not faces:
faces = [
[0, 1, 2], [0, 2, 3], # 하단
[4, 7, 6], [4, 6, 5], # 상단
[0, 4, 5], [0, 5, 1], # 앞면
[2, 6, 7], [2, 7, 3], # 뒷면
[0, 3, 7], [0, 7, 4], # 왼쪽
[1, 5, 6], [1, 6, 2] # 오른쪽
]
return faces
def _calculate_center(self, reference_data: Dict) -> List[float]:
"""중심점 계산"""
if not reference_data['bounding_boxes']:
return [0.5, 0.5, 0.5] # 기본 중심점
# 모든 바운딩 박스의 중심점 계산
centers = []
for bbox in reference_data['bounding_boxes']:
x1, y1, x2, y2 = bbox
center_x = (x1 + x2) / 2
center_y = (y1 + y2) / 2
center_z = 0.5 # Z축은 기본값
centers.append([center_x, center_y, center_z])
# 전체 중심점 계산
if centers:
center = [sum(c[i] for c in centers) / len(centers) for i in range(3)]
else:
center = [0.5, 0.5, 0.5]
return center
def _calculate_scale(self, reference_data: Dict) -> List[float]:
"""스케일 계산"""
if not reference_data['bounding_boxes']:
return [1.0, 1.0, 1.0] # 기본 스케일
# 모든 바운딩 박스의 크기 계산
scales = []
for bbox in reference_data['bounding_boxes']:
x1, y1, x2, y2 = bbox
scale_x = abs(x2 - x1)
scale_y = abs(y2 - y1)
scale_z = 1.0 # Z축은 기본값
scales.append([scale_x, scale_y, scale_z])
# 평균 스케일 계산
if scales:
scale = [sum(s[i] for s in scales) / len(scales) for i in range(3)]
else:
scale = [1.0, 1.0, 1.0]
return scale
def _validate_ground_truth_structure(self, ground_truth: Dict) -> bool:
"""Ground Truth 구조 검증"""
try:
# 필수 키 확인
required_keys = ['vertices', 'faces', 'objects', 'bounding_boxes']
for key in required_keys:
if key not in ground_truth:
logger.error(f"Ground Truth에 필수 키 '{key}'가 없습니다")
return False
# vertices 검증
vertices = ground_truth['vertices']
if len(vertices) < 3:
logger.error(f"vertices 개수가 너무 적습니다: {len(vertices)}")
return False
# objects 검증
objects = ground_truth['objects']
if len(objects) == 0:
logger.warning("Ground Truth에 객체가 없습니다")
return False
logger.info(f"Ground Truth 구조 검증 완료: {len(vertices)}개 vertices, {len(objects)}개 객체")
return True
except Exception as e:
logger.error(f"Ground Truth 구조 검증 중 오류: {e}")
return False
def _create_fallback_ground_truth(self, reference_image_path: str) -> Dict:
"""오류 발생 시 대체 Ground Truth 생성"""
logger.warning("대체 Ground Truth 생성 중...")
return {
'vertices': [
[0, 0, 0], [1, 0, 0], [1, 1, 0], [0, 1, 0], # 하단
[0, 0, 1], [1, 0, 1], [1, 1, 1], [0, 1, 1] # 상단
],
'faces': [
[0, 1, 2], [0, 2, 3], # 하단
[4, 7, 6], [4, 6, 5], # 상단
[0, 4, 5], [0, 5, 1], # 앞면
[2, 6, 7], [2, 7, 3], # 뒷면
[0, 3, 7], [0, 7, 4], # 왼쪽
[1, 5, 6], [1, 6, 2] # 오른쪽
],
'center': [0.5, 0.5, 0.5],
'scale': [1.0, 1.0, 1.0],
'objects': [],
'bounding_boxes': [],
'class_labels': [],
'confidence_scores': [],
'metadata': {
'source_image': reference_image_path,
'extraction_method': 'fallback',
'timestamp': self._get_timestamp(),
'version': '1.0',
'error': 'fallback_created'
}
}
def _get_timestamp(self) -> str:
"""현재 시간 스탬프 반환"""
from datetime import datetime
return datetime.now().isoformat()
\ No newline at end of file
import psutil
import logging
from typing import Dict, Any
import time
logger = logging.getLogger(__name__)
class SystemMonitor:
"""시스템 상태 모니터링"""
def __init__(self):
self.start_time = time.time()
self.memory_usage = []
self.cpu_usage = []
def log_system_status(self, stage: str):
"""시스템 상태 로깅"""
memory_percent = psutil.virtual_memory().percent
cpu_percent = psutil.cpu_percent()
self.memory_usage.append(memory_percent)
self.cpu_usage.append(cpu_percent)
logger.info(f"System status at {stage}: CPU {cpu_percent}%, Memory {memory_percent}%")
# 메모리 사용량이 90% 이상이면 경고
if memory_percent > 90:
logger.warning(f"High memory usage detected: {memory_percent}%")
def get_performance_summary(self) -> Dict[str, Any]:
"""성능 요약 반환"""
elapsed_time = time.time() - self.start_time
return {
'elapsed_time': elapsed_time,
'avg_memory_usage': sum(self.memory_usage) / len(self.memory_usage) if self.memory_usage else 0,
'max_memory_usage': max(self.memory_usage) if self.memory_usage else 0,
'avg_cpu_usage': sum(self.cpu_usage) / len(self.cpu_usage) if self.cpu_usage else 0,
'max_cpu_usage': max(self.cpu_usage) if self.cpu_usage else 0
}
"""
시각화 모듈
평가 결과를 시각화하고 대시보드 및 리포트를 생성합니다.
"""
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.offline as pyo
from typing import Dict, List, Optional, Tuple
import os
import json
from datetime import datetime
import trimesh
import open3d as o3d
class Visualizer:
"""시각화를 담당하는 클래스"""
def __init__(self, output_dir: str = "results"):
"""
시각화 도구 초기화
Args:
output_dir (str): 출력 디렉토리
"""
self.output_dir = output_dir
self.images_dir = os.path.join(output_dir, "images")
self.reports_dir = os.path.join(output_dir, "reports")
# 디렉토리 생성
os.makedirs(self.images_dir, exist_ok=True)
os.makedirs(self.reports_dir, exist_ok=True)
# matplotlib 스타일 설정
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
def plot_metrics_comparison(self, results: Dict, save_path: Optional[str] = None) -> None:
"""
평가 지표 비교 차트를 생성합니다.
Args:
results (Dict): 평가 결과
save_path (Optional[str]): 저장 경로
"""
metrics = results['metrics']
score_details = results['score_details']
# 서브플롯 생성
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
# 1. 원본 지표 값 막대 차트
metric_names = list(metrics.keys())
metric_values = list(metrics.values())
bars1 = ax1.bar(metric_names, metric_values, color='skyblue', alpha=0.7)
ax1.set_title('원본 지표 값', fontsize=14, fontweight='bold')
ax1.set_ylabel('값')
ax1.tick_params(axis='x', rotation=45)
# 막대 위에 값 표시
for bar, value in zip(bars1, metric_values):
ax1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
f'{value:.3f}', ha='center', va='bottom')
# 2. 정규화된 점수 막대 차트
normalized_scores = [details['normalized_score'] for details in score_details.values()]
bars2 = ax2.bar(metric_names, normalized_scores, color='lightgreen', alpha=0.7)
ax2.set_title('정규화된 점수 (0-100)', fontsize=14, fontweight='bold')
ax2.set_ylabel('점수')
ax2.set_ylim(0, 100)
ax2.tick_params(axis='x', rotation=45)
# 막대 위에 값 표시
for bar, score in zip(bars2, normalized_scores):
ax2.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 2,
f'{score:.1f}', ha='center', va='bottom')
# 3. 가중치 적용 점수 막대 차트
weighted_scores = [details['weighted_score'] for details in score_details.values()]
weights = [details['weight'] for details in score_details.values()]
bars3 = ax3.bar(metric_names, weighted_scores, color='orange', alpha=0.7)
ax3.set_title('가중치 적용 점수', fontsize=14, fontweight='bold')
ax3.set_ylabel('가중 점수')
ax3.tick_params(axis='x', rotation=45)
# 막대 위에 가중치 표시
for bar, score, weight in zip(bars3, weighted_scores, weights):
ax3.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.5,
f'{score:.1f}\n(w:{weight})', ha='center', va='bottom', fontsize=9)
# 4. 종합 점수 파이 차트
comprehensive_score = results['comprehensive_score']
grade = results['grade']
# 등급별 색상 설정
grade_colors = {'A': '#2e8b57', 'B': '#4169e1', 'C': '#ffa500', 'D': '#ff6347', 'F': '#dc143c'}
ax4.pie([comprehensive_score, 100 - comprehensive_score],
labels=[f'{grade}등급\n{comprehensive_score:.1f}점', ''],
colors=[grade_colors.get(grade, '#666666'), '#f0f0f0'],
autopct='%1.1f%%', startangle=90)
ax4.set_title('종합 점수', fontsize=14, fontweight='bold')
plt.tight_layout()
# 저장
if save_path is None:
save_path = os.path.join(self.images_dir, f"metrics_comparison_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
plt.savefig(save_path, dpi=300, bbox_inches='tight')
plt.show()
print(f"지표 비교 차트가 저장되었습니다: {save_path}")
def visualize_3d_model(self, model: Dict, title: str = "3D Model",
save_path: Optional[str] = None) -> None:
"""
3D 모델을 시각화합니다.
Args:
model (Dict): 3D 모델 정보
title (str): 시각화 제목
save_path (Optional[str]): 저장 경로
"""
try:
vertices = model['vertices']
faces = model['faces']
if len(vertices) == 0:
print("시각화할 정점이 없습니다.")
return
# trimesh를 사용한 3D 시각화
mesh = trimesh.Trimesh(vertices=vertices, faces=faces)
# 3D 플롯 생성
fig = plt.figure(figsize=(12, 8))
ax = fig.add_subplot(111, projection='3d')
# 메시 플롯
ax.plot_trisurf(vertices[:, 0], vertices[:, 1], vertices[:, 2],
triangles=faces, alpha=0.8, cmap='viridis')
# 바운딩 박스 표시
bbox = model.get('bounding_box', None)
if bbox is not None:
self._plot_bounding_box(ax, bbox)
ax.set_title(title, fontsize=14, fontweight='bold')
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
# 축 비율 동일하게 설정
self._set_equal_axes(ax, vertices)
# 저장
if save_path is None:
save_path = os.path.join(self.images_dir, f"3d_model_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
plt.savefig(save_path, dpi=300, bbox_inches='tight')
plt.show()
print(f"3D 모델 시각화가 저장되었습니다: {save_path}")
except Exception as e:
print(f"3D 모델 시각화 중 오류 발생: {e}")
def create_performance_dashboard(self, results: Dict) -> str:
"""
성능 대시보드를 생성합니다.
Args:
results (Dict): 평가 결과
Returns:
str: 대시보드 HTML 파일 경로
"""
# Plotly 대시보드 생성
fig = make_subplots(
rows=2, cols=2,
subplot_titles=('종합 점수', '지표별 점수', '가중치 분포', '성능 등급'),
specs=[[{"type": "indicator"}, {"type": "bar"}],
[{"type": "pie"}, {"type": "indicator"}]]
)
# 1. 종합 점수 게이지
comprehensive_score = results['comprehensive_score']
fig.add_trace(
go.Indicator(
mode="gauge+number+delta",
value=comprehensive_score,
domain={'x': [0, 1], 'y': [0, 1]},
title={'text': "종합 점수"},
gauge={
'axis': {'range': [None, 100]},
'bar': {'color': "darkblue"},
'steps': [
{'range': [0, 60], 'color': "lightgray"},
{'range': [60, 80], 'color': "yellow"},
{'range': [80, 100], 'color': "green"}
],
'threshold': {
'line': {'color': "red", 'width': 4},
'thickness': 0.75,
'value': 90
}
}
),
row=1, col=1
)
# 2. 지표별 점수 막대 차트
score_details = results['score_details']
metric_names = list(score_details.keys())
normalized_scores = [details['normalized_score'] for details in score_details.values()]
fig.add_trace(
go.Bar(
x=metric_names,
y=normalized_scores,
name="정규화된 점수",
marker_color='lightblue'
),
row=1, col=2
)
# 3. 가중치 분포 파이 차트
weights = [details['weight'] for details in score_details.values()]
fig.add_trace(
go.Pie(
labels=metric_names,
values=weights,
name="가중치 분포"
),
row=2, col=1
)
# 4. 성능 등급 표시
grade = results['grade']
grade_colors = {'A': '#2e8b57', 'B': '#4169e1', 'C': '#ffa500', 'D': '#ff6347', 'F': '#dc143c'}
fig.add_trace(
go.Indicator(
mode="number",
value=0, # 등급은 텍스트로 표시
title={'text': f"성능 등급: {grade}"},
number={'font': {'size': 50, 'color': grade_colors.get(grade, '#666666')}}
),
row=2, col=2
)
# 레이아웃 업데이트
fig.update_layout(
title_text="3D 객체인식 평가 대시보드",
title_x=0.5,
height=800,
showlegend=False
)
# HTML 파일로 저장
html_file = os.path.join(self.reports_dir, f"dashboard_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html")
fig.write_html(html_file)
print(f"성능 대시보드가 생성되었습니다: {html_file}")
return html_file
def generate_html_report(self, results: Dict) -> str:
"""
HTML 형식의 상세 리포트를 생성합니다.
Args:
results (Dict): 평가 결과
Returns:
str: HTML 리포트 파일 경로
"""
html_content = f"""
<!DOCTYPE html>
<html lang="ko">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>3D 객체인식 평가 결과</title>
<style>
body {{
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
margin: 0;
padding: 20px;
background-color: #f5f5f5;
}}
.container {{
max-width: 1200px;
margin: 0 auto;
background-color: white;
padding: 30px;
border-radius: 10px;
box-shadow: 0 0 20px rgba(0,0,0,0.1);
}}
.header {{
text-align: center;
margin-bottom: 30px;
padding: 20px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border-radius: 10px;
}}
.score-section {{
display: flex;
justify-content: space-around;
margin: 30px 0;
padding: 20px;
background-color: #f8f9fa;
border-radius: 10px;
}}
.score-item {{
text-align: center;
}}
.score-value {{
font-size: 2.5em;
font-weight: bold;
color: #2e8b57;
}}
.grade-value {{
font-size: 2em;
font-weight: bold;
color: #4169e1;
}}
.metrics-grid {{
display: grid;
grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
gap: 20px;
margin: 30px 0;
}}
.metric-card {{
background-color: #f8f9fa;
padding: 20px;
border-radius: 10px;
border-left: 5px solid #007bff;
}}
.metric-name {{
font-weight: bold;
font-size: 1.2em;
color: #333;
}}
.metric-value {{
font-size: 1.5em;
color: #666;
margin: 10px 0;
}}
.metric-details {{
font-size: 0.9em;
color: #888;
}}
.summary-table {{
width: 100%;
border-collapse: collapse;
margin: 30px 0;
}}
.summary-table th, .summary-table td {{
border: 1px solid #ddd;
padding: 12px;
text-align: left;
}}
.summary-table th {{
background-color: #f2f2f2;
font-weight: bold;
}}
.footer {{
text-align: center;
margin-top: 30px;
padding: 20px;
background-color: #f8f9fa;
border-radius: 10px;
color: #666;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>3D 객체인식 평가 결과</h1>
<p>평가 시간: {results['evaluation_timestamp']}</p>
<p>모델 파일: {os.path.basename(results['model_path'])}</p>
<p>참조 파일: {os.path.basename(results['reference_path'])}</p>
</div>
<div class="score-section">
<div class="score-item">
<div class="score-value">{results['comprehensive_score']:.1f}점</div>
<div>종합 점수</div>
</div>
<div class="score-item">
<div class="grade-value">{results['grade']}등급</div>
<div>성능 등급</div>
</div>
</div>
<div class="metrics-grid">
{self._generate_metric_cards_html(results['score_details'])}
</div>
<h2>상세 결과</h2>
<table class="summary-table">
<thead>
<tr>
<th>지표</th>
<th>원본 값</th>
<th>정규화 점수</th>
<th>가중치</th>
<th>가중 점수</th>
</tr>
</thead>
<tbody>
{self._generate_summary_table_html(results['score_details'])}
</tbody>
</table>
<div class="footer">
<p>이 리포트는 3D 객체인식 평가 시스템에 의해 자동 생성되었습니다.</p>
<p>생성 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
</div>
</div>
</body>
</html>
"""
# HTML 파일로 저장
html_file = os.path.join(self.reports_dir, f"evaluation_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html")
with open(html_file, 'w', encoding='utf-8') as f:
f.write(html_content)
print(f"HTML 리포트가 생성되었습니다: {html_file}")
return html_file
def _plot_bounding_box(self, ax, bbox):
"""바운딩 박스를 플롯합니다."""
min_coords, max_coords = bbox
# 바운딩 박스의 8개 꼭짓점
vertices = np.array([
[min_coords[0], min_coords[1], min_coords[2]],
[max_coords[0], min_coords[1], min_coords[2]],
[max_coords[0], max_coords[1], min_coords[2]],
[min_coords[0], max_coords[1], min_coords[2]],
[min_coords[0], min_coords[1], max_coords[2]],
[max_coords[0], min_coords[1], max_coords[2]],
[max_coords[0], max_coords[1], max_coords[2]],
[min_coords[0], max_coords[1], max_coords[2]]
])
# 바운딩 박스의 12개 모서리
edges = [
[0, 1], [1, 2], [2, 3], [3, 0], # 하단
[4, 5], [5, 6], [6, 7], [7, 4], # 상단
[0, 4], [1, 5], [2, 6], [3, 7] # 수직
]
# 모서리 그리기
for edge in edges:
points = vertices[edge]
ax.plot3D(*points.T, color='red', linewidth=2, alpha=0.8)
def _set_equal_axes(self, ax, vertices):
"""축 비율을 동일하게 설정합니다."""
max_range = np.array([vertices[:, 0].max() - vertices[:, 0].min(),
vertices[:, 1].max() - vertices[:, 1].min(),
vertices[:, 2].max() - vertices[:, 2].min()]).max() / 2.0
mid_x = (vertices[:, 0].max() + vertices[:, 0].min()) * 0.5
mid_y = (vertices[:, 1].max() + vertices[:, 1].min()) * 0.5
mid_z = (vertices[:, 2].max() + vertices[:, 2].min()) * 0.5
ax.set_xlim(mid_x - max_range, mid_x + max_range)
ax.set_ylim(mid_y - max_range, mid_y + max_range)
ax.set_zlim(mid_z - max_range, mid_z + max_range)
def _generate_metric_cards_html(self, score_details: Dict) -> str:
"""지표 카드 HTML을 생성합니다."""
html = ""
for metric_name, details in score_details.items():
html += f"""
<div class="metric-card">
<div class="metric-name">{metric_name}</div>
<div class="metric-value">{details['normalized_score']:.1f}점</div>
<div class="metric-details">
원본값: {details['raw_value']:.4f}<br>
가중치: {details['weight']}<br>
임계값: {details['threshold']}
</div>
</div>
"""
return html
def _generate_summary_table_html(self, score_details: Dict) -> str:
"""요약 테이블 HTML을 생성합니다."""
html = ""
for metric_name, details in score_details.items():
html += f"""
<tr>
<td>{metric_name}</td>
<td>{details['raw_value']:.4f}</td>
<td>{details['normalized_score']:.1f}</td>
<td>{details['weight']}</td>
<td>{details['weighted_score']:.2f}</td>
</tr>
"""
return html
def create_comparison_chart(self, results_list: List[Dict], save_path: Optional[str] = None) -> None:
"""
여러 모델의 평가 결과를 비교하는 차트를 생성합니다.
Args:
results_list (List[Dict]): 여러 평가 결과 리스트
save_path (Optional[str]): 저장 경로
"""
if not results_list:
print("비교할 결과가 없습니다.")
return
# 데이터 준비
model_names = [f"Model {i+1}" for i in range(len(results_list))]
comprehensive_scores = [results['comprehensive_score'] for results in results_list]
grades = [results['grade'] for results in results_list]
# 서브플롯 생성
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
# 1. 종합 점수 비교
bars = ax1.bar(model_names, comprehensive_scores, color='lightblue', alpha=0.7)
ax1.set_title('모델별 종합 점수 비교', fontsize=14, fontweight='bold')
ax1.set_ylabel('종합 점수')
ax1.set_ylim(0, 100)
# 막대 위에 점수 표시
for bar, score in zip(bars, comprehensive_scores):
ax1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 1,
f'{score:.1f}', ha='center', va='bottom')
# 2. 등급 분포
grade_counts = {}
for grade in grades:
grade_counts[grade] = grade_counts.get(grade, 0) + 1
ax2.pie(grade_counts.values(), labels=grade_counts.keys(), autopct='%1.1f%%')
ax2.set_title('성능 등급 분포', fontsize=14, fontweight='bold')
plt.tight_layout()
# 저장
if save_path is None:
save_path = os.path.join(self.images_dir, f"model_comparison_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
plt.savefig(save_path, dpi=300, bbox_inches='tight')
plt.show()
print(f"모델 비교 차트가 저장되었습니다: {save_path}")
#!/usr/bin/env python3
"""
임계값 수정 테스트 스크립트
chamfer_distance와 emd의 새로운 정규화 로직을 테스트합니다.
"""
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), 'src'))
from evaluator import normalize_score_improved
def test_normalization():
"""정규화 함수 테스트"""
# 테스트 케이스: 실제 계산된 값들
test_cases = [
{
'metric_name': 'chamfer_distance',
'metric_value': 0.2777,
'threshold': 0.3, # 새로운 임계값
'description': 'chamfer_distance: 0.2777 (새 임계값 0.3)'
},
{
'metric_name': 'emd',
'metric_value': 0.2265,
'threshold': 0.3, # 새로운 임계값
'description': 'emd: 0.2265 (새 임계값 0.3)'
},
{
'metric_name': 'chamfer_distance',
'metric_value': 0.2777,
'threshold': 0.1, # 기존 임계값 (비교용)
'description': 'chamfer_distance: 0.2777 (기존 임계값 0.1)'
},
{
'metric_name': 'emd',
'metric_value': 0.2265,
'threshold': 0.1, # 기존 임계값 (비교용)
'description': 'emd: 0.2265 (기존 임계값 0.1)'
}
]
print("=== 정규화 점수 테스트 결과 ===\n")
for case in test_cases:
score = normalize_score_improved(
case['metric_name'],
case['metric_value'],
case['threshold']
)
ratio = case['metric_value'] / case['threshold']
print(f"{case['description']}")
print(f" 원본 값: {case['metric_value']}")
print(f" 임계값: {case['threshold']}")
print(f" 비율: {ratio:.3f}")
print(f" 정규화 점수: {score:.1f}점")
print()
# 추가 테스트 케이스들
additional_cases = [
('chamfer_distance', 0.15, 0.3, '임계값의 절반 (우수)'),
('chamfer_distance', 0.3, 0.3, '임계값과 동일 (양호)'),
('chamfer_distance', 0.6, 0.3, '임계값의 2배 (보통)'),
('chamfer_distance', 1.0, 0.3, '임계값의 3.33배 (낮음)'),
('emd', 0.15, 0.3, '임계값의 절반 (우수)'),
('emd', 0.3, 0.3, '임계값과 동일 (양호)'),
('emd', 0.6, 0.3, '임계값의 2배 (보통)'),
('emd', 1.0, 0.3, '임계값의 3.33배 (낮음)'),
]
print("=== 추가 테스트 케이스 ===\n")
for metric_name, value, threshold, description in additional_cases:
score = normalize_score_improved(metric_name, value, threshold)
ratio = value / threshold
print(f"{description}")
print(f" {metric_name}: {value} (임계값: {threshold})")
print(f" 비율: {ratio:.3f}")
print(f" 정규화 점수: {score:.1f}점")
print()
if __name__ == "__main__":
test_normalization()
"""
자동화된 테스트 시스템
단위 테스트, 통합 테스트, 성능 테스트를 포함합니다.
"""
import unittest
import numpy as np
import tempfile
import os
import sys
import time
import logging
from typing import Dict, List, Any
# 프로젝트 루트를 Python 경로에 추가
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from src.evaluator import Evaluator
from src.utils.reference_extractor import ReferenceDataExtractor
from src.metrics.empir3d_evaluator import Empir3DEvaluator
from src.metrics.deep_emd_calculator import DeepEMDCalculator
from src.renderer.improved_renderer import ImprovedRenderer
from src.generators.scene_dreamer import SceneDreamer
from src.integration.scene_detection import SceneDetectionSystem
logger = logging.getLogger(__name__)
class TestDataGenerator:
"""테스트 데이터 생성기"""
@staticmethod
def generate_test_3d_model(num_vertices: int = 100) -> Dict:
"""테스트용 3D 모델 생성"""
vertices = []
for _ in range(num_vertices):
vertex = [
np.random.uniform(-1, 1),
np.random.uniform(-1, 1),
np.random.uniform(-1, 1)
]
vertices.append(vertex)
# 간단한 면 생성
faces = []
for i in range(0, num_vertices - 2, 3):
faces.append([i, i + 1, i + 2])
return {
'vertices': vertices,
'faces': faces,
'center': [0.0, 0.0, 0.0],
'scale': [1.0, 1.0, 1.0]
}
@staticmethod
def generate_test_image(width: int = 512, height: int = 512) -> np.ndarray:
"""테스트용 이미지 생성"""
image = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)
return image
@staticmethod
def generate_test_reference_data() -> Dict:
"""테스트용 참조 데이터 생성"""
return {
'objects': [
{
'bbox': [100, 100, 200, 200],
'confidence': 0.9,
'class': 0
}
],
'bounding_boxes': [[100, 100, 200, 200]],
'class_labels': [0],
'confidence_scores': [0.9]
}
class UnitTests(unittest.TestCase):
"""단위 테스트"""
def setUp(self):
"""테스트 설정"""
self.test_data_generator = TestDataGenerator()
self.temp_dir = tempfile.mkdtemp()
def tearDown(self):
"""테스트 정리"""
import shutil
shutil.rmtree(self.temp_dir, ignore_errors=True)
def test_reference_extractor_initialization(self):
"""참조 데이터 추출기 초기화 테스트"""
extractor = ReferenceDataExtractor()
self.assertIsNotNone(extractor)
self.assertIsNotNone(extractor.detector)
def test_ground_truth_creation(self):
"""Ground Truth 생성 테스트"""
extractor = ReferenceDataExtractor()
test_image = self.test_data_generator.generate_test_image()
# 임시 이미지 파일 생성
temp_image_path = os.path.join(self.temp_dir, 'test_image.jpg')
import cv2
cv2.imwrite(temp_image_path, test_image)
# Ground Truth 생성
ground_truth = extractor.create_unified_ground_truth(temp_image_path)
# 검증
self.assertIn('vertices', ground_truth)
self.assertIn('faces', ground_truth)
self.assertIn('center', ground_truth)
self.assertIn('scale', ground_truth)
self.assertIsInstance(ground_truth['vertices'], list)
self.assertIsInstance(ground_truth['faces'], list)
def test_empir3d_evaluator_initialization(self):
"""Empir3D 평가기 초기화 테스트"""
evaluator = Empir3DEvaluator()
self.assertIsNotNone(evaluator)
self.assertIsNotNone(evaluator.resolution_calculator)
self.assertIsNotNone(evaluator.accuracy_calculator)
self.assertIsNotNone(evaluator.coverage_calculator)
self.assertIsNotNone(evaluator.artifact_calculator)
def test_empir3d_evaluation(self):
"""Empir3D 평가 테스트"""
evaluator = Empir3DEvaluator()
model_3d = self.test_data_generator.generate_test_3d_model()
ground_truth_3d = self.test_data_generator.generate_test_3d_model()
result = evaluator.evaluate_3d_model(model_3d, ground_truth_3d)
# 검증
self.assertIn('empir3d_score', result)
self.assertIn('resolution', result)
self.assertIn('accuracy', result)
self.assertIn('coverage', result)
self.assertIn('artifact', result)
self.assertIsInstance(result['empir3d_score'], (int, float))
def test_deep_emd_calculator_initialization(self):
"""DeepEMD 계산기 초기화 테스트"""
calculator = DeepEMDCalculator()
self.assertIsNotNone(calculator)
self.assertIsNotNone(calculator.transformer_model)
def test_improved_renderer_initialization(self):
"""개선된 렌더러 초기화 테스트"""
renderer = ImprovedRenderer()
self.assertIsNotNone(renderer)
self.assertIsNotNone(renderer.quality_enhancer)
self.assertIsNotNone(renderer.edge_detector)
def test_scene_dreamer_initialization(self):
"""SceneDreamer 초기화 테스트"""
dreamer = SceneDreamer()
self.assertIsNotNone(dreamer)
self.assertIsNotNone(dreamer.scene_encoder)
self.assertIsNotNone(dreamer.3d_generator)
self.assertIsNotNone(dreamer.view_synthesizer)
def test_scene_detection_system_initialization(self):
"""장면 탐지 시스템 초기화 테스트"""
detector = SceneDetectionSystem()
self.assertIsNotNone(detector)
self.assertIsNotNone(detector.object_detector)
self.assertIsNotNone(detector.scene_analyzer)
self.assertIsNotNone(detector.spatial_analyzer)
class IntegrationTests(unittest.TestCase):
"""통합 테스트"""
def setUp(self):
"""테스트 설정"""
self.test_data_generator = TestDataGenerator()
self.temp_dir = tempfile.mkdtemp()
def tearDown(self):
"""테스트 정리"""
import shutil
shutil.rmtree(self.temp_dir, ignore_errors=True)
def test_full_evaluation_pipeline(self):
"""전체 평가 파이프라인 테스트"""
# 평가기 초기화
evaluator = Evaluator()
# 테스트 데이터 생성
model_3d = self.test_data_generator.generate_test_3d_model()
test_image = self.test_data_generator.generate_test_image()
# 임시 파일 생성
temp_model_path = os.path.join(self.temp_dir, 'test_model.glb')
temp_image_path = os.path.join(self.temp_dir, 'test_image.jpg')
# 모델 데이터 저장 (실제로는 GLB 파일이어야 함)
import json
with open(temp_model_path, 'w') as f:
json.dump(model_3d, f)
import cv2
cv2.imwrite(temp_image_path, test_image)
# 평가 실행
try:
result = evaluator.evaluate_model(temp_model_path, temp_image_path)
# 검증
self.assertIn('comprehensive_score', result)
self.assertIn('grade', result)
self.assertIn('metrics', result)
self.assertIsInstance(result['comprehensive_score'], (int, float))
self.assertIsInstance(result['grade'], str)
except Exception as e:
# 일부 의존성이 없을 수 있으므로 예외를 로깅만 함
logger.warning(f"전체 평가 파이프라인 테스트에서 예외 발생: {str(e)}")
def test_empir3d_integration(self):
"""Empir3D 통합 테스트"""
evaluator = Empir3DEvaluator()
model_3d = self.test_data_generator.generate_test_3d_model()
ground_truth_3d = self.test_data_generator.generate_test_3d_model()
result = evaluator.evaluate_3d_model(model_3d, ground_truth_3d)
# 모든 지표가 계산되었는지 확인
self.assertIn('empir3d_score', result)
self.assertIn('resolution', result)
self.assertIn('accuracy', result)
self.assertIn('coverage', result)
self.assertIn('artifact', result)
# 각 지표의 점수가 유효한지 확인
for metric in ['resolution', 'accuracy', 'coverage', 'artifact']:
self.assertIn('score', result[metric])
self.assertIsInstance(result[metric]['score'], (int, float))
self.assertGreaterEqual(result[metric]['score'], 0.0)
self.assertLessEqual(result[metric]['score'], 1.0)
def test_scene_dreamer_integration(self):
"""SceneDreamer 통합 테스트"""
dreamer = SceneDreamer()
test_image = self.test_data_generator.generate_test_image()
result = dreamer.generate_3d_scene(test_image, num_views=4)
# 검증
self.assertIn('3d_model', result)
self.assertIn('multi_views', result)
self.assertIn('scene_latent', result)
self.assertIn('metadata', result)
# 3D 모델 검증
model_3d = result['3d_model']
self.assertIn('vertices', model_3d)
self.assertIn('faces', model_3d)
self.assertIsInstance(model_3d['vertices'], list)
self.assertIsInstance(model_3d['faces'], list)
def test_scene_detection_integration(self):
"""장면 탐지 통합 테스트"""
detector = SceneDetectionSystem()
test_image = self.test_data_generator.generate_test_image()
result = detector.detect_scene_objects(test_image)
# 검증
self.assertIn('detected_objects', result)
self.assertIn('scene_analysis', result)
self.assertIn('spatial_relations', result)
self.assertIn('metadata', result)
# 메타데이터 검증
metadata = result['metadata']
self.assertIn('num_objects', metadata)
self.assertIn('scene_complexity', metadata)
self.assertIn('detection_method', metadata)
class PerformanceTests(unittest.TestCase):
"""성능 테스트"""
def setUp(self):
"""테스트 설정"""
self.test_data_generator = TestDataGenerator()
def test_evaluation_performance(self):
"""평가 성능 테스트"""
evaluator = Empir3DEvaluator()
model_3d = self.test_data_generator.generate_test_3d_model(1000)
ground_truth_3d = self.test_data_generator.generate_test_3d_model(1000)
# 성능 측정
start_time = time.time()
result = evaluator.evaluate_3d_model(model_3d, ground_truth_3d)
end_time = time.time()
execution_time = end_time - start_time
# 성능 검증 (5초 이내 완료)
self.assertLess(execution_time, 5.0)
logger.info(f"Empir3D 평가 실행 시간: {execution_time:.2f}초")
def test_rendering_performance(self):
"""렌더링 성능 테스트"""
renderer = ImprovedRenderer()
model_3d = self.test_data_generator.generate_test_3d_model(500)
# 성능 측정
start_time = time.time()
rendered_images = renderer.render_multiple_views(model_3d, num_views=8)
end_time = time.time()
execution_time = end_time - start_time
# 성능 검증 (10초 이내 완료)
self.assertLess(execution_time, 10.0)
logger.info(f"렌더링 실행 시간: {execution_time:.2f}초")
logger.info(f"생성된 이미지 수: {len(rendered_images)}")
def test_memory_usage(self):
"""메모리 사용량 테스트"""
import psutil
import os
process = psutil.Process(os.getpid())
initial_memory = process.memory_info().rss / 1024 / 1024 # MB
# 대용량 데이터 처리
evaluator = Empir3DEvaluator()
model_3d = self.test_data_generator.generate_test_3d_model(5000)
ground_truth_3d = self.test_data_generator.generate_test_3d_model(5000)
result = evaluator.evaluate_3d_model(model_3d, ground_truth_3d)
final_memory = process.memory_info().rss / 1024 / 1024 # MB
memory_increase = final_memory - initial_memory
# 메모리 사용량 검증 (500MB 이내 증가)
self.assertLess(memory_increase, 500.0)
logger.info(f"메모리 사용량 증가: {memory_increase:.2f}MB")
class SmokeTests(unittest.TestCase):
"""스모크 테스트"""
def test_basic_functionality(self):
"""기본 기능 테스트"""
# 모든 주요 컴포넌트가 초기화되는지 확인
components = [
ReferenceDataExtractor(),
Empir3DEvaluator(),
DeepEMDCalculator(),
ImprovedRenderer(),
SceneDreamer(),
SceneDetectionSystem()
]
for component in components:
self.assertIsNotNone(component)
def test_error_handling(self):
"""오류 처리 테스트"""
evaluator = Empir3DEvaluator()
# 잘못된 입력으로 테스트
invalid_model = {}
invalid_gt = {}
result = evaluator.evaluate_3d_model(invalid_model, invalid_gt)
# 오류가 발생해도 결과가 반환되는지 확인
self.assertIsNotNone(result)
self.assertIn('empir3d_score', result)
def test_configuration_loading(self):
"""설정 로딩 테스트"""
from config.evaluation_config import EVALUATION_CONFIG
# 기본 설정이 로드되는지 확인
self.assertIsNotNone(EVALUATION_CONFIG)
self.assertIn('weights', EVALUATION_CONFIG)
self.assertIn('thresholds', EVALUATION_CONFIG)
def run_all_tests():
"""모든 테스트 실행"""
# 테스트 스위트 생성
test_suite = unittest.TestSuite()
# 테스트 클래스 추가
test_classes = [
UnitTests,
IntegrationTests,
PerformanceTests,
SmokeTests
]
for test_class in test_classes:
tests = unittest.TestLoader().loadTestsFromTestCase(test_class)
test_suite.addTests(tests)
# 테스트 실행
runner = unittest.TextTestRunner(verbosity=2)
result = runner.run(test_suite)
return result.wasSuccessful()
if __name__ == '__main__':
# 로깅 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 모든 테스트 실행
success = run_all_tests()
if success:
print("\n✅ 모든 테스트가 성공적으로 완료되었습니다!")
sys.exit(0)
else:
print("\n❌ 일부 테스트가 실패했습니다.")
sys.exit(1)
"""
테스트 커버리지 분석
코드 커버리지를 측정하고 분석합니다.
"""
import unittest
import coverage
import os
import sys
import logging
from typing import Dict, List, Any
# 프로젝트 루트를 Python 경로에 추가
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
logger = logging.getLogger(__name__)
class CoverageAnalyzer:
"""코드 커버리지 분석기"""
def __init__(self, source_dir: str = 'src', exclude_patterns: List[str] = None):
"""
커버리지 분석기 초기화
Args:
source_dir (str): 소스 코드 디렉토리
exclude_patterns (List[str]): 제외할 패턴 리스트
"""
self.source_dir = source_dir
self.exclude_patterns = exclude_patterns or [
'*/tests/*',
'*/test_*',
'*/__pycache__/*',
'*/migrations/*',
'*/venv/*',
'*/env/*',
'*/site-packages/*'
]
# 커버리지 설정
self.cov = coverage.Coverage(
source=[source_dir],
omit=self.exclude_patterns,
branch=True,
include=f'{source_dir}/*'
)
def start_coverage(self):
"""커버리지 측정 시작"""
self.cov.start()
logger.info("커버리지 측정을 시작했습니다.")
def stop_coverage(self):
"""커버리지 측정 중지"""
self.cov.stop()
logger.info("커버리지 측정을 중지했습니다.")
def generate_report(self, output_dir: str = 'coverage_report') -> Dict[str, Any]:
"""
커버리지 리포트 생성
Args:
output_dir (str): 리포트 출력 디렉토리
Returns:
Dict[str, Any]: 커버리지 리포트 데이터
"""
try:
# 출력 디렉토리 생성
os.makedirs(output_dir, exist_ok=True)
# HTML 리포트 생성
html_report_path = os.path.join(output_dir, 'htmlcov')
self.cov.html_report(directory=html_report_path)
# XML 리포트 생성
xml_report_path = os.path.join(output_dir, 'coverage.xml')
self.cov.xml_report(outfile=xml_report_path)
# JSON 리포트 생성
json_report_path = os.path.join(output_dir, 'coverage.json')
self.cov.json_report(outfile=json_report_path)
# 텍스트 리포트 생성
text_report_path = os.path.join(output_dir, 'coverage.txt')
with open(text_report_path, 'w') as f:
self.cov.report(file=f)
# 커버리지 데이터 수집
coverage_data = self._collect_coverage_data()
logger.info(f"커버리지 리포트가 생성되었습니다: {output_dir}")
return coverage_data
except Exception as e:
logger.error(f"커버리지 리포트 생성 중 오류: {str(e)}")
return {}
def _collect_coverage_data(self) -> Dict[str, Any]:
"""커버리지 데이터 수집"""
try:
# 전체 커버리지 통계
total_coverage = self.cov.report()
# 파일별 커버리지 데이터
file_coverage = {}
for filename in self.cov.get_data().measured_files():
if filename.startswith(os.path.abspath(self.source_dir)):
relative_path = os.path.relpath(filename, os.path.abspath(self.source_dir))
file_coverage[relative_path] = self._get_file_coverage(filename)
# 모듈별 커버리지 통계
module_stats = self._get_module_stats()
return {
'total_coverage': total_coverage,
'file_coverage': file_coverage,
'module_stats': module_stats,
'summary': self._generate_summary(total_coverage, file_coverage)
}
except Exception as e:
logger.error(f"커버리지 데이터 수집 중 오류: {str(e)}")
return {}
def _get_file_coverage(self, filename: str) -> Dict[str, Any]:
"""파일별 커버리지 정보"""
try:
analysis = self.cov.analysis2(filename)
executed_lines = analysis[1]
missing_lines = analysis[2]
excluded_lines = analysis[3]
total_lines = len(executed_lines) + len(missing_lines) + len(excluded_lines)
covered_lines = len(executed_lines)
if total_lines > 0:
coverage_percent = (covered_lines / total_lines) * 100
else:
coverage_percent = 100.0
return {
'total_lines': total_lines,
'covered_lines': covered_lines,
'missing_lines': len(missing_lines),
'excluded_lines': len(excluded_lines),
'coverage_percent': coverage_percent,
'missing_line_numbers': list(missing_lines)
}
except Exception as e:
logger.error(f"파일 커버리지 분석 중 오류: {str(e)}")
return {}
def _get_module_stats(self) -> Dict[str, Any]:
"""모듈별 통계"""
try:
module_stats = {}
for filename in self.cov.get_data().measured_files():
if filename.startswith(os.path.abspath(self.source_dir)):
relative_path = os.path.relpath(filename, os.path.abspath(self.source_dir))
module_name = relative_path.replace(os.sep, '.').replace('.py', '')
file_coverage = self._get_file_coverage(filename)
module_stats[module_name] = file_coverage
return module_stats
except Exception as e:
logger.error(f"모듈 통계 수집 중 오류: {str(e)}")
return {}
def _generate_summary(self, total_coverage: float, file_coverage: Dict[str, Any]) -> Dict[str, Any]:
"""커버리지 요약 생성"""
try:
# 파일별 커버리지 통계
coverage_percentages = [data['coverage_percent'] for data in file_coverage.values()]
if coverage_percentages:
avg_coverage = sum(coverage_percentages) / len(coverage_percentages)
min_coverage = min(coverage_percentages)
max_coverage = max(coverage_percentages)
else:
avg_coverage = 0.0
min_coverage = 0.0
max_coverage = 0.0
# 커버리지 등급
if total_coverage >= 90:
grade = 'A'
elif total_coverage >= 80:
grade = 'B'
elif total_coverage >= 70:
grade = 'C'
elif total_coverage >= 60:
grade = 'D'
else:
grade = 'F'
# 낮은 커버리지 파일 식별
low_coverage_files = [
filename for filename, data in file_coverage.items()
if data['coverage_percent'] < 70
]
return {
'total_coverage': total_coverage,
'average_coverage': avg_coverage,
'min_coverage': min_coverage,
'max_coverage': max_coverage,
'grade': grade,
'total_files': len(file_coverage),
'low_coverage_files': low_coverage_files,
'low_coverage_count': len(low_coverage_files)
}
except Exception as e:
logger.error(f"요약 생성 중 오류: {str(e)}")
return {}
def check_coverage_threshold(self, threshold: float = 80.0) -> bool:
"""
커버리지 임계값 확인
Args:
threshold (float): 임계값 (기본값: 80%)
Returns:
bool: 임계값 달성 여부
"""
try:
total_coverage = self.cov.report()
return total_coverage >= threshold
except Exception as e:
logger.error(f"커버리지 임계값 확인 중 오류: {str(e)}")
return False
class CoverageTestRunner:
"""커버리지 테스트 실행기"""
def __init__(self, source_dir: str = 'src', test_dir: str = 'tests'):
"""
커버리지 테스트 실행기 초기화
Args:
source_dir (str): 소스 코드 디렉토리
test_dir (str): 테스트 디렉토리
"""
self.source_dir = source_dir
self.test_dir = test_dir
self.analyzer = CoverageAnalyzer(source_dir)
def run_coverage_tests(self, output_dir: str = 'coverage_report') -> Dict[str, Any]:
"""
커버리지 테스트 실행
Args:
output_dir (str): 리포트 출력 디렉토리
Returns:
Dict[str, Any]: 테스트 결과 및 커버리지 데이터
"""
try:
logger.info("커버리지 테스트를 시작합니다...")
# 커버리지 측정 시작
self.analyzer.start_coverage()
# 테스트 실행
test_result = self._run_tests()
# 커버리지 측정 중지
self.analyzer.stop_coverage()
# 커버리지 리포트 생성
coverage_data = self.analyzer.generate_report(output_dir)
# 결과 통합
result = {
'test_result': test_result,
'coverage_data': coverage_data,
'success': test_result['success'] and self.analyzer.check_coverage_threshold()
}
logger.info("커버리지 테스트가 완료되었습니다.")
return result
except Exception as e:
logger.error(f"커버리지 테스트 실행 중 오류: {str(e)}")
return {'success': False, 'error': str(e)}
def _run_tests(self) -> Dict[str, Any]:
"""테스트 실행"""
try:
import subprocess
# pytest 실행
cmd = [
sys.executable, '-m', 'pytest',
self.test_dir,
'-v',
'--tb=short',
'--maxfail=10'
]
result = subprocess.run(cmd, capture_output=True, text=True)
return {
'success': result.returncode == 0,
'returncode': result.returncode,
'stdout': result.stdout,
'stderr': result.stderr
}
except Exception as e:
logger.error(f"테스트 실행 중 오류: {str(e)}")
return {'success': False, 'error': str(e)}
class CoverageReportGenerator:
"""커버리지 리포트 생성기"""
def __init__(self, coverage_data: Dict[str, Any]):
"""
커버리지 리포트 생성기 초기화
Args:
coverage_data (Dict[str, Any]): 커버리지 데이터
"""
self.coverage_data = coverage_data
def generate_markdown_report(self, output_file: str = 'coverage_report.md'):
"""마크다운 리포트 생성"""
try:
with open(output_file, 'w', encoding='utf-8') as f:
f.write("# 코드 커버리지 리포트\n\n")
# 요약 정보
summary = self.coverage_data.get('summary', {})
f.write("## 요약\n\n")
f.write(f"- **전체 커버리지**: {summary.get('total_coverage', 0):.1f}%\n")
f.write(f"- **평균 커버리지**: {summary.get('average_coverage', 0):.1f}%\n")
f.write(f"- **최소 커버리지**: {summary.get('min_coverage', 0):.1f}%\n")
f.write(f"- **최대 커버리지**: {summary.get('max_coverage', 0):.1f}%\n")
f.write(f"- **등급**: {summary.get('grade', 'F')}\n")
f.write(f"- **총 파일 수**: {summary.get('total_files', 0)}\n")
f.write(f"- **낮은 커버리지 파일 수**: {summary.get('low_coverage_count', 0)}\n\n")
# 파일별 커버리지
f.write("## 파일별 커버리지\n\n")
f.write("| 파일 | 커버리지 | 총 라인 | 커버된 라인 | 누락된 라인 |\n")
f.write("|------|----------|---------|-------------|-------------|\n")
file_coverage = self.coverage_data.get('file_coverage', {})
for filename, data in sorted(file_coverage.items()):
f.write(f"| {filename} | {data['coverage_percent']:.1f}% | "
f"{data['total_lines']} | {data['covered_lines']} | "
f"{data['missing_lines']} |\n")
# 낮은 커버리지 파일
low_coverage_files = summary.get('low_coverage_files', [])
if low_coverage_files:
f.write("\n## 낮은 커버리지 파일 (< 70%)\n\n")
for filename in low_coverage_files:
data = file_coverage.get(filename, {})
f.write(f"- **{filename}**: {data.get('coverage_percent', 0):.1f}%\n")
# 권장사항
f.write("\n## 권장사항\n\n")
if summary.get('total_coverage', 0) < 80:
f.write("- 전체 커버리지를 80% 이상으로 향상시키세요.\n")
if low_coverage_files:
f.write("- 낮은 커버리지 파일에 대한 테스트를 추가하세요.\n")
if summary.get('grade', 'F') in ['D', 'F']:
f.write("- 코드 품질을 개선하기 위해 더 많은 테스트를 작성하세요.\n")
logger.info(f"마크다운 리포트가 생성되었습니다: {output_file}")
except Exception as e:
logger.error(f"마크다운 리포트 생성 중 오류: {str(e)}")
def generate_json_report(self, output_file: str = 'coverage_report.json'):
"""JSON 리포트 생성"""
try:
import json
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(self.coverage_data, f, indent=2, ensure_ascii=False)
logger.info(f"JSON 리포트가 생성되었습니다: {output_file}")
except Exception as e:
logger.error(f"JSON 리포트 생성 중 오류: {str(e)}")
def main():
"""메인 함수"""
# 로깅 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 커버리지 테스트 실행
runner = CoverageTestRunner()
result = runner.run_coverage_tests()
if result['success']:
print("✅ 커버리지 테스트가 성공적으로 완료되었습니다!")
# 리포트 생성
coverage_data = result['coverage_data']
report_generator = CoverageReportGenerator(coverage_data)
report_generator.generate_markdown_report()
report_generator.generate_json_report()
# 요약 출력
summary = coverage_data.get('summary', {})
print(f"\n📊 커버리지 요약:")
print(f" 전체 커버리지: {summary.get('total_coverage', 0):.1f}%")
print(f" 등급: {summary.get('grade', 'F')}")
print(f" 총 파일 수: {summary.get('total_files', 0)}")
print(f" 낮은 커버리지 파일 수: {summary.get('low_coverage_count', 0)}")
else:
print("❌ 커버리지 테스트가 실패했습니다.")
if 'error' in result:
print(f"오류: {result['error']}")
return result['success']
if __name__ == '__main__':
success = main()
sys.exit(0 if success else 1)
"""
평가 엔진 테스트 모듈
종합 평가 엔진의 기능을 검증합니다.
"""
import unittest
import numpy as np
import sys
import os
import tempfile
import shutil
from pathlib import Path
# 프로젝트 루트를 Python 경로에 추가
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.evaluator import Evaluator
from config.evaluation_config import EVALUATION_CONFIG
class TestEvaluator(unittest.TestCase):
"""평가 엔진 테스트"""
def setUp(self):
"""테스트 설정"""
# 임시 디렉토리 생성
self.temp_dir = tempfile.mkdtemp()
# 테스트용 더미 파일 생성
self.create_dummy_files()
# 평가 엔진 초기화
self.evaluator = Evaluator()
def tearDown(self):
"""테스트 정리"""
# 임시 디렉토리 삭제
shutil.rmtree(self.temp_dir)
def create_dummy_files(self):
"""테스트용 더미 파일 생성"""
# 더미 GLB 파일 생성 (실제로는 간단한 텍스트 파일)
self.model_path = os.path.join(self.temp_dir, "test_model.glb")
with open(self.model_path, 'w') as f:
f.write("dummy glb content")
# 더미 이미지 파일 생성 (실제로는 간단한 텍스트 파일)
self.reference_path = os.path.join(self.temp_dir, "test_image.png")
with open(self.reference_path, 'w') as f:
f.write("dummy image content")
def test_evaluator_initialization(self):
"""평가 엔진 초기화 테스트"""
self.assertIsNotNone(self.evaluator)
self.assertIsNotNone(self.evaluator.data_loader)
self.assertIsNotNone(self.evaluator.renderer)
self.assertIsNotNone(self.evaluator.map_2d_calculator)
self.assertIsNotNone(self.evaluator.map_3d_calculator)
self.assertIsNotNone(self.evaluator.chamfer_calculator)
self.assertIsNotNone(self.evaluator.emd_calculator)
self.assertIsNotNone(self.evaluator.class_accuracy_calculator)
def test_calculate_comprehensive_score(self):
"""종합 점수 계산 테스트"""
# 테스트용 메트릭 데이터
metrics = {
'2d_map': 0.8,
'3d_map': 0.7,
'chamfer_distance': 0.01,
'emd': 0.02,
'class_accuracy': 0.85
}
score, details = self.evaluator.calculate_comprehensive_score(metrics)
# 종합 점수는 0과 100 사이의 값이어야 함
self.assertGreaterEqual(score, 0.0)
self.assertLessEqual(score, 100.0)
# 세부사항은 딕셔너리여야 함
self.assertIsInstance(details, dict)
self.assertEqual(len(details), len(metrics))
def test_determine_grade(self):
"""성능 등급 결정 테스트"""
# A등급 테스트
grade_a = self.evaluator.determine_grade(95.0)
self.assertEqual(grade_a, 'A')
# B등급 테스트
grade_b = self.evaluator.determine_grade(85.0)
self.assertEqual(grade_b, 'B')
# C등급 테스트
grade_c = self.evaluator.determine_grade(75.0)
self.assertEqual(grade_c, 'C')
# D등급 테스트
grade_d = self.evaluator.determine_grade(65.0)
self.assertEqual(grade_d, 'D')
# F등급 테스트
grade_f = self.evaluator.determine_grade(50.0)
self.assertEqual(grade_f, 'F')
def test_validate_input_files(self):
"""입력 파일 검증 테스트"""
# 유효한 파일들
valid = self.evaluator._validate_input_files(self.model_path, self.reference_path)
self.assertTrue(valid)
# 존재하지 않는 파일
invalid = self.evaluator._validate_input_files("nonexistent.glb", "nonexistent.png")
self.assertFalse(invalid)
def test_create_3d_ground_truth(self):
"""3D Ground Truth 생성 테스트"""
# 테스트용 3D 모델
model_3d = {
'vertices': np.random.rand(100, 3),
'faces': np.random.randint(0, 100, (50, 3)),
'center': np.array([0, 0, 0]),
'scale': np.array([1, 1, 1]),
'bounding_box': (np.array([-1, -1, -1]), np.array([1, 1, 1]))
}
reference_image = np.random.randint(0, 255, (512, 512, 3), dtype=np.uint8)
ground_truth = self.evaluator._create_3d_ground_truth(model_3d, reference_image)
# Ground Truth는 딕셔너리여야 함
self.assertIsInstance(ground_truth, dict)
self.assertIn('vertices', ground_truth)
self.assertIn('faces', ground_truth)
self.assertIn('classes', ground_truth)
self.assertIn('bounding_boxes', ground_truth)
def test_generate_report(self):
"""리포트 생성 테스트"""
# 테스트용 결과 데이터
results = {
'comprehensive_score': 85.5,
'grade': 'B',
'evaluation_timestamp': '2024-01-01T00:00:00',
'model_path': self.model_path,
'reference_path': self.reference_path,
'metrics': {
'2d_map': 0.8,
'3d_map': 0.7,
'chamfer_distance': 0.01,
'emd': 0.02,
'class_accuracy': 0.85
},
'score_details': {
'2d_map': {
'raw_value': 0.8,
'normalized_score': 80.0,
'weight': 0.25,
'weighted_score': 20.0,
'threshold': 0.8
}
}
}
# HTML 리포트 생성
html_report = self.evaluator.generate_report(results)
self.assertIsInstance(html_report, str)
self.assertIn('<!DOCTYPE html>', html_report)
self.assertIn('3D 객체인식 평가 결과', html_report)
# JSON 리포트 생성
self.evaluator.config['output']['report_format'] = 'json'
json_report = self.evaluator.generate_report(results)
self.assertIsInstance(json_report, str)
# 텍스트 리포트 생성
self.evaluator.config['output']['report_format'] = 'txt'
text_report = self.evaluator.generate_report(results)
self.assertIsInstance(text_report, str)
self.assertIn('TRELLIS 모델 평가 결과', text_report)
class TestConfiguration(unittest.TestCase):
"""설정 테스트"""
def test_evaluation_config_structure(self):
"""평가 설정 구조 테스트"""
config = EVALUATION_CONFIG
# 필수 키들이 존재하는지 확인
required_keys = [
'weights', 'thresholds', 'rendering', 'pointcloud',
'classification', 'grade_thresholds', 'iou_thresholds',
'output', 'file_paths', 'logging', 'performance', 'validation'
]
for key in required_keys:
self.assertIn(key, config)
def test_weights_sum(self):
"""가중치 합이 1.0인지 테스트"""
weights = EVALUATION_CONFIG['weights']
weight_sum = sum(weights.values())
# 가중치 합은 1.0에 가까워야 함
self.assertAlmostEqual(weight_sum, 1.0, places=6)
def test_thresholds_range(self):
"""임계값이 유효한 범위인지 테스트"""
thresholds = EVALUATION_CONFIG['thresholds']
for metric, threshold in thresholds.items():
# 임계값은 0과 1 사이의 값이어야 함
self.assertGreaterEqual(threshold, 0.0)
self.assertLessEqual(threshold, 1.0)
def test_grade_thresholds_order(self):
"""등급 임계값이 올바른 순서인지 테스트"""
grade_thresholds = EVALUATION_CONFIG['grade_thresholds']
# 등급 임계값은 내림차순이어야 함
values = list(grade_thresholds.values())
self.assertEqual(values, sorted(values, reverse=True))
if __name__ == '__main__':
# 테스트 실행
unittest.main(verbosity=2)
"""
평가 지표 테스트 모듈
각 평가 지표의 정확성을 검증합니다.
"""
import unittest
import numpy as np
import sys
import os
from pathlib import Path
# 프로젝트 루트를 Python 경로에 추가
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.metrics.map_2d import Map2DCalculator
from src.metrics.map_3d import Map3DCalculator
from src.metrics.chamfer_distance import ChamferDistanceCalculator
from src.metrics.emd import EMCalculator
from src.metrics.class_accuracy import ClassAccuracyCalculator
class TestMap2DCalculator(unittest.TestCase):
"""2D mAP 계산기 테스트"""
def setUp(self):
"""테스트 설정"""
self.calculator = Map2DCalculator()
# 테스트용 더미 이미지 생성
self.test_image = np.random.randint(0, 255, (512, 512, 3), dtype=np.uint8)
self.rendered_images = [
np.random.randint(0, 255, (512, 512, 3), dtype=np.uint8)
for _ in range(4)
]
def test_compute_iou(self):
"""IoU 계산 테스트"""
box1 = np.array([10, 10, 50, 50])
box2 = np.array([30, 30, 70, 70])
iou = self.calculator.compute_iou(box1, box2)
# IoU는 0과 1 사이의 값이어야 함
self.assertGreaterEqual(iou, 0.0)
self.assertLessEqual(iou, 1.0)
def test_compute_iou_no_overlap(self):
"""겹치지 않는 박스의 IoU 테스트"""
box1 = np.array([0, 0, 10, 10])
box2 = np.array([20, 20, 30, 30])
iou = self.calculator.compute_iou(box1, box2)
# 겹치지 않는 경우 IoU는 0이어야 함
self.assertEqual(iou, 0.0)
def test_compute_iou_identical(self):
"""동일한 박스의 IoU 테스트"""
box1 = np.array([10, 10, 50, 50])
box2 = np.array([10, 10, 50, 50])
iou = self.calculator.compute_iou(box1, box2)
# 동일한 박스의 IoU는 1이어야 함
self.assertEqual(iou, 1.0)
def test_calculate_2d_map(self):
"""2D mAP 계산 테스트"""
map_score = self.calculator.calculate_2d_map(
self.rendered_images, self.test_image
)
# mAP는 0과 1 사이의 값이어야 함
self.assertGreaterEqual(map_score, 0.0)
self.assertLessEqual(map_score, 1.0)
class TestMap3DCalculator(unittest.TestCase):
"""3D mAP 계산기 테스트"""
def setUp(self):
"""테스트 설정"""
self.calculator = Map3DCalculator()
# 테스트용 더미 3D 모델 생성
self.test_model = {
'vertices': np.random.rand(100, 3),
'faces': np.random.randint(0, 100, (50, 3)),
'center': np.array([0, 0, 0]),
'scale': np.array([1, 1, 1]),
'bounding_box': (np.array([-1, -1, -1]), np.array([1, 1, 1]))
}
self.ground_truth_3d = {
'vertices': np.random.rand(100, 3),
'faces': np.random.randint(0, 100, (50, 3)),
'classes': ['chair'],
'bounding_boxes': [{
'center': np.array([0, 0, 0]),
'size': np.array([1, 1, 1]),
'min_coords': np.array([-0.5, -0.5, -0.5]),
'max_coords': np.array([0.5, 0.5, 0.5]),
'volume': 1.0,
'confidence': 1.0
}]
}
def test_compute_3d_iou(self):
"""3D IoU 계산 테스트"""
box1 = {
'min_coords': np.array([0, 0, 0]),
'max_coords': np.array([2, 2, 2])
}
box2 = {
'min_coords': np.array([1, 1, 1]),
'max_coords': np.array([3, 3, 3])
}
iou = self.calculator.compute_3d_iou(box1, box2)
# 3D IoU는 0과 1 사이의 값이어야 함
self.assertGreaterEqual(iou, 0.0)
self.assertLessEqual(iou, 1.0)
def test_calculate_3d_map(self):
"""3D mAP 계산 테스트"""
map_score = self.calculator.calculate_3d_map(
self.test_model, self.ground_truth_3d
)
# mAP는 0과 1 사이의 값이어야 함
self.assertGreaterEqual(map_score, 0.0)
self.assertLessEqual(map_score, 1.0)
class TestChamferDistanceCalculator(unittest.TestCase):
"""Chamfer Distance 계산기 테스트"""
def setUp(self):
"""테스트 설정"""
self.calculator = ChamferDistanceCalculator()
# 테스트용 더미 3D 모델 생성
self.model_3d = {
'vertices': np.random.rand(100, 3),
'faces': np.random.randint(0, 100, (50, 3))
}
self.reference_3d = {
'vertices': np.random.rand(100, 3),
'faces': np.random.randint(0, 100, (50, 3))
}
def test_chamfer_distance_naive(self):
"""Chamfer Distance 계산 테스트"""
pc1 = np.random.rand(50, 3)
pc2 = np.random.rand(50, 3)
distance = self.calculator.chamfer_distance_naive(pc1, pc2)
# Chamfer Distance는 0 이상의 값이어야 함
self.assertGreaterEqual(distance, 0.0)
def test_chamfer_distance_identical(self):
"""동일한 점군의 Chamfer Distance 테스트"""
pc1 = np.random.rand(50, 3)
pc2 = pc1.copy()
distance = self.calculator.chamfer_distance_naive(pc1, pc2)
# 동일한 점군의 Chamfer Distance는 0에 가까워야 함
self.assertLess(distance, 1e-6)
def test_calculate_chamfer_distance(self):
"""Chamfer Distance 계산 테스트"""
distance = self.calculator.calculate_chamfer_distance(
self.model_3d, self.reference_3d
)
# Chamfer Distance는 0 이상의 값이어야 함
self.assertGreaterEqual(distance, 0.0)
class TestEMCalculator(unittest.TestCase):
"""EMD 계산기 테스트"""
def setUp(self):
"""테스트 설정"""
self.calculator = EMCalculator()
# 테스트용 더미 3D 모델 생성
self.model_3d = {
'vertices': np.random.rand(100, 3),
'faces': np.random.randint(0, 100, (50, 3))
}
self.reference_3d = {
'vertices': np.random.rand(100, 3),
'faces': np.random.randint(0, 100, (50, 3))
}
def test_earth_movers_distance(self):
"""EMD 계산 테스트"""
pc1 = np.random.rand(50, 3)
pc2 = np.random.rand(50, 3)
emd = self.calculator.earth_movers_distance(pc1, pc2)
# EMD는 0 이상의 값이어야 함
self.assertGreaterEqual(emd, 0.0)
def test_earth_movers_distance_identical(self):
"""동일한 점군의 EMD 테스트"""
pc1 = np.random.rand(50, 3)
pc2 = pc1.copy()
emd = self.calculator.earth_movers_distance(pc1, pc2)
# 동일한 점군의 EMD는 0에 가까워야 함
self.assertLess(emd, 1e-6)
def test_calculate_emd(self):
"""EMD 계산 테스트"""
emd = self.calculator.calculate_emd(self.model_3d, self.reference_3d)
# EMD는 0 이상의 값이어야 함
self.assertGreaterEqual(emd, 0.0)
class TestClassAccuracyCalculator(unittest.TestCase):
"""클래스 정확도 계산기 테스트"""
def setUp(self):
"""테스트 설정"""
self.calculator = ClassAccuracyCalculator()
# 테스트용 더미 3D 모델 생성
self.model_3d = {
'vertices': np.random.rand(100, 3),
'faces': np.random.randint(0, 100, (50, 3))
}
self.ground_truth_labels = {
'classes': ['chair', 'table'],
'labels': ['chair', 'table']
}
def test_classify_objects(self):
"""객체 분류 테스트"""
classes = self.calculator.classify_objects(self.model_3d)
# 분류 결과는 리스트여야 함
self.assertIsInstance(classes, list)
self.assertGreater(len(classes), 0)
def test_calculate_class_accuracy(self):
"""클래스 정확도 계산 테스트"""
accuracy = self.calculator.calculate_class_accuracy(
self.model_3d, self.ground_truth_labels
)
# 정확도는 0과 1 사이의 값이어야 함
self.assertGreaterEqual(accuracy, 0.0)
self.assertLessEqual(accuracy, 1.0)
def test_compute_class_wise_metrics(self):
"""클래스별 메트릭 계산 테스트"""
predictions = ['chair', 'table', 'chair']
ground_truth = ['chair', 'table', 'sofa']
metrics = self.calculator.compute_class_wise_metrics(predictions, ground_truth)
# 메트릭은 딕셔너리여야 함
self.assertIsInstance(metrics, dict)
self.assertIn('overall_accuracy', metrics)
if __name__ == '__main__':
# 테스트 실행
unittest.main(verbosity=2)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment