v1.5: debounce (5s coalesce) + AI retry (exp backoff)

This commit is contained in:
hanmolabiqiu
2026-06-05 12:28:09 +08:00
parent d0b20a0e14
commit d22380b252
+90 -19
View File
@@ -518,19 +518,33 @@ def load_state():
STATE_FILE.unlink(missing_ok=True)
# ===== DEBOUNCE (message coalescing) =====
# Accumulate messages from same conv within 5s window.
# Only process the last one with all accumulated context.
DEBOUNCE_WINDOW = 5 # seconds
_debounce_timers: dict[int, float] = {} # conv_id → last message time
_debounce_msgs: dict[int, list[str]] = {} # conv_id → accumulated messages
_debounce_lock = Lock()
# ===== AI HELPERS =====
import re as _re # used for session-id stripping
def call_qwenpaw_ai(prompt, system_prompt=None, target_agent="sourcing-agent"):
"""Call QwenPaw AI via agents chat.
GATEWAY_ENABLED = os.environ.get("GATEWAY_ENABLED", "1") == "1"
def call_qwenpaw_ai(prompt, system_prompt=None, target_agent="sourcing-agent", retries=2):
"""Call QwenPaw AI via agents chat, with retries on failure.
Args:
prompt: The user message
system_prompt: Optional system instructions (appended before prompt)
target_agent: Which agent to call (default:sourcing-agent, translation:default)
retries: Number of retries on failure (exponential backoff)
"""
full_prompt = (system_prompt + "\n\n" + prompt) if system_prompt else prompt
last_error = None
for attempt in range(1 + retries):
try:
result = subprocess.run(
["qwenpaw", "agents", "chat",
@@ -539,9 +553,7 @@ def call_qwenpaw_ai(prompt, system_prompt=None, target_agent="sourcing-agent"):
"--text", full_prompt],
capture_output=True, text=True, timeout=90
)
if result.returncode != 0:
log(f"AI error (rc={result.returncode}): {result.stderr[:200]}")
return None
if result.returncode == 0 and result.stdout.strip():
# Clean output: remove INFO/SESSION/Agent lines
raw = result.stdout.strip()
lines = raw.split("\n")
@@ -555,17 +567,23 @@ def call_qwenpaw_ai(prompt, system_prompt=None, target_agent="sourcing-agent"):
if stripped:
clean_lines.append(stripped)
reply = "\n".join(clean_lines).strip()
if not reply:
log(f"⚠️ AI reply empty after cleaning. Raw: {raw[:200]}")
return None
if reply:
return reply
last_error = f"rc={result.returncode}" if result.returncode != 0 else "empty reply"
if result.stderr:
last_error += f" stderr={result.stderr[:200]}"
except subprocess.TimeoutExpired:
log("AI call timed out")
return None
last_error = "timeout (90s)"
except Exception as e:
log(f"AI error: {e}")
last_error = str(e)[:200]
if attempt < retries:
wait = 2 ** attempt # 1s, 2s
log(f"⚠️ AI call failed ({last_error}), retry {attempt+1}/{retries} in {wait}s", "WARN")
time.sleep(wait)
log(f"❌ AI call failed after {retries+1} attempts: {last_error}", "ERROR")
return None
def translate_to_chinese(text):
"""Translate text to Simplified Chinese."""
prompt = (
@@ -576,20 +594,16 @@ def translate_to_chinese(text):
result = call_qwenpaw_ai(prompt, target_agent="default")
return result if result else text
def generate_ai_reply(customer_msg, sender_name, inbox_id):
"""Generate an AI reply for a customer message, routed by inbox_id.
Uses INBOX_CONFIG[inbox_id] to pick the right AI agent, system prompt,
and prompt template. Instructs AI to self-assess handoff need via [HANDOFF].
"""
def generate_ai_reply(customer_msg, sender_name, inbox_id, context=""):
config = INBOX_CONFIG.get(inbox_id)
if not config:
log(f"No config for inbox #{inbox_id}, skipping", "WARN")
return None
body = f"{context}\n\n{customer_msg}" if context else customer_msg
prompt = config["prompt_template"].format(
sender_name=sender_name,
customer_msg=customer_msg
customer_msg=body
)
reply = call_qwenpaw_ai(prompt, config["system_prompt"],
target_agent=config["target_agent"])
@@ -723,6 +737,25 @@ def handle_meeting_message(msg_data, headers):
# ===== MESSAGE HANDLER =====
def _enrich_context(inbox_id, customer_msg, config):
if not GATEWAY_ENABLED:
return ""
channel = config.get("type", "web_widget")
if channel not in ("amazon", "jd", "taobao", "pdd", "tiktok"):
return ""
tenant_id = config.get("tenant_id")
if not tenant_id:
return ""
asin_match = _re.search(r'\b(B0[A-Z0-9]{8})\b', customer_msg, _re.IGNORECASE)
query = {"asin": asin_match.group(1).upper()} if asin_match else {"keyword": customer_msg[:50]}
try:
from gateway import fetch
result = fetch(channel, tenant_id, query, timeout=3.0)
return result.to_prompt_block()
except Exception as e:
log(f"Gateway enrich error: {e}")
return ""
def handle_incoming_message(msg_data, headers):
"""Process an incoming customer message (with human handoff awareness)."""
msg_id = msg_data.get("id")
@@ -768,12 +801,36 @@ def handle_incoming_message(msg_data, headers):
# Mark immediately to avoid duplicate processing
mark_processed(msg_id)
# ===== DEBOUNCE: coalesce rapid messages from same conv =====
with _debounce_lock:
now = time.time()
last_time = _debounce_timers.get(conv_id, 0)
if now - last_time < DEBOUNCE_WINDOW:
# Within debounce window → accumulate, don't reply yet
if conv_id not in _debounce_msgs:
_debounce_msgs[conv_id] = []
_debounce_msgs[conv_id].append(content)
_debounce_timers[conv_id] = now
log(f"⏳ [{config['name']}] Debounce #{conv_id}: accumulated msg #{msg_id} (window {DEBOUNCE_WINDOW}s)")
return
# First message or window expired → process now
_debounce_timers[conv_id] = now
# Collect any previously accumulated messages
accumulated = _debounce_msgs.pop(conv_id, [])
if accumulated:
accumulated.append(content)
content = "\n---\n".join(accumulated)
log(f"📦 [{config['name']}] Debounce #{conv_id}: processing {len(accumulated)} merged msgs")
# ===== END DEBOUNCE =====
log(f"📩 [{config['name']}] New msg #{msg_id} from '{sender_name}' in conv #{conv_id}: {content[:100]}", inbox_name=config['name'])
# Generate AI reply with inbox-specific config
log(f"🤖 [{config['name']}] Generating AI reply...", inbox_name=config['name'])
start_time = time.time()
reply = generate_ai_reply(content, sender_name, inbox_id)
context = _enrich_context(inbox_id, content, config)
reply = generate_ai_reply(content, sender_name, inbox_id, context=context)
duration_ms = (time.time() - start_time) * 1000
if not reply:
@@ -1191,6 +1248,14 @@ def main():
log(f"⚙️ Human timeout: {HUMAN_TIMEOUT_MINUTES}min")
log(f"⚙️ Account ID: {CW_ACCOUNT_ID}")
if GATEWAY_ENABLED:
try:
from gateway import gateway_loop as _gw_loop
_gw_loop.start()
log("🌐 Platform Gateway event loop started")
except Exception as e:
log(f"⚠️ Gateway init failed (continuing without): {e}", "WARN")
# Start the timeout checker daemon
checker = Thread(target=timeout_checker_loop, daemon=True)
checker.start()
@@ -1202,6 +1267,12 @@ def main():
def _shutdown(signum, frame):
log(f"Received signal {signum}, shutting down gracefully...")
agent.stop()
if GATEWAY_ENABLED:
try:
from gateway import gateway_loop as _gw_loop
_gw_loop.stop()
except Exception:
pass
save_state()
metrics.flush()
sys.exit(0)