"""Gradio front-end for the RAG system.

Reconstructs source documents from their stored chunks, highlights the
retrieved passages inside the reconstructed text, and serves a simple
query / feedback UI on port 7860.

NOTE(review): this file was recovered from a copy whose HTML markup had
been stripped out of the string literals. Every tag below (<mark>, <h1>,
<br>, <b>, <div>, ...) is a reconstruction of the apparent intent —
confirm against the rendered UI.
"""
from typing import Any, Dict, List
import gradio as gr
import json
import re
from difflib import SequenceMatcher
from scripts.main import main, feedback
import os
from utils import count_tokens

current_dir = os.path.dirname(os.path.abspath(__file__))
output_path = os.path.join(current_dir, "output.json")
converted_path = os.path.join(current_dir, "converted")


def extract_sequence_from_id(chunk_id: str) -> int:
    """Extract the trailing sequence number from a chunk ID.

    IDs look like ``doc_id::CH7::A18::K4::P0::C63``; the number after the
    final ``::C`` is the chunk's position within the document. Returns 0
    when the ID does not end in ``::C<number>``.
    """
    match = re.search(r'::C(\d+)$', chunk_id)
    if match:
        return int(match.group(1))
    return 0


def load_document_chunks(doc_id: str) -> List[Dict]:
    """Load every chunk belonging to *doc_id*, sorted in document order.

    Reads ``chunks/chunks_manifest.json`` next to this file, loads each
    chunk file whose ID starts with *doc_id*, and sorts the results by the
    sequence number embedded in the ID. Returns [] when the manifest is
    missing.
    """
    chunks_path = os.path.join(current_dir, "chunks")
    manifest_path = os.path.join(chunks_path, "chunks_manifest.json")
    if not os.path.exists(manifest_path):
        return []
    with open(manifest_path, "r", encoding="utf-8") as f:
        manifest = json.load(f)

    # Keep only this document's chunks; skip entries whose file is gone.
    doc_chunks = []
    for chunk_info in manifest["chunks"]:
        if chunk_info["id"].startswith(doc_id):
            chunk_file_path = chunk_info["path"]
            if os.path.exists(chunk_file_path):
                with open(chunk_file_path, "r", encoding="utf-8") as f:
                    doc_chunks.append(json.load(f))

    doc_chunks.sort(key=lambda x: extract_sequence_from_id(x["id"]))
    return doc_chunks


def reconstruct_document(chunks: List[Dict]) -> str:
    """Rebuild a markdown-ish document from its ordered chunks.

    Emits a heading whenever the chunk's hierarchical ``path`` diverges
    from the previous chunk's path, then appends the chunk text (tables
    are kept on their own lines).
    """
    if not chunks:
        return ""

    document_parts = []
    current_path = []

    for chunk in chunks:
        content_type = chunk.get("content_type", "text")
        chunk_text = chunk.get("chunk_text", "")
        path = chunk.get("path", [])

        # Emit a header for the first path element that changed.
        if path != current_path:
            for i, path_item in enumerate(path):
                if i >= len(current_path) or path_item != current_path[i]:
                    if path_item and path_item not in ["ROOT", "TABLE"]:
                        # Heading level follows depth in the path.
                        level = i + 1
                        # "Điểm" (point/clause) items are pinned at level 4.
                        if "Điểm" in path_item:
                            header_marker = "####"
                        else:
                            header_marker = "#" * min(level, 6)  # markdown caps at 6
                        document_parts.append(f"\n{header_marker} {path_item}\n")
                    break
            current_path = path

        if content_type == "table":
            # Keep markdown tables isolated by blank lines.
            document_parts.append(f"\n{chunk_text}\n")
        elif chunk_text.strip():
            document_parts.append(chunk_text)

    return "\n".join(document_parts)


def find_text_positions_in_reconstructed_doc(text_to_find: str, reconstructed_doc: str) -> List[tuple]:
    """Return every (start, end) occurrence of *text_to_find* in the doc."""
    positions = []
    start = 0
    while True:
        pos = reconstructed_doc.find(text_to_find, start)
        if pos == -1:
            break
        positions.append((pos, pos + len(text_to_find)))
        start = pos + 1
    return positions


