"""Time Zero API.

Traces a misinformation claim from its mainstream appearances back to its
earliest observable origin ("Time Zero") across surface-web and dark-social
sources.

Pipeline: LLM schema extraction -> embedding -> surface crawl (GDELT) ->
dark-social crawl (paid/free tier) -> screenshot triage (OCR) -> graph
assembly, fronted by an in-memory Tier-1 cache for repeat queries.
"""

from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends
from pydantic import BaseModel
import openai
import os
import hashlib
import time
from typing import List, Optional, Dict
import httpx  # For async API calls (GDELT, Pushshift)

app = FastAPI(title="Time Zero API", version="1.0.0")

# --- CONFIG & SECRETS ---
# In production, these are injected via environment variables
# (e.g., AWS Secrets Manager).
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

# --- IN-MEMORY CACHE (Simulating Redis for Tier 1) ---
# A real deployment uses Redis or Pinecone for vector matching.
# NOTE: process-local and unbounded; fine for a POC, not for multi-worker
# deployments (each worker would hold its own copy).
TIER_1_CACHE: Dict[str, dict] = {}


# --- MODELS ---
class TraceRequest(BaseModel):
    # The claim text to trace, and whether paid dark-social sources
    # (Dataminr/Telethon) may be used.
    claim: str
    use_paid_tier: bool = False


class Node(BaseModel):
    # One vertex in the provenance graph.
    id: str
    label: str
    level: int  # 1 = origin ("Time Zero") ... 4 = mainstream injection
    source: str
    schema_match: str
    payload: str
    cred: str  # "credible / non-credible" score pair, serialized as text
    group: str  # rendering group: origin | dark | surface | mainstream
    is_image: bool = False


class TraceResponse(BaseModel):
    claim: str
    # NOTE(review): "schema" shadows BaseModel.schema(); Pydantic warns about
    # this. Kept as-is because it is part of the public API contract.
    schema: str
    nodes: List[Node]
    edges: List[dict]
    latency_ms: float
    cost: float
    cache_hit: bool


# --- SERVICES ---
async def extract_schema(claim: str) -> str:
    """Use GPT-4o-mini to extract the invariant behavioral schema of a claim.

    Returns a noun-stripped structural template of the form
    ``[Subject] -> [Action] -> [Object] -> [Context]``. Falls back to a
    hard-coded POC schema when no OpenAI key is configured.
    """
    if not OPENAI_API_KEY:
        # Fallback for POC if no key is set.
        return "[Immigrant Group] -> [Consuming] -> [Domestic Animal] -> [US Location]"
    openai.api_key = OPENAI_API_KEY
    # NOTE(review): openai.ChatCompletion is the pre-1.0 SDK surface (removed
    # in openai>=1.0) and is a *blocking* call inside an async handler --
    # migrate to AsyncOpenAI().chat.completions.create when upgrading.
    response = openai.ChatCompletion.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                # FIX: this literal was split by a raw line break in the
                # original source (a SyntaxError); reconstructed here.
                "content": (
                    "Extract the structural behavioral schema of the "
                    "following claim, removing specific nouns.\n"
                    "Format: [Subject] -> [Action] -> [Object] -> [Context]"
                ),
            },
            {"role": "user", "content": claim},
        ],
        temperature=0.1,  # near-deterministic extraction
    )
    return response.choices[0].message.content.strip()


async def generate_embedding(text: str) -> str:
    """Generate a semantic vector for Pinecone lookup.

    Simulated: returns a SHA-256 hex digest standing in for a real
    embedding vector.
    """
    hash_object = hashlib.sha256(text.encode())
    return hash_object.hexdigest()


async def query_gdelt_surface(vector_hash: str) -> List[Node]:
    """Hit GDELT and CommonCrawl for surface mainstream/bridge nodes.

    Simulated API call to GDELT; returns a fixed mainstream node and one
    bridge (influencer) node.
    """
    return [
        Node(
            id="N_MAIN",
            label="Mainstream Injection\n(VP Debate)",
            level=4,
            source="Live Broadcast",
            schema_match="Exact",
            payload="Broadcast transcript...",
            cred="0.95 / 0.05",
            group="mainstream",
        ),
        Node(
            id="N_B1",
            label="X Influencer A",
            level=3,
            source="X Firehose",
            schema_match="Exact",
            payload="Viral tweet...",
            cred="0.40 / 0.60",
            group="surface",
        ),
    ]


