Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import time | |
| import google.generativeai as genai | |
| from pydantic import ValidationError | |
| import mimetypes | |
| import os | |
| import settings_ai | |
| from settings_ai import Documento, Articolo | |
| import pandas as pd | |
| from PyPDF2 import PdfReader, PdfWriter | |
| import json | |
| from azure.core.credentials import AzureKeyCredential | |
| from azure.ai.documentintelligence import DocumentIntelligenceClient | |
| from azure.ai.documentintelligence.models import AnalyzeDocumentRequest | |
| from streamlit_pdf_viewer import pdf_viewer | |
| import io | |
| from PyPDF2 import PdfReader, PdfWriter | |
| import fitz | |
| import re | |
| import io | |
| from collections import Counter | |
| GENERATION_CONFIG = settings_ai.GENERATION_CONFIG | |
| SYSTEM_INSTRUCTION = settings_ai.SYSTEM_INSTRUCTION | |
| USER_MESSAGE = settings_ai.USER_MESSAGE | |
| API_KEY_GEMINI = settings_ai.API_KEY_GEMINI | |
| # Configura il modello Gemini | |
| genai.configure(api_key=API_KEY_GEMINI) | |
| model = genai.GenerativeModel( | |
| model_name="gemini-2.0-flash", | |
| generation_config=GENERATION_CONFIG, | |
| system_instruction=SYSTEM_INSTRUCTION | |
| ) | |
| # Upload File a GEMINI | |
| def upload_to_gemini(path: str, mime_type: str = None): | |
| """Carica un file su Gemini e ne ritorna l'oggetto file.""" | |
| file = genai.upload_file(path, mime_type=mime_type) | |
| print(f"Uploaded file '{file.display_name}' as: {file.uri}") | |
| return file | |
| # Attesa Upload Files | |
| def wait_for_files_active(files): | |
| """Attende che i file siano nello stato ACTIVE su Gemini.""" | |
| print("Waiting for file processing...") | |
| for name in (f.name for f in files): | |
| file_status = genai.get_file(name) | |
| while file_status.state.name == "PROCESSING": | |
| print(".", end="", flush=True) | |
| time.sleep(10) | |
| file_status = genai.get_file(name) | |
| if file_status.state.name != "ACTIVE": | |
| raise Exception(f"File {file_status.name} failed to process") | |
| print("\n...all files ready") | |
| # Chiamata API Gemini | |
| def send_message_to_gemini(chat_session, message, max_attempts=3): | |
| """Tenta di inviare il messaggio tramite la chat_session, riprovando fino a max_attempts in caso di eccezioni, con un delay di 10 secondi tra i tentativi. """ | |
| for attempt in range(max_attempts): | |
| try: | |
| print(f"Generazione AI con PROMPT: {message}") | |
| response_local = chat_session.send_message(message) | |
| return response_local | |
| except Exception as e: | |
| print(f"Errore in send_message (tentativo {attempt+1}/{max_attempts}): {e}") | |
| if attempt < max_attempts - 1: | |
| print("Riprovo tra 10 secondi...") | |
| time.sleep(10) | |
| raise RuntimeError(f"Invio messaggio fallito dopo {max_attempts} tentativi.") | |
| # Unisce i rettangoli evidenziati (se codice e descrizione stanno su linee diverse viene mostrato un solo rettangolo evidenziato) | |
| def merge_intervals(intervals): | |
| """Unisce gli intervalli sovrapposti. Gli intervalli sono tuple (y0, y1).""" | |
| if not intervals: | |
| return [] | |
| intervals.sort(key=lambda x: x[0]) | |
| merged = [intervals[0]] | |
| for current in intervals[1:]: | |
| last = merged[-1] | |
| if current[0] <= last[1]: | |
| merged[-1] = (last[0], max(last[1], current[1])) | |
| else: | |
| merged.append(current) | |
| return merged | |
| # Evidenzia le corrispondenze | |
| def highlight_text_in_pdf(input_pdf_bytes, text_list): | |
| """Crea rettangoli rossi che evidenziano i testi trovati, unendo gli intervalli sovrapposti in un unico rettangolo. """ | |
| pdf_document = fitz.open(stream=input_pdf_bytes, filetype="pdf") | |
| patterns = [] | |
| for text in text_list: | |
| text_pattern = re.escape(text).replace(r'\ ', r'\s*') | |
| patterns.append(text_pattern) | |
| pattern = r'\b(' + '|'.join(patterns) + r')\b' | |
| regex = re.compile(pattern, re.IGNORECASE) | |
| highlight_color = (1, 0, 0) # rosso | |
| for page in pdf_document: | |
| page_text = page.get_text() | |
| matches = list(regex.finditer(page_text)) | |
| intervals = [] | |
| for match in matches: | |
| match_text = match.group(0) | |
| text_instances = page.search_for(match_text) | |
| for inst in text_instances: | |
| text_height = inst.y1 - inst.y0 | |
| new_y0 = inst.y0 - 0.1 * text_height | |
| new_y1 = inst.y1 + 0.1 * text_height | |
| intervals.append((new_y0, new_y1)) | |
| merged_intervals = merge_intervals(intervals) | |
| for y0, y1 in merged_intervals: | |
| full_width_rect = fitz.Rect(page.rect.x0, y0, page.rect.x1, y1) | |
| rect_annot = page.add_rect_annot(full_width_rect) | |
| rect_annot.set_colors(stroke=highlight_color, fill=highlight_color) | |
| rect_annot.set_opacity(0.15) | |
| rect_annot.update() | |
| output_stream = io.BytesIO() | |
| pdf_document.save(output_stream) | |
| pdf_document.close() | |
| output_stream.seek(0) | |
| return output_stream | |
| # Formattazione Euro | |
| def format_euro(amount): | |
| formatted = f"{amount:,.2f}" | |
| formatted = formatted.replace(",", "X").replace(".", ",").replace("X", ".") | |
| return f"€ {formatted}" | |
| # Testo da PDF | |
| def pdf_to_text(path_file: str) -> str: | |
| """ Estrae e concatena il testo da tutte le pagine del PDF. """ | |
| reader = PdfReader(path_file) | |
| full_text = "" | |
| for page in reader.pages: | |
| page_text = page.extract_text() or "" | |
| full_text += page_text + "\n" | |
| return full_text | |
| # Funzione che verifica se gli articoli sono corretti (facendo un partsing PDF to TEXT) | |
| def verify_articles(file_path: str, chunk_document): | |
| ''' La funzione trasforma il PDF in TESTO e cerca se ogni articolo è presente (al netto degli spazi) ''' | |
| if not file_path.lower().endswith(".pdf"): | |
| for articolo in chunk_document.Articoli: | |
| articolo.Verificato = 2 | |
| return None | |
| pdf_text = pdf_to_text(file_path) | |
| if '□' in pdf_text: | |
| for articolo in chunk_document.Articoli: | |
| articolo.Verificato = 2 | |
| return None | |
| for articolo in chunk_document.Articoli: | |
| articolo.Verificato = 1 if articolo.CodiceArticolo and (articolo.CodiceArticolo in pdf_text) else 0 | |
| if articolo.Verificato == 0: | |
| articolo.Verificato = 1 if articolo.CodiceArticolo and (articolo.CodiceArticolo.replace(" ", "") in pdf_text.replace(" ", "")) else 0 | |
| if not any(articolo.Verificato == 0 for articolo in chunk_document.Articoli): | |
| return None | |
| unverified_articles = [articolo for articolo in chunk_document.Articoli if articolo.Verificato == 0] | |
| json_unverified_articles = json.dumps([articolo.model_dump() for articolo in unverified_articles]) | |
| return json_unverified_articles | |
| # Funzione ausiliaria che elabora un file (o un chunk) inviandolo tramite send_message_to_gemini | |
| def process_document_splitted(file_path: str, chunk_label: str, use_azure: bool = False) -> Documento: | |
| """ Elabora il file (o il chunk) inviandolo a Gemini e processando la risposta: | |
| - Determina il mime type. | |
| - Effettua l'upload tramite upload_to_gemini e attende che il file sia attivo. | |
| - Avvia una chat con il file caricato e invia il messaggio utente. | |
| - Tenta fino a 3 volte di validare il JSON ottenuto, filtrando gli Articoli. | |
| - Se presenti errori, RIPROCESSA il documento 3 volta passando il risultato precedente, In questo modo riesce a gestire gli errori in modo più preciso! | |
| Ritorna l'istanza di Documento validata. """ | |
| mime_type, _ = mimetypes.guess_type(file_path) | |
| if mime_type is None: | |
| mime_type = "application/octet-stream" | |
| if not use_azure: | |
| files = [upload_to_gemini(file_path, mime_type=mime_type)] | |
| wait_for_files_active(files) | |
| chat_history = [{ "role": "user","parts": [files[0]]}] | |
| chat_session = model.start_chat(history=chat_history) | |
| max_validation_attempts = 3 | |
| max_number_reprocess = 3 | |
| chunk_document = None | |
| for i in range(max_number_reprocess): | |
| print(f"Reprocessamento {i+1} di {max_number_reprocess} per il chunk {chunk_label}") | |
| response = None | |
| for attempt in range(max_validation_attempts): | |
| message = USER_MESSAGE | |
| if i > 0: | |
| message += f". Attenzione, RIPROVA perché i seguenti articoli sono da ESCLUDERE in quanto ERRATI! {json_unverified_articles}" | |
| if not use_azure: | |
| response = send_message_to_gemini(chat_session, message) | |
| else: | |
| chunk_document = analyze_invoice_azure(file_path) | |
| try: | |
| if not use_azure: | |
| chunk_document = Documento.model_validate_json(response.text) | |
| chunk_document.Articoli = [ | |
| art for art in chunk_document.Articoli | |
| if art.CodiceArticolo.startswith(("AVE", "AV", "3V", "44")) | |
| and art.TotaleNonIvato != 0 | |
| and art.CodiceArticolo not in ("AVE", "AV", "3V", "44") | |
| ] | |
| break | |
| except ValidationError as ve: | |
| print(f"Errore di validazione {chunk_label} (tentativo {attempt+1}/{max_validation_attempts}): {ve}") | |
| if attempt < max_validation_attempts - 1: | |
| print("Riprovo tra 5 secondi...") | |
| time.sleep(5) | |
| else: | |
| raise RuntimeError(f"Superato il numero massimo di tentativi di validazione {chunk_label}.") | |
| json_unverified_articles = verify_articles(file_path, chunk_document) | |
| if not json_unverified_articles: | |
| return chunk_document | |
| return chunk_document | |
| # Funzione principale che elabora il documento | |
| def process_document(path_file: str, number_pages_split: int, use_azure: bool = False) -> Documento: | |
| """ Elabora il documento in base al tipo di file: | |
| 1. Se il file non è un PDF, lo tratta "così com'è" (ad esempio, come immagine) e ne processa il contenuto tramite process_document_splitted. | |
| 2. Se il file è un PDF e contiene più di 5 pagine, lo divide in chunk da 5 pagine. | |
| Per ogni chunk viene effettuato l’upload, viene elaborato il JSON e validato. | |
| I chunk successivi vengono aggregati nel Documento finale e, se il PDF ha più | |
| di 5 pagine, al termine il campo TotaleMerce viene aggiornato con il valore riportato dall’ultimo chunk. """ | |
| mime_type, _ = mimetypes.guess_type(path_file) | |
| if mime_type is None: | |
| mime_type = "application/octet-stream" | |
| if use_azure: | |
| number_pages_split = 2 | |
| if not path_file.lower().endswith(".pdf"): | |
| print("File non PDF: elaborazione come immagine.") | |
| documento_finale = process_document_splitted(path_file, chunk_label="(immagine)", use_azure=use_azure) | |
| return documento_finale | |
| reader = PdfReader(path_file) | |
| total_pages = len(reader.pages) | |
| documento_finale = None | |
| ultimo_totale_merce = None | |
| for chunk_index in range(0, total_pages, number_pages_split): | |
| writer = PdfWriter() | |
| for page in reader.pages[chunk_index:chunk_index + number_pages_split]: | |
| writer.add_page(page) | |
| temp_filename = "temp_chunk_" | |
| if use_azure: | |
| temp_filename+="azure_" | |
| temp_filename += f"{chunk_index // number_pages_split}.pdf" | |
| with open(temp_filename, "wb") as temp_file: | |
| writer.write(temp_file) | |
| chunk_label = f"(chunk {chunk_index // number_pages_split})" | |
| chunk_document = process_document_splitted(temp_filename, chunk_label=chunk_label, use_azure=use_azure) | |
| if hasattr(chunk_document, "TotaleImponibile"): | |
| ultimo_totale_merce = chunk_document.TotaleImponibile | |
| if documento_finale is None: | |
| documento_finale = chunk_document | |
| else: | |
| documento_finale.Articoli.extend(chunk_document.Articoli) | |
| os.remove(temp_filename) | |
| if total_pages > number_pages_split and ultimo_totale_merce is not None: | |
| documento_finale.TotaleImponibile = ultimo_totale_merce | |
| if documento_finale is None: | |
| raise RuntimeError("Nessun documento elaborato.") | |
| # Controlli aggiuntivi: Se esiste un AVE non possono esistere altri articoli non ave. Se articoli DOPPI segnalo! | |
| if any(articolo.CodiceArticolo.startswith("AVE") for articolo in documento_finale.Articoli): | |
| documento_finale.Articoli = [articolo for articolo in documento_finale.Articoli if articolo.CodiceArticolo.startswith("AVE")] | |
| combinazioni = [(articolo.CodiceArticolo, articolo.TotaleNonIvato) for articolo in documento_finale.Articoli] | |
| conta_combinazioni = Counter(combinazioni) | |
| for articolo in documento_finale.Articoli: | |
| if conta_combinazioni[(articolo.CodiceArticolo, articolo.TotaleNonIvato)] > 1: | |
| articolo.Verificato = False | |
| return documento_finale | |
| # Analizza Fattura con AZURE | |
| def analyze_invoice_azure(file_path: str): | |
| """Invia il file (dal percorso specificato) al servizio prebuilt-invoice e restituisce il risultato dell'analisi.""" | |
| # Apri il file in modalità binaria e leggi il contenuto | |
| with open(file_path, "rb") as file: | |
| file_data = file.read() | |
| client = DocumentIntelligenceClient(endpoint=settings_ai.ENDPOINT_AZURE, credential=AzureKeyCredential(settings_ai.API_AZURE)) | |
| poller = client.begin_analyze_document("prebuilt-invoice", body=file_data) | |
| result = poller.result() | |
| return parse_invoice_to_documento_azure(result) | |
| # Parsing Fattura con AZURE | |
| def parse_invoice_to_documento_azure(result) -> Documento: | |
| """ Parssa il risultato dell'analisi e mappa i campi rilevanti nel modello Pydantic Documento. """ | |
| if not result.documents: | |
| raise ValueError("Nessun documento analizzato trovato.") | |
| invoice = result.documents[0] | |
| invoice_id_field = invoice.fields.get("InvoiceId") | |
| numero_documento = invoice_id_field.value_string if invoice_id_field and invoice_id_field.value_string else "" | |
| invoice_date_field = invoice.fields.get("InvoiceDate") | |
| data_str = invoice_date_field.value_date.isoformat() if invoice_date_field and invoice_date_field.value_date else "" | |
| subtotal_field = invoice.fields.get("SubTotal") | |
| if subtotal_field and subtotal_field.value_currency: | |
| totale_imponibile = subtotal_field.value_currency.amount | |
| else: | |
| invoice_total_field = invoice.fields.get("InvoiceTotal") | |
| totale_imponibile = invoice_total_field.value_currency.amount if invoice_total_field and invoice_total_field.value_currency else 0.0 | |
| articoli = [] | |
| items_field = invoice.fields.get("Items") | |
| if items_field and items_field.value_array: | |
| for item in items_field.value_array: | |
| product_code_field = item.value_object.get("ProductCode") | |
| codice_articolo = product_code_field.value_string if product_code_field and product_code_field.value_string else "" | |
| amount_field = item.value_object.get("Amount") | |
| totale_non_ivato = amount_field.value_currency.amount if amount_field and amount_field.value_currency else 0.0 | |
| articolo = Articolo( | |
| CodiceArticolo=codice_articolo, | |
| TotaleNonIvato=totale_non_ivato, | |
| Verificato=None | |
| ) | |
| articoli.append(articolo) | |
| documento = Documento( | |
| TipoDocumento="Fattura", | |
| NumeroDocumento=numero_documento, | |
| Data=data_str, | |
| TotaleImponibile=totale_imponibile, | |
| Articoli=articoli | |
| ) | |
| return documento | |
| # Front-End con Streamlit | |
| def main(): | |
| st.set_page_config(page_title="Import Fatture AI", page_icon="✨") | |
| st.title("Import Fatture AI ✨") | |
| st.sidebar.title("Caricamento File") | |
| uploaded_files = st.sidebar.file_uploader("Seleziona uno o più PDF", type=["pdf", "jpg", "jpeg", "png"], accept_multiple_files=True) | |
| model_ai = st.sidebar.selectbox("Modello", ['Gemini Flash 2.0', 'Azure Intelligence']) | |
| use_azure = True if model_ai == 'Azure Intelligence' else False | |
| number_pages_split = st.sidebar.slider('Split Pagine', 1, 30, 2) | |
| if st.sidebar.button("Importa", type="primary", use_container_width=True): | |
| if not uploaded_files: | |
| st.warning("Nessun file caricato!") | |
| else: | |
| for uploaded_file in uploaded_files: | |
| st.subheader(f"📄 {uploaded_file.name}") | |
| file_path = uploaded_file.name | |
| with open(file_path, "wb") as f: | |
| f.write(uploaded_file.getbuffer()) | |
| with st.spinner(f"Elaborazione in corso"): | |
| try: | |
| doc = process_document(uploaded_file.name, number_pages_split, use_azure=use_azure) | |
| totale_non_ivato_verificato = sum(articolo.TotaleNonIvato for articolo in doc.Articoli if articolo.Verificato == 1) | |
| totale_non_ivato_non_verificato = sum(articolo.TotaleNonIvato for articolo in doc.Articoli if articolo.Verificato != 1) | |
| totale_non_ivato = totale_non_ivato_verificato + totale_non_ivato_non_verificato | |
| st.write( | |
| f"- **Tipo**: {doc.TipoDocumento}\n" | |
| f"- **Numero**: {doc.NumeroDocumento}\n" | |
| f"- **Data**: {doc.Data}\n" | |
| f"- **Articoli Compatibili**: {len(doc.Articoli)}\n" | |
| f"- **Totale Documento**: {format_euro(doc.TotaleImponibile)}\n" | |
| ) | |
| if totale_non_ivato_non_verificato > 0: | |
| st.error(f"Totale Ave Non Verificato: {format_euro(totale_non_ivato_verificato)}") | |
| elif totale_non_ivato != 0: | |
| st.success(f"Totale Ave Verificato: {format_euro(totale_non_ivato_verificato)}") | |
| df = pd.DataFrame([{k: v for k, v in Articolo.model_dump().items() if k != ""} for Articolo in doc.Articoli]) | |
| if 'Verificato' in df.columns: | |
| df['Verificato'] = df['Verificato'].apply(lambda x: "✅" if x == 1 else "❌" if x == 0 else "❓" if x == 2 else x) | |
| if totale_non_ivato > 0: | |
| st.dataframe(df, use_container_width=True ,column_config={"TotaleNonIvato": st.column_config.NumberColumn("Totale non Ivato",format="€ %.2f")}) | |
| st.json(doc.model_dump(), expanded=False) | |
| if totale_non_ivato == 0: | |
| st.info(f"Non sono presenti articoli 'AVE'") | |
| if uploaded_file and file_path.lower().endswith(".pdf"): | |
| list_art = list_art = [articolo.CodiceArticolo for articolo in doc.Articoli] + [articolo.DescrizioneArticolo for articolo in doc.Articoli] | |
| if list_art: | |
| new_pdf = highlight_text_in_pdf(uploaded_file.getvalue(), list_art) | |
| pdf_viewer(input=new_pdf.getvalue(), width=1200) | |
| else: | |
| pdf_viewer(input=uploaded_file.getvalue(), width=1200) | |
| else: | |
| st.image(file_path) | |
| st.divider() | |
| except Exception as e: | |
| st.error(f"Errore durante l'elaborazione di {uploaded_file.name}: {e}") | |
| finally: | |
| if os.path.exists(file_path): | |
| os.remove(file_path) | |
| if __name__ == "__main__": | |
| main() |