|
""" |
|
وحدة تحليل المستندات لنظام إدارة المناقصات - Hybrid Face |
|
""" |
|
|
|
import os |
|
import re |
|
import logging |
|
import threading |
|
from pathlib import Path |
|
import datetime |
|
import json |
|
|
|
|
|
# Configure root logging at import time with a timestamped record layout,
# then create the logger shared by everything in this module.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

logger = logging.getLogger('document_analysis')
|
|
|
class DocumentAnalyzer: |
|
"""فئة تحليل المستندات""" |
|
|
|
def __init__(self, config=None):
    """Initialise analyzer state and make sure the documents directory exists.

    Args:
        config: Optional configuration object. When it is truthy and has a
            ``DOCUMENTS_PATH`` attribute, that value is used as the documents
            directory; otherwise ``data/documents`` is used.
    """
    self.config = config
    self.analysis_in_progress = False
    self.current_document = None
    self.analysis_results = {}

    has_custom_path = bool(config) and hasattr(config, 'DOCUMENTS_PATH')
    location = config.DOCUMENTS_PATH if has_custom_path else 'data/documents'
    self.documents_path = Path(location)

    # Nothing to create when something already exists at that path.
    if self.documents_path.exists():
        return
    self.documents_path.mkdir(parents=True, exist_ok=True)
|
|
|
def analyze_document(self, document_path, document_type="tender", callback=None):
    """Start analysing a document on a background daemon thread.

    Args:
        document_path: Path of the file to analyse; it must exist.
        document_type: Document kind stored in the results and forwarded to
            the worker (defaults to ``"tender"``).
        callback: Optional object forwarded unchanged to the worker.

    Returns:
        bool: False when an analysis is already running or the file does not
        exist; True once the worker thread has been started.
    """
    if self.analysis_in_progress:
        logger.warning("هناك عملية تحليل جارية بالفعل")
        return False

    if not os.path.exists(document_path):
        logger.error(f"المستند غير موجود: {document_path}")
        return False

    self.analysis_in_progress = True
    self.current_document = document_path

    # Fresh result skeleton; the worker fills in the collections below.
    started_at = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    skeleton = {
        "document_path": document_path,
        "document_type": document_type,
        "analysis_start_time": started_at,
        "status": "جاري التحليل",
    }
    for collection in ("items", "entities", "dates", "amounts", "risks"):
        skeleton[collection] = []
    self.analysis_results = skeleton

    worker = threading.Thread(
        target=self._analyze_document_thread,
        args=(document_path, document_type, callback),
        daemon=True,
    )
    worker.start()
    return True
|
|
|
def _analyze_document_thread(self, document_path, document_type, callback):
    """Analyse one document and record the outcome (worker-thread body).

    Dispatches on the file extension, updates ``self.analysis_results`` in
    place, always clears ``self.analysis_in_progress`` and finally passes the
    results to ``callback`` when it is callable.

    Args:
        document_path: Path of the document being analysed.
        document_type: Document kind forwarded to the format-specific analyzer.
        callback: Optional callable invoked with ``self.analysis_results``.
    """
    failed_status = "فشل التحليل"
    try:
        file_extension = os.path.splitext(document_path)[1].lower()

        if file_extension == '.pdf':
            # Merge instead of rebinding: the PDF analysis dict carries no
            # "status" key, so replacing the results wholesale dropped the
            # path/start-time metadata and made the status check below raise
            # KeyError, marking every PDF analysis as failed.
            pdf_analysis = self._analyze_pdf(document_path, document_type)
            if pdf_analysis:
                self.analysis_results.update(pdf_analysis)
        elif file_extension == '.docx':
            self._analyze_docx(document_path, document_type)
        elif file_extension == '.xlsx':
            self._analyze_xlsx(document_path, document_type)
        elif file_extension == '.txt':
            self._analyze_txt(document_path, document_type)
        else:
            logger.error(f"نوع المستند غير مدعوم: {file_extension}")
            self.analysis_results["status"] = failed_status
            self.analysis_results["error"] = "نوع المستند غير مدعوم"

        # Only stamp completion when no branch above flagged a failure.
        if self.analysis_results.get("status") != failed_status:
            self.analysis_results["status"] = "اكتمل التحليل"
            self.analysis_results["analysis_end_time"] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            logger.info(f"اكتمل تحليل المستند: {document_path}")

    except Exception as e:
        logger.error(f"خطأ في تحليل المستند: {str(e)}")
        self.analysis_results["status"] = failed_status
        self.analysis_results["error"] = str(e)

    finally:
        # Release the busy flag before notifying, so the callback can start
        # another analysis if it wants to.
        self.analysis_in_progress = False

        if callback and callable(callback):
            callback(self.analysis_results)
