diff --git a/chatwoot_ws_agent.py b/chatwoot_ws_agent.py new file mode 100644 index 0000000..9d2867a --- /dev/null +++ b/chatwoot_ws_agent.py @@ -0,0 +1,737 @@ +#!/usr/bin/env python3 +""" +Chatwoot WebSocket Agent — Real-time AI Auto-Reply with Human Handoff +======================================================================= +Connects to Chatwoot ActionCable WebSocket for instant message delivery, +generates AI replies, and sends Chinese private notes. + +Features: + - Real-time AI auto-reply to customer messages + - Human handoff: detects human agent replies and backs off + - AI-initiated handoff: AI suggests human intervention when needed + - Timeout recovery: AI resumes after human inactivity (configurable) + - Status-based recovery: AI resumes when conversation is resolved/pending + +Usage: + python3 chatwoot_ws_agent.py # Run (daemon mode) + python3 chatwoot_ws_agent.py --renew # Force renew session & exit + python3 chatwoot_ws_agent.py --test-ws # Test WebSocket connection & exit + +Requirements: + pip install websocket-client +""" + +import os, sys, json, time, subprocess, argparse, ssl, signal +import requests +import websocket +from pathlib import Path +from datetime import datetime, timezone, timedelta +from threading import Thread, Event, Lock + +# ===== CONFIG ===== +CW_BASE = "https://chatwoot.275763.xyz" +CW_WS_URL = CW_BASE.replace("https://", "wss://").replace("http://", "ws://") + "/cable" +CW_ACCOUNT_ID = 1 +CW_AUTH_URL = f"{CW_BASE}/auth/sign_in" +CW_API_BASE = f"{CW_BASE}/api/v1/accounts/{CW_ACCOUNT_ID}" + +SCRIPT_DIR = Path(__file__).parent +AUTH_FILE = SCRIPT_DIR / "chatwoot_auth.json" +PROCESSED_FILE = SCRIPT_DIR / ".chatwoot_ws_processed.json" + +# ===== INBOX ROUTING CONFIG ===== +# Map inbox_id → AI agent config +INBOX_CONFIG = { + 1: { + "name": "GreatQiu", + "target_agent": "sourcing-agent", + "system_prompt": ( + "You are a professional China sourcing agent from GreatQiu (based in Shaoxing, Zhejiang, China). " + "You help international clients with product sourcing, supplier verification, " + "quality control, logistics, and supply chain management.\n\n" + "IMPORTANT - Decide if you can fully handle this or need a human:\n" + "- If the customer asks about specific PRICING, MOQ, PLACING ORDERS, " + "CUSTOMIZATION, SHIPPING QUOTES, or COMPLEX TECHNICAL SPECS that require " + "real-time data from suppliers → end your reply with [HANDOFF] on a new line.\n" + "- If you can answer the question fully using your general knowledge " + "(company info, services, processes, general timelines) → do NOT add [HANDOFF]." + ), + "prompt_template": ( + "A customer named '{sender_name}' sent this message:\n\n" + "{customer_msg}\n\n" + "Write a direct reply (no preamble, no markdown). Keep it concise (2-4 sentences). " + "Use the same language as the customer. " + "Always sign with '- GreatQiu Team'." + ), + "note_prefix": "🤖 AI 自动回复 (GreatQiu)", + "signature": "- GreatQiu Team", + }, + 7: { + "name": "HALO Blog", + "target_agent": "halo-blog-agent", + "system_prompt": ( + "你是一名专业的安防弱电与IT基础设施技术顾问,来自Q师傅知识库。" + "你帮助客户解答关于交换机配置、监控系统、网络布线、弱电工程、服务器运维等技术问题。\n\n" + "重要 - 判断是否需要人工介入:\n" + "- 如果客户询问具体的项目报价、施工方案定制、现场勘查需求、" + "或需要实际采购产品 → 在回复末尾加一行 [HANDOFF]。\n" + "- 如果是回答一般性的技术问题(设备参数、配置方法、故障排查思路等)→ 不加 [HANDOFF]。" + ), + "prompt_template": ( + "客户 '{sender_name}' 发送了以下消息:\n\n" + "{customer_msg}\n\n" + "请用中文直接回复(不要前缀,不要Markdown)。保持简洁(2-4句话)。" + "回复语气专业友善,体现技术专家的可靠性。" + "以'- Q师傅知识库'署名。" + ), + "note_prefix": "🤖 AI 自动回复 (Q师傅知识库)", + "signature": "- Q师傅知识库", + } +} + +# Agent identity (from /api/v1/profile response) +PUBSUB_TOKEN = "JQ3wQYDy6LUMwvHouKKV2scr" +USER_ID = 1 + +# Login credentials for auto-renewal +CW_EMAIL = os.environ.get("CW_EMAIL", "qiuzhida@greatqiu.cn") +CW_PASSWORD = os.environ.get("CW_PASSWORD", "Qaly8980+") + +RENEW_THRESHOLD = timedelta(hours=6) +TZ = timezone(timedelta(hours=8)) + +# ===== HUMAN HANDOFF CONFIG ===== +# If a human agent has replied in this conversation but hasn't replied +# for HUMAN_TIMEOUT_MINUTES, the AI resumes auto-replying. +HUMAN_TIMEOUT_MINUTES = 15 + +# ===== LOGGING ===== + +def log(msg): + ts = datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S") + print(f"[{ts}] {msg}", flush=True) + +# ===== SESSION MANAGEMENT (same as polling agent) ===== + +def load_auth(): + if AUTH_FILE.exists(): + try: + data = json.loads(AUTH_FILE.read_text()) + if all(k in data for k in ("access-token", "client", "expiry", "uid")): + return data + except Exception as e: + log(f"WARNING: Failed to load auth file: {e}") + return None + +def save_auth(data): + AUTH_FILE.write_text(json.dumps(data, indent=2)) + log(f"Session saved to {AUTH_FILE}") + +def get_headers(): + auth = load_auth() + if auth: + return { + "access-token": auth.get("access-token"), + "client": auth.get("client"), + "expiry": auth.get("expiry"), + "uid": auth.get("uid"), + } + return { + "access-token": "uueUhS5OBWOeabNdleUa8w", + "client": "4xu1KgEP3RzNoM86hAkeCg", + "expiry": "1785135457", + "uid": "qiuzhida@greatqiu.cn", + } + +def renew_session(): + log("Renewing Chatwoot session...") + try: + r = requests.post(CW_AUTH_URL, json={"email": CW_EMAIL, "password": CW_PASSWORD}, timeout=15) + if r.status_code != 200: + log(f"Login failed: {r.status_code} {r.text[:200]}") + return None + access_token = r.headers.get("access-token") + client = r.headers.get("client") + expiry = r.headers.get("expiry") + uid = r.headers.get("uid") + if not all([access_token, client, expiry, uid]): + log(f"Missing headers: {dict(r.headers)}") + return None + data = {"access-token": access_token, "client": client, "expiry": expiry, "uid": uid, + "updated_at": datetime.now(TZ).isoformat()} + save_auth(data) + exp_time = datetime.fromtimestamp(int(expiry)) + log(f"Session renewed, expires: {exp_time}") + return data + except Exception as e: + log(f"Renewal error: {e}") + return None + +def ensure_session(): + auth = load_auth() + now = time.time() + if auth: + try: + expiry_ts = int(auth.get("expiry", "0")) + remaining = timedelta(seconds=expiry_ts - now) + log(f"Session expires in {remaining}") + if remaining > RENEW_THRESHOLD: + return get_headers() + except (ValueError, TypeError): + pass + log("Session needs renewal") + new_auth = renew_session() + if new_auth: + return { + "access-token": new_auth["access-token"], + "client": new_auth["client"], + "expiry": new_auth["expiry"], + "uid": new_auth["uid"], + } + log("WARNING: Using fallback headers") + return get_headers() + +# ===== PROCESSED MESSAGE TRACKING ===== + +def load_processed(): + if PROCESSED_FILE.exists(): + try: + return set(json.loads(PROCESSED_FILE.read_text())) + except: pass + return set() + +def save_processed(ids): + keep = list(ids)[-1000:] # keep last 1000 + PROCESSED_FILE.write_text(json.dumps(keep)) + +processed_ids = load_processed() +processed_lock = Lock() + +def is_processed(msg_id): + with processed_lock: + return msg_id in processed_ids + +def mark_processed(msg_id): + with processed_lock: + processed_ids.add(msg_id) + if len(processed_ids) % 20 == 0: + save_processed(processed_ids) + +# ===== AI-SENT MESSAGE TRACKING ===== +# Track message IDs that OUR AI sent via the API. +# This lets us distinguish our own AI replies from human agent messages +# when receiving events via WebSocket (since both use sender_type="User"). +ai_sent_msg_ids = set() +ai_sent_lock = Lock() + +# Track conversation IDs where we just sent a message (race condition safety net) +# Entries are added BEFORE API call, removed AFTER tracking the real message ID. +_ai_pending_convs = set() +_ai_pending_lock = Lock() + +def track_sent_message(msg_id): + if msg_id: + with ai_sent_lock: + ai_sent_msg_ids.add(msg_id) + +def is_ai_sent_message(msg_id): + with ai_sent_lock: + return msg_id in ai_sent_msg_ids + +# ===== HUMAN ACTIVE TRACKING ===== +# Track conversations where a human agent has sent a message. +# {conversation_id: timestamp_of_last_human_message} +human_active_convs = {} +human_active_lock = Lock() + +def mark_human_active(conv_id): + """Record that a human agent has engaged in this conversation.""" + with human_active_lock: + human_active_convs[conv_id] = time.time() + log(f"👤 Human marked active for conv #{conv_id}") + +def is_human_active(conv_id): + """Check if a human is still active in this conversation. + + Returns True if a human has replied within HUMAN_TIMEOUT_MINUTES. + Auto-removes expired entries. + """ + with human_active_lock: + ts = human_active_convs.get(conv_id) + if ts is None: + return False + elapsed = time.time() - ts + if elapsed > HUMAN_TIMEOUT_MINUTES * 60: + del human_active_convs[conv_id] + log(f"⏱️ Conv #{conv_id} human timeout ({HUMAN_TIMEOUT_MINUTES}min), AI resuming") + return False + return True + +def remove_human_active(conv_id): + """Remove a conversation from human active tracking. + + Called when conversation is resolved, changed to pending, + or explicitly handed back to AI. + """ + with human_active_lock: + if conv_id in human_active_convs: + del human_active_convs[conv_id] + log(f"🔄 Conv #{conv_id} removed from human active, AI may resume") + +def clean_expired_human_active(): + """Periodic cleanup of expired human active entries.""" + with human_active_lock: + now = time.time() + expired = [cid for cid, ts in human_active_convs.items() + if now - ts > HUMAN_TIMEOUT_MINUTES * 60] + for cid in expired: + del human_active_convs[cid] + log(f"⏱️ Conv #{cid} human timeout (cleanup), AI resuming") + +# ===== AI HELPERS ===== + +import re as _re # used for session-id stripping + +def call_qwenpaw_ai(prompt, system_prompt=None, target_agent="sourcing-agent"): + """Call QwenPaw AI via agents chat. + + Args: + prompt: The user message + system_prompt: Optional system instructions (appended before prompt) + target_agent: Which agent to call (default:sourcing-agent, translation:default) + """ + full_prompt = (system_prompt + "\n\n" + prompt) if system_prompt else prompt + try: + result = subprocess.run( + ["qwenpaw", "agents", "chat", + "--from-agent", "wordpress", + "--to-agent", target_agent, + "--text", full_prompt], + capture_output=True, text=True, timeout=90 + ) + if result.returncode != 0: + log(f"AI error (rc={result.returncode}): {result.stderr[:200]}") + return None + # Clean output: remove INFO/SESSION/Agent lines + raw = result.stdout.strip() + lines = raw.split("\n") + clean_lines = [] + for line in lines: + stripped = line.strip() + if stripped.startswith(("INFO:", "[Agent")): + continue + # Remove [SESSION: ...] prefix from any line + stripped = _re.sub(r'\[SESSION:\s*[^\]]*\]\s*', '', stripped) + if stripped: + clean_lines.append(stripped) + reply = "\n".join(clean_lines).strip() + if not reply: + log(f"⚠️ AI reply empty after cleaning. Raw: {raw[:200]}") + return None + return reply + except subprocess.TimeoutExpired: + log("AI call timed out") + return None + except Exception as e: + log(f"AI error: {e}") + return None + +def translate_to_chinese(text): + """Translate text to Simplified Chinese.""" + prompt = ( + f"Translate the following text to Simplified Chinese. " + f"Keep all factual details (names, numbers, URLs) intact. " + f"Only output the translation, no explanations.\n\n{text}" + ) + result = call_qwenpaw_ai(prompt, target_agent="default") + return result if result else text + +def generate_ai_reply(customer_msg, sender_name, inbox_id): + """Generate an AI reply for a customer message, routed by inbox_id. + + Uses INBOX_CONFIG[inbox_id] to pick the right AI agent, system prompt, + and prompt template. Instructs AI to self-assess handoff need via [HANDOFF]. + """ + config = INBOX_CONFIG.get(inbox_id) + if not config: + log(f"No config for inbox #{inbox_id}, falling back to sourcing-agent") + config = INBOX_CONFIG[1] + + prompt = config["prompt_template"].format( + sender_name=sender_name, + customer_msg=customer_msg + ) + reply = call_qwenpaw_ai(prompt, config["system_prompt"], + target_agent=config["target_agent"]) + + if not reply: + return reply + + # Strip common meta-prefixes the AI sometimes adds (for English prompts) + for prefix in [ + "Here's the professional reply:\n\n---\n\n", + "Here's the professional reply:\n\n", + "Here's my professional reply:\n\n", + "Here's the reply:\n\n", + "Professional reply:\n\n", + ]: + if reply.startswith(prefix): + reply = reply[len(prefix):] + break + + return reply + +# ===== CHATWOOT API ===== + +def send_reply(conv_id, content, headers): + """Send a message as the current user. Tracks the returned message ID.""" + with _ai_pending_lock: + _ai_pending_convs.add(conv_id) + try: + r = requests.post(f"{CW_API_BASE}/conversations/{conv_id}/messages", + json={"content": content}, headers=headers, timeout=10) + if r.status_code in (200, 201): + data = r.json() + track_sent_message(data.get("id")) + return True, data + return False, r.text + finally: + with _ai_pending_lock: + _ai_pending_convs.discard(conv_id) + +def send_private_note(conv_id, content, headers): + """Send a private note (agents-only). Tracks message ID to avoid false human detection.""" + with _ai_pending_lock: + _ai_pending_convs.add(conv_id) + try: + r = requests.post(f"{CW_API_BASE}/conversations/{conv_id}/messages", + json={"content": content, "private": True}, headers=headers, timeout=10) + if r.status_code in (200, 201): + data = r.json() + track_sent_message(data.get("id")) + return True + return False + finally: + with _ai_pending_lock: + _ai_pending_convs.discard(conv_id) + +def update_conversation_status(conv_id, status, headers): + """Update conversation status (pending/open/resolved).""" + try: + r = requests.post(f"{CW_API_BASE}/conversations/{conv_id}/toggle_status", + json={"status": status}, headers=headers, timeout=5) + return r.status_code in (200, 201) + except Exception as e: + log(f"Status update error: {e}") + return False + +# ===== MESSAGE HANDLER ===== + +def handle_incoming_message(msg_data, headers): + """Process an incoming customer message (with human handoff awareness).""" + msg_id = msg_data.get("id") + if not msg_id: + return + if is_processed(msg_id): + return + + # Validate it's a customer incoming message + sender_type = msg_data.get("sender_type") + # Chatwoot enum: 0=incoming, 1=outgoing, 2=activity, 3=template + message_type = msg_data.get("message_type") + is_private = msg_data.get("private", False) + + if sender_type != "Contact": + return + if message_type != 0: # 0 = incoming (customer message) + return + if is_private: + return + + # Route by inbox_id + inbox_id = msg_data.get("inbox_id") + config = INBOX_CONFIG.get(inbox_id) + if not config: + log(f"⏭️ Unknown inbox #{inbox_id}, skipping") + return + + content = str(msg_data.get("content", "")).strip() + conv_id = msg_data.get("conversation_id") + sender_name = "Customer" + if msg_data.get("sender"): + sender_name = msg_data["sender"].get("name", "Customer") + + if not content or not conv_id: + return + + # 🔑 Check if a human agent is active in this conversation + if is_human_active(conv_id): + log(f"⏭️ [{config['name']}] Conv #{conv_id} has active human agent, skipping AI reply") + return + + # Mark immediately to avoid duplicate processing + mark_processed(msg_id) + + log(f"📩 [{config['name']}] New msg #{msg_id} from '{sender_name}' in conv #{conv_id}: {content[:100]}") + + # Generate AI reply with inbox-specific config + log(f"🤖 [{config['name']}] Generating AI reply...") + reply = generate_ai_reply(content, sender_name, inbox_id) + if not reply: + log(f"⚠️ [{config['name']}] Empty AI reply, skipping") + return + + log(f"💬 [{config['name']}] AI reply: {reply[:150]}") + + # Check if AI thinks this needs human handoff + needs_handoff = False + if "[HANDOFF]" in reply: + needs_handoff = True + reply = reply.replace("[HANDOFF]", "").replace("\n[HANDOFF]", "").replace("[HANDOFF]\n", "").strip() + log(f"🚩 [{config['name']}] AI detected handoff needed for conv #{conv_id}") + + # Send the reply (will be tracked in ai_sent_msg_ids) + ok, resp_data = send_reply(conv_id, reply, headers) + if ok: + log(f"✅ Reply sent to conv #{conv_id}") + else: + log(f"❌ Failed to send reply: {resp_data}") + + # Send private note with translation + chinese_summary = translate_to_chinese( + f"Customer {sender_name}: {content}\n\nAI Agent replied: {reply}" + ) + note = ( + f"🤖 AI 自动回复\n" + f"━━━━━━━━━━━━━━━\n" + f"📩 客户消息:\n{content}\n\n" + f"💬 AI 回复:\n{reply}\n\n" + f"🌐 中文摘要:\n{chinese_summary}" + ) + if send_private_note(conv_id, note, headers): + log(f"✅ Private note sent to conv #{conv_id}") + else: + log(f"❌ Failed to send private note") + + # If handoff needed: send alert + change conversation status to 'open' + if needs_handoff: + handoff_note = ( + f"⚠️ ⚠️ ⚠️ 建议人工介入 ⚠️ ⚠️ ⚠️\n" + f"━━━━━━━━━━━━━━━━━━━━━━━\n" + f"客户 '{sender_name}' 的询问 AI 判断需要人工处理。\n" + f"请尽快回复此会话。\n\n" + f"📩 客户消息原文:\n{content}" + ) + send_private_note(conv_id, handoff_note, headers) + # Change conversation to 'open' so Chatwoot alerts human agents + update_conversation_status(conv_id, "open", headers) + log(f"🔔 Handoff alert sent for conv #{conv_id}") + +# ===== WEBSOCKET CLIENT ===== + +class WSAgent: + def __init__(self): + self.ws = None + self.headers = None + self.running = Event() + self.running.set() + self.reconnect_delay = 1 + self.last_pong = time.time() + + def on_open(self, ws): + log("🟢 WebSocket connected") + # Subscribe to RoomChannel + sub = json.dumps({ + "channel": "RoomChannel", + "pubsub_token": PUBSUB_TOKEN, + "user_id": USER_ID, + "account_id": CW_ACCOUNT_ID + }) + ws.send(json.dumps({"command": "subscribe", "identifier": sub})) + log("📡 Subscribing to RoomChannel...") + + def on_message(self, ws, raw): + try: + data = json.loads(raw) + except json.JSONDecodeError: + return + + t = data.get("type") + + if t == "ping": + self.last_pong = time.time() + return + + if t == "welcome": + log("📡 Connected to Chatwoot ActionCable") + return + + if t == "confirm_subscription": + log("✅ RoomChannel subscription confirmed") + self.reconnect_delay = 1 + return + + if t == "reject_subscription": + log("❌ Subscription REJECTED! Check credentials") + return + + # Process events + msg = data.get("message", {}) + if isinstance(msg, dict): + event = msg.get("event", "") + if event == "presence.update": + pass # ignore silently + elif event in ("message.created", "MESSAGE_CREATED"): + self._on_message_created(msg.get("data", {})) + elif event in ("conversation.updated", "CONVERSATION_UPDATED"): + self._on_conversation_updated(msg.get("data", {})) + elif event in ("conversation.created", "CONVERSATION_CREATED"): + log(f"📋 New conversation created") + elif event: + log(f"📡 Event: {event}") + + def _on_message_created(self, msg_data): + """Handle a message_created event. + + Detects human agent messages (for handoff tracking) + and processes incoming customer messages (for AI reply). + """ + msg_id = msg_data.get("id") + sender_type = msg_data.get("sender_type") + conv_id = msg_data.get("conversation_id") + + # 🔑 Detect human agent messages (sender_type="User" that are NOT from AI) + if sender_type == "User" and conv_id and msg_id: + with _ai_pending_lock: + is_pending = conv_id in _ai_pending_convs + if is_pending or is_ai_sent_message(msg_id): + pass # This is our own AI message, ignore + else: + # This message was sent by a human agent (not our AI) + mark_human_active(conv_id) + sender_info = msg_data.get("sender", {}) + agent_name = sender_info.get("name", "Unknown") if isinstance(sender_info, dict) else "Unknown" + content = str(msg_data.get("content", ""))[:100] + log(f"👤 Human agent '{agent_name}' replied in conv #{conv_id}: {content}") + + # Process incoming customer messages as before + Thread(target=handle_incoming_message, args=(msg_data, self.headers), daemon=True).start() + + def _on_conversation_updated(self, data): + """Handle conversation status changes. + + If conversation is resolved or changed to pending, + allow AI to resume. + """ + if not data: + return + conv_id = data.get("id") + status = data.get("status") + if not conv_id: + return + if status in ("resolved", "pending"): + remove_human_active(conv_id) + log(f"🔄 Conv #{conv_id} status -> '{status}', AI may resume") + elif status == "open": + log(f"🔄 Conv #{conv_id} status -> 'open'") + + def on_error(self, ws, error): + log(f"🔴 WS Error: {error}") + + def on_close(self, ws, status, msg): + log(f"🔴 WebSocket closed (status={status}, msg={msg})") + if self.running.is_set(): + self._reconnect() + + def _reconnect(self): + delay = min(self.reconnect_delay, 60) + log(f"Reconnecting in {delay}s...") + time.sleep(delay) + self.reconnect_delay = min(delay * 2, 60) + self.start() + + def start(self): + """Start WebSocket connection (blocking).""" + self.headers = ensure_session() + log(f"Connecting to {CW_WS_URL}") + + self.ws = websocket.WebSocketApp( + CW_WS_URL, + on_open=self.on_open, + on_message=self.on_message, + on_error=self.on_error, + on_close=self.on_close + ) + + # Run forever (blocks) + self.ws.run_forever( + sslopt={"cert_reqs": ssl.CERT_NONE}, + ping_interval=15, + ping_timeout=5, + reconnect=5 # built-in reconnect + ) + + def stop(self): + """Stop the WebSocket connection.""" + self.running.clear() + if self.ws: + self.ws.close() + log("Agent stopped") + +# ===== TIMEOUT CHECKER THREAD ===== + +def timeout_checker_loop(): + """Background thread that cleans up expired human_active entries.""" + while True: + time.sleep(60) # Check every minute + try: + clean_expired_human_active() + except Exception as e: + log(f"Timeout checker error: {e}") + +# ===== MAIN ===== + +def main(): + parser = argparse.ArgumentParser(description="Chatwoot WebSocket AI Agent") + parser.add_argument("--renew", action="store_true", help="Force renew session & exit") + parser.add_argument("--test-ws", action="store_true", help="Test WebSocket connection & exit") + args = parser.parse_args() + + if args.renew: + renew_session() + return + + if args.test_ws: + log("Testing WebSocket connection...") + headers = ensure_session() + log("Session OK, testing WS...") + agent = WSAgent() + # Just connect and subscribe, then exit after 5 seconds + def _timeout(): + time.sleep(5) + log("Test OK, disconnecting...") + agent.stop() + Thread(target=_timeout, daemon=True).start() + agent.start() + return + + log(f"🚀 Chatwoot WS Agent starting...") + log(f"⚙️ Human timeout: {HUMAN_TIMEOUT_MINUTES}min") + log(f"⚙️ Account ID: {CW_ACCOUNT_ID}") + + # Start the timeout checker daemon + checker = Thread(target=timeout_checker_loop, daemon=True) + checker.start() + log("⏱️ Timeout checker started") + + # Start the main agent + agent = WSAgent() + try: + agent.start() + except KeyboardInterrupt: + log("Shutting down...") + agent.stop() + +if __name__ == "__main__": + main() +