v1.4: 多租户开通 + 安全性重构 + 数据脱敏

新增:
- provision_server.py HTTP API 服务 (Bottle, 端口 5566)
- 状态持久化 (JSON, 每30秒保存, 1小时内可恢复)
- 会议室模式 (开发团队 Inbox 多 AI 路由)
- supervisor 托管, SIGTERM 优雅退出
- PUBSUB_TOKEN 三级 fallback

修复:
- 所有硬编码凭证清除 (CW_EMAIL/CW_PASSWORD 无 fallback)
- 双重 WebSocket 重连
- 内存泄漏 (无界 Set 清理)
- INBOX_CONFIG 兜底 (skip+log 不崩溃)
- PID 文件竞争, Metrics 热路径优化
- 幂等性正确实现 (存真实响应含 HTTP 状态码)

安全:
- 完整数据脱敏 (无 URL/邮箱/密码/token 硬编码)
- .env.example / chatwoot_auth.example.json / inboxes.example.json
This commit is contained in:
Chatwoot AI Agent Dev
2026-06-04 12:56:11 +00:00
parent 504b9b2e40
commit d0b20a0e14
17 changed files with 1375 additions and 868 deletions
+163 -35
View File
@@ -30,7 +30,7 @@ from threading import Thread, Event, Lock
from collections import defaultdict
# ===== CONFIG =====
CW_BASE = "https://chatwoot.275763.xyz"
CW_BASE = os.environ.get("CW_BASE", "http://localhost:3000")
CW_WS_URL = CW_BASE.replace("https://", "wss://").replace("http://", "ws://") + "/cable"
CW_ACCOUNT_ID = 1
CW_AUTH_URL = f"{CW_BASE}/auth/sign_in"
@@ -40,6 +40,7 @@ SCRIPT_DIR = Path(__file__).parent
AUTH_FILE = SCRIPT_DIR / "chatwoot_auth.json"
PROCESSED_FILE = SCRIPT_DIR / ".chatwoot_ws_processed.json"
METRICS_FILE = SCRIPT_DIR / ".chatwoot_ws_metrics.json"
STATE_FILE = SCRIPT_DIR / ".chatwoot_ws_state.json"
# ===== INBOX ROUTING CONFIG (hot-reloadable from inboxes.json) =====
INBOX_CONFIG_FILE = Path(os.environ.get(
@@ -74,12 +75,18 @@ DEFAULT_INBOX_CONFIG = {
}
def _validate_config(config):
"""Validate inbox config structure."""
"""Validate inbox config structure and required placeholders."""
required_keys = ["name", "target_agent", "system_prompt", "prompt_template"]
template_placeholders = ["{sender_name}", "{customer_msg}"]
if not isinstance(config, dict):
return False
for key in required_keys:
if key not in config:
log(f"Config missing required key '{key}'", "WARN")
return False
for ph in template_placeholders:
if ph not in config.get("prompt_template", ""):
log(f"prompt_template missing placeholder {ph}", "WARN")
return False
return True
@@ -134,13 +141,11 @@ def _load_inboxes_config():
if not INBOX_CONFIG:
INBOX_CONFIG = DEFAULT_INBOX_CONFIG.copy()
# Agent identity (from /api/v1/profile response)
PUBSUB_TOKEN = "JQ3wQYDy6LUMwvHouKKV2scr"
USER_ID = 1
# Login credentials for auto-renewal
CW_EMAIL = os.environ.get("CW_EMAIL", "qiuzhida@greatqiu.cn")
CW_PASSWORD = os.environ.get("CW_PASSWORD", "Qaly8980+")
CW_EMAIL = os.environ.get("CW_EMAIL")
CW_PASSWORD = os.environ.get("CW_PASSWORD")
# Agent identity (from /api/v1/profile response)
RENEW_THRESHOLD = timedelta(hours=6)
TZ = timezone(timedelta(hours=8))
@@ -160,12 +165,13 @@ def log(msg, level="INFO", inbox_name=None):
# ===== MONITORING & METRICS =====
class Metrics:
"""Track performance metrics per inbox."""
"""Track performance metrics per inbox. Saves to disk every 30s to avoid hot-path I/O."""
def __init__(self, filepath):
self.filepath = Path(filepath)
self.lock = Lock()
self.data = self._load()
self._dirty = False
def _load(self):
if self.filepath.exists():
@@ -182,15 +188,24 @@ class Metrics:
}
def _save(self):
"""Write to disk only if dirty."""
if not self._dirty:
return
try:
self.filepath.write_text(json.dumps(self.data, indent=2))
self._dirty = False
except Exception as e:
log(f"Failed to save metrics: {e}", "WARN")
def flush(self):
"""Force write to disk (called on SIGTERM / periodic flush)."""
with self.lock:
self._save()
def ws_connected(self):
with self.lock:
self.data["ws_connected"] = True
self._save()
self._dirty = True
def ws_disconnected(self, reason="unknown"):
with self.lock:
@@ -200,7 +215,7 @@ class Metrics:
"time": datetime.now(TZ).isoformat(),
"reason": reason
}
self._save()
self._dirty = True
log(f"⚠️ WebSocket disconnected: {reason}", "WARN")
def record_reply(self, inbox_id, inbox_name, success, duration_ms):
@@ -226,7 +241,7 @@ class Metrics:
inbox["total_duration_ms"] += duration_ms
inbox["avg_duration_ms"] = inbox["total_duration_ms"] / inbox["total_requests"]
inbox["last_reply"] = datetime.now(TZ).isoformat()
self._save()
self._dirty = True
def get_summary(self):
with self.lock:
@@ -281,12 +296,7 @@ def get_headers():
"expiry": auth.get("expiry"),
"uid": auth.get("uid"),
}
return {
"access-token": "uueUhS5OBWOeabNdleUa8w",
"client": "4xu1KgEP3RzNoM86hAkeCg",
"expiry": "1785135457",
"uid": "qiuzhida@greatqiu.cn",
}
return None
def renew_session():
log("Renewing Chatwoot session...")
@@ -302,7 +312,14 @@ def renew_session():
if not all([access_token, client, expiry, uid]):
log(f"Missing headers: {dict(r.headers)}")
return None
# Extract pubsub_token from response body (ActionCable auth)
pubsub_token = ""
try:
pubsub_token = r.json().get("data", {}).get("pubsub_token", "")
except Exception:
pass
data = {"access-token": access_token, "client": client, "expiry": expiry, "uid": uid,
"pubsub_token": pubsub_token,
"updated_at": datetime.now(TZ).isoformat()}
save_auth(data)
exp_time = datetime.fromtimestamp(int(expiry))
@@ -333,8 +350,10 @@ def ensure_session():
"expiry": new_auth["expiry"],
"uid": new_auth["uid"],
}
log("WARNING: Using fallback headers")
return get_headers()
raise RuntimeError(
"No Chatwoot session available. "
"Ensure 'chatwoot_auth.json' exists or CW_EMAIL/CW_PASSWORD env vars are set."
)
# ===== PROCESSED MESSAGE TRACKING =====
@@ -351,6 +370,16 @@ def save_processed(ids):
processed_ids = load_processed()
processed_lock = Lock()
MAX_PROCESSED_IDS = 10000
def prune_processed_ids():
"""Keep processed_ids from growing unbounded."""
global processed_ids
with processed_lock:
if len(processed_ids) > MAX_PROCESSED_IDS:
sorted_ids = sorted(processed_ids, reverse=True)
processed_ids = set(sorted_ids[:MAX_PROCESSED_IDS // 2])
save_processed(processed_ids)
def is_processed(msg_id):
with processed_lock:
@@ -366,9 +395,18 @@ def mark_processed(msg_id):
# Track message IDs that OUR AI sent via the API.
# This lets us distinguish our own AI replies from human agent messages
# when receiving events via WebSocket (since both use sender_type="User").
MAX_AI_SENT_IDS = 10000
ai_sent_msg_ids = set()
ai_sent_lock = Lock()
def prune_ai_sent_ids():
"""Keep ai_sent_msg_ids from growing unbounded."""
global ai_sent_msg_ids
with ai_sent_lock:
if len(ai_sent_msg_ids) > MAX_AI_SENT_IDS:
sorted_ids = sorted(ai_sent_msg_ids, reverse=True)
ai_sent_msg_ids = set(sorted_ids[:MAX_AI_SENT_IDS // 2])
# Track conversation IDs where we just sent a message (race condition safety net)
# Entries are added BEFORE API call, removed AFTER tracking the real message ID.
_ai_pending_convs = set()
@@ -433,6 +471,53 @@ def clean_expired_human_active():
del human_active_convs[cid]
log(f"⏱️ Conv #{cid} human timeout (cleanup), AI resuming")
# ===== STATE PERSISTENCE (survive restart) =====
def save_state():
"""Persist ai_sent_msg_ids, human_active_convs, _ai_pending_convs to JSON."""
try:
with ai_sent_lock, human_active_lock, _ai_pending_lock:
state = {
"ai_sent_msg_ids": list(ai_sent_msg_ids),
"human_active_convs": human_active_convs.copy(),
"ai_pending_convs": list(_ai_pending_convs),
"saved_at": time.time()
}
STATE_FILE.write_text(json.dumps(state, ensure_ascii=False), encoding="utf-8")
except Exception as e:
log(f"save_state error: {e}", "WARN")
def load_state():
"""Restore state from JSON. Safety-first: only restore if file is recent (< 1h old)."""
global ai_sent_msg_ids, human_active_convs, _ai_pending_convs
try:
if not STATE_FILE.exists():
log("No state file found, starting fresh")
return
state = json.loads(STATE_FILE.read_text(encoding="utf-8"))
saved_at = state.get("saved_at", 0)
age = time.time() - saved_at
if age > 3600:
log(f"State file too old ({age/60:.0f}min), ignoring — safety first", "WARN")
STATE_FILE.unlink(missing_ok=True)
return
with ai_sent_lock:
ai_sent_msg_ids = set(state.get("ai_sent_msg_ids", []))
with human_active_lock:
human_active_convs = state.get("human_active_convs", {})
with _ai_pending_lock:
_ai_pending_convs = set(state.get("ai_pending_convs", []))
log(f"State restored: {len(ai_sent_msg_ids)} msg_ids, {len(human_active_convs)} active convs, {len(_ai_pending_convs)} pending")
except Exception as e:
log(f"load_state error: {e} (starting fresh)", "WARN")
STATE_FILE.unlink(missing_ok=True)
# ===== AI HELPERS =====
import re as _re # used for session-id stripping
@@ -499,8 +584,8 @@ def generate_ai_reply(customer_msg, sender_name, inbox_id):
"""
config = INBOX_CONFIG.get(inbox_id)
if not config:
log(f"No config for inbox #{inbox_id}, falling back to sourcing-agent")
config = INBOX_CONFIG[1]
log(f"No config for inbox #{inbox_id}, skipping", "WARN")
return None
prompt = config["prompt_template"].format(
sender_name=sender_name,
@@ -752,7 +837,6 @@ class WSAgent:
self.headers = None
self.running = Event()
self.running.set()
self.reconnect_delay = 1
self.last_pong = time.time()
def on_open(self, ws):
@@ -786,7 +870,6 @@ class WSAgent:
if t == "confirm_subscription":
log("✅ RoomChannel subscription confirmed")
self.reconnect_delay = 1
return
if t == "reject_subscription":
@@ -879,15 +962,6 @@ class WSAgent:
def on_close(self, ws, status, msg):
log(f"🔴 WebSocket closed (status={status}, msg={msg})", "WARN")
metrics.ws_disconnected(f"status={status}, msg={msg}")
if self.running.is_set():
self._reconnect()
def _reconnect(self):
delay = min(self.reconnect_delay, 60)
log(f"Reconnecting in {delay}s...")
time.sleep(delay)
self.reconnect_delay = min(delay * 2, 60)
self.start()
def start(self):
"""Start WebSocket connection (blocking)."""
@@ -920,18 +994,58 @@ class WSAgent:
# ===== TIMEOUT CHECKER THREAD =====
def timeout_checker_loop():
"""Background thread: clean expired handoffs + hot-reload config."""
"""Background thread: clean expired handoffs + hot-reload config + persist state."""
while True:
time.sleep(30) # Check every 30s
try:
clean_expired_human_active()
prune_ai_sent_ids()
prune_processed_ids()
_load_inboxes_config() # hot-reload if file changed
save_state() # persist state every 30s
metrics.flush() # persist metrics every 30s
except Exception as e:
log(f"Timeout checker error: {e}")
PUBSUB_TOKEN = os.environ.get("CW_PUBSUB_TOKEN", "")
USER_ID = int(os.environ.get("CW_USER_ID", "1"))
if not PUBSUB_TOKEN:
# Fallback: read directly from auth file
_auth_path = Path(__file__).parent / "chatwoot_auth.json"
if _auth_path.exists():
try:
_auth_data = json.loads(_auth_path.read_text())
PUBSUB_TOKEN = _auth_data.get("pubsub_token", "")
except Exception:
pass
if not PUBSUB_TOKEN:
# Last resort: try renewing session to get pubsub_token from login response
log("PUBSUB_TOKEN not set, attempting session renewal to obtain it...", "WARN")
_new_auth = None
try:
_r = requests.post(
CW_AUTH_URL,
json={"email": CW_EMAIL, "password": CW_PASSWORD},
timeout=15
)
if _r.status_code == 200:
_body = _r.json()
PUBSUB_TOKEN = _body.get("data", {}).get("pubsub_token", "")
except Exception:
pass
if not PUBSUB_TOKEN:
raise RuntimeError(
"Missing Chatwoot pubsub token. "
"Set CW_PUBSUB_TOKEN env var or include 'pubsub_token' in chatwoot_auth.json"
)
# ===== MAIN =====
def main():
# Restore persisted state BEFORE handling any args (--health etc. need it)
load_state()
parser = argparse.ArgumentParser(description="Chatwoot WebSocket AI Agent")
parser.add_argument("--renew", action="store_true", help="Force renew session & exit")
parser.add_argument("--test-ws", action="store_true", help="Test WebSocket connection & exit")
@@ -995,7 +1109,7 @@ def main():
"ws_last_disconnect": None,
"inboxes": {}
}
metrics._save()
metrics.flush()
print("Metrics reset successfully")
return
@@ -1084,12 +1198,26 @@ def main():
# Start the main agent
agent = WSAgent()
def _shutdown(signum, frame):
log(f"Received signal {signum}, shutting down gracefully...")
agent.stop()
save_state()
metrics.flush()
sys.exit(0)
signal.signal(signal.SIGTERM, _shutdown)
signal.signal(signal.SIGINT, _shutdown)
try:
agent.start()
except KeyboardInterrupt:
log("Shutting down...")
agent.stop()
save_state()
metrics.flush()
if __name__ == "__main__":
main()