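# Quiz Generator: a Gradio app that splits an input document by token count,
# sends each part to Gemini (via langchain_google_genai) for topic segmentation,
# and produces key concepts, summaries, and multiple-choice quiz questions per
# segment, downloadable as JSON and TXT.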
import json
import os
import re
import time

import gradio as gr
import numpy as np
from langchain_google_genai import ChatGoogleGenerativeAI
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("answerdotai/ModernBERT-base")
sentence_model = SentenceTransformer('all-MiniLM-L6-v2')
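# Note: the ModernBERT tokenizer is used here only to count tokens when deciding
# whether to split the input (its ~8k-token context presumably motivates the
# 8000-token threshold below). The SentenceTransformer model is loaded but not
# referenced elsewhere in this file.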
def clean_text(text):
    """Strip [speaker_N] tags and collapse whitespace."""
    text = re.sub(r'\[speaker_\d+\]', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text
def split_text_by_tokens(text, max_tokens=8000):
    """Return the text as a single chunk, or split it into two halves on
    sentence boundaries when it exceeds max_tokens."""
    text = clean_text(text)
    tokens = tokenizer.encode(text)

    if len(tokens) <= max_tokens:
        return [text]

    # Aim for roughly half of the tokens in each part, splitting at sentence ends.
    split_point = len(tokens) // 2
    sentences = re.split(r'(?<=[.!?])\s+', text)

    first_half = []
    second_half = []
    current_tokens = 0
    for sentence in sentences:
        sentence_tokens = len(tokenizer.encode(sentence))
        # Once the first half is full, keep all remaining sentences in order.
        if not second_half and current_tokens + sentence_tokens <= split_point:
            first_half.append(sentence)
            current_tokens += sentence_tokens
        else:
            second_half.append(sentence)

    return [" ".join(first_half), " ".join(second_half)]
def analyze_segment_with_gemini(segment_text):
    llm = ChatGoogleGenerativeAI(
        model="gemini-1.5-flash",
        temperature=0.7,
        max_tokens=None,
        timeout=None,
        max_retries=3
    )
prompt = f""" | |
Analyze the following text and identify distinct segments within it and do text segmentation: | |
1. Segments should be STRICTLY max=15 | |
2. For each segment/topic you identify: | |
- Provide a SPECIFIC and UNIQUE topic name (3-5 words) that clearly differentiates it from other segments | |
- List 3-5 key concepts discussed in that segment (be precise and avoid repetition between segments) | |
- Write a brief summary of that segment (3-5 sentences) | |
- Create 5 high-quality, meaningful quiz questions based DIRECTLY on the content in that segment only | |
- Questions and answers should be only from the content of the segment | |
For each quiz question: | |
- Create one correct answer that comes DIRECTLY from the text | |
- Create two plausible but incorrect answers | |
- IMPORTANT: Ensure all answer options have similar length (± 3 words) | |
- Ensure the correct answer is clearly indicated with a ✓ symbol | |
- Questions should **require actual understanding**, NOT just basic fact recall. | |
- Questions Are **non-trivial**, encourage deeper thinking, and **avoid surface-level facts**. | |
- Are **directly based on the segment's content** (not inferred from the summary). | |
- Do **not include questions about document structure** (e.g., title, number of paragraphs). | |
- Do **not generate overly generic or obvious questions** (e.g., "What is mentioned in the text?"). | |
- Focus on **core ideas, logical reasoning, and conceptual understanding**. | |
ADDITIONAL REQUIREMENT: | |
- **First, detect the language of the original text.** | |
- **Generate ALL output (topic names, key concepts, summaries, and quizzes) in the same language as the original text.** | |
- If the text is in Russian, generate all responses in Russian. | |
- If the text is in another language, generate responses in that original language. | |
Text: | |
{segment_text} | |
Format your response as JSON with the following structure: | |
{{ | |
"segments": [ | |
{{ | |
"topic_name": "Unique and Specific Topic Name", | |
"key_concepts": ["concept1", "concept2", "concept3"], | |
"summary": "Brief summary of this segment.", | |
"quiz_questions": [ | |
{{ | |
"question": "Question text?", | |
"options": [ | |
{{ | |
"text": "Option A", | |
"correct": false | |
}}, | |
{{ | |
"text": "Option B", | |
"correct": true | |
}}, | |
{{ | |
"text": "Option C", | |
"correct": false | |
}} | |
] | |
}} | |
] | |
}} | |
] | |
}} | |
IMPORTANT: Each segment must have a DISTINCT topic name that clearly differentiates it from others. | |
- **Do NOT repeat** key concepts across multiple segments unless absolutely necessary. | |
- **Ensure the quiz questions challenge the reader** and **are not easily guessable**. | |
""" | |
    response = llm.invoke(prompt)
    response_text = response.content

    try:
        # Extract the outermost {...} block in case the model wraps the JSON
        # in extra prose or a markdown code fence.
        json_match = re.search(r'\{[\s\S]*\}', response_text)
        if json_match:
            return json.loads(json_match.group(0))
        else:
            return json.loads(response_text)
    except json.JSONDecodeError:
        return {
            "segments": [
                {
                    "topic_name": "JSON Parsing Error",
                    "key_concepts": ["Error in response format"],
                    "summary": "Could not parse the API response.",
                    "quiz_questions": []
                }
            ]
        }
def process_document_with_quiz(text):
    start_time = time.time()

    token_count = len(tokenizer.encode(text))
    print(f"[LOG] Total document tokens: {token_count}")

    if token_count > 8000:
        print("[LOG] Document exceeds 8000 tokens. Splitting into parts.")
        parts = split_text_by_tokens(text)
        print(f"[LOG] Document split into {len(parts)} parts")
        for i, part in enumerate(parts):
            part_tokens = len(tokenizer.encode(part))
            print(f"[LOG] Part {i+1} contains {part_tokens} tokens")
    else:
        print("[LOG] Document under 8000 tokens. Processing as a single part.")
        parts = [text]

    all_segments = []
    segment_counter = 1

    for i, part in enumerate(parts):
        part_start_time = time.time()
        print(f"[LOG] Processing part {i+1}...")

        analysis = analyze_segment_with_gemini(part)

        if "segments" in analysis:
            print(f"[LOG] Found {len(analysis['segments'])} segments in part {i+1}")
            for segment in analysis["segments"]:
                segment["segment_number"] = segment_counter
                all_segments.append(segment)
                print(f"[LOG] Segment {segment_counter}: {segment['topic_name']}")
                segment_counter += 1
        else:
            # Fallback if the response format is unexpected
            print(f"[LOG] Error: Unexpected format in part {i+1} analysis")
            fallback_segment = {
                "topic_name": f"Segment {segment_counter} Analysis",
                "key_concepts": ["Format error in analysis"],
                "summary": "Could not properly segment this part of the text.",
                "quiz_questions": [],
                "segment_number": segment_counter
            }
            all_segments.append(fallback_segment)
            print(f"[LOG] Added fallback segment {segment_counter}")
            segment_counter += 1

        part_time = time.time() - part_start_time
        print(f"[LOG] Part {i+1} processed in {part_time:.2f} seconds")

    total_time = time.time() - start_time
    print(f"[LOG] Total processing time: {total_time:.2f} seconds")
    print(f"[LOG] Generated {len(all_segments)} segments total")
    return all_segments
def format_quiz_for_display(results):
    output = []
    for segment in results:
        topic = segment["topic_name"]
        segment_num = segment["segment_number"]
        output.append(f"\n\n{'='*40}")
        output.append(f"SEGMENT {segment_num}: {topic}")
        output.append(f"{'='*40}\n")

        output.append("KEY CONCEPTS:")
        for concept in segment["key_concepts"]:
            output.append(f"• {concept}")

        output.append("\nSUMMARY:")
        output.append(segment["summary"])

        output.append("\nQUIZ QUESTIONS:")
        for i, q in enumerate(segment["quiz_questions"]):
            output.append(f"\n{i+1}. {q['question']}")
            for j, option in enumerate(q['options']):
                letter = chr(97 + j).upper()
                correct_marker = " ✓" if option["correct"] else ""
                output.append(f"   {letter}. {option['text']}{correct_marker}")
    return "\n".join(output)
def save_results_as_json(results, filename="analysis_results.json"):
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    return filename

def save_results_as_txt(formatted_text, filename="analysis_results.txt"):
    with open(filename, "w", encoding="utf-8") as f:
        f.write(formatted_text)
    return filename
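# Note: analyze_document below writes its JSON/TXT output inline rather than
# calling these helpers; they appear to be kept for standalone use.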
def analyze_document(document_text, api_key):
    print("[LOG] Starting document analysis...")
    overall_start_time = time.time()
    os.environ["GOOGLE_API_KEY"] = api_key
    try:
        results = process_document_with_quiz(document_text)
        formatted_output = format_quiz_for_display(results)

        json_path = "analysis_results.json"
        txt_path = "analysis_results.txt"
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
        with open(txt_path, "w", encoding="utf-8") as f:
            f.write(formatted_output)

        overall_time = time.time() - overall_start_time
        print(f"[LOG] Document analysis completed in {overall_time:.2f} seconds")

        topics_summary = "DOCUMENT ANALYSIS SUMMARY:\n"
        topics_summary += f"Total segments: {len(results)}\n"
        topics_summary += f"Processing time: {overall_time:.2f} seconds\n\n"
        topics_summary += "SEGMENTS:\n"
        for segment in results:
            topics_summary += f"- Segment {segment['segment_number']}: {segment['topic_name']}\n"

        formatted_output = topics_summary + "\n" + formatted_output
        return formatted_output, json_path, txt_path
    except Exception as e:
        error_msg = f"Error processing document: {str(e)}"
        print(f"[LOG] ERROR: {error_msg}")
        return error_msg, None, None
with gr.Blocks(title="Quiz Generator") as app:
    gr.Markdown("# Quiz Generator")

    with gr.Row():
        with gr.Column():
            input_text = gr.Textbox(
                label="Input Document Text",
                placeholder="Paste your document text here...",
                lines=10
            )
            api_key = gr.Textbox(
                label="Gemini API Key",
                placeholder="Enter your Gemini API key",
                type="password"
            )
            analyze_btn = gr.Button("Analyze Document")
        with gr.Column():
            output_results = gr.Textbox(
                label="Analysis Results",
                lines=20
            )
            json_file_output = gr.File(label="Download JSON")
            txt_file_output = gr.File(label="Download TXT")

    analyze_btn.click(
        fn=analyze_document,
        inputs=[input_text, api_key],
        outputs=[output_results, json_file_output, txt_file_output]
    )
if __name__ == "__main__":
    app.launch()
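# The default launch() is sufficient on a hosted Space; when running locally,
# passing share=True to app.launch() would expose a temporary public URL.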