|
""" |
|
وحدة تحليل المستندات لنظام إدارة المناقصات - Hybrid Face |
|
""" |
|
|
|
import os |
|
import re |
|
import logging |
|
import threading |
|
from pathlib import Path |
|
import datetime |
|
import json |
|
import base64 |
|
import time |
|
from PIL import Image |
|
import io |
|
|
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
|
) |
|
logger = logging.getLogger('document_analysis') |
|
|
|
class DocumentAnalyzer: |
|
"""فئة تحليل المستندات""" |
|
|
|
def __init__(self, config=None): |
|
"""تهيئة محلل المستندات""" |
|
self.config = config |
|
self.analysis_in_progress = False |
|
self.current_document = None |
|
self.analysis_results = {} |
|
|
|
|
|
if config and hasattr(config, 'DOCUMENTS_PATH'): |
|
self.documents_path = Path(config.DOCUMENTS_PATH) |
|
else: |
|
self.documents_path = Path('data/documents') |
|
|
|
if not self.documents_path.exists(): |
|
self.documents_path.mkdir(parents=True, exist_ok=True) |
|
|
|
    def analyze_document(self, document_path, document_type="tender", callback=None):
        """Start analyzing a document on a background thread.

        Args:
            document_path: path of the file to analyze.
            document_type: "tender", "contract" or "technical"; drives the
                type-specific sections of the result.
            callback: optional callable invoked with the results dict when
                the background analysis finishes.

        Returns:
            True when the background analysis was started, False when a run
            is already in progress or the file does not exist.
        """
        # NOTE(review): this check-then-set of analysis_in_progress is not
        # atomic; two threads calling concurrently could both pass the check.
        # Confirm callers are single-threaded or guard with a lock.
        if self.analysis_in_progress:
            logger.warning("هناك عملية تحليل جارية بالفعل")
            return False

        if not os.path.exists(document_path):
            logger.error(f"المستند غير موجود: {document_path}")
            return False

        self.analysis_in_progress = True
        self.current_document = document_path
        # Seed the shared results dict; the worker thread fills it in.
        self.analysis_results = {
            "document_path": document_path,
            "document_type": document_type,
            "analysis_start_time": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            "status": "جاري التحليل",
            "items": [],
            "entities": [],
            "dates": [],
            "amounts": [],
            "risks": []
        }

        # Daemon thread so a pending analysis never blocks interpreter exit.
        thread = threading.Thread(
            target=self._analyze_document_thread,
            args=(document_path, document_type, callback)
        )
        thread.daemon = True
        thread.start()

        return True
|
|
|
    def _analyze_document_thread(self, document_path, document_type, callback):
        """Worker-thread body: run the extension-specific analyzer, finalize
        status/timestamps, and deliver the results to the callback.
        """
        try:
            # Dispatch on the file extension.
            file_extension = os.path.splitext(document_path)[1].lower()

            if file_extension == '.pdf':
                # NOTE(review): only the PDF branch assigns the returned dict;
                # the docx/xlsx/txt analyzers are presumably expected to mutate
                # self.analysis_results in place — confirm their contracts.
                self.analysis_results = self._analyze_pdf(document_path, document_type)
            elif file_extension == '.docx':
                self._analyze_docx(document_path, document_type)
            elif file_extension == '.xlsx':
                self._analyze_xlsx(document_path, document_type)
            elif file_extension == '.txt':
                self._analyze_txt(document_path, document_type)
            else:
                logger.error(f"نوع المستند غير مدعوم: {file_extension}")
                self.analysis_results["status"] = "فشل التحليل"
                self.analysis_results["error"] = "نوع المستند غير مدعوم"

            # Mark success unless a branch above already flagged failure.
            if self.analysis_results["status"] != "فشل التحليل":
                self.analysis_results["status"] = "اكتمل التحليل"
                self.analysis_results["analysis_end_time"] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

                logger.info(f"اكتمل تحليل المستند: {document_path}")

        except Exception as e:
            logger.error(f"خطأ في تحليل المستند: {str(e)}")
            self.analysis_results["status"] = "فشل التحليل"
            self.analysis_results["error"] = str(e)

        finally:
            # Always release the in-progress flag, success or failure.
            self.analysis_in_progress = False

            # Deliver results to the caller regardless of outcome.
            if callback and callable(callback):
                callback(self.analysis_results)
|
|
|
    def _analyze_pdf(self, document_path, document_type):
        """Run the full analysis pipeline on a PDF and return the result dict.

        Extracts the text, then builds a structured analysis covering file
        info, content/statistical/compliance analyses, a summary and
        recommendations, plus a type-specific section. Raises on failure
        (the worker thread catches and records the error).
        """
        try:
            # All downstream analyses operate on the extracted plain text.
            text = self._extract_text_from_pdf(document_path)

            analysis = {
                "document_path": document_path,
                "document_type": document_type,
                # Reuse the start time recorded by analyze_document().
                "analysis_start_time": self.analysis_results["analysis_start_time"],
                "status": "جاري التحليل",
                "file_info": {
                    "name": os.path.basename(document_path),
                    "type": "PDF",
                    "size": os.path.getsize(document_path),
                    "pages": self._count_pages(document_path),
                    "create_date": self._get_creation_date(document_path),
                    "modify_date": time.ctime(os.path.getmtime(document_path))
                },
                "content_analysis": {
                    "contract_terms": self._analyze_contract_terms(text),
                    "financial_analysis": self._analyze_financial_terms(text),
                    "legal_analysis": self._analyze_legal_terms(text),
                    "risk_analysis": self._analyze_risks(text),
                    "conditions_analysis": self._analyze_conditions(text),
                    "technical_specs": self._analyze_technical_specs(text),
                    "key_dates": self._extract_key_dates(text),
                    "important_figures": self._extract_figures(text),
                    "entities": self._extract_entities(text)
                },
                "statistical_analysis": {
                    "word_count": len(text.split()),
                    "unique_terms": self._analyze_unique_terms(text),
                    "topic_distribution": self._analyze_topics(text),
                    "complexity_score": self._calculate_complexity(text)
                },
                "compliance_check": {
                    "missing_sections": self._check_missing_sections(text),
                    "required_terms": self._check_required_terms(text),
                    "compliance_score": self._calculate_compliance_score(text)
                },
                "summary": self._generate_summary(text),
                "recommendations": self._generate_recommendations(text),
                "related_documents": self._find_related_documents(document_path),
                "version_info": self._get_version_info(document_path)
            }

            # Append the document-type-specific section.
            if document_type == "tender":
                analysis["tender_analysis"] = self._analyze_tender_specifics(text)
            elif document_type == "contract":
                analysis["contract_analysis"] = self._analyze_contract_specifics(text)
            elif document_type == "technical":
                analysis["technical_analysis"] = self._analyze_technical_specifics(text)

            return analysis

        except Exception as e:
            logger.error(f"خطأ في تحليل PDF: {str(e)}")
            raise
