| """ |
| Evidence Verifier |
| |
| Verifies that claims are supported by document evidence. |
| Cross-references extracted information with source documents. |
| """ |
|
|
| from typing import List, Optional, Dict, Any, Tuple |
| from enum import Enum |
| from pydantic import BaseModel, Field |
| from loguru import logger |
| import re |
|
|
|
|
class EvidenceStrength(str, Enum):
    """Ordinal labels for how strongly evidence supports a claim.

    Values are plain strings (str mixin) so they serialize directly.
    """

    STRONG = "strong"      # combined score >= strong_threshold, or verbatim match
    MODERATE = "moderate"  # combined score >= moderate_threshold
    WEAK = "weak"          # combined score >= weak_threshold
    NONE = "none"          # no usable supporting evidence
|
|
|
|
class VerifierConfig(BaseModel):
    """Configuration for evidence verifier."""

    # --- Matching behavior ---
    fuzzy_match: bool = Field(default=True, description="Enable fuzzy matching")
    case_sensitive: bool = Field(default=False, description="Case-sensitive matching")
    min_match_ratio: float = Field(
        default=0.6,
        ge=0.0,
        le=1.0,
        description="Minimum match ratio for fuzzy matching"
    )

    # --- Score cutoffs mapping a combined match score to an EvidenceStrength ---
    strong_threshold: float = Field(default=0.9, ge=0.0, le=1.0)
    moderate_threshold: float = Field(default=0.7, ge=0.0, le=1.0)
    weak_threshold: float = Field(default=0.5, ge=0.0, le=1.0)

    # --- Result shaping ---
    # Top-N matches retained per claim (best-first).
    max_evidence_per_claim: int = Field(default=5, ge=1)
    context_window: int = Field(default=100, description="Characters around match")
|
|
|
|
class EvidenceMatch(BaseModel):
    """A match between claim and evidence."""

    # The matched span, its 0.0-1.0 score, and the derived strength bucket.
    evidence_text: str
    match_score: float
    strength: EvidenceStrength

    # Provenance of the match within the source document.
    chunk_id: Optional[str] = None
    page: Optional[int] = None
    # Character offset of the match within the chunk text.
    position: Optional[int] = None

    # Surrounding text (truncated) captured for display/inspection.
    context_before: Optional[str] = None
    context_after: Optional[str] = None
|
|
|
|
class VerificationResult(BaseModel):
    """Result of evidence verification."""

    # The claim that was checked and the overall outcome.
    claim: str
    # True only when strength is strong/moderate, confidence clears the
    # moderate threshold, and no contradiction was detected.
    verified: bool
    strength: EvidenceStrength
    # Score of the best match (0.0 when there is none).
    confidence: float

    # Supporting evidence, best-first, capped at max_evidence_per_claim.
    evidence_matches: List[EvidenceMatch]
    best_match: Optional[EvidenceMatch] = None

    # Fraction of claim terms covered by the retained evidence.
    coverage_score: float
    # A negation cue was found near a claim term in the evidence.
    contradiction_found: bool = False
    notes: Optional[str] = None
