HEX
Server: Apache
System: Linux zacp120.webway.host 4.18.0-553.70.1.lve.el8.x86_64 #1 SMP Wed Aug 20 14:42:18 UTC 2025 x86_64
User: govancoz (1003)
PHP: 8.3.27
Disabled: exec,system,passthru,shell_exec,proc_close,proc_open,dl,popen,show_source,posix_kill,posix_mkfifo,posix_getpwuid,posix_setpgid,posix_setsid,posix_setuid,posix_setgid,posix_seteuid,posix_setegid,posix_uname
Upload Files
File: //proc/self/root/opt/osm/main.py
#!/usr/bin/env python3
import argparse
import json
import os
import time
import subprocess
import re
import requests
from requests.adapters import HTTPAdapter
try:
    # Retry support for transient 5xx via urllib3
    from urllib3.util.retry import Retry  # type: ignore
except Exception:
    Retry = None
import sys
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from collections import defaultdict, deque
import shutil
import threading
import stat
from typing import Optional

def _load_env_file():
    env_path = Path(__file__).resolve().parent / ".env"
    if not env_path.exists():
        return
    try:
        for line in env_path.read_text(encoding='utf-8').splitlines():
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            key = key.strip()
            value = value.strip().strip("'\"")
            if key and key not in os.environ:
                os.environ[key] = value
    except Exception:
        pass

_load_env_file()

BASE_URL = os.environ.get("SERVER_BASE_URL")
TOKEN = os.environ.get("AGENT_TOKEN")
API_KEY = os.environ.get("OSM_API_KEY", "")

if not BASE_URL or not TOKEN:
    print("[agent] Warning: SERVER_BASE_URL or AGENT_TOKEN not set. Running in offline mode.")

session = requests.Session()
# Configure light retries for transient gateway errors (e.g., tunneling hiccups)
try:
    if Retry is not None:
        retry = Retry(
            total=2,
            connect=1,
            read=1,
            backoff_factor=0.5,
            status_forcelist=[502, 503, 504],
            allowed_methods=frozenset(["GET", "POST"]),
        )
        adapter = HTTPAdapter(max_retries=retry)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
except Exception:
    pass
DEFAULT_HEADERS = {"X-Api-Key": API_KEY} if API_KEY else {}

def _env_flag(name, default=False):
    val = os.environ.get(name)
    if val is None:
        return default
    try:
        return val.strip().lower() in {"1", "true", "yes", "on"}
    except Exception:
        return default

REQUEST_DEBUG_ENABLED = _env_flag("OSM_DEBUG_REQUESTS", False)

def _mask_headers(headers):
    if not headers:
        return {}
    masked = {}
    for key, value in headers.items():
        if key.lower() == "x-api-key":
            masked[key] = "***"
        else:
            masked[key] = value
    return masked

def _truncate_payload(payload, limit=500):
    if payload is None:
        return None
    try:
        text = json.dumps(payload, default=str)
    except Exception:
        text = str(payload)
    if len(text) > limit:
        return text[:limit] + "...(truncated)"
    return text

_original_request = session.request

def _logged_request(method, url, **kwargs):
    method_upper = method.upper()
    params_preview = _truncate_payload(kwargs.get("params"))
    json_preview = _truncate_payload(kwargs.get("json"))
    data_preview = _truncate_payload(kwargs.get("data"))
    headers_preview = _mask_headers(kwargs.get("headers") or {})
    timeout_value = kwargs.get("timeout")

    if REQUEST_DEBUG_ENABLED:
        try:
            print(f"[agent] -> {method_upper} {url} params={params_preview} json={json_preview} data={data_preview} headers={headers_preview} timeout={timeout_value}")
        except Exception as log_err:
            print(f"[agent] Failed to log request pre-flight: {log_err}")

    try:
        response = _original_request(method, url, **kwargs)
    except Exception as exc:
        if REQUEST_DEBUG_ENABLED:
            print(f"[agent] <- {method_upper} {url} raised {exc}")
        else:
            print(f"[agent] HTTP {method_upper} {url} failed: {exc}")
        raise

    if REQUEST_DEBUG_ENABLED:
        try:
            body_preview = response.text
            if body_preview and len(body_preview) > 500:
                body_preview = body_preview[:500] + "...(truncated)"
            print(f"[agent] <- {method_upper} {url} status={response.status_code} body={body_preview}")
        except Exception as log_err:
            print(f"[agent] Failed to log response: {log_err}")

    return response

session.request = _logged_request

# ---------- OSM-like tracking DB and Exim log processing (local on agent) ----------

# Global default limit (from server config)
DEFAULT_MAX_EMAILS: int = 0
DEFAULT_PER_SECONDS: int = 0
DEFAULT_ENABLED: bool = False
QUEUE_INGEST_ENABLED: bool = True
FREEZE_REPORTS_ENABLED: bool = True
IPDB_LOOKUP_ENABLED: bool = True

# Paths
AGENT_DIR = Path(__file__).resolve().parent
TRACKING_DB_PATH = AGENT_DIR / 'osm_tracking.db'
EXIM_LOG_STATE_PATH = AGENT_DIR / 'exim_log.state'
LIMITS_CACHE_PATH = AGENT_DIR / 'user_limits.json'
ACTIONS_QUEUE_PATH = AGENT_DIR / 'pending_actions.json'
AGENT_STATE_PATH = AGENT_DIR / 'agent_state.json'

# OSM-style integration files (as used by OSM on cPanel/DirectAdmin servers)
VIRTUAL_DIR = Path('/etc/virtual')
OSM_HOLD_FILE = VIRTUAL_DIR / 'osm_hold'      # usernames to freeze/hold
OSM_DISABLE_FILE = VIRTUAL_DIR / 'osm_disable'  # usernames to discard

# Legacy/alternate freeze list for generic exim setups (optional)
FREEZE_LIST_PATH = Path('/etc/exim/osm_freeze_senders')

# In-memory counters: key -> deque[timestamps]
# key is a tuple: (account_ref: str, trigger_name: str, window_seconds: int)
LOG_COUNTERS = defaultdict(deque)
LAST_TRIGGERED = {}
LAST_LIMIT_TRIGGERED = {}  # Tracks last freeze time for per-account/default limits: owner_key -> timestamp
ACCOUNT_LIMIT_WINDOWS = defaultdict(deque)  # account -> deque[timestamps]
# Last seen subject per account_ref
LAST_SUBJECT_BY_ACCOUNT = {}
# Deduplication of log-based events by message-id
SEEN_LOG_MSG_IDS = deque()
SEEN_LOG_MSG_IDS_SET = set()
SEEN_LOG_MSG_IDS_RETENTION_SECS = 600  # 10 minutes
# Deduplication of per-recipient deliveries by (msgid, recipient)
SEEN_MSG_RCPT = deque()
SEEN_MSG_RCPT_SET = set()
SEEN_MSG_RCPT_RETENTION_SECS = 600
# Retain counters in DB for some time after last event
LAST_COUNTER_SEEN_TS = {}
COUNTER_RETENTION_SECS = 7200
# Per-message caches
MSG_ID_TO_ACCOUNT = {}
MSG_ID_TO_SUBJECT = {}
# Recent recipients per account_ref with last-seen timestamps
RECIPIENTS_BY_ACCOUNT = {}
RECIPIENT_RETENTION_SECS = 7200
# Domain -> owner cache (cpanel/directadmin)
DOMAIN_OWNER_MAP = {}
DOMAIN_OWNER_LAST_LOAD = 0.0
DOMAIN_OWNER_TTL_SECS = 300
# Per-message metadata
MSG_ID_TO_CLIENT = {}
MSG_ID_TO_RELAY = {}
MSG_ID_TO_OWNER = {}
MSG_ID_TO_RCPTS = {}
MSG_ID_TO_RELAY_TYPE = {}
# Per-owner recent events (delivery granularity)
ACCOUNT_EVENTS = defaultdict(deque)
ACCOUNT_EVENTS_RETENTION_SECS = 7200
MAX_LOG_LOOKBACK_SECS = 3 * 3600  # Ignore historical log entries older than 3 hours
REPORTS_DIR = Path('/opt/osm/reports')
PENDING_REPORTS_DIR = REPORTS_DIR / 'pending'
LAST_LOCAL_HOLD_UPDATE_TS = 0.0
CURRENT_SERVER_ID = None
THRESHOLD_LOCK = threading.Lock()
CURRENT_THRESHOLDS = []
EXIM_MONITOR_THREAD = None
EXIM_MONITOR_INTERVAL_SECS = 2.0

try:
    sys.stdout.reconfigure(line_buffering=True)
    sys.stderr.reconfigure(line_buffering=True)
except Exception:
    pass


def parse_thresholds(config: dict):
    """Return list of threshold dicts: [{events:int, seconds:int, name:str, type:str}]."""
    try:
        default_events = int(config.get('defaultMaxEmails') or 50)
    except Exception:
        default_events = 50
    try:
        default_seconds = int(config.get('defaultPerSeconds') or 300)
    except Exception:
        default_seconds = 300
    if default_events <= 0:
        default_events = 50
    if default_seconds <= 0:
        default_seconds = 300
    default_thresholds = [
        {"events": default_events, "seconds": default_seconds, "name": "default_logline", "type": "logline"}
    ]
    try:
        raw = config.get("alertThresholdsJson")
        if not raw:
            return default_thresholds
        data = raw
        if isinstance(raw, str):
            try:
                data = json.loads(raw)
            except Exception:
                return default_thresholds
        # Accept either {"thresholds": [...]} or {"loglineThresholds": [...]} or a list directly
        if isinstance(data, dict):
            thr = data.get("thresholds") or data.get("loglineThresholds")
        elif isinstance(data, list):
            thr = data
        else:
            thr = None
        if not thr:
            return default_thresholds
        out = []
        for i, t in enumerate(thr, start=1):
            try:
                events = int(t.get("events"))
                seconds = int(t.get("seconds"))
                name = (t.get("name") or f"trigger{i}")
                typ = (t.get("type") or "logline")
                if events > 0 and seconds > 0:
                    out.append({"events": events, "seconds": seconds, "name": name, "type": typ})
            except Exception:
                continue
        return out or default_thresholds
    except Exception:
        return default_thresholds


def _load_log_state(log_path: Path):
    try:
        if not EXIM_LOG_STATE_PATH.exists():
            return {"path": str(log_path), "ino": None, "pos": 0}
        data = json.loads(EXIM_LOG_STATE_PATH.read_text())
        if data.get("path") != str(log_path):
            return {"path": str(log_path), "ino": None, "pos": 0}
        return {"path": str(log_path), "ino": data.get("ino"), "pos": int(data.get("pos", 0))}
    except Exception:
        return {"path": str(log_path), "ino": None, "pos": 0}


def _save_log_state(state: dict):
    try:
        EXIM_LOG_STATE_PATH.write_text(json.dumps(state))
    except Exception as e:
        print(f"[agent] Error saving log state: {e}")


def _snapshot_thresholds():
    with THRESHOLD_LOCK:
        if CURRENT_THRESHOLDS:
            return [dict(t) for t in CURRENT_THRESHOLDS]
    return parse_thresholds({})


def _exim_monitor_loop():
    print(f"[agent] Exim monitor thread started (interval={EXIM_MONITOR_INTERVAL_SECS}s)")
    while True:
        try:
            thresholds = _snapshot_thresholds()
            process_exim_logs(thresholds, CURRENT_SERVER_ID)
            write_tracking_db(thresholds)
        except Exception as e:
            print(f"[agent] Exim monitor error: {e}")
        time.sleep(EXIM_MONITOR_INTERVAL_SECS)


def _ensure_exim_monitor_thread():
    global EXIM_MONITOR_THREAD
    if EXIM_MONITOR_THREAD and EXIM_MONITOR_THREAD.is_alive():
        return
    EXIM_MONITOR_THREAD = threading.Thread(target=_exim_monitor_loop, daemon=True)
    EXIM_MONITOR_THREAD.start()


def _parse_exim_timestamp(line: str) -> float:
    """Parse leading timestamp from an Exim log line; fallback to current time."""
    try:
        ts_str = line[:19]
        if re.match(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}", ts_str):
            try:
                dt = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
                return time.mktime(dt.timetuple())
            except Exception:
                pass
    except Exception:
        pass
    return time.time()


# --- Inserted helpers for enrollment, offline cache, and owner resolution ---

def enroll():
    """Enroll this agent with the server and return serverId."""
    resp = session.post(f"{BASE_URL}/api/osm/agent/enroll", json={"token": TOKEN}, headers=DEFAULT_HEADERS, timeout=10)
    resp.raise_for_status()
    return resp.json()["serverId"]


def _read_limits_cache() -> dict:
    try:
        if LIMITS_CACHE_PATH.exists():
            return json.loads(LIMITS_CACHE_PATH.read_text(encoding='utf-8'))
    except Exception as e:
        print(f"[agent] Error reading limits cache: {e}")
    return {"default": {"enabled": False, "maxEmails": 0, "perSeconds": 0}, "accounts": []}


def _write_limits_cache(cache: dict):
    try:
        LIMITS_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
        LIMITS_CACHE_PATH.write_text(json.dumps(cache, ensure_ascii=False, indent=2))
    except Exception as e:
        print(f"[agent] Error writing limits cache: {e}")


def _load_agent_state() -> dict:
    try:
        if AGENT_STATE_PATH.exists():
            return json.loads(AGENT_STATE_PATH.read_text(encoding='utf-8'))
    except Exception as e:
        print(f"[agent] Error reading agent state: {e}")
    return {}


def _save_agent_state(state: dict):
    try:
        AGENT_STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
        AGENT_STATE_PATH.write_text(json.dumps(state, ensure_ascii=False, indent=2))
    except Exception as e:
        print(f"[agent] Error writing agent state: {e}")


def _get_current_server_id() -> str | None:
    try:
        if CURRENT_SERVER_ID is not None:
            return str(CURRENT_SERVER_ID)
    except Exception:
        pass
    state = _load_agent_state()
    sid = state.get("serverId")
    return str(sid) if sid else None


def _read_actions_queue() -> list:
    try:
        if ACTIONS_QUEUE_PATH.exists():
            return json.loads(ACTIONS_QUEUE_PATH.read_text(encoding='utf-8'))
    except Exception as e:
        print(f"[agent] Error reading actions queue: {e}")
    return []


def _write_actions_queue(actions: list):
    try:
        ACTIONS_QUEUE_PATH.parent.mkdir(parents=True, exist_ok=True)
        ACTIONS_QUEUE_PATH.write_text(json.dumps(actions, ensure_ascii=False, indent=2))
    except Exception as e:
        print(f"[agent] Error writing actions queue: {e}")


def _queue_action(action: dict):
    try:
        actions = _read_actions_queue()
        action = dict(action)
        action.setdefault("queuedAt", time.time())
        actions.append(action)
        _write_actions_queue(actions)
    except Exception as e:
        print(f"[agent] Failed to queue action: {e}")


def set_account_limit_local(account: str, max_emails: int, per_seconds: int, enabled: bool = True):
    try:
        account = (account or '').strip()
        if not account:
            return
        cache = _read_limits_cache()
        accounts = cache.get('accounts') or []
        updated = False
        for entry in accounts:
            if entry.get('account', '').strip().lower() == account.lower():
                entry['maxEmails'] = int(max_emails)
                entry['perSeconds'] = int(per_seconds)
                entry['enabled'] = bool(enabled)
                updated = True
                break
        if not updated:
            accounts.append({
                "account": account,
                "maxEmails": int(max_emails),
                "perSeconds": int(per_seconds),
                "enabled": bool(enabled)
            })
        cache['accounts'] = accounts
        _write_limits_cache(cache)
    except Exception as e:
        print(f"[agent] Failed to set local account limit: {e}")


def remove_account_limit_local(account: str):
    try:
        account = (account or '').strip()
        if not account:
            return
        cache = _read_limits_cache()
        accounts = cache.get('accounts') or []
        new_accounts = [entry for entry in accounts if entry.get('account', '').strip().lower() != account.lower()]
        cache['accounts'] = new_accounts
        _write_limits_cache(cache)
    except Exception as e:
        print(f"[agent] Failed to remove local account limit: {e}")


def set_default_limit_local(max_emails: int, per_seconds: int, enabled: bool = True):
    try:
        cache = _read_limits_cache()
        cache['default'] = {
            "enabled": bool(enabled),
            "maxEmails": int(max_emails),
            "perSeconds": int(per_seconds)
        }
        _write_limits_cache(cache)
        try:
            global DEFAULT_MAX_EMAILS, DEFAULT_PER_SECONDS, DEFAULT_ENABLED
            DEFAULT_MAX_EMAILS = int(max_emails)
            DEFAULT_PER_SECONDS = int(per_seconds)
            DEFAULT_ENABLED = bool(enabled)
        except Exception:
            pass
    except Exception as e:
        print(f"[agent] Failed to set local default limit: {e}")