|
|
|
def extract_document_metadata(self, document_path): |
|
"""استخراج البيانات الوصفية للمستند""" |
|
try: |
|
|
|
file_extension = os.path.splitext(document_path)[1].lower() |
|
|
|
metadata = { |
|
"filename": os.path.basename(document_path), |
|
"file_type": file_extension.replace('.', '').upper(), |
|
"file_size": os.path.getsize(document_path), |
|
"creation_date": "غير متوفر", |
|
"modification_date": time.ctime(os.path.getmtime(document_path)), |
|
"author": "غير متوفر", |
|
"title": "غير متوفر" |
|
} |
|
|
|
|
|
if file_extension == '.pdf': |
|
pdf_metadata = self._extract_pdf_metadata(document_path) |
|
metadata.update(pdf_metadata) |
|
elif file_extension == '.docx': |
|
docx_metadata = self._extract_docx_metadata(document_path) |
|
metadata.update(docx_metadata) |
|
elif file_extension == '.xlsx': |
|
xlsx_metadata = self._extract_xlsx_metadata(document_path) |
|
metadata.update(xlsx_metadata) |
|
|
|
return metadata |
|
|
|
except Exception as e: |
|
logger.error(f"خطأ في استخراج البيانات الوصفية: {str(e)}") |
|
return None |
|
|
|
def _extract_pdf_metadata(self, document_path): |
|
"""استخراج البيانات الوصفية من ملف PDF""" |
|
try: |
|
import PyPDF2 |
|
|
|
metadata = {} |
|
|
|
with open(document_path, 'rb') as file: |
|
reader = PyPDF2.PdfReader(file) |
|
|
|
|
|
if reader.metadata: |
|
if '/Title' in reader.metadata: |
|
metadata["title"] = reader.metadata['/Title'] |
|
if '/Author' in reader.metadata: |
|
metadata["author"] = reader.metadata['/Author'] |
|
if '/CreationDate' in reader.metadata: |
|
metadata["creation_date"] = reader.metadata['/CreationDate'] |
|
if '/ModDate' in reader.metadata: |
|
metadata["modification_date"] = reader.metadata['/ModDate'] |
|
if '/Producer' in reader.metadata: |
|
metadata["producer"] = reader.metadata['/Producer'] |
|
if '/Creator' in reader.metadata: |
|
metadata["creator"] = reader.metadata['/Creator'] |
|
|
|
|
|
metadata["pages"] = len(reader.pages) |
|
|
|
return metadata |
|
|
|
except Exception as e: |
|
logger.error(f"خطأ في استخراج البيانات الوصفية من PDF: {str(e)}") |
|
return {} |
|
|
|
    def compare_documents(self, document_path1, document_path2):
        """Compare two documents: per-category differences plus a 0-100
        similarity score. Returns None on error.
        """
        try:
            # NOTE(review): analyze_document() runs asynchronously and returns
            # immediately, so reading results right after starting it races
            # with the worker thread — and the second call will be rejected
            # while the first is still in progress (analysis_in_progress).
            # Confirm whether get_analysis_results() blocks until completion.
            self.analyze_document(document_path1)
            analysis1 = self.get_analysis_results()

            self.analyze_document(document_path2)
            analysis2 = self.get_analysis_results()

            comparison = {
                "document1": {
                    "path": document_path1,
                    "file_info": analysis1.get("file_info", {})
                },
                "document2": {
                    "path": document_path2,
                    "file_info": analysis2.get("file_info", {})
                },
                "differences": self._find_document_differences(analysis1, analysis2),
                "similarity_score": self._calculate_similarity_score(analysis1, analysis2)
            }

            return comparison

        except Exception as e:
            logger.error(f"خطأ في مقارنة المستندات: {str(e)}")
            return None
