# NOTE(review): removed web-scrape residue ("Spaces:" / "Sleeping" Hugging Face
# Spaces UI status text) that was captured with the file and is not part of
# this Python module.
# Standard library imports.
import re
import logging
from dataclasses import dataclass
from typing import List, Optional, Dict, Any

# Project-local LLM wrapper used by ReasoningSectionExtractor below.
from .llm_extractor import AzureO1MedicationExtractor

# Module-level logger, named after this module per the stdlib logging convention.
logger = logging.getLogger(__name__)
class ReasoningSectionExtractor:
    """Redacts medication sections from a Docling-style document JSON.

    Detection is delegated to an LLM (``AzureO1MedicationExtractor``), which
    returns the indices of the ``texts`` elements to drop; this class then
    builds a redacted deep copy of the document and logs, in detail, what was
    removed and why.
    """

    # Keyword lists used only to categorise removed content in the logs
    # (Dutch + English medication vocabulary).
    _HEADER_KEYWORDS = ('medicatie', 'thuismedicatie', 'medication', 'drugs')
    _ITEM_KEYWORDS = ('tablet', 'capsule', 'mg', 'ml', 'zakje', 'oral', 'maal daags')

    def __init__(self, endpoint, api_key, api_version, deployment):
        # Azure OpenAI connection details are forwarded verbatim to the extractor.
        self.llm_extractor = AzureO1MedicationExtractor(
            endpoint=endpoint,
            api_key=api_key,
            api_version=api_version,
            deployment=deployment,
        )

    def remove_sections_from_json(self, doc_json: Dict[str, Any]) -> Dict[str, Any]:
        """Return a deep copy of *doc_json* with LLM-flagged text elements removed.

        The input document is never mutated. Indices reported by the LLM that
        fall outside the document are logged as warnings and ignored.
        """
        import copy

        extraction_result = self.llm_extractor.extract_medication_sections(doc_json)
        # A set gives O(1) membership tests in the filter below; it also
        # collapses any duplicate indices the LLM might report.
        indices_to_remove = set(extraction_result["indices_to_remove"])
        reasoning = extraction_result.get("reasoning", {})

        # Log detailed reasoning for transparency. Lazy %-style arguments avoid
        # formatting cost when the log level is disabled.
        logger.info("LLM reasoning summary: %s", reasoning)

        # Provide specific feedback about what is being removed.
        self._log_removal_details(indices_to_remove, doc_json.get("texts", []))

        # Build the redacted copy; the original document stays untouched.
        redacted_json = copy.deepcopy(doc_json)
        texts = redacted_json.get("texts", [])
        redacted_texts = [t for i, t in enumerate(texts) if i not in indices_to_remove]
        redacted_json["texts"] = redacted_texts

        removed_count = len(texts) - len(redacted_texts)
        logger.info("Successfully removed %d text elements from document structure", removed_count)
        logger.info("Document structure: %d → %d text elements", len(texts), len(redacted_texts))
        return redacted_json

    def _log_removal_details(self, indices_to_remove, texts) -> None:
        """Log which elements are removed, categorised as headers/items/other.

        Purely informational; has no effect on the redaction itself.
        """
        if not indices_to_remove:
            logger.info("No formal medication lists identified for removal")
            return

        logger.info("Removing %d text elements: %s", len(indices_to_remove), sorted(indices_to_remove))
        medication_headers = []
        medication_items = []
        other_content = []
        for idx in sorted(indices_to_remove):
            # Guard clause: out-of-range indices from the LLM are reported and skipped.
            if idx >= len(texts):
                logger.warning(" → Invalid index %d: exceeds document length (%d)", idx, len(texts))
                continue
            text_content = texts[idx].get("text", "")
            text_label = texts[idx].get("label", "")
            lowered = text_content.lower()
            # Categorise the content for the summary below.
            if any(keyword in lowered for keyword in self._HEADER_KEYWORDS):
                medication_headers.append((idx, text_content))
            elif any(keyword in lowered for keyword in self._ITEM_KEYWORDS):
                medication_items.append((idx, text_content))
            else:
                other_content.append((idx, text_content))
            preview = text_content[:150] + ('...' if len(text_content) > 150 else '')
            logger.info(" → Removing index %d (%s): '%s'", idx, text_label, preview)

        # Summary of what was categorised.
        if medication_headers:
            logger.info("Medication headers removed: %d items", len(medication_headers))
            for idx, content in medication_headers:
                logger.info("  Header %d: %s", idx, content)
        if medication_items:
            logger.info("Medication items removed: %d items", len(medication_items))
            for idx, content in medication_items[:5]:  # show only the first 5 to avoid log spam
                logger.info("  Item %d: %s...", idx, content[:100])
            if len(medication_items) > 5:
                logger.info("  ... and %d more medication items", len(medication_items) - 5)
        if other_content:
            # Non-medication removals may indicate an LLM detection problem.
            logger.warning("⚠️ NON-MEDICATION content removed: %d items", len(other_content))
            for idx, content in other_content:
                logger.warning("  ⚠️ Index %d: %s...", idx, content[:200])
            logger.warning("⚠️ Please review: non-medication content was removed - this may indicate an issue with the LLM detection")

    def remove_sections(self, text: str) -> str:
        """
        Remove sections from markdown text. This is a fallback method for compatibility.
        Since ReasoningSectionExtractor works with JSON structure, this method
        returns the original text (no redaction) as the JSON-based approach is preferred.
        """
        logger.warning("ReasoningSectionExtractor.remove_sections() called - this method is not implemented for text-based redaction. Use remove_sections_from_json() instead.")
        return text
