Spaces:
Sleeping
Sleeping
from fastapi import FastAPI, File, UploadFile, HTTPException | |
import os | |
import tempfile | |
import chess.pgn | |
import chess.engine | |
from enum import Enum | |
from typing import List, Dict | |
from datetime import datetime | |
import csv | |
import json | |
from fastapi.responses import JSONResponse | |
import asyncio | |
import sys | |
from routes.tex_based_review import review_chess_game, validate_json | |
if sys.platform == "win32": | |
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) | |
class GamePhase(Enum): | |
OPENING = "opening" | |
MIDDLEGAME = "middlegame" | |
ENDGAME = "endgame" | |
class Classification(Enum): | |
BRILLIANT = "brilliant" | |
GREAT = "great" | |
BEST = "best" | |
EXCELLENT = "excellent" | |
GOOD = "good" | |
INACCURACY = "inaccuracy" | |
MISTAKE = "mistake" | |
MISS = "miss" | |
BLUNDER = "blunder" | |
BOOK = "book" | |
FORCED = "forced" | |
classification_values = { | |
Classification.BLUNDER: 0, | |
Classification.MISTAKE: 0.2, | |
Classification.MISS: 0.3, | |
Classification.INACCURACY: 0.4, | |
Classification.GOOD: 0.65, | |
Classification.EXCELLENT: 0.9, | |
Classification.BEST: 1, | |
Classification.GREAT: 1, | |
Classification.BRILLIANT: 1, | |
Classification.BOOK: 1, | |
Classification.FORCED: 1, | |
} | |
centipawn_classifications = [ | |
Classification.BEST, | |
Classification.EXCELLENT, | |
Classification.GOOD, | |
Classification.INACCURACY, | |
Classification.MISS, | |
Classification.MISTAKE, | |
Classification.BLUNDER, | |
] | |
# Analysis parameters | |
FORCED_WIN_THRESHOLD = 500 | |
MISS_CENTIPAWN_LOSS = 300 | |
MISS_MATE_THRESHOLD = 3 | |
ENDGAME_MATERIAL_THRESHOLD = 24 | |
QUEEN_VALUE = 9 | |
def detect_game_phase(board: chess.Board, in_opening: bool) -> GamePhase: | |
if in_opening: | |
return GamePhase.OPENING | |
total_material = 0 | |
queens = 0 | |
for color in [chess.WHITE, chess.BLACK]: | |
for piece_type in chess.PIECE_TYPES: | |
if piece_type == chess.KING: | |
continue | |
count = len(board.pieces(piece_type, color)) | |
value = { | |
chess.PAWN: 1, | |
chess.KNIGHT: 3, | |
chess.BISHOP: 3, | |
chess.ROOK: 5, | |
chess.QUEEN: QUEEN_VALUE | |
}[piece_type] | |
total_material += count * value | |
if piece_type == chess.QUEEN: | |
queens += count | |
endgame_conditions = [ | |
total_material <= ENDGAME_MATERIAL_THRESHOLD, | |
queens == 0 and total_material <= ENDGAME_MATERIAL_THRESHOLD * 2, | |
] | |
return GamePhase.ENDGAME if any(endgame_conditions) else GamePhase.MIDDLEGAME | |
def get_evaluation_loss_threshold(classif: Classification, prev_eval: float) -> float: | |
prev_eval = abs(prev_eval) | |
if classif == Classification.BEST: | |
return max(0.0001 * prev_eval**2 + 0.0236 * prev_eval - 3.7143, 0) | |
elif classif == Classification.EXCELLENT: | |
return max(0.0002 * prev_eval**2 + 0.1231 * prev_eval + 27.5455, 0) | |
elif classif == Classification.GOOD: | |
return max(0.0002 * prev_eval**2 + 0.2643 * prev_eval + 60.5455, 0) | |
elif classif == Classification.INACCURACY: | |
return max(0.0002 * prev_eval**2 + 0.3624 * prev_eval + 108.0909, 0) | |
elif classif == Classification.MISS: | |
return max(0.00025 * prev_eval**2 + 0.38255 * prev_eval + 166.9541, 0) | |
elif classif == Classification.MISTAKE: | |
return max(0.0003 * prev_eval**2 + 0.4027 * prev_eval + 225.8182, 0) | |
else: | |
return float("inf") | |
def load_opening_book(csv_path): | |
opening_book = {} | |
try: | |
with open(csv_path, newline='', encoding='utf-8') as csvfile: | |
reader = csv.reader(csvfile) | |
next(reader) | |
for row in reader: | |
if len(row) < 3: | |
continue | |
pgn_moves = row[2] | |
game = chess.pgn.Game() | |
board = game.board() | |
for move in pgn_moves.split(): | |
if "." in move: | |
continue | |
try: | |
chess_move = board.push_san(move) | |
fen = " ".join(board.fen().split()[:4]) | |
opening_book[fen] = chess_move.uci() | |
except ValueError: | |
break | |
except Exception as e: | |
print(f"Error loading opening book: {e}") | |
return opening_book | |
def is_book_move(board, opening_book, max_depth=8): | |
if board.fullmove_number > max_depth: | |
return None | |
fen = " ".join(board.fen().split()[:4]) | |
return opening_book.get(fen) | |
engine_path = os.path.join(os.getcwd(), "models", "stockfish-windows-x86-64-avx2.exe") | |
book_csv_path = os.path.join(os.getcwd(), "assets", "openings_master.csv") | |
def analyze_pgn(pgn_file: str) -> Dict: | |
opening_book = load_opening_book(book_csv_path) | |
text_based_result = review_chess_game(pgn_file) | |
with open(pgn_file) as pgn: | |
game = chess.pgn.read_game(pgn) | |
if not game: | |
return {"error": "No game found in the PGN file."} | |
result = { | |
"move_analysis": [], | |
"phase_analysis": {}, | |
"player_summaries": {}, | |
"test_based_review": text_based_result | |
} | |
with chess.engine.SimpleEngine.popen_uci(engine_path) as engine: | |
board = game.board() | |
classifications = { | |
"white": {phase: [] for phase in GamePhase}, | |
"black": {phase: [] for phase in GamePhase} | |
} | |
phase_data = {phase: [] for phase in GamePhase} | |
in_opening = True | |
for move_number, node in enumerate(game.mainline(), start=1): | |
# Analyze position before the move | |
pre_info = engine.analyse(board, chess.engine.Limit(time=0.3), multipv=3)[0] | |
pre_eval = pre_info["score"].white().score(mate_score=10000) or 0 | |
pre_pv_moves = pre_info.get("pv", []) | |
# Get best move and follow-up moves in UCI notation | |
best_move_pre = pre_pv_moves[0].uci() if pre_pv_moves else None | |
follow_up_pre = [m.uci() for m in pre_pv_moves[:min(len(pre_pv_moves), 5)]] | |
# Make the user move | |
move = node.move | |
board.push(move) # Update the board state | |
# Analyze position after the move | |
post_info = engine.analyse(board, chess.engine.Limit(time=0.3), multipv=3)[0] | |
# Get best move and follow-up moves AFTER move is played (in UCI notation) | |
post_pv_moves = post_info.get("pv", []) | |
best_move_post = post_pv_moves[0].uci() if post_pv_moves else None | |
follow_up_post = [m.uci() for m in post_pv_moves[:min(len(post_pv_moves), 5)]] | |
post_eval = post_info["score"].white().score(mate_score=10000) or 0 | |
# Determine game phase | |
book_move = is_book_move(board, opening_book) | |
current_phase = detect_game_phase(board, in_opening) | |
if not book_move and in_opening: | |
in_opening = False | |
# Calculate evaluation loss | |
eval_loss = abs(pre_eval - post_eval) | |
# Initial classification | |
classification = Classification.BOOK if book_move else None | |
if not classification: | |
for classif in centipawn_classifications: | |
threshold = get_evaluation_loss_threshold(classif, pre_eval) | |
if eval_loss <= threshold: | |
classification = classif | |
break | |
classification = classification or Classification.BLUNDER | |
# Check for missed opportunities | |
is_winning = abs(pre_eval) >= FORCED_WIN_THRESHOLD | |
is_forced_win = pre_info["score"].is_mate() and pre_info["score"].relative.mate() <= MISS_MATE_THRESHOLD | |
if is_winning and move != best_move_pre and (eval_loss >= MISS_CENTIPAWN_LOSS or is_forced_win): | |
classification = Classification.MISS | |
# Check for brilliant moves | |
if classification == Classification.BEST: | |
if pre_eval < -150 and post_eval >= 150: | |
classification = Classification.GREAT | |
elif pre_eval < -300 and post_eval >= 300: | |
classification = Classification.BRILLIANT | |
# Track classifications | |
player = "white" if board.turn == chess.BLACK else "black" | |
classifications[player][current_phase].append(classification) | |
phase_data[current_phase].append(classification) | |
# Add move analysis to result (using UCI notation) | |
result["move_analysis"].append({ | |
"move_number": move_number, | |
"player": "White" if board.turn == chess.BLACK else "Black", | |
"user_move": move.uci(), | |
"evaluation": post_eval / 100, | |
"evaluation_loss": eval_loss / 100, | |
"classification": classification.value, | |
"best_move_pre": best_move_pre, # Best move BEFORE move is played (UCI) | |
"follow_up_pre": follow_up_pre, # Follow-up moves BEFORE move is played (UCI) | |
"best_move_post": best_move_post, # Best move AFTER move is played (UCI) | |
"follow_up_post": follow_up_post # Follow-up moves AFTER move is played (UCI) | |
}) | |
# Phase analysis | |
for phase in GamePhase: | |
moves = phase_data[phase] | |
if moves: | |
rating = get_phase_rating(moves) | |
result["phase_analysis"][phase.value] = { | |
"rating": rating.value, | |
"move_count": len(moves) | |
} | |
# Player summaries | |
for color in ["white", "black"]: | |
player = game.headers["White" if color == "white" else "Black"] | |
counts = {c.value: 0 for c in Classification} | |
for phase in GamePhase: | |
phase_moves = classifications[color][phase] | |
for m in phase_moves: | |
m_enum = Classification(m) if isinstance(m, str) else m # Convert if needed | |
counts[m_enum.value] += 1 | |
result["player_summaries"][player] = counts | |
def convert_enums(obj): | |
if isinstance(obj, Enum): # Convert Enum to its value | |
return obj.value | |
if isinstance(obj, dict): # Recursively handle dicts | |
return {k: convert_enums(v) for k, v in obj.items()} | |
if isinstance(obj, list): # Recursively handle lists | |
return [convert_enums(i) for i in obj] | |
return obj # Return other types as they are | |
json_result = convert_enums(result) | |
return JSONResponse(content=json_result) | |
def get_phase_rating(classified_moves: List[Classification]) -> Classification: | |
if not classified_moves: | |
return Classification.GOOD | |
classified_moves = [Classification(m) if isinstance(m, str) else m for m in classified_moves] | |
total = sum(classification_values[m] for m in classified_moves) | |
average = total / len(classified_moves) | |
rating_order = [ | |
(Classification.BRILLIANT, 0.95), | |
(Classification.GREAT, 0.85), | |
(Classification.BEST, 0.75), | |
(Classification.EXCELLENT, 0.65), | |
(Classification.GOOD, 0.5), | |
(Classification.INACCURACY, 0.35), | |
(Classification.MISS, 0.25), | |
(Classification.MISTAKE, 0.15) | |
] | |
return next((c for c, t in rating_order if average >= t), Classification.BLUNDER) |