|
|
import time |
|
|
import re |
|
|
import gradio as gr |
|
|
from typing import Dict, Any, List, Tuple |
|
|
from transformers import AutoTokenizer, AutoModelForQuestionAnswering, AutoModelForCausalLM, pipeline |
|
|
|
|
|
|
|
|
MODEL_DEBERTA = "osiria/deberta-italian-question-answering" |
|
|
MODEL_GEPPETTO = "LorenzoDeMattei/GePpeTto" |
|
|
|
|
|
|
|
|
DOMANDE_IT: List[Tuple[str, str]] = [ |
|
|
("tipologia_fattura", "Qual è la tipologia della fattura? / Qual è la natura o il tipo del documento?"), |
|
|
("numero_fattura", "Qual è il numero della fattura? / Qual è il codice identificativo della fattura?"), |
|
|
("data_fattura", "Qual è la data della fattura? / Quando è stata emessa la fattura?"), |
|
|
("data_scadenza", "Qual è la data di scadenza del pagamento? / Entro quale termine deve essere saldata la fattura?"), |
|
|
("emittente", "Chi è l'emittente? / Qual è la ragione sociale dell'emittente?"), |
|
|
("indirizzo_emittente", "Qual è l'indirizzo dell'emittente? / Dove si trova la sede dell'emittente?"), |
|
|
("piva_emittente", "Qual è il codice fiscale o P.IVA dell'emittente? / Qual è la partita IVA dell'azienda?"), |
|
|
("email_emittente", "Qual è l'email dell'emittente? / Qual è l'indirizzo di posta elettronica dell'emittente?"), |
|
|
("telefono_emittente", "Qual è il numero di telefono dell'emittente? / Qual è il recapito telefonico dell'emittente?"), |
|
|
("beneficiario", "Chi è il beneficiario? / Qual è l'intestatario della fattura?"), |
|
|
("indirizzo_beneficiario", "Qual è l'indirizzo del beneficiario? / Dove risiede il beneficiario?"), |
|
|
("cf_beneficiario", "Qual è il codice fiscale del beneficiario? / Qual è l'identificativo fiscale del cliente?"), |
|
|
("email_beneficiario", "Qual è l'email del beneficiario? / Qual è l'indirizzo di posta elettronica del cliente?"), |
|
|
("descrizione_servizio", "Qual è la descrizione del prodotto/servizio? / Come viene denominato l’articolo o la prestazione?"), |
|
|
("quantita_servizio", "Qual è la quantità del prodotto/servizio? / Quante unità o ore sono state fatturate?"), |
|
|
("prezzo_unitario", "Qual è il prezzo unitario del prodotto/servizio? / Qual è il costo per singola unità o ora?"), |
|
|
("totale_servizio", "Qual è il totale del prodotto/servizio? / Qual è l’importo complessivo relativo all’articolo o servizio?"), |
|
|
("importo_imponibile", "Qual è l'importo imponibile? / Qual è il subtotale prima dell'IVA?"), |
|
|
("aliquota_iva", "Qual è l'aliquota IVA applicata? / Qual è la percentuale di IVA?"), |
|
|
("importo_iva", "Qual è l'importo IVA? / Qual è il valore dell'imposta applicata?"), |
|
|
("importo_totale", "Qual è l'importo totale del documento? / Qual è la somma complessiva da pagare?"), |
|
|
("condizioni_pagamento", "Quali sono le condizioni di pagamento? / Quali sono i termini di pagamento previsti?"), |
|
|
("modalita_pagamento", "Qual è la modalità di pagamento prevista? / Quale metodo di pagamento è indicato?"), |
|
|
("iban", "Qual è l'IBAN indicato? / Qual è il numero di conto per il bonifico?"), |
|
|
("bic_swift", "Qual è il BIC/SWIFT indicato? / Qual è il codice bancario internazionale?"), |
|
|
("intestatario_conto", "A chi è intestato il conto? / Chi è il titolare del conto bancario?"), |
|
|
("note", "Quali sono le note aggiuntive riportate nella fattura? / Quali osservazioni sono incluse nel documento?"), |
|
|
("firma_digitale", "Qual è la firma digitale? / Chi ha firmato digitalmente la fattura?"), |
|
|
("cig", "Qual è il CIG di riferimento? / Qual è il codice identificativo di gara?"), |
|
|
("stato_liquidazione", "Qual è lo stato di liquidazione? / Qual è la situazione del pagamento?"), |
|
|
("codice_destinatario", "Qual è il codice destinatario? / Qual è l'identificativo del destinatario?"), |
|
|
("causale", "Qual è la causale della fattura? / Qual è la motivazione o descrizione del pagamento?") |
|
|
] |
|
|
|
|
|
|
|
|
tok_deb = AutoTokenizer.from_pretrained(MODEL_DEBERTA) |
|
|
mdl_deb = AutoModelForQuestionAnswering.from_pretrained(MODEL_DEBERTA) |
|
|
qa_deb = pipeline("question-answering", model=mdl_deb, tokenizer=tok_deb, device=-1) |
|
|
|
|
|
tok_gepp = AutoTokenizer.from_pretrained(MODEL_GEPPETTO) |
|
|
mdl_gepp = AutoModelForCausalLM.from_pretrained(MODEL_GEPPETTO) |
|
|
qa_gepp = pipeline("text-generation", model=mdl_gepp, tokenizer=tok_gepp, device=-1) |
|
|
|
|
|
|
|
|
def preprocess_markdown(text: str) -> str: |
|
|
if not text: return "" |
|
|
text = re.sub(r'\|[\s-]+\|', ' ', text) |
|
|
text = text.replace('|', ' ') |
|
|
text = text.replace('**', '').replace('##', '') |
|
|
text = re.sub(r'\s+', ' ', text).strip() |
|
|
return text |
|
|
|
|
|
|
|
|
def analyze_invoice(md_text: str, custom_question_it: str): |
|
|
logs: List[str] = [] |
|
|
final_output: Dict[str, Any] = {} |
|
|
|
|
|
if len(md_text.strip()) < 10: |
|
|
return {"Error": "Testo troppo breve"}, "⚠️ Inserisci almeno 10 caratteri." |
|
|
|
|
|
clean_text = preprocess_markdown(md_text) |
|
|
|
|
|
|
|
|
t_start_deb = time.time() |
|
|
deb_res: Dict[str, Any] = {} |
|
|
success_count = 0 |
|
|
|
|
|
for key, question_text in DOMANDE_IT: |
|
|
try: |
|
|
res = qa_deb(question=question_text, context=clean_text) |
|
|
answer = res["answer"].strip() |
|
|
score = round(res.get("score", 0.0), 3) |
|
|
status = "Successo" if score > 0.05 and answer else "Non Trovato" |
|
|
if status == "Successo": success_count += 1 |
|
|
deb_res[key] = { |
|
|
"domanda": question_text, |
|
|
"risposta": answer, |
|
|
"confidenza": score, |
|
|
"status": status |
|
|
} |
|
|
except Exception as e: |
|
|
deb_res[key] = {"status": f"Errore inferenza: {str(e)}"} |
|
|
|
|
|
custom_q = custom_question_it.strip() |
|
|
if custom_q: |
|
|
try: |
|
|
res = qa_deb(question=custom_q, context=clean_text) |
|
|
answer = res["answer"].strip() |
|
|
score = round(res.get("score", 0.0), 3) |
|
|
status = "Successo" if score > 0.05 and answer else "Non Trovato" |
|
|
if status == "Successo": success_count += 1 |
|
|
deb_res["domanda_opzionale"] = { |
|
|
"domanda": custom_q, |
|
|
"risposta": answer, |
|
|
"confidenza": score, |
|
|
"status": status |
|
|
} |
|
|
except Exception as e: |
|
|
deb_res["domanda_opzionale"] = {"status": f"Errore inferenza: {str(e)}"} |
|
|
|
|
|
t_elapsed_deb = round(time.time() - t_start_deb, 2) |
|
|
final_output["DeBERTa (estrattivo)"] = deb_res |
|
|
logs.append(f"✅ DeBERTa completato in {t_elapsed_deb}s | Successi: {success_count}/{len(DOMANDE_IT) + (1 if custom_q else 0)}") |
|
|
|
|
|
|
|
|
if custom_q: |
|
|
try: |
|
|
t_start_gepp = time.time() |
|
|
short_context = clean_text[:800] |
|
|
prompt = f"Domanda: {custom_q}\nContesto: {short_context}\nRisposta:" |
|
|
res_gepp = qa_gepp(prompt, max_new_tokens=64, do_sample=False) |
|
|
generative_text = res_gepp[0]["generated_text"].replace(prompt, "").strip() |
|
|
final_output["GePpeTto (generativo)"] = {"risposta_opzionale": generative_text} |
|
|
t_elapsed_gepp = round(time.time() - t_start_gepp, 2) |
|
|
logs.append(f"✅ GePpeTto completato in {t_elapsed_gepp}s") |
|
|
except Exception as e: |
|
|
final_output["GePpeTto (generativo)"] = {"errore": str(e)} |
|
|
logs.append(f"❌ Errore GePpeTto: {e}") |
|
|
else: |
|
|
final_output["GePpeTto (generativo)"] = {"info": "Nessuna domanda opzionale fornita"} |
|
|
|
|
|
return final_output, "\n".join(logs) |
|
|
|
|
|
|
|
|
with gr.Blocks(theme=gr.themes.Base()) as demo: |
|
|
gr.Markdown("# 🧾 Invoice QA: Domande standard + opzionale") |
|
|
gr.Markdown("Risposte estrattive (DeBERTa) su tutte le domande e generative (GePpeTto) solo sulla domanda opzionale.") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(scale=1): |
|
|
md_input = gr.Textbox( |
|
|
label="Testo Fattura (.MD) / Contesto", |
|
|
lines=18, |
|
|
placeholder="Incolla qui il contenuto Markdown/Testo della fattura..." |
|
|
) |
|
|
|
|
|
custom_q_input = gr.Textbox( |
|
|
label="Domanda opzionale (in Italiano)", |
|
|
placeholder="Es: Qual è il riferimento d'ordine?" |
|
|
) |
|
|
|
|
|
btn = gr.Button("🔍 Analizza documento", variant="primary") |
|
|
|
|
|
with gr.Column(scale=1): |
|
|
out_json = gr.JSON(label="Risultati (DeBERTa estrattivo + GePpeTto opzionale)") |
|
|
with gr.Accordion("📝 Log di Sistema (Tempi e Debug)", open=False): |
|
|
out_log = gr.Textbox(label="Process Log", lines=12) |
|
|
|
|
|
btn.click(fn=analyze_invoice, inputs=[md_input, custom_q_input], outputs=[out_json, out_log]) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
demo.launch() |