import os
import time
import logging
import json
from dataclasses import dataclass
from typing import Optional

# Don't import DocumentConverter at module level to prevent early initialization
# from docling.document_converter import DocumentConverter
from processing.sections import SectionExtractor
from utils.cost_tracker import cost_tracker

# Remove global converter initialization - will be done lazily
# _docling_converter = DocumentConverter()

logger = logging.getLogger(__name__)  # Logger for this module

@dataclass
class DocumentResult:
    """Holds processed results for a document."""
    file_path: str
    structured_markdown: str
    structured_json: dict
    structured_yaml: str  # YAML rendering of the structured JSON
    redacted_markdown: str
    redacted_json: dict
    raw_text: str  # Raw text without preprocessing

@dataclass
class ProcessingResult:
    """Simple result object for Jupyter notebook usage."""
    original_document_md: str
    redacted_document_md: str
    original_document_json: dict
    original_document_yaml: str  # YAML rendering of the structured JSON
    redacted_document_json: dict
    raw_text: str  # Raw text without preprocessing
    removed_indices: list  # The actual indices that were removed
    input_tokens: int
    output_tokens: int
    cost: float

def process_document_with_redaction(
    file_path: str,
    endpoint: str,
    api_key: str,
    api_version: str,
    deployment: str,
    section_extractor: Optional[SectionExtractor] = None,
) -> ProcessingResult:
    """
    Process a document and return its original and redacted forms plus usage stats.

    Args:
        file_path: Path to the PDF file to process
        endpoint: Azure OpenAI endpoint
        api_key: Azure OpenAI API key
        api_version: Azure OpenAI API version
        deployment: Azure OpenAI deployment name
        section_extractor: Optional custom section extractor

    Returns:
        ProcessingResult with the original and redacted documents (markdown,
        JSON, YAML), the removed indices, and token/cost totals
    """
| logger.info(f"Processing document: {file_path}") | |
| # Reset cost tracker for this processing session | |
| cost_tracker.reset_session() | |
| # Create section extractor if not provided | |
| if section_extractor is None: | |
| from processing.sections import ReasoningSectionExtractor | |
| section_extractor = ReasoningSectionExtractor( | |
| endpoint=endpoint, | |
| api_key=api_key, | |
| api_version=api_version, | |
| deployment=deployment, | |
| ) | |
| # Process the document | |
| processor = DocumentProcessor(section_extractor=section_extractor) | |
| result = processor.process(file_path) | |
| # Get the actual removed indices from the section extractor | |
| removed_indices = [] | |
| if section_extractor: | |
| # Extract the removed indices from the LLM response | |
| extraction_result = section_extractor.llm_extractor.extract_medication_sections(result.structured_json) | |
| removed_indices = extraction_result.get("indices_to_remove", []) | |
| # Get cost summary | |
| cost_summary = cost_tracker.get_session_summary() | |
| total_input_tokens = cost_summary.get("total_tokens", 0) | |
| total_output_tokens = 0 # We'll calculate this from the breakdown | |
| total_cost = cost_summary.get("total_cost", 0.0) | |
| # Calculate output tokens from model breakdown | |
| for model_stats in cost_summary.get("model_breakdown", {}).values(): | |
| total_output_tokens += model_stats.get("output_tokens", 0) | |
| # Calculate input tokens (total - output) | |
| total_input_tokens = total_input_tokens - total_output_tokens | |
| logger.info(f"Processing complete - Input: {total_input_tokens}, Output: {total_output_tokens}, Cost: ${total_cost:.4f}") | |
| return ProcessingResult( | |
| original_document_md=result.structured_markdown, | |
| redacted_document_md=result.redacted_markdown, | |
| original_document_json=result.structured_json, | |
| original_document_yaml=result.structured_yaml, | |
| redacted_document_json=result.redacted_json, | |
| raw_text=result.raw_text, | |
| removed_indices=removed_indices, | |
| input_tokens=total_input_tokens, | |
| output_tokens=total_output_tokens, | |
| cost=total_cost | |
| ) | |
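
# Example usage (a minimal sketch; the endpoint, API version, deployment name,
# and environment variable below are placeholders, not values shipped with
# this module):
#
#     result = process_document_with_redaction(
#         file_path="discharge_summary.pdf",
#         endpoint="https://<your-resource>.openai.azure.com/",
#         api_key=os.environ["AZURE_OPENAI_API_KEY"],
#         api_version="2024-02-01",
#         deployment="gpt-4o",
#     )
#     print(result.redacted_document_md)
#     print(f"Removed sections: {result.removed_indices}, cost: ${result.cost:.4f}")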

class DocumentProcessor:
    """Handles parsing of documents with Docling and redacting specified sections."""

    def __init__(self, section_extractor: Optional[SectionExtractor] = None):
        """
        Initialize with an optional SectionExtractor for removing specific sections.
        If None, no redaction is performed (original structure only).
        The Docling DocumentConverter is initialized lazily when needed.
        """
        self.section_extractor = section_extractor
        self._converter = None  # Lazy initialization

    @property
    def converter(self):
        """Lazily initialize the DocumentConverter to prevent early Hugging Face Hub initialization."""
        if self._converter is None:
            # Import here to ensure environment variables are set first
            from docling.document_converter import DocumentConverter
            logger.info("Initializing Docling DocumentConverter...")
            self._converter = DocumentConverter()
            logger.info("Docling DocumentConverter initialized successfully")
        return self._converter

    def process(self, file_path: str) -> DocumentResult:
        """Parse the document and optionally remove specified sections. Returns a DocumentResult."""
        logger.info(f"Starting processing for file: {file_path}")
        start_time = time.time()

        # Ensure cache directories exist before processing
        self._ensure_cache_directories()

        # Convert the document using Docling
        conv_result = self.converter.convert(file_path)
        elapsed = time.time() - start_time
        logger.info(f"Docling conversion completed in {elapsed:.2f} seconds")

        # Export results from Docling
        structured_md = conv_result.document.export_to_markdown()
        structured_text = conv_result.document.export_to_text()
        doc_json = conv_result.document.export_to_dict()

        # Convert JSON to YAML for display
        import yaml
        doc_yaml = yaml.dump(doc_json, default_flow_style=False, allow_unicode=True, sort_keys=False)

        logger.info(f"Extracted document content (text length {len(structured_text)} characters)")

        # Use the SectionExtractor to remove target sections if provided
        if self.section_extractor:
            # Use the JSON-based approach for better section removal
            redacted_json = self.section_extractor.remove_sections_from_json(doc_json)
            # Convert the redacted JSON back to markdown using Docling's export method
            redacted_md = self._export_redacted_markdown(conv_result.document, redacted_json)
            logger.info("Applied section redaction to remove specified sections")
        else:
            redacted_md = structured_md  # No redaction, use original
            redacted_json = doc_json  # No redaction, use original
            logger.info("No section redaction applied (showing original structure)")

        # Persist outputs to files (JSON and redacted text) for auditing
        base_name = os.path.splitext(os.path.basename(file_path))[0]
        # Use the same temp directory as the main application
        temp_dir = os.environ.get('TEMP_DIR', '/tmp/docling_temp')
        try:
            os.makedirs(temp_dir, exist_ok=True)
        except PermissionError:
            # Fall back to the system temp directory if the main temp dir is not writable
            import tempfile
            temp_dir = os.path.join(tempfile.gettempdir(), "docling_temp_files")
            os.makedirs(temp_dir, exist_ok=True)

        json_path = os.path.join(temp_dir, f"{base_name}_structured.json")
        redacted_path = os.path.join(temp_dir, f"{base_name}_redacted.txt")
        redacted_json_path = os.path.join(temp_dir, f"{base_name}_redacted.json")
        try:
            with open(json_path, "w", encoding="utf-8") as jf:
                json.dump(doc_json, jf, ensure_ascii=False, indent=2)
            with open(redacted_path, "w", encoding="utf-8") as tf:
                tf.write(redacted_md)
            with open(redacted_json_path, "w", encoding="utf-8") as jf:
                json.dump(redacted_json, jf, ensure_ascii=False, indent=2)
            logger.info(f"Saved structured JSON to {json_path}, redacted text to {redacted_path}, and redacted JSON to {redacted_json_path}")
        except Exception as e:
            logger.error(f"Error saving outputs to files: {e}")

        # Prepare the result object
        result = DocumentResult(
            file_path=file_path,
            structured_markdown=structured_md,
            structured_json=doc_json,
            structured_yaml=doc_yaml,
            redacted_markdown=redacted_md,
            redacted_json=redacted_json,
            raw_text=structured_text,  # Include the raw text
        )
        logger.info(f"Finished processing for file: {file_path}")
        return result
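
    # Example (a sketch; assumes a PDF exists at the given path): running the
    # processor directly without a section extractor, so no redaction is applied:
    #
    #     processor = DocumentProcessor()
    #     doc_result = processor.process("sample.pdf")
    #     print(doc_result.structured_markdown[:500])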

    def _ensure_cache_directories(self):
        """Ensure all necessary cache directories exist before processing."""
        cache_dirs = [
            os.environ.get('HF_HOME', '/tmp/docling_temp/huggingface'),
            os.environ.get('HF_CACHE_HOME', '/tmp/docling_temp/huggingface_cache'),
            os.environ.get('HF_HUB_CACHE', '/tmp/docling_temp/huggingface_cache'),
            os.environ.get('TRANSFORMERS_CACHE', '/tmp/docling_temp/transformers_cache'),
            os.environ.get('HF_DATASETS_CACHE', '/tmp/docling_temp/datasets_cache'),
            os.environ.get('DIFFUSERS_CACHE', '/tmp/docling_temp/diffusers_cache'),
            os.environ.get('ACCELERATE_CACHE', '/tmp/docling_temp/accelerate_cache'),
            os.environ.get('TORCH_HOME', '/tmp/docling_temp/torch'),
            os.environ.get('TENSORFLOW_HOME', '/tmp/docling_temp/tensorflow'),
            os.environ.get('KERAS_HOME', '/tmp/docling_temp/keras'),
        ]
        for cache_dir in cache_dirs:
            try:
                os.makedirs(cache_dir, exist_ok=True)
                logger.debug(f"Ensured cache directory exists: {cache_dir}")
            except Exception as e:
                logger.warning(f"Could not create cache directory {cache_dir}: {e}")

    def _export_redacted_markdown(self, document, redacted_json):
        """Export redacted markdown using Docling's Document class for proper formatting."""
        import importlib

        # Docling has exposed its Document class under different paths across
        # versions, so try each known location in turn
        Document = None
        for module_name in ("docling.document", "docling", "docling.core"):
            try:
                Document = importlib.import_module(module_name).Document
                break
            except (ImportError, AttributeError):
                continue

        if Document is None:
            logger.warning("Could not import Docling Document class from any known location")
            logger.info("Falling back to manual JSON-to-markdown conversion")
            return self._json_to_markdown(redacted_json)

        try:
            # Create a new Document from the redacted JSON
            redacted_document = Document.from_dict(redacted_json)
            # Use Docling's export method for proper markdown formatting
            redacted_md = redacted_document.export_to_markdown()
            logger.info("Successfully generated redacted markdown using Docling Document class")
            return redacted_md
        except Exception as e:
            logger.warning(f"Failed to create Docling Document from redacted JSON: {e}")
            logger.info("Falling back to manual JSON-to-markdown conversion")
            # Fall back to the manual method if Docling Document creation fails
            return self._json_to_markdown(redacted_json)

    def generate_redacted_pdf(self, redacted_json: dict, output_path: str) -> bool:
        """
        Generate a redacted PDF from the redacted JSON structure.

        Args:
            redacted_json: The redacted document JSON structure
            output_path: Path where the PDF should be saved

        Returns:
            bool: True if PDF generation was successful, False otherwise
        """
        try:
            # Import required libraries
            from reportlab.lib.pagesizes import A4
            from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
            from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
            from reportlab.lib import colors

            logger.info(f"Generating redacted PDF: {output_path}")

            # Create the PDF document
            doc = SimpleDocTemplate(output_path, pagesize=A4)
            story = []

            # Get styles
            styles = getSampleStyleSheet()
            normal_style = styles['Normal']
            heading_style = styles['Heading1']

            # Custom style for table-like text that is kept as a paragraph
            table_style = ParagraphStyle(
                'TableStyle',
                parent=normal_style,
                fontName='Courier',
                fontSize=9,
                spaceAfter=6,
            )

            # Process text elements from the JSON
            texts = redacted_json.get("texts", [])

            # Group consecutive table-like elements together
            i = 0
            while i < len(texts):
                text_elem = texts[i]
                text_content = text_elem.get("text", "").strip()
                label = text_elem.get("label", "")
                level = text_elem.get("level", 0)

                if not text_content:
                    i += 1
                    continue

                # Handle different content types
                if label == "section_header":
                    # Create a header with the appropriate level
                    if level == 1:
                        story.append(Paragraph(text_content, heading_style))
                    else:
                        # Create a sub-heading style
                        sub_heading_style = ParagraphStyle(
                            f'Heading{min(level, 3)}',
                            parent=normal_style,
                            fontSize=14 - level,
                            spaceAfter=12,
                            spaceBefore=12,
                            textColor=colors.darkblue,
                        )
                        story.append(Paragraph(text_content, sub_heading_style))
                elif label == "list_item":
                    # Handle list items
                    marker = text_elem.get("marker", "•")
                    list_text = f"{marker} {text_content}"
                    story.append(Paragraph(list_text, normal_style))
                elif '|' in text_content and text_content.count('|') > 1:
                    # Handle table-like content - collect consecutive table rows
                    table_rows = []
                    # Add the current row
                    cells = [cell.strip() for cell in text_content.split('|') if cell.strip()]
                    if cells:
                        table_rows.append(cells)
                    # Look ahead for consecutive table rows
                    j = i + 1
                    while j < len(texts):
                        next_text = texts[j].get("text", "").strip()
                        if '|' in next_text and next_text.count('|') > 1:
                            next_cells = [cell.strip() for cell in next_text.split('|') if cell.strip()]
                            if next_cells:
                                table_rows.append(next_cells)
                            j += 1
                        else:
                            break
                    # Create a table if we collected any rows
                    if table_rows:
                        table = Table(table_rows)
                        table.setStyle(TableStyle([
                            ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
                            ('FONTNAME', (0, 0), (-1, -1), 'Courier'),
                            ('FONTSIZE', (0, 0), (-1, -1), 9),
                            ('BOTTOMPADDING', (0, 0), (-1, -1), 3),
                            ('TOPPADDING', (0, 0), (-1, -1), 3),
                            ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
                            ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),  # Header row
                        ]))
                        story.append(table)
                        story.append(Spacer(1, 6))
                        # Skip the rows we've already processed
                        i = j - 1
                    else:
                        # Single row or no valid cells
                        story.append(Paragraph(text_content, table_style))
                else:
                    # Regular text content
                    story.append(Paragraph(text_content, normal_style))

                # Add small spacing between elements
                story.append(Spacer(1, 3))
                i += 1

            # Build the PDF
            doc.build(story)
            logger.info(f"Successfully generated redacted PDF: {output_path}")
            return True
        except ImportError as e:
            logger.error(f"Required PDF generation libraries not available: {e}")
            logger.info("Install reportlab with: pip install reportlab")
            return False
        except Exception as e:
            logger.error(f"Error generating redacted PDF: {e}")
            return False
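
    # Example (a sketch; assumes reportlab is installed and `doc_result` comes
    # from an earlier process() call):
    #
    #     ok = processor.generate_redacted_pdf(doc_result.redacted_json, "/tmp/report_redacted.pdf")
    #     if not ok:
    #         print("PDF generation failed; see the log for details")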

    def _json_to_markdown(self, json_data: dict) -> str:
        """Convert a JSON document structure back to markdown format using Docling's structure."""
        markdown_lines = []

        # Get all text elements from the JSON
        texts = json_data.get("texts", [])
        for text_elem in texts:
            text_content = text_elem.get("text", "")
            label = text_elem.get("label", "")
            level = text_elem.get("level", 0)

            if not text_content.strip():
                continue

            # Format based on the label and level (following Docling's structure)
            if label == "section_header":
                # Add the appropriate markdown header
                if level == 1:
                    markdown_lines.append(f"# {text_content}")
                elif level == 2:
                    markdown_lines.append(f"## {text_content}")
                elif level == 3:
                    markdown_lines.append(f"### {text_content}")
                else:
                    markdown_lines.append(f"#### {text_content}")
            elif label == "list_item":
                # Handle list items - preserve the original marker
                marker = text_elem.get("marker", "-")
                markdown_lines.append(f"{marker} {text_content}")
            elif label == "text":
                # Regular text content - preserve as-is
                markdown_lines.append(text_content)
            else:
                # Default to regular text
                markdown_lines.append(text_content)

        # Join without extra spacing to match Docling's formatting
        return "\n".join(markdown_lines)