import gradio as gr import time import torch import tempfile import numpy as np import scipy.io.wavfile as wavfile from transformers import AutoProcessor, BarkModel import whisper import gradio as gr import time import tempfile import numpy as np import scipy.io.wavfile as wavfile import cv2 import os import json from moviepy.editor import VideoFileClip import shutil # Bark TTS model_bark = BarkModel.from_pretrained("suno/bark") processor_bark = AutoProcessor.from_pretrained("suno/bark") model_bark.to("cuda" if torch.cuda.is_available() else "cpu") bark_voice_preset = "v2/en_speaker_6" def bark_tts(text): inputs = processor_bark(text, return_tensors="pt", voice_preset=bark_voice_preset) inputs = {k: v.to(model_bark.device) for k, v in inputs.items()} speech_values = model_bark.generate(**inputs) speech = speech_values.cpu().numpy().squeeze() speech = (speech * 32767).astype(np.int16) temp_wav = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") wavfile.write(temp_wav.name, 22050, speech) return temp_wav.name # Whisper STT whisper_model = whisper.load_model("base") def whisper_stt(audio_path): if not audio_path or not os.path.exists(audio_path): return "" result = whisper_model.transcribe(audio_path) return result["text"] # DeepFace (Video Face Emotion) def ensure_mp4(video_input): # video_input could be a file-like object, a path, or a Gradio temp path if isinstance(video_input, str): input_path = video_input else: # It's a file-like object (rare for Gradio video, but handle it) with tempfile.NamedTemporaryFile(delete=False, suffix=".webm") as temp_in: temp_in.write(video_input.read()) input_path = temp_in.name # If already mp4, return as is if input_path.endswith(".mp4"): return input_path # Convert to mp4 using moviepy mp4_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name try: clip = VideoFileClip(input_path) clip.write_videofile(mp4_path, codec="libx264", audio=False, verbose=False, logger=None) clip.close() except Exception as e: print("Video conversion failed:", e) # As fallback, just copy original shutil.copy(input_path, mp4_path) return mp4_path def analyze_video_emotions(video_input, sample_rate=15): # Convert input to an mp4 file OpenCV can process mp4_path = ensure_mp4(video_input) if not mp4_path or not os.path.exists(mp4_path): return "neutral" cap = cv2.VideoCapture(mp4_path) frame_count = 0 emotion_counts = {} while True: ret, frame = cap.read() if not ret: break if frame_count % sample_rate == 0: try: result = DeepFace.analyze(frame, actions=['emotion'], enforce_detection=False) dominant = result[0]["dominant_emotion"] if isinstance(result, list) else result["dominant_emotion"] emotion_counts[dominant] = emotion_counts.get(dominant, 0) + 1 except Exception: pass frame_count += 1 cap.release() if not emotion_counts: return "neutral" return max(emotion_counts.items(), key=lambda x: x[1])[0] wav2vec_model_name = "HaniaRuby/speech-emotion-recognition-wav2vec2" wav2vec_processor = Wav2Vec2Processor.from_pretrained(wav2vec_model_name) wav2vec_model = Wav2Vec2ForSequenceClassification.from_pretrained(wav2vec_model_name) wav2vec_model.eval() voice_label_map = { 0: 'angry', 1: 'disgust', 2: 'fear', 3: 'happy', 4: 'neutral', 5: 'sad', 6: 'surprise' } def analyze_audio_emotion(audio_path): if not audio_path or not os.path.exists(audio_path): return "neutral" speech, sr = librosa.load(audio_path, sr=16000) inputs = wav2vec_processor(speech, sampling_rate=16000, return_tensors="pt") with torch.no_grad(): logits = wav2vec_model(**inputs).logits probs = torch.nn.functional.softmax(logits, dim=-1) predicted_id = torch.argmax(probs, dim=-1).item() return voice_label_map.get(predicted_id, "neutral") # --- Effective confidence calculation def interpret_confidence(voice_label, face_label, answer_score_label, k=0.2): emotion_map = {"happy": 0.9, "neutral": 0.6, "surprised": 0.7, "sad": 0.4, "angry": 0.3, "disgust": 0.2, "fear": 0.3, "no_face": 0.5, "unknown": 0.5} answer_score_map = {"excellent": 1.0, "good": 0.8, "medium": 0.6, "poor": 0.3} voice_score, face_score, answer_score = emotion_map.get(voice_label, 0.5), emotion_map.get(face_label, 0.5), answer_score_map.get(answer_score_label, 0.5) avg_emotion = (voice_score + face_score) / 2 control_bonus = max(0, answer_score - avg_emotion) * k eff_conf = (0.5 * answer_score + 0.22 * voice_score + 0.18 * face_score + 0.1 * control_bonus) return {"effective_confidence": round(eff_conf, 3), "answer_score": round(answer_score, 2), "voice_score": round(voice_score, 2), "face_score": round(face_score, 2), "control_bonus": round(control_bonus, 3)} seniority_mapping = { "Entry-level": 1, "Junior": 2, "Mid-Level": 3, "Senior": 4, "Lead": 5 } import gradio as gr import time import tempfile import numpy as np import scipy.io.wavfile as wavfile import cv2 import os import json # --- 2. Gradio App --- with gr.Blocks(theme=gr.themes.Soft()) as demo: user_data = gr.State({}) interview_state = gr.State({}) missing_fields_state = gr.State([]) # --- UI Layout --- with gr.Column(visible=True) as user_info_section: gr.Markdown("## Candidate Information") cv_file = gr.File(label="Upload CV") job_desc = gr.Textbox(label="Job Description") start_btn = gr.Button("Continue", interactive=False) with gr.Column(visible=False) as missing_section: gr.Markdown("## Missing Information") name_in = gr.Textbox(label="Name", visible=False) role_in = gr.Textbox(label="Job Role", visible=False) seniority_in = gr.Dropdown(list(seniority_mapping.keys()), label="Seniority", visible=False) skills_in = gr.Textbox(label="Skills", visible=False) submit_btn = gr.Button("Submit", interactive=False) with gr.Column(visible=False) as interview_pre_section: pre_interview_greeting_md = gr.Markdown() start_interview_final_btn = gr.Button("Start Interview") with gr.Column(visible=False) as interview_section: gr.Markdown("## Interview in Progress") question_audio = gr.Audio(label="Listen", interactive=False, autoplay=True) question_text = gr.Markdown() user_audio_input = gr.Audio(sources=["microphone"], type="filepath", label="1. Record Audio Answer") user_video_input = gr.Video(sources=["webcam"], label="2. Record Video Answer") stt_transcript = gr.Textbox(label="Transcribed Answer (edit if needed)") confirm_btn = gr.Button("Confirm Answer") evaluation_display = gr.Markdown() emotion_display = gr.Markdown() interview_summary = gr.Markdown(visible=False) # --- UI Logic --- def validate_start_btn(cv_file, job_desc): return gr.update(interactive=(cv_file is not None and hasattr(cv_file, "name") and bool(job_desc and job_desc.strip()))) cv_file.change(validate_start_btn, [cv_file, job_desc], start_btn) job_desc.change(validate_start_btn, [cv_file, job_desc], start_btn) def process_and_route_initial(cv_file, job_desc): details = extract_candidate_details(cv_file.name) job_info = extract_job_details(job_desc) data = { "name": details.get("name", "unknown"), "job_role": job_info.get("job_title", "unknown"), "seniority": job_info.get("experience_level", "unknown"), "skills": job_info.get("skills", []) } missing = [k for k, v in data.items() if (isinstance(v, str) and v.lower() == "unknown") or not v] if missing: return data, missing, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False) else: greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' when ready." return data, missing, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True, value=greeting) start_btn.click( process_and_route_initial, [cv_file, job_desc], [user_data, missing_fields_state, user_info_section, missing_section, pre_interview_greeting_md] ) def show_missing(missing): if missing is None: missing = [] return gr.update(visible="name" in missing), gr.update(visible="job_role" in missing), gr.update(visible="seniority" in missing), gr.update(visible="skills" in missing) missing_fields_state.change(show_missing, missing_fields_state, [name_in, role_in, seniority_in, skills_in]) def validate_fields(name, role, seniority, skills, missing): if not missing: return gr.update(interactive=False) all_filled = all([(not ("name" in missing) or bool(name.strip())), (not ("job_role" in missing) or bool(role.strip())), (not ("seniority" in missing) or bool(seniority)), (not ("skills" in missing) or bool(skills.strip())),]) return gr.update(interactive=all_filled) for inp in [name_in, role_in, seniority_in, skills_in]: inp.change(validate_fields, [name_in, role_in, seniority_in, skills_in, missing_fields_state], submit_btn) def complete_manual(data, name, role, seniority, skills): if data["name"].lower() == "unknown": data["name"] = name if data["job_role"].lower() == "unknown": data["job_role"] = role if data["seniority"].lower() == "unknown": data["seniority"] = seniority if not data["skills"]: data["skills"] = [s.strip() for s in skills.split(",")] greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' to begin." return data, gr.update(visible=False), gr.update(visible=True), gr.update(value=greeting) submit_btn.click(complete_manual, [user_data, name_in, role_in, seniority_in, skills_in], [user_data, missing_section, interview_pre_section, pre_interview_greeting_md]) def start_interview(data): # --- Advanced state with full logging --- state = { "questions": [], "answers": [], "face_labels": [], "voice_labels": [], "timings": [], "question_evaluations": [], "answer_evaluations": [], "effective_confidences": [], "conversation_history": [], "difficulty_adjustment": None, "question_idx": 0, "max_questions": 3, "q_start_time": time.time(), "log": [] } # --- Optionally: context retrieval here (currently just blank) --- context = "" prompt = build_interview_prompt( conversation_history=[], user_response="", context=context, job_role=data["job_role"], skills=data["skills"], seniority=data["seniority"], difficulty_adjustment=None, voice_label="neutral", face_label="neutral" ) first_q = groq_llm.predict(prompt) # Evaluate Q for quality q_eval = eval_question_quality(first_q, data["job_role"], data["seniority"], None) state["questions"].append(first_q) state["question_evaluations"].append(q_eval) state["conversation_history"].append({'role': 'Interviewer', 'content': first_q}) audio_path = bark_tts(first_q) # LOG state["log"].append({"type": "question", "question": first_q, "question_eval": q_eval, "timestamp": time.time()}) return state, gr.update(visible=False), gr.update(visible=True), audio_path, f"*Question 1:* {first_q}" start_interview_final_btn.click(start_interview, [user_data], [interview_state, interview_pre_section, interview_section, question_audio, question_text]) def transcribe(audio_path): return whisper_stt(audio_path) user_audio_input.change(transcribe, user_audio_input, stt_transcript) def process_answer(transcript, audio_path, video_path, state, data): if not transcript and not video_path: return state, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update() elapsed = round(time.time() - state.get("q_start_time", time.time()), 2) state["timings"].append(elapsed) state["answers"].append(transcript) state["conversation_history"].append({'role': 'Candidate', 'content': transcript}) # --- 1. Emotion analysis --- voice_label = analyze_audio_emotion(audio_path) face_label = analyze_video_emotions(video_path) state["voice_labels"].append(voice_label) state["face_labels"].append(face_label) # --- 2. Evaluate previous Q and Answer --- last_q = state["questions"][-1] q_eval = state["question_evaluations"][-1] # Already in state ref_answer = generate_reference_answer(last_q, data["job_role"], data["seniority"]) answer_eval = evaluate_answer(last_q, transcript, ref_answer, data["job_role"], data["seniority"], None) state["answer_evaluations"].append(answer_eval) answer_score = answer_eval.get("Score", "medium") if answer_eval else "medium" # --- 3. Adaptive difficulty --- if answer_score == "excellent": state["difficulty_adjustment"] = "harder" elif answer_score in ("medium", "poor"): state["difficulty_adjustment"] = "easier" else: state["difficulty_adjustment"] = None # --- 4. Effective confidence --- eff_conf = interpret_confidence(voice_label, face_label, answer_score) state["effective_confidences"].append(eff_conf) # --- LOG --- state["log"].append({ "type": "answer", "question": last_q, "answer": transcript, "answer_eval": answer_eval, "ref_answer": ref_answer, "face_label": face_label, "voice_label": voice_label, "effective_confidence": eff_conf, "timing": elapsed, "timestamp": time.time() }) # --- Next or End --- qidx = state["question_idx"] + 1 if qidx >= state["max_questions"]: # Save as JSON (optionally) timestamp = time.strftime("%Y%m%d_%H%M%S") log_file = f"interview_log_{timestamp}.json" with open(log_file, "w", encoding="utf-8") as f: json.dump(state["log"], f, indent=2, ensure_ascii=False) # Report summary = "# Interview Summary\n" for i, q in enumerate(state["questions"]): summary += (f"\n### Q{i + 1}: {q}\n" f"- *Answer*: {state['answers'][i]}\n" f"- *Q Eval*: {state['question_evaluations'][i]}\n" f"- *A Eval*: {state['answer_evaluations'][i]}\n" f"- *Face Emotion: {state['face_labels'][i]}, **Voice Emotion*: {state['voice_labels'][i]}\n" f"- *Effective Confidence*: {state['effective_confidences'][i]['effective_confidence']}\n" f"- *Time*: {state['timings'][i]}s\n") summary += f"\n\n⏺ Full log saved as {log_file}." return (state, gr.update(visible=True, value=summary), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(visible=True, value=f"Last Detected — Face: {face_label}, Voice: {voice_label}")) else: # --- Build next prompt using adaptive difficulty --- state["question_idx"] = qidx state["q_start_time"] = time.time() context = "" # You can add your context logic here prompt = build_interview_prompt( conversation_history=state["conversation_history"], user_response=transcript, context=context, job_role=data["job_role"], skills=data["skills"], seniority=data["seniority"], difficulty_adjustment=state["difficulty_adjustment"], face_label=face_label, voice_label=voice_label, effective_confidence=eff_conf ) next_q = groq_llm.predict(prompt) # Evaluate Q quality q_eval = eval_question_quality(next_q, data["job_role"], data["seniority"], None) state["questions"].append(next_q) state["question_evaluations"].append(q_eval) state["conversation_history"].append({'role': 'Interviewer', 'content': next_q}) state["log"].append({"type": "question", "question": next_q, "question_eval": q_eval, "timestamp": time.time()}) audio_path = bark_tts(next_q) # Display evaluations eval_md = f"*Last Answer Eval:* {answer_eval}\n\n*Effective Confidence:* {eff_conf}" return ( state, gr.update(visible=False), audio_path, f"*Question {qidx + 1}:* {next_q}", gr.update(value=None), gr.update(value=None), gr.update(visible=True, value=f"Last Detected — Face: {face_label}, Voice: {voice_label}"), ) confirm_btn.click( process_answer, [stt_transcript, user_audio_input, user_video_input, interview_state, user_data], [interview_state, interview_summary, question_audio, question_text, user_audio_input, user_video_input, emotion_display] ).then( lambda: (gr.update(value=None), gr.update(value=None)), None, [user_audio_input, user_video_input] ) demo.launch()