AI Investing Machine: Agents Architecture
- Claude Paugh

- 6 hours ago
Overview
I wanted to start with a few specific systems-oriented areas. Data retrieval and summarization were going to be an important part of the functionality, if not the key part, so I focused my initial agents on those aspects. I thought it was important that they be able to "learn" and detect patterns in user queries and aggregations, which would enable a proactive element when answering user questions.
Since I decided to go all in on Python for this project, the main agent infrastructure consists of:
- Google Agent Development Kit (ADK) and Agent-to-Agent (A2A) based agents
- Celery for task management
- The Query Agent: an async, multi-process executor with caching and cancellation, fronted by Celery-routed A2A messages
- The Aggregation Agent: a periodic/semi-on-demand pre-computation service that materializes commonly requested analytics into Redis, keeping the UI and chat fast
Below is a concise architecture view plus detailed sequence diagrams for how the Query Agent and Aggregation Agent work. I’ve also included the most relevant files and key methods so you can cross‑reference the implementation quickly.

- Query Agent files: `search/query_agent.py`. Key methods: `worker_process`, `execute_query`, `_handle_cancel`, `handle_message`, `connect`
- Aggregation Agent files: `data_sync/aggregation_agent.py`. Key methods: `execute_job`, `run_cycle`, `check_inbox`, `AggregationEncoder`, `AggregationAgent.run`
- Common agent scaffold: `util/adk/base_agent.py`. Key classes/methods: `AgentContext`, `BaseAgent.connect`, `BaseAgent.send_message` (Celery), `BaseAgent.register`/heartbeat, `create_celery_task`
Query Agent — What it does
High-level purpose: Learn user query patterns to inform caching and data retrieval, and execute heavy queries asynchronously on behalf of the chat/search backend, offloading blocking I/O and serialization into isolated worker processes. It provides:
- Backend multiplexing: Neo4j, Couchbase, Redis, LLM, Text-to-Cypher
- Concurrency limiting per backend using semaphores
- Result caching keyed by a content hash in Redis
- Hard cancellation support by terminating worker processes
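The per-backend concurrency limiting mentioned above can be sketched with one `asyncio.Semaphore` per source. This is only an illustration, not the code in `search/query_agent.py`: the limit values, `run_limited`, and the `fake_backend` coroutine are assumptions made for the example.

```python
import asyncio

# Assumed per-source limits, for illustration only.
LIMITS = {"neo4j": 2, "couchbase": 4, "redis": 8, "llm": 1, "text2cypher": 1}


async def run_limited(semaphores, source, coro_fn, *args):
    """Run coro_fn(*args) while holding the semaphore for `source`."""
    async with semaphores[source]:
        return await coro_fn(*args)


async def demo():
    # Create the semaphores inside the running event loop.
    semaphores = {src: asyncio.Semaphore(n) for src, n in LIMITS.items()}
    active = 0
    peak = 0

    async def fake_backend(i):
        # Hypothetical stand-in for a real backend call.
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1
        return i

    # Six concurrent requests, but at most LIMITS["neo4j"] run at once.
    results = await asyncio.gather(
        *(run_limited(semaphores, "neo4j", fake_backend, i) for i in range(6))
    )
    return results, peak
```

Calling `asyncio.run(demo())` returns the six results in order, along with the peak number of calls that were in flight at the same time, which stays at the configured limit for that source.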
Lifecycle & Message Flow
1. Registration/Heartbeat
- `BaseAgent.connect` and `BaseAgent.register` (via `util/adk/base_agent.py`) connect to Redis and keep a heartbeat at `agent:query_agent:heartbeat` and a registry entry `registry:agent:query_agent`.
- Messages are sent/received via Celery. `BaseAgent.send_message` uses Celery to forward `A2AMessage` to `agent.<recipient>.handle`.
2. Receiving a Task
- `QueryAgent.handle_message` routes by `payload_type`:
  - `"task"` → `execute_query`
  - `"cancel"` → `_handle_cancel`
3. Cache Lookup
- `execute_query` builds a stable hash of the task payload (excluding transient fields like `id`, `outbox`, `parent_id`).
- Cache key: `query_cache:<source>:<sha256>` (prefix defined in file). If present, the cached JSON is returned immediately with `cached: true`.
4. Dispatch to an Isolated Worker Process
- A semaphore is selected per `source` (e.g., `neo4j`, `couchbase`, `llm`, etc.).
- The agent spawns a `multiprocessing` process running `worker_process(task, result_queue)`.
- STDIN/STDOUT/STDERR are bound to `/dev/null` in the worker to avoid descriptor issues during process teardown.
5. Backend Execution (inside worker)
- Neo4j: Uses a synchronous driver (`GraphDatabase.driver`) and runs the provided Cypher (or a specialized “tool” query, e.g., country/currency path expanders). Rows are returned as dictionaries, then normalized via `_jsonify_value` to ensure JSON-safe typing.
- Couchbase: Reflectively calls a method on `search.couchbase_service.cb_service` with `params`.
- LLM/Text2Cypher: Uses `chat_gateway.llm_client.LLMClient` to either produce a Cypher string from natural language or run LLM tasks.
6. Result Collection & Reply
- The parent process polls `result_queue`. Once a result is available, it forwards the structured result to the original sender using `AgentContext.reply`, which routes over Celery.
- The cache may be populated with the normalized JSON (respecting the TTL strategy defined in file) so future identical tasks are returned instantly.
7. Cancellation
- `_handle_cancel` looks up the running worker process by `task_id` and sends `SIGTERM`, then escalates to `SIGKILL` if needed. A cancellation result is replied to the requester.
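The cache lookup step depends on a key that stays the same for equivalent tasks. A minimal sketch of one way to build it is below, assuming the payload is JSON-serializable; the names `TRANSIENT_FIELDS`, `CACHE_PREFIX`, and `cache_key` are hypothetical and not taken from `search/query_agent.py`.

```python
import hashlib
import json

# Transient fields excluded from the hash, per the cache lookup step.
TRANSIENT_FIELDS = {"id", "outbox", "parent_id"}
CACHE_PREFIX = "query_cache"  # assumed name for the prefix constant


def cache_key(task: dict) -> str:
    """Build query_cache:<source>:<sha256> from the stable part of a task."""
    stable = {k: v for k, v in task.items() if k not in TRANSIENT_FIELDS}
    # sort_keys plus compact separators yield identical bytes for equal payloads,
    # regardless of the order in which the keys were inserted.
    blob = json.dumps(stable, sort_keys=True, separators=(",", ":"), default=str)
    digest = hashlib.sha256(blob.encode("utf-8")).hexdigest()
    return f"{CACHE_PREFIX}:{task.get('source', 'unknown')}:{digest}"
```

Two tasks that differ only in `id`, `outbox`, or `parent_id` map to the same key, so a repeated question from a different request can be served from Redis.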
Data Contracts
- Incoming task payload
  - `source`: `"neo4j" | "couchbase" | "redis" | "llm" | "text2cypher"`
  - `query` and/or `method`, `params` (backend-specific)
  - Optional: `tool` (special Neo4j query patterns), `limit`, `outbox`
- Outgoing result
  - `{ id, source, ok, data?, error?, duration, ts, cached? }`
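These contracts could be expressed in Python roughly as follows. This is a sketch under assumptions: the `QueryResult` field types, the `validate_task` rule (at least one of `query` or `method`), and the `make_result` helper are illustrative and not confirmed by the implementation.

```python
import time
from typing import Any, Optional, TypedDict

SOURCES = {"neo4j", "couchbase", "redis", "llm", "text2cypher"}


class QueryResult(TypedDict, total=False):
    id: str
    source: str
    ok: bool
    data: Any        # present on success
    error: str       # present on failure
    duration: float  # seconds (assumed unit)
    ts: float        # epoch seconds (assumed unit)
    cached: bool     # present only for cache hits


def validate_task(payload: dict) -> Optional[str]:
    """Return an error string, or None when the payload looks usable."""
    if payload.get("source") not in SOURCES:
        return f"unknown source: {payload.get('source')!r}"
    if not (payload.get("query") or payload.get("method")):
        return "task needs 'query' and/or 'method'"
    return None


def make_result(task: dict, started: float, data: Any = None,
                error: Optional[str] = None, cached: bool = False) -> QueryResult:
    """Assemble the outgoing result; optional keys are added only when relevant."""
    now = time.time()
    result: QueryResult = {
        "id": task.get("id", ""),
        "source": task.get("source", ""),
        "ok": error is None,
        "duration": now - started,
        "ts": now,
    }
    if error is None:
        result["data"] = data
    else:
        result["error"] = error
    if cached:
        result["cached"] = True
    return result
```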
Key Implementation References
- Worker: `search/query_agent.py::worker_process`
- Cache logic & semaphores: `search/query_agent.py::execute_query`
- Cancellation: `search/query_agent.py::_handle_cancel`
- A2A messaging & heartbeat: `util/adk/base_agent.py`
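For the cancellation entry above, the terminate-then-kill escalation can be sketched with the standard `multiprocessing.Process` API. The helper name `cancel_worker`, the grace period, and the returned status strings are assumptions for illustration; the real `_handle_cancel` may differ.

```python
from multiprocessing import Process


def cancel_worker(proc: Process, grace_secs: float = 2.0) -> str:
    """Stop a worker: SIGTERM first, then SIGKILL if it outlives the grace period."""
    if not proc.is_alive():
        return "already_finished"
    proc.terminate()       # sends SIGTERM on POSIX systems
    proc.join(grace_secs)  # give the worker a moment to exit
    if proc.is_alive():
        proc.kill()        # sends SIGKILL on POSIX systems
        proc.join()
        return "killed"
    return "terminated"
```

In use, `proc` would be the worker process looked up by `task_id`, and the returned status could be included in the cancellation result sent back to the requester.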
Aggregation Agent — What it does
High-level purpose: Pre-compute and cache “expensive but frequently needed” analytics so the UI and chat can respond instantly. Over time, it "learns" which frequently used aggregations should be tagged for pre-aggregation. It also supports hot-query warmups and manual triggers.
- Writes results under Redis keys like `pre_agg:*` and `analytics:*` (e.g., `analytics:bond_risk_rankings`, `pre_agg:result:<hash>`)
- Uses Neo4j heavily for historical trend aggregations; some tasks also store global index rollups and world P/E ratios
- Employs a simple state machine (`AggregationStateMachine`) for cycle tracking and health reporting
- Runs sequentially (single worker) by design to conserve memory, but maintains responsiveness to manual triggers via an inbox
Scheduling & Triggers
- Interval: `AGG_AGENT_INTERVAL_SECS` (default ≈ 3600 seconds)
- Manual triggers: push to Redis list `agent:aggregation_agent:inbox` (overridable via env `AGG_AGENT_INBOX`). The agent checks the inbox between jobs.
- Hot queries: ZSET `stats:hot_queries` + `stats:query_to_cypher` mapping are used to pre‑materialize popular queries.
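A manual trigger is just a push onto the inbox list. The sketch below assumes a redis-py style client and that a plain job name is an acceptable message; `inbox_key` and `trigger_job` are hypothetical helper names, and the use of `RPUSH` (to pair with a left pop for first-in, first-out order) is an assumption.

```python
import os

DEFAULT_INBOX = "agent:aggregation_agent:inbox"


def inbox_key() -> str:
    """Resolve the inbox list name, honoring the AGG_AGENT_INBOX override."""
    return os.environ.get("AGG_AGENT_INBOX", DEFAULT_INBOX)


def trigger_job(client, job_name: str) -> int:
    """Queue a manual trigger and return the new length of the inbox list."""
    # `client` is expected to behave like redis.Redis (rpush returns the list length).
    return client.rpush(inbox_key(), job_name)
```

For example, `trigger_job(redis.Redis(), "bond_risk_aggregation")` would ask the agent to run that job the next time it checks the inbox.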
Job Types and Outputs
- `notional_aggregation`: Pre‑sums notional across many asset types by `PeriodEndDate` (Neo4j) → Redis `pre_agg:*`
- `asset_holdings_trend`: Asset holding trends over time (Neo4j) → Redis `pre_agg:*`
- `municipal_debt_summary`: Muni summaries (Neo4j) → Redis `pre_agg:*`
- `industry_aggregation`: Sector/industry rollups (Neo4j) → Redis `pre_agg:*`
- `world_pe_aggregation`: World P/E ratios (local compute) → Redis `analytics:world_pe_ratios`
- `bond_risk_aggregation`: Bond risk rankings (ML service) → Redis `analytics:bond_risk_rankings` (+ optional alias via `BOND_RISK_UI_KEY`, optional TTL `BOND_RISK_TTL`)
- `wilshire_aggregation`: Global index aggregation, guarded by market‑open check → Redis (indexes cache)
- `hot_query` (custom): Given a human‑readable query name, look up Cypher from `stats:query_to_cypher`, run it, store payload as `pre_agg:result:<sha256(query)>`
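To make the `hot_query` job concrete, here is a sketch of planning the warmups from the two stats structures. It assumes a redis-py style client; `hot_query_key` and `plan_hot_queries` are hypothetical names, and hashing the human-readable query name (rather than the Cypher text) is an assumption about what `sha256(query)` refers to.

```python
import hashlib


def _text(value):
    # redis-py returns bytes unless decode_responses=True is set.
    return value.decode("utf-8") if isinstance(value, bytes) else value


def hot_query_key(query_name: str) -> str:
    """Redis key under which a hot query's pre-computed payload is stored."""
    digest = hashlib.sha256(query_name.encode("utf-8")).hexdigest()
    return f"pre_agg:result:{digest}"


def plan_hot_queries(client, limit: int = 20) -> list:
    """Pair the most popular query names with their Cypher and target key."""
    # ZREVRANGE returns members from highest score to lowest.
    names = client.zrevrange("stats:hot_queries", 0, limit - 1)
    plan = []
    for raw in names:
        name = _text(raw)
        cypher = client.hget("stats:query_to_cypher", name)
        if cypher is None:
            continue  # no mapping recorded, so nothing to pre-materialize
        plan.append({"name": name, "cypher": _text(cypher), "key": hot_query_key(name)})
    return plan
```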
Lifecycle & Flow
1. Start/Heartbeat
- Inherits `BaseAgent.connect/register` (Redis heartbeat and registry entries under `agent:aggregation_agent:*`).
2. Periodic Cycle
- `run_cycle` builds the job list: fixed jobs + top `HOT_QUERY_LIMIT` (default 20) from `stats:hot_queries`.
- Jobs are executed sequentially via a single‑thread executor (`max_workers=1`) for memory predictability.
- Between each job, `check_inbox` is called so manual triggers are not starved.
3. Manual Trigger Path
- `check_inbox` does an `LPOP` on `agent:aggregation_agent:inbox` and parses either JSON or a plain string. If a filter is present, it re‑invokes `run_cycle` constrained to that job or a `hot_query` string.
4. Execution Unit
- `execute_job` opens fresh Redis and Neo4j connections for isolation per job type, runs the task, writes JSON to cache with `AggregationEncoder` for robust typing (Neo4j temporal, numpy types, etc.).
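The manual trigger path accepts either JSON or a plain string, which could be handled along these lines. This is a sketch, not the actual `check_inbox` code: `parse_trigger` is a hypothetical helper, and the `"filter"` field name inside the JSON message is an assumption.

```python
import json
from typing import Optional


def parse_trigger(raw) -> Optional[str]:
    """Return the job filter from an inbox item, or None for a full cycle."""
    if raw is None:
        return None  # inbox was empty
    text = raw.decode("utf-8") if isinstance(raw, bytes) else str(raw)
    text = text.strip()
    if not text:
        return None
    try:
        msg = json.loads(text)
    except json.JSONDecodeError:
        # Plain string: treat it as a job name or hot-query string.
        return text
    if isinstance(msg, dict):
        value = msg.get("filter")  # assumed field name
        return str(value) if value else None
    if isinstance(msg, str):
        return msg
    return None
```

The returned filter would then be passed to a constrained `run_cycle`, while `None` means there is nothing specific to run.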


Operational Notes & Tunables
- Redis endpoints and Neo4j credentials are read from the environment; many have defaults or fall back to `creds.py` values in dev.
- Aggregation cadence: `AGG_AGENT_INTERVAL_SECS` (default 3600). Hot‑query breadth: `AGG_AGENT_HOT_LIMIT` (default 20).
- Bond Risk aggregation TTL/UI alias via `BOND_RISK_TTL` and `BOND_RISK_UI_KEY`.
- Query concurrency limits: per‑source semaphores (`LIMITS` in `search/query_agent.py`).
- Query result cache: `query_cache:<source>:<hash>` in Redis; payload normalized with `_jsonify_value` to guarantee JSON safety, including Neo4j temporal types.
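Both agents need to turn driver-specific values into JSON before writing to Redis. The sketch below shows one common approach, a `json.JSONEncoder` subclass with duck-typed fallbacks. It is not the project's `_jsonify_value` or `AggregationEncoder`: the class name `JsonSafeEncoder` is hypothetical, and the assumption that numpy values expose `tolist()` and Neo4j temporal values expose `iso_format()` should be checked against the installed library versions.

```python
import json
from datetime import date, datetime, time
from decimal import Decimal


class JsonSafeEncoder(json.JSONEncoder):
    """Convert common non-JSON types into JSON-friendly values."""

    def default(self, o):
        if isinstance(o, (datetime, date, time)):
            return o.isoformat()
        if isinstance(o, Decimal):
            return float(o)
        if isinstance(o, (set, frozenset)):
            return sorted(o, key=str)
        # Duck-typed fallbacks (assumed hooks): numpy scalars and arrays expose
        # tolist(); temporal wrappers are assumed to expose an ISO formatter.
        for attr in ("tolist", "isoformat", "iso_format"):
            method = getattr(o, attr, None)
            if callable(method):
                return method()
        # Anything else is still rejected with TypeError by the base class.
        return super().default(o)
```

It is used by passing `cls=JsonSafeEncoder` to `json.dumps` when serializing a result payload.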
Quick Cross‑References
- Query Agent
  - `search/query_agent.py::execute_query` (cache, semaphores, worker spawn)
  - `search/query_agent.py::worker_process` (per-backend logic)
  - `search/query_agent.py::_handle_cancel` (force terminate)
- Aggregation Agent
  - `data_sync/aggregation_agent.py::run_cycle` (job planning, hot queries, inbox checks)
  - `data_sync/aggregation_agent.py::execute_job` (per-job actions and writes)
  - `data_sync/aggregation_agent.py::check_inbox` (manual triggers)
- Base Framework
  - `util/adk/base_agent.py::BaseAgent` (Celery messaging, heartbeat, registry)

Key Redis Keys / Structures
- Agent registry/heartbeat
  - `registry:agent:<agent_id>` → JSON `{ agent_id, capabilities, inbox_key, last_seen, status }`
  - `agent:<agent_id>:heartbeat` → HASH `{ ts, status }` (TTL ~60s)
- Query cache
  - `query_cache:<source>:<sha256(task_without_id_outbox_parent)>` → JSON `QueryResult`
- Aggregation outputs
  - `pre_agg:*` → JSON `PreAggPayload`
  - `analytics:world_pe_ratios` → JSON array of world P/E
  - `analytics:bond_risk_rankings` → JSON array of rankings (optional alias `BOND_RISK_UI_KEY`)
- Aggregation coordination
  - `agent:aggregation_agent:inbox` (LPOP) → `AggTrigger`
  - `stats:hot_queries` (ZSET) → popular query names
  - `stats:query_to_cypher` (HASH) → name → Cypher
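As an illustration of the registry and heartbeat structures listed above, the writes could look like the following with a redis-py style client. This is a sketch under assumptions: the function names, the `"ok"` status value, epoch-second timestamps, and the general `agent:<agent_id>:inbox` pattern are not confirmed by `util/adk/base_agent.py`.

```python
import json
import time


def write_heartbeat(client, agent_id: str, status: str = "ok", ttl_secs: int = 60) -> None:
    """Refresh the heartbeat hash and let it expire if the agent goes silent."""
    key = f"agent:{agent_id}:heartbeat"
    client.hset(key, mapping={"ts": time.time(), "status": status})
    client.expire(key, ttl_secs)


def write_registry(client, agent_id: str, capabilities, status: str = "ok") -> None:
    """Store the registry entry as a JSON string."""
    entry = {
        "agent_id": agent_id,
        "capabilities": list(capabilities),
        "inbox_key": f"agent:{agent_id}:inbox",  # assumed naming pattern
        "last_seen": time.time(),
        "status": status,
    }
    client.set(f"registry:agent:{agent_id}", json.dumps(entry))
```

Because the heartbeat key carries a TTL, a monitor only needs to check whether the key still exists to decide if an agent is alive.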
I started with these two agents mainly because I was concerned about the scalability of building an entirely Python-based solution that involved multi-user ML use as well as graph and aggregated result sets.
There were some initial issues with overloading and task management, which is why I added Celery to externalize those decisions. Both agents have "learning" aspects, as described above, so I tried to adhere to the overall AI "agent" objective while also helping with system scaling and optimization.
Celery works in conjunction with the main orchestration engine and some configuration logic to spawn or shut down task management workers based on user demand. This helps the agents stay within system resource limits rather than run away and crash.


