Máquina de Investimento com IA: Arquitetura de Agentes
- Claude Paugh

- 15 de fev
- 6 min de leitura
Pré-visualização
Inicialmente, eu queria me concentrar em aspectos específicos relacionados aos sistemas. Como a extração e síntese de dados são elementos importantes, até mesmo essenciais, da funcionalidade, decidi orientar meus primeiros agentes para esses aspectos. Pareceu-me importante que eles fossem capazes de "aprender" e detectar padrões de consulta e agregação do usuário, o que lhes permitiria antecipar as necessidades dos usuários e responder proativamente às suas perguntas.
Como decidi me dedicar inteiramente ao desenvolvimento de um projeto baseado em Python, os principais componentes da minha infraestrutura de agentes foram construídos com:
Agentes baseados no Google Agent Development Kit (ADK) e Agent-to-Agent (A2A)
Aipo para gerenciamento de tarefas
O agente de requisição: um executor assíncrono multiprocesso com cache e cancelamento, usando mensagens A2A roteadas pelo Celery.
O agente de agregação: um serviço de pré-computação periódico/semi-sob demanda que materializa análises frequentemente solicitadas no Redis, mantendo assim a velocidade da interface do usuário e do chat.
A seguir, você encontrará uma visão geral concisa da arquitetura, juntamente com diagramas de sequência detalhados que ilustram a operação do agente de consulta e do agente de agregação. Também incluí os arquivos e métodos principais mais relevantes para facilitar a revisão da implementação.

