|
import gradio as gr |
|
import pdfplumber, docx, sqlite3, os, random, tempfile, shutil |
|
from datetime import datetime |
|
import pandas as pd |
|
from sentence_transformers import SentenceTransformer |
|
from transformers import AutoTokenizer, AutoModelForSequenceClassification |
|
import torch |
|
import numpy as np |
|
from fpdf import FPDF |
|
import logging |
|
import hashlib |
|
from typing import List, Tuple, Optional |
|
import asyncio |
|
import aiohttp |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
import re |
|
import time |
|
|
|
|
|
|
|
|
|
# --- Application configuration ---------------------------------------------
DB_NAME = "db.sqlite3"  # SQLite file that stores analysis results

# Login credentials. The historical defaults are kept so existing deployments
# keep working, but they can (and should) be overridden through the
# AIXBI_USERNAME / AIXBI_PASSWORD environment variables instead of relying on
# values committed to source control.
USERNAME = os.environ.get("AIXBI_USERNAME", "aixbi")
PASSWORD = os.environ.get("AIXBI_PASSWORD", "aixbi@123")

MAX_SENTENCES_CHECK = 15   # upper bound on sentences sampled per document
LOGO_PATH = "aixbi.jpg"    # optional logo used in the UI and PDF header
MIN_SENTENCE_LENGTH = 20   # minimum characters for a sentence to be kept
SIMILARITY_THRESHOLD = 0.85
CHUNK_SIZE = 512           # characters per chunk fed to the AI detector
LOG_FILE = "plagiarism_detector.log"

# --- Logging ----------------------------------------------------------------
# Log messages in this file contain emoji, so the file handler is opened as
# UTF-8 explicitly rather than with the platform default encoding.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE, encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
def init_db(db_path: Optional[str] = None):
    """Create the results schema and its indexes if they do not exist yet.

    Args:
        db_path: SQLite file to initialise. Defaults to the module-level
            ``DB_NAME`` when omitted.

    Every statement uses ``IF NOT EXISTS`` so the function can be called
    repeatedly. The connection is closed even when a statement fails.
    """
    conn = sqlite3.connect(DB_NAME if db_path is None else db_path)
    try:
        c = conn.cursor()

        # One row per analysed document.
        c.execute("""CREATE TABLE IF NOT EXISTS results (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            student_id TEXT NOT NULL,
            student_name TEXT NOT NULL,
            document_hash TEXT,
            ai_score REAL,
            plagiarism_score REAL,
            word_count INTEGER,
            sentence_count INTEGER,
            suspicious_sentences_count INTEGER,
            processing_time REAL,
            file_type TEXT,
            timestamp TEXT,
            status TEXT DEFAULT 'completed'
        )""")

        # Flagged sentences, linked back to their parent result row.
        c.execute("""CREATE TABLE IF NOT EXISTS suspicious_sentences (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            result_id INTEGER,
            sentence TEXT,
            similarity_score REAL,
            source_found BOOLEAN,
            FOREIGN KEY (result_id) REFERENCES results (id)
        )""")

        # Indexes for the columns used in lookups and ordering.
        c.execute("CREATE INDEX IF NOT EXISTS idx_student_id ON results (student_id)")
        c.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON results (timestamp)")
        c.execute("CREATE INDEX IF NOT EXISTS idx_document_hash ON results (document_hash)")

        conn.commit()
    finally:
        conn.close()
|
|
|
# Make sure the schema exists before anything touches the database.
init_db()

# Hugging Face id shared by the detector's tokenizer and classifier.
_AI_DETECTOR_MODEL_ID = "hello-simpleai/chatgpt-detector-roberta"

# Both models are loaded once at import time. A failure is logged together
# with its traceback and then re-raised, so the module does not import
# without its models.
try:
    embedder = SentenceTransformer('all-MiniLM-L6-v2')
    tokenizer = AutoTokenizer.from_pretrained(_AI_DETECTOR_MODEL_ID)
    model = AutoModelForSequenceClassification.from_pretrained(_AI_DETECTOR_MODEL_ID)
    logger.info("Models loaded successfully")
except Exception as e:
    logger.exception(f"Error loading models: {e}")
    raise
