From 74f57a1a435b3fccd952f17b46e2974640c116e1 Mon Sep 17 00:00:00 2001 From: Chatwoot AI Agent Dev Date: Sat, 6 Jun 2026 03:55:40 +0000 Subject: [PATCH] refactor(ws_agent): use shared chatwoot_client + inboxes_io modules - Replace 5 duplicate auth functions with chatwoot_client delegation - Replace _validate_config with inboxes_io.validate_entry - Replace requests.post with chatwoot_client._call_cw (4 call sites) - Remove import requests dependency - Clean up unused CW_EMAIL/CW_PASSWORD/RENEW_THRESHOLD vars - Add _api_path() helper for clean API path construction --- chatwoot_ws_agent.py | 202 +++++++++++++------------------------------ 1 file changed, 62 insertions(+), 140 deletions(-) diff --git a/chatwoot_ws_agent.py b/chatwoot_ws_agent.py index b990529..771b0b7 100644 --- a/chatwoot_ws_agent.py +++ b/chatwoot_ws_agent.py @@ -3,7 +3,7 @@ Chatwoot WebSocket Agent — Real-time AI Auto-Reply with Human Handoff ======================================================================= Connects to Chatwoot ActionCable WebSocket for instant message delivery, -generates AI replies, and sends Chinese private notes. +generates AI replies, and manages AI ↔ human handoff. Features: - Real-time AI auto-reply to customer messages @@ -11,6 +11,7 @@ Features: - AI-initiated handoff: AI suggests human intervention when needed - Timeout recovery: AI resumes after human inactivity (configurable) - Status-based recovery: AI resumes when conversation is resolved/pending + - Multi-tenant inbox routing (hot-reloadable from inboxes.json) Usage: python3 chatwoot_ws_agent.py # Run (daemon mode) @@ -22,22 +23,32 @@ Requirements: """ import os, sys, json, time, subprocess, argparse, ssl, signal -import requests import websocket from pathlib import Path from datetime import datetime, timezone, timedelta from threading import Thread, Event, Lock from collections import defaultdict +# ── Shared modules (chatwoot-ai-agent repo) ───────────────────── +_REPO_DIR = Path(__file__).parent.parent.parent / "chatwoot-ai-agent" +if _REPO_DIR.exists(): + sys.path.insert(0, str(_REPO_DIR.resolve())) +import chatwoot_client +import inboxes_io + # ===== CONFIG ===== CW_BASE = os.environ.get("CW_BASE", "http://localhost:3000") CW_WS_URL = CW_BASE.replace("https://", "wss://").replace("http://", "ws://") + "/cable" CW_ACCOUNT_ID = 1 -CW_AUTH_URL = f"{CW_BASE}/auth/sign_in" -CW_API_BASE = f"{CW_BASE}/api/v1/accounts/{CW_ACCOUNT_ID}" SCRIPT_DIR = Path(__file__).parent -AUTH_FILE = SCRIPT_DIR / "chatwoot_auth.json" +# Point chatwoot_client at our auth file +chatwoot_client.CW_AUTH_FILE = SCRIPT_DIR / "chatwoot_auth.json" +chatwoot_client.CW_EMAIL = os.environ.get("CW_EMAIL", "") +chatwoot_client.CW_PASSWORD = os.environ.get("CW_PASSWORD", "") +chatwoot_client.CW_BASE = CW_BASE +chatwoot_client.CW_ACCOUNT_ID = CW_ACCOUNT_ID + PROCESSED_FILE = SCRIPT_DIR / ".chatwoot_ws_processed.json" METRICS_FILE = SCRIPT_DIR / ".chatwoot_ws_metrics.json" STATE_FILE = SCRIPT_DIR / ".chatwoot_ws_state.json" @@ -74,22 +85,6 @@ DEFAULT_INBOX_CONFIG = { } } -def _validate_config(config): - """Validate inbox config structure and required placeholders.""" - required_keys = ["name", "target_agent", "system_prompt", "prompt_template"] - template_placeholders = ["{sender_name}", "{customer_msg}"] - if not isinstance(config, dict): - return False - for key in required_keys: - if key not in config: - log(f"Config missing required key '{key}'", "WARN") - return False - for ph in template_placeholders: - if ph not in config.get("prompt_template", ""): - log(f"prompt_template missing placeholder {ph}", "WARN") - return False - return True - def _load_inboxes_config(): """Load inbox config from JSON file. Falls back to DEFAULT_INBOX_CONFIG if file missing.""" global INBOX_CONFIG, _INBOX_CONFIG_MTIME @@ -103,14 +98,14 @@ def _load_inboxes_config(): mtime = INBOX_CONFIG_FILE.stat().st_mtime if mtime <= _INBOX_CONFIG_MTIME: return # no change - raw = json.loads(INBOX_CONFIG_FILE.read_text()) + raw = inboxes_io.read_inboxes_raw(INBOX_CONFIG_FILE) new_cfg = {} for k, v in raw.items(): if k.startswith("_"): continue # skip _meta etc. try: inbox_id = int(k) - if _validate_config(v): + if inboxes_io.validate_entry(v): new_cfg[inbox_id] = v else: log(f"Invalid config for inbox #{k}, skipping", "WARN") @@ -142,12 +137,6 @@ def _load_inboxes_config(): INBOX_CONFIG = DEFAULT_INBOX_CONFIG.copy() # Login credentials for auto-renewal -CW_EMAIL = os.environ.get("CW_EMAIL") -CW_PASSWORD = os.environ.get("CW_PASSWORD") -# Agent identity (from /api/v1/profile response) - - -RENEW_THRESHOLD = timedelta(hours=6) TZ = timezone(timedelta(hours=8)) # ===== HUMAN HANDOFF CONFIG ===== @@ -271,89 +260,17 @@ metrics = Metrics(METRICS_FILE) # Initial load (after log is defined) _load_inboxes_config() -# ===== SESSION MANAGEMENT (same as polling agent) ===== - -def load_auth(): - if AUTH_FILE.exists(): - try: - data = json.loads(AUTH_FILE.read_text()) - if all(k in data for k in ("access-token", "client", "expiry", "uid")): - return data - except Exception as e: - log(f"WARNING: Failed to load auth file: {e}") - return None - -def save_auth(data): - AUTH_FILE.write_text(json.dumps(data, indent=2)) - log(f"Session saved to {AUTH_FILE}") - -def get_headers(): - auth = load_auth() - if auth: - return { - "access-token": auth.get("access-token"), - "client": auth.get("client"), - "expiry": auth.get("expiry"), - "uid": auth.get("uid"), - } - return None - -def renew_session(): - log("Renewing Chatwoot session...") - try: - r = requests.post(CW_AUTH_URL, json={"email": CW_EMAIL, "password": CW_PASSWORD}, timeout=15) - if r.status_code != 200: - log(f"Login failed: {r.status_code} {r.text[:200]}") - return None - access_token = r.headers.get("access-token") - client = r.headers.get("client") - expiry = r.headers.get("expiry") - uid = r.headers.get("uid") - if not all([access_token, client, expiry, uid]): - log(f"Missing headers: {dict(r.headers)}") - return None - # Extract pubsub_token from response body (ActionCable auth) - pubsub_token = "" - try: - pubsub_token = r.json().get("data", {}).get("pubsub_token", "") - except Exception: - pass - data = {"access-token": access_token, "client": client, "expiry": expiry, "uid": uid, - "pubsub_token": pubsub_token, - "updated_at": datetime.now(TZ).isoformat()} - save_auth(data) - exp_time = datetime.fromtimestamp(int(expiry)) - log(f"Session renewed, expires: {exp_time}") - return data - except Exception as e: - log(f"Renewal error: {e}") - return None +# ===== SESSION MANAGEMENT (delegated to chatwoot_client) ===== def ensure_session(): - auth = load_auth() - now = time.time() - if auth: - try: - expiry_ts = int(auth.get("expiry", "0")) - remaining = timedelta(seconds=expiry_ts - now) - log(f"Session expires in {remaining}") - if remaining > RENEW_THRESHOLD: - return get_headers() - except (ValueError, TypeError): - pass - log("Session needs renewal") - new_auth = renew_session() - if new_auth: - return { - "access-token": new_auth["access-token"], - "client": new_auth["client"], - "expiry": new_auth["expiry"], - "uid": new_auth["uid"], - } - raise RuntimeError( - "No Chatwoot session available. " - "Ensure 'chatwoot_auth.json' exists or CW_EMAIL/CW_PASSWORD env vars are set." - ) + """Get valid Chatwoot session headers. Returns headers dict or raises RuntimeError.""" + headers = chatwoot_client.ensure_session() + if headers is None: + raise RuntimeError( + "No Chatwoot session available. " + "Ensure 'chatwoot_auth.json' exists or CW_EMAIL/CW_PASSWORD env vars are set." + ) + return headers # ===== PROCESSED MESSAGE TRACKING ===== @@ -741,46 +658,57 @@ def generate_ai_reply(customer_msg, sender_name, inbox_id, context="", session_i return reply -# ===== CHATWOOT API ===== +# ===== CHATWOOT API (wraps chatwoot_client._call_cw) ===== -def send_reply(conv_id, content, headers): +def _api_path(suffix): + """Build a full API path relative to CW_BASE.""" + return f"/api/v1/accounts/{CW_ACCOUNT_ID}{suffix}" + +def send_reply(conv_id, content, headers=None): """Send a message as the current user. Tracks the returned message ID.""" with _ai_pending_lock: _ai_pending_convs.add(conv_id) try: - r = requests.post(f"{CW_API_BASE}/conversations/{conv_id}/messages", - json={"content": content}, headers=headers, timeout=10) - if r.status_code in (200, 201): - data = r.json() - track_sent_message(data.get("id")) - return True, data - return False, r.text + data = chatwoot_client._call_cw( + "POST", + _api_path(f"/conversations/{conv_id}/messages"), + {"content": content}, + ) + track_sent_message(data.get("id")) + return True, data + except Exception as e: + return False, str(e) finally: with _ai_pending_lock: _ai_pending_convs.discard(conv_id) -def send_private_note(conv_id, content, headers): +def send_private_note(conv_id, content, headers=None): """Send a private note (agents-only). Tracks message ID to avoid false human detection.""" with _ai_pending_lock: _ai_pending_convs.add(conv_id) try: - r = requests.post(f"{CW_API_BASE}/conversations/{conv_id}/messages", - json={"content": content, "private": True}, headers=headers, timeout=10) - if r.status_code in (200, 201): - data = r.json() - track_sent_message(data.get("id")) - return True + data = chatwoot_client._call_cw( + "POST", + _api_path(f"/conversations/{conv_id}/messages"), + {"content": content, "private": True}, + ) + track_sent_message(data.get("id")) + return True + except Exception as e: return False finally: with _ai_pending_lock: _ai_pending_convs.discard(conv_id) -def update_conversation_status(conv_id, status, headers): +def update_conversation_status(conv_id, status, headers=None): """Update conversation status (pending/open/resolved).""" try: - r = requests.post(f"{CW_API_BASE}/conversations/{conv_id}/toggle_status", - json={"status": status}, headers=headers, timeout=5) - return r.status_code in (200, 201) + chatwoot_client._call_cw( + "POST", + _api_path(f"/conversations/{conv_id}/toggle_status"), + {"status": status}, + ) + return True except Exception as e: log(f"Status update error: {e}") return False @@ -1243,16 +1171,10 @@ if not PUBSUB_TOKEN: if not PUBSUB_TOKEN: # Last resort: try renewing session to get pubsub_token from login response log("PUBSUB_TOKEN not set, attempting session renewal to obtain it...", "WARN") - _new_auth = None try: - _r = requests.post( - CW_AUTH_URL, - json={"email": CW_EMAIL, "password": CW_PASSWORD}, - timeout=15 - ) - if _r.status_code == 200: - _body = _r.json() - PUBSUB_TOKEN = _body.get("data", {}).get("pubsub_token", "") + _new_auth = chatwoot_client.renew_session() + if _new_auth: + PUBSUB_TOKEN = _new_auth.get("pubsub_token", "") except Exception: pass if not PUBSUB_TOKEN: @@ -1284,7 +1206,7 @@ def main(): args = parser.parse_args() if args.renew: - renew_session() + chatwoot_client.renew_session() return if args.test_ws: