Files
lora-analyzer/lora_analyzer.py

455 lines
16 KiB
Python

"""
LoRA Analyzer - Core Module
Analiza archivos LoRA (.safetensors, .pt, .ckpt) y extrae información técnica
"""
import os
import json
from pathlib import Path
from typing import Dict, Any, List, Tuple
import numpy as np
try:
from safetensors import safe_open
SAFETENSORS_AVAILABLE = True
except ImportError:
SAFETENSORS_AVAILABLE = False
try:
import torch
TORCH_AVAILABLE = True
except ImportError:
TORCH_AVAILABLE = False
class LoRAAnalyzer:
"""Analizador de archivos LoRA"""
def __init__(self, file_path: str):
self.file_path = Path(file_path)
self.file_name = self.file_path.name
self.file_size = self.file_path.stat().st_size
self.extension = self.file_path.suffix.lower()
if not self.file_path.exists():
raise FileNotFoundError(f"El archivo no existe: {file_path}")
def analyze(self) -> Dict[str, Any]:
"""Análisis completo del archivo LoRA"""
result = {
"file_info": self._get_file_info(),
"architecture": {},
"metadata": {},
"weights_analysis": {},
"recommendations": []
}
# Cargar y analizar según el formato
if self.extension == ".safetensors":
result.update(self._analyze_safetensors())
elif self.extension in [".pt", ".pth", ".ckpt"]:
result.update(self._analyze_pytorch())
else:
raise ValueError(f"Formato no soportado: {self.extension}")
# Generar recomendaciones
result["recommendations"] = self._generate_recommendations(result)
return result
def _get_file_info(self) -> Dict[str, Any]:
"""Información básica del archivo"""
return {
"nombre": self.file_name,
"ruta": str(self.file_path.absolute()),
"tamaño_mb": round(self.file_size / (1024 * 1024), 2),
"extension": self.extension
}
def _analyze_safetensors(self) -> Dict[str, Any]:
"""Analiza archivos .safetensors"""
if not SAFETENSORS_AVAILABLE:
return {"error": "Instala 'safetensors': pip install safetensors"}
result = {
"architecture": {},
"metadata": {},
"weights_analysis": {}
}
with safe_open(self.file_path, framework="pt") as f:
# Obtener metadatos
metadata = f.metadata()
if metadata:
result["metadata"] = self._parse_metadata(metadata)
# Analizar capas y pesos
keys = list(f.keys())
result["architecture"] = self._analyze_architecture(keys, f)
result["weights_analysis"] = self._analyze_weights(keys, f)
return result
def _analyze_pytorch(self) -> Dict[str, Any]:
"""Analiza archivos .pt, .pth, .ckpt"""
if not TORCH_AVAILABLE:
return {"error": "Instala 'torch': pip install torch"}
result = {
"architecture": {},
"metadata": {},
"weights_analysis": {}
}
try:
state_dict = torch.load(self.file_path, map_location="cpu")
# Extraer metadatos si existen
if isinstance(state_dict, dict):
if "metadata" in state_dict:
result["metadata"] = state_dict["metadata"]
# Obtener los pesos (pueden estar en diferentes claves)
weights = state_dict.get("state_dict", state_dict)
keys = list(weights.keys())
result["architecture"] = self._analyze_architecture_dict(keys, weights)
result["weights_analysis"] = self._analyze_weights_dict(keys, weights)
except Exception as e:
result["error"] = f"Error al cargar archivo: {str(e)}"
return result
def _parse_metadata(self, metadata: Dict) -> Dict[str, Any]:
"""Parsea y organiza los metadatos"""
parsed = {}
# Buscar información común
common_keys = [
"ss_network_module", "ss_network_dim", "ss_network_alpha",
"ss_learning_rate", "ss_text_encoder_lr", "ss_unet_lr",
"ss_num_epochs", "ss_num_train_images", "ss_num_batches_per_epoch",
"ss_batch_size", "ss_base_model", "ss_sd_model_name",
"ss_resolution", "ss_clip_skip", "ss_max_train_steps",
"ss_dataset_dirs", "ss_training_comment"
]
for key in common_keys:
if key in metadata:
value = metadata[key]
# Intentar parsear JSON si es string
if isinstance(value, str):
try:
value = json.loads(value)
except:
pass
parsed[key] = value
# Agregar otros metadatos
for key, value in metadata.items():
if key not in parsed:
parsed[key] = value
return parsed
def _analyze_architecture(self, keys: List[str], tensor_file) -> Dict[str, Any]:
"""Analiza la arquitectura del LoRA desde safetensors"""
arch = {
"total_layers": len(keys),
"layer_details": [],
"rank_info": {},
"alpha_info": {},
"module_types": set()
}
# Analizar cada capa
for key in keys:
tensor = tensor_file.get_tensor(key)
shape = tensor.shape
layer_info = {
"name": key,
"shape": list(shape),
"num_params": np.prod(shape)
}
# Detectar rank (típicamente la dimensión más pequeña en matrices LoRA)
if len(shape) >= 2:
potential_rank = min(shape)
layer_info["potential_rank"] = int(potential_rank)
# Identificar tipo de módulo
if "lora_up" in key or "lora_down" in key:
module_type = "lora"
elif "alpha" in key:
module_type = "alpha"
else:
module_type = "other"
layer_info["type"] = module_type
arch["module_types"].add(module_type)
arch["layer_details"].append(layer_info)
arch["module_types"] = list(arch["module_types"])
# Calcular estadísticas de rank
ranks = [l["potential_rank"] for l in arch["layer_details"]
if "potential_rank" in l]
if ranks:
arch["rank_info"] = {
"detected_ranks": list(set(ranks)),
"most_common_rank": max(set(ranks), key=ranks.count),
"min_rank": min(ranks),
"max_rank": max(ranks)
}
return arch
def _analyze_architecture_dict(self, keys: List[str], weights: Dict) -> Dict[str, Any]:
"""Analiza la arquitectura del LoRA desde diccionario PyTorch"""
arch = {
"total_layers": len(keys),
"layer_details": [],
"rank_info": {},
"module_types": set()
}
for key in keys:
tensor = weights[key]
if hasattr(tensor, 'shape'):
shape = tensor.shape
else:
continue
layer_info = {
"name": key,
"shape": list(shape),
"num_params": np.prod(shape)
}
if len(shape) >= 2:
potential_rank = min(shape)
layer_info["potential_rank"] = int(potential_rank)
# Identificar tipo de módulo
if "lora" in key.lower():
module_type = "lora"
elif "alpha" in key.lower():
module_type = "alpha"
else:
module_type = "other"
layer_info["type"] = module_type
arch["module_types"].add(module_type)
arch["layer_details"].append(layer_info)
arch["module_types"] = list(arch["module_types"])
ranks = [l["potential_rank"] for l in arch["layer_details"]
if "potential_rank" in l]
if ranks:
arch["rank_info"] = {
"detected_ranks": list(set(ranks)),
"most_common_rank": max(set(ranks), key=ranks.count),
"min_rank": min(ranks),
"max_rank": max(ranks)
}
return arch
def _analyze_weights(self, keys: List[str], tensor_file) -> Dict[str, Any]:
"""Analiza los pesos del modelo"""
analysis = {
"total_parameters": 0,
"weight_statistics": {},
"layer_analysis": []
}
for key in keys[:10]: # Limitar para rendimiento
try:
tensor = tensor_file.get_tensor(key)
# Convertir a numpy para análisis
if hasattr(tensor, 'numpy'):
weights = tensor.numpy()
else:
weights = np.array(tensor)
stats = {
"layer": key,
"mean": float(np.mean(weights)),
"std": float(np.std(weights)),
"min": float(np.min(weights)),
"max": float(np.max(weights)),
"non_zero_ratio": float(np.count_nonzero(weights) / weights.size)
}
analysis["layer_analysis"].append(stats)
analysis["total_parameters"] += weights.size
except:
continue
return analysis
def _analyze_weights_dict(self, keys: List[str], weights: Dict) -> Dict[str, Any]:
"""Analiza los pesos desde diccionario"""
analysis = {
"total_parameters": 0,
"layer_analysis": []
}
for key in keys[:10]:
try:
tensor = weights[key]
if hasattr(tensor, 'numpy'):
w = tensor.numpy()
else:
w = np.array(tensor)
stats = {
"layer": key,
"mean": float(np.mean(w)),
"std": float(np.std(w)),
"min": float(np.min(w)),
"max": float(np.max(w)),
"non_zero_ratio": float(np.count_nonzero(w) / w.size)
}
analysis["layer_analysis"].append(stats)
analysis["total_parameters"] += w.size
except:
continue
return analysis
def _generate_recommendations(self, analysis: Dict) -> List[str]:
"""Genera recomendaciones basadas en el análisis"""
recommendations = []
# Recomendaciones basadas en rank
if "architecture" in analysis and "rank_info" in analysis["architecture"]:
rank_info = analysis["architecture"]["rank_info"]
if rank_info:
common_rank = rank_info.get("most_common_rank", 0)
if common_rank < 16:
recommendations.append(
f"Rank bajo ({common_rank}): Bueno para eficiencia. "
"Para más detalles, prueba rank 32-64."
)
elif common_rank > 128:
recommendations.append(
f"Rank alto ({common_rank}): Mucho detalle pero más pesado. "
"Considera reducir a 64-128 para mejor balance."
)
else:
recommendations.append(
f"Rank óptimo ({common_rank}): Buen balance entre detalle y eficiencia."
)
# Recomendaciones basadas en metadatos
if "metadata" in analysis and analysis["metadata"]:
metadata = analysis["metadata"]
if "ss_num_train_images" in metadata:
num_images = metadata["ss_num_train_images"]
try:
num_images = int(num_images)
if num_images < 20:
recommendations.append(
f"Dataset pequeño ({num_images} imágenes): "
"Considera aumentar a 30-50 imágenes para mejor generalización."
)
elif num_images > 100:
recommendations.append(
f"Dataset grande ({num_images} imágenes): "
"Excelente para capturar variaciones."
)
except:
pass
if "ss_learning_rate" in metadata:
lr = metadata["ss_learning_rate"]
recommendations.append(f"Learning rate usado: {lr}")
if not recommendations:
recommendations.append(
"Sube el archivo LoRA para obtener recomendaciones específicas."
)
return recommendations
def format_analysis_report(analysis: Dict) -> str:
"""Formatea el análisis como texto legible"""
report = []
report.append("=" * 70)
report.append("REPORTE DE ANÁLISIS LORA")
report.append("=" * 70)
report.append("")
# Información del archivo
if "file_info" in analysis:
report.append("📁 INFORMACIÓN DEL ARCHIVO:")
info = analysis["file_info"]
report.append(f" Nombre: {info.get('nombre', 'N/A')}")
report.append(f" Tamaño: {info.get('tamaño_mb', 0)} MB")
report.append(f" Formato: {info.get('extension', 'N/A')}")
report.append("")
# Arquitectura
if "architecture" in analysis and analysis["architecture"]:
report.append("🏗️ ARQUITECTURA:")
arch = analysis["architecture"]
report.append(f" Total de capas: {arch.get('total_layers', 0)}")
if "rank_info" in arch and arch["rank_info"]:
rank = arch["rank_info"]
report.append(f" Rank más común: {rank.get('most_common_rank', 'N/A')}")
report.append(f" Rango de ranks: {rank.get('min_rank', 'N/A')} - {rank.get('max_rank', 'N/A')}")
report.append("")
# Metadatos
if "metadata" in analysis and analysis["metadata"]:
report.append("⚙️ METADATOS DE ENTRENAMIENTO:")
meta = analysis["metadata"]
key_info = {
"ss_base_model": "Modelo base",
"ss_network_dim": "Network dim (rank)",
"ss_network_alpha": "Alpha",
"ss_learning_rate": "Learning rate",
"ss_num_epochs": "Épocas",
"ss_num_train_images": "Imágenes de entrenamiento",
"ss_batch_size": "Batch size",
"ss_resolution": "Resolución"
}
for key, label in key_info.items():
if key in meta:
report.append(f" {label}: {meta[key]}")
report.append("")
# Análisis de pesos
if "weights_analysis" in analysis and analysis["weights_analysis"]:
weights = analysis["weights_analysis"]
if "total_parameters" in weights:
report.append("📊 ANÁLISIS DE PESOS:")
report.append(f" Total parámetros: {weights['total_parameters']:,}")
report.append("")
# Recomendaciones
if "recommendations" in analysis and analysis["recommendations"]:
report.append("💡 RECOMENDACIONES:")
for i, rec in enumerate(analysis["recommendations"], 1):
report.append(f" {i}. {rec}")
report.append("")
report.append("=" * 70)
return "\n".join(report)