def fetch_config(sid: int):
    """Fetch monitoring configuration from server and persist defaults; fallback to cache offline."""
    try:
        resp = session.get(f"{BASE_URL}/api/osm/agent/config", params={"serverId": sid}, headers=DEFAULT_HEADERS, timeout=10)
        resp.raise_for_status()
        cfg = resp.json()
        cfg.setdefault("ipdbLookupEnabled", True)
        cfg.setdefault("queueIngestEnabled", True)
        cfg.setdefault("freezeReportsEnabled", True)
        cache = _read_limits_cache()
        cache["default"] = {
            "enabled": bool(cfg.get('defaultEnabled') or False),
            "maxEmails": int(cfg.get('defaultMaxEmails') or 0),
            "perSeconds": int(cfg.get('defaultPerSeconds') or 0)
        }
        _write_limits_cache(cache)
        try:
            global QUEUE_INGEST_ENABLED, FREEZE_REPORTS_ENABLED, IPDB_LOOKUP_ENABLED
            QUEUE_INGEST_ENABLED = bool(cfg.get('queueIngestEnabled', True))
            FREEZE_REPORTS_ENABLED = bool(cfg.get('freezeReportsEnabled', True))
            IPDB_LOOKUP_ENABLED = bool(cfg.get('ipdbLookupEnabled', True))
            print(f"[agent] Feature toggles: queue={QUEUE_INGEST_ENABLED} freeze={FREEZE_REPORTS_ENABLED} ipdb={IPDB_LOOKUP_ENABLED}")
        except Exception as toggle_err:
            print(f"[agent] Failed to apply feature toggles: {toggle_err}")
        return cfg
    except Exception as e:
        print(f"[agent] Error fetching config: {e}")
        d = _read_limits_cache().get("default", {})
        return {
            "mailQueuePath": "/var/spool/postfix",
            "checkInterval": 30,
            "spamThreshold": 5.0,
            "blockedDomainsJson": "[]",
            "alertThresholdsJson": "{}",
            "defaultMaxEmails": int(d.get('maxEmails') or 0),
            "defaultPerSeconds": int(d.get('perSeconds') or 0),
            "defaultEnabled": bool(d.get('enabled') or False),
            "ipdbLookupEnabled": IPDB_LOOKUP_ENABLED,
            "queueIngestEnabled": QUEUE_INGEST_ENABLED,
            "freezeReportsEnabled": FREEZE_REPORTS_ENABLED
        }


def _extract_email(text):
    try:
        m = re.search(r"([A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+)", text or '')
        return m.group(1).lower() if m else None
    except Exception:
        return None


DOMAIN_OWNER_MAP = {}
DOMAIN_OWNER_LAST_LOAD: float = 0.0
DOMAIN_OWNER_TTL_SECS = 300


def _load_domain_owner_map():
    global DOMAIN_OWNER_LAST_LOAD
    now = time.time()
    if DOMAIN_OWNER_MAP and (now - DOMAIN_OWNER_LAST_LOAD) < DOMAIN_OWNER_TTL_SECS:
        return
    try:
        DOMAIN_OWNER_MAP.clear()
        # cPanel: /etc/trueuserdomains format: domain: user
        p = Path('/etc/trueuserdomains')
        if p.exists():
            for line in p.read_text(encoding='utf-8', errors='ignore').splitlines():
                if ':' in line:
                    dom, user = line.split(':', 1)
                    dom = (dom or '').strip().lower()
                    user = (user or '').strip().lower()
                    if dom and user:
                        DOMAIN_OWNER_MAP[dom] = user
        # cPanel alternate mapping file /etc/userdomains
        p3 = Path('/etc/userdomains')
        if p3.exists():
            for line in p3.read_text(encoding='utf-8', errors='ignore').splitlines():
                if ':' in line:
                    dom, user = line.split(':', 1)
                    dom = (dom or '').strip().lower()
                    user = (user or '').strip().lower()
                    if dom and user:
                        DOMAIN_OWNER_MAP[dom] = user
        # DirectAdmin: /etc/virtual/domainowners
        p2 = Path('/etc/virtual/domainowners')
        if p2.exists():
            for line in p2.read_text(encoding='utf-8', errors='ignore').splitlines():
                if ':' in line:
                    dom, user = line.split(':', 1)
                    dom = (dom or '').strip().lower()
                    user = (user or '').strip().lower()
                    if dom and user:
                        DOMAIN_OWNER_MAP[dom] = user
        # DirectAdmin global domainowners file
        da_owners = Path('/usr/local/directadmin/data/admin/domainowners')
        if da_owners.exists():
            for line in da_owners.read_text(encoding='utf-8', errors='ignore').splitlines():
                if ':' in line:
                    dom, user = line.split(':', 1)
                    dom = (dom or '').strip().lower()
                    user = (user or '').strip().lower()
                    if dom and user:
                        DOMAIN_OWNER_MAP[dom] = user
        # DirectAdmin per-user domain lists
        da_users_root = Path('/usr/local/directadmin/data/users')
        if da_users_root.exists():
            for user_dir in da_users_root.iterdir():
                if not user_dir.is_dir():
                    continue
                user = user_dir.name.strip().lower()
                try:
                    dom_list = user_dir / 'domains.list'
                    if dom_list.exists():
                        for dom in dom_list.read_text(encoding='utf-8', errors='ignore').splitlines():
                            dom = (dom or '').strip().lower()
                            if dom:
                                DOMAIN_OWNER_MAP[dom] = user
                except Exception:
                    continue
        DOMAIN_OWNER_LAST_LOAD = now
    except Exception as e:
        print(f"[agent] Failed loading domain owners: {e}")


def _resolve_owner_from_email(email_addr):
    try:
        _load_domain_owner_map()
        email_addr = (email_addr or '').strip().lower()
        if '@' not in email_addr:
            return None
        local_part, domain = email_addr.split('@', 1)
        domain = domain.rstrip('.')
        parts = domain.split('.')
        for i in range(len(parts)-1):
            d = '.'.join(parts[i:])
            owner = DOMAIN_OWNER_MAP.get(d)
            if owner:
                return owner
        owner = DOMAIN_OWNER_MAP.get(domain)
        if owner:
            return owner
        # Fallback: inspect virtual mailbox passwd files to infer owner
        owner = _lookup_owner_from_virtual_passwd(local_part, domain)
        if owner:
            DOMAIN_OWNER_MAP[domain] = owner
            return owner
        return None
    except Exception:
        return None


def _lookup_owner_from_virtual_passwd(local_part: str, domain: str):
    """Inspect /etc/virtual/<domain>/passwd files to determine owning account."""
    try:
        local_part = (local_part or '').strip().lower()
        domain = (domain or '').strip().lower()
        if not local_part or not domain:
            return None
        domains_to_try = []
        if domain:
            domains_to_try.append(domain)
            parts = domain.split('.')
            for i in range(1, len(parts)-1):
                parent = '.'.join(parts[i:])
                if parent not in domains_to_try:
                    domains_to_try.append(parent)
        for extra in ('lists', 'default'):
            if extra not in domains_to_try:
                domains_to_try.append(extra)
        for dom in domains_to_try:
            vpath = Path('/etc/virtual') / dom / 'passwd'
            if not vpath.exists():
                continue
            try:
                for line in vpath.read_text(encoding='utf-8', errors='ignore').splitlines():
                    if not line or line.startswith('#'):
                        continue
                    parts = line.split(':')
                    if not parts:
                        continue
                    account_field = parts[0].strip().lower()
                    if account_field != local_part and account_field != f"{local_part}@{dom}":
                        continue
                    # Attempt to derive owner from home/maildir path within the line
                    for token in parts[1:]:
                        if '/home/' in token:
                            m = re.search(r'/home/([^/]+)/', token)
                            if m:
                                return m.group(1).strip().lower()
                    # Sometimes the maildir path is at the end
                    m = re.search(r'/home/([^/]+)/', line)
                    if m:
                        return m.group(1).strip().lower()
            except Exception:
                continue
    except Exception:
        pass
    return None


def _resolve_limit_account_key(account_ref: str) -> str:
    """Map an account_ref to the hosting account owner key used by limits."""
    try:
        # Try to extract email first (sender part before '(' or in the string)
        email = None
        if '(' in account_ref:
            email = account_ref.split('(', 1)[0].strip()
        if not email:
            email = _extract_email(account_ref)
        owner = _resolve_owner_from_email(email) if email else None
        if owner:
            return owner
        # Fallback: local-part or username
        if '(' in account_ref and account_ref.endswith(')'):
            inside = account_ref[account_ref.rfind('(')+1:-1]
            if inside:
                return inside.lower()
        m = re.match(r"([^@]+)@", account_ref)
        if m:
            return m.group(1).lower()
        return account_ref.strip().lower()
    except Exception:
        return account_ref.strip().lower()


def extract_username_from_account_ref(account_ref: str) -> str:
    try:
        if '(' in account_ref and account_ref.endswith(')'):
            return account_ref[account_ref.rfind('(')+1:-1].strip().lower()
        m = re.match(r"([^@]+)@", account_ref)
        if m:
            return m.group(1).strip().lower()
        return account_ref.strip().lower()
    except Exception:
        return account_ref.strip().lower()


def fetch_freeze_list(sid: int):
    try:
        resp = session.get(f"{BASE_URL}/api/osm/agent/freeze-list", params={"serverId": sid}, headers=DEFAULT_HEADERS, timeout=10)
        resp.raise_for_status()
        return resp.json() or []
    except Exception as e:
        print(f"[agent] Error fetching freeze list: {e}")
        # Return None to indicate remote unavailable so we do not wipe local hold file
        return None


def reconcile_freeze_file_with_server(sid: int, prefer_server: bool = False):
    try:
        # Grace period after local updates to avoid wiping local holds before server reflects them
        # Skip the grace period when server explicitly requests reconciliation (prefer_server=True)
        try:
            if not prefer_server and time.time() - LAST_LOCAL_HOLD_UPDATE_TS < 30:
                return
        except Exception:
            pass

        server_items = fetch_freeze_list(sid)
        if server_items is None:
            # Server unreachable; do not modify local hold file
            return

        # Build server set of entries (username or sender email)
        server_set = set()
        for it in server_items:
            if not it.get('active', True):
                continue
            account = (it.get('account') or '').strip()
            sender = (it.get('senderEmail') or '').strip()
            if account:
                server_set.add(account.lower())
            elif sender:
                server_set.add(sender.lower())

        _ensure_virtual_files()
        local_set = set()
        try:
            for line in OSM_HOLD_FILE.read_text().splitlines():
                s = (line or '').strip().lower()
                if s:
                    local_set.add(s)
        except Exception:
            pass

        if prefer_server:
            # Server-wins mode (e.g., after an app-initiated unfreeze): adopt server state and do NOT push local-only entries
            desired = server_set
            if desired != local_set:
                with OSM_HOLD_FILE.open('w') as f:
                    for u in sorted(desired):
                        f.write(u + "\n")
                _reload_exim()
                print(f"[agent] Reconciled (server-wins). Server={len(server_set)} local={len(local_set)} applied={len(desired)}")
        else:
            # Merge mode (agent-wins for offline changes): push local-only entries to server then write union locally
            to_push = sorted(list(local_set - server_set))
            for val in to_push:
                try:
                    # Try to preserve the original freeze reason if we can infer it
                    freeze_reason = _find_freeze_reason(val)
                    payload = {"serverId": str(sid), "reason": freeze_reason}
                    if '@' in val:
                        payload["senderEmail"] = val
                        payload["account"] = ""
                    else:
                        payload["senderEmail"] = ""
                        payload["account"] = val
                    session.post(f"{BASE_URL}/api/osm/agent/freeze", json=payload, headers=DEFAULT_HEADERS, timeout=10)
                except Exception:
                    # Leave in local_set; future loops will retry via pending actions or here
                    pass

            # Compute union as desired hold file content
            desired = server_set | local_set
            if desired != local_set:
                with OSM_HOLD_FILE.open('w') as f:
                    for u in sorted(desired):
                        f.write(u + "\n")
                _reload_exim()
                print(f"[agent] Reconciled (merge). Server={len(server_set)} local={len(local_set)} merged={len(desired)}")
    except Exception as e:
        print(f"[agent] Reconcile freeze list failed: {e}")


def _find_freeze_reason(identifier: str) -> str:
    """Best-effort recovery of the original freeze reason for a given account/email.
    Checks queued actions first, then the most recent local report, else returns a safe default.
    """
    try:
        ident = (identifier or '').strip().lower()
        if not ident:
            return 'Manual freeze'
        # Look into pending actions (latest first) for a matching freeze entry
        try:
            actions = _read_actions_queue()
            for action in reversed(actions):
                if (action.get('type') or '') != 'freeze':
                    continue
                s = (action.get('senderEmail') or '').strip().lower()
                a = (action.get('account') or '').strip().lower()
                if ident == s or (a and ident == a):
                    r = (action.get('reason') or '').strip()
                    if r:
                        return r
        except Exception:
            pass
        # Search most recent report that matches sender/account
        try:
            if REPORTS_DIR.exists():
                files = sorted(REPORTS_DIR.glob('*.txt'), key=lambda p: p.stat().st_mtime, reverse=True)
                for path in files:
                    parsed = _parse_report_file(path)
                    if not parsed:
                        continue
                    s = (parsed.get('senderEmail') or '').strip().lower()
                    a = (parsed.get('account') or '').strip().lower()
                    if ident == s or (a and ident == a):
                        r = (parsed.get('reason') or '').strip()
                        if r:
                            return r
        except Exception:
            pass
        return 'Manual freeze'
    except Exception:
        return 'Manual freeze'


def get_exim_log_path() -> Path:
    """Return best-known Exim main log path for this system."""
    try:
        candidates = []
        try:
            if detect_control_panel() == 'cpanel':
                candidates.append(Path('/var/log/exim_mainlog'))
        except Exception:
            pass
        candidates.extend([
            Path('/var/log/exim/mainlog'),
            Path('/var/log/exim4/mainlog'),
            Path('/var/log/exim_mainlog'),
        ])
        for p in candidates:
            try:
                if p.exists() and p.is_file():
                    return p
            except Exception:
                continue
    except Exception:
        pass
    return Path('/var/log/exim/mainlog')


def process_exim_logs(thresholds, sid=None):
    """Incrementally parse Exim mainlog and update LOG_COUNTERS."""
    log_path = get_exim_log_path()
    try:
        if not log_path.exists():
            return
        st = log_path.stat()
        state = _load_log_state(log_path)
        # Handle rotation or first run
        if state.get("ino") != st.st_ino or state.get("pos", 0) > st.st_size:
            state = {"path": str(log_path), "ino": st.st_ino, "pos": 0}
        with log_path.open('r', encoding='utf-8', errors='ignore') as f:
            f.seek(state.get("pos", 0))
            new_data = f.read()
            state["pos"] = f.tell()
        _save_log_state(state)
        now_ts = time.time()
        # Parse new lines
        for line in new_data.splitlines():
            try:
                line_ts = _parse_exim_timestamp(line)
                if now_ts - line_ts > MAX_LOG_LOOKBACK_SECS:
                    continue
                if line_ts - now_ts > 300:
                    line_ts = now_ts
                # Record subject per account where available
                if ' <= ' in line or ' =>' in line:
                    msgid = _extract_message_id_from_exim_line(line) or ''
                    subj = _get_subject_for_message_id(msgid)
                    if subj:
                        acct = _extract_account_ref_from_exim_line(line)
                        if acct:
                            LAST_SUBJECT_BY_ACCOUNT[acct] = subj
                _update_counters_from_exim_line(line, thresholds, line_ts)
            except Exception:
                continue
        # Cleanup old dedupe entries
        cutoff = time.time() - SEEN_LOG_MSG_IDS_RETENTION_SECS
        while SEEN_LOG_MSG_IDS and SEEN_LOG_MSG_IDS[0][1] < cutoff:
            mid, _ = SEEN_LOG_MSG_IDS.popleft()
            SEEN_LOG_MSG_IDS_SET.discard(mid)
        cutoff_rcpt = time.time() - SEEN_MSG_RCPT_RETENTION_SECS
        while SEEN_MSG_RCPT and SEEN_MSG_RCPT[0][1] < cutoff_rcpt:
            key, _ = SEEN_MSG_RCPT.popleft()
            SEEN_MSG_RCPT_SET.discard(key)
    except Exception as e:
        print(f"[agent] Error processing exim logs: {e}")


def _extract_message_id_from_exim_line(line):
    """Extract the Exim queue id token that appears before '<=' on submission lines."""
    try:
        # Match the token immediately before '<='
        m = re.search(r"\b([A-Za-z0-9-]{6,})\b\s+<=", line)
        if m:
            return m.group(1)
    except Exception:
        pass
    return None


def _extract_message_id_from_any_line(line):
    """Extract the Exim queue id from general log lines that start with timestamp then id."""
    try:
        m = re.search(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} ([A-Za-z0-9-]{6,})\b", line)
        if m:
            return m.group(1)
    except Exception:
        pass
    return None


def _normalize_host(host: str) -> str:
    try:
        if not host:
            return ''
        h = host.strip()
        if h.startswith('[') and h.endswith(']'):
            h = h[1:-1]
        return h
    except Exception:
        return host or ''


