top of page

KI-Investitionsmaschine: Agentenarchitektur

Überblick


Es gab einige spezifische, systemorientierte Bereiche, auf die ich mich zunächst konzentrieren wollte. Da Datenabruf und -zusammenfassung ein wichtiger, wenn nicht sogar der wichtigste Bestandteil der Funktionalität sein würden, entschied ich mich, meine ersten Agenten auf diese Aspekte auszurichten. Ich hielt es für wichtig, dass sie in der Lage sein sollten, Muster in den Anfragen und Aggregationen der Nutzer zu erkennen und daraus zu lernen. Dies würde eine proaktive Herangehensweise bei der Beantwortung von Nutzerfragen ermöglichen.


Da ich mich entschieden habe, ein Projekt komplett auf Python zu entwickeln, wurden meine wichtigsten Agenteninfrastrukturkomponenten mit folgenden Technologien erstellt:


  • Google Agent Development Kit (ADK) und Agent-to-Agent (A2A)-basierte Agenten

  • Sellerie für das Aufgabenmanagement


  1. Der Abfrageagent: ein asynchroner, mehrprozessfähiger Executor mit Caching und Abbruchfunktion, dem Celery-geroutete A2A-Nachrichten vorgeschaltet sind.


  2. Der Aggregationsagent: ein periodischer/halb-auf-Bedarf ausgeführter Vorberechnungsdienst, der häufig angeforderte Analysen in Redis materialisiert und so die Benutzeroberfläche und den Chat schnell hält.


Nachfolgend finden Sie eine übersichtliche Architekturdarstellung sowie detaillierte Sequenzdiagramme zur Funktionsweise des Abfrage- und des Aggregationsagenten. Ich habe außerdem die wichtigsten Dateien und Methoden aufgeführt, um Ihnen einen schnellen Überblick über die Implementierung zu ermöglichen.


Komponentendiagramm der Agenteninteraktionen
Query-Agent-Dateien : `search/query_agent.py` - Wichtige Methoden: `worker_process`, `execute_query`, `_handle_cancel`, `handle_message`, `connect`

Aggregationsagent-Dateien : `data_sync/aggregation_agent.py` - Wichtige Methoden: `execute_job`, `run_cycle`, `check_inbox`, `AggregationEncoder`, `AggregationAgent.run`

Gemeinsames Agentengerüst : `util/adk/base_agent.py` - Wichtige Klassen/Methoden: `AgentContext`, `BaseAgent.connect`, `BaseAgent.send_message` (Celery), `BaseAgent.register`/Heartbeat, `create_celery_task`

Abfrageagent – Was er tut

Hauptziel: Nutzermuster bei Suchanfragen analysieren, um das Caching oder den Datenabruf zu optimieren. Umfangreiche Abfragen asynchron im Auftrag des Chat-/Such-Backends ausführen und blockierende E/A-Operationen sowie Serialisierung in isolierte Worker-Prozesse auslagern. Dies ermöglicht Backend-Multiplexing.


  • Neo4j, Couchbase, Redis, LLM, Text‑to‑Cypher-Concurrency limiting by backend using semaphores.

  • Ergebnis-Caching mit Inhalts-Hash in Redis

  • Unterstützung bei der harten Kündigung durch Beendigung von Arbeitsprozessen


Lebenszyklus & Nachrichtenfluss


1. Registrierung/Heartbeat – `BaseAgent.connect` und `BaseAgent.register` (über `util/adk/base_agent.py`) stellen eine Verbindung zu Redis her und halten einen Heartbeat unter `agent:query_agent:heartbeat` sowie einen Registry-Eintrag unter `registry:agent:query_agent` aufrecht. – Nachrichten werden über Celery gesendet und empfangen. `BaseAgent.send_message` verwendet Celery, um `A2AMessage` an `agent` weiterzuleiten.


2. Empfangen einer Aufgabe – `QueryAgent.handle_message` leitet Daten nach `payload_type` weiter: – `"task"` → `execute_query` – `"cancel"` → `_handle_cancel`


