# NOTE(review): page-header residue captured with this file ("Spaces: Sleeping") - not part of the program.
from flask import Flask, request, jsonify, Response, stream_with_context, render_template | |
import hmac, hashlib, secrets, time, logging, json, requests, os | |
from datetime import datetime, timedelta, timezone | |
# from dotenv import load_dotenv;load_dotenv() | |
# Logging Configuration (Console Only)
# A single StreamHandler is installed on the root logger, so records go to the
# console and nowhere else.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,  # INFO and above are emitted
)

# Module-level logger shared by every handler in this file.
logger = logging.getLogger(__name__)

# Flask application object.
app = Flask(__name__)
# 🔑 Settings read from the environment; each one is None when the variable is unset.
SECRET_KEY = os.environ.get("SECRET_KEY")            # HMAC key used to sign/verify API keys
SPECIAL_API_KEY = os.environ.get("SPECIAL_API_KEY")  # the one API key that is rate limited
ENDPOINT = os.environ.get("ENDPOINT")                # upstream URL the chat payload is posted to
SYSTEM_PROMPT = os.environ.get("SYSTEM_PROMPT")      # content of the system message
EXCEPTION = os.environ.get("EXCEPTION")              # text returned when the upstream call fails

# Model names reported by the model-list handler.
AVAILABLE_MODELS = [
    "deepseek-r1",
    "gpt-4-turbo", "gpt-4", "gpt-3.5-turbo",
    "mistral-7b",
    "gemini-pro",
    "claude-3-sonnet", "claude-3-haiku",
    "llama-3-8b", "llama-3-70b",
]

# Per-key rate-limit bookkeeping: {api_key: {"count": X, "reset_time": timestamp}}
api_usage = {}

# ✅ Request Limit Configuration
REQUEST_LIMIT = 10  # Max requests per day

# Process-wide statistics: requests served and the moment the process started.
request_count = 0
start_time = time.time()
# Validate API Key
def validate_api_key(api_key):
    """Return True when *api_key* is a correctly signed TrueSyncAI key.

    A key has the shape ``TrueSyncAI-<random>-<signature>``, where the signature
    is the first 16 hex characters of HMAC-SHA256 over ``<random>`` keyed with
    SECRET_KEY.

    Args:
        api_key: Candidate key from the request. Values that are not strings
            (e.g. a JSON number or null) are rejected rather than raising.

    Returns:
        bool: True if the signature matches, False for any malformed or
        wrongly signed key.

    Raises:
        AttributeError: If SECRET_KEY is None (environment variable unset) and
            the key is otherwise well formed.
    """
    if not isinstance(api_key, str):
        return False

    parts = api_key.split("-")
    if len(parts) != 3 or parts[0] != "TrueSyncAI":
        return False

    random_part, received_signature = parts[1], parts[2]
    try:
        random_bytes = random_part.encode()
        received_bytes = received_signature.encode()
    except UnicodeEncodeError:
        # Lone surrogates cannot be encoded; such a key can never be valid.
        return False

    expected_signature = hmac.new(SECRET_KEY.encode(), random_bytes, hashlib.sha256).hexdigest()[:16]
    # Constant-time comparison so response timing does not leak how many
    # leading characters of the signature were correct.
    return hmac.compare_digest(expected_signature.encode(), received_bytes)
# ✅ Generator function for streaming response
def stream_response(api_payload):
    """Yield text fragments from the upstream endpoint as they arrive.

    Posts *api_payload* to ENDPOINT with streaming enabled and reads the reply
    line by line. Each ``data: {...}`` line is parsed as JSON and its
    ``choices[0].delta.content`` value is yielded; lines that are empty,
    contain ``[DONE]`` or cannot be parsed are skipped.

    Args:
        api_payload: JSON-serialisable request body.

    Yields:
        str: Content fragments on success; a single "Status Code ..." line when
        the upstream does not answer 200; the EXCEPTION text (or a generic
        fallback when it is unset) if the request itself fails.
    """
    try:
        # (connect, read) timeouts so a stalled upstream cannot hold a worker
        # forever. NOTE(review): values are a guess - tune for the real upstream.
        with requests.post(ENDPOINT, json=api_payload, stream=True, timeout=(10, 300)) as response:
            if response.status_code != 200:
                yield f"Status Code: {response.status_code}, Response: {response.content}"
                return

            for line in response.iter_lines(decode_unicode=True):
                # iter_lines yields bytes when the response declares no charset.
                if isinstance(line, bytes):
                    line = line.decode("utf-8", errors="replace")
                if not line or "[DONE]" in line:
                    continue
                if not line.startswith("data:"):
                    continue
                try:
                    data = json.loads(line[5:])
                    content = data['choices'][0]['delta']['content']
                except (ValueError, KeyError, IndexError, TypeError):
                    # Not a content-bearing chunk; skip it.
                    continue
                if isinstance(content, str) and content:
                    yield content
    except Exception:
        # `except Exception` (not a bare except) lets GeneratorExit through:
        # yielding again after the client disconnects raises RuntimeError.
        logger.exception("Streaming request to upstream endpoint failed")
        yield EXCEPTION or "An unexpected error occurred while contacting the model endpoint."
