File: //opt/osm/main.py
#!/usr/bin/env python3
import argparse
import json
import os
import time
import subprocess
import re
import requests
from requests.adapters import HTTPAdapter
try:
    # Retry support for transient 5xx via urllib3
    from urllib3.util.retry import Retry  # type: ignore
except Exception:
    Retry = None
import sys
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from collections import defaultdict, deque
import shutil
import threading
import stat
from typing import Optional
def _load_env_file():
    env_path = Path(__file__).resolve().parent / ".env"
    if not env_path.exists():
        return
    try:
        for line in env_path.read_text(encoding='utf-8').splitlines():
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            key = key.strip()
            value = value.strip().strip("'\"")
            if key and key not in os.environ:
                os.environ[key] = value
    except Exception:
        pass
_load_env_file()
BASE_URL = os.environ.get("SERVER_BASE_URL")
TOKEN = os.environ.get("AGENT_TOKEN")
API_KEY = os.environ.get("OSM_API_KEY", "")
if not BASE_URL or not TOKEN:
    print("[agent] Warning: SERVER_BASE_URL or AGENT_TOKEN not set. Running in offline mode.")
session = requests.Session()
# Configure light retries for transient gateway errors (e.g., tunneling hiccups)
try:
    if Retry is not None:
        retry = Retry(
            total=2,
            connect=1,
            read=1,
            backoff_factor=0.5,
            status_forcelist=[502, 503, 504],
            allowed_methods=frozenset(["GET", "POST"]),
        )
        adapter = HTTPAdapter(max_retries=retry)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
except Exception:
    pass
DEFAULT_HEADERS = {"X-Api-Key": API_KEY} if API_KEY else {}
def _env_flag(name, default=False):
    val = os.environ.get(name)
    if val is None:
        return default
    try:
        return val.strip().lower() in {"1", "true", "yes", "on"}
    except Exception:
        return default
REQUEST_DEBUG_ENABLED = _env_flag("OSM_DEBUG_REQUESTS", False)
def _mask_headers(headers):
    if not headers:
        return {}
    masked = {}
    for key, value in headers.items():
        if key.lower() == "x-api-key":
            masked[key] = "***"
        else:
            masked[key] = value
    return masked
def _truncate_payload(payload, limit=500):
    if payload is None:
        return None
    try:
        text = json.dumps(payload, default=str)
    except Exception:
        text = str(payload)
    if len(text) > limit:
        return text[:limit] + "...(truncated)"
    return text
_original_request = session.request
def _logged_request(method, url, **kwargs):
    method_upper = method.upper()
    params_preview = _truncate_payload(kwargs.get("params"))
    json_preview = _truncate_payload(kwargs.get("json"))
    data_preview = _truncate_payload(kwargs.get("data"))
    headers_preview = _mask_headers(kwargs.get("headers") or {})
    timeout_value = kwargs.get("timeout")
    if REQUEST_DEBUG_ENABLED:
        try:
            print(f"[agent] -> {method_upper} {url} params={params_preview} json={json_preview} data={data_preview} headers={headers_preview} timeout={timeout_value}")
        except Exception as log_err:
            print(f"[agent] Failed to log request pre-flight: {log_err}")
    try:
        response = _original_request(method, url, **kwargs)
    except Exception as exc:
        if REQUEST_DEBUG_ENABLED:
            print(f"[agent] <- {method_upper} {url} raised {exc}")
        else:
            print(f"[agent] HTTP {method_upper} {url} failed: {exc}")
        raise
    if REQUEST_DEBUG_ENABLED:
        try:
            body_preview = response.text
            if body_preview and len(body_preview) > 500:
                body_preview = body_preview[:500] + "...(truncated)"
            print(f"[agent] <- {method_upper} {url} status={response.status_code} body={body_preview}")
        except Exception as log_err:
            print(f"[agent] Failed to log response: {log_err}")
    return response
session.request = _logged_request
# ---------- OSM-like tracking DB and Exim log processing (local on agent) ----------
# Global default limit (from server config)
DEFAULT_MAX_EMAILS: int = 0
DEFAULT_PER_SECONDS: int = 0
DEFAULT_ENABLED: bool = False
QUEUE_INGEST_ENABLED: bool = True
FREEZE_REPORTS_ENABLED: bool = True
IPDB_LOOKUP_ENABLED: bool = True
# Paths
AGENT_DIR = Path(__file__).resolve().parent
TRACKING_DB_PATH = AGENT_DIR / 'osm_tracking.db'
EXIM_LOG_STATE_PATH = AGENT_DIR / 'exim_log.state'
LIMITS_CACHE_PATH = AGENT_DIR / 'user_limits.json'
ACTIONS_QUEUE_PATH = AGENT_DIR / 'pending_actions.json'
AGENT_STATE_PATH = AGENT_DIR / 'agent_state.json'
# OSM-style integration files (as used by OSM on cPanel/DirectAdmin servers)
VIRTUAL_DIR = Path('/etc/virtual')
OSM_HOLD_FILE = VIRTUAL_DIR / 'osm_hold'      # usernames to freeze/hold
OSM_DISABLE_FILE = VIRTUAL_DIR / 'osm_disable'  # usernames to discard
# Legacy/alternate freeze list for generic exim setups (optional)
FREEZE_LIST_PATH = Path('/etc/exim/osm_freeze_senders')
# In-memory counters: key -> deque[timestamps]
# key is a tuple: (account_ref: str, trigger_name: str, window_seconds: int)
LOG_COUNTERS = defaultdict(deque)
LAST_TRIGGERED = {}
ACCOUNT_LIMIT_WINDOWS = defaultdict(deque)  # account -> deque[timestamps]
# Last seen subject per account_ref
LAST_SUBJECT_BY_ACCOUNT = {}
# Deduplication of log-based events by message-id
SEEN_LOG_MSG_IDS = deque()
SEEN_LOG_MSG_IDS_SET = set()
SEEN_LOG_MSG_IDS_RETENTION_SECS = 600  # 10 minutes
# Deduplication of per-recipient deliveries by (msgid, recipient)
SEEN_MSG_RCPT = deque()
SEEN_MSG_RCPT_SET = set()
SEEN_MSG_RCPT_RETENTION_SECS = 600
# Retain counters in DB for some time after last event
LAST_COUNTER_SEEN_TS = {}
COUNTER_RETENTION_SECS = 7200
# Per-message caches
MSG_ID_TO_ACCOUNT = {}
MSG_ID_TO_SUBJECT = {}
# Recent recipients per account_ref with last-seen timestamps
RECIPIENTS_BY_ACCOUNT = {}
RECIPIENT_RETENTION_SECS = 7200
# Domain -> owner cache (cpanel/directadmin)
DOMAIN_OWNER_MAP = {}
DOMAIN_OWNER_LAST_LOAD = 0.0
DOMAIN_OWNER_TTL_SECS = 300
# Per-message metadata
MSG_ID_TO_CLIENT = {}
MSG_ID_TO_RELAY = {}
MSG_ID_TO_OWNER = {}
MSG_ID_TO_RCPTS = {}
MSG_ID_TO_RELAY_TYPE = {}
# Per-owner recent events (delivery granularity)
ACCOUNT_EVENTS = defaultdict(deque)
ACCOUNT_EVENTS_RETENTION_SECS = 7200
MAX_LOG_LOOKBACK_SECS = 3 * 3600  # Ignore historical log entries older than 3 hours
REPORTS_DIR = Path('/opt/osm/reports')
PENDING_REPORTS_DIR = REPORTS_DIR / 'pending'
LAST_LOCAL_HOLD_UPDATE_TS = 0.0
CURRENT_SERVER_ID = None
THRESHOLD_LOCK = threading.Lock()
CURRENT_THRESHOLDS = []
EXIM_MONITOR_THREAD = None
EXIM_MONITOR_INTERVAL_SECS = 2.0
LAST_REPORT_SYNC_TS = 0.0
try:
    sys.stdout.reconfigure(line_buffering=True)
    sys.stderr.reconfigure(line_buffering=True)
except Exception:
    pass
def parse_thresholds(config: dict):
    """Return list of threshold dicts: [{events:int, seconds:int, name:str, type:str}]."""
    try:
        default_events = int(config.get('defaultMaxEmails') or 50)
    except Exception:
        default_events = 50
    try:
        default_seconds = int(config.get('defaultPerSeconds') or 300)
    except Exception:
        default_seconds = 300
    if default_events <= 0:
        default_events = 50
    if default_seconds <= 0:
        default_seconds = 300
    default_thresholds = [
        {"events": default_events, "seconds": default_seconds, "name": "default_logline", "type": "logline"}
    ]
    try:
        raw = config.get("alertThresholdsJson")
        if not raw:
            return default_thresholds
        data = raw
        if isinstance(raw, str):
            try:
                data = json.loads(raw)
            except Exception:
                return default_thresholds
        # Accept either {"thresholds": [...]} or {"loglineThresholds": [...]} or a list directly
        if isinstance(data, dict):
            thr = data.get("thresholds") or data.get("loglineThresholds")
        elif isinstance(data, list):
            thr = data
        else:
            thr = None
        if not thr:
            return default_thresholds
        out = []
        for i, t in enumerate(thr, start=1):
            try:
                events = int(t.get("events"))
                seconds = int(t.get("seconds"))
                name = (t.get("name") or f"trigger{i}")
                typ = (t.get("type") or "logline")
                if events > 0 and seconds > 0:
                    out.append({"events": events, "seconds": seconds, "name": name, "type": typ})
            except Exception:
                continue
        return out or default_thresholds
    except Exception:
        return default_thresholds
def _load_log_state(log_path: Path):
    try:
        if not EXIM_LOG_STATE_PATH.exists():
            return {"path": str(log_path), "ino": None, "pos": 0}
        data = json.loads(EXIM_LOG_STATE_PATH.read_text())
        if data.get("path") != str(log_path):
            return {"path": str(log_path), "ino": None, "pos": 0}
        return {"path": str(log_path), "ino": data.get("ino"), "pos": int(data.get("pos", 0))}
    except Exception:
        return {"path": str(log_path), "ino": None, "pos": 0}
def _save_log_state(state: dict):
    try:
        EXIM_LOG_STATE_PATH.write_text(json.dumps(state))
    except Exception as e:
        print(f"[agent] Error saving log state: {e}")
def _snapshot_thresholds():
    with THRESHOLD_LOCK:
        if CURRENT_THRESHOLDS:
            return [dict(t) for t in CURRENT_THRESHOLDS]
    return parse_thresholds({})
def _exim_monitor_loop():
    print(f"[agent] Exim monitor thread started (interval={EXIM_MONITOR_INTERVAL_SECS}s)")
    while True:
        try:
            thresholds = _snapshot_thresholds()
            process_exim_logs(thresholds, CURRENT_SERVER_ID)
            write_tracking_db(thresholds)
        except Exception as e:
            print(f"[agent] Exim monitor error: {e}")
        time.sleep(EXIM_MONITOR_INTERVAL_SECS)
def _ensure_exim_monitor_thread():
    global EXIM_MONITOR_THREAD
    if EXIM_MONITOR_THREAD and EXIM_MONITOR_THREAD.is_alive():
        return
    EXIM_MONITOR_THREAD = threading.Thread(target=_exim_monitor_loop, daemon=True)
    EXIM_MONITOR_THREAD.start()
def _parse_exim_timestamp(line: str) -> float:
    """Parse leading timestamp from an Exim log line; fallback to current time."""
    try:
        ts_str = line[:19]
        if re.match(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}", ts_str):
            try:
                dt = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
                return time.mktime(dt.timetuple())
            except Exception:
                pass
    except Exception:
        pass
    return time.time()
# --- Inserted helpers for enrollment, offline cache, and owner resolution ---
def enroll():
    """Enroll this agent with the server and return serverId."""
    resp = session.post(f"{BASE_URL}/api/osm/agent/enroll", json={"token": TOKEN}, headers=DEFAULT_HEADERS, timeout=10)
    resp.raise_for_status()
    return resp.json()["serverId"]
def _read_limits_cache() -> dict:
    try:
        if LIMITS_CACHE_PATH.exists():
            return json.loads(LIMITS_CACHE_PATH.read_text(encoding='utf-8'))
    except Exception as e:
        print(f"[agent] Error reading limits cache: {e}")
    return {"default": {"enabled": False, "maxEmails": 0, "perSeconds": 0}, "accounts": []}
def _write_limits_cache(cache: dict):
    try:
        LIMITS_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
        LIMITS_CACHE_PATH.write_text(json.dumps(cache, ensure_ascii=False, indent=2))
    except Exception as e:
        print(f"[agent] Error writing limits cache: {e}")
def _load_agent_state() -> dict:
    try:
        if AGENT_STATE_PATH.exists():
            return json.loads(AGENT_STATE_PATH.read_text(encoding='utf-8'))
    except Exception as e:
        print(f"[agent] Error reading agent state: {e}")
    return {}
def _save_agent_state(state: dict):
    try:
        AGENT_STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
        AGENT_STATE_PATH.write_text(json.dumps(state, ensure_ascii=False, indent=2))
    except Exception as e:
        print(f"[agent] Error writing agent state: {e}")
def _get_current_server_id() -> str | None:
    try:
        if CURRENT_SERVER_ID is not None:
            return str(CURRENT_SERVER_ID)
    except Exception:
        pass
    state = _load_agent_state()
    sid = state.get("serverId")
    return str(sid) if sid else None
def _read_actions_queue() -> list:
    try:
        if ACTIONS_QUEUE_PATH.exists():
            return json.loads(ACTIONS_QUEUE_PATH.read_text(encoding='utf-8'))
    except Exception as e:
        print(f"[agent] Error reading actions queue: {e}")
    return []
def _write_actions_queue(actions: list):
    try:
        ACTIONS_QUEUE_PATH.parent.mkdir(parents=True, exist_ok=True)
        ACTIONS_QUEUE_PATH.write_text(json.dumps(actions, ensure_ascii=False, indent=2))
    except Exception as e:
        print(f"[agent] Error writing actions queue: {e}")
def _queue_action(action: dict):
    try:
        actions = _read_actions_queue()
        action = dict(action)
        action.setdefault("queuedAt", time.time())
        actions.append(action)
        _write_actions_queue(actions)
    except Exception as e:
        print(f"[agent] Failed to queue action: {e}")
def set_account_limit_local(account: str, max_emails: int, per_seconds: int, enabled: bool = True):
    try:
        account = (account or '').strip()
        if not account:
            return
        cache = _read_limits_cache()
        accounts = cache.get('accounts') or []
        updated = False
        for entry in accounts:
            if entry.get('account', '').strip().lower() == account.lower():
                entry['maxEmails'] = int(max_emails)
                entry['perSeconds'] = int(per_seconds)
                entry['enabled'] = bool(enabled)
                updated = True
                break
        if not updated:
            accounts.append({
                "account": account,
                "maxEmails": int(max_emails),
                "perSeconds": int(per_seconds),
                "enabled": bool(enabled)
            })
        cache['accounts'] = accounts
        _write_limits_cache(cache)
    except Exception as e:
        print(f"[agent] Failed to set local account limit: {e}")
def remove_account_limit_local(account: str):
    try:
        account = (account or '').strip()
        if not account:
            return
        cache = _read_limits_cache()
        accounts = cache.get('accounts') or []
        new_accounts = [entry for entry in accounts if entry.get('account', '').strip().lower() != account.lower()]
        cache['accounts'] = new_accounts
        _write_limits_cache(cache)
    except Exception as e:
        print(f"[agent] Failed to remove local account limit: {e}")
def set_default_limit_local(max_emails: int, per_seconds: int, enabled: bool = True):
    try:
        cache = _read_limits_cache()
        cache['default'] = {
            "enabled": bool(enabled),
            "maxEmails": int(max_emails),
            "perSeconds": int(per_seconds)
        }
        _write_limits_cache(cache)
        try:
            global DEFAULT_MAX_EMAILS, DEFAULT_PER_SECONDS, DEFAULT_ENABLED
            DEFAULT_MAX_EMAILS = int(max_emails)
            DEFAULT_PER_SECONDS = int(per_seconds)
            DEFAULT_ENABLED = bool(enabled)
        except Exception:
            pass
    except Exception as e:
        print(f"[agent] Failed to set local default limit: {e}")
