Machine d'investissement IA : Architecture des agents
- Claude Paugh

- 15 févr.
- 7 min de lecture
Aperçu
Je souhaitais me concentrer initialement sur certains aspects spécifiques liés aux systèmes. L'extraction et la synthèse des données étant un élément important, voire essentiel, des fonctionnalités, j'ai décidé d'orienter mes premiers agents vers ces aspects. Il me semblait important qu'ils puissent « apprendre » et détecter les schémas de requêtes et d'agrégations des utilisateurs, ce qui permettrait d'anticiper leurs besoins et de répondre de manière proactive à leurs questions.
Puisque j'ai décidé de me consacrer entièrement au développement d'un projet basé sur Python, les principaux composants de mon infrastructure d'agent ont été construits avec :
Agents basés sur Google Agent Development Kit (ADK) et Agent-to-Agent (A2A)
Céleri pour la gestion des tâches
L'agent de requêtes : un exécuteur asynchrone multiprocessus avec mise en cache et annulation, utilisant des messages A2A routés par Celery.
L'agent d'agrégation : un service de pré-calcul périodique/semi-à-la-demande qui matérialise les analyses fréquemment demandées dans Redis, maintenant ainsi la rapidité de l'interface utilisateur et du chat.
Vous trouverez ci-dessous une vue d'ensemble concise de l'architecture, ainsi que des diagrammes de séquence détaillés illustrant le fonctionnement de l'agent de requêtes et de l'agent d'agrégation. J'ai également inclus les fichiers et méthodes clés les plus pertinents afin de faciliter la consultation de l'implémentation.