3. Cache-Lookup – `execute_query` erstellt einen stabilen Hash der Aufgabennutzlast (ohne temporäre Felder wie `id`, `outbox`, `parent_id`). – Cache-Schlüssel: `query_cache:


4. Ausführung an einen isolierten Workerprozess – Pro Quelle (z. B. `neo4j`, `couchbase`, `llm` usw.) wird ein Semaphor ausgewählt. – Der Agent startet einen Multiprocessing-Prozess, der `worker_process(task, result_queue)` ausführt. – STDIN/STDOUT/STDERR werden im Worker an `/dev/null` gebunden, um Deskriptorprobleme beim Beenden des Prozesses zu vermeiden.


5. Backend-Ausführung (innerhalb des Workers) – Neo4j: Verwendet einen synchronen Treiber (`GraphDatabase.driver`) und führt bereitgestellten Cypher-Code (oder eine spezielle „Tool“-Abfrage, z. B. Pfadexpander für Länder/Währungen) aus. Gibt Zeilen als Dictionaries zurück, die anschließend mit `_jsonify_value` normalisiert werden, um JSON-sichere Typisierung zu gewährleisten. – Couchbase: Ruft reflektierend eine Methode von `search.couchbase_service.cb_service` mit `params` auf. – LLM/Text2Cypher: Verwendet `chat_gateway.llm_client.LLMClient`, um entweder einen Cypher-String aus NL zu erzeugen oder LLM-Tasks auszuführen.


6. Ergebniserfassung & Antwort – Der übergeordnete Prozess fragt die `result_queue` ab. Sobald ein Ergebnis verfügbar ist, leitet er es mithilfe von `AgentContext.reply`, das über Celery geroutet wird, an den ursprünglichen Absender weiter. – Der Cache kann mit normalisiertem JSON (unter Berücksichtigung der in der Datei definierten TTL-Strategie) gefüllt werden, sodass zukünftige identische Aufgaben sofort zurückgegeben werden.


7. Abbruch – `_handle_cancel` sucht den laufenden Worker-Prozess anhand der `task_id` und sendet `SIGTERM`, bei Bedarf eskaliert es zu `SIGKILL`. Dem Anfragenden wird das Abbruchergebnis zurückgesendet.


Datenverträge

- Nutzdaten der eingehenden Aufgabe

- ` Quelle `: `"neo4j" | "couchbase" | "redis" | "llm" | "text2cypher"`

- `query` und/oder `method`, `params` (backendspezifisch)

- Optional: `tool` (spezielle Neo4j-Abfragemuster), `limit`, `outbox`

- Ausgehendes Ergebnis

- `{ id, source, ok, data?, error?, duration, ts, cached? }`


Wichtige Implementierungshinweise

- Worker : `search/query_agent.py::worker_process`

- Cache-Logik & Semaphore : `search/query_agent.py::execute_query`

- Abbruch : `search/query_agent.py::_handle_cancel`

- A2A-Messaging & Heartbeat : `util/adk/base_agent.py`


Aggregationsagent – Was er tut

Hauptziel: Aufwändige, aber häufig benötigte Analysen vorab berechnen und zwischenspeichern, damit Benutzeroberfläche und Chat sofort reagieren können. Mit der Zeit lernt das System, welche häufig verwendeten Aggregationen als vorab zu berechnende Daten gekennzeichnet werden sollten. Es unterstützt außerdem das Vorab-Aufwärmen von häufigen Abfragen und manuelle Auslöser.


- Schreibt Ergebnisse unter Redis-Schlüsseln wie `pre_agg:*` und `analytics:*` (z. B. `analytics:bond_risk_rankings`, `pre_agg:result: `)

- Nutzt Neo4j intensiv für die Aggregation historischer Trends; einige Aufgaben speichern auch globale Indexrollups und weltweite KGV-Verhältnisse.

- Verwendet eine einfache Zustandsmaschine (`AggregationStateMachine`) zur Zyklusverfolgung und Zustandsberichterstattung.

- Läuft aus Speichergründen sequenziell (Einzelprozess), reagiert aber weiterhin auf manuelle Auslöser über einen Posteingang.


Zeitplanung & Auslöser

- Intervall: `AGG_AGENT_INTERVAL_SECS` (Standardwert ≈ 3600 Sekunden)

- Manuelle Auslöser: Benachrichtigungen an die Redis-Liste `agent:aggregation_agent:inbox` (über die Umgebungsvariable `AGG_AGENT_INBOX` überschreibbar). Der Agent prüft den Posteingang zwischen den Jobs.

- Beliebte Abfragen: ZSET `stats:hot_queries` + `stats:query_to_cypher`-Mapping werden verwendet, um beliebte Abfragen vorab zu materialisieren.


Auftragsarten und Ergebnisse

- `notional_aggregation`: Berechnet die fiktiven Summen über viele Anlagetypen hinweg nach `PeriodEndDate` (Neo4j) → Redis `pre_agg:*`

- `asset_holdings_trend`: Trends der Vermögensbestände im Zeitverlauf (Neo4j) → Redis `pre_agg:*`

- `municipal_debt_summary`: Zusammenfassungen der Kommunalschulden (Neo4j) → Redis `pre_agg:*`

- `industry_aggregation`: Sektor-/Branchen-Rollups (Neo4j) → Redis `pre_agg:*`

- `world_pe_aggregation`: Weltweite KGV-Verhältnisse (lokale Berechnung) → Redis `analytics:world_pe_ratios`

- `bond_risk_aggregation`: Anleiherisiko-Rankings (ML-Dienst) → Redis `analytics:bond_risk_rankings` (+ optionaler Alias über `BOND_RISK_UI_KEY`, optionale TTL `BOND_RISK_TTL`)

- `wilshire_aggregation`: Globale Indexaggregation, geschützt durch eine Marktöffnungsprüfung → Redis (Index-Cache)

- `hot_query` (benutzerdefiniert): Bei einem für Menschen lesbaren Abfragenamen wird Cypher aus `stats:query_to_cypher` gesucht, ausgeführt und die Nutzdaten als `pre_agg:result:` gespeichert. `


Lebenszyklus & Ablauf

1. Start/Herzschlag