|
|
|
|
|
|
|
|
|
def calculate_file_hash(file_path: str) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *file_path*.

    The file is consumed in 4096-byte reads so that large uploads are never
    held in memory in one piece.
    """
    digest = hashlib.sha256()
    with open(file_path, "rb") as stream:
        while True:
            block = stream.read(4096)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
|
|
|
def extract_text(file_obj):
    """Extract plain text from an uploaded PDF, DOCX or TXT file.

    Args:
        file_obj: Object exposing a ``name`` attribute holding the path of
            the uploaded file, or ``None``.

    Returns:
        The stripped text, or ``None`` when no file was given, the extension
        is unsupported, extraction failed, or the document is empty.
    """
    if file_obj is None:
        return None

    name = file_obj.name
    ext = os.path.splitext(name)[1].lower()

    # Reject unsupported types up front instead of copying them first.
    if ext not in (".pdf", ".docx", ".txt"):
        return None

    # Reserve a temp path; the handle is closed before the copy so only one
    # writer ever has the file open.
    with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp:
        tmp_path = tmp.name

    try:
        # The copy lives inside the try so the temp file is removed even
        # when copying fails.
        shutil.copy(name, tmp_path)

        if ext == ".pdf":
            with pdfplumber.open(tmp_path) as pdf:
                # extract_text() may return None for a page; treat as empty.
                text = " ".join(page.extract_text() or "" for page in pdf.pages)
        elif ext == ".docx":
            doc = docx.Document(tmp_path)
            text = " ".join(p.text for p in doc.paragraphs)
        else:  # ".txt"
            with open(tmp_path, "r", encoding="utf-8", errors="ignore") as f:
                text = f.read()
    except Exception as e:
        # Best effort: callers treat None as "could not read the file", but
        # the cause is logged instead of being silently dropped.
        logger.exception(f"Failed to extract text from {name}: {e}")
        return None
    finally:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass

    return text.strip() if text else None
|
|
|
def extract_text_with_metadata(file_obj) -> Tuple[Optional[str], Optional[dict]]:
    """Extract a document's text together with basic file metadata.

    Args:
        file_obj: Object exposing a ``name`` attribute holding the path of
            the uploaded file, or ``None``.

    Returns:
        ``(text, metadata)`` on success. ``(None, None)`` when no file was
        given, the text could not be extracted, or it is shorter than 50
        characters. ``metadata`` always has ``file_type``, ``file_size``,
        ``file_hash``, ``word_count`` and ``char_count``; PDFs add
        ``page_count`` and DOCX files add ``paragraph_count``.
    """
    if file_obj is None:
        return None, None

    text = extract_text(file_obj)
    if text is None:
        return None, None

    # Reject short documents before doing any further file I/O.
    if len(text.strip()) < 50:
        logger.warning("Extracted text is too short for meaningful analysis")
        return None, None

    name = file_obj.name
    ext = os.path.splitext(name)[1].lower()
    word_count = len(text.split())
    char_count = len(text)

    try:
        # Size and hash are read from the uploaded file directly; they are
        # the same for a copy, so no temporary copy is made.
        metadata = {
            'file_type': ext,
            'file_size': os.path.getsize(name),
            'file_hash': calculate_file_hash(name),
            'word_count': word_count,
            'char_count': char_count
        }

        # Format-specific extras are best effort and never fail the call.
        if ext == ".pdf":
            try:
                with pdfplumber.open(name) as pdf:
                    metadata['page_count'] = len(pdf.pages)
            except Exception:
                metadata['page_count'] = 'Unknown'
        elif ext == ".docx":
            try:
                doc = docx.Document(name)
                metadata['paragraph_count'] = len(doc.paragraphs)
            except Exception:
                metadata['paragraph_count'] = 'Unknown'
    except Exception as e:
        logger.error(f"Error gathering metadata from {name}: {e}")
        # Fall back to what can be derived from the text alone.
        metadata = {
            'file_type': ext,
            'file_size': 0,
            'file_hash': '',
            'word_count': word_count,
            'char_count': char_count
        }

    return text, metadata
|
|
|
|
|
|
|
|
|
def detect_ai_text(text: str) -> Tuple[float, dict]:
    """Estimate how likely *text* is AI-generated.

    The text is cut into ``CHUNK_SIZE``-character chunks. Up to the first
    five chunks are classified, and the mean class-1 probability is returned.

    Returns:
        ``(score, details)``. ``score`` is a plain Python float in [0, 1].
        ``details`` holds ``chunk_scores`` (percentages), ``confidence``
        (``'high'``/``'medium'``/``'low'``, derived from the spread of the
        chunk scores) and ``std_deviation``. On failure the result is
        ``(0.0, {'error': message})``.
    """
    max_chunks = 5  # bounds inference time on long documents

    try:
        chunks = [text[i:i + CHUNK_SIZE] for i in range(0, len(text), CHUNK_SIZE)]
        scores = []
        details = {'chunk_scores': [], 'confidence': 'low'}

        for chunk in chunks[:max_chunks]:
            # Skip fragments too short to classify.
            if len(chunk.strip()) < 20:
                continue

            inputs = tokenizer(chunk, return_tensors="pt", truncation=True, max_length=512)
            with torch.no_grad():
                outputs = model(**inputs)
                probabilities = torch.softmax(outputs.logits, dim=1)

            # NOTE(review): index 1 is treated as the "AI-generated" class;
            # confirm against the model's label mapping.
            score = probabilities[0][1].item()
            scores.append(score)
            details['chunk_scores'].append(round(score * 100, 2))

        if not scores:
            return 0.0, details

        # Convert numpy scalars to built-in floats so the return value
        # matches the annotation.
        avg_score = float(np.mean(scores))
        std_score = float(np.std(scores)) if len(scores) > 1 else 0.0

        # Low spread between chunks means the chunk scores agree.
        if std_score < 0.1:
            details['confidence'] = 'high'
        elif std_score < 0.2:
            details['confidence'] = 'medium'
        else:
            details['confidence'] = 'low'

        details['std_deviation'] = round(std_score, 3)

        return avg_score, details

    except Exception as e:
        logger.exception(f"Error in AI detection: {e}")
        return 0.0, {'error': str(e)}
|
|
|
|
|
|
|
|
|
def preprocess_text(text: str) -> List[str]:
    """Split *text* into sentences and keep only the meaningful ones.

    Sentences are split on runs of ``.``, ``!`` or ``?``. A fragment is kept
    when it has at least ``MIN_SENTENCE_LENGTH`` characters and five words,
    is not purely digits, and is not a page/chapter/number marker.
    """
    def _is_meaningful(candidate: str) -> bool:
        if len(candidate) < MIN_SENTENCE_LENGTH:
            return False
        if candidate.isdigit():
            return False
        if len(candidate.split()) < 5:
            return False
        marker = re.match(r'^(page|chapter|\d+)[\s\d]*$', candidate.lower())
        return not marker

    fragments = (piece.strip() for piece in re.split(r'[.!?]+', text))
    return [fragment for fragment in fragments if _is_meaningful(fragment)]
|
|
|
def semantic_similarity_check(sentences: List[str], suspicious_sentences: List[str]) -> List[Tuple[str, float]]:
    """Find sentences that closely match any of the suspicious sentences.

    Both lists are embedded and compared by cosine similarity. A sentence is
    reported with its best score when that score exceeds
    ``SIMILARITY_THRESHOLD``. Returns an empty list when either input is
    empty or when the comparison fails.
    """
    if not (sentences and suspicious_sentences):
        return []

    try:
        doc_vectors = embedder.encode(sentences)
        reference_vectors = embedder.encode(suspicious_sentences)

        # One row per input sentence, one column per suspicious sentence.
        score_matrix = cosine_similarity(doc_vectors, reference_vectors)

        matches = []
        for row, sentence in zip(score_matrix, sentences):
            best = np.max(row)
            if best > SIMILARITY_THRESHOLD:
                matches.append((sentence, best))
        return matches

    except Exception as e:
        logger.error(f"Error in semantic similarity check: {e}")
        return []
|
|
|
async def async_web_search(sentence: str, session: aiohttp.ClientSession) -> bool:
    """Placeholder web lookup for *sentence*.

    No request is made: the coroutine sleeps briefly and returns a random
    boolean. ``session`` is accepted but unused. Returns ``False`` if an
    exception is raised.
    """
    try:
        await asyncio.sleep(0.1)
        found = random.choice([True, False])
    except Exception as e:
        logger.error(f"Error in web search: {e}")
        found = False
    return found
|
|
|
def enhanced_plagiarism_check(sentences: List[str]) -> Tuple[float, List[dict]]:
    """Score a document for plagiarism from a sample of its sentences.

    At most ``MAX_SENTENCES_CHECK`` sentences are examined, taken in equal
    parts from the beginning, middle and end of longer documents. Flagging
    is simulated: a sentence longer than 100 characters is flagged at
    random, and the confidence values are random as well.

    Returns:
        ``(plagiarism_score, results)``: the percentage of sampled sentences
        that were flagged, and one result dict per sampled sentence.
    """
    if not sentences:
        return 0.0, []

    count = len(sentences)
    if count > MAX_SENTENCES_CHECK:
        third = MAX_SENTENCES_CHECK // 3
        mid_from = count // 2 - MAX_SENTENCES_CHECK // 6
        samples = (
            sentences[:third]
            + sentences[mid_from:mid_from + third]
            + sentences[-third:]
        )
    else:
        samples = sentences

    findings = []
    flagged_total = 0

    for sentence in samples:
        # The RNG is only consulted for sentences longer than 100 characters.
        flagged = len(sentence) > 100 and random.random() > 0.7
        if flagged:
            score = random.uniform(0.5, 1.0)
            flagged_total += 1
        else:
            score = random.uniform(0.0, 0.4)

        findings.append({
            'sentence': sentence,
            'is_suspicious': flagged,
            'confidence': score,
            'source_found': flagged,
            'similarity_score': score if flagged else 0.0
        })

    plagiarism_score = (flagged_total / len(samples)) * 100 if samples else 0

    return plagiarism_score, findings
|
|
|
|
|
|
|
|
|
def save_result(student_id: str, student_name: str, ai_score: float, plagiarism_score: float,
                metadata: dict, suspicious_results: List[dict], processing_time: float) -> int:
    """Persist one analysis and its flagged sentences.

    The result row and the flagged-sentence rows are written in a single
    transaction: both are committed together, or rolled back if anything
    fails. The connection is always closed.

    Returns:
        The ``id`` of the new row in ``results``.
    """
    flagged = [r for r in suspicious_results if r['is_suspicious']]

    conn = sqlite3.connect(DB_NAME)
    try:
        # "with conn" commits on success and rolls back on an exception.
        with conn:
            c = conn.cursor()

            # NOTE: sentence_count stores the number of sentences examined
            # (len(suspicious_results)), not the document's total.
            c.execute("""INSERT INTO results
                (student_id, student_name, document_hash, ai_score, plagiarism_score,
                 word_count, sentence_count, suspicious_sentences_count, processing_time,
                 file_type, timestamp, status)
                VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
                      (student_id, student_name, metadata.get('file_hash', ''),
                       ai_score, plagiarism_score, metadata.get('word_count', 0),
                       len(suspicious_results), len(flagged),
                       processing_time, metadata.get('file_type', ''),
                       datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'completed'))

            result_id = c.lastrowid

            c.executemany("""INSERT INTO suspicious_sentences
                (result_id, sentence, similarity_score, source_found)
                VALUES (?,?,?,?)""",
                          [(result_id, r['sentence'], r['similarity_score'], r['source_found'])
                           for r in flagged])
    finally:
        conn.close()

    logger.info(f"Saved result for {student_name} ({student_id}) - ID: {result_id}")
    return result_id
