量化中台整体路线图

Context

已完成 Phase 1-6(Lean回测 + 模拟交易 + Dashboard)。现在要从"单机量化工具"升级为"机构级量化中台"。

用户架构包含6层:数据层(L1-L4) → 数据加工层 → AI核心策略层 → 验证层 → 执行层 → 风控层。

当前资源: 8核CPU, 23GB RAM, 104GB磁盘可用, Redis + MongoDB 已有, 有多种付费数据源。 优先级: 数据层 + 加工层先行,暂不接券商。 使用场景: 机构内部系统。


当前完成度

架构层 完成度 说明
数据层 (L1-L4+历史) 5% 只有 yfinance (基础L1), zip文件存储
数据加工层 5% 无pipeline, 无清洗标准化
AI核心策略层 15% 简单技术指标 + LLM情绪信号
验证层 (A/B) 20% 有回测 + 模拟, 无A/B框架
智能执行层 5% 模拟交易, 无券商接入
风控管理层 10% 基础风控(止损、仓位限制)

阶段总览

阶段 A: 数据基础设施      ← 现在开始(本次实施)
阶段 B: 数据加工 + 因子化  ← 紧接着
阶段 C: AI 核心策略层
阶段 D: 执行层 + 风控层(开券商账户后)

阶段 A:数据基础设施(本次实施重点)

A1. TimescaleDB 时序数据库

安装 PostgreSQL + TimescaleDB 扩展,替代 zip 文件存储。

核心表:

-- L1 行情
CREATE TABLE market_ohlcv (
    time TIMESTAMPTZ NOT NULL, symbol TEXT, source TEXT,
    timeframe TEXT, open FLOAT8, high FLOAT8, low FLOAT8,
    close FLOAT8, volume BIGINT, vwap FLOAT8
);
SELECT create_hypertable('market_ohlcv', 'time');

-- L2 Flow
CREATE TABLE flow_data (
    time TIMESTAMPTZ NOT NULL, symbol TEXT, source TEXT,
    flow_type TEXT, side TEXT, premium FLOAT8, volume BIGINT,
    strike FLOAT8, expiry DATE, raw_data JSONB
);

-- L3 Info
CREATE TABLE info_data (
    time TIMESTAMPTZ NOT NULL, source TEXT, category TEXT,
    symbol TEXT, headline TEXT, sentiment FLOAT8,
    confidence FLOAT8, raw_data JSONB
);

-- L4 另类
CREATE TABLE alt_data (
    time TIMESTAMPTZ NOT NULL, source TEXT, category TEXT,
    symbol TEXT, signal_value FLOAT8, metadata JSONB
);

-- 数据源注册
CREATE TABLE data_sources (
    id SERIAL PRIMARY KEY, name TEXT UNIQUE, layer TEXT,
    api_type TEXT, config JSONB, enabled BOOLEAN DEFAULT true,
    last_sync TIMESTAMPTZ, status TEXT DEFAULT 'idle'
);

文件: - src/data/__init__.py - src/data/database.py — SQLAlchemy async 连接池 + 表定义 - src/data/models.py — Pydantic 模型(请求/响应) - scripts/init_db.py — 初始化 TimescaleDB + 建表 - scripts/migrate_yfinance.py — 迁移现有 9个symbol 日线到数据库

A2. Connector 数据采集框架

统一的数据源接口,每个源一个 Connector。

文件: - src/data/connectors/__init__.py - src/data/connectors/base.py — BaseConnector 抽象类(fetch/stream/validate/transform) - src/data/connectors/yfinance_conn.py — 重构现有 yfinance 逻辑 - 后续按需添加:unusual_whales.py, flowalgo.py, polymarket.py

BaseConnector 接口:

class BaseConnector(ABC):
    source_name: str
    layer: str  # L1/L2/L3/L4
    async def fetch(self, symbols, start, end) -> list[dict]
    async def stream(self, symbols) -> AsyncIterator[dict]  # 实时流(可选)
    def transform(self, raw) -> list[dict]  # 标准化为统一格式

A3. 数据调度器

A4. 数据质量监控

A5. 数据查询 API

GET  /api/v1/data/ohlcv     — 查询行情(symbol, start, end, timeframe)
GET  /api/v1/data/flow       — 查询Flow数据
GET  /api/v1/data/sources    — 数据源列表及状态
POST /api/v1/data/sync/{source} — 手动触发同步
GET  /api/v1/data/quality    — 数据质量报告

A6. Dashboard 数据管理页 (/data)

文件: - dashboard/src/pages/DataManager.jsx - dashboard/src/api/client.js — 增加 data API 函数


阶段 B:数据加工 + 因子化(A 完成后)

B1. 清洗管道

B2. 因子引擎

B3. 因子评估


阶段 C:AI 核心策略层(B 完成后)

C1. 异常检测 — Abnormal Score

C2. 趋势评分 — Trend Score

C3. AI 策略引擎 — Predicted P&L

C4. A/B 验证框架


阶段 D:执行 + 风控(开券商账户后)


阶段 A 实施顺序

第一步 A1: 安装 TimescaleDB → 建表 → 迁移 yfinance 数据 第二步 A2: Connector 框架 + yfinance Connector 重构 第三步 A5: 数据查询 API 端点 第四步 A6: Dashboard 数据管理页 第五步 A3: 数据调度器(定时采集) 第六步 A4: 数据质量监控

新增依赖

sqlalchemy[asyncio]>=2.0
asyncpg>=0.29
psycopg2-binary>=2.9
apscheduler>=3.10

验证(阶段 A 完成后)

  1. psql 查询 SELECT count(*) FROM market_ohlcv WHERE symbol='SPY' → ~7000行
  2. curl /api/v1/data/ohlcv?symbol=SPY&start=2025-01-01 → 返回 JSON
  3. curl /api/v1/data/sources → 列出数据源及状态
  4. Dashboard /data 页 → 数据源卡片 + 覆盖热力图
  5. 手动触发同步 → 数据入库 → 质量检查通过

修改的现有文件

main.py                              — 增加 data_routes, DataScheduler 初始化
config.py                            — 增加 DATABASE_URL 配置
requirements.txt                     — 增加数据库依赖
dashboard/src/App.jsx                — 增加 /data 路由
dashboard/src/components/Sidebar.jsx — 增加 Data 导航
dashboard/src/api/client.js          — 增加 data API 函数