def fetch_config(sid: int):
    """Fetch monitoring configuration from server and persist defaults; fallback to cache offline."""
    try:
        resp = session.get(f"{BASE_URL}/api/osm/agent/config", params={"serverId": sid}, headers=DEFAULT_HEADERS, timeout=10)
        resp.raise_for_status()
        cfg = resp.json()
        cfg.setdefault("ipdbLookupEnabled", True)
        cfg.setdefault("queueIngestEnabled", True)
        cfg.setdefault("freezeReportsEnabled", True)
        cache = _read_limits_cache()
        cache["default"] = {
            "enabled": bool(cfg.get('defaultEnabled') or False),
            "maxEmails": int(cfg.get('defaultMaxEmails') or 0),
            "perSeconds": int(cfg.get('defaultPerSeconds') or 0)
        }
        _write_limits_cache(cache)
        try:
            global QUEUE_INGEST_ENABLED, FREEZE_REPORTS_ENABLED, IPDB_LOOKUP_ENABLED
            QUEUE_INGEST_ENABLED = bool(cfg.get('queueIngestEnabled', True))
            FREEZE_REPORTS_ENABLED = bool(cfg.get('freezeReportsEnabled', True))
            IPDB_LOOKUP_ENABLED = bool(cfg.get('ipdbLookupEnabled', True))
            print(f"[agent] Feature toggles: queue={QUEUE_INGEST_ENABLED} freeze={FREEZE_REPORTS_ENABLED} ipdb={IPDB_LOOKUP_ENABLED}")
        except Exception as toggle_err:
            print(f"[agent] Failed to apply feature toggles: {toggle_err}")
        return cfg
    except Exception as e:
        print(f"[agent] Error fetching config: {e}")
        d = _read_limits_cache().get("default", {})
        return {
            "mailQueuePath": "/var/spool/postfix",
            "checkInterval": 30,
            "spamThreshold": 5.0,
            "blockedDomainsJson": "[]",
            "alertThresholdsJson": "{}",
            "defaultMaxEmails": int(d.get('maxEmails') or 0),
            "defaultPerSeconds": int(d.get('perSeconds') or 0),
            "defaultEnabled": bool(d.get('enabled') or False),
            "ipdbLookupEnabled": IPDB_LOOKUP_ENABLED,
            "queueIngestEnabled": QUEUE_INGEST_ENABLED,
            "freezeReportsEnabled": FREEZE_REPORTS_ENABLED
        }
def _extract_email(text):
    try:
        m = re.search(r"([A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+)", text or '')
        return m.group(1).lower() if m else None
    except Exception:
        return None
DOMAIN_OWNER_MAP = {}
DOMAIN_OWNER_LAST_LOAD: float = 0.0
DOMAIN_OWNER_TTL_SECS = 300
def _load_domain_owner_map():
    global DOMAIN_OWNER_LAST_LOAD
    now = time.time()
    if DOMAIN_OWNER_MAP and (now - DOMAIN_OWNER_LAST_LOAD) < DOMAIN_OWNER_TTL_SECS:
        return
    try:
        DOMAIN_OWNER_MAP.clear()
        # cPanel: /etc/trueuserdomains format: domain: user
        p = Path('/etc/trueuserdomains')
        if p.exists():
            for line in p.read_text(encoding='utf-8', errors='ignore').splitlines():
                if ':' in line:
                    dom, user = line.split(':', 1)
                    dom = (dom or '').strip().lower()
                    user = (user or '').strip().lower()
                    if dom and user:
                        DOMAIN_OWNER_MAP[dom] = user
        # cPanel alternate mapping file /etc/userdomains
        p3 = Path('/etc/userdomains')
        if p3.exists():
            for line in p3.read_text(encoding='utf-8', errors='ignore').splitlines():
                if ':' in line:
                    dom, user = line.split(':', 1)
                    dom = (dom or '').strip().lower()
                    user = (user or '').strip().lower()
                    if dom and user:
                        DOMAIN_OWNER_MAP[dom] = user
        # DirectAdmin: /etc/virtual/domainowners
        p2 = Path('/etc/virtual/domainowners')
        if p2.exists():
            for line in p2.read_text(encoding='utf-8', errors='ignore').splitlines():
                if ':' in line:
                    dom, user = line.split(':', 1)
                    dom = (dom or '').strip().lower()
                    user = (user or '').strip().lower()
                    if dom and user:
                        DOMAIN_OWNER_MAP[dom] = user
        # DirectAdmin global domainowners file
        da_owners = Path('/usr/local/directadmin/data/admin/domainowners')
        if da_owners.exists():
            for line in da_owners.read_text(encoding='utf-8', errors='ignore').splitlines():
                if ':' in line:
                    dom, user = line.split(':', 1)
                    dom = (dom or '').strip().lower()
                    user = (user or '').strip().lower()
                    if dom and user:
                        DOMAIN_OWNER_MAP[dom] = user
        # DirectAdmin per-user domain lists
        da_users_root = Path('/usr/local/directadmin/data/users')
        if da_users_root.exists():
            for user_dir in da_users_root.iterdir():
                if not user_dir.is_dir():
                    continue
                user = user_dir.name.strip().lower()
                try:
                    dom_list = user_dir / 'domains.list'
                    if dom_list.exists():
                        for dom in dom_list.read_text(encoding='utf-8', errors='ignore').splitlines():
                            dom = (dom or '').strip().lower()
                            if dom:
                                DOMAIN_OWNER_MAP[dom] = user
                except Exception:
                    continue
        DOMAIN_OWNER_LAST_LOAD = now
    except Exception as e:
        print(f"[agent] Failed loading domain owners: {e}")
def _resolve_owner_from_email(email_addr):
    try:
        _load_domain_owner_map()
        email_addr = (email_addr or '').strip().lower()
        if '@' not in email_addr:
            return None
        local_part, domain = email_addr.split('@', 1)
        domain = domain.rstrip('.')
        parts = domain.split('.')
        for i in range(len(parts)-1):
            d = '.'.join(parts[i:])
            owner = DOMAIN_OWNER_MAP.get(d)
            if owner:
                return owner
        owner = DOMAIN_OWNER_MAP.get(domain)
        if owner:
            return owner
        # Fallback: inspect virtual mailbox passwd files to infer owner
        owner = _lookup_owner_from_virtual_passwd(local_part, domain)
        if owner:
            DOMAIN_OWNER_MAP[domain] = owner
            return owner
        return None
    except Exception:
        return None
def _lookup_owner_from_virtual_passwd(local_part: str, domain: str):
    """Inspect /etc/virtual/<domain>/passwd files to determine owning account."""
    try:
        local_part = (local_part or '').strip().lower()
        domain = (domain or '').strip().lower()
        if not local_part or not domain:
            return None
        domains_to_try = []
        if domain:
            domains_to_try.append(domain)
            parts = domain.split('.')
            for i in range(1, len(parts)-1):
                parent = '.'.join(parts[i:])
                if parent not in domains_to_try:
                    domains_to_try.append(parent)
        for extra in ('lists', 'default'):
            if extra not in domains_to_try:
                domains_to_try.append(extra)
        for dom in domains_to_try:
            vpath = Path('/etc/virtual') / dom / 'passwd'
            if not vpath.exists():
                continue
            try:
                for line in vpath.read_text(encoding='utf-8', errors='ignore').splitlines():
                    if not line or line.startswith('#'):
                        continue
                    parts = line.split(':')
                    if not parts:
                        continue
                    account_field = parts[0].strip().lower()
                    if account_field != local_part and account_field != f"{local_part}@{dom}":
                        continue
                    # Attempt to derive owner from home/maildir path within the line
                    for token in parts[1:]:
                        if '/home/' in token:
                            m = re.search(r'/home/([^/]+)/', token)
                            if m:
                                return m.group(1).strip().lower()
                    # Sometimes the maildir path is at the end
                    m = re.search(r'/home/([^/]+)/', line)
                    if m:
                        return m.group(1).strip().lower()
            except Exception:
                continue
    except Exception:
        pass
    return None
def _resolve_limit_account_key(account_ref: str) -> str:
    """Map an account_ref to the hosting account owner key used by limits."""
    try:
        # Try to extract email first (sender part before '(' or in the string)
        email = None
        if '(' in account_ref:
            email = account_ref.split('(', 1)[0].strip()
        if not email:
            email = _extract_email(account_ref)
        owner = _resolve_owner_from_email(email) if email else None
        if owner:
            return owner
        # Fallback: local-part or username
        if '(' in account_ref and account_ref.endswith(')'):
            inside = account_ref[account_ref.rfind('(')+1:-1]
            if inside:
                return inside.lower()
        m = re.match(r"([^@]+)@", account_ref)
        if m:
            return m.group(1).lower()
        return account_ref.strip().lower()
    except Exception:
        return account_ref.strip().lower()
def extract_username_from_account_ref(account_ref: str) -> str:
    try:
        if '(' in account_ref and account_ref.endswith(')'):
            return account_ref[account_ref.rfind('(')+1:-1].strip().lower()
        m = re.match(r"([^@]+)@", account_ref)
        if m:
            return m.group(1).strip().lower()
        return account_ref.strip().lower()
    except Exception:
        return account_ref.strip().lower()
def fetch_freeze_list(sid: int):
    try:
        resp = session.get(f"{BASE_URL}/api/osm/agent/freeze-list", params={"serverId": sid}, headers=DEFAULT_HEADERS, timeout=10)
        resp.raise_for_status()
        return resp.json() or []
    except Exception as e:
        print(f"[agent] Error fetching freeze list: {e}")
        # Return None to indicate remote unavailable so we do not wipe local hold file
        return None
def reconcile_freeze_file_with_server(sid: int, prefer_server: bool = False):
    try:
        # Grace period after local updates to avoid wiping local holds before server reflects them
        # Skip the grace period when server explicitly requests reconciliation (prefer_server=True)
        try:
            if not prefer_server and time.time() - LAST_LOCAL_HOLD_UPDATE_TS < 30:
                return
        except Exception:
            pass
        server_items = fetch_freeze_list(sid)
        if server_items is None:
            # Server unreachable; do not modify local hold file
            return
        # Build server set of entries (username or sender email)
        server_set = set()
        for it in server_items:
            if not it.get('active', True):
                continue
            account = (it.get('account') or '').strip()
            sender = (it.get('senderEmail') or '').strip()
            if account:
                server_set.add(account.lower())
            elif sender:
                server_set.add(sender.lower())
        _ensure_virtual_files()
        local_set = set()
        try:
            for line in OSM_HOLD_FILE.read_text().splitlines():
                s = (line or '').strip().lower()
                if s:
                    local_set.add(s)
        except Exception:
            pass
        if prefer_server:
            # Server-wins mode (e.g., after an app-initiated unfreeze): adopt server state and do NOT push local-only entries
            desired = server_set
            if desired != local_set:
                with OSM_HOLD_FILE.open('w') as f:
                    for u in sorted(desired):
                        f.write(u + "\n")
                _reload_exim()
                print(f"[agent] Reconciled (server-wins). Server={len(server_set)} local={len(local_set)} applied={len(desired)}")
        else:
            # Merge mode (agent-wins for offline changes): push local-only entries to server then write union locally
            to_push = sorted(list(local_set - server_set))
            for val in to_push:
                try:
                    # Try to preserve the original freeze reason if we can infer it
                    freeze_reason = _find_freeze_reason(val)
                    payload = {"serverId": str(sid), "reason": freeze_reason}
                    if '@' in val:
                        payload["senderEmail"] = val
                        payload["account"] = ""
                    else:
                        payload["senderEmail"] = ""
                        payload["account"] = val
                    session.post(f"{BASE_URL}/api/osm/agent/freeze", json=payload, headers=DEFAULT_HEADERS, timeout=10)
                except Exception:
                    # Leave in local_set; future loops will retry via pending actions or here
                    pass
            # Compute union as desired hold file content
            desired = server_set | local_set
            if desired != local_set:
                with OSM_HOLD_FILE.open('w') as f:
                    for u in sorted(desired):
                        f.write(u + "\n")
                _reload_exim()
                print(f"[agent] Reconciled (merge). Server={len(server_set)} local={len(local_set)} merged={len(desired)}")
    except Exception as e:
        print(f"[agent] Reconcile freeze list failed: {e}")
def _find_freeze_reason(identifier: str) -> str:
    """Best-effort recovery of the original freeze reason for a given account/email.
    Checks queued actions first, then the most recent local report, else returns a safe default.
    """
    try:
        ident = (identifier or '').strip().lower()
        if not ident:
            return 'Manual freeze'
        # Look into pending actions (latest first) for a matching freeze entry
        try:
            actions = _read_actions_queue()
            for action in reversed(actions):
                if (action.get('type') or '') != 'freeze':
                    continue
                s = (action.get('senderEmail') or '').strip().lower()
                a = (action.get('account') or '').strip().lower()
                if ident == s or (a and ident == a):
                    r = (action.get('reason') or '').strip()
                    if r:
                        return r
        except Exception:
            pass
        # Search most recent report that matches sender/account
        try:
            if REPORTS_DIR.exists():
                files = sorted(REPORTS_DIR.glob('*.txt'), key=lambda p: p.stat().st_mtime, reverse=True)
                for path in files:
                    parsed = _parse_report_file(path)
                    if not parsed:
                        continue
                    s = (parsed.get('senderEmail') or '').strip().lower()
                    a = (parsed.get('account') or '').strip().lower()
                    if ident == s or (a and ident == a):
                        r = (parsed.get('reason') or '').strip()
                        if r:
                            return r
        except Exception:
            pass
        return 'Manual freeze'
    except Exception:
        return 'Manual freeze'
def get_exim_log_path() -> Path:
    """Return best-known Exim main log path for this system."""
    try:
        candidates = []
        try:
            if detect_control_panel() == 'cpanel':
                candidates.append(Path('/var/log/exim_mainlog'))
        except Exception:
            pass
        candidates.extend([
            Path('/var/log/exim/mainlog'),
            Path('/var/log/exim4/mainlog'),
            Path('/var/log/exim_mainlog'),
        ])
        for p in candidates:
            try:
                if p.exists() and p.is_file():
                    return p
            except Exception:
                continue
    except Exception:
        pass
    return Path('/var/log/exim/mainlog')
def process_exim_logs(thresholds, sid=None):
    """Incrementally parse Exim mainlog and update LOG_COUNTERS."""
    log_path = get_exim_log_path()
    try:
        if not log_path.exists():
            return
        st = log_path.stat()
        state = _load_log_state(log_path)
        # Handle rotation or first run
        if state.get("ino") != st.st_ino or state.get("pos", 0) > st.st_size:
            state = {"path": str(log_path), "ino": st.st_ino, "pos": 0}
        with log_path.open('r', encoding='utf-8', errors='ignore') as f:
            f.seek(state.get("pos", 0))
            new_data = f.read()
            state["pos"] = f.tell()
        _save_log_state(state)
        now_ts = time.time()
        # Parse new lines
        for line in new_data.splitlines():
            try:
                line_ts = _parse_exim_timestamp(line)
                if now_ts - line_ts > MAX_LOG_LOOKBACK_SECS:
                    continue
                if line_ts - now_ts > 300:
                    line_ts = now_ts
                # Record subject per account where available
                if ' <= ' in line or ' =>' in line:
                    msgid = _extract_message_id_from_exim_line(line) or ''
                    subj = _get_subject_for_message_id(msgid)
                    if subj:
                        acct = _extract_account_ref_from_exim_line(line)
                        if acct:
                            LAST_SUBJECT_BY_ACCOUNT[acct] = subj
                _update_counters_from_exim_line(line, thresholds, line_ts)
            except Exception:
                continue
        # Cleanup old dedupe entries
        cutoff = time.time() - SEEN_LOG_MSG_IDS_RETENTION_SECS
        while SEEN_LOG_MSG_IDS and SEEN_LOG_MSG_IDS[0][1] < cutoff:
            mid, _ = SEEN_LOG_MSG_IDS.popleft()
            SEEN_LOG_MSG_IDS_SET.discard(mid)
        cutoff_rcpt = time.time() - SEEN_MSG_RCPT_RETENTION_SECS
        while SEEN_MSG_RCPT and SEEN_MSG_RCPT[0][1] < cutoff_rcpt:
            key, _ = SEEN_MSG_RCPT.popleft()
            SEEN_MSG_RCPT_SET.discard(key)
    except Exception as e:
        print(f"[agent] Error processing exim logs: {e}")
def _extract_message_id_from_exim_line(line):
    """Extract the Exim queue id token that appears before '<=' on submission lines."""
    try:
        # Match the token immediately before '<='
        m = re.search(r"\b([A-Za-z0-9-]{6,})\b\s+<=", line)
        if m:
            return m.group(1)
    except Exception:
        pass
    return None
def _extract_message_id_from_any_line(line):
    """Extract the Exim queue id from general log lines that start with timestamp then id."""
    try:
        m = re.search(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} ([A-Za-z0-9-]{6,})\b", line)
        if m:
            return m.group(1)
    except Exception:
        pass
    return None
