|
"""
|
|
محلل الأسعار لنظام إدارة المناقصات
|
|
"""
|
|
|
|
import os
|
|
import pandas as pd
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import seaborn as sns
|
|
from datetime import datetime, timedelta
|
|
from scipy import stats
|
|
import logging
|
|
|
|
logger = logging.getLogger('tender_system.pricing.analyzer')
|
|
|
|
class PriceAnalyzer:
|
|
"""فئة تحليل الأسعار"""
|
|
|
|
def __init__(self, db_connector):
|
|
"""تهيئة محلل الأسعار"""
|
|
self.db = db_connector
|
|
self.charts_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "data", "charts")
|
|
|
|
|
|
os.makedirs(self.charts_dir, exist_ok=True)
|
|
|
|
def get_price_history(self, item_id, start_date=None, end_date=None):
|
|
"""الحصول على تاريخ الأسعار لبند معين
|
|
|
|
المعلمات:
|
|
item_id (int): معرف البند
|
|
start_date (str, optional): تاريخ البداية بتنسيق 'YYYY-MM-DD'
|
|
end_date (str, optional): تاريخ النهاية بتنسيق 'YYYY-MM-DD'
|
|
|
|
العائد:
|
|
pandas.DataFrame: إطار بيانات يحتوي على تاريخ الأسعار
|
|
"""
|
|
try:
|
|
query = """
|
|
SELECT
|
|
pih.id,
|
|
pih.price,
|
|
pih.price_date,
|
|
pih.price_source,
|
|
pih.notes,
|
|
pib.code,
|
|
pib.name,
|
|
mu.name as unit_name,
|
|
mu.symbol as unit_symbol
|
|
FROM
|
|
pricing_items_history pih
|
|
JOIN
|
|
pricing_items_base pib ON pih.base_item_id = pib.id
|
|
LEFT JOIN
|
|
measurement_units mu ON pib.unit_id = mu.id
|
|
WHERE
|
|
pih.base_item_id = ?
|
|
"""
|
|
|
|
params = [item_id]
|
|
|
|
if start_date:
|
|
query += " AND pih.price_date >= ?"
|
|
params.append(start_date)
|
|
|
|
if end_date:
|
|
query += " AND pih.price_date <= ?"
|
|
params.append(end_date)
|
|
|
|
query += " ORDER BY pih.price_date ASC"
|
|
|
|
results = self.db.fetch_all(query, params)
|
|
|
|
if not results:
|
|
logger.warning(f"لا توجد بيانات تاريخية للسعر للبند رقم {item_id}")
|
|
return pd.DataFrame()
|
|
|
|
|
|
df = pd.DataFrame(results, columns=[
|
|
'id', 'price', 'price_date', 'price_source', 'notes',
|
|
'code', 'name', 'unit_name', 'unit_symbol'
|
|
])
|
|
|
|
|
|
df['price_date'] = pd.to_datetime(df['price_date'])
|
|
|
|
return df
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في الحصول على تاريخ الأسعار: {str(e)}")
|
|
return pd.DataFrame()
|
|
|
|
def analyze_price_trends(self, item_id, start_date=None, end_date=None):
|
|
"""تحليل اتجاهات الأسعار
|
|
|
|
المعلمات:
|
|
item_id (int): معرف البند
|
|
start_date (str, optional): تاريخ البداية بتنسيق 'YYYY-MM-DD'
|
|
end_date (str, optional): تاريخ النهاية بتنسيق 'YYYY-MM-DD'
|
|
|
|
العائد:
|
|
dict: قاموس يحتوي على نتائج تحليل اتجاهات الأسعار
|
|
"""
|
|
try:
|
|
|
|
df = self.get_price_history(item_id, start_date, end_date)
|
|
|
|
if df.empty:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لا توجد بيانات كافية لتحليل اتجاهات الأسعار'
|
|
}
|
|
|
|
|
|
stats_data = {
|
|
'min_price': df['price'].min(),
|
|
'max_price': df['price'].max(),
|
|
'avg_price': df['price'].mean(),
|
|
'median_price': df['price'].median(),
|
|
'std_dev': df['price'].std(),
|
|
'price_range': df['price'].max() - df['price'].min(),
|
|
'count': len(df),
|
|
'start_date': df['price_date'].min().strftime('%Y-%m-%d'),
|
|
'end_date': df['price_date'].max().strftime('%Y-%m-%d'),
|
|
'duration_days': (df['price_date'].max() - df['price_date'].min()).days,
|
|
'item_name': df['name'].iloc[0],
|
|
'item_code': df['code'].iloc[0],
|
|
'unit': df['unit_symbol'].iloc[0] if not pd.isna(df['unit_symbol'].iloc[0]) else ''
|
|
}
|
|
|
|
|
|
if len(df) >= 2:
|
|
first_price = df['price'].iloc[0]
|
|
last_price = df['price'].iloc[-1]
|
|
|
|
stats_data['absolute_change'] = last_price - first_price
|
|
stats_data['percentage_change'] = ((last_price - first_price) / first_price) * 100
|
|
|
|
|
|
years = stats_data['duration_days'] / 365.0
|
|
if years > 0:
|
|
stats_data['annual_change_rate'] = (((last_price / first_price) ** (1 / years)) - 1) * 100
|
|
else:
|
|
stats_data['annual_change_rate'] = 0
|
|
else:
|
|
stats_data['absolute_change'] = 0
|
|
stats_data['percentage_change'] = 0
|
|
stats_data['annual_change_rate'] = 0
|
|
|
|
|
|
if len(df) >= 3:
|
|
|
|
df['days'] = (df['price_date'] - df['price_date'].min()).dt.days
|
|
|
|
|
|
slope, intercept, r_value, p_value, std_err = stats.linregress(df['days'], df['price'])
|
|
|
|
stats_data['trend_slope'] = slope
|
|
stats_data['trend_intercept'] = intercept
|
|
stats_data['trend_r_squared'] = r_value ** 2
|
|
stats_data['trend_p_value'] = p_value
|
|
stats_data['trend_std_err'] = std_err
|
|
|
|
|
|
if p_value < 0.05:
|
|
if slope > 0:
|
|
stats_data['trend_direction'] = 'upward'
|
|
stats_data['trend_description'] = 'اتجاه تصاعدي'
|
|
elif slope < 0:
|
|
stats_data['trend_direction'] = 'downward'
|
|
stats_data['trend_description'] = 'اتجاه تنازلي'
|
|
else:
|
|
stats_data['trend_direction'] = 'stable'
|
|
stats_data['trend_description'] = 'مستقر'
|
|
else:
|
|
stats_data['trend_direction'] = 'no_significant_trend'
|
|
stats_data['trend_description'] = 'لا يوجد اتجاه واضح'
|
|
|
|
|
|
stats_data['volatility'] = (df['price'].std() / df['price'].mean()) * 100
|
|
|
|
|
|
if stats_data['volatility'] < 5:
|
|
stats_data['volatility_level'] = 'low'
|
|
stats_data['volatility_description'] = 'منخفض'
|
|
elif stats_data['volatility'] < 15:
|
|
stats_data['volatility_level'] = 'medium'
|
|
stats_data['volatility_description'] = 'متوسط'
|
|
else:
|
|
stats_data['volatility_level'] = 'high'
|
|
stats_data['volatility_description'] = 'مرتفع'
|
|
else:
|
|
stats_data['trend_direction'] = 'insufficient_data'
|
|
stats_data['trend_description'] = 'بيانات غير كافية'
|
|
stats_data['volatility'] = 0
|
|
stats_data['volatility_level'] = 'unknown'
|
|
stats_data['volatility_description'] = 'غير معروف'
|
|
|
|
|
|
chart_path = self._create_trend_chart(df, stats_data, item_id)
|
|
stats_data['chart_path'] = chart_path
|
|
|
|
return {
|
|
'status': 'success',
|
|
'data': stats_data
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في تحليل اتجاهات الأسعار: {str(e)}")
|
|
return {
|
|
'status': 'error',
|
|
'message': f'حدث خطأ أثناء تحليل اتجاهات الأسعار: {str(e)}'
|
|
}
|
|
|
|
def _create_trend_chart(self, df, stats_data, item_id):
|
|
"""إنشاء رسم بياني للاتجاه
|
|
|
|
المعلمات:
|
|
df (pandas.DataFrame): إطار البيانات
|
|
stats_data (dict): بيانات الإحصاءات
|
|
item_id (int): معرف البند
|
|
|
|
العائد:
|
|
str: مسار ملف الرسم البياني
|
|
"""
|
|
try:
|
|
|
|
plt.figure(figsize=(10, 6))
|
|
|
|
|
|
plt.scatter(df['price_date'], df['price'], color='blue', alpha=0.6, label='أسعار فعلية')
|
|
|
|
|
|
if len(df) >= 3 and 'trend_slope' in stats_data:
|
|
|
|
x_trend = pd.date_range(start=df['price_date'].min(), end=df['price_date'].max(), periods=100)
|
|
days_trend = [(date - df['price_date'].min()).days for date in x_trend]
|
|
y_trend = stats_data['trend_slope'] * np.array(days_trend) + stats_data['trend_intercept']
|
|
|
|
|
|
plt.plot(x_trend, y_trend, color='red', linestyle='--', label='خط الاتجاه')
|
|
|
|
|
|
plt.axhline(y=stats_data['avg_price'], color='green', linestyle='-', alpha=0.5, label='متوسط السعر')
|
|
|
|
|
|
plt.title(f"تحليل اتجاه السعر - {stats_data['item_name']} ({stats_data['item_code']})")
|
|
plt.xlabel('التاريخ')
|
|
plt.ylabel(f"السعر ({stats_data['unit']})")
|
|
|
|
|
|
plt.grid(True, linestyle='--', alpha=0.7)
|
|
|
|
|
|
plt.legend()
|
|
|
|
|
|
plt.gcf().autofmt_xdate()
|
|
|
|
|
|
info_text = (
|
|
f"التغير: {stats_data['percentage_change']:.2f}%\n"
|
|
f"التقلب: {stats_data['volatility']:.2f}%\n"
|
|
)
|
|
|
|
if 'trend_r_squared' in stats_data:
|
|
info_text += f"R²: {stats_data['trend_r_squared']:.3f}"
|
|
|
|
plt.annotate(info_text, xy=(0.02, 0.95), xycoords='axes fraction',
|
|
bbox=dict(boxstyle="round,pad=0.3", fc="white", ec="gray", alpha=0.8))
|
|
|
|
|
|
chart_filename = f"price_trend_{item_id}_{datetime.now().strftime('%Y%m%d%H%M%S')}.png"
|
|
chart_path = os.path.join(self.charts_dir, chart_filename)
|
|
plt.savefig(chart_path, dpi=100, bbox_inches='tight')
|
|
plt.close()
|
|
|
|
return chart_path
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في إنشاء رسم بياني للاتجاه: {str(e)}")
|
|
return None
|
|
|
|
def compare_prices(self, items, date=None):
|
|
"""مقارنة الأسعار بين عدة بنود
|
|
|
|
المعلمات:
|
|
items (list): قائمة بمعرفات البنود
|
|
date (str, optional): تاريخ المقارنة بتنسيق 'YYYY-MM-DD'
|
|
|
|
العائد:
|
|
dict: قاموس يحتوي على نتائج مقارنة الأسعار
|
|
"""
|
|
try:
|
|
if not items:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لم يتم تحديد أي بنود للمقارنة'
|
|
}
|
|
|
|
comparison_data = []
|
|
|
|
for item_id in items:
|
|
|
|
item_query = """
|
|
SELECT
|
|
id, code, name, description,
|
|
(SELECT name FROM measurement_units WHERE id = unit_id) as unit_name,
|
|
(SELECT symbol FROM measurement_units WHERE id = unit_id) as unit_symbol,
|
|
base_price, last_updated_date
|
|
FROM
|
|
pricing_items_base
|
|
WHERE
|
|
id = ?
|
|
"""
|
|
|
|
item_result = self.db.fetch_one(item_query, [item_id])
|
|
|
|
if not item_result:
|
|
logger.warning(f"البند رقم {item_id} غير موجود")
|
|
continue
|
|
|
|
item_data = {
|
|
'id': item_result[0],
|
|
'code': item_result[1],
|
|
'name': item_result[2],
|
|
'description': item_result[3],
|
|
'unit_name': item_result[4],
|
|
'unit_symbol': item_result[5],
|
|
'base_price': item_result[6],
|
|
'last_updated_date': item_result[7]
|
|
}
|
|
|
|
|
|
if date:
|
|
price_query = """
|
|
SELECT price, price_date, price_source
|
|
FROM pricing_items_history
|
|
WHERE base_item_id = ?
|
|
AND price_date <= ?
|
|
ORDER BY price_date DESC
|
|
LIMIT 1
|
|
"""
|
|
|
|
price_result = self.db.fetch_one(price_query, [item_id, date])
|
|
|
|
if price_result:
|
|
item_data['price'] = price_result[0]
|
|
item_data['price_date'] = price_result[1]
|
|
item_data['price_source'] = price_result[2]
|
|
else:
|
|
|
|
item_data['price'] = item_data['base_price']
|
|
item_data['price_date'] = item_data['last_updated_date']
|
|
item_data['price_source'] = 'base_price'
|
|
else:
|
|
|
|
price_query = """
|
|
SELECT price, price_date, price_source
|
|
FROM pricing_items_history
|
|
WHERE base_item_id = ?
|
|
ORDER BY price_date DESC
|
|
LIMIT 1
|
|
"""
|
|
|
|
price_result = self.db.fetch_one(price_query, [item_id])
|
|
|
|
if price_result:
|
|
item_data['price'] = price_result[0]
|
|
item_data['price_date'] = price_result[1]
|
|
item_data['price_source'] = price_result[2]
|
|
else:
|
|
|
|
item_data['price'] = item_data['base_price']
|
|
item_data['price_date'] = item_data['last_updated_date']
|
|
item_data['price_source'] = 'base_price'
|
|
|
|
comparison_data.append(item_data)
|
|
|
|
if not comparison_data:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لم يتم العثور على أي بنود للمقارنة'
|
|
}
|
|
|
|
|
|
chart_path = self._create_comparison_chart(comparison_data, date)
|
|
|
|
return {
|
|
'status': 'success',
|
|
'data': {
|
|
'items': comparison_data,
|
|
'comparison_date': date if date else 'latest',
|
|
'chart_path': chart_path
|
|
}
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في مقارنة الأسعار: {str(e)}")
|
|
return {
|
|
'status': 'error',
|
|
'message': f'حدث خطأ أثناء مقارنة الأسعار: {str(e)}'
|
|
}
|
|
|
|
def _create_comparison_chart(self, comparison_data, date=None):
|
|
"""إنشاء رسم بياني للمقارنة
|
|
|
|
المعلمات:
|
|
comparison_data (list): بيانات المقارنة
|
|
date (str, optional): تاريخ المقارنة
|
|
|
|
العائد:
|
|
str: مسار ملف الرسم البياني
|
|
"""
|
|
try:
|
|
|
|
plt.figure(figsize=(12, 6))
|
|
|
|
|
|
names = [f"{item['code']} - {item['name']}" for item in comparison_data]
|
|
prices = [item['price'] for item in comparison_data]
|
|
|
|
|
|
bars = plt.bar(names, prices, color='skyblue', edgecolor='navy')
|
|
|
|
|
|
for bar in bars:
|
|
height = bar.get_height()
|
|
plt.text(bar.get_x() + bar.get_width()/2., height + 0.1,
|
|
f'{height:.2f}', ha='center', va='bottom')
|
|
|
|
|
|
title = "مقارنة الأسعار"
|
|
if date:
|
|
title += f" (بتاريخ {date})"
|
|
|
|
plt.title(title)
|
|
plt.xlabel('البنود')
|
|
plt.ylabel('السعر')
|
|
|
|
|
|
plt.xticks(rotation=45, ha='right')
|
|
|
|
|
|
plt.grid(True, linestyle='--', alpha=0.7, axis='y')
|
|
|
|
|
|
plt.tight_layout()
|
|
|
|
|
|
chart_filename = f"price_comparison_{datetime.now().strftime('%Y%m%d%H%M%S')}.png"
|
|
chart_path = os.path.join(self.charts_dir, chart_filename)
|
|
plt.savefig(chart_path, dpi=100, bbox_inches='tight')
|
|
plt.close()
|
|
|
|
return chart_path
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في إنشاء رسم بياني للمقارنة: {str(e)}")
|
|
return None
|
|
|
|
def calculate_price_volatility(self, item_id, period='1y'):
|
|
"""حساب تقلب الأسعار
|
|
|
|
المعلمات:
|
|
item_id (int): معرف البند
|
|
period (str): الفترة الزمنية ('1m', '3m', '6m', '1y', '2y', '5y', 'all')
|
|
|
|
العائد:
|
|
dict: قاموس يحتوي على نتائج حساب تقلب الأسعار
|
|
"""
|
|
try:
|
|
|
|
end_date = datetime.now().strftime('%Y-%m-%d')
|
|
|
|
if period == '1m':
|
|
start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
|
|
elif period == '3m':
|
|
start_date = (datetime.now() - timedelta(days=90)).strftime('%Y-%m-%d')
|
|
elif period == '6m':
|
|
start_date = (datetime.now() - timedelta(days=180)).strftime('%Y-%m-%d')
|
|
elif period == '1y':
|
|
start_date = (datetime.now() - timedelta(days=365)).strftime('%Y-%m-%d')
|
|
elif period == '2y':
|
|
start_date = (datetime.now() - timedelta(days=730)).strftime('%Y-%m-%d')
|
|
elif period == '5y':
|
|
start_date = (datetime.now() - timedelta(days=1825)).strftime('%Y-%m-%d')
|
|
else:
|
|
start_date = None
|
|
|
|
|
|
df = self.get_price_history(item_id, start_date, end_date)
|
|
|
|
if df.empty or len(df) < 2:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لا توجد بيانات كافية لحساب تقلب الأسعار'
|
|
}
|
|
|
|
|
|
mean_price = df['price'].mean()
|
|
std_dev = df['price'].std()
|
|
volatility = (std_dev / mean_price) * 100
|
|
|
|
|
|
df['price_shift'] = df['price'].shift(1)
|
|
df = df.dropna()
|
|
|
|
if not df.empty:
|
|
df['price_change_pct'] = ((df['price'] - df['price_shift']) / df['price_shift']) * 100
|
|
|
|
|
|
max_increase = df['price_change_pct'].max()
|
|
max_decrease = df['price_change_pct'].min()
|
|
avg_change = df['price_change_pct'].mean()
|
|
median_change = df['price_change_pct'].median()
|
|
|
|
|
|
positive_changes = (df['price_change_pct'] > 0).sum()
|
|
negative_changes = (df['price_change_pct'] < 0).sum()
|
|
no_changes = (df['price_change_pct'] == 0).sum()
|
|
|
|
|
|
if volatility < 5:
|
|
volatility_level = 'low'
|
|
volatility_description = 'منخفض'
|
|
elif volatility < 15:
|
|
volatility_level = 'medium'
|
|
volatility_description = 'متوسط'
|
|
else:
|
|
volatility_level = 'high'
|
|
volatility_description = 'مرتفع'
|
|
|
|
|
|
chart_path = self._create_volatility_chart(df, item_id, period)
|
|
|
|
return {
|
|
'status': 'success',
|
|
'data': {
|
|
'item_id': item_id,
|
|
'item_name': df['name'].iloc[0],
|
|
'item_code': df['code'].iloc[0],
|
|
'period': period,
|
|
'start_date': df['price_date'].min().strftime('%Y-%m-%d'),
|
|
'end_date': df['price_date'].max().strftime('%Y-%m-%d'),
|
|
'data_points': len(df),
|
|
'mean_price': mean_price,
|
|
'std_dev': std_dev,
|
|
'volatility': volatility,
|
|
'volatility_level': volatility_level,
|
|
'volatility_description': volatility_description,
|
|
'max_increase': max_increase,
|
|
'max_decrease': max_decrease,
|
|
'avg_change': avg_change,
|
|
'median_change': median_change,
|
|
'positive_changes': positive_changes,
|
|
'negative_changes': negative_changes,
|
|
'no_changes': no_changes,
|
|
'chart_path': chart_path
|
|
}
|
|
}
|
|
else:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لا توجد بيانات كافية لحساب تقلب الأسعار بعد معالجة البيانات'
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في حساب تقلب الأسعار: {str(e)}")
|
|
return {
|
|
'status': 'error',
|
|
'message': f'حدث خطأ أثناء حساب تقلب الأسعار: {str(e)}'
|
|
}
|
|
|
|
def _create_volatility_chart(self, df, item_id, period):
|
|
"""إنشاء رسم بياني للتقلب
|
|
|
|
المعلمات:
|
|
df (pandas.DataFrame): إطار البيانات
|
|
item_id (int): معرف البند
|
|
period (str): الفترة الزمنية
|
|
|
|
العائد:
|
|
str: مسار ملف الرسم البياني
|
|
"""
|
|
try:
|
|
|
|
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10), gridspec_kw={'height_ratios': [2, 1]})
|
|
|
|
|
|
ax1.plot(df['price_date'], df['price'], 'b-', linewidth=2)
|
|
ax1.set_title(f"سعر البند عبر الزمن - {df['name'].iloc[0]} ({df['code'].iloc[0]})")
|
|
ax1.set_xlabel('التاريخ')
|
|
ax1.set_ylabel('السعر')
|
|
ax1.grid(True, linestyle='--', alpha=0.7)
|
|
|
|
|
|
mean_price = df['price'].mean()
|
|
std_dev = df['price'].std()
|
|
|
|
ax1.axhline(y=mean_price, color='g', linestyle='-', alpha=0.8, label='متوسط السعر')
|
|
ax1.axhline(y=mean_price + std_dev, color='r', linestyle='--', alpha=0.5, label='انحراف معياري +1')
|
|
ax1.axhline(y=mean_price - std_dev, color='r', linestyle='--', alpha=0.5, label='انحراف معياري -1')
|
|
|
|
ax1.fill_between(df['price_date'], mean_price - std_dev, mean_price + std_dev, color='gray', alpha=0.2)
|
|
ax1.legend()
|
|
|
|
|
|
ax2.bar(df['price_date'], df['price_change_pct'], color='skyblue', edgecolor='navy', alpha=0.7)
|
|
ax2.set_title('التغيرات النسبية في السعر (%)')
|
|
ax2.set_xlabel('التاريخ')
|
|
ax2.set_ylabel('التغير النسبي (%)')
|
|
ax2.grid(True, linestyle='--', alpha=0.7)
|
|
|
|
|
|
ax2.axhline(y=0, color='k', linestyle='-', alpha=0.3)
|
|
|
|
|
|
fig.autofmt_xdate()
|
|
|
|
|
|
plt.tight_layout()
|
|
|
|
|
|
chart_filename = f"price_volatility_{item_id}_{period}_{datetime.now().strftime('%Y%m%d%H%M%S')}.png"
|
|
chart_path = os.path.join(self.charts_dir, chart_filename)
|
|
plt.savefig(chart_path, dpi=100, bbox_inches='tight')
|
|
plt.close()
|
|
|
|
return chart_path
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في إنشاء رسم بياني للتقلب: {str(e)}")
|
|
return None
|
|
|
|
def perform_sensitivity_analysis(self, project_id, variable_items, ranges):
|
|
"""إجراء تحليل الحساسية
|
|
|
|
المعلمات:
|
|
project_id (int): معرف المشروع
|
|
variable_items (list): قائمة بمعرفات البنود المتغيرة
|
|
ranges (dict): نطاقات التغيير لكل بند
|
|
|
|
العائد:
|
|
dict: قاموس يحتوي على نتائج تحليل الحساسية
|
|
"""
|
|
try:
|
|
|
|
query = """
|
|
SELECT
|
|
id, item_number, description, quantity, unit_price, total_price
|
|
FROM
|
|
project_pricing_items
|
|
WHERE
|
|
project_id = ?
|
|
"""
|
|
|
|
results = self.db.fetch_all(query, [project_id])
|
|
|
|
if not results:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لا توجد بنود للمشروع المحدد'
|
|
}
|
|
|
|
|
|
project_items = pd.DataFrame(results, columns=[
|
|
'id', 'item_number', 'description', 'quantity', 'unit_price', 'total_price'
|
|
])
|
|
|
|
|
|
original_total = project_items['total_price'].sum()
|
|
|
|
|
|
sensitivity_data = []
|
|
|
|
for item_id in variable_items:
|
|
if item_id not in project_items['id'].values:
|
|
logger.warning(f"البند رقم {item_id} غير موجود في المشروع")
|
|
continue
|
|
|
|
|
|
item_info = project_items[project_items['id'] == item_id].iloc[0]
|
|
|
|
|
|
if str(item_id) in ranges:
|
|
item_range = ranges[str(item_id)]
|
|
else:
|
|
|
|
item_range = {'min': -20, 'max': 20, 'step': 10}
|
|
|
|
|
|
change_percentages = list(range(
|
|
item_range['min'],
|
|
item_range['max'] + item_range['step'],
|
|
item_range['step']
|
|
))
|
|
|
|
item_sensitivity = {
|
|
'item_id': item_id,
|
|
'item_number': item_info['item_number'],
|
|
'description': item_info['description'],
|
|
'original_price': item_info['unit_price'],
|
|
'original_total': item_info['total_price'],
|
|
'changes': []
|
|
}
|
|
|
|
|
|
for percentage in change_percentages:
|
|
|
|
new_price = item_info['unit_price'] * (1 + percentage / 100)
|
|
new_total = new_price * item_info['quantity']
|
|
|
|
|
|
project_total = original_total - item_info['total_price'] + new_total
|
|
|
|
|
|
project_change = ((project_total - original_total) / original_total) * 100
|
|
|
|
item_sensitivity['changes'].append({
|
|
'percentage': percentage,
|
|
'new_price': new_price,
|
|
'new_total': new_total,
|
|
'project_total': project_total,
|
|
'project_change': project_change
|
|
})
|
|
|
|
sensitivity_data.append(item_sensitivity)
|
|
|
|
if not sensitivity_data:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لا توجد بنود صالحة لتحليل الحساسية'
|
|
}
|
|
|
|
|
|
chart_path = self._create_sensitivity_chart(sensitivity_data, original_total, project_id)
|
|
|
|
return {
|
|
'status': 'success',
|
|
'data': {
|
|
'project_id': project_id,
|
|
'original_total': original_total,
|
|
'sensitivity_data': sensitivity_data,
|
|
'chart_path': chart_path
|
|
}
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في إجراء تحليل الحساسية: {str(e)}")
|
|
return {
|
|
'status': 'error',
|
|
'message': f'حدث خطأ أثناء إجراء تحليل الحساسية: {str(e)}'
|
|
}
|
|
|
|
def _create_sensitivity_chart(self, sensitivity_data, original_total, project_id):
|
|
"""إنشاء رسم بياني لتحليل الحساسية
|
|
|
|
المعلمات:
|
|
sensitivity_data (list): بيانات تحليل الحساسية
|
|
original_total (float): إجمالي المشروع الأصلي
|
|
project_id (int): معرف المشروع
|
|
|
|
العائد:
|
|
str: مسار ملف الرسم البياني
|
|
"""
|
|
try:
|
|
|
|
plt.figure(figsize=(12, 8))
|
|
|
|
|
|
for item in sensitivity_data:
|
|
percentages = [change['percentage'] for change in item['changes']]
|
|
project_changes = [change['project_change'] for change in item['changes']]
|
|
|
|
plt.plot(percentages, project_changes, marker='o', linewidth=2,
|
|
label=f"{item['item_number']} - {item['description'][:30]}...")
|
|
|
|
|
|
plt.title(f"تحليل الحساسية للمشروع رقم {project_id}")
|
|
plt.xlabel('نسبة التغيير في سعر البند (%)')
|
|
plt.ylabel('نسبة التغيير في إجمالي المشروع (%)')
|
|
|
|
|
|
plt.axhline(y=0, color='k', linestyle='-', alpha=0.3)
|
|
plt.axvline(x=0, color='k', linestyle='-', alpha=0.3)
|
|
|
|
|
|
plt.grid(True, linestyle='--', alpha=0.7)
|
|
|
|
|
|
plt.legend(loc='best')
|
|
|
|
|
|
info_text = f"إجمالي المشروع الأصلي: {original_total:,.2f}"
|
|
plt.annotate(info_text, xy=(0.02, 0.02), xycoords='axes fraction',
|
|
bbox=dict(boxstyle="round,pad=0.3", fc="white", ec="gray", alpha=0.8))
|
|
|
|
|
|
plt.tight_layout()
|
|
|
|
|
|
chart_filename = f"sensitivity_analysis_{project_id}_{datetime.now().strftime('%Y%m%d%H%M%S')}.png"
|
|
chart_path = os.path.join(self.charts_dir, chart_filename)
|
|
plt.savefig(chart_path, dpi=100, bbox_inches='tight')
|
|
plt.close()
|
|
|
|
return chart_path
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في إنشاء رسم بياني لتحليل الحساسية: {str(e)}")
|
|
return None
|
|
|
|
def analyze_price_correlations(self, items):
|
|
"""تحليل ارتباطات الأسعار بين عدة بنود
|
|
|
|
المعلمات:
|
|
items (list): قائمة بمعرفات البنود
|
|
|
|
العائد:
|
|
dict: قاموس يحتوي على نتائج تحليل الارتباطات
|
|
"""
|
|
try:
|
|
if not items or len(items) < 2:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'يجب تحديد بندين على الأقل لتحليل الارتباطات'
|
|
}
|
|
|
|
|
|
all_prices = {}
|
|
item_names = {}
|
|
|
|
for item_id in items:
|
|
|
|
df = self.get_price_history(item_id)
|
|
|
|
if df.empty:
|
|
logger.warning(f"لا توجد بيانات تاريخية للسعر للبند رقم {item_id}")
|
|
continue
|
|
|
|
|
|
all_prices[item_id] = df[['price_date', 'price']].copy()
|
|
item_names[item_id] = f"{df['code'].iloc[0]} - {df['name'].iloc[0]}"
|
|
|
|
if len(all_prices) < 2:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لا توجد بيانات كافية لتحليل الارتباطات'
|
|
}
|
|
|
|
|
|
|
|
all_dates = set()
|
|
for item_id, df in all_prices.items():
|
|
all_dates.update(df['price_date'].dt.strftime('%Y-%m-%d').tolist())
|
|
|
|
|
|
unified_df = pd.DataFrame({'price_date': sorted(list(all_dates))})
|
|
unified_df['price_date'] = pd.to_datetime(unified_df['price_date'])
|
|
|
|
|
|
for item_id, df in all_prices.items():
|
|
|
|
price_series = df.set_index('price_date')['price']
|
|
|
|
|
|
unified_df[f'price_{item_id}'] = unified_df['price_date'].map(
|
|
lambda x: price_series.get(x, None)
|
|
)
|
|
|
|
|
|
price_columns = [col for col in unified_df.columns if col.startswith('price_')]
|
|
unified_df[price_columns] = unified_df[price_columns].interpolate(method='linear')
|
|
|
|
|
|
unified_df = unified_df.dropna()
|
|
|
|
if len(unified_df) < 3:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لا توجد بيانات كافية بعد معالجة التواريخ المشتركة'
|
|
}
|
|
|
|
|
|
correlation_matrix = unified_df[price_columns].corr()
|
|
|
|
|
|
correlation_data = []
|
|
|
|
for i, item1_id in enumerate(items):
|
|
if f'price_{item1_id}' not in correlation_matrix.columns:
|
|
continue
|
|
|
|
for j, item2_id in enumerate(items):
|
|
if f'price_{item2_id}' not in correlation_matrix.columns or i >= j:
|
|
continue
|
|
|
|
correlation = correlation_matrix.loc[f'price_{item1_id}', f'price_{item2_id}']
|
|
|
|
|
|
if abs(correlation) < 0.3:
|
|
strength = 'weak'
|
|
strength_description = 'ضعيف'
|
|
elif abs(correlation) < 0.7:
|
|
strength = 'moderate'
|
|
strength_description = 'متوسط'
|
|
else:
|
|
strength = 'strong'
|
|
strength_description = 'قوي'
|
|
|
|
if correlation > 0:
|
|
direction = 'positive'
|
|
direction_description = 'طردي'
|
|
else:
|
|
direction = 'negative'
|
|
direction_description = 'عكسي'
|
|
|
|
correlation_data.append({
|
|
'item1_id': item1_id,
|
|
'item1_name': item_names.get(item1_id, f'البند {item1_id}'),
|
|
'item2_id': item2_id,
|
|
'item2_name': item_names.get(item2_id, f'البند {item2_id}'),
|
|
'correlation': correlation,
|
|
'strength': strength,
|
|
'strength_description': strength_description,
|
|
'direction': direction,
|
|
'direction_description': direction_description
|
|
})
|
|
|
|
if not correlation_data:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لم يتم العثور على ارتباطات بين البنود المحددة'
|
|
}
|
|
|
|
|
|
chart_path = self._create_correlation_chart(correlation_matrix, item_names)
|
|
|
|
|
|
trends_chart_path = self._create_price_trends_chart(unified_df, price_columns, item_names)
|
|
|
|
return {
|
|
'status': 'success',
|
|
'data': {
|
|
'correlation_data': correlation_data,
|
|
'chart_path': chart_path,
|
|
'trends_chart_path': trends_chart_path
|
|
}
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في تحليل ارتباطات الأسعار: {str(e)}")
|
|
return {
|
|
'status': 'error',
|
|
'message': f'حدث خطأ أثناء تحليل ارتباطات الأسعار: {str(e)}'
|
|
}
|
|
|
|
def _create_correlation_chart(self, correlation_matrix, item_names):
|
|
"""إنشاء رسم بياني لمصفوفة الارتباط
|
|
|
|
المعلمات:
|
|
correlation_matrix (pandas.DataFrame): مصفوفة الارتباط
|
|
item_names (dict): قاموس بأسماء البنود
|
|
|
|
العائد:
|
|
str: مسار ملف الرسم البياني
|
|
"""
|
|
try:
|
|
|
|
plt.figure(figsize=(10, 8))
|
|
|
|
|
|
mask = np.triu(np.ones_like(correlation_matrix, dtype=bool))
|
|
cmap = sns.diverging_palette(230, 20, as_cmap=True)
|
|
|
|
|
|
labels = [item_names.get(int(col.split('_')[1]), col) for col in correlation_matrix.columns]
|
|
|
|
|
|
sns.heatmap(correlation_matrix, mask=mask, cmap=cmap, vmax=1, vmin=-1, center=0,
|
|
square=True, linewidths=.5, cbar_kws={"shrink": .5}, annot=True,
|
|
xticklabels=labels, yticklabels=labels)
|
|
|
|
|
|
plt.title('مصفوفة ارتباط الأسعار بين البنود')
|
|
|
|
|
|
plt.tight_layout()
|
|
|
|
|
|
chart_filename = f"price_correlation_{datetime.now().strftime('%Y%m%d%H%M%S')}.png"
|
|
chart_path = os.path.join(self.charts_dir, chart_filename)
|
|
plt.savefig(chart_path, dpi=100, bbox_inches='tight')
|
|
plt.close()
|
|
|
|
return chart_path
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في إنشاء رسم بياني لمصفوفة الارتباط: {str(e)}")
|
|
return None
|
|
|
|
def _create_price_trends_chart(self, unified_df, price_columns, item_names):
|
|
"""إنشاء رسم بياني لتطور الأسعار
|
|
|
|
المعلمات:
|
|
unified_df (pandas.DataFrame): إطار البيانات الموحد
|
|
price_columns (list): أسماء أعمدة الأسعار
|
|
item_names (dict): قاموس بأسماء البنود
|
|
|
|
العائد:
|
|
str: مسار ملف الرسم البياني
|
|
"""
|
|
try:
|
|
|
|
plt.figure(figsize=(12, 6))
|
|
|
|
|
|
for col in price_columns:
|
|
item_id = int(col.split('_')[1])
|
|
item_name = item_names.get(item_id, f'البند {item_id}')
|
|
|
|
|
|
first_price = unified_df[col].iloc[0]
|
|
normalized_prices = (unified_df[col] / first_price) * 100
|
|
|
|
plt.plot(unified_df['price_date'], normalized_prices, linewidth=2, label=item_name)
|
|
|
|
|
|
plt.title('تطور الأسعار النسبية للبنود (القيمة الأولى = 100)')
|
|
plt.xlabel('التاريخ')
|
|
plt.ylabel('السعر النسبي')
|
|
|
|
|
|
plt.grid(True, linestyle='--', alpha=0.7)
|
|
|
|
|
|
plt.legend(loc='best')
|
|
|
|
|
|
plt.gcf().autofmt_xdate()
|
|
|
|
|
|
plt.tight_layout()
|
|
|
|
|
|
chart_filename = f"price_trends_{datetime.now().strftime('%Y%m%d%H%M%S')}.png"
|
|
chart_path = os.path.join(self.charts_dir, chart_filename)
|
|
plt.savefig(chart_path, dpi=100, bbox_inches='tight')
|
|
plt.close()
|
|
|
|
return chart_path
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في إنشاء رسم بياني لتطور الأسعار: {str(e)}")
|
|
return None
|
|
|
|
def compare_with_market_prices(self, items):
|
|
"""مقارنة أسعار البنود مع أسعار السوق
|
|
|
|
المعلمات:
|
|
items (list): قائمة بمعرفات البنود
|
|
|
|
العائد:
|
|
dict: قاموس يحتوي على نتائج المقارنة
|
|
"""
|
|
try:
|
|
if not items:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لم يتم تحديد أي بنود للمقارنة'
|
|
}
|
|
|
|
comparison_data = []
|
|
|
|
for item_id in items:
|
|
|
|
item_query = """
|
|
SELECT
|
|
id, code, name, description,
|
|
(SELECT name FROM measurement_units WHERE id = unit_id) as unit_name,
|
|
(SELECT symbol FROM measurement_units WHERE id = unit_id) as unit_symbol,
|
|
base_price, last_updated_date
|
|
FROM
|
|
pricing_items_base
|
|
WHERE
|
|
id = ?
|
|
"""
|
|
|
|
item_result = self.db.fetch_one(item_query, [item_id])
|
|
|
|
if not item_result:
|
|
logger.warning(f"البند رقم {item_id} غير موجود")
|
|
continue
|
|
|
|
item_data = {
|
|
'id': item_result[0],
|
|
'code': item_result[1],
|
|
'name': item_result[2],
|
|
'description': item_result[3],
|
|
'unit_name': item_result[4],
|
|
'unit_symbol': item_result[5],
|
|
'base_price': item_result[6],
|
|
'last_updated_date': item_result[7]
|
|
}
|
|
|
|
|
|
price_query = """
|
|
SELECT price, price_date, price_source
|
|
FROM pricing_items_history
|
|
WHERE base_item_id = ?
|
|
ORDER BY price_date DESC
|
|
LIMIT 1
|
|
"""
|
|
|
|
price_result = self.db.fetch_one(price_query, [item_id])
|
|
|
|
if price_result:
|
|
item_data['current_price'] = price_result[0]
|
|
item_data['price_date'] = price_result[1]
|
|
item_data['price_source'] = price_result[2]
|
|
else:
|
|
|
|
item_data['current_price'] = item_data['base_price']
|
|
item_data['price_date'] = item_data['last_updated_date']
|
|
item_data['price_source'] = 'base_price'
|
|
|
|
|
|
market_query = """
|
|
SELECT AVG(price) as avg_price
|
|
FROM pricing_items_history
|
|
WHERE base_item_id = ? AND price_source != 'internal'
|
|
AND price_date >= date('now', '-6 months')
|
|
"""
|
|
|
|
market_result = self.db.fetch_one(market_query, [item_id])
|
|
|
|
if market_result and market_result[0]:
|
|
item_data['market_price'] = market_result[0]
|
|
|
|
|
|
item_data['price_difference'] = item_data['current_price'] - item_data['market_price']
|
|
item_data['price_difference_percentage'] = (item_data['price_difference'] / item_data['market_price']) * 100
|
|
|
|
|
|
if abs(item_data['price_difference_percentage']) < 5:
|
|
item_data['price_status'] = 'competitive'
|
|
item_data['price_status_description'] = 'تنافسي'
|
|
elif item_data['price_difference_percentage'] < 0:
|
|
item_data['price_status'] = 'below_market'
|
|
item_data['price_status_description'] = 'أقل من السوق'
|
|
else:
|
|
item_data['price_status'] = 'above_market'
|
|
item_data['price_status_description'] = 'أعلى من السوق'
|
|
else:
|
|
|
|
internal_query = """
|
|
SELECT AVG(price) as avg_price
|
|
FROM pricing_items_history
|
|
WHERE base_item_id = ?
|
|
AND price_date >= date('now', '-6 months')
|
|
"""
|
|
|
|
internal_result = self.db.fetch_one(internal_query, [item_id])
|
|
|
|
if internal_result and internal_result[0]:
|
|
item_data['market_price'] = internal_result[0]
|
|
item_data['price_difference'] = item_data['current_price'] - item_data['market_price']
|
|
item_data['price_difference_percentage'] = (item_data['price_difference'] / item_data['market_price']) * 100
|
|
|
|
|
|
if abs(item_data['price_difference_percentage']) < 5:
|
|
item_data['price_status'] = 'competitive'
|
|
item_data['price_status_description'] = 'تنافسي'
|
|
elif item_data['price_difference_percentage'] < 0:
|
|
item_data['price_status'] = 'below_average'
|
|
item_data['price_status_description'] = 'أقل من المتوسط'
|
|
else:
|
|
item_data['price_status'] = 'above_average'
|
|
item_data['price_status_description'] = 'أعلى من المتوسط'
|
|
else:
|
|
item_data['market_price'] = None
|
|
item_data['price_difference'] = None
|
|
item_data['price_difference_percentage'] = None
|
|
item_data['price_status'] = 'unknown'
|
|
item_data['price_status_description'] = 'غير معروف'
|
|
|
|
comparison_data.append(item_data)
|
|
|
|
if not comparison_data:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لم يتم العثور على أي بنود للمقارنة'
|
|
}
|
|
|
|
|
|
chart_path = self._create_market_comparison_chart(comparison_data)
|
|
|
|
return {
|
|
'status': 'success',
|
|
'data': {
|
|
'items': comparison_data,
|
|
'chart_path': chart_path
|
|
}
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في مقارنة الأسعار مع أسعار السوق: {str(e)}")
|
|
return {
|
|
'status': 'error',
|
|
'message': f'حدث خطأ أثناء مقارنة الأسعار مع أسعار السوق: {str(e)}'
|
|
}
|
|
|
|
def _create_market_comparison_chart(self, comparison_data):
|
|
"""إنشاء رسم بياني لمقارنة الأسعار مع أسعار السوق
|
|
|
|
المعلمات:
|
|
comparison_data (list): بيانات المقارنة
|
|
|
|
العائد:
|
|
str: مسار ملف الرسم البياني
|
|
"""
|
|
try:
|
|
|
|
valid_items = [item for item in comparison_data if item.get('market_price') is not None]
|
|
|
|
if not valid_items:
|
|
return None
|
|
|
|
|
|
plt.figure(figsize=(12, 6))
|
|
|
|
|
|
names = [f"{item['code']} - {item['name'][:20]}..." for item in valid_items]
|
|
current_prices = [item['current_price'] for item in valid_items]
|
|
market_prices = [item['market_price'] for item in valid_items]
|
|
|
|
|
|
x = np.arange(len(names))
|
|
width = 0.35
|
|
|
|
|
|
plt.bar(x - width/2, current_prices, width, label='السعر الحالي', color='skyblue')
|
|
plt.bar(x + width/2, market_prices, width, label='سعر السوق', color='lightgreen')
|
|
|
|
|
|
plt.xlabel('البنود')
|
|
plt.ylabel('السعر')
|
|
plt.title('مقارنة الأسعار الحالية مع أسعار السوق')
|
|
plt.xticks(x, names, rotation=45, ha='right')
|
|
plt.legend()
|
|
|
|
|
|
plt.grid(True, linestyle='--', alpha=0.7, axis='y')
|
|
|
|
|
|
for i, item in enumerate(valid_items):
|
|
if 'price_difference_percentage' in item and item['price_difference_percentage'] is not None:
|
|
percentage = item['price_difference_percentage']
|
|
color = 'green' if percentage < 0 else 'red' if percentage > 0 else 'black'
|
|
plt.annotate(f"{percentage:.1f}%",
|
|
xy=(x[i], max(current_prices[i], market_prices[i]) * 1.05),
|
|
ha='center', va='bottom', color=color,
|
|
bbox=dict(boxstyle="round,pad=0.3", fc="white", ec="gray", alpha=0.8))
|
|
|
|
|
|
plt.tight_layout()
|
|
|
|
|
|
chart_filename = f"market_comparison_{datetime.now().strftime('%Y%m%d%H%M%S')}.png"
|
|
chart_path = os.path.join(self.charts_dir, chart_filename)
|
|
plt.savefig(chart_path, dpi=100, bbox_inches='tight')
|
|
plt.close()
|
|
|
|
return chart_path
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في إنشاء رسم بياني لمقارنة الأسعار مع أسعار السوق: {str(e)}")
|
|
return None
|
|
|
|
def analyze_cost_drivers(self, project_id):
|
|
"""تحليل محركات التكلفة للمشروع
|
|
|
|
المعلمات:
|
|
project_id (int): معرف المشروع
|
|
|
|
العائد:
|
|
dict: قاموس يحتوي على نتائج تحليل محركات التكلفة
|
|
"""
|
|
try:
|
|
|
|
query = """
|
|
SELECT
|
|
id, item_number, description, quantity, unit_price, total_price,
|
|
(SELECT name FROM pricing_categories WHERE id =
|
|
(SELECT category_id FROM pricing_items_base WHERE id = base_item_id)
|
|
) as category_name
|
|
FROM
|
|
project_pricing_items
|
|
WHERE
|
|
project_id = ?
|
|
"""
|
|
|
|
results = self.db.fetch_all(query, [project_id])
|
|
|
|
if not results:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لا توجد بنود للمشروع المحدد'
|
|
}
|
|
|
|
|
|
df = pd.DataFrame(results, columns=[
|
|
'id', 'item_number', 'description', 'quantity', 'unit_price',
|
|
'total_price', 'category_name'
|
|
])
|
|
|
|
|
|
df['category_name'] = df['category_name'].fillna('أخرى')
|
|
|
|
|
|
project_total = df['total_price'].sum()
|
|
|
|
|
|
category_analysis = df.groupby('category_name').agg({
|
|
'total_price': 'sum'
|
|
}).reset_index()
|
|
|
|
|
|
category_analysis['percentage'] = (category_analysis['total_price'] / project_total) * 100
|
|
|
|
|
|
category_analysis = category_analysis.sort_values('total_price', ascending=False)
|
|
|
|
|
|
top_items = df.sort_values('total_price', ascending=False).head(10)
|
|
top_items['percentage'] = (top_items['total_price'] / project_total) * 100
|
|
|
|
|
|
df_sorted = df.sort_values('total_price', ascending=False)
|
|
df_sorted['cumulative_cost'] = df_sorted['total_price'].cumsum()
|
|
df_sorted['cumulative_percentage'] = (df_sorted['cumulative_cost'] / project_total) * 100
|
|
|
|
|
|
items_80_percent = len(df_sorted[df_sorted['cumulative_percentage'] <= 80])
|
|
if items_80_percent == 0:
|
|
items_80_percent = 1
|
|
|
|
pareto_ratio = items_80_percent / len(df)
|
|
|
|
|
|
category_chart_path = self._create_category_chart(category_analysis)
|
|
top_items_chart_path = self._create_top_items_chart(top_items)
|
|
pareto_chart_path = self._create_pareto_chart(df_sorted)
|
|
|
|
return {
|
|
'status': 'success',
|
|
'data': {
|
|
'project_id': project_id,
|
|
'project_total': project_total,
|
|
'category_analysis': category_analysis.to_dict('records'),
|
|
'top_items': top_items.to_dict('records'),
|
|
'pareto_ratio': pareto_ratio,
|
|
'items_80_percent': items_80_percent,
|
|
'total_items': len(df),
|
|
'category_chart_path': category_chart_path,
|
|
'top_items_chart_path': top_items_chart_path,
|
|
'pareto_chart_path': pareto_chart_path
|
|
}
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في تحليل محركات التكلفة: {str(e)}")
|
|
return {
|
|
'status': 'error',
|
|
'message': f'حدث خطأ أثناء تحليل محركات التكلفة: {str(e)}'
|
|
}
|
|
|
|
def _create_category_chart(self, category_analysis):
|
|
"""إنشاء رسم بياني للتكاليف حسب الفئة
|
|
|
|
المعلمات:
|
|
category_analysis (pandas.DataFrame): تحليل الفئات
|
|
|
|
العائد:
|
|
str: مسار ملف الرسم البياني
|
|
"""
|
|
try:
|
|
|
|
plt.figure(figsize=(10, 6))
|
|
|
|
|
|
plt.pie(
|
|
category_analysis['total_price'],
|
|
labels=category_analysis['category_name'],
|
|
autopct='%1.1f%%',
|
|
startangle=90,
|
|
shadow=False,
|
|
wedgeprops={'edgecolor': 'white', 'linewidth': 1}
|
|
)
|
|
|
|
|
|
plt.title('توزيع التكاليف حسب الفئة')
|
|
|
|
|
|
plt.axis('equal')
|
|
|
|
|
|
plt.tight_layout()
|
|
|
|
|
|
chart_filename = f"cost_category_{datetime.now().strftime('%Y%m%d%H%M%S')}.png"
|
|
chart_path = os.path.join(self.charts_dir, chart_filename)
|
|
plt.savefig(chart_path, dpi=100, bbox_inches='tight')
|
|
plt.close()
|
|
|
|
return chart_path
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في إنشاء رسم بياني للتكاليف حسب الفئة: {str(e)}")
|
|
return None
|
|
|
|
def _create_top_items_chart(self, top_items):
|
|
"""إنشاء رسم بياني للبنود الأعلى تكلفة
|
|
|
|
المعلمات:
|
|
top_items (pandas.DataFrame): البنود الأعلى تكلفة
|
|
|
|
العائد:
|
|
str: مسار ملف الرسم البياني
|
|
"""
|
|
try:
|
|
|
|
plt.figure(figsize=(12, 6))
|
|
|
|
|
|
items = [f"{row['item_number']} - {row['description'][:20]}..." for _, row in top_items.iterrows()]
|
|
costs = top_items['total_price'].tolist()
|
|
|
|
|
|
bars = plt.barh(items, costs, color='skyblue', edgecolor='navy')
|
|
|
|
|
|
for i, bar in enumerate(bars):
|
|
width = bar.get_width()
|
|
plt.text(width * 1.01, bar.get_y() + bar.get_height()/2,
|
|
f'{width:,.0f} ({top_items["percentage"].iloc[i]:.1f}%)',
|
|
va='center')
|
|
|
|
|
|
plt.title('البنود الأعلى تكلفة')
|
|
plt.xlabel('التكلفة')
|
|
plt.ylabel('البنود')
|
|
|
|
|
|
plt.grid(True, linestyle='--', alpha=0.7, axis='x')
|
|
|
|
|
|
plt.tight_layout()
|
|
|
|
|
|
chart_filename = f"top_cost_items_{datetime.now().strftime('%Y%m%d%H%M%S')}.png"
|
|
chart_path = os.path.join(self.charts_dir, chart_filename)
|
|
plt.savefig(chart_path, dpi=100, bbox_inches='tight')
|
|
plt.close()
|
|
|
|
return chart_path
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في إنشاء رسم بياني للبنود الأعلى تكلفة: {str(e)}")
|
|
return None
|
|
|
|
def _create_pareto_chart(self, df_sorted):
|
|
"""إنشاء رسم بياني لتحليل باريتو
|
|
|
|
المعلمات:
|
|
df_sorted (pandas.DataFrame): إطار البيانات المرتب
|
|
|
|
العائد:
|
|
str: مسار ملف الرسم البياني
|
|
"""
|
|
try:
|
|
|
|
fig, ax1 = plt.subplots(figsize=(12, 6))
|
|
|
|
|
|
x = range(1, len(df_sorted) + 1)
|
|
y1 = df_sorted['total_price'].tolist()
|
|
y2 = df_sorted['cumulative_percentage'].tolist()
|
|
|
|
|
|
ax1.bar(x, y1, color='skyblue', alpha=0.7)
|
|
ax1.set_xlabel('عدد البنود')
|
|
ax1.set_ylabel('التكلفة', color='navy')
|
|
ax1.tick_params(axis='y', labelcolor='navy')
|
|
|
|
|
|
ax2 = ax1.twinx()
|
|
|
|
|
|
ax2.plot(x, y2, 'r-', linewidth=2, marker='o', markersize=4)
|
|
ax2.set_ylabel('النسبة التراكمية (%)', color='red')
|
|
ax2.tick_params(axis='y', labelcolor='red')
|
|
|
|
|
|
ax2.axhline(y=80, color='green', linestyle='--', alpha=0.7)
|
|
|
|
|
|
plt.title('تحليل باريتو للتكاليف')
|
|
|
|
|
|
ax1.grid(True, linestyle='--', alpha=0.7)
|
|
|
|
|
|
fig.tight_layout()
|
|
|
|
|
|
chart_filename = f"pareto_analysis_{datetime.now().strftime('%Y%m%d%H%M%S')}.png"
|
|
chart_path = os.path.join(self.charts_dir, chart_filename)
|
|
plt.savefig(chart_path, dpi=100, bbox_inches='tight')
|
|
plt.close()
|
|
|
|
return chart_path
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في إنشاء رسم بياني لتحليل باريتو: {str(e)}")
|
|
return None
|
|
|
|
def generate_price_analysis_charts(self, analysis_type, params):
|
|
"""إنشاء رسوم بيانية لتحليل الأسعار
|
|
|
|
المعلمات:
|
|
analysis_type (str): نوع التحليل
|
|
params (dict): معلمات التحليل
|
|
|
|
العائد:
|
|
dict: قاموس يحتوي على مسارات الرسوم البيانية
|
|
"""
|
|
try:
|
|
if analysis_type == 'trend':
|
|
|
|
if 'item_id' not in params:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لم يتم تحديد معرف البند'
|
|
}
|
|
|
|
result = self.analyze_price_trends(
|
|
params['item_id'],
|
|
params.get('start_date'),
|
|
params.get('end_date')
|
|
)
|
|
|
|
if result['status'] == 'success':
|
|
return {
|
|
'status': 'success',
|
|
'charts': [result['data']['chart_path']]
|
|
}
|
|
else:
|
|
return result
|
|
|
|
elif analysis_type == 'comparison':
|
|
|
|
if 'items' not in params:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لم يتم تحديد البنود للمقارنة'
|
|
}
|
|
|
|
result = self.compare_prices(
|
|
params['items'],
|
|
params.get('date')
|
|
)
|
|
|
|
if result['status'] == 'success':
|
|
return {
|
|
'status': 'success',
|
|
'charts': [result['data']['chart_path']]
|
|
}
|
|
else:
|
|
return result
|
|
|
|
elif analysis_type == 'volatility':
|
|
|
|
if 'item_id' not in params:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لم يتم تحديد معرف البند'
|
|
}
|
|
|
|
result = self.calculate_price_volatility(
|
|
params['item_id'],
|
|
params.get('period', '1y')
|
|
)
|
|
|
|
if result['status'] == 'success':
|
|
return {
|
|
'status': 'success',
|
|
'charts': [result['data']['chart_path']]
|
|
}
|
|
else:
|
|
return result
|
|
|
|
elif analysis_type == 'sensitivity':
|
|
|
|
if 'project_id' not in params or 'variable_items' not in params:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لم يتم تحديد معرف المشروع أو البنود المتغيرة'
|
|
}
|
|
|
|
result = self.perform_sensitivity_analysis(
|
|
params['project_id'],
|
|
params['variable_items'],
|
|
params.get('ranges', {})
|
|
)
|
|
|
|
if result['status'] == 'success':
|
|
return {
|
|
'status': 'success',
|
|
'charts': [result['data']['chart_path']]
|
|
}
|
|
else:
|
|
return result
|
|
|
|
elif analysis_type == 'correlation':
|
|
|
|
if 'items' not in params:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لم يتم تحديد البنود للتحليل'
|
|
}
|
|
|
|
result = self.analyze_price_correlations(params['items'])
|
|
|
|
if result['status'] == 'success':
|
|
return {
|
|
'status': 'success',
|
|
'charts': [result['data']['chart_path'], result['data']['trends_chart_path']]
|
|
}
|
|
else:
|
|
return result
|
|
|
|
elif analysis_type == 'market_comparison':
|
|
|
|
if 'items' not in params:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لم يتم تحديد البنود للمقارنة'
|
|
}
|
|
|
|
result = self.compare_with_market_prices(params['items'])
|
|
|
|
if result['status'] == 'success':
|
|
return {
|
|
'status': 'success',
|
|
'charts': [result['data']['chart_path']]
|
|
}
|
|
else:
|
|
return result
|
|
|
|
elif analysis_type == 'cost_drivers':
|
|
|
|
if 'project_id' not in params:
|
|
return {
|
|
'status': 'error',
|
|
'message': 'لم يتم تحديد معرف المشروع'
|
|
}
|
|
|
|
result = self.analyze_cost_drivers(params['project_id'])
|
|
|
|
if result['status'] == 'success':
|
|
return {
|
|
'status': 'success',
|
|
'charts': [
|
|
result['data']['category_chart_path'],
|
|
result['data']['top_items_chart_path'],
|
|
result['data']['pareto_chart_path']
|
|
]
|
|
}
|
|
else:
|
|
return result
|
|
|
|
else:
|
|
return {
|
|
'status': 'error',
|
|
'message': f'نوع التحليل غير معروف: {analysis_type}'
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"خطأ في إنشاء رسوم بيانية لتحليل الأسعار: {str(e)}")
|
|
return {
|
|
'status': 'error',
|
|
'message': f'حدث خطأ أثناء إنشاء رسوم بيانية لتحليل الأسعار: {str(e)}'
|
|
}
|
|
|