|
''' |
|
Created By Lewis Kamau Kimaru |
|
Sema translator fastapi implementation |
|
January 2024 |
|
Docker deployment |
|
''' |
|
|
|
from fastapi import FastAPI, HTTPException, Request, Depends |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from fastapi.responses import HTMLResponse |
|
import uvicorn |
|
|
|
import ctranslate2 |
|
import sentencepiece as spm |
|
import fasttext |
|
|
|
import pytz |
|
from datetime import datetime |
|
import os |
|
|
|
app = FastAPI() |
|
|
|
origins = ["*"] |
|
|
|
app.add_middleware( |
|
CORSMiddleware, |
|
allow_origins=origins, |
|
allow_credentials=False, |
|
allow_methods=["*"], |
|
allow_headers=["*"], |
|
) |
|
|
|
fasttext.FastText.eprint = lambda x: None |
|
|
|
|
|
templates_folder = os.path.join(os.path.dirname(__file__), "templates") |
|
|
|
|
|
|
|
|
|
def get_time(): |
|
nairobi_timezone = pytz.timezone('Africa/Nairobi') |
|
current_time_nairobi = datetime.now(nairobi_timezone) |
|
|
|
curr_day = current_time_nairobi.strftime('%A') |
|
curr_date = current_time_nairobi.strftime('%Y-%m-%d') |
|
curr_time = current_time_nairobi.strftime('%H:%M:%S') |
|
|
|
full_date = f"{curr_day} | {curr_date} | {curr_time}" |
|
return full_date, curr_time |
|
|
|
|
|
beam_size = 1 |
|
device = "cpu" |
|
|
|
|
|
print("\nimporting Languages Prediction model") |
|
lang_model_file = "lid218e.bin" |
|
lang_model_full_path = os.path.join(os.path.dirname(__file__), lang_model_file) |
|
lang_model = fasttext.load_model(lang_model_full_path) |
|
|
|
|
|
|
|
print("\nimporting SentencePiece model") |
|
sp_model_file = "spm.model" |
|
sp_model_full_path = os.path.join(os.path.dirname(__file__), sp_model_file) |
|
sp = spm.SentencePieceProcessor() |
|
sp.load(sp_model_full_path) |
|
|
|
|
|
print("\nimporting Translator model") |
|
ct_model_file = "sematrans-3.3B" |
|
ct_model_full_path = os.path.join(os.path.dirname(__file__), ct_model_file) |
|
translator = ctranslate2.Translator(ct_model_full_path, device) |
|
|
|
print('\nDone importing models\n') |
|
|
|
|
|
def translate_detect(userinput: str, target_lang: str): |
|
source_sents = [userinput] |
|
source_sents = [sent.strip() for sent in source_sents] |
|
target_prefix = [[target_lang]] * len(source_sents) |
|
|
|
|
|
predictions = lang_model.predict(source_sents[0], k=1) |
|
source_lang = predictions[0][0].replace('__label__', '') |
|
|
|
|
|
source_sents_subworded = sp.encode(source_sents, out_type=str) |
|
source_sents_subworded = [[source_lang] + sent + ["</s>"] for sent in source_sents_subworded] |
|
|
|
|
|
translations = translator.translate_batch( |
|
source_sents_subworded, |
|
batch_type="tokens", |
|
max_batch_size=2024, |
|
beam_size=beam_size, |
|
target_prefix=target_prefix, |
|
) |
|
translations = [translation[0]['tokens'] for translation in translations] |
|
|
|
|
|
translations_desubword = sp.decode(translations) |
|
translations_desubword = [sent[len(target_lang):] for sent in translations_desubword] |
|
|
|
|
|
return source_lang, translations_desubword |
|
|
|
def translate_enter(userinput: str, source_lang: str, target_lang: str): |
|
source_sents = [userinput] |
|
source_sents = [sent.strip() for sent in source_sents] |
|
target_prefix = [[target_lang]] * len(source_sents) |
|
|
|
|
|
source_sents_subworded = sp.encode(source_sents, out_type=str) |
|
source_sents_subworded = [[source_lang] + sent + ["</s>"] for sent in source_sents_subworded] |
|
|
|
|
|
translations = translator.translate_batch(source_sents_subworded, batch_type="tokens", max_batch_size=2024, beam_size=beam_size, target_prefix=target_prefix) |
|
translations = [translation[0]['tokens'] for translation in translations] |
|
|
|
|
|
translations_desubword = sp.decode(translations) |
|
translations_desubword = [sent[len(target_lang):] for sent in translations_desubword] |
|
|
|
|
|
return translations_desubword[0] |
|
|
|
|
|
@app.get("/", response_class=HTMLResponse) |
|
async def read_root(request: Request): |
|
return HTMLResponse(content=open(os.path.join(templates_folder, "translator.html"), "r").read(), status_code=200) |
|
|
|
|
|
@app.post("/translate_detect/") |
|
async def translate_detect_endpoint(request: Request): |
|
datad = await request.json() |
|
userinputd = datad.get("userinput") |
|
target_langd = datad.get("target_lang") |
|
dfull_date = get_time()[0] |
|
print(f"\nrequest: {dfull_date}\nTarget Language; {target_langd}, User Input: {userinputd}\n") |
|
|
|
if not userinputd or not target_langd: |
|
raise HTTPException(status_code=422, detail="Both 'userinput' and 'target_lang' are required.") |
|
|
|
source_langd, translated_text_d = translate_detect(userinputd, target_langd) |
|
dcurrent_time = get_time()[1] |
|
print(f"\nresponse: {dcurrent_time}; ... Source_language: {source_langd}, Translated Text: {translated_text_d}\n\n") |
|
return { |
|
"source_language": source_langd, |
|
"translated_text": translated_text_d[0], |
|
} |
|
|
|
|
|
@app.post("/translate_enter/") |
|
async def translate_enter_endpoint(request: Request): |
|
datae = await request.json() |
|
userinpute = datae.get("userinput") |
|
source_lange = datae.get("source_lang") |
|
target_lange = datae.get("target_lang") |
|
efull_date = get_time()[0] |
|
print(f"\nrequest: {efull_date}\nSource_language; {source_lange}, Target Language; {target_lange}, User Input: {userinpute}\n") |
|
|
|
if not userinpute or not target_lange: |
|
raise HTTPException(status_code=422, detail="'userinput' 'sourc_lang'and 'target_lang' are required.") |
|
|
|
translated_text_e = translate_enter(userinpute, source_lange, target_lange) |
|
ecurrent_time = get_time()[1] |
|
print(f"\nresponse: {ecurrent_time}; ... Translated Text: {translated_text_e}\n\n") |
|
return { |
|
"translated_text": translated_text_e, |
|
} |
|
|
|
|
|
print("\nAPIss starting .......\n") |
|
|