top of page

Máquina de inversión de IA: Arquitectura de agentes

Descripción general


Había algunas áreas específicas orientadas a sistemas en las que quería centrarme inicialmente. Dado que la recuperación y el resumen de datos iban a ser una parte importante, si no el área clave, de la funcionalidad, decidí centrar a mis agentes iniciales en esos aspectos, ya que consideraba importante que pudieran "aprender" y detectar patrones de consultas y agregaciones de los usuarios. Esto permitiría una actitud proactiva al responder a sus preguntas.


Desde que decidí apostar por el desarrollo de un proyecto basado en Python, los principales componentes de mi infraestructura de agente se crearon con:


  • Kit de desarrollo de agente de Google (ADK) y agentes basados en agente a agente (A2A)

  • Apio para la gestión de tareas


  1. El agente de consultas: un ejecutor multiproceso asincrónico con almacenamiento en caché y cancelación, liderado por mensajes A2A enrutados por Celery


  2. El agente de agregación: un servicio de precomputación periódico/semi-a pedido que materializa los análisis comúnmente solicitados en Redis, manteniendo la interfaz de usuario y el chat rápidos.


A continuación, se muestra una vista concisa de la arquitectura, además de diagramas de secuencia detallados sobre el funcionamiento del Agente de Consulta y el Agente de Agregación. También he incluido los archivos y métodos clave más relevantes para que pueda comparar la implementación rápidamente.


Diagrama de componentes de las interacciones de los agentes
Archivos del agente de consultas : `search/query_agent.py` - Métodos clave: `worker_process`, `execute_query`, `_handle_cancel`, `handle_message`, `connect`-

Archivos del agente de agregación : `data_sync/aggregation_agent.py` - Métodos clave: `execute_job`, `run_cycle`, `check_inbox`, `AggregationEncoder`, `AggregationAgent.run`-

Andamio de agente común : `util/adk/base_agent.py` - Clases/métodos clave: `AgentContext`, `BaseAgent.connect`, `BaseAgent.send_message` (Celery), `BaseAgent.register`/heartbeat, `create_celery_task`

Agente de consultas: qué hace

Objetivo general: Aprender los patrones de las solicitudes de consulta del usuario para fundamentar el almacenamiento en caché o la recuperación de datos. Ejecutar consultas pesadas de forma asíncrona en nombre del backend de chat/búsqueda, descargando el bloqueo de E/S y la serialización en procesos de trabajo aislados. Proporciona multiplexación del backend:


  • Neo4j, Couchbase, Redis, LLM, limitación de concurrencia de texto a cifrado mediante backend mediante semáforos.

  • Almacenamiento en caché de resultados codificado por un hash de contenido en Redis

  • Soporte de cancelación dura mediante la finalización de procesos de trabajo


Ciclo de vida y flujo de mensajes


1. Registro/Latido : `BaseAgent.connect` y `BaseAgent.register` (a través de `util/adk/base_agent.py`) se conectan a Redis y mantienen un latido en `agent:query_agent:heartbeat` y una entrada de registro `registry:agent:query_agent`. Los mensajes se envían y reciben mediante Celery. `BaseAgent.send_message` utiliza Celery para reenviar `A2AMessage` a `agent`.


2. Recepción de una tarea : `QueryAgent.handle_message` se enruta por `payload_type`: - `"task"` → `execute_query` - `"cancel"` → `_handle_cancel`


