File size: 8,664 Bytes
44c5827
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
from typing import Any, Dict, List
from difflib import SequenceMatcher
import json, os, time, re
import gradio as gr

# Đường dẫn
current_dir = os.path.dirname(os.path.abspath(__file__))
converted_path = os.path.join(current_dir, "converted")
chunks_path = os.path.join(current_dir, "chunks")

def extract_sequence_from_id(chunk_id: str) -> int:
    """Trích xuất sequence number từ chunk ID"""
    # Format: doc_id::CH7::A18::K4::P0::C63
    match = re.search(r'::C(\d+)$', chunk_id)
    if match:
        return int(match.group(1))
    return 0

def load_document_chunks(doc_id: str) -> List[Dict]:
    """Load tất cả chunks của một document và sắp xếp theo thứ tự"""
    manifest_path = os.path.join(chunks_path, "chunks_manifest.json")
    
    if not os.path.exists(manifest_path):
        return []
    
    with open(manifest_path, "r", encoding="utf-8") as f:
        manifest = json.load(f)
    
    # Lọc chunks của document này
    doc_chunks = []
    for chunk_info in manifest["chunks"]:
        if chunk_info["id"].startswith(doc_id):
            chunk_file_path = chunk_info["path"]
            if os.path.exists(chunk_file_path):
                with open(chunk_file_path, "r", encoding="utf-8") as f:
                    chunk_data = json.load(f)
                    doc_chunks.append(chunk_data)
    
    # Sắp xếp theo sequence number
    doc_chunks.sort(key=lambda x: extract_sequence_from_id(x["id"]))
    return doc_chunks

def reconstruct_document(chunks: List[Dict]) -> str:
    """Tái tạo lại document từ các chunks"""
    if not chunks:
        return ""
    
    document_parts = []
    current_path = []
    
    for chunk in chunks:
        content_type = chunk.get("content_type", "text")
        chunk_text = chunk.get("chunk_text", "")
        path = chunk.get("path", [])
        
        # Thêm headers từ path nếu có thay đổi
        if path != current_path:
            # Tìm phần tử mới trong path
            for i, path_item in enumerate(path):
                if i >= len(current_path) or path_item != current_path[i]:
                    # Thêm header mới
                    if path_item and path_item not in ["ROOT", "TABLE"]:
                        # Xác định level dựa trên vị trí trong path
                        level = i + 1
                        header_marker = "#" * min(level, 6)  # Tối đa 6 dấu #
                        document_parts.append(f"\n{header_marker} {path_item}\n")
                    break
            current_path = path
        
        if content_type == "table":
            # Thêm table với định dạng markdown
            document_parts.append(f"\n{chunk_text}\n")
        else:
            # Thêm text thông thường
            if chunk_text.strip():
                document_parts.append(chunk_text)
    
    return "\n".join(document_parts)

def find_text_positions_in_reconstructed_doc(text_to_find: str, reconstructed_doc: str) -> List[tuple]:
    """Tìm tất cả vị trí của text trong document đã tái tạo"""
    positions = []
    start = 0
    
    while True:
        pos = reconstructed_doc.find(text_to_find, start)
        if pos == -1:
            break
        positions.append((pos, pos + len(text_to_find)))
        start = pos + 1
    
    return positions

def highlight_text_in_reconstructed_doc(texts_to_highlight: List[str], reconstructed_doc: str, chunks: List[Dict] = None) -> str:
    """Highlight text trong document đã tái tạo"""
    if not texts_to_highlight:
        return reconstructed_doc
    
    # Tạo bản sao để highlight
    highlighted_doc = reconstructed_doc
    
    # Sắp xếp texts theo độ dài (dài trước) để tránh highlight overlap
    sorted_texts = sorted(texts_to_highlight, key=len, reverse=True)
    
    for i, text in enumerate(sorted_texts):
        if not text.strip():
            continue
            
        # Tìm vị trí của text trong document đã tái tạo
        positions = find_text_positions_in_reconstructed_doc(text, highlighted_doc)
        
        # Nếu không tìm thấy trong document đã tái tạo và có chunks, tìm trong chunk_for_embedding
        if not positions and chunks:
            for chunk in chunks:
                chunk_embedding = chunk.get('chunk_for_embedding', '')
                if text in chunk_embedding:
                    # Thêm text vào document để highlight
                    highlighted_doc += f"\n\n{text}"
                    positions = [(len(highlighted_doc) - len(text), len(highlighted_doc))]
                    break
        
        # Highlight từ cuối lên để không ảnh hưởng đến vị trí của các text khác
        for start, end in reversed(positions):
            highlighted_text = f'<span style="color:green; font-weight:bold; background-color:yellow;">{text}</span>'
            highlighted_doc = highlighted_doc[:start] + highlighted_text + highlighted_doc[end:]
    
    return highlighted_doc

