# Spaces: Sleeping  (status banner captured from the hosting page; not part of the program)
import os | |
import re | |
import json | |
import gradio as gr | |
import pandas as pd | |
import pdfplumber | |
import pytesseract | |
from pdf2image import convert_from_path | |
from huggingface_hub import InferenceClient | |
# Shared Inference API client, pointed at a free model that supports
# text-generation. The access token is taken from the HF_TOKEN environment
# variable and is None when that variable is unset.
hf_token = os.environ.get("HF_TOKEN")
client = InferenceClient(
    token=hf_token,
    model="mistralai/Mistral-7B-Instruct-v0.2",
)
def extract_excel_data(file_path):
    """Load an Excel workbook with pandas and return its table as plain text.

    The workbook is read through the ``openpyxl`` engine and rendered with
    ``DataFrame.to_string`` without the index column.
    """
    sheet = pd.read_excel(file_path, engine='openpyxl')
    rendered = sheet.to_string(index=False)
    return rendered
def extract_text_from_pdf(pdf_path, is_scanned=False):
    """Extract text from a PDF, using OCR when there is no usable text layer.

    Args:
        pdf_path: Path of the PDF file to read.
        is_scanned: When true, skip native extraction and go straight to OCR.

    Returns:
        The extracted text, with one newline appended after each page.

    OCR is used when ``is_scanned`` is set, when native extraction raises, or
    when native extraction yields only whitespace (the typical result for an
    image-only PDF, which does not raise).
    """
    if not is_scanned:
        try:
            # Try native PDF extraction first
            with pdfplumber.open(pdf_path) as pdf:
                # ``or ""`` guards against pages for which extract_text()
                # returns None instead of an empty string.
                text = "".join(
                    (page.extract_text() or "") + "\n" for page in pdf.pages
                )
            if text.strip():
                return text
            print("Native PDF extraction returned no text; falling back to OCR")
        except Exception as e:
            print(f"Native PDF extraction failed: {str(e)}")

    # Fallback to OCR for scanned PDFs
    images = convert_from_path(pdf_path, dpi=200)
    return "".join(pytesseract.image_to_string(image) + "\n" for image in images)
def _extract_json_object(response):
    """Return the outermost ``{...}`` span of *response*, or the stripped text if there is none."""
    stripped = response.strip()
    start_idx = stripped.find('{')
    end_idx = stripped.rfind('}')
    if start_idx != -1 and end_idx > start_idx:
        return stripped[start_idx:end_idx + 1]
    return stripped


def parse_bank_statement(text):
    """Parse bank statement using LLM with fallback to rule-based parser.

    Args:
        text: Raw statement text (from PDF/Excel extraction). ``None`` is
            treated as empty.

    Returns:
        A dict with a ``"transactions"`` key holding a list of transaction
        records. When the text is empty after cleaning, the list is empty and
        no model call is made. When the model call or JSON validation fails,
        the result of ``rule_based_parser`` on the cleaned text is returned.
    """
    # Drop control characters (tab, newline and carriage return are kept).
    cleaned_text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', text or "")
    if not cleaned_text.strip():
        # Nothing to parse; asking the model anyway would only invite it to
        # echo the example transactions embedded in the prompt.
        return {"transactions": []}
    print(f"Original text sample: {cleaned_text[:200]}...")
    # Craft precise prompt with strict JSON formatting instructions
    # NOTE(review): the <|system|>/<|user|>/<|assistant|> tags are sent verbatim
    # as plain text - confirm they match the chat template expected by the
    # model configured on `client`.
    prompt = f"""
<|system|>
You are a financial data parser. Extract transactions from bank statements and return ONLY valid JSON.
</s>
<|user|>
Extract all transactions from this bank statement with these exact fields:
- date (format: YYYY-MM-DD)
- description
- amount (format: 0.00)
- debit (format: 0.00)
- credit (format: 0.00)
- closing_balance (format: 0.00 or -0.00 for negative)
- category
Statement text:
{cleaned_text[:3000]} [truncated if too long]
Return JSON with this exact structure:
{{
"transactions": [
{{
"date": "2025-05-08",
"description": "Company XYZ Payroll",
"amount": "8315.40",
"debit": "0.00",
"credit": "8315.40",
"closing_balance": "38315.40",
"category": "Salary"
}},
{{
"date": "2025-05-19",
"description": "Whole Foods",
"amount": "142.21",
"debit": "142.21",
"credit": "0.00",
"closing_balance": "38173.19",
"category": "Groceries"
}}
]
}}
RULES:
1. Output ONLY the JSON object with no additional text
2. Keep amounts as strings with 2 decimal places
3. For missing values, use empty strings
4. Convert negative amounts to format "-123.45"
5. Map categories to: Salary, Groceries, Medical, Utilities, Entertainment, Dining, Misc
</s>
<|assistant|>
"""
    try:
        # Call LLM via Hugging Face Inference API
        response = client.text_generation(
            prompt,
            max_new_tokens=2000,
            temperature=0.01,
            stop_sequences=["</s>"]
        )
        print(f"LLM Response: {response}")
        # Always isolate the JSON object: the reply may carry text before
        # and/or after it even when it happens to start with '{'.
        data = json.loads(_extract_json_object(response))
        # Validate structure before handing the data to callers
        if not isinstance(data, dict) or "transactions" not in data:
            raise ValueError("Missing 'transactions' key in JSON")
        if not isinstance(data["transactions"], list):
            raise ValueError("'transactions' must be a list")
        return data
    except Exception as e:
        print(f"LLM Error: {str(e)}")
        # Fallback to rule-based parser
        return rule_based_parser(cleaned_text)
