File size: 10,131 Bytes
e212866 388e3b5 e212866 388e3b5 8568540 388e3b5 8568540 e212866 8568540 e212866 8568540 e212866 8568540 e212866 8568540 e212866 8568540 e212866 388e3b5 e212866 8568540 e212866 8568540 388e3b5 8568540 388e3b5 8568540 e212866 8568540 388e3b5 e212866 8568540 388e3b5 8568540 e212866 8568540 e212866 8568540 e212866 8568540 e212866 8568540 e212866 8568540 e212866 8568540 e212866 8568540 e212866 8568540 e212866 8568540 e212866 388e3b5 e212866 2b37229 e212866 2b37229 e212866 8568540 2b37229 e212866 8568540 e212866 8568540 e212866 8568540 e212866 8568540 e212866 8568540 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 |
import sqlite3
import json
import uuid
import datetime
import logging
# Setup logging: timestamped INFO-level output for the whole script.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Attempt to import transformers. The library is optional: when it is
# missing, both class names are bound to None and the availability flag is
# cleared so later code can take the fallback path.
try:
    from transformers import AutoModelForCausalLM, AutoTokenizer
except ImportError:
    logging.warning("Transformers library not found. Using fallback parser.")
    AutoModelForCausalLM = AutoTokenizer = None
    TRANSFORMERS_AVAILABLE = False
else:
    TRANSFORMERS_AVAILABLE = True
# Initialize AI model (distilbert as placeholder; replace with fine-tuned model or Mistral-7B)
# NOTE(review): distilbert-base-uncased looks like an encoder-style checkpoint;
# AutoModelForCausalLM will presumably refuse it, in which case the except
# branch below runs and both names end up None - confirm before relying on
# the model path.
model_name = "distilbert-base-uncased"  # Lightweight model for demo

# Start from "no model"; only a fully successful load replaces both names.
tokenizer = None
model = None
if TRANSFORMERS_AVAILABLE:
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(model_name)
        logging.info(f"Loaded model: {model_name}")
    except Exception as e:
        logging.error(f"Failed to load model {model_name}: {e}")
        # A tokenizer without a model is useless, so reset the pair together.
        tokenizer = model = None
# Database setup: open (or create) erp.db and keep a single module-level
# connection/cursor pair that the functions in this file share.
conn = sqlite3.connect("erp.db")
cursor = conn.cursor()

# Create tables. Both statements use IF NOT EXISTS, so running this against
# an existing database leaves the current tables in place.
_SCHEMA_STATEMENTS = (
    """
CREATE TABLE IF NOT EXISTS chart_of_accounts (
account_id TEXT PRIMARY KEY,
account_name TEXT NOT NULL,
account_type TEXT NOT NULL,
parent_id TEXT,
allow_budgeting BOOLEAN,
allow_posting BOOLEAN,
FOREIGN KEY (parent_id) REFERENCES chart_of_accounts(account_id)
)
""",
    """
CREATE TABLE IF NOT EXISTS journal_entries (
entry_id TEXT PRIMARY KEY,
date TEXT NOT NULL,
debit_account_id TEXT NOT NULL,
credit_account_id TEXT NOT NULL,
amount REAL NOT NULL,
description TEXT,
FOREIGN KEY (debit_account_id) REFERENCES chart_of_accounts(account_id),
FOREIGN KEY (credit_account_id) REFERENCES chart_of_accounts(account_id)
)
""",
)
for _schema_statement in _SCHEMA_STATEMENTS:
    cursor.execute(_schema_statement)
