From 980c09087312e638c753a966785c1a415c58791a Mon Sep 17 00:00:00 2001 From: Chatwoot AI Agent Dev Date: Sat, 6 Jun 2026 03:55:21 +0000 Subject: [PATCH] refactor: extract shared modules chatwoot_client.py + inboxes_io.py (v1.8) - chatwoot_client.py (364 lines): unified Chatwoot session auth + API calls - inboxes_io.py (122 lines): unified inboxes.json read/write/validate - provision_server.py: deleted 9 duplicate functions (-188 lines) - ws_agent: deleted 5 duplicate auth functions, removed import requests - All imports use sys.path.insert for cross-directory access - Zero behavior changes, pure DRY refactor --- chatwoot_client.py | 364 ++++++++++++++++++++++++++++++++++++++++++++ inboxes_io.py | 122 +++++++++++++++ provision_server.py | 230 +++++----------------------- 3 files changed, 528 insertions(+), 188 deletions(-) create mode 100644 chatwoot_client.py create mode 100644 inboxes_io.py diff --git a/chatwoot_client.py b/chatwoot_client.py new file mode 100644 index 0000000..ed48fd6 --- /dev/null +++ b/chatwoot_client.py @@ -0,0 +1,364 @@ +#!/usr/bin/env python3 +""" +Unified Chatwoot API Client — shared by WS Agent & Provision Server + +Provides: + - Session management (login, renew, load/save auth file) + - User session API calls (_call_cw, auto-renew on 401) + - Platform API calls (_call_internal) + - Password generation + +Usage: + import chatwoot_client + chatwoot_client.CW_AUTH_FILE = Path("...") + data = chatwoot_client._call_cw("GET", "/api/v1/accounts/1/conversations") +""" + +import json +import logging +import os +import secrets +import string +import time as _time +import urllib.error +import urllib.request +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +log = logging.getLogger("chatwoot_client") + +# ── Module-level config (env var defaults, overridable by callers) ── +CW_BASE = os.environ.get("CW_BASE", "http://localhost:3000") +CW_INTERNAL = os.environ.get("CW_INTERNAL", "http://chatwoot-chatwoot-1:3000") +CW_ACCOUNT_ID = int(os.environ.get("CW_ACCOUNT_ID", "1")) +CW_PLATFORM_TOKEN = os.environ.get("CW_PLATFORM_TOKEN", "") +CW_EMAIL = os.environ.get("CW_EMAIL") or os.environ.get("CW_ADMIN_EMAIL", "") +CW_PASSWORD = os.environ.get("CW_PASSWORD") or os.environ.get("CW_ADMIN_PASSWORD", "") + +AUTH_FILE_ENV = os.environ.get("CW_AUTH_FILE", "") +if AUTH_FILE_ENV: + CW_AUTH_FILE = Path(AUTH_FILE_ENV) +else: + # Fallback: try __file__ parent (works for both skills/ and repo), else cwd + _parent = Path(__file__).parent + if _parent.joinpath("chatwoot_auth.json").exists(): + CW_AUTH_FILE = _parent / "chatwoot_auth.json" + else: + CW_AUTH_FILE = Path("chatwoot_auth.json") + +CW_AUTH_FILE = CW_AUTH_FILE.resolve() + + +# ==================================================================== +# PASSWORD GENERATION +# ==================================================================== + +def _gen_password(length: int = 14) -> str: + """Generate a random secure password.""" + chars = string.ascii_letters + string.digits + '!@#$%^&*()_+-=' + return ''.join(secrets.choice(chars) for _ in range(length)) + + +# ==================================================================== +# SESSION MANAGEMENT (shared) +# ==================================================================== + +def load_auth() -> Optional[dict]: + """Load saved auth data from JSON file. Returns None if missing/invalid.""" + if CW_AUTH_FILE.exists(): + try: + data = json.loads(CW_AUTH_FILE.read_text(encoding="utf-8")) + if all(k in data for k in ("access-token", "client", "expiry", "uid")): + return data + except Exception as e: + log.warning("Failed to load auth file: %s", e) + return None + + +def save_auth(data: dict) -> None: + """Save auth data to JSON file.""" + CW_AUTH_FILE.parent.mkdir(parents=True, exist_ok=True) + CW_AUTH_FILE.write_text(json.dumps(data, indent=2), encoding="utf-8") + log.info("Session saved to %s", CW_AUTH_FILE) + + +def get_headers() -> Optional[dict]: + """Return auth headers dict from saved session, or None.""" + auth = load_auth() + if auth: + return { + "access-token": auth.get("access-token"), + "client": auth.get("client"), + "expiry": auth.get("expiry"), + "uid": auth.get("uid"), + } + return None + + +def renew_session() -> Optional[dict]: + """Login to Chatwoot and save session. Returns auth data dict, or None on failure.""" + email = CW_EMAIL + password = CW_PASSWORD + if not email or not password: + log.error("CW_EMAIL/CW_PASSWORD (or CW_ADMIN_EMAIL/CW_ADMIN_PASSWORD) not set") + return None + + url = f"{CW_BASE}/auth/sign_in" + payload = json.dumps({"email": email, "password": password}).encode() + req = urllib.request.Request( + url, data=payload, + headers={"Content-Type": "application/json"}, + method="POST" + ) + try: + with urllib.request.urlopen(req, timeout=15) as resp: + headers = {k.lower(): v for k, v in resp.headers.items()} + access_token = headers.get("access-token", "") + client = headers.get("client", "") + expiry = headers.get("expiry", "") + uid = headers.get("uid", "") + + if not all([access_token, client, expiry, uid]): + log.error("Login response missing required headers: %s", headers) + return None + + # Extract pubsub_token from body (ActionCable auth) + pubsub_token = "" + try: + body = json.loads(resp.read()) + pubsub_token = body.get("data", {}).get("pubsub_token", "") + except Exception: + pass + + data = { + "access-token": access_token, + "client": client, + "expiry": expiry, + "uid": uid, + "pubsub_token": pubsub_token, + "updated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + } + save_auth(data) + log.info("Chatwoot session refreshed for %s (expiry=%s)", uid, expiry) + return data + except urllib.error.HTTPError as e: + body = e.read().decode() if e.fp else "" + log.error("Login HTTP %d: %s", e.code, body[:200]) + return None + except Exception as e: + log.error("Login error: %s", e) + return None + + +def ensure_session() -> Optional[dict]: + """Get valid session headers, auto-renew if missing/expired. + Returns headers dict, or None if no session available.""" + auth = load_auth() + now = _time.time() + if auth: + try: + expiry_ts = int(auth.get("expiry", "0")) + remaining = expiry_ts - now + if remaining > 3600: # >1h remaining, valid + return get_headers() + if remaining > 0: + log.info("Session expires in %ds, renewing", int(remaining)) + else: + log.info("Session expired %ds ago, renewing", -int(remaining)) + except (ValueError, TypeError): + pass + + new_auth = renew_session() + if new_auth: + return get_headers() + return None + + +# ==================================================================== +# PROVISION-SERVER COMPAT FUNCTIONS (returns headers WITH Content-Type) +# ==================================================================== + +def _relogin_chatwoot() -> dict: + """Login and return headers dict (includes Content-Type). + Raises RuntimeError on failure.""" + email = CW_EMAIL + password = CW_PASSWORD + if not email or not password: + raise RuntimeError("CW_EMAIL / CW_ADMIN_EMAIL and CW_PASSWORD / CW_ADMIN_PASSWORD must be set") + + url = f"{CW_BASE}/auth/sign_in" + payload = json.dumps({"email": email, "password": password}).encode() + req = urllib.request.Request( + url, data=payload, + headers={"Content-Type": "application/json"}, + method="POST" + ) + try: + with urllib.request.urlopen(req, timeout=15) as resp: + h = {k.lower(): v for k, v in resp.headers.items()} + data = { + "access-token": h.get("access-token", ""), + "client": h.get("client", ""), + "expiry": h.get("expiry", ""), + "uid": h.get("uid", ""), + "updated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + } + if not all([data["access-token"], data["client"], data["uid"]]): + raise RuntimeError(f"Login missing headers: {h}") + CW_AUTH_FILE.write_text(json.dumps(data, indent=2), encoding="utf-8") + log.info("Chatwoot session refreshed for %s", data["uid"]) + return { + "access-token": data["access-token"], + "client": data["client"], + "expiry": data["expiry"], + "uid": data["uid"], + "Content-Type": "application/json", + } + except urllib.error.HTTPError as e: + body = e.read().decode() if e.fp else "" + raise RuntimeError(f"Login HTTP {e.code}: {body[:200]}") + + +def _get_session_headers() -> dict: + """Load session from file, auto-renew if <1h remaining. + Returns header dict (includes Content-Type). + Raises RuntimeError if login fails.""" + if CW_AUTH_FILE.exists(): + try: + data = json.loads(CW_AUTH_FILE.read_text(encoding="utf-8")) + expiry = int(data.get("expiry", 0)) + if expiry - _time.time() < 3600: + log.info("Session < 1h (%ds left), renewing", expiry - int(_time.time())) + return _relogin_chatwoot() + if all([data.get("access-token"), data.get("client"), data.get("uid")]): + return { + "access-token": data["access-token"], + "client": data["client"], + "expiry": data["expiry"], + "uid": data["uid"], + "Content-Type": "application/json", + } + except Exception as e: + log.warning("Auth file read error: %s", e) + return _relogin_chatwoot() + + +# ==================================================================== +# API CALLS (sync, urllib) +# ==================================================================== + +def _call_cw(method: str, path: str, body: Optional[dict] = None, + retries: int = 3) -> dict: + """Call Chatwoot User API with session auth. + Auto-renew on 401. Returns parsed JSON dict. + Raises RuntimeError on failure.""" + url = f"{CW_BASE}{path}" + last_err = None + for attempt in range(retries): + headers = _get_session_headers() + data = json.dumps(body).encode() if body else None + req = urllib.request.Request(url, data=data, headers=headers, method=method) + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return json.loads(resp.read()) + except urllib.error.HTTPError as e: + if e.code == 401 and attempt < retries - 1: + log.warning("401 on %s %s (attempt %d), re-login & retry", + method, path, attempt + 1) + CW_AUTH_FILE.unlink(missing_ok=True) if CW_AUTH_FILE.exists() else None + continue + body_text = e.read().decode() if e.fp else "" + raise RuntimeError(f"CW API error {e.code} on {method} {path}: {body_text}") + except Exception as e: + last_err = e + if attempt < retries - 1: + log.warning("Retry %d on %s %s: %s", attempt + 1, method, path, e) + continue + raise RuntimeError(f"CW API error on {method} {path}: {last_err}") + raise RuntimeError(f"Exhausted {retries} retries on {method} {path}: {last_err}") + + +def _call_internal(method: str, path: str, body: Optional[dict] = None, + extra_headers: Optional[dict] = None, + retries: int = 3) -> dict: + """Call Chatwoot Platform API (internal, with api_access_token). + Returns parsed JSON dict. Raises RuntimeError on failure.""" + url = f"{CW_INTERNAL}{path}" + for attempt in range(retries): + headers = {"Content-Type": "application/json"} + if extra_headers: + headers.update(extra_headers) + data = json.dumps(body).encode() if body else None + req = urllib.request.Request(url, data=data, headers=headers, method=method) + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return json.loads(resp.read()) + except urllib.error.HTTPError as e: + if attempt < retries - 1: + log.warning("Internal API error %s on %s %s, retry %d", + e.code, method, path, attempt + 1) + continue + body_text = e.read().decode() if e.fp else "" + raise RuntimeError(f"Internal API error {e.code} on {method} {path}: {body_text}") + except Exception as e: + if attempt < retries - 1: + log.warning("Internal retry %d on %s %s: %s", attempt + 1, method, path, e) + continue + raise RuntimeError(f"Internal API error on {method} {path}: {e}") + raise RuntimeError(f"Exhausted {retries} retries on internal {method} {path}") + + +# ==================================================================== +# SHORTCUTS +# ==================================================================== + +def get_profile() -> Optional[dict]: + """Fetch /api/v1/profile to get current user info.""" + try: + return _call_cw("GET", "/api/v1/profile") + except RuntimeError as e: + log.error("Profile fetch failed: %s", e) + return None + + +# ==================================================================== +# CLI (for quick testing) +# ==================================================================== + +if __name__ == "__main__": + import argparse + parser = argparse.ArgumentParser(description="Chatwoot API Client CLI") + parser.add_argument("--renew", action="store_true", help="Force renew session") + parser.add_argument("--profile", action="store_true", help="Get current user profile") + parser.add_argument("--call", nargs=3, metavar=("METHOD", "PATH", "BODY"), + help="Make an API call (body is JSON string or '-')") + args = parser.parse_args() + + logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s") + + if args.renew: + data = renew_session() + if data: + print(f"Session renewed: {data['uid']} (expiry={data['expiry']})") + else: + print("Session renewal failed", file=sys.stderr) + sys.exit(1) + + if args.profile: + profile = get_profile() + if profile: + print(json.dumps(profile, ensure_ascii=False, indent=2)) + else: + print("Profile fetch failed", file=sys.stderr) + sys.exit(1) + + if args.call: + method, path, body_str = args.call + body = json.loads(str(body_str)) if body_str and body_str != "-" else None + try: + result = _call_cw(method, path, body=body) + print(json.dumps(result, ensure_ascii=False, indent=2)) + except RuntimeError as e: + print(f"API call failed: {e}", file=sys.stderr) + sys.exit(1) diff --git a/inboxes_io.py b/inboxes_io.py new file mode 100644 index 0000000..f1a4cdb --- /dev/null +++ b/inboxes_io.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +""" +Unified Inboxes Config IO — shared by WS Agent & Provision Server + +Provides stateless read/write/validate/construct primitives for +Chatwoot inbox routing configuration (inboxes.json). + +Usage: + import inboxes_io + cfg = inboxes_io.read_inboxes_raw(Path("inboxes.json")) + entry = inboxes_io.build_inbox_entry(...) + inboxes_io.write_inboxes(Path("inboxes.json"), cfg) +""" + +import json +import logging +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +log = logging.getLogger("inboxes_io") + +# ── Default _meta block ────────────────────────────────────────── +INBOXES_META = { + "_meta": { + "version": "1.2", + "updated_at": None, # filled at write time + "description": "Chatwoot WS Agent inbox routing config — hot-reloadable", + }, +} + +# ── Required field sets ────────────────────────────────────────── +REQUIRED_ENTRY_KEYS = ["name", "target_agent", "system_prompt", "prompt_template"] +TEMPLATE_PLACEHOLDERS = ["{sender_name}", "{customer_msg}"] + + +def validate_entry(config: dict) -> bool: + """Validate a single inbox config entry. + + Checks required keys exist and prompt_template contains + the mandatory placeholders. + """ + if not isinstance(config, dict): + log.warning("Config is not a dict: %s", type(config).__name__) + return False + for key in REQUIRED_ENTRY_KEYS: + if key not in config: + log.warning("Config missing required key '%s'", key) + return False + prompt = config.get("prompt_template", "") + for ph in TEMPLATE_PLACEHOLDERS: + if ph not in prompt: + log.warning("prompt_template missing placeholder %s", ph) + return False + return True + + +def read_inboxes_raw(path: Path) -> dict: + """Read inboxes.json from *path*. + + Returns the parsed dict, or a dict with just ``_meta`` if the + file is missing. Never returns ``None``. + """ + if path.exists(): + try: + return json.loads(path.read_text(encoding="utf-8")) + except Exception as e: + log.warning("Failed to read %s: %s", path, e) + meta = dict(INBOXES_META) + meta["_meta"] = dict(meta["_meta"]) + meta["_meta"]["updated_at"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + return meta + + +def write_inboxes(path: Path, config: dict) -> None: + """Write *config* (dict) to *path* as pretty-printed JSON.""" + path.parent.mkdir(parents=True, exist_ok=True) + # Always refresh _meta timestamp + if "_meta" not in config or not isinstance(config["_meta"], dict): + config["_meta"] = dict(INBOXES_META["_meta"]) + config["_meta"]["updated_at"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + path.write_text(json.dumps(config, ensure_ascii=False, indent=2), encoding="utf-8") + log.info("Inboxes config written to %s", path) + + +def ensure_agent_workspace(base_dir: Path, agent_id: str, name: str = "") -> Path: + """Create workspace directory for a QwenPaw agent. + + Returns the created Path. + """ + agent_dir = base_dir / agent_id + agent_dir.mkdir(parents=True, exist_ok=True) + return agent_dir + + +def build_inbox_entry(name: str, domain: str, channel: str, + inbox_id: int, inbox_token: str, agent_id: str) -> dict: + """Build an inbox config entry dict for *inboxes.json*. + + The entry includes a generic system_prompt and prompt_template + that callers can override after receiving the dict. + """ + return { + "name": name, + "type": channel, + "target_agent": agent_id, + "system_prompt": ( + f"You are a customer service agent for {name} ({domain}). " + f"Answer questions professionally in the customer's language. " + f"If you cannot fully resolve the issue, end with [HANDOFF]." + ), + "prompt_template": ( + "Customer '{sender_name}' sent this message:\n\n" + "{customer_msg}\n\n" + "Write a direct reply (no preamble, no markdown). " + "Keep it concise (2-4 sentences). " + "Use the same language as the customer." + ), + "note_prefix": f"\U0001f916 AI \u81ea\u52a8\u56de\u590d ({name})", + "signature": "", + "status": "active", + } diff --git a/provision_server.py b/provision_server.py index db52e02..e8e6992 100644 --- a/provision_server.py +++ b/provision_server.py @@ -12,19 +12,20 @@ Returns JSON for PHP to save to chathub_tenant table. import json import logging import os -import secrets import threading -import string import sys -import time import urllib.error -import urllib.request from datetime import datetime, timezone from pathlib import Path from typing import Optional import bottle +# ── Shared modules (same repo) ─────────────────────────────────── +# provision_server.py is in chatwoot-ai-agent/, so sibling import works +import chatwoot_client +import inboxes_io + # ── logging ────────────────────────────────────────────────────── logging.basicConfig( level=logging.INFO, @@ -37,117 +38,19 @@ log = logging.getLogger("provision") WORKSPACE_DIR = Path("/app/working/workspaces") SCRIPT_DIR = WORKSPACE_DIR / "wordpress" / "skills" / "wordpress-cli" INBOXES_PATH = SCRIPT_DIR / "inboxes.json" -AUTH_FILE = SCRIPT_DIR / "chatwoot_auth.json" -# Chatwoot base config -CW_BASE = os.environ.get("CW_BASE", "http://localhost:3000") -CW_INTERNAL = os.environ.get("CW_INTERNAL", "http://chatwoot-chatwoot-1:3000") -CW_ACCOUNT_ID = int(os.environ.get("CW_ACCOUNT_ID", "1")) -CW_PLATFORM_TOKEN = os.environ.get("CW_PLATFORM_TOKEN", "") +# Point chatwoot_client at the same auth file +chatwoot_client.CW_AUTH_FILE = SCRIPT_DIR / "chatwoot_auth.json" +# Enable CW_ADMIN_EMAIL/PASSWORD env var reading (chatwoot_client reads both CW_EMAIL and CW_ADMIN_EMAIL) +chatwoot_client.CW_BASE = os.environ.get("CW_BASE", "http://localhost:3000") +chatwoot_client.CW_INTERNAL = os.environ.get("CW_INTERNAL", "http://chatwoot-chatwoot-1:3000") +chatwoot_client.CW_ACCOUNT_ID = int(os.environ.get("CW_ACCOUNT_ID", "1")) +chatwoot_client.CW_PLATFORM_TOKEN = os.environ.get("CW_PLATFORM_TOKEN", "") + # API key for provision/suspend/activate endpoints CHATHUB_API_KEY = os.environ.get("CHATHUB_API_KEY", "chathub-default-key-change-me") -def _gen_password(length: int = 14) -> str: - chars = string.ascii_letters + string.digits + '!@#$%^&*()_+-=' - return ''.join(secrets.choice(chars) for _ in range(length)) - - -def _relogin_chatwoot() -> dict: - email = os.environ.get("CW_ADMIN_EMAIL") - password = os.environ.get("CW_ADMIN_PASSWORD") - if not email: - raise RuntimeError("CW_ADMIN_EMAIL not set — cannot login") - if not password: - raise RuntimeError("CW_ADMIN_PASSWORD not set — cannot login") - url = f"{CW_BASE}/auth/sign_in" - payload = json.dumps({"email": email, "password": password}).encode() - req = urllib.request.Request(url, data=payload, headers={"Content-Type": "application/json"}, method="POST") - resp = urllib.request.urlopen(req, timeout=15) - headers = {k.lower(): v for k, v in resp.headers.items()} - data = { - "access-token": headers.get("access-token", ""), - "client": headers.get("client", ""), - "expiry": headers.get("expiry", ""), - "uid": headers.get("uid", ""), - "updated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), - } - AUTH_FILE.write_text(json.dumps(data, indent=2), encoding="utf-8") - log.info("Chatwoot session refreshed for %s", data["uid"]) - return { - "access-token": data["access-token"], - "client": data["client"], - "expiry": data["expiry"], - "uid": data["uid"], - "Content-Type": "application/json", - } - - -def _get_session_headers() -> dict: - """Load session from file, auto-renew if expired or missing.""" - import time as _t - if AUTH_FILE.exists(): - data = json.loads(AUTH_FILE.read_text()) - expiry = int(data.get("expiry", 0)) - if expiry - _t.time() < 3600: - log.info("Session < 1h (%ds left), renewing", expiry - int(_t.time())) - return _relogin_chatwoot() - if all([data.get("access-token"), data.get("client"), data.get("uid")]): - return { - "access-token": data["access-token"], - "client": data["client"], - "expiry": data["expiry"], - "uid": data["uid"], - "Content-Type": "application/json", - } - return _relogin_chatwoot() - - -def _call_cw(method: str, path: str, body: Optional[dict], retries: int = 3) -> dict: - """Call Chatwoot API with retry on 401 (auto-renew session).""" - url = f"{CW_BASE}{path}" - last_err = None - for attempt in range(retries): - headers = _get_session_headers() - data = json.dumps(body).encode() if body else None - req = urllib.request.Request(url, data=data, headers=headers, method=method) - try: - with urllib.request.urlopen(req, timeout=30) as resp: - return json.loads(resp.read()) - except urllib.error.HTTPError as e: - if e.code == 401 and attempt < retries - 1: - log.warning(f"401 on {method} {path} (attempt {attempt+1}), re-login & retry") - AUTH_FILE.unlink(missing_ok=True) - continue - body_text = e.read().decode() if e.fp else "" - raise RuntimeError(f"CW API error {e.code} on {method} {path}: {body_text}") - raise RuntimeError(f"Exhausted {retries} retries on {method} {path}: {last_err}") - - -def _call_internal(method: str, path: str, - body: Optional[dict], - extra_headers: Optional[dict] = None, - retries: int = 3) -> dict: - """Call Chatwoot internal (platform) API with retry.""" - url = f"{CW_INTERNAL}{path}" - for attempt in range(retries): - headers = {"Content-Type": "application/json"} - if extra_headers: - headers.update(extra_headers) - data = json.dumps(body).encode() if body else None - req = urllib.request.Request(url, data=data, headers=headers, method=method) - try: - with urllib.request.urlopen(req, timeout=30) as resp: - return json.loads(resp.read()) - except urllib.error.HTTPError as e: - if attempt < retries - 1: - log.warning(f"Internal API error {e.code} on {method} {path}, retry {attempt+1}") - continue - body_text = e.read().decode() if e.fp else "" - raise RuntimeError(f"Internal API error {e.code} on {method} {path}: {body_text}") - raise RuntimeError(f"Exhausted {retries} retries on internal {method} {path}") - - def _check_api_key(): """Verify X-API-Key header matches CHATHUB_API_KEY.""" key = bottle.request.get_header("X-API-Key", "") @@ -159,33 +62,33 @@ def _check_api_key(): def _create_agent(email: str, name: str = "") -> dict: """Create a Chatwoot user + agent. Returns {agent_cw_id, password}. Rolls back (deletes user) if agent creation fails.""" - password = _gen_password() + password = chatwoot_client._gen_password() display_name = name or email.split("@")[0] - user = _call_internal( + user = chatwoot_client._call_internal( "POST", "/platform/api/v1/users", {"name": display_name, "email": email, "password": password}, - extra_headers={"api_access_token": CW_PLATFORM_TOKEN}, + extra_headers={"api_access_token": chatwoot_client.CW_PLATFORM_TOKEN}, ) if not user.get("id"): raise RuntimeError(f"Platform API user creation failed: {user}") uid = user["id"] try: - agent = _call_cw( + agent = chatwoot_client._call_cw( "POST", - f"/api/v1/accounts/{CW_ACCOUNT_ID}/agents", + f"/api/v1/accounts/{chatwoot_client.CW_ACCOUNT_ID}/agents", {"email": email, "name": display_name, "role": "agent"}, ) except Exception as e: # Rollback: delete the orphaned platform user try: - _call_internal( + chatwoot_client._call_internal( "DELETE", f"/platform/api/v1/users/{uid}", None, - extra_headers={"api_access_token": CW_PLATFORM_TOKEN}, + extra_headers={"api_access_token": chatwoot_client.CW_PLATFORM_TOKEN}, ) except Exception: pass @@ -200,9 +103,9 @@ def _create_agent(email: str, name: str = "") -> dict: def _add_agent_to_team(agent_cw_id: int, team_id: int) -> None: """Assign agent to team.""" try: - _call_cw( + chatwoot_client._call_cw( "POST", - f"/api/v1/accounts/{CW_ACCOUNT_ID}/teams/{team_id}/team_members", + f"/api/v1/accounts/{chatwoot_client.CW_ACCOUNT_ID}/teams/{team_id}/team_members", {"user_ids": [agent_cw_id]}, ) except urllib.error.HTTPError as e: @@ -212,56 +115,6 @@ def _add_agent_to_team(agent_cw_id: int, team_id: int) -> None: log.warning("add_agent_to_team failed: agent=%s team=%s err=%s", agent_cw_id, team_id, e) -def _read_inboxes() -> dict: - if INBOXES_PATH.exists(): - return json.loads(INBOXES_PATH.read_text(encoding="utf-8")) - return { - "_meta": { - "version": "1.1", - "updated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), - "description": "Chatwoot WS Agent inbox routing config \u2014 hot-reloadable", - }, - } - - -def _write_inboxes(config: dict) -> None: - INBOXES_PATH.parent.mkdir(parents=True, exist_ok=True) - INBOXES_PATH.write_text( - json.dumps(config, ensure_ascii=False, indent=2), - encoding="utf-8", - ) - - -def _ensure_agent_workspace(agent_id: str, name: str) -> Path: - agent_dir = WORKSPACE_DIR / agent_id - agent_dir.mkdir(parents=True, exist_ok=True) - return agent_dir - - -def _build_inbox_entry(name: str, domain: str, channel: str, - inbox_id: int, inbox_token: str, agent_id: str) -> dict: - return { - "name": name, - "type": channel, - "target_agent": agent_id, - "system_prompt": ( - f"You are a customer service agent for {name} ({domain}). " - f"Answer questions professionally in the customer's language. " - f"If you cannot fully resolve the issue, end with [HANDOFF]." - ), - "prompt_template": ( - "Customer '{sender_name}' sent this message:\n\n" - "{customer_msg}\n\n" - "Write a direct reply (no preamble, no markdown). " - "Keep it concise (2-4 sentences). " - "Use the same language as the customer." - ), - "note_prefix": f"\U0001f916 AI \u81ea\u52a8\u56de\u590d ({name})", - "signature": "", - "status": "active", - } - - # ── Idempotency store ──────────────────────────────────────────── # Stores {key: response_dict} with 5-minute TTL. _IDEMPOTENT_RESULTS: dict[str, dict] = {} @@ -358,9 +211,9 @@ def provision(): agent_info = _create_agent(email, name) # 2. Create team - team = _call_cw( + team = chatwoot_client._call_cw( "POST", - f"/api/v1/accounts/{CW_ACCOUNT_ID}/teams", + f"/api/v1/accounts/{chatwoot_client.CW_ACCOUNT_ID}/teams", { "name": f"{name} 客服团队", "description": f"{name} 的专属客服团队(限制 {max_agents} 席)", @@ -382,9 +235,9 @@ def provision(): "welcome_tagline": f"您好!欢迎访问 {name},请问有什么可以帮您?", }, } - inbox = _call_cw( + inbox = chatwoot_client._call_cw( "POST", - f"/api/v1/accounts/{CW_ACCOUNT_ID}/inboxes", + f"/api/v1/accounts/{chatwoot_client.CW_ACCOUNT_ID}/inboxes", inbox_payload, ) @@ -392,11 +245,12 @@ def provision(): inbox_token = inbox.get("access_token", "") inbox_name = inbox.get("name", name) website_token = inbox.get("website_token", "") or inbox.get("access_token", "") + _emb_cw_base = chatwoot_client.CW_BASE embed_code = ( f"