def rule_based_parser(text):
    """Enhanced fallback parser for structured tables.

    Locates a header line and reads every following line as a table row with
    the columns date, description, amount, debit, credit, closing balance and
    category, in that order. Rows may be pipe-delimited or aligned with runs
    of two or more spaces.

    Args:
        text: Statement text, one table row per line.

    Returns:
        ``{"transactions": [...]}``; the list is empty when no header is found
        or nothing follows it.
    """
    lines = [line.strip() for line in text.split('\n') if line.strip()]

    # Find header line - a line mentioning every expected column name
    header_patterns = [
        r'Date\b', r'Description\b', r'Amount\b',
        r'Debit\b', r'Credit\b', r'Closing\s*Balance\b', r'Category\b'
    ]
    header_index = next(
        (i for i, line in enumerate(lines)
         if all(re.search(pattern, line, re.IGNORECASE) for pattern in header_patterns)),
        None,
    )
    if header_index is None:
        # Try pipe-delimited format as fallback
        header_index = next(
            (i for i, line in enumerate(lines)
             if '|' in line and any(p in line for p in ['Date', 'Amount', 'Balance'])),
            None,
        )
    if header_index is None or header_index + 1 >= len(lines):
        return {"transactions": []}

    # Matches an empty cell or a markdown separator cell such as '---' / ':--:'
    filler_cell = re.compile(r':?-*:?')
    transactions = []
    for line in lines[header_index + 1:]:
        # Handle both pipe-delimited and space-aligned formats
        if '|' in line:
            # Remove at most one border pipe per side, then keep EMPTY cells:
            # dropping them would shift every later column to the left.
            row = line[1:] if line.startswith('|') else line
            row = row[:-1] if row.endswith('|') else row
            parts = [cell.strip() for cell in row.split('|')]
            if all(filler_cell.fullmatch(cell) for cell in parts):
                # Blank row or header separator, not a transaction
                continue
        else:
            # Space-aligned format - split by 2+ spaces
            parts = re.split(r'\s{2,}', line)
        if len(parts) < 7:
            continue
        try:
            transactions.append({
                "date": parts[0],
                "description": parts[1],
                "amount": format_number(parts[2]),
                "debit": format_number(parts[3]),
                "credit": format_number(parts[4]),
                "closing_balance": format_number(parts[5]),
                "category": parts[6]
            })
        except Exception as e:
            # Best effort: report the bad row and keep going
            print(f"Error parsing line: {str(e)}")
    return {"transactions": transactions}