conn.commit()
# Debit/Credit rules: for each account type, which side of an entry
# increases the balance and which side decreases it.
ACCOUNT_RULES = {
    account_type: {"increase": increase_side, "decrease": decrease_side}
    for account_type, increase_side, decrease_side in (
        ("Asset", "Debit", "Credit"),
        ("Liability", "Credit", "Debit"),
        ("Equity", "Credit", "Debit"),
        ("Revenue", "Credit", "Debit"),
        ("Expense", "Debit", "Credit"),
    )
}
# Initialize chart of accounts
def initialize_chart_of_accounts():
    """Seed the chart_of_accounts table with the built-in account tree.

    Rows are written with INSERT OR REPLACE, so calling this again overwrites
    the seeded accounts instead of failing on the primary key. The change is
    committed on the shared module-level connection.
    """
    # Columns: account_id, account_name, account_type, parent_id,
    # allow_budgeting, allow_posting.
    seed_rows = (
        ("1", "Assets", "Asset", None, True, False),
        ("1.1", "Fixed Assets", "Asset", "1", True, False),
        ("1.1.1", "Plant", "Asset", "1.1", True, True),
        ("1.1.2", "Machinery", "Asset", "1.1", True, True),
        ("1.1.3", "Building", "Asset", "1.1", True, True),
        ("1.2", "Current Assets", "Asset", "1", True, False),
        ("1.2.1", "Cash", "Asset", "1.2", True, True),
        ("1.2.2", "Laptop", "Asset", "1.2", True, True),
        ("2", "Liabilities", "Liability", None, True, False),
        ("2.1", "Accounts Payable", "Liability", "2", True, True),
        ("3", "Equity", "Equity", None, True, False),
        ("3.1", "Owner's Capital", "Equity", "3", True, True),
        ("4", "Revenue", "Revenue", None, True, False),
        ("4.1", "Sales", "Revenue", "4", True, True),
        ("5", "Expenses", "Expense", None, True, False),
        ("5.1", "Operating Expenses", "Expense", "5", True, True),
    )
    upsert_sql = (
        "\nINSERT OR REPLACE INTO chart_of_accounts\n"
        "(account_id, account_name, account_type, parent_id, allow_budgeting, allow_posting)\n"
        "VALUES (?, ?, ?, ?, ?, ?)\n"
    )
    cursor.executemany(upsert_sql, seed_rows)
    conn.commit()
    logging.info("Chart of accounts initialized.")
# Parse prompt using AI model (or fallback)
def _is_valid_parse(parsed):
    """Return True when *parsed* has the shape generate_journal_entry reads."""
    if not isinstance(parsed, dict):
        return False
    debit = parsed.get("debit")
    if not isinstance(debit, dict):
        return False
    if not all(key in debit for key in ("account", "type", "amount")):
        return False
    credit = parsed.get("credit")
    return credit is None or isinstance(credit, dict)


def _extract_amount(text):
    """Return the first positive, finite ``$``-prefixed number in *text*, else None."""
    for word in text.split():
        if not word.startswith("$"):
            continue
        # Tolerate sentence punctuation ("$200.") and thousands separators
        # ("$1,200") that would otherwise make float() fail.
        candidate = word[1:].rstrip(".,;:!?)").replace(",", "")
        try:
            value = float(candidate)
        except ValueError:
            continue
        # float() also accepts "nan" and "inf"; this comparison rejects both,
        # along with zero and negative amounts.
        if 0 < value < float("inf"):
            return value
    return None


def _parse_prompt_fallback(prompt):
    """Keyword-based parser covering the laptop purchase scenarios."""
    prompt_lower = prompt.lower()
    amount = _extract_amount(prompt_lower)
    if amount is None:
        logging.error("No amount found in prompt.")
        return None
    if "laptop" not in prompt_lower:
        logging.error("Prompt not recognized.")
        return None

    debit = {"account": "Laptop", "type": "Asset", "amount": amount}
    if "cash" in prompt_lower:
        credit = {"account": "Cash", "type": "Asset", "amount": amount}
        payment_method = "cash"
    elif "credit" in prompt_lower:
        credit = {"account": "Accounts Payable", "type": "Liability", "amount": amount}
        payment_method = "credit"
    else:
        # Payment method not stated: leave the credit side open so the caller
        # can ask a clarifying question.
        credit = None
        payment_method = None
    return {"debit": debit, "credit": credit, "payment_method": payment_method}


