Spaces:
Sleeping
Sleeping
| import requests | |
| import gradio as gr | |
| from ragatouille import RAGPretrainedModel | |
| import logging | |
| from pathlib import Path | |
| from time import perf_counter | |
| from sentence_transformers import CrossEncoder | |
| from huggingface_hub import InferenceClient | |
| from jinja2 import Environment, FileSystemLoader | |
| import numpy as np | |
| from os import getenv | |
| from backend.query_llm import generate_hf, generate_qwen | |
| from backend.semantic_search import table, retriever | |
| from huggingface_hub import InferenceClient | |
# Bhashini API translation function
# Credentials for the Bhashini (ULCA) API, read once from the environment.
api_key = getenv('API_KEY')
user_id = getenv('USER_ID')

# Seconds to wait for each Bhashini HTTP call before giving up, so a stalled
# API cannot block the caller indefinitely.
BHASHINI_TIMEOUT = 30


def _translation_failure(status_code, message):
    """Build the result dict returned for any failed translation attempt."""
    return {"status_code": status_code, "message": message, "translated_content": None}


def bhashini_translate(text: str, from_code: str = "en", to_code: str = "hi") -> dict:
    """Translate ``text`` from ``from_code`` to ``to_code`` using the Bhashini API.

    The call is a two-step exchange: a first POST asks the ULCA pipeline
    endpoint which translation service to use (service id, callback URL and
    the header carrying the inference API key); a second POST sends the text
    to that callback URL.

    Args:
        text: Text to translate. ``None``, empty or whitespace-only input is
            rejected without contacting the API.
        from_code: Source language code (default ``"en"``).
        to_code: Target language code (default ``"hi"``).

    Returns:
        A dict with ``status_code``, ``message`` and ``translated_content``.
        ``translated_content`` is ``None`` on every failure path. Besides the
        HTTP status codes relayed from the API, this function itself reports
        400 (empty input), 401 (credentials not configured), 503 (request
        could not be completed) and 502 (response not in the expected shape).
        The empty-input result additionally carries ``speech_content: None``.
    """
    if not text or not text.strip():
        print('Input text is empty. Please provide valid text for translation.')
        return {"status_code": 400, "message": "Input text is empty", "translated_content": None, "speech_content": None}

    print('Input text - ', text)
    print(f'Starting translation process from {from_code} to {to_code}...')
    gr.Warning(f'Translating to {to_code}...')

    if not api_key or not user_id:
        # Without credentials the pipeline lookup can only fail; report that
        # clearly instead of spending a network round trip on it.
        print('Bhashini credentials (API_KEY / USER_ID) are not configured.')
        return _translation_failure(401, "Bhashini credentials are not configured")

    url = 'https://meity-auth.ulcacontrib.org/ulca/apis/v0/model/getModelsPipeline'
    headers = {
        "Content-Type": "application/json",
        "userID": user_id,
        "ulcaApiKey": api_key,
    }
    language = {"sourceLanguage": from_code, "targetLanguage": to_code}
    payload = {
        "pipelineTasks": [{"taskType": "translation", "config": {"language": language}}],
        "pipelineRequestConfig": {"pipelineId": "64392f96daac500b55c543cd"},
    }

    # Step 1: discover the translation service and its inference endpoint.
    print('Sending initial request to get the pipeline...')
    try:
        response = requests.post(url, json=payload, headers=headers, timeout=BHASHINI_TIMEOUT)
    except requests.RequestException as exc:
        print(f'Error in initial request: {exc}')
        return _translation_failure(503, "Error in translation request")
    if response.status_code != 200:
        print(f'Error in initial request: {response.status_code}')
        return _translation_failure(response.status_code, "Error in translation request")

    print('Initial request successful, processing response...')
    try:
        response_data = response.json()
        endpoint = response_data["pipelineInferenceAPIEndPoint"]
        service_id = response_data["pipelineResponseConfig"][0]["config"][0]["serviceId"]
        callback_url = endpoint["callbackUrl"]
        inference_key = endpoint["inferenceApiKey"]
        # The API tells us both the name and the value of the auth header.
        headers2 = {
            "Content-Type": "application/json",
            inference_key["name"]: inference_key["value"],
        }
    except (ValueError, KeyError, IndexError, TypeError) as exc:
        print(f'Unexpected pipeline response: {exc!r}')
        return _translation_failure(502, "Error in translation request")
    print(f'Service ID: {service_id}, Callback URL: {callback_url}')

    # Step 2: send the text to the service selected above.
    compute_payload = {
        "pipelineTasks": [{"taskType": "translation", "config": {"language": language, "serviceId": service_id}}],
        "inputData": {"input": [{"source": text}], "audio": [{"audioContent": None}]},
    }
    print(f'Sending translation request with text: "{text}"')
    try:
        compute_response = requests.post(callback_url, json=compute_payload, headers=headers2, timeout=BHASHINI_TIMEOUT)
    except requests.RequestException as exc:
        print(f'Error in translation request: {exc}')
        return _translation_failure(503, "Error in translation")
    if compute_response.status_code != 200:
        print(f'Error in translation request: {compute_response.status_code}')
        return _translation_failure(compute_response.status_code, "Error in translation")

    print('Translation request successful, processing translation...')
    try:
        compute_response_data = compute_response.json()
        translated_content = compute_response_data["pipelineResponse"][0]["output"][0]["target"]
    except (ValueError, KeyError, IndexError, TypeError) as exc:
        print(f'Unexpected translation response: {exc!r}')
        return _translation_failure(502, "Error in translation")

    print(f'Translation successful. Translated content: "{translated_content}"')
    return {"status_code": 200, "message": "Translation successful", "translated_content": translated_content}