def _extract_client_host_ip_from_line(line: str) -> tuple[str, str]:
    host = ''
    ip = ''
    try:
        m_parent = re.search(r"\bH=\(([^)]*)\)", line)
        if m_parent:
            host = (m_parent.group(1) or '').strip()
            if host.startswith('[') and host.endswith(']'):
                host = host[1:-1]
            m_after = re.search(r"\)\s*\[([^\]]+)\]", line)
            if m_after:
                ip = (m_after.group(1) or '').strip()
        else:
            m_host = re.search(r"\bH=([^\s]+)", line)
            if m_host:
                host = (m_host.group(1) or '').strip()
                if host.startswith('[') and host.endswith(']'):
                    host = host[1:-1]
            m_ip = re.search(r"\bH=[^\n]*?\[([^\]]+)\]", line)
            if m_ip:
                ip = (m_ip.group(1) or '').strip()
        bracket_values = re.findall(r"\[([^\]]+)\]", line)
        if not host and bracket_values:
            host = bracket_values[0].strip()
        if not ip and bracket_values:
            ip = bracket_values[-1].strip()
        return _normalize_host(host), ip
    except Exception:
        return '', ''


def _resolve_client_details(msg_id: str, client_dict: dict) -> tuple[str, str]:
    try:
        host = (client_dict or {}).get('h', '') or ''
        ip = (client_dict or {}).get('ip', '') or ''
        if (not host or host == '-') or (not ip or ip == '-'):
            fallback = MSG_ID_TO_CLIENT.get(msg_id) or {}
            if not host or host == '-':
                host = fallback.get('h', host) or host
            if not ip or ip == '-':
                ip = fallback.get('ip', ip) or ip
        host = _normalize_host(host)
        ip = (ip or '').strip()
        # strip any port if accidentally present
        if ip and ':' in ip:
            ip = ip.split(':', 1)[0]
        return host, ip
    except Exception:
        return '', ''


def _client_info_from_headers(headers: str) -> tuple[str, str]:
    host = ''
    ip = ''
    try:
        if not headers:
            return host, ip
        m_ip = re.search(r"-host_address\s+\[([^\]]+)\]", headers)
        if m_ip:
            ip = (m_ip.group(1) or '').strip()
        m_host = re.search(r"--helo_name\s+([^\s]+)", headers)
        if m_host:
            host = _normalize_host(m_host.group(1))
        if not host:
            m_recv = re.search(r"Received: from ([^\s]+)", headers)
            if m_recv:
                host = _normalize_host(m_recv.group(1))
    except Exception:
        pass
    return host, ip


def _fallback_events_from_queue_detail(detail: dict, sender_email: str, owner_key: str):
    events = []
    try:
        recipients = detail.get('recipients') or []
        if not recipients:
            rec = detail.get('recipient') or ''
            if rec:
                recipients = [rec]
        recipients = [r for r in recipients if r]
        subject = detail.get('subject') or ''
        headers = detail.get('headers') or detail.get('status') or ''
        client_h, client_ip = _client_info_from_headers(headers)
        qid = detail.get('queueId') or detail.get('messageId') or ''
        ts_now = time.time()
        base_sender = sender_email or detail.get('sender') or owner_key
        if not recipients:
            recipients = ['']
        for rcpt in recipients[:50]:
            events.append({
                'ts': ts_now,
                'sender': base_sender,
                'recipient': rcpt,
                'subject': subject,
                'client': {'h': client_h, 'ip': client_ip},
                'relay': {},
                'relayType': 'unknown',
                'qid': qid,
            })
    except Exception:
        pass
    return events


def _has_recent_log_events(owner_key: str, sender_email: str, window_secs: int) -> bool:
    try:
        if not owner_key or not sender_email:
            return False
        events = ACCOUNT_EVENTS.get(owner_key)
        if not events:
            return False
        cutoff = time.time() - max(window_secs, 0)
        sender_lc = sender_email.strip().lower()
        for ev in events:
            if ev.get('ts', 0) >= cutoff and (ev.get('sender', '') or '').strip().lower() == sender_lc:
                return True
        return False
    except Exception:
        return False


def _should_hold_owner(owner_key: str, sender_email: str) -> bool:
    try:
        if not owner_key:
            return False
        events = ACCOUNT_EVENTS.get(owner_key)
        if not events:
            return False
        sender_lc = (sender_email or '').strip().lower()
        for ev in reversed(events):
            ev_sender = (ev.get('sender', '') or '').strip().lower()
            if sender_lc and ev_sender != sender_lc:
                continue
            relay_type = (ev.get('relayType') or '').lower()
            if relay_type == 'local':
                return True
            if sender_lc:
                return False
        return False
    except Exception:
        return False


def _collect_events_for_report(owner_key: str, sender_email: str, window_secs: int):
    """Collect all events for owner within window (not filtered by sender).
    
    The limit applies to the whole account, so we need all events to show why the limit
    was exceeded, not just events from one specific sender address.
    """
    try:
        if not owner_key:
            return []
        evs_src = list(ACCOUNT_EVENTS.get(owner_key, deque()))
        if not evs_src:
            return []
        cutoff_ts = time.time() - max(window_secs, 0)
        # Collect ALL events for this owner within the window, not just one sender
        # The limit count is per-account, so report should show all contributing events
        events = [ev for ev in evs_src if ev.get('ts', 0) >= cutoff_ts]
        events.sort(key=lambda e: e.get('ts', 0))
        return events
    except Exception:
        return []


def _build_report_text(owner_key: str, sender_email: str, reason_text: str,
                       window_secs: int, limit_label: str, events_window: list) -> str:
    lines = []
    lines.append("Freeze Report\n")
    lines.append(f"Reason: {reason_text}\n")
    lines.append(f"Owner: {owner_key}\n")
    lines.append(f"Sender: {sender_email}\n")
    try:
        types = [(ev.get('relayType') or 'unknown') for ev in events_window]
        relay_summary = 'auth' if 'auth' in types else ('local' if 'local' in types else 'unknown')
        lines.append(f"Relay: {relay_summary}\n")
    except Exception:
        pass
    _append_client_summary(lines, events_window)
    try:
        srcs = []
        seen = set()
        for ev in events_window:
            c = ev.get('client', {}) or {}
            ch = c.get('h', '') or ''
            cip = c.get('ip', '') or ''
            label = f"{ch}[{cip}]" if cip or ch else ''
            if label and label not in seen:
                seen.add(label)
                srcs.append(label)
                if len(srcs) >= 5:
                    break
        if srcs:
            lines.append(f"Client sources: {', '.join(srcs)}\n")
    except Exception:
        pass
    limit_text = limit_label or 'Limit: n/a'
    lines.append(f"Window: last {window_secs}s; {limit_text}\n")
    lines.append(f"Events counted in window: {len(events_window)}\n")
    lines.append("\nEvents:\n")
    if events_window:
        for ev in events_window[-200:]:
            ts_iso = datetime.utcfromtimestamp(ev.get('ts', 0)).isoformat()
            rcpt = ev.get('recipient', '')
            subj = ev.get('subject', '')
            client = ev.get('client', {})
            relay = ev.get('relay', {})
            client_h, client_ip = _resolve_client_details(ev.get('qid', ''), client)
            relay_ip = relay.get('ip', '')
            relay_h = relay.get('h', '')
            rtype = (ev.get('relayType') or '').lower()
            relay_str = 'local' if rtype == 'local' else f"{relay_h}[{relay_ip}]"
            lines.append(f"- {ts_iso} to={rcpt} subj=\"{subj}\" client={client_h}[{client_ip}] relay={relay_str} qid={ev.get('qid', '')}\n")
    else:
        lines.append("- No detailed events captured for this window.\n")
    return ''.join(lines)


def _save_report_locally(owner_key: str, sender_email: str, reason_text: str,
                         report_text: str, sid):
    """Save freeze report locally. Reports are fetched on-demand by the app."""
    REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    now_dt = datetime.utcnow()
    now_str = now_dt.strftime('%Y%m%d-%H%M%S')
    safe_sender = (sender_email or 'unknown').replace('@', '_')
    safe_owner = (owner_key or 'unknown').replace('@', '_')
    fname = f"{now_str}-{safe_owner}-{safe_sender}.txt"
    fpath = REPORTS_DIR / fname
    fpath.write_text(report_text)
    print(f"[agent] Freeze report saved locally: {fname}")


def _generate_freeze_report(account_ref: str, owner_key: str, sender_email: str,
                            reason_text: str, window_secs: int, limit_label: str,
                            sid, fallback_events: Optional[list] = None):
    if not FREEZE_REPORTS_ENABLED:
        print("[agent] Freeze report generation disabled; skipping report")
        return
    try:
        events_window = _collect_events_for_report(owner_key, sender_email, window_secs)
        if not events_window and fallback_events:
            events_window = list(fallback_events)
            try:
                acct_events = ACCOUNT_EVENTS.setdefault(owner_key, deque())
                for ev in fallback_events:
                    acct_events.append(ev)
            except Exception:
                pass
        report_text = _build_report_text(owner_key, sender_email, reason_text, window_secs, limit_label, events_window)
        _save_report_locally(owner_key, sender_email, reason_text, report_text, sid)
    except Exception as e:
        print(f"[agent] Failed to write report: {e}")


def _parse_report_file(path: Path):
    try:
        text = path.read_text(encoding='utf-8')
    except Exception as e:
        print(f"[agent] Failed reading report {path.name}: {e}")
        return None
    reason_text = ""
    owner_text = ""
    sender_text = ""
    try:
        m_reason = re.search(r"^Reason:\s*(.+)$", text, re.MULTILINE)
        if m_reason:
            reason_text = m_reason.group(1).strip()
        m_owner = re.search(r"^Owner:\s*(.+)$", text, re.MULTILINE)
        if m_owner:
            owner_text = m_owner.group(1).strip()
        m_sender = re.search(r"^Sender:\s*(.+)$", text, re.MULTILINE)
        if m_sender:
            sender_text = m_sender.group(1).strip()
    except Exception as e:
        print(f"[agent] Failed parsing metadata in {path.name}: {e}")
    try:
        created_at = datetime.utcfromtimestamp(path.stat().st_mtime).isoformat()
    except Exception:
        created_at = datetime.utcnow().isoformat()
    return {
        "reportText": text,
        "reason": reason_text,
        "account": owner_text,
        "senderEmail": sender_text,
        "createdAt": created_at,
        "reportKey": path.name,
    }


def _extract_recipient_from_arrow_line(line):
    try:
        m = re.search(r"(?:=>|->)\s+([^\s]+)", line)
        if m:
            return m.group(1)
    except Exception:
        pass
    return None


def _extract_sender_from_arrow_line(line):
    try:
        m = re.search(r"\bF=<([^>]+)>", line)
        if m:
            return m.group(1)
    except Exception:
        pass
    return None


def _extract_account_ref_from_exim_line(line):
    """Return account_ref like sender(user)."""
    try:
        # Prefer explicit 'from <sender>'
        m_from = re.search(r"from\s+<([^>\s]+@[^>\s]+)>", line)
        sender_email = m_from.group(1) if m_from else None
        if not sender_email:
            # Fallback to token after '<='
            m_after = re.search(r"<=\s*<?([^>\s]+@[^\s>]+)>?", line)
            if m_after:
                sender_email = m_after.group(1)
        if not sender_email:
            # Submission lines can also include F=<sender>
            m_f = re.search(r"\bF=<([^>]+)>", line)
            if m_f:
                candidate = (m_f.group(1) or '').strip()
                if candidate:
                    sender_email = candidate
        # Auth user: A=login:USER or A=dovecot_login:USER or courier_login
        m_auth = re.search(r"\bA=[A-Za-z0-9_-]+:([^\s]+)\b", line)
        auth_user = m_auth.group(1) if m_auth else None
        if not auth_user:
            # Local deliveries note the owner as U=username
            m_u = re.search(r"\bU=([^\s]+)\b", line)
            if m_u:
                auth_user = m_u.group(1)
        if auth_user:
            auth_user = auth_user.strip()
        # Compose account ref
        if sender_email and auth_user:
            return f"{sender_email}({auth_user})"
        if sender_email:
            return sender_email
        if auth_user:
            return auth_user
        return "Unknown"
    except Exception:
        return "Unknown"


def _update_counters_from_exim_line(line, thresholds, line_ts):
    """Parse Exim log line and update counters for tracking and per-account limits."""
    try:
        if 'diradm_user=' in line:
            msgid = _extract_message_id_from_any_line(line) or _extract_message_id_from_exim_line(line)
            if msgid:
                try:
                    m_user = re.search(r"diradm_user=([^\s]+)", line)
                    owner_user = m_user.group(1).strip().lower() if m_user else ''
                    if owner_user:
                        MSG_ID_TO_OWNER[msgid] = owner_user
                        existing_ref = MSG_ID_TO_ACCOUNT.get(msgid)
                        email_part = None
                        if existing_ref:
                            email_part = existing_ref.split('(')[0] if '(' in existing_ref else existing_ref
                            email_part = (email_part or '').strip()
                        if not email_part:
                            m_addr = re.search(r"diradm_address=([^\s]+)", line)
                            if m_addr:
                                email_part = (m_addr.group(1) or '').strip()
                        if not email_part:
                            email_part = _extract_account_ref_from_exim_line(line)
                        if email_part:
                            MSG_ID_TO_ACCOUNT[msgid] = f"{email_part}({owner_user})"
                        m_dom = re.search(r"diradm_domain=([^\s]+)", line)
                        if m_dom:
                            dom = (m_dom.group(1) or '').strip().lower().rstrip('.')
                            if dom:
                                DOMAIN_OWNER_MAP[dom] = owner_user
                        print(f"[agent] DirectAdmin owner resolved for msgid={msgid}: owner={owner_user}")
                except Exception as e:
                    print(f"[agent] DirectAdmin owner parse error: {e}")
        if 'Sender identification' in line:
            msgid = _extract_message_id_from_any_line(line)
            if msgid:
                try:
                    m_user = re.search(r"\bU=([^\s]+)", line)
                    if m_user:
                        owner_user = m_user.group(1).strip().lower()
                        if owner_user:
                            MSG_ID_TO_OWNER[msgid] = owner_user
                            existing_ref = MSG_ID_TO_ACCOUNT.get(msgid)
                            if existing_ref:
                                email_part = existing_ref.split('(')[0] if '(' in existing_ref else existing_ref
                                email_part = (email_part or '').strip()
                                if '(' not in existing_ref or not existing_ref.endswith(')') or owner_user not in existing_ref:
                                    MSG_ID_TO_ACCOUNT[msgid] = f"{email_part}({owner_user})"
                except Exception:
                    pass
            return

        # Only process lines with message submission ('<=' indicates incoming/outgoing)
        if ' <= ' not in line:
            # Also track deliveries ('=>' for first, '->' for additional recipients)
            if ' => ' in line or ' -> ' in line:
                # Delivery line format: => recipient ...
                msg_id = _extract_message_id_from_any_line(line)
                if not msg_id:
                    return
                # Only count deliveries for messages that originated locally or via auth relay
                rtype = MSG_ID_TO_RELAY_TYPE.get(msg_id, 'unknown')
                if rtype not in ('local', 'auth'):
                    return
                # Check dedupe for this (msgid, recipient) pair
                recipient = _extract_recipient_from_arrow_line(line)
                sender = _extract_sender_from_arrow_line(line) or MSG_ID_TO_ACCOUNT.get(msg_id, '')
                if not recipient:
                    return
                dedupe_key = (msg_id, recipient)
                if dedupe_key in SEEN_MSG_RCPT_SET:
                    return
                SEEN_MSG_RCPT.append((dedupe_key, line_ts))
                SEEN_MSG_RCPT_SET.add(dedupe_key)
                # Build account_ref from cached data or line
                account_ref = MSG_ID_TO_ACCOUNT.get(msg_id) or sender or 'Unknown'
                subject = MSG_ID_TO_SUBJECT.get(msg_id, '')
                # Record recipient with timestamp for this account
                try:
                    rec_map = RECIPIENTS_BY_ACCOUNT.setdefault(account_ref, {})
                    rec_map[recipient] = line_ts
                except Exception:
                    pass
                # Update per-owner event log (for freeze reports) - one event per delivery
                try:
                    owner = MSG_ID_TO_OWNER.get(msg_id) or _resolve_limit_account_key(account_ref)
                    client = MSG_ID_TO_CLIENT.get(msg_id, {})
                    relay = MSG_ID_TO_RELAY.get(msg_id, {})
                    relay_type = MSG_ID_TO_RELAY_TYPE.get(msg_id, 'unknown')
                    ev = {
                        'ts': line_ts,
                        'sender': account_ref.split('(')[0] if '(' in account_ref else _extract_email(account_ref) or account_ref,
                        'recipient': recipient,
                        'subject': subject,
                        'qid': msg_id,
                        'client': client,
                        'relay': relay,
                        'relayType': relay_type
                    }
                    dq = ACCOUNT_EVENTS.setdefault(owner, deque())
                    dq.append(ev)
                    # Limit retention
                    cutoff = line_ts - ACCOUNT_EVENTS_RETENTION_SECS
                    while dq and dq[0].get('ts', 0) < cutoff:
                        dq.popleft()
                except Exception:
                    pass
                # Count each delivery (recipient) as one email for limits
                # Fetch account limits for inline enforcement
                try:
                    limits = fetch_account_limits(None)  # Use cached limits
                    lim_by_account = {}
                    for lim in limits:
                        if not lim.get('enabled', True):
                            continue
                        acc = (lim.get('account') or '').strip().lower()
                        max_emails = int(lim.get('maxEmails', 0))
                        per_seconds = int(lim.get('perSeconds', 60))
                        if acc and max_emails > 0 and per_seconds > 0:
                            lim_by_account[acc] = (max_emails, per_seconds)
                    print(f"[agent] Processing delivery: account_ref='{account_ref}', recipient='{recipient}', limits loaded={len(lim_by_account)}")
                    _record_event(account_ref, subject, thresholds, None, lim_by_account, event_ts=line_ts)
                except Exception as e:
                    print(f"[agent] Error in delivery limit check: {e}")
                    _record_event(account_ref, subject, thresholds, None, None, event_ts=line_ts)
            return
        
        # Message submission line ('<=' indicates the message was received)
        msg_id = _extract_message_id_from_exim_line(line)
        if not msg_id or msg_id in SEEN_LOG_MSG_IDS_SET:
            return
        # Mark as seen
        SEEN_LOG_MSG_IDS.append((msg_id, line_ts))
        SEEN_LOG_MSG_IDS_SET.add(msg_id)
        # Extract account reference
        account_ref = _extract_account_ref_from_exim_line(line)
        if not account_ref or account_ref == "Unknown":
            return
        # Extract subject from line (T="...")
        subject = ''
        try:
            m_subj = re.search(r"\bT=\"([^\"]*)\"", line)
            if m_subj:
                subject = m_subj.group(1).strip()
        except Exception:
            pass
        # Cache for later delivery lines
        MSG_ID_TO_ACCOUNT[msg_id] = account_ref
        MSG_ID_TO_SUBJECT[msg_id] = subject
        # Parse client and relay info for freeze reports
        try:
            client_h, client_ip = _extract_client_host_ip_from_line(line)
            if client_h or client_ip:
                MSG_ID_TO_CLIENT[msg_id] = {'h': client_h, 'ip': client_ip}
            # Relay type: P=local, P=esmtpa (auth), etc.
            relay_type = 'unknown'
            line_lc = line.lower()
            if 'p=local' in line_lc:
                relay_type = 'local'
            elif 'a=' in line or 'p=esmtpa' in line_lc or 'p=esmtpsa' in line_lc:
                relay_type = 'auth'
            elif 'p=esmtps' in line_lc or 'p=esmtp' in line_lc or 'p=spam-scanned' in line_lc:
                relay_type = 'external'
            MSG_ID_TO_RELAY_TYPE[msg_id] = relay_type
            # Owner resolution
            sender_email = account_ref.split('(')[0] if '(' in account_ref else _extract_email(account_ref) or account_ref
            owner = _resolve_owner_from_email(sender_email) or _resolve_limit_account_key(account_ref)
            MSG_ID_TO_OWNER[msg_id] = owner
        except Exception:
            pass
        # Only count outgoing submissions (local or authenticated). Skip external/inbound.
        if MSG_ID_TO_RELAY_TYPE.get(msg_id) not in ('local', 'auth'):
            return
        # Fetch account limits for inline enforcement
        try:
            limits = fetch_account_limits(None)  # Use cached limits
            lim_by_account = {}
            for lim in limits:
                if not lim.get('enabled', True):
                    continue
                acc = (lim.get('account') or '').strip().lower()
                max_emails = int(lim.get('maxEmails', 0))
                per_seconds = int(lim.get('perSeconds', 60))
                if acc and max_emails > 0 and per_seconds > 0:
                    lim_by_account[acc] = (max_emails, per_seconds)
            _record_event(account_ref, subject, thresholds, None, lim_by_account, event_ts=line_ts)
        except Exception:
            _record_event(account_ref, subject, thresholds, None, None, event_ts=line_ts)
    except Exception as e:
        pass  # Silent fail per line to keep processing