|
|
|
def load_results() -> pd.DataFrame:
    """Load all stored analyses, newest first, as a DataFrame.

    Scores and processing time are rounded to two decimals in SQL. The
    connection is closed even when the query fails.
    """
    query = """SELECT id, student_id, student_name,
               ROUND(ai_score, 2) as ai_score,
               ROUND(plagiarism_score, 2) as plagiarism_score,
               word_count, suspicious_sentences_count,
               ROUND(processing_time, 2) as processing_time,
               file_type, timestamp, status
               FROM results
               ORDER BY timestamp DESC"""

    conn = sqlite3.connect(DB_NAME)
    try:
        return pd.read_sql_query(query, conn)
    finally:
        conn.close()
|
|
|
def check_duplicate_submission(document_hash: str) -> Optional[dict]:
    """Look up the most recent earlier analysis of the same document.

    Args:
        document_hash: SHA-256 hex digest of the uploaded file.

    Returns:
        ``{'student_name': ..., 'timestamp': ...}`` for the latest matching
        row, or ``None`` when there is no match or no usable hash.
    """
    # An empty hash means hashing failed upstream. Rows stored after such a
    # failure also carry '', so matching on it would report unrelated
    # documents as duplicates.
    if not document_hash:
        return None

    conn = sqlite3.connect(DB_NAME)
    try:
        c = conn.cursor()
        c.execute("SELECT student_name, timestamp FROM results WHERE document_hash = ? ORDER BY timestamp DESC LIMIT 1",
                  (document_hash,))
        result = c.fetchone()
    finally:
        conn.close()

    if result:
        return {'student_name': result[0], 'timestamp': result[1]}
    return None
