import os
import hashlib
import logging

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from neo4j import GraphDatabase
import google.generativeai as genai

# ==================================================================
# TIMEZERO: Ephemeral Trace Backend Blueprint
# Connects to Neo4j AuraDB (Free Tier) and Gemini API
# ==================================================================

logger = logging.getLogger(__name__)

app = FastAPI(title="TimeZero Trace API")

# Environment configuration (these live safely on the server).
NEO4J_URI = os.getenv("NEO4J_URI", "neo4j+s://your-instance.databases.neo4j.io")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PASS = os.getenv("NEO4J_PASS", "your_password")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")

# Initialize connections. Both fall back to None so the endpoints can
# degrade gracefully when credentials aren't configured.
try:
    driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASS))
except Exception as exc:
    logger.warning("Neo4j driver initialization failed: %s", exc)
    driver = None  # Handled gracefully if credentials aren't set

if GEMINI_API_KEY:
    genai.configure(api_key=GEMINI_API_KEY)
    model = genai.GenerativeModel('gemini-1.5-flash')
else:
    model = None


class TraceRequest(BaseModel):
    """Request payload for POST /api/trace."""
    # The claim text as submitted by the caller.
    raw_claim_text: str
    # When True, skip the Tier 1 Neo4j cache and always run the crawl.
    bypass_cache: bool = False


def _schema_hash(schema: str) -> str:
    """Stable content hash of a schema string, used as the claim cache key.

    NOTE(review): md5 here is a non-cryptographic cache key, not a security
    control; it is kept for backward compatibility with hashes already
    persisted in Neo4j. Switching algorithms would invalidate the cache.
    """
    return hashlib.md5(schema.encode()).hexdigest()


# ------------------------------------------------------------------
# STAGE 1: Schema Extraction (LLM via Gemini)
# ------------------------------------------------------------------
def extract_schema(text: str) -> str:
    """Convert raw claim text into a structural semantic graph pattern.

    Returns a string shaped like
    "[Actor] -> [Action] -> [Target] -> [Context]". Falls back to a mock
    schema when Gemini is not configured, or to a plain template when the
    API call fails.
    """
    if not model:
        return "[Actor] -> [Action] -> [Target] -> [Context] # (Mock Schema)"
    prompt = (
        "Extract the core semantic claim. Format exactly as: "
        "[Actor] -> [Action] -> [Target] -> [Context]. \n"
        f"Text: '{text}'"
    )
    try:
        response = model.generate_content(prompt)
        return response.text.strip()
    except Exception as exc:
        # Bug fix: the previous fallback ended in "[Location]", which did
        # not match the "[Context]" format requested from the model.
        logger.warning("Gemini extraction failed, using fallback schema: %s", exc)
        return "[Actor] -> [Action] -> [Target] -> [Context]"  # Fallback


# ------------------------------------------------------------------
# STAGE 2: The Deep Crawl (Simulated for Blueprint)
# ------------------------------------------------------------------
def execute_deep_crawl(schema: str) -> dict:
    """
    In production, this queries GDELT, Telethon, Pushshift.
    Returns the structured path from Time Zero to Mainstream Injection.
    """
    # Mocked Trace Result based on our test case.
    return {
        "time_zero_node": {
            "id": "fb_group_123",
            "type": "Private Screenshot",
            "fringe_score": 0.99,
        },
        "mainstream_node": {
            "id": "vp_debate",
            "type": "Television",
            "fringe_score": 0.10,
        },
        "path_depth": 3,
        "confidence": 0.94,
    }


# ------------------------------------------------------------------
# STAGE 3: Neo4j Persistence (The Ephemeral Pruning)
# ------------------------------------------------------------------
def persist_trace_passport(schema: str, crawl_data: dict):
    """
    We DO NOT save the whole internet. We save the Time Zero passport
    and the Launderer node to Neo4j to act as the Tier 1 Cache
    for future queries.

    Returns the passport id (the schema hash) on success, a mock id when
    the driver is unavailable or the write fails, or None if the MERGE
    unexpectedly returns no row.
    """
    if not driver:
        return "mock_passport_123"

    schema_hash = _schema_hash(schema)
    query = """
    MERGE (c:Claim {schema_hash: $hash})
    SET c.schema = $schema, c.last_verified = timestamp()
    MERGE (tz:Source {id: $tz_id})
    SET tz.type = $tz_type, tz.fringe_score = $tz_score
    MERGE (ln:Source {id: $main_id})
    SET ln.type = $main_type
    MERGE (tz)-[:ORIGINATED]->(c)
    MERGE (ln)-[:LAUNDERED {depth: $depth}]->(c)
    RETURN c.schema_hash AS passport_id
    """
    try:
        with driver.session() as session:
            result = session.run(
                query,
                hash=schema_hash,
                schema=schema,
                tz_id=crawl_data['time_zero_node']['id'],
                tz_type=crawl_data['time_zero_node']['type'],
                tz_score=crawl_data['time_zero_node']['fringe_score'],
                main_id=crawl_data['mainstream_node']['id'],
                main_type=crawl_data['mainstream_node']['type'],
                depth=crawl_data['path_depth'],
            )
            record = result.single()
            return record["passport_id"] if record else None
    except Exception as exc:
        logger.error("Neo4j Write Error: %s", exc)
        return "mock_passport_123"


# ------------------------------------------------------------------
# API ENDPOINT
# ------------------------------------------------------------------
@app.post("/api/trace")
async def run_trace(req: TraceRequest):
    """Trace a claim: extract its schema, check the cache, else deep-crawl."""
    # 1. Extract Schema
    schema = extract_schema(req.raw_claim_text)
    schema_hash = _schema_hash(schema)

    # 2. Check Tier 1 Cache (Neo4j)
    if not req.bypass_cache and driver:
        try:
            with driver.session() as session:
                cache_check = session.run(
                    "MATCH (c:Claim {schema_hash: $hash}) RETURN c",
                    hash=schema_hash,
                ).single()
                if cache_check:
                    return {
                        "status": "CACHE_HIT",
                        "schema": schema,
                        "cost": "$0.00",
                        "latency": "15ms",
                    }
        except Exception as exc:
            # Fall through to the crawl on DB error, but leave a trail.
            logger.warning("Cache lookup failed, falling back to crawl: %s", exc)

    # 3. If cache miss, run the heavy crawl
    crawl_result = execute_deep_crawl(schema)

    # 4. Save Passport to Graph Database
    passport_id = persist_trace_passport(schema, crawl_result)

    return {
        "status": "TRACE_COMPLETE",
        "schema": schema,
        "time_zero": crawl_result['time_zero_node'],
        "passport_id": passport_id,
        "cost": "$0.05",
        "latency": "3450ms",
    }