def _account_key_for_limits(account_ref: str) -> str:
    try:
        if '(' in account_ref and account_ref.endswith(')'):
            # sender(user)
            inside = account_ref[account_ref.rfind('(')+1:-1]
            if inside:
                return inside.lower()
        # else maybe it's just an email or username; prefer local-part for email
        m = re.match(r"([^@]+)@", account_ref)
        if m:
            return m.group(1).lower()
        return account_ref.strip().lower()
    except Exception:
        return account_ref.strip().lower()


def _prune_deque(dq: deque, cutoff_ts: float):
    while dq and dq[0] < cutoff_ts:
        dq.popleft()


def _reload_exim():
    try:
        # Prefer systemctl, fallback to service
        res = subprocess.run(['systemctl', 'reload', 'exim'], capture_output=True, text=True, timeout=5)
        if res.returncode != 0:
            subprocess.run(['service', 'exim', 'reload'], capture_output=True, text=True, timeout=5)
    except Exception as e:
        print(f"[agent] Exim reload failed: {e}")


def _ensure_freeze_list():
    try:
        if not FREEZE_LIST_PATH.exists():
            # Create file with safe perms
            FREEZE_LIST_PATH.parent.mkdir(parents=True, exist_ok=True)
            FREEZE_LIST_PATH.write_text("")
            FREEZE_LIST_PATH.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
    except Exception as e:
        print(f"[agent] Could not ensure freeze list: {e}")


def _ensure_virtual_files():
    try:
        VIRTUAL_DIR.mkdir(parents=True, exist_ok=True)
        if not OSM_HOLD_FILE.exists():
            OSM_HOLD_FILE.write_text("")
            OSM_HOLD_FILE.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
        if not OSM_DISABLE_FILE.exists():
            OSM_DISABLE_FILE.write_text("")
            OSM_DISABLE_FILE.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
    except Exception as e:
        print(f"[agent] Could not ensure /etc/virtual files: {e}")


def add_sender_to_freeze_list(sender_email: str, reason: str = None):
    """Legacy compatibility wrapper. OSM now relies on /etc/virtual/osm_hold only."""
    try:
        if not sender_email:
            return
        print(f"[agent] (notice) Skipping legacy freeze list update for {sender_email}")
    except Exception:
        pass


def remove_sender_from_freeze_list(sender_email: str):
    """Legacy compatibility wrapper. Nothing to remove from legacy list anymore."""
    try:
        sender_email = (sender_email or '').strip()
        if sender_email:
            print(f"[agent] (notice) Legacy freeze list disabled; nothing to remove for {sender_email}")
    except Exception:
        pass
    return False


def add_user_to_hold_list(username, sender_email=None, reason: str = None):
    try:
        username = (username or '').strip()
        email_full = (sender_email or '').strip()
        if not username and not email_full:
            return
        _ensure_virtual_files()
        existing = set()
        try:
            for line in OSM_HOLD_FILE.read_text().splitlines():
                s = (line or '').strip()
                if s:
                    existing.add(s)
        except Exception:
            pass
        to_add = []
        if username and username not in existing:
            to_add.append(username)
        if email_full and email_full not in existing:
            to_add.append(email_full)
        if to_add:
            with OSM_HOLD_FILE.open('a') as f:
                for item in to_add:
                    f.write(item + "\n")
            _reload_exim()
            print(f"[agent] Added to OSM hold list: {', '.join(to_add)}")
            # Mark local hold/freeze update time
            try:
                global LAST_LOCAL_HOLD_UPDATE_TS
                LAST_LOCAL_HOLD_UPDATE_TS = time.time()
            except Exception:
                pass
            # Notify server immediately so reconcile will retain this entry
            notified = False
            if BASE_URL and TOKEN and CURRENT_SERVER_ID is not None:
                try:
                    session.post(
                        f"{BASE_URL}/api/osm/agent/freeze",
                        json={
                            "serverId": str(CURRENT_SERVER_ID),
                            "senderEmail": email_full,
                            "account": username,
                            "reason": (reason or "locally added by agent")
                        },
                        headers=DEFAULT_HEADERS,
                        timeout=5,
                    )
                    notified = True
                except Exception as e:
                    print(f"[agent] Failed to notify server freeze (hold): {e}")
            if not notified:
                try:
                    _queue_action({
                        "type": "freeze",
                        "senderEmail": email_full or username,
                        "account": username or "",
                        "reason": reason or "locally added by agent",
                        "timestamp": time.time(),
                    })
                except Exception:
                    pass
    except Exception as e:
        print(f"[agent] Error updating OSM hold list: {e}")


def remove_user_from_hold_list(username=None, sender_email=None):
    try:
        targets = set()
        if username:
            targets.add(username.strip().lower())
        if sender_email:
            targets.add(sender_email.strip().lower())
        if not targets:
            return False
        _ensure_virtual_files()
        changed = False
        try:
            lines = []
            for line in OSM_HOLD_FILE.read_text().splitlines():
                entry = (line or '').strip()
                if entry and entry.lower() not in targets:
                    lines.append(entry)
                else:
                    changed = True
            if changed:
                OSM_HOLD_FILE.write_text("\n".join(lines) + ("\n" if lines else ""))
                _reload_exim()
                try:
                    global LAST_LOCAL_HOLD_UPDATE_TS
                    LAST_LOCAL_HOLD_UPDATE_TS = time.time()
                except Exception:
                    pass
        except FileNotFoundError:
            changed = False
        return changed
    except Exception as e:
        print(f"[agent] Error removing from OSM hold list: {e}")
        return False


def _maybe_trigger_freeze(account_ref, threshold, dq, sid=None):
    try:
        key = (account_ref, threshold["name"], threshold["seconds"])
        events_required = int(threshold["events"])
        if len(dq) >= events_required:
            last = LAST_TRIGGERED.get(key)
            now = time.time()
            # Avoid re-triggering too often: only once per window
            if not last or (now - last) >= threshold["seconds"]:
                sender_email = account_ref.split('(')[0]
                # Derive username for OSM hold list
                username = extract_username_from_account_ref(account_ref)
                owner_key = (_resolve_owner_from_email(sender_email) or username or '').strip()
                hold_owner = _should_hold_owner(owner_key, sender_email)
                username_for_hold = owner_key if hold_owner else ""
                # Local relay (hold_owner==True) → freeze account only; otherwise freeze email only
                add_user_to_hold_list(
                    username_for_hold,
                    None if hold_owner else sender_email,
                    reason=f"Exceeded {threshold['events']} in {threshold['seconds']}s"
                )
                # Maintain legacy exim sender freeze list for generic setups
                add_sender_to_freeze_list(sender_email, reason=f"Exceeded {threshold['events']} in {threshold['seconds']}s")
                LAST_TRIGGERED[key] = now
                reason_text = f"Exceeded {threshold['events']} in {threshold['seconds']}s"
                limit_label = f"Threshold: {threshold['events']} events"
                _generate_freeze_report(account_ref, owner_key or username, sender_email, reason_text, int(threshold["seconds"]), limit_label, sid)
                print(f"[agent] Threshold exceeded for {account_ref}: {len(dq)}/{events_required} in {threshold['seconds']}s (freeze)")
    except Exception as e:
        print(f"[agent] Freeze decision error: {e}")


def _record_event(account_ref, subject, thresholds, sid=None, lim_by_account=None, event_ts=None):
    event_time = event_ts if event_ts is not None else time.time()
    # Remember last subject (best-effort)
    try:
        sub = (subject or '').strip()
        if sub:
            # Collapse newlines/tabs and trim
            sub = re.sub(r"\s+", " ", sub)[:200]
            LAST_SUBJECT_BY_ACCOUNT[account_ref] = sub
    except Exception:
        pass

    owner_key_for_limits = _resolve_limit_account_key(account_ref)
    has_account_limit = bool(lim_by_account and owner_key_for_limits in lim_by_account)
    has_default_limit = bool(DEFAULT_ENABLED and DEFAULT_MAX_EMAILS > 0 and DEFAULT_PER_SECONDS > 0)

    for t in thresholds:
        if (t.get("type") or "logline") != "logline":
            continue
        window = int(t["seconds"])
        key = (account_ref, t["name"], window)
        dq = LOG_COUNTERS[key]
        _prune_deque(dq, event_time - window)
        dq.append(event_time)
        LAST_COUNTER_SEEN_TS[key] = event_time
        if not has_account_limit and not has_default_limit:
            _maybe_trigger_freeze(account_ref, t, dq, sid)

    # Apply per-account limits from Manage
    try:
        if lim_by_account is not None:
            owner_key = owner_key_for_limits
            # Debug: print resolved owner and available limits
            if lim_by_account:
                print(f"[agent] Resolved owner_key='{owner_key}' from account_ref='{account_ref}', available limits={list(lim_by_account.keys())}")
            # 1) Account-specific limit
            if owner_key in lim_by_account:
                max_emails, per_seconds = lim_by_account[owner_key]
                dq = ACCOUNT_LIMIT_WINDOWS[owner_key]
                _prune_deque(dq, event_time - per_seconds)
                dq.append(event_time)
                # Also mirror this into tracking DB counters under a standard trigger name
                trig_key = (account_ref, f"limit_{per_seconds}", int(per_seconds))
                tdq = LOG_COUNTERS[trig_key]
                _prune_deque(tdq, event_time - per_seconds)
                tdq.append(event_time)
                if len(dq) > max_emails:
                    # Deduplicate: only trigger once per window
                    last_triggered = LAST_LIMIT_TRIGGERED.get(owner_key, 0)
                    already_triggered = (event_time - last_triggered) < per_seconds
                    if not already_triggered:
                        # Over limit: freeze sender
                        sender_email = (account_ref.split('(')[0] if '(' in account_ref else _extract_email(account_ref)) or ''
                        reason_text = f"Exceeded {max_emails} in {per_seconds}s"
                        log_evidence = _has_recent_log_events(owner_key, sender_email, per_seconds)
                        if not log_evidence:
                            print(f"[agent] Queue limit exceeded for {owner_key} but no recent exim log events; skipping account freeze.")
                        else:
                            hold_owner = _should_hold_owner(owner_key, sender_email)
                            username_for_hold = owner_key if hold_owner else ""
                            # Local relay → freeze account only; otherwise freeze email only
                            add_user_to_hold_list(
                                username_for_hold,
                                None if hold_owner else sender_email,
                                reason=reason_text
                            )
                            add_sender_to_freeze_list(sender_email, reason=reason_text)
                            LAST_LIMIT_TRIGGERED[owner_key] = event_time
                            limit_label = f"Limit: {max_emails}"
                            _generate_freeze_report(account_ref, owner_key, sender_email, reason_text, per_seconds, limit_label, sid)
            # 2) Server default limit if no account-specific
            elif DEFAULT_ENABLED and DEFAULT_MAX_EMAILS > 0 and DEFAULT_PER_SECONDS > 0:
                per_seconds = DEFAULT_PER_SECONDS
                max_emails = DEFAULT_MAX_EMAILS
                dq = ACCOUNT_LIMIT_WINDOWS[f"default::{owner_key}"]
                _prune_deque(dq, event_time - per_seconds)
                dq.append(event_time)
                # Mirror to tracking DB under default window
                trig_key = (account_ref, f"limit_{per_seconds}", int(per_seconds))
                tdq = LOG_COUNTERS[trig_key]
                _prune_deque(tdq, event_time - per_seconds)
                tdq.append(event_time)
                if len(dq) > max_emails:
                    # Deduplicate: only trigger once per window (use default:: prefix for key)
                    dedup_key = f"default::{owner_key}"
                    last_triggered = LAST_LIMIT_TRIGGERED.get(dedup_key, 0)
                    already_triggered = (event_time - last_triggered) < per_seconds
                    if not already_triggered:
                        sender_email = (account_ref.split('(')[0] if '(' in account_ref else _extract_email(account_ref)) or ''
                        username = extract_username_from_account_ref(account_ref)
                        reason_text = f"Exceeded default {max_emails} in {per_seconds}s"
                        log_evidence = _has_recent_log_events(owner_key, sender_email, per_seconds)
                        if not log_evidence:
                            print(f"[agent] Default limit exceeded for {owner_key} but no recent exim log events; skipping account freeze.")
                        else:
                            hold_owner = _should_hold_owner(owner_key, sender_email)
                            username_for_hold = owner_key if hold_owner else ""
                            # Local relay → freeze account only; otherwise freeze email only
                            add_user_to_hold_list(
                                username_for_hold,
                                None if hold_owner else sender_email,
                                reason=reason_text
                            )
                            add_sender_to_freeze_list(sender_email, reason=reason_text)
                            LAST_LIMIT_TRIGGERED[dedup_key] = event_time
                            limit_label = f"Default Limit: {max_emails}"
                            _generate_freeze_report(account_ref, owner_key, sender_email, reason_text, per_seconds, limit_label, sid)
    except Exception as e:
        print(f"[agent] Error applying per-account limits: {e}")


def write_tracking_db(thresholds):
    try:
        lines = []
        now_ts = time.time()
        valid_windows = {(t["name"], int(t["seconds"])) for t in thresholds if (t.get("type") or "logline") == "logline"}
        # Build a set of keys to write: active counters and recently seen counters
        keys_to_write = set(LOG_COUNTERS.keys())
        for k, ts in list(LAST_COUNTER_SEEN_TS.items()):
            if now_ts - ts <= COUNTER_RETENTION_SECS:
                keys_to_write.add(k)
        for (account_ref, trig_name, window) in sorted(keys_to_write):
            dq = LOG_COUNTERS.get((account_ref, trig_name, window), deque())
            if not dq and (trig_name, window) not in valid_windows and not trig_name.startswith("limit_"):
                continue
            # prune before counting
            _prune_deque(dq, now_ts - window)
            count = len(dq)
            # Skip if no count and outside retention
            last_ts = LAST_COUNTER_SEEN_TS.get((account_ref, trig_name, window), 0)
            if count == 0 and (now_ts - last_ts) > COUNTER_RETENTION_SECS:
                continue
            subject = (LAST_SUBJECT_BY_ACCOUNT.get(account_ref) or '').strip()
            subject = re.sub(r"\s+", " ", subject)[:200] if subject else ''
            # Compose recipients string
            rcpts = []
            try:
                rec_map = RECIPIENTS_BY_ACCOUNT.get(account_ref) or {}
                # prune again relative to now
                for r, t in list(rec_map.items()):
                    if now_ts - t > RECIPIENT_RETENTION_SECS:
                        rec_map.pop(r, None)
                rcpts = sorted(rec_map.keys())[:50]
            except Exception:
                rcpts = []
            lines.append(f"{count}event percent")
            lines.append(f"{window}seconds")
            # Append recipients after subject using rcpt-> comma-separated
            rcpt_part = f"rcpt->{','.join(rcpts)}" if rcpts else "rcpt->"
            lines.append(f"{account_ref}->{trig_name}->logline->subject->{subject} {rcpt_part}")
        TRACKING_DB_PATH.write_text("\n".join(lines) + ("\n" if lines else ""))
    except Exception as e:
        print(f"[agent] Error writing tracking DB: {e}")


def _get_subject_for_message_id(msg_id: str) -> str:
    """Fetch Subject header for a given exim message id using exim -Mvh."""
    try:
        if not msg_id:
            return ""
        res = subprocess.run(['exim', '-Mvh', msg_id], capture_output=True, text=True, timeout=5)
        if res.returncode != 0:
            return ""
        headers = res.stdout
        m = re.search(r"^Subject:\s*(.+)$", headers, re.MULTILINE)
        if m:
            return m.group(1).strip()
    except Exception:
        pass
    return ""


def _extract_subject_from_headers_text(headers: str) -> str:
    try:
        if not headers:
            return ""
        m = re.search(r"^Subject:\s*(.+)$", headers, re.MULTILINE)
        if m:
            return m.group(1).strip()
    except Exception:
        pass
    return ""


def get_email_headers(msg_id):
    """Fetch email headers using exim -Mvh"""
    try:
        result = subprocess.run(['exim', '-Mvh', msg_id], capture_output=True, text=True, timeout=10)
        if result.returncode == 0:
            headers = result.stdout.strip()
            return headers
        else:
            return f"Error fetching headers: {result.stderr.strip()}"
    except Exception as e:
        return f"Error fetching headers: {str(e)}"


def get_email_body(msg_id):
    """Fetch email body using exim -Mvb"""
    try:
        result = subprocess.run(['exim', '-Mvb', msg_id], capture_output=True, text=True, timeout=10)
        if result.returncode == 0:
            body = result.stdout.strip()
            return body
        else:
            return f"Error fetching body: {result.stderr.strip()}"
    except Exception as e:
        return f"Error fetching body: {str(e)}"



def _process_pending_actions():
    sid = _get_current_server_id()
    if not BASE_URL or not TOKEN or not sid:
        return
    actions = _read_actions_queue()
    if not actions:
        return
    remaining = []
    for action in actions:
        typ = action.get('type')
        try:
            if typ == 'freeze':
                payload = {
                    "serverId": sid,
                    "senderEmail": action.get('senderEmail', ''),
                    "account": action.get('account') or '',
                    "reason": action.get('reason') or 'CLI freeze'
                }
                session.post(f"{BASE_URL}/api/osm/agent/freeze", json=payload, headers=DEFAULT_HEADERS, timeout=10)
            elif typ == 'unfreeze':
                payload = {
                    "serverId": sid,
                    "senderEmail": action.get('senderEmail', ''),
                    "account": action.get('account') or ''
                }
                session.post(f"{BASE_URL}/api/osm/agent/unfreeze", json=payload, headers=DEFAULT_HEADERS, timeout=10)
            elif typ == 'set_limit':
                payload = {
                    "serverId": sid,
                    "account": action.get('account', ''),
                    "maxEmails": int(action.get('maxEmails', 0)),
                    "perSeconds": int(action.get('perSeconds', 0)),
                    "enabled": bool(action.get('enabled', True))
                }
                session.post(f"{BASE_URL}/api/osm/agent/limit", json=payload, headers=DEFAULT_HEADERS, timeout=10)
            elif typ == 'remove_limit':
                payload = {
                    "serverId": sid,
                    "account": action.get('account', '')
                }
                session.post(f"{BASE_URL}/api/osm/agent/limit-remove", json=payload, headers=DEFAULT_HEADERS, timeout=10)
            elif typ == 'set_default_limit':
                payload = {
                    "serverId": sid,
                    "maxEmails": int(action.get('maxEmails', 0)),
                    "perSeconds": int(action.get('perSeconds', 0)),
                    "enabled": bool(action.get('enabled', True))
                }
                session.post(f"{BASE_URL}/api/osm/agent/limit-default", json=payload, headers=DEFAULT_HEADERS, timeout=10)
            else:
                continue
        except Exception as e:
            print(f"[agent] Failed to sync action {typ}: {e}")
            remaining.append(action)
            continue
    _write_actions_queue(remaining)


def _append_client_summary(lines, events_window):
    try:
        ips = []
        hosts = []
        seen_ips = set()
        seen_hosts = set()
        for ev in events_window:
            client = ev.get('client', {}) or {}
            host, ip = _resolve_client_details(ev.get('qid') or '', client)
            if ip and ip not in seen_ips:
                seen_ips.add(ip)
                ips.append(ip)
            if host and host not in seen_hosts:
                seen_hosts.add(host)
                hosts.append(host)
        if ips:
            lines.append(f"Client IPs: {', '.join(ips)}\n")
        if hosts:
            lines.append(f"Client Hosts: {', '.join(hosts)}\n")
    except Exception:
        pass


def get_exim_queue_stats(stats):
    """Populate queue statistics using exim commands."""
    try:
        result = subprocess.run(['exim', '-bp'], capture_output=True, text=True, timeout=10)
        if result.returncode != 0:
            return stats
        lines = result.stdout.splitlines()
        details = []
        current = None
        for raw in lines:
            line = (raw or '').rstrip()
            if not line:
                continue
            stripped = line.lstrip()
            if not stripped:
                continue
            if stripped.startswith('**') or stripped.startswith('=='):
                continue
            is_header_line = False
            try:
                parts = stripped.split()
                if len(parts) < 3:
                    is_header_line = False
                else:
                    age_token = parts[0]
                    second_token = parts[1] if len(parts) > 1 else ''
                    third_token = parts[2] if len(parts) > 2 else ''
                    # very loose check: age token contains digit and second token is numeric-ish
                    size_token = second_token.upper()
                    if any(ch.isdigit() for ch in age_token) and (size_token.replace('.', '').replace('K','').replace('M','').isdigit()):
                        is_header_line = True
            except Exception:
                is_header_line = False

            if is_header_line:
                age = parts[0]
                size = parts[1]
                queue_id = parts[2]
                sender = ""
                if len(parts) >= 4:
                    sender = parts[3]
                if sender == "<=" and len(parts) >= 5:
                    sender = parts[4]
                current = {
                    "queueId": queue_id,
                    "messageId": queue_id,
                    "age": age,
                    "size": size,
                    "sender": sender,
                    "status": stripped,
                    "recipients": [],
                    "subject": "",
                    "headers": "",
                    "body": "",
                    "ownerAccount": "",
                }
                details.append(current)
                try:
                    headers = get_email_headers(queue_id)
                    if headers and not headers.startswith("Error fetching"):
                        current["headers"] = headers
                        current["subject"] = _extract_subject_from_headers_text(headers) or current["subject"]
                except Exception:
                    pass
                try:
                    subj_inline = _get_subject_for_message_id(queue_id)
                    if subj_inline:
                        current["subject"] = subj_inline
                except Exception:
                    pass
                continue

            if current is not None:
                recipient = stripped
                if recipient:
                    current["recipients"].append(recipient)
        # Optionally attach short recipient preview and body (lazy fetch only if small queue)
        for entry in details[:50]:
            try:
                if not entry.get("body"):
                    body = get_email_body(entry["queueId"])
                    if body and not body.startswith("Error fetching"):
                        # Avoid massive payloads
                        entry["body"] = body[:10000]
            except Exception:
                pass
            if entry.get("recipients"):
                entry["recipient"] = ", ".join(entry["recipients"])
            else:
                entry["recipient"] = ""
            if entry.get("headers"):
                owner = extract_account_from_detail({
                    "headers": entry["headers"],
                    "sender": entry.get("sender", ""),
                    "recipient": entry.get("recipient", "")
                })
                entry["ownerAccount"] = owner
        stats["queueDetails"] = details
        stats["totalMessages"] = len(details)
        stats["activeMessages"] = len(details)
        return stats
    except Exception as e:
        print(f"[agent] Error getting exim queue stats: {e}")
        return stats


def get_postfix_queue_stats(stats):
    """Get queue statistics using postfix commands"""
    try:
        # Get queue summary using postqueue
        result = subprocess.run(['postqueue', '-p'], capture_output=True, text=True, timeout=10)
        if result.returncode == 0:
            output = result.stdout
            
            # Parse postqueue output
            lines = output.strip().split('\n')
            for line in lines[1:]:  # Skip header line
                if line.strip() and not line.startswith('-'):
                    parts = line.split()
                    if len(parts) >= 5:
                        queue_id = parts[0]
                        size = parts[1]
                        nrcpt = parts[2]
                        time_str = parts[3]
                        sender = parts[4] if len(parts) > 4 else ""
                        
                        stats["queueDetails"].append({
                            "queueId": queue_id,
                            "size": size,
                            "recipients": nrcpt,
                            "time": time_str,
                            "sender": sender,
                            "ownerAccount": (_resolve_owner_from_email(_extract_email(sender) or '') or '')
                        })
                        
                        # Categorize messages
                        if queue_id.startswith('active'):
                            stats["activeMessages"] += 1
                        elif queue_id.startswith('deferred'):
                            stats["deferredMessages"] += 1
                        elif queue_id.startswith('hold'):
                            stats["holdMessages"] += 1
                        elif queue_id.startswith('incoming'):
                            stats["incomingMessages"] += 1
                        else:
                            stats["outgoingMessages"] += 1
                            
                        stats["totalMessages"] += 1
        
    except Exception as e:
        print(f"[agent] Error getting postfix queue stats: {e}")
    
    return stats


def get_mail_queue_stats(queue_path: str):
    """Get mail queue statistics using appropriate mail server commands"""
    stats = {
        "totalMessages": 0,
        "activeMessages": 0,
        "deferredMessages": 0,
        "holdMessages": 0,
        "incomingMessages": 0,
        "outgoingMessages": 0,
        "queueDetails": [],
        "spamScores": [],
        "blockedDomains": []
    }
    mail_server = detect_mail_server()
    try:
        if mail_server == 'exim':
            return get_exim_queue_stats(stats)
        elif mail_server == 'postfix':
            return get_postfix_queue_stats(stats)
        else:
            # Fallback: try exim totals (noop) then postfix
            s = stats
            s = get_postfix_queue_stats(s)
            return s
    except Exception:
        return stats


def get_spam_scores(queue_path: str, mail_server: str):
    """Extract spam scores from mail headers"""
    spam_scores = []
    try:
        if mail_server == 'exim':
            spam_scores = get_exim_spam_scores()
        elif mail_server == 'postfix':
            spam_scores = get_postfix_spam_scores(queue_path)
        else:
            # Try both methods
            spam_scores = get_exim_spam_scores()
            if not spam_scores:
                spam_scores = get_postfix_spam_scores(queue_path)
    except Exception as e:
        print(f"[agent] Error getting spam scores: {e}")
    
    return spam_scores


def get_exim_spam_scores():
    """Get spam scores from exim queue using exim -Mvh"""
    spam_scores = []
    try:
        # Get list of queued messages (with sudo for permission)
        result = subprocess.run(['exim', '-bp'], capture_output=True, text=True, timeout=10)
        if result.returncode == 0:
            lines = result.stdout.strip().split('\n')
            msg_ids = []
            for line in lines:
                if line.strip() and not line.startswith('-') and not line.startswith('Mail queue'):
                    parts = line.split()
                    if len(parts) >= 1:
                        msg_ids.append(parts[0])
            
            # Get headers for first few messages to check for spam scores
            for msg_id in msg_ids[:10]:  # Limit to 10 messages
                try:
                    header_result = subprocess.run(['sudo', 'exim', '-Mvh', msg_id], capture_output=True, text=True, timeout=5)
                    if header_result.returncode == 0:
                        headers = header_result.stdout
                        for line in headers.split('\n'):
                            if 'X-Spam-Score:' in line or 'X-Spam-Level:' in line or 'X-Spam-Status:' in line:
                                score_match = re.search(r'(\d+\.?\d*)', line)
                                if score_match:
                                    spam_scores.append({
                                        "messageId": msg_id,
                                        "score": float(score_match.group(1)),
                                        "header": line.strip()
                                    })
                except Exception:
                    continue
    except Exception as e:
        print(f"[agent] Error getting exim spam scores: {e}")
    
    return spam_scores


def get_postfix_spam_scores(queue_path: str):
    """Get spam scores from postfix queue files"""
    spam_scores = []
    try:
        # Look for recent messages in the queue
        queue_dirs = ['active', 'deferred', 'incoming']
        for queue_dir in queue_dirs:
            queue_dir_path = Path(queue_path) / queue_dir
            if queue_dir_path.exists():
                for msg_file in queue_dir_path.iterdir():
                    if msg_file.is_file():
                        try:
                            # Read first few lines to get headers
                            with open(msg_file, 'r', encoding='utf-8', errors='ignore') as f:
                                headers = []
                                for i, line in enumerate(f):
                                    if i > 50:  # Only read first 50 lines
                                        break
                                    headers.append(line.strip())
                                    if line.strip() == '':  # End of headers
                                        break
                                
                                # Look for spam score headers
                                for header in headers:
                                    if 'X-Spam-Score:' in header or 'X-Spam-Level:' in header:
                                        score_match = re.search(r'(\d+\.?\d*)', header)
                                        if score_match:
                                            spam_scores.append({
                                                "messageId": msg_file.name,
                                                "score": float(score_match.group(1)),
                                                "header": header
                                            })
                        except Exception:
                            continue
                        
                        if len(spam_scores) >= 20:  # Limit to 20 recent scores
                            break
    except Exception as e:
        print(f"[agent] Error getting postfix spam scores: {e}")
    
    return spam_scores


def check_blocked_domains(mail_server: str):
    """Check for blocked domains in recent mail"""
    blocked_domains = []
    try:
        if mail_server == 'exim':
            blocked_domains = check_exim_blocked_domains()
        elif mail_server == 'postfix':
            blocked_domains = check_postfix_blocked_domains()
        else:
            # Try both methods
            blocked_domains = check_exim_blocked_domains()
            blocked_domains.extend(check_postfix_blocked_domains())
    except Exception as e:
        print(f"[agent] Error checking blocked domains: {e}")
    
    return blocked_domains


def check_exim_blocked_domains():
    """Check exim logs for blocked domains"""
    blocked_domains = []
    try:
        # Check exim logs for blocked domains
        log_files = [
            '/var/log/exim/mainlog',
            '/var/log/exim/paniclog',
            '/var/log/mail.log',
            '/var/log/maillog'
        ]
        
        for log_file in log_files:
            if os.path.exists(log_file):
                try:
                    # Get recent log entries
                    result = subprocess.run(
                        ['tail', '-100', log_file], 
                        capture_output=True, text=True, timeout=5
                    )
                    if result.returncode == 0:
                        for line in result.stdout.split('\n'):
                            if any(keyword in line.lower() for keyword in ['blocked', 'reject', 'denied', 'blacklisted']):
                                # Extract domain from log line
                                domain_match = re.search(r'@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})', line)
                                if domain_match:
                                    domain = domain_match.group(1)
                                    if domain not in [bd['domain'] for bd in blocked_domains]:
                                        blocked_domains.append({
                                            "domain": domain,
                                            "timestamp": datetime.now().isoformat(),
                                            "reason": line.strip()
                                        })
                except Exception:
                    continue
    except Exception as e:
        print(f"[agent] Error checking exim blocked domains: {e}")
    
    return blocked_domains