- Erbt `BaseAgent.connect/register` (Redis-Heartbeat und Registry-Einträge unter `agent:aggregation_agent:*`).


2. Periodischer Zyklus

- `run_cycle` erstellt die Jobliste: feste Jobs + die Top-`HOT_QUERY_LIMIT` (Standardwert 20) aus `stats:hot_queries`.

- Die Jobs werden sequenziell über einen Single Thread-Executor ausgeführt (`max_workers=1`). Dies dient der Vorhersagbarkeit des Speicherverbrauchs.

- Zwischen den einzelnen Jobs wird `check_inbox` aufgerufen, damit manuelle Trigger nicht blockiert werden.


3. Manueller Triggerpfad

- `check_inbox` führt einen `LPOP`-Aufruf für `agent:aggregation_agent:inbox` durch und analysiert entweder JSON oder einen einfachen String. Falls ein Filter vorhanden ist, wird `run_cycle` erneut aufgerufen, beschränkt auf diesen Job oder einen `hot_query`-String.


4. Ausführungseinheit

- `execute_job` öffnet neue Redis- und Neo4j-Verbindungen zur Isolation pro Jobtyp, führt die Aufgabe aus und schreibt JSON mit `AggregationEncoder` in den Cache für robuste Typisierung (Neo4j temporal, numpy types, etc.).


Aktivitätsdiagramm der Agenten


Aktivitätsdiagramm des Agenten


Betriebshinweise & Einstellmöglichkeiten

- Redis-Endpunkte und Neo4j-Zugangsdaten werden aus der Umgebungsvariablen gelesen; viele Standardwerte werden bereitgestellt oder greifen in der Entwicklungsversion auf die Werte aus `creds.py` zurück.

- Aggregationsintervall: `AGG_AGENT_INTERVAL_SECS` (Standardwert 3600). Hot-Query-Breite: `AGG_AGENT_HOT_LIMIT` (Standardwert 20).

- Bond Risk Aggregation TTL/UI Alias über `BOND_RISK_TTL` und `BOND_RISK_UI_KEY`.

- Abfrage-Parallelitätsgrenzen: Semaphore pro Quelle (`LIMITS` in `search/query_agent.py`).

- Abfrageergebnis-Cache: `query_cache: : ` in Redis; payload normalized with `_jsonify_value` to ensure JSON safety, including Neo4j temporal types.


Schnelle Querverweise

- Anfrageagent

- `search/query_agent.py::execute_query` (Cache, Semaphore, Worker-Spawn)

- `search/query_agent.py::worker_process` (Logik pro Backend)

- `search/query_agent.py::_handle_cancel` (erzwungenes Beenden)

- Aggregationsagent

- `data_sync/aggregation_agent.py::run_cycle` (Jobplanung, häufige Abfragen, Posteingangsprüfungen)

- `data_sync/aggregation_agent.py::execute_job` (Aktionen und Schreibvorgänge pro Job)

- `data_sync/aggregation_agent.py::check_inbox` (manuelle Auslöser)

- Basis-Framework

- `util/adk/base_agent.py::BaseAgent` (Celery-Messaging, Heartbeat, Registry)


Ablaufdiagramm der Agenten

Wichtige Redis-Schlüssel / -Strukturen

  • Agentenregister/Herzschlag

    • `registry:agent:

    • Agent:


  • Abfragecache

    `query_cache:


  • Aggregationsausgaben

    • `pre_agg:*` → JSON `PreAggPayload`

    • `analytics:world_pe_ratios` → JSON-Array mit weltweiten KGV-Werten

    • `analytics:bond_risk_rankings` → JSON-Array mit Ranglisten (optionaler Alias `BOND_RISK_UI_KEY`)


  • Aggregationskoordination

    • `agent:aggregation_agent:inbox` (LPOP) → `AggTrigger`

    • `stats:hot_queries` (ZSET) → beliebte Suchanfragenamen

    • `stats:query_to_cypher` (HASH) → Name → Cypher


Ich habe hauptsächlich mit diesen beiden Agenten begonnen, da ich Bedenken hinsichtlich der Skalierbarkeit beim Aufbau einer komplett auf Python basierenden Lösung hatte, die die Nutzung von ML durch mehrere Benutzer sowie Graphen und aggregierte Ergebnismengen beinhaltete.


Anfangs gab es einige Probleme mit Überlastung und Aufgabenmanagement, weshalb ich Celery hinzugefügt habe, um diese Entscheidungen auszulagern. Wie Sie oben gelesen haben, verfügen beide Agenten über Lernprozesse. Daher habe ich versucht, das übergeordnete Ziel des KI-Agenten zu verfolgen und gleichzeitig die Skalierung und Optimierung des Systems zu unterstützen.


Celery arbeitet mit der zentralen Orchestrierungs-Engine und Konfigurationslogik zusammen, um Aufgabenverwaltungs-Worker je nach Benutzerbedarf zu starten oder zu beenden. Dadurch wird sichergestellt, dass die Agenten innerhalb der Systemressourcenbeschränkungen bleiben und nicht unkontrolliert abstürzen.


bottom of page