|
import sqlite3 |
|
import json |
|
import uuid |
|
import datetime |
|
import logging |
|
import re |
|
import gradio as gr |
|
|
|
|
|
# Configure root logging: INFO level, each record prefixed with a timestamp
# and its severity.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Placeholders for an optional language-model based parser. Nothing in the
# visible source assigns real objects to these names, so the
# `if model and tokenizer` check in parse_prompt is false and the rule-based
# parser is used.
TRANSFORMERS_AVAILABLE = False
AutoModelForCausalLM = None
AutoTokenizer = None
tokenizer = None
model = None
|
|
|
|
|
# One module-wide connection/cursor pair used by every function in this file.
# check_same_thread=False lets the connection be used from threads other than
# the one that created it.
conn = sqlite3.connect("erp.db", check_same_thread=False)
cursor = conn.cursor()

# Hierarchical chart of accounts; parent_id points at another row of the
# same table.
_CHART_OF_ACCOUNTS_DDL = """
    CREATE TABLE IF NOT EXISTS chart_of_accounts (
        account_id TEXT PRIMARY KEY,
        account_name TEXT NOT NULL,
        account_type TEXT NOT NULL,
        parent_id TEXT,
        allow_budgeting BOOLEAN,
        allow_posting BOOLEAN,
        FOREIGN KEY (parent_id) REFERENCES chart_of_accounts(account_id)
    )
"""

# One row per transaction: a single debit account, a single credit account
# and the shared amount.
_JOURNAL_ENTRIES_DDL = """
    CREATE TABLE IF NOT EXISTS journal_entries (
        entry_id TEXT PRIMARY KEY,
        date TEXT NOT NULL,
        debit_account_id TEXT NOT NULL,
        credit_account_id TEXT NOT NULL,
        amount REAL NOT NULL,
        description TEXT,
        FOREIGN KEY (debit_account_id) REFERENCES chart_of_accounts(account_id),
        FOREIGN KEY (credit_account_id) REFERENCES chart_of_accounts(account_id)
    )
"""

# Create both tables if they are missing, then persist the schema.
cursor.execute(_CHART_OF_ACCOUNTS_DDL)
cursor.execute(_JOURNAL_ENTRIES_DDL)
conn.commit()
|
|
|
|
|
# For each account type, which side of a journal entry increases the balance
# and which side decreases it.
ACCOUNT_RULES = {
    account_type: {"increase": increase_side, "decrease": decrease_side}
    for account_type, increase_side, decrease_side in (
        ("Asset", "Debit", "Credit"),
        ("Liability", "Credit", "Debit"),
        ("Equity", "Credit", "Debit"),
        ("Revenue", "Credit", "Debit"),
        ("Expense", "Debit", "Credit"),
    )
}
|
|
|
|
|
def initialize_chart_of_accounts():
    """Insert (or overwrite) the default chart of accounts and commit.

    Each row is (account_id, account_name, account_type, parent_id,
    allow_budgeting, allow_posting). Every default account allows budgeting;
    only the accounts flagged True below allow posting.
    """
    # (account_id, account_name, allow_posting), grouped by account type.
    layout = {
        "Asset": [
            ("1", "Assets", False),
            ("1.1", "Fixed Assets", False),
            ("1.1.1", "Plant", True),
            ("1.1.2", "Machinery", True),
            ("1.1.3", "Building", True),
            ("1.2", "Current Assets", False),
            ("1.2.1", "Cash", True),
            ("1.2.2", "Laptop", True),
            ("1.2.3", "Inventory", True),
            ("1.2.4", "Accounts Receivable", True),
            ("1.2.5", "Bank", True),
        ],
        "Liability": [
            ("2", "Liabilities", False),
            ("2.1", "Accounts Payable", True),
            ("2.2", "Loan Payable", True),
        ],
        "Equity": [
            ("3", "Equity", False),
            ("3.1", "Owner's Equity", True),
            ("3.2", "Drawings", True),
        ],
        "Revenue": [
            ("4", "Revenue", False),
            ("4.1", "Sales Revenue", True),
        ],
        "Expense": [
            ("5", "Expenses", False),
            ("5.1", "Rent Expense", True),
            ("5.2", "Salary Expense", True),
            ("5.3", "Office Supplies", True),
        ],
    }

    rows = []
    for account_type, members in layout.items():
        for account_id, account_name, postable in members:
            # The parent is the id without its last dotted segment; top-level
            # accounts ("1", "2", ...) have no parent.
            parent_id = account_id.rpartition(".")[0] or None
            rows.append((account_id, account_name, account_type, parent_id, True, postable))

    cursor.executemany(
        """
        INSERT OR REPLACE INTO chart_of_accounts
        (account_id, account_name, account_type, parent_id, allow_budgeting, allow_posting)
        VALUES (?, ?, ?, ?, ?, ?)
        """,
        rows,
    )
    conn.commit()
    logging.info("Chart of accounts initialized.")