|
|
|
|
class EvidenceVerifier:
    """
    Verifies claims against document evidence.

    Features:
    - Text matching (exact and fuzzy)
    - Evidence strength scoring
    - Contradiction detection (naive negation-proximity heuristic)
    - Context extraction around matches
    """

    # Common English function words ignored when extracting claim terms.
    # Hoisted to a class constant so the set is built once, not per call.
    _STOP_WORDS = frozenset({
        "the", "a", "an", "is", "are", "was", "were", "be", "been",
        "being", "have", "has", "had", "do", "does", "did", "will",
        "would", "could", "should", "may", "might", "must", "shall",
        "can", "need", "dare", "ought", "used", "to", "of", "in",
        "for", "on", "with", "at", "by", "from", "as", "into", "through",
        "during", "before", "after", "above", "below", "between",
        "and", "but", "if", "or", "because", "until", "while",
    })

    # Negation cues used by _check_contradictions, pre-compiled once
    # instead of re-scanning raw pattern strings per chunk per term.
    _NEGATION_PATTERNS = tuple(
        re.compile(p) for p in (
            r'\bnot\b', r'\bno\b', r'\bnever\b', r'\bnone\b',
            r'\bwithout\b', r'\bfailed\b', r'\bdenied\b',
        )
    )

    # A negation within this many characters of a claim term counts as a
    # potential contradiction.
    _NEGATION_PROXIMITY = 30

    def __init__(self, config: Optional[VerifierConfig] = None):
        """Initialize evidence verifier.

        Args:
            config: Optional configuration; defaults are used when omitted.
        """
        self.config = config or VerifierConfig()

    def verify_claim(
        self,
        claim: str,
        evidence_chunks: List[Dict[str, Any]],
    ) -> VerificationResult:
        """
        Verify a claim against evidence.

        Args:
            claim: The claim to verify
            evidence_chunks: List of evidence chunks; each is a dict with at
                least a "text" key (optionally "chunk_id" and "page")

        Returns:
            VerificationResult summarizing the best supporting evidence
        """
        # Nothing to verify against -> unverified with zero confidence.
        if not claim or not evidence_chunks:
            return VerificationResult(
                claim=claim,
                verified=False,
                strength=EvidenceStrength.NONE,
                confidence=0.0,
                evidence_matches=[],
                coverage_score=0.0,
            )

        # Collect candidate matches across all chunks.
        matches = []
        for chunk in evidence_chunks:
            chunk_text = chunk.get("text", "")
            if not chunk_text:
                continue
            matches.extend(self._find_matches(claim, chunk_text, chunk))

        # Keep only the highest-scoring matches.
        matches.sort(key=lambda m: m.match_score, reverse=True)
        top_matches = matches[:self.config.max_evidence_per_claim]

        if top_matches:
            best_match = top_matches[0]
            overall_strength = best_match.strength
            confidence = best_match.match_score
            coverage_score = self._calculate_coverage(claim, top_matches)
        else:
            best_match = None
            overall_strength = EvidenceStrength.NONE
            confidence = 0.0
            coverage_score = 0.0

        # A claim is verified only with at least moderate-strength evidence
        # whose score also clears the moderate threshold.
        verified = (
            overall_strength in (EvidenceStrength.STRONG, EvidenceStrength.MODERATE)
            and confidence >= self.config.moderate_threshold
        )

        contradiction_found = self._check_contradictions(claim, evidence_chunks)

        return VerificationResult(
            claim=claim,
            verified=verified and not contradiction_found,
            strength=overall_strength,
            confidence=confidence,
            evidence_matches=top_matches,
            best_match=best_match,
            coverage_score=coverage_score,
            contradiction_found=contradiction_found,
        )

    def verify_multiple(
        self,
        claims: List[str],
        evidence_chunks: List[Dict[str, Any]],
    ) -> List[VerificationResult]:
        """
        Verify multiple claims against the same evidence.

        Args:
            claims: List of claims to verify
            evidence_chunks: Evidence chunks

        Returns:
            List of VerificationResult, one per claim, in input order
        """
        return [self.verify_claim(claim, evidence_chunks) for claim in claims]

    def verify_extraction(
        self,
        extraction: Dict[str, Any],
        evidence_chunks: List[Dict[str, Any]],
    ) -> Dict[str, VerificationResult]:
        """
        Verify extracted fields as claims.

        Args:
            extraction: Dictionary of field -> value
            evidence_chunks: Evidence chunks

        Returns:
            Dictionary of field -> VerificationResult (None values skipped)
        """
        results = {}
        for field, value in extraction.items():
            if value is None:
                continue
            # Render the field as a "name: value" claim string.
            claim = f"{field}: {value}"
            results[field] = self.verify_claim(claim, evidence_chunks)
        return results

    def _find_matches(
        self,
        claim: str,
        text: str,
        chunk: Dict[str, Any],
    ) -> List[EvidenceMatch]:
        """Find matches for the claim within a single chunk's text.

        Produces at most two matches per chunk: one for a verbatim
        occurrence of the whole claim, and one anchored at the best
        term-level (exact or fuzzy) hit.
        """
        matches = []

        # Normalize case unless configured to be case-sensitive.
        claim_normalized = claim if self.config.case_sensitive else claim.lower()
        text_normalized = text if self.config.case_sensitive else text.lower()

        terms = self._extract_terms(claim_normalized)

        # A verbatim occurrence of the whole claim is the strongest evidence.
        if claim_normalized in text_normalized:
            pos = text_normalized.find(claim_normalized)
            matches.append(self._create_match(
                text, pos, len(claim), chunk,
                score=1.0, strength=EvidenceStrength.STRONG,
            ))

        # Term-level matching: exact term hits first, fuzzy as fallback.
        term_scores = []
        for term in terms:
            if term in text_normalized:
                term_scores.append((term, text_normalized.find(term), 1.0))
            elif self.config.fuzzy_match:
                fuzzy_score, fuzzy_pos = self._fuzzy_find(term, text_normalized)
                if fuzzy_score >= self.config.min_match_ratio:
                    term_scores.append((term, fuzzy_pos, fuzzy_score))

        if term_scores:
            # NOTE: avg_score divides by the TOTAL term count, so unmatched
            # terms already drag the average down; combined with the explicit
            # coverage factor this double-penalizes partial matches. Kept
            # as-is because the strength thresholds are tuned against it.
            avg_score = sum(s[2] for s in term_scores) / len(terms) if terms else 0
            coverage = len(term_scores) / len(terms) if terms else 0
            combined_score = (avg_score * 0.7) + (coverage * 0.3)

            # Map the combined score to a strength bucket.
            if combined_score >= self.config.strong_threshold:
                strength = EvidenceStrength.STRONG
            elif combined_score >= self.config.moderate_threshold:
                strength = EvidenceStrength.MODERATE
            elif combined_score >= self.config.weak_threshold:
                strength = EvidenceStrength.WEAK
            else:
                strength = EvidenceStrength.NONE

            if strength != EvidenceStrength.NONE:
                # Anchor the match at the best-scoring individual term.
                best_term = max(term_scores, key=lambda t: t[2])
                matches.append(self._create_match(
                    text, best_term[1], len(best_term[0]), chunk,
                    score=combined_score, strength=strength,
                ))

        return matches

    def _create_match(
        self,
        text: str,
        position: int,
        length: int,
        chunk: Dict[str, Any],
        score: float,
        strength: EvidenceStrength,
    ) -> EvidenceMatch:
        """Create an evidence match, capturing surrounding context.

        Args:
            text: Full chunk text (original casing)
            position: Start offset of the matched span within `text`
            length: Length of the matched span
            chunk: Source chunk dict (provides chunk_id/page metadata)
            score: Match score in [0, 1]
            strength: Strength bucket assigned to the score
        """
        window = self.config.context_window
        start = max(0, position - window)
        end = min(len(text), position + length + window)

        context_before = text[start:position] if position > 0 else ""
        evidence_text = text[position:position + length]
        context_after = text[position + length:end] if position + length < len(text) else ""

        return EvidenceMatch(
            evidence_text=evidence_text,
            match_score=score,
            strength=strength,
            chunk_id=chunk.get("chunk_id"),
            page=chunk.get("page"),
            position=position,
            # Context is further trimmed to 50 chars for compact storage.
            context_before=context_before[-50:] if context_before else None,
            context_after=context_after[:50] if context_after else None,
        )

    def _extract_terms(self, text: str) -> List[str]:
        """Extract content-bearing terms from text.

        Lowercases, tokenizes on word boundaries, and drops stop words
        and words of 2 characters or fewer.
        """
        words = re.findall(r'\b\w+\b', text.lower())
        return [w for w in words if w not in self._STOP_WORDS and len(w) > 2]

    def _fuzzy_find(self, term: str, text: str) -> Tuple[float, int]:
        """Find the best approximate position of `term` in `text`.

        Slides a term-length window over the text and scores each window
        by the fraction of positionally matching characters.

        Returns:
            (best_score, best_position); (0.0, 0) when no window fits or
            the term is empty (guards the division below).
        """
        term_len = len(term)
        if term_len == 0:
            return 0.0, 0

        best_score = 0.0
        best_pos = 0
        for i in range(len(text) - term_len + 1):
            window = text[i:i + term_len]
            matches = sum(1 for a, b in zip(term, window) if a == b)
            score = matches / term_len
            if score > best_score:
                best_score = score
                best_pos = i

        return best_score, best_pos

    def _calculate_coverage(
        self,
        claim: str,
        matches: List[EvidenceMatch],
    ) -> float:
        """Calculate the fraction of claim terms covered by the evidence."""
        claim_terms = set(self._extract_terms(claim.lower()))
        if not claim_terms:
            return 0.0

        covered_terms = set()
        for match in matches:
            match_terms = set(self._extract_terms(match.evidence_text.lower()))
            covered_terms.update(match_terms.intersection(claim_terms))

        return len(covered_terms) / len(claim_terms)

    def _check_contradictions(
        self,
        claim: str,
        evidence_chunks: List[Dict[str, Any]],
    ) -> bool:
        """Check whether the evidence appears to contradict the claim.

        Heuristic: a negation cue within _NEGATION_PROXIMITY characters of
        ANY occurrence of a claim term counts as a contradiction. (The
        previous implementation only examined the first occurrence of each
        term, missing negations near later occurrences.)
        """
        claim_terms = set(self._extract_terms(claim.lower()))
        if not claim_terms:
            return False

        for chunk in evidence_chunks:
            text = chunk.get("text", "").lower()
            if not text:
                continue

            for term in claim_terms:
                # Collect every occurrence of the term in this chunk.
                term_positions = []
                pos = text.find(term)
                while pos != -1:
                    term_positions.append(pos)
                    pos = text.find(term, pos + 1)
                if not term_positions:
                    continue

                for pattern in self._NEGATION_PATTERNS:
                    for neg in pattern.finditer(text):
                        if any(
                            abs(neg.start() - p) < self._NEGATION_PROXIMITY
                            for p in term_positions
                        ):
                            return True

        return False
|
|
|
|
| |
# Module-level singleton managed by get_/reset_evidence_verifier().
_evidence_verifier: Optional[EvidenceVerifier] = None
|
|
|
|
def get_evidence_verifier(
    config: Optional[VerifierConfig] = None,
) -> EvidenceVerifier:
    """Return the process-wide EvidenceVerifier, creating it on first call.

    Note: `config` only takes effect on the call that creates the instance;
    later calls return the cached verifier unchanged.
    """
    global _evidence_verifier
    if _evidence_verifier is not None:
        return _evidence_verifier
    _evidence_verifier = EvidenceVerifier(config)
    return _evidence_verifier
|
|
|
|
def reset_evidence_verifier():
    """Discard the cached singleton so the next get_evidence_verifier()
    call constructs a fresh instance (useful in tests)."""
    global _evidence_verifier
    _evidence_verifier = None
|
|