top of page

AI投資マシン:エージェントアーキテクチャ

概要


当初は、システム指向の特定の領域に重点を置きたいと考えていました。データの取得と要約は、機能の主要部分ではないにしても、重要な部分となるため、初期のエージェントではこれらの側面に重点を置くことにしました。ユーザーからのクエリや集計のパターンを「学習」して検出できることが重要だと考えたからです。これにより、ユーザーからの質問にプロアクティブに回答できるようになります。


Pythonベースのプロジェクト開発に全力を注ぐことに決めたので、エージェントの主要インフラコンポーネントは以下で構築しました。


  • Google エージェント開発キット (ADK) とエージェント間 (A2A) ベースのエージェント

  • タスク管理のためのCelery


  1. クエリエージェント: キャッシュとキャンセル機能を備えた非同期のマルチプロセスエグゼキュータ。Celery ルーティングの A2A メッセージが前面に来る。


  2. 集約エージェント: 一般的に要求される分析を Redis に実現し、UI とチャットを高速に保つ、定期的/セミオンデマンドの事前計算サービスです。


以下は、クエリエージェントとアグリゲーションエージェントの動作を示す簡潔なアーキテクチャ図と詳細なシーケンス図です。また、実装を相互参照しやすいよう、関連性の高いファイルと主要なメソッドも記載しています。


エージェントインタラクションのコンポーネント図
クエリ エージェント ファイル: `search/query_agent.py` - 主要なメソッド: `worker_process`、`execute_query`、`_handle_cancel`、`handle_message`、`connect` -

集約エージェント ファイル: `data_sync/aggregation_agent.py` - 主要メソッド: `execute_job`、`run_cycle`、`check_inbox`、`AggregationEncoder`、`AggregationAgent.run` -

共通エージェント スキャフォールド: `util/adk/base_agent.py` - 主要なクラス/メソッド: `AgentContext`、`BaseAgent.connect`、`BaseAgent.send_message` (Celery)、`BaseAgent.register`/heartbeat、`create_celery_task`

クエリエージェント — 機能

高レベルの目的:ユーザーのクエリリクエストパターンを学習し、データのキャッシュや取得に役立てます。チャット/検索バックエンドに代わって、高負荷のクエリを非同期的に実行し、ブロッキングI/Oとシリアル化を独立したワーカープロセスにオフロードします。バックエンドの多重化を実現します。


  • Neo4j、Couchbase、Redis、LLM、Text‑to‑Cypher - セマフォを使用したバックエンドによる同時実行の制限。

  • Redis のコンテンツハッシュをキーとする結果のキャッシュ

  • ワーカープロセスの終了によるハードキャンセルのサポート


ライフサイクルとメッセージフロー


1.登録/ハートビート- `BaseAgent.connect` と `BaseAgent.register` (`util/adk/base_agent.py` 経由) は Redis に接続し、`agent:query_agent:heartbeat` とレジストリ エントリ `registry:agent:query_agent` でハートビートを維持します。 - メッセージは Celery 経由で送受信されます。`BaseAgent.send_message` は Celery を使用して `A2AMessage` を `agent` に転送します。


2.タスクの受信- `QueryAgent.handle_message` は `payload_type` によってルーティングします: - `"task"` → `execute_query` - `"cancel"` → `_handle_cancel`