|
|
|
|
|
def parse_prompt(prompt, state):
    """Parse an accounting prompt into a debit/credit structure.

    If the module-level ``model`` and ``tokenizer`` are set, the model is
    asked first; on any failure the keyword-based parser below is used.

    Args:
        prompt: Text typed by the user, e.g. "Paid rent $400", or a one-word
            answer ("cash", "credit", "bank") to an earlier clarification.
        state: Conversation state dict. While a clarification is pending it
            holds "pending_prompt" and "pending_parsed".

    Returns:
        A ``(parsed, state)`` tuple where ``parsed`` is one of:
        - ``{"debit": {...}, "credit": {...}, "payment_method": ...}``
        - ``{"status": "clarify", "message": ..., "pending_parsed": ...}``
        - ``{"error": message}``
        and ``state`` is the state to carry into the next call.
    """
    logging.info(f"Parsing prompt: {prompt}")
    if model and tokenizer:
        try:
            input_text = f"""
            Parse the following accounting prompt into a JSON object with:
            - debit: {{account, type, amount}}
            - credit: {{account, type, amount}}
            - payment_method: 'cash', 'credit', 'bank', or null
            Prompt: {prompt}
            """
            inputs = tokenizer(input_text, return_tensors="pt")
            outputs = model.generate(**inputs, max_length=300)
            response = tokenizer.decode(outputs[0], skip_special_tokens=True)
            return json.loads(response), state
        except Exception as e:
            # Best effort: fall through to the rule-based parser.
            logging.error(f"Model parsing failed: {e}")

    prompt_lower = prompt.lower().strip()

    # Payment method -> (credit account, account type). Insertion order is
    # the precedence used when several methods appear in one prompt.
    payment_accounts = {
        "cash": ("Cash", "Asset"),
        "bank": ("Bank", "Asset"),
        "credit": ("Accounts Payable", "Liability"),
    }

    # A pending clarification must be handled BEFORE looking for an amount:
    # the expected replies ("cash", "credit", "bank") contain no "$" figure,
    # so checking for an amount first would reject every valid reply.
    pending_parsed = state.get("pending_parsed")
    pending_debit = pending_parsed.get("debit") if isinstance(pending_parsed, dict) else None
    if state.get("pending_prompt") and pending_debit:
        if prompt_lower not in payment_accounts:
            return {"error": "Please respond with 'cash', 'credit', or 'bank'."}, state
        credit_account, credit_type = payment_accounts[prompt_lower]
        amount = pending_debit["amount"]
        completed = {
            "debit": {
                "account": pending_debit["account"],
                "type": pending_debit["type"],
                "amount": amount,
            },
            "credit": {"account": credit_account, "type": credit_type, "amount": amount},
            "payment_method": prompt_lower,
        }
        # The clarification is resolved, so the pending state is dropped.
        return completed, {}

    amount = None
    match = re.search(r'\$[\d,.]+', prompt_lower)
    if match:
        try:
            amount = float(match.group().replace('$', '').replace(',', ''))
        except ValueError:
            return {"error": "Invalid amount format."}, state

    if not amount:
        return {"error": "No amount found in prompt."}, state

    account_mappings = {
        "laptop": ("Laptop", "Asset"),
        "inventory": ("Inventory", "Asset"),
        "machinery": ("Machinery", "Asset"),
        "building": ("Building", "Asset"),
        "plant": ("Plant", "Asset"),
        "office supplies": ("Office Supplies", "Expense"),
        "cash": ("Cash", "Asset"),
        "bank": ("Bank", "Asset"),
        "receivable": ("Accounts Receivable", "Asset"),
        "sold goods": ("Sales Revenue", "Revenue"),
        "sales": ("Sales Revenue", "Revenue"),
        "rent": ("Rent Expense", "Expense"),
        "salary": ("Salary Expense", "Expense"),
        "paid": ("Cash", "Asset"),
        "bought": ("Laptop", "Asset"),
        "purchased": ("Laptop", "Asset"),
        "owner's draw": ("Drawings", "Equity"),
        "loan": ("Loan Payable", "Liability"),
    }
    # Things that can be named as the object of "bought"/"purchased".
    purchasable = ("laptop", "inventory", "machinery", "building", "plant", "office supplies")

    debit_account = None
    debit_type = None
    credit_account = None
    credit_type = None
    payment_method = None

    # Scan every keyword present in the prompt. Keywords without a branch
    # below (e.g. "laptop", "cash") must not end the scan; otherwise
    # "Bought a laptop for $200" would stop at "laptop" and never reach
    # "bought". Only "paid" ends the scan, since it settles both sides.
    for keyword, (account, acc_type) in account_mappings.items():
        if keyword not in prompt_lower:
            continue
        if keyword in ("bought", "purchased"):
            named_item = next((item for item in purchasable if item in prompt_lower), None)
            if named_item:
                debit_account, debit_type = account_mappings[named_item]
            elif not debit_account:
                # Nothing recognizable was named: fall back to the keyword's
                # default account.
                debit_account, debit_type = account, acc_type
        elif keyword in ("rent", "salary", "office supplies"):
            debit_account, debit_type = account, acc_type
        elif keyword in ("sold goods", "sales"):
            debit_account, debit_type = "Accounts Receivable", "Asset"
            credit_account, credit_type = account, acc_type
        elif keyword == "owner's draw":
            debit_account, debit_type = "Drawings", "Equity"
            credit_account, credit_type = "Cash", "Asset"
        elif keyword == "paid":
            if "rent" in prompt_lower:
                debit_account, debit_type = "Rent Expense", "Expense"
            elif "salary" in prompt_lower:
                debit_account, debit_type = "Salary Expense", "Expense"
            elif "office supplies" in prompt_lower:
                debit_account, debit_type = "Office Supplies", "Expense"
            credit_account, credit_type = "Cash", "Asset"
            break

    if not credit_account:
        for method, (method_account, method_type) in payment_accounts.items():
            if method in prompt_lower:
                credit_account, credit_type = method_account, method_type
                payment_method = method
                break
        else:
            if debit_account:
                # The debit side is known but not how it was paid: ask, and
                # remember the half-finished entry in the returned state.
                return {
                    "status": "clarify",
                    "message": "Was this bought on cash, credit, or bank?",
                    "pending_parsed": {"debit": {"account": debit_account, "type": debit_type, "amount": amount}},
                }, {
                    "pending_prompt": prompt,
                    "pending_parsed": {"debit": {"account": debit_account, "type": debit_type, "amount": amount}},
                }

    if debit_account and credit_account:
        return {
            "debit": {"account": debit_account, "type": debit_type, "amount": amount},
            "credit": {"account": credit_account, "type": credit_type, "amount": amount},
            "payment_method": payment_method,
        }, state
    return {"error": "Prompt not recognized. Try 'Bought a laptop for $200' or 'Paid rent $400'."}, state
