Spaces: Build error
import os
import time
import json
import shutil
import tempfile

import gradio as gr
import numpy as np
import scipy.io.wavfile as wavfile
import cv2
import torch
import librosa
import whisper
from moviepy.editor import VideoFileClip
from deepface import DeepFace
from transformers import (
    AutoProcessor,
    BarkModel,
    Wav2Vec2Processor,
    Wav2Vec2ForSequenceClassification,
)
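# NOTE (assumption): extract_candidate_details, extract_job_details, build_interview_prompt,
# groq_llm, eval_question_quality, generate_reference_answer and evaluate_answer are not
# defined in this file and are assumed to come from other modules of the Space.
# The heavy dependencies above (torch, transformers, openai-whisper, deepface, librosa,
# opencv-python, moviepy) also need to be listed in the Space's requirements.txt.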
# Bark TTS
model_bark = BarkModel.from_pretrained("suno/bark")
processor_bark = AutoProcessor.from_pretrained("suno/bark")
model_bark.to("cuda" if torch.cuda.is_available() else "cpu")
bark_voice_preset = "v2/en_speaker_6"
def bark_tts(text):
    """Synthesize `text` with Bark and return the path of a temporary WAV file."""
    inputs = processor_bark(text, return_tensors="pt", voice_preset=bark_voice_preset)
    inputs = {k: v.to(model_bark.device) for k, v in inputs.items()}
    speech_values = model_bark.generate(**inputs)
    speech = speech_values.cpu().numpy().squeeze()
    speech = (speech * 32767).astype(np.int16)
    temp_wav = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
    # Bark outputs 24 kHz audio; writing it at 22,050 Hz would slow it down and lower the pitch.
    wavfile.write(temp_wav.name, model_bark.generation_config.sample_rate, speech)
    return temp_wav.name
# Whisper STT
whisper_model = whisper.load_model("base")

def whisper_stt(audio_path):
    if not audio_path or not os.path.exists(audio_path):
        return ""
    result = whisper_model.transcribe(audio_path)
    return result["text"]
# DeepFace (Video Face Emotion)
def ensure_mp4(video_input):
    # video_input could be a file-like object, a path, or a Gradio temp path
    if isinstance(video_input, str):
        input_path = video_input
    else:
        # It's a file-like object (rare for Gradio video, but handle it)
        with tempfile.NamedTemporaryFile(delete=False, suffix=".webm") as temp_in:
            temp_in.write(video_input.read())
            input_path = temp_in.name
    # If already mp4, return as is
    if input_path.endswith(".mp4"):
        return input_path
    # Convert to mp4 using moviepy
    mp4_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
    try:
        clip = VideoFileClip(input_path)
        clip.write_videofile(mp4_path, codec="libx264", audio=False, verbose=False, logger=None)
        clip.close()
    except Exception as e:
        print("Video conversion failed:", e)
        # As a fallback, just copy the original
        shutil.copy(input_path, mp4_path)
    return mp4_path
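# analyze_video_emotions (below) samples every `sample_rate`-th frame, runs DeepFace emotion
# analysis on it, and returns the most frequent dominant emotion as a majority vote, falling
# back to "neutral" when no frame yields a usable detection. The WebM-to-MP4 conversion above
# exists because Gradio webcam recordings usually arrive as .webm, which some OpenCV builds
# cannot decode reliably.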
def analyze_video_emotions(video_input, sample_rate=15):
    # Convert input to an mp4 file OpenCV can process
    mp4_path = ensure_mp4(video_input)
    if not mp4_path or not os.path.exists(mp4_path):
        return "neutral"
    cap = cv2.VideoCapture(mp4_path)
    frame_count = 0
    emotion_counts = {}
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if frame_count % sample_rate == 0:
            try:
                result = DeepFace.analyze(frame, actions=['emotion'], enforce_detection=False)
                dominant = result[0]["dominant_emotion"] if isinstance(result, list) else result["dominant_emotion"]
                emotion_counts[dominant] = emotion_counts.get(dominant, 0) + 1
            except Exception:
                pass
        frame_count += 1
    cap.release()
    if not emotion_counts:
        return "neutral"
    return max(emotion_counts.items(), key=lambda x: x[1])[0]
# Wav2Vec2 (Voice Emotion)
wav2vec_model_name = "HaniaRuby/speech-emotion-recognition-wav2vec2"
wav2vec_processor = Wav2Vec2Processor.from_pretrained(wav2vec_model_name)
wav2vec_model = Wav2Vec2ForSequenceClassification.from_pretrained(wav2vec_model_name)
wav2vec_model.eval()
voice_label_map = {
    0: 'angry', 1: 'disgust', 2: 'fear', 3: 'happy',
    4: 'neutral', 5: 'sad', 6: 'surprise'
}
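# The label order above is assumed from the model card; if the checkpoint ships its own mapping,
# prefer wav2vec_model.config.id2label over this hard-coded dict.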
def analyze_audio_emotion(audio_path):
    if not audio_path or not os.path.exists(audio_path):
        return "neutral"
    speech, sr = librosa.load(audio_path, sr=16000)
    inputs = wav2vec_processor(speech, sampling_rate=16000, return_tensors="pt")
    with torch.no_grad():
        logits = wav2vec_model(**inputs).logits
    probs = torch.nn.functional.softmax(logits, dim=-1)
    predicted_id = torch.argmax(probs, dim=-1).item()
    return voice_label_map.get(predicted_id, "neutral")
# --- Effective confidence calculation ---
def interpret_confidence(voice_label, face_label, answer_score_label, k=0.2):
    # Note: DeepFace and the wav2vec label map both emit "surprise" (not "surprised").
    emotion_map = {"happy": 0.9, "neutral": 0.6, "surprise": 0.7, "sad": 0.4, "angry": 0.3,
                   "disgust": 0.2, "fear": 0.3, "no_face": 0.5, "unknown": 0.5}
    answer_score_map = {"excellent": 1.0, "good": 0.8, "medium": 0.6, "poor": 0.3}
    voice_score = emotion_map.get(voice_label, 0.5)
    face_score = emotion_map.get(face_label, 0.5)
    answer_score = answer_score_map.get(answer_score_label, 0.5)
    avg_emotion = (voice_score + face_score) / 2
    control_bonus = max(0, answer_score - avg_emotion) * k
    eff_conf = 0.5 * answer_score + 0.22 * voice_score + 0.18 * face_score + 0.1 * control_bonus
    return {"effective_confidence": round(eff_conf, 3), "answer_score": round(answer_score, 2),
            "voice_score": round(voice_score, 2), "face_score": round(face_score, 2),
            "control_bonus": round(control_bonus, 3)}
seniority_mapping = {
    "Entry-level": 1, "Junior": 2, "Mid-Level": 3, "Senior": 4, "Lead": 5
}
# --- 2. Gradio App ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    user_data = gr.State({})
    interview_state = gr.State({})
    missing_fields_state = gr.State([])
    # --- UI Layout ---
    with gr.Column(visible=True) as user_info_section:
        gr.Markdown("## Candidate Information")
        cv_file = gr.File(label="Upload CV")
        job_desc = gr.Textbox(label="Job Description")
        start_btn = gr.Button("Continue", interactive=False)
    with gr.Column(visible=False) as missing_section:
        gr.Markdown("## Missing Information")
        name_in = gr.Textbox(label="Name", visible=False)
        role_in = gr.Textbox(label="Job Role", visible=False)
        seniority_in = gr.Dropdown(list(seniority_mapping.keys()), label="Seniority", visible=False)
        skills_in = gr.Textbox(label="Skills", visible=False)
        submit_btn = gr.Button("Submit", interactive=False)
    with gr.Column(visible=False) as interview_pre_section:
        pre_interview_greeting_md = gr.Markdown()
        start_interview_final_btn = gr.Button("Start Interview")
    with gr.Column(visible=False) as interview_section:
        gr.Markdown("## Interview in Progress")
        question_audio = gr.Audio(label="Listen", interactive=False, autoplay=True)
        question_text = gr.Markdown()
        user_audio_input = gr.Audio(sources=["microphone"], type="filepath", label="1. Record Audio Answer")
        user_video_input = gr.Video(sources=["webcam"], label="2. Record Video Answer")
        stt_transcript = gr.Textbox(label="Transcribed Answer (edit if needed)")
        confirm_btn = gr.Button("Confirm Answer")
        evaluation_display = gr.Markdown()
        emotion_display = gr.Markdown()
        interview_summary = gr.Markdown(visible=False)
    # --- UI Logic ---
    def validate_start_btn(cv_file, job_desc):
        return gr.update(interactive=(cv_file is not None and hasattr(cv_file, "name") and bool(job_desc and job_desc.strip())))

    cv_file.change(validate_start_btn, [cv_file, job_desc], start_btn)
    job_desc.change(validate_start_btn, [cv_file, job_desc], start_btn)
    def process_and_route_initial(cv_file, job_desc):
        details = extract_candidate_details(cv_file.name)
        job_info = extract_job_details(job_desc)
        data = {
            "name": details.get("name", "unknown"), "job_role": job_info.get("job_title", "unknown"),
            "seniority": job_info.get("experience_level", "unknown"), "skills": job_info.get("skills", [])
        }
        missing = [k for k, v in data.items() if (isinstance(v, str) and v.lower() == "unknown") or not v]
        if missing:
            return data, missing, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
        else:
            greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' when ready."
            return data, missing, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True, value=greeting)

    start_btn.click(
        process_and_route_initial,
        [cv_file, job_desc],
        [user_data, missing_fields_state, user_info_section, missing_section, pre_interview_greeting_md]
    )
    def show_missing(missing):
        if missing is None: missing = []
        return (gr.update(visible="name" in missing), gr.update(visible="job_role" in missing),
                gr.update(visible="seniority" in missing), gr.update(visible="skills" in missing))

    missing_fields_state.change(show_missing, missing_fields_state, [name_in, role_in, seniority_in, skills_in])

    def validate_fields(name, role, seniority, skills, missing):
        if not missing: return gr.update(interactive=False)
        all_filled = all([
            (not ("name" in missing) or bool(name.strip())),
            (not ("job_role" in missing) or bool(role.strip())),
            (not ("seniority" in missing) or bool(seniority)),
            (not ("skills" in missing) or bool(skills.strip())),
        ])
        return gr.update(interactive=all_filled)

    for inp in [name_in, role_in, seniority_in, skills_in]:
        inp.change(validate_fields, [name_in, role_in, seniority_in, skills_in, missing_fields_state], submit_btn)

    def complete_manual(data, name, role, seniority, skills):
        if data["name"].lower() == "unknown": data["name"] = name
        if data["job_role"].lower() == "unknown": data["job_role"] = role
        if data["seniority"].lower() == "unknown": data["seniority"] = seniority
        if not data["skills"]: data["skills"] = [s.strip() for s in skills.split(",")]
        greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' to begin."
        return data, gr.update(visible=False), gr.update(visible=True), gr.update(value=greeting)

    submit_btn.click(complete_manual, [user_data, name_in, role_in, seniority_in, skills_in],
                     [user_data, missing_section, interview_pre_section, pre_interview_greeting_md])
    def start_interview(data):
        # --- Advanced state with full logging ---
        state = {
            "questions": [], "answers": [], "face_labels": [], "voice_labels": [], "timings": [],
            "question_evaluations": [], "answer_evaluations": [], "effective_confidences": [],
            "conversation_history": [],
            "difficulty_adjustment": None,
            "question_idx": 0, "max_questions": 3, "q_start_time": time.time(),
            "log": []
        }
        # --- Optionally: context retrieval here (currently just blank) ---
        context = ""
        prompt = build_interview_prompt(
            conversation_history=[], user_response="", context=context, job_role=data["job_role"],
            skills=data["skills"], seniority=data["seniority"], difficulty_adjustment=None,
            voice_label="neutral", face_label="neutral"
        )
        first_q = groq_llm.predict(prompt)
        # Evaluate the question for quality
        q_eval = eval_question_quality(first_q, data["job_role"], data["seniority"], None)
        state["questions"].append(first_q)
        state["question_evaluations"].append(q_eval)
        state["conversation_history"].append({'role': 'Interviewer', 'content': first_q})
        audio_path = bark_tts(first_q)
        # LOG
        state["log"].append({"type": "question", "question": first_q, "question_eval": q_eval, "timestamp": time.time()})
        return state, gr.update(visible=False), gr.update(visible=True), audio_path, f"*Question 1:* {first_q}"

    start_interview_final_btn.click(start_interview, [user_data],
                                    [interview_state, interview_pre_section, interview_section, question_audio, question_text])
    def transcribe(audio_path):
        return whisper_stt(audio_path)

    user_audio_input.change(transcribe, user_audio_input, stt_transcript)
    def process_answer(transcript, audio_path, video_path, state, data):
        if not transcript and not video_path:
            return state, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
        elapsed = round(time.time() - state.get("q_start_time", time.time()), 2)
        state["timings"].append(elapsed)
        state["answers"].append(transcript)
        state["conversation_history"].append({'role': 'Candidate', 'content': transcript})

        # --- 1. Emotion analysis ---
        voice_label = analyze_audio_emotion(audio_path)
        face_label = analyze_video_emotions(video_path)
        state["voice_labels"].append(voice_label)
        state["face_labels"].append(face_label)

        # --- 2. Evaluate previous question and answer ---
        last_q = state["questions"][-1]
        q_eval = state["question_evaluations"][-1]  # Already in state
        ref_answer = generate_reference_answer(last_q, data["job_role"], data["seniority"])
        answer_eval = evaluate_answer(last_q, transcript, ref_answer, data["job_role"], data["seniority"], None)
        state["answer_evaluations"].append(answer_eval)
        answer_score = answer_eval.get("Score", "medium") if answer_eval else "medium"

        # --- 3. Adaptive difficulty ---
        if answer_score == "excellent":
            state["difficulty_adjustment"] = "harder"
        elif answer_score in ("medium", "poor"):
            state["difficulty_adjustment"] = "easier"
        else:
            state["difficulty_adjustment"] = None

        # --- 4. Effective confidence ---
        eff_conf = interpret_confidence(voice_label, face_label, answer_score)
        state["effective_confidences"].append(eff_conf)
        # --- LOG ---
        state["log"].append({
            "type": "answer",
            "question": last_q,
            "answer": transcript,
            "answer_eval": answer_eval,
            "ref_answer": ref_answer,
            "face_label": face_label,
            "voice_label": voice_label,
            "effective_confidence": eff_conf,
            "timing": elapsed,
            "timestamp": time.time()
        })
        # --- Next question or end of interview ---
        qidx = state["question_idx"] + 1
        if qidx >= state["max_questions"]:
            # Save the log as JSON (optional)
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            log_file = f"interview_log_{timestamp}.json"
            with open(log_file, "w", encoding="utf-8") as f:
                json.dump(state["log"], f, indent=2, ensure_ascii=False)
            # Report
            summary = "# Interview Summary\n"
            for i, q in enumerate(state["questions"]):
                summary += (f"\n### Q{i + 1}: {q}\n"
                            f"- *Answer*: {state['answers'][i]}\n"
                            f"- *Q Eval*: {state['question_evaluations'][i]}\n"
                            f"- *A Eval*: {state['answer_evaluations'][i]}\n"
                            f"- *Face Emotion*: {state['face_labels'][i]}, *Voice Emotion*: {state['voice_labels'][i]}\n"
                            f"- *Effective Confidence*: {state['effective_confidences'][i]['effective_confidence']}\n"
                            f"- *Time*: {state['timings'][i]}s\n")
            summary += f"\n\n⏺ Full log saved as {log_file}."
            return (state, gr.update(visible=True, value=summary), gr.update(value=None), gr.update(value=None),
                    gr.update(value=None), gr.update(value=None),
                    gr.update(visible=True, value=f"Last Detected — Face: {face_label}, Voice: {voice_label}"))
        else:
            # --- Build next prompt using adaptive difficulty ---
            state["question_idx"] = qidx
            state["q_start_time"] = time.time()
            context = ""  # You can add your context logic here
            prompt = build_interview_prompt(
                conversation_history=state["conversation_history"],
                user_response=transcript,
                context=context,
                job_role=data["job_role"],
                skills=data["skills"],
                seniority=data["seniority"],
                difficulty_adjustment=state["difficulty_adjustment"],
                face_label=face_label,
                voice_label=voice_label,
                effective_confidence=eff_conf
            )
            next_q = groq_llm.predict(prompt)
            # Evaluate question quality
            q_eval = eval_question_quality(next_q, data["job_role"], data["seniority"], None)
            state["questions"].append(next_q)
            state["question_evaluations"].append(q_eval)
            state["conversation_history"].append({'role': 'Interviewer', 'content': next_q})
            state["log"].append({"type": "question", "question": next_q, "question_eval": q_eval, "timestamp": time.time()})
            audio_path = bark_tts(next_q)
            # Display evaluations
            eval_md = f"*Last Answer Eval:* {answer_eval}\n\n*Effective Confidence:* {eff_conf}"
            return (
                state, gr.update(visible=False), audio_path, f"*Question {qidx + 1}:* {next_q}",
                gr.update(value=None), gr.update(value=None),
                gr.update(visible=True, value=f"Last Detected — Face: {face_label}, Voice: {voice_label}"),
            )
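    # Note: every branch of process_answer returns 7 values, in the same order as the `outputs`
    # list wired below: interview_state, interview_summary, question_audio, question_text,
    # user_audio_input, user_video_input, emotion_display.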
    confirm_btn.click(
        process_answer,
        [stt_transcript, user_audio_input, user_video_input, interview_state, user_data],
        [interview_state, interview_summary, question_audio, question_text, user_audio_input, user_video_input, emotion_display]
    ).then(
        lambda: (gr.update(value=None), gr.update(value=None)), None, [user_audio_input, user_video_input]
    )

demo.launch()