Files

10 KiB

Platform Gateway Architecture

1. 定位: Python 库 (in-process), 不是服务

Gateway 是 ChatHub 内部 Python 库, 直接 import 到 chatwoot_ws_agent.py 同进程, 0 HTTP 跳转, 0 新进程, 共享 asyncio loop。

from gateway import fetch, gateway_loop

gateway_loop.start()                  # 启动后台 asyncio loop
result = fetch("amazon", 39, {"asin": "B0XXX"}, timeout=5.0)
prompt_block = result.to_prompt_block()

对比 QWEN 最初设计的 FastAdmin 平台中转服务:

维度 QWEN 服务方案 本库方案
网络跳数 ws_agent → FastAdmin PHP → 平台 API (2 跳) ws_agent → 平台 API (0 跳)
延迟 +50-200ms (PHP roundtrip) 0
进程数 +1 (PHP-FPM 池) 0
凭证存储 MySQL 加密 + FastAdmin 解密 MySQL 加密 + Python AES-GCM 解密
凭证可见性 写日志/缓存易泄漏 仅 Python 进程内 (ephemeral)
故障域 PHP 挂了 = 全平台停 库异常 = 走 no_creds/error
跨语言 Python↔PHP 协议胶水 无, 纯 Python
调试 看 2 套日志 (PHP + Python) 单进程 stack trace

不否决服务方案的合理性 (多语言客户端场景), 但 ChatHub 是 Python 单体, 用库最经济。

2. 库结构 (1437 LOC, 13 .py 文件)

gateway/
├── __init__.py       16  公共导出 (fetch, fetch_all, gateway_loop, UnifiedResult)
├── base.py           72  UnifiedResult dataclass + to_prompt_block
├── cache.py          48  TTLCache (60s/1000条, in-process)
├── crypto.py         68  AES-256-GCM encrypt/decrypt
├── credentials.py   128  凭证加载 (AES → plaintext fallback + pymysql)
├── breaker.py       106  5-fail/60s 熔断器
├── loop.py           86  BackgroundLoop + singleton gateway_loop
├── router.py        143  5 通道分发 + 缓存 + 限流 + 熔断
├── amazon.py        151  PA-API 5, 13 marketplaces
├── jd.py            168  京东 union, 2 methods
├── taobao.py        192  淘宝 TOP API (item.get) + 淘宝客 (tbk.item.search)
├── pdd.py           138  拼多多 DDK, 2 methods
├── tiktok.py        121  抖音 open platform, HMAC-SHA256
└── ARCHITECTURE.md      (本文件, 199 行)

3. 5 通道实现对比

Channel Endpoint Auth Sign Algorithm Methods E2E
amazon api.amazon.com (13 hosts) access_token + partner_tag AWS4-HMAC-SHA256 (sigv4) PA-API 5 GetItems 763-809ms
jd api.jd.com/routerjson app_key + access_token MD5(sec+kv+sec) UPPER goods.promotiongoodsinfo / goods.query 160-227ms
taobao eco.taobao.com/router/rest app_key + session_key + adzone_id (kw) MD5(sec+kv+sec) UPPER item.get / tbk.item.search 147-204ms
pdd api.pinduoduo.com/router client_id + access_token MD5(sec+kv+sec) UPPER ddk.goods.search / ddk.goods.detail 95-112ms
tiktok open.douyin.com/goods/detail client_key + access_token HMAC-SHA256 hex goods/detail 142-205ms

E2E 测试入口: /app/working/test_all_channels.py (CoPaw 容器内, 16 个 case 覆盖 5 通道 + 边界) + /app/working/test_taobao_kw.py (Taobao keyword 专项 7 case, 含 adzone_id 缺/有/session 缺) + /app/working/test_jd.py (JD 3 case, no_creds/no token/real) + /app/working/test_marketplaces.py (Amazon 13 marketplace + 1 invalid)。

4. 5 签名算法细节

4.1 Amazon AWS4-HMAC-SHA256 (最复杂)