| # Existing chatbot functions | |
# Column names used when querying the vector table.
VECTOR_COLUMN_NAME = "vector"
TEXT_COLUMN_NAME = "text"

# Logging is configured once, at import time, for the whole application.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Hugging Face inference client, authenticated with the token from the
# environment.
HF_TOKEN = getenv("HUGGING_FACE_HUB_TOKEN")
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1", token=HF_TOKEN)

# Jinja templates are loaded from the "templates" directory next to this file:
# one renders the LLM prompt, the other renders the prompt as HTML.
proj_dir = Path(__file__).parent
env = Environment(loader=FileSystemLoader(proj_dir / 'templates'))
template, template_html = (
    env.get_template(name) for name in ('template.j2', 'template_html.j2')
)
| # def add_text(history, text): | |
| # history = [] if history is None else history | |
| # history = history + [(text, None)] | |
| # return history, gr.Textbox(value="", interactive=False) | |
# Radio-button label -> Hugging Face model id of the reranking cross-encoder.
_RERANKER_MODELS = {
    '(FAST) MiniLM-L6v2': 'cross-encoder/ms-marco-MiniLM-L-6-v2',
    '(ACCURATE) BGE reranker': 'BAAI/bge-reranker-base',
}
_DEFAULT_RERANKER = '(ACCURATE) BGE reranker'
_COLBERT_CHOICE = '(HIGH ACCURATE) ColBERT'

# Models are expensive to load, so each one is loaded on first use and then
# reused by later queries instead of being rebuilt per request.
_loaded_models = {}


def _get_reranker(choice):
    """Return the cross-encoder for a radio-button choice, loading it once."""
    model_id = _RERANKER_MODELS.get(choice)
    if model_id is None:
        # An unrecognised choice previously left the model variable unbound.
        logger.warning('Unknown reranker %r, falling back to %s', choice, _DEFAULT_RERANKER)
        model_id = _RERANKER_MODELS[_DEFAULT_RERANKER]
    if model_id not in _loaded_models:
        _loaded_models[model_id] = CrossEncoder(model_id)
    return _loaded_models[model_id]


def _get_colbert_index():
    """Return the ColBERT index searcher, loading it once."""
    key = 'colbert-index'
    if key not in _loaded_models:
        RAG = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")
        _loaded_models[key] = RAG.from_index('.ragatouille/colbert/indexes/cbseclass10index')
    return _loaded_models[key]


def _retrieve_and_rerank(query, choice, top_k_rank, top_rerank):
    """Fetch ``top_rerank`` candidates by vector search, keep the best ``top_k_rank``."""
    query_vec = retriever.encode(query)
    rows = table.search(query_vec, vector_column_name=VECTOR_COLUMN_NAME).limit(top_rerank).to_list()
    documents = [row[TEXT_COLUMN_NAME] for row in rows]
    if not documents:
        # Nothing to score; skip the reranker rather than call it on no pairs.
        return documents
    cross_scores = _get_reranker(choice).predict([[query, doc] for doc in documents])
    best_first = np.argsort(cross_scores)[::-1]
    return [documents[idx] for idx in best_first[:top_k_rank]]


