From 7599634bea626a413bb6f88978c272b0310ea6e2 Mon Sep 17 00:00:00 2001 From: Andresbravo9 <127428045+Andresbravo9@users.noreply.github.com> Date: Wed, 20 May 2026 15:18:16 -0300 Subject: [PATCH] feat(memory): implement experimental dual-layer memory spike for backtesting Closes #9. Adds a toggleable (USE_EXPERIMENTAL_MEMORY) prototype with Core Memory + Archival Memory mapping to compete against Zep Knowledge Graph. Includes I/O batch optimizations and absolute timestamps. --- SPIKE_S1_MEMORY.md | 51 ++++++ backend/app/config.py | 4 + backend/app/services/experimental_memory.py | 166 ++++++++++++++++++ backend/app/services/report_agent.py | 5 +- .../app/services/zep_graph_memory_updater.py | 28 ++- backend/app/services/zep_tools.py | 45 ++++- 6 files changed, 293 insertions(+), 6 deletions(-) create mode 100644 SPIKE_S1_MEMORY.md create mode 100644 backend/app/services/experimental_memory.py diff --git a/SPIKE_S1_MEMORY.md b/SPIKE_S1_MEMORY.md new file mode 100644 index 00000000..71b1ec84 --- /dev/null +++ b/SPIKE_S1_MEMORY.md @@ -0,0 +1,51 @@ +# Spike S1: Prototype Memory Layer for Backtesting Comparison + +## 1. Executive Summary +This spike prototyped an alternative memory layer for MiroFish, inspired by Andrej Karpathy's LLM-Wiki approach. The goal was to create a simpler, more predictable memory system that can be used for A/B testing and backtesting against the current Zep-based Knowledge Graph system. + +## 2. Technical Mapping (Current System) +The current MiroFish memory system relies on **Zep Knowledge Graph**: +- **Construction:** `GraphBuilderService` extracts entities and relationships from text. +- **Ingestion:** `ZepGraphMemoryUpdater` batch-updates Zep with agent activities. +- **Retrieval:** `ZepToolsService` performs complex graph searches (InsightForge, PanoramaSearch). +- **Pros:** High structure, captures complex relationships. +- **Cons:** High token cost, variable retrieval consistency, complex maintenance. + +## 3. 
Experimental Architecture +The prototype implements a **Dual-Layer Flat Memory**: + +### A. Core Memory (Working Set) +- **Concept:** A fixed block of high-value information. +- **Content:** Agent Persona, Bio, Current Objectives, and Key Global Facts. +- **Injection:** Always prepended to the retrieved context in the prompt. +- **Token Budget:** ~500 tokens (a target; the prototype does not enforce it). + +### B. Archival Memory (Long-term) +- **Concept:** A vector-based repository of raw text "episodes". +- **Retrieval:** Top-K semantic search (Cosine Similarity). +- **Fallback:** Keyword-based scoring if embedding services are unavailable. +- **Storage:** Local JSON storage per simulation (`backend/data/simulations/{id}/experimental_memory.json`). +- **Optimization:** Added an `add_memories` batching method that embeds all episodes of a batch in a single request and writes the JSON file once per batch, preventing I/O bottlenecks during high-volume simulation cycles. + +## 4. Implementation Details +- **Flag:** `USE_EXPERIMENTAL_MEMORY=true` (Environment variable). +- **Service:** `ExperimentalMemoryService` handles storage and retrieval. +- **Interception:** When the flag is enabled and a `simulation_id` is provided, `ZepGraphMemoryUpdater` additionally writes each activity batch to the experimental service (the graph backend is still updated), and `ZepToolsService.search_graph` returns results from the experimental service instead of running the graph search. + +## 5. Initial Metrics for Evaluation +The following metrics have been defined for comparing Baseline vs Experimental: + +1. **Consistency:** Does the agent contradict past actions/statements? (0 to 1). +2. **Evidence Usage:** Ratio of cited/used retrieved fragments in the final response. +3. **Stability:** Token consumption growth over time (tokens/round). +4. **Prediction Quality:** Proximity of agent decisions to historical ground truth (for backtesting). + +## 6. How to Run Comparison +1. **Run Baseline:** Set `USE_EXPERIMENTAL_MEMORY=false` and run a simulation/report. +2. **Run Experimental:** Set `USE_EXPERIMENTAL_MEMORY=true` and run the same simulation/report. +3. **Compare Output:** Check `backend/data/simulations/{id}/experimental_memory.json` vs Zep dashboard. 
+ +## 7. Next Steps +- Implement a more robust local vector store (FAISS/Chroma). +- Refine the "Core Memory" update logic (summarization of key events). +- Integration with UI to show Core Memory status. diff --git a/backend/app/config.py b/backend/app/config.py index 967c862e..13e8064c 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -109,6 +109,10 @@ class Config: REPORT_AGENT_MAX_TOOL_CALLS = int(os.environ.get('REPORT_AGENT_MAX_TOOL_CALLS', '5')) REPORT_AGENT_MAX_REFLECTION_ROUNDS = int(os.environ.get('REPORT_AGENT_MAX_REFLECTION_ROUNDS', '2')) REPORT_AGENT_TEMPERATURE = float(os.environ.get('REPORT_AGENT_TEMPERATURE', '0.5')) + + # Experimental Memory (Spike S1) + USE_EXPERIMENTAL_MEMORY = os.environ.get('USE_EXPERIMENTAL_MEMORY', 'False').lower() == 'true' + DATA_DIR = os.path.join(os.path.dirname(__file__), '../data') @classmethod def use_openzep(cls): diff --git a/backend/app/services/experimental_memory.py b/backend/app/services/experimental_memory.py new file mode 100644 index 00000000..04cbf10a --- /dev/null +++ b/backend/app/services/experimental_memory.py @@ -0,0 +1,166 @@ +""" +Experimental Memory Service (Spike S1) +Implements a dual-layer memory approach: Core Memory + Archival Memory. +Inspired by Karpathy's LLM-Wiki and MemGPT. 
+""" + +import os +import json +import time +import numpy as np +from typing import List, Dict, Any, Optional +from ..utils.embedding_client import EmbeddingClient +from ..config import Config +from ..utils.logger import get_logger + +logger = get_logger('mirofish.experimental_memory') + +class ExperimentalMemoryService: + def __init__(self, simulation_id: str): + self.simulation_id = simulation_id + self.storage_path = os.path.join(Config.DATA_DIR, 'simulations', simulation_id, 'experimental_memory.json') + self.core_memory_path = os.path.join(Config.DATA_DIR, 'simulations', simulation_id, 'core_memory.json') + + # Ensure directory exists + os.makedirs(os.path.dirname(self.storage_path), exist_ok=True) + + self.embedder = self._get_embedder() + self.memories = self._load_memories() + self.core_memory = self._load_core_memory() + + def _get_embedder(self) -> Optional[EmbeddingClient]: + embedder_config = Config.get_graph_search_embedder_config() + base_url = embedder_config.get("base_url") + model = embedder_config.get("model") + if not base_url or not model: + logger.warning("Embedding client not configured for experimental memory.") + return None + try: + return EmbeddingClient( + api_key=embedder_config.get("api_key") or "ollama", + base_url=base_url, + model=model + ) + except Exception as e: + logger.error(f"Failed to initialize embedding client: {e}") + return None + + def _load_memories(self) -> List[Dict[str, Any]]: + if os.path.exists(self.storage_path): + with open(self.storage_path, 'r', encoding='utf-8') as f: + return json.load(f) + return [] + + def _save_memories(self): + with open(self.storage_path, 'w', encoding='utf-8') as f: + json.dump(self.memories, f, ensure_ascii=False, indent=2) + + def _load_core_memory(self) -> Dict[str, Any]: + if os.path.exists(self.core_memory_path): + with open(self.core_memory_path, 'r', encoding='utf-8') as f: + return json.load(f) + return { + "persona": "Standard MiroFish Agent", + "objectives": [], + "key_events": 
[] + } + + def save_core_memory(self, core_data: Dict[str, Any]): + self.core_memory.update(core_data) + with open(self.core_memory_path, 'w', encoding='utf-8') as f: + json.dump(self.core_memory, f, ensure_ascii=False, indent=2) + + def add_memories(self, activities: List[Dict[str, Any]]): + """Add multiple episodes to archival memory in a batch to avoid I/O bottlenecks.""" + if not activities: + return + + texts_to_embed = [item.get("text", "") for item in activities] + embeddings = [None] * len(activities) + + if self.embedder: + try: + embeddings = self.embedder.embed_texts(texts_to_embed) + except Exception as e: + logger.error(f"Failed to batch embed memory: {e}") + + for item, embedding in zip(activities, embeddings): + memory_entry = { + "text": item.get("text", ""), + "metadata": item.get("metadata", {}), + "embedding": embedding, + "timestamp": time.time() + } + self.memories.append(memory_entry) + + self._save_memories() + + def add_memory(self, text: str, metadata: Optional[Dict[str, Any]] = None): + """Add a single episode to archival memory.""" + self.add_memories([{"text": text, "metadata": metadata or {}}]) + + def retrieve(self, query: str, k: int = 5) -> Dict[str, Any]: + """Retrieve context from both Core and Archival memory.""" + archival_results = self._retrieve_archival(query, k) + + return { + "core_memory": self.core_memory, + "archival_memory": archival_results + } + + def _retrieve_archival(self, query: str, k: int) -> List[str]: + if not self.memories: + return [] + + use_fallback = False + if not self.embedder: + use_fallback = True + + if not use_fallback: + try: + query_embedding = self.embedder.embed_texts([query])[0] + + # Compute cosine similarity + scores = [] + for m in self.memories: + if m.get("embedding"): + sim = self._cosine_similarity(query_embedding, m["embedding"]) + scores.append((sim, m["text"])) + + if not scores: + use_fallback = True + else: + # Sort by similarity + scores.sort(key=lambda x: x[0], reverse=True) + 
return [s[1] for s in scores[:k]] + + except Exception as e: + logger.error(f"Error during vector retrieval: {e}. Falling back to keyword search.") + use_fallback = True + + if use_fallback: + # Fallback to simple keyword search + logger.info("Using keyword search fallback for archival memory.") + # Simple scoring based on word overlap or containment + scored_results = [] + query_words = set(query.lower().split()) + for m in self.memories: + text_lower = m["text"].lower() + score = 0 + if query.lower() in text_lower: + score += 10 + for word in query_words: + if word in text_lower: + score += 1 + if score > 0: + scored_results.append((score, m["text"])) + + scored_results.sort(key=lambda x: x[0], reverse=True) + return [s[1] for s in scored_results[:k]] + + return [] + + def _cosine_similarity(self, v1: List[float], v2: List[float]) -> float: + a = np.array(v1) + b = np.array(v2) + return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)) diff --git a/backend/app/services/report_agent.py b/backend/app/services/report_agent.py index cecd70b4..b068bc01 100644 --- a/backend/app/services/report_agent.py +++ b/backend/app/services/report_agent.py @@ -904,7 +904,10 @@ class ReportAgent: self.simulation_requirement = simulation_requirement self.llm = llm_client or LLMClient() - self.zep_tools = zep_tools or ZepToolsService() + self.zep_tools = zep_tools or ZepToolsService( + llm_client=self.llm, + simulation_id=self.simulation_id + ) # 工具定义 self.tools = self._define_tools() diff --git a/backend/app/services/zep_graph_memory_updater.py b/backend/app/services/zep_graph_memory_updater.py index 509f9f78..8d8b27c2 100644 --- a/backend/app/services/zep_graph_memory_updater.py +++ b/backend/app/services/zep_graph_memory_updater.py @@ -228,15 +228,17 @@ class ZepGraphMemoryUpdater: MAX_RETRIES = 3 RETRY_DELAY = 2 # 秒 - def __init__(self, graph_id: str, api_key: Optional[str] = None): + def __init__(self, graph_id: str, api_key: Optional[str] = None, simulation_id: 
Optional[str] = None): """ 初始化更新器 Args: graph_id: Zep图谱ID api_key: Zep API Key(可选,默认从配置读取) + simulation_id: 模拟ID(用于实验性记忆) """ self.graph_id = graph_id + self.simulation_id = simulation_id self.api_key = Config.ZEP_API_KEY if api_key is None else api_key errors = Config.get_graph_backend_config_errors(api_key=self.api_key) @@ -245,6 +247,13 @@ class ZepGraphMemoryUpdater: self.backend = get_graph_backend(api_key=self.api_key) + # 实验性记忆服务 + self.exp_memory = None + if Config.USE_EXPERIMENTAL_MEMORY and simulation_id: + from .experimental_memory import ExperimentalMemoryService + self.exp_memory = ExperimentalMemoryService(simulation_id) + logger.info(f"实验性记忆已启用: simulation_id={simulation_id}") + # 活动队列 self._activity_queue: Queue = Queue() @@ -416,6 +425,21 @@ class ZepGraphMemoryUpdater: data=combined_text ) + # 同时保存到实验性记忆 + if self.exp_memory: + exp_activities = [] + for activity in activities: + exp_activities.append({ + "text": activity.to_episode_text(), + "metadata": { + "platform": activity.platform, + "agent_id": activity.agent_id, + "round": activity.round_num, + "action": activity.action_type + } + }) + self.exp_memory.add_memories(exp_activities) + self._total_sent += 1 self._total_items_sent += len(activities) display_name = self._get_platform_display_name(platform) @@ -502,7 +526,7 @@ class ZepGraphMemoryManager: if simulation_id in cls._updaters: cls._updaters[simulation_id].stop() - updater = ZepGraphMemoryUpdater(graph_id) + updater = ZepGraphMemoryUpdater(graph_id, simulation_id=simulation_id) updater.start() cls._updaters[simulation_id] = updater diff --git a/backend/app/services/zep_tools.py b/backend/app/services/zep_tools.py index ffc1a003..d4d3f742 100644 --- a/backend/app/services/zep_tools.py +++ b/backend/app/services/zep_tools.py @@ -424,17 +424,36 @@ class ZepToolsService: MAX_RETRIES = 3 RETRY_DELAY = 2.0 - def __init__(self, api_key: Optional[str] = None, llm_client: Optional[LLMClient] = None): + def __init__(self, api_key: Optional[str] 
= None, llm_client: Optional[LLMClient] = None, simulation_id: Optional[str] = None): self.api_key = Config.ZEP_API_KEY if api_key is None else api_key + self.simulation_id = simulation_id + + # 实验性记忆服务 + self.exp_memory = None + if Config.USE_EXPERIMENTAL_MEMORY and simulation_id: + from .experimental_memory import ExperimentalMemoryService + self.exp_memory = ExperimentalMemoryService(simulation_id) + logger.info(f"实验性记忆已在 ZepToolsService 中启用: simulation_id={simulation_id}") + + # 如果没有启用实验性记忆,或者仍需要图谱后端,则验证配置 errors = Config.get_graph_backend_config_errors(api_key=self.api_key) - if errors: + if errors and not self.exp_memory: raise ValueError("; ".join(errors)) - self.backend = get_graph_backend(api_key=self.api_key) + try: + self.backend = get_graph_backend(api_key=self.api_key) + except Exception as e: + if self.exp_memory: + logger.warning(f"无法初始化图谱后端 (将仅使用实验性记忆): {e}") + self.backend = None + else: + raise e + # LLM客户端用于InsightForge生成子问题 self._llm_client = llm_client self._search_embedder_client = None self._search_reranker_client = None + logger.info("ZepToolsService 初始化完成") @property @@ -979,6 +998,26 @@ class ZepToolsService: 当前实现会优先召回边,再按配置补充节点摘要,避免只拿到零散 fact 或在 OpenZep 上完全退化到本地关键词搜索。 """ + # 实验性记忆逻辑 (Spike S1) + if self.exp_memory: + logger.info(f"使用实验性记忆进行搜索: query={query[:50]}...") + exp_results = self.exp_memory.retrieve(query, k=limit) + + facts = exp_results["archival_memory"] + core = exp_results["core_memory"] + + # 将 Core Memory 也转化为 facts 供后续使用 + core_fact = f"[CORE MEMORY] Persona: {core.get('persona', 'N/A')}. Objectives: {', '.join(core.get('objectives', []))}" + facts.insert(0, core_fact) + + return SearchResult( + facts=facts, + edges=[], # 实验性记忆暂不支持边 + nodes=[], # 实验性记忆暂不支持节点 + query=query, + total_count=len(facts) + ) + logger.info(f"图谱搜索: graph_id={graph_id}, query={query[:50]}...") query_normalized = self._normalize_text(query)