Wahbi-AI / modules /data_analysis /data_analysis_app.py
EGYADMIN's picture
Update modules/data_analysis/data_analysis_app.py
ba1993f verified
raw
history blame
15.5 kB
import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime
import io
import os
import json
from pathlib import Path
class DataAnalysisApp:
def __init__(self):
self.data = None
self.file_path = None
def render(self):
"""عرض واجهة تحليل البيانات"""
st.title("تحليل البيانات")
self.run()
# تهيئة حالة الجلسة
if 'analysis_data' not in st.session_state:
st.session_state.analysis_data = {
'uploaded_files': {},
'analysis_results': {},
'ai_insights': {}
}
def run(self):
st.title("تحليل البيانات المتقدم")
# إنشاء التبويبات
tabs = st.tabs([
"تحميل وإدارة البيانات",
"التحليل الإحصائي",
"التحليل المرئي",
"تحليل الذكاء الاصطناعي",
"التقارير"
])
with tabs[0]:
self._render_data_management()
with tabs[1]:
self._render_statistical_analysis()
with tabs[2]:
self._render_visualization()
with tabs[3]:
self._render_ai_analysis()
with tabs[4]:
self._render_reports()
def _render_data_management(self):
st.header("تحميل وإدارة البيانات")
# منطقة تحميل الملفات
uploaded_files = st.file_uploader(
"قم بتحميل ملفات البيانات",
type=["csv", "xlsx", "xls", "pdf"],
accept_multiple_files=True,
key="data_files"
)
if uploaded_files:
for file in uploaded_files:
try:
if file.name.endswith('.pdf'):
import PyPDF2
pdf_reader = PyPDF2.PdfReader(file)
text_content = ""
for page in pdf_reader.pages:
text_content += page.extract_text()
st.session_state.analysis_data['uploaded_files'][file.name] = {
'data': text_content,
'metadata': {
'pages': len(pdf_reader.pages),
'upload_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
}
st.success(f"تم تحميل الملف PDF {file.name} بنجاح!")
elif file.name.endswith('.csv'):
df = pd.read_csv(file)
st.session_state.analysis_data['uploaded_files'][file.name] = {
'data': df,
'metadata': {
'rows': len(df),
'columns': len(df.columns),
'upload_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
}
st.success(f"تم تحميل الملف {file.name} بنجاح!")
else:
df = pd.read_excel(file)
st.session_state.analysis_data['uploaded_files'][file.name] = {
'data': df,
'metadata': {
'rows': len(df),
'columns': len(df.columns),
'upload_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
}
st.success(f"تم تحميل الملف {file.name} بنجاح!")
except Exception as e:
st.error(f"خطأ في تحميل الملف {file.name}: {str(e)}")
# عرض الملفات المحملة
if st.session_state.analysis_data['uploaded_files']:
st.subheader("الملفات المحملة")
for filename, file_info in st.session_state.analysis_data['uploaded_files'].items():
with st.expander(f"📄 {filename}"):
st.write("معلومات الملف:")
if 'rows' in file_info['metadata']:
st.write(f"- عدد الصفوف: {file_info['metadata']['rows']}")
if 'columns' in file_info['metadata']:
st.write(f"- عدد الأعمدة: {file_info['metadata']['columns']}")
if 'pages' in file_info['metadata']:
st.write(f"- عدد الصفحات: {file_info['metadata']['pages']}")
st.write(f"- وقت التحميل: {file_info['metadata']['upload_time']}")
if st.button(f"عرض البيانات", key=f"show_{filename}"):
st.write(file_info['data'].head() if isinstance(file_info['data'], pd.DataFrame) else file_info['data'])
def _render_statistical_analysis(self):
st.header("التحليل الإحصائي")
if not st.session_state.analysis_data['uploaded_files']:
st.info("الرجاء تحميل البيانات أولاً")
return
# اختيار الملف للتحليل
selected_file = st.selectbox(
"اختر الملف للتحليل",
list(st.session_state.analysis_data['uploaded_files'].keys())
)
if selected_file:
data = st.session_state.analysis_data['uploaded_files'][selected_file]['data']
if isinstance(data, pd.DataFrame):
# الإحصاءات الوصفية
st.subheader("الإحصاءات الوصفية")
st.dataframe(data.describe())
# تحليل القيم المفقودة
st.subheader("تحليل القيم المفقودة")
missing_data = pd.DataFrame({
'العمود': data.columns,
'عدد القيم المفقودة': data.isnull().sum(),
'نسبة القيم المفقودة (%)': (data.isnull().sum() / len(data) * 100).round(2)
})
st.dataframe(missing_data)
else:
st.info("لا يمكن إجراء تحليل إحصائي على ملفات PDF.")
def _render_visualization(self):
st.header("التحليل المرئي")
if not st.session_state.analysis_data['uploaded_files']:
st.info("الرجاء تحميل البيانات أولاً")
return
selected_file = st.selectbox(
"اختر الملف للتحليل المرئي",
list(st.session_state.analysis_data['uploaded_files'].keys()),
key="viz_file_select"
)
if selected_file:
data = st.session_state.analysis_data['uploaded_files'][selected_file]['data']
if isinstance(data, pd.DataFrame):
# اختيار نوع المخطط
chart_type = st.selectbox(
"اختر نوع المخطط",
["رسم بياني شريطي", "رسم بياني خطي", "رسم بياني دائري", "مخطط التشتت", "مخطط الصندوق"],
key="chart_type"
)
# تخصيص المخطط
if chart_type == "رسم بياني شريطي":
x_col = st.selectbox("اختر محور X", data.columns, key="bar_x")
y_col = st.selectbox("اختر محور Y", data.select_dtypes(include=['number']).columns, key="bar_y")
fig = px.bar(data, x=x_col, y=y_col)
st.plotly_chart(fig, use_container_width=True)
elif chart_type == "رسم بياني خطي":
x_col = st.selectbox("اختر محور X", data.columns, key="line_x")
y_cols = st.multiselect("اختر محاور Y", data.select_dtypes(include=['number']).columns, key="line_y")
if y_cols:
fig = go.Figure()
for y_col in y_cols:
fig.add_trace(go.Scatter(x=data[x_col], y=data[y_col], mode='lines+markers', name=y_col))
fig.update_layout(title=f"مخطط خطي", xaxis_title=x_col, yaxis_title="القيمة")
st.plotly_chart(fig, use_container_width=True)
else:
st.warning("الرجاء اختيار عمود واحد على الأقل للمحور الرأسي")
elif chart_type == "رسم بياني دائري":
col = st.selectbox("اختر العمود", data.columns, key="pie_column")
value_counts_df = data[col].value_counts().reset_index()
value_counts_df.columns = ['القيمة', 'العدد']
fig = px.pie(value_counts_df, names='القيمة', values='العدد', title=f"توزيع {col}")
st.plotly_chart(fig, use_container_width=True)
elif chart_type == "مخطط التشتت":
numeric_columns = data.select_dtypes(include=['number']).columns.tolist()
if len(numeric_columns) < 2:
st.warning("يجب أن يكون هناك عمودان رقميان على الأقل لإنشاء مخطط تشتت")
return
x_column = st.selectbox("اختر عمود المحور الأفقي (x):", numeric_columns, key="scatter_x")
y_column = st.selectbox("اختر عمود المحور الرأسي (y):", numeric_columns, key="scatter_y")
fig = px.scatter(data, x=x_column, y=y_column)
st.plotly_chart(fig, use_container_width=True)
elif chart_type == "مخطط الصندوق":
numeric_columns = data.select_dtypes(include=['number']).columns.tolist()
if not numeric_columns:
st.warning("يجب أن يكون هناك عمود رقمي واحد على الأقل لإنشاء مخطط صندوقي")
return
y_column = st.selectbox("اختر عمود القيمة:", numeric_columns, key="box_y")
fig = px.box(data, y=y_column, title=f"مخطط صندوقي لـ {y_column}")
st.plotly_chart(fig, use_container_width=True)
else:
st.info("لا يمكن إنشاء مخططات مرئية من ملفات PDF.")
def _render_ai_analysis(self):
st.header("تحليل الذكاء الاصطناعي")
if not st.session_state.analysis_data['uploaded_files']:
st.info("الرجاء تحميل البيانات أولاً")
return
selected_file = st.selectbox(
"اختر الملف للتحليل",
list(st.session_state.analysis_data['uploaded_files'].keys()),
key="ai_file_select"
)
if selected_file:
data = st.session_state.analysis_data['uploaded_files'][selected_file]['data']
if isinstance(data, pd.DataFrame):
analysis_type = st.selectbox(
"اختر نوع التحليل",
["تحليل الاتجاهات", "التنبؤ", "اكتشاف الأنماط", "تحليل العلاقات"]
)
if st.button("تحليل البيانات"):
with st.spinner("جاري تحليل البيانات..."):
# محاكاة تحليل الذكاء الاصطناعي
st.session_state.analysis_data['ai_insights'][selected_file] = {
'trends': self._analyze_trends(data),
'patterns': self._analyze_patterns(data),
'correlations': self._analyze_correlations(data)
}
st.success("تم اكتمال التحليل!")
# عرض النتائج
st.json(st.session_state.analysis_data['ai_insights'][selected_file])
else:
st.info("لا يمكن إجراء تحليل ذكاء اصطناعي على ملفات PDF.")
def _render_reports(self):
st.header("التقارير")
if not st.session_state.analysis_data['uploaded_files']:
st.info("الرجاء تحميل البيانات أولاً")
return
# إنشاء تقرير
if st.button("إنشاء تقرير تحليلي شامل"):
report_data = self._generate_comprehensive_report()
# تصدير التقرير
output = io.BytesIO()
with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
for sheet_name, data in report_data.items():
pd.DataFrame(data).to_excel(writer, sheet_name=sheet_name)
st.download_button(
label="تحميل التقرير",
data=output.getvalue(),
file_name=f"analytical_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx",
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)
def _analyze_trends(self, df):
"""تحليل الاتجاهات في البيانات"""
# محاكاة تحليل الاتجاهات
return {
"trend_1": "اتجاه تصاعدي في المبيعات",
"trend_2": "انخفاض في التكاليف التشغيلية",
"trend_3": "زيادة في رضا العملاء"
}
def _analyze_patterns(self, df):
"""اكتشاف الأنماط في البيانات"""
# محاكاة اكتشاف الأنماط
return {
"pattern_1": "نمط موسمي في الطلب",
"pattern_2": "نمط دوري في الإنتاج",
"pattern_3": "نمط جغرافي في التوزيع"
}
def _analyze_correlations(self, df):
"""تحليل العلاقات بين المتغيرات"""
# محاكاة تحليل العلاقات
numeric_cols = df.select_dtypes(include=['number']).columns
correlations = df[numeric_cols].corr().round(2).to_dict()
return correlations
def _generate_comprehensive_report(self):
"""إنشاء تقرير شامل"""
report = {
'ملخص_البيانات': {},
'التحليل_الإحصائي': {},
'تحليل_الذكاء_الاصطناعي': {},
'التوصيات': {}
}
for filename, file_info in st.session_state.analysis_data['uploaded_files'].items():
report['ملخص_البيانات'][filename] = file_info['metadata']
if 'ai_insights' in st.session_state.analysis_data:
report['تحليل_الذكاء_الاصطناعي'] = st.session_state.analysis_data['ai_insights']
return report