Arquivos do agente de consulta : `search/query_agent.py` - Métodos principais: `worker_process`, `execute_query`, `_handle_cancel`, `handle_message`, `connect`
Arquivos do agente de agregação : `data_sync/aggregation_agent.py` - Métodos principais: `execute_job`, `run_cycle`, `check_inbox`, `AggregationEncoder`, `AggregationAgent.run`
Estrutura comum do agente : `util/adk/base_agent.py` - Classes/métodos principais: `AgentContext`, `BaseAgent.connect`, `BaseAgent.send_message` (Celery), `BaseAgent.register`/heartbeat, `create_celery_task`
Agente de Solicitação — Como Funciona
Objetivo principal: Analisar padrões de consulta do usuário para otimizar o armazenamento em cache e a recuperação de dados. Executar consultas que consomem muitos recursos de forma assíncrona em nome do servidor de chat/busca, transferindo operações de E/S bloqueantes e serialização para processos de trabalho isolados. Permite multiplexação de servidores.
Neo4j, Couchbase, Redis, LLM, Text-to-Cypher - Limitação da concorrência pelo backend usando semáforos.
Armazenamento em cache de resultados indexados usando hash de conteúdo no Redis
Apoio ao cancelamento definitivo através da rescisão de processos de trabalho
Fluxo de mensagens e reciclagem da vida
1. Driver de Registro/Base : `BaseAgent.connect` e `BaseAgent.register` (via `util/adk/base_agent.py`) conectam-se ao Redis e mantêm um heartbeat em `agent:query_agent:heartbeat`, bem como uma entrada no registro `registry:agent:query_agent`. As mensagens são enviadas e recebidas via Celery. `BaseAgent.send_message` usa o Celery para encaminhar mensagens `A2AMessage` para `agent`.
2. Recebendo uma tarefa - Encaminha `QueryAgent.handle_message` por `payload_type`: - `"task"` → `execute_query` - `"cancel"` → `_handle_cancel`
3. Busca em cache - `execute_query` cria um hash estável da carga útil da tarefa (excluindo campos transitórios como `id`, `outbox`, `parent_id`). - Chave do cache: `query_cache`:
4. Distribuição para um processo de trabalho isolado : Um semáforo é selecionado para cada fonte (por exemplo, neo4j, couchbase, llm, etc.). O agente inicia um processo de multiprocessamento executando a função worker_process(task, result_queue). Os fluxos STDIN, STDOUT e STDERR são redirecionados para /dev/null no processo de trabalho para evitar problemas com o manipulador quando o processo for encerrado.
5. Execução no servidor (no worker) - Neo4j: Utiliza um driver síncrono (`GraphDatabase.driver`) e executa uma consulta Cypher fornecida (ou uma consulta "tool" especializada, por exemplo, expansores de caminho de país/moeda). Retorna as linhas como dicionários e, em seguida, as normaliza via `_jsonify_value` para garantir a tipagem segura em JSON. - Couchbase: Reflete um método de `search.couchbase_service.cb_service` com `params`. - LLM/Text2Cypher: Utiliza `chat_gateway.llm_client.LLMClient` para gerar uma string Cypher a partir de NL ou executar tarefas LLM.
6. Coleta de Resultados e Resposta : O processo pai consulta a `result_queue`. Assim que um resultado estiver disponível, ele o encaminha ao remetente original por meio de `AgentContext.reply`, que utiliza o Celery. O cache pode ser preenchido com JSON normalizado (respeitando a política de TTL definida no arquivo) para que tarefas idênticas subsequentes sejam processadas instantaneamente.
7. Cancelamento – A função `_handle_cancel` procura o processo de trabalho em execução usando o identificador `task_id` e envia um sinal `SIGTERM`, seguido de um sinal `SIGKILL` se necessário. Uma mensagem de confirmação de cancelamento é enviada ao solicitante.
Contratos de dados
- Carga útil da tarefa recebida
- ` source` : `"neo4j" | "couchbase" | "redis" | "llm" | "text2cypher"`
- `query` e/ou `method`, `params` (específicos do backend)
- Opcional: `tool` (modelos de consulta especiais do Neo4j), `limit`, `outbox`
- Resultado de saída
- `{ id, source, ok, data?, error?, duration, ts, cached? }`
Referências principais para implementação
- Worker : `search/query_agent.py::worker_process`
- Lógica de cache e semáforos : `search/query_agent.py::execute_query`
- Cancelar : `search/query_agent.py::_handle_cancel`
- Mensagens A2A e sinal de presença : `util/adk/base_agent.py`
Agente de Agregação — Seu Papel
Objetivo principal: Pré-calcular e armazenar em cache análises "custosas, mas frequentemente necessárias" para que a interface do usuário e o chat possam responder instantaneamente. Com o tempo, identificar as agregações mais utilizadas que devem ser pré-calculadas. O sistema também oferece suporte ao pré-aquecimento de consultas e a gatilhos manuais.
- Salva os resultados sob chaves Redis como `pre_agg:*` e `analytics:*` (por exemplo, `analytics:bond_risk_rankings`, `pre_agg:result:`)
- Faz uso extensivo do Neo4j para agregar tendências históricas; algumas tarefas também armazenam agrupamentos de índices globais e índices globais de preço/lucro.
- Utiliza uma máquina de estados simples ("AggregationStateMachine") para rastreamento de ciclos e geração de relatórios de estado.
- Funciona sequencialmente (um único processo) para economizar memória, mantendo-se responsivo a comandos manuais via caixa de entrada.
Planejamento e gatilhos
- Intervalo: `AGG_AGENT_INTERVAL_SECS` (padrão ≈ 3600 segundos)
- Acionadores manuais: envio para a lista Redis `agent:aggregation_agent:inbox` (modificável através da variável de ambiente `AGG_AGENT_INBOX`). O agente verifica a caixa de entrada entre cada tarefa.
- Consultas populares: o mapeamento ZSET `stats:hot_queries` + `stats:query_to_cypher` é usado para pré-materializar consultas populares.
Tipos de tarefas e resultados
- `notional_aggregation`: Calcula somas nocionais em vários tipos de ativos por `PeriodEndDate` (Neo4j) → Redis `pre_agg:*`
- `asset_holdings_trend`: Evolução de carteiras de ativos ao longo do tempo (Neo4j) → Redis `pre_agg:*`
- `municipal_debt_summary`: Resumos de dívida municipal (Neo4j) → Redis `pre_agg:*`
- `industry_aggregation`: Agrupamento por setor/indústria (Neo4j) → Redis `pre_agg:*`
- `world_pe_aggregation`: Índices globais de preço/lucro (cálculo local) → Redis `analytics:world_pe_ratios`
- `bond_risk_aggregation`: Classificação de risco de títulos (serviço de aprendizado de máquina) → Redis `analytics:bond_risk_rankings` (+ alias opcional via `BOND_RISK_UI_KEY`, TTL opcional `BOND_RISK_TTL`)
- `wilshire_aggregation`: Agregação global de índices, protegida por uma verificação de abertura de mercado → Redis (cache de índices)
- `hot_query` (personalizado): Dado um nome de consulta legível para humanos, procure por Cypher em `stats:query_to_cypher`, execute-o e armazene a carga útil como `pre_agg:result:`.
Ciclo de vida e fluxo
1. Batimento cardíaco/Início
- Herda de `BaseAgent.connect/register` (heartbeat do Redis e entradas de registro em `agent:aggregation_agent:*`).
2. Ciclo Periódico
- `run_cycle` constrói a lista de tarefas: tarefas fixas + o `HOT_QUERY_LIMIT` mais importante (padrão 20) de `stats:hot_queries`.
- As tarefas são executadas sequencialmente por meio de um executor de thread única (`max_workers=1`) para previsibilidade de memória.
- Entre cada tarefa, a função `check_inbox` é chamada para que os acionadores manuais não fiquem indisponíveis.
3. Caminho de acionamento manual
A função `check_inbox` executa uma operação `LPOP` em `agent:aggregation_agent:inbox` e analisa JSON ou uma string bruta. Se um filtro estiver presente, ela reexecuta `run_cycle`, limitando-a àquela tarefa ou a uma string `hot_query`.
4. Unidade de execução
- `execute_job` abre novas conexões Redis e Neo4j para isolamento do tipo de tarefa, executa a tarefa e grava JSON no cache com `AggregationEncoder` para tipagem robusta (tipos temporais do Neo4j, tipos do NumPy, etc.).