|
|
|
def _find_document_differences(self, analysis1, analysis2): |
|
"""العثور على الاختلافات بين تحليلين""" |
|
differences = {} |
|
|
|
|
|
if "items" in analysis1 and "items" in analysis2: |
|
items1 = set(item["name"] for item in analysis1["items"] if "name" in item) |
|
items2 = set(item["name"] for item in analysis2["items"] if "name" in item) |
|
|
|
differences["items"] = { |
|
"only_in_doc1": list(items1 - items2), |
|
"only_in_doc2": list(items2 - items1), |
|
"common": list(items1.intersection(items2)) |
|
} |
|
|
|
|
|
if "entities" in analysis1 and "entities" in analysis2: |
|
entities1 = set(entity for entity in analysis1["entities"]) |
|
entities2 = set(entity for entity in analysis2["entities"]) |
|
|
|
differences["entities"] = { |
|
"only_in_doc1": list(entities1 - entities2), |
|
"only_in_doc2": list(entities2 - entities1), |
|
"common": list(entities1.intersection(entities2)) |
|
} |
|
|
|
|
|
if "dates" in analysis1 and "dates" in analysis2: |
|
dates1 = set(date for date in analysis1["dates"]) |
|
dates2 = set(date for date in analysis2["dates"]) |
|
|
|
differences["dates"] = { |
|
"only_in_doc1": list(dates1 - dates2), |
|
"only_in_doc2": list(dates2 - dates1), |
|
"common": list(dates1.intersection(dates2)) |
|
} |
|
|
|
|
|
if "amounts" in analysis1 and "amounts" in analysis2: |
|
amounts1 = set(amount["amount"] for amount in analysis1["amounts"] if "amount" in amount) |
|
amounts2 = set(amount["amount"] for amount in analysis2["amounts"] if "amount" in amount) |
|
|
|
differences["amounts"] = { |
|
"only_in_doc1": list(amounts1 - amounts2), |
|
"only_in_doc2": list(amounts2 - amounts1), |
|
"common": list(amounts1.intersection(amounts2)) |
|
} |
|
|
|
return differences |
|
|
|
def _calculate_similarity_score(self, analysis1, analysis2): |
|
"""حساب درجة التشابه بين تحليلين""" |
|
|
|
similarity_score = 0 |
|
total_factors = 0 |
|
|
|
|
|
if "items" in analysis1 and "items" in analysis2: |
|
items1 = set(item["name"] for item in analysis1["items"] if "name" in item) |
|
items2 = set(item["name"] for item in analysis2["items"] if "name" in item) |
|
|
|
if items1 or items2: |
|
similarity_score += len(items1.intersection(items2)) / max(len(items1.union(items2)), 1) |
|
total_factors += 1 |
|
|
|
|
|
if "entities" in analysis1 and "entities" in analysis2: |
|
entities1 = set(entity for entity in analysis1["entities"]) |
|
entities2 = set(entity for entity in analysis2["entities"]) |
|
|
|
if entities1 or entities2: |
|
similarity_score += len(entities1.intersection(entities2)) / max(len(entities1.union(entities2)), 1) |
|
total_factors += 1 |
|
|
|
|
|
if "dates" in analysis1 and "dates" in analysis2: |
|
dates1 = set(date for date in analysis1["dates"]) |
|
dates2 = set(date for date in analysis2["dates"]) |
|
|
|
if dates1 or dates2: |
|
similarity_score += len(dates1.intersection(dates2)) / max(len(dates1.union(dates2)), 1) |
|
total_factors += 1 |
|
|
|
|
|
if "amounts" in analysis1 and "amounts" in analysis2: |
|
amounts1 = set(amount["amount"] for amount in analysis1["amounts"] if "amount" in amount) |
|
amounts2 = set(amount["amount"] for amount in analysis2["amounts"] if "amount" in amount) |
|
|
|
if amounts1 or amounts2: |
|
similarity_score += len(amounts1.intersection(amounts2)) / max(len(amounts1.union(amounts2)), 1) |
|
total_factors += 1 |
|
|
|
|
|
if total_factors > 0: |
|
similarity_percentage = (similarity_score / total_factors) * 100 |
|
return round(similarity_percentage, 2) |
|
else: |
|
return 0.0 |
|
|
|
def generate_report(self, analysis_results=None, report_format="html"): |
|
"""توليد تقرير من نتائج التحليل""" |
|
try: |
|
|
|
if analysis_results is None: |
|
analysis_results = self.analysis_results |
|
|
|
if not analysis_results: |
|
logger.warning("لا توجد نتائج تحليل لتوليد تقرير") |
|
return None |
|
|
|
|
|
if report_format.lower() == "html": |
|
return self._generate_html_report(analysis_results) |
|
elif report_format.lower() == "pdf": |
|
return self._ |
|
|
|
def _extract_docx_metadata(self, document_path): |
|
"""استخراج البيانات الوصفية من ملف Word""" |
|
try: |
|
import docx |
|
|
|
metadata = {} |
|
|
|
doc = docx.Document(document_path) |
|
|
|
|
|
core_properties = doc.core_properties |
|
|
|
if core_properties.title: |
|
metadata["title"] = core_properties.title |
|
if core_properties.author: |
|
metadata["author"] = core_properties.author |
|
if core_properties.created: |
|
metadata["creation_date"] = str(core_properties.created) |
|
if core_properties.modified: |
|
metadata["modification_date"] = str(core_properties.modified) |
|
if core_properties.last_modified_by: |
|
metadata["last_modified_by"] = core_properties.last_modified_by |
|
if core_properties.revision: |
|
metadata["revision"] = core_properties.revision |
|
|
|
|
|
text_length = sum(len(paragraph.text) for paragraph in doc.paragraphs) |
|
estimated_pages = max(1, text_length // 3000) |
|
metadata["pages"] = estimated_pages |
|
|
|
return metadata |
|
|
|
except Exception as e: |
|
logger.error(f"خطأ في استخراج البيانات الوصفية من Word: {str(e)}") |
|
return {} |
|
|
|
def _extract_xlsx_metadata(self, document_path): |
|
"""استخراج البيانات الوصفية من ملف Excel""" |
|
try: |
|
import openpyxl |
|
|
|
metadata = {} |
|
|
|
workbook = openpyxl.load_workbook(document_path, read_only=True) |
|
|
|
|
|
if workbook.properties: |
|
if workbook.properties.title: |
|
metadata["title"] = workbook.properties.title |
|
if workbook.properties.creator: |
|
metadata["author"] = workbook.properties.creator |
|
if workbook.properties.created: |
|
metadata["creation_date"] = str(workbook.properties.created) |
|
if workbook.properties.modified: |
|
metadata["modification_date"] = str(workbook.properties.modified) |
|
if workbook.properties.lastModifiedBy: |
|
metadata["last_modified_by"] = workbook.properties.lastModifiedBy |
|
if workbook.properties.revision: |
|
metadata["revision"] = workbook.properties.revision |
|
|
|
|
|
metadata["sheets"] = len(workbook.sheetnames) |
|
metadata["sheet_names"] = workbook.sheetnames |
|
|
|
return metadata |
|
|
|
except Exception as e: |
|
logger.error(f"خطأ في استخراج البيانات الوصفية من Excel: {str(e)}") |
|
return {} |
|
|
|
def _extract_text_from_pdf(self, document_path): |
|
"""استخراج النص من ملف PDF""" |
|
try: |
|
import PyPDF2 |
|
text = "" |
|
with open(document_path, 'rb') as file: |
|
reader = PyPDF2.PdfReader(file) |
|
for page in reader.pages: |
|
text += page.extract_text() + "\n" |
|
return text |
|
except Exception as e: |
|
logger.error(f"خطأ في استخراج النص من PDF: {str(e)}") |
|
raise |
|
|
|
def _analyze_contract_terms(self, text): |
|
"""تحليل بنود العقد""" |
|
terms = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['شروط', 'بند', 'يلتزم', 'يجب']): |
|
terms.append(section.strip()) |
|
return terms |
|
|
|
def _analyze_financial_terms(self, text): |
|
"""تحليل الجزء المالي""" |
|
financial_terms = [] |
|
|
|
|
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['مالي', 'تكلفة', 'سعر', 'ميزانية', 'دفع']): |
|
financial_terms.append(section.strip()) |
|
|
|
|
|
amounts = self._extract_monetary_amounts(text) |
|
|
|
return { |
|
"sections": financial_terms, |
|
"amounts": amounts, |
|
"payment_terms": self._extract_payment_terms(text), |
|
"budget_allocation": self._extract_budget_allocation(text) |
|
} |
|
|
|
def _extract_monetary_amounts(self, text): |
|
"""استخراج المبالغ المالية من النص""" |
|
import re |
|
|
|
pattern = r'(\d{1,3}(?:,\d{3})*(?:\.\d+)?)\s*(?:ريال|دولار|SAR|USD|ر\.س|\$)' |
|
matches = re.findall(pattern, text) |
|
return [float(amount.replace(',', '')) for amount in matches] |
|
|
|
def _extract_payment_terms(self, text): |
|
"""استخراج شروط الدفع""" |
|
payment_terms = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['دفع', 'سداد', 'أقساط', 'مستحقات']): |
|
payment_terms.append(section.strip()) |
|
return payment_terms |
|
|
|
def _extract_budget_allocation(self, text): |
|
"""استخراج تخصيص الميزانية""" |
|
|
|
|
|
budget_items = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['ميزانية', 'تخصيص', 'تمويل']): |
|
budget_items.append(section.strip()) |
|
return budget_items |
|
|
|
def _analyze_legal_terms(self, text): |
|
"""تحليل القانوني للعقد""" |
|
legal_terms = [] |
|
|
|
|
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['قانون', 'تشريع', 'نظام', 'حكم', 'قضاء', 'محكمة']): |
|
legal_terms.append(section.strip()) |
|
|
|
return { |
|
"sections": legal_terms, |
|
"liability_clauses": self._extract_liability_clauses(text), |
|
"dispute_resolution": self._extract_dispute_resolution(text), |
|
"legal_references": self._extract_legal_references(text) |
|
} |
|
|
|
def _extract_liability_clauses(self, text): |
|
"""استخراج بنود المسؤولية""" |
|
liability_clauses = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['مسؤولية', 'التزام', 'ضمان', 'تعويض']): |
|
liability_clauses.append(section.strip()) |
|
return liability_clauses |
|
|
|
def _extract_dispute_resolution(self, text): |
|
"""استخراج آلية فض النزاعات""" |
|
dispute_clauses = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['نزاع', 'خلاف', 'تحكيم', 'قضاء', 'تسوية']): |
|
dispute_clauses.append(section.strip()) |
|
return dispute_clauses |
|
|
|
def _extract_legal_references(self, text): |
|
"""استخراج المراجع القانونية""" |
|
import re |
|
|
|
pattern = r'قانون رقم \d+|لائحة \d+|نظام \d+|مرسوم \d+' |
|
return re.findall(pattern, text) |
|
|
|
def _analyze_risks(self, text): |
|
"""تحليل المخاطر""" |
|
risk_factors = [] |
|
|
|
|
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['مخاطر', 'خطر', 'تهديد', 'ضرر', 'إخلال']): |
|
risk_factors.append(section.strip()) |
|
|
|
|
|
risk_categories = { |
|
"financial_risks": self._extract_financial_risks(text), |
|
"operational_risks": self._extract_operational_risks(text), |
|
"legal_risks": self._extract_legal_risks(text), |
|
"technical_risks": self._extract_technical_risks(text) |
|
} |
|
|
|
|
|
risk_severity = self._assess_risk_severity(risk_factors) |
|
|
|
return { |
|
"risk_factors": risk_factors, |
|
"risk_categories": risk_categories, |
|
"risk_severity": risk_severity, |
|
"mitigation_suggestions": self._suggest_risk_mitigation(risk_factors) |
|
} |
|
|
|
def _extract_financial_risks(self, text): |
|
"""استخراج المخاطر المالية""" |
|
financial_risks = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['مخاطر مالية', 'خسارة', 'تكلفة إضافية', 'غرامة']): |
|
financial_risks.append(section.strip()) |
|
return financial_risks |
|
|
|
def _extract_operational_risks(self, text): |
|
"""استخراج المخاطر التشغيلية""" |
|
operational_risks = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['مخاطر تشغيلية', 'توقف', 'تعطل', 'تأخير']): |
|
operational_risks.append(section.strip()) |
|
return operational_risks |
|
|
|
def _extract_legal_risks(self, text): |
|
"""استخراج المخاطر القانونية""" |
|
legal_risks = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['مخاطر قانونية', 'نزاع', 'مخالفة', 'تقاضي']): |
|
legal_risks.append(section.strip()) |
|
return legal_risks |
|
|
|
def _extract_technical_risks(self, text): |
|
"""استخراج المخاطر الفنية""" |
|
technical_risks = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['مخاطر فنية', 'عطل', 'خلل', 'تقني']): |
|
technical_risks.append(section.strip()) |
|
return technical_risks |
|
|
|
def _assess_risk_severity(self, risk_factors): |
|
"""تقييم شدة المخاطر""" |
|
|
|
|
|
severity_scores = [] |
|
for risk in risk_factors: |
|
|
|
score = len(risk) / 100 |
|
|
|
|
|
severe_keywords = ['خطير', 'شديد', 'كبير', 'جسيم', 'عالي'] |
|
for keyword in severe_keywords: |
|
if keyword in risk.lower(): |
|
score += 1 |
|
|
|
severity_scores.append(min(score, 10)) |
|
|
|
|
|
average_severity = sum(severity_scores) / len(severity_scores) if severity_scores else 0 |
|
|
|
|
|
if average_severity >= 7: |
|
return "عالية" |
|
elif average_severity >= 4: |
|
return "متوسطة" |
|
else: |
|
return "منخفضة" |
|
|
|
def _suggest_risk_mitigation(self, risk_factors): |
|
"""اقتراح آليات تخفيف المخاطر""" |
|
mitigations = [] |
|
|
|
|
|
|
|
|
|
if any("مالي" in risk for risk in risk_factors): |
|
mitigations.append("ضمانات مالية وتأمين لتغطية المخاطر المالية") |
|
|
|
if any("تأخير" in risk for risk in risk_factors): |
|
mitigations.append("وضع جداول زمنية مرنة وخطط بديلة للطوارئ") |
|
|
|
if any("قانوني" in risk for risk in risk_factors): |
|
mitigations.append("مراجعة قانونية شاملة للعقد وبنوده") |
|
|
|
if any("فني" in risk for risk in risk_factors): |
|
mitigations.append("اختبارات فنية مسبقة وضمانات للأداء الفني") |
|
|
|
|
|
if not mitigations: |
|
mitigations = [ |
|
"وضع خطة إدارة مخاطر شاملة", |
|
"تحديد مسؤوليات الأطراف بوضوح", |
|
"وضع آليات للمتابعة والتقييم الدوري", |
|
"توفير ضمانات مالية وفنية كافية" |
|
] |
|
|
|
return mitigations |
|
|
|
def _analyze_conditions(self, text): |
|
"""دراسة كراسة الشروط""" |
|
conditions = [] |
|
|
|
|
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['شروط', 'متطلبات', 'معايير', 'مواصفات']): |
|
conditions.append(section.strip()) |
|
|
|
|
|
categorized_conditions = { |
|
"general_conditions": self._extract_general_conditions(text), |
|
"technical_conditions": self._extract_technical_conditions(text), |
|
"administrative_conditions": self._extract_administrative_conditions(text), |
|
"financial_conditions": self._extract_financial_conditions(text) |
|
} |
|
|
|
|
|
completeness_score = self._assess_conditions_completeness(conditions) |
|
clarity_score = self._assess_conditions_clarity(conditions) |
|
|
|
return { |
|
"conditions_list": conditions, |
|
"categorized_conditions": categorized_conditions, |
|
"completeness_score": completeness_score, |
|
"clarity_score": clarity_score, |
|
"improvement_suggestions": self._suggest_conditions_improvements(conditions) |
|
} |
|
|
|
def _extract_general_conditions(self, text): |
|
"""استخراج الشروط العامة""" |
|
general_conditions = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['شروط عامة', 'أحكام عامة']): |
|
general_conditions.append(section.strip()) |
|
return general_conditions |
|
|
|
def _extract_technical_conditions(self, text): |
|
"""استخراج الشروط الفنية""" |
|
technical_conditions = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['شروط فنية', 'مواصفات فنية', 'متطلبات فنية']): |
|
technical_conditions.append(section.strip()) |
|
return technical_conditions |
|
|
|
def _extract_administrative_conditions(self, text): |
|
"""استخراج الشروط الإدارية""" |
|
admin_conditions = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['شروط إدارية', 'متطلبات إدارية']): |
|
admin_conditions.append(section.strip()) |
|
return admin_conditions |
|
|
|
def _extract_financial_conditions(self, text): |
|
"""استخراج الشروط المالية""" |
|
financial_conditions = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['شروط مالية', 'متطلبات مالية']): |
|
financial_conditions.append(section.strip()) |
|
return financial_conditions |
|
|
|
def _assess_conditions_completeness(self, conditions): |
|
"""تقييم اكتمال الشروط""" |
|
|
|
required_categories = ['عامة', 'فنية', 'إدارية', 'مالية'] |
|
|
|
coverage = 0 |
|
for category in required_categories: |
|
if any(category in condition.lower() for condition in conditions): |
|
coverage += 1 |
|
|
|
|
|
completeness_score = (coverage / len(required_categories)) * 10 |
|
|
|
return min(round(completeness_score, 1), 10) |
|
|
|
def _assess_conditions_clarity(self, conditions): |
|
"""تقييم وضوح الشروط""" |
|
|
|
|
|
|
|
clarity_scores = [] |
|
for condition in conditions: |
|
|
|
score = 10 |
|
|
|
|
|
ambiguous_terms = ['ربما', 'قد', 'يمكن', 'محتمل', 'حسب الاقتضاء', 'في بعض الحالات'] |
|
for term in ambiguous_terms: |
|
if term in condition.lower(): |
|
score -= 1 |
|
|
|
|
|
if len(condition) > 500: |
|
score -= 2 |
|
|
|
clarity_scores.append(max(score, 1)) |
|
|
|
|
|
average_clarity = sum(clarity_scores) / len(clarity_scores) if clarity_scores else 0 |
|
|
|
return round(average_clarity, 1) |
|
|
|
def _suggest_conditions_improvements(self, conditions): |
|
"""اقتراح تحسينات للشروط""" |
|
suggestions = [] |
|
|
|
|
|
if not any('عامة' in condition.lower() for condition in conditions): |
|
suggestions.append("إضافة قسم للشروط العامة يوضح نطاق العمل والمسؤوليات العامة") |
|
|
|
if not any('فنية' in condition.lower() for condition in conditions): |
|
suggestions.append("إضافة قسم للشروط الفنية يحدد المواصفات والمتطلبات الفنية بدقة") |
|
|
|
if not any('إدارية' in condition.lower() for condition in conditions): |
|
suggestions.append("إضافة قسم للشروط الإدارية يوضح الإجراءات والمتطلبات الإدارية") |
|
|
|
if not any('مالية' in condition.lower() for condition in conditions): |
|
suggestions.append("إضافة قسم للشروط المالية يحدد الالتزامات المالية وآليات الدفع") |
|
|
|
|
|
ambiguous_conditions = [] |
|
for condition in conditions: |
|
if any(term in condition.lower() for term in ['ربما', 'قد', 'يمكن', 'محتمل']): |
|
ambiguous_conditions.append(condition) |
|
|
|
if ambiguous_conditions: |
|
suggestions.append("توضيح الشروط الغامضة وتحديد المتطلبات بدقة أكبر") |
|
|
|
|
|
if not suggestions: |
|
suggestions = [ |
|
"تنظيم الشروط في أقسام منفصلة وواضحة", |
|
"استخدام لغة بسيطة ومباشرة في صياغة الشروط", |
|
"تحديد المعايير الكمية والنوعية بدقة", |
|
"تضمين آليات لحل النزاعات في حالة الاختلاف حول تفسير الشروط" |
|
] |
|
|
|
return suggestions |
|
|
|
def _generate_summary(self, text): |
|
"""توليد ملخص""" |
|
|
|
|
|
|
|
|
|
important_sentences = [] |
|
sentences = text.split('.') |
|
|
|
|
|
key_terms = ['شروط', 'بنود', 'التزامات', 'متطلبات', 'تكلفة', 'مدة', 'ضمان', 'غرامة'] |
|
for sentence in sentences: |
|
if any(term in sentence.lower() for term in key_terms): |
|
important_sentences.append(sentence.strip()) |
|
|
|
|
|
max_sentences = min(10, len(important_sentences)) |
|
summary_sentences = important_sentences[:max_sentences] |
|
|
|
|
|
summary = '. '.join(summary_sentences) |
|
|
|
|
|
summary += f"\n\nيتكون المستند من {len(sentences)} جملة وتم تلخيصه في {len(summary_sentences)} جمل رئيسية." |
|
|
|
return summary |
|
|
|
def _generate_recommendations(self, text): |
|
"""توليد التوصيات""" |
|
|
|
|
|
|
|
recommendations = [] |
|
|
|
|
|
if 'شروط' not in text.lower(): |
|
recommendations.append("إضافة قسم واضح للشروط العامة والخاصة") |
|
|
|
if 'مواصفات فنية' not in text.lower(): |
|
recommendations.append("توضيح المواصفات الفنية المطلوبة بشكل مفصل") |
|
|
|
if 'غرامات' not in text.lower(): |
|
recommendations.append("تحديد الغرامات والجزاءات بوضوح في حالة عدم الالتزام") |
|
|
|
if 'ضمان' not in text.lower(): |
|
recommendations.append("تضمين بنود الضمان والصيانة بشكل واضح") |
|
|
|
|
|
risks = self._analyze_risks(text) |
|
if risks["risk_severity"] == "عالية": |
|
recommendations.append("مراجعة بنود العقد للتقليل من المخاطر العالية المحددة في التحليل") |
|
|
|
|
|
conditions = self._analyze_conditions(text) |
|
if conditions["clarity_score"] < 7: |
|
recommendations.append("تحسين صياغة الشروط لزيادة الوضوح وتقليل الغموض") |
|
|
|
|
|
general_recommendations = [ |
|
"مراجعة العقد من قبل مستشار قانوني متخصص", |
|
"التأكد من توافق البنود مع الأنظمة واللوائح الحالية", |
|
"تضمين آليات واضحة لحل النزاعات", |
|
"تحديد مسؤوليات كل طرف بشكل صريح", |
|
"وضع جداول زمنية واضحة للتنفيذ ومؤشرات للأداء" |
|
] |
|
|
|
|
|
recommendations.extend(general_recommendations) |
|
|
|
return recommendations |
|
|
|
def _analyze_tender_specifics(self, text): |
|
"""تحليل خاص بالمناقصات""" |
|
return { |
|
"eligibility_criteria": self._extract_eligibility_criteria(text), |
|
"submission_requirements": self._extract_submission_requirements(text), |
|
"evaluation_criteria": self._extract_evaluation_criteria(text), |
|
"timeline": self._extract_tender_timeline(text) |
|
} |
|
|
|
def _extract_eligibility_criteria(self, text): |
|
"""استخراج معايير الأهلية""" |
|
criteria = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['أهلية', 'شروط المشاركة', 'متطلبات التأهيل']): |
|
criteria.append(section.strip()) |
|
return criteria |
|
|
|
def _extract_submission_requirements(self, text): |
|
"""استخراج متطلبات تقديم العروض""" |
|
requirements = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['تقديم العروض', 'متطلبات العرض', 'مستندات']): |
|
requirements.append(section.strip()) |
|
return requirements |
|
|
|
def _extract_evaluation_criteria(self, text): |
|
"""استخراج معايير التقييم""" |
|
criteria = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['معايير التقييم', 'آلية التقييم', 'ترسية']): |
|
criteria.append(section.strip()) |
|
return criteria |
|
|
|
def _extract_tender_timeline(self, text): |
|
"""استخراج الجدول الزمني للمناقصة""" |
|
import re |
|
|
|
timeline = {} |
|
|
|
|
|
date_pattern = r'(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})' |
|
|
|
|
|
announcement_match = re.search(r'تاريخ الإعلان\s*[:؛]\s*' + date_pattern, text) |
|
if announcement_match: |
|
timeline["announcement_date"] = announcement_match.group(1) |
|
|
|
|
|
start_submission_match = re.search(r'بدء استلام العروض\s*[:؛]\s*' + date_pattern, text) |
|
if start_submission_match: |
|
timeline["submission_start_date"] = start_submission_match.group(1) |
|
|
|
|
|
end_submission_match = re.search(r'إغلاق استلام العروض\s*[:؛]\s*' + date_pattern, text) |
|
if end_submission_match: |
|
timeline["submission_end_date"] = end_submission_match.group(1) |
|
|
|
|
|
opening_match = re.search(r'فتح المظاريف\s*[:؛]\s*' + date_pattern, text) |
|
if opening_match: |
|
timeline["opening_date"] = opening_match.group(1) |
|
|
|
|
|
evaluation_match = re.search(r'تاريخ التقييم\s*[:؛]\s*' + date_pattern, text) |
|
if evaluation_match: |
|
timeline["evaluation_date"] = evaluation_match.group(1) |
|
|
|
|
|
award_match = re.search(r'تاريخ الترسية\s*[:؛]\s*' + date_pattern, text) |
|
if award_match: |
|
timeline["award_date"] = award_match.group(1) |
|
|
|
return timeline |
|
|
|
def _analyze_contract_specifics(self, text): |
|
"""تحليل خاص بالعقود""" |
|
return { |
|
"parties": self._extract_contract_parties(text), |
|
"duration": self._extract_contract_duration(text), |
|
"termination_conditions": self._extract_termination_conditions(text), |
|
"penalties": self._extract_penalties(text), |
|
"warranties": self._extract_warranties(text) |
|
} |
|
|
|
def _extract_contract_parties(self, text): |
|
"""استخراج أطراف العقد""" |
|
parties = {} |
|
|
|
|
|
first_party_match = re.search(r'الطرف الأول\s*[:؛]\s*([^\n]+)', text) |
|
if first_party_match: |
|
parties["first_party"] = first_party_match.group(1).strip() |
|
|
|
|
|
second_party_match = re.search(r'الطرف الثاني\s*[:؛]\s*([^\n]+)', text) |
|
if second_party_match: |
|
parties["second_party"] = second_party_match.group(1).strip() |
|
|
|
return parties |
|
|
|
def _extract_contract_duration(self, text): |
|
"""استخراج مدة العقد""" |
|
duration = {} |
|
|
|
|
|
duration_match = re.search(r'مدة العقد\s*[:؛]\s*([^\n]+)', text) |
|
if duration_match: |
|
duration["text"] = duration_match.group(1).strip() |
|
|
|
|
|
start_date_match = re.search(r'تاريخ بداية العقد\s*[:؛]\s*(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})', text) |
|
if start_date_match: |
|
duration["start_date"] = start_date_match.group(1) |
|
|
|
|
|
end_date_match = re.search(r'تاريخ نهاية العقد\s*[:؛]\s*(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})', text) |
|
if end_date_match: |
|
duration["end_date"] = end_date_match.group(1) |
|
|
|
return duration |
|
|
|
def _extract_termination_conditions(self, text): |
|
"""استخراج شروط إنهاء العقد""" |
|
conditions = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['إنهاء العقد', 'فسخ العقد', 'إلغاء العقد']): |
|
conditions.append(section.strip()) |
|
return conditions |
|
|
|
def _extract_penalties(self, text): |
|
"""استخراج الغرامات والجزاءات""" |
|
penalties = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['غرامة', 'جزاء', 'عقوبة', 'تعويض']): |
|
penalties.append(section.strip()) |
|
return penalties |
|
|
|
def _extract_warranties(self, text): |
|
"""استخراج الضمانات""" |
|
warranties = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['ضمان', 'كفالة', 'تأمين']): |
|
warranties.append(section.strip()) |
|
return warranties |
|
|
|
def _analyze_technical_specifics(self, text): |
|
"""تحليل خاص بالمستندات الفنية""" |
|
return { |
|
"specifications": self._extract_technical_specifications(text), |
|
"standards": self._extract_technical_standards(text), |
|
"testing_procedures": self._extract_testing_procedures(text), |
|
"quality_requirements": self._extract_quality_requirements(text) |
|
} |
|
|
|
def _extract_technical_specifications(self, text): |
|
"""استخراج المواصفات الفنية""" |
|
specifications = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['مواصفات فنية', 'خصائص', 'متطلبات فنية']): |
|
specifications.append(section.strip()) |
|
return specifications |
|
|
|
def _extract_technical_standards(self, text): |
|
"""استخراج المعايير الفنية""" |
|
standards = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['معايير', 'مقاييس', 'مواصفات قياسية']): |
|
standards.append(section.strip()) |
|
return standards |
|
|
|
def _extract_testing_procedures(self, text): |
|
"""استخراج إجراءات الاختبار""" |
|
procedures = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['اختبار', 'فحص', 'تجربة']): |
|
procedures.append(section.strip()) |
|
return procedures |
|
|
|
def _extract_quality_requirements(self, text): |
|
"""استخراج متطلبات الجودة""" |
|
requirements = [] |
|
sections = text.split('\n\n') |
|
for section in sections: |
|
if any(keyword in section.lower() for keyword in ['جودة', 'ضمان الجودة', 'رقابة']): |
|
requirements.append(section.strip()) |
|
return requirements |
|
|
|
def _extract_entities(self, text): |
|
"""استخراج الكيانات من النص""" |
|
entities = { |
|
"organizations": self._extract_organizations(text), |
|
"people": self._extract_people(text), |
|
"locations": self._extract_locations(text) |
|
} |
|
return entities |
|
|
|
def _extract_organizations(self, text): |
|
"""استخراج المنظمات والشركات""" |
|
import re |
|
|
|
org_pattern = r'شركة [\u0600-\u06FF\s]+|مؤسسة [\u0600-\u06FF\s]+|وزارة [\u0600-\u06FF\s]+|هيئة [\u0600-\u06FF\s]+' |
|
return list(set(re.findall(org_pattern, text))) |
|
|
|
def _extract_people(self, text): |
|
"""استخراج أسماء الأشخاص""" |
|
|
|
|
|
return [] |
|
|
|
def _extract_locations(self, text): |
|
"""استخراج المواقع""" |
|
import re |
|
|
|
location_pattern = r'مدينة [\u0600-\u06FF\s]+|منطقة [\u0600-\u06FF\s]+|محافظة [\u0600-\u06FF\s]+' |
|
return list(set(re.findall(location_pattern, text))) |
|
|
|
def _extract_materials(self, text): |
|
"""استخراج المواد""" |
|
materials = [] |
|
|
|
common_materials = ['حديد', 'خشب', 'زجاج', 'ألمنيوم', 'نحاس', 'بلاستيك', 'خرسانة'] |
|
for material in common_materials: |
|
if material in text.lower(): |
|
|
|
pattern = r'[^.]*\b' + material + r'\b[^.]*\.' |
|
material_contexts = re.findall(pattern, text) |
|
for context in material_contexts: |
|
materials.append(context.strip()) |
|
return materials |
|
|
|
def _extract_measurements(self, text): |
|
"""استخراج القياسات""" |
|
import re |
|
|
|
measurement_pattern = r'\d+(?:\.\d+)?\s*(?:متر|سم|مم|كجم|طن|لتر|مل)' |
|
return re.findall(measurement_pattern, text) |
|
|
|
def _extract_standards(self, text): |
|
"""استخراج المعايير""" |
|
standards = [] |
|
|
|
common_standards = ['ISO', 'SASO', 'ASTM', 'BS', 'DIN', 'IEC'] |
|
for standard in common_standards: |
|
if standard in text: |
|
|
|
pattern = r'\b' + standard + r'\s*\d+\b' |
|
standard_matches = re.findall(pattern, text) |
|
standards.extend(standard_matches) |
|
return standards |
|
|
|
def _analyze_topics(self, text): |
|
"""تحليل المواضيع الرئيسية""" |
|
|
|
|
|
|
|
topics = {} |
|
|
|
|
|
common_topics = { |
|
"financial": ['سعر', 'تكلفة', 'ميزانية', 'دفع', 'مالي'], |
|
"technical": ['فني', 'مواصفات', 'معايير', 'تقني'], |
|
"legal": ['قانوني', 'شرط', 'بند', 'التزام', 'حق'], |
|
"administrative": ['إداري', 'إجراء', 'تنظيم', 'إشراف'], |
|
"time": ['مدة', 'فترة', 'موعد', 'تاريخ', 'جدول زمني'] |
|
} |
|
|
|
|
|
word_count = len(text.split()) |
|
|
|
for topic, keywords in common_topics.items(): |
|
topic_count = 0 |
|
for keyword in keywords: |
|
|
|
topic_count += len(re.findall(r'\b' + keyword + r'\w*\b', text)) |
|
|
|
|
|
if word_count > 0: |
|
topic_percentage = (topic_count / word_count) * 100 |
|
topics[topic] = round(topic_percentage, 2) |
|
else: |
|
topics[topic] = 0 |
|
|
|
return topics |
|
|
|
def _check_required_terms(self, text): |
|
"""التحقق من وجود المصطلحات المطلوبة""" |
|
required_terms = { |
|
"general": ['نطاق العمل', 'مدة التنفيذ', 'الشروط العامة'], |
|
"financial": ['قيمة العقد', 'طريقة الدفع', 'الضمان المالي'], |
|
"legal": ['حل النزاعات', 'الإنهاء', 'التعويضات'], |
|
"technical": ['المواصفات الفنية', 'ضمان الجودة', 'معايير القبول'] |
|
} |
|
|
|
found_terms = {} |
|
|
|
for category, terms in required_terms.items(): |
|
found_in_category = [] |
|
for term in terms: |
|
if term in text: |
|
found_in_category.append(term) |
|
|
|
found_terms[category] = found_in_category |
|
|
|
return found_terms |
|
|
|
def _calculate_compliance_score(self, text): |
|
"""حساب درجة الامتثال""" |
|
|
|
missing_sections = self._check_missing_sections(text) |
|
required_terms = self._check_required_terms(text) |
|
|
|
|
|
total_required_terms = sum(len(terms) for terms in required_terms.values()) |
|
found_terms = sum(len(found) for found in required_terms.values()) |
|
|
|
if total_required_terms > 0: |
|
compliance_percentage = (found_terms / total_required_terms) * 100 |
|
|
|
|
|
compliance_percentage -= len(missing_sections) * 5 |
|
|
|
|
|
compliance_percentage = max(0, min(100, compliance_percentage)) |
|
|
|
return round(compliance_percentage, 1) |
|
else: |
|
return 0 |
|
|
|
def _get_version_info(self, document_path): |
|
"""الحصول على معلومات الإصدار""" |
|
|
|
version_info = { |
|
"filename": os.path.basename(document_path), |
|
"last_modified": time.ctime(os.path.getmtime(document_path)) |
|
} |
|
|
|
|
|
match = re.search(r'[vV](\d+(?:\.\d+)*)', os.path.basename(document_path)) |
|
if match: |
|
version_info["version_number"] = match.group(1) |
|
else: |
|
version_info["version_number"] = "غير محدد" |
|
|
|
return version_info |
|
|
|
def _analyze_docx(self, document_path, document_type):
    """Analyze a Word (.docx) document.

    Extracts the text (which also validates that the file is readable),
    then delegates the analysis to the PDF pipeline and relabels the
    reported file type as DOCX. Re-raises any error after logging.

    NOTE(review): `text` is never consumed below — _analyze_pdf is called
    with only the path, so content appears to be extracted twice; confirm
    this double extraction is intended.
    """
    try:
        # Extraction acts as a readability check; result unused below.
        text = self._extract_text_from_docx(document_path)

        # Reuse the generic (PDF) analysis pipeline on the same path.
        analysis = self._analyze_pdf(document_path, document_type)

        # Correct the reported file type.
        analysis["file_info"]["type"] = "DOCX"

        return analysis

    except Exception as e:
        logger.error(f"خطأ في تحليل مستند Word: {str(e)}")
        raise