# 简化的伪代码
date_stamp = "20251207T120000Z"
amz_date = date_stamp
credential_scope = f"{date_stamp}/{region}/ProductAdvertisingAPI/aws4_request"

canonical_request = "\n".join([method, path, canonical_query, signed_headers, payload_hash])
string_to_sign = f"AWS4-HMAC-SHA256\n{amz_date}\n{credential_scope}\n{sha256(canonical_request)}"
kDate = HMAC("AWS4" + secret, date_stamp)
kRegion = HMAC(kDate, region)
kService = HMAC(kRegion, service)
kSigning = HMAC(kService, "aws4_request")
signature = hex(HMAC(kSigning, string_to_sign))

13 marketplaces 映射:

PAAPI_MARKETPLACES = {
    "us": ("https://api.amazon.com", "www.amazon.com"),
    "jp": ("https://api.amazon.co.jp", "www.amazon.co.jp"),
    "uk": ("https://api.amazon.co.uk", "www.amazon.co.uk"),
    "de": ("https://api.amazon.de", "www.amazon.de"),
    "fr": ("https://api.amazon.fr", "www.amazon.fr"),
    "it": ("https://api.amazon.it", "www.amazon.it"),
    "es": ("https://api.amazon.es", "www.amazon.es"),
    "ca": ("https://api.amazon.ca", "www.amazon.ca"),
    "in": ("https://api.amazon.in", "www.amazon.in"),
    "br": ("https://api.amazon.com.br", "www.amazon.com.br"),
    "mx": ("https://api.amazon.com.mx", "www.amazon.com.mx"),
    "au": ("https://api.amazon.com.au", "www.amazon.com.au"),
    "sg": ("https://api.amazon.sg", "www.amazon.sg"),
}

4.2 JD / Taobao / PDD 共享模式: MD5(sec + sorted_kv + sec)

# 三个平台同款签名, 只有 secret 来源和 public params 不同
pieces = "".join(f"{k}{params[k]}" for k in sorted(params.keys()))
sign = md5((secret + pieces + secret).encode()).hexdigest().upper()

差异点:

平台 secret 名 public params 区别 业务参数
JD app_secret method + access_token + param_json skuIds / keyword
Taobao item.get app_secret method + session + fields num_iid
Taobao tbk.search app_secret method + adzone_id (+ site_id) q (keyword)
PDD client_secret type + client_id + access_token + data_type=JSON keyword / goods_sign

4.3 TikTok HMAC-SHA256 (与 Amazon 不同)

# 抖音 - 直接 HMAC, 不拼 key
signature = hmac.new(secret.encode(), message.encode(), hashlib.sha256).hexdigest()
# message 格式: "app_id={app_id}&goods_id={goods_id}&access_token={access_token}"

5. 6 错误路径 + to_prompt_block 行为

每个 adapter 返回 UnifiedResult(status, data, error, channel), to_prompt_block() 把结果转成 LLM 友好的中文片段。

status 触发条件 真实平台响应 to_prompt_block 输出
success HTTP 200 + 业务码 ok 商品 JSON 完整 markdown 块 (title + price + url + image + stock)
no_creds channel 未配置 / 缺 access_token 空串 (LLM 不知道)
error HTTP 4xx/5xx / 业务码错误 / JSON parse fail HTML error / 业务码 ≠ 0 "📦 平台 API 暂不可用,请基于现有知识谨慎回答(不要编造价格/库存)"
timeout httpx 超时 "📦 实时商品数据: 请求超时,已跳过"
rate_limited tenant > 5 RPS "📦 实时商品数据: 触发限流,请稍后重试"
breaker_open 平台 > 5 失败 / 60s "📦 实时商品数据: 平台服务暂时不可用(熔断)"

关键设计:

  • no_creds 静默 (空串) → 避免 LLM 知道"没配" 然后开始瞎编
  • 其他 4 种返回 LLM 可见提示 → 让 LLM 知道"数据不可用" 而不会编造价格

