| |
| |
| |
|
|
import os
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List

import numpy as np
import torch
from fastapi import FastAPI
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer
|
|
|
|
| |
| |
| |
|
|
# Project constant dubbed "φ⁴³" — used as a numeric regularizer in hyperedge
# scoring and as the normalization bound in phi43_check().
# NOTE(review): the mathematical provenance of this value is not shown here —
# confirm against the project's derivation before changing it.
PHI_43 = 22.93606797749979
# Identifier stamped into every log line emitted by log_line().
SYSTEM_ID = "QUANTARION-HYPERGRAPH-RAG-PROD"
|
|
|
|
| |
| |
| |
|
|
# Log directory lives under the current working directory; created eagerly so
# appends in log_line() never fail on a missing directory.
LOG_DIR = os.path.join(os.getcwd(), "Logs")
os.makedirs(LOG_DIR, exist_ok=True)
# Fix: the log file was named ".text" — a hidden, extension-only filename.
# Use a real, discoverable log file name instead.
LOG_PATH = os.path.join(LOG_DIR, "hypergraph_rag.log")
|
|
def log_line(msg: str) -> None:
    """Print *msg* with a UTC timestamp and best-effort append it to LOG_PATH.

    Logging must never take down a request path, so failures to write the
    log file are swallowed after the line has already gone to stdout.
    """
    # timezone-aware UTC timestamp (datetime.utcnow() is deprecated).
    ts = datetime.now(timezone.utc).isoformat()
    line = f"[{ts}] [{SYSTEM_ID}] {msg}"
    print(line)
    try:
        with open(LOG_PATH, "a", encoding="utf-8") as f:
            # Fix: the original source had a literal line break inside this
            # string; it is a newline escape.
            f.write(line + "\n")
    except OSError:
        # Best-effort: stdout already carries the line; ignore disk errors.
        pass
|
|
|
|
| |
| |
| |
|
|
@dataclass
class Hyperedge:
    """One n-ary relation: a weighted set of concept vertices plus metadata."""

    id: str  # unique identifier (uuid4 string at build time)
    vertices: List[str]  # entity/concept names joined by this relation
    weight: float  # relation weight (build_from_documents assigns 1.0)
    meta: Dict[str, Any] = field(default_factory=dict)  # e.g. doc_id, text
|
|
|
|
@dataclass
class Hypergraph:
    """Full hypergraph index: the vertex universe plus all hyperedges."""

    vertices: List[str]  # sorted unique entity names
    hyperedges: List[Hyperedge]  # one per multi-entity document
|
|
|
|
class QueryRequest(BaseModel):
    """Request body for POST /query."""

    query: str  # natural-language query text
    top_k: int = 5  # number of hyperedges to retrieve
|
|
|
|
class QueryResponse(BaseModel):
    """Response body for POST /query."""

    query_id: str  # server-assigned uuid4 for this query
    query: str  # echo of the request query
    selected_hyperedges: List[Dict[str, Any]]  # serialized Hyperedge records
    answer: str  # deterministic grounded summary
    phi43_check: float  # [0, 1] sanity metric from phi43_check()
    latency_ms: float  # end-to-end handler latency
