Spaces:
Sleeping
Sleeping
import pandas as pd | |
import numpy as np | |
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis | |
from sklearn.preprocessing import StandardScaler, LabelEncoder | |
from sklearn.decomposition import PCA | |
from sklearn.feature_selection import SelectKBest, f_classif | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
import logging | |
import os | |
from datetime import datetime | |
from typing import Dict, Tuple, List, Optional, Any | |
import xlsxwriter | |
class LDAAnalyzer: | |
""" | |
Класс для выполнения линейного дискриминантного анализа (LDA) | |
с расширенной функциональностью и форматированным выводом результатов | |
""" | |
def __init__(self, input_file: str, target_column: int): | |
""" | |
Инициализация анализатора LDA | |
Args: | |
input_file (str): Путь к входному файлу Excel | |
target_column (int): Номер столбца для классификации | |
""" | |
self.input_file = input_file | |
self.target_column = target_column | |
self.data = None | |
self.X = None | |
self.y = None | |
self.X_transformed = None | |
self.lda = None | |
self.scaler = StandardScaler() | |
self.label_encoder = LabelEncoder() | |
self.feature_names = None | |
# Настройка логирования | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(levelname)s - %(message)s', | |
handlers=[ | |
logging.FileHandler('lda_analysis.log'), | |
logging.StreamHandler() | |
] | |
) | |
self.logger = logging.getLogger(__name__) | |
# Цветовая схема для визуализации | |
self.colors = ['lightblue', 'green', 'purple', 'yellow', | |
'red', 'orange', 'cyan', 'brown', 'pink'] | |
self.logger.info(f"Инициализация LDA анализатора с файлом: {input_file}") | |
def validate_data(self) -> None: | |
"""Валидация входных данных""" | |
if self.data is None: | |
raise ValueError("Данные не загружены") | |
# Проверка размерности | |
if self.data.shape[0] < 30: | |
raise ValueError("Недостаточно наблюдений (минимум 30)") | |
# Проверка пропущенных значений | |
if self.data.isnull().any().any(): | |
raise ValueError("Обнаружены пропущенные значения") | |
# Проверка типов данных | |
numeric_cols = self.data.select_dtypes(include=[np.number]).columns | |
if len(numeric_cols) < self.data.shape[1] - 1: # -1 для целевой переменной | |
raise ValueError("Обнаружены нечисловые признаки") | |
def load_data(self) -> None: | |
"""Загрузка данных из Excel файла""" | |
try: | |
self.logger.info("Загрузка данных...") | |
# Загрузка данных | |
self.data = pd.read_excel(self.input_file) | |
# Преобразование имен колонок | |
self.data.columns = [str(col) for col in self.data.columns] | |
# Попытка преобразовать все колонки (кроме целевой) в числовой формат | |
for col in self.data.columns: | |
if self.data.columns.get_loc(col) != self.target_column: | |
try: | |
self.data[col] = pd.to_numeric(self.data[col], errors='coerce') | |
except Exception as e: | |
self.logger.warning(f"Не удалось преобразовать колонку {col} в числовой формат: {str(e)}") | |
self.validate_data() | |
self.logger.info(f"Данные загружены. Размерность: {self.data.shape}") | |
except Exception as e: | |
self.logger.error(f"Ошибка при загрузке данных: {str(e)}") | |
raise | |
def prepare_data(self) -> None: | |
"""Подготовка данных для анализа""" | |
try: | |
self.logger.info("Подготовка данных...") | |
# Разделение на признаки и целевую переменную | |
X = self.data.drop(self.data.columns[self.target_column], axis=1) | |
y = self.data.iloc[:, self.target_column] | |
# Преобразование имен колонок в строки | |
X.columns = X.columns.astype(str) | |
# Кодирование меток классов | |
self.y = self.label_encoder.fit_transform(y) + 1 | |
# Преобразование в числовой формат | |
X = X.apply(pd.to_numeric, errors='coerce') | |
# Проверка на пропущенные значения после преобразования | |
if X.isnull().any().any(): | |
raise ValueError("После преобразования в числовой формат появились пропущенные значения") | |
# Стандартизация признаков | |
self.X = self.scaler.fit_transform(X) | |
# Проверка количества классов и наблюдений в каждом классе | |
class_counts = pd.Series(self.y).value_counts() | |
if (class_counts < 5).any(): | |
self.logger.warning("Некоторые классы имеют менее 5 наблюдений") | |
self.logger.info(f"Данные подготовлены. X: {self.X.shape}, y: {self.y.shape}") | |
self.logger.info(f"Количество классов: {len(np.unique(self.y))}") | |
except Exception as e: | |
self.logger.error(f"Ошибка при подготовке данных: {str(e)}") | |
raise | |
def perform_lda(self) -> None: | |
"""Выполнение LDA анализа""" | |
try: | |
self.logger.info("Выполнение LDA анализа...") | |
# Инициализация и обучение LDA | |
self.lda = LinearDiscriminantAnalysis(solver='svd') | |
self.X_transformed = self.lda.fit_transform(self.X, self.y) | |
# Оценка качества модели | |
accuracy = self.lda.score(self.X, self.y) | |
self.logger.info(f"Общая точность модели: {accuracy:.3f}") | |
except Exception as e: | |
self.logger.error(f"Ошибка при выполнении LDA: {str(e)}") | |
raise | |
def create_confusion_matrix(self) -> Tuple[pd.DataFrame, List[List[str]], float]: | |
""" | |
Создание матрицы ошибок и расчет процентов классификации | |
Returns: | |
tuple: (матрица ошибок, проценты, общая точность) | |
""" | |
try: | |
self.logger.info("Создание матрицы ошибок...") | |
# Получение предсказаний | |
y_pred = self.lda.predict(self.X) | |
# Создание матрицы ошибок | |
classes = sorted(np.unique(self.y)) | |
n_classes = len(classes) | |
confusion_matrix = np.zeros((n_classes, n_classes)) | |
for i in range(len(self.y)): | |
confusion_matrix[self.y[i]-1][y_pred[i]-1] += 1 | |
# Создание DataFrame для матрицы ошибок | |
columns = [f"{i+1}.00" for i in range(n_classes)] | |
index = [f"{i+1}.00" for i in range(n_classes)] | |
df_confusion = pd.DataFrame(confusion_matrix, columns=columns, index=index) | |
# Добавление столбца "Всего" | |
df_confusion['Всего'] = df_confusion.sum(axis=1) | |
# Расчет процентов | |
percentages = np.zeros((n_classes, n_classes + 1)) # +1 для столбца "Всего" | |
for i in range(n_classes): | |
row_sum = confusion_matrix[i].sum() | |
if row_sum > 0: | |
percentages[i, :-1] = (confusion_matrix[i] / row_sum) * 100 | |
percentages[i, -1] = 100.0 | |
# Форматирование процентов | |
percentage_rows = [] | |
for row in percentages: | |
formatted_row = [f"{x:.1f}" for x in row] | |
percentage_rows.append(formatted_row) | |
# Расчет общей точности | |
accuracy = (np.sum(np.diag(confusion_matrix)) / np.sum(confusion_matrix)) * 100 | |
self.logger.info(f"Процент правильной классификации: {accuracy:.1f}%") | |
return df_confusion, percentage_rows, accuracy | |
except Exception as e: | |
self.logger.error(f"Ошибка при создании матрицы ошибок: {str(e)}") | |
raise | |
def get_coefficients(self) -> pd.DataFrame: | |
""" | |
Получение коэффициентов дискриминантных функций | |
Returns: | |
pd.DataFrame: таблица коэффициентов | |
""" | |
try: | |
self.logger.info("Получение коэффициентов...") | |
# Получение коэффициентов и размерностей | |
n_features = self.X.shape[1] | |
n_classes = len(np.unique(self.y)) | |
n_components = min(n_classes - 1, n_features) | |
# Создание списка имен переменных | |
var_names = [f"VAR{str(i+1).zfill(5)}" for i in range(n_features)] | |
# Создание DataFrame с коэффициентами | |
coef_data = [] | |
for i in range(n_components): | |
row_data = {} | |
for j, var_name in enumerate(var_names): | |
row_data[var_name] = self.lda.coef_[i][j] | |
coef_data.append(row_data) | |
df_coef = pd.DataFrame(coef_data, index=[f"Функция {i+1}" for i in range(n_components)]) | |
# Добавление константы (intercept) | |
const_data = {} | |
for j, var_name in enumerate(var_names): | |
const_data[var_name] = self.lda.intercept_[j] if j < len(self.lda.intercept_) else 0.0 | |
const_df = pd.DataFrame([const_data], index=['Константа']) | |
# Объединение коэффициентов и константы | |
df_coef = pd.concat([df_coef, const_df]) | |
# Округление значений | |
df_coef = df_coef.round(3) | |
self.logger.info("Коэффициенты получены") | |
return df_coef | |
except Exception as e: | |
self.logger.error(f"Ошибка при получении коэффициентов: {str(e)}") | |
raise | |
def create_visualization(self) -> plt.Figure: | |
""" | |
Создание визуализации результатов | |
Returns: | |
plt.Figure: объект графика | |
""" | |
try: | |
self.logger.info("Создание визуализации...") | |
fig = plt.figure(figsize=(12, 8)) | |
# Построение точек для каждого класса | |
for class_num in np.unique(self.y): | |
mask = self.y == class_num | |
plt.scatter( | |
self.X_transformed[mask, 0], | |
self.X_transformed[mask, 1] if self.X_transformed.shape[1] > 1 | |
else np.zeros_like(self.X_transformed[mask, 0]), | |
c=[self.colors[(class_num-1) % len(self.colors)]], | |
label=f'Группа {class_num}', | |
alpha=0.7 | |
) | |
# Добавление центроидов | |
centroid = np.mean(self.X_transformed[mask, :2], axis=0) | |
plt.scatter( | |
centroid[0], | |
centroid[1] if self.X_transformed.shape[1] > 1 else 0, | |
c='black', | |
marker='s', | |
s=100 | |
) | |
plt.annotate( | |
f'{class_num}', | |
(centroid[0], centroid[1]), | |
xytext=(5, 5), | |
textcoords='offset points', | |
fontsize=10, | |
bbox=dict(facecolor='white', edgecolor='none', alpha=0.7) | |
) | |
plt.xlabel('Первая каноническая функция') | |
plt.ylabel('Вторая каноническая функция') | |
plt.title('Канонические дискриминантные функции') | |
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left') | |
plt.grid(True, alpha=0.3) | |
plt.tight_layout() | |
self.logger.info("Визуализация создана") | |
return fig | |
except Exception as e: | |
self.logger.error(f"Ошибка при создании визуализации: {str(e)}") | |
raise | |
def save_results(self, output_dir: str) -> None: | |
""" | |
Сохранение всех результатов анализа | |
Args: | |
output_dir (str): директория для сохранения результатов | |
""" | |
try: | |
self.logger.info(f"Сохранение результатов в {output_dir}...") | |
# Создание директории если её нет | |
os.makedirs(output_dir, exist_ok=True) | |
# Получение результатов | |
confusion_matrix, percentages, accuracy = self.create_confusion_matrix() | |
coefficients = self.get_coefficients() | |
# Сохранение в Excel | |
excel_path = os.path.join(output_dir, 'lda_results.xlsx') | |
with pd.ExcelWriter(excel_path, engine='xlsxwriter') as writer: | |
workbook = writer.book | |
# Форматы для Excel | |
header_format = workbook.add_format({ | |
'bold': True, | |
'align': 'center', | |
'valign': 'vcenter', | |
'bg_color': '#D9D9D9', | |
'border': 1 | |
}) | |
cell_format = workbook.add_format({ | |
'align': 'center', | |
'border': 1 | |
}) | |
number_format = workbook.add_format({ | |
'align': 'center', | |
'border': 1, | |
'num_format': '0.000' | |
}) | |
# 1. Матрица классификации | |
worksheet1 = workbook.add_worksheet('Матрица классификации') | |
# Записываем заголовки | |
headers = ['Исходный', 'Количество'] + \ | |
[f'{i+1}.00' for i in range(len(confusion_matrix.columns)-1)] + \ | |
['Всего'] | |
for col, header in enumerate(headers): | |
worksheet1.write(0, col, header, header_format) | |
worksheet1.set_column(col, col, 15) | |
# Записываем данные | |
for i, (index, row) in enumerate(confusion_matrix.iterrows()): | |
worksheet1.write(i+1, 0, index, cell_format) | |
worksheet1.write(i+1, 1, row['Всего'], cell_format) | |
for j, val in enumerate(row): | |
worksheet1.write(i+1, j+2, val, cell_format) | |
# 2. Проценты классификации | |
worksheet2 = workbook.add_worksheet('Проценты') | |
# Заголовки | |
for col, header in enumerate(headers): | |
worksheet2.write(0, col, header, header_format) | |
worksheet2.set_column(col, col, 15) | |
# Данные процентов | |
for i, row in enumerate(percentages): | |
worksheet2.write(i+1, 0, f"{i+1}.00", cell_format) | |
worksheet2.write(i+1, 1, confusion_matrix.iloc[i]['Всего'], cell_format) | |
for j, val in enumerate(row): | |
worksheet2.write(i+1, j+2, float(val.replace(',', '.')), number_format) | |
# Примечание | |
note_row = len(percentages) + 2 | |
worksheet2.write( | |
note_row, 0, | |
f'* Примечание: {accuracy:.1f}% исходных сгруппированных наблюдений ' | |
f'классифицированы правильно.', | |
workbook.add_format({'bold': True}) | |
) | |
# 3. Коэффициенты функций | |
worksheet3 = workbook.add_worksheet('Коэффициенты') | |
# Записываем заголовки коэффициентов | |
worksheet3.write(0, 0, 'Переменная', header_format) | |
for i, col in enumerate(coefficients.columns): | |
worksheet3.write(0, i+1, col, header_format) | |
worksheet3.set_column(i+1, i+1, 15) | |
# Записываем данные коэффициентов | |
for i, (index, row) in enumerate(coefficients.iterrows()): | |
worksheet3.write(i+1, 0, index, cell_format) | |
for j, val in enumerate(row): | |
worksheet3.write(i+1, j+1, val, number_format) | |
# Добавляем примечание к коэффициентам | |
worksheet3.write( | |
len(coefficients)+1, 0, | |
'*Нестандартизованные коэффициенты', | |
workbook.add_format({'bold': True, 'italic': True}) | |
) | |
# Сохранение визуализации | |
fig = self.create_visualization() | |
fig.savefig( | |
os.path.join(output_dir, 'lda_visualization.png'), | |
bbox_inches='tight', | |
dpi=300 | |
) | |
plt.close(fig) | |
self.logger.info("Результаты успешно сохранены") | |
except Exception as e: | |
self.logger.error(f"Ошибка при сохранении результатов: {str(e)}") | |
raise |