def bot(history, cross_encoder):
    """Answer the latest user message using retrieved documents.

    Generator. The query is the first element of the last ``history`` entry.
    Documents are retrieved either with ColBERT or with vector search followed
    by cross-encoder reranking (selected by ``cross_encoder``), rendered into
    the prompt templates, and sent to the LLM.

    Args:
        history: List of ``(user_message, bot_message)`` pairs; the last entry
            is the one being answered and is replaced in place with the reply.
        cross_encoder: Retrieval mode label as offered by the UI radio button.

    Yields:
        ``(history, prompt_html)`` tuples. The ColBERT branch yields once per
        chunk produced by ``generate_hf``; the reranker branch yields once
        with the complete ``generate_qwen`` output.

    Raises:
        ValueError: If the latest user message is empty.
    """
    top_rerank = 25
    top_k_rank = 20
    query = history[-1][0] if history else ''
    print('\nQuery: ', query)
    print('\nHistory:', history)
    if not query:
        gr.Warning("Please submit a non-empty string as a prompt")
        raise ValueError("Empty string was submitted")

    logger.warning('Retrieving documents...')
    if cross_encoder == _COLBERT_CHOICE:
        gr.Warning('Retrieving using ColBERT.. First time query will take a minute for model to load..pls wait')
        documents_full = _get_colbert_index().search(query, k=top_k_rank)
        documents = [item['content'] for item in documents_full]
        prompt = template.render(documents=documents, query=query)
        prompt_html = template_html.render(documents=documents, query=query)
        # History entries may be tuples, so replace the whole entry rather
        # than assigning to its second element.
        history[-1] = (query, "")
        for character in generate_hf(prompt, history[:-1]):
            history[-1] = (query, character)
            yield history, prompt_html
    else:
        document_start = perf_counter()
        documents = _retrieve_and_rerank(query, cross_encoder, top_k_rank, top_rerank)
        logger.info('Retrieval and reranking took %.2fs', perf_counter() - document_start)
        prompt = template.render(documents=documents, query=query)
        prompt_html = template_html.render(documents=documents, query=query)
        output = generate_qwen(prompt, history[:-1])
        print('Output:', output)
        history[-1] = (query, output)
        yield history, prompt_html
# Language name (as offered to the user) -> language code passed to the
# translation API. Built once instead of on every call.
ISO_LANGUAGE_CODES = {
    "Hindi": "hi",
    "Gom": "gom",
    "Kannada": "kn",
    "Dogri": "doi",
    "Bodo": "brx",
    "Urdu": "ur",
    "Tamil": "ta",
    "Kashmiri": "ks",
    "Assamese": "as",
    "Bengali": "bn",
    "Marathi": "mr",
    "Sindhi": "sd",
    "Maithili": "mai",
    "Punjabi": "pa",
    "Malayalam": "ml",
    "Manipuri": "mni",
    "Telugu": "te",
    "Sanskrit": "sa",
    "Nepali": "ne",
    "Santali": "sat",
    "Gujarati": "gu",
    "Odia": "or",
}


def translate_text(selected_language, history):
    """Translate the most recent bot reply into ``selected_language``.

    Args:
        selected_language: A language name that is a key of
            ``ISO_LANGUAGE_CODES`` (e.g. ``"Hindi"``).
        history: List of ``(user_message, bot_message)`` pairs; only the bot
            message of the last entry is translated.

    Returns:
        The translated text, or ``None`` when the language is not recognised,
        when there is no reply to translate, or when the translation service
        reports a failure.
    """
    to_code = ISO_LANGUAGE_CODES.get(selected_language)
    if to_code is None:
        # e.g. the selection was cleared; there is nothing sensible to
        # translate into, so skip translation instead of raising KeyError.
        print('Unsupported translation language:', selected_language)
        return None

    # The last reply may be missing or None; treat both as "nothing to do".
    response_text = (history[-1][1] or '') if history else ''
    print('response_text for translation', response_text)
    if not response_text.strip():
        return None

    translation = bhashini_translate(response_text, to_code=to_code)
    return translation['translated_content']
# Gradio interface
def update_history_and_translate(txt, cross_encoder, history_state, language_dropdown):
    """Handle one submitted prompt: answer it, then translate the answer.

    Args:
        txt: The prompt typed by the user.
        cross_encoder: Selected retrieval mode (radio-button label).
        history_state: The per-session chat history list; it is updated in
            place so the conversation persists between submissions.
        language_dropdown: Selected target language name for translation.

    Returns:
        ``(history, prompt_html, translated_text)`` for the chatbot, the
        prompt preview and the translation textbox respectively.
    """
    print('History state', history_state)
    history_state.append((txt, ""))

    # bot() is a generator that may yield several partial results while the
    # answer streams; run it to completion and keep the final one.
    bot_output = None
    try:
        for bot_output in bot(history_state, cross_encoder):
            pass
    except Exception:
        # Do not leave a half-answered (or rejected, e.g. empty) prompt in the
        # session history when answering fails.
        history_state.pop()
        raise
    print('bot_output', bot_output)

    if bot_output is None:
        history, prompt_html = history_state, ''
    else:
        history, prompt_html = bot_output
    print('History', history)

    # Update the history state
    if history is not history_state:
        history_state[:] = history

    # Translate text. A translation problem must not discard the answer that
    # was already generated, so it is reported as a warning instead.
    try:
        translated_text = translate_text(language_dropdown, history)
    except Exception:
        logger.exception('Translation failed')
        gr.Warning('Translation failed; showing the untranslated answer only.')
        translated_text = None
    return history, prompt_html, translated_text


# NOTE(review): the original indentation of this layout was lost; the nesting
# below (header row with two columns, input row with textbox + button, the
# remaining components directly under the Blocks root) is a reconstruction —
# confirm against the deployed app.
with gr.Blocks(theme='gradio/soft') as CHATBOT:
    history_state = gr.State([])
    with gr.Row():
        with gr.Column(scale=10):
            gr.HTML(value="""<div style="color: #FF4500;"><h1>m-</h1>MITHRA<h1><span style="color: #008000">student Manual Chatbot </span></h1></div>""")
            gr.HTML(value="""<p style="font-family: sans-serif; font-size: 16px;">Using GenAI for CBIC Capacity Building - A free chat bot developed by National Customs Targeting Center using Open source LLMs for CBIC Officers</p>""")
            gr.HTML(value="""<p style="font-family: Arial, sans-serif; font-size: 14px;">Developed by NCTC,Mumbai. Suggestions may be sent to <a href="mailto:[email protected]" style="color: #00008B; font-style: italic;">[email protected]</a>.</p>""")
        with gr.Column(scale=3):
            gr.Image(value='logo.png', height=200, width=200)

    chatbot = gr.Chatbot(
        [],
        elem_id="chatbot",
        avatar_images=('https://aui.atlassian.com/aui/8.8/docs/images/avatar-person.svg',
                       'https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.svg'),
        bubble_full_width=False,
        show_copy_button=True,
        show_share_button=True,
    )

    with gr.Row():
        txt = gr.Textbox(
            scale=3,
            show_label=False,
            placeholder="Enter text and press enter",
            container=False,
        )
        txt_btn = gr.Button(value="Submit text", scale=1)

    cross_encoder = gr.Radio(
        choices=['(FAST) MiniLM-L6v2', '(ACCURATE) BGE reranker', '(HIGH ACCURATE) ColBERT'],
        value='(ACCURATE) BGE reranker',
        label="Embeddings",
        info="Only First query to Colbert may take little time)",
    )
    language_dropdown = gr.Dropdown(
        choices=[
            "Hindi", "Gom", "Kannada", "Dogri", "Bodo", "Urdu", "Tamil", "Kashmiri", "Assamese", "Bengali", "Marathi",
            "Sindhi", "Maithili", "Punjabi", "Malayalam", "Manipuri", "Telugu", "Sanskrit", "Nepali", "Santali",
            "Gujarati", "Odia",
        ],
        value="Hindi",  # default to Hindi
        label="Select Language for Translation",
    )
    prompt_html = gr.HTML()
    translated_textbox = gr.Textbox(label="Translated Response")

    # The submit button and pressing Enter in the textbox run the same handler.
    txt_msg = txt_btn.click(update_history_and_translate, [txt, cross_encoder, history_state, language_dropdown], [chatbot, prompt_html, translated_textbox])
    txt_msg = txt.submit(update_history_and_translate, [txt, cross_encoder, history_state, language_dropdown], [chatbot, prompt_html, translated_textbox])

    examples = ['CAN U SAY THE DIFFERENCES BETWEEN METALS AND NON METALS?', 'WHAT IS IONIC BOND?',
                'EXPLAIN ASEXUAL REPRODUCTION']
    gr.Examples(examples, txt)

# Launch the Gradio application
CHATBOT.launch(share=True, debug=True)