|
|
|
def _extract_text_from_docx(self, document_path):
    """Read all paragraph and table-cell text from a .docx file.

    Paragraphs become newline-terminated lines; each table row becomes
    one line whose cell texts are separated (and terminated) by a single
    space. Requires python-docx; re-raises on any failure after logging.
    """
    try:
        import docx

        document = docx.Document(document_path)
        pieces = []

        for paragraph in document.paragraphs:
            pieces.append(paragraph.text + "\n")

        for table in document.tables:
            for row in table.rows:
                for cell in row.cells:
                    pieces.append(cell.text + " ")
                pieces.append("\n")

        return "".join(pieces)

    except Exception as e:
        logger.error(f"خطأ في استخراج النص من Word: {str(e)}")
        raise
|
|
|
def _analyze_xlsx(self, document_path, document_type):
    """Analyze an Excel workbook.

    Reads every sheet via pandas, summarizes sheet/row/column counts and
    column types, then adds a tender- or financial-specific analysis
    depending on `document_type`. Re-raises any error after logging.
    """
    try:
        # Full per-sheet extraction (summary stats + sampled rows).
        data = self._extract_data_from_xlsx(document_path)

        # Base report skeleton; reuses the start time recorded by
        # analyze_document for the overall run.
        analysis = {
            "document_path": document_path,
            "document_type": document_type,
            "analysis_start_time": self.analysis_results["analysis_start_time"],
            "status": "جاري التحليل",
            "file_info": {
                "name": os.path.basename(document_path),
                "type": "XLSX",
                "size": os.path.getsize(document_path),
                "sheets": self._count_sheets(document_path),
                "create_date": "غير متوفر",  # creation date not read for xlsx
                "modify_date": time.ctime(os.path.getmtime(document_path))
            },
            "data_analysis": {
                "sheet_summary": data["sheet_summary"],
                "total_rows": data["total_rows"],
                "total_columns": data["total_columns"],
                "numeric_columns": data["numeric_columns"],
                "text_columns": data["text_columns"],
                "date_columns": data["date_columns"]
            }
        }

        # Document-type-specific enrichment.
        if document_type == "tender":
            analysis["tender_analysis"] = self._analyze_excel_tender(data)
        elif document_type == "financial":
            analysis["financial_analysis"] = self._analyze_excel_financial(data)

        return analysis

    except Exception as e:
        logger.error(f"خطأ في تحليل مستند Excel: {str(e)}")
        raise
