from huggingface_hub import snapshot_download
import gradio as gr
import openvino_genai
import librosa
import numpy as np
from threading import Lock, Event
from scipy.ndimage import uniform_filter1d
from queue import Queue, Empty
from googleapiclient.discovery import build
from concurrent.futures import ThreadPoolExecutor
import time
import cpuinfo
import gc
import os

# Pin OpenMP threads to the first 8 CPU cores before the runtimes initialize
os.environ["GOMP_CPU_AFFINITY"] = "0-7"
os.environ["OMP_NUM_THREADS"] = "8"

# Configuration constants. Credentials are read from the environment rather
# than hardcoded; set GOOGLE_API_KEY and GOOGLE_CSE_ID before launching.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
GOOGLE_CSE_ID = os.environ.get("GOOGLE_CSE_ID", "")
DEFAULT_MAX_TOKENS = 100
DEFAULT_NUM_IMAGES = 1
MAX_HISTORY_TURNS = 2
MAX_TOKENS_LIMIT = 1000

# Download models
start_time = time.time()
snapshot_download(repo_id="OpenVINO/mistral-7b-instruct-v0.1-int8-ov", local_dir="mistral-ov")
snapshot_download(repo_id="OpenVINO/whisper-tiny-fp16-ov", local_dir="whisper-ov-model")
print(f"Model download time: {time.time() - start_time:.2f} seconds")

# CPU-specific configuration. cpuinfo reports granular flags such as
# "avx512f", so match on the prefix instead of the bare string "avx512".
cpu_features = cpuinfo.get_cpu_info()["flags"]
config_options = {}
if any(flag.startswith("avx512") for flag in cpu_features):
    config_options["ENFORCE_BF16"] = "YES"
    print("Using AVX512 optimizations")
elif "avx2" in cpu_features:
    config_options["INFERENCE_PRECISION_HINT"] = "f32"
    print("Using AVX2 optimizations")

# Initialize models with performance flags
start_time = time.time()
mistral_pipe = openvino_genai.LLMPipeline(
    "mistral-ov",
    device="CPU",
    config={
        "PERFORMANCE_HINT": "THROUGHPUT",
        **config_options,
    },
)
whisper_pipe = openvino_genai.WhisperPipeline(
    "whisper-ov-model",
    device="CPU",
)
pipe_lock = Lock()
print(f"Model initialization time: {time.time() - start_time:.2f} seconds")

# Warm up models so the first user request does not pay compilation costs
print("Warming up models...")
start_time = time.time()
with pipe_lock:
    mistral_pipe.generate("Warmup", openvino_genai.GenerationConfig(max_new_tokens=10))
    whisper_pipe.generate(np.zeros(16000, dtype=np.float32))
print(f"Model warmup time: {time.time() - start_time:.2f} seconds")

# Worker pools for background generation and image fetching
generation_executor = ThreadPoolExecutor(max_workers=4)
image_executor = ThreadPoolExecutor(max_workers=8)


def fetch_images(query: str, num: int = DEFAULT_NUM_IMAGES) -> list:
    """Fetch unique images by requesting different result pages."""
    start_time = time.time()
    if num <= 0:
        return []
    try:
        service = build("customsearch", "v1", developerKey=GOOGLE_API_KEY)
        image_links = []
        seen_urls = set()  # Track URLs already collected
        # Start from different positions to get unique images
        for start_index in range(1, num * 2, 2):  # Step by 2 to hit different pages
            if len(image_links) >= num:
                break
            res = service.cse().list(
                q=query,
                cx=GOOGLE_CSE_ID,
                searchType="image",
                num=1,  # One result per request
                start=start_index,  # Start at a different position each time
            ).execute()
            if "items" in res and res["items"]:
                item = res["items"][0]
                # Skip duplicates
                if item["link"] not in seen_urls:
                    image_links.append(item["link"])
                    seen_urls.add(item["link"])
        print(f"Unique image fetch time: {time.time() - start_time:.2f} seconds")
        return image_links[:num]  # Return only the requested number
    except Exception as e:
        print(f"Error in image fetching: {e}")
        return []


def process_audio(data, sr):
    """Normalize the clip and trim it to the detected speech region."""
    start_time = time.time()
    data = librosa.to_mono(data.T) if data.ndim > 1 else data
    data = data.astype(np.float32)
    peak = np.max(np.abs(data))
    if peak > 0:  # Guard against division by zero on silent input
        data /= peak
    rms = librosa.feature.rms(y=data, frame_length=2048, hop_length=512)[0]
    smoothed_rms = uniform_filter1d(rms, size=5)
    speech_frames = np.where(smoothed_rms > 0.025)[0]
    if not speech_frames.size:
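        # No frame's smoothed RMS cleared the threshold: treat the clip as silence.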
print(f"Audio processing time: {time.time() - start_time:.2f} seconds") return None start = max(0, int(speech_frames[0] * 512 - 0.1 * sr)) end = min(len(data), int((speech_frames[-1] + 1) * 512 + 0.1 * sr)) print(f"Audio processing time: {time.time() - start_time:.2f} seconds") return data[start:end] def transcribe(audio): start_time = time.time() if audio is None: print(f"Transcription time: {time.time() - start_time:.2f} seconds") return "" sr, data = audio processed = process_audio(data, sr) if processed is None or len(processed) < 1600: print(f"Transcription time: {time.time() - start_time:.2f} seconds") return "" if sr != 16000: processed = librosa.resample(processed, orig_sr=sr, target_sr=16000) result = whisper_pipe.generate(processed) print(f"Transcription time: {time.time() - start_time:.2f} seconds") return result def stream_answer(message: str, max_tokens: int, include_images: bool) -> str: start_time = time.time() response_queue = Queue() completion_event = Event() error = [None] optimized_config = openvino_genai.GenerationConfig( max_new_tokens=max_tokens, num_beams=1, do_sample=False, temperature=1.0, top_p=0.9, top_k=30, streaming=True, streaming_interval=5 # Batch tokens in groups of 5 ) def callback(tokens): # Now accepts multiple tokens response_queue.put("".join(tokens)) return openvino_genai.StreamingStatus.RUNNING def generate(): try: with pipe_lock: mistral_pipe.generate(message, optimized_config, callback) except Exception as e: error[0] = str(e) finally: completion_event.set() generation_executor.submit(generate) accumulated = [] token_count = 0 last_gc = time.time() while not completion_event.is_set() or not response_queue.empty(): if error[0]: yield f"Error: {error[0]}" print(f"Stream answer time: {time.time() - start_time:.2f} seconds") return try: token_batch = response_queue.get_nowait() accumulated.append(token_batch) token_count += len(token_batch) # Periodic garbage collection if time.time() - last_gc > 2.0: # Every 2 seconds gc.collect() last_gc = time.time() yield "".join(accumulated) except Empty: continue print(f"Generated {token_count} tokens in {time.time() - start_time:.2f} seconds " f"({token_count/(time.time() - start_time):.2f} tokens/sec)") yield "".join(accumulated) def run_chat(message: str, history: list, include_images: bool, max_tokens: int, num_images: int): start_time = time.time() final_text = "" # Create a placeholder for the streaming response history.append((message, "", [])) rendered_history = render_history(history) yield rendered_history, gr.update(value="", interactive=False) # Stream tokens and update chatbot in real-time for output in stream_answer(message, max_tokens, include_images): final_text = output # Update only the last response in history updated_history = history[:-1] + [(message, final_text, [])] rendered_history = render_history(updated_history) yield rendered_history, gr.update(value="", interactive=False) images = [] if include_images: images = fetch_images(message, num_images) # Update history with final response and images history[-1] = (message, final_text, images) if len(history) > MAX_HISTORY_TURNS: history = history[-MAX_HISTORY_TURNS:] rendered_history = render_history(history) print(f"Total chat time: {time.time() - start_time:.2f} seconds") yield rendered_history, gr.update(value="", interactive=True) def render_history(history): start_time = time.time() rendered = [] for user_msg, bot_msg, image_links in history: text = bot_msg if image_links: images_html = "".join( f"" for url in image_links ) text += f"

def render_history(history):
    """Convert (user, bot, images) turns into Gradio chat tuples."""
    rendered = []
    for user_msg, bot_msg, image_links in history:
        text = bot_msg
        if image_links:
            # Thumbnail markup pairs with the .chat-image CSS below; clicking
            # a thumbnail opens the lightbox modal via showModal()
            images_html = "".join(
                f'<img src="{url}" class="chat-image" onclick="showModal(this.src)">'
                for url in image_links
            )
            text += f"<br><br>📸 <b>Related Visuals:</b><br>{images_html}"
        rendered.append((user_msg, text))
    return rendered


with gr.Blocks(css="""
    .processing { animation: pulse 1.5s infinite; color: #4a5568; padding: 10px; border-radius: 5px; text-align: center; margin: 10px 0; }
    @keyframes pulse { 0%, 100% { opacity: 1; } 50% { opacity: 0.5; } }
    .chat-image { cursor: pointer; transition: transform 0.2s; max-height: 100px; margin: 4px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
    .chat-image:hover { transform: scale(1.05); box-shadow: 0 4px 8px rgba(0,0,0,0.2); }
    .modal { position: fixed; top: 0; left: 0; width: 100%; height: 100%; background: rgba(0,0,0,0.8); display: none; z-index: 1000; cursor: zoom-out; }
    .modal-content { position: absolute; top: 50%; left: 50%; transform: translate(-50%, -50%); max-width: 90%; max-height: 90%; background: white; padding: 10px; border-radius: 12px; }
    .modal-img { width: auto; height: auto; max-width: 100%; max-height: 100%; border-radius: 8px; }
    .chat-container { border: 1px solid #e5e7eb; border-radius: 12px; padding: 20px; margin-bottom: 20px; }
    .slider-container { margin-top: 20px; padding: 15px; border-radius: 10px; background-color: #f8f9fa; }
    .slider-label { font-weight: bold; margin-bottom: 5px; }
    .system-info { background-color: #7B9BDB; padding: 15px; border-radius: 8px; margin: 15px 0; border-left: 4px solid #1890ff; }
    .typing-indicator { display: inline-block; position: relative; width: 40px; height: 20px; }
    .typing-dot { display: inline-block; width: 6px; height: 6px; border-radius: 50%; background-color: #4a5568; position: absolute; animation: typing 1.4s infinite ease-in-out; }
    .typing-dot:nth-child(1) { left: 0; animation-delay: 0s; }
    .typing-dot:nth-child(2) { left: 12px; animation-delay: 0.2s; }
    .typing-dot:nth-child(3) { left: 24px; animation-delay: 0.4s; }
    @keyframes typing { 0%, 60%, 100% { transform: translateY(0); } 30% { transform: translateY(-5px); } }
""") as demo:
    gr.Markdown("# 🤖 EDU CHAT BY PHANINDRA REDDY K")

    # System info banner
    gr.HTML("""
    <div class="system-info">
        <b>Performance Optimized for High-RAM Systems</b>
""") modal_html = """ """ gr.HTML(modal_html) state = gr.State([]) with gr.Column(scale=2, elem_classes="chat-container"): chatbot = gr.Chatbot(label="Conversation", height=500, bubble_full_width=False) with gr.Column(scale=1): gr.Markdown("### 💬 Ask Your Question") with gr.Row(): user_input = gr.Textbox( placeholder="Type your question here...", label="", container=False, elem_id="question-input" ) include_images = gr.Checkbox( label="Include Visuals", value=True, container=False, elem_id="image-checkbox" ) # Add the sliders container with gr.Column(elem_classes="slider-container"): gr.Markdown("### ⚙️ Generation Settings") with gr.Row(): max_tokens = gr.Slider( minimum=10, maximum=MAX_TOKENS_LIMIT, # Increased to 1000 value=DEFAULT_MAX_TOKENS, step=10, label="Response Length (Tokens)", info=f"Max: {MAX_TOKENS_LIMIT} tokens (for detailed explanations)", elem_classes="slider-label" ) # Conditionally visible image slider row with gr.Row(visible=True) as image_slider_row: num_images = gr.Slider( minimum=0, maximum=5, value=DEFAULT_NUM_IMAGES, step=1, label="Number of Images", info="Set to 0 to disable images", elem_classes="slider-label" ) with gr.Row(): submit_btn = gr.Button("Send Text", variant="primary") mic_btn = gr.Button("Transcribe Voice", variant="secondary") mic = gr.Audio( sources=["microphone"], type="numpy", label="Voice Input", show_label=False, elem_id="voice-input" ) processing = gr.HTML(""" """) # Toggle image slider visibility based on checkbox def toggle_image_slider(include_visuals): return gr.update(visible=include_visuals) include_images.change( fn=toggle_image_slider, inputs=include_images, outputs=image_slider_row ) def toggle_processing(): return gr.update(visible=True), gr.update(interactive=False) def hide_processing(): return gr.update(visible=False), gr.update(interactive=True) # Update the submit_btn click handler to include streaming submit_btn.click( fn=toggle_processing, outputs=[processing, submit_btn] ).then( fn=lambda: (gr.update(visible=True), gr.update(interactive=False)), outputs=[processing, submit_btn] ).then( fn=run_chat, inputs=[user_input, state, include_images, max_tokens, num_images], outputs=[chatbot, user_input] ).then( fn=lambda: (gr.update(visible=False), gr.update(interactive=True)), outputs=[processing, submit_btn] ) # Voice transcription remains the same mic_btn.click( fn=toggle_processing, outputs=[processing, mic_btn] ).then( fn=transcribe, inputs=mic, outputs=user_input ).then( fn=hide_processing, outputs=[processing, mic_btn] ) if __name__ == "__main__": demo.launch(share=True, debug=True)