def _normalize_host(host: str) -> str:
    try:
        if not host:
            return ''
        h = host.strip()
        if h.startswith('[') and h.endswith(']'):
            h = h[1:-1]
        return h
    except Exception:
        return host or ''
def _extract_client_host_ip_from_line(line: str) -> tuple[str, str]:
    host = ''
    ip = ''
    try:
        m_parent = re.search(r"\bH=\(([^)]*)\)", line)
        if m_parent:
            host = (m_parent.group(1) or '').strip()
            if host.startswith('[') and host.endswith(']'):
                host = host[1:-1]
            m_after = re.search(r"\)\s*\[([^\]]+)\]", line)
            if m_after:
                ip = (m_after.group(1) or '').strip()
        else:
            m_host = re.search(r"\bH=([^\s]+)", line)
            if m_host:
                host = (m_host.group(1) or '').strip()
                if host.startswith('[') and host.endswith(']'):
                    host = host[1:-1]
            m_ip = re.search(r"\bH=[^\n]*?\[([^\]]+)\]", line)
            if m_ip:
                ip = (m_ip.group(1) or '').strip()
        bracket_values = re.findall(r"\[([^\]]+)\]", line)
        if not host and bracket_values:
            host = bracket_values[0].strip()
        if not ip and bracket_values:
            ip = bracket_values[-1].strip()
        return _normalize_host(host), ip
    except Exception:
        return '', ''
def _resolve_client_details(msg_id: str, client_dict: dict) -> tuple[str, str]:
    try:
        host = (client_dict or {}).get('h', '') or ''
        ip = (client_dict or {}).get('ip', '') or ''
        if (not host or host == '-') or (not ip or ip == '-'):
            fallback = MSG_ID_TO_CLIENT.get(msg_id) or {}
            if not host or host == '-':
                host = fallback.get('h', host) or host
            if not ip or ip == '-':
                ip = fallback.get('ip', ip) or ip
        host = _normalize_host(host)
        ip = (ip or '').strip()
        # strip any port if accidentally present
        if ip and ':' in ip:
            ip = ip.split(':', 1)[0]
        return host, ip
    except Exception:
        return '', ''
def _client_info_from_headers(headers: str) -> tuple[str, str]:
    host = ''
    ip = ''
    try:
        if not headers:
            return host, ip
        m_ip = re.search(r"-host_address\s+\[([^\]]+)\]", headers)
        if m_ip:
            ip = (m_ip.group(1) or '').strip()
        m_host = re.search(r"--helo_name\s+([^\s]+)", headers)
        if m_host:
            host = _normalize_host(m_host.group(1))
        if not host:
            m_recv = re.search(r"Received: from ([^\s]+)", headers)
            if m_recv:
                host = _normalize_host(m_recv.group(1))
    except Exception:
        pass
    return host, ip
def _fallback_events_from_queue_detail(detail: dict, sender_email: str, owner_key: str):
    events = []
    try:
        recipients = detail.get('recipients') or []
        if not recipients:
            rec = detail.get('recipient') or ''
            if rec:
                recipients = [rec]
        recipients = [r for r in recipients if r]
        subject = detail.get('subject') or ''
        headers = detail.get('headers') or detail.get('status') or ''
        client_h, client_ip = _client_info_from_headers(headers)
        qid = detail.get('queueId') or detail.get('messageId') or ''
        ts_now = time.time()
        base_sender = sender_email or detail.get('sender') or owner_key
        if not recipients:
            recipients = ['']
        for rcpt in recipients[:50]:
            events.append({
                'ts': ts_now,
                'sender': base_sender,
                'recipient': rcpt,
                'subject': subject,
                'client': {'h': client_h, 'ip': client_ip},
                'relay': {},
                'relayType': 'unknown',
                'qid': qid,
            })
    except Exception:
        pass
    return events
def _has_recent_log_events(owner_key: str, sender_email: str, window_secs: int) -> bool:
    try:
        if not owner_key or not sender_email:
            return False
        events = ACCOUNT_EVENTS.get(owner_key)
        if not events:
            return False
        cutoff = time.time() - max(window_secs, 0)
        sender_lc = sender_email.strip().lower()
        for ev in events:
            if ev.get('ts', 0) >= cutoff and (ev.get('sender', '') or '').strip().lower() == sender_lc:
                return True
        return False
    except Exception:
        return False
def _should_hold_owner(owner_key: str, sender_email: str) -> bool:
    try:
        if not owner_key:
            return False
        events = ACCOUNT_EVENTS.get(owner_key)
        if not events:
            return False
        sender_lc = (sender_email or '').strip().lower()
        for ev in reversed(events):
            ev_sender = (ev.get('sender', '') or '').strip().lower()
            if sender_lc and ev_sender != sender_lc:
                continue
            relay_type = (ev.get('relayType') or '').lower()
            if relay_type == 'local':
                return True
            if sender_lc:
                return False
        return False
    except Exception:
        return False
def _collect_events_for_report(owner_key: str, sender_email: str, window_secs: int):
    try:
        if not owner_key:
            return []
        evs_src = list(ACCOUNT_EVENTS.get(owner_key, deque()))
        if not evs_src:
            return []
        cutoff_ts = time.time() - max(window_secs, 0)
        sender_lc = (sender_email or '').strip().lower()
        events = [
            ev for ev in evs_src
            if ev.get('ts', 0) >= cutoff_ts and (not sender_lc or (ev.get('sender', '') or '').strip().lower() == sender_lc)
        ]
        events.sort(key=lambda e: e.get('ts', 0))
        return events
    except Exception:
        return []
def _build_report_text(owner_key: str, sender_email: str, reason_text: str,
                       window_secs: int, limit_label: str, events_window: list) -> str:
    lines = []
    lines.append("Freeze Report\n")
    lines.append(f"Reason: {reason_text}\n")
    lines.append(f"Owner: {owner_key}\n")
    lines.append(f"Sender: {sender_email}\n")
    try:
        types = [(ev.get('relayType') or 'unknown') for ev in events_window]
        relay_summary = 'auth' if 'auth' in types else ('local' if 'local' in types else 'unknown')
        lines.append(f"Relay: {relay_summary}\n")
    except Exception:
        pass
    _append_client_summary(lines, events_window)
    try:
        srcs = []
        seen = set()
        for ev in events_window:
            c = ev.get('client', {}) or {}
            ch = c.get('h', '') or ''
            cip = c.get('ip', '') or ''
            label = f"{ch}[{cip}]" if cip or ch else ''
            if label and label not in seen:
                seen.add(label)
                srcs.append(label)
                if len(srcs) >= 5:
                    break
        if srcs:
            lines.append(f"Client sources: {', '.join(srcs)}\n")
    except Exception:
        pass
    limit_text = limit_label or 'Limit: n/a'
    lines.append(f"Window: last {window_secs}s; {limit_text}\n")
    lines.append(f"Events counted in window: {len(events_window)}\n")
    lines.append("\nEvents:\n")
    if events_window:
        for ev in events_window[-200:]:
            ts_iso = datetime.utcfromtimestamp(ev.get('ts', 0)).isoformat()
            rcpt = ev.get('recipient', '')
            subj = ev.get('subject', '')
            client = ev.get('client', {})
            relay = ev.get('relay', {})
            client_h, client_ip = _resolve_client_details(ev.get('qid', ''), client)
            relay_ip = relay.get('ip', '')
            relay_h = relay.get('h', '')
            rtype = (ev.get('relayType') or '').lower()
            relay_str = 'local' if rtype == 'local' else f"{relay_h}[{relay_ip}]"
            lines.append(f"- {ts_iso} to={rcpt} subj=\"{subj}\" client={client_h}[{client_ip}] relay={relay_str} qid={ev.get('qid', '')}\n")
    else:
        lines.append("- No detailed events captured for this window.\n")
    return ''.join(lines)
def _save_and_upload_report(owner_key: str, sender_email: str, reason_text: str,
                            report_text: str, sid):
    REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    now_dt = datetime.utcnow()
    now_str = now_dt.strftime('%Y%m%d-%H%M%S')
    created_at_iso = now_dt.isoformat()
    safe_sender = (sender_email or 'unknown').replace('@', '_')
    safe_owner = (owner_key or 'unknown').replace('@', '_')
    fname = f"{now_str}-{safe_owner}-{safe_sender}.txt"
    fpath = REPORTS_DIR / fname
    fpath.write_text(report_text)
    payload = {
        "serverId": str(sid) if sid is not None else str(CURRENT_SERVER_ID or ""),
        "senderEmail": sender_email,
        "account": owner_key,
        "reason": reason_text,
        "reportText": report_text,
        "reportKey": fname,
        "createdAt": created_at_iso
    }
    target_sid = payload["serverId"]
    if target_sid:
        try:
            session.post(f"{BASE_URL}/api/osm/agent/freeze-report", json=payload, headers=DEFAULT_HEADERS, timeout=10)
            return
        except Exception as e:
            print(f"[agent] Failed to upload freeze report: {e}")
            try:
                _queue_pending_report(target_sid, sender_email, owner_key or '', reason_text, report_text, fname, created_at_iso)
            except Exception:
                pass
    else:
        try:
            _queue_pending_report(target_sid, sender_email, owner_key or '', reason_text, report_text, fname, created_at_iso)
        except Exception:
            pass
def _generate_freeze_report(account_ref: str, owner_key: str, sender_email: str,
                            reason_text: str, window_secs: int, limit_label: str,
                            sid, fallback_events: Optional[list] = None):
    if not FREEZE_REPORTS_ENABLED:
        print("[agent] Freeze report generation disabled; skipping report")
        return
    try:
        events_window = _collect_events_for_report(owner_key, sender_email, window_secs)
        if not events_window and fallback_events:
            events_window = list(fallback_events)
            try:
                acct_events = ACCOUNT_EVENTS.setdefault(owner_key, deque())
                for ev in fallback_events:
                    acct_events.append(ev)
            except Exception:
                pass
        report_text = _build_report_text(owner_key, sender_email, reason_text, window_secs, limit_label, events_window)
        _save_and_upload_report(owner_key, sender_email, reason_text, report_text, sid)
    except Exception as e:
        print(f"[agent] Failed to write/upload report: {e}")
def _parse_report_file(path: Path):
    try:
        text = path.read_text(encoding='utf-8')
    except Exception as e:
        print(f"[agent] Failed reading report {path.name}: {e}")
        return None
    reason_text = ""
    owner_text = ""
    sender_text = ""
    try:
        m_reason = re.search(r"^Reason:\s*(.+)$", text, re.MULTILINE)
        if m_reason:
            reason_text = m_reason.group(1).strip()
        m_owner = re.search(r"^Owner:\s*(.+)$", text, re.MULTILINE)
        if m_owner:
            owner_text = m_owner.group(1).strip()
        m_sender = re.search(r"^Sender:\s*(.+)$", text, re.MULTILINE)
        if m_sender:
            sender_text = m_sender.group(1).strip()
    except Exception as e:
        print(f"[agent] Failed parsing metadata in {path.name}: {e}")
    try:
        created_at = datetime.utcfromtimestamp(path.stat().st_mtime).isoformat()
    except Exception:
        created_at = datetime.utcnow().isoformat()
    return {
        "reportText": text,
        "reason": reason_text,
        "account": owner_text,
        "senderEmail": sender_text,
        "createdAt": created_at,
        "reportKey": path.name,
    }
def _extract_recipient_from_arrow_line(line):
    try:
        m = re.search(r"(?:=>|->)\s+([^\s]+)", line)
        if m:
            return m.group(1)
    except Exception:
        pass
    return None
def _extract_sender_from_arrow_line(line):
    try:
        m = re.search(r"\bF=<([^>]+)>", line)
        if m:
            return m.group(1)
    except Exception:
        pass
    return None
def _extract_account_ref_from_exim_line(line):
    """Return account_ref like sender(user)."""
    try:
        # Prefer explicit 'from <sender>'
        m_from = re.search(r"from\s+<([^>\s]+@[^>\s]+)>", line)
        sender_email = m_from.group(1) if m_from else None
        if not sender_email:
            # Fallback to token after '<='
            m_after = re.search(r"<=\s*<?([^>\s]+@[^\s>]+)>?", line)
            if m_after:
                sender_email = m_after.group(1)
        if not sender_email:
            # Submission lines can also include F=<sender>
            m_f = re.search(r"\bF=<([^>]+)>", line)
            if m_f:
                candidate = (m_f.group(1) or '').strip()
                if candidate:
                    sender_email = candidate
        # Auth user: A=login:USER or A=dovecot_login:USER or courier_login
        m_auth = re.search(r"\bA=[A-Za-z0-9_-]+:([^\s]+)\b", line)
        auth_user = m_auth.group(1) if m_auth else None
        if not auth_user:
            # Local deliveries note the owner as U=username
            m_u = re.search(r"\bU=([^\s]+)\b", line)
            if m_u:
                auth_user = m_u.group(1)
        if auth_user:
            auth_user = auth_user.strip()
        # Compose account ref
        if sender_email and auth_user:
            return f"{sender_email}({auth_user})"
        if sender_email:
            return sender_email
        if auth_user:
            return auth_user
        return "Unknown"
    except Exception:
        return "Unknown"
