v1.7: session-id mapping + conversation summary + contact profiling + exponential backoff reconnect
This commit is contained in:
+194
-29
@@ -484,6 +484,16 @@ def save_state():
|
||||
"ai_pending_convs": list(_ai_pending_convs),
|
||||
"saved_at": time.time()
|
||||
}
|
||||
# Add session map
|
||||
with _conv_session_lock:
|
||||
state["conv_session_map"] = _conv_session_map.copy()
|
||||
# Add summaries & rounds
|
||||
with _conv_conv_lock:
|
||||
state["conv_summaries"] = _conv_summaries.copy()
|
||||
state["conv_rounds"] = _conv_rounds.copy()
|
||||
# Add contact profiles
|
||||
with _contact_profile_lock:
|
||||
state["contact_profiles"] = _contact_profiles.copy()
|
||||
STATE_FILE.write_text(json.dumps(state, ensure_ascii=False), encoding="utf-8")
|
||||
except Exception as e:
|
||||
log(f"save_state error: {e}", "WARN")
|
||||
@@ -511,8 +521,20 @@ def load_state():
|
||||
human_active_convs = state.get("human_active_convs", {})
|
||||
with _ai_pending_lock:
|
||||
_ai_pending_convs = set(state.get("ai_pending_convs", []))
|
||||
# Restore session map
|
||||
with _conv_session_lock:
|
||||
_conv_session_map.update(state.get("conv_session_map", {}))
|
||||
# Restore summaries & rounds
|
||||
with _conv_conv_lock:
|
||||
_conv_summaries.update(state.get("conv_summaries", {}))
|
||||
_conv_rounds.update(state.get("conv_rounds", {}))
|
||||
# Restore contact profiles
|
||||
with _contact_profile_lock:
|
||||
_contact_profiles.update(state.get("contact_profiles", {}))
|
||||
|
||||
log(f"State restored: {len(ai_sent_msg_ids)} msg_ids, {len(human_active_convs)} active convs, {len(_ai_pending_convs)} pending")
|
||||
log(f"State restored: {len(ai_sent_msg_ids)} msg_ids, {len(human_active_convs)} active convs, "
|
||||
f"{len(_conv_session_map)} sessions, {len(_conv_summaries)} summaries, "
|
||||
f"{len(_contact_profiles)} profiles")
|
||||
except Exception as e:
|
||||
log(f"load_state error: {e} (starting fresh)", "WARN")
|
||||
STATE_FILE.unlink(missing_ok=True)
|
||||
@@ -527,13 +549,105 @@ _debounce_msgs: dict[int, list[str]] = {} # conv_id → accumulated messages
|
||||
_debounce_lock = Lock()
|
||||
|
||||
|
||||
# ===== SESSION-ID MAPPING (per-conversation QwenPaw context) =====
|
||||
_conv_session_map: dict[int, str] = {} # conv_id → qwenpaw session_id
|
||||
_conv_session_lock = Lock()
|
||||
|
||||
|
||||
def _get_or_create_session(conv_id):
|
||||
"""Return existing session_id for a conversation, or create a new one."""
|
||||
with _conv_session_lock:
|
||||
sid = _conv_session_map.get(conv_id)
|
||||
if not sid:
|
||||
sid = f"conv_{conv_id}_{int(time.time())}"
|
||||
_conv_session_map[conv_id] = sid
|
||||
return sid
|
||||
|
||||
|
||||
def _prune_sessions():
|
||||
"""Trim session map when it grows too large."""
|
||||
with _conv_session_lock:
|
||||
if len(_conv_session_map) > 10000:
|
||||
_conv_session_map.clear()
|
||||
log("🧹 Session map too large, cleared", "WARN")
|
||||
|
||||
|
||||
# ===== CONVERSATION SUMMARIZATION =====
|
||||
_conv_rounds: dict[int, int] = {} # conv_id → AI reply count
|
||||
_conv_summaries: dict[int, str] = {} # conv_id → latest AI-generated summary
|
||||
_conv_conv_lock = Lock()
|
||||
SUMMARY_THRESHOLD = 15 # generate summary after every N AI replies
|
||||
|
||||
|
||||
def _summarize_conversation(conv_id, prompt, reply, target_agent):
|
||||
"""Ask AI to produce a short summary of this conversation so far."""
|
||||
with _conv_conv_lock:
|
||||
old_summary = _conv_summaries.get(conv_id, "")
|
||||
summary_prompt = (
|
||||
f"Existing summary: {old_summary}\n\n"
|
||||
f"Latest customer message: {prompt[:200]}\n"
|
||||
f"Your reply: {reply[:500]}\n\n"
|
||||
"Produce a concise 1-2 sentence summary (in English) of the "
|
||||
"entire conversation so far. Focus on customer needs, products "
|
||||
"discussed, and any decisions made. Keep it under 100 words."
|
||||
)
|
||||
summary = call_qwenpaw_ai(summary_prompt, target_agent=target_agent)
|
||||
if summary:
|
||||
with _conv_conv_lock:
|
||||
_conv_summaries[conv_id] = summary
|
||||
|
||||
|
||||
def _get_conversation_context(conv_id):
|
||||
"""Return summary string for this conv, or empty string."""
|
||||
with _conv_conv_lock:
|
||||
return _conv_summaries.get(conv_id, "")
|
||||
|
||||
|
||||
# ===== CONTACT PROFILING =====
|
||||
_contact_profiles: dict[int, dict] = {} # contact_id → profile dict
|
||||
_contact_profile_lock = Lock()
|
||||
|
||||
|
||||
def _update_contact_profile(contact_id, sender_name, msg, reply):
|
||||
"""Extract relevant info from conversation and update contact profile."""
|
||||
if not contact_id:
|
||||
return
|
||||
with _contact_profile_lock:
|
||||
profile = _contact_profiles.get(contact_id, {})
|
||||
profile["name"] = sender_name
|
||||
profile["last_seen"] = time.time()
|
||||
# Keep a rolling log of last 3 interactions
|
||||
interactions = profile.get("interactions", [])
|
||||
interactions.append({"msg": msg[:100], "reply": reply[:100], "ts": time.time()})
|
||||
profile["interactions"] = interactions[-3:]
|
||||
_contact_profiles[contact_id] = profile
|
||||
|
||||
|
||||
def _get_contact_context(contact_id):
|
||||
"""Return a prompt-friendly profile string for this contact."""
|
||||
if not contact_id:
|
||||
return ""
|
||||
with _contact_profile_lock:
|
||||
profile = _contact_profiles.get(contact_id)
|
||||
if not profile:
|
||||
return ""
|
||||
parts = [f"Contact name: {profile.get('name', 'Unknown')}"]
|
||||
interactions = profile.get("interactions", [])
|
||||
if interactions:
|
||||
parts.append(f"Last {len(interactions)} interaction(s):")
|
||||
for i, ix in enumerate(interactions):
|
||||
parts.append(f" [{i+1}] \"{ix['msg']}\" → \"{ix['reply'][:80]}\"")
|
||||
return "\n".join(parts)
|
||||
|
||||
|
||||
# ===== AI HELPERS =====
|
||||
|
||||
import re as _re # used for session-id stripping
|
||||
|
||||
GATEWAY_ENABLED = os.environ.get("GATEWAY_ENABLED", "1") == "1"
|
||||
|
||||
def call_qwenpaw_ai(prompt, system_prompt=None, target_agent="sourcing-agent", retries=2):
|
||||
def call_qwenpaw_ai(prompt, system_prompt=None, target_agent="sourcing-agent",
|
||||
retries=2, session_id=None):
|
||||
"""Call QwenPaw AI via agents chat, with retries on failure.
|
||||
|
||||
Args:
|
||||
@@ -541,18 +655,19 @@ def call_qwenpaw_ai(prompt, system_prompt=None, target_agent="sourcing-agent", r
|
||||
system_prompt: Optional system instructions (appended before prompt)
|
||||
target_agent: Which agent to call (default:sourcing-agent, translation:default)
|
||||
retries: Number of retries on failure (exponential backoff)
|
||||
session_id: Optional QwenPaw session ID for conversation continuity
|
||||
"""
|
||||
full_prompt = (system_prompt + "\n\n" + prompt) if system_prompt else prompt
|
||||
cmd = ["qwenpaw", "agents", "chat",
|
||||
"--from-agent", "wordpress",
|
||||
"--to-agent", target_agent,
|
||||
"--text", full_prompt]
|
||||
if session_id:
|
||||
cmd.extend(["--session-id", session_id])
|
||||
last_error = None
|
||||
for attempt in range(1 + retries):
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["qwenpaw", "agents", "chat",
|
||||
"--from-agent", "wordpress",
|
||||
"--to-agent", target_agent,
|
||||
"--text", full_prompt],
|
||||
capture_output=True, text=True, timeout=90
|
||||
)
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=90)
|
||||
if result.returncode == 0 and result.stdout.strip():
|
||||
# Clean output: remove INFO/SESSION/Agent lines
|
||||
raw = result.stdout.strip()
|
||||
@@ -594,7 +709,7 @@ def translate_to_chinese(text):
|
||||
result = call_qwenpaw_ai(prompt, target_agent="default")
|
||||
return result if result else text
|
||||
|
||||
def generate_ai_reply(customer_msg, sender_name, inbox_id, context=""):
|
||||
def generate_ai_reply(customer_msg, sender_name, inbox_id, context="", session_id=None):
|
||||
config = INBOX_CONFIG.get(inbox_id)
|
||||
if not config:
|
||||
log(f"No config for inbox #{inbox_id}, skipping", "WARN")
|
||||
@@ -606,7 +721,8 @@ def generate_ai_reply(customer_msg, sender_name, inbox_id, context=""):
|
||||
customer_msg=body
|
||||
)
|
||||
reply = call_qwenpaw_ai(prompt, config["system_prompt"],
|
||||
target_agent=config["target_agent"])
|
||||
target_agent=config["target_agent"],
|
||||
session_id=session_id)
|
||||
|
||||
if not reply:
|
||||
return reply
|
||||
@@ -826,12 +942,47 @@ def handle_incoming_message(msg_data, headers):
|
||||
|
||||
log(f"📩 [{config['name']}] New msg #{msg_id} from '{sender_name}' in conv #{conv_id}: {content[:100]}", inbox_name=config['name'])
|
||||
|
||||
# ===== SESSION-ID: maintain conversation continuity =====
|
||||
session_id = _get_or_create_session(conv_id)
|
||||
# ===== CONVERSATION SUMMARY: inject past context =====
|
||||
conv_context = _get_conversation_context(conv_id)
|
||||
# ===== CONTACT PROFILE: inject customer history =====
|
||||
contact_id = msg_data.get("sender", {}).get("id")
|
||||
contact_context = _get_contact_context(contact_id)
|
||||
|
||||
# Build enriched context
|
||||
context_parts = []
|
||||
gateway_ctx = _enrich_context(inbox_id, content, config)
|
||||
if gateway_ctx:
|
||||
context_parts.append(gateway_ctx)
|
||||
if conv_context:
|
||||
context_parts.append(f"[Conversation so far: {conv_context}]")
|
||||
if contact_context:
|
||||
context_parts.append(f"[Customer history: {contact_context}]")
|
||||
all_context = "\n".join(context_parts)
|
||||
|
||||
# Generate AI reply with inbox-specific config
|
||||
log(f"🤖 [{config['name']}] Generating AI reply...", inbox_name=config['name'])
|
||||
start_time = time.time()
|
||||
context = _enrich_context(inbox_id, content, config)
|
||||
reply = generate_ai_reply(content, sender_name, inbox_id, context=context)
|
||||
reply = generate_ai_reply(content, sender_name, inbox_id,
|
||||
context=all_context, session_id=session_id)
|
||||
duration_ms = (time.time() - start_time) * 1000
|
||||
|
||||
if not reply:
|
||||
log(f"⚠️ [{config['name']}] Empty AI reply, skipping", "WARN", config['name'])
|
||||
metrics.record_reply(inbox_id, config['name'], False, duration_ms)
|
||||
return
|
||||
|
||||
log(f"💬 [{config['name']}] AI reply ({duration_ms:.0f}ms): {reply[:150]}", inbox_name=config['name'])
|
||||
|
||||
# ===== POST-REPLY: update conversation summary & contact profile =====
|
||||
with _conv_conv_lock:
|
||||
rounds = _conv_rounds.get(conv_id, 0) + 1
|
||||
_conv_rounds[conv_id] = rounds
|
||||
if rounds > 0 and rounds % SUMMARY_THRESHOLD == 0:
|
||||
# Generate summary every SUMMARY_THRESHOLD AI replies
|
||||
_summarize_conversation(conv_id, content, reply, config["target_agent"])
|
||||
_update_contact_profile(contact_id, sender_name, content, reply)
|
||||
|
||||
if not reply:
|
||||
log(f"⚠️ [{config['name']}] Empty AI reply, skipping", "WARN", config['name'])
|
||||
@@ -1021,25 +1172,39 @@ class WSAgent:
|
||||
metrics.ws_disconnected(f"status={status}, msg={msg}")
|
||||
|
||||
def start(self):
|
||||
"""Start WebSocket connection (blocking)."""
|
||||
"""Start WebSocket connection with exponential backoff reconnect."""
|
||||
self.headers = ensure_session()
|
||||
log(f"Connecting to {CW_WS_URL}")
|
||||
delay = 5 # start at 5s
|
||||
max_delay = 60
|
||||
attempt = 0
|
||||
|
||||
self.ws = websocket.WebSocketApp(
|
||||
CW_WS_URL,
|
||||
on_open=self.on_open,
|
||||
on_message=self.on_message,
|
||||
on_error=self.on_error,
|
||||
on_close=self.on_close
|
||||
)
|
||||
while self.running.is_set():
|
||||
if attempt > 0:
|
||||
log(f"🔄 Reconnecting in {delay}s (attempt #{attempt})...")
|
||||
time.sleep(delay)
|
||||
delay = min(delay * 2, max_delay) # 5 → 10 → 20 → 40 → 60
|
||||
# Re-fetch session headers in case they expired during downtime
|
||||
try:
|
||||
self.headers = ensure_session()
|
||||
except Exception as e:
|
||||
log(f"Session renewal failed before reconnect: {e}", "WARN")
|
||||
continue
|
||||
|
||||
# Run forever (blocks)
|
||||
self.ws.run_forever(
|
||||
sslopt={"cert_reqs": ssl.CERT_NONE},
|
||||
ping_interval=15,
|
||||
ping_timeout=5,
|
||||
reconnect=5 # built-in reconnect
|
||||
)
|
||||
log(f"Connecting to {CW_WS_URL}")
|
||||
self.ws = websocket.WebSocketApp(
|
||||
CW_WS_URL,
|
||||
on_open=self.on_open,
|
||||
on_message=self.on_message,
|
||||
on_error=self.on_error,
|
||||
on_close=self.on_close
|
||||
)
|
||||
self.ws.run_forever(
|
||||
sslopt={"cert_reqs": ssl.CERT_NONE},
|
||||
ping_interval=15,
|
||||
ping_timeout=5,
|
||||
reconnect=0 # disable internal reconnect; we handle it
|
||||
)
|
||||
attempt += 1
|
||||
|
||||
def stop(self):
|
||||
"""Stop the WebSocket connection."""
|
||||
|
||||
Reference in New Issue
Block a user