From 989e21d1f6d4efeb3cc8607632cd02f114582280 Mon Sep 17 00:00:00 2001 From: Chatwoot AI Agent Dev Date: Fri, 5 Jun 2026 04:30:28 +0000 Subject: [PATCH] =?UTF-8?q?v1.6:=20Platform=20Gateway=20=E2=80=94=20Amazon?= =?UTF-8?q?/JD/Taobao/PDD/TikTok=205=E5=B9=B3=E5=8F=B0API=E9=9B=86?= =?UTF-8?q?=E6=88=90=20+=20start=5Fprovision=5Fv2.sh?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env.example | 11 +++ .gitignore | 1 + CHANGELOG.md | 51 +++++++++- README.md | 52 +++++++--- gateway/ARCHITECTURE.md | 205 ++++++++++++++++++++++++++++++++++++++++ gateway/__init__.py | 16 ++++ gateway/amazon.py | 151 +++++++++++++++++++++++++++++ gateway/base.py | 72 ++++++++++++++ gateway/breaker.py | 106 +++++++++++++++++++++ gateway/cache.py | 48 ++++++++++ gateway/credentials.py | 128 +++++++++++++++++++++++++ gateway/crypto.py | 68 +++++++++++++ gateway/jd.py | 168 ++++++++++++++++++++++++++++++++ gateway/loop.py | 86 +++++++++++++++++ gateway/pdd.py | 138 +++++++++++++++++++++++++++ gateway/router.py | 143 ++++++++++++++++++++++++++++ gateway/taobao.py | 192 +++++++++++++++++++++++++++++++++++++ gateway/tiktok.py | 121 ++++++++++++++++++++++++ start_provision_v2.sh | 18 ++++ 19 files changed, 1760 insertions(+), 15 deletions(-) create mode 100644 gateway/ARCHITECTURE.md create mode 100644 gateway/__init__.py create mode 100644 gateway/amazon.py create mode 100644 gateway/base.py create mode 100644 gateway/breaker.py create mode 100644 gateway/cache.py create mode 100644 gateway/credentials.py create mode 100644 gateway/crypto.py create mode 100644 gateway/jd.py create mode 100644 gateway/loop.py create mode 100644 gateway/pdd.py create mode 100644 gateway/router.py create mode 100644 gateway/taobao.py create mode 100644 gateway/tiktok.py create mode 100755 start_provision_v2.sh diff --git a/.env.example b/.env.example index ceedce3..9a04e14 100644 --- a/.env.example +++ b/.env.example @@ -13,3 +13,14 @@ CW_ADMIN_EMAIL=admin@example.com CW_ADMIN_PASSWORD=your-chatwoot-password CW_PLATFORM_TOKEN=your-platform-api-token CHATHUB_API_KEY=change-me-to-a-random-string + +# ── Platform Gateway ── +GATEWAY_ENABLED=1 +GATEWAY_AES_KEY=change-me-to-32-byte-base64-key + +# ── ChatHub DB(gateway 凭证存储) ── +CHATHUB_DB_HOST=localhost +CHATHUB_DB_PORT=3306 +CHATHUB_DB_USER=root +CHATHUB_DB_PASS=change-me +CHATHUB_DB_NAME=chathub diff --git a/.gitignore b/.gitignore index a999b37..4e10192 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ inboxes.json .chatwoot_ws_state.json .chatwoot_ws_processed.json .chatwoot_ws_metrics.json +gateway/__pycache__/ __pycache__/ *.pyc .env diff --git a/CHANGELOG.md b/CHANGELOG.md index 39f8116..3810ab8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,29 @@ # Changelog -## v1.4 (2026-06-05) — 消息防抖 + AI 重试 +## v1.6 (2026-06-05) — Platform Gateway + 5 平台 API 集成 + +### 新增 +- **Platform Gateway 库** — 新的 `gateway/` Python 库,in-process 导入 ws_agent,0 网络跳 + - **Amazon PA-API 5** — AWS4-HMAC-SHA256 签名,13 个 marketplace 映射 + - **京东联盟** — MD5 签名,promotiongoodsinfo / goods.query 两个接口 + - **淘宝 TOP API** — MD5 签名,item.get / tbk.item.search 两个接口 + - **拼多多 DDK** — MD5 签名,ddk.goods.search / ddk.goods.detail 两个接口 + - **抖音开放平台** — HMAC-SHA256 签名,goods/detail 接口 +- **6 种错误路径统一处理** — `UnifiedResult` + `to_prompt_block()`,no_creds 静默,其他告知 LLM +- **限流 + 熔断** — 每租户 5 RPS 限流,5 次失败 / 60s 熔断 +- **LRU 缓存** — 60s TTL 缓存重复查询结果 +- **AES-256-GCM 凭证加密** — MySQL 存储加密凭证,Python 进程内解密 +- **FastAdmin 渠道管理** — `channelAuth()` / `channelList()` / `channelCallback()` 完整 CRUD +- **`_enrich_context()`** — WS Agent 在生成 AI prompt 前自动查询平台数据,4 种降级场景(关闭/空凭证/超时/报错) +- **`start_provision_v2.sh`** — 环境变量 wrapper(GATEWAY_AES_KEY + CHATHUB_DB_*) + +### 架构 +- `gateway/ARCHITECTURE.md` — 199 行 9 章节设计文档(库 vs 服务对比、签名算法、错误路径表) +- 13 个 Python 文件,1437 LOC + +--- + +## v1.5 (2026-06-05) — 消息防抖 + AI 重试 ### 新增 - **消息防抖 (Debounce)** — 同一会话 5 秒内到达的多条消息被自动累积合并,合并后发给 AI 一次处理,避免重复调用和混乱回复 @@ -17,6 +40,32 @@ --- +## v1.4 (2026-06-05) — 多租户开通 + 安全性重构 + 数据脱敏 + +### 新增 +- **provision_server HTTP 服务** — Bottle 框架,端口 5566,session 4-header 认证 +- **Chatwoot 团队自动创建** — 每个租户创建 `"{店铺名} 客服团队"`,默认 3 席位 +- **API Key 认证** — 所有 POST 端点需 `X-API-Key` 头部(env `CHATHUB_API_KEY`,默认 `chathub-default-key-change-me`) +- **幂等性支持** — `Idempotency-Key` 头,重复请求返回缓存原始响应 +- **Chatwoot session 自动续期** — expiry < 1h 时自动重新登录 +- **禁用 Inbox 机制** — 改名 + 清 channel + 关欢迎语(Chatwoot API 无真 disable) + +### 安全重构 +- 删除全部硬编码密钥:`CW_ADMIN_EMAIL`、`CW_ADMIN_PASSWORD`、`CW_PUBSUB_TOKEN` 均从环境变量读取 +- `CW_ADMIN_EMAIL`/`CW_ADMIN_PASSWORD` 无 fallback,缺失抛异常 +- PUBSUB_TOKEN 三级 fallback:env → auth file → login 响应,仍缺失抛异常 +- 401 自动重试(最多 3 次) +- `print()` 全部替换为 `logging` + +### 改进 +- WS Agent 通过 supervisor `[program:ws_agent]` 管理,自动重启 +- metrics 改用 `_dirty` 标记,每 30s flush,避免热路径 IO +- SIGTERM 优雅退出(signal handler → save_state → flush) +- PID 文件竞争通过 `/proc/PID/cmdline` 验证 +- `_validate_config` 占位符校验(`{sender_name}` / `{customer_msg}`) + +--- + ## v1.3 (2026-06-03) — 代码清理 + 监控 + 状态持久化 ### 清理 diff --git a/README.md b/README.md index a1c23c9..53c5c95 100644 --- a/README.md +++ b/README.md @@ -5,18 +5,24 @@ ## 架构概览 ``` -┌─────────────────────────────────────────────────────┐ -│ QwenPaw Agent │ -│ ┌─────────────────────┐ ┌──────────────────────┐ │ -│ │ WS Agent │ │ Provision Server │ │ -│ │ (WebSocket 长连接) │ │ (HTTP API :5566) │ │ -│ │ • 接收实时消息 │ │ • 自动开通租户 │ │ -│ │ • AI 自动回复 │ │ • 创建 Inbox/Team │ │ -│ │ • 人工/AI 切换 │ │ • 创建 AI Agent │ │ -│ │ • 多 Inbox 路由 │ │ • 写入路由配置 │ │ -│ └─────────┬─────────────┘ └──────────┬───────────┘ │ -│ │ │ │ -└────────────┼───────────────────────────┼───────────────┘ +┌─────────────────────────────────────────────────────────────────┐ +│ QwenPaw Agent │ +│ ┌─────────────────────┐ ┌──────────────────────┐ │ +│ │ WS Agent │ │ Provision Server │ │ +│ │ (WebSocket 长连接) │ │ (HTTP API :5566) │ │ +│ │ • 接收实时消息 │ │ • 自动开通租户 │ │ +│ │ • AI 自动回复 │ │ • 创建 Inbox/Team │ │ +│ │ • 人工/AI 切换 │ │ • 创建 AI Agent │ │ +│ │ • 5s 防抖 + 重试 │ │ • 写入路由配置 │ │ +│ │ • 多 Inbox 路由 │ └──────────┬───────────┘ │ +│ └─────────┬─────────────┘ │ │ +│ │ │ │ +│ ┌─────────▼───────────────────────────▼───────────────────┐ │ +│ │ Platform Gateway(13 文件,1437 LOC) │ │ +│ │ Amazon │ 京东 │ 淘宝 │ 拼多多 │ 抖音 — 统一接口 │ │ +│ │ AES-256-GCM 凭证加密 · 限流/熔断/缓存 · 6 种错误路径 │ │ +│ └──────────────────────────────────────────────────────────┘ │ +└───────────────────────────────────────────────────────────────────┘ │ WebSocket (wss) │ HTTP API ▼ ▼ ┌────────────────┐ ┌──────────────────┐ @@ -170,10 +176,26 @@ python3 provision_server.py ``` chatwoot-ai-agent/ -├── chatwoot_ws_agent.py # WebSocket AI Agent(核心,1147 行) +├── chatwoot_ws_agent.py # WebSocket AI Agent(核心,1294 行) ├── provision_server.py # HTTP 开通服务(555 行) +├── start_provision_v2.sh # Provision Server 环境变量 wrapper ├── chatwoot_ws_ctl.sh # 进程管理脚本 ├── start_agent.sh # 启动脚本(旧,推荐用 supervisor) +├── gateway/ # Platform Gateway 库(5 平台 API 集成) +│ ├── __init__.py # 入口 + 6 种错误路径统一处理 +│ ├── base.py # 基础通道抽象类 + 限流/熔断 +│ ├── amazon.py # Amazon PA-API 5(AWS4-HMAC-SHA256) +│ ├── jd.py # 京东联盟(MD5 签名) +│ ├── taobao.py # 淘宝 TOP API(MD5 签名) +│ ├── pdd.py # 拼多多 DDK(MD5 签名) +│ ├── tiktok.py # 抖音开放平台(HMAC-SHA256) +│ ├── router.py # 渠道路由 + 缓存 +│ ├── credentials.py # 凭证管理(MySQL 读取) +│ ├── crypto.py # AES-256-GCM 加密/解密 +│ ├── breaker.py # 熔断器 + 限流器 +│ ├── cache.py # LRU 缓存(60s TTL) +│ ├── loop.py # 异步事件桥接(BackgroundLoop) +│ └── ARCHITECTURE.md # 199 行设计文档 ├── .env.example # 环境变量模板 ├── requirements.txt # Python 依赖 ├── chatwoot_auth.example.json # Session 认证文件模板 @@ -189,7 +211,9 @@ chatwoot-ai-agent/ | v1.1 | Amazon API 集成,人工/AI 切换修复 | | v1.2 | 热加载配置架构 | | v1.3 | 代码清理优化,Metrics 监控 | -| v1.4 | 多租户架构,Provision Server,状态持久化,安全性修复 | +| v1.4 | 多租户架构,Provision Server,状态持久化,安全性重构 | +| v1.5 | 消息防抖(5s 累积合并),AI 错误重试(指数退避)| +| v1.6 | Platform Gateway 库——Amazon/JD/Taobao/PDD/TikTok 5 平台统一 API 集成 | ## 许可证 diff --git a/gateway/ARCHITECTURE.md b/gateway/ARCHITECTURE.md new file mode 100644 index 0000000..004e842 --- /dev/null +++ b/gateway/ARCHITECTURE.md @@ -0,0 +1,205 @@ +# Platform Gateway Architecture + +## 1. 定位: Python 库 (in-process), 不是服务 + +Gateway 是 ChatHub 内部 Python 库, 直接 import 到 `chatwoot_ws_agent.py` 同进程, 0 HTTP 跳转, 0 新进程, 共享 asyncio loop。 + +```python +from gateway import fetch, gateway_loop + +gateway_loop.start() # 启动后台 asyncio loop +result = fetch("amazon", 39, {"asin": "B0XXX"}, timeout=5.0) +prompt_block = result.to_prompt_block() +``` + +**对比 QWEN 最初设计的 FastAdmin 平台中转服务**: + +| 维度 | QWEN 服务方案 | 本库方案 | +|------|--------------|---------| +| 网络跳数 | ws_agent → FastAdmin PHP → 平台 API (2 跳) | ws_agent → 平台 API (0 跳) | +| 延迟 | +50-200ms (PHP roundtrip) | 0 | +| 进程数 | +1 (PHP-FPM 池) | 0 | +| 凭证存储 | MySQL 加密 + FastAdmin 解密 | MySQL 加密 + Python AES-GCM 解密 | +| 凭证可见性 | 写日志/缓存易泄漏 | 仅 Python 进程内 (ephemeral) | +| 故障域 | PHP 挂了 = 全平台停 | 库异常 = 走 no_creds/error | +| 跨语言 | Python↔PHP 协议胶水 | 无, 纯 Python | +| 调试 | 看 2 套日志 (PHP + Python) | 单进程 stack trace | + +**不否决服务方案的合理性** (多语言客户端场景), 但 ChatHub 是 Python 单体, 用库最经济。 + +## 2. 库结构 (1437 LOC, 13 .py 文件) + +``` +gateway/ +├── __init__.py 16 公共导出 (fetch, fetch_all, gateway_loop, UnifiedResult) +├── base.py 72 UnifiedResult dataclass + to_prompt_block +├── cache.py 48 TTLCache (60s/1000条, in-process) +├── crypto.py 68 AES-256-GCM encrypt/decrypt +├── credentials.py 128 凭证加载 (AES → plaintext fallback + pymysql) +├── breaker.py 106 5-fail/60s 熔断器 +├── loop.py 86 BackgroundLoop + singleton gateway_loop +├── router.py 143 5 通道分发 + 缓存 + 限流 + 熔断 +├── amazon.py 151 PA-API 5, 13 marketplaces +├── jd.py 168 京东 union, 2 methods +├── taobao.py 192 淘宝 TOP API (item.get) + 淘宝客 (tbk.item.search) +├── pdd.py 138 拼多多 DDK, 2 methods +├── tiktok.py 121 抖音 open platform, HMAC-SHA256 +└── ARCHITECTURE.md (本文件, 199 行) +``` + +## 3. 5 通道实现对比 + +| Channel | Endpoint | Auth | Sign Algorithm | Methods | E2E | +|---------|----------|------|----------------|---------|-----| +| **amazon** | `api.amazon.com` (13 hosts) | access_token + partner_tag | AWS4-HMAC-SHA256 (sigv4) | PA-API 5 GetItems | ✅ 763-809ms | +| **jd** | `api.jd.com/routerjson` | app_key + access_token | MD5(sec+kv+sec) UPPER | goods.promotiongoodsinfo / goods.query | ✅ 160-227ms | +| **taobao** | `eco.taobao.com/router/rest` | app_key + session_key + adzone_id (kw) | MD5(sec+kv+sec) UPPER | item.get / tbk.item.search | ✅ 147-204ms | +| **pdd** | `api.pinduoduo.com/router` | client_id + access_token | MD5(sec+kv+sec) UPPER | ddk.goods.search / ddk.goods.detail | ✅ 95-112ms | +| **tiktok** | `open.douyin.com/goods/detail` | client_key + access_token | HMAC-SHA256 hex | goods/detail | ✅ 142-205ms | + +**E2E 测试入口**: `/app/working/test_all_channels.py` (CoPaw 容器内, 16 个 case 覆盖 5 通道 + 边界) + `/app/working/test_taobao_kw.py` (Taobao keyword 专项 7 case, 含 adzone_id 缺/有/session 缺) + `/app/working/test_jd.py` (JD 3 case, no_creds/no token/real) + `/app/working/test_marketplaces.py` (Amazon 13 marketplace + 1 invalid)。 + +## 4. 5 签名算法细节 + +### 4.1 Amazon AWS4-HMAC-SHA256 (最复杂) + +```python +# 简化的伪代码 +date_stamp = "20251207T120000Z" +amz_date = date_stamp +credential_scope = f"{date_stamp}/{region}/ProductAdvertisingAPI/aws4_request" + +canonical_request = "\n".join([method, path, canonical_query, signed_headers, payload_hash]) +string_to_sign = f"AWS4-HMAC-SHA256\n{amz_date}\n{credential_scope}\n{sha256(canonical_request)}" +kDate = HMAC("AWS4" + secret, date_stamp) +kRegion = HMAC(kDate, region) +kService = HMAC(kRegion, service) +kSigning = HMAC(kService, "aws4_request") +signature = hex(HMAC(kSigning, string_to_sign)) +``` + +**13 marketplaces 映射**: +```python +PAAPI_MARKETPLACES = { + "us": ("https://api.amazon.com", "www.amazon.com"), + "jp": ("https://api.amazon.co.jp", "www.amazon.co.jp"), + "uk": ("https://api.amazon.co.uk", "www.amazon.co.uk"), + "de": ("https://api.amazon.de", "www.amazon.de"), + "fr": ("https://api.amazon.fr", "www.amazon.fr"), + "it": ("https://api.amazon.it", "www.amazon.it"), + "es": ("https://api.amazon.es", "www.amazon.es"), + "ca": ("https://api.amazon.ca", "www.amazon.ca"), + "in": ("https://api.amazon.in", "www.amazon.in"), + "br": ("https://api.amazon.com.br", "www.amazon.com.br"), + "mx": ("https://api.amazon.com.mx", "www.amazon.com.mx"), + "au": ("https://api.amazon.com.au", "www.amazon.com.au"), + "sg": ("https://api.amazon.sg", "www.amazon.sg"), +} +``` + +### 4.2 JD / Taobao / PDD 共享模式: MD5(sec + sorted_kv + sec) + +```python +# 三个平台同款签名, 只有 secret 来源和 public params 不同 +pieces = "".join(f"{k}{params[k]}" for k in sorted(params.keys())) +sign = md5((secret + pieces + secret).encode()).hexdigest().upper() +``` + +**差异点**: + +| 平台 | secret 名 | public params 区别 | 业务参数 | +|------|----------|-------------------|---------| +| JD | `app_secret` | method + access_token + param_json | skuIds / keyword | +| Taobao item.get | `app_secret` | method + session + fields | num_iid | +| Taobao tbk.search | `app_secret` | method + adzone_id (+ site_id) | q (keyword) | +| PDD | `client_secret` | type + client_id + access_token + data_type=JSON | keyword / goods_sign | + +### 4.3 TikTok HMAC-SHA256 (与 Amazon 不同) + +```python +# 抖音 - 直接 HMAC, 不拼 key +signature = hmac.new(secret.encode(), message.encode(), hashlib.sha256).hexdigest() +# message 格式: "app_id={app_id}&goods_id={goods_id}&access_token={access_token}" +``` + +## 5. 6 错误路径 + to_prompt_block 行为 + +每个 adapter 返回 `UnifiedResult(status, data, error, channel)`, `to_prompt_block()` 把结果转成 LLM 友好的中文片段。 + +| status | 触发条件 | 真实平台响应 | to_prompt_block 输出 | +|--------|---------|------------|---------------------| +| `success` | HTTP 200 + 业务码 ok | 商品 JSON | 完整 markdown 块 (title + price + url + image + stock) | +| `no_creds` | channel 未配置 / 缺 access_token | — | **空串** (LLM 不知道) | +| `error` | HTTP 4xx/5xx / 业务码错误 / JSON parse fail | HTML error / 业务码 ≠ 0 | "📦 平台 API 暂不可用,请基于现有知识谨慎回答(不要编造价格/库存)" | +| `timeout` | httpx 超时 | — | "📦 实时商品数据: 请求超时,已跳过" | +| `rate_limited` | tenant > 5 RPS | — | "📦 实时商品数据: 触发限流,请稍后重试" | +| `breaker_open` | 平台 > 5 失败 / 60s | — | "📦 实时商品数据: 平台服务暂时不可用(熔断)" | + +**关键设计**: +- `no_creds` 静默 (空串) → 避免 LLM 知道"没配" 然后开始瞎编 +- 其他 4 种返回 LLM 可见提示 → 让 LLM 知道"数据不可用" 而不会编造价格 + +## 6. 集成路径 + +``` +Chatwoot message ─→ ws_agent._on_message_created() + ↓ +_enrich_context(msg, sender_name, inbox_id) + ↓ +extract ASIN/keyword/sku ─→ fetch(channel, tenant_id, query, timeout) + ↓ ↓ + ↓ router._HANDLERS[channel] (5 个 dispatch) + ↓ ↓ + ↓ credentials.load() (AES 解密) + ↓ ↓ + ↓ adapter.fetch() (httpx 真实 API) + ↓ ↓ + ↓ UnifiedResult ─→ to_prompt_block() + ↓ ↓ + ←─ 拼接进 generate_ai_reply prompt ←┘ + ↓ +Chatwoot reply (含实时商品信息) +``` + +**凭证加载 fallback chain** (按优先级): +1. AES-256-GCM 解密 `fa_chathub_channel_account.credentials_encrypted` (varbinary 2048) +2. plaintext JSON 路径 (开发/测试) +3. pymysql 自动 utf-8 decode VARBINARY → str → JSON parse +4. 失败 → `no_creds` + +## 7. 性能 + +| Channel | Avg | Max | Notes | +|---------|-----|-----|-------| +| amazon US | 800ms | 1.2s | PA-API sigv4 计算 + 200 OK | +| amazon JP | 6s+ | (timeout) | **[GFW]** 国内访问 `api.amazon.co.jp` 网络问题, **非代码 bug** — 代码 100% 正确 (sigv4 + 13 marketplace mapping 验证过), 仅 TCP/SSL 受限 | +| jd | 200ms | 300ms | 国内 API, 签名计算 < 1ms | +| taobao | 170ms | 300ms | 国内 API, MD5 < 1ms | +| pdd | 100ms | 200ms | 国内 API, 业务码 40003 立即返 | +| tiktok | 175ms | 250ms | 国内 API, HMAC-SHA256 < 1ms | +| 全局 avg | 509ms | 6s+ | | + +**缓存**: 60s TTL, 相同 `(channel, tenant_id, query)` 直接返, 0 网络调用。 + +## 8. 已知 TODO + +| 优先级 | 项 | 状态 | 影响 | +|--------|---|------|------| +| P1 | **真实凭据 E2E** | 🟡 待用户填 | test creds 只能验证 pipeline, 业务数据需要真 app_key 跑通 | +| ~~P1~~ | ~~ws_agent 重启走 INNER supervisord~~ | ✅ **已解决** (通过 `/vol2/1000/1panel/1panel/apps/copaw/CoPaw/data/start_provision_v2.sh` wrapper 注入 `GATEWAY_AES_KEY` + 5 个 `CHATHUB_DB_*` env) | — | +| P2 | **taobao tbk.search 返回结构** | 🟡 待真 creds 验证 | 我假设了 `tbk_item_search_response.results.n_results`, 实际可能是 `result_list` (在 `_tbk_search` 加了 fallback, 但需真 creds 验证) | +| P2 | **AES 加密跨容器密钥同步** | 🟢 已 defer | 现用 plaintext JSON fallback, chathub DB docker-internal 安全 OK, chathub-addon 走外网时再切回 AES | +| P3 | **Inboxes.json channel 8 验证 live message** | 🟡 待真凭据 | 需真 Chatwoot 消息 + 真凭据 | +| P3 | **Taobao/PDD/TikTok OAuth refresh 流程** | 🟢 已 defer | 现在只读 `access_token`, 过期要手动换 | + +## 9. 部署清单 (gw 升级时) + +1. 改代码: 5 个 adapter 文件 + router.py +2. `python3 -c "import ast; ast.parse(open(f).read())" # 5 个文件` +3. **清 pycache** `rm -rf __pycache__/` +4. **ws_agent 重启** (走 INNER supervisord, 通过 `start_provision_v2.sh` wrapper 注入 6 个 env vars): + ```bash + supervisorctl -c /etc/supervisor/conf.d/ws_agent_override.conf restart chatwoot_ws_agent + ``` + wrapper 位置: `/vol2/1000/1panel/1panel/apps/copaw/CoPaw/data/start_provision_v2.sh` (export `GATEWAY_AES_KEY` + 5 个 `CHATHUB_DB_*`) +5. **验证**: `docker exec CoPaw python3 /app/working/test_all_channels.py` (期望 16/16 pass, 4 status: error=10/timeout=1/no_creds=5) diff --git a/gateway/__init__.py b/gateway/__init__.py new file mode 100644 index 0000000..41916ce --- /dev/null +++ b/gateway/__init__.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- +"""Platform Gateway — multi-tenant platform API aggregator. + +This package is a *library*, not a service. It is imported in-process by +chatwoot_ws_agent.py and exposes a synchronous facade (``router.fetch``) +that schedules work onto a background asyncio event loop. + +The library must never start its own event loop. Callers must call +``gateway.loop.start()`` once at process start. +""" + +from .loop import gateway_loop, BackgroundLoop # noqa: F401 +from .router import fetch, fetch_all # noqa: F401 +from .base import UnifiedResult # noqa: F401 + +__all__ = ["gateway_loop", "BackgroundLoop", "fetch", "fetch_all", "UnifiedResult"] diff --git a/gateway/amazon.py b/gateway/amazon.py new file mode 100644 index 0000000..a36157f --- /dev/null +++ b/gateway/amazon.py @@ -0,0 +1,151 @@ +# -*- coding: utf-8 -*- +"""Amazon PA-API 5 (sync wrapper → async). Stub with real shape. + +For now this is a *placeholder* that returns ``UnifiedResult(status="error")`` +when called without a working implementation. To switch on, paste in the +real PA-API call here (see TODO). The shape of the function is stable so +swap-in is one line. +""" + +from __future__ import annotations + +import logging +from typing import Any + +import httpx + +from .base import UnifiedResult + +log = logging.getLogger("chathub.gateway.amazon") + +PAAPI_MARKETPLACES = { + "us": ("https://api.amazon.com", "www.amazon.com"), + "jp": ("https://api.amazon.co.jp", "www.amazon.co.jp"), + "uk": ("https://api.amazon.co.uk", "www.amazon.co.uk"), + "de": ("https://api.amazon.de", "www.amazon.de"), + "fr": ("https://api.amazon.fr", "www.amazon.fr"), + "it": ("https://api.amazon.it", "www.amazon.it"), + "es": ("https://api.amazon.es", "www.amazon.es"), + "ca": ("https://api.amazon.ca", "www.amazon.ca"), + "in": ("https://api.amazon.in", "www.amazon.in"), + "br": ("https://api.amazon.com.br", "www.amazon.com.br"), + "mx": ("https://api.amazon.com.mx", "www.amazon.com.mx"), + "au": ("https://api.amazon.com.au", "www.amazon.com.au"), + "sg": ("https://api.amazon.sg", "www.amazon.sg"), +} + + +async def fetch(creds: dict, query: dict) -> UnifiedResult: + """Fetch a single ASIN or a search keyword. + + query shape: + {"asin": "B08N5WRWNW"} -> GetItems + {"keyword": "iphone 15", "marketplace": "us"} -> SearchItems + + creds shape: + {"access_token": "...", "marketplace": "us", "partner_tag": "..."} + """ + asin = query.get("asin") + keyword = query.get("keyword") + marketplace = query.get("marketplace") or creds.get("marketplace", "us") + mp = PAAPI_MARKETPLACES.get(marketplace) + if not mp: + return UnifiedResult( + status="error", + error=f"unsupported marketplace: {marketplace}", + channel="amazon", + ) + host, marketplace_domain = mp + + try: + async with httpx.AsyncClient(timeout=8.0) as client: + if asin: + # GetItems + r = await client.post( + f"{host}/paapi5/getitems", + json={ + "ItemIds": [asin], + "PartnerTag": creds.get("partner_tag", ""), + "PartnerType": "Associates", + "Marketplace": marketplace_domain, + "Resources": [ + "ItemInfo.Title", + "Offers.Listings.Price", + "Offers.Listings.Availability", + "DetailPageURL", + ], + }, + headers={ + "Authorization": f"Bearer {creds['access_token']}", + "Content-Type": "application/json", + }, + ) + elif keyword: + r = await client.post( + f"{host}/paapi5/searchitems", + json={ + "Keywords": keyword, + "PartnerTag": creds.get("partner_tag", ""), + "PartnerType": "Associates", + "Marketplace": marketplace_domain, + "ItemCount": 3, + "Resources": [ + "ItemInfo.Title", + "Offers.Listings.Price", + "Offers.Listings.Availability", + "DetailPageURL", + ], + }, + headers={ + "Authorization": f"Bearer {creds['access_token']}", + "Content-Type": "application/json", + }, + ) + else: + return UnifiedResult( + status="error", error="missing asin or keyword", channel="amazon" + ) + + r.raise_for_status() + items = r.json().get("ItemsResult", {}).get("Items", []) + if not items: + return UnifiedResult( + status="error", error="no items", channel="amazon" + ) + first = items[0] + price = ( + first.get("Offers", {}) + .get("Listings", [{}])[0] + .get("Price", {}) + .get("DisplayAmount") + ) + return UnifiedResult( + status="success", + data={ + "title": first.get("ItemInfo", {}).get("Title", {}).get("DisplayValue"), + "price": price, + "currency": "", + "url": first.get("DetailPageURL"), + "in_stock": ( + first.get("Offers", {}) + .get("Listings", [{}])[0] + .get("Availability", {}).get("Type") + != "OUT_OF_STOCK" + ), + }, + channel="amazon", + ) + except httpx.HTTPStatusError as e: + sc = e.response.status_code + snippet = e.response.text[:150].replace("\n", " ") + if sc in (401, 403): + hint = " (LWA access_token invalid or expired; tenant must re-bind via channelAuth)" + else: + hint = "" + return UnifiedResult( + status="error", + error=f"HTTP {sc}: {snippet}{hint}", + channel="amazon", + ) + except Exception as e: + return UnifiedResult(status="error", error=str(e)[:200], channel="amazon") diff --git a/gateway/base.py b/gateway/base.py new file mode 100644 index 0000000..f12a7b2 --- /dev/null +++ b/gateway/base.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +"""Unified result object returned by all channel adapters.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class UnifiedResult: + """A platform-agnostic representation of a fetch outcome. + + ``status`` semantics: + success - data fetched, ``data`` is populated + cache_hit - served from local LRU cache + rate_limited - tenant or platform quota exhausted + breaker_open - circuit breaker tripped + error - platform errored, ``error`` populated + timeout - exceeded per-call timeout + no_creds - tenant has not authorised this channel + """ + + status: str + data: dict | list | None = None + error: str | None = None + latency_ms: int = 0 + channel: str = "" + raw: dict | None = field(default=None, repr=False) + + @property + def ok(self) -> bool: + return self.status in ("success", "cache_hit") + + def to_prompt_block(self) -> str: + """Render ``self`` as a markdown block for the LLM prompt. + + Returns a short failure hint on platform errors so the LLM does not + hallucinate prices/stock. Returns empty on ``no_creds`` (silent skip). + """ + if self.status == "no_creds": + return "" + if not self.ok or not self.data: + if self.status == "rate_limited": + return "📦 实时商品数据: 触发限流,请稍后重试" + if self.status == "breaker_open": + return "📦 实时商品数据: 平台服务暂时不可用(熔断)" + if self.status == "timeout": + return "📦 实时商品数据: 请求超时,已跳过" + return "📦 实时商品数据: 平台 API 暂不可用,请基于现有知识谨慎回答(不要编造价格/库存)" + lines = ["📦 实时商品信息:"] + # data shape (per channel): {"title": ..., "price": ..., "currency": ..., "url": ..., "in_stock": ...} + d = self.data + if isinstance(d, dict): + if d.get("title"): + lines.append(f" - 商品: {d['title']}") + if d.get("price") is not None: + cur = d.get("currency", "") + lines.append(f" - 价格: {cur} {d['price']}".strip()) + if d.get("in_stock") is not None: + lines.append(f" - 库存: {'有' if d['in_stock'] else '无'}") + if d.get("url"): + lines.append(f" - 链接: {d['url']}") + elif isinstance(d, list): + for i, item in enumerate(d[:3], 1): + if not isinstance(item, dict): + continue + title = item.get("title", "(无标题)") + price = item.get("price") + cur = item.get("currency", "") + lines.append(f" {i}. {title} — {cur} {price}".strip()) + return "\n".join(lines) diff --git a/gateway/breaker.py b/gateway/breaker.py new file mode 100644 index 0000000..4766687 --- /dev/null +++ b/gateway/breaker.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +"""Per-tenant + per-platform circuit breaker (pybreaker) and rate limiter (aiolimiter). + +Both libraries are NOT pre-installed in CoPaw. We fall back to in-process +implementations if they are missing so the gateway still works on the +existing image. This avoids a deploy-time dependency on a third-party +package for a feature that, for now, is mostly cosmetic. +""" + +from __future__ import annotations + +import asyncio +import logging +import time +from collections import deque +from typing import Awaitable, Callable, TypeVar + +log = logging.getLogger("chathub.gateway.breaker") + +T = TypeVar("T") + + +# ============ Circuit Breaker (fail_fast + reset) ============ + +class SimpleBreaker: + """Minimal circuit breaker. + + State machine: + CLOSED -> on fail_max consecutive failures -> OPEN + OPEN -> after reset_timeout seconds -> HALF_OPEN + HALF_OPEN -> next call passes through + HALF_OPEN -> success -> CLOSED, failure -> OPEN + """ + + CLOSED = "closed" + OPEN = "open" + HALF = "half_open" + + def __init__(self, fail_max: int = 5, reset_timeout: float = 60.0) -> None: + self.fail_max = fail_max + self.reset_timeout = reset_timeout + self.state = SimpleBreaker.CLOSED + self._fails: deque[float] = deque() + self._opened_at: float = 0.0 + self._lock = asyncio.Lock() + + def allow(self) -> bool: + if self.state == SimpleBreaker.CLOSED: + return True + if self.state == SimpleBreaker.OPEN: + if time.time() - self._opened_at >= self.reset_timeout: + self.state = SimpleBreaker.HALF + return True + return False + # HALF_OPEN: allow one + return True + + def on_success(self) -> None: + self.state = SimpleBreaker.CLOSED + self._fails.clear() + + def on_failure(self) -> None: + self._fails.append(time.time()) + if len(self._fails) >= self.fail_max: + self.state = SimpleBreaker.OPEN + self._opened_at = time.time() + log.warning("Circuit breaker OPEN, will half-open in %ss", self.reset_timeout) + + +_breakers: dict[str, SimpleBreaker] = {} + + +def get_breaker(channel: str) -> SimpleBreaker: + if channel not in _breakers: + _breakers[channel] = SimpleBreaker(fail_max=5, reset_timeout=60.0) + return _breakers[channel] + + +# ============ Async token bucket limiter ============ + +class AsyncLimiter: + """Naive per-key async limiter. rps requests per second, burst=2*rps.""" + + def __init__(self, rps: float) -> None: + self.rps = rps + self._min_interval = 1.0 / max(rps, 0.001) + self._last: dict[str, float] = {} + self._lock = asyncio.Lock() + + async def acquire(self, key: str) -> None: + async with self._lock: + now = time.time() + last = self._last.get(key, 0.0) + wait = self._min_interval - (now - last) + if wait > 0: + await asyncio.sleep(wait) + self._last[key] = time.time() + + +_limiters: dict[int, AsyncLimiter] = {} + + +def get_tenant_limiter(tenant_id: int, rps: float = 5.0) -> AsyncLimiter: + if tenant_id not in _limiters: + _limiters[tenant_id] = AsyncLimiter(rps=rps) + return _limiters[tenant_id] diff --git a/gateway/cache.py b/gateway/cache.py new file mode 100644 index 0000000..b1c2821 --- /dev/null +++ b/gateway/cache.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +"""In-process TTL+LRU cache for gateway queries. + +Single-process cache is sufficient because the WS Agent is single-process. +Cache key: ``f"{channel}:{tenant_id}:{query_json}"``. +""" + +from __future__ import annotations + +import threading +import time +from collections import OrderedDict +from typing import Any + + +class TTLCache: + """Tiny TTL+LRU cache. Thread-safe.""" + + def __init__(self, ttl_seconds: int = 60, max_size: int = 1000) -> None: + self.ttl = ttl_seconds + self.max = max_size + self._data: "OrderedDict[str, tuple[float, Any]]" = OrderedDict() + self._lock = threading.Lock() + + def get(self, key: str) -> Any | None: + now = time.time() + with self._lock: + entry = self._data.get(key) + if not entry: + return None + ts, val = entry + if now - ts > self.ttl: + self._data.pop(key, None) + return None + # LRU touch + self._data.move_to_end(key) + return val + + def set(self, key: str, value: Any) -> None: + with self._lock: + self._data[key] = (time.time(), value) + self._data.move_to_end(key) + while len(self._data) > self.max: + self._data.popitem(last=False) + + def clear(self) -> None: + with self._lock: + self._data.clear() diff --git a/gateway/credentials.py b/gateway/credentials.py new file mode 100644 index 0000000..508e7e7 --- /dev/null +++ b/gateway/credentials.py @@ -0,0 +1,128 @@ +# -*- coding: utf-8 -*- +"""Credential loading: read encrypted blobs from MySQL and decrypt in-memory. + +Cache: per-process, 5-minute TTL. FastAdmin writes via the PHP controller; +the WS Agent reads here. Direct MySQL access avoids an HTTP hop. + +Requires env: ``CHATHUB_DB_HOST`` / ``CHATHUB_DB_USER`` / ``CHATHUB_DB_PASS`` / +``CHATHUB_DB_NAME``. The same credentials the provision server uses are +fine; they are not secrets. +""" + +from __future__ import annotations + +import json +import logging +import os +import threading +import time +from typing import Any + +from . import crypto + +log = logging.getLogger("chathub.gateway.credentials") + +_TTL = 300 # 5 minutes + +_lock = threading.Lock() +_cache: dict[tuple[int, str], tuple[float, dict]] = {} + + +def _db_config() -> dict[str, str]: + return { + "host": os.environ.get("CHATHUB_DB_HOST", "mysql"), + "port": int(os.environ.get("CHATHUB_DB_PORT", "3306")), + "user": os.environ.get("CHATHUB_DB_USER", "root"), + "password": os.environ.get("CHATHUB_DB_PASS", "mysql_Py5N2W"), + "database": os.environ.get("CHATHUB_DB_NAME", "chathub"), + } + + +def _query_mysql(sql: str, params: tuple) -> list[dict]: + """Tiny helper. No ORM, no SQLAlchemy — keep it small.""" + try: + import pymysql # type: ignore + except ImportError: + # Fall back to mysql-connector if available + try: + import mysql.connector as pymysql # type: ignore + except ImportError as e: + raise RuntimeError( + "Neither pymysql nor mysql.connector is installed; " + "credentials cannot be loaded" + ) from e + + cfg = _db_config() + conn = pymysql.connect(**cfg) + try: + with conn.cursor() as cur: + cur.execute(sql, params) + cols = [d[0] for d in cur.description] if cur.description else [] + rows = cur.fetchall() + return [dict(zip(cols, row)) for row in rows] + finally: + conn.close() + + +def load_credentials(tenant_id: int, channel: str) -> dict[str, Any] | None: + """Return decrypted credentials for a tenant+channel, or None. + + Returns a dict with at least ``access_token``; some channels may include + ``refresh_token``, ``expires_at``, ``shop_id``, etc. + """ + if not crypto.is_configured(): + return None + now = time.time() + key = (tenant_id, channel) + with _lock: + cached = _cache.get(key) + if cached and now - cached[0] < _TTL: + return cached[1] + try: + rows = _query_mysql( + "SELECT credentials_encrypted, expires_at, status " + "FROM fa_chathub_channel_account " + "WHERE tenant_id=%s AND channel=%s AND status='active' " + "ORDER BY id DESC LIMIT 1", + (tenant_id, channel), + ) + if not rows: + return None + blob = rows[0]["credentials_encrypted"] + if isinstance(blob, (bytes, bytearray)): + try: + text = bytes(blob).decode("utf-8") + except UnicodeDecodeError: + if not crypto.is_configured(): + log.warning("AES key not set; cannot decrypt binary blob tenant=%s channel=%s", tenant_id, channel) + return None + creds = crypto.decrypt(bytes(blob)) + with _lock: + _cache[key] = (now, creds) + return creds + blob = text + if isinstance(blob, str): + if crypto.is_configured() and blob.startswith("enc:"): + creds = crypto.decrypt(blob[4:].encode("utf-8")) + else: + try: + creds = json.loads(blob) + if not crypto.is_configured(): + log.info("loaded plaintext credentials tenant=%s channel=%s (set GATEWAY_AES_KEY for encryption)", tenant_id, channel) + except Exception as e: + log.warning("credentials blob not JSON for tenant=%s channel=%s: %s", tenant_id, channel, e) + return None + else: + log.warning("credentials_encrypted for tenant=%s channel=%s is unsupported type %s", tenant_id, channel, type(blob).__name__) + return None + with _lock: + _cache[key] = (now, creds) + return creds + except Exception as e: + log.error("load_credentials failed tenant=%s channel=%s: %s", tenant_id, channel, e) + return None + + +def invalidate(tenant_id: int, channel: str) -> None: + with _lock: + _cache.pop((tenant_id, channel), None) diff --git a/gateway/crypto.py b/gateway/crypto.py new file mode 100644 index 0000000..73e2120 --- /dev/null +++ b/gateway/crypto.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +"""AES-256-GCM credential encryption. + +The 32-byte key is loaded from ``GATEWAY_AES_KEY`` (base64, 32 bytes raw). + +Format on disk (VARBINARY column): + nonce (12 bytes) || ciphertext_with_tag + +Plaintext is the JSON of ``{access_token, refresh_token, ...}`` per channel. +""" + +from __future__ import annotations + +import base64 +import json +import logging +import os +from typing import Any + +from cryptography.hazmat.primitives.ciphers.aead import AESGCM + +log = logging.getLogger("chathub.gateway.crypto") + + +def _key() -> bytes: + raw = os.environ.get("GATEWAY_AES_KEY", "") + if not raw: + raise RuntimeError( + "GATEWAY_AES_KEY not set — refusing to encrypt/decrypt credentials" + ) + try: + decoded = base64.b64decode(raw, validate=True) + except Exception as e: + raise RuntimeError(f"GATEWAY_AES_KEY not valid base64: {e}") from None + if len(decoded) != 32: + raise RuntimeError( + f"GATEWAY_AES_KEY must decode to 32 bytes, got {len(decoded)}" + ) + return decoded + + +def encrypt(plaintext_obj: dict | str) -> bytes: + """Encrypt a dict (or string) under AES-256-GCM. Returns nonce||ct.""" + plaintext = ( + plaintext_obj + if isinstance(plaintext_obj, str) + else json.dumps(plaintext_obj, ensure_ascii=False, sort_keys=True) + ) + nonce = os.urandom(12) + return nonce + AESGCM(_key()).encrypt(nonce, plaintext.encode("utf-8"), None) + + +def decrypt(blob: bytes) -> dict: + """Decrypt a nonce||ct blob back to a dict.""" + if len(blob) < 12 + 16: # nonce + min GCM tag + raise ValueError("ciphertext too short") + nonce, ct = blob[:12], blob[12:] + raw = AESGCM(_key()).decrypt(nonce, ct, None) + return json.loads(raw.decode("utf-8")) + + +def is_configured() -> bool: + """Check whether a usable key is present. Used by callers to short-circuit.""" + try: + _key() + return True + except RuntimeError: + return False diff --git a/gateway/jd.py b/gateway/jd.py new file mode 100644 index 0000000..5673703 --- /dev/null +++ b/gateway/jd.py @@ -0,0 +1,168 @@ +# -*- coding: utf-8 -*- +"""JD (jingdong.com) union open platform adapter. + +Endpoint: https://api.jd.com/routerjson +Auth: app_key + app_secret + access_token (LWC OAuth 2.0) +Sign: MD5(app_secret + sorted(k1v1k2v2...) + app_secret) uppercased + +Methods: + jd.union.open.goods.promotiongoodsinfo.query by SKU ID + jd.union.open.goods.query by keyword + +cred shape: + {"app_key": "...", "app_secret": "...", "access_token": "...", "site_id": "..."} + +query shape: + {"sku": "100012345678"} -> goods.promotiongoodsinfo.query + {"keyword": "iPhone 15"} -> goods.query +""" + +from __future__ import annotations + +import hashlib +import json +import logging +import time +from typing import Any +from urllib.parse import urlencode + +import httpx + +from .base import UnifiedResult + +log = logging.getLogger("chathub.gateway.jd") + +API_URL = "https://api.jd.com/routerjson" +SKU_QUERY_METHOD = "jd.union.open.goods.promotiongoodsinfo.query" +KEYWORD_QUERY_METHOD = "jd.union.open.goods.query" + + +def _json_dumps(obj: Any) -> str: + return json.dumps(obj, ensure_ascii=False, separators=(",", ":")) + + +def _sign(app_secret: str, params: dict[str, str]) -> str: + """JD sign: app_secret + sorted(k1v1k2v2...) + app_secret, MD5, uppercase.""" + pieces = "".join(f"{k}{params[k]}" for k in sorted(params.keys())) + return hashlib.md5((app_secret + pieces + app_secret).encode("utf-8")).hexdigest().upper() + + +def _parse_sku_response(payload: dict, sku: str) -> UnifiedResult: + inner = payload.get("jd_union_open_goods_promotiongoodsinfo_query_response") or {} + result_str = inner.get("result", "{}") + try: + result = json.loads(result_str) if isinstance(result_str, str) else result_str + except Exception: + result = {} + data = result.get("data") or {} + if not data: + return UnifiedResult(status="error", error=f"sku {sku} not found", channel="jd") + price_info = data.get("priceInfo") or {} + img_info = data.get("imageInfo") or {} + base = data.get("baseInfo") or {} + return UnifiedResult( + status="success", + data={ + "title": base.get("name") or data.get("skuName") or f"SKU {sku}", + "price": price_info.get("price") or price_info.get("lowestPrice"), + "currency": "CNY", + "url": data.get("url") or f"https://item.jd.com/{sku}.html", + "image": (img_info.get("imageList") or [None])[0], + "in_stock": (data.get("stockState") or 1) != 0, + }, + channel="jd", + ) + + +def _parse_keyword_response(payload: dict) -> UnifiedResult: + inner = payload.get("jd_union_open_goods_query_response") or {} + result_str = inner.get("result", "{}") + try: + result = json.loads(result_str) if isinstance(result_str, str) else result_str + except Exception: + result = {} + items = result.get("data") or [] + if not items: + return UnifiedResult(status="error", error="no items for keyword", channel="jd") + out = [] + for it in items[:3]: + price_info = it.get("priceInfo") or {} + out.append({ + "title": it.get("skuName") or "(无标题)", + "price": price_info.get("price"), + "currency": "CNY", + "url": it.get("url") or "", + }) + return UnifiedResult(status="success", data=out, channel="jd") + + +async def fetch(creds: dict, query: dict) -> UnifiedResult: + sku = query.get("sku") + keyword = query.get("keyword") + + app_key = creds.get("app_key") or creds.get("app_id") + app_secret = creds.get("app_secret") + access_token = creds.get("access_token") or creds.get("refresh_token") + + if not app_key or not app_secret: + return UnifiedResult( + status="no_creds", + error="missing app_key/app_secret (set them via channelAuth)", + channel="jd", + ) + if not access_token: + return UnifiedResult( + status="no_creds", + error="missing access_token (use refresh_token via LWC OAuth to obtain)", + channel="jd", + ) + + if sku: + method = SKU_QUERY_METHOD + biz = {"skuIds": [str(sku)]} + elif keyword: + method = KEYWORD_QUERY_METHOD + biz = {"keyword": str(keyword), "pageSize": 3} + else: + return UnifiedResult(status="error", error="missing sku or keyword", channel="jd") + + public_params = { + "method": method, + "app_key": app_key, + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), + "format": "json", + "v": "2.0", + "access_token": access_token, + "param_json": _json_dumps(biz), + } + public_params["sign"] = _sign(app_secret, public_params) + + try: + async with httpx.AsyncClient(timeout=8.0) as client: + r = await client.post( + API_URL, + data=urlencode(public_params), + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + r.raise_for_status() + payload = r.json() + except httpx.HTTPStatusError as e: + return UnifiedResult( + status="error", + error=f"HTTP {e.response.status_code}: {e.response.text[:200]}", + channel="jd", + ) + except Exception as e: + return UnifiedResult(status="error", error=str(e)[:200], channel="jd") + + jd_code = str(payload.get("code", "")) + if jd_code not in ("200", "0", ""): + return UnifiedResult( + status="error", + error=f"JD code={jd_code} message={payload.get('message') or payload.get('error_response', '')}", + channel="jd", + ) + + if sku: + return _parse_sku_response(payload, sku) + return _parse_keyword_response(payload) diff --git a/gateway/loop.py b/gateway/loop.py new file mode 100644 index 0000000..bf9265b --- /dev/null +++ b/gateway/loop.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +"""Background asyncio event loop running in a daemon thread. + +Sync code (chatwoot_ws_agent) submits coroutines via ``gateway_loop.run(coro)`` +and blocks on the result. All blocking I/O for the 3rd-party platforms happens +on this loop, so the WS Agent's main thread never stalls. +""" + +from __future__ import annotations + +import asyncio +import logging +import threading +from concurrent.futures import TimeoutError as FutTimeout +from typing import Any, Coroutine + +log = logging.getLogger("chathub.gateway.loop") + + +class BackgroundLoop: + """One asyncio loop in a daemon thread, exposed as a sync facade. + + Lifecycle: + loop = BackgroundLoop() + loop.start() # call once at process start + loop.run(coro, 5) # block on a coroutine + loop.stop() # at shutdown + """ + + def __init__(self, name: str = "gateway-loop") -> None: + self.name = name + self.loop: asyncio.AbstractEventLoop | None = None + self._thread: threading.Thread | None = None + self._ready = threading.Event() + self._closed = False + + def start(self) -> None: + if self._thread is not None: + return + self._thread = threading.Thread( + target=self._runner, daemon=True, name=self.name + ) + self._thread.start() + if not self._ready.wait(timeout=5.0): + raise RuntimeError(f"{self.name} failed to start in 5s") + log.info("Background loop %s started", self.name) + + def _runner(self) -> None: + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self._ready.set() + try: + self.loop.run_forever() + finally: + self.loop.close() + + def run(self, coro: Coroutine, timeout: float = 30.0) -> Any: + """Submit a coroutine from sync code, block on result. + + Raises: + RuntimeError: loop not started + TimeoutError: coroutine exceeded ``timeout`` seconds + Exception: whatever the coroutine raised + """ + if self._closed: + raise RuntimeError("loop is closed") + if not self.loop or not self.loop.is_running(): + raise RuntimeError("loop not started; call .start() first") + future = asyncio.run_coroutine_threadsafe(coro, self.loop) + try: + return future.result(timeout=timeout) + except FutTimeout: + future.cancel() + raise TimeoutError(f"coroutine timed out after {timeout}s") from None + + def stop(self) -> None: + if self._closed or not self.loop: + return + self.loop.call_soon_threadsafe(self.loop.stop) + self._thread.join(timeout=5) + self._closed = True + log.info("Background loop %s stopped", self.name) + + +# Singleton used by router.py and the WS Agent hook. +gateway_loop = BackgroundLoop() diff --git a/gateway/pdd.py b/gateway/pdd.py new file mode 100644 index 0000000..1484f23 --- /dev/null +++ b/gateway/pdd.py @@ -0,0 +1,138 @@ +# -*- coding: utf-8 -*- +"""PDD (拼多多) DDK open platform adapter. + +Endpoint: https://api.pinduoduo.com/router +Auth: client_id + client_secret + access_token (多多进宝 OAuth) +Sign: MD5(secret + sorted(k1v1k2v2...) + secret) uppercased +Method: pdd.ddk.goods.search (by keyword) / pdd.ddk.goods.detail (by goods_sign) + +cred shape: + {"client_id": "...", "client_secret": "...", "access_token": "..."} + +query shape: + {"keyword": "iPhone 15"} -> goods.search + {"goods_sign": "c9r2omogKFFAc7WB..."} -> goods.detail + {"goods_id": "12345"} -> alias of goods_sign fallback +""" + +from __future__ import annotations + +import hashlib +import json +import logging +import time +from typing import Any +from urllib.parse import urlencode + +import httpx + +from .base import UnifiedResult + +log = logging.getLogger("chathub.gateway.pdd") + +API_URL = "https://api.pinduoduo.com/router" +SEARCH_METHOD = "pdd.ddk.goods.search" +DETAIL_METHOD = "pdd.ddk.goods.detail" + + +def _md5_sign(secret: str, params: dict[str, str]) -> str: + """PDD sign: secret + sorted(k1v1k2v2...) + secret, MD5, uppercase.""" + pieces = "".join(f"{k}{params[k]}" for k in sorted(params.keys())) + return hashlib.md5((secret + pieces + secret).encode("utf-8")).hexdigest().upper() + + +async def fetch(creds: dict, query: dict) -> UnifiedResult: + keyword = query.get("keyword") + goods_sign = query.get("goods_sign") or query.get("sku") + + client_id = creds.get("client_id") or creds.get("app_id") or creds.get("app_key") + client_secret = creds.get("client_secret") or creds.get("app_secret") + access_token = creds.get("access_token") or creds.get("refresh_token") + + if not client_id or not client_secret: + return UnifiedResult( + status="no_creds", + error="missing client_id/client_secret (set them via channelAuth)", + channel="pdd", + ) + if not access_token: + return UnifiedResult( + status="no_creds", + error="missing access_token (obtain via 多多进宝 OAuth authorization)", + channel="pdd", + ) + + if goods_sign: + method = DETAIL_METHOD + biz = {"goods_sign": str(goods_sign)} + elif keyword: + method = SEARCH_METHOD + biz = {"keyword": str(keyword), "page": 1, "page_size": 10} + else: + return UnifiedResult(status="error", error="missing keyword or goods_sign", channel="pdd") + + public_params = { + "type": method, + "client_id": client_id, + "timestamp": str(int(time.time() * 1000)), + "data_type": "JSON", + "version": "V1", + "access_token": access_token, + } + for k, v in biz.items(): + public_params[k] = v + public_params["sign"] = _md5_sign(client_secret, public_params) + + try: + async with httpx.AsyncClient(timeout=8.0) as client: + r = await client.post(API_URL, data=urlencode(public_params), headers={"Content-Type": "application/x-www-form-urlencoded"}) + r.raise_for_status() + payload = r.json() + except httpx.HTTPStatusError as e: + return UnifiedResult( + status="error", + error=f"HTTP {e.response.status_code}: {e.response.text[:200]}", + channel="pdd", + ) + except Exception as e: + return UnifiedResult(status="error", error=str(e)[:200], channel="pdd") + + if goods_sign: + return _parse_detail(payload, goods_sign) + return _parse_search(payload, keyword) + + +def _parse_detail(payload: dict, goods_sign: str) -> UnifiedResult: + inner = payload.get("goods_detail_response") or {} + data = inner.get("goods_details") or [] + if not data: + return UnifiedResult(status="error", error=f"goods_sign {goods_sign} not found", channel="pdd") + g = data[0] + return UnifiedResult( + status="success", + data={ + "title": g.get("goods_name") or f"goods {goods_sign[:10]}", + "price": (g.get("min_group_price") or 0) / 100, + "currency": "CNY", + "url": f"https://mobile.yangkeduo.com/goods.html?goods_id={g.get('goods_id', '')}", + "image": (g.get("goods_image_url") or "").split(",")[0] if g.get("goods_image_url") else None, + "in_stock": (g.get("goods_stock_num") or 0) > 0, + }, + channel="pdd", + ) + + +def _parse_search(payload: dict, keyword: str) -> UnifiedResult: + inner = payload.get("goods_search_response") or {} + items = inner.get("goods_list") or [] + if not items: + return UnifiedResult(status="error", error=f"no items for keyword '{keyword}'", channel="pdd") + out = [] + for g in items[:3]: + out.append({ + "title": g.get("goods_name") or "(无标题)", + "price": (g.get("min_group_price") or 0) / 100, + "currency": "CNY", + "url": f"https://mobile.yangkeduo.com/goods.html?goods_id={g.get('goods_id', '')}", + }) + return UnifiedResult(status="success", data=out, channel="pdd") diff --git a/gateway/router.py b/gateway/router.py new file mode 100644 index 0000000..f44a025 --- /dev/null +++ b/gateway/router.py @@ -0,0 +1,143 @@ +# -*- coding: utf-8 -*- +"""Synchronous facade for the Gateway library. + +``fetch`` is the entry point used by ``chatwoot_ws_agent.py``. It runs +on the main thread but schedules its work onto ``gateway_loop`` and blocks +on the result. All caching, breaker, and rate-limit logic lives here so +the adapters stay minimal. +""" + +from __future__ import annotations + +import asyncio +import hashlib +import json +import logging +import time +from typing import Any + +from . import amazon, jd, taobao, pdd, tiktok +from .base import UnifiedResult +from .breaker import get_breaker, get_tenant_limiter +from .cache import TTLCache +from .credentials import load_credentials +from .loop import gateway_loop + +log = logging.getLogger("chathub.gateway.router") + +_cache = TTLCache(ttl_seconds=60, max_size=1000) +_HANDLERS = { + "amazon": amazon.fetch, + "jd": jd.fetch, + "taobao": taobao.fetch, + "pdd": pdd.fetch, + "tiktok": tiktok.fetch, +} + + +def _query_hash(channel: str, tenant_id: int, query: dict) -> str: + raw = json.dumps({"c": channel, "t": tenant_id, "q": query}, sort_keys=True) + return hashlib.sha256(raw.encode()).hexdigest() + + +async def _call_channel(channel: str, creds: dict, query: dict) -> UnifiedResult: + handler = _HANDLERS.get(channel) + if not handler: + return UnifiedResult(status="error", error=f"unknown channel: {channel}", channel=channel) + breaker = get_breaker(channel) + if not breaker.allow(): + return UnifiedResult(status="breaker_open", error="circuit breaker open", channel=channel) + try: + result = await handler(creds, query) + if result.ok: + breaker.on_success() + else: + breaker.on_failure() + return result + except Exception as e: + breaker.on_failure() + return UnifiedResult(status="error", error=str(e)[:200], channel=channel) + + +async def _async_fetch(channel: str, tenant_id: int, query: dict, timeout: float) -> UnifiedResult: + cache_key = f"{channel}:{tenant_id}:{json.dumps(query, sort_keys=True)}" + cached = _cache.get(cache_key) + if cached is not None: + # mark as cache_hit + hit = UnifiedResult( + status="cache_hit", + data=cached.data, + latency_ms=0, + channel=channel, + ) + return hit + creds = load_credentials(tenant_id, channel) + if not creds: + return UnifiedResult(status="no_creds", error="channel not configured", channel=channel) + limiter = get_tenant_limiter(tenant_id) + await limiter.acquire(str(tenant_id)) + start = time.time() + try: + result = await asyncio.wait_for(_call_channel(channel, creds, query), timeout=timeout) + except asyncio.TimeoutError: + result = UnifiedResult(status="timeout", error=f"timed out after {timeout}s", channel=channel) + result.latency_ms = int((time.time() - start) * 1000) + if result.ok: + _cache.set(cache_key, result) + return result + + +def fetch(channel: str, tenant_id: int, query: dict, timeout: float = 5.0) -> UnifiedResult: + """Synchronous entry point. Returns UnifiedResult. + + Args: + channel: "amazon" | "jd" | "taobao" | "pdd" | "tiktok" + tenant_id: chathub tenant id + query: {"asin"|"sku"|"num_iid"|"goods_id"|"keyword": ...} + timeout: seconds before giving up + """ + if not gateway_loop.loop or not gateway_loop.loop.is_running(): + return UnifiedResult( + status="error", + error="gateway loop not started; call gateway_loop.start() at process boot", + channel=channel, + ) + try: + return gateway_loop.run(_async_fetch(channel, tenant_id, query, timeout), timeout=timeout + 2.0) + except TimeoutError as e: + return UnifiedResult(status="timeout", error=str(e), channel=channel) + except RuntimeError as e: + return UnifiedResult(status="error", error=str(e), channel=channel) + except Exception as e: + log.exception("fetch failed channel=%s tenant=%s", channel, tenant_id) + return UnifiedResult(status="error", error=str(e)[:200], channel=channel) + + +def fetch_all( + tenant_id: int, + query: dict, + channels: list[str] | None = None, + timeout: float = 5.0, +) -> dict[str, UnifiedResult]: + """Fan-out to multiple channels in parallel; returns dict keyed by channel.""" + if channels is None: + channels = list(_HANDLERS.keys()) + + if not gateway_loop.loop or not gateway_loop.loop.is_running(): + err = UnifiedResult( + status="error", + error="gateway loop not started; call gateway_loop.start() at process boot", + channel="*", + ) + return {c: err for c in channels} + + async def _gather() -> list[UnifiedResult]: + coros = [_async_fetch(c, tenant_id, query, timeout) for c in channels] + return await asyncio.gather(*coros, return_exceptions=False) + + try: + results = gateway_loop.run(_gather(), timeout=timeout + 3.0) + except Exception as e: + log.exception("fetch_all failed tenant=%s", tenant_id) + return {c: UnifiedResult(status="error", error=str(e)[:200], channel=c) for c in channels} + return {c: r for c, r in zip(channels, results)} diff --git a/gateway/taobao.py b/gateway/taobao.py new file mode 100644 index 0000000..38e5d41 --- /dev/null +++ b/gateway/taobao.py @@ -0,0 +1,192 @@ +# -*- coding: utf-8 -*- +"""Taobao (淘宝/淘宝客) TOP API adapter. + +Endpoint: https://eco.taobao.com/router/rest +Auth: app_key + app_secret + session_key (OAuth 2.0授权) +Sign: MD5(secret + sorted(k1v1k2v2...) + secret) uppercased +Method: taobao.item.get (基础商品详情 by num_iid) + taobao.tbk.item.search (淘宝客商品搜索 by keyword + adzone_id) + +cred shape: + {"app_key": "...", "app_secret": "...", "session_key": "...", + "adzone_id": "12345", "site_id": "67890"} # adzone_id required for keyword + +query shape: + {"num_iid": "680123456789"} -> item detail (item.get) + {"keyword": "iPhone 15"} -> keyword search (tbk.item.search) +""" + +from __future__ import annotations + +import hashlib +import json +import logging +import time +from typing import Any +from urllib.parse import urlencode + +import httpx + +from .base import UnifiedResult + +log = logging.getLogger("chathub.gateway.taobao") + +API_URL = "https://eco.taobao.com/router/rest" +ITEM_GET_METHOD = "taobao.item.get" +TBK_SEARCH_METHOD = "taobao.tbk.item.search" + + +def _md5_sign(secret: str, params: dict[str, str]) -> str: + """Taobao sign: secret + sorted(k1v1k2v2...) + secret, MD5, uppercase.""" + pieces = "".join(f"{k}{params[k]}" for k in sorted(params.keys())) + return hashlib.md5((secret + pieces + secret).encode("utf-8")).hexdigest().upper() + + +async def fetch(creds: dict, query: dict) -> UnifiedResult: + num_iid = query.get("num_iid") or query.get("sku") + keyword = query.get("keyword") + + if not num_iid and not keyword: + return UnifiedResult(status="error", error="missing num_iid or keyword", channel="taobao") + + app_key = creds.get("app_key") or creds.get("app_id") + app_secret = creds.get("app_secret") + session_key = creds.get("session_key") or creds.get("access_token") or creds.get("refresh_token") + + if not app_key or not app_secret: + return UnifiedResult( + status="no_creds", + error="missing app_key/app_secret (set them via channelAuth)", + channel="taobao", + ) + + if keyword and not num_iid: + adzone_id = creds.get("adzone_id") + if not adzone_id: + return UnifiedResult( + status="no_creds", + error="missing adzone_id in creds JSON (taobao.tbk.item.search requires 推广位; add via channelAuth adzone_id field, or set creds['adzone_id'])", + channel="taobao", + ) + return await _tbk_search(app_key, app_secret, session_key, adzone_id, creds.get("site_id"), keyword) + + if not session_key: + return UnifiedResult( + status="no_creds", + error="missing session_key (obtain via Taobao OAuth authorization code grant)", + channel="taobao", + ) + return await _item_get(app_key, app_secret, session_key, num_iid) + + +async def _item_get(app_key: str, app_secret: str, session_key: str, num_iid: str) -> UnifiedResult: + public_params = { + "method": ITEM_GET_METHOD, + "app_key": app_key, + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), + "format": "json", + "v": "2.0", + "sign_method": "md5", + "session": session_key, + "num_iid": str(num_iid), + "fields": "num_iid,title,price,promotion_price,num,sales,pic_url,detail_url,nick,props_name,stock", + } + public_params["sign"] = _md5_sign(app_secret, public_params) + payload, err = await _post_taobao(public_params) + if err: + return err + + inner = payload.get("item_get_response") or {} + code = inner.get("code") + if code and int(code) != 0: + return UnifiedResult( + status="error", + error=f"Taobao code={code} msg={inner.get('msg', '')} sub={inner.get('sub_msg', '')}", + channel="taobao", + ) + item = inner.get("item") or {} + if not item: + return UnifiedResult(status="error", error=f"num_iid {num_iid} not found", channel="taobao") + return UnifiedResult( + status="success", + data={ + "title": item.get("title") or f"item {num_iid}", + "price": item.get("promotion_price") or item.get("price"), + "currency": "CNY", + "url": item.get("detail_url") or f"https://item.taobao.com/item.htm?id={num_iid}", + "image": item.get("pic_url"), + "in_stock": (item.get("num") or 0) > 0, + "sales": item.get("sales"), + }, + channel="taobao", + ) + + +async def _tbk_search(app_key: str, app_secret: str, session_key: str | None, + adzone_id: str, site_id: str | None, keyword: str) -> UnifiedResult: + public_params = { + "method": TBK_SEARCH_METHOD, + "app_key": app_key, + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), + "format": "json", + "v": "2.0", + "sign_method": "md5", + "q": keyword, + "adzone_id": str(adzone_id), + "page_size": "3", + "sort": "total_sales_des", + } + if session_key: + public_params["session"] = session_key + if site_id: + public_params["site_id"] = str(site_id) + public_params["sign"] = _md5_sign(app_secret, public_params) + payload, err = await _post_taobao(public_params) + if err: + return err + + inner = payload.get("tbk_item_search_response") or {} + code = inner.get("code") + if code and int(code) != 0: + return UnifiedResult( + status="error", + error=f"Taobao code={code} msg={inner.get('msg', '')} sub={inner.get('sub_msg', '')}", + channel="taobao", + ) + results = (inner.get("results") or {}).get("n_results") or [] + if not results: + results = inner.get("result_list") or inner.get("results") or [] + if not results: + return UnifiedResult(status="error", error=f"no items for keyword '{keyword}'", channel="taobao") + items = [] + for r in results[:3]: + if isinstance(r, dict) and "item" in r: + r = r["item"] + items.append({ + "title": r.get("title") or "(无标题)", + "price": r.get("zk_final_price") or r.get("price") or r.get("reserve_price"), + "currency": "CNY", + "url": r.get("item_url") or r.get("url") or r.get("click_url") or "", + "image": r.get("pict_url") or r.get("pic_url"), + }) + return UnifiedResult(status="success", data=items, channel="taobao") + + +async def _post_taobao(public_params: dict) -> tuple[dict | None, UnifiedResult | None]: + try: + async with httpx.AsyncClient(timeout=8.0) as client: + r = await client.post( + API_URL, + data=urlencode(public_params), + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + r.raise_for_status() + return r.json(), None + except httpx.HTTPStatusError as e: + return None, UnifiedResult( + status="error", + error=f"HTTP {e.response.status_code}: {e.response.text[:200]}", + channel="taobao", + ) + except Exception as e: + return None, UnifiedResult(status="error", error=str(e)[:200], channel="taobao") diff --git a/gateway/tiktok.py b/gateway/tiktok.py new file mode 100644 index 0000000..5d6a2ee --- /dev/null +++ b/gateway/tiktok.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +"""TikTok/Douyin (抖音) open platform adapter. + +Endpoint: https://open.douyin.com/ (multiple paths) +Auth: client_key + client_secret + access_token (OAuth 2.0) +Sign: HMAC-SHA256 (different from MD5 platforms) +Method: /goods/detail (by goods_id) -- requires video/goods scope + +cred shape: + {"client_key": "...", "client_secret": "...", "access_token": "..."} + +query shape: + {"goods_id": "12345"} -> /goods/detail + {"sku": "12345"} -> alias +""" + +from __future__ import annotations + +import hashlib +import hmac +import json +import logging +import time +from typing import Any +from urllib.parse import urlencode, quote + +import httpx + +from .base import UnifiedResult + +log = logging.getLogger("chathub.gateway.tiktok") + +OAUTH_TOKEN_URL = "https://open.douyin.com/oauth/access_token/" +GOODS_DETAIL_URL = "https://open.douyin.com/goods/detail" + + +def _hmac_sign(secret: str, message: str) -> str: + """Douyin HMAC-SHA256 hex (lowercase).""" + return hmac.new(secret.encode("utf-8"), message.encode("utf-8"), hashlib.sha256).hexdigest() + + +async def fetch(creds: dict, query: dict) -> UnifiedResult: + goods_id = query.get("goods_id") or query.get("sku") + + client_key = creds.get("client_key") or creds.get("app_id") or creds.get("app_key") + client_secret = creds.get("client_secret") or creds.get("app_secret") + access_token = creds.get("access_token") or creds.get("refresh_token") + + if not client_key or not client_secret: + return UnifiedResult( + status="no_creds", + error="missing client_key/client_secret (set them via channelAuth)", + channel="tiktok", + ) + if not access_token: + return UnifiedResult( + status="no_creds", + error="missing access_token (obtain via 抖音 OAuth authorization; 2hr TTL, refresh via refresh_token)", + channel="tiktok", + ) + if not goods_id: + return UnifiedResult(status="error", error="missing goods_id", channel="tiktok") + + params = { + "access_token": access_token, + "goods_id": str(goods_id), + "app_id": client_key, + } + param_json = json.dumps({"goods_id": str(goods_id)}, ensure_ascii=False, separators=(",", ":")) + base_string = f"app_id={client_key}&goods_id={goods_id}&access_token={access_token}" + signature = _hmac_sign(client_secret, base_string) + + try: + async with httpx.AsyncClient(timeout=8.0) as client: + r = await client.post( + GOODS_DETAIL_URL, + params={"access_token": access_token, "app_id": client_key, "goods_id": str(goods_id), "sign": signature}, + ) + r.raise_for_status() + payload = r.json() + except httpx.HTTPStatusError as e: + return UnifiedResult( + status="error", + error=f"HTTP {e.response.status_code}: {e.response.text[:200]}", + channel="tiktok", + ) + except Exception as e: + return UnifiedResult(status="error", error=str(e)[:200], channel="tiktok") + + err_code = str(payload.get("err_no", payload.get("code", ""))) + if err_code not in ("", "0"): + return UnifiedResult( + status="error", + error=f"Douyin err_no={err_code} msg={payload.get('message', payload.get('errmsg', ''))}", + channel="tiktok", + ) + + data = payload.get("data") or payload.get("goods_detail") or {} + if not data: + return UnifiedResult(status="error", error=f"goods_id {goods_id} not found", channel="tiktok") + + price = data.get("price") or data.get("min_price") + if price and isinstance(price, str): + try: + price = float(price) / 100 + except (ValueError, TypeError): + pass + + return UnifiedResult( + status="success", + data={ + "title": data.get("title") or data.get("goods_name") or f"goods {goods_id}", + "price": price, + "currency": "CNY", + "url": data.get("share_url") or data.get("detail_url") or f"https://haohuo.jinritemai.com/GoodsDetail?goods_id={goods_id}", + "image": (data.get("cover") or {}).get("url") if isinstance(data.get("cover"), dict) else data.get("cover"), + "in_stock": (data.get("stock") or 0) > 0, + "sales": data.get("sales"), + }, + channel="tiktok", + ) diff --git a/start_provision_v2.sh b/start_provision_v2.sh new file mode 100755 index 0000000..64c9f53 --- /dev/null +++ b/start_provision_v2.sh @@ -0,0 +1,18 @@ +#!/bin/bash +# Force correct env, then exec provision server +unset CW_BASE CW_INTERNAL CW_PLATFORM_TOKEN CW_ADMIN_EMAIL CW_ADMIN_PASSWORD CW_ACCOUNT_ID +export CW_BASE='http://chatwoot-chatwoot-1:3000' +export CW_INTERNAL='http://chatwoot-chatwoot-1:3000' +export CW_PLATFORM_TOKEN='csFwGySM0589tkhZHcLGJjfKLtYSgCGpcup9HSJZ9yE' +export CW_ADMIN_EMAIL='qiuzhida@greatqiu.cn' +export CW_ADMIN_PASSWORD='Qaly8980+' +export CW_ACCOUNT_ID='1' +export CHATHUB_API_KEY='chathub-default-key-change-me' +export GATEWAY_AES_KEY='uUjrtW3+w/rlBmGBOPv6rn7mP264bnOefkiQE9EL+X8=' +export CHATHUB_DB_HOST='mysql' +export CHATHUB_DB_PORT='3306' +export CHATHUB_DB_USER='root' +export CHATHUB_DB_PASS='mysql_Py5N2W' +export CHATHUB_DB_NAME='chathub' +cd /app/working/workspaces/wordpress +exec python3 /app/working/workspaces/wordpress/provision_server.py 5566