|
|
|
|
|
|
|
|
|
# Single-pass translation table from typographic / symbol characters to
# plain-text stand-ins. Keys are written as escapes so they survive copies
# of this file that normalise or strip non-ASCII characters.
_PDF_CHAR_TABLE = str.maketrans({
    '\u2022': '-',          # bullet
    '\u2013': '-',          # en dash
    '\u2014': '-',          # em dash
    '\u201c': '"',          # left double quote
    '\u201d': '"',          # right double quote
    '\u2018': "'",          # left single quote
    '\u2019': "'",          # right single quote
    '\u2026': '...',        # ellipsis
    '\u00ae': '(R)',        # registered
    '\u00a9': '(C)',        # copyright
    '\u2122': '(TM)',       # trademark
    '\u20ac': 'EUR',        # euro
    '\u00a3': 'GBP',        # pound
    '\u00a5': 'JPY',        # yen
    '\u00a7': 'Section',    # section sign
    '\u00b6': 'Para',       # pilcrow
    '\u2020': '+',          # dagger
    '\u2021': '++',         # double dagger
    '\u00b0': ' degrees',   # degree sign
    '\u00b1': '+/-',        # plus-minus
    '\u00f7': '/',          # division sign
    '\u00d7': 'x',          # multiplication sign
    '\u2264': '<=',
    '\u2265': '>=',
    '\u2260': '!=',
    '\u221e': 'infinity',
    '\u03b1': 'alpha', '\u03b2': 'beta', '\u03b3': 'gamma', '\u03b4': 'delta',
    '\u03bb': 'lambda', '\u03bc': 'mu', '\u03c0': 'pi', '\u03c3': 'sigma',
    '\u03a9': 'Omega',
})


def clean_text_for_pdf(text: str) -> str:
    """Make *text* safe to write with Latin-1 PDF fonts.

    Known typographic characters and symbols are replaced by plain-text
    equivalents. Any remaining character that cannot be encoded as Latin-1
    is dropped, while accented Latin-1 letters are kept.

    Returns:
        The cleaned string (``""`` for empty or ``None`` input).
    """
    if not text:
        return ""

    text = text.translate(_PDF_CHAR_TABLE)

    try:
        text.encode('latin-1')
        return text
    except UnicodeEncodeError:
        # Drop only what Latin-1 cannot represent, so the result agrees with
        # the success path above for every other character.
        return text.encode('latin-1', 'ignore').decode('latin-1')
