EGYADMIN's picture
Upload 10 files
4c143b5 verified
"""
وحدة تحليل المستندات لنظام إدارة المناقصات - Hybrid Face
"""
import os
import re
import logging
import threading
from pathlib import Path
import datetime
import json
# تهيئة السجل
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('document_analysis')
class DocumentAnalyzer:
"""فئة تحليل المستندات"""
def __init__(self, config=None):
"""تهيئة محلل المستندات"""
self.config = config
self.analysis_in_progress = False
self.current_document = None
self.analysis_results = {}
# إنشاء مجلد المستندات إذا لم يكن موجوداً
if config and hasattr(config, 'DOCUMENTS_PATH'):
self.documents_path = Path(config.DOCUMENTS_PATH)
else:
self.documents_path = Path('data/documents')
if not self.documents_path.exists():
self.documents_path.mkdir(parents=True, exist_ok=True)
def analyze_document(self, document_path, document_type="tender", callback=None):
"""تحليل مستند"""
if self.analysis_in_progress:
logger.warning("هناك عملية تحليل جارية بالفعل")
return False
if not os.path.exists(document_path):
logger.error(f"المستند غير موجود: {document_path}")
return False
self.analysis_in_progress = True
self.current_document = document_path
self.analysis_results = {
"document_path": document_path,
"document_type": document_type,
"analysis_start_time": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"status": "جاري التحليل",
"items": [],
"entities": [],
"dates": [],
"amounts": [],
"risks": []
}
# بدء التحليل في خيط منفصل
thread = threading.Thread(
target=self._analyze_document_thread,
args=(document_path, document_type, callback)
)
thread.daemon = True
thread.start()
return True
def _analyze_document_thread(self, document_path, document_type, callback):
"""خيط تحليل المستند"""
try:
# تحديد نوع المستند
file_extension = os.path.splitext(document_path)[1].lower()
if file_extension == '.pdf':
self.analysis_results = self._analyze_pdf(document_path, document_type)
elif file_extension == '.docx':
self._analyze_docx(document_path, document_type)
elif file_extension == '.xlsx':
self._analyze_xlsx(document_path, document_type)
elif file_extension == '.txt':
self._analyze_txt(document_path, document_type)
else:
logger.error(f"نوع المستند غير مدعوم: {file_extension}")
self.analysis_results["status"] = "فشل التحليل"
self.analysis_results["error"] = "نوع المستند غير مدعوم"
# تحديث حالة التحليل
if self.analysis_results["status"] != "فشل التحليل":
self.analysis_results["status"] = "اكتمل التحليل"
self.analysis_results["analysis_end_time"] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
logger.info(f"اكتمل تحليل المستند: {document_path}")
except Exception as e:
logger.error(f"خطأ في تحليل المستند: {str(e)}")
self.analysis_results["status"] = "فشل التحليل"
self.analysis_results["error"] = str(e)
finally:
self.analysis_in_progress = False
# استدعاء دالة الاستجابة إذا تم توفيرها
if callback and callable(callback):
callback(self.analysis_results)
def _analyze_pdf(self, document_path, document_type):
"""تحليل مستند PDF باستخدام الذكاء الاصطناعي"""
try:
# استخراج النص من PDF
text = self._extract_text_from_pdf(document_path)
# تحليل متقدم للمستند
analysis = {
"file_info": {
"name": os.path.basename(document_path),
"type": "PDF",
"size": os.path.getsize(document_path),
"pages": self._count_pages(document_path),
"create_date": self._get_creation_date(document_path),
"modify_date": time.ctime(os.path.getmtime(document_path))
},
"content_analysis": {
"contract_terms": self._analyze_contract_terms(text),
"financial_analysis": self._analyze_financial_terms(text),
"legal_analysis": self._analyze_legal_terms(text),
"risk_analysis": self._analyze_risks(text),
"conditions_analysis": self._analyze_conditions(text),
"technical_specs": self._analyze_technical_specs(text),
"key_dates": self._extract_key_dates(text),
"important_figures": self._extract_figures(text),
"entities": self._extract_entities(text)
},
"statistical_analysis": {
"word_count": len(text.split()),
"unique_terms": self._analyze_unique_terms(text),
"topic_distribution": self._analyze_topics(text),
"complexity_score": self._calculate_complexity(text)
},
"compliance_check": {
"missing_sections": self._check_missing_sections(text),
"required_terms": self._check_required_terms(text),
"compliance_score": self._calculate_compliance_score(text)
},
"summary": self._generate_summary(text),
"recommendations": self._generate_recommendations(text),
"related_documents": self._find_related_documents(document_path),
"version_info": self._get_version_info(document_path)
}
# إضافة تحليل متخصص حسب نوع المستند
if document_type == "tender":
analysis["tender_analysis"] = self._analyze_tender_specifics(text)
elif document_type == "contract":
analysis["contract_analysis"] = self._analyze_contract_specifics(text)
elif document_type == "technical":
analysis["technical_analysis"] = self._analyze_technical_specifics(text)
return analysis
except Exception as e:
logger.error(f"خطأ في تحليل PDF: {str(e)}")
raise
def _extract_text_from_pdf(self, document_path):
"""استخراج النص من ملف PDF (تحتاج إلى مكتبة مثل PyPDF2 أو pdfplumber)"""
# Implementation using a PDF processing library like PyPDF2 or pdfplumber is needed here.
# This is a placeholder. Replace with actual PDF text extraction.
return "Placeholder text extracted from PDF"
def _analyze_contract_terms(self, text):
"""تحليل بنود العقد"""
# Implementation for contract term analysis is needed here. This is a placeholder.
return "Placeholder contract terms analysis"
def _analyze_financial_terms(self, text):
"""تحليل الجزء المالي"""
# Implementation for financial term analysis is needed here. This is a placeholder.
return "Placeholder financial terms analysis"
def _analyze_legal_terms(self, text):
"""تحليل القانوني للعقد"""
# Implementation for legal term analysis is needed here. This is a placeholder.
return "Placeholder legal terms analysis"
def _analyze_risks(self, text):
"""تحليل المخاطر"""
# Implementation for risk analysis is needed here. This is a placeholder.
return "Placeholder risk analysis"
def _analyze_conditions(self, text):
"""دراسة كراسة الشروط"""
# Implementation for conditions analysis is needed here. This is a placeholder.
return "Placeholder conditions analysis"
def _generate_summary(self, text):
"""توليد ملخص"""
# Implementation for summary generation is needed here. This is a placeholder.
return "Placeholder summary"
def _generate_recommendations(self, text):
"""توليد التوصيات"""
# Implementation for recommendation generation is needed here. This is a placeholder.
return "Placeholder recommendations"
def _analyze_docx(self, document_path, document_type):
"""تحليل مستند Word"""
try:
# محاكاة تحليل مستند Word
logger.info(f"تحليل مستند Word: {document_path}")
# في التطبيق الفعلي، سيتم استخدام مكتبة مثل python-docx
# لاستخراج النص من ملف Word وتحليله
# محاكاة استخراج البنود والكيانات والتواريخ والمبالغ والمخاطر
# (مشابه لتحليل PDF)
self.analysis_results["items"] = [
{"id": 1, "name": "توريد معدات", "description": "توريد معدات المشروع", "unit": "مجموعة", "estimated_quantity": 10},
{"id": 2, "name": "تركيب المعدات", "description": "تركيب وتشغيل المعدات", "unit": "مجموعة", "estimated_quantity": 10},
{"id": 3, "name": "التدريب", "description": "تدريب الموظفين على استخدام المعدات", "unit": "يوم", "estimated_quantity": 20}
]
# محاكاة استخراج الكيانات والتواريخ والمبالغ والمخاطر
# (مشابه لتحليل PDF)
except Exception as e:
logger.error(f"خطأ في تحليل مستند Word: {str(e)}")
raise
def _analyze_xlsx(self, document_path, document_type):
"""تحليل مستند Excel"""
try:
# محاكاة تحليل مستند Excel
logger.info(f"تحليل مستند Excel: {document_path}")
# في التطبيق الفعلي، سيتم استخدام مكتبة مثل pandas أو openpyxl
# لاستخراج البيانات من ملف Excel وتحليلها
# محاكاة استخراج البنود
self.analysis_results["items"] = [
{"id": 1, "name": "بند 1", "description": "وصف البند 1", "unit": "وحدة", "estimated_quantity": 100},
{"id": 2, "name": "بند 2", "description": "وصف البند 2", "unit": "وحدة", "estimated_quantity": 200},
{"id": 3, "name": "بند 3", "description": "وصف البند 3", "unit": "وحدة", "estimated_quantity": 300}
]
# محاكاة استخراج المبالغ
self.analysis_results["amounts"] = [
{"type": "item_cost", "amount": 10000, "currency": "SAR", "description": "تكلفة البند 1"},
{"type": "item_cost", "amount": 20000, "currency": "SAR", "description": "تكلفة البند 2"},
{"type": "item_cost", "amount": 30000, "currency": "SAR", "description": "تكلفة البند 3"}
]
except Exception as e:
logger.error(f"خطأ في تحليل مستند Excel: {str(e)}")
raise
def _analyze_txt(self, document_path, document_type):
"""تحليل مستند نصي"""
try:
# محاكاة تحليل مستند نصي
logger.info(f"تحليل مستند نصي: {document_path}")
# في التطبيق الفعلي، سيتم قراءة الملف النصي وتحليله
# محاكاة استخراج البنود والكيانات والتواريخ والمبالغ والمخاطر
# (مشابه للتحليلات الأخرى)
except Exception as e:
logger.error(f"خطأ في تحليل مستند نصي: {str(e)}")
raise
def get_analysis_status(self):
"""الحصول على حالة التحليل الحالي"""
if not self.analysis_in_progress:
if not self.analysis_results:
return {"status": "لا يوجد تحليل جارٍ"}
else:
return {"status": self.analysis_results.get("status", "غير معروف")}
return {
"status": "جاري التحليل",
"document_path": self.current_document,
"start_time": self.analysis_results.get("analysis_start_time")
}
def get_analysis_results(self):
"""الحصول على نتائج التحليل"""
return self.analysis_results
def export_analysis_results(self, output_path=None):
"""تصدير نتائج التحليل إلى ملف JSON"""
if not self.analysis_results:
logger.warning("لا توجد نتائج تحليل للتصدير")
return None
if not output_path:
# إنشاء اسم ملف افتراضي
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"analysis_results_{timestamp}.json"
output_path = os.path.join(self.documents_path, filename)
try:
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(self.analysis_results, f, ensure_ascii=False, indent=4)
logger.info(f"تم تصدير نتائج التحليل إلى: {output_path}")
return output_path
except Exception as e:
logger.error(f"خطأ في تصدير نتائج التحليل: {str(e)}")
return None
def import_analysis_results(self, input_path):
"""استيراد نتائج التحليل من ملف JSON"""
if not os.path.exists(input_path):
logger.error(f"ملف نتائج التحليل غير موجود: {input_path}")
return False
try:
with open(input_path, 'r', encoding='utf-8') as f:
self.analysis_results = json.load(f)
logger.info(f"تم استيراد نتائج التحليل من: {input_path}")
return True
except Exception as e:
logger.error(f"خطأ في استيراد نتائج التحليل: {str(e)}")
return False
def _count_pages(self, document_path):
"""حساب عدد صفحات المستند"""
try:
import PyPDF2
with open(document_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
return len(reader.pages)
except:
return 0
def _get_creation_date(self, document_path):
"""استخراج تاريخ إنشاء المستند"""
try:
import PyPDF2
with open(document_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
if '/CreationDate' in reader.metadata:
return reader.metadata['/CreationDate']
return "غير متوفر"
except:
return "غير متوفر"
def _analyze_technical_specs(self, text):
"""تحليل المواصفات الفنية"""
specs = {
"materials": self._extract_materials(text),
"measurements": self._extract_measurements(text),
"standards": self._extract_standards(text)
}
return specs
def _extract_key_dates(self, text):
"""استخراج التواريخ المهمة"""
import re
date_pattern = r'\d{1,2}[-/]\d{1,2}[-/]\d{2,4}'
dates = re.findall(date_pattern, text)
return list(set(dates))
def _extract_figures(self, text):
"""استخراج الأرقام والقيم المهمة"""
import re
# البحث عن القيم النقدية
currency_pattern = r'[\d,]+\.?\d*\s*(?:ريال|دولار|SAR|USD)'
currencies = re.findall(currency_pattern, text)
# البحث عن النسب المئوية
percentage_pattern = r'\d+\.?\d*\s*%'
percentages = re.findall(percentage_pattern, text)
return {
"currencies": currencies,
"percentages": percentages
}
def _analyze_unique_terms(self, text):
"""تحليل المصطلحات الفريدة"""
words = set(text.split())
return list(words)
def _calculate_complexity(self, text):
"""حساب مستوى تعقيد النص"""
words = text.split()
avg_word_length = sum(len(word) for word in words) / len(words)
sentences = text.split('.')
avg_sentence_length = len(words) / len(sentences)
# حساب درجة التعقيد (1-10)
complexity = min((avg_word_length * 0.5 + avg_sentence_length * 0.2), 10)
return round(complexity, 2)
def _check_missing_sections(self, text):
"""التحقق من الأقسام المفقودة"""
required_sections = [
"نطاق العمل",
"المواصفات الفنية",
"الشروط العامة",
"الضمانات",
"الغرامات",
"شروط الدفع"
]
missing = []
for section in required_sections:
if section not in text:
missing.append(section)
return missing
def _find_related_documents(self, document_path):
"""البحث عن المستندات المرتبطة"""
directory = os.path.dirname(document_path)
base_name = os.path.basename(document_path)
related = []
for file in os.listdir(directory):
if file != base_name and file.startswith(base_name.split('_')[0]):
related.append(file)
return related