Spaces:
Sleeping
Sleeping
| import os | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.responses import StreamingResponse | |
| from openai import AsyncOpenAI | |
| from pydantic import BaseModel | |
| import asyncio | |
# Module-level FastAPI application instance
app = FastAPI()
# Schema of the JSON request body: a single required string field.
class PromptRequest(BaseModel):
    # Prompt text supplied by the caller.
    prompt: str
# Build the shared async OpenAI client.
# The API key is read from the GITHUB_TOKEN environment variable; a missing
# or empty value aborts module import with ValueError.
token = os.environ.get("GITHUB_TOKEN")
if not token:
    raise ValueError("GITHUB_TOKEN environment variable not set")

# Inference endpoint and model identifier used for every completion request.
endpoint = "https://models.github.ai/inference"
model = "openai/gpt-4.1-mini"

client = AsyncOpenAI(api_key=token, base_url=endpoint)
# Async generator to stream chunks
async def stream_response(prompt: str):
    """Stream the assistant's reply to ``prompt`` as plain-text fragments.

    Sends a fixed system message plus the user's prompt to the module-level
    ``client`` (using the module-level ``model``) with ``stream=True`` and
    yields the text of each delta as it arrives.

    Args:
        prompt: Text forwarded as the ``user`` message.

    Yields:
        Non-empty text fragments of the completion, in arrival order. If an
        exception is raised while creating or consuming the stream, one final
        ``"Error: <message>"`` fragment is yielded instead of propagating it.
    """
    try:
        # Create streaming chat completion
        stream = await client.chat.completions.create(
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": prompt},
            ],
            temperature=1.0,
            top_p=1.0,
            model=model,
            stream=True,
        )
        async for chunk in stream:
            # Chunks may arrive without any choices; there is nothing to emit.
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta
            # Tolerate a missing delta or a delta without text content.
            text = getattr(delta, "content", None)
            # Skip empty fragments: yielding "" adds nothing to the output and
            # only hands the consumer a no-op write.
            if text:
                yield text
    except Exception as err:
        # Deliberate best-effort: the failure is reported in-band as the last
        # fragment. NOTE(review): presumably the HTTP status has already been
        # sent by the time this generator runs, so it cannot be changed here.
        yield f"Error: {err}"
# Endpoint to handle prompt and stream response
# NOTE(review): no route decorator (e.g. @app.post(...)) is visible on this
# handler in this view — confirm it is registered on `app` somewhere.
async def generate_response(request: PromptRequest):
    """Return a plain-text streaming response for the submitted prompt.

    Args:
        request: Parsed request body; only ``request.prompt`` is read.

    Returns:
        A ``StreamingResponse`` with media type ``text/plain`` whose body is
        produced by ``stream_response(request.prompt)``.

    Raises:
        HTTPException: With status 500 if constructing the response fails.
    """
    try:
        # Calling the async generator function does not run its body, so
        # failures during streaming never reach the except clause below;
        # stream_response reports those inside the streamed text instead.
        return StreamingResponse(
            stream_response(request.prompt),
            media_type="text/plain",
        )
    except Exception as err:
        # Chain the original exception so its traceback stays attached.
        raise HTTPException(
            status_code=500, detail=f"Server error: {err}"
        ) from err