AI投资机器:代理架构
- Claude Paugh

- 2月15日
- 讀畢需時 6 分鐘
概述
首先,我想重点关注一些特定的系统相关领域。因为数据检索和汇总将是功能的重要组成部分,甚至可以说是核心功能。所以我决定让我的初始代理专注于这些方面,因为我认为它们能够“学习”并检测用户查询和聚合的模式至关重要。这将使它们在回答用户问题时能够更加主动。
因为我决定全力投入基于 Python 的项目开发。我的主要代理基础设施组件是用以下代码构建的:
Google Agent Development Kit (ADK) 和基于 Agent-to-Agent (A2A) 的代理
芹菜用于任务管理
查询代理:一个异步、多进程执行器,具有缓存和取消功能,前端由 Celery 路由的 A2A 消息提供支持。
聚合代理:一种周期性/半按需预计算服务,可将常用的分析结果具体化到 Redis 中,从而保持 UI 和聊天功能快速运行。
下方是查询代理和聚合代理的简明架构图以及详细的时序图。我还附上了最相关的文件和关键方法,方便您快速查阅实现细节。

查询代理文件: `search/query_agent.py` - 主要方法:`worker_process`、`execute_query`、`_handle_cancel`、`handle_message`、`connect`
聚合代理文件:`data_sync/aggregation_agent.py` - 主要方法:`execute_job`、`run_cycle`、`check_inbox`、`AggregationEncoder`、`AggregationAgent.run`
通用代理框架:`util/adk/base_agent.py` - 关键类/方法:`AgentContext`、`BaseAgent.connect`、`BaseAgent.send_message`(Celery)、`BaseAgent.register`/heartbeat、`create_celery_task`
查询代理——它的功能
总体目标:学习用户查询请求模式,以指导数据缓存或检索。代表聊天/搜索后端异步执行繁重查询,将阻塞式 I/O 和序列化卸载到隔离的工作进程中。它提供后端多路复用功能:
Neo4j、Couchbase、Redis、LLM、文本到密码的并发限制(通过后端使用信号量)。
Redis 中基于内容哈希的缓存结果
通过终止工作进程来支持硬取消
生命周期与信息流
1.注册/心跳- `BaseAgent.connect` 和 `BaseAgent.register`(通过 `util/adk/base_agent.py`)连接到 Redis,并在 `agent:query_agent:heartbeat` 处保持心跳,并在注册表中维护一个条目 `registry:agent:query_agent`。- 消息通过 Celery 发送/接收。`BaseAgent.send_message` 使用 Celery 将 `A2AMessage` 转发给 `agent`。
2.接收任务- `QueryAgent.handle_message` 根据 `payload_type` 进行路由: - `"task"` → `execute_query` - `"cancel"` → `_handle_cancel`
3.缓存查找- `execute_query` 构建任务有效负载的稳定哈希值(不包括 `id`、`outbox`、`parent_id` 等瞬态字段)。 - 缓存键:`query_cache:`
4.调度到隔离的工作进程- 每个源(例如,`neo4j`、`couchbase`、`llm` 等)选择一个信号量。- 代理会生成一个运行 `worker_process(task, result_queue)` 的 `multiprocessing` 进程。- 工作进程中的 STDIN/STDOUT/STDERR 绑定到 `/dev/null`,以避免进程销毁期间出现描述符问题。
5.后端执行(在 worker 内部) - Neo4j:使用同步驱动程序(`GraphDatabase.driver`)并运行提供的 Cypher 查询(或专门的“工具”查询,例如国家/货币路径扩展器)。返回的行是字典,然后通过 `_jsonify_value` 进行规范化,以确保 JSON 安全类型。 - Couchbase:反射地调用 `search.couchbase_service.cb_service` 上的方法,并传入 `params` 参数。 - LLM/Text2Cypher:使用 `chat_gateway.llm_client.LLMClient` 从 NL 生成 Cypher 字符串或运行 LLM 任务。
6.结果收集与回复- 父进程轮询 `result_queue`。一旦结果可用,它会使用 `AgentContext.reply` 将结构化结果转发给原始发送者,该过程通过 Celery 进行路由。- 缓存可以填充规范化的 JSON(遵循文件中定义的 TTL 策略),以便将来执行相同任务时能够立即返回结果。
7.取消操作- `_handle_cancel` 会根据 `task_id` 查找正在运行的工作进程,并发送 `SIGTERM` 信号,必要时升级到 `SIGKILL` 信号。取消结果会回复给请求者。
数据合约
-传入的任务有效载荷
- `来源`: `"neo4j" | "couchbase" | "redis" | "llm" | "text2cypher"`
- `query` 和/或 `method`,`params`(后端特定)
-可选: `tool`(特殊的 Neo4j 查询模式)、`limit`、`outbox`
-输出结果
- `{ id, source, ok, data?, error?, duration, ts, cached? }`
关键实施参考资料
-工作进程:`search/query_agent.py::worker_process`
-缓存逻辑和信号量:`search/query_agent.py::execute_query`
-取消:`search/query_agent.py::_handle_cancel`
- A2A 消息传递和心跳:`util/adk/base_agent.py`
聚合代理——它的功能
主要用途:预先计算并缓存“成本高昂但频繁需要”的分析数据,以便用户界面和聊天功能能够即时响应。随着时间的推移,系统会“学习”哪些常用聚合数据应标记为预聚合需求。此外,它还支持热查询预热和手动触发。
- 将结果写入 Redis 键,例如 `pre_agg:*` 和 `analytics:*`(例如,`analytics:bond_risk_rankings`、`pre_agg:result:`)。 `)
- 大量使用 Neo4j 进行历史趋势聚合;部分任务还会存储全球指数汇总和全球市盈率。
- 采用简单的状态机(`AggregationStateMachine`)进行周期跟踪和健康报告
- 为了节省内存,程序设计为顺序运行(单工作进程),但仍能响应通过收件箱发出的手动触发指令。
日程安排和触发器
- 间隔:`AGG_AGENT_INTERVAL_SECS`(默认值 ≈ 3600 秒)
- 手动触发:向 Redis 列表 `agent:aggregation_agent:inbox` 推送消息(可通过环境变量 `AGG_AGENT_INBOX` 覆盖)。代理会在任务间隙检查收件箱。
- 热门查询:ZSET `stats:hot_queries` + `stats:query_to_cypher` 映射用于预先实现热门查询。
工作类型和产出
- `notional_aggregation`:按 `PeriodEndDate` 汇总多种资产类型的名义金额 (Neo4j) → Redis `pre_agg:*`
- `asset_holdings_trend`:资产持有趋势随时间的变化(Neo4j)→ Redis `pre_agg:*`
- `municipal_debt_summary`: 市政债务汇总 (Neo4j) → Redis `pre_agg:*`
- `industry_aggregation`: 行业/板块汇总 (Neo4j) → Redis `pre_agg:*`
- `world_pe_aggregation`:全球市盈率(本地计算)→ Redis `analytics:world_pe_ratios`
- `bond_risk_aggregation`:债券风险排名(机器学习服务)→ Redis `analytics:bond_risk_rankings`(+ 可选别名,通过 `BOND_RISK_UI_KEY`,可选 TTL `BOND_RISK_TTL`)
- `wilshire_aggregation`:全局索引聚合,受市场开盘检查保护 → Redis(索引缓存)
- `hot_query`(自定义):给定一个易于理解的查询名称,从 `stats:query_to_cypher` 中查找 Cypher,运行它,并将有效负载存储为 `pre_agg:result:` `
生命周期与流程
1. 开始/心跳
- 继承了 `BaseAgent.connect/register`(`agent:aggregation_agent:*` 下的 Redis 心跳和注册表条目)。
2. 周期性循环
- `run_cycle` 构建作业列表:固定作业 + 来自 `stats:hot_queries` 的前 `HOT_QUERY_LIMIT`(默认 20)。
- 为了保证内存使用可预测性,作业通过单线程执行器(`max_workers=1`)按顺序执行。
- 在每个任务之间,都会调用 `check_inbox`,这样手动触发器就不会被忽略。
3. 手动触发路径
- `check_inbox` 对 `agent:aggregation_agent:inbox` 执行 `LPOP` 操作,并解析 JSON 或纯字符串。如果存在过滤器,它会重新调用 `run_cycle`,并将结果限定为该作业或 `hot_query` 字符串。
4. 执行单位
- `execute_job` 为每种作业类型打开新的 Redis 和 Neo4j 连接以进行隔离,运行任务,使用 `AggregationEncoder` 将 JSON 写入缓存以实现强大的类型识别(Neo4j 时间类型、numpy 类型等)。