|
|
|
def _extract_data_from_xlsx(self, document_path):
    """Extract per-sheet data and summary statistics from an Excel file.

    Returns a dict with:
      - "sheet_summary": per-sheet row/column counts and column types
      - "total_rows"/"total_columns"/"numeric_columns"/"text_columns"/
        "date_columns": aggregated across all sheets
      - "sheets": up to the first 100 rows of each sheet as record dicts

    Re-raises any pandas/IO error after logging.
    """
    try:
        import pandas as pd

        excel_file = pd.ExcelFile(document_path)
        sheet_names = excel_file.sheet_names

        data = {
            "sheet_summary": {},
            "total_rows": 0,
            "total_columns": 0,
            "numeric_columns": 0,
            "text_columns": 0,
            "date_columns": 0,
            "sheets": {}
        }

        for sheet_name in sheet_names:
            df = pd.read_excel(excel_file, sheet_name=sheet_name)

            # Classify each column as numeric, date or text (fallback).
            column_types = {}
            numeric_columns = 0
            text_columns = 0
            date_columns = 0

            for column in df.columns:
                if pd.api.types.is_numeric_dtype(df[column]):
                    column_types[column] = "numeric"
                    numeric_columns += 1
                elif pd.api.types.is_datetime64_dtype(df[column]):
                    column_types[column] = "date"
                    date_columns += 1
                else:
                    column_types[column] = "text"
                    text_columns += 1

            # Per-sheet summary entry.
            data["sheet_summary"][sheet_name] = {
                "rows": len(df),
                "columns": len(df.columns),
                "column_types": column_types
            }

            # Workbook-wide aggregates.
            data["total_rows"] += len(df)
            data["total_columns"] += len(df.columns)
            data["numeric_columns"] += numeric_columns
            data["text_columns"] += text_columns
            data["date_columns"] += date_columns

            # Keep only a bounded sample of rows to cap memory use.
            max_rows = 100
            data["sheets"][sheet_name] = df.head(max_rows).to_dict(orient="records")

        return data

    except Exception as e:
        logger.error(f"خطأ في استخراج البيانات من Excel: {str(e)}")
        raise