def highlight_text_in_reconstructed_doc(texts_to_highlight: List[str], reconstructed_doc: str, chunks: List[Dict] = None) -> str:
    """Wrap each retrieved text found in the document in a <mark> span.

    Texts not found verbatim are looked up in the chunks'
    ``chunk_for_embedding`` field and, when matched there, appended to the
    end of the document so they still get highlighted.
    """
    if not texts_to_highlight:
        return reconstructed_doc

    highlighted_doc = reconstructed_doc

    # Longest first so a short text never splits a longer overlapping one.
    for text in sorted(texts_to_highlight, key=len, reverse=True):
        if not text.strip():
            continue

        positions = find_text_positions_in_reconstructed_doc(text, highlighted_doc)

        # Fallback: match against chunk_for_embedding and append the text.
        if not positions and chunks:
            for chunk in chunks:
                if text in chunk.get('chunk_for_embedding', ''):
                    highlighted_doc += f"\n\n{text}"
                    positions = [(len(highlighted_doc) - len(text), len(highlighted_doc))]
                    break

        # Replace back-to-front so earlier offsets stay valid.
        for start, end in reversed(positions):
            token_count = count_tokens(text)
            # NOTE(review): <mark> wrapper reconstructed — original markup was stripped.
            highlighted_text = f'<mark style="background-color: yellow;">{text} ({token_count} tokens)</mark>'
            highlighted_doc = highlighted_doc[:start] + highlighted_text + highlighted_doc[end:]

    return highlighted_doc


def format_highlighted_doc(highlighted_doc: str) -> str:
    """Convert the highlighted markdown-ish document into display HTML."""
    # Markdown headings -> HTML headings, handling optional leading spaces.
    # Deepest marker first: r'^\s*# (.+)$' also matches '## x' (capturing
    # '# x'), so substituting '#' before '####' would swallow the deeper
    # levels and h2-h4 would never be produced.
    formatted_doc = re.sub(r'^\s*#### (.+)$', r'<h4>\1</h4>', highlighted_doc, flags=re.MULTILINE)
    formatted_doc = re.sub(r'^\s*### (.+)$', r'<h3>\1</h3>', formatted_doc, flags=re.MULTILINE)
    formatted_doc = re.sub(r'^\s*## (.+)$', r'<h2>\1</h2>', formatted_doc, flags=re.MULTILINE)
    formatted_doc = re.sub(r'^\s*# (.+)$', r'<h1>\1</h1>', formatted_doc, flags=re.MULTILINE)
    # Newlines become <br> for HTML display.
    return formatted_doc.replace("\n", "<br>")


def highlight_doc_with_chunks(doc_id: str, texts: List[str]) -> str:
    """Highlight *texts* inside the chunk-reconstructed document *doc_id*.

    Returns an HTML string: a short statistics summary followed by the
    highlighted document, or a warning when the document has no chunks or
    no content.
    """
    chunks = load_document_chunks(doc_id)
    if not chunks:
        return f"⚠️ Không tìm thấy chunks cho document {doc_id}"

    reconstructed_doc = reconstruct_document(chunks)
    if not reconstructed_doc.strip():
        return f"⚠️ Document {doc_id} không có nội dung"

    highlighted_doc = highlight_text_in_reconstructed_doc(texts, reconstructed_doc, chunks)

    # Stats: how many requested texts occur verbatim in the document.
    highlighted_count = sum(1 for t in texts if t.strip() and t in reconstructed_doc)
    total = len([t for t in texts if t.strip()])
    success_rate = (highlighted_count / total * 100) if total > 0 else 0.0

    # NOTE(review): summary markup reconstructed — original tags stripped.
    summary = f"""
<h3>Highlight Summary:</h3>
<p><b>Document ID:</b> {doc_id}</p>
<p><b>Actually highlighted:</b> {highlighted_count}</p>
<p><b>Success rate:</b> {success_rate:.1f}%</p>
"""
    return summary + f"<div>{format_highlighted_doc(highlighted_doc)}</div>"


def format_user_prompt(user_prompt: str) -> str:
    """Format the raw user prompt as HTML, prefixed with its token count."""
    token_count = count_tokens(user_prompt)
    # Make "Chunk" bold. NOTE(review): <b> tags reconstructed — the
    # recovered copy had the no-op replace("Chunk", "Chunk").
    formatted_prompt = user_prompt.replace("Chunk", "<b>Chunk</b>")
    # Newlines become <br> for HTML display.
    formatted_prompt = formatted_prompt.replace("\n", "<br>")
    # Token count banner first.
    return f"<h3>Total tokens: {token_count}</h3><br>" + formatted_prompt


# Session of the most recent query, used by the like/dislike buttons.
current_session_id = None


def get_feedback(is_like: bool, session_id: str):
    """Forward a like/dislike signal for *session_id* to the backend."""
    return feedback(is_like, session_id)