def parse_prompt(prompt):
    """Turn a free-text accounting prompt into a debit/credit description.

    The loaded language model is tried first when both ``model`` and
    ``tokenizer`` are set; if it raises, or its output is not JSON of the
    expected shape, the keyword-based fallback parser is used instead.

    Args:
        prompt: The user's description of the transaction, e.g.
            "Bought a laptop for $200 on cash".

    Returns:
        A dict with keys ``"debit"`` (dict with ``account``, ``type``,
        ``amount``), ``"credit"`` (same shape, or None when the payment
        method is unknown) and ``"payment_method"`` (``"cash"``,
        ``"credit"`` or None); or None when the prompt cannot be parsed.
    """
    if not isinstance(prompt, str) or not prompt.strip():
        logging.error("Prompt not recognized.")
        return None

    if model and tokenizer:
        try:
            input_text = (
                "\nParse the following accounting prompt into a JSON object with:\n"
                "- debit: {account, type, amount}\n"
                "- credit: {account, type, amount}\n"
                "- payment_method: 'cash' or 'credit' or null\n"
                f"Prompt: {prompt}\n"
            )
            inputs = tokenizer(input_text, return_tensors="pt")
            outputs = model.generate(**inputs, max_length=300)
            response = tokenizer.decode(outputs[0], skip_special_tokens=True)
            parsed = json.loads(response)
            # Valid JSON is not enough: the caller indexes into specific keys.
            if _is_valid_parse(parsed):
                return parsed
            logging.warning("Model output has unexpected structure. Using fallback parser.")
        except Exception as e:
            # Best-effort path: any model/tokenizer/JSON failure falls back.
            logging.warning(f"Model parsing failed: {e}. Using fallback parser.")

    # Fallback parsing for common scenarios
    return _parse_prompt_fallback(prompt)