|
|
|
def _count_sheets(self, document_path): |
|
"""حساب عدد الأوراق في ملف Excel""" |
|
try: |
|
import pandas as pd |
|
|
|
excel_file = pd.ExcelFile(document_path) |
|
return len(excel_file.sheet_names) |
|
|
|
except Exception as e: |
|
logger.error(f"خطأ في حساب عدد الأوراق: {str(e)}") |
|
return 0 |
|
|
|
def _analyze_excel_tender(self, data): |
|
"""تحليل بيانات المناقصة من ملف Excel""" |
|
|
|
analysis = { |
|
"items": self._extract_tender_items(data), |
|
"quantities": self._extract_tender_quantities(data), |
|
"pricing": self._extract_tender_pricing(data) |
|
} |
|
|
|
return analysis |
|
|
|
def _extract_tender_items(self, data): |
|
"""استخراج البنود من بيانات المناقصة""" |
|
items = [] |
|
|
|
|
|
for sheet_name, sheet_data in data["sheets"].items(): |
|
if not sheet_data: |
|
continue |
|
|
|
|
|
possible_item_columns = ["البند", "الوصف", "المادة", "البيان", "item", "description"] |
|
|
|
for row in sheet_data: |
|
item_found = False |
|
|
|
|
|
for column in possible_item_columns: |
|
if column in row and row[column]: |
|
|
|
quantity = None |
|
unit = None |
|
|
|
for qty_col in ["الكمية", "العدد", "quantity", "qty"]: |
|
if qty_col in row and row[qty_col]: |
|
quantity = row[qty_col] |
|
break |
|
|
|
for unit_col in ["الوحدة", "unit", "uom"]: |
|
if unit_col in row and row[unit_col]: |
|
unit = row[unit_col] |
|
break |
|
|
|
|
|
items.append({ |
|
"name": row[column], |
|
"quantity": quantity, |
|
"unit": unit |
|
}) |
|
|
|
item_found = True |
|
break |
|
|
|
if item_found: |
|
break |
|
|
|
return items |
|
|
|
def _extract_tender_quantities(self, data): |
|
"""استخراج الكميات من بيانات المناقصة""" |
|
quantities = {} |
|
|
|
|
|
for sheet_name, sheet_data in data["sheets"].items(): |
|
if not sheet_data: |
|
continue |
|
|
|
|
|
quantity_columns = ["الكمية", "العدد", "quantity", "qty"] |
|
item_columns = ["البند", "الوصف", "المادة", "البيان", "item", "description"] |
|
|
|
for row in sheet_data: |
|
item_name = None |
|
quantity = None |
|
|
|
|
|
for col in item_columns: |
|
if col in row and row[col]: |
|
item_name = row[col] |
|
break |
|
|
|
|
|
for col in quantity_columns: |
|
if col in row and row[col]: |
|
quantity = row[col] |
|
break |
|
|
|
|
|
if item_name and quantity: |
|
quantities[item_name] = quantity |
|
|
|
return quantities |
|
|
|
def _extract_tender_pricing(self, data): |
|
"""استخراج الأسعار من بيانات المناقصة""" |
|
pricing = {} |
|
|
|
|
|
for sheet_name, sheet_data in data["sheets"].items(): |
|
if not sheet_data: |
|
continue |
|
|
|
|
|
price_columns = ["السعر", "التكلفة", "المبلغ", "price", "cost", "amount"] |
|
item_columns = ["البند", "الوصف", "المادة", "البيان", "item", "description"] |
|
|
|
for row in sheet_data: |
|
item_name = None |
|
price = None |
|
|
|
|
|
for col in item_columns: |
|
if col in row and row[col]: |
|
item_name = row[col] |
|
break |
|
|
|
|
|
for col in price_columns: |
|
if col in row and row[col]: |
|
price = row[col] |
|
break |
|
|
|
|
|
if item_name and price: |
|
pricing[item_name] = price |
|
|
|
return pricing |
|
|
|
def _analyze_excel_financial(self, data): |
|
"""تحليل البيانات المالية من ملف Excel""" |
|
|
|
analysis = { |
|
"total_amount": self._calculate_total_amount(data), |
|
"budget_breakdown": self._extract_budget_breakdown(data), |
|
"payment_schedule": self._extract_payment_schedule(data) |
|
} |
|
|
|
return analysis |
|
|
|
def _calculate_total_amount(self, data): |
|
"""حساب المبلغ الإجمالي من البيانات المالية""" |
|
total = 0 |
|
|
|
|
|
for sheet_name, sheet_data in data["sheets"].items(): |
|
if not sheet_data: |
|
continue |
|
|
|
|
|
amount_columns = ["المبلغ", "الإجمالي", "المجموع", "amount", "total", "sum"] |
|
|
|
for row in sheet_data: |
|
for col in amount_columns: |
|
if col in row and row[col] and isinstance(row[col], (int, float)): |
|
total += row[col] |
|
|
|
return total |
|
|
|
def _extract_budget_breakdown(self, data): |
|
"""استخراج تفاصيل الميزانية من البيانات المالية""" |
|
breakdown = {} |
|
|
|
|
|
for sheet_name, sheet_data in data["sheets"].items(): |
|
if not sheet_data: |
|
continue |
|
|
|
|
|
category_columns = ["البند", "الفئة", "القسم", "category", "item"] |
|
amount_columns = ["المبلغ", "التكلفة", "القيمة", "amount", "cost", "value"] |
|
|
|
for row in sheet_data: |
|
category = None |
|
amount = None |
|
|
|
|
|
for col in category_columns: |
|
if col in row and row[col]: |
|
category = row[col] |
|
break |
|
|
|
|
|
for col in amount_columns: |
|
if col in row and row[col] and isinstance(row[col], (int, float)): |
|
amount = row[col] |
|
break |
|
|
|
|
|
if category and amount: |
|
breakdown[category] = amount |
|
|
|
return breakdown |
|
|
|
def _extract_payment_schedule(self, data): |
|
"""استخراج جدول الدفعات من البيانات المالية""" |
|
schedule = [] |
|
|
|
|
|
for sheet_name, sheet_data in data["sheets"].items(): |
|
if not sheet_data: |
|
continue |
|
|
|
|
|
date_columns = ["التاريخ", "الموعد", "date", "schedule"] |
|
amount_columns = ["المبلغ", "الدفعة", "القيمة", "amount", "payment", "value"] |
|
description_columns = ["الوصف", "البيان", "description", "details"] |
|
|
|
for row in sheet_data: |
|
date = None |
|
amount = None |
|
description = None |
|
|
|
|
|
for col in date_columns: |
|
if col in row and row[col]: |
|
date = row[col] |
|
break |
|
|
|
|
|
for col in amount_columns: |
|
if col in row and row[col]: |
|
amount = row[col] |
|
break |
|
|
|
|
|
for col in description_columns: |
|
if col in row and row[col]: |
|
description = row[col] |
|
break |
|
|
|
|
|
if date and amount: |
|
schedule.append({ |
|
"date": date, |
|
"amount": amount, |
|
"description": description |
|
}) |
|
|
|
return schedule |
|
|
|
def _analyze_txt(self, document_path, document_type):
    """Analyze a plain-text document.

    Reads the file as UTF-8, reuses the PDF analysis pipeline, then fixes
    the reported type and estimates a page count from the text length.
    Re-raises any error after logging.
    """
    try:
        # Read the whole file; raises UnicodeDecodeError on non-UTF-8 input.
        with open(document_path, 'r', encoding='utf-8') as file:
            text = file.read()

        # Reuse the generic (PDF) analysis pipeline on the same path.
        analysis = self._analyze_pdf(document_path, document_type)

        # Correct the type and derive a page estimate from the text size.
        analysis["file_info"]["type"] = "TXT"
        analysis["file_info"]["pages"] = self._estimate_pages(text)

        return analysis

    except Exception as e:
        logger.error(f"خطأ في تحليل مستند نصي: {str(e)}")
        raise
