feat(graph): refactor core services for high-availability simulations
This commit is contained in:
parent
676ec898b9
commit
e7f452ed9e
|
|
@ -18,6 +18,7 @@ from ..models.task import TaskManager, TaskStatus
|
||||||
from ..utils.zep_paging import fetch_all_nodes, fetch_all_edges
|
from ..utils.zep_paging import fetch_all_nodes, fetch_all_edges
|
||||||
from .text_processor import TextProcessor
|
from .text_processor import TextProcessor
|
||||||
from ..utils.locale import t, get_locale, set_locale
|
from ..utils.locale import t, get_locale, set_locale
|
||||||
|
from ..utils.zep_retry import with_zep_retry
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
|
@ -190,6 +191,7 @@ class GraphBuilderService:
|
||||||
error_msg = f"{str(e)}\n{traceback.format_exc()}"
|
error_msg = f"{str(e)}\n{traceback.format_exc()}"
|
||||||
self.task_manager.fail_task(task_id, error_msg)
|
self.task_manager.fail_task(task_id, error_msg)
|
||||||
|
|
||||||
|
@with_zep_retry(max_retries=3, operation_name="create_graph")
|
||||||
def create_graph(self, name: str) -> str:
|
def create_graph(self, name: str) -> str:
|
||||||
"""创建Zep图谱(公开方法)"""
|
"""创建Zep图谱(公开方法)"""
|
||||||
graph_id = f"mirofish_{uuid.uuid4().hex[:16]}"
|
graph_id = f"mirofish_{uuid.uuid4().hex[:16]}"
|
||||||
|
|
@ -285,11 +287,14 @@ class GraphBuilderService:
|
||||||
|
|
||||||
# 调用Zep API设置本体
|
# 调用Zep API设置本体
|
||||||
if entity_types or edge_definitions:
|
if entity_types or edge_definitions:
|
||||||
self.client.graph.set_ontology(
|
@with_zep_retry(max_retries=3, operation_name="set_ontology")
|
||||||
graph_ids=[graph_id],
|
def _set_ontology():
|
||||||
entities=entity_types if entity_types else None,
|
self.client.graph.set_ontology(
|
||||||
edges=edge_definitions if edge_definitions else None,
|
graph_ids=[graph_id],
|
||||||
)
|
entities=entity_types if entity_types else None,
|
||||||
|
edges=edge_definitions if edge_definitions else None,
|
||||||
|
)
|
||||||
|
_set_ontology()
|
||||||
|
|
||||||
def add_text_batches(
|
def add_text_batches(
|
||||||
self,
|
self,
|
||||||
|
|
@ -322,10 +327,14 @@ class GraphBuilderService:
|
||||||
|
|
||||||
# 发送到Zep
|
# 发送到Zep
|
||||||
try:
|
try:
|
||||||
batch_result = self.client.graph.add_batch(
|
@with_zep_retry(max_retries=3, operation_name=f"add_batch {batch_num}/{total_batches}")
|
||||||
graph_id=graph_id,
|
def _add_batch():
|
||||||
episodes=episodes
|
return self.client.graph.add_batch(
|
||||||
)
|
graph_id=graph_id,
|
||||||
|
episodes=episodes
|
||||||
|
)
|
||||||
|
|
||||||
|
batch_result = _add_batch()
|
||||||
|
|
||||||
# 收集返回的 episode uuid
|
# 收集返回的 episode uuid
|
||||||
if batch_result and isinstance(batch_result, list):
|
if batch_result and isinstance(batch_result, list):
|
||||||
|
|
@ -376,7 +385,11 @@ class GraphBuilderService:
|
||||||
# 检查每个 episode 的处理状态
|
# 检查每个 episode 的处理状态
|
||||||
for ep_uuid in list(pending_episodes):
|
for ep_uuid in list(pending_episodes):
|
||||||
try:
|
try:
|
||||||
episode = self.client.graph.episode.get(uuid_=ep_uuid)
|
@with_zep_retry(max_retries=2, initial_delay=1.0, operation_name="get_episode")
|
||||||
|
def _get_episode():
|
||||||
|
return self.client.graph.episode.get(uuid_=ep_uuid)
|
||||||
|
|
||||||
|
episode = _get_episode()
|
||||||
is_processed = getattr(episode, 'processed', False)
|
is_processed = getattr(episode, 'processed', False)
|
||||||
|
|
||||||
if is_processed:
|
if is_processed:
|
||||||
|
|
@ -500,6 +513,7 @@ class GraphBuilderService:
|
||||||
"edge_count": len(edges_data),
|
"edge_count": len(edges_data),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@with_zep_retry(max_retries=3, operation_name="delete_graph")
|
||||||
def delete_graph(self, graph_id: str):
|
def delete_graph(self, graph_id: str):
|
||||||
"""删除图谱"""
|
"""删除图谱"""
|
||||||
self.client.graph.delete(graph_id=graph_id)
|
self.client.graph.delete(graph_id=graph_id)
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ from zep_cloud.client import Zep
|
||||||
from ..config import Config
|
from ..config import Config
|
||||||
from ..utils.logger import get_logger
|
from ..utils.logger import get_logger
|
||||||
from ..utils.locale import get_language_instruction, get_locale, set_locale, t
|
from ..utils.locale import get_language_instruction, get_locale, set_locale, t
|
||||||
|
from ..utils.zep_retry import with_zep_retry
|
||||||
from .zep_entity_reader import EntityNode, ZepEntityReader
|
from .zep_entity_reader import EntityNode, ZepEntityReader
|
||||||
|
|
||||||
logger = get_logger('mirofish.oasis_profile')
|
logger = get_logger('mirofish.oasis_profile')
|
||||||
|
|
@ -316,55 +317,27 @@ class OasisProfileGenerator:
|
||||||
|
|
||||||
comprehensive_query = t('progress.zepSearchQuery', name=entity_name)
|
comprehensive_query = t('progress.zepSearchQuery', name=entity_name)
|
||||||
|
|
||||||
|
@with_zep_retry(max_retries=3, initial_delay=2.0, operation_name="Zep Edge Search")
|
||||||
def search_edges():
|
def search_edges():
|
||||||
"""搜索边(事实/关系)- 带重试机制"""
|
"""搜索边(事实/关系)- 带重试机制"""
|
||||||
max_retries = 3
|
return self.zep_client.graph.search(
|
||||||
last_exception = None
|
query=comprehensive_query,
|
||||||
delay = 2.0
|
graph_id=self.graph_id,
|
||||||
|
limit=30,
|
||||||
for attempt in range(max_retries):
|
scope="edges",
|
||||||
try:
|
reranker="rrf"
|
||||||
return self.zep_client.graph.search(
|
)
|
||||||
query=comprehensive_query,
|
|
||||||
graph_id=self.graph_id,
|
|
||||||
limit=30,
|
|
||||||
scope="edges",
|
|
||||||
reranker="rrf"
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
last_exception = e
|
|
||||||
if attempt < max_retries - 1:
|
|
||||||
logger.debug(f"Zep边搜索第 {attempt + 1} 次失败: {str(e)[:80]}, 重试中...")
|
|
||||||
time.sleep(delay)
|
|
||||||
delay *= 2
|
|
||||||
else:
|
|
||||||
logger.debug(f"Zep边搜索在 {max_retries} 次尝试后仍失败: {e}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
@with_zep_retry(max_retries=3, initial_delay=2.0, operation_name="Zep Node Search")
|
||||||
def search_nodes():
|
def search_nodes():
|
||||||
"""搜索节点(实体摘要)- 带重试机制"""
|
"""搜索节点(实体摘要)- 带重试机制"""
|
||||||
max_retries = 3
|
return self.zep_client.graph.search(
|
||||||
last_exception = None
|
query=comprehensive_query,
|
||||||
delay = 2.0
|
graph_id=self.graph_id,
|
||||||
|
limit=20,
|
||||||
for attempt in range(max_retries):
|
scope="nodes",
|
||||||
try:
|
reranker="rrf"
|
||||||
return self.zep_client.graph.search(
|
)
|
||||||
query=comprehensive_query,
|
|
||||||
graph_id=self.graph_id,
|
|
||||||
limit=20,
|
|
||||||
scope="nodes",
|
|
||||||
reranker="rrf"
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
last_exception = e
|
|
||||||
if attempt < max_retries - 1:
|
|
||||||
logger.debug(f"Zep节点搜索第 {attempt + 1} 次失败: {str(e)[:80]}, 重试中...")
|
|
||||||
time.sleep(delay)
|
|
||||||
delay *= 2
|
|
||||||
else:
|
|
||||||
logger.debug(f"Zep节点搜索在 {max_retries} 次尝试后仍失败: {e}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# 并行执行edges和nodes搜索
|
# 并行执行edges和nodes搜索
|
||||||
|
|
|
||||||
|
|
@ -225,8 +225,9 @@ class OntologyGenerator:
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
# 传给 LLM 的文本最大长度(5万字)
|
# 传给 LLM 的文本最大长度(2万字)
|
||||||
MAX_TEXT_LENGTH_FOR_LLM = 50000
|
# 本体分析只需识别实体/关系类型,不需要完整文本;完整文本仍用于后续图谱构建
|
||||||
|
MAX_TEXT_LENGTH_FOR_LLM = 20000
|
||||||
|
|
||||||
def _build_user_message(
|
def _build_user_message(
|
||||||
self,
|
self,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue