phani50101's picture
Add application file2
809312a
from huggingface_hub import snapshot_download
import gradio as gr
import openvino_genai
import librosa
import numpy as np
from threading import Lock, Event
from scipy.ndimage import uniform_filter1d
from queue import Queue, Empty
from googleapiclient.discovery import build
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import cpuinfo
import gc
import os
# Set CPU affinity for optimization
os.environ["GOMP_CPU_AFFINITY"] = "0-7" # Use first 8 CPU cores
os.environ["OMP_NUM_THREADS"] = "8"
# Configuration constants
GOOGLE_API_KEY = "AIzaSyAo-1iW5MEZbc53DlEldtnUnDaYuTHUDH4"
GOOGLE_CSE_ID = "3027bedf3c88a4efb"
DEFAULT_MAX_TOKENS = 100
DEFAULT_NUM_IMAGES = 1
MAX_HISTORY_TURNS = 2
MAX_TOKENS_LIMIT = 1000
# Download models
start_time = time.time()
snapshot_download(repo_id="OpenVINO/mistral-7b-instruct-v0.1-int8-ov", local_dir="mistral-ov")
snapshot_download(repo_id="OpenVINO/whisper-tiny-fp16-ov", local_dir="whisper-ov-model")
print(f"Model download time: {time.time() - start_time:.2f} seconds")
# CPU-specific configuration
cpu_features = cpuinfo.get_cpu_info()['flags']
config_options = {}
if 'avx512' in cpu_features:
config_options["ENFORCE_BF16"] = "YES"
print("Using AVX512 optimizations")
elif 'avx2' in cpu_features:
config_options["INFERENCE_PRECISION_HINT"] = "f32"
print("Using AVX2 optimizations")
# Initialize models with performance flags
start_time = time.time()
mistral_pipe = openvino_genai.LLMPipeline(
"mistral-ov",
device="CPU",
config={
"PERFORMANCE_HINT": "THROUGHPUT",
**config_options
}
)
whisper_pipe = openvino_genai.WhisperPipeline(
"whisper-ov-model",
device="CPU"
)
pipe_lock = Lock()
print(f"Model initialization time: {time.time() - start_time:.2f} seconds")
# Warm up models
print("Warming up models...")
start_time = time.time()
with pipe_lock:
mistral_pipe.generate("Warmup", openvino_genai.GenerationConfig(max_new_tokens=10))
whisper_pipe.generate(np.zeros(16000, dtype=np.float32))
print(f"Model warmup time: {time.time() - start_time:.2f} seconds")
# Thread pools
generation_executor = ThreadPoolExecutor(max_workers=4) # Increased workers
image_executor = ThreadPoolExecutor(max_workers=8)
def fetch_images(query: str, num: int = DEFAULT_NUM_IMAGES) -> list:
"""Fetch unique images by requesting different result pages"""
start_time = time.time()
if num <= 0:
return []
try:
service = build("customsearch", "v1", developerKey=GOOGLE_API_KEY)
image_links = []
seen_urls = set() # To track unique URLs
# Start from different positions to get unique images
for start_index in range(1, num * 2, 2): # Step by 2 to get different pages
if len(image_links) >= num:
break
res = service.cse().list(
q=query,
cx=GOOGLE_CSE_ID,
searchType="image",
num=1, # Get one result per request
start=start_index # Start at different positions
).execute()
if "items" in res and res["items"]:
item = res["items"][0]
# Skip duplicates
if item["link"] not in seen_urls:
image_links.append(item["link"])
seen_urls.add(item["link"])
print(f"Unique image fetch time: {time.time() - start_time:.2f} seconds")
return image_links[:num] # Return only the requested number
except Exception as e:
print(f"Error in image fetching: {e}")
return []
def process_audio(data, sr):
start_time = time.time()
data = librosa.to_mono(data.T) if data.ndim > 1 else data
data = data.astype(np.float32)
data /= np.max(np.abs(data))
rms = librosa.feature.rms(y=data, frame_length=2048, hop_length=512)[0]
smoothed_rms = uniform_filter1d(rms, size=5)
speech_frames = np.where(smoothed_rms > 0.025)[0]
if not speech_frames.size:
print(f"Audio processing time: {time.time() - start_time:.2f} seconds")
return None
start = max(0, int(speech_frames[0] * 512 - 0.1 * sr))
end = min(len(data), int((speech_frames[-1] + 1) * 512 + 0.1 * sr))
print(f"Audio processing time: {time.time() - start_time:.2f} seconds")
return data[start:end]
def transcribe(audio):
start_time = time.time()
if audio is None:
print(f"Transcription time: {time.time() - start_time:.2f} seconds")
return ""
sr, data = audio
processed = process_audio(data, sr)
if processed is None or len(processed) < 1600:
print(f"Transcription time: {time.time() - start_time:.2f} seconds")
return ""
if sr != 16000:
processed = librosa.resample(processed, orig_sr=sr, target_sr=16000)
result = whisper_pipe.generate(processed)
print(f"Transcription time: {time.time() - start_time:.2f} seconds")
return result
def stream_answer(message: str, max_tokens: int, include_images: bool) -> str:
start_time = time.time()
response_queue = Queue()
completion_event = Event()
error = [None]
optimized_config = openvino_genai.GenerationConfig(
max_new_tokens=max_tokens,
num_beams=1,
do_sample=False,
temperature=1.0,
top_p=0.9,
top_k=30,
streaming=True,
streaming_interval=5 # Batch tokens in groups of 5
)
def callback(tokens): # Now accepts multiple tokens
response_queue.put("".join(tokens))
return openvino_genai.StreamingStatus.RUNNING
def generate():
try:
with pipe_lock:
mistral_pipe.generate(message, optimized_config, callback)
except Exception as e:
error[0] = str(e)
finally:
completion_event.set()
generation_executor.submit(generate)
accumulated = []
token_count = 0
last_gc = time.time()
while not completion_event.is_set() or not response_queue.empty():
if error[0]:
yield f"Error: {error[0]}"
print(f"Stream answer time: {time.time() - start_time:.2f} seconds")
return
try:
token_batch = response_queue.get_nowait()
accumulated.append(token_batch)
token_count += len(token_batch)
# Periodic garbage collection
if time.time() - last_gc > 2.0: # Every 2 seconds
gc.collect()
last_gc = time.time()
yield "".join(accumulated)
except Empty:
continue
print(f"Generated {token_count} tokens in {time.time() - start_time:.2f} seconds "
f"({token_count/(time.time() - start_time):.2f} tokens/sec)")
yield "".join(accumulated)
def run_chat(message: str, history: list, include_images: bool, max_tokens: int, num_images: int):
start_time = time.time()
final_text = ""
# Create a placeholder for the streaming response
history.append((message, "", []))
rendered_history = render_history(history)
yield rendered_history, gr.update(value="", interactive=False)
# Stream tokens and update chatbot in real-time
for output in stream_answer(message, max_tokens, include_images):
final_text = output
# Update only the last response in history
updated_history = history[:-1] + [(message, final_text, [])]
rendered_history = render_history(updated_history)
yield rendered_history, gr.update(value="", interactive=False)
images = []
if include_images:
images = fetch_images(message, num_images)
# Update history with final response and images
history[-1] = (message, final_text, images)
if len(history) > MAX_HISTORY_TURNS:
history = history[-MAX_HISTORY_TURNS:]
rendered_history = render_history(history)
print(f"Total chat time: {time.time() - start_time:.2f} seconds")
yield rendered_history, gr.update(value="", interactive=True)
def render_history(history):
start_time = time.time()
rendered = []
for user_msg, bot_msg, image_links in history:
text = bot_msg
if image_links:
images_html = "".join(
f"<img src='{url}' class='chat-image' onclick='showImage(\"{url}\")' />"
for url in image_links
)
text += f"<br><br><b>📸 Related Visuals:</b><br><div style='display: flex; flex-wrap: wrap;'>{images_html}</div>"
rendered.append((user_msg, text))
return rendered
with gr.Blocks(css="""
.processing {
animation: pulse 1.5s infinite;
color: #4a5568;
padding: 10px;
border-radius: 5px;
text-align: center;
margin: 10px 0;
}
@keyframes pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.5; }
}
.chat-image {
cursor: pointer;
transition: transform 0.2s;
max-height: 100px;
margin: 4px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.chat-image:hover {
transform: scale(1.05);
box-shadow: 0 4px 8px rgba(0,0,0,0.2);
}
.modal {
position: fixed;
top: 0;
left: 0;
width: 100%;
height: 100%;
background: rgba(0,0,0,0.8);
display: none;
z-index: 1000;
cursor: zoom-out;
}
.modal-content {
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
max-width: 90%;
max-height: 90%;
background: white;
padding: 10px;
border-radius: 12px;
}
.modal-img {
width: auto;
height: auto;
max-width: 100%;
max-height: 100%;
border-radius: 8px;
}
.chat-container {
border: 1px solid #e5e7eb;
border-radius: 12px;
padding: 20px;
margin-bottom: 20px;
}
.slider-container {
margin-top: 20px;
padding: 15px;
border-radius: 10px;
background-color: #f8f9fa;
}
.slider-label {
font-weight: bold;
margin-bottom: 5px;
}
.system-info {
background-color: #7B9BDB;
padding: 15px;
border-radius: 8px;
margin: 15px 0;
border-left: 4px solid #1890ff;
}
.typing-indicator {
display: inline-block;
position: relative;
width: 40px;
height: 20px;
}
.typing-dot {
display: inline-block;
width: 6px;
height: 6px;
border-radius: 50%;
background-color: #4a5568;
position: absolute;
animation: typing 1.4s infinite ease-in-out;
}
.typing-dot:nth-child(1) {
left: 0;
animation-delay: 0s;
}
.typing-dot:nth-child(2) {
left: 12px;
animation-delay: 0.2s;
}
.typing-dot:nth-child(3) {
left: 24px;
animation-delay: 0.4s;
}
@keyframes typing {
0%, 60%, 100% { transform: translateY(0); }
30% { transform: translateY(-5px); }
}
""") as demo:
gr.Markdown("# 🤖 EDU CHAT BY PHANINDRA REDDY K")
# System info banner
gr.HTML("""
<div class="system-info">
<strong>Performance Optimized for High-RAM Systems</strong>
<ul>
<li>Adaptive resource allocation based on request type</li>
</ul>
</div>
""")
modal_html = """
<div class="modal" id="imageModal" onclick="this.style.display='none'">
<div class="modal-content">
<img class="modal-img" id="expandedImg">
</div>
</div>
<script>
function showImage(url) {
document.getElementById('expandedImg').src = url;
document.getElementById('imageModal').style.display = 'block';
}
</script>
"""
gr.HTML(modal_html)
state = gr.State([])
with gr.Column(scale=2, elem_classes="chat-container"):
chatbot = gr.Chatbot(label="Conversation", height=500, bubble_full_width=False)
with gr.Column(scale=1):
gr.Markdown("### 💬 Ask Your Question")
with gr.Row():
user_input = gr.Textbox(
placeholder="Type your question here...",
label="",
container=False,
elem_id="question-input"
)
include_images = gr.Checkbox(
label="Include Visuals",
value=True,
container=False,
elem_id="image-checkbox"
)
# Add the sliders container
with gr.Column(elem_classes="slider-container"):
gr.Markdown("### ⚙️ Generation Settings")
with gr.Row():
max_tokens = gr.Slider(
minimum=10,
maximum=MAX_TOKENS_LIMIT, # Increased to 1000
value=DEFAULT_MAX_TOKENS,
step=10,
label="Response Length (Tokens)",
info=f"Max: {MAX_TOKENS_LIMIT} tokens (for detailed explanations)",
elem_classes="slider-label"
)
# Conditionally visible image slider row
with gr.Row(visible=True) as image_slider_row:
num_images = gr.Slider(
minimum=0,
maximum=5,
value=DEFAULT_NUM_IMAGES,
step=1,
label="Number of Images",
info="Set to 0 to disable images",
elem_classes="slider-label"
)
with gr.Row():
submit_btn = gr.Button("Send Text", variant="primary")
mic_btn = gr.Button("Transcribe Voice", variant="secondary")
mic = gr.Audio(
sources=["microphone"],
type="numpy",
label="Voice Input",
show_label=False,
elem_id="voice-input"
)
processing = gr.HTML("""
<div id="processing" style="display: none;">
<div class="processing">🔮 Processing your request...</div>
</div>
""")
# Toggle image slider visibility based on checkbox
def toggle_image_slider(include_visuals):
return gr.update(visible=include_visuals)
include_images.change(
fn=toggle_image_slider,
inputs=include_images,
outputs=image_slider_row
)
def toggle_processing():
return gr.update(visible=True), gr.update(interactive=False)
def hide_processing():
return gr.update(visible=False), gr.update(interactive=True)
# Update the submit_btn click handler to include streaming
submit_btn.click(
fn=toggle_processing,
outputs=[processing, submit_btn]
).then(
fn=lambda: (gr.update(visible=True), gr.update(interactive=False)),
outputs=[processing, submit_btn]
).then(
fn=run_chat,
inputs=[user_input, state, include_images, max_tokens, num_images],
outputs=[chatbot, user_input]
).then(
fn=lambda: (gr.update(visible=False), gr.update(interactive=True)),
outputs=[processing, submit_btn]
)
# Voice transcription remains the same
mic_btn.click(
fn=toggle_processing,
outputs=[processing, mic_btn]
).then(
fn=transcribe,
inputs=mic,
outputs=user_input
).then(
fn=hide_processing,
outputs=[processing, mic_btn]
)
if __name__ == "__main__":
demo.launch(share=True, debug=True)