|
|
|
|
|
def generate_journal_entry(parsed, state):
    """Validate a parsed transaction and store it as a journal entry.

    Args:
        parsed: Result of parse_prompt. Error and clarification results are
            passed through as their message; a complete result must hold
            "debit" and "credit" dicts with "account", "type" and (for the
            debit) "amount".
        state: Conversation state; its "pending_prompt" value, when present,
            is stored as the entry description (default "Transaction").

    Returns:
        ``(message, state)``: a user-facing message and the state unchanged.
    """
    logging.info(f"Generating journal entry with parsed: {parsed}")
    if not isinstance(parsed, dict):
        # Model-produced JSON is not guaranteed to be an object.
        return "Could not interpret the transaction.", state
    if "error" in parsed:
        return parsed["error"], state
    if parsed.get("status") == "clarify":
        return parsed["message"], state

    try:
        debit_account = parsed["debit"]["account"]
        debit_type = parsed["debit"]["type"]
        amount = parsed["debit"]["amount"]
        credit_account = parsed["credit"]["account"]
        credit_type = parsed["credit"]["type"]
    except (KeyError, TypeError):
        # Incomplete structure: report it instead of crashing the handler.
        return "Could not interpret the transaction.", state

    if isinstance(amount, bool) or not isinstance(amount, (int, float)) or amount <= 0:
        return "Invalid amount.", state

    lookup_sql = (
        "SELECT account_id, account_type, allow_posting "
        "FROM chart_of_accounts WHERE account_name = ?"
    )
    debit_result = cursor.execute(lookup_sql, (debit_account,)).fetchone()
    credit_result = cursor.execute(lookup_sql, (credit_account,)).fetchone()

    if not debit_result or not credit_result:
        return "One or both accounts not found.", state
    if not debit_result[2] or not credit_result[2]:
        return "Posting not allowed for one or both accounts.", state
    if debit_result[1] != debit_type or credit_result[1] != credit_type:
        return "Account type mismatch.", state

    entry_id = str(uuid.uuid4())
    date = datetime.datetime.now().isoformat()
    description = state.get("pending_prompt", "Transaction")
    try:
        cursor.execute(
            """
            INSERT INTO journal_entries (entry_id, date, debit_account_id, credit_account_id, amount, description)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (entry_id, date, debit_result[0], credit_result[0], amount, description),
        )
        conn.commit()
        logging.info(f"Journal entry created: Debit {debit_account} ${amount}, Credit {credit_account} ${amount}")
    except sqlite3.Error as e:
        logging.error(f"Database error: {e}")
        # Close the implicit transaction so the shared connection is clean
        # for the next request.
        conn.rollback()
        return "Database error occurred.", state

    return f"Journal Entry Created: Debit {debit_account} ${amount}, Credit {credit_account} ${amount}", state
|
|
|
|
|
def generate_t_account(account_name):
    """Render a text T-account for one account.

    Args:
        account_name: Name of the account as stored in chart_of_accounts;
            matched case-insensitively so "laptop" finds "Laptop".

    Returns:
        A multi-line string listing every debit and credit posted to the
        account (ordered by date) followed by both totals, or an error
        message if the account is unknown or the query fails.
    """
    cursor.execute(
        "SELECT account_id, account_name FROM chart_of_accounts "
        "WHERE account_name = ? COLLATE NOCASE",
        (account_name,),
    )
    account_row = cursor.fetchone()
    if not account_row:
        logging.error(f"Account {account_name} not found.")
        return "Account not found."

    # Use the stored spelling of the name in the report header.
    account_id, account_name = account_row
    try:
        # UNION ALL, not UNION: two postings with identical date, amount and
        # description are separate transactions and must both be listed.
        cursor.execute(
            """
            SELECT date, amount, description, 'Debit' AS type FROM journal_entries WHERE debit_account_id = ?
            UNION ALL
            SELECT date, amount, description, 'Credit' AS type FROM journal_entries WHERE credit_account_id = ?
            ORDER BY date
            """,
            (account_id, account_id),
        )
        entries = cursor.fetchall()
        logging.info(f"Retrieved {len(entries)} entries for T-account: {account_name}")
    except sqlite3.Error as e:
        logging.error(f"SQL error in generate_t_account: {e}")
        return "Error retrieving T-account data."

    parts = [
        f"T-Account for {account_name}\n{'='*50}\n{'Debit':<20} | {'Credit':<20} | Description\n{'-'*50}\n"
    ]
    debit_total = 0
    credit_total = 0
    for _date, amount, desc, entry_type in entries:
        if entry_type == "Debit":
            parts.append(f"${amount:<19} | {'':<20} | {desc}\n")
            debit_total += amount
        else:
            parts.append(f"{'':<20} | ${amount:<19} | {desc}\n")
            credit_total += amount

    # Amounts are stored as floats; round so sums such as 0.1 + 0.2 do not
    # print binary floating-point noise.
    debit_total = round(debit_total, 2)
    credit_total = round(credit_total, 2)
    parts.append(f"{'-'*50}\nTotal Debit: ${debit_total:<10} | Total Credit: ${credit_total}\n")

    return "".join(parts)
|
|
|
|
|
def _chart_of_accounts_is_empty():
    """Return True when the chart_of_accounts table contains no rows."""
    cursor.execute("SELECT COUNT(*) FROM chart_of_accounts")
    return cursor.fetchone()[0] == 0


def chat_function(message, history, state=None):
    """Handle one chat message and return the updated UI values.

    Messages starting with "t-account " produce a T-account report for the
    named account; anything else is parsed as a transaction prompt and, when
    complete, stored as a journal entry.

    Args:
        message: Text submitted by the user.
        history: List of {"role", "content"} message dicts, or None.
        state: Conversation state dict, or None on a first direct call.

    Returns:
        ``(history, state, "")``: the history with this exchange appended,
        the state for the next call, and an empty string for the textbox.
    """
    # Seed the chart of accounts when called without state (as before) and
    # also whenever the table is empty. The UI always passes a dict, so the
    # `state is None` check alone never seeds a fresh database.
    needs_seed = state is None or _chart_of_accounts_is_empty()
    if state is None:
        state = {}
    if needs_seed:
        initialize_chart_of_accounts()
        logging.info("Initialized state and chart of accounts")

    logging.info(f"Received message: {message}")

    if message.lower().startswith("t-account "):
        # Everything after the 10-character "t-account " prefix is the name.
        account_name = message[10:].strip()
        if account_name:
            response = generate_t_account(account_name)
        else:
            response = "Please specify an account name."
    else:
        # The text that started this transaction: the stored prompt when the
        # message answers a clarification, otherwise the message itself.
        origin_prompt = state.get("pending_prompt") or message
        parsed, state = parse_prompt(message, state)
        if "debit" in parsed and "credit" in parsed:
            # generate_journal_entry reads the entry description from
            # state["pending_prompt"], which is absent once a prompt is fully
            # parsed. Pass it in a throwaway copy so the description is the
            # user's prompt, and keep the real state free of it.
            response, _ = generate_journal_entry(
                parsed, {**state, "pending_prompt": origin_prompt}
            )
        else:
            response, state = generate_journal_entry(parsed, state)

    exchange = [
        {"role": "user", "content": message},
        {"role": "assistant", "content": response},
    ]
    if history is not None:
        history.extend(exchange)
    else:
        history = exchange

    return history, state, ""
|
|
|
|
|
# Build the chat UI: a heading, usage hint, chat window, single-line input
# box and a Clear button.
with gr.Blocks() as demo:
    gr.Markdown("# AI ERP System")
    gr.Markdown("Enter accounting prompts like 'Bought a laptop for $200' or 't-account Laptop'. The system will ask for clarification if needed.")
    chatbot = gr.Chatbot(type="messages")
    msg = gr.Textbox(placeholder="Type your prompt here...", lines=1, submit_btn=None)
    clear = gr.Button("Clear")

    # State value handed to chat_function on each submit and replaced by the
    # state it returns; starts as an empty dict.
    state = gr.State({})

    # Submitting the textbox calls chat_function(msg, chatbot, state); its
    # three return values update the chat window, the state and the textbox
    # (the third value is "", which blanks the textbox).
    msg.submit(chat_function, [msg, chatbot, state], [chatbot, state, msg])
    # Clear resets the chat to an empty list, the state to an empty dict and
    # the textbox to an empty string.
    clear.click(lambda: ([], {}, ""), None, [chatbot, state, msg], queue=False)


if __name__ == "__main__":
    # Serve on all network interfaces, port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)