Fichiers de l'agent de requêtes : `search/query_agent.py` - Méthodes principales : `worker_process`, `execute_query`, `_handle_cancel`, `handle_message`, `connect`
Fichiers de l'agent d'agrégation : `data_sync/aggregation_agent.py` - Méthodes principales : `execute_job`, `run_cycle`, `check_inbox`, `AggregationEncoder`, `AggregationAgent.run`
Structure commune de l'agent : `util/adk/base_agent.py` - Classes/méthodes clés : `AgentContext`, `BaseAgent.connect`, `BaseAgent.send_message` (Celery), `BaseAgent.register`/heartbeat, `create_celery_task`
Agent de requêtes — Son fonctionnement
Objectif principal : Analyser les schémas de requêtes des utilisateurs afin d’optimiser la mise en cache et la récupération des données. Exécuter les requêtes lourdes de manière asynchrone pour le compte du serveur de chat/recherche, en déchargeant les E/S bloquantes et la sérialisation dans des processus de travail isolés. Permet le multiplexage du serveur.
Neo4j, Couchbase, Redis, LLM, Text‑to-Cypher - Limitation de la concurrence par le backend à l'aide de sémaphores.
Mise en cache des résultats indexée par un hachage de contenu dans Redis
Prise en charge de l'annulation pure et simple par la mise fin des processus de travail
Flux de messages et de recyclage de la vie
1. Enregistrement/Pilote de base : `BaseAgent.connect` et `BaseAgent.register` (via `util/adk/base_agent.py`) se connectent à Redis et maintiennent un pouls à l’adresse `agent:query_agent:heartbeat` ainsi qu’une entrée dans le registre `registry:agent:query_agent`. Les messages sont envoyés et reçus via Celery. `BaseAgent.send_message` utilise Celery pour transmettre les messages `A2AMessage` à `agent`.
2. Réception d'une tâche - Routes `QueryAgent.handle_message` par `payload_type` : - `"task"` → `execute_query` - `"cancel"` → `_handle_cancel`
3. Recherche dans le cache - `execute_query` construit un hachage stable de la charge utile de la tâche (à l'exclusion des champs transitoires tels que `id`, `outbox`, `parent_id`). - Clé du cache : `query_cache` :
4. Distribution vers un processus worker isolé : un sémaphore est sélectionné pour chaque source (par exemple, neo4j, couchbase, llm, etc.). L’agent lance un processus multiprocessing exécutant la fonction worker_process(task, result_queue). Les flux STDIN, STDOUT et STDERR sont redirigés vers /dev/null dans le worker afin d’éviter les problèmes de descripteurs lors de la fermeture du processus.
5. Exécution côté serveur (dans le worker) - Neo4j : Utilise un pilote synchrone (`GraphDatabase.driver`) et exécute une requête Cypher fournie (ou une requête « outil » spécialisée, par exemple, des expandeurs de chemin pays/devise). Renvoie les lignes sous forme de dictionnaires, puis les normalise via `_jsonify_value` pour garantir un typage JSON sûr. - Couchbase : Appelle par réflexion une méthode de `search.couchbase_service.cb_service` avec `params`. - LLM/Text2Cypher : Utilise `chat_gateway.llm_client.LLMClient` pour générer une chaîne Cypher à partir de NL ou exécuter des tâches LLM.
6. Collecte des résultats et réponse : Le processus parent interroge `result_queue`. Dès qu'un résultat est disponible, il le transmet à l'expéditeur initial via `AgentContext.reply`, qui utilise Celery. Le cache peut être rempli avec le JSON normalisé (en respectant la stratégie TTL définie dans le fichier) afin que les tâches identiques ultérieures soient traitées instantanément.
7. Annulation – La fonction `_handle_cancel` recherche le processus worker en cours d'exécution à l'aide de l'identifiant `task_id` et envoie un signal `SIGTERM`, puis un signal `SIGKILL` si nécessaire. Un message de confirmation d'annulation est envoyé au demandeur.
Contrats de données
- Charge utile de la tâche entrante
- ` source `: `"neo4j" | "couchbase" | "redis" | "llm" | "text2cypher"`
- `query` et/ou `method`, `params` (spécifiques au backend)
- Facultatif : `tool` (modèles de requêtes Neo4j spéciaux), `limit`, `outbox`
- Résultat sortant
- `{ id, source, ok, data?, error?, duration, ts, cached? }`
Références clés pour la mise en œuvre
- Worker : `search/query_agent.py::worker_process`
- Logique du cache et sémaphores : `search/query_agent.py::execute_query`
- Annulation : `search/query_agent.py::_handle_cancel`
- Messagerie A2A et signal de présence : `util/adk/base_agent.py`
Agent d'agrégation — Son rôle
Objectif principal : Précalculer et mettre en cache les analyses « coûteuses mais fréquemment nécessaires » afin que l’interface utilisateur et le chat puissent répondre instantanément. Au fil du temps, identifier les agrégations fréquemment utilisées qui doivent être pré-calculées. Le système prend également en charge le préchauffage des requêtes et les déclencheurs manuels.
- Enregistre les résultats sous des clés Redis telles que `pre_agg:*` et `analytics:*` (par exemple, `analytics:bond_risk_rankings`, `pre_agg:result: `)
- Utilise intensivement Neo4j pour l'agrégation des tendances historiques ; certaines tâches stockent également des regroupements d'indices globaux et des ratios cours/bénéfice mondiaux.
- Utilise une machine à états simple (« AggregationStateMachine ») pour le suivi des cycles et les rapports d'état.
- Fonctionne de manière séquentielle (un seul processus) afin d'économiser de la mémoire, tout en restant réactif aux déclenchements manuels via une boîte de réception.
Planification et déclencheurs
- Intervalle : `AGG_AGENT_INTERVAL_SECS` (par défaut ≈ 3600 secondes)
- Déclencheurs manuels : envoi vers la liste Redis `agent:aggregation_agent:inbox` (modifiable via la variable d'environnement `AGG_AGENT_INBOX`). L'agent vérifie la boîte de réception entre chaque tâche.
- Requêtes populaires : le mappage ZSET `stats:hot_queries` + `stats:query_to_cypher` est utilisé pour pré-matérialiser les requêtes populaires.
Types de tâches et résultats
- `notional_aggregation` : Calcule les sommes notionnelles sur plusieurs types d'actifs par `PeriodEndDate` (Neo4j) → Redis `pre_agg:*`
- `asset_holdings_trend` : Évolution des portefeuilles d'actifs au fil du temps (Neo4j) → Redis `pre_agg:*`
- `municipal_debt_summary` : Résumés des dettes municipales (Neo4j) → Redis `pre_agg:*`
- `industry_aggregation` : Regroupements par secteur/industrie (Neo4j) → Redis `pre_agg:*`
- `world_pe_aggregation` : Ratios cours/bénéfice mondiaux (calcul local) → Redis `analytics:world_pe_ratios`
- `bond_risk_aggregation` : Classement des risques obligataires (service d'apprentissage automatique) → Redis `analytics:bond_risk_rankings` (+ alias optionnel via `BOND_RISK_UI_KEY`, TTL optionnel `BOND_RISK_TTL`)
- `wilshire_aggregation` : Agrégation globale des index, protégée par une vérification d’ouverture du marché → Redis (cache des index)
- `hot_query` (personnalisé) : Étant donné un nom de requête lisible par l’humain, recherchez Cypher dans `stats:query_to_cypher`, exécutez-le, stockez la charge utile sous le nom `pre_agg:result:`. `
Cycle de vie et flux
1. Début/Battement cardiaque
- Hérite de `BaseAgent.connect/register` (battement de cœur Redis et entrées de registre sous `agent:aggregation_agent:*`).
2. Cycle périodique
- `run_cycle` construit la liste des tâches : tâches fixes + top `HOT_QUERY_LIMIT` (par défaut 20) de `stats:hot_queries`.
- Les tâches sont exécutées séquentiellement via un exécuteur à thread unique (`max_workers=1`) pour une prévisibilité de la mémoire.
- Entre chaque tâche, `check_inbox` est appelé afin que les déclencheurs manuels ne soient pas indisponibles.
3. Chemin de déclenchement manuel
La fonction `check_inbox` effectue une opération `LPOP` sur `agent:aggregation_agent:inbox` et analyse soit du JSON, soit une chaîne de caractères brute. Si un filtre est présent, elle réexécute `run_cycle` en le limitant à cette tâche ou à une chaîne `hot_query`.
4. Unité d'exécution
- `execute_job` ouvre de nouvelles connexions Redis et Neo4j pour l'isolation par type de tâche, exécute la tâche, écrit du JSON dans le cache avec `AggregationEncoder` pour un typage robuste (types temporels Neo4j, types numpy, etc.).


Notes opérationnelles et paramètres réglables
- Les points de terminaison Redis et les identifiants Neo4j sont lus à partir de l'environnement ; de nombreuses valeurs par défaut sont fournies ou utilisent les valeurs de `creds.py` en mode développement.
- Cadence d'agrégation : `AGG_AGENT_INTERVAL_SECS` (par défaut : 3600). Largeur des requêtes fréquentes : `AGG_AGENT_HOT_LIMIT` (par défaut : 20).
- Alias TTL/UI d'agrégation des risques obligataires via `BOND_RISK_TTL` et `BOND_RISK_UI_KEY`.
- Limites de concurrence des requêtes : sémaphores par source (`LIMITS` dans `search/query_agent.py`).
- Cache des résultats de requête : `query_cache : : ` dans Redis ; la charge utile est normalisée avec `_jsonify_value` pour garantir la sécurité JSON, y compris les types temporels Neo4j.
Références croisées rapides
- Agent de requête
- `search/query_agent.py::execute_query` (cache, sémaphores, création de processus)
- `search/query_agent.py::worker_process` (logique par backend)
- `search/query_agent.py::_handle_cancel` (forcer l'arrêt)
- Agent d'agrégation
- `data_sync/aggregation_agent.py::run_cycle` (planification des tâches, requêtes fréquentes, vérification de la boîte de réception)
- `data_sync/aggregation_agent.py::execute_job` (actions et écritures par tâche)
- `data_sync/aggregation_agent.py::check_inbox` (déclencheurs manuels)
- Cadre de base
- `util/adk/base_agent.py::BaseAgent` (Messagerie Celery, signal de présence, registre)

Clés/structures Redis
Registre des agents/battement de cœur
`registre:agent:
`agent:
cache de requêtes
`query_cache:
Résultats d'agrégation
`pre_agg:*` → JSON `PreAggPayload`
`analytics:world_pe_ratios` → Tableau JSON des ratios cours/bénéfice mondiaux
`analytics:bond_risk_rankings` → Tableau JSON de classements (alias optionnel `BOND_RISK_UI_KEY`)
Coordination de l'agrégation
`agent:aggregation_agent:inbox` (LPOP) → `AggTrigger`
`stats:hot_queries` (ZSET) → noms de requêtes populaires
`stats:query_to_cypher` (HASH) → nom → Chiffre
J'ai commencé principalement par ces deux agents en raison de mes préoccupations concernant l'évolutivité de la construction d'une solution complète basée sur Python impliquant l'utilisation de l'apprentissage automatique multi-utilisateurs ainsi que des ensembles de résultats graphiques et agrégés.
Au départ, des problèmes de surcharge et de gestion des tâches sont apparus, c'est pourquoi j'ai ajouté Celery pour externaliser ces décisions. Comme indiqué précédemment, les deux agents possèdent des capacités d'apprentissage ; j'ai donc veillé à respecter l'objectif global de l'agent d'IA, tout en contribuant à la mise à l'échelle et à l'optimisation du système.
Celery fonctionne de concert avec le moteur d'orchestration principal et une logique de configuration permettant de lancer ou d'arrêter des processus de gestion des tâches en fonction des besoins de l'utilisateur. Il permet ainsi aux agents de rester dans les limites de ressources système et d'éviter toute saturation et tout plantage.