# ✅ Function for normal (non-streaming) response
def non_stream_response(api_payload) -> str:
    """Post *api_payload* to ENDPOINT and return the complete reply text.

    Args:
        api_payload: JSON-serialisable request body.

    Returns:
        str: ``choices[0].message.content`` from the upstream JSON on a 200
        reply (an empty string if that value is null); a "Status Code ..."
        description for any other status; the EXCEPTION text (or a generic
        fallback when it is unset) if the request or parsing fails.
    """
    try:
        # (connect, read) timeouts so a stalled upstream cannot hold a worker
        # forever. NOTE(review): values are a guess - tune for the real upstream.
        response = requests.post(ENDPOINT, json=api_payload, timeout=(10, 300))
        if response.status_code != 200:
            return f"Status Code: {response.status_code}, Response: {response.content}"
        content = response.json()["choices"][0]["message"]["content"]
    except Exception:
        # Boundary handler: record the cause instead of discarding it silently.
        logger.exception("Request to upstream endpoint failed")
        return EXCEPTION or "An unexpected error occurred while contacting the model endpoint."

    # Always hand back a string, as the annotation promises; the caller takes len() of it.
    return "" if content is None else content
def home():
    """Render and return the ``index.html`` template.

    NOTE(review): no route decorator is visible on this handler in this view -
    confirm it is registered with the Flask app.
    """
    page = render_template("index.html")
    return page
def status():
    """Return a JSON health report with uptime and the total request count.

    Uptime is the whole number of seconds since ``start_time``, broken down
    into days, hours, minutes and seconds for display. The same summary is
    written to the log.
    """
    elapsed = int(time.time() - start_time)

    # Peel off seconds, then minutes, then hours; what remains is days.
    total_minutes, seconds = divmod(elapsed, 60)
    total_hours, minutes = divmod(total_minutes, 60)
    days, hours = divmod(total_hours, 24)

    uptime_str = f"{days} days, {hours} hours, {minutes} minutes and {seconds} seconds"
    logger.info(f"📊 Status Check | Uptime: {uptime_str} | Total Requests: {request_count}")

    report = {
        "status": "API is running",
        "total_requests": request_count,
        "uptime": uptime_str,
    }
    return jsonify(report)
def usage():
    """Report how much of the daily quota an API key has used.

    Reads ``api_key`` from the query string. The response lists requests used,
    requests remaining (never below zero) and the UTC time at which the
    counter resets. Nothing is written to ``api_usage``.

    Returns:
        A JSON response, or a 401 JSON error when the key is missing or invalid.
    """
    api_key = request.args.get("api_key")
    if not api_key or not validate_api_key(api_key):
        return jsonify({"error": "Invalid API Key"}), 401

    now = datetime.now(timezone.utc)  # ✅ Use timezone-aware datetime
    fresh_window = {"count": 0, "reset_time": now + timedelta(days=1)}
    user_data = api_usage.get(api_key, fresh_window)

    # chat() starts a new window once reset_time has passed; report the same
    # view here instead of a stale count with a reset time in the past.
    if now >= user_data["reset_time"]:
        user_data = fresh_window

    remaining_requests = max(0, REQUEST_LIMIT - user_data["count"])
    reset_time = user_data["reset_time"].strftime("%Y-%m-%d %H:%M:%S UTC")
    return jsonify({
        "api_key": api_key,
        "requests_used": user_data["count"],
        "remaining_requests": remaining_requests,
        "reset_time": reset_time
    })
def get_available_models():
    """
    Return the available AI model names as JSON.

    The body has a single ``models`` key whose value is one string with the
    names from AVAILABLE_MODELS joined by ", " (not a JSON array).
    """
    logger.info("📜 Model list requested")
    model_names = ", ".join(AVAILABLE_MODELS)
    return jsonify({"models": model_names})
# Generate API Key
def generate_api_key():
    """Create a new signed API key and return it as JSON.

    The key is ``TrueSyncAI-<random>-<signature>``: 32 random hex characters
    followed by the first 16 hex characters of their HMAC-SHA256 under
    SECRET_KEY.
    """
    token = secrets.token_hex(16)
    mac = hmac.new(SECRET_KEY.encode(), token.encode(), hashlib.sha256)
    signature = mac.hexdigest()[:16]
    return jsonify({"api_key": "-".join(("TrueSyncAI", token, signature))})
# ✅ Chat API with configurable parameters
def chat():
    """Handle a chat request and relay it to the upstream model endpoint.

    Expects a JSON object with ``api_key`` and ``message``; optional fields are
    ``stream``, ``model``, ``temperature``, ``presence_penalty``,
    ``frequency_penalty``, ``top_p`` and ``max_tokens``.

    Returns:
        * 400 if the body is not a JSON object or the message is empty / not a string.
        * 401 if the API key is missing or invalid.
        * 429 if the key equals SPECIAL_API_KEY and its daily limit is used up.
        * Otherwise a plain-text streamed response when ``stream`` is truthy,
          or ``{"response": <text>}`` as JSON.

    NOTE(review): no route decorator is visible on this handler in this view -
    confirm it is registered with the Flask app.
    """
    global request_count

    # silent=True returns None for a missing, non-JSON or unparsable body
    # instead of raising, so a clean 400 can be sent.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        logger.warning("⚠️ Malformed request body received")
        return jsonify({"error": "Request body must be a JSON object"}), 400

    api_key = data.get("api_key")
    raw_message = data.get("message", "")
    # A non-string message (null, number, list, ...) is treated as empty.
    message = raw_message.strip() if isinstance(raw_message, str) else ""

    # NOTE(review): this writes the full API key and user message to the log -
    # consider redacting.
    logger.info(f"🔹 Incoming Chat Request | API Key: {api_key} | Message: {message}")

    if not api_key or not isinstance(api_key, str) or not validate_api_key(api_key):
        logger.warning("❌ Invalid API Key Attempt")
        return jsonify({"error": "Invalid API Key"}), 401

    # Validate required message field
    if not message:
        logger.warning("⚠️ Empty message received")
        return jsonify({"error": "Message cannot be empty"}), 400

    # ✅ Apply Limit to the Specific API Key
    if api_key == SPECIAL_API_KEY:
        now = datetime.now(timezone.utc)
        user_data = api_usage.get(api_key, {"count": 0, "reset_time": now + timedelta(days=1)})

        # Reset count if reset time has passed
        if now >= user_data["reset_time"]:
            user_data = {"count": 0, "reset_time": now + timedelta(days=1)}

        # Block requests if limit exceeded
        if user_data["count"] >= REQUEST_LIMIT:
            return jsonify({"error": "Request limit reached. This is a testing API key for developers. The limit resets daily. Please wait or use a different key."}), 429

        # Increase request count
        user_data["count"] += 1
        api_usage[api_key] = user_data  # Update storage

    # Extract optional parameters with defaults
    stream = data.get("stream", False)
    model = data.get("model", "deepseek-r1")
    temperature = data.get("temperature", 0.2)
    presence_penalty = data.get("presence_penalty", 0)
    frequency_penalty = data.get("frequency_penalty", 0)
    top_p = data.get("top_p", 1)
    max_tokens = data.get("max_tokens", 4000)

    api_payload = {
        "messages": [
            {'role': 'system', 'content': SYSTEM_PROMPT},
            {'role': 'user', 'content': message}
        ],
        "stream": stream,
        "model": model,
        "temperature": temperature,
        "presence_penalty": presence_penalty,
        "frequency_penalty": frequency_penalty,
        "top_p": top_p,
        "max_tokens": max_tokens
    }

    request_count += 1

    # Return streaming or normal response based on user request
    if stream:
        return Response(stream_with_context(stream_response(api_payload)), content_type="text/plain")

    response_text = non_stream_response(api_payload)
    # The "Token Usage" figure is a character count of the reply, not a token count.
    logger.info(f"✅ Response Sent | API Key: {api_key} | Token Usage: {len(response_text)} chars")
    return jsonify({"response": response_text})
if __name__ == "__main__":
    logger.info("🚀 TrueSyncAI API is starting...")
    # Listen on all interfaces; 7860 is the Hugging Face Spaces default port.
    app.run(port=7860, host="0.0.0.0")