""" وحدة تحليل المستندات لنظام إدارة المناقصات - Hybrid Face """ import os import re import logging import threading from pathlib import Path import datetime import json # تهيئة السجل logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('document_analysis') class DocumentAnalyzer: """فئة تحليل المستندات""" def __init__(self, config=None): """تهيئة محلل المستندات""" self.config = config self.analysis_in_progress = False self.current_document = None self.analysis_results = {} # إنشاء مجلد المستندات إذا لم يكن موجوداً if config and hasattr(config, 'DOCUMENTS_PATH'): self.documents_path = Path(config.DOCUMENTS_PATH) else: self.documents_path = Path('data/documents') if not self.documents_path.exists(): self.documents_path.mkdir(parents=True, exist_ok=True) def analyze_document(self, document_path, document_type="tender", callback=None): """تحليل مستند""" if self.analysis_in_progress: logger.warning("هناك عملية تحليل جارية بالفعل") return False if not os.path.exists(document_path): logger.error(f"المستند غير موجود: {document_path}") return False self.analysis_in_progress = True self.current_document = document_path self.analysis_results = { "document_path": document_path, "document_type": document_type, "analysis_start_time": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "status": "جاري التحليل", "items": [], "entities": [], "dates": [], "amounts": [], "risks": [] } # بدء التحليل في خيط منفصل thread = threading.Thread( target=self._analyze_document_thread, args=(document_path, document_type, callback) ) thread.daemon = True thread.start() return True def _analyze_document_thread(self, document_path, document_type, callback): """خيط تحليل المستند""" try: # تحديد نوع المستند file_extension = os.path.splitext(document_path)[1].lower() if file_extension == '.pdf': self.analysis_results = self._analyze_pdf(document_path, document_type) elif file_extension == '.docx': self._analyze_docx(document_path, document_type) elif file_extension == '.xlsx': self._analyze_xlsx(document_path, document_type) elif file_extension == '.txt': self._analyze_txt(document_path, document_type) else: logger.error(f"نوع المستند غير مدعوم: {file_extension}") self.analysis_results["status"] = "فشل التحليل" self.analysis_results["error"] = "نوع المستند غير مدعوم" # تحديث حالة التحليل if self.analysis_results["status"] != "فشل التحليل": self.analysis_results["status"] = "اكتمل التحليل" self.analysis_results["analysis_end_time"] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') logger.info(f"اكتمل تحليل المستند: {document_path}") except Exception as e: logger.error(f"خطأ في تحليل المستند: {str(e)}") self.analysis_results["status"] = "فشل التحليل" self.analysis_results["error"] = str(e) finally: self.analysis_in_progress = False # استدعاء دالة الاستجابة إذا تم توفيرها if callback and callable(callback): callback(self.analysis_results) def _analyze_pdf(self, document_path, document_type): """تحليل مستند PDF باستخدام الذكاء الاصطناعي""" try: # استخراج النص من PDF text = self._extract_text_from_pdf(document_path) # تحليل متقدم للمستند analysis = { "file_info": { "name": os.path.basename(document_path), "type": "PDF", "size": os.path.getsize(document_path), "pages": self._count_pages(document_path), "create_date": self._get_creation_date(document_path), "modify_date": time.ctime(os.path.getmtime(document_path)) }, "content_analysis": { "contract_terms": self._analyze_contract_terms(text), "financial_analysis": self._analyze_financial_terms(text), "legal_analysis": self._analyze_legal_terms(text), "risk_analysis": self._analyze_risks(text), "conditions_analysis": self._analyze_conditions(text), "technical_specs": self._analyze_technical_specs(text), "key_dates": self._extract_key_dates(text), "important_figures": self._extract_figures(text), "entities": self._extract_entities(text) }, "statistical_analysis": { "word_count": len(text.split()), "unique_terms": self._analyze_unique_terms(text), "topic_distribution": self._analyze_topics(text), "complexity_score": self._calculate_complexity(text) }, "compliance_check": { "missing_sections": self._check_missing_sections(text), "required_terms": self._check_required_terms(text), "compliance_score": self._calculate_compliance_score(text) }, "summary": self._generate_summary(text), "recommendations": self._generate_recommendations(text), "related_documents": self._find_related_documents(document_path), "version_info": self._get_version_info(document_path) } # إضافة تحليل متخصص حسب نوع المستند if document_type == "tender": analysis["tender_analysis"] = self._analyze_tender_specifics(text) elif document_type == "contract": analysis["contract_analysis"] = self._analyze_contract_specifics(text) elif document_type == "technical": analysis["technical_analysis"] = self._analyze_technical_specifics(text) return analysis except Exception as e: logger.error(f"خطأ في تحليل PDF: {str(e)}") raise def _extract_text_from_pdf(self, document_path): """استخراج النص من ملف PDF (تحتاج إلى مكتبة مثل PyPDF2 أو pdfplumber)""" # Implementation using a PDF processing library like PyPDF2 or pdfplumber is needed here. # This is a placeholder. Replace with actual PDF text extraction. return "Placeholder text extracted from PDF" def _analyze_contract_terms(self, text): """تحليل بنود العقد""" # Implementation for contract term analysis is needed here. This is a placeholder. return "Placeholder contract terms analysis" def _analyze_financial_terms(self, text): """تحليل الجزء المالي""" # Implementation for financial term analysis is needed here. This is a placeholder. return "Placeholder financial terms analysis" def _analyze_legal_terms(self, text): """تحليل القانوني للعقد""" # Implementation for legal term analysis is needed here. This is a placeholder. return "Placeholder legal terms analysis" def _analyze_risks(self, text): """تحليل المخاطر""" # Implementation for risk analysis is needed here. This is a placeholder. return "Placeholder risk analysis" def _analyze_conditions(self, text): """دراسة كراسة الشروط""" # Implementation for conditions analysis is needed here. This is a placeholder. return "Placeholder conditions analysis" def _generate_summary(self, text): """توليد ملخص""" # Implementation for summary generation is needed here. This is a placeholder. return "Placeholder summary" def _generate_recommendations(self, text): """توليد التوصيات""" # Implementation for recommendation generation is needed here. This is a placeholder. return "Placeholder recommendations" def _analyze_docx(self, document_path, document_type): """تحليل مستند Word""" try: # محاكاة تحليل مستند Word logger.info(f"تحليل مستند Word: {document_path}") # في التطبيق الفعلي، سيتم استخدام مكتبة مثل python-docx # لاستخراج النص من ملف Word وتحليله # محاكاة استخراج البنود والكيانات والتواريخ والمبالغ والمخاطر # (مشابه لتحليل PDF) self.analysis_results["items"] = [ {"id": 1, "name": "توريد معدات", "description": "توريد معدات المشروع", "unit": "مجموعة", "estimated_quantity": 10}, {"id": 2, "name": "تركيب المعدات", "description": "تركيب وتشغيل المعدات", "unit": "مجموعة", "estimated_quantity": 10}, {"id": 3, "name": "التدريب", "description": "تدريب الموظفين على استخدام المعدات", "unit": "يوم", "estimated_quantity": 20} ] # محاكاة استخراج الكيانات والتواريخ والمبالغ والمخاطر # (مشابه لتحليل PDF) except Exception as e: logger.error(f"خطأ في تحليل مستند Word: {str(e)}") raise def _analyze_xlsx(self, document_path, document_type): """تحليل مستند Excel""" try: # محاكاة تحليل مستند Excel logger.info(f"تحليل مستند Excel: {document_path}") # في التطبيق الفعلي، سيتم استخدام مكتبة مثل pandas أو openpyxl # لاستخراج البيانات من ملف Excel وتحليلها # محاكاة استخراج البنود self.analysis_results["items"] = [ {"id": 1, "name": "بند 1", "description": "وصف البند 1", "unit": "وحدة", "estimated_quantity": 100}, {"id": 2, "name": "بند 2", "description": "وصف البند 2", "unit": "وحدة", "estimated_quantity": 200}, {"id": 3, "name": "بند 3", "description": "وصف البند 3", "unit": "وحدة", "estimated_quantity": 300} ] # محاكاة استخراج المبالغ self.analysis_results["amounts"] = [ {"type": "item_cost", "amount": 10000, "currency": "SAR", "description": "تكلفة البند 1"}, {"type": "item_cost", "amount": 20000, "currency": "SAR", "description": "تكلفة البند 2"}, {"type": "item_cost", "amount": 30000, "currency": "SAR", "description": "تكلفة البند 3"} ] except Exception as e: logger.error(f"خطأ في تحليل مستند Excel: {str(e)}") raise def _analyze_txt(self, document_path, document_type): """تحليل مستند نصي""" try: # محاكاة تحليل مستند نصي logger.info(f"تحليل مستند نصي: {document_path}") # في التطبيق الفعلي، سيتم قراءة الملف النصي وتحليله # محاكاة استخراج البنود والكيانات والتواريخ والمبالغ والمخاطر # (مشابه للتحليلات الأخرى) except Exception as e: logger.error(f"خطأ في تحليل مستند نصي: {str(e)}") raise def get_analysis_status(self): """الحصول على حالة التحليل الحالي""" if not self.analysis_in_progress: if not self.analysis_results: return {"status": "لا يوجد تحليل جارٍ"} else: return {"status": self.analysis_results.get("status", "غير معروف")} return { "status": "جاري التحليل", "document_path": self.current_document, "start_time": self.analysis_results.get("analysis_start_time") } def get_analysis_results(self): """الحصول على نتائج التحليل""" return self.analysis_results def export_analysis_results(self, output_path=None): """تصدير نتائج التحليل إلى ملف JSON""" if not self.analysis_results: logger.warning("لا توجد نتائج تحليل للتصدير") return None if not output_path: # إنشاء اسم ملف افتراضي timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') filename = f"analysis_results_{timestamp}.json" output_path = os.path.join(self.documents_path, filename) try: with open(output_path, 'w', encoding='utf-8') as f: json.dump(self.analysis_results, f, ensure_ascii=False, indent=4) logger.info(f"تم تصدير نتائج التحليل إلى: {output_path}") return output_path except Exception as e: logger.error(f"خطأ في تصدير نتائج التحليل: {str(e)}") return None def import_analysis_results(self, input_path): """استيراد نتائج التحليل من ملف JSON""" if not os.path.exists(input_path): logger.error(f"ملف نتائج التحليل غير موجود: {input_path}") return False try: with open(input_path, 'r', encoding='utf-8') as f: self.analysis_results = json.load(f) logger.info(f"تم استيراد نتائج التحليل من: {input_path}") return True except Exception as e: logger.error(f"خطأ في استيراد نتائج التحليل: {str(e)}") return False def _count_pages(self, document_path): """حساب عدد صفحات المستند""" try: import PyPDF2 with open(document_path, 'rb') as file: reader = PyPDF2.PdfReader(file) return len(reader.pages) except: return 0 def _get_creation_date(self, document_path): """استخراج تاريخ إنشاء المستند""" try: import PyPDF2 with open(document_path, 'rb') as file: reader = PyPDF2.PdfReader(file) if '/CreationDate' in reader.metadata: return reader.metadata['/CreationDate'] return "غير متوفر" except: return "غير متوفر" def _analyze_technical_specs(self, text): """تحليل المواصفات الفنية""" specs = { "materials": self._extract_materials(text), "measurements": self._extract_measurements(text), "standards": self._extract_standards(text) } return specs def _extract_key_dates(self, text): """استخراج التواريخ المهمة""" import re date_pattern = r'\d{1,2}[-/]\d{1,2}[-/]\d{2,4}' dates = re.findall(date_pattern, text) return list(set(dates)) def _extract_figures(self, text): """استخراج الأرقام والقيم المهمة""" import re # البحث عن القيم النقدية currency_pattern = r'[\d,]+\.?\d*\s*(?:ريال|دولار|SAR|USD)' currencies = re.findall(currency_pattern, text) # البحث عن النسب المئوية percentage_pattern = r'\d+\.?\d*\s*%' percentages = re.findall(percentage_pattern, text) return { "currencies": currencies, "percentages": percentages } def _analyze_unique_terms(self, text): """تحليل المصطلحات الفريدة""" words = set(text.split()) return list(words) def _calculate_complexity(self, text): """حساب مستوى تعقيد النص""" words = text.split() avg_word_length = sum(len(word) for word in words) / len(words) sentences = text.split('.') avg_sentence_length = len(words) / len(sentences) # حساب درجة التعقيد (1-10) complexity = min((avg_word_length * 0.5 + avg_sentence_length * 0.2), 10) return round(complexity, 2) def _check_missing_sections(self, text): """التحقق من الأقسام المفقودة""" required_sections = [ "نطاق العمل", "المواصفات الفنية", "الشروط العامة", "الضمانات", "الغرامات", "شروط الدفع" ] missing = [] for section in required_sections: if section not in text: missing.append(section) return missing def _find_related_documents(self, document_path): """البحث عن المستندات المرتبطة""" directory = os.path.dirname(document_path) base_name = os.path.basename(document_path) related = [] for file in os.listdir(directory): if file != base_name and file.startswith(base_name.split('_')[0]): related.append(file) return related