操作说明和可调参数
- Redis 端点和 Neo4j 凭据是从环境变量中读取的;提供了许多默认值,或者在开发环境中回退到 `creds.py` 中的值。
- 聚合节奏:`AGG_AGENT_INTERVAL_SECS`(默认值 3600)。热查询广度:`AGG_AGENT_HOT_LIMIT`(默认值 20)。
- 通过 `BOND_RISK_TTL` 和 `BOND_RISK_UI_KEY` 设置债券风险聚合 TTL/UI 别名。
- 查询并发限制:每个源的信号量(`search/query_agent.py` 中的 `LIMITS`)。
- 查询结果缓存:`query_cache: : ` 在 Redis 中;有效负载使用 `_jsonify_value` 进行规范化,以保证 JSON 安全性,包括 Neo4j 时间类型。
快速交叉引用
- 查询代理
- `search/query_agent.py::execute_query`(缓存、信号量、工作进程)
- `search/query_agent.py::worker_process`(后端逻辑)
- `search/query_agent.py::_handle_cancel`(强制终止)
- 聚合代理
- `data_sync/aggregation_agent.py::run_cycle`(作业规划、热点查询、收件箱检查)
- `data_sync/aggregation_agent.py::execute_job`(每个作业的操作和写入)
- `data_sync/aggregation_agent.py::check_inbox`(手动触发器)
- 基础框架
- `util/adk/base_agent.py::BaseAgent`(Celery 消息传递、心跳、注册表)

Redis 关键键/结构
代理注册/心跳
`registry:agent:
代理:
查询缓存
`query_cache:
汇总输出
`pre_agg:*` → JSON `PreAggPayload`
`analytics:world_pe_ratios` → 全球市盈率的 JSON 数组
`analytics:bond_risk_rankings` → 排名 JSON 数组(可选别名 `BOND_RISK_UI_KEY`)
聚合协调
`agent:aggregation_agent:inbox` (LPOP) → `AggTrigger`
`stats:hot_queries` (ZSET) → 热门查询名称
`stats:query_to_cypher`(哈希)→ 名称 → Cypher
我最初主要关注这两个代理,是因为我担心构建一个完整的基于 Python 的解决方案的可扩展性,该解决方案涉及多用户机器学习使用以及图和聚合结果集。
最初在过载和任务管理方面存在一些问题,所以我引入了 Celery 来将这些决策外包出去。正如您在上面读到的,这两个智能体都具有“学习”功能,因此我努力遵循人工智能“智能体”的总体目标,同时帮助系统扩展和优化。
Celery 与主编排引擎以及一些配置逻辑协同工作,根据用户需求启动或关闭任务管理工作进程。它帮助代理程序将资源使用控制在系统限制范围内,避免资源过度消耗导致崩溃。


