File: /opt/osm/main.py
#!/usr/bin/env python3
import argparse
import json
import os
import time
import subprocess
import re
import requests
from requests.adapters import HTTPAdapter
try:
# Retry support for transient 5xx via urllib3
from urllib3.util.retry import Retry # type: ignore
except Exception:
Retry = None
import sys
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from collections import defaultdict, deque
import shutil
import threading
import stat
from typing import Optional
def _load_env_file():
env_path = Path(__file__).resolve().parent / ".env"
if not env_path.exists():
return
try:
for line in env_path.read_text(encoding='utf-8').splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
key = key.strip()
value = value.strip().strip("'\"")
if key and key not in os.environ:
os.environ[key] = value
except Exception:
pass
_load_env_file()
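# Illustrative .env layout read by _load_env_file (hypothetical values):
#   SERVER_BASE_URL=https://osm.example.com
#   AGENT_TOKEN=abc123
#   OSM_API_KEY=secret
# Keys already present in os.environ take precedence over the file.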
BASE_URL = os.environ.get("SERVER_BASE_URL")
TOKEN = os.environ.get("AGENT_TOKEN")
API_KEY = os.environ.get("OSM_API_KEY", "")
if not BASE_URL or not TOKEN:
print("[agent] Warning: SERVER_BASE_URL or AGENT_TOKEN not set. Running in offline mode.")
session = requests.Session()
# Configure light retries for transient gateway errors (e.g., tunneling hiccups)
try:
if Retry is not None:
retry = Retry(
total=2,
connect=1,
read=1,
backoff_factor=0.5,
status_forcelist=[502, 503, 504],
allowed_methods=frozenset(["GET", "POST"]),
)
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
except Exception:
pass
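# With the settings above, GET/POST requests that fail with 502/503/504 are
# retried up to twice with a short exponential backoff (backoff_factor=0.5),
# and connect/read errors each get a single retry before the error surfaces.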
DEFAULT_HEADERS = {"X-Api-Key": API_KEY} if API_KEY else {}
def _env_flag(name, default=False):
val = os.environ.get(name)
if val is None:
return default
try:
return val.strip().lower() in {"1", "true", "yes", "on"}
except Exception:
return default
REQUEST_DEBUG_ENABLED = _env_flag("OSM_DEBUG_REQUESTS", False)
def _mask_headers(headers):
if not headers:
return {}
masked = {}
for key, value in headers.items():
if key.lower() == "x-api-key":
masked[key] = "***"
else:
masked[key] = value
return masked
def _truncate_payload(payload, limit=500):
if payload is None:
return None
try:
text = json.dumps(payload, default=str)
except Exception:
text = str(payload)
if len(text) > limit:
return text[:limit] + "...(truncated)"
return text
_original_request = session.request
def _logged_request(method, url, **kwargs):
method_upper = method.upper()
params_preview = _truncate_payload(kwargs.get("params"))
json_preview = _truncate_payload(kwargs.get("json"))
data_preview = _truncate_payload(kwargs.get("data"))
headers_preview = _mask_headers(kwargs.get("headers") or {})
timeout_value = kwargs.get("timeout")
if REQUEST_DEBUG_ENABLED:
try:
print(f"[agent] -> {method_upper} {url} params={params_preview} json={json_preview} data={data_preview} headers={headers_preview} timeout={timeout_value}")
except Exception as log_err:
print(f"[agent] Failed to log request pre-flight: {log_err}")
try:
response = _original_request(method, url, **kwargs)
except Exception as exc:
if REQUEST_DEBUG_ENABLED:
print(f"[agent] <- {method_upper} {url} raised {exc}")
else:
print(f"[agent] HTTP {method_upper} {url} failed: {exc}")
raise
if REQUEST_DEBUG_ENABLED:
try:
body_preview = response.text
if body_preview and len(body_preview) > 500:
body_preview = body_preview[:500] + "...(truncated)"
print(f"[agent] <- {method_upper} {url} status={response.status_code} body={body_preview}")
except Exception as log_err:
print(f"[agent] Failed to log response: {log_err}")
return response
session.request = _logged_request
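# Setting OSM_DEBUG_REQUESTS=1 (env or .env) makes the wrapper above print a
# pre-flight line and a truncated response for every request, with X-Api-Key
# masked. Illustrative output:
#   [agent] -> GET https://osm.example.com/api/osm/agent/config params={"serverId": 1} ... timeout=10
#   [agent] <- GET https://osm.example.com/api/osm/agent/config status=200 body={...}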
# ---------- OSM-like tracking DB and Exim log processing (local on agent) ----------
# Global default limit (from server config)
DEFAULT_MAX_EMAILS: int = 0
DEFAULT_PER_SECONDS: int = 0
DEFAULT_ENABLED: bool = False
QUEUE_INGEST_ENABLED: bool = True
FREEZE_REPORTS_ENABLED: bool = True
IPDB_LOOKUP_ENABLED: bool = True
# Paths
AGENT_DIR = Path(__file__).resolve().parent
TRACKING_DB_PATH = AGENT_DIR / 'osm_tracking.db'
EXIM_LOG_STATE_PATH = AGENT_DIR / 'exim_log.state'
LIMITS_CACHE_PATH = AGENT_DIR / 'user_limits.json'
ACTIONS_QUEUE_PATH = AGENT_DIR / 'pending_actions.json'
AGENT_STATE_PATH = AGENT_DIR / 'agent_state.json'
# OSM-style integration files (as used by OSM on cPanel/DirectAdmin servers)
VIRTUAL_DIR = Path('/etc/virtual')
OSM_HOLD_FILE = VIRTUAL_DIR / 'osm_hold' # usernames to freeze/hold
OSM_DISABLE_FILE = VIRTUAL_DIR / 'osm_disable' # usernames to discard
# Legacy/alternate freeze list for generic exim setups (optional)
FREEZE_LIST_PATH = Path('/etc/exim/osm_freeze_senders')
# In-memory counters: key -> deque[timestamps]
# key is a tuple: (account_ref: str, trigger_name: str, window_seconds: int)
LOG_COUNTERS = defaultdict(deque)
LAST_TRIGGERED = {}
LAST_LIMIT_TRIGGERED = {} # Tracks last freeze time for per-account/default limits: owner_key -> timestamp
ACCOUNT_LIMIT_WINDOWS = defaultdict(deque) # account -> deque[timestamps]
# Last seen subject per account_ref
LAST_SUBJECT_BY_ACCOUNT = {}
# Deduplication of log-based events by message-id
SEEN_LOG_MSG_IDS = deque()
SEEN_LOG_MSG_IDS_SET = set()
SEEN_LOG_MSG_IDS_RETENTION_SECS = 600 # 10 minutes
# Deduplication of per-recipient deliveries by (msgid, recipient)
SEEN_MSG_RCPT = deque()
SEEN_MSG_RCPT_SET = set()
SEEN_MSG_RCPT_RETENTION_SECS = 600
# Retain counters in DB for some time after last event
LAST_COUNTER_SEEN_TS = {}
COUNTER_RETENTION_SECS = 7200
# Per-message caches
MSG_ID_TO_ACCOUNT = {}
MSG_ID_TO_SUBJECT = {}
# Recent recipients per account_ref with last-seen timestamps
RECIPIENTS_BY_ACCOUNT = {}
RECIPIENT_RETENTION_SECS = 7200
# Domain -> owner cache (cpanel/directadmin)
DOMAIN_OWNER_MAP = {}
DOMAIN_OWNER_LAST_LOAD = 0.0
DOMAIN_OWNER_TTL_SECS = 300
# Per-message metadata
MSG_ID_TO_CLIENT = {}
MSG_ID_TO_RELAY = {}
MSG_ID_TO_OWNER = {}
MSG_ID_TO_RCPTS = {}
MSG_ID_TO_RELAY_TYPE = {}
# Per-owner recent events (delivery granularity)
ACCOUNT_EVENTS = defaultdict(deque)
ACCOUNT_EVENTS_RETENTION_SECS = 7200
MAX_LOG_LOOKBACK_SECS = 3 * 3600 # Ignore historical log entries older than 3 hours
REPORTS_DIR = Path('/opt/osm/reports')
PENDING_REPORTS_DIR = REPORTS_DIR / 'pending'
LAST_LOCAL_HOLD_UPDATE_TS = 0.0
CURRENT_SERVER_ID = None
THRESHOLD_LOCK = threading.Lock()
CURRENT_THRESHOLDS = []
EXIM_MONITOR_THREAD = None
EXIM_MONITOR_INTERVAL_SECS = 2.0
try:
sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)
except Exception:
pass
def parse_thresholds(config: dict):
"""Return list of threshold dicts: [{events:int, seconds:int, name:str, type:str}]."""
try:
default_events = int(config.get('defaultMaxEmails') or 50)
except Exception:
default_events = 50
try:
default_seconds = int(config.get('defaultPerSeconds') or 300)
except Exception:
default_seconds = 300
if default_events <= 0:
default_events = 50
if default_seconds <= 0:
default_seconds = 300
default_thresholds = [
{"events": default_events, "seconds": default_seconds, "name": "default_logline", "type": "logline"}
]
try:
raw = config.get("alertThresholdsJson")
if not raw:
return default_thresholds
data = raw
if isinstance(raw, str):
try:
data = json.loads(raw)
except Exception:
return default_thresholds
# Accept either {"thresholds": [...]} or {"loglineThresholds": [...]} or a list directly
if isinstance(data, dict):
thr = data.get("thresholds") or data.get("loglineThresholds")
elif isinstance(data, list):
thr = data
else:
thr = None
if not thr:
return default_thresholds
out = []
for i, t in enumerate(thr, start=1):
try:
events = int(t.get("events"))
seconds = int(t.get("seconds"))
name = (t.get("name") or f"trigger{i}")
typ = (t.get("type") or "logline")
if events > 0 and seconds > 0:
out.append({"events": events, "seconds": seconds, "name": name, "type": typ})
except Exception:
continue
return out or default_thresholds
except Exception:
return default_thresholds
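# parse_thresholds accepts any of these alertThresholdsJson shapes (illustrative):
#   {"thresholds": [{"events": 100, "seconds": 60, "name": "burst", "type": "logline"}]}
#   {"loglineThresholds": [{"events": 500, "seconds": 3600}]}
#   [{"events": 100, "seconds": 60}]
# Missing name/type default to "trigger<N>"/"logline"; entries with non-positive
# events or seconds are skipped, and an empty result falls back to the default.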
def _load_log_state(log_path: Path):
try:
if not EXIM_LOG_STATE_PATH.exists():
return {"path": str(log_path), "ino": None, "pos": 0}
data = json.loads(EXIM_LOG_STATE_PATH.read_text())
if data.get("path") != str(log_path):
return {"path": str(log_path), "ino": None, "pos": 0}
return {"path": str(log_path), "ino": data.get("ino"), "pos": int(data.get("pos", 0))}
except Exception:
return {"path": str(log_path), "ino": None, "pos": 0}
def _save_log_state(state: dict):
try:
EXIM_LOG_STATE_PATH.write_text(json.dumps(state))
except Exception as e:
print(f"[agent] Error saving log state: {e}")
def _snapshot_thresholds():
with THRESHOLD_LOCK:
if CURRENT_THRESHOLDS:
return [dict(t) for t in CURRENT_THRESHOLDS]
return parse_thresholds({})
def _exim_monitor_loop():
print(f"[agent] Exim monitor thread started (interval={EXIM_MONITOR_INTERVAL_SECS}s)")
while True:
try:
thresholds = _snapshot_thresholds()
process_exim_logs(thresholds, CURRENT_SERVER_ID)
write_tracking_db(thresholds)
except Exception as e:
print(f"[agent] Exim monitor error: {e}")
time.sleep(EXIM_MONITOR_INTERVAL_SECS)
def _ensure_exim_monitor_thread():
global EXIM_MONITOR_THREAD
if EXIM_MONITOR_THREAD and EXIM_MONITOR_THREAD.is_alive():
return
EXIM_MONITOR_THREAD = threading.Thread(target=_exim_monitor_loop, daemon=True)
EXIM_MONITOR_THREAD.start()
def _parse_exim_timestamp(line: str) -> float:
"""Parse leading timestamp from an Exim log line; fallback to current time."""
try:
ts_str = line[:19]
if re.match(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}", ts_str):
try:
dt = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
return time.mktime(dt.timetuple())
except Exception:
pass
except Exception:
pass
return time.time()
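# Exim mainlog lines are expected to begin with a "YYYY-MM-DD HH:MM:SS" stamp,
# e.g. (illustrative): "2024-05-01 12:34:56 1rABCD-000123-XY <= user@example.com ..."
# Lines whose stamp does not parse are attributed to the current time.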
# --- Inserted helpers for enrollment, offline cache, and owner resolution ---
def enroll():
"""Enroll this agent with the server and return serverId."""
resp = session.post(f"{BASE_URL}/api/osm/agent/enroll", json={"token": TOKEN}, headers=DEFAULT_HEADERS, timeout=10)
resp.raise_for_status()
return resp.json()["serverId"]
def _read_limits_cache() -> dict:
try:
if LIMITS_CACHE_PATH.exists():
return json.loads(LIMITS_CACHE_PATH.read_text(encoding='utf-8'))
except Exception as e:
print(f"[agent] Error reading limits cache: {e}")
return {"default": {"enabled": False, "maxEmails": 0, "perSeconds": 0}, "accounts": []}
def _write_limits_cache(cache: dict):
try:
LIMITS_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
LIMITS_CACHE_PATH.write_text(json.dumps(cache, ensure_ascii=False, indent=2))
except Exception as e:
print(f"[agent] Error writing limits cache: {e}")
def _load_agent_state() -> dict:
try:
if AGENT_STATE_PATH.exists():
return json.loads(AGENT_STATE_PATH.read_text(encoding='utf-8'))
except Exception as e:
print(f"[agent] Error reading agent state: {e}")
return {}
def _save_agent_state(state: dict):
try:
AGENT_STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
AGENT_STATE_PATH.write_text(json.dumps(state, ensure_ascii=False, indent=2))
except Exception as e:
print(f"[agent] Error writing agent state: {e}")
def _get_current_server_id() -> str | None:
try:
if CURRENT_SERVER_ID is not None:
return str(CURRENT_SERVER_ID)
except Exception:
pass
state = _load_agent_state()
sid = state.get("serverId")
return str(sid) if sid else None
def _read_actions_queue() -> list:
try:
if ACTIONS_QUEUE_PATH.exists():
return json.loads(ACTIONS_QUEUE_PATH.read_text(encoding='utf-8'))
except Exception as e:
print(f"[agent] Error reading actions queue: {e}")
return []
def _write_actions_queue(actions: list):
try:
ACTIONS_QUEUE_PATH.parent.mkdir(parents=True, exist_ok=True)
ACTIONS_QUEUE_PATH.write_text(json.dumps(actions, ensure_ascii=False, indent=2))
except Exception as e:
print(f"[agent] Error writing actions queue: {e}")
def _queue_action(action: dict):
try:
actions = _read_actions_queue()
action = dict(action)
action.setdefault("queuedAt", time.time())
actions.append(action)
_write_actions_queue(actions)
except Exception as e:
print(f"[agent] Failed to queue action: {e}")
def set_account_limit_local(account: str, max_emails: int, per_seconds: int, enabled: bool = True):
try:
account = (account or '').strip()
if not account:
return
cache = _read_limits_cache()
accounts = cache.get('accounts') or []
updated = False
for entry in accounts:
if entry.get('account', '').strip().lower() == account.lower():
entry['maxEmails'] = int(max_emails)
entry['perSeconds'] = int(per_seconds)
entry['enabled'] = bool(enabled)
updated = True
break
if not updated:
accounts.append({
"account": account,
"maxEmails": int(max_emails),
"perSeconds": int(per_seconds),
"enabled": bool(enabled)
})
cache['accounts'] = accounts
_write_limits_cache(cache)
except Exception as e:
print(f"[agent] Failed to set local account limit: {e}")
def remove_account_limit_local(account: str):
try:
account = (account or '').strip()
if not account:
return
cache = _read_limits_cache()
accounts = cache.get('accounts') or []
new_accounts = [entry for entry in accounts if entry.get('account', '').strip().lower() != account.lower()]
cache['accounts'] = new_accounts
_write_limits_cache(cache)
except Exception as e:
print(f"[agent] Failed to remove local account limit: {e}")
def set_default_limit_local(max_emails: int, per_seconds: int, enabled: bool = True):
try:
cache = _read_limits_cache()
cache['default'] = {
"enabled": bool(enabled),
"maxEmails": int(max_emails),
"perSeconds": int(per_seconds)
}
_write_limits_cache(cache)
try:
global DEFAULT_MAX_EMAILS, DEFAULT_PER_SECONDS, DEFAULT_ENABLED
DEFAULT_MAX_EMAILS = int(max_emails)
DEFAULT_PER_SECONDS = int(per_seconds)
DEFAULT_ENABLED = bool(enabled)
except Exception:
pass
except Exception as e:
print(f"[agent] Failed to set local default limit: {e}")
def fetch_config(sid: int):
"""Fetch monitoring configuration from server and persist defaults; fallback to cache offline."""
try:
resp = session.get(f"{BASE_URL}/api/osm/agent/config", params={"serverId": sid}, headers=DEFAULT_HEADERS, timeout=10)
resp.raise_for_status()
cfg = resp.json()
cfg.setdefault("ipdbLookupEnabled", True)
cfg.setdefault("queueIngestEnabled", True)
cfg.setdefault("freezeReportsEnabled", True)
cache = _read_limits_cache()
cache["default"] = {
"enabled": bool(cfg.get('defaultEnabled') or False),
"maxEmails": int(cfg.get('defaultMaxEmails') or 0),
"perSeconds": int(cfg.get('defaultPerSeconds') or 0)
}
_write_limits_cache(cache)
try:
global QUEUE_INGEST_ENABLED, FREEZE_REPORTS_ENABLED, IPDB_LOOKUP_ENABLED
QUEUE_INGEST_ENABLED = bool(cfg.get('queueIngestEnabled', True))
FREEZE_REPORTS_ENABLED = bool(cfg.get('freezeReportsEnabled', True))
IPDB_LOOKUP_ENABLED = bool(cfg.get('ipdbLookupEnabled', True))
print(f"[agent] Feature toggles: queue={QUEUE_INGEST_ENABLED} freeze={FREEZE_REPORTS_ENABLED} ipdb={IPDB_LOOKUP_ENABLED}")
except Exception as toggle_err:
print(f"[agent] Failed to apply feature toggles: {toggle_err}")
return cfg
except Exception as e:
print(f"[agent] Error fetching config: {e}")
d = _read_limits_cache().get("default", {})
return {
"mailQueuePath": "/var/spool/postfix",
"checkInterval": 30,
"spamThreshold": 5.0,
"blockedDomainsJson": "[]",
"alertThresholdsJson": "{}",
"defaultMaxEmails": int(d.get('maxEmails') or 0),
"defaultPerSeconds": int(d.get('perSeconds') or 0),
"defaultEnabled": bool(d.get('enabled') or False),
"ipdbLookupEnabled": IPDB_LOOKUP_ENABLED,
"queueIngestEnabled": QUEUE_INGEST_ENABLED,
"freezeReportsEnabled": FREEZE_REPORTS_ENABLED
}
def _extract_email(text):
try:
m = re.search(r"([A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+)", text or '')
return m.group(1).lower() if m else None
except Exception:
return None
def _load_domain_owner_map():
global DOMAIN_OWNER_LAST_LOAD
now = time.time()
if DOMAIN_OWNER_MAP and (now - DOMAIN_OWNER_LAST_LOAD) < DOMAIN_OWNER_TTL_SECS:
return
try:
DOMAIN_OWNER_MAP.clear()
# cPanel: /etc/trueuserdomains format: domain: user
p = Path('/etc/trueuserdomains')
if p.exists():
for line in p.read_text(encoding='utf-8', errors='ignore').splitlines():
if ':' in line:
dom, user = line.split(':', 1)
dom = (dom or '').strip().lower()
user = (user or '').strip().lower()
if dom and user:
DOMAIN_OWNER_MAP[dom] = user
# cPanel alternate mapping file /etc/userdomains
p3 = Path('/etc/userdomains')
if p3.exists():
for line in p3.read_text(encoding='utf-8', errors='ignore').splitlines():
if ':' in line:
dom, user = line.split(':', 1)
dom = (dom or '').strip().lower()
user = (user or '').strip().lower()
if dom and user:
DOMAIN_OWNER_MAP[dom] = user
# DirectAdmin: /etc/virtual/domainowners
p2 = Path('/etc/virtual/domainowners')
if p2.exists():
for line in p2.read_text(encoding='utf-8', errors='ignore').splitlines():
if ':' in line:
dom, user = line.split(':', 1)
dom = (dom or '').strip().lower()
user = (user or '').strip().lower()
if dom and user:
DOMAIN_OWNER_MAP[dom] = user
# DirectAdmin global domainowners file
da_owners = Path('/usr/local/directadmin/data/admin/domainowners')
if da_owners.exists():
for line in da_owners.read_text(encoding='utf-8', errors='ignore').splitlines():
if ':' in line:
dom, user = line.split(':', 1)
dom = (dom or '').strip().lower()
user = (user or '').strip().lower()
if dom and user:
DOMAIN_OWNER_MAP[dom] = user
# DirectAdmin per-user domain lists
da_users_root = Path('/usr/local/directadmin/data/users')
if da_users_root.exists():
for user_dir in da_users_root.iterdir():
if not user_dir.is_dir():
continue
user = user_dir.name.strip().lower()
try:
dom_list = user_dir / 'domains.list'
if dom_list.exists():
for dom in dom_list.read_text(encoding='utf-8', errors='ignore').splitlines():
dom = (dom or '').strip().lower()
if dom:
DOMAIN_OWNER_MAP[dom] = user
except Exception:
continue
DOMAIN_OWNER_LAST_LOAD = now
except Exception as e:
print(f"[agent] Failed loading domain owners: {e}")
def _resolve_owner_from_email(email_addr):
try:
_load_domain_owner_map()
email_addr = (email_addr or '').strip().lower()
if '@' not in email_addr:
return None
local_part, domain = email_addr.split('@', 1)
domain = domain.rstrip('.')
parts = domain.split('.')
for i in range(len(parts)-1):
d = '.'.join(parts[i:])
owner = DOMAIN_OWNER_MAP.get(d)
if owner:
return owner
owner = DOMAIN_OWNER_MAP.get(domain)
if owner:
return owner
# Fallback: inspect virtual mailbox passwd files to infer owner
owner = _lookup_owner_from_virtual_passwd(local_part, domain)
if owner:
DOMAIN_OWNER_MAP[domain] = owner
return owner
return None
except Exception:
return None
def _lookup_owner_from_virtual_passwd(local_part: str, domain: str):
"""Inspect /etc/virtual/<domain>/passwd files to determine owning account."""
try:
local_part = (local_part or '').strip().lower()
domain = (domain or '').strip().lower()
if not local_part or not domain:
return None
domains_to_try = []
if domain:
domains_to_try.append(domain)
parts = domain.split('.')
for i in range(1, len(parts)-1):
parent = '.'.join(parts[i:])
if parent not in domains_to_try:
domains_to_try.append(parent)
for extra in ('lists', 'default'):
if extra not in domains_to_try:
domains_to_try.append(extra)
for dom in domains_to_try:
vpath = Path('/etc/virtual') / dom / 'passwd'
if not vpath.exists():
continue
try:
for line in vpath.read_text(encoding='utf-8', errors='ignore').splitlines():
if not line or line.startswith('#'):
continue
                    fields = line.split(':')  # 'fields' avoids shadowing the domain-label 'parts' above
                    if not fields:
                        continue
                    account_field = fields[0].strip().lower()
                    if account_field != local_part and account_field != f"{local_part}@{dom}":
                        continue
                    # Attempt to derive owner from home/maildir path within the line
                    for token in fields[1:]:
if '/home/' in token:
m = re.search(r'/home/([^/]+)/', token)
if m:
return m.group(1).strip().lower()
# Sometimes the maildir path is at the end
m = re.search(r'/home/([^/]+)/', line)
if m:
return m.group(1).strip().lower()
except Exception:
continue
except Exception:
pass
return None
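# The passwd probe above only relies on a "/home/<user>/" token somewhere in the
# matching entry. An illustrative DirectAdmin-style line:
#   info@example.com:$6$hash:/home/owner1/imap/example.com/info/Maildir
# would resolve mailbox info@example.com to hosting account "owner1".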
def _resolve_limit_account_key(account_ref: str) -> str:
"""Map an account_ref to the hosting account owner key used by limits."""
try:
# Try to extract email first (sender part before '(' or in the string)
email = None
if '(' in account_ref:
email = account_ref.split('(', 1)[0].strip()
if not email:
email = _extract_email(account_ref)
owner = _resolve_owner_from_email(email) if email else None
if owner:
return owner
# Fallback: local-part or username
if '(' in account_ref and account_ref.endswith(')'):
inside = account_ref[account_ref.rfind('(')+1:-1]
if inside:
return inside.lower()
m = re.match(r"([^@]+)@", account_ref)
if m:
return m.group(1).lower()
return account_ref.strip().lower()
except Exception:
return account_ref.strip().lower()
def extract_username_from_account_ref(account_ref: str) -> str:
try:
if '(' in account_ref and account_ref.endswith(')'):
return account_ref[account_ref.rfind('(')+1:-1].strip().lower()
m = re.match(r"([^@]+)@", account_ref)
if m:
return m.group(1).strip().lower()
return account_ref.strip().lower()
except Exception:
return account_ref.strip().lower()
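# extract_username_from_account_ref examples (derived from the logic above):
#   "info@example.com(owner1)" -> "owner1"   (parenthesised user wins)
#   "info@example.com"         -> "info"     (email local-part fallback)
#   "owner1"                   -> "owner1"   (plain usernames pass through)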
def fetch_freeze_list(sid: int):
try:
resp = session.get(f"{BASE_URL}/api/osm/agent/freeze-list", params={"serverId": sid}, headers=DEFAULT_HEADERS, timeout=10)
resp.raise_for_status()
return resp.json() or []
except Exception as e:
print(f"[agent] Error fetching freeze list: {e}")
# Return None to indicate remote unavailable so we do not wipe local hold file
return None
def reconcile_freeze_file_with_server(sid: int, prefer_server: bool = False):
try:
# Grace period after local updates to avoid wiping local holds before server reflects them
# Skip the grace period when server explicitly requests reconciliation (prefer_server=True)
try:
if not prefer_server and time.time() - LAST_LOCAL_HOLD_UPDATE_TS < 30:
return
except Exception:
pass
server_items = fetch_freeze_list(sid)
if server_items is None:
# Server unreachable; do not modify local hold file
return
# Build server set of entries (username or sender email)
server_set = set()
for it in server_items:
if not it.get('active', True):
continue
account = (it.get('account') or '').strip()
sender = (it.get('senderEmail') or '').strip()
if account:
server_set.add(account.lower())
elif sender:
server_set.add(sender.lower())
_ensure_virtual_files()
local_set = set()
try:
for line in OSM_HOLD_FILE.read_text().splitlines():
s = (line or '').strip().lower()
if s:
local_set.add(s)
except Exception:
pass
if prefer_server:
# Server-wins mode (e.g., after an app-initiated unfreeze): adopt server state and do NOT push local-only entries
desired = server_set
if desired != local_set:
with OSM_HOLD_FILE.open('w') as f:
for u in sorted(desired):
f.write(u + "\n")
_reload_exim()
print(f"[agent] Reconciled (server-wins). Server={len(server_set)} local={len(local_set)} applied={len(desired)}")
else:
# Merge mode (agent-wins for offline changes): push local-only entries to server then write union locally
to_push = sorted(list(local_set - server_set))
for val in to_push:
try:
# Try to preserve the original freeze reason if we can infer it
freeze_reason = _find_freeze_reason(val)
payload = {"serverId": str(sid), "reason": freeze_reason}
if '@' in val:
payload["senderEmail"] = val
payload["account"] = ""
else:
payload["senderEmail"] = ""
payload["account"] = val
session.post(f"{BASE_URL}/api/osm/agent/freeze", json=payload, headers=DEFAULT_HEADERS, timeout=10)
except Exception:
# Leave in local_set; future loops will retry via pending actions or here
pass
# Compute union as desired hold file content
desired = server_set | local_set
if desired != local_set:
with OSM_HOLD_FILE.open('w') as f:
for u in sorted(desired):
f.write(u + "\n")
_reload_exim()
print(f"[agent] Reconciled (merge). Server={len(server_set)} local={len(local_set)} merged={len(desired)}")
except Exception as e:
print(f"[agent] Reconcile freeze list failed: {e}")
def _find_freeze_reason(identifier: str) -> str:
"""Best-effort recovery of the original freeze reason for a given account/email.
Checks queued actions first, then the most recent local report, else returns a safe default.
"""
try:
ident = (identifier or '').strip().lower()
if not ident:
return 'Manual freeze'
# Look into pending actions (latest first) for a matching freeze entry
try:
actions = _read_actions_queue()
for action in reversed(actions):
if (action.get('type') or '') != 'freeze':
continue
s = (action.get('senderEmail') or '').strip().lower()
a = (action.get('account') or '').strip().lower()
if ident == s or (a and ident == a):
r = (action.get('reason') or '').strip()
if r:
return r
except Exception:
pass
# Search most recent report that matches sender/account
try:
if REPORTS_DIR.exists():
files = sorted(REPORTS_DIR.glob('*.txt'), key=lambda p: p.stat().st_mtime, reverse=True)
for path in files:
parsed = _parse_report_file(path)
if not parsed:
continue
s = (parsed.get('senderEmail') or '').strip().lower()
a = (parsed.get('account') or '').strip().lower()
if ident == s or (a and ident == a):
r = (parsed.get('reason') or '').strip()
if r:
return r
except Exception:
pass
return 'Manual freeze'
except Exception:
return 'Manual freeze'
def get_exim_log_path() -> Path:
"""Return best-known Exim main log path for this system."""
try:
candidates = []
try:
if detect_control_panel() == 'cpanel':
candidates.append(Path('/var/log/exim_mainlog'))
except Exception:
pass
candidates.extend([
Path('/var/log/exim/mainlog'),
Path('/var/log/exim4/mainlog'),
Path('/var/log/exim_mainlog'),
])
for p in candidates:
try:
if p.exists() and p.is_file():
return p
except Exception:
continue
except Exception:
pass
return Path('/var/log/exim/mainlog')
def process_exim_logs(thresholds, sid=None):
"""Incrementally parse Exim mainlog and update LOG_COUNTERS."""
log_path = get_exim_log_path()
try:
if not log_path.exists():
return
st = log_path.stat()
state = _load_log_state(log_path)
# Handle rotation or first run
if state.get("ino") != st.st_ino or state.get("pos", 0) > st.st_size:
state = {"path": str(log_path), "ino": st.st_ino, "pos": 0}
with log_path.open('r', encoding='utf-8', errors='ignore') as f:
f.seek(state.get("pos", 0))
new_data = f.read()
state["pos"] = f.tell()
_save_log_state(state)
now_ts = time.time()
# Parse new lines
for line in new_data.splitlines():
try:
line_ts = _parse_exim_timestamp(line)
if now_ts - line_ts > MAX_LOG_LOOKBACK_SECS:
continue
if line_ts - now_ts > 300:
line_ts = now_ts
# Record subject per account where available
                if ' <= ' in line or ' => ' in line or ' -> ' in line:
                    msgid = _extract_message_id_from_exim_line(line) or _extract_message_id_from_any_line(line) or ''
                    subj = MSG_ID_TO_SUBJECT.get(msgid) or _get_subject_for_message_id(msgid)
if subj:
acct = _extract_account_ref_from_exim_line(line)
if acct:
LAST_SUBJECT_BY_ACCOUNT[acct] = subj
_update_counters_from_exim_line(line, thresholds, line_ts)
except Exception:
continue
# Cleanup old dedupe entries
cutoff = time.time() - SEEN_LOG_MSG_IDS_RETENTION_SECS
while SEEN_LOG_MSG_IDS and SEEN_LOG_MSG_IDS[0][1] < cutoff:
mid, _ = SEEN_LOG_MSG_IDS.popleft()
SEEN_LOG_MSG_IDS_SET.discard(mid)
cutoff_rcpt = time.time() - SEEN_MSG_RCPT_RETENTION_SECS
while SEEN_MSG_RCPT and SEEN_MSG_RCPT[0][1] < cutoff_rcpt:
key, _ = SEEN_MSG_RCPT.popleft()
SEEN_MSG_RCPT_SET.discard(key)
except Exception as e:
print(f"[agent] Error processing exim logs: {e}")
def _extract_message_id_from_exim_line(line):
"""Extract the Exim queue id token that appears before '<=' on submission lines."""
try:
# Match the token immediately before '<='
m = re.search(r"\b([A-Za-z0-9-]{6,})\b\s+<=", line)
if m:
return m.group(1)
except Exception:
pass
return None
def _extract_message_id_from_any_line(line):
"""Extract the Exim queue id from general log lines that start with timestamp then id."""
try:
m = re.search(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} ([A-Za-z0-9-]{6,})\b", line)
if m:
return m.group(1)
except Exception:
pass
return None
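# The two extractors target different exim line shapes (illustrative):
#   "2024-05-01 12:34:56 1rABCD-000123-XY <= user@example.com ..."  (submission)
#     -> _extract_message_id_from_exim_line takes the token before '<='
#   "2024-05-01 12:34:56 1rABCD-000123-XY => rcpt@example.org ..."  (any line)
#     -> _extract_message_id_from_any_line takes the token after the timestamp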
def _normalize_host(host: str) -> str:
try:
if not host:
return ''
h = host.strip()
if h.startswith('[') and h.endswith(']'):
h = h[1:-1]
return h
except Exception:
return host or ''
def _extract_client_host_ip_from_line(line: str) -> tuple[str, str]:
host = ''
ip = ''
try:
m_parent = re.search(r"\bH=\(([^)]*)\)", line)
if m_parent:
host = (m_parent.group(1) or '').strip()
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
m_after = re.search(r"\)\s*\[([^\]]+)\]", line)
if m_after:
ip = (m_after.group(1) or '').strip()
else:
m_host = re.search(r"\bH=([^\s]+)", line)
if m_host:
host = (m_host.group(1) or '').strip()
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
m_ip = re.search(r"\bH=[^\n]*?\[([^\]]+)\]", line)
if m_ip:
ip = (m_ip.group(1) or '').strip()
bracket_values = re.findall(r"\[([^\]]+)\]", line)
if not host and bracket_values:
host = bracket_values[0].strip()
if not ip and bracket_values:
ip = bracket_values[-1].strip()
return _normalize_host(host), ip
except Exception:
return '', ''
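# _extract_client_host_ip_from_line handles both common H= spellings (illustrative):
#   "H=(helo.name) [203.0.113.5]"       -> host="helo.name", ip="203.0.113.5"
#   "H=mail.example.com [203.0.113.5]"  -> host="mail.example.com", ip="203.0.113.5"
# and falls back to the first/last [bracketed] values on the line when H= is absent.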
def _resolve_client_details(msg_id: str, client_dict: dict) -> tuple[str, str]:
try:
host = (client_dict or {}).get('h', '') or ''
ip = (client_dict or {}).get('ip', '') or ''
if (not host or host == '-') or (not ip or ip == '-'):
fallback = MSG_ID_TO_CLIENT.get(msg_id) or {}
if not host or host == '-':
host = fallback.get('h', host) or host
if not ip or ip == '-':
ip = fallback.get('ip', ip) or ip
host = _normalize_host(host)
ip = (ip or '').strip()
        # Strip a trailing port if accidentally present (IPv4 "a.b.c.d:port" only;
        # bare IPv6 addresses also contain ':' and must be left intact)
        if ip and ip.count(':') == 1 and '.' in ip.split(':', 1)[0]:
            ip = ip.split(':', 1)[0]
return host, ip
except Exception:
return '', ''
def _client_info_from_headers(headers: str) -> tuple[str, str]:
host = ''
ip = ''
try:
if not headers:
return host, ip
m_ip = re.search(r"-host_address\s+\[([^\]]+)\]", headers)
if m_ip:
ip = (m_ip.group(1) or '').strip()
m_host = re.search(r"--helo_name\s+([^\s]+)", headers)
if m_host:
host = _normalize_host(m_host.group(1))
if not host:
m_recv = re.search(r"Received: from ([^\s]+)", headers)
if m_recv:
host = _normalize_host(m_recv.group(1))
except Exception:
pass
return host, ip
def _fallback_events_from_queue_detail(detail: dict, sender_email: str, owner_key: str):
events = []
try:
recipients = detail.get('recipients') or []
if not recipients:
rec = detail.get('recipient') or ''
if rec:
recipients = [rec]
recipients = [r for r in recipients if r]
subject = detail.get('subject') or ''
headers = detail.get('headers') or detail.get('status') or ''
client_h, client_ip = _client_info_from_headers(headers)
qid = detail.get('queueId') or detail.get('messageId') or ''
ts_now = time.time()
base_sender = sender_email or detail.get('sender') or owner_key
if not recipients:
recipients = ['']
for rcpt in recipients[:50]:
events.append({
'ts': ts_now,
'sender': base_sender,
'recipient': rcpt,
'subject': subject,
'client': {'h': client_h, 'ip': client_ip},
'relay': {},
'relayType': 'unknown',
'qid': qid,
})
except Exception:
pass
return events
def _has_recent_log_events(owner_key: str, sender_email: str, window_secs: int) -> bool:
try:
if not owner_key or not sender_email:
return False
events = ACCOUNT_EVENTS.get(owner_key)
if not events:
return False
cutoff = time.time() - max(window_secs, 0)
sender_lc = sender_email.strip().lower()
for ev in events:
if ev.get('ts', 0) >= cutoff and (ev.get('sender', '') or '').strip().lower() == sender_lc:
return True
return False
except Exception:
return False
def _should_hold_owner(owner_key: str, sender_email: str) -> bool:
try:
if not owner_key:
return False
events = ACCOUNT_EVENTS.get(owner_key)
if not events:
return False
sender_lc = (sender_email or '').strip().lower()
for ev in reversed(events):
ev_sender = (ev.get('sender', '') or '').strip().lower()
if sender_lc and ev_sender != sender_lc:
continue
relay_type = (ev.get('relayType') or '').lower()
if relay_type == 'local':
return True
if sender_lc:
return False
return False
except Exception:
return False
def _collect_events_for_report(owner_key: str, sender_email: str, window_secs: int):
"""Collect all events for owner within window (not filtered by sender).
The limit applies to the whole account, so we need all events to show why the limit
was exceeded, not just events from one specific sender address.
"""
try:
if not owner_key:
return []
evs_src = list(ACCOUNT_EVENTS.get(owner_key, deque()))
if not evs_src:
return []
cutoff_ts = time.time() - max(window_secs, 0)
# Collect ALL events for this owner within the window, not just one sender
# The limit count is per-account, so report should show all contributing events
events = [ev for ev in evs_src if ev.get('ts', 0) >= cutoff_ts]
events.sort(key=lambda e: e.get('ts', 0))
return events
except Exception:
return []
def _build_report_text(owner_key: str, sender_email: str, reason_text: str,
window_secs: int, limit_label: str, events_window: list) -> str:
lines = []
lines.append("Freeze Report\n")
lines.append(f"Reason: {reason_text}\n")
lines.append(f"Owner: {owner_key}\n")
lines.append(f"Sender: {sender_email}\n")
try:
types = [(ev.get('relayType') or 'unknown') for ev in events_window]
relay_summary = 'auth' if 'auth' in types else ('local' if 'local' in types else 'unknown')
lines.append(f"Relay: {relay_summary}\n")
except Exception:
pass
_append_client_summary(lines, events_window)
try:
srcs = []
seen = set()
for ev in events_window:
c = ev.get('client', {}) or {}
ch = c.get('h', '') or ''
cip = c.get('ip', '') or ''
label = f"{ch}[{cip}]" if cip or ch else ''
if label and label not in seen:
seen.add(label)
srcs.append(label)
if len(srcs) >= 5:
break
if srcs:
lines.append(f"Client sources: {', '.join(srcs)}\n")
except Exception:
pass
limit_text = limit_label or 'Limit: n/a'
lines.append(f"Window: last {window_secs}s; {limit_text}\n")
lines.append(f"Events counted in window: {len(events_window)}\n")
lines.append("\nEvents:\n")
if events_window:
for ev in events_window[-200:]:
ts_iso = datetime.utcfromtimestamp(ev.get('ts', 0)).isoformat()
rcpt = ev.get('recipient', '')
subj = ev.get('subject', '')
client = ev.get('client', {})
relay = ev.get('relay', {})
client_h, client_ip = _resolve_client_details(ev.get('qid', ''), client)
relay_ip = relay.get('ip', '')
relay_h = relay.get('h', '')
rtype = (ev.get('relayType') or '').lower()
relay_str = 'local' if rtype == 'local' else f"{relay_h}[{relay_ip}]"
lines.append(f"- {ts_iso} to={rcpt} subj=\"{subj}\" client={client_h}[{client_ip}] relay={relay_str} qid={ev.get('qid', '')}\n")
else:
lines.append("- No detailed events captured for this window.\n")
return ''.join(lines)
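# A rendered freeze report looks roughly like this (illustrative values):
#   Freeze Report
#   Reason: Exceeded 50 in 300s
#   Owner: owner1
#   Sender: info@example.com
#   Relay: local
#   Client sources: helo.name[203.0.113.5]
#   Window: last 300s; Limit: 50
#   Events counted in window: 51
#
#   Events:
#   - 2024-05-01T12:34:56 to=rcpt@example.org subj="Hello" client=helo.name[203.0.113.5] relay=local qid=1rABCD-000123-XY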
def _save_report_locally(owner_key: str, sender_email: str, reason_text: str,
report_text: str, sid):
"""Save freeze report locally. Reports are fetched on-demand by the app."""
REPORTS_DIR.mkdir(parents=True, exist_ok=True)
now_dt = datetime.utcnow()
now_str = now_dt.strftime('%Y%m%d-%H%M%S')
safe_sender = (sender_email or 'unknown').replace('@', '_')
safe_owner = (owner_key or 'unknown').replace('@', '_')
fname = f"{now_str}-{safe_owner}-{safe_sender}.txt"
fpath = REPORTS_DIR / fname
fpath.write_text(report_text)
print(f"[agent] Freeze report saved locally: {fname}")
def _generate_freeze_report(account_ref: str, owner_key: str, sender_email: str,
reason_text: str, window_secs: int, limit_label: str,
sid, fallback_events: Optional[list] = None):
if not FREEZE_REPORTS_ENABLED:
print("[agent] Freeze report generation disabled; skipping report")
return
try:
events_window = _collect_events_for_report(owner_key, sender_email, window_secs)
if not events_window and fallback_events:
events_window = list(fallback_events)
try:
acct_events = ACCOUNT_EVENTS.setdefault(owner_key, deque())
for ev in fallback_events:
acct_events.append(ev)
except Exception:
pass
report_text = _build_report_text(owner_key, sender_email, reason_text, window_secs, limit_label, events_window)
_save_report_locally(owner_key, sender_email, reason_text, report_text, sid)
except Exception as e:
print(f"[agent] Failed to write report: {e}")
def _parse_report_file(path: Path):
try:
text = path.read_text(encoding='utf-8')
except Exception as e:
print(f"[agent] Failed reading report {path.name}: {e}")
return None
reason_text = ""
owner_text = ""
sender_text = ""
try:
m_reason = re.search(r"^Reason:\s*(.+)$", text, re.MULTILINE)
if m_reason:
reason_text = m_reason.group(1).strip()
m_owner = re.search(r"^Owner:\s*(.+)$", text, re.MULTILINE)
if m_owner:
owner_text = m_owner.group(1).strip()
m_sender = re.search(r"^Sender:\s*(.+)$", text, re.MULTILINE)
if m_sender:
sender_text = m_sender.group(1).strip()
except Exception as e:
print(f"[agent] Failed parsing metadata in {path.name}: {e}")
try:
created_at = datetime.utcfromtimestamp(path.stat().st_mtime).isoformat()
except Exception:
created_at = datetime.utcnow().isoformat()
return {
"reportText": text,
"reason": reason_text,
"account": owner_text,
"senderEmail": sender_text,
"createdAt": created_at,
"reportKey": path.name,
}
def _extract_recipient_from_arrow_line(line):
try:
m = re.search(r"(?:=>|->)\s+([^\s]+)", line)
if m:
return m.group(1)
except Exception:
pass
return None
def _extract_sender_from_arrow_line(line):
try:
m = re.search(r"\bF=<([^>]+)>", line)
if m:
return m.group(1)
except Exception:
pass
return None
def _extract_account_ref_from_exim_line(line):
"""Return account_ref like sender(user)."""
try:
# Prefer explicit 'from <sender>'
m_from = re.search(r"from\s+<([^>\s]+@[^>\s]+)>", line)
sender_email = m_from.group(1) if m_from else None
if not sender_email:
# Fallback to token after '<='
m_after = re.search(r"<=\s*<?([^>\s]+@[^\s>]+)>?", line)
if m_after:
sender_email = m_after.group(1)
if not sender_email:
# Submission lines can also include F=<sender>
m_f = re.search(r"\bF=<([^>]+)>", line)
if m_f:
candidate = (m_f.group(1) or '').strip()
if candidate:
sender_email = candidate
# Auth user: A=login:USER or A=dovecot_login:USER or courier_login
m_auth = re.search(r"\bA=[A-Za-z0-9_-]+:([^\s]+)\b", line)
auth_user = m_auth.group(1) if m_auth else None
if not auth_user:
# Local deliveries note the owner as U=username
m_u = re.search(r"\bU=([^\s]+)\b", line)
if m_u:
auth_user = m_u.group(1)
if auth_user:
auth_user = auth_user.strip()
# Compose account ref
if sender_email and auth_user:
return f"{sender_email}({auth_user})"
if sender_email:
return sender_email
if auth_user:
return auth_user
return "Unknown"
except Exception:
return "Unknown"
def _update_counters_from_exim_line(line, thresholds, line_ts):
"""Parse Exim log line and update counters for tracking and per-account limits."""
try:
if 'diradm_user=' in line:
msgid = _extract_message_id_from_any_line(line) or _extract_message_id_from_exim_line(line)
if msgid:
try:
m_user = re.search(r"diradm_user=([^\s]+)", line)
owner_user = m_user.group(1).strip().lower() if m_user else ''
if owner_user:
MSG_ID_TO_OWNER[msgid] = owner_user
existing_ref = MSG_ID_TO_ACCOUNT.get(msgid)
email_part = None
if existing_ref:
email_part = existing_ref.split('(')[0] if '(' in existing_ref else existing_ref
email_part = (email_part or '').strip()
if not email_part:
m_addr = re.search(r"diradm_address=([^\s]+)", line)
if m_addr:
email_part = (m_addr.group(1) or '').strip()
if not email_part:
email_part = _extract_account_ref_from_exim_line(line)
if email_part:
MSG_ID_TO_ACCOUNT[msgid] = f"{email_part}({owner_user})"
m_dom = re.search(r"diradm_domain=([^\s]+)", line)
if m_dom:
dom = (m_dom.group(1) or '').strip().lower().rstrip('.')
if dom:
DOMAIN_OWNER_MAP[dom] = owner_user
print(f"[agent] DirectAdmin owner resolved for msgid={msgid}: owner={owner_user}")
except Exception as e:
print(f"[agent] DirectAdmin owner parse error: {e}")
if 'Sender identification' in line:
msgid = _extract_message_id_from_any_line(line)
if msgid:
try:
m_user = re.search(r"\bU=([^\s]+)", line)
if m_user:
owner_user = m_user.group(1).strip().lower()
if owner_user:
MSG_ID_TO_OWNER[msgid] = owner_user
existing_ref = MSG_ID_TO_ACCOUNT.get(msgid)
if existing_ref:
email_part = existing_ref.split('(')[0] if '(' in existing_ref else existing_ref
email_part = (email_part or '').strip()
if '(' not in existing_ref or not existing_ref.endswith(')') or owner_user not in existing_ref:
MSG_ID_TO_ACCOUNT[msgid] = f"{email_part}({owner_user})"
except Exception:
pass
return
        # Submission lines carry ' <= ' (message received into the queue);
        # everything else is handled below as a delivery or ignored.
        if ' <= ' not in line:
# Also track deliveries ('=>' for first, '->' for additional recipients)
if ' => ' in line or ' -> ' in line:
# Delivery line format: => recipient ...
msg_id = _extract_message_id_from_any_line(line)
if not msg_id:
return
# Only count deliveries for messages that originated locally or via auth relay
rtype = MSG_ID_TO_RELAY_TYPE.get(msg_id, 'unknown')
if rtype not in ('local', 'auth'):
return
# Check dedupe for this (msgid, recipient) pair
recipient = _extract_recipient_from_arrow_line(line)
sender = _extract_sender_from_arrow_line(line) or MSG_ID_TO_ACCOUNT.get(msg_id, '')
if not recipient:
return
dedupe_key = (msg_id, recipient)
if dedupe_key in SEEN_MSG_RCPT_SET:
return
SEEN_MSG_RCPT.append((dedupe_key, line_ts))
SEEN_MSG_RCPT_SET.add(dedupe_key)
# Build account_ref from cached data or line
account_ref = MSG_ID_TO_ACCOUNT.get(msg_id) or sender or 'Unknown'
subject = MSG_ID_TO_SUBJECT.get(msg_id, '')
# Record recipient with timestamp for this account
try:
rec_map = RECIPIENTS_BY_ACCOUNT.setdefault(account_ref, {})
rec_map[recipient] = line_ts
except Exception:
pass
# Update per-owner event log (for freeze reports) - one event per delivery
try:
owner = MSG_ID_TO_OWNER.get(msg_id) or _resolve_limit_account_key(account_ref)
client = MSG_ID_TO_CLIENT.get(msg_id, {})
relay = MSG_ID_TO_RELAY.get(msg_id, {})
relay_type = MSG_ID_TO_RELAY_TYPE.get(msg_id, 'unknown')
ev = {
'ts': line_ts,
'sender': account_ref.split('(')[0] if '(' in account_ref else _extract_email(account_ref) or account_ref,
'recipient': recipient,
'subject': subject,
'qid': msg_id,
'client': client,
'relay': relay,
'relayType': relay_type
}
dq = ACCOUNT_EVENTS.setdefault(owner, deque())
dq.append(ev)
# Limit retention
cutoff = line_ts - ACCOUNT_EVENTS_RETENTION_SECS
while dq and dq[0].get('ts', 0) < cutoff:
dq.popleft()
except Exception:
pass
# Count each delivery (recipient) as one email for limits
# Fetch account limits for inline enforcement
try:
limits = fetch_account_limits(None) # Use cached limits
lim_by_account = {}
for lim in limits:
if not lim.get('enabled', True):
continue
acc = (lim.get('account') or '').strip().lower()
max_emails = int(lim.get('maxEmails', 0))
per_seconds = int(lim.get('perSeconds', 60))
if acc and max_emails > 0 and per_seconds > 0:
lim_by_account[acc] = (max_emails, per_seconds)
print(f"[agent] Processing delivery: account_ref='{account_ref}', recipient='{recipient}', limits loaded={len(lim_by_account)}")
_record_event(account_ref, subject, thresholds, None, lim_by_account, event_ts=line_ts)
except Exception as e:
print(f"[agent] Error in delivery limit check: {e}")
_record_event(account_ref, subject, thresholds, None, None, event_ts=line_ts)
return
# Message submission line ('<=' indicates the message was received)
msg_id = _extract_message_id_from_exim_line(line)
if not msg_id or msg_id in SEEN_LOG_MSG_IDS_SET:
return
# Mark as seen
SEEN_LOG_MSG_IDS.append((msg_id, line_ts))
SEEN_LOG_MSG_IDS_SET.add(msg_id)
# Extract account reference
account_ref = _extract_account_ref_from_exim_line(line)
if not account_ref or account_ref == "Unknown":
return
# Extract subject from line (T="...")
subject = ''
try:
m_subj = re.search(r"\bT=\"([^\"]*)\"", line)
if m_subj:
subject = m_subj.group(1).strip()
except Exception:
pass
# Cache for later delivery lines
MSG_ID_TO_ACCOUNT[msg_id] = account_ref
MSG_ID_TO_SUBJECT[msg_id] = subject
# Parse client and relay info for freeze reports
try:
client_h, client_ip = _extract_client_host_ip_from_line(line)
if client_h or client_ip:
MSG_ID_TO_CLIENT[msg_id] = {'h': client_h, 'ip': client_ip}
# Relay type: P=local, P=esmtpa (auth), etc.
relay_type = 'unknown'
line_lc = line.lower()
if 'p=local' in line_lc:
relay_type = 'local'
            elif ' A=' in line or 'p=esmtpa' in line_lc or 'p=esmtpsa' in line_lc:
relay_type = 'auth'
elif 'p=esmtps' in line_lc or 'p=esmtp' in line_lc or 'p=spam-scanned' in line_lc:
relay_type = 'external'
MSG_ID_TO_RELAY_TYPE[msg_id] = relay_type
# Owner resolution
sender_email = account_ref.split('(')[0] if '(' in account_ref else _extract_email(account_ref) or account_ref
owner = _resolve_owner_from_email(sender_email) or _resolve_limit_account_key(account_ref)
MSG_ID_TO_OWNER[msg_id] = owner
except Exception:
pass
# Only count outgoing submissions (local or authenticated). Skip external/inbound.
if MSG_ID_TO_RELAY_TYPE.get(msg_id) not in ('local', 'auth'):
return
# Fetch account limits for inline enforcement
try:
limits = fetch_account_limits(None) # Use cached limits
lim_by_account = {}
for lim in limits:
if not lim.get('enabled', True):
continue
acc = (lim.get('account') or '').strip().lower()
max_emails = int(lim.get('maxEmails', 0))
per_seconds = int(lim.get('perSeconds', 60))
if acc and max_emails > 0 and per_seconds > 0:
lim_by_account[acc] = (max_emails, per_seconds)
_record_event(account_ref, subject, thresholds, None, lim_by_account, event_ts=line_ts)
except Exception:
_record_event(account_ref, subject, thresholds, None, None, event_ts=line_ts)
    except Exception:
        pass  # Swallow per-line parse errors so one bad line cannot stall processing
def _account_key_for_limits(account_ref: str) -> str:
try:
if '(' in account_ref and account_ref.endswith(')'):
# sender(user)
inside = account_ref[account_ref.rfind('(')+1:-1]
if inside:
return inside.lower()
# else maybe it's just an email or username; prefer local-part for email
m = re.match(r"([^@]+)@", account_ref)
if m:
return m.group(1).lower()
return account_ref.strip().lower()
except Exception:
return account_ref.strip().lower()
def _prune_deque(dq: deque, cutoff_ts: float):
while dq and dq[0] < cutoff_ts:
dq.popleft()
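# Counter deques hold event timestamps in arrival order, so expiring a sliding
# window is just popping from the left until the head falls inside the window;
# callers prune immediately before counting or appending to keep windows exact.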
def _reload_exim():
try:
# Prefer systemctl, fallback to service
res = subprocess.run(['systemctl', 'reload', 'exim'], capture_output=True, text=True, timeout=5)
if res.returncode != 0:
subprocess.run(['service', 'exim', 'reload'], capture_output=True, text=True, timeout=5)
except Exception as e:
print(f"[agent] Exim reload failed: {e}")
def _ensure_freeze_list():
try:
if not FREEZE_LIST_PATH.exists():
# Create file with safe perms
FREEZE_LIST_PATH.parent.mkdir(parents=True, exist_ok=True)
FREEZE_LIST_PATH.write_text("")
FREEZE_LIST_PATH.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
except Exception as e:
print(f"[agent] Could not ensure freeze list: {e}")
def _ensure_virtual_files():
try:
VIRTUAL_DIR.mkdir(parents=True, exist_ok=True)
if not OSM_HOLD_FILE.exists():
OSM_HOLD_FILE.write_text("")
OSM_HOLD_FILE.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
if not OSM_DISABLE_FILE.exists():
OSM_DISABLE_FILE.write_text("")
OSM_DISABLE_FILE.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
except Exception as e:
print(f"[agent] Could not ensure /etc/virtual files: {e}")
def add_sender_to_freeze_list(sender_email: str, reason: str = None):
"""Legacy compatibility wrapper. OSM now relies on /etc/virtual/osm_hold only."""
try:
if not sender_email:
return
print(f"[agent] (notice) Skipping legacy freeze list update for {sender_email}")
except Exception:
pass
def remove_sender_from_freeze_list(sender_email: str):
"""Legacy compatibility wrapper. Nothing to remove from legacy list anymore."""
try:
sender_email = (sender_email or '').strip()
if sender_email:
print(f"[agent] (notice) Legacy freeze list disabled; nothing to remove for {sender_email}")
except Exception:
pass
return False
def add_user_to_hold_list(username, sender_email=None, reason: str = None):
try:
username = (username or '').strip()
email_full = (sender_email or '').strip()
if not username and not email_full:
return
_ensure_virtual_files()
existing = set()
try:
for line in OSM_HOLD_FILE.read_text().splitlines():
s = (line or '').strip()
if s:
existing.add(s)
except Exception:
pass
to_add = []
if username and username not in existing:
to_add.append(username)
if email_full and email_full not in existing:
to_add.append(email_full)
if to_add:
with OSM_HOLD_FILE.open('a') as f:
for item in to_add:
f.write(item + "\n")
_reload_exim()
print(f"[agent] Added to OSM hold list: {', '.join(to_add)}")
# Mark local hold/freeze update time
try:
global LAST_LOCAL_HOLD_UPDATE_TS
LAST_LOCAL_HOLD_UPDATE_TS = time.time()
except Exception:
pass
# Notify server immediately so reconcile will retain this entry
notified = False
if BASE_URL and TOKEN and CURRENT_SERVER_ID is not None:
try:
session.post(
f"{BASE_URL}/api/osm/agent/freeze",
json={
"serverId": str(CURRENT_SERVER_ID),
"senderEmail": email_full,
"account": username,
"reason": (reason or "locally added by agent")
},
headers=DEFAULT_HEADERS,
timeout=5,
)
notified = True
except Exception as e:
print(f"[agent] Failed to notify server freeze (hold): {e}")
if not notified:
try:
_queue_action({
"type": "freeze",
"senderEmail": email_full or username,
"account": username or "",
"reason": reason or "locally added by agent",
"timestamp": time.time(),
})
except Exception:
pass
except Exception as e:
print(f"[agent] Error updating OSM hold list: {e}")
def remove_user_from_hold_list(username=None, sender_email=None):
try:
targets = set()
if username:
targets.add(username.strip().lower())
if sender_email:
targets.add(sender_email.strip().lower())
if not targets:
return False
_ensure_virtual_files()
changed = False
try:
lines = []
for line in OSM_HOLD_FILE.read_text().splitlines():
entry = (line or '').strip()
if entry and entry.lower() not in targets:
lines.append(entry)
else:
changed = True
if changed:
OSM_HOLD_FILE.write_text("\n".join(lines) + ("\n" if lines else ""))
_reload_exim()
try:
global LAST_LOCAL_HOLD_UPDATE_TS
LAST_LOCAL_HOLD_UPDATE_TS = time.time()
except Exception:
pass
except FileNotFoundError:
changed = False
return changed
except Exception as e:
print(f"[agent] Error removing from OSM hold list: {e}")
return False
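# /etc/virtual/osm_hold holds one entry per line, either a hosting username or
# a full sender address (illustrative):
#   owner1
#   info@example.com
# The exim configuration on the box is expected to consult this file; the agent
# only edits it and asks exim to reload.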
def _maybe_trigger_freeze(account_ref, threshold, dq, sid=None):
try:
key = (account_ref, threshold["name"], threshold["seconds"])
events_required = int(threshold["events"])
if len(dq) >= events_required:
last = LAST_TRIGGERED.get(key)
now = time.time()
# Avoid re-triggering too often: only once per window
if not last or (now - last) >= threshold["seconds"]:
                sender_email = account_ref.split('(')[0] if '(' in account_ref else (_extract_email(account_ref) or account_ref)
# Derive username for OSM hold list
username = extract_username_from_account_ref(account_ref)
owner_key = (_resolve_owner_from_email(sender_email) or username or '').strip()
hold_owner = _should_hold_owner(owner_key, sender_email)
username_for_hold = owner_key if hold_owner else ""
# Local relay (hold_owner==True) → freeze account only; otherwise freeze email only
add_user_to_hold_list(
username_for_hold,
None if hold_owner else sender_email,
reason=f"Exceeded {threshold['events']} in {threshold['seconds']}s"
)
                # Legacy sender freeze list call is now a no-op wrapper, kept for compatibility
add_sender_to_freeze_list(sender_email, reason=f"Exceeded {threshold['events']} in {threshold['seconds']}s")
LAST_TRIGGERED[key] = now
reason_text = f"Exceeded {threshold['events']} in {threshold['seconds']}s"
limit_label = f"Threshold: {threshold['events']} events"
_generate_freeze_report(account_ref, owner_key or username, sender_email, reason_text, int(threshold["seconds"]), limit_label, sid)
print(f"[agent] Threshold exceeded for {account_ref}: {len(dq)}/{events_required} in {threshold['seconds']}s (freeze)")
except Exception as e:
print(f"[agent] Freeze decision error: {e}")
def _record_event(account_ref, subject, thresholds, sid=None, lim_by_account=None, event_ts=None):
event_time = event_ts if event_ts is not None else time.time()
# Remember last subject (best-effort)
try:
sub = (subject or '').strip()
if sub:
# Collapse newlines/tabs and trim
sub = re.sub(r"\s+", " ", sub)[:200]
LAST_SUBJECT_BY_ACCOUNT[account_ref] = sub
except Exception:
pass
owner_key_for_limits = _resolve_limit_account_key(account_ref)
has_account_limit = bool(lim_by_account and owner_key_for_limits in lim_by_account)
has_default_limit = bool(DEFAULT_ENABLED and DEFAULT_MAX_EMAILS > 0 and DEFAULT_PER_SECONDS > 0)
for t in thresholds:
if (t.get("type") or "logline") != "logline":
continue
window = int(t["seconds"])
key = (account_ref, t["name"], window)
dq = LOG_COUNTERS[key]
_prune_deque(dq, event_time - window)
dq.append(event_time)
LAST_COUNTER_SEEN_TS[key] = event_time
if not has_account_limit and not has_default_limit:
_maybe_trigger_freeze(account_ref, t, dq, sid)
    # Apply per-account limits configured in the Manage UI
try:
if lim_by_account is not None:
owner_key = owner_key_for_limits
# Debug: print resolved owner and available limits
if lim_by_account:
print(f"[agent] Resolved owner_key='{owner_key}' from account_ref='{account_ref}', available limits={list(lim_by_account.keys())}")
# 1) Account-specific limit
if owner_key in lim_by_account:
max_emails, per_seconds = lim_by_account[owner_key]
dq = ACCOUNT_LIMIT_WINDOWS[owner_key]
_prune_deque(dq, event_time - per_seconds)
dq.append(event_time)
# Also mirror this into tracking DB counters under a standard trigger name
trig_key = (account_ref, f"limit_{per_seconds}", int(per_seconds))
tdq = LOG_COUNTERS[trig_key]
_prune_deque(tdq, event_time - per_seconds)
tdq.append(event_time)
if len(dq) > max_emails:
# Deduplicate: only trigger once per window
last_triggered = LAST_LIMIT_TRIGGERED.get(owner_key, 0)
already_triggered = (event_time - last_triggered) < per_seconds
if not already_triggered:
# Over limit: freeze sender
sender_email = (account_ref.split('(')[0] if '(' in account_ref else _extract_email(account_ref)) or ''
reason_text = f"Exceeded {max_emails} in {per_seconds}s"
log_evidence = _has_recent_log_events(owner_key, sender_email, per_seconds)
if not log_evidence:
print(f"[agent] Queue limit exceeded for {owner_key} but no recent exim log events; skipping account freeze.")
else:
hold_owner = _should_hold_owner(owner_key, sender_email)
username_for_hold = owner_key if hold_owner else ""
# Local relay → freeze account only; otherwise freeze email only
add_user_to_hold_list(
username_for_hold,
None if hold_owner else sender_email,
reason=reason_text
)
add_sender_to_freeze_list(sender_email, reason=reason_text)
LAST_LIMIT_TRIGGERED[owner_key] = event_time
limit_label = f"Limit: {max_emails}"
_generate_freeze_report(account_ref, owner_key, sender_email, reason_text, per_seconds, limit_label, sid)
# 2) Server default limit if no account-specific
elif DEFAULT_ENABLED and DEFAULT_MAX_EMAILS > 0 and DEFAULT_PER_SECONDS > 0:
per_seconds = DEFAULT_PER_SECONDS
max_emails = DEFAULT_MAX_EMAILS
dq = ACCOUNT_LIMIT_WINDOWS[f"default::{owner_key}"]
_prune_deque(dq, event_time - per_seconds)
dq.append(event_time)
# Mirror to tracking DB under default window
trig_key = (account_ref, f"limit_{per_seconds}", int(per_seconds))
tdq = LOG_COUNTERS[trig_key]
_prune_deque(tdq, event_time - per_seconds)
tdq.append(event_time)
if len(dq) > max_emails:
# Deduplicate: only trigger once per window (use default:: prefix for key)
dedup_key = f"default::{owner_key}"
last_triggered = LAST_LIMIT_TRIGGERED.get(dedup_key, 0)
already_triggered = (event_time - last_triggered) < per_seconds
if not already_triggered:
sender_email = (account_ref.split('(')[0] if '(' in account_ref else _extract_email(account_ref)) or ''
username = extract_username_from_account_ref(account_ref)
reason_text = f"Exceeded default {max_emails} in {per_seconds}s"
log_evidence = _has_recent_log_events(owner_key, sender_email, per_seconds)
if not log_evidence:
print(f"[agent] Default limit exceeded for {owner_key} but no recent exim log events; skipping account freeze.")
else:
hold_owner = _should_hold_owner(owner_key, sender_email)
username_for_hold = owner_key if hold_owner else ""
# Local relay → freeze account only; otherwise freeze email only
add_user_to_hold_list(
username_for_hold,
None if hold_owner else sender_email,
reason=reason_text
)
add_sender_to_freeze_list(sender_email, reason=reason_text)
LAST_LIMIT_TRIGGERED[dedup_key] = event_time
limit_label = f"Default Limit: {max_emails}"
_generate_freeze_report(account_ref, owner_key, sender_email, reason_text, per_seconds, limit_label, sid)
except Exception as e:
print(f"[agent] Error applying per-account limits: {e}")
def write_tracking_db(thresholds):
try:
lines = []
now_ts = time.time()
valid_windows = {(t["name"], int(t["seconds"])) for t in thresholds if (t.get("type") or "logline") == "logline"}
# Build a set of keys to write: active counters and recently seen counters
keys_to_write = set(LOG_COUNTERS.keys())
for k, ts in list(LAST_COUNTER_SEEN_TS.items()):
if now_ts - ts <= COUNTER_RETENTION_SECS:
keys_to_write.add(k)
for (account_ref, trig_name, window) in sorted(keys_to_write):
dq = LOG_COUNTERS.get((account_ref, trig_name, window), deque())
if not dq and (trig_name, window) not in valid_windows and not trig_name.startswith("limit_"):
continue
# prune before counting
_prune_deque(dq, now_ts - window)
count = len(dq)
# Skip if no count and outside retention
last_ts = LAST_COUNTER_SEEN_TS.get((account_ref, trig_name, window), 0)
if count == 0 and (now_ts - last_ts) > COUNTER_RETENTION_SECS:
continue
subject = (LAST_SUBJECT_BY_ACCOUNT.get(account_ref) or '').strip()
subject = re.sub(r"\s+", " ", subject)[:200] if subject else ''
# Compose recipients string
rcpts = []
try:
rec_map = RECIPIENTS_BY_ACCOUNT.get(account_ref) or {}
# prune again relative to now
for r, t in list(rec_map.items()):
if now_ts - t > RECIPIENT_RETENTION_SECS:
rec_map.pop(r, None)
rcpts = sorted(rec_map.keys())[:50]
except Exception:
rcpts = []
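# Each counter is persisted as a three-line record; an illustrative example
# (values are made up) of what the appends below produce:
#   7event percent
#   60seconds
#   alice->limit_60->logline->subject->Hello rcpt->a@example.com,b@example.com
# The quirky "event percent"/"seconds" wording is kept as-is for
# compatibility with existing readers of the tracking DB.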
lines.append(f"{count}event percent")
lines.append(f"{window}seconds")
# Append the recipients after the subject as a comma-separated "rcpt->" field
rcpt_part = f"rcpt->{','.join(rcpts)}" if rcpts else "rcpt->"
lines.append(f"{account_ref}->{trig_name}->logline->subject->{subject} {rcpt_part}")
TRACKING_DB_PATH.write_text("\n".join(lines) + ("\n" if lines else ""))
except Exception as e:
print(f"[agent] Error writing tracking DB: {e}")
def _get_subject_for_message_id(msg_id: str) -> str:
"""Fetch Subject header for a given exim message id using exim -Mvh."""
try:
if not msg_id:
return ""
res = subprocess.run(['exim', '-Mvh', msg_id], capture_output=True, text=True, timeout=5)
if res.returncode != 0:
return ""
headers = res.stdout
m = re.search(r"^Subject:\s*(.+)$", headers, re.MULTILINE)
if m:
return m.group(1).strip()
except Exception:
pass
return ""
def _extract_subject_from_headers_text(headers: str) -> str:
try:
if not headers:
return ""
m = re.search(r"^Subject:\s*(.+)$", headers, re.MULTILINE)
if m:
return m.group(1).strip()
except Exception:
pass
return ""
def get_email_headers(msg_id):
"""Fetch email headers using exim -Mvh"""
try:
result = subprocess.run(['exim', '-Mvh', msg_id], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
headers = result.stdout.strip()
return headers
else:
return f"Error fetching headers: {result.stderr.strip()}"
except Exception as e:
return f"Error fetching headers: {str(e)}"
def get_email_body(msg_id):
"""Fetch email body using exim -Mvb"""
try:
result = subprocess.run(['exim', '-Mvb', msg_id], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
body = result.stdout.strip()
return body
else:
return f"Error fetching body: {result.stderr.strip()}"
except Exception as e:
return f"Error fetching body: {str(e)}"
def _process_pending_actions():
"""Replay locally queued CLI actions to the server once it is reachable."""
sid = _get_current_server_id()
if not BASE_URL or not TOKEN or not sid:
return
actions = _read_actions_queue()
if not actions:
return
remaining = []
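# Each queued action mirrors a CLI command recorded while offline, e.g.
# (illustrative values):
#   {"type": "freeze", "senderEmail": "spam@example.com", "account": "alice",
#    "reason": "CLI freeze", "timestamp": 1700000000.0}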
for action in actions:
typ = action.get('type')
try:
if typ == 'freeze':
payload = {
"serverId": sid,
"senderEmail": action.get('senderEmail', ''),
"account": action.get('account') or '',
"reason": action.get('reason') or 'CLI freeze'
}
session.post(f"{BASE_URL}/api/osm/agent/freeze", json=payload, headers=DEFAULT_HEADERS, timeout=10)
elif typ == 'unfreeze':
payload = {
"serverId": sid,
"senderEmail": action.get('senderEmail', ''),
"account": action.get('account') or ''
}
session.post(f"{BASE_URL}/api/osm/agent/unfreeze", json=payload, headers=DEFAULT_HEADERS, timeout=10)
elif typ == 'set_limit':
payload = {
"serverId": sid,
"account": action.get('account', ''),
"maxEmails": int(action.get('maxEmails', 0)),
"perSeconds": int(action.get('perSeconds', 0)),
"enabled": bool(action.get('enabled', True))
}
session.post(f"{BASE_URL}/api/osm/agent/limit", json=payload, headers=DEFAULT_HEADERS, timeout=10)
elif typ == 'remove_limit':
payload = {
"serverId": sid,
"account": action.get('account', '')
}
session.post(f"{BASE_URL}/api/osm/agent/limit-remove", json=payload, headers=DEFAULT_HEADERS, timeout=10)
elif typ == 'set_default_limit':
payload = {
"serverId": sid,
"maxEmails": int(action.get('maxEmails', 0)),
"perSeconds": int(action.get('perSeconds', 0)),
"enabled": bool(action.get('enabled', True))
}
session.post(f"{BASE_URL}/api/osm/agent/limit-default", json=payload, headers=DEFAULT_HEADERS, timeout=10)
else:
continue
except Exception as e:
print(f"[agent] Failed to sync action {typ}: {e}")
remaining.append(action)
continue
_write_actions_queue(remaining)
def _append_client_summary(lines, events_window):
try:
ips = []
hosts = []
seen_ips = set()
seen_hosts = set()
for ev in events_window:
client = ev.get('client', {}) or {}
host, ip = _resolve_client_details(ev.get('qid') or '', client)
if ip and ip not in seen_ips:
seen_ips.add(ip)
ips.append(ip)
if host and host not in seen_hosts:
seen_hosts.add(host)
hosts.append(host)
if ips:
lines.append(f"Client IPs: {', '.join(ips)}\n")
if hosts:
lines.append(f"Client Hosts: {', '.join(hosts)}\n")
except Exception:
pass
def get_exim_queue_stats(stats):
"""Populate queue statistics using exim commands."""
try:
result = subprocess.run(['exim', '-bp'], capture_output=True, text=True, timeout=10)
if result.returncode != 0:
return stats
lines = result.stdout.splitlines()
details = []
current = None
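# `exim -bp` output alternates header lines and indented recipient lines,
# roughly (illustrative id/addresses):
#   25m  2.9K 1hKj3x-0005aB-Qm <sender@example.com>
#            recipient@example.com
# The loop below classifies each line as a header or a recipient accordingly.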
for raw in lines:
line = (raw or '').rstrip()
if not line:
continue
stripped = line.lstrip()
if not stripped:
continue
if stripped.startswith('**') or stripped.startswith('=='):
continue
is_header_line = False
try:
parts = stripped.split()
if len(parts) < 3:
is_header_line = False
else:
age_token = parts[0]
second_token = parts[1] if len(parts) > 1 else ''
third_token = parts[2] if len(parts) > 2 else ''
# very loose check: age token contains a digit and the size token is numeric-ish (allowing K/M/G suffixes)
size_token = second_token.upper()
if any(ch.isdigit() for ch in age_token) and size_token.replace('.', '').replace('K', '').replace('M', '').replace('G', '').isdigit():
is_header_line = True
except Exception:
is_header_line = False
if is_header_line:
age = parts[0]
size = parts[1]
queue_id = parts[2]
sender = ""
if len(parts) >= 4:
sender = parts[3]
if sender == "<=" and len(parts) >= 5:
sender = parts[4]
current = {
"queueId": queue_id,
"messageId": queue_id,
"age": age,
"size": size,
"sender": sender,
"status": stripped,
"recipients": [],
"subject": "",
"headers": "",
"body": "",
"ownerAccount": "",
}
details.append(current)
try:
headers = get_email_headers(queue_id)
if headers and not headers.startswith("Error fetching"):
current["headers"] = headers
current["subject"] = _extract_subject_from_headers_text(headers) or current["subject"]
except Exception:
pass
try:
subj_inline = _get_subject_for_message_id(queue_id)
if subj_inline:
current["subject"] = subj_inline
except Exception:
pass
continue
if current is not None:
recipient = stripped
if recipient:
current["recipients"].append(recipient)
# Attach recipient previews and short body excerpts for at most the first 50 messages to bound per-cycle work
for entry in details[:50]:
try:
if not entry.get("body"):
body = get_email_body(entry["queueId"])
if body and not body.startswith("Error fetching"):
# Avoid massive payloads
entry["body"] = body[:10000]
except Exception:
pass
if entry.get("recipients"):
entry["recipient"] = ", ".join(entry["recipients"])
else:
entry["recipient"] = ""
if entry.get("headers"):
owner = extract_account_from_detail({
"headers": entry["headers"],
"sender": entry.get("sender", ""),
"recipient": entry.get("recipient", "")
})
entry["ownerAccount"] = owner
stats["queueDetails"] = details
stats["totalMessages"] = len(details)
stats["activeMessages"] = len(details)
return stats
except Exception as e:
print(f"[agent] Error getting exim queue stats: {e}")
return stats
def get_postfix_queue_stats(stats):
"""Get queue statistics using postfix commands"""
try:
# Get queue summary using postqueue
result = subprocess.run(['postqueue', '-p'], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
output = result.stdout
# Parse postqueue output
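# Typical `postqueue -p` output (illustrative queue id and addresses):
#   -Queue ID-  --Size-- ----Arrival Time---- -Sender/Recipient-------
#   B68A3B2C4D*     1352 Tue Feb 10 12:34:56  sender@example.com
#                                             recipient@example.com
# A trailing '*' on the queue id marks an active message and '!' a held one.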
lines = output.strip().split('\n')
for line in lines[1:]:  # Skip the "-Queue ID-" header line
# Header rows start at column 0; indented rows list recipients, and the
# trailing "-- N Kbytes in M Requests." summary starts with '-'
if line.strip() and not line.startswith('-') and not line.startswith(' '):
parts = line.split()
if len(parts) >= 7:
raw_id = parts[0]
# postqueue -p appends '*' for active messages and '!' for held ones
queue_id = raw_id.rstrip('*!')
size = parts[1]
time_str = ' '.join(parts[2:6])
sender = parts[6]
stats["queueDetails"].append({
"queueId": queue_id,
"size": size,
"recipients": "",
"time": time_str,
"sender": sender,
"ownerAccount": (_resolve_owner_from_email(_extract_email(sender) or '') or '')
})
# Categorize by the queue flag rather than the id text
if raw_id.endswith('*'):
stats["activeMessages"] += 1
elif raw_id.endswith('!'):
stats["holdMessages"] += 1
else:
stats["deferredMessages"] += 1
stats["totalMessages"] += 1
except Exception as e:
print(f"[agent] Error getting postfix queue stats: {e}")
return stats
def get_mail_queue_stats(queue_path: str):
"""Get mail queue statistics using appropriate mail server commands"""
stats = {
"totalMessages": 0,
"activeMessages": 0,
"deferredMessages": 0,
"holdMessages": 0,
"incomingMessages": 0,
"outgoingMessages": 0,
"queueDetails": [],
"spamScores": [],
"blockedDomains": []
}
mail_server = detect_mail_server()
try:
if mail_server == 'exim':
return get_exim_queue_stats(stats)
elif mail_server == 'postfix':
return get_postfix_queue_stats(stats)
else:
# Fallback for an unknown MTA: try the postfix parser, which degrades
# gracefully (stats stay zeroed when postqueue is unavailable)
return get_postfix_queue_stats(stats)
except Exception:
return stats
def get_spam_scores(queue_path: str, mail_server: str):
"""Extract spam scores from mail headers"""
spam_scores = []
try:
if mail_server == 'exim':
spam_scores = get_exim_spam_scores()
elif mail_server == 'postfix':
spam_scores = get_postfix_spam_scores(queue_path)
else:
# Try both methods
spam_scores = get_exim_spam_scores()
if not spam_scores:
spam_scores = get_postfix_spam_scores(queue_path)
except Exception as e:
print(f"[agent] Error getting spam scores: {e}")
return spam_scores
def get_exim_spam_scores():
"""Get spam scores from exim queue using exim -Mvh"""
spam_scores = []
try:
# Get the list of queued messages
result = subprocess.run(['exim', '-bp'], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
msg_ids = []
for line in lines:
if line.strip() and not line.startswith('-') and not line.startswith('Mail queue'):
parts = line.split()
# Header lines read "25m  2.9K <id> <sender>"; the id is the third
# token (indented recipient-only lines have a single token)
if len(parts) >= 3 and any(ch.isdigit() for ch in parts[0]):
msg_ids.append(parts[2])
# Get headers for first few messages to check for spam scores
for msg_id in msg_ids[:10]: # Limit to 10 messages
try:
header_result = subprocess.run(['sudo', 'exim', '-Mvh', msg_id], capture_output=True, text=True, timeout=5)
if header_result.returncode == 0:
headers = header_result.stdout
for line in headers.split('\n'):
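# Typical matching headers (illustrative): "X-Spam-Score: 7.3" or
# "X-Spam-Status: Yes, score=7.3 required=5.0"; the first number found is used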
if 'X-Spam-Score:' in line or 'X-Spam-Level:' in line or 'X-Spam-Status:' in line:
score_match = re.search(r'(-?\d+\.?\d*)', line)  # scores may be negative
if score_match:
spam_scores.append({
"messageId": msg_id,
"score": float(score_match.group(1)),
"header": line.strip()
})
except Exception:
continue
except Exception as e:
print(f"[agent] Error getting exim spam scores: {e}")
return spam_scores
def get_postfix_spam_scores(queue_path: str):
"""Get spam scores from postfix queue files"""
spam_scores = []
try:
# Look for recent messages in the queue
queue_dirs = ['active', 'deferred', 'incoming']
for queue_dir in queue_dirs:
queue_dir_path = Path(queue_path) / queue_dir
if queue_dir_path.exists():
for msg_file in queue_dir_path.iterdir():
if msg_file.is_file():
try:
# Read first few lines to get headers
with open(msg_file, 'r', encoding='utf-8', errors='ignore') as f:
headers = []
for i, line in enumerate(f):
if i > 50: # Only read first 50 lines
break
headers.append(line.strip())
if line.strip() == '': # End of headers
break
# Look for spam score headers
for header in headers:
if 'X-Spam-Score:' in header or 'X-Spam-Level:' in header:
score_match = re.search(r'(-?\d+\.?\d*)', header)  # scores may be negative
if score_match:
spam_scores.append({
"messageId": msg_file.name,
"score": float(score_match.group(1)),
"header": header
})
except Exception:
continue
if len(spam_scores) >= 20: # Limit to 20 recent scores
break
except Exception as e:
print(f"[agent] Error getting postfix spam scores: {e}")
return spam_scores
def check_blocked_domains(mail_server: str):
"""Check for blocked domains in recent mail"""
blocked_domains = []
try:
if mail_server == 'exim':
blocked_domains = check_exim_blocked_domains()
elif mail_server == 'postfix':
blocked_domains = check_postfix_blocked_domains()
else:
# Try both methods
blocked_domains = check_exim_blocked_domains()
blocked_domains.extend(check_postfix_blocked_domains())
except Exception as e:
print(f"[agent] Error checking blocked domains: {e}")
return blocked_domains
def check_exim_blocked_domains():
"""Check exim logs for blocked domains"""
blocked_domains = []
try:
# Check exim logs for blocked domains
log_files = [
'/var/log/exim/mainlog',
'/var/log/exim/paniclog',
'/var/log/mail.log',
'/var/log/maillog'
]
for log_file in log_files:
if os.path.exists(log_file):
try:
# Get recent log entries
result = subprocess.run(
['tail', '-100', log_file],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
for line in result.stdout.split('\n'):
if any(keyword in line.lower() for keyword in ['blocked', 'reject', 'denied', 'blacklisted']):
# Extract domain from log line
domain_match = re.search(r'@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})', line)
if domain_match:
domain = domain_match.group(1)
if domain not in [bd['domain'] for bd in blocked_domains]:
blocked_domains.append({
"domain": domain,
"timestamp": datetime.now().isoformat(),
"reason": line.strip()
})
except Exception:
continue
except Exception as e:
print(f"[agent] Error checking exim blocked domains: {e}")
return blocked_domains
def check_postfix_blocked_domains():
"""Check postfix logs for blocked domains"""
blocked_domains = []
try:
# Check postfix logs for blocked domains
log_files = [
'/var/log/mail.log',
'/var/log/postfix.log',
'/var/log/maillog'
]
for log_file in log_files:
if os.path.exists(log_file):
try:
# Get recent log entries
result = subprocess.run(
['tail', '-100', log_file],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
for line in result.stdout.split('\n'):
if 'blocked' in line.lower() or 'reject' in line.lower():
# Extract domain from log line
domain_match = re.search(r'@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})', line)
if domain_match:
domain = domain_match.group(1)
if domain not in [bd['domain'] for bd in blocked_domains]:
blocked_domains.append({
"domain": domain,
"timestamp": datetime.now().isoformat(),
"reason": line.strip()
})
except Exception:
continue
except Exception as e:
print(f"[agent] Error checking postfix blocked domains: {e}")
return blocked_domains
def detect_mail_server():
"""Detect which mail server (Exim/Postfix) is available on this host."""
try:
# Prefer explicit binaries first
if shutil.which('exim') or Path('/usr/sbin/exim').exists():
return 'exim'
if shutil.which('postqueue') or shutil.which('postfix'):
return 'postfix'
# Fallback heuristics: check for config directories
if Path('/etc/exim').exists() or Path('/var/log/exim').exists():
return 'exim'
if Path('/etc/postfix').exists():
return 'postfix'
except Exception:
pass
return 'unknown'
def detect_control_panel():
"""Detect which control panel is installed (cpanel, directadmin, or unknown)."""
try:
if os.path.isdir('/usr/local/cpanel'):
return 'cpanel'
if os.path.isdir('/usr/local/directadmin'):
return 'directadmin'
# Try binaries
try:
res = subprocess.run(['which', 'whmapi1'], capture_output=True, text=True, timeout=5)
if res.returncode == 0:
return 'cpanel'
except Exception:
pass
return 'unknown'
except Exception:
return 'unknown'
def list_cpanel_accounts():
"""List all cPanel accounts."""
accounts = []
try:
# Try whmapi1 listaccts
res = subprocess.run(['whmapi1', 'listaccts'], capture_output=True, text=True, timeout=30)
if res.returncode == 0:
try:
data = json.loads(res.stdout)
accts = data.get('data', {}).get('acct', [])
for a in accts:
user = a.get('user', '')
domain = a.get('domain', '')
if user:
accounts.append({'username': user, 'domain': domain})
except Exception:
pass
# Fallback: parse /etc/trueuserdomains
if not accounts:
p = Path('/etc/trueuserdomains')
if p.exists():
for line in p.read_text(encoding='utf-8', errors='ignore').splitlines():
if ':' in line:
parts = line.split(':', 1)
if len(parts) == 2:
domain = parts[0].strip()
user = parts[1].strip()
if user and domain:
accounts.append({'username': user, 'domain': domain})
except Exception as e:
print(f"[agent] Error listing cPanel accounts: {e}")
return accounts
def list_directadmin_accounts():
"""List all DirectAdmin accounts."""
accounts = []
try:
users_dir = Path('/usr/local/directadmin/data/users')
if users_dir.exists() and users_dir.is_dir():
for user_dir in users_dir.iterdir():
if user_dir.is_dir():
username = user_dir.name
# Try to get primary domain
domain = ''
try:
domain_file = user_dir / 'domains.list'
if domain_file.exists():
domains = domain_file.read_text(encoding='utf-8', errors='ignore').splitlines()
if domains:
domain = domains[0].strip()
except Exception:
pass
accounts.append({'username': username, 'domain': domain})
except Exception as e:
print(f"[agent] Error listing DirectAdmin accounts: {e}")
return accounts
def send_queue_snapshot(sid: int, stats: dict):
"""Send queue snapshot to server"""
if not QUEUE_INGEST_ENABLED:
print(f"[agent] Queue ingestion disabled; not sending snapshot")
return
try:
# Debug: print what we're sending
print(f"[agent] Sending snapshot with {len(stats['queueDetails'])} queue details")
if stats["queueDetails"]:
first_detail = stats["queueDetails"][0]
print(f"[agent] First detail: queueId={first_detail.get('queueId')}, headers_len={len(first_detail.get('headers', ''))}, body_len={len(first_detail.get('body', ''))}")
# Test JSON serialization first
try:
queue_details_json = json.dumps(stats["queueDetails"])
print(f"[agent] JSON serialization successful, size: {len(queue_details_json)} chars")
except Exception as json_error:
print(f"[agent] JSON serialization error: {json_error}")
# Try with limited data
limited_details = []
for detail in stats["queueDetails"][:5]: # Only first 5 messages
limited_detail = {
"queueId": detail.get("queueId", ""),
"size": detail.get("size", ""),
"recipients": detail.get("recipients", ""),
"time": detail.get("time", ""),
"sender": detail.get("sender", ""),
"recipient": detail.get("recipient", ""),
"headers": detail.get("headers", "")[:1000] + "..." if len(detail.get("headers", "")) > 1000 else detail.get("headers", ""),
"body": detail.get("body", "")[:1000] + "..." if len(detail.get("body", "")) > 1000 else detail.get("body", "")
}
limited_details.append(limited_detail)
queue_details_json = json.dumps(limited_details)
print(f"[agent] Using limited data, size: {len(queue_details_json)} chars")
payload = {
"serverId": str(sid),
"totalMessages": stats["totalMessages"],
"activeMessages": stats["activeMessages"],
"deferredMessages": stats["deferredMessages"],
"holdMessages": stats["holdMessages"],
"incomingMessages": stats["incomingMessages"],
"outgoingMessages": stats["outgoingMessages"],
"queueDetailsJson": queue_details_json,
"spamScoreJson": json.dumps(stats["spamScores"]),
"blockedDomainsJson": json.dumps(stats["blockedDomains"])
}
print(f"[agent] Sending payload with total size: {len(json.dumps(payload))} chars")
resp = session.post(f"{BASE_URL}/api/osm/agent/queue-snapshot", json=payload, headers=DEFAULT_HEADERS, timeout=30)
resp.raise_for_status()
print(f"[agent] Queue snapshot sent successfully")
except Exception as e:
print(f"[agent] Error sending queue snapshot: {e}")
import traceback
print(f"[agent] Traceback: {traceback.format_exc()}")
def send_accounts_snapshot(sid: int):
"""Detect panel and send accounts list to server."""
try:
panel = 'unknown'
accounts = []
# Inline panel detection (mirrors detect_control_panel) so the matching account list can be collected in one pass
if os.path.isdir('/usr/local/cpanel'):
panel = 'cpanel'
accounts = list_cpanel_accounts()
elif os.path.isdir('/usr/local/directadmin'):
panel = 'directadmin'
accounts = list_directadmin_accounts()
else:
# Try binaries
try:
res = subprocess.run(['which', 'whmapi1'], capture_output=True, text=True, timeout=5)
if res.returncode == 0:
panel = 'cpanel'
accounts = list_cpanel_accounts()
except Exception:
pass
if panel == 'unknown' and Path('/usr/local/directadmin').exists():
panel = 'directadmin'
accounts = list_directadmin_accounts()
payload = {
"serverId": str(sid),
"panelType": panel,
"accountsJson": json.dumps(accounts)
}
print(f"[agent] Sending accounts snapshot: panel={panel}, count={len(accounts)}")
resp = session.post(f"{BASE_URL}/api/osm/agent/accounts", json=payload, headers=DEFAULT_HEADERS, timeout=15)
resp.raise_for_status()
print(f"[agent] Accounts snapshot sent successfully (panel={panel}, count={len(accounts)})")
except Exception as e:
print(f"[agent] Error sending accounts snapshot: {e}")
def fetch_account_limits(sid=None):
# If offline or no server id, use cached limits only
if sid is None or not BASE_URL or not TOKEN:
cache = _read_limits_cache()
return cache.get("accounts", [])
try:
resp = session.get(f"{BASE_URL}/api/osm/agent/account-limits", params={"serverId": sid}, headers=DEFAULT_HEADERS, timeout=10)
resp.raise_for_status()
data = resp.json() or []
cache = _read_limits_cache()
cache["accounts"] = [
{
"account": str(x.get("account") or "").strip(),
"maxEmails": int(x.get("maxEmails") or 0),
"perSeconds": int(x.get("perSeconds") or 0),
"enabled": bool(x.get("enabled", True))
}
for x in data
]
_write_limits_cache(cache)
return data
except Exception as e:
print(f"[agent] Error fetching account limits: {e}")
cache = _read_limits_cache()
return cache.get("accounts", [])
def parse_age_to_seconds(age_str: str) -> int:
"""Convert an exim-style age token (e.g. '47m', '3h', '2d') into seconds."""
try:
s = age_str.strip()
if not s:
return 0
unit = s[-1]
val = int(''.join(ch for ch in s if ch.isdigit())) if any(ch.isdigit() for ch in s) else 0
if unit == 's':
return val
if unit == 'm':
return val * 60
if unit == 'h':
return val * 3600
if unit == 'd':
return val * 86400
return val
except Exception:
return 0
def extract_account_from_detail(detail: dict) -> str:
# Try headers for -ident or -auth_sender then fallback to recipient/sender
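# Exim spool headers typically carry the submitting user, e.g. (illustrative):
#   ...-ident alice ...                       (local submission)
#   ...-auth_sender alice@example.com ...     (SMTP AUTH)
# The exact layout varies by exim version, hence the loose regexes below.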
try:
headers = detail.get('headers') or ''
if headers:
m = re.search(r"-ident\s+(\w+)", headers)
if m:
return m.group(1)
m = re.search(r"-auth_sender\s+([^@\s]+)@", headers)
if m:
return m.group(1)
m = re.search(r"Received: from (\w+) by", headers)
if m:
return m.group(1)
recip = (detail.get('recipient') or '').strip()
if recip.startswith('(') and recip.endswith(')') and len(recip) > 2:
return recip[1:-1]
sender = (detail.get('sender') or '')
m = re.search(r"<([^@>]+)@", sender)
if m:
return m.group(1)
except Exception:
pass
return 'Unknown'
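# Quick reference for the exim per-message commands used below
# (standard exim flags, one queue id per call):
#   exim -Mf  <id>   freeze        exim -Mt <id>   thaw
#   exim -M   <id>   force a delivery attempt
#   exim -Mrm <id>   remove the message from the queue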
def freeze_message_exim(queue_id: str):
try:
subprocess.run(['exim', '-Mf', queue_id], capture_output=True, text=True, timeout=5)
except Exception as e:
print(f"[agent] exim freeze failed for {queue_id}: {e}")
def hold_message_postfix(queue_id: str):
try:
subprocess.run(['postsuper', '-h', queue_id], capture_output=True, text=True, timeout=5)
except Exception as e:
print(f"[agent] postfix hold failed for {queue_id}: {e}")
def enforce_account_limits(sid: int, stats: dict):
# Live rolling window using first-seen timestamps per message
# State containers (persist across calls via attributes on function)
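# Worked example: with a limit of 100 per 3600s, each newly seen queue id
# appends time.time() to the account's deque; entries older than now-3600
# are pruned, so len(dq) is the count of first-seen messages inside the
# sliding hour and the 101st message within that hour trips the limit.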
if not hasattr(enforce_account_limits, "seen_ids"):
enforce_account_limits.seen_ids = set()
if not hasattr(enforce_account_limits, "account_windows"):
enforce_account_limits.account_windows = defaultdict(deque) # account -> deque[timestamps]
limits = fetch_account_limits(sid)
if not limits:
return
now_ts = time.time()
details = stats.get('queueDetails') or []
mail_server = detect_mail_server()
# Index limits for quick lookup
lim_by_account = {}
for lim in limits:
try:
if not lim.get('enabled', True):
continue
account = (lim.get('account') or '').strip().lower()
max_emails = int(lim.get('maxEmails', 0))
per_seconds = int(lim.get('perSeconds', 60))
if account and max_emails > 0 and per_seconds > 0:
lim_by_account[account] = (max_emails, per_seconds)
except Exception:
continue
if not lim_by_account:
return
for d in details:
log_evidence = False
try:
qid = d.get('queueId') or d.get('messageId')
if not qid or qid in enforce_account_limits.seen_ids:
continue
account = extract_account_from_detail(d)
# Resolve owner from sender email (prefer) else map account_ref to owner key
sender_email = _extract_email(d.get('sender') or '') or _extract_email(account) or ''
owner_key = _resolve_owner_from_email(sender_email) or _resolve_limit_account_key(account)
if owner_key not in lim_by_account:
continue
# Mark first seen and evaluate window
enforce_account_limits.seen_ids.add(qid)
max_emails, per_seconds = lim_by_account[owner_key]
dq = enforce_account_limits.account_windows[owner_key]
# prune old timestamps
cutoff = now_ts - per_seconds
while dq and dq[0] < cutoff:
dq.popleft()
dq.append(now_ts)
if len(dq) > max_emails:
# Over limit: act on this message immediately
print(f"[agent] Limit exceeded for {owner_key}: {len(dq)}/{max_emails} in {per_seconds}s. Freezing/holding {qid}.")
if mail_server == 'exim':
freeze_message_exim(qid)
elif mail_server == 'postfix':
hold_message_postfix(qid)
# Also add to hold lists for sustained blocking
reason_text = f"Exceeded {max_emails} in {per_seconds}s (count={len(dq)})"
log_evidence = _has_recent_log_events(owner_key, sender_email, per_seconds)
if not log_evidence:
print(f"[agent] Queue enforcement detected over-limit for {owner_key} but no recent exim log activity; skipping persistent freeze.")
else:
hold_owner = _should_hold_owner(owner_key, sender_email)
username_for_hold = owner_key if hold_owner else ""
try:
# Local relay → freeze account only; otherwise freeze email only
add_user_to_hold_list(
username_for_hold,
None if hold_owner else sender_email,
reason=reason_text
)
add_sender_to_freeze_list(sender_email, reason=reason_text)
except Exception as e:
print(f"[agent] Failed to update hold/freeze lists: {e}")
# Inform server so reconcile won't remove our local entry
fallback_events = _fallback_events_from_queue_detail(d, sender_email, owner_key)
limit_label = f"Limit: {max_emails}"
_generate_freeze_report(account, owner_key, sender_email, reason_text, per_seconds, limit_label, sid, fallback_events if fallback_events else None)
except Exception as e:
print(f"[agent] Error enforcing per-message limit: {e}")
except Exception:
pass
def check_spam_thresholds(sid: int, stats: dict, config: dict):
"""Check if spam thresholds are exceeded and send alerts"""
try:
spam_threshold = config.get("spamThreshold", 5.0)
high_spam_count = 0
for spam_score in stats["spamScores"]:
if spam_score["score"] >= spam_threshold:
high_spam_count += 1
if high_spam_count > 0:
alert_payload = {
"serverId": str(sid),
"alertType": "HIGH_SPAM_SCORE",
"severity": "WARNING" if high_spam_count < 10 else "CRITICAL",
"message": f"Found {high_spam_count} messages with spam score >= {spam_threshold}",
"detailsJson": json.dumps({
"highSpamCount": high_spam_count,
"threshold": spam_threshold,
"samples": stats["spamScores"][:5] # First 5 samples
})
}
resp = session.post(f"{BASE_URL}/api/osm/agent/spam-alert", json=alert_payload, headers=DEFAULT_HEADERS, timeout=10)
resp.raise_for_status()
except Exception as e:
print(f"[agent] Error checking spam thresholds: {e}")
def _execute_queue_action_delete(ids):
try:
ok = 0
for qid in ids:
if not qid:
continue
try:
res = subprocess.run(['exim', '-Mrm', qid], capture_output=True, text=True, timeout=10)
if res.returncode == 0:
ok += 1
else:
print(f"[agent] exim -Mrm {qid} failed: rc={res.returncode} stdout={res.stdout.strip()} stderr={res.stderr.strip()}")
except Exception as e:
print(f"[agent] delete_queue exception for {qid}: {e}")
continue
return ok
except Exception as e:
print(f"[agent] delete_queue failed: {e}")
return 0
def _execute_queue_action_thaw(ids):
try:
ok = 0
for qid in ids:
if not qid:
continue
try:
res = subprocess.run(['exim', '-Mt', qid], capture_output=True, text=True, timeout=10)
if res.returncode == 0:
ok += 1
except Exception:
continue
return ok
except Exception as e:
print(f"[agent] thaw_queue failed: {e}")
return 0
def _execute_queue_action_release(ids):
"""Release a frozen message: thaw and then schedule delivery."""
try:
ok = 0
for qid in ids:
if not qid:
continue
try:
# Thaw first
subprocess.run(['exim', '-Mt', qid], capture_output=True, text=True, timeout=10)
# Then attempt immediate delivery
res = subprocess.run(['exim', '-M', qid], capture_output=True, text=True, timeout=15)
if res.returncode == 0:
ok += 1
except Exception:
continue
return ok
except Exception as e:
print(f"[agent] release_queue failed: {e}")
return 0
def process_sse_event(event_data: str, sid: int):
"""Process a single SSE event containing a command.
Note: Only events that include a non-empty 'type' are considered commands.
"""
try:
cmd = json.loads(event_data)
cid = cmd.get('id') or ''
ctype = (cmd.get('type') or '').strip().lower()
payload_json = cmd.get('payloadJson') or '{}'
if not ctype:
# Ignore non-command SSE payloads (e.g., queue-update snapshots)
return
print(f"[agent] SSE event received: id={cid} type={ctype}")
result_msg = ''
try:
if isinstance(payload_json, dict):
payload = payload_json
else:
payload = json.loads(payload_json)
except Exception:
payload = {}
if not isinstance(payload, dict):
# Commands are expected to carry a JSON object payload; ignore other shapes
payload = {}
ids = payload.get('queueIds') or []
if ctype == 'delete_queue':
n = _execute_queue_action_delete(ids)
result_msg = f"deleted {n}/{len(ids)}"
print(f"[agent] delete_queue: {result_msg}")
elif ctype == 'thaw_queue':
n = _execute_queue_action_thaw(ids)
result_msg = f"thawed {n}/{len(ids)}"
print(f"[agent] thaw_queue: {result_msg}")
elif ctype == 'release_queue':
n = _execute_queue_action_release(ids)
result_msg = f"released {n}/{len(ids)}"
print(f"[agent] release_queue: {result_msg}")
elif ctype == 'snapshot_queue':
# Just send a fresh snapshot
try:
cfg = fetch_config(sid) or {}
if not QUEUE_INGEST_ENABLED or not cfg.get("queueIngestEnabled", True):
result_msg = "snapshot skipped (queue ingest disabled)"
print("[agent] snapshot_queue: skipped, queue ingest disabled")
else:
stats = get_mail_queue_stats(cfg.get("mailQueuePath", "/var/spool/postfix"))
send_queue_snapshot(sid, stats)
result_msg = "snapshot sent"
print("[agent] snapshot_queue: snapshot sent")
except Exception as e:
result_msg = f"snapshot failed: {e}"
print(f"[agent] snapshot_queue failed: {e}")
elif ctype == 'get_live_queue':
# On-demand live queue fetch - send current queue data back via callback
try:
request_id = payload.get('requestId') or ''
sender_filter = (payload.get('senderEmail') or '').strip().lower()
account_filter = (payload.get('account') or '').strip().lower()
cfg = fetch_config(sid) or {}
stats = get_mail_queue_stats(cfg.get("mailQueuePath", "/var/spool/postfix"))
queue_details = stats.get('queueDetails', [])
# Apply filters if provided
if sender_filter or account_filter:
filtered = []
for detail in queue_details:
sender = (detail.get('sender') or '').lower()
owner = (detail.get('ownerAccount') or '').lower()
if sender_filter and sender_filter in sender:
filtered.append(detail)
elif account_filter and account_filter == owner:
filtered.append(detail)
queue_details = filtered
# Send live queue data back to server
response_payload = {
"serverId": str(sid),
"requestId": request_id,
"totalMessages": stats.get("totalMessages", 0),
"activeMessages": stats.get("activeMessages", 0),
"deferredMessages": stats.get("deferredMessages", 0),
"holdMessages": stats.get("holdMessages", 0),
"queueDetails": queue_details,
"spamScores": stats.get("spamScores", []),
}
session.post(
f"{BASE_URL}/api/osm/agent/live-queue-response",
json=response_payload,
headers=DEFAULT_HEADERS,
timeout=30
)
result_msg = f"live queue sent ({len(queue_details)} items)"
print(f"[agent] get_live_queue: sent {len(queue_details)} queue items")
except Exception as e:
result_msg = f"live queue failed: {e}"
print(f"[agent] get_live_queue failed: {e}")
elif ctype == 'get_accounts':
try:
request_id = payload.get('requestId') or ''
panel = 'unknown'
accounts = []
if os.path.isdir('/usr/local/cpanel'):
panel = 'cpanel'
accounts = list_cpanel_accounts()
elif os.path.isdir('/usr/local/directadmin'):
panel = 'directadmin'
accounts = list_directadmin_accounts()
else:
try:
res = subprocess.run(['which', 'whmapi1'], capture_output=True, text=True, timeout=5)
if res.returncode == 0:
panel = 'cpanel'
accounts = list_cpanel_accounts()
except Exception:
pass
if panel == 'unknown' and Path('/usr/local/directadmin').exists():
panel = 'directadmin'
accounts = list_directadmin_accounts()
response_payload = {
"serverId": str(sid),
"requestId": request_id,
"panelType": panel,
"accountsJson": json.dumps(accounts)
}
session.post(
f"{BASE_URL}/api/osm/agent/accounts",
json=response_payload,
headers=DEFAULT_HEADERS,
timeout=15
)
result_msg = f"accounts sent ({len(accounts)} items)"
print(f"[agent] get_accounts: sent {len(accounts)} accounts")
except Exception as e:
result_msg = f"get accounts failed: {e}"
print(f"[agent] get_accounts failed: {e}")
elif ctype == 'verify_email':
# payload was already parsed (and normalized to a dict) above
email = (payload.get('email') or '').strip().lower()
exists = False
owner = ''
def _directadmin_mailbox_exists(addr: str) -> bool:
try:
if '@' not in addr:
return False
localpart, domain = addr.split('@', 1)
domain = domain.strip().lower().rstrip('.')
localpart = localpart.strip().lower()
passwd_path = Path(f"/etc/virtual/{domain}/passwd")
if not passwd_path.exists():
return False
try:
for line in passwd_path.read_text(encoding='utf-8', errors='ignore').splitlines():
# DirectAdmin passwd format: localpart:password_hash:...
if not line or ':' not in line:
continue
lp = line.split(':', 1)[0].strip().lower()
if lp == localpart:
return True
except Exception:
return False
return False
except Exception:
return False
try:
# Resolve owner by domain mapping
owner = _resolve_owner_from_email(email) or ''
# Check if any queued or recent message references this sender
qstats = get_mail_queue_stats('/var/spool/postfix')
for d in qstats.get('queueDetails', []):
s = (d.get('sender') or '').lower()
if email and email in s:
exists = True
break
# If not in queue, check mailbox existence on DirectAdmin
if not exists:
if detect_control_panel() == 'directadmin':
if _directadmin_mailbox_exists(email):
exists = True
except Exception:
pass
result_msg = f"exists={'true' if exists else 'false'};owner={owner}"
elif ctype == 'update_limits_cache':
# payload was already parsed (and normalized to a dict) above
# Validate and write to cache
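# Expected payload shape (illustrative values):
#   {"default": {"enabled": true, "maxEmails": 500, "perSeconds": 3600},
#    "accounts": [{"account": "alice", "maxEmails": 100,
#                  "perSeconds": 3600, "enabled": true}]}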
d = payload.get('default') or {}
accs = payload.get('accounts') or []
if isinstance(accs, list):
clean_accs = []
for x in accs:
try:
clean_accs.append({
'account': str((x.get('account') if isinstance(x, dict) else '') or '').strip(),
'maxEmails': int((x.get('maxEmails') if isinstance(x, dict) else 0) or 0),
'perSeconds': int((x.get('perSeconds') if isinstance(x, dict) else 0) or 0),
'enabled': bool((x.get('enabled') if isinstance(x, dict) else True))
})
except Exception:
continue
else:
clean_accs = []
cache = {
'default': {
'enabled': bool(d.get('enabled') or False),
'maxEmails': int(d.get('maxEmails') or 0),
'perSeconds': int(d.get('perSeconds') or 0)
},
'accounts': clean_accs
}
_write_limits_cache(cache)
# Apply defaults immediately in-memory
try:
global DEFAULT_MAX_EMAILS, DEFAULT_PER_SECONDS, DEFAULT_ENABLED
DEFAULT_MAX_EMAILS = cache['default']['maxEmails']
DEFAULT_PER_SECONDS = cache['default']['perSeconds']
DEFAULT_ENABLED = cache['default']['enabled']
except Exception:
pass
result_msg = 'limits updated'
print("[agent] update_limits_cache: limits updated")
elif ctype == 'reconcile_freeze':
try:
# Server-initiated reconcile: prefer the server state to avoid re-adding a user just un-frozen from the app
reconcile_freeze_file_with_server(sid, prefer_server=True)
result_msg = 'reconciled (server-wins)'
except Exception as e:
result_msg = f'reconcile failed: {e}'
print(f"[agent] reconcile_freeze: {result_msg}")
elif ctype == 'self_update':
# Perform self-update in a detached shell so we can ack immediately
try:
print("[agent] self_update: scheduling self-update...")
update_sh = f"""#!/usr/bin/env bash
set -euo pipefail
APP_DIR=/opt/osm
TMP_ZIP=$(mktemp)
echo "[agent] Starting self-update from {BASE_URL}"
curl -fsSL "{BASE_URL}/osm/agent/download" -o "$TMP_ZIP" || {{ echo "Download failed"; exit 1; }}
echo "[agent] Downloaded agent package"
mkdir -p "$APP_DIR"
unzip -o "$TMP_ZIP" -d "$APP_DIR" || {{ echo "Unzip failed"; exit 1; }}
rm -f "$TMP_ZIP"
echo "[agent] Extracted to $APP_DIR"
# Set up Python environment
if [ ! -x "$APP_DIR/venv/bin/python" ]; then
echo "[agent] Creating Python virtual environment"
if [ -x "/opt/osm/py310/bin/python3.10" ]; then PY_BIN="/opt/osm/py310/bin/python3.10";
elif command -v python3.10 >/dev/null 2>&1; then PY_BIN=$(command -v python3.10);
elif [ -x /usr/bin/python3.10 ]; then PY_BIN=/usr/bin/python3.10;
elif [ -x /usr/local/bin/python3.10 ]; then PY_BIN=/usr/local/bin/python3.10;
else PY_BIN=$(command -v python3 || echo /usr/bin/python3); fi
"$PY_BIN" -m venv "$APP_DIR/venv" || {{ echo "Virtual environment creation failed"; exit 1; }}
fi
echo "[agent] Updating Python packages"
"$APP_DIR/venv/bin/pip" install --upgrade pip || true
"$APP_DIR/venv/bin/pip" install --upgrade requests psutil || echo "Package update warning"
echo "[agent] Setting permissions"
chmod +x "$APP_DIR/uninstall-agent.sh" 2>/dev/null || true
chmod +x "$APP_DIR/osm" 2>/dev/null || true
ln -sf "$APP_DIR/osm" /usr/local/bin/osm 2>/dev/null || true
echo "[agent] Restarting agent service"
systemctl restart osm 2>/dev/null || {{
echo "[agent] Systemd restart failed, trying direct restart"
pkill -f "python.*main.py" 2>/dev/null || true
sleep 2
cd "$APP_DIR" && nohup "$APP_DIR/venv/bin/python" main.py >/dev/null 2>&1 &
}}
echo "[agent] Self-update completed successfully"
"""
# Write and run
upd = Path('/tmp/osm_self_update.sh')
upd.write_text(update_sh)
os.chmod(str(upd), 0o755)
# Run in the background, capturing the script's output in /tmp/osm_update.log
# (Popen's list form does not interpret shell tokens such as '&>' or '&')
with open('/tmp/osm_update.log', 'ab') as log_fh:
subprocess.Popen(['nohup', 'bash', str(upd)], stdout=log_fh, stderr=subprocess.STDOUT)
result_msg = 'update scheduled'
print(f"[agent] Self-update scheduled, check /tmp/osm_update.log for details")
except Exception as e:
result_msg = f'update failed: {e}'
print(f"[agent] Self-update error: {e}")
elif ctype == 'uninstall':
# Execute uninstall script
try:
uninstall_script = Path(AGENT_DIR) / 'uninstall-agent.sh'
if uninstall_script.exists():
# Make sure it's executable
os.chmod(str(uninstall_script), 0o755)
# Run it in background so we can ack first
subprocess.Popen(['nohup', 'bash', str(uninstall_script)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
result_msg = 'uninstall started'
else:
result_msg = 'uninstall script not found'
except Exception as e:
result_msg = f'uninstall failed: {e}'
elif ctype == 'get_freeze_reports':
# On-demand fetch of local freeze reports
try:
request_id = payload.get('requestId') or ''
sender_filter = (payload.get('senderEmail') or '').strip().lower()
account_filter = (payload.get('account') or '').strip().lower()
reports = []
if REPORTS_DIR.exists():
for path in sorted(REPORTS_DIR.glob('*.txt'), key=lambda p: p.stat().st_mtime, reverse=True):
parsed = _parse_report_file(path)
if not parsed:
continue
# Apply filters if provided
sender = (parsed.get('senderEmail') or '').strip().lower()
account = (parsed.get('account') or '').strip().lower()
if sender_filter and sender_filter not in sender:
continue
if account_filter and account_filter not in account:
continue
reports.append(parsed)
response_payload = {
"serverId": str(sid),
"requestId": request_id,
"reports": reports,
}
session.post(
f"{BASE_URL}/api/osm/agent/freeze-reports-response",
json=response_payload,
headers=DEFAULT_HEADERS,
timeout=30
)
result_msg = f"freeze reports sent ({len(reports)} items)"
print(f"[agent] get_freeze_reports: sent {len(reports)} reports")
except Exception as e:
result_msg = f"get freeze reports failed: {e}"
print(f"[agent] get_freeze_reports failed: {e}")
if cid:
try:
session.post(f"{BASE_URL}/api/osm/agent/command-ack", json={
"serverId": str(sid),
"commandId": cid,
"resultMessage": result_msg
}, headers=DEFAULT_HEADERS, timeout=10)
print(f"[agent] Acked command id={cid} result='{result_msg}'")
# Queue snapshots no longer sent automatically - fetched live on-demand via get_live_queue
except Exception:
pass
except Exception as e:
print(f"[agent] Error processing SSE event: {e}")
# SSE connection configuration
SSE_CONNECT_TIMEOUT_SECS = 30
SSE_READ_TIMEOUT_SECS = 90 # Server sends heartbeat every 15s, so 90s allows 6 missed heartbeats
SSE_STALE_THRESHOLD_SECS = 120 # Reconnect if no data received for this long
SSE_INITIAL_BACKOFF_SECS = 5
SSE_MAX_BACKOFF_SECS = 60
def listen_for_commands_sse(sid: int):
"""Listen for real-time commands via Server-Sent Events.
Parses SSE frames and only processes 'command' events.
Implements robust reconnection with health monitoring.
"""
print(f"[agent] Starting SSE command listener for server {sid}")
consecutive_failures = 0
last_event_time = time.time()
while True:
try:
url = f"{BASE_URL}/api/osm/agent/queue-stream/{sid}"
sse_headers = dict(DEFAULT_HEADERS)
sse_headers.setdefault('Accept', 'text/event-stream')
# Use a read timeout to detect dead connections
response = session.get(url, headers=sse_headers, stream=True,
timeout=(SSE_CONNECT_TIMEOUT_SECS, SSE_READ_TIMEOUT_SECS))
response.raise_for_status()
print(f"[agent] Connected to SSE stream at {url}")
consecutive_failures = 0 # Reset on successful connect
last_event_time = time.time()
event_name = None
data_lines = []
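# SSE frames arrive as "event:"/"data:" field lines terminated by a blank
# line, e.g. (illustrative):
#   event: command
#   data: {"id":"42","type":"thaw_queue","payloadJson":"..."}
# The blank line dispatches the accumulated frame.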
for raw in response.iter_lines(decode_unicode=True):
if raw is None:
# Check if connection seems stale (no data for too long)
if time.time() - last_event_time > SSE_STALE_THRESHOLD_SECS:
print("[agent] SSE stream appears stale, reconnecting...")
break
continue
last_event_time = time.time() # Update on any data received
line = raw.rstrip('\r')
if not line:
# dispatch accumulated event
if data_lines:
data_payload = '\n'.join(data_lines)
if (event_name or '').strip().lower() in ('', 'command'):
try:
process_sse_event(data_payload, sid)
except Exception as e:
print(f"[agent] SSE event dispatch error: {e}")
event_name = None
data_lines = []
continue
# field parsing
if line.startswith('event:'):
event_name = line[6:].strip()
elif line.startswith('data:'):
# tolerate optional leading space
val = line[5:]
if val.startswith(' '):
val = val[1:]
data_lines.append(val)
else:
# ignore other fields (id:, retry:, etc)
pass
except requests.exceptions.Timeout:
print("[agent] SSE read timeout - server may be unresponsive, reconnecting...")
consecutive_failures += 1
except requests.exceptions.ConnectionError as e:
print(f"[agent] SSE connection error: {e}")
consecutive_failures += 1
except Exception as e:
print(f"[agent] SSE error: {e}")
consecutive_failures += 1
# Exponential backoff: 5s, 10s, 20s, 40s, max 60s
backoff = min(SSE_INITIAL_BACKOFF_SECS * (2 ** min(consecutive_failures, 4)), SSE_MAX_BACKOFF_SECS)
print(f"[agent] SSE reconnecting in {backoff}s (failures: {consecutive_failures})")
time.sleep(backoff)
SSE_LISTENER_THREAD = None
def start_sse_listener_thread(sid: int):
"""Start the SSE listener thread for real-time command delivery."""
global SSE_LISTENER_THREAD
if SSE_LISTENER_THREAD and SSE_LISTENER_THREAD.is_alive():
return
SSE_LISTENER_THREAD = threading.Thread(target=listen_for_commands_sse, args=(sid,), daemon=True)
SSE_LISTENER_THREAD.start()
print(f"[agent] SSE listener thread started for server {sid}")
# ---------- CLI Helpers ----------
def _cli_build_empty_stats():
return {
"totalMessages": 0,
"activeMessages": 0,
"deferredMessages": 0,
"holdMessages": 0,
"incomingMessages": 0,
"outgoingMessages": 0,
"queueDetails": [],
"spamScores": [],
"blockedDomains": []
}
def _cli_load_queue_matches(sender_email: Optional[str] = None, account: Optional[str] = None):
stats = get_exim_queue_stats(_cli_build_empty_stats())
matches = []
target_email = (sender_email or '').strip().lower()
target_account = (account or '').strip().lower()
for detail in stats.get("queueDetails", []):
sender = (detail.get("sender") or "").lower()
recipients = [r.lower() for r in detail.get("recipients") or []]
owner = (detail.get("ownerAccount") or "").lower()
if target_email:
if target_email == sender or any(target_email == r for r in recipients):
matches.append(detail)
elif target_account:
if owner == target_account or target_account in sender:
matches.append(detail)
return matches
def _cli_print_queue_summary(matches):
if not matches:
print("No queued messages found.")
return
print(f"Found {len(matches)} queued message(s):")
for item in matches:
qid = item.get("queueId") or item.get("messageId")
subject = item.get("subject") or ""
sender = item.get("sender") or ""
recipient = ", ".join(item.get("recipients") or [])
print(f" - {qid}: from {sender} to {recipient} | {subject}")
def _cli_handle_queue_action(matches, choice):
qids = [item.get("queueId") or item.get("messageId") for item in matches if item.get("queueId") or item.get("messageId")]
qids = [qid for qid in qids if qid]
if not qids:
return None, []
if choice == "release":
released = _execute_queue_action_release(qids)
print(f"Released {released}/{len(qids)} message(s) from queue.")
elif choice == "delete":
deleted = _execute_queue_action_delete(qids)
print(f"Deleted {deleted}/{len(qids)} message(s) from queue.")
return (choice if choice in {"release", "delete"} else None), qids
def _cli_command_freeze(args):
target_email = args.email
target_account = args.account
if not target_email and not target_account:
print("Specify --email or --account to freeze.")
return 1
reason = args.reason or "CLI freeze"
resolved_account = target_account or ""
sender_for_record = target_email or target_account or ""
if target_email:
add_user_to_hold_list("", target_email, reason=reason)
add_sender_to_freeze_list(target_email, reason=reason)
print(f"Frozen email address: {target_email}")
if target_account:
add_user_to_hold_list(target_account, reason=reason)
print(f"Frozen account: {target_account}")
_queue_action({
"type": "freeze",
"senderEmail": sender_for_record,
"account": resolved_account,
"reason": reason,
"timestamp": time.time(),
})
return 0
def _cli_command_unfreeze(args):
target_email = args.email
target_account = args.account
if not target_email and not target_account:
print("Specify --email or --account to unfreeze.")
return 1
matches = _cli_load_queue_matches(sender_email=target_email, account=target_account)
_cli_print_queue_summary(matches)
queue_choice = None
if matches:
if args.release and args.delete:
print("Specify only one of --release or --delete.")
return 1
if args.release:
queue_choice = "release"
elif args.delete:
queue_choice = "delete"
else:
print("Queue action options:")
print(" 1) Unfreeze only")
print(" 2) Unfreeze and release queued emails")
print(" 3) Unfreeze and delete queued emails")
selected = input("Select option [1/2/3]: ").strip()
queue_choice = {"1": None, "2": "release", "3": "delete"}.get(selected, None)
queue_ids = []
if target_email:
remove_user_from_hold_list(sender_email=target_email)
remove_sender_from_freeze_list(target_email)
print(f"Unfrozen email address: {target_email}")
if target_account:
remove_user_from_hold_list(username=target_account)
print(f"Unfrozen account: {target_account}")
if queue_choice:
queue_choice, queue_ids = _cli_handle_queue_action(matches, queue_choice)
resolved_account = target_account or ""
sender_for_record = target_email or target_account or ""
_queue_action({
"type": "unfreeze",
"senderEmail": sender_for_record,
"account": resolved_account,
"queueAction": queue_choice or "none",
"queueIds": queue_ids,
"timestamp": time.time(),
})
return 0
def _cli_command_limit_set(args):
account = args.account.strip()
max_emails = int(args.max)
per_seconds = int(args.per)
enabled = not args.disable
set_account_limit_local(account, max_emails, per_seconds, enabled)
_queue_action({
"type": "set_limit",
"account": account,
"maxEmails": max_emails,
"perSeconds": per_seconds,
"enabled": enabled,
"timestamp": time.time(),
})
print(f"Set limit for account '{account}': {max_emails} emails / {per_seconds}s (enabled={enabled})")
return 0
def _cli_command_limit_remove(args):
account = args.account.strip()
remove_account_limit_local(account)
_queue_action({
"type": "remove_limit",
"account": account,
"timestamp": time.time(),
})
print(f"Removed limit for account '{account}'.")
return 0
def _cli_command_limit_default(args):
max_emails = int(args.max)
per_seconds = int(args.per)
enabled = not args.disable
set_default_limit_local(max_emails, per_seconds, enabled)
_queue_action({
"type": "set_default_limit",
"maxEmails": max_emails,
"perSeconds": per_seconds,
"enabled": enabled,
"timestamp": time.time(),
})
state = "enabled" if enabled else "disabled"
print(f"Default limit {state}: {max_emails} emails / {per_seconds}s")
return 0
def _cli_ensure_enrolled():
sid = _get_current_server_id()
if not sid:
print("Warning: agent server ID not known yet. CLI changes will apply locally and sync once enrollment succeeds.")
def _cli_build_parser():
parser = argparse.ArgumentParser(prog="osm", description="OSM Agent CLI")
subparsers = parser.add_subparsers(dest="command")
freeze_parser = subparsers.add_parser("freeze", help="Freeze an email address or account")
freeze_target = freeze_parser.add_mutually_exclusive_group(required=True)
freeze_target.add_argument("--email", help="Email address to freeze")
freeze_target.add_argument("--account", help="Hosting account to freeze")
freeze_parser.add_argument("--reason", help="Reason for freeze")
freeze_parser.set_defaults(func=_cli_command_freeze)
unfreeze_parser = subparsers.add_parser("unfreeze", help="Unfreeze an email address or account")
unfreeze_target = unfreeze_parser.add_mutually_exclusive_group(required=True)
unfreeze_target.add_argument("--email", help="Email address to unfreeze")
unfreeze_target.add_argument("--account", help="Hosting account to unfreeze")
unfreeze_parser.add_argument("--release", action="store_true", help="Release queued emails after unfreeze")
unfreeze_parser.add_argument("--delete", action="store_true", help="Delete queued emails after unfreeze")
unfreeze_parser.set_defaults(func=_cli_command_unfreeze)
limit_parser = subparsers.add_parser("limit", help="Manage per-account or default limits")
limit_sub = limit_parser.add_subparsers(dest="limit_command")
limit_set = limit_sub.add_parser("set", help="Set/update limit for an account")
limit_set.add_argument("--account", required=True, help="Hosting account name")
limit_set.add_argument("--max", required=True, type=int, help="Max emails in window")
limit_set.add_argument("--per", required=True, type=int, help="Time window in seconds")
limit_set.add_argument("--disable", action="store_true", help="Disable (but retain values)")
limit_set.set_defaults(func=_cli_command_limit_set)
limit_remove = limit_sub.add_parser("remove", help="Remove limit for an account")
limit_remove.add_argument("--account", required=True, help="Hosting account name")
limit_remove.set_defaults(func=_cli_command_limit_remove)
limit_default = limit_sub.add_parser("default", help="Set default (server-wide) limit")
limit_default.add_argument("--max", required=True, type=int, help="Max emails in window")
limit_default.add_argument("--per", required=True, type=int, help="Time window in seconds")
limit_default.add_argument("--disable", action="store_true", help="Disable the default limit")
limit_default.set_defaults(func=_cli_command_limit_default)
return parser
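# Example invocations (via the `osm` symlink installed by self-update):
#   osm freeze --email spam@example.com --reason "outbound burst"
#   osm unfreeze --account alice --release
#   osm limit set --account alice --max 100 --per 3600
#   osm limit default --max 500 --per 3600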
def run_cli(argv=None):
_cli_ensure_enrolled()
parser = _cli_build_parser()
args = parser.parse_args(argv)
if not hasattr(args, "func"):
parser.print_help()
return 1
return args.func(args)
def main():
"""Main agent loop"""
global CURRENT_THRESHOLDS
print("[agent] Starting OSM agent...")
try:
sid = None
if BASE_URL and TOKEN:
try:
sid = enroll()
try:
global CURRENT_SERVER_ID
CURRENT_SERVER_ID = sid
except Exception:
pass
try:
_save_agent_state({"serverId": str(sid)})
except Exception:
pass
print(f"[agent] Enrolled with server ID: {sid}")
# Report agent version to server
try:
session.post(f"{BASE_URL}/api/osm/agent/hello", json={"serverId": str(sid), "version": "v1.2"}, headers=DEFAULT_HEADERS, timeout=10)
except Exception as e:
print(f"[agent] Failed to report version: {e}")
# Start SSE listener for real-time commands
start_sse_listener_thread(sid)
except Exception as e:
print(f"[agent] Enrollment failed, continuing offline: {e}")
try:
cached_sid = _get_current_server_id()
if cached_sid:
sid = cached_sid
CURRENT_SERVER_ID = cached_sid
# Start SSE listener with cached server ID
start_sse_listener_thread(sid)
except Exception:
pass
else:
try:
cached_sid = _get_current_server_id()
if cached_sid:
sid = cached_sid
CURRENT_SERVER_ID = cached_sid
# Start SSE listener with cached server ID
start_sse_listener_thread(sid)
except Exception:
pass
# Load config (online or cached)
if sid is not None:
config = fetch_config(sid)
else:
d = _read_limits_cache().get("default", {})
config = {
"mailQueuePath": "/var/spool/postfix",
"checkInterval": 30,
"spamThreshold": 5.0,
"blockedDomainsJson": "[]",
"alertThresholdsJson": "{}",
"defaultMaxEmails": int(d.get('maxEmails') or 0),
"defaultPerSeconds": int(d.get('perSeconds') or 0),
"defaultEnabled": bool(d.get('enabled') or False)
}
print(f"[agent] Using config: {config}")
# Apply default server-wide limit settings from config
def apply_defaults_from_config(cfg: dict):
try:
global DEFAULT_MAX_EMAILS, DEFAULT_PER_SECONDS, DEFAULT_ENABLED
DEFAULT_MAX_EMAILS = int(cfg.get('defaultMaxEmails') or 0)
DEFAULT_PER_SECONDS = int(cfg.get('defaultPerSeconds') or 0)
DEFAULT_ENABLED = bool(cfg.get('defaultEnabled') or False)
print(f"[agent] Default limit: enabled={DEFAULT_ENABLED} max={DEFAULT_MAX_EMAILS} per={DEFAULT_PER_SECONDS}s")
except Exception as e:
print(f"[agent] Failed to apply default limit from config: {e}")
apply_defaults_from_config(config)
# Send accounts snapshot immediately on startup
if sid is not None:
try:
send_accounts_snapshot(sid)
except Exception as e:
print(f"[agent] Initial accounts snapshot failed: {e}")
try:
# Prime local limits cache so log parsing can enforce account limits immediately
fetch_account_limits(sid)
except Exception as e:
print(f"[agent] Initial account limits fetch failed: {e}")
# Initialize thresholds and start near-real-time Exim monitor
initial_thresholds = parse_thresholds(config)
try:
with THRESHOLD_LOCK:
CURRENT_THRESHOLDS = list(initial_thresholds)
except Exception:
pass
try:
process_exim_logs(initial_thresholds, sid)
write_tracking_db(initial_thresholds)
except Exception as e:
print(f"[agent] Initial Exim scan failed: {e}")
_ensure_exim_monitor_thread()
loop_count = 0
while True:
try:
loop_count += 1
print(f"[agent] Starting loop iteration {loop_count}")
# Periodically confirm version and liveness
if sid is not None and loop_count % 5 == 0:
try:
session.post(f"{BASE_URL}/api/osm/agent/hello", json={"serverId": str(sid), "version": "v1.2"}, headers=DEFAULT_HEADERS, timeout=10)
except Exception as e:
print(f"[agent] Version hello failed: {e}")
_process_pending_actions()
# Keep freeze state aligned after pushing any queued actions
if sid is not None:
reconcile_freeze_file_with_server(sid)
# Periodically refresh config so default limits can change without restart
if sid is not None and (loop_count % 3) == 0:
try:
config = fetch_config(sid)
apply_defaults_from_config(config)
new_thresholds = parse_thresholds(config)
try:
with THRESHOLD_LOCK:
CURRENT_THRESHOLDS = list(new_thresholds)
except Exception:
pass
except Exception as e:
print(f"[agent] Config refresh failed: {e}")
stats = None
if QUEUE_INGEST_ENABLED and config.get("queueIngestEnabled", True):
stats = get_mail_queue_stats(config["mailQueuePath"])
print(f"[agent] Got stats: {stats['totalMessages']} total messages")
# Queue snapshots no longer sent automatically - fetched live on-demand via get_live_queue
enforce_account_limits(sid, stats)
else:
print("[agent] Queue ingestion disabled; skipping queue stats collection")
# Check for spam alerts
if sid is not None and stats is not None:
check_spam_thresholds(sid, stats, config)
if stats is not None:
print(f"[agent] Queue stats: {stats['totalMessages']} total, "
f"{stats['activeMessages']} active, "
f"{stats['deferredMessages']} deferred, "
f"{len(stats['spamScores'])} spam scores")
else:
print("[agent] Queue stats unavailable (ingest disabled)")
# Sleep for configured interval
print(f"[agent] Sleeping for {config.get('checkInterval', 30)} seconds...")
time.sleep(config.get("checkInterval", 30))
except Exception as e:
print(f"[agent] Error in main loop: {e}")
time.sleep(30)
except Exception as e:
print(f"[agent] Fatal error: {e}")
time.sleep(30)
return
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] not in {"agent", "--agent"}:
sys.exit(run_cli(sys.argv[1:]))
main()