3.キャッシュ検索- `execute_query` はタスクペイロードの安定したハッシュを構築します(`id`、`outbox`、`parent_id` などの一時的なフィールドは除きます)。 - キャッシュキー: `query_cache:


4.分離されたワーカープロセスへのディスパッチ- セマフォは `source` (例: `neo4j`、`couchbase`、`llm` など) ごとに選択されます。 - エージェントは `worker_process(task, result_queue)` を実行する `multiprocessing` プロセスを生成します。 - プロセスのティアダウン中に記述子の問題が発生するのを回避するため、STDIN/STDOUT/STDERR はワーカー内で `/dev/null` にバインドされます。


5.バックエンド実行(ワーカー内部) - Neo4j: 同期ドライバー(`GraphDatabase.driver`)を使用し、提供されたCypher(または国/通貨パスエクスパンダーなどの特殊な「ツール」クエリ)を実行します。行を辞書として返し、`_jsonify_value` で正規化することでJSONセーフな型指定を保証します。 - Couchbase: `search.couchbase_service.cb_service` のメソッドを `params` でリフレクション呼び出しします。 - LLM/Text2Cypher: `chat_gateway.llm_client.LLMClient` を使用して、NLからCypher文字列を生成するか、LLMタスクを実行します。


6.結果の収集と返信- 親プロセスは `result_queue` をポーリングします。結果が利用可能になると、`AgentContext.reply` を使用して構造化された結果を元の送信者に転送します。この処理は Celery 経由でルーティングされます。 - キャッシュには正規化された JSON が保存される可能性があり(ファイルで定義された TTL 戦略を尊重)、これにより将来の同一タスクが即座に返されます。


7.キャンセル- `_handle_cancel` は `task_id` で実行中のワーカープロセスを検索し、`SIGTERM` を送信します。必要に応じて `SIGKILL` にエスカレートします。キャンセル結果はリクエスト元に返されます。


データ契約

-受信タスクペイロード

- `ソース`: `"neo4j" | "couchbase" | "redis" | "llm" | "text2cypher"`

- `query` および/または `method`、`params` (バックエンド固有)

-オプション: `tool` (特別な Neo4j クエリパターン)、`limit`、`outbox`

-送信結果

- `{ id, source, ok, data?, error?, duration, ts, cached? }`


主要な実装リファレンス

-ワーカー: `search/query_agent.py::worker_process`

-キャッシュロジックとセマフォ: `search/query_agent.py::execute_query`

-キャンセル: `search/query_agent.py::_handle_cancel`

- A2A メッセージングとハートビート: `util/adk/base_agent.py`


集約エージェント — 機能

大まかな目的:「高コストだが頻繁に必要となる」分析を事前に計算してキャッシュすることで、UIとチャットが即座に応答できるようにします。時間の経過とともに、頻繁に使用される集計を事前集計の必要性としてタグ付けする必要があることを「学習」します。また、ホットクエリのウォームアップと手動トリガーもサポートします。


- `pre_agg:*` や `analytics:*` などの Redis キーの下に結果を書き込みます (例: `analytics:bond_risk_rankings`、`pre_agg:result: `)

- 過去の傾向の集計にはNeo4jを多用しています。一部のタスクでは、グローバルインデックスのロールアップや世界株価収益率も保存しています。

- サイクル追跡とヘルスレポートにシンプルなステートマシン (`AggregationStateMachine`) を採用

- メモリを節約するために設計により順次実行(単一のワーカー)されますが、受信トレイを介した手動トリガーへの応答性は維持されます。


スケジュールとトリガー

- 間隔: `AGG_AGENT_INTERVAL_SECS` (デフォルト ≈ 3600 秒)

- 手動トリガー:Redisリスト「agent:aggregation_agent:inbox」にプッシュします(環境変数「AGG_AGENT_INBOX」で上書き可能)。エージェントはジョブ間で受信トレイをチェックします。

- ホットクエリ: ZSET `stats:hot_queries` + `stats:query_to_cypher` マッピングは、人気のあるクエリを事前にマテリアライズするために使用されます。


ジョブの種類と出力

- `notional_aggregation`: `PeriodEndDate` (Neo4j) で多くの資産タイプにわたる想定元本を事前合計します → Redis `pre_agg:*`

- `asset_holdings_trend`: 資産保有傾向の推移 (Neo4j) → Redis `pre_agg:*`

- `municipal_debt_summary`: 地方自治体のデットサマリー (Neo4j) → Redis `pre_agg:*`

- `industry_aggregation`: セクター/業種のロールアップ (Neo4j) → Redis `pre_agg:*`

- `world_pe_aggregation`: 世界株価収益率(ローカルコンピューティング) → Redis `analytics:world_pe_ratios`

- `bond_risk_aggregation`: 債券リスクランキング (ML サービス) → Redis `analytics:bond_risk_rankings` (+ `BOND_RISK_UI_KEY` 経由のオプションのエイリアス、オプションの TTL `BOND_RISK_TTL`)

- `wilshire_aggregation`: マーケットオープンチェックで保護されたグローバルインデックス集約 → Redis (インデックスキャッシュ)

- `hot_query` (カスタム): 人間が読めるクエリ名を指定すると、`stats:query_to_cypher` から Cypher を検索して実行し、ペイロードを `pre_agg:result: として保存します。 `


ライフサイクルとフロー

1. スタート/ハートビート

- `BaseAgent.connect/register` (`agent:aggregation_agent:*` の下の Redis ハートビートおよびレジストリ エントリ) を継承します。


2. 周期的なサイクル

- `run_cycle` はジョブ リストを構築します: 固定ジョブ + `stats:hot_queries` からの上位 `HOT_QUERY_LIMIT` (デフォルトは 20)。

- ジョブは、メモリ予測可能性のために、シングルスレッド エグゼキュータ (`max_workers=1`) を介して順次実行されます。

- 各ジョブの間で `check_inbox` が呼び出されるため、手動トリガーが不足することはありません。


3. 手動トリガーパス

- `check_inbox` は `agent:aggregation_agent:inbox` に対して `LPOP` を実行し、JSON またはプレーン文字列を解析します。フィルターが存在する場合は、そのジョブまたは `hot_query` 文字列に制約された `run_cycle` を再呼び出しします。


4. 実行ユニット

- `execute_job` は、ジョブ タイプごとに分離するために新しい Redis および Neo4j 接続を開き、タスクを実行し、堅牢な型指定 (Neo4j の一時型、numpy 型など) のために `AggregationEncoder` を使用して JSON をキャッシュに書き込みます。


エージェントのアクティビティ図


エージェントのアクティビティ図


運用上の注意と調整可能なパラメータ

- Redis エンドポイントと Neo4j 認証情報は env から読み取られます。多くのデフォルトが提供されるか、dev の `creds.py` 値にフォールバックします。

- 集計間隔: `AGG_AGENT_INTERVAL_SECS` (デフォルト: 3600)。ホットクエリの幅: `AGG_AGENT_HOT_LIMIT` (デフォルト: 20)。

- `BOND_RISK_TTL` および `BOND_RISK_UI_KEY` による債券リスク集約 TTL/UI エイリアス。

- クエリの同時実行制限: ソースごとのセマフォ (`search/query_agent.py` の `LIMITS`)。

- クエリ結果キャッシュ: `query_cache: : ` を Redis で使用します。ペイロードは、Neo4j のテンポラル タイプを含む JSON の安全性を保証するために `_jsonify_value` で正規化されます。


クイック相互参照

- クエリエージェント

- `search/query_agent.py::execute_query` (キャッシュ、セマフォ、ワーカーの生成)

- `search/query_agent.py::worker_process` (バックエンドごとのロジック)

- `search/query_agent.py::_handle_cancel` (強制終了)

- 集約エージェント

- `data_sync/aggregation_agent.py::run_cycle` (ジョブ計画、ホットクエリ、受信トレイチェック)

- `data_sync/aggregation_agent.py::execute_job` (ジョブごとのアクションと書き込み)

- `data_sync/aggregation_agent.py::check_inbox` (手動トリガー)

- ベースフレームワーク

- `util/adk/base_agent.py::BaseAgent` (Celery メッセージング、ハートビート、レジストリ)


エージェントのフロー図

Redisのキー/構造体

  • エージェントレジストリ/ハートビート

    • `レジストリ:エージェント:

    • `エージェント:


  • クエリキャッシュ

    `クエリキャッシュ:


  • 集約出力

    • `pre_agg:*` → JSON `PreAggPayload`

    • `analytics:world_pe_ratios` → 世界株価収益率のJSON配列

    • `analytics:bond_risk_rankings` → ランキングの JSON 配列 (オプションのエイリアス `BOND_RISK_UI_KEY`)


  • 集約調整

    • `agent:aggregation_agent:inbox` (LPOP) → `AggTrigger`

    • `stats:hot_queries` (ZSET) → 人気のクエリ名

    • `stats:query_to_cypher` (HASH) → 名前 → サイファー


私は、マルチユーザー ML の使用とグラフおよび集約された結果セットを含む Python ベースのソリューション全体を構築する際のスケーラビリティに対する懸念から、主にこれら 2 つのエージェントから始めました。


当初は過負荷とタスク管理に関していくつか問題がありました。そのため、これらの決定を外部化するためにCeleryを追加しました。前述の通り、どちらのエージェントにも「学習」の側面があるため、AI「エージェント」としての全体的な目的を遵守しつつ、システムのスケーリングと最適化を支援するよう努めました。


Celeryはメインのオーケストレーションエンジンと連携し、ユーザーの要求に応じてタスク管理ワーカーを起動またはシャットダウンする設定ロジックを備えています。これにより、エージェントがシステムのリソース制限内に収まり、暴走してクラッシュするのを防ぎます。


bottom of page