|
|
|
def _analyze_pdf(self, document_path, document_type):
    """Build the analysis dictionary for a PDF document.

    Extracts the text once and feeds it to the individual analysis helpers,
    grouping their results under ``file_info``, ``content_analysis``,
    ``statistical_analysis`` and ``compliance_check``, plus summary,
    recommendations, related documents and version info. One extra section
    is added depending on ``document_type`` ("tender", "contract" or
    "technical").

    Args:
        document_path: Path of the PDF file.
        document_type: Selects the optional type-specific section.

    Returns:
        dict: The assembled analysis.

    Raises:
        Exception: Any error from the helpers is logged and re-raised.
    """
    # Imported here because the module's import block does not provide
    # ``time``; without it the modification-date lookup raised NameError.
    import time

    try:
        text = self._extract_text_from_pdf(document_path)

        analysis = {
            "file_info": {
                "name": os.path.basename(document_path),
                "type": "PDF",
                "size": os.path.getsize(document_path),
                "pages": self._count_pages(document_path),
                "create_date": self._get_creation_date(document_path),
                "modify_date": time.ctime(os.path.getmtime(document_path))
            },
            "content_analysis": {
                "contract_terms": self._analyze_contract_terms(text),
                "financial_analysis": self._analyze_financial_terms(text),
                "legal_analysis": self._analyze_legal_terms(text),
                "risk_analysis": self._analyze_risks(text),
                "conditions_analysis": self._analyze_conditions(text),
                "technical_specs": self._analyze_technical_specs(text),
                "key_dates": self._extract_key_dates(text),
                "important_figures": self._extract_figures(text),
                "entities": self._extract_entities(text)
            },
            "statistical_analysis": {
                "word_count": len(text.split()),
                "unique_terms": self._analyze_unique_terms(text),
                "topic_distribution": self._analyze_topics(text),
                "complexity_score": self._calculate_complexity(text)
            },
            "compliance_check": {
                "missing_sections": self._check_missing_sections(text),
                "required_terms": self._check_required_terms(text),
                "compliance_score": self._calculate_compliance_score(text)
            },
            "summary": self._generate_summary(text),
            "recommendations": self._generate_recommendations(text),
            "related_documents": self._find_related_documents(document_path),
            "version_info": self._get_version_info(document_path)
        }

        # Optional section specific to the kind of document.
        if document_type == "tender":
            analysis["tender_analysis"] = self._analyze_tender_specifics(text)
        elif document_type == "contract":
            analysis["contract_analysis"] = self._analyze_contract_specifics(text)
        elif document_type == "technical":
            analysis["technical_analysis"] = self._analyze_technical_specifics(text)

        return analysis

    except Exception as e:
        logger.error(f"خطأ في تحليل PDF: {str(e)}")
        raise
|
|
|
def _extract_text_from_pdf(self, document_path):
    """Extract the text of a PDF file.

    Stub: a real implementation needs a library such as PyPDF2 or
    pdfplumber. The path is ignored and a fixed placeholder is returned.
    """
    extracted_text = "Placeholder text extracted from PDF"
    return extracted_text
|
|
|
def _analyze_contract_terms(self, text):
    """Analyse the contract clauses (stub: ignores ``text``, returns a placeholder)."""
    outcome = "Placeholder contract terms analysis"
    return outcome
|
|
|
def _analyze_financial_terms(self, text):
    """Analyse the financial part (stub: ignores ``text``, returns a placeholder)."""
    outcome = "Placeholder financial terms analysis"
    return outcome
|
|
|
def _analyze_legal_terms(self, text):
    """Analyse the legal side of the contract (stub: ignores ``text``, returns a placeholder)."""
    outcome = "Placeholder legal terms analysis"
    return outcome