async def query_dark_social(vector_hash: str, use_paid: bool) -> List[Node]:
    """Hit Telethon/Dataminr (paid) or Pushshift/4plebs (free) dark sources.

    The paid tier simulates premium Telegram monitoring; the free tier
    simulates a 4plebs archive lookup. Both return a single level-2 node.
    """
    if use_paid:
        # Simulated Dataminr / Telethon call.
        return [
            Node(
                id="N_D1",
                label="Telegram Channel\n(Blood Tribe)",
                level=2,
                source="Telethon API",
                schema_match="Exact",
                payload="Forwarded message...",
                cred="0.01 / 0.99",
                group="dark",
            )
        ]
    # Simulated 4plebs call.
    return [
        Node(
            id="N_D1",
            label="4chan /pol/",
            level=2,
            source="4plebs API",
            schema_match="Exact",
            payload="Anon post...",
            cred="0.00 / 1.00",
            group="dark",
        )
    ]


async def triage_screenshot_wall(image_url: str) -> Node:
    """Perceptual Hash -> Gemini Vision OCR.

    In production, we check a local perceptual-hash DB first to bypass OCR
    cost. Simulated Gemini Flash call returning the Time Zero origin node.
    """
    return Node(
        id="N_ZERO",
        label="TIME ZERO\nPrivate FB\n[IMAGE]",
        level=1,
        source="OCR Extraction",
        schema_match="Exact",
        payload="[IMAGE] OCR: Neighbor said...",
        cred="0.02 / 0.98",
        group="origin",
        is_image=True,
    )


# --- ENDPOINTS ---
@app.post("/api/trace", response_model=TraceResponse)
async def run_trace(req: TraceRequest):
    """Trace a claim to its origin, returning the provenance graph.

    Two paths: a Tier-1 cache hit returns immediately at zero cost;
    otherwise the full async pipeline runs and its result is cached.
    """
    start_time = time.time()
    # Deterministic hash for cache lookup (in prod: vector similarity
    # search). MD5 is used only as a non-cryptographic cache key.
    query_hash = hashlib.md5(req.claim.lower().encode()).hexdigest()

    # ==========================================
    # LAYER 1: TIER 1 CACHE (15ms Fast Path)
    # ==========================================
    if query_hash in TIER_1_CACHE:
        cached_data = TIER_1_CACHE[query_hash]
        latency = (time.time() - start_time) * 1000
        return TraceResponse(
            claim=req.claim,
            schema=cached_data["schema"],
            nodes=cached_data["nodes"],
            edges=cached_data["edges"],
            latency_ms=latency,  # Will be ~1-15ms
            cost=0.0000,  # ZERO COST ON CACHE HIT
            cache_hit=True,
        )

    # ==========================================
    # LAYER 2: DEEP CRAWL (Async Pipeline)
    # ==========================================
    # 1. Schema extraction.
    schema = await extract_schema(req.claim)
    vector = await generate_embedding(schema)
    cost = 0.0015  # Base LLM cost

    # 2. Surface web index (GDELT).
    surface_nodes = await query_gdelt_surface(vector)
    cost += 0.0005  # API egress

    # 3. Dark social firehose.
    dark_nodes = await query_dark_social(vector, req.use_paid_tier)
    cost += 0.0450 if req.use_paid_tier else 0.0000

    # 4. Screenshot wall triage (hardcoded simulation of finding an image
    # at Time Zero).
    origin_node = await triage_screenshot_wall("simulated_url")
    cost += 0.0025  # Gemini Vision OCR cost

    # 5. Build graph. NOTE(review): edge wiring assumes exactly one dark
    # node and two surface nodes (indices [0]/[1]) -- true for the
    # simulated services above, but would break on real variable-length
    # results.
    all_nodes = [origin_node] + dark_nodes + surface_nodes
    edges = [
        {"from": origin_node.id, "to": dark_nodes[0].id},
        {"from": dark_nodes[0].id, "to": surface_nodes[1].id},
        {"from": surface_nodes[1].id, "to": surface_nodes[0].id},
    ]

    # Save to cache for future agents.
    TIER_1_CACHE[query_hash] = {
        "schema": schema,
        "nodes": all_nodes,
        "edges": edges,
    }

    latency = (time.time() - start_time) * 1000
    return TraceResponse(
        claim=req.claim,
        schema=schema,
        nodes=all_nodes,
        edges=edges,
        latency_ms=latency,
        cost=cost,
        cache_hit=False,
    )


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)