| Server IP : 213.136.93.164 / Your IP : 216.73.216.104 Web Server : Apache System : Linux m14200.contabo.net 5.14.0-611.54.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Wed May 6 18:03:03 EDT 2026 x86_64 User : ki692510 ( 1047) PHP Version : 7.4.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : OFF | Pkexec : OFF Directory : /opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/plugins/ |
Upload File : |
"""
Feature flags synchronisation plugin (AV mode only).
In IM360 mode the Go resident-agent handles feature-flag sync.
In AV mode there is no resident-agent, so this plugin takes over.
Periodically POSTs the local file checksum to the API and writes
back any updated flags to ``/var/imunify360/feature_flags.json`` (legacy map
``{flag: true}`` on disk) and ``/var/imunify360/feature_flags`` (plain names,
one per line). The POSTed checksum is over the canonical JSON **array** of
enabled names, matching the correlation sync API—not over the on-disk map bytes.
"""
import asyncio
import json
import logging
import os
import urllib.error
import urllib.request
from defence360agent.contracts.config import Core
from defence360agent.contracts.plugins import MessageSource
from defence360agent.internals.feature_flags import (
FLAGS_PATH,
FLAGS_PLAIN_PATH,
enabled_flag_names_sorted,
plain_text_payload_for_enabled_flags,
serialize_feature_flags_file_payload,
sync_checksum_hex_from_flags_file,
sync_response_file_bytes,
)
from defence360agent.internals.iaid import (
IAIDTokenError,
IndependentAgentIDAPI,
)
from defence360agent.utils import Scope, atomic_rewrite
logger = logging.getLogger(__name__)
_SYNC_URL = "/api/sync/v1/feature-flags"
def _env_int(name: str, default: int) -> int:
"""Read an int env var tolerantly.
A non-numeric value (empty string, typo, etc.) must NOT raise at
import time — the plugin lives in the AV agent entry point and a
bad env var would otherwise kill the whole agent.
"""
raw = os.environ.get(name)
if not raw:
return default
try:
return int(raw)
except ValueError:
logger.warning(
"feature-flags: %s=%r is not an int, using default %d",
name,
raw,
default,
)
return default
_TRUE_VALUES = frozenset({"1", "true", "yes", "on"})
_FALSE_VALUES = frozenset({"0", "false", "no", "off"})
def _env_bool(name: str, default: bool) -> bool:
raw = os.environ.get(name)
if not raw:
return default
normalized = raw.strip().lower()
if normalized in _TRUE_VALUES:
return True
if normalized in _FALSE_VALUES:
return False
logger.warning(
"feature-flags: %s=%r is not a bool, using default %s",
name,
raw,
default,
)
return default
_SYNC_INTERVAL = _env_int("I360_FEATURE_FLAGS_SYNC_INTERVAL", 3600)
_INITIAL_DELAY = _env_int("I360_FEATURE_FLAGS_INIT_DELAY", 10)
_UNREGISTERED_DELAY = _env_int("I360_FEATURE_FLAGS_UNREG_DELAY", 30)
_USE_SERVER_DELAY = _env_bool("I360_FEATURE_FLAGS_USE_SERVER_DELAY", True)
_HTTP_TIMEOUT = 30
def _next_delay(server_delay: int) -> int:
if _USE_SERVER_DELAY and server_delay > 0:
return server_delay
return _SYNC_INTERVAL
class FeatureFlagsSync(MessageSource):
SCOPE = Scope.AV
async def create_source(self, loop, sink):
self._loop = loop
self._sink = sink
self._task = loop.create_task(self._sync_loop())
async def shutdown(self):
if self._task is not None:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
def _local_checksum(self) -> str:
return sync_checksum_hex_from_flags_file(FLAGS_PATH)
async def _sync_loop(self):
await asyncio.sleep(_INITIAL_DELAY)
while True:
delay = _SYNC_INTERVAL
try:
if not IndependentAgentIDAPI.is_registered():
delay = _UNREGISTERED_DELAY
else:
delay = _next_delay(await self._do_sync())
except asyncio.CancelledError:
raise
except Exception:
logger.warning("feature flags sync failed", exc_info=True)
await asyncio.sleep(delay)
async def _do_sync(self) -> int:
try:
token = await IndependentAgentIDAPI.get_token()
except IAIDTokenError:
logger.warning("no IAID token, skipping feature flags sync")
return 0
loop = asyncio.get_event_loop()
checksum = await loop.run_in_executor(None, self._local_checksum)
payload = json.dumps({"checksum": checksum}).encode()
base_url = os.getenv("I360_FEATURE_FLAGS_API_URL", Core.API_BASE_URL)
url = base_url.rstrip("/") + _SYNC_URL
req = urllib.request.Request(
url,
data=payload,
headers={
"Content-Type": "application/json",
"X-Auth": token,
},
method="POST",
)
try:
resp_body = await loop.run_in_executor(
None, self._blocking_request, req
)
except urllib.error.HTTPError as e:
# Non-5xx (404/403/4xx) is usually a server-side routing or
# auth state, not an agent bug — keep it a one-line WARNING.
# 5xx means the server actually misbehaved; keep the traceback.
if 500 <= e.code < 600:
logger.error(
"feature flags sync HTTP %s on %s: %s",
e.code,
url,
e.reason,
exc_info=e,
)
else:
logger.warning(
"feature flags sync HTTP %s on %s: %s",
e.code,
url,
e.reason,
)
return 0
except urllib.error.URLError as e:
# DNS, connection refused, TLS, timeout — transient network
# conditions, not bugs. One-line WARNING so logs stay readable.
logger.warning(
"feature flags sync connection failed on %s: %s",
url,
e.reason,
)
return 0
except Exception:
logger.error(
"feature flags sync request failed on %s",
url,
exc_info=True,
)
return 0
try:
result = json.loads(resp_body)
except json.JSONDecodeError:
logger.error("failed to parse feature flags response")
return 0
server_delay = result.get("delay", 0)
if result.get("changed") is False:
logger.debug("feature flags unchanged, skipping write")
return server_delay
flags = result.get("flags")
params = result.get("params") or {}
if flags is not None:
await loop.run_in_executor(None, self._write_flags, flags, params)
return server_delay
@staticmethod
def _blocking_request(req: urllib.request.Request) -> bytes:
with urllib.request.urlopen(req, timeout=_HTTP_TIMEOUT) as resp:
return resp.read()
@staticmethod
def _write_flags(flags, params=None) -> None:
"""Persist flags + params on disk in the canonical sync-response
shape so the next sync's checksum matches what the server returned.
Falls back to the legacy ``{name: true}`` map when ``flags`` is not
a list (response shape we don't recognise) — keeps the long-standing
on-disk contract from older code paths.
"""
params = params or {}
try:
if isinstance(flags, list):
names = [n for n in flags if isinstance(n, str)]
cleaned = {
name: [v for v in vals if isinstance(v, str)]
for name, vals in params.items()
if isinstance(name, str) and isinstance(vals, list)
}
data = sync_response_file_bytes(names, cleaned)
else:
data = serialize_feature_flags_file_payload(flags)
except TypeError:
logger.warning(
"feature flags sync: unexpected flags type %r, skipping write",
type(flags).__name__,
)
return
n_active = len(enabled_flag_names_sorted(flags))
try:
os.makedirs(os.path.dirname(FLAGS_PATH), exist_ok=True)
# Atomic write-to-temp + rename so a crash mid-write can't
# leave the flags file truncated/corrupt — otherwise readers
# would fall back to defaults until the next sync.
atomic_rewrite(FLAGS_PATH, data, backup=False)
plain = plain_text_payload_for_enabled_flags(flags)
atomic_rewrite(FLAGS_PLAIN_PATH, plain, backup=False)
logger.info("feature flags synced: %d flags active", n_active)
except OSError:
logger.error("failed to write flags file", exc_info=True)