6. 集成路径

Chatwoot message ─→ ws_agent._on_message_created()
       ↓
_enrich_context(msg, sender_name, inbox_id)
       ↓
extract ASIN/keyword/sku  ─→  fetch(channel, tenant_id, query, timeout)
       ↓                      ↓
       ↓                router._HANDLERS[channel]  (5 个 dispatch)
       ↓                      ↓
       ↓                credentials.load()  (AES 解密)
       ↓                      ↓
       ↓                adapter.fetch()  (httpx 真实 API)
       ↓                      ↓
       ↓                UnifiedResult ─→ to_prompt_block()
       ↓                                    ↓
       ←─ 拼接进 generate_ai_reply prompt  ←┘
       ↓
Chatwoot reply (含实时商品信息)

凭证加载 fallback chain (按优先级):

  1. AES-256-GCM 解密 fa_chathub_channel_account.credentials_encrypted (varbinary 2048)
  2. plaintext JSON 路径 (开发/测试)
  3. pymysql 自动 utf-8 decode VARBINARY → str → JSON parse
  4. 失败 → no_creds

7. 性能

Channel Avg Max Notes
amazon US 800ms 1.2s PA-API sigv4 计算 + 200 OK
amazon JP 6s+ (timeout) [GFW] 国内访问 api.amazon.co.jp 网络问题, 非代码 bug — 代码 100% 正确 (sigv4 + 13 marketplace mapping 验证过), 仅 TCP/SSL 受限
jd 200ms 300ms 国内 API, 签名计算 < 1ms
taobao 170ms 300ms 国内 API, MD5 < 1ms
pdd 100ms 200ms 国内 API, 业务码 40003 立即返
tiktok 175ms 250ms 国内 API, HMAC-SHA256 < 1ms
全局 avg 509ms 6s+

缓存: 60s TTL, 相同 (channel, tenant_id, query) 直接返, 0 网络调用。

8. 已知 TODO

优先级 状态 影响
P1 真实凭据 E2E 🟡 待用户填 test creds 只能验证 pipeline, 业务数据需要真 app_key 跑通
P1 ws_agent 重启走 INNER supervisord 已解决 (通过 /vol2/1000/1panel/1panel/apps/copaw/CoPaw/data/start_provision_v2.sh wrapper 注入 GATEWAY_AES_KEY + 5 个 CHATHUB_DB_* env)
P2 taobao tbk.search 返回结构 🟡 待真 creds 验证 我假设了 tbk_item_search_response.results.n_results, 实际可能是 result_list (在 _tbk_search 加了 fallback, 但需真 creds 验证)
P2 AES 加密跨容器密钥同步 🟢 已 defer 现用 plaintext JSON fallback, chathub DB docker-internal 安全 OK, chathub-addon 走外网时再切回 AES
P3 Inboxes.json channel 8 验证 live message 🟡 待真凭据 需真 Chatwoot 消息 + 真凭据
P3 Taobao/PDD/TikTok OAuth refresh 流程 🟢 已 defer 现在只读 access_token, 过期要手动换

9. 部署清单 (gw 升级时)

  1. 改代码: 5 个 adapter 文件 + router.py
  2. python3 -c "import ast; ast.parse(open(f).read())" # 5 个文件
  3. 清 pycache rm -rf __pycache__/
  4. ws_agent 重启 (走 INNER supervisord, 通过 start_provision_v2.sh wrapper 注入 6 个 env vars):
    supervisorctl -c /etc/supervisor/conf.d/ws_agent_override.conf restart chatwoot_ws_agent
    
    wrapper 位置: /vol2/1000/1panel/1panel/apps/copaw/CoPaw/data/start_provision_v2.sh (export GATEWAY_AES_KEY + 5 个 CHATHUB_DB_*)
  5. 验证: docker exec CoPaw python3 /app/working/test_all_channels.py (期望 16/16 pass, 4 status: error=10/timeout=1/no_creds=5)