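"""Gradio front-end for the RAG demo.

Runs a query through scripts.main.main, reconstructs each retrieved document
from its stored chunks, highlights the retrieved passages, and records
like/dislike feedback for the current session.
"""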
from typing import Dict, List
import gradio as gr
import json
import re
from scripts.main import main, feedback
import os
from utils import count_tokens
current_dir = os.path.dirname(os.path.abspath(__file__))
output_path = os.path.join(current_dir, "output.json")
converted_path = os.path.join(current_dir, "converted")

def extract_sequence_from_id(chunk_id: str) -> int:
    """Trích xuất sequence number từ chunk ID"""
    # Format: doc_id::CH7::A18::K4::P0::C63
    match = re.search(r'::C(\d+)$', chunk_id)
    if match:
        return int(match.group(1))
    return 0

def load_document_chunks(doc_id: str) -> List[Dict]:
    """Load tất cả chunks của một document và sắp xếp theo thứ tự"""
    import os
    current_dir = os.path.dirname(os.path.abspath(__file__))
    chunks_path = os.path.join(current_dir, "chunks")
    manifest_path = os.path.join(chunks_path, "chunks_manifest.json")
    
    if not os.path.exists(manifest_path):
        return []
    
    with open(manifest_path, "r", encoding="utf-8") as f:
        manifest = json.load(f)
    
    # Keep only the chunks that belong to this document
    doc_chunks = []
    for chunk_info in manifest["chunks"]:
        if chunk_info["id"].startswith(doc_id):
            chunk_file_path = chunk_info["path"]
            if os.path.exists(chunk_file_path):
                with open(chunk_file_path, "r", encoding="utf-8") as f:
                    chunk_data = json.load(f)
                    doc_chunks.append(chunk_data)
    
    # Sort chunks by their sequence number
    doc_chunks.sort(key=lambda x: extract_sequence_from_id(x["id"]))
    return doc_chunks

def reconstruct_document(chunks: List[Dict]) -> str:
    """Tái tạo lại document từ các chunks"""
    if not chunks:
        return ""
    
    document_parts = []
    current_path = []
    
    for chunk in chunks:
        content_type = chunk.get("content_type", "text")
        chunk_text = chunk.get("chunk_text", "")
        path = chunk.get("path", [])
        
        # Add headers from the path whenever the path changes
        if path != current_path:
            # Find the first element that differs from the current path
            for i, path_item in enumerate(path):
                if i >= len(current_path) or path_item != current_path[i]:
                    # Emit a header for the new element
                    if path_item and path_item not in ["ROOT", "TABLE"]:
                        # Derive the heading level from the position in the path
                        level = i + 1
                        # Special case: "Điểm" items always use level 4 (####)
                        if "Điểm" in path_item:
                            header_marker = "####"
                        else:
                            header_marker = "#" * min(level, 6)  # at most 6 "#" characters
                        document_parts.append(f"\n{header_marker} {path_item}\n")
                    break
            current_path = path
        
        if content_type == "table":
            # Thêm table với định dạng markdown
            document_parts.append(f"\n{chunk_text}\n")
        else:
            # Thêm text thông thường, bao gồm cả markdown headings
            if chunk_text.strip():
                document_parts.append(chunk_text)
    
    return "\n".join(document_parts)

def find_text_positions_in_reconstructed_doc(text_to_find: str, reconstructed_doc: str) -> List[tuple]:
    """Tìm tất cả vị trí của text trong document đã tái tạo"""
    positions = []
    start = 0
    
    while True:
        pos = reconstructed_doc.find(text_to_find, start)
        if pos == -1:
            break
        positions.append((pos, pos + len(text_to_find)))
        start = pos + 1
    
    return positions

def highlight_text_in_reconstructed_doc(texts_to_highlight: List[str], reconstructed_doc: str, chunks: List[Dict] = None) -> str:
    """Highlight text trong document đã tái tạo"""
    if not texts_to_highlight:
        return reconstructed_doc
    
    # Work on a copy so the original stays untouched
    highlighted_doc = reconstructed_doc
    
    # Sort texts by length (longest first) so longer matches are highlighted before their substrings
    sorted_texts = sorted(texts_to_highlight, key=len, reverse=True)
    
    for text in sorted_texts:
        if not text.strip():
            continue

        # Find every occurrence of the text in the reconstructed document
        positions = find_text_positions_in_reconstructed_doc(text, highlighted_doc)

        # If not found in the reconstructed document, fall back to the chunks'
        # chunk_for_embedding field and append the text so it can still be shown
        if not positions and chunks:
            for chunk in chunks:
                chunk_embedding = chunk.get('chunk_for_embedding', '')
                if text in chunk_embedding:
                    # Append the text to the end of the document and highlight it there
                    highlighted_doc += f"\n\n{text}"
                    positions = [(len(highlighted_doc) - len(text), len(highlighted_doc))]
                    break

        # Process matches from the end backwards so earlier offsets stay valid
        for start, end in reversed(positions):
            token_count = count_tokens(text)
            highlighted_text = f'<span style="color:green; font-weight:bold; background-color:yellow;">{text}</span> ({token_count} tokens)'
            highlighted_doc = highlighted_doc[:start] + highlighted_text + highlighted_doc[end:]
    
    return highlighted_doc

def format_highlighted_doc(highlighted_doc: str) -> str:
    """Format highlighted doc to be more readable"""
    # Convert markdown headings to HTML headings:
    # "#" -> h1, "##" -> h2, "###" -> h3, "####" -> h4
    # Handle headings both with and without leading spaces
    formatted_doc = re.sub(r'^\s*# (.+)$', r'<h1>\1</h1>', highlighted_doc, flags=re.MULTILINE)
    formatted_doc = re.sub(r'^\s*## (.+)$', r'<h2>\1</h2>', formatted_doc, flags=re.MULTILINE)
    formatted_doc = re.sub(r'^\s*### (.+)$', r'<h3>\1</h3>', formatted_doc, flags=re.MULTILINE)
    formatted_doc = re.sub(r'^\s*#### (.+)$', r'<h4>\1</h4>', formatted_doc, flags=re.MULTILINE)
    
    # Convert newlines to <br> for HTML display
    formatted_doc = formatted_doc.replace("\n", "<br>")
    return formatted_doc

def highlight_doc_with_chunks(doc_id: str, texts: List[str]) -> str:
    """Highlight document sử dụng chunks thay vì file markdown gốc"""
    # Load tất cả chunks của document
    chunks = load_document_chunks(doc_id)
    
    if not chunks:
        return f"⚠️ Không tìm thấy chunks cho document {doc_id}"
    
    # Reconstruct the document from its chunks
    reconstructed_doc = reconstruct_document(chunks)
    
    if not reconstructed_doc.strip():
        return f"⚠️ Document {doc_id} không có nội dung"
    
    # Highlight text
    highlighted_doc = highlight_text_in_reconstructed_doc(texts, reconstructed_doc, chunks)
    
    # Summary statistics: count how many requested texts appear in the reconstructed document
    highlighted_count = 0
    for text in texts:
        if text.strip() and text in reconstructed_doc:
            highlighted_count += 1
    total = len([t for t in texts if t.strip()])
    success_rate = (highlighted_count / total * 100) if total > 0 else 0.0
    
    summary = f"""
    <div style='background-color: #f0f0f0; padding: 10px; margin: 10px 0; border-radius: 5px;'>
        <h3>Highlight Summary:</h3>
        <p><strong>Document ID:</strong> {doc_id}</p>
        <p><strong>Actually highlighted:</strong> {highlighted_count}</p>
        <p><strong>Success rate:</strong> {success_rate:.1f}%</p>
    </div>
    """
    
    return summary + f"<pre style='white-space: pre-wrap;'>{format_highlighted_doc(highlighted_doc)}</pre>"

def format_user_prompt(user_prompt: str) -> str:
    """Format user prompt to be more readable"""
    # Make "Chunk" bold using HTML
    token_count = count_tokens(user_prompt)
    formatted_prompt = user_prompt.replace("Chunk", "<strong>Chunk</strong>")
    # Convert newlines to <br> for HTML display
    formatted_prompt = formatted_prompt.replace("\n", "<br>")
    # Add token count at the beginning
    formatted_prompt = f"<p><strong>Total tokens:</strong> {token_count}</p><br>" + formatted_prompt
    return formatted_prompt

# Global variable to store current session_id
current_session_id = None

def get_feedback(is_like: bool, session_id: str):
    return feedback(is_like, session_id)

def response_generator(query: str, top_k: int = 20, top_n: int = 10):
    global current_session_id
    response, session_id = main(query, top_k=top_k, top_n=top_n)
    current_session_id = session_id  # Store session_id globally
    
    session_path = f"sessions/{session_id}.json"
    with open(session_path, "r", encoding="utf-8") as f:
        session_output = json.load(f)
    
    rag_results = session_output[0]["rag_results"]
    user_prompt = session_output[0]["user_prompt"]
    doc_ids_set = set([item["doc_id"] for item in rag_results])
    chunks_retrieved = [{
        "doc_id": doc_id,
        "texts": [item["text"] for item in rag_results if item["doc_id"] == doc_id]
    } for doc_id in doc_ids_set]

    highlighted_texts = [highlight_doc_with_chunks(chunk["doc_id"], chunk["texts"]) for chunk in chunks_retrieved]
    user_prompt = format_user_prompt(user_prompt)
    # Pad (and cap) the list so exactly 15 HTML tab outputs are returned
    highlighted_texts = highlighted_texts[:15]
    while len(highlighted_texts) < 15:
        highlighted_texts.append("")

    # Unpack the list into individual return values
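    # Note: the tuple size must match the Gradio outputs wired to submit_btn.click:
    # response, session id, 15 HTML tabs, then the user prompt (18 outputs in total).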
    return response, current_session_id, *highlighted_texts, user_prompt

def get_like_feedback():
    global current_session_id
    if current_session_id:
        result = get_feedback(True, current_session_id)
        print(f"Like feedback: {result}")  # Debug print
        return f"✅ {result}"
    return "❌ No active session"

def get_dislike_feedback():
    global current_session_id
    if current_session_id:
        result = get_feedback(False, current_session_id)
        print(f"Dislike feedback: {result}")  # Debug print
        return f"👎 {result}"
    return "❌ No active session"

def clear_feedback():
    return ""

# Create the interface with multiple outputs
with gr.Blocks(title="RAG") as demo:
    gr.Markdown("# RAG System")
    gr.Markdown("Query the document and see highlighted results (Link Google Drive: https://drive.google.com/drive/folders/1gQ-KCaTHIoYWxds_UnrDrGu4sE1yU8PJ?usp=sharing)")
    
    with gr.Row():
        with gr.Column(scale=1):
            query_input = gr.Textbox(lines=5, label="Query", placeholder="Enter your question here...")
            with gr.Row():
                gr.HTML("")  # Empty space to push button to the right
                submit_btn = gr.Button("Submit", variant="primary", size="sm")
        with gr.Column(scale=1):
            response_output = gr.Textbox(lines=8, label="Response", interactive=True)
            session_id_output = gr.Textbox(lines=1, label="Current Session ID", interactive=False)
            feedback_output = gr.Textbox(lines=2, label="Feedback Status", interactive=False)
            with gr.Row():
                gr.HTML("")  # Empty space to push button to the right
                like_btn = gr.Button("Like", variant="primary", size="sm")
                dislike_btn = gr.Button("Dislike", variant="primary", size="sm")

    # Create tabs with HTML outputs for each chunk
    with gr.Tabs():
        html_outputs = []
        for i in range(15):  # Support up to 15 document chunks
            with gr.TabItem(f"Document Chunk {i+1}"):
                html_outputs.append(gr.HTML())
        
        # Add User Prompt tab at the end
        with gr.TabItem("User Prompt"):
            user_prompt_output = gr.HTML(label="User Prompt")
    
    submit_btn.click(
        fn=response_generator,
        inputs=[query_input],
        outputs=[response_output, session_id_output] + html_outputs + [user_prompt_output]
    ).then(
        fn=clear_feedback,
        inputs=[],
        outputs=[feedback_output]
    )
    like_btn.click(
        fn=get_like_feedback,
        inputs=[],
        outputs=[feedback_output]
    )
    dislike_btn.click(
        fn=get_dislike_feedback,
        inputs=[],
        outputs=[feedback_output]
    )

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, share=True)