File size: 8,664 Bytes
44c5827 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
from typing import Any, Dict, List
from difflib import SequenceMatcher
import json, os, time, re
import gradio as gr
# Đường dẫn
current_dir = os.path.dirname(os.path.abspath(__file__))
converted_path = os.path.join(current_dir, "converted")
chunks_path = os.path.join(current_dir, "chunks")
def extract_sequence_from_id(chunk_id: str) -> int:
"""Trích xuất sequence number từ chunk ID"""
# Format: doc_id::CH7::A18::K4::P0::C63
match = re.search(r'::C(\d+)$', chunk_id)
if match:
return int(match.group(1))
return 0
def load_document_chunks(doc_id: str) -> List[Dict]:
"""Load tất cả chunks của một document và sắp xếp theo thứ tự"""
manifest_path = os.path.join(chunks_path, "chunks_manifest.json")
if not os.path.exists(manifest_path):
return []
with open(manifest_path, "r", encoding="utf-8") as f:
manifest = json.load(f)
# Lọc chunks của document này
doc_chunks = []
for chunk_info in manifest["chunks"]:
if chunk_info["id"].startswith(doc_id):
chunk_file_path = chunk_info["path"]
if os.path.exists(chunk_file_path):
with open(chunk_file_path, "r", encoding="utf-8") as f:
chunk_data = json.load(f)
doc_chunks.append(chunk_data)
# Sắp xếp theo sequence number
doc_chunks.sort(key=lambda x: extract_sequence_from_id(x["id"]))
return doc_chunks
def reconstruct_document(chunks: List[Dict]) -> str:
"""Tái tạo lại document từ các chunks"""
if not chunks:
return ""
document_parts = []
current_path = []
for chunk in chunks:
content_type = chunk.get("content_type", "text")
chunk_text = chunk.get("chunk_text", "")
path = chunk.get("path", [])
# Thêm headers từ path nếu có thay đổi
if path != current_path:
# Tìm phần tử mới trong path
for i, path_item in enumerate(path):
if i >= len(current_path) or path_item != current_path[i]:
# Thêm header mới
if path_item and path_item not in ["ROOT", "TABLE"]:
# Xác định level dựa trên vị trí trong path
level = i + 1
header_marker = "#" * min(level, 6) # Tối đa 6 dấu #
document_parts.append(f"\n{header_marker} {path_item}\n")
break
current_path = path
if content_type == "table":
# Thêm table với định dạng markdown
document_parts.append(f"\n{chunk_text}\n")
else:
# Thêm text thông thường
if chunk_text.strip():
document_parts.append(chunk_text)
return "\n".join(document_parts)
def find_text_positions_in_reconstructed_doc(text_to_find: str, reconstructed_doc: str) -> List[tuple]:
"""Tìm tất cả vị trí của text trong document đã tái tạo"""
positions = []
start = 0
while True:
pos = reconstructed_doc.find(text_to_find, start)
if pos == -1:
break
positions.append((pos, pos + len(text_to_find)))
start = pos + 1
return positions
def highlight_text_in_reconstructed_doc(texts_to_highlight: List[str], reconstructed_doc: str, chunks: List[Dict] = None) -> str:
"""Highlight text trong document đã tái tạo"""
if not texts_to_highlight:
return reconstructed_doc
# Tạo bản sao để highlight
highlighted_doc = reconstructed_doc
# Sắp xếp texts theo độ dài (dài trước) để tránh highlight overlap
sorted_texts = sorted(texts_to_highlight, key=len, reverse=True)
for i, text in enumerate(sorted_texts):
if not text.strip():
continue
# Tìm vị trí của text trong document đã tái tạo
positions = find_text_positions_in_reconstructed_doc(text, highlighted_doc)
# Nếu không tìm thấy trong document đã tái tạo và có chunks, tìm trong chunk_for_embedding
if not positions and chunks:
for chunk in chunks:
chunk_embedding = chunk.get('chunk_for_embedding', '')
if text in chunk_embedding:
# Thêm text vào document để highlight
highlighted_doc += f"\n\n{text}"
positions = [(len(highlighted_doc) - len(text), len(highlighted_doc))]
break
# Highlight từ cuối lên để không ảnh hưởng đến vị trí của các text khác
for start, end in reversed(positions):
highlighted_text = f'<span style="color:green; font-weight:bold; background-color:yellow;">{text}</span>'
highlighted_doc = highlighted_doc[:start] + highlighted_text + highlighted_doc[end:]
return highlighted_doc
def highlight_doc_with_chunks(doc_id: str, texts: List[str]) -> str:
"""Highlight document sử dụng chunks thay vì file markdown gốc"""
# Load tất cả chunks của document
chunks = load_document_chunks(doc_id)
if not chunks:
return f"⚠️ Không tìm thấy chunks cho document {doc_id}"
# Tái tạo lại document
reconstructed_doc = reconstruct_document(chunks)
if not reconstructed_doc.strip():
return f"⚠️ Document {doc_id} không có nội dung"
# Highlight text
highlighted_doc = highlight_text_in_reconstructed_doc(texts, reconstructed_doc, chunks)
# Thống kê
highlighted_count = 0
for text in texts:
if text.strip() and text in reconstructed_doc:
highlighted_count += 1
total = len([t for t in texts if t.strip()])
success_rate = (highlighted_count / total * 100) if total > 0 else 0.0
# Tính tổng tokens
total_tokens = sum(chunk.get('token_count', 0) for chunk in chunks)
total_chars = sum(chunk.get('text_length_chars', 0) for chunk in chunks)
summary = f"""
<div style='background-color: #f0f0f0; padding: 10px; margin: 10px 0; border-radius: 5px;'>
<h3>Highlight Summary:</h3>
<p><strong>Document ID:</strong> {doc_id}</p>
<p><strong>Total chunks:</strong> {len(chunks)}</p>
<p><strong>Total tokens:</strong> {total_tokens:,}</p>
<p><strong>Total characters:</strong> {total_chars:,}</p>
<p><strong>Avg tokens per chunk:</strong> {total_tokens/len(chunks):.1f}</p>
<p><strong>Total texts to highlight:</strong> {total}</p>
<p><strong>Actually highlighted:</strong> {highlighted_count}</p>
<p><strong>Success rate:</strong> {success_rate:.1f}%</p>
</div>
"""
return summary + f"<pre style='white-space: pre-wrap;'>{highlighted_doc}</pre>"
def process_json_file(json_file: str) -> List[str]:
"""Đọc JSON đầu vào, trả về tối đa 5 output để hiển thị"""
if hasattr(json_file, 'name'):
file_path = json_file.name
else:
file_path = json_file
with open(file_path, "r", encoding="utf-8") as f:
session_output = json.load(f)
# Nhóm theo doc_id
doc_ids_set = set(item["doc_id"] for item in session_output)
chunks_retrieved = [{
"doc_id": doc_id,
"texts": [item["text"] for item in session_output if item["doc_id"] == doc_id]
} for doc_id in doc_ids_set]
# Highlight từng document
highlighted_texts = []
for chunk in chunks_retrieved:
highlighted_text = highlight_doc_with_chunks(chunk["doc_id"], chunk["texts"])
highlighted_texts.append(highlighted_text)
# Phân phối ra 5 outputs
while len(highlighted_texts) < 5:
highlighted_texts.append("")
return highlighted_texts[:5]
# ========== Gradio UI ==========
with gr.Blocks(title="RAG Document Viewer") as demo:
gr.Markdown("# RAG Document Viewer")
gr.Markdown("Upload a JSON file containing RAG results to view highlighted documents")
with gr.Row():
json_file_input = gr.File(label="JSON File", type="file")
submit_btn = gr.Button("Submit", variant="primary")
with gr.Row():
html_outputs = []
for i in range(5):
with gr.TabItem(f"Document {i+1}"):
html_outputs.append(gr.HTML())
submit_btn.click(
fn=process_json_file,
inputs=[json_file_input],
outputs=html_outputs
)
if __name__ == "__main__":
demo.launch()
|