|
|
|
def _analyze_risks(self, text):
    """Analyse risks (stub: ignores ``text``, returns a placeholder)."""
    outcome = "Placeholder risk analysis"
    return outcome
|
|
|
def _analyze_conditions(self, text):
    """Study the terms-and-conditions booklet (stub: ignores ``text``, returns a placeholder)."""
    outcome = "Placeholder conditions analysis"
    return outcome
|
|
|
def _generate_summary(self, text):
    """Generate a summary (stub: ignores ``text``, returns a placeholder)."""
    outcome = "Placeholder summary"
    return outcome
|
|
|
def _generate_recommendations(self, text):
    """Generate recommendations (stub: ignores ``text``, returns a placeholder)."""
    outcome = "Placeholder recommendations"
    return outcome
|
|
|
|
|
|
|
def _analyze_docx(self, document_path, document_type):
    """Analyse a Word document.

    Currently logs the path and stores three fixed sample items under
    ``self.analysis_results["items"]``; the file itself is not read.

    Raises:
        Exception: Any error is logged and re-raised.
    """
    item_fields = ("id", "name", "description", "unit", "estimated_quantity")
    sample_rows = (
        (1, "توريد معدات", "توريد معدات المشروع", "مجموعة", 10),
        (2, "تركيب المعدات", "تركيب وتشغيل المعدات", "مجموعة", 10),
        (3, "التدريب", "تدريب الموظفين على استخدام المعدات", "يوم", 20),
    )
    try:
        logger.info(f"تحليل مستند Word: {document_path}")
        self.analysis_results["items"] = [
            dict(zip(item_fields, row)) for row in sample_rows
        ]
    except Exception as exc:
        logger.error(f"خطأ في تحليل مستند Word: {str(exc)}")
        raise
|
|
|
def _analyze_xlsx(self, document_path, document_type):
    """Analyse an Excel document.

    Currently logs the path and stores fixed sample data: three entries under
    ``self.analysis_results["items"]`` and three matching cost entries under
    ``self.analysis_results["amounts"]``; the file itself is not read.

    Raises:
        Exception: Any error is logged and re-raised.
    """
    item_fields = ("id", "name", "description", "unit", "estimated_quantity")
    item_rows = (
        (1, "بند 1", "وصف البند 1", "وحدة", 100),
        (2, "بند 2", "وصف البند 2", "وحدة", 200),
        (3, "بند 3", "وصف البند 3", "وحدة", 300),
    )
    amount_fields = ("type", "amount", "currency", "description")
    amount_rows = (
        ("item_cost", 10000, "SAR", "تكلفة البند 1"),
        ("item_cost", 20000, "SAR", "تكلفة البند 2"),
        ("item_cost", 30000, "SAR", "تكلفة البند 3"),
    )
    try:
        logger.info(f"تحليل مستند Excel: {document_path}")
        self.analysis_results["items"] = [
            dict(zip(item_fields, row)) for row in item_rows
        ]
        self.analysis_results["amounts"] = [
            dict(zip(amount_fields, row)) for row in amount_rows
        ]
    except Exception as exc:
        logger.error(f"خطأ في تحليل مستند Excel: {str(exc)}")
        raise
|
|
|
def _analyze_txt(self, document_path, document_type):
    """Analyse a plain-text document.

    Currently only logs the path; nothing is read and no results are stored.

    Raises:
        Exception: Any error is logged and re-raised.
    """
    try:
        logger.info(f"تحليل مستند نصي: {document_path}")
    except Exception as exc:
        logger.error(f"خطأ في تحليل مستند نصي: {str(exc)}")
        raise
|
|
|
def get_analysis_status(self):
    """Report the state of the current or most recent analysis.

    Returns:
        dict: While an analysis is running: its status, document path and
        start time. Otherwise a one-key dict with the stored status, or a
        "no analysis" status when no results exist.
    """
    if self.analysis_in_progress:
        return {
            "status": "جاري التحليل",
            "document_path": self.current_document,
            "start_time": self.analysis_results.get("analysis_start_time")
        }

    if self.analysis_results:
        return {"status": self.analysis_results.get("status", "غير معروف")}

    return {"status": "لا يوجد تحليل جارٍ"}