3. Búsqueda de caché : `execute_query` crea un hash estable de la carga útil de la tarea (excluyendo campos transitorios como `id`, `outbox`, `parent_id`). - Clave de caché: `query_cache:


4. Despacho a un proceso de trabajo aislado : se selecciona un semáforo por `source` (por ejemplo, `neo4j`, `couchbase`, `llm`, etc.). - El agente genera un proceso de `multiprocesamiento` que ejecuta `worker_process(task, result_queue)`. - STDIN/STDOUT/STDERR están vinculados a `/dev/null` en el trabajador para evitar problemas de descriptor durante el desmantelamiento del proceso.


5. Ejecución de backend (trabajador interno) - Neo4j: Utiliza un controlador síncrono (`GraphDatabase.driver`) y ejecuta Cypher proporcionado (o una consulta de herramienta especializada, por ejemplo, expansores de rutas de país/moneda). Devuelve filas como diccionarios y luego se normaliza mediante `_jsonify_value` para garantizar la tipificación segura con JSON. - Couchbase: Llama reflexivamente a un método en `search.couchbase_service.cb_service` con `params`. - LLM/Text2Cypher: Utiliza `chat_gateway.llm_client.LLMClient` para generar una cadena Cypher desde NL o ejecutar tareas LLM.


6. Recopilación y respuesta de resultados : El proceso principal sondea `result_queue`. Una vez disponible, reenvía el resultado estructurado al remitente original mediante `AgentContext.reply`, que se enruta a través de Celery. La caché puede rellenarse con el JSON normalizado (respetando la estrategia TTL definida en el archivo) para que las futuras tareas idénticas se devuelvan instantáneamente.


7. Cancelación : `_handle_cancel` busca el proceso de trabajo en ejecución mediante `task_id` y envía `SIGTERM`. Luego, escala a `SIGKILL` si es necesario. Se envía un resultado de cancelación al solicitante.


Contratos de datos

- Carga útil de la tarea entrante

- ` fuente` : `"neo4j" | "couchbase" | "redis" | "llm" | "text2cypher"`

- `consulta` y/o `método`, `parámetros` (específico del backend)

- Opcional: `tool` (patrones de consulta especiales de Neo4j), `limit`, `outbox`

- Resultado saliente

- `{ id, origen, ok, datos?, error?, duración, ts, almacenado en caché? }`


Referencias clave de implementación

- Trabajador : `search/query_agent.py::worker_process`

- Lógica de caché y semáforos : `search/query_agent.py::execute_query`

- Cancelación : `search/query_agent.py::_handle_cancel`

- Mensajería y latidos A2A : `util/adk/base_agent.py`


Agente de agregación: qué hace

Objetivo general: Precalcular y almacenar en caché análisis costosos pero frecuentemente necesarios para que la interfaz de usuario y el chat puedan responder al instante. Con el tiempo, aprender qué agregaciones de uso frecuente deben etiquetarse como necesarias antes de la agregación. También admite el calentamiento de consultas activas y los activadores manuales.


- Escribe resultados bajo claves Redis como `pre_agg:*` y `analytics:*` (por ejemplo, `analytics:bond_risk_rankings`, `pre_agg:result: `)

- Utiliza Neo4j con frecuencia para agregaciones de tendencias históricas; algunas tareas también almacenan acumulaciones de índices globales y ratios P/E mundiales.

- Emplea una máquina de estados simple (`AggregationStateMachine`) para el seguimiento del ciclo y los informes de salud.

- Se ejecuta secuencialmente (un solo trabajador) por diseño para conservar la memoria, pero mantiene la capacidad de respuesta a los activadores manuales a través de una bandeja de entrada.


Programación y activadores

- Intervalo: `AGG_AGENT_INTERVAL_SECS` (predeterminado ≈ 3600 segundos)

- Activadores manuales: enviar a la lista de Redis `agent:aggregation_agent:inbox` (anulable mediante el entorno `AGG_AGENT_INBOX`). El agente revisa la bandeja de entrada entre trabajos.

- Consultas activas: el mapeo ZSET `stats:hot_queries` + `stats:query_to_cypher` se utiliza para materializar previamente las consultas populares.


Tipos de trabajos y resultados

- `notional_aggregation`: Suma previamente los valores nocionales en muchos tipos de activos por `PeriodEndDate` (Neo4j) → Redis `pre_agg:*`

- `asset_holdings_trend`: Tendencias de tenencia de activos a lo largo del tiempo (Neo4j) → Redis `pre_agg:*`

- `municipal_debt_summary`: Resúmenes municipales (Neo4j) → Redis `pre_agg:*`

- `industry_aggregation`: acumulaciones de sectores/industrias (Neo4j) → Redis `pre_agg:*`

- `world_pe_aggregation`: ratios P/E mundiales (cálculo local) → Redis `analytics:world_pe_ratios`

- `bond_risk_aggregation`: Clasificaciones de riesgo de bonos (servicio ML) → Redis `analytics:bond_risk_rankings` (+ alias opcional a través de `BOND_RISK_UI_KEY`, TTL opcional `BOND_RISK_TTL`)

- `wilshire_aggregation`: agregación de índice global, protegida por la verificación de apertura del mercado → Redis (caché de índices)

- `hot_query` (personalizado): Dado un nombre de consulta legible para humanos, busque Cypher desde `stats:query_to_cypher`, ejecútelo, almacene la carga útil como `pre_agg:result: `


Ciclo de vida y flujo

1. Inicio/Latido del corazón

- Hereda `BaseAgent.connect/register` (latido de Redis y entradas de registro bajo `agent:aggregation_agent:*`).


2. Ciclo periódico

- `run_cycle` crea la lista de trabajos: trabajos fijos + `HOT_QUERY_LIMIT` superior (predeterminado 20) de `stats:hot_queries`.

- Los trabajos se ejecutan secuencialmente a través de un ejecutor de un solo hilo (`max_workers=1`) para la previsibilidad de la memoria.

- Entre cada trabajo, se llama a `check_inbox` para que los activadores manuales no se llenen de información.


3. Ruta de activación manual

- `check_inbox` realiza un `LPOP` en `agent:aggregation_agent:inbox` y analiza JSON o una cadena simple. Si hay un filtro, vuelve a invocar `run_cycle` restringido a ese trabajo o a una cadena `hot_query`.


4. Unidad de Ejecución

- `execute_job` abre nuevas conexiones Redis y Neo4j para aislamiento por tipo de trabajo, ejecuta la tarea, escribe JSON en caché con `AggregationEncoder` para tipificación robusta (temporal Neo4j, tipos numpy, etc.).


diagrama de actividades de los agentes


diagrama de actividad del agente


Notas operativas y ajustables

- Los puntos finales de Redis y las credenciales de Neo4j se leen desde el entorno; se proporcionan muchos valores predeterminados o se recurre a los valores de `creds.py` en el desarrollo.

Cadencia de agregación: `AGG_AGENT_INTERVAL_SECS` (predeterminado: 3600). Amplitud de consulta activa: `AGG_AGENT_HOT_LIMIT` (predeterminado: 20).

- Alias TTL/UI de agregación de riesgo de bono a través de `BOND_RISK_TTL` y `BOND_RISK_UI_KEY`.

- Límites de concurrencia de consultas: semáforos por origen (`LIMITS` en `search/query_agent.py`).

- Caché de resultados de la consulta: `query_cache: : ` en Redis; carga útil normalizada con `_jsonify_value` para garantizar la seguridad de JSON, incluidos los tipos temporales de Neo4j.


Referencias cruzadas rápidas

- Agente de consultas

- `search/query_agent.py::execute_query` (caché, semáforos, generación de trabajadores)

- `search/query_agent.py::worker_process` (lógica por backend)

- `search/query_agent.py::_handle_cancel` (forzar la terminación)

- Agente de agregación

- `data_sync/aggregation_agent.py::run_cycle` (planificación de trabajos, consultas activas, comprobaciones de la bandeja de entrada)

- `data_sync/aggregation_agent.py::execute_job` (acciones y escrituras por trabajo)

- `data_sync/aggregation_agent.py::check_inbox` (activadores manuales)

- Marco base

- `util/adk/base_agent.py::BaseAgent` (mensajería de Celery, latido, registro)


diagrama de flujo de agentes

Claves/Estructuras de Redis

  • Registro/latido del agente

    • `registro:agente:

    • `agente:


  • Caché de consultas

    `caché de consulta:


  • Resultados de agregación

    • `pre_agg:*` → JSON `PreAggPayload`

    • `analytics:world_pe_ratios` → Matriz JSON de P/E mundial

    • `analytics:bond_risk_rankings` → Matriz JSON de clasificaciones (alias opcional `BOND_RISK_UI_KEY`)


  • Coordinación de agregación

    • `agente:agente_de_agregación:bandeja de entrada` (LPOP) → `AggTrigger`

    • `stats:hot_queries` (ZSET) → nombres de consultas populares

    • `stats:query_to_cypher` (HASH) → nombre → Cypher


Comencé principalmente con estos dos agentes debido a mi preocupación por la escalabilidad de construir una solución completa basada en Python que involucrara el uso de ML por parte de múltiples usuarios, así como conjuntos de resultados gráficos y agregados.


Inicialmente, surgieron algunos problemas con la sobrecarga y la gestión de tareas, por lo que añadí Celery para externalizar esas decisiones. Como ya se ha dicho, ambos agentes tienen aspectos de "aprendizaje", así que intenté cumplir con el objetivo general del "agente" de IA, además de ayudar con el escalado y la optimización del sistema.


Celery funciona en conjunto con el motor de orquestación principal y cierta lógica de configuración para generar o detener trabajadores de gestión de tareas según la demanda del usuario. Esto ayuda a los agentes a mantenerse dentro de los límites de recursos del sistema, evitando que se descontrolen y se bloqueen.


Etiquetas:

 
 
bottom of page