Notas operacionais e parâmetros ajustáveis
- Os endpoints do Redis e as credenciais do Neo4j são lidos do ambiente; muitos valores padrão são fornecidos ou os valores de `creds.py` são usados no modo de desenvolvimento.
- Taxa de agregação: `AGG_AGENT_INTERVAL_SECS` (padrão: 3600). Largura de consulta frequente: `AGG_AGENT_HOT_LIMIT` (padrão: 20).
- Alias TTL/UI para agregar riscos de títulos por meio de `BOND_RISK_TTL` e `BOND_RISK_UI_KEY`.
- Limites de concorrência de consultas: semáforos por origem (`LIMITS` em `search/query_agent.py`).
- Cache de resultados de consulta: `query_cache : : ` no Redis; a carga útil é normalizada com `_jsonify_value` para garantir a segurança do JSON, incluindo os tipos temporais do Neo4j.
Referências cruzadas rápidas
- Solicitar Agente
- `search/query_agent.py::execute_query` (cache, semáforos, criação de processos)
- `search/query_agent.py::worker_process` (lógica de backend)
- `search/query_agent.py::_handle_cancel` (forçar parada)
- Agente de agregação
- `data_sync/aggregation_agent.py::run_cycle` (agendamento de tarefas, consultas frequentes, verificação da caixa de entrada)
- `data_sync/aggregation_agent.py::execute_job` (ações e gravações por tarefa)
- `data_sync/aggregation_agent.py::check_inbox` (gatilhos manuais)
- Estrutura básica
- `util/adk/base_agent.py::BaseAgent` (Mensagens Celery, sinal de presença, registro)

Chaves/estruturas do Redis
Registro/pulsação do agente
`register:agent:`
`agente:
cache de consultas
`query_cache:
Resultados de agregação
`pre_agg:*` → JSON `PreAggPayload`
`analytics:world_per_ratios` → Tabela JSON de índices globais de preço/lucro
`analytics:bond_risk_rankings` → Matriz JSON de classificações (alias opcional `BOND_RISK_UI_KEY`)
Coordenação da agregação
`agent:aggregation_agent:inbox` (LPOP) → `AggTrigger`
`stats:hot_queries` (ZSET) → nomes de consultas populares
`stats:query_to_cypher` (HASH) → nome → Cifra
Comecei principalmente com esses dois agentes devido às minhas preocupações com a escalabilidade de construir uma solução completa baseada em Python que envolvesse o uso de aprendizado de máquina multiusuário, bem como conjuntos de resultados gráficos e agregados.
Inicialmente, surgiram problemas com sobrecarga e gerenciamento de tarefas, motivo pelo qual adicionei o Celery para aliviar a carga dessas decisões. Como mencionado anteriormente, ambos os agentes possuem capacidade de aprendizado; portanto, assegurei que o objetivo geral do agente de IA fosse atingido, ao mesmo tempo que contribuía para a escalabilidade e otimização do sistema.
O Celery funciona em conjunto com o mecanismo de orquestração principal e a lógica de configuração, permitindo que os processos de gerenciamento de tarefas sejam iniciados ou interrompidos de acordo com as necessidades do usuário. Isso permite que os agentes permaneçam dentro dos limites de recursos do sistema e evita sobrecargas e falhas.