def _update_counters_from_exim_line(line, thresholds, line_ts):
    """Parse Exim log line and update counters for tracking and per-account limits."""
    try:
        if 'diradm_user=' in line:
            msgid = _extract_message_id_from_any_line(line) or _extract_message_id_from_exim_line(line)
            if msgid:
                try:
                    m_user = re.search(r"diradm_user=([^\s]+)", line)
                    owner_user = m_user.group(1).strip().lower() if m_user else ''
                    if owner_user:
                        MSG_ID_TO_OWNER[msgid] = owner_user
                        existing_ref = MSG_ID_TO_ACCOUNT.get(msgid)
                        email_part = None
                        if existing_ref:
                            email_part = existing_ref.split('(')[0] if '(' in existing_ref else existing_ref
                            email_part = (email_part or '').strip()
                        if not email_part:
                            m_addr = re.search(r"diradm_address=([^\s]+)", line)
                            if m_addr:
                                email_part = (m_addr.group(1) or '').strip()
                        if not email_part:
                            email_part = _extract_account_ref_from_exim_line(line)
                        if email_part:
                            MSG_ID_TO_ACCOUNT[msgid] = f"{email_part}({owner_user})"
                        m_dom = re.search(r"diradm_domain=([^\s]+)", line)
                        if m_dom:
                            dom = (m_dom.group(1) or '').strip().lower().rstrip('.')
                            if dom:
                                DOMAIN_OWNER_MAP[dom] = owner_user
                        print(f"[agent] DirectAdmin owner resolved for msgid={msgid}: owner={owner_user}")
                except Exception as e:
                    print(f"[agent] DirectAdmin owner parse error: {e}")
        if 'Sender identification' in line:
            msgid = _extract_message_id_from_any_line(line)
            if msgid:
                try:
                    m_user = re.search(r"\bU=([^\s]+)", line)
                    if m_user:
                        owner_user = m_user.group(1).strip().lower()
                        if owner_user:
                            MSG_ID_TO_OWNER[msgid] = owner_user
                            existing_ref = MSG_ID_TO_ACCOUNT.get(msgid)
                            if existing_ref:
                                email_part = existing_ref.split('(')[0] if '(' in existing_ref else existing_ref
                                email_part = (email_part or '').strip()
                                if '(' not in existing_ref or not existing_ref.endswith(')') or owner_user not in existing_ref:
                                    MSG_ID_TO_ACCOUNT[msgid] = f"{email_part}({owner_user})"
                except Exception:
                    pass
            return
        # Only process lines with message submission ('<=' indicates incoming/outgoing)
        if ' <= ' not in line:
            # Also track deliveries ('=>' for first, '->' for additional recipients)
            if ' => ' in line or ' -> ' in line:
                # Delivery line format: => recipient ...
                msg_id = _extract_message_id_from_any_line(line)
                if not msg_id:
                    return
                # Only count deliveries for messages that originated locally or via auth relay
                rtype = MSG_ID_TO_RELAY_TYPE.get(msg_id, 'unknown')
                if rtype not in ('local', 'auth'):
                    return
                # Check dedupe for this (msgid, recipient) pair
                recipient = _extract_recipient_from_arrow_line(line)
                sender = _extract_sender_from_arrow_line(line) or MSG_ID_TO_ACCOUNT.get(msg_id, '')
                if not recipient:
                    return
                dedupe_key = (msg_id, recipient)
                if dedupe_key in SEEN_MSG_RCPT_SET:
                    return
                SEEN_MSG_RCPT.append((dedupe_key, line_ts))
                SEEN_MSG_RCPT_SET.add(dedupe_key)
                # Build account_ref from cached data or line
                account_ref = MSG_ID_TO_ACCOUNT.get(msg_id) or sender or 'Unknown'
                subject = MSG_ID_TO_SUBJECT.get(msg_id, '')
                # Record recipient with timestamp for this account
                try:
                    rec_map = RECIPIENTS_BY_ACCOUNT.setdefault(account_ref, {})
                    rec_map[recipient] = line_ts
                except Exception:
                    pass
                # Update per-owner event log (for freeze reports) - one event per delivery
                try:
                    owner = MSG_ID_TO_OWNER.get(msg_id) or _resolve_limit_account_key(account_ref)
                    client = MSG_ID_TO_CLIENT.get(msg_id, {})
                    relay = MSG_ID_TO_RELAY.get(msg_id, {})
                    relay_type = MSG_ID_TO_RELAY_TYPE.get(msg_id, 'unknown')
                    ev = {
                        'ts': line_ts,
                        'sender': account_ref.split('(')[0] if '(' in account_ref else _extract_email(account_ref) or account_ref,
                        'recipient': recipient,
                        'subject': subject,
                        'qid': msg_id,
                        'client': client,
                        'relay': relay,
                        'relayType': relay_type
                    }
                    dq = ACCOUNT_EVENTS.setdefault(owner, deque())
                    dq.append(ev)
                    # Limit retention
                    cutoff = now_ts - ACCOUNT_EVENTS_RETENTION_SECS
                    while dq and dq[0].get('ts', 0) < cutoff:
                        dq.popleft()
                except Exception:
                    pass
                # Count each delivery (recipient) as one email for limits
                # Fetch account limits for inline enforcement
                try:
                    limits = fetch_account_limits(None)  # Use cached limits
                    lim_by_account = {}
                    for lim in limits:
                        if not lim.get('enabled', True):
                            continue
                        acc = (lim.get('account') or '').strip().lower()
                        max_emails = int(lim.get('maxEmails', 0))
                        per_seconds = int(lim.get('perSeconds', 60))
                        if acc and max_emails > 0 and per_seconds > 0:
                            lim_by_account[acc] = (max_emails, per_seconds)
                    print(f"[agent] Processing delivery: account_ref='{account_ref}', recipient='{recipient}', limits loaded={len(lim_by_account)}")
                    _record_event(account_ref, subject, thresholds, None, lim_by_account, event_ts=line_ts)
                except Exception as e:
                    print(f"[agent] Error in delivery limit check: {e}")
                    _record_event(account_ref, subject, thresholds, None, None, event_ts=line_ts)
            return
        
        # Message submission line ('<=' indicates the message was received)
        msg_id = _extract_message_id_from_exim_line(line)
        if not msg_id or msg_id in SEEN_LOG_MSG_IDS_SET:
            return
        # Mark as seen
        SEEN_LOG_MSG_IDS.append((msg_id, line_ts))
        SEEN_LOG_MSG_IDS_SET.add(msg_id)
        # Extract account reference
        account_ref = _extract_account_ref_from_exim_line(line)
        if not account_ref or account_ref == "Unknown":
            return
        # Extract subject from line (T="...")
        subject = ''
        try:
            m_subj = re.search(r"\bT=\"([^\"]*)\"", line)
            if m_subj:
                subject = m_subj.group(1).strip()
        except Exception:
            pass
        # Cache for later delivery lines
        MSG_ID_TO_ACCOUNT[msg_id] = account_ref
        MSG_ID_TO_SUBJECT[msg_id] = subject
        # Parse client and relay info for freeze reports
        try:
            client_h, client_ip = _extract_client_host_ip_from_line(line)
            if client_h or client_ip:
                MSG_ID_TO_CLIENT[msg_id] = {'h': client_h, 'ip': client_ip}
            # Relay type: P=local, P=esmtpa (auth), etc.
            relay_type = 'unknown'
            line_lc = line.lower()
            if 'p=local' in line_lc:
                relay_type = 'local'
            elif 'a=' in line or 'p=esmtpa' in line_lc or 'p=esmtpsa' in line_lc:
                relay_type = 'auth'
            elif 'p=esmtps' in line_lc or 'p=esmtp' in line_lc or 'p=spam-scanned' in line_lc:
                relay_type = 'external'
            MSG_ID_TO_RELAY_TYPE[msg_id] = relay_type
            # Owner resolution
            sender_email = account_ref.split('(')[0] if '(' in account_ref else _extract_email(account_ref) or account_ref
            owner = _resolve_owner_from_email(sender_email) or _resolve_limit_account_key(account_ref)
            MSG_ID_TO_OWNER[msg_id] = owner
        except Exception:
            pass
        # Only count outgoing submissions (local or authenticated). Skip external/inbound.
        if MSG_ID_TO_RELAY_TYPE.get(msg_id) not in ('local', 'auth'):
            return
        # Fetch account limits for inline enforcement
        try:
            limits = fetch_account_limits(None)  # Use cached limits
            lim_by_account = {}
            for lim in limits:
                if not lim.get('enabled', True):
                    continue
                acc = (lim.get('account') or '').strip().lower()
                max_emails = int(lim.get('maxEmails', 0))
                per_seconds = int(lim.get('perSeconds', 60))
                if acc and max_emails > 0 and per_seconds > 0:
                    lim_by_account[acc] = (max_emails, per_seconds)
            _record_event(account_ref, subject, thresholds, None, lim_by_account, event_ts=line_ts)
        except Exception:
            _record_event(account_ref, subject, thresholds, None, None, event_ts=line_ts)
    except Exception as e:
        pass  # Silent fail per line to keep processing
def _account_key_for_limits(account_ref: str) -> str:
    try:
        if '(' in account_ref and account_ref.endswith(')'):
            # sender(user)
            inside = account_ref[account_ref.rfind('(')+1:-1]
            if inside:
                return inside.lower()
        # else maybe it's just an email or username; prefer local-part for email
        m = re.match(r"([^@]+)@", account_ref)
        if m:
            return m.group(1).lower()
        return account_ref.strip().lower()
    except Exception:
        return account_ref.strip().lower()
def _prune_deque(dq: deque, cutoff_ts: float):
    while dq and dq[0] < cutoff_ts:
        dq.popleft()
def _reload_exim():
    try:
        # Prefer systemctl, fallback to service
        res = subprocess.run(['systemctl', 'reload', 'exim'], capture_output=True, text=True, timeout=5)
        if res.returncode != 0:
            subprocess.run(['service', 'exim', 'reload'], capture_output=True, text=True, timeout=5)
    except Exception as e:
        print(f"[agent] Exim reload failed: {e}")
def _ensure_freeze_list():
    try:
        if not FREEZE_LIST_PATH.exists():
            # Create file with safe perms
            FREEZE_LIST_PATH.parent.mkdir(parents=True, exist_ok=True)
            FREEZE_LIST_PATH.write_text("")
            FREEZE_LIST_PATH.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
    except Exception as e:
        print(f"[agent] Could not ensure freeze list: {e}")
def _ensure_virtual_files():
    try:
        VIRTUAL_DIR.mkdir(parents=True, exist_ok=True)
        if not OSM_HOLD_FILE.exists():
            OSM_HOLD_FILE.write_text("")
            OSM_HOLD_FILE.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
        if not OSM_DISABLE_FILE.exists():
            OSM_DISABLE_FILE.write_text("")
            OSM_DISABLE_FILE.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
    except Exception as e:
        print(f"[agent] Could not ensure /etc/virtual files: {e}")
def add_sender_to_freeze_list(sender_email: str, reason: str = None):
    """Legacy compatibility wrapper. OSM now relies on /etc/virtual/osm_hold only."""
    try:
        if not sender_email:
            return
        print(f"[agent] (notice) Skipping legacy freeze list update for {sender_email}")
    except Exception:
        pass
def remove_sender_from_freeze_list(sender_email: str):
    """Legacy compatibility wrapper. Nothing to remove from legacy list anymore."""
    try:
        sender_email = (sender_email or '').strip()
        if sender_email:
            print(f"[agent] (notice) Legacy freeze list disabled; nothing to remove for {sender_email}")
    except Exception:
        pass
    return False
def add_user_to_hold_list(username, sender_email=None, reason: str = None):
    try:
        username = (username or '').strip()
        email_full = (sender_email or '').strip()
        if not username and not email_full:
            return
        _ensure_virtual_files()
        existing = set()
        try:
            for line in OSM_HOLD_FILE.read_text().splitlines():
                s = (line or '').strip()
                if s:
                    existing.add(s)
        except Exception:
            pass
        to_add = []
        if username and username not in existing:
            to_add.append(username)
        if email_full and email_full not in existing:
            to_add.append(email_full)
        if to_add:
            with OSM_HOLD_FILE.open('a') as f:
                for item in to_add:
                    f.write(item + "\n")
            _reload_exim()
            print(f"[agent] Added to OSM hold list: {', '.join(to_add)}")
            # Mark local hold/freeze update time
            try:
                global LAST_LOCAL_HOLD_UPDATE_TS
                LAST_LOCAL_HOLD_UPDATE_TS = time.time()
            except Exception:
                pass
            # Notify server immediately so reconcile will retain this entry
            notified = False
            if BASE_URL and TOKEN and CURRENT_SERVER_ID is not None:
                try:
                    session.post(
                        f"{BASE_URL}/api/osm/agent/freeze",
                        json={
                            "serverId": str(CURRENT_SERVER_ID),
                            "senderEmail": email_full,
                            "account": username,
                            "reason": (reason or "locally added by agent")
                        },
                        headers=DEFAULT_HEADERS,
                        timeout=5,
                    )
                    notified = True
                except Exception as e:
                    print(f"[agent] Failed to notify server freeze (hold): {e}")
            if not notified:
                try:
                    _queue_action({
                        "type": "freeze",
                        "senderEmail": email_full or username,
                        "account": username or "",
                        "reason": reason or "locally added by agent",
                        "timestamp": time.time(),
                    })
                except Exception:
                    pass
    except Exception as e:
        print(f"[agent] Error updating OSM hold list: {e}")
def remove_user_from_hold_list(username=None, sender_email=None):
    try:
        targets = set()
        if username:
            targets.add(username.strip().lower())
        if sender_email:
            targets.add(sender_email.strip().lower())
        if not targets:
            return False
        _ensure_virtual_files()
        changed = False
        try:
            lines = []
            for line in OSM_HOLD_FILE.read_text().splitlines():
                entry = (line or '').strip()
                if entry and entry.lower() not in targets:
                    lines.append(entry)
                else:
                    changed = True
            if changed:
                OSM_HOLD_FILE.write_text("\n".join(lines) + ("\n" if lines else ""))
                _reload_exim()
                try:
                    global LAST_LOCAL_HOLD_UPDATE_TS
                    LAST_LOCAL_HOLD_UPDATE_TS = time.time()
                except Exception:
                    pass
        except FileNotFoundError:
            changed = False
        return changed
    except Exception as e:
        print(f"[agent] Error removing from OSM hold list: {e}")
        return False
def _maybe_trigger_freeze(account_ref, threshold, dq, sid=None):
    try:
        key = (account_ref, threshold["name"], threshold["seconds"])
        events_required = int(threshold["events"])
        if len(dq) >= events_required:
            last = LAST_TRIGGERED.get(key)
            now = time.time()
            # Avoid re-triggering too often: only once per window
            if not last or (now - last) >= threshold["seconds"]:
                sender_email = account_ref.split('(')[0]
                # Derive username for OSM hold list
                username = extract_username_from_account_ref(account_ref)
                owner_key = (_resolve_owner_from_email(sender_email) or username or '').strip()
                hold_owner = _should_hold_owner(owner_key, sender_email)
                username_for_hold = owner_key if hold_owner else ""
                # Local relay (hold_owner==True) → freeze account only; otherwise freeze email only
                add_user_to_hold_list(
                    username_for_hold,
                    None if hold_owner else sender_email,
                    reason=f"Exceeded {threshold['events']} in {threshold['seconds']}s"
                )
                # Maintain legacy exim sender freeze list for generic setups
                add_sender_to_freeze_list(sender_email, reason=f"Exceeded {threshold['events']} in {threshold['seconds']}s")
                LAST_TRIGGERED[key] = now
                reason_text = f"Exceeded {threshold['events']} in {threshold['seconds']}s"
                limit_label = f"Threshold: {threshold['events']} events"
                _generate_freeze_report(account_ref, owner_key or username, sender_email, reason_text, int(threshold["seconds"]), limit_label, sid)
                print(f"[agent] Threshold exceeded for {account_ref}: {len(dq)}/{events_required} in {threshold['seconds']}s (freeze)")
    except Exception as e:
        print(f"[agent] Freeze decision error: {e}")
def _record_event(account_ref, subject, thresholds, sid=None, lim_by_account=None, event_ts=None):
    event_time = event_ts if event_ts is not None else time.time()
    # Remember last subject (best-effort)
    try:
        sub = (subject or '').strip()
        if sub:
            # Collapse newlines/tabs and trim
            sub = re.sub(r"\s+", " ", sub)[:200]
            LAST_SUBJECT_BY_ACCOUNT[account_ref] = sub
    except Exception:
        pass
    owner_key_for_limits = _resolve_limit_account_key(account_ref)
    has_account_limit = bool(lim_by_account and owner_key_for_limits in lim_by_account)
    has_default_limit = bool(DEFAULT_ENABLED and DEFAULT_MAX_EMAILS > 0 and DEFAULT_PER_SECONDS > 0)
    for t in thresholds:
        if (t.get("type") or "logline") != "logline":
            continue
        window = int(t["seconds"])
        key = (account_ref, t["name"], window)
        dq = LOG_COUNTERS[key]
        _prune_deque(dq, event_time - window)
        dq.append(event_time)
        LAST_COUNTER_SEEN_TS[key] = event_time
        if not has_account_limit and not has_default_limit:
            _maybe_trigger_freeze(account_ref, t, dq, sid)
    # Apply per-account limits from Manage
    try:
        if lim_by_account is not None:
            owner_key = owner_key_for_limits
            # Debug: print resolved owner and available limits
            if lim_by_account:
                print(f"[agent] Resolved owner_key='{owner_key}' from account_ref='{account_ref}', available limits={list(lim_by_account.keys())}")
            # 1) Account-specific limit
            if owner_key in lim_by_account:
                max_emails, per_seconds = lim_by_account[owner_key]
                dq = ACCOUNT_LIMIT_WINDOWS[owner_key]
                _prune_deque(dq, event_time - per_seconds)
                dq.append(event_time)
                # Also mirror this into tracking DB counters under a standard trigger name
                trig_key = (account_ref, f"limit_{per_seconds}", int(per_seconds))
                tdq = LOG_COUNTERS[trig_key]
                _prune_deque(tdq, event_time - per_seconds)
                tdq.append(event_time)
                if len(dq) > max_emails:
                    # Over limit: freeze sender
                    sender_email = (account_ref.split('(')[0] if '(' in account_ref else _extract_email(account_ref)) or ''
                    reason_text = f"Exceeded {max_emails} in {per_seconds}s"
                    log_evidence = _has_recent_log_events(owner_key, sender_email, per_seconds)
                    if not log_evidence:
                        print(f"[agent] Queue limit exceeded for {owner_key} but no recent exim log events; skipping account freeze.")
                    else:
                        hold_owner = _should_hold_owner(owner_key, sender_email)
                        username_for_hold = owner_key if hold_owner else ""
                        # Local relay → freeze account only; otherwise freeze email only
                        add_user_to_hold_list(
                            username_for_hold,
                            None if hold_owner else sender_email,
                            reason=reason_text
                        )
                        add_sender_to_freeze_list(sender_email, reason=reason_text)
                        limit_label = f"Limit: {max_emails}"
                        _generate_freeze_report(account_ref, owner_key, sender_email, reason_text, per_seconds, limit_label, sid)
            # 2) Server default limit if no account-specific
            elif DEFAULT_ENABLED and DEFAULT_MAX_EMAILS > 0 and DEFAULT_PER_SECONDS > 0:
                per_seconds = DEFAULT_PER_SECONDS
                max_emails = DEFAULT_MAX_EMAILS
                dq = ACCOUNT_LIMIT_WINDOWS[f"default::{owner_key}"]
                _prune_deque(dq, event_time - per_seconds)
                dq.append(event_time)
                # Mirror to tracking DB under default window
                trig_key = (account_ref, f"limit_{per_seconds}", int(per_seconds))
                tdq = LOG_COUNTERS[trig_key]
                _prune_deque(tdq, event_time - per_seconds)
                tdq.append(event_time)
                if len(dq) > max_emails:
                    sender_email = (account_ref.split('(')[0] if '(' in account_ref else _extract_email(account_ref)) or ''
                    username = extract_username_from_account_ref(account_ref)
                    reason_text = f"Exceeded default {max_emails} in {per_seconds}s"
                    log_evidence = _has_recent_log_events(owner_key, sender_email, per_seconds)
                    if not log_evidence:
                        print(f"[agent] Default limit exceeded for {owner_key} but no recent exim log events; skipping account freeze.")
                    else:
                        hold_owner = _should_hold_owner(owner_key, sender_email)
                        username_for_hold = owner_key if hold_owner else ""
                        # Local relay → freeze account only; otherwise freeze email only
                        add_user_to_hold_list(
                            username_for_hold,
                            None if hold_owner else sender_email,
                            reason=reason_text
                        )
                        add_sender_to_freeze_list(sender_email, reason=reason_text)
                        limit_label = f"Default Limit: {max_emails}"
                        _generate_freeze_report(account_ref, owner_key, sender_email, reason_text, per_seconds, limit_label, sid)
    except Exception as e:
        print(f"[agent] Error applying per-account limits: {e}")
