EGYADMIN's picture
Update modules/document_analysis/analyzer.py
92688b8 verified
raw
history blame
85.2 kB
"""
وحدة تحليل المستندات لنظام إدارة المناقصات - Hybrid Face
"""
import os
import re
import logging
import threading
from pathlib import Path
import datetime
import json
import base64
import time
from PIL import Image
import io
# تهيئة السجل
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('document_analysis')
class DocumentAnalyzer:
"""فئة تحليل المستندات"""
def __init__(self, config=None):
"""تهيئة محلل المستندات"""
self.config = config
self.analysis_in_progress = False
self.current_document = None
self.analysis_results = {}
# إنشاء مجلد المستندات إذا لم يكن موجوداً
if config and hasattr(config, 'DOCUMENTS_PATH'):
self.documents_path = Path(config.DOCUMENTS_PATH)
else:
self.documents_path = Path('data/documents')
if not self.documents_path.exists():
self.documents_path.mkdir(parents=True, exist_ok=True)
def analyze_document(self, document_path, document_type="tender", callback=None):
"""تحليل مستند"""
if self.analysis_in_progress:
logger.warning("هناك عملية تحليل جارية بالفعل")
return False
if not os.path.exists(document_path):
logger.error(f"المستند غير موجود: {document_path}")
return False
self.analysis_in_progress = True
self.current_document = document_path
self.analysis_results = {
"document_path": document_path,
"document_type": document_type,
"analysis_start_time": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"status": "جاري التحليل",
"items": [],
"entities": [],
"dates": [],
"amounts": [],
"risks": []
}
# بدء التحليل في خيط منفصل
thread = threading.Thread(
target=self._analyze_document_thread,
args=(document_path, document_type, callback)
)
thread.daemon = True
thread.start()
return True
def _analyze_document_thread(self, document_path, document_type, callback):
"""خيط تحليل المستند"""
try:
# تحديد نوع المستند
file_extension = os.path.splitext(document_path)[1].lower()
if file_extension == '.pdf':
self.analysis_results = self._analyze_pdf(document_path, document_type)
elif file_extension == '.docx':
self._analyze_docx(document_path, document_type)
elif file_extension == '.xlsx':
self._analyze_xlsx(document_path, document_type)
elif file_extension == '.txt':
self._analyze_txt(document_path, document_type)
else:
logger.error(f"نوع المستند غير مدعوم: {file_extension}")
self.analysis_results["status"] = "فشل التحليل"
self.analysis_results["error"] = "نوع المستند غير مدعوم"
# تحديث حالة التحليل
if self.analysis_results["status"] != "فشل التحليل":
self.analysis_results["status"] = "اكتمل التحليل"
self.analysis_results["analysis_end_time"] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
logger.info(f"اكتمل تحليل المستند: {document_path}")
except Exception as e:
logger.error(f"خطأ في تحليل المستند: {str(e)}")
self.analysis_results["status"] = "فشل التحليل"
self.analysis_results["error"] = str(e)
finally:
self.analysis_in_progress = False
# استدعاء دالة الاستجابة إذا تم توفيرها
if callback and callable(callback):
callback(self.analysis_results)
def _analyze_pdf(self, document_path, document_type):
"""تحليل مستند PDF باستخدام الذكاء الاصطناعي"""
try:
# استخراج النص من PDF
text = self._extract_text_from_pdf(document_path)
# تحليل متقدم للمستند
analysis = {
"document_path": document_path,
"document_type": document_type,
"analysis_start_time": self.analysis_results["analysis_start_time"],
"status": "جاري التحليل",
"file_info": {
"name": os.path.basename(document_path),
"type": "PDF",
"size": os.path.getsize(document_path),
"pages": self._count_pages(document_path),
"create_date": self._get_creation_date(document_path),
"modify_date": time.ctime(os.path.getmtime(document_path))
},
"content_analysis": {
"contract_terms": self._analyze_contract_terms(text),
"financial_analysis": self._analyze_financial_terms(text),
"legal_analysis": self._analyze_legal_terms(text),
"risk_analysis": self._analyze_risks(text),
"conditions_analysis": self._analyze_conditions(text),
"technical_specs": self._analyze_technical_specs(text),
"key_dates": self._extract_key_dates(text),
"important_figures": self._extract_figures(text),
"entities": self._extract_entities(text)
},
"statistical_analysis": {
"word_count": len(text.split()),
"unique_terms": self._analyze_unique_terms(text),
"topic_distribution": self._analyze_topics(text),
"complexity_score": self._calculate_complexity(text)
},
"compliance_check": {
"missing_sections": self._check_missing_sections(text),
"required_terms": self._check_required_terms(text),
"compliance_score": self._calculate_compliance_score(text)
},
"summary": self._generate_summary(text),
"recommendations": self._generate_recommendations(text),
"related_documents": self._find_related_documents(document_path),
"version_info": self._get_version_info(document_path)
}
# إضافة تحليل متخصص حسب نوع المستند
if document_type == "tender":
analysis["tender_analysis"] = self._analyze_tender_specifics(text)
elif document_type == "contract":
analysis["contract_analysis"] = self._analyze_contract_specifics(text)
elif document_type == "technical":
analysis["technical_analysis"] = self._analyze_technical_specifics(text)
return analysis
except Exception as e:
logger.error(f"خطأ في تحليل PDF: {str(e)}")
raise
def extract_document_metadata(self, document_path):
"""استخراج البيانات الوصفية للمستند"""
try:
# تحديد نوع المستند
file_extension = os.path.splitext(document_path)[1].lower()
metadata = {
"filename": os.path.basename(document_path),
"file_type": file_extension.replace('.', '').upper(),
"file_size": os.path.getsize(document_path),
"creation_date": "غير متوفر",
"modification_date": time.ctime(os.path.getmtime(document_path)),
"author": "غير متوفر",
"title": "غير متوفر"
}
# استخراج البيانات الوصفية حسب نوع المستند
if file_extension == '.pdf':
pdf_metadata = self._extract_pdf_metadata(document_path)
metadata.update(pdf_metadata)
elif file_extension == '.docx':
docx_metadata = self._extract_docx_metadata(document_path)
metadata.update(docx_metadata)
elif file_extension == '.xlsx':
xlsx_metadata = self._extract_xlsx_metadata(document_path)
metadata.update(xlsx_metadata)
return metadata
except Exception as e:
logger.error(f"خطأ في استخراج البيانات الوصفية: {str(e)}")
return None
def _extract_pdf_metadata(self, document_path):
"""استخراج البيانات الوصفية من ملف PDF"""
try:
import PyPDF2
metadata = {}
with open(document_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
# استخراج البيانات الوصفية المتاحة
if reader.metadata:
if '/Title' in reader.metadata:
metadata["title"] = reader.metadata['/Title']
if '/Author' in reader.metadata:
metadata["author"] = reader.metadata['/Author']
if '/CreationDate' in reader.metadata:
metadata["creation_date"] = reader.metadata['/CreationDate']
if '/ModDate' in reader.metadata:
metadata["modification_date"] = reader.metadata['/ModDate']
if '/Producer' in reader.metadata:
metadata["producer"] = reader.metadata['/Producer']
if '/Creator' in reader.metadata:
metadata["creator"] = reader.metadata['/Creator']
# إضافة عدد الصفحات
metadata["pages"] = len(reader.pages)
return metadata
except Exception as e:
logger.error(f"خطأ في استخراج البيانات الوصفية من PDF: {str(e)}")
return {}
def compare_documents(self, document_path1, document_path2):
"""مقارنة مستندين"""
try:
# تحليل المستندين
self.analyze_document(document_path1)
analysis1 = self.get_analysis_results()
self.analyze_document(document_path2)
analysis2 = self.get_analysis_results()
# مقارنة نتائج التحليل
comparison = {
"document1": {
"path": document_path1,
"file_info": analysis1.get("file_info", {})
},
"document2": {
"path": document_path2,
"file_info": analysis2.get("file_info", {})
},
"differences": self._find_document_differences(analysis1, analysis2),
"similarity_score": self._calculate_similarity_score(analysis1, analysis2)
}
return comparison
except Exception as e:
logger.error(f"خطأ في مقارنة المستندات: {str(e)}")
return None
def _find_document_differences(self, analysis1, analysis2):
"""العثور على الاختلافات بين تحليلين"""
differences = {}
# مقارنة البنود
if "items" in analysis1 and "items" in analysis2:
items1 = set(item["name"] for item in analysis1["items"] if "name" in item)
items2 = set(item["name"] for item in analysis2["items"] if "name" in item)
differences["items"] = {
"only_in_doc1": list(items1 - items2),
"only_in_doc2": list(items2 - items1),
"common": list(items1.intersection(items2))
}
# مقارنة الكيانات
if "entities" in analysis1 and "entities" in analysis2:
entities1 = set(entity for entity in analysis1["entities"])
entities2 = set(entity for entity in analysis2["entities"])
differences["entities"] = {
"only_in_doc1": list(entities1 - entities2),
"only_in_doc2": list(entities2 - entities1),
"common": list(entities1.intersection(entities2))
}
# مقارنة التواريخ
if "dates" in analysis1 and "dates" in analysis2:
dates1 = set(date for date in analysis1["dates"])
dates2 = set(date for date in analysis2["dates"])
differences["dates"] = {
"only_in_doc1": list(dates1 - dates2),
"only_in_doc2": list(dates2 - dates1),
"common": list(dates1.intersection(dates2))
}
# مقارنة المبالغ
if "amounts" in analysis1 and "amounts" in analysis2:
amounts1 = set(amount["amount"] for amount in analysis1["amounts"] if "amount" in amount)
amounts2 = set(amount["amount"] for amount in analysis2["amounts"] if "amount" in amount)
differences["amounts"] = {
"only_in_doc1": list(amounts1 - amounts2),
"only_in_doc2": list(amounts2 - amounts1),
"common": list(amounts1.intersection(amounts2))
}
return differences
def _calculate_similarity_score(self, analysis1, analysis2):
"""حساب درجة التشابه بين تحليلين"""
# محاكاة بسيطة لحساب درجة التشابه
similarity_score = 0
total_factors = 0
# التشابه في البنود
if "items" in analysis1 and "items" in analysis2:
items1 = set(item["name"] for item in analysis1["items"] if "name" in item)
items2 = set(item["name"] for item in analysis2["items"] if "name" in item)
if items1 or items2: # تجنب القسمة على صفر
similarity_score += len(items1.intersection(items2)) / max(len(items1.union(items2)), 1)
total_factors += 1
# التشابه في الكيانات
if "entities" in analysis1 and "entities" in analysis2:
entities1 = set(entity for entity in analysis1["entities"])
entities2 = set(entity for entity in analysis2["entities"])
if entities1 or entities2:
similarity_score += len(entities1.intersection(entities2)) / max(len(entities1.union(entities2)), 1)
total_factors += 1
# التشابه في التواريخ
if "dates" in analysis1 and "dates" in analysis2:
dates1 = set(date for date in analysis1["dates"])
dates2 = set(date for date in analysis2["dates"])
if dates1 or dates2:
similarity_score += len(dates1.intersection(dates2)) / max(len(dates1.union(dates2)), 1)
total_factors += 1
# التشابه في المبالغ
if "amounts" in analysis1 and "amounts" in analysis2:
amounts1 = set(amount["amount"] for amount in analysis1["amounts"] if "amount" in amount)
amounts2 = set(amount["amount"] for amount in analysis2["amounts"] if "amount" in amount)
if amounts1 or amounts2:
similarity_score += len(amounts1.intersection(amounts2)) / max(len(amounts1.union(amounts2)), 1)
total_factors += 1
# حساب المتوسط
if total_factors > 0:
similarity_percentage = (similarity_score / total_factors) * 100
return round(similarity_percentage, 2)
else:
return 0.0
def generate_report(self, analysis_results=None, report_format="html"):
"""توليد تقرير من نتائج التحليل"""
try:
# استخدام نتائج التحليل الحالية إذا لم يتم توفير نتائج
if analysis_results is None:
analysis_results = self.analysis_results
if not analysis_results:
logger.warning("لا توجد نتائج تحليل لتوليد تقرير")
return None
# توليد التقرير حسب الصيغة المطلوبة
if report_format.lower() == "html":
return self._generate_html_report(analysis_results)
elif report_format.lower() == "pdf":
return self._
def _extract_docx_metadata(self, document_path):
"""استخراج البيانات الوصفية من ملف Word"""
try:
import docx
metadata = {}
doc = docx.Document(document_path)
# استخراج البيانات الوصفية المتاحة
core_properties = doc.core_properties
if core_properties.title:
metadata["title"] = core_properties.title
if core_properties.author:
metadata["author"] = core_properties.author
if core_properties.created:
metadata["creation_date"] = str(core_properties.created)
if core_properties.modified:
metadata["modification_date"] = str(core_properties.modified)
if core_properties.last_modified_by:
metadata["last_modified_by"] = core_properties.last_modified_by
if core_properties.revision:
metadata["revision"] = core_properties.revision
# إضافة عدد الصفحات (تقريبي)
text_length = sum(len(paragraph.text) for paragraph in doc.paragraphs)
estimated_pages = max(1, text_length // 3000)
metadata["pages"] = estimated_pages
return metadata
except Exception as e:
logger.error(f"خطأ في استخراج البيانات الوصفية من Word: {str(e)}")
return {}
def _extract_xlsx_metadata(self, document_path):
"""استخراج البيانات الوصفية من ملف Excel"""
try:
import openpyxl
metadata = {}
workbook = openpyxl.load_workbook(document_path, read_only=True)
# استخراج البيانات الوصفية المتاحة
if workbook.properties:
if workbook.properties.title:
metadata["title"] = workbook.properties.title
if workbook.properties.creator:
metadata["author"] = workbook.properties.creator
if workbook.properties.created:
metadata["creation_date"] = str(workbook.properties.created)
if workbook.properties.modified:
metadata["modification_date"] = str(workbook.properties.modified)
if workbook.properties.lastModifiedBy:
metadata["last_modified_by"] = workbook.properties.lastModifiedBy
if workbook.properties.revision:
metadata["revision"] = workbook.properties.revision
# إضافة عدد الأوراق
metadata["sheets"] = len(workbook.sheetnames)
metadata["sheet_names"] = workbook.sheetnames
return metadata
except Exception as e:
logger.error(f"خطأ في استخراج البيانات الوصفية من Excel: {str(e)}")
return {}
def _extract_text_from_pdf(self, document_path):
"""استخراج النص من ملف PDF"""
try:
import PyPDF2
text = ""
with open(document_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
for page in reader.pages:
text += page.extract_text() + "\n"
return text
except Exception as e:
logger.error(f"خطأ في استخراج النص من PDF: {str(e)}")
raise
def _analyze_contract_terms(self, text):
"""تحليل بنود العقد"""
terms = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['شروط', 'بند', 'يلتزم', 'يجب']):
terms.append(section.strip())
return terms
def _analyze_financial_terms(self, text):
"""تحليل الجزء المالي"""
financial_terms = []
# البحث عن الأقسام المالية
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['مالي', 'تكلفة', 'سعر', 'ميزانية', 'دفع']):
financial_terms.append(section.strip())
# استخراج المبالغ المالية
amounts = self._extract_monetary_amounts(text)
return {
"sections": financial_terms,
"amounts": amounts,
"payment_terms": self._extract_payment_terms(text),
"budget_allocation": self._extract_budget_allocation(text)
}
def _extract_monetary_amounts(self, text):
"""استخراج المبالغ المالية من النص"""
import re
# نمط للبحث عن المبالغ المالية بالريال السعودي والدولار الأمريكي
pattern = r'(\d{1,3}(?:,\d{3})*(?:\.\d+)?)\s*(?:ريال|دولار|SAR|USD|ر\.س|\$)'
matches = re.findall(pattern, text)
return [float(amount.replace(',', '')) for amount in matches]
def _extract_payment_terms(self, text):
"""استخراج شروط الدفع"""
payment_terms = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['دفع', 'سداد', 'أقساط', 'مستحقات']):
payment_terms.append(section.strip())
return payment_terms
def _extract_budget_allocation(self, text):
"""استخراج تخصيص الميزانية"""
# هذه وظيفة بسيطة لاستخراج تخصيص الميزانية
# في التطبيق الحقيقي، قد تحتاج إلى تحليل أكثر تعقيدًا
budget_items = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['ميزانية', 'تخصيص', 'تمويل']):
budget_items.append(section.strip())
return budget_items
def _analyze_legal_terms(self, text):
"""تحليل القانوني للعقد"""
legal_terms = []
# البحث عن الأقسام القانونية
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['قانون', 'تشريع', 'نظام', 'حكم', 'قضاء', 'محكمة']):
legal_terms.append(section.strip())
return {
"sections": legal_terms,
"liability_clauses": self._extract_liability_clauses(text),
"dispute_resolution": self._extract_dispute_resolution(text),
"legal_references": self._extract_legal_references(text)
}
def _extract_liability_clauses(self, text):
"""استخراج بنود المسؤولية"""
liability_clauses = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['مسؤولية', 'التزام', 'ضمان', 'تعويض']):
liability_clauses.append(section.strip())
return liability_clauses
def _extract_dispute_resolution(self, text):
"""استخراج آلية فض النزاعات"""
dispute_clauses = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['نزاع', 'خلاف', 'تحكيم', 'قضاء', 'تسوية']):
dispute_clauses.append(section.strip())
return dispute_clauses
def _extract_legal_references(self, text):
"""استخراج المراجع القانونية"""
import re
# نمط للبحث عن المراجع القانونية مثل أرقام القوانين واللوائح
pattern = r'قانون رقم \d+|لائحة \d+|نظام \d+|مرسوم \d+'
return re.findall(pattern, text)
def _analyze_risks(self, text):
"""تحليل المخاطر"""
risk_factors = []
# البحث عن الأقسام المتعلقة بالمخاطر
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['مخاطر', 'خطر', 'تهديد', 'ضرر', 'إخلال']):
risk_factors.append(section.strip())
# تصنيف المخاطر
risk_categories = {
"financial_risks": self._extract_financial_risks(text),
"operational_risks": self._extract_operational_risks(text),
"legal_risks": self._extract_legal_risks(text),
"technical_risks": self._extract_technical_risks(text)
}
# تقييم شدة المخاطر
risk_severity = self._assess_risk_severity(risk_factors)
return {
"risk_factors": risk_factors,
"risk_categories": risk_categories,
"risk_severity": risk_severity,
"mitigation_suggestions": self._suggest_risk_mitigation(risk_factors)
}
def _extract_financial_risks(self, text):
"""استخراج المخاطر المالية"""
financial_risks = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['مخاطر مالية', 'خسارة', 'تكلفة إضافية', 'غرامة']):
financial_risks.append(section.strip())
return financial_risks
def _extract_operational_risks(self, text):
"""استخراج المخاطر التشغيلية"""
operational_risks = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['مخاطر تشغيلية', 'توقف', 'تعطل', 'تأخير']):
operational_risks.append(section.strip())
return operational_risks
def _extract_legal_risks(self, text):
"""استخراج المخاطر القانونية"""
legal_risks = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['مخاطر قانونية', 'نزاع', 'مخالفة', 'تقاضي']):
legal_risks.append(section.strip())
return legal_risks
def _extract_technical_risks(self, text):
"""استخراج المخاطر الفنية"""
technical_risks = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['مخاطر فنية', 'عطل', 'خلل', 'تقني']):
technical_risks.append(section.strip())
return technical_risks
def _assess_risk_severity(self, risk_factors):
"""تقييم شدة المخاطر"""
# في التطبيق الحقيقي، ستحتاج إلى تحليل أكثر تعقيدًا
# هذه مجرد محاكاة بسيطة
severity_scores = []
for risk in risk_factors:
# تقييم بسيط بناءً على طول النص والكلمات الرئيسية
score = len(risk) / 100 # كلما كان النص أطول، كلما كانت المخاطر أكثر تفصيلاً
# زيادة الدرجة بناءً على كلمات مفتاحية تدل على شدة الخطر
severe_keywords = ['خطير', 'شديد', 'كبير', 'جسيم', 'عالي']
for keyword in severe_keywords:
if keyword in risk.lower():
score += 1
severity_scores.append(min(score, 10)) # تحديد سقف للدرجة
# متوسط درجة الشدة
average_severity = sum(severity_scores) / len(severity_scores) if severity_scores else 0
# تصنيف المخاطر بناءً على متوسط الشدة
if average_severity >= 7:
return "عالية"
elif average_severity >= 4:
return "متوسطة"
else:
return "منخفضة"
def _suggest_risk_mitigation(self, risk_factors):
"""اقتراح آليات تخفيف المخاطر"""
mitigations = []
# في التطبيق الحقيقي، ستحتاج إلى محرك استدلال أكثر تعقيدًا
# هذه مجرد اقتراحات عامة
if any("مالي" in risk for risk in risk_factors):
mitigations.append("ضمانات مالية وتأمين لتغطية المخاطر المالية")
if any("تأخير" in risk for risk in risk_factors):
mitigations.append("وضع جداول زمنية مرنة وخطط بديلة للطوارئ")
if any("قانوني" in risk for risk in risk_factors):
mitigations.append("مراجعة قانونية شاملة للعقد وبنوده")
if any("فني" in risk for risk in risk_factors):
mitigations.append("اختبارات فنية مسبقة وضمانات للأداء الفني")
# إضافة توصيات عامة إذا لم يتم العثور على مخاطر محددة
if not mitigations:
mitigations = [
"وضع خطة إدارة مخاطر شاملة",
"تحديد مسؤوليات الأطراف بوضوح",
"وضع آليات للمتابعة والتقييم الدوري",
"توفير ضمانات مالية وفنية كافية"
]
return mitigations
def _analyze_conditions(self, text):
"""دراسة كراسة الشروط"""
conditions = []
# البحث عن الأقسام المتعلقة بالشروط
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['شروط', 'متطلبات', 'معايير', 'مواصفات']):
conditions.append(section.strip())
# تصنيف الشروط
categorized_conditions = {
"general_conditions": self._extract_general_conditions(text),
"technical_conditions": self._extract_technical_conditions(text),
"administrative_conditions": self._extract_administrative_conditions(text),
"financial_conditions": self._extract_financial_conditions(text)
}
# تقييم مدى اكتمال الشروط ووضوحها
completeness_score = self._assess_conditions_completeness(conditions)
clarity_score = self._assess_conditions_clarity(conditions)
return {
"conditions_list": conditions,
"categorized_conditions": categorized_conditions,
"completeness_score": completeness_score,
"clarity_score": clarity_score,
"improvement_suggestions": self._suggest_conditions_improvements(conditions)
}
def _extract_general_conditions(self, text):
"""استخراج الشروط العامة"""
general_conditions = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['شروط عامة', 'أحكام عامة']):
general_conditions.append(section.strip())
return general_conditions
def _extract_technical_conditions(self, text):
"""استخراج الشروط الفنية"""
technical_conditions = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['شروط فنية', 'مواصفات فنية', 'متطلبات فنية']):
technical_conditions.append(section.strip())
return technical_conditions
def _extract_administrative_conditions(self, text):
"""استخراج الشروط الإدارية"""
admin_conditions = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['شروط إدارية', 'متطلبات إدارية']):
admin_conditions.append(section.strip())
return admin_conditions
def _extract_financial_conditions(self, text):
"""استخراج الشروط المالية"""
financial_conditions = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['شروط مالية', 'متطلبات مالية']):
financial_conditions.append(section.strip())
return financial_conditions
def _assess_conditions_completeness(self, conditions):
"""تقييم اكتمال الشروط"""
# تحقق من وجود جميع أنواع الشروط الرئيسية
required_categories = ['عامة', 'فنية', 'إدارية', 'مالية']
coverage = 0
for category in required_categories:
if any(category in condition.lower() for condition in conditions):
coverage += 1
# حساب نسبة التغطية
completeness_score = (coverage / len(required_categories)) * 10
return min(round(completeness_score, 1), 10) # تحديد سقف للدرجة
def _assess_conditions_clarity(self, conditions):
"""تقييم وضوح الشروط"""
# في التطبيق الحقيقي، ستحتاج إلى تحليل لغوي أكثر تعقيدًا
# هذه مجرد محاكاة بسيطة
clarity_scores = []
for condition in conditions:
# تقييم بسيط بناءً على وضوح النص
score = 10 # نبدأ بدرجة كاملة
# تقليل الدرجة بناءً على كلمات غامضة
ambiguous_terms = ['ربما', 'قد', 'يمكن', 'محتمل', 'حسب الاقتضاء', 'في بعض الحالات']
for term in ambiguous_terms:
if term in condition.lower():
score -= 1
# تقليل الدرجة إذا كان النص طويلًا جدًا
if len(condition) > 500:
score -= 2
clarity_scores.append(max(score, 1)) # الحد الأدنى للدرجة هو 1
# متوسط درجة الوضوح
average_clarity = sum(clarity_scores) / len(clarity_scores) if clarity_scores else 0
return round(average_clarity, 1)
def _suggest_conditions_improvements(self, conditions):
"""اقتراح تحسينات للشروط"""
suggestions = []
# اقتراحات عامة لتحسين الشروط
if not any('عامة' in condition.lower() for condition in conditions):
suggestions.append("إضافة قسم للشروط العامة يوضح نطاق العمل والمسؤوليات العامة")
if not any('فنية' in condition.lower() for condition in conditions):
suggestions.append("إضافة قسم للشروط الفنية يحدد المواصفات والمتطلبات الفنية بدقة")
if not any('إدارية' in condition.lower() for condition in conditions):
suggestions.append("إضافة قسم للشروط الإدارية يوضح الإجراءات والمتطلبات الإدارية")
if not any('مالية' in condition.lower() for condition in conditions):
suggestions.append("إضافة قسم للشروط المالية يحدد الالتزامات المالية وآليات الدفع")
# اقتراحات للشروط الموجودة
ambiguous_conditions = []
for condition in conditions:
if any(term in condition.lower() for term in ['ربما', 'قد', 'يمكن', 'محتمل']):
ambiguous_conditions.append(condition)
if ambiguous_conditions:
suggestions.append("توضيح الشروط الغامضة وتحديد المتطلبات بدقة أكبر")
# إضافة توصيات عامة إذا لم يتم العثور على مشاكل محددة
if not suggestions:
suggestions = [
"تنظيم الشروط في أقسام منفصلة وواضحة",
"استخدام لغة بسيطة ومباشرة في صياغة الشروط",
"تحديد المعايير الكمية والنوعية بدقة",
"تضمين آليات لحل النزاعات في حالة الاختلاف حول تفسير الشروط"
]
return suggestions
def _generate_summary(self, text):
"""توليد ملخص"""
# في التطبيق الحقيقي، ستحتاج إلى استخدام تقنيات معالجة اللغة الطبيعية
# لتلخيص النص بشكل ذكي. هذه مجرد محاكاة بسيطة.
# استخراج الجمل المهمة من النص
important_sentences = []
sentences = text.split('.')
# البحث عن جمل مهمة بناءً على كلمات مفتاحية
key_terms = ['شروط', 'بنود', 'التزامات', 'متطلبات', 'تكلفة', 'مدة', 'ضمان', 'غرامة']
for sentence in sentences:
if any(term in sentence.lower() for term in key_terms):
important_sentences.append(sentence.strip())
# اختيار عدد محدود من الجمل للملخص
max_sentences = min(10, len(important_sentences))
summary_sentences = important_sentences[:max_sentences]
# دمج الجمل في ملخص
summary = '. '.join(summary_sentences)
# إضافة خاتمة موجزة
summary += f"\n\nيتكون المستند من {len(sentences)} جملة وتم تلخيصه في {len(summary_sentences)} جمل رئيسية."
return summary
def _generate_recommendations(self, text):
"""توليد التوصيات"""
# في التطبيق الحقيقي، ستحتاج إلى تحليل أكثر تعقيدًا
# هذه مجرد توصيات عامة بناءً على المحتوى
recommendations = []
# توصيات بناءً على وجود أو غياب أقسام معينة
if 'شروط' not in text.lower():
recommendations.append("إضافة قسم واضح للشروط العامة والخاصة")
if 'مواصفات فنية' not in text.lower():
recommendations.append("توضيح المواصفات الفنية المطلوبة بشكل مفصل")
if 'غرامات' not in text.lower():
recommendations.append("تحديد الغرامات والجزاءات بوضوح في حالة عدم الالتزام")
if 'ضمان' not in text.lower():
recommendations.append("تضمين بنود الضمان والصيانة بشكل واضح")
# توصيات بناءً على تحليل المخاطر
risks = self._analyze_risks(text)
if risks["risk_severity"] == "عالية":
recommendations.append("مراجعة بنود العقد للتقليل من المخاطر العالية المحددة في التحليل")
# توصيات بناءً على تحليل الشروط
conditions = self._analyze_conditions(text)
if conditions["clarity_score"] < 7:
recommendations.append("تحسين صياغة الشروط لزيادة الوضوح وتقليل الغموض")
# توصيات عامة
general_recommendations = [
"مراجعة العقد من قبل مستشار قانوني متخصص",
"التأكد من توافق البنود مع الأنظمة واللوائح الحالية",
"تضمين آليات واضحة لحل النزاعات",
"تحديد مسؤوليات كل طرف بشكل صريح",
"وضع جداول زمنية واضحة للتنفيذ ومؤشرات للأداء"
]
# دمج التوصيات
recommendations.extend(general_recommendations)
return recommendations
def _analyze_tender_specifics(self, text):
"""تحليل خاص بالمناقصات"""
return {
"eligibility_criteria": self._extract_eligibility_criteria(text),
"submission_requirements": self._extract_submission_requirements(text),
"evaluation_criteria": self._extract_evaluation_criteria(text),
"timeline": self._extract_tender_timeline(text)
}
def _extract_eligibility_criteria(self, text):
"""استخراج معايير الأهلية"""
criteria = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['أهلية', 'شروط المشاركة', 'متطلبات التأهيل']):
criteria.append(section.strip())
return criteria
def _extract_submission_requirements(self, text):
"""استخراج متطلبات تقديم العروض"""
requirements = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['تقديم العروض', 'متطلبات العرض', 'مستندات']):
requirements.append(section.strip())
return requirements
def _extract_evaluation_criteria(self, text):
"""استخراج معايير التقييم"""
criteria = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['معايير التقييم', 'آلية التقييم', 'ترسية']):
criteria.append(section.strip())
return criteria
def _extract_tender_timeline(self, text):
"""استخراج الجدول الزمني للمناقصة"""
import re
timeline = {}
# البحث عن تواريخ محددة مثل تاريخ الإعلان، تاريخ الإغلاق، إلخ.
date_pattern = r'(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})'
# تاريخ الإعلان
announcement_match = re.search(r'تاريخ الإعلان\s*[:؛]\s*' + date_pattern, text)
if announcement_match:
timeline["announcement_date"] = announcement_match.group(1)
# تاريخ بدء استلام العروض
start_submission_match = re.search(r'بدء استلام العروض\s*[:؛]\s*' + date_pattern, text)
if start_submission_match:
timeline["submission_start_date"] = start_submission_match.group(1)
# تاريخ إغلاق استلام العروض
end_submission_match = re.search(r'إغلاق استلام العروض\s*[:؛]\s*' + date_pattern, text)
if end_submission_match:
timeline["submission_end_date"] = end_submission_match.group(1)
# تاريخ فتح المظاريف
opening_match = re.search(r'فتح المظاريف\s*[:؛]\s*' + date_pattern, text)
if opening_match:
timeline["opening_date"] = opening_match.group(1)
# تاريخ التقييم
evaluation_match = re.search(r'تاريخ التقييم\s*[:؛]\s*' + date_pattern, text)
if evaluation_match:
timeline["evaluation_date"] = evaluation_match.group(1)
# تاريخ الترسية
award_match = re.search(r'تاريخ الترسية\s*[:؛]\s*' + date_pattern, text)
if award_match:
timeline["award_date"] = award_match.group(1)
return timeline
def _analyze_contract_specifics(self, text):
"""تحليل خاص بالعقود"""
return {
"parties": self._extract_contract_parties(text),
"duration": self._extract_contract_duration(text),
"termination_conditions": self._extract_termination_conditions(text),
"penalties": self._extract_penalties(text),
"warranties": self._extract_warranties(text)
}
def _extract_contract_parties(self, text):
"""استخراج أطراف العقد"""
parties = {}
# البحث عن الطرف الأول
first_party_match = re.search(r'الطرف الأول\s*[:؛]\s*([^\n]+)', text)
if first_party_match:
parties["first_party"] = first_party_match.group(1).strip()
# البحث عن الطرف الثاني
second_party_match = re.search(r'الطرف الثاني\s*[:؛]\s*([^\n]+)', text)
if second_party_match:
parties["second_party"] = second_party_match.group(1).strip()
return parties
def _extract_contract_duration(self, text):
"""استخراج مدة العقد"""
duration = {}
# البحث عن مدة العقد
duration_match = re.search(r'مدة العقد\s*[:؛]\s*([^\n]+)', text)
if duration_match:
duration["text"] = duration_match.group(1).strip()
# البحث عن تاريخ بداية العقد
start_date_match = re.search(r'تاريخ بداية العقد\s*[:؛]\s*(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})', text)
if start_date_match:
duration["start_date"] = start_date_match.group(1)
# البحث عن تاريخ نهاية العقد
end_date_match = re.search(r'تاريخ نهاية العقد\s*[:؛]\s*(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})', text)
if end_date_match:
duration["end_date"] = end_date_match.group(1)
return duration
def _extract_termination_conditions(self, text):
"""استخراج شروط إنهاء العقد"""
conditions = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['إنهاء العقد', 'فسخ العقد', 'إلغاء العقد']):
conditions.append(section.strip())
return conditions
def _extract_penalties(self, text):
"""استخراج الغرامات والجزاءات"""
penalties = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['غرامة', 'جزاء', 'عقوبة', 'تعويض']):
penalties.append(section.strip())
return penalties
def _extract_warranties(self, text):
"""استخراج الضمانات"""
warranties = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['ضمان', 'كفالة', 'تأمين']):
warranties.append(section.strip())
return warranties
def _analyze_technical_specifics(self, text):
"""تحليل خاص بالمستندات الفنية"""
return {
"specifications": self._extract_technical_specifications(text),
"standards": self._extract_technical_standards(text),
"testing_procedures": self._extract_testing_procedures(text),
"quality_requirements": self._extract_quality_requirements(text)
}
def _extract_technical_specifications(self, text):
"""استخراج المواصفات الفنية"""
specifications = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['مواصفات فنية', 'خصائص', 'متطلبات فنية']):
specifications.append(section.strip())
return specifications
def _extract_technical_standards(self, text):
"""استخراج المعايير الفنية"""
standards = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['معايير', 'مقاييس', 'مواصفات قياسية']):
standards.append(section.strip())
return standards
def _extract_testing_procedures(self, text):
"""استخراج إجراءات الاختبار"""
procedures = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['اختبار', 'فحص', 'تجربة']):
procedures.append(section.strip())
return procedures
def _extract_quality_requirements(self, text):
"""استخراج متطلبات الجودة"""
requirements = []
sections = text.split('\n\n')
for section in sections:
if any(keyword in section.lower() for keyword in ['جودة', 'ضمان الجودة', 'رقابة']):
requirements.append(section.strip())
return requirements
def _extract_entities(self, text):
"""استخراج الكيانات من النص"""
entities = {
"organizations": self._extract_organizations(text),
"people": self._extract_people(text),
"locations": self._extract_locations(text)
}
return entities
def _extract_organizations(self, text):
"""استخراج المنظمات والشركات"""
import re
# نمط بسيط للبحث عن المنظمات والشركات
org_pattern = r'شركة [\u0600-\u06FF\s]+|مؤسسة [\u0600-\u06FF\s]+|وزارة [\u0600-\u06FF\s]+|هيئة [\u0600-\u06FF\s]+'
return list(set(re.findall(org_pattern, text)))
def _extract_people(self, text):
"""استخراج أسماء الأشخاص"""
# في التطبيق الحقيقي، ستحتاج إلى استخدام تقنيات التعرف على الكيانات
# هذه مجرد محاكاة بسيطة
return []
def _extract_locations(self, text):
"""استخراج المواقع"""
import re
# نمط بسيط للبحث عن المواقع
location_pattern = r'مدينة [\u0600-\u06FF\s]+|منطقة [\u0600-\u06FF\s]+|محافظة [\u0600-\u06FF\s]+'
return list(set(re.findall(location_pattern, text)))
def _extract_materials(self, text):
"""استخراج المواد"""
materials = []
# في التطبيق الحقيقي، ستحتاج إلى قائمة بالمواد الشائعة للبحث عنها
common_materials = ['حديد', 'خشب', 'زجاج', 'ألمنيوم', 'نحاس', 'بلاستيك', 'خرسانة']
for material in common_materials:
if material in text.lower():
# البحث عن السياق المحيط بالمادة
pattern = r'[^.]*\b' + material + r'\b[^.]*\.'
material_contexts = re.findall(pattern, text)
for context in material_contexts:
materials.append(context.strip())
return materials
def _extract_measurements(self, text):
"""استخراج القياسات"""
import re
# البحث عن القياسات مثل الطول والعرض والوزن وغيرها
measurement_pattern = r'\d+(?:\.\d+)?\s*(?:متر|سم|مم|كجم|طن|لتر|مل)'
return re.findall(measurement_pattern, text)
def _extract_standards(self, text):
"""استخراج المعايير"""
standards = []
# معايير شائعة للبحث عنها
common_standards = ['ISO', 'SASO', 'ASTM', 'BS', 'DIN', 'IEC']
for standard in common_standards:
if standard in text:
# البحث عن المعيار مع رقمه
pattern = r'\b' + standard + r'\s*\d+\b'
standard_matches = re.findall(pattern, text)
standards.extend(standard_matches)
return standards
def _analyze_topics(self, text):
"""تحليل المواضيع الرئيسية"""
# في التطبيق الحقيقي، ستحتاج إلى استخدام تقنيات تحليل المواضيع مثل LDA
# هذه مجرد محاكاة بسيطة
topics = {}
# مواضيع شائعة في المناقصات والعقود
common_topics = {
"financial": ['سعر', 'تكلفة', 'ميزانية', 'دفع', 'مالي'],
"technical": ['فني', 'مواصفات', 'معايير', 'تقني'],
"legal": ['قانوني', 'شرط', 'بند', 'التزام', 'حق'],
"administrative": ['إداري', 'إجراء', 'تنظيم', 'إشراف'],
"time": ['مدة', 'فترة', 'موعد', 'تاريخ', 'جدول زمني']
}
# حساب تكرار كلمات كل موضوع في النص
word_count = len(text.split())
for topic, keywords in common_topics.items():
topic_count = 0
for keyword in keywords:
# عدد مرات ظهور الكلمة المفتاحية في النص
topic_count += len(re.findall(r'\b' + keyword + r'\w*\b', text))
# حساب النسبة المئوية للموضوع
if word_count > 0:
topic_percentage = (topic_count / word_count) * 100
topics[topic] = round(topic_percentage, 2)
else:
topics[topic] = 0
return topics
def _check_required_terms(self, text):
"""التحقق من وجود المصطلحات المطلوبة"""
required_terms = {
"general": ['نطاق العمل', 'مدة التنفيذ', 'الشروط العامة'],
"financial": ['قيمة العقد', 'طريقة الدفع', 'الضمان المالي'],
"legal": ['حل النزاعات', 'الإنهاء', 'التعويضات'],
"technical": ['المواصفات الفنية', 'ضمان الجودة', 'معايير القبول']
}
found_terms = {}
for category, terms in required_terms.items():
found_in_category = []
for term in terms:
if term in text:
found_in_category.append(term)
found_terms[category] = found_in_category
return found_terms
def _calculate_compliance_score(self, text):
"""حساب درجة الامتثال"""
# التحقق من وجود الأقسام المطلوبة
missing_sections = self._check_missing_sections(text)
required_terms = self._check_required_terms(text)
# حساب درجة الامتثال
total_required_terms = sum(len(terms) for terms in required_terms.values())
found_terms = sum(len(found) for found in required_terms.values())
if total_required_terms > 0:
compliance_percentage = (found_terms / total_required_terms) * 100
# تقليل الدرجة بناءً على الأقسام المفقودة
compliance_percentage -= len(missing_sections) * 5
# ضمان أن الدرجة في النطاق المناسب
compliance_percentage = max(0, min(100, compliance_percentage))
return round(compliance_percentage, 1)
else:
return 0
def _get_version_info(self, document_path):
"""الحصول على معلومات الإصدار"""
# في التطبيق الحقيقي، قد تحتاج لاستخراج معلومات الإصدار من الملف
version_info = {
"filename": os.path.basename(document_path),
"last_modified": time.ctime(os.path.getmtime(document_path))
}
# البحث عن رقم الإصدار في اسم الملف
match = re.search(r'[vV](\d+(?:\.\d+)*)', os.path.basename(document_path))
if match:
version_info["version_number"] = match.group(1)
else:
version_info["version_number"] = "غير محدد"
return version_info
def _analyze_docx(self, document_path, document_type):
"""تحليل مستند Word"""
try:
# استخراج النص من ملف Word
text = self._extract_text_from_docx(document_path)
# استخدام نفس آلية تحليل PDF للتحليل
analysis = self._analyze_pdf(document_path, document_type)
# تحديث نوع الملف
analysis["file_info"]["type"] = "DOCX"
return analysis
except Exception as e:
logger.error(f"خطأ في تحليل مستند Word: {str(e)}")
raise
def _extract_text_from_docx(self, document_path):
"""استخراج النص من ملف Word"""
try:
import docx
doc = docx.Document(document_path)
text = ""
for paragraph in doc.paragraphs:
text += paragraph.text + "\n"
for table in doc.tables:
for row in table.rows:
for cell in row.cells:
text += cell.text + " "
text += "\n"
return text
except Exception as e:
logger.error(f"خطأ في استخراج النص من Word: {str(e)}")
raise
def _analyze_xlsx(self, document_path, document_type):
"""تحليل مستند Excel"""
try:
# استخراج البيانات من ملف Excel
data = self._extract_data_from_xlsx(document_path)
# إنشاء تحليل مخصص لملفات Excel
analysis = {
"document_path": document_path,
"document_type": document_type,
"analysis_start_time": self.analysis_results["analysis_start_time"],
"status": "جاري التحليل",
"file_info": {
"name": os.path.basename(document_path),
"type": "XLSX",
"size": os.path.getsize(document_path),
"sheets": self._count_sheets(document_path),
"create_date": "غير متوفر",
"modify_date": time.ctime(os.path.getmtime(document_path))
},
"data_analysis": {
"sheet_summary": data["sheet_summary"],
"total_rows": data["total_rows"],
"total_columns": data["total_columns"],
"numeric_columns": data["numeric_columns"],
"text_columns": data["text_columns"],
"date_columns": data["date_columns"]
}
}
# إضافة تحليلات إضافية حسب نوع المستند
if document_type == "tender":
analysis["tender_analysis"] = self._analyze_excel_tender(data)
elif document_type == "financial":
analysis["financial_analysis"] = self._analyze_excel_financial(data)
return analysis
except Exception as e:
logger.error(f"خطأ في تحليل مستند Excel: {str(e)}")
raise
def _extract_data_from_xlsx(self, document_path):
"""استخراج البيانات من ملف Excel"""
try:
import pandas as pd
# قراءة جميع الأوراق في الملف
excel_file = pd.ExcelFile(document_path)
sheet_names = excel_file.sheet_names
data = {
"sheet_summary": {},
"total_rows": 0,
"total_columns": 0,
"numeric_columns": 0,
"text_columns": 0,
"date_columns": 0,
"sheets": {}
}
for sheet_name in sheet_names:
# قراءة الورقة إلى DataFrame
df = pd.read_excel(excel_file, sheet_name=sheet_name)
# تحليل أنواع البيانات في الأعمدة
column_types = {}
numeric_columns = 0
text_columns = 0
date_columns = 0
for column in df.columns:
if pd.api.types.is_numeric_dtype(df[column]):
column_types[column] = "numeric"
numeric_columns += 1
elif pd.api.types.is_datetime64_dtype(df[column]):
column_types[column] = "date"
date_columns += 1
else:
column_types[column] = "text"
text_columns += 1
# تحديث ملخص الورقة
data["sheet_summary"][sheet_name] = {
"rows": len(df),
"columns": len(df.columns),
"column_types": column_types
}
# تحديث الإحصائيات الإجمالية
data["total_rows"] += len(df)
data["total_columns"] += len(df.columns)
data["numeric_columns"] += numeric_columns
data["text_columns"] += text_columns
data["date_columns"] += date_columns
# تخزين البيانات (مع حد أقصى للصفوف للتحكم في الحجم)
max_rows = 100
data["sheets"][sheet_name] = df.head(max_rows).to_dict(orient="records")
return data
except Exception as e:
logger.error(f"خطأ في استخراج البيانات من Excel: {str(e)}")
raise
def _count_sheets(self, document_path):
"""حساب عدد الأوراق في ملف Excel"""
try:
import pandas as pd
excel_file = pd.ExcelFile(document_path)
return len(excel_file.sheet_names)
except Exception as e:
logger.error(f"خطأ في حساب عدد الأوراق: {str(e)}")
return 0
def _analyze_excel_tender(self, data):
"""تحليل بيانات المناقصة من ملف Excel"""
# تحليل بسيط لملف Excel خاص بالمناقصة
analysis = {
"items": self._extract_tender_items(data),
"quantities": self._extract_tender_quantities(data),
"pricing": self._extract_tender_pricing(data)
}
return analysis
def _extract_tender_items(self, data):
"""استخراج البنود من بيانات المناقصة"""
items = []
# البحث عن الأوراق التي تحتوي على بنود المناقصة
for sheet_name, sheet_data in data["sheets"].items():
if not sheet_data:
continue
# البحث عن الأعمدة التي قد تحتوي على أسماء البنود
possible_item_columns = ["البند", "الوصف", "المادة", "البيان", "item", "description"]
for row in sheet_data:
item_found = False
# البحث عن أسماء البنود
for column in possible_item_columns:
if column in row and row[column]:
# تحقق من وجود أعمدة الكمية والوحدة
quantity = None
unit = None
for qty_col in ["الكمية", "العدد", "quantity", "qty"]:
if qty_col in row and row[qty_col]:
quantity = row[qty_col]
break
for unit_col in ["الوحدة", "unit", "uom"]:
if unit_col in row and row[unit_col]:
unit = row[unit_col]
break
# إضافة البند إلى القائمة
items.append({
"name": row[column],
"quantity": quantity,
"unit": unit
})
item_found = True
break
if item_found:
break
return items
def _extract_tender_quantities(self, data):
"""استخراج الكميات من بيانات المناقصة"""
quantities = {}
# البحث عن الأوراق التي تحتوي على كميات المناقصة
for sheet_name, sheet_data in data["sheets"].items():
if not sheet_data:
continue
# البحث عن الأعمدة التي قد تحتوي على الكميات
quantity_columns = ["الكمية", "العدد", "quantity", "qty"]
item_columns = ["البند", "الوصف", "المادة", "البيان", "item", "description"]
for row in sheet_data:
item_name = None
quantity = None
# البحث عن اسم البند
for col in item_columns:
if col in row and row[col]:
item_name = row[col]
break
# البحث عن الكمية
for col in quantity_columns:
if col in row and row[col]:
quantity = row[col]
break
# تخزين الكمية إذا وجدت
if item_name and quantity:
quantities[item_name] = quantity
return quantities
def _extract_tender_pricing(self, data):
"""استخراج الأسعار من بيانات المناقصة"""
pricing = {}
# البحث عن الأوراق التي تحتوي على أسعار المناقصة
for sheet_name, sheet_data in data["sheets"].items():
if not sheet_data:
continue
# البحث عن الأعمدة التي قد تحتوي على الأسعار
price_columns = ["السعر", "التكلفة", "المبلغ", "price", "cost", "amount"]
item_columns = ["البند", "الوصف", "المادة", "البيان", "item", "description"]
for row in sheet_data:
item_name = None
price = None
# البحث عن اسم البند
for col in item_columns:
if col in row and row[col]:
item_name = row[col]
break
# البحث عن السعر
for col in price_columns:
if col in row and row[col]:
price = row[col]
break
# تخزين السعر إذا وجد
if item_name and price:
pricing[item_name] = price
return pricing
def _analyze_excel_financial(self, data):
"""تحليل البيانات المالية من ملف Excel"""
# تحليل بسيط لملف Excel مالي
analysis = {
"total_amount": self._calculate_total_amount(data),
"budget_breakdown": self._extract_budget_breakdown(data),
"payment_schedule": self._extract_payment_schedule(data)
}
return analysis
def _calculate_total_amount(self, data):
"""حساب المبلغ الإجمالي من البيانات المالية"""
total = 0
# البحث عن الأوراق التي تحتوي على بيانات مالية
for sheet_name, sheet_data in data["sheets"].items():
if not sheet_data:
continue
# البحث عن الأعمدة التي قد تحتوي على مبالغ
amount_columns = ["المبلغ", "الإجمالي", "المجموع", "amount", "total", "sum"]
for row in sheet_data:
for col in amount_columns:
if col in row and row[col] and isinstance(row[col], (int, float)):
total += row[col]
return total
def _extract_budget_breakdown(self, data):
"""استخراج تفاصيل الميزانية من البيانات المالية"""
breakdown = {}
# البحث عن الأوراق التي تحتوي على تفاصيل الميزانية
for sheet_name, sheet_data in data["sheets"].items():
if not sheet_data:
continue
# البحث عن الأعمدة التي قد تحتوي على بنود الميزانية
category_columns = ["البند", "الفئة", "القسم", "category", "item"]
amount_columns = ["المبلغ", "التكلفة", "القيمة", "amount", "cost", "value"]
for row in sheet_data:
category = None
amount = None
# البحث عن فئة الميزانية
for col in category_columns:
if col in row and row[col]:
category = row[col]
break
# البحث عن المبلغ
for col in amount_columns:
if col in row and row[col] and isinstance(row[col], (int, float)):
amount = row[col]
break
# تخزين بند الميزانية إذا وجد
if category and amount:
breakdown[category] = amount
return breakdown
def _extract_payment_schedule(self, data):
"""استخراج جدول الدفعات من البيانات المالية"""
schedule = []
# البحث عن الأوراق التي تحتوي على جدول الدفعات
for sheet_name, sheet_data in data["sheets"].items():
if not sheet_data:
continue
# البحث عن الأعمدة التي قد تحتوي على معلومات الدفعات
date_columns = ["التاريخ", "الموعد", "date", "schedule"]
amount_columns = ["المبلغ", "الدفعة", "القيمة", "amount", "payment", "value"]
description_columns = ["الوصف", "البيان", "description", "details"]
for row in sheet_data:
date = None
amount = None
description = None
# البحث عن تاريخ الدفعة
for col in date_columns:
if col in row and row[col]:
date = row[col]
break
# البحث عن مبلغ الدفعة
for col in amount_columns:
if col in row and row[col]:
amount = row[col]
break
# البحث عن وصف الدفعة
for col in description_columns:
if col in row and row[col]:
description = row[col]
break
# تخزين الدفعة إذا وجدت
if date and amount:
schedule.append({
"date": date,
"amount": amount,
"description": description
})
return schedule
def _analyze_txt(self, document_path, document_type):
"""تحليل مستند نصي"""
try:
# قراءة محتوى الملف النصي
with open(document_path, 'r', encoding='utf-8') as file:
text = file.read()
# استخدام نفس آلية تحليل PDF
analysis = self._analyze_pdf(document_path, document_type)
# تحديث نوع الملف
analysis["file_info"]["type"] = "TXT"
analysis["file_info"]["pages"] = self._estimate_pages(text)
return analysis
except Exception as e:
logger.error(f"خطأ في تحليل مستند نصي: {str(e)}")
raise
def _estimate_pages(self, text):
"""تقدير عدد الصفحات في النص"""
# تقدير بسيط: كل 3000 حرف تعادل صفحة واحدة تقريبًا
return max(1, len(text) // 3000)
def get_analysis_status(self):
"""الحصول على حالة التحليل الحالي"""
if not self.analysis_in_progress:
if not self.analysis_results:
return {"status": "لا يوجد تحليل جارٍ"}
else:
return {"status": self.analysis_results.get("status", "غير معروف")}
return {
"status": "جاري التحليل",
"document_path": self.current_document,
"start_time": self.analysis_results.get("analysis_start_time")
}
def get_analysis_results(self):
"""الحصول على نتائج التحليل"""
return self.analysis_results
def export_analysis_results(self, output_path=None):
"""تصدير نتائج التحليل إلى ملف JSON"""
if not self.analysis_results:
logger.warning("لا توجد نتائج تحليل للتصدير")
return None
if not output_path:
# إنشاء اسم ملف افتراضي
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"analysis_results_{timestamp}.json"
output_path = os.path.join(self.documents_path, filename)
try:
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(self.analysis_results, f, ensure_ascii=False, indent=4)
logger.info(f"تم تصدير نتائج التحليل إلى: {output_path}")
return output_path
except Exception as e:
logger.error(f"خطأ في تصدير نتائج التحليل: {str(e)}")
return None
def import_analysis_results(self, input_path):
"""استيراد نتائج التحليل من ملف JSON"""
if not os.path.exists(input_path):
logger.error(f"ملف نتائج التحليل غير موجود: {input_path}")
return False
try:
with open(input_path, 'r', encoding='utf-8') as f:
self.analysis_results = json.load(f)
logger.info(f"تم استيراد نتائج التحليل من: {input_path}")
return True
except Exception as e:
logger.error(f"خطأ في استيراد نتائج التحليل: {str(e)}")
return False
def _count_pages(self, document_path):
"""حساب عدد صفحات المستند"""
try:
import PyPDF2
with open(document_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
return len(reader.pages)
except:
return 0
def _get_creation_date(self, document_path):
"""استخراج تاريخ إنشاء المستند"""
try:
import PyPDF2
with open(document_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
if '/CreationDate' in reader.metadata:
return reader.metadata['/CreationDate']
return "غير متوفر"
except:
return "غير متوفر"
def _analyze_technical_specs(self, text):
"""تحليل المواصفات الفنية"""
specs = {
"materials": self._extract_materials(text),
"measurements": self._extract_measurements(text),
"standards": self._extract_standards(text)
}
return specs
def _extract_key_dates(self, text):
"""استخراج التواريخ المهمة"""
import re
date_pattern = r'\d{1,2}[-/]\d{1,2}[-/]\d{2,4}'
dates = re.findall(date_pattern, text)
return list(set(dates))
def _extract_figures(self, text):
"""استخراج الأرقام والقيم المهمة"""
import re
# البحث عن القيم النقدية
currency_pattern = r'[\d,]+\.?\d*\s*(?:ريال|دولار|SAR|USD)'
currencies = re.findall(currency_pattern, text)
# البحث عن النسب المئوية
percentage_pattern = r'\d+\.?\d*\s*%'
percentages = re.findall(percentage_pattern, text)
return {
"currencies": currencies,
"percentages": percentages
}
def _analyze_unique_terms(self, text):
"""تحليل المصطلحات الفريدة"""
words = set(text.split())
return list(words)
def _calculate_complexity(self, text):
"""حساب مستوى تعقيد النص"""
words = text.split()
if not words:
return 0
avg_word_length = sum(len(word) for word in words) / len(words)
sentences = text.split('.')
if not sentences:
return 0
avg_sentence_length = len(words) / len(sentences)
# حساب درجة التعقيد (1-10)
complexity = min((avg_word_length * 0.5 + avg_sentence_length * 0.2), 10)
return round(complexity, 2)
def _check_missing_sections(self, text):
"""التحقق من الأقسام المفقودة"""
required_sections = [
"نطاق العمل",
"المواصفات الفنية",
"الشروط العامة",
"الضمانات",
"الغرامات",
"شروط الدفع"
]
missing = []
for section in required_sections:
if section not in text:
missing.append(section)
return missing
def _find_related_documents(self, document_path):
"""البحث عن المستندات المرتبطة"""
directory = os.path.dirname(document_path)
base_name = os.path.basename(document_path)
related = []
for file in os.listdir(directory):
if file != base_name and file.startswith(base_name.split('_')[0]):
related.append(file)
return related
def process_image(self, image_path):
"""معالجة وضغط الصورة"""
try:
# فتح الصورة
with Image.open(image_path) as img:
# تحويل الصورة إلى RGB إذا كانت RGBA
if img.mode == 'RGBA':
img = img.convert('RGB')
# البدء بجودة عالية وتقليلها تدريجياً حتى نصل للحجم المطلوب
quality = 95
max_size = (1200, 1200)
while True:
img.thumbnail(max_size, Image.Resampling.LANCZOS)
buffer = io.BytesIO()
img.save(buffer, format='JPEG', quality=quality, optimize=True)
size = len(buffer.getvalue())
# إذا كان الحجم أقل من 5 ميجابايت، نخرج من الحلقة
if size <= 5000000:
break
# تقليل الجودة والحجم
quality = max(quality - 10, 20) # لا نقلل الجودة عن 20
max_size = (int(max_size[0] * 0.8), int(max_size[1] * 0.8))
# إذا وصلنا للحد الأدنى من الجودة والحجم ولم نصل للحجم المطلوب
if quality == 20 and max_size[0] < 400:
raise ValueError("لا يمكن ضغط الصورة للحجم المطلوب")
# تحويل الصورة المضغوطة إلى base64
return base64.b64encode(buffer.getvalue()).decode('utf-8')
except Exception as e:
logger.error(f"خطأ في معالجة الصورة: {str(e)}")
raise
def convert_pdf_to_images(self, pdf_path):
"""تحويل PDF إلى صور"""
try:
from pdf2image import convert_from_path
images = convert_from_path(pdf_path)
return images
except Exception as e:
logger.error(f"فشل في تحويل ملف PDF إلى صورة: {str(e)}")
raise