|
|
|
class EnhancedPDF(FPDF):
    """FPDF subclass for the analysis report: branded header, page footer,
    and text helpers that clean their input with ``clean_text_for_pdf``."""

    def header(self):
        """Draw the optional logo and the report title on each page."""
        if os.path.exists(LOGO_PATH):
            try:
                self.image(LOGO_PATH, 10, 8, 20)
            except Exception as e:
                # The logo is decorative; a bad image must not abort the
                # report, but the failure is recorded.
                logger.warning(f"Could not embed logo {LOGO_PATH}: {e}")
        self.set_font('Arial', 'B', 15)
        title = clean_text_for_pdf('AIxBI - Professional Plagiarism Analysis Report')
        self.cell(0, 10, title, 0, 1, 'C')
        self.ln(10)

    def footer(self):
        """Draw the page number and generation timestamp near the bottom."""
        self.set_y(-15)
        self.set_font('Arial', 'I', 8)
        footer_text = clean_text_for_pdf(f'Page {self.page_no()} | Generated on {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
        self.cell(0, 10, footer_text, 0, 0, 'C')

    def add_section_header(self, title: str):
        """Write *title* as a filled, bold section heading."""
        self.set_font('Arial', 'B', 12)
        self.set_fill_color(200, 220, 255)
        clean_title = clean_text_for_pdf(title)
        self.cell(0, 10, clean_title, 0, 1, 'L', 1)
        self.ln(2)

    def add_highlighted_text(self, text: str, color: tuple, max_length: int = 100):
        """Write *text* in a bordered box filled with *color*.

        Text longer than *max_length* characters is cut and suffixed with
        ``...``. If rendering fails, a fixed notice is written instead.
        """
        self.set_fill_color(*color)

        clean_text = clean_text_for_pdf(text)
        display_text = clean_text[:max_length] + "..." if len(clean_text) > max_length else clean_text
        try:
            self.multi_cell(0, 8, display_text, 1, 'L', 1)
        except Exception as e:
            logger.warning(f"Could not render highlighted text: {e}")
            safe_text = "Text contains unsupported characters - please check original document"
            self.multi_cell(0, 8, safe_text, 1, 'L', 1)
        self.ln(2)

    def safe_cell(self, w, h, txt, border=0, ln=0, align='L', fill=False):
        """Like ``cell`` but cleans *txt* first and writes a placeholder on failure."""
        try:
            clean_txt = clean_text_for_pdf(str(txt))
            self.cell(w, h, clean_txt, border, ln, align, fill)
        except Exception as e:
            logger.warning(f"Could not render PDF cell: {e}")
            self.cell(w, h, "[Content contains unsupported characters]", border, ln, align, fill)

    def safe_multi_cell(self, w, h, txt, border=0, align='L', fill=False):
        """Like ``multi_cell`` but cleans *txt* first and writes a placeholder on failure."""
        try:
            clean_txt = clean_text_for_pdf(str(txt))
            self.multi_cell(w, h, clean_txt, border, align, fill)
        except Exception as e:
            logger.warning(f"Could not render PDF multi-cell: {e}")
            self.multi_cell(w, h, "[Content contains unsupported characters - please check source document]", border, align, fill)
|
|
|
def _write_fallback_pdf_report(student_name: str, student_id: str, ai_score: float,
                               plagiarism_score: float, output_path: str):
    """Write a minimal one-page report with plain FPDF and the headline numbers only."""
    simple_pdf = FPDF()
    simple_pdf.add_page()
    simple_pdf.set_font('Arial', 'B', 16)
    simple_pdf.cell(0, 10, 'AIxBI Analysis Report', 0, 1, 'C')
    simple_pdf.ln(10)
    simple_pdf.set_font('Arial', '', 12)
    simple_pdf.cell(0, 10, f'Student: {clean_text_for_pdf(student_name)}', 0, 1)
    simple_pdf.cell(0, 10, f'Student ID: {clean_text_for_pdf(student_id)}', 0, 1)
    simple_pdf.cell(0, 10, f'AI Score: {ai_score:.1f}%', 0, 1)
    simple_pdf.cell(0, 10, f'Plagiarism Score: {plagiarism_score:.1f}%', 0, 1)
    simple_pdf.cell(0, 10, f'Date: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}', 0, 1)
    simple_pdf.ln(10)
    simple_pdf.multi_cell(0, 10, 'Note: Full report could not be generated due to character encoding issues. Please contact administrator if this persists.')
    simple_pdf.output(output_path)


def generate_enhanced_pdf_report(student_name: str, student_id: str, ai_score: float,
                                 plagiarism_score: float, suspicious_results: List[dict],
                                 metadata: dict, ai_details: dict, output_path: str):
    """Write the full analysis report to *output_path*.

    Sections: executive summary, risk assessment, AI-detection details (when
    chunk scores exist), up to ten flagged sentences, and recommendations.
    If the full report fails for any reason, a minimal fallback report is
    written instead.

    Args:
        ai_score: AI-detection score as a percentage (0-100).
        plagiarism_score: Plagiarism score as a percentage (0-100).
        suspicious_results: Per-sentence result dicts; ``is_suspicious``,
            ``confidence`` and ``sentence`` are read.

    Raises:
        RuntimeError: When both the full and the fallback report fail. The
            fallback failure is chained as the cause.
    """
    try:
        # Create the target directory here so a missing folder does not
        # cause a failure at pdf.output().
        out_dir = os.path.dirname(output_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

        flagged = [r for r in suspicious_results if r['is_suspicious']]

        pdf = EnhancedPDF()
        pdf.add_page()

        # --- Executive summary -------------------------------------------
        pdf.add_section_header("EXECUTIVE SUMMARY")
        pdf.set_font('Arial', '', 10)

        summary_data = [
            f"Student: {student_name} ({student_id})",
            f"Document Type: {metadata.get('file_type', 'Unknown').upper()}",
            f"Word Count: {metadata.get('word_count', 0):,}",
            f"AI Detection Score: {ai_score:.1f}% (Confidence: {ai_details.get('confidence', 'N/A')})",
            f"Plagiarism Score: {plagiarism_score:.1f}%",
            f"Suspicious Sentences: {len(flagged)}",
            f"Analysis Date: {datetime.now().strftime('%B %d, %Y at %H:%M:%S')}"
        ]

        for item in summary_data:
            pdf.safe_cell(0, 6, item, 0, 1)
        pdf.ln(5)

        # --- Risk assessment ---------------------------------------------
        pdf.add_section_header("RISK ASSESSMENT")
        pdf.set_font('Arial', '', 10)

        if ai_score > 70 or plagiarism_score > 30:
            risk_level, risk_color = "HIGH", (255, 200, 200)
        elif ai_score > 40 or plagiarism_score > 15:
            risk_level, risk_color = "MEDIUM", (255, 255, 200)
        else:
            risk_level, risk_color = "LOW", (200, 255, 200)

        pdf.set_fill_color(*risk_color)
        pdf.safe_cell(0, 10, f"Overall Risk Level: {risk_level}", 1, 1, 'C', 1)
        pdf.ln(5)

        # --- AI detection details ----------------------------------------
        if ai_details.get('chunk_scores'):
            pdf.add_section_header("AI DETECTION ANALYSIS")
            pdf.set_font('Arial', '', 9)
            pdf.safe_cell(0, 6, f"Chunks Analyzed: {len(ai_details['chunk_scores'])}", 0, 1)
            pdf.safe_cell(0, 6, f"Score Consistency (Std Dev): {ai_details.get('std_deviation', 'N/A')}", 0, 1)
            pdf.ln(3)

        # --- Flagged content (first ten only) ----------------------------
        if flagged:
            pdf.add_section_header("FLAGGED CONTENT")
            pdf.set_font('Arial', '', 9)

            for i, result in enumerate(flagged[:10], 1):
                pdf.safe_cell(0, 6, f"Issue #{i} (Confidence: {result['confidence']:.1f})", 0, 1)
                pdf.add_highlighted_text(result['sentence'], (255, 230, 230), 150)

        # --- Recommendations ---------------------------------------------
        pdf.add_section_header("RECOMMENDATIONS")
        pdf.set_font('Arial', '', 10)

        recommendations = []
        if ai_score > 50:
            recommendations.append("- Review content for AI-generated sections and rewrite in original voice")
        if plagiarism_score > 20:
            recommendations.append("- Add proper citations for referenced material")
            recommendations.append("- Paraphrase flagged sentences to ensure originality")
        if len(flagged) > 5:
            recommendations.append("- Conduct thorough revision focusing on highlighted sections")

        recommendations.extend([
            "- Use plagiarism detection tools during writing process",
            "- Ensure all sources are properly attributed",
            "- Maintain academic integrity standards"
        ])

        for rec in recommendations:
            pdf.safe_multi_cell(0, 6, rec)
            pdf.ln(1)

        pdf.output(output_path)
        logger.info(f"PDF report generated successfully: {output_path}")

    except Exception as e:
        # Keep the traceback: this path hides the original failure from the
        # caller whenever the fallback succeeds.
        logger.exception(f"Error generating PDF report: {e}")

        try:
            _write_fallback_pdf_report(student_name, student_id, ai_score,
                                       plagiarism_score, output_path)
            logger.info(f"Fallback PDF report generated: {output_path}")
        except Exception as fallback_error:
            logger.error(f"Even fallback PDF generation failed: {fallback_error}")
            raise RuntimeError(f"PDF generation failed: {e}") from fallback_error
|
|
|
|
|
|
|
|
|
|
|
def login(user: str, pwd: str):
    """Validate credentials and switch between the login and app panels.

    Returns:
        Three values for ``(login_box, app_box, login_msg)``: on success the
        login box is hidden and the app box shown; on failure both boxes are
        left unchanged and an error message is returned.
    """
    import hmac

    # Compare as UTF-8 bytes: compare_digest rejects non-ASCII str input.
    # Both comparisons always run so the response time does not reveal
    # which field was wrong.
    user_ok = hmac.compare_digest((user or "").encode("utf-8"), USERNAME.encode("utf-8"))
    pwd_ok = hmac.compare_digest((pwd or "").encode("utf-8"), PASSWORD.encode("utf-8"))

    if user_ok and pwd_ok:
        logger.info(f"Successful login for user: {user}")
        return gr.update(visible=False), gr.update(visible=True), ""

    # repr() keeps control characters in user input from forging log lines.
    logger.warning(f"Failed login attempt for user: {user!r}")
    return gr.update(), gr.update(), "❌ Invalid username or password!"
|
|
|
def analyze_document(student_name: str, student_id: str, file_obj) -> Tuple:
    """Run the full analysis pipeline for one uploaded document.

    Steps: validate input, extract text and metadata, reject duplicates and
    too-short documents, score AI likelihood and plagiarism, store the
    result, and write the PDF report.

    Returns:
        A 6-tuple ``(status, ai_score, plagiarism_score, report_path,
        flagged_text, doc_info)`` matching the UI outputs. On any failure
        the first element is a message and the rest are ``None``.
    """
    start_time = time.time()

    def _failure(message: str) -> Tuple:
        return message, None, None, None, None, None

    # The UI may pass None for untouched fields; normalise before stripping.
    student_name = (student_name or "").strip()
    student_id = (student_id or "").strip()

    if not all([student_name, student_id, file_obj]):
        return _failure("❌ Please fill all fields and upload a document.")

    logger.info(f"Starting analysis for {student_name} ({student_id})")

    try:
        result = extract_text_with_metadata(file_obj)
        if result is None or result[0] is None:
            return _failure("❌ Error: Could not read the file. Please upload a valid PDF, DOCX, or TXT.")

        text, metadata = result

        # Only look for duplicates when a real hash is available; an empty
        # hash means hashing failed and would match unrelated documents.
        file_hash = metadata.get('file_hash')
        duplicate = check_duplicate_submission(file_hash) if file_hash else None
        if duplicate:
            logger.warning(f"Duplicate submission detected for {student_name}")
            return _failure(f"⚠️ Warning: This document was previously analyzed by {duplicate['student_name']} on {duplicate['timestamp']}")

        sentences = preprocess_text(text)
        if len(sentences) < 3:
            return _failure("❌ Error: Document too short for meaningful analysis (minimum 3 sentences required).")

        ai_score, ai_details = detect_ai_text(text)
        ai_percentage = ai_score * 100

        plagiarism_score, suspicious_results = enhanced_plagiarism_check(sentences)

        processing_time = time.time() - start_time

        result_id = save_result(student_id, student_name, ai_percentage, plagiarism_score,
                                metadata, suspicious_results, processing_time)

        # student_id is user input: reduce it to safe characters so it
        # cannot steer the report outside the reports directory.
        safe_id = re.sub(r'[^A-Za-z0-9_-]', '_', student_id)
        os.makedirs("reports", exist_ok=True)
        output_pdf = f"reports/{safe_id}_{result_id}_report.pdf"

        generate_enhanced_pdf_report(student_name, student_id, ai_percentage, plagiarism_score,
                                     suspicious_results, metadata, ai_details, output_pdf)

        # Show at most five flagged sentences, each cut to 200 characters.
        suspicious_sentences = [r['sentence'] for r in suspicious_results if r['is_suspicious']]
        if suspicious_sentences:
            highlighted_text = "\n\n".join([f"🚨 FLAGGED: {s[:200]}..." if len(s) > 200 else f"🚨 FLAGGED: {s}"
                                            for s in suspicious_sentences[:5]])
        else:
            highlighted_text = "✅ No suspicious sentences detected."

        status_msg = f"""✅ Analysis completed for {student_name} ({student_id})
📊 Processed {metadata['word_count']:,} words in {processing_time:.1f} seconds
🤖 AI Detection: {ai_percentage:.1f}% (Confidence: {ai_details.get('confidence', 'N/A')})
📋 Plagiarism: {plagiarism_score:.1f}% ({len(suspicious_sentences)} flagged sentences)
📄 Report ID: {result_id}"""

        logger.info(f"Analysis completed for {student_name} - AI: {ai_percentage:.1f}%, Plagiarism: {plagiarism_score:.1f}%")

        return (status_msg, round(ai_percentage, 2), round(plagiarism_score, 2),
                output_pdf, highlighted_text, f"📈 Total sentences analyzed: {len(sentences)}")

    except Exception as e:
        logger.exception(f"Error during analysis: {e}")
        return _failure(f"❌ Error during analysis: {str(e)}")
|
|
|
def show_enhanced_dashboard():
    """Return the stored results for the dashboard table.

    Gives back the results DataFrame, a one-row "Message" frame when nothing
    has been analysed yet, or a one-row "Error" frame when loading fails.
    """
    try:
        results = load_results()
        if not results.empty:
            return results
        placeholder = "No analysis results found. Upload and analyze documents to see data here."
        return pd.DataFrame({"Message": [placeholder]})
    except Exception as e:
        logger.error(f"Error loading dashboard: {e}")
        return pd.DataFrame({"Error": [f"Failed to load data: {str(e)}"]})
|
|
|
def get_statistics():
    """Build a Markdown summary of all stored analyses.

    Returns:
        A Markdown string with totals and averages, a short notice when no
        analyses exist, or an error message when the query fails.
    """
    try:
        conn = sqlite3.connect(DB_NAME)
        try:
            c = conn.cursor()

            c.execute("SELECT COUNT(*), AVG(ai_score), AVG(plagiarism_score), AVG(processing_time) FROM results")
            total, avg_ai, avg_plagiarism, avg_time = c.fetchone()

            # Same thresholds as the HIGH risk level in the PDF report.
            c.execute("SELECT COUNT(*) FROM results WHERE ai_score > 70 OR plagiarism_score > 30")
            high_risk = c.fetchone()[0]
        finally:
            # Close the connection even when a query raises.
            conn.close()

        if total == 0:
            return "No analyses completed yet."

        # AVG() yields NULL when every value in a column is NULL.
        avg_ai = avg_ai or 0.0
        avg_plagiarism = avg_plagiarism or 0.0
        avg_time = avg_time or 0.0

        return f"""📊 **Analysis Statistics**
Total Documents Analyzed: {total:,}
Average AI Score: {avg_ai:.1f}%
Average Plagiarism Score: {avg_plagiarism:.1f}%
Average Processing Time: {avg_time:.1f}s
High Risk Documents: {high_risk} ({(high_risk/total*100):.1f}%)"""

    except Exception as e:
        logger.error(f"Error getting statistics: {e}")
        return f"Error loading statistics: {str(e)}"
|
|
|
|
|
|
|
|
|
def create_enhanced_ui():
    """Assemble the Gradio interface and return it without launching.

    Layout: a header row, a login group (visible at start) and an app group
    (hidden at start) with three tabs - document analysis, results
    dashboard, and help. The four buttons are wired to ``login``,
    ``analyze_document``, ``show_enhanced_dashboard`` and ``get_statistics``.
    """
    with gr.Blocks(theme="soft", title="AIxBI - Professional Plagiarism Detection") as demo:

        # ----- Header ----------------------------------------------------
        with gr.Row():
            if os.path.exists(LOGO_PATH):
                gr.Image(LOGO_PATH, height=80, width=80, show_label=False, container=False)
            with gr.Column():
                gr.Markdown("""
                # 🔍 **AIxBI - Professional Document Analysis Suite**
                ### Advanced AI Detection & Plagiarism Checking System
                *Ensuring Academic Integrity with Cutting-Edge Technology*
                """)

        # ----- Login panel (shown first) ---------------------------------
        with gr.Group(visible=True) as login_box:
            gr.Markdown("## 🔐 **Secure Login**")
            with gr.Row():
                username_input = gr.Textbox(label="👤 Username", placeholder="Enter username")
                password_input = gr.Textbox(label="🔑 Password", type="password", placeholder="Enter password")
            login_btn = gr.Button("🚀 Login", variant="primary", size="lg")
            login_msg = gr.Markdown("", elem_classes="login-message")

        # ----- Main application (revealed after login) -------------------
        with gr.Group(visible=False) as app_box:
            with gr.Tabs():

                # Tab 1: upload and analyse a document
                with gr.Tab("📄 Document Analysis", elem_id="analysis-tab"):
                    with gr.Row():
                        with gr.Column(scale=1):
                            gr.Markdown("### 👨🎓 **Student Information**")
                            name_input = gr.Textbox(label="📝 Student Name", placeholder="Enter full name")
                            id_input = gr.Textbox(label="🆔 Student ID", placeholder="Enter student ID")

                        with gr.Column(scale=1):
                            gr.Markdown("### 📎 **Document Upload**")
                            upload_input = gr.File(
                                label="📄 Upload Document",
                                file_types=[".pdf", ".docx", ".txt"],
                                file_count="single"
                            )

                    analyze_btn = gr.Button("🔍 Analyze Document", variant="primary", size="lg")

                    with gr.Row():
                        with gr.Column():
                            status_output = gr.Textbox(label="📊 Analysis Status", lines=4, interactive=False)
                            doc_info_output = gr.Textbox(label="📋 Document Information", interactive=False)

                        with gr.Column():
                            with gr.Row():
                                ai_score_output = gr.Number(label="🤖 AI Detection Score (%)", interactive=False)
                                plagiarism_output = gr.Number(label="📋 Plagiarism Score (%)", interactive=False)

                    flagged_output = gr.Textbox(
                        label="🚨 Flagged Content",
                        lines=8,
                        placeholder="Suspicious sentences will appear here...",
                        interactive=False
                    )

                    report_output = gr.File(label="📄 Download Detailed Report")

                # Tab 2: stored results and summary statistics
                with gr.Tab("📊 Analysis Dashboard", elem_id="dashboard-tab"):
                    with gr.Row():
                        dashboard_btn = gr.Button("🔄 Refresh Dashboard", variant="secondary")
                        stats_btn = gr.Button("📈 Show Statistics", variant="secondary")

                    stats_display = gr.Markdown("", elem_classes="stats-display")
                    dashboard_headers = [
                        "ID", "Student ID", "Student Name", "AI Score (%)",
                        "Plagiarism Score (%)", "Word Count", "Flagged Sentences",
                        "Processing Time (s)", "File Type", "Timestamp", "Status",
                    ]
                    dashboard = gr.Dataframe(
                        headers=dashboard_headers,
                        interactive=False,
                        wrap=True
                    )

                # Tab 3: static usage guide
                with gr.Tab("❓ Help & Guidelines", elem_id="help-tab"):
                    gr.Markdown("""
                    ## 📖 **User Guide**

                    ### 🎯 **How to Use**
                    1. **Login** with your credentials
                    2. **Enter student information** (name and ID)
                    3. **Upload document** (PDF, DOCX, or TXT format)
                    4. **Click "Analyze Document"** and wait for results
                    5. **Download the detailed PDF report** for comprehensive analysis

                    ### 🔍 **Understanding Results**

                    #### 🤖 **AI Detection Score**
                    - **0-30%**: Low probability of AI-generated content
                    - **31-60%**: Moderate probability - review recommended
                    - **61-100%**: High probability - likely AI-generated

                    #### 📋 **Plagiarism Score**
                    - **0-15%**: Acceptable similarity level
                    - **16-30%**: Moderate concern - check citations
                    - **31%+**: High concern - significant plagiarism detected

                    #### 🚨 **Risk Levels**
                    - **🟢 LOW**: Minimal concerns detected
                    - **🟡 MEDIUM**: Some issues found - review needed
                    - **🔴 HIGH**: Serious concerns - immediate action required

                    ### 📄 **Supported File Formats**
                    - **PDF**: Adobe PDF documents
                    - **DOCX**: Microsoft Word documents
                    - **TXT**: Plain text files

                    ### 🛡️ **Best Practices**
                    - Upload final versions of documents
                    - Ensure documents contain at least 100 words
                    - Review flagged content carefully
                    - Use reports for educational feedback

                    ### ⚠️ **Important Notes**
                    - Analysis results are for educational purposes
                    - False positives may occur - human review recommended
                    - Keep PDF reports for documentation
                    - All analyses are logged for institutional records
                    """)

        # ----- Event wiring ----------------------------------------------
        login_btn.click(
            fn=login,
            inputs=[username_input, password_input],
            outputs=[login_box, app_box, login_msg]
        )

        # Output order matches the 6-tuple returned by analyze_document.
        analyze_btn.click(
            fn=analyze_document,
            inputs=[name_input, id_input, upload_input],
            outputs=[status_output, ai_score_output, plagiarism_output,
                     report_output, flagged_output, doc_info_output]
        )

        dashboard_btn.click(
            fn=show_enhanced_dashboard,
            outputs=[dashboard]
        )

        stats_btn.click(
            fn=get_statistics,
            outputs=[stats_display]
        )

    return demo
|
|
|
|
|
|
|
|
|
def cleanup_old_reports(days_old: int = 30):
    """Delete PDF reports in ``reports/`` last modified more than *days_old* days ago.

    Failures are logged and never raised. A file that cannot be inspected
    or removed is skipped, so it does not stop the rest of the cleanup.
    """
    import glob

    try:
        # The cutoff is the same for every file; compute it once.
        cutoff = time.time() - days_old * 24 * 60 * 60

        for file_path in glob.glob("reports/*.pdf"):
            try:
                if os.path.getmtime(file_path) < cutoff:
                    os.remove(file_path)
                    logger.info(f"Cleaned up old report: {file_path}")
            except OSError as e:
                logger.warning(f"Could not remove old report {file_path}: {e}")
    except Exception as e:
        logger.error(f"Error during cleanup: {e}")
|
|
|
def export_database_backup():
    """Dump the results table to a timestamped CSV in the working directory.

    Returns the CSV file name, or ``None`` when the export fails (the error
    is logged).
    """
    try:
        results = load_results()
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = f"backup_results_{stamp}.csv"
        results.to_csv(backup_file, index=False)
        logger.info(f"Database backup created: {backup_file}")
    except Exception as e:
        logger.error(f"Error creating backup: {e}")
        return None
    return backup_file
|
|
|
def validate_system_requirements():
    """Check that the components needed to run are available.

    Required: both models loaded, the database file present, and a usable
    ``reports`` directory (created if missing). The logo is optional, as it
    is in the UI and the PDF header: its absence is logged but does not
    fail validation.

    Returns:
        ``True`` when every required component is available.
    """
    try:
        os.makedirs("reports", exist_ok=True)
        reports_ok = True
    except OSError as e:
        logger.error(f"Could not create reports directory: {e}")
        reports_ok = False

    required = {
        "Models loaded": embedder is not None and model is not None,
        "Database accessible": os.path.exists(DB_NAME),
        "Reports directory": reports_ok,
    }
    optional = {
        "Logo file": os.path.exists(LOGO_PATH),
    }

    for requirement, status in required.items():
        if status:
            logger.info(f"✅ {requirement}")
        else:
            logger.warning(f"❌ {requirement}")

    for requirement, status in optional.items():
        if status:
            logger.info(f"✅ {requirement}")
        else:
            logger.warning(f"❌ {requirement} (optional)")

    return all(required.values())
|
|
|
|
|
|
|
|
|
def log_performance_metrics():
    """Log CPU, memory and disk usage plus the database file size.

    Uses ``psutil`` when it can be imported; otherwise a warning is logged
    and nothing is measured. Any other failure is logged as an error.
    """
    try:
        import psutil

        cpu_percent = psutil.cpu_percent()
        memory_percent = psutil.virtual_memory().percent
        disk_usage = psutil.disk_usage('.').percent
        logger.info(f"Performance - CPU: {cpu_percent}%, Memory: {memory_percent}%, Disk: {disk_usage}%")

        if not os.path.exists(DB_NAME):
            return
        db_size = os.path.getsize(DB_NAME) / (1024 * 1024)  # bytes -> MiB
        logger.info(f"Database size: {db_size:.2f} MB")

    except ImportError:
        logger.warning("psutil not available - performance monitoring disabled")
    except Exception as e:
        logger.error(f"Error logging performance metrics: {e}")
|
|
|
|
|
|
|
|
|
def main():
    """Start the application.

    Validates the environment, removes old reports, logs performance
    metrics, then builds and launches the web interface on port 7860.
    Startup is abandoned (with an error log) when validation fails. Any
    exception is logged and re-raised.
    """
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "show_error": True,
        "quiet": False,
    }

    try:
        logger.info("Starting AIxBI Plagiarism Detection System")

        if validate_system_requirements():
            cleanup_old_reports()
            log_performance_metrics()

            demo = create_enhanced_ui()

            logger.info("System ready - launching web interface")
            demo.launch(**launch_options)
        else:
            logger.error("System requirements not met. Please check the logs.")

    except Exception as e:
        logger.error(f"Failed to start application: {e}")
        raise


if __name__ == "__main__":
    main()