|
|
|
def get_analysis_results(self):
    """Return the stored analysis results (the live object, not a copy)."""
    current_results = self.analysis_results
    return current_results
|
|
|
def export_analysis_results(self, output_path=None):
    """Write the stored analysis results to a UTF-8 JSON file.

    Args:
        output_path: Destination file. When falsy, a file named
            ``analysis_results_<YYYYmmdd_HHMMSS>.json`` inside
            ``self.documents_path`` is used.

    Returns:
        The path that was written, or None when there are no results to
        export or the write fails (the error is logged).
    """
    if not self.analysis_results:
        logger.warning("لا توجد نتائج تحليل للتصدير")
        return None

    if not output_path:
        stamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        output_path = os.path.join(self.documents_path, f"analysis_results_{stamp}.json")

    try:
        with open(output_path, 'w', encoding='utf-8') as out_file:
            json.dump(self.analysis_results, out_file, ensure_ascii=False, indent=4)

        logger.info(f"تم تصدير نتائج التحليل إلى: {output_path}")
        return output_path
    except Exception as exc:
        logger.error(f"خطأ في تصدير نتائج التحليل: {str(exc)}")
        return None
|
|
|
def import_analysis_results(self, input_path):
    """Load analysis results from a UTF-8 JSON file into ``self.analysis_results``.

    Args:
        input_path: Path of the JSON file to read.

    Returns:
        bool: True when the file was loaded; False when it does not exist or
        cannot be read/parsed (the error is logged and the stored results are
        left untouched).
    """
    if not os.path.exists(input_path):
        logger.error(f"ملف نتائج التحليل غير موجود: {input_path}")
        return False

    try:
        with open(input_path, 'r', encoding='utf-8') as in_file:
            loaded = json.load(in_file)
        self.analysis_results = loaded

        logger.info(f"تم استيراد نتائج التحليل من: {input_path}")
        return True
    except Exception as exc:
        logger.error(f"خطأ في استيراد نتائج التحليل: {str(exc)}")
        return False