def format_number(value):
    """Format numeric values consistently.

    Removes thousands separators and ``$``, treats a parenthesised value or a
    leading ``-`` as negative, strips leading zeros from the integer part and
    truncates/pads the fraction to two digits.

    Args:
        value: Amount as text, e.g. ``"$1,234.5"`` or ``"(12.00)"``. A falsy
            value (``""``/``None``) yields ``"0.00"``.

    Returns:
        The amount as a string with two decimals; negative zero is returned as
        ``"0.00"``. Text with more than one ``.`` is returned without
        digit normalisation.
    """
    if not value:
        return "0.00"
    # Clean numeric values
    cleaned = value.replace(',', '').replace('$', '').strip()

    # Peel the sign off first so zero-stripping below only ever sees the
    # magnitude; otherwise "-0.50" would lose its leading zero.
    negative = False
    if '(' in cleaned and ')' in cleaned:
        # Handle negative numbers in parentheses
        negative = True
        cleaned = cleaned.replace('(', '').replace(')', '').strip()
    if cleaned.startswith('-'):
        negative = True
        cleaned = cleaned[1:].lstrip()

    # Standardize decimal format
    if '.' not in cleaned:
        cleaned += '.00'
    # Ensure two decimal places
    parts = cleaned.split('.')
    if len(parts) == 2:
        integer = parts[0].lstrip('0') or '0'
        decimal = parts[1][:2].ljust(2, '0')
        cleaned = f"{integer}.{decimal}"

    if negative and cleaned != '0.00':
        return f"-{cleaned}"
    return cleaned
def process_file(file, is_scanned):
    """Main processing function.

    Args:
        file: The uploaded statement: either an object exposing the temp-file
            path as ``.name`` or the path itself (``str``/``os.PathLike``).
            A falsy value means nothing was uploaded.
        is_scanned: Forwarded to ``extract_text_from_pdf`` for PDF uploads.

    Returns:
        A DataFrame with the columns Date, Description, Amount, Debit, Credit,
        Closing Balance, Category. It is empty when no file is given, the
        extension is not ``.xlsx``/``.pdf``, or processing raises.
    """
    required_cols = ["date", "description", "amount", "debit",
                     "credit", "closing_balance", "category"]
    display_cols = ["Date", "Description", "Amount", "Debit",
                    "Credit", "Closing Balance", "Category"]

    if not file:
        return pd.DataFrame(columns=display_cols)

    # Accept both a plain path and a wrapper object carrying it in `.name`.
    if isinstance(file, (str, os.PathLike)):
        file_path = os.fspath(file)
    else:
        file_path = file.name
    file_ext = os.path.splitext(file_path)[1].lower()

    try:
        if file_ext == '.xlsx':
            text = extract_excel_data(file_path)
        elif file_ext == '.pdf':
            text = extract_text_from_pdf(file_path, is_scanned=is_scanned)
        else:
            return pd.DataFrame(columns=display_cols)

        parsed_data = parse_bank_statement(text)
        df = pd.DataFrame(parsed_data["transactions"])
        # Ensure all required columns exist
        for col in required_cols:
            if col not in df.columns:
                df[col] = ""
        # Select by NAME before relabelling: the positional rename below is
        # only correct once the columns are in the canonical order, and any
        # extra keys in the parsed records must be dropped first.
        df = df[required_cols]
        df.columns = display_cols
        return df
    except Exception as e:
        print(f"Processing error: {str(e)}")
        # Return empty DataFrame with correct columns on error
        return pd.DataFrame(columns=display_cols)
# Gradio Interface: one file upload plus an OCR checkbox in, one table out.
interface = gr.Interface(
    title="AI Bank Statement Parser",
    description="Extract structured transaction data from PDF/Excel bank statements",
    fn=process_file,
    inputs=[
        gr.File(label="Upload Bank Statement (PDF/Excel)"),
        gr.Checkbox(label="Is Scanned PDF? (Use OCR)"),
    ],
    outputs=gr.Dataframe(
        headers=["Date", "Description", "Amount", "Debit", "Credit", "Closing Balance", "Category"],
        datatype=["date", "str", "number", "number", "number", "number", "str"],
        label="Parsed Transactions",
    ),
    allow_flagging="never",
)

# Start the web app only when this file is run as a script.
if __name__ == "__main__":
    interface.launch()