@dataclass
class SectionDefinition:
    """Defines a section to extract/remove by specifying its start (and optional end) regex.

    Fix: the ``@dataclass`` decorator was missing. Without it the class-level
    annotations generate no ``__init__``, so the positional construction
    ``SectionDefinition(sec.name, ...)`` performed in ``SectionExtractor.__init__``
    raises ``TypeError``. The ``dataclass`` import already existed at the top
    of this module.
    """

    name: str
    # Regex pattern identifying the section start (use multiline anchors as
    # needed). NOTE: SectionExtractor re-wraps these fields with compiled
    # patterns, so in practice this also holds re.Pattern objects.
    start_pattern: str
    # Regex for the section end, or None if the section runs until the next
    # section or end of file.
    end_pattern: Optional[str] = None
class SectionExtractor:
    """Finds and removes specified sections from document content.

    Works on either plain/Markdown text (``remove_sections``) or a
    Docling-style JSON document (``remove_sections_from_json``). Section
    boundaries are described by ``SectionDefinition`` regex pairs.
    """

    def __init__(self, sections: List[SectionDefinition]):
        # Compile regex patterns once up front for performance; the compiled
        # patterns are stored back into SectionDefinition instances in place
        # of the raw strings.
        self.sections = [
            SectionDefinition(sec.name, re.compile(sec.start_pattern), re.compile(sec.end_pattern) if sec.end_pattern else None)
            for sec in sections
        ]

    def remove_sections(self, text: str) -> str:
        """
        Remove all defined sections from the given text. Returns the redacted text.
        The text is expected to be the full document content (in Markdown or plain text form).

        Only the FIRST occurrence of each section's start pattern is considered.
        A section with no end pattern (or whose end pattern never matches) is
        marked for removal through end-of-text; if another removed section
        starts later, the merge loop below effectively truncates the earlier
        range at that point (while logging an overlap warning).
        """
        logger.info("Removing sections from text...")
        if not self.sections:
            return text  # nothing to remove
        to_remove_ranges = []  # will hold (start_index, end_index, name) tuples for removal
        # Find all section start positions (first match per section only).
        for sec in self.sections:
            match = sec.start_pattern.search(text)
            if match:
                start_idx = match.start()
                # Determine end of section.
                if sec.end_pattern:
                    # Search for the end marker only AFTER the section start.
                    end_match = sec.end_pattern.search(text, start_idx)
                    if end_match:
                        # End pattern found; end index is the start of end_match,
                        # so the end marker itself is preserved.
                        end_idx = end_match.start()
                    else:
                        end_idx = len(text)  # if no end pattern found, remove till end
                else:
                    # Default end is end-of-text; a later-starting removed section
                    # subsumes the tail via the overlap handling in the merge loop.
                    end_idx = len(text)
                to_remove_ranges.append((start_idx, end_idx, sec.name))
                logger.info(f"Marked section '{sec.name}' for removal (positions {start_idx}-{end_idx})")
            else:
                logger.info(f"Section '{sec.name}' not found in text (pattern: {sec.start_pattern.pattern})")
        if not to_remove_ranges:
            logger.info("No sections to remove.")
            return text
        # Sort ranges by start index so the merge loop sees them in order.
        to_remove_ranges.sort(key=lambda x: x[0])
        # Rebuild the text, keeping only the gaps between removal ranges.
        # Overlapping/touching ranges are merged by tracking current_idx.
        redacted_text = ""
        current_idx = 0
        for start_idx, end_idx, sec_name in to_remove_ranges:
            # Append text from current_idx up to start_idx (content before this section).
            if current_idx < start_idx:
                redacted_text += text[current_idx:start_idx]
            else:
                # Overlapping section (or consecutive) – already covered by the
                # previous removal, so nothing is kept here.
                logger.warning(f"Section '{sec_name}' overlaps with a previous section removal region.")
            current_idx = max(current_idx, end_idx)
        # Append any remaining text after the last removed section.
        if current_idx < len(text):
            redacted_text += text[current_idx:]
        return redacted_text

    def remove_sections_from_json(self, doc_json: Dict[str, Any]) -> Dict[str, Any]:
        """
        Remove specified sections from the structured JSON document.
        This method works with the Docling JSON structure to identify and remove
        sections based on their semantic content rather than just text patterns.

        Returns a deep copy; *doc_json* itself is never mutated. Elements of
        ``doc_json["texts"]`` matching a section's start pattern are removed,
        plus (a) subsequent elements until the end pattern matches, or (b) for
        sections whose name contains "medication", up to 3 following elements
        as a heuristic. Body ``$ref`` entries of the form ``#/texts/N`` are
        re-indexed to account for the removals.
        NOTE(review): only ``body.children`` refs are re-indexed — refs held
        elsewhere (e.g. inside groups) are presumably untouched; verify against
        the Docling schema in use.
        """
        logger.info("Removing sections from structured JSON...")
        if not self.sections:
            return doc_json  # nothing to remove
        # Create a deep copy to avoid modifying the original.
        import copy
        redacted_json = copy.deepcopy(doc_json)
        # Get all text elements from the document.
        texts = redacted_json.get("texts", [])
        if not texts:
            logger.warning("No texts found in document JSON")
            return redacted_json
        # Find text elements that match our section patterns.
        text_indices_to_remove = set()
        for sec in self.sections:
            logger.info(f"Looking for section '{sec.name}' with pattern: {sec.start_pattern.pattern}")
            # Find text elements that match the section start pattern.
            for i, text_elem in enumerate(texts):
                text_content = text_elem.get("text", "")
                if sec.start_pattern.search(text_content):
                    logger.info(f"Found section '{sec.name}' in text element {i}: '{text_content[:50]}...'")
                    text_indices_to_remove.add(i)
                    # If we have an end pattern, also remove subsequent text
                    # elements until we find the end (the end element itself
                    # is kept, mirroring the text-based method above).
                    if sec.end_pattern:
                        for j in range(i + 1, len(texts)):
                            next_text_content = texts[j].get("text", "")
                            if sec.end_pattern.search(next_text_content):
                                logger.info(f"Found end of section '{sec.name}' in text element {j}")
                                break
                            text_indices_to_remove.add(j)
                    else:
                        # No end pattern - remove this text element only.
                        # For medication lists, we might want to remove the next few elements too.
                        # This is a heuristic that could be made more sophisticated.
                        if "medication" in sec.name.lower():
                            # Remove up to 3 subsequent text elements for medication lists.
                            for j in range(i + 1, min(i + 4, len(texts))):
                                text_indices_to_remove.add(j)
        # Remove the identified text elements.
        if text_indices_to_remove:
            logger.info(f"Removing {len(text_indices_to_remove)} text elements: {sorted(text_indices_to_remove)}")
            # Remove from the texts array.
            redacted_texts = [texts[i] for i in range(len(texts)) if i not in text_indices_to_remove]
            redacted_json["texts"] = redacted_texts
            # Update body children to remove references to deleted texts.
            body = redacted_json.get("body", {})
            if "children" in body:
                # Filter out references to removed text elements.
                original_children = body["children"]
                redacted_children = []
                for child_ref in original_children:
                    if "$ref" in child_ref:
                        ref_path = child_ref["$ref"]
                        # Check if this reference points to a text element we're keeping.
                        if ref_path.startswith("#/texts/"):
                            try:
                                text_index = int(ref_path.split("/")[-1])
                                if text_index not in text_indices_to_remove:
                                    # Shift the reference index down by the number
                                    # of removed texts that preceded it.
                                    new_index = text_index - sum(1 for x in text_indices_to_remove if x < text_index)
                                    child_ref["$ref"] = f"#/texts/{new_index}"
                                    redacted_children.append(child_ref)
                            except (ValueError, IndexError):
                                # Keep the reference if we can't parse it.
                                redacted_children.append(child_ref)
                        else:
                            # Keep non-text references (tables, pictures, groups, ...).
                            redacted_children.append(child_ref)
                    else:
                        # Keep non-reference children.
                        redacted_children.append(child_ref)
                body["children"] = redacted_children
        else:
            logger.info("No sections found to remove")
        return redacted_json