def write_tracking_db(thresholds):
    try:
        lines = []
        now_ts = time.time()
        valid_windows = {(t["name"], int(t["seconds"])) for t in thresholds if (t.get("type") or "logline") == "logline"}
        # Build a set of keys to write: active counters and recently seen counters
        keys_to_write = set(LOG_COUNTERS.keys())
        for k, ts in list(LAST_COUNTER_SEEN_TS.items()):
            if now_ts - ts <= COUNTER_RETENTION_SECS:
                keys_to_write.add(k)
        for (account_ref, trig_name, window) in sorted(keys_to_write):
            dq = LOG_COUNTERS.get((account_ref, trig_name, window), deque())
            if not dq and (trig_name, window) not in valid_windows and not trig_name.startswith("limit_"):
                continue
            # prune before counting
            _prune_deque(dq, now_ts - window)
            count = len(dq)
            # Skip if no count and outside retention
            last_ts = LAST_COUNTER_SEEN_TS.get((account_ref, trig_name, window), 0)
            if count == 0 and (now_ts - last_ts) > COUNTER_RETENTION_SECS:
                continue
            subject = (LAST_SUBJECT_BY_ACCOUNT.get(account_ref) or '').strip()
            subject = re.sub(r"\s+", " ", subject)[:200] if subject else ''
            # Compose recipients string
            rcpts = []
            try:
                rec_map = RECIPIENTS_BY_ACCOUNT.get(account_ref) or {}
                # prune again relative to now
                for r, t in list(rec_map.items()):
                    if now_ts - t > RECIPIENT_RETENTION_SECS:
                        rec_map.pop(r, None)
                rcpts = sorted(rec_map.keys())[:50]
            except Exception:
                rcpts = []
            lines.append(f"{count}event percent")
            lines.append(f"{window}seconds")
            # Append recipients after subject using rcpt-> comma-separated
            rcpt_part = f"rcpt->{','.join(rcpts)}" if rcpts else "rcpt->"
            lines.append(f"{account_ref}->{trig_name}->logline->subject->{subject} {rcpt_part}")
        TRACKING_DB_PATH.write_text("\n".join(lines) + ("\n" if lines else ""))
    except Exception as e:
        print(f"[agent] Error writing tracking DB: {e}")
def _get_subject_for_message_id(msg_id: str) -> str:
    """Fetch Subject header for a given exim message id using exim -Mvh."""
    try:
        if not msg_id:
            return ""
        res = subprocess.run(['exim', '-Mvh', msg_id], capture_output=True, text=True, timeout=5)
        if res.returncode != 0:
            return ""
        headers = res.stdout
        m = re.search(r"^Subject:\s*(.+)$", headers, re.MULTILINE)
        if m:
            return m.group(1).strip()
    except Exception:
        pass
    return ""
def _extract_subject_from_headers_text(headers: str) -> str:
    try:
        if not headers:
            return ""
        m = re.search(r"^Subject:\s*(.+)$", headers, re.MULTILINE)
        if m:
            return m.group(1).strip()
    except Exception:
        pass
    return ""
def get_email_headers(msg_id):
    """Fetch email headers using exim -Mvh"""
    try:
        result = subprocess.run(['exim', '-Mvh', msg_id], capture_output=True, text=True, timeout=10)
        if result.returncode == 0:
            headers = result.stdout.strip()
            return headers
        else:
            return f"Error fetching headers: {result.stderr.strip()}"
    except Exception as e:
        return f"Error fetching headers: {str(e)}"
def get_email_body(msg_id):
    """Fetch email body using exim -Mvb"""
    try:
        result = subprocess.run(['exim', '-Mvb', msg_id], capture_output=True, text=True, timeout=10)
        if result.returncode == 0:
            body = result.stdout.strip()
            return body
        else:
            return f"Error fetching body: {result.stderr.strip()}"
    except Exception as e:
        return f"Error fetching body: {str(e)}"
def _queue_pending_report(server_id: str, sender_email: str, account: str, reason: str, report_text: str, report_key: str, created_at: str):
    try:
        PENDING_REPORTS_DIR.mkdir(parents=True, exist_ok=True)
        fname = f"{int(time.time())}-{uuid.uuid4().hex}.json"
        payload = {
            "serverId": server_id,
            "senderEmail": sender_email,
            "account": account,
            "reason": reason,
            "reportText": report_text,
            "reportKey": report_key,
            "createdAt": created_at,
        }
        (PENDING_REPORTS_DIR / fname).write_text(json.dumps(payload), encoding='utf-8')
        print(f"[agent] Queued freeze report for retry: {fname}")
    except Exception as e:
        print(f"[agent] Failed to queue freeze report for retry: {e}")
def _flush_pending_reports():
    if not BASE_URL or not TOKEN or CURRENT_SERVER_ID is None:
        return
    if not FREEZE_REPORTS_ENABLED:
        return
    try:
        if not PENDING_REPORTS_DIR.exists():
            return
        for path in sorted(PENDING_REPORTS_DIR.glob('*.json')):
            try:
                data = json.loads(path.read_text(encoding='utf-8'))
            except Exception as e:
                print(f"[agent] Could not read pending report {path.name}: {e}")
                continue
            if "createdAt" not in data or not data.get("createdAt"):
                data["createdAt"] = datetime.utcnow().isoformat()
            try:
                session.post(f"{BASE_URL}/api/osm/agent/freeze-report", json=data, headers=DEFAULT_HEADERS, timeout=10)
                path.unlink()
                print(f"[agent] Uploaded pending freeze report: {path.name}")
            except Exception as e:
                print(f"[agent] Failed to upload pending freeze report {path.name}: {e}")
                # Stop after first failure to avoid hammering server
                break
    except Exception as e:
        print(f"[agent] Pending report flush failed: {e}")
def _process_pending_actions():
    sid = _get_current_server_id()
    if not BASE_URL or not TOKEN or not sid:
        return
    actions = _read_actions_queue()
    if not actions:
        return
    remaining = []
    for action in actions:
        typ = action.get('type')
        try:
            if typ == 'freeze':
                payload = {
                    "serverId": sid,
                    "senderEmail": action.get('senderEmail', ''),
                    "account": action.get('account') or '',
                    "reason": action.get('reason') or 'CLI freeze'
                }
                session.post(f"{BASE_URL}/api/osm/agent/freeze", json=payload, headers=DEFAULT_HEADERS, timeout=10)
            elif typ == 'unfreeze':
                payload = {
                    "serverId": sid,
                    "senderEmail": action.get('senderEmail', ''),
                    "account": action.get('account') or ''
                }
                session.post(f"{BASE_URL}/api/osm/agent/unfreeze", json=payload, headers=DEFAULT_HEADERS, timeout=10)
            elif typ == 'set_limit':
                payload = {
                    "serverId": sid,
                    "account": action.get('account', ''),
                    "maxEmails": int(action.get('maxEmails', 0)),
                    "perSeconds": int(action.get('perSeconds', 0)),
                    "enabled": bool(action.get('enabled', True))
                }
                session.post(f"{BASE_URL}/api/osm/agent/limit", json=payload, headers=DEFAULT_HEADERS, timeout=10)
            elif typ == 'remove_limit':
                payload = {
                    "serverId": sid,
                    "account": action.get('account', '')
                }
                session.post(f"{BASE_URL}/api/osm/agent/limit-remove", json=payload, headers=DEFAULT_HEADERS, timeout=10)
            elif typ == 'set_default_limit':
                payload = {
                    "serverId": sid,
                    "maxEmails": int(action.get('maxEmails', 0)),
                    "perSeconds": int(action.get('perSeconds', 0)),
                    "enabled": bool(action.get('enabled', True))
                }
                session.post(f"{BASE_URL}/api/osm/agent/limit-default", json=payload, headers=DEFAULT_HEADERS, timeout=10)
            else:
                continue
        except Exception as e:
            print(f"[agent] Failed to sync action {typ}: {e}")
            remaining.append(action)
            continue
    _write_actions_queue(remaining)
def _append_client_summary(lines, events_window):
    try:
        ips = []
        hosts = []
        seen_ips = set()
        seen_hosts = set()
        for ev in events_window:
            client = ev.get('client', {}) or {}
            host, ip = _resolve_client_details(ev.get('qid') or '', client)
            if ip and ip not in seen_ips:
                seen_ips.add(ip)
                ips.append(ip)
            if host and host not in seen_hosts:
                seen_hosts.add(host)
                hosts.append(host)
        if ips:
            lines.append(f"Client IPs: {', '.join(ips)}\n")
        if hosts:
            lines.append(f"Client Hosts: {', '.join(hosts)}\n")
    except Exception:
        pass
def _sync_reports_with_server(force: bool = False):
    global LAST_REPORT_SYNC_TS
    if not BASE_URL or not TOKEN or CURRENT_SERVER_ID is None:
        return
    if not FREEZE_REPORTS_ENABLED:
        return
    now = time.time()
    if not force and (now - LAST_REPORT_SYNC_TS) < 60:
        return
    LAST_REPORT_SYNC_TS = now
    try:
        reports_payload = []
        if REPORTS_DIR.exists():
            for path in sorted(REPORTS_DIR.glob('*.txt')):
                parsed = _parse_report_file(path)
                if not parsed:
                    continue
                if not parsed.get('senderEmail'):
                    continue
                parsed['serverId'] = str(CURRENT_SERVER_ID)
                reports_payload.append(parsed)
        payload = {
            "serverId": str(CURRENT_SERVER_ID),
            "reports": reports_payload,
        }
        session.post(
            f"{BASE_URL}/api/osm/agent/report-sync",
            json=payload,
            headers=DEFAULT_HEADERS,
            timeout=10,
        )
    except Exception as e:
        print(f"[agent] Report sync failed: {e}")
def get_exim_queue_stats(stats):
    """Populate queue statistics using exim commands."""
    try:
        result = subprocess.run(['exim', '-bp'], capture_output=True, text=True, timeout=10)
        if result.returncode != 0:
            return stats
        lines = result.stdout.splitlines()
        details = []
        current = None
        for raw in lines:
            line = (raw or '').rstrip()
            if not line:
                continue
            stripped = line.lstrip()
            if not stripped:
                continue
            if stripped.startswith('**') or stripped.startswith('=='):
                continue
            is_header_line = False
            try:
                parts = stripped.split()
                if len(parts) < 3:
                    is_header_line = False
                else:
                    age_token = parts[0]
                    second_token = parts[1] if len(parts) > 1 else ''
                    third_token = parts[2] if len(parts) > 2 else ''
                    # very loose check: age token contains digit and second token is numeric-ish
                    size_token = second_token.upper()
                    if any(ch.isdigit() for ch in age_token) and (size_token.replace('.', '').replace('K','').replace('M','').isdigit()):
                        is_header_line = True
            except Exception:
                is_header_line = False
            if is_header_line:
                age = parts[0]
                size = parts[1]
                queue_id = parts[2]
                sender = ""
                if len(parts) >= 4:
                    sender = parts[3]
                if sender == "<=" and len(parts) >= 5:
                    sender = parts[4]
                current = {
                    "queueId": queue_id,
                    "messageId": queue_id,
                    "age": age,
                    "size": size,
                    "sender": sender,
                    "status": stripped,
                    "recipients": [],
                    "subject": "",
                    "headers": "",
                    "body": "",
                    "ownerAccount": "",
                }
                details.append(current)
                try:
                    headers = get_email_headers(queue_id)
                    if headers and not headers.startswith("Error fetching"):
                        current["headers"] = headers
                        current["subject"] = _extract_subject_from_headers_text(headers) or current["subject"]
                except Exception:
                    pass
                try:
                    subj_inline = _get_subject_for_message_id(queue_id)
                    if subj_inline:
                        current["subject"] = subj_inline
                except Exception:
                    pass
                continue
            if current is not None:
                recipient = stripped
                if recipient:
                    current["recipients"].append(recipient)
        # Optionally attach short recipient preview and body (lazy fetch only if small queue)
        for entry in details[:50]:
            try:
                if not entry.get("body"):
                    body = get_email_body(entry["queueId"])
                    if body and not body.startswith("Error fetching"):
                        # Avoid massive payloads
                        entry["body"] = body[:10000]
            except Exception:
                pass
            if entry.get("recipients"):
                entry["recipient"] = ", ".join(entry["recipients"])
            else:
                entry["recipient"] = ""
            if entry.get("headers"):
                owner = extract_account_from_detail({
                    "headers": entry["headers"],
                    "sender": entry.get("sender", ""),
                    "recipient": entry.get("recipient", "")
                })
                entry["ownerAccount"] = owner
        stats["queueDetails"] = details
        stats["totalMessages"] = len(details)
        stats["activeMessages"] = len(details)
        return stats
    except Exception as e:
        print(f"[agent] Error getting exim queue stats: {e}")
        return stats
def get_postfix_queue_stats(stats):
    """Get queue statistics using postfix commands"""
    try:
        # Get queue summary using postqueue
        result = subprocess.run(['postqueue', '-p'], capture_output=True, text=True, timeout=10)
        if result.returncode == 0:
            output = result.stdout
            
            # Parse postqueue output
            lines = output.strip().split('\n')
            for line in lines[1:]:  # Skip header line
                if line.strip() and not line.startswith('-'):
                    parts = line.split()
                    if len(parts) >= 5:
                        queue_id = parts[0]
                        size = parts[1]
                        nrcpt = parts[2]
                        time_str = parts[3]
                        sender = parts[4] if len(parts) > 4 else ""
                        
                        stats["queueDetails"].append({
                            "queueId": queue_id,
                            "size": size,
                            "recipients": nrcpt,
                            "time": time_str,
                            "sender": sender,
                            "ownerAccount": (_resolve_owner_from_email(_extract_email(sender) or '') or '')
                        })
                        
                        # Categorize messages
                        if queue_id.startswith('active'):
                            stats["activeMessages"] += 1
                        elif queue_id.startswith('deferred'):
                            stats["deferredMessages"] += 1
                        elif queue_id.startswith('hold'):
                            stats["holdMessages"] += 1
                        elif queue_id.startswith('incoming'):
                            stats["incomingMessages"] += 1
                        else:
                            stats["outgoingMessages"] += 1
                            
                        stats["totalMessages"] += 1
        
    except Exception as e:
        print(f"[agent] Error getting postfix queue stats: {e}")
    
    return stats
def get_mail_queue_stats(queue_path: str):
    """Get mail queue statistics using appropriate mail server commands"""
    stats = {
        "totalMessages": 0,
        "activeMessages": 0,
        "deferredMessages": 0,
        "holdMessages": 0,
        "incomingMessages": 0,
        "outgoingMessages": 0,
        "queueDetails": [],
        "spamScores": [],
        "blockedDomains": []
    }
    mail_server = detect_mail_server()
    try:
        if mail_server == 'exim':
            return get_exim_queue_stats(stats)
        elif mail_server == 'postfix':
            return get_postfix_queue_stats(stats)
        else:
            # Fallback: try exim totals (noop) then postfix
            s = stats
            s = get_postfix_queue_stats(s)
            return s
    except Exception:
        return stats
def get_spam_scores(queue_path: str, mail_server: str):
    """Extract spam scores from mail headers"""
    spam_scores = []
    try:
        if mail_server == 'exim':
            spam_scores = get_exim_spam_scores()
        elif mail_server == 'postfix':
            spam_scores = get_postfix_spam_scores(queue_path)
        else:
            # Try both methods
            spam_scores = get_exim_spam_scores()
            if not spam_scores:
                spam_scores = get_postfix_spam_scores(queue_path)
    except Exception as e:
        print(f"[agent] Error getting spam scores: {e}")
    
    return spam_scores
