| """ |
| Semantic Chunking Utilities |
| |
| Strategies for splitting and merging document content |
| into semantically meaningful chunks. |
| """ |
|
|
| import re |
| from dataclasses import dataclass |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| from ..chunks.models import ( |
| BoundingBox, |
| ChunkType, |
| DocumentChunk, |
| ) |
|
|
|
|
@dataclass
class ChunkingConfig:
    """Configuration for semantic chunking.

    All thresholds are measured in characters, not tokens.
    """

    # Size bounds for emitted chunks.
    min_chunk_chars: int = 50      # chunks shorter than this are dropped
    max_chunk_chars: int = 2000    # hard ceiling; longer text is force-split
    target_chunk_chars: int = 500  # preferred size the packing loop aims for

    # Trailing context repeated at the start of the next chunk when
    # falling back to raw character-based splitting.
    overlap_chars: int = 100

    # Structural split preferences, applied from coarse to fine.
    split_on_headings: bool = True    # break sections at heading lines
    split_on_paragraphs: bool = True  # break on blank-line boundaries
    preserve_sentences: bool = True   # avoid cutting mid-sentence when force-splitting

    # Post-pass: merge adjacent small chunks that share the same heading.
    merge_small_chunks: bool = True
    merge_threshold_chars: int = 100  # "small" means shorter than this
