snote / ui.py
xuanbao01's picture
Upload folder using huggingface_hub
44c5827 verified
from typing import Any, Dict, List
from difflib import SequenceMatcher
import json, os, time, re
import gradio as gr
# Đường dẫn
current_dir = os.path.dirname(os.path.abspath(__file__))
converted_path = os.path.join(current_dir, "converted")
chunks_path = os.path.join(current_dir, "chunks")
def extract_sequence_from_id(chunk_id: str) -> int:
"""Trích xuất sequence number từ chunk ID"""
# Format: doc_id::CH7::A18::K4::P0::C63
match = re.search(r'::C(\d+)$', chunk_id)
if match:
return int(match.group(1))
return 0
def load_document_chunks(doc_id: str) -> List[Dict]:
"""Load tất cả chunks của một document và sắp xếp theo thứ tự"""
manifest_path = os.path.join(chunks_path, "chunks_manifest.json")
if not os.path.exists(manifest_path):
return []
with open(manifest_path, "r", encoding="utf-8") as f:
manifest = json.load(f)
# Lọc chunks của document này
doc_chunks = []
for chunk_info in manifest["chunks"]:
if chunk_info["id"].startswith(doc_id):
chunk_file_path = chunk_info["path"]
if os.path.exists(chunk_file_path):
with open(chunk_file_path, "r", encoding="utf-8") as f:
chunk_data = json.load(f)
doc_chunks.append(chunk_data)
# Sắp xếp theo sequence number
doc_chunks.sort(key=lambda x: extract_sequence_from_id(x["id"]))
return doc_chunks
def reconstruct_document(chunks: List[Dict]) -> str:
"""Tái tạo lại document từ các chunks"""
if not chunks:
return ""
document_parts = []
current_path = []
for chunk in chunks:
content_type = chunk.get("content_type", "text")
chunk_text = chunk.get("chunk_text", "")
path = chunk.get("path", [])
# Thêm headers từ path nếu có thay đổi
if path != current_path:
# Tìm phần tử mới trong path
for i, path_item in enumerate(path):
if i >= len(current_path) or path_item != current_path[i]:
# Thêm header mới
if path_item and path_item not in ["ROOT", "TABLE"]:
# Xác định level dựa trên vị trí trong path
level = i + 1
header_marker = "#" * min(level, 6) # Tối đa 6 dấu #
document_parts.append(f"\n{header_marker} {path_item}\n")
break
current_path = path
if content_type == "table":
# Thêm table với định dạng markdown
document_parts.append(f"\n{chunk_text}\n")
else:
# Thêm text thông thường
if chunk_text.strip():
document_parts.append(chunk_text)
return "\n".join(document_parts)
def find_text_positions_in_reconstructed_doc(text_to_find: str, reconstructed_doc: str) -> List[tuple]:
"""Tìm tất cả vị trí của text trong document đã tái tạo"""
positions = []
start = 0
while True:
pos = reconstructed_doc.find(text_to_find, start)
if pos == -1:
break
positions.append((pos, pos + len(text_to_find)))
start = pos + 1
return positions
def highlight_text_in_reconstructed_doc(texts_to_highlight: List[str], reconstructed_doc: str, chunks: List[Dict] = None) -> str:
"""Highlight text trong document đã tái tạo"""
if not texts_to_highlight:
return reconstructed_doc
# Tạo bản sao để highlight
highlighted_doc = reconstructed_doc
# Sắp xếp texts theo độ dài (dài trước) để tránh highlight overlap
sorted_texts = sorted(texts_to_highlight, key=len, reverse=True)
for i, text in enumerate(sorted_texts):
if not text.strip():
continue
# Tìm vị trí của text trong document đã tái tạo
positions = find_text_positions_in_reconstructed_doc(text, highlighted_doc)
# Nếu không tìm thấy trong document đã tái tạo và có chunks, tìm trong chunk_for_embedding
if not positions and chunks:
for chunk in chunks:
chunk_embedding = chunk.get('chunk_for_embedding', '')
if text in chunk_embedding:
# Thêm text vào document để highlight
highlighted_doc += f"\n\n{text}"
positions = [(len(highlighted_doc) - len(text), len(highlighted_doc))]
break
# Highlight từ cuối lên để không ảnh hưởng đến vị trí của các text khác
for start, end in reversed(positions):
highlighted_text = f'<span style="color:green; font-weight:bold; background-color:yellow;">{text}</span>'
highlighted_doc = highlighted_doc[:start] + highlighted_text + highlighted_doc[end:]
return highlighted_doc
def highlight_doc_with_chunks(doc_id: str, texts: List[str]) -> str:
"""Highlight document sử dụng chunks thay vì file markdown gốc"""
# Load tất cả chunks của document
chunks = load_document_chunks(doc_id)
if not chunks:
return f"⚠️ Không tìm thấy chunks cho document {doc_id}"
# Tái tạo lại document
reconstructed_doc = reconstruct_document(chunks)
if not reconstructed_doc.strip():
return f"⚠️ Document {doc_id} không có nội dung"
# Highlight text
highlighted_doc = highlight_text_in_reconstructed_doc(texts, reconstructed_doc, chunks)
# Thống kê
highlighted_count = 0
for text in texts:
if text.strip() and text in reconstructed_doc:
highlighted_count += 1
total = len([t for t in texts if t.strip()])
success_rate = (highlighted_count / total * 100) if total > 0 else 0.0
# Tính tổng tokens
total_tokens = sum(chunk.get('token_count', 0) for chunk in chunks)
total_chars = sum(chunk.get('text_length_chars', 0) for chunk in chunks)
summary = f"""
<div style='background-color: #f0f0f0; padding: 10px; margin: 10px 0; border-radius: 5px;'>
<h3>Highlight Summary:</h3>
<p><strong>Document ID:</strong> {doc_id}</p>
<p><strong>Total chunks:</strong> {len(chunks)}</p>
<p><strong>Total tokens:</strong> {total_tokens:,}</p>
<p><strong>Total characters:</strong> {total_chars:,}</p>
<p><strong>Avg tokens per chunk:</strong> {total_tokens/len(chunks):.1f}</p>
<p><strong>Total texts to highlight:</strong> {total}</p>
<p><strong>Actually highlighted:</strong> {highlighted_count}</p>
<p><strong>Success rate:</strong> {success_rate:.1f}%</p>
</div>
"""
return summary + f"<pre style='white-space: pre-wrap;'>{highlighted_doc}</pre>"
def process_json_file(json_file: str) -> List[str]:
"""Đọc JSON đầu vào, trả về tối đa 5 output để hiển thị"""
if hasattr(json_file, 'name'):
file_path = json_file.name
else:
file_path = json_file
with open(file_path, "r", encoding="utf-8") as f:
session_output = json.load(f)
# Nhóm theo doc_id
doc_ids_set = set(item["doc_id"] for item in session_output)
chunks_retrieved = [{
"doc_id": doc_id,
"texts": [item["text"] for item in session_output if item["doc_id"] == doc_id]
} for doc_id in doc_ids_set]
# Highlight từng document
highlighted_texts = []
for chunk in chunks_retrieved:
highlighted_text = highlight_doc_with_chunks(chunk["doc_id"], chunk["texts"])
highlighted_texts.append(highlighted_text)
# Phân phối ra 5 outputs
while len(highlighted_texts) < 5:
highlighted_texts.append("")
return highlighted_texts[:5]
# ========== Gradio UI ==========
with gr.Blocks(title="RAG Document Viewer") as demo:
gr.Markdown("# RAG Document Viewer")
gr.Markdown("Upload a JSON file containing RAG results to view highlighted documents")
with gr.Row():
json_file_input = gr.File(label="JSON File", type="file")
submit_btn = gr.Button("Submit", variant="primary")
with gr.Row():
html_outputs = []
for i in range(5):
with gr.TabItem(f"Document {i+1}"):
html_outputs.append(gr.HTML())
submit_btn.click(
fn=process_json_file,
inputs=[json_file_input],
outputs=html_outputs
)
if __name__ == "__main__":
demo.launch()