diff --git a/chatwoot_ws_agent.py b/chatwoot_ws_agent.py index 101b446..ff01a9d 100644 --- a/chatwoot_ws_agent.py +++ b/chatwoot_ws_agent.py @@ -518,53 +518,71 @@ def load_state(): STATE_FILE.unlink(missing_ok=True) +# ===== DEBOUNCE (message coalescing) ===== +# Accumulate messages from same conv within 5s window. +# Only process the last one with all accumulated context. +DEBOUNCE_WINDOW = 5 # seconds +_debounce_timers: dict[int, float] = {} # conv_id → last message time +_debounce_msgs: dict[int, list[str]] = {} # conv_id → accumulated messages +_debounce_lock = Lock() + + # ===== AI HELPERS ===== import re as _re # used for session-id stripping -def call_qwenpaw_ai(prompt, system_prompt=None, target_agent="sourcing-agent"): - """Call QwenPaw AI via agents chat. +GATEWAY_ENABLED = os.environ.get("GATEWAY_ENABLED", "1") == "1" + +def call_qwenpaw_ai(prompt, system_prompt=None, target_agent="sourcing-agent", retries=2): + """Call QwenPaw AI via agents chat, with retries on failure. Args: prompt: The user message system_prompt: Optional system instructions (appended before prompt) target_agent: Which agent to call (default:sourcing-agent, translation:default) + retries: Number of retries on failure (exponential backoff) """ full_prompt = (system_prompt + "\n\n" + prompt) if system_prompt else prompt - try: - result = subprocess.run( - ["qwenpaw", "agents", "chat", - "--from-agent", "wordpress", - "--to-agent", target_agent, - "--text", full_prompt], - capture_output=True, text=True, timeout=90 - ) - if result.returncode != 0: - log(f"AI error (rc={result.returncode}): {result.stderr[:200]}") - return None - # Clean output: remove INFO/SESSION/Agent lines - raw = result.stdout.strip() - lines = raw.split("\n") - clean_lines = [] - for line in lines: - stripped = line.strip() - if stripped.startswith(("INFO:", "[Agent")): - continue - # Remove [SESSION: ...] prefix from any line - stripped = _re.sub(r'\[SESSION:\s*[^\]]*\]\s*', '', stripped) - if stripped: - clean_lines.append(stripped) - reply = "\n".join(clean_lines).strip() - if not reply: - log(f"⚠️ AI reply empty after cleaning. Raw: {raw[:200]}") - return None - return reply - except subprocess.TimeoutExpired: - log("AI call timed out") - return None - except Exception as e: - log(f"AI error: {e}") - return None + last_error = None + for attempt in range(1 + retries): + try: + result = subprocess.run( + ["qwenpaw", "agents", "chat", + "--from-agent", "wordpress", + "--to-agent", target_agent, + "--text", full_prompt], + capture_output=True, text=True, timeout=90 + ) + if result.returncode == 0 and result.stdout.strip(): + # Clean output: remove INFO/SESSION/Agent lines + raw = result.stdout.strip() + lines = raw.split("\n") + clean_lines = [] + for line in lines: + stripped = line.strip() + if stripped.startswith(("INFO:", "[Agent")): + continue + # Remove [SESSION: ...] prefix from any line + stripped = _re.sub(r'\[SESSION:\s*[^\]]*\]\s*', '', stripped) + if stripped: + clean_lines.append(stripped) + reply = "\n".join(clean_lines).strip() + if reply: + return reply + last_error = f"rc={result.returncode}" if result.returncode != 0 else "empty reply" + if result.stderr: + last_error += f" stderr={result.stderr[:200]}" + except subprocess.TimeoutExpired: + last_error = "timeout (90s)" + except Exception as e: + last_error = str(e)[:200] + if attempt < retries: + wait = 2 ** attempt # 1s, 2s + log(f"⚠️ AI call failed ({last_error}), retry {attempt+1}/{retries} in {wait}s", "WARN") + time.sleep(wait) + log(f"❌ AI call failed after {retries+1} attempts: {last_error}", "ERROR") + return None + def translate_to_chinese(text): """Translate text to Simplified Chinese.""" @@ -576,20 +594,16 @@ def translate_to_chinese(text): result = call_qwenpaw_ai(prompt, target_agent="default") return result if result else text -def generate_ai_reply(customer_msg, sender_name, inbox_id): - """Generate an AI reply for a customer message, routed by inbox_id. - - Uses INBOX_CONFIG[inbox_id] to pick the right AI agent, system prompt, - and prompt template. Instructs AI to self-assess handoff need via [HANDOFF]. - """ +def generate_ai_reply(customer_msg, sender_name, inbox_id, context=""): config = INBOX_CONFIG.get(inbox_id) if not config: log(f"No config for inbox #{inbox_id}, skipping", "WARN") return None + body = f"{context}\n\n{customer_msg}" if context else customer_msg prompt = config["prompt_template"].format( sender_name=sender_name, - customer_msg=customer_msg + customer_msg=body ) reply = call_qwenpaw_ai(prompt, config["system_prompt"], target_agent=config["target_agent"]) @@ -723,6 +737,25 @@ def handle_meeting_message(msg_data, headers): # ===== MESSAGE HANDLER ===== +def _enrich_context(inbox_id, customer_msg, config): + if not GATEWAY_ENABLED: + return "" + channel = config.get("type", "web_widget") + if channel not in ("amazon", "jd", "taobao", "pdd", "tiktok"): + return "" + tenant_id = config.get("tenant_id") + if not tenant_id: + return "" + asin_match = _re.search(r'\b(B0[A-Z0-9]{8})\b', customer_msg, _re.IGNORECASE) + query = {"asin": asin_match.group(1).upper()} if asin_match else {"keyword": customer_msg[:50]} + try: + from gateway import fetch + result = fetch(channel, tenant_id, query, timeout=3.0) + return result.to_prompt_block() + except Exception as e: + log(f"Gateway enrich error: {e}") + return "" + def handle_incoming_message(msg_data, headers): """Process an incoming customer message (with human handoff awareness).""" msg_id = msg_data.get("id") @@ -768,12 +801,36 @@ def handle_incoming_message(msg_data, headers): # Mark immediately to avoid duplicate processing mark_processed(msg_id) + # ===== DEBOUNCE: coalesce rapid messages from same conv ===== + with _debounce_lock: + now = time.time() + last_time = _debounce_timers.get(conv_id, 0) + if now - last_time < DEBOUNCE_WINDOW: + # Within debounce window → accumulate, don't reply yet + if conv_id not in _debounce_msgs: + _debounce_msgs[conv_id] = [] + _debounce_msgs[conv_id].append(content) + _debounce_timers[conv_id] = now + log(f"⏳ [{config['name']}] Debounce #{conv_id}: accumulated msg #{msg_id} (window {DEBOUNCE_WINDOW}s)") + return + # First message or window expired → process now + _debounce_timers[conv_id] = now + # Collect any previously accumulated messages + accumulated = _debounce_msgs.pop(conv_id, []) + if accumulated: + accumulated.append(content) + content = "\n---\n".join(accumulated) + log(f"📦 [{config['name']}] Debounce #{conv_id}: processing {len(accumulated)} merged msgs") + + # ===== END DEBOUNCE ===== + log(f"📩 [{config['name']}] New msg #{msg_id} from '{sender_name}' in conv #{conv_id}: {content[:100]}", inbox_name=config['name']) # Generate AI reply with inbox-specific config log(f"🤖 [{config['name']}] Generating AI reply...", inbox_name=config['name']) start_time = time.time() - reply = generate_ai_reply(content, sender_name, inbox_id) + context = _enrich_context(inbox_id, content, config) + reply = generate_ai_reply(content, sender_name, inbox_id, context=context) duration_ms = (time.time() - start_time) * 1000 if not reply: @@ -1191,6 +1248,14 @@ def main(): log(f"⚙️ Human timeout: {HUMAN_TIMEOUT_MINUTES}min") log(f"⚙️ Account ID: {CW_ACCOUNT_ID}") + if GATEWAY_ENABLED: + try: + from gateway import gateway_loop as _gw_loop + _gw_loop.start() + log("🌐 Platform Gateway event loop started") + except Exception as e: + log(f"⚠️ Gateway init failed (continuing without): {e}", "WARN") + # Start the timeout checker daemon checker = Thread(target=timeout_checker_loop, daemon=True) checker.start() @@ -1202,6 +1267,12 @@ def main(): def _shutdown(signum, frame): log(f"Received signal {signum}, shutting down gracefully...") agent.stop() + if GATEWAY_ENABLED: + try: + from gateway import gateway_loop as _gw_loop + _gw_loop.stop() + except Exception: + pass save_state() metrics.flush() sys.exit(0)