69 lines
1.9 KiB
Python
69 lines
1.9 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""AES-256-GCM credential encryption.
|
|
|
|
The 32-byte key is loaded from ``GATEWAY_AES_KEY`` (base64, 32 bytes raw).
|
|
|
|
Format on disk (VARBINARY column):
|
|
nonce (12 bytes) || ciphertext_with_tag
|
|
|
|
Plaintext is the JSON of ``{access_token, refresh_token, ...}`` per channel.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import json
|
|
import logging
|
|
import os
|
|
from typing import Any
|
|
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
|
|
|
log = logging.getLogger("chathub.gateway.crypto")
|
|
|
|
|
|
def _key() -> bytes:
|
|
raw = os.environ.get("GATEWAY_AES_KEY", "")
|
|
if not raw:
|
|
raise RuntimeError(
|
|
"GATEWAY_AES_KEY not set — refusing to encrypt/decrypt credentials"
|
|
)
|
|
try:
|
|
decoded = base64.b64decode(raw, validate=True)
|
|
except Exception as e:
|
|
raise RuntimeError(f"GATEWAY_AES_KEY not valid base64: {e}") from None
|
|
if len(decoded) != 32:
|
|
raise RuntimeError(
|
|
f"GATEWAY_AES_KEY must decode to 32 bytes, got {len(decoded)}"
|
|
)
|
|
return decoded
|
|
|
|
|
|
def encrypt(plaintext_obj: dict | str) -> bytes:
|
|
"""Encrypt a dict (or string) under AES-256-GCM. Returns nonce||ct."""
|
|
plaintext = (
|
|
plaintext_obj
|
|
if isinstance(plaintext_obj, str)
|
|
else json.dumps(plaintext_obj, ensure_ascii=False, sort_keys=True)
|
|
)
|
|
nonce = os.urandom(12)
|
|
return nonce + AESGCM(_key()).encrypt(nonce, plaintext.encode("utf-8"), None)
|
|
|
|
|
|
def decrypt(blob: bytes) -> dict:
|
|
"""Decrypt a nonce||ct blob back to a dict."""
|
|
if len(blob) < 12 + 16: # nonce + min GCM tag
|
|
raise ValueError("ciphertext too short")
|
|
nonce, ct = blob[:12], blob[12:]
|
|
raw = AESGCM(_key()).decrypt(nonce, ct, None)
|
|
return json.loads(raw.decode("utf-8"))
|
|
|
|
|
|
def is_configured() -> bool:
|
|
"""Check whether a usable key is present. Used by callers to short-circuit."""
|
|
try:
|
|
_key()
|
|
return True
|
|
except RuntimeError:
|
|
return False
|