def response_generator(query: str, top_k: int = 20, top_n: int = 10):
    """Run the RAG pipeline for *query* and produce all UI outputs.

    Returns (response, session_id, 15 highlighted-document HTML strings,
    formatted user prompt) — one value per Gradio output component.
    """
    global current_session_id
    response, session_id = main(query, top_k=top_k, top_n=top_n)
    current_session_id = session_id  # remembered for feedback buttons

    session_path = f"sessions/{session_id}.json"
    with open(session_path, "r", encoding="utf-8") as f:
        session_output = json.load(f)

    rag_results = session_output[0]["rag_results"]
    user_prompt = session_output[0]["user_prompt"]

    # Group retrieved texts by document, preserving retrieval order
    # (the original set() made the tab order nondeterministic).
    doc_ids = list(dict.fromkeys(item["doc_id"] for item in rag_results))
    chunks_retrieved = [{
        "doc_id": doc_id,
        "texts": [item["text"] for item in rag_results if item["doc_id"] == doc_id],
    } for doc_id in doc_ids]

    highlighted_texts = [
        highlight_doc_with_chunks(chunk["doc_id"], chunk["texts"])
        for chunk in chunks_retrieved
    ]
    user_prompt = format_user_prompt(user_prompt)

    # The UI has exactly 15 chunk tabs: pad with blanks, and truncate in
    # case more than 15 documents were retrieved (extra values would make
    # Gradio reject the return tuple).
    while len(highlighted_texts) < 15:
        highlighted_texts.append("")
    highlighted_texts = highlighted_texts[:15]

    return response, current_session_id, *highlighted_texts, user_prompt


def get_like_feedback():
    """Send a 'like' for the current session; returns a status string."""
    global current_session_id
    if current_session_id:
        result = get_feedback(True, current_session_id)
        print(f"Like feedback: {result}")  # debug
        return f"✅ {result}"
    return "❌ No active session"


def get_dislike_feedback():
    """Send a 'dislike' for the current session; returns a status string."""
    global current_session_id
    if current_session_id:
        result = get_feedback(False, current_session_id)
        print(f"Dislike feedback: {result}")  # debug
        return f"👎 {result}"
    return "❌ No active session"


def clear_feedback():
    """Reset the feedback status box after a new query."""
    return ""


# Build the interface: query on the left, response + feedback on the
# right, and one tab per highlighted document chunk below.
with gr.Blocks(title="RAG") as demo:
    gr.Markdown("# RAG System")
    gr.Markdown("Query the document and see highlighted results (Link Google Drive: https://drive.google.com/drive/folders/1gQ-KCaTHIoYWxds_UnrDrGu4sE1yU8PJ?usp=sharing)")

    with gr.Row():
        with gr.Column(scale=1):
            query_input = gr.Textbox(lines=5, label="Query", placeholder="Enter your question here...")
            with gr.Row():
                gr.HTML("")  # empty spacer to push the button right
                submit_btn = gr.Button("Submit", variant="primary", size="sm")
        with gr.Column(scale=1):
            response_output = gr.Textbox(lines=8, label="Response", interactive=True)
            session_id_output = gr.Textbox(lines=1, label="Current Session ID", interactive=False)
            feedback_output = gr.Textbox(lines=2, label="Feedback Status", interactive=False)
            with gr.Row():
                gr.HTML("")  # empty spacer to push the buttons right
                like_btn = gr.Button("Like", variant="primary", size="sm")
                dislike_btn = gr.Button("Dislike", variant="primary", size="sm")

    # One HTML tab per retrieved document chunk (up to 15).
    with gr.Tabs():
        html_outputs = []
        for i in range(15):
            with gr.TabItem(f"Document Chunk {i+1}"):
                html_outputs.append(gr.HTML())
        # User Prompt tab at the end.
        with gr.TabItem("User Prompt"):
            user_prompt_output = gr.HTML(label="User Prompt")

    submit_btn.click(
        fn=response_generator,
        inputs=[query_input],
        outputs=[response_output, session_id_output] + html_outputs + [user_prompt_output]
    ).then(
        fn=clear_feedback,
        inputs=[],
        outputs=[feedback_output]
    )
    like_btn.click(
        fn=get_like_feedback,
        inputs=[],
        outputs=[feedback_output]
    )
    dislike_btn.click(
        fn=get_dislike_feedback,
        inputs=[],
        outputs=[feedback_output]
    )

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, share=True)