File size: 4,816 Bytes
32107e6 93a4649 32107e6 f059568 32107e6 93a4649 7a38f6d 0da0126 32107e6 03b3a61 32107e6 0da0126 32107e6 8ad9118 32107e6 f059568 08b6d0e 03b3a61 32107e6 0da0126 49a5c30 32107e6 8ad9118 32107e6 dc9e516 32107e6 03b3a61 dc9e516 32107e6 dc9e516 03b3a61 dc9e516 32107e6 03b3a61 dc9e516 32107e6 8ad9118 93a4649 8ad9118 93a4649 32107e6 8ad9118 32107e6 93a4649 8ad9118 dc9e516 aac15bb 93a4649 49a5c30 aac15bb 49a5c30 aac15bb 49a5c30 aac15bb 8ad9118 93a4649 32107e6 93a4649 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
import logging
import os
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
from transformers import pipeline
import langid
from huggingface_hub import login
import socket
import time
# Global variables
HF_HUB_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN")
def current_time_gmt():
return time.gmtime().tm_hour+2,':',time.gmtime().tm_min,':',time.gmtime().tm_sec
# Verify Hugging Face token
if not HF_HUB_TOKEN:
raise ValueError("Missing Hugging Face API token. Please set HUGGINGFACEHUB_API_TOKEN in environment variables.")
login(token=HF_HUB_TOKEN)
# Load Hebrew and English text generation models
hebrew_generator = pipeline("text-generation", model="Norod78/hebrew-gpt_neo-small")
english_generator = pipeline("text-generation", model="distilgpt2")
# Function to detect language
def detect_language(user_input):
try:
# lang = detect(user_input)
lang, _ = langid.classify(user_input) # langid.classify returns a tuple (language, confidence)
print(f"Detected language: {lang}, ", f"current time: {current_time_gmt()}")
return "hebrew" if lang == "he" else "english" if lang == "en" else "unsupported"
except Exception as e:
print(f"Language detection error: {e}")
return "unsupported"
# Function to generate a response
# def generate_response(text):
# language = detect_language(text)
# if language == "hebrew":
# return hebrew_generator(text, max_length=100, truncation=True)[0]["generated_text"]
# elif language == "english":
# return english_generator(text, max_length=100, truncation=True)[0]["generated_text"]
# return "Sorry, I only support Hebrew and English."
def generate_response(text):
language = detect_language(text)
print(f"Detected language: {language}, ", f"current time: {current_time_gmt()}") # Debugging
if language == "hebrew":
output = hebrew_generator(text, max_length=100, truncation=True)
print(f"Hebrew model output: {output}, ", f"current time: {current_time_gmt()}") # Debugging
return output[0]["generated_text"]
elif language == "english":
output = english_generator(text, max_length=50, truncation=True)
print(f"English model output: {output}, ", f"current time: {current_time_gmt()}") # Debugging
return output[0]["generated_text"]
return "Sorry, I only support Hebrew and English."
# FastAPI lifespan event
@asynccontextmanager
async def lifespan(app: FastAPI):
print("Starting application...")
yield # Wait until app closes
print("Shutting down application...")
# Create FastAPI app
app = FastAPI(lifespan=lifespan)
@app.get("/")
async def root():
return {"message": "Decision Helper API is running!"}
# @app.post("/generate_response")
# async def generate_text(request: Request):
# try:
# data = await request.json()
# text = data.get("text", "").strip() # removes non-relevant spaces
# if not text:
# return {"error": "No text provided"}
# response = generate_response(text)
# return {"response": response}
# except Exception as e:
# logging.error(f"Error processing request: {e}")
# return {"error": "Invalid request. Please send JSON with a 'text' field."}
# @app.post("/generate_response")
# async def generate_text(request: Request):
# try:
# data = await request.json()
# logging.info(f"Received request: {data}") # Log the request data
# text = data.get("text", "").strip() # removes non-relevant spaces
# if not text:
# return {"error": "No text provided"}
# response = generate_response(text)
# logging.info(f"Generated response: {response}") # Log the response
# return {"response": response}
# except Exception as e:
# logging.error(f"Error processing request: {e}")
# return {"error": "Invalid request. Please send JSON with a 'text' field."}
@app.post("/generate_response")
async def generate_text(request: Request):
try:
data = await request.json()
if not data or "text" not in data:
logging.error("Received an empty or invalid request")
return {"error": "Invalid request. Please send JSON with a 'text' field."}
text = data["text"].strip()
if not text:
return {"error": "No text provided"}
response = generate_response(text)
return {"response": response}
except Exception as e:
logging.error(f"Error processing request: {e}")
return {"error": "Invalid request. Please send JSON with a 'text' field."}
# Run the server
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)
|