AI Investing Machine: Building Markets-Oriented Agents With Prefect: An Architectural Tour
- Claude Paugh

I wrote the previous post, on aggregation and query agents, out of order with this one; this one probably should have come first. It covers a different slice of the system: how to integrate external tools and orchestrate more complex workflows. Hopefully this slice is informative.
What are the agents and what do they do?
In this system, an “agent” is a focused, domain-specific worker that performs a concrete job: typically pulling market data, analyzing it, and publishing results to caches and the database for the UI layer to consume. Many agents run standalone (as a CLI or daemon) and are also wrapped in Prefect flows for scheduled, reproducible operation.
Key agents and services you’ll encounter:
Technical Analysis Agent (`pricing/tech_analysis_agent.py`)
Role: Runs symbol-level technical analysis using the underlying `TechnicalAnalysisService` and a `TechAnalysisStateMachine` to fetch prices, compute indicators, detect patterns, and write analysis artifacts.
Inputs: Symbols to analyze, price/ticker sources (FMP, Yahoo, IBKR), optional intervals and limits.
Outputs: Indicator series, signals/patterns, and charts; writes to log and storage (DB/Redis) via service methods.
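As a rough sketch of the agent's control loop (the service method names below are my assumptions; only the module path and the fetch → analyze → persist sequence come from the description above):

```python
# Illustrative sketch only: the service method names are assumptions;
# the module path and overall sequence come from the post.
from pricing.technical_analysis_service import TechnicalAnalysisService

def run_tech_analysis(symbols: list[str], interval: str = "1d") -> None:
    service = TechnicalAnalysisService()
    for symbol in symbols:
        prices = service.fetch_prices(symbol, interval=interval)   # assumed method
        indicators = service.compute_indicators(prices)            # assumed method
        patterns = service.detect_patterns(prices, indicators)     # assumed method
        service.persist_analysis(symbol, indicators, patterns)     # assumed method
```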
Technical Analysis Service (`pricing/technical_analysis_service.py`)
Role: A large, reusable service providing market I/O and analytics primitives. It fetches prices from multiple sources, applies indicators, detects patterns, computes correlations, and exposes higher-level utilities (e.g., index constituents, ETF holdings).
Inputs: Tickers, reference calendars, FMP/IBKR/Yahoo credentials.
Outputs: JSON/frames with time series, indicators, signals, enriched quotes; writes to Redis/DB via orchestration tasks.
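To make "analytics primitives" concrete, here is a minimal, self-contained example of the kind of indicator math such a service wraps; the function is mine, not the service's API:

```python
import pandas as pd

def sma_crossover_signals(prices: pd.Series, fast: int = 20, slow: int = 50) -> pd.DataFrame:
    """Compute fast/slow SMAs and a long/flat signal from their crossover."""
    df = pd.DataFrame({"close": prices})
    df["sma_fast"] = df["close"].rolling(fast).mean()
    df["sma_slow"] = df["close"].rolling(slow).mean()
    # Signal is 1 (long) when the fast SMA is above the slow SMA, else 0 (flat).
    df["signal"] = (df["sma_fast"] > df["sma_slow"]).astype(int)
    return df
```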
Market Scanner Service (`pricing/scanner_service.py`)
Role: Chooses which symbols to scan, runs the daily scan, and records discoveries. It coordinates symbol eligibility, scheduling, and pattern discovery.
Inputs: Universe, company profiles, Redis scheduling metadata.
Outputs: Batches of scanned symbols, discovered pattern segments, and progress.
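A hedged sketch of how symbol eligibility might be driven by Redis scheduling metadata; the key layout and staleness rule are invented for illustration:

```python
import time
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def select_scan_batch(universe: list[str], batch_size: int = 200,
                      max_age_secs: int = 86400) -> list[str]:
    """Pick symbols whose last scan (tracked in Redis) is missing or stale."""
    now = time.time()
    eligible = []
    for symbol in universe:
        last_scan = r.get(f"scanner:last_scan:{symbol}")  # hypothetical key scheme
        if last_scan is None or now - float(last_scan) > max_age_secs:
            eligible.append(symbol)
    return eligible[:batch_size]
```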
Ingestion/News/Index/ETF/Commodities-Crypto Update Agents (via flows)
Role: Regular ingestion of quotes and reference data used by dashboards (e.g., index quotes and constituents, news, ETFs, commodities/crypto). For example, `orchestration/index_quotes_flow.py` fetches quotes and caches them in Redis and persists in PostgreSQL.
Inputs: External APIs (FMP, etc.), configured schedules.
Outputs: Cached quotes in Redis keys like `cache:index_quotes`, persisted timeseries in DB, and per-index constituents for analytics features.
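On the read side, the UI layer only needs a cheap cache lookup; `cache:index_quotes` is the key named above, while the JSON payload shape is an assumption:

```python
import json
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def get_cached_index_quotes() -> list[dict] | None:
    """Fast-path read for dashboards: cached quotes, or None on a cache miss."""
    raw = r.get("cache:index_quotes")
    return json.loads(raw) if raw else None  # payload shape is assumed
```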
Fixed Income Agents (Municipal/Bond Risk flows)
Role: Update and (re)train bond risk models; scan local muni PDF drops (the “Muni Local Agent”) and refresh risk estimates on a schedule.
Inputs: Local file drops, fiscal releases, model parameters.
Outputs: Updated model artifacts, rankings, and risk views, published to DB/Redis for downstream consumers.
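For the Muni Local Agent's PDF discovery, something as simple as a scheduled directory sweep would do; the drop-directory convention comes from the post, and the dedup strategy is an assumption:

```python
from pathlib import Path

def discover_new_muni_pdfs(drop_dir: str, processed: set[str]) -> list[Path]:
    """Return PDFs in the local drop directory that haven't been processed yet."""
    new_files = [p for p in sorted(Path(drop_dir).glob("*.pdf"))
                 if p.name not in processed]
    processed.update(p.name for p in new_files)  # mark as seen for the next sweep
    return new_files
```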
How Prefect and the agents orchestrate work
Prefect is used to schedule, parameterize, and observe the work done by these agents/services. A typical pattern is:
Define one or more `@flow` functions that sequence tasks against a service (e.g., fetch → enrich → cache → persist).
Convert flows to “deployments” with schedules using Prefect’s server API.
Use `start_prefect.sh` to bootstrap the Prefect server and register/serve deployments.
A concrete example: Index Quotes Update
Flow: `orchestration/index_quotes_flow.py`
`@task fetch_and_cache_index_quotes()`
Calls `ta_service.fetch_batch_index_quotes_fmp()`
Enriches with `ta_service.fetch_indexes_fmp()`
Caches results in Redis under `cache:index_quotes`
Persists to DB via `orchestration.persistence_tasks.persist_quotes_to_db`
Caches index constituents to support world P/E and related metrics
`@flow index_quotes_flow()` just invokes that task and returns success
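Stitched together, the flow looks roughly like this. The two `ta_service` methods and `persist_quotes_to_db` are named in the post; the Redis wiring, TTL, and payload shape are paraphrased assumptions:

```python
import json
import redis
from prefect import flow, task

from pricing.technical_analysis_service import TechnicalAnalysisService
from orchestration.persistence_tasks import persist_quotes_to_db

ta_service = TechnicalAnalysisService()  # construction details assumed
r = redis.Redis(host="localhost", port=6379, decode_responses=True)

@task
def fetch_and_cache_index_quotes() -> bool:
    quotes = ta_service.fetch_batch_index_quotes_fmp()
    indexes = ta_service.fetch_indexes_fmp()  # enrichment step from the post
    payload = {"quotes": quotes, "indexes": indexes}
    r.set("cache:index_quotes", json.dumps(payload), ex=3600)  # TTL assumed
    persist_quotes_to_db(quotes)  # persistence task named in the post
    return True

@flow
def index_quotes_flow() -> bool:
    # The flow just invokes the task, matching the step list above.
    return fetch_and_cache_index_quotes()
```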
Deployment: `orchestration/deployments.py`
This file builds multiple deployments: analyst estimates, the market scanner, index quotes (via the ETF flow), commodities/crypto, news, pricing updates, muni/bond risk flows, model memory sync, industry aggregation, IBKR ETL, and SEC bulk ingest and reprocessing.
Each deployment gets a clear name, a cron schedule, and a description; they’re then served via `prefect.aserve(...)`.
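In code, the pattern is one `to_deployment(...)` per flow, then a single `aserve(...)` call; the name and cron string below are abbreviated examples drawn from the cadence list further down:

```python
import asyncio
from prefect import aserve

from orchestration.index_quotes_flow import index_quotes_flow
# ...imports for the other flows elided...

async def main() -> None:
    index_quotes = index_quotes_flow.to_deployment(
        name="index-quotes-update",   # illustrative name
        cron="30 16 * * 1-5",         # 16:30 on weekdays, per the cadence list
        description="Fetch, enrich, cache, and persist index quotes.",
    )
    # Build the remaining deployments the same way, then serve them together.
    await aserve(index_quotes)

if __name__ == "__main__":
    asyncio.run(main())
```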
Operational bootstrap:
`start_prefect.sh` ensures Prefect is installed, launches `prefect server start` in the background, waits for readiness on port 4200, and runs `orchestration/deployments.py` to register and serve deployments.
The Prefect UI at `http://localhost:4200` shows flow runs, schedules, and logs.
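The readiness wait amounts to polling the server's health endpoint; `/api/health` is the standard Prefect server route, and the retry policy below is my own choice:

```python
import time
import requests

def wait_for_prefect(url: str = "http://localhost:4200/api/health",
                     tries: int = 30, delay: float = 2.0) -> bool:
    """Poll the Prefect server until it responds, or give up after `tries`."""
    for _ in range(tries):
        try:
            if requests.get(url, timeout=2).ok:
                return True
        except requests.RequestException:
            pass
        time.sleep(delay)
    return False
```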
Prefect + Agents execution path

End-to-end orchestration: how it all fits together
At a system level, the orchestration marries time-based triggers (Prefect) with capability-centric agents/services (market scanners, TA, ingestion, and bond risk). Data flows from external APIs/brokers through services to Redis/Postgres, then into the web UI and APIs.
What controls what?
Prefect deployments control when and with what parameters an agent/service runs.
Agents/services encapsulate the domain logic: what to fetch, how to analyze, how to store.
Shared infrastructure provides durable state and fast reads: Redis for hot caches and PostgreSQL for persistence.
Key deployments and their cadence:
Market data & analytics
Market Scanner: “daily-market-scan” at 01:00
Pricing Updates: 10:00 and 16:30 ET
Commodities/Crypto: 11:00 and 16:15 ET on weekdays
ETF/Index quotes: 16:30 ET (via ETF flow) and constituents caching per index
Analyst Estimates: 02:00 daily
News Ingestion: every 2 hours from 09:00
Fixed income & models
Daily Bond Risk: 12:00 ET
Muni Local Agent: every 4 hours for PDF discovery
Ad-hoc Bond Risk Training: manual on-demand
Monthly Muni Risk Refresh: 10th of each month at 00:00
System hygiene & enrichment
Model Memory Sync: 02:00 daily (Redis ↔ Postgres)
Industry Aggregation: 04:00 daily
IBKR Fundamentals ETL: 13:00 ET daily
SEC Bulk Ingest: Sat 02:00; Reprocess: manual
A typical daily run:

Operational notes and best practices
One codebase, two faces: Services and agents can be run directly (ad hoc/testing) and via Prefect for production cadence and observability.
Use Redis for hot paths: Caches like `cache:index_quotes` provide fast UI loads; DB persists full history and supports analytics.
Idempotency and error handling: Flows typically guard external I/O with try/except and log failures; prefer idempotent writes so retries are safe.
Parameterize flows: Expose batch sizes, symbol lists, and date windows as parameters on deployments to tweak behavior without code changes (see the sketch after this list).
Observe in Prefect UI: The dashboard shows run history, task logs, failures, and retries. It’s invaluable for operating data pipelines.
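For example, deployment-level parameter defaults map straight onto the `@flow` signature and can be overridden per run from the UI or API; the flow and parameter names here are illustrative:

```python
from prefect import flow

@flow
def market_scan_flow(batch_size: int = 200, symbols: list[str] | None = None) -> None:
    ...  # scanning logic delegated to the scanner service

# Defaults set here can be overridden per run without touching code.
scan_deployment = market_scan_flow.to_deployment(
    name="daily-market-scan",
    cron="0 1 * * *",               # 01:00, matching the cadence list
    parameters={"batch_size": 500}, # example deployment-level default
)
```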
A few UI screenshots of the user-facing content produced by everything above:
By separating domain logic (agents/services) from orchestration (Prefect flows and deployments), this system stays composable and observable. Prefect provides reliable triggers and monitoring, while the agents focus on market/domain work. The result is a modular, scheduled data platform that can scale in scope (new flows) and cadence (new deployments) with minimal friction.
Prefect also supports chatbot integration: any of these flows can be launched with parameters from the chatbot interface for on-demand work, and previous flow runs can be re-run from there too.
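A chat handler can trigger these runs through `run_deployment`; the deployment name and parameters below are placeholders:

```python
from prefect.deployments import run_deployment

def trigger_scan_from_chat(symbols: list[str]) -> str:
    """Kick off an ad-hoc run of a deployment with caller-supplied parameters."""
    flow_run = run_deployment(
        name="market-scan-flow/daily-market-scan",  # "<flow>/<deployment>" format
        parameters={"symbols": symbols},
        timeout=0,  # return immediately rather than waiting for completion
    )
    return str(flow_run.id)
```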