File: /opt/osm/main.py
#!/usr/bin/env python3
import argparse
import json
import os
import time
import subprocess
import re
import requests
from requests.adapters import HTTPAdapter
try:
# Retry support for transient 5xx via urllib3
from urllib3.util.retry import Retry # type: ignore
except Exception:
Retry = None
import sys
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from collections import defaultdict, deque
import shutil
import threading
import stat
from typing import Optional
def _load_env_file():
env_path = Path(__file__).resolve().parent / ".env"
if not env_path.exists():
return
try:
for line in env_path.read_text(encoding='utf-8').splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
key = key.strip()
value = value.strip().strip("'\"")
if key and key not in os.environ:
os.environ[key] = value
except Exception:
pass
_load_env_file()
BASE_URL = os.environ.get("SERVER_BASE_URL")
TOKEN = os.environ.get("AGENT_TOKEN")
API_KEY = os.environ.get("OSM_API_KEY", "")
if not BASE_URL or not TOKEN:
print("[agent] Warning: SERVER_BASE_URL or AGENT_TOKEN not set. Running in offline mode.")
session = requests.Session()
# Configure light retries for transient gateway errors (e.g., tunneling hiccups)
try:
if Retry is not None:
retry = Retry(
total=2,
connect=1,
read=1,
backoff_factor=0.5,
status_forcelist=[502, 503, 504],
allowed_methods=frozenset(["GET", "POST"]),
)
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
except Exception:
pass
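# Note: Retry(allowed_methods=...) requires urllib3 >= 1.26 (older releases
# used 'method_whitelist'); on older urllib3 the except above simply leaves
# the session without retries.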
DEFAULT_HEADERS = {"X-Api-Key": API_KEY} if API_KEY else {}
def _env_flag(name, default=False):
val = os.environ.get(name)
if val is None:
return default
try:
return val.strip().lower() in {"1", "true", "yes", "on"}
except Exception:
return default
REQUEST_DEBUG_ENABLED = _env_flag("OSM_DEBUG_REQUESTS", False)
def _mask_headers(headers):
if not headers:
return {}
masked = {}
for key, value in headers.items():
if key.lower() == "x-api-key":
masked[key] = "***"
else:
masked[key] = value
return masked
def _truncate_payload(payload, limit=500):
if payload is None:
return None
try:
text = json.dumps(payload, default=str)
except Exception:
text = str(payload)
if len(text) > limit:
return text[:limit] + "...(truncated)"
return text
_original_request = session.request
def _logged_request(method, url, **kwargs):
method_upper = method.upper()
params_preview = _truncate_payload(kwargs.get("params"))
json_preview = _truncate_payload(kwargs.get("json"))
data_preview = _truncate_payload(kwargs.get("data"))
headers_preview = _mask_headers(kwargs.get("headers") or {})
timeout_value = kwargs.get("timeout")
if REQUEST_DEBUG_ENABLED:
try:
print(f"[agent] -> {method_upper} {url} params={params_preview} json={json_preview} data={data_preview} headers={headers_preview} timeout={timeout_value}")
except Exception as log_err:
print(f"[agent] Failed to log request pre-flight: {log_err}")
try:
response = _original_request(method, url, **kwargs)
except Exception as exc:
if REQUEST_DEBUG_ENABLED:
print(f"[agent] <- {method_upper} {url} raised {exc}")
else:
print(f"[agent] HTTP {method_upper} {url} failed: {exc}")
raise
if REQUEST_DEBUG_ENABLED:
try:
body_preview = response.text
if body_preview and len(body_preview) > 500:
body_preview = body_preview[:500] + "...(truncated)"
print(f"[agent] <- {method_upper} {url} status={response.status_code} body={body_preview}")
except Exception as log_err:
print(f"[agent] Failed to log response: {log_err}")
return response
session.request = _logged_request
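# Example: export OSM_DEBUG_REQUESTS=1 before starting the agent to trace every
# request and its (truncated to 500 chars) response; the X-Api-Key header is
# masked as "***" in all logged output.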
# ---------- OSM-like tracking DB and Exim log processing (local on agent) ----------
# Global default limit (from server config)
DEFAULT_MAX_EMAILS: int = 0
DEFAULT_PER_SECONDS: int = 0
DEFAULT_ENABLED: bool = False
QUEUE_INGEST_ENABLED: bool = True
FREEZE_REPORTS_ENABLED: bool = True
IPDB_LOOKUP_ENABLED: bool = True
# Paths
AGENT_DIR = Path(__file__).resolve().parent
TRACKING_DB_PATH = AGENT_DIR / 'osm_tracking.db'
EXIM_LOG_STATE_PATH = AGENT_DIR / 'exim_log.state'
LIMITS_CACHE_PATH = AGENT_DIR / 'user_limits.json'
ACTIONS_QUEUE_PATH = AGENT_DIR / 'pending_actions.json'
AGENT_STATE_PATH = AGENT_DIR / 'agent_state.json'
# OSM-style integration files (as used by OSM on cPanel/DirectAdmin servers)
VIRTUAL_DIR = Path('/etc/virtual')
OSM_HOLD_FILE = VIRTUAL_DIR / 'osm_hold' # usernames to freeze/hold
OSM_DISABLE_FILE = VIRTUAL_DIR / 'osm_disable' # usernames to discard
# Legacy/alternate freeze list for generic exim setups (optional)
FREEZE_LIST_PATH = Path('/etc/exim/osm_freeze_senders')
# In-memory counters: key -> deque[timestamps]
# key is a tuple: (account_ref: str, trigger_name: str, window_seconds: int)
LOG_COUNTERS = defaultdict(deque)
LAST_TRIGGERED = {}
ACCOUNT_LIMIT_WINDOWS = defaultdict(deque) # account -> deque[timestamps]
# Last seen subject per account_ref
LAST_SUBJECT_BY_ACCOUNT = {}
# Deduplication of log-based events by message-id
SEEN_LOG_MSG_IDS = deque()
SEEN_LOG_MSG_IDS_SET = set()
SEEN_LOG_MSG_IDS_RETENTION_SECS = 600 # 10 minutes
# Deduplication of per-recipient deliveries by (msgid, recipient)
SEEN_MSG_RCPT = deque()
SEEN_MSG_RCPT_SET = set()
SEEN_MSG_RCPT_RETENTION_SECS = 600
# Retain counters in DB for some time after last event
LAST_COUNTER_SEEN_TS = {}
COUNTER_RETENTION_SECS = 7200
# Per-message caches
MSG_ID_TO_ACCOUNT = {}
MSG_ID_TO_SUBJECT = {}
# Recent recipients per account_ref with last-seen timestamps
RECIPIENTS_BY_ACCOUNT = {}
RECIPIENT_RETENTION_SECS = 7200
# Domain -> owner cache (cpanel/directadmin)
DOMAIN_OWNER_MAP = {}
DOMAIN_OWNER_LAST_LOAD = 0.0
DOMAIN_OWNER_TTL_SECS = 300
# Per-message metadata
MSG_ID_TO_CLIENT = {}
MSG_ID_TO_RELAY = {}
MSG_ID_TO_OWNER = {}
MSG_ID_TO_RCPTS = {}
MSG_ID_TO_RELAY_TYPE = {}
# Per-owner recent events (delivery granularity)
ACCOUNT_EVENTS = defaultdict(deque)
ACCOUNT_EVENTS_RETENTION_SECS = 7200
MAX_LOG_LOOKBACK_SECS = 3 * 3600 # Ignore historical log entries older than 3 hours
REPORTS_DIR = Path('/opt/osm/reports')
PENDING_REPORTS_DIR = REPORTS_DIR / 'pending'
LAST_LOCAL_HOLD_UPDATE_TS = 0.0
CURRENT_SERVER_ID = None
THRESHOLD_LOCK = threading.Lock()
CURRENT_THRESHOLDS = []
EXIM_MONITOR_THREAD = None
EXIM_MONITOR_INTERVAL_SECS = 2.0
LAST_REPORT_SYNC_TS = 0.0
try:
sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)
except Exception:
pass
def parse_thresholds(config: dict):
"""Return list of threshold dicts: [{events:int, seconds:int, name:str, type:str}]."""
try:
default_events = int(config.get('defaultMaxEmails') or 50)
except Exception:
default_events = 50
try:
default_seconds = int(config.get('defaultPerSeconds') or 300)
except Exception:
default_seconds = 300
if default_events <= 0:
default_events = 50
if default_seconds <= 0:
default_seconds = 300
default_thresholds = [
{"events": default_events, "seconds": default_seconds, "name": "default_logline", "type": "logline"}
]
try:
raw = config.get("alertThresholdsJson")
if not raw:
return default_thresholds
data = raw
if isinstance(raw, str):
try:
data = json.loads(raw)
except Exception:
return default_thresholds
# Accept either {"thresholds": [...]} or {"loglineThresholds": [...]} or a list directly
if isinstance(data, dict):
thr = data.get("thresholds") or data.get("loglineThresholds")
elif isinstance(data, list):
thr = data
else:
thr = None
if not thr:
return default_thresholds
out = []
for i, t in enumerate(thr, start=1):
try:
events = int(t.get("events"))
seconds = int(t.get("seconds"))
name = (t.get("name") or f"trigger{i}")
typ = (t.get("type") or "logline")
if events > 0 and seconds > 0:
out.append({"events": events, "seconds": seconds, "name": name, "type": typ})
except Exception:
continue
return out or default_thresholds
except Exception:
return default_thresholds
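# Illustrative alertThresholdsJson shapes accepted by parse_thresholds():
#   {"thresholds": [{"events": 100, "seconds": 3600, "name": "hourly", "type": "logline"}]}
#   {"loglineThresholds": [{"events": 20, "seconds": 60}]}
#   [{"events": 500, "seconds": 86400, "name": "daily"}]
# Entries with non-positive events/seconds are skipped; an empty result falls
# back to the single default threshold built from defaultMaxEmails/defaultPerSeconds.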
def _load_log_state(log_path: Path):
try:
if not EXIM_LOG_STATE_PATH.exists():
return {"path": str(log_path), "ino": None, "pos": 0}
data = json.loads(EXIM_LOG_STATE_PATH.read_text())
if data.get("path") != str(log_path):
return {"path": str(log_path), "ino": None, "pos": 0}
return {"path": str(log_path), "ino": data.get("ino"), "pos": int(data.get("pos", 0))}
except Exception:
return {"path": str(log_path), "ino": None, "pos": 0}
def _save_log_state(state: dict):
try:
EXIM_LOG_STATE_PATH.write_text(json.dumps(state))
except Exception as e:
print(f"[agent] Error saving log state: {e}")
def _snapshot_thresholds():
with THRESHOLD_LOCK:
if CURRENT_THRESHOLDS:
return [dict(t) for t in CURRENT_THRESHOLDS]
return parse_thresholds({})
def _exim_monitor_loop():
print(f"[agent] Exim monitor thread started (interval={EXIM_MONITOR_INTERVAL_SECS}s)")
while True:
try:
thresholds = _snapshot_thresholds()
process_exim_logs(thresholds, CURRENT_SERVER_ID)
write_tracking_db(thresholds)
except Exception as e:
print(f"[agent] Exim monitor error: {e}")
time.sleep(EXIM_MONITOR_INTERVAL_SECS)
def _ensure_exim_monitor_thread():
global EXIM_MONITOR_THREAD
if EXIM_MONITOR_THREAD and EXIM_MONITOR_THREAD.is_alive():
return
EXIM_MONITOR_THREAD = threading.Thread(target=_exim_monitor_loop, daemon=True)
EXIM_MONITOR_THREAD.start()
def _parse_exim_timestamp(line: str) -> float:
"""Parse leading timestamp from an Exim log line; fallback to current time."""
try:
ts_str = line[:19]
if re.match(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}", ts_str):
try:
dt = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
return time.mktime(dt.timetuple())
except Exception:
pass
except Exception:
pass
return time.time()
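# Illustrative Exim mainlog lines this agent parses (the timestamp is the
# first 19 characters; addresses and hosts below are examples only):
#   2024-05-01 12:00:00 1rABCD-000123-XY <= user@example.com H=host (helo) [198.51.100.7] P=esmtpa A=dovecot_login:user T="Hello"
#   2024-05-01 12:00:01 1rABCD-000123-XY => rcpt@example.org R=dnslookup T=remote_smtp H=mx.example.org [203.0.113.5]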
# --- Inserted helpers for enrollment, offline cache, and owner resolution ---
def enroll():
"""Enroll this agent with the server and return serverId."""
resp = session.post(f"{BASE_URL}/api/osm/agent/enroll", json={"token": TOKEN}, headers=DEFAULT_HEADERS, timeout=10)
resp.raise_for_status()
return resp.json()["serverId"]
def _read_limits_cache() -> dict:
try:
if LIMITS_CACHE_PATH.exists():
return json.loads(LIMITS_CACHE_PATH.read_text(encoding='utf-8'))
except Exception as e:
print(f"[agent] Error reading limits cache: {e}")
return {"default": {"enabled": False, "maxEmails": 0, "perSeconds": 0}, "accounts": []}
def _write_limits_cache(cache: dict):
try:
LIMITS_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
LIMITS_CACHE_PATH.write_text(json.dumps(cache, ensure_ascii=False, indent=2))
except Exception as e:
print(f"[agent] Error writing limits cache: {e}")
def _load_agent_state() -> dict:
try:
if AGENT_STATE_PATH.exists():
return json.loads(AGENT_STATE_PATH.read_text(encoding='utf-8'))
except Exception as e:
print(f"[agent] Error reading agent state: {e}")
return {}
def _save_agent_state(state: dict):
try:
AGENT_STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
AGENT_STATE_PATH.write_text(json.dumps(state, ensure_ascii=False, indent=2))
except Exception as e:
print(f"[agent] Error writing agent state: {e}")
def _get_current_server_id() -> str | None:
try:
if CURRENT_SERVER_ID is not None:
return str(CURRENT_SERVER_ID)
except Exception:
pass
state = _load_agent_state()
sid = state.get("serverId")
return str(sid) if sid else None
def _read_actions_queue() -> list:
try:
if ACTIONS_QUEUE_PATH.exists():
return json.loads(ACTIONS_QUEUE_PATH.read_text(encoding='utf-8'))
except Exception as e:
print(f"[agent] Error reading actions queue: {e}")
return []
def _write_actions_queue(actions: list):
try:
ACTIONS_QUEUE_PATH.parent.mkdir(parents=True, exist_ok=True)
ACTIONS_QUEUE_PATH.write_text(json.dumps(actions, ensure_ascii=False, indent=2))
except Exception as e:
print(f"[agent] Error writing actions queue: {e}")
def _queue_action(action: dict):
try:
actions = _read_actions_queue()
action = dict(action)
action.setdefault("queuedAt", time.time())
actions.append(action)
_write_actions_queue(actions)
except Exception as e:
print(f"[agent] Failed to queue action: {e}")
def set_account_limit_local(account: str, max_emails: int, per_seconds: int, enabled: bool = True):
try:
account = (account or '').strip()
if not account:
return
cache = _read_limits_cache()
accounts = cache.get('accounts') or []
updated = False
for entry in accounts:
if entry.get('account', '').strip().lower() == account.lower():
entry['maxEmails'] = int(max_emails)
entry['perSeconds'] = int(per_seconds)
entry['enabled'] = bool(enabled)
updated = True
break
if not updated:
accounts.append({
"account": account,
"maxEmails": int(max_emails),
"perSeconds": int(per_seconds),
"enabled": bool(enabled)
})
cache['accounts'] = accounts
_write_limits_cache(cache)
except Exception as e:
print(f"[agent] Failed to set local account limit: {e}")
def remove_account_limit_local(account: str):
try:
account = (account or '').strip()
if not account:
return
cache = _read_limits_cache()
accounts = cache.get('accounts') or []
new_accounts = [entry for entry in accounts if entry.get('account', '').strip().lower() != account.lower()]
cache['accounts'] = new_accounts
_write_limits_cache(cache)
except Exception as e:
print(f"[agent] Failed to remove local account limit: {e}")
def set_default_limit_local(max_emails: int, per_seconds: int, enabled: bool = True):
try:
cache = _read_limits_cache()
cache['default'] = {
"enabled": bool(enabled),
"maxEmails": int(max_emails),
"perSeconds": int(per_seconds)
}
_write_limits_cache(cache)
try:
global DEFAULT_MAX_EMAILS, DEFAULT_PER_SECONDS, DEFAULT_ENABLED
DEFAULT_MAX_EMAILS = int(max_emails)
DEFAULT_PER_SECONDS = int(per_seconds)
DEFAULT_ENABLED = bool(enabled)
except Exception:
pass
except Exception as e:
print(f"[agent] Failed to set local default limit: {e}")
def fetch_config(sid: int):
"""Fetch monitoring configuration from server and persist defaults; fallback to cache offline."""
try:
resp = session.get(f"{BASE_URL}/api/osm/agent/config", params={"serverId": sid}, headers=DEFAULT_HEADERS, timeout=10)
resp.raise_for_status()
cfg = resp.json()
cfg.setdefault("ipdbLookupEnabled", True)
cfg.setdefault("queueIngestEnabled", True)
cfg.setdefault("freezeReportsEnabled", True)
cache = _read_limits_cache()
cache["default"] = {
"enabled": bool(cfg.get('defaultEnabled') or False),
"maxEmails": int(cfg.get('defaultMaxEmails') or 0),
"perSeconds": int(cfg.get('defaultPerSeconds') or 0)
}
_write_limits_cache(cache)
try:
global QUEUE_INGEST_ENABLED, FREEZE_REPORTS_ENABLED, IPDB_LOOKUP_ENABLED
QUEUE_INGEST_ENABLED = bool(cfg.get('queueIngestEnabled', True))
FREEZE_REPORTS_ENABLED = bool(cfg.get('freezeReportsEnabled', True))
IPDB_LOOKUP_ENABLED = bool(cfg.get('ipdbLookupEnabled', True))
print(f"[agent] Feature toggles: queue={QUEUE_INGEST_ENABLED} freeze={FREEZE_REPORTS_ENABLED} ipdb={IPDB_LOOKUP_ENABLED}")
except Exception as toggle_err:
print(f"[agent] Failed to apply feature toggles: {toggle_err}")
return cfg
except Exception as e:
print(f"[agent] Error fetching config: {e}")
d = _read_limits_cache().get("default", {})
return {
"mailQueuePath": "/var/spool/postfix",
"checkInterval": 30,
"spamThreshold": 5.0,
"blockedDomainsJson": "[]",
"alertThresholdsJson": "{}",
"defaultMaxEmails": int(d.get('maxEmails') or 0),
"defaultPerSeconds": int(d.get('perSeconds') or 0),
"defaultEnabled": bool(d.get('enabled') or False),
"ipdbLookupEnabled": IPDB_LOOKUP_ENABLED,
"queueIngestEnabled": QUEUE_INGEST_ENABLED,
"freezeReportsEnabled": FREEZE_REPORTS_ENABLED
}
def _extract_email(text):
try:
m = re.search(r"([A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+)", text or '')
return m.group(1).lower() if m else None
except Exception:
return None
def _load_domain_owner_map():
global DOMAIN_OWNER_LAST_LOAD
now = time.time()
if DOMAIN_OWNER_MAP and (now - DOMAIN_OWNER_LAST_LOAD) < DOMAIN_OWNER_TTL_SECS:
return
try:
DOMAIN_OWNER_MAP.clear()
# cPanel: /etc/trueuserdomains format: domain: user
p = Path('/etc/trueuserdomains')
if p.exists():
for line in p.read_text(encoding='utf-8', errors='ignore').splitlines():
if ':' in line:
dom, user = line.split(':', 1)
dom = (dom or '').strip().lower()
user = (user or '').strip().lower()
if dom and user:
DOMAIN_OWNER_MAP[dom] = user
# cPanel alternate mapping file /etc/userdomains
p3 = Path('/etc/userdomains')
if p3.exists():
for line in p3.read_text(encoding='utf-8', errors='ignore').splitlines():
if ':' in line:
dom, user = line.split(':', 1)
dom = (dom or '').strip().lower()
user = (user or '').strip().lower()
if dom and user:
DOMAIN_OWNER_MAP[dom] = user
# DirectAdmin: /etc/virtual/domainowners
p2 = Path('/etc/virtual/domainowners')
if p2.exists():
for line in p2.read_text(encoding='utf-8', errors='ignore').splitlines():
if ':' in line:
dom, user = line.split(':', 1)
dom = (dom or '').strip().lower()
user = (user or '').strip().lower()
if dom and user:
DOMAIN_OWNER_MAP[dom] = user
# DirectAdmin global domainowners file
da_owners = Path('/usr/local/directadmin/data/admin/domainowners')
if da_owners.exists():
for line in da_owners.read_text(encoding='utf-8', errors='ignore').splitlines():
if ':' in line:
dom, user = line.split(':', 1)
dom = (dom or '').strip().lower()
user = (user or '').strip().lower()
if dom and user:
DOMAIN_OWNER_MAP[dom] = user
# DirectAdmin per-user domain lists
da_users_root = Path('/usr/local/directadmin/data/users')
if da_users_root.exists():
for user_dir in da_users_root.iterdir():
if not user_dir.is_dir():
continue
user = user_dir.name.strip().lower()
try:
dom_list = user_dir / 'domains.list'
if dom_list.exists():
for dom in dom_list.read_text(encoding='utf-8', errors='ignore').splitlines():
dom = (dom or '').strip().lower()
if dom:
DOMAIN_OWNER_MAP[dom] = user
except Exception:
continue
DOMAIN_OWNER_LAST_LOAD = now
except Exception as e:
print(f"[agent] Failed loading domain owners: {e}")
def _resolve_owner_from_email(email_addr):
try:
_load_domain_owner_map()
email_addr = (email_addr or '').strip().lower()
if '@' not in email_addr:
return None
local_part, domain = email_addr.split('@', 1)
domain = domain.rstrip('.')
parts = domain.split('.')
for i in range(len(parts)-1):
d = '.'.join(parts[i:])
owner = DOMAIN_OWNER_MAP.get(d)
if owner:
return owner
owner = DOMAIN_OWNER_MAP.get(domain)
if owner:
return owner
# Fallback: inspect virtual mailbox passwd files to infer owner
owner = _lookup_owner_from_virtual_passwd(local_part, domain)
if owner:
DOMAIN_OWNER_MAP[domain] = owner
return owner
return None
except Exception:
return None
def _lookup_owner_from_virtual_passwd(local_part: str, domain: str):
"""Inspect /etc/virtual/<domain>/passwd files to determine owning account."""
try:
local_part = (local_part or '').strip().lower()
domain = (domain or '').strip().lower()
if not local_part or not domain:
return None
domains_to_try = []
if domain:
domains_to_try.append(domain)
parts = domain.split('.')
for i in range(1, len(parts)-1):
parent = '.'.join(parts[i:])
if parent not in domains_to_try:
domains_to_try.append(parent)
for extra in ('lists', 'default'):
if extra not in domains_to_try:
domains_to_try.append(extra)
for dom in domains_to_try:
vpath = Path('/etc/virtual') / dom / 'passwd'
if not vpath.exists():
continue
try:
for line in vpath.read_text(encoding='utf-8', errors='ignore').splitlines():
if not line or line.startswith('#'):
continue
parts = line.split(':')
if not parts:
continue
account_field = parts[0].strip().lower()
if account_field != local_part and account_field != f"{local_part}@{dom}":
continue
# Attempt to derive owner from home/maildir path within the line
for token in parts[1:]:
if '/home/' in token:
m = re.search(r'/home/([^/]+)/', token)
if m:
return m.group(1).strip().lower()
# Sometimes the maildir path is at the end
m = re.search(r'/home/([^/]+)/', line)
if m:
return m.group(1).strip().lower()
except Exception:
continue
except Exception:
pass
return None
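# Illustrative /etc/virtual/<domain>/passwd entry (the exact field layout
# varies by panel and is an assumption here):
#   info:$1$salt$hash:...:/home/owner1/imap/example.com/info/Maildir
# The owner is inferred from the '/home/<owner>/' component found in any field.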
def _resolve_limit_account_key(account_ref: str) -> str:
"""Map an account_ref to the hosting account owner key used by limits."""
try:
# Try to extract email first (sender part before '(' or in the string)
email = None
if '(' in account_ref:
email = account_ref.split('(', 1)[0].strip()
if not email:
email = _extract_email(account_ref)
owner = _resolve_owner_from_email(email) if email else None
if owner:
return owner
# Fallback: local-part or username
if '(' in account_ref and account_ref.endswith(')'):
inside = account_ref[account_ref.rfind('(')+1:-1]
if inside:
return inside.lower()
m = re.match(r"([^@]+)@", account_ref)
if m:
return m.group(1).lower()
return account_ref.strip().lower()
except Exception:
return account_ref.strip().lower()
def extract_username_from_account_ref(account_ref: str) -> str:
try:
if '(' in account_ref and account_ref.endswith(')'):
return account_ref[account_ref.rfind('(')+1:-1].strip().lower()
m = re.match(r"([^@]+)@", account_ref)
if m:
return m.group(1).strip().lower()
return account_ref.strip().lower()
except Exception:
return account_ref.strip().lower()
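# account_ref examples and the username extract_username_from_account_ref derives:
#   "user@example.com(cpuser)" -> "cpuser"
#   "user@example.com"         -> "user"    (local-part fallback)
#   "cpuser"                   -> "cpuser"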
def fetch_freeze_list(sid: int):
try:
resp = session.get(f"{BASE_URL}/api/osm/agent/freeze-list", params={"serverId": sid}, headers=DEFAULT_HEADERS, timeout=10)
resp.raise_for_status()
return resp.json() or []
except Exception as e:
print(f"[agent] Error fetching freeze list: {e}")
# Return None to indicate remote unavailable so we do not wipe local hold file
return None
def reconcile_freeze_file_with_server(sid: int, prefer_server: bool = False):
try:
# Grace period after local updates to avoid wiping local holds before server reflects them
# Skip the grace period when server explicitly requests reconciliation (prefer_server=True)
try:
if not prefer_server and time.time() - LAST_LOCAL_HOLD_UPDATE_TS < 30:
return
except Exception:
pass
server_items = fetch_freeze_list(sid)
if server_items is None:
# Server unreachable; do not modify local hold file
return
# Build server set of entries (username or sender email)
server_set = set()
for it in server_items:
if not it.get('active', True):
continue
account = (it.get('account') or '').strip()
sender = (it.get('senderEmail') or '').strip()
if account:
server_set.add(account.lower())
elif sender:
server_set.add(sender.lower())
_ensure_virtual_files()
local_set = set()
try:
for line in OSM_HOLD_FILE.read_text().splitlines():
s = (line or '').strip().lower()
if s:
local_set.add(s)
except Exception:
pass
if prefer_server:
# Server-wins mode (e.g., after an app-initiated unfreeze): adopt server state and do NOT push local-only entries
desired = server_set
if desired != local_set:
with OSM_HOLD_FILE.open('w') as f:
for u in sorted(desired):
f.write(u + "\n")
_reload_exim()
print(f"[agent] Reconciled (server-wins). Server={len(server_set)} local={len(local_set)} applied={len(desired)}")
else:
# Merge mode (agent-wins for offline changes): push local-only entries to server then write union locally
to_push = sorted(list(local_set - server_set))
for val in to_push:
try:
# Try to preserve the original freeze reason if we can infer it
freeze_reason = _find_freeze_reason(val)
payload = {"serverId": str(sid), "reason": freeze_reason}
if '@' in val:
payload["senderEmail"] = val
payload["account"] = ""
else:
payload["senderEmail"] = ""
payload["account"] = val
session.post(f"{BASE_URL}/api/osm/agent/freeze", json=payload, headers=DEFAULT_HEADERS, timeout=10)
except Exception:
# Leave in local_set; future loops will retry via pending actions or here
pass
# Compute union as desired hold file content
desired = server_set | local_set
if desired != local_set:
with OSM_HOLD_FILE.open('w') as f:
for u in sorted(desired):
f.write(u + "\n")
_reload_exim()
print(f"[agent] Reconciled (merge). Server={len(server_set)} local={len(local_set)} merged={len(desired)}")
except Exception as e:
print(f"[agent] Reconcile freeze list failed: {e}")
def _find_freeze_reason(identifier: str) -> str:
"""Best-effort recovery of the original freeze reason for a given account/email.
Checks queued actions first, then the most recent local report, else returns a safe default.
"""
try:
ident = (identifier or '').strip().lower()
if not ident:
return 'Manual freeze'
# Look into pending actions (latest first) for a matching freeze entry
try:
actions = _read_actions_queue()
for action in reversed(actions):
if (action.get('type') or '') != 'freeze':
continue
s = (action.get('senderEmail') or '').strip().lower()
a = (action.get('account') or '').strip().lower()
if ident == s or (a and ident == a):
r = (action.get('reason') or '').strip()
if r:
return r
except Exception:
pass
# Search most recent report that matches sender/account
try:
if REPORTS_DIR.exists():
files = sorted(REPORTS_DIR.glob('*.txt'), key=lambda p: p.stat().st_mtime, reverse=True)
for path in files:
parsed = _parse_report_file(path)
if not parsed:
continue
s = (parsed.get('senderEmail') or '').strip().lower()
a = (parsed.get('account') or '').strip().lower()
if ident == s or (a and ident == a):
r = (parsed.get('reason') or '').strip()
if r:
return r
except Exception:
pass
return 'Manual freeze'
except Exception:
return 'Manual freeze'
def get_exim_log_path() -> Path:
"""Return best-known Exim main log path for this system."""
try:
candidates = []
try:
if detect_control_panel() == 'cpanel':
candidates.append(Path('/var/log/exim_mainlog'))
except Exception:
pass
candidates.extend([
Path('/var/log/exim/mainlog'),
Path('/var/log/exim4/mainlog'),
Path('/var/log/exim_mainlog'),
])
for p in candidates:
try:
if p.exists() and p.is_file():
return p
except Exception:
continue
except Exception:
pass
return Path('/var/log/exim/mainlog')
def process_exim_logs(thresholds, sid=None):
"""Incrementally parse Exim mainlog and update LOG_COUNTERS."""
log_path = get_exim_log_path()
try:
if not log_path.exists():
return
st = log_path.stat()
state = _load_log_state(log_path)
# Handle rotation or first run
if state.get("ino") != st.st_ino or state.get("pos", 0) > st.st_size:
state = {"path": str(log_path), "ino": st.st_ino, "pos": 0}
with log_path.open('r', encoding='utf-8', errors='ignore') as f:
f.seek(state.get("pos", 0))
new_data = f.read()
state["pos"] = f.tell()
_save_log_state(state)
now_ts = time.time()
# Parse new lines
for line in new_data.splitlines():
try:
line_ts = _parse_exim_timestamp(line)
if now_ts - line_ts > MAX_LOG_LOOKBACK_SECS:
continue
if line_ts - now_ts > 300:
line_ts = now_ts
# Record subject per account where available
                if ' <= ' in line or ' => ' in line or ' -> ' in line:
                    msgid = _extract_message_id_from_exim_line(line) or _extract_message_id_from_any_line(line) or ''
subj = _get_subject_for_message_id(msgid)
if subj:
acct = _extract_account_ref_from_exim_line(line)
if acct:
LAST_SUBJECT_BY_ACCOUNT[acct] = subj
_update_counters_from_exim_line(line, thresholds, line_ts)
except Exception:
continue
# Cleanup old dedupe entries
cutoff = time.time() - SEEN_LOG_MSG_IDS_RETENTION_SECS
while SEEN_LOG_MSG_IDS and SEEN_LOG_MSG_IDS[0][1] < cutoff:
mid, _ = SEEN_LOG_MSG_IDS.popleft()
SEEN_LOG_MSG_IDS_SET.discard(mid)
cutoff_rcpt = time.time() - SEEN_MSG_RCPT_RETENTION_SECS
while SEEN_MSG_RCPT and SEEN_MSG_RCPT[0][1] < cutoff_rcpt:
key, _ = SEEN_MSG_RCPT.popleft()
SEEN_MSG_RCPT_SET.discard(key)
except Exception as e:
print(f"[agent] Error processing exim logs: {e}")
def _extract_message_id_from_exim_line(line):
"""Extract the Exim queue id token that appears before '<=' on submission lines."""
try:
# Match the token immediately before '<='
m = re.search(r"\b([A-Za-z0-9-]{6,})\b\s+<=", line)
if m:
return m.group(1)
except Exception:
pass
return None
def _extract_message_id_from_any_line(line):
"""Extract the Exim queue id from general log lines that start with timestamp then id."""
try:
m = re.search(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} ([A-Za-z0-9-]{6,})\b", line)
if m:
return m.group(1)
except Exception:
pass
return None
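# Exim queue ids normally look like "1rABCD-000123-XY" (three dash-separated
# base-62 groups); the permissive [A-Za-z0-9-]{6,} patterns above tolerate
# other lengths as well.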
def _normalize_host(host: str) -> str:
try:
if not host:
return ''
h = host.strip()
if h.startswith('[') and h.endswith(']'):
h = h[1:-1]
return h
except Exception:
return host or ''
def _extract_client_host_ip_from_line(line: str) -> tuple[str, str]:
host = ''
ip = ''
try:
m_parent = re.search(r"\bH=\(([^)]*)\)", line)
if m_parent:
host = (m_parent.group(1) or '').strip()
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
m_after = re.search(r"\)\s*\[([^\]]+)\]", line)
if m_after:
ip = (m_after.group(1) or '').strip()
else:
m_host = re.search(r"\bH=([^\s]+)", line)
if m_host:
host = (m_host.group(1) or '').strip()
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
m_ip = re.search(r"\bH=[^\n]*?\[([^\]]+)\]", line)
if m_ip:
ip = (m_ip.group(1) or '').strip()
bracket_values = re.findall(r"\[([^\]]+)\]", line)
if not host and bracket_values:
host = bracket_values[0].strip()
if not ip and bracket_values:
ip = bracket_values[-1].strip()
return _normalize_host(host), ip
except Exception:
return '', ''
def _resolve_client_details(msg_id: str, client_dict: dict) -> tuple[str, str]:
try:
host = (client_dict or {}).get('h', '') or ''
ip = (client_dict or {}).get('ip', '') or ''
if (not host or host == '-') or (not ip or ip == '-'):
fallback = MSG_ID_TO_CLIENT.get(msg_id) or {}
if not host or host == '-':
host = fallback.get('h', host) or host
if not ip or ip == '-':
ip = fallback.get('ip', ip) or ip
host = _normalize_host(host)
ip = (ip or '').strip()
        # strip a trailing ':port' if accidentally present; only a single colon
        # qualifies, so bare IPv6 addresses are left intact
        if ip and ip.count(':') == 1:
            ip = ip.split(':', 1)[0]
return host, ip
except Exception:
return '', ''
def _client_info_from_headers(headers: str) -> tuple[str, str]:
host = ''
ip = ''
try:
if not headers:
return host, ip
m_ip = re.search(r"-host_address\s+\[([^\]]+)\]", headers)
if m_ip:
ip = (m_ip.group(1) or '').strip()
m_host = re.search(r"--helo_name\s+([^\s]+)", headers)
if m_host:
host = _normalize_host(m_host.group(1))
if not host:
m_recv = re.search(r"Received: from ([^\s]+)", headers)
if m_recv:
host = _normalize_host(m_recv.group(1))
except Exception:
pass
return host, ip
def _fallback_events_from_queue_detail(detail: dict, sender_email: str, owner_key: str):
events = []
try:
recipients = detail.get('recipients') or []
if not recipients:
rec = detail.get('recipient') or ''
if rec:
recipients = [rec]
recipients = [r for r in recipients if r]
subject = detail.get('subject') or ''
headers = detail.get('headers') or detail.get('status') or ''
client_h, client_ip = _client_info_from_headers(headers)
qid = detail.get('queueId') or detail.get('messageId') or ''
ts_now = time.time()
base_sender = sender_email or detail.get('sender') or owner_key
if not recipients:
recipients = ['']
for rcpt in recipients[:50]:
events.append({
'ts': ts_now,
'sender': base_sender,
'recipient': rcpt,
'subject': subject,
'client': {'h': client_h, 'ip': client_ip},
'relay': {},
'relayType': 'unknown',
'qid': qid,
})
except Exception:
pass
return events
def _has_recent_log_events(owner_key: str, sender_email: str, window_secs: int) -> bool:
try:
if not owner_key or not sender_email:
return False
events = ACCOUNT_EVENTS.get(owner_key)
if not events:
return False
cutoff = time.time() - max(window_secs, 0)
sender_lc = sender_email.strip().lower()
for ev in events:
if ev.get('ts', 0) >= cutoff and (ev.get('sender', '') or '').strip().lower() == sender_lc:
return True
return False
except Exception:
return False
def _should_hold_owner(owner_key: str, sender_email: str) -> bool:
try:
if not owner_key:
return False
events = ACCOUNT_EVENTS.get(owner_key)
if not events:
return False
sender_lc = (sender_email or '').strip().lower()
for ev in reversed(events):
ev_sender = (ev.get('sender', '') or '').strip().lower()
if sender_lc and ev_sender != sender_lc:
continue
relay_type = (ev.get('relayType') or '').lower()
if relay_type == 'local':
return True
if sender_lc:
return False
return False
except Exception:
return False
def _collect_events_for_report(owner_key: str, sender_email: str, window_secs: int):
try:
if not owner_key:
return []
evs_src = list(ACCOUNT_EVENTS.get(owner_key, deque()))
if not evs_src:
return []
cutoff_ts = time.time() - max(window_secs, 0)
sender_lc = (sender_email or '').strip().lower()
events = [
ev for ev in evs_src
if ev.get('ts', 0) >= cutoff_ts and (not sender_lc or (ev.get('sender', '') or '').strip().lower() == sender_lc)
]
events.sort(key=lambda e: e.get('ts', 0))
return events
except Exception:
return []
def _build_report_text(owner_key: str, sender_email: str, reason_text: str,
window_secs: int, limit_label: str, events_window: list) -> str:
lines = []
lines.append("Freeze Report\n")
lines.append(f"Reason: {reason_text}\n")
lines.append(f"Owner: {owner_key}\n")
lines.append(f"Sender: {sender_email}\n")
try:
types = [(ev.get('relayType') or 'unknown') for ev in events_window]
relay_summary = 'auth' if 'auth' in types else ('local' if 'local' in types else 'unknown')
lines.append(f"Relay: {relay_summary}\n")
except Exception:
pass
_append_client_summary(lines, events_window)
try:
srcs = []
seen = set()
for ev in events_window:
c = ev.get('client', {}) or {}
ch = c.get('h', '') or ''
cip = c.get('ip', '') or ''
label = f"{ch}[{cip}]" if cip or ch else ''
if label and label not in seen:
seen.add(label)
srcs.append(label)
if len(srcs) >= 5:
break
if srcs:
lines.append(f"Client sources: {', '.join(srcs)}\n")
except Exception:
pass
limit_text = limit_label or 'Limit: n/a'
lines.append(f"Window: last {window_secs}s; {limit_text}\n")
lines.append(f"Events counted in window: {len(events_window)}\n")
lines.append("\nEvents:\n")
if events_window:
for ev in events_window[-200:]:
ts_iso = datetime.utcfromtimestamp(ev.get('ts', 0)).isoformat()
rcpt = ev.get('recipient', '')
subj = ev.get('subject', '')
client = ev.get('client', {})
relay = ev.get('relay', {})
client_h, client_ip = _resolve_client_details(ev.get('qid', ''), client)
relay_ip = relay.get('ip', '')
relay_h = relay.get('h', '')
rtype = (ev.get('relayType') or '').lower()
relay_str = 'local' if rtype == 'local' else f"{relay_h}[{relay_ip}]"
lines.append(f"- {ts_iso} to={rcpt} subj=\"{subj}\" client={client_h}[{client_ip}] relay={relay_str} qid={ev.get('qid', '')}\n")
else:
lines.append("- No detailed events captured for this window.\n")
return ''.join(lines)
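# Illustrative freeze report rendered by _build_report_text() (values are
# examples only; the client-summary line comes from _append_client_summary):
#   Freeze Report
#   Reason: Exceeded 100 in 3600s
#   Owner: owner1
#   Sender: user@example.com
#   Relay: auth
#   Client sources: host1[198.51.100.7]
#   Window: last 3600s; Limit: 100
#   Events counted in window: 101
#
#   Events:
#   - 2024-05-01T12:00:00 to=rcpt@example.org subj="Hello" client=host1[198.51.100.7] relay=mx[203.0.113.5] qid=1rABCD-000123-XY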
def _save_and_upload_report(owner_key: str, sender_email: str, reason_text: str,
report_text: str, sid):
REPORTS_DIR.mkdir(parents=True, exist_ok=True)
now_dt = datetime.utcnow()
now_str = now_dt.strftime('%Y%m%d-%H%M%S')
created_at_iso = now_dt.isoformat()
safe_sender = (sender_email or 'unknown').replace('@', '_')
safe_owner = (owner_key or 'unknown').replace('@', '_')
fname = f"{now_str}-{safe_owner}-{safe_sender}.txt"
fpath = REPORTS_DIR / fname
fpath.write_text(report_text)
payload = {
"serverId": str(sid) if sid is not None else str(CURRENT_SERVER_ID or ""),
"senderEmail": sender_email,
"account": owner_key,
"reason": reason_text,
"reportText": report_text,
"reportKey": fname,
"createdAt": created_at_iso
}
target_sid = payload["serverId"]
if target_sid:
try:
session.post(f"{BASE_URL}/api/osm/agent/freeze-report", json=payload, headers=DEFAULT_HEADERS, timeout=10)
return
except Exception as e:
print(f"[agent] Failed to upload freeze report: {e}")
try:
_queue_pending_report(target_sid, sender_email, owner_key or '', reason_text, report_text, fname, created_at_iso)
except Exception:
pass
else:
try:
_queue_pending_report(target_sid, sender_email, owner_key or '', reason_text, report_text, fname, created_at_iso)
except Exception:
pass
def _generate_freeze_report(account_ref: str, owner_key: str, sender_email: str,
reason_text: str, window_secs: int, limit_label: str,
sid, fallback_events: Optional[list] = None):
if not FREEZE_REPORTS_ENABLED:
print("[agent] Freeze report generation disabled; skipping report")
return
try:
events_window = _collect_events_for_report(owner_key, sender_email, window_secs)
if not events_window and fallback_events:
events_window = list(fallback_events)
try:
acct_events = ACCOUNT_EVENTS.setdefault(owner_key, deque())
for ev in fallback_events:
acct_events.append(ev)
except Exception:
pass
report_text = _build_report_text(owner_key, sender_email, reason_text, window_secs, limit_label, events_window)
_save_and_upload_report(owner_key, sender_email, reason_text, report_text, sid)
except Exception as e:
print(f"[agent] Failed to write/upload report: {e}")
def _parse_report_file(path: Path):
try:
text = path.read_text(encoding='utf-8')
except Exception as e:
print(f"[agent] Failed reading report {path.name}: {e}")
return None
reason_text = ""
owner_text = ""
sender_text = ""
try:
m_reason = re.search(r"^Reason:\s*(.+)$", text, re.MULTILINE)
if m_reason:
reason_text = m_reason.group(1).strip()
m_owner = re.search(r"^Owner:\s*(.+)$", text, re.MULTILINE)
if m_owner:
owner_text = m_owner.group(1).strip()
m_sender = re.search(r"^Sender:\s*(.+)$", text, re.MULTILINE)
if m_sender:
sender_text = m_sender.group(1).strip()
except Exception as e:
print(f"[agent] Failed parsing metadata in {path.name}: {e}")
try:
created_at = datetime.utcfromtimestamp(path.stat().st_mtime).isoformat()
except Exception:
created_at = datetime.utcnow().isoformat()
return {
"reportText": text,
"reason": reason_text,
"account": owner_text,
"senderEmail": sender_text,
"createdAt": created_at,
"reportKey": path.name,
}
def _extract_recipient_from_arrow_line(line):
try:
m = re.search(r"(?:=>|->)\s+([^\s]+)", line)
if m:
return m.group(1)
except Exception:
pass
return None
def _extract_sender_from_arrow_line(line):
try:
m = re.search(r"\bF=<([^>]+)>", line)
if m:
return m.group(1)
except Exception:
pass
return None
def _extract_account_ref_from_exim_line(line):
"""Return account_ref like sender(user)."""
try:
# Prefer explicit 'from <sender>'
m_from = re.search(r"from\s+<([^>\s]+@[^>\s]+)>", line)
sender_email = m_from.group(1) if m_from else None
if not sender_email:
# Fallback to token after '<='
m_after = re.search(r"<=\s*<?([^>\s]+@[^\s>]+)>?", line)
if m_after:
sender_email = m_after.group(1)
if not sender_email:
# Submission lines can also include F=<sender>
m_f = re.search(r"\bF=<([^>]+)>", line)
if m_f:
candidate = (m_f.group(1) or '').strip()
if candidate:
sender_email = candidate
# Auth user: A=login:USER or A=dovecot_login:USER or courier_login
m_auth = re.search(r"\bA=[A-Za-z0-9_-]+:([^\s]+)\b", line)
auth_user = m_auth.group(1) if m_auth else None
if not auth_user:
# Local deliveries note the owner as U=username
m_u = re.search(r"\bU=([^\s]+)\b", line)
if m_u:
auth_user = m_u.group(1)
if auth_user:
auth_user = auth_user.strip()
# Compose account ref
if sender_email and auth_user:
return f"{sender_email}({auth_user})"
if sender_email:
return sender_email
if auth_user:
return auth_user
return "Unknown"
except Exception:
return "Unknown"
def _update_counters_from_exim_line(line, thresholds, line_ts):
"""Parse Exim log line and update counters for tracking and per-account limits."""
try:
if 'diradm_user=' in line:
msgid = _extract_message_id_from_any_line(line) or _extract_message_id_from_exim_line(line)
if msgid:
try:
m_user = re.search(r"diradm_user=([^\s]+)", line)
owner_user = m_user.group(1).strip().lower() if m_user else ''
if owner_user:
MSG_ID_TO_OWNER[msgid] = owner_user
existing_ref = MSG_ID_TO_ACCOUNT.get(msgid)
email_part = None
if existing_ref:
email_part = existing_ref.split('(')[0] if '(' in existing_ref else existing_ref
email_part = (email_part or '').strip()
if not email_part:
m_addr = re.search(r"diradm_address=([^\s]+)", line)
if m_addr:
email_part = (m_addr.group(1) or '').strip()
if not email_part:
email_part = _extract_account_ref_from_exim_line(line)
if email_part:
MSG_ID_TO_ACCOUNT[msgid] = f"{email_part}({owner_user})"
m_dom = re.search(r"diradm_domain=([^\s]+)", line)
if m_dom:
dom = (m_dom.group(1) or '').strip().lower().rstrip('.')
if dom:
DOMAIN_OWNER_MAP[dom] = owner_user
print(f"[agent] DirectAdmin owner resolved for msgid={msgid}: owner={owner_user}")
except Exception as e:
print(f"[agent] DirectAdmin owner parse error: {e}")
if 'Sender identification' in line:
msgid = _extract_message_id_from_any_line(line)
if msgid:
try:
m_user = re.search(r"\bU=([^\s]+)", line)
if m_user:
owner_user = m_user.group(1).strip().lower()
if owner_user:
MSG_ID_TO_OWNER[msgid] = owner_user
existing_ref = MSG_ID_TO_ACCOUNT.get(msgid)
if existing_ref:
email_part = existing_ref.split('(')[0] if '(' in existing_ref else existing_ref
email_part = (email_part or '').strip()
if '(' not in existing_ref or not existing_ref.endswith(')') or owner_user not in existing_ref:
MSG_ID_TO_ACCOUNT[msgid] = f"{email_part}({owner_user})"
except Exception:
pass
return
        # ' <= ' marks message arrival (submission); other lines may be deliveries
if ' <= ' not in line:
# Also track deliveries ('=>' for first, '->' for additional recipients)
if ' => ' in line or ' -> ' in line:
# Delivery line format: => recipient ...
msg_id = _extract_message_id_from_any_line(line)
if not msg_id:
return
# Only count deliveries for messages that originated locally or via auth relay
rtype = MSG_ID_TO_RELAY_TYPE.get(msg_id, 'unknown')
if rtype not in ('local', 'auth'):
return
# Check dedupe for this (msgid, recipient) pair
recipient = _extract_recipient_from_arrow_line(line)
sender = _extract_sender_from_arrow_line(line) or MSG_ID_TO_ACCOUNT.get(msg_id, '')
if not recipient:
return
dedupe_key = (msg_id, recipient)
if dedupe_key in SEEN_MSG_RCPT_SET:
return
SEEN_MSG_RCPT.append((dedupe_key, line_ts))
SEEN_MSG_RCPT_SET.add(dedupe_key)
# Build account_ref from cached data or line
account_ref = MSG_ID_TO_ACCOUNT.get(msg_id) or sender or 'Unknown'
subject = MSG_ID_TO_SUBJECT.get(msg_id, '')
# Record recipient with timestamp for this account
try:
rec_map = RECIPIENTS_BY_ACCOUNT.setdefault(account_ref, {})
rec_map[recipient] = line_ts
except Exception:
pass
# Update per-owner event log (for freeze reports) - one event per delivery
try:
owner = MSG_ID_TO_OWNER.get(msg_id) or _resolve_limit_account_key(account_ref)
client = MSG_ID_TO_CLIENT.get(msg_id, {})
relay = MSG_ID_TO_RELAY.get(msg_id, {})
relay_type = MSG_ID_TO_RELAY_TYPE.get(msg_id, 'unknown')
ev = {
'ts': line_ts,
'sender': account_ref.split('(')[0] if '(' in account_ref else _extract_email(account_ref) or account_ref,
'recipient': recipient,
'subject': subject,
'qid': msg_id,
'client': client,
'relay': relay,
'relayType': relay_type
}
dq = ACCOUNT_EVENTS.setdefault(owner, deque())
dq.append(ev)
# Limit retention
                    cutoff = time.time() - ACCOUNT_EVENTS_RETENTION_SECS
while dq and dq[0].get('ts', 0) < cutoff:
dq.popleft()
except Exception:
pass
# Count each delivery (recipient) as one email for limits
# Fetch account limits for inline enforcement
try:
limits = fetch_account_limits(None) # Use cached limits
lim_by_account = {}
for lim in limits:
if not lim.get('enabled', True):
continue
acc = (lim.get('account') or '').strip().lower()
max_emails = int(lim.get('maxEmails', 0))
per_seconds = int(lim.get('perSeconds', 60))
if acc and max_emails > 0 and per_seconds > 0:
lim_by_account[acc] = (max_emails, per_seconds)
print(f"[agent] Processing delivery: account_ref='{account_ref}', recipient='{recipient}', limits loaded={len(lim_by_account)}")
_record_event(account_ref, subject, thresholds, None, lim_by_account, event_ts=line_ts)
except Exception as e:
print(f"[agent] Error in delivery limit check: {e}")
_record_event(account_ref, subject, thresholds, None, None, event_ts=line_ts)
return
# Message submission line ('<=' indicates the message was received)
msg_id = _extract_message_id_from_exim_line(line)
if not msg_id or msg_id in SEEN_LOG_MSG_IDS_SET:
return
# Mark as seen
SEEN_LOG_MSG_IDS.append((msg_id, line_ts))
SEEN_LOG_MSG_IDS_SET.add(msg_id)
# Extract account reference
account_ref = _extract_account_ref_from_exim_line(line)
if not account_ref or account_ref == "Unknown":
return
# Extract subject from line (T="...")
subject = ''
try:
m_subj = re.search(r"\bT=\"([^\"]*)\"", line)
if m_subj:
subject = m_subj.group(1).strip()
except Exception:
pass
# Cache for later delivery lines
MSG_ID_TO_ACCOUNT[msg_id] = account_ref
MSG_ID_TO_SUBJECT[msg_id] = subject
# Parse client and relay info for freeze reports
try:
client_h, client_ip = _extract_client_host_ip_from_line(line)
if client_h or client_ip:
MSG_ID_TO_CLIENT[msg_id] = {'h': client_h, 'ip': client_ip}
# Relay type: P=local, P=esmtpa (auth), etc.
relay_type = 'unknown'
line_lc = line.lower()
if 'p=local' in line_lc:
relay_type = 'local'
elif 'a=' in line or 'p=esmtpa' in line_lc or 'p=esmtpsa' in line_lc:
relay_type = 'auth'
elif 'p=esmtps' in line_lc or 'p=esmtp' in line_lc or 'p=spam-scanned' in line_lc:
relay_type = 'external'
MSG_ID_TO_RELAY_TYPE[msg_id] = relay_type
# Owner resolution
sender_email = account_ref.split('(')[0] if '(' in account_ref else _extract_email(account_ref) or account_ref
owner = _resolve_owner_from_email(sender_email) or _resolve_limit_account_key(account_ref)
MSG_ID_TO_OWNER[msg_id] = owner
except Exception:
pass
# Only count outgoing submissions (local or authenticated). Skip external/inbound.
if MSG_ID_TO_RELAY_TYPE.get(msg_id) not in ('local', 'auth'):
return
# Fetch account limits for inline enforcement
try:
limits = fetch_account_limits(None) # Use cached limits
lim_by_account = {}
for lim in limits:
if not lim.get('enabled', True):
continue
acc = (lim.get('account') or '').strip().lower()
max_emails = int(lim.get('maxEmails', 0))
per_seconds = int(lim.get('perSeconds', 60))
if acc and max_emails > 0 and per_seconds > 0:
lim_by_account[acc] = (max_emails, per_seconds)
_record_event(account_ref, subject, thresholds, None, lim_by_account, event_ts=line_ts)
except Exception:
_record_event(account_ref, subject, thresholds, None, None, event_ts=line_ts)
    except Exception:
        pass  # Silently skip malformed lines so processing continues
def _account_key_for_limits(account_ref: str) -> str:
try:
if '(' in account_ref and account_ref.endswith(')'):
# sender(user)
inside = account_ref[account_ref.rfind('(')+1:-1]
if inside:
return inside.lower()
# else maybe it's just an email or username; prefer local-part for email
m = re.match(r"([^@]+)@", account_ref)
if m:
return m.group(1).lower()
return account_ref.strip().lower()
except Exception:
return account_ref.strip().lower()
def _prune_deque(dq: deque, cutoff_ts: float):
while dq and dq[0] < cutoff_ts:
dq.popleft()
def _reload_exim():
try:
# Prefer systemctl, fallback to service
res = subprocess.run(['systemctl', 'reload', 'exim'], capture_output=True, text=True, timeout=5)
if res.returncode != 0:
subprocess.run(['service', 'exim', 'reload'], capture_output=True, text=True, timeout=5)
except Exception as e:
print(f"[agent] Exim reload failed: {e}")
def _ensure_freeze_list():
try:
if not FREEZE_LIST_PATH.exists():
# Create file with safe perms
FREEZE_LIST_PATH.parent.mkdir(parents=True, exist_ok=True)
FREEZE_LIST_PATH.write_text("")
FREEZE_LIST_PATH.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
except Exception as e:
print(f"[agent] Could not ensure freeze list: {e}")
def _ensure_virtual_files():
try:
VIRTUAL_DIR.mkdir(parents=True, exist_ok=True)
if not OSM_HOLD_FILE.exists():
OSM_HOLD_FILE.write_text("")
OSM_HOLD_FILE.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
if not OSM_DISABLE_FILE.exists():
OSM_DISABLE_FILE.write_text("")
OSM_DISABLE_FILE.chmod(stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP)
except Exception as e:
print(f"[agent] Could not ensure /etc/virtual files: {e}")
def add_sender_to_freeze_list(sender_email: str, reason: str = None):
"""Legacy compatibility wrapper. OSM now relies on /etc/virtual/osm_hold only."""
try:
if not sender_email:
return
print(f"[agent] (notice) Skipping legacy freeze list update for {sender_email}")
except Exception:
pass
def remove_sender_from_freeze_list(sender_email: str):
"""Legacy compatibility wrapper. Nothing to remove from legacy list anymore."""
try:
sender_email = (sender_email or '').strip()
if sender_email:
print(f"[agent] (notice) Legacy freeze list disabled; nothing to remove for {sender_email}")
except Exception:
pass
return False
def add_user_to_hold_list(username, sender_email=None, reason: str = None):
try:
username = (username or '').strip()
email_full = (sender_email or '').strip()
if not username and not email_full:
return
_ensure_virtual_files()
existing = set()
try:
for line in OSM_HOLD_FILE.read_text().splitlines():
s = (line or '').strip()
if s:
existing.add(s)
except Exception:
pass
to_add = []
if username and username not in existing:
to_add.append(username)
if email_full and email_full not in existing:
to_add.append(email_full)
if to_add:
with OSM_HOLD_FILE.open('a') as f:
for item in to_add:
f.write(item + "\n")
_reload_exim()
print(f"[agent] Added to OSM hold list: {', '.join(to_add)}")
# Mark local hold/freeze update time
try:
global LAST_LOCAL_HOLD_UPDATE_TS
LAST_LOCAL_HOLD_UPDATE_TS = time.time()
except Exception:
pass
# Notify server immediately so reconcile will retain this entry
notified = False
if BASE_URL and TOKEN and CURRENT_SERVER_ID is not None:
try:
session.post(
f"{BASE_URL}/api/osm/agent/freeze",
json={
"serverId": str(CURRENT_SERVER_ID),
"senderEmail": email_full,
"account": username,
"reason": (reason or "locally added by agent")
},
headers=DEFAULT_HEADERS,
timeout=5,
)
notified = True
except Exception as e:
print(f"[agent] Failed to notify server freeze (hold): {e}")
if not notified:
try:
_queue_action({
"type": "freeze",
"senderEmail": email_full or username,
"account": username or "",
"reason": reason or "locally added by agent",
"timestamp": time.time(),
})
except Exception:
pass
except Exception as e:
print(f"[agent] Error updating OSM hold list: {e}")
def remove_user_from_hold_list(username=None, sender_email=None):
try:
targets = set()
if username:
targets.add(username.strip().lower())
if sender_email:
targets.add(sender_email.strip().lower())
if not targets:
return False
_ensure_virtual_files()
changed = False
try:
lines = []
for line in OSM_HOLD_FILE.read_text().splitlines():
entry = (line or '').strip()
if entry and entry.lower() not in targets:
lines.append(entry)
else:
changed = True
if changed:
OSM_HOLD_FILE.write_text("\n".join(lines) + ("\n" if lines else ""))
_reload_exim()
try:
global LAST_LOCAL_HOLD_UPDATE_TS
LAST_LOCAL_HOLD_UPDATE_TS = time.time()
except Exception:
pass
except FileNotFoundError:
changed = False
return changed
except Exception as e:
print(f"[agent] Error removing from OSM hold list: {e}")
return False
def _maybe_trigger_freeze(account_ref, threshold, dq, sid=None):
try:
key = (account_ref, threshold["name"], threshold["seconds"])
events_required = int(threshold["events"])
if len(dq) >= events_required:
last = LAST_TRIGGERED.get(key)
now = time.time()
# Avoid re-triggering too often: only once per window
if not last or (now - last) >= threshold["seconds"]:
sender_email = account_ref.split('(')[0]
# Derive username for OSM hold list
username = extract_username_from_account_ref(account_ref)
owner_key = (_resolve_owner_from_email(sender_email) or username or '').strip()
hold_owner = _should_hold_owner(owner_key, sender_email)
username_for_hold = owner_key if hold_owner else ""
# Local relay (hold_owner==True) → freeze account only; otherwise freeze email only
add_user_to_hold_list(
username_for_hold,
None if hold_owner else sender_email,
reason=f"Exceeded {threshold['events']} in {threshold['seconds']}s"
)
                # Legacy sender freeze list update is a no-op wrapper now (logs a notice only)
                add_sender_to_freeze_list(sender_email, reason=f"Exceeded {threshold['events']} in {threshold['seconds']}s")
LAST_TRIGGERED[key] = now
reason_text = f"Exceeded {threshold['events']} in {threshold['seconds']}s"
limit_label = f"Threshold: {threshold['events']} events"
_generate_freeze_report(account_ref, owner_key or username, sender_email, reason_text, int(threshold["seconds"]), limit_label, sid)
print(f"[agent] Threshold exceeded for {account_ref}: {len(dq)}/{events_required} in {threshold['seconds']}s (freeze)")
except Exception as e:
print(f"[agent] Freeze decision error: {e}")
def _record_event(account_ref, subject, thresholds, sid=None, lim_by_account=None, event_ts=None):
event_time = event_ts if event_ts is not None else time.time()
# Remember last subject (best-effort)
try:
sub = (subject or '').strip()
if sub:
# Collapse newlines/tabs and trim
sub = re.sub(r"\s+", " ", sub)[:200]
LAST_SUBJECT_BY_ACCOUNT[account_ref] = sub
except Exception:
pass
owner_key_for_limits = _resolve_limit_account_key(account_ref)
has_account_limit = bool(lim_by_account and owner_key_for_limits in lim_by_account)
has_default_limit = bool(DEFAULT_ENABLED and DEFAULT_MAX_EMAILS > 0 and DEFAULT_PER_SECONDS > 0)
for t in thresholds:
if (t.get("type") or "logline") != "logline":
continue
window = int(t["seconds"])
key = (account_ref, t["name"], window)
dq = LOG_COUNTERS[key]
_prune_deque(dq, event_time - window)
dq.append(event_time)
LAST_COUNTER_SEEN_TS[key] = event_time
if not has_account_limit and not has_default_limit:
_maybe_trigger_freeze(account_ref, t, dq, sid)
# Apply per-account limits from Manage
try:
if lim_by_account is not None:
owner_key = owner_key_for_limits
# Debug: print resolved owner and available limits
if lim_by_account:
print(f"[agent] Resolved owner_key='{owner_key}' from account_ref='{account_ref}', available limits={list(lim_by_account.keys())}")
# 1) Account-specific limit
if owner_key in lim_by_account:
max_emails, per_seconds = lim_by_account[owner_key]
dq = ACCOUNT_LIMIT_WINDOWS[owner_key]
_prune_deque(dq, event_time - per_seconds)
dq.append(event_time)
# Also mirror this into tracking DB counters under a standard trigger name
trig_key = (account_ref, f"limit_{per_seconds}", int(per_seconds))
tdq = LOG_COUNTERS[trig_key]
_prune_deque(tdq, event_time - per_seconds)
tdq.append(event_time)
if len(dq) > max_emails:
# Over limit: freeze sender
sender_email = (account_ref.split('(')[0] if '(' in account_ref else _extract_email(account_ref)) or ''
reason_text = f"Exceeded {max_emails} in {per_seconds}s"
log_evidence = _has_recent_log_events(owner_key, sender_email, per_seconds)
if not log_evidence:
print(f"[agent] Queue limit exceeded for {owner_key} but no recent exim log events; skipping account freeze.")
else:
hold_owner = _should_hold_owner(owner_key, sender_email)
username_for_hold = owner_key if hold_owner else ""
# Local relay → freeze account only; otherwise freeze email only
add_user_to_hold_list(
username_for_hold,
None if hold_owner else sender_email,
reason=reason_text
)
add_sender_to_freeze_list(sender_email, reason=reason_text)
limit_label = f"Limit: {max_emails}"
_generate_freeze_report(account_ref, owner_key, sender_email, reason_text, per_seconds, limit_label, sid)
# 2) Server default limit if no account-specific
elif DEFAULT_ENABLED and DEFAULT_MAX_EMAILS > 0 and DEFAULT_PER_SECONDS > 0:
per_seconds = DEFAULT_PER_SECONDS
max_emails = DEFAULT_MAX_EMAILS
dq = ACCOUNT_LIMIT_WINDOWS[f"default::{owner_key}"]
_prune_deque(dq, event_time - per_seconds)
dq.append(event_time)
# Mirror to tracking DB under default window
trig_key = (account_ref, f"limit_{per_seconds}", int(per_seconds))
tdq = LOG_COUNTERS[trig_key]
_prune_deque(tdq, event_time - per_seconds)
tdq.append(event_time)
if len(dq) > max_emails:
sender_email = (account_ref.split('(')[0] if '(' in account_ref else _extract_email(account_ref)) or ''
username = extract_username_from_account_ref(account_ref)
reason_text = f"Exceeded default {max_emails} in {per_seconds}s"
log_evidence = _has_recent_log_events(owner_key, sender_email, per_seconds)
if not log_evidence:
print(f"[agent] Default limit exceeded for {owner_key} but no recent exim log events; skipping account freeze.")
else:
hold_owner = _should_hold_owner(owner_key, sender_email)
username_for_hold = owner_key if hold_owner else ""
# Local relay → freeze account only; otherwise freeze email only
add_user_to_hold_list(
username_for_hold,
None if hold_owner else sender_email,
reason=reason_text
)
add_sender_to_freeze_list(sender_email, reason=reason_text)
limit_label = f"Default Limit: {max_emails}"
_generate_freeze_report(account_ref, owner_key, sender_email, reason_text, per_seconds, limit_label, sid)
except Exception as e:
print(f"[agent] Error applying per-account limits: {e}")
def write_tracking_db(thresholds):
try:
lines = []
now_ts = time.time()
valid_windows = {(t["name"], int(t["seconds"])) for t in thresholds if (t.get("type") or "logline") == "logline"}
# Build a set of keys to write: active counters and recently seen counters
keys_to_write = set(LOG_COUNTERS.keys())
for k, ts in list(LAST_COUNTER_SEEN_TS.items()):
if now_ts - ts <= COUNTER_RETENTION_SECS:
keys_to_write.add(k)
for (account_ref, trig_name, window) in sorted(keys_to_write):
dq = LOG_COUNTERS.get((account_ref, trig_name, window), deque())
if not dq and (trig_name, window) not in valid_windows and not trig_name.startswith("limit_"):
continue
# prune before counting
_prune_deque(dq, now_ts - window)
count = len(dq)
# Skip if no count and outside retention
last_ts = LAST_COUNTER_SEEN_TS.get((account_ref, trig_name, window), 0)
if count == 0 and (now_ts - last_ts) > COUNTER_RETENTION_SECS:
continue
subject = (LAST_SUBJECT_BY_ACCOUNT.get(account_ref) or '').strip()
subject = re.sub(r"\s+", " ", subject)[:200] if subject else ''
# Compose recipients string
rcpts = []
try:
rec_map = RECIPIENTS_BY_ACCOUNT.get(account_ref) or {}
# prune again relative to now
for r, t in list(rec_map.items()):
if now_ts - t > RECIPIENT_RETENTION_SECS:
rec_map.pop(r, None)
rcpts = sorted(rec_map.keys())[:50]
except Exception:
rcpts = []
lines.append(f"{count}event percent")
lines.append(f"{window}seconds")
# Append recipients after subject using rcpt-> comma-separated
rcpt_part = f"rcpt->{','.join(rcpts)}" if rcpts else "rcpt->"
lines.append(f"{account_ref}->{trig_name}->logline->subject->{subject} {rcpt_part}")
TRACKING_DB_PATH.write_text("\n".join(lines) + ("\n" if lines else ""))
except Exception as e:
print(f"[agent] Error writing tracking DB: {e}")
def _get_subject_for_message_id(msg_id: str) -> str:
"""Fetch Subject header for a given exim message id using exim -Mvh."""
try:
if not msg_id:
return ""
res = subprocess.run(['exim', '-Mvh', msg_id], capture_output=True, text=True, timeout=5)
if res.returncode != 0:
return ""
headers = res.stdout
m = re.search(r"^Subject:\s*(.+)$", headers, re.MULTILINE)
if m:
return m.group(1).strip()
except Exception:
pass
return ""
def _extract_subject_from_headers_text(headers: str) -> str:
try:
if not headers:
return ""
m = re.search(r"^Subject:\s*(.+)$", headers, re.MULTILINE)
if m:
return m.group(1).strip()
except Exception:
pass
return ""
def get_email_headers(msg_id):
"""Fetch email headers using exim -Mvh"""
try:
result = subprocess.run(['exim', '-Mvh', msg_id], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
headers = result.stdout.strip()
return headers
else:
return f"Error fetching headers: {result.stderr.strip()}"
except Exception as e:
return f"Error fetching headers: {str(e)}"
def get_email_body(msg_id):
"""Fetch email body using exim -Mvb"""
try:
result = subprocess.run(['exim', '-Mvb', msg_id], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
body = result.stdout.strip()
return body
else:
return f"Error fetching body: {result.stderr.strip()}"
except Exception as e:
return f"Error fetching body: {str(e)}"
def _queue_pending_report(server_id: str, sender_email: str, account: str, reason: str, report_text: str, report_key: str, created_at: str):
try:
PENDING_REPORTS_DIR.mkdir(parents=True, exist_ok=True)
fname = f"{int(time.time())}-{uuid.uuid4().hex}.json"
payload = {
"serverId": server_id,
"senderEmail": sender_email,
"account": account,
"reason": reason,
"reportText": report_text,
"reportKey": report_key,
"createdAt": created_at,
}
(PENDING_REPORTS_DIR / fname).write_text(json.dumps(payload), encoding='utf-8')
print(f"[agent] Queued freeze report for retry: {fname}")
except Exception as e:
print(f"[agent] Failed to queue freeze report for retry: {e}")
def _flush_pending_reports():
if not BASE_URL or not TOKEN or CURRENT_SERVER_ID is None:
return
if not FREEZE_REPORTS_ENABLED:
return
try:
if not PENDING_REPORTS_DIR.exists():
return
for path in sorted(PENDING_REPORTS_DIR.glob('*.json')):
try:
data = json.loads(path.read_text(encoding='utf-8'))
except Exception as e:
print(f"[agent] Could not read pending report {path.name}: {e}")
continue
if "createdAt" not in data or not data.get("createdAt"):
data["createdAt"] = datetime.utcnow().isoformat()
try:
session.post(f"{BASE_URL}/api/osm/agent/freeze-report", json=data, headers=DEFAULT_HEADERS, timeout=10)
path.unlink()
print(f"[agent] Uploaded pending freeze report: {path.name}")
except Exception as e:
print(f"[agent] Failed to upload pending freeze report {path.name}: {e}")
# Stop after first failure to avoid hammering server
break
except Exception as e:
print(f"[agent] Pending report flush failed: {e}")
def _process_pending_actions():
sid = _get_current_server_id()
if not BASE_URL or not TOKEN or not sid:
return
actions = _read_actions_queue()
if not actions:
return
remaining = []
for action in actions:
typ = action.get('type')
try:
if typ == 'freeze':
payload = {
"serverId": sid,
"senderEmail": action.get('senderEmail', ''),
"account": action.get('account') or '',
"reason": action.get('reason') or 'CLI freeze'
}
session.post(f"{BASE_URL}/api/osm/agent/freeze", json=payload, headers=DEFAULT_HEADERS, timeout=10)
elif typ == 'unfreeze':
payload = {
"serverId": sid,
"senderEmail": action.get('senderEmail', ''),
"account": action.get('account') or ''
}
session.post(f"{BASE_URL}/api/osm/agent/unfreeze", json=payload, headers=DEFAULT_HEADERS, timeout=10)
elif typ == 'set_limit':
payload = {
"serverId": sid,
"account": action.get('account', ''),
"maxEmails": int(action.get('maxEmails', 0)),
"perSeconds": int(action.get('perSeconds', 0)),
"enabled": bool(action.get('enabled', True))
}
session.post(f"{BASE_URL}/api/osm/agent/limit", json=payload, headers=DEFAULT_HEADERS, timeout=10)
elif typ == 'remove_limit':
payload = {
"serverId": sid,
"account": action.get('account', '')
}
session.post(f"{BASE_URL}/api/osm/agent/limit-remove", json=payload, headers=DEFAULT_HEADERS, timeout=10)
elif typ == 'set_default_limit':
payload = {
"serverId": sid,
"maxEmails": int(action.get('maxEmails', 0)),
"perSeconds": int(action.get('perSeconds', 0)),
"enabled": bool(action.get('enabled', True))
}
session.post(f"{BASE_URL}/api/osm/agent/limit-default", json=payload, headers=DEFAULT_HEADERS, timeout=10)
else:
continue
except Exception as e:
print(f"[agent] Failed to sync action {typ}: {e}")
remaining.append(action)
continue
_write_actions_queue(remaining)
def _append_client_summary(lines, events_window):
try:
ips = []
hosts = []
seen_ips = set()
seen_hosts = set()
for ev in events_window:
client = ev.get('client', {}) or {}
host, ip = _resolve_client_details(ev.get('qid') or '', client)
if ip and ip not in seen_ips:
seen_ips.add(ip)
ips.append(ip)
if host and host not in seen_hosts:
seen_hosts.add(host)
hosts.append(host)
if ips:
lines.append(f"Client IPs: {', '.join(ips)}\n")
if hosts:
lines.append(f"Client Hosts: {', '.join(hosts)}\n")
except Exception:
pass
def _sync_reports_with_server(force: bool = False):
global LAST_REPORT_SYNC_TS
if not BASE_URL or not TOKEN or CURRENT_SERVER_ID is None:
return
if not FREEZE_REPORTS_ENABLED:
return
now = time.time()
if not force and (now - LAST_REPORT_SYNC_TS) < 60:
return
LAST_REPORT_SYNC_TS = now
try:
reports_payload = []
if REPORTS_DIR.exists():
for path in sorted(REPORTS_DIR.glob('*.txt')):
parsed = _parse_report_file(path)
if not parsed:
continue
if not parsed.get('senderEmail'):
continue
parsed['serverId'] = str(CURRENT_SERVER_ID)
reports_payload.append(parsed)
payload = {
"serverId": str(CURRENT_SERVER_ID),
"reports": reports_payload,
}
session.post(
f"{BASE_URL}/api/osm/agent/report-sync",
json=payload,
headers=DEFAULT_HEADERS,
timeout=10,
)
except Exception as e:
print(f"[agent] Report sync failed: {e}")
def get_exim_queue_stats(stats):
"""Populate queue statistics using exim commands."""
try:
result = subprocess.run(['exim', '-bp'], capture_output=True, text=True, timeout=10)
if result.returncode != 0:
return stats
lines = result.stdout.splitlines()
details = []
current = None
for raw in lines:
line = (raw or '').rstrip()
if not line:
continue
stripped = line.lstrip()
if not stripped:
continue
if stripped.startswith('**') or stripped.startswith('=='):
continue
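            # A queue header line (illustrative) looks like:
            #   25m  2.3K 1a2b3c-000123-AB <sender@example.com>
            # followed by indented recipient address lines; the loose token
            # checks below distinguish the two without a strict format.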
is_header_line = False
try:
parts = stripped.split()
if len(parts) < 3:
is_header_line = False
else:
age_token = parts[0]
second_token = parts[1] if len(parts) > 1 else ''
third_token = parts[2] if len(parts) > 2 else ''
# very loose check: age token contains digit and second token is numeric-ish
size_token = second_token.upper()
if any(ch.isdigit() for ch in age_token) and (size_token.replace('.', '').replace('K','').replace('M','').isdigit()):
is_header_line = True
except Exception:
is_header_line = False
if is_header_line:
age = parts[0]
size = parts[1]
queue_id = parts[2]
sender = ""
if len(parts) >= 4:
sender = parts[3]
if sender == "<=" and len(parts) >= 5:
sender = parts[4]
current = {
"queueId": queue_id,
"messageId": queue_id,
"age": age,
"size": size,
"sender": sender,
"status": stripped,
"recipients": [],
"subject": "",
"headers": "",
"body": "",
"ownerAccount": "",
}
details.append(current)
try:
headers = get_email_headers(queue_id)
if headers and not headers.startswith("Error fetching"):
current["headers"] = headers
current["subject"] = _extract_subject_from_headers_text(headers) or current["subject"]
except Exception:
pass
try:
subj_inline = _get_subject_for_message_id(queue_id)
if subj_inline:
current["subject"] = subj_inline
except Exception:
pass
continue
if current is not None:
recipient = stripped
if recipient:
current["recipients"].append(recipient)
        # Attach recipient previews and bodies for the first 50 entries only,
        # to bound per-cycle cost on large queues
for entry in details[:50]:
try:
if not entry.get("body"):
body = get_email_body(entry["queueId"])
if body and not body.startswith("Error fetching"):
# Avoid massive payloads
entry["body"] = body[:10000]
except Exception:
pass
if entry.get("recipients"):
entry["recipient"] = ", ".join(entry["recipients"])
else:
entry["recipient"] = ""
if entry.get("headers"):
owner = extract_account_from_detail({
"headers": entry["headers"],
"sender": entry.get("sender", ""),
"recipient": entry.get("recipient", "")
})
entry["ownerAccount"] = owner
stats["queueDetails"] = details
stats["totalMessages"] = len(details)
stats["activeMessages"] = len(details)
return stats
except Exception as e:
print(f"[agent] Error getting exim queue stats: {e}")
return stats
def get_postfix_queue_stats(stats):
"""Get queue statistics using postfix commands"""
try:
# Get queue summary using postqueue
result = subprocess.run(['postqueue', '-p'], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
output = result.stdout
            # Parse postqueue output; each entry line looks like:
            #   QUEUEID[*|!] SIZE Day Mon DD HH:MM:SS SENDER
            # where '*' marks an active message and '!' a held one; indented
            # recipient/reason continuation lines carry fewer tokens.
            lines = output.strip().split('\n')
            for line in lines[1:]:  # Skip header line
                if line.strip() and not line.startswith('-'):
                    parts = line.split()
                    if len(parts) >= 7 and parts[0][0].isalnum():
                        raw_id = parts[0]
                        queue_id = raw_id.rstrip('*!')
                        size = parts[1]
                        time_str = ' '.join(parts[2:6])
                        sender = parts[6]
                        stats["queueDetails"].append({
                            "queueId": queue_id,
                            "size": size,
                            "recipients": "",  # recipient count is not on the header line
                            "time": time_str,
                            "sender": sender,
                            "ownerAccount": (_resolve_owner_from_email(_extract_email(sender) or '') or '')
                        })
                        # Categorize by the status marker on the queue ID
                        if raw_id.endswith('*'):
                            stats["activeMessages"] += 1
                        elif raw_id.endswith('!'):
                            stats["holdMessages"] += 1
                        else:
                            stats["deferredMessages"] += 1
                        stats["totalMessages"] += 1
except Exception as e:
print(f"[agent] Error getting postfix queue stats: {e}")
return stats
def get_mail_queue_stats(queue_path: str):
"""Get mail queue statistics using appropriate mail server commands"""
stats = {
"totalMessages": 0,
"activeMessages": 0,
"deferredMessages": 0,
"holdMessages": 0,
"incomingMessages": 0,
"outgoingMessages": 0,
"queueDetails": [],
"spamScores": [],
"blockedDomains": []
}
mail_server = detect_mail_server()
try:
if mail_server == 'exim':
return get_exim_queue_stats(stats)
elif mail_server == 'postfix':
return get_postfix_queue_stats(stats)
else:
            # Fallback for an unknown mail server: attempt postfix parsing
            # (the exim path needs the exim binary, which detection didn't find)
            return get_postfix_queue_stats(stats)
except Exception:
return stats
def get_spam_scores(queue_path: str, mail_server: str):
"""Extract spam scores from mail headers"""
spam_scores = []
try:
if mail_server == 'exim':
spam_scores = get_exim_spam_scores()
elif mail_server == 'postfix':
spam_scores = get_postfix_spam_scores(queue_path)
else:
# Try both methods
spam_scores = get_exim_spam_scores()
if not spam_scores:
spam_scores = get_postfix_spam_scores(queue_path)
except Exception as e:
print(f"[agent] Error getting spam scores: {e}")
return spam_scores
def get_exim_spam_scores():
"""Get spam scores from exim queue using exim -Mvh"""
spam_scores = []
try:
        # List queued messages via exim -bp
result = subprocess.run(['exim', '-bp'], capture_output=True, text=True, timeout=10)
        if result.returncode == 0:
            lines = result.stdout.strip().split('\n')
            msg_ids = []
            for line in lines:
                s = line.strip()
                if not s or s.startswith('-') or s.startswith('Mail queue'):
                    continue
                parts = s.split()
                # Header lines look like "<age> <size> <message-id> <sender>";
                # the message ID is the third token. Single-token recipient
                # lines fail the length check and are skipped.
                if len(parts) >= 3 and any(ch.isdigit() for ch in parts[0]):
                    msg_ids.append(parts[2])
# Get headers for first few messages to check for spam scores
for msg_id in msg_ids[:10]: # Limit to 10 messages
try:
header_result = subprocess.run(['sudo', 'exim', '-Mvh', msg_id], capture_output=True, text=True, timeout=5)
if header_result.returncode == 0:
headers = header_result.stdout
for line in headers.split('\n'):
if 'X-Spam-Score:' in line or 'X-Spam-Level:' in line or 'X-Spam-Status:' in line:
                                # allow negative scores, e.g. "X-Spam-Score: -1.2"
                                score_match = re.search(r'(-?\d+\.?\d*)', line)
if score_match:
spam_scores.append({
"messageId": msg_id,
"score": float(score_match.group(1)),
"header": line.strip()
})
except Exception:
continue
except Exception as e:
print(f"[agent] Error getting exim spam scores: {e}")
return spam_scores
def get_postfix_spam_scores(queue_path: str):
"""Get spam scores from postfix queue files"""
spam_scores = []
try:
# Look for recent messages in the queue
queue_dirs = ['active', 'deferred', 'incoming']
for queue_dir in queue_dirs:
queue_dir_path = Path(queue_path) / queue_dir
if queue_dir_path.exists():
for msg_file in queue_dir_path.iterdir():
if msg_file.is_file():
try:
# Read first few lines to get headers
with open(msg_file, 'r', encoding='utf-8', errors='ignore') as f:
headers = []
for i, line in enumerate(f):
if i > 50: # Only read first 50 lines
break
headers.append(line.strip())
if line.strip() == '': # End of headers
break
# Look for spam score headers
for header in headers:
if 'X-Spam-Score:' in header or 'X-Spam-Level:' in header:
                                    # allow negative scores
                                    score_match = re.search(r'(-?\d+\.?\d*)', header)
if score_match:
spam_scores.append({
"messageId": msg_file.name,
"score": float(score_match.group(1)),
"header": header
})
except Exception:
continue
if len(spam_scores) >= 20: # Limit to 20 recent scores
break
except Exception as e:
print(f"[agent] Error getting postfix spam scores: {e}")
return spam_scores
def check_blocked_domains(mail_server: str):
"""Check for blocked domains in recent mail"""
blocked_domains = []
try:
if mail_server == 'exim':
blocked_domains = check_exim_blocked_domains()
elif mail_server == 'postfix':
blocked_domains = check_postfix_blocked_domains()
else:
# Try both methods
blocked_domains = check_exim_blocked_domains()
blocked_domains.extend(check_postfix_blocked_domains())
except Exception as e:
print(f"[agent] Error checking blocked domains: {e}")
return blocked_domains
def check_exim_blocked_domains():
"""Check exim logs for blocked domains"""
blocked_domains = []
try:
# Check exim logs for blocked domains
log_files = [
'/var/log/exim/mainlog',
'/var/log/exim/paniclog',
'/var/log/mail.log',
'/var/log/maillog'
]
for log_file in log_files:
if os.path.exists(log_file):
try:
# Get recent log entries
result = subprocess.run(
['tail', '-100', log_file],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
for line in result.stdout.split('\n'):
if any(keyword in line.lower() for keyword in ['blocked', 'reject', 'denied', 'blacklisted']):
# Extract domain from log line
domain_match = re.search(r'@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})', line)
if domain_match:
domain = domain_match.group(1)
if domain not in [bd['domain'] for bd in blocked_domains]:
blocked_domains.append({
"domain": domain,
"timestamp": datetime.now().isoformat(),
"reason": line.strip()
})
except Exception:
continue
except Exception as e:
print(f"[agent] Error checking exim blocked domains: {e}")
return blocked_domains
def check_postfix_blocked_domains():
"""Check postfix logs for blocked domains"""
blocked_domains = []
try:
# Check postfix logs for blocked domains
log_files = [
'/var/log/mail.log',
'/var/log/postfix.log',
'/var/log/maillog'
]
for log_file in log_files:
if os.path.exists(log_file):
try:
# Get recent log entries
result = subprocess.run(
['tail', '-100', log_file],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
for line in result.stdout.split('\n'):
if 'blocked' in line.lower() or 'reject' in line.lower():
# Extract domain from log line
domain_match = re.search(r'@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})', line)
if domain_match:
domain = domain_match.group(1)
if domain not in [bd['domain'] for bd in blocked_domains]:
blocked_domains.append({
"domain": domain,
"timestamp": datetime.now().isoformat(),
"reason": line.strip()
})
except Exception:
continue
except Exception as e:
print(f"[agent] Error checking postfix blocked domains: {e}")
return blocked_domains
def detect_mail_server():
"""Detect which mail server (Exim/Postfix) is available on this host."""
try:
# Prefer explicit binaries first
if shutil.which('exim') or Path('/usr/sbin/exim').exists():
return 'exim'
if shutil.which('postqueue') or shutil.which('postfix'):
return 'postfix'
# Fallback heuristics: check for config directories
if Path('/etc/exim').exists() or Path('/var/log/exim').exists():
return 'exim'
if Path('/etc/postfix').exists():
return 'postfix'
except Exception:
pass
return 'unknown'
def detect_control_panel():
"""Detect which control panel is installed (cpanel, directadmin, or unknown)."""
try:
if os.path.isdir('/usr/local/cpanel'):
return 'cpanel'
if os.path.isdir('/usr/local/directadmin'):
return 'directadmin'
# Try binaries
try:
res = subprocess.run(['which', 'whmapi1'], capture_output=True, text=True, timeout=5)
if res.returncode == 0:
return 'cpanel'
except Exception:
pass
return 'unknown'
except Exception:
return 'unknown'
def list_cpanel_accounts():
"""List all cPanel accounts."""
accounts = []
try:
# Try whmapi1 listaccts
res = subprocess.run(['whmapi1', 'listaccts'], capture_output=True, text=True, timeout=30)
if res.returncode == 0:
try:
data = json.loads(res.stdout)
accts = data.get('data', {}).get('acct', [])
for a in accts:
user = a.get('user', '')
domain = a.get('domain', '')
if user:
accounts.append({'username': user, 'domain': domain})
except Exception:
pass
# Fallback: parse /etc/trueuserdomains
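        # (each line maps "domain: username", e.g. "example.com: exampleuser")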
if not accounts:
p = Path('/etc/trueuserdomains')
if p.exists():
for line in p.read_text(encoding='utf-8', errors='ignore').splitlines():
if ':' in line:
parts = line.split(':', 1)
if len(parts) == 2:
domain = parts[0].strip()
user = parts[1].strip()
if user and domain:
accounts.append({'username': user, 'domain': domain})
except Exception as e:
print(f"[agent] Error listing cPanel accounts: {e}")
return accounts
def list_directadmin_accounts():
"""List all DirectAdmin accounts."""
accounts = []
try:
users_dir = Path('/usr/local/directadmin/data/users')
if users_dir.exists() and users_dir.is_dir():
for user_dir in users_dir.iterdir():
if user_dir.is_dir():
username = user_dir.name
# Try to get primary domain
domain = ''
try:
domain_file = user_dir / 'domains.list'
if domain_file.exists():
domains = domain_file.read_text(encoding='utf-8', errors='ignore').splitlines()
if domains:
domain = domains[0].strip()
except Exception:
pass
accounts.append({'username': username, 'domain': domain})
except Exception as e:
print(f"[agent] Error listing DirectAdmin accounts: {e}")
return accounts
def send_queue_snapshot(sid: int, stats: dict):
"""Send queue snapshot to server"""
if not QUEUE_INGEST_ENABLED:
print(f"[agent] Queue ingestion disabled; not sending snapshot")
return
try:
# Debug: print what we're sending
print(f"[agent] Sending snapshot with {len(stats['queueDetails'])} queue details")
if stats["queueDetails"]:
first_detail = stats["queueDetails"][0]
print(f"[agent] First detail: queueId={first_detail.get('queueId')}, headers_len={len(first_detail.get('headers', ''))}, body_len={len(first_detail.get('body', ''))}")
# Test JSON serialization first
try:
queue_details_json = json.dumps(stats["queueDetails"])
print(f"[agent] JSON serialization successful, size: {len(queue_details_json)} chars")
except Exception as json_error:
print(f"[agent] JSON serialization error: {json_error}")
# Try with limited data
limited_details = []
for detail in stats["queueDetails"][:5]: # Only first 5 messages
limited_detail = {
"queueId": detail.get("queueId", ""),
"size": detail.get("size", ""),
"recipients": detail.get("recipients", ""),
"time": detail.get("time", ""),
"sender": detail.get("sender", ""),
"recipient": detail.get("recipient", ""),
"headers": detail.get("headers", "")[:1000] + "..." if len(detail.get("headers", "")) > 1000 else detail.get("headers", ""),
"body": detail.get("body", "")[:1000] + "..." if len(detail.get("body", "")) > 1000 else detail.get("body", "")
}
limited_details.append(limited_detail)
queue_details_json = json.dumps(limited_details)
print(f"[agent] Using limited data, size: {len(queue_details_json)} chars")
payload = {
"serverId": str(sid),
"totalMessages": stats["totalMessages"],
"activeMessages": stats["activeMessages"],
"deferredMessages": stats["deferredMessages"],
"holdMessages": stats["holdMessages"],
"incomingMessages": stats["incomingMessages"],
"outgoingMessages": stats["outgoingMessages"],
"queueDetailsJson": queue_details_json,
"spamScoreJson": json.dumps(stats["spamScores"]),
"blockedDomainsJson": json.dumps(stats["blockedDomains"])
}
print(f"[agent] Sending payload with total size: {len(json.dumps(payload))} chars")
resp = session.post(f"{BASE_URL}/api/osm/agent/queue-snapshot", json=payload, headers=DEFAULT_HEADERS, timeout=30)
resp.raise_for_status()
print(f"[agent] Queue snapshot sent successfully")
except Exception as e:
print(f"[agent] Error sending queue snapshot: {e}")
import traceback
print(f"[agent] Traceback: {traceback.format_exc()}")
def send_accounts_snapshot(sid: int):
"""Detect panel and send accounts list to server."""
try:
panel = 'unknown'
accounts = []
        # Detect the panel by its well-known install directories first
if os.path.isdir('/usr/local/cpanel'):
panel = 'cpanel'
accounts = list_cpanel_accounts()
elif os.path.isdir('/usr/local/directadmin'):
panel = 'directadmin'
accounts = list_directadmin_accounts()
else:
# Try binaries
try:
res = subprocess.run(['which', 'whmapi1'], capture_output=True, text=True, timeout=5)
if res.returncode == 0:
panel = 'cpanel'
accounts = list_cpanel_accounts()
except Exception:
pass
if panel == 'unknown' and Path('/usr/local/directadmin').exists():
panel = 'directadmin'
accounts = list_directadmin_accounts()
payload = {
"serverId": str(sid),
"panelType": panel,
"accountsJson": json.dumps(accounts)
}
print(f"[agent] Sending accounts snapshot: panel={panel}, count={len(accounts)}")
resp = session.post(f"{BASE_URL}/api/osm/agent/accounts", json=payload, headers=DEFAULT_HEADERS, timeout=15)
resp.raise_for_status()
print(f"[agent] Accounts snapshot sent successfully (panel={panel}, count={len(accounts)})")
except Exception as e:
print(f"[agent] Error sending accounts snapshot: {e}")
def fetch_account_limits(sid=None):
# If offline or no server id, use cached limits only
if sid is None or not BASE_URL or not TOKEN:
cache = _read_limits_cache()
return cache.get("accounts", [])
try:
resp = session.get(f"{BASE_URL}/api/osm/agent/account-limits", params={"serverId": sid}, headers=DEFAULT_HEADERS, timeout=10)
resp.raise_for_status()
data = resp.json() or []
cache = _read_limits_cache()
cache["accounts"] = [
{
"account": str(x.get("account") or "").strip(),
"maxEmails": int(x.get("maxEmails") or 0),
"perSeconds": int(x.get("perSeconds") or 0),
"enabled": bool(x.get("enabled", True))
}
for x in data
]
_write_limits_cache(cache)
return data
except Exception as e:
print(f"[agent] Error fetching account limits: {e}")
cache = _read_limits_cache()
return cache.get("accounts", [])
def parse_age_to_seconds(age_str: str) -> int:
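    # Examples (illustrative): "25m" -> 1500, "2d" -> 172800. Compound ages
    # such as "1d2h" collapse to their digits ("12") and use only the final
    # unit, so multi-unit values come out approximate.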
try:
s = age_str.strip()
if not s:
return 0
unit = s[-1]
val = int(''.join(ch for ch in s if ch.isdigit())) if any(ch.isdigit() for ch in s) else 0
if unit == 's':
return val
if unit == 'm':
return val * 60
if unit == 'h':
return val * 3600
if unit == 'd':
return val * 86400
return val
except Exception:
return 0
def extract_account_from_detail(detail: dict) -> str:
# Try headers for -ident or -auth_sender then fallback to recipient/sender
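    # e.g. a spool header line "-ident exampleuser" yields "exampleuser", and
    # "-auth_sender user@example.com" yields "user" (illustrative values)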
try:
headers = detail.get('headers') or ''
if headers:
m = re.search(r"-ident\s+(\w+)", headers)
if m:
return m.group(1)
m = re.search(r"-auth_sender\s+([^@\s]+)@", headers)
if m:
return m.group(1)
m = re.search(r"Received: from (\w+) by", headers)
if m:
return m.group(1)
recip = (detail.get('recipient') or '').strip()
if recip.startswith('(') and recip.endswith(')') and len(recip) > 2:
return recip[1:-1]
sender = (detail.get('sender') or '')
m = re.search(r"<([^@>]+)@", sender)
if m:
return m.group(1)
except Exception:
pass
return 'Unknown'
def freeze_message_exim(queue_id: str):
try:
subprocess.run(['exim', '-Mf', queue_id], capture_output=True, text=True, timeout=5)
except Exception as e:
print(f"[agent] exim freeze failed for {queue_id}: {e}")
def hold_message_postfix(queue_id: str):
try:
subprocess.run(['postsuper', '-h', queue_id], capture_output=True, text=True, timeout=5)
except Exception as e:
print(f"[agent] postfix hold failed for {queue_id}: {e}")
def enforce_account_limits(sid: int, stats: dict):
# Live rolling window using first-seen timestamps per message
# State containers (persist across calls via attributes on function)
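    # e.g. with a limit of 100 emails per 3600s, the deque keeps the first-seen
    # time of each new queue ID from the last hour; the 101st arrival inside
    # that window trips the freeze/hold handling below.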
if not hasattr(enforce_account_limits, "seen_ids"):
enforce_account_limits.seen_ids = set()
if not hasattr(enforce_account_limits, "account_windows"):
enforce_account_limits.account_windows = defaultdict(deque) # account -> deque[timestamps]
limits = fetch_account_limits(sid)
if not limits:
return
now_ts = time.time()
details = stats.get('queueDetails') or []
mail_server = detect_mail_server()
# Index limits for quick lookup
lim_by_account = {}
for lim in limits:
try:
if not lim.get('enabled', True):
continue
account = (lim.get('account') or '').strip().lower()
max_emails = int(lim.get('maxEmails', 0))
per_seconds = int(lim.get('perSeconds', 60))
if account and max_emails > 0 and per_seconds > 0:
lim_by_account[account] = (max_emails, per_seconds)
except Exception:
continue
if not lim_by_account:
return
for d in details:
log_evidence = False
try:
qid = d.get('queueId') or d.get('messageId')
if not qid or qid in enforce_account_limits.seen_ids:
continue
account = extract_account_from_detail(d)
# Resolve owner from sender email (prefer) else map account_ref to owner key
sender_email = _extract_email(d.get('sender') or '') or _extract_email(account) or ''
owner_key = _resolve_owner_from_email(sender_email) or _resolve_limit_account_key(account)
if owner_key not in lim_by_account:
continue
# Mark first seen and evaluate window
enforce_account_limits.seen_ids.add(qid)
max_emails, per_seconds = lim_by_account[owner_key]
dq = enforce_account_limits.account_windows[owner_key]
# prune old timestamps
cutoff = now_ts - per_seconds
while dq and dq[0] < cutoff:
dq.popleft()
dq.append(now_ts)
if len(dq) > max_emails:
# Over limit: act on this message immediately
print(f"[agent] Limit exceeded for {owner_key}: {len(dq)}/{max_emails} in {per_seconds}s. Freezing/holding {qid}.")
if mail_server == 'exim':
freeze_message_exim(qid)
elif mail_server == 'postfix':
hold_message_postfix(qid)
# Also add to hold lists for sustained blocking
reason_text = f"Exceeded {max_emails} in {per_seconds}s (count={len(dq)})"
log_evidence = _has_recent_log_events(owner_key, sender_email, per_seconds)
if not log_evidence:
print(f"[agent] Queue enforcement detected over-limit for {owner_key} but no recent exim log activity; skipping persistent freeze.")
else:
hold_owner = _should_hold_owner(owner_key, sender_email)
username_for_hold = owner_key if hold_owner else ""
try:
# Local relay → freeze account only; otherwise freeze email only
add_user_to_hold_list(
username_for_hold,
None if hold_owner else sender_email,
reason=reason_text
)
add_sender_to_freeze_list(sender_email, reason=reason_text)
except Exception as e:
print(f"[agent] Failed to update hold/freeze lists: {e}")
# Inform server so reconcile won't remove our local entry
fallback_events = _fallback_events_from_queue_detail(d, sender_email, owner_key)
limit_label = f"Limit: {max_emails}"
_generate_freeze_report(account, owner_key, sender_email, reason_text, per_seconds, limit_label, sid, fallback_events if fallback_events else None)
        except Exception as e:
            print(f"[agent] Error enforcing per-message limit: {e}")
def check_spam_thresholds(sid: int, stats: dict, config: dict):
"""Check if spam thresholds are exceeded and send alerts"""
    # Offline agents have no server to alert
    if not BASE_URL or not TOKEN:
        return
    try:
spam_threshold = config.get("spamThreshold", 5.0)
high_spam_count = 0
for spam_score in stats["spamScores"]:
if spam_score["score"] >= spam_threshold:
high_spam_count += 1
if high_spam_count > 0:
alert_payload = {
"serverId": str(sid),
"alertType": "HIGH_SPAM_SCORE",
"severity": "WARNING" if high_spam_count < 10 else "CRITICAL",
"message": f"Found {high_spam_count} messages with spam score >= {spam_threshold}",
"detailsJson": json.dumps({
"highSpamCount": high_spam_count,
"threshold": spam_threshold,
"samples": stats["spamScores"][:5] # First 5 samples
})
}
resp = session.post(f"{BASE_URL}/api/osm/agent/spam-alert", json=alert_payload, headers=DEFAULT_HEADERS, timeout=10)
resp.raise_for_status()
except Exception as e:
print(f"[agent] Error checking spam thresholds: {e}")
def _execute_queue_action_delete(ids):
try:
ok = 0
for qid in ids:
if not qid:
continue
try:
res = subprocess.run(['exim', '-Mrm', qid], capture_output=True, text=True, timeout=10)
if res.returncode == 0:
ok += 1
else:
print(f"[agent] exim -Mrm {qid} failed: rc={res.returncode} stdout={res.stdout.strip()} stderr={res.stderr.strip()}")
except Exception as e:
print(f"[agent] delete_queue exception for {qid}: {e}")
continue
return ok
except Exception as e:
print(f"[agent] delete_queue failed: {e}")
return 0
def _execute_queue_action_thaw(ids):
try:
ok = 0
for qid in ids:
if not qid:
continue
try:
res = subprocess.run(['exim', '-Mt', qid], capture_output=True, text=True, timeout=10)
if res.returncode == 0:
ok += 1
except Exception:
continue
return ok
except Exception as e:
print(f"[agent] thaw_queue failed: {e}")
return 0
def _execute_queue_action_release(ids):
"""Release a frozen message: thaw and then schedule delivery."""
try:
ok = 0
for qid in ids:
if not qid:
continue
try:
# Thaw first
subprocess.run(['exim', '-Mt', qid], capture_output=True, text=True, timeout=10)
# Then attempt immediate delivery
res = subprocess.run(['exim', '-M', qid], capture_output=True, text=True, timeout=15)
if res.returncode == 0:
ok += 1
except Exception:
continue
return ok
except Exception as e:
print(f"[agent] release_queue failed: {e}")
return 0
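# ---------- SSE Command Processing ----------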
def process_sse_event(event_data: str, sid: int):
"""Process a single SSE event containing a command.
Note: Only events that include a non-empty 'type' are considered commands.
"""
try:
cmd = json.loads(event_data)
cid = cmd.get('id') or ''
ctype = (cmd.get('type') or '').strip().lower()
payload_json = cmd.get('payloadJson') or '{}'
if not ctype:
# Ignore non-command SSE payloads (e.g., queue-update snapshots)
return
print(f"[agent] SSE event received: id={cid} type={ctype}")
result_msg = ''
try:
if isinstance(payload_json, (dict, list)):
payload = payload_json
else:
payload = json.loads(payload_json)
except Exception:
payload = {}
ids = payload.get('queueIds') or []
if ctype == 'delete_queue':
n = _execute_queue_action_delete(ids)
result_msg = f"deleted {n}/{len(ids)}"
print(f"[agent] delete_queue: {result_msg}")
elif ctype == 'thaw_queue':
n = _execute_queue_action_thaw(ids)
result_msg = f"thawed {n}/{len(ids)}"
print(f"[agent] thaw_queue: {result_msg}")
elif ctype == 'release_queue':
n = _execute_queue_action_release(ids)
result_msg = f"released {n}/{len(ids)}"
print(f"[agent] release_queue: {result_msg}")
elif ctype == 'snapshot_queue':
# Just send a fresh snapshot
try:
cfg = fetch_config(sid) or {}
if not QUEUE_INGEST_ENABLED or not cfg.get("queueIngestEnabled", True):
result_msg = "snapshot skipped (queue ingest disabled)"
print("[agent] snapshot_queue: skipped, queue ingest disabled")
else:
stats = get_mail_queue_stats(cfg.get("mailQueuePath", "/var/spool/postfix"))
send_queue_snapshot(sid, stats)
result_msg = "snapshot sent"
print("[agent] snapshot_queue: snapshot sent")
except Exception as e:
result_msg = f"snapshot failed: {e}"
print(f"[agent] snapshot_queue failed: {e}")
            elif ctype == 'verify_email':
                # `payload` was already parsed above; re-parsing payload_json
                # here would discard it when the server sent a dict, so reuse it
email = (payload.get('email') or '').strip().lower()
exists = False
owner = ''
def _directadmin_mailbox_exists(addr: str) -> bool:
try:
if '@' not in addr:
return False
localpart, domain = addr.split('@', 1)
domain = domain.strip().lower().rstrip('.')
localpart = localpart.strip().lower()
passwd_path = Path(f"/etc/virtual/{domain}/passwd")
if not passwd_path.exists():
return False
try:
for line in passwd_path.read_text(encoding='utf-8', errors='ignore').splitlines():
# DirectAdmin passwd format: localpart:password_hash:...
if not line or ':' not in line:
continue
lp = line.split(':', 1)[0].strip().lower()
if lp == localpart:
return True
except Exception:
return False
return False
except Exception:
return False
try:
# Resolve owner by domain mapping
owner = _resolve_owner_from_email(email) or ''
# Check if any queued or recent message references this sender
qstats = get_mail_queue_stats('/var/spool/postfix')
for d in qstats.get('queueDetails', []):
s = (d.get('sender') or '').lower()
if email and email in s:
exists = True
break
# If not in queue, check mailbox existence on DirectAdmin
if not exists:
if detect_control_panel() == 'directadmin':
if _directadmin_mailbox_exists(email):
exists = True
except Exception:
pass
result_msg = f"exists={'true' if exists else 'false'};owner={owner}"
            elif ctype == 'update_limits_cache':
# Validate and write to cache
d = payload.get('default') or {}
accs = payload.get('accounts') or []
if isinstance(accs, list):
clean_accs = []
for x in accs:
try:
clean_accs.append({
'account': str((x.get('account') if isinstance(x, dict) else '') or '').strip(),
'maxEmails': int((x.get('maxEmails') if isinstance(x, dict) else 0) or 0),
'perSeconds': int((x.get('perSeconds') if isinstance(x, dict) else 0) or 0),
'enabled': bool((x.get('enabled') if isinstance(x, dict) else True))
})
except Exception:
continue
else:
clean_accs = []
cache = {
'default': {
'enabled': bool(d.get('enabled') or False),
'maxEmails': int(d.get('maxEmails') or 0),
'perSeconds': int(d.get('perSeconds') or 0)
},
'accounts': clean_accs
}
_write_limits_cache(cache)
# Apply defaults immediately in-memory
try:
global DEFAULT_MAX_EMAILS, DEFAULT_PER_SECONDS, DEFAULT_ENABLED
DEFAULT_MAX_EMAILS = cache['default']['maxEmails']
DEFAULT_PER_SECONDS = cache['default']['perSeconds']
DEFAULT_ENABLED = cache['default']['enabled']
except Exception:
pass
result_msg = 'limits updated'
print("[agent] update_limits_cache: limits updated")
elif ctype == 'reconcile_freeze':
try:
# Server-initiated reconcile: prefer the server state to avoid re-adding a user just un-frozen from the app
reconcile_freeze_file_with_server(sid, prefer_server=True)
result_msg = 'reconciled (server-wins)'
except Exception as e:
result_msg = f'reconcile failed: {e}'
print(f"[agent] reconcile_freeze: {result_msg}")
elif ctype == 'self_update':
# Perform self-update in a detached shell so we can ack immediately
try:
print("[agent] self_update: scheduling self-update...")
update_sh = f"""#!/usr/bin/env bash
set -euo pipefail
APP_DIR=/opt/osm
TMP_ZIP=$(mktemp)
echo "[agent] Starting self-update from {BASE_URL}"
curl -fsSL "{BASE_URL}/osm/agent/download" -o "$TMP_ZIP" || {{ echo "Download failed"; exit 1; }}
echo "[agent] Downloaded agent package"
mkdir -p "$APP_DIR"
unzip -o "$TMP_ZIP" -d "$APP_DIR" || {{ echo "Unzip failed"; exit 1; }}
rm -f "$TMP_ZIP"
echo "[agent] Extracted to $APP_DIR"
# Set up Python environment
if [ ! -x "$APP_DIR/venv/bin/python" ]; then
echo "[agent] Creating Python virtual environment"
if [ -x "/opt/osm/py310/bin/python3.10" ]; then PY_BIN="/opt/osm/py310/bin/python3.10";
elif command -v python3.10 >/dev/null 2>&1; then PY_BIN=$(command -v python3.10);
elif [ -x /usr/bin/python3.10 ]; then PY_BIN=/usr/bin/python3.10;
elif [ -x /usr/local/bin/python3.10 ]; then PY_BIN=/usr/local/bin/python3.10;
else PY_BIN=$(command -v python3 || echo /usr/bin/python3); fi
"$PY_BIN" -m venv "$APP_DIR/venv" || {{ echo "Virtual environment creation failed"; exit 1; }}
fi
echo "[agent] Updating Python packages"
"$APP_DIR/venv/bin/pip" install --upgrade pip || true
"$APP_DIR/venv/bin/pip" install --upgrade requests psutil || echo "Package update warning"
echo "[agent] Setting permissions"
chmod +x "$APP_DIR/uninstall-agent.sh" 2>/dev/null || true
chmod +x "$APP_DIR/osm" 2>/dev/null || true
ln -sf "$APP_DIR/osm" /usr/local/bin/osm 2>/dev/null || true
echo "[agent] Restarting agent service"
systemctl restart osm 2>/dev/null || {{
echo "[agent] Systemd restart failed, trying direct restart"
pkill -f "python.*main.py" 2>/dev/null || true
sleep 2
cd "$APP_DIR" && nohup "$APP_DIR/venv/bin/python" main.py >/dev/null 2>&1 &
}}
echo "[agent] Self-update completed successfully"
"""
# Write and run
upd = Path('/tmp/osm_self_update.sh')
upd.write_text(update_sh)
os.chmod(str(upd), 0o755)
                # Run in background, writing output to the log file. Shell
                # redirection tokens ('&>', '&') are not valid argv entries,
                # so redirect via a real file handle instead.
                with open('/tmp/osm_update.log', 'ab') as log_fh:
                    subprocess.Popen(['nohup', 'bash', str(upd)],
                                     stdout=log_fh, stderr=subprocess.STDOUT)
result_msg = 'update scheduled'
print(f"[agent] Self-update scheduled, check /tmp/osm_update.log for details")
except Exception as e:
result_msg = f'update failed: {e}'
print(f"[agent] Self-update error: {e}")
elif ctype == 'uninstall':
# Execute uninstall script
try:
uninstall_script = Path(AGENT_DIR) / 'uninstall-agent.sh'
if uninstall_script.exists():
# Make sure it's executable
os.chmod(str(uninstall_script), 0o755)
# Run it in background so we can ack first
subprocess.Popen(['nohup', 'bash', str(uninstall_script)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
result_msg = 'uninstall started'
else:
result_msg = 'uninstall script not found'
except Exception as e:
result_msg = f'uninstall failed: {e}'
if cid:
try:
session.post(f"{BASE_URL}/api/osm/agent/command-ack", json={
"serverId": str(sid),
"commandId": cid,
"resultMessage": result_msg
}, headers=DEFAULT_HEADERS, timeout=10)
print(f"[agent] Acked command id={cid} result='{result_msg}'")
# Send a fresh queue snapshot so UI can update immediately
if QUEUE_INGEST_ENABLED:
try:
cfg = fetch_config(sid) or {}
if cfg.get("queueIngestEnabled", True):
stats = get_mail_queue_stats(cfg.get("mailQueuePath", "/var/spool/postfix"))
send_queue_snapshot(sid, stats)
except Exception as e:
print(f"[agent] Failed to send immediate queue snapshot: {e}")
except Exception:
pass
except Exception as e:
print(f"[agent] Error processing SSE event: {e}")
def listen_for_commands_sse(sid: int):
"""Listen for real-time commands via Server-Sent Events.
Parses SSE frames and only processes 'command' events.
"""
print(f"[agent] Starting SSE command listener for server {sid}")
while True:
try:
url = f"{BASE_URL}/api/osm/agent/queue-stream/{sid}"
sse_headers = dict(DEFAULT_HEADERS)
sse_headers.setdefault('Accept', 'text/event-stream')
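            # timeout=None keeps the read open indefinitely; SSE frames are
            # delivered whenever the server pushes them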
response = session.get(url, headers=sse_headers, stream=True, timeout=None)
response.raise_for_status()
print(f"[agent] Connected to SSE stream at {url}")
event_name = None
data_lines = []
for raw in response.iter_lines(decode_unicode=True):
if raw is None:
continue
line = raw.rstrip('\r')
if not line:
# dispatch accumulated event
if data_lines:
data_payload = '\n'.join(data_lines)
if (event_name or '').strip().lower() in ('', 'command'):
try:
process_sse_event(data_payload, sid)
except Exception as e:
print(f"[agent] SSE event dispatch error: {e}")
event_name = None
data_lines = []
continue
# field parsing
if line.startswith('event:'):
event_name = line[6:].strip()
elif line.startswith('data:'):
# tolerate optional leading space
val = line[5:]
if val.startswith(' '):
val = val[1:]
data_lines.append(val)
else:
# ignore other fields (id:, retry:, etc)
pass
except Exception as e:
print(f"[agent] SSE connection error: {e}")
time.sleep(5) # Wait before reconnecting
SSE_LISTENER_THREAD = None
def start_sse_listener_thread(sid: int):
"""Start the SSE listener thread for real-time command delivery."""
global SSE_LISTENER_THREAD
if SSE_LISTENER_THREAD and SSE_LISTENER_THREAD.is_alive():
return
SSE_LISTENER_THREAD = threading.Thread(target=listen_for_commands_sse, args=(sid,), daemon=True)
SSE_LISTENER_THREAD.start()
print(f"[agent] SSE listener thread started for server {sid}")
# ---------- CLI Helpers ----------
def _cli_build_empty_stats():
return {
"totalMessages": 0,
"activeMessages": 0,
"deferredMessages": 0,
"holdMessages": 0,
"incomingMessages": 0,
"outgoingMessages": 0,
"queueDetails": [],
"spamScores": [],
"blockedDomains": []
}
def _cli_load_queue_matches(sender_email: Optional[str] = None, account: Optional[str] = None):
stats = get_exim_queue_stats(_cli_build_empty_stats())
matches = []
target_email = (sender_email or '').strip().lower()
target_account = (account or '').strip().lower()
for detail in stats.get("queueDetails", []):
sender = (detail.get("sender") or "").lower()
recipients = [r.lower() for r in detail.get("recipients") or []]
owner = (detail.get("ownerAccount") or "").lower()
if target_email:
if target_email == sender or any(target_email == r for r in recipients):
matches.append(detail)
elif target_account:
if owner == target_account or target_account in sender:
matches.append(detail)
return matches
def _cli_print_queue_summary(matches):
if not matches:
print("No queued messages found.")
return
print(f"Found {len(matches)} queued message(s):")
for item in matches:
qid = item.get("queueId") or item.get("messageId")
subject = item.get("subject") or ""
sender = item.get("sender") or ""
recipient = ", ".join(item.get("recipients") or [])
print(f" - {qid}: from {sender} to {recipient} | {subject}")
def _cli_handle_queue_action(matches, choice):
qids = [item.get("queueId") or item.get("messageId") for item in matches if item.get("queueId") or item.get("messageId")]
qids = [qid for qid in qids if qid]
if not qids:
return None, []
if choice == "release":
released = _execute_queue_action_release(qids)
print(f"Released {released}/{len(qids)} message(s) from queue.")
elif choice == "delete":
deleted = _execute_queue_action_delete(qids)
print(f"Deleted {deleted}/{len(qids)} message(s) from queue.")
return choice if choice in {"release", "delete"} else None, qids
def _cli_command_freeze(args):
target_email = args.email
target_account = args.account
if not target_email and not target_account:
print("Specify --email or --account to freeze.")
return 1
reason = args.reason or "CLI freeze"
resolved_account = target_account or ""
sender_for_record = target_email or target_account or ""
if target_email:
add_user_to_hold_list("", target_email, reason=reason)
add_sender_to_freeze_list(target_email, reason=reason)
print(f"Frozen email address: {target_email}")
if target_account:
add_user_to_hold_list(target_account, reason=reason)
print(f"Frozen account: {target_account}")
_queue_action({
"type": "freeze",
"senderEmail": sender_for_record,
"account": resolved_account,
"reason": reason,
"timestamp": time.time(),
})
return 0
def _cli_command_unfreeze(args):
target_email = args.email
target_account = args.account
if not target_email and not target_account:
print("Specify --email or --account to unfreeze.")
return 1
matches = _cli_load_queue_matches(sender_email=target_email, account=target_account)
_cli_print_queue_summary(matches)
queue_choice = None
if matches:
if args.release and args.delete:
print("Specify only one of --release or --delete.")
return 1
if args.release:
queue_choice = "release"
elif args.delete:
queue_choice = "delete"
else:
print("Queue action options:")
print(" 1) Unfreeze only")
print(" 2) Unfreeze and release queued emails")
print(" 3) Unfreeze and delete queued emails")
selected = input("Select option [1/2/3]: ").strip()
queue_choice = {"1": None, "2": "release", "3": "delete"}.get(selected, None)
queue_ids = []
if target_email:
remove_user_from_hold_list(sender_email=target_email)
remove_sender_from_freeze_list(target_email)
print(f"Unfrozen email address: {target_email}")
if target_account:
remove_user_from_hold_list(username=target_account)
print(f"Unfrozen account: {target_account}")
if queue_choice:
queue_choice, queue_ids = _cli_handle_queue_action(matches, queue_choice)
resolved_account = target_account or ""
sender_for_record = target_email or target_account or ""
_queue_action({
"type": "unfreeze",
"senderEmail": sender_for_record,
"account": resolved_account,
"queueAction": queue_choice or "none",
"queueIds": queue_ids,
"timestamp": time.time(),
})
return 0
def _cli_command_limit_set(args):
account = args.account.strip()
max_emails = int(args.max)
per_seconds = int(args.per)
enabled = not args.disable
set_account_limit_local(account, max_emails, per_seconds, enabled)
_queue_action({
"type": "set_limit",
"account": account,
"maxEmails": max_emails,
"perSeconds": per_seconds,
"enabled": enabled,
"timestamp": time.time(),
})
print(f"Set limit for account '{account}': {max_emails} emails / {per_seconds}s (enabled={enabled})")
return 0
def _cli_command_limit_remove(args):
account = args.account.strip()
remove_account_limit_local(account)
_queue_action({
"type": "remove_limit",
"account": account,
"timestamp": time.time(),
})
print(f"Removed limit for account '{account}'.")
return 0
def _cli_command_limit_default(args):
max_emails = int(args.max)
per_seconds = int(args.per)
enabled = not args.disable
set_default_limit_local(max_emails, per_seconds, enabled)
_queue_action({
"type": "set_default_limit",
"maxEmails": max_emails,
"perSeconds": per_seconds,
"enabled": enabled,
"timestamp": time.time(),
})
state = "enabled" if enabled else "disabled"
print(f"Default limit {state}: {max_emails} emails / {per_seconds}s")
return 0
def _cli_ensure_enrolled():
sid = _get_current_server_id()
if not sid:
print("Warning: agent server ID not known yet. CLI changes will apply locally and sync once enrollment succeeds.")
def _cli_build_parser():
parser = argparse.ArgumentParser(prog="osm", description="OSM Agent CLI")
subparsers = parser.add_subparsers(dest="command")
freeze_parser = subparsers.add_parser("freeze", help="Freeze an email address or account")
freeze_target = freeze_parser.add_mutually_exclusive_group(required=True)
freeze_target.add_argument("--email", help="Email address to freeze")
freeze_target.add_argument("--account", help="Hosting account to freeze")
freeze_parser.add_argument("--reason", help="Reason for freeze")
freeze_parser.set_defaults(func=_cli_command_freeze)
unfreeze_parser = subparsers.add_parser("unfreeze", help="Unfreeze an email address or account")
unfreeze_target = unfreeze_parser.add_mutually_exclusive_group(required=True)
unfreeze_target.add_argument("--email", help="Email address to unfreeze")
unfreeze_target.add_argument("--account", help="Hosting account to unfreeze")
unfreeze_parser.add_argument("--release", action="store_true", help="Release queued emails after unfreeze")
unfreeze_parser.add_argument("--delete", action="store_true", help="Delete queued emails after unfreeze")
unfreeze_parser.set_defaults(func=_cli_command_unfreeze)
limit_parser = subparsers.add_parser("limit", help="Manage per-account or default limits")
limit_sub = limit_parser.add_subparsers(dest="limit_command")
limit_set = limit_sub.add_parser("set", help="Set/update limit for an account")
limit_set.add_argument("--account", required=True, help="Hosting account name")
limit_set.add_argument("--max", required=True, type=int, help="Max emails in window")
limit_set.add_argument("--per", required=True, type=int, help="Time window in seconds")
limit_set.add_argument("--disable", action="store_true", help="Disable (but retain values)")
limit_set.set_defaults(func=_cli_command_limit_set)
limit_remove = limit_sub.add_parser("remove", help="Remove limit for an account")
limit_remove.add_argument("--account", required=True, help="Hosting account name")
limit_remove.set_defaults(func=_cli_command_limit_remove)
limit_default = limit_sub.add_parser("default", help="Set default (server-wide) limit")
limit_default.add_argument("--max", required=True, type=int, help="Max emails in window")
limit_default.add_argument("--per", required=True, type=int, help="Time window in seconds")
limit_default.add_argument("--disable", action="store_true", help="Disable the default limit")
limit_default.set_defaults(func=_cli_command_limit_default)
return parser
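# Example invocations (illustrative):
#   osm freeze --email spammer@example.com --reason "burst detected"
#   osm unfreeze --account exampleuser --release
#   osm limit set --account exampleuser --max 100 --per 3600
#   osm limit default --max 200 --per 3600 --disable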
def run_cli(argv=None):
_cli_ensure_enrolled()
parser = _cli_build_parser()
args = parser.parse_args(argv)
if not hasattr(args, "func"):
parser.print_help()
return 1
return args.func(args)
def main():
"""Main agent loop"""
global CURRENT_THRESHOLDS
print("[agent] Starting OSM agent...")
try:
sid = None
if BASE_URL and TOKEN:
try:
sid = enroll()
try:
global CURRENT_SERVER_ID
CURRENT_SERVER_ID = sid
except Exception:
pass
try:
_save_agent_state({"serverId": str(sid)})
except Exception:
pass
print(f"[agent] Enrolled with server ID: {sid}")
# Report agent version to server
try:
session.post(f"{BASE_URL}/api/osm/agent/hello", json={"serverId": str(sid), "version": "v1.2"}, headers=DEFAULT_HEADERS, timeout=10)
except Exception as e:
print(f"[agent] Failed to report version: {e}")
# Start SSE listener for real-time commands
start_sse_listener_thread(sid)
except Exception as e:
print(f"[agent] Enrollment failed, continuing offline: {e}")
try:
cached_sid = _get_current_server_id()
if cached_sid:
sid = cached_sid
CURRENT_SERVER_ID = cached_sid
# Start SSE listener with cached server ID
start_sse_listener_thread(sid)
except Exception:
pass
else:
try:
cached_sid = _get_current_server_id()
if cached_sid:
sid = cached_sid
CURRENT_SERVER_ID = cached_sid
# Start SSE listener with cached server ID
start_sse_listener_thread(sid)
except Exception:
pass
# Load config (online or cached)
if sid is not None:
config = fetch_config(sid)
else:
d = _read_limits_cache().get("default", {})
config = {
"mailQueuePath": "/var/spool/postfix",
"checkInterval": 30,
"spamThreshold": 5.0,
"blockedDomainsJson": "[]",
"alertThresholdsJson": "{}",
"defaultMaxEmails": int(d.get('maxEmails') or 0),
"defaultPerSeconds": int(d.get('perSeconds') or 0),
"defaultEnabled": bool(d.get('enabled') or False)
}
print(f"[agent] Using config: {config}")
# Apply default server-wide limit settings from config
def apply_defaults_from_config(cfg: dict):
try:
global DEFAULT_MAX_EMAILS, DEFAULT_PER_SECONDS, DEFAULT_ENABLED
DEFAULT_MAX_EMAILS = int(cfg.get('defaultMaxEmails') or 0)
DEFAULT_PER_SECONDS = int(cfg.get('defaultPerSeconds') or 0)
DEFAULT_ENABLED = bool(cfg.get('defaultEnabled') or False)
print(f"[agent] Default limit: enabled={DEFAULT_ENABLED} max={DEFAULT_MAX_EMAILS} per={DEFAULT_PER_SECONDS}s")
except Exception as e:
print(f"[agent] Failed to apply default limit from config: {e}")
apply_defaults_from_config(config)
# Send accounts snapshot immediately on startup
if sid is not None:
try:
send_accounts_snapshot(sid)
except Exception as e:
print(f"[agent] Initial accounts snapshot failed: {e}")
try:
# Prime local limits cache so log parsing can enforce account limits immediately
fetch_account_limits(sid)
except Exception as e:
print(f"[agent] Initial account limits fetch failed: {e}")
# Initialize thresholds and start near-real-time Exim monitor
initial_thresholds = parse_thresholds(config)
    try:
        # CURRENT_THRESHOLDS is already declared global at the top of main()
        with THRESHOLD_LOCK:
            CURRENT_THRESHOLDS = list(initial_thresholds)
    except Exception:
        pass
try:
process_exim_logs(initial_thresholds, sid)
write_tracking_db(initial_thresholds)
except Exception as e:
print(f"[agent] Initial Exim scan failed: {e}")
_ensure_exim_monitor_thread()
loop_count = 0
while True:
try:
loop_count += 1
print(f"[agent] Starting loop iteration {loop_count}")
# Periodically confirm version and liveness
if sid is not None and loop_count % 5 == 0:
try:
session.post(f"{BASE_URL}/api/osm/agent/hello", json={"serverId": str(sid), "version": "v1.2"}, headers=DEFAULT_HEADERS, timeout=10)
except Exception as e:
print(f"[agent] Version hello failed: {e}")
_process_pending_actions()
_flush_pending_reports()
# Keep freeze state aligned after pushing any queued actions
if sid is not None:
reconcile_freeze_file_with_server(sid)
_sync_reports_with_server()
# Periodically refresh config so default limits can change without restart
if sid is not None and (loop_count % 3) == 0:
try:
config = fetch_config(sid)
apply_defaults_from_config(config)
new_thresholds = parse_thresholds(config)
try:
with THRESHOLD_LOCK:
CURRENT_THRESHOLDS = list(new_thresholds)
except Exception:
pass
except Exception as e:
print(f"[agent] Config refresh failed: {e}")
stats = None
if QUEUE_INGEST_ENABLED and config.get("queueIngestEnabled", True):
stats = get_mail_queue_stats(config["mailQueuePath"])
print(f"[agent] Got stats: {stats['totalMessages']} total messages")
if sid is not None:
send_queue_snapshot(sid, stats)
enforce_account_limits(sid, stats)
else:
print("[agent] Queue ingestion disabled; skipping queue stats collection")
# Periodically send accounts snapshot (every 5 loops)
if sid is not None and loop_count % 5 == 1:
send_accounts_snapshot(sid)
# Check for spam alerts
if sid is not None and stats is not None:
check_spam_thresholds(sid, stats, config)
if stats is not None:
print(f"[agent] Queue stats: {stats['totalMessages']} total, "
f"{stats['activeMessages']} active, "
f"{stats['deferredMessages']} deferred, "
f"{len(stats['spamScores'])} spam scores")
else:
print("[agent] Queue stats unavailable (ingest disabled)")
# Sleep for configured interval
print(f"[agent] Sleeping for {config.get('checkInterval', 30)} seconds...")
time.sleep(config.get("checkInterval", 30))
except Exception as e:
print(f"[agent] Error in main loop: {e}")
time.sleep(30)
except Exception as e:
print(f"[agent] Fatal error: {e}")
time.sleep(30)
return
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] not in {"agent", "--agent"}:
sys.exit(run_cli(sys.argv[1:]))
main()