def check_postfix_blocked_domains():
    """Check postfix logs for blocked domains"""
    blocked_domains = []
    try:
        # Check postfix logs for blocked domains
        log_files = [
            '/var/log/mail.log',
            '/var/log/postfix.log',
            '/var/log/maillog'
        ]
        
        for log_file in log_files:
            if os.path.exists(log_file):
                try:
                    # Get recent log entries
                    result = subprocess.run(
                        ['tail', '-100', log_file], 
                        capture_output=True, text=True, timeout=5
                    )
                    if result.returncode == 0:
                        for line in result.stdout.split('\n'):
                            if 'blocked' in line.lower() or 'reject' in line.lower():
                                # Extract domain from log line
                                domain_match = re.search(r'@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})', line)
                                if domain_match:
                                    domain = domain_match.group(1)
                                    if domain not in [bd['domain'] for bd in blocked_domains]:
                                        blocked_domains.append({
                                            "domain": domain,
                                            "timestamp": datetime.now().isoformat(),
                                            "reason": line.strip()
                                        })
                except Exception:
                    continue
    except Exception as e:
        print(f"[agent] Error checking postfix blocked domains: {e}")
    
    return blocked_domains


def detect_mail_server():
    """Detect which mail server (Exim/Postfix) is available on this host."""
    try:
        # Prefer explicit binaries first
        if shutil.which('exim') or Path('/usr/sbin/exim').exists():
            return 'exim'
        if shutil.which('postqueue') or shutil.which('postfix'):
            return 'postfix'
        # Fallback heuristics: check for config directories
        if Path('/etc/exim').exists() or Path('/var/log/exim').exists():
            return 'exim'
        if Path('/etc/postfix').exists():
            return 'postfix'
    except Exception:
        pass
    return 'unknown'


def detect_control_panel():
    """Detect which control panel is installed (cpanel, directadmin, or unknown)."""
    try:
        if os.path.isdir('/usr/local/cpanel'):
            return 'cpanel'
        if os.path.isdir('/usr/local/directadmin'):
            return 'directadmin'
        # Try binaries
        try:
            res = subprocess.run(['which', 'whmapi1'], capture_output=True, text=True, timeout=5)
            if res.returncode == 0:
                return 'cpanel'
        except Exception:
            pass
        return 'unknown'
    except Exception:
        return 'unknown'


def list_cpanel_accounts():
    """List all cPanel accounts."""
    accounts = []
    try:
        # Try whmapi1 listaccts
        res = subprocess.run(['whmapi1', 'listaccts'], capture_output=True, text=True, timeout=30)
        if res.returncode == 0:
            import json
            try:
                data = json.loads(res.stdout)
                accts = data.get('data', {}).get('acct', [])
                for a in accts:
                    user = a.get('user', '')
                    domain = a.get('domain', '')
                    if user:
                        accounts.append({'username': user, 'domain': domain})
            except Exception:
                pass
        # Fallback: parse /etc/trueuserdomains
        if not accounts:
            p = Path('/etc/trueuserdomains')
            if p.exists():
                for line in p.read_text(encoding='utf-8', errors='ignore').splitlines():
                    if ':' in line:
                        parts = line.split(':', 1)
                        if len(parts) == 2:
                            domain = parts[0].strip()
                            user = parts[1].strip()
                            if user and domain:
                                accounts.append({'username': user, 'domain': domain})
    except Exception as e:
        print(f"[agent] Error listing cPanel accounts: {e}")
    return accounts


def list_directadmin_accounts():
    """List all DirectAdmin accounts."""
    accounts = []
    try:
        users_dir = Path('/usr/local/directadmin/data/users')
        if users_dir.exists() and users_dir.is_dir():
            for user_dir in users_dir.iterdir():
                if user_dir.is_dir():
                    username = user_dir.name
                    # Try to get primary domain
                    domain = ''
                    try:
                        domain_file = user_dir / 'domains.list'
                        if domain_file.exists():
                            domains = domain_file.read_text(encoding='utf-8', errors='ignore').splitlines()
                            if domains:
                                domain = domains[0].strip()
                    except Exception:
                        pass
                    accounts.append({'username': username, 'domain': domain})
    except Exception as e:
        print(f"[agent] Error listing DirectAdmin accounts: {e}")
    return accounts


def send_queue_snapshot(sid: int, stats: dict):
    """Send queue snapshot to server"""
    if not QUEUE_INGEST_ENABLED:
        print(f"[agent] Queue ingestion disabled; not sending snapshot")
        return
    try:
        # Debug: print what we're sending
        print(f"[agent] Sending snapshot with {len(stats['queueDetails'])} queue details")
        if stats["queueDetails"]:
            first_detail = stats["queueDetails"][0]
            print(f"[agent] First detail: queueId={first_detail.get('queueId')}, headers_len={len(first_detail.get('headers', ''))}, body_len={len(first_detail.get('body', ''))}")
        
        # Test JSON serialization first
        try:
            queue_details_json = json.dumps(stats["queueDetails"])
            print(f"[agent] JSON serialization successful, size: {len(queue_details_json)} chars")
        except Exception as json_error:
            print(f"[agent] JSON serialization error: {json_error}")
            # Try with limited data
            limited_details = []
            for detail in stats["queueDetails"][:5]:  # Only first 5 messages
                limited_detail = {
                    "queueId": detail.get("queueId", ""),
                    "size": detail.get("size", ""),
                    "recipients": detail.get("recipients", ""),
                    "time": detail.get("time", ""),
                    "sender": detail.get("sender", ""),
                    "recipient": detail.get("recipient", ""),
                    "headers": detail.get("headers", "")[:1000] + "..." if len(detail.get("headers", "")) > 1000 else detail.get("headers", ""),
                    "body": detail.get("body", "")[:1000] + "..." if len(detail.get("body", "")) > 1000 else detail.get("body", "")
                }
                limited_details.append(limited_detail)
            queue_details_json = json.dumps(limited_details)
            print(f"[agent] Using limited data, size: {len(queue_details_json)} chars")
        
        payload = {
            "serverId": str(sid),
            "totalMessages": stats["totalMessages"],
            "activeMessages": stats["activeMessages"],
            "deferredMessages": stats["deferredMessages"],
            "holdMessages": stats["holdMessages"],
            "incomingMessages": stats["incomingMessages"],
            "outgoingMessages": stats["outgoingMessages"],
            "queueDetailsJson": queue_details_json,
            "spamScoreJson": json.dumps(stats["spamScores"]),
            "blockedDomainsJson": json.dumps(stats["blockedDomains"])
        }
        
        print(f"[agent] Sending payload with total size: {len(json.dumps(payload))} chars")
        resp = session.post(f"{BASE_URL}/api/osm/agent/queue-snapshot", json=payload, headers=DEFAULT_HEADERS, timeout=30)
        resp.raise_for_status()
        print(f"[agent] Queue snapshot sent successfully")
        
    except Exception as e:
        print(f"[agent] Error sending queue snapshot: {e}")
        import traceback
        print(f"[agent] Traceback: {traceback.format_exc()}")


def send_accounts_snapshot(sid: int):
    """Detect panel and send accounts list to server."""
    try:
        panel = 'unknown'
        accounts = []
        # simple detection without shutil to avoid import if not present
        if os.path.isdir('/usr/local/cpanel'):
            panel = 'cpanel'
            accounts = list_cpanel_accounts()
        elif os.path.isdir('/usr/local/directadmin'):
            panel = 'directadmin'
            accounts = list_directadmin_accounts()
        else:
            # Try binaries
            try:
                res = subprocess.run(['which', 'whmapi1'], capture_output=True, text=True, timeout=5)
                if res.returncode == 0:
                    panel = 'cpanel'
                    accounts = list_cpanel_accounts()
            except Exception:
                pass
            if panel == 'unknown' and Path('/usr/local/directadmin').exists():
                panel = 'directadmin'
                accounts = list_directadmin_accounts()
        payload = {
            "serverId": str(sid),
            "panelType": panel,
            "accountsJson": json.dumps(accounts)
        }
        print(f"[agent] Sending accounts snapshot: panel={panel}, count={len(accounts)}")
        resp = session.post(f"{BASE_URL}/api/osm/agent/accounts", json=payload, headers=DEFAULT_HEADERS, timeout=15)
        resp.raise_for_status()
        print(f"[agent] Accounts snapshot sent successfully (panel={panel}, count={len(accounts)})")
    except Exception as e:
        print(f"[agent] Error sending accounts snapshot: {e}")


def fetch_account_limits(sid=None):
    # If offline or no server id, use cached limits only
    if sid is None or not BASE_URL or not TOKEN:
        cache = _read_limits_cache()
        return cache.get("accounts", [])
    try:
        resp = session.get(f"{BASE_URL}/api/osm/agent/account-limits", params={"serverId": sid}, headers=DEFAULT_HEADERS, timeout=10)
        resp.raise_for_status()
        data = resp.json() or []
        cache = _read_limits_cache()
        cache["accounts"] = [
            {
                "account": str(x.get("account") or "").strip(),
                "maxEmails": int(x.get("maxEmails") or 0),
                "perSeconds": int(x.get("perSeconds") or 0),
                "enabled": bool(x.get("enabled", True))
            }
            for x in data
        ]
        _write_limits_cache(cache)
        return data
    except Exception as e:
        print(f"[agent] Error fetching account limits: {e}")
        cache = _read_limits_cache()
        return cache.get("accounts", [])


def parse_age_to_seconds(age_str: str) -> int:
    try:
        s = age_str.strip()
        if not s:
            return 0
        unit = s[-1]
        val = int(''.join(ch for ch in s if ch.isdigit())) if any(ch.isdigit() for ch in s) else 0
        if unit == 's':
            return val
        if unit == 'm':
            return val * 60
        if unit == 'h':
            return val * 3600
        if unit == 'd':
            return val * 86400
        return val
    except Exception:
        return 0


def extract_account_from_detail(detail: dict) -> str:
    # Try headers for -ident or -auth_sender then fallback to recipient/sender
    try:
        headers = detail.get('headers') or ''
        if headers:
            m = re.search(r"-ident\s+(\w+)", headers)
            if m:
                return m.group(1)
            m = re.search(r"-auth_sender\s+([^@\s]+)@", headers)
            if m:
                return m.group(1)
            m = re.search(r"Received: from (\w+) by", headers)
            if m:
                return m.group(1)
        recip = (detail.get('recipient') or '').strip()
        if recip.startswith('(') and recip.endswith(')') and len(recip) > 2:
            return recip[1:-1]
        sender = (detail.get('sender') or '')
        m = re.search(r"<([^@>]+)@", sender)
        if m:
            return m.group(1)
    except Exception:
        pass
    return 'Unknown'


def freeze_message_exim(queue_id: str):
    try:
        subprocess.run(['exim', '-Mf', queue_id], capture_output=True, text=True, timeout=5)
    except Exception as e:
        print(f"[agent] exim freeze failed for {queue_id}: {e}")


def hold_message_postfix(queue_id: str):
    try:
        subprocess.run(['postsuper', '-h', queue_id], capture_output=True, text=True, timeout=5)
    except Exception as e:
        print(f"[agent] postfix hold failed for {queue_id}: {e}")


def enforce_account_limits(sid: int, stats: dict):
    # Live rolling window using first-seen timestamps per message
    # State containers (persist across calls via attributes on function)
    if not hasattr(enforce_account_limits, "seen_ids"):
        enforce_account_limits.seen_ids = set()
    if not hasattr(enforce_account_limits, "account_windows"):
        enforce_account_limits.account_windows = defaultdict(deque)  # account -> deque[timestamps]

    limits = fetch_account_limits(sid)
    if not limits:
        return

    now_ts = time.time()
    details = stats.get('queueDetails') or []
    mail_server = detect_mail_server()

    # Index limits for quick lookup
    lim_by_account = {}
    for lim in limits:
        try:
            if not lim.get('enabled', True):
                continue
            account = (lim.get('account') or '').strip().lower()
            max_emails = int(lim.get('maxEmails', 0))
            per_seconds = int(lim.get('perSeconds', 60))
            if account and max_emails > 0 and per_seconds > 0:
                lim_by_account[account] = (max_emails, per_seconds)
        except Exception:
            continue

    if not lim_by_account:
        return

    for d in details:
        log_evidence = False
        try:
            qid = d.get('queueId') or d.get('messageId')
            if not qid or qid in enforce_account_limits.seen_ids:
                continue
            account = extract_account_from_detail(d)
            # Resolve owner from sender email (prefer) else map account_ref to owner key
            sender_email = _extract_email(d.get('sender') or '') or _extract_email(account) or ''
            owner_key = _resolve_owner_from_email(sender_email) or _resolve_limit_account_key(account)
            if owner_key not in lim_by_account:
                continue
            # Mark first seen and evaluate window
            enforce_account_limits.seen_ids.add(qid)
            max_emails, per_seconds = lim_by_account[owner_key]
            dq = enforce_account_limits.account_windows[owner_key]
            # prune old timestamps
            cutoff = now_ts - per_seconds
            while dq and dq[0] < cutoff:
                dq.popleft()
            dq.append(now_ts)
            if len(dq) > max_emails:
                # Over limit: act on this message immediately
                print(f"[agent] Limit exceeded for {owner_key}: {len(dq)}/{max_emails} in {per_seconds}s. Freezing/holding {qid}.")
                if mail_server == 'exim':
                    freeze_message_exim(qid)
                elif mail_server == 'postfix':
                    hold_message_postfix(qid)
                # Also add to hold lists for sustained blocking
                reason_text = f"Exceeded {max_emails} in {per_seconds}s (count={len(dq)})"
                log_evidence = _has_recent_log_events(owner_key, sender_email, per_seconds)
                if not log_evidence:
                    print(f"[agent] Queue enforcement detected over-limit for {owner_key} but no recent exim log activity; skipping persistent freeze.")
                else:
                    hold_owner = _should_hold_owner(owner_key, sender_email)
                    username_for_hold = owner_key if hold_owner else ""
                    try:
                        # Local relay → freeze account only; otherwise freeze email only
                        add_user_to_hold_list(
                            username_for_hold,
                            None if hold_owner else sender_email,
                            reason=reason_text
                        )
                        add_sender_to_freeze_list(sender_email, reason=reason_text)
                    except Exception as e:
                        print(f"[agent] Failed to update hold/freeze lists: {e}")
                    # Inform server so reconcile won't remove our local entry
                    fallback_events = _fallback_events_from_queue_detail(d, sender_email, owner_key)
                    limit_label = f"Limit: {max_emails}"
                    _generate_freeze_report(account, owner_key, sender_email, reason_text, per_seconds, limit_label, sid, fallback_events if fallback_events else None)
            else:
                log_evidence = False
        except Exception as e:
            print(f"[agent] Error enforcing per-message limit: {e}")
            
        except Exception:
            pass


def check_spam_thresholds(sid: int, stats: dict, config: dict):
    """Check if spam thresholds are exceeded and send alerts"""
    try:
        spam_threshold = config.get("spamThreshold", 5.0)
        high_spam_count = 0
        
        for spam_score in stats["spamScores"]:
            if spam_score["score"] >= spam_threshold:
                high_spam_count += 1
        
        if high_spam_count > 0:
            alert_payload = {
                "serverId": str(sid),
                "alertType": "HIGH_SPAM_SCORE",
                "severity": "WARNING" if high_spam_count < 10 else "CRITICAL",
                "message": f"Found {high_spam_count} messages with spam score >= {spam_threshold}",
                "detailsJson": json.dumps({
                    "highSpamCount": high_spam_count,
                    "threshold": spam_threshold,
                    "samples": stats["spamScores"][:5]  # First 5 samples
                })
            }
            
            resp = session.post(f"{BASE_URL}/api/osm/agent/spam-alert", json=alert_payload, headers=DEFAULT_HEADERS, timeout=10)
            resp.raise_for_status()
            
    except Exception as e:
        print(f"[agent] Error checking spam thresholds: {e}")


def _execute_queue_action_delete(ids):
    try:
        ok = 0
        for qid in ids:
            if not qid:
                continue
            try:
                res = subprocess.run(['exim', '-Mrm', qid], capture_output=True, text=True, timeout=10)
                if res.returncode == 0:
                    ok += 1
                else:
                    print(f"[agent] exim -Mrm {qid} failed: rc={res.returncode} stdout={res.stdout.strip()} stderr={res.stderr.strip()}")
            except Exception as e:
                print(f"[agent] delete_queue exception for {qid}: {e}")
                continue
        return ok
    except Exception as e:
        print(f"[agent] delete_queue failed: {e}")
        return 0