|
|
|
|
| |
| |
| |
|
|
class HypergraphRAGEngine:
    """
    Production-grade Hypergraph RAG:
    - Embeddings via SentenceTransformer
    - Hyperedges = n-ary concept relations
    - Retrieval = minimal hyperedge cover approximation
    - φ⁴³ used as a numeric regularizer for scoring/stability
    """

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        """Load the sentence-embedding model and start with an empty index."""
        log_line("Initializing HypergraphRAGEngine…")
        self.model_name = model_name
        self.embedder = SentenceTransformer(model_name)
        self.hypergraph: Hypergraph = Hypergraph(vertices=[], hyperedges=[])
        self.vertex_embeddings: Dict[str, np.ndarray] = {}
        self.ready = False

    def build_from_documents(self, docs: List[Dict[str, Any]]) -> None:
        """
        Build the hypergraph index from annotated documents.

        docs: list of {"id": str, "text": str, "entities": [str,...]}
        entities = extracted or annotated concept ids/names.

        Documents with fewer than two distinct entities cannot form an
        n-ary relation and are skipped.
        """
        log_line(f"Building hypergraph from {len(docs)} documents…")

        vertices_set = set()
        hyperedges: List[Hyperedge] = []

        # Collect the vertex universe: every unique entity across all docs.
        for d in docs:
            for ent in d.get("entities", []):
                vertices_set.add(ent)

        vertices = sorted(vertices_set)

        # Fix: reset before rebuilding so a rebuild that yields no vertices
        # does not leave stale embeddings from a previous index behind.
        self.vertex_embeddings = {}
        if vertices:
            log_line(f"Embedding {len(vertices)} vertices…")
            embs = self.embedder.encode(vertices, normalize_embeddings=True)
            self.vertex_embeddings = {v: embs[i] for i, v in enumerate(vertices)}

        # One hyperedge per document, linking all of its distinct entities.
        for d in docs:
            ents = list(set(d.get("entities", [])))
            if len(ents) < 2:
                continue
            hyperedges.append(
                Hyperedge(
                    id=str(uuid.uuid4()),
                    vertices=ents,
                    weight=1.0,
                    meta={
                        "doc_id": d["id"],
                        "text": d["text"],
                    },
                )
            )

        self.hypergraph = Hypergraph(vertices=vertices, hyperedges=hyperedges)
        self.ready = True
        log_line(
            f"Hypergraph built: |V|={len(self.hypergraph.vertices)}, |E|={len(self.hypergraph.hyperedges)}"
        )

    def _query_embedding(self, query: str) -> np.ndarray:
        """Return the normalized embedding vector for a single query string."""
        return self.embedder.encode([query], normalize_embeddings=True)[0]

    def _hyperedge_score(self, query_emb: np.ndarray, he: Hyperedge) -> float:
        """
        Score one hyperedge against the query embedding.

        Base score = mean dot product between the query embedding and each
        vertex embedding present in the index (cosine similarity, since
        embeddings are normalized). A small φ⁴³-scaled term regularizes the
        score; `reg` maps base from [-1, 1] into [0, 1] first.
        """
        sims = []
        for v in he.vertices:
            ve = self.vertex_embeddings.get(v)
            if ve is not None:
                sims.append(float(np.dot(query_emb, ve)))
        base = float(np.mean(sims)) if sims else 0.0
        reg = (base + 1.0) / 2.0
        return float(base + 0.01 * (PHI_43 / 23.0) * reg)

    def retrieve_hyperedges(self, query: str, top_k: int = 5) -> List[Hyperedge]:
        """Return the top_k highest-scoring hyperedges (empty if not ready)."""
        if not self.ready or not self.hypergraph.hyperedges:
            return []

        q_emb = self._query_embedding(query)
        scored = [
            (self._hyperedge_score(q_emb, he), he)
            for he in self.hypergraph.hyperedges
        ]
        # Sort on the score only; Hyperedge itself is not orderable.
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [he for _, he in scored[:top_k]]

    def generate_answer(self, query: str, hyperedges: List[Hyperedge]) -> str:
        """
        In production, this would call QVNN/LLM with retrieved context.
        Here we produce a concise, deterministic executive-style answer.
        """
        if not hyperedges:
            return (
                "No sufficient hypergraph context was found for this query in the "
                "current Quantarion Hypergraph-RAG index."
            )

        docs = [he.meta.get("text", "") for he in hyperedges]
        docs = [d for d in docs if d.strip()]
        # Cap the stitched context at 800 characters to keep answers short.
        snippet = " ".join(docs)[:800]

        # Fix: the original source had literal line breaks inside this
        # string literal; they are newline escapes.
        return (
            "Executive hypergraph-grounded summary:\n"
            f"- Query: {query}\n"
            f"- Top hyperedges: {len(hyperedges)}\n"
            f"- Condensed context: {snippet}\n"
            "This answer is generated by selecting a minimal set of "
            "multi-entity hyperedges that best align with the query, "
            "using φ⁴³-regularized similarity scoring."
        )

    def phi43_check(self, hyperedges: List[Hyperedge]) -> float:
        """
        Simple φ-check: scale count of hyperedges into [0,1] vs PHI_43.
        """
        if not hyperedges:
            return 0.0
        val = len(hyperedges) / PHI_43
        return float(max(0.0, min(1.0, val)))
|
|
|
|
| |
| |
| |
|
|
# ASGI application and a single module-level engine instance shared by all
# request handlers.
# NOTE(review): constructing the engine at import time loads the embedding
# model — confirm this is acceptable for the deployment's startup budget.
app = FastAPI(title="Quantarion Hypergraph-RAG Production API")

engine = HypergraphRAGEngine()
|
|
|
|
@app.on_event("startup")
def _startup():
    """Index a small built-in demo corpus so the API is queryable at boot."""
    log_line("Startup: building demo hypergraph index…")
    # (doc_id, text, entities) triples for the demo corpus.
    corpus = [
        (
            "doc1",
            "Neuromorphic SNNs provide event-driven, low-power computation.",
            ["neuromorphic", "SNN", "event-driven"],
        ),
        (
            "doc2",
            "Hypergraph RAG uses hyperedges to capture multi-entity relations.",
            ["hypergraph", "RAG", "multi-entity"],
        ),
        (
            "doc3",
            "Hybrid retrieval combines dense, sparse, and graph-based signals.",
            ["hybrid retrieval", "dense", "sparse", "graph"],
        ),
    ]
    demo_docs = [
        {"id": doc_id, "text": text, "entities": entities}
        for doc_id, text, entities in corpus
    ]
    engine.build_from_documents(demo_docs)
    log_line("Startup: Hypergraph-RAG demo index ready.")
|
|
|
|
@app.post("/query", response_model=QueryResponse)
def query_hypergraph_rag(req: QueryRequest):
    """Retrieve top-k hyperedges for the query and return a grounded answer."""
    started = time.time()
    qid = str(uuid.uuid4())
    log_line(f"QUERY {qid} | {req.query}")

    hits = engine.retrieve_hyperedges(req.query, top_k=req.top_k)
    answer = engine.generate_answer(req.query, hits)
    phi_val = engine.phi43_check(hits)
    latency = (time.time() - started) * 1000.0

    log_line(
        f"QUERY {qid} | hyperedges={len(hits)} | phi43_check={phi_val:.3f} | latency_ms={latency:.1f}"
    )

    # Serialize each Hyperedge into the plain-dict shape QueryResponse expects.
    payload = [
        {
            "id": he.id,
            "vertices": he.vertices,
            "weight": he.weight,
            "meta": he.meta,
        }
        for he in hits
    ]
    return QueryResponse(
        query_id=qid,
        query=req.query,
        selected_hyperedges=payload,
        answer=answer,
        phi43_check=phi_val,
        latency_ms=latency,
    )
|
|
|
|
| if __name__ == "__main__": |
| import uvicorn |
|
|
| log_line("Starting Quantarion Hypergraph-RAG Production server on 0.0.0.0:8000…") |
| uvicorn.run(app, host="0.0.0.0", port=8000) |