lda_analyze / work.py
daswer123's picture
Upload 3 files
f147e1e verified
import pandas as pd
import numpy as np
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, f_classif
import matplotlib.pyplot as plt
import seaborn as sns
import logging
import os
from datetime import datetime
from typing import Dict, Tuple, List, Optional, Any
import xlsxwriter
class LDAAnalyzer:
"""
Класс для выполнения линейного дискриминантного анализа (LDA)
с расширенной функциональностью и форматированным выводом результатов
"""
def __init__(self, input_file: str, target_column: int):
"""
Инициализация анализатора LDA
Args:
input_file (str): Путь к входному файлу Excel
target_column (int): Номер столбца для классификации
"""
self.input_file = input_file
self.target_column = target_column
self.data = None
self.X = None
self.y = None
self.X_transformed = None
self.lda = None
self.scaler = StandardScaler()
self.label_encoder = LabelEncoder()
self.feature_names = None
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('lda_analysis.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
# Цветовая схема для визуализации
self.colors = ['lightblue', 'green', 'purple', 'yellow',
'red', 'orange', 'cyan', 'brown', 'pink']
self.logger.info(f"Инициализация LDA анализатора с файлом: {input_file}")
def validate_data(self) -> None:
"""Валидация входных данных"""
if self.data is None:
raise ValueError("Данные не загружены")
# Проверка размерности
if self.data.shape[0] < 30:
raise ValueError("Недостаточно наблюдений (минимум 30)")
# Проверка пропущенных значений
if self.data.isnull().any().any():
raise ValueError("Обнаружены пропущенные значения")
# Проверка типов данных
numeric_cols = self.data.select_dtypes(include=[np.number]).columns
if len(numeric_cols) < self.data.shape[1] - 1: # -1 для целевой переменной
raise ValueError("Обнаружены нечисловые признаки")
def load_data(self) -> None:
"""Загрузка данных из Excel файла"""
try:
self.logger.info("Загрузка данных...")
# Загрузка данных
self.data = pd.read_excel(self.input_file)
# Преобразование имен колонок
self.data.columns = [str(col) for col in self.data.columns]
# Попытка преобразовать все колонки (кроме целевой) в числовой формат
for col in self.data.columns:
if self.data.columns.get_loc(col) != self.target_column:
try:
self.data[col] = pd.to_numeric(self.data[col], errors='coerce')
except Exception as e:
self.logger.warning(f"Не удалось преобразовать колонку {col} в числовой формат: {str(e)}")
self.validate_data()
self.logger.info(f"Данные загружены. Размерность: {self.data.shape}")
except Exception as e:
self.logger.error(f"Ошибка при загрузке данных: {str(e)}")
raise
def prepare_data(self) -> None:
"""Подготовка данных для анализа"""
try:
self.logger.info("Подготовка данных...")
# Разделение на признаки и целевую переменную
X = self.data.drop(self.data.columns[self.target_column], axis=1)
y = self.data.iloc[:, self.target_column]
# Преобразование имен колонок в строки
X.columns = X.columns.astype(str)
# Кодирование меток классов
self.y = self.label_encoder.fit_transform(y) + 1
# Преобразование в числовой формат
X = X.apply(pd.to_numeric, errors='coerce')
# Проверка на пропущенные значения после преобразования
if X.isnull().any().any():
raise ValueError("После преобразования в числовой формат появились пропущенные значения")
# Стандартизация признаков
self.X = self.scaler.fit_transform(X)
# Проверка количества классов и наблюдений в каждом классе
class_counts = pd.Series(self.y).value_counts()
if (class_counts < 5).any():
self.logger.warning("Некоторые классы имеют менее 5 наблюдений")
self.logger.info(f"Данные подготовлены. X: {self.X.shape}, y: {self.y.shape}")
self.logger.info(f"Количество классов: {len(np.unique(self.y))}")
except Exception as e:
self.logger.error(f"Ошибка при подготовке данных: {str(e)}")
raise
def perform_lda(self) -> None:
"""Выполнение LDA анализа"""
try:
self.logger.info("Выполнение LDA анализа...")
# Инициализация и обучение LDA
self.lda = LinearDiscriminantAnalysis(solver='svd')
self.X_transformed = self.lda.fit_transform(self.X, self.y)
# Оценка качества модели
accuracy = self.lda.score(self.X, self.y)
self.logger.info(f"Общая точность модели: {accuracy:.3f}")
except Exception as e:
self.logger.error(f"Ошибка при выполнении LDA: {str(e)}")
raise
def create_confusion_matrix(self) -> Tuple[pd.DataFrame, List[List[str]], float]:
"""
Создание матрицы ошибок и расчет процентов классификации
Returns:
tuple: (матрица ошибок, проценты, общая точность)
"""
try:
self.logger.info("Создание матрицы ошибок...")
# Получение предсказаний
y_pred = self.lda.predict(self.X)
# Создание матрицы ошибок
classes = sorted(np.unique(self.y))
n_classes = len(classes)
confusion_matrix = np.zeros((n_classes, n_classes))
for i in range(len(self.y)):
confusion_matrix[self.y[i]-1][y_pred[i]-1] += 1
# Создание DataFrame для матрицы ошибок
columns = [f"{i+1}.00" for i in range(n_classes)]
index = [f"{i+1}.00" for i in range(n_classes)]
df_confusion = pd.DataFrame(confusion_matrix, columns=columns, index=index)
# Добавление столбца "Всего"
df_confusion['Всего'] = df_confusion.sum(axis=1)
# Расчет процентов
percentages = np.zeros((n_classes, n_classes + 1)) # +1 для столбца "Всего"
for i in range(n_classes):
row_sum = confusion_matrix[i].sum()
if row_sum > 0:
percentages[i, :-1] = (confusion_matrix[i] / row_sum) * 100
percentages[i, -1] = 100.0
# Форматирование процентов
percentage_rows = []
for row in percentages:
formatted_row = [f"{x:.1f}" for x in row]
percentage_rows.append(formatted_row)
# Расчет общей точности
accuracy = (np.sum(np.diag(confusion_matrix)) / np.sum(confusion_matrix)) * 100
self.logger.info(f"Процент правильной классификации: {accuracy:.1f}%")
return df_confusion, percentage_rows, accuracy
except Exception as e:
self.logger.error(f"Ошибка при создании матрицы ошибок: {str(e)}")
raise
def get_coefficients(self) -> pd.DataFrame:
"""
Получение коэффициентов дискриминантных функций
Returns:
pd.DataFrame: таблица коэффициентов
"""
try:
self.logger.info("Получение коэффициентов...")
# Получение коэффициентов и размерностей
n_features = self.X.shape[1]
n_classes = len(np.unique(self.y))
n_components = min(n_classes - 1, n_features)
# Создание списка имен переменных
var_names = [f"VAR{str(i+1).zfill(5)}" for i in range(n_features)]
# Создание DataFrame с коэффициентами
coef_data = []
for i in range(n_components):
row_data = {}
for j, var_name in enumerate(var_names):
row_data[var_name] = self.lda.coef_[i][j]
coef_data.append(row_data)
df_coef = pd.DataFrame(coef_data, index=[f"Функция {i+1}" for i in range(n_components)])
# Добавление константы (intercept)
const_data = {}
for j, var_name in enumerate(var_names):
const_data[var_name] = self.lda.intercept_[j] if j < len(self.lda.intercept_) else 0.0
const_df = pd.DataFrame([const_data], index=['Константа'])
# Объединение коэффициентов и константы
df_coef = pd.concat([df_coef, const_df])
# Округление значений
df_coef = df_coef.round(3)
self.logger.info("Коэффициенты получены")
return df_coef
except Exception as e:
self.logger.error(f"Ошибка при получении коэффициентов: {str(e)}")
raise
def create_visualization(self) -> plt.Figure:
"""
Создание визуализации результатов
Returns:
plt.Figure: объект графика
"""
try:
self.logger.info("Создание визуализации...")
fig = plt.figure(figsize=(12, 8))
# Построение точек для каждого класса
for class_num in np.unique(self.y):
mask = self.y == class_num
plt.scatter(
self.X_transformed[mask, 0],
self.X_transformed[mask, 1] if self.X_transformed.shape[1] > 1
else np.zeros_like(self.X_transformed[mask, 0]),
c=[self.colors[(class_num-1) % len(self.colors)]],
label=f'Группа {class_num}',
alpha=0.7
)
# Добавление центроидов
centroid = np.mean(self.X_transformed[mask, :2], axis=0)
plt.scatter(
centroid[0],
centroid[1] if self.X_transformed.shape[1] > 1 else 0,
c='black',
marker='s',
s=100
)
plt.annotate(
f'{class_num}',
(centroid[0], centroid[1]),
xytext=(5, 5),
textcoords='offset points',
fontsize=10,
bbox=dict(facecolor='white', edgecolor='none', alpha=0.7)
)
plt.xlabel('Первая каноническая функция')
plt.ylabel('Вторая каноническая функция')
plt.title('Канонические дискриминантные функции')
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.grid(True, alpha=0.3)
plt.tight_layout()
self.logger.info("Визуализация создана")
return fig
except Exception as e:
self.logger.error(f"Ошибка при создании визуализации: {str(e)}")
raise
def save_results(self, output_dir: str) -> None:
"""
Сохранение всех результатов анализа
Args:
output_dir (str): директория для сохранения результатов
"""
try:
self.logger.info(f"Сохранение результатов в {output_dir}...")
# Создание директории если её нет
os.makedirs(output_dir, exist_ok=True)
# Получение результатов
confusion_matrix, percentages, accuracy = self.create_confusion_matrix()
coefficients = self.get_coefficients()
# Сохранение в Excel
excel_path = os.path.join(output_dir, 'lda_results.xlsx')
with pd.ExcelWriter(excel_path, engine='xlsxwriter') as writer:
workbook = writer.book
# Форматы для Excel
header_format = workbook.add_format({
'bold': True,
'align': 'center',
'valign': 'vcenter',
'bg_color': '#D9D9D9',
'border': 1
})
cell_format = workbook.add_format({
'align': 'center',
'border': 1
})
number_format = workbook.add_format({
'align': 'center',
'border': 1,
'num_format': '0.000'
})
# 1. Матрица классификации
worksheet1 = workbook.add_worksheet('Матрица классификации')
# Записываем заголовки
headers = ['Исходный', 'Количество'] + \
[f'{i+1}.00' for i in range(len(confusion_matrix.columns)-1)] + \
['Всего']
for col, header in enumerate(headers):
worksheet1.write(0, col, header, header_format)
worksheet1.set_column(col, col, 15)
# Записываем данные
for i, (index, row) in enumerate(confusion_matrix.iterrows()):
worksheet1.write(i+1, 0, index, cell_format)
worksheet1.write(i+1, 1, row['Всего'], cell_format)
for j, val in enumerate(row):
worksheet1.write(i+1, j+2, val, cell_format)
# 2. Проценты классификации
worksheet2 = workbook.add_worksheet('Проценты')
# Заголовки
for col, header in enumerate(headers):
worksheet2.write(0, col, header, header_format)
worksheet2.set_column(col, col, 15)
# Данные процентов
for i, row in enumerate(percentages):
worksheet2.write(i+1, 0, f"{i+1}.00", cell_format)
worksheet2.write(i+1, 1, confusion_matrix.iloc[i]['Всего'], cell_format)
for j, val in enumerate(row):
worksheet2.write(i+1, j+2, float(val.replace(',', '.')), number_format)
# Примечание
note_row = len(percentages) + 2
worksheet2.write(
note_row, 0,
f'* Примечание: {accuracy:.1f}% исходных сгруппированных наблюдений '
f'классифицированы правильно.',
workbook.add_format({'bold': True})
)
# 3. Коэффициенты функций
worksheet3 = workbook.add_worksheet('Коэффициенты')
# Записываем заголовки коэффициентов
worksheet3.write(0, 0, 'Переменная', header_format)
for i, col in enumerate(coefficients.columns):
worksheet3.write(0, i+1, col, header_format)
worksheet3.set_column(i+1, i+1, 15)
# Записываем данные коэффициентов
for i, (index, row) in enumerate(coefficients.iterrows()):
worksheet3.write(i+1, 0, index, cell_format)
for j, val in enumerate(row):
worksheet3.write(i+1, j+1, val, number_format)
# Добавляем примечание к коэффициентам
worksheet3.write(
len(coefficients)+1, 0,
'*Нестандартизованные коэффициенты',
workbook.add_format({'bold': True, 'italic': True})
)
# Сохранение визуализации
fig = self.create_visualization()
fig.savefig(
os.path.join(output_dir, 'lda_visualization.png'),
bbox_inches='tight',
dpi=300
)
plt.close(fig)
self.logger.info("Результаты успешно сохранены")
except Exception as e:
self.logger.error(f"Ошибка при сохранении результатов: {str(e)}")
raise