|
|
|
def _estimate_pages(self, text): |
|
"""تقدير عدد الصفحات في النص""" |
|
|
|
return max(1, len(text) // 3000) |
|
|
|
def get_analysis_status(self): |
|
"""الحصول على حالة التحليل الحالي""" |
|
if not self.analysis_in_progress: |
|
if not self.analysis_results: |
|
return {"status": "لا يوجد تحليل جارٍ"} |
|
else: |
|
return {"status": self.analysis_results.get("status", "غير معروف")} |
|
|
|
return { |
|
"status": "جاري التحليل", |
|
"document_path": self.current_document, |
|
"start_time": self.analysis_results.get("analysis_start_time") |
|
} |
|
|
|
def get_analysis_results(self):
    """Return the results of the most recent analysis.

    NOTE: this returns the live internal dict (not a copy); its contents
    may still change while a background analysis thread is running.
    """
    return self.analysis_results
|
|
|
def export_analysis_results(self, output_path=None): |
|
"""تصدير نتائج التحليل إلى ملف JSON""" |
|
if not self.analysis_results: |
|
logger.warning("لا توجد نتائج تحليل للتصدير") |
|
return None |
|
|
|
if not output_path: |
|
|
|
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') |
|
filename = f"analysis_results_{timestamp}.json" |
|
output_path = os.path.join(self.documents_path, filename) |
|
|
|
try: |
|
with open(output_path, 'w', encoding='utf-8') as f: |
|
json.dump(self.analysis_results, f, ensure_ascii=False, indent=4) |
|
|
|
logger.info(f"تم تصدير نتائج التحليل إلى: {output_path}") |
|
return output_path |
|
|
|
except Exception as e: |
|
logger.error(f"خطأ في تصدير نتائج التحليل: {str(e)}") |
|
return None |
|
|
|
def import_analysis_results(self, input_path): |
|
"""استيراد نتائج التحليل من ملف JSON""" |
|
if not os.path.exists(input_path): |
|
logger.error(f"ملف نتائج التحليل غير موجود: {input_path}") |
|
return False |
|
|
|
try: |
|
with open(input_path, 'r', encoding='utf-8') as f: |
|
self.analysis_results = json.load(f) |
|
|
|
logger.info(f"تم استيراد نتائج التحليل من: {input_path}") |
|
return True |
|
|
|
except Exception as e: |
|
logger.error(f"خطأ في استيراد نتائج التحليل: {str(e)}") |
|
return False |
|
|
|
def _count_pages(self, document_path): |
|
"""حساب عدد صفحات المستند""" |
|
try: |
|
import PyPDF2 |
|
with open(document_path, 'rb') as file: |
|
reader = PyPDF2.PdfReader(file) |
|
return len(reader.pages) |
|
except: |
|
return 0 |
|
|
|
def _get_creation_date(self, document_path): |
|
"""استخراج تاريخ إنشاء المستند""" |
|
try: |
|
import PyPDF2 |
|
with open(document_path, 'rb') as file: |
|
reader = PyPDF2.PdfReader(file) |
|
if '/CreationDate' in reader.metadata: |
|
return reader.metadata['/CreationDate'] |
|
return "غير متوفر" |
|
except: |
|
return "غير متوفر" |
|
|
|
def _analyze_technical_specs(self, text): |
|
"""تحليل المواصفات الفنية""" |
|
specs = { |
|
"materials": self._extract_materials(text), |
|
"measurements": self._extract_measurements(text), |
|
"standards": self._extract_standards(text) |
|
} |
|
return specs |
|
|
|
def _extract_key_dates(self, text): |
|
"""استخراج التواريخ المهمة""" |
|
import re |
|
date_pattern = r'\d{1,2}[-/]\d{1,2}[-/]\d{2,4}' |
|
dates = re.findall(date_pattern, text) |
|
return list(set(dates)) |
|
|
|
def _extract_figures(self, text): |
|
"""استخراج الأرقام والقيم المهمة""" |
|
import re |
|
|
|
currency_pattern = r'[\d,]+\.?\d*\s*(?:ريال|دولار|SAR|USD)' |
|
currencies = re.findall(currency_pattern, text) |
|
|
|
|
|
percentage_pattern = r'\d+\.?\d*\s*%' |
|
percentages = re.findall(percentage_pattern, text) |
|
|
|
return { |
|
"currencies": currencies, |
|
"percentages": percentages |
|
} |
|
|
|
def _analyze_unique_terms(self, text): |
|
"""تحليل المصطلحات الفريدة""" |
|
words = set(text.split()) |
|
return list(words) |
|
|
|
def _calculate_complexity(self, text): |
|
"""حساب مستوى تعقيد النص""" |
|
words = text.split() |
|
if not words: |
|
return 0 |
|
|
|
avg_word_length = sum(len(word) for word in words) / len(words) |
|
sentences = text.split('.') |
|
if not sentences: |
|
return 0 |
|
|
|
avg_sentence_length = len(words) / len(sentences) |
|
|
|
|
|
complexity = min((avg_word_length * 0.5 + avg_sentence_length * 0.2), 10) |
|
return round(complexity, 2) |
|
|
|
def _check_missing_sections(self, text): |
|
"""التحقق من الأقسام المفقودة""" |
|
required_sections = [ |
|
"نطاق العمل", |
|
"المواصفات الفنية", |
|
"الشروط العامة", |
|
"الضمانات", |
|
"الغرامات", |
|
"شروط الدفع" |
|
] |
|
|
|
missing = [] |
|
for section in required_sections: |
|
if section not in text: |
|
missing.append(section) |
|
|
|
return missing |
|
|
|
def _find_related_documents(self, document_path): |
|
"""البحث عن المستندات المرتبطة""" |
|
directory = os.path.dirname(document_path) |
|
base_name = os.path.basename(document_path) |
|
related = [] |
|
|
|
for file in os.listdir(directory): |
|
if file != base_name and file.startswith(base_name.split('_')[0]): |
|
related.append(file) |
|
|
|
return related |
|
|
|
def process_image(self, image_path):
    """Compress an image and return it as a base64-encoded JPEG string.

    Repeatedly shrinks the image (thumbnail) and lowers JPEG quality
    until the encoded size is at most 5,000,000 bytes. Raises ValueError
    when the target size cannot be reached even at minimum quality and
    size; re-raises any other error after logging.
    """
    try:
        with Image.open(image_path) as img:
            # JPEG has no alpha channel; flatten RGBA first.
            if img.mode == 'RGBA':
                img = img.convert('RGB')

            # Start at high quality / large bound and degrade stepwise.
            quality = 95
            max_size = (1200, 1200)

            while True:
                # thumbnail() shrinks in place, preserving aspect ratio.
                img.thumbnail(max_size, Image.Resampling.LANCZOS)
                buffer = io.BytesIO()
                img.save(buffer, format='JPEG', quality=quality, optimize=True)
                size = len(buffer.getvalue())

                # Stop once the encoded payload fits the size budget.
                if size <= 5000000:
                    break

                # Degrade: quality floors at 20, bounds shrink by 20%.
                quality = max(quality - 10, 20)
                max_size = (int(max_size[0] * 0.8), int(max_size[1] * 0.8))

                # Give up once both knobs are exhausted.
                if quality == 20 and max_size[0] < 400:
                    raise ValueError("لا يمكن ضغط الصورة للحجم المطلوب")

            return base64.b64encode(buffer.getvalue()).decode('utf-8')
    except Exception as e:
        logger.error(f"خطأ في معالجة الصورة: {str(e)}")
        raise
|
|
|
def convert_pdf_to_images(self, pdf_path):
    """Render each page of a PDF as an image via pdf2image.

    Requires the pdf2image package; logs and re-raises on any failure.
    """
    try:
        from pdf2image import convert_from_path
        return convert_from_path(pdf_path)
    except Exception as e:
        logger.error(f"فشل في تحويل ملف PDF إلى صورة: {str(e)}")
        raise