""" وحدة تحليل المستندات لنظام إدارة المناقصات - Hybrid Face """ import os import re import logging import threading from pathlib import Path import datetime import json import base64 import time from PIL import Image import io # تهيئة السجل logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('document_analysis') class DocumentAnalyzer: """فئة تحليل المستندات""" def __init__(self, config=None): """تهيئة محلل المستندات""" self.config = config self.analysis_in_progress = False self.current_document = None self.analysis_results = {} # إنشاء مجلد المستندات إذا لم يكن موجوداً if config and hasattr(config, 'DOCUMENTS_PATH'): self.documents_path = Path(config.DOCUMENTS_PATH) else: self.documents_path = Path('data/documents') if not self.documents_path.exists(): self.documents_path.mkdir(parents=True, exist_ok=True) def analyze_document(self, document_path, document_type="tender", callback=None): """تحليل مستند""" if self.analysis_in_progress: logger.warning("هناك عملية تحليل جارية بالفعل") return False if not os.path.exists(document_path): logger.error(f"المستند غير موجود: {document_path}") return False self.analysis_in_progress = True self.current_document = document_path self.analysis_results = { "document_path": document_path, "document_type": document_type, "analysis_start_time": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "status": "جاري التحليل", "items": [], "entities": [], "dates": [], "amounts": [], "risks": [] } # بدء التحليل في خيط منفصل thread = threading.Thread( target=self._analyze_document_thread, args=(document_path, document_type, callback) ) thread.daemon = True thread.start() return True def _analyze_document_thread(self, document_path, document_type, callback): """خيط تحليل المستند""" try: # تحديد نوع المستند file_extension = os.path.splitext(document_path)[1].lower() if file_extension == '.pdf': self.analysis_results = self._analyze_pdf(document_path, document_type) elif file_extension == '.docx': self._analyze_docx(document_path, document_type) elif file_extension == '.xlsx': self._analyze_xlsx(document_path, document_type) elif file_extension == '.txt': self._analyze_txt(document_path, document_type) else: logger.error(f"نوع المستند غير مدعوم: {file_extension}") self.analysis_results["status"] = "فشل التحليل" self.analysis_results["error"] = "نوع المستند غير مدعوم" # تحديث حالة التحليل if self.analysis_results["status"] != "فشل التحليل": self.analysis_results["status"] = "اكتمل التحليل" self.analysis_results["analysis_end_time"] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') logger.info(f"اكتمل تحليل المستند: {document_path}") except Exception as e: logger.error(f"خطأ في تحليل المستند: {str(e)}") self.analysis_results["status"] = "فشل التحليل" self.analysis_results["error"] = str(e) finally: self.analysis_in_progress = False # استدعاء دالة الاستجابة إذا تم توفيرها if callback and callable(callback): callback(self.analysis_results) def _analyze_pdf(self, document_path, document_type): """تحليل مستند PDF باستخدام الذكاء الاصطناعي""" try: # استخراج النص من PDF text = self._extract_text_from_pdf(document_path) # تحليل متقدم للمستند analysis = { "document_path": document_path, "document_type": document_type, "analysis_start_time": self.analysis_results["analysis_start_time"], "status": "جاري التحليل", "file_info": { "name": os.path.basename(document_path), "type": "PDF", "size": os.path.getsize(document_path), "pages": self._count_pages(document_path), "create_date": self._get_creation_date(document_path), "modify_date": time.ctime(os.path.getmtime(document_path)) }, "content_analysis": { "contract_terms": self._analyze_contract_terms(text), "financial_analysis": self._analyze_financial_terms(text), "legal_analysis": self._analyze_legal_terms(text), "risk_analysis": self._analyze_risks(text), "conditions_analysis": self._analyze_conditions(text), "technical_specs": self._analyze_technical_specs(text), "key_dates": self._extract_key_dates(text), "important_figures": self._extract_figures(text), "entities": self._extract_entities(text) }, "statistical_analysis": { "word_count": len(text.split()), "unique_terms": self._analyze_unique_terms(text), "topic_distribution": self._analyze_topics(text), "complexity_score": self._calculate_complexity(text) }, "compliance_check": { "missing_sections": self._check_missing_sections(text), "required_terms": self._check_required_terms(text), "compliance_score": self._calculate_compliance_score(text) }, "summary": self._generate_summary(text), "recommendations": self._generate_recommendations(text), "related_documents": self._find_related_documents(document_path), "version_info": self._get_version_info(document_path) } # إضافة تحليل متخصص حسب نوع المستند if document_type == "tender": analysis["tender_analysis"] = self._analyze_tender_specifics(text) elif document_type == "contract": analysis["contract_analysis"] = self._analyze_contract_specifics(text) elif document_type == "technical": analysis["technical_analysis"] = self._analyze_technical_specifics(text) return analysis except Exception as e: logger.error(f"خطأ في تحليل PDF: {str(e)}") raise def extract_document_metadata(self, document_path): """استخراج البيانات الوصفية للمستند""" try: # تحديد نوع المستند file_extension = os.path.splitext(document_path)[1].lower() metadata = { "filename": os.path.basename(document_path), "file_type": file_extension.replace('.', '').upper(), "file_size": os.path.getsize(document_path), "creation_date": "غير متوفر", "modification_date": time.ctime(os.path.getmtime(document_path)), "author": "غير متوفر", "title": "غير متوفر" } # استخراج البيانات الوصفية حسب نوع المستند if file_extension == '.pdf': pdf_metadata = self._extract_pdf_metadata(document_path) metadata.update(pdf_metadata) elif file_extension == '.docx': docx_metadata = self._extract_docx_metadata(document_path) metadata.update(docx_metadata) elif file_extension == '.xlsx': xlsx_metadata = self._extract_xlsx_metadata(document_path) metadata.update(xlsx_metadata) return metadata except Exception as e: logger.error(f"خطأ في استخراج البيانات الوصفية: {str(e)}") return None def _extract_pdf_metadata(self, document_path): """استخراج البيانات الوصفية من ملف PDF""" try: import PyPDF2 metadata = {} with open(document_path, 'rb') as file: reader = PyPDF2.PdfReader(file) # استخراج البيانات الوصفية المتاحة if reader.metadata: if '/Title' in reader.metadata: metadata["title"] = reader.metadata['/Title'] if '/Author' in reader.metadata: metadata["author"] = reader.metadata['/Author'] if '/CreationDate' in reader.metadata: metadata["creation_date"] = reader.metadata['/CreationDate'] if '/ModDate' in reader.metadata: metadata["modification_date"] = reader.metadata['/ModDate'] if '/Producer' in reader.metadata: metadata["producer"] = reader.metadata['/Producer'] if '/Creator' in reader.metadata: metadata["creator"] = reader.metadata['/Creator'] # إضافة عدد الصفحات metadata["pages"] = len(reader.pages) return metadata except Exception as e: logger.error(f"خطأ في استخراج البيانات الوصفية من PDF: {str(e)}") return {} def compare_documents(self, document_path1, document_path2): """مقارنة مستندين""" try: # تحليل المستندين self.analyze_document(document_path1) analysis1 = self.get_analysis_results() self.analyze_document(document_path2) analysis2 = self.get_analysis_results() # مقارنة نتائج التحليل comparison = { "document1": { "path": document_path1, "file_info": analysis1.get("file_info", {}) }, "document2": { "path": document_path2, "file_info": analysis2.get("file_info", {}) }, "differences": self._find_document_differences(analysis1, analysis2), "similarity_score": self._calculate_similarity_score(analysis1, analysis2) } return comparison except Exception as e: logger.error(f"خطأ في مقارنة المستندات: {str(e)}") return None def _find_document_differences(self, analysis1, analysis2): """العثور على الاختلافات بين تحليلين""" differences = {} # مقارنة البنود if "items" in analysis1 and "items" in analysis2: items1 = set(item["name"] for item in analysis1["items"] if "name" in item) items2 = set(item["name"] for item in analysis2["items"] if "name" in item) differences["items"] = { "only_in_doc1": list(items1 - items2), "only_in_doc2": list(items2 - items1), "common": list(items1.intersection(items2)) } # مقارنة الكيانات if "entities" in analysis1 and "entities" in analysis2: entities1 = set(entity for entity in analysis1["entities"]) entities2 = set(entity for entity in analysis2["entities"]) differences["entities"] = { "only_in_doc1": list(entities1 - entities2), "only_in_doc2": list(entities2 - entities1), "common": list(entities1.intersection(entities2)) } # مقارنة التواريخ if "dates" in analysis1 and "dates" in analysis2: dates1 = set(date for date in analysis1["dates"]) dates2 = set(date for date in analysis2["dates"]) differences["dates"] = { "only_in_doc1": list(dates1 - dates2), "only_in_doc2": list(dates2 - dates1), "common": list(dates1.intersection(dates2)) } # مقارنة المبالغ if "amounts" in analysis1 and "amounts" in analysis2: amounts1 = set(amount["amount"] for amount in analysis1["amounts"] if "amount" in amount) amounts2 = set(amount["amount"] for amount in analysis2["amounts"] if "amount" in amount) differences["amounts"] = { "only_in_doc1": list(amounts1 - amounts2), "only_in_doc2": list(amounts2 - amounts1), "common": list(amounts1.intersection(amounts2)) } return differences def _calculate_similarity_score(self, analysis1, analysis2): """حساب درجة التشابه بين تحليلين""" # محاكاة بسيطة لحساب درجة التشابه similarity_score = 0 total_factors = 0 # التشابه في البنود if "items" in analysis1 and "items" in analysis2: items1 = set(item["name"] for item in analysis1["items"] if "name" in item) items2 = set(item["name"] for item in analysis2["items"] if "name" in item) if items1 or items2: # تجنب القسمة على صفر similarity_score += len(items1.intersection(items2)) / max(len(items1.union(items2)), 1) total_factors += 1 # التشابه في الكيانات if "entities" in analysis1 and "entities" in analysis2: entities1 = set(entity for entity in analysis1["entities"]) entities2 = set(entity for entity in analysis2["entities"]) if entities1 or entities2: similarity_score += len(entities1.intersection(entities2)) / max(len(entities1.union(entities2)), 1) total_factors += 1 # التشابه في التواريخ if "dates" in analysis1 and "dates" in analysis2: dates1 = set(date for date in analysis1["dates"]) dates2 = set(date for date in analysis2["dates"]) if dates1 or dates2: similarity_score += len(dates1.intersection(dates2)) / max(len(dates1.union(dates2)), 1) total_factors += 1 # التشابه في المبالغ if "amounts" in analysis1 and "amounts" in analysis2: amounts1 = set(amount["amount"] for amount in analysis1["amounts"] if "amount" in amount) amounts2 = set(amount["amount"] for amount in analysis2["amounts"] if "amount" in amount) if amounts1 or amounts2: similarity_score += len(amounts1.intersection(amounts2)) / max(len(amounts1.union(amounts2)), 1) total_factors += 1 # حساب المتوسط if total_factors > 0: similarity_percentage = (similarity_score / total_factors) * 100 return round(similarity_percentage, 2) else: return 0.0 def generate_report(self, analysis_results=None, report_format="html"): """توليد تقرير من نتائج التحليل""" try: # استخدام نتائج التحليل الحالية إذا لم يتم توفير نتائج if analysis_results is None: analysis_results = self.analysis_results if not analysis_results: logger.warning("لا توجد نتائج تحليل لتوليد تقرير") return None # توليد التقرير حسب الصيغة المطلوبة if report_format.lower() == "html": return self._generate_html_report(analysis_results) elif report_format.lower() == "pdf": return self._ def _extract_docx_metadata(self, document_path): """استخراج البيانات الوصفية من ملف Word""" try: import docx metadata = {} doc = docx.Document(document_path) # استخراج البيانات الوصفية المتاحة core_properties = doc.core_properties if core_properties.title: metadata["title"] = core_properties.title if core_properties.author: metadata["author"] = core_properties.author if core_properties.created: metadata["creation_date"] = str(core_properties.created) if core_properties.modified: metadata["modification_date"] = str(core_properties.modified) if core_properties.last_modified_by: metadata["last_modified_by"] = core_properties.last_modified_by if core_properties.revision: metadata["revision"] = core_properties.revision # إضافة عدد الصفحات (تقريبي) text_length = sum(len(paragraph.text) for paragraph in doc.paragraphs) estimated_pages = max(1, text_length // 3000) metadata["pages"] = estimated_pages return metadata except Exception as e: logger.error(f"خطأ في استخراج البيانات الوصفية من Word: {str(e)}") return {} def _extract_xlsx_metadata(self, document_path): """استخراج البيانات الوصفية من ملف Excel""" try: import openpyxl metadata = {} workbook = openpyxl.load_workbook(document_path, read_only=True) # استخراج البيانات الوصفية المتاحة if workbook.properties: if workbook.properties.title: metadata["title"] = workbook.properties.title if workbook.properties.creator: metadata["author"] = workbook.properties.creator if workbook.properties.created: metadata["creation_date"] = str(workbook.properties.created) if workbook.properties.modified: metadata["modification_date"] = str(workbook.properties.modified) if workbook.properties.lastModifiedBy: metadata["last_modified_by"] = workbook.properties.lastModifiedBy if workbook.properties.revision: metadata["revision"] = workbook.properties.revision # إضافة عدد الأوراق metadata["sheets"] = len(workbook.sheetnames) metadata["sheet_names"] = workbook.sheetnames return metadata except Exception as e: logger.error(f"خطأ في استخراج البيانات الوصفية من Excel: {str(e)}") return {} def _extract_text_from_pdf(self, document_path): """استخراج النص من ملف PDF""" try: import PyPDF2 text = "" with open(document_path, 'rb') as file: reader = PyPDF2.PdfReader(file) for page in reader.pages: text += page.extract_text() + "\n" return text except Exception as e: logger.error(f"خطأ في استخراج النص من PDF: {str(e)}") raise def _analyze_contract_terms(self, text): """تحليل بنود العقد""" terms = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['شروط', 'بند', 'يلتزم', 'يجب']): terms.append(section.strip()) return terms def _analyze_financial_terms(self, text): """تحليل الجزء المالي""" financial_terms = [] # البحث عن الأقسام المالية sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['مالي', 'تكلفة', 'سعر', 'ميزانية', 'دفع']): financial_terms.append(section.strip()) # استخراج المبالغ المالية amounts = self._extract_monetary_amounts(text) return { "sections": financial_terms, "amounts": amounts, "payment_terms": self._extract_payment_terms(text), "budget_allocation": self._extract_budget_allocation(text) } def _extract_monetary_amounts(self, text): """استخراج المبالغ المالية من النص""" import re # نمط للبحث عن المبالغ المالية بالريال السعودي والدولار الأمريكي pattern = r'(\d{1,3}(?:,\d{3})*(?:\.\d+)?)\s*(?:ريال|دولار|SAR|USD|ر\.س|\$)' matches = re.findall(pattern, text) return [float(amount.replace(',', '')) for amount in matches] def _extract_payment_terms(self, text): """استخراج شروط الدفع""" payment_terms = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['دفع', 'سداد', 'أقساط', 'مستحقات']): payment_terms.append(section.strip()) return payment_terms def _extract_budget_allocation(self, text): """استخراج تخصيص الميزانية""" # هذه وظيفة بسيطة لاستخراج تخصيص الميزانية # في التطبيق الحقيقي، قد تحتاج إلى تحليل أكثر تعقيدًا budget_items = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['ميزانية', 'تخصيص', 'تمويل']): budget_items.append(section.strip()) return budget_items def _analyze_legal_terms(self, text): """تحليل القانوني للعقد""" legal_terms = [] # البحث عن الأقسام القانونية sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['قانون', 'تشريع', 'نظام', 'حكم', 'قضاء', 'محكمة']): legal_terms.append(section.strip()) return { "sections": legal_terms, "liability_clauses": self._extract_liability_clauses(text), "dispute_resolution": self._extract_dispute_resolution(text), "legal_references": self._extract_legal_references(text) } def _extract_liability_clauses(self, text): """استخراج بنود المسؤولية""" liability_clauses = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['مسؤولية', 'التزام', 'ضمان', 'تعويض']): liability_clauses.append(section.strip()) return liability_clauses def _extract_dispute_resolution(self, text): """استخراج آلية فض النزاعات""" dispute_clauses = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['نزاع', 'خلاف', 'تحكيم', 'قضاء', 'تسوية']): dispute_clauses.append(section.strip()) return dispute_clauses def _extract_legal_references(self, text): """استخراج المراجع القانونية""" import re # نمط للبحث عن المراجع القانونية مثل أرقام القوانين واللوائح pattern = r'قانون رقم \d+|لائحة \d+|نظام \d+|مرسوم \d+' return re.findall(pattern, text) def _analyze_risks(self, text): """تحليل المخاطر""" risk_factors = [] # البحث عن الأقسام المتعلقة بالمخاطر sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['مخاطر', 'خطر', 'تهديد', 'ضرر', 'إخلال']): risk_factors.append(section.strip()) # تصنيف المخاطر risk_categories = { "financial_risks": self._extract_financial_risks(text), "operational_risks": self._extract_operational_risks(text), "legal_risks": self._extract_legal_risks(text), "technical_risks": self._extract_technical_risks(text) } # تقييم شدة المخاطر risk_severity = self._assess_risk_severity(risk_factors) return { "risk_factors": risk_factors, "risk_categories": risk_categories, "risk_severity": risk_severity, "mitigation_suggestions": self._suggest_risk_mitigation(risk_factors) } def _extract_financial_risks(self, text): """استخراج المخاطر المالية""" financial_risks = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['مخاطر مالية', 'خسارة', 'تكلفة إضافية', 'غرامة']): financial_risks.append(section.strip()) return financial_risks def _extract_operational_risks(self, text): """استخراج المخاطر التشغيلية""" operational_risks = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['مخاطر تشغيلية', 'توقف', 'تعطل', 'تأخير']): operational_risks.append(section.strip()) return operational_risks def _extract_legal_risks(self, text): """استخراج المخاطر القانونية""" legal_risks = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['مخاطر قانونية', 'نزاع', 'مخالفة', 'تقاضي']): legal_risks.append(section.strip()) return legal_risks def _extract_technical_risks(self, text): """استخراج المخاطر الفنية""" technical_risks = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['مخاطر فنية', 'عطل', 'خلل', 'تقني']): technical_risks.append(section.strip()) return technical_risks def _assess_risk_severity(self, risk_factors): """تقييم شدة المخاطر""" # في التطبيق الحقيقي، ستحتاج إلى تحليل أكثر تعقيدًا # هذه مجرد محاكاة بسيطة severity_scores = [] for risk in risk_factors: # تقييم بسيط بناءً على طول النص والكلمات الرئيسية score = len(risk) / 100 # كلما كان النص أطول، كلما كانت المخاطر أكثر تفصيلاً # زيادة الدرجة بناءً على كلمات مفتاحية تدل على شدة الخطر severe_keywords = ['خطير', 'شديد', 'كبير', 'جسيم', 'عالي'] for keyword in severe_keywords: if keyword in risk.lower(): score += 1 severity_scores.append(min(score, 10)) # تحديد سقف للدرجة # متوسط درجة الشدة average_severity = sum(severity_scores) / len(severity_scores) if severity_scores else 0 # تصنيف المخاطر بناءً على متوسط الشدة if average_severity >= 7: return "عالية" elif average_severity >= 4: return "متوسطة" else: return "منخفضة" def _suggest_risk_mitigation(self, risk_factors): """اقتراح آليات تخفيف المخاطر""" mitigations = [] # في التطبيق الحقيقي، ستحتاج إلى محرك استدلال أكثر تعقيدًا # هذه مجرد اقتراحات عامة if any("مالي" in risk for risk in risk_factors): mitigations.append("ضمانات مالية وتأمين لتغطية المخاطر المالية") if any("تأخير" in risk for risk in risk_factors): mitigations.append("وضع جداول زمنية مرنة وخطط بديلة للطوارئ") if any("قانوني" in risk for risk in risk_factors): mitigations.append("مراجعة قانونية شاملة للعقد وبنوده") if any("فني" in risk for risk in risk_factors): mitigations.append("اختبارات فنية مسبقة وضمانات للأداء الفني") # إضافة توصيات عامة إذا لم يتم العثور على مخاطر محددة if not mitigations: mitigations = [ "وضع خطة إدارة مخاطر شاملة", "تحديد مسؤوليات الأطراف بوضوح", "وضع آليات للمتابعة والتقييم الدوري", "توفير ضمانات مالية وفنية كافية" ] return mitigations def _analyze_conditions(self, text): """دراسة كراسة الشروط""" conditions = [] # البحث عن الأقسام المتعلقة بالشروط sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['شروط', 'متطلبات', 'معايير', 'مواصفات']): conditions.append(section.strip()) # تصنيف الشروط categorized_conditions = { "general_conditions": self._extract_general_conditions(text), "technical_conditions": self._extract_technical_conditions(text), "administrative_conditions": self._extract_administrative_conditions(text), "financial_conditions": self._extract_financial_conditions(text) } # تقييم مدى اكتمال الشروط ووضوحها completeness_score = self._assess_conditions_completeness(conditions) clarity_score = self._assess_conditions_clarity(conditions) return { "conditions_list": conditions, "categorized_conditions": categorized_conditions, "completeness_score": completeness_score, "clarity_score": clarity_score, "improvement_suggestions": self._suggest_conditions_improvements(conditions) } def _extract_general_conditions(self, text): """استخراج الشروط العامة""" general_conditions = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['شروط عامة', 'أحكام عامة']): general_conditions.append(section.strip()) return general_conditions def _extract_technical_conditions(self, text): """استخراج الشروط الفنية""" technical_conditions = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['شروط فنية', 'مواصفات فنية', 'متطلبات فنية']): technical_conditions.append(section.strip()) return technical_conditions def _extract_administrative_conditions(self, text): """استخراج الشروط الإدارية""" admin_conditions = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['شروط إدارية', 'متطلبات إدارية']): admin_conditions.append(section.strip()) return admin_conditions def _extract_financial_conditions(self, text): """استخراج الشروط المالية""" financial_conditions = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['شروط مالية', 'متطلبات مالية']): financial_conditions.append(section.strip()) return financial_conditions def _assess_conditions_completeness(self, conditions): """تقييم اكتمال الشروط""" # تحقق من وجود جميع أنواع الشروط الرئيسية required_categories = ['عامة', 'فنية', 'إدارية', 'مالية'] coverage = 0 for category in required_categories: if any(category in condition.lower() for condition in conditions): coverage += 1 # حساب نسبة التغطية completeness_score = (coverage / len(required_categories)) * 10 return min(round(completeness_score, 1), 10) # تحديد سقف للدرجة def _assess_conditions_clarity(self, conditions): """تقييم وضوح الشروط""" # في التطبيق الحقيقي، ستحتاج إلى تحليل لغوي أكثر تعقيدًا # هذه مجرد محاكاة بسيطة clarity_scores = [] for condition in conditions: # تقييم بسيط بناءً على وضوح النص score = 10 # نبدأ بدرجة كاملة # تقليل الدرجة بناءً على كلمات غامضة ambiguous_terms = ['ربما', 'قد', 'يمكن', 'محتمل', 'حسب الاقتضاء', 'في بعض الحالات'] for term in ambiguous_terms: if term in condition.lower(): score -= 1 # تقليل الدرجة إذا كان النص طويلًا جدًا if len(condition) > 500: score -= 2 clarity_scores.append(max(score, 1)) # الحد الأدنى للدرجة هو 1 # متوسط درجة الوضوح average_clarity = sum(clarity_scores) / len(clarity_scores) if clarity_scores else 0 return round(average_clarity, 1) def _suggest_conditions_improvements(self, conditions): """اقتراح تحسينات للشروط""" suggestions = [] # اقتراحات عامة لتحسين الشروط if not any('عامة' in condition.lower() for condition in conditions): suggestions.append("إضافة قسم للشروط العامة يوضح نطاق العمل والمسؤوليات العامة") if not any('فنية' in condition.lower() for condition in conditions): suggestions.append("إضافة قسم للشروط الفنية يحدد المواصفات والمتطلبات الفنية بدقة") if not any('إدارية' in condition.lower() for condition in conditions): suggestions.append("إضافة قسم للشروط الإدارية يوضح الإجراءات والمتطلبات الإدارية") if not any('مالية' in condition.lower() for condition in conditions): suggestions.append("إضافة قسم للشروط المالية يحدد الالتزامات المالية وآليات الدفع") # اقتراحات للشروط الموجودة ambiguous_conditions = [] for condition in conditions: if any(term in condition.lower() for term in ['ربما', 'قد', 'يمكن', 'محتمل']): ambiguous_conditions.append(condition) if ambiguous_conditions: suggestions.append("توضيح الشروط الغامضة وتحديد المتطلبات بدقة أكبر") # إضافة توصيات عامة إذا لم يتم العثور على مشاكل محددة if not suggestions: suggestions = [ "تنظيم الشروط في أقسام منفصلة وواضحة", "استخدام لغة بسيطة ومباشرة في صياغة الشروط", "تحديد المعايير الكمية والنوعية بدقة", "تضمين آليات لحل النزاعات في حالة الاختلاف حول تفسير الشروط" ] return suggestions def _generate_summary(self, text): """توليد ملخص""" # في التطبيق الحقيقي، ستحتاج إلى استخدام تقنيات معالجة اللغة الطبيعية # لتلخيص النص بشكل ذكي. هذه مجرد محاكاة بسيطة. # استخراج الجمل المهمة من النص important_sentences = [] sentences = text.split('.') # البحث عن جمل مهمة بناءً على كلمات مفتاحية key_terms = ['شروط', 'بنود', 'التزامات', 'متطلبات', 'تكلفة', 'مدة', 'ضمان', 'غرامة'] for sentence in sentences: if any(term in sentence.lower() for term in key_terms): important_sentences.append(sentence.strip()) # اختيار عدد محدود من الجمل للملخص max_sentences = min(10, len(important_sentences)) summary_sentences = important_sentences[:max_sentences] # دمج الجمل في ملخص summary = '. '.join(summary_sentences) # إضافة خاتمة موجزة summary += f"\n\nيتكون المستند من {len(sentences)} جملة وتم تلخيصه في {len(summary_sentences)} جمل رئيسية." return summary def _generate_recommendations(self, text): """توليد التوصيات""" # في التطبيق الحقيقي، ستحتاج إلى تحليل أكثر تعقيدًا # هذه مجرد توصيات عامة بناءً على المحتوى recommendations = [] # توصيات بناءً على وجود أو غياب أقسام معينة if 'شروط' not in text.lower(): recommendations.append("إضافة قسم واضح للشروط العامة والخاصة") if 'مواصفات فنية' not in text.lower(): recommendations.append("توضيح المواصفات الفنية المطلوبة بشكل مفصل") if 'غرامات' not in text.lower(): recommendations.append("تحديد الغرامات والجزاءات بوضوح في حالة عدم الالتزام") if 'ضمان' not in text.lower(): recommendations.append("تضمين بنود الضمان والصيانة بشكل واضح") # توصيات بناءً على تحليل المخاطر risks = self._analyze_risks(text) if risks["risk_severity"] == "عالية": recommendations.append("مراجعة بنود العقد للتقليل من المخاطر العالية المحددة في التحليل") # توصيات بناءً على تحليل الشروط conditions = self._analyze_conditions(text) if conditions["clarity_score"] < 7: recommendations.append("تحسين صياغة الشروط لزيادة الوضوح وتقليل الغموض") # توصيات عامة general_recommendations = [ "مراجعة العقد من قبل مستشار قانوني متخصص", "التأكد من توافق البنود مع الأنظمة واللوائح الحالية", "تضمين آليات واضحة لحل النزاعات", "تحديد مسؤوليات كل طرف بشكل صريح", "وضع جداول زمنية واضحة للتنفيذ ومؤشرات للأداء" ] # دمج التوصيات recommendations.extend(general_recommendations) return recommendations def _analyze_tender_specifics(self, text): """تحليل خاص بالمناقصات""" return { "eligibility_criteria": self._extract_eligibility_criteria(text), "submission_requirements": self._extract_submission_requirements(text), "evaluation_criteria": self._extract_evaluation_criteria(text), "timeline": self._extract_tender_timeline(text) } def _extract_eligibility_criteria(self, text): """استخراج معايير الأهلية""" criteria = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['أهلية', 'شروط المشاركة', 'متطلبات التأهيل']): criteria.append(section.strip()) return criteria def _extract_submission_requirements(self, text): """استخراج متطلبات تقديم العروض""" requirements = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['تقديم العروض', 'متطلبات العرض', 'مستندات']): requirements.append(section.strip()) return requirements def _extract_evaluation_criteria(self, text): """استخراج معايير التقييم""" criteria = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['معايير التقييم', 'آلية التقييم', 'ترسية']): criteria.append(section.strip()) return criteria def _extract_tender_timeline(self, text): """استخراج الجدول الزمني للمناقصة""" import re timeline = {} # البحث عن تواريخ محددة مثل تاريخ الإعلان، تاريخ الإغلاق، إلخ. date_pattern = r'(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})' # تاريخ الإعلان announcement_match = re.search(r'تاريخ الإعلان\s*[:؛]\s*' + date_pattern, text) if announcement_match: timeline["announcement_date"] = announcement_match.group(1) # تاريخ بدء استلام العروض start_submission_match = re.search(r'بدء استلام العروض\s*[:؛]\s*' + date_pattern, text) if start_submission_match: timeline["submission_start_date"] = start_submission_match.group(1) # تاريخ إغلاق استلام العروض end_submission_match = re.search(r'إغلاق استلام العروض\s*[:؛]\s*' + date_pattern, text) if end_submission_match: timeline["submission_end_date"] = end_submission_match.group(1) # تاريخ فتح المظاريف opening_match = re.search(r'فتح المظاريف\s*[:؛]\s*' + date_pattern, text) if opening_match: timeline["opening_date"] = opening_match.group(1) # تاريخ التقييم evaluation_match = re.search(r'تاريخ التقييم\s*[:؛]\s*' + date_pattern, text) if evaluation_match: timeline["evaluation_date"] = evaluation_match.group(1) # تاريخ الترسية award_match = re.search(r'تاريخ الترسية\s*[:؛]\s*' + date_pattern, text) if award_match: timeline["award_date"] = award_match.group(1) return timeline def _analyze_contract_specifics(self, text): """تحليل خاص بالعقود""" return { "parties": self._extract_contract_parties(text), "duration": self._extract_contract_duration(text), "termination_conditions": self._extract_termination_conditions(text), "penalties": self._extract_penalties(text), "warranties": self._extract_warranties(text) } def _extract_contract_parties(self, text): """استخراج أطراف العقد""" parties = {} # البحث عن الطرف الأول first_party_match = re.search(r'الطرف الأول\s*[:؛]\s*([^\n]+)', text) if first_party_match: parties["first_party"] = first_party_match.group(1).strip() # البحث عن الطرف الثاني second_party_match = re.search(r'الطرف الثاني\s*[:؛]\s*([^\n]+)', text) if second_party_match: parties["second_party"] = second_party_match.group(1).strip() return parties def _extract_contract_duration(self, text): """استخراج مدة العقد""" duration = {} # البحث عن مدة العقد duration_match = re.search(r'مدة العقد\s*[:؛]\s*([^\n]+)', text) if duration_match: duration["text"] = duration_match.group(1).strip() # البحث عن تاريخ بداية العقد start_date_match = re.search(r'تاريخ بداية العقد\s*[:؛]\s*(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})', text) if start_date_match: duration["start_date"] = start_date_match.group(1) # البحث عن تاريخ نهاية العقد end_date_match = re.search(r'تاريخ نهاية العقد\s*[:؛]\s*(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})', text) if end_date_match: duration["end_date"] = end_date_match.group(1) return duration def _extract_termination_conditions(self, text): """استخراج شروط إنهاء العقد""" conditions = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['إنهاء العقد', 'فسخ العقد', 'إلغاء العقد']): conditions.append(section.strip()) return conditions def _extract_penalties(self, text): """استخراج الغرامات والجزاءات""" penalties = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['غرامة', 'جزاء', 'عقوبة', 'تعويض']): penalties.append(section.strip()) return penalties def _extract_warranties(self, text): """استخراج الضمانات""" warranties = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['ضمان', 'كفالة', 'تأمين']): warranties.append(section.strip()) return warranties def _analyze_technical_specifics(self, text): """تحليل خاص بالمستندات الفنية""" return { "specifications": self._extract_technical_specifications(text), "standards": self._extract_technical_standards(text), "testing_procedures": self._extract_testing_procedures(text), "quality_requirements": self._extract_quality_requirements(text) } def _extract_technical_specifications(self, text): """استخراج المواصفات الفنية""" specifications = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['مواصفات فنية', 'خصائص', 'متطلبات فنية']): specifications.append(section.strip()) return specifications def _extract_technical_standards(self, text): """استخراج المعايير الفنية""" standards = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['معايير', 'مقاييس', 'مواصفات قياسية']): standards.append(section.strip()) return standards def _extract_testing_procedures(self, text): """استخراج إجراءات الاختبار""" procedures = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['اختبار', 'فحص', 'تجربة']): procedures.append(section.strip()) return procedures def _extract_quality_requirements(self, text): """استخراج متطلبات الجودة""" requirements = [] sections = text.split('\n\n') for section in sections: if any(keyword in section.lower() for keyword in ['جودة', 'ضمان الجودة', 'رقابة']): requirements.append(section.strip()) return requirements def _extract_entities(self, text): """استخراج الكيانات من النص""" entities = { "organizations": self._extract_organizations(text), "people": self._extract_people(text), "locations": self._extract_locations(text) } return entities def _extract_organizations(self, text): """استخراج المنظمات والشركات""" import re # نمط بسيط للبحث عن المنظمات والشركات org_pattern = r'شركة [\u0600-\u06FF\s]+|مؤسسة [\u0600-\u06FF\s]+|وزارة [\u0600-\u06FF\s]+|هيئة [\u0600-\u06FF\s]+' return list(set(re.findall(org_pattern, text))) def _extract_people(self, text): """استخراج أسماء الأشخاص""" # في التطبيق الحقيقي، ستحتاج إلى استخدام تقنيات التعرف على الكيانات # هذه مجرد محاكاة بسيطة return [] def _extract_locations(self, text): """استخراج المواقع""" import re # نمط بسيط للبحث عن المواقع location_pattern = r'مدينة [\u0600-\u06FF\s]+|منطقة [\u0600-\u06FF\s]+|محافظة [\u0600-\u06FF\s]+' return list(set(re.findall(location_pattern, text))) def _extract_materials(self, text): """استخراج المواد""" materials = [] # في التطبيق الحقيقي، ستحتاج إلى قائمة بالمواد الشائعة للبحث عنها common_materials = ['حديد', 'خشب', 'زجاج', 'ألمنيوم', 'نحاس', 'بلاستيك', 'خرسانة'] for material in common_materials: if material in text.lower(): # البحث عن السياق المحيط بالمادة pattern = r'[^.]*\b' + material + r'\b[^.]*\.' material_contexts = re.findall(pattern, text) for context in material_contexts: materials.append(context.strip()) return materials def _extract_measurements(self, text): """استخراج القياسات""" import re # البحث عن القياسات مثل الطول والعرض والوزن وغيرها measurement_pattern = r'\d+(?:\.\d+)?\s*(?:متر|سم|مم|كجم|طن|لتر|مل)' return re.findall(measurement_pattern, text) def _extract_standards(self, text): """استخراج المعايير""" standards = [] # معايير شائعة للبحث عنها common_standards = ['ISO', 'SASO', 'ASTM', 'BS', 'DIN', 'IEC'] for standard in common_standards: if standard in text: # البحث عن المعيار مع رقمه pattern = r'\b' + standard + r'\s*\d+\b' standard_matches = re.findall(pattern, text) standards.extend(standard_matches) return standards def _analyze_topics(self, text): """تحليل المواضيع الرئيسية""" # في التطبيق الحقيقي، ستحتاج إلى استخدام تقنيات تحليل المواضيع مثل LDA # هذه مجرد محاكاة بسيطة topics = {} # مواضيع شائعة في المناقصات والعقود common_topics = { "financial": ['سعر', 'تكلفة', 'ميزانية', 'دفع', 'مالي'], "technical": ['فني', 'مواصفات', 'معايير', 'تقني'], "legal": ['قانوني', 'شرط', 'بند', 'التزام', 'حق'], "administrative": ['إداري', 'إجراء', 'تنظيم', 'إشراف'], "time": ['مدة', 'فترة', 'موعد', 'تاريخ', 'جدول زمني'] } # حساب تكرار كلمات كل موضوع في النص word_count = len(text.split()) for topic, keywords in common_topics.items(): topic_count = 0 for keyword in keywords: # عدد مرات ظهور الكلمة المفتاحية في النص topic_count += len(re.findall(r'\b' + keyword + r'\w*\b', text)) # حساب النسبة المئوية للموضوع if word_count > 0: topic_percentage = (topic_count / word_count) * 100 topics[topic] = round(topic_percentage, 2) else: topics[topic] = 0 return topics def _check_required_terms(self, text): """التحقق من وجود المصطلحات المطلوبة""" required_terms = { "general": ['نطاق العمل', 'مدة التنفيذ', 'الشروط العامة'], "financial": ['قيمة العقد', 'طريقة الدفع', 'الضمان المالي'], "legal": ['حل النزاعات', 'الإنهاء', 'التعويضات'], "technical": ['المواصفات الفنية', 'ضمان الجودة', 'معايير القبول'] } found_terms = {} for category, terms in required_terms.items(): found_in_category = [] for term in terms: if term in text: found_in_category.append(term) found_terms[category] = found_in_category return found_terms def _calculate_compliance_score(self, text): """حساب درجة الامتثال""" # التحقق من وجود الأقسام المطلوبة missing_sections = self._check_missing_sections(text) required_terms = self._check_required_terms(text) # حساب درجة الامتثال total_required_terms = sum(len(terms) for terms in required_terms.values()) found_terms = sum(len(found) for found in required_terms.values()) if total_required_terms > 0: compliance_percentage = (found_terms / total_required_terms) * 100 # تقليل الدرجة بناءً على الأقسام المفقودة compliance_percentage -= len(missing_sections) * 5 # ضمان أن الدرجة في النطاق المناسب compliance_percentage = max(0, min(100, compliance_percentage)) return round(compliance_percentage, 1) else: return 0 def _get_version_info(self, document_path): """الحصول على معلومات الإصدار""" # في التطبيق الحقيقي، قد تحتاج لاستخراج معلومات الإصدار من الملف version_info = { "filename": os.path.basename(document_path), "last_modified": time.ctime(os.path.getmtime(document_path)) } # البحث عن رقم الإصدار في اسم الملف match = re.search(r'[vV](\d+(?:\.\d+)*)', os.path.basename(document_path)) if match: version_info["version_number"] = match.group(1) else: version_info["version_number"] = "غير محدد" return version_info def _analyze_docx(self, document_path, document_type): """تحليل مستند Word""" try: # استخراج النص من ملف Word text = self._extract_text_from_docx(document_path) # استخدام نفس آلية تحليل PDF للتحليل analysis = self._analyze_pdf(document_path, document_type) # تحديث نوع الملف analysis["file_info"]["type"] = "DOCX" return analysis except Exception as e: logger.error(f"خطأ في تحليل مستند Word: {str(e)}") raise def _extract_text_from_docx(self, document_path): """استخراج النص من ملف Word""" try: import docx doc = docx.Document(document_path) text = "" for paragraph in doc.paragraphs: text += paragraph.text + "\n" for table in doc.tables: for row in table.rows: for cell in row.cells: text += cell.text + " " text += "\n" return text except Exception as e: logger.error(f"خطأ في استخراج النص من Word: {str(e)}") raise def _analyze_xlsx(self, document_path, document_type): """تحليل مستند Excel""" try: # استخراج البيانات من ملف Excel data = self._extract_data_from_xlsx(document_path) # إنشاء تحليل مخصص لملفات Excel analysis = { "document_path": document_path, "document_type": document_type, "analysis_start_time": self.analysis_results["analysis_start_time"], "status": "جاري التحليل", "file_info": { "name": os.path.basename(document_path), "type": "XLSX", "size": os.path.getsize(document_path), "sheets": self._count_sheets(document_path), "create_date": "غير متوفر", "modify_date": time.ctime(os.path.getmtime(document_path)) }, "data_analysis": { "sheet_summary": data["sheet_summary"], "total_rows": data["total_rows"], "total_columns": data["total_columns"], "numeric_columns": data["numeric_columns"], "text_columns": data["text_columns"], "date_columns": data["date_columns"] } } # إضافة تحليلات إضافية حسب نوع المستند if document_type == "tender": analysis["tender_analysis"] = self._analyze_excel_tender(data) elif document_type == "financial": analysis["financial_analysis"] = self._analyze_excel_financial(data) return analysis except Exception as e: logger.error(f"خطأ في تحليل مستند Excel: {str(e)}") raise def _extract_data_from_xlsx(self, document_path): """استخراج البيانات من ملف Excel""" try: import pandas as pd # قراءة جميع الأوراق في الملف excel_file = pd.ExcelFile(document_path) sheet_names = excel_file.sheet_names data = { "sheet_summary": {}, "total_rows": 0, "total_columns": 0, "numeric_columns": 0, "text_columns": 0, "date_columns": 0, "sheets": {} } for sheet_name in sheet_names: # قراءة الورقة إلى DataFrame df = pd.read_excel(excel_file, sheet_name=sheet_name) # تحليل أنواع البيانات في الأعمدة column_types = {} numeric_columns = 0 text_columns = 0 date_columns = 0 for column in df.columns: if pd.api.types.is_numeric_dtype(df[column]): column_types[column] = "numeric" numeric_columns += 1 elif pd.api.types.is_datetime64_dtype(df[column]): column_types[column] = "date" date_columns += 1 else: column_types[column] = "text" text_columns += 1 # تحديث ملخص الورقة data["sheet_summary"][sheet_name] = { "rows": len(df), "columns": len(df.columns), "column_types": column_types } # تحديث الإحصائيات الإجمالية data["total_rows"] += len(df) data["total_columns"] += len(df.columns) data["numeric_columns"] += numeric_columns data["text_columns"] += text_columns data["date_columns"] += date_columns # تخزين البيانات (مع حد أقصى للصفوف للتحكم في الحجم) max_rows = 100 data["sheets"][sheet_name] = df.head(max_rows).to_dict(orient="records") return data except Exception as e: logger.error(f"خطأ في استخراج البيانات من Excel: {str(e)}") raise def _count_sheets(self, document_path): """حساب عدد الأوراق في ملف Excel""" try: import pandas as pd excel_file = pd.ExcelFile(document_path) return len(excel_file.sheet_names) except Exception as e: logger.error(f"خطأ في حساب عدد الأوراق: {str(e)}") return 0 def _analyze_excel_tender(self, data): """تحليل بيانات المناقصة من ملف Excel""" # تحليل بسيط لملف Excel خاص بالمناقصة analysis = { "items": self._extract_tender_items(data), "quantities": self._extract_tender_quantities(data), "pricing": self._extract_tender_pricing(data) } return analysis def _extract_tender_items(self, data): """استخراج البنود من بيانات المناقصة""" items = [] # البحث عن الأوراق التي تحتوي على بنود المناقصة for sheet_name, sheet_data in data["sheets"].items(): if not sheet_data: continue # البحث عن الأعمدة التي قد تحتوي على أسماء البنود possible_item_columns = ["البند", "الوصف", "المادة", "البيان", "item", "description"] for row in sheet_data: item_found = False # البحث عن أسماء البنود for column in possible_item_columns: if column in row and row[column]: # تحقق من وجود أعمدة الكمية والوحدة quantity = None unit = None for qty_col in ["الكمية", "العدد", "quantity", "qty"]: if qty_col in row and row[qty_col]: quantity = row[qty_col] break for unit_col in ["الوحدة", "unit", "uom"]: if unit_col in row and row[unit_col]: unit = row[unit_col] break # إضافة البند إلى القائمة items.append({ "name": row[column], "quantity": quantity, "unit": unit }) item_found = True break if item_found: break return items def _extract_tender_quantities(self, data): """استخراج الكميات من بيانات المناقصة""" quantities = {} # البحث عن الأوراق التي تحتوي على كميات المناقصة for sheet_name, sheet_data in data["sheets"].items(): if not sheet_data: continue # البحث عن الأعمدة التي قد تحتوي على الكميات quantity_columns = ["الكمية", "العدد", "quantity", "qty"] item_columns = ["البند", "الوصف", "المادة", "البيان", "item", "description"] for row in sheet_data: item_name = None quantity = None # البحث عن اسم البند for col in item_columns: if col in row and row[col]: item_name = row[col] break # البحث عن الكمية for col in quantity_columns: if col in row and row[col]: quantity = row[col] break # تخزين الكمية إذا وجدت if item_name and quantity: quantities[item_name] = quantity return quantities def _extract_tender_pricing(self, data): """استخراج الأسعار من بيانات المناقصة""" pricing = {} # البحث عن الأوراق التي تحتوي على أسعار المناقصة for sheet_name, sheet_data in data["sheets"].items(): if not sheet_data: continue # البحث عن الأعمدة التي قد تحتوي على الأسعار price_columns = ["السعر", "التكلفة", "المبلغ", "price", "cost", "amount"] item_columns = ["البند", "الوصف", "المادة", "البيان", "item", "description"] for row in sheet_data: item_name = None price = None # البحث عن اسم البند for col in item_columns: if col in row and row[col]: item_name = row[col] break # البحث عن السعر for col in price_columns: if col in row and row[col]: price = row[col] break # تخزين السعر إذا وجد if item_name and price: pricing[item_name] = price return pricing def _analyze_excel_financial(self, data): """تحليل البيانات المالية من ملف Excel""" # تحليل بسيط لملف Excel مالي analysis = { "total_amount": self._calculate_total_amount(data), "budget_breakdown": self._extract_budget_breakdown(data), "payment_schedule": self._extract_payment_schedule(data) } return analysis def _calculate_total_amount(self, data): """حساب المبلغ الإجمالي من البيانات المالية""" total = 0 # البحث عن الأوراق التي تحتوي على بيانات مالية for sheet_name, sheet_data in data["sheets"].items(): if not sheet_data: continue # البحث عن الأعمدة التي قد تحتوي على مبالغ amount_columns = ["المبلغ", "الإجمالي", "المجموع", "amount", "total", "sum"] for row in sheet_data: for col in amount_columns: if col in row and row[col] and isinstance(row[col], (int, float)): total += row[col] return total def _extract_budget_breakdown(self, data): """استخراج تفاصيل الميزانية من البيانات المالية""" breakdown = {} # البحث عن الأوراق التي تحتوي على تفاصيل الميزانية for sheet_name, sheet_data in data["sheets"].items(): if not sheet_data: continue # البحث عن الأعمدة التي قد تحتوي على بنود الميزانية category_columns = ["البند", "الفئة", "القسم", "category", "item"] amount_columns = ["المبلغ", "التكلفة", "القيمة", "amount", "cost", "value"] for row in sheet_data: category = None amount = None # البحث عن فئة الميزانية for col in category_columns: if col in row and row[col]: category = row[col] break # البحث عن المبلغ for col in amount_columns: if col in row and row[col] and isinstance(row[col], (int, float)): amount = row[col] break # تخزين بند الميزانية إذا وجد if category and amount: breakdown[category] = amount return breakdown def _extract_payment_schedule(self, data): """استخراج جدول الدفعات من البيانات المالية""" schedule = [] # البحث عن الأوراق التي تحتوي على جدول الدفعات for sheet_name, sheet_data in data["sheets"].items(): if not sheet_data: continue # البحث عن الأعمدة التي قد تحتوي على معلومات الدفعات date_columns = ["التاريخ", "الموعد", "date", "schedule"] amount_columns = ["المبلغ", "الدفعة", "القيمة", "amount", "payment", "value"] description_columns = ["الوصف", "البيان", "description", "details"] for row in sheet_data: date = None amount = None description = None # البحث عن تاريخ الدفعة for col in date_columns: if col in row and row[col]: date = row[col] break # البحث عن مبلغ الدفعة for col in amount_columns: if col in row and row[col]: amount = row[col] break # البحث عن وصف الدفعة for col in description_columns: if col in row and row[col]: description = row[col] break # تخزين الدفعة إذا وجدت if date and amount: schedule.append({ "date": date, "amount": amount, "description": description }) return schedule def _analyze_txt(self, document_path, document_type): """تحليل مستند نصي""" try: # قراءة محتوى الملف النصي with open(document_path, 'r', encoding='utf-8') as file: text = file.read() # استخدام نفس آلية تحليل PDF analysis = self._analyze_pdf(document_path, document_type) # تحديث نوع الملف analysis["file_info"]["type"] = "TXT" analysis["file_info"]["pages"] = self._estimate_pages(text) return analysis except Exception as e: logger.error(f"خطأ في تحليل مستند نصي: {str(e)}") raise def _estimate_pages(self, text): """تقدير عدد الصفحات في النص""" # تقدير بسيط: كل 3000 حرف تعادل صفحة واحدة تقريبًا return max(1, len(text) // 3000) def get_analysis_status(self): """الحصول على حالة التحليل الحالي""" if not self.analysis_in_progress: if not self.analysis_results: return {"status": "لا يوجد تحليل جارٍ"} else: return {"status": self.analysis_results.get("status", "غير معروف")} return { "status": "جاري التحليل", "document_path": self.current_document, "start_time": self.analysis_results.get("analysis_start_time") } def get_analysis_results(self): """الحصول على نتائج التحليل""" return self.analysis_results def export_analysis_results(self, output_path=None): """تصدير نتائج التحليل إلى ملف JSON""" if not self.analysis_results: logger.warning("لا توجد نتائج تحليل للتصدير") return None if not output_path: # إنشاء اسم ملف افتراضي timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') filename = f"analysis_results_{timestamp}.json" output_path = os.path.join(self.documents_path, filename) try: with open(output_path, 'w', encoding='utf-8') as f: json.dump(self.analysis_results, f, ensure_ascii=False, indent=4) logger.info(f"تم تصدير نتائج التحليل إلى: {output_path}") return output_path except Exception as e: logger.error(f"خطأ في تصدير نتائج التحليل: {str(e)}") return None def import_analysis_results(self, input_path): """استيراد نتائج التحليل من ملف JSON""" if not os.path.exists(input_path): logger.error(f"ملف نتائج التحليل غير موجود: {input_path}") return False try: with open(input_path, 'r', encoding='utf-8') as f: self.analysis_results = json.load(f) logger.info(f"تم استيراد نتائج التحليل من: {input_path}") return True except Exception as e: logger.error(f"خطأ في استيراد نتائج التحليل: {str(e)}") return False def _count_pages(self, document_path): """حساب عدد صفحات المستند""" try: import PyPDF2 with open(document_path, 'rb') as file: reader = PyPDF2.PdfReader(file) return len(reader.pages) except: return 0 def _get_creation_date(self, document_path): """استخراج تاريخ إنشاء المستند""" try: import PyPDF2 with open(document_path, 'rb') as file: reader = PyPDF2.PdfReader(file) if '/CreationDate' in reader.metadata: return reader.metadata['/CreationDate'] return "غير متوفر" except: return "غير متوفر" def _analyze_technical_specs(self, text): """تحليل المواصفات الفنية""" specs = { "materials": self._extract_materials(text), "measurements": self._extract_measurements(text), "standards": self._extract_standards(text) } return specs def _extract_key_dates(self, text): """استخراج التواريخ المهمة""" import re date_pattern = r'\d{1,2}[-/]\d{1,2}[-/]\d{2,4}' dates = re.findall(date_pattern, text) return list(set(dates)) def _extract_figures(self, text): """استخراج الأرقام والقيم المهمة""" import re # البحث عن القيم النقدية currency_pattern = r'[\d,]+\.?\d*\s*(?:ريال|دولار|SAR|USD)' currencies = re.findall(currency_pattern, text) # البحث عن النسب المئوية percentage_pattern = r'\d+\.?\d*\s*%' percentages = re.findall(percentage_pattern, text) return { "currencies": currencies, "percentages": percentages } def _analyze_unique_terms(self, text): """تحليل المصطلحات الفريدة""" words = set(text.split()) return list(words) def _calculate_complexity(self, text): """حساب مستوى تعقيد النص""" words = text.split() if not words: return 0 avg_word_length = sum(len(word) for word in words) / len(words) sentences = text.split('.') if not sentences: return 0 avg_sentence_length = len(words) / len(sentences) # حساب درجة التعقيد (1-10) complexity = min((avg_word_length * 0.5 + avg_sentence_length * 0.2), 10) return round(complexity, 2) def _check_missing_sections(self, text): """التحقق من الأقسام المفقودة""" required_sections = [ "نطاق العمل", "المواصفات الفنية", "الشروط العامة", "الضمانات", "الغرامات", "شروط الدفع" ] missing = [] for section in required_sections: if section not in text: missing.append(section) return missing def _find_related_documents(self, document_path): """البحث عن المستندات المرتبطة""" directory = os.path.dirname(document_path) base_name = os.path.basename(document_path) related = [] for file in os.listdir(directory): if file != base_name and file.startswith(base_name.split('_')[0]): related.append(file) return related def process_image(self, image_path): """معالجة وضغط الصورة""" try: # فتح الصورة with Image.open(image_path) as img: # تحويل الصورة إلى RGB إذا كانت RGBA if img.mode == 'RGBA': img = img.convert('RGB') # البدء بجودة عالية وتقليلها تدريجياً حتى نصل للحجم المطلوب quality = 95 max_size = (1200, 1200) while True: img.thumbnail(max_size, Image.Resampling.LANCZOS) buffer = io.BytesIO() img.save(buffer, format='JPEG', quality=quality, optimize=True) size = len(buffer.getvalue()) # إذا كان الحجم أقل من 5 ميجابايت، نخرج من الحلقة if size <= 5000000: break # تقليل الجودة والحجم quality = max(quality - 10, 20) # لا نقلل الجودة عن 20 max_size = (int(max_size[0] * 0.8), int(max_size[1] * 0.8)) # إذا وصلنا للحد الأدنى من الجودة والحجم ولم نصل للحجم المطلوب if quality == 20 and max_size[0] < 400: raise ValueError("لا يمكن ضغط الصورة للحجم المطلوب") # تحويل الصورة المضغوطة إلى base64 return base64.b64encode(buffer.getvalue()).decode('utf-8') except Exception as e: logger.error(f"خطأ في معالجة الصورة: {str(e)}") raise def convert_pdf_to_images(self, pdf_path): """تحويل PDF إلى صور""" try: from pdf2image import convert_from_path images = convert_from_path(pdf_path) return images except Exception as e: logger.error(f"فشل في تحويل ملف PDF إلى صورة: {str(e)}") raise