|
|
|
|
class SemanticChunker:
    """
    Semantic chunking engine.

    Splits text into meaningful chunks based on document structure,
    headings, paragraphs, and sentence boundaries.
    """

    # Line-start heading markers: markdown "#".."######", lettered
    # ("A. ", "B) ") or numbered ("1. ", "2) ") prefixes.
    HEADING_PATTERN = re.compile(r'^(?:#{1,6}\s+|[A-Z0-9][\.\)]\s+|\d+[\.\)]\s+)', re.MULTILINE)
    # A blank (possibly whitespace-only) line separating paragraphs.
    PARAGRAPH_PATTERN = re.compile(r'\n\s*\n')
    # Sentence boundary: ./!/? followed by whitespace and a capital letter.
    # NOTE(review): misses sentences starting with digits or quotes — an
    # accepted heuristic, not changed here.
    SENTENCE_PATTERN = re.compile(r'(?<=[.!?])\s+(?=[A-Z])')

    def __init__(self, config: Optional["ChunkingConfig"] = None):
        """Create a chunker; ``config`` defaults to ``ChunkingConfig()``."""
        self.config = config or ChunkingConfig()

    def chunk_text(
        self,
        text: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Split text into semantic chunks.

        Args:
            text: Input text to chunk
            metadata: Optional metadata to include with each chunk.
                Its keys are spread last, so they override the generated
                "text"/"heading" keys on collision.

        Returns:
            List of chunk dictionaries with text and metadata
        """
        if not text or not text.strip():
            return []

        metadata = metadata or {}
        chunks: List[Dict[str, Any]] = []

        # Coarse pass: carve the document into heading-delimited sections.
        if self.config.split_on_headings:
            sections = self._split_by_headings(text)
        else:
            sections = [{"heading": None, "text": text}]

        for section in sections:
            section_chunks = self._chunk_section(
                section["text"],
                section.get("heading"),
            )
            # Loop variable renamed from ``chunk_text`` — the original
            # shadowed this method's own name.
            for piece in section_chunks:
                # Pieces below the minimum are dropped here, before merging.
                if len(piece.strip()) >= self.config.min_chunk_chars:
                    chunks.append({
                        "text": piece.strip(),
                        "heading": section.get("heading"),
                        **metadata,
                    })

        if self.config.merge_small_chunks:
            chunks = self._merge_small_chunks(chunks)

        return chunks

    def _split_by_headings(self, text: str) -> List[Dict[str, Any]]:
        """Split text into sections at lines matching HEADING_PATTERN.

        Returns a list of {"heading", "text"} dicts. Text before the first
        heading gets heading=None; if no heading is found, the whole input
        is returned as a single section.
        """
        sections = []
        current_heading = None
        current_text = []

        lines = text.split("\n")

        for line in lines:
            if self.HEADING_PATTERN.match(line):
                # Flush the accumulated body of the previous section.
                if current_text:
                    sections.append({
                        "heading": current_heading,
                        "text": "\n".join(current_text),
                    })
                current_heading = line.strip()
                current_text = []
            else:
                current_text.append(line)

        # Flush the trailing section.
        if current_text:
            sections.append({
                "heading": current_heading,
                "text": "\n".join(current_text),
            })

        return sections if sections else [{"heading": None, "text": text}]

    def _chunk_section(
        self,
        text: str,
        heading: Optional[str],
    ) -> List[str]:
        """Chunk a single section by packing paragraphs up to the target size.

        ``heading`` is currently unused; kept for call-site symmetry and
        possible heading-aware sizing later.
        """
        # Short sections pass through untouched.
        if len(text) <= self.config.max_chunk_chars:
            return [text]

        if self.config.split_on_paragraphs:
            paragraphs = self.PARAGRAPH_PATTERN.split(text)
        else:
            paragraphs = [text]

        chunks = []
        current_chunk = ""

        for para in paragraphs:
            para = para.strip()
            if not para:
                continue

            # Greedily pack paragraphs while under the target (+1 for the
            # joining separator).
            if len(current_chunk) + len(para) + 1 <= self.config.target_chunk_chars:
                if current_chunk:
                    current_chunk += "\n\n" + para
                else:
                    current_chunk = para
            else:
                if current_chunk:
                    chunks.append(current_chunk)

                # A single paragraph above the hard ceiling is force-split;
                # its last fragment seeds the next chunk.
                if len(para) > self.config.max_chunk_chars:
                    sub_chunks = self._split_long_text(para)
                    chunks.extend(sub_chunks[:-1])
                    current_chunk = sub_chunks[-1] if sub_chunks else ""
                else:
                    current_chunk = para

        if current_chunk:
            chunks.append(current_chunk)

        return chunks

    def _split_long_text(self, text: str) -> List[str]:
        """Split over-long text on sentence boundaries, packing to target size.

        Falls back to raw character splitting when sentence preservation is
        disabled, or for individual sentences above the hard ceiling.
        """
        if not self.config.preserve_sentences:
            return self._split_by_chars(text)

        sentences = self.SENTENCE_PATTERN.split(text)
        chunks = []
        current_chunk = ""

        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue

            # Same greedy packing as _chunk_section, joined by spaces.
            if len(current_chunk) + len(sentence) + 1 <= self.config.target_chunk_chars:
                if current_chunk:
                    current_chunk += " " + sentence
                else:
                    current_chunk = sentence
            else:
                if current_chunk:
                    chunks.append(current_chunk)

                if len(sentence) > self.config.max_chunk_chars:
                    # Pathologically long sentence: split by characters.
                    sub_chunks = self._split_by_chars(sentence)
                    chunks.extend(sub_chunks[:-1])
                    current_chunk = sub_chunks[-1] if sub_chunks else ""
                else:
                    current_chunk = sentence

        if current_chunk:
            chunks.append(current_chunk)

        return chunks

    def _split_by_chars(self, text: str) -> List[str]:
        """Split text by character count with overlap.

        FIX: the original advanced ``start = end - overlap_chars``
        unconditionally. Once ``end`` reached the end of the text, ``start``
        stayed below ``len(text)`` forever (infinite loop whenever
        ``overlap_chars > 0``), and a space found close to ``start`` could
        even move the window backwards. We now stop after emitting the final
        piece and guarantee forward progress on every iteration.
        """
        chunks = []
        start = 0
        text_len = len(text)

        while start < text_len:
            end = min(start + self.config.target_chunk_chars, text_len)

            # Prefer to break on a space so words are not cut in half.
            if end < text_len:
                space_idx = text.rfind(" ", start, end)
                if space_idx > start:
                    end = space_idx

            chunks.append(text[start:end].strip())

            if end >= text_len:
                break  # tail emitted; stepping back would loop forever

            # Step back for overlap, but never move backwards or stall.
            start = max(end - self.config.overlap_chars, start + 1)

        return chunks

    def _merge_small_chunks(
        self,
        chunks: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Merge runs of small same-heading chunks.

        A pending chunk absorbs its successor only while it is below
        merge_threshold_chars, the combined length stays within
        max_chunk_chars, and both carry the same heading.
        """
        if not chunks:
            return chunks

        merged = []
        current = None

        for chunk in chunks:
            text = chunk["text"]

            if current is None:
                current = chunk.copy()
                continue

            current_len = len(current["text"])
            new_len = len(text)

            if (current_len < self.config.merge_threshold_chars and
                    current_len + new_len <= self.config.max_chunk_chars and
                    current.get("heading") == chunk.get("heading")):
                # Absorb: keep the pending chunk's metadata, append the text.
                current["text"] = current["text"] + "\n\n" + text
            else:
                merged.append(current)
                current = chunk.copy()

        if current:
            merged.append(current)

        return merged
|
|
|
|
class DocumentChunkBuilder:
    """
    Builder for creating DocumentChunk objects.

    Provides a fluent interface for chunk construction with
    automatic ID generation and validation. Every ``add_*`` method
    returns the builder itself so calls can be chained.
    """

    def __init__(
        self,
        doc_id: str,
        page: int,
    ):
        # Document/page context shared by every chunk this builder emits.
        self.doc_id = doc_id
        self.page = page
        # Chunks accumulated so far, numbered by _sequence_index.
        self._sequence_index = 0
        self._chunks: List[DocumentChunk] = []

    def add_chunk(
        self,
        text: str,
        chunk_type: ChunkType,
        bbox: BoundingBox,
        confidence: float = 1.0,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> "DocumentChunkBuilder":
        """Append one chunk of the given type and return self."""
        # Deterministic ID derived from document, page, bbox and chunk type.
        new_chunk = DocumentChunk(
            chunk_id=DocumentChunk.generate_chunk_id(
                doc_id=self.doc_id,
                page=self.page,
                bbox=bbox,
                chunk_type_str=chunk_type.value,
            ),
            doc_id=self.doc_id,
            chunk_type=chunk_type,
            text=text,
            page=self.page,
            bbox=bbox,
            confidence=confidence,
            sequence_index=self._sequence_index,
            metadata=metadata or {},
        )
        self._sequence_index += 1
        self._chunks.append(new_chunk)
        return self

    def add_text(
        self,
        text: str,
        bbox: BoundingBox,
        confidence: float = 1.0,
    ) -> "DocumentChunkBuilder":
        """Convenience wrapper: add a ChunkType.TEXT chunk."""
        return self.add_chunk(
            text=text, chunk_type=ChunkType.TEXT, bbox=bbox, confidence=confidence
        )

    def add_title(
        self,
        text: str,
        bbox: BoundingBox,
        confidence: float = 1.0,
    ) -> "DocumentChunkBuilder":
        """Convenience wrapper: add a ChunkType.TITLE chunk."""
        return self.add_chunk(
            text=text, chunk_type=ChunkType.TITLE, bbox=bbox, confidence=confidence
        )

    def add_heading(
        self,
        text: str,
        bbox: BoundingBox,
        confidence: float = 1.0,
    ) -> "DocumentChunkBuilder":
        """Convenience wrapper: add a ChunkType.HEADING chunk."""
        return self.add_chunk(
            text=text, chunk_type=ChunkType.HEADING, bbox=bbox, confidence=confidence
        )

    def add_paragraph(
        self,
        text: str,
        bbox: BoundingBox,
        confidence: float = 1.0,
    ) -> "DocumentChunkBuilder":
        """Convenience wrapper: add a ChunkType.PARAGRAPH chunk."""
        return self.add_chunk(
            text=text, chunk_type=ChunkType.PARAGRAPH, bbox=bbox, confidence=confidence
        )

    def build(self) -> List[DocumentChunk]:
        """Return a shallow copy of the accumulated chunks."""
        return list(self._chunks)

    def reset(self) -> "DocumentChunkBuilder":
        """Clear accumulated chunks and restart sequence numbering."""
        self._sequence_index = 0
        self._chunks = []
        return self
|
|
|
|
def estimate_tokens(text: str) -> int:
    """
    Estimate the token count of *text*.

    Heuristic: roughly one token per four characters (integer division,
    so very short strings round down to zero).
    """
    chars_per_token = 4
    return len(text) // chars_per_token
|
|
|
|
def split_for_embedding(
    text: str,
    max_tokens: int = 512,
    overlap_tokens: int = 50,
) -> List[str]:
    """
    Split text for embedding model input.

    Args:
        text: Text to split
        max_tokens: Maximum tokens per chunk
        overlap_tokens: Overlap between chunks

    Returns:
        List of text chunks
    """
    # ~4 characters per token, mirroring estimate_tokens().
    max_chars = max_tokens * 4
    overlap_chars = overlap_tokens * 4

    # FIX: the original used ``max_chars - 100`` unconditionally, which goes
    # to zero or negative when max_tokens <= 25 and degrades chunking.
    # Keep the 100-char headroom for normal sizes, but never let the target
    # fall below half the ceiling (or one character).
    target_chars = max(max_chars - 100, max_chars // 2, 1)

    config = ChunkingConfig(
        max_chunk_chars=max_chars,
        target_chunk_chars=target_chars,
        overlap_chars=overlap_chars,
    )

    chunker = SemanticChunker(config)
    return [chunk["text"] for chunk in chunker.chunk_text(text)]
|
|