|
"""
|
|
معالج ملفات Excel
|
|
"""
|
|
|
|
import pandas as pd
|
|
import os
|
|
import numpy as np
|
|
import xlsxwriter
|
|
from datetime import datetime
|
|
import traceback
|
|
import config
|
|
from utils.helpers import create_directory_if_not_exists, get_file_extension, format_number
|
|
|
|
|
|
def read_excel_file(file_path, sheet_name=0, header=0, skip_rows=None):
|
|
"""
|
|
قراءة ملف Excel
|
|
|
|
المعلمات:
|
|
file_path: مسار ملف Excel
|
|
sheet_name: اسم أو رقم الصفحة (افتراضي: 0)
|
|
header: رقم الصف الذي يحتوي على العناوين (افتراضي: 0)
|
|
skip_rows: قائمة بأرقام الصفوف للتخطي (افتراضي: None)
|
|
|
|
الإرجاع:
|
|
DataFrame من البيانات المقروءة
|
|
"""
|
|
try:
|
|
|
|
if not os.path.exists(file_path):
|
|
raise FileNotFoundError(f"الملف غير موجود: {file_path}")
|
|
|
|
|
|
ext = get_file_extension(file_path)
|
|
if ext not in ['.xlsx', '.xls', '.xlsm']:
|
|
raise ValueError(f"نوع الملف غير مدعوم: {ext}. يجب أن يكون الملف بامتداد .xlsx أو .xls أو .xlsm")
|
|
|
|
|
|
df = pd.read_excel(
|
|
file_path,
|
|
sheet_name=sheet_name,
|
|
header=header,
|
|
skiprows=skip_rows
|
|
)
|
|
|
|
return df
|
|
|
|
except Exception as e:
|
|
error_msg = f"خطأ في قراءة ملف Excel: {str(e)}"
|
|
print(error_msg)
|
|
traceback.print_exc()
|
|
raise Exception(error_msg)
|
|
|
|
|
|
def write_excel_file(df, file_path, sheet_name="Sheet1", index=False, freeze_panes=None, column_widths=None, formats=None):
|
|
"""
|
|
كتابة DataFrame إلى ملف Excel
|
|
|
|
المعلمات:
|
|
df: DataFrame المراد كتابته
|
|
file_path: مسار ملف Excel
|
|
sheet_name: اسم الصفحة (افتراضي: Sheet1)
|
|
index: ما إذا كان سيتم تضمين الفهرس (افتراضي: False)
|
|
freeze_panes: صف وعمود لتجميد الألواح (افتراضي: None)
|
|
column_widths: قاموس لعرض الأعمدة {column_name: width}
|
|
formats: قاموس لتنسيقات الأعمدة {column_name: format_function}
|
|
|
|
الإرجاع:
|
|
True في حالة النجاح
|
|
"""
|
|
try:
|
|
|
|
create_directory_if_not_exists(os.path.dirname(file_path))
|
|
|
|
|
|
writer = pd.ExcelWriter(file_path, engine='xlsxwriter')
|
|
df.to_excel(writer, sheet_name=sheet_name, index=index)
|
|
|
|
|
|
workbook = writer.book
|
|
worksheet = writer.sheets[sheet_name]
|
|
|
|
|
|
header_format = workbook.add_format({
|
|
'bold': True,
|
|
'bg_color': '#CCCCCC',
|
|
'border': 1,
|
|
'align': 'center',
|
|
'valign': 'vcenter',
|
|
'text_wrap': True
|
|
})
|
|
|
|
number_format = workbook.add_format({
|
|
'num_format': '#,##0.00',
|
|
'align': 'right'
|
|
})
|
|
|
|
currency_format = workbook.add_format({
|
|
'num_format': '_-* #,##0.00 [$ريال]_-;-* #,##0.00 [$ريال]_-;_-* "-" [$ريال]_-;_-@_-',
|
|
'align': 'right'
|
|
})
|
|
|
|
date_format = workbook.add_format({
|
|
'num_format': 'yyyy-mm-dd',
|
|
'align': 'center'
|
|
})
|
|
|
|
text_format = workbook.add_format({
|
|
'align': 'right',
|
|
'text_wrap': True
|
|
})
|
|
|
|
|
|
for col_num, value in enumerate(df.columns.values):
|
|
worksheet.write(0, col_num + (1 if index else 0), value, header_format)
|
|
|
|
|
|
if column_widths:
|
|
for col_name, width in column_widths.items():
|
|
if col_name in df.columns:
|
|
col_idx = df.columns.get_loc(col_name) + (1 if index else 0)
|
|
worksheet.set_column(col_idx, col_idx, width)
|
|
else:
|
|
|
|
for col_num, col_name in enumerate(df.columns):
|
|
max_len = df[col_name].astype(str).map(len).max()
|
|
col_len = max(max_len, len(str(col_name))) + 2
|
|
worksheet.set_column(col_num + (1 if index else 0), col_num + (1 if index else 0), col_len)
|
|
|
|
|
|
for row_num in range(len(df)):
|
|
for col_num, col_name in enumerate(df.columns):
|
|
cell_value = df.iloc[row_num, col_num]
|
|
cell_format = text_format
|
|
|
|
|
|
if pd.api.types.is_numeric_dtype(df[col_name].dtype):
|
|
if any(curr in col_name.lower() for curr in ['سعر', 'تكلفة', 'قيمة', 'مبلغ', 'ريال']):
|
|
cell_format = currency_format
|
|
else:
|
|
cell_format = number_format
|
|
elif pd.api.types.is_datetime64_dtype(df[col_name].dtype):
|
|
cell_format = date_format
|
|
|
|
|
|
if formats and col_name in formats:
|
|
custom_format = formats[col_name]
|
|
if callable(custom_format):
|
|
|
|
cell_value = custom_format(cell_value)
|
|
else:
|
|
|
|
cell_format = custom_format
|
|
|
|
worksheet.write(row_num + 1, col_num + (1 if index else 0), cell_value, cell_format)
|
|
|
|
|
|
if freeze_panes:
|
|
worksheet.freeze_panes(*freeze_panes)
|
|
|
|
|
|
writer.close()
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
error_msg = f"خطأ في كتابة ملف Excel: {str(e)}"
|
|
print(error_msg)
|
|
traceback.print_exc()
|
|
raise Exception(error_msg)
|
|
|
|
|
|
def export_to_excel(data, file_path, sheet_name="Sheet1", customize_func=None):
|
|
"""
|
|
تصدير البيانات إلى ملف Excel مع خيارات تخصيص
|
|
|
|
المعلمات:
|
|
data: DataFrame أو قاموس من DataFrames للتصدير
|
|
file_path: مسار ملف Excel
|
|
sheet_name: اسم الصفحة (افتراضي: Sheet1)
|
|
customize_func: دالة لتخصيص المصنف قبل الحفظ (افتراضي: None)
|
|
|
|
الإرجاع:
|
|
True في حالة النجاح
|
|
"""
|
|
try:
|
|
|
|
create_directory_if_not_exists(os.path.dirname(file_path))
|
|
|
|
|
|
writer = pd.ExcelWriter(file_path, engine='xlsxwriter')
|
|
|
|
|
|
if isinstance(data, pd.DataFrame):
|
|
|
|
data.to_excel(writer, sheet_name=sheet_name, index=False)
|
|
elif isinstance(data, dict):
|
|
|
|
for sheet, df in data.items():
|
|
if isinstance(df, pd.DataFrame):
|
|
df.to_excel(writer, sheet_name=sheet, index=False)
|
|
else:
|
|
raise ValueError("البيانات يجب أن تكون DataFrame أو قاموس من DataFrames")
|
|
|
|
|
|
if customize_func and callable(customize_func):
|
|
customize_func(writer)
|
|
|
|
|
|
writer.close()
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
error_msg = f"خطأ في تصدير البيانات إلى Excel: {str(e)}"
|
|
print(error_msg)
|
|
traceback.print_exc()
|
|
raise Exception(error_msg)
|
|
|
|
|
|
def read_sheets_from_excel(file_path):
|
|
"""
|
|
قراءة جميع صفحات ملف Excel
|
|
|
|
المعلمات:
|
|
file_path: مسار ملف Excel
|
|
|
|
الإرجاع:
|
|
قاموس من DataFrames بأسماء الصفحات كمفاتيح
|
|
"""
|
|
try:
|
|
|
|
if not os.path.exists(file_path):
|
|
raise FileNotFoundError(f"الملف غير موجود: {file_path}")
|
|
|
|
|
|
ext = get_file_extension(file_path)
|
|
if ext not in ['.xlsx', '.xls', '.xlsm']:
|
|
raise ValueError(f"نوع الملف غير مدعوم: {ext}. يجب أن يكون الملف بامتداد .xlsx أو .xls أو .xlsm")
|
|
|
|
|
|
excel_file = pd.ExcelFile(file_path)
|
|
sheets = {}
|
|
|
|
for sheet_name in excel_file.sheet_names:
|
|
sheets[sheet_name] = pd.read_excel(excel_file, sheet_name=sheet_name)
|
|
|
|
return sheets
|
|
|
|
except Exception as e:
|
|
error_msg = f"خطأ في قراءة صفحات ملف Excel: {str(e)}"
|
|
print(error_msg)
|
|
traceback.print_exc()
|
|
raise Exception(error_msg)
|
|
|
|
|
|
def create_excel_report(data_dict, file_path, formats=None, column_widths=None, title=None, subtitle=None):
|
|
"""
|
|
إنشاء تقرير Excel متقدم
|
|
|
|
المعلمات:
|
|
data_dict: قاموس من DataFrames للتصدير {sheet_name: DataFrame}
|
|
file_path: مسار ملف Excel
|
|
formats: قاموس للتنسيقات {sheet_name: {column_name: format}}
|
|
column_widths: قاموس لعرض الأعمدة {sheet_name: {column_name: width}}
|
|
title: عنوان التقرير
|
|
subtitle: العنوان الفرعي للتقرير
|
|
|
|
الإرجاع:
|
|
True في حالة النجاح
|
|
"""
|
|
try:
|
|
|
|
create_directory_if_not_exists(os.path.dirname(file_path))
|
|
|
|
|
|
writer = pd.ExcelWriter(file_path, engine='xlsxwriter')
|
|
workbook = writer.book
|
|
|
|
|
|
header_format = workbook.add_format({
|
|
'bold': True,
|
|
'bg_color': '#CCCCCC',
|
|
'border': 1,
|
|
'align': 'center',
|
|
'valign': 'vcenter',
|
|
'text_wrap': True
|
|
})
|
|
|
|
title_format = workbook.add_format({
|
|
'bold': True,
|
|
'font_size': 16,
|
|
'align': 'center',
|
|
'valign': 'vcenter',
|
|
'bg_color': '#E0E0E0',
|
|
'border': 2
|
|
})
|
|
|
|
subtitle_format = workbook.add_format({
|
|
'font_size': 12,
|
|
'align': 'center',
|
|
'valign': 'vcenter',
|
|
'bg_color': '#E0E0E0',
|
|
'border': 1
|
|
})
|
|
|
|
date_format = workbook.add_format({
|
|
'num_format': 'yyyy-mm-dd',
|
|
'align': 'center'
|
|
})
|
|
|
|
number_format = workbook.add_format({
|
|
'num_format': '#,##0.00',
|
|
'align': 'right'
|
|
})
|
|
|
|
currency_format = workbook.add_format({
|
|
'num_format': '_-* #,##0.00 [$ريال]_-;-* #,##0.00 [$ريال]_-;_-* "-" [$ريال]_-;_-@_-',
|
|
'align': 'right'
|
|
})
|
|
|
|
percent_format = workbook.add_format({
|
|
'num_format': '0.00%',
|
|
'align': 'right'
|
|
})
|
|
|
|
text_format = workbook.add_format({
|
|
'align': 'right',
|
|
'text_wrap': True
|
|
})
|
|
|
|
|
|
current_row = 0
|
|
|
|
|
|
if title or subtitle:
|
|
for sheet_name in data_dict.keys():
|
|
worksheet = workbook.add_worksheet(sheet_name)
|
|
current_row = 0
|
|
|
|
if title:
|
|
worksheet.merge_range('A1:J1', title, title_format)
|
|
current_row += 1
|
|
|
|
if subtitle:
|
|
worksheet.merge_range(f'A{current_row + 1}:J{current_row + 1}', subtitle, subtitle_format)
|
|
current_row += 1
|
|
|
|
|
|
current_row += 1
|
|
|
|
|
|
df = data_dict[sheet_name]
|
|
df.to_excel(writer, sheet_name=sheet_name, startrow=current_row, index=False)
|
|
|
|
|
|
for col_num, value in enumerate(df.columns.values):
|
|
worksheet.write(current_row, col_num, value, header_format)
|
|
|
|
|
|
if formats and sheet_name in formats:
|
|
sheet_formats = formats[sheet_name]
|
|
for col_name, fmt in sheet_formats.items():
|
|
if col_name in df.columns:
|
|
col_idx = df.columns.get_loc(col_name)
|
|
for row_num in range(len(df)):
|
|
cell_value = df.iloc[row_num, col_idx]
|
|
worksheet.write(row_num + current_row + 1, col_idx, cell_value, fmt)
|
|
|
|
|
|
if column_widths and sheet_name in column_widths:
|
|
sheet_widths = column_widths[sheet_name]
|
|
for col_name, width in sheet_widths.items():
|
|
if col_name in df.columns:
|
|
col_idx = df.columns.get_loc(col_name)
|
|
worksheet.set_column(col_idx, col_idx, width)
|
|
else:
|
|
|
|
for col_num, col_name in enumerate(df.columns):
|
|
max_len = df[col_name].astype(str).map(len).max()
|
|
col_len = max(max_len, len(str(col_name))) + 2
|
|
worksheet.set_column(col_num, col_num, col_len)
|
|
else:
|
|
|
|
for sheet_name, df in data_dict.items():
|
|
df.to_excel(writer, sheet_name=sheet_name, index=False)
|
|
worksheet = writer.sheets[sheet_name]
|
|
|
|
|
|
for col_num, value in enumerate(df.columns.values):
|
|
worksheet.write(0, col_num, value, header_format)
|
|
|
|
|
|
if formats and sheet_name in formats:
|
|
sheet_formats = formats[sheet_name]
|
|
for col_name, fmt in sheet_formats.items():
|
|
if col_name in df.columns:
|
|
col_idx = df.columns.get_loc(col_name)
|
|
for row_num in range(len(df)):
|
|
cell_value = df.iloc[row_num, col_idx]
|
|
worksheet.write(row_num + 1, col_idx, cell_value, fmt)
|
|
|
|
|
|
if column_widths and sheet_name in column_widths:
|
|
sheet_widths = column_widths[sheet_name]
|
|
for col_name, width in sheet_widths.items():
|
|
if col_name in df.columns:
|
|
col_idx = df.columns.get_loc(col_name)
|
|
worksheet.set_column(col_idx, col_idx, width)
|
|
else:
|
|
|
|
for col_num, col_name in enumerate(df.columns):
|
|
max_len = df[col_name].astype(str).map(len).max()
|
|
col_len = max(max_len, len(str(col_name))) + 2
|
|
worksheet.set_column(col_num, col_num, col_len)
|
|
|
|
|
|
writer.close()
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
error_msg = f"خطأ في إنشاء تقرير Excel: {str(e)}"
|
|
print(error_msg)
|
|
traceback.print_exc()
|
|
raise Exception(error_msg)
|
|
|
|
|
|
def extract_data_from_excel(file_path, columns_mapping=None, sheet_name=0, header_row=0, data_start_row=1):
|
|
"""
|
|
استخراج بيانات منظمة من ملف Excel
|
|
|
|
المعلمات:
|
|
file_path: مسار ملف Excel
|
|
columns_mapping: قاموس لتخطيط الأعمدة {اسم_العمود_الجديد: اسم_العمود_الأصلي}
|
|
sheet_name: اسم أو رقم الصفحة (افتراضي: 0)
|
|
header_row: رقم صف العناوين (افتراضي: 0)
|
|
data_start_row: رقم صف بداية البيانات (افتراضي: 1)
|
|
|
|
الإرجاع:
|
|
DataFrame من البيانات المستخرجة
|
|
"""
|
|
try:
|
|
|
|
df = pd.read_excel(
|
|
file_path,
|
|
sheet_name=sheet_name,
|
|
header=header_row,
|
|
skiprows=range(1, data_start_row) if data_start_row > 1 else None
|
|
)
|
|
|
|
|
|
df.columns = df.columns.str.strip()
|
|
|
|
|
|
if columns_mapping:
|
|
|
|
missing_columns = [col for col in columns_mapping.values() if col not in df.columns]
|
|
if missing_columns:
|
|
raise ValueError(f"الأعمدة التالية غير موجودة في الملف: {', '.join(missing_columns)}")
|
|
|
|
|
|
df = df.rename(columns={v: k for k, v in columns_mapping.items()})
|
|
|
|
|
|
df = df[list(columns_mapping.keys())]
|
|
|
|
|
|
for col in df.columns:
|
|
|
|
if df[col].dtype == 'object':
|
|
df[col] = df[col].astype(str).str.strip()
|
|
|
|
|
|
try:
|
|
df[col] = pd.to_numeric(df[col], errors='ignore')
|
|
except:
|
|
pass
|
|
|
|
return df
|
|
|
|
except Exception as e:
|
|
error_msg = f"خطأ في استخراج البيانات من ملف Excel: {str(e)}"
|
|
print(error_msg)
|
|
traceback.print_exc()
|
|
raise Exception(error_msg) |