Spaces:
Sleeping
Sleeping
import io | |
import os | |
import tempfile | |
from fastapi import FastAPI, File, UploadFile, Form | |
from fastapi.responses import JSONResponse, StreamingResponse | |
from PIL import Image, UnidentifiedImageError | |
from routes.segmentation import segment_chess_board | |
from routes.detection import detect_pieces | |
from routes.fen_generator import gen_fen | |
from routes.chess_review import analyze_pgn | |
from typing import List, Dict, Any, Union | |
from pydantic import BaseModel | |
import asyncio | |
import sys | |
import tracemalloc | |
tracemalloc.start() | |
if sys.platform == "win32": | |
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) | |
app = FastAPI() | |
class DetectionResults(BaseModel): | |
boxes: list | |
confidences: list | |
classes: list | |
async def read_root(): | |
return { | |
"name": "Narendra", | |
"age": 20, | |
"Gender": "Male" | |
} | |
async def get_seg(file: UploadFile = File(...)): | |
print(f'Image received: {file.filename}') | |
try: | |
image_content = await file.read() | |
if not image_content: | |
return JSONResponse(content={"error": "Empty file uploaded"}, status_code=400) | |
try: | |
image = Image.open(io.BytesIO(image_content)) | |
except UnidentifiedImageError: | |
return JSONResponse(content={"error": "Invalid image format"}, status_code=400) | |
# If segment_chess_board is async, use `await`, otherwise remove `await` | |
segmented_image = await segment_chess_board(image) | |
if isinstance(segmented_image, dict): | |
return JSONResponse(content=segmented_image, status_code=400) | |
# Save to in-memory bytes | |
img_bytes = io.BytesIO() | |
segmented_image.save(img_bytes, format="PNG") | |
img_bytes.seek(0) | |
print("Image successfully processed and returned") | |
return StreamingResponse( | |
img_bytes, | |
media_type="image/png", | |
headers={"Content-Disposition": "inline; filename=output.png"} | |
) | |
except Exception as e: | |
return JSONResponse(content={"error": str(e)}, status_code=500) | |
async def get_coords(file: UploadFile = File(...)): | |
try: | |
image_content = await file.read() | |
if not image_content: | |
print("No image found") | |
return JSONResponse(content={"error": "Empty file uploaded"}, status_code=400) | |
try: | |
image = Image.open(io.BytesIO(image_content)) | |
except UnidentifiedImageError: | |
return JSONResponse(content={"error": "Invalid image format"}, status_code=400) | |
detection_results = await detect_pieces(image) | |
if "error" in detection_results: | |
return JSONResponse(content=detection_results, status_code=400) | |
print("Image successfully processed and returned") | |
return JSONResponse(content={"detections": detection_results}, status_code=200) | |
except Exception as e: | |
print(f"Unexpected error: {str(e)}") | |
return JSONResponse(content={"error": "Unexpected error occurred", "details": str(e)}, status_code=500) | |
async def get_fen(file : UploadFile = File(), perspective : str = Form("w"), next_to_move : str = Form("w")): | |
if perspective not in ["w" , "b"]: | |
return JSONResponse(content={"error" : "Perspective should be w (white) or b (black)"}, status_code=500) | |
if next_to_move not in ["w" , "b"]: | |
return JSONResponse(content={"error" : "next to move should be w (white) or b (black)"}, status_code=500) | |
try: | |
image_content = await file.read() | |
if not image_content: | |
return JSONResponse(content={"error": "Empty file uploaded"}, status_code=400) | |
try: | |
image = Image.open(io.BytesIO(image_content)) | |
except UnidentifiedImageError: | |
return JSONResponse(content={"error": "Invalid image format"}, status_code=400) | |
segmented_image = await segment_chess_board(image) | |
if isinstance(segmented_image, dict): | |
return JSONResponse(content=segmented_image, status_code=400) | |
segmented_image = segmented_image.resize((224, 224)) | |
detection_results = await detect_pieces(segmented_image) | |
if "error" in detection_results: | |
return JSONResponse(content=detection_results, status_code=400) | |
fen = gen_fen(detection_results, perspective, next_to_move) | |
if not fen: | |
return JSONResponse(content={"error": "FEN generation failed", "details": "Invalid input data"}, status_code=500) | |
return JSONResponse(content={"FEN": fen}, status_code=200) | |
except Exception as e: | |
return JSONResponse(content={"error": "Unexpected error occurred", "details": str(e)}, status_code=500) | |
async def getReview(file: UploadFile = File(...)): | |
print(os.getcwd()) | |
print("call recieved") | |
if not file.filename.endswith(".pgn"): | |
return JSONResponse(content={"error": "Invalid file format. Please upload a PGN file"}, status_code=400) | |
try: | |
# Save the uploaded file to a temporary file | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".pgn") as tmp_file: | |
tmp_file.write(await file.read()) | |
tmp_file_path = tmp_file.name | |
# Analyze the PGN file | |
analysis_result = analyze_pgn(tmp_file_path) | |
# Clean up the temporary file | |
os.remove(tmp_file_path) | |
if not analysis_result: | |
return JSONResponse(content={"error": "No game found in the PGN file"}, status_code=400) | |
return analysis_result | |
except Exception as e: | |
return JSONResponse(content={"error": "Unexpected error occurred", "details": str(e)}, status_code=500) | |