def get_exim_spam_scores():
    """Get spam scores from exim queue using exim -Mvh"""
    spam_scores = []
    try:
        # Get list of queued messages (with sudo for permission)
        result = subprocess.run(['exim', '-bp'], capture_output=True, text=True, timeout=10)
        if result.returncode == 0:
            lines = result.stdout.strip().split('\n')
            msg_ids = []
            for line in lines:
                if line.strip() and not line.startswith('-') and not line.startswith('Mail queue'):
                    parts = line.split()
                    if len(parts) >= 1:
                        msg_ids.append(parts[0])
            
            # Get headers for first few messages to check for spam scores
            for msg_id in msg_ids[:10]:  # Limit to 10 messages
                try:
                    header_result = subprocess.run(['sudo', 'exim', '-Mvh', msg_id], capture_output=True, text=True, timeout=5)
                    if header_result.returncode == 0:
                        headers = header_result.stdout
                        for line in headers.split('\n'):
                            if 'X-Spam-Score:' in line or 'X-Spam-Level:' in line or 'X-Spam-Status:' in line:
                                score_match = re.search(r'(\d+\.?\d*)', line)
                                if score_match:
                                    spam_scores.append({
                                        "messageId": msg_id,
                                        "score": float(score_match.group(1)),
                                        "header": line.strip()
                                    })
                except Exception:
                    continue
    except Exception as e:
        print(f"[agent] Error getting exim spam scores: {e}")
    
    return spam_scores
def get_postfix_spam_scores(queue_path: str):
    """Get spam scores from postfix queue files"""
    spam_scores = []
    try:
        # Look for recent messages in the queue
        queue_dirs = ['active', 'deferred', 'incoming']
        for queue_dir in queue_dirs:
            queue_dir_path = Path(queue_path) / queue_dir
            if queue_dir_path.exists():
                for msg_file in queue_dir_path.iterdir():
                    if msg_file.is_file():
                        try:
                            # Read first few lines to get headers
                            with open(msg_file, 'r', encoding='utf-8', errors='ignore') as f:
                                headers = []
                                for i, line in enumerate(f):
                                    if i > 50:  # Only read first 50 lines
                                        break
                                    headers.append(line.strip())
                                    if line.strip() == '':  # End of headers
                                        break
                                
                                # Look for spam score headers
                                for header in headers:
                                    if 'X-Spam-Score:' in header or 'X-Spam-Level:' in header:
                                        score_match = re.search(r'(\d+\.?\d*)', header)
                                        if score_match:
                                            spam_scores.append({
                                                "messageId": msg_file.name,
                                                "score": float(score_match.group(1)),
                                                "header": header
                                            })
                        except Exception:
                            continue
                        
                        if len(spam_scores) >= 20:  # Limit to 20 recent scores
                            break
    except Exception as e:
        print(f"[agent] Error getting postfix spam scores: {e}")
    
    return spam_scores
def check_blocked_domains(mail_server: str):
    """Check for blocked domains in recent mail"""
    blocked_domains = []
    try:
        if mail_server == 'exim':
            blocked_domains = check_exim_blocked_domains()
        elif mail_server == 'postfix':
            blocked_domains = check_postfix_blocked_domains()
        else:
            # Try both methods
            blocked_domains = check_exim_blocked_domains()
            blocked_domains.extend(check_postfix_blocked_domains())
    except Exception as e:
        print(f"[agent] Error checking blocked domains: {e}")
    
    return blocked_domains
def check_exim_blocked_domains():
    """Check exim logs for blocked domains"""
    blocked_domains = []
    try:
        # Check exim logs for blocked domains
        log_files = [
            '/var/log/exim/mainlog',
            '/var/log/exim/paniclog',
            '/var/log/mail.log',
            '/var/log/maillog'
        ]
        
        for log_file in log_files:
            if os.path.exists(log_file):
                try:
                    # Get recent log entries
                    result = subprocess.run(
                        ['tail', '-100', log_file], 
                        capture_output=True, text=True, timeout=5
                    )
                    if result.returncode == 0:
                        for line in result.stdout.split('\n'):
                            if any(keyword in line.lower() for keyword in ['blocked', 'reject', 'denied', 'blacklisted']):
                                # Extract domain from log line
                                domain_match = re.search(r'@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})', line)
                                if domain_match:
                                    domain = domain_match.group(1)
                                    if domain not in [bd['domain'] for bd in blocked_domains]:
                                        blocked_domains.append({
                                            "domain": domain,
                                            "timestamp": datetime.now().isoformat(),
                                            "reason": line.strip()
                                        })
                except Exception:
                    continue
    except Exception as e:
        print(f"[agent] Error checking exim blocked domains: {e}")
    
    return blocked_domains
def check_postfix_blocked_domains():
    """Check postfix logs for blocked domains"""
    blocked_domains = []
    try:
        # Check postfix logs for blocked domains
        log_files = [
            '/var/log/mail.log',
            '/var/log/postfix.log',
            '/var/log/maillog'
        ]
        
        for log_file in log_files:
            if os.path.exists(log_file):
                try:
                    # Get recent log entries
                    result = subprocess.run(
                        ['tail', '-100', log_file], 
                        capture_output=True, text=True, timeout=5
                    )
                    if result.returncode == 0:
                        for line in result.stdout.split('\n'):
                            if 'blocked' in line.lower() or 'reject' in line.lower():
                                # Extract domain from log line
                                domain_match = re.search(r'@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})', line)
                                if domain_match:
                                    domain = domain_match.group(1)
                                    if domain not in [bd['domain'] for bd in blocked_domains]:
                                        blocked_domains.append({
                                            "domain": domain,
                                            "timestamp": datetime.now().isoformat(),
                                            "reason": line.strip()
                                        })
                except Exception:
                    continue
    except Exception as e:
        print(f"[agent] Error checking postfix blocked domains: {e}")
    
    return blocked_domains
def detect_mail_server():
    """Detect which mail server (Exim/Postfix) is available on this host."""
    try:
        # Prefer explicit binaries first
        if shutil.which('exim') or Path('/usr/sbin/exim').exists():
            return 'exim'
        if shutil.which('postqueue') or shutil.which('postfix'):
            return 'postfix'
        # Fallback heuristics: check for config directories
        if Path('/etc/exim').exists() or Path('/var/log/exim').exists():
            return 'exim'
        if Path('/etc/postfix').exists():
            return 'postfix'
    except Exception:
        pass
    return 'unknown'
def detect_control_panel():
    """Detect which control panel is installed (cpanel, directadmin, or unknown)."""
    try:
        if os.path.isdir('/usr/local/cpanel'):
            return 'cpanel'
        if os.path.isdir('/usr/local/directadmin'):
            return 'directadmin'
        # Try binaries
        try:
            res = subprocess.run(['which', 'whmapi1'], capture_output=True, text=True, timeout=5)
            if res.returncode == 0:
                return 'cpanel'
        except Exception:
            pass
        return 'unknown'
    except Exception:
        return 'unknown'
def list_cpanel_accounts():
    """List all cPanel accounts."""
    accounts = []
    try:
        # Try whmapi1 listaccts
        res = subprocess.run(['whmapi1', 'listaccts'], capture_output=True, text=True, timeout=30)
        if res.returncode == 0:
            import json
            try:
                data = json.loads(res.stdout)
                accts = data.get('data', {}).get('acct', [])
                for a in accts:
                    user = a.get('user', '')
                    domain = a.get('domain', '')
                    if user:
                        accounts.append({'username': user, 'domain': domain})
            except Exception:
                pass
        # Fallback: parse /etc/trueuserdomains
        if not accounts:
            p = Path('/etc/trueuserdomains')
            if p.exists():
                for line in p.read_text(encoding='utf-8', errors='ignore').splitlines():
                    if ':' in line:
                        parts = line.split(':', 1)
                        if len(parts) == 2:
                            domain = parts[0].strip()
                            user = parts[1].strip()
                            if user and domain:
                                accounts.append({'username': user, 'domain': domain})
    except Exception as e:
        print(f"[agent] Error listing cPanel accounts: {e}")
    return accounts
def list_directadmin_accounts():
    """List all DirectAdmin accounts."""
    accounts = []
    try:
        users_dir = Path('/usr/local/directadmin/data/users')
        if users_dir.exists() and users_dir.is_dir():
            for user_dir in users_dir.iterdir():
                if user_dir.is_dir():
                    username = user_dir.name
                    # Try to get primary domain
                    domain = ''
                    try:
                        domain_file = user_dir / 'domains.list'
                        if domain_file.exists():
                            domains = domain_file.read_text(encoding='utf-8', errors='ignore').splitlines()
                            if domains:
                                domain = domains[0].strip()
                    except Exception:
                        pass
                    accounts.append({'username': username, 'domain': domain})
    except Exception as e:
        print(f"[agent] Error listing DirectAdmin accounts: {e}")
    return accounts
def send_queue_snapshot(sid: int, stats: dict):
    """Send queue snapshot to server"""
    if not QUEUE_INGEST_ENABLED:
        print(f"[agent] Queue ingestion disabled; not sending snapshot")
        return
    try:
        # Debug: print what we're sending
        print(f"[agent] Sending snapshot with {len(stats['queueDetails'])} queue details")
        if stats["queueDetails"]:
            first_detail = stats["queueDetails"][0]
            print(f"[agent] First detail: queueId={first_detail.get('queueId')}, headers_len={len(first_detail.get('headers', ''))}, body_len={len(first_detail.get('body', ''))}")
        
        # Test JSON serialization first
        try:
            queue_details_json = json.dumps(stats["queueDetails"])
            print(f"[agent] JSON serialization successful, size: {len(queue_details_json)} chars")
        except Exception as json_error:
            print(f"[agent] JSON serialization error: {json_error}")
            # Try with limited data
            limited_details = []
            for detail in stats["queueDetails"][:5]:  # Only first 5 messages
                limited_detail = {
                    "queueId": detail.get("queueId", ""),
                    "size": detail.get("size", ""),
                    "recipients": detail.get("recipients", ""),
                    "time": detail.get("time", ""),
                    "sender": detail.get("sender", ""),
                    "recipient": detail.get("recipient", ""),
                    "headers": detail.get("headers", "")[:1000] + "..." if len(detail.get("headers", "")) > 1000 else detail.get("headers", ""),
                    "body": detail.get("body", "")[:1000] + "..." if len(detail.get("body", "")) > 1000 else detail.get("body", "")
                }
                limited_details.append(limited_detail)
            queue_details_json = json.dumps(limited_details)
            print(f"[agent] Using limited data, size: {len(queue_details_json)} chars")
        
        payload = {
            "serverId": str(sid),
            "totalMessages": stats["totalMessages"],
            "activeMessages": stats["activeMessages"],
            "deferredMessages": stats["deferredMessages"],
            "holdMessages": stats["holdMessages"],
            "incomingMessages": stats["incomingMessages"],
            "outgoingMessages": stats["outgoingMessages"],
            "queueDetailsJson": queue_details_json,
            "spamScoreJson": json.dumps(stats["spamScores"]),
            "blockedDomainsJson": json.dumps(stats["blockedDomains"])
        }
        
        print(f"[agent] Sending payload with total size: {len(json.dumps(payload))} chars")
        resp = session.post(f"{BASE_URL}/api/osm/agent/queue-snapshot", json=payload, headers=DEFAULT_HEADERS, timeout=30)
        resp.raise_for_status()
        print(f"[agent] Queue snapshot sent successfully")
        
    except Exception as e:
        print(f"[agent] Error sending queue snapshot: {e}")
        import traceback
        print(f"[agent] Traceback: {traceback.format_exc()}")
def send_accounts_snapshot(sid: int):
    """Detect panel and send accounts list to server."""
    try:
        panel = 'unknown'
        accounts = []
        # simple detection without shutil to avoid import if not present
        if os.path.isdir('/usr/local/cpanel'):
            panel = 'cpanel'
            accounts = list_cpanel_accounts()
        elif os.path.isdir('/usr/local/directadmin'):
            panel = 'directadmin'
            accounts = list_directadmin_accounts()
        else:
            # Try binaries
            try:
                res = subprocess.run(['which', 'whmapi1'], capture_output=True, text=True, timeout=5)
                if res.returncode == 0:
                    panel = 'cpanel'
                    accounts = list_cpanel_accounts()
            except Exception:
                pass
            if panel == 'unknown' and Path('/usr/local/directadmin').exists():
                panel = 'directadmin'
                accounts = list_directadmin_accounts()
        payload = {
            "serverId": str(sid),
            "panelType": panel,
            "accountsJson": json.dumps(accounts)
        }
        print(f"[agent] Sending accounts snapshot: panel={panel}, count={len(accounts)}")
        resp = session.post(f"{BASE_URL}/api/osm/agent/accounts", json=payload, headers=DEFAULT_HEADERS, timeout=15)
        resp.raise_for_status()
        print(f"[agent] Accounts snapshot sent successfully (panel={panel}, count={len(accounts)})")
    except Exception as e:
        print(f"[agent] Error sending accounts snapshot: {e}")
def fetch_account_limits(sid=None):
    # If offline or no server id, use cached limits only
    if sid is None or not BASE_URL or not TOKEN:
        cache = _read_limits_cache()
        return cache.get("accounts", [])
    try:
        resp = session.get(f"{BASE_URL}/api/osm/agent/account-limits", params={"serverId": sid}, headers=DEFAULT_HEADERS, timeout=10)
        resp.raise_for_status()
        data = resp.json() or []
        cache = _read_limits_cache()
        cache["accounts"] = [
            {
                "account": str(x.get("account") or "").strip(),
                "maxEmails": int(x.get("maxEmails") or 0),
                "perSeconds": int(x.get("perSeconds") or 0),
                "enabled": bool(x.get("enabled", True))
            }
            for x in data
        ]
        _write_limits_cache(cache)
        return data
    except Exception as e:
        print(f"[agent] Error fetching account limits: {e}")
        cache = _read_limits_cache()
        return cache.get("accounts", [])
def parse_age_to_seconds(age_str: str) -> int:
    try:
        s = age_str.strip()
        if not s:
            return 0
        unit = s[-1]
        val = int(''.join(ch for ch in s if ch.isdigit())) if any(ch.isdigit() for ch in s) else 0
        if unit == 's':
            return val
        if unit == 'm':
            return val * 60
        if unit == 'h':
            return val * 3600
        if unit == 'd':
            return val * 86400
        return val
    except Exception:
        return 0
def extract_account_from_detail(detail: dict) -> str:
    # Try headers for -ident or -auth_sender then fallback to recipient/sender
    try:
        headers = detail.get('headers') or ''
        if headers:
            m = re.search(r"-ident\s+(\w+)", headers)
            if m:
                return m.group(1)
            m = re.search(r"-auth_sender\s+([^@\s]+)@", headers)
            if m:
                return m.group(1)
            m = re.search(r"Received: from (\w+) by", headers)
            if m:
                return m.group(1)
        recip = (detail.get('recipient') or '').strip()
        if recip.startswith('(') and recip.endswith(')') and len(recip) > 2:
            return recip[1:-1]
        sender = (detail.get('sender') or '')
        m = re.search(r"<([^@>]+)@", sender)
        if m:
            return m.group(1)
    except Exception:
        pass
    return 'Unknown'
def freeze_message_exim(queue_id: str):
    try:
        subprocess.run(['exim', '-Mf', queue_id], capture_output=True, text=True, timeout=5)
    except Exception as e:
        print(f"[agent] exim freeze failed for {queue_id}: {e}")
def hold_message_postfix(queue_id: str):
    try:
        subprocess.run(['postsuper', '-h', queue_id], capture_output=True, text=True, timeout=5)
    except Exception as e:
        print(f"[agent] postfix hold failed for {queue_id}: {e}")