# Generate journal entry
def generate_journal_entry(prompt, follow_up_response=None):
    """Parse *prompt* and post the resulting double-entry journal line.

    Args:
        prompt: Free-text description of the transaction; it is also stored
            as the entry's description.
        follow_up_response: Optional answer to the "did you buy on credit?"
            clarification. "yes"/"y" posts the credit side to Accounts
            Payable. "no"/"n" posts it to Cash, but only when the prompt
            itself named no payment method.

    Returns:
        A ``{"status": "clarify", "message": ...}`` dict when the payment
        method is unknown and no follow-up answer was given; otherwise a
        string holding either the confirmation or the reason the entry was
        rejected.
    """
    parsed = parse_prompt(prompt)
    debit = parsed.get("debit") if isinstance(parsed, dict) else None
    if not isinstance(debit, dict) or "account" not in debit or "amount" not in debit:
        return "Unable to parse prompt. Please provide more details."

    debit_account = debit["account"]
    amount = debit["amount"]
    # bool is an int subclass, so exclude it explicitly; the range check also
    # rejects NaN, infinity, zero and negative values.
    if (
        isinstance(amount, bool)
        or not isinstance(amount, (int, float))
        or not 0 < amount < float("inf")
    ):
        return "Invalid amount specified."

    payment_method = parsed.get("payment_method")
    answer = follow_up_response.strip().lower() if isinstance(follow_up_response, str) else ""

    # Handle ambiguous payment method
    if not payment_method and not answer:
        return {"status": "clarify", "message": "Wonderful, did you buy on credit? (Yes/No)"}

    # Determine credit account
    parsed_credit = parsed.get("credit")
    if not isinstance(parsed_credit, dict):
        parsed_credit = {}
    if answer in ("yes", "y") or payment_method == "credit":
        credit_account = "Accounts Payable"
        credit_type = "Liability"
    elif payment_method == "cash":
        # Prefer what the parser reported; default to the Cash asset account
        # when the credit side is missing.
        credit_account = parsed_credit.get("account", "Cash")
        credit_type = parsed_credit.get("type", "Asset")
    elif answer in ("no", "n"):
        # "No" to "did you buy on credit?" means the purchase was paid in cash.
        credit_account = "Cash"
        credit_type = "Asset"
    else:
        return "Invalid payment method specified."

    # Validate accounts
    lookup_sql = "SELECT account_id, account_type, allow_posting FROM chart_of_accounts WHERE account_name = ?"
    try:
        cursor.execute(lookup_sql, (debit_account,))
        debit_result = cursor.fetchone()
        cursor.execute(lookup_sql, (credit_account,))
        credit_result = cursor.fetchone()
    except sqlite3.Error as e:
        logging.error(f"SQL error in generate_journal_entry: {e}")
        return "Error validating accounts."
    if not debit_result or not credit_result:
        return "One or both accounts not found in chart of accounts."
    if not debit_result[2] or not credit_result[2]:
        return "Posting not allowed for one or both accounts."

    # Validate account types
    if debit_result[1] != debit.get("type") or credit_result[1] != credit_type:
        return "Account type mismatch."

    # Create journal entry
    entry_id = str(uuid.uuid4())
    date = datetime.datetime.now().isoformat()
    try:
        cursor.execute("""
            INSERT INTO journal_entries (entry_id, date, debit_account_id, credit_account_id, amount, description)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (entry_id, date, debit_result[0], credit_result[0], amount, prompt))
        conn.commit()
    except sqlite3.Error as e:
        # Undo the partial write so the shared connection stays usable.
        conn.rollback()
        logging.error(f"SQL error in generate_journal_entry: {e}")
        return "Error saving journal entry."

    logging.info(f"Journal entry created: Debit {debit_account} ${amount}, Credit {credit_account} ${amount}")
    return f"Journal Entry Created: Debit {debit_account} ${amount}, Credit {credit_account} ${amount}"
# Generate T-account
def generate_t_account(account_name):
    """Render a plain-text T-account for *account_name*.

    Every journal entry that debits or credits the account is listed in date
    order, debits in the left column and credits in the right, followed by
    the two column totals.

    Args:
        account_name: Name of the account as stored in chart_of_accounts.

    Returns:
        The formatted T-account string, "Account not found." when no account
        has that name, or "Error retrieving T-account data." when a database
        error occurs.
    """
    try:
        cursor.execute("SELECT account_id FROM chart_of_accounts WHERE account_name = ?", (account_name,))
        account_row = cursor.fetchone()
        if not account_row:
            logging.error(f"Account {account_name} not found.")
            return "Account not found."
        account_id = account_row[0]

        # UNION ALL, not UNION: plain UNION removes duplicate rows, which
        # would drop identical postings and understate the totals.
        cursor.execute("""
            SELECT date, amount, description, 'Debit' as type FROM journal_entries WHERE debit_account_id = ?
            UNION ALL
            SELECT date, amount, description, 'Credit' as type FROM journal_entries WHERE credit_account_id = ?
            ORDER BY date
        """, (account_id, account_id))
        entries = cursor.fetchall()
        logging.info(f"Retrieved {len(entries)} entries for T-account: {account_name}")
    except sqlite3.Error as e:
        logging.error(f"SQL error in generate_t_account: {e}")
        return "Error retrieving T-account data."

    lines = [
        f"T-Account for {account_name}\n{'='*50}\n{'Debit':<20} | {'Credit':<20} | Description\n{'-'*50}\n"
    ]
    debit_total = 0
    credit_total = 0
    for _date, amount, desc, entry_type in entries:
        if entry_type == "Debit":
            lines.append(f"${amount:<19} | {'':<20} | {desc}\n")
            debit_total += amount
        else:
            lines.append(f"{'':<20} | ${amount:<19} | {desc}\n")
            credit_total += amount
    lines.append(f"{'-'*50}\nTotal Debit: ${debit_total:<10} | Total Credit: ${credit_total}\n")
    return "".join(lines)
# Example usage: seeds the chart of accounts, posts two sample purchases and
# prints the resulting T-account. Runs only when executed as a script.
if __name__ == "__main__":
    try:
        initialize_chart_of_accounts()

        # Test prompt (cash purchase)
        prompt = "Bought a laptop for $200 on cash"
        result = generate_journal_entry(prompt)
        print(result)

        # Test prompt (ambiguous payment method)
        prompt = "Bought a laptop for $300"
        result = generate_journal_entry(prompt)
        print(result)
        # The first call returns a clarification request; answer it and retry.
        if isinstance(result, dict) and result["status"] == "clarify":
            result = generate_journal_entry(prompt, "Yes")
            print(result)

        # Test T-account
        t_account = generate_t_account("Laptop")
        print(t_account)
    finally:
        # Clean up: close the shared connection even if a step above raised.
        conn.close()