|
|
|
def _count_pages(self, document_path):
    """Return the number of pages in a PDF, or 0 when it cannot be determined.

    Best effort: a missing PyPDF2 installation, an unreadable file or a
    malformed PDF all yield 0 instead of raising.

    Args:
        document_path: Path of the PDF file.

    Returns:
        int: Page count, or 0 on any failure.
    """
    try:
        # Optional dependency, imported lazily so the module loads without it.
        import PyPDF2

        with open(document_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            return len(reader.pages)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt
        # and SystemExit are not swallowed.
        return 0
|
|
|
def _get_creation_date(self, document_path):
    """Return the PDF's ``/CreationDate`` metadata entry, if available.

    Best effort: a missing PyPDF2 installation, an unreadable file, a
    malformed PDF or absent metadata all yield the "not available" marker
    instead of raising.

    Args:
        document_path: Path of the PDF file.

    Returns:
        The raw ``/CreationDate`` value, or ``"غير متوفر"`` when unavailable.
    """
    unavailable = "غير متوفر"
    try:
        # Optional dependency, imported lazily so the module loads without it.
        import PyPDF2

        with open(document_path, 'rb') as file:
            metadata = PyPDF2.PdfReader(file).metadata
            # Guard explicitly: the reader may expose no metadata object at
            # all (None), which previously only worked via the bare except.
            if metadata is not None and '/CreationDate' in metadata:
                return metadata['/CreationDate']
        return unavailable
    except Exception:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt
        # and SystemExit are not swallowed.
        return unavailable
|
|
|
def _analyze_technical_specs(self, text):
    """Analyse the technical specifications found in ``text``.

    Returns:
        dict: ``materials``, ``measurements`` and ``standards`` entries, each
        produced by the matching ``_extract_*`` helper.
    """
    materials = self._extract_materials(text)
    measurements = self._extract_measurements(text)
    standards = self._extract_standards(text)
    return {
        "materials": materials,
        "measurements": measurements,
        "standards": standards,
    }
|
|
|
def _extract_key_dates(self, text):
    """Extract the distinct numeric dates mentioned in ``text``.

    Recognises day/month-first dates (``20/02/2023``, ``3-4-21``) and
    year-first dates (``2023-01-15``), with ``-`` or ``/`` as separator.

    Args:
        text: Text to scan.

    Returns:
        list[str]: Unique date strings in order of first appearance.
    """
    date_pattern = (
        r'(?<!\d)(?:'                       # must not start inside a longer number
        r'\d{4}[-/]\d{1,2}[-/]\d{1,2}'      # year first; was truncated to "23-01-15"
        r'|\d{1,2}[-/]\d{1,2}[-/]\d{2,4}'   # day/month first
        r')(?!\d)'                          # must not end inside a longer number
    )
    matches = re.findall(date_pattern, text)
    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # result is deterministic (a set round-trip is not).
    return list(dict.fromkeys(matches))
|
|
|
def _extract_figures(self, text):
    """Extract monetary amounts and percentages mentioned in ``text``.

    Args:
        text: Text to scan.

    Returns:
        dict: ``"currencies"`` — amounts followed by ريال, دولار, SAR or USD;
        ``"percentages"`` — numbers followed by ``%`` or the Arabic ``٪``.
        Both lists keep document order and may contain duplicates.
    """
    # The amount must start with a digit; the previous pattern also matched a
    # bare comma before a currency name (", SAR" in "USD, SAR").
    currency_pattern = r'\d[\d,]*\.?\d*\s*(?:ريال|دولار|SAR|USD)'
    currencies = re.findall(currency_pattern, text)

    # Accept the Arabic percent sign as well as the ASCII one.
    percentage_pattern = r'\d+\.?\d*\s*[%٪]'
    percentages = re.findall(percentage_pattern, text)

    return {
        "currencies": currencies,
        "percentages": percentages
    }
|
|
|
def _analyze_unique_terms(self, text):
    """Return the distinct whitespace-separated terms of ``text``.

    Args:
        text: Text to split on whitespace.

    Returns:
        list[str]: Unique terms in order of first appearance.
    """
    # dict.fromkeys de-duplicates while keeping first-seen order; going
    # through a set made the order vary between runs.
    return list(dict.fromkeys(text.split()))
|
|
|
def _calculate_complexity(self, text):
    """Score the complexity of ``text`` on a scale capped at 10.

    The score is ``0.5 * average word length + 0.2 * average words per
    sentence``, where sentences are the non-empty fragments between ``.``
    characters.

    Args:
        text: Text to score.

    Returns:
        The score rounded to two decimals; 0.0 for text without any words.
    """
    words = text.split()
    if not words:
        # Empty or whitespace-only input used to raise ZeroDivisionError.
        return 0.0

    # Ignore empty fragments (e.g. after a trailing ".") so they do not
    # count as sentences and dilute the average.
    sentences = [part for part in text.split('.') if part.strip()]
    sentence_count = len(sentences) or 1

    avg_word_length = sum(len(word) for word in words) / len(words)
    avg_sentence_length = len(words) / sentence_count

    complexity = min((avg_word_length * 0.5 + avg_sentence_length * 0.2), 10)
    return round(complexity, 2)
|
|
|
def _check_missing_sections(self, text):
    """List the required section headings that do not occur in ``text``.

    Returns:
        list[str]: Missing headings, in the order they are checked.
    """
    expected_headings = (
        "نطاق العمل",
        "المواصفات الفنية",
        "الشروط العامة",
        "الضمانات",
        "الغرامات",
        "شروط الدفع",
    )
    return [heading for heading in expected_headings if heading not in text]
|
|
|
def _find_related_documents(self, document_path):
    """Find sibling files that share the document's name prefix.

    The prefix is the part of the file name before the first ``_``. Every
    other entry in the same directory whose name starts with that prefix is
    considered related.

    Args:
        document_path: Path of the reference document.

    Returns:
        list[str]: Related entry names (not full paths), sorted by name.

    Raises:
        OSError: If the directory cannot be listed.
    """
    # A bare file name has an empty dirname, and os.listdir("") raises;
    # fall back to the current directory in that case.
    directory = os.path.dirname(document_path) or os.curdir
    base_name = os.path.basename(document_path)
    prefix = base_name.split('_')[0]

    # Sorted so the result does not depend on filesystem listing order.
    return sorted(
        entry
        for entry in os.listdir(directory)
        if entry != base_name and entry.startswith(prefix)
    )
|
|