def enforce_account_limits(sid: int, stats: dict):
    # Live rolling window using first-seen timestamps per message
    # State containers (persist across calls via attributes on function)
    if not hasattr(enforce_account_limits, "seen_ids"):
        enforce_account_limits.seen_ids = set()
    if not hasattr(enforce_account_limits, "account_windows"):
        enforce_account_limits.account_windows = defaultdict(deque)  # account -> deque[timestamps]
    limits = fetch_account_limits(sid)
    if not limits:
        return
    now_ts = time.time()
    details = stats.get('queueDetails') or []
    mail_server = detect_mail_server()
    # Index limits for quick lookup
    lim_by_account = {}
    for lim in limits:
        try:
            if not lim.get('enabled', True):
                continue
            account = (lim.get('account') or '').strip().lower()
            max_emails = int(lim.get('maxEmails', 0))
            per_seconds = int(lim.get('perSeconds', 60))
            if account and max_emails > 0 and per_seconds > 0:
                lim_by_account[account] = (max_emails, per_seconds)
        except Exception:
            continue
    if not lim_by_account:
        return
    for d in details:
        log_evidence = False
        try:
            qid = d.get('queueId') or d.get('messageId')
            if not qid or qid in enforce_account_limits.seen_ids:
                continue
            account = extract_account_from_detail(d)
            # Resolve owner from sender email (prefer) else map account_ref to owner key
            sender_email = _extract_email(d.get('sender') or '') or _extract_email(account) or ''
            owner_key = _resolve_owner_from_email(sender_email) or _resolve_limit_account_key(account)
            if owner_key not in lim_by_account:
                continue
            # Mark first seen and evaluate window
            enforce_account_limits.seen_ids.add(qid)
            max_emails, per_seconds = lim_by_account[owner_key]
            dq = enforce_account_limits.account_windows[owner_key]
            # prune old timestamps
            cutoff = now_ts - per_seconds
            while dq and dq[0] < cutoff:
                dq.popleft()
            dq.append(now_ts)
            if len(dq) > max_emails:
                # Over limit: act on this message immediately
                print(f"[agent] Limit exceeded for {owner_key}: {len(dq)}/{max_emails} in {per_seconds}s. Freezing/holding {qid}.")
                if mail_server == 'exim':
                    freeze_message_exim(qid)
                elif mail_server == 'postfix':
                    hold_message_postfix(qid)
                # Also add to hold lists for sustained blocking
                reason_text = f"Exceeded {max_emails} in {per_seconds}s (count={len(dq)})"
                log_evidence = _has_recent_log_events(owner_key, sender_email, per_seconds)
                if not log_evidence:
                    print(f"[agent] Queue enforcement detected over-limit for {owner_key} but no recent exim log activity; skipping persistent freeze.")
                else:
                    hold_owner = _should_hold_owner(owner_key, sender_email)
                    username_for_hold = owner_key if hold_owner else ""
                    try:
                        # Local relay → freeze account only; otherwise freeze email only
                        add_user_to_hold_list(
                            username_for_hold,
                            None if hold_owner else sender_email,
                            reason=reason_text
                        )
                        add_sender_to_freeze_list(sender_email, reason=reason_text)
                    except Exception as e:
                        print(f"[agent] Failed to update hold/freeze lists: {e}")
                    # Inform server so reconcile won't remove our local entry
                    fallback_events = _fallback_events_from_queue_detail(d, sender_email, owner_key)
                    limit_label = f"Limit: {max_emails}"
                    _generate_freeze_report(account, owner_key, sender_email, reason_text, per_seconds, limit_label, sid, fallback_events if fallback_events else None)
            else:
                log_evidence = False
        except Exception as e:
            print(f"[agent] Error enforcing per-message limit: {e}")
            
        except Exception:
            pass
def check_spam_thresholds(sid: int, stats: dict, config: dict):
    """Check if spam thresholds are exceeded and send alerts"""
    try:
        spam_threshold = config.get("spamThreshold", 5.0)
        high_spam_count = 0
        
        for spam_score in stats["spamScores"]:
            if spam_score["score"] >= spam_threshold:
                high_spam_count += 1
        
        if high_spam_count > 0:
            alert_payload = {
                "serverId": str(sid),
                "alertType": "HIGH_SPAM_SCORE",
                "severity": "WARNING" if high_spam_count < 10 else "CRITICAL",
                "message": f"Found {high_spam_count} messages with spam score >= {spam_threshold}",
                "detailsJson": json.dumps({
                    "highSpamCount": high_spam_count,
                    "threshold": spam_threshold,
                    "samples": stats["spamScores"][:5]  # First 5 samples
                })
            }
            
            resp = session.post(f"{BASE_URL}/api/osm/agent/spam-alert", json=alert_payload, headers=DEFAULT_HEADERS, timeout=10)
            resp.raise_for_status()
            
    except Exception as e:
        print(f"[agent] Error checking spam thresholds: {e}")
def _execute_queue_action_delete(ids):
    try:
        ok = 0
        for qid in ids:
            if not qid:
                continue
            try:
                res = subprocess.run(['exim', '-Mrm', qid], capture_output=True, text=True, timeout=10)
                if res.returncode == 0:
                    ok += 1
                else:
                    print(f"[agent] exim -Mrm {qid} failed: rc={res.returncode} stdout={res.stdout.strip()} stderr={res.stderr.strip()}")
            except Exception as e:
                print(f"[agent] delete_queue exception for {qid}: {e}")
                continue
        return ok
    except Exception as e:
        print(f"[agent] delete_queue failed: {e}")
        return 0
def _execute_queue_action_thaw(ids):
    try:
        ok = 0
        for qid in ids:
            if not qid:
                continue
            try:
                res = subprocess.run(['exim', '-Mt', qid], capture_output=True, text=True, timeout=10)
                if res.returncode == 0:
                    ok += 1
            except Exception:
                continue
        return ok
    except Exception as e:
        print(f"[agent] thaw_queue failed: {e}")
        return 0
def _execute_queue_action_release(ids):
    """Release a frozen message: thaw and then schedule delivery."""
    try:
        ok = 0
        for qid in ids:
            if not qid:
                continue
            try:
                # Thaw first
                subprocess.run(['exim', '-Mt', qid], capture_output=True, text=True, timeout=10)
                # Then attempt immediate delivery
                res = subprocess.run(['exim', '-M', qid], capture_output=True, text=True, timeout=15)
                if res.returncode == 0:
                    ok += 1
            except Exception:
                continue
        return ok
    except Exception as e:
        print(f"[agent] release_queue failed: {e}")
        return 0
