import sqlite3 import json import uuid import datetime import logging import re import gradio as gr # Setup logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Disable model loading on free tier due to memory constraints TRANSFORMERS_AVAILABLE = False AutoModelForCausalLM = None AutoTokenizer = None tokenizer = None model = None # Database setup conn = sqlite3.connect("erp.db", check_same_thread=False) cursor = conn.cursor() # Create tables cursor.execute(""" CREATE TABLE IF NOT EXISTS chart_of_accounts ( account_id TEXT PRIMARY KEY, account_name TEXT NOT NULL, account_type TEXT NOT NULL, parent_id TEXT, allow_budgeting BOOLEAN, allow_posting BOOLEAN, FOREIGN KEY (parent_id) REFERENCES chart_of_accounts(account_id) ) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS journal_entries ( entry_id TEXT PRIMARY KEY, date TEXT NOT NULL, debit_account_id TEXT NOT NULL, credit_account_id TEXT NOT NULL, amount REAL NOT NULL, description TEXT, FOREIGN KEY (debit_account_id) REFERENCES chart_of_accounts(account_id), FOREIGN KEY (credit_account_id) REFERENCES chart_of_accounts(account_id) ) """) conn.commit() # Debit/Credit rules ACCOUNT_RULES = { "Asset": {"increase": "Debit", "decrease": "Credit"}, "Liability": {"increase": "Credit", "decrease": "Debit"}, "Equity": {"increase": "Credit", "decrease": "Debit"}, "Revenue": {"increase": "Credit", "decrease": "Debit"}, "Expense": {"increase": "Debit", "decrease": "Credit"} } # Initialize chart of accounts def initialize_chart_of_accounts(): accounts = [ ("1", "Assets", "Asset", None, True, False), ("1.1", "Fixed Assets", "Asset", "1", True, False), ("1.1.1", "Plant", "Asset", "1.1", True, True), ("1.1.2", "Machinery", "Asset", "1.1", True, True), ("1.1.3", "Building", "Asset", "1.1", True, True), ("1.2", "Current Assets", "Asset", "1", True, False), ("1.2.1", "Cash", "Asset", "1.2", True, True), ("1.2.2", "Laptop", "Asset", "1.2", True, True), ("1.2.3", "Inventory", "Asset", "1.2", True, True), ("1.2.4", "Accounts Receivable", "Asset", "1.2", True, True), ("1.2.5", "Bank", "Asset", "1.2", True, True), ("2", "Liabilities", "Liability", None, True, False), ("2.1", "Accounts Payable", "Liability", "2", True, True), ("2.2", "Loan Payable", "Liability", "2", True, True), ("3", "Equity", "Equity", None, True, False), ("3.1", "Owner's Equity", "Equity", "3", True, True), ("3.2", "Drawings", "Equity", "3", True, True), ("4", "Revenue", "Revenue", None, True, False), ("4.1", "Sales Revenue", "Revenue", "4", True, True), ("5", "Expenses", "Expense", None, True, False), ("5.1", "Rent Expense", "Expense", "5", True, True), ("5.2", "Salary Expense", "Expense", "5", True, True), ("5.3", "Office Supplies", "Expense", "5", True, True) ] cursor.executemany(""" INSERT OR REPLACE INTO chart_of_accounts (account_id, account_name, account_type, parent_id, allow_budgeting, allow_posting) VALUES (?, ?, ?, ?, ?, ?) """, accounts) conn.commit() logging.info("Chart of accounts initialized.") # Enhanced flexible parser def parse_prompt(prompt, state): logging.info(f"Parsing prompt: {prompt}") if model and tokenizer: try: input_text = f""" Parse the following accounting prompt into a JSON object with: - debit: {{account, type, amount}} - credit: {{account, type, amount}} - payment_method: 'cash', 'credit', 'bank', or null Prompt: {prompt} """ inputs = tokenizer(input_text, return_tensors="pt") outputs = model.generate(**inputs, max_length=300) response = tokenizer.decode(outputs[0], skip_special_tokens=True) return json.loads(response), state except Exception as e: logging.error(f"Model parsing failed: {e}") prompt_lower = prompt.lower().strip() amount = None match = re.search(r'\$?(\d{1,3}(?:,\d{3})*|\d+)(?:\.\d{2})?', prompt_lower) if match: try: amount = float(match.group().replace(',', '').replace('$', '')) except ValueError: return {"error": "Invalid amount format detected. Please use a valid number (e.g., $200 or 200)."}, state if not amount and not state.get("pending_prompt"): return {"error": "No amount found in prompt. Please include an amount (e.g., $200)."}, state account_mappings = { "laptop": ("Laptop", "Asset"), "inventory": ("Inventory", "Asset"), "machinery": ("Machinery", "Asset"), "building": ("Building", "Asset"), "plant": ("Plant", "Asset"), "office supplies": ("Office Supplies", "Expense"), "cash": ("Cash", "Asset"), "bank": ("Bank", "Asset"), "receivable": ("Accounts Receivable", "Asset"), "payable": ("Accounts Payable", "Liability"), "loan": ("Loan Payable", "Liability"), "equity": ("Owner's Equity", "Equity"), "drawings": ("Drawings", "Equity"), "sales": ("Sales Revenue", "Revenue"), "rent": ("Rent Expense", "Expense"), "salary": ("Salary Expense", "Expense"), "wages": ("Salary Expense", "Expense") } debit_account = None debit_type = None credit_account = None credit_type = None payment_method = None if state.get("pending_prompt"): follow_up = prompt_lower if follow_up in ["cash", "credit", "bank"]: payment_method = follow_up parsed = state["pending_parsed"] debit_account = parsed["debit"]["account"] debit_type = parsed["debit"]["type"] amount = parsed["debit"]["amount"] if payment_method == "cash": credit_account, credit_type = "Cash", "Asset" elif payment_method == "bank": credit_account, credit_type = "Bank", "Asset" elif payment_method == "credit": credit_account, credit_type = "Accounts Payable", "Liability" state = {} # Clear state after successful follow-up else: return {"error": "Please specify 'cash', 'credit', or 'bank'."}, state else: # Detect transaction type and accounts for keyword, (account, acc_type) in account_mappings.items(): if keyword in prompt_lower: if keyword in ["bought", "purchased"]: for asset in ["laptop", "inventory", "machinery", "building", "plant", "office supplies"]: if asset in prompt_lower: debit_account, debit_type = account_mappings[asset] break if not debit_account: debit_account, debit_type = "Inventory", "Asset" # Default asset elif keyword in ["paid", "spent"]: for expense in ["rent", "salary", "wages", "office supplies"]: if expense in prompt_lower: debit_account, debit_type = account_mappings[expense] break if not debit_account: debit_account, debit_type = "Expense", "Expense" # Generic expense # Detect payment method in initial prompt if "cash" in prompt_lower: credit_account, credit_type = "Cash", "Asset" payment_method = "cash" elif "bank" in prompt_lower: credit_account, credit_type = "Bank", "Asset" payment_method = "bank" elif "credit" in prompt_lower: credit_account, credit_type = "Accounts Payable", "Liability" payment_method = "credit" elif keyword in ["sold", "sales"]: debit_account, debit_type = "Accounts Receivable", "Asset" credit_account, credit_type = account_mappings["sales"] elif keyword == "drawings": debit_account, debit_type = "Drawings", "Equity" credit_account, credit_type = "Cash", "Asset" elif keyword == "loan": debit_account, debit_type = "Cash", "Asset" credit_account, credit_type = "Loan Payable", "Liability" break if debit_account and not credit_account and not payment_method: return { "status": "clarify", "message": "Was this transaction made with cash, credit, or bank?", "pending_parsed": {"debit": {"account": debit_account, "type": debit_type, "amount": amount}} }, {"pending_prompt": prompt, "pending_parsed": {"debit": {"account": debit_account, "type": debit_type, "amount": amount}}} if debit_account and credit_account: return { "debit": {"account": debit_account, "type": debit_type, "amount": amount}, "credit": {"account": credit_account, "type": credit_type, "amount": amount}, "payment_method": payment_method }, state return {"error": "Couldn’t interpret the transaction. Try 'Paid rent $400' or 'Bought a laptop for $200'."}, state # Generate journal entry def generate_journal_entry(parsed, state): logging.info(f"Generating journal entry with parsed: {parsed}") if "error" in parsed: return parsed["error"], state if parsed.get("status") == "clarify": return parsed["message"], state debit_account = parsed["debit"]["account"] amount = parsed["debit"]["amount"] credit_account = parsed["credit"]["account"] credit_type = parsed["credit"]["type"] cursor.execute("SELECT account_id, account_type, allow_posting FROM chart_of_accounts WHERE account_name = ?", (debit_account,)) debit_result = cursor.fetchone() cursor.execute("SELECT account_id, account_type, allow_posting FROM chart_of_accounts WHERE account_name = ?", (credit_account,)) credit_result = cursor.fetchone() if not debit_result or not credit_result: return "One or both accounts not found.", state if not debit_result[2] or not credit_result[2]: return "Posting not allowed for one or both accounts.", state if debit_result[1] != parsed["debit"]["type"] or credit_result[1] != credit_type: return "Account type mismatch.", state entry_id = str(uuid.uuid4()) date = datetime.datetime.now().isoformat() description = state.get("pending_prompt", "Transaction") try: cursor.execute(""" INSERT INTO journal_entries (entry_id, date, debit_account_id, credit_account_id, amount, description) VALUES (?, ?, ?, ?, ?, ?) """, (entry_id, date, debit_result[0], credit_result[0], amount, description)) conn.commit() logging.info(f"Journal entry created: Debit {debit_account} ${amount}, Credit {credit_account} ${amount}") except sqlite3.Error as e: logging.error(f"Database error: {e}") return "Database error occurred.", state return f"Journal Entry Recorded:\nDebit: {debit_account} ${amount}\nCredit: {credit_account} ${amount}", state # Generate T-account def generate_t_account(account_name): cursor.execute("SELECT account_id FROM chart_of_accounts WHERE account_name = ?", (account_name,)) account_id = cursor.fetchone() if not account_id: logging.error(f"Account {account_name} not found.") return "Account not found." account_id = account_id[0] try: cursor.execute(""" SELECT date, amount, description, 'Debit' as type FROM journal_entries WHERE debit_account_id = ? UNION SELECT date, amount, description, 'Credit' as type FROM journal_entries WHERE credit_account_id = ? ORDER BY date """, (account_id, account_id)) entries = cursor.fetchall() logging.info(f"Retrieved {len(entries)} entries for T-account: {account_name}") except sqlite3.Error as e: logging.error(f"SQL error in generate_t_account: {e}") return "Error retrieving T-account data." t_account = f"T-Account for {account_name}\n{'='*50}\n{'Debit':<20} | {'Credit':<20} | Description\n{'-'*50}\n" debit_total = 0 credit_total = 0 for date, amount, desc, entry_type in entries: if entry_type == "Debit": t_account += f"${amount:<19} | {'':<20} | {desc}\n" debit_total += amount else: t_account += f"{'':<20} | ${amount:<19} | {desc}\n" credit_total += amount t_account += f"{'-'*50}\nTotal Debit: ${debit_total:<10} | Total Credit: ${credit_total}\n" return t_account # Gradio chat function def chat_function(message, history, state=None): if state is None: state = {} initialize_chart_of_accounts() logging.info("Initialized state and chart of accounts") logging.info(f"Received message: {message}") if message.lower().startswith("t-account "): account_name = message[10:].strip() if account_name: response = generate_t_account(account_name) else: response = "Please provide an account name (e.g., 't-account Cash')." else: parsed, state = parse_prompt(message, state) response, state = generate_journal_entry(parsed, state) if history is not None: history.append({"role": "user", "content": message}) history.append({"role": "assistant", "content": response}) else: history = [ {"role": "user", "content": message}, {"role": "assistant", "content": response} ] return history, state, "" # Clear the textbox # Gradio interface with gr.Blocks() as demo: gr.Markdown("# AI ERP System") gr.Markdown("Enter accounting prompts naturally (e.g., 'Paid rent $400' or 't-account Cash'). I’ll guide you like an experienced accountant if clarification is needed.") chatbot = gr.Chatbot(type="messages") msg = gr.Textbox(placeholder="Type your prompt here...", lines=1, submit_btn=None) clear = gr.Button("Clear") state = gr.State({}) msg.submit(chat_function, [msg, chatbot, state], [chatbot, state, msg]) clear.click(lambda: ([], {}, ""), None, [chatbot, state, msg], queue=False) # Launch Gradio if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=7860)