approval / app.py
EricGEGE's picture
Update app.py
f6be584 verified
raw
history blame
23.6 kB
import pandas as pd
import gradio as gr
from datetime import datetime
import os
import json
import pymysql
from sqlalchemy import create_engine, Column, Integer, String, Float, Text, DateTime, inspect
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 阿里云数据库设置
DB_HOST = os.getenv('DB_HOST') # 阿里云数据库主机地址
DB_PORT = int(os.getenv('DB_PORT', '3306')) # 数据库端口,默认3306
DB_USER = os.getenv('DB_USER') # 数据库用户名
DB_PASSWORD = os.getenv('DB_PASSWORD') # 数据库密码
DBNAME = os.getenv('DB_PASSWORD')
DB_NAME = os.getenv('DBNAME') # 数据库名称
log = os.getenv('USER') # 登录用户名
password = os.getenv('PASSWORD') # 登录密码
# 创建数据库连接
def get_db_connection():
try:
connection = pymysql.connect(
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASSWORD,
database=DB_NAME,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
return connection
except Exception as e:
print(f"数据库连接错误: {e}")
return None
# 创建SQLAlchemy引擎和会话
def get_db_session():
try:
engine = create_engine(f'mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}')
Session = sessionmaker(bind=engine)
return Session(), engine
except Exception as e:
print(f"SQLAlchemy会话创建错误: {e}")
return None, None
# 定义数据库模型
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
role = Column(String(20), nullable=False)
def to_dict(self):
return {
'id': self.id,
'username': self.username,
'role': self.role
}
class PurchaseOrder(Base):
__tablename__ = 'purchase_orders'
id = Column(Integer, primary_key=True, autoincrement=True)
付款抬头 = Column(String(100))
合同号 = Column(String(50))
运编号 = Column(String(50))
采购编号 = Column(String(50), unique=True)
车辆信息 = Column(Text)
销售合同价 = Column(Float)
采购价 = Column(Float)
费用名称 = Column(String(50))
费用金额 = Column(Float)
采购款收款抬头 = Column(String(100))
采购款收款账号信息 = Column(Text)
业务员 = Column(String(50))
客户名称 = Column(String(100))
客户国家 = Column(String(50))
在途状态 = Column(String(50))
开票类型 = Column(String(20))
收款状态 = Column(String(50))
审批状态 = Column(String(50))
退款说明 = Column(Text)
备注 = Column(Text)
timestamp = Column(DateTime, default=datetime.now)
毛利 = Column(Float)
def to_dict(self):
return {
'id': self.id,
'付款抬头': self.付款抬头,
'合同号': self.合同号,
'运编号': self.运编号,
'采购编号': self.采购编号,
'车辆信息': self.车辆信息,
'销售合同价': self.销售合同价,
'采购价': self.采购价,
'费用名称': self.费用名称,
'费用金额': self.费用金额,
'采购款收款抬头': self.采购款收款抬头,
'采购款收款账号信息': self.采购款收款账号信息,
'业务员': self.业务员,
'客户名称': self.客户名称,
'客户国家': self.客户国家,
'在途状态': self.在途状态,
'开票类型': self.开票类型,
'收款状态': self.收款状态,
'审批状态': self.审批状态,
'退款说明': self.退款说明,
'备注': self.备注,
'timestamp': self.timestamp.strftime('%Y-%m-%d') if self.timestamp else None,
'毛利': self.毛利
}
# 初始化数据库
def init_database():
session, engine = get_db_session()
if not session or not engine:
return False
try:
# 检查表是否存在
conn = engine.connect()
inspector = inspect(engine)
# 创建表(如果不存在)
if not inspector.has_table('users'):
print("创建users表")
User.__table__.create(engine)
if not inspector.has_table('purchase_orders'):
print("创建purchase_orders表")
PurchaseOrder.__table__.create(engine)
# 检查用户表是否为空,如果为空则添加初始用户
if session.query(User).count() == 0:
new_users = [
User(id=1, username='cevcaigou159', role='采购'),
User(id=2, username='cevdanzheng789', role='单证'),
User(id=3, username='cevcaiwu369', role='财务'),
User(id=4, username='cevboss888', role='总经理'),
User(id=5, username='cevcaigou357', role='采购'),
User(id=6, username='investor888', role='观察员'),
User(id=7, username='cevcaigou888', role='采购经理'),
User(id=8, username='cevcaigou005', role='采购'),
User(id=9, username='cevchuna001', role='出纳'),
User(id=10, username='cevcaigou359', role='采购'),
User(id=11, username='cevcaigou288', role='采购')
]
session.add_all(new_users)
session.commit()
session.close()
return True
except Exception as e:
print(f"初始化数据库错误: {e}")
session.rollback()
session.close()
return False
# 初始化数据库
init_database()
# 审批流程控制
def get_next_approval_status(current_status):
try:
if current_status == '待采购经理/总经理审批':
return '待财务审批'
elif current_status == '待财务审批':
return '已批准'
elif current_status == '已批准':
return '已付款'
return current_status
except Exception as e:
return "操作有误: {}".format(e)
def approve_order(order_id, approver_role):
try:
session, _ = get_db_session()
if not session:
return '数据库连接失败'
order = session.query(PurchaseOrder).filter(PurchaseOrder.采购编号 == order_id).first()
if not order:
session.close()
return '订单未找到'
current_status = order.审批状态
if (current_status == '待采购经理/总经理审批' and approver_role not in ['采购经理', '总经理']) or \
(current_status == '待财务审批' and approver_role != '财务') or \
(current_status == '待出纳审批' and approver_role != '出纳'):
session.close()
return f'只有{current_status}角色可以批准当前订单'
next_status = get_next_approval_status(current_status)
order.审批状态 = next_status
order.timestamp = datetime.now()
session.commit()
session.close()
return '订单已批准'
except Exception as e:
if session:
session.rollback()
session.close()
return "操作有误: {}".format(e)
# 全局会话状态
session_state = {'user_id': None, 'username': None, 'role': None, 'order_id': None}
def login(username):
try:
session, _ = get_db_session()
if not session:
return '数据库连接失败'
user = session.query(User).filter(User.username == username).first()
if user:
session_state['user_id'] = user.id
session_state['username'] = user.username
session_state['role'] = user.role
session.close()
return "登录成功,用户:{}".format(username)
else:
session.close()
return "用户未找到"
except Exception as e:
if session:
session.close()
return "操作有误: {}".format(e)
def submit_order1(
username, 付款抬头,
运编号, 车辆信息, 销售合同价, 采购价,
费用名称, 费用金额, 采购款收款抬头, 采购款收款账号信息,
在途状态, 开票类型,
收款状态, 退款说明, 备注
):
try:
global profit
login(username)
if session_state['role'] not in ['采购', '采购经理', '单证']:
return "只有采购角色可以提交订单"
session, _ = get_db_session()
if not session:
return '数据库连接失败'
# 生成采购订单编号
prefix = 'DING00000'
if session_state['username'] == 'cevcaigou159':
prefix = 'SHEN00000'
elif session_state['username'] == 'cevcaigou888':
prefix = 'NIU00000'
elif session_state['username'] == 'cevdanzheng789':
prefix = 'CHEN00000'
elif session_state['username'] == 'cevcaigou359':
prefix = 'ZHUANG00000'
elif session_state['username'] == 'cevcaigou288':
prefix = 'GU00000'
# 查询已有订单数量
existing_count = session.query(PurchaseOrder).filter(
PurchaseOrder.采购编号.like(f"{prefix}%")
).count()
n = existing_count + 900
i = n + 1
order_name = f"{prefix}{i}"
# 查找现有运编号记录
existing_order = session.query(PurchaseOrder).filter(
PurchaseOrder.运编号 == 运编号
).first()
if not existing_order:
session.close()
return "未找到相关运编号记录"
profit = float(销售合同价) * 7 - float(采购价) + float(采购价) * 0.115
# 创建新订单
new_order = PurchaseOrder(
付款抬头=付款抬头,
合同号=existing_order.合同号,
运编号=运编号,
采购编号=order_name,
车辆信息=车辆信息,
销售合同价=float(销售合同价),
采购价=float(采购价),
费用名称=费用名称,
费用金额=float(费用金额) if 费用金额 else 0,
采购款收款抬头=采购款收款抬头,
采购款收款账号信息=采购款收款账号信息,
业务员=existing_order.业务员,
客户名称=existing_order.客户名称,
客户国家=existing_order.客户国家,
在途状态=在途状态,
开票类型=开票类型,
收款状态=收款状态,
审批状态='待采购经理/总经理审批',
退款说明=退款说明,
备注=备注,
timestamp=datetime.now(),
毛利=profit
)
session.add(new_order)
session.commit()
session.close()
session_state.clear()
return "订单已提交"
except Exception as e:
if session:
session.rollback()
session.close()
return "操作有误: {}".format(e)
def submit_order2(
username,
付款抬头, 合同号, 运编号, 车辆信息, 销售合同价, 采购价,
费用名称, 费用金额, 采购款收款抬头, 采购款收款账号信息,
业务员, 客户名称, 客户国家, 在途状态, 开票类型,
收款状态, 退款说明, 备注
):
try:
global profit
login(username)
if session_state['role'] not in ['采购', '采购经理', '单证']:
return "只有采购角色可以提交订单"
session, _ = get_db_session()
if not session:
return '数据库连接失败'
# 生成采购订单编号
prefix = 'DING00000'
if session_state['username'] == 'cevcaigou159':
prefix = 'SHEN00000'
elif session_state['username'] == 'cevcaigou888':
prefix = 'NIU00000'
elif session_state['username'] == 'cevdanzheng789':
prefix = 'CHEN00000'
elif session_state['username'] == 'cevcaigou359':
prefix = 'ZHUANG00000'
elif session_state['username'] == 'cevcaigou288':
prefix = 'GU00000'
# 查询已有订单数量
existing_count = session.query(PurchaseOrder).filter(
PurchaseOrder.采购编号.like(f"{prefix}%")
).count()
n = existing_count + 900
i = n + 1
order_name = f"{prefix}{i}"
profit = float(销售合同价) * 7.25 - float(采购价) + float(采购价) * 0.115
# 创建新订单
new_order = PurchaseOrder(
付款抬头=付款抬头,
合同号=合同号,
运编号=运编号,
采购编号=order_name,
车辆信息=车辆信息,
销售合同价=float(销售合同价),
采购价=float(采购价),
费用名称=费用名称,
费用金额=float(费用金额) if 费用金额 else 0,
采购款收款抬头=采购款收款抬头,
采购款收款账号信息=采购款收款账号信息,
业务员=业务员,
客户名称=客户名称,
客户国家=客户国家,
在途状态=在途状态,
开票类型=开票类型,
收款状态=收款状态,
审批状态='待采购经理/总经理审批',
退款说明=退款说明,
备注=备注,
timestamp=datetime.now(),
毛利=profit
)
session.add(new_order)
session.commit()
session.close()
session_state.clear()
return "订单已提交"
except Exception as e:
if session:
session.rollback()
session.close()
return "操作有误: {}".format(e)
def format_order(order):
try:
return f"""
<table border="1">
<tr><th>审批状态</b></th><td><b>{order['审批状态']}</b></td></tr>
<tr><th>付款抬头</th><td>{order['付款抬头']}</td></tr>
<tr><th>合同号</th><td>{order['合同号']}</td></tr>
<tr><th>运编号</th><td>{order['运编号']}</td></tr>
<tr><th>采购编号</th><td>{order['采购编号']}</td></tr>
<tr><th>车辆信息</th><td>{order['车辆信息']}</td></tr>
<tr><th>销售合同价(FOB美元)</th><td>{order['销售合同价']}</td></tr>
<tr><th>采购价(人民币)</th><td>{order['采购价']}</td></tr>
<tr><th>费用名称</th><td>{order['费用名称']}</td></tr>
<tr><th>费用金额</th><td>{order['费用金额']}</td></tr>
<tr><th>采购款收款抬头</th><td>{order['采购款收款抬头']}</td></tr>
<tr><th>采购款收款账号信息</th><td>{order['采购款收款账号信息']}</td></tr>
<tr><th>业务员</th><td>{order['业务员']}</td></tr>
<tr><th>客户名称</th><td>{order['客户名称']}</td></tr>
<tr><th>客户国家</th><td>{order['客户国家']}</td></tr>
<tr><th>在途状态</th><td>{order['在途状态']}</td></tr>
<tr><th>开票类型</th><td>{order['开票类型']}</td></tr>
<tr><th>收款状态</th><td>{order['收款状态']}</td></tr>
<tr><th>退款说明</th><td>{order['退款说明']}</td></tr>
<tr><th>备注</th><td>{order['备注']}</td></tr>
<tr><th>时间</th><td>{order['timestamp']}</td></tr>
<tr><th>毛利</th><td>{order['毛利']}</td></tr>
<tr><th>当前身份</th><td>{session_state['username']}</td></tr>
</table>
"""
except Exception as e:
return "操作有误: {}".format(e)
def display_orders(username):
try:
login(username)
allowed_roles = [
'采购', '单证', '财务',
'总经理', '观察员', '采购经理', '出纳'
]
if session_state['role'] not in allowed_roles:
return ["请登录后查看订单"]
session, _ = get_db_session()
if not session:
return ["数据库连接失败"]
# 将排序条件从timestamp改为id,并使用desc()实现倒序
orders = session.query(PurchaseOrder).order_by(PurchaseOrder.id.desc()).all()
order_list = [format_order(order.to_dict()) for order in orders]
session.close()
session_state.clear()
return order_list
except Exception as e:
if session:
session.close()
return ["操作有误: {}".format(e)]
def approve_order_interface(username, order_id):
try:
login(username)
allowed_roles = ['财务', '总经理', '采购经理', '出纳']
if session_state['role'] not in allowed_roles:
return "您没有批准订单的权限"
role1 = session_state['role']
result = approve_order(order_id, role1)
session_state.clear()
return result
except Exception as e:
return "操作有误: {}".format(e)
with gr.Blocks() as app:
with gr.TabItem("提交费用(旧运编号)"):
username_submit = gr.Textbox(label="用户名")
付款抬头 = gr.Textbox(label="付款抬头")
运编号 = gr.Textbox(label="运编号")
车辆信息 = gr.Textbox(label="车辆信息")
销售合同价 = gr.Textbox(label="销售合同价(FOB美元)")
采购价 = gr.Textbox(label="采购价(人民币)")
费用名称 = gr.Dropdown(
choices=[
'采车定金', '采车定金(抵车款)', '采车定金(全额退回)', '采车定金(部分退回)',
'采车尾款(含背户费)', '采车尾款', '背户手续费', '系统刷机费', '充电桩费',
'保险费(短期交强)', '保险费(长期交强)', '保险费(全险)', '国内运费(市内)', '国内运费',
'购置税费', '仓储费', '检测报告费', '车辆用品费', '车辆改装费', '港杂费', '海运费', '海运费+港杂费',
'其他费用'
],
label="费用名称")
费用金额 = gr.Textbox(label="费用金额")
采购款收款抬头 = gr.Textbox(label="采购款收款抬头")
采购款收款账号信息 = gr.Textbox(label="采购款收款账号信息")
在途状态 = gr.Dropdown(
choices=[
'工厂', '4S店', '车管所',
'改装店', '运输中', '物流站点', '货代仓库', '装箱',
'发运', '到达目的港'
],
label="在途状态")
开票类型 = gr.Dropdown(
choices=[
'增票', '普票', '低开', '无票'
],
label="开票类型")
收款状态 = gr.Textbox(label="收款状态")
退款说明 = gr.Textbox(label="退款说明")
备注 = gr.Textbox(label="备注")
submit_button = gr.Button("提交订单")
submit_output = gr.HTML(label="输出")
submit_button.click(fn=submit_order1, inputs=[
username_submit, 付款抬头,运编号,车辆信息, 销售合同价, 采购价,
费用名称, 费用金额, 采购款收款抬头, 采购款收款账号信息,
在途状态, 开票类型,
收款状态, 退款说明, 备注
], outputs=submit_output)
with gr.TabItem("提交订单(新运编号)"):
username_submit = gr.Textbox(label="用户名")
运编号_submit = gr.Textbox(label="运编号")
付款抬头 = gr.Textbox(label="付款抬头")
合同号 = gr.Textbox(label="合同号")
车辆信息 = gr.Textbox(label="车辆信息")
销售合同价 = gr.Textbox(label="销售合同价(FOB美元)")
采购价 = gr.Textbox(label="采购价(人民币)")
费用名称 = gr.Dropdown(
choices=[
'采车定金', '采车定金(抵车款)', '采车定金(全额退回)', '采车定金(部分退回)',
'采车尾款(含背户费)', '采车尾款', '背户手续费', '系统刷机费', '充电桩费',
'保险费(短期交强)', '保险费(长期交强)', '保险费(全险)', '国内运费(市内)', '国内运费',
'购置税费', '仓储费', '检测报告费', '车辆用品费', '车辆改装费', '港杂费', '海运费', '海运费+港杂费',
'其他费用'
],
label="费用名称")
费用金额 = gr.Textbox(label="费用金额")
采购款收款抬头 = gr.Textbox(label="采购款收款抬头")
采购款收款账号信息 = gr.Textbox(label="采购款收款账号信息")
业务员 = gr.Textbox(label="业务员")
客户名称 = gr.Textbox(label="客户名称")
客户国家 = gr.Textbox(label="客户国家")
在途状态 = gr.Dropdown(
choices=[
'工厂', '4S店', '车管所',
'改装店', '运输中', '物流站点', '货代仓库', '装箱',
'发运', '到达目的港'
],
label="在途状态")
开票类型 = gr.Dropdown(
choices=[
'增票', '普票', '低开', '无票'
],
label="开票类型")
收款状态 = gr.Textbox(label="收款状态")
退款说明 = gr.Textbox(label="退款说明")
备注 = gr.Textbox(label="备注")
submit_button = gr.Button("提交订单")
submit_output = gr.HTML(label="输出")
submit_button.click(fn=submit_order2, inputs=[
username_submit, 付款抬头, 合同号, 运编号_submit, 车辆信息, 销售合同价, 采购价,
费用名称, 费用金额, 采购款收款抬头, 采购款收款账号信息,
业务员, 客户名称, 客户国家, 在途状态, 开票类型,
收款状态, 退款说明, 备注
], outputs=submit_output)
with gr.TabItem("所有订单"):
username_display = gr.Textbox(label="用户名")
display_button = gr.Button("显示所有订单")
orders_output = gr.HTML(label="订单")
display_button.click(fn=display_orders, inputs=username_display, outputs=orders_output)
with gr.TabItem("批准订单"):
username_approve = gr.Textbox(label="用户名")
order_id = gr.Textbox(label="采购编号")
approve_order_button = gr.Button("批准订单")
approve_order_output = gr.HTML(label="输出")
approve_order_button.click(fn=approve_order_interface, inputs=[username_approve, order_id],
outputs=approve_order_output)
app.launch(auth=(log, password))