def process_sse_event(event_data: str, sid: int):
    """Process a single SSE event containing a command.
    Note: Only events that include a non-empty 'type' are considered commands.
    """
    try:
        cmd = json.loads(event_data)
        cid = cmd.get('id') or ''
        ctype = (cmd.get('type') or '').strip().lower()
        payload_json = cmd.get('payloadJson') or '{}'
        if not ctype:
            # Ignore non-command SSE payloads (e.g., queue-update snapshots)
            return
        print(f"[agent] SSE event received: id={cid} type={ctype}")
        result_msg = ''
        try:
            if isinstance(payload_json, (dict, list)):
                payload = payload_json
            else:
                payload = json.loads(payload_json)
        except Exception:
            payload = {}
        ids = payload.get('queueIds') or []
        if ctype == 'delete_queue':
            n = _execute_queue_action_delete(ids)
            result_msg = f"deleted {n}/{len(ids)}"
            print(f"[agent] delete_queue: {result_msg}")
        elif ctype == 'thaw_queue':
            n = _execute_queue_action_thaw(ids)
            result_msg = f"thawed {n}/{len(ids)}"
            print(f"[agent] thaw_queue: {result_msg}")
        elif ctype == 'release_queue':
            n = _execute_queue_action_release(ids)
            result_msg = f"released {n}/{len(ids)}"
            print(f"[agent] release_queue: {result_msg}")
        elif ctype == 'snapshot_queue':
            # Just send a fresh snapshot
            try:
                cfg = fetch_config(sid) or {}
                if not QUEUE_INGEST_ENABLED or not cfg.get("queueIngestEnabled", True):
                    result_msg = "snapshot skipped (queue ingest disabled)"
                    print("[agent] snapshot_queue: skipped, queue ingest disabled")
                else:
                    stats = get_mail_queue_stats(cfg.get("mailQueuePath", "/var/spool/postfix"))
                    send_queue_snapshot(sid, stats)
                    result_msg = "snapshot sent"
                    print("[agent] snapshot_queue: snapshot sent")
            except Exception as e:
                result_msg = f"snapshot failed: {e}"
                print(f"[agent] snapshot_queue failed: {e}")
        elif ctype == 'verify_email':
            try:
                payload = json.loads(payload_json)
            except Exception:
                payload = {}
            email = (payload.get('email') or '').strip().lower()
            exists = False
            owner = ''
            def _directadmin_mailbox_exists(addr: str) -> bool:
                try:
                    if '@' not in addr:
                        return False
                    localpart, domain = addr.split('@', 1)
                    domain = domain.strip().lower().rstrip('.')
                    localpart = localpart.strip().lower()
                    passwd_path = Path(f"/etc/virtual/{domain}/passwd")
                    if not passwd_path.exists():
                        return False
                    try:
                        for line in passwd_path.read_text(encoding='utf-8', errors='ignore').splitlines():
                            # DirectAdmin passwd format: localpart:password_hash:...
                            if not line or ':' not in line:
                                continue
                            lp = line.split(':', 1)[0].strip().lower()
                            if lp == localpart:
                                return True
                    except Exception:
                        return False
                    return False
                except Exception:
                    return False
            try:
                # Resolve owner by domain mapping
                owner = _resolve_owner_from_email(email) or ''
                # Check if any queued or recent message references this sender
                qstats = get_mail_queue_stats('/var/spool/postfix')
                for d in qstats.get('queueDetails', []):
                    s = (d.get('sender') or '').lower()
                    if email and email in s:
                        exists = True
                        break
                # If not in queue, check mailbox existence on DirectAdmin
                if not exists:
                    if detect_control_panel() == 'directadmin':
                        if _directadmin_mailbox_exists(email):
                            exists = True
            except Exception:
                pass
            result_msg = f"exists={'true' if exists else 'false'};owner={owner}"
        elif ctype == 'update_limits_cache':
            try:
                payload = json.loads(payload_json)
            except Exception:
                payload = {}
            # Validate and write to cache
            d = payload.get('default') or {}
            accs = payload.get('accounts') or []
            if isinstance(accs, list):
                clean_accs = []
                for x in accs:
                    try:
                        clean_accs.append({
                            'account': str((x.get('account') if isinstance(x, dict) else '') or '').strip(),
                            'maxEmails': int((x.get('maxEmails') if isinstance(x, dict) else 0) or 0),
                            'perSeconds': int((x.get('perSeconds') if isinstance(x, dict) else 0) or 0),
                            'enabled': bool((x.get('enabled') if isinstance(x, dict) else True))
                        })
                    except Exception:
                        continue
            else:
                clean_accs = []
            cache = {
                'default': {
                    'enabled': bool(d.get('enabled') or False),
                    'maxEmails': int(d.get('maxEmails') or 0),
                    'perSeconds': int(d.get('perSeconds') or 0)
                },
                'accounts': clean_accs
            }
            _write_limits_cache(cache)
            # Apply defaults immediately in-memory
            try:
                global DEFAULT_MAX_EMAILS, DEFAULT_PER_SECONDS, DEFAULT_ENABLED
                DEFAULT_MAX_EMAILS = cache['default']['maxEmails']
                DEFAULT_PER_SECONDS = cache['default']['perSeconds']
                DEFAULT_ENABLED = cache['default']['enabled']
            except Exception:
                pass
            result_msg = 'limits updated'
            print("[agent] update_limits_cache: limits updated")
        elif ctype == 'reconcile_freeze':
            try:
                # Server-initiated reconcile: prefer the server state to avoid re-adding a user just un-frozen from the app
                reconcile_freeze_file_with_server(sid, prefer_server=True)
                result_msg = 'reconciled (server-wins)'
            except Exception as e:
                result_msg = f'reconcile failed: {e}'
            print(f"[agent] reconcile_freeze: {result_msg}")
        elif ctype == 'self_update':
            # Perform self-update in a detached shell so we can ack immediately
            try:
                print("[agent] self_update: scheduling self-update...")
                update_sh = f"""#!/usr/bin/env bash
set -euo pipefail
APP_DIR=/opt/osm
TMP_ZIP=$(mktemp)
echo "[agent] Starting self-update from {BASE_URL}"
curl -fsSL "{BASE_URL}/osm/agent/download" -o "$TMP_ZIP" || {{ echo "Download failed"; exit 1; }}
echo "[agent] Downloaded agent package"
mkdir -p "$APP_DIR"
unzip -o "$TMP_ZIP" -d "$APP_DIR" || {{ echo "Unzip failed"; exit 1; }}
rm -f "$TMP_ZIP"
echo "[agent] Extracted to $APP_DIR"
# Set up Python environment
if [ ! -x "$APP_DIR/venv/bin/python" ]; then
  echo "[agent] Creating Python virtual environment"
  if [ -x "/opt/osm/py310/bin/python3.10" ]; then PY_BIN="/opt/osm/py310/bin/python3.10";
  elif command -v python3.10 >/dev/null 2>&1; then PY_BIN=$(command -v python3.10);
  elif [ -x /usr/bin/python3.10 ]; then PY_BIN=/usr/bin/python3.10;
  elif [ -x /usr/local/bin/python3.10 ]; then PY_BIN=/usr/local/bin/python3.10;
  else PY_BIN=$(command -v python3 || echo /usr/bin/python3); fi
  "$PY_BIN" -m venv "$APP_DIR/venv" || {{ echo "Virtual environment creation failed"; exit 1; }}
fi
echo "[agent] Updating Python packages"
"$APP_DIR/venv/bin/pip" install --upgrade pip || true
"$APP_DIR/venv/bin/pip" install --upgrade requests psutil || echo "Package update warning"
echo "[agent] Setting permissions"
chmod +x "$APP_DIR/uninstall-agent.sh" 2>/dev/null || true
chmod +x "$APP_DIR/osm" 2>/dev/null || true
ln -sf "$APP_DIR/osm" /usr/local/bin/osm 2>/dev/null || true
echo "[agent] Restarting agent service"
systemctl restart osm 2>/dev/null || {{
  echo "[agent] Systemd restart failed, trying direct restart"
  pkill -f "python.*main.py" 2>/dev/null || true
  sleep 2
  cd "$APP_DIR" && nohup "$APP_DIR/venv/bin/python" main.py >/dev/null 2>&1 &
}}
echo "[agent] Self-update completed successfully"
"""
                # Write and run
                upd = Path('/tmp/osm_self_update.sh')
                upd.write_text(update_sh)
                os.chmod(str(upd), 0o755)
                # Run in background with proper logging
                subprocess.Popen(['nohup', 'bash', str(upd), '&>', '/tmp/osm_update.log', '&'],
                               stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                result_msg = 'update scheduled'
                print(f"[agent] Self-update scheduled, check /tmp/osm_update.log for details")
            except Exception as e:
                result_msg = f'update failed: {e}'
                print(f"[agent] Self-update error: {e}")
        elif ctype == 'uninstall':
            # Execute uninstall script
            try:
                uninstall_script = Path(AGENT_DIR) / 'uninstall-agent.sh'
                if uninstall_script.exists():
                    # Make sure it's executable
                    os.chmod(str(uninstall_script), 0o755)
                    # Run it in background so we can ack first
                    subprocess.Popen(['nohup', 'bash', str(uninstall_script)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                    result_msg = 'uninstall started'
                else:
                    result_msg = 'uninstall script not found'
            except Exception as e:
                result_msg = f'uninstall failed: {e}'
        if cid:
            try:
                session.post(f"{BASE_URL}/api/osm/agent/command-ack", json={
                    "serverId": str(sid),
                    "commandId": cid,
                    "resultMessage": result_msg
                }, headers=DEFAULT_HEADERS, timeout=10)
                print(f"[agent] Acked command id={cid} result='{result_msg}'")
                # Send a fresh queue snapshot so UI can update immediately
                if QUEUE_INGEST_ENABLED:
                    try:
                        cfg = fetch_config(sid) or {}
                        if cfg.get("queueIngestEnabled", True):
                            stats = get_mail_queue_stats(cfg.get("mailQueuePath", "/var/spool/postfix"))
                            send_queue_snapshot(sid, stats)
                    except Exception as e:
                        print(f"[agent] Failed to send immediate queue snapshot: {e}")
            except Exception:
                pass
    except Exception as e:
        print(f"[agent] Error processing SSE event: {e}")
def listen_for_commands_sse(sid: int):
    """Listen for real-time commands via Server-Sent Events.
    Parses SSE frames and only processes 'command' events.
    """
    print(f"[agent] Starting SSE command listener for server {sid}")
    while True:
        try:
            url = f"{BASE_URL}/api/osm/agent/queue-stream/{sid}"
            sse_headers = dict(DEFAULT_HEADERS)
            sse_headers.setdefault('Accept', 'text/event-stream')
            response = session.get(url, headers=sse_headers, stream=True, timeout=None)
            response.raise_for_status()
            print(f"[agent] Connected to SSE stream at {url}")
            event_name = None
            data_lines = []
            for raw in response.iter_lines(decode_unicode=True):
                if raw is None:
                    continue
                line = raw.rstrip('\r')
                if not line:
                    # dispatch accumulated event
                    if data_lines:
                        data_payload = '\n'.join(data_lines)
                        if (event_name or '').strip().lower() in ('', 'command'):
                            try:
                                process_sse_event(data_payload, sid)
                            except Exception as e:
                                print(f"[agent] SSE event dispatch error: {e}")
                    event_name = None
                    data_lines = []
                    continue
                # field parsing
                if line.startswith('event:'):
                    event_name = line[6:].strip()
                elif line.startswith('data:'):
                    # tolerate optional leading space
                    val = line[5:]
                    if val.startswith(' '):
                        val = val[1:]
                    data_lines.append(val)
                else:
                    # ignore other fields (id:, retry:, etc)
                    pass
        except Exception as e:
            print(f"[agent] SSE connection error: {e}")
            time.sleep(5)  # Wait before reconnecting
SSE_LISTENER_THREAD = None
def start_sse_listener_thread(sid: int):
    """Start the SSE listener thread for real-time command delivery."""
    global SSE_LISTENER_THREAD
    if SSE_LISTENER_THREAD and SSE_LISTENER_THREAD.is_alive():
        return
    SSE_LISTENER_THREAD = threading.Thread(target=listen_for_commands_sse, args=(sid,), daemon=True)
    SSE_LISTENER_THREAD.start()
    print(f"[agent] SSE listener thread started for server {sid}")
# ---------- CLI Helpers ----------
def _cli_build_empty_stats():
    return {
        "totalMessages": 0,
        "activeMessages": 0,
        "deferredMessages": 0,
        "holdMessages": 0,
        "incomingMessages": 0,
        "outgoingMessages": 0,
        "queueDetails": [],
        "spamScores": [],
        "blockedDomains": []
    }
def _cli_load_queue_matches(sender_email: str | None = None, account: str | None = None):
    stats = get_exim_queue_stats(_cli_build_empty_stats())
    matches = []
    target_email = (sender_email or '').strip().lower()
    target_account = (account or '').strip().lower()
    for detail in stats.get("queueDetails", []):
        sender = (detail.get("sender") or "").lower()
        recipients = [r.lower() for r in detail.get("recipients") or []]
        owner = (detail.get("ownerAccount") or "").lower()
        if target_email:
            if target_email == sender or any(target_email == r for r in recipients):
                matches.append(detail)
        elif target_account:
            if owner == target_account or target_account in sender:
                matches.append(detail)
    return matches
def _cli_print_queue_summary(matches):
    if not matches:
        print("No queued messages found.")
        return
    print(f"Found {len(matches)} queued message(s):")
    for item in matches:
        qid = item.get("queueId") or item.get("messageId")
        subject = item.get("subject") or ""
        sender = item.get("sender") or ""
        recipient = ", ".join(item.get("recipients") or [])
        print(f"  - {qid}: from {sender} to {recipient} | {subject}")
def _cli_handle_queue_action(matches, choice):
    qids = [item.get("queueId") or item.get("messageId") for item in matches if item.get("queueId") or item.get("messageId")]
    qids = [qid for qid in qids if qid]
    if not qids:
        return None, []
    if choice == "release":
        released = _execute_queue_action_release(qids)
        print(f"Released {released}/{len(qids)} message(s) from queue.")
    elif choice == "delete":
        deleted = _execute_queue_action_delete(qids)
        print(f"Deleted {deleted}/{len(qids)} message(s) from queue.")
    return choice if choice in {"release", "delete"} else None, qids
def _cli_command_freeze(args):
    target_email = args.email
    target_account = args.account
    if not target_email and not target_account:
        print("Specify --email or --account to freeze.")
        return 1
    reason = args.reason or "CLI freeze"
    resolved_account = target_account or ""
    sender_for_record = target_email or target_account or ""
    if target_email:
        add_user_to_hold_list("", target_email, reason=reason)
        add_sender_to_freeze_list(target_email, reason=reason)
        print(f"Frozen email address: {target_email}")
    if target_account:
        add_user_to_hold_list(target_account, reason=reason)
        print(f"Frozen account: {target_account}")
    _queue_action({
        "type": "freeze",
        "senderEmail": sender_for_record,
        "account": resolved_account,
        "reason": reason,
        "timestamp": time.time(),
    })
    return 0
def _cli_command_unfreeze(args):
    target_email = args.email
    target_account = args.account
    if not target_email and not target_account:
        print("Specify --email or --account to unfreeze.")
        return 1
    matches = _cli_load_queue_matches(sender_email=target_email, account=target_account)
    _cli_print_queue_summary(matches)
    queue_choice = None
    if matches:
        if args.release and args.delete:
            print("Specify only one of --release or --delete.")
            return 1
        if args.release:
            queue_choice = "release"
        elif args.delete:
            queue_choice = "delete"
        else:
            print("Queue action options:")
            print("  1) Unfreeze only")
            print("  2) Unfreeze and release queued emails")
            print("  3) Unfreeze and delete queued emails")
            selected = input("Select option [1/2/3]: ").strip()
            queue_choice = {"1": None, "2": "release", "3": "delete"}.get(selected, None)
    queue_ids = []
    if target_email:
        remove_user_from_hold_list(sender_email=target_email)
        remove_sender_from_freeze_list(target_email)
        print(f"Unfrozen email address: {target_email}")
    if target_account:
        remove_user_from_hold_list(username=target_account)
        print(f"Unfrozen account: {target_account}")
    if queue_choice:
        queue_choice, queue_ids = _cli_handle_queue_action(matches, queue_choice)
    resolved_account = target_account or ""
    sender_for_record = target_email or target_account or ""
    _queue_action({
        "type": "unfreeze",
        "senderEmail": sender_for_record,
        "account": resolved_account,
        "queueAction": queue_choice or "none",
        "queueIds": queue_ids,
        "timestamp": time.time(),
    })
    return 0
def _cli_command_limit_set(args):
    account = args.account.strip()
    max_emails = int(args.max)
    per_seconds = int(args.per)
    enabled = not args.disable
    set_account_limit_local(account, max_emails, per_seconds, enabled)
    _queue_action({
        "type": "set_limit",
        "account": account,
        "maxEmails": max_emails,
        "perSeconds": per_seconds,
        "enabled": enabled,
        "timestamp": time.time(),
    })
    print(f"Set limit for account '{account}': {max_emails} emails / {per_seconds}s (enabled={enabled})")
    return 0
def _cli_command_limit_remove(args):
    account = args.account.strip()
    remove_account_limit_local(account)
    _queue_action({
        "type": "remove_limit",
        "account": account,
        "timestamp": time.time(),
    })
    print(f"Removed limit for account '{account}'.")
    return 0
def _cli_command_limit_default(args):
    max_emails = int(args.max)
    per_seconds = int(args.per)
    enabled = not args.disable
    set_default_limit_local(max_emails, per_seconds, enabled)
    _queue_action({
        "type": "set_default_limit",
        "maxEmails": max_emails,
        "perSeconds": per_seconds,
        "enabled": enabled,
        "timestamp": time.time(),
    })
    state = "enabled" if enabled else "disabled"
    print(f"Default limit {state}: {max_emails} emails / {per_seconds}s")
    return 0
def _cli_ensure_enrolled():
    sid = _get_current_server_id()
    if not sid:
        print("Warning: agent server ID not known yet. CLI changes will apply locally and sync once enrollment succeeds.")
def _cli_build_parser():
    parser = argparse.ArgumentParser(prog="osm", description="OSM Agent CLI")
    subparsers = parser.add_subparsers(dest="command")
    freeze_parser = subparsers.add_parser("freeze", help="Freeze an email address or account")
    freeze_target = freeze_parser.add_mutually_exclusive_group(required=True)
    freeze_target.add_argument("--email", help="Email address to freeze")
    freeze_target.add_argument("--account", help="Hosting account to freeze")
    freeze_parser.add_argument("--reason", help="Reason for freeze")
    freeze_parser.set_defaults(func=_cli_command_freeze)
    unfreeze_parser = subparsers.add_parser("unfreeze", help="Unfreeze an email address or account")
    unfreeze_target = unfreeze_parser.add_mutually_exclusive_group(required=True)
    unfreeze_target.add_argument("--email", help="Email address to unfreeze")
    unfreeze_target.add_argument("--account", help="Hosting account to unfreeze")
    unfreeze_parser.add_argument("--release", action="store_true", help="Release queued emails after unfreeze")
    unfreeze_parser.add_argument("--delete", action="store_true", help="Delete queued emails after unfreeze")
    unfreeze_parser.set_defaults(func=_cli_command_unfreeze)
    limit_parser = subparsers.add_parser("limit", help="Manage per-account or default limits")
    limit_sub = limit_parser.add_subparsers(dest="limit_command")
    limit_set = limit_sub.add_parser("set", help="Set/update limit for an account")
    limit_set.add_argument("--account", required=True, help="Hosting account name")
    limit_set.add_argument("--max", required=True, type=int, help="Max emails in window")
    limit_set.add_argument("--per", required=True, type=int, help="Time window in seconds")
    limit_set.add_argument("--disable", action="store_true", help="Disable (but retain values)")
    limit_set.set_defaults(func=_cli_command_limit_set)
    limit_remove = limit_sub.add_parser("remove", help="Remove limit for an account")
    limit_remove.add_argument("--account", required=True, help="Hosting account name")
    limit_remove.set_defaults(func=_cli_command_limit_remove)
    limit_default = limit_sub.add_parser("default", help="Set default (server-wide) limit")
    limit_default.add_argument("--max", required=True, type=int, help="Max emails in window")
    limit_default.add_argument("--per", required=True, type=int, help="Time window in seconds")
    limit_default.add_argument("--disable", action="store_true", help="Disable the default limit")
    limit_default.set_defaults(func=_cli_command_limit_default)
    return parser
def run_cli(argv=None):
    _cli_ensure_enrolled()
    parser = _cli_build_parser()
    args = parser.parse_args(argv)
    if not hasattr(args, "func"):
        parser.print_help()
        return 1
    return args.func(args)
def main():
    """Main agent loop"""
    global CURRENT_THRESHOLDS
    print("[agent] Starting OSM agent...")
    
    try:
        sid = None
        if BASE_URL and TOKEN:
            try:
                sid = enroll()
                try:
                    global CURRENT_SERVER_ID
                    CURRENT_SERVER_ID = sid
                except Exception:
                    pass
                try:
                    _save_agent_state({"serverId": str(sid)})
                except Exception:
                    pass
                print(f"[agent] Enrolled with server ID: {sid}")
                # Report agent version to server
                try:
                    session.post(f"{BASE_URL}/api/osm/agent/hello", json={"serverId": str(sid), "version": "v1.2"}, headers=DEFAULT_HEADERS, timeout=10)
                except Exception as e:
                    print(f"[agent] Failed to report version: {e}")
                # Start SSE listener for real-time commands
                start_sse_listener_thread(sid)
            except Exception as e:
                print(f"[agent] Enrollment failed, continuing offline: {e}")
                try:
                    cached_sid = _get_current_server_id()
                    if cached_sid:
                        sid = cached_sid
                        CURRENT_SERVER_ID = cached_sid
                        # Start SSE listener with cached server ID
                        start_sse_listener_thread(sid)
                except Exception:
                    pass
        else:
            try:
                cached_sid = _get_current_server_id()
                if cached_sid:
                    sid = cached_sid
                    CURRENT_SERVER_ID = cached_sid
                    # Start SSE listener with cached server ID
                    start_sse_listener_thread(sid)
            except Exception:
                pass
        
        # Load config (online or cached)
        if sid is not None:
            config = fetch_config(sid)
        else:
            d = _read_limits_cache().get("default", {})
            config = {
                "mailQueuePath": "/var/spool/postfix",
                "checkInterval": 30,
                "spamThreshold": 5.0,
                "blockedDomainsJson": "[]",
                "alertThresholdsJson": "{}",
                "defaultMaxEmails": int(d.get('maxEmails') or 0),
                "defaultPerSeconds": int(d.get('perSeconds') or 0),
                "defaultEnabled": bool(d.get('enabled') or False)
            }
        print(f"[agent] Using config: {config}")
        
        # Apply default server-wide limit settings from config
        def apply_defaults_from_config(cfg: dict):
            try:
                global DEFAULT_MAX_EMAILS, DEFAULT_PER_SECONDS, DEFAULT_ENABLED
                DEFAULT_MAX_EMAILS = int(cfg.get('defaultMaxEmails') or 0)
                DEFAULT_PER_SECONDS = int(cfg.get('defaultPerSeconds') or 0)
                DEFAULT_ENABLED = bool(cfg.get('defaultEnabled') or False)
                print(f"[agent] Default limit: enabled={DEFAULT_ENABLED} max={DEFAULT_MAX_EMAILS} per={DEFAULT_PER_SECONDS}s")
            except Exception as e:
                print(f"[agent] Failed to apply default limit from config: {e}")
        apply_defaults_from_config(config)
        
        # Send accounts snapshot immediately on startup
        if sid is not None:
            try:
                send_accounts_snapshot(sid)
            except Exception as e:
                print(f"[agent] Initial accounts snapshot failed: {e}")
            try:
                # Prime local limits cache so log parsing can enforce account limits immediately
                fetch_account_limits(sid)
            except Exception as e:
                print(f"[agent] Initial account limits fetch failed: {e}")
        # Initialize thresholds and start near-real-time Exim monitor
        initial_thresholds = parse_thresholds(config)
        try:
            global CURRENT_THRESHOLDS
            with THRESHOLD_LOCK:
                CURRENT_THRESHOLDS = list(initial_thresholds)
        except Exception:
            pass
        try:
            process_exim_logs(initial_thresholds, sid)
            write_tracking_db(initial_thresholds)
        except Exception as e:
            print(f"[agent] Initial Exim scan failed: {e}")
        _ensure_exim_monitor_thread()
        
        loop_count = 0
        while True:
            try:
                loop_count += 1
                print(f"[agent] Starting loop iteration {loop_count}")
                # Periodically confirm version and liveness
                if sid is not None and loop_count % 5 == 0:
                    try:
                        session.post(f"{BASE_URL}/api/osm/agent/hello", json={"serverId": str(sid), "version": "v1.2"}, headers=DEFAULT_HEADERS, timeout=10)
                    except Exception as e:
                        print(f"[agent] Version hello failed: {e}")
                
                _process_pending_actions()
                _flush_pending_reports()
                # Keep freeze state aligned after pushing any queued actions
                if sid is not None:
                    reconcile_freeze_file_with_server(sid)
                _sync_reports_with_server()
                # Periodically refresh config so default limits can change without restart
                if sid is not None and (loop_count % 3) == 0:
                    try:
                        config = fetch_config(sid)
                        apply_defaults_from_config(config)
                        new_thresholds = parse_thresholds(config)
                        try:
                            with THRESHOLD_LOCK:
                                CURRENT_THRESHOLDS = list(new_thresholds)
                        except Exception:
                            pass
                    except Exception as e:
                        print(f"[agent] Config refresh failed: {e}")
                stats = None
                if QUEUE_INGEST_ENABLED and config.get("queueIngestEnabled", True):
                    stats = get_mail_queue_stats(config["mailQueuePath"])
                    print(f"[agent] Got stats: {stats['totalMessages']} total messages")
                    if sid is not None:
                        send_queue_snapshot(sid, stats)
                    enforce_account_limits(sid, stats)
                else:
                    print("[agent] Queue ingestion disabled; skipping queue stats collection")
                # Periodically send accounts snapshot (every 5 loops)
                if sid is not None and loop_count % 5 == 1:
                    send_accounts_snapshot(sid)
                
                # Check for spam alerts
                if sid is not None and stats is not None:
                    check_spam_thresholds(sid, stats, config)
                if stats is not None:
                    print(f"[agent] Queue stats: {stats['totalMessages']} total, "
                          f"{stats['activeMessages']} active, "
                          f"{stats['deferredMessages']} deferred, "
                          f"{len(stats['spamScores'])} spam scores")
                else:
                    print("[agent] Queue stats unavailable (ingest disabled)")
                
                # Sleep for configured interval
                print(f"[agent] Sleeping for {config.get('checkInterval', 30)} seconds...")
                time.sleep(config.get("checkInterval", 30))
                
            except Exception as e:
                print(f"[agent] Error in main loop: {e}")
                time.sleep(30)
                
    except Exception as e:
        print(f"[agent] Fatal error: {e}")
        time.sleep(30)
        return
if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] not in {"agent", "--agent"}:
        sys.exit(run_cli(sys.argv[1:]))
    main()