def highlight_doc_with_chunks(doc_id: str, texts: List[str]) -> str:
    """Highlight document sử dụng chunks thay vì file markdown gốc"""
    # Load tất cả chunks của document
    chunks = load_document_chunks(doc_id)
    
    if not chunks:
        return f"⚠️ Không tìm thấy chunks cho document {doc_id}"
    
    # Tái tạo lại document
    reconstructed_doc = reconstruct_document(chunks)
    
    if not reconstructed_doc.strip():
        return f"⚠️ Document {doc_id} không có nội dung"
    
    # Highlight text
    highlighted_doc = highlight_text_in_reconstructed_doc(texts, reconstructed_doc, chunks)
    
    # Thống kê
    highlighted_count = 0
    for text in texts:
        if text.strip() and text in reconstructed_doc:
            highlighted_count += 1
    
    total = len([t for t in texts if t.strip()])
    success_rate = (highlighted_count / total * 100) if total > 0 else 0.0
    
    # Tính tổng tokens
    total_tokens = sum(chunk.get('token_count', 0) for chunk in chunks)
    total_chars = sum(chunk.get('text_length_chars', 0) for chunk in chunks)
    
    summary = f"""
    <div style='background-color: #f0f0f0; padding: 10px; margin: 10px 0; border-radius: 5px;'>
        <h3>Highlight Summary:</h3>
        <p><strong>Document ID:</strong> {doc_id}</p>
        <p><strong>Total chunks:</strong> {len(chunks)}</p>
        <p><strong>Total tokens:</strong> {total_tokens:,}</p>
        <p><strong>Total characters:</strong> {total_chars:,}</p>
        <p><strong>Avg tokens per chunk:</strong> {total_tokens/len(chunks):.1f}</p>
        <p><strong>Total texts to highlight:</strong> {total}</p>
        <p><strong>Actually highlighted:</strong> {highlighted_count}</p>
        <p><strong>Success rate:</strong> {success_rate:.1f}%</p>
    </div>
    """
    
    return summary + f"<pre style='white-space: pre-wrap;'>{highlighted_doc}</pre>"

def process_json_file(json_file: str) -> List[str]:
    """Đọc JSON đầu vào, trả về tối đa 5 output để hiển thị"""
    if hasattr(json_file, 'name'):
        file_path = json_file.name
    else:
        file_path = json_file
    
    with open(file_path, "r", encoding="utf-8") as f:
        session_output = json.load(f)

    # Nhóm theo doc_id
    doc_ids_set = set(item["doc_id"] for item in session_output)
    chunks_retrieved = [{
        "doc_id": doc_id,
        "texts": [item["text"] for item in session_output if item["doc_id"] == doc_id]
    } for doc_id in doc_ids_set]

    # Highlight từng document
    highlighted_texts = []
    for chunk in chunks_retrieved:
        highlighted_text = highlight_doc_with_chunks(chunk["doc_id"], chunk["texts"])
        highlighted_texts.append(highlighted_text)

    # Phân phối ra 5 outputs
    while len(highlighted_texts) < 5:
        highlighted_texts.append("")

    return highlighted_texts[:5]

# ========== Gradio UI ==========
with gr.Blocks(title="RAG Document Viewer") as demo:
    gr.Markdown("# RAG Document Viewer")
    gr.Markdown("Upload a JSON file containing RAG results to view highlighted documents")
    
    with gr.Row():
        json_file_input = gr.File(label="JSON File", type="file")
        submit_btn = gr.Button("Submit", variant="primary")
    
    with gr.Row():
        html_outputs = []
        for i in range(5):
            with gr.TabItem(f"Document {i+1}"):
                html_outputs.append(gr.HTML())

    submit_btn.click(
        fn=process_json_file,
        inputs=[json_file_input],
        outputs=html_outputs
    )

if __name__ == "__main__":
    demo.launch()