From e608d6ba1c8d4b9b2ac54672a0eac2bd3fa4665e Mon Sep 17 00:00:00 2001 From: Chatwoot AI Agent Dev Date: Fri, 5 Jun 2026 05:16:36 +0000 Subject: [PATCH] v1.7: session-id mapping + conversation summary + contact profiling + exponential backoff reconnect --- CHANGELOG.md | 27 ++++++ README.md | 1 + chatwoot_ws_agent.py | 223 +++++++++++++++++++++++++++++++++++++------ 3 files changed, 222 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3810ab8..b2eea37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,32 @@ # Changelog +## v1.7 (2026-06-05) — 对话上下文 + 客户画像 + 指数退避重连 + +### 新增 +- **`--session-id` 对话上下文** — WS Agent 维护 `conv_id → session_id` 映射,每次调 `qwenpaw agents chat` 时传入 `--session-id`,AI 获得完整对话历史 + - 同一会话的连续消息不再断开上下文,AI 知道"刚才说过什么" + - 持久化到状态文件,重启不丢失 + - 自动清理超过 10000 条的大映射表 +- **对话摘要** — 每 15 轮 AI 回复后自动调用 AI 压缩历史,生成 1-2 句话摘要 + - 下次请求时将摘要注入 prompt 开头,减少 token 消耗 + - 长对话 AI 仍能记住核心信息(客户需求、讨论过的产品) +- **客户画像** — 维护 `contact_id → 画像` 映射(姓名、最近 3 次交互记录) + - 每次 AI 回复后自动更新画像 + - 下次对话自动注入客户历史上下文 + - 持久化到状态文件,重启不丢失 +- **WebSocket 指数退避重连** — 断线重连从固定 5s 改为指数退避 + - 初始 5s → 10s → 20s → 40s → 最大 60s + - 重连前自动续期 Chatwoot session(防止长时间断线后 token 过期) + - 失败时继续尝试,不会停止 + +### 文件 +- `chatwoot_ws_agent.py` 从 1294 行增至 1459 行(+165 行) +- 新增 9 个函数:`_get_or_create_session`、`_prune_sessions`、`_summarize_conversation`、`_get_conversation_context`、`_update_contact_profile`、`_get_contact_context` +- 修改 4 个函数:`call_qwenpaw_ai`(+session_id)、`generate_ai_reply`(+session_id)、`handle_incoming_message`(+3 层 context)、`save_state/load_state`(+3 个持久化字段) +- 重写 1 个函数:`WSAgent.start`(指数退避重连) + +--- + ## v1.6 (2026-06-05) — Platform Gateway + 5 平台 API 集成 ### 新增 diff --git a/README.md b/README.md index 539591a..c5d7337 100644 --- a/README.md +++ b/README.md @@ -241,6 +241,7 @@ chatwoot-ai-agent/ | v1.4 | 多租户架构,Provision Server,状态持久化,安全性重构 | | v1.5 | 消息防抖(5s 累积合并),AI 错误重试(指数退避)| | v1.6 | Platform Gateway 库——Amazon/JD/Taobao/PDD/TikTok 5 平台统一 API 集成 | +| v1.7 | 对话上下文(`--session-id`)+ 对话摘要 + 客户画像 + WebSocket 指数退避重连 | ## 许可证 diff --git a/chatwoot_ws_agent.py b/chatwoot_ws_agent.py index ff01a9d..b990529 100644 --- a/chatwoot_ws_agent.py +++ b/chatwoot_ws_agent.py @@ -484,6 +484,16 @@ def save_state(): "ai_pending_convs": list(_ai_pending_convs), "saved_at": time.time() } + # Add session map + with _conv_session_lock: + state["conv_session_map"] = _conv_session_map.copy() + # Add summaries & rounds + with _conv_conv_lock: + state["conv_summaries"] = _conv_summaries.copy() + state["conv_rounds"] = _conv_rounds.copy() + # Add contact profiles + with _contact_profile_lock: + state["contact_profiles"] = _contact_profiles.copy() STATE_FILE.write_text(json.dumps(state, ensure_ascii=False), encoding="utf-8") except Exception as e: log(f"save_state error: {e}", "WARN") @@ -511,8 +521,20 @@ def load_state(): human_active_convs = state.get("human_active_convs", {}) with _ai_pending_lock: _ai_pending_convs = set(state.get("ai_pending_convs", [])) + # Restore session map + with _conv_session_lock: + _conv_session_map.update(state.get("conv_session_map", {})) + # Restore summaries & rounds + with _conv_conv_lock: + _conv_summaries.update(state.get("conv_summaries", {})) + _conv_rounds.update(state.get("conv_rounds", {})) + # Restore contact profiles + with _contact_profile_lock: + _contact_profiles.update(state.get("contact_profiles", {})) - log(f"State restored: {len(ai_sent_msg_ids)} msg_ids, {len(human_active_convs)} active convs, {len(_ai_pending_convs)} pending") + log(f"State restored: {len(ai_sent_msg_ids)} msg_ids, {len(human_active_convs)} active convs, " + f"{len(_conv_session_map)} sessions, {len(_conv_summaries)} summaries, " + f"{len(_contact_profiles)} profiles") except Exception as e: log(f"load_state error: {e} (starting fresh)", "WARN") STATE_FILE.unlink(missing_ok=True) @@ -527,13 +549,105 @@ _debounce_msgs: dict[int, list[str]] = {} # conv_id → accumulated messages _debounce_lock = Lock() +# ===== SESSION-ID MAPPING (per-conversation QwenPaw context) ===== +_conv_session_map: dict[int, str] = {} # conv_id → qwenpaw session_id +_conv_session_lock = Lock() + + +def _get_or_create_session(conv_id): + """Return existing session_id for a conversation, or create a new one.""" + with _conv_session_lock: + sid = _conv_session_map.get(conv_id) + if not sid: + sid = f"conv_{conv_id}_{int(time.time())}" + _conv_session_map[conv_id] = sid + return sid + + +def _prune_sessions(): + """Trim session map when it grows too large.""" + with _conv_session_lock: + if len(_conv_session_map) > 10000: + _conv_session_map.clear() + log("🧹 Session map too large, cleared", "WARN") + + +# ===== CONVERSATION SUMMARIZATION ===== +_conv_rounds: dict[int, int] = {} # conv_id → AI reply count +_conv_summaries: dict[int, str] = {} # conv_id → latest AI-generated summary +_conv_conv_lock = Lock() +SUMMARY_THRESHOLD = 15 # generate summary after every N AI replies + + +def _summarize_conversation(conv_id, prompt, reply, target_agent): + """Ask AI to produce a short summary of this conversation so far.""" + with _conv_conv_lock: + old_summary = _conv_summaries.get(conv_id, "") + summary_prompt = ( + f"Existing summary: {old_summary}\n\n" + f"Latest customer message: {prompt[:200]}\n" + f"Your reply: {reply[:500]}\n\n" + "Produce a concise 1-2 sentence summary (in English) of the " + "entire conversation so far. Focus on customer needs, products " + "discussed, and any decisions made. Keep it under 100 words." + ) + summary = call_qwenpaw_ai(summary_prompt, target_agent=target_agent) + if summary: + with _conv_conv_lock: + _conv_summaries[conv_id] = summary + + +def _get_conversation_context(conv_id): + """Return summary string for this conv, or empty string.""" + with _conv_conv_lock: + return _conv_summaries.get(conv_id, "") + + +# ===== CONTACT PROFILING ===== +_contact_profiles: dict[int, dict] = {} # contact_id → profile dict +_contact_profile_lock = Lock() + + +def _update_contact_profile(contact_id, sender_name, msg, reply): + """Extract relevant info from conversation and update contact profile.""" + if not contact_id: + return + with _contact_profile_lock: + profile = _contact_profiles.get(contact_id, {}) + profile["name"] = sender_name + profile["last_seen"] = time.time() + # Keep a rolling log of last 3 interactions + interactions = profile.get("interactions", []) + interactions.append({"msg": msg[:100], "reply": reply[:100], "ts": time.time()}) + profile["interactions"] = interactions[-3:] + _contact_profiles[contact_id] = profile + + +def _get_contact_context(contact_id): + """Return a prompt-friendly profile string for this contact.""" + if not contact_id: + return "" + with _contact_profile_lock: + profile = _contact_profiles.get(contact_id) + if not profile: + return "" + parts = [f"Contact name: {profile.get('name', 'Unknown')}"] + interactions = profile.get("interactions", []) + if interactions: + parts.append(f"Last {len(interactions)} interaction(s):") + for i, ix in enumerate(interactions): + parts.append(f" [{i+1}] \"{ix['msg']}\" → \"{ix['reply'][:80]}\"") + return "\n".join(parts) + + # ===== AI HELPERS ===== import re as _re # used for session-id stripping GATEWAY_ENABLED = os.environ.get("GATEWAY_ENABLED", "1") == "1" -def call_qwenpaw_ai(prompt, system_prompt=None, target_agent="sourcing-agent", retries=2): +def call_qwenpaw_ai(prompt, system_prompt=None, target_agent="sourcing-agent", + retries=2, session_id=None): """Call QwenPaw AI via agents chat, with retries on failure. Args: @@ -541,18 +655,19 @@ def call_qwenpaw_ai(prompt, system_prompt=None, target_agent="sourcing-agent", r system_prompt: Optional system instructions (appended before prompt) target_agent: Which agent to call (default:sourcing-agent, translation:default) retries: Number of retries on failure (exponential backoff) + session_id: Optional QwenPaw session ID for conversation continuity """ full_prompt = (system_prompt + "\n\n" + prompt) if system_prompt else prompt + cmd = ["qwenpaw", "agents", "chat", + "--from-agent", "wordpress", + "--to-agent", target_agent, + "--text", full_prompt] + if session_id: + cmd.extend(["--session-id", session_id]) last_error = None for attempt in range(1 + retries): try: - result = subprocess.run( - ["qwenpaw", "agents", "chat", - "--from-agent", "wordpress", - "--to-agent", target_agent, - "--text", full_prompt], - capture_output=True, text=True, timeout=90 - ) + result = subprocess.run(cmd, capture_output=True, text=True, timeout=90) if result.returncode == 0 and result.stdout.strip(): # Clean output: remove INFO/SESSION/Agent lines raw = result.stdout.strip() @@ -594,7 +709,7 @@ def translate_to_chinese(text): result = call_qwenpaw_ai(prompt, target_agent="default") return result if result else text -def generate_ai_reply(customer_msg, sender_name, inbox_id, context=""): +def generate_ai_reply(customer_msg, sender_name, inbox_id, context="", session_id=None): config = INBOX_CONFIG.get(inbox_id) if not config: log(f"No config for inbox #{inbox_id}, skipping", "WARN") @@ -606,7 +721,8 @@ def generate_ai_reply(customer_msg, sender_name, inbox_id, context=""): customer_msg=body ) reply = call_qwenpaw_ai(prompt, config["system_prompt"], - target_agent=config["target_agent"]) + target_agent=config["target_agent"], + session_id=session_id) if not reply: return reply @@ -826,12 +942,47 @@ def handle_incoming_message(msg_data, headers): log(f"📩 [{config['name']}] New msg #{msg_id} from '{sender_name}' in conv #{conv_id}: {content[:100]}", inbox_name=config['name']) + # ===== SESSION-ID: maintain conversation continuity ===== + session_id = _get_or_create_session(conv_id) + # ===== CONVERSATION SUMMARY: inject past context ===== + conv_context = _get_conversation_context(conv_id) + # ===== CONTACT PROFILE: inject customer history ===== + contact_id = msg_data.get("sender", {}).get("id") + contact_context = _get_contact_context(contact_id) + + # Build enriched context + context_parts = [] + gateway_ctx = _enrich_context(inbox_id, content, config) + if gateway_ctx: + context_parts.append(gateway_ctx) + if conv_context: + context_parts.append(f"[Conversation so far: {conv_context}]") + if contact_context: + context_parts.append(f"[Customer history: {contact_context}]") + all_context = "\n".join(context_parts) + # Generate AI reply with inbox-specific config log(f"🤖 [{config['name']}] Generating AI reply...", inbox_name=config['name']) start_time = time.time() - context = _enrich_context(inbox_id, content, config) - reply = generate_ai_reply(content, sender_name, inbox_id, context=context) + reply = generate_ai_reply(content, sender_name, inbox_id, + context=all_context, session_id=session_id) duration_ms = (time.time() - start_time) * 1000 + + if not reply: + log(f"⚠️ [{config['name']}] Empty AI reply, skipping", "WARN", config['name']) + metrics.record_reply(inbox_id, config['name'], False, duration_ms) + return + + log(f"💬 [{config['name']}] AI reply ({duration_ms:.0f}ms): {reply[:150]}", inbox_name=config['name']) + + # ===== POST-REPLY: update conversation summary & contact profile ===== + with _conv_conv_lock: + rounds = _conv_rounds.get(conv_id, 0) + 1 + _conv_rounds[conv_id] = rounds + if rounds > 0 and rounds % SUMMARY_THRESHOLD == 0: + # Generate summary every SUMMARY_THRESHOLD AI replies + _summarize_conversation(conv_id, content, reply, config["target_agent"]) + _update_contact_profile(contact_id, sender_name, content, reply) if not reply: log(f"⚠️ [{config['name']}] Empty AI reply, skipping", "WARN", config['name']) @@ -1021,25 +1172,39 @@ class WSAgent: metrics.ws_disconnected(f"status={status}, msg={msg}") def start(self): - """Start WebSocket connection (blocking).""" + """Start WebSocket connection with exponential backoff reconnect.""" self.headers = ensure_session() - log(f"Connecting to {CW_WS_URL}") + delay = 5 # start at 5s + max_delay = 60 + attempt = 0 - self.ws = websocket.WebSocketApp( - CW_WS_URL, - on_open=self.on_open, - on_message=self.on_message, - on_error=self.on_error, - on_close=self.on_close - ) + while self.running.is_set(): + if attempt > 0: + log(f"🔄 Reconnecting in {delay}s (attempt #{attempt})...") + time.sleep(delay) + delay = min(delay * 2, max_delay) # 5 → 10 → 20 → 40 → 60 + # Re-fetch session headers in case they expired during downtime + try: + self.headers = ensure_session() + except Exception as e: + log(f"Session renewal failed before reconnect: {e}", "WARN") + continue - # Run forever (blocks) - self.ws.run_forever( - sslopt={"cert_reqs": ssl.CERT_NONE}, - ping_interval=15, - ping_timeout=5, - reconnect=5 # built-in reconnect - ) + log(f"Connecting to {CW_WS_URL}") + self.ws = websocket.WebSocketApp( + CW_WS_URL, + on_open=self.on_open, + on_message=self.on_message, + on_error=self.on_error, + on_close=self.on_close + ) + self.ws.run_forever( + sslopt={"cert_reqs": ssl.CERT_NONE}, + ping_interval=15, + ping_timeout=5, + reconnect=0 # disable internal reconnect; we handle it + ) + attempt += 1 def stop(self): """Stop the WebSocket connection."""