def _execute_queue_action_thaw(ids):
    try:
        ok = 0
        for qid in ids:
            if not qid:
                continue
            try:
                res = subprocess.run(['exim', '-Mt', qid], capture_output=True, text=True, timeout=10)
                if res.returncode == 0:
                    ok += 1
            except Exception:
                continue
        return ok
    except Exception as e:
        print(f"[agent] thaw_queue failed: {e}")
        return 0


def _execute_queue_action_release(ids):
    """Release a frozen message: thaw and then schedule delivery."""
    try:
        ok = 0
        for qid in ids:
            if not qid:
                continue
            try:
                # Thaw first
                subprocess.run(['exim', '-Mt', qid], capture_output=True, text=True, timeout=10)
                # Then attempt immediate delivery
                res = subprocess.run(['exim', '-M', qid], capture_output=True, text=True, timeout=15)
                if res.returncode == 0:
                    ok += 1
            except Exception:
                continue
        return ok
    except Exception as e:
        print(f"[agent] release_queue failed: {e}")
        return 0


def process_sse_event(event_data: str, sid: int):
    """Process a single SSE event containing a command.
    Note: Only events that include a non-empty 'type' are considered commands.
    """
    try:
        cmd = json.loads(event_data)
        cid = cmd.get('id') or ''
        ctype = (cmd.get('type') or '').strip().lower()
        payload_json = cmd.get('payloadJson') or '{}'
        if not ctype:
            # Ignore non-command SSE payloads (e.g., queue-update snapshots)
            return
        print(f"[agent] SSE event received: id={cid} type={ctype}")
        result_msg = ''
        try:
            if isinstance(payload_json, (dict, list)):
                payload = payload_json
            else:
                payload = json.loads(payload_json)
        except Exception:
            payload = {}
        ids = payload.get('queueIds') or []
        if ctype == 'delete_queue':
            n = _execute_queue_action_delete(ids)
            result_msg = f"deleted {n}/{len(ids)}"
            print(f"[agent] delete_queue: {result_msg}")
        elif ctype == 'thaw_queue':
            n = _execute_queue_action_thaw(ids)
            result_msg = f"thawed {n}/{len(ids)}"
            print(f"[agent] thaw_queue: {result_msg}")
        elif ctype == 'release_queue':
            n = _execute_queue_action_release(ids)
            result_msg = f"released {n}/{len(ids)}"
            print(f"[agent] release_queue: {result_msg}")
        elif ctype == 'snapshot_queue':
            # Just send a fresh snapshot
            try:
                cfg = fetch_config(sid) or {}
                if not QUEUE_INGEST_ENABLED or not cfg.get("queueIngestEnabled", True):
                    result_msg = "snapshot skipped (queue ingest disabled)"
                    print("[agent] snapshot_queue: skipped, queue ingest disabled")
                else:
                    stats = get_mail_queue_stats(cfg.get("mailQueuePath", "/var/spool/postfix"))
                    send_queue_snapshot(sid, stats)
                    result_msg = "snapshot sent"
                    print("[agent] snapshot_queue: snapshot sent")
            except Exception as e:
                result_msg = f"snapshot failed: {e}"
                print(f"[agent] snapshot_queue failed: {e}")
        elif ctype == 'get_live_queue':
            # On-demand live queue fetch - send current queue data back via callback
            try:
                request_id = payload.get('requestId') or ''
                sender_filter = (payload.get('senderEmail') or '').strip().lower()
                account_filter = (payload.get('account') or '').strip().lower()
                cfg = fetch_config(sid) or {}
                stats = get_mail_queue_stats(cfg.get("mailQueuePath", "/var/spool/postfix"))
                queue_details = stats.get('queueDetails', [])
                # Apply filters if provided
                if sender_filter or account_filter:
                    filtered = []
                    for detail in queue_details:
                        sender = (detail.get('sender') or '').lower()
                        owner = (detail.get('ownerAccount') or '').lower()
                        if sender_filter and sender_filter in sender:
                            filtered.append(detail)
                        elif account_filter and account_filter == owner:
                            filtered.append(detail)
                    queue_details = filtered
                # Send live queue data back to server
                response_payload = {
                    "serverId": str(sid),
                    "requestId": request_id,
                    "totalMessages": stats.get("totalMessages", 0),
                    "activeMessages": stats.get("activeMessages", 0),
                    "deferredMessages": stats.get("deferredMessages", 0),
                    "holdMessages": stats.get("holdMessages", 0),
                    "queueDetails": queue_details,
                    "spamScores": stats.get("spamScores", []),
                }
                session.post(
                    f"{BASE_URL}/api/osm/agent/live-queue-response",
                    json=response_payload,
                    headers=DEFAULT_HEADERS,
                    timeout=30
                )
                result_msg = f"live queue sent ({len(queue_details)} items)"
                print(f"[agent] get_live_queue: sent {len(queue_details)} queue items")
            except Exception as e:
                result_msg = f"live queue failed: {e}"
                print(f"[agent] get_live_queue failed: {e}")
        elif ctype == 'get_accounts':
            try:
                request_id = payload.get('requestId') or ''
                panel = 'unknown'
                accounts = []
                if os.path.isdir('/usr/local/cpanel'):
                    panel = 'cpanel'
                    accounts = list_cpanel_accounts()
                elif os.path.isdir('/usr/local/directadmin'):
                    panel = 'directadmin'
                    accounts = list_directadmin_accounts()
                else:
                    try:
                        res = subprocess.run(['which', 'whmapi1'], capture_output=True, text=True, timeout=5)
                        if res.returncode == 0:
                            panel = 'cpanel'
                            accounts = list_cpanel_accounts()
                    except Exception:
                        pass
                    if panel == 'unknown' and Path('/usr/local/directadmin').exists():
                        panel = 'directadmin'
                        accounts = list_directadmin_accounts()
                response_payload = {
                    "serverId": str(sid),
                    "requestId": request_id,
                    "panelType": panel,
                    "accountsJson": json.dumps(accounts)
                }
                session.post(
                    f"{BASE_URL}/api/osm/agent/accounts",
                    json=response_payload,
                    headers=DEFAULT_HEADERS,
                    timeout=15
                )
                result_msg = f"accounts sent ({len(accounts)} items)"
                print(f"[agent] get_accounts: sent {len(accounts)} accounts")
            except Exception as e:
                result_msg = f"get accounts failed: {e}"
                print(f"[agent] get_accounts failed: {e}")
        elif ctype == 'verify_email':
            try:
                payload = json.loads(payload_json)
            except Exception:
                payload = {}
            email = (payload.get('email') or '').strip().lower()
            exists = False
            owner = ''
            def _directadmin_mailbox_exists(addr: str) -> bool:
                try:
                    if '@' not in addr:
                        return False
                    localpart, domain = addr.split('@', 1)
                    domain = domain.strip().lower().rstrip('.')
                    localpart = localpart.strip().lower()
                    passwd_path = Path(f"/etc/virtual/{domain}/passwd")
                    if not passwd_path.exists():
                        return False
                    try:
                        for line in passwd_path.read_text(encoding='utf-8', errors='ignore').splitlines():
                            # DirectAdmin passwd format: localpart:password_hash:...
                            if not line or ':' not in line:
                                continue
                            lp = line.split(':', 1)[0].strip().lower()
                            if lp == localpart:
                                return True
                    except Exception:
                        return False
                    return False
                except Exception:
                    return False
            try:
                # Resolve owner by domain mapping
                owner = _resolve_owner_from_email(email) or ''
                # Check if any queued or recent message references this sender
                qstats = get_mail_queue_stats('/var/spool/postfix')
                for d in qstats.get('queueDetails', []):
                    s = (d.get('sender') or '').lower()
                    if email and email in s:
                        exists = True
                        break
                # If not in queue, check mailbox existence on DirectAdmin
                if not exists:
                    if detect_control_panel() == 'directadmin':
                        if _directadmin_mailbox_exists(email):
                            exists = True
            except Exception:
                pass
            result_msg = f"exists={'true' if exists else 'false'};owner={owner}"
        elif ctype == 'update_limits_cache':
            try:
                payload = json.loads(payload_json)
            except Exception:
                payload = {}
            # Validate and write to cache
            d = payload.get('default') or {}
            accs = payload.get('accounts') or []
            if isinstance(accs, list):
                clean_accs = []
                for x in accs:
                    try:
                        clean_accs.append({
                            'account': str((x.get('account') if isinstance(x, dict) else '') or '').strip(),
                            'maxEmails': int((x.get('maxEmails') if isinstance(x, dict) else 0) or 0),
                            'perSeconds': int((x.get('perSeconds') if isinstance(x, dict) else 0) or 0),
                            'enabled': bool((x.get('enabled') if isinstance(x, dict) else True))
                        })
                    except Exception:
                        continue
            else:
                clean_accs = []
            cache = {
                'default': {
                    'enabled': bool(d.get('enabled') or False),
                    'maxEmails': int(d.get('maxEmails') or 0),
                    'perSeconds': int(d.get('perSeconds') or 0)
                },
                'accounts': clean_accs
            }
            _write_limits_cache(cache)
            # Apply defaults immediately in-memory
            try:
                global DEFAULT_MAX_EMAILS, DEFAULT_PER_SECONDS, DEFAULT_ENABLED
                DEFAULT_MAX_EMAILS = cache['default']['maxEmails']
                DEFAULT_PER_SECONDS = cache['default']['perSeconds']
                DEFAULT_ENABLED = cache['default']['enabled']
            except Exception:
                pass
            result_msg = 'limits updated'
            print("[agent] update_limits_cache: limits updated")
        elif ctype == 'reconcile_freeze':
            try:
                # Server-initiated reconcile: prefer the server state to avoid re-adding a user just un-frozen from the app
                reconcile_freeze_file_with_server(sid, prefer_server=True)
                result_msg = 'reconciled (server-wins)'
            except Exception as e:
                result_msg = f'reconcile failed: {e}'
            print(f"[agent] reconcile_freeze: {result_msg}")
        elif ctype == 'self_update':
            # Perform self-update in a detached shell so we can ack immediately
            try:
                print("[agent] self_update: scheduling self-update...")
                update_sh = f"""#!/usr/bin/env bash
set -euo pipefail
APP_DIR=/opt/osm
TMP_ZIP=$(mktemp)
echo "[agent] Starting self-update from {BASE_URL}"
curl -fsSL "{BASE_URL}/osm/agent/download" -o "$TMP_ZIP" || {{ echo "Download failed"; exit 1; }}
echo "[agent] Downloaded agent package"
mkdir -p "$APP_DIR"
unzip -o "$TMP_ZIP" -d "$APP_DIR" || {{ echo "Unzip failed"; exit 1; }}
rm -f "$TMP_ZIP"
echo "[agent] Extracted to $APP_DIR"

# Set up Python environment
if [ ! -x "$APP_DIR/venv/bin/python" ]; then
  echo "[agent] Creating Python virtual environment"
  if [ -x "/opt/osm/py310/bin/python3.10" ]; then PY_BIN="/opt/osm/py310/bin/python3.10";
  elif command -v python3.10 >/dev/null 2>&1; then PY_BIN=$(command -v python3.10);
  elif [ -x /usr/bin/python3.10 ]; then PY_BIN=/usr/bin/python3.10;
  elif [ -x /usr/local/bin/python3.10 ]; then PY_BIN=/usr/local/bin/python3.10;
  else PY_BIN=$(command -v python3 || echo /usr/bin/python3); fi
  "$PY_BIN" -m venv "$APP_DIR/venv" || {{ echo "Virtual environment creation failed"; exit 1; }}
fi

echo "[agent] Updating Python packages"
"$APP_DIR/venv/bin/pip" install --upgrade pip || true
"$APP_DIR/venv/bin/pip" install --upgrade requests psutil || echo "Package update warning"

echo "[agent] Setting permissions"
chmod +x "$APP_DIR/uninstall-agent.sh" 2>/dev/null || true
chmod +x "$APP_DIR/osm" 2>/dev/null || true
ln -sf "$APP_DIR/osm" /usr/local/bin/osm 2>/dev/null || true

echo "[agent] Restarting agent service"
systemctl restart osm 2>/dev/null || {{
  echo "[agent] Systemd restart failed, trying direct restart"
  pkill -f "python.*main.py" 2>/dev/null || true
  sleep 2
  cd "$APP_DIR" && nohup "$APP_DIR/venv/bin/python" main.py >/dev/null 2>&1 &
}}

echo "[agent] Self-update completed successfully"
"""
                # Write and run
                upd = Path('/tmp/osm_self_update.sh')
                upd.write_text(update_sh)
                os.chmod(str(upd), 0o755)
                # Run in background with proper logging
                subprocess.Popen(['nohup', 'bash', str(upd), '&>', '/tmp/osm_update.log', '&'],
                               stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                result_msg = 'update scheduled'
                print(f"[agent] Self-update scheduled, check /tmp/osm_update.log for details")
            except Exception as e:
                result_msg = f'update failed: {e}'
                print(f"[agent] Self-update error: {e}")
        elif ctype == 'uninstall':
            # Execute uninstall script
            try:
                uninstall_script = Path(AGENT_DIR) / 'uninstall-agent.sh'
                if uninstall_script.exists():
                    # Make sure it's executable
                    os.chmod(str(uninstall_script), 0o755)
                    # Run it in background so we can ack first
                    subprocess.Popen(['nohup', 'bash', str(uninstall_script)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                    result_msg = 'uninstall started'
                else:
                    result_msg = 'uninstall script not found'
            except Exception as e:
                result_msg = f'uninstall failed: {e}'
        elif ctype == 'get_freeze_reports':
            # On-demand fetch of local freeze reports
            try:
                request_id = payload.get('requestId') or ''
                sender_filter = (payload.get('senderEmail') or '').strip().lower()
                account_filter = (payload.get('account') or '').strip().lower()
                reports = []
                if REPORTS_DIR.exists():
                    for path in sorted(REPORTS_DIR.glob('*.txt'), key=lambda p: p.stat().st_mtime, reverse=True):
                        parsed = _parse_report_file(path)
                        if not parsed:
                            continue
                        # Apply filters if provided
                        sender = (parsed.get('senderEmail') or '').strip().lower()
                        account = (parsed.get('account') or '').strip().lower()
                        if sender_filter and sender_filter not in sender:
                            continue
                        if account_filter and account_filter not in account:
                            continue
                        reports.append(parsed)
                response_payload = {
                    "serverId": str(sid),
                    "requestId": request_id,
                    "reports": reports,
                }
                session.post(
                    f"{BASE_URL}/api/osm/agent/freeze-reports-response",
                    json=response_payload,
                    headers=DEFAULT_HEADERS,
                    timeout=30
                )
                result_msg = f"freeze reports sent ({len(reports)} items)"
                print(f"[agent] get_freeze_reports: sent {len(reports)} reports")
            except Exception as e:
                result_msg = f"get freeze reports failed: {e}"
                print(f"[agent] get_freeze_reports failed: {e}")
        if cid:
            try:
                session.post(f"{BASE_URL}/api/osm/agent/command-ack", json={
                    "serverId": str(sid),
                    "commandId": cid,
                    "resultMessage": result_msg
                }, headers=DEFAULT_HEADERS, timeout=10)
                print(f"[agent] Acked command id={cid} result='{result_msg}'")
                # Queue snapshots no longer sent automatically - fetched live on-demand via get_live_queue
            except Exception:
                pass
    except Exception as e:
        print(f"[agent] Error processing SSE event: {e}")

# SSE connection configuration
SSE_CONNECT_TIMEOUT_SECS = 30
SSE_READ_TIMEOUT_SECS = 90  # Server sends heartbeat every 15s, so 90s allows 6 missed heartbeats
SSE_STALE_THRESHOLD_SECS = 120  # Reconnect if no data received for this long
SSE_INITIAL_BACKOFF_SECS = 5
SSE_MAX_BACKOFF_SECS = 60

def listen_for_commands_sse(sid: int):
    """Listen for real-time commands via Server-Sent Events.
    Parses SSE frames and only processes 'command' events.
    Implements robust reconnection with health monitoring.
    """
    print(f"[agent] Starting SSE command listener for server {sid}")
    consecutive_failures = 0
    last_event_time = time.time()
    
    while True:
        try:
            url = f"{BASE_URL}/api/osm/agent/queue-stream/{sid}"
            sse_headers = dict(DEFAULT_HEADERS)
            sse_headers.setdefault('Accept', 'text/event-stream')
            
            # Use a read timeout to detect dead connections
            response = session.get(url, headers=sse_headers, stream=True, 
                                   timeout=(SSE_CONNECT_TIMEOUT_SECS, SSE_READ_TIMEOUT_SECS))
            response.raise_for_status()
            print(f"[agent] Connected to SSE stream at {url}")
            consecutive_failures = 0  # Reset on successful connect
            last_event_time = time.time()

            event_name = None
            data_lines = []
            for raw in response.iter_lines(decode_unicode=True):
                if raw is None:
                    # Check if connection seems stale (no data for too long)
                    if time.time() - last_event_time > SSE_STALE_THRESHOLD_SECS:
                        print("[agent] SSE stream appears stale, reconnecting...")
                        break
                    continue
                    
                last_event_time = time.time()  # Update on any data received
                line = raw.rstrip('\r')
                
                if not line:
                    # dispatch accumulated event
                    if data_lines:
                        data_payload = '\n'.join(data_lines)
                        if (event_name or '').strip().lower() in ('', 'command'):
                            try:
                                process_sse_event(data_payload, sid)
                            except Exception as e:
                                print(f"[agent] SSE event dispatch error: {e}")
                    event_name = None
                    data_lines = []
                    continue
                # field parsing
                if line.startswith('event:'):
                    event_name = line[6:].strip()
                elif line.startswith('data:'):
                    # tolerate optional leading space
                    val = line[5:]
                    if val.startswith(' '):
                        val = val[1:]
                    data_lines.append(val)
                else:
                    # ignore other fields (id:, retry:, etc)
                    pass
                    
        except requests.exceptions.Timeout:
            print("[agent] SSE read timeout - server may be unresponsive, reconnecting...")
            consecutive_failures += 1
        except requests.exceptions.ConnectionError as e:
            print(f"[agent] SSE connection error: {e}")
            consecutive_failures += 1
        except Exception as e:
            print(f"[agent] SSE error: {e}")
            consecutive_failures += 1
        
        # Exponential backoff: 5s, 10s, 20s, 40s, max 60s
        backoff = min(SSE_INITIAL_BACKOFF_SECS * (2 ** min(consecutive_failures, 4)), SSE_MAX_BACKOFF_SECS)
        print(f"[agent] SSE reconnecting in {backoff}s (failures: {consecutive_failures})")
        time.sleep(backoff)

SSE_LISTENER_THREAD = None

def start_sse_listener_thread(sid: int):
    """Start the SSE listener thread for real-time command delivery."""
    global SSE_LISTENER_THREAD
    if SSE_LISTENER_THREAD and SSE_LISTENER_THREAD.is_alive():
        return
    SSE_LISTENER_THREAD = threading.Thread(target=listen_for_commands_sse, args=(sid,), daemon=True)
    SSE_LISTENER_THREAD.start()
    print(f"[agent] SSE listener thread started for server {sid}")



# ---------- CLI Helpers ----------

def _cli_build_empty_stats():
    return {
        "totalMessages": 0,
        "activeMessages": 0,
        "deferredMessages": 0,
        "holdMessages": 0,
        "incomingMessages": 0,
        "outgoingMessages": 0,
        "queueDetails": [],
        "spamScores": [],
        "blockedDomains": []
    }


def _cli_load_queue_matches(sender_email: str | None = None, account: str | None = None):
    stats = get_exim_queue_stats(_cli_build_empty_stats())
    matches = []
    target_email = (sender_email or '').strip().lower()
    target_account = (account or '').strip().lower()
    for detail in stats.get("queueDetails", []):
        sender = (detail.get("sender") or "").lower()
        recipients = [r.lower() for r in detail.get("recipients") or []]
        owner = (detail.get("ownerAccount") or "").lower()
        if target_email:
            if target_email == sender or any(target_email == r for r in recipients):
                matches.append(detail)
        elif target_account:
            if owner == target_account or target_account in sender:
                matches.append(detail)
    return matches


def _cli_print_queue_summary(matches):
    if not matches:
        print("No queued messages found.")
        return
    print(f"Found {len(matches)} queued message(s):")
    for item in matches:
        qid = item.get("queueId") or item.get("messageId")
        subject = item.get("subject") or ""
        sender = item.get("sender") or ""
        recipient = ", ".join(item.get("recipients") or [])
        print(f"  - {qid}: from {sender} to {recipient} | {subject}")


def _cli_handle_queue_action(matches, choice):
    qids = [item.get("queueId") or item.get("messageId") for item in matches if item.get("queueId") or item.get("messageId")]
    qids = [qid for qid in qids if qid]
    if not qids:
        return None, []
    if choice == "release":
        released = _execute_queue_action_release(qids)
        print(f"Released {released}/{len(qids)} message(s) from queue.")
    elif choice == "delete":
        deleted = _execute_queue_action_delete(qids)
        print(f"Deleted {deleted}/{len(qids)} message(s) from queue.")
    return choice if choice in {"release", "delete"} else None, qids


def _cli_command_freeze(args):
    target_email = args.email
    target_account = args.account
    if not target_email and not target_account:
        print("Specify --email or --account to freeze.")
        return 1
    reason = args.reason or "CLI freeze"
    resolved_account = target_account or ""
    sender_for_record = target_email or target_account or ""
    if target_email:
        add_user_to_hold_list("", target_email, reason=reason)
        add_sender_to_freeze_list(target_email, reason=reason)
        print(f"Frozen email address: {target_email}")
    if target_account:
        add_user_to_hold_list(target_account, reason=reason)
        print(f"Frozen account: {target_account}")
    _queue_action({
        "type": "freeze",
        "senderEmail": sender_for_record,
        "account": resolved_account,
        "reason": reason,
        "timestamp": time.time(),
    })
    return 0


def _cli_command_unfreeze(args):
    target_email = args.email
    target_account = args.account
    if not target_email and not target_account:
        print("Specify --email or --account to unfreeze.")
        return 1
    matches = _cli_load_queue_matches(sender_email=target_email, account=target_account)
    _cli_print_queue_summary(matches)

    queue_choice = None
    if matches:
        if args.release and args.delete:
            print("Specify only one of --release or --delete.")
            return 1
        if args.release:
            queue_choice = "release"
        elif args.delete:
            queue_choice = "delete"
        else:
            print("Queue action options:")
            print("  1) Unfreeze only")
            print("  2) Unfreeze and release queued emails")
            print("  3) Unfreeze and delete queued emails")
            selected = input("Select option [1/2/3]: ").strip()
            queue_choice = {"1": None, "2": "release", "3": "delete"}.get(selected, None)
    queue_ids = []
    if target_email:
        remove_user_from_hold_list(sender_email=target_email)
        remove_sender_from_freeze_list(target_email)
        print(f"Unfrozen email address: {target_email}")
    if target_account:
        remove_user_from_hold_list(username=target_account)
        print(f"Unfrozen account: {target_account}")
    if queue_choice:
        queue_choice, queue_ids = _cli_handle_queue_action(matches, queue_choice)
    resolved_account = target_account or ""
    sender_for_record = target_email or target_account or ""
    _queue_action({
        "type": "unfreeze",
        "senderEmail": sender_for_record,
        "account": resolved_account,
        "queueAction": queue_choice or "none",
        "queueIds": queue_ids,
        "timestamp": time.time(),
    })
    return 0


def _cli_command_limit_set(args):
    account = args.account.strip()
    max_emails = int(args.max)
    per_seconds = int(args.per)
    enabled = not args.disable
    set_account_limit_local(account, max_emails, per_seconds, enabled)
    _queue_action({
        "type": "set_limit",
        "account": account,
        "maxEmails": max_emails,
        "perSeconds": per_seconds,
        "enabled": enabled,
        "timestamp": time.time(),
    })
    print(f"Set limit for account '{account}': {max_emails} emails / {per_seconds}s (enabled={enabled})")
    return 0


def _cli_command_limit_remove(args):
    account = args.account.strip()
    remove_account_limit_local(account)
    _queue_action({
        "type": "remove_limit",
        "account": account,
        "timestamp": time.time(),
    })
    print(f"Removed limit for account '{account}'.")
    return 0


def _cli_command_limit_default(args):
    max_emails = int(args.max)
    per_seconds = int(args.per)
    enabled = not args.disable
    set_default_limit_local(max_emails, per_seconds, enabled)
    _queue_action({
        "type": "set_default_limit",
        "maxEmails": max_emails,
        "perSeconds": per_seconds,
        "enabled": enabled,
        "timestamp": time.time(),
    })
    state = "enabled" if enabled else "disabled"
    print(f"Default limit {state}: {max_emails} emails / {per_seconds}s")
    return 0


def _cli_ensure_enrolled():
    sid = _get_current_server_id()
    if not sid:
        print("Warning: agent server ID not known yet. CLI changes will apply locally and sync once enrollment succeeds.")


def _cli_build_parser():
    parser = argparse.ArgumentParser(prog="osm", description="OSM Agent CLI")
    subparsers = parser.add_subparsers(dest="command")

    freeze_parser = subparsers.add_parser("freeze", help="Freeze an email address or account")
    freeze_target = freeze_parser.add_mutually_exclusive_group(required=True)
    freeze_target.add_argument("--email", help="Email address to freeze")
    freeze_target.add_argument("--account", help="Hosting account to freeze")
    freeze_parser.add_argument("--reason", help="Reason for freeze")
    freeze_parser.set_defaults(func=_cli_command_freeze)

    unfreeze_parser = subparsers.add_parser("unfreeze", help="Unfreeze an email address or account")
    unfreeze_target = unfreeze_parser.add_mutually_exclusive_group(required=True)
    unfreeze_target.add_argument("--email", help="Email address to unfreeze")
    unfreeze_target.add_argument("--account", help="Hosting account to unfreeze")
    unfreeze_parser.add_argument("--release", action="store_true", help="Release queued emails after unfreeze")
    unfreeze_parser.add_argument("--delete", action="store_true", help="Delete queued emails after unfreeze")
    unfreeze_parser.set_defaults(func=_cli_command_unfreeze)

    limit_parser = subparsers.add_parser("limit", help="Manage per-account or default limits")
    limit_sub = limit_parser.add_subparsers(dest="limit_command")

    limit_set = limit_sub.add_parser("set", help="Set/update limit for an account")
    limit_set.add_argument("--account", required=True, help="Hosting account name")
    limit_set.add_argument("--max", required=True, type=int, help="Max emails in window")
    limit_set.add_argument("--per", required=True, type=int, help="Time window in seconds")
    limit_set.add_argument("--disable", action="store_true", help="Disable (but retain values)")
    limit_set.set_defaults(func=_cli_command_limit_set)

    limit_remove = limit_sub.add_parser("remove", help="Remove limit for an account")
    limit_remove.add_argument("--account", required=True, help="Hosting account name")
    limit_remove.set_defaults(func=_cli_command_limit_remove)

    limit_default = limit_sub.add_parser("default", help="Set default (server-wide) limit")
    limit_default.add_argument("--max", required=True, type=int, help="Max emails in window")
    limit_default.add_argument("--per", required=True, type=int, help="Time window in seconds")
    limit_default.add_argument("--disable", action="store_true", help="Disable the default limit")
    limit_default.set_defaults(func=_cli_command_limit_default)

    return parser


def run_cli(argv=None):
    _cli_ensure_enrolled()
    parser = _cli_build_parser()
    args = parser.parse_args(argv)
    if not hasattr(args, "func"):
        parser.print_help()
        return 1
    return args.func(args)


def main():
    """Main agent loop"""
    global CURRENT_THRESHOLDS
    print("[agent] Starting OSM agent...")
    
    try:
        sid = None
        if BASE_URL and TOKEN:
            try:
                sid = enroll()
                try:
                    global CURRENT_SERVER_ID
                    CURRENT_SERVER_ID = sid
                except Exception:
                    pass
                try:
                    _save_agent_state({"serverId": str(sid)})
                except Exception:
                    pass
                print(f"[agent] Enrolled with server ID: {sid}")
                # Report agent version to server
                try:
                    session.post(f"{BASE_URL}/api/osm/agent/hello", json={"serverId": str(sid), "version": "v1.2"}, headers=DEFAULT_HEADERS, timeout=10)
                except Exception as e:
                    print(f"[agent] Failed to report version: {e}")
                # Start SSE listener for real-time commands
                start_sse_listener_thread(sid)
            except Exception as e:
                print(f"[agent] Enrollment failed, continuing offline: {e}")
                try:
                    cached_sid = _get_current_server_id()
                    if cached_sid:
                        sid = cached_sid
                        CURRENT_SERVER_ID = cached_sid
                        # Start SSE listener with cached server ID
                        start_sse_listener_thread(sid)
                except Exception:
                    pass
        else:
            try:
                cached_sid = _get_current_server_id()
                if cached_sid:
                    sid = cached_sid
                    CURRENT_SERVER_ID = cached_sid
                    # Start SSE listener with cached server ID
                    start_sse_listener_thread(sid)
            except Exception:
                pass
        
        # Load config (online or cached)
        if sid is not None:
            config = fetch_config(sid)
        else:
            d = _read_limits_cache().get("default", {})
            config = {
                "mailQueuePath": "/var/spool/postfix",
                "checkInterval": 30,
                "spamThreshold": 5.0,
                "blockedDomainsJson": "[]",
                "alertThresholdsJson": "{}",
                "defaultMaxEmails": int(d.get('maxEmails') or 0),
                "defaultPerSeconds": int(d.get('perSeconds') or 0),
                "defaultEnabled": bool(d.get('enabled') or False)
            }
        print(f"[agent] Using config: {config}")
        
        # Apply default server-wide limit settings from config
        def apply_defaults_from_config(cfg: dict):
            try:
                global DEFAULT_MAX_EMAILS, DEFAULT_PER_SECONDS, DEFAULT_ENABLED
                DEFAULT_MAX_EMAILS = int(cfg.get('defaultMaxEmails') or 0)
                DEFAULT_PER_SECONDS = int(cfg.get('defaultPerSeconds') or 0)
                DEFAULT_ENABLED = bool(cfg.get('defaultEnabled') or False)
                print(f"[agent] Default limit: enabled={DEFAULT_ENABLED} max={DEFAULT_MAX_EMAILS} per={DEFAULT_PER_SECONDS}s")
            except Exception as e:
                print(f"[agent] Failed to apply default limit from config: {e}")
        apply_defaults_from_config(config)
        

        # Send accounts snapshot immediately on startup
        if sid is not None:
            try:
                send_accounts_snapshot(sid)
            except Exception as e:
                print(f"[agent] Initial accounts snapshot failed: {e}")
            try:
                # Prime local limits cache so log parsing can enforce account limits immediately
                fetch_account_limits(sid)
            except Exception as e:
                print(f"[agent] Initial account limits fetch failed: {e}")

        # Initialize thresholds and start near-real-time Exim monitor
        initial_thresholds = parse_thresholds(config)
        try:
            global CURRENT_THRESHOLDS
            with THRESHOLD_LOCK:
                CURRENT_THRESHOLDS = list(initial_thresholds)
        except Exception:
            pass
        try:
            process_exim_logs(initial_thresholds, sid)
            write_tracking_db(initial_thresholds)
        except Exception as e:
            print(f"[agent] Initial Exim scan failed: {e}")
        _ensure_exim_monitor_thread()
        
        loop_count = 0
        while True:
            try:
                loop_count += 1
                print(f"[agent] Starting loop iteration {loop_count}")
                # Periodically confirm version and liveness
                if sid is not None and loop_count % 5 == 0:
                    try:
                        session.post(f"{BASE_URL}/api/osm/agent/hello", json={"serverId": str(sid), "version": "v1.2"}, headers=DEFAULT_HEADERS, timeout=10)
                    except Exception as e:
                        print(f"[agent] Version hello failed: {e}")
                
                _process_pending_actions()

                # Keep freeze state aligned after pushing any queued actions
                if sid is not None:
                    reconcile_freeze_file_with_server(sid)

                # Periodically refresh config so default limits can change without restart
                if sid is not None and (loop_count % 3) == 0:
                    try:
                        config = fetch_config(sid)
                        apply_defaults_from_config(config)
                        new_thresholds = parse_thresholds(config)
                        try:
                            with THRESHOLD_LOCK:
                                CURRENT_THRESHOLDS = list(new_thresholds)
                        except Exception:
                            pass
                    except Exception as e:
                        print(f"[agent] Config refresh failed: {e}")
                stats = None
                if QUEUE_INGEST_ENABLED and config.get("queueIngestEnabled", True):
                    stats = get_mail_queue_stats(config["mailQueuePath"])
                    print(f"[agent] Got stats: {stats['totalMessages']} total messages")
                    # Queue snapshots no longer sent automatically - fetched live on-demand via get_live_queue
                    enforce_account_limits(sid, stats)
                else:
                    print("[agent] Queue ingestion disabled; skipping queue stats collection")

                # Check for spam alerts
                if sid is not None and stats is not None:
                    check_spam_thresholds(sid, stats, config)

                if stats is not None:
                    print(f"[agent] Queue stats: {stats['totalMessages']} total, "
                          f"{stats['activeMessages']} active, "
                          f"{stats['deferredMessages']} deferred, "
                          f"{len(stats['spamScores'])} spam scores")
                else:
                    print("[agent] Queue stats unavailable (ingest disabled)")
                
                # Sleep for configured interval
                print(f"[agent] Sleeping for {config.get('checkInterval', 30)} seconds...")
                time.sleep(config.get("checkInterval", 30))
                
            except Exception as e:
                print(f"[agent] Error in main loop: {e}")
                time.sleep(30)
                
    except Exception as e:
        print(f"[agent] Fatal error: {e}")
        time.sleep(30)
        return


if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] not in {"agent", "--agent"}:
        sys.exit(run_cli(sys.argv[1:]))
    main()