Spaces:
Runtime error
Runtime error
import os | |
import re | |
import torch | |
from collections import Counter | |
from transformers import pipeline, AutoModel, AutoTokenizer, AutoModelForCausalLM, AutoModelForTokenClassification | |
import PyPDF2 | |
import openai | |
import docx | |
from arabert.preprocess import ArabertPreprocessor | |
import gradio as gr | |
# التحقق من توفر GPU واستخدامه | |
device = 0 if torch.cuda.is_available() else -1 | |
# تحميل نماذج BERT و GPT2 | |
arabic_bert_tokenizer = AutoTokenizer.from_pretrained("asafaya/bert-base-arabic") | |
arabic_bert_model = AutoModel.from_pretrained("asafaya/bert-base-arabic") | |
arabert_tokenizer = AutoTokenizer.from_pretrained("aubmindlab/bert-base-arabertv02") | |
arabert_model = AutoModel.from_pretrained("aubmindlab/bert-base-arabertv02") | |
gpt2_tokenizer = AutoTokenizer.from_pretrained("aubmindlab/aragpt2-large", trust_remote_code=True) | |
gpt2_model = AutoModelForCausalLM.from_pretrained("aubmindlab/aragpt2-large", trust_remote_code=True) | |
# إعداد المعالج النصي لـ AraBERT | |
arabert_prep = ArabertPreprocessor("aubmindlab/bert-base-arabertv02") | |
# دالة لتقسيم النص إلى أجزاء بناءً على عدد التوكنز | |
def split_text_into_chunks(text, tokenizer, max_length): | |
tokens = tokenizer.tokenize(text) | |
chunks = [] | |
for i in range(0, len(tokens), max_length): | |
chunk_tokens = tokens[i:i + max_length] | |
chunk_text = tokenizer.convert_tokens_to_string(chunk_tokens) | |
chunks.append(chunk_text) | |
return chunks | |
# دالة لتجزئة النص إلى جمل باستخدام التعبيرات العادية | |
def extract_sentences(text): | |
sentences = re.split(r'(?<=[.!؟]) +', text) | |
return sentences | |
# دالة لاستخراج الاقتباسات من النص | |
def extract_quotes(text): | |
quotes = re.findall(r'[“"«](.*?)[”"»]', text) | |
return quotes | |
# دالة لعد الرموز في النص | |
def count_tokens(text, tokenizer): | |
tokens = tokenizer.tokenize(text) | |
return len(tokens) | |
# دالة لاستخراج النص من ملفات PDF | |
def extract_pdf_text(file_path): | |
text = "" | |
with open(file_path, "rb") as pdf_file: | |
pdf_reader = PyPDF2.PdfReader(pdf_file) | |
for page_num in range(len(pdf_reader.pages)): | |
page = pdf_reader.pages[page_num] | |
text += page.extract_text() or "" | |
return text | |
# دالة لاستخراج النص من ملفات DOCX | |
def extract_docx_text(file_path): | |
doc = docx.Document(file_path) | |
text = "\n".join([para.text for para in doc.paragraphs]) | |
return text | |
# دالة لقراءة النص من ملف مع التعامل مع مشاكل الترميز | |
def read_text_file(file_path): | |
try: | |
with open(file_path, "r", encoding="utf-8") as file: | |
return file.read() | |
except UnicodeDecodeError: | |
try: | |
with open(file_path, "r", encoding="latin-1") as file: | |
return file.read() | |
except UnicodeDecodeError: | |
with open(file_path, "r", encoding="cp1252") as file: | |
return file.read() | |
# دالة لاستخراج المشاهد من النص | |
def extract_scenes(text): | |
scenes = re.split(r'داخلي|خارجي|... داخلي ...|... خارجي ...', text) | |
scenes = [scene.strip() for scene in scenes if scene.strip()] | |
return scenes | |
# دالة لاستخراج تفاصيل المشهد (المكان والوقت) | |
def extract_scene_details(scene): | |
details = {} | |
location_match = re.search(r'(داخلي|خارجي|... داخلي ...|... خارجي ...)', scene) | |
time_match = re.search(r'(ليلاً|نهاراً|شروق|غروب|... ليل ...|... نهار ...)', scene) | |
if location_match: | |
details['location'] = location_match.group() | |
if time_match: | |
details['time'] = time_match.group() | |
return details | |
# دالة لاستخراج أعمار الشخصيات | |
def extract_ages(text): | |
ages = re.findall(r'\b(\d{1,2})\s*(?:عام|سنة|سنوات)\s*(?:من العمر|عمره|عمرها)', text) | |
return ages | |
# دالة لاستخراج وصف الشخصيات | |
def extract_character_descriptions(text): | |
descriptions = re.findall(r'شخصية\s*(.*?)\s*:\s*وصف\s*(.*?)\s*(?:\.|،)', text, re.DOTALL) | |
return descriptions | |
# دالة لاستخراج تكرار الشخصيات | |
def extract_character_frequency(entities): | |
persons = [ent[0] for ent in entities['PERSON']] | |
frequency = Counter(persons) | |
return frequency | |
# دالة لاستخراج الحوارات وتحديد المتحدثين | |
def extract_dialogues(text): | |
dialogues = re.findall(r'(.*?)(?:\s*:\s*)(.*?)(?=\n|$)', text, re.DOTALL) | |
return dialogues | |
# دالة لمعالجة الملفات وتقسيمها بناءً على عدد التوكنز | |
def process_files(input_files, output_directory_950): | |
for file_path in input_files: | |
if os.path.isdir(file_path): # التأكد من أن الملف ليس مجلدًا | |
continue | |
if file_path.endswith(".pdf"): | |
text = extract_pdf_text(file_path) | |
elif file_path.endswith(".docx"): | |
text = extract_docx_text(file_path) | |
else: | |
text = read_text_file(file_path) | |
# تقسيم النص إلى أجزاء لا تتجاوز 950 توكنز | |
chunks_950 = split_text_into_chunks(text, gpt2_tokenizer, 950) | |
for i, chunk in enumerate(chunks_950): | |
output_file_950 = os.path.join(output_directory_950, f"{os.path.splitext(os.path.basename(file_path))[0]}_part_{i+1}.txt") | |
with open(output_file_950, "w", encoding="utf-8") as file: | |
file.write(chunk) | |
# دالة لتحليل النصوص واستخراج المعلومات وحفظ النتائج | |
def analyze_files(input_files, output_directory, tokenizer, max_length): | |
results = [] | |
for file_path in input_files: | |
if os.path.isdir(file_path): # التأكد من أن الملف ليس مجلدًا | |
continue | |
with open(file_path, "r", encoding="utf-8") as file: | |
text = file.read() | |
chunks = split_text_into_chunks(text, tokenizer, max_length) | |
# إجراء التحليل على النصوص المقسمة | |
for chunk in chunks: | |
sentences = extract_sentences(chunk) | |
quotes = extract_quotes(chunk) | |
token_count = count_tokens(chunk, tokenizer) | |
scenes = extract_scenes(chunk) | |
ages = extract_ages(chunk) | |
character_descriptions = extract_character_descriptions(chunk) | |
dialogues = extract_dialogues(chunk) | |
scene_details = [extract_scene_details(scene) for scene in scenes] | |
result = { | |
"sentences": sentences, | |
"quotes": quotes, | |
"token_count": token_count, | |
"scenes": scenes, | |
"scene_details": scene_details, | |
"ages": ages, | |
"character_descriptions": character_descriptions, | |
"dialogues": dialogues | |
} | |
results.append(result) | |
# حفظ النتائج | |
base_filename = os.path.basename(file_path) | |
with open(os.path.join(output_directory, f"{base_filename}_sentences.txt"), "a", encoding="utf-8") as file: | |
file.write("\n".join(sentences) + "\n") | |
with open(os.path.join(output_directory, f"{base_filename}_quotes.txt"), "a", encoding="utf-8") as file: | |
file.write("\n".join(quotes) + "\n") | |
with open(os.path.join(output_directory, f"{base_filename}_token_count.txt"), "a", encoding="utf-8") as file: | |
file.write(str(token_count) + "\n") | |
with open(os.path.join(output_directory, f"{base_filename}_scenes.txt"), "a", encoding="utf-8") as file: | |
file.write("\n".join(scenes) + "\n") | |
with open(os.path.join(output_directory, f"{base_filename}_scene_details.txt"), "a", encoding="utf-8") as file: | |
file.write(str(scene_details) + "\n") | |
with open(os.path.join(output_directory, f"{base_filename}_ages.txt"), "a", encoding="utf-8") as file: | |
file.write(str(ages) + "\n") | |
with open(os.path.join(output_directory, f"{base_filename}_character_descriptions.txt"), "a", encoding="utf-8") as file: | |
file.write(str(character_descriptions) + "\n") | |
with open(os.path.join(output_directory, f"{base_filename}_dialogues.txt"), "a", encoding="utf-8") as file: | |
file.write(str(dialogues) + "\n") | |
return results | |
# تحديد المسارات | |
output_directory_950 = "/Volumes/CLOCKWORK T/clockworkspace/first pro/1000" | |
output_directory_950_out = "/Volumes/CLOCKWORK T/clockworkspace/first pro/out/1000" | |
# التأكد من وجود المسارات | |
os.makedirs(output_directory_950, exist_ok=True) | |
os.makedirs(output_directory_950_out, exist_ok=True) | |
# تعريف واجهة Gradio | |
def analyze_and_complete(input_files): | |
# معالجة الملفات وتقسيمها | |
process_files(input_files, output_directory_950) | |
# تحليل الملفات المقسمة إلى 950 توكنز | |
results = analyze_files(input_files, output_directory_950_out, gpt2_tokenizer, 950) | |
return results | |
interface = gr.Interface( | |
fn=analyze_and_complete, | |
inputs=gr.File(file_count="multiple", type="filepath"), | |
outputs="json", | |
title="Movie Script Analyzer and Completer", | |
description="Upload text, PDF, or DOCX files to analyze and complete the movie script." | |
) | |
if __name__ == "__main__": | |
interface.launch(share=True) | |